Logstash详解

一、Logstash简介

1、官网地址

https://www.elastic.co/guide/en/logstash/7.x/introduction.html

2、软件介绍

官方介绍:Logstash is an open source data collection engine with real-time pipelining capabilities。简单来说logstash就是一根具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可以让你根据自己的需求在中间加上滤网,Logstash提供里很多功能强大的滤网以满足你的各种应用场景。

Logstash常用于日志关系系统中做日志采集设备;

3、系统结构

Logstash详解

Logstash的事件(logstash将数据流中等每一条数据称之为一个event)处理流水线有三个主要角色完成:inputs –> filters –> outputs:

inpust:必须,负责产生事件(Inputs generate events),常用:File、syslog、redis、beats(如:Filebeats)

filters:可选,负责数据处理与转换(filters modify them),常用:grok、mutate、drop、clone、geoip

outpus:必须,负责数据输出(outputs ship them elsewhere),常用:elasticsearch、file、graphite、statsd

其中inputs和outputs支持codecs(coder&decoder)在1.3.0 版之前,logstash 只支持纯文本形式输入,然后以过滤器处理它。但现在,我们可以在输入 期处理不同类型的数据,所以完整的数据流程应该是:input | decode | filter | encode | output;codec 的引入,使得 logstash 可以更好更方便的与其他有自定义数据格式的运维产品共存,比如:graphite、fluent、netflow、collectd,以及使用 msgpack、json、edn 等通用数据格式的其他产品等

4、应用场景

Logstash最常用于ELK(elasticsearch + logstash + kibane)中作为日志收集器使用

二、Logstash安装

1、环境清单

操作系统:CentOS Linux release 7.3.1611

Logstash版本:logstash-5.4.1

Jdk版本:1.8.0_131

2、软件下载

· 下载Jdk:

[[email protected] ~]$ wget --no-check-certificate --no-cookies --header “Cookie: oraclelicense=accept-securebackup-cookie” http://download.oracle.com/otn-pub/java/jdk/8u131-b11/d54c1d3a095b4ff2b6607d096fa80163/jdk-8u131-linux-x64.tar.gz

· 下载Logstash:

[[email protected] ~]$ wget https://artifacts.elastic.co/downloads/logstash/logstash-5.4.1.tar.gz

3、安装步骤

3.1、安装Jdk

· 创建安装目录

[[email protected] ~]$ sudo mkdir /usr/local/Java

· 解压缩安装文件

## 移动安装包到安装目录 ##

[[email protected] ~]$ sudo mv jdk-8u131-linux-x64.tar.gz /usr/local/Java/

## 进入安装目录 ##

[[email protected] ~]$ cd /usr/local/Java/

## 解压缩安装包 ##

[[email protected] Java]$ sudo tar -zxvf jdk-8u131-linux-x64.tar.gz

## 删除安装包 ##

[[email protected] Java]$ sudo rm jdk-8u131-linux-x64.tar.gz

· 测试安装是否成功

## 进入JAVA_HOME ##

[[email protected] Java]$ cd jdk1.8.0_131/

## 测试java命令是否可以正常执行 ##

[[email protected] jdk1.8.0_131]$ ./bin/java -version

· 配置JAVA_HOME环境变量

[[email protected] ~]$ cd ~

[[email protected] ~]$ vi .bash_profile

# .bash_profile

# Get the aliases and functions

if [ -f ~/.bashrc ]; then

. ~/.bashrc

fi

# User specific environment and startup programs

## 配置JAVA_HOME环境变量 ##

JAVA_HOME=/usr/local/Java/jdk1.8.0_131

## 将java执行目录加入到PATH下面 ##

PATH=PATH:PATH:HOME/.local/bin:HOME/bin:HOME/bin:JAVA_HOME/bin

export PATH

~

## 使环境变量生效 ##

[[email protected] ~]$ source .bash_profile

## 测试JAVA_HOME是否正确配置 ##

[[email protected] ~]$ java -version

3.2、安装Logstash

· 创建安装目录

[[email protected] ~]$ sudo mkdir /usr/local/logstash

· 解压缩安装文件

[[email protected] ~]$ sudo mv logstash-5.4.1.tar.gz /usr/local/logstash/

[[email protected] ~]$ cd /usr/local/logstash/

[[email protected] logstash]$ sudo tar -zxvf logstash-5.4.1.tar.gz

· 测试安装是否成功

o 测试一、快速启动,标准输入输出作为input和output,没有filter

[[email protected] logstash]$ cd logstash-5.4.1/

[[email protected] logstash-5.4.1]$ ./bin/logstash -e ‘input { stdin {} } output { stdout {} }’

Sending Logstash’s logs to /usr/local/logstash/logstash-5.4.1/logs which is now configured via log4j2.properties

[2017-06-17T13:37:13,449][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>“path.queue”, :path=>"/usr/local/logstash/logstash-5.4.1/data/queue"}

[2017-06-17T13:37:13,467][INFO ][logstash.agent ] No persistent UUID file found. Generating new UUID {:uuid=>“dcfdb85f-9728-46b2-91ca-78a0d6245fba”, :path=>"/usr/local/logstash/logstash-5.4.1/data/uuid"}

[2017-06-17T13:37:13,579][INFO ][logstash.pipeline ] Starting pipeline {“id”=>“main”, “pipeline.workers”=>2, “pipeline.batch.size”=>125, “pipeline.batch.delay”=>5, “pipeline.max_inflight”=>250}

[2017-06-17T13:37:13,612][INFO ][logstash.pipeline ] Pipeline main started

The stdin plugin is now waiting for input:

[2017-06-17T13:37:13,650][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}

## 此时命令窗口停留在等待输入状态,键盘键入任意字符 ##

hello world

## 下方是Logstash输出到效果 ##

2017-06-17T05:37:29.401Z chenlei.master hello world

o 测试二、在测试一堆基础上加上codec进行格式化输出

[[email protected] logstash-5.4.1]$ ./bin/logstash -e ‘input{stdin{}} output{stdout{codec=>rubydebug}}’

Sending Logstash’s logs to /usr/local/logstash/logstash-5.4.1/logs which is now configured via log4j2.properties

[2017-06-17T14:01:50,325][INFO ][logstash.pipeline ] Starting pipeline {“id”=>“main”, “pipeline.workers”=>2, “pipeline.batch.size”=>125, “pipeline.batch.delay”=>5, “pipeline.max_inflight”=>250}

[2017-06-17T14:01:50,356][INFO ][logstash.pipeline ] Pipeline main started

The stdin plugin is now waiting for input:

[2017-06-17T14:01:50,406][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}

## 此时命令窗口停留在等待输入状态,键盘键入任意字符 ##

hello world

## 下方是Logstash输出到效果 ##

{

“@timestamp” => 2017-06-17T06:02:19.189Z,

“@version” => “1”,

“host” => “chenlei.master”,

“message” => “hello world”

}

三、Logstash参数与配置

1、常用启动参数

Logstash详解

2、配置文件结构及语法

区段

Logstash通过{}来定义区域,区域内可以定义插件,一个区域内可以定义多个插件,如下:

input {

​ stdin {

​ }

​ beats {

​ port => 5044

​ }

}

数据类型

Logstash仅支持少量的数据类型:

Boolean:ssl_enable => true

Number:port => 33

String:name => “Hello world”

Commonts:# this is a comment

字段引用

Logstash数据流中的数据被称之为Event对象,Event以JSON结构构成,Event的属性被称之为字段,如果你像在配置文件中引用这些字段,只需要把字段的名字写在中括号[]里就行了,如[type],对于嵌套字段每层字段名称都写在[]里就可以了,比如:[tags][type];除此之外,对于Logstash的arrag类型支持下标与倒序下表,如:[tags][type][0],[tags][type][-1]。

条件判断

Logstash支持下面的操作符:

equality:==, !=, <, >, <=, >=

regexp:=~, !~

inclusion:in, not in

boolean:and, or, nand, xor

unary:!

例如:

if EXPRESSION {

} else if EXPRESSION {

} else {

}

环境变量引用

Logstash支持引用系统环境变量,环境变量不存在时可以设置默认值,例如:export TCP_PORT=12345

input {

tcp {

​ port => “${TCP_PORT:54321}”

}

}

3、常用输入插件(Input plugin)

3.1、File读取插件

文件读取插件主要用来抓取文件的变化信息,将变化信息封装成Event进程处理或者传递。

配置事例

input

file {

​ path => ["/var/log/*.log", “/var/log/message”]

​ type => “system”

​ start_position => “beginning”

}

}

常用参数

Logstash详解
Logstash详解

3.2、Beats监听插件

Beats插件用于建立监听服务,接收Filebeat或者其他beat发送的Events;

配置事例

input {

​ beats {

​ port => 5044

​ }

}

常用参数(空 => 同上)

Logstash详解
Logstash详解

3.3、TCP监听插件

TCP插件有两种工作模式,“Client”和“Server”,分别用于发送网络数据和监听网络数据。

配置事例

tcp {

​ port => 41414

}

常用参数(空 => 同上)

Logstash详解

3.4、Redis读取插件

用于读取Redis中缓存的数据信息。

配置事例

input {

redis {

​ host => “127.0.0.1”

​ port => 6379

​ data_type => “list”

​ key => “logstash-list”

}

}

常用参数(空 => 同上)

Logstash详解

3.5、Syslog监听插件

监听操作系统syslog信息

配置事例

syslog {

}

常用参数(空 => 同上)

Logstash详解Logstash详解

4、常用过滤插件(Filter plugin)

丰富的过滤器插件的是 logstash威力如此强大的重要因素,过滤器插件主要处理流经当前Logstash的事件信息,可以添加字段、移除字段、转换字段类型,通过正则表达式切分数据等,也可以根据条件判断来进行不同的数据处理方式。

4.1、grok正则捕获

grok 是Logstash中将非结构化数据解析成结构化数据以便于查询的最好工具,非常适合解析syslog logs,apache log, mysql log,以及一些其他的web log

预定义表达式调用

Logstash提供120个常用正则表达式可供安装使用,安装之后你可以通过名称调用它们,语法如下:%{SYNTAX:SEMANTIC}

SYNTAX:表示已经安装的正则表达式的名称

SEMANTIC:表示从Event中匹配到的内容的名称

例如:Event的内容为“[debug] 127.0.0.1 - test log content”,匹配%{IP:client}将获得“client: 127.0.0.1”的结果,前提安装了IP表达式;如果你在捕获数据时想进行数据类型转换可以使用%{NUMBER:num:int}这种语法,默认情况下,所有的返回结果都是string类型,当前Logstash所支持的转换类型仅有“int”和“float”;

一个稍微完整一点的事例:

日志文件http.log内容:55.3.244.1 GET /index.html 15824 0.043

表达式:%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}

配置文件内容:

input {

file {

​ path => “/var/log/http.log”

}

}

filter {

grok {

​ match => {“message” => “%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}”}

}

}

输出结果:

client: 55.3.244.1

method: GET

request: /index.html

bytes: 15824

duration: 0.043

· 自定义表达式调用

语法:(?<field_name>the pattern here)

举例:捕获10或11和长度的十六进制queue_id可以使用表达式(?<queue_id>[0-9A-F]{10,11})

· 安装自定义表达式

与预定义表达式相同,你也可以将自定义的表达式配置到Logstash中,然后就可以像于定义的表达式一样使用;以下是操作步骤说明:

1、在Logstash根目录下创建文件夹“patterns”,在“patterns”文件夹中创建文件“extra”(文件名称无所谓,可自己选择有意义的文件名称);

2、在文件“extra”中添加表达式,格式:patternName regexp,名称与表达式之间用空格隔开即可,如下:

# contents of ./patterns/postfix:

POSTFIX_QUEUEID [0-9A-F]{10,11}

3、使用自定义的表达式时需要指定“patterns_dir”变量,变量内容指向表达式文件所在的目录,举例如下:

## 日志内容 ##

Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=[email protected]

## Logstash配置 ##

filter {

grok {

​ patterns_dir => ["./patterns"]

​ match => { “message” => “%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}” }

}

}

## 运行结果 ##

timestamp: Jan 1 06:25:43

logsource: mailserver14

program: postfix/cleanup

pid: 21403

queue_id: BEF25A72965

· grok常用配置参数(空 => 同上)

Logstash详解
Logstash详解

  • 其他
  • 一般的正则表达式只能匹配单行文本,如果一个Event的内容为多行,可以在pattern前加“(?m)”
  • 对于Hash和Array类型,Hash表示键值对,Array表示数组
  • Grok表达式在线debug地址:http://grokdebug.herokuapp.com
  • 预定义正则表达式参考地址:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns

4.2、date时间处理插件

该插件用于时间字段的格式转换,比如将“Apr 17 09:32:01”(MMM dd HH:mm:ss)转换为“MM-dd HH:mm:ss”。而且通常情况下,Logstash会为自动给Event打上时间戳,但是这个时间戳是Event的处理时间(主要是input接收数据的时间),和日志记录时间会存在偏差(主要原因是buffer),我们可以使用此插件用日志发生时间替换掉默认是时间戳的值。

· 常用配置参数(空 => 同上)

Logstash详解

4.3、mutate数据修改插件

mutate 插件是 Logstash另一个重要插件。它提供了丰富的基础类型数据处理能力。可以重命名,删除,替换和修改事件中的字段。

· 常用配置参数(空 => 同上)

Logstash详解
Logstash详解

4.4、JSON插件

JSON插件用于解码JSON格式的字符串,一般是一堆日志信息中,部分是JSON格式,部分不是的情况下

配置事例

json {

​ source => …

}

## 事例配置,message是JSON格式的字符串:"{“uid”:3081609001,“type”:“signal”}" ##

filter {

​ json {

​ source => “message”

​ target => “jsoncontent”

​ }

}

## 输出结果 ##

{

​ “@version”: “1”,

​ “@timestamp”: “2014-11-18T08:11:33.000Z”,

​ “host”: “web121.mweibo.tc.sinanode.com”,

​ “message”: “{“uid”:3081609001,“type”:“signal”}”,

​ “jsoncontent”: {

​ “uid”: 3081609001,

​ “type”: “signal”

​ }

}

## 如果从事例配置中删除target,输出结果如下 ##

{

​ “@version”: “1”,

​ “@timestamp”: “2014-11-18T08:11:33.000Z”,

​ “host”: “web121.mweibo.tc.sinanode.com”,

​ “message”: “{“uid”:3081609001,“type”:“signal”}”,

​ “uid”: 3081609001,

​ “type”: “signal”

}

· 常用配置参数(空 => 同上)

Logstash详解

4.5、elasticsearch查询过滤插件

用于查询Elasticsearch中的事件,可将查询结果应用于当前事件中

· 常用配置参数(空 => 同上)

Logstash详解

· 4.6、其他

还有很多其他有用插件,如:Split、GeoIP、Ruby,这里就不一一写了,等以后用到再补充

5、常用输出插件(Output plugin)

5.1、ElasticSearch输出插件

用于将事件信息写入到Elasticsearch中,官方推荐插件,ELK必备插件

配置事例

output {

​ elasticsearch {

​ hosts => [“127.0.0.1:9200”]

​ index => “filebeat-%{type}-%{+yyyy.MM.dd}”

​ template_overwrite => true

​ }

}

· 常用配置参数(空 => 同上)

Logstash详解
Logstash详解
Logstash详解

Logstash详解
Logstash详解

5.2、Redis输出插件

用于将Event写入Redis中进行缓存,通常情况下Logstash的Filter处理比较吃系统资源,复杂的Filter处理会非常耗时,如果Event产生速度比较快,可以使用Redis作为buffer使用

· 配置事例

output {

​ redis {

​ host => “127.0.0.1”

​ port => 6379

​ data_type => “list”

​ key => “logstash-list”

​ }

}

· 常用配置参数(空 => 同上)

Logstash详解
Logstash详解

5.3、File输出插件

用于将Event输出到文件内

· 配置事例

output {

​ file {

​ path => …

​ codec => line { format => “custom format: %{message}”}

​ }

}

· 常用配置参数(空 => 同上)

Logstash详解

5.4、TCP插件

Write events over a TCP socket.Each event json is separated by a newline.Can either accept connections from clients or connect to a server, depending on mode.

· 配置事例

tcp {

​ host => …

​ port => …

}

· 常用配置参数(空 => 同上)

Logstash详解

· 6、常用编码插件(Codec plugin)

6.1、JSON编码插件

直接输入预定义好的 JSON 数据,这样就可以省略掉 filter/grok 配置

配置事例

json {

}

· 常用配置参数

Logstash详解

四、Logstash实例

1、接收Filebeat事件,输出到Redis

input {

​ beats {

​ port => 5044

​ }

}

output {

​ redis {

​ host => “127.0.0.1”

​ port => 6379

​ data_type => “list”

​ key => “logstash-list”

​ }

}

2、读取Redis数据,根据“type”判断,分别处理,输出到ES

input {

​ redis {

​ host => “127.0.0.1”

​ port => 6379

​ data_type => “list”

​ key => “logstash-list”

​ }

}

filter {

​ if [type] == “application” {

​ grok {

​ match => [“message”, “(?m)-(?.+?)?????(?>\d\d){1,2}-(?:0?[1-9]|1[0-2])-(??????:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]) (?:2[0123]|[01]?[0-9])?????:[0-5][0-9])??????????:[0-5][0-9]|60)(?:[:.,][0-9]+)?)) [(?(\b\w+\b)) ] (?(\b\w+\b)) ((?.?)) - (?.*)”]

​ }

​ date {

​ match => [“logTime”, “yyyy-MM-dd HH:mm:ss,SSS”]

​ }

​ json {

​ source => “message”

​ }

​ date {

​ match => [“timestamp”, “yyyy-MM-dd HH:mm:ss,SSS”]

​ }

​ }

​ if [type] == “application_bizz” {

​ json {

​ source => “message”

​ }

​ date {

​ match => [“timestamp”, “yyyy-MM-dd HH:mm:ss,SSS”]

​ }

​ }

​ mutate {

​ remove_field => ["@version", “beat”, “logTime”]

​ }

}

output {

​ stdout{

​ }

​ elasticsearch {

​ hosts => [“127.0.0.1:9200”]

​ index => “filebeat-%{type}-%{+yyyy.MM.dd}”

​ document_type => “%{documentType}”

​ template_overwrite => true

​ }

}

五、Logstash注意事项

1、问题记录

· 启动logstash慢,输入./bin/logstash没有反应,多出现在新安装的操作系统上

原因

jruby启动的时候jdk回去从/dev/random中初始化随机数熵,新版本的jruby会用RPNG算法产生随后的随机数,但是旧版本的jruby会持续从/dev/random中获取数字。但是不幸的是,random发生器会跟不上生成速度,所以获取随机数的过程会被阻塞,直到随机数池拥有足够的熵然后恢复。这在某些系统上,尤其是虚拟化系统,熵数池可能会比较小从而会减慢jruby的启动速度。

检查一下系统的熵数池 cat /proc/sys/kernel/random/entropy_avail,正常情况这个数字推荐大于1000,对比了一下独立主机的这个数值,大约在700-900之间晃悠。

o 解决

使用伪随机,编辑/usr/local/logstash/logstash-5.4.1/config/jvm.options,在最后增加一行:-Djava.security.egd=file:/dev/urandom