Spring 集成提供了通道适配器,用于使用高级消息队列协议 (AMQP) 接收和发送消息。
您需要将此依赖项包含在项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-amqp</artifactId>
<version>6.0.0</version>
</dependency>
以下适配器可用:
- 入站通道适配器
- 入站网关
- 出站通道适配器
- 出站网关
- 异步出站网关
- RabbitMQ 流队列入站通道适配器
- RabbitMQ 流队列出站通道适配器
Spring 集成还提供了点对点消息通道和由 AMQP 交换和队列支持的发布-订阅消息通道。
为了提供AMQP支持,Spring Integration依赖于(Spring AMQP),它将Spring的核心概念应用于基于AMQP的消息传递解决方案的开发。 Spring AMQP提供了与Spring JMS类似的语义。
虽然提供的AMQP通道适配器仅用于单向消息传递(发送或接收),但Spring Integration还提供了用于请求-回复操作的入站和出站AMQP网关。
提示: 您应该熟悉Spring AMQP项目的参考文档。 它提供了有关Spring与AMQP集成的更深入的信息,特别是RabbitMQ。
入站通道适配器
以下清单显示了 AMQP 入站通道适配器的可能配置选项:
@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "aName"))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
容器 请注意,在使用XML配置外部容器时,不能使用Spring AMQP命名空间来定义容器。 这是因为命名空间至少需要一个元素。 在此环境中,侦听器位于适配器内部。 因此,您必须使用常规 Spring 定义来定义容器,如以下示例所示: <bean id="container" |
尽管 Spring Integration JMS 和 AMQP 支持相似,但存在重要差异。 JMS 入站通道适配器正在使用底层,并且需要配置的轮询器。 AMQP 入站通道适配器使用 和 是消息驱动的。 在这方面,它更类似于 JMS 消息驱动的通道适配器。 |
从版本 5.5 开始,可以使用在内部调用重试操作时使用的策略进行配置。 有关更多信息,请参阅 JavaDocs。AmqpInboundChannelAdapter
org.springframework.amqp.rabbit.retry.MessageRecoverer
RecoveryCallback
setMessageRecoverer()
批处理消息
有关批处理消息的更多信息,请参阅 Spring AMQP 文档。
要使用 Spring 集成生成批处理消息,只需使用 .BatchingRabbitTemplate
接收批处理消息时,默认情况下,侦听器容器提取每个片段消息,适配器将为每个片段生成 。 从版本 5.2 开始,如果容器的属性设置为 ,则由适配器执行去批处理,并生成一个有效负载为片段有效负载列表(如果适用,转换后)。Message<?>
deBatchingEnabled
false
Message<List<?>>
默认值为 ,但可以在适配器上覆盖。BatchingStrategy
SimpleBatchingStrategy
当重试操作需要恢复时,必须与批处理一起使用。 |
轮询入站通道适配器
概述
版本 5.0.1 引入了轮询通道适配器,允许您按需获取单个消息 — 例如,使用 或 轮询器。 有关详细信息,请参阅延迟确认可轮询消息源。MessageSourcePollingTemplate
它当前不支持 XML 配置。
以下示例显示如何配置:AmqpMessageSource
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from(Amqp.inboundPolledAdapter(connectionFactory(), DSL_QUEUE),
e -> e.poller(Pollers.fixedDelay(1_000)).autoStartup(false))
.handle(p -> {
...
})
.get();
}
有关配置属性,请参阅 Javadoc。
批处理消息
请参阅批处理消息。
对于轮询适配器,没有侦听器容器,批处理消息始终是反批处理的(如果支持这样做)。BatchingStrategy
入站网关
入站网关支持入站通道适配器上的所有属性(除了“通道”被“请求通道”替换),以及一些其他属性。 以下清单显示了可用的属性:
@Bean // return the upper cased payload
public IntegrationFlow amqpInboundGateway(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(Amqp.inboundGateway(connectionFactory, "foo"))
.transform(String.class, String::toUpperCase)
.get();
}
此适配器的唯一 ID。 自选。 |
将转换后的消息发送到的消息通道。 必填。 |
对接收 AMQP 消息时要使用的 的引用。 自选。 默认情况下,只有标准的 AMQP 属性(例如 )被复制到 Spring Integration 和从 Spring Integration 复制。 默认情况下,不会将 AMQP 中的任何用户定义的标头复制到 AMQP 消息或从 AMQP 消息复制。 如果提供了“请求标头名称”或“回复标头名称”,则不允许使用。 |
要从 AMQP 请求映射到 的 AMQP 标头名称的逗号分隔列表。 仅当未提供“标头映射器”引用时,才能提供此属性。 此列表中的值也可以是与标头名称匹配的简单模式(例如 或或)。 |
要映射到 AMQP 回复消息的 AMQP 消息属性中的名称的逗号分隔列表。 所有标准标头(例如)都映射到 AMQP 消息属性,而用户定义的标头映射到“标头”属性。 仅当未提供“标头映射器”引用时,才能提供此属性。 此列表中的值也可以是要与标头名称(例如,或或)匹配的简单模式。 |
消息通道,其中应回复消息。 自选。 |
设置用于从回复通道接收消息的基础。 如果未指定,此属性默认为 (1 秒)。 仅当容器线程在发送回复之前移交给另一个线程时,才适用。 |
自定义的 Bean 引用(以便更好地控制要发送的回复消息)。 您可以提供 的替代实现。 |
当 没有属性时要使用的。 如果未指定此选项,则提供 no,请求消息中不存在任何属性,并且 抛出 AN 是因为无法路由回复。 如果未指定此选项并提供外部选项,则不会引发异常。 您必须指定此选项或配置默认值,并在该模板上, 如果您预计请求消息中不存在任何属性的情况。 |
请参阅入站通道适配器中有关配置属性的说明。listener-container
从版本 5.5 开始,可以使用在内部调用重试操作时使用的策略进行配置。 有关更多信息,请参阅 JavaDocs。AmqpInboundChannelAdapter
org.springframework.amqp.rabbit.retry.MessageRecoverer
RecoveryCallback
setMessageRecoverer()
批处理消息
请参阅批处理消息。
入站终端节点确认模式
默认情况下,入站终端节点使用应答方式,这意味着容器会在下游集成流完成(或使用 或 将消息传递给另一个线程)时自动确认消息。 将模式设置为配置使用者,以便根本不使用确认(代理在发送消息后立即自动确认消息)。 设置模式以允许用户代码在处理过程中的某个其他点确认消息。 为了支持此功能,在此模式下,终结点分别在 和 中提供 和 标头。AUTO
QueueChannel
ExecutorChannel
NONE
MANUAL
Channel
deliveryTag
amqp_channel
amqp_deliveryTag
您可以对 执行任何有效的 Rabbit 命令,但通常只使用 and(或)。 为了不干扰容器的操作,不应保留对通道的引用,而应仅在当前消息的上下文中使用它。Channel
basicAck
basicNack
basicReject
由于 是对“活动”对象的引用,因此它不能序列化,如果持久保存消息,则会丢失。 |
以下示例演示如何使用确认:MANUAL
@ServiceActivator(inputChannel = "foo", outputChannel = "bar")
public Object handle(@Payload String payload, @Header(AmqpHeaders.CHANNEL) Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws Exception {
// Do some processing
if (allOK) {
channel.basicAck(deliveryTag, false);
// perhaps do some more processing
}
else {
channel.basicNack(deliveryTag, false, true);
}
return someResultForDownStreamProcessing;
}
出站终结点
以下出站终结点具有许多类似的配置选项。 从版本 5.2 开始,已添加。 通常,当启用发布者确认时,代理将快速返回一个 ack(或 nack),该 ack(或 nack)将被发送到相应的通道。 如果在收到确认之前关闭了通道,Spring AMQP 框架将合成一个 nack。 “丢失”确认不应发生,但如果设置此属性,则终结点将定期检查它们,并在时间过去而未收到确认时合成 nack。confirm-timeout
出站通道适配器
以下示例显示了 AMQP 出站通道适配器的可用属性:
@Bean
public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate,
MessageChannel amqpOutboundChannel) {
return IntegrationFlow.from(amqpOutboundChannel)
.handle(Amqp.outboundAdapter(amqpTemplate)
.routingKey("queue1")) // default exchange - route to queue 'queue1'
.get();
}
此适配器的唯一 ID。 自选。 |
消息通道,消息应发送到该通道,以便将其转换并发布到 AMQP 交换。 必填。 |
对已配置的 AMQP 模板的 Bean 引用。 可选(默认为 )。 |
向其发送消息的 AMQP 交换的名称。 如果未提供,消息将发送到默认的无名称交换。 与“交换名称表达”相互排斥。 自选。 |
一个 SpEL 表达式,计算该表达式以确定消息发送到的 AMQP 交换的名称,并将消息作为根对象。 如果未提供,消息将发送到默认的无名称交换。 与“交易所名称”相互排斥。 自选。 |
注册多个使用者时此使用者的顺序,从而启用负载平衡和故障转移。 可选(默认为 )。 |
发送消息时使用的固定路由密钥。 默认情况下,这是一个空的 . 与“路由密钥表达式”互斥。 自选。 |
一个 SpEL 表达式,计算该表达式以确定发送消息时要使用的路由密钥,消息作为根对象(例如,“payload.key”)。 默认情况下,这是一个空的 . 与“路由密钥”互斥。 自选。 |
邮件的默认传递方式:或 。 如果设置了传递方式,则覆盖。 如果存在 Spring 集成消息标头,则设置该值。 如果未提供此属性,并且标头映射器未设置此属性,则默认值取决于 使用的基础 Spring AMQP。 如果根本不自定义,则默认值为 。 自选。 |
定义相关性数据的表达式。 如果提供,这会将基础 AMQP 模板配置为接收发布者确认。 需要专用和属性设置为 . 收到发布者确认并提供相关数据时,将根据确认类型将其写入 或 。 确认的有效负载是相关数据,由此表达式定义。 邮件的“amqp_publishConfirm”标头设置为 () 或 ()。 示例:和 。 版本 4.1 引入了消息标头。 它包含用于发布者确认的“nack”。 从版本 4.2 开始,如果表达式解析为实例(例如 ),则在 / 通道上发出的消息基于该消息,并添加其他标头。 以前,无论类型如何,都会使用相关数据作为其有效负载创建新消息。 另请参阅发布者确认和返回的替代机制。 自选。 |
正 () 发布者确认发送到的通道。 有效负载是由 定义的关联数据。 如果表达式为 或 ,则消息是从原始消息构建的,标头设置为 。 另请参阅发布者确认和返回的替代机制。 可选(默认值为 )。 |
将负 () 发布者确认发送到的通道。 有效负载是由 定义的关联数据(如果未配置)。 如果表达式为 或 ,则消息是从原始消息构建的,标头设置为 。 当存在 时,消息是带有有效负载的消息。 另请参阅发布者确认和返回的替代机制。 可选(默认值为 )。 |
设置后,如果在此时间内未收到发布者确认(以毫秒为单位),适配器将合成否定确认 (nack)。 每 50% 检查一次挂起的确认,因此发送 nack 的实际时间将在此值的 1 倍到 1.5 倍之间。 另请参阅发布者确认和返回的替代机制。 默认 none (不会生成 nacks)。 |
设置为 true 时,调用线程将阻塞,等待发布者确认。 这需要配置确认以及 . 线程将阻塞长达 (或默认为 5 秒)。 如果发生超时,将抛出 。 如果启用了返回并返回了消息,或者在等待确认时发生任何其他异常,则将抛出 a 并显示相应的消息。 |
返回的消息发送到的通道。 提供后,基础 AMQP 模板配置为向适配器返回无法传递的消息。 如果未进行配置,则从从 AMQP 接收的数据构造消息,并具有以下附加标头:、、、。 当存在 时,消息是带有有效负载的消息。 另请参阅发布者确认和返回的替代机制。 自选。 |
对用于在发送返回或否定确认的消息时生成实例的实现的引用。 |
对发送 AMQP 消息时要使用的 的引用。 默认情况下,只有标准的 AMQP 属性(例如 )被复制到 Spring 集成 中。 任何用户定义的标头都不会通过默认的“DefaultAmqpHeaderMapper”复制到消息中。 如果提供了“请求标头名称”,则不允许。 自选。 |
要从 映射到 AMQP 消息的 AMQP 标头名称的逗号分隔列表。 如果提供了“标头映射器”引用,则不允许。 此列表中的值也可以是与标头名称匹配的简单模式(例如 或或)。 |
设置为 时,端点将在应用程序上下文初始化期间尝试连接到代理。 这允许对错误配置进行“快速故障”检测,但如果代理关闭,也会导致初始化失败。 当(默认值)时,当发送第一条消息时,将建立连接(如果它尚不存在,因为其他组件建立了它)。 |
设置为 时,类型的有效负载将作为离散消息在单个调用范围内在同一通道上发送。 需要 . when 为 true,在发送消息后调用。 使用事务模板,发送将在新事务或已启动事务(如果存在)中执行。 |
返回通道 使用 a 需要将属性设置为 的 和将属性设置为 的 。 将多个出站终结点与返回符一起使用时,每个终结点都需要一个单独的终结点。 |
出站网关
以下清单显示了 AMQP 出站网关的可能属性:
@Bean
public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate) {
return f -> f.handle(Amqp.outboundGateway(amqpTemplate)
.routingKey("foo")) // default exchange - route to queue 'foo'
.get();
}
@MessagingGateway(defaultRequestChannel = "amqpOutbound.input")
public interface MyGateway {
String sendToRabbit(String data);
}
此适配器的唯一 ID。 自选。 |
消息发送到的消息通道,消息被转换并发布到 AMQP 交换。 必填。 |
对已配置的 AMQP 模板的 Bean 引用。 可选(默认为 )。 |
应向其发送消息的 AMQP 交换的名称。 如果未提供,消息将发送到默认的无名称 cxchange。 与“交换名称表达”相互排斥。 自选。 |
一个 SpEL 表达式,计算该表达式以确定应将消息发送到的 AMQP 交换的名称,并将消息作为根对象。 如果未提供,消息将发送到默认的无名称交换。 与“交易所名称”相互排斥。 自选。 |
注册多个使用者时此使用者的顺序,从而启用负载平衡和故障转移。 可选(默认为 )。 |
从 AMQP 队列接收并转换回复后应发送到的消息通道。 自选。 |
网关在向 发送回复消息时等待的时间。 这仅适用于 can 阻止的情况,例如容量限制当前已满的 。 默认为无穷大。 |
当 时,如果属性中未收到回复消息,网关将引发异常。 默认值为 。 |
发送消息时使用的。 默认情况下,这是一个空的 . 与“路由密钥表达式”互斥。 自选。 |
一个 SpEL 表达式,经过计算以确定发送消息时使用的表达式,将消息作为根对象(例如,“payload.key”)。 默认情况下,这是一个空的 . 与“路由密钥”互斥。 自选。 |
邮件的默认传递方式:或 。 如果设置了传递方式,则覆盖。 如果存在 Spring 集成消息标头,则设置该值。 如果未提供此属性,并且标头映射器未设置此属性,则默认值取决于 使用的基础 Spring AMQP。 如果根本不自定义,则默认值为 。 自选。 |
从版本 4.2 开始。 定义相关数据的表达式。 如果提供,这会将基础 AMQP 模板配置为接收发布者确认。 需要专用和属性设置为 . 收到发布者确认并提供关联数据时,将根据确认类型将其写入 或 。 确认的有效负载是相关数据,由此表达式定义。 邮件的标头“amqp_publishConfirm”设置为 () 或 ()。 为了确认,Spring 集成提供了一个额外的标头。 示例:和 。 如果表达式解析为实例(例如 ),则消息 在 / 通道上发出的基于该消息,并添加了其他标头。 以前,无论类型如何,都会使用相关数据作为其有效负载创建新消息。 另请参阅发布者确认和返回的代机制。 自选。 |
向其发送正 () 发布者确认的通道。 有效负载是由 定义的关联数据。 如果表达式为 或 ,则消息是从原始消息构建的,标头设置为 。 另请参阅发布者确认和返回的替代机制。 可选(默认值为 )。 |
将负 () 发布者确认发送到的通道。 有效负载是由定义的关联数据(如果未配置)。 如果表达式为 或 ,则消息是从原始消息构建的,标头设置为 。 当存在 时,消息是带有有效负载的消息。 另请参阅发布者确认和返回的替代机制。 可选(默认值为 )。 |
设置后,如果在此时间内未收到发布者确认(以毫秒为单位),网关将合成否定确认 (nack)。 每 50% 检查一次挂起的确认,因此发送 nack 的实际时间将在此值的 1 倍到 1.5 倍之间。 默认 none (不会生成 nacks)。 |
返回的消息发送到的通道。 提供后,基础 AMQP 模板配置为向适配器返回无法传递的消息。 如果未进行配置,则根据从 AMQP 接收的数据构造消息,并具有以下附加标头:、、 和 。 当存在 时,消息是带有有效负载的消息。 另请参阅发布者确认和返回的替代机制。 自选。 |
对用于在发送返回或否定确认的消息时生成实例的实现的引用。 |
设置为 时,端点将在应用程序上下文初始化期间尝试连接到代理。 这允许在代理关闭时通过记录错误消息来“快速失败”检测错误配置。 当(默认值)时,当发送第一条消息时,将建立连接(如果它尚不存在,因为其他组件建立了它)。 |
返回通道 使用 a 需要将属性设置为 的 和将属性设置为 的 。 将多个出站终结点与返回符一起使用时,每个终结点都需要一个单独的终结点。 |
基础的默认值为 5 秒。 如果需要更长的超时,则必须在 上配置它。 |
请注意,出站适配器和出站网关配置之间的唯一区别是属性的设置。expectReply
异步出站网关
上一节中讨论的网关是同步的,因为发送线程挂起,直到 收到回复(或发生超时)。 Spring Integration 版本 4.3 添加了一个异步网关,该网关使用 from Spring AMQP。 发送消息时,线程会在发送操作完成后立即返回,收到消息后,将在模板的侦听器容器线程上发送回复。 在轮询器线程上调用网关时,这可能很有用。 线程已释放,可用于框架中的其他任务。AsyncRabbitTemplate
以下清单显示了 AMQP 异步出站网关的可能配置选项:
@Configuration
public class AmqpAsyncApplication {
@Bean
public IntegrationFlow asyncAmqpOutbound(AsyncRabbitTemplate asyncRabbitTemplate) {
return f -> f
.handle(Amqp.asyncOutboundGateway(asyncRabbitTemplate)
.routingKey("queue1")); // default exchange - route to queue 'queue1'
}
@MessagingGateway(defaultRequestChannel = "asyncAmqpOutbound.input")
public interface MyGateway {
String sendToRabbit(String data);
}
}
此适配器的唯一 ID。 自选。 |
消息通道,消息应发送到该通道,以便将其转换并发布到 AMQP 交换。 必填。 |
对已配置的 Bean 引用。 可选(默认为 )。 |
应向其发送消息的 AMQP 交换的名称。 如果未提供,消息将发送到默认的无名称交换。 与“交换名称表达”相互排斥。 自选。 |
一个 SpEL 表达式,计算该表达式以确定消息发送到的 AMQP 交换的名称,并将消息作为根对象。 如果未提供,消息将发送到默认的无名称交换。 与“交易所名称”相互排斥。 自选。 |
注册多个使用者时此使用者的顺序,从而启用负载平衡和故障转移。 可选(默认为 )。 |
从 AMQP 队列接收并转换回复后应发送到的消息通道。 自选。 |
网关在向 发送回复消息时等待的时间。 这仅适用于 can 阻止的情况,例如容量限制当前已满的 。 默认值为无穷大。 |
如果在属性中未收到回复消息,并且此设置为 ,网关将向入站消息的标头发送错误消息。 如果在属性中未收到回复消息,并且此设置为 ,则网关会将错误消息发送到默认值(如果可用)。 默认为 . |
发送消息时要使用的路由密钥。 默认情况下,这是一个空的 . 与“路由密钥表达式”互斥。 自选。 |
一个 SpEL 表达式,经过计算以确定发送消息时要使用的路由密钥, 将消息作为根对象(例如,“有效负载.key”)。 默认情况下,这是一个空的 . 与“路由密钥”互斥。 自选。 |
邮件的默认传递方式:或 。 如果设置了传递方式,则覆盖。 如果存在 Spring 集成消息标头 (),则设置该值。 如果未提供此属性,并且标头映射器未设置此属性,则默认值取决于 使用的基础 Spring AMQP。 如果未自定义,则缺省值为 。 自选。 |
定义相关性数据的表达式。 如果提供,这会将基础 AMQP 模板配置为接收发布者确认。 需要专用 和 ,其属性设置为 。 收到发布者确认并提供相关数据时,确认将写入 或 ,具体取决于确认类型。 确认的有效负载是此表达式定义的相关数据,消息的“amqp_publishConfirm”标头设置为 () 或 ()。 例如,提供了一个额外的标头 ()。 例子:。 如果表达式解析为实例(例如“#this”),则在 / 通道上发出的消息将基于该消息,并添加其他标头。 另请参阅发布者确认和返回的替代机制。 自选。 |
向其发送正 () 发布者确认的通道。 有效负载是由 定义的关联数据。 要求基础数据库将其属性设置为 。 另请参阅发布者确认和返回的替代机制。 可选(默认值为 )。 |
从版本 4.2 开始。 将负 () 发布者确认发送到的通道。 有效负载是由 定义的关联数据。 要求基础数据库将其属性设置为 。 另请参阅发布者确认和返回的替代机制。 可选(默认值为 )。 |
设置后,如果在此时间内未收到发布者确认(以毫秒为单位),网关将合成否定确认 (nack)。 每 50% 检查一次挂起的确认,因此发送 nack 的实际时间将在此值的 1 倍到 1.5 倍之间。 另请参阅发布者确认和返回的替代机制。 默认 none (不会生成 nacks)。 |
返回的消息发送到的通道。 提供后,基础 AMQP 模板配置为将无法传递的消息返回到网关。 该消息是根据从 AMQP 接收的数据构造的,具有以下附加标头:、、 和 。 要求基础数据库将其属性设置为 。 另请参阅发布者确认和返回的替代机制。 自选。 |
设置为 时,端点在应用程序上下文初始化期间尝试连接到代理。 这样做允许“快速失败”检测错误配置,方法是在代理关闭时记录错误消息。 当(默认值)建立连接时(如果由于建立了其他组件而不存在 it) 发送第一条消息时。 |
另请参阅异步服务激活器以获取详细信息。
兔子模板 当您使用确认和退货时,我们建议将有线成专用的。 否则,可能会遇到意想不到的副作用。 |
发布商确认和返回的替代机制
将连接工厂配置为发布者确认并返回时,上述部分将讨论消息通道的配置,以便异步接收确认和返回。 从版本 5.4 开始,还有一个通常更易于使用的附加机制。
在这种情况下,请勿配置 或 确认和返回通道。 相反,在标头中添加一个实例;然后,您可以通过检查已发送消息的实例中的未来状态来等待稍后的结果。 在将来完成之前,将始终填充该字段(如果返回消息)。confirm-correlation-expression
CorrelationData
AmqpHeaders.PUBLISH_CONFIRM_CORRELATION
CorrelationData
returnedMessage
CorrelationData corr = new CorrelationData("someId"); // <--- Unique "id" is required for returns
someFlow.getInputChannel().send(MessageBuilder.withPayload("test")
.setHeader("rk", "someKeyThatWontRoute")
.setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr)
.build());
...
try {
Confirm Confirm = corr.getFuture().get(10, TimeUnit.SECONDS);
Message returned = corr.getReturnedMessage();
if (returned !- null) {
// message could not be routed
}
}
catch { ... }
为了提高性能,您可能希望发送多条消息并稍后等待确认,而不是一次发送一条。 返回的消息是转换后的原始消息;您可以使用所需的任何其他数据对 A 进行子类。CorrelationData
入站消息转换
到达通道适配器或网关的入站消息将使用消息转换器转换为有效负载。 默认情况下,使用 a,用于处理 java 序列化和文本。 默认情况下,标头使用 映射。 如果发生转换错误,并且未定义错误通道,则会将异常引发到容器,并由侦听器容器的错误处理程序处理。 默认错误处理程序将转换错误视为致命错误,消息将被拒绝(如果队列已如此配置,则路由到死信交换)。 如果定义了错误通道,则有效负载为 具有属性(无法转换的 Spring AMQP 消息)和 . 如果容器是(默认值),并且错误流使用错误而不引发异常,则将确认原始消息。 如果错误流引发异常,则异常类型与容器的错误处理程序一起确定消息是否重新排队。 如果容器配置了 ,则有效负载是具有附加属性和 的 。 这使错误流能够调用 或(或)消息,以控制其处置。spring-messaging
Message<?>
SimpleMessageConverter
DefaultHeaderMapper.inboundMapper()
ErrorMessage
ListenerExecutionFailedException
failedMessage
cause
AcknowledgeMode
AUTO
AcknowledgeMode.MANUAL
ManualAckListenerExecutionFailedException
channel
deliveryTag
basicAck
basicNack
basicReject
出站消息转换
Spring AMQP 1.4 引入了 ,其中实际转换器的选择基于 在传入内容类型消息属性上。 这可由入站终端节点使用。ContentTypeDelegatingMessageConverter
从 Spring 集成版本 4.3 开始,您也可以在出站端点上使用 ,标头指定使用哪个转换器。ContentTypeDelegatingMessageConverter
contentType
以下示例配置了一个 ,默认转换器为 (处理 Java 序列化和纯文本),以及一个 JSON 转换器:ContentTypeDelegatingMessageConverter
SimpleMessageConverter
<amqp:outbound-channel-adapter id="withContentTypeConverter" channel="ctRequestChannel"
exchange-name="someExchange"
routing-key="someKey"
amqp-template="amqpTemplateContentTypeConverter" />
<int:channel id="ctRequestChannel"/>
<rabbit:template id="amqpTemplateContentTypeConverter"
connection-factory="connectionFactory" message-converter="ctConverter" />
<bean id="ctConverter"
class="o.s.amqp.support.converter.ContentTypeDelegatingMessageConverter">
<property name="delegates">
<map>
<entry key="application/json">
<bean class="o.s.amqp.support.converter.Jackson2JsonMessageConverter" />
</entry>
</map>
</property>
</bean>
将标头设置为 to 的消息发送到会导致选择 JSON 转换器。ctRequestChannel
contentType
application/json
这适用于出站通道适配器和网关。
从版本 5.0 开始,添加到出站邮件的标头永远不会被映射标头覆盖(默认情况下)。 以前,只有当消息转换器是 (在这种情况下,首先映射标头以便可以选择正确的转换器)时才会出现这种情况。 对于其他转换器,例如 ,映射标头将覆盖转换器添加的任何标头。 当出站邮件具有一些剩余的标头(可能来自入站通道适配器)并且正确的出站被错误地覆盖时,这会导致问题。 解决方法是在将消息发送到出站终结点之前使用标头筛选器删除标头。 但是,在某些情况下,需要以前的行为 - 例如,当包含 JSON 的有效负载不知道内容并将消息属性设置为,但应用程序希望通过设置发送到出站终结点的消息标头来覆盖该行为。 正是这样做的(默认情况下)。 现在,在出站通道适配器和网关(以及 AMQP 支持的通道)上调用了一个属性。 设置此选项可还原覆盖转换器添加的属性的行为。 从版本 5.1.9 开始,当我们生成回复并希望覆盖转换器填充的标头时,提供了类似的情况。 有关更多信息,请参阅其 JavaDocs。 |
出站用户标识
Spring AMQP 版本 1.6 引入了一种机制,允许为出站消息指定默认用户 ID。 始终可以设置标头,该标头现在优先于默认值。 这可能对邮件收件人有用。 对于入站邮件,如果邮件发布者设置了该属性,则该属性将在标头中可用。 请注意,RabbitMQ 会验证用户 ID 是连接的实际用户 ID,还是连接允许模拟。AmqpHeaders.USER_ID
AmqpHeaders.RECEIVED_USER_ID
要为出站消息配置缺省用户标识,请在 上配置该标识,并将出站适配器或网关配置为使用该模板。 同样,要在回复上设置用户 ID 属性,请将适当配置的模板注入入站网关。 有关更多信息,请参阅 Spring AMQP 文档。RabbitTemplate
延迟消息交换
Spring AMQP 支持 RabbitMQ 延迟消息交换插件。 对于入站邮件,标头映射到标头。 设置标头会导致在出站邮件中设置相应的标头。 还可以在出站终结点上指定 and 属性(使用 XML 配置时)。 这些属性优先于标头。x-delay
AmqpHeaders.RECEIVED_DELAY
AMQPHeaders.DELAY
x-delay
delay
delayExpression
delay-expression
AmqpHeaders.DELAY
AMQP 支持的消息通道
有两种消息通道实现可用。 一个是点对点,另一个是发布-订阅。 这两个通道都为基础和(如本章前面的通道适配器和网关所示)提供了广泛的配置属性。 但是,我们在此处显示的示例具有最小配置。 浏览 XML 架构以查看可用属性。AmqpTemplate
SimpleMessageListenerContainer
点对点通道可能类似于以下示例:
<int-amqp:channel id="p2pChannel"/>
在幕后,前面的示例导致声明一个命名,并且此通道发送到该通道(从技术上讲,通过使用与此名称匹配的路由密钥发送到无名称的直接交换)。 此通道还在此 上注册消费者。 如果希望通道是“可轮询的”而不是消息驱动的,请为标志提供值 ,如以下示例所示:Queue
si.p2pChannel
Queue
Queue
Queue
message-driven
false
<int-amqp:channel id="p2pPollableChannel" message-driven="false"/>
发布-订阅通道可能如下所示:
<int-amqp:publish-subscribe-channel id="pubSubChannel"/>
在后台,前面的示例会导致声明名为的扇出交换,并且此通道发送到该扇出交换。 此通道还声明一个以服务器命名的独占、自动删除、非持久,并将其绑定到扇出交换,同时注册使用者以接收消息。 发布-订阅-通道没有“可轮询”选项。 它必须是消息驱动的。si.fanout.pubSubChannel
Queue
Queue
从版本 4.1 开始,AMQP 支持的消息通道(与 一起)支持单独配置 和 对于 . 请注意,以前是默认的。 现在,默认情况下,它适用于 .channel-transacted
template-channel-transacted
transactional
AbstractMessageListenerContainer
RabbitTemplate
channel-transacted
true
false
AbstractMessageListenerContainer
在版本 4.3 之前,AMQP 支持的通道仅支持具有有效负载和标头的消息。 整个消息被转换(序列化)并发送到 RabbitMQ。 现在,您可以将属性(或使用 Java 配置时)设置为 。 当此标志为 时,将转换消息有效负载并映射标头,其方式类似于使用通道适配器时的方式。 这种安排允许 AMQP 支持的通道与不可序列化的有效负载一起使用(可能与其他消息转换器一起使用,例如 )。 有关默认映射标头的更多信息,请参阅 AMQP 消息标头。 您可以通过提供使用 和 属性的自定义映射器来修改映射。 现在,您还可以指定 ,用于在没有标头时设置传递模式。 默认情况下,Spring AMQP 使用交付模式。Serializable
extract-payload
setExtractPayload()
true
true
Jackson2JsonMessageConverter
outbound-header-mapper
inbound-header-mapper
default-delivery-mode
amqp_deliveryMode
MessageProperties
PERSISTENT
与其他支持持久性的通道一样,支持 AMQP 的通道旨在提供消息持久性以避免消息丢失。 它们不用于将工作分发给其他对等应用程序。 为此,请改用通道适配器。 |
从版本 5.0 开始,可轮询通道现在会阻止指定的轮询器线程(默认值为 1 秒)。 以前,与其他实现不同,如果没有可用的消息,线程会立即返回到调度程序,而不管接收超时如何。 阻止比使用 a 检索消息(没有超时)要昂贵一些,因为必须创建一个使用者来接收每条消息。 若要恢复以前的行为,请将轮询器的 0 设置为 0。 |
使用 Java 配置进行配置
以下示例显示如何使用 Java 配置配置通道:
@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("foo");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("bar");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("baz");
factoryBean.setPubSub(false);
return factoryBean;
}
使用 Java DSL 进行配置
以下示例显示如何使用 Java DSL 配置通道:
@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.pollableChannel(connectionFactory)
.queueName("foo"))
...
.get();
}
@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.channel(connectionFactory)
.queueName("bar"))
...
.get();
}
@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.publishSubscribeChannel(connectionFactory)
.queueName("baz"))
...
.get();
}
AMQP 消息标头
概述
Spring 集成 AMQP 适配器会自动映射所有 AMQP 属性和标头。 (这是对 4.3 的更改 - 以前,仅映射标准标头)。 默认情况下,这些属性通过使用 DefaultAmqpHeaderMapper 复制到 Spring Integration 中。MessageHeaders
您可以传入自己的特定于 AMQP 的标头映射程序的实现,因为适配器具有支持这样做的属性。
AMQP 消息属性中的任何用户定义的标头都将复制到 AMQP 消息或从 AMQP 消息复制,除非 的 或 属性显式否定。 默认情况下,对于出站映射器,不映射任何标头。 请参阅本节后面出现的警告,了解原因。requestHeaderNames
replyHeaderNames
DefaultAmqpHeaderMapper
x-*
要覆盖默认值并恢复到 4.3 之前的行为,请在属性中使用 和。STANDARD_REQUEST_HEADERS
STANDARD_REPLY_HEADERS
映射用户定义的标头时,这些值还可以包含要匹配的简单通配符模式(例如 或)。 匹配所有标头。 |
从版本 4.1 开始,(超类)允许为 和 属性(除了现有的 和 )配置令牌,以映射所有用户定义的标头。AbstractHeaderMapper
DefaultAmqpHeaderMapper
NON_STANDARD_HEADERS
requestHeaderNames
replyHeaderNames
STANDARD_REQUEST_HEADERS
STANDARD_REPLY_HEADERS
该类标识由 :org.springframework.amqp.support.AmqpHeaders
DefaultAmqpHeaderMapper
-
amqp_appId
-
amqp_clusterId
-
amqp_contentEncoding
-
amqp_contentLength
-
content-type
(请参阅内容类型标头) -
amqp_correlationId
-
amqp_delay
-
amqp_deliveryMode
-
amqp_deliveryTag
-
amqp_expiration
-
amqp_messageCount
-
amqp_messageId
-
amqp_receivedDelay
-
amqp_receivedDeliveryMode
-
amqp_receivedExchange
-
amqp_receivedRoutingKey
-
amqp_redelivered
-
amqp_replyTo
-
amqp_timestamp
-
amqp_type
-
amqp_userId
-
amqp_publishConfirm
-
amqp_publishConfirmNackCause
-
amqp_returnReplyCode
-
amqp_returnReplyText
-
amqp_returnExchange
-
amqp_returnRoutingKey
-
amqp_channel
-
amqp_consumerTag
-
amqp_consumerQueue
如本节前面所述,使用 标头映射模式 是复制所有标头的常用方法。 但是,这可能会产生一些意想不到的副作用,因为某些 RabbitMQ 专有属性/标头也会被复制。 例如,使用联合身份验证时,收到的消息可能具有一个名为 的属性,该属性包含发送消息的节点。 如果对入站网关上的请求和回复标头映射使用通配符,则会复制此标头,这可能会导致联合身份验证出现一些问题。 此回复消息可能会联合回发送代理,发送代理可能会认为消息正在循环,因此以静默方式丢弃它。 如果您希望使用通配符标头映射的便利性,则可能需要筛选出下游流中的一些标头。 例如,为了避免将标头复制回回复,您可以在将回复发送到 AMQP 入站网关之前使用。 或者,可以显式列出实际要映射的属性,而不是使用通配符。 出于这些原因,对于入站消息,映射器(默认情况下)不映射任何标头。 它也不会将 映射到标头,以避免该标头从入站消息传播到出站消息。 相反,此标头映射到 ,不会映射到输出。 |
从版本 4.3 开始,标头映射中的模式可以通过在模式前面加上 来否定。 否定模式获得优先级,因此诸如 不映射(也不是 )之类的列表。 标准标头加上 和 被映射。 否定技术可能很有用,例如,当 JSON 反序列化逻辑以不同的方式在接收方下游完成时,不映射传入消息的 JSON 类型标头。 为此,应为入站通道适配器/网关的标头映射器配置模式。!
STANDARD_REQUEST_HEADERS,thing1,ba*,!thing2,!thing3,qux,!thing1
thing1
thing2
thing3
bad
qux
!json_*
如果您有一个用户定义的标头,该标头以您希望映射的标头开头,则需要对其进行转义,如下所示:。 现在已映射名为标题的标头。 |
从版本 5.1 开始,如果出站消息中不存在相应的 or 标头,则将分别回退到映射和 to。 入站属性将像以前一样映射到标头。 当消息使用者使用有状态重试时填充属性很有用。 |
页眉contentType
与其他标头不同,不以 ;这允许跨不同技术透明地传递 contentType 标头。 例如,发送到 RabbitMQ 队列的入站 HTTP 消息。AmqpHeaders.CONTENT_TYPE
amqp_
标头映射到Spring AMQP的属性,随后映射到RabbitMQ的属性。contentType
MessageProperties.contentType
content_type
在版本 5.1 之前,此标头也映射为映射中的条目;这是不正确的,此外,该值可能是错误的,因为基础 Spring AMQP 消息转换器可能已更改内容类型。 这样的更改将反映在 first-class 属性中,但不反映在 RabbitMQ 标头映射中。 入站映射忽略标头映射值。 不再映射到标头映射中的条目。MessageProperties.headers
content_type
contentType
严格的消息排序
本节介绍入站和出站消息的消息排序。
入境
如果需要对入站消息进行严格排序,则必须将入站侦听器容器的属性配置为 。 这是因为,如果消息失败并重新传递,它将在现有的预取消息之后到达。 从 Spring AMQP 2.0 版本开始,默认为提高性能。 严格的订购要求是以性能下降为代价的。prefetchCount
1
prefetchCount
250
出境
请考虑以下集成流程:
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.split(s -> s.delimiters(","))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
假设我们发送消息 ,并发送到网关。 虽然消息 、 很可能是按顺序发送的,但不能保证。 这是因为模板为每个发送操作从缓存中“借用”一个通道,并且不能保证对每条消息使用相同的通道。 一种解决方案是在拆分器之前启动事务,但是在 RabbitMQ 中事务成本高昂,并且会使性能降低数百倍。A
B
C
A
B
C
为了以更有效的方式解决这个问题,从版本 5.1 开始,Spring 集成提供了 这是一个 . 请参阅处理消息建议。 在拆分器之前应用时,它可确保在同一通道上执行所有下游操作,并且可以选择等到收到所有已发送消息的发布者确认(如果连接工厂配置为确认)。 以下示例演示如何使用:BoundRabbitChannelAdvice
HandleMessageAdvice
BoundRabbitChannelAdvice
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.split(s -> s.delimiters(",")
.advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
请注意,在建议和出站适配器中使用了相同的(实现)。 该建议在模板的方法中运行下游流,以便所有操作都在同一通道上运行。 如果提供了可选的超时,则当流完成时,建议将调用该方法,如果在指定时间内未收到确认,则会引发异常。RabbitTemplate
RabbitOperations
invoke
waitForConfirmsOrDie
下游流(、 和其他流)中不得有线程切换。 |
AMQP 样品
要试验 AMQP 适配器,请查看 Spring 集成示例 git 存储库中提供的示例,网址为 https://github.com/SpringSource/spring-integration-samples
目前,一个示例通过使用出站通道适配器和入站通道适配器演示了 Spring 集成 AMQP 适配器的基本功能。 由于示例中的 AMQP 代理实现使用 RabbitMQ。
为了运行该示例,您需要一个正在运行的 RabbitMQ 实例。 仅具有基本默认值的本地安装就足够了。 有关详细的 RabbitMQ 安装过程,请参阅 https://www.rabbitmq.com/install.html |
启动示例应用程序后,在命令提示符下输入一些文本,包含该输入文本的消息将调度到 AMQP 队列。 作为回报,Spring Integration 检索该消息并将其打印到控制台。
下图说明了此示例中使用的一组基本 Spring 集成组件。
RabbitMQ 流队列支持
AMQP 示例图像的弹簧集成图::images/spring-integration-amqp-sample-graph.png[]
版本 6.0 引入了对 RabbitMQ 流队列的支持。
这些终结点的 DSL 工厂类是 。Rabbit
RabbitMQ 流入站通道适配器
@Bean
IntegrationFlow flow(Environment env) {
@Bean
IntegrationFlow simpleStream(Environment env) {
return IntegrationFlow.from(RabbitStream.inboundAdapter(env)
.configureContainer(container -> container.queueName("my.stream")))
// ...
.get();
}
@Bean
IntegrationFlow superStream(Environment env) {
return IntegrationFlow.from(RabbitStream.inboundAdapter(env)
.configureContainer(container -> container.superStream("my.stream", "my.consumer")))
// ...
.get();
}
}
RabbitMQ 流出站通道适配器
@Bean标签:标头,amqp,适配器,支持,消息,RabbitMQ,通道,AMQP From: https://blog.51cto.com/u_15326439/5920833
IntegrationFlow outbound(RabbitStreamTemplate template) {
return f -> f
// ...
.handle(RabbitStream.outboundStreamAdapter(template));
}