首页 > 其他分享 >seata 客户端如何接收响应消息

seata 客户端如何接收响应消息

时间:2023-03-08 11:25:01浏览次数:30  
标签:rpcMessage seata getBody futures MessageFuture 接收 messageFuture 客户端

seata是使用CompletableFuture来处理响应结果的。seata单独封装了MessageFuture类,用来包裹CompletableFuture:

public class MessageFuture {
    private RpcMessage requestMessage;
    private long timeout;
    private long start = System.currentTimeMillis();
    private transient CompletableFuture<Object> origin = new CompletableFuture<>();
}

在AbstractNettyRemoting抽象类中,seata定义了一个futures变量,用来存放MessageFuture信息:

 protected final ConcurrentHashMap<Integer, MessageFuture> futures = new ConcurrentHashMap<>();

在AbstractNettyRemoting抽象类中,发送消息方法sendSyn会将创建MessageFuture实例,并将MessageFuture实例缓存futures中。

 MessageFuture messageFuture = new MessageFuture();
        messageFuture.setRequestMessage(rpcMessage);
        messageFuture.setTimeout(timeoutMillis);
        futures.put(rpcMessage.getId(), messageFuture);

        channelWritableCheck(channel, rpcMessage.getBody());

        String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
        doBeforeRpcHooks(remoteAddr, rpcMessage);

        // 发送消息
        channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
                if (messageFuture1 != null) {
                    messageFuture1.setResultMessage(future.cause());
                }
                destroyChannel(future.channel());
            }
        });

        try {
            // 获取响应结果,如果获取不到会一直等待,直到超时
            Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            doAfterRpcHooks(remoteAddr, rpcMessage, result);
            return result;
        } catch (Exception exx) {
            LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(), rpcMessage.getBody());
            if (exx instanceof TimeoutException) {
                throw (TimeoutException) exx;
            } else {
                throw new RuntimeException(exx);
            }
        }

客户端在注册Processor时,会通过ClientOnResponseProcessor类,获取futures.

ClientOnResponseProcessor onResponseProcessor = new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());

这样,客户端在接收到服务端的消息时,会将响应结果set进MessageFuture中:

MessageFuture messageFuture = futures.remove(rpcMessage.getId());
if (messageFuture != null) {
    // 设置响应结果,此时messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);获取结果等待结束
    messageFuture.setResultMessage(rpcMessage.getBody());
} else {
    if (rpcMessage.getBody() instanceof AbstractResultMessage) {
        if (transactionMessageHandler != null) {
            transactionMessageHandler.onResponse((AbstractResultMessage) rpcMessage.getBody(), null);
        }
    }
}

标签:rpcMessage,seata,getBody,futures,MessageFuture,接收,messageFuture,客户端
From: https://www.cnblogs.com/demon001/p/17191218.html

相关文章

  • JavaMail 邮件发送,有意思的附件名乱码 → 客户端正常,web端乱码
    开心一刻昨晚,媳妇很感伤的看着我媳妇:以后岁数大了,我要走你前面去了,你再找个老伴我:我不想找媳妇:你找一个,不用替我守着,以后你说你头疼发烧,也得有个给你......
  • 连接linux的远程客户端软件出现输入命令行卡的问题
    xshell输入慢的问题是由ssh的服务端在连接时会自动检测dns环境是否一致所导致的,这里将UseDNSyes改为UseDNSno即可具体操作如下: 打开sshd服务的配置文件vi/etc/ssh/s......
  • QDialog接收不到enter按键响应
    自定义了一个QDialog窗口,需要接收快捷键enter确认;一开始发现无法接收enter健,其他健可以;1.QDialog的构造函数设置为setFocusPolicy(Qt::StrongFocus);2.将Dialog上的QPu......
  • udp客户端 用不用 bind 的区别
    无连接的socket的客户端和服务端以及面向连接socket的服务端通过调用bind函数来配置本地信息。使用bind函数时,通过将my_addr.sin_port置为0,函数会自动为你选择一个未占用的......
  • 微服务之事务管理Seata
    Seata是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务,也是SpringCloudAlibaba提供的组件, Seata官方文档:为什么需要Sea......
  • Vineyard 论文被 SIGMOD'2023 接收,助力计算引擎之间高效数据交换
    Vineyard(CNCFsandbox项目)是脱胎于GraphScope底层存储、用于在复杂工作流中不同计算引擎之间进行高效数据交换的中间件,该工作的论文被数据库领域顶级学术会议SIGMOD......
  • Indy实现FTP客户端
    首先大家了解一下FTP的基本知识:FTP是一个标准协议,它是在计算机和网络之间交换文件的最简单的方法。FTP也是应用TCP/IP协议的应用协议标准。FTP通常于将作者的文件上传至......
  • 通过RestController里面的方法,改变UDP接收到的结构体里面的数据InputStruct
    我有一个java的springboot程序,里面包括一个UDP监听程序,监听来自第三方UDP发送程序。第三方UDP发送过来的是一个结构体,我在本地程序也建立一个结构体InputStruct来接收数据,......
  • android | 静态注册广播接收器的坑
    android|静态注册广播接收器的坑按书上写的并不能正常接收,查了一下是这个原因:反正就是要加上包名类名,有点离谱哦。具体如下: btn.setOnClickListener(newView.OnCli......
  • Canvas、客户端、表单
    Canvasvarcanvas=document.querySelector('.myCanvas');varwidth=canvas.width=window.innerWidth;varheight=canvas.height=window.innerHeight;滚动条......