前言

Kafka作为一个高性能的分布式消息系统,提供了一系列强大的命令行工具,这些工具位于Kafka安装目录的bin文件夹下(Windows系统下是bin/windows目录)。本文将详细介绍这些工具的使用方法,帮助开发者和运维人员更高效地管理Kafka集群。

前提条件

在开始使用Kafka命令行工具前,确保:

  1. 已正确安装Kafka(本文基于Kafka 2.4.0版本)
  2. 已启动ZooKeeper服务
  3. 已启动Kafka Broker服务

以下是启动ZooKeeper和Kafka服务的基本命令:

1
2
3
4
5
# 启动ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# 启动Kafka
bin/kafka-server-start.sh config/server.properties

Windows系统下使用对应的.bat文件:

1
2
3
4
5
# 启动ZooKeeper
bin\windows\zookeeper-server-start.bat config\zookeeper.properties

# 启动Kafka
bin\windows\kafka-server-start.bat config\server.properties

Topic管理

创建Topic

1
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic my-topic

参数说明:

  • --create:创建操作
  • --bootstrap-server:Kafka服务器地址
  • --replication-factor:副本数量
  • --partitions:分区数量
  • --topic:Topic名称

列出所有Topic

1
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

查看Topic详情

1
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-topic

输出示例:

1
2
3
4
Topic: my-topic    PartitionCount: 3    ReplicationFactor: 1    Configs: segment.bytes=1073741824
Topic: my-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: my-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: my-topic Partition: 2 Leader: 0 Replicas: 0 Isr: 0

修改Topic配置

1
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --add-config retention.ms=86400000

删除Topic

1
bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic my-topic

生产者工具

控制台生产者

Kafka提供了命令行生产者工具,可用于快速测试消息发送:

1
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic

执行以上命令后,会进入交互式界面,每行输入一条消息,按回车发送:

1
2
3
>Hello Kafka
>This is a test message
>Third message

带键的消息发送

1
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic --property "parse.key=true" --property "key.separator=:"

使用示例:

1
2
>key1:value1
>key2:value2

批量导入数据

从文件批量导入数据:

1
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic < messages.txt

消费者工具

控制台消费者

基本用法:

1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic

这个命令只会消费新产生的消息。如果要从头开始消费所有消息,需添加--from-beginning参数:

1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning

指定消费者组

1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --group my-group

显示消息键和值

1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning --property print.key=true --property key.separator=":"

格式化输出

1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true

消费者组管理

列出所有消费者组

1
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

输出示例:

1
2
my-group
console-consumer-12345

查看消费者组详情

1
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

输出示例:

1
2
3
4
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
my-group my-topic 0 5 5 0 consumer-my-group-1-1234 /127.0.0.1 consumer-my-group-1
my-group my-topic 1 3 3 0 consumer-my-group-1-1234 /127.0.0.1 consumer-my-group-1
my-group my-topic 2 7 7 0 consumer-my-group-1-1234 /127.0.0.1 consumer-my-group-1

重置消费者组偏移量

将消费者组的偏移量重置到最早的位置:

1
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --reset-offsets --to-earliest --all-topics --execute

将消费者组的偏移量重置到特定时间点:

1
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --reset-offsets --to-datetime 2020-03-20T00:00:00.000 --all-topics --execute

将消费者组的偏移量向前或向后移动特定数量:

1
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --reset-offsets --shift-by -10 --topic my-topic --execute

删除消费者组

1
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group

配置管理

查看所有配置

1
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe

动态修改配置

增加配置:

1
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --add-config max.message.bytes=1000000

删除配置:

1
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes

集群管理

查看Broker列表

1
bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids

查看Controller信息

1
bin/zookeeper-shell.sh localhost:2181 get /controller

分区重分配

生成重分配计划:

1
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics.json --broker-list "0,1,2" --generate

执行重分配:

1
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --execute

检查重分配状态:

1
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --verify

性能测试

生产者性能测试

1
bin/kafka-producer-perf-test.sh --topic my-topic --num-records 1000000 --record-size 1000 --throughput 100000 --producer-props bootstrap.servers=localhost:9092

参数说明:

  • --topic:测试的Topic
  • --num-records:发送的消息总数
  • --record-size:每条消息的字节数
  • --throughput:目标吞吐量(消息数/秒)
  • --producer-props:生产者配置

消费者性能测试

1
bin/kafka-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic my-topic --messages 1000000 --threads 1

参数说明:

  • --topic:测试的Topic
  • --messages:消费的消息总数
  • --threads:消费者线程数

安全工具

生成SSL密钥

1
bin/kafka-configs.sh --bootstrap-server localhost:9092 --command-config admin-client.properties --entity-type users --entity-name admin --alter --add-config 'SCRAM-SHA-256=[password=admin-secret]'

ACL管理

添加ACL:

1
bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Alice --operation Read --topic my-topic

列出所有ACL:

1
bin/kafka-acls.sh --bootstrap-server localhost:9092 --list

常见问题与解决方案

1. Topic无法删除

问题描述:使用kafka-topics.sh --delete命令后,Topic仍然存在。

解决方案

  1. 确认server.properties中的delete.topic.enable=true
  2. 清除ZooKeeper中的Topic记录:
1
bin/zookeeper-shell.sh localhost:2181 rmr /admin/delete_topics/my-topic

2. 消费者无法接收消息

问题描述:消费者启动后无法接收新消息。

解决方案

  1. 确认消费者组ID是否正确
  2. 检查消费者偏移量:
1
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
  1. 如必要,重置消费者偏移量:
1
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --reset-offsets --to-latest --all-topics --execute

3. 分区Leader不平衡

问题描述:集群中的分区Leader分布不均。

解决方案
执行优先副本选举:

1
bin/kafka-preferred-replica-election.sh --bootstrap-server localhost:9092

实战案例:完整的消息流程演示

以下是一个完整演示,从创建Topic到生产和消费消息:

步骤1:创建Topic

1
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic demo-topic

步骤2:查看Topic详情

1
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic demo-topic

步骤3:启动消费者(在一个终端)

1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo-topic --group demo-group

步骤4:启动生产者(在另一个终端)

1
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo-topic

然后输入消息:

1
2
3
>Message 1
>Message 2
>Message 3

步骤5:查看消费者组状态

1
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group demo-group

总结

Kafka命令行工具提供了全面的集群管理和操作功能,是开发和运维Kafka系统的重要工具。熟练掌握这些命令,可以帮助我们更高效地进行Kafka集群的日常管理和问题排查。

本文只是涵盖了最常用的一些命令,更多高级用法可以通过--help参数或查阅Kafka官方文档获取。

希望本文对你有所帮助,让你在Kafka运维之路上更加得心应手!

参考资料