1,RabbitMq 简介
是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
2,RabbitMq 几个术语
1. Exchange - 交换机
生产者将消息发送给交换机,交换机按照一定规则分发消息给指定队列。消息根据交换机类型和 binding 可以投递到多个队列中。
常用的交换机有四种。
- 直连交换机
directExchange: 根据 routeKey 匹配队列
@Bean
public DirectExchange directExchangeDemo(){
/*
* 直连交换机
* 一共四个参数:String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
* name: 名称
* durable: 持久化
* autoDelete:自动删除
* arguments:参数
*
* */
return new DirectExchange("directExchangeTest",true,false);
}
- 扇形交换机
FanoutExchange:不用匹配 routekey,所有队列都能获取扇形交换机分发的消息
@Bean
public FanoutExchange fanoutExchangeDemo(){
/* 扇形交换机 */
return new FanoutExchange("fanoutExchangeTest",true,false);
}
- 主题交换机
TopicExchange: 增强版的直连交换机,路由键 routekey 中,* 代表匹配任意一个单词,# 代表匹配任意一个或多个单侧, . 代表一个部分(www.# 可以匹配 www.aaa)
@Bean
public TopicExchange topicExchangeDemo(){
return new TopicExchange("topicExchangeTest1",true,false);
}
- 头部交换机
HeadersExchange : 通过头部键值对匹配队列的交换机
@Bean
public HeadersExchange headersExchangeDemo(){
/* 头部交换机 */
return new HeadersExchange("headersExchangeTest",true,false);
}
2. Broker
接收和分发消息的应用,就是 mq 的服务端。
3. Virtual host
虚拟分组,类似于 nameSpace。
4. Connection
publisher/customer 和 broker 直接的连接。
5. Channel
信道,复用 Connection。
6. Exchange
交换机,message 到达 Broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中。
7. Queue
最终消息被送到这里等待被 customer 取走。
8. Binding
exchange 和 queue 之间的虚拟连接, binding 中可以包含 routing key, Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
3. 消息队列大致使用过程
- 启动一个消息队列服务器
- 客户端连接到消息队列服务器,打开一个 channel
- 客户端声明一个 exchange,并设置相关属性
- 客户端声明一个 queue,并设置相关属性
- 客户端使用 routing key,在 exchage 和 queue 中建立绑定关系
- 生产者投递消息到 exchange,exchange 接收到消息后,就根据消息的 key 和已经设置的 binding,进行消息路由,将消息投递到对应的队列中。
- 消费者消费队列中的消息。
4,消息应答
创建消费者:
/**
* 消费者消费消息
* 1,消费哪个队列
* 2,消费成功之后是否要自动应答 "true" 代表自动应答 "false" 手动应答
* 3,消费者未成功消费的回调
* */
channel.basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback);
确认消费:
/**
* 参数 1,消息标记
* 2,false 表示只应答接收到那个传递的消息
* 用于肯定确认:RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
*
* multiple 的 true 和 false 代表不同意思:
* true 表示批量应答 channel 上未应答的消息,false 表示只应答当前 channel 上的消息。
* */
Channel.basicAck(long deliveryTag, boolean multiple)
拒绝消费
/**
* 参数 1,消息标记
* 2,是否应答 channel 上所有未应答的消息
* 3,是否重新入列
* 用于否定消息
* */
Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
拒绝消费
/**
* 参数 1,消息标记
* 3,是否重新入列
* 用于否定消息,相比 basicNack 缺少 multiple 参数,不能批量确认
* */
Channel.basicReject(long deliveryTag, boolean requeue)
手动确认 demo
public class Customer1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("yanqi");
factory.setPassword("5211314");
factory.setVirtualHost("love");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("Custom1 等待接收消息....");
//消费消息
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
String message = new String(delivery.getBody());
System.out.println(message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
//取消消息
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag);
System.out.println("消息消费被中断");
};
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
* 3,消费成功
* 4.消费者未成功消费的回调
*/
channel.basicConsume("中华艺术宫", false, deliverCallback, cancelCallback);
}
}
5,队列持久化
//durable:true 表示队列持久化,false 表示不持久化,重启 rabbitmq 队列就没了
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments);
/**
* 生成一个队列
* 1.队列名称
* 2.队列里面的消息是否持久化 默认消息存储在内存中
* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
* 5.其他参数
*/
channel.queueDeclare("中华艺术宫", false, false, false, null);
6,消息持久化
//props 中添加 MessageProperties.PERSISTENT_TEXT_PLAIN
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body);
/**
* 发送一个消息
* 1.发送到那个交换机
* 2.路由的 key 是哪个
* 3.其他的参数信息,比如 MessageProperties.PERSISTENT_TEXT_PLAIN 消息持久化
* 4.发送消息的消息体
*/
channel.basicPublish("", "中华艺术宫", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
7,预取值
Channel 上未确认的缓冲区,通过 basicQos(int prefetchCount) 设置值,避免缓冲区无限制未确认大小。通过设置预取值,还可以根据不同消费者性能问题实现不公平分发。
8,发布确认
生成者将 Channel 设置成 confirm 模式,一旦消息被投递到所有匹配的队列之后, broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了。
发布确认
public class PushlierConfirm {
public static void main(String[] args) {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("yanqi");
factory.setPassword("5211314");
factory.setVirtualHost("love");
try (
Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
//发布确认
channel.confirmSelect();
/**
* 生成一个队列
* 1.队列名称
* 2.队列里面的消息是否持久化 默认消息存储在内存中
* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
* 5.其他参数
*/
channel.queueDeclare("中华艺术宫", false, false, false, null);
/**
* 发送一个消息
* 1.发送到那个交换机
* 2.路由的 key 是哪个
* 3.其他的参数信息,比如 MessageProperties.PERSISTENT_TEXT_PLAIN 消息持久化
* 4.发送消息的消息体
*/
int i = 0;
while(true){
String message = "hello world--" + (++i);
channel.basicPublish("", "中华艺术宫", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
boolean b = channel.waitForConfirms();
if(b){
System.out.println("消息 " + i + " 发布成功!");
}else{
System.out.println("消息 " + i + " 发布失败!");
}
Thread.sleep(3_000);
}
//System.out.println("消息发送完毕");
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
9,死信队列
无法被消费的消息。
来源:
1)消息 TTL 过期
2)队列达到最大长度,无法再添加数据到 mq 中
3)被拒绝的消息,并且 requeue = false
声明死信队列 demo
public static void rejectCustom() throws IOException, TimeoutException {
Channel channel = ChannelUtil.getChannel();
/**
* 声明死信队列
* queueDeclare(String queue,
* boolean durable,
* boolean exclusive,
* boolean autoDelete,
* Map<String, Object> arguments)
* queue:队列名
* durable:是否持久化
* exclusive:该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
* autoDelete:是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
* arguments:其他参数
*/
String dead_queue = "dead_queue";
channel.queueDeclare(dead_queue, false, false, false, null);
/**
* 私信队列绑定交换机
* */
String dead_exchange = "dead_exchange";
channel.exchangeDeclare(dead_exchange, BuiltinExchangeType.DIRECT);
channel.queueBind(dead_queue, dead_exchange, "dead_routing");
//声明正常队列
String normal_queue = "normal_queue";
Map<String, Object> params = new HashMap<>();
params.put("x-dead-letter-exchange", dead_exchange);
params.put("x-dead-letter-routing-key", "dead_routing");
channel.queueDeclare(normal_queue, false, false, false, params);
//等待接收消息
System.out.println("等待接收消息----");
channel.basicConsume(normal_queue, false, (consumerTag, message) -> {
System.out.println(new String(message.getBody(), "UTF-8"));
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
}, consumerTag -> {
System.out.println("消费失败");
});
}
10,延时队列
TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间
设置超时时间
1)队列的 TTl,队列中的消息一旦过了 TTL 时间未被消费,就会丢弃(有死信队列就放到死信队列中)
2)消息的 TTL,即使消息过期,也不一定被马上丢弃
延时队列
//消息队列设置延时,投送消息到普通队列,ttl 时间内未被消费,投送到死信队列
public static void delay_queue() throws IOException, TimeoutException {
Channel channel = ChannelUtil.getChannel();
//死信队列
String dead_queue = "delay_dead_queue";
channel.queueDeclare(dead_queue, false, false, false, null);
/**
* 死信队列绑定交换机
* */
String dead_exchange = "delay_dead_exchange";
String delay_routing_key = "delay_routing";
channel.exchangeDeclare(dead_exchange, BuiltinExchangeType.DIRECT);
channel.queueBind(dead_queue, dead_exchange, delay_routing_key);
//声明带有 ttl 的队列
String queue = "delay_queue";
Map<String, Object> params = new HashMap<>();
//设置队列的 ttl 时间
params.put("x-message-ttl", 5000);
params.put("x-dead-letter-exchange", dead_exchange);
params.put("x-dead-letter-routing-key", delay_routing_key);
channel.queueDeclare(queue, false, false, false, params);
channel.basicPublish("", queue, null, "延时队列数据:1".getBytes());
channel.basicPublish("", queue, null, "延时队列数据:2".getBytes());
channel.basicPublish("", queue, null, "延时队列数据:3".getBytes());
}
/**
* 消息延时
* 消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间
* 可以用 java DelayQueue
*/
public static void delay_message() throws IOException, TimeoutException {
Channel channel = ChannelUtil.getChannel();
//死信队列
String dead_queue = "delay_dead_queue";
channel.queueDeclare(dead_queue, false, false, false, null);
/**
* 死信队列绑定交换机
* */
String dead_exchange = "delay_dead_exchange";
String delay_routing_key = "delay_routing";
channel.exchangeDeclare(dead_exchange, BuiltinExchangeType.DIRECT);
channel.queueBind(dead_queue, dead_exchange, delay_routing_key);
//声明带有 ttl 的队列
String queue = "delay_queue";
Map<String, Object> params = new HashMap<>();
//设置队列的 ttl 时间
params.put("x-dead-letter-exchange", dead_exchange);
params.put("x-dead-letter-routing-key", delay_routing_key);
channel.queueDeclare(queue, false, false, false, params);
channel.basicPublish("", queue, new AMQP.BasicProperties().builder().expiration("10000").build(), "延时队列数据:1".getBytes());
channel.basicPublish("", queue, new AMQP.BasicProperties().builder().expiration("3000").build(), "延时队列数据:2".getBytes());
channel.basicPublish("", queue, new AMQP.BasicProperties().builder().expiration("300").build(), "延时队列数据:3".getBytes());
}