RabbitMQ
Messaging that just works — RabbitMQ
案例
pom.xml
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
生产者
package com.www.mq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生产者:发送消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ProducerHelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost("192.168.36.100");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost("/ljt");
// 用户名 默认值 guest
connectionFactory.setUsername("ljt");
// 密码 默认值 guest
connectionFactory.setPassword("ljt");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 5、创建队列 Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
// 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare(
// 队列名称
"hello_world",
// 是否持久化,当mq重启之后,还在
true,
// 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
false,
// 是否自动删除。当没有Consumer时,自动删除掉
false,
// 参数
null
);
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
2. routingKey:路由名称
3. props:配置信息
4. body:发送消息数据
*/
// 发送消息内容
String body = "HelloWorld~~~~~~~~~~";
// 6、发送消息
channel.basicPublish(
// 交换机名称。简单模式下交换机会使用默认的 ""
"",
// 路由名称
"hello_world",
// 配置信息
null,
// 发送消息数据
body.getBytes()
);
// 释放资源
channel.close();
connection.close();
}
}
消费者
package com.www.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:获取消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ConsumerHelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost("192.168.36.100");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost("/ljt");
// 用户名 默认值 guest
connectionFactory.setUsername("ljt");
// 密码 默认值 guest
connectionFactory.setPassword("ljt");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 5、创建队列 Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
// 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare(
// 队列名称
"hello_world",
// 是否持久化,当mq重启之后,还在
true,
// 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
false,
// 是否自动删除。当没有Consumer时,自动删除掉
false,
// 参数
null);
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 6、接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* <p>
* 回调方法,当收到消息后,会自动执行该方法
* 1. consumerTag:标识
* 2. envelope:获取一些信息,交换机,路由key...
* 3. properties:配置信息
* 4. body:数据
* </p>
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
};
channel.basicConsume(
// 队列名称
"hello_world",
// 是否自动确认
true,
// 回调对象
consumer
);
// 不需要关闭资源
}
}
工作队列
生产者
package com.www.mq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生产者:发送消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ProducerWorkQueues {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost("192.168.36.100");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost("/ljt");
// 用户名 默认值 guest
connectionFactory.setUsername("ljt");
// 密码 默认值 guest
connectionFactory.setPassword("ljt");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 5、创建队列 Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
// 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare(
// 队列名称
"WorkQueue",
// 是否持久化,当mq重启之后,还在
true,
// 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
false,
// 是否自动删除。当没有Consumer时,自动删除掉
false,
// 参数
null
);
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
2. routingKey:路由名称
3. props:配置信息
4. body:发送消息数据
*/
for (int i = 0; i < 10; i++) {
// 发送消息内容
String body = "WorkQueue~~~~~~~~~~" + i;
// 6、发送消息
channel.basicPublish(
// 交换机名称。简单模式下交换机会使用默认的 ""
"",
// 路由名称
"WorkQueue",
// 配置信息
null,
// 发送消息数据
body.getBytes()
);
}
// 释放资源
channel.close();
connection.close();
}
}
消费者: 两个
package com.www.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:获取消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ConsumerWorkQueues1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost("192.168.36.100");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost("/ljt");
// 用户名 默认值 guest
connectionFactory.setUsername("ljt");
// 密码 默认值 guest
connectionFactory.setPassword("ljt");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 5、创建队列 Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
// 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare(
// 队列名称
"WorkQueue",
// 是否持久化,当mq重启之后,还在
true,
// 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
false,
// 是否自动删除。当没有Consumer时,自动删除掉
false,
// 参数
null);
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 6、接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* <p>
* 回调方法,当收到消息后,会自动执行该方法
* 1. consumerTag:标识
* 2. envelope:获取一些信息,交换机,路由key...
* 3. properties:配置信息
* 4. body:数据
* </p>
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
};
channel.basicConsume(
// 队列名称
"WorkQueue",
// 是否自动确认
true,
// 回调对象
consumer
);
// 不需要关闭资源
}
}
订阅模式
生产者
package com.www.mq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生产者:发送消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ProducerPubSub {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost("192.168.36.100");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost("/ljt");
// 用户名 默认值 guest
connectionFactory.setUsername("ljt");
// 密码 默认值 guest
connectionFactory.setPassword("ljt");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 5、创建交换机
/*
exchangeDeclare(
String exchange,BuiltinExchangeType type,
boolean durable, boolean autoDelete,
boolean internal, Map<String, Object> arguments
)
参数:
1. exchange:交换机名称
2. type:交换机类型
DIRECT("direct"),:定向
FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
TOPIC("topic"),通配符的方式
HEADERS("headers");参数匹配
3. durable:是否持久化
4. autoDelete:自动删除
5. internal:内部使用。 一般false
6. arguments:参数
*/
String exchangeName = "test_fanout";
channel.exchangeDeclare(
// 交换机名称
exchangeName,
// type:交换机类型
// DIRECT("direct"),:定向
// FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
// TOPIC("topic"),通配符的方式
// HEADERS("headers");参数匹配
BuiltinExchangeType.FANOUT,
// 是否持久化
true,
// 内部使用
false,
// 参数
null
);
// 6、创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(
// 队列名
queue1Name,
// 是否持久
true,
// 是否独占
false,
//是否自动删除
false,
// 参数
null
);
channel.queueDeclare(
// 队列名
queue2Name,
// 是否持久
true,
// 是否独占
false,
//是否自动删除
false,
// 参数
null
);
// 7、绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/
channel.queueBind(
// 队列名
queue1Name,
// 交换机名
exchangeName,
// routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为""
""
);
channel.queueBind(
// 队列名
queue2Name,
// 交换机名
exchangeName,
// routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为""
""
);
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
2. routingKey:路由名称
3. props:配置信息
4. body:发送消息数据
*/
// 发送消息内容
String body = "日志信息:张三调用了findAll方法...日志级别:info...";
// 8、发送消息
channel.basicPublish(
// 交换机名称。简单模式下交换机会使用默认的 ""
exchangeName,
// 路由名称 :如果交换机的类型为fanout ,routingKey设置为""
"",
// 配置信息
null,
// 发送消息数据
body.getBytes()
);
// 9、释放资源
channel.close();
connection.close();
}
}
消费者1
package com.www.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:获取消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ConsumerPubSub1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost("192.168.36.100");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost("/ljt");
// 用户名 默认值 guest
connectionFactory.setUsername("ljt");
// 密码 默认值 guest
connectionFactory.setPassword("ljt");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 队列名
String queue1Name = "test_fanout_queue1";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 5、接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* <p>
* 回调方法,当收到消息后,会自动执行该方法
* 1. consumerTag:标识
* 2. envelope:获取一些信息,交换机,路由key...
* 3. properties:配置信息
* 4. body:数据
* </p>
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
};
channel.basicConsume(
// 队列名称
queue1Name,
// 是否自动确认
true,
// 回调对象
consumer
);
// 不需要关闭资源
}
}
消费者2
package com.www.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:获取消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ConsumerPubSub2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost("192.168.36.100");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost("/ljt");
// 用户名 默认值 guest
connectionFactory.setUsername("ljt");
// 密码 默认值 guest
connectionFactory.setPassword("ljt");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 队列名
String queue2Name = "test_fanout_queue2";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 5、接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* <p>
* 回调方法,当收到消息后,会自动执行该方法
* 1. consumerTag:标识
* 2. envelope:获取一些信息,交换机,路由key...
* 3. properties:配置信息
* 4. body:数据
* </p>
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
};
channel.basicConsume(
// 队列名称
queue2Name,
// 是否自动确认
true,
// 回调对象
consumer
);
// 不需要关闭资源
}
}
Routing 路由模式
生产者
package com.www.mq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Routing 工作模式
* <p>
* 生产者:发送消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ProducerRouting {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost("192.168.36.100");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost("/ljt");
// 用户名 默认值 guest
connectionFactory.setUsername("ljt");
// 密码 默认值 guest
connectionFactory.setPassword("ljt");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 5、创建交换机
/*
exchangeDeclare(
String exchange,BuiltinExchangeType type,
boolean durable, boolean autoDelete,
boolean internal, Map<String, Object> arguments
)
参数:
1. exchange:交换机名称
2. type:交换机类型
DIRECT("direct"),:定向
FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
TOPIC("topic"),通配符的方式
HEADERS("headers");参数匹配
3. durable:是否持久化
4. autoDelete:自动删除
5. internal:内部使用。 一般false
6. arguments:参数
*/
String exchangeName = "test_direct";
channel.exchangeDeclare(
// 交换机名称
exchangeName,
// type:交换机类型
// DIRECT("direct"),:定向
// FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
// TOPIC("topic"),通配符的方式
// HEADERS("headers");参数匹配
BuiltinExchangeType.DIRECT,
// 是否持久化
true,
// 内部使用
false,
// 参数
null
);
// 6、创建队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
channel.queueDeclare(
// 队列名
queue1Name,
// 是否持久
true,
// 是否独占
false,
//是否自动删除
false,
// 参数
null
);
channel.queueDeclare(
// 队列名
queue2Name,
// 是否持久
true,
// 是否独占
false,
//是否自动删除
false,
// 参数
null
);
// 7、绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/
// 队列1
channel.queueBind(
// 队列名
queue1Name,
// 交换机名
exchangeName,
// routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为""
"error"
);
// 队列2
channel.queueBind(
// 队列名
queue2Name,
// 交换机名
exchangeName,
// routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为""
"info"
);
channel.queueBind(
// 队列名
queue2Name,
// 交换机名
exchangeName,
// routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为""
"error"
);
channel.queueBind(
// 队列名
queue2Name,
// 交换机名
exchangeName,
// routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为""
"waring"
);
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
2. routingKey:路由名称
3. props:配置信息
4. body:发送消息数据
*/
// 发送消息内容
String body = "日志信息:张三调用了delete方法...警告。。。日志级别:waring...";
// 8、发送消息
channel.basicPublish(
// 交换机名称。简单模式下交换机会使用默认的 ""
exchangeName,
// 路由名称 :如果交换机的类型为fanout ,routingKey设置为""
"waring",
// 配置信息
null,
// 发送消息数据
body.getBytes()
);
// 9、释放资源
channel.close();
connection.close();
}
}
消费者 1
package com.www.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:获取消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ConsumerRouting1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost("192.168.36.100");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost("/ljt");
// 用户名 默认值 guest
connectionFactory.setUsername("ljt");
// 密码 默认值 guest
connectionFactory.setPassword("ljt");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 队列名
String queue2Name = "test_direct_queue1";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 5、接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* <p>
* 回调方法,当收到消息后,会自动执行该方法
* 1. consumerTag:标识
* 2. envelope:获取一些信息,交换机,路由key...
* 3. properties:配置信息
* 4. body:数据
* </p>
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
};
channel.basicConsume(
// 队列名称
queue2Name,
// 是否自动确认
true,
// 回调对象
consumer
);
// 不需要关闭资源
}
}
消费者2
package com.www.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:获取消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ConsumerRouting2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost("192.168.36.100");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost("/ljt");
// 用户名 默认值 guest
connectionFactory.setUsername("ljt");
// 密码 默认值 guest
connectionFactory.setPassword("ljt");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 队列名
String queue2Name = "test_direct_queue2";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 5、接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* <p>
* 回调方法,当收到消息后,会自动执行该方法
* 1. consumerTag:标识
* 2. envelope:获取一些信息,交换机,路由key...
* 3. properties:配置信息
* 4. body:数据
* </p>
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
};
channel.basicConsume(
// 队列名称
queue2Name,
// 是否自动确认
true,
// 回调对象
consumer
);
// 不需要关闭资源
}
}
标签:connectionFactory,String,队列,RabbitMQ,import,默认值,channel
From: https://www.cnblogs.com/wwwljt/p/17128169.html