业务场景是:有一个rtmp的源,对外提供rtmp的直播节目,地址rtmp://abc.com/live/tv, 现在的需求是要将此节目拉过来,生成HLS对外发布,或对外还是rtmp发布,比如rtmp://my.com/live/tv。 作用嘛,肯定是你懂的! 此时需要一个把rtmp数据拉过来,再推出去的动作,一般推给SRS流媒体服务器后,即可随意对外分发提供rtmp或hls。 如何实现呢? 一般的流媒体服务器,像SRS提供许多流媒体格式的输出,可以用于进行流媒体处理,但此时需要一个拉流的程序。SRS里面用的是ffmpeg程序,但是个人感觉并不是特别好用,比如在某种情况下,ffmpeg卡死了,但是SRS并不会让ffmpeg重新拉流。 这里推荐两种方法: 一、使用zlmedia 的json API实现 最简单的办法就是使用zlmedia的mediaproxy API接口,即可完成,具体可参考 MediaServer支持的HTTP API · ZLMediaKit/ZLMediaKit Wiki · GitHub "/index/api/addStreamProxy" 二、使用zlmedia进行二次开发 如果需要用程序进行自定义实现,可以参照 https://github.com/xia-chu/ZLMediaKit/blob/master/tests/test_pusher.cpp 假定SRS运行在本机的1935端口,推给SRS后,由SRS再处理对外发布。 int main(int argc, char *argv[]) { return domain("rtmp://abc.com/live/hks1", "rtmp://127.0.0.1/live/tv"); } 以下是我根据ZLMediaKit中的 test_pusher修改后的代码,可以从外界传递参数到程序中,实现自定义拉流并转推。 实测此功能正常运行,稳定性还不错。 程序的编译方法:在编译zlmedia的时候会自动编译test目录下的所有.cpp。 #include <signal.h> #include <iostream> #include "Util/logger.h" #include "Util/NoticeCenter.h" #include "Poller/EventPoller.h" #include "Player/PlayerProxy.h" #include "Rtmp/RtmpPusher.h" #include "Common/config.h" #include "Pusher/MediaPusher.h" #include "Util/CMD.h" using namespace std; using namespace toolkit; using namespace mediakit; //推流器,保持强引用 MediaPusher::Ptr pusher; Timer::Ptr g_timer; //声明函数 void rePushDelay(const EventPoller::Ptr &poller,const string &schema,const string &vhost,const string &app, const string &stream, const string &url); //创建推流器并开始推流 void createPusher(const EventPoller::Ptr &poller, const string &schema,const string &vhost,const string &app, const string &stream, const string &url) { //创建推流器并绑定一个MediaSource pusher.reset(new MediaPusher(schema,vhost, app, stream,poller)); //可以指定rtsp推流方式,支持tcp和udp方式,默认tcp // (*pusher)[Client::kRtpType] = Rtsp::RTP_UDP; //设置推流中断处理逻辑 pusher->setOnShutdown([poller,schema,vhost, app, stream, url](const SockException &ex) { WarnL << "Server connection is closed:" << ex.getErrCode() << " " << ex.what(); //重试 rePushDelay(poller,schema,vhost,app, stream, url); }); //设置发布结果处理逻辑 pusher->setOnPublished([poller,schema,vhost, app, stream, url](const SockException &ex) { if (ex) { WarnL << "Publish fail:" << ex.getErrCode() << " " << ex.what(); //如果发布失败,就重试 rePushDelay(poller,schema,vhost,app, stream, url); } else { InfoL << "Publish success,Please play with player:" << url; } }); pusher->publish(url); } //推流失败或断开延迟2秒后重试推流 void rePushDelay(const EventPoller::Ptr &poller,const string &schema,const string &vhost,const string &app, const string &stream, const string &url) { g_timer = std::make_shared<Timer>(2,[poller,schema,vhost,app, stream, url]() { InfoL << "Re-Publishing..."; //重新推流 createPusher(poller,schema,vhost,app, stream, url); //此任务不重复 return false; }, poller); } //这里才是真正执行main函数,你可以把函数名(domain)改成main,然后就可以输入自定义url了 int domain(const string &playUrl, const string &pushUrl) { auto poller = EventPollerPool::Instance().getPoller(); //拉一个流,生成一个RtmpMediaSource,源的名称是"app/stream" //你也可以以其他方式生成RtmpMediaSource,比如说MP4文件(请查看test_rtmpPusherMp4.cpp代码) MediaInfo info(pushUrl); PlayerProxy::Ptr player(new PlayerProxy(DEFAULT_VHOST, "app", "stream",false,false,-1 , poller)); //可以指定rtsp拉流方式,支持tcp和udp方式,默认tcp // (*player)[Client::kRtpType] = Rtsp::RTP_UDP; player->play(playUrl.data()); //监听RtmpMediaSource注册事件,在PlayerProxy播放成功后触发 NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaChanged, [pushUrl,poller](BroadcastMediaChangedArgs) { //媒体源"app/stream"已经注册,这时方可新建一个RtmpPusher对象并绑定该媒体源 if(bRegist && pushUrl.find(sender.getSchema()) == 0){ createPusher(poller,sender.getSchema(),sender.getVhost(),sender.getApp(), sender.getId(), pushUrl); } }); //设置退出信号处理函数 static semaphore sem; signal(SIGINT, [](int) { sem.post(); });// 设置退出信号 sem.wait(); pusher.reset(); g_timer.reset(); return 0; } int main(int argc, char *argv[]) { // ./ffmpeg -loglevel info -f flv -i rtmp://test.yunyingtx.com/live/PLTV/88888888/tv // -vcodec copy -acodec copy -f flv -y rtmp://127.0.0.1:1936/live/tv string input; string output; try { for (int i = 0; i < argc-1; i++) { if (string(argv[i]) == "-i") input = argv[i + 1]; } for (int i = 0; i < argc - 1; i++) { if (string(argv[i]) == "-y") output = argv[i + 1]; } } catch (std::exception &ex) { cout << ex.what() << endl; return -1; } if (input.find("rtmp://") == string::npos || output.find("rtmp://") == string::npos) { return -1; } EventPollerPool::setPoolSize(1); //设置日志 LogLevel logLevel = (LogLevel)LDebug;// Logger::Instance().add(std::make_shared<ConsoleChannel>()); Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>()); auto fileChannel = std::make_shared<FileChannel>("FileChannel", exeDir() + "../logs/", logLevel); fileChannel->setMaxDay(7); Logger::Instance().add(fileChannel); DebugL << "program started." << endl; DebugL << input<<endl; DebugL << output<<endl; return domain(input, output);//"rtmp://127.0.0.1/live/cctv13" } zlmedia本身存在一个问题:拉流时有中断的现象,就是说rtmp源还在播,但是我们的这个程序不知道因为什么原因停止工作了,导致整个媒体处理流程中断。 经过长时间的跟踪和调试,发现ZLMediaKit在处理 rtmp的流程存在问题 https://github.com/xia-chu/ZLMediaKit/issues/455 解决办法是在ZLMediaKit的 src/Rtmp/RtmpPlayerImp.h 文件中 对onCheckMeta() 函数加入对 _delegate为空的判断,只有为空的时候才reset()。 修改前: bool onCheckMeta(const AMFValue &val) override { _rtmp_src = dynamic_pointer_cast<RtmpMediaSource>(_pMediaSrc); if (_rtmp_src) { _rtmp_src->setMetaData(val); _set_meta_data = true; } _delegate.reset(new RtmpDemuxer); _delegate->loadMetaData(val); return true; } 修改后 : bool onCheckMeta(const AMFValue &val) override { _rtmp_src = dynamic_pointer_cast<RtmpMediaSource>(_pMediaSrc); if (_rtmp_src) { _rtmp_src->setMetaData(val); _set_meta_data = true; } if (!_delegate) { _delegate.reset(new RtmpDemuxer); _delegate->loadMetaData(val); } return true; }
标签:const,string,app,hls,rtmp,include,poller,ZLMedia From: https://www.cnblogs.com/kn-zheng/p/17445375.html