搭建ELK(ElasticSearch+Logstash+Kibana)日志分析系统(三) logstash input output 配置
前面两节已经介绍了Logstash以及常见的配置语法,这一节介绍一下input、output的配置,然后做个file输入输出的demo。这里我介绍的都是比较常见的插件,其他插件用法类似。
1、input配置
1)读取文件(file)
Logstash 使用一个名叫 FileWatch 的 Ruby Gem 库来监听文件变化。这个库支持 glob 展开文件 路径,而且会记录一个 叫 .sincedb 的数据库文件来跟踪被监听的日志文件的当前读取位置。所以,不要担心 logstash 会 漏过你的数据。sincedb 文件 中记录了每个被监听的文件的 inode, major number, minor number 和 pos。
配置示例:
- input {
- file {
- path => ["/var/log/*.log", "/var/log/message"] ## 读取文件的路径,可以一次读取多个文件,也可以写多个file读取多个文件
- type => "blog_test" ## 添加type字段
- start_position => "beginning" ## 读取文件记录的开始位置
- }
- }
其他一些比较有用的配置项:
- ## logstash 每隔多久去检查一次被监听的 path 下是否有新文件。默认值是 15 秒。
- discover_interval
- ## 不想被监听的文件可以排除出去,这里跟 path 一样支持 glob 展开。
- exclude
- ## 一个已经监听中的文件,如果超过这个值的时间内没有更新内容,就关闭监听它的文件句柄。默认是 3600 秒,即一小时。
- close_older
- ## 在每次检查文件列表的时候,如果一个文件的最后修改时间超过这个值,就忽略这个文件。默认是 86400 秒,即一天。
- ignore_older
- ## 如果你不想用默认的 $HOME/.sincedb(Windows 平台上在 C:\Windows\System32\config\systemprofile\.sincedb),可以通过这个配置定义 sincedb 文件到其他位置
- sincedb_path
- ## logstash 每隔多久写一次 sincedb 文件,默认是 15 秒。
- sincedb_write_interval
- ## logstash 每隔多久检查一次被监听文件状态(是否有更新),默认是 1 秒。
- stat_interval
- ## logstash 从什么位置开始读取文件数据,默认是结束位置,也就是说 logstash 进程会以类似 tail -F 的形式运行。如果你是要导入原有数据,把这个设定改成 "beginning",logstash 进程就从头开始读取,类似 less +F 的形式运行。
- start_position
2) 读取redius
还记得上一节的流程图吗,logstash可以通过redis来扩展,简单说说,就是可以将一个Logstash的输出通过redis作为消息 队列,作为另一个Logstash的输入。
配置示例:
- input {
- redis {
- host => "192.168.1.1" ## 从哪个主机读取redis数据
- port => "6380" ## 相应的端口,默认为6379,我习惯用6380
- codec => "json" ## 读取的数据格式
- data_type => "list" ## 读取的数据类型
- key => "logstash:redis" ## 相应监听的keys
- password => "123qwe" #如果有安全认证,此项为密码
- db => 0 #redis数据库的编号
- threads => 1 #启用线程数量
- }
- }
3)从终端输入 —— 一般用来测试
- input {
- stdin {
- add_field => {"field_key" => "field_value"} ## 添加若干字段
- codec => "plain" ## 输出格式
- tags => ["add"] ## 添加tags
- type => "std" ## 添加类型字段
- }
- }
2、output配置
1)输出到Elasticsearch
ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。一般通过logstash将日志格式化后,
输出到elasticSearch保存
配置示例:
- elasticsearch {
- action => "index" ## 建立索引的动作,似乎可以省略
- hosts => "192.168.1.1:9200" ## elasticSearch所在的主机地址,默认端口一般为9200
- index => "husen_test" ## 索引的名称
- codec => "json" ## 输出的codec格式,codec插件在后面会提到
- }
2)保存到文件
将logstash解析后的JSON格式的日志保存到文件里
- output {
- file {
- path => "/home/husen/result.log.gz" ## 保存文件的路径
- message_format => "%{message}" ## 可选项 保存的格式,默认输出所有字段,此处只输出message字段的内容
- gzip => true ## 可选项 是否压缩
- }
- }
3)输出到终端 —— 一般用来测试
- output {
- stdout {
- codec => rubydebug ## 输出格式,此处为调试格式,即JSON
- workers => 2 ##线程数,默认为系统核数
- }
- }
3、stdin->(file 和 stdout) demo演示
演示环境:centos7 + logstash 5.5.1
配置文件如下:
- input {
- # 从终端输入
- stdin{}
- }
- output{
- # JSON格式输出到终端
- stdout{
- codec => rubydebug
- }
- # 保存到文件
- file{
- path => ["/home/husen/logstash_result/test_result.log"]
- }
- }
步骤:
a. 打开终端,进入 /home/husen目录下,将事先写好的配置文件上传到该目录
b. 输入以下命令运行
- ./logstash-5.5.1/bin/logstash -f in_out_test.conf
c. 运行成功之后,直接输入 hello world 并回车,终端上以JSON格式输出解析后的内容
d. crtl+c 退出,进入到配置文件里的保存日志日志的目录,查看结果
- ls
- cd logstash_result/
- ls
- cat test_result.log
- ## 结果为,第一次把hello world 输错了,又重输了1次,所以有两行记录.....
- {"@timestamp":"2017-09-13T03:25:20.937Z","@version":"1","host":"localhost.localdomain","message":"hello wprld"}
- {"@timestamp":"2017-09-13T03:25:31.417Z","@version":"1","host":"localhost.localdomain","message":"hello world"}