使用slick的3.0.0流结果和Postgresql的正确方法是什么?

问题描述:

我想弄清楚如何使用流畅的流。我使用slick 3.0.0和postgres驱动程序使用slick的3.0.0流结果和Postgresql的正确方法是什么?

情况如下:服务器必须将客户端的数据序列拆分成受大小(字节)限制的块。所以,我写以下光滑查询:

val sequences = TableQuery[Sequences] 
def find(userId: Long, timestamp: Long) = sequences.filter(s ⇒ s.userId === userId && s.timestamp > timestamp).sortBy(_.timestamp.asc).result 
val seq = db.stream(find(0L, 0L)) 

我结合阿卡流Source SEQ写道定制PushPullStage,该数据的限制大小(以字节为单位),并完成上游当它达到大小限制。它工作得很好。问题是 - 当我考虑Postgres的日志,我看到这样的 select * from sequences where user_id = 0 and timestamp > 0 order by timestamp;

查询所以,乍一看这似乎是很多(和不必要的)数据库查询回事,只对每个查询使用几个字节。使用Slick进行流式传输的正确方法是什么?以便最大限度地减少数据库查询并充分利用每个查询中传输的数据?

“正确的方法” 做油滑和Postgres流包括三个方面:

  1. 必须使用db.stream()

  2. 在JDBC驱动程序必须禁用autoCommit。一种方法是通过后缀.transactionally使查询在事务中运行。

  3. 必须将fetchSize设置为0以外的值,否则postgres会一次性将整个结果集推送到客户端。

例:

DB.stream(
    find(0L, 0L) 
    .transactionally 
    .withStatementParameters(fetchSize = 1000) 
).foreach(println) 

相关链接:

https://github.com/slick/slick/issues/1038

https://github.com/slick/slick/issues/809

+0

感谢您的回答,非常有帮助。我对“附录”感到困惑:不是处理数据库在AsyncExecutor中单独维护的执行上下文吗? – JimN

+0

很高兴听到!关于增编:是的,你是对的。它应该是默认值。我删除了这个附录,因为它会让它更加混淆,事后我认为它确实与背压力学有关。我的消费者比网络快得多,因此尽快处理未来的结果是一个更合适的解决方案。 – Rikard

+0

是否有与MySQL的情况等价的? – matanster