【Scala笔记——道】给你的Future一个Promise --最浪漫的并发模型(一)
Future
曾经,她在他的臂弯中,星空下,问他:“会不会一直对我这么好”, “你会不会在五年后来娶我”…
Future 和 Promise
- Future 代表未来。未来总会来到,但这个未来是成功或是失败我们不得而知。抽象为核心三个方法 `onComplete’, ’onSuccess’, ‘onFailure’。其中 onSuccess和 onFailure是 onComplete的简化。"如果我以后不改变,你会不会永远对我这么好?”这种情况相当于Future[B]有一个前置的条件Future[A],这种由Future[A] => Future[B] 的转化可以抽象为 map。
- Promise 代表承诺。承诺意味着需要付诸行动,但行动过程中总会有困难去克服。抽象为核心方法“complete", ”tryComplete"。其中tryComplete是complete具体的过程。
- Impl.Promise 代表具体的一个承诺继承 Future和Promise。一个具体的承诺是对于一种未来期许所制定的约定。
- KeptPromise.Successful 代表不需要完成已经实现的承诺,例如:A => 你会不会现在来陪在我身边 B=> 开门,我已经在你家门外了
- KeptPromise.Failure 代表不需要完成已经不可实现的承诺。例如:A => 你会不会永远对我这么好,无论我犯了什么错? B 这个时候不需要任何承诺,因为这时任何承诺都注定是不可实现的。
- DefaultPromise 大多数情况下需要 付出,努力实现的承诺。其中AtomicReference 接口表明了一个承诺的状态是可变的,但不会混杂多个承诺。
她的未来 他的承诺
你会不会送我一只玩具熊?
…
你要的是这一只吗?
具体实现如下
val f1 = Future.successful{ Her(said = "你会不会送我一只玩具熊?" ) }
f1.map{
case Her(said) => if( said equals "你会不会送我一只玩具熊?") buyToy(Bear)
else if(said equals "你会不会送我一朵玫瑰?") buyFlower(Rose)
...
case Others => None
}
Future.successful这里调用了上文所述KeptPromise.Successful ,这里她并没有对未来的承诺,她说话的时候就是她未来的实现。
Map 具体实现如下,
def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = { // transform(f, identity)
val p = Promise[S]() // new impl.Promise.DefaultPromis[T]() 他的承诺
onComplete { v => p complete (v map f) } //努力去完成
p.future //承诺的结果
}
p complete 这里主要是调用了 DefaultPromise.tryComplete,这里tryCompleteAndGetListeners 中的 getState方法来自于AtomicReference,实际上获取的是DefaultPromise本身的参数化类型T。
class DefualtPromis[T] extends AtomicReverence[AnyRef](Nil) with Promise[T] {
updateState(null, Nil)
...
def tryComplete(value: Try[T]): Boolean = {
val resolved = resolveTry(value)
tryCompleteAndGetListeners(resolved) match {
case null => false
case rs if rs.isEmpty => true
case rs => rs.foreach(r => r.executeWithValue(resolved)); true
}
}
/** Called by `tryComplete` to store the resolved value and get the list of
* listeners, or `null` if it is already completed.
*/
@tailrec
private def tryCompleteAndGetListeners(v: Try[T]): List[CallbackRunnable[T]] = {
getState match {
case raw: List[_] =>
val cur = raw.asInstanceOf[List[CallbackRunnable[T]]]
if (updateState(cur, v)) cur else tryCompleteAndGetListeners(v)
case _: DefaultPromise[_] =>
compressedRoot().tryCompleteAndGetListeners(v)
case _ => null
}
}
abstract class AbstractPromise extends AtomicReference<Object> {
protected final boolean updateState(Object oldState, Object newState) { return compareAndSet(oldState, newState); }
protected final Object getState() { return get(); }
}
这里核心方法是 tryCompleteAndGetListeners, DefaultPromise创建时的updateState置为Nil,
DefaultPromise 有三种状态
- 正在完成,case List[CallbackRunnable] ,Promise正在完成并有0个或多个需要在完成时的回调
- 已经完成, case Try[T],这里在tryCompleteAndGetListeners直接反馈null
- link到另外一个Promise[T],这里compressedRoot实际上是一个递归方法,相当于获取linkedPromise.getState.tryCompleteAndGetListeners
对于 Future.successful{“你会不会送我一只玩具熊?”}.map{…} 例子,中状态变化为
- new DefaultPromise, State = Nil
- Case List[_] => updateState, State = Try 任务完成
再比方说Future.successful{"你会不会一直对我这么好“}.flatMap{m => Future("我一直在你身边“). map{ case "我一直在你身边" => true} }
这里的FlatMap的状态变化为:
- new DefaultPromise. State = Nil
- Case DefaultPromise => GetRooState 这里的State 获取的是Future(“我一直在你身边”).map的State ,递归调用
- Case List[_] => updateState, State = Try 任务完成
生活如流水
再回头来看map中的 onComplete方法。
def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = { // transform(f, identity)
val p = Promise[S]() // new impl.Promise.DefaultPromis[T]() 他的承诺
onComplete { v => p complete (v map f) } //努力去完成
p.future //承诺的结果
}
def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
val preparedEC = executor.prepare()
val runnable = new CallbackRunnable[T](preparedEC, func)
dispatchOrAddCallback(runnable)
}
/** Tries to add the callback, if already completed, it dispatches the callback to be executed.
* Used by `onComplete()` to add callbacks to a promise and by `link()` to transfer callbacks
* to the root promise when linking two promises togehter.
*/
@tailrec
private def dispatchOrAddCallback(runnable: CallbackRunnable[T]): Unit = {
getState match {
case r: Try[_] => runnable.executeWithValue(r.asInstanceOf[Try[T]])
case _: DefaultPromise[_] => compressedRoot().dispatchOrAddCallback(runnable)
case listeners: List[_] => if (updateState(listeners, runnable :: listeners)) () else dispatchOrAddCallback(runnable)
}
}
executor 这里是使用了ForkJoinThreadPool,onComplete方法相当于在线程池中注册了一个任务。dispatchOrAddCallback是对于任务类型的分类。
实际上从实现层面来看
val f1 = Future.successful{ Her(said = "你会不会送我一只玩具熊?" ) }
val f2 = f1.map{
case Her(said) => if( said equals "你会不会送我一只玩具熊?") buyToy(Bear)
else if(said equals "你会不会送我一朵玫瑰?") buyFlower(Rose)
...
case Others => None
}
f2 是她提出未来的时候,她已经在等待着他的承诺,但此时承诺还未完成。
f1.map{ method} 这时候他的承诺的实现取决于线程池什么时候有空闲线程,如果线程池中一直没有空闲,那承诺一直无法完成。
因此,可以总结如下:
- 不要对于未来有太多希望,如果线程池不足,带来的只能是承诺的超时与失败。Future和Promise的吞吐量需要和线程池大小搭配,这里可以参考Little’s law。少即是多
- 尽量不要有复杂的Future,复杂的Future会有更多的状态判断,虽然使用cas算法,但复杂状态判断依然属于高开销。
Promise a future
val time = "late at night“
val location = (30.5509340658,114.2825317383)
val her = Person().setLocation(location)
val he = Person().setLocation(location)
Future.success{ her said "Will we be togather"}
.flatMap{i =>
Future(he said "Will you marry me?)
.map{
case p : Person => if p.said.equals("Will you marry me") && p equals he => her said "Yes. I do"
case _ = flase}
.map {
......
}
}