RabbitMQ是一个开源的消息中间件,采用AMQP协议来实现消息的生产、消费和路由。它主要由以下几个组件构成:
-
Producer:消息生产者,即发送消息的应用程序。
-
Exchange:消息交换机,用于接收生产者发送的消息,并将其路由到对应的队列中。
-
Queue:消息队列,用于存储消息,等待消费者来消费。
-
Binding:将交换机和队列进行绑定,以便将消息路由到正确的队列中。
-
Consumer:消息消费者,即接收并处理消息的应用程序。
RabbitMQ的消息流程如下:
-
生产者将消息发送到交换机。
-
交换机接收到消息后,根据绑定规则将消息路由到对应的队列中。
-
消费者从队列中接收消息,并进行处理。
RabbitMQ支持多种不同的交换机类型,包括:
-
Direct Exchange:直接交换机,将消息路由到指定的队列中。
-
Fanout Exchange:广播交换机,将消息路由到所有绑定到该交换机的队列中。
-
Topic Exchange:主题交换机,根据通配符匹配规则将消息路由到对应的队列中。
-
Headers Exchange:头交换机,根据消息头中的键值对进行匹配,并将消息路由到对应的队列中。
在RabbitMQ中,队列和交换机都有持久化和非持久化之分。持久化的队列和交换机可以在RabbitMQ服务器重启后仍然存在,而非持久化的队列和交换机则会在服务器重启后被删除。
总之,RabbitMQ是一个功能强大的消息中间件,可以为分布式系统提供高效、可靠的消息传递服务。
AMQP(Advanced Message Queuing Protocol):
AMQP(Advanced Message Queuing Protocol)是一种网络协议,用于在应用程序之间传递消息。它是一个开放标准,由AMQP工作组开发和维护。AMQP协议支持跨平台、跨语言的消息传递,它可以在不同的操作系统、编程语言和网络环境中使用。
AMQP协议的主要特点包括:
-
可靠性:AMQP协议提供了可靠的消息传递机制,确保消息不会丢失或重复传递。
-
灵活性:AMQP协议支持多种消息传递模式,包括点对点、发布/订阅和路由模式。
-
互操作性:AMQP协议是一个开放标准,支持多种编程语言和操作系统,因此可以在不同的平台之间进行消息传递。
-
安全性:AMQP协议支持多种安全机制,包括身份验证、加密和访问控制。
总之,AMQP协议是一种可靠、灵活、互操作和安全的消息传递协议,被广泛应用于分布式系统、云计算和物联网等领域。
在 RabbitMQ 中,Exchange、Queue 和 routingKey 是消息传递的三个关键概念,它们之间的关系如下:
-
Exchange:Exchange 是 RabbitMQ 中的消息交换机,它负责接收生产者发送的消息,并根据消息的 routingKey 进行路由分发。Exchange 有四种类型:direct、topic、headers 和 fanout。
-
Queue:Queue 是消息队列,它负责存储 Exchange 路由过来的消息。消费者从 Queue 中获取消息进行消费。
-
routingKey:routingKey 是生产者发送消息时指定的关键字,用于指定消息的路由规则。Exchange 根据 routingKey 进行路由分发,将消息发送到指定的 Queue 中。
Exchange、Queue 和 routingKey 的用法如下:
-
生产者发送消息时,需要指定 Exchange 和 routingKey。Exchange 根据 routingKey 进行路由分发,将消息发送到指定的 Queue 中。
-
消费者从 Queue 中获取消息进行消费。消费者可以订阅多个 Queue,从而消费多个 Queue 中的消息。
-
Exchange 和 Queue 可以绑定多个关系。一个 Exchange 可以绑定多个 Queue,一个 Queue 可以绑定多个 Exchange,从而实现不同的消息路由规则。
总之,Exchange、Queue 和 routingKey 是 RabbitMQ 中消息传递的三个关键概念,它们之间的关系和用法需要根据具体的业务场景来设计和实现。
在 RabbitMQ 中,有四种类型的交换机:direct、topic、headers 和 fanout。其中,direct 和 topic 类型的交换机都会使用 routing key 进行路由。在 direct 类型的交换机中,routing key 必须与队列的绑定键完全匹配,才能将消息路由到该队列。而在 topic 类型的交换机中,routing key 可以使用通配符进行匹配,从而将消息路由到多个队列。
举个例子,假设有一个名为 "logs" 的交换机,其中有两个队列 "error_logs" 和 "info_logs"。当生产者发送一条日志消息时,可以将消息的 routing key 设置为 "error" 或 "info",交换机会将该消息路由到相应的队列中,从而实现日志的分类和处理。
RabbitMQ dmeo示例:
- 引入依赖
首先需要在项目中引入RabbitMQ的依赖,可以在pom.xml文件中添加以下依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.11.0</version>
</dependency>
- 创建连接和通道
在使用RabbitMQ之前,需要先创建一个连接和一个通道。连接是一个TCP连接,通道是在连接内部的一个虚拟连接,可以使用通道进行消息的发送和接收。
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
- 创建交换机
交换机是消息的路由中心,它接收从生产者发送过来的消息,并根据路由键将消息路由到一个或多个队列中。在RabbitMQ中,有四种类型的交换机:直连交换机(direct)、主题交换机(topic)、扇形交换机(fanout)和头交换机(headers)。
String exchangeName = "myExchange";
String exchangeType = "direct";
boolean durable = true;
boolean autoDelete = false;
boolean internal = false;
Map<String, Object> arguments = null;
channel.exchangeDeclare(exchangeName, exchangeType, durable, autoDelete, internal, arguments);
参数说明:
-
exchangeName:交换机名称
-
exchangeType:交换机类型,可选值有direct、fanout、topic、headers
-
durable:是否持久化
-
autoDelete:是否自动删除
-
arguments:额外的参数,可为null
- 创建队列
队列是消息的存储中心,它接收交换机路由过来的消息,并将消息存储在队列中,等待消费者进行消费。在创建队列时,需要指定队列的名称、是否持久化、是否自动删除等参数。
String queueName = "myQueue";
boolean durable = true;
boolean exclusive = false;
boolean autoDelete = false;
Map<String, Object> arguments = null;
channel.queueDeclare(queueName, durable, exclusive, autoDelete, arguments);
参数说明:
-
queueName:队列名称
-
durable:是否持久化
-
exclusive:是否排他性队列
-
autoDelete:是否自动删除
-
arguments:额外的参数,可为null
- 绑定队列和交换机
在将队列绑定到交换机时,需要指定交换机的名称、路由键和队列的名称。路由键是用来指定消息路由到哪个队列的,它可以是一个字符串,也可以是一个正则表达式。
String routingKey = "myRoutingKey";
channel.queueBind(queueName, exchangeName, routingKey);
参数说明:
-
queueName:队列名称
-
exchangeName:交换机名称
-
routingKey:路由键,用于将消息路由到指定的队列
- 发送消息
在发送消息时,需要指定交换机的名称、路由键和消息的内容。消息可以是一个字符串,也可以是一个字节数组。
String message = "Hello, RabbitMQ!";
channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
- 接收消息
在接收消息时,需要先创建一个消费者对象,并将其注册到队列中。然后在回调函数中处理接收到的消息。
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
}
};
channel.basicConsume(queueName, true, consumer);
以上是RabbitMQ交换机创建、队列创建、队列绑定的详细步骤以及每个参数的用法。下面是Java代码的示例:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class RabbitMQDemo {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接和通道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 创建交换机
String exchangeName = "myExchange";
String exchangeType = "direct";
boolean durable = true;
boolean autoDelete = false;
boolean internal = false;
Map<String, Object> arguments = null;
channel.exchangeDeclare(exchangeName, exchangeType, durable, autoDelete, internal, arguments);
// 创建队列
String queueName = "myQueue";
boolean exclusive = false;
boolean autoDelete = false;
channel.queueDeclare(queueName, true, exclusive, autoDelete, null);
// 绑定队列和交换机
String routingKey = "myRoutingKey";
channel.queueBind(queueName, exchangeName, routingKey);
// 发送消息
String message = "Hello, RabbitMQ!";
channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
// 接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
}
};
channel.basicConsume(queueName, true, consumer);
// 关闭连接和通道
channel.close();
connection.close();
}
}
RabbitMQ生产者:
在RabbitMQ中,生产者是向消息队列发送消息的应用程序。下面是RabbitMQ中生产者的各个参数的详解以及相应的Java代码示例:
- Hostname:RabbitMQ服务器的主机名或IP地址。
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
- Port:RabbitMQ服务器的端口号。
ConnectionFactory factory = new ConnectionFactory();
factory.setPort(5672);
- Username和Password:连接RabbitMQ服务器的用户名和密码。
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
- Virtual Host:RabbitMQ服务器中的虚拟主机。
ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost("/");
- Exchange Name:消息发送到的交换机的名称。
String exchangeName = "myExchange";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);
- Routing Key:消息发送到的队列的路由键。
String routingKey = "myRoutingKey";
channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
- Message Properties:消息的属性,如消息ID、消息类型、过期时间等。
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(2)
.priority(1)
.build();
channel.basicPublish(exchangeName, routingKey, properties, message.getBytes());
以上是RabbitMQ中生产者的各个参数的详解以及相应的Java代码示例。
RabbitMQ消费者:
在RabbitMQ中,消费者可以通过设置不同的参数来控制其行为。以下是RabbitMQ中消费者的各个参数的详细解释:
-
queue:指定要消费的队列名称。
-
autoAck:是否自动确认消息,默认为true,即自动确认消息。
-
exclusive:是否为独占模式,默认为false,即非独占模式。
-
consumerTag:消费者标签,用于在取消消费者时指定消费者。
-
noLocal:是否禁止消费者使用与生产者相同的连接来接收消息,默认为false,即不禁止。
-
noAck:是否需要手动确认消息,默认为false,即需要手动确认。
-
arguments:消费者的其他参数,如优先级等。
以下是一个使用Java编写的消费者示例,其中设置了queue、autoAck、consumerTag和noAck参数:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
// 创建连接
Connection connection = factory.newConnection();
// 创建通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 创建消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
}
};
// 开始消费
channel.basicConsume(QUEUE_NAME, true, "myConsumerTag", false, false, null, consumer);
}
}
在上面的示例中,我们创建了一个名为hello的队列,并创建了一个消费者。消费者使用DefaultConsumer类实现,重写了handleDelivery方法来处理接收到的消息。在最后,我们调用了basicConsume方法来开始消费消息。其中,第二个参数autoAck设置为true,表示自动确认消息;第三个参数consumerTag设置为myConsumerTag,用于在取消消费者时指定消费者;第四个参数noAck设置为false,表示需要手动确认消息。
标签:String,队列,路由,RabbitMQ,交换机,消息,详解 From: https://www.cnblogs.com/huangdh/p/17762404.html