Elasticsearch: got java.lang.IllegalArgumentException: The number of object passed must be even but was [1]

Problem description:

Elasticsearch version: 2.4.0

Log:

java.lang.IllegalArgumentException: The number of object passed must be even but was [1] 
     at org.elasticsearch.action.index.IndexRequest.source(IndexRequest.java:451) 
     at org.elasticsearch.action.index.IndexRequestBuilder.setSource(IndexRequestBuilder.java:186) 
     at org.apache.kafka.connect.elasticsearchschema.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:138) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:381) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:227) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142) 
     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) 

Code:

// This method will put the SinkRecords which are sent in bulk to Elastic Search with proper index and type.
public void put(Collection<SinkRecord> sinkRecords) {
    try {
        // Gets a list of SinkRecord from Kafka broker.
        List<SinkRecord> records = new ArrayList<SinkRecord>(sinkRecords);
        for (int i = 0; i < records.size(); i++) {
            BulkRequestBuilder bulkRequest = client.prepareBulk();
            // Looping through the SinkRecords and the size should be less than bulksize.
            for (int j = 0; j < bulkSize && i < records.size(); j++, i++) {
                SinkRecord record = records.get(i);
                // Index and type is hardcoded and record.value() contains the Json message.
                bulkRequest.add(client.prepareIndex("operative1", "test").setSource(record.value()));
            }
            i--;
            // Executing bulk requests.
            BulkResponse bulkResponse = bulkRequest.execute().actionGet();
        }
    } catch (Exception e) {
    }
}

The input is: { "id1": "file", "value1": "File" }

Please help me resolve this issue.


I even tried indexing a single record with the code below and ran into the same problem. IndexResponse response = client.prepareIndex("operative1", "test").setSource(record.value()).execute().actionGet(); – Renukaradhya


The 'put' method in [ElasticsearchSinkTask](https://github.com/DataReply/kafka-connect-elastic-search-sink/blob/master/src/main/java/org/apache/kafka/connect/elasticsearchschema/ElasticsearchSinkTask.java#L119) passes a Map (that is, key/value pairs) to 'setSource(...)'. Maybe you can take a look there. – Kammeyer


Or you might take a look at: [Elastic search number of object passed must be even](https://stackoverflow.com/questions/39187097/elastic-search-number-of-object-passed-must-be-even?rq=1) – Kammeyer

The final code looks like this; it parses the JSON into a Map and passes the Map to setSource.

// This method will put the SinkRecords which are sent in bulk to Elastic Search with proper index and type.
public void put(Collection<SinkRecord> sinkRecords) {
    try {
        ObjectMapper mapper = new ObjectMapper();
        // Gets a list of SinkRecord from Kafka broker.
        List<SinkRecord> records = new ArrayList<SinkRecord>(sinkRecords);
        for (int i = 0; i < records.size(); i++) {
            BulkRequestBuilder bulkRequest = client.prepareBulk();
            // Looping through the SinkRecords and the size should be less than bulksize.
            for (int j = 0; j < bulkSize && i < records.size(); j++, i++) {
                SinkRecord record = records.get(i);
                // Index and type is hardcoded and record.value() contains the Json message.
                Map<String, Object> map = mapper.readValue((String) record.value(),
                        new TypeReference<Map<String, Object>>() {
                        });
                bulkRequest.add(client.prepareIndex("operative1", "test").setSource(map));
            }
            i--;
            // Executing bulk requests.
            BulkResponse bulkResponse = bulkRequest.execute().actionGet();
        }
    } catch (Exception e) {
    }
}
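
Why passing a Map helps can be illustrated with a small stand-alone sketch. The likely cause is Java overload resolution: record.value() has the static type Object, so the compiler cannot pick a String or Map overload and falls back to the varargs form, which treats its arguments as alternating keys and values and rejects an odd count. The class and the three setSource methods below are hypothetical stand-ins written for illustration only; they are not the Elasticsearch classes, and only the exception message is copied from the log above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a builder with overloaded setSource methods.
// This is NOT the Elasticsearch API; it only mimics the overload shapes.
class SetSourceOverloads {

    // Chosen only when the argument's static type is String.
    static String setSource(String json) {
        return "string overload";
    }

    // Chosen only when the argument's static type is a Map.
    static String setSource(Map<String, Object> fields) {
        return "map overload";
    }

    // Fallback: arguments are read as key1, value1, key2, value2, ...
    static String setSource(Object... keyValuePairs) {
        if (keyValuePairs.length % 2 != 0) {
            throw new IllegalArgumentException(
                    "The number of object passed must be even but was [" + keyValuePairs.length + "]");
        }
        return "varargs overload";
    }

    public static void main(String[] args) {
        // Static type is Object, as with SinkRecord.value(); the runtime type is String.
        Object value = "{ \"id1\": \"file\", \"value1\": \"File\" }";

        try {
            // Resolves to setSource(Object...) with a single element.
            setSource(value);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }

        // An explicit cast lets the compiler select the String overload.
        System.out.println(setSource((String) value));

        // A Map argument selects the Map overload, as in the final code above.
        Map<String, Object> map = new LinkedHashMap<>();
        map.put("id1", "file");
        map.put("value1", "File");
        System.out.println(setSource(map));

        // An even number of arguments is accepted by the varargs overload.
        System.out.println(setSource("id1", "file", "value1", "File"));
    }
}
```

If the record value is known to be a JSON string, casting it to String before calling setSource may be a simpler alternative to parsing it into a Map, assuming the client version in use offers a String overload; that should be checked against the actual client library.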