首页 > 其他分享 >freeswitch笔记(8)-esl outbound 填坑笔记

freeswitch笔记(8)-esl outbound 填坑笔记

时间:2023-02-07 18:23:58浏览次数:81  
标签:ExecutorService outbound 笔记 填坑 client 线程 freeswitch new esl

github上的esl-client已经N年未更新了,上面有一堆bug,记录一下:

 

一、内存泄露

org.freeswitch.esl.client.transport.message.EslFrameDecoder 这个类,使用了netty的ByteBuf,对netty有了解的同学应该知道,netty底层大量使用了堆外内存,建议开发人员及时手动释放。

https://github.com/esl-client/esl-client/issues/24 也有记载

参考下图,手动加上释放处理即可

 

 

二、线程池优化

org.freeswitch.esl.client.outbound.OutboundChannelInitializer 这个类,每次freeswitch有来电时,会以outbound外联模式,通过tcp连接到esl client,初始化channel。callbackExector是一个单线程池,正常情况下问题倒不大,但是jdk源码:

1 2 3 4 5 6 public static ExecutorService newSingleThreadExecutor() {     return new FinalizableDelegatedExecutorService         (new ThreadPoolExecutor(11,                                 0L, TimeUnit.MILLISECONDS,                                 new LinkedBlockingQueue<Runnable>())); }

LinkedBlockingQueue默认是一个无界队列:

1 2 3 public LinkedBlockingQueue() {     this(Integer.MAX_VALUE); }

有点风险,改成下面这样更安全点:

1 2 3 4 5 6 private ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()         .setNameFormat("outbound-pool-%d").build();   public ExecutorService callbackExecutor = new ThreadPoolExecutor(11,         0L, TimeUnit.MILLISECONDS,         new LinkedBlockingQueue<>(10000), namedThreadFactory);

这个单线程池的用法也顺带研究了下,它真正使用的地方在于org.freeswitch.esl.client.outbound.OutboundClientHandler,用于处理freeswitch发过来的事件

1 2 3 4 5 @Override protected void handleEslEvent(final ChannelHandlerContext ctx, final EslEvent event) {     callbackExecutor.execute(() -> clientHandler.onEslEvent(             new Context(ctx.channel(), OutboundClientHandler.this), event)); }

大家知道Netty本身就有2个线程池:bossGroup,workerGroup,默认大小在io.netty.channel.MultithreadEventLoopGroup中

1 2 3 4 5 6 7 8 static {     DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(             "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));       if (logger.isDebugEnabled()) {         logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);     } }

即:核数*2。 既然已经是线程池了,为啥这里esl的事件又单独交给1个单线程池来处理呢? 先来看OutboundChannelInitializer实例化的地方,在org.freeswitch.esl.client.outbound.SocketClient的doStart里

1 2 3 4 5 6 7 8 9 10 11 12 13 @Override protected void doStart() {     final ServerBootstrap bootstrap = new ServerBootstrap()             .group(bossGroup, workerGroup)             .channel(NioServerSocketChannel.class)             .childOption(ChannelOption.TCP_NODELAY, true)             .childOption(ChannelOption.SO_KEEPALIVE, true)             .childHandler(new OutboundChannelInitializer(clientHandlerFactory));       serverChannel = bootstrap.bind(bindAddress).syncUninterruptibly().channel();     notifyStarted();     log.info("SocketClient waiting for connections on [{}] ...", bindAddress); }

也就是说,只有outbound tcp server启用时,才会对OutboundChannelInitializer做1次初始化,言外之意,刚才的单线程池实例也只会实例化1次。

试想一下,如果在outbound的处理过程中,一通电话进来,我们订阅了一堆事件,这堆事件发过来后,如果让workerGroup并行处理,事件的处理顺序就得不到保证了,这在电话系统中是很重要的,比如:响铃->接听->挂断。肯定要有顺序的!所以为了保证事件处理的顺序性,强制让所有事件,都交给这个单线程池实例来处理,保证了顺序性。

其实不光是outbound,inbound也是类似机制,保证事件接收时按顺序处理。明白这个原理后,回过头来想想,这个单线程池的callbackExector实例,应该处理成static静态实例更稳妥,这样强制让jvm保证肯定只有一个实例,处理事件绝对有顺序。

 

另外,在outbound的onConnect事件里,如果尝试跟freeswitch发命令,会发现block住,后面的代码完全无法执行,这也是一个大坑。解决办法:

将onConnect的处理,放在另外1个专用线程池里

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 class OutboundClientHandler extends AbstractEslClientHandler {       //这是保证事件接收顺序的单线程池     private final ExecutorService onEslEventExecutor;     //这是用于并发处理onConnect的多线程池     private final ExecutorService onConnectExecutor;         public OutboundClientHandler(IClientHandler clientHandler, ExecutorService onEslEventExecutor, ExecutorService onConnectExecutor) {         this.clientHandler = clientHandler;         //构造函数里允许传入         this.onEslEventExecutor = onEslEventExecutor;         this.onConnectExecutor = onConnectExecutor;     }       @Override     public void channelActive(final ChannelHandlerContext ctx) throws Exception {         super.channelActive(ctx);           // Have received a connection from FreeSWITCH server, send connect response         long threadId = Thread.currentThread().getId();         log.debug("Received new connection from server, sending connect message,threadId:" + threadId);           sendApiSingleLineCommand(ctx.channel(), "connect")                 .thenAccept(response ->                         //这里改为线程池执行                         onConnectExecutor.execute(() -> clientHandler.onConnect(                                 new Context(ctx.channel(), OutboundClientHandler.this),                                 new EslEvent(response, true)))                 )                 .exceptionally(throwable -> {                     ctx.channel().close();                     handleDisconnectionNotice();                     return null;                 });     }       @Override     protected void handleEslEvent(final ChannelHandlerContext ctx, final EslEvent event) {         //这里仍然用单一线程池处理,保证顺序         onEslEventExecutor.execute(() -> clientHandler.onEslEvent(                 new Context(ctx.channel(), OutboundClientHandler.this), event));     }       ... }

然后

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public class OutboundChannelInitializer extends ChannelInitializer<SocketChannel> {       private final IClientHandlerFactory clientHandlerFactory;       private static ThreadFactory onEslThreadFactory = new ThreadFactoryBuilder()             .setNameFormat("outbound-onEsl-pool-%d").build();       //专门接收订阅事件的单一线程池(保证顺序)     private static ExecutorService onEslExecutor = new ThreadPoolExecutor(11,             0L, TimeUnit.MILLISECONDS,             new LinkedBlockingQueue<>(100000), onEslThreadFactory);       private static ThreadFactory onConnectThreadFactory = new ThreadFactoryBuilder()             .setNameFormat("outbound-onConnect-pool-%d").build();       //专用于处理新来电onConnect的多线程池     private static ExecutorService onConnectExecutor = new ThreadPoolExecutor(32512,             60L, TimeUnit.SECONDS,             new LinkedBlockingQueue<>(2048), onConnectThreadFactory);         public OutboundChannelInitializer(IClientHandlerFactory clientHandlerFactory) {         this.clientHandlerFactory = clientHandlerFactory;     }       /**      * 重载版本,允许开发人员初始化时,传入自己的线程池      * @param clientHandlerFactory      * @param connExecutor      * @param eslExecutor      */     public OutboundChannelInitializer(IClientHandlerFactory clientHandlerFactory, ExecutorService connExecutor, ExecutorService eslExecutor) {         this.clientHandlerFactory = clientHandlerFactory;         onEslExecutor = eslExecutor;         onConnectExecutor = connExecutor;     }       @Override     protected void initChannel(SocketChannel ch) throws Exception {         ChannelPipeline pipeline = ch.pipeline();         // Add the text line codec combination first         pipeline.addLast("encoder"new StringEncoder());         // Note that outbound mode requires the decoder to treat many 'headers' as body lines         pipeline.addLast("decoder"new EslFrameDecoder(8092true));           // now the outbound client logic         //将2个线程池,传入实例         pipeline.addLast("clientHandler",                 new OutboundClientHandler(clientHandlerFactory.createClientHandler(), onEslExecutor, onConnectExecutor));       } }

   

三、源码上的Test示例代码各种错误

https://github.com/esl-client/esl-client/blob/master/src/test/java/OutboundTest.java 这是示例源码

1 String uuid = eslEvent.getEventHeaders().get("unique-id");

45行,这里应该是"Unique-ID",小写取不到值。

另外82行,outbound的onEslEvent方法,其实永远也不会被触发,因为根本没订阅任何事件,inbound的示例部分也有同样问题。

56行,执行后,实测下来,后面的操作其实都是阻塞的,代码无法向下执行,建议改在新线程里执行(或者参考上面的“线程池优化”分析,修改源码)。

 

上述这些问题,笔者已经fork了一份代码进行了修改,有兴趣的同学,欢迎fork,地址:https://github.com/yjmyzz/esl-client

标签:ExecutorService,outbound,笔记,填坑,client,线程,freeswitch,new,esl
From: https://www.cnblogs.com/kn-zheng/p/17099410.html

相关文章

  • freeswitch笔记(7)-放音控制
    来电时,播放音乐是一个很常用的功能,下面是一些相关的命令:一、单次播放playback1originateuser/1000 &playback(ivr/8000/ivr-welcome_to_freeswitch.wav)......
  • freeswitch笔记(6)-会议功能简介
    电话会议是一个常用功能,freeswitch当然支持,下面是基本用法:一、发起会议1conference test bgdialuser/1004上面的命令表示,发起1个名为test的会话,......
  • freeswitch笔记(5)-小型呼叫中心设计思路
    这一篇用esl实战一把,利用eslclient来实现一个小型呼叫中心的原型,先看看下面这张图: 企业通常会对外公布一个400之类的服务电话,当用户拨打这个电话时,实际上背后是一堆客......
  • freeswitch笔记(4)-esl inbound模式的重连及内存泄露问题
    eslinboundclient,内部有一个canSend()方法:123publicbooleancanSend(){    return channel!=null&&channel.isConnected()&&authenticated......
  • FreeSwitch:send_dtmf/uuid_send_dtmf发送按键注意事项
    很多时候我们打电话到公司前台,会听到类似“欢迎致电XXX,办公电话请直拨分机,咨询XX请按1,咨询YY请按2”这样的语音提示。在一些特定流程中,系统自动发起呼叫打到前台,希望实现自......
  • freeswitch批量添加用户
    默认情况下,freeswitch内置了1000-1019这20个用户,如果需要添加更多用户,可以按如下步骤操作:一、复制用户文件\FreeSWITCH\conf\directory\default 下有1000.xml~1019.xm......
  • java_html笔记
    颜色color字体大小 1.数值+单位 2.关键字-px-em字体(可以写多个,但不是全都生效只生效存在的如果全都不存在则使用默认字体)font-family:"abccde",......
  • freeswitch: 如何指定主叫显示号码
    一、origiante时指定主叫号码正常情况下,如果在freeswitch控制台,输入类似下面 命令:originateuser/1000 &park被叫收到振铃提示时,显示的号码类似下面......
  • python笔记1
    1.python转义符\"输出等于“\n换行\r覆盖,后面覆盖前面 \b删除前面一个字节\t制表符:插入四个空格,但是会自动补齐\\两个斜杠表示一个斜杠r在前面写个r表示转义字符......
  • 《可能性的艺术》——读书笔记
    2023.2.3序言①米兰达警告:你有保持沉默的权利,但是你所说的每句话都将成为呈堂证供第一讲:政治比较的维度②智慧的本质就是对事物比例的公正判断2023.2.7第二讲:和......