kafka的诞生,是为了解决linkedin的数据管道问题,起初linkedin采用了ActiveMQ来进行数据交换,大约是在2010年前后,那时的ActiveMQ还远远无法满足linkedin对数据传递系统的要求,经常由于各种缺陷而导致消息阻塞或者服务无法正常访问,为了能够解决这个问题,linkedin决定研发自己的消息传递系统,当时linkedin的首席架构师jay kreps便开始组织团队进行消息传递系统的研发。

参考原文:Kafka集群部署实战

Kafka

Kafka 是一种高性能的分布式消息队列系统,通过合理的配置和管理,可以有效地利用 Kafka 的特性,满足企业对大规模数据流处理的需求。在部署和使用 Kafka 集群时,需要注意网络配置、主机名解析等问题,以确保集群的稳定运行。

Kafka是什么?

Kafka‌是一个分布式流处理平台,最初由LinkedIn开发,后成为Apache开源项目。Kafka主要用于构建实时数据管道和流应用,具有高吞吐量、低延迟和可扩展性等特点‌。Kafka的核心功能包括:

  • 消息发布与订阅‌:Kafka允许应用程序发布和订阅消息流。
  • 分布式日志‌:Kafka将数据持久化到磁盘,支持高吞吐量的数据收集。
  • 流处理‌:支持复杂的流处理操作,如转换、过滤和聚合

kafka 可以脱离 zookeeper 单独使用吗?为什么?

kafka 不能脱离 zookeeper 单独使用,因为 kafka 使用 zookeeper 管理和协调 kafka 的节点服务器

kafka 有几种数据保留的策略?

kafka 有两种数据保存策略:按照过期时间保留按照存储的消息大小保留

kafka 设置了 7 天和 10G 清除数据,第五天消息达 10G,kafka 如何处理?

这个时候 kafka 会执行数据清除工作,时间和大小不论那个满足条件,都会清空数据

什么情况会导致 kafka 运行变慢?

cpu 性能瓶颈
磁盘读写瓶颈
网络瓶颈

使用 kafka 集群需要注意什么?

集群的数量不是越多越好,最好不要超过 7 个,因为节点越多,消息复制需要的时间就越长,整个群组的吞吐量就越低
集群数量最好是单数,因为超过一半故障集群就不能用了,设置为单数容错率更高

Kafka与ZooKeeper的关系

Kafka依赖 ZooKeeper来管理集群元数据、控制器选举和消费者组协调等任务。ZooKeeper 为 Kafka 提供了选主(leader election)、集群成员管理等核心功能,确保Kafka能够在多个节点之间进行有效的通信和管理‌。随着 Kafka 的发展,其对 ZooKeeper的依赖也带来了一些问题:

  • 复杂性增加‌:ZooKeeper 是一个独立的外部组件,增加了运维的复杂度。
  • 性能瓶颈‌:在高负载情况下,ZooKeeper 可能成为系统的瓶颈,限制Kafka的扩展能力。
  • 一致性问题‌:Kafka内部的一致性模型与 ZooKeeper 的一致性模型有所不同,可能导致状态不一致,影响系统的稳定性和消息传递的可靠性‌。

Kafka集群部署

集群运维容量规划建议

指标 推荐值 监控阈值
分区数量/Broker ≤4000 ≥3500 告警
副本同步延迟 <100ms >500ms 告警
磁盘使用率 ≤75% ≥85% 紧急处理
网络吞吐/Broker ≤70% 带宽 ≥85% 限流

单点部署 Kafka

  1. 下载并解压

    1
    2
    3
    4
    # 下载 Kafka
    wget https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
    # 解压 Kafka
    tar xf kafka_2.13-3.9.0.tgz -C /usr/local/
  2. 修改配置文件

    1
    2
    3
    4
    5
    6
    # Kafka 的唯一标识
    broker.id=91
    # 修改数据目录
    log.dirs=/var/lib/kafka
    # 指定 Kafka 的元数据存储在 Zookeeper 集群的路径(znodes)
    zookeeper.connect=10.0.0.91:2181,10.0.0.92:2181,10.0.0.93:2181/oldboyedu-kafka-3.9.0
  3. 配置环境变量

    1
    2
    3
    4
    5
    [root@elk91 ~]# cat /etc/profile.d/kafka.sh
    #!/bin/bash

    export KAFKA_HOME=/usr/local/kafka_2.13-3.9.0
    export PATH=$PATH:$KAFKA_HOME/bin
  4. 启动并验证是否成功

    1
    2
    3
    4
    5
    6
    7
    # 启动 kafka
    source /etc/profile.d/kafka.sh
    kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
    # 验证 Kafka 是否启动成功
    ss -ntl | grep 9092
    # 出现如下则成功
    # LISTEN 0 50 *:9092 *:*

集群部署 Kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 1、拷贝 Kafka 数据到集群节点(92/93)
# [root@elk91 ~]#
scp -r /usr/local/kafka_2.13-3.9.0/ 10.0.0.92:/usr/local/
scp -r /usr/local/kafka_2.13-3.9.0/ 10.0.0.93:/usr/local/
scp /etc/profile.d/kafka.sh 10.0.0.92:/etc/profile.d/
scp /etc/profile.d/kafka.sh 10.0.0.93:/etc/profile.d/

# 2、修改 92/93 节点的配置文件
# [root@elk92 ~]# :2、修改 92 节点的配置文件
sed -i '/^broker.id/s#91#92#' /usr/local/kafka_2.13-3.9.0/config/server.properties
# [root@elk93 ~]# :修改 93 节点的配置文件
sed -i '/^broker.id/s#91#93#' /usr/local/kafka_2.13-3.9.0/config/server.properties

# 3、启动其他节点的 Kafka
# [root@elk92 ~]# :启动92节点的 Kafka
source /etc/profile.d/kafka.sh && kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
# [root@elk93 ~]# :启动93节点的 Kafka
source /etc/profile.d/kafka.sh && kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

# 4、验证 Zookeeper 集群数据是否写入成功
# [root@elk91 ~]#
echo "ls /oldboyedu-kafka-3.9.0" | nc 10.0.0.91 2181

常见问题及解决方案

未知主机异常

在连接 [Kafka 集群](https://so.csdn.net/so/search?q=Kafka 集群&spm=1001.2101.3001.7020)时,出现未知主机异常,日志显示:

1
2
[2025-03-17 10:53:58,604] WARN [Controller id=91, targetBrokerId=93] Error connecting to node elk93:9092 (id: 93 rack: null) (org.apache.kafka.clients.NetworkClient)
java.net.UnknownHostException: elk93

这是由于 Kafka 集群在连接时进行了反向解析,而主机名未正确解析。需要在 Kafka 配置文件中指定监听的 IP 地址:

1
2
3
4
5
6
# 修改配置文件监听的IP地址
listeners=PLAINTEXT://10.0.0.91:9092

# [root@elk91 ~]# 重启kafka
kafka-server-stop.sh
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

Topic 命令执行超时

执行 Kafka Topic 命令时,出现超时错误:

1
Error while executing topic command : Timed out waiting for a node assignment. Call: listTopics

这可能是由于 Kafka 集群节点间网络通信不畅或配置错误导致的。解决方案:

  • 检查 Kafka 集群节点间的网络连通性,确保各节点间可以正常通信。

  • 确保 Kafka 配置文件中的 zookeeper.connect 配置正确,能够连接到 Zookeeper 集群。

  • 检查 Kafka 服务是否正常运行,各节点的监听地址和端口是否正确。


Kafka 常用脚本管理

Topic 管理

1
2
3
4
5
6
7
8
9
10
11
12
# 查看现有的 Topics 列表
kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --list
# 创建指定分区副本的 Topic
kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --create --partitions 2 --replication-factor 3 --topic tml666

# 查看分区的详细信息
kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --topic tml666 --describe
# 修改分区数
kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --alter --partitions 5 --topic tml666

# 删除 Topic
kafka-topics.sh --bootstrap-server 10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092 --delete --topic tml666

Producer 脚本

1
2
3
4
# 创建生产者程序
kafka-console-producer.sh --bootstrap-server 10.0.0.92:9092 --topic producer-001
# 查看现有的 Topics 列表
kafka-topics.sh --bootstrap-server 10.0.0.92:9092 --list

Consumer 脚本

1
2
3
4
# 创建消费者程序
kafka-console-consumer.sh --bootstrap-server 10.0.0.91:9092 --topic producer-001
# 从最老的 offset 获取数据
kafka-console-consumer.sh --bootstrap-server 10.0.0.91:9092 --topic producer-001 --from-beginning

Consumer Group 脚本

1
2
3
4
5
6
# 创建 Topic
kafka-topics.sh --bootstrap-server 10.0.0.92:9092 --create --topic consumer-grop001 --partitions 3 --replication-factor 1
# 创建生产者写入数据
kafka-console-consumer.sh --bootstrap-server 10.0.0.91:9092 --topic consumer-grop001 --consumer-property group.id=xixi --from-beginning
# 查看消费者组的详细信息
kafka-consumer-groups.sh --bootstrap-server 10.0.0.92:9092 --group xixi --describe ;echo