# Logstash
:一个数据处理程序。可以收集多种格式的数据,加工处理之后,写入多种数据库。
- 采用 Ruby 语言开发,通过 JRuby 解释器运行在 JVM 上。
- 2009 年,Jordan Sissel 发布了 Logstash ,成为了流行的日志采集工具,也可处理其它类型的数据。
- 2013 年,Logstash 被 Elastic 公司收购,组成了 ELK 系统。
# 部署
下载二进制版:
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.10.0-linux-x86_64.tar.gz
然后启动:
bin/logstash -f PATH # --path.config ,指定配置文件的目录,会加载其下所有 *.conf 文件 -e STRING # --config.string ,传入一个字符串作为配置 --log.level=info # 指定日志等级 -V # 显示版本号 -r # --config.reload.automatic ,自动加载发生变化的配置文件 --config.reload.interval 3 # 默认每隔 3 秒检查一次配置文件是否变化
- 如果启用了 -r 选项,则 logstash 检查到配置文件发生变化时,会重新加载。
- 重新加载时,logstash 会根据新的配置文件,尝试创建新管道并使用。如果新管道可以正常运行(比如语法没报错),则用新管道替换旧管道。否则,继续运行旧管道。
- 用
kill -SIGHUP
发送信号,也会使其重新加载一次配置文件。 - 插件不一定能重新加载,可能依然需要重启 logstash 。
- 如果启用了 -r 选项,则 logstash 检查到配置文件发生变化时,会重新加载。
或者用 docker-compose 部署:
version: '3' services: logstash: container_name: logstash image: logstash:7.10.1 restart: unless-stopped ports: - 5044:5044 volumes: - ./config:/usr/share/logstash/config - ./data:/usr/share/logstash/data
- 容器内以非 root 用户运行服务,需要调整挂载目录的权限:
mkdir -p config data chown -R 1000 .
- 容器内以非 root 用户运行服务,需要调整挂载目录的权限:
# 配置
logstash 的配置目录的结构如下:
config/ ├── conf.d/ # 存放一些管道的定义文件 | ├── a.conf | └── b.conf ├── jvm.options # JVM 的配置,比如限制内存 -Xmx ,设置时区 -Duser.timezone=GMT+08 ├── log4j2.properties # Java 日志的配置 ├── logstash.yml # logstash 本身的配置 ├── pipelines.yml # 定义管道 └── startup.options # 自定义 logstash 启动命令的配置,供 systemd 读取
logstash.yml 默认为空,配置示例:
# path.data: /var/lib/logstash # path.logs: /var/log/logstash # log.level: info # log.format: plain # dead_letter_queue.enable: false # 是否启用死信队列,默认为 false pipeline: batch: # size: 125 # input 阶段每接收指定数量的事件,才打包成一个 batch ,供 filter、output 阶段的一个 worker 处理。增加该值会提高处理速度 # delay: 50 # 收集 batch 时,等待接收新事件的超时时间,单位 ms 。如果等待超时,则立即打包成一个 batch 。每个新事件会单独考虑超时时间,因此超时时间应该设置得小些 workers: 4 # 处理 filter、output 阶段的线程数,默认等于 CPU 核数。可以大于 CPU 核数,因为输出阶段的 worker 会等待网络 IO 而不占用 CPU
- pipeline 在内存中处理的 event 数量最大为 size * workers 。
- 接收一个 batch 的最长耗时为 size * delay 。
pipelines.yml 的配置示例:
- pipeline.id: main # 创建一个管道 path.config: "config/conf.d/*.conf" # 导入配置文件 # pipeline.output.workers: 1
- pipelines.yml 可通过 - 创建多个管道,默认会继承 logstash.yml 全局作用域的配置。
# pipeline
# 原理
- Linux 系统上一般通过管道符筛选日志,比如
cat test.log | grep ERROR'
。而 logstash 处理数据的机制也称为管道(pipeline),每条数据称为一个事件(event)。 - logstash 可以运行多个管道,每个管道分为三个阶段:
- input :输入数据。
- filter :过滤、修改数据。该阶段可以省略。
- output :输出数据。
# 示例
通过命令行创建管道的示例:
启动 logstash ,运行一个简单的管道:
bin/logstash -e 'input { stdin { } } output { stdout {} }'
这里接收 stdin 输入的数据,没有 filter ,直接输出到 stdout 。
此时在终端输入一个字符串 Hello ,按下回车,显示的输出如下:
{ "@timestamp" => 2020-01-12T07:37:00.045Z, "host" => "CentOS-1", "message" => "Hello", "@version" => "1" }
通过配置文件创建管道的示例:
创建一个配置文件
config/pipeline.conf
,定义一个管道:input { # file { # 读取文件作为输入 # path => "/var/log/http.log" # } beats { # 接收 beats 的输入 port => "5044" # 监听一个端口,供 beats 发送数据进来。这里采用 TCP 协议通信,而不是 HTTP 协议 host => "0.0.0.0" # client_inactivity_timeout => 60 # 如果 beats 连续多久未活动,则关闭 TCP 连接,单位为秒 # codec => "plain" # 设置处理输入数据的格式。默认为 plain # include_codec_tag => true # 是否给数据添加一个 tag ,记录 codec 信息。默认为 true ,使得每个 event 都有一个 beats_input_codec_plain_applied 标签 } # kafka { # 从 kafka 获取消息 # bootstrap_servers => "localhost:9092" # auto_commit_interval_ms => 5000 # auto_offset_reset => "latest" # codec => "json" # consumer_threads => 1 # group_id => "logstash" # topics_pattern => "logstash.*" # 根据正则表达式,订阅一些 topic # } } # filter { # } output { stdout { # 输出到终端,便于调试 # codec => rubydebug # 输出时默认采用 rubydebug 格式 # codec => rubydebug { metadata => true } # 是否输出 @metadata 字段 } # file { # 输出到文件 # path => "/tmp/logstash.log" # codec => line { # 逐行解析输入的文本 # format => "custom format: %{message}" # 设置处理数据的格式。默认为 json_lines # charset => "UTF-8" # 编码格式,默认为 UTF-8 # } # } elasticsearch { # 输出到 ES # 该插件会将 pipeline 的一组 batch event 放在一个 bulk 请求中发出,如果单个请求超过 20M 则拆分成多个请求 hosts => "http://10.0.0.1:9200" # user => "admin" # password => "123456" # ssl_certificate_verification => true # 使用 HTTPS 连接时,是否验证 SSL 证书 # http_compression => false # 是否对请求 body 进行 gzip 压缩 # index => "%{project}-%{+yyyy.MM.dd}" # 指定写入的索引名。可通过 %{} 插入变量 # document_id => "%{[@metadata][_id]}" # 指定写入的文档 id 。如果已存在相同 id 的文档,则会覆盖它 # manage_template => true # logstash 启动时,是否自动在 ES 中创建索引模板 # template => "/path/to/logstash/logstash-apache.json" # template 的配置文件,默认使用内置的模板 # template_name => "logstash" # template 的名称 # template_overwrite => false # 如果 ES 中已存在同名 template ,是否覆盖它 # retry_initial_interval => 2 # 第 n 次重试之前等待 n*retry_initial_interval 秒 # retry_max_interval => 64 # 重试时的最大间隔时间 } }
- output 阶段,如果某个 event 输出失败,有几种处理措施:
- 大部分情况下,会无限重试。
- 如果 HTTP 响应码为 409 conflict ,则不会重试,丢弃 event 。
- 如果 HTTP 响应码为 400 mapper_parsing_exception 或 404 ,表示不能重试,则打印报错日志,丢弃 event 。
- 可启用死信队列,将这些 event 保存到
data/dead_letter_queue/
目录下,然后可通过 input.dead_letter_queue 插件读取。
- 可启用死信队列,将这些 event 保存到
- 如果 HTTP 响应码为 403 pressure too high ,表示 ES 负载过大,拒绝了 bulk 请求。此时会自动重试,但这会导致 ES 的负载更大,可能返回 503 Unavailable ,最终导致 logstash 放弃重试。因此建议增加 retry 的间隔。
- 输出 event 到 ES 的某个 index 时,日期后缀
%{+yyyy.MM.dd}
取决于 @timestamp 字段的值。- filebeat、logstash、ES 三个软件都要求每条数据包含 @timestamp 字段,采用 UTC 时区。
- 在 Kibana 的 Discover 网页中查询日志时,会自动将 ES 中存储的 @timestamp 字段,转换成用户浏览器的本地时区,然后显示。
- output 阶段,如果某个 event 输出失败,有几种处理措施:
启动 logstash ,运行指定的管道:
bin/logstash -f config/pipeline.conf --log.level=debug
# 语法
pipeline 的语法与 Ruby 相似,特点如下:
- Hash 字典的键值对之间通过空格分隔,比如
{"field1" => "A" "field2" => "B"}
。 - 支持引用变量:
- 用
filed
或[filed]
的格式引用 event 的顶级字段。 - 用
[filed][sub_filed]...
的格式引用子字段。- 引用子字段时,如果父字段不存在,则会自动创建它,因此应该先检查父字段是否存在。如下:
if [json] and [json][version] == "null" { mutate { remove_field => [ "[json][version]" ] } }
- 引用子字段时,如果父字段不存在,则会自动创建它,因此应该先检查父字段是否存在。如下:
- 用
%{filed}
的格式获取字段的值。 - 用
${VAR}
的格式获取终端环境变量的值。 - 例:
filter { if [agent_name] and [@metadata][time] { mutate { add_field { "port" => "${TCP_PORT}" "[@metadata][tmp_name]" => "%{agent_name} %{[@metadata][time]}" } } } }
@metadata
是一个内置字段,不会被 output 阶段输出,因此可以存储一些临时的子字段。
- 用
- 支持使用 if 语句:
- 支持
<
、>
、<=
、>=
、==
、!=
比较运算符。 - 支持
=~
!~
运算符,判断左侧的字符串是否匹配右侧的正则表达式。 - 支持
and
、or
、!
、not
、not in
逻辑运算符。 - 例:
filter { if [level] == "DEBUG" { grok {...} } else if [level] == "WARN" { grok {...} } else { grok {...} } }
if [level] =~ "DEBUG" or [level] =~ "WARN" or [level] =~ "ERROR"
if [level] in ["DEBUG", "WARN", "ERROR"]
if "_grokparsefailure" not in [tags] # 判断一个 tag 是否在 tags 字段中存在
if [level] # 判断一个字段是否存在,且取值不为 false、null
if ![level] { # 如果字段不存在,则添加它 add_field => { "level" => "DEBUG" } }
- 支持
# 插件
# codec
- codec 类型的插件用于按特定的文本格式编码、解码数据,可以用于 pipeline 的 input 或 output 阶段。
- codec 插件举例:
plain # 纯文本,即不进行处理 line # 用于解码输入时,将每行文本视作一个 event 。用于编码输出时,将每个 event 保存成一行文本 multiline # 将连续的多行文本视作同一个 event 。不过该操作可以由 Beats 完成,减轻 logstash 的工作量 json # 按 JSON 格式处理,忽略换行符、缩进 json_lines # 根据换行符 `\n` 将文本分成多行,每行视作一个 JSON 格式的 event rubydebug # 按 Ruby 调试信息的格式处理
# grok
:一个 filter 插件,用于按正则表达式解析 event 中的某些字段。
Kibana 网页上提供的开发工具包含了 grok debugger ,便于调试 grok pattern 。
例:
假设输入数据是一行纯文本格式的日志:
2020-01-12 07:24:43.659+0000 INFO 10.0.0.1 User login successfully
编写一个 grok 表达式来解析:
%{TIMESTAMP_ISO8601:timestamp}\s+(?<level>\S+)\s+(?<client_ip>\S+)\s+(?<message>.*)$
- 可以按
(?<field>pattern)
的格式匹配字段。例如(?<level>\S+)
表示使用正则表达式\S+
进行匹配,将匹配结果赋值给名为 level 的字段。 - 可以按
%{NAME:field}
的格式调用事先定义的正则表达式。例如%{TIMESTAMP_ISO8601:timestamp}
表示使用一个名为 TIMESTAMP_ISO8601 的正则表达式进行匹配,将匹配结果赋值给名为 timestamp 的字段。
- 可以按
grok 输出的结构化数据为:
{ "level": "INFO", "client_ip": "10.0.0.1", "message": "User login successfully", "timestamp": "2020-01-12 07:24:43.659+0000" }
可以事先定义一些正则表达式,然后通过名称调用它们。
- 定义格式为:
NAME pattern
- 例:
INT (?:[+-]?(?:[0-9]+)) WORD \b\w+\b SPACE \s* NOTSPACE \S+ GREEDYDATA .*
- grok 内置了一些 patterns (opens new window) 。
- 定义格式为:
例:在 pipeline 的 filter 中使用 grok 插件
filter { grok { match => { "message" => "%{TIMESTAMP_ISO8601:timestamp}\s+(?<level>\S+)\s+(?<client_ip>\S+)\s+(?<message>.*)$" } # 解析 message 字段,通过正则匹配提取内容另存为字段 overwrite => [ "message" ] # 允许提取的这些字段覆盖 event 中已存在的字段 # patterns_dir => ["config/patterns"] # 加载 patterns 的定义文件 # keep_empty_captures => false # 如果匹配到的字段为空,是否依然保留该字段 # tag_on_failure => ["_grokparsefailure"] # 如果匹配失败,则给 event 添加这些 tag # tag_on_timeout => ["_groktimeout"] # 如果匹配超时,则给 event 添加这些 tag # timeout_millis => 30000 # 匹配的超时时间,单位 ms # 以下是所有 filter 插件通用的配置参数 # add_field => { # 添加字段 # "test_field" => "Hello" # "from_%{IP}" => "this is from %{IP}" # } # add_tag => ["test_tag", "from_%{IP}"] # 添加标签 # remove_field => ["field_1" , "from_%{IP}"] # 删除字段 # remove_tag => ["test_tag", "from_%{IP}"] # 删除标签 # id => "ABC" # 该插件的唯一 id ,默认会自动生成 # enable_metric => true # 是否记录该插件的指标 } }
- 在 if 语句内可以调用 filter 插件,但反之不行。可以写成以下形式:
if xxx { grok {} if xxx { grok {} } }
- 如果输入数据的每行格式可能不同,则可以在 match 中指定多个表达式用于尝试匹配:不过这样会多次执行正则表达式,比如第一个正则表达式总是会被执行,开销较大。不如通过 if 语句选择性地执行 grok 。
match => { "message" => [ "DEBUG (?<message>.*)$", "INFO (?<message>.*)$" ] # break_on_match => true # 当表达式匹配成功时,不再尝试匹配之后的表达式 }
- 在 if 语句内可以调用 filter 插件,但反之不行。可以写成以下形式:
# json
:一个 filter 插件,用于按 JSON 格式解析 event 的一个字段。
- 例:
json { source => "message" # 按 JSON 格式解析 message 字段的值 # target => "json" # 将解析之后的 JSON 字典保存到该字段.如果该字段已存在,则覆盖它。如果不配置,则存储为顶级字段 # skip_on_invalid_json => false # 解析失败时,不发出警告 # tag_on_failure => ["_jsonparsefailure"] }
# date
:一个 filter 插件,用于解析 event 的一个字段,获取时间。
- 例:
if [timestamp] { date { match => ["timestamp", "ISO8601", "yyyy-MM-dd HH:mm:ss.SSSZ", "UNIX", "UNIX_MS"] # 指定源字段,然后可以指定多个尝试匹配的时间字符串格式 remove_field => ["timestamp"] # target => "@timestamp" # 将解析之后的时间保存到该字段。如果该字段已存在,则覆盖它 # tag_on_failure => ["_dateparsefailure"] timezone => "Asia/Shanghai" # 如果没有从源字段解析出时区,则采用该默认时区 } }
# drop
:一个 filter 插件,用于丢弃一些 event 。
- 例:
if [level] == "DEBUG" { drop { # percentage => 40 # 丢弃的概率大概为 40% } }
# mutate
:一个 filter 插件,用于修改 event 的一些字段。
- 例:
mutate { copy => { "field1" => "field2" } # 将 field1 字段的值拷贝到 field2 字段 rename => { "field1" => "field2" } # 将 field1 字段重命名为 field2 replace => { "field1" => "new: %{field2}" } # 替换 field1 字段的值 convert => { # 转换字段的数据类型,默认都是字符串类型 "field1" => "boolean" "field2" => "integer" # 可以按这种格式同时处理多个字段 } lowercase => [ "field1" ] # 将字段的值改为小写 uppercase => [ "field1" ] # 将字段的值改为大写 strip => [ "field1" ] # 删掉字段的值前后的空白字符 split => { "[json][version]" => "." } # 根据指定的字符分割一个字段的值,保存为数组形式 # tag_on_failure => ["_mutate_error"] }
# geoip
:一个 filter 插件,用于查询 IP 地址对应地理位置,包括经纬度坐标、国家名、城市名等。
- 查询时的开销比较大。
- 例:
geoip { source => "client_ip" # 存储 IP 地址的字段 target => "client" # 存储查询结果的字段 # database => "xx/xx/GeoLite2-City.mmdb" # 用于查询的数据库文件。默认使用自带的免费 GeoLite2 数据库,并每天自动更新 # cache_size => 1000 # 缓存区的大小。查询一些重复 IP 或相邻 IP 时,使用缓存可以提高效率 # tag_on_failure => ["_geoip_lookup_failure"] }
# ruby
:一个 filter 插件,用于嵌入 Ruby 代码。
- 例:
ruby { code => "event.cancel if rand <= 0.90" # 执行 Ruby 代码,这里是 90% 的概率取消 event }
- 可以导入一个 Ruby 脚本文件:脚本的内容示例:
ruby { path => "test_filter.rb" script_params => { "percentage" => 0.9 } }
def register(params) # 可以定义一个 register(params) 函数,接收传给脚本的参数 @drop_percentage = params["percentage"] end def filter(event) # 必须定义一个 filter(event) 函数,输入 event ,返回一个包含事件的数组 if rand >= @drop_percentage return [event] else return [] # 返回一个空数组,这会取消 event end end