首页 > 数据库 >Spring Integration对Redis的支持

Spring Integration对Redis的支持

时间:2022-12-13 15:32:26浏览次数:118  
标签:Spring 适配器 Redis Integration 实例 消息 序列化

Spring Integration对Redis的支持_通道适配器

Spring Integration 2.1引入了对Redis​的支持:“一个开源的高级键值存储”。 这种支持以基于 Redis 以及发布-订阅消息传递适配器的形式出现,Redis 通过其 PUBLISH、SUBSCRIBE和 UNSUBSCRIBE​命令支持这些适配器。​​MessageStore​

您需要将此依赖项包含在项目中:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
<version>6.0.0</version>
</dependency>

您还需要包含 Redis 客户端依赖项,例如 Lettuce。

要下载、安装和运行 Redis,请参阅 Redis 文档。

连接到 Redis

要开始与 Redis 交互,您首先需要连接到它。 Spring Integration 使用另一个 Spring 项目 Spring Data Redis 提供的支持,该项目提供了典型的 Spring 构造:和 。 这些抽象简化了与多个 Redis 客户端 Java API 的集成。 目前,Spring Data Redis 支持 Jedis 和 Lettuce。​​ConnectionFactory​​​​Template​

用​​RedisConnectionFactory​

要连接到 Redis,您可以使用接口的实现之一。 以下清单显示了接口定义:​​RedisConnectionFactory​

public interface RedisConnectionFactory extends PersistenceExceptionTranslator {

/**
* Provides a suitable connection for interacting with Redis.
* @return connection for interacting with Redis.
*/
RedisConnection getConnection();
}

以下示例显示了如何在 Java 中创建:​​LettuceConnectionFactory​

LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet();

下面的示例展示了如何在 Spring 的 XML 配置中创建:​​LettuceConnectionFactory​

<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>

的实现提供了一组属性,例如端口和主机,您可以根据需要设置这些属性。 拥有 的实例后,可以创建 的实例并将其注入 .​​RedisConnectionFactory​​​​RedisConnectionFactory​​​​RedisTemplate​​​​RedisConnectionFactory​

用​​RedisTemplate​

与 Spring 中的其他模板类(例如 和 )一样,它是一个简化 Redis 数据访问代码的辅助类。 有关及其变体的更多信息(例如),请参阅 Spring Data Redis 文档。​​JdbcTemplate​​​​JmsTemplate​​​​RedisTemplate​​​​RedisTemplate​​​​StringRedisTemplate​

以下示例演示如何在 Java 中创建 的实例:​​RedisTemplate​

RedisTemplate rt = new RedisTemplate<String, Object>();
rt.setConnectionFactory(redisConnectionFactory);

下面的示例展示了如何在 Spring 的 XML 配置中创建 的实例:​​RedisTemplate​

<bean id="redisTemplate"
class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>

使用 Redis 发送消息

如简介中所述,Redis 通过其 、 和命令提供对发布-订阅消息传递的支持。 与JMS和AMQP一样,Spring Integration提供了消息通道和适配器,用于通过Redis发送和接收消息。​​PUBLISH​​​​SUBSCRIBE​​​​UNSUBSCRIBE​

瑞迪斯发布/订阅频道

与 JMS 类似,在某些情况下,生产者和使用者都打算成为同一应用程序的一部分,在同一进程中运行。 可以使用一对入站和出站通道适配器来实现此目的。 但是,与Spring Integration的JMS支持一样,有一种更简单的方法来解决这个用例。 您可以创建发布-订阅通道,如以下示例所示:

<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>

A 的行为很像 Spring 集成主命名空间中的普通元素。 它可以由任何终结点的 和属性引用。 不同之处在于,此通道由 Redis 主题名称支持:属性指定的值。 但是,与 JMS 不同的是,本主题不必提前创建,甚至不必由 Redis 自动创建。 在 Redis 中,主题是扮演地址角色的简单值。 生成者和使用者可以使用与其主题名称相同的值进行通信。 对此通道的简单订阅意味着可以在生产和消费终结点之间实现异步发布-订阅消息传递。 但是,与通过在简单的 Spring 集成元素中添加元素创建的异步消息通道不同,消息不会存储在内存中队列中。 相反,这些消息通过 Redis 传递,这使您可以依赖它对持久性和集群的支持以及与其他非 Java 平台的互操作性。​​publish-subscribe-channel​​​​<publish-subscribe-channel/>​​​​input-channel​​​​output-channel​​​​String​​​​topic-name​​​​String​​​​String​​​​<queue/>​​​​<channel/>​

Redis 入站通道适配器

Redis 入站通道适配器 () 以与其他入站适配器相同的方式将传入的 Redis 消息调整为 Spring 消息。 它接收特定于平台的消息(在本例中为 Redis),并使用策略将它们转换为 Spring 消息。 以下示例演示如何配置 Redis 入站通道适配器:​​RedisInboundChannelAdapter​​​​MessageConverter​

<int-redis:inbound-channel-adapter id="redisAdapter"
topics="thing1, thing2"
channel="receiveChannel"
error-channel="testErrorChannel"
message-converter="testConverter" />

<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>

<bean id="testConverter" class="things.something.SampleMessageConverter" />

前面的示例显示了 Redis 入站通道适配器的简单但完整的配置。 请注意,前面的配置依赖于熟悉的 Spring 范式,即自动发现某些 bean。 在这种情况下,将隐式注入适配器。 可以改用属性显式指定它。​​redisConnectionFactory​​​​connection-factory​

另请注意,上述配置为适配器注入了自定义 . 该方法类似于 JMS,其中实例用于在 Redis 消息和 Spring 集成消息有效负载之间进行转换。 默认值为 .​​MessageConverter​​​​MessageConverter​​​​SimpleMessageConverter​

入站适配器可以订阅多个主题名称,因此属性中的值集以逗号分隔。​​topics​

从 V3.0 开始,除了现有属性之外,入站适配器现在还具有该属性。 此属性包含一组以逗号分隔的 Redis 主题模式。 有关 Redis 发布-订阅的更多信息,请参阅 Redis 发布/订阅。​​topics​​​​topic-patterns​

入站适配器可以使用 对 Redis 消息正文进行反序列化。 的属性可以设置为空字符串,这将生成属性的值。 在这种情况下,Redis 消息的原始正文作为消息有效负载提供。​​RedisSerializer​​​​serializer​​​​<int-redis:inbound-channel-adapter>​​​​null​​​​RedisSerializer​​​​byte[]​

从 V5.0 开始,您可以使用 的属性向入站适配器提供实例。 此外,收到的 Spring 集成消息现在具有指示已发布消息来源的标头:主题或模式。 您可以将此下游用于路由逻辑。​​Executor​​​​task-executor​​​​<int-redis:inbound-channel-adapter>​​​​RedisHeaders.MESSAGE_SOURCE​

Redis 出站通道适配器

Redis 出站通道适配器将传出的 Spring 集成消息调整为 Redis 消息,方式与其他出站适配器相同。 它接收 Spring 集成消息,并使用策略将它们转换为特定于平台的消息(在本例中为 Redis)。 以下示例演示如何配置 Redis 出站通道适配器:​​MessageConverter​

<int-redis:outbound-channel-adapter id="outboundAdapter"
channel="sendChannel"
topic="thing1"
message-converter="testConverter"/>

<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379"/>
</bean>

<bean id="testConverter" class="things.something.SampleMessageConverter" />

该配置与 Redis 入站通道适配器并行。 适配器隐式注入 ,该 定义为其 Bean 名称。 此示例还包括可选(和自定义)(bean)。​​RedisConnectionFactory​​​​redisConnectionFactory​​​​MessageConverter​​​​testConverter​

从 Spring Integration 3.0 开始,提供了该属性的替代方法:您可以使用该属性在运行时确定消息的 Redis 主题。 这些属性是互斥的。​​<int-redis:outbound-channel-adapter>​​​​topic​​​​topic-expression​

Redis 队列入站通道适配器

Spring Integration 3.0引入了一个队列入站通道适配器,用于从Redis列表中“弹出”消息。 默认情况下,它使用“右弹出”,但您可以将其配置为使用“左弹出”。 适配器是消息驱动的。 它使用内部侦听器线程,不使用轮询器。

以下清单显示了 的所有可用属性:​​queue-inbound-channel-adapter​

<int-redis:queue-inbound-channel-adapter id=""  
channel=""
auto-startup=""
phase=""
connection-factory=""
queue=""
error-channel=""
serializer=""
receive-timeout=""
recovery-interval=""
expect-message=""
task-executor=""
right-pop=""/>

组件 Bean 名称。 如果未提供该属性,则会在应用程序上下文中创建并注册 ,并将此属性作为 Bean 名称。 在这种情况下,端点本身使用 Bean 名称加 . (如果 Bean 名称为 ,则端点注册为 。​​channel​​​​DirectChannel​​​​id​​​​id​​​​.adapter​​​​thing1​​​​thing1.adapter​

要从此终端节点向其发送实例的实例。​​MessageChannel​​​​Message​

一个属性,用于指定此终结点是否应在应用程序上下文启动后自动启动。 默认为 .​​SmartLifecycle​​​​true​

一个属性,用于指定启动此终结点的阶段。 默认为 .​​SmartLifecycle​​​​0​

对 Bean 的引用。 默认为 .​​RedisConnectionFactory​​​​redisConnectionFactory​

执行基于队列的“pop”操作以获取 Redis 消息的 Redis 列表的名称。

从终端节点的侦听任务收到异常时向其发送实例的实例。 默认情况下,基础使用应用程序上下文中的默认值。​​MessageChannel​​​​ErrorMessage​​​​MessagePublishingErrorHandler​​​​errorChannel​

豆引用。 它可以是一个空字符串,这意味着“没有序列化程序”。 在这种情况下,来自入站 Redis 消息的原始消息将作为有效负载发送到 。 默认情况下,它是一个 .​​RedisSerializer​​​​byte[]​​​​channel​​​​Message​​​​JdkSerializationRedisSerializer​

“pop”操作等待队列中的 Redis 消息的超时(以毫秒为单位)。 默认值为 1 秒。

侦听器任务在重新启动侦听器任务之前,在“pop”操作出现异常后应休眠的时间(以毫秒为单位)。

指定此终端节点是否期望 Redis 队列中的数据包含整个实例。 如果此属性设置为 ,则不能为空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。 其默认值为 。​​Message​​​​true​​​​serializer​​​​false​

对 Spring (或标准 JDK 1.5+) bean 的引用。 它用于基础侦听任务。 它默认为 .​​TaskExecutor​​​​Executor​​​​SimpleAsyncTaskExecutor​

指定此端点应使用“右弹出”(当)还是“左弹出”(当)从 Redis 列表中读取消息。 如果 ,则 Redis 列表在与默认 Redis 队列出站通道适配器一起使用时充当队列。 将其设置为与通过“右推”写入列表的软件一起使用,或实现类似堆栈的消息顺序。 其默认值为 。 从版本 4.3 开始。​​true​​​​false​​​​true​​​​FIFO​​​​false​​​​true​

必须配置多个线程进行处理;否则,当 在出现错误后尝试重新启动侦听器任务时,可能会出现死锁。 可用于处理这些错误,以避免重新启动,但最好不要将应用程序暴露在可能的死锁情况下。 有关可能的实现,请参阅 Spring 框架参考手册​。​​task-executor​​​​RedisQueueMessageDrivenEndpoint​​​​errorChannel​​​​TaskExecutor​

Redis 队列出站通道适配器

Spring Integration 3.0引入了一个队列出站通道适配器,用于从Spring Integration消息“推送”到Redis列表。 默认情况下,它使用“左推”,但您可以将其配置为使用“右推”。 以下清单显示了 Redis 的所有可用属性:​​queue-outbound-channel-adapter​

<int-redis:queue-outbound-channel-adapter id=""  
channel=""
connection-factory=""
queue=""
queue-expression=""
serializer=""
extract-payload=""
left-push=""/>


组件 Bean 名称。 如果未提供该属性,则会在应用程序上下文中创建并注册 ,并将此属性作为 Bean 名称。 在这种情况下,端点注册的 Bean 名称为 加 。 (如果 Bean 名称为 ,则端点注册为 。​​channel​​​​DirectChannel​​​​id​​​​id​​​​.adapter​​​​thing1​​​​thing1.adapter​


此终结点从中接收实例的 。​​MessageChannel​​​​Message​


对 Bean 的引用。 默认为 .​​RedisConnectionFactory​​​​redisConnectionFactory​


执行基于队列的“推送”操作以发送 Redis 消息的 Redis 列表的名称。 此属性与 互斥。​​queue-expression​


用于确定 Redis 列表名称的 SpEL。 它使用运行时的传入作为变量。 此属性与 互斥。​​Expression​​​​Message​​​​#root​​​​queue​


豆类引用。 它默认为 . 但是,对于有效负载,如果未提供引用,则使用 a。​​RedisSerializer​​​​JdkSerializationRedisSerializer​​​​String​​​​StringRedisSerializer​​​​serializer​


指定此终端节点是应仅将有效负载发送还是将整个负载发送到 Redis 队列。 默认为 .​​Message​​​​true​


指定此端点应使用“左推”(当)还是“右推”(当)将消息写入 Redis 列表。 如果为 ,则 Redis 列表在与默认 Redis 队列入站通道适配器一起使用时充当队列。 将其设置为与以“左弹出”从列表中读取的软件一起使用,或实现类似堆栈的消息顺序。 默认为 . 从版本 4.3 开始。​​true​​​​false​​​​true​​​​FIFO​​​​false​​​​true​

瑞迪斯应用程序事件

从 Spring Integration 3.0 开始,Redis 模块提供了 的实现,而 又是一个 . 封装了 Redis 操作的异常(端点是事件的“源”)。 例如,在捕获操作中的异常后发出这些事件。 例外可以是任何泛型或 . 使用 处理这些事件对于确定后台 Redis 任务的问题和采取管理操作非常有用。​​IntegrationEvent​​​​org.springframework.context.ApplicationEvent​​​​RedisExceptionEvent​​​​<int-redis:queue-inbound-channel-adapter/>​​​​BoundListOperations.rightPop​​​​org.springframework.data.redis.RedisSystemException​​​​org.springframework.data.redis.RedisConnectionFailureException​​​​<int-event:inbound-channel-adapter/>​

红与红消息存储

企业集成模式 (EIP) 一书中所述,邮件存储允许您保留消息。 当考虑可靠性时,这在处理具有缓冲消息功能的组件(聚合器、重新排序器等)时非常有用。 在 Spring 集成中,该策略还为声明检查模式提供了基础,EIP 中也有描述。​​MessageStore​

Spring 集成的 Redis 模块提供了 . 以下示例演示如何将其与聚合器一起使用:​​RedisMessageStore​

<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="redisMessageStore"/>

前面的示例是一个 Bean 配置,它需要 a 作为构造函数参数。​​RedisConnectionFactory​

缺省情况下,使用 Java 序列化来序列化消息。 但是,如果要使用其他序列化技术(如 JSON),可以通过设置 .​​RedisMessageStore​​​​valueSerializer​​​​RedisMessageStore​

从版本 4.3.10 开始,框架分别为实例和实例提供了 Jackson 序列化程序和解序列化程序实现 — 和 。 必须使用的选项配置它们。 此外,还应设置为每个序列化的复杂对象添加类型信息(如果您信任源)。 然后在反序列化期间使用该类型信息。 该框架提供了一个名为 的实用程序方法,该方法已随前面提到的所有属性和序列化程序一起提供。 此实用程序方法附带一个参数,用于限制 Java 包进行反序列化以避免安全漏洞。 默认的受信任软件包:、、。 若要在 中管理 JSON 序列化,必须以类似于以下示例的方式对其进行配置:​​Message​​​​MessageHeaders​​​​MessageJacksonDeserializer​​​​MessageHeadersJacksonSerializer​​​​SimpleModule​​​​ObjectMapper​​​​enableDefaultTyping​​​​ObjectMapper​​​​JacksonJsonUtils.messagingAwareMapper()​​​​trustedPackages​​​​java.util​​​​java.lang​​​​org.springframework.messaging.support​​​​org.springframework.integration.support​​​​org.springframework.integration.message​​​​org.springframework.integration.store​​​​RedisMessageStore​

RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
store.setValueSerializer(serializer);

从版本 4.3.12 开始,支持允许区分同一 Redis 服务器上的存储实例的选项。​​RedisMessageStore​​​​prefix​

Redis 通道消息存储库

前面显示的内容将每个组维护为单个键(组 ID)下的值。 虽然您可以使用它来支持持久性,但为此目的提供了一个专用的(从版本 4.0 开始)。 此存储在每个通道、发送消息和接收消息时使用 。 默认情况下,此存储也使用 JDK 序列化,但您可以修改值序列化程序,如前所述。​​RedisMessageStore​​​​QueueChannel​​​​RedisChannelMessageStore​​​​LIST​​​​LPUSH​​​​RPOP​

我们建议使用此存储支持通道,而不是使用常规 . 以下示例定义了一个 Redis 消息存储库,并在具有队列的通道中使用它:​​RedisMessageStore​

<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
<int:queue message-store="redisMessageStore"/>
<int:channel>

用于存储数据的键的格式为:(在前面的示例中,)。​​<storeBeanName>:<channelId>​​​​redisMessageStore:somePersistentQueueChannel​

此外,还提供了子类。 当您将其与 一起使用时,将按 (FIFO) 优先级顺序接收消息。 它使用标准标头并支持优先级值 ()。 具有其他优先级的消息(以及没有优先级的消息)将按FIFO顺序在任何具有优先级的消息之后检索。​​RedisChannelPriorityMessageStore​​​​QueueChannel​​​​IntegrationMessageHeaderAccessor.PRIORITY​​​​0 - 9​

这些存储仅实现而不实现 。 它们只能用于支持.​​BasicMessageGroupStore​​​​MessageGroupStore​​​​QueueChannel​

红地元数据存储

Spring Integration 3.0引入了一个新的基于Redis的MetadataStore(参见Metadata Store)实现。 可以使用 来维护跨应用程序重新启动的状态。 您可以将此新实现与适配器一起使用,例如:​​RedisMetadataStore​​​​MetadataStore​​​​MetadataStore​

  • 饲料
  • 文件
  • 邮票
  • 自来水龙

要指示这些适配器使用 new ,请声明一个名为 的 Spring Bean。 源入站通道适配器和源入站通道适配器都自动拾取并使用声明的 . 下面的示例演示如何声明这样的 Bean:​​RedisMetadataStore​​​​metadataStore​​​​RedisMetadataStore​

<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore">
<constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>

由RedisProperties支持。 与它的交互使用 BoundHashOperations,这反过来又需要整个存储区。 在 的情况下,这扮演了一个区域的角色,这在分布式环境中很有用,当多个应用程序使用相同的 Redis 服务器时。 缺省情况下,此值为 。​​RedisMetadataStore​​​​key​​​​Properties​​​​MetadataStore​​​​key​​​​key​​​​MetaData​

从版本 4.0 开始,此存储实现了 ,使其在多个应用程序实例之间可靠地共享,其中只允许一个实例存储或修改键的值。​​ConcurrentMetadataStore​

您不能将 (例如,在 中)与 Redis 集群一起使用,因为当前不支持原子性命令。​​RedisMetadataStore.replace()​​​​AbstractPersistentAcceptOnceFileListFilter​​​​WATCH​

Redis 存储入站通道适配器

Redis 存储入站通道适配器是一个轮询使用者,它从 Redis 集合读取数据并将其作为有效负载发送。 以下示例演示如何配置 Redis 存储入站通道适配器:​​Message​

<int-redis:store-inbound-channel-adapter id="listAdapter"
connection-factory="redisConnectionFactory"
key="myCollection"
channel="redisChannel"
collection-type="LIST" >
<int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter>

前面的示例演示如何使用元素配置 Redis 存储入站通道适配器,为各种属性提供值,例如:​​store-inbound-channel-adapter​

  • ​key​​或:正在使用的集合的密钥的名称。key-expression
  • ​collection-type​​:此适配器支持的集合类型的枚举。 支持的集合包括 、、 、 和 。LISTSETZSETPROPERTIESMAP
  • ​connection-factory​​:对 实例的引用。o.s.data.redis.connection.RedisConnectionFactory
  • ​redis-template​​:对 实例的引用。o.s.data.redis.core.RedisTemplate
  • 所有其他入站适配器通用的其他属性(例如“通道”)。

不能同时设置 和 。​​redis-template​​​​connection-factory​


默认情况下,适配器使用 . 这将使用键、值、哈希键和哈希值的实例。 如果 Redis 存储包含使用其他技术序列化的对象,则必须提供配置了适当序列化程序的对象。 例如,如果使用设置为 的 Redis 存储出站适配器写入存储,则必须提供如下所示的配置:​​StringRedisTemplate​​​​StringRedisSerializer​​​​RedisTemplate​​​​extract-payload-elements​​​​false​​​​RedisTemplate​

<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="redisConnectionFactory"/>
<property name="keySerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
</property>
<property name="hashKeySerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
</property>
</bean>


对键和哈希键使用序列化程序,对值和哈希值使用默认的 JDK 序列化序列化程序。​​RedisTemplate​​​​String​


因为它具有 的文本值,所以前面的示例相对简单且静态。 有时,您可能需要根据某些条件在运行时更改密钥的值。 为此,请改用,其中提供的表达式可以是任何有效的 SpEL 表达式。​​key​​​​key-expression​

此外,您可能希望对从 Redis 集合读取的成功处理数据执行一些后处理。 例如,您可能希望在处理完值后移动或删除该值。 您可以使用 Spring Integration 2.2 中添加的事务同步功能来执行此操作。 以下示例使用和事务同步:​​key-expression​

<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization"
connection-factory="redisConnectionFactory"
key-expression="'presidents'"
channel="otherRedisChannel"
auto-startup="false"
collection-type="ZSET">
<int:poller fixed-rate="1000" max-messages-per-poll="2">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-redis:store-inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.removeByScore(18, 18)"/>
</int:transaction-synchronization-factory>

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>

可以使用元素将轮询器声明为事务性轮询器。 此元素可以引用真正的事务管理器(例如,如果流的其他部分调用 JDBC)。 如果您没有“真实”事务,则可以使用 ,这是 Spring 的实现,可以在没有实际事务时使用 Redis 适配器的事务同步功能。​​transactional​​​​o.s.i.transaction.PseudoTransactionManager​​​​PlatformTransactionManager​

这不会使 Redis 活动本身成为事务性活动。 它允许在成功(提交)或失败(回滚)之前或之后执行操作的同步。

轮询器是事务性的后,您可以设置 on 元素的实例。 创建 的实例。 为方便起见,我们公开了一个默认的基于 SpEL 的 ,它允许您配置 SpEL 表达式,其执行与事务协调(同步)。 支持提交前、提交后和回滚后的表达式,以及发送评估结果(如果有)的通道(每种事件一个)。 对于每个子元素,可以指定 和 属性。 如果仅存在该属性,则接收到的消息将作为特定同步方案的一部分发送到那里。 如果仅存在该属性,并且表达式的结果为非 null 值,则会生成一条消息,并将结果作为有效负载发送到默认通道 (),并显示在日志中(在级别)。 如果您希望评估结果转到特定渠道,请添加属性。 如果表达式的结果为 null 或 void,则不会生成任何消息。​​o.s.i.transaction.TransactionSynchronizationFactory​​​​transactional​​​​TransactionSynchronizationFactory​​​​TransactionSynchronization​​​​TransactionSynchronizationFactory​​​​expression​​​​channel​​​​channel​​​​expression​​​​NullChannel​​​​DEBUG​​​​channel​

有关事务同步的详细信息,请参阅事务同步。

RedisStore 出站通道适配器

RedisStore 出站通道适配器允许您将消息负载写入 Redis 集合,如以下示例所示:

<int-redis:store-outbound-channel-adapter id="redisListAdapter"
collection-type="LIST"
channel="requestChannel"
key="myCollection" />

前面的配置 Redis 存储出站通道适配器使用该元素。 它为各种属性提供值,例如:​​store-inbound-channel-adapter​

  • ​key​​或:正在使用的集合的密钥的名称。key-expression
  • ​extract-payload-elements​​:如果设置为 (默认值) 并且有效负载是“多值”对象(即 a 或 a)的实例,则使用 “addAll” 和 “putAll” 语义存储该对象。 否则,如果设置为 ,则有效负载将存储为单个条目,而不管其类型如何。 如果有效负载不是“多值”对象的实例,则忽略此属性的值,并且有效负载始终存储为单个条目。trueCollectionMapfalse
  • ​collection-type​​:此适配器支持的类型的枚举。 支持的集合包括 、、 、 和 。CollectionLISTSETZSETPROPERTIESMAP
  • ​map-key-expression​​:返回所存储条目的密钥名称的 SpEL 表达式。 仅当 is or 和 'extract-payload-elements' 为 false 时,它才适用。collection-typeMAPPROPERTIES
  • ​connection-factory​​:对 实例的引用。o.s.data.redis.connection.RedisConnectionFactory
  • ​redis-template​​:对 实例的引用。o.s.data.redis.core.RedisTemplate
  • 所有其他入站适配器通用的其他属性(例如“通道”)。

不能同时设置 和 。​​redis-template​​​​connection-factory​

默认情况下,适配器使用 . 这将使用键、值、哈希键和哈希值的实例。 但是,如果设置为 ,则将使用具有键和哈希键的实例以及值和哈希值的实例 s。 使用 JDK 序列化程序时,了解 Java 序列化用于所有值非常重要,无论该值是否实际上是集合。 如果需要对值的序列化进行更多控制,请考虑提供自己的值,而不是依赖这些默认值。​​StringRedisTemplate​​​​StringRedisSerializer​​​​extract-payload-elements​​​​false​​​​RedisTemplate​​​​StringRedisSerializer​​​​JdkSerializationRedisSerializer​​​​RedisTemplate​

由于它具有 和其他属性的文本值,因此前面的示例相对简单且静态。 有时,您可能需要根据某些条件在运行时动态更改值。 为此,请使用它们的等效项(、 等),其中提供的表达式可以是任何有效的 SpEL 表达式。​​key​​​​-expression​​​​key-expression​​​​map-key-expression​

Redis 出站命令网关

Spring Integration 4.0 引入了 Redis 命令网关,允许您使用通用方法执行任何标准的 Redis 命令。 以下清单显示了 Redis 出站网关的可用属性:​​RedisConnection#execute​

<int-redis:outbound-gateway
request-channel=""
reply-channel=""
requires-reply=""
reply-timeout=""
connection-factory=""
redis-template=""
arguments-serializer=""
command-expression=""
argument-expressions=""
use-command-variable=""
arguments-strategy="" />

此终结点从中接收实例的 。​​MessageChannel​​​​Message​

此终结点发送回复实例的位置。​​MessageChannel​​​​Message​

指定此出站网关是否必须返回非空值。 默认为 . 当 Redis 返回值时抛出 A。​​true​​​​ReplyRequiredException​​​​null​

等待回复消息发送的超时(以毫秒为单位)。 它通常用于基于队列的有限回复通道。

对 Bean 的引用。 默认为 . 它与“redis-template”属性互斥。​​RedisConnectionFactory​​​​redisConnectionFactory​

对 Bean 的引用。 它与“连接工厂”属性互斥。​​RedisTemplate​

对 实例的引用。 如有必要,它用于将每个命令参数序列化为 byte[]。​​org.springframework.data.redis.serializer.RedisSerializer​

返回命令键的 SpEL 表达式。 它默认为邮件头。 它不得计算为 。​​redis_command​​​​null​

计算为命令参数的逗号分隔的 SpEL 表达式。 与属性互斥。 如果这两个属性都未提供,则 将用作命令参数。 参数表达式的计算结果可以为“null”以支持可变数量的参数。​​arguments-strategy​​​​payload​

一个标志,用于指定在配置时间时,评估的 Redis 命令字符串是否可用作表达式评估上下文中的变量。 否则,将忽略此属性。​​boolean​​​​#cmd​​​​o.s.i.redis.outbound.ExpressionArgumentsStrategy​​​​argument-expressions​

对 实例的引用。 它与属性互斥。 如果这两个属性都未提供,则 将用作命令参数。​​o.s.i.redis.outbound.ArgumentsStrategy​​​​argument-expressions​​​​payload​

您可以将 用作通用组件来执行任何所需的 Redis 操作。 以下示例演示如何从 Redis 原子序数获取递增值:​​<int-redis:outbound-gateway>​

<int-redis:outbound-gateway request-channel="requestChannel"
reply-channel="replyChannel"
command-expression="'INCR'"/>

有效负载应具有 的名称 ,该名称可能由 Bean 定义提供。​​Message​​​​redisCounter​​​​org.springframework.data.redis.support.atomic.RedisAtomicInteger​

该方法具有泛型作为其返回类型。 实际结果取决于命令类型。 例如,返回一个 . 有关命令及其参数和结果类型的更多信息,请参阅 Redis 规范。​​RedisConnection#execute​​​​Object​​​​MGET​​​​List<byte[]>​

Redis 队列出站网关

Spring 集成引入了 Redis 队列出站网关来执行请求和回复场景。 它将对话推送到提供的 ,将该值作为其键推送到 Redis 列表,并等待来自键为 . 每个交互使用不同的 UUID。 以下清单显示了 Redis 出站网关的可用属性:​​UUID​​​​queue​​​​UUID​​​​UUID' plus '.reply​

<int-redis:queue-outbound-gateway
request-channel=""
reply-channel=""
requires-reply=""
reply-timeout=""
connection-factory=""
queue=""
order=""
serializer=""
extract-payload=""/>

此终结点从中接收实例的 。​​MessageChannel​​​​Message​

此终结点发送回复实例的位置。​​MessageChannel​​​​Message​

指定此出站网关是否必须返回非空值。 默认情况下,此值为。 否则,当 Redis 返回值时会抛出 a。​​false​​​​ReplyRequiredException​​​​null​

等待回复消息发送的超时(以毫秒为单位)。 它通常用于基于队列的有限回复通道。

对 Bean 的引用。 默认为 . 它与“redis-template”属性互斥。​​RedisConnectionFactory​​​​redisConnectionFactory​

出站网关向其发送对话的 Redis 列表的名称。​​UUID​

注册多个网关时此出站网关的顺序。

豆引用。 它可以是一个空字符串,这意味着“没有序列化程序”。 在这种情况下,来自入站 Redis 消息的原始消息将作为有效负载发送到 。 默认情况下,它是一个 .​​RedisSerializer​​​​byte[]​​​​channel​​​​Message​​​​JdkSerializationRedisSerializer​

指定此终端节点是否期望 Redis 队列中的数据包含整个实例。 如果此属性设置为 ,则不能为空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。​​Message​​​​true​​​​serializer​

Redis 队列入站网关

Spring Integration 4.1 引入了 Redis 队列入站网关来执行请求和回复场景。 它从提供的 中弹出一个对话,从 Redis 列表中弹出值作为其键,并使用加号键将回复推送到 Redis 列表。 以下清单显示了 Redis 队列入站网关的可用属性:​​UUID​​​​queue​​​​UUID​​​​UUID​​​​.reply​

<int-redis:queue-inbound-gateway
request-channel=""
reply-channel=""
executor=""
reply-timeout=""
connection-factory=""
queue=""
order=""
serializer=""
receive-timeout=""
expect-message=""
recovery-interval=""/>

此终端节点发送从 Redis 数据创建的实例的位置。​​MessageChannel​​​​Message​

此终结点从中等待回复实例的位置。 可选 - 标头仍在使用中。​​MessageChannel​​​​Message​​​​replyChannel​

对 Spring(或标准 JDK)bean 的引用。 它用于基础侦听任务。 它默认为 .​​TaskExecutor​​​​Executor​​​​SimpleAsyncTaskExecutor​

等待回复消息发送的超时(以毫秒为单位)。 它通常用于基于队列的有限回复通道。

对 Bean 的引用。 默认为 . 它与“redis-template”属性互斥。​​RedisConnectionFactory​​​​redisConnectionFactory​

对话的 Redis 列表的名称。​​UUID​

注册多个网关时此入站网关的顺序。

豆引用。 它可以是一个空字符串,这意味着“没有序列化程序”。 在这种情况下,来自入站 Redis 消息的原始消息将作为有效负载发送到 。 它默认为 . (请注意,在 4.3 之前的版本中,它是默认的。 要恢复该行为,请提供对 的引用。​​RedisSerializer​​​​byte[]​​​​channel​​​​Message​​​​JdkSerializationRedisSerializer​​​​StringRedisSerializer​​​​StringRedisSerializer​

等待获取接收消息的超时(以毫秒为单位)。 它通常用于基于队列的有限请求通道。

指定此终端节点是否期望 Redis 队列中的数据包含整个实例。 如果此属性设置为 ,则不能为空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。​​Message​​​​true​​​​serializer​

在重新启动侦听器任务之前,侦听器任务在“右弹出”操作出现异常后应休眠的时间(以毫秒为单位)。

必须配置多个线程进行处理;否则,当 在出现错误后尝试重新启动侦听器任务时,可能会出现死锁。 可用于处理这些错误,以避免重新启动,但最好不要将应用程序暴露在可能的死锁情况下。 有关可能的实现,请参阅 Spring 框架参考手册​。​​task-executor​​​​RedisQueueMessageDrivenEndpoint​​​​errorChannel​​​​TaskExecutor​

Redis 流出站通道适配器

Spring Integration 5.4 引入了反应式 Redis 流出站通道适配器,用于将消息负载写入 Redis 流。 出站通道适配器用于向流中添加 。 以下示例演示如何将 Java 配置和服务类用于 Redis 流出站通道适配器。​​ReactiveStreamOperations.add(…)​​​​Record​

@Bean
@ServiceActivator(inputChannel = "messageChannel")
public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey");
reactiveStreamMessageHandler.setSerializationContext(serializationContext);
reactiveStreamMessageHandler.setHashMapper(hashMapper);
reactiveStreamMessageHandler.setExtractPayload(true);
return reactiveStreamMessageHandler;
}


构造使用 和流名称的实例以添加记录。 另一个构造函数变体基于 SpEL 表达式,用于根据请求消息评估流密钥。​​ReactiveRedisStreamMessageHandler​​​​ReactiveRedisConnectionFactory​


设置用于在添加到流之前序列化记录键和值。​​RedisSerializationContext​


Set,提供Java类型和Redis哈希/映射之间的契约。​​HashMapper​


如果为“true”,通道适配器将从请求消息中提取有效负载,以便添加流记录。 或者使用整条消息作为值。 默认为 .​​true​

Redis 流入站通道适配器

Spring Integration 5.4 引入了 Reactive Stream 入站通道适配器,用于从 Redis 流读取消息。 入站通道适配器使用 或基于自动确认标志从 Redis 流读取记录。 以下示例演示如何将 Java 配置用于 Redis 流入站通道适配器。​​StreamReceiver.receive(…)​​​​StreamReceiver.receiveAutoAck()​

@Bean
public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageProducer messageProducer =
new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey");
messageProducer.setStreamReceiverOptions(
StreamReceiver.StreamReceiverOptions.builder()
.pollTimeout(Duration.ofMillis(100))
.build());
messageProducer.setAutoStartup(true);
messageProducer.setAutoAck(false);
messageProducer.setCreateConsumerGroup(true);
messageProducer.setConsumerGroup("my-group");
messageProducer.setConsumerName("my-consumer");
messageProducer.setOutputChannel(fromRedisStreamChannel);
messageProducer.setReadOffset(ReadOffset.latest());
messageProducer.extractPayload(true);
return messageProducer;
}

构造一个使用和流式传输密钥读取记录的实例。​​ReactiveRedisStreamMessageProducer​​​​ReactiveRedisConnectionFactory​

A 使用反应式基础设施使用 Redis 流。​​StreamReceiver.StreamReceiverOptions​

一个属性,用于指定此终结点是否应在应用程序上下文启动后自动启动。 默认为 . 如果 ,应手动启动。​​SmartLifecycle​​​​true​​​​false​​​​RedisStreamMessageProducer​​​​messageProducer.start()​

如果为 ,则不会自动确认收到的消息。 消息的确认将延迟到使用消息的客户端。 默认为 .​​false​​​​true​

如果为 ,将创建一个使用者组。 在创建消费者组流期间,还将创建(如果尚不存在)。 使用者组跟踪消息传递并区分使用者。 默认为 .​​true​​​​false​

设置使用者组名称。 它默认为定义的 Bean 名称。

设置使用者名称。 从组 读取消息。​​my-consumer​​​​my-group​

要从此终结点向其发送消息的消息通道。

定义要读取消息的偏移量。 默认为 .​​ReadOffset.latest()​

如果为“true”,通道适配器将从 中提取有效负载值。 否则,整体将用作有效负载。 默认为 .​​Record​​​​Record​​​​true​

如果设置为 ,则 Redis 驱动程序不会自动确认 Redis 流中的标头,而是将标头添加到消息中,以实例作为值生成。 目标集成流的责任是在基于此类记录为消息完成业务逻辑时调用其回调。 即使在反序列化期间发生异常并进行了配置,也需要类似的逻辑。 因此,目标错误处理程序必须决定确认或纳克此类失败的消息。 与 一起,还将这些标头填充到消息中以生成:、 和 。​​autoAck​​​​false​​​​Record​​​​IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK​​​​SimpleAcknowledgment​​​​acknowledge()​​​​errorChannel​​​​IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK​​​​ReactiveRedisStreamMessageProducer​​​​RedisHeaders.STREAM_KEY​​​​RedisHeaders.STREAM_MESSAGE_ID​​​​RedisHeaders.CONSUMER_GROUP​​​​RedisHeaders.CONSUMER​

从版本 5.5 开始,您可以在 上显式配置选项,包括新引入的函数,如果 Redis Stream 使用者在发生反序列化错误时应继续轮询,则需要该函数。 默认函数将消息发送到错误通道(如果提供),并可能确认失败的消息,如上所述。 所有这些都与外部提供的.​​StreamReceiver.StreamReceiverOptionsBuilder​​​​ReactiveRedisStreamMessageProducer​​​​onErrorResume​​​​StreamReceiver.StreamReceiverOptionsBuilder​​​​StreamReceiver.StreamReceiverOptions​

瑞迪斯锁注册表

Spring Integration 4.0 引入了 . 某些组件(例如,聚合器和重新排序器)使用从实例获取的锁来确保一次只有一个线程操作一个组。 在单个组件中执行此功能。 您现在可以在这些组件上配置外部锁定注册表。 当您将它与共享 一起使用时,您可以使用 跨多个应用程序实例提供此功能,以便一次只有一个实例可以操作该组。​​RedisLockRegistry​​​​LockRegistry​​​​DefaultLockRegistry​​​​MessageGroupStore​​​​RedisLockRegistry​

当一个本地线程释放一个锁时,另一个本地线程通常可以立即获取该锁。 如果线程使用不同的注册表实例释放锁,则最多可能需要 100 毫秒才能获取锁。

若要避免“挂起”锁(当服务器发生故障时),此注册表中的锁将在默认的 60 秒后过期,但您可以在注册表上配置此值。 锁的持有时间通常要短得多。

由于密钥可能会过期,因此尝试解锁过期的锁会导致引发异常。 但是,受此类锁保护的资源可能已受到损害,因此应将此类异常视为严重。 应将过期时间设置为足够大的值以防止出现这种情况,但应将其设置得足够低,以便在服务器发生故障后可以在合理的时间内恢复锁定。

从版本 5.0 开始,实现 ,这将删除上次获取的锁,这些锁当前未锁定。​​RedisLockRegistry​​​​ExpirableLockRegistry​​​​age​

5.5.6 版本的字符串,支持通过 . 有关更多信息,请参阅其 JavaDocs。​​RedisLockRegistry​​​​RedisLockRegistry.locks​​​​RedisLockRegistry.setCacheCapacity()​

字符串 对于版本 5.5.13,公开了一个选项,用于确定应在哪种模式下进行 Redis 锁获取:​​RedisLockRegistry​​​​setRedisLockType(RedisLockType)​

  • ​RedisLockType.SPIN_LOCK​​- 锁通过周期循环(100ms)获取,检查是否可以获取锁。 违约。
  • ​RedisLockType.PUB_SUB_LOCK​​- 锁是通过 redis 发布订阅获取的。

发布-订阅是首选模式 - 客户端 Redis 服务器之间的网络抖动更少,性能更高 - 当订阅收到有关在其他进程中解锁的通知时,会立即获取锁定。 但是,Redis 不支持主/副本连接中的发布-订阅(例如在 AWS ElastiCache 环境中),因此选择繁忙旋转模式作为默认值,以使注册表在任何环境中工作。

标签:Spring,适配器,Redis,Integration,实例,消息,序列化
From: https://blog.51cto.com/u_15326439/5933733

相关文章

  • Spring Boot Hello World
    在我使用熟悉的工具链中,好像​​vscode​​也能开发​​SpringBoot​​,但是我还是选择了专业的​​IDEA​​,因为穷,选择了社区版。之前在​​https://spring.io/​​根据......
  • SpringBoot整合RabbitMQ
    1、Maven依赖<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.0.RELEASE</vers......
  • redisson 延迟队列实现订单过期监听
    需求:     订单下单超过两个小时以后,如果还未支付,则自动转为取消支付状态实现:    1,创建延迟队列的监听任务RedisDelayedQueueListener,消费延迟队列 ......
  • Springboot整合mybatis依赖
    <!--    Springboot整合mybatis依赖-->    <dependency>      <groupId>org.mybatis.spring.boot</groupId>      <artifact......
  • spring boot 跨域
    springboot提供了两种跨域配置方式1.全局跨域2.局部跨域全局跨域packagecom.tons.config;importorg.springframework.context.annotation.Configuration;import......
  • Spring 集成支持通过 SFTP 进行文件传输操作
    Spring集成支持通过SFTP进行文件传输操作。安全文件传输协议(SFTP)是一种网络协议,可让您通过任何可靠的流在Internet上的两台计算机之间传输文件。SFTP协议需要一个安......
  • pom之 热部署spring-boot-devtools
    pom之热部署spring-boot-devtools <!--    热部署 Ctrl+Shift+Alt+/-->    <dependency>      <groupId>org.springframework.boot......
  • pom之 数据源信息 spring-boot-starter-jdbc ,mysql-connector-java
    <!--    数据源信息--><!--    <dependency>--><!--      <groupId>org.springframework.boot</groupId>--><!--      <arti......
  • Spring Integration 的资源支持
    资源入站通道适配器建立在Spring的抽象之上,以支持跨各种实际类型的底层资源(如文件、URL或类路径资源)的更大灵活性。因此,它与文件入站通道适配器类似,但比文件入站通道适......
  • Spring Integration 对RSocket 支持
    RSocketSpring集成模块()允许执行 RSocket应用协议​。​​spring-integration-rsocket​​您需要将此依赖项包含在项目中:<dependency><groupId>org.springframewor......