首页 > 其他分享 >RabbitMQ:一篇学会RabbitMQ

RabbitMQ:一篇学会RabbitMQ

时间:2024-03-09 16:35:29浏览次数:20  
标签:false 一篇 队列 学会 System RabbitMQ 消息 channel String

MQ概述

MQ,Message Queue,是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生产、存储、消费全过程API的软件系统。消息即数据。一般消息的体量不会很大。
知道是消息队列,他的规律是先进先出就行,光听概念没啥体验的。

MQ的作用

MQ的最重要的3个作用如下:

限流削峰

MQ可以将系统的超量请求暂存其中,以便系统后期可以慢慢进行处理,从而避免了请求的丢失或系统被压垮。
image
说个简单的比喻:全班50个同学同时找老师问问题,老师肯定一时顶不住啊。这时候就需要有个中间人,比如说班长,大家的问题都可以发到班长那里,后面老师可根据自身的情况从班长那处理同学们处理的问题

异步解耦

上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量与并发度,且系统耦合度太高。而异步调用则会解决这些问题。所以两层之间若要实现由同步到异步的转化,一般性做法就是,在这两层间添加一个MQ层。
image
异步解耦和限流削峰不一样的是请求的人群;除了用户发送的请求可以先储存在MQ上,系统间的调用也涉及请求发送,也理应可用mq进行中间储存。
举个简单的例子:假设A是用户,B是服务员,C是大厨。A发起请求要一份菜,B请求C去做菜,但由于C还做其他用户的菜,不能马上做A的菜,这时候A和B在干等着。这样肯定是不行,所以得有个中间人D,B告诉D做啥菜,D记起来,C到时候根据D的记录做就完了呗

数据收集

分布式系统会产生海量级数据流,如:业务日志、监控数据、用户行为等。针对这些数据流进行实时或批量采集汇总,然后对这些数据流进行大数据分析,这是当前互联网平台的必备技术。通过MQ完成此类数据收集是最好的选择。
这时候的MQ的角色就相当于 数据库的功能了。

RabbitMQ简介

AMQP协议

AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP规范发布。类比HTTP。

而RabbitMQ正是遵循了AMQP协议来设计的

RabbitMQ架构图

简单架构图

image


详细架构图

image
1. Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
2. Virtual host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost创建exchange / queue等
3. Connection: publisher / consumer和 broker之间的TCP连接;Connection里包含很多的小连接Channnel
4. Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销
5. Exchange: message到达 broker的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue中去。常用的类型有: direct (point-to-poInt),toplie(puIsI7susSey a
6. Queue:消息最终被送到这里等待consumer 取走
7. Binding: exchange和queue之间的虚拟连接,binding 中可以包含routing key。Binding 信息被保存到exchange 中的查询表中,用于message 的分发依据

RabbitMQ工作模式

RabbitMQ提供了6种工作模式:简单模式、work queues、Publish/Subscribe发布与订阅模式、Routing路由模式、Topics主题模式、RPC远程调用模式(远程调用,不太算MQ;暂不作介绍)。

RabbitMQ管理控制台介绍

image
简单说一下rabbitmq端口情况:
5672端口:用于单机版的rabbitClient连接
25672端口:用于集群版的rabbitClient连接
15672端口:是rabbitmq图形化界面管理端所占的端口

RabbitMQ入门-HelloWold代码(简单队列模式)

本案例是一生产者和一消费者情况。

生产者

package com.atguigu.one.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class HWProducer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        // 创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");//默认localhost
        factory.setPort(5672);//默认是5672
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        //channel 实现了自动 close 接口 自动关闭 不需要显示关闭
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        /**
         * 定义一个队列:对应rabbitmqServer的对应,如果没有,则在rabbitmqServer上创建
         * 1. 队列名称
         * 2. 队列里面的消息是否持久化(是否保存在硬盘) 默认消息存储在内存中
         * 3. 该队列是否只供一个消费者进行消费(是否进行共享) true:可以多个消费者消费 false:只一个消费者消费
         * 4. 是否自动删除(最后一个消费者断开连接以后,该队列是否自动删除) true 自动删除 false:不自动删除
         * 5. 其他参数
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "hello world";
        /**
         * 发送一个消息
         * 1. 发送到哪个交换机;空字符串代表使用默认的交换机
         * 2. 路由的 key 是哪个
         * 3. 其他的参数信息
         * 4. 发送消息的消息体
         */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" 消息发送完毕");

        channel.close();
        connection.close();
    }
}

消费者

package com.atguigu.one.consumer;

import com.rabbitmq.client.*;

public class HWConsumer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        // 创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");//默认localhost
        factory.setPort(5672);//默认是5672
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        //channel 实现了自动 close 接口 自动关闭 不需要显示关闭
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        /**
         * 定义一个队列:对应rabbitmqServer的对应,如果没有,则在rabbitmqServer上创建
         * 1. 队列名称
         * 2. 队列里面的消息是否持久化(是否保存在硬盘) 默认消息存储在内存中
         * 3. 该队列是否只供一个消费者进行消费(是否进行共享) true:可以多个消费者消费 false:只一个消费者消费
         * 4. 是否自动删除(最后一个消费者断开连接以后,该队列是否自动删除) true 自动删除 false:不自动删除
         * 5. 其他参数
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 推送的消息如何进行消费的接口回调
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println(message);
        };
        // 取消消费的一个回调接口 如在消费的时候队列被删除掉了DeliverCallback
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(" 消息消费被中断");
        };
        /**
         * 消费者消费消息
         * 1. 消费哪个队列
         * 2. 消费成功之后是否要自动应答 true 自动应答 false 手动应答
         * 3. 消费者收到消息后的回调函数
         * 4. 消费者取消或中断消费后的回调函数
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

RabbitMQ入门-工作队列模式

本次案例使用一个生产者,两个消费者情况;生产者指定的队列为一次消费模式,且发送多次消息时,两个消费者的消费情况是如何的呢?

生产者1

public class producer1 {
    private static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws Exception {
        try(Channel channel= RabbitMqUtils.getChannel();) {
            /**
             * 定义一个队列:对应rabbitmqServer的对应,如果没有,则在rabbitmqServer上创建
             * 1. 队列名称
             * 2. 队列里面的消息是否持久化(是否保存在硬盘) 默认消息存储在内存中
             * 3. 该队列是否只供一个消费者进行消费(是否进行共享) true:可以多个消费者消费 false:只一个消费者消费
             * 4. 是否自动删除(最后一个消费者断开连接以后,该队列是否自动删除) true 自动删除 false:不自动删除
             * 5. 其他参数
             */
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            // 从控制台当中接受信息
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()){
                String message = scanner.next();
                channel.basicPublish("e2",QUEUE_NAME,null,message.getBytes());
                System.out.println(" 发送消息完成:"+message);
            }
        }
    }
}

消费者1

public class consumer1 {
    private static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println(" 接收到消息:"+receivedMessage);
        };
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println(consumerTag+" 消费者取消消费接口回调逻辑");
        };
        System.out.println("C1 消费者启动等待消费......");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

消费者2

public class consumer2 {
    private static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println(" 接收到消息:"+receivedMessage);
        };
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println(consumerTag+" 消费者取消消费接口回调逻辑");
        };
        System.out.println("C2 消费者启动等待消费......");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

执行上面代码可以发现,当队列为一次消费,生产者发送多个消息到hello队列,两个消费者是轮询来消费的。

RabbitMQ消息应答

在channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback)的参数2表示是否自动应答;true为自动应答,false为手动应答,这里的应答也就是要讲的消息应答,至于是干什么的呢?下面来仔细讲一下:
当消费者接收到消息或者消息处理完成了,然后给broker响应的这一动作叫做消息应答;那么自动应答和手动应答的区别是什么呢?

自动应答:当消费者收到消息后,立马给broker进行响应;此时消息可能在消费者那还没利用起来,执行完channel.basicConsume就应答了。
手动应答:当消费者收到消息后,不会给broker进行响应,而是在我们利用完消息处理好业务代码后,手动的应答。

消息应答的好处是什么?

消息应答的好处是能保证数据利用价值,更准确来讲手动应答能保证消息利用价值,因为broker在没有接收到应答前是会保留消息的,不会删除掉。
如果是自动应答,消息者A还没把消息利用完呢,就挂掉了。此时再broker已经没有此消息了,就造成了本次消费失败。
如果是手动应答,我们可以在处理完业务代码后,消息利用完了,再手动应答。这样就能保证消息消费成功。

手动应答的方法

Channel.basicAck(用于肯定确认):RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
Channel.basicNack(用于否定确认):不处理该消息了直接拒绝,可以将其丢弃了;可以批量拒绝
Channel.basicReject(用于否定确认):不处理该消息了直接拒绝,可以将其丢弃了:一次只拒绝一个

手动应答之批量应答

手动应答比自动应答多了一个功能是:批量应答。这里的批量应答什么意思呢?
比如说现在channel里有【5 -> 6 -> 7 -> 8】4个元素,消费者消费了8并作出应答,顺便把5,6,7这3个还没有消费的也进行了应答,这就叫批量应答。
如何设置是否批量应答呢?在上面的basicAck,basicNack,basicReject方法都有参数设置是否批量应答,true为批量应答,false不批量应答....

批量应答的好处是可以减少资源拥堵,提升消费的效率。但是会存在消息未利用完毕,然后broker数据丢失的情况。

总结

  1. 实际开发尽量手动应答,不要自动应答。
  2. 手动应答在设置是否批量应答时,尽量不要批量应答
    这两个都是保护消息不丢失的手段。

RabbitMQ消息保护机制

消息重新入队

消息重新入队针对的是【broker和消费者】间的消息保护。假设现在队列Q1里有2条消息M1和M2,消费者有A和B去消费Q1里的消息,A收到了M1,B收到了M2;A成功地消费M1并作出了应答,但是B在处理M2的过程中宕机了还未作出应答。此时M2还会保存broker上,并且broker检测到B发生故障了,从而让M2重新入队,给M1进行处理;这个过程就是rabbitmq对消息保护的手段之一

持久化

持久化针对的是【生产者和broker】间的队列和消息保护。如果broker宕机了,默认情况下broker上的队列和消息是会丢失的,因为是在内存进行保存。为了解决这个问题,需要将队列和消息进行持久化(注:队列持久化和消息持久化是两回事)

队列持久化

队列想要持久化很简单,只需要在声明队列时的参数2设置为true即可,如下:

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

这里注意下:一旦队列进行声明定义,是不可以重新修改他的属性的。比如说原先定义了队列A是非持久化的,现在再定义队列A持久化是会报错的;如果确定要修改,需要把原先的队列A先删除掉,可在rabbitmq图形化界面进行操作。

当队列持久化了,在图形化界面队列记录的Featrues值为D,如下:
image

消息持久化

消息持久化也很简单,发送消息时设置MessageProperties.PERSISTENT_TEXT_PLAIN即可,如下:

channel.basicPublish("",TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));

发布确认模式

发布确认模式针对的是【生产者和broker】间的消息保护。默认情况下发布确认模式是关闭的,也就是会出现一个现状:生产者发消息到broker后,具体broker有没有收到,生产者是不清楚;这就会导致一个问题:如果broker此时宕机,而生产者并不清楚这回事而继续输出消息,将导致消息的大量丢失。

为了解决上面的问题,才有了发布确认模式。说简单点,发布确认模式的过程是:

  1. 如果消息是非持久化的,broker在收到消息后,立马响应给生产者此消息已收到。
  2. 如果消息是持久化的,broker收到并保存到磁盘上,才会响应给生产者此消息已收到

对于未确认收到的消息,还是broker进行反馈,接着由生产者进行补发直至成功确认。这样就能够百分百确定消息成功的被broker接收到,不会出现传输过程中的丢失。然后发布确认模式有3种方式,如下:

单个发布确认

单个发布确认是指:生产者发送一条消息后,等待broker响应确认后,才会去发送下一条消息,这个过程是同步的;和以往生产者代码多了两步:

  1. 开启发布确认模式:channel.confirmSelect();
  2. 等待确认:channel.waitForConfirms();这里收到确认结果后才会去发下一条消息。
    示例代码如下:
    public static void main(String[] args) throws Exception {
        //单个发布确认示例; 发布10000 个单独确认消息, 耗时3284ms
        publishMessageIndividually();
    }
	
	
	    //单个消息发布确认
    public static void publishMessageIndividually() throws Exception {
        Integer MESSAGE_COUNT = 10000;

        Channel channel = RabbitMqUtils.getChannel();
        String queueName = "single";
        channel.queueDeclare(queueName, false, false, false, null);
        // 开启发布确认
        channel.confirmSelect();
        long begin = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());
            // 服务端返回 false 或超时时间内未返回,生产者可以消息重发
            boolean flag = channel.waitForConfirms();
            if (flag) {
                System.out.println("消息发送成功,broker已接收到");
            }
        }
        long end = System.currentTimeMillis();
        System.out.println(" 发布" + MESSAGE_COUNT + " 个单独确认消息, 耗时" + (end - begin) + "ms");
    }

批量发布确认

其实批量确认模式和单个确认模式差不多;比如所现在有10条数据,单个确认模式是每条消息都确认是否收到;假设批量确认的消息数量为5条确认一次,也就是第5条和第10条消息再调用channel.waitForConfirms()进行确认,整个过程还是同步的。示例代码如下:

    public static void main(String[] args) throws Exception {
        //批量发布确认示例; 发布10000 个批量确认消息, 耗时599ms
        publishMessageBatch();
    }

    public static void publishMessageBatch() throws Exception {
        Integer MESSAGE_COUNT = 10000;
        Channel channel = RabbitMqUtils.getChannel();
        String queueName = "batch";
        channel.queueDeclare(queueName, false, false, false, null);
        // 开启发布确认
        channel.confirmSelect();
        // 批量确认消息大小
        int batchSize = 100;
        long begin = System.currentTimeMillis();
        for (int i = 1; i <= MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());

            if(i % batchSize == 0){
                channel.waitForConfirms();
                System.out.println("批量消息发送成功!"+i);
            }
        }
        long end = System.currentTimeMillis();
        System.out.println(" 发布" + MESSAGE_COUNT + " 个批量确认消息, 耗时" + (end - begin) + "ms");
    }

异步发布确认

异步发布确认上面两个两种确认模式相比,有两个不同点:

  1. 生产者是异步发送消息;broker是异步响应消息
  2. 生产者有2条线程在工作,发送线程只管发消息不需要等待确认,监听线程在监听broker的消息是否确认成功

示例代码如下:

    public static void main(String[] args) throws Exception {
        //异步发布确认示例;发布10000 个异步确认消息, 耗时331ms
        publishMessageAsync();
    }
	
    public static void publishMessageAsync() throws Exception {
        Integer MESSAGE_COUNT = 10000;

        Channel channel = RabbitMqUtils.getChannel();
        String queueName = "async";
        channel.queueDeclare(queueName, false, false, false, null);
        // 开启发布确认
        channel.confirmSelect();
        
        /**
         * 添加一个异步确认的监听器
         * 参数1. 确认收到消息的回调
         * 参数2. 未收到消息的回调
         */
        channel.addConfirmListener(
                /**
                 * 确认收到消息的一个回调
                 * 1. 消息序列号;
                 * 2. 是否批量确认;true:批量确认;可以确认小于等于当前序列号的消息;false:单个确认,确认当前序列号消息
                 */
                (sequenceNumber, multiple) -> {//sequenceNumber当前确认消息的ID;multiple是否批量确认
                    System.out.println("系列号为"+sequenceNumber+"的消息确认成功!");
                },
                /**
                 * 未确认收到消息的一个回调
                 * 1. 消息序列号;
                 * 2. 是否批量确认;true:批量确认;可以确认小于等于当前序列号的消息;false:单个确认,确认当前序列号消息
                 */
                (sequenceNumber, multiple) -> {
                    System.out.println("系列号为"+sequenceNumber+"的消息未确认!");
                }
        );
        long begin = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = " 消息" + i;
            channel.basicPublish("", queueName, null, message.getBytes());
            System.out.println("发送消息"+i);
        }
        long end = System.currentTimeMillis();
        System.out.println(" 发布" + MESSAGE_COUNT + " 个异步确认消息, 耗时" + (end - begin) + "ms");
    }

如何处理异步未确认消息

最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ConcurrentSkipListMap 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

总结

发布确认模式如果要使用,最好使用异步发布确认模式。

消息分发控制

比如说现在有队列Q,消费者A和B都监听这个队列。A消费地快,B消费地慢,但默认情况下,broker是采用轮询进行分发,他不管消费者的消费水平如何,这种就会导致A很空闲,B很忙。
为了解决上面的问题,我们可以在消费者这边设置Qos来限制broker给消费者发送消息的数量;如下:

Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C1 等待接收消息处理时间较短");
		//设置Qos,记得是在消费前设置
        channel.basicQos(5);
        // 采用手动应答
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME, autoAck, (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            SleepUtils.sleep(3);
            System.out.println(" 接收到消息:" + message);
            /**
             * 1. 消息标记 tag/DeliveryTag:理解为消息ID
             * 2. 是否批量应答还未应答消息
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }, (consumerTag) -> {
            System.out.println(consumerTag + " 消费者取消消费接口回调逻辑");
        });

遵循的原则是:消费的快,Qos的值倾向大,消费的慢,Qos倾向小。可能到这大家对Qos还不清楚,比如说现在给消费者A设置Qos为5,就代表消费者A的channel最多接受5个消息,然后其他信息就暂时保存在队列里就不要再发给消费者A的channel了,等A的channel中消息小于5时再发。要注意,这5个消息通常情况下,最前的消息,A正在处理中但未应答,后4个消息还未处理且还未应答...

交换机(exchange)

前面的示例代码中,我们都是【生产者 -> 队列 -> 消费者】的流程来进行消息发送和接收的,其实这个仔细说是不准确的
站在生产者这边的角度:真正的流程应该是【生产者 -> -交换机 -> 队列 -> 消费者】
站在消费者这边的角度:真正的流程应该是【消费者 -> - 队列 】

总结就是:生产者和交换机进行交互,消费者和队列进行交互;
下面简单说下交换机的概念和作用:
交换机概念:生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。
交换机的最重要的两个作用就是:

  1. 接收生产者的消息
  2. 按规则将消息分发到特定的队列里

交换机之Binding、RoutingKey

Binding其实就是绑定的意思,交换机想要将消息分发到队列上,一定得先进行绑定。绑定后才能进行分发,那如何进行绑定呢?也就是我们的RoutingKey,交换机和队列的连接桥梁就是RoutingKey

可以这样理解为:交换机是家,队列是学校,从家到队列的路就是RoutingKey(这里先提前剧透下,RoutingKey是可以出现重复的,不具有唯一性)

还记得生产者这边的发送代码吗?如下:

channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));

参数1是交换机名称;参数二我写的是队列名称,实际这根本不是传队列名称,而是RoutingKey;记住最开始说的话,生产者不跟队列直接打交道,只通过RoutingKe和交换机打交道来发送消息

到这里可能有人会问:为什么参数1交换机名称传"",参数二传队列名称呢?实际上交换机名称传"",代表使用默认的交换机,即如下:
image
参数2传队列名称是因为:默认的交换机和队列自动绑定,然后默认生成的的RoutingKey就是队列的名称

交换机类型

交换机是有类型区分的,目前有:直接(direct), 主题(topic) ,标题(headers) , 扇出(fanout)
下面除了标题(headers)不讲,因为很少用了在实际开发,然后其他都会说下

直接(direct)

如果指定交换机的类型为direct,一般路由到不同队列的RoutingKey是不一样的(当然也可以都一样的,就和后面的fanout是一样的了,因此尽量手动的让他们不一样)
image
下面示例代码:

生产者

public class producer {
    private static final String EXCHANGE_NAME = "exchange1";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        // 创建多个 bindingKey
        Map<String, String> bindingKeyMap = new HashMap<>();
        bindingKeyMap.put("routekying1", "routekying1的消息");
        bindingKeyMap.put("routekying2", "routekying2的消息");

        for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
            String bindingKey = bindingKeyEntry.getKey();
            String message = bindingKeyEntry.getValue();

            channel.basicPublish(EXCHANGE_NAME, bindingKey, null,
                    message.getBytes("UTF-8"));
            System.out.println(" 生产者发出消息:" + message);
        }
    }
}

消费者1和消费者2

public class consumer1 {
    private static final String EXCHANGE_NAME = "exchange1";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        String queueName = "q1";
        channel.queueDeclare(queueName, false, false, false, null);
        //将交换机和队列进行绑定
        channel.queueBind(queueName, EXCHANGE_NAME, "routekying1");
        System.out.println(" 等待接收消息.....");

        channel.basicConsume(queueName, true, (consumerTag, delivery) -> {
            System.out.println("消费的消息是"+delivery.getBody().toString());
        }, consumerTag -> {
        });
    }
}
public class consumer2 {
    private static final String EXCHANGE_NAME = "exchange1";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        String queueName = "q2";
        channel.queueDeclare(queueName, false, false, false, null);
        //将交换机和队列进行绑定
        channel.queueBind(queueName, EXCHANGE_NAME, "routekying2");
        System.out.println(" 等待接收消息.....");

        channel.basicConsume(queueName, true, (consumerTag, delivery) -> {
            System.out.println("消费的消息是"+delivery.getBody().toString());
        }, consumerTag -> {
        });
    }
}

扇出(fanout)

先声明下队列的消息只能是一次消费,因此是做不到两个消费者在同一个队列各消费同一条消息的情况。但是我们可以把同一条消息,发到两个队列里,让两个消费者各消费不同队列同一条消息即可。
而这个就是我们的fanout,其实也是发布订阅模式。只要交换机绑定多个队列,且RoutingKey指定为一样的,就能做到同条消息能多份分发到不同的队列上

示例代码如下:

生产者

public class Producer1 {
    private static final String EXCHANGE_NAME = "exchange2";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        Scanner sc = new Scanner(System.in);
        System.out.println("请输入信息");

        while (sc.hasNext()) {
            String message = sc.nextLine();
            channel.basicPublish(EXCHANGE_NAME, "rk1", null, message.getBytes("UTF-8"));
            System.out.println(" 生产者发出消息" + message);
        }
    }

}

消费者1和消费者2

public class Consumer1 {
    private static final String EXCHANGE_NAME = "exchange2";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        String queueName = "fanout_queue_1";
        channel.queueDeclare(queueName, false, false, false, null);
        // 把该临时队列绑定我们的 exchange 其中 routingkey( 也称之为 binding key) 为空字符串
        channel.queueBind(queueName, EXCHANGE_NAME, "rk1");
        System.out.println(" 等待接收消息, 把接收到的消息打印在屏幕.....");

        channel.basicConsume(queueName, true, (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" 控制台打印接收到的消息" + message);
        }, consumerTag -> {
        });
    }
}
public class Consumer2 {
    private static final String EXCHANGE_NAME = "exchange2";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        String queueName = "fanout_queue_2";
        channel.queueDeclare(queueName, false, false, false, null);
        // 把该临时队列绑定我们的 exchange 其中 routingkey( 也称之为 binding key) 为空字符串
        channel.queueBind(queueName, EXCHANGE_NAME, "rk1");
        System.out.println(" 等待接收消息, 把接收到的消息打印在屏幕.....");

        channel.basicConsume(queueName, true, (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" 控制台打印接收到的消息" + message);
        }, consumerTag -> {
        });
    }
}

这里再说一下要注意的地方:交换机和队列需要提前创建好并指定routingKey,可以在控制台上创建好,也可以通过方法channel.xxxxDeclare()的方式进行定义和创建。

标签:false,一篇,队列,学会,System,RabbitMQ,消息,channel,String
From: https://www.cnblogs.com/ibcdwx/p/16120126.html

相关文章

  • RabbitMQ
    一、组件   二、6种工作模式    HolloWord、WorkQueues、Publish/Subscribe、Routing、Topic、RPC。    2.1、HolloWord模式               该模式不会用到交换机,实际工作场景也不会用到,只是简单的消息的生产和消费。  ......
  • 开源.NET8.0小项目伪微服务框架(分布式、EFCore、Redis、RabbitMQ、Mysql等)
    1、前言为什么说是伪微服务框架,常见微服务框架可能还包括服务容错、服务间的通信、服务追踪和监控、服务注册和发现等等,而我这里为了在使用中的更简单,将很多东西进行了简化或者省略了。年前到现在在开发一个新的小项目,刚好项目最初的很多功能是比较通用的,所以就想着将这些功能抽......
  • C++STL学习第一篇(什么是STL以及string的各种功能用法)
    STLSTL提供了六大组件,彼此之间可以组合套用,这六大组件分别是:容器、算法、迭代器、仿函数、适配器、空间配置器。数据结构和容器管理:STL提供了多种数据结构和容器,如向量(vector)、链表(list)、集合(set)、映射(map)等。这些容器可以帮助程序员方便地存储和管理数据,根据需求进行动态调......
  • 利用SpringAMQP依赖使用RabbitMQ
    消息消费端和提供端需要引入依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>都需要在application.yml进行如下配置spring:rabbitmq:host:192.168.230.100#r......
  • Centos 安装RabbitMQ
    一、步骤   【步骤一】:更新软件包的存储库。yum-yupdate   【步骤二】:Erlang在默认的YUM存储库中不可用,因此您将需要安装EPEL存储库yum-yinstallepel-releaseyum-yupdate   【步骤三】:RabbitMQ是基于Erlang(面向高并发的语言)语言开发,所以在安装Ra......
  • 消息中间件RabbitMQ的原理和使用
    一、什么是MQMQ是MessageQueue的简写,表示消息队列的意思,它是一种用于在应用程序之间传递消息的技术。多用于分布式系统之间进行通信,作为消息中间件使用。MQ的作用有应用解耦、异步提速、流量削峰填谷,当然也有缺点,加入MQ消息中间件会额外增加系统的外部依赖,是系统稳定性降低,同......
  • 3月5号(工程日报第一篇)
    所花时间(包括上课):3小时25分钟代码量(行):91行(课上练习题,Java代码读取哈利波特寻找最长首尾相连单词链)博客量(篇):2篇了解到的知识点:Java对于.txt文本文件的读取操作具体知识点:1、文件相对路径的书写存储 2、判断文件是否存在或是否打开成功 3、文件多行读取并将读取字符串去......
  • docker安装Rabbitmq
    搜索rabbitmq镜像dockersearchrabbitmq默认拉取最新版本镜像dockerpullrabbitmq创建并运行rabbitmq容器dockerrun-d--hostnamemy-rabbit--namemy-rabbit-p15672:15672-p5673:5672rabbitmq-d:该选项表示在后台(detachedmode)运行Docker容器,即启动一个守护......
  • ubuntu安装rabbitmq
    安装所需软件包sudoapt-getinstallwgetapt-transport-https-y添加密钥,整个一起复制curl-1sLf"https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA"|sudogpg--dearmor|sudotee/usr/share/keyrings/com.rabbitmq.tea......
  • RabbitMQ部署
    拉取RabbitMQ镜像,启动容器dockerpullrabbitmq:3-managementdockerrun\-eRABBITMQ_DEFAULT_USER=root\#设置环境变量,账号,密码-eRABBITMQ_DEFAULT_PASS=123\--namemq\#起名--hostnamemq1\#配置主机名,集群部署时候有用,非集群部署可不配-p15672:15672\#......