首页 > 其他分享 >消息推送性能提升-1-单服务器

消息推送性能提升-1-单服务器

时间:2023-02-23 12:12:21浏览次数:44  
标签:daemon Reactor 性能 threadNamePrefix TCP 线程 customThreadPoolConfig 服务器 推送

TCP 服务端的设计

服务端采用 Netty 框架,我们使用的是 Netty 的主从多线程 Reactor 模型。Reactor 模型是 Netty 实现高性能的基础,Netty 的 Reactor 模型分为三种:

1.单线程模型、2.多线程模型、3.主从多线程模型。

主从多线程模型由多个 Reactor 线程组成,MainReactor 负责处理客户端连接的 Accept 事件,连接建立成功后将新创建的连接对象注册至 SubReactor。SubReactor 分配线程池中的 I/O 线程与其连接绑定,负责连接生命周期内所有的 I/O 事件。主从多线程模型可以利用 CPU 的多核来提升系统的吞吐量,因此这也是 Netty 推荐使用的模型。

我们需要在服务端定义 boss 和 worker 这两个 Reactor。其中,boss 是主 Reactor,worker 是从 Reactor。它们分别使用不同的 NioEventLoopGroup,主 Reactor 负责处理 Accept 然后把 Channel 注册到从 Reactor 上,从 Reactor 主要负责 Channel 生命周期内的所有 I/O 事件。

ChannelHandler 的使用

从上述代码中,可以看到 worker 处理了各种各样的 Handler。其中,ServerIdleHandler 继承 Netty 自带的 IdleStateHandler 类,用于检测连接的有效性。如果 150秒内没有收到心跳,则断开连接。

MagicNumValidator:用于 TCP 报文的魔数校验。

PacketCodecHandler:解析报文的 Handler。PacketCodecHandler 继承自 ByteToMessageCodec ,它是用来处理 byte-to-message 和 message-to-byte,便于解码字节消息成 POJO 或编码 POJO 消息成字节。

这一步非常关键。因为 TCP 作为传输层的协议,无法理解上层业务数据的具体含义,它根据 TCP 缓冲区的实际情况进行数据包的划分。在业务上认为是一个完整的包,很可能会被 TCP 拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包进行发送,这就是所谓的 TCP 粘包和拆包问题。在这一步,我们通过自定义的编解码器解决了粘包和拆包问题。

在这里,我们看到 PacketCodecHandler 使用上面提到的报文管理类 PacketManager 的 encode()、decode() 方法来完成编解码的过程。

HeartBeatHandler:心跳的 Handler,接收 TCP 客户端发来的"ping",然后给客户端返回"pong"。

ResponseHandler:通用的处理接收 TCP 客户端发来业务指令的 Handler,可以根据对应的指令去查询对应的 Handler,并对这些命令进行响应。

最后,我们在 ResponseHandler 中,看到还有一个 ThreadPool,它是一个业务线程池。但是在我们所定义的 TCPServer 中, worker 本身使用了一个线程池,为何还需要一个业务线程池呢?

业务线程池的使用

Netty 的 Reactor 线程模型适合处理耗时短的任务场景,对于耗时较长的 ChannelHandler 来说,维护一个业务线程池是一个比较好的做法。将编解码后的数据封装成任务放入线程池中,避免 ChannelHandler 阻塞而造成 EventLoop 不可用。

如果有复杂且耗时的业务逻辑,我推荐的做法是在 ChannelHandler 处理器中自定义新的业务线程池,从而将这些耗时的操作提交到业务线程池中执行。

例如定义一个业务线程池,代码如下:

@Slf4j

public final class ThreadPoolFactoryUtils {
 

/**
* 通过 threadNamePrefix 来区分不同线程池(我们可以把相同 threadNamePrefix 的线程池看作是为同一业务场景服务)。
* key: threadNamePrefix
* value: threadPool
*/
private static final Map<String, ExecutorService> THREAD_POOLS = new ConcurrentHashMap<>();
private ThreadPoolFactoryUtils() {
}
public static ExecutorService createCustomThreadPoolIfAbsent(String threadNamePrefix) {
CustomThreadPoolConfig customThreadPoolConfig = new CustomThreadPoolConfig();
return createCustomThreadPoolIfAbsent(customThreadPoolConfig, threadNamePrefix, false);
}
public static ExecutorService createCustomThreadPoolIfAbsent(String threadNamePrefix, CustomThreadPoolConfig customThreadPoolConfig) {
return createCustomThreadPoolIfAbsent(customThreadPoolConfig, threadNamePrefix, false);
}
public static ExecutorService createCustomThreadPoolIfAbsent(CustomThreadPoolConfig customThreadPoolConfig, String threadNamePrefix, Boolean daemon) {
// 存在则取出,不存新建一个
ExecutorService threadPool = THREAD_POOLS.computeIfAbsent(threadNamePrefix, k -> createThreadPool(customThreadPoolConfig, threadNamePrefix, daemon));
// 如果 threadPool 被 shutdown 的话就重新创建一个
if (threadPool.isShutdown() || threadPool.isTerminated()) {
THREAD_POOLS.remove(threadNamePrefix);
threadPool = createThreadPool(customThreadPoolConfig, threadNamePrefix, daemon);
THREAD_POOLS.put(threadNamePrefix, threadPool);
}
return threadPool;
}
/**
* shutDown 所有线程池
*/
public static void shutDownAllThreadPool() {
log.info("call shutDownAllThreadPool method");
THREAD_POOLS.entrySet().parallelStream().forEach(entry -> {
ExecutorService executorService = entry.getValue();
executorService.shutdown();
log.info("shut down thread pool [{}] [{}]", entry.getKey(), executorService.isTerminated());
try {
executorService.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("Thread pool never terminated");
executorService.shutdownNow();
}
});
}
private static ExecutorService createThreadPool(CustomThreadPoolConfig customThreadPoolConfig, String threadNamePrefix, Boolean daemon) {
ThreadFactory threadFactory = createThreadFactory(threadNamePrefix, daemon);
return new ThreadPoolExecutor(customThreadPoolConfig.getCorePoolSize(), customThreadPoolConfig.getMaximumPoolSize(),
customThreadPoolConfig.getKeepAliveTime(), customThreadPoolConfig.getUnit(), customThreadPoolConfig.getWorkQueue(),
threadFactory);
}
/**
* 创建 ThreadFactory 。如果threadNamePrefix不为空则使用自建ThreadFactory,否则使用defaultThreadFactory
*
* @param threadNamePrefix 作为创建的线程名字的前缀
* @param daemon 指定是否为 Daemon Thread(守护线程)
* @return ThreadFactory
*/
public static ThreadFactory createThreadFactory(String threadNamePrefix, Boolean daemon) {
if (threadNamePrefix != null) {
if (daemon != null) {
return new ThreadFactoryBuilder()
.setNameFormat(threadNamePrefix + "-%d")
.setDaemon(daemon).build();
} else {
return new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-%d").build();
}
}
return Executors.defaultThreadFactory();
}

业务线程池的使用很简单,在 businessAction() 方法中, block 参数是一个 Lambda 表达式,用于执行耗时的业务逻辑,通过 Header 中的 command 来查找服务端所对应使用的 ChannelHandler ,并作出响应。

针对特别复杂的业务,还可以根据业务的特点拆分出多个业务线程池。这样做的好处是:即使某个业务逻辑出现异常造成线程池资源耗尽,也不会影响到其他业务逻辑,从而提高应用程序整体的可用性。也做到了线程池的隔离。

总结,只有充分理解各个硬件、各个软件系统可实现的功能,才能设计出合理的自定义协议。反之,理解了一些常用的中间件相关协议,也可以帮助我们深入理解这些中间件,甚至还可以实现各个中间件的代理功能。

实际上对于其他的高级语言实现自定义协议也是类似的。当你真正理解了自定义 TCP 协议,以后再遇到新的协议,例如自定义的串口协议,会更容易理解。
————————————————
版权声明:本文为CSDN博主「凌涛」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/sinat_33462342/article/details/124178392

标签:daemon,Reactor,性能,threadNamePrefix,TCP,线程,customThreadPoolConfig,服务器,推送
From: https://www.cnblogs.com/cnhk19/p/17147466.html

相关文章