首页 > 其他分享 >RabbitMQ笔记

RabbitMQ笔记

时间:2022-11-24 21:22:06浏览次数:71  
标签:NAME EXCHANGE 队列 笔记 static RabbitMQ channel String

RabbitMQ笔记

个人学习笔记记录

参考:尚硅谷

1.消息队列

2. 轮训分发消息

3.消息应答

4.发布确认

5.交换机

5.1Exchanges

5.1.1Exchanges概念

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产 者甚至都不知道这些消息传递传递到了哪些队列中。 相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来 自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消 息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

5.2临时队列

5.3绑定(bindings)

5.4 Fanout交换机

5.4.1 Fanout介绍

​ fanout (扇出)交换机模式,它是将所有接受到的消息发送给它知道的工作队列中,广播。

系统默认中也有Fanout

![fanout](D:\Downloads\Typora\repo\mq\rabbitmq photo\fanout.png)

Fanout实操

image-20221124203933184

Logs 和临时队列的绑定关系如下图 PS:因为是随机生成的队列,所以queueName是随机的

![img](file:///C:\Users\Administrator\AppData\Roaming\Tencent\Users\1845472368\QQ\WinTemp\RichOle$@$WS~5EC1{}S$7@QT_FF$G.png)

ReceiveLogs01 和 ReceiveLogs02 将接收到的消息打印在控制台:

package com.cheng.rabbitmq.five;

import com.cheng.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * 收到logs02
 *
 * @Description: ReceiveLogs02
 * @Author cheng
 * @Date: 2022/11/24 20:13
 * @Version 1.0
 * @date 2022/11/24
 */
public class ReceiveLogs02 {

    private static final String EXCHANGE_NAME = "logs";


    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMQUtils.getChannel();

        //声明一个交换机 fanout扇出
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        //产生队列 临时队列
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName,EXCHANGE_NAME,"1");
        //等待消息
        System.out.println("R2等待消息===========");
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("ReceiveLogs02接受到的消息为:" + new String(message.getBody(),"utf-8"));
        };

        channel.basicConsume(queueName,true,deliverCallback, ConsumerTag->{});
    }
}

EmitLogs发送消息到交换机

package com.cheng.rabbitmq.five;

import com.cheng.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;

import javax.print.attribute.standard.NumberUp;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/**
 * 发出日志
 *
 * @Description: EmitLogs
 * @Author cheng
 * @Date: 2022/11/24 20:12
 * @Version 1.0
 * @date 2022/11/24
 */
public class EmitLogs {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMQUtils.getChannel();

        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        Scanner scanner = new Scanner(System.in);

        System.out.println("输入消息:");
        while (scanner.hasNext()){
            String msg = scanner.nextLine();
            channel.basicPublish(EXCHANGE_NAME,"", null,msg.getBytes("UTF-8"));
        }

    }

}

结果:当生产者发送消息到交换机时,交换机向绑定此交换机的所有队列发送消息,即使消费者的队列的routingKey不同,仍会接受到交换机广播的消息。

5.5 Direct交换机

介绍

direct 这种类型的工作方式是,消息只去到它绑定的 routingKey 队列中去。(路由模式)

image-20211017173331747

​ 从上图可以看出交换机模式为direct,Q1队列的routingKey为orange,Q2队列有两个routingKey分别为black,green;

在这种绑定情况下,生产者发布消息到 exchange 上,路由键为 orange 的消息会被发布到队列 Q1。路由键为 black 和 green 的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。

多重绑定

image-20211017173707075

当然如果 exchange 的绑定类型是 direct,但是它绑定的多个队列的 key 如果都相同,在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多,如上图所示。

实战

生产者:

public class EmitLogs {
    private static final String EXCHANGE_NAME = "direct_logs";
    private static final String ERROR = "ERROR";
    private static final String WARNING = "WARNING";
    private static final String INFO = "INFO";


    /**
     * 主要
     *
     * @param args arg游戏
     * @throws IOException
     * @throws TimeoutException 超时异常
     */
    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMQUtils.getChannel();

         //绑定交换机 direct模式
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        
        Scanner scanner = new Scanner(System.in);

        System.out.println("输入消息:");
        Boolean flag = true;
        while (scanner.hasNext()) {
            String msg = scanner.nextLine();
            System.out.println(flag);
            channel.basicPublish(EXCHANGE_NAME,INFO,null,msg.getBytes("UTF-8"));
            if (flag)
                channel.basicPublish(EXCHANGE_NAME, ERROR, null, msg.getBytes("UTF-8"));
            else
                channel.basicPublish(EXCHANGE_NAME,WARNING,null,msg.getBytes("UTF-8"));
            flag = !flag;
        }

    }

}

消费者1:

public class ReceiveLogs01 {
    private static final String EXCHANGE_NAME = "direct_logs";
    private static final String ERROR = "ERROR";
    private static final String WARNING = "WARNING";
    private static final String INFO = "INFO";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
		//绑定routingKey
        String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue,EXCHANGE_NAME,ERROR);
        channel.queueBind(queue,EXCHANGE_NAME,INFO);

        System.out.println("R1等待接受消息");
        DeliverCallback deliverCallback = (tag,msg)-> {
            System.out.println("消息为:" + new String(msg.getBody(),"UTF-8"));
        };

        channel.basicConsume(queue,true,deliverCallback,tag->{});
    }
}

消费者2:绑定routingKey不同

public class ReceiveLogs02 {

    private static final String EXCHANGE_NAME = "direct_logs";
    private static final String ERROR = "ERROR";
    private static final String WARNING = "WARNING";
    private static final String INFO = "INFO";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue,EXCHANGE_NAME,WARNING);
        channel.queueBind(queue,EXCHANGE_NAME,INFO);

        System.out.println("R2等待接受消息");
        DeliverCallback deliverCallback = (tag, msg)-> {
            System.out.println("消息为:" + new String(msg.getBody(),"UTF-8"));
        };

        channel.basicConsume(queue,true,deliverCallback,tag->{});
    }
}

总结:

  • 生产者向ROUTING_KEY_INFO发送消息时,消费者1、2都可以收到消息
  • 生产者向ROUTING_KEY_WARNING发送消息时,消费者2可以收到消息
  • 生产者向ROUTING_KEY_ERROR发送消息时,消费者1可以收到消息
  • 如果启动多个消费者1,生产者向ROUTING_KEY_INFOROUTING_KEY_WARNING发送消息时,多个消费者1将会轮流公平接收消息,且每个消息只会被消费一次。

5.6 Topic

标签:NAME,EXCHANGE,队列,笔记,static,RabbitMQ,channel,String
From: https://www.cnblogs.com/chengbb/p/16923389.html

相关文章

  • Spring Boot 整合 RabbitMQ 之 Fanout Exchange模式 (三)
    摘要:那前面已经介绍过了Direct模式(一)Topic转发模式(二),这次介绍下FanoutExchange形式又叫广播形式,因此我们发送到路由器的消息会使得绑定到该路由器的每一个Queue接收......
  • Spring Boot 整合 RabbitMQ 之 Topic转发模式 (二)
    摘要:上一篇介绍了Direct模式的消息发生机制,这篇介绍下Topic转发模式的消息发生机制。一:首先我们看发送端,我们需要配置队列Queue,再配置交换机(Exchange),再把队列按照相应......
  • golang使用pprof笔记
    ==背景==程序研发完之后,发现程序会异常结束,然后由容器重新拉起,重启的频率不定,为了排查这个问题,准备使用pprof找找线索。 ==相关文档==pprof性能调优读懂pprof生成的......
  • springboot自定义starter源码笔记
    starter:1、这个场景需要使用到的依赖是什么?2、如何编写自动配置@Configuration//指定这个类是一个配置类@ConditionalOnXXX//在指定条件成立的情况下自动配置类生效@Aut......
  • 机器学习 数学基础 学习笔记 (5)常见统计量
    1.期望离散型随机变量的一切可能的取值xi与对应的概率Pi(=xi)之积的和称为该离散型随机变量的数学期望(设级数绝对收敛),记为E(x)。随机变量最基本的数学特征之一。它反映随机......
  • DockerFile解析-笔记-全
    是什么DockerFile是用来构建Docker镜像的构建文件,是由一系列命令和参数构成的脚本。构建三步骤:   编写dockerfile文件   dockerbuild   dockerrun文件什么......
  • RabbitMQ报错:Error: unable to perform an operation on node 'rabbit@manage01'.
    安装完成之后问题描述:1、打开http://ip:15672/#/后台管理页面会很慢2、springboot项目连接mq经常连接不上(偶尔可以连上)3、报错:AmqpTimeoutException:java.util.concurre......
  • java基础笔记
    一、泛型受限1、类型通配符上限:类/接口<?extends实参类型>要求该泛型的类型只能是实参类型,或者实参类型的子类类型 2、类型通配符下限:类/接口<?extends实参类型>要......
  • 修改rabbitMQ3.8默认端口号
    1、创建配置文件(最好就放这个目录,本人就遇到放其他目录不能加载配置文件,搞了几个小时才搞定)cd /etc/rabbitmqvimrabbitmq.conf 2、配置文件内容如下:(默认的数据管理......
  • 【JAVA笔记】JAVA常用的字符串操作03
    一、Java中常用的字符串操作publicclassCommon_String_Operations{publicstaticvoidmain(String[]args){booleanp1=isEmpty("aa");S......