基于live555实现流媒体代理服务器(3)
基于live555实现rtsp代理服务器总体业务逻辑流程,现在就可以梳理一下了,参见协议流程图:
注意几点即可:
1、在rtsp代理服务器收到describe消息的时候,通过streamId首先查询前向的rtsp url;然后通过rtsp代理客户端发送options、describe等消息,都是需要异步通信的,通过上一篇文章的介绍的异步通信方式即可;直到等到了前向的sdp,我们才能给后向回复带sdp的200 OK响应;
2、setup消息无需关系顺序,原流程无需修改,上面图片可能少画了一个setup消息,如果是音视频的话,应该有两个setup消息,一个audio、一个video;当然,也可能更多,3d视频的话,也许有两个video的setup等等;
3、在rtsp代理服务器收到play消息的时候,还需要再次进行异步阻塞等待,此时需要检查前面rtsp代理客户端发送的多个setup消息是否都收到了响应,如果少收到了一个响应,就意味着后向代理的流少一个通道;等待的方式还是异步递归阻塞;一般来说检查这个参数即可:
//判断是否所有媒体子会话都发送了setup消息,或建立了媒体通道
Boolean ProxyRTSPClient::StreamIsReady()
{
if(!fSetupQueueTail) {
return True;
}
return False;
}
4、还是关于上面第三条的问题,为什么需要等待所有的setup响应完成?其实,这里不等待的话,媒体代理也是可以完成的,但是如果后向play发送的比较快,而前向rtspclient响应的setup的200OK比较慢的话,就会出现丢失媒体子会话的问题,并不是总是会引起问题的。还有一个就是live555默认的业务逻辑,只要前向回复了至少一个setup响应,那么他就认为可以完成后向代理了;通常情况下,这是不对,你要么代理全部子流,要不不要代理,代理一部分是咋回事呢?看看英文的官方解释:
void ProxyRTSPClient::continueAfterSETUP(int resultCode)
{
if(resultCode != 0) {
// The "SETUP" command failed, so arrange to reset the state. (We don't do this now, because it deletes the
// "ProxyServerMediaSubsession", and we can't do that during "ProxyServerMediaSubsession::createNewStreamSource()".)
scheduleReset();
return;
}
RTSP_INFO("::continueAfterSETUP(): head codec: %s; numSubsessions %d queue:\n",
fSetupQueueHead->codecName(), fSetupQueueHead->fParentSession->numSubsessions());
for(ProxyServerMediaSubsession *p = fSetupQueueHead; p != NULL; p = p->fNext) {
RTSP_INFO("%s\n", p->codecName());
}
envir().taskScheduler().unscheduleDelayedTask(fSubsessionTimerTask); // in case it had been set
// Dequeue the first "ProxyServerMediaSubsession" from our 'SETUP queue'. It will be the one for which this "SETUP" was done:
ProxyServerMediaSubsession *smss = fSetupQueueHead; // Assert: != NULL
fSetupQueueHead = fSetupQueueHead->fNext;
if(fSetupQueueHead == NULL) { fSetupQueueTail = NULL; }
if(fSetupQueueHead != NULL) {
// There are still entries in the queue, for tracks for which we have still to do a "SETUP".
// "SETUP" the first of these now:
sendSetupCommand(fSetupQueueHead->fClientMediaSubsession, ::continueAfterSETUP,
False, fStreamRTPOverTCP, False, fOurAuthenticator);
++fNumSetupsDone;
fSetupQueueHead->fHaveSetupStream = True;
} else {
/**begin:modify by **: 梳理rtsp代理消息流程,要求必须所有子会话都建立完成了,才允许发送play消息;否则就失败
if (fNumSetupsDone >= smss->fParentSession->numSubsessions()) {
// We've now finished setting up each of our subsessions (i.e., 'tracks').
// Continue by sending a "PLAY" command (an 'aggregate' "PLAY" command, on the whole session):
sendPlayCommand(smss->fClientMediaSubsession.parentSession(), ::continueAfterPLAY, -1.0f, -1.0f, 1.0f, fOurAuthenticator);
// the "-1.0f" "start" parameter causes the "PLAY" to be sent without a "Range:" header, in case we'd already done
// a "PLAY" before (as a result of a 'subsession timeout' (note below))
fLastCommandWasPLAY = True;
} else {
// Some of this session's subsessions (i.e., 'tracks') remain to be "SETUP". They might get "SETUP" very soon, but it's
// also possible - if the remote client chose to play only some of the session's tracks - that they might not.
// To allow for this possibility, we set a timer. If the timer expires without the remaining subsessions getting "SETUP",
// then we send a "PLAY" command anyway:
fSubsessionTimerTask
= envir().taskScheduler().scheduleDelayedTask(SUBSESSION_TIMEOUT_SECONDS*MILLION, (TaskFunc*)subsessionTimeout, this);
}
end: modify by ** : 为了保证代理服务器尽量不缓存不丢包,只有在收到客户端的play消息的时候才发送代理客户端play,来确保所有媒体通道都准备好了**/
}
}
虽然有一点道理,不过如果你这样实现,肯定会被老板骂死的,我请求音视频流,偶尔只能听到音频,无视频;偶尔只有视频无音频,这是不对的。
好了为了完整表述,下面还是多贴一点代码吧!rtsp代理服务器play函数处理中,添加等待的代码
void RTSPServer::RTSPClientSession::handleCmd_PLAY(RTSPServer::RTSPClientConnection *ourClientConnection,
ServerMediaSubsession *subsession, char const *fullRequestStr)
{
if(!fWaitSetupFinishFlag) {
//在所有的setup消息全部都处理完之前,不处理重复的paly消息
ourClientConnection->setRTSPResponse("486 ProxyServer Is Busy", fOurSessionId);
RTSP_WARN("unabled to handle the repeat rtsp play message\n");
return;
}
/*begin add by **: 梳理直播代理服务器消息流程*/
ProxyServerMediaSession *sms = (ProxyServerMediaSession *)fOurServerMediaSession;
if(sms->isProxyServerMediaSession() && sms->fProxyRTSPClient && !sms->fProxyRTSPClient->fLastCommandWasPLAY) {
#ifdef DEBUG
RTSP_INFO("Begin to Wait the remoter for preparing media at %s ", dateHeader());
#endif
// wait 6s until setup finished.
unsigned usecsToDelay = 100 * 1000; // 100ms
unsigned times = 60;//等待时间小于客户端保活时间
u_int32_t sessionId = fOurSessionId;
UsageEnvironment &env = envir();
while(true) {
if(!times || !ourClientConnection->fIsActive || !fOurServerMediaSession) {
RTSP_WARN("Wait 200 OK of Setup timeout ...\n");
ourClientConnection->setRTSPResponse("408 Proxy Client Timeout Error", sessionId);
return;
} else if(sms->fProxyRTSPClient && sms->fProxyRTSPClient->StreamIsReady()) {
ProxyRTSPClient::sendPLAY(sms->fProxyRTSPClient);
break;
}
fWaitSetupFinishFlag = 0;
env.taskScheduler().scheduleDelayedTask(usecsToDelay, (TaskFunc *)checkWatchFunc, (void *)&fWaitSetupFinishFlag);
env.taskScheduler().doEventLoop(&fWaitSetupFinishFlag);
--times;
}
#ifdef DEBUG
RTSP_INFO("End to Wait the remoter for preparing media at %s ", dateHeader());
#endif
}
/*end add by **: 梳理直播代理服务器消息流程*/
以下代码省略;
通过上面几个关键点的修改,一个完成的rtsp同步代理服务器就已经实现完成了。
相对于,第一篇文章中,默认的live555ProxyServer.cpp来说,有如下特点:
1、无需启动进程的时候,指定代理rtsp url,中途不容易修改,并且也不会建立N多rtsp连接空跑,纯粹按需连接;
2、整个rtsp代理流程是同步的,前向连接失败可以直接反馈到后向点播;
3、rtsp的流url可以有第三方服务器动态管理,动态添加或删除,这个图中的第三方服务器就是vms,实际流代理是vms中的流媒体代理服务器完成的。