前言

Apache Kafka是一个分布式的流处理平台,可用于构建实时数据管道和流式应用程序。本文将详细介绍如何在CentOS 7环境下搭建一个高可用的Kafka集群,包括环境准备、ZooKeeper集群安装、Kafka集群部署、配置优化以及基本的运维操作。

架构规划

本教程将搭建一个由3个节点组成的Kafka集群,同时使用3个节点组成ZooKeeper集群来提供协调服务。

服务器IP 主机名 角色
192.168.1.100 kafka-node1 ZooKeeper + Kafka
192.168.1.101 kafka-node2 ZooKeeper + Kafka
192.168.1.102 kafka-node3 ZooKeeper + Kafka

软件版本

  • CentOS: 7.8
  • JDK: 1.8.0_241
  • ZooKeeper: 3.5.7
  • Kafka: 2.4.0

环境准备

配置主机名与hosts

在所有节点上执行以下操作:

1
2
3
4
5
# 设置主机名
hostnamectl set-hostname kafka-node1 # 根据不同节点设置对应的主机名

# 编辑hosts文件
vi /etc/hosts

添加以下内容到hosts文件:

1
2
3
192.168.1.100 kafka-node1
192.168.1.101 kafka-node2
192.168.1.102 kafka-node3

关闭防火墙和SELinux

为了简化部署过程,我们暂时关闭防火墙和SELinux:

1
2
3
4
5
6
7
# 关闭防火墙
systemctl stop firewalld
systemctl disable firewalld

# 关闭SELinux
setenforce 0
sed -i 's/SELINUX=enforcing/SELINUX=disabled/g' /etc/selinux/config

注意:在生产环境中,应该配置适当的防火墙规则而不是完全关闭防火墙。

安装JDK

Kafka需要Java环境,所以我们需要先安装JDK:

1
2
3
4
5
# 安装OpenJDK
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel

# 验证安装
java -version

或者,如果你希望使用Oracle JDK:

1
2
3
4
5
6
7
8
9
10
11
12
13
# 下载Oracle JDK (需要预先下载jdk-8u241-linux-x64.tar.gz)
mkdir -p /usr/local/java
tar -zxvf jdk-8u241-linux-x64.tar.gz -C /usr/local/java/

# 配置环境变量
echo 'export JAVA_HOME=/usr/local/java/jdk1.8.0_241' >> /etc/profile
echo 'export JRE_HOME=${JAVA_HOME}/jre' >> /etc/profile
echo 'export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib' >> /etc/profile
echo 'export PATH=${JAVA_HOME}/bin:$PATH' >> /etc/profile
source /etc/profile

# 验证安装
java -version

创建Kafka用户

为了安全起见,我们创建一个专门的用户来运行Kafka服务:

1
2
3
4
5
6
7
8
9
# 创建用户和用户组
groupadd kafka
useradd -g kafka kafka

# 创建数据目录
mkdir -p /data/kafka
mkdir -p /data/zookeeper
chown -R kafka:kafka /data/kafka
chown -R kafka:kafka /data/zookeeper

安装ZooKeeper集群

Kafka使用ZooKeeper来存储集群的元数据和消费者信息,所以我们首先需要安装ZooKeeper集群。

下载与解压ZooKeeper

在所有节点上执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
# 下载ZooKeeper
cd /opt
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz

# 解压
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin zookeeper

# 创建数据和日志目录
mkdir -p /data/zookeeper/data
mkdir -p /data/zookeeper/logs
chown -R kafka:kafka /opt/zookeeper
chown -R kafka:kafka /data/zookeeper

配置ZooKeeper

创建ZooKeeper配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 创建配置文件
cat > /opt/zookeeper/conf/zoo.cfg << EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/logs
clientPort=2181
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
server.1=kafka-node1:2888:3888
server.2=kafka-node2:2888:3888
server.3=kafka-node3:2888:3888
EOF

创建myid文件:

在kafka-node1上:

1
echo "1" > /data/zookeeper/data/myid

在kafka-node2上:

1
echo "2" > /data/zookeeper/data/myid

在kafka-node3上:

1
echo "3" > /data/zookeeper/data/myid

创建ZooKeeper服务

创建systemd服务文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
cat > /etc/systemd/system/zookeeper.service << EOF
[Unit]
Description=Apache ZooKeeper
After=network.target

[Service]
Type=forking
User=kafka
Group=kafka
Environment=JAVA_HOME=/usr/lib/jvm/jre
ExecStart=/opt/zookeeper/bin/zkServer.sh start
ExecStop=/opt/zookeeper/bin/zkServer.sh stop
ExecReload=/opt/zookeeper/bin/zkServer.sh restart
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF

重新加载systemd配置并启动ZooKeeper:

1
2
3
4
systemctl daemon-reload
systemctl start zookeeper
systemctl enable zookeeper
systemctl status zookeeper

验证ZooKeeper集群

使用ZooKeeper客户端连接到集群并验证状态:

1
/opt/zookeeper/bin/zkServer.sh status

连接到ZooKeeper服务:

1
/opt/zookeeper/bin/zkCli.sh -server kafka-node1:2181

在ZooKeeper客户端中执行以下命令:

1
ls /

应该看到ZooKeeper的根节点列表。

安装Kafka集群

在确认ZooKeeper集群工作正常后,我们可以安装Kafka集群。

下载与解压Kafka

在所有节点上执行:

1
2
3
4
5
6
7
8
9
10
11
12
# 下载Kafka
cd /opt
wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz

# 解压
tar -zxvf kafka_2.12-2.4.0.tgz
mv kafka_2.12-2.4.0 kafka

# 创建数据目录
mkdir -p /data/kafka/logs
chown -R kafka:kafka /opt/kafka
chown -R kafka:kafka /data/kafka

配置Kafka

在每个节点上创建Kafka配置文件,需要根据节点的不同设置不同的broker.id:

在kafka-node1上(broker.id=1):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
cat > /opt/kafka/config/server.properties << EOF
# Broker基本配置
broker.id=1
listeners=PLAINTEXT://kafka-node1:9092
advertised.listeners=PLAINTEXT://kafka-node1:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# 日志配置
log.dirs=/data/kafka/logs
num.partitions=3
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

# ZooKeeper配置
zookeeper.connect=kafka-node1:2181,kafka-node2:2181,kafka-node3:2181
zookeeper.connection.timeout.ms=6000

# 副本配置
default.replication.factor=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

# 其他优化配置
group.initial.rebalance.delay.ms=3000
EOF

在kafka-node2上(broker.id=2):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
cat > /opt/kafka/config/server.properties << EOF
# Broker基本配置
broker.id=2
listeners=PLAINTEXT://kafka-node2:9092
advertised.listeners=PLAINTEXT://kafka-node2:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# 日志配置
log.dirs=/data/kafka/logs
num.partitions=3
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

# ZooKeeper配置
zookeeper.connect=kafka-node1:2181,kafka-node2:2181,kafka-node3:2181
zookeeper.connection.timeout.ms=6000

# 副本配置
default.replication.factor=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

# 其他优化配置
group.initial.rebalance.delay.ms=3000
EOF

在kafka-node3上(broker.id=3):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
cat > /opt/kafka/config/server.properties << EOF
# Broker基本配置
broker.id=3
listeners=PLAINTEXT://kafka-node3:9092
advertised.listeners=PLAINTEXT://kafka-node3:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# 日志配置
log.dirs=/data/kafka/logs
num.partitions=3
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

# ZooKeeper配置
zookeeper.connect=kafka-node1:2181,kafka-node2:2181,kafka-node3:2181
zookeeper.connection.timeout.ms=6000

# 副本配置
default.replication.factor=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

# 其他优化配置
group.initial.rebalance.delay.ms=3000
EOF

创建Kafka服务

创建systemd服务文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
cat > /etc/systemd/system/kafka.service << EOF
[Unit]
Description=Apache Kafka
After=network.target zookeeper.service

[Service]
Type=simple
User=kafka
Group=kafka
Environment=JAVA_HOME=/usr/lib/jvm/jre
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF

重新加载systemd配置并启动Kafka:

1
2
3
4
systemctl daemon-reload
systemctl start kafka
systemctl enable kafka
systemctl status kafka

验证Kafka集群

创建测试Topic

1
/opt/kafka/bin/kafka-topics.sh --create --bootstrap-server kafka-node1:9092 --replication-factor 3 --partitions 3 --topic test

查看Topic信息

1
/opt/kafka/bin/kafka-topics.sh --describe --bootstrap-server kafka-node1:9092 --topic test

输出应该显示Topic有3个分区和3个副本:

1
2
3
4
Topic: test    PartitionCount: 3    ReplicationFactor: 3    Configs: 
Topic: test Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: test Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2

发送测试消息

打开一个终端发送消息:

1
/opt/kafka/bin/kafka-console-producer.sh --broker-list kafka-node1:9092 --topic test

输入几条测试消息:

1
2
3
Hello Kafka Cluster
This is a test message
Testing replication

接收测试消息

在另一个终端接收消息:

1
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka-node1:9092 --topic test --from-beginning

应该能看到之前发送的所有消息。

Kafka集群性能调优

操作系统调优

编辑/etc/sysctl.conf文件,添加以下配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 增加文件描述符限制
fs.file-max=100000

# 网络优化
net.core.somaxconn=65535
net.core.netdev_max_backlog=65535
net.ipv4.tcp_max_syn_backlog=65535
net.ipv4.tcp_fin_timeout=30
net.ipv4.tcp_keepalive_time=300
net.ipv4.tcp_keepalive_probes=5
net.ipv4.tcp_keepalive_intvl=15

# 虚拟内存优化
vm.swappiness=1
vm.dirty_ratio=60
vm.dirty_background_ratio=30

应用新的系统配置:

1
sysctl -p

编辑/etc/security/limits.conf文件,增加用户资源限制:

1
2
3
4
kafka soft nofile 65536
kafka hard nofile 65536
kafka soft nproc 32768
kafka hard nproc 32768

JVM调优

编辑/opt/kafka/bin/kafka-server-start.sh文件,修改JVM参数:

1
export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"

Kafka参数优化

以下是一些关键Kafka参数的优化建议,可以添加到server.properties文件中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 性能优化
num.replica.fetchers=4
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
replica.lag.time.max.ms=10000

# 吞吐量优化
compression.type=producer
message.max.bytes=1000000
fetch.message.max.bytes=1048576

# 持久性优化
min.insync.replicas=2
unclean.leader.election.enable=false
auto.create.topics.enable=false

监控与维护

使用JMX监控Kafka

编辑/etc/systemd/system/kafka.service文件,添加JMX配置:

1
Environment="KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9999"

重新加载systemd配置并重启Kafka:

1
2
systemctl daemon-reload
systemctl restart kafka

常用维护命令

查看Topic列表:

1
/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server kafka-node1:9092

增加Topic分区数:

1
/opt/kafka/bin/kafka-topics.sh --alter --bootstrap-server kafka-node1:9092 --topic test --partitions 6

检查消费者组:

1
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka-node1:9092 --list

查看消费者组详情:

1
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka-node1:9092 --describe --group my-group

删除Topic:

1
/opt/kafka/bin/kafka-topics.sh --delete --bootstrap-server kafka-node1:9092 --topic test

常见问题与解决方案

Leader选举失败

问题:Kafka集群中某些分区没有Leader。

解决方案

  1. 检查ZooKeeper连接状态
  2. 确保min.insync.replicas配置适当
  3. 重启有问题的Broker

性能下降

问题:Kafka集群性能突然下降。

解决方案

  1. 检查磁盘使用率,可能需要清理旧数据
  2. 调整JVM堆大小
  3. 检查网络连接和吞吐量
  4. 查看GC日志,优化GC参数

Broker无法启动

问题:Kafka Broker无法启动。

解决方案

  1. 检查日志文件中的错误信息
  2. 确保ZooKeeper集群正常运行
  3. 验证配置文件中的参数是否正确
  4. 检查磁盘空间和权限

消息丢失

问题:生产者发送的消息在消费者端丢失。

解决方案

  1. 设置合适的acks值(acks=all)
  2. 增加生产者的重试次数(retries)
  3. 确保min.insync.replicas配置适当
  4. 关闭unclean.leader.election.enable选项

总结

通过本文的指导,我们成功在CentOS 7环境下搭建了一个由3个节点组成的高可用Kafka集群。我们从环境准备开始,依次完成了ZooKeeper集群安装、Kafka集群部署、配置优化,并进行了集群验证和性能调优。此外,我们还介绍了一些常用的维护命令和常见问题的解决方案。

Kafka集群的稳定运行对于构建高性能的实时数据处理系统至关重要。定期维护和优化集群配置,可以确保Kafka集群在高负载下仍能保持良好的性能和可靠性。

参考资料