整合Spring Cloud Stream Binder与GCP Pubsub进行消息发送与接收

1 前言 之前的文章《整合Spring Cloud Stream Binder与RabbitMQ进行消息发送与接收》讲解了Spring Cloud streamRabbitMQ的整合,本文将简单介绍一下Spring Cloud StreamGoogle Cloud Pub/Sub的整合。
2 通过Emulator启动Pub/Sub 因使用实际的GCP Pub/Sub相对麻烦,本文通过模拟器来运行。
关于Google Cloud SDK的安装可参考:Mac安装Google Cloud SDK
gcloud components install beta gcloud components install pubsub-emulator

$ gcloud beta emulators pubsub start --project=pkslow-prj --host-port= Executing: /google-cloud-sdk/platform/pubsub-emulator/bin/cloud-pubsub-emulator --host= --port=8511 [pubsub] This is the Google Pub/Sub fake. [pubsub] Implementation may be incomplete or differ from the real system. [pubsub] May 11, 2021 10:27:31 PM com.google.cloud.pubsub.testing.v1.Main main [pubsub] INFO: IAM integration is disabled. IAM policy methods and ACL checks are not supported [pubsub] SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". [pubsub] SLF4J: Defaulting to no-operation (NOP) logger implementation [pubsub] SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. [pubsub] May 11, 2021 10:27:32 PM com.google.cloud.pubsub.testing.v1.Main main [pubsub] INFO: Server started, listening on 8511

3 整合 引入依赖:
org.springframework.cloud spring-cloud-gcp-pubsub-stream-binder

package com.pkslow.cloud.stream.binder.pubsub; @SpringBootApplication public class StreamBinderPubsub { private static final Logger log = LoggerFactory.getLogger(StreamBinderPubsub.class); public static void main(String[] args) { SpringApplication.run(StreamBinderPubsub.class, args); }@Bean public Supplier pkslowSource() { return () -> { String message = "www.pkslow.com"; log.info("Sending value: " + message); return message; }; }@Bean public Consumer pkslowSink() { return message -> { log.info("Received message " + message); }; } }

配置Pub/Sub属性与Cloud Stream属性:
spring: cloud: stream: function: definition: pkslowSource; pkslowSink bindings: pkslowSource-out-0: destination: pkslow-topic pkslowSink-in-0: destination: pkslow-topic poller: fixed-delay: 500 gcp: pubsub: emulator-host: localhost:8511 project-id: pkslow-prj

如果是实际的GCP Pub/Sub,指定key文件即可:
spring: cloud: gcp: credentials: location: file:/xxx.json

4 总结 代码请查看:https://github.com/LarryDpk/p...
