通过星火流

问题描述:

我是新来的星火道歉问这样一个问题,从阅读卡夫卡经纪人主题的特定分区的数据。我有一个用例,我想在Spark Streaming的帮助下从主题的特定分区读取数据。我正在使用Spark Java API来做所有的事情。通过星火流

我已创建了复制因子2和5个分区一个名为test的话题。希望在火花流Kafka集成指南的帮助下,我能够完成所有这些工作,比如创建一个JavaStreamingContext对象,创建一个到Kafka代理的直接流,并能够读取所有分区中的所有消息。

但是还是我的使用情况不符合,我要读卡夫卡的经纪人,而不是从所有分区中的所有消息的主题的特定分区中的唯一消息。

你应该能够读取特定分区从特定的偏移使用下面的代码。

Map<TopicAndPartition, Long> consumerOffsets = new HashMap<TopicAndPartition, Long>(); 
TopicAndPartition p1 = new TopicAndPartition("yourtopic","yourpartition"); 
consumerOffsets.put(p1,offset); 

JavaInputDStream<String> messages = KafkaUtils.createDirectStream(
     jssc, 
     String.class, 
     String.class, 
     StringDecoder.class, 
     StringDecoder.class, 
     String.class, 
     kafkaParams, 
     consumerOffsetsLong, 
     new Function<MessageAndMetadata<String, String>, String>() { 
      public String call(MessageAndMetadata<String, String> msgAndMeta) throws Exception { 
       return msgAndMeta.message(); 
      } 
     } 
);