Logstash是一个开源的服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到指定的存储库中。它支持200多个插件,可以从各种数据源(如 Filebeat、 Kafka 、 JDBC 等)采集数据,并通过过滤器进行数据处理(如 Grok解析、字段脱敏、 GeoIP转换等),最后将处理后的数据输出到多个目的地(如 Elasticsearch、 Kafka 、 S3 等)‌。

参考文章:

Elasticsearch系列组件:Logstash强大的日志管理和数据分析工具

ElasticStack对接kafka集群

Logstash

Logstash 是一个开源的数据收集引擎,具有实时管道功能,主要用于日志数据的收集、分析和处理‌。包含3个主要部分: 输入(inputs),过滤器(filters)和输出(outputs)。

基本功能

  1. ‌数据收集‌:可以从多种数据源收集数据,包括日志文件、系统消息队列、数据库等。它支持多种输入插件,如filesyslogredis等,能够灵活地从各种来源捕捉事件‌。
  2. 数据处理‌:可以对收集到的数据进行过滤、转换和格式化。它提供了丰富的过滤器插件,可以对数据进行解析、提取字段、去除无效数据等操作。Logstash还支持对数据进行格式化,以满足不同的存储和处理需求‌。
  3. 数据输出‌:处理后的数据可以输出到多种目标系统,如Elasticsearch 、Kafka 、邮件通知等。Logstash的输出插件支持将数据发送到各种目标,实现了数据的灵活处理和分发‌。
  4. 插件机制‌:提供了丰富的插件机制,用户可以按需安装和使用各种插件,扩展其功能,使其可以灵活适应各种复杂的数据处理场景‌。
  5. 与 Elasticsearch 和 Kibana 的集成‌:作为 Elastic Stack 的一部分,Logstash可以与Elasticsearch和Kibana紧密集成,实现日志的搜索、存储和可视化。这使得Logstash在日志管理和分析中具有强大的功能‌。

Logstash工作原理

Logstash 的工作原理可以分为三个主要步骤:输入(Input)、过滤(Filter)和输出(Output)。

  1. 输入(Input):采集各种样式、大小和来源的数据。Logstash 支持多种类型的输入数据,包括日志文件、系统消息队列、数据库等。Logstash 支持各种输入选择,可同时从众多来源捕捉事件。在配置文件中,你可以指定一个或多个输入源。 本文以beat为例。
  2. 过滤(Filter):实时解析和转换数据。 数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式。Logstash 提供了丰富的筛选器库和功能多样的 Elastic Common Schema。例如:使用 grok 插件来解析非结构化的日志数据,将其转换为结构化的数据。使用 mutate 插件来修改数据,如添加新的字段、删除字段、更改字段的值等。总之,Logstash 能够动态地转换和解析数据,不受格式或复杂度的影响:
    • 利用 Grok 从非结构化数据中派生出结构
    • 从 IP 地址破译出地理坐标
    • 将 PII 数据匿名化,完全排除敏感字段
    • 简化整体处理,不受数据源、格式或架构的影响
  3. 输出(Output):处理后的数据可以被发送到一个或多个目标。Logstash 提供众多输出选择,支持多种类型的输出目标,包括 Elasticsearch、Kafka、邮件通知等,并且能够灵活地解锁众多下游用例。本文以Elasticsearch 为例。

这三个步骤是在 Logstash 的事件处理管道中顺序执行的。每个事件(例如,一行日志数据)都会经过输入、过滤和输出这三个步骤。在过滤阶段,如果一个事件被过滤器丢弃,那么它将不会被发送到输出目标。

image-20250701104045193

Logstash执行模型

是的,你的理解是正确的。Logstash 的执行模型主要包括以下几个步骤:

  1. 每个 Input 启动一个线程:Logstash 会为每个输入插件启动一个线程,这些线程并行运行,从各自的数据源获取数据。

  2. 数据写入队列:输入插件获取的数据会被写入一个队列。默认情况下,这是一个存储在内存中的有界队列,如果 Logstash 意外停止,队列中的数据会丢失。为了防止

数据丢失

,Logstash 提供了两个特性:

  • Persistent Queues:这个特性会将队列存储在磁盘上,即使 Logstash 意外停止,队列中的数据也不会丢失。
  • Dead Letter Queues:这个特性会保存无法处理的事件。需要注意的是,这个特性只支持 Elasticsearch 作为输出源。
  1. 多个 Pipeline Worker 处理数据:Logstash 会启动多个 Pipeline Worker,每个 Worker 会从队列中取出一批数据,然后执行过滤器和输出插件。Worker 的数量和每次处理的数据量可以在配置文件中设置。

Windows下安装启动

  1. 最新版本下载链接,以及历史版本的下载链接:Past Releases of Elastic Stack Software | Elastic或者Download Logstash Free | Get Started Now | Elastic。注意:确保 Logstash 与 Elasticsearch 版本一致。

  2. 下载完成后,解压。创建配置文件 my_logstash.conf ,名字自定义,放在“home”下,和logstash解压后形成的bin,config,jdk等目录文件夹同层级,内容:

    1
    2
    3
    4
    5
    6
    7
    8
    input {
    stdin{
    }
    }
    output {
    stdout{
    }
    }
  3. 启动logstash,启动命令:

    1
    logstash -f my_logstash.conf # 其中logstash是位于bin目录下的Windows执行文件logstash.bat。
  4. 启动成功,那么就可以在浏览器打开地址 http://localhost:9600/,会返回当前logstash的基础信息。


Logstash配置说明

1、Logstash配置介绍

Logstash 的配置主要分为两部分:Pipeline 配置文件Settings 配置文件

  1. Pipeline 配置文件:这是 Logstash 的核心配置,用于定义数据处理的流程,包括输入(input)、过滤(filter)和输出(output)三个部分。每个部分都可以使用多种插件来完成特定的任务。例如,输入部分可以使用 file 插件从文件中读取数据,过滤部分可以使用 grok 插件解析日志,输出部分可以使用 elasticsearch 插件将数据发送到 Elasticsearch。
  2. Settings 配置文件:这是 Logstash 的全局配置,通常在 logstash.yml 文件中设置。这些配置包括 Logstash 实例的名称、数据存储路径、配置文件路径、自动重载配置、工作线程数量等。

这两部分的配置都是以 YAML 格式编写的,可以使用文本编辑器进行编辑。在 Logstash 启动时,它会首先读取 Settings 配置文件,然后加载并执行 Pipeline 配置文件。

2、Pipeline配置文件-输入

在 Pipeline 配置文件中,输入(input)部分定义数据来源。Logstash 提供了多种输入插件,可以从各种数据源读取数据。常用插件:

  1. file:从文件中读取数据。常用的配置项包括 path(文件路径)和 start_position(开始读取的位置)。

    1
    2
    3
    4
    5
    6
    input {
    file {
    path => "/path/to/your/logfile"
    start_position => "beginning"
    }
    }
  2. beats:从 Beats 客户端(如 FilebeatMetricbeat 等)接收数据。常用的配置项包括 port(监听的端口号)。

    1
    2
    3
    4
    5
    input {
    beats {
    port => 5044
    }
    }
  3. http:通过 HTTP 请求接收数据。常用的配置项包括 port(监听的端口号)。

    1
    2
    3
    4
    5
    input {
    http {
    port => 8080
    }
    }
  4. jdbc:从数据库中读取数据。常用的配置项包括 jdbc_driver_library(JDBC 驱动的路径)、jdbc_driver_class(JDBC 驱动的类名)、jdbc_connection_string(数据库连接字符串)、jdbc_user(数据库用户名)和 jdbc_password(数据库密码)。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    input {
    jdbc {
    jdbc_driver_library => "/path/to/your/jdbc/driver"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/yourdatabase"
    jdbc_user => "yourusername"
    jdbc_password => "yourpassword"
    }
    }
  5. kafka:在这个配置中,bootstrap_servers 参数指定了 Kafka 服务器的地址和端口,topics 参数指定从哪个主题读取数据。kafka 输入插件还有许多其他的配置项,你可以根据实际需求进行设置。例如,你可以设置 group_id 参数来指定消费者组,设置 auto_offset_reset 参数来指定在没有初始偏移量或当前偏移量不存在时该如何定位消费位置等。具体的配置项和可能的值,你可以在 Logstash 的官方文档中找到。

    1
    2
    3
    4
    5
    6
    input {
    kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["your_topic"]
    }
    }

注意:可以在一个配置文件中定义多个输入,Logstash 会并行处理所有的输入

3、Pipeline配置文件-过滤

在 Logstash 的 Pipeline 配置文件中,过滤(filter)部分定义了数据处理的规则。过滤器插件可以对数据进行各种操作,如解析、转换、添加和删除字段等。常用的过滤插件及其操作:

  1. grok:grok 过滤器用于解析非结构化的日志数据,将其转换为结构化的数据。它使用模式匹配的方式来解析文本,每个模式是一个名字和正则表达式的组合。例如,grok 过滤器尝试将 message字段的内容匹配为 COMBINEDAPACHELOG 模式,这是一个预定义的模式,用于解析 Apache 日志,配置如下:

    1
    2
    3
    4
    5
    filter {
    grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    }
  2. mutate:mutate 过滤器用于修改事件数据,如添加新的字段、删除字段、更改字段的值等。例如,mutate 过滤器向每个事件添加一个名为 new_field 的新字段,字段的值为 new_value,配置如下:

    1
    2
    3
    4
    5
    filter {
    mutate {
    add_field => { "new_field" => "new_value" }
    }
    }
  3. date:date 过滤器用于解析日期和时间信息,将其转换为 Logstash 的 @timestamp 字段。例如,在这个配置中,date 过滤器会尝试将 timestamp 字段的内容匹配为指定的日期和时间格式,配置如下:

    1
    2
    3
    4
    5
    filter {
    date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
    }

注意:可以在一个配置文件中定义多个过滤器,Logstash 会按照配置文件中的顺序依次执行这些过滤器。

4、Pipeline配置文件-输出

在 Logstash 的 Pipeline 配置文件中,输出(output)部分定义了处理后的数据应该发送到哪里。Logstash 提供了多种输出插件,可以将数据发送到各种目标。常用的输出插件:

  1. elasticsearch:将数据发送到 Elasticsearch。常用配置项包括 hosts(Elasticsearch 服务器地址和端口)和 index(索引名称)。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    output {
    elasticsearch {
    hosts => ["localhost:9200"] # Elasticsearch 服务器地址和端口
    index => "my-index-%{+YYYY.MM.dd}" # 索引名称
    # 枚举了一些其他的配置
    user => "your username" # 用户名
    password => "your password" # 用户密码
    manage_template => true # Logstash将管理Elasticsearch的模板
    template_overwrite => true # 模板已存在则覆盖
    template => "/path/to/your/template.json" # 参数指向模板文件的路径
    }
    }
  2. file:将数据写入到文件。常用的配置项包括 path(文件路径)。

    1
    2
    3
    4
    5
    output {
    file {
    path => "/path/to/your/file"
    }
    }
  3. stdout:将数据输出到标准输出。常用的配置项包括 codec(编码格式),常用的值有 rubydebug(以 Ruby 的调试格式输出)。

    1
    2
    3
    4
    5
    output {
    stdout {
    codec => rubydebug
    }
    }
  4. kafka:将数据发送到 Kafka。常用的配置项包括 bootstrap_servers(Kafka 服务器的地址和端口)和 topic_id(主题名称)。

    1
    2
    3
    4
    5
    6
    output {
    kafka {
    bootstrap_servers => "localhost:9092"
    topic_id => "your_topic"
    }
    }

注意:可以在一个配置文件中定义多个输出,Logstash 会将每个事件发送到所有的输出

5、Settings配置文件

Logstash 的 Settings 配置文件通常是 logstash.yml,这是 Logstash 的全局配置文件,用于设置 Logstash 运行的一些基本参数。具体的配置项和可能的值,可以在 Logstash 的官方文档中找到。以下列举了一些常见的配置项:

1
2
3
4
5
6
7
node.name: test                      # 设置 Logstash 实例的名称,默认值为当前主机的主机名
path.data: /var/lib/logstash # 设置 Logstash 存储持久化数据的路径,默认值为 Logstash 安装目录下的 data 文件夹
path.config: /etc/logstash/conf.d/*.conf # 设置 Pipeline 配置文件的路径。
config.reload.automatic: true # true 表示 Logstash 会自动检测 Pipeline 配置文件的更改,并重新加载配置。
pipeline.workers: 2 # 设置处理事件的工作线程数量,通常设置为机器的 CPU 核心数。
pipeline.batch.size: 125 # 设置每个批处理的事件数量,增大这个值可以提高吞吐量,但也会增加处理延迟。
pipeline.batch.delay: 50 # 设置两个批处理之间的最大等待时间(以毫秒为单位)。

Logstash使用示例

Logstash Hello world

在这个示例中,Logstash 使用标准输入作为输入源,标准输出作为输出目标,且不指定任何过滤器。命令行操作如下:

1
2
cd logstash-8.10.2
bin/logstash -e 'input { stdin { } } output { stdout {} }'

参数解析:

  • -e 参数用于指定 Pipeline 配置
  • input { stdin { } } 表示使用标准输入作为输入源
  • output { stdout {} } 表示使用标准输出作为输出目标

Logstash 启动成功后,在控制台输入一些文本,然后 Logstash 会将这些文本作为事件数据处理。 另外, Logstash 会自动为每个事件添加一些字段,如 @versionhost@timestamp,然后将处理后的事件输出到标准输出。 例如,输入 “hello world” ,可能输出:

1
2
3
4
5
6
{
"@version": "1",
"host": "localhost",
"@timestamp": "2018-09-18T12:39:38.514Z",
"message": "hello world"
}

日志格式处理

日志内容作为一个整体被存放在 message 字段中,对后续存储及查询都极为不便。可以为该 pipeline 指定一个 grok filter 来对日志格式进行处理。在 first-pipeline.conf 中增加 filter 配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
input { stdin { } }
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
}
output {
stdout { codec => rubydebug } # 用于美化输出
elasticsearch { # Elasticsearch 配置
hosts => [ "localhost:9200" ]
topic_id => "logstash"
}
}

如果需要将数据导入Elasticsearch,只需要在 pipeline 配置文件中增加 Elasticsearch 的 output 即可

  1. 验证配置(注意指定配置文件的路径):

    1
    ./bin/logstash -f first-pipeline.conf --config.test_and_exit 
  2. 启动命令:其中 --config.reload.automatic 选项启用动态重载配置功能

    1
    ./bin/logstash -f first-pipeline.conf --config.reload.automatic 
  3. 预期结果:

    1
    127.0.0.1 - - [28/Sep/2021:10:00:00 +0800] "GET /test.html HTTP/1.1" 200 2326 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"

    这条日志记录了一个 HTTP 请求的信息,包括客户端 IP 地址、请求时间、请求方法和 URL、HTTP 版本、响应状态码、响应体的字节数、Referer 和 User-Agent 等。 我们可以将这个日志作为输入,然后 Logstash 会使用我们的配置来处理这个日志。处理后的结果会被输出到标准输出,格式为 Ruby 的调试格式。

  4. 查询 Elasticsearch 确认数据是否正常上传:

    1
    curl -XGET 'http://localhost:9200/logstash/_search?pretty&q=response=200'

    如果已经配置了kibana,也可以使用 Kibana 查看:http://localhost:5601

Logstash的Grok模式

参考原文链接:https://zhangphil.blog.csdn.net/article/details/125172132

从字符串中匹配并提取中括号关键词后面的数据到指定字段,如[TASKID:***]格式提取到 task_id。在grok debugger里面运行:

1
2
3
4
5
6
7
8
// Sample Data
[DEBUG] [TASKID:1a-2b_3c] [DATA]
// Grok Pattern 在grok里面写正则
(?<task_id>(?<=\[TASKID:).*?(?=\]))
// Structure Data 输出的结果
{
"task id":"1a-2b_3c"
}

一些命令:

1
2
3
4
5
6
7
8
结果都最终存放到result字段:
(?<result>(.*)(?=myend)/?) 提取myend之前的数据
(?<result>(?=mybegin)(.*)/?) 提取mybegin之后的数据
(?<result>(?<=mybegin).*?(?=myend)) 提取mybegin和myend之间的数据,不包含mybegin和myend
(?<result>(taga).*?(?=tagb)) 提取包含taga但不包含tagb的数据
(?<result>(?<=taga).*?(tagb)) 提取内容不包含taga但包含tagb
(?<result>(taga).*?(tagb|tagc)) 提取以taga开头,以tagb或tagc结尾的、所有包含头尾的数据
(?<result>(taga).*?(?=(tagb|tagc))) 提取以taga开头,以tagb或tagc结尾的不包含头尾的数据

Logstash上报数据到ElasticSearch

安装好logstash和elasticsearch后,配置logstash的启动加载文件中的output部分,指明logstash上报到elasticsearch:

参考原文链接:https://zhangphil.blog.csdn.net/article/details/126179168

1
2
3
4
5
6
7
8
9
10
11
12
13
14
output {
elasticsearch {
hosts => ["http://localhost:9200"]
user => "your username"
password => "your password"
index => "fly-%{+YYYY.MM.dd}"
template_overwrite => "false"
}

stdout{
codec => json {
}
}
}