rabbitmq整个消息投递的路径为:
produce---> rabbitmq broker---> exchange---> queue---> consumer
1.1 生产端可靠性
在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
*confirm 确认模式, 确认生产者的消息被发送到交换机
*return 退回模式,确认消息从交换机被发到队列中
开启确认模式:
1,yml配置文件设置 publisher-confirm-type: correlated 开启确认模式
2,使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("correlationData = " + correlationData); System.out.println("ack = " + ack); //false System.out.println("cause = " + cause); if (ack) { System.out.println("消息发送到交换机确认成功!"); } else { System.out.println("消息发送到交换机失败,接下来根据业务做处理,比如:重发"); } } });
开启确认模式:
设置ConnectionFactory的publisher-returns="true"开启退回模式。
使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue,失败后,如果设置了rabbitTemplate.setMandatory(true)参数, 则会将消息退回给producer。并执行回调函数returnedMessage。
rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("return执行了"); } }); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"confirm","test confirm"); }
RabbitMQ中也提供了事务机制,但是性能较差,一般不适用。
1.2 消费端可靠性consumer Ack
ack指acknowledge,表示消费端收到消息后的确认方式。
有三种确认方式:
1、自动确认,acknowledge = true
2、手动确认,acknowledge = manual
3、根据异常情况确认, acknowledge = auto 这种使用比较麻烦,先不做记录
其中自动确认消息是指,当消息一旦被consumer收到,则自动确认,并将相应么message从Rabbit MQ的缓存中移除。但是在实际业务中,很可能消息接收到,但是业务处理出现异常,那么消息就会丢失。如果配置了手动确认方式,则需要在业务处理成功后,掉从channal.basicAck(),手动签收,如果出现异常,则调用channal.basicNack()方法,让其自动重发消息。
一般使用手动确认的方式,步骤如下:
1、yml配置手动签收:listener:simple:acknowledge-mode:manual。
2、让监听器类实现ChannelAwareMessageListener。
3、如果消息成功处理,则调用channel.basicAck签收。
4、如果消息处理失败,则调用channel的basicNack()拒绝签收,broker会重新发送给consumer。
try { //int i=1/0; System.out.println("消息:" + msg); // 消费者手动确认消息消费; 参数1:消息id;参数2:是否批量确认 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { e.printStackTrace(); // 需求:消息消费失败了,希望把消息重投到队列中,再次进行消费,如果还是消费失败就拒绝消息 // 获取消息是否是重投的标记 Boolean redelivered = message.getMessageProperties().getRedelivered(); if (redelivered) { // 重投消息:可以拒绝不处理,false表示丢弃或者进入死信队列 channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); System.out.println("消息重投后消费消息失败,拒绝处理"); } else { // 未重投过:重投 channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true); System.out.println("消息消费异常,进行重投!"); } }
·
1.3 持久化
上述方法是为了保证消息在组件间传递不丢失,为了保证消息在组件中不丢失,需要对broker,queue,message进行持久化,设置durable为true
1.4 Broker集群
搭建broker集群,实现高可用
标签:确认,System,RabbitMQ,可靠,消息,println,投递,true,out From: https://www.cnblogs.com/tyleaf/p/17093846.html