SRS(simple-rtmp-server)流媒体服务器源码分析--启动

SRS(simple-rtmp-server)流媒体服务器源码分析--系统启动

一、前言

       小卒最近看SRS源码,随手写下博客,其一为了整理思路,其二也是为日后翻看方便。如果不足之处,请指教!

首先总结一下SRS源码的优点:

       1、轻量级,代码结构清楚,目前SRS3.0代码8万行左右,但几乎满足直播业务的所有要求。

       2、SRS采用State Threads,支持高并发量,高性能。

       3、SRS支持rtmp和hls,满足PC和移动直播要求。

       4、SRS支持集群部署。小集群Forward,大集群edge。

代码分析可分为两个阶段:
       一:分析代码框架,理清楚组织流程
       二:分析代码细节,熟悉SRS工作原理

二、代码分析

相关SRS源码其他总结:

       SRS(simple-rtmp-server)流媒体服务器源码分析--系统启动
       SRS(simple-rtmp-server)流媒体服务器源码分析--RTMP消息play

       SRS(simple-rtmp-server)流媒体服务器源码分析--RTMP信息Publish

           SRS(simple-rtmp-server)流媒体服务器源码分析--HLS切片

现阶段,我主要以代码框架梳理为主。Srs源码框架如下图:

SRS(simple-rtmp-server)流媒体服务器源码分析--启动

       
         系统在启动时,初始化相关类,监听相关端口,若来一个访问请求,则为该链接创建一个线程,专门处理与该链接的操作。
         main函数在srs_main_server.cpp这个文件中。在main函数中,启动参数在这里不做过多介绍。直接从run()-> run_master()看起。
[html] view plain copy
  1. int run_master()  
  2. {  
  3.     int ret = ERROR_SUCCESS;  
  4.       
  5.     if ((ret = _srs_server->initialize_st()) != ERROR_SUCCESS) {  
  6.         return ret;  
  7.     }  
  8.       
  9.     if ((ret = _srs_server->initialize_signal()) != ERROR_SUCCESS) {  
  10.         return ret;  
  11.     }  
  12.     //将pid进程号写进文件  
  13.     if ((ret = _srs_server->acquire_pid_file()) != ERROR_SUCCESS) {  
  14.         return ret;  
  15.     }  
  16.     //客户端监听  
  17.     if ((ret = _srs_server->listen()) != ERROR_SUCCESS) {  
  18.         return ret;  
  19.     }  
  20.       
  21.     if ((ret = _srs_server->register_signal()) != ERROR_SUCCESS) {  
  22.         return ret;  
  23.     }  
  24.       
  25.     if ((ret = _srs_server->http_handle()) != ERROR_SUCCESS) {  
  26.         return ret;  
  27.     }  
  28.       
  29.     if ((ret = _srs_server->ingest()) != ERROR_SUCCESS) {  
  30.         return ret;  
  31.     }  
  32.       
  33.     if ((ret = _srs_server->cycle()) != ERROR_SUCCESS) {  
  34.         return ret;  
  35.     }  
  36.       
  37.     return 0;  
  38. }  

进入客户监听

[html] view plain copy
  1. if ((ret = _srs_server->listen()) != ERROR_SUCCESS) {  
  2.        return ret;  
  3.    }  
 监听内容:  不同的连接请求,有不同的监听
[html] view plain copy
  1. int SrsServer::listen()  
  2. {  
  3.     int ret = ERROR_SUCCESS;  
  4.     // 创建一个rtmp的Streamlistener  
  5.     if ((ret = listen_rtmp()) != ERROR_SUCCESS) {  
  6.         return ret;  
  7.     }  
  8.       
  9.     if ((ret = listen_http_api()) != ERROR_SUCCESS) {  
  10.         return ret;  
  11.     }  
  12.       
  13.     if ((ret = listen_http_stream()) != ERROR_SUCCESS) {  
  14.         return ret;  
  15.     }  
  16.       
  17.     if ((ret = listen_stream_caster()) != ERROR_SUCCESS) {  
  18.         return ret;  
  19.     }  
  20.       
  21.     return ret;  
  22. }  

1、首先分析RTMP连接 

[html] view plain copy
  1. int SrsServer::listen_rtmp()  
  2. {  
  3.     int ret = ERROR_SUCCESS;  
  4.       
  5.     // stream service port.  
  6.     std::vector<std::string> ip_ports = _srs_config->get_listens();  
  7.     srs_assert((int)ip_ports.size() > 0);  
  8.       
  9.     close_listeners(SrsListenerRtmpStream);  
  10.       
  11.     for (int i = 0; i < (int)ip_ports.size(); i++) {  
  12.         SrsListener* listener = new SrsStreamListener(this, SrsListenerRtmpStream);  
  13.         listeners.push_back(listener);  
  14.           
  15.         std::string ip;  
  16.         int port;  
  17.         srs_parse_endpoint(ip_ports[i], ip, port);  
  18.           
  19.         if ((ret = listener->listen(ip, port)) != ERROR_SUCCESS) {  
  20.             srs_error("RTMP stream listen at %s:%d failed. ret=%d", ip.c_str(), port, ret);  
  21.             return ret;  
  22.         }  
  23.     }  
  24.       
  25.     return ret;  
  26. }  
        这里是listen_rtmp()函数,你也可以去看看listen_http_api()函数、listen_http_stream()函数,其实结构都很相似,只是在创建SrsStreamListener对象时,传入了不同的参数SrsListenerRtmpStream、SrsListenerHttpApi、SrsListenerHttpStream,代表了不同类型的监听对象。
[html] view plain copy
  1. // listen_rtmp 中listen监听走这里了。  
  2. int SrsStreamListener::listen(string i, int p)  
  3. {  
  4.     int ret = ERROR_SUCCESS;  
  5.       
  6.     ip = i;  
  7.     port = p;  
  8.   
  9.     srs_freep(listener);  
  10.     listener = new SrsTcpListener(this, ip, port);  
  11.   
  12.     if ((ret = listener->listen()) != ERROR_SUCCESS) {  
  13.         srs_error("tcp listen failed. ret=%d", ret);  
  14.         return ret;  
  15.     }  
  16.       
  17.     srs_info("listen thread current_cid=%d, "  
  18.         "listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",  
  19.         _srs_context->get_id(), p, type, listener->fd(), i.c_str(), p);  
  20.   
  21.     srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());  
  22.   
  23.     return ret;  
  24. }  
注意,这里有大量纯虚函数,不要走错路了。进入TCP监听代码
[html] view plain copy
  1. // rtmp tcp监听  
  2. int SrsTcpListener::listen()  
  3. {  
  4. //C++ Socket编程  
  5.     int ret = ERROR_SUCCESS;  
  6.     // 1、创建套接字,流式Socket(SOCK_STREAM)  
  7.     if ((_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {  
  8.         ret = ERROR_SOCKET_CREATE;  
  9.         srs_error("create linux socket error. port=%d, ret=%d", port, ret);  
  10.         return ret;  
  11.     }  
  12.     srs_verbose("create linux socket success. port=%d, fd=%d", port, _fd);  
  13.       
  14.     int reuse_socket = 1;  
  15.     if (setsockopt(_fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)) == -1) {  
  16.         ret = ERROR_SOCKET_SETREUSE;  
  17.         srs_error("setsockopt reuse-addr error. port=%d, ret=%d", port, ret);  
  18.         return ret;  
  19.     }  
  20.     srs_verbose("setsockopt reuse-addr success. port=%d, fd=%d", port, _fd);  
  21.       
  22.     sockaddr_in addr;  
  23.     addr.sin_family = AF_INET;  
  24.     addr.sin_port = htons(port);  
  25.     addr.sin_addr.s_addr = inet_addr(ip.c_str());  
  26.     // 2、绑定套接字到一个IP地址和一个端口上  
  27.     if (bind(_fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) {  
  28.         ret = ERROR_SOCKET_BIND;  
  29.         srs_error("bind socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);  
  30.         return ret;  
  31.     }  
  32.     srs_verbose("bind socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);  
  33.     // 3、将套接字设置为监听模式等待连接请求  
  34.     if (::listen(_fd, SERVER_LISTEN_BACKLOG) == -1) {  
  35.         ret = ERROR_SOCKET_LISTEN;  
  36.         srs_error("listen socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);  
  37.         return ret;  
  38.     }  
  39.     srs_verbose("listen socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);  
  40.       
  41.     if ((_stfd = st_netfd_open_socket(_fd)) == NULL){  
  42.         ret = ERROR_ST_OPEN_SOCKET;  
  43.         srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret);  
  44.         return ret;  
  45.     }  
  46.     srs_verbose("st open socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);  
  47.     // 4、等到连接一个客户之后,开启一个新的线程  
  48.     if ((ret = pthread->start()) != ERROR_SUCCESS) {  
  49.         srs_error("st_thread_create listen thread error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);  
  50.         return ret;  
  51.     }  
  52.     srs_verbose("create st listen thread success, ep=%s:%d", ip.c_str(), port);  
  53.       
  54.     return ret;  
  55. }  
       此代码为C++ TCP  Socket代码,思路比较清晰,可以看到,每接受到一个rtmp访问请求,创建一个”线程“,这里暂时将其称为线程,后面再做具体介绍。创建线程代码如下:
[html] view plain copy
  1. int SrsReusableThread::start()  
  2. {  
  3.     return pthread->start();  
  4. }  

[cpp] view plain copy
  1. int SrsThread::start()  
  2. {  
  3.     int ret = ERROR_SUCCESS;  
  4.   
  5.     if(tid) {  
  6.         srs_info("thread %s already running.", _name);  
  7.         return ret;  
  8.     }  
  9.     if((tid = st_thread_create(thread_fun, this, (_joinable? 1:0), 0)) == NULL){  
  10.         ret = ERROR_ST_CREATE_CYCLE_THREAD;  
  11.     srs_error("st_thread_create failed. ret=%d", ret);  
  12.     return ret;  
  13.     }  
  14.     disposed = false;  
  15.     // we set to loop to true for thread to run.  
  16.     loop = true;  
  17.     // wait for cid to ready, for parent thread to get the cid.  
  18.     while (_cid < 0) {  
  19.         st_usleep(10 * 1000);  
  20.     }  
  21.     // now, cycle thread can run.  
  22.     can_run = true;  
  23.     return ret;  
  24. }  

       来到了st_thread_create,这里要注意,这是SRS开源项目具有高并发,高性能的重要一步。这里创建的是协程,不是线程。协程是有别于进程和线程的一种组件,具有进程的独立性和线程的轻量级,听说微信能够支持8亿用户量,也是采用协程这种网络服务框架:http://www.infoq.com/cn/articles/CplusStyleCorourtine-At-Wechat。

从这里可以看出,srs是一个单线程的服务器,采用协程,主持高并发,高性能。

创建协程,协程函数为:thread_fun()

[html] view plain copy
  1. // 每连链接一个用户,创建一个协程程,该函数为协程函数  
  2. void* SrsThread::thread_fun(void* arg)  
  3. {  
  4.     SrsThread* obj = (SrsThread*)arg;  
  5.     srs_assert(obj);  
  6.     // 进入线程循环  
  7.     obj->thread_cycle();  
  8.       
  9.     // for valgrind to detect.  
  10.     SrsThreadContext* ctx = dynamic_cast<SrsThreadContext*>(_srs_context);  
  11.     if (ctx) {  
  12.         ctx->clear_cid();  
  13.     }  
  14.       
  15.     st_thread_exit(NULL);  
  16.       
  17.     return NULL;  
  18. }  
此时,真正进入了协程循环处理
[html] view plain copy
  1. void SrsThread::thread_cycle()  
  2.     {  
  3.         int ret = ERROR_SUCCESS;  
  4.           
  5.         _srs_context->generate_id();  
  6.         srs_info("thread %s cycle start", _name);  
  7.           
  8.         _cid = _srs_context->get_id();  
  9.           
  10.         srs_assert(handler);  
  11.         handler->on_thread_start();  
  12.           
  13.         // thread is running now.  
  14.         really_terminated = false;  
  15.           
  16.         // wait for cid to ready, for parent thread to get the cid.  
  17.         while (!can_run && loop) {  
  18.             st_usleep(10 * 1000);  
  19.         }  
  20.           
  21.         while (loop) {  
  22.             if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) {  
  23.                 srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", _name, ret);  
  24.                 goto failed;  
  25.             }  
  26.             srs_info("thread %s on before cycle success", _name);  
  27.             // 注意纯虚函数的应用  
  28.             if ((ret = handler->cycle()) != ERROR_SUCCESS) {  
  29.                 if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) {  
  30.                     srs_warn("thread %s cycle failed, ignored and retry, ret=%d", _name, ret);  
  31.                 }  
  32.                 goto failed;  
  33.             }  
  34.             srs_info("thread %s cycle success", _name);  
  35.               
  36.             if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) {  
  37.                 srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", _name, ret);  
  38.                 goto failed;  
  39.             }  
  40.             srs_info("thread %s on end cycle success", _name);  
  41.               
  42.         failed:  
  43.             if (!loop) {  
  44.                 break;  
  45.             }  
  46.               
  47.             // to improve performance, donot sleep when interval is zero.  
  48.             // @see: https://github.com/ossrs/srs/issues/237  
  49.             if (cycle_interval_us != 0) {  
  50.                 st_usleep(cycle_interval_us);  
  51.             }  
  52.         }  
  53.           
  54.         // readly terminated now.  
  55.         really_terminated = true;  
  56.           
  57.         handler->on_thread_stop();  
  58.         srs_info("thread %s cycle finished", _name);  
  59.     }  
       至此,一定要熟悉C++纯虚函数的引用,本人刚学了几天C++,对虚函数和纯虚函数在SRS源码中的应用很不习惯! 好了,进入循环ret = handler->cycle()
[html] view plain copy
  1. int SrsConnection::cycle()  
  2. {  
  3.     int ret = ERROR_SUCCESS;  
  4.       
  5.     _srs_context->generate_id();  
  6.     id = _srs_context->get_id();  
  7.       
  8.     ip = srs_get_peer_ip(st_netfd_fileno(stfd));  
  9.     //srs_trace("ip:%s", ip);  
  10.       
  11.     ret = do_cycle();  
  12.       
  13.     // if socket io error, set to closed.  
  14.     if (srs_is_client_gracefully_close(ret)) {  
  15.         ret = ERROR_SOCKET_CLOSED;  
  16.     }  
  17.       
  18.     // success.  
  19.     if (ret == ERROR_SUCCESS) {  
  20.         srs_trace("client finished.");  
  21.     }  
  22.       
  23.     // client close peer.  
  24.     if (ret == ERROR_SOCKET_CLOSED) {  
  25.         srs_warn("client disconnect peer. ret=%d", ret);  
  26.     }  
  27.   
  28.     return ERROR_SUCCESS;  
  29. }  
进入ret=do_cycle();
[cpp] view plain copy
  1. // TODO: return detail message when error for client.  
  2. int SrsRtmpConn::do_cycle()  
  3. {  
  4.     int ret = ERROR_SUCCESS;  
  5.       
  6.     srs_trace("RTMP client ip=%s", ip.c_str());  
  7.   
  8.     rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);  
  9.     rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);  
  10.       
  11.     if ((ret = rtmp->handshake()) != ERROR_SUCCESS) {  
  12.         srs_error("rtmp handshake failed. ret=%d", ret);  
  13.         return ret;  
  14.     }  
  15.     srs_verbose("rtmp handshake success");  
  16.       
  17.     if ((ret = rtmp->connect_app(req)) != ERROR_SUCCESS) {  
  18.         srs_error("rtmp connect vhost/app failed. ret=%d", ret);  
  19.         return ret;  
  20.     }  
  21.     srs_verbose("rtmp connect app success");  
  22.       
  23.     // set client ip to request.  
  24.     req->ip = ip;  
  25.       
  26.     // discovery vhost, resolve the vhost from config  
  27.     SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost);  
  28.     if (parsed_vhost) {  
  29.         req->vhost = parsed_vhost->arg0();  
  30.     }  
  31.       
  32.     srs_info("discovery app success. schema=%s, vhost=%s, port=%s, app=%s",  
  33.         req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str());  
  34.       
  35.     if (req->schema.empty() || req->vhost.empty() || req->port.empty() || req->app.empty()) {  
  36.         ret = ERROR_RTMP_REQ_TCURL;  
  37.         srs_error("discovery tcUrl failed. "  
  38.             "tcUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, ret=%d",  
  39.             req->tcUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str(), ret);  
  40.         return ret;  
  41.     }  
  42.       
  43.     // check vhost  
  44.     if ((ret = check_vhost()) != ERROR_SUCCESS) {  
  45.         srs_error("check vhost failed. ret=%d", ret);  
  46.         return ret;  
  47.     }  
  48.     srs_verbose("check vhost success.");  
  49.       
  50.     srs_trace("connect app, "  
  51.         "tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, args=%s",   
  52.         req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(),   
  53.         req->schema.c_str(), req->vhost.c_str(), req->port.c_str(),  
  54.         req->app.c_str(), (req->args? "(obj)":"null"));  
  55.       
  56.     // show client identity  
  57.     if(req->args) {  
  58.         std::string srs_version;  
  59.         std::string srs_server_ip;  
  60.         int srs_pid = 0;  
  61.         int srs_id = 0;  
  62.           
  63.         SrsAmf0Any* prop = NULL;  
  64.         if ((prop = req->args->ensure_property_string("srs_version")) != NULL) {  
  65.             srs_version = prop->to_str();  
  66.         }  
  67.         if ((prop = req->args->ensure_property_string("srs_server_ip")) != NULL) {  
  68.             srs_server_ip = prop->to_str();  
  69.         }  
  70.         if ((prop = req->args->ensure_property_number("srs_pid")) != NULL) {  
  71.             srs_pid = (int)prop->to_number();  
  72.         }  
  73.         if ((prop = req->args->ensure_property_number("srs_id")) != NULL) {  
  74.             srs_id = (int)prop->to_number();  
  75.         }  
  76.           
  77.         srs_info("edge-srs ip=%s, version=%s, pid=%d, id=%d",   
  78.             srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);  
  79.         if (srs_pid > 0) {  
  80.             srs_trace("edge-srs ip=%s, version=%s, pid=%d, id=%d",   
  81.                 srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);  
  82.         }  
  83.     }  
  84.       
  85.     ret = service_cycle();  
  86.       
  87.     http_hooks_on_close();  
  88.   
  89.     return ret;  
  90. }  
行了,rtmp连接就到这里,要不然都快到rtmp流接受代码了,和系统启动越走越远了,rtmp流接受后面再分析。


2、再分析http-api连接,回到int SrsServer::listen()函数中,梳理http-api链接

[html] view plain copy
  1. int SrsServer::listen_http_api()  
  2. {  
  3.     int ret = ERROR_SUCCESS;  
  4.       
  5. #ifdef SRS_AUTO_HTTP_API  
  6.     close_listeners(SrsListenerHttpApi);  
  7.     if (_srs_config->get_http_api_enabled()) {  
  8.         SrsListener* listener = new SrsStreamListener(this, SrsListenerHttpApi);  
  9.         listeners.push_back(listener);  
  10.           
  11.         std::string ep = _srs_config->get_http_api_listen();  
  12.           
  13.         std::string ip;  
  14.         int port;  
  15.         srs_parse_endpoint(ep, ip, port);  
  16.           
  17.         if ((ret = listener->listen(ip, port)) != ERROR_SUCCESS) {  
  18.             srs_error("HTTP api listen at %s:%d failed. ret=%d", ip.c_str(), port, ret);  
  19.             return ret;  
  20.         }  
  21.     }  
  22. #endif  
  23.       
  24.     return ret;  
  25. }  
listen_http_api()函数和listen_rtmp()函数内容非常像,再走到listener->listen()里面看看,结果来到了
[html] view plain copy
  1. int SrsStreamListener::listen(string i, int p)  
  2. {  
  3.     int ret = ERROR_SUCCESS;  
  4.       
  5.     ip = i;  
  6.     port = p;  
  7.   
  8.     srs_freep(listener);  
  9.     listener = new SrsTcpListener(this, ip, port);  
  10.   
  11.     if ((ret = listener->listen()) != ERROR_SUCCESS) {  
  12.         srs_error("tcp listen failed. ret=%d", ret);  
  13.         return ret;  
  14.     }  
  15.       
  16.     srs_info("listen thread current_cid=%d, "  
  17.         "listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",  
  18.         _srs_context->get_id(), p, type, listener->fd(), i.c_str(), p);  
  19.   
  20.     srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());  
  21.   
  22.     return ret;  
  23. }  
和rtmp链接监听机制完全一样,只是type不同而已
[html] view plain copy
  1. enum SrsListenerType   
  2. {  
  3.     // RTMP client,  
  4.     SrsListenerRtmpStream       = 0,  
  5.     // HTTP api,  
  6.     SrsListenerHttpApi          = 1,  
  7.     // HTTP stream, HDS/HLS/DASH  
  8.     SrsListenerHttpStream       = 2,  
  9.     // UDP stream, MPEG-TS over udp.  
  10.     SrsListenerMpegTsOverUdp    = 3,  
  11.     // TCP stream, RTSP stream.  
  12.     SrsListenerRtsp             = 4,  
  13.     // TCP stream, FLV stream over HTTP.  
  14.     SrsListenerFlv              = 5,  
  15. };  
我就看了两个链接监听,监听到此为止。

3、http api回调注册

回到run_master()函数中,从_srs_server->http_handle()看起。

[html] view plain copy
  1. int SrsServer::http_handle()  
  2. {  
  3.     int ret = ERROR_SUCCESS;  
  4.       
  5. #ifdef SRS_AUTO_HTTP_API  
  6.     srs_assert(http_api_mux);  
  7.     if ((ret = http_api_mux->handle("/", new SrsHttpNotFoundHandler())) != ERROR_SUCCESS) {  
  8.         return ret;  
  9.     }  
  10.     if ((ret = http_api_mux->handle("/api/", new SrsGoApiApi())) != ERROR_SUCCESS) {  
  11.         return ret;  
  12.     }  
  13.     if ((ret = http_api_mux->handle("/api/v1/", new SrsGoApiV1())) != ERROR_SUCCESS) {  
  14.         return ret;  
  15.     }  
  16.     if ((ret = http_api_mux->handle("/api/v1/versions", new SrsGoApiVersion())) != ERROR_SUCCESS) {  
  17.         return ret;  
  18.     }  
  19.     if ((ret = http_api_mux->handle("/api/v1/summaries", new SrsGoApiSummaries())) != ERROR_SUCCESS) {  
  20.         return ret;  
  21.     }  
  22.     if ((ret = http_api_mux->handle("/api/v1/rusages", new SrsGoApiRusages())) != ERROR_SUCCESS) {  
  23.         return ret;  
  24.     }  
  25.     if ((ret = http_api_mux->handle("/api/v1/self_proc_stats", new SrsGoApiSelfProcStats())) != ERROR_SUCCESS) {  
  26.         return ret;  
  27.     }  
  28.     if ((ret = http_api_mux->handle("/api/v1/system_proc_stats", new SrsGoApiSystemProcStats())) != ERROR_SUCCESS) {  
  29.         return ret;  
  30.     }  
  31.     if ((ret = http_api_mux->handle("/api/v1/meminfos", new SrsGoApiMemInfos())) != ERROR_SUCCESS) {  
  32.         return ret;  
  33.     }  
  34.     if ((ret = http_api_mux->handle("/api/v1/authors", new SrsGoApiAuthors())) != ERROR_SUCCESS) {  
  35.         return ret;  
  36.     }  
  37.     if ((ret = http_api_mux->handle("/api/v1/features", new SrsGoApiFeatures())) != ERROR_SUCCESS) {  
  38.         return ret;  
  39.     }  
  40.     if ((ret = http_api_mux->handle("/api/v1/vhosts/", new SrsGoApiVhosts())) != ERROR_SUCCESS) {  
  41.         return ret;  
  42.     }  
  43.     if ((ret = http_api_mux->handle("/api/v1/streams/", new SrsGoApiStreams())) != ERROR_SUCCESS) {  
  44.         return ret;  
  45.     }  
  46.     if ((ret = http_api_mux->handle("/api/v1/clients/", new SrsGoApiClients())) != ERROR_SUCCESS) {  
  47.         return ret;  
  48.     }  
  49.       
  50.     // test the request info.  
  51.     if ((ret = http_api_mux->handle("/api/v1/tests/requests", new SrsGoApiRequests())) != ERROR_SUCCESS) {  
  52.         return ret;  
  53.     }  
  54.     // test the error code response.  
  55.     if ((ret = http_api_mux->handle("/api/v1/tests/errors", new SrsGoApiError())) != ERROR_SUCCESS) {  
  56.         return ret;  
  57.     }  
  58.     // test the redirect mechenism.  
  59.     if ((ret = http_api_mux->handle("/api/v1/tests/redirects", new SrsHttpRedirectHandler("/api/v1/tests/errors", SRS_CONSTS_HTTP_MovedPermanently))) != ERROR_SUCCESS) {  
  60.         return ret;  
  61.     }  
  62.     // test the http vhost.  
  63.     if ((ret = http_api_mux->handle("error.srs.com/api/v1/tests/errors", new SrsGoApiError())) != ERROR_SUCCESS) {  
  64.         return ret;  
  65.     }  
  66.       
  67.     // TODO: FIXME: for console.  
  68.     // TODO: FIXME: support reload.  
  69.     std::string dir = _srs_config->get_http_stream_dir() + "/console";  
  70.     if ((ret = http_api_mux->handle("/console/", new SrsHttpFileServer(dir))) != ERROR_SUCCESS) {  
  71.         srs_error("http: mount console dir=%s failed. ret=%d", dir.c_str(), ret);  
  72.         return ret;  
  73.     }  
  74.     srs_trace("http: api mount /console to %s", dir.c_str());  
  75. #endif  
  76.   
  77.     return ret;  
  78. }  

该函数注册了http-api回调接口。可以参考:https://github.com/ossrs/srs/wiki/v2_CN_HTTPApi

比如我们可以访问http://ip:1985/api/v1  其中ip为SRS服务器地址,就可以看到从该接口返回srs服务器参数。


4、ingest(拉流,SRS主动去拉流,和推流相反)处理


注意:SRS对拉流的处理比较特殊,SRS拉流是通过ffmpeg工具去实现的,SRS代码只是实现简单的系统调用,这部分内容在后面的章节中详细说明。

[html] view plain copy
  1. int SrsIngester::start()  
  2. {  
  3.     int ret = ERROR_SUCCESS;  
  4.       
  5.     if ((ret = parse()) != ERROR_SUCCESS) {  
  6.         clear_engines();  
  7.         ret = ERROR_SUCCESS;  
  8.         return ret;  
  9.     }  
  10.       
  11.     // even no ingesters, we must also start it,  
  12.     // for the reload may add more ingesters.  
  13.       
  14.     // start thread to run all encoding engines.  
  15.     if ((ret = pthread->start()) != ERROR_SUCCESS) {  
  16.         srs_error("st_thread_create failed. ret=%d", ret);  
  17.         return ret;  
  18.     }  
  19.     srs_trace("ingest thread cid=%d, current_cid=%d", pthread->cid(), _srs_context->get_id());  
  20.       
  21.     return ret;  
  22. }  
到此,可以看出,和监听过程一样,进入int SrsThread::start()函数,只是传入对象不一样而已。

5、SRS自服务


[html] view plain copy
  1. int SrsServer::cycle()  
  2. {  
  3.     int ret = ERROR_SUCCESS;  
  4.     srs_trace("SrsServer")  
  5.   
  6.     ret = do_cycle();  
  7.   
  8. #ifdef SRS_AUTO_GPERF_MC  
  9.     destroy();  
  10.       
  11.     // remark, for gmc, never invoke the exit().  
  12.     srs_warn("sleep a long time for system st-threads to cleanup.");  
  13.     st_usleep(3 * 1000 * 1000);  
  14.     srs_warn("system quit");  
  15. #else  
  16.     // normally quit with neccessary cleanup by dispose().  
  17.     srs_warn("main cycle terminated, system quit normally.");  
  18.     dispose();  
  19.     srs_trace("srs terminated");  
  20.       
  21.     // for valgrind to detect.  
  22.     srs_freep(_srs_config);  
  23.     srs_freep(_srs_log);  
  24.       
  25.     exit(0);  
  26. #endif  
  27.       
  28.     return ret;  
  29. }  
[html] view plain copy
  1. int SrsServer::do_cycle()  
  2. {  
  3.     int ret = ERROR_SUCCESS;  
  4.       
  5.     // find the max loop  
  6.     int max = srs_max(0, SRS_SYS_TIME_RESOLUTION_MS_TIMES);  
  7.       
  8. #ifdef SRS_AUTO_STAT  
  9.     max = srs_max(max, SRS_SYS_RUSAGE_RESOLUTION_TIMES);  
  10.     max = srs_max(max, SRS_SYS_CPU_STAT_RESOLUTION_TIMES);  
  11.     max = srs_max(max, SRS_SYS_DISK_STAT_RESOLUTION_TIMES);  
  12.     max = srs_max(max, SRS_SYS_MEMINFO_RESOLUTION_TIMES);  
  13.     max = srs_max(max, SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES);  
  14.     max = srs_max(max, SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES);  
  15.     max = srs_max(max, SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES);  
  16. #endif  
  17.       
  18.     // for asprocess.  
  19.     bool asprocess = _srs_config->get_asprocess();  
  20.       
  21.     // the deamon thread, update the time cache  
  22.     while (true) {  
  23.         if(handler && (ret = handler->on_cycle((int)conns.size())) != ERROR_SUCCESS){  
  24.             srs_error("cycle handle failed. ret=%d", ret);  
  25.             return ret;  
  26.         }  
  27.               
  28.         // the interval in config.  
  29.         int heartbeat_max_resolution = (int)(_srs_config->get_heartbeat_interval() / SRS_SYS_CYCLE_INTERVAL);  
  30.           
  31.         // dynamic fetch the max.  
  32.         int temp_max = max;  
  33.         temp_max = srs_max(temp_max, heartbeat_max_resolution);  
  34.           
  35.         for (int i = 0; i < temp_max; i++) {  
  36.             st_usleep(SRS_SYS_CYCLE_INTERVAL * 1000);  
  37.               
  38.             // asprocess check.  
  39.             if (asprocess && ::getppid() != ppid) {  
  40.                 srs_warn("asprocess ppid changed from %d to %d", ppid, ::getppid());  
  41.                 return ret;  
  42.             }  
  43.               
  44.             // gracefully quit for SIGINT or SIGTERM.  
  45.             if (signal_gracefully_quit) {  
  46.                 srs_trace("cleanup for gracefully terminate.");  
  47.                 return ret;  
  48.             }  
  49.           
  50.             // for gperf heap checker,  
  51.             // @see: research/gperftools/heap-checker/heap_checker.cc  
  52.             // if user interrupt the program, exit to check mem leak.  
  53.             // but, if gperf, use reload to ensure main return normally,  
  54.             // because directly exit will cause core-dump.  
  55. #ifdef SRS_AUTO_GPERF_MC  
  56.             if (signal_gmc_stop) {  
  57.                 srs_warn("gmc got singal to stop server.");  
  58.                 return ret;  
  59.             }  
  60. #endif  
  61.           
  62.             // do reload the config.  
  63.             if (signal_reload) {  
  64.                 signal_reload = false;  
  65.                 srs_info("get signal reload, to reload the config.");  
  66.                   
  67.                 if ((ret = _srs_config->reload()) != ERROR_SUCCESS) {  
  68.                     srs_error("reload config failed. ret=%d", ret);  
  69.                     return ret;  
  70.                 }  
  71.                 srs_trace("reload config success.");  
  72.             }  
  73.               
  74.             // notice the stream sources to cycle.  
  75.             if ((ret = SrsSource::cycle_all()) != ERROR_SUCCESS) {  
  76.                 return ret;  
  77.             }  
  78.               
  79.             // update the cache time  
  80.             if ((i % SRS_SYS_TIME_RESOLUTION_MS_TIMES) == 0) {  
  81.                 srs_info("update current time cache.");  
  82.                 srs_update_system_time_ms();  
  83.             }  
  84.               
  85. #ifdef SRS_AUTO_STAT  
  86.             if ((i % SRS_SYS_RUSAGE_RESOLUTION_TIMES) == 0) {  
  87.                 srs_info("update resource info, rss.");  
  88.                 srs_update_system_rusage();  
  89.             }  
  90.             if ((i % SRS_SYS_CPU_STAT_RESOLUTION_TIMES) == 0) {  
  91.                 srs_info("update cpu info, cpu usage.");  
  92.                 srs_update_proc_stat();  
  93.             }  
  94.             if ((i % SRS_SYS_DISK_STAT_RESOLUTION_TIMES) == 0) {  
  95.                 srs_info("update disk info, disk iops.");  
  96.                 srs_update_disk_stat();  
  97.             }  
  98.             if ((i % SRS_SYS_MEMINFO_RESOLUTION_TIMES) == 0) {  
  99.                 srs_info("update memory info, usage/free.");  
  100.                 srs_update_meminfo();  
  101.             }  
  102.             if ((i % SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES) == 0) {  
  103.                 srs_info("update platform info, uptime/load.");  
  104.                 srs_update_platform_info();  
  105.             }  
  106.             if ((i % SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES) == 0) {  
  107.                 srs_info("update network devices info.");  
  108.                 srs_update_network_devices();  
  109.             }  
  110.             if ((i % SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES) == 0) {  
  111.                 srs_info("update network server kbps info.");  
  112.                 resample_kbps();  
  113.             }  
  114.     #ifdef SRS_AUTO_HTTP_CORE  
  115.             if (_srs_config->get_heartbeat_enabled()) {  
  116.                 if ((i % heartbeat_max_resolution) == 0) {  
  117.                     srs_info("do http heartbeat, for internal server to report.");  
  118.                     http_heartbeat->heartbeat();  
  119.                 }  
  120.             }  
  121.     #endif  
  122. #endif  
  123.               
  124.             srs_info("server main thread loop");  
  125.         }  
  126.     }  
  127.   
  128.     return ret;  
  129. }  
主线程,更新srs时间和缓存!!至此,系统启动代码结构梳理完了。

三、总结

  • 启动不同的业务。
  • 监听不同的客户端类型。
  • 每链接一个客户端,SRS为其创建一个协程,专门负责该路链接信息交互
  • SRS系统采用了协程网络服务框架,使得系统具有高并发,高性能等有点。