卡桑德拉如何处理阻断datastax Java驱动程序
阻止来自com.datastax.driver.core.Session这种方法卡桑德拉如何处理阻断datastax Java驱动程序
public ResultSet execute(Statement statement);
评论执行fethod执行语句:
此方法一直阻塞至少从 数据库收到了一些结果。但是,对于SELECT查询,它并不保证 结果已被完整接收。但它确实保证从数据库接收到一些 响应,特别是 保证如果请求无效,则通过此方法将抛出异常 。
非阻塞从com.datastax.driver.core.Session
public ResultSetFuture executeAsync(Statement statement);
此方法不会阻止执行fethod。只要查询已经传送到底层网络堆栈,它就会返回 。特别是,从 返回时,此方法不保证查询有效或甚至已将 提交给活动节点。在访问{@link ResultSetFuture}时,任何与查询失败 有关的异常都将被抛出。
我有关于他们的02个问题,因此,如果你能帮助我理解他们,那将是非常好的。
比方说,我有100万条记录,我希望所有这些记录都到达数据库(没有丢失)。
问题1:如果我有线程数为n,所有的线程将有他们需要发送到数据库中记录的相同金额。他们都使用阻止执行调用继续向cassandra发送多个插入查询。如果我增加n的值,它是否也有助于加快我需要将所有记录插入cassandra的时间?
这会导致cassandra性能问题吗? Cassandra是否必须确保对于每个插入记录,群集中的所有节点都应该立即知道新记录?为了保持数据的一致性。 (我假设cassandra节点甚至不会考虑使用本地机器时间来控制记录插入时间)。
问题2:使用非阻塞执行,我如何确保所有的插入操作都成功?我知道的唯一方法是等待ResultSetFuture检查插入查询的执行情况。有没有更好的办法可以做到?非阻塞执行更容易失败,然后阻止执行吗?
非常感谢您的帮助。
如果我有n个线程,所有线程将有相同数量的记录,他们需要发送到数据库。他们都使用阻止执行调用继续向cassandra发送多个插入查询。如果我增加n的值,它是否也有助于加快我需要将所有记录插入cassandra的时间?
在某种程度上。让我们稍微离开客户端实现细节,并从“并发请求数”的角度来看待事物,因为如果使用executeAsync,则不需要为每个正在进行的请求设置线程。在我的测试中,我发现虽然并发请求数量很大,但有一个阈值,即收益递减或性能开始降低。我的一般经验法则是(number of Nodes *
native_transport_max_threads (default: 128)
* 2)
,但您可能会发现更多或更少的更优化的结果。
这里的想法是,在排队更多的请求方面没有什么价值超过cassandra一次可以处理的。在减少进入请求的次数的同时,可以限制驱动程序客户端与cassandra之间的连接不必要的拥塞。
问题2:在非阻塞执行的情况下,如何确保所有插入操作都成功?我知道的唯一方法是等待ResultSetFuture检查插入查询的执行情况。有没有更好的办法可以做到?非阻塞执行更容易失败,然后阻止执行吗?
通过get
等待ResultSetFuture是一个路由,但是如果您正在开发完全异步应用程序,则希望尽可能避免阻塞。使用番石榴,你的两个最好的武器是Futures.addCallback
和Futures.transform
。
Futures.addCallback
允许您注册当驱动器接收到的响应是被执行的FutureCallback
。onSuccess
在成功案例中得到执行,否则onFailure
。Futures.transform
允许您将返回的ResultSetFuture
有效映射到其他内容中。例如,如果您只需要1列的值,则可以使用它将ListenableFuture<ResultSet>
转换为ListenableFuture<String>
,而无需在代码中阻止ResultSetFuture
,然后获取字符串值。
在写的DataLoader程序的情况下,你可以这样做以下:
- 为了简单起见,使用
Semaphore
或一些其他结构具有固定的许可数(将成为您的机上请求的最大数量)。无论何时您使用executeAsync
提交查询,都需要获得许可证。您应该只需要1个线程(但可能需要引入一个#cpu内核大小的池)来从Semaphore获取许可并执行查询。它会阻止收购,直到有一个可用的许可证。 - 使用
Futures.addCallback
为将来从executeAsync
返回。在onSuccess
和onFailure
两种情况下,回调应呼叫Sempahore.release()
。通过释放许可证,这应该允许您的步骤1中的线程继续并提交下一个请求。
为了进一步提高吞吐量,您可能需要考虑使用BatchStatement
并批量提交请求。如果将批次保持较小(50-250是一个好数字),并且批次中的插入共享相同的分区密钥,则这是一个不错的选择。
我没有在'nodes * native_transport_max_threads'位上出售。特别是,推理(没有太多的价值排队更多的请求比卡桑德拉将一次处理)假设旅行时间是即时/微不足道。如果我的客户端和cassandra节点之间的单程时间为100ms,并且服务器可以在2ms内处理请求,那么我希望一次将电线放在〜50。这里的想法是,我现在连线的人会在大约100ms内到达,在那段时间内,服务器可以处理大约50条消息,并且我想让服务器保持繁忙状态,并始终确保它已经工作 –