学习笔记|【Kafka|常用CLI】Topic管理

本文介绍常用的topic管理命令,主要涉及kafka-topics脚本, kafka-reassign-partitions和kafka-config脚本,前者是专门的topic相关的脚本,中间的是分区重分配相关的脚本,后者是配置相关的脚本,不仅可以管理topic,还能管理broker,consumer等。
1.topic的管理: 增

kafka-topics --bootstrap-server broker_host:port --create --topic --partitions 1 --replication-factor 1


kafka-topics --bootstrap-server broker_host:port --delete --topic


  • 查用户建的某个主题的详细参数describe
kafka-topics --bootstrap-server broker_host:port --describe --topic

  • 查看所有的主题
kafka-topics --bootstrap-server broker_host:port --list

  • 修改主题分区(数量只能增不能减,否则会跑出invalidPartitionsException的异常)
kafka-topics --bootstrap-server broker_host:port --alter --topic --partitions

  • 修改主题级别的参数,比如max.message.bytes
kafka-configs --zookeeper zookeeper_host:port --entity-type topics --entity-name --alter-config max.message.bytes=100485760

  • 变更副本数量
    分为两个步骤:
    1.提供一个json文件(reassign.json)去指定所有的topic的partition和replicas,replicas列表是broker_id,列表的第一个broker_id是leader replica所在的broker。
{"version":1, "partitions":[ {"topic":"topic_name","partition":0,"replicas":[0,1,2]}, {"topic":"topic_name","partition":1,"replicas":[0,2,1]}, {"topic":"topic_name","partition":2,"replicas":[1,0,2]}, {"topic":"topic_name","partition":3,"replicas":[1,2,0]} ]}

2.使用命令
kafka-reassign-partitions --zookeeper zookeeper_host:port --reassignment-json-file reassign.json --execute

  • 修改主题限速
kafka-configs --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=1004857600,follower.replication.throttled.rate=104857600'

  • 主题分区迁移
kafka-reassign-partitions

2.特殊的主题管理 内部主题:__consumer_offsets和__transaction_state
前者是记录consumer消费的进度的,后者是支持事物机制后引入的
  • 查看位移提交数据
kafka-console-consumer --bootstrap-server host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

  • 读取__consumer_offset主题消息,查看消费者组的状态信息
kafka-console-consumer --bootstrap-server host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning

3.运行命令常见的错误 【学习笔记|【Kafka|常用CLI】Topic管理】1.删除主题失败
可能原因:
  • broker down
  • 删除的topic部分分区依然在执行迁移过程
解决:
  • 手动删除zookeeper节点/admin/delete_topic/这个znode
  • 手动删除该topic在磁盘上的分区目录
  • 在zookeeper中执行rmr/controller,触发controller重选举,刷新controller缓存
2.__consumer_offsets占用太多的磁盘
可能原因:
  • cleaner线程挂了,无法清理此内部主题
解决:
  • 显式地用 jstack 命令查看一下 kafka-log-cleaner-thread 前缀的线程状态,如果是这个原因导致的,就重启相应的 Broker
参考资料: kafka核心技术与实战-极客时间

    推荐阅读