弹性搜索:GOT java.lang.IllegalArgumentException异常:对象的编号通过必须是偶数,但为[1]
问题描述:
ElasticSearch版本 - 2.4.0弹性搜索:GOT java.lang.IllegalArgumentException异常:对象的编号通过必须是偶数,但为[1]
日志:
java.lang.IllegalArgumentException: The number of object passed must be even but was [1]
at org.elasticsearch.action.index.IndexRequest.source(IndexRequest.java:451)
at org.elasticsearch.action.index.IndexRequestBuilder.setSource(IndexRequestBuilder.java:186)
at org.apache.kafka.connect.elasticsearchschema.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:138)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:381)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:227)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
代码:
鉴于// This method will put the SinkRecords which are sent in bulk to Elastic Search with proper index and type.
public void put(Collection<SinkRecord> sinkRecords) {
try {
// Gets a list of SinkRecord from Kafka broker.
List<SinkRecord> records = new ArrayList<SinkRecord>(sinkRecords);
for (int i = 0; i < records.size(); i++) {
BulkRequestBuilder bulkRequest = client.prepareBulk();
// Looping through the SinkRecords and the size should be less than bulksize.
for (int j = 0; j < bulkSize && i < records.size(); j++, i++) {
SinkRecord record = records.get(i);
// Index and type is hardcoded and record.value() contains the Json message.
bulkRequest.add(client.prepareIndex("operative1", "test").setSource(record.value()));
}
i--;
// Executing bulk requests.
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
}
} catch (Exception e) {
}
}
输入是 - >{ "id1": "file", "value1": "File" }
请帮助解决这个问题。
答
最终代码看起来像这样通过传递Map。
// This method will put the SinkRecords which are sent in bulk to Elastic Search with proper index and type.
public void put(Collection<SinkRecord> sinkRecords) {
try {
ObjectMapper mapper = new ObjectMapper();
// Gets a list of SinkRecord from Kafka broker.
List<SinkRecord> records = new ArrayList<SinkRecord>(sinkRecords);
for (int i = 0; i < records.size(); i++) {
BulkRequestBuilder bulkRequest = client.prepareBulk();
// Looping through the SinkRecords and the size should be less than bulksize.
for (int j = 0; j < bulkSize && i < records.size(); j++, i++) {
SinkRecord record = records.get(i);
// Index and type is hardcoded and record.value() contains the Json message.
Map<String, Object> map = mapper.readValue((String) record.value(), new TypeReference<Map<String, Object>>() {
});
bulkRequest.add(client.prepareIndex("operative1", "test").setSource(map));
}
i--;
// Executing bulk requests.
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
}
} catch (Exception e) {
}
}
我甚至尝试使用下面的代码保持单个记录并面临同样的问题。 IndexResponse response = client.prepareIndex(“operative1”,“test”).setSource(record.value())。execute() .actionGet(); – Renukaradhya
[ElasticsearchSinkTask]中的'put'方法(https://github.com/DataReply/kafka-connect-elastic-search-sink/blob/master/src/main/java/org/apache/kafka/connect/elasticsearchschema /ElasticsearchSinkTask.java#L119)将一个Map(它是一个键,值对)传递给'setSource(...)'。也许你可以看看这里。 – Kammeyer
或者你可能会看看:[传递的对象的弹性搜索数量必须是偶数](https://stackoverflow.com/questions/39187097/elastic-search-number-of-object-passed-must-be-even ?rq = 1) – Kammeyer