带弹性搜索输出的Logstash:如何写入不同的索引?

问题描述:

我希望能够在这里找到自我昨天以来一直在努力的问题的答案:带弹性搜索输出的Logstash:如何写入不同的索引?

我使用rabbitMQ输入和elasticsearch输出配置Logstash 1.5.6。

的消息刊登在RabbitMQ的散装形式,我logstash消耗它们,并写入他们都elasticsearch默认索引logstash-YYY.MM.DD此配置:

input { 
    rabbitmq { 
    host => 'xxx' 
    user => 'xxx' 
    password => 'xxx' 
    queue => 'xxx' 
    exchange => "xxx" 
    key => 'xxx' 
    durable => true 
} 

output { 
    elasticsearch { 
    host => "xxx" 
    cluster => "elasticsearch" 
    flush_size =>10 
    bind_port => 9300 
    codec => "json" 
    protocol => "http" 
    } 
stdout { codec => rubydebug } 
} 

现在我正在努力do将消息发送给不同的elasticsearch索引。

来自amqp输入的消息已经有索引和类型参数(批量格式)。

所以阅读文档后: https://www.elastic.co/guide/en/logstash/1.5/event-dependent-configuration.html#logstash-config-field-references

我尝试这样做,

input { 
    rabbitmq { 
    host => 'xxx' 
    user => 'xxx' 
    password => 'xxx' 
    queue => 'xxx' 
    exchange => "xxx" 
    key => 'xxx' 
    durable => true 
} 

output { 
    elasticsearch { 
    host => "xxx" 
    cluster => "elasticsearch" 
    flush_size =>10 
    bind_port => 9300 
    codec => "json" 
    protocol => "http" 
    index => "%{[index][_index]}" 
    } 
stdout { codec => rubydebug } 
} 

但是logstash正在做的是创建索引%{[指数] [_指数]},并把那里所有的docs而不是阅读_index参数并发送文档!

我也试过如下:

index => %{index} 
index => '%{index}' 
index => "%{index}" 

但没有一个似乎工作。

任何帮助?

要恢复,这里的主要问题是:如果RabbitMQ的消息具有以下格式:

{"index":{"_index":"indexA","_type":"typeX","_ttl":2592000000}} 
{"@timestamp":"2017-03-09T15:55:54.520Z","@version":"1","@fields":{DATA}} 

如何告诉logstash发送名为“指数A”型“而TYPEx”索引输出? ?

所以每个人,与瓦尔的帮助下,解决办法是:

  • 正如他所说的,因为RabbitMQ的消息已经在批量格式,无需使用elasticsearch输出,HTTP输出到_bulk API将使(我傻)
  • 所以我替换为输出:

    output { 
        http { 
        http_method => "post" 
        url => "http://172.16.1.81:9200/_bulk" 
        format => "message" 
        message => "%{message}" 
    } 
    stdout { codec => json_lines } 
    } 
    
  • 但它仍然是行不通的。我使用的是Logstash 1.5.6,升级到Logstash 2.0.0(https://www.elastic.co/guide/en/logstash/2.4/_upgrading_using_package_managers.html)后,它使用相同的配置。

在那里,它是:)

+0

基本上,我的解决方案+升级到Logstash 2 ;-) – Val

+0

+您可以删除标准输出,这只是为了调试 – Val

+0

随着升级您的问题的elasticsearch输出应该也工作 –

如果您在RabbitMQ的消息已经在批量格式,那么你不需要使用elasticsearch输出,但一个简单的http输出击中_bulk端点会做的伎俩:

output { 
    http { 
     http_method => "post" 
     url => "http://localhost:9200/_bulk" 
     format => "message" 
     message => "%{message}" 
    } 
} 
+0

谢谢回答,但我得到与你的方法如下: “[HTTP输出故障]遇到的非200 HTTP代码200”,:RESPONSE_CODE => 400,:URL =>“HTTP:// localhost:9200/_bulk“, –

+0

酷,很高兴它帮助!它运作了吗? – Val

+0

我试图将日志错误追加到我的答案中,但字符数量有限,如果没有发布未完成的评论,我甚至不能去“内联”! –

如果存储在RabbitMQ的JSON消息,那么这个问题就可以解决了。 在JSON消息中使用索引和类型作为字段,并将这些值分配给Elasticsearch输出插件。

指数=> “%{索引}” //从JSON体INDEX从卡夫卡生产者DOCUMENT_TYPE => “%{类型}”} //从JSON体

TYPE随着接收这种做法,每条消息都可以有自己的索引和类型。