‌Elastic Stack(原ELK Stack)是一套由Elastic公司开发的开源数据搜索、分析和可视化工具集合,核心组件包括Elasticsearch、Kibana、Beats和Logstash‌,广泛应用于日志分析、安全监控、业务智能等领域。‌‌‌‌ Logstash 能够动态地采集、转换和传输数据,不受格式或复杂度的影响。利用 Grok 从非结构化数据中派生出结构,从 IP 地址解码出地理坐标,匿名化或排除敏感字段,并简化整体处理过程。

参考文章:

ElasticSearch深入解析(一):Elastic Stack全景

ElasticStack对接kafka集群

Kafka

简介

Kafka 是一种高性能的分布式消息队列系统,通过合理的配置和管理,可有效利用 Kafka 特性,满足企业对大规模数据流处理的需求。

  • 高吞吐(海量读写数据,缺点:不支持对象类型传输...)分布式消息系统
  • 概念:Producer:生产者(消息的来源) Consumer:消费者(消息输出) Topic:主题(消息传递的约定)
  • 消息系统介绍 一个消息系统负责将数据从一个应用传递到另外一个应用, 应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。 有两种主要的消息传递模式:点对点传递模式、 发布-订阅模式。(kafka采用)

ELK 为什么要结合Kafka

‌ELK(Elasticsearch、Logstash、Kibana)结合 Kafka的主要原因是为了解决日志采集和处理的性能和扩展性问题‌。 在Kafka中,每个topic都有一个或多个partition,而每个partition都有一个Leader副本。Leader副本负责处理该partition的所有读写请求。当Kafka集群中的节点发生故障时,Leader副本会进行选举,以确保集群的稳定性和数据的持久性。

Kafka在ELK架构中的作用

  • 数据缓冲和扩展性‌:Kafka 作为一个高吞吐量的分布式消息系统,能够均衡网络传输,降低网络闭塞的风险,确保数据不丢失,并且为系统之间的解耦提供了更好的灵活性和扩展性‌。
  • 异步处理‌:通过 Kafka 进行日志的异步传输,可以减少对本地磁盘I/O的影响,提高系统的整体性能和稳定性‌。
  • 峰值处理能力‌:Kafka 能够使关键组件顶住突发的访问压力,避免因突发超负荷请求而崩溃‌。

Kafka在日志处理流程中的具体作用

  • 日志收集‌: Filebeat收集日志,并将其发送到Kafka‌。
  • 日志传输‌:Kafka作为消息队列,缓存日志数据,确保数据的可靠传输和存储‌。
  • 日志处理‌: Logstash订阅 Kafka 的主题,获取日志消息内容,并将其格式化和存储到 Elasticsearch 中‌。

Kafka集群部署

单点部署 Kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 1.下载并解压
# 下载 Kafka
wget https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
# 解压 Kafka
tar xf kafka_2.13-3.9.0.tgz -C /usr/local/

# 2.修改配置文件
# Kafka 的唯一标识
broker.id=91
# 修改数据目录
log.dirs=/var/lib/kafka
# 指定 Kafka 的元数据存储在 Zookeeper 集群的路径(znodes)
zookeeper.connect=10.0.0.91:2181,10.0.0.92:2181,10.0.0.93:2181/oldboyedu-kafka-3.9.0

# 3.配置环境变量
[root@elk91 ~]# cat /etc/profile.d/kafka.sh
#!/bin/bash

export KAFKA_HOME=/usr/local/kafka_2.13-3.9.0
export PATH=$PATH:$KAFKA_HOME/bin

# 4.启动并验证是否成功
# 启动 kafka
source /etc/profile.d/kafka.sh
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
# 验证 Kafka 是否启动成功
ss -ntl | grep 9092
# 出现如下则成功
# LISTEN 0 50 *:9092 *:*

集群部署 Kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 1、拷贝 Kafka 数据到集群节点(92/93)
# [root@elk91 ~]#
scp -r /usr/local/kafka_2.13-3.9.0/ 10.0.0.92:/usr/local/
scp -r /usr/local/kafka_2.13-3.9.0/ 10.0.0.93:/usr/local/
scp /etc/profile.d/kafka.sh 10.0.0.92:/etc/profile.d/
scp /etc/profile.d/kafka.sh 10.0.0.93:/etc/profile.d/

# 2、修改 92/93 节点的配置文件
# [root@elk92 ~]# :2、修改 92 节点的配置文件
sed -i '/^broker.id/s#91#92#' /usr/local/kafka_2.13-3.9.0/config/server.properties
# [root@elk93 ~]# :修改 93 节点的配置文件
sed -i '/^broker.id/s#91#93#' /usr/local/kafka_2.13-3.9.0/config/server.properties

# 3、启动其他节点的 Kafka
# [root@elk92 ~]# :启动92节点的 Kafka
source /etc/profile.d/kafka.sh && kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
# [root@elk93 ~]# :启动93节点的 Kafka
source /etc/profile.d/kafka.sh && kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

# 4、验证 Zookeeper 集群数据是否写入成功
# [root@elk91 ~]#
echo "ls /oldboyedu-kafka-3.9.0" | nc 10.0.0.91 2181

ELK 对接Kafka集群

ELK(Elasticsearch、Logstash、Kibana)。

此处探讨如何通过这一架构优化,实现高效、可靠且可扩展的日志处理解决方案,以应对日益增长的数据量和复杂多变的业务需求,同时减轻Logstash压力并降低其与Filebeat的耦合性,提升整个系统的性能与稳定性,为企业的数据驱动决策提供坚实的技术支撑。

Kafka集群特性适配 :Kafka具备高吞吐量(如单机每秒可处理10w + /s)、高可用性(通过多副本机制保障数据不丢失)、强扩展性(可方便地进行集群扩展以应对数据增长)以及丰富的生态集成能力(与多种编程语言和工具兼容良好)等特点,使其成为对接ELK的理想选择。其强大的消息队列功能能够很好地满足日志数据海量、实时性要求高的处理需求,确保数据在产生端(如Filebeat采集的日志)和消费端(如Logstash后续处理)之间的高效流转和可靠存储。

架构图解

image-20250630173309605

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
为了减轻Logstash压力以及Logstash和filebeat的耦合性,考虑在Logstash前面加一套MQ集群。
所谓的MQ,指的是Message Queue,即消息队列。但是这种架构无疑是给系统增加了负担:
1.MQ不存在单点问题;
2.MQ具有很强的处理数据能力;
3.增加了集群的整体复杂性,运维和开发的同学都得增加学习成本;

也就是说,这意味消息队列要提供以下特性:
1.MQ集群吞吐量大,能够承担数据的读写;5台32core,32GB读取处理消息数量23w/s,写速度可以达到220m/s,
2.MQ集群要提供非常强的高可用性,不能是单点的故障;
3.文档丰富,社区资源丰富;

市面上有很多MQ产品,典型代表有:
RocketMQ【阿里巴巴,有社区版(功能较差,文档不够丰富,仅支持Java相关的API)和SAAS版本(功能强,需要花钱),性能很好,单机每秒能够处理10w+/s】
ActiveMQ【老牌系统,文档相对丰富,性能一般,单机每秒处理1w+/s】
Kafka【日志收集,大数据分析,性能非常好,单机每秒处理10w+/s,存在丢失数据的风险,但可以忽略不计,API文档非常丰富,基于Java和Scala语言研发,二次开发比较方便,社区完善了Golang,Python等API】
RabbitMQ【金融公司,文档丰富,性能较好,单机每秒处理1w+/s,可以做到数据不丢失,API开发相对来说不太友好,基于Erlang语言研发,国内并不流行,因此二次开发招人比较困难。】

架构实现

首先需要有一套kafka集群,Elasticsearch、Logstash、Kibana都已安装。

  1. filebeat生产kafka集群数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    # 编写filebeat并启动
    [root@elk93 /etc/filebeat/config]# cat filebeat_tcp-to-kafka.yaml .
    filebeat.inputs:
    - type: tcp
    host: "0.0.0.0:9000"

    # 数据输出到kafka
    output.kafka:
    # 指定kafka集群的地址
    hosts:
    - 10.0.0.91:9092
    - 10.0.0.92:9092
    - 10.0.0.93:9092

    # 指定topic
    topic: novacao-linux96-kafka

    [root@elk93 /etc/filebeat/config]# rm -rf /var/lib/filebeat/
    [root@elk93 /etc/filebeat/config]# filebeat -e -c `pwd`/filebeat_tcp-to-kafka.yaml

    # 发送测试数据
    [root@elk91 ~]# echo helllllllllllllllo |nc 10.0.0.93 9000

    # kafka验证数据
    [root@elk92 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.93:9092 --topic novacao-linux96-kafka --from-beginning
    .....
    {"@timestamp":"2025-03-17T12:35:18.320Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.17.28"},"log":{"source":{"address":"10.0.0.91:55810"}},"input":{"type":"tcp"},"agent":{"name":"elk93","type":"filebeat","version":"7.17.28","hostname":"elk93","ephemeral_id":"73d1dee2-d555-4955-b689-75d602e1b5e0","id":"ced21de3-ed8a-4601-acba-07f0d7db5a5a"},"ecs":{"version":"1.12.0"},"host":{"name":"elk93"},"message":"helllllllllllllllo"}
  2. Logstash消费kafka集群数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    # kibana基于开发工具创建账号
    POST /_security/api_key
    {
    "name": "Linux96",
    "role_descriptors": {
    "filebeat_monitoring": {
    "cluster": ["all"],
    "index": [
    {
    "names": ["novacao-logstash-kafka*"],
    "privileges": ["all"]
    }
    ]
    }
    }
    }
    # 生成实例
    {
    "id" : "QSYgpJUBD3ll3qToqN4V",
    "name" : "Linux96",
    "api_key" : "EWyBlHEHTnSQlALuB41hpw",
    "encoded" : "UVNZZ3BKVUJEM2xsM3FUb3FONFY6RVd5QmxIRUhUblNRbEFMdUI0MWhwdw=="
    }

    # 解码数据
    [root@elk91 ~]# echo UVNZZ3BKVUJEM2xsM3FUb3FONFY6RVd5QmxIRUhUblNRbEFMdUI0MWhwdw== |base64 -d ;echo
    QSYgpJUBD3ll3qToqN4V:EWyBlHEHTnSQlALuB41hpw

    # Logstash消费数据
    [root@elk93 /etc/logstash/conf.d]# cat 09-logstash-to-ES_api-keys.conf
    input {
    kafka {
    # 指定kafka集群的地址
    bootstrap_servers => "10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092"
    # 指定从kafka哪个topic拉取数据
    topics => ["novacao-linux96-kafka"]
    # 指定消费者组
    group_id => "linux96-001"
    # 指定拉取数据offset的位置点,常用值:earliest(从头拉取数据),latest(从最新的位置拉取数据)
    auto_offset_reset => "earliest"
    }
    }
    filter {
    json {
    source => "message"
    }

    mutate {
    remove_field => [ "agent","@version","ecs","input","log" ]
    }
    }
    output {
    # stdout {
    # codec => rubydebug
    # }
    elasticsearch {
    hosts => ["10.0.0.91:9200","10.0.0.92:9200","10.0.0.93:9200"]
    index => "novacao-logstash-kafka"
    api_key => "QSYgpJUBD3ll3qToqN4V:EWyBlHEHTnSQlALuB41hpw"
    ssl => true
    ssl_certificate_verification => false
    }
    }

    # 启用logstash
    [root@elk93 /etc/logstash/conf.d]# logstash -rf 09-logstash-to-ES_api-keys.conf
  3. Kibana查看数据http://localhost:5601/