ElasticStack Kafka 缓冲区
Elastic Stack(原ELK Stack)是一套由Elastic公司开发的开源数据搜索、分析和可视化工具集合,核心组件包括Elasticsearch、Kibana、Beats和Logstash,广泛应用于日志分析、安全监控、业务智能等领域。 Logstash 能够动态地采集、转换和传输数据,不受格式或复杂度的影响。利用 Grok 从非结构化数据中派生出结构,从 IP 地址解码出地理坐标,匿名化或排除敏感字段,并简化整体处理过程。
参考文章:
Kafka
简介
Kafka 是一种高性能的分布式消息队列系统,通过合理的配置和管理,可有效利用 Kafka 特性,满足企业对大规模数据流处理的需求。
- 高吞吐
(海量读写数据,缺点:不支持对象类型传输...)、分布式消息系统 - 概念:
Producer:生产者(消息的来源)Consumer:消费者(消息输出)Topic:主题(消息传递的约定) - 消息系统介绍 一个消息系统负责将数据从一个应用传递到另外一个应用, 应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。 有两种主要的消息传递模式:
点对点传递模式、发布-订阅模式。(kafka采用) 
ELK 为什么要结合Kafka
ELK(Elasticsearch、Logstash、Kibana)结合 Kafka的主要原因是为了解决日志采集和处理的性能和扩展性问题。 在Kafka中,每个topic都有一个或多个partition,而每个partition都有一个Leader副本。Leader副本负责处理该partition的所有读写请求。当Kafka集群中的节点发生故障时,Leader副本会进行选举,以确保集群的稳定性和数据的持久性。
Kafka在ELK架构中的作用
- 数据缓冲和扩展性:Kafka 作为一个高吞吐量的分布式消息系统,能够均衡网络传输,降低网络闭塞的风险,确保数据不丢失,并且为系统之间的解耦提供了更好的灵活性和扩展性。
 - 异步处理:通过 Kafka 进行日志的异步传输,可以减少对本地磁盘I/O的影响,提高系统的整体性能和稳定性。
 - 峰值处理能力:Kafka 能够使关键组件顶住突发的访问压力,避免因突发超负荷请求而崩溃。
 
Kafka在日志处理流程中的具体作用
- 日志收集: Filebeat收集日志,并将其发送到Kafka。
 - 日志传输:Kafka作为消息队列,缓存日志数据,确保数据的可靠传输和存储。
 - 日志处理: Logstash订阅 Kafka 的主题,获取日志消息内容,并将其格式化和存储到 Elasticsearch 中。
 
Kafka集群部署
单点部署 Kafka
1  | # 1.下载并解压  | 
集群部署 Kafka
1  | # 1、拷贝 Kafka 数据到集群节点(92/93)  | 
ELK 对接Kafka集群
ELK(Elasticsearch、Logstash、Kibana)。
此处探讨如何通过这一架构优化,实现高效、可靠且可扩展的日志处理解决方案,以应对日益增长的数据量和复杂多变的业务需求,同时减轻Logstash压力并降低其与Filebeat的耦合性,提升整个系统的性能与稳定性,为企业的数据驱动决策提供坚实的技术支撑。
Kafka集群特性适配 :Kafka具备高吞吐量(如单机每秒可处理10w + /s)、高可用性(通过多副本机制保障数据不丢失)、强扩展性(可方便地进行集群扩展以应对数据增长)以及丰富的生态集成能力(与多种编程语言和工具兼容良好)等特点,使其成为对接ELK的理想选择。其强大的消息队列功能能够很好地满足日志数据海量、实时性要求高的处理需求,确保数据在产生端(如Filebeat采集的日志)和消费端(如Logstash后续处理)之间的高效流转和可靠存储。
架构图解

1  | 为了减轻Logstash压力以及Logstash和filebeat的耦合性,考虑在Logstash前面加一套MQ集群。  | 
架构实现
首先需要有一套kafka集群,Elasticsearch、Logstash、Kibana都已安装。
filebeat生产kafka集群数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27# 编写filebeat并启动
[root@elk93 /etc/filebeat/config]# cat filebeat_tcp-to-kafka.yaml .
filebeat.inputs:
- type: tcp
host: "0.0.0.0:9000"
# 数据输出到kafka
output.kafka:
# 指定kafka集群的地址
hosts:
- 10.0.0.91:9092
- 10.0.0.92:9092
- 10.0.0.93:9092
# 指定topic
topic: novacao-linux96-kafka
[root@elk93 /etc/filebeat/config]# rm -rf /var/lib/filebeat/
[root@elk93 /etc/filebeat/config]# filebeat -e -c `pwd`/filebeat_tcp-to-kafka.yaml
# 发送测试数据
[root@elk91 ~]# echo helllllllllllllllo |nc 10.0.0.93 9000
# kafka验证数据
[root@elk92 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.93:9092 --topic novacao-linux96-kafka --from-beginning
.....
{"@timestamp":"2025-03-17T12:35:18.320Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.17.28"},"log":{"source":{"address":"10.0.0.91:55810"}},"input":{"type":"tcp"},"agent":{"name":"elk93","type":"filebeat","version":"7.17.28","hostname":"elk93","ephemeral_id":"73d1dee2-d555-4955-b689-75d602e1b5e0","id":"ced21de3-ed8a-4601-acba-07f0d7db5a5a"},"ecs":{"version":"1.12.0"},"host":{"name":"elk93"},"message":"helllllllllllllllo"}Logstash消费kafka集群数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66# kibana基于开发工具创建账号
POST /_security/api_key
{
"name": "Linux96",
"role_descriptors": {
"filebeat_monitoring": {
"cluster": ["all"],
"index": [
{
"names": ["novacao-logstash-kafka*"],
"privileges": ["all"]
}
]
}
}
}
# 生成实例
{
"id" : "QSYgpJUBD3ll3qToqN4V",
"name" : "Linux96",
"api_key" : "EWyBlHEHTnSQlALuB41hpw",
"encoded" : "UVNZZ3BKVUJEM2xsM3FUb3FONFY6RVd5QmxIRUhUblNRbEFMdUI0MWhwdw=="
}
# 解码数据
[root@elk91 ~]# echo UVNZZ3BKVUJEM2xsM3FUb3FONFY6RVd5QmxIRUhUblNRbEFMdUI0MWhwdw== |base64 -d ;echo
QSYgpJUBD3ll3qToqN4V:EWyBlHEHTnSQlALuB41hpw
# Logstash消费数据
[root@elk93 /etc/logstash/conf.d]# cat 09-logstash-to-ES_api-keys.conf
input {
kafka {
# 指定kafka集群的地址
bootstrap_servers => "10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092"
# 指定从kafka哪个topic拉取数据
topics => ["novacao-linux96-kafka"]
# 指定消费者组
group_id => "linux96-001"
# 指定拉取数据offset的位置点,常用值:earliest(从头拉取数据),latest(从最新的位置拉取数据)
auto_offset_reset => "earliest"
}
}
filter {
json {
source => "message"
}
mutate {
remove_field => [ "agent","@version","ecs","input","log" ]
}
}
output {
# stdout {
# codec => rubydebug
# }
elasticsearch {
hosts => ["10.0.0.91:9200","10.0.0.92:9200","10.0.0.93:9200"]
index => "novacao-logstash-kafka"
api_key => "QSYgpJUBD3ll3qToqN4V:EWyBlHEHTnSQlALuB41hpw"
ssl => true
ssl_certificate_verification => false
}
}
# 启用logstash
[root@elk93 /etc/logstash/conf.d]# logstash -rf 09-logstash-to-ES_api-keys.confKibana查看数据:http://localhost:5601/




