来自单个主题的多个流
问题描述:
如何从单个主题创建多个流?当我做这样的事情:来自单个主题的多个流
KStreamBuilder builder = new KStreamBuilder();
builder.stream(Serdes.String(), Serdes.String(), "master")
/* Filtering logic */
.to(Serdes.String(), Serdes.String(), "output1");
builder.stream(Serdes.String(), Serdes.String(), "master")
/* Filtering logic */
.to(Serdes.String(), Serdes.String(), "output2");
KafkaStreams streams = new KafkaStreams(builder, /* config */);
streams.start();
我得到以下错误:
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic master has already been registered by another source.
at org.apache.kafka.streams.processor.TopologyBuilder.addSource(TopologyBuilder.java:347)
at org.apache.kafka.streams.kstream.KStreamBuilder.stream(KStreamBuilder.java:92)
我需要做KafkaStreams的另一个实例从“大师”每个流?
答
您可以创建一个KStream可以重复使用:
KStream<String, String> inputStream = builder.stream(Serdes.String(), Serdes.String(), "master");
,那么你可以重复使用它:
inputStream.filter(..logic1)
.to(Serdes.String(), Serdes.String(), "output1");
inputStream.filter(..logic2)
.to(Serdes.String(), Serdes.String(), "output2");
KafkaStreams streams = new KafkaStreams(builder, /* config */);
streams.start();
如果你没有在你的过滤器的重叠,你也可以使用' inputStream.branch()'返回非重叠子流。 –