首页 > 其他分享 >RabbitMQ

RabbitMQ

时间:2023-02-16 20:36:27浏览次数:48  
标签:connectionFactory String 队列 RabbitMQ import 默认值 channel

RabbitMQ

img

img

img

img

img

img

img

img

img

img

img

img

img

img

img

img

img

img

img

img

img

img

Messaging that just works — RabbitMQ

案例

pom.xml

 <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

生产者

package com.www.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生产者:发送消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ProducerHelloWorld {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost("192.168.36.100");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost("/ljt");
        // 用户名 默认值 guest
        connectionFactory.setUsername("ljt");
        // 密码 默认值 guest
        connectionFactory.setPassword("ljt");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 5、创建队列 Queue
        /*
         queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         参数:
             1. queue:队列名称
             2. durable:是否持久化,当mq重启之后,还在
             3. exclusive:
                 * 是否独占。只能有一个消费者监听这队列
                 * 当Connection关闭时,是否删除队列
                 *
             4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
             5. arguments:参数。
         */
        // 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare(
                // 队列名称
                "hello_world",
                // 是否持久化,当mq重启之后,还在
                true,
                // 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
                false,
                // 是否自动删除。当没有Consumer时,自动删除掉
                false,
                // 参数
                null
        );
        
         /*
             basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            参数:
                1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
                2. routingKey:路由名称
                3. props:配置信息
                4. body:发送消息数据
         */
        // 发送消息内容
        String body = "HelloWorld~~~~~~~~~~";
        
        // 6、发送消息
        channel.basicPublish(
                // 交换机名称。简单模式下交换机会使用默认的 ""
                "",
                // 路由名称
                "hello_world",
                // 配置信息
                null,
                // 发送消息数据
                body.getBytes()
        );
        
        // 释放资源
        channel.close();
        connection.close();
        
    }
}


消费者

package com.www.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者:获取消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ConsumerHelloWorld {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost("192.168.36.100");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost("/ljt");
        // 用户名 默认值 guest
        connectionFactory.setUsername("ljt");
        // 密码 默认值 guest
        connectionFactory.setPassword("ljt");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 5、创建队列 Queue
        /*
         queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         参数:
             1. queue:队列名称
             2. durable:是否持久化,当mq重启之后,还在
             3. exclusive:
                 * 是否独占。只能有一个消费者监听这队列
                 * 当Connection关闭时,是否删除队列
                 *
             4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
             5. arguments:参数。
         */
        // 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare(
                // 队列名称
                "hello_world",
                // 是否持久化,当mq重启之后,还在
                true,
                // 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
                false,
                // 是否自动删除。当没有Consumer时,自动删除掉
                false,
                // 参数
                null);
        
         /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
         */
        // 6、接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * <p>
             *   回调方法,当收到消息后,会自动执行该方法
             *      1. consumerTag:标识
             *      2. envelope:获取一些信息,交换机,路由key...
             *      3. properties:配置信息
             *      4. body:数据
             * </p>
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:" + consumerTag);
                System.out.println("Exchange:" + envelope.getExchange());
                System.out.println("RoutingKey:" + envelope.getRoutingKey());
                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body));
            }
        };
        channel.basicConsume(
                // 队列名称
                "hello_world",
                // 是否自动确认
                true,
                // 回调对象
                consumer
        );
        
        // 不需要关闭资源
    }
}


img

工作队列

img

生产者

package com.www.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生产者:发送消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ProducerWorkQueues {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost("192.168.36.100");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost("/ljt");
        // 用户名 默认值 guest
        connectionFactory.setUsername("ljt");
        // 密码 默认值 guest
        connectionFactory.setPassword("ljt");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 5、创建队列 Queue
        /*
         queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         参数:
             1. queue:队列名称
             2. durable:是否持久化,当mq重启之后,还在
             3. exclusive:
                 * 是否独占。只能有一个消费者监听这队列
                 * 当Connection关闭时,是否删除队列
                 *
             4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
             5. arguments:参数。
         */
        // 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare(
                // 队列名称
                "WorkQueue",
                // 是否持久化,当mq重启之后,还在
                true,
                // 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
                false,
                // 是否自动删除。当没有Consumer时,自动删除掉
                false,
                // 参数
                null
        );
        
         /*
             basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            参数:
                1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
                2. routingKey:路由名称
                3. props:配置信息
                4. body:发送消息数据
         */
        for (int i = 0; i < 10; i++) {
            // 发送消息内容
            String body = "WorkQueue~~~~~~~~~~" + i;
            
            // 6、发送消息
            channel.basicPublish(
                    // 交换机名称。简单模式下交换机会使用默认的 ""
                    "",
                    // 路由名称
                    "WorkQueue",
                    // 配置信息
                    null,
                    // 发送消息数据
                    body.getBytes()
            );
        }
        
        
        // 释放资源
        channel.close();
        connection.close();
        
    }
}


消费者: 两个

package com.www.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者:获取消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ConsumerWorkQueues1 {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost("192.168.36.100");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost("/ljt");
        // 用户名 默认值 guest
        connectionFactory.setUsername("ljt");
        // 密码 默认值 guest
        connectionFactory.setPassword("ljt");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 5、创建队列 Queue
        /*
         queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         参数:
             1. queue:队列名称
             2. durable:是否持久化,当mq重启之后,还在
             3. exclusive:
                 * 是否独占。只能有一个消费者监听这队列
                 * 当Connection关闭时,是否删除队列
                 *
             4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
             5. arguments:参数。
         */
        // 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare(
                // 队列名称
                "WorkQueue",
                // 是否持久化,当mq重启之后,还在
                true,
                // 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
                false,
                // 是否自动删除。当没有Consumer时,自动删除掉
                false,
                // 参数
                null);
        
         /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
         */
        // 6、接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * <p>
             *   回调方法,当收到消息后,会自动执行该方法
             *      1. consumerTag:标识
             *      2. envelope:获取一些信息,交换机,路由key...
             *      3. properties:配置信息
             *      4. body:数据
             * </p>
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:" + consumerTag);
                System.out.println("Exchange:" + envelope.getExchange());
                System.out.println("RoutingKey:" + envelope.getRoutingKey());
                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body));
            }
        };
        channel.basicConsume(
                // 队列名称
                "WorkQueue",
                // 是否自动确认
                true,
                // 回调对象
                consumer
        );
        
        // 不需要关闭资源
    }
}


订阅模式

img

生产者

package com.www.mq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生产者:发送消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ProducerPubSub {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost("192.168.36.100");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost("/ljt");
        // 用户名 默认值 guest
        connectionFactory.setUsername("ljt");
        // 密码 默认值 guest
        connectionFactory.setPassword("ljt");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 5、创建交换机
        /*
           exchangeDeclare(
            String exchange,BuiltinExchangeType type,
            boolean durable, boolean autoDelete,
            boolean internal, Map<String, Object> arguments
            )
           参数:
            1. exchange:交换机名称
            2. type:交换机类型
                DIRECT("direct"),:定向
                FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
                TOPIC("topic"),通配符的方式
                HEADERS("headers");参数匹配
            3. durable:是否持久化
            4. autoDelete:自动删除
            5. internal:内部使用。 一般false
            6. arguments:参数
        */
        String exchangeName = "test_fanout";
        channel.exchangeDeclare(
                // 交换机名称
                exchangeName,
                // type:交换机类型
                //   DIRECT("direct"),:定向
                //   FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
                //   TOPIC("topic"),通配符的方式
                //   HEADERS("headers");参数匹配
                BuiltinExchangeType.FANOUT,
                // 是否持久化
                true,
                // 内部使用
                false,
                // 参数
                null
        
        );
        
        // 6、创建队列
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        channel.queueDeclare(
                // 队列名
                queue1Name,
                // 是否持久
                true,
                // 是否独占
                false,
                //是否自动删除
                false,
                // 参数
                null
        );
        channel.queueDeclare(
                // 队列名
                queue2Name,
                // 是否持久
                true,
                // 是否独占
                false,
                //是否自动删除
                false,
                // 参数
                null
        );
        
        // 7、绑定队列和交换机
         /*
            queueBind(String queue, String exchange, String routingKey)
            参数:
                1. queue:队列名称
                2. exchange:交换机名称
                3. routingKey:路由键,绑定规则
                    如果交换机的类型为fanout ,routingKey设置为""
         */
        channel.queueBind(
                // 队列名
                queue1Name,
                // 交换机名
                exchangeName,
                //  routingKey:路由键,绑定规则
                //    如果交换机的类型为fanout ,routingKey设置为""
                ""
        );
        channel.queueBind(
                // 队列名
                queue2Name,
                // 交换机名
                exchangeName,
                //  routingKey:路由键,绑定规则
                //    如果交换机的类型为fanout ,routingKey设置为""
                ""
        );
        
         /*
             basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            参数:
                1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
                2. routingKey:路由名称
                3. props:配置信息
                4. body:发送消息数据
         */
        // 发送消息内容
        String body = "日志信息:张三调用了findAll方法...日志级别:info...";
        // 8、发送消息
        channel.basicPublish(
                // 交换机名称。简单模式下交换机会使用默认的 ""
                exchangeName,
                // 路由名称 :如果交换机的类型为fanout ,routingKey设置为""
                "",
                // 配置信息
                null,
                // 发送消息数据
                body.getBytes()
        );
        
        
        // 9、释放资源
        channel.close();
        connection.close();
        
    }
}


消费者1

package com.www.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者:获取消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ConsumerPubSub1 {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost("192.168.36.100");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost("/ljt");
        // 用户名 默认值 guest
        connectionFactory.setUsername("ljt");
        // 密码 默认值 guest
        connectionFactory.setPassword("ljt");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 队列名
        String queue1Name = "test_fanout_queue1";
       
        
         /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
         */
        // 5、接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * <p>
             *   回调方法,当收到消息后,会自动执行该方法
             *      1. consumerTag:标识
             *      2. envelope:获取一些信息,交换机,路由key...
             *      3. properties:配置信息
             *      4. body:数据
             * </p>
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:" + consumerTag);
                System.out.println("Exchange:" + envelope.getExchange());
                System.out.println("RoutingKey:" + envelope.getRoutingKey());
                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body));
            }
        };
        channel.basicConsume(
                // 队列名称
                queue1Name,
                // 是否自动确认
                true,
                // 回调对象
                consumer
        );
        
        // 不需要关闭资源
    }
}


消费者2

package com.www.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者:获取消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ConsumerPubSub2 {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost("192.168.36.100");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost("/ljt");
        // 用户名 默认值 guest
        connectionFactory.setUsername("ljt");
        // 密码 默认值 guest
        connectionFactory.setPassword("ljt");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 队列名
        String queue2Name = "test_fanout_queue2";
       
        
         /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
         */
        // 5、接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * <p>
             *   回调方法,当收到消息后,会自动执行该方法
             *      1. consumerTag:标识
             *      2. envelope:获取一些信息,交换机,路由key...
             *      3. properties:配置信息
             *      4. body:数据
             * </p>
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:" + consumerTag);
                System.out.println("Exchange:" + envelope.getExchange());
                System.out.println("RoutingKey:" + envelope.getRoutingKey());
                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body));
            }
        };
        channel.basicConsume(
                // 队列名称
                queue2Name,
                // 是否自动确认
                true,
                // 回调对象
                consumer
        );
        
        // 不需要关闭资源
    }
}


Routing 路由模式

img

img

生产者

package com.www.mq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Routing 工作模式
 * <p>
 * 生产者:发送消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ProducerRouting {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost("192.168.36.100");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost("/ljt");
        // 用户名 默认值 guest
        connectionFactory.setUsername("ljt");
        // 密码 默认值 guest
        connectionFactory.setPassword("ljt");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 5、创建交换机
        /*
           exchangeDeclare(
            String exchange,BuiltinExchangeType type,
            boolean durable, boolean autoDelete,
            boolean internal, Map<String, Object> arguments
            )
           参数:
            1. exchange:交换机名称
            2. type:交换机类型
                DIRECT("direct"),:定向
                FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
                TOPIC("topic"),通配符的方式
                HEADERS("headers");参数匹配
            3. durable:是否持久化
            4. autoDelete:自动删除
            5. internal:内部使用。 一般false
            6. arguments:参数
        */
        String exchangeName = "test_direct";
        channel.exchangeDeclare(
                // 交换机名称
                exchangeName,
                // type:交换机类型
                //   DIRECT("direct"),:定向
                //   FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
                //   TOPIC("topic"),通配符的方式
                //   HEADERS("headers");参数匹配
                BuiltinExchangeType.DIRECT,
                // 是否持久化
                true,
                // 内部使用
                false,
                // 参数
                null
        
        );
        
        // 6、创建队列
        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";
        channel.queueDeclare(
                // 队列名
                queue1Name,
                // 是否持久
                true,
                // 是否独占
                false,
                //是否自动删除
                false,
                // 参数
                null
        );
        channel.queueDeclare(
                // 队列名
                queue2Name,
                // 是否持久
                true,
                // 是否独占
                false,
                //是否自动删除
                false,
                // 参数
                null
        );
        
        // 7、绑定队列和交换机
         /*
            queueBind(String queue, String exchange, String routingKey)
            参数:
                1. queue:队列名称
                2. exchange:交换机名称
                3. routingKey:路由键,绑定规则
                    如果交换机的类型为fanout ,routingKey设置为""
         */
        // 队列1
        channel.queueBind(
                // 队列名
                queue1Name,
                // 交换机名
                exchangeName,
                //  routingKey:路由键,绑定规则
                //    如果交换机的类型为fanout ,routingKey设置为""
                "error"
        );
        // 队列2
        channel.queueBind(
                // 队列名
                queue2Name,
                // 交换机名
                exchangeName,
                //  routingKey:路由键,绑定规则
                //    如果交换机的类型为fanout ,routingKey设置为""
                "info"
        );
        channel.queueBind(
                // 队列名
                queue2Name,
                // 交换机名
                exchangeName,
                //  routingKey:路由键,绑定规则
                //    如果交换机的类型为fanout ,routingKey设置为""
                "error"
        );
        channel.queueBind(
                // 队列名
                queue2Name,
                // 交换机名
                exchangeName,
                //  routingKey:路由键,绑定规则
                //    如果交换机的类型为fanout ,routingKey设置为""
                "waring"
        );
        
         /*
             basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            参数:
                1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
                2. routingKey:路由名称
                3. props:配置信息
                4. body:发送消息数据
         */
        // 发送消息内容
        String body = "日志信息:张三调用了delete方法...警告。。。日志级别:waring...";
        // 8、发送消息
        channel.basicPublish(
                // 交换机名称。简单模式下交换机会使用默认的 ""
                exchangeName,
                // 路由名称 :如果交换机的类型为fanout ,routingKey设置为""
                "waring",
                // 配置信息
                null,
                // 发送消息数据
                body.getBytes()
        );
        
        
        // 9、释放资源
        channel.close();
        connection.close();
        
    }
}


消费者 1

package com.www.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者:获取消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ConsumerRouting1 {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost("192.168.36.100");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost("/ljt");
        // 用户名 默认值 guest
        connectionFactory.setUsername("ljt");
        // 密码 默认值 guest
        connectionFactory.setPassword("ljt");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 队列名
        String queue2Name = "test_direct_queue1";
         /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
         */
        // 5、接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * <p>
             *   回调方法,当收到消息后,会自动执行该方法
             *      1. consumerTag:标识
             *      2. envelope:获取一些信息,交换机,路由key...
             *      3. properties:配置信息
             *      4. body:数据
             * </p>
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:" + consumerTag);
                System.out.println("Exchange:" + envelope.getExchange());
                System.out.println("RoutingKey:" + envelope.getRoutingKey());
                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body));
            }
        };
        channel.basicConsume(
                // 队列名称
                queue2Name,
                // 是否自动确认
                true,
                // 回调对象
                consumer
        );
        
        // 不需要关闭资源
    }
}


消费者2

package com.www.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者:获取消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ConsumerRouting2 {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost("192.168.36.100");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost("/ljt");
        // 用户名 默认值 guest
        connectionFactory.setUsername("ljt");
        // 密码 默认值 guest
        connectionFactory.setPassword("ljt");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 队列名
        String queue2Name = "test_direct_queue2";
         /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
         */
        // 5、接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * <p>
             *   回调方法,当收到消息后,会自动执行该方法
             *      1. consumerTag:标识
             *      2. envelope:获取一些信息,交换机,路由key...
             *      3. properties:配置信息
             *      4. body:数据
             * </p>
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:" + consumerTag);
                System.out.println("Exchange:" + envelope.getExchange());
                System.out.println("RoutingKey:" + envelope.getRoutingKey());
                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body));
            }
        };
        channel.basicConsume(
                // 队列名称
                queue2Name,
                // 是否自动确认
                true,
                // 回调对象
                consumer
        );
        
        // 不需要关闭资源
    }
}

标签:connectionFactory,String,队列,RabbitMQ,import,默认值,channel
From: https://www.cnblogs.com/wwwljt/p/17128169.html

相关文章

  • rabbitmq
    rabbitmq基础简介由LShift提供的一个消息队列协议(AMQP)的开源实现,由以高性能、健壮以及可伸缩性出名的Erlangrabbitmq包含的关键字消息队列使用过程组成部......
  • 简单-SpringBoot整合RabbitMQ
    目录1.windows下安装erlang环境和rabbitMq服务1.1客户端页面2.准备工作2.1pom依赖2.1启动类注解开启:@EnableRabbit2.2application配置文件3.队列的简单使用3.1配置交换器......
  • RabbitMQ-消息中间键
    MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。快速入门1.publisher实现publicclassPublisherTest{@Testpubl......
  • RabbitMQ 访问被拒“NOT_ALLOWED - access to vhost '/' refused for user”
    RabbitMQ访问被拒“NOT_ALLOWED-accesstovhost'/'refusedforuser”原因是用户未分配权限一、用户角色说明1、超级管理员(administrator)可登陆管理控制台,......
  • RabbitMQ重启后数据和用户丢失
    1、问题描述:在重启RabbitMQ服务后,数据丢失,用户丢失。2、问题原因:在RabbitMQ服务启动后,对主机名进行过修改,而修改主机名会导致数据存储路径发生变化,如果不重启RabbitMQ不会......
  • .NET为什么推荐RabbitMQ消息队列作为首选开发工具
    支持.NetCore(2.0及以上)/.NetFramework(4.5及以上),可以部署在Docker,Windows,Linux,Mac。RabbitMQ作为一款主流的消息队列工具早已广受欢迎。相比于其它的MQ工具,Ra......
  • .NET为什么推荐它作为RabbitMQ消息队列的首选开发工具
    支持.NetCore(2.0及以上)/.NetFramework(4.5及以上),可以部署在Docker,Windows,Linux,Mac。RabbitMQ作为一款主流的消息队列工具早已广受欢迎。相比于其它的MQ工具,Rab......
  • RabbitMQ系列(一)--------安装配置
    安装RabbitMQ 前提先要配置好环境 需要先安装Erlang 1修改主机名    RabbitMQ是通过主机名进行访问的,必须指定能访问的主机名。    命令: #vim......
  • RabbitMQ 快速入门
    RabbitMQ是一款实现了AMQP协议的消息中间件,使用Erlang编写。这篇博客简单介绍一下RabbitMQ,但不介绍特定库的API核心概念RabbitMQ存在一下概念,清楚了一下概念也......
  • Spring Boot RabbitMQ 应用场景
    1.前言消息队列是一个容器,可以对程序产生的消息进行存储。消息队列的主要用途是削峰、异步、解耦,我们用一个实际场景来解释下。有一家果汁生产企业,张三是采购员,负责采购......