首页 > 其他分享 >RabbitMQ高级之消息可靠性投递

RabbitMQ高级之消息可靠性投递

时间:2023-06-20 14:22:33浏览次数:50  
标签:www 可靠性 http springframework 投递 RabbitMQ import org schema

什么是可靠性投递?

生产者:作为消息发送方希望杜绝任何消息丢失或者投递失败场景。

RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

分别是:

  • confirm 确认模式
  • return 退回模式

消息投递路线

如下所示是生产者到消费者的模型:

RabbitMQ的整个消息投递的路径

Producer--->RabbitMQ Broker(Server)--->Exchange--->Queue--->Consumer

  • 消息从 producerexchange 则会返回一个 confirmCallback
  • 消息从 exchange 到 queue 投递失败则会返回一个 returnCallback

我们将利用这两个 callback 控制消息的可靠性投递。

确认模式

消息从 producerexchange 则会返回一个 confirmCallback ,不论消息是否成功到达exchange ,回调都会执行,只不过返回的bool类型的值是true or false的区别。

代码实现很简单,两步即可

步骤

(1)在XML配置文件中设置 ConnectionFactory 开启 publisher-confirms="true"

(2)在 rabbitTemplate 定义 ConfirmCallBack() 回调函数

代码实现 

spring-rabbitmq-producer.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xmlns:context="http://www.springframework.org/schema/context"
  5. xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  6. xmlns:rabb="http://www.springframework.org/schema/rabbit"
  7. xsi:schemaLocation="http://www.springframework.org/schema/beans
  8. http://www.springframework.org/schema/beans/spring-beans.xsd
  9. http://www.springframework.org/schema/context
  10. https://www.springframework.org/schema/context/spring-context.xsd
  11. http://www.springframework.org/schema/rabbit
  12. http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
  13. <!-- 1.加载配置文件-->
  14. <context:property-placeholder location="classpath:rabbitmq.properties"/>
  15. <!-- 2.定义rabbitmq connectionFactory -->
  16. <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
  17. port="${rabbitmq.port}"
  18. username="${rabbitmq.username}"
  19. password="${rabbitmq.password}"
  20. virtual-host="${rabbitmq.virtual-host}"
  21. publisher-confirms="true"/>
  22. <!-- publisher-confirms="true" 确认模式开启!!! -->
  23. <!-- 3.定义管理交换机、队列-->
  24. <rabbit:admin connection-factory="connectionFactory"/>
  25. <!-- 消息的可靠性投递 -->
  26. <rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue>
  27. <rabbit:direct-exchange name="test_exchange_confirm">
  28. <rabbit:bindings>
  29. <rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding>
  30. </rabbit:bindings>
  31. </rabbit:direct-exchange>
  32. <!-- 定义rabbitTemplate对象操作可以在代码中方便发送消息-->
  33. <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
  34. </beans>

ProducerTest

  1. package com.Harmony;
  2. import org.junit.Test;
  3. import org.junit.runner.RunWith;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.connection.CorrelationData;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.test.context.ContextConfiguration;
  9. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
  10. @RunWith(SpringJUnit4ClassRunner.class)
  11. @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
  12. public class ProducerTest {
  13. /**
  14. * 确认模式
  15. * 开启:
  16. * 1. ConnectionFactory 中开启 publisher-confirms="true"
  17. * 2. 在 rabbitTemplate 定义 ConfirmCallBack() 回调函数
  18. */
  19. @Autowired
  20. private RabbitTemplate rabbitTemplate;
  21. // 2. 定义回调
  22. // 一、确认模式
  23. @Test
  24. public void testConfirm() throws InterruptedException {
  25. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  26. /**
  27. *
  28. * @param correlationData: 配置信息,在convertAndSend()中的重载方法有该参数
  29. * @param ack: 代表exchange交换机是否收到了信息,true为成功,false为失败
  30. * @param cause: 失败原因; 如果成功为null
  31. */
  32. @Override
  33. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  34. System.out.println(ack);
  35. System.out.println("confirm方法被执行了......");
  36. if (ack) {
  37. System.out.println("接收成功:" + cause);
  38. } else {
  39. System.out.println("接收失败:" + cause);
  40. // 以后可能会做一些处理
  41. }
  42. }
  43. });
  44. // 3. 发送消息
  45. rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message confirm......");
  46. // 由于在测试种,执行结束线程直接没有了!
  47. Thread.sleep(2000);
  48. }
  49. }

退回模式

当消息从exchange路由到 queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退 回给producer。并执行回调函数returnedMessage。

步骤

(1)开启回退模式 publisher-returns="true"

(2)设置ReturnCallback

(3)设置Exchange处理消息的模式:

        如果消息没有路由到Queue,则会丢弃消息(默认)

        如果消息没有路由到Queue,返回给消息发送方

代码实现

spring-rabbitmq-producer.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xmlns:context="http://www.springframework.org/schema/context"
  5. xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  6. xmlns:rabb="http://www.springframework.org/schema/rabbit"
  7. xsi:schemaLocation="http://www.springframework.org/schema/beans
  8. http://www.springframework.org/schema/beans/spring-beans.xsd
  9. http://www.springframework.org/schema/context
  10. https://www.springframework.org/schema/context/spring-context.xsd
  11. http://www.springframework.org/schema/rabbit
  12. http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
  13. <!-- 1.加载配置文件-->
  14. <context:property-placeholder location="classpath:rabbitmq.properties"/>
  15. <!-- 2.定义rabbitmq connectionFactory -->
  16. <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
  17. port="${rabbitmq.port}"
  18. username="${rabbitmq.username}"
  19. password="${rabbitmq.password}"
  20. virtual-host="${rabbitmq.virtual-host}"
  21. publisher-returns="true" />
  22. <!-- publisher-returns="true" 回退模式开启!!! -->
  23. <!-- 3.定义管理交换机、队列-->
  24. <rabbit:admin connection-factory="connectionFactory"/>
  25. <!-- 消息的可靠性投递 -->
  26. <rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue>
  27. <rabbit:direct-exchange name="test_exchange_confirm">
  28. <rabbit:bindings>
  29. <rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding>
  30. </rabbit:bindings>
  31. </rabbit:direct-exchange>
  32. <!-- 定义rabbitTemplate对象操作可以在代码中方便发送消息-->
  33. <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
  34. </beans>

ProducerTest

  1. package com.Harmony;
  2. import org.junit.Test;
  3. import org.junit.runner.RunWith;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.connection.CorrelationData;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.test.context.ContextConfiguration;
  9. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
  10. @RunWith(SpringJUnit4ClassRunner.class)
  11. @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
  12. public class ProducerTest {
  13. @Autowired
  14. private RabbitTemplate rabbitTemplate;
  15. @Test
  16. public void testReturn() throws InterruptedException {
  17. // 设置交换机处理失败消息的模式
  18. // 生产者可以拿到之前发送失败的消息
  19. rabbitTemplate.setMandatory(true);
  20. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
  21. /**
  22. *
  23. * @param message: 消息对象
  24. * @param replyCode: 错误码
  25. * @param replyText: 错误信息
  26. * @param exchange: 交换机
  27. * @param routingKey: 路由键
  28. */
  29. @Override
  30. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  31. System.out.println("return 执行了...");
  32. System.out.println(message);
  33. System.out.println(replyCode);
  34. System.out.println(replyText);
  35. System.out.println(exchange);
  36. System.out.println(routingKey);
  37. }
  38. });
  39. // 3. 发送消息
  40. rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message confirm......");
  41. Thread.sleep(2000);
  42. }
  43. }

总述

➢ 设置ConnectionFactory的publisher-confirms="true" 开启确认模式。

➢ 使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回 调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发 送失败,需要处理。

➢ 设置ConnectionFactory的publisher-returns="true" 开启退回模式。

➢ 使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到 queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退 回给producer。并执行回调函数returnedMessage。

➢ 在RabbitMQ中也提供了事务机制,但是性能较差,此处不做讲解。 使用channel下列方法,完成事务控制:

  • txSelect(), 用于将当前channel设置成transaction模式
  • txCommit(),用于提交事务
  • txRollback(),用于回滚事务

文章知识点与官方知识档案匹配,可进一步学习相关知识Java技能树首页概览120777 人正在系统学习中

标签:www,可靠性,http,springframework,投递,RabbitMQ,import,org,schema
From: https://www.cnblogs.com/weidaijie/p/17493548.html

相关文章

  • PDD200A101用性以及实时性、可互操作性、可靠性、抗干扰性
    PDD200A101用性以及实时性、可互操作性、可靠性、抗干扰性PDD200A101用性以及实时性、可互操作性、可靠性、抗干扰性  工业以太网是应用于工业控制领域的以太网技术,在技术上与商用以太网(即IEEE802.3标准)兼容,但是实际产品和应用却又完全不同。这主要表现普通商用以太网的产......
  • 记录 Windows 下 RabbitMQ 的部署
    1、软件下载安装前需要安装Erlang语言环境:https://www.erlang.org/downloadsRabbitMQ下载:https://www.rabbitmq.com/download.html2、运行安装包,一路下一步3、打开开始菜单,找到RabbitMQCommandPrompt(sbindir)运行......
  • RabbitMQ快速使用代码手册
    本篇博客的内容为RabbitMQ在开发过程中的快速上手使用,侧重于代码部分,几乎没有相关概念的介绍,相关概念请参考以下csdn博客,两篇都是我找的精华帖,供大家学习。本篇博客也持续更新~~~内容代码部分由于word转md格式有些问题,可以直接查看我的有道云笔记,链接:https://note.youdao.com/s/A......
  • SpringBoot快速整合RabbitMq小案例
    对于一个直接创建的springBoot项目工程来说,可以按照以下步骤使用rabbitmq添加依赖:添加rabbitMQ的依赖。<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>配置连接:在配置文件中配置虚拟主......
  • CentOS下安装Erlang和RabbitMQ
    (1).源码安装ErlangRabbitMQ依赖于Erlang,所以在安装RabbitMQ之前,需要先安装Erlang。注意:Erlang和RabbitMQ之间有版本的依赖关系,详见:https://www.rabbitmq.com/which-erlang.html。我这里使用版本是Erlang24.2+RabbitMQ3.9.11。1)首先,安装编译工具和开发包(依赖包)......
  • springboot rabbitmq配置
    YMLrabbitmq:host:xxx.xxx.xxx.xxxport:5672virtual-host:devusername:xxxpassword:xxxpublisher-confirm-type:correlatedpublisher-returns:truelistener:direct:acknowledge-mode:autosimple:......
  • windows下安装rabbitmq
    1、Erlang的安装因为RabbitMQ是用Erlang语言编写的,所以要安装RabbitMQ先要安装Erlang。下载地址:http://www.erlang.org/downloads下载完成后就双击一直next后安装[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sosn4mCm-1684822676988)(C:\Users\Adm......
  • RabbitMQ如何保证消息可靠性?
    RabbitMQ如何保证消息可靠性?   为什么会提到MQ的消息可靠性?   线上环境中,最近偶尔出现了这样的情况:业务执行过程中突然被中断了,后面的不再执行。问题定位到了MQ生产消息的位置   一、如何保证消息的可靠性?  一个消息会经历四个节点,只有保证这四个节点的可......
  • rabbitMq
    rabbitMq一、安装安装准备工具1.Eralng,底下连接已提供otp_win64_20.2.exe存放地址:otp_win64_25.3.2.exe2.rabbitmq,底下链接已提供rabbitmq-server-3.7.4.exe链接:rabbitmq-server-3.11.16.exe————————————————安装第一步:安装otp_win64_20.2.exe,一......
  • 黑马rabbitmq
    消息中间件面试题-参考回答面试官:RabbitMQ-如何保证消息不丢失候选人:嗯!我们当时MYSQL和Redis的数据双写一致性就是采用RabbitMQ实现同步的,这里面就要求了消息的高可用性,我们要保证消息的不丢失。主要从三个层面考虑第一个是开启生产者确认机制,确保生产者的消息能到达队列,如果......