1. 概述
1.1 MQ的相关概念
什么是MQ
MQ(message queue),消息队列,本质就是一个队列,遵循FIFFO先进先出原则。
什么是FIFO呢?打个比方,喝啤酒的时候,酒量非常好,喝下的酒都去厕所排出来了,先喝进去的酒先排出来,就是FIFO先进先出。如果酒量不好,喝了就吐,后喝进去的先吐出来,就是LIFO后进先出。FIFO即为队列,LIFO即为栈。
只不过这个队列中存放的内容是一段message消息而已,这个消息是可以进行通信的,用于上下游传递消息。
什么是上下游传递消息?每个人都有QQ,A同学向B同学发送消息,那么A同学就是上游,B同学就是下游,A同学和B同学传递消息,就称为上下游传递消息。
在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务。
为什么要用MQ
MQ有三大功能:流量消峰、应用解耦、异步处理
-
流量消峰
举个例子,如果订单系统每秒最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果一秒内有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。而使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。
-
应用解耦
以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中间用户感受不到物流系统的故障,提升系统的可用性。
-
异步处理
有些服务间调用是异步的,例如A调用B,B需要花费很长时间执行,但是A需要知道B什么时候可以执行完,以前一般有两种方式,A过一段时间去调用B的api查询结果。或者A提供一个callback api,即回调API,B执行完之后调用api通知A服务。这两种方式都不是很优雅。使用消息队列,可以很方便解决这个问题,A调用B服务后,只需要监听B处理完成的消息,当B处理完成后,会发送一条消息给MQ,MQ会将此消息转发给A服务。这样A服务既不用循环调用B的查询api,也不用提供callback api。同样B服务也不用做这些操作。A服务还能及时的得到异步处理成功的消息。
MQ的分类
- ActiveMQ
- 优点:单机吞吐量万级,时效性ms级,可用性高,基于主从架构实现高可用性,消息可靠性高(丢失数据概率较低)
- 缺点:官方社区(Apache)现在对ActiveMQ 5.x维护越来越少,高吞吐量场景较少使用。
- Kafka
- 大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开Kafka,是为大数据而生的消息中间件。Kafka以其百万级TPS的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥着举足轻重的作用。目前已经被LinkedIn,Uber, Twitter, Netflix等大公司所采纳。
- 优点: 性能卓越,单机写入TPS约在百万条/秒,最大的优点,就是吞吐量高。时效性ms级可用性非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用。消费者采用Pull推送方式获取消息,,消息有序,,通过控制能够保证所有消息被消费且仅被消费一次;有优秀的第三方Kafka Web管理界面Kafka-Manager;在日志领域比较成熟,被多家公司和多个开源项目使用;功能支持:功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用
- 缺点:Kafka单机超过64个队列/分区,Load会发生明显的CPU飙高现象,队列越多,load加载越高,发送消息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序,但是一台代理宕机后,就会产生消息乱序,社区更新较慢;
- RocketMQ
- RocketMQ出自阿里巴巴的开源产品,用Java语言实现,在设计时参考了 Kafka,并做出了自己的一些改进。被阿里巴巴广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。
- 优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到0丢失,MQ功能较为完善,还是分布式的,扩展性好,支持10亿级别的消息堆积,不会因为堆积导致性能下降,源码是java,可以自己阅读源码,定制自己公司的MQ
- 缺点:支持的客户端语言不多,目前是java及c++,其中c++不成熟;社区活跃度一般,没有在MQ核心中去实现JMS等接口,有些系统迁移需要修改大量代码
- RabbitMQ
- 2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的、可复用的企业消息系统,是当前最主流的消息中间件之一。
- 优点:由erlang语言编写,因为erlang语言高并发特性,性能较好;吞吐量到万级,MQ功能比较完备,健壮、稳定、易用、跨平台、支持多种语言 如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高。
- 缺点:商业版需要收费,学习成本较高
- 官网https://www.rabbitmq.com/news.html
MQ的选择
- Kafka
Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,如果有日志采集功能,肯定是首选kafka了。 - RocketMQ
天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。RoketMQ在稳定性上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验,如果你的业务有上述高并发场景,建议可以选择RocketMQ。 - RabbitMQ
结合erlang语言本身的并发优势,性能好,时效性微秒级,社区活跃度也比较高,管理界面用起来十分方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备的RabbitMQ。
1.2 RabbitMQ简介
RabbitMQ的概念
RabbitMQ是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑RabbitMQ是一个快递站,一个快递员帮你传递快件。RabbitMQ与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。
四大核心概念
-
生产者
产生数据发送消息的程序是生产者
-
交换机
交换机是RabbitMQ非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
-
队列
队列是RabbitMQ内部使用的一种数据结构,尽管消息流经RabbitMQ和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
-
消费者
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
交换机与队列是一对多或一对一的绑定关系,队列与消费者是一对一或一对多的关系,如果两个消费者对应同一个队列,那么只有一个消费者能收到消息。
RabbitMQ核心部分
Rabiit有六大核心部分,也叫六大模式。
- 简单模式 Hello World
- 工作模式 Work queues
- 发布模式 Publish/Subscribe
- 路由模式 Routing
- 主题模式 Topics
- 发布确认模式 Publisher Confirms
各个名词介绍
-
Broker:RabbitMQ的一个实体。表示接收和分发消息的应用,也叫RabbitMQ Server(RabbitMQ服务器)。RabbitMQ Server就是Message Broker(消息实体)。其中,Exchange为交换机,Queue为队列。
-
Virtual host:当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
-
Exchange:message 到达 Broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。
-
Queue:消息最终被送到这里等待consumer取走
-
Binding:exchange和queue之间的虚拟连接
-
Producer:生产者
-
Consumer:消费者
-
Connection:publisher/consumer和broker之间的TCP连接
-
Channel:在每一个Connection连接中会有多个信道(发消息的通道)。
如果每一次访问 RabbitMQ 都建立一个Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
-
JMS:JMS 即 Java 消息服务( JavaMessage Service )应用程序接口,是一个 Java 平台中关于面向消息中间件的 API。JMS 是 JavaEE 规范中的一种, 类比 JDBC很多消息中间件都实现了 JMS 规范,例如: ActiveMQ 。 RabbitMQ 官方没有提供 JMS 的实现包,但是开源社区有
1.3 RabbitMQ安装
-
官网下载安装包(除了下载RabbitMQ安装包,还要下载erlang运行环境安装包)
-
上传到
/usr/local/software
目录下(如果没有software自己创建) -
安装下载好的安装包
rpm -ivh erlang-21.3-1.el7.x86_64.rpm
yum install socat -y
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
-
添加开机启动RabbitMQ服务
chkconfig rabbitmq-server on
-
启动服务
systemctl start rabbitmq-server.service
-
查看服务状态
systemctl status rabbitmq-server.service
-
停止服务(选择执行)
systemctl stop rabbitmq-server.service
-
开启web管理插件
rabbitmq-plugins enable rabbitmq_management
-
防火墙放行15672端口
firewall-cmd --add-port=15672/tcp --permanent
firewall-cmd --reload
-
浏览器打开
http://服务器IP地址:15672/
-
登录
-
默认登录用户名和密码都为guest,但是使用这个账户登录没有权限
-
添加新用户
-
创建账号
rabbitmqctl add_user admin 123456
-
设置用户角色为超级管理员
rabbitmqctl set_user_tags admin administrator
角色说明:
1、 超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
2、 监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
3、 策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
4、 普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
5、 其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
-
设置用户权限
#set_permissions [-p <vhostpath>] <user> <conf> <write> <read> rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*" #用户user_admin具有/vhost1这个virtual host中所有资源的配置、写、读权限
-
当前所有用户和角色
rabbitmqctl list_users
-
使用用户名admin,密码123456即可登录
-
-
-
重置命令
- 关闭应用的命令为
rabbitmqctl stop_app
- 清除的命令为
rabbitmqctl reset
- 重新启动命令为
rabbitmqctl start_app
- 关闭应用的命令为
2. 入门
2.1 Hello World
HelloWorld:简单模式
在本节中,我们将用Java编写两个程序:发送单个消息的生产者和接收消息并打印的消费者。使用到最基本的Java API。
在下图中,“ P”是生产者,“ C”是消费者。中间的框是一个队列-RabbitMQ,代表使用者保留的消息缓冲区
使用步骤如下:
-
添加Maven依赖
<dependencies> <!--rabbitmq依赖客户端--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <!--操作文件流的一个依赖--> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> </dependencies>
-
消息生产者代码
public class Produce { /** * 队列名称 */ public static final String QUEUE_NAME = "hello"; /** * 发消息 * * @param args */ public static void main(String[] args) throws Exception { //1.创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置工厂IP 连接RabbitMQ的队列 factory.setHost("192.168.2.175"); //3.设置端口 factory.setPort(5672); //端口 默认值 5672 //4.设置虚拟机 factory.setVirtualHost("/");//虚拟机 默认值/ //5.设置用户名 factory.setUsername("admin"); //6.设置密码 factory.setPassword("123456"); //7.创建连接 Connection connection = factory.newConnection(); //8.创建信道 Channel channel = connection.createChannel(); //9.生成队列 //如果没有一个名字叫QUEUE_NAME的队列,则会创建该队列,如果有则不会创建 /*参数详解: 队列名, 队列中的消息是否持久化(是否需要保存消息,默认为false:存储在内存中), 该队列是否只供一个消费者进行消费(是否进行消息共享,默认为true:不允许多个消费者消费) 是否自动删除(最后一个消费者断开连接后,该队列是否自动删除,默认为true:自动删除) 其他参数,延迟消息等配置 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); //8.发消息 String message = "Hello World"; /*参数详解: 交换机名称,表示发送到哪个交换机,""表示默认交换机 路由Key(队列名),表示发送到哪个队列 其他参数信息 发送的消息体 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("消息发送完毕"); channel.close(); connection.close(); } }
允许即可发送消息成功
若未关闭防火墙则会出现连接超时的错误
解决方法:打开5672端口
5672端口为客户端连接端口
-
消息消费者代码
/** * 队列名称 */ public static final String QUEUE_NAME = "hello"; /** * 接收消息 * * @param args */ public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("192.168.2.175");//ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/");//虚拟机 默认值/ factory.setUsername("admin");//用户名 默认 guest factory.setPassword("123456");//密码 默认值 guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); //5. 创建队列Queue /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 参数: 1. queue:队列名称 2. durable:是否持久化,当mq重启之后,还在 3. exclusive: * 是否独占。只能有一个消费者监听这队列 * 当Connection关闭时,是否删除队列 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉 5. arguments:参数。 */ //如果没有一个名字叫QUEUE_NAME的队列,则会创建该队列,如果有则不会创建 channel.queueDeclare(QUEUE_NAME,true,false,false,null); /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck:是否自动确认 3. callback:回调对象 */ // 接收消息 Consumer consumer = new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1. consumerTag:标识 2. envelope:获取一些信息,交换机,路由key... 3. properties:配置信息 4. body:数据 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] message) throws IOException { System.out.println("consumerTag:"+consumerTag); System.out.println("Exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties); System.out.println("body:"+new String(message)); } }; channel.basicConsume(QUEUE_NAME,true,consumer); //关闭资源?不要 }
2.2 Work Queues
Work Queues:工作队列
工作队列(又称任务队列),主要思想是减少存在资源密集型任务时全部任务的总等待时间。做法是避免在开始就执行资源密集型任务而导致全部任务总等待时间边长,相反,我们把任务封装为消息并将其发送到队列,将资源密集型任务安排在简单任务之后执行。当有多个工作线程时,这些工作线程将一起处理这些任务。值得注意的是:一个消息只能被处理一次,不可以处理多次。
相反我们安排任务在之后执行,把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
多个线程通过轮询机制处理这些任务
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
实现:
Work Queues与入门程序的简单模式的代码几乎是一样的。可以完全复制,并多复制一个消费者进行多
个消费者同时对消费消息的测试。
首先运行两个消费者代码,后台会创建出一个队列:
接下来生成消息,消费者对消息进行消费
小结:
- 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
- Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,只需要有一个节点成功发送即可。
2.3 Publish/Subscribe
Pub/Sub:订阅模式
在之前的两种模式,一条消息只能被一个消费者消费,而从这种模式开始之后的模式,一条消息就可以被多个消费者同时消费了。
在订阅模型中,多了一个Exchange交换机的角色(在之前两种模式中也有交换机,只不过是默认的交换机),而且过程略有变化。
- P :生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给 X (交换机)
- C :消费者,消息的接收者,会一直等待消息到来
- Queue :消息队列,接收消息、缓存消息
- Exchange :交换机 (X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于 Exchange 的类型。 Exchange 有常见以下 3 种类型:
- Fanout :广播,将消息交给所有绑定到交换机的队列
- Direct :定向,把消息交给符合指定 routing key 的队列
- Topic :通配符,把消息交给符合 routing pattern (路由模式)的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
代码实现:
生产者代码:
public class ProducePubSub {
/**
* 队列1名称
*/
public static final String QUEUE_NAME1 = "pubsub1";
/**
* 队列2名称
*/
public static final String QUEUE_NAME2 = "pubsub2";
/**
* 交换机名称
*/
public static final String EXCHANGE_NAME = "test_fanout";
/**
* 发消息
*
* @param args
*/
public static void main(String[] args) throws Exception {
//1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("192.168.2.175");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建交换机
/*参数说明
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
exchange:交换机名称
type:可以为String类型,也可以为枚举类型BuiltinExchangeType:
* DIRECT("direct"):定向
* FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定的队列
* TOPIC("topic"):通配符
* HEADERS("headers"):参数匹配
durable:是否持久化
autoDelete:是否自动删除
internal:内部使用,一般都为false
arguments:参数列表,这里先设置未null
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true, false, false, null);
//6. 创建队列
channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
channel.queueDeclare(QUEUE_NAME2, true, false, false, null);
//7. 绑定队列和交换机
/*参数说明
queue:绑定的队列名称
exchange:交换机名称
routingKey:路由键,绑定规则
如果交换机的类型为fanout,那么routingKey设置未空字符串
*/
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "");
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "");
//8. 发送消息
String body = "日志信息:张三调用findAll方法,日志级别:info";
channel.basicPublish(EXCHANGE_NAME, "", null, body.getBytes());
//9. 释放资源
channel.close();
connection.close();
}
}
消费者代码:
public class ConsumerPubSub1 {
/**
* 队列1名称
*/
public static final String QUEUE_NAME1 = "pubsub1";
/**
* 接收消息
*
* @param args
*/
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("192.168.2.175");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/");//虚拟机 默认值/
factory.setUsername("admin");//用户名 默认 guest
factory.setPassword("123456");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
// 接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("message:" + new String(body));
System.out.println("将日志信息打印到控制台");
}
};
channel.basicConsume(QUEUE_NAME1, true, consumer);
}
}
public class ConsumerPubSub2 {
/**
* 队列1名称
*/
public static final String QUEUE_NAME2 = "pubsub2";
/**
* 接收消息
*
* @param args
*/
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("192.168.2.175");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/");//虚拟机 默认值/
factory.setUsername("admin");//用户名 默认 guest
factory.setPassword("123456");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
// 接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("message:" + new String(body));
System.out.println("将日志信息保存数据库");
}
};
channel.basicConsume(QUEUE_NAME2, true, consumer);
}
}
小结:
- 交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
- 发布订阅模式与工作队列模式的区别:
- 工作队列模式不用定义交换机,而发布 订阅模式需要定义交换机
- 发布订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息、底层使用默认交换机
- 发布订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机
2.4 Routing
路由模式中,队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey (路由 key)
消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息
图解:
- P :生产者,向 Exchange 发送消息,发送消息时,会指定一个 routing key
- X :Exchange (交换机),接收生产者的消息,然后把消息递交给与 生产者 routing key 完全匹配的队列
- C1 :消费者,其所在队列指定了需要 routing key 为 error 的消息
- C2 :消费者,其所在队列指定了需要 routing key 为 info 、 error 、 warning 的消息
代码实现:
需求:生产者发送日志消息,只将日志级别为error的日志消息存到数据库,其他级别的日志消息只输出到控制台
生产者代码:
public class ProduceRouting {
/**
* 队列1名称
*/
public static final String QUEUE_NAME1 = "routing1";
/**
* 队列2名称
*/
public static final String QUEUE_NAME2 = "routing2";
/**
* 交换机名称
*/
public static final String EXCHANGE_NAME = "test_driect";
/**
* 发消息
*
* @param args
*/
public static void main(String[] args) throws Exception {
//1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("192.168.2.175");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, false, null);
//6. 创建队列
channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
channel.queueDeclare(QUEUE_NAME2, true, false, false, null);
//7. 绑定队列和交换机
//队列1的绑定 error
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "error");
//队列2的绑定 error info warning
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "warning");
//8. 发送消息
String body = "日志信息:张三调用findAll方法,日志级别:info";
channel.basicPublish(EXCHANGE_NAME, "info", null, body.getBytes());
//9. 释放资源
channel.close();
connection.close();
}
}
消费者代码:
public class ConsumerRouting1 {
/**
* 队列1名称
*/
public static final String QUEUE_NAME1 = "routing1";
/**
* 接收消息
*
* @param args
*/
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("192.168.2.175");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/");//虚拟机 默认值/
factory.setUsername("admin");//用户名 默认 guest
factory.setPassword("123456");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
// 接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("message:" + new String(body));
System.out.println("将日志信息保存数据库");
}
};
channel.basicConsume(QUEUE_NAME1, true, consumer);
}
}
public class ConsumerRouting2 {
/**
* 队列1名称
*/
public static final String QUEUE_NAME2 = "routing2";
/**
* 接收消息
*
* @param args
*/
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("192.168.2.175");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/");//虚拟机 默认值/
factory.setUsername("admin");//用户名 默认 guest
factory.setPassword("123456");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
// 接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("message:" + new String(body));
System.out.println("将日志信息打印到控制台");
}
};
channel.basicConsume(QUEUE_NAME2, true, consumer);
}
}
小结:Routing模式要求队列在绑定交换机时要指定 routing key ,消息会转发到符合 routing key 的队列。
2.5 Topics
与路由模式相似,生产者发消息给Exchange交换机,交换机类型为topic,交换机再把对应的消息路由到对应的队列中。但是交换机绑定队列的时候,路由Key是带星号的或者井号的表达式的形式,*代表一个单词,而#代表0个或多个单词。比如说生产者发一个消息,路由Key为A.orange.C,那么消息就会路由分发到Q1队列中。
- Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符
- Routingkey 一般都是有一个或多个单词组成,多个单词之间以 ”.”分割,例如 item.insert
- 通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好 1 个词,例如: item.# 能够匹配 item.insert.abc或者 item.insert item。而* 只能匹配 item.insert
代码实现:
需求:所有error级别的日志存入数据库,所有order系统的日志存入数据库
生产者代码:
public class ProduceTopic {
/**
* 队列1名称
*/
public static final String QUEUE_NAME1 = "topic1";
/**
* 队列2名称
*/
public static final String QUEUE_NAME2 = "topic2";
/**
* 交换机名称
*/
public static final String EXCHANGE_NAME = "test_topic";
/**
* 发消息
*
* @param args
*/
public static void main(String[] args) throws Exception {
//1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("192.168.2.175");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, false, null);
//6. 创建队列
channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
channel.queueDeclare(QUEUE_NAME2, true, false, false, null);
//7. 绑定队列和交换机
//routingKey 由两部分组成:系统的名称.日志的级别
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "#.error");
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "order.*");
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "*.*");
//8. 发送消息
String body = "日志信息:张三调用findAll方法,日志级别:info";
channel.basicPublish(EXCHANGE_NAME, "order.info", null, body.getBytes());
//9. 释放资源
channel.close();
connection.close();
}
}
消费者代码:
public class ConsumerTopic1 {
/**
* 队列1名称
*/
public static final String QUEUE_NAME1 = "topic1";
/**
* 接收消息
*
* @param args
*/
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("192.168.2.175");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
// 接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("message:" + new String(body));
System.out.println("将日志信息保存数据库");
}
};
channel.basicConsume(QUEUE_NAME1, true, consumer);
}
}
public class ConsumerTopic2 {
/**
* 队列2名称
*/
public static final String QUEUE_NAME2 = "topic2";
/**
* 接收消息
*
* @param args
*/
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("192.168.2.175");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
// 接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("message:" + new String(body));
System.out.println("将日志信息打印到控制台");
}
};
channel.basicConsume(QUEUE_NAME2, true, consumer);
}
}
小结:Topic主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置 routing key的时候可以使用通配符,显得更加灵活。
2.6 本章小结
-
简单模式 HelloWorld
一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)。
-
工作队列模式 Work Queue
一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。
-
发布订阅模式 Publish/subscribe
需要设置类型为fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。
-
路由模式 Routing
需要设置类型为direct 的交换机,交换机和队列进行绑定,并且指定 routing key ,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。
-
通配符模式 Topic
需要设置类型为topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key ,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。
3. 整合
3.1 Spring整合RabbitMQ
3.1.1 生产者
步骤:
-
创建生产者工程
-
添加依赖
<dependencies> <!--Spring上下文--> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.1.7.RELEASE</version> </dependency> <!--Spring整合amqp--> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.1.8.RELEASE</version> </dependency> <!--单元测试--> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>5.1.7.RELEASE</version> </dependency> </dependencies> <!--1.8编译插件--> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>
-
配置整合
RabbitMQ连接信息配置rabbitmq.properties
rabbitmq.host=192.168.2.175 rabbitmq.port=5672 rabbitmq.username=admin rabbitmq.password=123456 rabbitmq.virtual-host=/
Spring核心配置文件 spring-rabbitmq-producer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--加载配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties"/> <!-- 定义rabbitmq connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/> <!--定义管理交换机、队列--> <rabbit:admin connection-factory="connectionFactory"/> <!--定义持久化队列,不存在则自动创建;不绑定到交换机则绑定到默认交换机 默认交换机类型为direct,名字为:"",路由键为队列的名称 --> <!--参数 id:bean的名称 name:queue的名称 auto-declare:自动创建 auto-delete:自动删除。最后一个消费者和该队列断开连接后自动删除队列 durable:是否持久化 exclusive:是否独占连接 --> <rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/> <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~广播;所有队列都能收到消息~~~~~~~~~~~~~~~~~~~~~~~~~~~~ --> <!--定义广播交换机中的持久化队列,不存在则自动创建--> <rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/> <!--定义广播交换机中的持久化队列,不存在则自动创建--> <rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/> <!--定义广播类型交换机;并绑定上述两个队列--> <rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true"> <rabbit:bindings> <rabbit:binding queue="spring_fanout_queue_1"/> <rabbit:binding queue="spring_fanout_queue_2"/> </rabbit:bindings> </rabbit:fanout-exchange> <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~通配符;*匹配一个单词,#匹配多个单词 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ --> <!--定义广播交换机中的持久化队列,不存在则自动创建--> <rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/> <!--定义广播交换机中的持久化队列,不存在则自动创建--> <rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/> <!--定义广播交换机中的持久化队列,不存在则自动创建--> <rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/> <rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true"> <rabbit:bindings> <rabbit:binding pattern="heima.*" queue="spring_topic_queue_star"/> <rabbit:binding pattern="heima.#" queue="spring_topic_queue_well"/> <rabbit:binding pattern="itcast.#" queue="spring_topic_queue_well2"/> </rabbit:bindings> </rabbit:topic-exchange> <!--定义rabbitTemplate对象操作可以在代码中方便发送消息--> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> </beans>
-
编写代码发送消息
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml") public class ProducerTest { //1. 注入RabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; /** * 简单模式发送消息 */ @Test public void testHelloWorld() { //2. 发送消息 rabbitTemplate.convertAndSend("spring_queue", "hello world spring"); } /** * 发送fanout消息 */ @Test public void testFanout() { //2. 发送消息 rabbitTemplate.convertAndSend("spring_fanout_exchange", "", "spring fanout"); } /** * 发送Topic消息 */ @Test public void testTopics() { rabbitTemplate.convertAndSend("spring_topic_exchange", "heima.mark.love", "spring topic"); } }
3.1.2 消费者
步骤:
-
创建消费者工程
-
添加依赖
同消费者
-
配置整合
RabbitMQ连接信息配置rabbitmq.properties同生产者
Spring核心配置文件 spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--加载配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties"/> <!-- 定义rabbitmq connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/> <bean id="springQueueListener" class="com.mark.rabbitmq.listener.SpringQueueListener"/> <bean id="fanoutListener1" class="com.mark.rabbitmq.listener.FanoutListener1"/> <bean id="fanoutListener2" class="com.mark.rabbitmq.listener.FanoutListener2"/> <bean id="topicListenerStar" class="com.mark.rabbitmq.listener.TopicListenerStar"/> <bean id="topicListenerWell" class="com.mark.rabbitmq.listener.TopicListenerWell"/> <bean id="topicListenerWell2" class="com.mark.rabbitmq.listener.TopicListenerWell2"/> <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true"> <rabbit:listener ref="springQueueListener" queue-names="spring_queue"/> <rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/> <rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/> <rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/> <rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/> <rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/> </rabbit:listener-container> </beans>
-
编写消息监听器
public class FanoutListener1 implements MessageListener { @Override public void onMessage(Message message) { //打印消息 System.out.println(new String(message.getBody())); } }
其他监听器与此代码相同
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml") public class ConsumerTest { @Test public void test1() { boolean flag = true; while (true){ } } }
3.2 SpringBoot整合RabbitMQ
3.2.1 生产者
-
创建生产者 SpringBoot 工程
-
引入 start ,依赖坐标
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency>
-
编写 yml 配置,基本信息配置
#配置RabbitMQ的基本信息 ip、端口、username、password等基本信息 spring: rabbitmq: host: 192.168.2.175 port: 5672 username: admin password: 123456 virtual-host: /
-
定义交换机,队列以及绑定关系的配置类
@Configuration public class RabbitmqConfig { public static final String EXCHANGE_NAME = "boot_topic_exchange"; public static final String QUEUE_NAME = "boot_topic_queue"; /** * 1.配置交换机 */ @Bean("bootExchange") public Exchange bootExchange() { return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); } /** * 2.配置队列 */ @Bean("bootQueue") public Queue bootQueue() { return QueueBuilder.durable(QUEUE_NAME).build(); } /** * 3.队列和交换机绑定关系 Binding对象 */ @Bean public Binding bindingQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs(); } }
-
注入 RabbitTemplate ,调用方法,完成消息发送
@SpringBootTest @RunWith(SpringRunner.class) class ProducerSpirngbootApplicationTests { //1. 注入RabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; @Test void testSend() { rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,"boot.mark","boot mq hello"); } }
3.2.2 消费者
-
创建消费者 SpringBoot 工程
-
引入 start ,依赖坐标
同生产者
-
编写 yml 配置,基本信息配置
同生产者
-
定义监听类,使用 @RabbitListener 注解完成队列监听。
@Component public class RabbitmqListener { @RabbitListener(queues = "boot_topic_queue") public void listenerQueue(Message message){ System.out.println(new String(message.getBody())); } }
小结:
- SpringBoot 提供了快速整合 RabbitMQ 的方式
- 基本信息再 yml 中配置,队列交互机以及绑定关系在配置类中使用 Bean 的方式配置
- 生产端直接注入 RabbitTemplate 完成消息发送
- 消费端直接使用 @RabbitListener 完成消息接收