1.消息队列
点到点模式
发布订阅模式
2.rabbitMQ简介
3.AMQP协议
4.工作原理图及核心概念
原理图
核心概念
Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
Connection:publisher/consumer 和 broker 之间的 TCP 连接
Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
Queue:消息最终被送到这里等待 consumer 取走
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
5.单机版下载搭建
准备工作
克隆虚拟机,配置IP和host,使用xshell链接
安装配置
1,安装erlang:
rpm -ivh /opt/software/erlang-23.2.1-1.el7.x86_64.rpm
#rpm =redhat package manager 小红帽包管理程序,业界常用linux系统的 软件管理,支持各种 rpm软件的安装,查询,升级,卸载等。
-i install 安装
-v verbose 显示详情
-h hash 以###的方式显示进度
查询:
rpm -qa |grep erlang
卸载:
rpm -e erlang-23.2.1-1.el7.x86_64 (上面查询结果)
2,安装依赖:
socat是一个多功能的网络工具,名字来由是Socket CAT,socat支持多协议,用于协议处理,端口转发,rabbitmq依赖于socat,因此在安装rabbitmq前要安装socat。
yum install socat -y
3,安装rabbitmq:
rpm -ivh /opt/software/rabbitmq-server-3.8.30-1.el7.noarch.rpm
4,查看服务状态
systemctl status rabbitmq-server
或者:
service rabbitmq-server status
6.界面及用户权限管理
启动插件,让UI功能可以使用:
rabbitmq-plugins enable rabbitmq_management
启动rabbitmq服务,使用web页面访问:
systemctl start rabbitmq-server
http://192.168.xxx.30:15672/
创建virtaul host :
rabbitmqctl add_vhost /root/
rabbitmqctl add_vhost /scott/
查询:
rabbitmqctl list_vhosts
删除:
rabbitmqctl delete_vhost /scott/
创建用户(root用户名 tiger 密码):
rabbitmqctl add_user root tiger
rabbitmqctl add_user scott 123456
设置角色:
rabbitmqctl set_user_tags root administrator
rabbitmqctl set_user_tags scott administrator
设置权限:
rabbitmqctl set_permissions -p "/root/" root ".*" ".*" ".*"
rabbitmqctl set_permissions -p "/scott/" scott ".*" ".*" ".*"
#设置用户权限(设置/根路径下的 配置 读 写权限)
重复上面操作,创建多个VH和用户。
登录:
http://192.168.xxx.30:15672/
使用正确的用户名和密码,登录后,可以看到首页
5种工作模式
https://www.rabbitmq.com/tutorials
准备工作
创建项目,引入jar包
https://mvnrepository.com/search?q=rabbitmq
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
1.简单模式
简介
实现简单的消息发送和接受
生产者
package com.aaa.rabbitmq.demo1;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @FileName: Producer
* @Description:
* @Author: zhz
* @CreateTime: 2024/12/10 9:02
* @Version: 1.0.0
*/
public class Producer {
//定义常量是不能更改
public static final String QUEUE_NAME = "queue1";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置创建工厂的参数
//链接主机IP
connectionFactory.setHost("192.168.170.30");
//设置vhost
connectionFactory.setVirtualHost("/root/");
//设置用户和密码
connectionFactory.setUsername("root");
connectionFactory.setPassword("tiger");
//设置端口号
connectionFactory.setPort(5672);
//使用链接工厂创建连接
connection = connectionFactory.newConnection();
//使用链接创建channel
channel = connection.createChannel();
//使用channel定义队列
//String queue 队列名称,
// boolean durable 是否持久化(是否存储到磁盘中),
// boolean exclusive 是否被多个消费者消费 false 允许多个消费者,
// boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
// Map<String, Object> arguments 定义其他参数
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//定义发送的消息
String sendMsg = "hello qy1781!!!";
//发送消息
//String exchange 交换机名称, String routingKey 路由键, AMQP.BasicProperties props 发送时传递其他参数, byte[] body 消息的字节数组
//交换机名称为空串“”, 不是不使用交换机,而是使用默认交换机
channel.basicPublish("",QUEUE_NAME,null,sendMsg.getBytes());
//打印提示
System.out.println("发送完毕!");
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
} finally {
try {
//关闭资源
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
}
消费者
package com.aaa.rabbitmq.demo1;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @FileName: Consumer
* @Description:
* @Author: zhz
* @CreateTime: 2024/12/10 9:36
* @Version: 1.0.0
*/
public class Consumer {
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置创建工厂的参数
//链接主机IP
connectionFactory.setHost("192.168.170.30");
//设置vhost
connectionFactory.setVirtualHost("/root/");
//设置用户和密码
connectionFactory.setUsername("root");
connectionFactory.setPassword("tiger");
//设置端口号
connectionFactory.setPort(5672);
//使用链接工厂创建连接
connection = connectionFactory.newConnection();
//使用链接创建channel
channel = connection.createChannel();
//使用channel定义队列
//String queue 队列名称,
// boolean durable 是否持久化(是否存储到磁盘中),
// boolean exclusive 是否被多个消费者消费 false 允许多个消费者,
// boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
// Map<String, Object> arguments 定义其他参数
channel.queueDeclare(Producer.QUEUE_NAME,false,false,false,null);
//使用channel获取消息 amq.ctag-E0c-YqMVGbLBAm9ZtT2SfA
//String queue 队列名称,
// boolean autoAck 自动确定 消费者一旦拿到消息,自动确认拿到了,
// DeliverCallback deliverCallback 交付回调,
// CancelCallback cancelCallback 取消回调
/* channel.basicConsume(Producer.QUEUE_NAME,(String s, Delivery delivery)->{
System.out.println("回调处理信息:"+s);
//delivery交付对象,存储消息的所有信息 delivery.getBody()消息的字节数组
System.out.println("消费的消息为:"+new String(delivery.getBody()));
//消息的其他信息:
Envelope envelope = delivery.getEnvelope();
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("路由键routingKey:"+routingKey);
System.out.println("交换机exchange:"+exchange);
System.out.println("消息唯一的数字标识:"+deliveryTag);
},(String s)->{
System.out.println("回调处理信息:"+s);});*/
//使用内部类,实例化接口
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("回调处理信息:"+s);
//delivery交付对象,存储消息的所有信息 delivery.getBody()消息的字节数组
System.out.println("消费的消息为:"+new String(delivery.getBody()));
//消息的其他信息:
Envelope envelope = delivery.getEnvelope();
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("路由键routingKey:"+routingKey);
System.out.println("交换机exchange:"+exchange);
System.out.println("消息唯一的数字标识:"+deliveryTag);
}
};
CancelCallback cancelCallback = new CancelCallback(){
@Override
public void handle(String s) throws IOException {
System.out.println("回调处理信息:"+s);
}
};
//消费者消费消息不是一次性的,所以消费者不能关闭资源
channel.basicConsume(Producer.QUEUE_NAME,true, deliverCallback,cancelCallback);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
} finally {
/*try {
//关闭资源
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}*/
}
}
}
工作队列
简介
创建一个工作队列,用于在多个消费者之间分配(竞争轮询)耗时的任务。从而提高整个任务执行效率。
package com.aaa.rabbitmq.demo2;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @FileName: Producer
* @Description: work queue 当有多个耗时任务需要执行,生产者发布任务到队列,多个消费者竞争轮询方式,执行任务,提高效率
* @Author: zhz
* @CreateTime: 2024/12/10 9:02
* @Version: 1.0.0
*/
public class Producer {
//定义常量是不能更改
public static final String QUEUE_NAME = "queue2";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置创建工厂的参数
//链接主机IP
connectionFactory.setHost("192.168.170.30");
//设置vhost
connectionFactory.setVirtualHost("/root/");
//设置用户和密码
connectionFactory.setUsername("root");
connectionFactory.setPassword("tiger");
//设置端口号
connectionFactory.setPort(5672);
//使用链接工厂创建连接
connection = connectionFactory.newConnection();
//使用链接创建channel
channel = connection.createChannel();
//使用channel定义队列
//String queue 队列名称,
// boolean durable 是否持久化(是否存储到磁盘中),
// boolean exclusive 是否被多个消费者消费 false 允许多个消费者,
// boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
// Map<String, Object> arguments 定义其他参数
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
for (int i = 1; i <= 10; i++) {
//定义发送的消息
String sendMsg = "hello qy178!!!"+i;
//发送消息
//String exchange 交换机名称, String routingKey 路由键, AMQP.BasicProperties props 发送时传递其他参数, byte[] body 消息的字节数组
//交换机名称为空串“”, 不是不使用交换机,而是使用默认交换机
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,sendMsg.getBytes());
}
//打印提示
System.out.println("发送完毕!");
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
} finally {
try {
//关闭资源
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
}
消费者1
package com.aaa.rabbitmq.demo2;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @FileName: Consumer
* @Description:
* @Author: zhz
* @CreateTime: 2024/12/10 9:36
* @Version: 1.0.0
*/
public class ConsumerA {
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置创建工厂的参数
//链接主机IP
connectionFactory.setHost("192.168.170.30");
//设置vhost
connectionFactory.setVirtualHost("/root/");
//设置用户和密码
connectionFactory.setUsername("root");
connectionFactory.setPassword("tiger");
//设置端口号
connectionFactory.setPort(5672);
//使用链接工厂创建连接
connection = connectionFactory.newConnection();
//使用链接创建channel
channel = connection.createChannel();
//使用channel定义队列
//String queue 队列名称,
// boolean durable 是否持久化(是否存储到磁盘中),
// boolean exclusive 是否被多个消费者消费 false 允许多个消费者,
// boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
// Map<String, Object> arguments 定义其他参数
channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null);
//使用channel获取消息 amq.ctag-E0c-YqMVGbLBAm9ZtT2SfA
//String queue 队列名称,
// boolean autoAck 自动确定 消费者一旦拿到消息,自动确认拿到了,
// DeliverCallback deliverCallback 交付回调,
// CancelCallback cancelCallback 取消回调
/* channel.basicConsume(Producer.QUEUE_NAME,(String s, Delivery delivery)->{
System.out.println("回调处理信息:"+s);
//delivery交付对象,存储消息的所有信息 delivery.getBody()消息的字节数组
System.out.println("消费的消息为:"+new String(delivery.getBody()));
//消息的其他信息:
Envelope envelope = delivery.getEnvelope();
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("路由键routingKey:"+routingKey);
System.out.println("交换机exchange:"+exchange);
System.out.println("消息唯一的数字标识:"+deliveryTag);
},(String s)->{
System.out.println("回调处理信息:"+s);});*/
//使用内部类,实例化接口
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("回调处理信息:"+s);
//delivery交付对象,存储消息的所有信息 delivery.getBody()消息的字节数组
System.out.println("消费的消息为:"+new String(delivery.getBody()));
//消息的其他信息:
Envelope envelope = delivery.getEnvelope();
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("路由键routingKey:"+routingKey);
System.out.println("交换机exchange:"+exchange);
System.out.println("消息唯一的数字标识:"+deliveryTag);
}
};
CancelCallback cancelCallback = new CancelCallback(){
@Override
public void handle(String s) throws IOException {
System.out.println("回调处理信息:"+s);
}
};
//消费者消费消息不是一次性的,所以消费者不能关闭资源
channel.basicConsume(Producer.QUEUE_NAME,true, deliverCallback,cancelCallback);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
} finally {
/*try {
//关闭资源
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}*/
}
}
}
消费者2
package com.aaa.rabbitmq.demo2;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @FileName: Consumer
* @Description:
* @Author: zhz
* @CreateTime: 2024/12/10 9:36
* @Version: 1.0.0
*/
public class ConsumerB {
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置创建工厂的参数
//链接主机IP
connectionFactory.setHost("192.168.170.30");
//设置vhost
connectionFactory.setVirtualHost("/root/");
//设置用户和密码
connectionFactory.setUsername("root");
connectionFactory.setPassword("tiger");
//设置端口号
connectionFactory.setPort(5672);
//使用链接工厂创建连接
connection = connectionFactory.newConnection();
//使用链接创建channel
channel = connection.createChannel();
//使用channel定义队列
//String queue 队列名称,
// boolean durable 是否持久化(是否存储到磁盘中),
// boolean exclusive 是否被多个消费者消费 false 允许多个消费者,
// boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
// Map<String, Object> arguments 定义其他参数
channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null);
//使用channel获取消息 amq.ctag-E0c-YqMVGbLBAm9ZtT2SfA
//String queue 队列名称,
// boolean autoAck 自动确定 消费者一旦拿到消息,自动确认拿到了,
// DeliverCallback deliverCallback 交付回调,
// CancelCallback cancelCallback 取消回调
/* channel.basicConsume(Producer.QUEUE_NAME,(String s, Delivery delivery)->{
System.out.println("回调处理信息:"+s);
//delivery交付对象,存储消息的所有信息 delivery.getBody()消息的字节数组
System.out.println("消费的消息为:"+new String(delivery.getBody()));
//消息的其他信息:
Envelope envelope = delivery.getEnvelope();
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("路由键routingKey:"+routingKey);
System.out.println("交换机exchange:"+exchange);
System.out.println("消息唯一的数字标识:"+deliveryTag);
},(String s)->{
System.out.println("回调处理信息:"+s);});*/
//使用内部类,实例化接口
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("回调处理信息:"+s);
//delivery交付对象,存储消息的所有信息 delivery.getBody()消息的字节数组
System.out.println("消费的消息为:"+new String(delivery.getBody()));
//消息的其他信息:
Envelope envelope = delivery.getEnvelope();
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("路由键routingKey:"+routingKey);
System.out.println("交换机exchange:"+exchange);
System.out.println("消息唯一的数字标识:"+deliveryTag);
}
};
CancelCallback cancelCallback = new CancelCallback(){
@Override
public void handle(String s) throws IOException {
System.out.println("回调处理信息:"+s);
}
};
//消费者消费消息不是一次性的,所以消费者不能关闭资源
channel.basicConsume(Producer.QUEUE_NAME,true, deliverCallback,cancelCallback);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
} finally {
/*try {
//关闭资源
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}*/
}
}
}
消费者3
package com.aaa.rabbitmq.demo2;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @FileName: Consumer
* @Description:
* @Author: zhz
* @CreateTime: 2024/12/10 9:36
* @Version: 1.0.0
*/
public class ConsumerC {
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置创建工厂的参数
//链接主机IP
connectionFactory.setHost("192.168.170.30");
//设置vhost
connectionFactory.setVirtualHost("/root/");
//设置用户和密码
connectionFactory.setUsername("root");
connectionFactory.setPassword("tiger");
//设置端口号
connectionFactory.setPort(5672);
//使用链接工厂创建连接
connection = connectionFactory.newConnection();
//使用链接创建channel
channel = connection.createChannel();
//使用channel定义队列
//String queue 队列名称,
// boolean durable 是否持久化(是否存储到磁盘中),
// boolean exclusive 是否被多个消费者消费 false 允许多个消费者,
// boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
// Map<String, Object> arguments 定义其他参数
channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null);
//使用channel获取消息 amq.ctag-E0c-YqMVGbLBAm9ZtT2SfA
//String queue 队列名称,
// boolean autoAck 自动确定 消费者一旦拿到消息,自动确认拿到了,
// DeliverCallback deliverCallback 交付回调,
// CancelCallback cancelCallback 取消回调
/* channel.basicConsume(Producer.QUEUE_NAME,(String s, Delivery delivery)->{
System.out.println("回调处理信息:"+s);
//delivery交付对象,存储消息的所有信息 delivery.getBody()消息的字节数组
System.out.println("消费的消息为:"+new String(delivery.getBody()));
//消息的其他信息:
Envelope envelope = delivery.getEnvelope();
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("路由键routingKey:"+routingKey);
System.out.println("交换机exchange:"+exchange);
System.out.println("消息唯一的数字标识:"+deliveryTag);
},(String s)->{
System.out.println("回调处理信息:"+s);});*/
//使用内部类,实例化接口
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("回调处理信息:"+s);
//delivery交付对象,存储消息的所有信息 delivery.getBody()消息的字节数组
System.out.println("消费的消息为:"+new String(delivery.getBody()));
//消息的其他信息:
Envelope envelope = delivery.getEnvelope();
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("路由键routingKey:"+routingKey);
System.out.println("交换机exchange:"+exchange);
System.out.println("消息唯一的数字标识:"+deliveryTag);
}
};
CancelCallback cancelCallback = new CancelCallback(){
@Override
public void handle(String s) throws IOException {
System.out.println("回调处理信息:"+s);
}
};
//消费者消费消息不是一次性的,所以消费者不能关闭资源
channel.basicConsume(Producer.QUEUE_NAME,true, deliverCallback,cancelCallback);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
} finally {
/*try {
//关闭资源
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}*/
}
}
}
发布订阅
简介
直接(direct)模式:添加一个功能 - 我们将使其可以仅订阅消息的子集。当前演示的是Exchange类型为direct使用。直接direct绑定比扇出fanout更加灵活。
需求:6种日志级别(trace,debug,info,warn,error,fatal),将能够仅将严重错误(error)消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
生产者
package com.aaa.rabbitmq.demo3;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @FileName: Producer
* @Description: 消息发布订阅模式 exchange 常用类型的fanout 同一个消息被多个消费者同时消费,相同消息处理方式不一样
* @Author: zhz
* @CreateTime: 2024/12/10 9:02
* @Version: 1.0.0
*/
public class Producer {
//定义常量是不能更改
public static final String QUEUE_NAME_3 = "queue3";
public static final String QUEUE_NAME_4 = "queue4";
public static final String QUEUE_NAME_5 = "queue5";
private static final String EXCHANGE_NAME="exchange_fanout";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置创建工厂的参数
//链接主机IP
connectionFactory.setHost("192.168.170.30");
//设置vhost
connectionFactory.setVirtualHost("/root/");
//设置用户和密码
connectionFactory.setUsername("root");
connectionFactory.setPassword("tiger");
//设置端口号
connectionFactory.setPort(5672);
//使用链接工厂创建连接
connection = connectionFactory.newConnection();
//使用链接创建channel
channel = connection.createChannel();
//使用channel定义交换机
//String exchange交换机名称, String type 类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//使用channel定义队列
//String queue 队列名称,
// boolean durable 是否持久化(是否存储到磁盘中),
// boolean exclusive 是否被多个消费者消费 false 允许多个消费者,
// boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
// Map<String, Object> arguments 定义其他参数
channel.queueDeclare(QUEUE_NAME_3,true,false,false,null);
channel.queueDeclare(QUEUE_NAME_4,true,false,false,null);
channel.queueDeclare(QUEUE_NAME_5,true,false,false,null);
//让交换和队列进行绑定
//String queue, String exchange, String routingKey
channel.queueBind(QUEUE_NAME_3,EXCHANGE_NAME,"");
channel.queueBind(QUEUE_NAME_4,EXCHANGE_NAME,"");
channel.queueBind(QUEUE_NAME_5,EXCHANGE_NAME,"");
for (int i = 1; i <= 10; i++) {
//定义发送的消息
String sendMsg = "hello qy178!!!"+i;
//发送消息
//String exchange 交换机名称,
// String routingKey 路由键,
// AMQP.BasicProperties props 发送时传递其他参数, byte[] body 消息的字节数组
//交换机名称为空串“”, 不是不使用交换机,而是使用默认交换机
channel.basicPublish(EXCHANGE_NAME,"", MessageProperties.PERSISTENT_TEXT_PLAIN,sendMsg.getBytes());
}
//打印提示
System.out.println("发送完毕!");
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
} finally {
try {
//关闭资源
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
}
消费者1
package com.aaa.rabbitmq.demo3;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @FileName: Consumer
* @Description:
* @Author: zhz
* @CreateTime: 2024/12/10 9:36
* @Version: 1.0.0
*/
public class ConsumerA {
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置创建工厂的参数
//链接主机IP
connectionFactory.setHost("192.168.170.30");
//设置vhost
connectionFactory.setVirtualHost("/root/");
//设置用户和密码
connectionFactory.setUsername("root");
connectionFactory.setPassword("tiger");
//设置端口号
connectionFactory.setPort(5672);
//使用链接工厂创建连接
connection = connectionFactory.newConnection();
//使用链接创建channel
channel = connection.createChannel();
//使用channel定义队列
//String queue 队列名称,
// boolean durable 是否持久化(是否存储到磁盘中),
// boolean exclusive 是否被多个消费者消费 false 允许多个消费者,
// boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
// Map<String, Object> arguments 定义其他参数
channel.queueDeclare(Producer.QUEUE_NAME_3,true,false,false,null);
//使用channel获取消息 amq.ctag-E0c-YqMVGbLBAm9ZtT2SfA
//String queue 队列名称,
// boolean autoAck 自动确定 消费者一旦拿到消息,自动确认拿到了,
// DeliverCallback deliverCallback 交付回调,
// CancelCallback cancelCallback 取消回调
/* channel.basicConsume(Producer.QUEUE_NAME,(String s, Delivery delivery)->{
System.out.println("回调处理信息:"+s);
//delivery交付对象,存储消息的所有信息 delivery.getBody()消息的字节数组
System.out.println("消费的消息为:"+new String(delivery.getBody()));
//消息的其他信息:
Envelope envelope = delivery.getEnvelope();
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("路由键routingKey:"+routingKey);
System.out.println("交换机exchange:"+exchange);
System.out.println("消息唯一的数字标识:"+deliveryTag);
},(String s)->{
System.out.println("回调处理信息:"+s);});*/
//使用内部类,实例化接口
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("回调处理信息:"+s);
//delivery交付对象,存储消息的所有信息 delivery.getBody()消息的字节数组
System.out.println("模拟消费者,获取日志信息:"+new String(delivery.getBody())+",模仿打印控制台。。。。");
//消息的其他信息:
Envelope envelope = delivery.getEnvelope();
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("路由键routingKey:"+routingKey);
System.out.println("交换机exchange:"+exchange);
System.out.println("消息唯一的数字标识:"+deliveryTag);
}
};
CancelCallback cancelCallback = new CancelCallback(){
@Override
public void handle(String s) throws IOException {
System.out.println("回调处理信息:"+s);
}
};
//消费者消费消息不是一次性的,所以消费者不能关闭资源
channel.basicConsume(Producer.QUEUE_NAME_3,true, deliverCallback,cancelCallback);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
} finally {
/*try {
//关闭资源
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}*/
}
}
}
消费者2
package com.aaa.rabbitmq.demo3;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @FileName: Consumer
* @Description:
* @Author: zhz
* @CreateTime: 2024/12/10 9:36
* @Version: 1.0.0
*/
public class ConsumerB {
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置创建工厂的参数
//链接主机IP
connectionFactory.setHost("192.168.170.30");
//设置vhost
connectionFactory.setVirtualHost("/root/");
//设置用户和密码
connectionFactory.setUsername("root");
connectionFactory.setPassword("tiger");
//设置端口号
connectionFactory.setPort(5672);
//使用链接工厂创建连接
connection = connectionFactory.newConnection();
//使用链接创建channel
channel = connection.createChannel();
//使用channel定义队列
//String queue 队列名称,
// boolean durable 是否持久化(是否存储到磁盘中),
// boolean exclusive 是否被多个消费者消费 false 允许多个消费者,
// boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
// Map<String, Object> arguments 定义其他参数
channel.queueDeclare(Producer.QUEUE_NAME_4,true,false,false,null);
//使用channel获取消息 amq.ctag-E0c-YqMVGbLBAm9ZtT2SfA
//String queue 队列名称,
// boolean autoAck 自动确定 消费者一旦拿到消息,自动确认拿到了,
// DeliverCallback deliverCallback 交付回调,
// CancelCallback cancelCallback 取消回调
/* channel.basicConsume(Producer.QUEUE_NAME,(String s, Delivery delivery)->{
System.out.println("回调处理信息:"+s);
//delivery交付对象,存储消息的所有信息 delivery.getBody()消息的字节数组
System.out.println("消费的消息为:"+new String(delivery.getBody()));
//消息的其他信息:
Envelope envelope = delivery.getEnvelope();
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("路由键routingKey:"+routingKey);
System.out.println("交换机exchange:"+exchange);
System.out.println("消息唯一的数字标识:"+deliveryTag);
},(String s)->{
System.out.println("回调处理信息:"+s);});*/
//使用内部类,实例化接口
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("回调处理信息:"+s);
//delivery交付对象,存储消息的所有信息 delivery.getBody()消息的字节数组
System.out.println("模拟消费者,获取日志信息:"+new String(delivery.getBody())+",模仿存储到磁盘。。。。");
//消息的其他信息:
Envelope envelope = delivery.getEnvelope();
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("路由键routingKey:"+routingKey);
System.out.println("交换机exchange:"+exchange);
System.out.println("消息唯一的数字标识:"+deliveryTag);
}
};
CancelCallback cancelCallback = new CancelCallback(){
@Override
public void handle(String s) throws IOException {
System.out.println("回调处理信息:"+s);
}
};
//消费者消费消息不是一次性的,所以消费者不能关闭资源
channel.basicConsume(Producer.QUEUE_NAME_4,true, deliverCallback,cancelCallback);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
} finally {
/*try {
//关闭资源
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}*/
}
}
}
消费者3
package com.aaa.rabbitmq.demo3;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @FileName: Consumer
* @Description:
* @Author: zhz
* @CreateTime: 2024/12/10 9:36
* @Version: 1.0.0
*/
public class ConsumerC {
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置创建工厂的参数
//链接主机IP
connectionFactory.setHost("192.168.170.30");
//设置vhost
connectionFactory.setVirtualHost("/root/");
//设置用户和密码
connectionFactory.setUsername("root");
connectionFactory.setPassword("tiger");
//设置端口号
connectionFactory.setPort(5672);
//使用链接工厂创建连接
connection = connectionFactory.newConnection();
//使用链接创建channel
channel = connection.createChannel();
//使用channel定义队列
//String queue 队列名称,
// boolean durable 是否持久化(是否存储到磁盘中),
// boolean exclusive 是否被多个消费者消费 false 允许多个消费者,
// boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
// Map<String, Object> arguments 定义其他参数
channel.queueDeclare(Producer.QUEUE_NAME_5,true,false,false,null);
//使用channel获取消息 amq.ctag-E0c-YqMVGbLBAm9ZtT2SfA
//String queue 队列名称,
// boolean autoAck 自动确定 消费者一旦拿到消息,自动确认拿到了,
// DeliverCallback deliverCallback 交付回调,
// CancelCallback cancelCallback 取消回调
/* channel.basicConsume(Producer.QUEUE_NAME,(String s, Delivery delivery)->{
System.out.println("回调处理信息:"+s);
//delivery交付对象,存储消息的所有信息 delivery.getBody()消息的字节数组
System.out.println("消费的消息为:"+new String(delivery.getBody()));
//消息的其他信息:
Envelope envelope = delivery.getEnvelope();
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("路由键routingKey:"+routingKey);
System.out.println("交换机exchange:"+exchange);
System.out.println("消息唯一的数字标识:"+deliveryTag);
},(String s)->{
System.out.println("回调处理信息:"+s);});*/
//使用内部类,实例化接口
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("回调处理信息:"+s);
//delivery交付对象,存储消息的所有信息 delivery.getBody()消息的字节数组
System.out.println("模拟消费者,获取日志信息:"+new String(delivery.getBody())+",模仿存储到数据库。。。。");
//消息的其他信息:
Envelope envelope = delivery.getEnvelope();
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("路由键routingKey:"+routingKey);
System.out.println("交换机exchange:"+exchange);
System.out.println("消息唯一的数字标识:"+deliveryTag);
}
};
CancelCallback cancelCallback = new CancelCallback(){
@Override
public void handle(String s) throws IOException {
System.out.println("回调处理信息:"+s);
}
};
//消费者消费消息不是一次性的,所以消费者不能关闭资源
channel.basicConsume(Producer.QUEUE_NAME_5,true, deliverCallback,cancelCallback);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
} finally {
/*try {
//关闭资源
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}*/
}
}
}
路由模式
简介
直接(direct)模式:添加一个功能 - 我们将使其可以仅订阅消息的子集。当前演示的是Exchange类型为direct使用。直接direct绑定比扇出fanout更加灵活。
生产者
package com.aaa.rabbitmq.demo4;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;
/**
* @FileName: Producer
* @Description: 路由模式: 需求 日志类型分类trace,debug,info,warn,error,fatal 扇出中不管什么类型,消费者都必须消费,
* 让消息接收更加灵活 只接受消息子集
* 当前模式下不去都消费 ,选择error fatal 保存数据库 warn 保存到磁盘 trace,debug,info打印控制台
* @Author: zhz
* @CreateTime: 2024/12/10 9:02
* @Version: 1.0.0
*/
public class Producer {
//定义常量是不能更改
public static final String QUEUE_NAME_6 = "queue6";
public static final String QUEUE_NAME_7 = "queue7";
public static final String QUEUE_NAME_8 = "queue8";
private static final String EXCHANGE_NAME="exchange_direct";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置创建工厂的参数
//链接主机IP
connectionFactory.setHost("192.168.170.30");
//设置vhost
connectionFactory.setVirtualHost("/root/");
//设置用户和密码
connectionFactory.setUsername("root");
connectionFactory.setPassword("tiger");
//设置端口号
connectionFactory.setPort(5672);
//使用链接工厂创建连接
connection = connectionFactory.newConnection();
//使用链接创建channel
channel = connection.createChannel();
//使用channel定义交换机
//String exchange交换机名称, String type 类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//使用channel定义队列
//String queue 队列名称,
// boolean durable 是否持久化(是否存储到磁盘中),
// boolean exclusive 是否被多个消费者消费 false 允许多个消费者,
// boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
// Map<String, Object> arguments 定义其他参数
channel.queueDeclare(QUEUE_NAME_6,true,false,false,null);
channel.queueDeclare(QUEUE_NAME_7,true,false,false,null);
channel.queueDeclare(QUEUE_NAME_8,true,false,false,null);
//让交换和队列进行绑定
//String queue, String exchange, String routingKey
channel.queueBind(QUEUE_NAME_6,EXCHANGE_NAME,"fatal");
channel.queueBind(QUEUE_NAME_6,EXCHANGE_NAME,"error");
channel.queueBind(QUEUE_NAME_7,EXCHANGE_NAME,"warn");
channel.queueBind(QUEUE_NAME_8,EXCHANGE_NAME,"trace");
channel.queueBind(QUEUE_NAME_8,EXCHANGE_NAME,"debug");
channel.queueBind(QUEUE_NAME_8,EXCHANGE_NAME,"info");
String[] logTypeArray={"trace","debug","info","warn","error","fatal"};
Random random =new Random();
for (int i = 1; i <= 50; i++) {
int logTypeInt = random.nextInt(logTypeArray.length);
String logType= logTypeArray[logTypeInt];
//定义发送的消息
String sendMsg = "hello qy178!!!"+i+",日志类型为:"+logType;
//发送消息
//String exchange 交换机名称,
// String routingKey 路由键,
// AMQP.BasicProperties props 发送时传递其他参数, byte[] body 消息的字节数组
//交换机名称为空串“”, 不是不使用交换机,而是使用默认交换机
channel.basicPublish(EXCHANGE_NAME,logType, MessageProperties.PERSISTENT_TEXT_PLAIN,sendMsg.getBytes());
}
//打印提示
System.out.println("发送完毕!");
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
} finally {
try {
//关闭资源
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
}
消费者1
package com.aaa.rabbitmq.demo4;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @FileName: Consumer
* @Description:
* @Author: zhz
* @CreateTime: 2024/12/10 9:36
* @Version: 1.0.0
*/
public class ConsumerA {
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置创建工厂的参数
//链接主机IP
connectionFactory.setHost("192.168.170.30");
//设置vhost
connectionFactory.setVirtualHost("/root/");
//设置用户和密码
connectionFactory.setUsername("root");
connectionFactory.setPassword("tiger");
//设置端口号
connectionFactory.setPort(5672);
//使用链接工厂创建连接
connection = connectionFactory.newConnection();
//使用链接创建channel
channel = connection.createChannel();
//使用channel定义队列
//String queue 队列名称,
// boolean durable 是否持久化(是否存储到磁盘中),
// boolean exclusive 是否被多个消费者消费 false 允许多个消费者,
// boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
// Map<String, Object> arguments 定义其他参数
channel.queueDeclare(Producer.QUEUE_NAME_6,true,false,false,null);
//使用channel获取消息 amq.ctag-E0c-YqMVGbLBAm9ZtT2SfA
//String queue 队列名称,
// boolean autoAck 自动确定 消费者一旦拿到消息,自动确认拿到了,
// DeliverCallback deliverCallback 交付回调,
// CancelCallback cancelCallback 取消回调
/* channel.basicConsume(Producer.QUEUE_NAME,(String s, Delivery delivery)->{
System.out.println("回调处理信息:"+s);
//delivery交付对象,存储消息的所有信息 delivery.getBody()消息的字节数组
System.out.println("消费的消息为:"+new String(delivery.getBody()));
//消息的其他信息:
Envelope envelope = delivery.getEnvelope();
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("路由键routingKey:"+routingKey);
System.out.println("交换机exchange:"+exchange);
System.out.println("消息唯一的数字标识:"+deliveryTag);
},(String s)->{
System.out.println("回调处理信息:"+s);});*/
//使用内部类,实例化接口
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("回调处理信息:"+s);
//delivery交付对象,存储消息的所有信息 delivery.getBody()消息的字节数组
System.out.println("模拟消费者,获取日志信息,或者error和fatal:"+new String(delivery.getBody())+",模仿放入数据库。。。。");
//消息的其他信息:
Envelope envelope = delivery.getEnvelope();
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("路由键routingKey:"+routingKey);
System.out.println("交换机exchange:"+exchange);
System.out.println("消息唯一的数字标识:"+deliveryTag);
}
};
CancelCallback cancelCallback = new CancelCallback(){
@Override
public void handle(String s) throws IOException {
System.out.println("回调处理信息:"+s);
}
};
//消费者消费消息不是一次性的,所以消费者不能关闭资源
channel.basicConsume(Producer.QUEUE_NAME_6,true, deliverCallback,cancelCallback);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
} finally {
/*try {
//关闭资源
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}*/
}
}
}
消费者2
package com.aaa.rabbitmq.demo4;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @FileName: Consumer
* @Description:
* @Author: zhz
* @CreateTime: 2024/12/10 9:36
* @Version: 1.0.0
*/
public class ConsumerB {
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置创建工厂的参数
//链接主机IP
connectionFactory.setHost("192.168.170.30");
//设置vhost
connectionFactory.setVirtualHost("/root/");
//设置用户和密码
connectionFactory.setUsername("root");
connectionFactory.setPassword("tiger");
//设置端口号
connectionFactory.setPort(5672);
//使用链接工厂创建连接
connection = connectionFactory.newConnection();
//使用链接创建channel
channel = connection.createChannel();
//使用channel定义队列
//String queue 队列名称,
// boolean durable 是否持久化(是否存储到磁盘中),
// boolean exclusive 是否被多个消费者消费 false 允许多个消费者,
// boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
// Map<String, Object> arguments 定义其他参数
channel.queueDeclare(Producer.QUEUE_NAME_7,true,false,false,null);
//使用channel获取消息 amq.ctag-E0c-YqMVGbLBAm9ZtT2SfA
//String queue 队列名称,
// boolean autoAck 自动确定 消费者一旦拿到消息,自动确认拿到了,
// DeliverCallback deliverCallback 交付回调,
// CancelCallback cancelCallback 取消回调
/* channel.basicConsume(Producer.QUEUE_NAME,(String s, Delivery delivery)->{
System.out.println("回调处理信息:"+s);
//delivery交付对象,存储消息的所有信息 delivery.getBody()消息的字节数组
System.out.println("消费的消息为:"+new String(delivery.getBody()));
//消息的其他信息:
Envelope envelope = delivery.getEnvelope();
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("路由键routingKey:"+routingKey);
System.out.println("交换机exchange:"+exchange);
System.out.println("消息唯一的数字标识:"+deliveryTag);
},(String s)->{
System.out.println("回调处理信息:"+s);});*/
//使用内部类,实例化接口
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("回调处理信息:"+s);
//delivery交付对象,存储消息的所有信息 delivery.getBody()消息的字节数组
System.out.println("模拟消费者,获取日志信息,warn:"+new String(delivery.getBody())+",模仿存入磁盘。。。。");
//消息的其他信息:
Envelope envelope = delivery.getEnvelope();
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("路由键routingKey:"+routingKey);
System.out.println("交换机exchange:"+exchange);
System.out.println("消息唯一的数字标识:"+deliveryTag);
}
};
CancelCallback cancelCallback = new CancelCallback(){
@Override
public void handle(String s) throws IOException {
System.out.println("回调处理信息:"+s);
}
};
//消费者消费消息不是一次性的,所以消费者不能关闭资源
channel.basicConsume(Producer.QUEUE_NAME_7,true, deliverCallback,cancelCallback);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
} finally {
/*try {
//关闭资源
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}*/
}
}
}
消费者3
package com.aaa.rabbitmq.demo4;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @FileName: Consumer
* @Description:
* @Author: zhz
* @CreateTime: 2024/12/10 9:36
* @Version: 1.0.0
*/
public class ConsumerC {
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置创建工厂的参数
//链接主机IP
connectionFactory.setHost("192.168.170.30");
//设置vhost
connectionFactory.setVirtualHost("/root/");
//设置用户和密码
connectionFactory.setUsername("root");
connectionFactory.setPassword("tiger");
//设置端口号
connectionFactory.setPort(5672);
//使用链接工厂创建连接
connection = connectionFactory.newConnection();
//使用链接创建channel
channel = connection.createChannel();
//使用channel定义队列
//String queue 队列名称,
// boolean durable 是否持久化(是否存储到磁盘中),
// boolean exclusive 是否被多个消费者消费 false 允许多个消费者,
// boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
// Map<String, Object> arguments 定义其他参数
channel.queueDeclare(Producer.QUEUE_NAME_8,true,false,false,null);
//使用channel获取消息 amq.ctag-E0c-YqMVGbLBAm9ZtT2SfA
//String queue 队列名称,
// boolean autoAck 自动确定 消费者一旦拿到消息,自动确认拿到了,
// DeliverCallback deliverCallback 交付回调,
// CancelCallback cancelCallback 取消回调
/* channel.basicConsume(Producer.QUEUE_NAME,(String s, Delivery delivery)->{
System.out.println("回调处理信息:"+s);
//delivery交付对象,存储消息的所有信息 delivery.getBody()消息的字节数组
System.out.println("消费的消息为:"+new String(delivery.getBody()));
//消息的其他信息:
Envelope envelope = delivery.getEnvelope();
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("路由键routingKey:"+routingKey);
System.out.println("交换机exchange:"+exchange);
System.out.println("消息唯一的数字标识:"+deliveryTag);
},(String s)->{
System.out.println("回调处理信息:"+s);});*/
//使用内部类,实例化接口
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("回调处理信息:"+s);
//delivery交付对象,存储消息的所有信息 delivery.getBody()消息的字节数组
System.out.println("模拟消费者,获取日志信息,接受trace,debug,info:"+new String(delivery.getBody())+",模仿打印控制台。。。。");
//消息的其他信息:
Envelope envelope = delivery.getEnvelope();
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("路由键routingKey:"+routingKey);
System.out.println("交换机exchange:"+exchange);
System.out.println("消息唯一的数字标识:"+deliveryTag);
}
};
CancelCallback cancelCallback = new CancelCallback(){
@Override
public void handle(String s) throws IOException {
System.out.println("回调处理信息:"+s);
}
};
//消费者消费消息不是一次性的,所以消费者不能关闭资源
channel.basicConsume(Producer.QUEUE_NAME_8,true, deliverCallback,cancelCallback);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
} finally {
/*try {
//关闭资源
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}*/
}
}
}
话题模式
简介
通配符(topic)模式:尽管使用直接direct交换改进了我们的系统,但它仍然有局限性——它不能基于多个标准进行路由。当前演示的是Exchange类型为topic使用。topic方式可以基于多个标准进行路由
。
路由规则:单词列表,由点分隔,路由键中可以有任意多的单词,最多为 255 个字节。topic比direct方式,本质上更灵活,使用绑定键有两个重要的特殊符号:
*(星号)可以只替换一个单词。
(hash) 可以代替零个或多个单词。 ..*
生产者
package com.aaa.rabbitmq.demo5;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;
/**
* @FileName: Producer
* @Description: 话题模式,比路由模式更灵活 可以#代表0到多个单词 *代表1个单词进行路由key的匹配
* @Author: zhz
* @CreateTime: 2024/12/10 9:02
* @Version: 1.0.0
*/
public class Producer {
//定义常量是不能更改
public static final String QUEUE_NAME_9 = "queue9"; //喜欢橙色的动物
public static final String QUEUE_NAME_10 = "queue10";//喜欢懒惰或者兔子
public static final String QUEUE_NAME_11 = "queue11";//喜欢速度快的
private static final String EXCHANGE_NAME="exchange_topic";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置创建工厂的参数
//链接主机IP
connectionFactory.setHost("192.168.170.30");
//设置vhost
connectionFactory.setVirtualHost("/root/");
//设置用户和密码
connectionFactory.setUsername("root");
connectionFactory.setPassword("tiger");
//设置端口号
connectionFactory.setPort(5672);
//使用链接工厂创建连接
connection = connectionFactory.newConnection();
//使用链接创建channel
channel = connection.createChannel();
//使用channel定义交换机
//String exchange交换机名称, String type 类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//使用channel定义队列
//String queue 队列名称,
// boolean durable 是否持久化(是否存储到磁盘中),
// boolean exclusive 是否被多个消费者消费 false 允许多个消费者,
// boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
// Map<String, Object> arguments 定义其他参数
channel.queueDeclare(QUEUE_NAME_9,true,false,false,null);
channel.queueDeclare(QUEUE_NAME_10,true,false,false,null);
channel.queueDeclare(QUEUE_NAME_11,true,false,false,null);
//让交换和队列进行绑定
//String queue, String exchange, String routingKey
channel.queueBind(QUEUE_NAME_9,EXCHANGE_NAME,"*.orange.*");
channel.queueBind(QUEUE_NAME_10,EXCHANGE_NAME,"lazy.#");
channel.queueBind(QUEUE_NAME_10,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(QUEUE_NAME_11,EXCHANGE_NAME,"quick.*.*");
String[] animalArray={"quick.orange.rabbit","lazy.","lazy.a.b.c.d","lazy.orange.elephant","quick.orange.fox","lazy.brown.fox","lazy.pink.rabbit","quick.brown.fox"};
Random random =new Random();
for (int i = 1; i <= 20; i++) {
int logTypeInt = random.nextInt(animalArray.length);
String animal= animalArray[logTypeInt];
//定义发送的消息
String sendMsg = "hello qy178!!!"+i+",动物为:"+animal;
//发送消息
//String exchange 交换机名称,
// String routingKey 路由键,
// AMQP.BasicProperties props 发送时传递其他参数, byte[] body 消息的字节数组
//交换机名称为空串“”, 不是不使用交换机,而是使用默认交换机
channel.basicPublish(EXCHANGE_NAME,animal, MessageProperties.PERSISTENT_TEXT_PLAIN,sendMsg.getBytes());
}
//打印提示
System.out.println("发送完毕!");
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
} finally {
try {
//关闭资源
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
}
消费者1
package com.aaa.rabbitmq.demo5;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @FileName: Consumer
* @Description:
* @Author: zhz
* @CreateTime: 2024/12/10 9:36
* @Version: 1.0.0
*/
public class ConsumerA {
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置创建工厂的参数
//链接主机IP
connectionFactory.setHost("192.168.170.30");
//设置vhost
connectionFactory.setVirtualHost("/root/");
//设置用户和密码
connectionFactory.setUsername("root");
connectionFactory.setPassword("tiger");
//设置端口号
connectionFactory.setPort(5672);
//使用链接工厂创建连接
connection = connectionFactory.newConnection();
//使用链接创建channel
channel = connection.createChannel();
//使用channel定义队列
//String queue 队列名称,
// boolean durable 是否持久化(是否存储到磁盘中),
// boolean exclusive 是否被多个消费者消费 false 允许多个消费者,
// boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
// Map<String, Object> arguments 定义其他参数
channel.queueDeclare(Producer.QUEUE_NAME_9,true,false,false,null);
//使用channel获取消息 amq.ctag-E0c-YqMVGbLBAm9ZtT2SfA
//String queue 队列名称,
// boolean autoAck 自动确定 消费者一旦拿到消息,自动确认拿到了,
// DeliverCallback deliverCallback 交付回调,
// CancelCallback cancelCallback 取消回调
/* channel.basicConsume(Producer.QUEUE_NAME,(String s, Delivery delivery)->{
System.out.println("回调处理信息:"+s);
//delivery交付对象,存储消息的所有信息 delivery.getBody()消息的字节数组
System.out.println("消费的消息为:"+new String(delivery.getBody()));
//消息的其他信息:
Envelope envelope = delivery.getEnvelope();
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("路由键routingKey:"+routingKey);
System.out.println("交换机exchange:"+exchange);
System.out.println("消息唯一的数字标识:"+deliveryTag);
},(String s)->{
System.out.println("回调处理信息:"+s);});*/
//使用内部类,实例化接口
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("回调处理信息:"+s);
//delivery交付对象,存储消息的所有信息 delivery.getBody()消息的字节数组
System.out.println("模拟消费者,喜欢动物是橙色:"+new String(delivery.getBody())+"。。。。");
//消息的其他信息:
Envelope envelope = delivery.getEnvelope();
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("路由键routingKey:"+routingKey);
System.out.println("交换机exchange:"+exchange);
System.out.println("消息唯一的数字标识:"+deliveryTag);
}
};
CancelCallback cancelCallback = new CancelCallback(){
@Override
public void handle(String s) throws IOException {
System.out.println("回调处理信息:"+s);
}
};
//消费者消费消息不是一次性的,所以消费者不能关闭资源
channel.basicConsume(Producer.QUEUE_NAME_9,true, deliverCallback,cancelCallback);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
} finally {
/*try {
//关闭资源
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}*/
}
}
}
消费者2
package com.aaa.rabbitmq.demo5;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @FileName: Consumer
* @Description:
* @Author: zhz
* @CreateTime: 2024/12/10 9:36
* @Version: 1.0.0
*/
public class ConsumerB {
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置创建工厂的参数
//链接主机IP
connectionFactory.setHost("192.168.170.30");
//设置vhost
connectionFactory.setVirtualHost("/root/");
//设置用户和密码
connectionFactory.setUsername("root");
connectionFactory.setPassword("tiger");
//设置端口号
connectionFactory.setPort(5672);
//使用链接工厂创建连接
connection = connectionFactory.newConnection();
//使用链接创建channel
channel = connection.createChannel();
//使用channel定义队列
//String queue 队列名称,
// boolean durable 是否持久化(是否存储到磁盘中),
// boolean exclusive 是否被多个消费者消费 false 允许多个消费者,
// boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
// Map<String, Object> arguments 定义其他参数
channel.queueDeclare(Producer.QUEUE_NAME_10,true,false,false,null);
//使用channel获取消息 amq.ctag-E0c-YqMVGbLBAm9ZtT2SfA
//String queue 队列名称,
// boolean autoAck 自动确定 消费者一旦拿到消息,自动确认拿到了,
// DeliverCallback deliverCallback 交付回调,
// CancelCallback cancelCallback 取消回调
/* channel.basicConsume(Producer.QUEUE_NAME,(String s, Delivery delivery)->{
System.out.println("回调处理信息:"+s);
//delivery交付对象,存储消息的所有信息 delivery.getBody()消息的字节数组
System.out.println("消费的消息为:"+new String(delivery.getBody()));
//消息的其他信息:
Envelope envelope = delivery.getEnvelope();
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("路由键routingKey:"+routingKey);
System.out.println("交换机exchange:"+exchange);
System.out.println("消息唯一的数字标识:"+deliveryTag);
},(String s)->{
System.out.println("回调处理信息:"+s);});*/
//使用内部类,实例化接口
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("回调处理信息:"+s);
//delivery交付对象,存储消息的所有信息 delivery.getBody()消息的字节数组
System.out.println("模拟消费者,喜欢动物是懒惰的或者兔子:"+new String(delivery.getBody())+"。。。。");
//消息的其他信息:
Envelope envelope = delivery.getEnvelope();
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("路由键routingKey:"+routingKey);
System.out.println("交换机exchange:"+exchange);
System.out.println("消息唯一的数字标识:"+deliveryTag);
}
};
CancelCallback cancelCallback = new CancelCallback(){
@Override
public void handle(String s) throws IOException {
System.out.println("回调处理信息:"+s);
}
};
//消费者消费消息不是一次性的,所以消费者不能关闭资源
channel.basicConsume(Producer.QUEUE_NAME_10,true, deliverCallback,cancelCallback);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
} finally {
/*try {
//关闭资源
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}*/
}
}
}
消费者3
package com.aaa.rabbitmq.demo5;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @FileName: Consumer
* @Description:
* @Author: zhz
* @CreateTime: 2024/12/10 9:36
* @Version: 1.0.0
*/
public class ConsumerC {
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//实例化ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置创建工厂的参数
//链接主机IP
connectionFactory.setHost("192.168.170.30");
//设置vhost
connectionFactory.setVirtualHost("/root/");
//设置用户和密码
connectionFactory.setUsername("root");
connectionFactory.setPassword("tiger");
//设置端口号
connectionFactory.setPort(5672);
//使用链接工厂创建连接
connection = connectionFactory.newConnection();
//使用链接创建channel
channel = connection.createChannel();
//使用channel定义队列
//String queue 队列名称,
// boolean durable 是否持久化(是否存储到磁盘中),
// boolean exclusive 是否被多个消费者消费 false 允许多个消费者,
// boolean autoDelete 是否自动删除 至少有一消费者使用过该队列,并且当前任何消费者都断开,该队列的消息会自动删除,
// Map<String, Object> arguments 定义其他参数
channel.queueDeclare(Producer.QUEUE_NAME_11,true,false,false,null);
//使用channel获取消息 amq.ctag-E0c-YqMVGbLBAm9ZtT2SfA
//String queue 队列名称,
// boolean autoAck 自动确定 消费者一旦拿到消息,自动确认拿到了,
// DeliverCallback deliverCallback 交付回调,
// CancelCallback cancelCallback 取消回调
/* channel.basicConsume(Producer.QUEUE_NAME,(String s, Delivery delivery)->{
System.out.println("回调处理信息:"+s);
//delivery交付对象,存储消息的所有信息 delivery.getBody()消息的字节数组
System.out.println("消费的消息为:"+new String(delivery.getBody()));
//消息的其他信息:
Envelope envelope = delivery.getEnvelope();
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("路由键routingKey:"+routingKey);
System.out.println("交换机exchange:"+exchange);
System.out.println("消息唯一的数字标识:"+deliveryTag);
},(String s)->{
System.out.println("回调处理信息:"+s);});*/
//使用内部类,实例化接口
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("回调处理信息:"+s);
//delivery交付对象,存储消息的所有信息 delivery.getBody()消息的字节数组
System.out.println("模拟消费者,喜欢动物是快的:"+new String(delivery.getBody())+"。。。。");
//消息的其他信息:
Envelope envelope = delivery.getEnvelope();
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("路由键routingKey:"+routingKey);
System.out.println("交换机exchange:"+exchange);
System.out.println("消息唯一的数字标识:"+deliveryTag);
}
};
CancelCallback cancelCallback = new CancelCallback(){
@Override
public void handle(String s) throws IOException {
System.out.println("回调处理信息:"+s);
}
};
//消费者消费消息不是一次性的,所以消费者不能关闭资源
channel.basicConsume(Producer.QUEUE_NAME_11,true, deliverCallback,cancelCallback);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
} finally {
/*try {
//关闭资源
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}*/
}
}
}
持久化
标签:String,System,rabbitmq,delivery,println,channel,out From: https://www.cnblogs.com/xiaomubupi/p/18642182