首页 > 其他分享 >Netty使用CompletableFuture实现异步串行队列

Netty使用CompletableFuture实现异步串行队列

时间:2023-12-19 17:31:35浏览次数:31  
标签:netty 异步 Netty CompletableFuture org import 接收 public

一、前言

CompletableFuture是JDK1.8提供的一种更加强大的异步编程的api。它实现了Future接口,也就是Future的功能特性CompletableFuture也有。它也实现了CompletionStage接口,CompletionStage接口定义了任务编排的方法,执行某一阶段,可以向下执行后续阶段。

CompletableFuture相比于Future最大的改进就是提供了类似观察者模式的回调监听的功能,也就是当上一阶段执行结束后,可以回调你指定的下一阶段任务,而不是阻塞获取结果之后来处理结果。


二、常用方法

1.异步操作

方法

说明

runAsync(Runnable runnable)


接收Runnable实例,没有返回值

runAsync(Runnable runnable, Executor executor);

接收Runnable实例,指定线程池,没有返回值

supplyAsync(Supplier<U> supplier);

有返回值

supplyAsync(Supplier<U> supplier, Executor executor)

有返回值,指定线程池

2.依赖关系

方法

说明

thenApply()

把前面任务的执行结果,交给后面的Function

thenCompose()

thenCompose方法会在某个任务执行完成后,将该任务的执行结果作为方法入参然后执行指定的方法,该方法会返回一个新的CompletableFuture实例。

三、问题产生

我们在上一章的基础上继续学习。https://blog.51cto.com/u_13312531/8879299

我们修改上一章节的Netty初始化器,识别ffee开头、efcd结尾的数据。

package com.example.mynetty.netty;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import org.springframework.stereotype.Component;

/**
 * @author qx
 * @date 2023/12/19
 * @des
 */
@Component
public class NettyInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();

        // 基于分隔符的解码器  ffee开头 efcd结尾 提供分隔符解析报文 128表示单条消息的最大长度,解码器查找分隔符的时候,达到该长度没有找到会抛出异常。
        pipeline.addLast(
                new DelimiterBasedFrameDecoder(128, Unpooled.copiedBuffer(new byte[]{(byte) 0xff, (byte) 0xee}),
                        Unpooled.copiedBuffer(new byte[]{(byte) 0xef, (byte) 0xcd})));

        pipeline.addLast(new ByteArrayDecoder());
        pipeline.addLast(new ByteArrayEncoder());

        pipeline.addLast("handler", new MessageHandler());
    }
}

创建一个异步接收数据服务类。

package com.example.mynetty.service;

import cn.hutool.core.util.HexUtil;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
 * @author qx
 * @date 2023/12/19
 * @des 异步服务类
 */
@Service("taskService")
public class TaskService {

    @Async
    public void syncTask(byte[] data) {
        String s = HexUtil.encodeHexStr(data);
        System.out.println("接收的数据:" + s);
    }
}

由于我们使用了@Async,所以需要在启动类上开启异步注解。

package com.example.mynetty;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync
public class MyNettyApplication {

    public static void main(String[] args) {
        SpringApplication.run(MyNettyApplication.class, args);
    }

}

创建一个获取Spring管理对象的工具类

package com.example.mynetty.utils;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class BeanUtil implements ApplicationListener<ContextRefreshedEvent> {
    /**
     * 上下文对象实例
     */
    private static ApplicationContext ctx;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        log.info("ContextRefreshedEvent");
        ctx = contextRefreshedEvent.getApplicationContext();
    }

    //获取applicationContext
    public static ApplicationContext getApplicationContext() {
        return ctx;
    }


    //通过name,以及Clazz返回指定的Bean
    public static <T> T getBean(String name, Class<T> clazz) {
        return getApplicationContext().getBean(name, clazz);
    }

}

修改消息处理器,我们在答复后使用异步接收数据,数据用于后续数据库保存等操作。

package com.example.mynetty.netty;

import com.example.mynetty.service.TaskService;
import com.example.mynetty.utils.BeanUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

/**
 * @author qx
 * @date 2023/12/19
 * @des 处理客户端的消息
 */
@Slf4j
public class MessageHandler extends SimpleChannelInboundHandler<Object> {

    //CompletableFuture<Void> future = CompletableFuture.completedFuture(null);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object o) throws Exception {
        byte[] result = (byte[]) o;
        //log.info("接收到消息:{}", Arrays.toString(result));
        // 答复
        ctx.writeAndFlush(result);

        // 开启异步任务接收数据
        TaskService taskService = BeanUtil.getBean("taskService", TaskService.class);
        taskService.syncTask(result);


        /*future = future.thenCompose(unused -> CompletableFuture.runAsync(()->{
            TaskService taskService = BeanUtil.getBean("taskService", TaskService.class);
            taskService.syncTask(result);
        }));*/
    }
}

测试:

我们创建了10条带顺序的数据,一起发送给Netty接收。

Netty使用CompletableFuture实现异步串行队列_CompletableFuture

我们发现网络调试工具是正常按顺序接收到数据的。

但是我们控制台的日志显示接收到的数据的顺序是混乱的。这种情况我们应该避免的,因为有些操作是按顺序执行下去的,我们获取数据的顺序混乱的话是得不到正确操作的。

接收的数据:
接收的数据:001e040227061f061f0a0a0a0a00000800000006000101016dbf
接收的数据:001e040227061f041f0a0a0a0a0000080000000400010101cd67
接收的数据:001e040227061f081f0a0a0a0a0000080000000800010101cfb2
接收的数据:
接收的数据:001e040227061f091f0a0a0a0a00000800000009000101019fde
接收的数据:
接收的数据:001e040227061f071f0a0a0a0a00000800000007000101013dd3
接收的数据:
接收的数据:001e040227061f051f0a0a0a0a00000800000005000101019d0b
接收的数据:
接收的数据:001e040227061f0c1f0a0a0a0a0000080000000c00010101ce01
接收的数据:
接收的数据:001e040227061f0b1f0a0a0a0a0000080000000b000101013f06
接收的数据:
接收的数据:001e040227061f0a1f0a0a0a0a0000080000000a000101016f6a
接收的数据:
接收的数据:
接收的数据:001e040227061f0d1f0a0a0a0a0000080000000d000101019e6d
接收的数据:

四、问题解决

我们使用CompletableFuture让我们的任务异步串行化,也就是任务编排下去。

我们修改消息处理器。

package com.example.mynetty.netty;

import com.example.mynetty.service.TaskService;
import com.example.mynetty.utils.BeanUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CompletableFuture;

/**
 * @author qx
 * @date 2023/12/19
 * @des 处理客户端的消息
 */
@Slf4j
public class MessageHandler extends SimpleChannelInboundHandler<Object> {

    CompletableFuture<Void> future = CompletableFuture.completedFuture(null);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object o) throws Exception {
        byte[] result = (byte[]) o;
        //log.info("接收到消息:{}", Arrays.toString(result));
        // 答复
        ctx.writeAndFlush(result);


        // 异步串行化 runAsync不需要返回结果 thenCompose意思为一个任务执行完后再执行下一个任务
        future = future.thenCompose(unused -> CompletableFuture.runAsync(() -> {
            TaskService taskService = BeanUtil.getBean("taskService", TaskService.class);
            taskService.syncTask(result);
        }));
    }
}

去掉服务类中的异步。

package com.example.mynetty.service;

import cn.hutool.core.util.HexUtil;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
 * @author qx
 * @date 2023/12/19
 * @des 异步服务类
 */
@Service("taskService")
public class TaskService {

    public void syncTask(byte[] data) {
        // 使用hutool的工具方法解析字节数组数据为16进制数据
        String s = HexUtil.encodeHexStr(data);
        System.out.println("接收的数据:" + s);
    }
}

重启程序继续测试。

Netty使用CompletableFuture实现异步串行队列_Netty_02

我们查看控制台上的接收数据的日志显示的顺序也是和发送时的顺序一致。

Netty使用CompletableFuture实现异步串行队列_Netty_03

标签:netty,异步,Netty,CompletableFuture,org,import,接收,public
From: https://blog.51cto.com/u_13312531/8891662

相关文章

  • java的8种异步实现方式
    异步执行对于开发者来说并不陌生,在实际的开发过程中,很多场景多会使用到异步,相比同步执行,异步可以大大缩短请求链路耗时时间,比如:发送短信、邮件、异步更新等,这些都是典型的可以通过异步实现的场景。异步的八种实现方式线程ThreadFuture异步框架CompletableFutureSpring注解@A......
  • 异步记录第三方接口调用日志的优雅实现(HttpClient+装饰者模式+异步线程池)
    对于第三方接口调用日志这个功能,笔者在工作中曾见过以下两种方式:Restemplate+装饰者模式+MQ实现网关监控+Feign拦截器+观察者模式实现其中观察者模式的实现是我最为佩服的设计,个人认为以上两种实现都显得略过臃肿,应该简化设计,让异步记录的实现更加简洁优雅,因此产生了这样......
  • SpringBoot异步任务获取HttpServletRequest
     前言在使用框架日常开发中需要在controller中进行一些异步操作减少请求时间,但是发现在使用@Anysc注解后会出现Request对象无法获取的情况,本文就此情况给出完整的解决方案原因分析@Anysc注解会开启一个新的线程,主线程的Request和子线程是不共享的,所以获取为null在使用spr......
  • 异步编码规范
    异步编码规范手写promisepromiseA+规范asyncawait原理generator--忽略Promise1.特点1.1状态不可逆转==》不可从一个状态变为另外一个状态promise的方法finallyfinally方法没有参数,也不会改变Promise的状态,它只是在Promise结束时提供了一个通知机制,让......
  • 直播平台源码,教你如何写出同步与异步
    直播平台源码,教你如何写出同步与异步同步示例代码:console.log("开始");functionsyncOperation(){console.log("同步操作");}syncOperation();console.log("结束"); 输出结果:开始同步操作结束 在上述代码中,同步操作syncOperation()按照顺序......
  • 34. 干货系列从零用Rust编写负载均衡及代理,异步测试在Rust中的实现
    wmproxywmproxy已用Rust实现http/https代理,socks5代理,反向代理,静态文件服务器,四层TCP/UDP转发,七层负载均衡,内网穿透,后续将实现websocket代理等,会将实现过程分享出来,感兴趣的可以一起造个轮子项目地址国内:https://gitee.com/tickbh/wmproxygithub:https://github.com/......
  • SpringBoot使用Async注解实现异步线程
    1、启动类增加@EnableAsync注解2、yml增加配置spring:task:execution:pool:max-size:8core-size:8keep-alive:60queue-capacity:1000thread-name-prefix:Asnyc-task-calc-3、编写配置类AsyncTaskConfigimp......
  • Python 异步编程之yield关键字
    背景介绍在前面的篇章中介绍了同步和异步在IO上的对比,从本篇开始探究python中异步的实现方法和原理。python协程的发展流程:python2.5为生成器引用.send()、.throw()、.close()方法python3.3为引入yieldfrom,可以接收返回值,可以使用yieldfrom定义协程Python3.4加入了asy......
  • 记录一次在k8s上,web服务内嵌的netty-socketio注册到nacos,gateway转发路由 遇到的问题
    web服务内嵌的nacos怎么注册?使用javasdk方式参考链接:https://nacos.io/zh-cn/docs/sdk.html每个socket不同怎么设置端口我这里使用的是注解,让用户传过来,并且在bean初始化之前进行变量存储。这个链接里面的[netty-socketio服务端代码编写目录]:https://www.cnblogs.com/x......
  • 异步任务
    参考https://blog.csdn.net/m0_65992672/article/details/130422166 @SpringBootApplication@EnableAsync//开启异步任务支持publicclassApplicationStarter{publicstaticvoidmain(String[]args){SpringApplication.run(ApplicationStarter.class,args);}}......