带弹性搜索输出的Logstash:如何写入不同的索引?
我希望能够在这里找到自我昨天以来一直在努力的问题的答案:带弹性搜索输出的Logstash:如何写入不同的索引?
我使用rabbitMQ输入和elasticsearch输出配置Logstash 1.5.6。
的消息刊登在RabbitMQ的散装形式,我logstash消耗它们,并写入他们都elasticsearch默认索引logstash-YYY.MM.DD此配置:
input {
rabbitmq {
host => 'xxx'
user => 'xxx'
password => 'xxx'
queue => 'xxx'
exchange => "xxx"
key => 'xxx'
durable => true
}
output {
elasticsearch {
host => "xxx"
cluster => "elasticsearch"
flush_size =>10
bind_port => 9300
codec => "json"
protocol => "http"
}
stdout { codec => rubydebug }
}
现在我正在努力do将消息发送给不同的elasticsearch索引。
来自amqp输入的消息已经有索引和类型参数(批量格式)。
我尝试这样做,
input {
rabbitmq {
host => 'xxx'
user => 'xxx'
password => 'xxx'
queue => 'xxx'
exchange => "xxx"
key => 'xxx'
durable => true
}
output {
elasticsearch {
host => "xxx"
cluster => "elasticsearch"
flush_size =>10
bind_port => 9300
codec => "json"
protocol => "http"
index => "%{[index][_index]}"
}
stdout { codec => rubydebug }
}
但是logstash正在做的是创建索引%{[指数] [_指数]},并把那里所有的docs而不是阅读_index参数并发送文档!
我也试过如下:
index => %{index}
index => '%{index}'
index => "%{index}"
但没有一个似乎工作。
任何帮助?
要恢复,这里的主要问题是:如果RabbitMQ的消息具有以下格式:
{"index":{"_index":"indexA","_type":"typeX","_ttl":2592000000}}
{"@timestamp":"2017-03-09T15:55:54.520Z","@version":"1","@fields":{DATA}}
如何告诉logstash发送名为“指数A”型“而TYPEx”索引输出? ?
所以每个人,与瓦尔的帮助下,解决办法是:
- 正如他所说的,因为RabbitMQ的消息已经在批量格式,无需使用elasticsearch输出,HTTP输出到_bulk API将使(我傻)
-
所以我替换为输出:
output { http { http_method => "post" url => "http://172.16.1.81:9200/_bulk" format => "message" message => "%{message}" } stdout { codec => json_lines } }
但它仍然是行不通的。我使用的是Logstash 1.5.6,升级到Logstash 2.0.0(https://www.elastic.co/guide/en/logstash/2.4/_upgrading_using_package_managers.html)后,它使用相同的配置。
在那里,它是:)
如果您在RabbitMQ的消息已经在批量格式,那么你不需要使用elasticsearch
输出,但一个简单的http
输出击中_bulk
端点会做的伎俩:
output {
http {
http_method => "post"
url => "http://localhost:9200/_bulk"
format => "message"
message => "%{message}"
}
}
谢谢回答,但我得到与你的方法如下: “[HTTP输出故障]遇到的非200 HTTP代码200”,:RESPONSE_CODE => 400,:URL =>“HTTP:// localhost:9200/_bulk“, –
酷,很高兴它帮助!它运作了吗? – Val
我试图将日志错误追加到我的答案中,但字符数量有限,如果没有发布未完成的评论,我甚至不能去“内联”! –
如果存储在RabbitMQ的JSON消息,那么这个问题就可以解决了。 在JSON消息中使用索引和类型作为字段,并将这些值分配给Elasticsearch输出插件。
指数=> “%{索引}” //从JSON体INDEX从卡夫卡生产者DOCUMENT_TYPE => “%{类型}”} //从JSON体
TYPE随着接收这种做法,每条消息都可以有自己的索引和类型。
基本上,我的解决方案+升级到Logstash 2 ;-) – Val
+您可以删除标准输出,这只是为了调试 – Val
随着升级您的问题的elasticsearch输出应该也工作 –