首页 > 其他分享 >RabbitMQ(三)

RabbitMQ(三)

时间:2025-01-15 18:28:01浏览次数:3  
标签:String xxx rabbitmq RabbitMQ import com channel

RabbitMQ中的各模式及其用法

工作队列模式

一、生产者代码

新建一个module,在module下创建属于自己的包,并且创建一个名为“work”的子包,以及工具类包“util”。结构如图所示:
在这里插入图片描述
在pom文件中添加图中所示依赖:

<dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.20.0</version>
        </dependency>
    </dependencies>

此时准备工作基本完成。

1、封装工具类

修改rabbitMQ地址,替换为自己的。

package com.xxx.rabbitmq.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @ClassName: ConnectionUtil
 * @Package: com.xxx.rabbitmq.util
 * @Author: 
 * @CreateDate: 
 * @Version: V1.0.0
 * @Description:
 */

public class ConnectionUtil {
    public static final String HOST_ADDRESS = "192.168.xxx.xxx";

    public static Connection getConnection() throws Exception {

        // 定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        // 设置服务地址
        factory.setHost(HOST_ADDRESS);

        // 端口
        factory.setPort(5672);

        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("123456");

        // 通过工程获取连接
        Connection connection = factory.newConnection();

        return connection;
    }



    public static void main(String[] args) throws Exception {

        Connection con = ConnectionUtil.getConnection();

        // amqp://guest@192.168.xxx.xxx:5672/
        System.out.println(con);

        con.close();

    }

}

2、编写代码

新建生产者类Producer:

package com.xxx.rabbitmq.work;

import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * @ClassName: Producer
 * @Package: com.xxx.rabbitmq.work
 * @Author: 
 * @CreateDate: 
 * @Version: V1.0.0
 * @Description:
 */

public class Producer {
    public static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        for (int i = 1; i <= 10; i++) {

            String body = i+"hello rabbitmq~~~";

            channel.basicPublish("",QUEUE_NAME,null,body.getBytes());

        }

        channel.close();

        connection.close();

    }
}

3、发送消息效果

在这里插入图片描述

二、消费者代码

1、编写代码

创建Consumer1和Consumer2。Consumer2只是类名和打印提示不同,代码完全一样。
Consumer1:

package com.xxx.rabbitmq.work;

import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName: Consumer1
 * @Package: com.xxx.rabbitmq.work
 * @Author: 
 * @CreateDate: 
 * @Version: V1.0.0
 * @Description:
 */

public class Consumer1 {
    static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        Consumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("Consumer1 body:"+new String(body));

            }

        };

        channel.basicConsume(QUEUE_NAME,true,consumer);

    }
}

Consumer2:

package com.xxx.rabbitmq.work;

import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName: Consumer2
 * @Package: com.xxx.rabbitmq.work
 * @Author: 
 * @CreateDate: 
 * @Version: V1.0.0
 * @Description:
 */

public class Consumer2 {
    static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        Consumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("Consumer2 body:"+new String(body));

            }

        };

        channel.basicConsume(QUEUE_NAME,true,consumer);

    }
}

** 注意:**
运行的时候先启动两个消费端程序,然后再启动生产者端程序。
如果已经运行过生产者程序,则手动把work_queue队列删掉。

2、运行效果

最终两个消费端程序竞争结果如下:
在这里插入图片描述
在这里插入图片描述
这样就完成了工作队列模式的演示。

发布订阅模式

一、生产者代码

还是在上面的module内,新建一个名为fanout的子包,在包内创建Producer类:

package com.xxx.rabbitmq.fanout;

import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * @ClassName: Producer
 * @Package: com.xxx.rabbitmq.fanout
 * @Author: 
 * @CreateDate: 
 * @Version: V1.0.0
 * @Description:
 */

public class Producer {
    public static void main(String[] args) throws Exception {

        // 1、获取连接
        Connection connection = ConnectionUtil.getConnection();

        // 2、创建频道
        Channel channel = connection.createChannel();

        // 参数1. exchange:交换机名称
        // 参数2. type:交换机类型
        //     DIRECT("direct"):定向
        //     FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。
        //     TOPIC("topic"):通配符的方式
        //     HEADERS("headers"):参数匹配
        // 参数3. durable:是否持久化
        // 参数4. autoDelete:自动删除
        // 参数5. internal:内部使用。一般false
        // 参数6. arguments:其它参数
        String exchangeName = "test_fanout";

        // 3、创建交换机
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);

        // 4、创建队列
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";

        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);

        // 5、绑定队列和交换机
        // 参数1. queue:队列名称
        // 参数2. exchange:交换机名称
        // 参数3. routingKey:路由键,绑定规则
        //     如果交换机的类型为fanout,routingKey设置为""
        channel.queueBind(queue1Name,exchangeName,"");
        channel.queueBind(queue2Name,exchangeName,"");

        String body = "日志信息:张三调用了findAll方法...日志级别:info...";

        // 6、发送消息
        channel.basicPublish(exchangeName,"",null,body.getBytes());

        // 7、释放资源
        channel.close();
        connection.close();

    }
}

二、消费者代码

1、消费者1号

package com.xxx.rabbitmq.fanout;

import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName: Consumer1
 * @Package: com.xxx.rabbitmq.fanout
 * @Author: 
 * @CreateDate: 
 * @Version: V1.0.0
 * @Description:
 */

public class Consumer1 {
    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        String queue1Name = "test_fanout_queue1";

        channel.queueDeclare(queue1Name,true,false,false,null);

        Consumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("body:"+new String(body));
                System.out.println("队列 1 消费者 1 将日志信息打印到控制台.....");

            }

        };

        channel.basicConsume(queue1Name,true,consumer);

    }
}

2、消费者2号

package com.xxx.rabbitmq.fanout;

import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName: Consumer2
 * @Package: com.xxx.rabbitmq.fanout
 * @Author: 
 * @CreateDate: 
 * @Version: V1.0.0
 * @Description:
 */

public class Consumer2 {
    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        String queue2Name = "test_fanout_queue2";

        channel.queueDeclare(queue2Name,true,false,false,null);

        Consumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("body:"+new String(body));
                System.out.println("队列 2 消费者 2 将日志信息打印到控制台.....");

            }

        };

        channel.basicConsume(queue2Name,true,consumer);

    }
}

三、运行效果

先启动消费者,然后再运行生产者程序发送消息:
在这里插入图片描述
在这里插入图片描述

四、小结

交换机和队列的绑定关系如下图所示:
在这里插入图片描述
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
发布订阅模式与工作队列模式的区别:

  • 工作队列模式本质上是绑定默认交换机
  • 发布订阅模式绑定指定交换机
  • 监听同一个队列的消费端程序彼此之间是竞争关系
  • 绑定同一个交换机的多个队列在发布订阅模式下,消息是广播的,每个队列都能接收到消息

路由模式

一、生产者代码

新建子包routing,并新建Producer类:

package com.xxx.rabbitmq.routing;

import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * @ClassName: Producer
 * @Package: com.xxx.rabbitmq.routing
 * @Author: 
 * @CreateDate: 
 * @Version: V1.0.0
 * @Description:
 */

public class Producer {
    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_direct";

        // 创建交换机
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);

        // 创建队列
        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";

        // 声明(创建)队列
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);

        // 队列绑定交换机
        // 队列1绑定error
        channel.queueBind(queue1Name,exchangeName,"error");

        // 队列2绑定info error warning
        channel.queueBind(queue2Name,exchangeName,"info");
        channel.queueBind(queue2Name,exchangeName,"error");
        channel.queueBind(queue2Name,exchangeName,"warning");

        String message = "日志信息:张三调用了delete方法.错误了,日志级别error";

        // 发送消息
        channel.basicPublish(exchangeName,"error",null,message.getBytes());
        System.out.println(message);

        // 释放资源
        channel.close();
        connection.close();

    }
}

二、消费者代码

1、消费者1号

package com.xxx.rabbitmq.routing;

import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName: Consumer1
 * @Package: com.xxx.rabbitmq.routing
 * @Author: 
 * @CreateDate: 
 * @Version: V1.0.0
 * @Description:
 */

public class Consumer1 {
    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        String queue1Name = "test_direct_queue1";

        channel.queueDeclare(queue1Name,true,false,false,null);

        Consumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("body:"+new String(body));
                System.out.println("Consumer1 将日志信息打印到控制台.....");

            }

        };

        channel.basicConsume(queue1Name,true,consumer);

    }
}

2、消费者2号

package com.xxx.rabbitmq.routing;

import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName: Consumer2
 * @Package: com.xxx.rabbitmq.routing
 * @Author: 
 * @CreateDate: 
 * @Version: V1.0.0
 * @Description:
 */

public class Consumer2 {
    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        String queue2Name = "test_direct_queue2";

        channel.queueDeclare(queue2Name,true,false,false,null);

        Consumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("body:"+new String(body));
                System.out.println("Consumer2 将日志信息存储到数据库.....");

            }

        };

        channel.basicConsume(queue2Name,true,consumer);

    }

}

三、运行结果

1、绑定关系

在这里插入图片描述

2、消费消息

在这里插入图片描述

主题模式

一、生产者代码

新建子包topic,新建生产者类Producer:

package com.xxx.rabbitmq.topic;

import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * @ClassName: Producer
 * @Package: com.xxx.rabbitmq.topic
 * @Author: 
 * @CreateDate: 
 * @Version: V1.0.0
 * @Description:
 */

public class Producer {
    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_topic";

        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);

        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";

        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);

        // 绑定队列和交换机
        // 参数1. queue:队列名称
        // 参数2. exchange:交换机名称
        // 参数3. routingKey:路由键,绑定规则
        //      如果交换机的类型为fanout ,routingKey设置为""
        // routing key 常用格式:系统的名称.日志的级别。
        // 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
        channel.queueBind(queue1Name,exchangeName,"#.error");
        channel.queueBind(queue1Name,exchangeName,"order.*");
        channel.queueBind(queue2Name,exchangeName,"*.*");

        // 分别发送消息到队列:order.info、goods.info、goods.error
        String body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]";
        channel.basicPublish(exchangeName,"order.info",null,body.getBytes());

        body = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]";
        channel.basicPublish(exchangeName,"goods.info",null,body.getBytes());

        body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]";
        channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());

        channel.close();
        connection.close();

    }
}

二、消费者代码

1、消费者1号

消费者1监听队列1:

package com.xxx.rabbitmq.topic;

import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName: Consumer1
 * @Package: com.xxx.rabbitmq.topic
 * @Author: 
 * @CreateDate: 
 * @Version: V1.0.0
 * @Description:
 */

public class Consumer1 {
    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        String QUEUE_NAME = "test_topic_queue1";

        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        Consumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("body:"+new String(body));

            }

        };

        channel.basicConsume(QUEUE_NAME,true,consumer);

    }
}

2、消费者2号

消费者2监听队列2:

package com.xxx.rabbitmq.topic;

import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName: Consumer2
 * @Package: com.xxx.rabbitmq.topic
 * @Author: 
 * @CreateDate: 
 * @Version: V1.0.0
 * @Description:
 */

public class Consumer2 {
    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        String QUEUE_NAME = "test_topic_queue2";

        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        Consumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("body:"+new String(body));

            }

        };

        channel.basicConsume(QUEUE_NAME,true,consumer);

    }
}

三、运行效果

队列1:
在这里插入图片描述
队列2:
在这里插入图片描述
至此,就完成了RabbitMQ各模式的使用演示。

总结

在选择使用什么模式时,需要对应业务需求,结合需求选择合适的模式。

标签:String,xxx,rabbitmq,RabbitMQ,import,com,channel
From: https://blog.csdn.net/qq_38633763/article/details/145157449

相关文章

  • 深度剖析RabbitMQ:从基础组件到管理页面详解
    文章目录一、简介二、Overview2.1Overview->Totals2.2Overview->Nodesbroker的属性2.3Overview->Churnstatistics2.4Overview->Portsandcontexts2.5Overview->Exportdefinitions2.6Overview->Importdefinitions三、Connections连接的属性四、Channels通道的......
  • 探秘 RabbitMQ 管理页面:关键板块与核心功能全解析
    文章目录一、简介二、Overview2.1Overview->Totals2.2Overview->Nodes2.3Overview->Churnstatistics2.4Overview->Portsandcontexts2.5Overview->Exportdefinitions2.6Overview->Importdefinitions三、Connections四、Channels五、Exchanges六、Queues七、Ad......
  • RabbitMQ-消息消费确认
    我们一般使用的是消费者作为被动方接收RabbitMQ推送消息,另一种是消费者作为主动方可以主动拉取消息。RabbitMq服务器推送消息分为隐式(自动)确认和显示确认。1消费者拉取消息消费者作为主动方拉取消息,每次只能获取一条。using(varchannel=connection.CreateModel()){......
  • RabbitMQ-消息入队
    1分布式异步的问题对于一个业务线的处理,如果是一个完整的处理,应该是消息正常进入队列,同时消息正常被消费掉。问题来了:生产者发送消息,在传输过程中,消息丢失了,咋办?消息发到RabbitMq队列,RabbitMq宕机了,咋办?消费者在消费消息的时候,消费异常了,咋办?方案思路1、要保证消息一定能......
  • RabbitMQ-死信队列
    死信,就是无法被消费的消息,一般来说生产者将消息投递到broker或者直接到队列里了,消费者从队列取出消息进行消费。但某些时候由于特定的原因导致队列中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有死信队列。死信队列还是队列---只是用来接受特......
  • RabbitMQ-集群
    RabbitMQ集群----主备关系,在运行的时候,如果非主要节点宕机,程序操作不受影响;如果主节点宕机了,程序会中断操作。而Rabbitmq集群,会马上让没有宕机的节点参选,选出新的主要节点。程序重试的时候,会进入到新的节点中执行。历史消息不受影响的。基于Docker构建RabbitMQ集群1.启动......
  • RabbitMQ-优先级队列及消息配置
    优先级队列C#数据类型queue----先进先出RabbitMQ---队列-----默认也是先进先出~~RabbitMQ设置优先级----可以配置让消费顺序,不按照先进先出的默认规则;给定的优先级---最终体现在消费者;优先级越高,消费的时候,就优先消费。就在前面消费案例:设置{"vip1","hello2","wor......
  • RabbitMQ 高可用方案:原理、构建与运维全解析
    文章目录前言:1集群方案的原理2RabbitMQ高可用集群相关概念2.1设计集群的目的2.2集群配置方式2.3节点类型3集群架构3.1为什么使用集群3.2集群的特点3.3集群异常处理3.4普通集群模式3.5镜像集群模式前言:在实际生产中,RabbitMQ常以集群方案部署。因选用它......
  • 接口项目uuid算法开发及验证-thinkphp6-rabbitmq
    一、uuid算法开发if(!function_exists('uuid')){/***生成uuid*User:龙哥·三年风水*Date:2024/6/7*Time:11:08*@paramstring$prefix*@returnstring*/functionuuid($prefix=''){$s=......
  • RabbitMQ高级篇之MQ可靠性 Lazy Queue
    文章目录数据持久化的背景和挑战引入惰性队列(LazyQueue)惰性队列的特点惰性队列的潜在问题RabbitMQ中的惰性队列实现如何创建惰性队列(LazyQueue)惰性队列的性能测试惰性队列的优势惰性队列的适用场景小结关键点总结数据持久化的背景和挑战持久化确保了即使Rabbit......