nacos 动态配置源码解析

一、动态刷新实现方式 一般分为两种:

  • 客户端主动拉取配置
    • 客户端每隔一段时间从服务端获取一次配置,例如:每隔 3s 请求一次
    • 实现比较简单,弊端也比较明显,实时性较差,且不停的循环会增加服务端压力
  • 服务端主动推送配置
    • 服务端和所有的客户端建立连接,如果配置有变动,推送变动消息到所有客户端
    • 服务端需要维护所有的客户端连接,并建立心跳机制,这种方式无疑会增加服务端压力
nacos 采用了客户端拉取的方式,但采用了长轮询获取配置方式
二、流程解析 大致的业务流程如下:
nacos 动态配置源码解析
文章图片

下面将根据三块内容对源码进行分析:
  • 客户端发起长轮询
  • 服务端接收和响应长轮询请求
  • 用户主动发起配置变更
三、客户端发起长轮询 要想使用 nacos 的配置功能,需要引入相应的依赖包:
com.alibaba.cloud spring-cloud-starter-alibaba-nacos-config

根据 springboot 自动装配的特性,我们找到对应的 spring.factories 文件,
nacos 动态配置源码解析
文章图片

点击进入到 NacosConfigBootstrapConfiguration 文件中,注意查看 nacosConfigManager() 方法
nacos 动态配置源码解析
文章图片

该方法中创建了一个 NacosConfigManager 对象,NacosConfigManager 对象的构造方法中调用了 createConfigService(nacosConfigProperties) 方法,用于创建 ConfigService 对象。
nacos 动态配置源码解析
文章图片

ConfigService 是一个接口,只有一个实现类 NacosConfigService,所以我们直接找到 NacosConfigService 的构造方法:
public NacosConfigService(Properties properties) throws NacosException { ValidatorUtils.checkInitParam(properties); String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE); if (StringUtils.isBlank(encodeTmp)) { this.encode = Constants.ENCODE; } else { this.encode = encodeTmp.trim(); } // 初始化 namespace initNamespace(properties); this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties)); this.agent.start(); // 重要方法,创建 ClientWorker 对象 this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties); }

接着查看 ClientWorker 的构造方法
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) { this.agent = agent; this.configFilterChainManager = configFilterChainManager; // 里面初始化了长轮询的超时时间,默认为 30s init(properties); this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker." + agent.getName()); t.setDaemon(true); return t; } }); this.executorService = Executors .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); t.setDaemon(true); return t; } }); // 重要方法,初始化一个线程池,延迟 1 毫秒启动,之后每隔 10 毫秒执行一次,调用 checkConfigInfo() 方法 this.executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { checkConfigInfo(); } catch (Throwable e) { LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); } } }, 1L, 10L, TimeUnit.MILLISECONDS); }

接下来看下 checkConfigInfo() 方法中做了什么:
public void checkConfigInfo() { // Dispatch taskes. int listenerSize = cacheMap.size(); // Round up the longingTaskCount. int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); if (longingTaskCount > currentLongingTaskCount) { for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { // The task list is no order.So it maybe has issues when changing. // 创建了长轮询对象 LongPollingRunnable ,交由线程池执行 executorService.execute(new LongPollingRunnable(i)); } currentLongingTaskCount = longingTaskCount; } }

这里有一个非常重要的对象 cacheMap,来看下它长啥样:
/** * groupKey -> cacheData. */ private final ConcurrentHashMap cacheMap = new ConcurrentHashMap();

cacheMap 的主要作用是用来存储监听变更的缓存集合,为了保障线程安全使用了 ConcurrentHashMap 的结构。key 被称为 groupKey ,是由 dataId,group,tenant(租户)拼接而成的字符串;
value 为 CacheData 对象,每个 dataId 都会持有一个 CacheData 对象。
接回上面的 LongPollingRunnable 对象,它是 ClientWorker 的一个内部类,实现了 Runnable 接口,对于这种情况我们直接查看其 run() 方法:
@Override public void run() {List cacheDatas = new ArrayList(); List inInitializingCacheList = new ArrayList(); try { // ====================第一部分==================== // check failover config // 遍历 cacheMap for (CacheData cacheData : cacheMap.values()) { if (cacheData.getTaskId() == taskId) { cacheDatas.add(cacheData); try { // 校验本地文件 checkLocalConfig(cacheData); if (cacheData.isUseLocalConfigInfo()) { // 如果 isUseLocalConfigInfo 返回为 true, 表示缓存和本地配置不一致 cacheData.checkListenerMd5(); } } catch (Exception e) { LOGGER.error("get local config info error", e); } } }// ====================第二部分==================== // check server config List changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); if (!CollectionUtils.isEmpty(changedGroupKeys)) { LOGGER.info("get changedGroupKeys:" + changedGroupKeys); }for (String groupKey : changedGroupKeys) { String[] key = GroupKey.parseKey(groupKey); String dataId = key[0]; String group = key[1]; String tenant = null; if (key.length == 3) { tenant = key[2]; } try { String[] ct = getServerConfig(dataId, group, tenant, 3000L); CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant)); cache.setContent(ct[0]); if (null != ct[1]) { cache.setType(ct[1]); } LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}", agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(ct[0]), ct[1]); } catch (NacosException ioe) { String message = String .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ioe); } } for (CacheData cacheData : cacheDatas) { if (!cacheData.isInitializing() || inInitializingCacheList .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { cacheData.checkListenerMd5(); cacheData.setInitializing(false); } } inInitializingCacheList.clear(); executorService.execute(this); } catch (Throwable e) {// If the rotation training task is abnormal, the next execution time of the task will be punished LOGGER.error("longPolling error : ", e); executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); } }

这一段代码较长,拆分为两段来看:
3.1 第一部分 - 校验本地文件
主要有两个重要方法:checkLocalConfig(cacheData) 和 checkListenerMd5() 方法
checkLocalConfig(cacheData) 这个方法的作用是校验本地文件,分为 3 种情况:
private void checkLocalConfig(CacheData cacheData) { final String dataId = cacheData.dataId; final String group = cacheData.group; final String tenant = cacheData.tenant; // 获取本地文件路径 File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant); // ①如果不使用本地配置,并且本地文件路径存在 if (!cacheData.isUseLocalConfigInfo() && path.exists()) { // 从本地缓存文件中获取配置信息 String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE); // 设置 useLocalConfigInfo 为 true cacheData.setUseLocalConfigInfo(true); // 设置本地配置的版本信息 cacheData.setLocalConfigInfoVersion(path.lastModified()); cacheData.setContent(content); LOGGER.warn( "[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content)); return; }// If use local config info, then it doesn't notify business listener and notify after getting from server. // ②如果本地使用本地配置,但是本地文件路径不存在 if (cacheData.isUseLocalConfigInfo() && !path.exists()) { // 设置 useLocalConfigInfo 为 false 后直接返回 cacheData.setUseLocalConfigInfo(false); LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(), dataId, group, tenant); return; }// When it changed. // ③如果使用本地配置,本地缓存文件路径存在, 并且缓存的时间跟文件的更新时间不一致,说明有改变 if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path .lastModified()) { String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE); cacheData.setUseLocalConfigInfo(true); cacheData.setLocalConfigInfoVersion(path.lastModified()); cacheData.setContent(content); LOGGER.warn( "[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content)); } }

checkListenerMd5()
void checkListenerMd5() { for (ManagerListenerWrap wrap : listeners) { if (!md5.equals(wrap.lastCallMd5)) { safeNotifyListener(dataId, group, content, type, md5, wrap); } } }

如果 md5 值不一样,则发送数据变更通知,调用 safeNotifyListener 方法,方法的内容如下:
private void safeNotifyListener(final String dataId, final String group, final String content, final String type, final String md5, final ManagerListenerWrap listenerWrap) { final Listener listener = listenerWrap.listener; // 创建一个 job 对象,用于异步执行 Runnable job = new Runnable() { @Override public void run() { ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader(); ClassLoader appClassLoader = listener.getClass().getClassLoader(); try { // .... 省略其他代码// compare lastContent and content // 如果是 AbstractConfigChangeListener ,创建 ConfigChangeEvent 对象 if (listener instanceof AbstractConfigChangeListener) { Map data = https://www.it610.com/article/ConfigChangeHandler.getInstance() .parseChangeData(listenerWrap.lastContent, content, type); ConfigChangeEvent event = new ConfigChangeEvent(data); ((AbstractConfigChangeListener) listener).receiveConfigChange(event); listenerWrap.lastContent = content; }listenerWrap.lastCallMd5 = md5; LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5, listener); } catch (NacosException ex) { LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg()); } catch (Throwable t) { LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, group, md5, listener, t.getCause()); } finally { Thread.currentThread().setContextClassLoader(myClassLoader); } } }; final long startNotify = System.currentTimeMillis(); try { if (null != listener.getExecutor()) { // 执行 listener.getExecutor().execute(job); } else { job.run(); } } catch (Throwable t) { LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId, group, md5, listener, t.getCause()); } final long finishNotify = System.currentTimeMillis(); LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ", name, (finishNotify - startNotify), dataId, group, md5, listener); }

这个方法中,对 dataId 注册过监听的客户端推送变更后的数据内容。客户端接收通知后通过 receiveConfigInfo() 方法接收回调数据,处理自身业务。
3.2 第二部分 - 检查服务端文件
这里面有两个重要方法:checkUpdateDataIds() 和 getServerConfig() 方法:
checkUpdateDataIds() 该方法中调用了 checkUpdateConfigStr(sb.toString(), isInitializingCacheList) 方法,从服务器中获取 dataId 列表,请求头中加入了长轮询的标识:
List checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {Map params = new HashMap(2); params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString); Map headers = new HashMap(2); // 这里在请求头中塞了一个 "Long-Pulling-Timeout" 标识,这个是服务端长轮询的判断条件,非常重要 headers.put("Long-Pulling-Timeout", "" + timeout); // told server do not hang me up if new initializing cacheData added in if (isInitializingCacheList) { headers.put("Long-Pulling-Timeout-No-Hangup", "true"); }if (StringUtils.isBlank(probeUpdateString)) { return Collections.emptyList(); }try { // In order to prevent the server from handling the delay of the client's long task, // increase the client's read timeout to avoid this problem.long readTimeoutMs = timeout + (long) Math.round(timeout >> 1); // 调用服务端接口:/v1/cs/configs/listener HttpRestResult result = agent .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), readTimeoutMs); if (result.ok()) { setHealthServer(true); return parseUpdateDataIdResponse(result.getData()); } else { setHealthServer(false); LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.getCode()); } } catch (Exception e) { setHealthServer(false); LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e); throw e; } return Collections.emptyList(); }

getServerConfig() 方法 checkUpdateDataIds() 方法执行完成后,得到了有更新的 changedGroupKeys,循环 changedGroupKeys 列表,调用 getServerConfig() 方法,获取服务端的配置:
public String[] getServerConfig(String dataId, String group, String tenant, long readTimeout) throws NacosException { String[] ct = new String[2]; if (StringUtils.isBlank(group)) { group = Constants.DEFAULT_GROUP; }HttpRestResult result = null; try { Map params = new HashMap(3); if (StringUtils.isBlank(tenant)) { params.put("dataId", dataId); params.put("group", group); } else { params.put("dataId", dataId); params.put("group", group); params.put("tenant", tenant); } // 调用 /v1/cs/configs,获取配置信息 result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout); } catch (Exception ex) { String message = String .format("[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ex); throw new NacosException(NacosException.SERVER_ERROR, ex); } // .....省略其它代码 }

至此,客户端发起长轮询的代码已经分析完成。
四、服务端接收和响应长轮询请求 在服务端代码 com.alibaba.nacos.config.server.controller.ConfigController#listener 中,找到客户端请求的 /v1/cs/configs/listener 接口,该接口中调用了 doPollingConfig() 方法:
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response, Map clientMd5Map, int probeRequestSize) throws IOException {// Long polling. // 这里判断是否为长轮询的方法,其实就是从请求头中获取 "Long-Pulling-Timeout" 标识 if (LongPollingService.isSupportLongPolling(request)) { // 如果是长轮询的请求,调用 addLongPollingClient() 方法 longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize); return HttpServletResponse.SC_OK + ""; }// .... 省略其它代码 }

记住这个 LongPollingService 对象,非常重要,后面还会用到
addLongPollingClient() 这个方法比较重要,详细代码如下:
public voidaddLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map clientMd5Map, int probeRequestSize) {// str 就是客户端提交请求的超时时间,默认为 30s String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER); String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER); String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER); String tag = req.getHeader("Vipserver-Tag"); int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500); // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout. // 实际的超时时间是 29.5s,被减去了 500ms,主要是考虑到网络请求等耗时 long timeout = Math.max(10000, Long.parseLong(str) - delayTime); if (isFixedPolling()) { timeout = Math.max(10000, getFixedPollingInterval()); // Do nothing but set fix polling timeout. } else { long start = System.currentTimeMillis(); // 比较客户端文件的 md5 和服务端文件的 md5,如果不一样,直接返回客户端配置项有变更 List changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map); if (changedGroups.size() > 0) { generateResponse(req, rsp, changedGroups); LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) { LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } } String ip = RequestUtil.getRemoteIp(req); // Must be called by http thread, or send response. final AsyncContext asyncContext = req.startAsync(); // AsyncContext.setTimeout() is incorrect, Control by oneself asyncContext.setTimeout(0L); // 如果配置项没有变更,将客户端请求挂起,创建一个 ClientLongPolling 对象,交给定时线程池处理 ConfigExecutor.executeLongPolling( new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag)); }

该方法中,对 md5 进行比较,如果不相同,说明文件内容已经变更,调用 generateResponse() 直接响应客户端。如果配置项没有变更,创建一个 ClientLongPolling 对象,交给定时线程池处理。
ClientLongPolling 对象实现了 Runnable 接口,我们直接看它的 run 方法,内容如下:
@Override public void run() { // ①创建了一个 Runnable 对象,并放入线程池中,每隔 29.5s 执行一次 asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() { @Override public void run() { try { getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis()); // Delete subscriber's relations. boolean removeFlag = allSubs.remove(ClientLongPolling.this); if (removeFlag) { if (isFixedPolling()) { LogUtil.CLIENT_LOG .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix", RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()), "polling", clientMd5Map.size(), probeRequestSize); // 校验 md5 ,判断配置文件内容是否变更 List changedGroups = MD5Util .compareMd5((HttpServletRequest) asyncContext.getRequest(), (HttpServletResponse) asyncContext.getResponse(), clientMd5Map); if (changedGroups.size() > 0) { // 有变动 sendResponse(changedGroups); } else { // 没有变动 sendResponse(null); } } else { LogUtil.CLIENT_LOG .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout", RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()), "polling", clientMd5Map.size(), probeRequestSize); sendResponse(null); } } else { LogUtil.DEFAULT_LOG.warn("client subsciber's relations delete fail."); } } catch (Throwable t) { LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause()); }}}, timeoutTime, TimeUnit.MILLISECONDS); // ②将当前的长轮询对象放到 allSubs 中 allSubs.add(this); }

run 方法中有两个逻辑,先看第二个,将当前的长轮询对象放入 allSubs 中,allSubs 是一个队列,定义如下:
/** * ClientLongPolling subscibers. */ final Queue allSubs;

再看第一个方法,创建了一个 Runnable 对象,并放入线程池中,每隔 29.5s 执行一次。如果这个期间配置没有发生变更,正常返回客户端;如果配置有变更(md5 比较不相同),则调用 sendResponse(changedGroups) 方法
响应客户端。
五、用户主动发起配置变更 用户修改了数据,会调用 /nacos/v1/cs/configs 接口,对应的代码入口为: com.alibaba.nacos.config.server.controller.ConfigController#publishConfig,代码如下:
@PostMapping @Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class) public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response, // .... 省略其它代码 if (StringUtils.isBlank(betaIps)) { if (StringUtils.isBlank(tag)) { persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false); //调用通知方法 notifyConfigChange ConfigChangePublisher .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime())); } else { persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false); //调用通知方法 notifyConfigChange ConfigChangePublisher.notifyConfigChange( new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime())); } } else { // beta publish persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false); //调用通知方法 notifyConfigChange ConfigChangePublisher .notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime())); } ConfigTraceService .logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIP(), ConfigTraceService.PERSISTENCE_EVENT_PUB, content); return true; }

可以看到,notifyConfigChange() 会在三个地方被调用,创建了 ConfigDataChangeEvent 事件,该事件只有一个接收者 com.alibaba.nacos.config.server.service.notify.AsyncNotifyService#AsyncNotifyService
@Autowired public AsyncNotifyService(ServerMemberManager memberManager) { // .... 省略其它代码@Override public void onEvent(Event event) { // ... 省略其它代码 if (!httpQueue.isEmpty()) { // 创建了一个 AsyncTask 对象 ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue)); } if (!rpcQueue.isEmpty()) { ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue)); }} }// ... 省略其它代码 }); }

onEvent() 方法中,创建了一个 AsyncTask 对象,用于通知配置文件有变更的操作,AsyncTask 对象的 run 方法内容为:
@Override public void run() { executeAsyncInvoke(); }private void executeAsyncInvoke() { while (!queue.isEmpty()) { // ... 省略其它代码// 这里会调用 /v1/cs/communication/dataChange 接口,通知配置有变动 restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task)); } } } }

根据调用的 /v1/cs/communication/dataChange 接口,找到对应的代码:com.alibaba.nacos.config.server.controller.CommunicationController#notifyConfigInfo
@GetMapping("/dataChange") public Boolean notifyConfigInfo( // .... 省略部分代码){// ... 省略其它代码if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) { // 调用 dump() 方法 dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true); } else { dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp); } return true; }

dump 方法的内容如下:
public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) { String groupKey = GroupKey2.getKey(dataId, group, tenant); String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta)); // 主要方法在这里,创建了一个 task 任务 dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, lastModified, handleIp, isBeta)); DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey); }

继续看 addTask() 方法,可以追溯到 com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngine#addTask 的方法。而在 NacosDelayTaskExecuteEngine 类的构造方法中,可以看到如下代码:
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) { super(logger); tasks = new ConcurrentHashMap<>(initCapacity); processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name)); // 创建了一个 ProcessRunnable 对象 processingExecutor .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS); }

查看 ProcessRunnable 对象的 run 方法,会调用 processTasks() 方法。processTasks() 方法中又会调用 getProcessor 获取对应的任务处理器
protected void processTasks() { Collection keys = getAllTaskKeys(); for (Object taskKey : keys) { AbstractDelayTask task = removeTask(taskKey); if (null == task) { continue; } // 获取任务处理器 NacosTaskProcessor processor = getProcessor(taskKey); if (null == processor) { getEngineLog().error("processor not found for task, so discarded. " + task); continue; } try { // ReAdd task if process failed // 调用 process 方法,重要方法 if (!processor.process(task)) { retryFailedTask(taskKey, task); } } catch (Throwable e) { getEngineLog().error("Nacos task execute error : " + e.toString(), e); retryFailedTask(taskKey, task); } } }
这里获取到的是 DumpProcessor 对象,查看 DumpProcessor 的 process 方法,com.alibaba.nacos.config.server.service.dump.processor.DumpProcessor#process
@Override public boolean process(NacosTask task) { final PersistService persistService = dumpService.getPersistService(); // .... 省略部分代码// 如果是 beta 版本,这里不考虑 if (isBeta) { // .... 省略部分代码 } // 设置一些值 if (StringUtils.isBlank(tag)) { ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant); build.remove(Objects.isNull(cf)); build.content(Objects.isNull(cf) ? null : cf.getContent()); build.type(Objects.isNull(cf) ? null : cf.getType()); } else { // 如果 tag 不为空,这里不考虑 ...省略部分代码 }// 关键代码 return DumpConfigHandler.configDump(build.build()); }

configDump() 方法中的代码是:
public static boolean configDump(ConfigDumpEvent event) { // .... 省略部分代码if (event.isBeta()) { // beta 版本的不考虑,.... 省略部分代码 } if (StringUtils.isBlank(event.getTag())) { // .... 省略部分代码if (!event.isRemove()) { // 重要代码,调用 dump() 方法 result = ConfigCacheService.dump(dataId, group, namespaceId, content, lastModified, type); // 记录日志 if (result) { ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(), ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified, content.length()); } } else { // .... 省略部分代码 } return result; } else { // 存在 tag 的暂时不考虑,.... 省略部分代码 return result; }}

好了,兜兜转转这么久,终于到了最重要的一段方法,这段代码主要做两件事情:
  • dump() 方法保存配置文件并更新 md5
  • ConfigTraceService.logDumpEvent() 方法记录日志,这个方法就不展开说了
dump() 方法中,最重要的方法是:updateMd5(groupKey, md5, lastModifiedTs) ,这个方法的内容如下:
public static void updateMd5(String groupKey, String md5, long lastModifiedTs) { CacheItem cache = makeSure(groupKey); if (cache.md5 == null || !cache.md5.equals(md5)) { cache.md5 = md5; cache.lastModifiedTs = lastModifiedTs; // 创建了一个 LocalDataChangeEvent 事件 NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey)); } }

这个方法中创建了一个 LocalDataChangeEvent 事件对象并发送,那这个事件是在哪里处理的呢??还记得之前的长轮询对象 LongPollingService 对象么,来看下它的构造方法:
public LongPollingService() { allSubs = new ConcurrentLinkedQueue(); ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS); // Register LocalDataChangeEvent to NotifyCenter. // 注册了 LocalDataChangeEvent 的监听 NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize); // Register A Subscriber to subscribe LocalDataChangeEvent. NotifyCenter.registerSubscriber(new Subscriber() {@Override public void onEvent(Event event) { if (isFixedPolling()) { // Ignore. } else { // 判断是否为 LocalDataChangeEvent 对象 if (event instanceof LocalDataChangeEvent) { LocalDataChangeEvent evt = (LocalDataChangeEvent) event; ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps)); } } }@Override public Class subscribeType() { return LocalDataChangeEvent.class; } }); }

可以看到 LongPollingService 接受到 LocalDataChangeEvent 对象之后,创建了一个 DataChangeTask 对象,看下它的 run() 方法:
@Override public void run() { try { ConfigCacheService.getContentBetaMd5(groupKey); // 循环 allSubs 队列,取出所有的长连接 for (Iterator iter = allSubs.iterator(); iter.hasNext(); ) { ClientLongPolling clientSub = iter.next(); if (clientSub.clientMd5Map.containsKey(groupKey)) { // If published tag is not in the beta list, then it skipped. if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) { continue; }// If published tag is not in the tag list, then it skipped. if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) { continue; }getRetainIps().put(clientSub.ip, System.currentTimeMillis()); iter.remove(); // Delete subscribers' relationships. LogUtil.CLIENT_LOG .info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance", RequestUtil .getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()), "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey); // 响应客户端请求 clientSub.sendResponse(Arrays.asList(groupKey)); } }} catch (Throwable t) { LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t)); } }

这一步就是拿出队列中所有的长轮询对象并响应,客户端在接收到响应后会请求 /v1/cs/configs 接口获取最新的配置。至此,所有的流程就全部串起来了。。
六、总结 【nacos 动态配置源码解析】这边文章只是对 nacos 动态配置的主要流程做了梳理。可以看到其中大量使用了异步线程处理来提高效率,有些逻辑看起来比较绕。写的不怎么样,如果有错误或者遗漏的地方,还请批评指正,感谢!

    推荐阅读