等待所有文件在触发下一个路由之前被消耗
问题描述:
我有路由消耗文件位置(递归)和路由处理文件在另一个文件位置。有时第一条路线可能会找到多个文件。等待所有文件在触发下一个路由之前被消耗
from(fileLocation)
.autoStartup(true)
.routeId("file-scanner")
.to(newFileLocation)
.to("direct:processFile")
from("direct:processFile")
.routeId("file-processor")
.bean(*doing some stuff with new file location*)
那么最终既然如此,有时是file-scanner
拷贝一个文件,file-processor
过程中的文件,然后复制file-scanner
多了一个文件,然后再次运行。
我基本上想要的是file-scanner
复制file-processor
开始处理文件之前的所有文件。这样,我只能运行一次处理。
的fileLocation
我从与配置看起来像这样定义的消耗:
recursive=true&noop=true&idempotent=false&delay=3600000&include=.*.json&autoCreate=false&startingDirectoryMustExist=true&readLock=none
答
所有决定围绕着一批消费者交换性能。 我想,你可以实现两个非常不同的解决方案:
解决方案的基础上,汇聚集成模式。您需要将批次中的所有文件汇总到可能的字符串列表中,因为您的文件包含JSON。聚合器选项“completionFromBatchConsumer”可以帮助您聚合该给定轮询中从File端点消耗的所有文件。聚合后,您可以一起处理聚合文件。也许你可以开发定制的聚合策略来实现你的bean的逻辑,标记为“用新的文件位置做一些事情”。
触发的基础上,控制总线集成模式:
from(fileLocation) .autoStartup(true) .routeId("file-scanner") .to(newFileLocation) .choice().when(exchangeProperty("CamelBatchComplete")) .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { exchange.getContext().startRoute("file-processor"); } }) .end(); from(newFileLocation). .routeId("file-processor") .autoStartup(false) .bean(*doing some stuff with new file location*) .choice().when(exchangeProperty("CamelBatchComplete")) .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { exchange.getContext().stopRoute("file-processor"); } }) .end();
谢谢!我不知道“CamelBatchComplete”属性。解决了我的整个问题 –