首页 > 其他分享 >RabbitMQ

RabbitMQ

时间:2023-06-03 22:31:41浏览次数:45  
标签:reasonEntity 队列 spring rabbitmq 消息 RabbitMQ message

https://blog.csdn.net/qq_35387940/article/details/100514134

RabbitMQ 概简介 大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力。 消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中,知道接收者取回它。 image.png Producer:消息生产者,负责生产和发送消息到Broker; Broker:消息处理中心,负责消息存储、确认、重试等; Consumer:消息消费中心,负责从Broker中获取消息并处理。

  • 消息队列-特性

异步性:将耗时的同步任务通过发送消息的方式进行异步处理,减少等待时间。 松耦合:不同系统、服务之间可以通过消息队列进行通信,不用关心彼此的实现细节,数据格式一致。 分布式:为了防止消息堵塞,可以对消费者集群进行横向扩展,避免单点故障,同样队列本身也可以。 可靠性:将接收到的消息落盘,就算服务器重启或者发生故障,恢复之后也能重新加载

Message

消息,消息是不具名的,它由消息头和消息体组成 消息头,包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等

Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序

Exchange

交换器,将生产者消息路由给服务器中的队列 类型有direct(默认),fanout, topic, 和headers,具有不同转发策略

Queue

消息队列,保存消息直到发送给消费者

Binding

绑定,用于消息队列和交换器之间的关联

Connection

网络连接,比如一个TCP连接

Channel

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成的。因为对于操作系统来说建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。

Consumer

消息的消费者,表示一个从消息队列中取得消息的客户端应用程序

Virtual Host

虚拟主机,表示一批交换器、消息队列和相关对象。

vhost 是 AMQP 概念的基础,必须在连接时指定

RabbitMQ 默认的 vhost 是 /

Broker

消息队列服务器实体 image.png

RabbitMQ 安装 [docker]

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
直接启动 没有资源会去自动下载
docker update rabbitmq --restart=always 自动启动
http://192.168.56.10:15672/#/
账号 密码 guest

image.png

https://blog.csdn.net/J080624/article/details/80943312

RabbitMQ提供了四种Exchange模式

RabbitMQ提供了四种Exchange模式:fanout,direct,topic,header 。 header模式在实际使用中较少,这里只讨论前三种模式.

fanout 模式#

fanout 模式就是广播模式~,消息来了,会发给所有的队列~ image.png

测试广播模式: 先在交换机创建 fanout模式的交换机,命名为 my.fanout.exchange,然后再到队列创建多个队列,再到交换机绑定队列,fanout可以不设置路由key,因为这个是广播模式的,最后发消息测试。 image.png

Direct 模式#

Direct 模式就是指定队列模式, 消息来了,只发给指定的 Queue, 其他Queue 都收不到。 image.png

Topic 模式#

主题模式,注意这里的主题模式,和 ActivityMQ 里的不一样。 ActivityMQ 里的主题,更像是广播模式。 那么这里的主题模式是什么意思呢? 如图所示消息来源有: 美国新闻,美国天气,欧洲新闻,欧洲天气。 如果你想看 美国主题: 那么就会收到 美国新闻,美国天气。 如果你想看 新闻主题: 那么就会收到 美国新闻,欧洲新闻。 如果你想看 天气主题: 那么就会收到 美国天气,欧洲天气。 如果你想看 欧洲主题: 那么就会收到 欧洲新闻,欧洲天气。

这样就可以灵活搭配~ image.png

创建交换机

image.png

创建队列

image.png

绑定在一起

image.png

image.png

发消息

image.png

看消息

image.png

Nack message requeue true 继续放
Nack message requeue false该方式每次获取一条消息,获取后从队列中移除

所有队列都能收到

image.png image.png

Spring引入RabbitMQ

1.引入依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.application.properties 配置
spring.rabbitmq.host=192.168.56.10
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
账号密码默认不用配置了

3.开启功能
@EnableRabbit
@SpringBootApplication
public class GulimallOrderApplication

给配置文件中
引入amqp场景;RabbitAutoConfiguration就会自动生效
给容器中自动配置了
RabbitTempLate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate 

测试RabbitMQ

@SpringBootTest
class GulimallOrderApplicationTests {
    @Autowired
    AmqpAdmin amqpAdmin;

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    void creatExchange() {
        //创建交换机
        /**String name,
         * boolean durable,
         * boolean autoDelete,
         * Map<String, Object> arguments
         */
        DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false);
        amqpAdmin.declareExchange(directExchange);
    }

    @Test
    void creatQueue() {
        /**
         *创建队列
         * String name, 队列名字
         * boolean durable,  是否持久化
         * boolean exclusive,  是否排ta
         * boolean autoDelete, 是否自动删除
         * @Nullable Map<String, Object> arguments
         */
        Queue queue = new Queue("hello-java-queue", true, false, false);
        amqpAdmin.declareQueue(queue);
    }

    @Test
    public void crateBinding() {
        /**
         * 绑定
         * String destination, 目的地
         * Binding.DestinationType destinationType, 目的地类型-两种不同类型
         * String exchange, 交换机
         * String routingKey, 路由键
         * @Nullable Map<String, Object> arguments 自定义参数
         *
         */
        Binding binding = new Binding("hello-java-queue",
                Binding.DestinationType.QUEUE, "hello-java-exchange", "hello.java", null);
        amqpAdmin.declareBinding(binding);
    }

    /**
     * 发送消息
     */
    @Test
    public void sendMessageTest() {
        OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
        reasonEntity.setId(1L);
        reasonEntity.setCreateTime(new Date());
        reasonEntity.setName("reason");
        reasonEntity.setStatus(1);
        reasonEntity.setSort(2);
        String msg = "Hello World";
        //1、发送消息,如果发送的消息是个对象,会使用序列化机制,将对象写出去,对象必须实现Serializable接口

        //2、发送的对象类型的消息,可以是一个json
        rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",
                reasonEntity);
        log.info("消息发送完成:{}",reasonEntity);
    }

}

配置发送消息转换 json

@Configuration
public class MyRabbitConfig {
    @Bean
    public MessageConverter  messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

//接收消息
@RabbitHandler //标注在方法上(重载区分不同的消息)
//发送的是Message 是一个消息详细信息 头+体  并且可以指定类型实体类
public void reciveMessagew(Message message, OrderReturnReasonEntity content) {
    //Body:'{"id":1,"name":"reason","sort":2,"status":1,"createTime":1603286841263}
    byte[] body = message.getBody();//得到消息体内容
    MessageProperties messageProperties = message.getMessageProperties();//消息头属性信息

    //message.getClass()) 获取类型
    System.out.println("接受消息" + message + content + message.getClass());
}
@RabbitListener(queues = {"hello-java-queue"}) //类+方法上 监听哪些队列
@RabbitHandler //标注在方法上(重载区分不同的消息)

RabbitMQ消息确认机制-可靠抵达

保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认机制。

publisher confirmCallback 确认模式 publisher returnCallback 未投递到 queue 退回模式 consumer ack机制 image.png

# 开启发送端消息抵达Broker确认
spring.rabbitmq.publisher-confirms=true
# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
@GetMapping("/sendMq")
public String sendMq(@RequestParam(value = "num",defaultValue = "10") Integer num) {
    for (int i = 0; i < num; i++) {
        if (i % 2 == 0) {
            OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
            reasonEntity.setId(1L);
            reasonEntity.setCreateTime(new Date());
            reasonEntity.setName("reason" + i);
            //CorrelationData 指定一个UUid
            rabbitTemplate.convertAndSend("hello-java-exchange", "hello22.java",
                    reasonEntity,new CorrelationData(UUID.randomUUID().toString()));
        } else {
            OrderEntity orderEntity = new OrderEntity();
            orderEntity.setOrderSn(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java",
                    orderEntity,new CorrelationData(UUID.randomUUID().toString()));
        }
    }
    return "ok";
}

消息抵达服务器 Broker 服务器收到消息

# 开启发送端消息抵达Broker确认
spring.rabbitmq.publisher-confirms=true (废弃)
spring.rabbitmq.publisher-confirm-type=correlated
/**
 * 定制RabbitTemplate
 */
@PostConstruct //MyRabbitConfig构造器创建完成 执行调用这个方法
public void initRabbitTemplate(){
    //设置一个确认回调机制
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        /*
         * @param correlationData 消息唯一关联id 消息唯一id
         * @param b 消息是否成功收到
         * @param s 失败原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean b, String s) {
        }
    });
}

消息正确抵达队列 消息回调

#开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
# 只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true

/**
 * 只要消息没有投递给指定的队列,就触发这个失败回调
 * message:投递失败的消息详细信息
 * replyCode:回复的状态码
 * replyText:回复的文本内容
 * exchange:当时这个消息发给哪个交换机
 * routingKey:当时这个消息用哪个路邮键
 */
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
    System.out.println("Fail Message[" + message + "]==>replyCode[" + replyCode + "]" +
            "==>replyText[" + replyText + "]==>exchange[" + exchange + "]==>routingKey[" + routingKey + "]");
});

消费端确认 1、消费者获取到消息,成功处理,可以回复Ack给Broker

basic.ack 用于肯定确认;broker将移除此消息 basic.nack 用于否定确认;可以指定broker是否丢弃此消息,可以批量 basic.reject 用于否定确认;同上,但不能批量 2、默认,消息被消费者收到,就会从broker的queue中移除 3、queue无消费者,消息依然会被存储,直到消费者消费 4、消费者收到消息,默认会自动ack。但是如果无法确定此消息是否被处理完成,或者成功处理。我们可以开启手动ack模式。

消息处理成功,ack(),接受下一个消息,此消息broker就会移除。 消息处理失败,nack()/reject(), 重新发送给其他人处理,或者容错处理后ack。 消息一直没有调用ack/nack方法,broker认为此消息正在被处理,不会投递给别人,此时客户端断开,消息不会被broker移除,会投递给别人。

  • 修改 application.properties 配置文件:
# 开启发送端消息抵达Broker确认
spring.rabbitmq.publisher-confirms=true
# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
# 只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
# 手动ack消息,不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  • 测试
@RabbitHandler
    public void revieveMessage(Message message,
        OrderReturnReasonEntity content, Channel channel) throws IOException {

        System.out.println("接收到消息..."+content);

        //拿到主体内容
        byte[] body = message.getBody();

        //拿到的消息头属性信息
        MessageProperties messageProperties = message.getMessageProperties();
        System.out.println("接受到的消息...内容" + message + "===内容:" + content);

        // Thread.sleep(3000);
        // Channel内按顺序自增
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("deliveryTag===>" + deliveryTag);

        try{
            // 签收货物,非批量模式
            channel.basicAck(deliveryTag, false);
        }catch (Exception e){
            // 网络中断(突然)
        }

    }

标签:reasonEntity,队列,spring,rabbitmq,消息,RabbitMQ,message
From: https://blog.51cto.com/u_15993308/6409111

相关文章

  • php rabbitmq队列的几种管理方案
     这里就懒得记录了,直接放上一篇还不错的知乎博主的博客吧。点击前往  ......
  • Rabbitmq在linux服务器的安装步骤
    Linux系统:CentOS7.x(如果是CentOS8.x的话,需要修改下面两个环境版本号中的el7为el8)Erlang:erlang-22.3.4.12-1.el7.x86_64.rpmRabbitMQ:rabbitmq-server-3.8.13-1.el7.noarch.rpm 1安装erlang Linux系统:CentOS7.x(如果是CentOS8.x的话,需要修改下面两个环境版本号中的el7为e......
  • springboot项目rabbitmq消费者消费json格式的String,出现无限循环抛出No method found
    转:springboot项目rabbitmq消费者消费json格式的String,出现无限循环抛出Nomethodfoundforclass[B     ......
  • 深入学习RabbitMQ五种模式(一)
    1.安装erlang下载otp_win64_25.3.exehttps://www.erlang.org/downloadserlang安装完成,需要配置erlang环境变量ERLANG_HOME=E:\software\ErlangOTPPATH=%PATH%;%ERLANG_HOME%\bin;2.安装RabbitMQ下载rabbitmq-server-3.11.13.exehttps://www.rabbitmq.com/download.html进入安装......
  • windows10环境下安装RabbitMQ以及延时插件(图文)
    安装转载:https://www.cnblogs.com/saryli/p/9729591.html插件转载:https://blog.csdn.net/nbdclw/article/details/107441772安装及配置环境第一步:下载并安装erlang原因:RabbitMQ服务端代码是使用并发式语言Erlang编写的,安装RabbitMQ的前提是安装Erlang。下载地址:http://ww......
  • RabbitMq镜像策略模式
    镜像策略ha-modeha-params说明exactlycount集群中队列副本的数量(主队列加上镜像)。count值为1表示一个副本:只有主节点。如果主节点不可用,则其行为取决于队列是否持久化。count值为2表示两个副本:一个队列主队列和一个队列镜像。换句话说......
  • RabbitMQ - 使用amqp库连接RabbitMQ(实例使用)
    1、发送端步骤分解如下:(1)建立连接conn,err:=amqp.Dial("amqp://admin:[email protected]:5672/")(2)打开channel这里的channel是AMQP里的概念,可以理解为多路复用的一个tcp长连接。(3)声明一个队列q,err:=ch.QueueDeclare(...)(4)创建消息msg:=amqp.Publishing{...}(5)发布......
  • RabbitMQ在Windows下设置服务启动
    1.管理员模式运行  cmd 2.进入RabbitMQ安装目录下的sbin目录   输入命令: cdrabbitMQ的sbin路径,进入sbin目录输入命令:rabbitmq-service.batinstall进入服务,开启rabbitMQ服务 ......
  • go-RabbitMQ
    erlang安装编译依赖:yuminstallmakegccgcc-c++build-essentialopensslopenssl-develunixODBCunixODBC-develkernel-develm4ncurses-devel解压:tar-zxvf创建存放环境目录:mkdir/opt/rabbitMq/erlang进入erlang解压目录执行命令:./configure--prefix=/opt/rabbit......
  • RabbitMQ简单介绍
    RabbitMQ是一款开源的消息中间件具备的特点1.高可靠,易扩展,高可用2.支持大多数的编程语言客户端3.遵循AMQP协议,也支持MQTT协议,自身采用Erlang语法开发RabbitMQ整体逻辑结构大体可以由三部分组成:生产者,Broker,消费者 而消息者就是从指定的消息队列中进行消息的消费交换器需......