AMQP (Advanced Message Queuing Protocol) 即高级消息队列协议,是一个进程间传递异步消息的网络协议。
AMQP 模型
工作过程如下:首先发布者(Publisher)发布消息(Message),经由交换机 Exchange。交换机根据路由规则将收到的消息分发给与该交换机绑定的 Queue。最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。
关于 AMQP 模型的几点说明:
- 发布者、交换机、队列、消费者都可以有多个。AMQP 是一个网络协议,所以这个过程中的发布者,消费者,消息代理可以分别存在于不同的设备上。
- 布者发布消息时可以给消息指定各种消息属性(Message Meta-data)。有些属性有可能会被消息代理(Brokers)使用,然而其他的属性则是完全不透明的,它们只能被接收消息的应用所使用。
- 从安全角度考虑,网络是不可靠的,又或是消费者在处理消息的过程中意外挂掉,这样没有处理成功的消息就会丢失。基于此原因,AMQP 模块包含了一个消息确认机制:当一个消息从队列中投递给消费者后,不会立即从队列中删除,直到它收到来自消费者的确认回执(Acknowledgement)后,才完全从队列中删除。
- 在某些情况下,例如当一个消息无法被成功路由时(无法从交换机分发到队列),消息或许会被返回给发布者并被丢弃。或者,如果消息代理执行了延期操作,消息会被放入一个死信队列中。此时,消息发布者可以选择某些参数来处理这些特殊情况。
Producer & Consumer
消息生产者(Producer),向 Broker 发送消息的客户端。
消息消费者(Consumer),从 Broker 消费消息的客户端。
Broker
一个 RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点,或者 RabbitMQ 服务实例。大多数情况下可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。
Exchange
Exchange 即交换器,是用来发送消息的 AMQP 实体。Exchange 拿到一个消息之后将它路由给一个或零个队列。Exchange 使用哪种路由算法是由交换机类型和绑定(Bindings)规则所决定的。
Binding
Producer 将消息发给 Exchange 时,一般会指定一个 RoutingKey (路由键),用来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和 BindingKey (绑定键) 联合使用才能最终生效。
RabbitMQ 中通过 Binding (绑定) 将 Exchange 与 Queue(消息队列) 关联起来,在绑定时一般会指定一个 BindingKey,这样 RabbitMQ 就知道如何正确将消息路由到 Queue 中。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则。
生产者将消息发送给交换器,当 BindingKey 和 RoutingKey 相匹配时,消息会被路由到对应的队列中。注意BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型,比如 fanout 类型的交换器就会无视,而是将消息路由到所有绑定到该交换器的队列中。
Exchange 类型
Exchange 有以下 4 种类型,不同的类型对应着不同的路由策略:
direct
Exchange 默认类型。路由规则是把消息路由到 Bindingkey 与 RoutingKey 完全匹配的 Queue 中。direct 类型常用在处理有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。
以上图为例,如果发送消息的时候 RoutingKey="booking",那么消息会路由到 Queue1 和 Queue2。如果在发送消息的时候设置 RoutingKey="create" 或 "confirm",消息只会路由到Queue2。如果以其他的 RoutingKey 发送消息,则消息不会路由到这两个队列中。
fanout
路由规则是把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中,不需要做任何判断操作,所以 fanout 类型是所有的交换机类型里面速度最快的。fanout 类型常用来广播消息。
topic
direct 类型的 Exchange 路由规则是完全匹配 BindingKey 和 RoutingKey ,但是这种严格的匹配方式在很多情况下不能满足实际业务的需求。
topic 类型的 Exchange 在匹配规则上进行了扩展,它与 direct 类型的 Exchange 相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:
- RoutingKey 为一个点号
.
分隔的字符串,其中被.
分隔开的每一段独立的字符串称为一个单词; - BindingKey 和 RoutingKey 一样也是点号
.
分隔的字符串; - BindingKey 中可以存在两种特殊字符串
*
和#
,用于做模糊匹配,其中*
用于匹配一个单词,#
用于匹配零个或多个单词。
以上图为例,如果发送消息的时候 RoutingKey 为
- "com.rabbitmq.client",那么消息会路由到 Queue1 和 Queue2
- "com.hidden.client",那么消息只会路由到 Queue2 中
- "com.hidden.demo",那么消息只会路由到 Queue2 中
- "java.rabbitmq.demo",那么消息只会路由到 Queue1 中
- "java.util.concurrent",那么消息将会被丢弃或者返回给生产者,因为它没有匹配任何路由键。
headers
headers 类型的 Exchange 不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。在绑定队列和交换器时指定一组键值对,当发送消息到交换器时,RabbitMQ 会获取到该消息的 headers(也是一个键值对的形式),对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 类型的 Exchange 性能会很差,不推荐使用。
Queue
Queue 其实是 Message Queue 即消息队列,保存消息并将它们转发给消费者。Queue 是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其消费。
RabbitMQ 中消息只能存储在队列中,而 Kafka 将消息存储在 Topic 中,即该 Topic 对应的 Partition 中。RabbitMQ 的生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费。
当多个消费者订阅同一个队列时,队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,这样避免消息被重复消费。
队列属性
Queue 跟 Exchange 共享某些属性,但是队列也有一些另外的属性:
- Name
- Durable:消息代理重启后,队列依旧存在
- Exclusive:只被一个连接使用,而且当连接关闭后队列即被删除
- Auto-delete:当最后一个消费者退订后即被删除
- Arguments:一些消息代理用他来完成类似与 TTL 的某些额外功能
队列创建
队列在声明(declare)后才能被使用。
如果一个队列尚不存在,声明一个队列会创建它。如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。如果声明中的属性与已存在队列的属性有差异,那么一个错误代码为 406 的通道级异常就会被抛出。
队列持久化
持久化队列(Durable Queues)会被存储在磁盘上,当消息代理(Broker)重启的时候,它依旧存在。没有被持久化的队列称作暂存队列(Transient Queues)。并不是所有的场景和案例都需要将队列持久化。
持久化的队列并不会使得路由到它的消息也具有持久性。倘若消息代理挂掉了,重新启动,那么在重启的过程中持久化队列会被重新声明,无论怎样,只有经过持久化的消息才能被重新恢复。
消息机制
消息确认
AMQP 代理在什么时候删除消息才是正确的?AMQP 0-9-1 规范给我们两种建议:
-
自动确认模式:当消息代理(Broker)将消息发送给应用后立即删除。(使用 AMQP 方法:basic.deliver 或 basic.get-ok)
-
显示确认模式:待 Consumer 发送一个确认回执(acknowledgement)后再删除消息。(使用 AMQP 方法:basic.ack)
如果一个消费者在尚未发送确认回执的情况下挂掉了,那 AMQP 代理会将消息重新投递给另一个消费者。如果当时没有可用的消费者了,消息代理会死等下一个注册到此队列的消费者,然后再次尝试投递。
拒绝消息
当拒绝某条消息时,应用可以告诉消息代理销毁该条消息或者重新将该条消息放入队列。
当此队列只有一个消费者时,有可能存在拒绝消息并将消息重新放入队列的行为而引起消息在同一个消费者身上无限循环的情况。
预取消息
在多个消费者共享一个队列时,明确指定在收到下一个确认回执前每个消费者一次可以接受多少条消息是非常有用的。这可以在试图批量发布消息的时候起到简单的负载均衡和提高消息吞吐量的作用。
消息属性
AMQP 模型中的消息(Message)对象是带有属性(Attributes)的:
属性 | 说明 |
---|---|
Content type | 内容类型 |
Content encoding | 内容编码 |
Routing key | 路由键 |
Delivery mode (persistent or not) | 投递模式(持久化 或 非持久化) |
Message priority | 消息优先权 |
Message publishing timestamp | 消息发布的时间戳 |
Expiration period | 消息有效期 |
Publisher application id | 发布应用的 ID |
有些属性是被 AMQP 代理所使用的,但是大多数是开放给接收它们的应用解释器用的。有些属性是可选的也被称作消息头(headers)。和 HTTP 协议的 X-Headers 很相似,消息属性需要在消息被发布的时候定义。
消息主体
AMQP 的消息除属性外,也含有一个有效载荷 Payload(消息实际携带的数据),它被 AMQP 代理当作不透明的字节数组来对待。
消息代理不会检查或者修改 Payload,消息可以只包含属性而不携带有效载荷,它通常会使用类似 JSON 这种序列化的格式数据。
消息持久化
消息能够以持久化的方式发布,AMQP 代理会将此消息存储在磁盘上。如果服务器重启,系统会确认收到的持久化消息未丢失。
简单地将消息发送给一个持久化的交换机或者路由给一个持久化的队列,并不会使得此消息具有持久化性质:它完全取决与消息本身的持久模式(persistence mode)。将消息以持久化方式发布时,会对性能造成一定的影响(就像数据库操作一样,健壮性的存在必定造成一些性能损失)。
参考:
标签:协议,AMQP,Exchange,队列,消息,路由,RoutingKey From: https://www.cnblogs.com/i9code/p/17998175