java|Springboot + Kafka 入门实例 入门dem
版本说明
- springboot版本:2.3.3.RELEASE
- kakfa服务端版本:kafka_2.12-2.6.0.tgz
- zookeeper服务端版本:apache-zookeeper-3.6.1-bin.tar.gz
1,搭建好zookeeper服务,本实例zookeeper使用单机伪集群模式,
192.168.1.126:2181, 192.168.1.126:2182, 192.168.1.126:2183
【java|Springboot + Kafka 入门实例 入门dem】2,搭建好kafka服务,本实例kafka使用单机伪集群模式,
192.168.1.126:9092, 192.168.1.126:9093, 192.168.1.126:9094
1. 导入相关依赖
4.0.0 org.springframework.boot
spring-boot-starter-parent
2.3.3.RELEASE
com.example
springboot-kafka-demo
1.0-SNAPSHOT
springboot-kafka-demo
springboot-kafka-demo 1.8
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-test
test
org.springframework.kafka
spring-kafka
org.projectlombok
lombok
true
com.alibaba
fastjson
1.2.54
org.springframework.boot
spring-boot-maven-plugin
2. yml配置
server:
port: 8080
servlet:
context-path: /
tomcat:
uri-encoding: UTF-8spring:
kafka:
#本地虚拟机kafka伪集群
bootstrap-servers: 192.168.1.126:9092,192.168.1.126:9093,192.168.1.126:9094
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
batch-size: 65536
buffer-memory: 524288
#自定义的topic
myTopic1: testTopic1
myTopic2: testTopic2
consumer:
group-id: default-group#默认组id后面会配置多个消费者组
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: latest
enable-auto-commit: false#关闭自动提交 改由spring-kafka提交
auto-commit-interval: 100
max-poll-records: 20#批量消费 一次接收的最大数量
3. 部分代码 消息实体类
package com.example.demo.entity;
import java.util.Date;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
public class Message {
private Long id;
private String msg;
private Date sendTime;
}
kafka配置类
package com.example.demo.config;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
/**
* kafka配置类
*/
@Data
@Configuration
public class KafkaConfiguration {
/**
* kafaka集群列表
*/
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
/**
* kafaka消费group列表
*/
@Value("${spring.kafka.consumer.group-id}")
private String defaultGroupId;
/**
* 消费开始位置
*/
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
/**
* 是否自动提交
*/
@Value("${spring.kafka.consumer.enable-auto-commit}")
private String enableAutoCommit;
/**
* #如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
*/
@Value("${spring.kafka.consumer.auto-commit-interval}")
private String autoCommitInterval;
/**
* 一次调用poll()操作时返回的最大记录数,默认值为500
*/
@Value("${spring.kafka.consumer.max-poll-records}")
private String maxPollRecords;
/**
* 自定义的topic1
*/
@Value("${spring.kafka.producer.myTopic1}")
private String myTopic1;
/**
* 自定义的topic2
*/
@Value("${spring.kafka.producer.myTopic2}")
private String myTopic2;
}
消费者监听类
package com.example.demo.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* 消费者1(监听topic1队列)
*/
@Component
public class ConsumerListener1 {@KafkaListener(topics = "${spring.kafka.producer.myTopic1}")
public void listen(ConsumerRecord,String> record) {
System.out.println(record);
String value = https://www.it610.com/article/record.value();
System.out.println("消费者1接收到消息:" + value);
}
}
测试类
package com.example.demo.controller;
import com.alibaba.fastjson.JSON;
import com.example.demo.config.KafkaConfiguration;
import com.example.demo.entity.Message;
import com.example.demo.service.KafkaService;
import com.example.demo.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaController {
@Autowired
private KafkaService kafkaService;
@Autowired
private KafkaConfiguration kafkaConfiguration;
/**
* 发送文本消息
* @param msg
* @return
*/
@GetMapping("/send/{msg}")
public String send(@PathVariable String msg) {
kafkaService.send(kafkaConfiguration.getMyTopic1(), msg);
return "生产者发送消息给topic1:"+msg;
}/**
* 发送JSON数据
* @return
*/
@GetMapping("/send2")
public String send2() {
Message message = new Message();
message.setId(System.currentTimeMillis());
message.setMsg("生产者发送消息到topic1: " + UUID.getUUID32());
message.setSendTime(new Date());
String value = https://www.it610.com/article/JSON.toJSONString(message);
log.info("生产者发送消息到topic1 message = {}", value);
kafkaService.send(kafkaConfiguration.getMyTopic1(),value);
return value;
}/**
* 发送JSON数据
* @return
*/
@GetMapping("/send3")
public String send3() {
Message message = new Message();
message.setId(System.currentTimeMillis());
message.setMsg("生产者发送消息到topic2: " + UUID.getUUID32());
message.setSendTime(new Date());
String value = https://www.it610.com/article/JSON.toJSONString(message);
log.info("生产者发送消息到topic2 message = {}", value);
kafkaService.send(kafkaConfiguration.getMyTopic2(),value);
return value;
}}
4. 实例运行结果
文章图片
文章图片
文章图片
推荐阅读
- JAVA(抽象类与接口的区别&重载与重写&内存泄漏)
- Activiti(一)SpringBoot2集成Activiti6
- 事件代理
- SpringBoot调用公共模块的自定义注解失效的解决
- Java|Java OpenCV图像处理之SIFT角点检测详解
- 解决SpringBoot引用别的模块无法注入的问题
- java中如何实现重建二叉树
- 数组常用方法一
- 【Hadoop踩雷】Mac下安装Hadoop3以及Java版本问题
- Java|Java基础——数组