完成未来其他未来

问题描述:

我有一个外部的未来操作,我可以覆盖它的onOperation完整的方法。我想包装它,并通过关闭Promise完成。但是我无法在未来的未来完成这个未来。例如:完成未来其他未来

import scala.concurrent.{Future, Promise} 
import scala.util.{Failure, Success} 
import scala.concurrent.ExecutionContext.Implicits.global 
def foo():Future[String] = { 
    val p = Promise[String]() 
    channel.addListener(new ChannelFutureListener[IoReadFuture] { 
    override def operationComplete(future: IoReadFuture): Unit = { 
    p.success("hello")} 
    } 
    p.future 
} 

val x = foo() 
x: scala.concurrent.Future[String] = List() 
x.onComplete{ 
    case Success(msg) => println(s"$msg world") 
    case Failure(e) => println(e.getMessage) 
} 
res1: Unit =() 

是有习惯的方法来做到这一点无阻塞

+3

我很抱歉,我不明白的问题 – Jatin

+0

那是因为你是导致当前线程睡眠。 –

+0

@SarveshKumarSingh这是一个错误的陈述。睡眠在“未来{:=>}”区块内。这将在不同的线程上执行 – Jatin

我想你正在尝试创建一个函数,它需要一个delay作为输入并返回一个delayed future

如果你想这样做,你可以用非睡眠方式使用AwaitPromise

import scala.concurrent.ExecutionContext.Implicits.global 
import scala.concurrent.{Future, Promise} 
import scala.concurrent.duration._ 


// returns a future delayed by `delay` seconds 
def getDelayedFutureOfStringValue(delay: Int, value: String): Future[String] = { 
    // we will use this promise for wait only 
    val waitProxyPromise = Promise[Int]() 
    val delayedFuture = Await.ready(waitProxyPromise.future, delay.second).map({ 
    case _ => value 
    }) 
    delayedFuture 
} 

val helloFuture = getDelayedFutureOfStringValue(2, "hello") 

上述amy看起来像一个体面的实施,但它实际上不是。 Await实际上阻塞了线程。可悲的是......在惯用的Scala中,以一种完全无阻碍的方式获得delayedFutue并不容易。

你可以得到一个不错的无阻塞和非睡眠使用从Java定时器实用延迟未来,

import scala.concurrent.ExecutionContext.Implicits.global 
import scala.concurrent.{Future, Promise} 
import java.util.{Timer, TimerTask} 

// delay is number of seconds 
def getDelayedFutureOfStringValue(delay: Int, value: String): Future[String] = { 
    val promise = Promise[String]() 
    val timer = new Timer() 
    val timerTask = new TimerTask { 
    override def run(): Unit = promise.success(value) 
    } 
    timer.schedule(timerTask, delay * 1000) 
    promise.future 
} 

val niceHelloFuture = getDelayedFutureOfStringValue(2, "hello") 

而如果你已经拥有你的未来,你想用它来组成一个依赖未来,那么它很容易。

import scala.concurrent.ExecutionContext.Implicits.global 
import scala.concurrent.{Future, Promise} 
import scala.util.{Failure, Success} 

// I assume you IoReadFuture is either similar to Future or wraps the actual Future 

def foo(value: String):Future[String] = { 
    val p = Promise[String]() 
    channel.addListener(new ChannelFutureListener[IoReadFuture] { 
    override def operationComplete(ioFuture: IoReadFuture): Unit = { 
     ioFuture.future.onComplete(_ => p.success(value)) 
    } 
    } 
    p.future 
} 

演员调度scheduleOne是非阻塞的方式来等待一些代码来执行

import scala.concurrent.duration._ 
import scala.concurrent.ExecutionContext 
import ExecutionContext.Implicits.global 

val p = Promise(doSomething()) 
val system = akka.actor.ActorSystem("system") 
system.scheduler.scheduleOne(deplay seconds)(p.future) 
val f = p.future 
f.flatMap { println(s"${_}") }