首页 > 其他分享 >SRS之RTMP推拉流分析

SRS之RTMP推拉流分析

时间:2023-02-02 22:55:35浏览次数:49  
标签:return success err srs SRS 推拉 RTMP error rtmp

  SRS是一个简单高效的实时视频服务器,支持RTMP/WebRTC/HLS/HTTP-FLV/SRT/GB28181;本文以SRS4.0版本进行分析RTMP推拉流架构,SRS整体架构如下图(官网图片)所示:

   有图可知SRS支持多种客户端以不同的媒流体协议进行推流、拉流,内部还包括了不同协议的转换,同时还支持SRS的集群。

  本文主要分析在SRS中RTMP的推流、拉流源码分析,其核心类如下:

SrsServer SRS流媒体服务⼊⼝类
SrsBufferListener 监听器,主要是TCP的监听
SrsTcpListener TCP监听器
SrsRtmpConn RTMP连接,⾥⾯对应了SrsStSocket和SrsCoroutine
SrsRtmpServer 提供与客户端之间的RTMP-命令-协议-消息的交互服务,使⽤SrsRtmpConn 提供的socket读写数据
SrsSource 描述⼀路播放源,包括推流和拉流的描述
SrsConsumer 拉流消费者,每⼀路拉流客户端对应⼀个SrsConsumer
SrsStSocket 经过封装的socket接⼝
SrsRecvThread 负责接收数据,但是要注意的是他这⾥并不是从IO⾥⾯读取数据,从SrsRtmpServer类拉取数据,然后推送到SrsPublishRecvThread(推流⽤),或者 SrsQueueRecvThread(拉流⽤)
SrsQueueRecvThread 主要⽤于拉流,对应的是客户端-服务器的控制消息,和⾳视频消息没有关系。客 户端读取数据还是从consumer的queue⾥⾯去读取。
SrsPublishRecvThread 主要⽤于推流

  RTMP推拉流代码流程如下:

 SRS网络模型分析

  在主函数run_hybrid_server中开始于_srs_hybrid->run()轮询,通过流体服务SrsServer::listen()进入服务端监听,这里分别对不同的协议进行了不同的监听处理,代码如下:

srs_error_t SrsServer::listen()
{
    srs_error_t err = srs_success;
    //rtmp的listen
    if ((err = listen_rtmp()) != srs_success) {
        return srs_error_wrap(err, "rtmp listen");
    }
    
    if ((err = listen_http_api()) != srs_success) {
        return srs_error_wrap(err, "http api listen");
    }

    if ((err = listen_https_api()) != srs_success) {
        return srs_error_wrap(err, "https api listen");
    }
    
    if ((err = listen_http_stream()) != srs_success) {
        return srs_error_wrap(err, "http stream listen");
    }

    if ((err = listen_https_stream()) != srs_success) {
        return srs_error_wrap(err, "https stream listen");
    }
    
    if ((err = listen_stream_caster()) != srs_success) {
        return srs_error_wrap(err, "stream caster listen");
    }
    
    if ((err = conn_manager->start()) != srs_success) {
        return srs_error_wrap(err, "connection manager");
    }

    return err;
}

  进入RTMP对应的listen,这里主要通过SrsBufferListener进一步封装了listen,包括http api、https api的监听都是用SrsBufferListener统一的封装类;

srs_error_t SrsBufferListener::listen(string i, int p)
{
    srs_error_t err = srs_success;
    
    ip = i;
    port = p;
    
    srs_freep(listener);
    listener = new SrsTcpListener(this, ip, port);//new一个SrsTcpListener对象,传一个指针
    
    if ((err = listener->listen()) != srs_success) {
        return srs_error_wrap(err, "buffered tcp listen");
    }
    
    string v = srs_listener_type2string(type);
    srs_trace("%s listen at tcp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd());
    
    return err;
}

  在new SrsTcpListener 时传入了this,其实是在构造的时候给handler赋值,继续进入SrsTcpListener::listen()

//每一个监听,对应一个协程
srs_error_t SrsTcpListener::listen()
{
    srs_error_t err = srs_success;
    //rtmp使用的是tcp,开始listen
    if ((err = srs_tcp_listen(ip, port, &lfd)) != srs_success) {
        return srs_error_wrap(err, "listen at %s:%d", ip.c_str(), port);
    }
    
    srs_freep(trd);
    trd = new SrsSTCoroutine("tcp", this);//创建一个协程,传一个用户(SrsTcpListener)指针,如果协程需要回调,可以通过指针找到对应的对象
    if ((err = trd->start()) != srs_success) {//启动协程,执行SrsSTCoroutine::cycle(),即handle->cycle(),最终是SrsTcpListener::cycle()
        return srs_error_wrap(err, "start coroutine");
    }
    
    return err;
}

  启动协程进行监听,执行cycle(),代码如下:

srs_error_t SrsTcpListener::cycle()
{
    srs_error_t err = srs_success;
    
    while (true) {
        if ((err = trd->pull()) != srs_success) {//读取错误码,判断协程是否结束,不为srs_success时,说明该协程要退出
            return srs_error_wrap(err, "tcp listener");
        }
       //
        srs_netfd_t fd = srs_accept(lfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);//检测新连接
        if(fd == NULL){
            return srs_error_new(ERROR_SOCKET_ACCEPT, "accept at fd=%d", srs_netfd_fileno(lfd));
        }
        
        if ((err = srs_fd_closeexec(srs_netfd_fileno(fd))) != srs_success) {
            return srs_error_wrap(err, "set closeexec");
        }
        
        if ((err = handler->on_tcp_client(fd)) != srs_success) {//handle就是new一个SrsTcpListener对象时,传入的ISrsTcpHandler指针,即SrsBufferListener(SrsBufferListener继承了ISrsTcpHandler)
            return srs_error_wrap(err, "handle fd=%d", srs_netfd_fileno(fd));
        }
    }
    
    return err;
}

  这里的on_tcp_client实际执行的就是构造函数时传入this,即SrsBufferListener的成员函数,代码如下:

//监听新的连接
srs_error_t SrsBufferListener::on_tcp_client(srs_netfd_t stfd)
{
    srs_error_t err = server->accept_client(type, stfd);
    if (err != srs_success) {
        srs_warn("accept client failed, err is %s", srs_error_desc(err).c_str());
        srs_freep(err);
    }
    
    return srs_success;
}

  进入accept_client代码如下:

//type传递了对应的连接类型
srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd)
{
    srs_error_t err = srs_success;
    
    ISrsStartableConneciton* conn = NULL;
    //将fd和一个conn绑定,并返回一个连接conn
    if ((err = fd_to_resource(type, stfd, &conn)) != srs_success) {
        if (srs_error_code(err) == ERROR_SOCKET_GET_PEER_IP && _srs_config->empty_ip_ok()) {
            srs_close_stfd(stfd); srs_error_reset(err);
            return srs_success;
        }
        return srs_error_wrap(err, "fd to resource");
    }
    srs_assert(conn);
    
    // directly enqueue, the cycle thread will remove the client.
    conn_manager->add(conn);//把连接添加到conn_manager进行管理
    //启动类型对应的协程,比如启动rtmp连接对应的协程,每个SrsRtmpConn都有1:1对应的协程
    if ((err = conn->start()) != srs_success) {
        return srs_error_wrap(err, "start conn coroutine");
    }
    
    return err;
}

  此处首先将fd和一个conn绑定,并返回一个连接conn,代码如下:

srs_error_t SrsServer::fd_to_resource(SrsListenerType type, srs_netfd_t stfd, ISrsStartableConneciton** pr)
{
    srs_error_t err = srs_success;
    int fd = srs_netfd_fileno(stfd);
    string ip = srs_get_peer_ip(fd);
    int port = srs_get_peer_port(fd);
    .....
    .....
    
    // 最大连接数判断处理
    .....
    .....

    // The context id may change during creating the bellow objects.
    SrsContextRestore(_srs_context->get_id());
    //new一个类型对应的连接
    if (type == SrsListenerRtmpStream) {
        *pr = new SrsRtmpConn(this, stfd, ip, port);
    } else if (type == SrsListenerHttpApi) {
        *pr = new SrsHttpApi(false, this, stfd, http_api_mux, ip, port);
    } else if (type == SrsListenerHttpsApi) {
        *pr = new SrsHttpApi(true, this, stfd, http_api_mux, ip, port);
    } else if (type == SrsListenerHttpStream) {
        *pr = new SrsResponseOnlyHttpConn(false, this, stfd, http_server, ip, port);
    } else if (type == SrsListenerHttpsStream) {
        *pr = new SrsResponseOnlyHttpConn(true, this, stfd, http_server, ip, port);
    } else {
        srs_warn("close for no service handler. fd=%d, ip=%s:%d", fd, ip.c_str(), port);
        srs_close_stfd(stfd);
        return err;
    }
    
    return err;
}

  其次时将连接conn添加到conn_manager进行管理,最后conn->start()启动协程进行接收/发送数据的处理,这里每一个SrsRtmpConn连接都有1:1对应SrsCoroutine协程,启动后进入SrsRtmpConn::do_cycle()轮询,代码如下:

// rtmp接收数据处理
srs_error_t SrsRtmpConn::do_cycle()
{
    srs_error_t err = srs_success;
    
    srs_trace("RTMP client ip=%s:%d, fd=%d", ip.c_str(), port, srs_netfd_fileno(stfd));
    //设置收发超时时间
    rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT);
    rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT);
    //rtmp 握手
    if ((err = rtmp->handshake()) != srs_success) {
        return srs_error_wrap(err, "rtmp handshake");
    }
    //rtmp代理相关
    uint32_t rip = rtmp->proxy_real_ip();
    if (rip > 0) {
        srs_trace("RTMP proxy real client ip=%d.%d.%d.%d",
            uint8_t(rip>>24), uint8_t(rip>>16), uint8_t(rip>>8), uint8_t(rip));
    }
    
    SrsRequest* req = info->req;
    if ((err = rtmp->connect_app(req)) != srs_success) {//握手成功后,处理client发送的connect
        return srs_error_wrap(err, "rtmp connect tcUrl");
    }
    
    // set client ip to request.
    req->ip = ip;//保存客户端IP
    
    srs_trace("connect app, tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%d, app=%s, args=%s",
        req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(),
        req->schema.c_str(), req->vhost.c_str(), req->port,
        req->app.c_str(), (req->args? "(obj)":"null"));
    
    // show client identity
    if(req->args) {
        std::string srs_version;
        std::string srs_server_ip;
        int srs_pid = 0;
        int srs_id = 0;
        
        SrsAmf0Any* prop = NULL;
        if ((prop = req->args->ensure_property_string("srs_version")) != NULL) {
            srs_version = prop->to_str();
        }
        if ((prop = req->args->ensure_property_string("srs_server_ip")) != NULL) {
            srs_server_ip = prop->to_str();
        }
        if ((prop = req->args->ensure_property_number("srs_pid")) != NULL) {
            srs_pid = (int)prop->to_number();
        }
        if ((prop = req->args->ensure_property_number("srs_id")) != NULL) {
            srs_id = (int)prop->to_number();
        }
        
        if (srs_pid > 0) {
            srs_trace("edge-srs ip=%s, version=%s, pid=%d, id=%d",
                srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
        }
    }
    //
    if ((err = service_cycle()) != srs_success) {
        err = srs_error_wrap(err, "service cycle");
    }
    
    srs_error_t r0 = srs_success;
    if ((r0 = on_disconnect()) != srs_success) {
        err = srs_error_wrap(err, "on disconnect %s", srs_error_desc(r0).c_str());
        srs_freep(r0);
    }
    
    // If client is redirect to other servers, we already logged the event.
    if (srs_error_code(err) == ERROR_CONTROL_REDIRECT) {
        srs_error_reset(err);
    }
    
    return err;
}

  开始进行RTMP正常的握手交互过程、设置收发超时、rtmp代理,握手成功(处理client发送的connect请求);进入service_cycle(),继续数据交互,设置窗口大小、带宽大小、chunk大小、连接成功响应客户端。

{
    srs_error_t err = srs_success;
    
    SrsRequest* req = info->req;
    //窗口大小设置
    int out_ack_size = _srs_config->get_out_ack_size(req->vhost);
    if (out_ack_size && (err = rtmp->set_window_ack_size(out_ack_size)) != srs_success) {
        return srs_error_wrap(err, "rtmp: set out window ack size");
    }
    
    int in_ack_size = _srs_config->get_in_ack_size(req->vhost);
    if (in_ack_size && (err = rtmp->set_in_window_ack_size(in_ack_size)) != srs_success) {
        return srs_error_wrap(err, "rtmp: set in window ack size");
    }
    //带宽设置
    if ((err = rtmp->set_peer_bandwidth((int)(2.5 * 1000 * 1000), 2)) != srs_success) {
        return srs_error_wrap(err, "rtmp: set peer bandwidth");
    }
    
    // get the ip which client connected.
    std::string local_ip = srs_get_local_ip(srs_netfd_fileno(stfd));
    
    // do bandwidth test if connect to the vhost which is for bandwidth check.
    if (_srs_config->get_bw_check_enabled(req->vhost)) {
        if ((err = bandwidth->bandwidth_check(rtmp, skt, req, local_ip)) != srs_success) {
            return srs_error_wrap(err, "rtmp: bandwidth check");
        }
        return err;
    }
    
    // set chunk size to larger.
    // set the chunk size before any larger response greater than 128,
    // to make OBS happy, @see https://github.com/ossrs/srs/issues/454
    int chunk_size = _srs_config->get_chunk_size(req->vhost); //从配置文件读取chunk size大小,进行设置,一般设置60k,如果太小就得拆分
    if ((err = rtmp->set_chunk_size(chunk_size)) != srs_success) {
        return srs_error_wrap(err, "rtmp: set chunk size %d", chunk_size);
    }
    
    // response the client connect ok.
    if ((err = rtmp->response_connect_app(req, local_ip.c_str())) != srs_success) {//连接成功,响应客户端
        return srs_error_wrap(err, "rtmp: response connect app");
    }
    
    if ((err = rtmp->on_bw_done()) != srs_success) {
        return srs_error_wrap(err, "rtmp: on bw down");
    }
    //真正的循环
    while (true) {
        if ((err = trd->pull()) != srs_success) {
            return srs_error_wrap(err, "rtmp: thread quit");
        }
        
        err = stream_service_cycle();
       .........
    .........
} return err; }

  来到stream_service_cycle(),才是真正推流、拉流处理,值得注意的是,还对cache gop是否开启的设置。

srs_error_t SrsRtmpConn::stream_service_cycle()
{
    srs_error_t err = srs_success;
    ......
    ......
    // find a source to serve.
    SrsLiveSource* source = NULL;//一个直播对应一个SrsLiveSource,一个推流,0~N个拉流
    if ((err = _srs_sources->fetch_or_create(req, server, &source)) != srs_success) {//查找/创建一个source
        return srs_error_wrap(err, "rtmp: fetch source");
    }
    srs_assert(source != NULL);
    //读取配置文件,设置是否需要cache gop
    bool enabled_cache = _srs_config->get_gop_cache(req->vhost);//默认是开的
    srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%s/%s",
        req->get_stream_url().c_str(), ip.c_str(), enabled_cache, info->edge, source->source_id().c_str(), source->pre_source_id().c_str());
    source->set_cache(enabled_cache);//设置
    //推流、拉流处理
    switch (info->type) {
        case SrsRtmpConnPlay: {
            // response connection start play
            if ((err = rtmp->start_play(info->res->stream_id)) != srs_success) {
                return srs_error_wrap(err, "rtmp: start play");
            }
            if ((err = http_hooks_on_play()) != srs_success) {
                return srs_error_wrap(err, "rtmp: callback on play");
            }
            //拉流
            err = playing(source);
            http_hooks_on_stop();
            
            return err;
        }
        case SrsRtmpConnFMLEPublish: {//RTMP基本走这里
            if ((err = rtmp->start_fmle_publish(info->res->stream_id)) != srs_success) {//接收客户端相应的消息,并返回对应的响应
                return srs_error_wrap(err, "rtmp: start FMLE publish");
            }
            
            return publishing(source);
        }
        case SrsRtmpConnHaivisionPublish: {
            if ((err = rtmp->start_haivision_publish(info->res->stream_id)) != srs_success) {
                return srs_error_wrap(err, "rtmp: start HAIVISION publish");
            }
            
            return publishing(source);
        }
        case SrsRtmpConnFlashPublish: {
            if ((err = rtmp->start_flash_publish(info->res->stream_id)) != srs_success) {
                return srs_error_wrap(err, "rtmp: start FLASH publish");
            }
            
            return publishing(source);
        }
        default: {
            return srs_error_new(ERROR_SYSTEM_CLIENT_INVALID, "rtmp: unknown client type=%d", info->type);
        }
    }
    
    return err;
}

   推流流程

  推流流程主要是do_publishing,需要注意的是使用SrsPublishRecvThread封装好的协程与拉流使用的SrsQueueRecvThread区分开来,其代码如下:

//推流流程
srs_error_t SrsRtmpConn::publishing(SrsLiveSource* source)
{
    srs_error_t err = srs_success;
    
    SrsRequest* req = info->req;
    ..............// TODO: FIXME: Should refine the state of publishing.
    if ((err = acquire_publish(source)) == srs_success) {
        // 协程实际是SrsPublishRecvThread内部封装的SrsRecvThread的SrsCoroutine成员变量trd,主要看do_cycle()的流程
        // 参数:rtmp:在协程中有一些rtmp接收数据的处理,req:URL相关,
        SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source, _srs_context->get_id());
        err = do_publishing(source, &rtrd);//实际推流流程,source就是直播对应的那个source
        rtrd.stop();
    }
    ...........
    return err;
}

srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThread* rtrd)
{
    srs_error_t err = srs_success;
    
    SrsRequest* req = info->req;
    SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish();
    SrsAutoFree(SrsPithyPrint, pprint);

    // update the statistic when source disconveried.
    SrsStatistic* stat = SrsStatistic::instance();
    if ((err = stat->on_client(_srs_context->get_id().c_str(), req, this, info->type)) != srs_success) {
        return srs_error_wrap(err, "rtmp: stat client");
    }

    // start isolate recv thread.
    // TODO: FIXME: Pass the callback here.
    if ((err = rtrd->start()) != srs_success) {//启动协程,SrsRecvThread::do_cycle()轮询读取数据
        return srs_error_wrap(err, "rtmp: receive thread");
    }
    
    // initialize the publish timeout.
    publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost);
    publish_normal_timeout = _srs_config->get_publish_normal_timeout(req->vhost);
    
    // set the sock options.
    set_sock_options();
    
    if (true) {
        bool mr = _srs_config->get_mr_enabled(req->vhost);
        srs_utime_t mr_sleep = _srs_config->get_mr_sleep(req->vhost);
        srs_trace("start publish mr=%d/%d, p1stpt=%d, pnt=%d, tcp_nodelay=%d",
            mr, srsu2msi(mr_sleep), srsu2msi(publish_1stpkt_timeout), srsu2msi(publish_normal_timeout), tcp_nodelay);
    }
    
    int64_t nb_msgs = 0;
    uint64_t nb_frames = 0;
    while (true) {
        if ((err = trd->pull()) != srs_success) {
            return srs_error_wrap(err, "rtmp: thread quit");
        }

        pprint->elapse();

        // cond wait for timeout.
        if (nb_msgs == 0) {
            // when not got msgs, wait for a larger timeout.
            // @see https://github.com/ossrs/srs/issues/441
            rtrd->wait(publish_1stpkt_timeout);
        } else {
            rtrd->wait(publish_normal_timeout);
        }
        
        // check the thread error code.
        if ((err = rtrd->error_code()) != srs_success) {
            return srs_error_wrap(err, "rtmp: receive thread");
        }
        
        // when not got any messages, timeout. 超时处理
        if (rtrd->nb_msgs() <= nb_msgs) {
            return srs_error_new(ERROR_SOCKET_TIMEOUT, "rtmp: publish timeout %dms, nb_msgs=%d",
                nb_msgs? srsu2msi(publish_normal_timeout) : srsu2msi(publish_1stpkt_timeout), (int)nb_msgs);
        }
        nb_msgs = rtrd->nb_msgs();//收到消息数量
        
        // Update the stat for video fps.
        // @remark https://github.com/ossrs/srs/issues/851
        SrsStatistic* stat = SrsStatistic::instance();
        if ((err = stat->on_video_frames(req, (int)(rtrd->nb_video_frames() - nb_frames))) != srs_success) {
            return srs_error_wrap(err, "rtmp: stat video frames");
        }
        nb_frames = rtrd->nb_video_frames();//视频帧数量

        // reportable
        if (pprint->can_print()) {
            kbps->sample();
            bool mr = _srs_config->get_mr_enabled(req->vhost);
            srs_utime_t mr_sleep = _srs_config->get_mr_sleep(req->vhost);
            srs_trace("<- " SRS_CONSTS_LOG_CLIENT_PUBLISH " time=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mr=%d/%d, p1stpt=%d, pnt=%d",
                (int)pprint->age(), kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
                kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), mr, srsu2msi(mr_sleep),
                srsu2msi(publish_1stpkt_timeout), srsu2msi(publish_normal_timeout));//码率的计算,s,30s,5min的码率
        }
    }
    
    return err;
}

   看看SrsPublishRecvThread的成员SrsRecvThread trd 的do_cycle()的处理,这里主要是rtmp->recv_message(&msg)接收消息,pumper->consume(msg)把消息推送给消费者。

srs_error_t SrsRecvThread::do_cycle()
{
    srs_error_t err = srs_success;
    
    while (true) {
        if ((err = trd->pull()) != srs_success) {
            return srs_error_wrap(err, "recv thread");
        }
        
        // When the pumper is interrupted, wait then retry.
        if (pumper->interrupted()) {
            srs_usleep(timeout);
            continue;
        }
        
        SrsCommonMessage* msg = NULL;
        
        // Process the received message. 处理收到的消息,rtmp由SrsPublishRecvThread的构造函数传进来
        if ((err = rtmp->recv_message(&msg)) == srs_success) {
            err = pumper->consume(msg);//推送给消费者,pumper也是从SrsPublishRecvThread的SrsRecvThread成员变量trd的构造函数传进来的
        }
        
        if (err != srs_success) {
            // Interrupt the receive thread for any error.
            trd->interrupt();
            
            // Notify the pumper to quit for error.
            pumper->interrupt(err);
            
            return srs_error_wrap(err, "recv thread");
        }
    }
    
    return err;
}

  consume内部进行消息数量、视频帧数量的统计,然后_conn->handle_publish_message(_source, msg)对消息的处理,最终执行函数process_publish_message()。

//audio、video、metaData处理
srs_error_t SrsRtmpConn::process_publish_message(SrsLiveSource* source, SrsCommonMessage* msg)
{
    srs_error_t err = srs_success;
    
    // for edge, directly proxy message to origin.
    if (info->edge) {
        if ((err = source->on_edge_proxy_publish(msg)) != srs_success) {
            return srs_error_wrap(err, "rtmp: proxy publish");
        }
        return err;
    }
    
    // process audio packet RTMP_MSG_AudioMessage  8 
    if (msg->header.is_audio()) {
        if ((err = source->on_audio(msg)) != srs_success) {//audio的处理
            return srs_error_wrap(err, "rtmp: consume audio");
        }
        return err;
    }
    // process video packet RTMP_MSG_VideoMessage 9
    if (msg->header.is_video()) {
        if ((err = source->on_video(msg)) != srs_success) {//video处理
            return srs_error_wrap(err, "rtmp: consume video");
        }
        return err;
    }
    
    // process aggregate packet
    if (msg->header.is_aggregate()) {
        if ((err = source->on_aggregate(msg)) != srs_success) {
            return srs_error_wrap(err, "rtmp: consume aggregate");
        }
        return err;
    }
    
    // process onMetaData MetaData处理  RTMP_MSG_AMF0DataMessage 18 或  RTMP_MSG_AMF3DataMessage 15
    if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
        SrsPacket* pkt = NULL;
        if ((err = rtmp->decode_message(msg, &pkt)) != srs_success) {
            return srs_error_wrap(err, "rtmp: decode message");
        }
        SrsAutoFree(SrsPacket, pkt);
        
        if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {
            SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);//将packet转成metaData
            if ((err = source->on_meta_data(msg, metadata)) != srs_success) {
                return srs_error_wrap(err, "rtmp: consume metadata");
            }
            return err;
        }
        return err;
    }
    
    return err;
}

  process_publish_message对音频、视频、metaData进行处理;先看看音频处理,把msg发送给每一个拉流端消费者,这里的consumers容器保存所有拉流端消费者,在拉流流程中,新建消费者时添加的。

//音频数据处理
srs_error_t SrsLiveSource::on_audio(SrsCommonMessage* shared_audio)
{
    srs_error_t err = srs_success;
    .................
    
    // convert shared_audio to msg, user should not use shared_audio again.
    // 通过引用计数的方式,创建一个消息
    SrsSharedPtrMessage msg;//类似智能指针,数据拷贝实际上是浅拷贝,通过引用计数方式,为0释放内存
    if ((err = msg.create(shared_audio)) != srs_success) {
        return srs_error_wrap(err, "create message");
    }
    
    // directly process the audio message.
    if (!mix_correct) {//默认不做校正,就直接处理,就是不用放到map进行排序
        return on_audio_imp(&msg);
    }
    
    // insert msg to the queue. 
    mix_queue->push(msg.copy());//把流消息都插入到队列中,内部并按时间戳做了排序
    
    // fetch someone from mix queue. 从map中取出来
    SrsSharedPtrMessage* m = mix_queue->pop();//pop时间戳最小的出来
    if (!m) {
        return err;
    }
    
    // consume the monotonically increase message.
    if (m->is_audio()) {
        err = on_audio_imp(m);
    } else {
        err = on_video_imp(m);
    }
    srs_freep(m);
    
    return err;
}

srs_error_t SrsLiveSource::on_audio_imp(SrsSharedPtrMessage* msg)
{
    srs_error_t err = srs_success;
    
    ............................

    // copy to all consumer 把msg拷贝到消费者对象的队列中,即把数据发给每个拉流端
    if (!drop_for_reduce) {
        for (int i = 0; i < (int)consumers.size(); i++) {
            SrsLiveConsumer* consumer = consumers.at(i);
            if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) {//把消息放到消费者队列
                return srs_error_wrap(err, "consume message");
            }
        }
    }
    
    // cache the sequence header of aac, or first packet of mp3.
    // for example, the mp3 is used for hls to write the "right" audio codec.
    // TODO: FIXME: to refine the stream info system.
    if (is_aac_sequence_header || !meta->ash()) {
        if ((err = meta->update_ash(msg)) != srs_success) { //更新audio sequence
            return srs_error_wrap(err, "meta consume audio");
        }
    }
    
    // when sequence header, donot push to gop cache and adjust the timestamp.
    if (is_sequence_header) {
        return err;
    }
    
    // cache the last gop packets
    if ((err = gop_cache->cache(msg)) != srs_success) {
        return srs_error_wrap(err, "gop cache consume audio");
    }

.............
    
    return err;
}

  类似的视频处理,如下:

srs_error_t SrsLiveSource::on_video(SrsCommonMessage* shared_video)
{
    srs_error_t err = srs_success;
    
    .........................................
    
    // convert shared_video to msg, user should not use shared_video again.
    // the payload is transfer to msg, and set to NULL in shared_video.
    SrsSharedPtrMessage msg;//智能指针的封装
    if ((err = msg.create(shared_video)) != srs_success) {
        return srs_error_wrap(err, "create message");
    }
    
    // directly process the video message.
    if (!mix_correct) {
        return on_video_imp(&msg);
    }
    
    // insert msg to the queue.
    mix_queue->push(msg.copy());//把流消息都插入到队列中,内部并按时间戳做了排序
    
    // fetch someone from mix queue.
    SrsSharedPtrMessage* m = mix_queue->pop();//pop时间戳最小的消息出来
    if (!m) {
        return err;
    }
    
    // consume the monotonically increase message.
    if (m->is_audio()) {
        err = on_audio_imp(m);
    } else {
        err = on_video_imp(m);
    }
    srs_freep(m);
    
    return err;
}

srs_error_t SrsLiveSource::on_video_imp(SrsSharedPtrMessage* msg)
{
    srs_error_t err = srs_success;
    .....................
    // cache the sequence header if h264
    // donot cache the sequence header to gop_cache, return here.
    if (is_sequence_header && (err = meta->update_vsh(msg)) != srs_success) { //更新video sequence
        return srs_error_wrap(err, "meta update video");
    }
    
    // Copy to hub to all utilities.
    if ((err = hub->on_video(msg, is_sequence_header)) != srs_success) {
        return srs_error_wrap(err, "hub consume video");
    }

    // For bridger to consume the message.
    if (bridger_ && (err = bridger_->on_video(msg)) != srs_success) {
        return srs_error_wrap(err, "bridger consume video");
    }

    // copy to all consumer 把数据发给拉流端的消费者(队列中)
    if (!drop_for_reduce) {
        for (int i = 0; i < (int)consumers.size(); i++) {
            SrsLiveConsumer* consumer = consumers.at(i);
            if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) {//把消息放到消费者队列中
                return srs_error_wrap(err, "consume video");
            }
        }
    }
    
    // when sequence header, donot push to gop cache and adjust the timestamp.
    if (is_sequence_header) {
        return err;
    }
    
    // cache the last gop packets       cache gop 如果是I帧,就会清空掉,重新push新的数据
    if ((err = gop_cache->cache(msg)) != srs_success) {
        return srs_error_wrap(err, "gop cache consume vdieo");
    }
    .........
    return err;
}

  metaData的处理如下:

srs_error_t SrsLiveSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata)
{
    srs_error_t err = srs_success;
    ..............
    
    // Update the meta cache.  更新metaData保存起来
    bool updated = false;
    if ((err = meta->update_data(&msg->header, metadata, updated)) != srs_success) {
        return srs_error_wrap(err, "update metadata");
    }
    if (!updated) {
        return err;
    }
    
    // when already got metadata, drop when reduce sequence header.
    bool drop_for_reduce = false;
    if (meta->data() && _srs_config->get_reduce_sequence_header(req->vhost)) {
        drop_for_reduce = true;
        srs_warn("drop for reduce sh metadata, size=%d", msg->size);
    }
    
    // copy to all consumer 把推流端发的metaData也插入消费队列中,便于拉流者知道
    if (!drop_for_reduce) {
        std::vector<SrsLiveConsumer*>::iterator it;
        for (it = consumers.begin(); it != consumers.end(); ++it) {
            SrsLiveConsumer* consumer = *it;
            if ((err = consumer->enqueue(meta->data(), atc, jitter_algorithm)) != srs_success) {
                return srs_error_wrap(err, "consume metadata");
            }
        }
    }
    
    // Copy to hub to all utilities.
    return hub->on_meta_data(meta->data(), metadata);
}

拉流流程

   首先每一个拉流端都会绑定一个SrsConsumer消费者,每一个消费者对应一个SrsQueueRecvThread协程,执行do_playing

srs_error_t SrsRtmpConn::playing(SrsLiveSource* source)
{
    srs_error_t err = srs_success;
    ........................
    
    // Create a consumer of source.
    SrsLiveConsumer* consumer = NULL;//消费者,每个拉流都会绑定一个SrsConsumer
    SrsAutoFree(SrsLiveConsumer, consumer);
    if ((err = source->create_consumer(consumer)) != srs_success) {
        return srs_error_wrap(err, "rtmp: create consumer");
    }
    if ((err = source->consumer_dumps(consumer)) != srs_success) {
        return srs_error_wrap(err, "rtmp: dumps consumer");
    }
    
    // 每一个消费者独立一个协程
    SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP, _srs_context->get_id());
    
    if ((err = trd.start()) != srs_success) {
        return srs_error_wrap(err, "rtmp: start receive thread");
    }
    
    // Deliver packets to peer.
    wakable = consumer;
    err = do_playing(source, consumer, &trd);//每个流source绑定一个消费者SrsConsumer
    wakable = NULL;
    
    trd.stop();
    
    // Drop all packets in receiving thread.
    if (!trd.empty()) {
        srs_warn("drop the received %d messages", trd.size());
    }
    
    return err;
}

srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* consumer, SrsQueueRecvThread* rtrd)
{
    srs_error_t err = srs_success;
    ...................................
    
    while (true) {
        // when source is set to expired, disconnect it.
        if ((err = trd->pull()) != srs_success) {//判断协程是否退出
            return srs_error_wrap(err, "rtmp: thread quit");
        }

        // collect elapse for pithy print.
        pprint->elapse();

        // to use isolate thread to recv, can improve about 33% performance.
        while (!rtrd->empty()) {
            SrsCommonMessage* msg = rtrd->pump();
            if ((err = process_play_control_msg(consumer, msg)) != srs_success) {//播放控制处理
                return srs_error_wrap(err, "rtmp: play control message");
            }
        }
        
        // quit when recv thread error.
        if ((err = rtrd->error_code()) != srs_success) {
            return srs_error_wrap(err, "rtmp: recv thread");
        }
        
#ifdef SRS_PERF_QUEUE_COND_WAIT
        // wait for message to incoming.
        // @see https://github.com/ossrs/srs/issues/257
        consumer->wait(mw_msgs, mw_sleep);//等数据累积一段时间攒一定数据,再发送
#endif
        
        // get messages from consumer.
        // each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
        // @remark when enable send_min_interval, only fetch one message a time.
        int count = (send_min_interval > 0)? 1 : 0;
        if ((err = consumer->dump_packets(&msgs, count)) != srs_success) {//从消费队列中一次读取出来,数据从SrsConsumer queue来,实际是从source给过来的
            return srs_error_wrap(err, "rtmp: consumer dump packets");
        }
        ...................................................
// sendout messages, all messages are freed by send_and_free_messages(). // no need to assert msg, for the rtmp will assert it. if (count > 0 && (err = rtmp->send_and_free_messages(msgs.msgs, count, info->res->stream_id)) != srs_success) {//发送数据,给到客户端,最终调用protocol封装好的socket api return srs_error_wrap(err, "rtmp: send %d messages", count); } // if duration specified, and exceed it, stop play live. // @see: https://github.com/ossrs/srs/issues/45 if (user_specified_duration_to_stop) { if (duration >= req->duration) { return srs_error_new(ERROR_RTMP_DURATION_EXCEED, "rtmp: time %d up %d", srsu2msi(duration), srsu2msi(req->duration)); } } // apply the minimal interval for delivery stream in srs_utime_t. if (send_min_interval > 0) { srs_usleep(send_min_interval); } // Yield to another coroutines. // @see https://github.com/ossrs/srs/issues/2194#issuecomment-777437476 srs_thread_yield();//让出cpu,让其他协程继续运行 } return err; }

  do_playing内部process_play_control_msg播放控制处理,consumer->dump_packets(&msgs, count)从消费队列读取数据,最终rtmp->send_and_free_messages(msgs.msgs, count, info->res->stream_id)发送到play客户端。

 

标签:return,success,err,srs,SRS,推拉,RTMP,error,rtmp
From: https://www.cnblogs.com/juju-go/p/17039564.html

相关文章

  • RTMP录屏直播屏幕数据获取与MediaCodec编码
    目录前言RTMP直播实现流程视频采集——MediaProjection编码——MediaCodec音频采集——AudioRecordRTMP音频包数据RTMP视频数据前言本文介绍的是MediaProjection录屏、麦......
  • 终极音频增强软件 SRS Audio Sandbox 特别版
    SRSAudioSandbox是一款个人计算机终极音频增强软件该软件可以提供令人叹为观止的环绕音效,重低音效果并且非常清晰,甚至可以用于桌面扬声器可以作用于个人计算机上的所有音......
  • SRS流媒体服务器——SRS4.0 WebRTC一对一通话环境搭建与逻辑分析
    目录环境搭建SRS4.0WebRTC1对1通话逻辑分析环境搭建1.安装go语⾔环境在Go语言官网找到对应的安装包(https://golang.google.cn/dl/)下载和解析(使用的是阿里云的Ub......
  • ubuntu20.04搭建Nginx+rtmp服务器
    1.ubuntu20.04安装Nginx代理服务器安装nginxsudoaptupdatesudoaptinstallnginx安装完成后,Nginx将会自动被启动。运行下面的命令来验证:   测试安装在网页......
  • 搭建RTMP server
    installnginxwgethttps://nginx.org/download/nginx-1.20.1.tar.gztar-zxvfnginx-1.20.1.tar.gzwgethttps://github.com/arut/nginx-rtmp-module/archive/refs/ta......
  • RTMP、X264与交叉编译
    RTMP、X264与交叉编译​ 与HTTP(超文本传输协议)同样是一个基于TCP的RealTimeMessagingProtocol(实时消息传输协议)。由AdobeSystems公司为Flash播放器和服务器之间音频......
  • 【技术分享】Windows平台低延迟RTMP、RTSP播放器接口设计探讨
    背景我们看过了太多介绍RTSP、RTMP播放相关的技术资料,大多接口设计简约,延迟和扩展能力也受到一定的局限,好多开发者希望我们能从接口设计的角度,大概介绍下大牛直播SDK关于RTM......
  • srs-bench 命令参数详解
    srs-bench项目源码,请在github上搜索安装过程省略安装完成后,进入srs-bench目录,执行srs_bench-h选项:-sfu目标SFU,srs或gb28181或janus。默认值:srs-nn要模......
  • 【转载】SRS、EasyDarwin、ZLMediaKit、Monibuca对比分析
    声明转自liuzhen007的《SRS、EasyDarwin、ZLMediaKit、Monibuca对比分析》作者:liuzhen007链接:https://juejin.cn/post/6926739029496954888来源:稀土掘金著作权归作......
  • 音视频:JavaCV 采集摄像头和麦克风数据推送RTMP流
    需要进行简单的音视频编程,如果不是特别数据C/C++,那么JavaCV应该是比较好的选择,下面记录一下使用JavaCV采集桌面数据的方式。同时采集视频和音频需要最好采用不同的线程进......