通过星火流
问题描述:
我是新来的星火道歉问这样一个问题,从阅读卡夫卡经纪人主题的特定分区的数据。我有一个用例,我想在Spark Streaming的帮助下从主题的特定分区读取数据。我正在使用Spark Java API来做所有的事情。通过星火流
我已创建了复制因子2和5个分区一个名为test的话题。希望在火花流Kafka集成指南的帮助下,我能够完成所有这些工作,比如创建一个JavaStreamingContext对象,创建一个到Kafka代理的直接流,并能够读取所有分区中的所有消息。
但是还是我的使用情况不符合,我要读卡夫卡的经纪人,而不是从所有分区中的所有消息的主题的特定分区中的唯一消息。
答
你应该能够读取特定分区从特定的偏移使用下面的代码。
Map<TopicAndPartition, Long> consumerOffsets = new HashMap<TopicAndPartition, Long>();
TopicAndPartition p1 = new TopicAndPartition("yourtopic","yourpartition");
consumerOffsets.put(p1,offset);
JavaInputDStream<String> messages = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
String.class,
kafkaParams,
consumerOffsetsLong,
new Function<MessageAndMetadata<String, String>, String>() {
public String call(MessageAndMetadata<String, String> msgAndMeta) throws Exception {
return msgAndMeta.message();
}
}
);