前言
Apache Kafka是一个分布式的流处理平台,可用于构建实时数据管道和流式应用程序。本文将详细介绍如何在CentOS 7环境下搭建一个高可用的Kafka集群,包括环境准备、ZooKeeper集群安装、Kafka集群部署、配置优化以及基本的运维操作。
架构规划
本教程将搭建一个由3个节点组成的Kafka集群,同时使用3个节点组成ZooKeeper集群来提供协调服务。
服务器IP |
主机名 |
角色 |
192.168.1.100 |
kafka-node1 |
ZooKeeper + Kafka |
192.168.1.101 |
kafka-node2 |
ZooKeeper + Kafka |
192.168.1.102 |
kafka-node3 |
ZooKeeper + Kafka |
软件版本
- CentOS: 7.8
- JDK: 1.8.0_241
- ZooKeeper: 3.5.7
- Kafka: 2.4.0
环境准备
配置主机名与hosts
在所有节点上执行以下操作:
1 2 3 4 5
| hostnamectl set-hostname kafka-node1
vi /etc/hosts
|
添加以下内容到hosts文件:
1 2 3
| 192.168.1.100 kafka-node1 192.168.1.101 kafka-node2 192.168.1.102 kafka-node3
|
关闭防火墙和SELinux
为了简化部署过程,我们暂时关闭防火墙和SELinux:
1 2 3 4 5 6 7
| systemctl stop firewalld systemctl disable firewalld
setenforce 0 sed -i 's/SELINUX=enforcing/SELINUX=disabled/g' /etc/selinux/config
|
注意:在生产环境中,应该配置适当的防火墙规则而不是完全关闭防火墙。
安装JDK
Kafka需要Java环境,所以我们需要先安装JDK:
1 2 3 4 5
| yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
java -version
|
或者,如果你希望使用Oracle JDK:
1 2 3 4 5 6 7 8 9 10 11 12 13
| mkdir -p /usr/local/java tar -zxvf jdk-8u241-linux-x64.tar.gz -C /usr/local/java/
echo 'export JAVA_HOME=/usr/local/java/jdk1.8.0_241' >> /etc/profile echo 'export JRE_HOME=${JAVA_HOME}/jre' >> /etc/profile echo 'export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib' >> /etc/profile echo 'export PATH=${JAVA_HOME}/bin:$PATH' >> /etc/profile source /etc/profile
java -version
|
创建Kafka用户
为了安全起见,我们创建一个专门的用户来运行Kafka服务:
1 2 3 4 5 6 7 8 9
| groupadd kafka useradd -g kafka kafka
mkdir -p /data/kafka mkdir -p /data/zookeeper chown -R kafka:kafka /data/kafka chown -R kafka:kafka /data/zookeeper
|
安装ZooKeeper集群
Kafka使用ZooKeeper来存储集群的元数据和消费者信息,所以我们首先需要安装ZooKeeper集群。
下载与解压ZooKeeper
在所有节点上执行:
1 2 3 4 5 6 7 8 9 10 11 12 13
| cd /opt wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz mv apache-zookeeper-3.5.7-bin zookeeper
mkdir -p /data/zookeeper/data mkdir -p /data/zookeeper/logs chown -R kafka:kafka /opt/zookeeper chown -R kafka:kafka /data/zookeeper
|
配置ZooKeeper
创建ZooKeeper配置文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| cat > /opt/zookeeper/conf/zoo.cfg << EOF tickTime=2000 initLimit=10 syncLimit=5 dataDir=/data/zookeeper/data dataLogDir=/data/zookeeper/logs clientPort=2181 autopurge.snapRetainCount=3 autopurge.purgeInterval=1 server.1=kafka-node1:2888:3888 server.2=kafka-node2:2888:3888 server.3=kafka-node3:2888:3888 EOF
|
创建myid文件:
在kafka-node1上:
1
| echo "1" > /data/zookeeper/data/myid
|
在kafka-node2上:
1
| echo "2" > /data/zookeeper/data/myid
|
在kafka-node3上:
1
| echo "3" > /data/zookeeper/data/myid
|
创建ZooKeeper服务
创建systemd服务文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| cat > /etc/systemd/system/zookeeper.service << EOF [Unit] Description=Apache ZooKeeper After=network.target
[Service] Type=forking User=kafka Group=kafka Environment=JAVA_HOME=/usr/lib/jvm/jre ExecStart=/opt/zookeeper/bin/zkServer.sh start ExecStop=/opt/zookeeper/bin/zkServer.sh stop ExecReload=/opt/zookeeper/bin/zkServer.sh restart Restart=on-failure
[Install] WantedBy=multi-user.target EOF
|
重新加载systemd配置并启动ZooKeeper:
1 2 3 4
| systemctl daemon-reload systemctl start zookeeper systemctl enable zookeeper systemctl status zookeeper
|
验证ZooKeeper集群
使用ZooKeeper客户端连接到集群并验证状态:
1
| /opt/zookeeper/bin/zkServer.sh status
|
连接到ZooKeeper服务:
1
| /opt/zookeeper/bin/zkCli.sh -server kafka-node1:2181
|
在ZooKeeper客户端中执行以下命令:
应该看到ZooKeeper的根节点列表。
安装Kafka集群
在确认ZooKeeper集群工作正常后,我们可以安装Kafka集群。
下载与解压Kafka
在所有节点上执行:
1 2 3 4 5 6 7 8 9 10 11 12
| cd /opt wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
tar -zxvf kafka_2.12-2.4.0.tgz mv kafka_2.12-2.4.0 kafka
mkdir -p /data/kafka/logs chown -R kafka:kafka /opt/kafka chown -R kafka:kafka /data/kafka
|
配置Kafka
在每个节点上创建Kafka配置文件,需要根据节点的不同设置不同的broker.id:
在kafka-node1上(broker.id=1):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| cat > /opt/kafka/config/server.properties << EOF # Broker基本配置 broker.id=1 listeners=PLAINTEXT://kafka-node1:9092 advertised.listeners=PLAINTEXT://kafka-node1:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600
# 日志配置 log.dirs=/data/kafka/logs num.partitions=3 num.recovery.threads.per.data.dir=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000
# ZooKeeper配置 zookeeper.connect=kafka-node1:2181,kafka-node2:2181,kafka-node3:2181 zookeeper.connection.timeout.ms=6000
# 副本配置 default.replication.factor=3 offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=2
# 其他优化配置 group.initial.rebalance.delay.ms=3000 EOF
|
在kafka-node2上(broker.id=2):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| cat > /opt/kafka/config/server.properties << EOF # Broker基本配置 broker.id=2 listeners=PLAINTEXT://kafka-node2:9092 advertised.listeners=PLAINTEXT://kafka-node2:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600
# 日志配置 log.dirs=/data/kafka/logs num.partitions=3 num.recovery.threads.per.data.dir=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000
# ZooKeeper配置 zookeeper.connect=kafka-node1:2181,kafka-node2:2181,kafka-node3:2181 zookeeper.connection.timeout.ms=6000
# 副本配置 default.replication.factor=3 offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=2
# 其他优化配置 group.initial.rebalance.delay.ms=3000 EOF
|
在kafka-node3上(broker.id=3):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| cat > /opt/kafka/config/server.properties << EOF # Broker基本配置 broker.id=3 listeners=PLAINTEXT://kafka-node3:9092 advertised.listeners=PLAINTEXT://kafka-node3:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600
# 日志配置 log.dirs=/data/kafka/logs num.partitions=3 num.recovery.threads.per.data.dir=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000
# ZooKeeper配置 zookeeper.connect=kafka-node1:2181,kafka-node2:2181,kafka-node3:2181 zookeeper.connection.timeout.ms=6000
# 副本配置 default.replication.factor=3 offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=2
# 其他优化配置 group.initial.rebalance.delay.ms=3000 EOF
|
创建Kafka服务
创建systemd服务文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| cat > /etc/systemd/system/kafka.service << EOF [Unit] Description=Apache Kafka After=network.target zookeeper.service
[Service] Type=simple User=kafka Group=kafka Environment=JAVA_HOME=/usr/lib/jvm/jre ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties ExecStop=/opt/kafka/bin/kafka-server-stop.sh Restart=on-failure
[Install] WantedBy=multi-user.target EOF
|
重新加载systemd配置并启动Kafka:
1 2 3 4
| systemctl daemon-reload systemctl start kafka systemctl enable kafka systemctl status kafka
|
验证Kafka集群
创建测试Topic
1
| /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server kafka-node1:9092 --replication-factor 3 --partitions 3 --topic test
|
查看Topic信息
1
| /opt/kafka/bin/kafka-topics.sh --describe --bootstrap-server kafka-node1:9092 --topic test
|
输出应该显示Topic有3个分区和3个副本:
1 2 3 4
| Topic: test PartitionCount: 3 ReplicationFactor: 3 Configs: Topic: test Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 Topic: test Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1 Topic: test Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
|
发送测试消息
打开一个终端发送消息:
1
| /opt/kafka/bin/kafka-console-producer.sh --broker-list kafka-node1:9092 --topic test
|
输入几条测试消息:
1 2 3
| Hello Kafka Cluster This is a test message Testing replication
|
接收测试消息
在另一个终端接收消息:
1
| /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka-node1:9092 --topic test --from-beginning
|
应该能看到之前发送的所有消息。
Kafka集群性能调优
操作系统调优
编辑/etc/sysctl.conf文件,添加以下配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| # 增加文件描述符限制 fs.file-max=100000
# 网络优化 net.core.somaxconn=65535 net.core.netdev_max_backlog=65535 net.ipv4.tcp_max_syn_backlog=65535 net.ipv4.tcp_fin_timeout=30 net.ipv4.tcp_keepalive_time=300 net.ipv4.tcp_keepalive_probes=5 net.ipv4.tcp_keepalive_intvl=15
# 虚拟内存优化 vm.swappiness=1 vm.dirty_ratio=60 vm.dirty_background_ratio=30
|
应用新的系统配置:
编辑/etc/security/limits.conf文件,增加用户资源限制:
1 2 3 4
| kafka soft nofile 65536 kafka hard nofile 65536 kafka soft nproc 32768 kafka hard nproc 32768
|
JVM调优
编辑/opt/kafka/bin/kafka-server-start.sh文件,修改JVM参数:
1
| export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"
|
Kafka参数优化
以下是一些关键Kafka参数的优化建议,可以添加到server.properties文件中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| # 性能优化 num.replica.fetchers=4 replica.fetch.max.bytes=1048576 replica.fetch.wait.max.ms=500 replica.lag.time.max.ms=10000
# 吞吐量优化 compression.type=producer message.max.bytes=1000000 fetch.message.max.bytes=1048576
# 持久性优化 min.insync.replicas=2 unclean.leader.election.enable=false auto.create.topics.enable=false
|
监控与维护
使用JMX监控Kafka
编辑/etc/systemd/system/kafka.service文件,添加JMX配置:
1
| Environment="KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9999"
|
重新加载systemd配置并重启Kafka:
1 2
| systemctl daemon-reload systemctl restart kafka
|
常用维护命令
查看Topic列表:
1
| /opt/kafka/bin/kafka-topics.sh --list --bootstrap-server kafka-node1:9092
|
增加Topic分区数:
1
| /opt/kafka/bin/kafka-topics.sh --alter --bootstrap-server kafka-node1:9092 --topic test --partitions 6
|
检查消费者组:
1
| /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka-node1:9092 --list
|
查看消费者组详情:
1
| /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka-node1:9092 --describe --group my-group
|
删除Topic:
1
| /opt/kafka/bin/kafka-topics.sh --delete --bootstrap-server kafka-node1:9092 --topic test
|
常见问题与解决方案
Leader选举失败
问题:Kafka集群中某些分区没有Leader。
解决方案:
- 检查ZooKeeper连接状态
- 确保min.insync.replicas配置适当
- 重启有问题的Broker
性能下降
问题:Kafka集群性能突然下降。
解决方案:
- 检查磁盘使用率,可能需要清理旧数据
- 调整JVM堆大小
- 检查网络连接和吞吐量
- 查看GC日志,优化GC参数
Broker无法启动
问题:Kafka Broker无法启动。
解决方案:
- 检查日志文件中的错误信息
- 确保ZooKeeper集群正常运行
- 验证配置文件中的参数是否正确
- 检查磁盘空间和权限
消息丢失
问题:生产者发送的消息在消费者端丢失。
解决方案:
- 设置合适的acks值(acks=all)
- 增加生产者的重试次数(retries)
- 确保min.insync.replicas配置适当
- 关闭unclean.leader.election.enable选项
总结
通过本文的指导,我们成功在CentOS 7环境下搭建了一个由3个节点组成的高可用Kafka集群。我们从环境准备开始,依次完成了ZooKeeper集群安装、Kafka集群部署、配置优化,并进行了集群验证和性能调优。此外,我们还介绍了一些常用的维护命令和常见问题的解决方案。
Kafka集群的稳定运行对于构建高性能的实时数据处理系统至关重要。定期维护和优化集群配置,可以确保Kafka集群在高负载下仍能保持良好的性能和可靠性。
参考资料