核心概念
4.1应用模型
一个spring cloud Stream 应用包括了一个消息中间件作为核心。某个应用通过springcloud Stream使用input和output通道与外界(注:消息队列)
进行消息交换。通道通过中间件专用的绑定机制连接到外部的虚拟主机(注:原文为brokers,这是消息队列里面的概念,可以理解成消息队里的虚拟主机)。
(注:图片中的MiddleWare就是指的消息中间件,Springcloud Stream Application就是使用SpringcloudStream组件的应用,
这两者通过Binder紧密的结合起来了,但是SpringCloud Stream Application中的应用想要通过Binder获取到中间件中的消息,
必须调用 Application Core中提供的API,而在架构上,Application Core是通过inputs和outputs通道撘成的桥和Binder进行
交互的)
4.1.1、FatJAR
Spring Cloud Stream应用可以在你的ide上单机运行。如果你需要在生产环境中运行spring cloud Stream项目,你可以创建一个可执行
的JAR通过maven或者Gradle。
4.2、抽象的Binder
Spring Cloud Stream 提供了Binder来实现和kafka或者RabbitMQ的交互(注:根据kafka和rabbitmq提供的服务来实现一定的功能,我
个人称之为交互)。同时,spring Cloud Stream 也提供了一个例子,https://github.com/spring-cloud/spring-cloud-stream/blob/master/spring-cloud-stream-test-support/src/main/java/org/springframework/cloud/stream/test/binder/TestSupportBinder.java 例子中定义了一个未被修改的通道使得测试通道之间可以直接
通过通道相互作用。你同样也可以使用扩展API去写你自己的Binder。
spring cloud Stream使用spring boot的自动配置进行配置,使得 Binder灵活的连接各种消息中间件成为可能。例如:
部署者可以在运行态下动态的选择哪个目标(例如kafka的topics或者 rabbitmq的exchange)作为最终通道连接的对象。
这种配置支持任意一种springboot的配置。例如在sink的例子 https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#spring-cloud-stream-overview-introducing 中,设置
spring.cloud.stream.bindings.input.destination=raw-sensor-data
会使得Binder从 名为 raw-sensor-data 的kafka topic 或者一个绑定在名为raw-sensor-data的RabbitMQ交换机的队列中
获取消息。
Springcloud Stream会在classpath下面自动寻找和使用一个找到的binder。你可以用相同的代码使用不同的消息中间件。如果要
这样做,需要在build项目的时候引入一个不同的binder。在更复杂的环境下,你可以为你的应用打包很多binder,在不同的环境下使用
不同的binder(甚至你可以为不同的Binder绑定不同的通道)
4.3、支持发布-订阅模式
消息数据通过在共享的主题中广播,两个应用之间的交互遵从发布-订阅模式。可以从下面这张展示部署在同一个topic下相互影响的spring
cloud Stream集群是如何实现消息的发布-订阅模式的。
传感器向http端口发送的消息数据被发送到一个名为raw-sensor-data的公共目标。这个,目标下订阅了两个微服务,一个十计算时间窗平均值的微服务,
另一个是将原始数据放入到HDFS(Hadoop分布式文件系统)的微服务应用。为了处理消息数据,两个应用程序都将该topic下的消息作为自己运行时的输入设备。
发布-订阅交流模型减少了消息生产者和消息消费者在加入新的拓扑节点时不改变当前已有的网络模型的复杂度。
4.4、消费者组
虽然消息 发布-订阅模式使得具有共享topics的应用间的交流变得简单,但是通过创建给定应用程序的多个实例来扩展的能力同样重要。
执行此操作时,应用程序的不同实例将放置在竞争的使用者关系中,其中只有一个实例来处理给定的消息。
Spring Cloud Stream通过消费者群体的概念对此行为进行建模。每一个消费者通过使用如下配置
spring.cloud.stream.bingings.<channelName>.group
的属性来指明特定的组名。例如,在下面的图片中,这个属性将被这样设置
# left
spring.cloud.stream.bindings.<channelName>.group=hdfsWrite
# right
spring.cloud.stream.bindings.<channelName>.group=average
订阅给定目标的所有组都会收到已发布数据的副本,但每个组中只有一个成员从该目标接收给定的消息,默认情况下,未指定组时,
Spring Cloud Stream将应用程序分配给与所有其他使用者组具有发布 - 订阅关系的匿名且独立的单成员使用者组。
4.5、消费者类型
有两种消费者类型:
消息驱动(异步)
轮询(同步)
在2.0版本以前只支持消息驱动的方式,消息一旦可用就会传递,并且可以使用一个线程来处理它
如果要控制处理消息的速率,可能需要使用同步使用者
4.5.1、持久性
消费者组订阅是持久的,与Spring Cloud Stream的固定应用模型一致。也就是说,绑定器实现确保组订阅是持久的,一旦
创建了一个组的至少一个订阅,该组就会接收消息,即使它们是在组中的所有应用程序都已停止时发送的。
通常,在将应用程序绑定到给定目标时,最好始终指定使用者组。扩展Spring Cloud Stream应用程序时,必须为每个输入
绑定指定一个使用者组。这样做可以防止应用程序的实例接收重复的消息
4.6、分区支持
Spring Cloud Stream支持在给定应用程序的多个实例之间对数据进行分区。在分区方案中,物理通信介质(例如代理主题)
被视为结构化为多个分区。一个或多个生产者应用程序实例将数据发送到多个消费者应用程序实例,并确保由共同特征标识的
数据由同一个消费者实例处理。
Spring Cloud Stream提供了一种通用抽象,用于以统一的方式实现分区处理用例,因此,无论消息中间件本身具有自然分
区(例如,Kafka)或者没有(例如,RabbitMQ),都可以使用分区
分区是有状态处理中的一个关键概念,由于性能或一致性原因,确保所有相关数据一起处理至关重要。或者,在时间窗口平均
值计算示例中,重要的是来自任何给定传感器的所有测量值都由相同的应用程序实例处理。