如何将JavaInputDStream JSON转换为ElasticSearch JAVA

问题描述:

嗨,大家好我正在与kafka> spark streaming> Elasticsearch合作。 但我不做火花流JavaInputDStream JSON elasticsearch。如何将JavaInputDStream JSON转换为ElasticSearch JAVA

我的代码:

SparkConf conf = new SparkConf() 
      .setAppName("Streaming") 
      .setMaster("local") 
      .set("es.nodes","localhost:9200") 
      .set("es.index.auto.create","true"); 
    JavaStreamingContext streamingContext = new JavaStreamingContext(conf, new Duration(5000)); 
    Map<String, Object> kafkaParams = new HashMap<>(); 
    kafkaParams.put("bootstrap.servers", "localhost:9092"); 
    kafkaParams.put("key.deserializer", StringDeserializer.class); 
    kafkaParams.put("value.deserializer", StringDeserializer.class); 
    kafkaParams.put("group.id", "exastax"); 
    kafkaParams.put("auto.offset.reset", "latest"); 
    kafkaParams.put("enable.auto.commit", false); 

    Collection<String> topics = Arrays.asList("loglar"); 
    JavaInputDStream<ConsumerRecord<String, String>> stream = 
      KafkaUtils.createDirectStream(
        streamingContext, 
        LocationStrategies.PreferConsistent(), 
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams) 
      ); 

    JavaPairDStream<String, String> finisStream = stream.mapToPair(record -> new Tuple2<>("", record.value())); 
    finisStream.print(); 
    JavaEsSparkStreaming.saveJsonToEs(finisStream,"spark/docs"); 
    streamingContext.start(); 
    streamingContext.awaitTermination(); 


} 

JavaEsSparkStreaming.saveJsonToEs(finisStream, “火花/文档”); >> finisStream不工作,因为它不是JavaDStream。 如何转换JavaDStream?

JavaEsSparkStreaming.saveJsonToEs作品与JavaDStream

JavaEsSparkStreaming.saveToEsWithMeta作品与JavaPairDStream

要修复代码:

JavaDStream<String> finisStream = stream.map(new Function<Tuple2<String, String>, String>() { 
    public String call(Tuple2<String, String> stringStringTuple2) throws Exception { 
     return stringStringTuple2._2(); 
    } 
}); 

JavaEsSparkStreaming.saveJsonToEs(finisStream,""); 

日Thnx很多关于答案!但我解决了这个代码:

JavaDStream<String> stream1 = stream.map(
       new Function<ConsumerRecord<String, String>, String>() { 
        @Override 
        public String call(ConsumerRecord<String, String> r) { 
         return r.value(); 
        } 
       } 
     ); 
      JavaEsSparkStreaming.saveJsonToEs(stream1,"spark/docs");