丢弃消息
这种场景生产环境应该不怎么用。
入死信队列
cloud:
stream:
bindings:
hking:
destination: hking
content-type: application/json
hking-input:
destination: hking
group: hking
content-type: application/json
rabbit:
bindings:
hking-input:
consumer:
auto-bind-dlq: true
重新入原队列
cloud:
stream:
bindings:
hking:
destination: hking
content-type: application/json
hking-input:
destination: hking
group: hking
content-type: application/json
consumer:
max-attempts: 1
rabbit:
bindings:
hking-input:
consumer:
republish-to-dlq: false
requeue-rejected: true
这样配置的话,消息消费失败,重回原队列,程序这边一直出错的话,会一直接收到这个消息。max-attempts=1就是关闭重试,republish-to-dlq=false是不往死信队列里面转发,在这种情况下,设置requeue-rejected=true,消息会重新回到他出来的消息队列。
应用记录消息日志
@StreamListener(value = HkingInput.QUEUE, condition = "headers['event']=='String'")
public void processMyMessage3(User2 message) {
throw new RuntimeException();
}
/**
* 消息消费失败的降级处理逻辑
*
* @param message
*/
@ServiceActivator(inputChannel = "<destination>.<group>.errors")
public void error(Message<?> message) {
System.out.println(message);
log.info("Message consumer failed, call fallback!");
}
重试和回退机制
Retry Template and retryBackoff。
cloud:
stream:
bindings:
hking:
destination: hking
content-type: application/json
hking-input:
destination: hking
group: hking
content-type: application/json
consumer:
max-attempts: 1
一条消息尝试被一个StreamListener注解的方法多处理几次。max-attempts设置为1就是关闭重试。重试可以设置重试的次数,回退机制可以设置重试的间隔策略。