首页 > 其他分享 >RabbitMQ的部分模式

RabbitMQ的部分模式

时间:2024-03-29 16:32:19浏览次数:23  
标签:String com new 模式 RabbitMQ import qy172 部分 channel

1发布订阅模式

发送者

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
public class PublishProduct {
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 服务器的地址,我用的redis,RabbitMQ,现在redis里安装RabbitMQ
        factory.setHost("192.168.74.75");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            // 创建一个通道
            channel = connection.createChannel();
            //创建交换机
            channel.exchangeDeclare("qy172-fanout-exchange", BuiltinExchangeType.FANOUT, true);
            //创建队列,如果存在则不会创建
            channel.queueDeclare("qy172-publish-queue01", true, false, false, null);
            channel.queueDeclare("qy172-publish-queue02", true, false, false, null);
            //交互机和队列绑定
            channel.queueBind("qy172-publish-queue01", "qy172-fanout-exchange", "");
            channel.queueBind("qy172-publish-queue02", "qy172-fanout-exchange", "");
            // 创建消息内容
            HashMap<String, Object> map = new HashMap<>();
            map.put("name", "张三");
            map.put("age", "22");
//把数据给交换机,让他分发给队列
            channel.basicPublish("qy172-fanout-exchange", "", null, JSON.toJSONBytes(map));
            System.out.println("发送成功");
        } catch (IOException e) {
            // 发生 IO 异常时抛出运行时异常
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            // 发生超时异常时抛出运行时异常
            throw new RuntimeException(e);
        } finally {
            if (channel != null) {
                try {
                    // 关闭通道
                    channel.close();
                } catch (IOException | TimeoutException e) {
                    // 发生 IO 或超时异常时抛出运行时异常
                    throw new RuntimeException(e);
                }
            }
            if (connection != null) {
                try {
                    // 关闭连接
                    connection.close();
                } catch (IOException e) {
                    // 发生 IO 异常时抛出运行时异常
                    throw new RuntimeException(e);
                }
            }
        }
    }
}

2订阅个订阅者

订阅者1

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
public class Consumer01 {
    public static void main(String[] args) throws Exception {
        // 创建连接工厂对象
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"
        factory.setHost("192.168.74.75");
        Connection connection = factory.newConnection();
        // 创建一个 RabbitMQ 连接
        Channel channel = connection.createChannel();
        // 创建一个通道,用于与 RabbitMQ 之间的通信
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            // 创建一个消费者对象,并重写其方法
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
               // 消费消息的处理方法
               String json = new String(body);
               // 将消息内容转换为字符串
               Map map = JSON.parseObject(json, Map.class);
               // 使用 JSON 解析成 Map 对象
               System.out.println("消息内容Consumer01"+map);
               // 输出消息内容
           }
       };
        channel.basicConsume("qy172-publish-queue01",true,consumer);
    }
}

订阅者2

package com.aaa;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Consumer02 {
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.74.75");
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String json = new String(body);
                    Map map = JSON.parseObject(json, Map.class);
                    System.out.println("消息内容Consumer02" + map);
                }
            };
            //订阅者2
            channel.basicConsume("qy172-publish-queue02",true,consumer);
        } catch (IOException | TimeoutException e) {
            // 处理连接、通道创建或消费消息时可能抛出的异常
            e.printStackTrace();
        }
    }
}

2路由模式

发送者

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
public class PublishProduct {
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 服务器的地址,我用的redis,RabbitMQ,现在redis里安装RabbitMQ
        factory.setHost("192.168.74.75");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            // 创建一个通道
            channel = connection.createChannel();
            //创建交换机,
            channel.exchangeDeclare("qy172-router-exchange", BuiltinExchangeType.DIRECT, true);
            //创建队列,如果存在则不会创建
            channel.queueDeclare("qy172-router-queue01", true, false, false, null);
            channel.queueDeclare("qy172-router-queue02", true, false, false, null);
            //交互机和队列绑定
            channel.queueBind("qy172-router-queue01", "qy172-router-exchange", "error");
            channel.queueBind("qy172-router-queue02", "qy172-router-exchange", "error");
            channel.queueBind("qy172-router-queue02", "qy172-router-exchange", "info");
            channel.queueBind("qy172-router-queue02", "qy172-router-exchange", "warning");
            // 创建消息内容
            HashMap<String, Object> map = new HashMap<>();
            map.put("name", "张三");
            map.put("age", "22");
            //把数据给交换机,让他分发给队列
            channel.basicPublish("qy172-router-exchange","error",null,JSON.toJSONBytes(map));
//            channel.basicPublish("qy172-router-exchange","info",null,JSON.toJSONBytes(map));
            System.out.println("发送成功");
        } catch (IOException e) {
            // 发生 IO 异常时抛出运行时异常
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            // 发生超时异常时抛出运行时异常
            throw new RuntimeException(e);
        } finally {
            if (channel != null) {
                try {
                    // 关闭通道
                    channel.close();
                } catch (IOException | TimeoutException e) {
                    // 发生 IO 或超时异常时抛出运行时异常
                    throw new RuntimeException(e);
                }
            }
            if (connection != null) {
                try {
                    // 关闭连接
                    connection.close();
                } catch (IOException e) {
                    // 发生 IO 异常时抛出运行时异常
                    throw new RuntimeException(e);
                }
            }
        }
    }
}

接收者1

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
public class Consumer01 {
    public static void main(String[] args) throws Exception {
        // 创建连接工厂对象
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"
        factory.setHost("192.168.74.75");
        Connection connection = factory.newConnection();
        // 创建一个 RabbitMQ 连接
        Channel channel = connection.createChannel();
        // 创建一个通道,用于与 RabbitMQ 之间的通信
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            // 创建一个消费者对象,并重写其方法
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
               // 消费消息的处理方法
               String json = new String(body);
               // 将消息内容转换为字符串
               Map map = JSON.parseObject(json, Map.class);
               // 使用 JSON 解析成 Map 对象
               System.out.println("消息内容Consumer01"+map);
               // 输出消息内容
           }
       };
        channel.basicConsume("qy172-router-queue01",true,consumer);
    }
}

接收者2

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
public class Consumer01 {
    public static void main(String[] args) throws Exception {
        // 创建连接工厂对象
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"
        factory.setHost("192.168.74.75");
        Connection connection = factory.newConnection();
        // 创建一个 RabbitMQ 连接
        Channel channel = connection.createChannel();
        // 创建一个通道,用于与 RabbitMQ 之间的通信
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            // 创建一个消费者对象,并重写其方法
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
               // 消费消息的处理方法
               String json = new String(body);
               // 将消息内容转换为字符串
               Map map = JSON.parseObject(json, Map.class);
               // 使用 JSON 解析成 Map 对象
               System.out.println("消息内容Consumer01"+map);
               // 输出消息内容
           }
       };
        channel.basicConsume("qy172-router-queue01",true,consumer);
    }
}

3主题模式

发送者

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
public class PublishProduct {
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 服务器的地址,我用的redis,RabbitMQ,现在redis里安装RabbitMQ
        factory.setHost("192.168.74.75");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            // 创建一个通道
            channel = connection.createChannel();
            //创建交换机,
            channel.exchangeDeclare("qy172-topic-exchange", BuiltinExchangeType.TOPIC, true);
            //创建队列,如果存在则不会创建
            channel.queueDeclare("qy172-topic-queue01", true, false, false, null);
            channel.queueDeclare("qy172-topic-queue02", true, false, false, null);
            //交互机和队列绑定
            //主题匹配给这个
            channel.queueBind("qy172-topic-queue01", "qy172-topic-exchange", "*.orange.*");
            //主题,也匹配给这个
            channel.queueBind("qy172-topic-queue02", "qy172-topic-exchange", "*.*.rabbit");
            channel.queueBind("qy172-topic-queue02", "qy172-topic-exchange", "lazy.#");
            // 创建消息内容
            HashMap<String, Object> map = new HashMap<>();
            map.put("name", "张三");
            map.put("age", "22");
            //把数据给交换机,让他分发给队列
            channel.basicPublish("qy172-topic-exchange","lazy.orange.rabbit",null,JSON.toJSONBytes(map));
            System.out.println("发送成功");
        } catch (IOException e) {
            // 发生 IO 异常时抛出运行时异常
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            // 发生超时异常时抛出运行时异常
            throw new RuntimeException(e);
        } finally {
            if (channel != null) {
                try {
                    // 关闭通道
                    channel.close();
                } catch (IOException | TimeoutException e) {
                    // 发生 IO 或超时异常时抛出运行时异常
                    throw new RuntimeException(e);
                }
            }
            if (connection != null) {
                try {
                    // 关闭连接
                    connection.close();
                } catch (IOException e) {
                    // 发生 IO 异常时抛出运行时异常
                    throw new RuntimeException(e);
                }
            }
        }
    }
}

接收者1

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
public class Consumer01 {
    public static void main(String[] args) throws Exception {
        // 创建连接工厂对象
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"
        factory.setHost("192.168.74.75");
        Connection connection = factory.newConnection();
        // 创建一个 RabbitMQ 连接
        Channel channel = connection.createChannel();
        // 创建一个通道,用于与 RabbitMQ 之间的通信
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            // 创建一个消费者对象,并重写其方法
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
               // 消费消息的处理方法
               String json = new String(body);
               // 将消息内容转换为字符串
               Map map = JSON.parseObject(json, Map.class);
               // 使用 JSON 解析成 Map 对象
               System.out.println("消息内容Consumer01"+map);
               // 输出消息内容
           }
       };
        channel.basicConsume("qy172-topic-queue01",true,consumer);
    }
}

接收者2

package com.aaa;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Consumer02 {
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.74.75");
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String json = new String(body);
                    Map map = JSON.parseObject(json, Map.class);
                    System.out.println("消息内容Consumer02" + map);
                }
            };
            //订阅者2
            channel.basicConsume("qy172-topic-queue02",true,consumer);
        } catch (IOException | TimeoutException e) {
            // 处理连接、通道创建或消费消息时可能抛出的异常
            e.printStackTrace();
        }
    }
}

标签:String,com,new,模式,RabbitMQ,import,qy172,部分,channel
From: https://blog.csdn.net/CSDNlele666/article/details/137148199

相关文章

  • 歌词跟随播放进度高亮显示,高亮部分一直在页面中间位置,歌词可拖动
    //歌词组件LyricsDisplay.vue<template><divclass="lyric-container"ref="lyricsList":class="{noLyric:!lyrics.length}"><divv-for="(line,index)inlyrics":key="index"ref="lyric......
  • greenplum-centOs7环境-组模式扩容
    1.扩容说明GreenPlum6.X目前支持以下版本操作系统:RedHatEnterpriseLinux64-bit7.xRedHatEnterpriseLinux64-bit6.xCentOS64-bit7.xCentOS64-bit6.xsUbuntu18.04LTS建议采用7.3以上的7系列版本本次扩容使用两个segment的节点.每个节点配置为2核心2G......
  • 《责任链模式(极简c++)》
            本文章属于专栏-概述-《设计模式(极简c++版)》-CSDN博客模式说明方案:责任链模式将请求的发送者和接收者解耦,构成一个链条,并由多个对象对请求进行处理,直到找到合适的处理者为止。优点:实现了请求发送者和接收者的解耦,灵活性高,易于扩展,每个处理者只需关注自......
  • 《代理模式(极简c++)》
            本文章属于专栏-概述-《设计模式(极简c++版)》-CSDN博客模式说明方案:代理模式充当了客户端和实际对象之间的中介,通过引入代理对象来控制对原始对象的访问。优点:通过代理,可以实现对目标对象的控制,提供更多的功能,例如延迟加载、访问控制、日志记录等。缺点......
  • 基于任务的异步模式和基于时间/回调的异步模式
    问题场景描述webapi:需要向另一个服务器发送http请求,等待服务器的回调结果,若指定时间内比如10分钟没有收到回调则返回失败,否则处理回调返回。典型的基于时间/回调的异步模式,和经常使用的await模式不同,await是基于任务的异步模式,任务完成返回。而前面这种应用场景依赖回调处理......
  • 关于工厂模式的思考
    常说的几种工厂模式就是如下几种:简单工厂模式工厂方法模式抽象工厂模式PreWriting标题开玩笑,我可能喜欢开玩笑,大家都乐呵……其中,在23中设计模式中,简单工厂并不在其列,但是常常提起是因为它的思想也是“解耦”,并不能因为它不在23种设计模式中就不算它是设计模式,这种说法在......
  • kube-proxy模式 iptables和ipvs对比
    kube-proxy的ipvs模式和iptables模式在Kubernetes集群中各有优劣,主要体现在性能、功能和支持的协议方面。1.性能:IPVS模式:由于IPVS是专门为负载均衡设计的,它在性能方面通常优于iptables。IPVS使用基于哈希的负载均衡算法,能够快速处理大量的并发连接,其连接处理的名义计算复杂度......
  • 关于RCE的学习(2)nssctf部分题目实操(1)
           结果对一些基础命令的学习,我发现一个非常显而易见的问题,那就是,理论知识如果没有建立在实操的基础上来讲的话,那么理论知识就会显得非常空,那么结合我上一篇对rce学习的文章,我对一下题目做出如下总结。       首先是,[SWPUCTF2021新生赛]easyrce   ......
  • MVC模式与三层架构
    目录1、MVC模式2、三层架构3、MVC+三层架构4、分层后的处理请求与响应流程5、注意事项6、基于MVC模式的框架7、基于三层架构的框架MVC模式和三层架构在Web中经常一起使用,可以实现业务逻辑、数据访问和用户界面的分离,提高代码的可维护性和可扩展性。1、MVC模式M:Model......
  • 设计模式DP-外观模式
    #include<stdio.h>#include<string.h>#include<stdlib.h>//定义子系统AtypedefstructsubsystemA{ void(*operationA)(structsubsystemA*subsystem);}SubsystemA;//定义子系统BtypedefstructsubsystemB{ void(*operationB)(structsubsystem......