一、RabbitMQ的基本结构、角色和消息模型
MQ的基本结构:
RabbitMQ中的一些角色:
- publisher:生产者
- consumer:消费者
- exchange个:交换机,负责消息路由
- queue:队列,存储消息
- virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离
RabbitMQ官方提供了5个不同的Demo示例,对应了不同的消息模型:
1.Basic Queue(基本消息队列) 简单队列模型 简单模式:在这种模式下,生产者将消息发送到队列,消费者监听队列并消费消息。消息被消费后,会自动从队列中删除。这种模式的优点是简单易懂,但缺点是如果消费者处理消息的速度慢,可能会导致消息处理不及时。
2.Work queues(工作消息队列) ,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。 工作队列模型 ,一个生产者将消息分发给多个消费者。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型,多个消费者共同处理消息处理,速度就能大大提高了。工作模式:在这种模式下,多个消费者竞争同一个队列中的消息,谁先获取到消息谁就处理该消息。这种模式适用于处理高并发的情况,但需要注意避免多个消费者处理同一消息的问题。
发布订阅(Publish、 Subscribe) ,又根据交换机类型不同分为三种:
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
3.Fanout Exchange(广播)
发布/订阅模型 ,生产者发布消息,多个消费者同时收取
4.Topic Exchange(主题 )Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key`的时候使用通配符!主题模式:主题模式是路由模式的一种变体,它使用通配符来匹配路由键,使得消息可以发送到多个队列中。这种模式增加了消息的灵活性,但也需要小心设计通配符规则以避免不必要的消息匹配。
5.Direct Exchange(路由 )
路由模式:在这种模式下,生产者将消息发送到交换机,交换机根据路由键将消息路由到特定的队列中,只有绑定了相应路由键的队列才能接收消息。这种模式适用于需要根据不同业务场景处理不同消息的情况。
二、入门案例 (基本消息队列)
简单队列模式的模型图:
官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:
- publisher:消息发布者,将消息发送到队列queue
- queue:消息队列,负责接受并缓存消息
- consumer:订阅队列,处理队列中的消息
工程包括三部分:
- mq-demo:父工程,管理项目依赖
- publisher:消息的发送者
- consumer:消息的消费者
基本消息队列的消息发送流程:
1. 建立connection
2. 创建channel
3. 利用channel声明队列
4. 利用channel向队列发送消息
publisher代码如下:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.junit.Test; import java.io.IOException; import java.util.concurrent.TimeoutException; public class PublisherTest { @Test public void testSendMessage() throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("192.168.150.101"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("rbmq"); factory.setPassword("123456"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.发送消息 String message = "hello, rabbitmq!"; channel.basicPublish("", queueName, null, message.getBytes()); System.out.println("发送消息成功:【" + message + "】"); // 5.关闭通道和连接 channel.close(); connection.close(); } }
基本消息队列的消息接收流程:
1. 建立connection
2. 创建channel
3. 利用channel声明队列
4. 定义consumer的消费行为handleDelivery()
5. 利用channel将消费者与队列绑定
consumer代码如下:
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConsumerTest { public static void main(String[] args) throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("192.168.150.101"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("rbmq"); factory.setPassword("123456"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.订阅消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 5.处理消息 String message = new String(body); System.out.println("接收到消息:【" + message + "】"); } }); System.out.println("等待接收消息。。。。"); } }
演示测试成功。