Spring Cloud 升级之路 - 2020.0.x - 7. 使用 Spring Cloud

满堂花醉三千客,一剑霜寒十四洲。这篇文章主要讲述Spring Cloud 升级之路 - 2020.0.x - 7. 使用 Spring Cloud相关的知识,希望能为你提供帮助。
我们使用 Spring Cloud 官方推荐的 Spring Cloud LoadBalancer 作为我们的客户端负载均衡器。上一节我们了解了 Spring Cloud LoadBalancer 的结构,接下来我们来说一下我们在使用 Spring Cloud LoadBalancer 要实现的功能:

  1. 我们要实现不同集群之间不互相调用,通过实例的metamap中的zone配置,来区分不同集群的实例。只有实例的metamap中的zone配置一样的实例才能互相调用。这个通过实现自定义的 ServiceInstanceListSupplier 即可实现
  2. 负载均衡的轮询算法,需要请求与请求之间隔离,不能共用同一个 position 导致某个请求失败之后的重试还是原来失败的实例。上一节看到的默认的 RoundRobinLoadBalancer 是所有线程共用同一个原子变量 position 每次请求原子加 1。在这种情况下会有问题:假设有微服务 A 有两个实例:实例 1 和实例 2。请求 A 到达时,RoundRobinLoadBalancer 返回实例 1,这时有请求 B 到达,RoundRobinLoadBalancer 返回实例 2。然后如果请求 A 失败重试,RoundRobinLoadBalancer 又返回了实例 1。这不是我们期望看到的。
针对这两个功能,我们分别编写自己的实现。
实现不同集群不互相调用 Spring Cloud LoadBalancer 中的 zone 配置Spring Cloud LoadBalancer 定义了 LoadBalancerZoneConfig
public class LoadBalancerZoneConfig { //标识当前负载均衡器处于哪一个 zone private String zone; public LoadBalancerZoneConfig(String zone) { this.zone = zone; } public String getZone() { return zone; } public void setZone(String zone) { this.zone = zone; } }

如果没有引入 Eureka 相关依赖,则这个 zone 通过 spring.cloud.loadbalancer.zone 配置:
LoadBalancerAutoConfiguration
@Bean @ConditionalOnMissingBean public LoadBalancerZoneConfig zoneConfig(Environment environment) { return new LoadBalancerZoneConfig(environment.getProperty("spring.cloud.loadbalancer.zone")); }

如果引入了 Eureka 相关依赖,则如果在 Eureka 元数据配置了 zone,则这个 zone 会覆盖 Spring Cloud LoadBalancer 中的 LoadBalancerZoneConfig
EurekaLoadBalancerClientConfiguration
@PostConstruct public void postprocess() { if (!StringUtils.isEmpty(zoneConfig.getZone())) { return; } String zone = getZoneFromEureka(); if (!StringUtils.isEmpty(zone)) { if (LOG.isDebugEnabled()) { LOG.debug("Setting the value of \'" + LOADBALANCER_ZONE + "\' to " + zone); } //设置 `LoadBalancerZoneConfig` zoneConfig.setZone(zone); } }private String getZoneFromEureka() { String zone; //是否配置了 spring.cloud.loadbalancer.eureka.approximateZoneFromHostname 为 true boolean approximateZoneFromHostname = eurekaLoadBalancerProperties.isApproximateZoneFromHostname(); //如果配置了,则尝试从 Eureka 配置的 host 名称中提取 //实际就是以 . 分割 host,然后第二个就是 zone //例如 www.zone1.com 就是 zone1 if (approximateZoneFromHostname & & eurekaConfig != null) { return ZoneUtils.extractApproximateZone(this.eurekaConfig.getHostName(false)); } else { //否则,从 metadata map 中取 zone 这个 key zone = eurekaConfig == null ? null : eurekaConfig.getMetadataMap().get("zone"); //如果这个 key 不存在,则从配置中以 region 从 zone 列表取第一个 zone 作为当前 zone if (StringUtils.isEmpty(zone) & & clientConfig != null) { String[] zones = clientConfig.getAvailabilityZones(clientConfig.getRegion()); // Pick the first one from the regions we want to connect to zone = zones != null & & zones.length > 0 ? zones[0] : null; } return zone; } }

实现 SameZoneOnlyServiceInstanceListSupplier为了实现通过 zone 来过滤同一 zone 下的实例,并且绝对不会返回非同一 zone 下的实例,我们来编写代码:
SameZoneOnlyServiceInstanceListSupplier
/** * 只返回与当前实例同一个 Zone 的服务实例,不同 zone 之间的服务不互相调用 */ public class SameZoneOnlyServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier { /** * 实例元数据 map 中表示 zone 配置的 key */ private final String ZONE = "zone"; /** * 当前 spring cloud loadbalancer 的 zone 配置 */ private final LoadBalancerZoneConfig zoneConfig; private String zone; public SameZoneOnlyServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, LoadBalancerZoneConfig zoneConfig) { super(delegate); this.zoneConfig = zoneConfig; }@Override public Flux< List< ServiceInstance> > get() { return getDelegate().get().map(this::filteredByZone); }//通过 zoneConfig 过滤 private List< ServiceInstance> filteredByZone(List< ServiceInstance> serviceInstances) { if (zone == null) { zone = zoneConfig.getZone(); } if (zone != null) { List< ServiceInstance> filteredInstances = new ArrayList< > (); for (ServiceInstance serviceInstance : serviceInstances) { String instanceZone = getZone(serviceInstance); if (zone.equalsIgnoreCase(instanceZone)) { filteredInstances.add(serviceInstance); } } if (filteredInstances.size() > 0) { return filteredInstances; } } /** * @see ZonePreferenceServiceInstanceListSupplier 在没有相同zone实例的时候返回的是所有实例 * 我们这里为了实现不同 zone 之间不互相调用需要返回空列表 */ return List.of(); }//读取实例的 zone,没有配置则为 null private String getZone(ServiceInstance serviceInstance) { Map< String, String> metadata = https://www.songbingjia.com/android/serviceInstance.getMetadata(); if (metadata != null) { return metadata.get(ZONE); } return null; } }

实现请求与请求之间隔离的负载均衡算法在之前章节的讲述中,我们提到了我们使用 spring-cloud-sleuth 作为链路追踪库。我们想可以通过其中的 traceId,来区分究竟是否是同一个请求。
RoundRobinWithRequestSeparatedPositionLoadBalancer
//一定必须是实现ReactorServiceInstanceLoadBalancer //而不是ReactorLoadBalancer< ServiceInstance> //因为注册的时候是ReactorServiceInstanceLoadBalancer @Log4j2 public class RoundRobinWithRequestSeparatedPositionLoadBalancer implements ReactorServiceInstanceLoadBalancer { private final ServiceInstanceListSupplier serviceInstanceListSupplier; //每次请求算上重试不会超过1分钟 //对于超过1分钟的,这种请求肯定比较重,不应该重试 private final LoadingCache< Long, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES) //随机初始值,防止每次都是从第一个开始调用 .build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000))); private final String serviceId; private final Tracer tracer; public RoundRobinWithRequestSeparatedPositionLoadBalancer(ServiceInstanceListSupplier serviceInstanceListSupplier, String serviceId, Tracer tracer) { this.serviceInstanceListSupplier = serviceInstanceListSupplier; this.serviceId = serviceId; this.tracer = tracer; }@Override public Mono< Response< ServiceInstance> > choose(Request request) { return serviceInstanceListSupplier.get().next().map(serviceInstances -> getInstanceResponse(serviceInstances)); }private Response< ServiceInstance> getInstanceResponse(List< ServiceInstance> serviceInstances) { if (serviceInstances.isEmpty()) { log.warn("No servers available for service: " + this.serviceId); return new EmptyResponse(); } return getInstanceResponseByRoundRobin(serviceInstances); }private Response< ServiceInstance> getInstanceResponseByRoundRobin(List< ServiceInstance> serviceInstances) { if (serviceInstances.isEmpty()) { log.warn("No servers available for service: " + this.serviceId); return new EmptyResponse(); } //为了解决原始算法不同调用并发可能导致一个请求重试相同的实例 Span currentSpan = tracer.currentSpan(); if (currentSpan == null) { currentSpan = tracer.newTrace(); } long l = currentSpan.context().traceId(); AtomicInteger seed = positionCache.get(l); int s = seed.getAndIncrement(); int pos = s % serviceInstances.size(); log.info("position {}, seed: {}, instances count: {}", pos, s, serviceInstances.size()); return new DefaultResponse(serviceInstances.stream() //实例返回列表顺序可能不同,为了保持一致,先排序再取 .sorted(Comparator.comparing(ServiceInstance::getInstanceId)) .collect(Collectors.toList()).get(pos)); } }

将上述两个元素加入我们自定义的 LoadBalancerClient 并启用在上一节,我们提到了可以通过 @LoadBalancerClients 注解配置默认的负载均衡器配置,我们这里就是通过这种方式进行配置。首先在 spring.factories 中添加自动配置类:
spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\\ com.github.hashjang.spring.cloud.iiford.service.common.auto.LoadBalancerAutoConfiguration

然后编写这个自动配置类,其实很简单,就是添加一个 @LoadBalancerClients 注解,设置默认配置类:
LoadBalancerAutoConfiguration
@Configuration(proxyBeanMethods = false) @LoadBalancerClients(defaultConfiguration = DefaultLoadBalancerConfiguration.class) public class LoadBalancerAutoConfiguration { }

编写这个默认配置类,将上面我们实现的两个类,组装进去:
DefaultLoadBalancerConfiguration
@Configuration(proxyBeanMethods = false) public class DefaultLoadBalancerConfiguration {@Bean public ServiceInstanceListSupplier serviceInstanceListSupplier( DiscoveryClient discoveryClient, Environment env, ConfigurableApplicationContext context, LoadBalancerZoneConfig zoneConfig ) { ObjectProvider< LoadBalancerCacheManager> cacheManagerProvider = context .getBeanProvider(LoadBalancerCacheManager.class); return//开启服务实例缓存 new CachingServiceInstanceListSupplier( //只能返回同一个 zone 的服务实例 new SameZoneOnlyServiceInstanceListSupplier( //启用通过 discoveryClient 的服务发现 new DiscoveryClientServiceInstanceListSupplier( discoveryClient, env ), zoneConfig ) , cacheManagerProvider.getIfAvailable() ); }@Bean public ReactorLoadBalancer< ServiceInstance> reactorServiceInstanceLoadBalancer( Environment environment, ServiceInstanceListSupplier serviceInstanceListSupplier, Tracer tracer ) { String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME); return new RoundRobinWithRequestSeparatedPositionLoadBalancer( serviceInstanceListSupplier, name, tracer ); } }

这样,我们就实现了自定义的负载均衡器。也理解了 Spring Cloud LoadBalancer 的使用。接下来,我们来单元测试下这些功能。集成测试后面会有单独的章节,不用着急。
单元测试上述功能【Spring Cloud 升级之路 - 2020.0.x - 7. 使用 Spring Cloud】通过这届单元测试,我们也可以了解下一般我们实现 spring cloud 自定义的基础组件,怎么去单元测试。
这里的单元测试主要测试三个场景:
  1. 只返回同一个 zone 下的实例,其他 zone 的不会返回
  2. 对于多个请求,每个请求返回的与上次的实例不同。
  3. 对于多线程的每个请求,如果重试,返回的都是不同的实例
编写代码:
LoadBalancerTest
//SpringRunner也包含了MockitoJUnitRunner,所以 @Mock 等注解也生效了 @RunWith(SpringRunner.class) @SpringBootTest(properties = {LoadBalancerEurekaAutoConfiguration.LOADBALANCER_ZONE + "=zone1"}) public class LoadBalancerTest {@EnableAutoConfiguration(exclude = EurekaDiscoveryClientConfiguration.class) @Configuration public static class App { @Bean public DiscoveryClient discoveryClient() { ServiceInstance zone1Instance1 = Mockito.mock(ServiceInstance.class); ServiceInstance zone1Instance2 = Mockito.mock(ServiceInstance.class); ServiceInstance zone2Instance3 = Mockito.mock(ServiceInstance.class); Map< String, String> zone1 = Map.ofEntries( Map.entry("zone", "zone1") ); Map< String, String> zone2 = Map.ofEntries( Map.entry("zone", "zone2") ); when(zone1Instance1.getMetadata()).thenReturn(zone1); when(zone1Instance1.getInstanceId()).thenReturn("instance1"); when(zone1Instance2.getMetadata()).thenReturn(zone1); when(zone1Instance2.getInstanceId()).thenReturn("instance2"); when(zone2Instance3.getMetadata()).thenReturn(zone2); when(zone2Instance3.getInstanceId()).thenReturn("instance3"); DiscoveryClient mock = Mockito.mock(DiscoveryClient.class); Mockito.when(mock.getInstances("testService")) .thenReturn(List.of(zone1Instance1, zone1Instance2, zone2Instance3)); return mock; } }@Autowired private LoadBalancerClientFactory loadBalancerClientFactory; @Autowired private Tracer tracer; /** * 只返回同一个 zone 下的实例 */ @Test public void testFilteredByZone() { ReactiveLoadBalancer< ServiceInstance> testService = loadBalancerClientFactory.getInstance("testService"); for (int i = 0; i < 100; i++) { ServiceInstance server = Mono.from(testService.choose()).block().getServer(); //必须处于和当前实例同一个zone下 Assert.assertEquals(server.getMetadata().get("zone"), "zone1"); } }/** * 返回不同的实例 */ @Test public void testReturnNext() { ReactiveLoadBalancer< ServiceInstance> testService = loadBalancerClientFactory.getInstance("testService"); //获取服务实例 ServiceInstance server1 = Mono.from(testService.choose()).block().getServer(); ServiceInstance server2 = Mono.from(testService.choose()).block().getServer(); //每次选择的是不同实例 Assert.assertNotEquals(server1.getInstanceId(), server2.getInstanceId()); }/** * 跨线程,默认情况下是可能返回同一实例的,在我们的实现下,保持 * span 则会返回下一个实例,这样保证多线程环境同一个 request 重试会返回下一实例 * @throws Exception */ @Test public void testSameSpanReturnNext() throws Exception { Span span = tracer.nextSpan(); //测试 100 次 for (int i = 0; i < 100; i++) { try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) { ReactiveLoadBalancer< ServiceInstance> testService = loadBalancerClientFactory.getInstance("testService"); //获取实例 ServiceInstance server1 = Mono.from(testService.choose()).block().getServer(); AtomicReference< ServiceInstance> server2 = new AtomicReference< > (); Thread thread = new Thread(() -> { //保持 trace,这样就会认为仍然是同一个请求上下文,这样模拟重试 try (Tracer.SpanInScope cleared2 = tracer.withSpanInScope(span)) { server2.set(Mono.from(testService.choose()).block().getServer()); } }); thread.start(); thread.join(); System.out.println(i); Assert.assertNotEquals(server1.getInstanceId(), server2.get().getInstanceId()); } } } }

运行测试,测试通过。

    推荐阅读