首页 > 其他分享 >RabbitMQ直连交换机(Direct)

RabbitMQ直连交换机(Direct)

时间:2024-06-16 14:32:23浏览次数:12  
标签:直连 String Direct RabbitMQ 交换机 ROUTING public channel NAME

直连交换机(Direct)

指定交换机模式为直连
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

public class DirectExchange {

    public static final String EXCHANGE_NAME = "direct_exchange_test";

    public static void main(String[] args) {
        try {
            Channel channel = RabbitMqUtils.getChannel();
            //BuiltinExchangeType.DIRECT 指定为直连交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

                List<Map<String,String>> mapList = new ArrayList<>();
                for(int i = 0; i < 11; i++){
                    //key = routingkey value=消息
                    Map<String,String> mapMessage = new HashMap<>();
                    if(i <= 5){
                        mapMessage.put("order","我是订单消息"+i);
                        mapList.add(mapMessage);
                    }else {
                        mapMessage.put("warehouse","我是仓库消息"+i);
                        mapList.add(mapMessage);
                    }
                }
    
                for(Map<String,String> map : mapList){
                    for(Map.Entry<String,String> message : map.entrySet()){
                        //1.交换机名称  2.routingKey 3.其他属性 例如持久化,4.消息
                        channel.basicPublish(EXCHANGE_NAME,message.getKey(),null,message.getValue().getBytes(StandardCharsets.UTF_8));
                        System.out.println("生产者发出消息:"+message.getValue());
                    }
                }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

}

消费者1

public class DirectExchangeConsumer1 {
    //交换机名称
    public static final String EXCHANGE_NAME = "direct_exchange_test";
    public static final String QUEUE_NAME = "order_queue";
    public static final String ROUTING_KEY = "order";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        /**
         * 1.交换机名称
         * 2.交换机类型
         */
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //声明一个队列
        channel.queueDeclare(QUEUE_NAME,true,false,true,null);
        //绑定队列
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
        System.out.println(ROUTING_KEY+":消费者等待接收消息!");


        //接收消息回调
        DeliverCallback callback = (consumerTag, message) -> {
            String receiveMessage = new String(message.getBody(), Charsets.UTF_8.name());
            System.out.println(ROUTING_KEY+"_接收到消息:"+receiveMessage+" 标签:"+consumerTag);
        };
        //取消消息回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println(ROUTING_KEY+"_消费者取消消费接口回调逻辑:"+consumerTag);
        };
        channel.basicConsume(QUEUE_NAME,false,callback,cancelCallback);
    }

}

消费者2

public class DirectExchangeConsumer2 {
    //交换机名称
    public static final String EXCHANGE_NAME = "direct_exchange_test";
    public static final String QUEUE_NAME = "warehouse_queue";
    public static final String ROUTING_KEY = "warehouse";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        /**
         * 1.交换机名称
         * 2.交换机类型
         */
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //声明一个队列
        channel.queueDeclare(QUEUE_NAME,true,false,true,null);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
        System.out.println(ROUTING_KEY+":消费者等待接收消息!");


        //接收消息回调
        DeliverCallback callback = (consumerTag, message) -> {
            String receiveMessage = new String(message.getBody(), Charsets.UTF_8.name());
            System.out.println(ROUTING_KEY+"_接收到消息:"+receiveMessage+" 标签:"+consumerTag);
        };
        //取消消息回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println(ROUTING_KEY+"_消费者取消消费接口回调逻辑:"+consumerTag);
        };
        channel.basicConsume(QUEUE_NAME,false,callback,cancelCallback);
    }

}

测试
生产者分别给不同的routingkey发送消息
在这里插入图片描述
消费者1接收6条消息
在这里插入图片描述
消费者这2接收到5条消息
在这里插入图片描述

实现的方式主要是靠routingkey 在生产者中发送指定的消息,同时声明routingkey,消费者绑定队列的时候声明队列、交换机名称、routingkey,就可以获取到指定消息。

标签:直连,String,Direct,RabbitMQ,交换机,ROUTING,public,channel,NAME
From: https://blog.csdn.net/weixin_42942786/article/details/139716911

相关文章

  • LVS_Director + KeepAlived + 邮件报警
    目录一.环境准备二. 对master和backup操作三.配置master主机四.配置backup主机六.验证虚拟IP七.配置后端两个web服务器对web1和web2主机都进行如下操作: 单独修改web1主机单独修改web2主机验证八.设置邮件报警 一.环境准备KeepAlived在该项目中的功能......
  • 【pycharm调试模式异常】can‘t open file ‘C:\\Program‘: [Errno 2] No such fil
    错误系统:wendows10pycharm版本:pycharm专业版和社区版都有对应问题2023.1和2023.2都有同样问题python版本:3.11官网问题地址上面只是我出问题的环境,不知道其他环境或者版本有没有同样的问题现象:执行debug,程序启动后立即退出,错误信息:can’topenfile‘C:\Progra......
  • 5分钟带你了解RabbitMQ的(普通/镜像)集群
    前言让我们深入探讨RabbitMQ的集群配置,了解各种集群模式的利弊。本次讨论的重点是帮助您快速理解RabbitMQ集群的运作方式,以及选择最适合您需求的模式。好的,话不多说。在RabbitMQ中,即使只有一个节点,该节点的服务也会被作为一个集群来处理。这意味着单节点系统也遵循集群架构的规范......
  • RabbitMQ
    RPCRPC(远程过程调用),简单来说,就是一个进程A向另外一个进程B请求服务。进程A调用进程B的服务,就好像在调用自己进程的服务方法一样,无需关心内部的实现细节。传统的进程之间通信,是由服务端监听在某个端口,客户端进程通过远程过程调用(RPC)方式和服务端进程进行通信,这种通信方式,当消......
  • [罗嗦的详解]BURP官方靶场Lab: SSRF with filter bypass via open redirection vulner
    参考视频:官方把场下的俩个视频https://youtu.be/iF1BPVTqM10抓取checkstore按钮的POST请求,请求体:stockApi=/product/stock/check?productId=3&storeId=1nextstore按钮的get请求:GET/product/nextProduct?currentProductId=3&path=/product?productId=4HTTP/2path参......
  • “RabbitMQ入门指南:从入门到起飞,这一篇就够!打造高效消息通信系统的第一步“。
    1.前言        RabbitMQ是一个开源的消息代理软件,它实现了高级消息队列协议(AMQP)的标准,并用Erlang语言编写。作为消息代理,RabbitMQ接收、存储和转发消息,帮助应用程序之间实现异步通信。它提供了一个强大而灵活的消息传递机制,可以在分布式系统中可靠地传递消息,确保消息......
  • VUE之重定向redirect
    VUE之路由和重定向redirect这个小知识点是在学习做项目的时候遇到的一个问题,借鉴了一个他人的项目,是一个酒店管理系统,拿到源码之后导到我的vscode里。参考链接导的过程比较顺利,正常安装,加依赖,没有报错的情况,这个源码一共设计了4款登录界面,但是我在运行服务的时候,一直是进......
  • springboot rabbitmq如何保证消息顺序消费
    很多时候,消息的消费是不用保证顺序的,比如借助mq实现订单超时的处理。但有些时候,业务中可能会存在多个消息需要顺序处理的情况,比如生成订单和扣减库存消息,那肯定是先执行生成订单的操作,再执行扣减库存的操作。那么这种情况下,是如何保证消息顺序消费的呢?首先,为了效率,我们可以设置......
  • RabbitMQ-如何保证消息不丢失
    RabbitMQ常用于异步发送,mysql,redis,es之间的数据同步,分布式事务,削峰填谷等.....在微服务中,rabbitmq是我们经常用到的消息中间件。它能够异步的在各个业务之中进行消息的接受和发送,那么如何保证rabbitmq的消息不丢失就显得尤为重要。首先要分析问题,我们就要明确rabbitmq在什么时......
  • 图形库使用 Direct3d
    1里面的数学矩阵是三角函数组合出来的旋转的时候xy两个变量距离变第三轴被影响角度2视锥远近四棱锥双剪切平面3三维点A点B点C点确定三位坐标,初始坐标是坐标中中心值x,y,z(0,0,0)4移动三维点点A到点B使用x加减y加减z加减5图片循环扫描整张图片6脚本着色器......