如何在Spring Integration自定义消息处理程序中自动装入Bean?
问题描述:
我希望创建一个自定义消息处理程序来使用流中的检查点。此外,这些检查点将存储在ElasticSearch。如何在Spring Integration自定义消息处理程序中自动装入Bean?
我创建了一个类检查点:
@Component
public class Checkpoint {
public static final String TASK_HEADER_KEY = "task";
public static CheckpointMessageHandlerSpec warn(String message) {
return new CheckpointMessageHandlerSpec(new CheckpointHandler("WARN", message));
}
}
// ... methods omitted: error, info etc
接下来,我创建CheckpointMessageHandlerSpec:
public class CheckpointMessageHandlerSpec extends MessageHandlerSpec<CheckpointMessageHandlerSpec, CheckpointHandler> {
public CheckpointMessageHandlerSpec(CheckpointHandler checkpointHandler) {
this.target = checkpointHandler;
}
public CheckpointMessageHandlerSpec apply(Message<?> message) {
this.target.handleMessage(message);
return _this();
}
@Override
protected CheckpointHandler doGet() {
throw new UnsupportedOperationException();
}
}
CheckpointHandler,在这个类我想注入的东西,比如服务或资料库来自Spring的数据:
public class CheckpointHandler extends IntegrationObjectSupport implements MessageHandler {
private String status;
private String message;
// I want inject services or repositories here
public CheckpointHandler(String status, String message) {
this.status = status;
this.message = message;
}
@Override
public void handleMessage(Message<?> message) {
// Test to watch if I have the bean factory. It is always null
this.getBeanFactory();
Expression expression = EXPRESSION_PARSER.parseExpression("'" + this.message + "'");
// Here I intend to persist information of payload/headers with spring-data-elasticSearch repository previously injected
Object obj = expression.getValue(message);
}
}
最后,使用的例子,一个流之内:
@Bean
public IntegrationFlow checkpointFlow(Checkpoint checkpoint) {
return IntegrationFlows.from(Http.inboundChannelAdapter("/checkpointFlow"))
.enrichHeaders(Collections.singletonMap(Checkpoint.TASK_HEADER_KEY, taskName))
.handle(new AppendMessageHandler())
.wireTap(c -> c.handle(m -> checkpoint.warn("SOMETHING IS HAPPENING HERE. MY PAYLOAD: ' + payload.toString() + '").apply(m)))
.handle(m -> log.info("[LOGGING DEMO] {}" , m.getPayload()))
.get();
}
private class AppendMessageHandler implements GenericHandler {
@Override
public String handle(Object payload, Map headers) {
return new StringBuilder().append(testMessage).toString();
}
}
我错过?有可能这样做吗?我在这个问题后有这个想法How to create custom component and add it to flow in spring java dsl?
谢谢!
答
Bean可以自动布线,如果它们是,那么就是bean。
让我们再看看你的代码!
c.handle(m -> checkpoint.warn("SOMETHING IS HAPPENING HERE. MY PAYLOAD: ' + payload.toString() + '").apply(m))
真正的豆在这里正是Lambda :)。当然,可悲,但不是你的自定义工厂,随后apply()
。您的自定义代码将在每个传入消息的目标Lambda中完全调用,但不知道BeanFactory
。
要解决你的问题,你应该用你的工厂如:
.wireTap(c -> c.handle(checkpoint.warn("SOMETHING IS HAPPENING HERE. MY PAYLOAD: ' + payload.toString() + '")))
和框架,将需要大约为豆你注册并CheckpointHandler
因此,自动装配照顾。
正如你可能已经猜到,你不需要apply()
方法。仅仅因为需要区分汇编阶段,当Java DSL填充bean的树时。初始化和注册阶段,当这个树被框架分析并且bean被注册在应用程序上下文中时。最后,还有一个运行阶段,即当消息从一个通道传送到另一个通道时,虽然所有这些消息处理程序,变换器等都有。