使用slick的3.0.0流结果和Postgresql的正确方法是什么?
问题描述:
我想弄清楚如何使用流畅的流。我使用slick 3.0.0和postgres驱动程序使用slick的3.0.0流结果和Postgresql的正确方法是什么?
情况如下:服务器必须将客户端的数据序列拆分成受大小(字节)限制的块。所以,我写以下光滑查询:
val sequences = TableQuery[Sequences]
def find(userId: Long, timestamp: Long) = sequences.filter(s ⇒ s.userId === userId && s.timestamp > timestamp).sortBy(_.timestamp.asc).result
val seq = db.stream(find(0L, 0L))
我结合阿卡流Source
SEQ写道定制PushPullStage
,该数据的限制大小(以字节为单位),并完成上游当它达到大小限制。它工作得很好。问题是 - 当我考虑Postgres的日志,我看到这样的 select * from sequences where user_id = 0 and timestamp > 0 order by timestamp;
查询所以,乍一看这似乎是很多(和不必要的)数据库查询回事,只对每个查询使用几个字节。使用Slick进行流式传输的正确方法是什么?以便最大限度地减少数据库查询并充分利用每个查询中传输的数据?
答
“正确的方法” 做油滑和Postgres流包括三个方面:
必须使用db.stream()
在JDBC驱动程序必须禁用
autoCommit
。一种方法是通过后缀.transactionally
使查询在事务中运行。必须将
fetchSize
设置为0以外的值,否则postgres会一次性将整个结果集推送到客户端。
例:
DB.stream(
find(0L, 0L)
.transactionally
.withStatementParameters(fetchSize = 1000)
).foreach(println)
相关链接:
感谢您的回答,非常有帮助。我对“附录”感到困惑:不是处理数据库在AsyncExecutor中单独维护的执行上下文吗? – JimN
很高兴听到!关于增编:是的,你是对的。它应该是默认值。我删除了这个附录,因为它会让它更加混淆,事后我认为它确实与背压力学有关。我的消费者比网络快得多,因此尽快处理未来的结果是一个更合适的解决方案。 – Rikard
是否有与MySQL的情况等价的? – matanster