完成未来其他未来
问题描述:
我有一个外部的未来操作,我可以覆盖它的onOperation完整的方法。我想包装它,并通过关闭Promise完成。但是我无法在未来的未来完成这个未来。例如:完成未来其他未来
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}
import scala.concurrent.ExecutionContext.Implicits.global
def foo():Future[String] = {
val p = Promise[String]()
channel.addListener(new ChannelFutureListener[IoReadFuture] {
override def operationComplete(future: IoReadFuture): Unit = {
p.success("hello")}
}
p.future
}
val x = foo()
x: scala.concurrent.Future[String] = List()
x.onComplete{
case Success(msg) => println(s"$msg world")
case Failure(e) => println(e.getMessage)
}
res1: Unit =()
是有习惯的方法来做到这一点无阻塞?
答
我想你正在尝试创建一个函数,它需要一个delay
作为输入并返回一个delayed future
。
如果你想这样做,你可以用非睡眠方式使用Await
和Promise
。
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration._
// returns a future delayed by `delay` seconds
def getDelayedFutureOfStringValue(delay: Int, value: String): Future[String] = {
// we will use this promise for wait only
val waitProxyPromise = Promise[Int]()
val delayedFuture = Await.ready(waitProxyPromise.future, delay.second).map({
case _ => value
})
delayedFuture
}
val helloFuture = getDelayedFutureOfStringValue(2, "hello")
上述amy看起来像一个体面的实施,但它实际上不是。 Await
实际上阻塞了线程。可悲的是......在惯用的Scala中,以一种完全无阻碍的方式获得delayedFutue并不容易。
你可以得到一个不错的无阻塞和非睡眠使用从Java定时器实用延迟未来,
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, Promise}
import java.util.{Timer, TimerTask}
// delay is number of seconds
def getDelayedFutureOfStringValue(delay: Int, value: String): Future[String] = {
val promise = Promise[String]()
val timer = new Timer()
val timerTask = new TimerTask {
override def run(): Unit = promise.success(value)
}
timer.schedule(timerTask, delay * 1000)
promise.future
}
val niceHelloFuture = getDelayedFutureOfStringValue(2, "hello")
而如果你已经拥有你的未来,你想用它来组成一个依赖未来,那么它很容易。
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}
// I assume you IoReadFuture is either similar to Future or wraps the actual Future
def foo(value: String):Future[String] = {
val p = Promise[String]()
channel.addListener(new ChannelFutureListener[IoReadFuture] {
override def operationComplete(ioFuture: IoReadFuture): Unit = {
ioFuture.future.onComplete(_ => p.success(value))
}
}
p.future
}
答
演员调度scheduleOne
是非阻塞的方式来等待一些代码来执行
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import ExecutionContext.Implicits.global
val p = Promise(doSomething())
val system = akka.actor.ActorSystem("system")
system.scheduler.scheduleOne(deplay seconds)(p.future)
val f = p.future
f.flatMap { println(s"${_}") }
我很抱歉,我不明白的问题 – Jatin
那是因为你是导致当前线程睡眠。 –
@SarveshKumarSingh这是一个错误的陈述。睡眠在“未来{:=>}”区块内。这将在不同的线程上执行 – Jatin