深入理解Kafka(六)|深入理解Kafka(六) Consumer开发

下面我们来进行consumer的开发。先来写一个Consumer类,在构造方法里初始化KafkaConsumer对象,并设置相应的参数。

private final KafkaConsumer consumer; private List topics = new ArrayList(); public Consumer() { Properties props = new Properties(); props.put("bootstrap.servers","192.168.61.158:9092"); props.put("zookeeper.connect", "192.168.61.151:2181,192.168.61.152:2181,192.168.61.153:2181"); props.put("group.id", "0"); props.put("auto.offset.reset","earliest"); props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("enable.auto.commit","true"); props.put("zookeeper.session.timeout.ms", "4000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); topics.add("goods"); //配置consumer的值 consumer = new KafkaConsumer(props); consumer.subscribe(topics); }

Kafka是通过Properties配置参数的。这里的参数比较多,我们来详细的说一下。
1.bootstrap.servers,服务的节点列表,这个不用再说了。
2.zookeeper.connect,Zookeeper的地址
3.group.id,Consumer Group的id,我们知道,每个Consumer都会属于一个Consumer Group,这里就是给Consumer指定一个所属group的id。
4.auto.offset.reset,从哪里开始消费,earliest是从最早的offset开始消费数据。
5.key.deserializer,key的反序列化器。
6.value.deserializer,value的反序列化器。
7.enable.auto.commit,是否自动提交offset,如果要做到不重复消费,可以设置为手动提交,在业务中处理offset提交的问题。
8.zookeeper.session.timeout.ms,和Zookeeper连接的超时时间。
9.zookeeper.sync.time.ms,Zookeeper同步数据的时间。
10.auto.commit.interval.ms,多长时间进行offset的自动提交。
初始化KafkaConsumer后,我们要把KafkaConsumer订阅要关注的topic。下面看一下consume方法
public void consume(){ConsumerRecords consumerRecord = consumer.poll(1000); Iterator iterator = consumerRecord.iterator(); while (iterator.hasNext()){ ConsumerRecord record = (ConsumerRecord)iterator.next(); LOG.info("offset:"+record.offset()+",key:"+record.key()+",value:"+record.value()); }}

【深入理解Kafka(六)|深入理解Kafka(六) Consumer开发】通过poll方法去broker上拉取消息。下面来写一个测试方法
Consumer consumer = new Consumer(); consumer.consume();

Kafka的consumer开发是很简单的,但是要注意,consumer消费消息的offset管理,否则可能会产生重复消费的情况,在配置中没有设置自动提交offset的话,需要在业务中手动进行处理。
Kafka的consumer开发就介绍到这里了。

    推荐阅读