1.异步发送API接口
public void send(final Message message, final AsyncSendCallback callback)
2.实现类分析
public void send(final Message message, final AsyncSendCallback callback) {
checkState();
send(message, transportConfig.getSendTimeout(), callback);
}
@Override
public void send(final Message message, final int timeout, final AsyncSendCallback callback) {
checkState();
if (message == null) {
return;
}
List<Message> messages = new ArrayList<Message>();
messages.add(message);
send(messages, timeout, callback);
}
@Override
public void send(final List<Message> messages, final int timeout, final AsyncSendCallback callback) throws JMQException {
checkState();
send(messages, timeout, true, callback);
}
3.调用核心send方法
private void send(final List<Message> messages, final int timeout, final boolean isAsync, final AsyncSendCallback callback) {
......
// 发送,出错重试
transport.async(putMessage, (int) sendTimeout, callback);
}
transport.async接口
void async(Command command, int timeout, CommandCallback callback)
@Override
public void async(final Command command, final int timeout, final CommandCallback callback) throws JMQException {
checkState();
client.async(channel, command, timeout, callback); // 传递了channel,此处的channel是netty包中的channel
}
看看具体怎么用netty中的channel与后端异步通信
@Override
public void async(final Channel channel, final Command command, final int timeout, CommandCallback callback) {
if (channel == null) {
throw new IllegalArgumentException("The argument channel must not be null");
}
if (command == null) {
throw new IllegalArgumentException("The argument command must not be null");
}
if (callback == null) {
throw new IllegalArgumentException("The argument callback must not be null");
}
int sendTimeout = timeout <= 0 ? config.getSendTimeout() : timeout;
// 获取信号量
acquireSemaphore(asyncSemaphore, command, sendTimeout);
// 发送请求
final ResponseFuture responseFuture =
new ResponseFuture(channel, command, sendTimeout, callback, asyncSemaphore, null);
futures.put(command.getRequestId(), responseFuture);
// 应答回来的时候或超时会自动释放command
// channel写数据往后端的同时,返回个future对象,future对象会自动用到注册的listener
channel.writeAndFlush(command).addListener(new ResponseListener(responseFuture, futures));
}
标签:int,send,callback,源码,mq,timeout,final,channel,sdk
From: https://www.cnblogs.com/PythonOrg/p/16837036.html