首页 > 其他分享 >Spring家族中的消息通信解决方案

Spring家族中的消息通信解决方案

时间:2024-06-05 14:58:56浏览次数:16  
标签:Stream Spring Integration 家族 消息 解决方案 Messaging Cloud

相信大家对消息通信架构以及各种消息中间件应该都不陌生。在分布式系统的设计和开发过程中,消息通信是用于实现系统解耦、提高扩展性的一大技术体系。而业界关于如何实现消息通信系统也有很多解决方案和对应的开发框架。不知道你有没有发现,在我们每天都在使用到Spring框架中,实际上也包含了一套完整而强大的消息通信机制,被广泛的应用在Spring家族的各个框架中,你可能在不经意中已经在使用这套机制了。今天的内容将围绕这点展开讨论,让你在应用这些框架时知其然更知其所以然。

在Spring家族中,与消息通信机制相关的框架有三个,分别是Spring Messaging、Spring Integration和Spring Cloud Steam。事实上,Spring Cloud中的Spring Cloud Stream是基于Spring Integration实现了消息发布和消费机制并提供了一层封装,很多关于消息发布和消费的概念和实现方法本质上都是依赖于Spring Integration。而在Spring Integration的背后,则依赖于Spring Messaging组件来实现消息处理机制的基础设施。这三个框架之间的依赖关系如下图所示。


接下来的内容,我们先来对位于底层的Spring Messaging和Spring Integration框架做一些展开,方便你在使用Spring Cloud Stream时对其背后的实现原理有更好的理解。

Spring Messaging

Spring Messaging是Spring基础框架中的一个底层模块,用于提供统一的消息编程模型。例如,消息这个数据单元在Spring Messaging中统一定义Message接口,包括一个消息头Header和一个消息体Payload。

public interface Message<T> {

T getPayload();

MessageHeaders getHeaders();

}

而消息通道MessageChannel的定义也比较简单。

public interface MessageChannel {

long INDEFINITE_TIMEOUT = -1;

default boolean send(Message<?> message) {

return send(message, INDEFINITE_TIMEOUT);

}

boolean send(Message<?> message, long timeout);

}

可以看到,我们可以调用MessageChannel的send方法将消息发送至该消息通道中。

消息通道的概念比较抽象,可以简单把它理解为是对队列的一种抽象。通道的名称对应的就是队列的名称,但是作为一种抽象和封装,各个消息通信系统所特有的队列概念并不会直接暴露在业务代码中,而是通过通道来对队列进行配置。下图展示了这层抽象关系。


Spring Messaging把通道抽象成两种基本表现形式,即支持轮询的PollableChannel和实现发布-订阅模式的SubscribableChannel,这两个通道都继承自具有消息发送功能的MessageChannel。

public interface PollableChannel extends MessageChannel {

    Message<?> receive();

    Message<?> receive(long timeout);

}

public interface SubscribableChannel extends MessageChannel {

    boolean subscribe(MessageHandler handler);

    boolean unsubscribe(MessageHandler handler);

}

我们注意到对于PollableChannel而言才有receive的概念,代表这是通过轮询操作主动获取消息的过程。而SubscribableChannel则是通过注册回调函数MessageHandler来实现事件响应。MessageHandler接口定义如下。

public interface MessageHandler {

void handleMessage(Message<?> message) throws MessagingException;

}

通过MessageHandler,我们就可以实现对消息的异步消费。

Spring Integration

Spring Integration是对Spring Messaging的扩展。在Spring Messaging的基础上,Spring Integration还实现了其他几种有用的通道。


这里列举了支持阻塞式队列的RendezvousChannel,该通道与带缓存的QueueChannel都属于点对点通道,但只有在前一个消息被消费之后才能发送下一个消息。PriorityChannel即优先级队列。而DirectChannel是Spring Integration的默认通道,该通道的消息发送和接收过程处于同一线程中。另外还有ExecutorChannel,使用基于多线程的TaskExecutor来异步消费通道中的消息。

Spring Integration的设计目的是为了系统集成,因此内部提供了大量的集成化端点方便应用程序直接使用。当各个异构系统之间进行集成时,如何屏蔽各种技术体系所带来的差异性,Spring Integration为我们提供了即插即用的解决方案。Spring Integration提供的常见集成端点包括HTTP、JDBC、JMS、AMQP、JPA、Mail、MongoDB、Redis等。

Spring Cloud Stream

Spring Cloud Stream是Spring Integration的一种增强,同时与Spring Boot体系进行了融合,也是Spring Cloud家族中专门用来实现微服务环境下消息通信的核心框架。我们先来看一下Spring Cloud Stream中与消息通信相关的内容。

Spring Cloud Stream中的通道

结合前面关于Spring Messaging和Spring Integration中各种概念的描述,我们就不难理解Spring Cloud Stream中关于Source和Sink的定义。Source和Sink都是接口,其中Source接口的定义是这样的。

public interface Source {

String OUTPUT = "output";

@Output(Source.OUTPUT)

MessageChannel output();

}

注意到这里通过Spring Messaging提供的MessageChannel来发送消息,而MessageChannel类来自于Spring Messaging组件。我们在MessageChannel上添加了一个@Output注解,该注解定义了一个输出通道。

类似的,Sink接口定义如下所示。

public interface Sink{

String INPUT = "input";

@Input(Source.INPUT)

SubscribableChannel input();

}

同样,这里通过Spring Messaging中的SubscribableChannel来实现消息接收,而@Input注解定义了一个输入通道。

注意到@Input和@Output注解使用通道名称作为参数,如果没有名称,会使用带注解的方法名字作为参数,也就是默认情况下分别使用“input”和“output”作为通道名称。从这个角度讲,一个Spring Cloud Stream应用程序中的Input和Output通道数量和名称都是可以任意设置的,我们只需要在这些通道的定义上添加@Input和@Output注解即可,这里也给出相应的示例代码。

public interface CustomChannels{

    @Output

    MessageChannel output1();

    @Input

    SubscribableChannel input1();

    

@Input

    SubscribableChannel input2();

}

可以看到,我们在这里定义了一个CustomChannels接口并声明了一个Output通道和两个Input通道,说明使用该通道的服务会从外部的两个通道中获取消息并向外部的一个通道发送消息。注意到上述接口同时使用到了Spring Messaging中的SubscribableChannel和MessageChannel。Spring Cloud Stream对Spring Messaging和Spring Integration提供了原生支持,我们可以直接使用这两个框架所提供的API直接操作消息发布和接收的过程。但在多数场景下,我们不需要依赖这种方式就能完成常见的开发需求。

Spring Cloud Stream整体架构

介绍完Spring Cloud Stream所提供的通道之后,我们来进一步分析它的整体架构。Spring Cloud Stream对整个消息发布和消费过程做了高度抽象,并提供了一系列核心组件。区别于直接使用RabbitMQ、Kafka等消息中间件,Spring Cloud Stream为开发人员提供了一套统一的API。相当于Spring Cloud Stream在消息生产者和消费者之间添加了一种桥梁机制,这种桥梁机制屏蔽了各种消息中间件之间的差异。


在上图中,我们不难看出Spring Cloud Stream具备四个核心组件,分别是Binder、Channel、Source和Sink,其中Binder和Channel成对出现,而Source和Sink分别面向消息的发布者和消费者。我们已经理解了Source、Sink和Channel的概念,这里重点对Binder进行展开。

Spring Cloud Stream中最重要的概念就是Binder。所谓Binder,顾名思义就是一种黏合剂,将业务服务与消息通信系统黏合在一起。通过Binder,我们可以很方便的连接消息中间件,可以动态的改变消息的目标地址、发送方式而不需要了解其背后的各种消息中间件在实现上的差异。到目前为止,Spring Cloud Stream通过Binder组件分别完成了对RabbitMQ以及Kafka的集成,我们可以基于Spring Cloud Stream所提供的统一的API来使用这两款消息中间件,而不需要具体处理它们在消息发布和消费上的差异。这点显著提高了广大开发人员的开发效率。

总结

我们知道软件设计和实现过程都应该采用分层的思想,而Spring框架针对消息通信的解决方案也采用了同样的设计思想。我们发现在Spring中,用于实现消息通信的组件和框架并不是只有一个,而是有三个。从提供消息、通道等基础概念的Spring Messaging开始,到用于构建系统集成总线的Spring Integration,最后才是具备平台型消息通信能力的Spring Cloud Stream。这三个框架各自的定位,以及功能的演进过程值得我们深入进行分析。

标签:Stream,Spring,Integration,家族,消息,解决方案,Messaging,Cloud
From: https://blog.csdn.net/2401_83062316/article/details/139472522

相关文章

  • spring入门aop和ioc
    目录spring分层架构表现层服务层(业务层)持久层spring核心ioc(控制反转)1)接下来是代码示例:2)ioc容器的使用过程3)ioc中的bean管理4)实例化bean的三种方式aop(面向切面开发)定义优势AOP底层原理AOP相关的术语AOP入门aop注解开发aop纯注解开发Di(依赖注入)1)属性的set方法注入值的方式2)构造......
  • springMvc 配置 UReport2
    参考:https://blog.csdn.net/qq_42207808/article/details/112258835 1.配置pom.xml引入目前最新得2.2.9版本<dependency><groupId>com.bstek.ureport</groupId><artifactId>ureport2-console</artifactId&......
  • 基于SpringBoot的秒杀系统源码数据库
    基于SpringBoot的秒杀系统源码数据库社会发展日新月异,用计算机应用实现数据管理功能已经算是很完善的了,但是随着移动互联网的到来,处理信息不再受制于地理位置的限制,处理信息及时高效,备受人们的喜爱。本次开发一套基于SpringBoot的秒杀系统,管理员功能有个人中心,用户管理,商品类......
  • 基于springboot的二手车交易系统源码数据库
    基于springboot的二手车交易系统源码数据库如今社会上各行各业,都喜欢用自己行业的专属软件工作,互联网发展到这个时候,人们已经发现离不开了互联网。新技术的产生,往往能解决一些老技术的弊端问题。因为传统二手车交易信息管理难度大,容错率低,管理人员处理数据费工费时,所以专门为......
  • 基于springboot的纺织品企业财务管理系统源码数据库
    基于springboot的纺织品企业财务管理系统源码数据库在如今社会上,关于信息上面的处理,没有任何一个企业或者个人会忽视,如何让信息急速传递,并且归档储存查询,采用之前的纸张记录模式已经不符合当前使用要求了。所以,对纺织品企业财务信息管理的提升,也为了对纺织品企业财务信息进行......
  • 一个基于 React + SpringBoot 的在线多功能问卷系统(附源码)
    简介:一个基于React+SpringBoot的在线多功能问卷系统前端技术栈:React、React-Router、Webpack、Antd、Zustand、Echarts、DnDKit后端技术栈:SpringBoot、MySQL、MyBatisPlus、Redis项目源码下载链接: https://pan.quark.cn/s/2e32786e0c61部分页面静态预览: 主要前......
  • 商淘云助力连锁门店同城引流、会员营销及连锁品牌节税整体解决方案
    随着市场竞争的日益激烈,连锁门店如何在同城范围内迅速提升品牌知名度、吸引并维护忠诚顾客群体,同时确保税务合规以降低经营成本,成为了摆在连锁品牌面前的重要课题。商淘云,作为一家专注于智慧零售解决方案的科技企业,凭借其深厚的行业经验和先进的技术实力,为连锁门店提供了一套同......
  • 基于springboot实现疫情信息管理系统项目【项目源码+论文说明】计算机毕业设计
    基于springboot实现疫情信息管理系统演示摘要近年来,信息化管理行业的不断兴起,使得人们的日常生活越来越离不开计算机和互联网技术。首先,根据收集到的用户需求分析,对设计系统有一个初步的认识与了解,确定疫情信息管理系统的总体功能模块。然后,详细设计系统的主要功能模块,通......
  • 【git commit错误】error: bad signature 0x00000000 fatal: index file corrupt原因
    解决Git错误:error:badsignature0x00000000fatal:indexfilecorrupt原因分析及解决方案在使用Git进行版本控制时,可能会遇到各种错误。其中之一是关于索引文件(通常为.git/index)损坏的错误,这会导致无法正常提交更改。基础知识Git索引:Git使用一个索引文件来跟踪工作目......
  • 128springboot汽车租赁管理系统租车订单还车汽车资讯论坛管理(源码+文档+PPT+运行视频+
    项目技术:springboot+Maven+Vue等等组成,B/S模式+Maven管理等等。环境需要1.运行环境:最好是javajdk1.8,我们在这个平台上运行的。其他版本理论上也可以。2.IDE环境:IDEA,Eclipse,Myeclipse都可以。推荐IDEA;3.tomcat环境:Tomcat7.x,8.x,9.x版本均可4.硬件环境:windows......