首页 > 其他分享 >Spring AMQP项目(四)

Spring AMQP项目(四)

时间:2022-12-19 12:01:44浏览次数:67  
标签:return AMQP 项目 Spring 示例 消息 new public String

Spring AMQP项目(四)_客户端

4.1.12. 代理事件侦听器

启用事件交换插件后,如果将类型的 Bean 添加到应用程序上下文中,它将选定的代理事件发布为实例,这些实例可以使用普通的 Spring 或方法使用。 事件由代理发布到主题交换,每个事件类型使用不同的路由密钥。 侦听器使用事件键,这些键用于将 绑定到交换,以便侦听器仅接收选定的事件。 由于它是主题交换,因此可以使用通配符(以及显式请求特定事件),如以下示例所示:​​BrokerEventListener​​​​BrokerEvent​​​​ApplicationListener​​​​@EventListener​​​​amq.rabbitmq.event​​​​AnonymousQueue​

@Bean
public BrokerEventListener eventListener() {
return new BrokerEventListener(connectionFactory(), "user.deleted", "channel.#", "queue.#");
}

您可以使用常规的 Spring 技术进一步缩小单个事件侦听器中接收的事件的范围,如以下示例所示:

@EventListener(condition = "event.eventType == 'queue.created'")
public void listener(BrokerEvent event) {
...
}

4.1.13. 延迟消息交换

版本 1.6 引入了对延迟消息交换插件的支持

该插件目前被标记为实验性,但已经可用一年多(在撰写本文时)。 如果有必要对插件进行更改,我们计划尽快添加对此类更改的支持。 出于这个原因,Spring AMQP中的这种支持也应该被认为是实验性的。 此功能已通过 RabbitMQ 3.6.0 和插件版本 0.0.1 进行了测试。

要使用 将交换声明为延迟,可以将交换 Bean 上的属性设置为 。 使用 交换类型 (、 等) 来设置参数和 声明与类型 的交换。​​RabbitAdmin​​​​delayed​​​​true​​​​RabbitAdmin​​​​Direct​​​​Fanout​​​​x-delayed-type​​​​x-delayed-message​

该属性(缺省值:)在使用 XML 配置交换 Bean 时也可用。 以下示例演示如何使用它:​​delayed​​​​false​

<rabbit:topic-exchange name="topic" delayed="true" />

要发送延迟消息,可以通过 设置标头,如以下示例所示:​​x-delay​​​​MessageProperties​

MessageProperties properties = new MessageProperties();
properties.setDelay(15000);
template.send(exchange, routingKey,
MessageBuilder.withBody("foo".getBytes()).andProperties(properties).build());
rabbitTemplate.convertAndSend(exchange, routingKey, "foo", new MessagePostProcessor() {

@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(15000);
return message;
}

});

要检查消息是否延迟,请使用 上的方法。 它是一个单独的属性,用于避免意外传播到从输入消息生成的输出消息。​​getReceivedDelay()​​​​MessageProperties​

4.1.14. 兔子MQ REST API

启用管理插件后,RabbitMQ 服务器会公开一个 REST API 来监控和配置代理。 现在提供了 API 的 Java 绑定。 这是一个标准的、即时的,因此是阻塞的 API。 它基于 Spring Web 模块及其实现。 另一方面,这是一个基于Reactor Netty项目的响应式、非阻塞实现。​​com.rabbitmq.http.client.Client​​​​RestTemplate​​​​com.rabbitmq.http.client.ReactorNettyClient​

跃点依赖关系 () 现在也是 。​​com.rabbitmq:http-client​​​​optional​

有关更多信息,请参阅他们的 Javadoc。

4.1.15. 异常处理

使用 RabbitMQ Java 客户端的许多操作可能会引发检查异常。 例如,在很多情况下可能会引发实例。 、 和其他 Spring AMQP 组件捕获这些异常,并将它们转换为层次结构中的异常之一。 这些在“org.springframework.amqp”包中定义,并且是层次结构的基础。​​IOException​​​​RabbitTemplate​​​​SimpleMessageListenerContainer​​​​AmqpException​​​​AmqpException​

当侦听器引发异常时,它被包装在 . 通常,消息被代理拒绝并重新排队。 设置为 会导致丢弃消息(或路由到死信交换)。 如消息侦听器和异步案例中所述,侦听器可以抛出 (或) 来有条件地控制此行为。​​ListenerExecutionFailedException​​​​defaultRequeueRejected​​​​false​​​​AmqpRejectAndDontRequeueException​​​​ImmediateRequeueAmqpException​

但是,存在一类错误,其中侦听器无法控制行为。 当遇到无法转换的消息(例如,无效的标头)时,在消息到达用户代码之前会引发一些异常。 设置为 (默认) (或抛出 ),此类消息将一遍又一遍地重新传递。 在版本 1.3.2 之前,用户需要编写一个 自定义 ,如异常处理中所述,以避免这种情况。​​content_encoding​​​​defaultRequeueRejected​​​​true​​​​ImmediateRequeueAmqpException​​​​ErrorHandler​

从版本 1.3.2 开始,默认值现在为 拒绝(并且不重新排队)失败并出现不可恢复错误的消息。 具体而言,它会拒绝失败并出现以下错误的邮件:​​ErrorHandler​​​​ConditionalRejectingErrorHandler​

  • ​o.s.amqp…MessageConversionException​​:在使用 转换传入消息负载时可能会引发。MessageConverter
  • ​o.s.messaging…MessageConversionException​​:如果在映射到方法时需要额外的转换,则转换服务可以引发。@RabbitListener
  • ​o.s.messaging…MethodArgumentNotValidException​​:如果在侦听器中使用验证(例如 )并且验证失败,则可能引发。@Valid
  • ​o.s.messaging…MethodArgumentTypeMismatchException​​:如果入站消息转换为目标方法不正确的类型,则可能会引发。 例如,参数声明为但接收。Message<Foo>Message<Bar>
  • ​java.lang.NoSuchMethodException​​:版本 1.6.3 中新增。
  • ​java.lang.ClassCastException​​:版本 1.6.3 中新增。

您可以使用 配置此错误处理程序的实例,以便用户可以提供自己的条件消息拒绝规则,例如,从 Spring 重试(消息侦听器和异步案例)的委托实现。 此外,now 还有一个可以在决策中使用的属性。 如果该方法返回 ,则错误处理程序将抛出 . 当异常被确定为致命时,默认值会记录一条警告消息。​​FatalExceptionStrategy​​​​BinaryExceptionClassifier​​​​ListenerExecutionFailedException​​​​failedMessage​​​​FatalExceptionStrategy.isFatal()​​​​true​​​​AmqpRejectAndDontRequeueException​​​​FatalExceptionStrategy​

从版本 1.6.3 开始,将用户异常添加到致命列表的一种便捷方法是子类并重写该方法以返回致命异常。​​ConditionalRejectingErrorHandler.DefaultExceptionStrategy​​​​isUserCauseFatal(Throwable cause)​​​​true​

处理 DLQ 消息的常见模式是对这些消息以及其他 DLQ 配置进行设置 ,以便这些消息过期并路由回主队列重试。 此技术的问题在于导致致命异常的消息将永远循环。 从版本 2.1 开始,它会检测消息上的标头,该标头会导致引发致命异常。 将记录并丢弃该消息。 可以通过将 上的属性设置为 来恢复到以前的行为。​​time-to-live​​​​ConditionalRejectingErrorHandler​​​​x-death​​​​discardFatalsWithXDeath​​​​ConditionalRejectingErrorHandler​​​​false​

从版本 2.1.9 开始,默认情况下,具有这些致命异常的消息将被拒绝且不会重新排队,即使容器确认模式为 MANUAL。 这些异常通常发生在调用侦听器之前,因此侦听器没有机会确认或取消消息,因此消息仍处于未确认状态。 要恢复到以前的行为,请将 上的属性设置为 。​​rejectManual​​​​ConditionalRejectingErrorHandler​​​​false​

4.1.16. 交易

Spring Rabbit 框架支持同步和异步用例中的自动事务管理,具有许多不同的语义,可以通过声明方式选择,正如 Spring 事务的现有用户所熟悉的那样。 这使得许多(如果不是最常见的)消息传递模式易于实现。

有两种方法可以将所需的事务语义发送到框架。 在 和 中都有一个标志,如果 ,它告诉框架使用事务通道并通过提交或回滚(取决于结果)结束所有操作(发送或接收),异常表示回滚。 另一个信号是提供带有 Spring 实现之一的外部事务,作为正在进行的操作的上下文。 如果在框架发送或接收消息时已经有一个事务正在进行中,并且标志为 ,则消息传递事务的提交或回滚将推迟到当前事务结束。 如果标志为 ,则没有事务语义适用于消息传递操作(它是自动确认的)。​​RabbitTemplate​​​​SimpleMessageListenerContainer​​​​channelTransacted​​​​true​​​​PlatformTransactionManager​​​​channelTransacted​​​​true​​​​channelTransacted​​​​false​

该标志是配置时间设置。 它在创建 AMQP 组件时声明和处理一次,通常在应用程序启动时。 外部事务原则上更加动态,因为系统在运行时响应当前线程状态。 但是,在实践中,当事务以声明方式分层到应用程序时,它通常也是一个配置设置。​​channelTransacted​

对于与 的同步用例,外部事务由调用方提供,根据口味以声明方式或命令式提供(通常的 Spring 事务模型)。 以下示例显示了一种声明性方法(通常是首选方法,因为它是非侵入性的),其中模板已配置为:​​RabbitTemplate​​​​channelTransacted=true​

@Transactional
public void doSomething() {
String incoming = rabbitTemplate.receiveAndConvert();
// do some more database processing...
String outgoing = processInDatabaseAndExtractReply(incoming);
rabbitTemplate.convertAndSend(outgoing);
}

在前面的示例中,有效负载在标记为 . 如果数据库处理失败并出现异常,那么传入消息将返回到代理,并且不发送传出消息。 这适用于在事务方法链内部的任何操作(例如,除非直接操纵以提前提交事务)。​​String​​​​@Transactional​​​​RabbitTemplate​​​​Channel​

对于具有 的异步用例,如果需要外部事务,则必须由容器在设置侦听器时请求。 为了指示需要外部事务,用户在配置容器时提供容器的实现。 以下示例演示如何执行此操作:​​SimpleMessageListenerContainer​​​​PlatformTransactionManager​

@Configuration
public class ExampleExternalTransactionAmqpConfiguration {

@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setTransactionManager(transactionManager());
container.setChannelTransacted(true);
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}

}

在前面的示例中,事务管理器被添加为从另一个 Bean 定义注入的依赖项(未显示),并且该标志也设置为 。 其效果是,如果侦听器失败并出现异常,事务将回滚,并且消息也将返回到代理。 值得注意的是,如果事务无法提交(例如,由于 数据库约束错误或连接问题),AMQP 事务也会回滚,消息将返回到代理。 这有时被称为“尽力而为的 1 阶段提交”,是一种非常强大的可靠消息传递模式。 如果在前面的示例中将该标志设置为 (默认值),则仍会为侦听器提供外部事务,但所有消息传递操作都会自动确认,因此效果是即使在业务操作回滚时也能提交消息传递操作。​​channelTransacted​​​​true​​​​channelTransacted​​​​false​

条件回滚

在版本 1.6.6 之前,使用外部事务管理器(如 JDBC)时向容器添加回滚规则不起作用。 异常始终回滚事务。​​transactionAttribute​

此外,在容器的建议链中使用事务建议时,条件回滚不是很有用,因为所有侦听器异常都包装在 .​​ListenerExecutionFailedException​

第一个问题已得到纠正,规则现已正确应用。 此外,现在提供了。 它是 的一个子类,唯一的区别是它知道并使用规则的此类异常的原因。 此事务属性可以直接在容器中使用,也可以通过事务建议使用。​​ListenerFailedRuleBasedTransactionAttribute​​​​RuleBasedTransactionAttribute​​​​ListenerExecutionFailedException​

以下示例使用此规则:

@Bean
public AbstractMessageListenerContainer container() {
...
container.setTransactionManager(transactionManager);
RuleBasedTransactionAttribute transactionAttribute =
new ListenerFailedRuleBasedTransactionAttribute();
transactionAttribute.setRollbackRules(Collections.singletonList(
new NoRollbackRuleAttribute(DontRollBackException.class)));
container.setTransactionAttribute(transactionAttribute);
...
}
关于回滚已接收消息的说明

AMQP 事务仅适用于发送到代理的消息和确认。 因此,当Spring事务回滚并且收到消息时,Spring AMQP不仅要回滚事务,还要手动拒绝消息(有点麻烦,但这不是规范所说的)。 对消息拒绝执行的操作与事务无关,并且取决于属性(默认值:)。 有关拒绝失败消息的详细信息,请参阅消息侦听器和异步案例。​​defaultRequeueRejected​​​​true​

有关 RabbitMQ 事务及其限制的更多信息,请参阅 RabbitMQ 代理语义。

在 RabbitMQ 2.7.0 之前,此类消息(以及通道关闭或中止时取消的任何消息)在 Rabbit 代理上排在队列的后面。 从 2.7.0 开始,被拒绝的消息会排在队列的前面,其方式与 JMS 回滚消息类似。

以前,事务回滚时的消息重新排队在本地事务和提供 时不一致。 在前一种情况下,应用了正常的重新排队逻辑( 或 )(请参阅消息侦听器和异步情况​)。 使用事务管理器,消息在回滚时无条件地重新排队。 从版本 2.0 开始,行为是一致的,并且在这两种情况下都应用正常的重新排队逻辑。 若要恢复到以前的行为,可以将容器的属性设置为 。 请参阅消息侦听器容器配置​。​​TransactionManager​​​​AmqpRejectAndDontRequeueException​​​​defaultRequeueRejected=false​​​​alwaysRequeueWithTxManagerRollback​​​​true​

用​​RabbitTransactionManager​

RabbitTransactionManager 是在外部事务中执行 Rabbit 操作并与外部事务同步的替代方法。 此事务管理器是 PlatformTransactionManager 接口的实现,应与单个 Rabbit 一起使用。​​ConnectionFactory​

此策略无法提供 XA 事务 — 例如,为了在消息传递和数据库访问之间共享事务。

需要应用程序代码来检索事务性 Rabbit 资源,而不是后续创建通道的标准调用。 当使用Spring AMQP的RabbitTemplate时,它将自动检测线程绑定的通道并自动参与其事务。​​ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean)​​​​Connection.createChannel()​

使用 Java Configuration,您可以使用以下 Bean 设置新的 RabbitTransactionManager:

@Bean
public RabbitTransactionManager rabbitTransactionManager() {
return new RabbitTransactionManager(connectionFactory);
}

如果您更喜欢 XML 配置,则可以在 XML 应用程序上下文文件中声明以下 Bean:

<bean id="rabbitTxManager"
class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
事务同步

将 RabbitMQ 事务与其他一些事务(例如 DBMS)同步可提供“尽力而为一阶段提交”语义。 RabbitMQ 事务在事务同步的完成后阶段可能无法提交。 基础结构将此记录为错误,但不会向调用代码引发异常。 从版本 2.3.10 开始,您可以在处理事务的同一线程上提交事务后调用。 如果没有发生异常,它将简单地返回;否则,它将抛出一个属性,该属性表示完成的同步状态。​​spring-tx​​​​ConnectionUtils.checkAfterCompletion()​​​​AfterCompletionFailedException​

通过调用启用此功能;这是一个全局标志,适用于所有线程。​​ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true)​

4.1.17. 消息侦听器容器配置

配置与事务和服务质量相关的 (SMLC) 和 (DMLC) 的选项有很多,其中一些选项相互交互。 适用于 SMLC、DMLC 或 (StLC) 的属性(请参阅使用 RabbitMQ 流插件)由相应列中的复选标记指示。 有关帮助您确定哪个容器适合您的应用程序的信息,请参阅选择容器。​​SimpleMessageListenerContainer​​​​DirectMessageListenerContainer​​​​StreamListenerContainer​

下表显示了使用命名空间配置 . 该元素上的属性可以是(默认)或分别指定 或。 命名空间不会公开某些属性。 这些由属性指示。​​<rabbit:listener-container/>​​​​type​​​​simple​​​​direct​​​​SMLC​​​​DMLC​​​​N/A​

4.1.18. 监听器并发

SimpleMessageListenerContainer

默认情况下,侦听器容器启动从队列接收消息的单个使用者。

检查上一节中的表时,可以看到许多控制并发的属性。 最简单的是 ,它创建并发处理消息的(固定)数量的使用者。​​concurrentConsumers​

在版本 1.3.0 之前,这是唯一可用的设置,必须停止并重新启动容器才能更改设置。

从版本 1.3.0 开始,现在可以动态调整属性。 如果在容器运行时更改了使用者,则会根据需要添加或删除使用者,以调整到新设置。​​concurrentConsumers​

此外,还添加了一个名为的新属性,容器会根据工作负载动态调整并发性。 这将与四个附加属性结合使用:、、 和 。 使用默认设置,增加使用者的算法的工作方式如下:​​maxConcurrentConsumers​​​​consecutiveActiveTrigger​​​​startConsumerMinInterval​​​​consecutiveIdleTrigger​​​​stopConsumerMinInterval​

如果未达到 ,并且现有使用者连续十个周期处于活动状态,并且自启动最后一个使用者以来至少已经过去了 10 秒,则启动新的使用者。 如果使用者在 * 毫秒内至少收到一条消息,则认为该使用者处于活动状态。​​maxConcurrentConsumers​​​​batchSize​​​​receiveTimeout​

使用默认设置,减少使用者的算法的工作方式如下:

如果运行次数超过,并且使用者检测到连续十次超时(空闲),并且最后一个使用者至少在 60 秒前停止,则使用者将停止。 超时取决于 和 属性。 如果使用者在 * 毫秒内未收到任何消息,则将其视为空闲。 因此,在默认超时(1 秒)和 4 秒的情况下,在空闲时间 40 秒后考虑停止使用者(四个超时对应于一个空闲检测)。​​concurrentConsumers​​​​receiveTimeout​​​​batchSize​​​​batchSize​​​​receiveTimeout​​​​batchSize​

实际上,只有当整个容器闲置一段时间时,才能停止使用者。 这是因为代理在所有活跃消费者之间共享其工作。

每个使用者使用单个通道,而不考虑配置的队列数量。

从版本 2.0 开始,可以使用属性设置 and 属性,例如 。​​concurrentConsumers​​​​maxConcurrentConsumers​​​​concurrency​​​​2-4​

用​​DirectMessageListenerContainer​

使用此容器,并发性基于配置的队列和 . 每个队列的每个使用者使用单独的通道,并发性由 rabbit 客户端库控制。 默认情况下,在撰写本文时,它使用线程池。​​consumersPerQueue​​​​DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2​

您可以配置 以提供所需的最大并发性。​​taskExecutor​

4.1.19. 独家消费者

从版本 1.3 开始,您可以使用单个独占使用者配置侦听器容器。 这可以防止其他容器从队列中使用,直到当前使用者被取消。 此类容器的并发性必须是 。​​1​

使用独占使用者时,其他容器会尝试根据属性从队列中使用,并在尝试失败时记录消息。​​recoveryInterval​​​​WARN​

4.1.20. 侦听器容器队列

版本 1.3 引入了许多改进,用于处理侦听器容器中的多个队列。

必须将容器配置为侦听至少一个队列。 以前也是这种情况,但现在可以在运行时添加和删除队列。 容器在处理任何预提取的消息后回收(取消并重新创建)使用者。 请参阅 ​​Javadoc​​ 中的 、 和 方法。 删除队列时,必须至少保留一个队列。​​addQueues​​​​addQueueNames​​​​removeQueues​​​​removeQueueNames​

如果使用者的任何队列可用,则使用者现在启动。 以前,如果任何队列不可用,容器将停止。 现在,仅当没有任何队列可用时,才会出现这种情况。 如果并非所有队列都可用,则容器会尝试每 60 秒被动声明(并使用)缺少的队列。

此外,如果使用者从代理收到取消(例如,如果队列被删除),则使用者会尝试恢复,并且恢复的使用者将继续处理来自任何其他已配置队列的消息。 以前,一个队列上的取消会取消整个使用者,最终,容器将由于缺少队列而停止。

如果要永久删除队列,则应在删除队列之前或之后更新容器,以避免将来尝试从中消耗容器。

4.1.21. 弹性:从错误和代理故障中恢复

Spring AMQP提供的一些关键(也是最受欢迎的)高级功能与协议错误或代理故障时的恢复和自动重新连接有关。 我们已经在本指南中看到了所有相关组件,但在此处将它们全部整合在一起并单独调用功能和恢复方案应该会有所帮助。

主要重新连接功能由 本身启用。 使用自动声明功能通常也是有益的。 此外,如果您关心保证交付,您可能还需要在 和 中使用标志(如果您自己做 acks,则使用手册)。​​CachingConnectionFactory​​​​RabbitAdmin​​​​channelTransacted​​​​RabbitTemplate​​​​SimpleMessageListenerContainer​​​​AcknowledgeMode.AUTO​​​​SimpleMessageListenerContainer​

自动声明交换、队列和绑定

该组件可以在启动时声明交换、队列和绑定。 它懒惰地通过 . 因此,如果启动时代理不存在,则无关紧要。 第一次使用 a 时(例如, 通过发送消息)侦听器触发并应用管理功能。 在侦听器中执行自动声明的另一个好处是,如果连接因任何原因而断开(例如, 代理死亡、网络故障等),当重新建立连接时,将再次应用它们。​​RabbitAdmin​​​​ConnectionListener​​​​Connection​

以这种方式声明的队列必须具有固定的名称 — 显式声明或由实例框架生成。 匿名队列是非持久队列、独占队列和自动删除队列。​​AnonymousQueue​

仅当缓存模式为 (默认值) 时,才会执行自动声明。 存在此限制的原因是独占队列和自动删除队列绑定到连接。​​CachingConnectionFactory​​​​CHANNEL​

从版本 2.2.2 开始,将在实际处理声明之前检测类型的 bean 并应用函数。 例如,这很有用,可以在框架中获得第一类支持之前设置新参数(属性)。​​RabbitAdmin​​​​DeclarableCustomizer​

@Bean
public DeclarableCustomizer customizer() {
return dec -> {
if (dec instanceof Queue && ((Queue) dec).getName().equals("my.queue")) {
dec.addArgument("some.new.queue.argument", true);
}
return dec;
};
}

它对于不提供对 Bean 定义的直接访问的项目也很有用。​​Declarable​

另请参阅 RabbitMQ 自动连接/拓扑恢复。

同步操作中的失败和重试选项

如果您在使用时(例如)以同步顺序失去与代理的连接,Spring AMQP 会抛出一个(通常,但并非总是)。 我们不会试图隐瞒存在问题的事实,因此您必须能够捕获并响应异常。 如果您怀疑连接丢失(这不是您的错),最简单的方法是再次尝试该操作。 您可以手动执行此操作,也可以考虑使用 Spring 重试来处理重试(命令性或声明性)。​​RabbitTemplate​​​​AmqpException​​​​AmqpIOException​

Spring 重试提供了几个 AOP 拦截器和很大的灵活性来指定重试的参数(尝试次数、异常类型、退避算法等)。 Spring AMQP 还提供了一些方便的工厂 bean,用于为 AMQP 用例以方便的形式创建 Spring 重试拦截器,并具有可用于实现自定义恢复逻辑的强类型回调接口。 有关更多详细信息,请参阅 Javadoc 和 的属性。 如果没有事务或在重试回调内启动事务,则适合无状态重试。 请注意,无状态重试比有状态重试更易于配置和分析,但如果存在必须回滚或肯定要回滚的正在进行的事务,则通常不合适。 事务过程中断开连接应具有与回滚相同的效果。 因此,对于事务在堆栈较高位置启动的重新连接,有状态重试通常是最佳选择。 有状态重试需要一种机制来唯一标识消息。 最简单的方法是让发件人在消息属性中放置一个唯一值。 提供的消息转换器提供了一个执行此操作的选项:您可以设置为 . 否则,可以将实现注入到侦听器中。 密钥生成器必须为每条消息返回唯一密钥。 在 2.0 版之前的版本中,提供了 a。 它允许没有属性的消息只重试一次(忽略重试设置)。 不再提供此建议,因为与版本 1.2 一起,其功能内置于拦截器和消息侦听器容器中。​​StatefulRetryOperationsInterceptor​​​​StatelessRetryOperationsInterceptor​​​​MessageId​​​​createMessageIds​​​​true​​​​MessageKeyGenerator​​​​MissingMessageIdAdvice​​​​messageId​​​​spring-retry​

为了向后兼容,默认情况下(重试一次后),具有空消息 ID 的消息对使用者(使用者已停止)被视为致命消息。 若要复制 提供的功能,可以在侦听器容器上将该属性设置为 。 使用该设置,使用者将继续运行,并且消息被拒绝(重试一次后)。 它被丢弃或路由到死信队列(如果配置了死信队列)。​​MissingMessageIdAdvice​​​​statefulRetryFatalWithNullMessageId​​​​false​

从版本 1.3 开始,提供了一个构建器 API 来帮助使用 Java(在类中)组装这些拦截器。 以下示例演示如何执行此操作:​​@Configuration​

@Bean
public StatefulRetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateful()
.maxAttempts(5)
.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
.build();
}

只能以这种方式配置一部分重试功能。 更高级的功能需要将 a 配置为 Spring bean。 有关可用策略及其配置的完整信息,请参阅 Spring 重试 Javadoc。​​RetryTemplate​

使用批处理侦听器重试

建议不要使用批处理侦听器配置重试,除非批处理是由生成者在单个记录中创建的。 有关使用者和生产者创建的批处理的信息,请参阅批处理消息。 对于使用者创建的批处理,框架不知道批处理中的哪条消息导致失败,因此无法在重试用尽后进行恢复。 对于生产者创建的批处理,由于只有一个消息实际失败,因此可以恢复整个消息。 应用程序可能希望通知自定义恢复程序批处理中发生故障的位置,可能通过设置引发的异常的索引属性。

批处理侦听器的重试恢复程序必须实现 。​​MessageBatchRecoverer​

消息侦听器和异步案例

如果 由于业务异常而失败,则异常由消息侦听器容器处理,然后返回到侦听另一条消息。 如果失败是由连接断开(不是业务异常)引起的,则必须取消并重新启动为侦听器收集消息的使用者。 无缝处理,并留下一个日志来说明侦听器正在重新启动。 事实上,它无休止地循环,试图重新启动消费者。 只有当消费者确实表现得非常糟糕时,它才会放弃。 一个副作用是,如果代理在容器启动时关闭,它会继续尝试,直到可以建立连接。​​MessageListener​​​​SimpleMessageListenerContainer​

与协议错误和断开连接断开相反,业务异常处理可能需要更多考虑和一些自定义配置,尤其是在使用事务或容器确认的情况下。 在 2.8.x 之前,RabbitMQ 没有死信行为的定义。 因此,默认情况下,由于业务异常而被拒绝或回滚的邮件可以无休止地重新传递。 为了限制客户重新交付的次数,一种选择是在听众的建议链中。 拦截器可以有一个恢复回调,该回调实现自定义死信操作 - 任何适合您的特定环境的操作。​​StatefulRetryOperationsInterceptor​

另一种方法是将容器的属性设置为 。 这会导致丢弃所有失败的消息。 当使用 RabbitMQ 2.8.x 或更高版本时,这也有助于将消息传递到死信交换。​​defaultRequeueRejected​​​​false​

或者,您可以抛出 . 这样做可以防止消息重新排队,而不管属性的设置如何。​​AmqpRejectAndDontRequeueException​​​​defaultRequeueRejected​

从版本 2.1 开始,引入了 来执行完全相反的逻辑:无论属性的设置如何,消息都将重新排队。​​ImmediateRequeueAmqpException​​​​defaultRequeueRejected​

通常,使用这两种技术的组合。 您可以在建议链中使用 a 和抛出 . 当所有重试都已用尽时调用。 正是这样做的。 默认值使用错误消息并发出消息。​​StatefulRetryOperationsInterceptor​​​​MessageRecoverer​​​​AmqpRejectAndDontRequeueException​​​​MessageRecover​​​​RejectAndDontRequeueRecoverer​​​​MessageRecoverer​​​​WARN​

从版本 1.3 开始,提供了一个新版本,以允许在重试用尽后发布失败的消息。​​RepublishMessageRecoverer​

当恢复程序使用最后一个异常时,消息将被确认,并且不会由代理发送到死信交换(如果已配置)。

当在消费者端使用时,接收的消息在消息属性中。 在本例中为 . 这意味着经纪人的交付模式。 从版本 2.0 开始,如果 是 ,则可以将 配置为要重新发布的消息中的 。 默认情况下,它使用默认值 - 。​​RepublishMessageRecoverer​​​​deliveryMode​​​​receivedDeliveryMode​​​​deliveryMode​​​​null​​​​NON_PERSISTENT​​​​RepublishMessageRecoverer​​​​deliveryMode​​​​null​​​​MessageProperties​​​​MessageDeliveryMode.PERSISTENT​

以下示例演示如何将 设置为 恢复程序:​​RepublishMessageRecoverer​

@Bean
RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
.build();
}

发布消息时,邮件头中包含其他信息,例如异常消息、堆栈跟踪、原始交换和路由密钥。 可以通过创建子类并覆盖 来添加其他标头。 也可以在 中更改 (或任何其他属性),如以下示例所示:​​RepublishMessageRecoverer​​​​additionalHeaders()​​​​deliveryMode​​​​additionalHeaders()​

RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(amqpTemplate, "error") {

protected Map<? extends String, ? extends Object> additionalHeaders(Message message, Throwable cause) {
message.getMessageProperties()
.setDeliveryMode(message.getMessageProperties().getReceivedDeliveryMode());
return null;
}

};

从版本 2.0.5 开始,如果堆栈跟踪太大,则可能会截断堆栈跟踪;这是因为所有标题都必须适合单个帧。 默认情况下,如果堆栈跟踪会导致可用于其他标头的字节少于 20,000 字节(“余量”),则它将被截断。 这可以通过设置恢复程序的属性进行调整,如果您需要更多或更少的空间来容纳其他标头。 从版本 2.1.13、2.2.3 开始,异常消息将包含在此计算中,并且使用以下算法最大化堆栈跟踪量:​​frameMaxHeadroom​

  • 如果单独的堆栈跟踪超过限制,则异常消息标头将被截断为 97 字节加,堆栈跟踪也会被截断。…​
  • 如果堆栈跟踪很小,则消息将被截断(加号)以适应可用字节(但堆栈跟踪本身中的消息被截断为 97 字节加)。…​…​

每当发生任何类型的截断时,将记录原始异常以保留完整信息。 在增强标头后执行计算,以便可以在表达式中使用异常类型等信息。

从版本 2.4.8 开始,错误交换和路由密钥可以作为 SpEL 表达式提供,作为评估的根对象。​​Message​

从版本 2.3.3 开始,提供了一个新的子类;这支持两种样式的发布者确认,并将等待确认后再返回(如果未确认或返回消息,则引发异常)。​​RepublishMessageRecovererWithConfirms​

如果确认类型为 ,则子类还将检测是否返回消息并抛出 ;如果发布被否定确认,它将抛出 .​​CORRELATED​​​​AmqpMessageReturnedException​​​​AmqpNackReceivedException​

如果确认类型为 ,则子类将在通道上调用该方法。​​SIMPLE​​​​waitForConfirmsOrDie​

请参阅发布商确认和退货,了解有关确认和退货的更多信息。

从版本 2.1 开始,添加了 以抛出 ,这会通知侦听器容器将当前失败的消息重新排队。​​ImmediateRequeueMessageRecoverer​​​​ImmediateRequeueAmqpException​

春季重试的异常分类

Spring 重试在确定哪些异常可以调用重试方面具有很大的灵活性。 默认配置会重试所有异常。 鉴于用户异常包含在 中,我们需要确保分类检查异常原因。 默认分类器仅查看顶级异常。​​ListenerExecutionFailedException​

从 Spring 重试 1.0.3 开始,具有一个名为 (默认:) 的属性。 当 时,它会遍历异常原因,直到找到匹配项或没有原因。​​BinaryExceptionClassifier​​​​traverseCauses​​​​false​​​​true​

要使用此分类器重试,您可以使用 create with 构造函数,该构造函数采用最大尝试次数、实例数和布尔值 (),并将此策略注入 .​​SimpleRetryPolicy​​​​Map​​​​Exception​​​​traverseCauses​​​​RetryTemplate​

4.1.22. 多代理(或集群)支持

版本 2.3 在单个应用程序与多个代理或代理集群之间进行通信时增加了更多便利。 在消费者端,主要好处是基础设施可以自动将自动声明的队列与适当的代理相关联。

用一个例子来最好地说明这一点:

@SpringBootApplication(exclude = RabbitAutoConfiguration.class)
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@Bean
CachingConnectionFactory cf1() {
return new CachingConnectionFactory("localhost");
}

@Bean
CachingConnectionFactory cf2() {
return new CachingConnectionFactory("otherHost");
}

@Bean
CachingConnectionFactory cf3() {
return new CachingConnectionFactory("thirdHost");
}

@Bean
SimpleRoutingConnectionFactory rcf(CachingConnectionFactory cf1,
CachingConnectionFactory cf2, CachingConnectionFactory cf3) {

SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory();
rcf.setDefaultTargetConnectionFactory(cf1);
rcf.setTargetConnectionFactories(Map.of("one", cf1, "two", cf2, "three", cf3));
return rcf;
}

@Bean("factory1-admin")
RabbitAdmin admin1(CachingConnectionFactory cf1) {
return new RabbitAdmin(cf1);
}

@Bean("factory2-admin")
RabbitAdmin admin2(CachingConnectionFactory cf2) {
return new RabbitAdmin(cf2);
}

@Bean("factory3-admin")
RabbitAdmin admin3(CachingConnectionFactory cf3) {
return new RabbitAdmin(cf3);
}

@Bean
public RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry() {
return new RabbitListenerEndpointRegistry();
}

@Bean
public RabbitListenerAnnotationBeanPostProcessor postProcessor(RabbitListenerEndpointRegistry registry) {
MultiRabbitListenerAnnotationBeanPostProcessor postProcessor
= new MultiRabbitListenerAnnotationBeanPostProcessor();
postProcessor.setEndpointRegistry(registry);
postProcessor.setContainerFactoryBeanName("defaultContainerFactory");
return postProcessor;
}

@Bean
public SimpleRabbitListenerContainerFactory factory1(CachingConnectionFactory cf1) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cf1);
return factory;
}

@Bean
public SimpleRabbitListenerContainerFactory factory2(CachingConnectionFactory cf2) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cf2);
return factory;
}

@Bean
public SimpleRabbitListenerContainerFactory factory3(CachingConnectionFactory cf3) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cf3);
return factory;
}

@Bean
RabbitTemplate template(SimpleRoutingConnectionFactory rcf) {
return new RabbitTemplate(rcf);
}

@Bean
ConnectionFactoryContextWrapper wrapper(SimpleRoutingConnectionFactory rcf) {
return new ConnectionFactoryContextWrapper(rcf);
}

}

@Component
class Listeners {

@RabbitListener(queuesToDeclare = @Queue("q1"), containerFactory = "factory1")
public void listen1(String in) {

}

@RabbitListener(queuesToDeclare = @Queue("q2"), containerFactory = "factory2")
public void listen2(String in) {

}

@RabbitListener(queuesToDeclare = @Queue("q3"), containerFactory = "factory3")
public void listen3(String in) {

}

}

如您所见,我们已经声明了 3 套基础结构(连接工厂、管理员、容器工厂)。 如前所述,可以定义要使用的容器工厂;在这种情况下,他们还使用 这会导致队列在代理上声明(如果它不存在)。 通过用约定命名 bean,基础设施能够确定哪个管理员应该声明队列。 这也将适用于也将宣布交换和约束。 它不适用于 ,因为这需要队列已经存在。​​@RabbitListener​​​​queuesToDeclare​​​​RabbitAdmin​​​​<container-factory-name>-admin​​​​bindings = @QueueBinding(…)​​​​queues​

在生产者端,提供了一个方便的类,使使用(请参阅路由连接工厂)更简单。​​ConnectionFactoryContextWrapper​​​​RoutingConnectionFactory​

正如您在上面看到的,添加了带有路由键和 的 bean 。 还有一个使用该工厂。 下面是将该模板与包装器一起使用以路由到其中一个代理集群的示例。​​SimpleRoutingConnectionFactory​​​​one​​​​two​​​​three​​​​RabbitTemplate​

@Bean
public ApplicationRunner runner(RabbitTemplate template, ConnectionFactoryContextWrapper wrapper) {
return args -> {
wrapper.run("one", () -> template.convertAndSend("q1", "toCluster1"));
wrapper.run("two", () -> template.convertAndSend("q2", "toCluster2"));
wrapper.run("three", () -> template.convertAndSend("q3", "toCluster3"));
};
}

4.1.23. 调试

Spring AMQP 提供了广泛的日志记录,尤其是在级别。​​DEBUG​

如果您希望监视应用程序和代理之间的AMQP协议,则可以使用诸如WireShark之类的工具,该工具具有用于解码协议的插件。 或者,RabbitMQ Java客户端附带了一个非常有用的类,称为。 默认情况下,当以 身份运行时,它会侦听端口 5673 并连接到本地主机上的端口 5672。 您可以运行它并更改连接工厂配置以连接到本地主机上的端口 5673。 它在控制台上显示解码的协议。 有关更多信息,请参阅 Javadoc。​​Tracer​​​​main​​​​Tracer​

4.2. 使用 RabbitMQ 流插件

版本 2.4 为 RabbitMQ Stream 插件引入了对 RabbitMQ Stream 插件的 Java 客户端的初始支持。

  • ​RabbitStreamTemplate​
  • ​StreamListenerContainer​

4.2.1. 发送消息

提供 (AMQP) 功能的子集。​​RabbitStreamTemplate​​​​RabbitTemplate​

例 3.兔子流运营

public interface RabbitStreamOperations extends AutoCloseable {

ConvertableFuture<Boolean> send(Message message);

ConvertableFuture<Boolean> convertAndSend(Object message);

ConvertableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);

ConvertableFuture<Boolean> send(com.rabbitmq.stream.Message message);

MessageBuilder messageBuilder();

MessageConverter messageConverter();

StreamMessageConverter streamMessageConverter();

@Override
void close() throws AmqpException;

}

该实现具有以下构造函数和属性:​​RabbitStreamTemplate​

例 4.兔子流模板

public RabbitStreamTemplate(Environment environment, String streamName) {
}

public void setMessageConverter(MessageConverter messageConverter) {
}

public void setStreamConverter(StreamMessageConverter streamConverter) {
}

public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}

在方法中用于将对象转换为 Spring AMQP 。​​MessageConverter​​​​convertAndSend​​​​Message​

用于从 Spring AMQP 转换为本机流。​​StreamMessageConverter​​​​Message​​​​Message​

您也可以直接发送本机流;使用证明对 消息生成器的访问的方法。​​Message​​​​messageBuilder()​​​​Producer​

它提供了一种在生成之前自定义生成器的机制。​​ProducerCustomizer​

请参阅 Java 客户机文档,了解如何定制 和 。​​Environment​​​​Producer​

从版本 3.0 开始,方法返回类型代替 。​​CompletableFuture​​​​ListenableFuture​

4.2.2. 接收消息

异步消息接收由 (以及 在使用时 提供)。​​StreamListenerContainer​​​​StreamRabbitListenerContainerFactory​​​​@RabbitListener​

侦听器容器需要一个 AND 和一个流名称。​​Environment​

您可以使用经典的 接收 Spring AMQP,也可以使用新接口接收本机流:​​Message​​​​MessageListener​​​​Message​

public interface StreamMessageListener extends MessageListener {

void onStreamMessage(Message message, Context context);

}

有关支持的属性的信息,请参阅消息侦听器容器配置。

与模板类似,容器具有属性。​​ConsumerCustomizer​

请参阅 Java 客户机文档,了解如何定制 和 。​​Environment​​​​Consumer​

使用 时,配置 ;此时,大多数属性(等)将被忽略。仅支持 、 和。 此外,只能包含一个流名称。​​@RabbitListener​​​​StreamRabbitListenerContainerFactory​​​​@RabbitListener​​​​concurrency​​​​id​​​​queues​​​​autoStartup​​​​containerFactory​​​​queues​

4.2.3. 例子

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
template.setProducerCustomizer((name, builder) -> builder.name("test"));
return template;
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
return new StreamRabbitListenerContainerFactory(env);
}

@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
...
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
factory.setNativeListener(true);
factory.setConsumerCustomizer((id, builder) -> {
builder.name("myConsumer")
.offset(OffsetSpecification.first())
.manualTrackingStrategy();
});
return factory;
}

@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
...
context.storeOffset();
}

版本 2.4.5 将该属性添加到 (及其工厂)。 还提供了一个新的工厂 Bean,用于创建无状态重试拦截器,该拦截器具有在使用原始流消息时使用的可选选项。​​adviceChain​​​​StreamListenerContainer​​​​StreamMessageRecoverer​

@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
StreamRetryOperationsInterceptorFactoryBean rfb =
new StreamRetryOperationsInterceptorFactoryBean();
rfb.setRetryOperations(retryTemplate);
rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
...
});
return rfb;
}

此容器不支持有状态重试。

4.2.4. 超级流

超级流是分区流的抽象概念,通过将许多流队列绑定到具有参数的交换来实现。​​x-super-stream: true​

供应

为方便起见,可以通过定义 的单个 Bean 来配置超级流。​​SuperStream​

@Bean
SuperStream superStream() {
return new SuperStream("my.super.stream", 3);
}

检测到此 bean 并将声明交换 () 和 3 个队列(分区) - 其中 是 、 、 绑定,路由键等于 。​​RabbitAdmin​​​​my.super.stream​​​​my.super-stream-n​​​​n​​​​0​​​​1​​​​2​​​​n​

如果您还希望通过 AMQP 发布到交易所,则可以提供自定义路由密钥:

@Bean
SuperStream superStream() {
return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
.mapToObj(j -> "rk-" + j)
.collect(Collectors.toList()));
}

键数必须等于分区数。

生产到超级流

您必须将 a 添加到 :​​superStreamRoutingFunction​​​​RabbitStreamTemplate​

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
template.setSuperStreamRouting(message -> {
// some logic to return a String for the client's hashing algorithm
});
return template;
}

您还可以使用 .​​RabbitTemplate​

使用单个活跃使用者消费超级流

在侦听器容器上调用该方法,以在超级流上启用单个活动使用者。​​superStream​

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
StreamListenerContainer container(Environment env, String name) {
StreamListenerContainer container = new StreamListenerContainer(env);
container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
container.setupMessageListener(msg -> {
...
});
container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
return container;
}

此时,当并发大于 1 时,实际并发进一步由 ;若要实现完全并发,请将环境设置为 1。 请参阅配置环境​。​​Environment​​​​maxConsumersByConnection​

4.3. 日志记录子系统 AMQP 追加器

该框架为一些流行的日志记录子系统提供了日志记录追加器:

  • 回落(从Spring AMQP版本1.4开始)
  • log4j2(从Spring AMQP版本1.6开始)

追加程序是使用日志记录子系统的常规机制配置的,以下各节中指定了可用属性。

4.3.1. 通用属性

以下属性可用于所有追加程序:

表 4.常见追加器属性

财产

违约

描述


exchangeName



logs


要向其发布日志事件的交易所的名称。


exchangeType



topic


要向其发布日志事件的交换的类型 — 仅当追加程序声明交换时才需要。 看。​​declareExchange​


routingKeyPattern



%c.%p


记录用于生成路由密钥的子系统模式格式。


applicationId




应用程序 ID — 添加到路由密钥(如果模式包括 )。​​%X{applicationId}​


senderPoolSize



2


用于发布日志事件的线程数。


maxSenderRetries



30


在代理不可用或出现其他错误时重试发送消息的次数。 重试延迟如下:,其中重试次数。​​N ^ log(N)​​​​N​


addresses




以逗号分隔的代理地址列表,格式如下:- 覆盖和 。​​host:port[,host:port]*​​​​host​​​​port​


host



localhost


要连接的 RabbitMQ 主机 。


port



5672


要连接的 RabbitMQ 端口。


virtualHost



/


要连接的 RabbitMQ 虚拟主机。


username



guest


连接时使用的 RabbitMQ 用户。


password



guest


此用户的 RabbitMQ 密码。


useSsl



false


是否使用 SSL 进行 RabbitMQ 连接。 请参阅 RabbitConnectionFactoryBean 和配置 SSL


verifyHostname



true


为 TLS 连接启用服务器主机名验证。 请参阅 RabbitConnectionFactoryBean 和配置 SSL


sslAlgorithm



null


要使用的 SSL 算法。


sslPropertiesLocation



null


SSL 属性文件的位置。


keyStore



null


密钥库的位置。


keyStorePassphrase



null


密钥库的密码短语。


keyStoreType



JKS


密钥库类型。


trustStore



null


信任库的位置。


trustStorePassphrase



null


信任库的密码。


trustStoreType



JKS


信任库类型。


saslConfig



null (RabbitMQ client default applies)


该 - 有关有效值,请参阅 javadoc。​​saslConfig​​​​RabbitUtils.stringToSaslConfig​


contentType



text/plain


​content-type​​日志消息的属性。


contentEncoding




​content-encoding​​日志消息的属性。


declareExchange



false


是否在此追加程序启动时声明配置的交换。 另请参阅和。​​durable​​​​autoDelete​


durable



true


当为 时,持久标志将设置为此值。​​declareExchange​​​​true​


autoDelete



false


当为 时,自动删除标志将设置为此值。​​declareExchange​​​​true​


charset



null


转换为 时要使用的字符集。 默认值:空(使用系统默认字符集)。 如果当前平台不支持该字符集,我们将回退到使用系统字符集。​​String​​​​byte[]​


deliveryMode



PERSISTENT


​PERSISTENT​​​或者确定 RabbitMQ 是否应该持久化消息。​​NON_PERSISTENT​


generateId



false


用于确定属性是否设置为唯一值。​​messageId​


clientConnectionProperties



null


以逗号分隔的 RabbitMQ 连接的自定义客户端属性对列表。​​key:value​


addMdcAsHeaders



true


MDC 属性始终添加到 RabbitMQ 消息标头中,直到引入此属性。 这可能会导致大型 MDC 出现问题,因为 RabbitMQ 对所有标头的缓冲区大小有限,并且此缓冲区非常小。 引入此属性是为了避免在大型MDC的情况下出现问题。 默认情况下,此值设置为 用于向后兼容。 关闭序列化 MDC 到标头。 请注意,默认情况下会将MDC添加到消息中。​​true​​​​false​​​​JsonLayout​

4.3.2. 日志4j 2 追加器

以下示例演示如何配置 Log4j 2 追加器:

<Appenders>
...
<RabbitMQ name="rabbitmq"
addresses="foo:5672,bar:5672" user="guest" password="guest" virtualHost="/"
exchange="log4j2" exchangeType="topic" declareExchange="true" durable="true" autoDelete="false"
applicationId="myAppId" routingKeyPattern="%X{applicationId}.%c.%p"
contentType="text/plain" contentEncoding="UTF-8" generateId="true" deliveryMode="NON_PERSISTENT"
charset="UTF-8"
senderPoolSize="3" maxSenderRetries="5"
addMdcAsHeaders="false">
</RabbitMQ>
</Appenders>


从版本 1.6.10 和 1.7.3 开始,默认情况下,log4j2 追加器将消息发布到调用线程上的 RabbitMQ。 这是因为默认情况下,Log4j 2 不会创建线程安全事件。 如果代理关闭,则 用于重试,重试之间没有延迟。 如果要恢复以前在单独的线程上发布消息的行为 (),可以将该属性设置为 。 但是,您还需要将 Log4j 2 配置为使用 而不是 . 一种方法是 设置系统属性 . 如果将异步发布与 一起使用,则事件很有可能由于串扰而损坏。​​maxSenderRetries​​​​senderPoolSize​​​​async​​​​true​​​​DefaultLogEventFactory​​​​ReusableLogEventFactory​​​​-Dlog4j2.enable.threadlocals=false​​​​ReusableLogEventFactory​


4.3.3. 登录追加器

以下示例演示如何配置回日志追加程序:

<appender name="AMQP" class="org.springframework.amqp.rabbit.logback.AmqpAppender">
<layout>
<pattern><![CDATA[ %d %p %t [%c] - <%m>%n ]]></pattern>
</layout>
<addresses>foo:5672,bar:5672</addresses>
<abbreviation>36</abbreviation>
<includeCallerData>false</includeCallerData>
<applicationId>myApplication</applicationId>
<routingKeyPattern>%property{applicationId}.%c.%p</routingKeyPattern>
<generateId>true</generateId>
<charset>UTF-8</charset>
<durable>false</durable>
<deliveryMode>NON_PERSISTENT</deliveryMode>
<declareExchange>true</declareExchange>
<addMdcAsHeaders>false</addMdcAsHeaders>
</appender>

从版本 1.7.1 开始,Logback 提供了一个选项,默认情况下。 提取调用方数据可能相当昂贵,因为日志事件必须创建一个可抛出对象并检查它以确定调用位置。 因此,默认情况下,将事件添加到事件队列时,不会提取与事件关联的调用方数据。 通过将属性设置为 ,可以将追加程序配置为包含调用方数据。​​AmqpAppender​​​​includeCallerData​​​​false​​​​includeCallerData​​​​true​

从版本 2.0.0 开始,Logback 支持带有该选项的 Logback 编码器。 和选项是互斥的。​​AmqpAppender​​​​encoder​​​​encoder​​​​layout​

4.3.4. 自定义消息

默认情况下,AMQP 追加程序填充以下消息属性:

  • ​deliveryMode​
  • 内容类型
  • ​contentEncoding​​,如果已配置
  • ​messageId​​,如果已配置generateId
  • ​timestamp​​日志事件
  • ​appId​​,如果配置了应用程序 ID

此外,它们使用以下值填充标头:

  • ​categoryName​​日志事件
  • 日志事件的级别
  • ​thread​​:发生日志事件的线程的名称
  • 日志事件调用的堆栈跟踪的位置
  • 所有 MDC 属性的副本(除非设置为addMdcAsHeadersfalse)

每个追加器都可以进行子类化,以便您在发布之前修改消息。 以下示例演示如何自定义日志消息:

public class MyEnhancedAppender extends AmqpAppender {

@Override
public Message postProcessMessageBeforeSend(Message message, Event event) {
message.getMessageProperties().setHeader("foo", "bar");
return message;
}

}

从 2.2.4 开始,log4j2 可以使用和扩展进行扩展​​AmqpAppender​​​​@PluginBuilderFactory​​​​AmqpAppender.Builder​

@Plugin(name = "MyEnhancedAppender", category = "Core", elementType = "appender", printObject = true)
public class MyEnhancedAppender extends AmqpAppender {

public MyEnhancedAppender(String name, Filter filter, Layout<? extends Serializable> layout,
boolean ignoreExceptions, AmqpManager manager, BlockingQueue<Event> eventQueue, String foo, String bar) {
super(name, filter, layout, ignoreExceptions, manager, eventQueue);

@Override
public Message postProcessMessageBeforeSend(Message message, Event event) {
message.getMessageProperties().setHeader("foo", "bar");
return message;
}

@PluginBuilderFactory
public static Builder newBuilder() {
return new Builder();
}

protected static class Builder extends AmqpAppender.Builder {

@Override
protected AmqpAppender buildInstance(String name, Filter filter, Layout<? extends Serializable> layout,
boolean ignoreExceptions, AmqpManager manager, BlockingQueue<Event> eventQueue) {
return new MyEnhancedAppender(name, filter, layout, ignoreExceptions, manager, eventQueue);
}
}

}

4.3.5. 自定义客户端属性

可以通过添加字符串属性或更复杂的属性来添加自定义客户端属性。

简单字符串属性

每个追加器都支持将客户端属性添加到 RabbitMQ 连接。

下面的示例演示如何为日志添加自定义客户端属性:

<appender name="AMQP" ...>
...
<clientConnectionProperties>thing1:thing2,cat:hat</clientConnectionProperties>
...
</appender>

例 5.日志4j2

<Appenders>
...
<RabbitMQ name="rabbitmq"
...
clientConnectionProperties="thing1:thing2,cat:hat"
...
</RabbitMQ>
</Appenders>

The properties are a comma-delimited list of pairs. Keys and values cannot contain commas or colons.​​key:value​

These properties appear on the RabbitMQ Admin UI when the connection is viewed.

Advanced Technique for Logback

You can subclass the Logback appender. Doing so lets you modify the client connection properties before the connection is established. The following example shows how to do so:

public class MyEnhancedAppender extends AmqpAppender {

private String thing1;

@Override
protected void updateConnectionClientProperties(Map<String, Object> clientProperties) {
clientProperties.put("thing1", this.thing1);
}

public void setThing1(String thing1) {
this.thing1 = thing1;
}

}

然后,您可以添加到回.xml。​​<thing1>thing2</thing1>​

对于 String 属性(如前面示例中所示的属性),可以使用前面的技术。 子类允许添加更丰富的属性(例如添加或数字属性)。​​Map​

4.3.6. 提供自定义队列实现

使用 a 将日志记录事件异步发布到 RabbitMQ。 默认情况下,使用 a。 但是,您可以提供任何类型的自定义实现。​​AmqpAppenders​​​​BlockingQueue​​​​LinkedBlockingQueue​​​​BlockingQueue​

以下示例演示如何对 Logback 执行此操作:

public class MyEnhancedAppender extends AmqpAppender {

@Override
protected BlockingQueue<Event> createEventQueue() {
return new ArrayBlockingQueue();
}

}

Log4j 2 追加器支持使用 BlockingQueueFactory,如以下示例所示:

<Appenders>
...
<RabbitMQ name="rabbitmq"
bufferSize="10" ... >
<ArrayBlockingQueue/>
</RabbitMQ>
</Appenders>

4.4. 示例应用程序

Spring AMQP Samples项目包括两个示例应用程序。 第一个是一个简单的“Hello World”示例,演示同步和异步消息接收。 它为了解基本组件提供了一个极好的起点。 第二个示例基于股票交易用例,用于演示实际应用程序中常见的交互类型。 在本章中,我们将快速浏览每个示例,以便您可以专注于最重要的组件。 这些示例都是基于 Maven 的,因此您应该能够将它们直接导入到任何 Maven 感知的 IDE(例如 SpringSource Tool Suite)中。

4.4.1. “你好世界”示例

“Hello World”示例演示了同步和异步消息接收。 可以将示例导入 IDE,然后按照下面的讨论进行操作。​​spring-rabbit-helloworld​

同步示例

在目录中,导航到包。 打开类并注意它包含类级别的注释,并注意方法级别的一些注释。 这是Spring基于Java的配置的一个例子。 您可以​​在此处​​阅读更多相关信息。​​src/main/java​​​​org.springframework.amqp.helloworld​​​​HelloWorldConfiguration​​​​@Configuration​​​​@Bean​

以下清单显示了如何创建连接工厂:

@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}

该配置还包含 的实例,默认情况下,该实例查找交换、队列或绑定类型的任何 bean,然后在代理上声明它们。 实际上,生成的 bean 就是一个例子,因为它是 的实例。​​RabbitAdmin​​​​helloWorldQueue​​​​HelloWorldConfiguration​​​​Queue​

下面的清单显示了 Bean 定义:​​helloWorldQueue​

@Bean
public Queue helloWorldQueue() {
return new Queue(this.helloWorldQueueName);
}

回顾一下 Bean 配置,您可以看到它的名称 set 作为其属性(用于接收消息)和其属性(用于发送消息)。​​rabbitTemplate​​​​helloWorldQueue​​​​queue​​​​routingKey​

现在我们已经探索了配置,我们可以查看实际使用这些组件的代码。 首先,从同一包中打开类。 它包含一个创建弹簧的方法。​​Producer​​​​main()​​​​ApplicationContext​

以下清单显示了该方法:​​main​

public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
amqpTemplate.convertAndSend("Hello World");
System.out.println("Sent: Hello World");
}

在前面的示例中,检索 Bean 并用于发送 . 由于客户端代码应尽可能依赖于接口,因此类型是 而不是 。 即使 中创建的 bean 是 的实例,依赖接口意味着此代码更具可移植性(您可以独立于代码更改配置)。 由于调用了该方法,因此模板将委托给其实例。 在这种情况下,它使用缺省值 ,但可以向 Bean 提供不同的实现,如 中所定义。​​AmqpTemplate​​​​Message​​​​AmqpTemplate​​​​RabbitTemplate​​​​HelloWorldConfiguration​​​​RabbitTemplate​​​​convertAndSend()​​​​MessageConverter​​​​SimpleMessageConverter​​​​rabbitTemplate​​​​HelloWorldConfiguration​

现在打开课程。 它实际上共享相同的配置基类,这意味着它共享 Bean。 这就是为什么我们为该模板配置了 a(用于发送)和 a(用于接收)。 正如我们在 AmqpTemplate 中所描述的,您可以将 'routetingKey' 参数传递给 send 方法,将 'queue' 参数传递给 receive 方法。 代码基本上是生产者的镜像,调用而不是 。​​Consumer​​​​rabbitTemplate​​​​routingKey​​​​queue​​​​Consumer​​​​receiveAndConvert()​​​​convertAndSend()​

下面的清单显示了 的主要方法:​​Consumer​

public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}

如果先运行 ,然后运行 ,则应在控制台输出中看到。​​Producer​​​​Consumer​​​​Received: Hello World​

异步示例

同步示例演练了同步 Hello World 示例。 本节介绍一个稍微高级但功能更强大的选项。 通过一些修改,Hello World 示例可以提供异步接收的示例,也称为消息驱动的 POJO。 实际上,有一个子包可以提供: .​​org.springframework.amqp.samples.helloworld.async​

同样,我们从发送方开始。 打开类并注意到它创建了一个 和 bean。 这一次,由于配置专用于消息发送端,我们甚至不需要任何队列定义,并且只设置了“routetingKey”属性。 回想一下,消息被发送到交易所,而不是直接发送到队列。 AMQP 默认交换是没有名称的直接交换。 所有队列都绑定到该默认交换,其名称作为路由密钥。 这就是为什么我们只需要在这里提供路由密钥。​​ProducerConfiguration​​​​connectionFactory​​​​rabbitTemplate​​​​RabbitTemplate​

以下清单显示了定义:​​rabbitTemplate​

public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(this.helloWorldQueueName);
return template;
}

由于此示例演示了异步消息接收,因此生产端被设计为连续发送消息(如果它是像同步版本这样的每次执行消息模型,那么它实际上是一个消息驱动的使用者就不会那么明显)。 负责持续发送消息的组件定义为 中的内部类。 它配置为每三秒运行一次。​​ProducerConfiguration​

以下清单显示了该组件:

static class ScheduledProducer {

@Autowired
private volatile RabbitTemplate rabbitTemplate;

private final AtomicInteger counter = new AtomicInteger();

@Scheduled(fixedRate = 3000)
public void sendMessage() {
rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
}
}

您不需要了解所有细节,因为真正的重点应该放在接收方(我们将在下面介绍)。 但是,如果您还不熟悉 Spring 任务调度支持,可以在此处了解更多信息。 简短的故事是,中的 bean 使用调度程序注册任务。​​postProcessor​​​​ProducerConfiguration​

现在我们可以转向接收方。 为了强调消息驱动的 POJO 行为,我们从对消息做出反应的组件开始。 该类被调用并显示在以下清单中:​​HelloWorldHandler​

public class HelloWorldHandler {

public void handleMessage(String text) {
System.out.println("Received: " + text);
}

}

那个班级是一个POJO。 它不扩展任何基类,不实现任何接口,甚至不包含任何导入。 它正在被Spring AMQP“适应”到接口。 然后,可以在 上配置该适配器。 对于此示例,将在类中创建容器。 您可以在那里看到包裹在适配器中的 POJO。​​MessageListener​​​​MessageListenerAdapter​​​​SimpleMessageListenerContainer​​​​ConsumerConfiguration​

以下清单显示了如何定义 :​​listenerContainer​

@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueName(this.helloWorldQueueName);
container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
return container;
}

这是一个 Spring 生命周期组件,默认情况下会自动启动。 如果您查看该类,您会发现它的方法只包含一个单行引导程序来创建 . 生产者的方法也是单行引导程序,因为其方法被注释的组件也会自动启动。 您可以按任意顺序启动 和,并且您应该看到每三秒发送和接收一次消息。​​SimpleMessageListenerContainer​​​​Consumer​​​​main()​​​​ApplicationContext​​​​main()​​​​@Scheduled​​​​Producer​​​​Consumer​

4.4.2. 股票交易

股票交易示例演示了比 Hello World 示例更高级的消息传递方案。 但是,配置非常相似,如果涉及更多的话。 由于我们详细介绍了 Hello World 配置,因此在这里,我们将重点介绍此示例的不同之处。 有一个服务器将市场数据(股票报价)推送到主题交易所。 然后,客户端可以通过绑定具有路由模式的队列来订阅市场数据馈送(例如,)。 此演示的另一个主要功能是由客户端发起并由服务器处理的请求-回复“股票交易”交互。 这涉及客户端在订单请求消息本身中发送的专用队列。​​app.stock.quotes.nasdaq.*​​​​replyTo​

服务器的核心配置位于包中的类中。 它扩展了 . 这是定义服务器和客户端通用资源的地方,包括市场数据主题交换(其名称为“app.stock.marketdata”)和服务器为股票交易公开的队列(其名称为“app.stock.request”)。 在该通用配置文件中,您还会看到 上配置了 。​​RabbitServerConfiguration​​​​org.springframework.amqp.rabbit.stocks.config.server​​​​AbstractStockAppRabbitConfiguration​​​​Jackson2JsonMessageConverter​​​​RabbitTemplate​

特定于服务器的配置由两部分组成。 首先,它在 上配置市场数据交换,这样它就不需要在每次调用发送 . 它在基配置类中定义的抽象回调方法中执行此操作。 下面的清单显示了该方法:​​RabbitTemplate​​​​Message​

public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}

其次,声明股票请求队列。 在这种情况下,它不需要任何显式绑定,因为它绑定到默认的无名称交换,并以其自己的名称作为路由密钥。 如前所述,AMQP 规范定义了该行为。 下面的清单显示了 Bean 的定义:​​stockRequestQueue​

@Bean
public Queue stockRequestQueue() {
return new Queue(STOCK_REQUEST_QUEUE_NAME);
}

现在您已经看到了服务器的 AMQP 资源的配置,请导航到目录下的包。 在那里,您可以看到提供方法的实际类。 它创建一个基于配置文件。 在那里,您可以看到发布虚拟市场数据的计划任务。 该配置依赖于 Spring 的命名空间支持。 引导配置文件还会导入一些其他文件。 最有趣的是 ,它直接在 . 在那里,您可以看到负责处理股票交易请求的 bean。 最后,看看 中定义的 bean(它也在 'src/main/resources' 中)。 该 bean 是该类的一个实例,是消息驱动的 POJO 的一个很好的例子,它也可以发送回复消息。 请注意,它本身并不与框架或任何 AMQP 概念耦合。 它接受 a 并返回 . 下面的清单显示了该方法的定义:​​org.springframework.amqp.rabbit.stocks​​​​src/test/java​​​​Server​​​​main()​​​​ApplicationContext​​​​server-bootstrap.xml​​​​task​​​​server-messaging.xml​​​​src/main/resources​​​​messageListenerContainer​​​​serverHandler​​​​server-handlers.xml​​​​ServerHandler​​​​TradeRequest​​​​TradeResponse​​​​handleMessage​

public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}

现在我们已经了解了服务器最重要的配置和代码,我们可以转向客户端了。 最好的起点可能是 包中的 。 请注意,它声明了两个队列,但没有提供显式名称。 下面的清单显示了两个队列的 Bean 定义:​​RabbitClientConfiguration​​​​org.springframework.amqp.rabbit.stocks.config.client​

@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}

@Bean
public Queue traderJoeQueue() {
return amqpAdmin().declareQueue();
}

这些是专用队列,并且自动生成唯一名称。 客户端使用第一个生成的队列绑定到服务器已公开的市场数据交换。 回想一下,在AMQP中,消费者与队列交互,而生产者与交易所交互。 队列与交易所的“绑定”是告诉代理将消息从给定交易所传递(或路由)到队列的原因。 由于市场数据交换是主题交换,因此可以使用路由模式表示绑定。 使用对象执行此操作,并且该对象是使用流畅的 API 生成的。 以下清单显示了:​​RabbitClientConfiguration​​​​Binding​​​​BindingBuilder​​​​Binding​

@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;

@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}

Notice that the actual value has been externalized in a properties file ( under ), and that we use Spring’s annotation to inject that value. This is generally a good idea. Otherwise, the value would have been hardcoded in a class and unmodifiable without recompilation. In this case, it is much easier to run multiple versions of the client while making changes to the routing pattern used for binding. We can try that now.​​client.properties​​​​src/main/resources​​​​@Value​

Start by running and then . You should see dummy quotations for stocks, because the current value associated with the 'stocks.quote.pattern' key in client.properties is 'app.stock.quotes.nasdaq.'. Now, while keeping the existing ​​Server​​​ and ​​Client​​ running, change that property value to 'app.stock.quotes.nyse.' and start a second instance. You should see that the first client still receives NASDAQ quotes while the second client receives NYSE quotes. You could instead change the pattern to get all stocks or even an individual ticker.​​org.springframework.amqp.rabbit.stocks.Server​​​​org.springframework.amqp.rabbit.stocks.Client​​​​NASDAQ​​​​Client​

The final feature we explore is the request-reply interaction from the client’s perspective. Recall that we have already seen the that accepts objects and returns objects. The corresponding code on the side is in the package. It delegates to the in order to send messages. The following listing shows the method:​​ServerHandler​​​​TradeRequest​​​​TradeResponse​​​​Client​​​​RabbitStockServiceGateway​​​​org.springframework.amqp.rabbit.stocks.gateway​​​​RabbitTemplate​​​​send​

public void send(TradeRequest tradeRequest) {
getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
try {
message.getMessageProperties().setCorrelationId(
UUID.randomUUID().toString().getBytes("UTF-8"));
}
catch (UnsupportedEncodingException e) {
throw new AmqpException(e);
}
return message;
}
});
}

Notice that, prior to sending the message, it sets the address. It provides the queue that was generated by the bean definition (shown earlier). The following listing shows the definition for the class itself:​​replyTo​​​​traderJoeQueue​​​​@Bean​​​​StockServiceGateway​

@Bean
public StockServiceGateway stockServiceGateway() {
RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
gateway.setRabbitTemplate(rabbitTemplate());
gateway.setDefaultReplyToQueue(traderJoeQueue());
return gateway;
}

如果不再运行服务器和客户端,请立即启动它们。 尝试发送格式为“100 TCKR”的请求。 在模拟请求“处理”的短暂人为延迟后,您应该会在客户端上看到一条确认消息。

4.4.3. 从非 Spring 应用程序接收 JSON

Spring 应用程序在发送 JSON 时,将标头设置为完全限定的类名,以帮助接收应用程序将 JSON 转换回 Java 对象。​TypeId

该示例探讨了从非 Spring 应用程序转换 JSON 的几种技术。​​spring-rabbit-json​

另请参阅Jackson2JsonMessageConverter以及DefaultClassMapper的Javadoc。

4.5. 测试支持

为异步应用程序编写集成必然比测试更简单的应用程序更复杂。 当诸如注释之类的抽象进入图片时,这会变得更加复杂。 问题是如何验证在发送消息后,侦听器是否按预期收到了消息。​​@RabbitListener​

框架本身有许多单元测试和集成测试。 有些使用模拟,而另一些则使用实时 RabbitMQ 代理的集成测试。 可以查阅这些测试,了解有关测试方案的一些想法。

Spring AMQP 版本 1.6 引入了 jar,它为测试其中一些更复杂的场景提供了支持。 预计该项目将随着时间的推移而扩展,但我们需要社区反馈,以便为帮助测试所需的功能提出建议。 请使用 ​​JIRA​​ 或 GitHub 问题来提供此类反馈。​​spring-rabbit-test​

4.5.1. @SpringRabbitTest

使用此注释将基础结构 bean 添加到 Spring 测试中。 这在使用时不是必需的,例如,因为Spring Boot的自动配置将添加bean。​​ApplicationContext​​​​@SpringBootTest​

注册的豆类是:

  • ​CachingConnectionFactory​​ (autoConnectionFactory).如果存在,则使用其连接工厂。@RabbitEnabled
  • ​RabbitTemplate​​ (autoRabbitTemplate)
  • ​RabbitAdmin​​ (autoRabbitAdmin)
  • ​RabbitListenerContainerFactory​​ (autoContainerFactory)

此外,还添加了与(支持)相关的 bean。​​@EnableRabbit​​​​@RabbitListener​

例 6.Junit5 示例

@SpringJunitConfig
@SpringRabbitTest
public class MyRabbitTests {

@Autowired
private RabbitTemplate template;

@Autowired
private RabbitAdmin admin;

@Autowired
private RabbitListenerEndpointRegistry registry;

@Test
void test() {
...
}

@Configuration
public static class Config {

...

}

}

使用 JUnit4,替换为 。​​@SpringJunitConfig​​​​@RunWith(SpringRunnner.class)​

4.5.2. 模拟实现​​Answer<?>​

目前有两种实现可以帮助进行测试。​​Answer<?>​

第一个 , 提供返回并倒计时闩锁。 以下示例演示如何使用:​​LatchCountDownAndCallRealMethodAnswer​​​​Answer<Void>​​​​null​​​​LatchCountDownAndCallRealMethodAnswer​

LatchCountDownAndCallRealMethodAnswer answer = this.harness.getLatchAnswerFor("myListener", 2);
doAnswer(answer)
.when(listener).foo(anyString(), anyString());

...

assertThat(answer.await(10)).isTrue();

第二种,提供了一种机制来选择性地调用真实方法,并提供机会 以根据 和结果(如果有)返回自定义结果。​​LambdaAnswer<T>​​​​InvocationOnMock​

考虑以下 POJO:

public class Thing {

public String thing(String thing) {
return thing.toUpperCase();
}

}

以下类测试 POJO:​​Thing​

Thing thing = spy(new Thing());

doAnswer(new LambdaAnswer<String>(true, (i, r) -> r + r))
.when(thing).thing(anyString());
assertEquals("THINGTHING", thing.thing("thing"));

doAnswer(new LambdaAnswer<String>(true, (i, r) -> r + i.getArguments()[0]))
.when(thing).thing(anyString());
assertEquals("THINGthing", thing.thing("thing"));

doAnswer(new LambdaAnswer<String>(false, (i, r) ->
"" + i.getArguments()[0] + i.getArguments()[0])).when(thing).thing(anyString());
assertEquals("thingthing", thing.thing("thing"));

从版本 2.2.3 开始,答案将捕获所测试方法引发的任何异常。 用于获取对它们的引用。​​answer.getExceptions()​

当与​​@RabbitListenerTest和RabbitListenerTestHarness​​结合使用时,用于为听众获得正确构造的答案。​​harness.getLambdaAnswerFor("listenerId", true, …)​

4.5.3. 和​​@RabbitListenerTest​​​​RabbitListenerTestHarness​

用 注释其中一个类会导致框架替换 带有一个名为 (它还允许通过 进行检测)的子类的标准。​​@Configuration​​​​@RabbitListenerTest​​​​RabbitListenerAnnotationBeanPostProcessor​​​​RabbitListenerTestHarness​​​​@RabbitListener​​​​@EnableRabbit​

通过两种方式增强听众。 首先,它将侦听器包装在 中,启用正常的存根和验证操作。 它还可以为侦听器添加 ,从而允许访问参数、结果和引发的任何异常。 您可以使用 上的属性控制启用其中的哪一个(或两者)。 后者用于访问有关调用的较低级别数据。 它还支持在调用异步侦听器之前阻止测试线程。​​RabbitListenerTestHarness​​​​Mockito Spy​​​​Mockito​​​​Advice​​​​@RabbitListenerTest​


​final​​​ ​​@RabbitListener​​​方法不能被监视或建议。 此外,只能监视或建议具有属性的侦听器。​​id​

考虑一些例子。

以下示例使用 spy:

@Configuration
@RabbitListenerTest
public class Config {

@Bean
public Listener listener() {
return new Listener();
}

...

}

public class Listener {

@RabbitListener(id="foo", queues="#{queue1.name}")
public String foo(String foo) {
return foo.toUpperCase();
}

@RabbitListener(id="bar", queues="#{queue2.name}")
public void foo(@Payload String foo, @Header("amqp_receivedRoutingKey") String rk) {
...
}

}

public class MyTests {

@Autowired
private RabbitListenerTestHarness harness;

@Test
public void testTwoWay() throws Exception {
assertEquals("FOO", this.rabbitTemplate.convertSendAndReceive(this.queue1.getName(), "foo"));

Listener listener = this.harness.getSpy("foo");
assertNotNull(listener);
verify(listener).foo("foo");
}

@Test
public void testOneWay() throws Exception {
Listener listener = this.harness.getSpy("bar");
assertNotNull(listener);

LatchCountDownAndCallRealMethodAnswer answer = this.harness.getLatchAnswerFor("bar", 2);
doAnswer(answer).when(listener).foo(anyString(), anyString());

this.rabbitTemplate.convertAndSend(this.queue2.getName(), "bar");
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "baz");

assertTrue(answer.await(10));
verify(listener).foo("bar", this.queue2.getName());
verify(listener).foo("baz", this.queue2.getName());
}

}


将工具注入测试用例,以便我们可以访问间谍。


获取对间谍的引用,以便我们可以验证它是否按预期调用。 由于这是一个发送和接收操作,因此无需挂起测试线程,因为它已经 暂停等待答复。​​RabbitTemplate​


在这种情况下,我们只使用发送操作,因此我们需要一个锁存器来等待对侦听器的异步调用 在容器线程上。 我们使用 ​​Answer<?>​​​ 实现之一来帮助解决这个问题。 重要提示:由于监听器被监视的方式,使用它来获取正确配置的间谍答案非常重要。​​harness.getLatchAnswerFor()​


配置间谍以调用 .​​Answer​

以下示例使用捕获建议:

@Configuration
@ComponentScan
@RabbitListenerTest(spy = false, capture = true)
public class Config {

}

@Service
public class Listener {

private boolean failed;

@RabbitListener(id="foo", queues="#{queue1.name}")
public String foo(String foo) {
return foo.toUpperCase();
}

@RabbitListener(id="bar", queues="#{queue2.name}")
public void foo(@Payload String foo, @Header("amqp_receivedRoutingKey") String rk) {
if (!failed && foo.equals("ex")) {
failed = true;
throw new RuntimeException(foo);
}
failed = false;
}

}

public class MyTests {

@Autowired
private RabbitListenerTestHarness harness;

@Test
public void testTwoWay() throws Exception {
assertEquals("FOO", this.rabbitTemplate.convertSendAndReceive(this.queue1.getName(), "foo"));

InvocationData invocationData =
this.harness.getNextInvocationDataFor("foo", 0, TimeUnit.SECONDS);
assertThat(invocationData.getArguments()[0], equalTo("foo"));
assertThat((String) invocationData.getResult(), equalTo("FOO"));
}

@Test
public void testOneWay() throws Exception {
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "bar");
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "baz");
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "ex");

InvocationData invocationData =
this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS);
Object[] args = invocationData.getArguments();
assertThat((String) args[0], equalTo("bar"));
assertThat((String) args[1], equalTo(queue2.getName()));

invocationData = this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS);
args = invocationData.getArguments();
assertThat((String) args[0], equalTo("baz"));

invocationData = this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS);
args = invocationData.getArguments();
assertThat((String) args[0], equalTo("ex"));
assertEquals("ex", invocationData.getThrowable().getMessage());
}

}


将工具注入测试用例,以便我们可以访问间谍。


用于检索调用数据 - 在本例中,因为它是请求/回复 场景 无需等待任何时间,因为测试线程在等待中已挂起 为结果。​​harness.getNextInvocationDataFor()​​​​RabbitTemplate​


然后,我们可以验证参数和结果是否符合预期。


这次我们需要一些时间来等待数据,因为它是容器线程上的异步操作,我们需要 以挂起测试线程。


当侦听器引发异常时,它在调用数据的属性中可用。​​throwable​


当将自定义 s 与工具一起使用时,为了正常运行,此类答案应从工具()中子类并获取实际侦听器(而不是间谍)并调用 . 有关示例,请参阅提供的 ​​Mockito Answer<?> Implementations 源代码​​​。​​Answer<?>​​​​ForwardsInvocation​​​​getDelegate("myListener")​​​​super.answer(invocation)​

4.5.4. 使用​​TestRabbitTemplate​

提供 是为了执行一些基本的集成测试,而无需代理。 当您将其添加为测试用例中的 时,它会发现上下文中的所有侦听器容器,无论是声明为 或 还是使用注释。 它目前仅支持按队列名称路由。 该模板从容器中提取消息侦听器,并直接在测试线程上调用它。 返回答复的侦听器支持请求-答复消息传递(方法)。​​TestRabbitTemplate​​​​@Bean​​​​@Bean​​​​<bean/>​​​​@RabbitListener​​​​sendAndReceive​

以下测试用例使用该模板:

@RunWith(SpringRunner.class)
public class TestRabbitTemplateTests {

@Autowired
private TestRabbitTemplate template;

@Autowired
private Config config;

@Test
public void testSimpleSends() {
this.template.convertAndSend("foo", "hello1");
assertThat(this.config.fooIn, equalTo("foo:hello1"));
this.template.convertAndSend("bar", "hello2");
assertThat(this.config.barIn, equalTo("bar:hello2"));
assertThat(this.config.smlc1In, equalTo("smlc1:"));
this.template.convertAndSend("foo", "hello3");
assertThat(this.config.fooIn, equalTo("foo:hello1"));
this.template.convertAndSend("bar", "hello4");
assertThat(this.config.barIn, equalTo("bar:hello2"));
assertThat(this.config.smlc1In, equalTo("smlc1:hello3hello4"));

this.template.setBroadcast(true);
this.template.convertAndSend("foo", "hello5");
assertThat(this.config.fooIn, equalTo("foo:hello1foo:hello5"));
this.template.convertAndSend("bar", "hello6");
assertThat(this.config.barIn, equalTo("bar:hello2bar:hello6"));
assertThat(this.config.smlc1In, equalTo("smlc1:hello3hello4hello5hello6"));
}

@Test
public void testSendAndReceive() {
assertThat(this.template.convertSendAndReceive("baz", "hello"), equalTo("baz:hello"));
}
@Configuration
@EnableRabbit
public static class Config {

public String fooIn = "";

public String barIn = "";

public String smlc1In = "smlc1:";

@Bean
public TestRabbitTemplate template() throws IOException {
return new TestRabbitTemplate(connectionFactory());
}

@Bean
public ConnectionFactory connectionFactory() throws IOException {
ConnectionFactory factory = mock(ConnectionFactory.class);
Connection connection = mock(Connection.class);
Channel channel = mock(Channel.class);
willReturn(connection).given(factory).createConnection();
willReturn(channel).given(connection).createChannel(anyBoolean());
given(channel.isOpen()).willReturn(true);
return factory;
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() throws IOException {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
return factory;
}

@RabbitListener(queues = "foo")
public void foo(String in) {
this.fooIn += "foo:" + in;
}

@RabbitListener(queues = "bar")
public void bar(String in) {
this.barIn += "bar:" + in;
}

@RabbitListener(queues = "baz")
public String baz(String in) {
return "baz:" + in;
}

@Bean
public SimpleMessageListenerContainer smlc1() throws IOException {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueueNames("foo", "bar");
container.setMessageListener(new MessageListenerAdapter(new Object() {

public void handleMessage(String in) {
smlc1In += in;
}

}));
return container;
}

}

}

4.5.5. JUnit4​​@Rules​

Spring AMQP 版本 1.7 及更高版本提供了一个名为 . 此 jar 包含几个实用程序实例,用于运行 JUnit4 测试。 请参阅 ​​JUnit5​​ 测试的条件。​​spring-rabbit-junit​​​​@Rule​

用​​BrokerRunning​

​BrokerRunning​​提供一种机制,用于在代理未运行时让测试成功(缺省情况下为 on)。​​localhost​

它还具有初始化和清空队列以及删除队列和交换的实用程序方法。

以下示例显示了其用法:

@ClassRule
public static BrokerRunning brokerRunning = BrokerRunning.isRunningWithEmptyQueues("foo", "bar");

@AfterClass
public static void tearDown() {
brokerRunning.removeTestQueues("some.other.queue.too") // removes foo, bar as well
}

有几种静态方法,例如 ,用于验证代理是否启用了管理插件。​​isRunning…​​​​isBrokerAndManagementRunning()​

配置规则

有时,如果没有代理,您希望测试失败,例如夜间 CI 生成。 要在运行时禁用规则,请将名为 的环境变量设置为 。​​RABBITMQ_SERVER_REQUIRED​​​​true​

您可以使用资源库或环境变量覆盖代理属性,例如主机名:

下面的示例演示如何使用资源库重写属性:

@ClassRule
public static BrokerRunning brokerRunning = BrokerRunning.isRunningWithEmptyQueues("foo", "bar");

static {
brokerRunning.setHostName("10.0.0.1")
}

@AfterClass
public static void tearDown() {
brokerRunning.removeTestQueues("some.other.queue.too") // removes foo, bar as well
}

您还可以通过设置以下环境变量来覆盖属性:

public static final String BROKER_ADMIN_URI = "RABBITMQ_TEST_ADMIN_URI";
public static final String BROKER_HOSTNAME = "RABBITMQ_TEST_HOSTNAME";
public static final String BROKER_PORT = "RABBITMQ_TEST_PORT";
public static final String BROKER_USER = "RABBITMQ_TEST_USER";
public static final String BROKER_PW = "RABBITMQ_TEST_PASSWORD";
public static final String BROKER_ADMIN_USER = "RABBITMQ_TEST_ADMIN_USER";
public static final String BROKER_ADMIN_PW = "RABBITMQ_TEST_ADMIN_PASSWORD";

这些环境变量将覆盖默认设置(对于 amqp 和管理 REST API)。​​localhost:5672​​​​localhost:15672/api/​

更改主机名会影响 和 REST API 连接(除非显式设置了管理员 URI)。​​amqp​​​​management​

​BrokerRunning​​还提供了一个名为的方法,该方法允许您传入包含这些变量的映射。 它们覆盖系统环境变量。 如果您希望对多个测试套件中的测试使用不同的配置,这可能很有用。 重要说明:在调用创建规则实例的任何静态方法之前,必须调用该方法。 变量值将应用于在此调用之后创建的所有实例。 调用以重置规则以使用默认值(包括任何实际环境变量)。​​static​​​​setEnvironmentVariableOverrides​​​​isRunning()​​​​clearEnvironmentVariableOverrides()​

在测试用例中,可以在创建连接工厂时使用; 返回规则的 兔子MQ 。 以下示例演示如何执行此操作:​​brokerRunning​​​​getConnectionFactory()​​​​ConnectionFactory​

@Bean
public CachingConnectionFactory rabbitConnectionFactory() {
return new CachingConnectionFactory(brokerRunning.getConnectionFactory());
}
用​​LongRunningIntegrationTest​

​LongRunningIntegrationTest​​是禁用长时间运行的测试的规则。 您可能希望在开发人员系统上使用它,但请确保在夜间 CI 生成等情况下禁用该规则。

以下示例显示了其用法:

@Rule
public LongRunningIntegrationTest longTests = new LongRunningIntegrationTest();

要在运行时禁用规则,请将名为 的环境变量设置为 。​​RUN_LONG_INTEGRATION_TESTS​​​​true​

4.5.6. JUnit5 条件

版本 2.0.2 引入了对 JUnit5 的支持。

使用注释​​@RabbitAvailable​

此类级注释类似于 JUnit4 @Rules 中讨论的内容。 它由 .​​BrokerRunning​​​​@Rule​​​​RabbitAvailableCondition​

批注具有三个属性:

  • ​queues​​:在每次测试之前声明(和清除)并在所有测试完成后删除的队列数组。
  • ​management​​:如果您的测试还需要在代理上安装管理插件,请将此项设置为此选项。true
  • ​purgeAfterEach​​:(从版本 2.2 开始)当(默认)时,将在测试之间清除。truequeues

它用于检查代理是否可用,如果没有,则跳过测试。 如配置规则中所述,如果没有代理,名为 if 的环境变量会导致测试快速失败。 您可以使用环境变量配置条件,如配置规则中所述。​​RABBITMQ_SERVER_REQUIRED​​​​true​

此外,还支持参数化测试构造函数和方法的参数解析。 支持两种参数类型:​​RabbitAvailableCondition​

  • ​BrokerRunningSupport​​:实例(在 2.2 之前,这是一个 JUnit 4 实例)BrokerRunning
  • ​ConnectionFactory​​:实例的 RabbitMQ 连接工厂BrokerRunningSupport

以下示例显示了两者:

@RabbitAvailable(queues = "rabbitAvailableTests.queue")
public class RabbitAvailableCTORInjectionTests {

private final ConnectionFactory connectionFactory;

public RabbitAvailableCTORInjectionTests(BrokerRunningSupport brokerRunning) {
this.connectionFactory = brokerRunning.getConnectionFactory();
}

@Test
public void test(ConnectionFactory cf) throws Exception {
assertSame(cf, this.connectionFactory);
Connection conn = this.connectionFactory.newConnection();
Channel channel = conn.createChannel();
DeclareOk declareOk = channel.queueDeclarePassive("rabbitAvailableTests.queue");
assertEquals(0, declareOk.getConsumerCount());
channel.close();
conn.close();
}

}

前面的测试在框架本身中,用于验证参数注入以及条件是否正确创建了队列。

实际用户测试可能如下所示:

@RabbitAvailable(queues = "rabbitAvailableTests.queue")
public class RabbitAvailableCTORInjectionTests {

private final CachingConnectionFactory connectionFactory;

public RabbitAvailableCTORInjectionTests(BrokerRunningSupport brokerRunning) {
this.connectionFactory =
new CachingConnectionFactory(brokerRunning.getConnectionFactory());
}

@Test
public void test() throws Exception {
RabbitTemplate template = new RabbitTemplate(this.connectionFactory);
...
}
}

在测试类中使用 Spring 注释应用程序上下文时,可以通过名为 的静态方法获取对条件连接工厂的引用。​​RabbitAvailableCondition.getBrokerRunning()​

从版本 2.2 开始,返回一个对象;以前,返回 JUnit 4 实例。 新类具有与 相同的 API。​​getBrokerRunning()​​​​BrokerRunningSupport​​​​BrokerRunnning​​​​BrokerRunning​

以下测试来自框架并演示了用法:

@RabbitAvailable(queues = {
RabbitTemplateMPPIntegrationTests.QUEUE,
RabbitTemplateMPPIntegrationTests.REPLIES })
@SpringJUnitConfig
@DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
public class RabbitTemplateMPPIntegrationTests {

public static final String QUEUE = "mpp.tests";

public static final String REPLIES = "mpp.tests.replies";

@Autowired
private RabbitTemplate template;

@Autowired
private Config config;

@Test
public void test() {

...

}

@Configuration
@EnableRabbit
public static class Config {

@Bean
public CachingConnectionFactory cf() {
return new CachingConnectionFactory(RabbitAvailableCondition
.getBrokerRunning()
.getConnectionFactory());
}

@Bean
public RabbitTemplate template() {

...

}

@Bean
public SimpleRabbitListenerContainerFactory
rabbitListenerContainerFactory() {

...

}

@RabbitListener(queues = QUEUE)
public byte[] foo(byte[] in) {
return in;
}

}

}
使用注释​​@LongRunning​

与 JUnit4 类似,此注释会导致跳过测试,除非环境变量(或系统属性)设置为 。 以下示例演示如何使用它:​​LongRunningIntegrationTest​​​​@Rule​​​​true​

@RabbitAvailable(queues = SimpleMessageListenerContainerLongTests.QUEUE)
@LongRunning
public class SimpleMessageListenerContainerLongTests {

public static final String QUEUE = "SimpleMessageListenerContainerLongTests.queue";

...

}

缺省情况下,变量为 ,但您可以在批注的属性中指定变量名称。​​RUN_LONG_INTEGRATION_TESTS​​​​value​

标签:return,AMQP,项目,Spring,示例,消息,new,public,String
From: https://blog.51cto.com/u_15326439/5951861

相关文章

  • Spring Cloud 应用 Proxyless Mesh 模式探索与实践
    作者:十眠ServiceMesh简介ServiceMesh早已不是一个新兴的概念,目前已经有许多关于ServiceMesh的探索以及实践。2016年可以说是ServiceMesh的元年,Buoyant公司......
  • Spring boot —— 创建parent工程
    方式一<parent>      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-starter-parent</artifactId>      <versio......
  • springboot实现AOP切面编程
    概述AOP(AspectOrientedProgramming)即面向切面编程。面向切面是面向对象中的一种方式而已。在代码执行过程中,动态嵌入其他代码,叫做面向切面编程(将交叉业务逻辑封装成成......
  • Spring boot 入门 ---(一)
    环境JavaSDKv1.6或更高版本。这里使用maven作为构建工具下面是一个典型的pom.xml文件:配置pom.xml文件<?xmlversion="1.0"encoding="utf-8"?><projectxmlns="http://mav......
  • spring boot 整合 FastDFS_Client
    FastDFS-Client使用方式1.在项目Pom当中加入依赖Maven依赖为<dependency><groupId>com.github.tobato</groupId><artifactId>fastdfs-client</artifactId><vers......
  • Spring boot整合 Swagger2 以及遇到的坑
    一、引入依赖:<dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.5.0</version></dependency><dependency><gro......
  • spring boot踩坑日记——idea找不到配置文件
    报错:  原因springboot启动时找不到配置文件可以看出配置文件图标显示不对: 解决:标注配置文件......
  • 亲手实现一个springboot默认配置&起步加载
    实现一、默认配置1、创建springboot项目引入spring-boot-dependencies依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-......
  • 【java】(二) SpringBoot 源码解析——run启动加载过程——准备环境
    1.前言深入学习springboot笔记系列,可能会有错误还请指正,互相勉励,互相学习。上一章讲了SpringApplicaiton是如何初始化的,本章讲解后续的run方法的启动过程。本章涉及......
  • spring boot 环境下使用logback
    一、logback的介绍    Logback是由log4j创始人设计的又一个开源日志组件。logback当前分成三个模块:logback-core,logback-classic和logback-access。logback-core是其......