卡桑德拉如何处理阻断datastax Java驱动程序

问题描述:

阻止来自com.datastax.driver.core.Session这种方法卡桑德拉如何处理阻断datastax Java驱动程序

public ResultSet execute(Statement statement); 

评论执行fethod执行语句:

此方法一直阻塞至少从 数据库收到了一些结果。但是,对于SELECT查询,它并不保证 结果已被完整接收。但它确实保证从数据库接收到一些 响应,特别是 保证如果请求无效,则通过此方法将抛出异常 。

非阻塞从com.datastax.driver.core.Session

public ResultSetFuture executeAsync(Statement statement); 

此方法不会阻止执行fethod。只要查询已经传送到底层网络堆栈,它就会返回 。特别是,从 返回时,此方法不保证查询有效或甚至已将 提交给活动节点。在访问{@link ResultSetFuture}时,任何与查询失败 有关的异常都将被抛出。

我有关于他们的02个问题,因此,如果你能帮助我理解他们,那将是非常好的。

比方说,我有100万条记录,我希望所有这些记录都到达数据库(没有丢失)。

问题1:如果我有线程数为n,所有的线程将有他们需要发送到数据库中记录的相同金额。他们都使用阻止执行调用继续向cassandra发送多个插入查询。如果我增加n的值,它是否也有助于加快我需要将所有记录插入cassandra的时间?

这会导致cassandra性能问题吗? Cassandra是否必须确保对于每个插入记录,群集中的所有节点都应该立即知道新记录?为了保持数据的一致性。 (我假设cassandra节点甚至不会考虑使用本地机器时间来控制记录插入时间)。

问题2:使用非阻塞执行,我如何确保所有的插入操作都成功?我知道的唯一方法是等待ResultSetFuture检查插入查询的执行情况。有没有更好的办法可以做到?非阻塞执行更容易失败,然后阻止执行吗?

非常感谢您的帮助。

如果我有n个线程,所有线程将有相同数量的记录,他们需要发送到数据库。他们都使用阻止执行调用继续向cassandra发送多个插入查询。如果我增加n的值,它是否也有助于加快我需要将所有记录插入cassandra的时间?

在某种程度上。让我们稍微离开客户端实现细节,并从“并发请求数”的角度来看待事物,因为如果使用executeAsync,则不需要为每个正在进行的请求设置线程。在我的测试中,我发现虽然并发请求数量很大,但有一个阈值,即收益递减或性能开始降低。我的一般经验法则是(number of Nodes *native_transport_max_threads (default: 128)* 2),但您可能会发现更多或更少的更优化的结果。

这里的想法是,在排队更多的请求方面没有什么价值超过cassandra一次可以处理的。在减少进入请求的次数的同时,可以限制驱动程序客户端与cassandra之间的连接不必要的拥塞。

问题2:在非阻塞执行的情况下,如何确保所有插入操作都成功?我知道的唯一方法是等待ResultSetFuture检查插入查询的执行情况。有没有更好的办法可以做到?非阻塞执行更容易失败,然后阻止执行吗?

通过get等待ResultSetFuture是一个路由,但是如果您正在开发完全异步应用程序,则希望尽可能避免阻塞。使用番石榴,你的两个最好的武器是Futures.addCallbackFutures.transform

  • Futures.addCallback允许您注册当驱动器接收到的响应是被执行的FutureCallbackonSuccess在成功案例中得到执行,否则onFailure

  • Futures.transform允许您将返回的ResultSetFuture有效映射到其他内容中。例如,如果您只需要1列的值,则可以使用它将ListenableFuture<ResultSet>转换为ListenableFuture<String>,而无需在代码中阻止ResultSetFuture,然后获取字符串值。

在写的DataLoader程序的情况下,你可以这样做以下:

  1. 为了简单起见,使用Semaphore或一些其他结构具有固定的许可数(将成为您的机上请求的最大数量)。无论何时您使用executeAsync提交查询,都需要获得许可证。您应该只需要1个线程(但可能需要引入一个#cpu内核大小的池)来从Semaphore获取许可并执行查询。它会阻止收购,直到有一个可用的许可证。
  2. 使用Futures.addCallback为将来从executeAsync返回。在onSuccessonFailure两种情况下,回调应呼叫Sempahore.release()。通过释放许可证,这应该允许您的步骤1中的线程继续并提交下一个请求。

为了进一步提高吞吐量,您可能需要考虑使用BatchStatement并批量提交请求。如果将批次保持较小(50-250是一个好数字),并且批次中的插入共享相同的分区密钥,则这是一个不错的选择。

+0

我没有在'nodes * native_transport_max_threads'位上出售。特别是,推理(没有太多的价值排队更多的请求比卡桑德拉将一次处理)假设旅行时间是即时/微不足道。如果我的客户端和cassandra节点之间的单程时间为100ms,并且服务器可以在2ms内处理请求,那么我希望一次将电线放在〜50。这里的想法是,我现在连线的人会在大约100ms内到达,在那段时间内,服务器可以处理大约50条消息,并且我想让服务器保持繁忙状态,并始终确保它已经工作 –