Golang 实现 RTP

在 Coding 之前我们先来简单介绍一下 RTP(Real-time Transport Protocol), 正如它的名字所说,用于互联网的实时传输协议,通过 IP 网络传输音频和视频的网络协议。

由音视频传输工作小组开发,1996年首次发布,并提出了以下使用设想。

  1. 简单的多播音频会议

使用 IP 的多播服务进行语音通信。通过某种分配机制,获取多播组地址和端口对。一个端口用于音频数据的,另一个用于控制(RTCP)包,地址和端口信息被分发给预期的参与者。如果需要加密,可通过特定格式进行加密。

  1. 音视频会议

如果在会议中同时使用音视频媒体,那么它们是作为单独的 RTP 会话传输。音频,视频两个媒体分别使用不同的 UDP 端口对传输单独的 RTP 和 RTCP 数组包,多播地址可能相同,可能不同。进行这种分离的动机是如果参与者只想接受一种媒体,可以进行选择。

  1. Mixer 和 Translator

我们需要考虑这样一种情况,在某个会议中,大多数人处于高速网络链路中,而某个地方的一小部分人只能低速率连接。为了防止所有人使用低带宽,可以在低带宽区域放置一个 RTP 级的中继器 Mixer。Mixer 将接收的音频报文重新同步为发送方 20 ms恒定间隔,重建音频为单一流,音频编码为低速带宽的音频,然后转发给低速链路上的带宽数据包流。

  1. 分层编码

多媒体应用程序应该能调节传输速率以匹配接收者容量或适应网络拥塞。可以将调节速率的任务通过将分层编码和分层传输系统相结合来实现接收器。在基于 IP 多播的 RTP 上下文中,每个 RTP 会话均承载在自己的多播组上。然后,接收者可以只通过加入组播组合适的子集来调整接收带宽。

RTP 数据包头部字段

只有当 Mixer 存在时,才会存在 CSRC 标识符列表。这些字段具有以下含义。前 12 个 8 位组在每一个包中都有。

  • version (V): 2 bits

RTP 版本。

  • 填充 §: 1 bit

如果设置了填充位,包中包含至少一个填充 8 位组,其他填充位不属于 Payload。

  • 扩展 (X): 1 bit

如果设置了扩展位则存在。

  • CSRC 数量(CC): 4 bits

CSRC 数量包含在固定头中,CSRC 标识符数量。

  • 标记 (M): 1 bit

标记由配置文件定义。用于标记数据包流中例如帧边界之类的重要事件。

  • payload 类型(PT): 7 bits

该字段指出 RTP 有效载荷格式,由应用程序进行解释。接收者必须忽略无法理解的有效载荷类型的数据包。

  • 序列号: 16 bits

每次 RTP 数据包发送时增加,可能用于接收者检测包丢失并且恢复包序列。

  • 时间戳: 32 bits

该字段反映了 RTP 数据包中第一个 8 位组的采样时刻。

  • SSRC: 32 bits

标识同步源,这个标识符应该选择随机,在同一个 RTP 对话的两个同步源应该有不同的同步标识。

  • CSRC 列表:0 到 15 项, 其中每项 32 bits

该字段表示对该 payload 数据做出贡献所有 SSRC。

Golang 的相关实现

RTP 的实现有一些,不过通过 Go 实现有一些好处。

  • 易于测试

这里的易于测试不仅仅是体现在容易书写,能够快速通过源码,函数直接生成相应测试函数。而且更重要的是能够提供相应的基准测试,提供计时,并行执行,内存统计等参数供开发者进行相应调整。

  • 语言层面强大的 Web 开发能力

能够基于语言层面快速的对例JSON 解析,字段封装 。无需引入三方库。

  • 性能较为优异

相比于 Python,Ruby 等解释型语言快,比 node, erlang 等语言更易书写。如果服务中需要用并发,内置关键字 go 就可以快速起多个 goroutine。

Go 社区的 RTP 有 RTP 相关实现,对应的测试也比较全面,简单介绍一下。

package_test.go (基础测试)

func TestBasic(t *testing.T) {
  p := &Packet{}

  if err := p.Unmarshal([]byte{}); err == nil {
    t.Fatal("Unmarshal did not error on zero length packet")
  }

  rawPkt := []byte{
    0x90, 0xe0, 0x69, 0x8f, 0xd9, 0xc2, 0x93, 0xda, 0x1c, 0x64,
    0x27, 0x82, 0x00, 0x01, 0x00, 0x01, 0xFF, 0xFF, 0xFF, 0xFF, 0x98, 0x36, 0xbe, 0x88, 0x9e,
  }
  parsedPacket := &Packet{
        // 固定头部
    Header: Header{
      Marker:           true,
      Extension:        true,
      ExtensionProfile: 1,
      Extensions: []Extension{
        {0, []byte{
          0xFF, 0xFF, 0xFF, 0xFF,
        }},
      },
      Version:        2,
      PayloadOffset:  20,
      PayloadType:    96,
      SequenceNumber: 27023,
      Timestamp:      3653407706,
      SSRC:           476325762,
      CSRC:           []uint32{},
    },
        // 有效负载
    Payload: rawPkt[20:],
    Raw:     rawPkt,
  }

  // Unmarshal to the used Packet should work as well.
  for i := 0; i < 2; i++ {
    t.Run(fmt.Sprintf("Run%d", i+1), func(t *testing.T) {
      if err := p.Unmarshal(rawPkt); err != nil {
        t.Error(err)
      } else if !reflect.DeepEqual(p, parsedPacket) {
        t.Errorf("TestBasic unmarshal: got %#v, want %#v", p, parsedPacket)
      }

      if parsedPacket.Header.MarshalSize() != 20 {
        t.Errorf("wrong computed header marshal size")
      } else if parsedPacket.MarshalSize() != len(rawPkt) {
        t.Errorf("wrong computed marshal size")
      }

      if p.PayloadOffset != 20 {
        t.Errorf("wrong payload offset: %d != %d", p.PayloadOffset, 20)
      }

      raw, err := p.Marshal()
      if err != nil {
        t.Error(err)
      } else if !reflect.DeepEqual(raw, rawPkt) {
        t.Errorf("TestBasic marshal: got %#v, want %#v", raw, rawPkt)
      }

      if p.PayloadOffset != 20 {
        t.Errorf("wrong payload offset: %d != %d", p.PayloadOffset, 20)
      }
    })
  }
}

基本测试中,利用 Golang 自带的 Unmarshal 快速将 byte 切片转换为相应结构体。减少了相关封包,解包等代码的工作量。在网络传输中,也能够在语言层面直接完成大端,小端编码的转换,减少编码的烦恼。

h.SequenceNumber = binary.BigEndian.Uint16(rawPacket[seqNumOffset : seqNumOffset+seqNumLength])
h.Timestamp = binary.BigEndian.Uint32(rawPacket[timestampOffset : timestampOffset+timestampLength])
h.SSRC = binary.BigEndian.Uint32(rawPacket[ssrcOffset : ssrcOffset+ssrcLength])

其中关于切片的相关操作十分便捷,可以获取数组中的某一段数据,操作比较灵活,在协议数据的传输过程中,通过切片,获取某段数据进行相应处理。

m := copy(buf[n:], p.Payload)
p.Raw = buf[:n+m]

在实现完成后,Golang 的子测试能够进行嵌套测试。对执行特定测试用例特别有用,只有子测试完成后,父测试才会返回。

func TestVP8PartitionHeadChecker_IsPartitionHead(t *testing.T) {
	checker := &VP8PartitionHeadChecker{}
	t.Run("SmallPacket", func(t *testing.T) {
		if checker.IsPartitionHead([]byte{0x00}) {
			t.Fatal("Small packet should not be the head of a new partition")
		}
	})
	t.Run("SFlagON", func(t *testing.T) {
		if !checker.IsPartitionHead([]byte{0x10, 0x00, 0x00, 0x00}) {
			t.Fatal("Packet with S flag should be the head of a new partition")
		}
	})
	t.Run("SFlagOFF", func(t *testing.T) {
		if checker.IsPartitionHead([]byte{0x00, 0x00, 0x00, 0x00}) {
			t.Fatal("Packet without S flag should not be the head of a new partition")
		}
	})
}

更多的相关实现可以去 GitHub(https://github.com/pion/rtp) 上看一下实现源码。

结尾

如果人为去关注相关的传输细节,可能在底层耗费大量时间,目前市面上有很多相关的实现方案,有开源的,和一些公司提供的一些方案。目前经过业界实践,陌陌,小米众多公司都采用了声网的 SDK 去进行相关的业务时间,部分公司甚至已经将核心业务交由处理,可见其稳定性。个人去测试了一下他们的云课堂相关服务,回放,在线演示等功能十分便捷,可以节约大量开发时间。

推荐阅读
作者信息
piglig
TA 暂未填写个人简介
文章
3
相关专栏
本专栏仅用于分享音视频相关的技术文章,与其他开发者和 Agora 研发团队交流、分享行业前沿技术、资讯。发帖前,请参考「社区发帖指南」,方便您更好的展示所发表的文章和内容。