前言
Kafka作为一个高性能的分布式消息系统,提供了一系列强大的命令行工具,这些工具位于Kafka安装目录的bin文件夹下(Windows系统下是bin/windows目录)。本文将详细介绍这些工具的使用方法,帮助开发者和运维人员更高效地管理Kafka集群。
前提条件
在开始使用Kafka命令行工具前,确保:
- 已正确安装Kafka(本文基于Kafka 2.4.0版本)
- 已启动ZooKeeper服务
- 已启动Kafka Broker服务
以下是启动ZooKeeper和Kafka服务的基本命令:
1 2 3 4 5
| bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
|
Windows系统下使用对应的.bat文件:
1 2 3 4 5
| # 启动ZooKeeper bin\windows\zookeeper-server-start.bat config\zookeeper.properties
# 启动Kafka bin\windows\kafka-server-start.bat config\server.properties
|
Topic管理
创建Topic
1
| bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic my-topic
|
参数说明:
--create
:创建操作
--bootstrap-server
:Kafka服务器地址
--replication-factor
:副本数量
--partitions
:分区数量
--topic
:Topic名称
列出所有Topic
1
| bin/kafka-topics.sh --list --bootstrap-server localhost:9092
|
查看Topic详情
1
| bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-topic
|
输出示例:
1 2 3 4
| Topic: my-topic PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824 Topic: my-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: my-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: my-topic Partition: 2 Leader: 0 Replicas: 0 Isr: 0
|
修改Topic配置
1
| bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --add-config retention.ms=86400000
|
删除Topic
1
| bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic my-topic
|
生产者工具
控制台生产者
Kafka提供了命令行生产者工具,可用于快速测试消息发送:
1
| bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic
|
执行以上命令后,会进入交互式界面,每行输入一条消息,按回车发送:
1 2 3
| >Hello Kafka >This is a test message >Third message
|
带键的消息发送
1
| bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic --property "parse.key=true" --property "key.separator=:"
|
使用示例:
1 2
| >key1:value1 >key2:value2
|
批量导入数据
从文件批量导入数据:
1
| bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic < messages.txt
|
消费者工具
控制台消费者
基本用法:
1
| bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic
|
这个命令只会消费新产生的消息。如果要从头开始消费所有消息,需添加--from-beginning
参数:
1
| bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning
|
指定消费者组
1
| bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --group my-group
|
显示消息键和值
1
| bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning --property print.key=true --property key.separator=":"
|
格式化输出
1
| bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true
|
消费者组管理
列出所有消费者组
1
| bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
|
输出示例:
1 2
| my-group console-consumer-12345
|
查看消费者组详情
1
| bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
|
输出示例:
1 2 3 4
| GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID my-group my-topic 0 5 5 0 consumer-my-group-1-1234 /127.0.0.1 consumer-my-group-1 my-group my-topic 1 3 3 0 consumer-my-group-1-1234 /127.0.0.1 consumer-my-group-1 my-group my-topic 2 7 7 0 consumer-my-group-1-1234 /127.0.0.1 consumer-my-group-1
|
重置消费者组偏移量
将消费者组的偏移量重置到最早的位置:
1
| bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --reset-offsets --to-earliest --all-topics --execute
|
将消费者组的偏移量重置到特定时间点:
1
| bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --reset-offsets --to-datetime 2020-03-20T00:00:00.000 --all-topics --execute
|
将消费者组的偏移量向前或向后移动特定数量:
1
| bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --reset-offsets --shift-by -10 --topic my-topic --execute
|
删除消费者组
1
| bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group
|
配置管理
查看所有配置
1
| bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe
|
动态修改配置
增加配置:
1
| bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --add-config max.message.bytes=1000000
|
删除配置:
1
| bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes
|
集群管理
查看Broker列表
1
| bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids
|
查看Controller信息
1
| bin/zookeeper-shell.sh localhost:2181 get /controller
|
分区重分配
生成重分配计划:
1
| bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics.json --broker-list "0,1,2" --generate
|
执行重分配:
1
| bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --execute
|
检查重分配状态:
1
| bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --verify
|
性能测试
生产者性能测试
1
| bin/kafka-producer-perf-test.sh --topic my-topic --num-records 1000000 --record-size 1000 --throughput 100000 --producer-props bootstrap.servers=localhost:9092
|
参数说明:
--topic
:测试的Topic
--num-records
:发送的消息总数
--record-size
:每条消息的字节数
--throughput
:目标吞吐量(消息数/秒)
--producer-props
:生产者配置
消费者性能测试
1
| bin/kafka-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic my-topic --messages 1000000 --threads 1
|
参数说明:
--topic
:测试的Topic
--messages
:消费的消息总数
--threads
:消费者线程数
安全工具
生成SSL密钥
1
| bin/kafka-configs.sh --bootstrap-server localhost:9092 --command-config admin-client.properties --entity-type users --entity-name admin --alter --add-config 'SCRAM-SHA-256=[password=admin-secret]'
|
ACL管理
添加ACL:
1
| bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Alice --operation Read --topic my-topic
|
列出所有ACL:
1
| bin/kafka-acls.sh --bootstrap-server localhost:9092 --list
|
常见问题与解决方案
1. Topic无法删除
问题描述:使用kafka-topics.sh --delete
命令后,Topic仍然存在。
解决方案:
- 确认server.properties中的
delete.topic.enable=true
- 清除ZooKeeper中的Topic记录:
1
| bin/zookeeper-shell.sh localhost:2181 rmr /admin/delete_topics/my-topic
|
2. 消费者无法接收消息
问题描述:消费者启动后无法接收新消息。
解决方案:
- 确认消费者组ID是否正确
- 检查消费者偏移量:
1
| bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
|
- 如必要,重置消费者偏移量:
1
| bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --reset-offsets --to-latest --all-topics --execute
|
3. 分区Leader不平衡
问题描述:集群中的分区Leader分布不均。
解决方案:
执行优先副本选举:
1
| bin/kafka-preferred-replica-election.sh --bootstrap-server localhost:9092
|
实战案例:完整的消息流程演示
以下是一个完整演示,从创建Topic到生产和消费消息:
步骤1:创建Topic
1
| bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic demo-topic
|
步骤2:查看Topic详情
1
| bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic demo-topic
|
步骤3:启动消费者(在一个终端)
1
| bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo-topic --group demo-group
|
步骤4:启动生产者(在另一个终端)
1
| bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo-topic
|
然后输入消息:
1 2 3
| >Message 1 >Message 2 >Message 3
|
步骤5:查看消费者组状态
1
| bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group demo-group
|
总结
Kafka命令行工具提供了全面的集群管理和操作功能,是开发和运维Kafka系统的重要工具。熟练掌握这些命令,可以帮助我们更高效地进行Kafka集群的日常管理和问题排查。
本文只是涵盖了最常用的一些命令,更多高级用法可以通过--help
参数或查阅Kafka官方文档获取。
希望本文对你有所帮助,让你在Kafka运维之路上更加得心应手!
参考资料