解析多JSON在logstash

问题描述:

神交我已经得到了格式的JSON:解析多JSON在logstash

{ 
    "SOURCE":"Source A", 
    "Model":"ModelABC", 
    "Qty":"3" 
} 

我试图解析使用logstash这个JSON。基本上我想让logstash输出成为我可以使用kibana分析的key:value对的列表。我认为这可以开箱即用。从大量的阅读中,我明白我必须使用grok插件(我仍然不确定json插件的用途)。但我无法获得所有领域的活动。我得到了多个事件(即使对于我的JSON的每个属性也是如此)。像这样:

{ 
     "message" => " \"SOURCE\": \"Source A\",", 
     "@version" => "1", 
    "@timestamp" => "2014-08-31T01:26:23.432Z", 
      "type" => "my-json", 
      "tags" => [ 
     [0] "tag-json" 
    ], 
      "host" => "myserver.example.com", 
      "path" => "/opt/mount/ELK/json/mytestjson.json" 
} 
{ 
     "message" => " \"Model\": \"ModelABC\",", 
     "@version" => "1", 
    "@timestamp" => "2014-08-31T01:26:23.438Z", 
      "type" => "my-json", 
      "tags" => [ 
     [0] "tag-json" 
    ], 
      "host" => "myserver.example.com", 
      "path" => "/opt/mount/ELK/json/mytestjson.json" 
} 
{ 
     "message" => " \"Qty\": \"3\",", 
     "@version" => "1", 
    "@timestamp" => "2014-08-31T01:26:23.438Z", 
      "type" => "my-json", 
      "tags" => [ 
     [0] "tag-json" 
    ], 
      "host" => "myserver.example.com", 
      "path" => "/opt/mount/ELK/json/mytestjson.json" 
} 

我应该使用多行编解码器还是json_lines编解码器?如果是这样,我该怎么做?我是否需要编写自己的Grok模式,或者是否有JSON的通用方法,这会为我提供一个事件,其中包含我为上述一个事件获取的键值对:我找不到任何有关这方面的文件。任何帮助,将不胜感激。我的conf文件显示如下:

input 
{ 
     file 
     { 
       type => "my-json" 
       path => ["/opt/mount/ELK/json/mytestjson.json"] 
       codec => json 
       tags => "tag-json" 
     } 
} 

filter 
{ 
    if [type] == "my-json" 
    { 
     date { locale => "en" match => [ "RECEIVE-TIMESTAMP", "yyyy-mm-dd HH:mm:ss" ] } 
    } 
} 

output 
{ 
     elasticsearch 
     { 
       host => localhost 
     } 
     stdout { codec => rubydebug } 
} 

我想我找到了对我的问题的工作答案。我不确定它是否是一个干净的解决方案,但它有助于解析上述类型的多行JSON。

input 
{ 
    file 
    { 
     codec => multiline 
     { 
      pattern => '^\{' 
      negate => true 
      what => previous     
     } 
     path => ["/opt/mount/ELK/json/*.json"] 
     start_position => "beginning" 
     sincedb_path => "/dev/null" 
     exclude => "*.gz" 
    } 
} 

filter 
{ 
    mutate 
    { 
     replace => [ "message", "%{message}}" ] 
     gsub => [ 'message','\n',''] 
    } 
    if [message] =~ /^{.*}$/ 
    { 
     json { source => message } 
    } 

} 

output 
{ 
    stdout { codec => rubydebug } 
} 

我mutliline编解码器不处理的最后一个大括号,因此它不会出现作为一个JSON来json { source => message }。因此,变异滤波器:

replace => [ "message", "%{message}}" ] 

这增加了缺失的大括号。和

gsub => [ 'message','\n',''] 

删除引入的\n字符。在最后,我有一个可以通过json { source => message }

读取的单行JSON如果有更简单/更简单的方法将原始多行JSON转换为单行JSON,请执行POST操作感觉上面不太干净。

您将需要使用multiline编解码器。

input { 
    file { 
    codec => multiline { 
     pattern => '^{' 
     negate => true 
     what => previous 
    } 
    path => ['/opt/mount/ELK/json/mytestjson.json'] 
    } 
} 
filter { 
    json { 
    source => message 
    remove_field => message 
    } 
} 

您遇到的问题必须与文件中的最后一个事件有关。直到文件中出现另一个事件(因此基本上会丢失文件中的最后一个事件),它才会显示出来 - 您可以在文件轮换处理这种情况之前在文件中附加一个{

+0

感谢Alcanzar,我得到一个JSON解析失败,但: [0] “_jsonparsefailure” 试图改变模式 模式=> '^ \ {' ,但仍是同样的事情。而且我的文件每个文件只有1个JSON,即只有一个{或}字符。每个文件都将是一个事件(1个文件= 1个JSON = 1个事件) – Dan 2014-09-04 15:30:56

+0

您可能需要将'start_postion =>开始'添加到您的文件输入以确保它始于记录的开头......还有什么其他人在你的文件? (你可以删除过滤器,只需添加一个'output {stdout {}}'来查看它传递给json过滤器的过程) – Alcanzar 2014-09-04 15:34:09

+0

我注意到我的生产JSON确实有额外的“{”和“}”:(So我的JSON实际上是:{“SOURCE”:“Source A”,“Model”:“ModelABC”,“Qty”:“3”“DESC”:“{\”New prod-125 \“}”}在评论中没有很好地解析) 我无法对这些JSON进行更改,我们从另一个源接收它们,并且需要按原样使用 – Dan 2014-09-04 17:16:19