等待所有文件在触发下一个路由之前被消耗

问题描述:

我有路由消耗文件位置(递归)和路由处理文件在另一个文件位置。有时第一条路线可能会找到多个文件。等待所有文件在触发下一个路由之前被消耗

from(fileLocation) 
.autoStartup(true) 
.routeId("file-scanner") 
.to(newFileLocation) 
.to("direct:processFile") 

from("direct:processFile") 
.routeId("file-processor") 
.bean(*doing some stuff with new file location*) 

那么最终既然如此,有时是file-scanner拷贝一个文件,file-processor过程中的文件,然后复制file-scanner多了一个文件,然后再次运行。

我基本上想要的是file-scanner复制file-processor开始处理文件之前的所有文件。这样,我只能运行一次处理。

fileLocation我从与配置看起来像这样定义的消耗:

recursive=true&noop=true&idempotent=false&delay=3600000&include=.*.json&autoCreate=false&startingDirectoryMustExist=true&readLock=none 

所有决定围绕着一批消费者交换性能。 我想,你可以实现两个非常不同的解决方案:

  • 解决方案的基础上,汇聚集成模式。您需要将批次中的所有文件汇总到可能的字符串列表中,因为您的文件包含JSON。聚合器选项“completionFromBatchConsumer”可以帮助您聚合该给定轮询中从File端点消耗的所有文件。聚合后,您可以一起处理聚合文件。也许你可以开发定制的聚合策略来实现你的bean的逻辑,标记为“用新的文件位置做一些事情”。

  • 触发的基础上,控制总线集成模式:

 

    from(fileLocation) 
    .autoStartup(true) 
    .routeId("file-scanner") 
    .to(newFileLocation) 
    .choice().when(exchangeProperty("CamelBatchComplete")) 
    .process(new Processor() { 
     @Override 
     public void process(Exchange exchange) throws Exception { 
      exchange.getContext().startRoute("file-processor"); 
     } 
    }) 
    .end(); 

    from(newFileLocation). 
    .routeId("file-processor") 
    .autoStartup(false) 
    .bean(*doing some stuff with new file location*) 
    .choice().when(exchangeProperty("CamelBatchComplete")) 
    .process(new Processor() { 
     @Override 
     public void process(Exchange exchange) throws Exception { 
      exchange.getContext().stopRoute("file-processor"); 
     } 
    }) 
    .end(); 

+0

谢谢!我不知道“CamelBatchComplete”属性。解决了我的整个问题 –