一、前言
CompletableFuture是JDK1.8提供的一种更加强大的异步编程的api。它实现了Future接口,也就是Future的功能特性CompletableFuture也有。它也实现了CompletionStage接口,CompletionStage接口定义了任务编排的方法,执行某一阶段,可以向下执行后续阶段。
CompletableFuture相比于Future最大的改进就是提供了类似观察者模式的回调监听的功能,也就是当上一阶段执行结束后,可以回调你指定的下一阶段任务,而不是阻塞获取结果之后来处理结果。
二、常用方法
1.异步操作
方法 | 说明 |
runAsync(Runnable runnable) | 接收Runnable实例,没有返回值 |
runAsync(Runnable runnable, Executor executor); | 接收Runnable实例,指定线程池,没有返回值 |
supplyAsync(Supplier<U> supplier); | 有返回值 |
supplyAsync(Supplier<U> supplier, Executor executor) | 有返回值,指定线程池 |
2.依赖关系
方法 | 说明 |
thenApply() | 把前面任务的执行结果,交给后面的Function |
thenCompose() | thenCompose方法会在某个任务执行完成后,将该任务的执行结果作为方法入参然后执行指定的方法,该方法会返回一个新的CompletableFuture实例。 |
三、问题产生
我们在上一章的基础上继续学习。https://blog.51cto.com/u_13312531/8879299
我们修改上一章节的Netty初始化器,识别ffee开头、efcd结尾的数据。
package com.example.mynetty.netty;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import org.springframework.stereotype.Component;
/**
* @author qx
* @date 2023/12/19
* @des
*/
@Component
public class NettyInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 基于分隔符的解码器 ffee开头 efcd结尾 提供分隔符解析报文 128表示单条消息的最大长度,解码器查找分隔符的时候,达到该长度没有找到会抛出异常。
pipeline.addLast(
new DelimiterBasedFrameDecoder(128, Unpooled.copiedBuffer(new byte[]{(byte) 0xff, (byte) 0xee}),
Unpooled.copiedBuffer(new byte[]{(byte) 0xef, (byte) 0xcd})));
pipeline.addLast(new ByteArrayDecoder());
pipeline.addLast(new ByteArrayEncoder());
pipeline.addLast("handler", new MessageHandler());
}
}
创建一个异步接收数据服务类。
package com.example.mynetty.service;
import cn.hutool.core.util.HexUtil;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
* @author qx
* @date 2023/12/19
* @des 异步服务类
*/
@Service("taskService")
public class TaskService {
@Async
public void syncTask(byte[] data) {
String s = HexUtil.encodeHexStr(data);
System.out.println("接收的数据:" + s);
}
}
由于我们使用了@Async,所以需要在启动类上开启异步注解。
package com.example.mynetty;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
@SpringBootApplication
@EnableAsync
public class MyNettyApplication {
public static void main(String[] args) {
SpringApplication.run(MyNettyApplication.class, args);
}
}
创建一个获取Spring管理对象的工具类
package com.example.mynetty.utils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class BeanUtil implements ApplicationListener<ContextRefreshedEvent> {
/**
* 上下文对象实例
*/
private static ApplicationContext ctx;
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
log.info("ContextRefreshedEvent");
ctx = contextRefreshedEvent.getApplicationContext();
}
//获取applicationContext
public static ApplicationContext getApplicationContext() {
return ctx;
}
//通过name,以及Clazz返回指定的Bean
public static <T> T getBean(String name, Class<T> clazz) {
return getApplicationContext().getBean(name, clazz);
}
}
修改消息处理器,我们在答复后使用异步接收数据,数据用于后续数据库保存等操作。
package com.example.mynetty.netty;
import com.example.mynetty.service.TaskService;
import com.example.mynetty.utils.BeanUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
/**
* @author qx
* @date 2023/12/19
* @des 处理客户端的消息
*/
@Slf4j
public class MessageHandler extends SimpleChannelInboundHandler<Object> {
//CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object o) throws Exception {
byte[] result = (byte[]) o;
//log.info("接收到消息:{}", Arrays.toString(result));
// 答复
ctx.writeAndFlush(result);
// 开启异步任务接收数据
TaskService taskService = BeanUtil.getBean("taskService", TaskService.class);
taskService.syncTask(result);
/*future = future.thenCompose(unused -> CompletableFuture.runAsync(()->{
TaskService taskService = BeanUtil.getBean("taskService", TaskService.class);
taskService.syncTask(result);
}));*/
}
}
测试:
我们创建了10条带顺序的数据,一起发送给Netty接收。
我们发现网络调试工具是正常按顺序接收到数据的。
但是我们控制台的日志显示接收到的数据的顺序是混乱的。这种情况我们应该避免的,因为有些操作是按顺序执行下去的,我们获取数据的顺序混乱的话是得不到正确操作的。
接收的数据:
接收的数据:001e040227061f061f0a0a0a0a00000800000006000101016dbf
接收的数据:001e040227061f041f0a0a0a0a0000080000000400010101cd67
接收的数据:001e040227061f081f0a0a0a0a0000080000000800010101cfb2
接收的数据:
接收的数据:001e040227061f091f0a0a0a0a00000800000009000101019fde
接收的数据:
接收的数据:001e040227061f071f0a0a0a0a00000800000007000101013dd3
接收的数据:
接收的数据:001e040227061f051f0a0a0a0a00000800000005000101019d0b
接收的数据:
接收的数据:001e040227061f0c1f0a0a0a0a0000080000000c00010101ce01
接收的数据:
接收的数据:001e040227061f0b1f0a0a0a0a0000080000000b000101013f06
接收的数据:
接收的数据:001e040227061f0a1f0a0a0a0a0000080000000a000101016f6a
接收的数据:
接收的数据:
接收的数据:001e040227061f0d1f0a0a0a0a0000080000000d000101019e6d
接收的数据:
四、问题解决
我们使用CompletableFuture让我们的任务异步串行化,也就是任务编排下去。
我们修改消息处理器。
package com.example.mynetty.netty;
import com.example.mynetty.service.TaskService;
import com.example.mynetty.utils.BeanUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
/**
* @author qx
* @date 2023/12/19
* @des 处理客户端的消息
*/
@Slf4j
public class MessageHandler extends SimpleChannelInboundHandler<Object> {
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object o) throws Exception {
byte[] result = (byte[]) o;
//log.info("接收到消息:{}", Arrays.toString(result));
// 答复
ctx.writeAndFlush(result);
// 异步串行化 runAsync不需要返回结果 thenCompose意思为一个任务执行完后再执行下一个任务
future = future.thenCompose(unused -> CompletableFuture.runAsync(() -> {
TaskService taskService = BeanUtil.getBean("taskService", TaskService.class);
taskService.syncTask(result);
}));
}
}
去掉服务类中的异步。
package com.example.mynetty.service;
import cn.hutool.core.util.HexUtil;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
* @author qx
* @date 2023/12/19
* @des 异步服务类
*/
@Service("taskService")
public class TaskService {
public void syncTask(byte[] data) {
// 使用hutool的工具方法解析字节数组数据为16进制数据
String s = HexUtil.encodeHexStr(data);
System.out.println("接收的数据:" + s);
}
}
重启程序继续测试。
我们查看控制台上的接收数据的日志显示的顺序也是和发送时的顺序一致。
标签:netty,异步,Netty,CompletableFuture,org,import,接收,public From: https://blog.51cto.com/u_13312531/8891662