疑问:
- 在写 demo 的时候,如果 client 被强制中断,消息来不及处理,这时候消息又出队列了,这样不是会产生严重的问题嘛?
- 一个会话中,可以同时处理一批数据,如果一条失败了,之前的也要求回滚的话,要怎么处理?
- 获取一个消息之后,发现程序无法处理这条消息,想要退还回去,该怎么办?
方案:
这就涉及到队列的 ACK 机制了,这个机制是 MQ 产品必须提供的一个功能。
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* @author Mr.css
* @version 2020-11-12 19:31
*/
public class ExchangeReceive {
private static final String QUEUE_NAME = "queue_name";
public static void main(String[] args) {
try {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) {
//查看消息主体内容
String message = new String(body, StandardCharsets.UTF_8);
System.out.println("[receive]:" + message);
// 手动 ACK 消息
try {
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
}
}
};
// 设置为自动 ACK
channel.basicConsume(QUEUE_NAME, false, consumer);
} catch (IOException | ShutdownSignalException | ConsumerCancelledException | TimeoutException e) {
e.printStackTrace();
}
}
}
消息确认
channel.basicAck(deliveryTag, multiple)
- deliveryTag:消息交付的 tag,取自 Envelope.deliveryTag;
- multiple:置为 true 的时候,表示截止至目前的所有 tag(包括当前 tag);false 表示只包含当前的 tag。
消息拒绝
与 basicAck() 功能相反的函数,不想处理的消息,可以拒绝退回。
消息拒绝的两个函数的参数是一样的,basicNack可以一次性拒绝多个。
requeue:如果被拒绝的消息应该重新排队,而不是丢弃/死信,则为 true
channel.basicNack(deliveryTag, multiple, requeue);
channel.basicReject(deliveryTag, requeue);
扩展
消息拒绝,又会引出死信队列的概念,以后再去研究吧。
标签:false,String,ack,rabbitmq,deliveryTag,消息,import,机制,channel From: https://www.cnblogs.com/chenss15060100790/p/18517794