如何将消息发送到SCDF中的两个不同的输出通道?

问题描述:

我有一个通过默认output通道将消息发送到流中的处理器的源。现在我想通过不同的渠道发送故障消息。如何将消息发送到SCDF中的两个不同的输出通道?

我想我应该创建一个可绑定的接口,该接口从Source延伸并添加额外的通道使用@Output。我如何确保SCDF实际为此频道创建卡夫卡话题? IOW,流定义是什么样的?

E.g.沿的

source | processor | sink source > error-sink

线的东西随着使用source | processor常规output信道/卡夫卡主题,并使用不同的信道/主题source > error-sink

如果需要跟踪下游处理的错误消息,则可以使用与Spring Cloud Stream关联的OOTB DLQ机制。它支持RabbitKafka。您可以在弹性云数据流(SCDF)中将DLQ启用为global setting或按流为单位。

如果你愿意仍然喜欢定义你的自定义渠道来处理不同的消息,你必须创建一个类似于此sample自定义接口。

在SCDF中部署流时,可以分别通过spring.cloud.stream.kafka.bindings.<channelName>.producerspring.cloud.stream.kafka.bindings.<channelName>.consumer绑定属性覆盖生产者和消费者之间的目标。

编辑:

虽然有上面的方法,我了解到,从春节云溪铅(@马吕斯-bogoevici)一个更简单的解决方案。

已经有一个可用的默认错误通道,Spring集成支持它。

有了这个,在您的应用程序中,您可以通过以下方式将自定义消息发送到默认错误通道:@Autowire @Qualifier("errorChannel")。实际上,这种支持是也是可用于所有OOTB应用程序。

然后,您可以通过以下方式覆盖此错误通道的目标:spring.cloud.stream.bindings.error.destination=errorchannel-test。在SCDF中,您可以在流式部署时通过--properties传递此信息。

例如:

流生成foo --definition “MYSOURCE |日志”

流部署富--properties“app.mysource.spring.cloud.stream.bindings.error.destination = errorchannel-test“

+0

谢谢,Sabby。我确实想要处理下游的错误消息,但不是使用与正常消息相同的处理器。此外,错误消息是自定义消息。所以看起来DLQ并不适合,对吗? 该示例有助于将Java代码连接到通道。然而,我正在努力的部分是如何让SCDF将频道连接到Kafka主题。我是否简单地创建从源到错误处理器的另一个流? –

+0

事实证明,您不需要在流定义中做任何事情来实现这一点。 在流应用程序中,只需使用'@ EnableBinding'和一个添加另一个通道的接口,就像Sabby所建议的那样。对于源代码,它必须是一个'MessageChannel'。对于处理器/汇,它必须是一个'SubscribableChannel'。 Spring Cloud Stream将负责将渠道映射到Kafka主题;所有你需要做的就是确保源和处理器/接收器使用相同的通道名称。 没有必要通过属性覆盖目标。 –