skynet 简单的消息执行流程情景分析
skynet.lua对比以前优化了一些函数,尤其是对协程的控制,使得消息的处理流程更加清晰。我们现在来一步步剖析这个消息执行流程,加深对skynet reactor模式的理解以及协程的应用。
首先看服务的第一条消息是怎么产生,又是如何被处理的。在创建一个snlua服务后第一条消息靠什么来驱动呢?答案是靠自己(第一步还是得靠自己,然后别人才有机会接触你),看看下面的代码可以清楚的看到:
int snlua_init(struct snlua *l, struct skynet_context *ctx, const char * args) {
int sz = strlen(args);
char * tmp = skynet_malloc(sz);
memcpy(tmp, args, sz);
skynet_callback(ctx, l , launch_cb);
const char * self = skynet_command(ctx, "REG", NULL);
uint32_t handle_id = strtoul(self+1, NULL, 16);
// it must be first message
skynet_send(ctx, 0, handle_id, PTYPE_TAG_DONTCOPY,0, tmp, sz);
return 0;
}
static int launch_cb(struct skynet_context * context, void *ud, int type, int session, uint32_t source , const void * msg, size_t sz) {
assert(type == 0 && session == 0);
struct snlua *l = ud;
skynet_callback(context, NULL, NULL);
int err = init_cb(l, context, msg, sz);
if (err) {
skynet_command(context, "EXIT", NULL);
}
return 0;
}
从上面看到执行第一条的回调函数launch_cb会调用init_cb,就是在这个函数里开始加载解析lua源文件并执行。
一般lua源文件的主函数为skynet.start(),他的内部实现为:
function skynet.start(start_func)
c.callback(skynet.dispatch_message)
init_thread = skynet.timeout(0, function()
skynet.init_service(start_func)
init_thread = nil
end)
end
function skynet.timeout(ti, func)
local session = c.intcommand("TIMEOUT",ti)
assert(session)
local co = co_create(func)
assert(session_id_coroutine[session] == nil)
session_id_coroutine[session] = co
return co -- for debug
end
可以看到实际上这个函数也是由一条消息驱动的,原因这篇文章skynet答疑一 --skynet.start参数为什么要在定时器中执行已经讲过,这里就不再多说。而且这个函数里重新设置了回调函数,回调函数第一次被调用是由于timeout超时消息。我们看看这条消息的类型:
int skynet_timeout(uint32_t handle, int time, int session) {
if (time <= 0) {
struct skynet_message message;
message.source = 0;
message.session = session;
message.data = NULL;
message.sz = (size_t)PTYPE_RESPONSE << MESSAGE_TYPE_SHIFT;
if (skynet_context_push(handle, &message)) {
return -1;
}
} else {
struct timer_event event;
event.handle = handle;
event.session = session;
timer_add(TI, &event, sizeof(event), time);
}
return session;
}
可以看到是PTYPE_RESPONSE(1)类型的。这个消息被调用时又该如何执行呢?我们还是看看相关代码吧:
local function raw_dispatch_message(prototype, msg, sz, session, source)
-- skynet.PTYPE_RESPONSE = 1, read skynet.h
if prototype == 1 then
local co = session_id_coroutine[session]
if co == "BREAK" then
session_id_coroutine[session] = nil
elseif co == nil then
unknown_response(session, source, msg, sz)
else
local tag = session_coroutine_tracetag[co]
if tag then c.trace(tag, "resume") end
session_id_coroutine[session] = nil
suspend(co, coroutine_resume(co, true, msg, sz))
end
else
......
end
end
首先他会查找相关联的协程,这个协程在skynet.timeout中创建,并且和session id相关联。创建协程又是一个很难啃的函数:
local function co_create(f)
local co = table.remove(coroutine_pool)
if co == nil then
co = coroutine_create(function(...)
f(...)
while true do
local session = session_coroutine_id[co]
if session and session ~= 0 then
local source = debug.getinfo(f,"S")
skynet.error(string.format("Maybe forgot response session %s from %s : %s:%d",
session,
skynet.address(session_coroutine_address[co]),
source.source, source.linedefined))
end
-- coroutine exit
local tag = session_coroutine_tracetag[co]
if tag ~= nil then
if tag then c.trace(tag, "end") end
session_coroutine_tracetag[co] = nil
end
local address = session_coroutine_address[co]
if address then
session_coroutine_id[co] = nil
session_coroutine_address[co] = nil
end
-- recycle co into pool
f = nil
coroutine_pool[#coroutine_pool+1] = co
-- recv new main function f
f = coroutine_yield "SUSPEND"
f(coroutine_yield())
end
end)
else
-- pass the main function f to coroutine, and restore running thread
local running = running_thread
coroutine_resume(co, f)
running_thread = running
end
return co
end
他利用了协程池的思想。我们再看看raw_dispatch_message函数,找到了session id对应的协程,就开始coroutine_resume执行协程了。协程函数就是skynet.start()里面的参数函数。
如果这个函数里没有阻塞的调用,就像下面一个简单的函数:
skynet.start(
function()
print('test')
end
)
那么这个协程函数很快就执行完了。请看co_create的代码和下面的图:
接下来分析while循环,首先是查找协程对应的session id。目前我们并没有把协程对应的session id记录下来,只把session id对应的协程记录了下来,所以这个为空。接着是用于调试的一些分析,暂且跳过。
到了关键的地方,保存协程到协程池,然后该协程调用coroutine_yield "SUSPEND"挂起。回到主函数驱动协程的地方,将会调用suspend:
function suspend(co, result, command)
if not result then
local session = session_coroutine_id[co]
if session then -- coroutine may fork by others (session is nil)
local addr = session_coroutine_address[co]
if session ~= 0 then
-- only call response error
local tag = session_coroutine_tracetag[co]
if tag then c.trace(tag, "error") end
c.send(addr, skynet.PTYPE_ERROR, session, "")
end
session_coroutine_id[co] = nil
session_coroutine_address[co] = nil
session_coroutine_tracetag[co] = nil
end
skynet.fork(function() end) -- trigger command "SUSPEND"
error(debug.traceback(co,tostring(command)))
end
if command == "SUSPEND" then
dispatch_wakeup()
dispatch_error_queue()
elseif command == "QUIT" then
-- service exit
return
elseif command == "USER" then
-- See skynet.coutine for detail
error("Call skynet.coroutine.yield out of skynet.coroutine.resume\n" .. debug.traceback(co))
elseif command == nil then
-- debug trace
return
else
error("Unknown command : " .. command .. "\n" .. debug.traceback(co))
end
end
如果协程函数执行没有错误,那么第二个参数result为true,第三个参数是协程挂起时传入的参数,这里为"SUSPEND"。将会调用dispatch_wakeup和dispatch_error_queue,我们将有机会说到这两个函数。
suspend调用结束,那么消息回调函数执行完毕,一条消息的生命周期走完。
我们分析了一条很简单,几乎没有什么卵用,但是很完整的消息回调执行步骤。接下来试着分析稍微复杂点,会被阻塞,例如有call调用,假设一个服务A调用服务B:
skynet.start(
function()
print('in A service ')
skynet.call('B', 'lua', 'cmd', ...)
end
)
当调用call的时候会协程会阻塞:
function skynet.call(addr, typename, ...)
local p = proto[typename]
local session = c.send(addr, p.id , nil , p.pack(...))
if session == nil then
error("call to invalid address " .. skynet.address(addr))
end
return p.unpack(yield_call(addr, session))
end
我们看到通过底层的c调用,call首先会把消息发送给目的地,并且产生一个关联的session id,然后调用yield_call将协程挂起:
local function yield_call(service, session)
watching_session[session] = service
session_id_coroutine[session] = running_thread
local succ, msg, sz = coroutine_yield "SUSPEND"
watching_session[session] = nil
if not succ then
error "call failed"
end
return msg,sz
end
在这个函数里,将把session id对应的协程记录下来,然后挂起协程。接下来的代码分析和上面的一样,此时A服务由timeout产生的消息将执行完毕了,但是这个函数的代码却还没有执行完,第一次分析的时候还觉得有点不可思议。那么什么时候再来执行后面的代码呢,我们慢慢道来。
假设B服务的主函数代码也非常简单,就和第一个例子一样。第一条由timeout产生的消息执行完了,接下来收到了A服务发过来的消息,由于A是用lua协议发的,type是PTYPE_LUA(10),所以将执行raw_dispatch_message后面的部分:
local function raw_dispatch_message(prototype, msg, sz, session, source)
-- skynet.PTYPE_RESPONSE = 1, read skynet.h
if prototype == 1 then
......
else
local p = proto[prototype]
if p == nil then
......
end
local f = p.dispatch
if f then
local co = co_create(f)
session_coroutine_id[co] = session
session_coroutine_address[co] = source
......
suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
else
trace_source[source] = nil
if session ~= 0 then
c.send(source, skynet.PTYPE_ERROR, session, "")
else
unknown_request(session, source, msg, sz, proto[prototype].name)
end
end
end
end
我们可以看到B服务要有相应协议的dispatch函数才有正确的执行,他的动作首先是调用co_create创建一个协程,注意此时将走co_create的后半部分。要明白其中的逻辑,刚开始有点绕,和我初学的时候一样,下面的图帮你理解:
先是将原先挂起的协程恢复,此时coroutine_yield返回,返回值为新的协程函数,然后再次挂起,最后返回一个协程。这段代码着实绕,短短的数几十行代码,协程挂起恢复了好几次。此时协程仍然是挂起的,raw_dispatch_message创建协程(更准确的说是获取协程池中的一个)之后,先将协程对应的session记录下来,同时也将协程对应的源地址也记录下来。然后将再次调用coroutine_resume恢复协程,再次对比上面的图,f(coroutine_yield()),此时将执行p.dispatch(session,source, p.unpack(msg,sz))函数,也就是B服务注册的相关协议的函数了。
当协议处理函数执行完毕后,注意此时协程函数仍然在执行,因为co_create有个while循环。此时将判断该协程对应的session id还在不在,什么意思呢?原来A服务是call调用,上面提到B服务收到消息后执行回调函数时记录了协程对应的session值(注意是send调用是没有session值的)。那么这个记录什么时候去掉呢?不要忘记了,我们必须对A服务进行回复,A服务才能恢复。所以某个服务被call时一定要调用skynet.ret或skynet.response给对方发送一条消息,这样才能清除session记录,避免了一条警告,同时对方服务恢复执行。
接下来co_create就会挂起了,还是主线程supend接管。这样A发送给B的一条消息回调就执行完毕了。下次B服务再有消息将会循环上面过程,周而复始。我们看到这种情况下多条消息只会用到一个协程池的一个协程而已。
再来说A服务。A服务挂起后,等待B服务调用skynet.ret回复,在回复中会带上A给B消息的session,同时消息类型为PTYPE_RESPONSE,这样一来A服务执行其回调函数,和最上面的一样,通过session找到其协程,然后恢复。
这篇文章简单介绍了几个基本消息的执行流程情景分析,下次再讲讲fork和sleep消息的执行流程。
欢迎加入QQ群 858791125 讨论skynet,游戏后台开发,lua脚本语言等问题。