HLS转UDP协议转换器的实现

 

1.   摘要

由于业务的需要,笔者所处的业务环境内的视频传输基本采用的是UDP的方式,为了便于对各类型的视频源进行统一的接入、传输、转发往往需要进行协议的转换。本文探索了对HLS视频流媒体服务提供的索引文件下载、索引文件内容分析、TS切片下载、UDP转发的基础业务流程。

2.   HLS视频服务系统

HTTP Live Streaming(缩写是HLS)是一个由苹果公司提出的基于HTTP流媒体网络传输协议。它的工作原理是把整个流分成一个个小的基于HTTP的TS切片文件来下载。在开始一个流媒体会话时,客户端会下载一个包含元数据的extendedM3U (m3u8)playlist文件,用于寻找可用的媒体流TS切片文件。HLS只请求基本的HTTP报文,与实时传输协议(RTP)、用户数据表协议(UDP)不同,HLS可以穿过任何允许HTTP数据通过的防火墙或者代理服务器。它也很容易使用内容分发网络来传输媒体流。

3.   M3u8文件示意

    通过ffmpeg+nginx+摄像头构建一个基本的HLS推流服务器,通过浏览器访问如下地址:http://192.168.1.102:10000/hls/camera/index.m3u8,可以下载一个名为index.m3u8的文件,用文本编辑器打开,内容如下:

               #EXTM3U             

#EXT-X-VERSION:3

#EXT-X-MEDIA-SEQUENCE:121

#EXT-X-TARGETDURATION:14

#EXTINF:6.360,

121.ts

#EXTINF:10.000,

122.ts

#EXTINF:13.720,

123.ts

这里面重点关注EXTINF之后的时长、.ts对应切片名称,根据时长、下载的切片大小可以计算出码率。

4.   M3u8分析模块

虽然python本身对http的处理提供了丰富的机制和手段,但是相对于m3u8 parser模块对于索引文件的处理来说还是有些单薄。m3u8 parser对于HLS的索引文件处理非常好用,基于m3u8分析器,能够非常方便的实现索引文件下载、内容分析、字段对应值的读取。

m3u8的下载可以通过pip进行,如果开发环境安装了多个版本,需要明确下载到相应的python版本下面。比如Python2.7的版本执行程序为python2,则下载命令为:

[[email protected]]$ python2 -m pip install m3u8

下载完毕后在代码中即可通过importm3u8引用

5.   开发环境

      系统:CentOS 7.4

      Python版本:3.6.3

      IDE:Pycharm

 

6.   HLS分析下载实现

类HLSDownloader为线程类,初始化时传入待下载的URL和TS切片内容存储队列。具体实现见如下代码

importthreading,socket,time

import m3u8 as hls

importurllib.request as request

classHLSDownloader(threading.Thread):

 

    def __init__(self,clip_queue,url):

 

        self.url = url

 

        self.clip_queue = clip_queue

 

        threading.Thread.__init__(self)

 

 

    def DownloadTS(self,url,duration):

 

        response = request.urlopen(url)

 

        if(200 == response.code):

 

            context = response.read()

 

           self.clip_queue.put((context,duration))

 

 

    def run(self):

 

        ts_files = []

 

        while (True):

 

            content = hls.load(self.url)

 

            slic = len(content.segments)

 

            for i in range(slic):

 

                if((content.segments[i].uri ints_files) == False):

 

                    print("startingDownload %s" %(content.segments[i].absolute_uri))

 

                   ts_files.append(content.segments[i].uri)

 

                   self.DownloadTS(content.segments[i].absolute_uri,content.segments[i].duration)

 

            time.sleep(content.target_duration)

7.   UDP转发实现

    通过绑定转发使用的网口、组播地址以及指定接受TS的队列实现从队列读取TS切片内容并通过指定网口将TS数据组播到指定地址和端口

importthreading,socket,time

importurllib.request as request

class UDPSender(threading.Thread):

    def__init__(self, clip_queue, localAddr, groupAddr):

 

       self.clip_queue = clip_queue

       self.groupAddr = groupAddr

 

 

       threading.Thread.__init__(self)

 

       sockHandle = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

        status =sockHandle.bind(localAddr)

        status =sockHandle.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

       self.sockHandle = sockHandle

 

 

    defrun(self):

 

       udp_frame_size = 7 * 188

 

        #每200ms秒发送一次

       send_interval = 200/1000

 

        while(True):

 

            node= self.clip_queue.get()

            data= node[0]

           duration = node[1]

 

           data_size = len(data)

           frames = data_size//udp_frame_size

 

           total_times = int(duration * (1/send_interval))

            # 根据TS传输速率控制发送速度,太快容易丢包,

           rest_interval = data_size//total_times

 

           sent_data_size = 0

            foreach in range(frames):

 

               frame = data[each * udp_frame_size : (each + 1) * udp_frame_size]

               sent_data_size += self.sockHandle.sendto(frame, self.groupAddr)

 

               if(sent_data_size > rest_interval ):

                   time.sleep(send_interval)

                   sent_data_size = 0

 

           self.sockHandle.sendto(data[each * udp_frame_size : data_size],self.groupAddr)

 

8.   主控程序入口

from queue import Queue

from taskmanager import HLSDownloader,UDPSender

import fcntl,socket,struct

 

def get_ip_address(ifname):

    s =socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    keys =s.fileno()

    returnsocket.inet_ntoa(fcntl.ioctl(keys,0x8915,struct.pack('256s',ifname[:15].encode()))[20:24])

 

 

def main():

    clip_deque =Queue(5)

    url ='http://192.168.1.102:10000/hls/camera/index.m3u8'

    mdloader =HLSDownloader(clip_deque,url)

   mdloader.start()

 

    ip =get_ip_address('eth0')

    udpSender =UDPSender(clip_deque,(ip,11000),('230.1.1.1',5001))

   udpSender.start()

 

if __name__ == '__main__':

main()

 

9.   业务流程

9.1  启动nginx

  笔者手上的nginx不是标准版本,具备了rtmp直播流接收,然后自动形成m3u8索引文件、TS切片文件,提供http服务。

 HLS转UDP协议转换器的实现

9.2  启动ffmpeg

视频部分实时采集摄像头、音频部分实时采集麦克风。Ffmpeg的命令如下:

ffmpeg -f dshow -i video="IntegratedCamera":audio="麦克风 (High Definition Audio 设备)" -aspect4:3 -r 25 -vcodec libx264  -acodec aac-ar 44100 -ab 64k  -ac 2 -f flvrtmp://127.0.0.1:1935/live/camera

正常运行后ffmpeg便可源源不断的打印采集、编码日志

HLS转UDP协议转换器的实现

9.3  原始HLS流查看

可以通过手机、VLC查看推送的直播流,访问地址是http://nginx服务器IP:10000/hls/camera/index.m3u8

HLS转UDP协议转换器的实现

9.4  转发后数据查看

    通过VLC打开udp://@230.1.1.1:5001即可看到流畅的传播画面

   HLS转UDP协议转换器的实现