基于live555实现流媒体代理服务器(3)

基于live555实现rtsp代理服务器总体业务逻辑流程,现在就可以梳理一下了,参见协议流程图:

基于live555实现流媒体代理服务器(3)

注意几点即可:

1、在rtsp代理服务器收到describe消息的时候,通过streamId首先查询前向的rtsp url;然后通过rtsp代理客户端发送options、describe等消息,都是需要异步通信的,通过上一篇文章的介绍的异步通信方式即可;直到等到了前向的sdp,我们才能给后向回复带sdp的200 OK响应;

2、setup消息无需关系顺序,原流程无需修改,上面图片可能少画了一个setup消息,如果是音视频的话,应该有两个setup消息,一个audio、一个video;当然,也可能更多,3d视频的话,也许有两个video的setup等等;

3、在rtsp代理服务器收到play消息的时候,还需要再次进行异步阻塞等待,此时需要检查前面rtsp代理客户端发送的多个setup消息是否都收到了响应,如果少收到了一个响应,就意味着后向代理的流少一个通道;等待的方式还是异步递归阻塞;一般来说检查这个参数即可:

//判断是否所有媒体子会话都发送了setup消息,或建立了媒体通道
Boolean ProxyRTSPClient::StreamIsReady()
{
  if(!fSetupQueueTail) {
    return True;
  }

  return False;
}

4、还是关于上面第三条的问题,为什么需要等待所有的setup响应完成?其实,这里不等待的话,媒体代理也是可以完成的,但是如果后向play发送的比较快,而前向rtspclient响应的setup的200OK比较慢的话,就会出现丢失媒体子会话的问题,并不是总是会引起问题的。还有一个就是live555默认的业务逻辑,只要前向回复了至少一个setup响应,那么他就认为可以完成后向代理了;通常情况下,这是不对,你要么代理全部子流,要不不要代理,代理一部分是咋回事呢?看看英文的官方解释:

void ProxyRTSPClient::continueAfterSETUP(int resultCode)
{
  if(resultCode != 0) {
    // The "SETUP" command failed, so arrange to reset the state. (We don't do this now, because it deletes the
    // "ProxyServerMediaSubsession", and we can't do that during "ProxyServerMediaSubsession::createNewStreamSource()".)
    scheduleReset();
    return;
  }

  RTSP_INFO("::continueAfterSETUP(): head codec: %s; numSubsessions %d queue:\n",
            fSetupQueueHead->codecName(), fSetupQueueHead->fParentSession->numSubsessions());

  for(ProxyServerMediaSubsession *p = fSetupQueueHead; p != NULL; p = p->fNext) {
    RTSP_INFO("%s\n", p->codecName());
  }

  envir().taskScheduler().unscheduleDelayedTask(fSubsessionTimerTask);  // in case it had been set

  // Dequeue the first "ProxyServerMediaSubsession" from our 'SETUP queue'.  It will be the one for which this "SETUP" was done:
  ProxyServerMediaSubsession *smss = fSetupQueueHead; // Assert: != NULL
  fSetupQueueHead = fSetupQueueHead->fNext;

  if(fSetupQueueHead == NULL) { fSetupQueueTail = NULL; }

  if(fSetupQueueHead != NULL) {
    // There are still entries in the queue, for tracks for which we have still to do a "SETUP".
    // "SETUP" the first of these now:
    sendSetupCommand(fSetupQueueHead->fClientMediaSubsession, ::continueAfterSETUP,
                     False, fStreamRTPOverTCP, False, fOurAuthenticator);
    ++fNumSetupsDone;
    fSetupQueueHead->fHaveSetupStream = True;
  } else {
    /**begin:modify by **: 梳理rtsp代理消息流程,要求必须所有子会话都建立完成了,才允许发送play消息;否则就失败
      if (fNumSetupsDone >= smss->fParentSession->numSubsessions()) {
        // We've now finished setting up each of our subsessions (i.e., 'tracks').
        // Continue by sending a "PLAY" command (an 'aggregate' "PLAY" command, on the whole session):
        sendPlayCommand(smss->fClientMediaSubsession.parentSession(), ::continueAfterPLAY, -1.0f, -1.0f, 1.0f, fOurAuthenticator);
            // the "-1.0f" "start" parameter causes the "PLAY" to be sent without a "Range:" header, in case we'd already done
            // a "PLAY" before (as a result of a 'subsession timeout' (note below))
        fLastCommandWasPLAY = True;
      } else {
        // Some of this session's subsessions (i.e., 'tracks') remain to be "SETUP".  They might get "SETUP" very soon, but it's
        // also possible - if the remote client chose to play only some of the session's tracks - that they might not.
        // To allow for this possibility, we set a timer.  If the timer expires without the remaining subsessions getting "SETUP",
        // then we send a "PLAY" command anyway:

        fSubsessionTimerTask
    = envir().taskScheduler().scheduleDelayedTask(SUBSESSION_TIMEOUT_SECONDS*MILLION, (TaskFunc*)subsessionTimeout, this);
      }
    end: modify by ** : 为了保证代理服务器尽量不缓存不丢包,只有在收到客户端的play消息的时候才发送代理客户端play,来确保所有媒体通道都准备好了**/
  }
}
虽然有一点道理,不过如果你这样实现,肯定会被老板骂死的,我请求音视频流,偶尔只能听到音频,无视频;偶尔只有视频无音频,这是不对的。

好了为了完整表述,下面还是多贴一点代码吧!rtsp代理服务器play函数处理中,添加等待的代码

void RTSPServer::RTSPClientSession::handleCmd_PLAY(RTSPServer::RTSPClientConnection *ourClientConnection,
                                                   ServerMediaSubsession *subsession, char const *fullRequestStr)
{
  if(!fWaitSetupFinishFlag) {
    //在所有的setup消息全部都处理完之前,不处理重复的paly消息
    ourClientConnection->setRTSPResponse("486 ProxyServer Is Busy", fOurSessionId);
    RTSP_WARN("unabled to handle the repeat rtsp play message\n");
    return;
  }

  /*begin add by **: 梳理直播代理服务器消息流程*/
  ProxyServerMediaSession *sms = (ProxyServerMediaSession *)fOurServerMediaSession;
  if(sms->isProxyServerMediaSession() && sms->fProxyRTSPClient && !sms->fProxyRTSPClient->fLastCommandWasPLAY) {
#ifdef DEBUG
    RTSP_INFO("Begin to Wait the remoter for preparing media at %s ", dateHeader());
#endif
    // wait 6s until setup finished.
    unsigned usecsToDelay = 100 * 1000; // 100ms
    unsigned times = 60;//等待时间小于客户端保活时间
    u_int32_t sessionId = fOurSessionId;
    UsageEnvironment &env = envir();
    while(true) {
      if(!times || !ourClientConnection->fIsActive || !fOurServerMediaSession) {
        RTSP_WARN("Wait 200 OK of Setup timeout ...\n");
        ourClientConnection->setRTSPResponse("408 Proxy Client Timeout Error", sessionId);
        return;
      } else if(sms->fProxyRTSPClient && sms->fProxyRTSPClient->StreamIsReady()) {
        ProxyRTSPClient::sendPLAY(sms->fProxyRTSPClient);
        break;
      }
      fWaitSetupFinishFlag = 0;
      env.taskScheduler().scheduleDelayedTask(usecsToDelay, (TaskFunc *)checkWatchFunc, (void *)&fWaitSetupFinishFlag);
      env.taskScheduler().doEventLoop(&fWaitSetupFinishFlag);
      --times;
    }
#ifdef DEBUG
    RTSP_INFO("End to Wait the remoter for preparing media at %s ", dateHeader());
#endif
  }
  /*end add by **: 梳理直播代理服务器消息流程*/

以下代码省略;

通过上面几个关键点的修改,一个完成的rtsp同步代理服务器就已经实现完成了。

相对于,第一篇文章中,默认的live555ProxyServer.cpp来说,有如下特点:

1、无需启动进程的时候,指定代理rtsp url,中途不容易修改,并且也不会建立N多rtsp连接空跑,纯粹按需连接;

2、整个rtsp代理流程是同步的,前向连接失败可以直接反馈到后向点播;

3、rtsp的流url可以有第三方服务器动态管理,动态添加或删除,这个图中的第三方服务器就是vms,实际流代理是vms中的流媒体代理服务器完成的。