SRS流媒体服务器(6)源码分析之推流篇
0.基础知识
srs代码优美简洁,重点需明白rtmp相关基础知识。否则很难阅读明白具体可以学习往期博客:
- RTMP协议分析_h264与rtmp格式-CSDN博客
- RTMP 传输结构分析_rtmp chunk消息-CSDN博客
- rtmp封包分析之flv格式封包细节_flv sequence header-CSDN博客
1.回顾
根据SRS流媒体服务器(5)可知,服务启动SrsServer → 初始化 SrsBufferListener → 每个 SrsBufferListener 管理一个 SrsTcpListener → SrsTcpListener 通过协程循环接受新连接 → on_tcp_client 回调到上层SrsServer → SrsServer::accept_client 接收新 TCP 连接 → 创建SrsRtmpConn连接对象→SrsRtmpConn::do_cycle()协程驱动cycle()主循环→完成握手、应用连接、媒体流传输→连接断开清理。本章节目标是分析rtmp应用连接推流创建、媒体流传输。
2.推流源码分析
2.1 开始入口
根据SRS流媒体服务器(5) SrsRtmpConn::do_cycle()处理一个rtmp连接握手,随后执行SrsRtmpConn::stream_service_cycle() 设置各种RTMP参数、处理带宽检查、响应客户端连接、以及循环处理流服务。核心是SrsRtmpConn::stream_service_cycle(),该函数是创建推拉流对象!
/*** @brief 处理RTMP客户端连接的主循环* * 负责处理一个RTMP客户端连接的生命周期,包括连接建立、握手、应用连接、服务循环以及断开连接等步骤。* * @return srs_error_t 错误码*/
srs_error_t SrsRtmpConn::do_cycle()
{srs_error_t err = srs_success;// 打印RTMP客户端的IP地址和端口srs_trace("RTMP client ip=%s:%d, fd=%d", ip.c_str(), port, srs_netfd_fileno(stfd));// 设置RTMP的接收和发送超时时间rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT);rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT);// 执行RTMP握手if ((err = rtmp->handshake()) != srs_success) {return srs_error_wrap(err, "rtmp handshake");}// 获取请求信息SrsRequest* req = info->req;if ((err = rtmp->connect_app(req)) != srs_success) {return srs_error_wrap(err, "rtmp connect tcUrl");}// 执行服务循环if ((err = service_cycle()) != srs_success) {err = srs_error_wrap(err, "service cycle");}srs_error_t r0 = srs_success;if ((r0 = on_disconnect()) != srs_success) {err = srs_error_wrap(err, "on disconnect %s", srs_error_desc(r0).c_str());srs_freep(r0);}// 如果客户端被重定向到其他服务器,则已经记录了该事件// If client is redirect to other servers, we already logged the event.if (srs_error_code(err) == ERROR_CONTROL_REDIRECT) {srs_error_reset(err);}return err;
}/*** @brief 处理RTMP连接的服务循环** 此函数负责处理RTMP连接的服务循环,包括设置各种RTMP参数、处理带宽检查、响应客户端连接、以及循环处理流服务。** @return srs_error_t 错误码。如果成功,返回srs_success;否则返回相应的错误码。*/
srs_error_t SrsRtmpConn::service_cycle()
{srs_error_t err = srs_success;// 获取请求信息SrsRequest* req = info->req;// 设置输出确认窗口大小int out_ack_size = _srs_config->get_out_ack_size(req->vhost);// 设置输入确认窗口大小int in_ack_size = _srs_config->get_in_ack_size(req->vhost);// 设置对等带宽if ((err = rtmp->set_peer_bandwidth((int)(2.5 * 1000 * 1000), 2)) != srs_success) {return srs_error_wrap(err, "rtmp: set peer bandwidth");}// 如果连接到的是带宽检查的虚拟主机,则进行带宽测试// do bandwidth test if connect to the vhost which is for bandwidth check.if (_srs_config->get_bw_check_enabled(req->vhost)).....// 设置块大小// set chunk size to larger.int chunk_size = _srs_config->get_chunk_size(req->vhost);// 响应客户端连接成功// response the client connect ok.if ((err = rtmp->response_connect_app(req, local_ip.c_str())) != srs_success) {return srs_error_wrap(err, "rtmp: response connect app");}// 设置带宽检测完成if ((err = rtmp->on_bw_done()) != srs_success) {return srs_error_wrap(err, "rtmp: on bw down");}// 循环处理流服务while (true) {// 从线程池中拉取任务if ((err = trd->pull()) != srs_success) {return srs_error_wrap(err, "rtmp: thread quit");}// 执行流服务循环err = stream_service_cycle();// 流服务必须以错误终止,从不会成功if (err == srs_success) {continue;}// 如果不是系统控制错误,则返回错误if (!srs_is_system_control_error(err)) {return srs_error_wrap(err, "rtmp: stream service");}// 如果是重新发布错误,则继续服务if (srs_error_code(err) == ERROR_CONTROL_REPUBLISH) {// 设置发送和接收超时时间为较大值,等待编码器重新发布rtmp->set_send_timeout(SRS_REPUBLISH_RECV_TIMEOUT);rtmp->set_recv_timeout(SRS_REPUBLISH_SEND_TIMEOUT);srs_info("rtmp: retry for republish");srs_freep(err);continue;}// 如果是RTMP关闭错误,则重试流服务if (srs_error_code(err) == ERROR_CONTROL_RTMP_CLOSE) {// TODO: FIXME: use ping message to anti-death of socket.// 设置接收超时时间为较大值,用于用户暂停rtmp->set_recv_timeout(SRS_PAUSED_RECV_TIMEOUT);rtmp->set_send_timeout(SRS_PAUSED_SEND_TIMEOUT);srs_trace("rtmp: retry for close");srs_freep(err);continue;}// 对于其他系统控制消息,返回错误// for other system control message, fatal error.return srs_error_wrap(err, "rtmp: reject");}return err;
}
2.1 推拉流对象创建
SrsRtmpConn::stream_service_cycle() 用于创建推拉流SrsLiveSource对象,该函数会进行对应rtmp类型的初始化操作,本博客以FMLE 发布流程举例,随后调用publishing(source);来接收流!
/*** @brief 流服务周期函数** 此函数是 SrsRtmpConn 类的一个成员函数,用于处理 RTMP 连接的服务周期。** @return srs_error_t 类型,表示函数执行的结果。如果执行成功,则返回 srs_success;否则返回相应的错误码。*/
srs_error_t SrsRtmpConn::stream_service_cycle()
{srs_error_t err = srs_success;SrsRequest* req = info->req;// 解析vhostSrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost);if (parsed_vhost) {req->vhost = parsed_vhost->arg0();}// 检查tcUrl各个字段是否为空// 检查vhost// 检查token// 安全检查// 不允许空流名// 设置超时时间// 查找或创建一个源SrsLiveSource* source = NULL;if ((err = _srs_sources->fetch_or_create(req, server, &source)) != srs_success) {return srs_error_wrap(err, "rtmp: fetch source");}srs_assert(source != NULL);// 设置gop缓存bool enabled_cache = _srs_config->get_gop_cache(req->vhost);switch (info->type) {case SrsRtmpConnPlay: {// 开始播放if ((err = rtmp->start_play(info->res->stream_id)) != srs_success) {return srs_error_wrap(err, "rtmp: start play");}if ((err = http_hooks_on_play()) != srs_success) {return srs_error_wrap(err, "rtmp: callback on play");}err = playing(source);http_hooks_on_stop();return err;}case SrsRtmpConnFMLEPublish: {// 开始发布(FMLE)if ((err = rtmp->start_fmle_publish(info->res->stream_id)) != srs_success) {return srs_error_wrap(err, "rtmp: start FMLE publish");}return publishing(source);}case SrsRtmpConnHaivisionPublish: {// 开始发布(Haivision)if ((err = rtmp->start_haivision_publish(info->res->stream_id)) != srs_success) {return srs_error_wrap(err, "rtmp: start HAIVISION publish");}return publishing(source);}case SrsRtmpConnFlashPublish: {// 开始发布(Flash)if ((err = rtmp->start_flash_publish(info->res->stream_id)) != srs_success) {return srs_error_wrap(err, "rtmp: start FLASH publish");}return publishing(source);}default: {// 未知客户端类型return srs_error_new(ERROR_SYSTEM_CLIENT_INVALID, "rtmp: unknown client type=%d", info->type);}}return err;
}
2.2 FMLE Publish
SrsRtmpServer::start_fmle_publish函数处理FMLE类客户端发布RTMP流的完整握手过程。该阶段是处于RTMP复杂握手后,进行协议握手对象创建阶段。这个过程包括三个主要步骤:
- FCPublish阶段:
- 接收客户端的FCPublish命令
- 获取事务ID
- 发送FCPublish响应
- CreateStream阶段:
- 接收客户端的CreateStream命令
- 获取事务ID
- 发送CreateStream响应,分配流ID
- Publish阶段:
- 接收客户端的Publish命令
- 发送两个状态通知:
- onFCPublish: NetStream.Publish.Start
- onStatus: NetStream.Publish.Start
/*** @brief 启动 FMLE 发布流程** 此函数处理来自客户端的 FMLE 发布流程,包括接收 FCPublish 消息、发送 FCPublish 响应、接收 createStream 消息、* 发送 createStream 响应、接收 publish 消息以及发送两个 onStatus 响应(NetStream.Publish.Start)。** @param stream_id 流ID* @return 返回执行结果,如果成功则返回 srs_success,否则返回相应的错误码。*/
srs_error_t SrsRtmpServer::start_fmle_publish(int stream_id)
{// 初始化错误对象为成功srs_error_t err = srs_success;// FCPublish// 定义FCPublish事务ID变量double fc_publish_tid = 0;if (true) {// 创建一个SrsCommonMessage对象指针用于接收RTMP消息SrsCommonMessage* msg = NULL;// 创建一个SrsFMLEStartPacket对象指针用于处理FCPublish命令SrsFMLEStartPacket* pkt = NULL;// 等待并接收FCPublish消息,将消息解析为SrsFMLEStartPacket类型if ((err = expect_message<SrsFMLEStartPacket>(&msg, &pkt)) != srs_success) {return srs_error_wrap(err, "recv FCPublish");}// 自动释放SrsCommonMessage对象,防止内存泄漏SrsAutoFree(SrsCommonMessage, msg);// 自动释放SrsFMLEStartPacket对象,防止内存泄漏SrsAutoFree(SrsFMLEStartPacket, pkt);// 从接收到的数据包中获取事务ID,后续响应需要用到fc_publish_tid = pkt->transaction_id;}// FCPublish response// 处理FCPublish响应if (true) {// 创建一个FCPublish响应包,使用相同的事务IDSrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(fc_publish_tid);// 发送FCPublish响应包并释放内存,stream_id为0表示控制流if ((err = protocol->send_and_free_packet(pkt, 0)) != srs_success) {return srs_error_wrap(err, "send FCPublish response");}}// createStream// 处理createStream命令double create_stream_tid = 0;if (true) {// 创建一个SrsCommonMessage对象指针用于接收RTMP消息SrsCommonMessage* msg = NULL;// 创建一个SrsCreateStreamPacket对象指针用于处理createStream命令SrsCreateStreamPacket* pkt = NULL;// 等待并接收createStream消息,将消息解析为SrsCreateStreamPacket类型if ((err = expect_message<SrsCreateStreamPacket>(&msg, &pkt)) != srs_success) {return srs_error_wrap(err, "recv createStream");}// 自动释放SrsCommonMessage对象,防止内存泄漏SrsAutoFree(SrsCommonMessage, msg);// 自动释放SrsCreateStreamPacket对象,防止内存泄漏SrsAutoFree(SrsCreateStreamPacket, pkt);// 从接收到的数据包中获取事务ID,后续响应需要用到create_stream_tid = pkt->transaction_id;}// createStream response// 处理createStream响应if (true) {// 创建一个createStream响应包,包含事务ID和分配的流IDSrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(create_stream_tid, stream_id);// 发送createStream响应包并释放内存,stream_id为0表示控制流if ((err = protocol->send_and_free_packet(pkt, 0)) != srs_success) {return srs_error_wrap(err, "send createStream response");}}// publish// 处理publish命令if (true) {// 创建一个SrsCommonMessage对象指针用于接收RTMP消息SrsCommonMessage* msg = NULL;// 创建一个SrsPublishPacket对象指针用于处理publish命令SrsPublishPacket* pkt = NULL;// 等待并接收publish消息,将消息解析为SrsPublishPacket类型if ((err = expect_message<SrsPublishPacket>(&msg, &pkt)) != srs_success) {return srs_error_wrap(err, "recv publish");}// 自动释放SrsCommonMessage对象,防止内存泄漏SrsAutoFree(SrsCommonMessage, msg);// 自动释放SrsPublishPacket对象,防止内存泄漏SrsAutoFree(SrsPublishPacket, pkt);}// publish response onFCPublish(NetStream.Publish.Start)// 发送第一个publish响应(onFCPublish)if (true) {// 创建一个状态通知包,用于通知客户端发布已开始SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();// 设置命令名为onFCPublishpkt->command_name = RTMP_AMF0_COMMAND_ON_FC_PUBLISH;// 设置状态码为NetStream.Publish.Startpkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodePublishStart));// 设置状态描述为"Started publishing stream."pkt->data->set(StatusDescription, SrsAmf0Any::str("Started publishing stream."));// 发送onFCPublish状态通知包并释放内存,使用用户指定的流IDif ((err = protocol->send_and_free_packet(pkt, stream_id)) != srs_success) {return srs_error_wrap(err, "send NetStream.Publish.Start");}}// publish response onStatus(NetStream.Publish.Start)// 发送第二个publish响应(onStatus)if (true) {// 创建一个状态通知包,用于通知客户端发布已开始SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();// 设置状态级别为"status"pkt->data->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus));// 设置状态码为NetStream.Publish.Startpkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodePublishStart));// 设置状态描述为"Started publishing stream."pkt->data->set(StatusDescription, SrsAmf0Any::str("Started publishing stream."));// 设置客户端IDpkt->data->set(StatusClientId, SrsAmf0Any::str(RTMP_SIG_CLIENT_ID));// 发送onStatus状态通知包并释放内存,使用用户指定的流IDif ((err = protocol->send_and_free_packet(pkt, stream_id)) != srs_success) {return srs_error_wrap(err, "send NetStream.Publish.Start");}}// 返回处理结果return err;
}
2.3 创建推流数接收对象
SrsRtmpConn::publishing函数是SRS流媒体服务器中处理RTMP推流请求的核心函数。
函数的主要流程分为五个关键步骤:
- 防盗链验证:检查请求的来源是否合法,防止未授权的推流
- HTTP回调通知:通知外部系统有新的推流开始
- 获取发布权限:确保该流可以被发布,避免冲突
- 实际推流处理:创建接收线程,处理推流数据
- 资源释放和清理:结束推流后的清理工作
函数调用链
- SrsRtmpConn::do_publishing - 主发布循环,接收消息并处理
- SrsRtmpConn::handle_publish_message - 首先处理命令消息
- process_publish_message 处理媒体数据
- SrsRtmpConn::process_publish_message - 根据消息类型分发处理逻辑
/*** @brief 开始发布流** 该函数用于处理RTMP流的发布流程。** @param source 流发布源** @return srs_error_t 错误码,成功时返回srs_success*/
srs_error_t SrsRtmpConn::publishing(SrsLiveSource* source)
{srs_error_t err = srs_success;SrsRequest* req = info->req;// 如果配置中启用了Referer校验if (_srs_config->get_refer_enabled(req->vhost)) {// 校验Refererif ((err = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != srs_success) {return srs_error_wrap(err, "rtmp: referer check");}}// 当RTMP连接进行发布时触发HTTP回调if ((err = http_hooks_on_publish()) != srs_success) {return srs_error_wrap(err, "rtmp: callback on publish");}// 获取发布权限if ((err = acquire_publish(source)) == srs_success) {// 使用独立的协程来接收数据// @see: https://github.com/ossrs/srs/issues/237SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source, _srs_context->get_id());err = do_publishing(source, &rtrd);rtrd.stop();}// 无论是否成功获取发布权限,总是释放发布权限// 当在中间过程中获取权限出错时,发布状态已改变但失败,必须清理// @see https://github.com/ossrs/srs/issues/474// @remark 当流繁忙时,不应释放它if (srs_error_code(err) != ERROR_SYSTEM_STREAM_BUSY) {release_publish(source);}// 触发取消发布回调http_hooks_on_unpublish();return err;
}/*** @brief 执行RTMP发布流程的主函数* * 该函数负责处理RTMP发布连接的整个生命周期,包括:* 1. 初始化统计信息和日志打印* 2. 启动独立的接收线程* 3. 设置发布超时参数和套接字选项* 4. 主循环处理消息接收和超时检测* 5. 周期性更新视频帧率和带宽统计* 6. 打印发布状态日志* * @param source 直播源对象指针* @param rtrd 发布接收线程对象指针* @return srs_error_t 返回错误码,srs_success表示成功* * @note 函数包含首次包和正常包两种超时机制* @see 相关issue #441(首次包超时)和#851(视频帧统计)*/
srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThread* rtrd)
{srs_error_t err = srs_success;// 获取请求对象SrsRequest* req = info->req;// 创建简明打印对象,用于周期性日志输出// 当源被发现时更新统计信息// 启动独立的接收协程if ((err = rtrd->start()) != srs_success) {return srs_error_wrap(err, "rtmp: receive thread");}// 初始化发布超时参数// 设置套接字选项// 打印发布相关的配置信息// 消息计数器,记录已处理的消息数量// 视频帧计数器,记录已处理的视频帧数量// 主循环,处理发布流程while (true) {// 检查线程是否需要退出if ((err = trd->pull()) != srs_success) {return srs_error_wrap(err, "rtmp: thread quit");}// 记录时间流逝,用于周期性日志打印pprint->elapse();// 根据消息数量设置不同的等待超时时间if (nb_msgs == 0) {// 当没有收到任何消息时,使用较长的超时时间rtrd->wait(publish_1stpkt_timeout);} else {// 已收到消息后,使用正常的超时时间rtrd->wait(publish_normal_timeout);}// 检查接收线程是否有错误if ((err = rtrd->error_code()) != srs_success) {return srs_error_wrap(err, "rtmp: receive thread");}// 检查是否超时(没有收到新消息)if (rtrd->nb_msgs() <= nb_msgs) {}// 更新已处理的消息数量nb_msgs = rtrd->nb_msgs();// 更新视频帧率统计// 更新已处理的视频帧数量// 周期性打印发布流的统计信息if (pprint->can_print()) {// 采样带宽数据kbps->sample();bool mr = _srs_config->get_mr_enabled(req->vhost);srs_utime_t mr_sleep = _srs_config->get_mr_sleep(req->vhost);srs_trace("<- " SRS_CONSTS_LOG_CLIENT_PUBLISH " time=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mr=%d/%d, p1stpt=%d, pnt=%d",(int)pprint->age(), kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), mr, srsu2msi(mr_sleep),srsu2msi(publish_1stpkt_timeout), srsu2msi(publish_normal_timeout));}}return err;
}/*** @brief 启动接收线程* * @return 错误码*/
srs_error_t SrsPublishRecvThread::start()
{srs_error_t err = srs_success;// 启动线程 SrsRecvThread::start()if ((err = trd.start()) != srs_success) {err = srs_error_wrap(err, "publish recv thread");}// 设置上下文IDncid = cid = trd.cid();return err;
}/*** 启动接收线程** 创建并启动一个新的协程用于接收数据,设置协程栈大小为256KB以避免调用第三方API时崩溃。** @return 返回执行结果,成功返回srs_success,失败返回错误信息* @remark 线程启动失败时会自动释放之前创建的线程资源*/
srs_error_t SrsRecvThread::start()
{srs_error_t err = srs_success;srs_freep(trd);trd = new SrsSTCoroutine("recv", this, _parent_cid);//change stack size to 256K, fix crash when call some 3rd-part api.((SrsSTCoroutine*)trd)->set_stack_size(1 << 18);if ((err = trd->start()) != srs_success) {return srs_error_wrap(err, "recv thread");}return err;
}
2.3.1 接收数据入口
SrsRecvThread::do_cycle 函数是SRS流媒体服务器中负责接收RTMP消息的核心循环处理函数。该函数在接收线程中持续运行,不断地从网络中获取RTMP消息并交给消息处理器(pumper)处理。其主要职责是:
- 持续检查线程状态,及时响应停止请求
- 监控消息泵状态,适时处理中断情况
- 接收RTMP消息并转发给对应的处理器
- 统一处理错误情况,确保资源正确释放
/*** RTMP接收线程的主循环** 设置无超时接收模式提升性能,执行接收循环后恢复超时设置** @return 返回错误码,srs_success表示成功* @remark 使用独立线程接收可提升约33%性能* @see do_cycle() 实际执行接收循环的内部函数*/
srs_error_t SrsRecvThread::cycle()
{srs_error_t err = srs_success;// the multiple messages writev improve performance large,// but the timeout recv will cause 33% sys call performance,// to use isolate thread to recv, can improve about 33% performance.rtmp->set_recv_timeout(SRS_UTIME_NO_TIMEOUT);pumper->on_start();if ((err = do_cycle()) != srs_success) {err = srs_error_wrap(err, "recv thread");}// reset the timeout to pulse mode.rtmp->set_recv_timeout(timeout);pumper->on_stop();return err;
}/*** @brief 接收线程的主循环处理函数** 该函数实现了接收线程的循环处理逻辑,主要包括:* 1. 从传输层拉取数据* 2. 处理泵中断情况* 3. 接收RTMP消息并交给泵处理* 4. 错误处理及中断通知** @return srs_error_t 返回操作结果,成功返回srs_success,失败返回错误码* @note 当发生错误时会中断传输线程和消息泵*/
srs_error_t SrsRecvThread::do_cycle()
{srs_error_t err = srs_success;while (true) {if ((err = trd->pull()) != srs_success) {// 拉取线程状态,若有错误则返回return srs_error_wrap(err, "recv thread");}if (pumper->interrupted()) {// 如果pumper被中断,等待一段时间后重试srs_usleep(timeout);// 微秒级等待continue;}// 定义接收消息的指针SrsCommonMessage* msg = NULL;// Process the received message.if ((err = rtmp->recv_message(&msg)) == srs_success) {err = pumper->consume(msg); // SrsPublishRecvThread::consume 将消息交给pumper处理}if (err != srs_success) {// Interrupt the receive thread for any error.trd->interrupt(); // 中断接收线程// Notify the pumper to quit for error.pumper->interrupt(err);return srs_error_wrap(err, "recv thread");}}return err;
}
srs_error_t SrsRtmpServer::recv_message(SrsCommonMessage** pmsg)
{return protocol->recv_message(pmsg);
}
2.3.2 接收核心逻辑
SrsProtocol::recv_message 函数是SRS服务器中接收RTMP消息的核心函数,它负责从底层网络接收完整的RTMP消息并进行处理。
/*** @brief 接收RTMP消息* * 该函数负责从底层网络接收完整的RTMP消息,并进行必要的处理。* 主要流程包括:* 1. 循环接收消息,直到获取到有效消息或发生错误* 2. 过滤空消息(大小为0的消息)* 3. 对接收到的消息执行回调处理* 4. 将有效消息通过指针参数返回给调用者* * @param pmsg 输出参数,用于返回接收到的RTMP消息指针* @return srs_error_t 成功返回srs_success,失败返回对应的错误码* * @note 调用者需要负责释放返回的消息对象* @see recv_interlaced_message, on_recv_message*/
srs_error_t SrsProtocol::recv_message(SrsCommonMessage **pmsg)
{// 初始化输出参数为NULL*pmsg = NULL;// 定义错误码变量,初始化为成功状态srs_error_t err = srs_success;// 循环接收消息,直到获取到有效消息或发生错误while (true) {// 定义临时消息变量SrsCommonMessage* msg = NULL;// 调用recv_interlaced_message函数接收一个完整的RTMP消息if ((err = recv_interlaced_message(&msg)) != srs_success) {// 如果接收失败,释放消息并返回错误srs_freep(msg);return srs_error_wrap(err, "recv interlaced message");}// 如果消息为NULL,继续循环尝试接收下一个消息if (!msg) {continue;}// 检查消息有效性:如果消息大小为0或负载长度为0,则忽略该消息if (msg->size <= 0 || msg->header.payload_length <= 0) {// 记录日志,输出被忽略的空消息的详细信息// 释放空消息并继续循环srs_freep(msg);continue;}// 调用on_recv_message函数处理接收到的消息(如响应确认消息等)if ((err = on_recv_message(msg)) != srs_success) {// 如果处理失败,释放消息并返回错误srs_freep(msg);return srs_error_wrap(err, "on recv message");}// 将接收到的有效消息赋值给输出参数*pmsg = msg;// 跳出循环,结束接收过程break;}return err;
}
2.3.2.1 接收一个完整RTMP消息块
SrsProtocol::recv_interlaced_message 该函数用于从网络接收交织的RTMP消息,处理分块流的基本头、消息头和消息负载。
/*** 接收完整的RTMP消息** 该函数用于从网络接收交织的RTMP消息,处理分块流的基本头、消息头和消息负载。* 首先读取基本头获取格式和通道ID,然后根据通道ID获取或创建对应的分块流缓存,* 接着读取消息头,最后读取消息负载并返回完整的RTMP消息。** @param pmsg 输出参数,用于返回接收到的RTMP消息指针* @return 返回错误码,成功时返回srs_success** @remark 如果消息不完整会继续尝试读取下一个分块* @see https://github.com/ossrs/srs/issues/249 关于分块流缓存的实现细节*/
srs_error_t SrsProtocol::recv_interlaced_message(SrsCommonMessage **pmsg)
{// 初始化错误码为成功状态srs_error_t err = srs_success;// 读取分块流的基本头部,获取格式和通道IDchar fmt = 0;int cid = 0;if ((err = read_basic_header(fmt, cid)) != srs_success) {// 如果读取基本头部失败,包装错误并返回return srs_error_wrap(err, "read basic header");}// 确保通道ID不为负数srs_assert(cid >= 0);// 获取缓存的分块流对象指针SrsChunkStream* chunk = NULL;// 使用分块流缓存获取分块信息,提高性能if (cid < SRS_PERF_CHUNK_STREAM_CACHE) {// 如果通道ID在缓存范围内,直接使用缓存chunk = cs_cache[cid];} else {// 缓存未命中,使用映射表if (chunk_streams.find(cid) == chunk_streams.end()) {// 如果映射表中不存在该通道ID,创建新的分块流对象chunk = chunk_streams[cid] = new SrsChunkStream(cid);// 设置分块流的首选通道ID,将复制到接收到的消息中chunk->header.perfer_cid = cid;} else {// 映射表中存在该通道ID,直接获取chunk = chunk_streams[cid];}}// 读取分块流的消息头部if ((err = read_message_header(chunk, fmt)) != srs_success) {// 如果读取消息头部失败,包装错误并返回return srs_error_wrap(err, "read message header");}// 从分块流中读取消息负载SrsCommonMessage* msg = NULL;if ((err = read_message_payload(chunk, &msg)) != srs_success) {// 如果读取消息负载失败,包装错误并返回return srs_error_wrap(err, "read message payload");}// 如果没有获取到完整的RTMP消息,尝试下一个分块if (!msg) {// 返回当前状态,表示需要继续读取return err;}// 设置输出参数为接收到的消息*pmsg = msg;return err;
}srs_error_t SrsProtocol::read_basic_header(char& fmt, int& cid)
{srs_error_t err = srs_success;if ((err = in_buffer->grow(skt, 1)) != srs_success) {return srs_error_wrap(err, "basic header requires 1 bytes");}fmt = in_buffer->read_1byte();cid = fmt & 0x3f;fmt = (fmt >> 6) & 0x03;// 2-63, 1B chunk headerif (cid > 1) {return err;// 64-319, 2B chunk header} else if (cid == 0) {if ((err = in_buffer->grow(skt, 1)) != srs_success) {return srs_error_wrap(err, "basic header requires 2 bytes");}cid = 64;cid += (uint8_t)in_buffer->read_1byte();// 64-65599, 3B chunk header} else {srs_assert(cid == 1);if ((err = in_buffer->grow(skt, 2)) != srs_success) {return srs_error_wrap(err, "basic header requires 3 bytes");}cid = 64;cid += (uint8_t)in_buffer->read_1byte();cid += ((uint8_t)in_buffer->read_1byte()) * 256;}return err;
}
2.3.2.2 解析RTMP.msg.chunk头部
SrsProtocol::read_message_header 需要明白message.chunk.type四种类型封包结构。可学习顶置往期博客,结合理论配合代码一起看。详细请看代码,函数执行流程:
1. 检查消息状态
- 判断是否是消息的第一个chunk (
is_first_chunk_of_msg
)- 验证新chunk流必须使用fmt=0格式
2. 创建消息对象
- 如果chunk没有关联消息,创建新的
SrsCommonMessage
对象3. 读取消息头
- 根据fmt类型确定需要读取的字节数
- 从输入缓冲区读取相应长度的数据
4. 解析消息头字段
- 对于fmt=0/1/2:解析3字节的时间戳增量
- 对于fmt=0/1:解析3字节的负载长度和1字节的消息类型
- 对于fmt=0:解析4字节的流ID
5. 处理扩展时间戳
- 如果时间戳增量值为0xffffff,表示使用扩展时间戳
- 读取额外的4字节作为实际时间戳值
- 处理时间戳回绕问题(31位时间戳)
6. 更新消息信息
- 将解析的头部信息复制到chunk的消息对象
- 增加消息计数,允许后续接收fmt=1/2/3格式的消息
/*** 解析RTMP chunk消息头** 该函数负责解析RTMP协议中的chunk消息头,处理不同类型(fmt=0/1/2/3)的消息头格式,* 包括时间戳、负载长度、消息类型和流ID等字段的解析。* *根据往期CSDN博客解析:* <Message Header(消息的头信息)> fmt->即是博客中的msg.chunk.type* parse the message header.* 3bytes: timestamp delta, fmt=0,1,2* 3bytes: payload length, fmt=0,1* 1bytes: message type, fmt=0,1* 4bytes: stream id, fmt=0* where: * fmt=0, 0x0X* fmt=1, 0x4X* fmt=2, 0x8X* fmt=3, 0xCX* * 特殊处理:* 1. 对于第一个数据包(fresh chunk),即使fmt=3也需要更新时间戳* 2. 处理扩展时间戳(extended timestamp)逻辑* 3. 验证消息头格式的合法性** @param chunk 要处理的chunk对象* @param fmt chunk消息头格式类型(0/1/2/3)* @param in_buffer 输入缓冲区* @param skt 套接字** @return 返回错误对象,成功时为srs_success** @remark 遵循RTMP协议规范,同时兼容Adobe产品和librtmp的特殊实现* @see http://blog.csdn.net/win_lin/article/details/13363699*/
srs_error_t SrsProtocol::read_message_header(SrsChunkStream *chunk, char fmt)
{// 定义错误变量,初始化为成功状态srs_error_t err = srs_success;/*// 对于第一个数据包,我们不应该对fmt做任何假设// (当是第一个数据包时,chunk->msg为NULL)// fmt可能是0/1/2/3,FMLE会为某些音频包发送0xC4// 前一个数据包是:// 04 // fmt=0, cid=4// 00 00 1a // timestamp=26// 00 00 9d // payload_length=157// 08 // message_type=8(音频)// 01 00 00 00 // stream_id=1// 当前数据包可能是:// c4 // fmt=3, cid=4// 这是正常的,因为数据包是音频,时间戳增量是26// 当前数据包必须解析为:// fmt=0, cid=4// timestamp=26+26=52// payload_length=157// message_type=8(音频)// stream_id=1// 所以即使是fmt=3的第一个数据包,我们也必须更新时间戳// 新数据包用于更新时间戳,即使是fmt=3的第一个数据包// 新数据包始终表示该chunk是消息的第一个部分*/bool is_first_chunk_of_msg = !chunk->msg;// 但是,我们可以确保当一个chunk流是全新的时,// fmt必须是0,表示一个新的流if (chunk->msg_count == 0 && fmt != RTMP_FMT_TYPE0) {/*// 对于librtmp,如果是ping消息,它会发送一个fmt=1的新流// 0x42 其中:fmt=1, cid=2, cid -> msg.baseheader.csid// 0x00 0x00 0x00 其中:timestamp=0// 0x00 0x00 0x06 其中:payload_length=6// 0x04 其中:message_type=4(协议控制用户控制消息)// 0x00 0x06 其中:(fmt=1无msg.id)事件 Ping(0x06)// 0x00 0x00 0x0d 0x0f 其中:事件数据 4字节ping时间戳*/if (fmt == RTMP_FMT_TYPE1) {// 警告:新chunk以fmt=1开始srs_warn("fresh chunk starts with fmt=1");} else {// must be a RTMP protocol level error.// 必须是RTMP协议级别的错误return srs_error_new(ERROR_RTMP_CHUNK_START, "fresh chunk expect fmt=0, actual=%d, cid=%d", fmt, chunk->cid);}}// 当存在缓存消息时,表示已获取部分消息,// fmt不能是type0,因为type0表示新消息if (chunk->msg && fmt == RTMP_FMT_TYPE0) {// 对于已存在的chunk,fmt不应该是0return srs_error_new(ERROR_RTMP_CHUNK_START, "for existed chunk, fmt should not be 0");}// create msg when new chunk stream start// 当新的chunk流开始时创建消息if (!chunk->msg) {// 创建一个新的通用消息对象chunk->msg = new SrsCommonMessage();}// read message header from socket to buffer.// 从套接字读取消息头到缓冲区static char mh_sizes[] = {11, 7, 3, 0};// 根据fmt类型确定消息头的大小int mh_size = mh_sizes[(int)fmt];//从消息通道中读取mh_size大小的数据到in_buffer,即msgheaderif (mh_size > 0 && (err = in_buffer->grow(skt, mh_size)) != srs_success) {// 如果读取消息头失败,返回错误return srs_error_wrap(err, "read %d bytes message header", mh_size);}/*** parse the message header.* 3bytes: timestamp delta, fmt=0,1,2* 3bytes: payload length, fmt=0,1* 1bytes: message type, fmt=0,1* 4bytes: stream id, fmt=0* where:* fmt=0, 0x0X* fmt=1, 0x4X* fmt=2, 0x8X* fmt=3, 0xCX*/// see also: ngx_rtmp_recv// 参见: ngx_rtmp_recvif (fmt <= RTMP_FMT_TYPE2) {// 从in_buffer中读取mh_size大小的数据到pchar* p = in_buffer->read_slice(mh_size);// 解析时间戳增量(3字节)char* pp = (char*)&chunk->header.timestamp_delta;pp[2] = *p++;pp[1] = *p++;pp[0] = *p++;pp[3] = 0;/*// fmt: 0// 时间戳: 3字节// 如果时间戳大于或等于16777215(十六进制0x00ffffff),// 该值必须为16777215,并且必须存在'扩展时间戳头'。// 否则,该值应为完整的时间戳。//// fmt: 1或2// 时间戳增量: 3字节// 如果增量大于或等于16777215(十六进制0x00ffffff),// 该值必须为16777215,并且必须存在'扩展时间戳头'。// 否则,该值应为完整的增量。*/// 检查是否需要扩展时间戳chunk->extended_timestamp = (chunk->header.timestamp_delta >= RTMP_EXTENDED_TIMESTAMP);if (!chunk->extended_timestamp) {/*// 扩展时间戳: 0或4字节// 当普通时间戳设置为0xffffff时,必须发送此字段,// 如果普通时间戳设置为其他值,则不能发送此字段。// 因此,对于小于0xffffff的值,应使用普通时间戳字段,// 此时扩展时间戳不能存在。// 对于大于或等于0xffffff的值,不能使用普通时间戳字段,// 必须将其设置为0xffffff,并且必须发送扩展时间戳。*/if (fmt == RTMP_FMT_TYPE0) {// 6.1.2.1. Type 0// For a type-0 chunk, the absolute timestamp of the message is sent// here.// 6.1.2.1. 类型0// 对于类型0的chunk,此处发送消息的绝对时间戳chunk->header.timestamp = chunk->header.timestamp_delta;} else {// 6.1.2.2. Type 1// 6.1.2.3. Type 2// For a type-1 or type-2 chunk, the difference between the previous// chunk's timestamp and the current chunk's timestamp is sent here.// 6.1.2.2. 类型1// 6.1.2.3. 类型2// 对于类型1或类型2的chunk,此处发送前一个chunk的时间戳// 与当前chunk时间戳之间的差值chunk->header.timestamp += chunk->header.timestamp_delta;}}if (fmt <= RTMP_FMT_TYPE1) {// 解析负载长度(3字节)int32_t payload_length = 0;pp = (char*)&payload_length;pp[2] = *p++;pp[1] = *p++;pp[0] = *p++;pp[3] = 0;/*// 对于一个消息,如果msg存在于缓存中,大小不能改变// 始终使用实际msg大小进行比较,因为缓存的负载长度可能会改变// 对于fmt类型1(stream_id未改变),用户可以更改负载长度// (这在连续的chunks中是不允许的)*/if (!is_first_chunk_of_msg && chunk->header.payload_length != payload_length) {// 如果不是消息的第一个chunk,但负载长度发生了变化,返回错误return srs_error_new(ERROR_RTMP_PACKET_SIZE, "msg in chunk cache, size=%d cannot change to %d", chunk->header.payload_length, payload_length);}// 设置负载长度和消息类型chunk->header.payload_length = payload_length;chunk->header.message_type = *p++;if (fmt == RTMP_FMT_TYPE0) {// 如果是类型0,还需要解析流ID(4字节)pp = (char*)&chunk->header.stream_id;pp[0] = *p++;pp[1] = *p++;pp[2] = *p++;pp[3] = *p++;}}} else {// update the timestamp even fmt=3 for first chunk packet// 即使是fmt=3的第一个chunk数据包,也要更新时间戳if (is_first_chunk_of_msg && !chunk->extended_timestamp) {// 更新时间戳chunk->header.timestamp += chunk->header.timestamp_delta;}}// read extended-timestamp// 读取扩展时间戳if (chunk->extended_timestamp) {// 增加消息头大小并读取额外的4字节扩展时间戳mh_size += 4;if ((err = in_buffer->grow(skt, 4)) != srs_success) {// 如果读取扩展时间戳失败,返回错误return srs_error_wrap(err, "read 4 bytes ext timestamp");}// grow()后指向切片的指针可能无效// 重置p以获取4字节切片char* p = in_buffer->read_slice(4);// 解析扩展时间戳(4字节)uint32_t timestamp = 0x00;char* pp = (char*)×tamp;pp[3] = *p++;pp[2] = *p++;pp[1] = *p++;pp[0] = *p++;// always use 31bits timestamp, for some server may use 32bits extended timestamp.// 始终使用31位时间戳,因为某些服务器可能使用32位扩展时间戳timestamp &= 0x7fffffff;// RTMP规范和ffmpeg/librtmp是错误的,// 但是,adobe更改了规范,所以flash/FMLE/FMS总是正确的。// 默认为正确以支持flash/FMLE/FMS。//// ffmpeg/librtmp可能不发送此字段,需要检测值。// @参见: http://blog.csdn.net/win_lin/article/details/13363699// 与chunk时间戳比较,该时间戳由chunk消息头类型0,1或2设置。//// @备注,nginx在序列头中发送扩展时间戳,// 在继续C1 chunks中发送时间戳增量,因此与ffmpeg兼容,// 也就是说,nginx-rtmp中没有继续chunks和扩展时间戳。//// @备注,srs总是发送扩展时间戳,以保持简单,// 并与adobe产品兼容。// 获取当前chunk的时间戳uint32_t chunk_timestamp = (uint32_t)chunk->header.timestamp;/*// 如果chunk_timestamp<=0,则chunk之前的数据包没有扩展时间戳,// 始终使用扩展时间戳。//// 关于is_first_chunk_of_msg。// @备注,对于消息的第一个chunk,始终使用扩展时间戳。*/if (!is_first_chunk_of_msg && chunk_timestamp > 0 && chunk_timestamp != timestamp) {// 如果不是消息的第一个chunk,且时间戳不匹配,回退4字节mh_size -= 4;in_buffer->skip(-4);} else {// 否则使用扩展时间戳chunk->header.timestamp = timestamp;}}/*// 扩展时间戳必须是无符号整数,// 24位时间戳: 0xffffff = 16777215ms = 16777.215s = 4.66h// 32位时间戳: 0xffffffff = 4294967295ms = 4294967.295s = 1193.046h = 49.71d// 因为rtmp协议说32位时间戳大约是"50天":// 3. 字节顺序、对齐和时间格式// 因为时间戳通常只有32位长,它们将在// 不到50天后翻转。//// 但是,它的示例说时间戳是31位:// 例如,应用程序可以假设所有// 相邻时间戳在彼此2^31毫秒内,所以// 10000在4000000000之后,而3000000000在// 4000000000之前。// flv规范说时间戳是31位:// 时间戳字段的扩展形成SI32值。这个// 字段表示高8位,而前面的// 时间戳字段表示时间的低24位// 毫秒。// 总之,31位时间戳是可以的。// 将扩展时间戳转换为31位。*/// 将时间戳限制在31位范围内chunk->header.timestamp &= 0x7fffffff;// valid message, the payload_length is 24bits,// so it should never be negative.// 有效消息,payload_length是24位,// 所以它永远不应该是负数srs_assert(chunk->header.payload_length >= 0);// copy header to msg// 将头部复制到消息对象chunk->msg->header = chunk->header;// increase the msg count, the chunk stream can accept fmt=1/2/3 message now.// 增加消息计数,现在chunk流可以接受fmt=1/2/3的消息chunk->msg_count++;return err;
}
2.3.2.3 解析RTMP.msg.payload
SrsProtocol::read_message_payload 会把前面读取的chunk转成SrsCommonMessage msg的结构,并返回给上层使用。
/*** 读取RTMP消息的有效载荷数据** @param chunk 输入的分块流,包含消息头和部分数据* @param pmsg 输出参数,用于返回完整的消息对象* @return 成功返回srs_success,失败返回错误码** 该函数负责:* 1. 处理空消息情况* 2. 计算当前需要读取的载荷大小* 3. 初始化消息载荷缓冲区(若未初始化)* 4. 从网络读取数据并填充到消息缓冲区* 5. 当读取到完整消息时,通过pmsg返回消息对象** 注意:* - 函数会修改chunk->msg的所有权* - 当返回完整消息时,会将chunk->msg置为NULL*/
srs_error_t SrsProtocol::read_message_payload(SrsChunkStream *chunk, SrsCommonMessage **pmsg)
{srs_error_t err = srs_success; // 初始化错误对象为成功状态// empty messageif (chunk->header.payload_length <= 0) { // 检查消息负载长度是否小于等于0// 记录空RTMP消息的详细信息*pmsg = chunk->msg; // 将当前块的消息指针赋值给输出参数chunk->msg = NULL; // 将当前块的消息指针置为NULL,避免重复释放内存return err; // 返回成功状态}srs_assert(chunk->header.payload_length > 0); // 断言消息负载长度必须大于0int payload_size = chunk->header.payload_length - chunk->msg->size; // 计算需要读取的负载大小(总长度减去已读取的大小)payload_size = srs_min(payload_size, in_chunk_size); // 确保不超过设定的块大小限制// create msg payload if not initializedif (!chunk->msg->payload) { // 如果消息负载还未初始化chunk->msg->create_payload(chunk->header.payload_length); // 创建指定大小的负载空间}// read payload to bufferif ((err = in_buffer->grow(skt, payload_size)) != srs_success) { // 确保输入缓冲区有足够空间读取负载数据return srs_error_wrap(err, "read %d bytes payload", payload_size); // 如果读取失败,包装错误并返回}// 从输入缓冲区复制数据到消息负载memcpy(chunk->msg->payload + chunk->msg->size, in_buffer->read_slice(payload_size), payload_size); chunk->msg->size += payload_size; // 更新已读取的消息大小// got entire RTMP message?if (chunk->header.payload_length == chunk->msg->size) { // 检查是否已读取完整个消息*pmsg = chunk->msg; // 如果是,将消息指针赋值给输出参数chunk->msg = NULL; // 将当前块的消息指针置为NULL,避免重复释放内存return err; // 返回成功状态}return err; // 如果消息未完全读取,返回成功状态,等待下一次调用继续读取
}
2.4 处理RTMP消息
2.4.1 处理不同类型消息
SrsProtocol::on_recv_message函数负责处理接收到的RTMP消息,并解析出一些属性字段设置。该函数主要实现:
- 首先尝试发送确认应答消息
- 根据消息类型进行不同的处理:
- 对SetChunkSize、UserControlMessage和WindowAcknowledgementSize消息进行解码和特殊处理
- 对VideoMessage和AudioMessage消息只打印调试信息
- 其他类型消息直接返回
- 对于解码后的消息,根据具体类型执行不同的操作:
- WindowAcknowledgementSize: 更新本地的确认窗口大小
- SetChunkSize: 验证并更新本地的输入块大小
- UserControlMessage: 处理设置缓冲区长度事件和Ping请求事件
SrsProtocol::decode_message函数请参考后续2.5.1 解析消费数据
/*** 处理接收到的RTMP消息** @param msg 接收到的RTMP消息指针,不能为NULL* @return 返回错误码,成功返回srs_success** @remark 主要处理以下消息类型:* - RTMP_MSG_SetChunkSize: 设置块大小* - RTMP_MSG_UserControlMessage: 用户控制消息(如设置缓冲区长度、ping请求等)* - RTMP_MSG_WindowAcknowledgementSize: 设置窗口确认大小* - RTMP_MSG_VideoMessage/RTMP_MSG_AudioMessage: 打印调试信息** @note 会自动释放解码后的packet对象* @warning 对于无效的块大小会返回错误*/
srs_error_t SrsProtocol::on_recv_message(SrsCommonMessage *msg)
{srs_error_t err = srs_success; // 初始化错误对象为成功状态srs_assert(msg != NULL); // 断言确保消息对象不为空if ((err = response_acknowledgement_message()) != srs_success) { // 尝试发送确认应答消息return srs_error_wrap(err, "response ack"); // 如果发送失败,包装错误并返回}SrsPacket* packet = NULL; // 定义数据包指针,初始化为空switch (msg->header.message_type) { // 根据消息类型进行不同处理case RTMP_MSG_SetChunkSize: // 设置块大小消息case RTMP_MSG_UserControlMessage: // 用户控制消息case RTMP_MSG_WindowAcknowledgementSize: // 窗口确认大小消息if ((err = decode_message(msg, &packet)) != srs_success) { // 解码这些类型的消息return srs_error_wrap(err, "decode message"); // 如果解码失败,包装错误并返回}break;case RTMP_MSG_VideoMessage: // 视频消息case RTMP_MSG_AudioMessage: // 音频消息print_debug_info(); // 打印调试信息default: // 其他类型消息return err; // 直接返回成功状态,不做特殊处理}srs_assert(packet); // 断言确保数据包已成功解码SrsAutoFree(SrsPacket, packet); // 使用自动释放对象管理数据包内存switch (msg->header.message_type) { // 根据消息类型进行具体处理case RTMP_MSG_WindowAcknowledgementSize: { // 窗口确认大小消息处理SrsSetWindowAckSizePacket* pkt = dynamic_cast<SrsSetWindowAckSizePacket*>(packet); // 类型转换为窗口确认大小包srs_assert(pkt != NULL); // 断言确保转换成功if (pkt->ackowledgement_window_size > 0) { // 如果确认窗口大小大于0in_ack_size.window = (uint32_t)pkt->ackowledgement_window_size; // 设置输入确认窗口大小}break;}case RTMP_MSG_SetChunkSize: { // 设置块大小消息处理SrsSetChunkSizePacket* pkt = dynamic_cast<SrsSetChunkSizePacket*>(packet); // 类型转换为设置块大小包srs_assert(pkt != NULL); // 断言确保转换成功// 检查块大小是否在有效范围内in_chunk_size = pkt->chunk_size; // 设置输入块大小break;}case RTMP_MSG_UserControlMessage: { // 用户控制消息处理SrsUserControlPacket* pkt = dynamic_cast<SrsUserControlPacket*>(packet); // 类型转换为用户控制包srs_assert(pkt != NULL); // 断言确保转换成功if (pkt->event_type == SrcPCUCSetBufferLength) { // 如果是设置缓冲区长度事件in_buffer_length = pkt->extra_data; // 设置输入缓冲区长度}if (pkt->event_type == SrcPCUCPingRequest) { // 如果是Ping请求事件if ((err = response_ping_message(pkt->event_data)) != srs_success) { // 响应Ping消息return srs_error_wrap(err, "response ping"); // 如果响应失败,包装错误并返回}}break;}default: // 其他类型消息break; // 不做特殊处理}return err; // 返回处理结果
}
2.5 消费数据
SrsRtmpConn::publishing函数中解析rtmp消息并设置本地属性后就是交给消费者处理!
/*** @brief 开始发布流** 该函数用于处理RTMP流的发布流程。** @param source 流发布源** @return srs_error_t 错误码,成功时返回srs_success*/
srs_error_t SrsRtmpConn::publishing(SrsLiveSource* source)
{// 获取发布权限if ((err = acquire_publish(source)) == srs_success) {// 使用独立的协程来接收数据// @see: https://github.com/ossrs/srs/issues/237SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source, _srs_context->get_id());err = do_publishing(source, &rtrd);rtrd.stop();}return err;
}/*** @brief 消费一条消息* * 实现ISrsMessagePumper接口,处理接收到的消息。* 如果上下文ID发生变化,则更新它。* * @param msg 待处理的消息* @return 错误码*/
srs_error_t SrsPublishRecvThread::consume(SrsCommonMessage* msg)
{srs_error_t err = srs_success;// 当上下文ID变化时,更新它if (ncid.compare(cid)) {_srs_context->set_id(ncid);cid = ncid;}// 增加消息计数_nb_msgs++;// 如果是视频消息,增加视频帧计数if (msg->header.is_video()) {video_frames++;}// RTMP连接将处理这个消息err = _conn->handle_publish_message(_source, msg);// 必须释放消息,源会在需要时复制它srs_freep(msg);if (err != srs_success) {return srs_error_wrap(err, "handle publish message");}// 让出给其他协程// @see https://github.com/ossrs/srs/issues/2194#issuecomment-777463768if (++nn_msgs_for_yield_ >= 15) {nn_msgs_for_yield_ = 0;srs_thread_yield();}return err;
}/*** @brief 处理发布消息* * 处理RTMP发布过程中的各类消息,如命令消息和媒体数据* * @param source 流媒体源对象* @param msg 待处理的RTMP消息* @return srs_error_t 错误码,成功返回srs_success*/
srs_error_t SrsRtmpConn::handle_publish_message(SrsLiveSource* source, SrsCommonMessage* msg)
{srs_error_t err = srs_success;// 处理发布事件,检查是否是AMF0或AMF3命令消息if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {SrsPacket* pkt = NULL;// 解码消息if ((err = rtmp->decode_message(msg, &pkt)) != srs_success) {return srs_error_wrap(err, "rtmp: decode message");}SrsAutoFree(SrsPacket, pkt);// 对于Flash发布,任何数据包都视为取消发布if (info->type == SrsRtmpConnFlashPublish) {srs_trace("flash flash publish finished.");return srs_error_new(ERROR_CONTROL_REPUBLISH, "rtmp: republish");}// 对于FMLE,丢弃除FMLE启动包以外的所有包if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {SrsFMLEStartPacket* unpublish = dynamic_cast<SrsFMLEStartPacket*>(pkt);if ((err = rtmp->fmle_unpublish(info->res->stream_id, unpublish->transaction_id)) != srs_success) {return srs_error_wrap(err, "rtmp: republish");}return srs_error_new(ERROR_CONTROL_REPUBLISH, "rtmp: republish");}srs_trace("fmle ignore AMF0/AMF3 command message.");return err;}// 处理视频、音频、数据消息if ((err = process_publish_message(source, msg)) != srs_success) {return srs_error_wrap(err, "rtmp: consume message");}return err;
}
2.5.1 解析消费者数据
SrsProtocol::decode_message 函数是RTMP协议栈中的核心函数之一,用于将原始的RTMP消息解码为特定类型的数据包。
根据消息的类型和内容创建不同种类的数据包:
- AMF命令消息处理:
- 处理AMF0/AMF3格式的命令消息
- 读取命令名并根据命令类型创建相应的数据包
- 对于响应类消息,查找相应的请求命令名并创建对应的响应包
- 支持多种命令类型,如连接、创建流、播放、发布等
- 控制消息处理:
- 处理用户控制消息
- 处理窗口确认大小消息
- 处理确认消息
- 处理设置块大小消息
- 未知消息处理:
- 对于不能识别的消息类型,记录日志并忽略
- 对于常见的控制消息如带宽设置和确认消息,不记录日志以避免日志过多
/*** 解码RTMP消息,对应的数据包** @param msg 输入消息,包含消息头和有效载荷* @param ppacket 输出参数,用于返回解码后的数据包指针* @return 返回错误码,srs_success表示成功** @remark 函数会校验输入参数有效性,解码失败时会自动释放已创建的数据包* @see do_decode_message() 实际执行解码操作的内部函数*/
srs_error_t SrsProtocol::decode_message(SrsCommonMessage *msg, SrsPacket **ppacket)
{// 初始化输出参数为空指针*ppacket = NULL;// 初始化错误对象为成功状态srs_error_t err = srs_success;// 确保输入消息不为空srs_assert(msg != NULL);// 确保消息的有效载荷不为空srs_assert(msg->payload != NULL);// 确保消息大小大于0srs_assert(msg->size > 0);// 创建一个缓冲区对象,用于解析消息的有效载荷SrsBuffer stream(msg->payload, msg->size);// 创建一个数据包指针,用于存储解析后的结果SrsPacket* packet = NULL;// 调用内部解码函数,根据消息头和缓冲区内容创建对应类型的数据包if ((err = do_decode_message(msg->header, &stream, &packet)) != srs_success) {// 解码失败时释放已创建的数据包,避免内存泄漏srs_freep(packet);return srs_error_wrap(err, "decode message");}// 解码成功,将创建的数据包赋值给输出参数*ppacket = packet;// 返回处理结果return err;
}/*** 解码RTMP消息并创建对应的数据包对象** 该函数根据消息头部的类型信息,解析RTMP消息流并创建相应的数据包对象。* 支持处理多种类型的RTMP消息,包括:* - AMF0/AMF3命令消息(connect, createStream, play等)* - 结果/错误响应消息* - 用户控制消息* - 窗口确认大小消息* - 设置块大小消息等** @param header 消息头部信息,包含消息类型等元数据* @param stream 包含消息体的数据缓冲区* @param ppacket 输出参数,用于返回创建的数据包对象指针* @return 返回错误码,srs_success表示成功** @remark 对于未知消息类型,函数会记录日志并忽略(带宽设置和确认消息除外)* @see SrsMessageHeader, SrsBuffer, SrsPacket*/
srs_error_t SrsProtocol::do_decode_message(SrsMessageHeader &header, SrsBuffer *stream, SrsPacket **ppacket)
{// 初始化错误对象为成功状态srs_error_t err = srs_success;// 创建一个数据包指针,用于存储解析后的结果SrsPacket* packet = NULL;// 判断消息类型是否为AMF0/AMF3命令消息或数据消息if (header.is_amf0_command() || header.is_amf3_command() || header.is_amf0_data() || header.is_amf3_data()) {// 对于AMF3命令消息,需要跳过第一个字节 if (header.is_amf3_command() && stream->require(1)) {stream->skip(1);}// 解析AMF0命令消息的命令名称std::string command;if ((err = srs_amf0_read_string(stream, command)) != srs_success) {return srs_error_wrap(err, "decode command name");}// 处理结果或错误消息if (command == RTMP_AMF0_COMMAND_RESULT || command == RTMP_AMF0_COMMAND_ERROR) {// 解析事务IDdouble transactionId = 0.0;if ((err = srs_amf0_read_number(stream, transactionId)) != srs_success) {return srs_error_wrap(err, "decode tid for %s", command.c_str());}// 重置流位置,因为头部读取已完成stream->skip(-1 * stream->pos());if (header.is_amf3_command()) {stream->skip(1);}// 查找相应的请求命令名称if (requests.find(transactionId) == requests.end()) {return srs_error_new(ERROR_RTMP_NO_REQUEST, "find request for command=%s, tid=%.2f", command.c_str(), transactionId);}// 根据请求类型创建对应的响应包std::string request_name = requests[transactionId];if (request_name == RTMP_AMF0_COMMAND_CONNECT) {*ppacket = packet = new SrsConnectAppResPacket();return packet->decode(stream);} else if (request_name == RTMP_AMF0_COMMAND_CREATE_STREAM) {*ppacket = packet = new SrsCreateStreamResPacket(0, 0);return packet->decode(stream);} else if (request_name == RTMP_AMF0_COMMAND_RELEASE_STREAM) {*ppacket = packet = new SrsFMLEStartResPacket(0);return packet->decode(stream);} else if (request_name == RTMP_AMF0_COMMAND_FC_PUBLISH) {*ppacket = packet = new SrsFMLEStartResPacket(0);return packet->decode(stream);} else if (request_name == RTMP_AMF0_COMMAND_UNPUBLISH) {*ppacket = packet = new SrsFMLEStartResPacket(0);return packet->decode(stream);} else {return srs_error_new(ERROR_RTMP_NO_REQUEST, "request=%s, tid=%.2f", request_name.c_str(), transactionId);}}// 重置流位置到起始位置(AMF3命令需重置到1),以重新开始解码stream->skip(-1 * stream->pos());if (header.is_amf3_command()) {stream->skip(1);}// 根据命令名称创建对应类型的数据包if (command == RTMP_AMF0_COMMAND_CONNECT) {*ppacket = packet = new SrsConnectAppPacket();return packet->decode(stream);} else if (command == RTMP_AMF0_COMMAND_CREATE_STREAM) {*ppacket = packet = new SrsCreateStreamPacket();return packet->decode(stream);} else if (command == RTMP_AMF0_COMMAND_PLAY) {*ppacket = packet = new SrsPlayPacket();return packet->decode(stream);} else if (command == RTMP_AMF0_COMMAND_PAUSE) {*ppacket = packet = new SrsPausePacket();return packet->decode(stream);} else if (command == RTMP_AMF0_COMMAND_RELEASE_STREAM) {*ppacket = packet = new SrsFMLEStartPacket();return packet->decode(stream);} else if (command == RTMP_AMF0_COMMAND_FC_PUBLISH) {*ppacket = packet = new SrsFMLEStartPacket();return packet->decode(stream);} else if (command == RTMP_AMF0_COMMAND_PUBLISH) {*ppacket = packet = new SrsPublishPacket();return packet->decode(stream);} else if (command == RTMP_AMF0_COMMAND_UNPUBLISH) {*ppacket = packet = new SrsFMLEStartPacket();return packet->decode(stream);} else if (command == SRS_CONSTS_RTMP_SET_DATAFRAME) {*ppacket = packet = new SrsOnMetaDataPacket();return packet->decode(stream);} else if (command == SRS_CONSTS_RTMP_ON_METADATA) {*ppacket = packet = new SrsOnMetaDataPacket();return packet->decode(stream);} else if (command == SRS_BW_CHECK_FINISHED) {*ppacket = packet = new SrsBandwidthPacket();return packet->decode(stream);} else if (command == SRS_BW_CHECK_PLAYING) {*ppacket = packet = new SrsBandwidthPacket();return packet->decode(stream);} else if (command == SRS_BW_CHECK_PUBLISHING) {*ppacket = packet = new SrsBandwidthPacket();return packet->decode(stream);} else if (command == SRS_BW_CHECK_STARTING_PLAY) {*ppacket = packet = new SrsBandwidthPacket();return packet->decode(stream);} else if (command == SRS_BW_CHECK_STARTING_PUBLISH) {*ppacket = packet = new SrsBandwidthPacket();return packet->decode(stream);} else if (command == SRS_BW_CHECK_START_PLAY) {*ppacket = packet = new SrsBandwidthPacket();return packet->decode(stream);} else if (command == SRS_BW_CHECK_START_PUBLISH) {*ppacket = packet = new SrsBandwidthPacket();return packet->decode(stream);} else if (command == SRS_BW_CHECK_STOPPED_PLAY) {*ppacket = packet = new SrsBandwidthPacket();return packet->decode(stream);} else if (command == SRS_BW_CHECK_STOP_PLAY) {*ppacket = packet = new SrsBandwidthPacket();return packet->decode(stream);} else if (command == SRS_BW_CHECK_STOP_PUBLISH) {*ppacket = packet = new SrsBandwidthPacket();return packet->decode(stream);} else if (command == SRS_BW_CHECK_STOPPED_PUBLISH) {*ppacket = packet = new SrsBandwidthPacket();return packet->decode(stream);} else if (command == SRS_BW_CHECK_FINAL) {*ppacket = packet = new SrsBandwidthPacket();return packet->decode(stream);} else if (command == RTMP_AMF0_COMMAND_CLOSE_STREAM) {*ppacket = packet = new SrsCloseStreamPacket();return packet->decode(stream);} else if (header.is_amf0_command() || header.is_amf3_command()) {*ppacket = packet = new SrsCallPacket();return packet->decode(stream);}// 默认创建一个空包,用于丢弃未知消息*ppacket = packet = new SrsPacket();return err;} else if (header.is_user_control_message()) {// 处理用户控制消息*ppacket = packet = new SrsUserControlPacket();return packet->decode(stream);} else if (header.is_window_ackledgement_size()) {// 处理窗口确认大小消息*ppacket = packet = new SrsSetWindowAckSizePacket();return packet->decode(stream);} else if (header.is_ackledgement()) {// 处理确认消息*ppacket = packet = new SrsAcknowledgementPacket();return packet->decode(stream);} else if (header.is_set_chunk_size()) {// 处理设置块大小消息*ppacket = packet = new SrsSetChunkSizePacket();return packet->decode(stream);} else {// 对于未知消息类型,记录日志并忽略// 但对于带宽设置和确认消息不记录日志,因为这些是常见的控制消息if (!header.is_set_peer_bandwidth() && !header.is_ackledgement()) {srs_trace("drop unknown message, type=%d", header.message_type);}}// 返回处理结果return err;
}
2.5.1.1 特别处理 FMLE取消发布请求
主要功能:
- 函数处理FMLE客户端的取消发布请求
- 按顺序发送三个响应包:
- onFCUnpublish通知
- FCUnpublish响应包
- onStatus标准RTMP状态通知
/*** 处理FMLE取消发布请求,发送三个响应包:* 1. onFCUnpublish(NetStream.unpublish.Success)状态包* 2. FCUnpublish响应包(带事务ID)* 3. onStatus(NetStream.Unpublish.Success)状态包** @param stream_id 流ID* @param unpublish_tid 取消发布事务ID* @return 错误码,成功返回srs_success*/
srs_error_t SrsRtmpServer::fmle_unpublish(int stream_id, double unpublish_tid)
{// 初始化错误变量为成功状态srs_error_t err = srs_success;// publish response onFCUnpublish(NetStream.unpublish.Success)// 第一步:发送onFCUnpublish通知,告知客户端停止发布流成功if (true) {// 创建状态调用包对象SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();// 设置命令名为取消发布命令pkt->command_name = RTMP_AMF0_COMMAND_ON_FC_UNPUBLISH;// 设置状态码为取消发布成功pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodeUnpublishSuccess));// 设置状态描述信息pkt->data->set(StatusDescription, SrsAmf0Any::str("Stop publishing stream."));// 发送并释放数据包,如果发送失败则返回错误if ((err = protocol->send_and_free_packet(pkt, stream_id)) != srs_success) {return srs_error_wrap(err, "send NetStream.unpublish.Success");}}// FCUnpublish response// 第二步:发送FCUnpublish响应包,确认收到客户端的取消发布请求if (true) {// 创建FMLE启动响应包,使用客户端提供的事务IDSrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(unpublish_tid);// 发送并释放数据包,如果发送失败则返回错误if ((err = protocol->send_and_free_packet(pkt, stream_id)) != srs_success) {return srs_error_wrap(err, "send FCUnpublish response");}}// publish response onStatus(NetStream.Unpublish.Success)// 第三步:发送onStatus通知,标准RTMP状态通知,确认流已成功取消发布if (true) {// 创建状态调用包对象SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();// 设置状态级别为"status"(正常状态信息)pkt->data->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus));// 设置状态码为取消发布成功pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodeUnpublishSuccess));// 设置状态描述信息pkt->data->set(StatusDescription, SrsAmf0Any::str("Stream is now unpublished"));// 设置客户端IDpkt->data->set(StatusClientId, SrsAmf0Any::str(RTMP_SIG_CLIENT_ID));// 发送并释放数据包,如果发送失败则返回错误if ((err = protocol->send_and_free_packet(pkt, stream_id)) != srs_success) {return srs_error_wrap(err, "send NetStream.Unpublish.Success");}}// 返回处理结果return err;
}
2.5.2 处理视频、音频、数据消息
SrsRtmpConn::process_publish_message 函数位于RTMP连接处理模块中,负责处理RTMP发布过程中的各类媒体消息和数据包。用于处理视频、音频、聚合包和元数据等多种RTMP消息类型。
/*** @brief 处理发布消息内容* * 根据消息类型(音频、视频、元数据等)处理RTMP发布消息内容* * @param source 流媒体源对象* @param msg 待处理的RTMP消息* @return srs_error_t 错误码,成功返回srs_success*/
srs_error_t SrsRtmpConn::process_publish_message(SrsLiveSource* source, SrsCommonMessage* msg)
{srs_error_t err = srs_success;// 如果是边缘服务器,则直接将消息代理到源站if (info->edge) {if ((err = source->on_edge_proxy_publish(msg)) != srs_success) {return srs_error_wrap(err, "rtmp: proxy publish");}return err;}// 处理音频数据包if (msg->header.is_audio()) {if ((err = source->on_audio(msg)) != srs_success) {return srs_error_wrap(err, "rtmp: consume audio");}return err;}// 处理视频数据包if (msg->header.is_video()) {if ((err = source->on_video(msg)) != srs_success) {return srs_error_wrap(err, "rtmp: consume video");}return err;}if (msg->header.is_aggregate()) {if ((err = source->on_aggregate(msg)) != srs_success) {return srs_error_wrap(err, "rtmp: consume aggregate");}return err;}if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {SrsPacket* pkt = NULL;if ((err = rtmp->decode_message(msg, &pkt)) != srs_success) {return srs_error_wrap(err, "rtmp: decode message");}SrsAutoFree(SrsPacket, pkt);if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);if ((err = source->on_meta_data(msg, metadata)) != srs_success) {return srs_error_wrap(err, "rtmp: consume metadata");}return err;}return err;}return err;
}
2.5.2.1 边缘服务器
SrsEdgeForwarder::proxy函数是SRS流媒体服务器中边缘服务器(Edge)模式的核心转发功能,主要负责将从客户端接收到的RTMP消息转发到源站服务器(Origin)。
- 错误状态检查:首先检查之前的发送操作是否有错误,如有则直接返回错误状态,避免在已知错误状态下继续操作
- 消息过滤:过滤掉不需要转发的消息类型,包括:空消息(大小为0或负数)。RTMP协议控制消息(如chunk size设置、窗口确认大小、普通确认消息)
- 消息复制:创建原始消息的副本,因为原消息由源管理并可能被释放
- 流ID设置:设置正确的流ID,确保消息能够正确路由
- 消息入队:将消息放入转发队列,由专门的转发线程异步处理发送
srs_error_t SrsLiveSource::on_edge_proxy_publish(SrsCommonMessage* msg)
{return publish_edge->on_proxy_publish(msg);
}
srs_error_t SrsPublishEdge::on_proxy_publish(SrsCommonMessage* msg)
{return forwarder->proxy(msg);
}
/*** @brief 边缘转发器的消息代理处理函数** 该函数负责处理传入的消息,根据消息类型和内容决定是否转发:* 1. 检查发送错误码,如有错误直接返回* 2. 过滤无效消息(空消息、分块消息、窗口确认消息等)* 3. 创建消息副本并设置流ID* 4. 将消息加入发送队列** @param msg 待处理的通用消息指针(由调用方管理内存)* @return srs_error_t 返回操作结果,成功返回srs_success*/
srs_error_t SrsEdgeForwarder::proxy(SrsCommonMessage *msg)
{// 初始化错误对象为成功状态,用于错误处理srs_error_t err = srs_success;// 如果之前的发送操作已经出错,直接返回错误信息// 避免在已知错误状态下继续尝试转发if (send_error_code != ERROR_SUCCESS) {return srs_error_new(send_error_code, "edge forwarder");}// 对消息进行过滤,以下几种类型的消息不需要转发到源站:// 1. 空消息(大小<=0)// 2. chunk size设置消息// 3. 窗口确认大小消息// 4. 普通确认消息// 这些消息通常是连接控制消息,对内容传输没有实质影响if (msg->size <= 0|| msg->header.is_set_chunk_size()|| msg->header.is_window_ackledgement_size()|| msg->header.is_ackledgement()) {return err;}// 创建消息的副本,因为原始消息由来源管理并可能被释放// 我们需要一个独立的副本用于转发队列SrsSharedPtrMessage copy;if ((err = copy.create(msg)) != srs_success) {return srs_error_wrap(err, "create message");}// 设置流ID为当前SDK连接的流ID// 确保转发的消息具有正确的流标识copy.stream_id = sdk->sid();// 将消息副本放入转发队列// 转发线程将从队列中取出消息并发送到源站if ((err = queue->enqueue(copy.copy())) != srs_success) {return srs_error_wrap(err, "enqueue message");}// 返回处理结果return err;
}
2.5.2.2 聚合消息数据包
SrsLiveSource::on_aggregate函数负责处理RTMP聚合消息(Aggregate Message),它将聚合消息拆分为单独的音频、视频等子消息,然后将它们分别交给对应的处理函数处理。不要与flv混为一谈
聚合消息是RTMP协议层的优化机制,用于提升传输效率。FLV是存储格式,其标签结构独立于RTMP的传输机制。
/*** 处理RTMP聚合消息,解析并分发其中的子消息** 该函数负责解析RTMP协议中的聚合消息(类型22),将其拆分为多个独立的子消息(音频/视频等),* 并分别调用对应的处理函数。主要处理流程包括:* 1. 解析消息头(类型、大小、时间戳、流ID等)* 2. 调整子消息时间戳与聚合消息保持一致* 3. 根据消息类型分发到音频或视频处理函数** @param msg 输入的聚合消息对象指针* @return 返回错误码,srs_success表示成功** @remark 聚合消息格式参考RTMP协议规范,每个子消息包含:* 1字节类型 + 3字节大小 + 3字节时间戳(低24位) +* 1字节时间戳(高8位) + 3字节流ID + 数据负载 + 4字节previous tag size* @remark 仅处理音频(8)和视频(9)类型的子消息,其他类型将被忽略* @see SrsLiveSource::on_audio(), SrsLiveSource::on_video()*/
srs_error_t SrsLiveSource::on_aggregate(SrsCommonMessage *msg)
{// 初始化错误变量为成功状态srs_error_t err = srs_success;// 创建缓冲区,用于解析聚合消息的负载数据SrsBuffer* stream = new SrsBuffer(msg->payload, msg->size);// 使用自动释放指针管理内存,确保函数退出时释放缓冲区SrsAutoFree(SrsBuffer, stream);// 聚合消息总是使用绝对时间戳,需要计算时间戳的差值// the aggregate message always use abs time.int delta = -1;// 循环解析所有聚合在一起的子消息,直到缓冲区为空while (!stream->empty()) {// 检查是否有足够的数据读取消息类型(1字节)if (!stream->require(1)) {return srs_error_new(ERROR_RTMP_AGGREGATE, "aggregate");}// 读取消息类型(1字节),如音频(8)、视频(9)等int8_t type = stream->read_1bytes();// 检查是否有足够的数据读取数据大小(3字节)if (!stream->require(3)) {return srs_error_new(ERROR_RTMP_AGGREGATE, "aggregate");}// 读取数据大小(3字节)int32_t data_size = stream->read_3bytes();// 确保数据大小合法if (data_size < 0) {return srs_error_new(ERROR_RTMP_AGGREGATE, "aggregate size");}// 检查是否有足够的数据读取时间戳(3字节)if (!stream->require(3)) {return srs_error_new(ERROR_RTMP_AGGREGATE, "aggregate time");}// 读取时间戳(3字节),低24位int32_t timestamp = stream->read_3bytes();// 检查是否有足够的数据读取时间戳的高8位(1字节)if (!stream->require(1)) {return srs_error_new(ERROR_RTMP_AGGREGATE, "aggregate time(high bits)");}// 读取时间戳的高8位(1字节)int32_t time_h = stream->read_1bytes();// 组合时间戳的所有位,形成完整的32位时间戳timestamp |= time_h<<24;// 确保时间戳的最高位为0(正数)timestamp &= 0x7FFFFFFF;// 调整聚合消息中的绝对时间戳// 只有delta为-1表示未初始化的差值if (delta == -1) {// 计算原始消息时间戳与子消息时间戳的差值delta = (int)msg->header.timestamp - (int)timestamp;}// 将差值应用到子消息的时间戳上,保持一致性timestamp += delta;// 检查是否有足够的数据读取流ID(3字节)if (!stream->require(3)) {return srs_error_new(ERROR_RTMP_AGGREGATE, "aggregate stream id");}// 读取流ID(3字节)int32_t stream_id = stream->read_3bytes();// 如果有数据,确保缓冲区中有足够的数据可读if (data_size > 0 && !stream->require(data_size)) {return srs_error_new(ERROR_RTMP_AGGREGATE, "aggregate data");}// 创建通用消息对象,用于存储解析出的子消息SrsCommonMessage o;// 设置子消息头部信息o.header.message_type = type; // 消息类型o.header.payload_length = data_size; // 负载数据长度o.header.timestamp_delta = timestamp; // 时间戳增量o.header.timestamp = timestamp; // 绝对时间戳o.header.stream_id = stream_id; // 流IDo.header.perfer_cid = msg->header.perfer_cid; // 继承原消息的偏好通道ID// 如果有数据,则分配内存并复制数据if (data_size > 0) {o.size = data_size;o.payload = new char[o.size];stream->read_bytes(o.payload, o.size);}// 检查是否有足够的数据读取previous tag size(4字节)if (!stream->require(4)) {return srs_error_new(ERROR_RTMP_AGGREGATE, "aggregate previous tag size");}// 读取但不使用previous tag size(4字节)stream->read_4bytes();// 根据消息类型处理解析出的子消息if (o.header.is_audio()) {// 如果是音频消息,则交由音频处理函数处理if ((err = on_audio(&o)) != srs_success) {return srs_error_wrap(err, "consume audio");}} else if (o.header.is_video()) {// 如果是视频消息,则交由视频处理函数处理if ((err = on_video(&o)) != srs_success) {return srs_error_wrap(err, "consume video");}}// 注意:这里不处理其他类型的消息,如元数据等}// 返回处理结果return err;
}
2.5.2.3 音频数据包
SrsLiveSource::on_audio_imp函数是SRS流媒体服务器处理音频消息的核心实现函数,主要职责是处理接收到的音频数据包并将其分发给各个消费者和其他模块。处理流程详解:
- 序列头检测:识别AAC音频序列头,对序列头和普通数据包进行不同处理
- 重复序列头过滤:通过比较检测重复序列头,可选择性丢弃以减少带宽消耗
- 多目标分发:将音频消息分发给hub(处理HLS/DASH等)、桥接器和消费者
- 元数据缓存:维护音频序列头缓存,用于新连接的客户端
- GOP缓存:将非序列头的音频消息加入GOP缓存,支持播放器快速启动
- ATC时间戳处理:在ATC模式下同步更新序列头和元数据的时间戳
/*** 处理音频消息** 当收到音频数据包时,检查时间戳是否单调递增(若未开启混音修正)。* 若开启混音修正,则将消息放入混音队列后处理;否则直接处理音频消息。** @param shared_audio 共享的音频消息,处理后其payload将被转移* @return 错误码,成功返回srs_success** @remark 若检测到非单调递增且未开启混音修正,会输出警告日志* @see on_audio_imp, on_video_imp*/
srs_error_t SrsLiveSource::on_audio(SrsCommonMessage *shared_audio)
{// 初始化错误对象为成功状态srs_error_t err = srs_success;// 单调递增检测,确保时间戳是递增的if (!mix_correct && is_monotonically_increase) {// 如果上一个数据包时间戳大于当前音频包的时间戳,说明时间戳不是单调递增的if (last_packet_time > 0 && shared_audio->header.timestamp < last_packet_time) {// 设置单调递增标志为false,并发出警告is_monotonically_increase = false;srs_warn("AUDIO: stream not monotonically increase, please open mix_correct.");}}// 更新最后一个数据包的时间戳last_packet_time = shared_audio->header.timestamp;// 将共享音频消息转换为SrsSharedPtrMessage,转换后用户不应再使用shared_audioSrsSharedPtrMessage msg;// 创建共享指针消息,将有效负载从shared_audio转移到msg中if ((err = msg.create(shared_audio)) != srs_success) {return srs_error_wrap(err, "create message");}// 如果没有启用混音校正,直接处理音频消息if (!mix_correct) {return on_audio_imp(&msg);}// 如果启用了混音校正,将消息插入到混音队列中mix_queue->push(msg.copy());// 从混音队列中获取一条消息,可能是音频或视频SrsSharedPtrMessage* m = mix_queue->pop();// 如果没有获取到消息,直接返回当前状态if (!m) {return err;}// 根据消息类型分别处理音频或视频if (m->is_audio()) {// 如果是音频消息,调用音频处理实现函数err = on_audio_imp(m);} else {// 如果是视频消息,调用视频处理实现函数err = on_video_imp(m);}// 释放临时消息对象srs_freep(m);// 返回处理结果return err;
}/*** 处理音频消息的核心逻辑** 1. 检查是否为AAC序列头,并根据配置决定是否丢弃重复序列头* 2. 将音频消息分发到hub、bridger和所有consumer(除非被丢弃)* 3. 更新音频序列头元数据* 4. 非序列头消息会被缓存到GOP缓存* 5. 如果启用了ATC(绝对时间戳),会更新元数据时间戳** @param msg 共享指针音频消息* @return 错误码,成功返回srs_success*/
srs_error_t SrsLiveSource::on_audio_imp(SrsSharedPtrMessage* msg)
{// 初始化返回值为成功状态srs_error_t err = srs_success;// 检测是否为AAC音频的序列头(Sequence Header)bool is_aac_sequence_header = SrsFlvAudio::sh(msg->payload, msg->size);// 用is_sequence_header变量标记是否为序列头,与AAC序列头状态保持一致bool is_sequence_header = is_aac_sequence_header;// 判断是否应该为重复的序列头丢弃当前音频包// whether consumer should drop for the duplicated sequence header.bool drop_for_reduce = false;// 如果是序列头且之前已有音频序列头缓存,并且配置了减少序列头发送if (is_sequence_header && meta->previous_ash() && _srs_config->get_reduce_sequence_header(req->vhost)) {// 如果新旧序列头大小相同,则进一步比较内容if (meta->previous_ash()->size == msg->size) {// 字节比较,如果内容完全相同则标记为需要丢弃drop_for_reduce = srs_bytes_equals(meta->previous_ash()->payload, msg->payload, msg->size);// 记录日志,提示丢弃序列头srs_warn("drop for reduce sh audio, size=%d", msg->size);}}// 将音频消息转发给hub处理,hub负责分发给HLS、DASH等模块if ((err = hub->on_audio(msg)) != srs_success) {// 如果分发失败,包装错误信息并返回return srs_error_wrap(err, "consume audio");}// 如果存在桥接器,则将音频消息传递给桥接器处理// For bridger to consume the message.if (bridger_ && (err = bridger_->on_audio(msg)) != srs_success) {// 如果桥接处理失败,包装错误信息并返回return srs_error_wrap(err, "bridger consume audio");}// 将音频消息复制分发给所有消费者(除非标记为需要丢弃)// copy to all consumerif (!drop_for_reduce) {// 遍历所有消费者for (int i = 0; i < (int)consumers.size(); i++) {// 获取当前消费者SrsLiveConsumer* consumer = consumers.at(i);// 将消息加入消费者队列,应用ATC和抖动算法设置if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) {// 如果入队失败,包装错误信息并返回return srs_error_wrap(err, "consume message");}}}// 缓存AAC序列头或MP3的第一个数据包if (is_aac_sequence_header || !meta->ash()) {// 更新音频序列头缓存if ((err = meta->update_ash(msg)) != srs_success) {// 如果更新失败,包装错误信息并返回return srs_error_wrap(err, "meta consume audio");}}// 对于序列头,不推送到GOP缓存,也不调整时间戳,直接返回// when sequence header, donot push to gop cache and adjust the timestamp.if (is_sequence_header) {return err;}// 缓存音频消息到GOP缓存,用于新连接的消费者快速获取内容if ((err = gop_cache->cache(msg)) != srs_success) {// 如果缓存失败,包装错误信息并返回return srs_error_wrap(err, "gop cache consume audio");}// 如果启用了ATC(绝对时间码)模式,则更新序列头的时间戳为绝对时间if (atc) {// 更新音频序列头的时间戳if (meta->ash()) {meta->ash()->timestamp = msg->timestamp;}// 更新元数据的时间戳if (meta->data()) {meta->data()->timestamp = msg->timestamp;}}// 返回处理结果return err;
}
2.5.2.4 视频数据包
SrsLiveSource::on_video_imp函数是SRS视频流处理的核心实现,负责处理RTMP视频消息并将其分发到适当的组件和消费者。处理流程详解:
- 序列头检测和处理:检测消息是否为H.264序列头,根据配置可选择性丢弃重复序列头,节省带宽,更新元数据缓存中的视频序列头。
- 多目标分发:通过hub分发给HLS/DASH等模块进行流格式转换,如果存在桥接器,将消息传递给桥接器处理,分发给所有已注册的消费者(播放客户端)。
- GOP缓存处理:对于非序列头视频消息,缓存到GOP缓存中,支持新连接客户端快速获取关键帧,提升播放体验
- ATC时间戳处理:在绝对时间码模式下,更新元数据和序列头的时间戳,确保时间戳一致性,支持特殊播放场景。
/*** 处理视频消息的核心逻辑** 1. 检测是否为序列头(SPS/PPS),并根据配置决定是否丢弃重复的序列头* 2. 更新视频序列头元数据* 3. 将视频消息分发给hub、bridger和所有consumer* 4. 非序列头消息会存入GOP缓存* 5. 如果启用了绝对时间戳(ATC),会更新相关元数据的时间戳** @param msg 共享指针包装的视频消息* @return 返回错误码,srs_success表示成功** @note 序列头消息不会存入GOP缓存*/
srs_error_t SrsLiveSource::on_video_imp(SrsSharedPtrMessage *msg)
{// 初始化错误码为成功状态srs_error_t err = srs_success;// 检测是否为视频序列头(关键帧中的SPS/PPS信息)bool is_sequence_header = SrsFlvVideo::sh(msg->payload, msg->size);// 判断是否需要丢弃重复的视频序列头以减少带宽使用// whether consumer should drop for the duplicated sequence header.bool drop_for_reduce = false;// 如果是序列头、已有之前的序列头缓存、且配置了减少序列头发送选项if (is_sequence_header && meta->previous_vsh() && _srs_config->get_reduce_sequence_header(req->vhost)) {// 如果当前序列头与之前缓存的序列头大小相同if (meta->previous_vsh()->size == msg->size) {// 比较序列头内容是否完全相同,相同则标记为需要丢弃drop_for_reduce = srs_bytes_equals(meta->previous_vsh()->payload, msg->payload, msg->size);// 记录日志,提示丢弃了重复序列头srs_warn("drop for reduce sh video, size=%d", msg->size);}}// 如果是序列头,则缓存它if (is_sequence_header && (err = meta->update_vsh(msg)) != srs_success) {// 如果更新序列头缓存失败,返回错误return srs_error_wrap(err, "meta update video");}// 将视频消息转发给hub处理,hub负责分发给HLS、DASH等模块if ((err = hub->on_video(msg, is_sequence_header)) != srs_success) {// 如果hub处理失败,包装错误并返回return srs_error_wrap(err, "hub consume video");}// 如果存在桥接器,将视频消息传递给桥接器处理if (bridger_ && (err = bridger_->on_video(msg)) != srs_success) {// 如果桥接器处理失败,包装错误并返回return srs_error_wrap(err, "bridger consume video");}// 如果不需要丢弃,则复制消息到所有消费者if (!drop_for_reduce) {// 遍历所有消费者for (int i = 0; i < (int)consumers.size(); i++) {// 获取当前消费者SrsLiveConsumer* consumer = consumers.at(i);// 将消息放入消费者队列,使用配置的ATC和抖动算法设置if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) {// 入队失败,包装错误并返回return srs_error_wrap(err, "consume video");}}}// 如果是序列头,不放入GOP缓存,也不调整时间戳,直接返回if (is_sequence_header) {return err;}// 将非序列头的视频消息缓存到GOP缓存if ((err = gop_cache->cache(msg)) != srs_success) {// 缓存失败,包装错误并返回return srs_error_wrap(err, "gop cache consume vdieo");}// 如果启用了ATC(绝对时间码),更新序列头和元数据的时间戳为当前消息的时间戳if (atc) {// 更新视频序列头的时间戳if (meta->vsh()) {meta->vsh()->timestamp = msg->timestamp;}// 更新元数据的时间戳if (meta->data()) {meta->data()->timestamp = msg->timestamp;}}// 返回处理结果return err;
}
2.5.2.5 元数据包
SrsLiveSource::on_meta_data函数主要负责处理RTMP流中的元数据包(onMetaData)
实现细节
- 元数据解析与处理:原始消息对象msg和已解析的元数据包metadata。metadata是一个包含AMF0对象的SrsOnMetaDataPacket对象,里面存储了流的属性信息
- SrsMetaCache::update_data的工作流程:
- 从元数据中移除duration属性(解决ExoPlayer兼容性问题)提取并记录重要的元数据信息(宽度、高度、视频编解码器ID、音频编解码器ID)
- 将服务器信息添加到元数据中。
- 将元数据编码为二进制格式,并创建可共享的消息对象
- SrsOriginHub::on_meta_data的工作流程:
- 在SrsRtmpFormat中处理元数据格式
- 将元数据转发给所有配置的转发目标
- 将元数据发送给DVR模块进行录制
/*** 处理直播源的元数据消息** @param msg 接收到的通用消息,包含元数据包头信息* @param metadata 元数据包内容* @return 错误码,成功返回srs_success** 功能说明:* 1. 根据配置自动检测并设置ATC(绝对时间戳)模式* 2. 更新元数据缓存* 3. 根据reduce_sequence_header配置决定是否丢弃元数据* 4. 将元数据分发给所有消费者* 5. 将元数据转发给hub处理** 注意:当配置了reduce_sequence_header时会丢弃重复的元数据包*/
srs_error_t SrsLiveSource::on_meta_data(SrsCommonMessage *msg, SrsOnMetaDataPacket *metadata)
{// 初始化错误变量为成功状态,用于记录处理过程中的错误srs_error_t err = srs_success;// 检查是否配置了ATC(绝对时间码)模式,并处理bravo-atc自动检测SrsAmf0Any* prop = NULL;// 从配置中读取当前虚拟主机的ATC设置atc = _srs_config->get_atc(req->vhost);// 如果配置中允许自动检测ATC模式if (_srs_config->get_atc_auto(req->vhost)) {// 尝试从元数据中获取"bravo_atc"属性if ((prop = metadata->metadata->get_property("bravo_atc")) != NULL) {// 如果属性存在且值为"true"字符串,则启用ATC模式if (prop->is_string() && prop->to_str() == "true") {atc = true;}}}// 更新元数据缓存,将新的元数据存储到meta对象中// Update the meta cache.bool updated = false;// 调用meta缓存对象的update_data方法更新元数据,并通过updated参数返回是否有变更if ((err = meta->update_data(&msg->header, metadata, updated)) != srs_success) {return srs_error_wrap(err, "update metadata");}// 如果元数据没有变更,则直接返回,避免重复处理if (!updated) {return err;}// 根据配置决定是否丢弃重复的元数据bool drop_for_reduce = false;// 如果已经有元数据,且配置了减少序列头发送,则标记为需丢弃if (meta->data() && _srs_config->get_reduce_sequence_header(req->vhost)) {drop_for_reduce = true;// 输出警告日志,表明因reduce_sequence_header配置丢弃元数据srs_warn("drop for reduce sh metadata, size=%d", msg->size);}// 将元数据分发给所有消费者(订阅者)if (!drop_for_reduce) {// 遍历所有消费者,将元数据发送给它们std::vector<SrsLiveConsumer*>::iterator it;for (it = consumers.begin(); it != consumers.end(); ++it) {SrsLiveConsumer* consumer = *it;// 将元数据放入消费者的消息队列,采用当前的ATC模式和抖动算法if ((err = consumer->enqueue(meta->data(), atc, jitter_algorithm)) != srs_success) {return srs_error_wrap(err, "consume metadata");}}}// 将元数据转发给hub处理,hub负责将元数据分发给各种输出(如HLS、DVR等)// Copy to hub to all utilities.return hub->on_meta_data(meta->data(), metadata);
}
2.5.3 加入消费队列
以上解析出来的数据分发给Hub (SrsOriginHub) Bridger (ISrsStreamBridge) Consumer (SrsLiveConsumer) 本文以消费者作为拉流源第一视角进行分析。SrsLiveConsumer::enqueue 函数是SRS流媒体服务器中负责将RTMP消息加入消费队列的核心函数。这个函数不仅处理消息的入队操作,还实现了时间戳校正和消费者唤醒机制,是连接生产者与消费者的关键环节。
技术要点:
1. 函数使用shared_msg->copy()创建消息副本,而不是直接使用原始消息。这种设计确保了:
- 每个消费者拥有独立的消息副本,避免数据竞争
- 即使某个消费者处理较慢,也不会影响其他消费者
- 支持对每个消费者应用不同的时间戳校正策略
2. 在启用SRS_PERF_QUEUE_COND_WAIT编译宏的情况下,函数实现了基于条件变量的等待-唤醒机制。根据队列中消息数量和持续时间决定是否唤醒等待线程!
/*** 将RTMP消息加入消费队列并进行时间戳校正** @param shared_msg 共享消息指针,将被复制后入队* @param atc 是否使用绝对时间戳(ATC),为true时跳过时间校正* @param ag 抖动校正算法* @return 错误码,成功返回srs_success** @remark 当启用SRS_PERF_QUEUE_COND_WAIT时:* 1) 对于ATC模式,若出现时间戳异常会立即唤醒等待线程* 2) 当队列消息数超过阈值且持续时间达标时,会唤醒等待线程*/
srs_error_t SrsLiveConsumer::enqueue(SrsSharedPtrMessage *shared_msg, bool atc, SrsRtmpJitterAlgorithm ag)
{// 定义错误对象,初始化为成功状态srs_error_t err = srs_success;// 复制共享消息,确保每个消费者拥有独立的消息副本SrsSharedPtrMessage* msg = shared_msg->copy();// 如果不使用绝对时间码模式(atc=false),需要进行时间戳校正if (!atc) {// 使用抖动算法校正时间戳,避免音视频时间戳不同步问题if ((err = jitter->correct(msg, ag)) != srs_success) {// 时间戳校正失败,返回错误并包装错误信息return srs_error_wrap(err, "consume message");}}// 将校正后的消息加入到消费队列,第二个参数为NULL表示没有额外的处理if ((err = queue->enqueue(msg, NULL)) != srs_success) {// 入队失败,返回错误并包装错误信息return srs_error_wrap(err, "enqueue message");}#ifdef SRS_PERF_QUEUE_COND_WAIT// 当队列中消息数量达到唤醒阈值时,通知等待的线程if (mw_waiting) {// 对于RTMP,我们需要同时满足消息数量和持续时间的条件srs_utime_t duration = queue->duration();// ATC模式下,检查时间戳是否异常,若异常则唤醒if (atc) {if (duration > SRS_CONF_DEFAULT_SEND_MIN_INTERVAL) {// 检测到时间戳异常,唤醒等待线程处理mw_wait_cv->signal();mw_waiting = false;return err;}}// 当队列中的消息数量超过等待阈值且持续时间满足条件时,唤醒等待线程if (queue->size() > mw_msgs && duration > mw_sleep) {// 唤醒等待的线程处理积累的消息mw_wait_cv->signal();mw_waiting = false;return err;}}
#endif// 函数成功执行,返回成功状态return err;
}/*** 将消息加入队列并处理可能的溢出情况** @param msg 要入队的共享指针消息* @param is_overflow 输出参数,指示是否发生了队列溢出* @return 错误码,成功返回srs_success** @remark 当jitter关闭时,首个序列头的时间戳可能为0,此时即使队列未溢出也会丢弃关键帧,* 因此需要忽略0时间戳的情况。详见相关GitHub讨论。* @see https://github.com/ossrs/srs/pull/2186#issuecomment-953383063*/
srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage *msg, bool *is_overflow)
{srs_error_t err = srs_success;msgs.push_back(msg);// If jitter is off, the timestamp of first sequence header is zero, which wll cause SRS to shrink and drop the// keyframes even if there is not overflow packets in queue, so we must ignore the zero timestamps, please// @see https://github.com/ossrs/srs/pull/2186#issuecomment-953383063if (msg->is_av() && msg->timestamp != 0) {if (av_start_time == -1) {av_start_time = srs_utime_t(msg->timestamp * SRS_UTIME_MILLISECONDS);}av_end_time = srs_utime_t(msg->timestamp * SRS_UTIME_MILLISECONDS);}if (max_queue_size <= 0) {return err;}while (av_end_time - av_start_time > max_queue_size) {// notice the caller queue already overflow and shrinked.if (is_overflow) {*is_overflow = true;}shrink();}return err;
}
3. 推流源码阅读的扩展知识
3.1 AMF0和AMF3简介
AMF (Action Message Format) 是Adobe开发的一种二进制格式,用于序列化ActionScript对象,主要用于Flash和服务器之间的数据交换。在RTMP协议中:
- AMF0:较早的序列化格式,结构简单,支持的数据类型有限,但兼容性好。消息类型值为20(0x14)。
- AMF3:较新的序列化格式,支持更复杂的数据类型和更紧凑的编码,但处理更复杂。消息类型值为17(0x11)。
3.2 ATC(绝对时间码)
ATC(Absolute Time Clock)是一种时间戳处理机制,当启用ATC时,SRS会直接使用编码器提供的原始时间戳,而不会对时间戳进行调整。这与默认行为不同,默认情况下SRS会调整时间戳使其从0开始,以便Flash播放器能够正常播放。这在多级集群、多设备同步播放等场景中非常有用。SRS提供了atc
和atc_auto
两个配置选项来控制这一功能,并且在SRS 3.0以上版本中,这些配置被移到了play
部分下。
ATC的应用场景
ATC主要用于以下场景:
- 多级集群部署:当使用边缘-源站架构时,保持原始时间戳可以确保不同层级的服务器之间时间戳一致。
- HLS/DASH流分发:在生成HLS或DASH片段时,使用绝对时间码可以保持片段之间的时间关系。
- 多设备同步播放:当需要多个设备同步播放同一个流时,使用绝对时间码可以确保播放进度一致。
3.3 “hub”、“bridger”和“consumer”概念
在SRS中,Hub、Bridger和Consumer是三个关键概念,它们构成了SRS流媒体服务器中数据分发的核心架构。
3.3.1. Consumer(消费者)
Consumer是直接连接到SRS服务器的客户端的抽象,代表了一个播放流媒体的终端用户。
主要职责:
- 接收来自Source的音视频和元数据消息
- 进行时间戳校正(通过jitter算法)
- 维护消息队列,缓存还未发送给客户端的数据
- 处理播放端的暂停/继续请求
特点:
- 每个播放客户端连接对应一个Consumer实例
- 消费者按需从Source获取数据,并使用队列缓冲数据
- 支持GOP缓存,使新连接的客户端能立即看到完整画面
3.3.2. Hub(媒体集线器)
Hub是SRS中负责将媒体数据分发给各种输出模块的中心组件。它仅用作源服务器的实用程序集合。
主要职责:
- 将数据分发给各种媒体处理模块,如HLS、DVR、DASH等
- 管理转发器(Forwarder),将流转发到其他服务器
- 处理编码转换(Transcoder)需求
- 维护媒体格式信息
特点:
- 每个Source对应一个Hub
- 集中管理所有外部输出(非直接客户端连接)
- 负责处理格式转换和适配
3.3.3. Bridger(桥接器)
Bridger是一个可选组件,用于将媒体流桥接到其他类型的Source。它旨在在不同协议之间转换流:ISrsStreamBridge
主要实施包括:
- SrsFrameToRtmpBridge - 将帧转换为 RTMP 协议
- SrsFrameToRtcBridge - 将帧转换为 WebRTC 协议
- SrsCompositeBridge - 用于复杂转换场景的一系列桥接
主要职责:
- 接收Source的音视频数据
- 将数据转发到其他类型的源或目标
- 实现不同协议或格式之间的桥接
特点:是扩展SRS功能的重要机制
3.3.4 媒体分发的流程
SRS 中的媒体流可以概括为:
- 源 (SrsLiveSource) 接收传入的媒体流
- Hub (SrsOriginHub) 处理媒体并将其分发到各种输出模块(HLS、DASH 等)
- Bridger (ISrsStreamBridge) 可在需要时在不同协议之间转换媒体
- Consumer (SrsLiveConsumer) 将媒体传送给各个客户端
3.4 jitter抖动校正算法
示例 1:FULL 算法处理时间戳抖动
假设有以下时间戳序列:100, 110, 90, 120
使用 FULL 算法处理:
- 第一个包 (100):输出 100
- 第二个包 (110):delta = 10,正常,输出 110
- 第三个包 (90):delta = -20,检测到抖动,使用默认 delta 10ms,输出 120
- 第四个包 (120):delta = 30,正常,输出 150
结果:100, 110, 120, 150(单调递增)
示例 2:ZERO 算法重置时间戳
假设有以下时间戳序列:1000, 1010, 990, 1020
使用 ZERO 算法处理:
- 第一个包 (1000):记录基准值 1000,输出 0
- 第二个包 (1010):输出 1010 - 1000 = 10
- 第三个包 (990):输出 990 - 1000 = -10(注意这里是负值,因为 ZERO 不保证单调递增)
- 第四个包 (1020):输出 1020 - 1000 = 20
结果:0, 10, -10, 20(从零开始但不保证单调递增)
示例 3:OFF 算法不校正
假设有以下时间戳序列:1000, 1010, 990, 1020
使用 OFF 算法处理:
- 第一个包 (1000):输出 1000
- 第二个包 (1010):输出 1010
- 第三个包 (990):输出 990
- 第四个包 (1020):输出 1020
结果:1000, 1010, 990, 1020(保持原始时间戳)
4. 拉流发送数据调用栈
后续继续编写拉流详细文章。敬请期待
#从上层到操作系统写入调用栈
(gdb) bt
#0 st_writev (fd=0x604000013b10, iov=0x62d00001e400, iov_size=60, timeout=30000000)at io.c:489
#1 0x0000555555c6ba94 in SrsStSocket::writev (this=0x60700001dad0, iov=0x62d00001e400, iov_size=60, nwrite=0x0) at ./src/protocol/srs_protocol_st.cpp:653
#2 0x0000555555cbfa49 in SrsTcpConnection::writev (this=0x604000013b90, iov=0x62d00001e400, iov_size=60, nwrite=0x0) at ./src/app/srs_app_conn.cpp:566
#3 0x0000555555bfa61b in srs_write_large_iovs (skt=0x604000013b90, iovs=0x62d00001e400, size=60, pnwrite=0x0) at ./src/protocol/srs_protocol_utility.cpp:376
#4 0x0000555555bb1508 in SrsProtocol::do_iovs_send (this=0x611000013640, iovs=0x62d00001e400, size=60) at ./src/protocol/srs_protocol_rtmp_stack.cpp:556
#5 0x0000555555bb1468 in SrsProtocol::do_send_messages (this=0x611000013640, msgs=0x61900001f980, nb_msgs=30) at ./src/protocol/srs_protocol_rtmp_stack.cpp:496
#6 0x0000555555bb3d2f in SrsProtocol::send_and_free_messages (this=0x611000013640, msgs=0x61900001f980, nb_msgs=30, stream_id=1)at ./src/protocol/srs_protocol_rtmp_stack.cpp:752
#7 0x0000555555bc49f1 in SrsRtmpServer::send_and_free_messages (this=0x603000018af0, msgs=0x61900001f980, nb_msgs=30, stream_id=1)at ./src/protocol/srs_protocol_rtmp_stack.cpp:2208
#8 0x0000555555ce54fd in SrsRtmpConn::do_playing (this=0x61200004c3c0, source=..., consumer=0x606000010820, rtrd=0x7ffff29bd5f0) at ./src/app/srs_app_rtmp_conn.cpp:901
#9 0x0000555555ce3100 in SrsRtmpConn::playing (this=0x61200004c3c0, source=...)
--Type <RET> for more, q to quit, c to continue without paging--at ./src/app/srs_app_rtmp_conn.cpp:776
#10 0x0000555555cdf301 in SrsRtmpConn::stream_service_cycle (this=0x61200004c3c0)at ./src/app/srs_app_rtmp_conn.cpp:613
#11 0x0000555555cdc668 in SrsRtmpConn::service_cycle (this=0x61200004c3c0)at ./src/app/srs_app_rtmp_conn.cpp:446
#12 0x0000555555cd97ec in SrsRtmpConn::do_cycle (this=0x61200004c3c0)at ./src/app/srs_app_rtmp_conn.cpp:262
#13 0x0000555555cefe8a in SrsRtmpConn::cycle (this=0x61200004c3c0)at ./src/app/srs_app_rtmp_conn.cpp:1609
#14 0x0000555555d67f02 in SrsFastCoroutine::cycle (this=0x60e000009ce0)at ./src/app/srs_app_st.cpp:309
#15 0x0000555555d68052 in SrsFastCoroutine::pfn (arg=0x60e000009ce0)at ./src/app/srs_app_st.cpp:324
#16 0x00005555560fa66f in _st_thread_main () at sched.c:380
#17 0x00005555560faf95 in st_thread_create (start=0x604000013c50, arg=0x7ffff4c25ed0, joinable=-1890085540, stk_size=-1719832832) at sched.c:666
#18 0x0000555555c65a1b in srs_context_set_cid_of (trd=0x7ffff4c26648, v=...)at ./src/protocol/srs_protocol_log.cpp:91
#19 0x0000555555a5e631 in _SrsContextId::~_SrsContextId (this=0x7ffff4c26298, __in_chrg=<optimized out>) at ./src/core/srs_core.cpp:24
#20 0x0000555555c65bbf in impl_SrsContextRestore::~impl_SrsContextRestore (
--Type <RET> for more, q to quit, c to continue without paging--this=0x7ffff4c26290, __in_chrg=<optimized out>)at ./src/protocol/srs_protocol_log.cpp:101
#21 0x0000555555cb5a7a in SrsServer::do_on_tcp_client (this=0x61100000ff40, listener=0x604000002150, stfd=@0x7ffff4c263b0: 0xffffe984c94)at ./src/app/srs_app_server.cpp:1161
#22 0x00007ffff4749f20 in ?? ()
#23 0x00007ffff4c26648 in ?? ()
#24 0x00007ffff4c263c0 in ?? ()
#25 0x0000000100000001 in ?? ()
#26 0x8f57955c997d6f00 in ?? ()
#27 0x00007ffff4c263d0 in ?? ()
#28 0x00005555560fc8d4 in st_netfd_poll (fd=0x7ffff2999f90, how=1, timeout=18446744073709551615) at io.c:249
学习资料分享
0voice · GitHub