排队机制和Elasticsearch 1.4.0

问题描述:

我有一个RabbitMQ代理,在这个代理上发布不同的消息,这些消息最终将作为Elasticsearch中的文档。代理中有多个消费者,这些消费者实际上是分配给amqp入站网关的任务执行程序中的不同线程(在此使用spring集成和spring amqp)。排队机制和Elasticsearch 1.4.0

认为在以下情形:我已经创建了一个ES与文档结构

{ 
    "field1" : "value1", 
    "field2" : "value2" 
} 

后来我送两个更新请求,更新都同场,让我们说field1。如果我一个接一个地发送这个消息(生产中的常见用例),我的消费者线程将以正确的顺序获取消息(amqp允许这样做),但是处理可能发生在错误的顺序中,并且以后更新的值可能是被第一个覆盖。我最终会得到数据。

如何确保我的数据不会被损坏? =>拥有1个消费者线程是不够的,因为如果我想通过添加更多机器与我的消费应用程序来扩展,我仍然会有多个消费者。我可能需要排序消息,但有多台机器可能需要创建某种群集感知组件,我使用SI,所以这在我看来很难实现。

在1.2之前的ES版本中,我们使用了外部版本,比如时间戳,在我的场景中ES会抛出VersionConflictException:第一次更新的版本应该是10000版本,第二次10001版本,如果第一版本会有ES会拒绝10000版本的请求,因为它低于现有版本。但从最新版本,ES球员have removed this functionality进行更新操作。

一个解决方案可能是使用多个队列并在每个队列上有一个使用者;使用散列函数总是将更新路由到同一文档到同一队列,请参阅RabbitMQ Tutorials了解各种选项。

您可以通过添加更多队列(并更改散列函数)来扩展。

对于弹性,请考虑在Spring XD中运行您的消费者。您可以为每个兔子源(每个队列)创建一个实例,如果发生故障,XD将负责将其故障转移到另一个容器节点。

否则,你可以通过具有热备份滚你自己 - 与auto-startup="false"配置的入站适配器,并有一些显示器和使用<control-bus/>启动一个新的实例,如果处于活动状态下降。

编辑:

响应于下面的第四评论。

正如我上面所说,要扩展,你将不得不改变散列函数。因此,在运行时自动添加消费者将会非常棘手。

您不必在jar中对队列名称进行硬编码,您可以使用属性占位符并从属性,系统属性或环境变量中填充它。

该解决方案是最简单的,但有这些限制。但是,您可以构建一个管理应用程序,可以将其扩展出来 - 停止生产者,等待所有队列停顿,重新配置使用者并重新启动生产者--Spring Integration提供了一个<control-bus/>来启动/停止适配器;你也可以通过JMX来完成。

其他解决方案是可能的,但通常需要维护群集间的一些共享状态(可能使用zookeeper等),所以更复杂;而你仍然必须处理竞争条件(第二次更新可能会在第一次之前到达某些消费者)。

+0

如何使用spring-amqp注入这个散列函数?你能给我一个简单的例子吗? – 2014-11-26 11:03:36

+0

只需以某种方式从文档中计算散列,例如'customerNumber%3'(如果有3个队列),并使用它在'rabbitTemplate.send ...(...)'方法中构建'routingKey'。 – 2014-11-26 14:22:24

+0

比方说,我发布到3个不同的队列,并为每个队列注册1个消费者。如果我有3台机器来部署我的应用程序,我如何确保只有一个线程会从队列中获取消息? – 2014-11-27 07:35:43

您可以使用缺省机制进行一致性检查。基本上你想验证你有最新版本的任何你正在更新。

因此,您需要使用对象获取_version。在查询中,您可以通过在顶层设置version = true来完成此操作。这将导致_version与您的查询结果一起返回。然后,在进行更新时,只需将url中的版本参数设置为您拥有的值,并且如果不匹配,则会生成版本冲突。

更好的是使用闭包来处理更新。基本上,它的工作原理如下:有一个更新方法,通过id获取对象,应用封装(更新函数的参数)封装您想要做的修改,然后存储修改后的对象。如果您捕获仍然可能发生的版本冲突,则可以再次获取该对象并将该闭包重新应用于该对象。我们这样做,并在重试之前添加了随机睡眠,这大大减少了多次更新失败的机会,并且是一个不错的设计模式。保持读取和写入在一起可以最大限度地减少冲突的可能性,然后在进一步最小化之前重新尝试睡眠。您可以添加多个重试以进一步降低风险。

+0

这意味着每一次更新操作的调用。如果在同一个文件上有并发更新,发生间隔为ms怎么办?这不会导致数据损坏吗?另一个问题:你在同一领域有2个更新请求。你会希望第二个是最新的应用。有了这个重试,你的第一次更新最后会不会有变化? – 2014-12-01 08:53:04

+0

如果您正在进行并发更新,则无论如何都需要确保您拥有最新版本,然后才能覆盖所有内容。最好的方法是在PUT之前做一个GET。在我们的情况下,如果我们不这样做,我们有很多版本在负载下发生冲突。确实有机会以不同于预期的顺序应用并发更新。所有这种模式的保证是1)你正在更新es中的版本。2)没有其他更新同时被写入。这是乐观锁定的一种形式。您可以在关闭过程中进行额外的检查。 – 2014-12-01 13:35:48