JStorm源代码阅读-bolt/spout的执行
无论是bolt还是spout,其线程使用的Runnable对象都是AsyncLoopRunnable
public class AsyncLoopRunnable implements Runnable {
private RunnableCallback fn;//RunnableCallback implements Runnable, Callback, Shutdownable,实际执行的executor类。
private RunnableCallback killFn;
/**
 * Repeatedly invokes the wrapped callback {@code fn} until shutdown is requested,
 * the callback reports an error, or its result tells the loop to quit.
 *
 * <p>Any {@link Throwable} that escapes the loop while shutdown has not been
 * requested is logged and handed to {@code killFn}.
 *
 * @throws RuntimeException if no callback ({@code fn}) has been configured
 */
public void run() {
    if (fn == null) {
        LOG.error("fn==null");
        throw new RuntimeException("AsyncLoopRunnable no core function ");
    }

    fn.preRun();

    try {
        while (!shutdown.get()) {
            fn.run();

            // The shutdown flag may have flipped while the callback was running.
            if (shutdown.get()) {
                shutdown();
                return;
            }

            // An error recorded by the callback is rethrown so the catch below handles it.
            final Exception failure = fn.error();
            if (failure != null) {
                throw failure;
            }

            if (this.needQuit(fn.getResult())) {
                shutdown();
                return;
            }
        }
    } catch (Throwable cause) {
        if (!shutdown.get()) {
            LOG.error("Async loop died!!!" + cause.getMessage(), cause);
            killFn.execute(cause);
            return;
        }
        // Failure during shutdown: just finish shutting down.
        shutdown();
    }
}
bolt和spout的executor全部是继承自BaseExecutors。
Bolt的执行过程
BoltExecutor线程一直在重复下面的循环(在循环内部又套了一层循环,暂时不理解为什么要这么做)
//BoltExecutors.java
/**
 * Bolt executor loop: runs one-time initialisation if it has not happened yet,
 * then keeps draining the control queue and the execution queue until the task
 * status reports shutdown.
 */
@Override
public void run() {
    if (!isFinishInit) {
        initWrapper();
    }

    while (!taskStatus.isShutdown()) {
        try {
            // Both queues are consumed from this single thread.
            controlQueue.consumeBatch(this);            // control queue
            exeQueue.consumeBatchWhenAvailable(this);   // actual message queue
        } catch (Throwable failure) {
            // Errors raised while shutting down are ignored silently.
            if (taskStatus.isShutdown()) {
                continue;
            }
            LOG.error(idStr + " bolt exeutor error", failure);
        }
    }
}
//BoltExecutors.java
/**
 * Handles one incoming tuple.
 *
 * <p>Anchored tuples get a start timestamp recorded. The tuple is then routed by
 * its source stream: topology-master control events (non-system bolts only) and
 * ordinary tuples go to {@code bolt.execute}, metric-registration responses go to
 * the metrics reporter. Any {@link Throwable} is stored in {@code error}, logged
 * and reported.
 *
 * @param tuple the tuple to process
 */
private void processTupleEvent(Tuple tuple) {
    final boolean anchored = tuple.getMessageId() != null && tuple.getMessageId().isAnchored();
    if (anchored) {
        tuple_start_times.put(tuple, System.currentTimeMillis());
    }

    try {
        if (!isSystemBolt && tuple.getSourceStreamId().equals(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID)) {
            TopoMasterCtrlEvent ctrlEvent = (TopoMasterCtrlEvent) tuple.getValue(0);
            if (!ctrlEvent.isTransactionEvent()) {
                LOG.warn("Received unexpected control event, {}", ctrlEvent);
                return;
            }
            bolt.execute(tuple);
            return;
        }

        if (tuple.getSourceStreamId().equals(Common.TOPOLOGY_MASTER_REGISTER_METRICS_RESP_STREAM_ID)) {
            this.metricsReporter.updateMetricMeta((Map<String, Long>) tuple.getValue(0));
            return;
        }

        bolt.execute(tuple);
    } catch (Throwable e) {
        error = e;
        LOG.error("bolt execute error ", e);
        // Article author's note: once execution reaches this point the task will exit
        // (the exit path itself is not visible in this excerpt).
        report_error.report(e);
    }
}
//BasicBoltExecutor.java
/**
 * Runs the wrapped basic bolt on {@code input} and acks the tuple on success.
 *
 * <p>Only a {@code FailedException} is caught here: the tuple is then failed
 * through the collector's outputter, and the error is additionally reported when
 * it is a {@code ReportedFailedException}. Article author's note: because only
 * this exception type is handled, the task survives such failures and the tuple
 * goes through the internal replay logic.
 *
 * @param input the tuple to execute
 */
public void execute(Tuple input) {
    _collector.setContext(input);
    try {
        _bolt.execute(input, _collector);
        _collector.getOutputter().ack(input);
    } catch (FailedException failure) {
        final boolean mustReport = failure instanceof ReportedFailedException;
        if (mustReport) {
            _collector.reportError(failure);
        }
        // Article author's note: the failed tuple is sent back for re-processing.
        _collector.getOutputter().fail(input);
    }
}
失败的tuple将交给spout的执行线程(SingleThreadSpoutExecutors或者MultipleThreadSpoutExecutors),最终用spout的fail方法进行处理。SingleThreadSpoutExecutors和Storm的运行机制一样,nextTuple和fail/ack在相同线程进行。JStorm认为(很可能是自作聪明,得罪人请见谅)这种模式限制了Spout的最大吞吐量。于是乎,开发了MultipleThreadSpoutExecutors,ack/fail和nextTuple在不同线程进行。但是这带来了一个问题。
fail方法如下
/**
 * Handles a failed tuple.
 *
 * <p>Nothing is done unless at-least-once processing is active and the message
 * is still in {@code emitted}. Otherwise the failure count is incremented and the
 * message is offered to {@code retryService}: if scheduling succeeds the message
 * is removed from {@code emitted}; if it is refused the message is acked instead.
 *
 * @param messageId the id of the failed tuple; cast to {@code KafkaSpoutMessageId}
 */
@Override
public void fail(Object messageId) {
    // Outside at-least-once mode failed tuples are simply dropped.
    if (!isAtLeastOnceProcessing()) {
        return;
    }

    // Only need to keep track of failed tuples if commits to Kafka are controlled by
    // tuple acks, which happens only for at-least-once processing semantics
    final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;

    // A message that is not in the emitted set is dropped as well.
    if (!emitted.contains(msgId)) {
        LOG.debug("Received fail for tuple this spout is no longer tracking."
            + " Partitions may have been reassigned. Ignoring message [{}]", msgId);
        return;
    }

    // Count this failure, then try to register the message for a retry.
    msgId.incrementNumFails();

    final boolean scheduledForRetry = retryService.schedule(msgId);
    if (scheduledForRetry) {
        tupleListener.onRetry(msgId);
        emitted.remove(msgId);
    } else {
        // Registration was refused, so processing of this message ends here.
        LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
        // this tuple should be removed from emitted only inside the ack() method. This is to ensure
        // that the OffsetManager for that TopicPartition is updated and allows commit progression
        tupleListener.onMaxRetryReached(msgId);
        ack(msgId); // ack must still be called at the end
    }
}
可以预见,nextTuple会从retryService拿出需要重发的消息进行重复处理,然而在MultipleThreadSpoutExecutors中,ack/fail和nextTuple在不同线程进行,这要求存储重发消息的数据结构必须线程安全。所以鄙人以为,在较高错误率条件下,MultipleThreadSpoutExecutors无法提高吞吐量。
Spout的执行过程
首先看SingleThreadSpoutExecutors
//SingleThreadSpoutExecutors.java
/**
 * One pass of the single-threaded spout executor.
 *
 * <p>On the first pass (while {@code checkTopologyFinishInit} is false) it
 * initialises the executor, keeps consuming queued events until either the flag
 * becomes true or the configured spout delay has elapsed, and then waits for the
 * task status to become run or pause, activating or deactivating the spout
 * accordingly. Every pass then consumes the event queue and the control queue
 * and calls the parent's {@code nextTuple()}.
 *
 * <p>The delay is converted to milliseconds with {@code long} arithmetic so that
 * a large configured value cannot overflow {@code int}.
 */
public void run() {
    // Wait for the topology to finish starting up.
    if (!checkTopologyFinishInit) {
        initWrapper();
        int delayRun = ConfigExtension.getSpoutDelayRunSeconds(storm_conf);
        // Widen before multiplying: delayRun * 1000 in int arithmetic can overflow.
        long delayRunMs = delayRun * 1000L;
        long now = System.currentTimeMillis();
        while (!checkTopologyFinishInit) {
            // wait other bolt is ready, but the spout can handle the received message
            executeEvent();
            controlQueue.consumeBatch(this);
            if (System.currentTimeMillis() - now > delayRunMs) {
                executorStatus.setStatus(TaskStatus.RUN);
                this.checkTopologyFinishInit = true;
                LOG.info("wait {} timeout, begin operate nextTuple", delayRun);
                break;
            }
        }
        // Wait for this task to finish starting up.
        while (true) {
            JStormUtils.sleepMs(10);
            if (taskStatus.isRun()) {
                this.spout.activate();
                break;
            } else if (taskStatus.isPause()) {
                this.spout.deactivate();
                break;
            }
        }
        LOG.info(idStr + " is ready, due to the topology finish init. ");
    }
    // Article author's note: internally this is exeQueue.consumeBatch(this), i.e. it consumes the event queue.
    executeEvent();
    // Consume the control queue.
    controlQueue.consumeBatch(this);
    // Article author's note: SpoutExecutors.nextTuple() calls straight into the spout's nextTuple().
    super.nextTuple();
}
MultipleThreadSpoutExecutors的主要区别是主线程不再负责消费fail/ack事件,而是专注于调用spout的nextTuple代码。
public class MultipleThreadSpoutExecutors extends SpoutExecutors {
protected AsyncLoopThread ackerRunnableThread;//每个MultipleThreadSpoutExecutors会额外拥有一个AsyncLoopThread用于ack和fail的执行。
public MultipleThreadSpoutExecutors(Task task) {
super(task);
//acker线程
ackerRunnableThread = new AsyncLoopThread(new AckerRunnable(), false, Thread.NORM_PRIORITY, false);
}
/**
 * Performs the parent initialisation and then starts the internal acker thread.
 *
 * @throws Exception if the parent initialisation fails
 */
@Override
public void init() throws Exception {
    super.init();

    // Start the internal acker thread once the base executor is initialised.
    this.ackerRunnableThread.start();
}
/**
 * One pass of the multi-threaded spout executor's main thread.
 *
 * <p>On the first pass (while {@code checkTopologyFinishInit} is false) it
 * initialises the executor, sleeps in 100 ms steps until either the flag becomes
 * true or the configured spout delay has elapsed, and then waits for the task
 * status to become run or pause, activating or deactivating the spout
 * accordingly. Every pass then calls the parent's {@code nextTuple()}; this
 * method does not consume the queues itself.
 *
 * <p>The delay is converted to milliseconds with {@code long} arithmetic so that
 * a large configured value cannot overflow {@code int}.
 */
@Override
public void run() {
    if (!checkTopologyFinishInit) {
        initWrapper();
        int delayRun = ConfigExtension.getSpoutDelayRunSeconds(storm_conf);
        // Widen before multiplying: delayRun * 1000 in int arithmetic can overflow.
        long delayRunMs = delayRun * 1000L;
        long now = System.currentTimeMillis();
        while (!checkTopologyFinishInit) {
            // wait other bolt is ready,
            JStormUtils.sleepMs(100);
            if (System.currentTimeMillis() - now > delayRunMs) {
                executorStatus.setStatus(TaskStatus.RUN);
                this.checkTopologyFinishInit = true;
                LOG.info("wait {} timeout, begin operate nextTuple", delayRun);
                break;
            }
        }
        // Poll until the task status is either run or pause.
        while (true) {
            JStormUtils.sleepMs(10);
            if (taskStatus.isRun()) {
                this.spout.activate();
                break;
            } else if (taskStatus.isPause()) {
                this.spout.deactivate();
                break;
            }
        }
        LOG.info(idStr + " is ready, due to the topology finish init.");
    }
    super.nextTuple();
}
//acker的内部类
class AckerRunnable extends RunnableCallback
/**
 * Acker thread loop: until the shutdown flag is set, keeps consuming the control
 * queue and the execution queue on behalf of the enclosing
 * {@code MultipleThreadSpoutExecutors}. Exceptions raised before shutdown are
 * logged and reported; a final message is logged once the loop ends.
 */
@Override
public void run() {
    while (!shutdown.get()) {
        try {
            // Both queues are drained from this single acker thread.
            controlQueue.consumeBatchWhenAvailable(MultipleThreadSpoutExecutors.this); // control queue
            exeQueue.consumeBatchWhenAvailable(MultipleThreadSpoutExecutors.this);     // execution queue
        } catch (Exception failure) {
            // Exceptions raised once shutdown has been requested are ignored.
            if (shutdown.get()) {
                continue;
            }
            LOG.error("Actor occur unknow exception ", failure);
            report_error.report(failure);
        }
    }

    LOG.info("Successfully shutdown Spout's acker thread " + idStr);
}