上一篇博客从使用的角度了解了spring cloud stream。现在得知道为什么其可以生效。
我们针对分区做了下面的配置:
# cloud stream 相关
spring.cloud.stream.bindings.input.destination=streamfenqu
spring.cloud.stream.bindings.input.group=stream_receiver
## 消费组
spring.profiles.active=
## 分区
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.instance-count=3
spring.cloud.stream.instance-index=
于是,在消息队列中,生成了三个队列和一个 exchange
不言而喻,可以得出上面的关系图
所以,如果我们需要同时搞两个消费组,且具有各自的分区,那么应该修改exchange的名字,来看看是哪里定义的streamfenqu:
spring.cloud.stream.bindings.input.destination=streamfenqu
这是指的通道的目的地,修改exchange只能在此处进行修改,如果现在我有这样一个需求,对于发到exchange:streamfenqu中的消息,我需要分别给两个消费组进行分区消费,那么通道绑定的目的地应该都为 streamfenqu才行,按照道理,这里的路由键由cloud stream生成就重复了。下面我新建一个路由组,并对其进行分区,看有什么结果。(注意这里采用了相同的通道)
使用下面的配置:
# cloud stream 相关
spring.cloud.stream.bindings.input.destination=streamfenqu
spring.cloud.stream.bindings.input.group=stream_receiver_2
## 消费组
spring.profiles.active=
## 分区
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.instance-count=3
spring.cloud.stream.instance-index=
# 应用本身相关
spring.application.name=stream-receiver_2
打开实例
另外生成了几个队列
同时也添加了路由,来测试一下
消费组stream_receiver_2的分区验证截图:
消费组stream_receiver的分区验证截图
两个分区的消费组和分区功能都分别得到了有效验证