首页 > 编程语言 >异步发送mq消息sdk源码分析

异步发送mq消息sdk源码分析

时间:2022-10-28 18:38:53浏览次数:51  
标签:int send callback 源码 mq timeout final channel sdk

1.异步发送API接口

 public void send(final Message message, final AsyncSendCallback callback)

2.实现类分析

public void send(final Message message, final AsyncSendCallback callback) {
        checkState();
        send(message, transportConfig.getSendTimeout(), callback);
    }
	
	
@Override
public void send(final Message message, final int timeout, final AsyncSendCallback callback)  {
	checkState();
	if (message == null) {
		return;
	}
	List<Message> messages = new ArrayList<Message>();
	messages.add(message);
	send(messages, timeout, callback);
}	
	
@Override
public void send(final List<Message> messages, final int timeout, final AsyncSendCallback callback) throws JMQException {
	checkState();
	send(messages, timeout, true, callback);
}	

3.调用核心send方法

private void send(final List<Message> messages, final int timeout, final boolean isAsync, final AsyncSendCallback callback)  {
       ......
        // 发送,出错重试
       transport.async(putMessage, (int) sendTimeout, callback);
    }

transport.async接口

void async(Command command, int timeout, CommandCallback callback)
@Override
    public void async(final Command command, final int timeout, final CommandCallback callback) throws JMQException {
        checkState();
        client.async(channel, command, timeout, callback); // 传递了channel,此处的channel是netty包中的channel
    }

看看具体怎么用netty中的channel与后端异步通信

@Override
    public void async(final Channel channel, final Command command, final int timeout, CommandCallback callback)  {
        if (channel == null) {
            throw new IllegalArgumentException("The argument channel must not be null");
        }

        if (command == null) {
            throw new IllegalArgumentException("The argument command must not be null");
        }

        if (callback == null) {
            throw new IllegalArgumentException("The argument callback must not be null");
        }

        int sendTimeout = timeout <= 0 ? config.getSendTimeout() : timeout;
        // 获取信号量
        acquireSemaphore(asyncSemaphore, command, sendTimeout);

        // 发送请求
        final ResponseFuture responseFuture =
                new ResponseFuture(channel, command, sendTimeout, callback, asyncSemaphore, null);
        futures.put(command.getRequestId(), responseFuture);
        // 应答回来的时候或超时会自动释放command
		// channel写数据往后端的同时,返回个future对象,future对象会自动用到注册的listener
        channel.writeAndFlush(command).addListener(new ResponseListener(responseFuture, futures));
    }

标签:int,send,callback,源码,mq,timeout,final,channel,sdk
From: https://www.cnblogs.com/PythonOrg/p/16837036.html

相关文章

  • 工业网关BL110实现西门子S7-400 PLC 接入金鸽MQTT云平台
    LAN接口的配置COM口采集西门子S7-400PLC的配置工业智能网关BL110一共有一个LAN接口,一个WAN接口,可以通过LAN接口采集数据,通过WAN接口接入局域网,设置过程不一样,WAN接口可......
  • 工业网关BL110实现西门子S7-1200 PLC接入MQTT Client One云平台
    LAN接口的配置COM口采集西门子S7-1200PLC的配置工业智能网关BL110一共有一个LAN接口,一个WAN接口,可以通过LAN接口采集数据,通过WAN接口接入局域网,设置过程不一样,WAN接口......
  • AI视频融合平台EasyCVR现已支持华为宇视等四种SDK接入
    EasyCVR视频融合平台支持海量视频的汇聚与管理、转码与分发、鉴权管理、智能分析等,平台融合性强、兼容度高,可支持多协议、多设备接入,包括国标GB28181、RTMP、RTSP/Onvif、......
  • EasyCVR使用大华SDK接入时录像显示失败是什么原因?该如何解决?
    EasyCVR视频融合云服务支持多协议、多类型设备的接入,包括国标GB/T28181、RTMP、RTSP/Onvif协议,以及厂家私有协议,如:海康SDK、大华SDK、海康Ehome等。平台可对前端接入的设备......
  • G. Periodic RMQ Problem
    G.PeriodicRMQProblem题目大意给你一个序列\(a\)让你支持\(1\)\(l\)\(r\)\(x\)区间赋值\(2\)\(l\)\(r\)询问区间最小值我们觉得这个问题太水了,所以我们......
  • java-rabbitmq-官网实例02
    java-rabbitmq-官网实例02描述:  1.定义持久化队列,发送持久化消息,消息接受者需要手动应答,MQ才会删除队列中的消息 2.使用channel.basicQos......
  • java-rabbitmq-官网实例01
    java-rabbitmq-官网实例01描述:最简单实例,使用非持久化队列,生产者发布消息,MQ将消息推送给消费者消费,之后MQ在队列中删除该消息依次运行:D1_Send.......
  • java-rabbitmq-交换机介绍
    java-rabbitmq-交换机介绍RabbitMQ的四种交换机直连交换机:Directexchange扇出交换机:Fanoutexchange主题交换机:Topicexchange首部交换机:H......
  • 直播网站程序源码,获取本地上传图片的尺寸(宽高)
    直播网站程序源码,获取本地上传图片的尺寸(宽高) <inputid="file"@change="uploadImegs($event,3)"type="file"accept="image/*"/>uploadImegs(e){  let_file......
  • 基于百度图像识别SDK开发动植物识别
    1.登录百度智能云官网(没有要先注册账号)2.在官网下载javasdk压缩工具包3.将下载的aip-java-sdk-version.zip解压。  4.在idea新建工程,并添加lib文件夹,把jar包导入。......