spring|nacos 概述

nacos 概述
官网:https://nacos.io/zh-cn/docs/what-is-nacos.html



概述

nacos可充当服务注册中心、配置中心,整体架构如下:
spring|nacos 概述
文章图片

provider app:服务提供方,向nacos注册服务信息及元数据
consumer app:服务消费方,从nacos获取服务提供方信息,发起服务调用
open api:nacos提供了api与客户端交互,通过api注册删除更新服务信息、配置信息等
config service:配置服务,存储应用的配置信息,实现配置数据的集中管理与动态更新
name service:对象名和其关联的元数据的映射管理,服务发现和 DNS 就是名字服务的2大场景
consistency protocol:一致性协议,nacos可集群部署实现高可用,使用raft协议选主与数据局同步
nacos console:nacos提供了控制台,方便用户查看、修改nacos中的数据



raft 协议

*********
选主

spring|nacos 概述
文章图片

正常情况下,集群节点只有两种状态:leader、follower,当leader故障之后,触发选主过程;
每个follower节点都有一个倒计时器(150-300ms之间的随机数),当收到leader心跳数据,重置倒计时器,此时由于leader故障,follower收不到重置请求,转变为candidate状态,向其他节点发出投票请求;
其他follower节点收到投票请求后,会比较当前节点与投票请求节点的日志数据,如果投票请求节点的日志数据更新,则同意投票,否则拒绝投票;
如果投票请求节点获取超过半数投票,则成为主节点;否则发起新一轮投票(其他follower节点由于收不到leader节点的心跳数据,也会转变成为candidate,也会发出投票请求),直至选出主节点为止

【spring|nacos 概述】*********
数据同步

spring|nacos 概述
文章图片


正常写入过程:
集群中的任意节点收到数据写入请求,将写入请求转发给leader;
leader先在本地写入,然后将数据发送给follower;
follower将写入状态发送给leader,写入不成功的,leader会不断发送数据进行重试,
半数follower写入成功之后,leader提交数据,并向客户端返回数据写入状态

leader节点故障,恢复后转变为follower数据恢复:
如果数据不一致(旧的leader写入,其余follower都未写入的数据),需要先做日志截断,再做数据同步;
如果旧的主节点数据落后太多,需要先根据新的leader节点的快照数据恢复数据,再做数据同步



注册中心

*********
基本功能

服务提供者在启动时向注册中心注册服务信息,关闭时注销服务信息
服务消费者可从注册中心获取可用的服务提供方信息
注册中心检查服务提供者可用性,下线不可用的服务提供者

*********
nacos 实现原理

spring|nacos 概述
文章图片

服务提供者通过open api注册服务信息,并定时发送心跳数据
服务消费者通过open api获取服务提供方信息,并通过定时任务每隔10s拉取一次数据;
nacos server如果检测到服务提供方出现异常,会将异常信息推送给服务消费者

服务健康检查:服务提供端定时发送心跳、nacos server定时检查
spring|nacos 概述
文章图片

服务提供端(客户端):通过定时任务向nacos server发送心跳数据,并启动线程不断检测nacos server的响应,如果超时没有收到回应,则认为nacos server故障
nacos server(服务端):nacos server会开启定时任务检查服务的最后更新时间,当收到客户端的心跳数据后,会更新服务的最后心跳时间,如果超时没有更新,则将服务标记为不健康状态,同时将服务异常信息推送给服务消费端

消费端服务动态感知:消费端定时拉取数据、nacos server异常推送
spring|nacos 概述
文章图片

服务消费端:消费端拉取数据时会注册事件监听,每隔10s拉取最新数据
nacos server:nacos server与服务提供端有心跳检测机制,会检查服务健康状态,当检测到服务异常之后,会将异常信息推送给消费端;当消费端收到异常推送后,会更新本地存储的服务信息

服务注册调用栈

# NacosServiceRegistry.register注册服务 at com.alibaba.cloud.nacos.registry.NacosServiceRegistry.register(NacosServiceRegistry.java:58)# NacosServiceRegistry方法反射增强 at com.alibaba.cloud.nacos.registry.NacosServiceRegistry$$FastClassBySpringCGLIB$$ca3a8dbd.invoke(:-1) at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749) at org.springframework.aop.aspectj.AspectJAfterAdvice.invoke(AspectJAfterAdvice.java:47) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175) at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749) at org.springframework.aop.framework.adapter.MethodBeforeAdviceInterceptor.invoke(MethodBeforeAdviceInterceptor.java:56) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175) at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749) at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:95) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749) at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691) at com.alibaba.cloud.nacos.registry.NacosServiceRegistry$$EnhancerBySpringCGLIB$$87b4ef86.register(:-1)# AbstractAutoServiceRegistration类 at org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration.register(AbstractAutoServiceRegistration.java:239)# NacosAutoServiceRegistration类 at com.alibaba.cloud.nacos.registry.NacosAutoServiceRegistration.register(NacosAutoServiceRegistration.java:76)# AbstractAutoServiceRegistration类 at org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration.start(AbstractAutoServiceRegistration.java:138) at org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration.bind(AbstractAutoServiceRegistration.java:101) at org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration.onApplicationEvent(AbstractAutoServiceRegistration.java:88) at org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration.onApplicationEvent(AbstractAutoServiceRegistration.java:47)# SimpleApplicationEventMulticaster类 at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172) at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165) at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139)# AbstractApplicationContext类 at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:404) at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:361)# WebServerStartStopLifecycle类 at org.springframework.boot.web.servlet.context.WebServerStartStopLifecycle.start(WebServerStartStopLifecycle.java:46)# DefaultLifecycleProcessor类 at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)# AbstractApplicationContext类 at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:895) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554) - locked <0x23ae> (a java.lang.Object)# ServletWebServerApplicationContext类 at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143)# SpringApplication类 at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:405) at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)# 项目启动入口 at com.example.demo.DemoApplication.main(DemoApplication.java:12)


NacosServiceRegistry:nacos服务注册
public class NacosServiceRegistry implements ServiceRegistry { private static final Logger log = LoggerFactory.getLogger(NacosServiceRegistry.class); private final NacosDiscoveryProperties nacosDiscoveryProperties; private NacosServiceManager nacosServiceManager; public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties, NacosServiceManager nacosServiceManager) { this.nacosDiscoveryProperties = nacosDiscoveryProperties; this.nacosServiceManager = nacosServiceManager; }public void register(Registration registration) {//服务注册 if (StringUtils.isEmpty(registration.getServiceId())) { log.warn("No service to register for nacos client..."); } else { NamingService namingService = this.namingService(); String serviceId = registration.getServiceId(); String group = this.nacosDiscoveryProperties.getGroup(); Instance instance = this.getNacosInstanceFromRegistration(registration); try { namingService.registerInstance(serviceId, group, instance); //nameService注册服务 log.info("nacos registry, {} {} {}:{} register finished", new Object[]{group, serviceId, instance.getIp(), instance.getPort()}); } catch (Exception var7) { log.error("nacos registry, {} register failed...{},", new Object[]{serviceId, registration.toString(), var7}); ReflectionUtils.rethrowRuntimeException(var7); }} }


NacosNamingService:实际执行服务注册
public class NacosNamingService implements NamingService {public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); if (instance.isEphemeral()) { BeatInfo beatInfo = this.beatReactor.buildBeatInfo(groupedServiceName, instance); this.beatReactor.addBeatInfo(groupedServiceName, beatInfo); }//心跳检测this.serverProxy.registerService(groupedServiceName, groupName, instance); //服务注册 }


BeatReactor:心跳检测
public class BeatReactor implements Closeable {public void addBeatInfo(String serviceName, BeatInfo beatInfo) { LogUtils.NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo); String key = this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()); BeatInfo existBeat = null; if ((existBeat = (BeatInfo)this.dom2Beat.remove(key)) != null) { existBeat.setStopped(true); }this.dom2Beat.put(key, beatInfo); this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS); //定时任务,默认每隔5秒执行一次 MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size()); }


NamingProxy:服务注册
public class NamingProxy implements Closeable {public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { //服务注册 LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{this.namespaceId, serviceName, instance}); Map params = new HashMap(16); params.put("namespaceId", this.namespaceId); params.put("serviceName", serviceName); params.put("groupName", groupName); params.put("clusterName", instance.getClusterName()); params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("weight", String.valueOf(instance.getWeight())); params.put("enable", String.valueOf(instance.isEnabled())); params.put("healthy", String.valueOf(instance.isHealthy())); params.put("ephemeral", String.valueOf(instance.isEphemeral())); params.put("metadata", JacksonUtils.toJson(instance.getMetadata())); this.reqApi(UtilAndComs.nacosUrlInstance, params, "POST"); }//向nacos server发送post请求,注册服务



服务发现调用栈
# NacosNamingService.selectInstances:获取服务实例 at com.alibaba.nacos.client.naming.NacosNamingService.selectInstances(NacosNamingService.java:308) at com.alibaba.cloud.nacos.discovery.NacosServiceDiscovery.getInstances(NacosServiceDiscovery.java:58) at com.alibaba.cloud.nacos.discovery.NacosDiscoveryClient.getInstances(NacosDiscoveryClient.java:56)# CompositeDiscoveryClient类 at org.springframework.cloud.client.discovery.composite.CompositeDiscoveryClient.getInstances(CompositeDiscoveryClient.java:53)# DubboCloudRegistry类 at com.alibaba.cloud.dubbo.registry.DubboCloudRegistry.doGetServiceInstances(DubboCloudRegistry.java:395) at com.alibaba.cloud.dubbo.registry.DubboCloudRegistry.getServiceInstances(DubboCloudRegistry.java:389) at com.alibaba.cloud.dubbo.registry.DubboCloudRegistry.lambda$null$1(DubboCloudRegistry.java:207) at com.alibaba.cloud.dubbo.registry.DubboCloudRegistry$$Lambda$533.1835848160.get(Unknown Source:-1) at com.alibaba.cloud.dubbo.registry.DubboCloudRegistry.subscribeURLs(DubboCloudRegistry.java:226) at com.alibaba.cloud.dubbo.registry.DubboCloudRegistry.lambda$subscribeURLs$2(DubboCloudRegistry.java:206) at com.alibaba.cloud.dubbo.registry.DubboCloudRegistry$$Lambda$532.737335019.accept(Unknown Source:-1) at java.lang.Iterable.forEach(Iterable.java:75) at java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1080) at com.alibaba.cloud.dubbo.registry.DubboCloudRegistry.subscribeURLs(DubboCloudRegistry.java:204) at com.alibaba.cloud.dubbo.registry.DubboCloudRegistry.subscribeURLs(DubboCloudRegistry.java:184) at com.alibaba.cloud.dubbo.registry.DubboCloudRegistry.doSubscribe(DubboCloudRegistry.java:177)# subscribe操作 at org.apache.dubbo.registry.support.FailbackRegistry.subscribe(FailbackRegistry.java:333) at org.apache.dubbo.registry.ListenerRegistryWrapper.subscribe(ListenerRegistryWrapper.java:105) at org.apache.dubbo.registry.integration.RegistryDirectory.subscribe(RegistryDirectory.java:185)# refer操作 at org.apache.dubbo.registry.integration.RegistryProtocol.doRefer(RegistryProtocol.java:469) at org.apache.dubbo.registry.integration.RegistryProtocol.refer(RegistryProtocol.java:454) at org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper.refer(ProtocolListenerWrapper.java:72) at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper.refer(ProtocolFilterWrapper.java:161) at org.apache.dubbo.qos.protocol.QosProtocolWrapper.refer(QosProtocolWrapper.java:73) at org.apache.dubbo.rpc.Protocol$Adaptive.refer(Protocol$Adaptive.java:-1)# ReferenceConfig类 at org.apache.dubbo.config.ReferenceConfig.createProxy(ReferenceConfig.java:367) at org.apache.dubbo.config.ReferenceConfig.init(ReferenceConfig.java:305) - locked <0x196e> (a org.apache.dubbo.config.spring.ReferenceBean) at org.apache.dubbo.config.ReferenceConfig.get(ReferenceConfig.java:205)# ReferenceAnnotationBeanPostProcessor类 at org.apache.dubbo.config.spring.beans.factory.annotation.ReferenceAnnotationBeanPostProcessor.doGetInjectedBean(ReferenceAnnotationBeanPostProcessor.java:144)# AbstractAnnotationBeanPostProcessor类 at com.alibaba.spring.beans.factory.annotation.AbstractAnnotationBeanPostProcessor.getInjectedObject(AbstractAnnotationBeanPostProcessor.java:359) at com.alibaba.spring.beans.factory.annotation.AbstractAnnotationBeanPostProcessor$AnnotatedFieldElement.inject(AbstractAnnotationBeanPostProcessor.java:539)# InjectionMetadata类 at org.springframework.beans.factory.annotation.InjectionMetadata.inject(InjectionMetadata.java:119)# AbstractAnnotationBeanPostProcessor类 at com.alibaba.spring.beans.factory.annotation.AbstractAnnotationBeanPostProcessor.postProcessPropertyValues(AbstractAnnotationBeanPostProcessor.java:142)# AbstractAutowireCapableBeanFactory类 at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.populateBean(AbstractAutowireCapableBeanFactory.java:1425) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:593) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:516)# AbstractBeanFactory类 at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:324) at org.springframework.beans.factory.support.AbstractBeanFactory$$Lambda$167.1340848245.getObject(Unknown Source:-1)# DefaultSingletonBeanRegistry类 at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) - locked <0x196f> (a java.util.concurrent.ConcurrentHashMap)# AbstractBeanFactory类 at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:322) at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202)# DefaultListableBeanFactory类 at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:897)# AbstractApplicationContext类 at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:879) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:551) - locked <0x1970> (a java.lang.Object)# ServletWebServerApplicationContext类 at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143)# SpringApplication类 at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:405) at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)# 项目启动入口 at com.example.demo.DemoApplication.main(DemoApplication.java:12)


NacosNamingService:获取服务实例
public class NacosNamingService implements NamingService {public List selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException { return this.selectInstances(serviceName, groupName, healthy, true); }//获取服务的时候,同时进行服务订阅public List selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe) throws NacosException { return this.selectInstances(serviceName, groupName, new ArrayList(), healthy, subscribe); }public List selectInstances(String serviceName, String groupName, List clusters, boolean healthy, boolean subscribe) throws NacosException { ServiceInfo serviceInfo; if (subscribe) {//如果进行服务订阅 serviceInfo = this.hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")); } else { serviceInfo = this.hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")); }return this.selectInstances(serviceInfo, healthy); }


HostReactor:服务更新、解析nacos server推送的服务异常信息
public class HostReactor implements Closeable {public ServiceInfo processServiceJson(String json) {//解析服务端推送的消息 ServiceInfo serviceInfo = (ServiceInfo)JacksonUtils.toObj(json, ServiceInfo.class); ServiceInfo oldService = (ServiceInfo)this.serviceInfoMap.get(serviceInfo.getKey()); if (serviceInfo.getHosts() != null && serviceInfo.validate()) { boolean changed = false; if (oldService != null) { if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) { LogUtils.NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: " + serviceInfo.getLastRefTime()); }this.serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); Map oldHostMap = new HashMap(oldService.getHosts().size()); Iterator var6 = oldService.getHosts().iterator(); while(var6.hasNext()) { Instance host = (Instance)var6.next(); oldHostMap.put(host.toInetAddr(), host); }Map newHostMap = new HashMap(serviceInfo.getHosts().size()); Iterator var16 = serviceInfo.getHosts().iterator(); while(var16.hasNext()) { Instance host = (Instance)var16.next(); newHostMap.put(host.toInetAddr(), host); }Set modHosts = new HashSet(); Set newHosts = new HashSet(); Set remvHosts = new HashSet(); List newServiceHosts = new ArrayList(newHostMap.entrySet()); Iterator var11 = newServiceHosts.iterator(); while(true) { Entry entry; Instance host; String key; while(var11.hasNext()) { entry = (Entry)var11.next(); host = (Instance)entry.getValue(); key = (String)entry.getKey(); if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(), ((Instance)oldHostMap.get(key)).toString())) { modHosts.add(host); } else if (!oldHostMap.containsKey(key)) { newHosts.add(host); } }var11 = oldHostMap.entrySet().iterator(); while(var11.hasNext()) { entry = (Entry)var11.next(); host = (Instance)entry.getValue(); key = (String)entry.getKey(); if (!newHostMap.containsKey(key) && !newHostMap.containsKey(key)) { remvHosts.add(host); } }if (newHosts.size() > 0) { changed = true; LogUtils.NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(newHosts)); }if (remvHosts.size() > 0) { changed = true; LogUtils.NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(remvHosts)); }if (modHosts.size() > 0) { changed = true; this.updateBeatInfo(modHosts); LogUtils.NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(modHosts)); }serviceInfo.setJsonFromServer(json); if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) { this.eventDispatcher.serviceChanged(serviceInfo); DiskCache.write(serviceInfo, this.cacheDir); } break; } } else { changed = true; LogUtils.NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(serviceInfo.getHosts())); this.serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); this.eventDispatcher.serviceChanged(serviceInfo); serviceInfo.setJsonFromServer(json); DiskCache.write(serviceInfo, this.cacheDir); }MetricsMonitor.getServiceInfoMapSizeMonitor().set((double)this.serviceInfoMap.size()); if (changed) { LogUtils.NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(serviceInfo.getHosts())); }return serviceInfo; } else { return oldService; } }public ServiceInfo getServiceInfo(String serviceName, String clusters) {//定是从服务端拉取最新服务信息 LogUtils.NAMING_LOGGER.debug("failover-mode: " + this.failoverReactor.isFailoverSwitch()); String key = ServiceInfo.getKey(serviceName, clusters); if (this.failoverReactor.isFailoverSwitch()) { return this.failoverReactor.getService(key); } else { ServiceInfo serviceObj = this.getServiceInfo0(serviceName, clusters); if (null == serviceObj) { serviceObj = new ServiceInfo(serviceName, clusters); this.serviceInfoMap.put(serviceObj.getKey(), serviceObj); this.updatingMap.put(serviceName, new Object()); this.updateServiceNow(serviceName, clusters); this.updatingMap.remove(serviceName); } else if (this.updatingMap.containsKey(serviceName)) { synchronized(serviceObj) { try { serviceObj.wait(5000L); //如果有serviceName信息,等待5s更新服务 } catch (InterruptedException var8) { LogUtils.NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, var8); } } }this.scheduleUpdateIfAbsent(serviceName, clusters); //定时任务更新服务信息 return (ServiceInfo)this.serviceInfoMap.get(serviceObj.getKey()); } }




配置中心

spring|nacos 概述
文章图片

配置数据增删改:客户端通过open api向nacos server添加、更新、删除配置数据
配置数据持久化:默认使用derby文件型数据库在本地存储,可使用mysql存储配置数据
配置数据动态监听:客户端使用长轮询定时从nacos server拉取数据,如果有数据更新立即返回;如果没有数据更新,等待一段时间(默认30s),如果在此期间有数据更新立即返回,如果没有数据更新,等待到期后返回空数据
说明:长轮询返回的数据不是数据内容,而是变更数据的key(dataId、groupId、tenant),然后通过key去nacos server获取数据

读取nacos server配置数据调用栈
# NacosPropertySourceBuilder.loadNacosData:加载配置数据 at com.alibaba.cloud.nacos.client.NacosPropertySourceBuilder.loadNacosData(NacosPropertySourceBuilder.java:85) at com.alibaba.cloud.nacos.client.NacosPropertySourceBuilder.build(NacosPropertySourceBuilder.java:74)# NacosPropertySourceLocator类 at com.alibaba.cloud.nacos.client.NacosPropertySourceLocator.loadNacosPropertySource(NacosPropertySourceLocator.java:204) at com.alibaba.cloud.nacos.client.NacosPropertySourceLocator.loadNacosDataIfPresent(NacosPropertySourceLocator.java:191) at com.alibaba.cloud.nacos.client.NacosPropertySourceLocator.loadApplicationConfiguration(NacosPropertySourceLocator.java:142) at com.alibaba.cloud.nacos.client.NacosPropertySourceLocator.locate(NacosPropertySourceLocator.java:103)# PropertySourceLocator类 at org.springframework.cloud.bootstrap.config.PropertySourceLocator.locateCollection(PropertySourceLocator.java:52) at org.springframework.cloud.bootstrap.config.PropertySourceLocator.locateCollection(PropertySourceLocator.java:47)# PropertySourceBootstrapConfiguration类 at org.springframework.cloud.bootstrap.config.PropertySourceBootstrapConfiguration.initialize(PropertySourceBootstrapConfiguration.java:98)# SpringApplication类 at org.springframework.boot.SpringApplication.applyInitializers(SpringApplication.java:626) at org.springframework.boot.SpringApplication.prepareContext(SpringApplication.java:370) at org.springframework.boot.SpringApplication.run(SpringApplication.java:314) at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)# 项目启动入口 at com.example.demo.DemoApplication.main(DemoApplication.java:10)


NacosPropertySourceBuilder
public class NacosPropertySourceBuilder { private static final Logger log = LoggerFactory.getLogger(NacosPropertySourceBuilder.class); private static final Map EMPTY_MAP = new LinkedHashMap(); private ConfigService configService; private long timeout; public NacosPropertySourceBuilder(ConfigService configService, long timeout) { this.configService = configService; this.timeout = timeout; }public void setTimeout(long timeout) { public void setConfigService(ConfigService configService) {public long getTimeout() { public ConfigService getConfigService() {NacosPropertySource build(String dataId, String group, String fileExtension, boolean isRefreshable) { Map p = this.loadNacosData(dataId, group, fileExtension); NacosPropertySource nacosPropertySource = new NacosPropertySource(group, dataId, p, new Date(), isRefreshable); NacosPropertySourceRepository.collectNacosPropertySource(nacosPropertySource); return nacosPropertySource; }private Map loadNacosData(String dataId, String group, String fileExtension) { //加载nacos配置数据 String data = https://www.it610.com/article/null; try { data = this.configService.getConfig(dataId, group, this.timeout); //使用configService读取nacos server配置数据 if (StringUtils.isEmpty(data)) { log.warn("Ignore the empty nacos configuration and get it based on dataId[{}] & group[{}]", dataId, group); return EMPTY_MAP; }if (log.isDebugEnabled()) { log.debug(String.format("Loading nacos data, dataId: '%s', group: '%s', data: %s", dataId, group, data)); }Map dataMap = NacosDataParserHandler.getInstance().parseNacosData(data, fileExtension); return dataMap == null ? EMPTY_MAP : dataMap; } catch (NacosException var6) { log.error("get data from Nacos error,dataId:{}, ", dataId, var6); } catch (Exception var7) { log.error("parse data from Nacos error,dataId:{},data:{},", new Object[]{dataId, data, var7}); }return EMPTY_MAP; } }


NacosConfigService
public class NacosConfigService implements ConfigService { private static final Logger LOGGER = LogUtils.logger(NacosConfigService.class); private static final long POST_TIMEOUT = 3000L; private final HttpAgent agent; private final ClientWorker worker; private String namespace; private final String encode; private final ConfigFilterChainManager configFilterChainManager = new ConfigFilterChainManager(); public NacosConfigService(Properties properties) throws NacosException { ValidatorUtils.checkInitParam(properties); String encodeTmp = properties.getProperty("encode"); if (StringUtils.isBlank(encodeTmp)) { this.encode = "UTF-8"; } else { this.encode = encodeTmp.trim(); }this.initNamespace(properties); this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties)); this.agent.start(); this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties); //执行长轮询任务 }


ClientWorker
public class ClientWorker implements Closeable { private static final Logger LOGGER = LogUtils.logger(ClientWorker.class); final ScheduledExecutorService executor; //每隔10ms检查一次配置信息 final ScheduledExecutorService executorService; //执行长轮询任务 private final AtomicReference cacheMap = new AtomicReference(new HashMap()); private final HttpAgent agent; private final ConfigFilterChainManager configFilterChainManager; private boolean isHealthServer = true; private long timeout; private double currentLongingTaskCount = 0.0D; private int taskPenaltyTime; private boolean enableRemoteSyncConfig = false; public ClientWorker(final HttpAgent agent, ConfigFilterChainManager configFilterChainManager, Properties properties) { this.agent = agent; this.configFilterChainManager = configFilterChainManager; this.init(properties); this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker." + agent.getName()); t.setDaemon(true); return t; } }); this.executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); t.setDaemon(true); return t; } }); this.executor.scheduleWithFixedDelay(new Runnable() { public void run() { try { ClientWorker.this.checkConfigInfo(); //每隔10ms检查一次配置信息 } catch (Throwable var2) { ClientWorker.LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", var2); }} }, 1L, 10L, TimeUnit.MILLISECONDS); }public void checkConfigInfo() { int listenerSize = ((Map)this.cacheMap.get()).size(); int longingTaskCount = (int)Math.ceil((double)listenerSize / ParamUtil.getPerTaskConfigSize()); if ((double)longingTaskCount > this.currentLongingTaskCount) { for(int i = (int)this.currentLongingTaskCount; i < longingTaskCount; ++i) { this.executorService.execute(new ClientWorker.LongPollingRunnable(i)); }//executorService执行长轮询任务this.currentLongingTaskCount = (double)longingTaskCount; }}************ LongPollingRunnable:内部类,长轮询任务class LongPollingRunnable implements Runnable { private final int taskId; public LongPollingRunnable(int taskId) { this.taskId = taskId; }public void run() { List cacheDatas = new ArrayList(); ArrayList inInitializingCacheList = new ArrayList(); try { Iterator var3 = ((Map)ClientWorker.this.cacheMap.get()).values().iterator(); //本地缓存配置while(var3.hasNext()) { CacheData cacheData = https://www.it610.com/article/(CacheData)var3.next(); if (cacheData.getTaskId() == this.taskId) { cacheDatas.add(cacheData); try { ClientWorker.this.checkLocalConfig(cacheData); if (cacheData.isUseLocalConfigInfo()) { cacheData.checkListenerMd5(); } } catch (Exception var13) { ClientWorker.LOGGER.error("get local config info error", var13); } } }List changedGroupKeys = ClientWorker.this.checkUpdateDataIds(cacheDatas, inInitializingCacheList); //向nacos发起请求,获取变更的配置(groupKey) if (!CollectionUtils.isEmpty(changedGroupKeys)) { ClientWorker.LOGGER.info("get changedGroupKeys:" + changedGroupKeys); }Iterator var16 = changedGroupKeys.iterator(); while(var16.hasNext()) { String groupKey = (String)var16.next(); String[] key = GroupKey.parseKey(groupKey); String dataId = key[0]; String group = key[1]; String tenant = null; if (key.length == 3) { tenant = key[2]; }try { String[] ct = ClientWorker.this.getServerConfig(dataId, group, tenant, 3000L); //根据dataId、group、tenant,从nacos server读取变更的配置数据 CacheData cache = (CacheData)((Map)ClientWorker.this.cacheMap.get()).get(GroupKey.getKeyTenant(dataId, group, tenant)); cache.setContent(ct[0]); //将变更的数据在本地保存 if (null != ct[1]) { cache.setType(ct[1]); }ClientWorker.LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}", new Object[]{ClientWorker.this.agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(ct[0]), ct[1]}); } catch (NacosException var12) { String message = String.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", ClientWorker.this.agent.getName(), dataId, group, tenant); ClientWorker.LOGGER.error(message, var12); } }var16 = cacheDatas.iterator(); while(true) { CacheData cacheDatax; do { if (!var16.hasNext()) { inInitializingCacheList.clear(); ClientWorker.this.executorService.execute(this); return; }cacheDatax = (CacheData)var16.next(); } while(cacheDatax.isInitializing() && !inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheDatax.dataId, cacheDatax.group, cacheDatax.tenant))); cacheDatax.checkListenerMd5(); cacheDatax.setInitializing(false); } } catch (Throwable var14) { ClientWorker.LOGGER.error("longPolling error : ", var14); ClientWorker.this.executorService.schedule(this, (long)ClientWorker.this.taskPenaltyTime, TimeUnit.MILLISECONDS); } } }



    推荐阅读