首页 > 其他分享 >rabbitmq-1

rabbitmq-1

时间:2024-12-30 19:08:41浏览次数:1  
标签:String System rabbitmq delivery println channel out

1.消息队列

点到点模式

发布订阅模式

2.rabbitMQ简介

3.AMQP协议


4.工作原理图及核心概念
原理图

核心概念
Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
Connection:publisher/consumer 和 broker 之间的 TCP 连接
Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
Queue:消息最终被送到这里等待 consumer 取走
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
5.单机版下载搭建
准备工作
克隆虚拟机,配置IP和host,使用xshell链接
安装配置

1,安装erlang:
rpm -ivh  /opt/software/erlang-23.2.1-1.el7.x86_64.rpm 
   #rpm =redhat package  manager 小红帽包管理程序,业界常用linux系统的  软件管理,支持各种 rpm软件的安装,查询,升级,卸载等。
    -i   install 安装
    -v  verbose  显示详情
    -h   hash  以###的方式显示进度
   查询:
  rpm -qa |grep erlang
   卸载:
  rpm -e  erlang-23.2.1-1.el7.x86_64  (上面查询结果)
2,安装依赖:
   socat是一个多功能的网络工具,名字来由是Socket CAT,socat支持多协议,用于协议处理,端口转发,rabbitmq依赖于socat,因此在安装rabbitmq前要安装socat。
   yum install socat  -y 
3,安装rabbitmq:
rpm -ivh  /opt/software/rabbitmq-server-3.8.30-1.el7.noarch.rpm
4,查看服务状态
systemctl status  rabbitmq-server
或者:
service rabbitmq-server status

6.界面及用户权限管理

启动插件,让UI功能可以使用:
  rabbitmq-plugins   enable  rabbitmq_management
启动rabbitmq服务,使用web页面访问:
  systemctl start  rabbitmq-server
  http://192.168.xxx.30:15672/
创建virtaul host :
rabbitmqctl  add_vhost  /root/ 
rabbitmqctl  add_vhost  /scott/
     查询:
     rabbitmqctl  list_vhosts
     删除:
     rabbitmqctl  delete_vhost   /scott/
创建用户(root用户名  tiger 密码):
     rabbitmqctl  add_user   root  tiger
     rabbitmqctl  add_user   scott  123456
设置角色:
    rabbitmqctl  set_user_tags  root    administrator                                 
    rabbitmqctl  set_user_tags  scott   administrator
    
设置权限:
rabbitmqctl  set_permissions  -p  "/root/"  root ".*"  ".*"  ".*"
rabbitmqctl  set_permissions  -p  "/scott/"  scott ".*"  ".*"  ".*" 
  #设置用户权限(设置/根路径下的 配置  读  写权限)
重复上面操作,创建多个VH和用户。

登录:
http://192.168.xxx.30:15672/
使用正确的用户名和密码,登录后,可以看到首页

5种工作模式
https://www.rabbitmq.com/tutorials

准备工作

创建项目,引入jar包
https://mvnrepository.com/search?q=rabbitmq

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.8.0</version>
</dependency>

1.简单模式
简介
实现简单的消息发送和接受
生产者

package com.aaa.rabbitmq.demo1;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @FileName: Producer
 * @Description:
 * @Author: zhz
 * @CreateTime: 2024/12/10 9:02
 * @Version: 1.0.0
 */
public class Producer {

    //定义常量是不能更改
    public static final  String QUEUE_NAME = "queue1";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //实例化ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设置创建工厂的参数
            //链接主机IP
            connectionFactory.setHost("192.168.170.30");
            //设置vhost
            connectionFactory.setVirtualHost("/root/");
            //设置用户和密码
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("tiger");
            //设置端口号
            connectionFactory.setPort(5672);
            //使用链接工厂创建连接
            connection = connectionFactory.newConnection();
            //使用链接创建channel
            channel = connection.createChannel();
            //使用channel定义队列
            //String queue  队列名称,
            // boolean durable 是否持久化(是否存储到磁盘中),
            // boolean exclusive 是否被多个消费者消费  false 允许多个消费者,
            // boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
            // Map<String, Object> arguments  定义其他参数
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //定义发送的消息
            String  sendMsg = "hello qy1781!!!";
            //发送消息
            //String exchange 交换机名称, String routingKey 路由键, AMQP.BasicProperties props 发送时传递其他参数, byte[] body  消息的字节数组
            //交换机名称为空串“”, 不是不使用交换机,而是使用默认交换机
            channel.basicPublish("",QUEUE_NAME,null,sendMsg.getBytes());
            //打印提示
            System.out.println("发送完毕!");
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                //关闭资源
                if(channel!=null){
                    channel.close();
                }
                if(connection!=null){
                    connection.close();
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            }

        }



    }
}

消费者

package com.aaa.rabbitmq.demo1;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @FileName: Consumer
 * @Description:
 * @Author: zhz
 * @CreateTime: 2024/12/10 9:36
 * @Version: 1.0.0
 */
public class Consumer {
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //实例化ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设置创建工厂的参数
            //链接主机IP
            connectionFactory.setHost("192.168.170.30");
            //设置vhost
            connectionFactory.setVirtualHost("/root/");
            //设置用户和密码
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("tiger");
            //设置端口号
            connectionFactory.setPort(5672);
            //使用链接工厂创建连接
            connection = connectionFactory.newConnection();
            //使用链接创建channel
            channel = connection.createChannel();
            //使用channel定义队列
            //String queue  队列名称,
            // boolean durable 是否持久化(是否存储到磁盘中),
            // boolean exclusive 是否被多个消费者消费  false 允许多个消费者,
            // boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
            // Map<String, Object> arguments  定义其他参数
            channel.queueDeclare(Producer.QUEUE_NAME,false,false,false,null);
            //使用channel获取消息  amq.ctag-E0c-YqMVGbLBAm9ZtT2SfA
            //String queue 队列名称,
            // boolean autoAck 自动确定  消费者一旦拿到消息,自动确认拿到了,
            // DeliverCallback deliverCallback 交付回调,
            // CancelCallback cancelCallback   取消回调
          /*  channel.basicConsume(Producer.QUEUE_NAME,(String s, Delivery delivery)->{
                System.out.println("回调处理信息:"+s);
                //delivery交付对象,存储消息的所有信息  delivery.getBody()消息的字节数组
                System.out.println("消费的消息为:"+new String(delivery.getBody()));
                //消息的其他信息:
                Envelope envelope = delivery.getEnvelope();
                String routingKey = envelope.getRoutingKey();
                String exchange = envelope.getExchange();
                long deliveryTag = envelope.getDeliveryTag();
                System.out.println("路由键routingKey:"+routingKey);
                System.out.println("交换机exchange:"+exchange);
                System.out.println("消息唯一的数字标识:"+deliveryTag);
            },(String s)->{
                System.out.println("回调处理信息:"+s);});*/
             //使用内部类,实例化接口
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("回调处理信息:"+s);
                    //delivery交付对象,存储消息的所有信息  delivery.getBody()消息的字节数组
                    System.out.println("消费的消息为:"+new String(delivery.getBody()));
                    //消息的其他信息:
                    Envelope envelope = delivery.getEnvelope();
                    String routingKey = envelope.getRoutingKey();
                    String exchange = envelope.getExchange();
                    long deliveryTag = envelope.getDeliveryTag();
                    System.out.println("路由键routingKey:"+routingKey);
                    System.out.println("交换机exchange:"+exchange);
                    System.out.println("消息唯一的数字标识:"+deliveryTag);
                }
            };
            CancelCallback cancelCallback = new CancelCallback(){
                @Override
                public void handle(String s) throws IOException {
                    System.out.println("回调处理信息:"+s);
                }
            };
            //消费者消费消息不是一次性的,所以消费者不能关闭资源
            channel.basicConsume(Producer.QUEUE_NAME,true, deliverCallback,cancelCallback);

        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        } finally {
            /*try {
                //关闭资源
                if(channel!=null){
                    channel.close();
                }
                if(connection!=null){
                    connection.close();
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            }*/

        }
    }
}

工作队列
简介
创建一个工作队列,用于在多个消费者之间分配(竞争轮询)耗时的任务。从而提高整个任务执行效率。

package com.aaa.rabbitmq.demo2;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @FileName: Producer
 * @Description:  work queue   当有多个耗时任务需要执行,生产者发布任务到队列,多个消费者竞争轮询方式,执行任务,提高效率
 * @Author: zhz
 * @CreateTime: 2024/12/10 9:02
 * @Version: 1.0.0
 */
public class Producer {

    //定义常量是不能更改
    public static final  String QUEUE_NAME = "queue2";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //实例化ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设置创建工厂的参数
            //链接主机IP
            connectionFactory.setHost("192.168.170.30");
            //设置vhost
            connectionFactory.setVirtualHost("/root/");
            //设置用户和密码
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("tiger");
            //设置端口号
            connectionFactory.setPort(5672);
            //使用链接工厂创建连接
            connection = connectionFactory.newConnection();
            //使用链接创建channel
            channel = connection.createChannel();
            //使用channel定义队列
            //String queue  队列名称,
            // boolean durable 是否持久化(是否存储到磁盘中),
            // boolean exclusive 是否被多个消费者消费  false 允许多个消费者,
            // boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
            // Map<String, Object> arguments  定义其他参数
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            for (int i = 1; i <= 10; i++) {
                //定义发送的消息
                String  sendMsg = "hello qy178!!!"+i;
                //发送消息
                //String exchange 交换机名称, String routingKey 路由键, AMQP.BasicProperties props 发送时传递其他参数, byte[] body  消息的字节数组
                //交换机名称为空串“”, 不是不使用交换机,而是使用默认交换机
                channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,sendMsg.getBytes());

            }
            //打印提示
            System.out.println("发送完毕!");
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                //关闭资源
                if(channel!=null){
                    channel.close();
                }
                if(connection!=null){
                    connection.close();
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            }

        }



    }
}

消费者1

package com.aaa.rabbitmq.demo2;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @FileName: Consumer
 * @Description:
 * @Author: zhz
 * @CreateTime: 2024/12/10 9:36
 * @Version: 1.0.0
 */
public class ConsumerA {
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //实例化ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设置创建工厂的参数
            //链接主机IP
            connectionFactory.setHost("192.168.170.30");
            //设置vhost
            connectionFactory.setVirtualHost("/root/");
            //设置用户和密码
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("tiger");
            //设置端口号
            connectionFactory.setPort(5672);
            //使用链接工厂创建连接
            connection = connectionFactory.newConnection();
            //使用链接创建channel
            channel = connection.createChannel();
            //使用channel定义队列
            //String queue  队列名称,
            // boolean durable 是否持久化(是否存储到磁盘中),
            // boolean exclusive 是否被多个消费者消费  false 允许多个消费者,
            // boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
            // Map<String, Object> arguments  定义其他参数
            channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null);
            //使用channel获取消息  amq.ctag-E0c-YqMVGbLBAm9ZtT2SfA
            //String queue 队列名称,
            // boolean autoAck 自动确定  消费者一旦拿到消息,自动确认拿到了,
            // DeliverCallback deliverCallback 交付回调,
            // CancelCallback cancelCallback   取消回调
          /*  channel.basicConsume(Producer.QUEUE_NAME,(String s, Delivery delivery)->{
                System.out.println("回调处理信息:"+s);
                //delivery交付对象,存储消息的所有信息  delivery.getBody()消息的字节数组
                System.out.println("消费的消息为:"+new String(delivery.getBody()));
                //消息的其他信息:
                Envelope envelope = delivery.getEnvelope();
                String routingKey = envelope.getRoutingKey();
                String exchange = envelope.getExchange();
                long deliveryTag = envelope.getDeliveryTag();
                System.out.println("路由键routingKey:"+routingKey);
                System.out.println("交换机exchange:"+exchange);
                System.out.println("消息唯一的数字标识:"+deliveryTag);
            },(String s)->{
                System.out.println("回调处理信息:"+s);});*/
             //使用内部类,实例化接口
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("回调处理信息:"+s);
                    //delivery交付对象,存储消息的所有信息  delivery.getBody()消息的字节数组
                    System.out.println("消费的消息为:"+new String(delivery.getBody()));
                    //消息的其他信息:
                    Envelope envelope = delivery.getEnvelope();
                    String routingKey = envelope.getRoutingKey();
                    String exchange = envelope.getExchange();
                    long deliveryTag = envelope.getDeliveryTag();
                    System.out.println("路由键routingKey:"+routingKey);
                    System.out.println("交换机exchange:"+exchange);
                    System.out.println("消息唯一的数字标识:"+deliveryTag);
                }
            };
            CancelCallback cancelCallback = new CancelCallback(){
                @Override
                public void handle(String s) throws IOException {
                    System.out.println("回调处理信息:"+s);
                }
            };
            //消费者消费消息不是一次性的,所以消费者不能关闭资源
            channel.basicConsume(Producer.QUEUE_NAME,true, deliverCallback,cancelCallback);

        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        } finally {
            /*try {
                //关闭资源
                if(channel!=null){
                    channel.close();
                }
                if(connection!=null){
                    connection.close();
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            }*/

        }
    }
}

消费者2

package com.aaa.rabbitmq.demo2;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @FileName: Consumer
 * @Description:
 * @Author: zhz
 * @CreateTime: 2024/12/10 9:36
 * @Version: 1.0.0
 */
public class ConsumerB {
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //实例化ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设置创建工厂的参数
            //链接主机IP
            connectionFactory.setHost("192.168.170.30");
            //设置vhost
            connectionFactory.setVirtualHost("/root/");
            //设置用户和密码
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("tiger");
            //设置端口号
            connectionFactory.setPort(5672);
            //使用链接工厂创建连接
            connection = connectionFactory.newConnection();
            //使用链接创建channel
            channel = connection.createChannel();
            //使用channel定义队列
            //String queue  队列名称,
            // boolean durable 是否持久化(是否存储到磁盘中),
            // boolean exclusive 是否被多个消费者消费  false 允许多个消费者,
            // boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
            // Map<String, Object> arguments  定义其他参数
            channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null);
            //使用channel获取消息  amq.ctag-E0c-YqMVGbLBAm9ZtT2SfA
            //String queue 队列名称,
            // boolean autoAck 自动确定  消费者一旦拿到消息,自动确认拿到了,
            // DeliverCallback deliverCallback 交付回调,
            // CancelCallback cancelCallback   取消回调
          /*  channel.basicConsume(Producer.QUEUE_NAME,(String s, Delivery delivery)->{
                System.out.println("回调处理信息:"+s);
                //delivery交付对象,存储消息的所有信息  delivery.getBody()消息的字节数组
                System.out.println("消费的消息为:"+new String(delivery.getBody()));
                //消息的其他信息:
                Envelope envelope = delivery.getEnvelope();
                String routingKey = envelope.getRoutingKey();
                String exchange = envelope.getExchange();
                long deliveryTag = envelope.getDeliveryTag();
                System.out.println("路由键routingKey:"+routingKey);
                System.out.println("交换机exchange:"+exchange);
                System.out.println("消息唯一的数字标识:"+deliveryTag);
            },(String s)->{
                System.out.println("回调处理信息:"+s);});*/
             //使用内部类,实例化接口
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("回调处理信息:"+s);
                    //delivery交付对象,存储消息的所有信息  delivery.getBody()消息的字节数组
                    System.out.println("消费的消息为:"+new String(delivery.getBody()));
                    //消息的其他信息:
                    Envelope envelope = delivery.getEnvelope();
                    String routingKey = envelope.getRoutingKey();
                    String exchange = envelope.getExchange();
                    long deliveryTag = envelope.getDeliveryTag();
                    System.out.println("路由键routingKey:"+routingKey);
                    System.out.println("交换机exchange:"+exchange);
                    System.out.println("消息唯一的数字标识:"+deliveryTag);
                }
            };
            CancelCallback cancelCallback = new CancelCallback(){
                @Override
                public void handle(String s) throws IOException {
                    System.out.println("回调处理信息:"+s);
                }
            };
            //消费者消费消息不是一次性的,所以消费者不能关闭资源
            channel.basicConsume(Producer.QUEUE_NAME,true, deliverCallback,cancelCallback);

        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        } finally {
            /*try {
                //关闭资源
                if(channel!=null){
                    channel.close();
                }
                if(connection!=null){
                    connection.close();
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            }*/

        }
    }
}

消费者3

package com.aaa.rabbitmq.demo2;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @FileName: Consumer
 * @Description:
 * @Author: zhz
 * @CreateTime: 2024/12/10 9:36
 * @Version: 1.0.0
 */
public class ConsumerC {
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //实例化ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设置创建工厂的参数
            //链接主机IP
            connectionFactory.setHost("192.168.170.30");
            //设置vhost
            connectionFactory.setVirtualHost("/root/");
            //设置用户和密码
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("tiger");
            //设置端口号
            connectionFactory.setPort(5672);
            //使用链接工厂创建连接
            connection = connectionFactory.newConnection();
            //使用链接创建channel
            channel = connection.createChannel();
            //使用channel定义队列
            //String queue  队列名称,
            // boolean durable 是否持久化(是否存储到磁盘中),
            // boolean exclusive 是否被多个消费者消费  false 允许多个消费者,
            // boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
            // Map<String, Object> arguments  定义其他参数
            channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null);
            //使用channel获取消息  amq.ctag-E0c-YqMVGbLBAm9ZtT2SfA
            //String queue 队列名称,
            // boolean autoAck 自动确定  消费者一旦拿到消息,自动确认拿到了,
            // DeliverCallback deliverCallback 交付回调,
            // CancelCallback cancelCallback   取消回调
          /*  channel.basicConsume(Producer.QUEUE_NAME,(String s, Delivery delivery)->{
                System.out.println("回调处理信息:"+s);
                //delivery交付对象,存储消息的所有信息  delivery.getBody()消息的字节数组
                System.out.println("消费的消息为:"+new String(delivery.getBody()));
                //消息的其他信息:
                Envelope envelope = delivery.getEnvelope();
                String routingKey = envelope.getRoutingKey();
                String exchange = envelope.getExchange();
                long deliveryTag = envelope.getDeliveryTag();
                System.out.println("路由键routingKey:"+routingKey);
                System.out.println("交换机exchange:"+exchange);
                System.out.println("消息唯一的数字标识:"+deliveryTag);
            },(String s)->{
                System.out.println("回调处理信息:"+s);});*/
             //使用内部类,实例化接口
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("回调处理信息:"+s);
                    //delivery交付对象,存储消息的所有信息  delivery.getBody()消息的字节数组
                    System.out.println("消费的消息为:"+new String(delivery.getBody()));
                    //消息的其他信息:
                    Envelope envelope = delivery.getEnvelope();
                    String routingKey = envelope.getRoutingKey();
                    String exchange = envelope.getExchange();
                    long deliveryTag = envelope.getDeliveryTag();
                    System.out.println("路由键routingKey:"+routingKey);
                    System.out.println("交换机exchange:"+exchange);
                    System.out.println("消息唯一的数字标识:"+deliveryTag);
                }
            };
            CancelCallback cancelCallback = new CancelCallback(){
                @Override
                public void handle(String s) throws IOException {
                    System.out.println("回调处理信息:"+s);
                }
            };
            //消费者消费消息不是一次性的,所以消费者不能关闭资源
            channel.basicConsume(Producer.QUEUE_NAME,true, deliverCallback,cancelCallback);

        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        } finally {
            /*try {
                //关闭资源
                if(channel!=null){
                    channel.close();
                }
                if(connection!=null){
                    connection.close();
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            }*/

        }
    }
}

发布订阅
简介
直接(direct)模式:添加一个功能 - 我们将使其可以仅订阅消息的子集。当前演示的是Exchange类型为direct使用。直接direct绑定比扇出fanout更加灵活。

需求:6种日志级别(trace,debug,info,warn,error,fatal),将能够仅将严重错误(error)消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
生产者

package com.aaa.rabbitmq.demo3;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @FileName: Producer
 * @Description:  消息发布订阅模式  exchange 常用类型的fanout  同一个消息被多个消费者同时消费,相同消息处理方式不一样
 * @Author: zhz
 * @CreateTime: 2024/12/10 9:02
 * @Version: 1.0.0
 */
public class Producer {

    //定义常量是不能更改
    public static final  String QUEUE_NAME_3 = "queue3";
    public static final  String QUEUE_NAME_4 = "queue4";
    public static final  String QUEUE_NAME_5 = "queue5";
    private static  final  String EXCHANGE_NAME="exchange_fanout";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //实例化ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设置创建工厂的参数
            //链接主机IP
            connectionFactory.setHost("192.168.170.30");
            //设置vhost
            connectionFactory.setVirtualHost("/root/");
            //设置用户和密码
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("tiger");
            //设置端口号
            connectionFactory.setPort(5672);
            //使用链接工厂创建连接
            connection = connectionFactory.newConnection();
            //使用链接创建channel
            channel = connection.createChannel();
            //使用channel定义交换机
            //String exchange交换机名称, String type 类型
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            //使用channel定义队列
            //String queue  队列名称,
            // boolean durable 是否持久化(是否存储到磁盘中),
            // boolean exclusive 是否被多个消费者消费  false 允许多个消费者,
            // boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
            // Map<String, Object> arguments  定义其他参数
            channel.queueDeclare(QUEUE_NAME_3,true,false,false,null);
            channel.queueDeclare(QUEUE_NAME_4,true,false,false,null);
            channel.queueDeclare(QUEUE_NAME_5,true,false,false,null);

            //让交换和队列进行绑定
            //String queue, String exchange, String routingKey
            channel.queueBind(QUEUE_NAME_3,EXCHANGE_NAME,"");
            channel.queueBind(QUEUE_NAME_4,EXCHANGE_NAME,"");
            channel.queueBind(QUEUE_NAME_5,EXCHANGE_NAME,"");
            for (int i = 1; i <= 10; i++) {
                //定义发送的消息
                String  sendMsg = "hello qy178!!!"+i;
                //发送消息
                //String exchange 交换机名称,
                // String routingKey 路由键,
                // AMQP.BasicProperties props 发送时传递其他参数, byte[] body  消息的字节数组
                //交换机名称为空串“”, 不是不使用交换机,而是使用默认交换机
                channel.basicPublish(EXCHANGE_NAME,"", MessageProperties.PERSISTENT_TEXT_PLAIN,sendMsg.getBytes());

            }
            //打印提示
            System.out.println("发送完毕!");
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                //关闭资源
                if(channel!=null){
                    channel.close();
                }
                if(connection!=null){
                    connection.close();
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            }

        }



    }
}

消费者1

package com.aaa.rabbitmq.demo3;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @FileName: Consumer
 * @Description:
 * @Author: zhz
 * @CreateTime: 2024/12/10 9:36
 * @Version: 1.0.0
 */
public class ConsumerA {
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //实例化ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设置创建工厂的参数
            //链接主机IP
            connectionFactory.setHost("192.168.170.30");
            //设置vhost
            connectionFactory.setVirtualHost("/root/");
            //设置用户和密码
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("tiger");
            //设置端口号
            connectionFactory.setPort(5672);
            //使用链接工厂创建连接
            connection = connectionFactory.newConnection();
            //使用链接创建channel
            channel = connection.createChannel();
            //使用channel定义队列
            //String queue  队列名称,
            // boolean durable 是否持久化(是否存储到磁盘中),
            // boolean exclusive 是否被多个消费者消费  false 允许多个消费者,
            // boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
            // Map<String, Object> arguments  定义其他参数
            channel.queueDeclare(Producer.QUEUE_NAME_3,true,false,false,null);
            //使用channel获取消息  amq.ctag-E0c-YqMVGbLBAm9ZtT2SfA
            //String queue 队列名称,
            // boolean autoAck 自动确定  消费者一旦拿到消息,自动确认拿到了,
            // DeliverCallback deliverCallback 交付回调,
            // CancelCallback cancelCallback   取消回调
          /*  channel.basicConsume(Producer.QUEUE_NAME,(String s, Delivery delivery)->{
                System.out.println("回调处理信息:"+s);
                //delivery交付对象,存储消息的所有信息  delivery.getBody()消息的字节数组
                System.out.println("消费的消息为:"+new String(delivery.getBody()));
                //消息的其他信息:
                Envelope envelope = delivery.getEnvelope();
                String routingKey = envelope.getRoutingKey();
                String exchange = envelope.getExchange();
                long deliveryTag = envelope.getDeliveryTag();
                System.out.println("路由键routingKey:"+routingKey);
                System.out.println("交换机exchange:"+exchange);
                System.out.println("消息唯一的数字标识:"+deliveryTag);
            },(String s)->{
                System.out.println("回调处理信息:"+s);});*/
             //使用内部类,实例化接口
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("回调处理信息:"+s);
                    //delivery交付对象,存储消息的所有信息  delivery.getBody()消息的字节数组
                    System.out.println("模拟消费者,获取日志信息:"+new String(delivery.getBody())+",模仿打印控制台。。。。");
                    //消息的其他信息:
                    Envelope envelope = delivery.getEnvelope();
                    String routingKey = envelope.getRoutingKey();
                    String exchange = envelope.getExchange();
                    long deliveryTag = envelope.getDeliveryTag();
                    System.out.println("路由键routingKey:"+routingKey);
                    System.out.println("交换机exchange:"+exchange);
                    System.out.println("消息唯一的数字标识:"+deliveryTag);
                }
            };
            CancelCallback cancelCallback = new CancelCallback(){
                @Override
                public void handle(String s) throws IOException {
                    System.out.println("回调处理信息:"+s);
                }
            };
            //消费者消费消息不是一次性的,所以消费者不能关闭资源
            channel.basicConsume(Producer.QUEUE_NAME_3,true, deliverCallback,cancelCallback);

        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        } finally {
            /*try {
                //关闭资源
                if(channel!=null){
                    channel.close();
                }
                if(connection!=null){
                    connection.close();
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            }*/

        }
    }
}

消费者2

package com.aaa.rabbitmq.demo3;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @FileName: Consumer
 * @Description:
 * @Author: zhz
 * @CreateTime: 2024/12/10 9:36
 * @Version: 1.0.0
 */
public class ConsumerB {
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //实例化ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设置创建工厂的参数
            //链接主机IP
            connectionFactory.setHost("192.168.170.30");
            //设置vhost
            connectionFactory.setVirtualHost("/root/");
            //设置用户和密码
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("tiger");
            //设置端口号
            connectionFactory.setPort(5672);
            //使用链接工厂创建连接
            connection = connectionFactory.newConnection();
            //使用链接创建channel
            channel = connection.createChannel();
            //使用channel定义队列
            //String queue  队列名称,
            // boolean durable 是否持久化(是否存储到磁盘中),
            // boolean exclusive 是否被多个消费者消费  false 允许多个消费者,
            // boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
            // Map<String, Object> arguments  定义其他参数
            channel.queueDeclare(Producer.QUEUE_NAME_4,true,false,false,null);
            //使用channel获取消息  amq.ctag-E0c-YqMVGbLBAm9ZtT2SfA
            //String queue 队列名称,
            // boolean autoAck 自动确定  消费者一旦拿到消息,自动确认拿到了,
            // DeliverCallback deliverCallback 交付回调,
            // CancelCallback cancelCallback   取消回调
          /*  channel.basicConsume(Producer.QUEUE_NAME,(String s, Delivery delivery)->{
                System.out.println("回调处理信息:"+s);
                //delivery交付对象,存储消息的所有信息  delivery.getBody()消息的字节数组
                System.out.println("消费的消息为:"+new String(delivery.getBody()));
                //消息的其他信息:
                Envelope envelope = delivery.getEnvelope();
                String routingKey = envelope.getRoutingKey();
                String exchange = envelope.getExchange();
                long deliveryTag = envelope.getDeliveryTag();
                System.out.println("路由键routingKey:"+routingKey);
                System.out.println("交换机exchange:"+exchange);
                System.out.println("消息唯一的数字标识:"+deliveryTag);
            },(String s)->{
                System.out.println("回调处理信息:"+s);});*/
             //使用内部类,实例化接口
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("回调处理信息:"+s);
                    //delivery交付对象,存储消息的所有信息  delivery.getBody()消息的字节数组
                    System.out.println("模拟消费者,获取日志信息:"+new String(delivery.getBody())+",模仿存储到磁盘。。。。");
                    //消息的其他信息:
                    Envelope envelope = delivery.getEnvelope();
                    String routingKey = envelope.getRoutingKey();
                    String exchange = envelope.getExchange();
                    long deliveryTag = envelope.getDeliveryTag();
                    System.out.println("路由键routingKey:"+routingKey);
                    System.out.println("交换机exchange:"+exchange);
                    System.out.println("消息唯一的数字标识:"+deliveryTag);
                }
            };
            CancelCallback cancelCallback = new CancelCallback(){
                @Override
                public void handle(String s) throws IOException {
                    System.out.println("回调处理信息:"+s);
                }
            };
            //消费者消费消息不是一次性的,所以消费者不能关闭资源
            channel.basicConsume(Producer.QUEUE_NAME_4,true, deliverCallback,cancelCallback);

        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        } finally {
            /*try {
                //关闭资源
                if(channel!=null){
                    channel.close();
                }
                if(connection!=null){
                    connection.close();
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            }*/

        }
    }
}

消费者3

package com.aaa.rabbitmq.demo3;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @FileName: Consumer
 * @Description:
 * @Author: zhz
 * @CreateTime: 2024/12/10 9:36
 * @Version: 1.0.0
 */
public class ConsumerC {
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //实例化ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设置创建工厂的参数
            //链接主机IP
            connectionFactory.setHost("192.168.170.30");
            //设置vhost
            connectionFactory.setVirtualHost("/root/");
            //设置用户和密码
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("tiger");
            //设置端口号
            connectionFactory.setPort(5672);
            //使用链接工厂创建连接
            connection = connectionFactory.newConnection();
            //使用链接创建channel
            channel = connection.createChannel();
            //使用channel定义队列
            //String queue  队列名称,
            // boolean durable 是否持久化(是否存储到磁盘中),
            // boolean exclusive 是否被多个消费者消费  false 允许多个消费者,
            // boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
            // Map<String, Object> arguments  定义其他参数
            channel.queueDeclare(Producer.QUEUE_NAME_5,true,false,false,null);
            //使用channel获取消息  amq.ctag-E0c-YqMVGbLBAm9ZtT2SfA
            //String queue 队列名称,
            // boolean autoAck 自动确定  消费者一旦拿到消息,自动确认拿到了,
            // DeliverCallback deliverCallback 交付回调,
            // CancelCallback cancelCallback   取消回调
          /*  channel.basicConsume(Producer.QUEUE_NAME,(String s, Delivery delivery)->{
                System.out.println("回调处理信息:"+s);
                //delivery交付对象,存储消息的所有信息  delivery.getBody()消息的字节数组
                System.out.println("消费的消息为:"+new String(delivery.getBody()));
                //消息的其他信息:
                Envelope envelope = delivery.getEnvelope();
                String routingKey = envelope.getRoutingKey();
                String exchange = envelope.getExchange();
                long deliveryTag = envelope.getDeliveryTag();
                System.out.println("路由键routingKey:"+routingKey);
                System.out.println("交换机exchange:"+exchange);
                System.out.println("消息唯一的数字标识:"+deliveryTag);
            },(String s)->{
                System.out.println("回调处理信息:"+s);});*/
             //使用内部类,实例化接口
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("回调处理信息:"+s);
                    //delivery交付对象,存储消息的所有信息  delivery.getBody()消息的字节数组
                    System.out.println("模拟消费者,获取日志信息:"+new String(delivery.getBody())+",模仿存储到数据库。。。。");
                    //消息的其他信息:
                    Envelope envelope = delivery.getEnvelope();
                    String routingKey = envelope.getRoutingKey();
                    String exchange = envelope.getExchange();
                    long deliveryTag = envelope.getDeliveryTag();
                    System.out.println("路由键routingKey:"+routingKey);
                    System.out.println("交换机exchange:"+exchange);
                    System.out.println("消息唯一的数字标识:"+deliveryTag);
                }
            };
            CancelCallback cancelCallback = new CancelCallback(){
                @Override
                public void handle(String s) throws IOException {
                    System.out.println("回调处理信息:"+s);
                }
            };
            //消费者消费消息不是一次性的,所以消费者不能关闭资源
            channel.basicConsume(Producer.QUEUE_NAME_5,true, deliverCallback,cancelCallback);

        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        } finally {
            /*try {
                //关闭资源
                if(channel!=null){
                    channel.close();
                }
                if(connection!=null){
                    connection.close();
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            }*/

        }
    }
}

路由模式
简介
直接(direct)模式:添加一个功能 - 我们将使其可以仅订阅消息的子集。当前演示的是Exchange类型为direct使用。直接direct绑定比扇出fanout更加灵活。

生产者

package com.aaa.rabbitmq.demo4;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;

/**
 * @FileName: Producer
 * @Description:  路由模式:  需求  日志类型分类trace,debug,info,warn,error,fatal    扇出中不管什么类型,消费者都必须消费,
 * 让消息接收更加灵活  只接受消息子集
 * 当前模式下不去都消费  ,选择error  fatal  保存数据库      warn  保存到磁盘    trace,debug,info打印控制台
 * @Author: zhz
 * @CreateTime: 2024/12/10 9:02
 * @Version: 1.0.0
 */
public class Producer {

    //定义常量是不能更改
    public static final  String QUEUE_NAME_6 = "queue6";
    public static final  String QUEUE_NAME_7 = "queue7";
    public static final  String QUEUE_NAME_8 = "queue8";
    private static  final  String EXCHANGE_NAME="exchange_direct";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //实例化ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设置创建工厂的参数
            //链接主机IP
            connectionFactory.setHost("192.168.170.30");
            //设置vhost
            connectionFactory.setVirtualHost("/root/");
            //设置用户和密码
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("tiger");
            //设置端口号
            connectionFactory.setPort(5672);
            //使用链接工厂创建连接
            connection = connectionFactory.newConnection();
            //使用链接创建channel
            channel = connection.createChannel();
            //使用channel定义交换机
            //String exchange交换机名称, String type 类型
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            //使用channel定义队列
            //String queue  队列名称,
            // boolean durable 是否持久化(是否存储到磁盘中),
            // boolean exclusive 是否被多个消费者消费  false 允许多个消费者,
            // boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
            // Map<String, Object> arguments  定义其他参数
            channel.queueDeclare(QUEUE_NAME_6,true,false,false,null);
            channel.queueDeclare(QUEUE_NAME_7,true,false,false,null);
            channel.queueDeclare(QUEUE_NAME_8,true,false,false,null);

            //让交换和队列进行绑定
            //String queue, String exchange, String routingKey
            channel.queueBind(QUEUE_NAME_6,EXCHANGE_NAME,"fatal");
            channel.queueBind(QUEUE_NAME_6,EXCHANGE_NAME,"error");

            channel.queueBind(QUEUE_NAME_7,EXCHANGE_NAME,"warn");

            channel.queueBind(QUEUE_NAME_8,EXCHANGE_NAME,"trace");
            channel.queueBind(QUEUE_NAME_8,EXCHANGE_NAME,"debug");
            channel.queueBind(QUEUE_NAME_8,EXCHANGE_NAME,"info");

            String[] logTypeArray={"trace","debug","info","warn","error","fatal"};

            Random random =new Random();

            for (int i = 1; i <= 50; i++) {
                int logTypeInt = random.nextInt(logTypeArray.length);
                String  logType= logTypeArray[logTypeInt];
                //定义发送的消息
                String  sendMsg = "hello qy178!!!"+i+",日志类型为:"+logType;
                //发送消息
                //String exchange 交换机名称,
                // String routingKey 路由键,
                // AMQP.BasicProperties props 发送时传递其他参数, byte[] body  消息的字节数组
                //交换机名称为空串“”, 不是不使用交换机,而是使用默认交换机
                channel.basicPublish(EXCHANGE_NAME,logType, MessageProperties.PERSISTENT_TEXT_PLAIN,sendMsg.getBytes());

            }
            //打印提示
            System.out.println("发送完毕!");
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                //关闭资源
                if(channel!=null){
                    channel.close();
                }
                if(connection!=null){
                    connection.close();
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            }

        }



    }
}

消费者1

package com.aaa.rabbitmq.demo4;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @FileName: Consumer
 * @Description:
 * @Author: zhz
 * @CreateTime: 2024/12/10 9:36
 * @Version: 1.0.0
 */
public class ConsumerA {
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //实例化ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设置创建工厂的参数
            //链接主机IP
            connectionFactory.setHost("192.168.170.30");
            //设置vhost
            connectionFactory.setVirtualHost("/root/");
            //设置用户和密码
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("tiger");
            //设置端口号
            connectionFactory.setPort(5672);
            //使用链接工厂创建连接
            connection = connectionFactory.newConnection();
            //使用链接创建channel
            channel = connection.createChannel();
            //使用channel定义队列
            //String queue  队列名称,
            // boolean durable 是否持久化(是否存储到磁盘中),
            // boolean exclusive 是否被多个消费者消费  false 允许多个消费者,
            // boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
            // Map<String, Object> arguments  定义其他参数
            channel.queueDeclare(Producer.QUEUE_NAME_6,true,false,false,null);
            //使用channel获取消息  amq.ctag-E0c-YqMVGbLBAm9ZtT2SfA
            //String queue 队列名称,
            // boolean autoAck 自动确定  消费者一旦拿到消息,自动确认拿到了,
            // DeliverCallback deliverCallback 交付回调,
            // CancelCallback cancelCallback   取消回调
          /*  channel.basicConsume(Producer.QUEUE_NAME,(String s, Delivery delivery)->{
                System.out.println("回调处理信息:"+s);
                //delivery交付对象,存储消息的所有信息  delivery.getBody()消息的字节数组
                System.out.println("消费的消息为:"+new String(delivery.getBody()));
                //消息的其他信息:
                Envelope envelope = delivery.getEnvelope();
                String routingKey = envelope.getRoutingKey();
                String exchange = envelope.getExchange();
                long deliveryTag = envelope.getDeliveryTag();
                System.out.println("路由键routingKey:"+routingKey);
                System.out.println("交换机exchange:"+exchange);
                System.out.println("消息唯一的数字标识:"+deliveryTag);
            },(String s)->{
                System.out.println("回调处理信息:"+s);});*/
             //使用内部类,实例化接口
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("回调处理信息:"+s);
                    //delivery交付对象,存储消息的所有信息  delivery.getBody()消息的字节数组
                    System.out.println("模拟消费者,获取日志信息,或者error和fatal:"+new String(delivery.getBody())+",模仿放入数据库。。。。");
                    //消息的其他信息:
                    Envelope envelope = delivery.getEnvelope();
                    String routingKey = envelope.getRoutingKey();
                    String exchange = envelope.getExchange();
                    long deliveryTag = envelope.getDeliveryTag();
                    System.out.println("路由键routingKey:"+routingKey);
                    System.out.println("交换机exchange:"+exchange);
                    System.out.println("消息唯一的数字标识:"+deliveryTag);
                }
            };
            CancelCallback cancelCallback = new CancelCallback(){
                @Override
                public void handle(String s) throws IOException {
                    System.out.println("回调处理信息:"+s);
                }
            };
            //消费者消费消息不是一次性的,所以消费者不能关闭资源
            channel.basicConsume(Producer.QUEUE_NAME_6,true, deliverCallback,cancelCallback);

        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        } finally {
            /*try {
                //关闭资源
                if(channel!=null){
                    channel.close();
                }
                if(connection!=null){
                    connection.close();
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            }*/

        }
    }
}

消费者2

package com.aaa.rabbitmq.demo4;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @FileName: Consumer
 * @Description:
 * @Author: zhz
 * @CreateTime: 2024/12/10 9:36
 * @Version: 1.0.0
 */
public class ConsumerB {
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //实例化ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设置创建工厂的参数
            //链接主机IP
            connectionFactory.setHost("192.168.170.30");
            //设置vhost
            connectionFactory.setVirtualHost("/root/");
            //设置用户和密码
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("tiger");
            //设置端口号
            connectionFactory.setPort(5672);
            //使用链接工厂创建连接
            connection = connectionFactory.newConnection();
            //使用链接创建channel
            channel = connection.createChannel();
            //使用channel定义队列
            //String queue  队列名称,
            // boolean durable 是否持久化(是否存储到磁盘中),
            // boolean exclusive 是否被多个消费者消费  false 允许多个消费者,
            // boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
            // Map<String, Object> arguments  定义其他参数
            channel.queueDeclare(Producer.QUEUE_NAME_7,true,false,false,null);
            //使用channel获取消息  amq.ctag-E0c-YqMVGbLBAm9ZtT2SfA
            //String queue 队列名称,
            // boolean autoAck 自动确定  消费者一旦拿到消息,自动确认拿到了,
            // DeliverCallback deliverCallback 交付回调,
            // CancelCallback cancelCallback   取消回调
          /*  channel.basicConsume(Producer.QUEUE_NAME,(String s, Delivery delivery)->{
                System.out.println("回调处理信息:"+s);
                //delivery交付对象,存储消息的所有信息  delivery.getBody()消息的字节数组
                System.out.println("消费的消息为:"+new String(delivery.getBody()));
                //消息的其他信息:
                Envelope envelope = delivery.getEnvelope();
                String routingKey = envelope.getRoutingKey();
                String exchange = envelope.getExchange();
                long deliveryTag = envelope.getDeliveryTag();
                System.out.println("路由键routingKey:"+routingKey);
                System.out.println("交换机exchange:"+exchange);
                System.out.println("消息唯一的数字标识:"+deliveryTag);
            },(String s)->{
                System.out.println("回调处理信息:"+s);});*/
             //使用内部类,实例化接口
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("回调处理信息:"+s);
                    //delivery交付对象,存储消息的所有信息  delivery.getBody()消息的字节数组
                    System.out.println("模拟消费者,获取日志信息,warn:"+new String(delivery.getBody())+",模仿存入磁盘。。。。");
                    //消息的其他信息:
                    Envelope envelope = delivery.getEnvelope();
                    String routingKey = envelope.getRoutingKey();
                    String exchange = envelope.getExchange();
                    long deliveryTag = envelope.getDeliveryTag();
                    System.out.println("路由键routingKey:"+routingKey);
                    System.out.println("交换机exchange:"+exchange);
                    System.out.println("消息唯一的数字标识:"+deliveryTag);
                }
            };
            CancelCallback cancelCallback = new CancelCallback(){
                @Override
                public void handle(String s) throws IOException {
                    System.out.println("回调处理信息:"+s);
                }
            };
            //消费者消费消息不是一次性的,所以消费者不能关闭资源
            channel.basicConsume(Producer.QUEUE_NAME_7,true, deliverCallback,cancelCallback);

        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        } finally {
            /*try {
                //关闭资源
                if(channel!=null){
                    channel.close();
                }
                if(connection!=null){
                    connection.close();
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            }*/

        }
    }
}

消费者3

package com.aaa.rabbitmq.demo4;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @FileName: Consumer
 * @Description:
 * @Author: zhz
 * @CreateTime: 2024/12/10 9:36
 * @Version: 1.0.0
 */
public class ConsumerC {
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //实例化ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设置创建工厂的参数
            //链接主机IP
            connectionFactory.setHost("192.168.170.30");
            //设置vhost
            connectionFactory.setVirtualHost("/root/");
            //设置用户和密码
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("tiger");
            //设置端口号
            connectionFactory.setPort(5672);
            //使用链接工厂创建连接
            connection = connectionFactory.newConnection();
            //使用链接创建channel
            channel = connection.createChannel();
            //使用channel定义队列
            //String queue  队列名称,
            // boolean durable 是否持久化(是否存储到磁盘中),
            // boolean exclusive 是否被多个消费者消费  false 允许多个消费者,
            // boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
            // Map<String, Object> arguments  定义其他参数
            channel.queueDeclare(Producer.QUEUE_NAME_8,true,false,false,null);
            //使用channel获取消息  amq.ctag-E0c-YqMVGbLBAm9ZtT2SfA
            //String queue 队列名称,
            // boolean autoAck 自动确定  消费者一旦拿到消息,自动确认拿到了,
            // DeliverCallback deliverCallback 交付回调,
            // CancelCallback cancelCallback   取消回调
          /*  channel.basicConsume(Producer.QUEUE_NAME,(String s, Delivery delivery)->{
                System.out.println("回调处理信息:"+s);
                //delivery交付对象,存储消息的所有信息  delivery.getBody()消息的字节数组
                System.out.println("消费的消息为:"+new String(delivery.getBody()));
                //消息的其他信息:
                Envelope envelope = delivery.getEnvelope();
                String routingKey = envelope.getRoutingKey();
                String exchange = envelope.getExchange();
                long deliveryTag = envelope.getDeliveryTag();
                System.out.println("路由键routingKey:"+routingKey);
                System.out.println("交换机exchange:"+exchange);
                System.out.println("消息唯一的数字标识:"+deliveryTag);
            },(String s)->{
                System.out.println("回调处理信息:"+s);});*/
             //使用内部类,实例化接口
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("回调处理信息:"+s);
                    //delivery交付对象,存储消息的所有信息  delivery.getBody()消息的字节数组
                    System.out.println("模拟消费者,获取日志信息,接受trace,debug,info:"+new String(delivery.getBody())+",模仿打印控制台。。。。");
                    //消息的其他信息:
                    Envelope envelope = delivery.getEnvelope();
                    String routingKey = envelope.getRoutingKey();
                    String exchange = envelope.getExchange();
                    long deliveryTag = envelope.getDeliveryTag();
                    System.out.println("路由键routingKey:"+routingKey);
                    System.out.println("交换机exchange:"+exchange);
                    System.out.println("消息唯一的数字标识:"+deliveryTag);
                }
            };
            CancelCallback cancelCallback = new CancelCallback(){
                @Override
                public void handle(String s) throws IOException {
                    System.out.println("回调处理信息:"+s);
                }
            };
            //消费者消费消息不是一次性的,所以消费者不能关闭资源
            channel.basicConsume(Producer.QUEUE_NAME_8,true, deliverCallback,cancelCallback);

        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        } finally {
            /*try {
                //关闭资源
                if(channel!=null){
                    channel.close();
                }
                if(connection!=null){
                    connection.close();
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            }*/

        }
    }
}

话题模式
简介
通配符(topic)模式:尽管使用直接direct交换改进了我们的系统,但它仍然有局限性——它不能基于多个标准进行路由。当前演示的是Exchange类型为topic使用。topic方式可以基于多个标准进行路由

路由规则:单词列表,由点分隔,路由键中可以有任意多的单词,最多为 255 个字节。topic比direct方式,本质上更灵活,使用绑定键有两个重要的特殊符号:
*(星号)可以只替换一个单词。

(hash) 可以代替零个或多个单词。 ..*

生产者

package com.aaa.rabbitmq.demo5;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;

/**
 * @FileName: Producer
 * @Description:   话题模式,比路由模式更灵活   可以#代表0到多个单词  *代表1个单词进行路由key的匹配
 * @Author: zhz
 * @CreateTime: 2024/12/10 9:02
 * @Version: 1.0.0
 */
public class Producer {

    //定义常量是不能更改
    public static final  String QUEUE_NAME_9 = "queue9"; //喜欢橙色的动物
    public static final  String QUEUE_NAME_10 = "queue10";//喜欢懒惰或者兔子
    public static final  String QUEUE_NAME_11 = "queue11";//喜欢速度快的
    private static  final  String EXCHANGE_NAME="exchange_topic";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //实例化ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设置创建工厂的参数
            //链接主机IP
            connectionFactory.setHost("192.168.170.30");
            //设置vhost
            connectionFactory.setVirtualHost("/root/");
            //设置用户和密码
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("tiger");
            //设置端口号
            connectionFactory.setPort(5672);
            //使用链接工厂创建连接
            connection = connectionFactory.newConnection();
            //使用链接创建channel
            channel = connection.createChannel();
            //使用channel定义交换机
            //String exchange交换机名称, String type 类型
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            //使用channel定义队列
            //String queue  队列名称,
            // boolean durable 是否持久化(是否存储到磁盘中),
            // boolean exclusive 是否被多个消费者消费  false 允许多个消费者,
            // boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
            // Map<String, Object> arguments  定义其他参数
            channel.queueDeclare(QUEUE_NAME_9,true,false,false,null);
            channel.queueDeclare(QUEUE_NAME_10,true,false,false,null);
            channel.queueDeclare(QUEUE_NAME_11,true,false,false,null);

            //让交换和队列进行绑定
            //String queue, String exchange, String routingKey
            channel.queueBind(QUEUE_NAME_9,EXCHANGE_NAME,"*.orange.*");

            channel.queueBind(QUEUE_NAME_10,EXCHANGE_NAME,"lazy.#");
            channel.queueBind(QUEUE_NAME_10,EXCHANGE_NAME,"*.*.rabbit");

            channel.queueBind(QUEUE_NAME_11,EXCHANGE_NAME,"quick.*.*");

            String[] animalArray={"quick.orange.rabbit","lazy.","lazy.a.b.c.d","lazy.orange.elephant","quick.orange.fox","lazy.brown.fox","lazy.pink.rabbit","quick.brown.fox"};

            Random random =new Random();

            for (int i = 1; i <= 20; i++) {
                int logTypeInt = random.nextInt(animalArray.length);
                String  animal= animalArray[logTypeInt];
                //定义发送的消息
                String  sendMsg = "hello qy178!!!"+i+",动物为:"+animal;
                //发送消息
                //String exchange 交换机名称,
                // String routingKey 路由键,
                // AMQP.BasicProperties props 发送时传递其他参数, byte[] body  消息的字节数组
                //交换机名称为空串“”, 不是不使用交换机,而是使用默认交换机
                channel.basicPublish(EXCHANGE_NAME,animal, MessageProperties.PERSISTENT_TEXT_PLAIN,sendMsg.getBytes());

            }
            //打印提示
            System.out.println("发送完毕!");
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                //关闭资源
                if(channel!=null){
                    channel.close();
                }
                if(connection!=null){
                    connection.close();
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            }

        }



    }
}

消费者1

package com.aaa.rabbitmq.demo5;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @FileName: Consumer
 * @Description:
 * @Author: zhz
 * @CreateTime: 2024/12/10 9:36
 * @Version: 1.0.0
 */
public class ConsumerA {
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //实例化ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设置创建工厂的参数
            //链接主机IP
            connectionFactory.setHost("192.168.170.30");
            //设置vhost
            connectionFactory.setVirtualHost("/root/");
            //设置用户和密码
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("tiger");
            //设置端口号
            connectionFactory.setPort(5672);
            //使用链接工厂创建连接
            connection = connectionFactory.newConnection();
            //使用链接创建channel
            channel = connection.createChannel();
            //使用channel定义队列
            //String queue  队列名称,
            // boolean durable 是否持久化(是否存储到磁盘中),
            // boolean exclusive 是否被多个消费者消费  false 允许多个消费者,
            // boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
            // Map<String, Object> arguments  定义其他参数
            channel.queueDeclare(Producer.QUEUE_NAME_9,true,false,false,null);
            //使用channel获取消息  amq.ctag-E0c-YqMVGbLBAm9ZtT2SfA
            //String queue 队列名称,
            // boolean autoAck 自动确定  消费者一旦拿到消息,自动确认拿到了,
            // DeliverCallback deliverCallback 交付回调,
            // CancelCallback cancelCallback   取消回调
          /*  channel.basicConsume(Producer.QUEUE_NAME,(String s, Delivery delivery)->{
                System.out.println("回调处理信息:"+s);
                //delivery交付对象,存储消息的所有信息  delivery.getBody()消息的字节数组
                System.out.println("消费的消息为:"+new String(delivery.getBody()));
                //消息的其他信息:
                Envelope envelope = delivery.getEnvelope();
                String routingKey = envelope.getRoutingKey();
                String exchange = envelope.getExchange();
                long deliveryTag = envelope.getDeliveryTag();
                System.out.println("路由键routingKey:"+routingKey);
                System.out.println("交换机exchange:"+exchange);
                System.out.println("消息唯一的数字标识:"+deliveryTag);
            },(String s)->{
                System.out.println("回调处理信息:"+s);});*/
             //使用内部类,实例化接口
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("回调处理信息:"+s);
                    //delivery交付对象,存储消息的所有信息  delivery.getBody()消息的字节数组
                    System.out.println("模拟消费者,喜欢动物是橙色:"+new String(delivery.getBody())+"。。。。");
                    //消息的其他信息:
                    Envelope envelope = delivery.getEnvelope();
                    String routingKey = envelope.getRoutingKey();
                    String exchange = envelope.getExchange();
                    long deliveryTag = envelope.getDeliveryTag();
                    System.out.println("路由键routingKey:"+routingKey);
                    System.out.println("交换机exchange:"+exchange);
                    System.out.println("消息唯一的数字标识:"+deliveryTag);
                }
            };
            CancelCallback cancelCallback = new CancelCallback(){
                @Override
                public void handle(String s) throws IOException {
                    System.out.println("回调处理信息:"+s);
                }
            };
            //消费者消费消息不是一次性的,所以消费者不能关闭资源
            channel.basicConsume(Producer.QUEUE_NAME_9,true, deliverCallback,cancelCallback);

        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        } finally {
            /*try {
                //关闭资源
                if(channel!=null){
                    channel.close();
                }
                if(connection!=null){
                    connection.close();
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            }*/

        }
    }
}

消费者2

package com.aaa.rabbitmq.demo5;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @FileName: Consumer
 * @Description:
 * @Author: zhz
 * @CreateTime: 2024/12/10 9:36
 * @Version: 1.0.0
 */
public class ConsumerB {
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //实例化ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设置创建工厂的参数
            //链接主机IP
            connectionFactory.setHost("192.168.170.30");
            //设置vhost
            connectionFactory.setVirtualHost("/root/");
            //设置用户和密码
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("tiger");
            //设置端口号
            connectionFactory.setPort(5672);
            //使用链接工厂创建连接
            connection = connectionFactory.newConnection();
            //使用链接创建channel
            channel = connection.createChannel();
            //使用channel定义队列
            //String queue  队列名称,
            // boolean durable 是否持久化(是否存储到磁盘中),
            // boolean exclusive 是否被多个消费者消费  false 允许多个消费者,
            // boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
            // Map<String, Object> arguments  定义其他参数
            channel.queueDeclare(Producer.QUEUE_NAME_10,true,false,false,null);
            //使用channel获取消息  amq.ctag-E0c-YqMVGbLBAm9ZtT2SfA
            //String queue 队列名称,
            // boolean autoAck 自动确定  消费者一旦拿到消息,自动确认拿到了,
            // DeliverCallback deliverCallback 交付回调,
            // CancelCallback cancelCallback   取消回调
          /*  channel.basicConsume(Producer.QUEUE_NAME,(String s, Delivery delivery)->{
                System.out.println("回调处理信息:"+s);
                //delivery交付对象,存储消息的所有信息  delivery.getBody()消息的字节数组
                System.out.println("消费的消息为:"+new String(delivery.getBody()));
                //消息的其他信息:
                Envelope envelope = delivery.getEnvelope();
                String routingKey = envelope.getRoutingKey();
                String exchange = envelope.getExchange();
                long deliveryTag = envelope.getDeliveryTag();
                System.out.println("路由键routingKey:"+routingKey);
                System.out.println("交换机exchange:"+exchange);
                System.out.println("消息唯一的数字标识:"+deliveryTag);
            },(String s)->{
                System.out.println("回调处理信息:"+s);});*/
             //使用内部类,实例化接口
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("回调处理信息:"+s);
                    //delivery交付对象,存储消息的所有信息  delivery.getBody()消息的字节数组
                    System.out.println("模拟消费者,喜欢动物是懒惰的或者兔子:"+new String(delivery.getBody())+"。。。。");
                    //消息的其他信息:
                    Envelope envelope = delivery.getEnvelope();
                    String routingKey = envelope.getRoutingKey();
                    String exchange = envelope.getExchange();
                    long deliveryTag = envelope.getDeliveryTag();
                    System.out.println("路由键routingKey:"+routingKey);
                    System.out.println("交换机exchange:"+exchange);
                    System.out.println("消息唯一的数字标识:"+deliveryTag);
                }
            };
            CancelCallback cancelCallback = new CancelCallback(){
                @Override
                public void handle(String s) throws IOException {
                    System.out.println("回调处理信息:"+s);
                }
            };
            //消费者消费消息不是一次性的,所以消费者不能关闭资源
            channel.basicConsume(Producer.QUEUE_NAME_10,true, deliverCallback,cancelCallback);

        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        } finally {
            /*try {
                //关闭资源
                if(channel!=null){
                    channel.close();
                }
                if(connection!=null){
                    connection.close();
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            }*/

        }
    }
}

消费者3

package com.aaa.rabbitmq.demo5;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @FileName: Consumer
 * @Description:
 * @Author: zhz
 * @CreateTime: 2024/12/10 9:36
 * @Version: 1.0.0
 */
public class ConsumerC {
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //实例化ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设置创建工厂的参数
            //链接主机IP
            connectionFactory.setHost("192.168.170.30");
            //设置vhost
            connectionFactory.setVirtualHost("/root/");
            //设置用户和密码
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("tiger");
            //设置端口号
            connectionFactory.setPort(5672);
            //使用链接工厂创建连接
            connection = connectionFactory.newConnection();
            //使用链接创建channel
            channel = connection.createChannel();
            //使用channel定义队列
            //String queue  队列名称,
            // boolean durable 是否持久化(是否存储到磁盘中),
            // boolean exclusive 是否被多个消费者消费  false 允许多个消费者,
            // boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
            // Map<String, Object> arguments  定义其他参数
            channel.queueDeclare(Producer.QUEUE_NAME_11,true,false,false,null);
            //使用channel获取消息  amq.ctag-E0c-YqMVGbLBAm9ZtT2SfA
            //String queue 队列名称,
            // boolean autoAck 自动确定  消费者一旦拿到消息,自动确认拿到了,
            // DeliverCallback deliverCallback 交付回调,
            // CancelCallback cancelCallback   取消回调
          /*  channel.basicConsume(Producer.QUEUE_NAME,(String s, Delivery delivery)->{
                System.out.println("回调处理信息:"+s);
                //delivery交付对象,存储消息的所有信息  delivery.getBody()消息的字节数组
                System.out.println("消费的消息为:"+new String(delivery.getBody()));
                //消息的其他信息:
                Envelope envelope = delivery.getEnvelope();
                String routingKey = envelope.getRoutingKey();
                String exchange = envelope.getExchange();
                long deliveryTag = envelope.getDeliveryTag();
                System.out.println("路由键routingKey:"+routingKey);
                System.out.println("交换机exchange:"+exchange);
                System.out.println("消息唯一的数字标识:"+deliveryTag);
            },(String s)->{
                System.out.println("回调处理信息:"+s);});*/
             //使用内部类,实例化接口
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("回调处理信息:"+s);
                    //delivery交付对象,存储消息的所有信息  delivery.getBody()消息的字节数组
                    System.out.println("模拟消费者,喜欢动物是快的:"+new String(delivery.getBody())+"。。。。");
                    //消息的其他信息:
                    Envelope envelope = delivery.getEnvelope();
                    String routingKey = envelope.getRoutingKey();
                    String exchange = envelope.getExchange();
                    long deliveryTag = envelope.getDeliveryTag();
                    System.out.println("路由键routingKey:"+routingKey);
                    System.out.println("交换机exchange:"+exchange);
                    System.out.println("消息唯一的数字标识:"+deliveryTag);
                }
            };
            CancelCallback cancelCallback = new CancelCallback(){
                @Override
                public void handle(String s) throws IOException {
                    System.out.println("回调处理信息:"+s);
                }
            };
            //消费者消费消息不是一次性的,所以消费者不能关闭资源
            channel.basicConsume(Producer.QUEUE_NAME_11,true, deliverCallback,cancelCallback);

        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        } finally {
            /*try {
                //关闭资源
                if(channel!=null){
                    channel.close();
                }
                if(connection!=null){
                    connection.close();
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            }*/

        }
    }
}

持久化

标签:String,System,rabbitmq,delivery,println,channel,out
From: https://www.cnblogs.com/xiaomubupi/p/18642182

相关文章

  • 3、RabbitMQ队列之工作队列【RabbitMQ官方教程】
    工作队列使用 php-amqplib 在第一个教程中,我们编写了从命名队列发送和接收消息的程序。在本例中,我们将创建一个工作队列,用于在多个工作人员之间分配耗时的任务。工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务,并必须等待其完成。相反,我们把任务安排在以后......
  • 2、RabbitMQ队列之HelloWorld【RabbitMQ官方教程】
    简介1.本教程假设RabbitMQ已安装并在本地主机的标准端口(5672)上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。2.如果你在学习本教程时遇到困难,可以通过 GitHubDiscussions或者RabbitMQcommunityDiscord与我们联系RabbitMQ是一个消息代理:它接受和转发消息。......
  • 1、【RabbitMQ官方教程】简介
    简介RabbitMQ是一个开源的消息代理软件(也被称为消息队列),它实现了高级消息队列协议(AMQP)。本教程旨在帮助开发者通过RabbitMQ创建消息应用的基础知识。教程分为两部分:RabbitMQ队列和RabbitMQ流 RabbitMQ队列这部分教程涵盖了默认的RabbitMQ协议AMQP0-9-1。包括以下......
  • 消息中间件——rabbitmq,kafka,rocketmq
    目录mqmq解决什么问题rabbitmq工作原理消息路由如何保证消息不丢失实现高可用kafka能支持这么大吞吐量的原因如何保证消息不丢失避免重复消费如何保证消息顺序消费数据存储原理IRSleader选举rocketmq为什么不使用zookeeper分布式事务mqmessageQueue,消息......
  • (七).NET6.0部署RabbitMQ
    1.下载erlang语言包OTP。官网地址:https://www.erlang.org/downloads2.Rabbitmq官网下载地址:https://www.rabbitmq.com/download.html需要先安装Erlang语言包,然后再安装RabbitMQ,安装RabbitMQ的服务器名称(电脑名称),以及用户名称,不要带中文,有可能会导致服务无法识别服务器,导致一......
  • RabbitMQ 延迟任务(限时订单) 思路
    一、场景我们经常会碰见,一个需求就是,发送一条指令(消息),延迟一段时间执行,比如说常见的淘宝当下了一个订单后,订单支付时间为半个小时,如果半个小时没有支付,则关闭该订单。当然实现的方式有几种,今天来看看rabbitMQ实现的方式。二、思路:rabbitMQ如何实现1:rabbitMQ为每个队列......
  • RabbitMq的运用(1)—— 基础使用方法介绍
    目录1.RabbitMQ-异步世界的同步器1.1.同步调用1.2.异步调用2.RabbitMq安装1.1windows安装1.2docker安装3.网页端操作3.1登录 3.2队列3.3交换机3.3.1绑定交换机和队列的关系3.3.2发送消息到队列3.3.3在队列中查看信息 3.4更换用户3.4.1新增账号3.4.2创建虚拟......
  • SpringBoot 集成RabbitMQ
    springboot集成MQ 配置文件配置类 发送者 消费者 调用   ......
  • RabbitMQ
    支持多种语言RM架构:如何保证消息可靠性?1.保证消息可以到达Exchange异步回调(Confirums机制)步骤:开启confirms;设置confirms的异步回调(ACK/NCK)2.保证消息可以路由到QueueReturn机制3.保证Queue可以持久化消息RM重启后队列中的消息依然存在4.保证消费者可以正常消费信......
  • Windows 彻底卸载RabbitMQ
    要在Windows系统上彻底卸载RabbitMQ,确保删除所有相关文件和配置,请按照以下步骤操作:1.卸载RabbitMQ和Erlang打开“控制面板”,选择“程序和功能”。在已安装的程序列表中,找到“RabbitMQServer”,右键单击并选择“卸载”。同样地,找到“ErlangOTP”,右键单击并选择“卸载......