nacos 动态配置源码解析

一、动态刷新实现方式 一般分为两种:

  • 客户端主动拉取配置
    • 客户端每隔一段时间从服务端获取一次配置,例如:每隔 3s 请求一次
    • 实现比较简单,弊端也比较明显,实时性较差,且不停的循环会增加服务端压力
  • 服务端主动推送配置
    • 服务端和所有的客户端建立连接,如果配置有变动,推送变动消息到所有客户端
    • 服务端需要维护所有的客户端连接,并建立心跳机制,这种方式无疑会增加服务端压力
nacos 采用了客户端拉取的方式,但采用了长轮询获取配置方式
二、流程解析 大致的业务流程如下:
  • 客户端发起长轮询
  • 服务端接收和响应长轮询请求
  • 用户主动发起配置变更
三、客户端发起长轮询 要想使用 nacos 的配置功能,需要引入相应的依赖包: spring-cloud-starter-alibaba-nacos-config

根据 springboot 自动装配的特性,我们找到对应的 spring.factories 文件,
点击进入到 NacosConfigBootstrapConfiguration 文件中,注意查看 nacosConfigManager() 方法
nacos 动态配置源码解析

该方法中创建了一个 NacosConfigManager 对象,NacosConfigManager 对象的构造方法中调用了 createConfigService(nacosConfigProperties) 方法,用于创建 ConfigService 对象。
nacos 动态配置源码解析

ConfigService 是一个接口,只有一个实现类 NacosConfigService,所以我们直接找到 NacosConfigService 的构造方法:
public NacosConfigService(Properties properties) throws NacosException { ValidatorUtils.checkInitParam(properties); String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE); if (StringUtils.isBlank(encodeTmp)) { this.encode = Constants.ENCODE; } else { this.encode = encodeTmp.trim(); } // 初始化 namespace initNamespace(properties); this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties)); this.agent.start(); // 重要方法,创建 ClientWorker 对象 this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties); }

接着查看 ClientWorker 的构造方法
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) { this.agent = agent; this.configFilterChainManager = configFilterChainManager; // 里面初始化了长轮询的超时时间,默认为 30s init(properties); this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("" + agent.getName()); t.setDaemon(true); return t; } }); this.executorService = Executors .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("" + agent.getName()); t.setDaemon(true); return t; } }); // 重要方法,初始化一个线程池,延迟 1 毫秒启动,之后每隔 10 毫秒执行一次,调用 checkConfigInfo() 方法 this.executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { checkConfigInfo(); } catch (Throwable e) { LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); } } }, 1L, 10L, TimeUnit.MILLISECONDS); }

接下来看下 checkConfigInfo() 方法中做了什么:
public void checkConfigInfo() { // Dispatch taskes. int listenerSize = cacheMap.size(); // Round up the longingTaskCount. int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); if (longingTaskCount > currentLongingTaskCount) { for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { // The task list is no order.So it maybe has issues when changing. // 创建了长轮询对象 LongPollingRunnable ,交由线程池执行 executorService.execute(new LongPollingRunnable(i)); } currentLongingTaskCount = longingTaskCount; } }

这里有一个非常重要的对象 cacheMap,来看下它长啥样:
/** * groupKey -> cacheData. */ private final ConcurrentHashMap cacheMap = new ConcurrentHashMap();

cacheMap 的主要作用是用来存储监听变更的缓存集合,为了保障线程安全使用了 ConcurrentHashMap 的结构。key 被称为 groupKey ,是由 dataId,group,tenant(租户)拼接而成的字符串;
value 为 CacheData 对象,每个 dataId 都会持有一个 CacheData 对象。
接回上面的 LongPollingRunnable 对象,它是 ClientWorker 的一个内部类,实现了 Runnable 接口,对于这种情况我们直接查看其 run() 方法:
@Override public void run() {List cacheDatas = new ArrayList(); List inInitializingCacheList = new ArrayList(); try { // ====================第一部分==================== // check failover config // 遍历 cacheMap for (CacheData cacheData : cacheMap.values()) { if (cacheData.getTaskId() == taskId) { cacheDatas.add(cacheData); try { // 校验本地文件 checkLocalConfig(cacheData); if (cacheData.isUseLocalConfigInfo()) { // 如果 isUseLocalConfigInfo 返回为 true, 表示缓存和本地配置不一致 cacheData.checkListenerMd5(); } } catch (Exception e) { LOGGER.error("get local config info error", e); } } }// ====================第二部分==================== // check server config List changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); if (!CollectionUtils.isEmpty(changedGroupKeys)) {"get changedGroupKeys:" + changedGroupKeys); }for (String groupKey : changedGroupKeys) { String[] key = GroupKey.parseKey(groupKey); String dataId = key[0]; String group = key[1]; String tenant = null; if (key.length == 3) { tenant = key[2]; } try { String[] ct = getServerConfig(dataId, group, tenant, 3000L); CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant)); cache.setContent(ct[0]); if (null != ct[1]) { cache.setType(ct[1]); }"[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}", agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(ct[0]), ct[1]); } catch (NacosException ioe) { String message = String .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ioe); } } for (CacheData cacheData : cacheDatas) { if (!cacheData.isInitializing() || inInitializingCacheList .contains(GroupKey.getKeyTenant(cacheData.dataId,, cacheData.tenant))) { cacheData.checkListenerMd5(); cacheData.setInitializing(false); } } inInitializingCacheList.clear(); executorService.execute(this); } catch (Throwable e) {// If the rotation training task is abnormal, the next execution time of the task will be punished LOGGER.error("longPolling error : ", e); executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); } }

3.1 第一部分 - 校验本地文件
主要有两个重要方法:checkLocalConfig(cacheData) 和 checkListenerMd5() 方法
checkLocalConfig(cacheData) 这个方法的作用是校验本地文件,分为 3 种情况:
private void checkLocalConfig(CacheData cacheData) { final String dataId = cacheData.dataId; final String group =; final String tenant = cacheData.tenant; // 获取本地文件路径 File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant); // ①如果不使用本地配置,并且本地文件路径存在 if (!cacheData.isUseLocalConfigInfo() && path.exists()) { // 从本地缓存文件中获取配置信息 String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE); // 设置 useLocalConfigInfo 为 true cacheData.setUseLocalConfigInfo(true); // 设置本地配置的版本信息 cacheData.setLocalConfigInfoVersion(path.lastModified()); cacheData.setContent(content); LOGGER.warn( "[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content)); return; }// If use local config info, then it doesn't notify business listener and notify after getting from server. // ②如果本地使用本地配置,但是本地文件路径不存在 if (cacheData.isUseLocalConfigInfo() && !path.exists()) { // 设置 useLocalConfigInfo 为 false 后直接返回 cacheData.setUseLocalConfigInfo(false); LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(), dataId, group, tenant); return; }// When it changed. // ③如果使用本地配置,本地缓存文件路径存在, 并且缓存的时间跟文件的更新时间不一致,说明有改变 if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path .lastModified()) { String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE); cacheData.setUseLocalConfigInfo(true); cacheData.setLocalConfigInfoVersion(path.lastModified()); cacheData.setContent(content); LOGGER.warn( "[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content)); } }

void checkListenerMd5() { for (ManagerListenerWrap wrap : listeners) { if (!md5.equals(wrap.lastCallMd5)) { safeNotifyListener(dataId, group, content, type, md5, wrap); } } }

如果 md5 值不一样,则发送数据变更通知,调用 safeNotifyListener 方法,方法的内容如下:
private void safeNotifyListener(final String dataId, final String group, final String content, final String type, final String md5, final ManagerListenerWrap listenerWrap) { final Listener listener = listenerWrap.listener; // 创建一个 job 对象,用于异步执行 Runnable job = new Runnable() { @Override public void run() { ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader(); ClassLoader appClassLoader = listener.getClass().getClassLoader(); try { // .... 省略其他代码// compare lastContent and content // 如果是 AbstractConfigChangeListener ,创建 ConfigChangeEvent 对象 if (listener instanceof AbstractConfigChangeListener) { Map data = .parseChangeData(listenerWrap.lastContent, content, type); ConfigChangeEvent event = new ConfigChangeEvent(data); ((AbstractConfigChangeListener) listener).receiveConfigChange(event); listenerWrap.lastContent = content; }listenerWrap.lastCallMd5 = md5;"[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5, listener); } catch (NacosException ex) { LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg()); } catch (Throwable t) { LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, group, md5, listener, t.getCause()); } finally { Thread.currentThread().setContextClassLoader(myClassLoader); } } }; final long startNotify = System.currentTimeMillis(); try { if (null != listener.getExecutor()) { // 执行 listener.getExecutor().execute(job); } else {; } } catch (Throwable t) { LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId, group, md5, listener, t.getCause()); } final long finishNotify = System.currentTimeMillis();"[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ", name, (finishNotify - startNotify), dataId, group, md5, listener); }

这个方法中,对 dataId 注册过监听的客户端推送变更后的数据内容。客户端接收通知后通过 receiveConfigInfo() 方法接收回调数据,处理自身业务。
3.2 第二部分 - 检查服务端文件
这里面有两个重要方法:checkUpdateDataIds() 和 getServerConfig() 方法:
checkUpdateDataIds() 该方法中调用了 checkUpdateConfigStr(sb.toString(), isInitializingCacheList) 方法,从服务器中获取 dataId 列表,请求头中加入了长轮询的标识:
List checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {Map params = new HashMap(2); params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString); Map headers = new HashMap(2); // 这里在请求头中塞了一个 "Long-Pulling-Timeout" 标识,这个是服务端长轮询的判断条件,非常重要 headers.put("Long-Pulling-Timeout", "" + timeout); // told server do not hang me up if new initializing cacheData added in if (isInitializingCacheList) { headers.put("Long-Pulling-Timeout-No-Hangup", "true"); }if (StringUtils.isBlank(probeUpdateString)) { return Collections.emptyList(); }try { // In order to prevent the server from handling the delay of the client's long task, // increase the client's read timeout to avoid this problem.long readTimeoutMs = timeout + (long) Math.round(timeout >> 1); // 调用服务端接口:/v1/cs/configs/listener HttpRestResult result = agent .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), readTimeoutMs); if (result.ok()) { setHealthServer(true); return parseUpdateDataIdResponse(result.getData()); } else { setHealthServer(false); LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.getCode()); } } catch (Exception e) { setHealthServer(false); LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e); throw e; } return Collections.emptyList(); }

getServerConfig() 方法 checkUpdateDataIds() 方法执行完成后,得到了有更新的 changedGroupKeys,循环 changedGroupKeys 列表,调用 getServerConfig() 方法,获取服务端的配置:
public String[] getServerConfig(String dataId, String group, String tenant, long readTimeout) throws NacosException { String[] ct = new String[2]; if (StringUtils.isBlank(group)) { group = Constants.DEFAULT_GROUP; }HttpRestResult result = null; try { Map params = new HashMap(3); if (StringUtils.isBlank(tenant)) { params.put("dataId", dataId); params.put("group", group); } else { params.put("dataId", dataId); params.put("group", group); params.put("tenant", tenant); } // 调用 /v1/cs/configs,获取配置信息 result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout); } catch (Exception ex) { String message = String .format("[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ex); throw new NacosException(NacosException.SERVER_ERROR, ex); } // .....省略其它代码 }

四、服务端接收和响应长轮询请求 在服务端代码 中,找到客户端请求的 /v1/cs/configs/listener 接口,该接口中调用了 doPollingConfig() 方法:
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response, Map clientMd5Map, int probeRequestSize) throws IOException {// Long polling. // 这里判断是否为长轮询的方法,其实就是从请求头中获取 "Long-Pulling-Timeout" 标识 if (LongPollingService.isSupportLongPolling(request)) { // 如果是长轮询的请求,调用 addLongPollingClient() 方法 longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize); return HttpServletResponse.SC_OK + ""; }// .... 省略其它代码 }

记住这个 LongPollingService 对象,非常重要,后面还会用到
addLongPollingClient() 这个方法比较重要,详细代码如下:
public voidaddLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map clientMd5Map, int probeRequestSize) {// str 就是客户端提交请求的超时时间,默认为 30s String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER); String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER); String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER); String tag = req.getHeader("Vipserver-Tag"); int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500); // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout. // 实际的超时时间是 29.5s,被减去了 500ms,主要是考虑到网络请求等耗时 long timeout = Math.max(10000, Long.parseLong(str) - delayTime); if (isFixedPolling()) { timeout = Math.max(10000, getFixedPollingInterval()); // Do nothing but set fix polling timeout. } else { long start = System.currentTimeMillis(); // 比较客户端文件的 md5 和服务端文件的 md5,如果不一样,直接返回客户端配置项有变更 List changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map); if (changedGroups.size() > 0) { generateResponse(req, rsp, changedGroups);"{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {"{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } } String ip = RequestUtil.getRemoteIp(req); // Must be called by http thread, or send response. final AsyncContext asyncContext = req.startAsync(); // AsyncContext.setTimeout() is incorrect, Control by oneself asyncContext.setTimeout(0L); // 如果配置项没有变更,将客户端请求挂起,创建一个 ClientLongPolling 对象,交给定时线程池处理 ConfigExecutor.executeLongPolling( new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag)); }

该方法中,对 md5 进行比较,如果不相同,说明文件内容已经变更,调用 generateResponse() 直接响应客户端。如果配置项没有变更,创建一个 ClientLongPolling 对象,交给定时线程池处理。
ClientLongPolling 对象实现了 Runnable 接口,我们直接看它的 run 方法,内容如下:
@Override public void run() { // ①创建了一个 Runnable 对象,并放入线程池中,每隔 29.5s 执行一次 asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() { @Override public void run() { try { getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis()); // Delete subscriber's relations. boolean removeFlag = allSubs.remove(ClientLongPolling.this); if (removeFlag) { if (isFixedPolling()) { LogUtil.CLIENT_LOG .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix", RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()), "polling", clientMd5Map.size(), probeRequestSize); // 校验 md5 ,判断配置文件内容是否变更 List changedGroups = MD5Util .compareMd5((HttpServletRequest) asyncContext.getRequest(), (HttpServletResponse) asyncContext.getResponse(), clientMd5Map); if (changedGroups.size() > 0) { // 有变动 sendResponse(changedGroups); } else { // 没有变动 sendResponse(null); } } else { LogUtil.CLIENT_LOG .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout", RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()), "polling", clientMd5Map.size(), probeRequestSize); sendResponse(null); } } else { LogUtil.DEFAULT_LOG.warn("client subsciber's relations delete fail."); } } catch (Throwable t) { LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause()); }}}, timeoutTime, TimeUnit.MILLISECONDS); // ②将当前的长轮询对象放到 allSubs 中 allSubs.add(this); }

run 方法中有两个逻辑,先看第二个,将当前的长轮询对象放入 allSubs 中,allSubs 是一个队列,定义如下:
/** * ClientLongPolling subscibers. */ final Queue allSubs;

再看第一个方法,创建了一个 Runnable 对象,并放入线程池中,每隔 29.5s 执行一次。如果这个期间配置没有发生变更,正常返回客户端;如果配置有变更(md5 比较不相同),则调用 sendResponse(changedGroups) 方法
五、用户主动发起配置变更 用户修改了数据,会调用 /nacos/v1/cs/configs 接口,对应的代码入口为:,代码如下:
@PostMapping @Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class) public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response, // .... 省略其它代码 if (StringUtils.isBlank(betaIps)) { if (StringUtils.isBlank(tag)) { persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false); //调用通知方法 notifyConfigChange ConfigChangePublisher .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime())); } else { persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false); //调用通知方法 notifyConfigChange ConfigChangePublisher.notifyConfigChange( new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime())); } } else { // beta publish persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false); //调用通知方法 notifyConfigChange ConfigChangePublisher .notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime())); } ConfigTraceService .logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIP(), ConfigTraceService.PERSISTENCE_EVENT_PUB, content); return true; }

可以看到,notifyConfigChange() 会在三个地方被调用,创建了 ConfigDataChangeEvent 事件,该事件只有一个接收者
@Autowired public AsyncNotifyService(ServerMemberManager memberManager) { // .... 省略其它代码@Override public void onEvent(Event event) { // ... 省略其它代码 if (!httpQueue.isEmpty()) { // 创建了一个 AsyncTask 对象 ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue)); } if (!rpcQueue.isEmpty()) { ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue)); }} }// ... 省略其它代码 }); }

onEvent() 方法中,创建了一个 AsyncTask 对象,用于通知配置文件有变更的操作,AsyncTask 对象的 run 方法内容为:
@Override public void run() { executeAsyncInvoke(); }private void executeAsyncInvoke() { while (!queue.isEmpty()) { // ... 省略其它代码// 这里会调用 /v1/cs/communication/dataChange 接口,通知配置有变动 restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task)); } } } }

根据调用的 /v1/cs/communication/dataChange 接口,找到对应的代码
@GetMapping("/dataChange") public Boolean notifyConfigInfo( // .... 省略部分代码){// ... 省略其它代码if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) { // 调用 dump() 方法 dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true); } else { dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp); } return true; }

dump 方法的内容如下:
public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) { String groupKey = GroupKey2.getKey(dataId, group, tenant); String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta)); // 主要方法在这里,创建了一个 task 任务 dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, lastModified, handleIp, isBeta));"[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey); }

继续看 addTask() 方法,可以追溯到 的方法。而在 NacosDelayTaskExecuteEngine 类的构造方法中,可以看到如下代码:
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) { super(logger); tasks = new ConcurrentHashMap<>(initCapacity); processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name)); // 创建了一个 ProcessRunnable 对象 processingExecutor .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS); }

查看 ProcessRunnable 对象的 run 方法,会调用 processTasks() 方法。processTasks() 方法中又会调用 getProcessor 获取对应的任务处理器
protected void processTasks() { Collection keys = getAllTaskKeys(); for (Object taskKey : keys) { AbstractDelayTask task = removeTask(taskKey); if (null == task) { continue; } // 获取任务处理器 NacosTaskProcessor processor = getProcessor(taskKey); if (null == processor) { getEngineLog().error("processor not found for task, so discarded. " + task); continue; } try { // ReAdd task if process failed // 调用 process 方法,重要方法 if (!processor.process(task)) { retryFailedTask(taskKey, task); } } catch (Throwable e) { getEngineLog().error("Nacos task execute error : " + e.toString(), e); retryFailedTask(taskKey, task); } } }
这里获取到的是 DumpProcessor 对象,查看 DumpProcessor 的 process 方法,
@Override public boolean process(NacosTask task) { final PersistService persistService = dumpService.getPersistService(); // .... 省略部分代码// 如果是 beta 版本,这里不考虑 if (isBeta) { // .... 省略部分代码 } // 设置一些值 if (StringUtils.isBlank(tag)) { ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant); build.remove(Objects.isNull(cf)); build.content(Objects.isNull(cf) ? null : cf.getContent()); build.type(Objects.isNull(cf) ? null : cf.getType()); } else { // 如果 tag 不为空,这里不考虑 ...省略部分代码 }// 关键代码 return DumpConfigHandler.configDump(; }

configDump() 方法中的代码是:
public static boolean configDump(ConfigDumpEvent event) { // .... 省略部分代码if (event.isBeta()) { // beta 版本的不考虑,.... 省略部分代码 } if (StringUtils.isBlank(event.getTag())) { // .... 省略部分代码if (!event.isRemove()) { // 重要代码,调用 dump() 方法 result = ConfigCacheService.dump(dataId, group, namespaceId, content, lastModified, type); // 记录日志 if (result) { ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(), ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified, content.length()); } } else { // .... 省略部分代码 } return result; } else { // 存在 tag 的暂时不考虑,.... 省略部分代码 return result; }}

  • dump() 方法保存配置文件并更新 md5
  • ConfigTraceService.logDumpEvent() 方法记录日志,这个方法就不展开说了
dump() 方法中,最重要的方法是:updateMd5(groupKey, md5, lastModifiedTs) ,这个方法的内容如下:
public static void updateMd5(String groupKey, String md5, long lastModifiedTs) { CacheItem cache = makeSure(groupKey); if (cache.md5 == null || !cache.md5.equals(md5)) { cache.md5 = md5; cache.lastModifiedTs = lastModifiedTs; // 创建了一个 LocalDataChangeEvent 事件 NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey)); } }

这个方法中创建了一个 LocalDataChangeEvent 事件对象并发送,那这个事件是在哪里处理的呢??还记得之前的长轮询对象 LongPollingService 对象么,来看下它的构造方法:
public LongPollingService() { allSubs = new ConcurrentLinkedQueue(); ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS); // Register LocalDataChangeEvent to NotifyCenter. // 注册了 LocalDataChangeEvent 的监听 NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize); // Register A Subscriber to subscribe LocalDataChangeEvent. NotifyCenter.registerSubscriber(new Subscriber() {@Override public void onEvent(Event event) { if (isFixedPolling()) { // Ignore. } else { // 判断是否为 LocalDataChangeEvent 对象 if (event instanceof LocalDataChangeEvent) { LocalDataChangeEvent evt = (LocalDataChangeEvent) event; ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps)); } } }@Override public Class subscribeType() { return LocalDataChangeEvent.class; } }); }

可以看到 LongPollingService 接受到 LocalDataChangeEvent 对象之后,创建了一个 DataChangeTask 对象,看下它的 run() 方法:
@Override public void run() { try { ConfigCacheService.getContentBetaMd5(groupKey); // 循环 allSubs 队列,取出所有的长连接 for (Iterator iter = allSubs.iterator(); iter.hasNext(); ) { ClientLongPolling clientSub =; if (clientSub.clientMd5Map.containsKey(groupKey)) { // If published tag is not in the beta list, then it skipped. if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) { continue; }// If published tag is not in the tag list, then it skipped. if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) { continue; }getRetainIps().put(clientSub.ip, System.currentTimeMillis()); iter.remove(); // Delete subscribers' relationships. LogUtil.CLIENT_LOG .info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance", RequestUtil .getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()), "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey); // 响应客户端请求 clientSub.sendResponse(Arrays.asList(groupKey)); } }} catch (Throwable t) { LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t)); } }

这一步就是拿出队列中所有的长轮询对象并响应,客户端在接收到响应后会请求 /v1/cs/configs 接口获取最新的配置。至此,所有的流程就全部串起来了。。
六、总结 【nacos 动态配置源码解析】这边文章只是对 nacos 动态配置的主要流程做了梳理。可以看到其中大量使用了异步线程处理来提高效率,有些逻辑看起来比较绕。写的不怎么样,如果有错误或者遗漏的地方,还请批评指正,感谢!
