首页 > 其他分享 >Spring Integration 对RSocket 支持

Spring Integration 对RSocket 支持

时间:2022-12-13 12:02:02浏览次数:71  
标签:ServerRSocketConnector Spring 配置 ClientRSocketConnector Integration 请参阅 rsocket 

Spring Integration 对RSocket 支持_spring

RSocket Spring 集成模块 () 允许执行 RSocket 应用协议​。​​spring-integration-rsocket​

您需要将此依赖项包含在项目中:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
<version>6.0.0</version>
</dependency>

该模块从版本 5.2 开始提供,基于 Spring 消息传递基础及其 RSocket 组件实现,例如 、 和 。 请参阅 Spring Framework RSocket Support 以获取有关 RSocket 协议、术语和组件的更多信息。​​RSocketRequester​​​​RSocketMessageHandler​​​​RSocketStrategies​

在通过通道适配器开始集成流处理之前,我们需要在服务器和客户端之间建立 RSocket 连接。 为此,Spring Integration RSocket 支持提供了 和 的实现。​​ServerRSocketConnector​​​​ClientRSocketConnector​​​​AbstractRSocketConnector​

根据提供的用于接受来自客户端的连接,在主机和端口上公开侦听器。 可以使用 以及可配置的其他选项自定义内部实例,例如 以及有效负载数据和标头元数据。 当客户端请求者提供 时(见下文),连接的客户端将存储为 下由 确定的密钥下的 。 默认情况下,连接数据用于键,作为转换值转换为具有 UTF-8 字符集的字符串。 此类注册表可以在应用程序逻辑中使用,以确定特定的客户端连接以便与其交互,或将同一消息发布到所有连接的客户端。 从客户端建立连接时,将从 发出 。 这类似于 Spring 消息传递模块中的注释所提供的内容。 映射模式意味着接受所有客户端路由。 可用于通过标头区分不同的路由。​​ServerRSocketConnector​​​​io.rsocket.transport.ServerTransport​​​​RSocketServer​​​​setServerConfigurer()​​​​RSocketStrategies​​​​MimeType​​​​setupRoute​​​​ClientRSocketConnector​​​​RSocketRequester​​​​clientRSocketKeyStrategy​​​​BiFunction<Map<String, Object>, DataBuffer, Object>​​​​RSocketRequester​​​​RSocketConnectedEvent​​​​ServerRSocketConnector​​​​@ConnectMapping​​​​*​​​​RSocketConnectedEvent​​​​DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER​

典型的服务器配置可能如下所示:

@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}

@Bean
public ServerRSocketConnector serverRSocketConnector() {
ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
+ headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
return serverRSocketConnector;
}

@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
...
}

所有选项(包括 bean 和 for )都是可选的。 有关更多信息,请参阅 JavaDocs。​​RSocketStrategies​​​​@EventListener​​​​RSocketConnectedEvent​​​​ServerRSocketConnector​

从版本 5.2.1 开始,将提取到公共的顶级类中,以便可能与现有 RSocket 服务器连接。 当 提供 的外部实例时,它不会在内部创建 RSocket 服务器,而只是将所有处理逻辑委托给提供的实例。 此外,还可以配置一个标志来处理RSocket控制器,完全取代标准提供的功能。 这在混合配置中非常有用,当经典方法与 RSocket 通道适配器一起存在于同一应用程序中,并且应用程序中存在外部配置的 RSocket 服务器时。​​ServerRSocketMessageHandler​​​​ServerRSocketConnector​​​​ServerRSocketMessageHandler​​​​ServerRSocketMessageHandler​​​​messageMappingCompatible​​​​@MessageMapping​​​​RSocketMessageHandler​​​​@MessageMapping​

作为支架,用于基于通过提供的 . 可以使用提供的 . (使用可选模板变量)和元数据也可以在此组件上配置。​​ClientRSocketConnector​​​​RSocketRequester​​​​RSocket​​​​ClientTransport​​​​RSocketConnector​​​​RSocketConnectorConfigurer​​​​setupRoute​​​​setupData​

典型的客户端配置可能如下所示:

@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}

@Bean
public ClientRSocketConnector clientRSocketConnector() {
ClientRSocketConnector clientRSocketConnector =
new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
clientRSocketConnector.setSetupRoute("clientConnect/{user}");
clientRSocketConnector.setSetupRouteVariables("myUser");
return clientRSocketConnector;
}

这些选项中的大多数(包括 bean)都是可选的。 请注意我们如何在任意端口上连接到本地启动的 RSocket 服务器。 有关使用案例,请参阅。 另请参阅及其超类 JavaDocs 了解更多信息。​​RSocketStrategies​​​​ServerRSocketConnector.clientRSocketKeyStrategy​​​​setupData​​​​ClientRSocketConnector​​​​AbstractRSocketConnector​

两者都负责将入站通道适配器映射到其配置,以路由传入的 RSocket 请求。 有关详细信息,请参阅下一节。​​ClientRSocketConnector​​​​ServerRSocketConnector​​​​path​

RSocket 入站网关

负责接收 RSocket 请求并生成响应(如果有)。 它需要一个映射数组,这些映射可以是类似于 MVC 请求映射或语义的模式。 此外,(从版本 5.2.2 开始),可以在 上配置一组交互模型(请参阅),以按特定帧类型限制对此端点的 RSocket 请求。 默认情况下,支持所有交互模型。 这样的 bean,根据它的实现(扩展 ),由 或 自动检测内部传入请求的路由逻辑。 可以为显式终结点注册提供 。 这样,自动检测选项就会被禁用。 也可以注入到或者它们从所提供的覆盖任何显式注入中获得。 解码器用于根据提供的解码请求有效负载。 如果在传入 中未提供标头,则会将请求视为 RSocket 交互模型。 在这种情况下,对 执行普通操作。 否则,标头中的值用于向 RSocket 发送回复。 为此,对 执行操作。 根据逻辑,要向下游发送的消息始终是 a。 在 RSocket 交互模型中,消息具有纯转换的 . 回复可以是普通对象或 - 根据 中提供的编码器将它们正确转换为 RSocket 响应。​​RSocketInboundGateway​​​​path​​​​@MessageMapping​​​​RSocketInteractionModel​​​​RSocketInboundGateway​​​​IntegrationRSocketEndpoint​​​​ReactiveMessageHandler​​​​ServerRSocketConnector​​​​ClientRSocketConnector​​​​IntegrationRSocketMessageHandler​​​​AbstractRSocketConnector​​​​RSocketInboundGateway​​​​AbstractRSocketConnector​​​​RSocketStrategies​​​​RSocketInboundGateway​​​​AbstractRSocketConnector​​​​RSocketStrategies​​​​requestElementType​​​​RSocketPayloadReturnValueHandler.RESPONSE_HEADER​​​​Message​​​​RSocketInboundGateway​​​​fireAndForget​​​​RSocketInboundGateway​​​​send​​​​outputChannel​​​​MonoProcessor​​​​RSocketPayloadReturnValueHandler.RESPONSE_HEADER​​​​RSocketInboundGateway​​​​sendAndReceiveMessageReactive​​​​outputChannel​​​​payload​​​​Flux​​​​MessagingRSocket​​​​fireAndForget​​​​payload​​​​payload​​​​Publisher​​​​RSocketInboundGateway​​​​RSocketStrategies​

从版本 5.3 开始,将向 中添加一个选项(默认 )。 默认情况下,传入的转换方式是每个事件的单独解码方式。 这是语义当前存在的确切行为。 要恢复以前的行为或根据应用程序要求将整体解码为单个单元,必须设置为 。 但是,目标解码逻辑取决于所选的逻辑,例如,a要求流中存在新的行分隔符(默认情况下)以指示字节缓冲区结束。​​decodeFluxAsUnit​​​​false​​​​RSocketInboundGateway​​​​Flux​​​​@MessageMapping​​​​Flux​​​​decodeFluxAsUnit​​​​true​​​​Decoder​​​​StringDecoder​

有关如何配置端点和处理下游有效负载的示例,请参阅使用 Java 配置 RSocket 端点。​​RSocketInboundGateway​

RSocket 出站网关

这是一个,用于对 RSocket 执行请求并根据 RSocket 回复(如果有)生成回复。 低级别的 RSocket 协议交互被委托给服务器端请求消息中提供的解析或标头。 服务器端的目标可以通过 或使用 API 根据为连接请求映射选择的某些业务密钥进行解析。 有关更多信息,请参阅 JavaDocs。​​RSocketOutboundGateway​​​​AbstractReplyProducingMessageHandler​​​​RSocketRequester​​​​ClientRSocketConnector​​​​RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER​​​​RSocketRequester​​​​RSocketConnectedEvent​​​​ServerRSocketConnector.getClientRSocketRequester()​​​​ServerRSocketConnector.setClientRSocketKeyStrategy()​​​​ServerRSocketConnector​

发送请求必须显式配置(与路径变量一起)或通过根据请求消息评估的 SpEL 表达式。​​route​

RSocket 交互模型可以通过选项或相应的表达式设置提供。 默认情况下,a 用于常见的网关用例。​​RSocketInteractionModel​​​​requestResponse​

当请求消息有效负载为 时,可以提供根据目标中提供的对其元素进行编码的选项。 此选项的表达式的计算结果为 . 有关数据及其类型的更多信息,请参阅 JavaDocs。​​Publisher​​​​publisherElementType​​​​RSocketStrategies​​​​RSocketRequester​​​​ParameterizedTypeReference​​​​RSocketRequester.RequestSpec.data()​

RSocket 请求也可以使用 . 为此,可以在 上配置反对请求消息。 此类表达式的计算结果必须为 .​​metadata​​​​metadataExpression​​​​RSocketOutboundGateway​​​​Map<Object, MimeType>​

当不是时,必须提供。 默认情况下,它是一个。 此选项的表达式的计算结果为 . 有关回复数据及其类型的更多信息,请参阅 和 JavaDocs。​​interactionModel​​​​fireAndForget​​​​expectedResponseType​​​​String.class​​​​ParameterizedTypeReference​​​​RSocketRequester.RetrieveSpec.retrieveMono()​​​​RSocketRequester.RetrieveSpec.retrieveFlux()​

来自 的回复是一个(即使对于交互模型也是如此)总是使这个组件为 . 这样的 a 在生产之前订阅到常规频道或由 . 或交互模型的响应也包装到回复中。 它可以通过使用直通服务激活器在下游展平:​​payload​​​​RSocketOutboundGateway​​​​Mono​​​​fireAndForget​​​​Mono<Void>​​​​async​​​​Mono​​​​outputChannel​​​​FluxMessageChannel​​​​Flux​​​​requestStream​​​​requestChannel​​​​Mono​​​​FluxMessageChannel​

@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
return payload;
}

或者在目标应用程序逻辑中显式订阅。

还可以配置(或通过表达式评估)预期的响应类型,以将此网关视为出站通道适配器。 但是,仍然必须配置(即使它只是一个)来启动对返回的订阅。​​void​​​​outputChannel​​​​NullChannel​​​​Mono​

有关如何配置端点以处理下游有效负载的示例,请参阅使用 Java 配置 RSocket 端点。​​RSocketOutboundGateway​

RSocket 命名空间支持

Spring 集成提供了一个命名空间和相应的模式定义。 若要将其包含在配置中,请在应用程序上下文配置文件中添加以下命名空间声明:​​rsocket​

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/rsocket
https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
...
</beans>

入境

要使用 XML 配置 Spring Integration RSocket 入站通道适配器,您需要使用命名空间中的相应组件。 以下示例演示如何配置它:​​inbound-gateway​​​​int-rsocket​

<int-rsocket:inbound-gateway id="inboundGateway"
path="testPath"
interaction-models="requestStream,requestChannel"
rsocket-connector="clientRSocketConnector"
request-channel="requestChannel"
rsocket-strategies="rsocketStrategies"
request-element-type="byte[]"/>

A 和应配置为通用定义。​​ClientRSocketConnector​​​​ServerRSocketConnector​​​​<bean>​

出境

<int-rsocket:outbound-gateway id="outboundGateway"
client-rsocket-connector="clientRSocketConnector"
auto-startup="false"
interaction-model="fireAndForget"
route-expression="'testRoute'"
request-channel="requestChannel"
publisher-element-type="byte[]"
expected-response-type="java.util.Date"
metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>

有关所有这些 XML 属性的说明,请参阅。​​spring-integration-rsocket.xsd​

使用 Java 配置 RSocket 端点

以下示例演示如何使用 Java 配置 RSocket 入站终结点:

@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
return rsocketInboundGateway;
}

@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
return payload.next().map(String::toUpperCase);
}

在此配置中假定为 OR,具有自动检测“echo”路径上此类端点的含义。 注意签名,因为它完全反应性地处理 RSocket 请求并生成反应性回复。​​ClientRSocketConnector​​​​ServerRSocketConnector​​​​@Transformer​

以下示例显示如何使用 Java DSL 配置 RSocket 入站网关:

@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
return IntegrationFlow
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel))
.<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
.get();
}

在此配置中假定为 OR,其含义是自动检测“/大写”路径上的此类端点,并将预期的交互模型作为“请求通道”。​​ClientRSocketConnector​​​​ServerRSocketConnector​

以下示例演示如何使用 Java 配置 RSocket 出站网关:

@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
RSocketOutboundGateway rsocketOutboundGateway =
new RSocketOutboundGateway(
new FunctionExpression<Message<?>>((m) ->
m.getHeaders().get("route_header")));
rsocketOutboundGateway.setInteractionModelExpression(
new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
return rsocketOutboundGateway;
}

只有客户端才需要。 在服务器端,必须在请求消息中提供带有值的标头。​​setClientRSocketConnector()​​​​RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER​​​​RSocketRequester​

以下示例显示如何使用 Java DSL 配置 RSocket 出站网关:

@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlow
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.interactionModel(RSocketInteractionModel.requestResponse)
.expectedResponseType(String.class)
.clientRSocketConnector(clientRSocketConnector))
.get();
}

请参阅集成流作为网关,了解有关如何使用上述流程开头提到的接口的更多信息。​​Function​

标签:ServerRSocketConnector,Spring,配置,ClientRSocketConnector,Integration,请参阅,rsocket,
From: https://blog.51cto.com/u_15326439/5933774

相关文章

  • SpringBootApplication
    packagecom.example.demo;importorg.springframework.boot.SpringApplication;importorg.springframework.boot.autoconfigure.SpringBootApplication;/** *项......
  • 【java-01】springboot利用sharding jdbc实现读写分离
    写在开头打算把自己的java后端学习过程分享给大家,也方便之后自己回顾。从这里开始~目前在学习黑马的瑞吉外卖新手入门项目,这篇随笔记录的是项目优化之一读写分离先列出......
  • 深入理解 Spring 事务:入门、使用、原理
    大家好,我是树哥。Spring事务是复杂一致性业务必备的知识点,掌握好Spring事务可以让我们写出更好地代码。这篇文章我们将介绍Spring事务的诞生背景,从而让我们可以更清晰......
  • Spring框架之控制反转IoC(Inversion of Control)的理解
    简单理解:控制反转就是将代码的调用权(控制权)从调用方转移给被调用方(服务提供方)。解释一下:如果我们需要创建某个类,就需要程序员去修改代码,然后才可以得到想要的类。反转的意思......
  • java.lang.ClassNotFoundException: SpringSkinFactory抽象工厂模式范例报错
    示例代码:packageAbstractFactory.utils;importjavax.xml.parsers.*;importorg.w3c.dom.*;importjava.io.*;publicclassXMLUtil{//该方法用于从XML配......
  • Spring 自定义propertyEditor
    Userpackagecom.example.zylspringboot.selfEditor;publicclassUser{privateStringname;privateAddressaddress;privateIntegerage;......
  • 【Emoji】Spring 通过注解 处理 转换 Emoji
      依赖<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>4.5.6</version></dependency><dependency>......
  • Spring Boot命令指定环境启动jar包
    原文地址:SpringBoot命令指定环境启动jar包-Stars-One的杂货小窝记下通过命令行的方式去改变springboot项目中的环境配置信息命令项目中有以下配置application.yml......
  • spring boot 拦截器
    springboot使用拦截器1.创建拦截器类,继承HandlerInterceptor2.注册拦截器,指定拦截规则springframework中的拦截器类需要继承与HandlerInterceptor,springboot也是......
  • SpringBoot引入外部jar包,并打包到项目jar包中
    SpringBoot引入外部jar包,并打包到项目jar包中在网上下载需要的jar包,在项目中新建文件夹将jar包放进去通过File->ProjectStructure引入选中jar包加载成功后,ja......