RSocket Spring 集成模块 () 允许执行 RSocket 应用协议。spring-integration-rsocket
您需要将此依赖项包含在项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
<version>6.0.0</version>
</dependency>
该模块从版本 5.2 开始提供,基于 Spring 消息传递基础及其 RSocket 组件实现,例如 、 和 。 请参阅 Spring Framework RSocket Support 以获取有关 RSocket 协议、术语和组件的更多信息。RSocketRequester
RSocketMessageHandler
RSocketStrategies
在通过通道适配器开始集成流处理之前,我们需要在服务器和客户端之间建立 RSocket 连接。 为此,Spring Integration RSocket 支持提供了 和 的实现。ServerRSocketConnector
ClientRSocketConnector
AbstractRSocketConnector
根据提供的用于接受来自客户端的连接,在主机和端口上公开侦听器。 可以使用 以及可配置的其他选项自定义内部实例,例如 以及有效负载数据和标头元数据。 当客户端请求者提供 时(见下文),连接的客户端将存储为 下由 确定的密钥下的 。 默认情况下,连接数据用于键,作为转换值转换为具有 UTF-8 字符集的字符串。 此类注册表可以在应用程序逻辑中使用,以确定特定的客户端连接以便与其交互,或将同一消息发布到所有连接的客户端。 从客户端建立连接时,将从 发出 。 这类似于 Spring 消息传递模块中的注释所提供的内容。 映射模式意味着接受所有客户端路由。 可用于通过标头区分不同的路由。ServerRSocketConnector
io.rsocket.transport.ServerTransport
RSocketServer
setServerConfigurer()
RSocketStrategies
MimeType
setupRoute
ClientRSocketConnector
RSocketRequester
clientRSocketKeyStrategy
BiFunction<Map<String, Object>, DataBuffer, Object>
RSocketRequester
RSocketConnectedEvent
ServerRSocketConnector
@ConnectMapping
*
RSocketConnectedEvent
DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER
典型的服务器配置可能如下所示:
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ServerRSocketConnector serverRSocketConnector() {
ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
+ headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
return serverRSocketConnector;
}
@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
...
}
所有选项(包括 bean 和 for )都是可选的。 有关更多信息,请参阅 JavaDocs。RSocketStrategies
@EventListener
RSocketConnectedEvent
ServerRSocketConnector
从版本 5.2.1 开始,将提取到公共的顶级类中,以便可能与现有 RSocket 服务器连接。 当 提供 的外部实例时,它不会在内部创建 RSocket 服务器,而只是将所有处理逻辑委托给提供的实例。 此外,还可以配置一个标志来处理RSocket控制器,完全取代标准提供的功能。 这在混合配置中非常有用,当经典方法与 RSocket 通道适配器一起存在于同一应用程序中,并且应用程序中存在外部配置的 RSocket 服务器时。ServerRSocketMessageHandler
ServerRSocketConnector
ServerRSocketMessageHandler
ServerRSocketMessageHandler
messageMappingCompatible
@MessageMapping
RSocketMessageHandler
@MessageMapping
作为支架,用于基于通过提供的 . 可以使用提供的 . (使用可选模板变量)和元数据也可以在此组件上配置。ClientRSocketConnector
RSocketRequester
RSocket
ClientTransport
RSocketConnector
RSocketConnectorConfigurer
setupRoute
setupData
典型的客户端配置可能如下所示:
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ClientRSocketConnector clientRSocketConnector() {
ClientRSocketConnector clientRSocketConnector =
new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
clientRSocketConnector.setSetupRoute("clientConnect/{user}");
clientRSocketConnector.setSetupRouteVariables("myUser");
return clientRSocketConnector;
}
这些选项中的大多数(包括 bean)都是可选的。 请注意我们如何在任意端口上连接到本地启动的 RSocket 服务器。 有关使用案例,请参阅。 另请参阅及其超类 JavaDocs 了解更多信息。RSocketStrategies
ServerRSocketConnector.clientRSocketKeyStrategy
setupData
ClientRSocketConnector
AbstractRSocketConnector
两者都负责将入站通道适配器映射到其配置,以路由传入的 RSocket 请求。 有关详细信息,请参阅下一节。ClientRSocketConnector
ServerRSocketConnector
path
RSocket 入站网关
负责接收 RSocket 请求并生成响应(如果有)。 它需要一个映射数组,这些映射可以是类似于 MVC 请求映射或语义的模式。 此外,(从版本 5.2.2 开始),可以在 上配置一组交互模型(请参阅),以按特定帧类型限制对此端点的 RSocket 请求。 默认情况下,支持所有交互模型。 这样的 bean,根据它的实现(扩展 ),由 或 自动检测内部传入请求的路由逻辑。 可以为显式终结点注册提供 。 这样,自动检测选项就会被禁用。 也可以注入到或者它们从所提供的覆盖任何显式注入中获得。 解码器用于根据提供的解码请求有效负载。 如果在传入 中未提供标头,则会将请求视为 RSocket 交互模型。 在这种情况下,对 执行普通操作。 否则,标头中的值用于向 RSocket 发送回复。 为此,对 执行操作。 根据逻辑,要向下游发送的消息始终是 a。 在 RSocket 交互模型中,消息具有纯转换的 . 回复可以是普通对象或 - 根据 中提供的编码器将它们正确转换为 RSocket 响应。RSocketInboundGateway
path
@MessageMapping
RSocketInteractionModel
RSocketInboundGateway
IntegrationRSocketEndpoint
ReactiveMessageHandler
ServerRSocketConnector
ClientRSocketConnector
IntegrationRSocketMessageHandler
AbstractRSocketConnector
RSocketInboundGateway
AbstractRSocketConnector
RSocketStrategies
RSocketInboundGateway
AbstractRSocketConnector
RSocketStrategies
requestElementType
RSocketPayloadReturnValueHandler.RESPONSE_HEADER
Message
RSocketInboundGateway
fireAndForget
RSocketInboundGateway
send
outputChannel
MonoProcessor
RSocketPayloadReturnValueHandler.RESPONSE_HEADER
RSocketInboundGateway
sendAndReceiveMessageReactive
outputChannel
payload
Flux
MessagingRSocket
fireAndForget
payload
payload
Publisher
RSocketInboundGateway
RSocketStrategies
从版本 5.3 开始,将向 中添加一个选项(默认 )。 默认情况下,传入的转换方式是每个事件的单独解码方式。 这是语义当前存在的确切行为。 要恢复以前的行为或根据应用程序要求将整体解码为单个单元,必须设置为 。 但是,目标解码逻辑取决于所选的逻辑,例如,a要求流中存在新的行分隔符(默认情况下)以指示字节缓冲区结束。decodeFluxAsUnit
false
RSocketInboundGateway
Flux
@MessageMapping
Flux
decodeFluxAsUnit
true
Decoder
StringDecoder
有关如何配置端点和处理下游有效负载的示例,请参阅使用 Java 配置 RSocket 端点。RSocketInboundGateway
RSocket 出站网关
这是一个,用于对 RSocket 执行请求并根据 RSocket 回复(如果有)生成回复。 低级别的 RSocket 协议交互被委托给服务器端请求消息中提供的解析或标头。 服务器端的目标可以通过 或使用 API 根据为连接请求映射选择的某些业务密钥进行解析。 有关更多信息,请参阅 JavaDocs。RSocketOutboundGateway
AbstractReplyProducingMessageHandler
RSocketRequester
ClientRSocketConnector
RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER
RSocketRequester
RSocketConnectedEvent
ServerRSocketConnector.getClientRSocketRequester()
ServerRSocketConnector.setClientRSocketKeyStrategy()
ServerRSocketConnector
发送请求必须显式配置(与路径变量一起)或通过根据请求消息评估的 SpEL 表达式。route
RSocket 交互模型可以通过选项或相应的表达式设置提供。 默认情况下,a 用于常见的网关用例。RSocketInteractionModel
requestResponse
当请求消息有效负载为 时,可以提供根据目标中提供的对其元素进行编码的选项。 此选项的表达式的计算结果为 . 有关数据及其类型的更多信息,请参阅 JavaDocs。Publisher
publisherElementType
RSocketStrategies
RSocketRequester
ParameterizedTypeReference
RSocketRequester.RequestSpec.data()
RSocket 请求也可以使用 . 为此,可以在 上配置反对请求消息。 此类表达式的计算结果必须为 .metadata
metadataExpression
RSocketOutboundGateway
Map<Object, MimeType>
当不是时,必须提供。 默认情况下,它是一个。 此选项的表达式的计算结果为 . 有关回复数据及其类型的更多信息,请参阅 和 JavaDocs。interactionModel
fireAndForget
expectedResponseType
String.class
ParameterizedTypeReference
RSocketRequester.RetrieveSpec.retrieveMono()
RSocketRequester.RetrieveSpec.retrieveFlux()
来自 的回复是一个(即使对于交互模型也是如此)总是使这个组件为 . 这样的 a 在生产之前订阅到常规频道或由 . 或交互模型的响应也包装到回复中。 它可以通过使用直通服务激活器在下游展平:payload
RSocketOutboundGateway
Mono
fireAndForget
Mono<Void>
async
Mono
outputChannel
FluxMessageChannel
Flux
requestStream
requestChannel
Mono
FluxMessageChannel
@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
return payload;
}
或者在目标应用程序逻辑中显式订阅。
还可以配置(或通过表达式评估)预期的响应类型,以将此网关视为出站通道适配器。 但是,仍然必须配置(即使它只是一个)来启动对返回的订阅。void
outputChannel
NullChannel
Mono
有关如何配置端点以处理下游有效负载的示例,请参阅使用 Java 配置 RSocket 端点。RSocketOutboundGateway
RSocket 命名空间支持
Spring 集成提供了一个命名空间和相应的模式定义。 若要将其包含在配置中,请在应用程序上下文配置文件中添加以下命名空间声明:rsocket
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/rsocket
https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
...
</beans>
入境
要使用 XML 配置 Spring Integration RSocket 入站通道适配器,您需要使用命名空间中的相应组件。 以下示例演示如何配置它:inbound-gateway
int-rsocket
<int-rsocket:inbound-gateway id="inboundGateway"
path="testPath"
interaction-models="requestStream,requestChannel"
rsocket-connector="clientRSocketConnector"
request-channel="requestChannel"
rsocket-strategies="rsocketStrategies"
request-element-type="byte[]"/>
A 和应配置为通用定义。ClientRSocketConnector
ServerRSocketConnector
<bean>
出境
<int-rsocket:outbound-gateway id="outboundGateway"
client-rsocket-connector="clientRSocketConnector"
auto-startup="false"
interaction-model="fireAndForget"
route-expression="'testRoute'"
request-channel="requestChannel"
publisher-element-type="byte[]"
expected-response-type="java.util.Date"
metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>
有关所有这些 XML 属性的说明,请参阅。spring-integration-rsocket.xsd
使用 Java 配置 RSocket 端点
以下示例演示如何使用 Java 配置 RSocket 入站终结点:
@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
return rsocketInboundGateway;
}
@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
return payload.next().map(String::toUpperCase);
}
在此配置中假定为 OR,具有自动检测“echo”路径上此类端点的含义。 注意签名,因为它完全反应性地处理 RSocket 请求并生成反应性回复。ClientRSocketConnector
ServerRSocketConnector
@Transformer
以下示例显示如何使用 Java DSL 配置 RSocket 入站网关:
@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
return IntegrationFlow
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel))
.<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
.get();
}
在此配置中假定为 OR,其含义是自动检测“/大写”路径上的此类端点,并将预期的交互模型作为“请求通道”。ClientRSocketConnector
ServerRSocketConnector
以下示例演示如何使用 Java 配置 RSocket 出站网关:
@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
RSocketOutboundGateway rsocketOutboundGateway =
new RSocketOutboundGateway(
new FunctionExpression<Message<?>>((m) ->
m.getHeaders().get("route_header")));
rsocketOutboundGateway.setInteractionModelExpression(
new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
return rsocketOutboundGateway;
}
只有客户端才需要。 在服务器端,必须在请求消息中提供带有值的标头。setClientRSocketConnector()
RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER
RSocketRequester
以下示例显示如何使用 Java DSL 配置 RSocket 出站网关:
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlow
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.interactionModel(RSocketInteractionModel.requestResponse)
.expectedResponseType(String.class)
.clientRSocketConnector(clientRSocketConnector))
.get();
}
请参阅集成流作为网关,了解有关如何使用上述流程开头提到的接口的更多信息。Function