Deferred类代码简单示例
openTSDB源码详解之Deferred类代码简单示例
1.示例1
1.1 代码
/**
* simplest with only 1 defer
* 最简单的,仅仅只有1个defer
*/
public static void test1() {
try {
//Deferred deferred = new Deferred(); -> 不传参数,直接new也是可以的
//01.其泛型是String,表示返回值的类型是String型
// 这个Constructor. 会将state设置成 PENDING;
Deferred<String> dfd = new Deferred<String>();
//02.定义一个Callback,其返回值类型是String,输入类型参数是String
Callback<String, String> cb = new MyCallback();
//为这个Deferred对象dfd添加一个Callback对象cb
dfd.addCallback(cb);
System.out.println("test");
Thread.sleep(4000);
//callback: Starts running the callback chain. 开始运行callback链
dfd.callback("callback data after 4 seconds aaa");
} catch (Exception e) {}
}
1.2 运行过程详解
/** Constructor. */
public Deferred() {
state = PENDING;
}
其中state属性值设置如下:
/**
* The current state of this Deferred.
当前Deferred对象的状态
* All state transitions must be done atomically. Since this attribute is
* volatile, a single assignment is atomic. But if you need to do a state
* transition with a test first ("if state is A, move to state B") then you
* must use {@link #casState} to do an atomic CAS (Compare And Swap).
所有的状态转变必须原子性的设置。因为这个属性是volatile,一个赋值操作是原子性的。
但是如果你需要做一个状态转换(使用一个测试语句(如果状态是A,那么将状态设置成B)那么你必须使用casState
去做一个CAS(compare an swap)
01.主义者这个state是使用volatile修饰
*/
private volatile int state;
构造Deferred对象之后,得到dfd
对象的属性值如下:
Deferred@578866604(state=PENDING, result=null,
callback=SimpleDeferExample.MyCallback@1517365b, errback=passthrough)
debug时得到的对象属性如下:new
一个 MyCallback()
对象,并且是cb指向这个对象。debug得到的值如下:
接着将这个引用cb作为一个Callback添加到dfd中。代码如下:
/**
* Registers a callback.
注意一个callback
* <p>
* If the deferred result is already available and isn't an exception, the
* callback is executed immediately from this thread.
* If the deferred result is already available and is an exception, the
* callback is discarded.
如果延迟的结果已经可用,并且并非是一个异常,则调用链会被当前线程立即执行。
如果延迟的结果已经可用,但是是一个异常,则callback将会被抛弃。
* If the deferred result is not available, this callback is queued and will
* be invoked from whichever thread gives this deferred its initial result
* by calling {@link #callback}.
如果延迟的结果不可用,这个callback将会进入队列并且将被给这个deferred对象一个初始值
的线程( 这个线程通过调用callback方法给deferred对象赋初始值 )调用。
* @param cb The callback to register.
* @return {@code this} with an "updated" type.
*/
public <R> Deferred<R> addCallback(final Callback<R, T> cb) {
return addCallbacks(cb, Callback.PASSTHROUGH);
}
其中的这个 PASSTHROUGH
是一个静态final 方法,如下:
/** The identity function (returns its argument). */
public static final Callback<Object, Object> PASSTHROUGH =
new Callback<Object, Object>() {
public Object call(final Object arg) {
return arg;
}
public String toString() {
return "passthrough";
}
};
这个 PASSTHROUGH
其实是用于生成一个 errorback
(其功能仅此而已)。
但是该addCallback(…)会调用如下的方法:
public <R, R2, E> Deferred<R> addCallbacks(final Callback<R, T> cb,
final Callback<R2, E> eb) {...}
public <R, R2, E> Deferred<R> addCallbacks(final Callback<R, T> cb,
final Callback<R2, E> eb) {
if (cb == null) {
throw new NullPointerException("null callback");
} else if (eb == null) {
throw new NullPointerException("null errback");
}
// We need to synchronize on `this' first before the CAS, to prevent
// runCallbacks from switching our state from RUNNING to DONE right
// before we add another callback.
synchronized (this) {
// If we're DONE, switch to RUNNING atomically.
//刚开始的时候,这个state != DONE,state的值为PENDING
if (state == DONE) {
// This "check-then-act" sequence is safe as this is the only code
// path that transitions from DONE to RUNNING and it's synchronized.
state = RUNNING;
} else {
// We get here if weren't DONE (most common code path)
// -or-
// if we were DONE and another thread raced with us to change the
// state and we lost the race (uncommon).
if (callbacks == null) {
//进入到下面这个初始化callbacks中INIT_CALLBACK_CHAIN_SIZE 值为4
callbacks = new Callback[INIT_CALLBACK_CHAIN_SIZE];
}
// Do we need to grow the array?
else if (last_callback == callbacks.length) {
final int oldlen = callbacks.length;
if (oldlen == MAX_CALLBACK_CHAIN_LENGTH * 2) {
throw new CallbackOverflowError("Too many callbacks in " + this
+ " (size=" + (oldlen / 2) + ") when attempting to add cb="
+ cb + '@' + cb.hashCode() + ", eb=" + eb + '@' + eb.hashCode());
}
final int len = Math.min(oldlen * 2, MAX_CALLBACK_CHAIN_LENGTH * 2);
final Callback[] newcbs = new Callback[len];
System.arraycopy(callbacks, next_callback, // Outstanding callbacks.
newcbs, 0, // Move them to the beginning.
last_callback - next_callback); // Number of items.
last_callback -= next_callback;
next_callback = 0;
callbacks = newcbs;
}
//往callbacks数组中增加两个callback
callbacks[last_callback++] = cb;
callbacks[last_callback++] = eb;
return (Deferred<R>) ((Deferred) this);
}
} // end of synchronized block
if (!doCall(result instanceof Exception ? eb : cb)) {
// While we were executing the callback, another thread could have
// added more callbacks. If doCall returned true, it means we're
// PAUSED, so we won't reach this point, because the Deferred we're
// waiting on will call us back later. But if we're still in state
// RUNNING, we'll get to here, and we must check to see if any new
// callbacks were added while we were executing doCall, because if
// there are, we must execute them immediately, because no one else
// is going to execute them for us otherwise.
boolean more;
synchronized (this) {
more = callbacks != null && next_callback != last_callback;
}
if (more) {
runCallbacks(); // Will put us back either in DONE or in PAUSED.
} else {
state = DONE;
}
}
return (Deferred<R>) ((Object) this);
}
last_callback
的定义过程如下:
/**
* Index in {@link #callbacks} past the last callback to invoke.
上一次被调用的callback在callbacks中的索引
* Invariants:
* - When entering DONE, this value is reset to 0. 当进入了DONE状态,这个值将会被重置为0
* - All the callbacks at and after this index are null. 在这个索引处和之后的callbacks均为null
* - This value might be equal to {@code callbacks.length}. 这个值可能和callbacks.length相等
* - All accesses to this value must be done while synchronizing on `this'. 对这个值的所有访问必须在synchronized(this)中
*/
private short last_callback;
该方法不仅仅注册一个cb,同时也注册一个eb。同时生成一个 callbacks
。
最后开始调用callback(final Object initresult)
方法。如下:
/**
* Starts running the callback chain.
开始运行callback链
* <p>
* This posts the initial result that will be passed to the first callback
* in the callback chain. If the argument is an {@link Exception} then
* the "errback" chain will be triggered instead.
这发布初始结果,这个初始结果将会被传递给回调链中的第一个回调函数。如果参数
是一个Exception,那么errback链将会触发。
* <p>
* This method will not let any {@link Exception} thrown by a callback
* propagate. You shouldn't try to catch any {@link RuntimeException} when
* you call this method, as this is unnecessary.
这个方法不会让任何Exception抛出 被一个callback传播。你不应该尝试捕获RuntimeException
当你调用这个方法时,因为这个是不必要的。
* @param initresult The initial result with which to start the 1st callback.
* The following must be true:
* {@code initresult instanceof T || initresult instanceof }{@link Exception}
开始第一个callback的初始结果。
下列将会为true:
initresult instanceof T || initresult instanceof Exception
* @throws AssertionError if this method was already called on this instance.
* @throws AssertionError if {@code initresult == this}.
*/
public void callback(final Object initresult) {
if (!casState(PENDING, RUNNING)) {
throw new AssertionError("This Deferred was already called!"
+ " New result=" + initresult + ", this=" + this);
}
result = initresult;
if (initresult instanceof Deferred) {
// Twisted doesn't allow a callback chain to start with another Deferred
// but I don't see any reason. Maybe it was to help prevent people from
// creating recursive callback chains that would never terminate? We
// already check for the obvious in handleContinuation by preventing
// this Deferred from depending on itself, but there's no way to prevent
// people from building mutually dependant Deferreds or complex cyclic
// chains of Deferreds, unless we keep track in a set of all the
// Deferreds we go through while executing a callback chain, which seems
// like an unnecessary complication for uncommon cases (bad code). Plus,
// when that actually happens and people write buggy code that creates
// cyclic chains, they will quickly get a CallbackOverflowError.
final Deferred d = (Deferred) initresult;
if (this == d) {
throw new AssertionError("A Deferred cannot be given to itself"
+ " as a result. this=" + this);
}
handleContinuation(d, null);
}
runCallbacks();
}
其中的 `casState(…)如下:
/**
* Atomically compares and swaps the state of this Deferred.
原子性的比较并且交换这个Deferred对象的状态
* @param cmp The expected state to compare against.
* @param val The new state to transition to if the comparison is successful. 如果比较成功,则过渡到新的状态
* @return {@code true} if the CAS succeeded, {@code false} otherwise.如果CAS成功,则返回true,反之,返回false。
*/
private boolean casState(final int cmp, final int val) {
return stateUpdater.compareAndSet(this, cmp, val);
}
其中PENDING = 0,RUNNING =1
其中会进入方法 runCallbacks()
中,如下:
/**
* Executes all the callbacks in the current chain.
执行当前链条中的所有的callback
*/
private void runCallbacks() {
while (true) {//注意这里是一个死循环,只有在if()条件中才可以通过break退出
//定义一个正常态的Callback
Callback cb = null;
//接着定义一个错误态的Callback
Callback eb = null;
//使用同步方法锁住当前对象
synchronized (this) {
// Read those into local variables so we can call doCall and invoke the
// callbacks without holding the lock on `this', which would cause a
// deadlock if we try to addCallbacks to `this' while a callback is
// running.
if (callbacks != null && next_callback != last_callback) {
cb = callbacks[next_callback++];
eb = callbacks[next_callback++];
}
// Also, we may need to atomically change the state to DONE.
// Otherwise if another thread is blocked in addCallbacks right before
// we're done processing the last element, we'd enter state DONE and
// leave this method, and then addCallbacks would add callbacks that
// would never get called.
else {
state = DONE;
callbacks = null;
next_callback = last_callback = 0;
break;
}
}
//final long start = System.nanoTime();
//LOG.debug("START >>>>>>>>>>>>>>>>>> doCall(" + cb + ", " + eb + ')');
//如果result是Exception类型,则调用eb,/否则调用cb
if (doCall(result instanceof Exception ? eb : cb)) {
//LOG.debug("PAUSE ================== doCall(" + cb + ", " + eb
// + ") in " + (System.nanoTime() - start) / 1000 + "us");
break;
}
//LOG.debug("DONE <<<<<<<<<<<<<<<<<< doCall(" + cb + ", " + eb
// + "), result=" + result
// + " in " + (System.nanoTime() - start) / 1000 + "us");
}
}
/**
* Executes a single callback, handling continuation if it returns a Deferred.
执行单个callback,如果它返回一个Deferred对象,则继续处理。
* @param cb The callback to execute. 需要执行的callback
* @return {@code true} if the callback returned a Deferred and we switched to
* PAUSED, {@code false} otherwise and we didn't change state.
如果callback返回一个Deferred 并且我们转换到PAUSED,则返回true;如果我们不能切换状态,则为false
*/
@SuppressWarnings("unchecked")
private boolean doCall(final Callback cb) {
try {
//LOG.debug("doCall(" + cb + '@' + cb.hashCode() + ')' + super.hashCode());
result = cb.call(result);
} catch (Exception e) {
result = e;
}
if (result instanceof Deferred) {
handleContinuation((Deferred) result, cb);
return true;
}
return false;
}
而这个 call()
方法则会调用其实现类中的方法,在本例中,则是使用MyCallback类中的 call(String arg)
方法
public String call(String arg) throws Exception {
System.out.println("mycb is called with: " + arg);
return arg;
}
如果返回的result仍为Deferred类型,那么则进入if分支,接着调用 handleContinuation((Deferred) result,cb);
否则直接返回false。
返回false之后,会使用if(doCall(result instanceof Exception? eb:cb)){break}
不会进入这个if代码块,所以直接正常退出了。