是否可以使用Kafka Streams访问消息标题?
问题描述:
在卡夫卡0.11中添加了Headers到记录(ProducerRecord & ConsumerRecord),在使用Kafka Streams处理主题时是否可以获取这些标题?当在KStream
调用诸如map
方法提供了key
和记录的value
的论点,但没有办法,我可以看到访问headers
。如果我们可以通过map
而不是ConsumerRecord
这将是很好的。是否可以使用Kafka Streams访问消息标题?
ex。
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((key, value) -> ...) // can I get access to headers in methods like map, filter, aggregate, etc?
...
像这样的工作:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((record) -> {
record.headers();
record.key();
record.value();
})
...
答
记录头是目前不流API进行访问。有一个JIRA增加,虽然这一功能:通过https://issues.apache.org/jira/browse/KAFKA-5632
在当前卡夫卡(0.11和1.0,这将是释放不久)就可以通过处理器API访问记录元数据(即,通过transform()
,transformValues()
,或process()
)给出“上下文”对象。它公开主题,分区,偏移量和时间戳。 Cf https://docs.confluent.io/current/streams/developer-guide.html#applying-processors-and-transformers-processor-api-integration
元数据在DSL级别上不可用。但是,还有一些工作正在进行中,以扩展DSL:https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams
为了阐明Matthias所说的话:是的,Kafka Streams中的Processor API可以访问记录元数据,如主题名称,分区号,偏移量等。 Kafka Streams中的DSL不允许您访问。但是,因为你可以将处理器API和DSL,你仍然可以编写使用DSL的'变换()'或'transformValues()'功能,它允许您通过访问记录元数据的基于DSL的流处理应用在处理器API的处理器/变换器中。 –
感谢大家提供的信息,我会密切关注元数据添加到DSL级别的情况,以便可以更新此答案。 –
@ MatthiasJ.Sax和@ MichaelG.Noll:在https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams,对于'RecordContext'建议,它似乎没有暴露头部。那是会添加的东西吗? –