首页 > 其他分享 >RabbitMQ工作模式-Routing模式

RabbitMQ工作模式-Routing模式

时间:2023-05-20 11:11:55浏览次数:27  
标签:connectionFactory String 队列 RabbitMQ INFORM 交换机 模式 channel Routing

路由模式: 1、每个消费者监听自己的队列,并且设置routingkey。 2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。      Routing模式是可以完成订阅模式的工作的,下面的代码在RoutingKey为“inform”的消息中有所体现

示例代码:

生产者:

public class Producer03_routing {
    //private static final String QUEUE = "HEllO";
    //队列名
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";
    private static final String ROUTINGKEY_EMAIL= "inform_email";
    private static final String ROUTINGKEY_SMS = "inform_sms";


    public static void main(String[] args) throws IOException, TimeoutException {
        //生产者于MQ建立连接
        //通过建立连接工厂建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //配置连接信息
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);  //端口
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //设置虚拟机        实现多个虚拟MQ    每个虚拟机相当于一个独立的MQ
        connectionFactory.setVirtualHost("/");
        //建立新连接
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            //创建会话通道  生产者和mq服务所有通信都在通道中完成
            channel = connection.createChannel();
            /** 声明队列     如果队列在MQ中没有,则创建
             queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
             param1:队列名称
             * param2:是否持久化    (如果重启后,队列还在)
             * param3:队列是否独占此连接-------队列只允许在该连接中访问,连接关闭队列删除
             * param4:队列不再使用时是否自动删除此队列
             * param5:队列参数--------队列拓展参数
             * */
            channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
            channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
            /**
             * 声明交换机
             * param1:交换机名称
             * param2:交换机类型
             *      fanout:对应的工作模式是:发布订阅publish/scribe
             *      direct:对应Routing工作模式
             *      topic:对应通配符工作模式
             *      headers:对应headers模式
             */
            channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
            //交换机和队列绑定String queue, String exchange, String routingKey
            /**
             * 参数明细
             * 1、队列名称
             * 2、交换机名称
             * 3、路由key     交换机根据路由KEY的值将消息转发到指定队列      在发布订阅工作模式时设置为空字符串
             */
            channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);
            channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS);
            for (int i = 0; i < 5; i++) {
                //定义一个消息的内容
                String message = "Hello,Send EMAIL message to User";
                //发送消息
                /**
                 * void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
                 * param1:Exchange的名称,如果没有指定,则使用Default Exchange
                 * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列,(如果是默认路由,routingKEY使用队列名)
                 * param3:消息包含的属性
                 * param4:消息体
                 * */
                //指定RoutingKey
                channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL,null,message.getBytes());
                System.out.println("Send to mq :'" + message + "'");
            }
            for (int i = 0; i < 5; i++) {
                //定义一个消息的内容
                String message = "Hello,Send INFORM message to User";
                //发送消息
                /**
                 * void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
                 * param1:Exchange的名称,如果没有指定,则使用Default Exchange
                 * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列,(如果是默认路由,routingKEY使用队列名)
                 * param3:消息包含的属性
                 * param4:消息体
                 * */
                //指定RoutingKey
                channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform",null,message.getBytes());
                System.out.println("Send to mq :'" + message + "'");
            }


        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(channel != null)
            {
                channel.close();
            }
            if(connection != null)
            {
                connection.close();
            }
        }
    }
}

消费者:

public class Consumer03_routing_email {
    //队列名
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";
    private static final String ROUTINGKEY_EMAIL= "inform_email";
    public static void main(String[] args) throws IOException, TimeoutException {
    //消费者于MQ建立连接
        //通过建立连接工厂建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //配置连接信息
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);  //端口
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //设置虚拟机        实现多个虚拟MQ    每个虚拟机相当于一个独立的MQ
        connectionFactory.setVirtualHost("/");
        //建立新连接
        Connection connection = connectionFactory.newConnection();
        //创建会话通道
        Channel channel = connection.createChannel();

        //声明队列     如果队列在MQ中没有,则创建
        /** queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         param1:队列名称
         * param2:是否持久化    (如果重启后,队列还在)
         * param3:队列是否独占此连接-------队列只允许在该连接中访问,连接关闭队列删除
         * param4:队列不再使用时是否自动删除此队列
         * param5:队列参数--------队列拓展参数
         * */
        channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
        /**
         * 声明交换机
         * param1:交换机名称
         * param2:交换机类型
         *      fanout:对应的工作模式是:发布订阅publish/scribe
         *      direct:对应Routing工作模式
         *      topic:对应通配符工作模式
         *      headers:对应headers模式
         */
        channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
        //交换机和队列绑定String queue, String exchange, String routingKey
        /**
         * 参数明细
         * 1、队列名称
         * 2、交换机名称
         * 3、路由key     交换机根据路由KEY的值将消息转发到指定队列      在发布订阅工作模式时设置为空字符串
         */
        channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);
        channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,"inform");
        //实现消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {

            /**
             当接收到消息后,此方法被调用
             * @param consumerTag  消费者标签:用来标识消费者
             * @param envelope  信封:可以拿到信息
             * @param properties    消息属性
             * @param body  消息内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //super.handleDelivery(consumerTag, envelope, properties, body);
                //交换机
                String exchange = envelope.getExchange();
                //消息ID:在channel中标识消息ID,可用于消息被确认接受
                long deliveryTag = envelope.getDeliveryTag();
                //消息内容
                String message = new String(body,"UTF-8");
                System.out.println("receive a message: " + message);
            }
        };

        //监听队列
        /**
         * 监听队列String queue, boolean autoAck,Consumer callback
         * 参数明细
         * 1、队列名称
         * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置
         为false则需要手动回复
         * 3、消费消息的方法,消费者接收到消息后调用此方法
         */
        channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);
    }
}

消费者2:

public class Consumer03_routing_sms {
    //队列名
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";
    private static final String ROUTINGKEY_SMS= "inform_sms";
    public static void main(String[] args) throws IOException, TimeoutException {
    //消费者于MQ建立连接
        //通过建立连接工厂建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //配置连接信息
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);  //端口
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //设置虚拟机        实现多个虚拟MQ    每个虚拟机相当于一个独立的MQ
        connectionFactory.setVirtualHost("/");
        //建立新连接
        Connection connection = connectionFactory.newConnection();
        //创建会话通道
        Channel channel = connection.createChannel();

        //声明队列     如果队列在MQ中没有,则创建
        /** queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         param1:队列名称
         * param2:是否持久化    (如果重启后,队列还在)
         * param3:队列是否独占此连接-------队列只允许在该连接中访问,连接关闭队列删除
         * param4:队列不再使用时是否自动删除此队列
         * param5:队列参数--------队列拓展参数
         * */
        channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
        /**
         * 声明交换机
         * param1:交换机名称
         * param2:交换机类型
         *      fanout:对应的工作模式是:发布订阅publish/scribe
         *      direct:对应Routing工作模式
         *      topic:对应通配符工作模式
         *      headers:对应headers模式
         */
        channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
        //交换机和队列绑定String queue, String exchange, String routingKey
        /**
         * 参数明细
         * 1、队列名称
         * 2、交换机名称
         * 3、路由key     交换机根据路由KEY的值将消息转发到指定队列      在发布订阅工作模式时设置为空字符串
         */
        channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS);
        channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,"inform");   //对于一个通道绑定多个队列
        //实现消费方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {

            /**
             当接收到消息后,此方法被调用
             * @param consumerTag  消费者标签:用来标识消费者
             * @param envelope  信封:可以拿到信息
             * @param properties    消息属性
             * @param body  消息内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //super.handleDelivery(consumerTag, envelope, properties, body);
                //交换机
                String exchange = envelope.getExchange();
                //消息ID:在channel中标识消息ID,可用于消息被确认接受
                long deliveryTag = envelope.getDeliveryTag();
                //消息内容
                String message = new String(body,"UTF-8");
                System.out.println("receive a message: " + message);
            }
        };

        //监听队列
        /**
         * 监听队列String queue, boolean autoAck,Consumer callback
         * 参数明细
         * 1、队列名称
         * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置
         为false则需要手动回复
         * 3、消费消息的方法,消费者接收到消息后调用此方法
         */
        channel.basicConsume(QUEUE_INFORM_SMS, true, defaultConsumer);
    }
}

 

标签:connectionFactory,String,队列,RabbitMQ,INFORM,交换机,模式,channel,Routing
From: https://www.cnblogs.com/lksses/p/17416919.html

相关文章

  • 我写了本开源书:《3D编程模式》
    大家好,我写了本开源书,罗列了我从自己的实战项目中提炼出来的关于3D编程(主要包括“3D引擎/游戏引擎”、“编辑器”开发)的各种编程模式本书的在线阅读地址在这里:在线阅读本书的源码在Github中,欢迎star,感恩您:Github地址本书的写作花了我300多个小时,将近3个月的全职写作,凝结了我一......
  • 新高考模式志愿填报
    注意志愿之间梯度的问题,建议高、中、低搭配填报,较为合理,避免退档风险。注意志愿之间梯度的问题,建议高、中、低搭配填报,较为合理,避免退档风险。 4、新高考模式下,专业平行志愿填报有哪些参考策略?答:填报高考志愿方法很多,可根据不同情况先选院校再选专业,或者先选专业再选院校,只要......
  • rabbitmq:pika.exceptions.IncompatibleProtocolError: StreamLostError: ('Transport
    本地连接rabbitmq出现这个问题: 是因为我把port写成了15672,改成5672即可 ......
  • oracle 中的用户、表空间、数据模式光速入门
    oracle中没有limitROWNUM来处理的只能通过嵌套来处理SELECT*FROM(SELECTCOMP_LN.GIM_RENKOU.LASTUPTIMEFROMCOMP_LN.GIM_RENKOUORDERBYCOMP_LN.GIM_RENKOU.LASTUPTIMEDESC)WHEREROWNUM=1oracle首先连接的时候分为servicename和SID(SystemIdentifi......
  • 动态创建ACTIVITY模式
    还记得我们在代理Activity模式里谈到启动插件APK里的Activity的两个难题吗,由于插件里的Activity没在主项目的Manifest里面注册,所以无法经历系统Framework层级的一系列初始化过程,最终导致获得的Activity实例并没有生命周期和无法使用res资源。使用代理Activit......
  • 从零玩转设计模式之单例模式-danlimos
    title:从零玩转设计模式之单例模式date:2022-12-1212:41:03.604updated:2022-12-2315:35:29.0url:https://www.yby6.com/archives/danlimoscategories:-单例模式-设计模式tags:-Java模式-单例模式-设计模式前言单例设计模式是23种设计模式中最常用的设......
  • 从零玩转设计模式之建造者模式-jianzaozhemoshi
    title:从零玩转设计模式之建造者模式date:2022-12-0818:15:30.898updated:2022-12-2315:35:58.428url:https://www.yby6.com/archives/jianzaozhemoshicategories:-设计模式tags:-设计模式-建造者模式什么是建造者模式?建造者模式是一种软件设计模式,它用于......
  • 从零玩转设计模式之简单工厂设计模式-jiandangonchangmoshi
    title:从零玩转设计模式之简单工厂设计模式date:2022-12-0811:31:19.472updated:2022-12-1123:03:34.805url:https://www.yby6.com/archives/jiandangonchangmoshicategories:-设计模式tags:-设计模式简单工厂模式是一种创建型设计模式,用于创建单个对象.它主......
  • 从零玩转设计模式之工厂方法设计模式-gonchangfangfamoshi
    title:从零玩转设计模式之工厂方法设计模式date:2022-12-0813:22:13.669updated:2022-12-1123:03:22.379url:https://www.yby6.com/archives/gonchangfangfamoshicategories:-设计模式tags:-设计模式什么是工厂方法模式?“工厂方法模式”是对简单工厂模式的进......
  • 从零玩转设计模式之原型模式-yuanxingmoshi
    title:从零玩转设计模式之原型模式date:2022-12-1120:05:35.488updated:2022-12-2315:35:44.159url:https://www.yby6.com/archives/yuanxingmoshicategories:-设计模式tags:-设计模式-原型模式什么是原型模式设计模式?原型模式是一种软件设计模式,它允许您......