Kurento 源码解析系列(4)- RtpEndpoint端点c层代码的调用处理
当完成rtpbaseendpoint的初始化后,根据我们业务调用的逻辑。首先是在服务端调用genrateOff方法,从而生成服务端的sdp信息返回给远端RTP,
然后远端处理sdp后返回answer给本端rtp,再然后就是本端的sdpbaseendpoint继续answer这个offer后,进开始发送数据。流程大致如下
假如我们有A服务器的rtp端点需要和B服务器的rtp端点进行通信,具体的代码在kurento中调用方法的流程如下:
首先在A中,我们调用genrateOffer方法,实际是发送了1个generate_offer信号,引发了kmsbasesdpendpoint.c中注册的回调
kms_base_sdp_endpoint_class_init (KmsBaseSdpEndpointClass * klass)方法中
klass->generate_offer = kms_base_sdp_endpoint_generate_offer;
klass->process_offer = kms_base_sdp_endpoint_process_offer;
klass->process_answer = kms_base_sdp_endpoint_process_answer;
A中将触发kms_base_sdp_endpoint_generate_offer方法,调用这个方法后,将调用
kms_base_sdp_endpoint_init_sdp_handlers (self, sess),初始化sdp处理类,这里升入解析后发现其实并么有真正处理SDP
然后通过 offer = kms_sdp_session_generate_offer (sess),这里才最终生成所需要的offer,详细生成offer的方法如下:
在kmssdpsession.c文件中.
GstSDPMessage *
kms_sdp_session_generate_offer (KmsSdpSession * self)
{
GstSDPMessage *offer = NULL;
GError *err = NULL;
gchar *sdp_str = NULL;//这里方法有引用了sdp_agent的create_offer方法,真是层层调用.
offer = kms_sdp_agent_create_offer (self->agent, &err);
if (err != NULL) {
GST_ERROR_OBJECT (self, "Generating SDP Offer: %s", err->message);
goto error;
}kms_sdp_agent_set_local_description (self->agent, offer, &err);
if (err != NULL) {
GST_ERROR_OBJECT (self, "Generating SDP Offer: %s", err->message);
goto error;
}if (gst_sdp_message_copy (offer, &self->local_sdp) != GST_SDP_OK) {
GST_ERROR_OBJECT (self, "Generating SDP Offer: gst_sdp_message_copy");
goto error;
}GST_DEBUG_OBJECT (self, "Generated SDP Offer:\n%s",
(sdp_str = gst_sdp_message_as_text (offer)));
g_free (sdp_str);
sdp_str = NULL;return offer;
error:
g_clear_error (&err);if (offer != NULL) {
gst_sdp_message_free (offer);
}return NULL;
}
继续关注sdpagent.c中的方法
GstSDPMessage *
kms_sdp_agent_create_offer (KmsSdpAgent * agent, GError ** error)
{
g_return_val_if_fail (KMS_IS_SDP_AGENT (agent), NULL);return KMS_SDP_AGENT_GET_CLASS (agent)->create_offer (agent, error);
}
继续看sdp_agent的create_offer方法的实现方法
klass->create_offer = kms_sdp_agent_create_offer_impl;具体方法实现如下
static GstSDPMessage *
kms_sdp_agent_create_offer_impl (KmsSdpAgent * agent, GError ** error)
{
GstSDPMessage *offer = NULL;
GstSDPOrigin o;
GSList *tmp = NULL;
gboolean state_changed = FALSE;
gboolean failed = TRUE;SDP_AGENT_LOCK (agent);
if (agent->priv->state != KMS_SDP_AGENT_STATE_UNNEGOTIATED &&
agent->priv->state != KMS_SDP_AGENT_STATE_NEGOTIATED) {
g_set_error (error, KMS_SDP_AGENT_ERROR, SDP_AGENT_INVALID_STATE,
"Agent in state %s", SDP_AGENT_STATE (agent));
goto end;
}if (gst_sdp_message_new (&offer) != GST_SDP_OK) {
g_set_error_literal (error, KMS_SDP_AGENT_ERROR, SDP_AGENT_INVALID_STATE,
"Can not allocate SDP offer");
goto end;
}if (!kms_sdp_agent_set_default_session_attributes (offer, error)) {
goto end;
}if (agent->priv->state == KMS_SDP_AGENT_STATE_NEGOTIATED) {
const GstSDPOrigin *orig;orig = gst_sdp_message_get_origin (agent->priv->local_description);
set_sdp_session_description (&agent->priv->local, orig->sess_id,
orig->sess_version);
} else {
generate_sdp_session_description (&agent->priv->local);
}kms_sdp_agent_origin_init (agent, &o, agent->priv->local.id,
agent->priv->local.version);if (!kms_sdp_agent_set_origin (offer, &o, error)) {
goto end;
}/* Execute pre-processing extensions */
if (!kms_sdp_agent_offer_processing_extensions (agent, offer, TRUE, error)) {
goto end;
}tmp = g_slist_copy_deep (agent->priv->offer_handlers,
(GCopyFunc) sdp_handler_ref, NULL);
kms_sdp_agent_merge_offer_handlers (agent);/* Process medias */
if (!kms_sdp_agent_create_media_offer (agent, offer, error)) {
goto end;
}if (!kms_sdp_agent_update_session_version (agent, offer, error)) {
goto end;
}/* Execute post-processing extensions */
if (!kms_sdp_agent_offer_processing_extensions (agent, offer, FALSE, error)) {
gst_sdp_message_free (offer);
offer = NULL;
goto end;
}g_slist_free_full (tmp, (GDestroyNotify) kms_ref_struct_unref);
SDP_AGENT_NEW_STATE (agent, KMS_SDP_AGENT_STATE_LOCAL_OFFER);
state_changed = TRUE;
tmp = NULL;failed = FALSE;
end:
if (tmp != NULL) {
g_slist_free_full (agent->priv->offer_handlers,
(GDestroyNotify) kms_ref_struct_unref);
agent->priv->offer_handlers = tmp;
}SDP_AGENT_UNLOCK (agent);
if (failed && offer != NULL) {
gst_sdp_message_free (offer);
return NULL;
}if (state_changed) {
g_object_notify_by_pspec (G_OBJECT (agent), obj_properties[PROP_STATE]);
}return offer;
}
其中几个主要的GST原生的结构体
GstSDPOrigin , spd协议中o=xxx,后面的内容
/**
* GstSDPOrigin:
* @username: the user's login on the originating host, or it is "-"
* if the originating host does not support the concept of user ids.
* @sess_id: is a numeric string such that the tuple of @username, @sess_id,
* @nettype, @addrtype and @addr form a globally unique identifier for the
* session.
* @sess_version: a version number for this announcement
* @nettype: the type of network. "IN" is defined to have the meaning
* "Internet".
* @addrtype: the type of @addr.
* @addr: the globally unique address of the machine from which the session was
* created.
*
* The contents of the SDP "o=" field which gives the originator of the session
* (their username and the address of the user's host) plus a session id and
* session version number.
*/
typedef struct {
gchar *username;
gchar *sess_id;
gchar *sess_version;
gchar *nettype;
gchar *addrtype;
gchar *addr;
} GstSDPOrigin;
sdp协议中,c=xxx后面的内容
/**
* GstSDPConnection:
* @nettype: the type of network. "IN" is defined to have the meaning
* "Internet".
* @addrtype: the type of @address.
* @address: the address
* @ttl: the time to live of the address
* @addr_number: the number of layers
*
* The contents of the SDP "c=" field which contains connection data.
*/
typedef struct {
gchar *nettype;
gchar *addrtype;
gchar *address;
guint ttl;
guint addr_number;
} GstSDPConnection;
sdp协议最后解析为结构体的格式
/**
* GstSDPMessage:
* @version: the protocol version
* @origin: owner/creator and session identifier
* @session_name: session name
* @information: session information
* @uri: URI of description
* @emails: array of #gchar with email addresses
* @phones: array of #gchar with phone numbers
* @connection: connection information for the session
* @bandwidths: array of #GstSDPBandwidth with bandwidth information
* @times: array of #GstSDPTime with time descriptions
* @zones: array of #GstSDPZone with time zone adjustments
* @key: encryption key
* @attributes: array of #GstSDPAttribute with session attributes
* @medias: array of #GstSDPMedia with media descriptions
*
* The contents of the SDP message.
*/
typedef struct {
gchar *version;
GstSDPOrigin origin;
gchar *session_name;
gchar *information;
gchar *uri;
GArray *emails;
GArray *phones;
GstSDPConnection connection;
GArray *bandwidths;
GArray *times;
GArray *zones;
GstSDPKey key;
GArray *attributes;
GArray *medias;
} GstSDPMessage;
//根据调用rtp的业务流程,当创建sdp并且建立网络链接后,baseRtpEndpoint将根据media的类型来创建并添加1个数据拆包的pad,然后根据再根据拉取数据的endpoint数据类型,看后续如何处理
//具体调用方法
static void
kms_base_rtp_endpoint_rtpbin_pad_added(
GstElement *rtpbin, GstPad *pad, KmsBaseRtpEndpoint *self)
{
//agnostic这里可以认为是1个临时的中间pad,用于根据map协商的处理媒体信息来确认下一步连接什么处理的拆包pad
//depayloader则是拆包的bin,如果拉流和推流是相同的媒体类型,则只是拆包
//如果拉流和推流媒体类型不一致,并且都是系统支持的编码类型,则会自动进行转码
GstElement *agnostic, *depayloader;
gboolean added = TRUE;
KmsMediaType media;
GstCaps *caps;GST_PAD_STREAM_LOCK(pad);
//检测如果是音频,获取1个音频转码的element
if (g_str_has_prefix(GST_OBJECT_NAME(pad), AUDIO_RTPBIN_RECV_RTP_SRC))
{
agnostic = kms_element_get_audio_agnosticbin(KMS_ELEMENT(self));
media = KMS_MEDIA_TYPE_AUDIO;
}//检测如果是视频,获取1个视频可以转码的element
else if (g_str_has_prefix(
GST_OBJECT_NAME(pad), VIDEO_RTPBIN_RECV_RTP_SRC))
{
agnostic = kms_element_get_video_agnosticbin(KMS_ELEMENT(self));
media = KMS_MEDIA_TYPE_VIDEO;
//这里用于处理rtcp需要返回的相关信息
if (self->priv->rl != NULL)
{
self->priv->rl->event_manager = kms_utils_remb_event_manager_create(pad);
}
}
//如果是其他的数据,则不处理
else
{
added = FALSE;
goto end;
}
//获取rtp中sdp携带的当前处理媒体流的负载类型信息
//caps=application/x-rtp, media=(string)video, payload=(int)96, clock-rate=(int)90000, encoding-name=(string)H264, packetization-mode=(string)1, sprop-parameter-sets=(string)"Z2QAH62EAQwgCGEAQwgCGEAQwgCEK1AoAt03AQEBQAAAAwBAAAAMoQ\=\=\,aO48sA\=\=", profile-level-id=(string)64001F
caps = gst_pad_query_caps(pad, NULL);
GST_ERROR_OBJECT(self,
"New pad: %" GST_PTR_FORMAT " for linking to %" GST_PTR_FORMAT
" with caps %" GST_PTR_FORMAT,
pad, agnostic, caps);
//根据rtp的sdp协商的信息来获取正确的拆包的gst对象,目前video支持的h264和vp8
depayloader = kms_base_rtp_endpoint_get_depayloader_for_caps(caps);
//释放caps,这里caps相当于再中间做一些存取的变量
gst_caps_unref(caps);//如果找到了对应的拆包对象
if (depayloader != NULL)
{
GST_DEBUG_OBJECT(self, "Found depayloader %" GST_PTR_FORMAT, depayloader);
kms_base_rtp_endpoint_update_stats(self, depayloader, media);
gst_bin_add(GST_BIN(self), depayloader);
gst_element_link_pads(depayloader, "src", agnostic, "sink");
gst_element_link_pads(rtpbin, GST_OBJECT_NAME(pad), depayloader, "sink");
gst_element_sync_state_with_parent(depayloader);
}
//如果没找到,则直接将数据输出到1个fakesink对象,这个相当于1个黑洞,类似于linux输出流到/dev/null
else
{
GstElement *fake = gst_element_factory_make("fakesink", NULL);g_object_set(fake, "async", FALSE, "sync", FALSE, NULL);
GST_WARNING_OBJECT(
self, "Depayloder not found for pad %" GST_PTR_FORMAT, pad);gst_bin_add(GST_BIN(self), fake);
gst_element_link_pads(rtpbin, GST_OBJECT_NAME(pad), fake, "sink");
gst_element_sync_state_with_parent(fake);
}end:
GST_PAD_STREAM_UNLOCK(pad);//如果添加pad成功,则发送1个MEDIA_START的信号,通知basertpendpoint的子对象,进行业务层面的处理
//给业务层出发1个启动媒体的事件
if (added)
{
g_signal_emit(G_OBJECT(self), obj_signals[MEDIA_START], 0, media, TRUE);
}
}
//其中关键的1步骤是根据caps获取到depayload的对象,方法如下:
static GstElement *
kms_base_rtp_endpoint_get_depayloader_for_caps(GstCaps *caps)
{
GstElementFactory *factory;
GstElement *depayloader = NULL;
GList *payloader_list, *filtered_list, *l;payloader_list = gst_element_factory_list_get_elements(
GST_ELEMENT_FACTORY_TYPE_DEPAYLOADER, GST_RANK_NONE);
filtered_list = gst_element_factory_list_filter(
payloader_list, caps, GST_PAD_SINK, FALSE);if (filtered_list == NULL)
{
goto end;
}for (l = filtered_list; l != NULL; l = l->next)
{
factory = GST_ELEMENT_FACTORY(l->data);if (factory == NULL)
{
continue;
}if (g_strcmp0(gst_plugin_feature_get_name(factory), "asteriskh263") == 0)
{
/* Do not use asteriskh263 for H263 */
continue;
}depayloader = gst_element_factory_create(factory, NULL);
if (depayloader != NULL)
{
kms_utils_depayloader_monitor_pts_out(depayloader);
break;
}
}end:
gst_plugin_feature_list_free(filtered_list);
gst_plugin_feature_list_free(payloader_list);return depayloader;
}
//如果成功的找到了1个拆包的对象 , 则继续处理这个depayload输出的pts,并生成1个新的pts输出
void
kms_utils_depayloader_monitor_pts_out (GstElement * depayloader)
{
GstPad *src_pad;GST_INFO_OBJECT (depayloader, "Add probe: Adjust depayloader PTS out");
//获取depayloader的src的pad
src_pad = gst_element_get_static_pad (depayloader, "src");
//添加1个发现缓存数据的检测时间,检测到数据后对根据服务器的时间对pts进行重新调整
gst_pad_add_probe (src_pad,
GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST,
//具体调整pts的方法
(GstPadProbeCallback) kms_utils_depayloader_pts_out_probe,
kms_utils_adjust_pts_data_new (depayloader),
(GDestroyNotify) kms_utils_adjust_pts_data_destroy);
g_object_unref (src_pad);
}
//具体处理拆包后数据的pts的方法
static GstPadProbeReturn
kms_utils_depayloader_pts_out_probe (GstPad * pad, GstPadProbeInfo * info,
AdjustPtsData * data)
{
if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER) {
GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info);buffer = gst_buffer_make_writable (buffer);
kms_utils_depayloader_adjust_pts_out (data, buffer);
GST_PAD_PROBE_INFO_DATA (info) = buffer;
}
else if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
GstBufferList *list = GST_PAD_PROBE_INFO_BUFFER_LIST (info);list = gst_buffer_list_make_writable (list);
gst_buffer_list_foreach (list,
(GstBufferListFunc) kms_utils_depayloader_pts_out_it, data);
GST_PAD_PROBE_INFO_DATA (info) = list;
}
return GST_PAD_PROBE_OK;
}
//这里,rtpEndpoint的调用及拆包就完成了。
//接下来,如果我们要增加kurento支持的rtpEndpoint拆包的类型,那就要看看depayloader是如何生成的。我们将在下一章继续分析