【Soul源码阅读-09】数据同步之nacos

目标

  • soul nacos 方式数据同步原理及源码分析
上一篇我们对Soul网关的 http长轮询 数据同步方式做了简单的分析,了解了一下 http长轮询 同步的基本流程。接下来我们看一下Soul网关的nacos 数据同步方式。
Soul网关开启 nacos 同步:
  • soul-bootstrap新增如下依赖:
    org.dromara soul-spring-boot-starter-sync-data-nacos 2.2.1

  • application.yml添加相关配置
    soul : sync: nacos: url: localhost:8848 namespace: 1c10d748-af86-43b9-8265-75f487d20c6c acm: enabled: false endpoint: acm.aliyun.com namespace: accessKey: secretKey: #url: 配置成你的nacos地址,集群环境请使用(,)分隔。 # 其他参数配置,请参考naocs官网。

soul-admin 配置,或在 soul-admin 启动参数中设置 --soul.sync.zookeeper='',然后重启服务
soul : sync: nacos: url: localhost:8848 namespace: 1c10d748-af86-43b9-8265-75f487d20c6c acm: enabled: false endpoint: acm.aliyun.com namespace: accessKey: secretKey:

源码分析 soul-admin 数据同步
soul-admin 的数据变更通知,Soul 网关的四种数据同步方式webscoket、zookeeper、http长轮询、nacos原理都是一样的,只是不同的数据同步配置对应的事件处理器不一样,之前zookeeper数据同步已做了分析,这里就不在赘述。。
  • nacos 监听器源码分析
同之前的分析的同步方式类似,NacosDataChangedListener类为DataChangedListener接口的具体实现,以修改Selector为例:
public void onSelectorChanged(final List changed, final DataEventTypeEnum eventType) { //getConfig 通过 configService 获取配置信息 updateSelectorMap(getConfig(SELECTOR_DATA_ID)); switch (eventType) { case DELETE: ... break; case REFRESH: case MYSELF: ... break; default: changed.forEach(selector -> { List ls = SELECTOR_MAP .getOrDefault(selector.getPluginName(), new ArrayList<>()) .stream() .filter(s -> !s.getId().equals(selector.getId())) .sorted(SELECTOR_DATA_COMPARATOR) .collect(Collectors.toList()); ls.add(selector); //替换成最新的选择器信息 SELECTOR_MAP.put(selector.getPluginName(), ls); }); break; } //发布数据 publishConfig(SELECTOR_DATA_ID, SELECTOR_MAP); } private void updateSelectorMap(final String configInfo) { JsonObject jo = GsonUtils.getInstance().fromJson(configInfo, JsonObject.class); //当前 SELECTOR_MAP 所有 key Set set = new HashSet<>(SELECTOR_MAP.keySet()); for (Entry e : jo.entrySet()) { set.remove(e.getKey()); List ls = new ArrayList<>(); e.getValue().getAsJsonArray().forEach(je -> ls.add(GsonUtils.getInstance().fromJson(je, SelectorData.class))); //将获取的配置信息放入SELECTOR_MAP SELECTOR_MAP.put(e.getKey(), ls); } //为什么还要再remove?set已经在for循环中remove为空,并发考虑吗? SELECTOR_MAP.keySet().removeAll(set); }

【【Soul源码阅读-09】数据同步之nacos】至此,soul-admin已经完成了数据发送。
soul-bootstrap 网关数据同步
开启nacos同步,需要在soul-bootstrap中引入soul-spring-boot-starter-sync-data-nacos,在项目中找到对应的自定义spring-boot-starter,发现了NacosSyncDataService配置类。
@Configuration @ConditionalOnClass(NacosSyncDataService.class) @ConditionalOnProperty(prefix = "soul.sync.nacos", name = "url") @Slf4j public class NacosSyncDataConfiguration {/** * Nacos sync data service. * * @param configServicethe config service * @param pluginSubscriber the plugin subscriber * @param metaSubscribersthe meta subscribers * @param authSubscribersthe auth subscribers * @return the sync data service */ @Bean public SyncDataService nacosSyncDataService(final ObjectProvider configService, final ObjectProvider pluginSubscriber, final ObjectProvider> metaSubscribers, final ObjectProvider authSubscribers) { log.info("you use nacos sync soul data......."); return new NacosSyncDataService(configService.getIfAvailable(), pluginSubscriber.getIfAvailable(), metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList)); }/** * Nacos config service config service. * * @param nacosConfig the nacos config * @return the config service * @throws Exception the exception */ @Bean public ConfigService nacosConfigService(final NacosConfig nacosConfig) throws Exception { Properties properties = new Properties(); if (nacosConfig.getAcm() != null && nacosConfig.getAcm().isEnabled()) { properties.put(PropertyKeyConst.ENDPOINT, nacosConfig.getAcm().getEndpoint()); properties.put(PropertyKeyConst.NAMESPACE, nacosConfig.getAcm().getNamespace()); properties.put(PropertyKeyConst.ACCESS_KEY, nacosConfig.getAcm().getAccessKey()); properties.put(PropertyKeyConst.SECRET_KEY, nacosConfig.getAcm().getSecretKey()); } else { properties.put(PropertyKeyConst.SERVER_ADDR, nacosConfig.getUrl()); properties.put(PropertyKeyConst.NAMESPACE, nacosConfig.getNamespace()); } return NacosFactory.createConfigService(properties); }/** * Http config http config. * * @return the http config */ @Bean @ConfigurationProperties(prefix = "soul.sync.nacos") public NacosConfig nacosConfig() { return new NacosConfig(); } }

Selector为例,看一下NacosSyncDataService类监听Selector数据变化的逻辑:
protected void updateSelectorMap(final String configInfo) { /*if(configInfo == null){ return; }*/ try { //configInfo 为空,导致 json 反序列化失败 List selectorDataList = GsonUtils.getInstance().toObjectMapList(configInfo, SelectorData.class).values().stream().flatMap(Collection::stream).collect(Collectors.toList()); selectorDataList.forEach(selectorData -> Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> { //移除缓存数据 subscriber.unSelectorSubscribe(selectorData); //保存缓存数据 subscriber.onSelectorSubscribe(selectorData); })); } catch (JsonParseException e) { log.error("sync selector data have error:", e); } }

上面unSelectorSubscribe(selectorData)、onSelectorSubscribe(selectorData)为更新缓存数据的方法,具体的实现类为CommonPluginDataSubscriber,这和上一篇webscoket、zookeeper更新缓存数据的调用是一样的。
nacos更新数据方式和webscoket、zookeeper最大不同:nacos每次都是全量更新,而webscoket、zookeeper只有在启动的时候进行一次全量更新,其他时候都是增量更新。
问题 soul-bootstrap启动NPE错误导致启动失败
Caused by: java.lang.NullPointerException: null at org.dromara.soul.sync.data.nacos.handler.NacosCacheHandler.updatePluginMap(NacosCacheHandler.java:90) ~[classes/:na] at org.dromara.soul.sync.data.nacos.handler.NacosCacheHandler.watcherData(NacosCacheHandler.java:167) ~[classes/:na] at org.dromara.soul.sync.data.nacos.NacosSyncDataService.start(NacosSyncDataService.java:56) ~[classes/:na] at org.dromara.soul.sync.data.nacos.NacosSyncDataService.(NacosSyncDataService.java:49) ~[classes/:na] at org.dromara.soul.springboot.starter.sync.data.nacos.NacosSyncDataConfiguration.nacosSyncDataService(NacosSyncDataConfiguration.java:66)

通过错误信息,定位到代码
protected void updateSelectorMap(final String configInfo) { /*if(configInfo == null){ return; }*/ try { //configInfo 为空,导致 json 反序列化失败 List selectorDataList = GsonUtils.getInstance().toObjectMapList(configInfo, SelectorData.class).values().stream().flatMap(Collection::stream).collect(Collectors.toList()); selectorDataList.forEach(selectorData -> Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> { //移除缓存数据 subscriber.unSelectorSubscribe(selectorData); //保存缓存数据 subscriber.onSelectorSubscribe(selectorData); })); } catch (JsonParseException e) { log.error("sync selector data have error:", e); } }

soul-admin后台更改插件、选择器和规则配置,再次启动,异常消失。怀疑与nacos的处理机制有关,每次更新是全量更新,第一次初始化的时候nacos没有数据,但更新过数据后就会触发publishConfig发布数据。
soulissues中已经有人提了issue,应该很快会修复的。
至此,nacos数据同步源码分析完成。

    推荐阅读