1、问题:
在进行功能开发的时候遇到一个需求,具体需求如下:
在某个服务A中接收到消息,对消息体进行校验,判断消息体中的数据是否需要产生告警,若产生告警,则将告警信息发送到kafka中,由另一个服务B接收到消息并记录到mongo中;
当A服务在此接收到消息,发现以前的某个告警已经恢复,则再次发送消息到kafka中,B服务接收到消息后对原来的告警进行状态恢复;
而假如在进行告警状态更新时,告警的消息发到了某个partition中,而恢复的消息发送到了另一个partition中,中间间隔时间非常短,恢复的消息先被消费,就会导致出现问题,告警应该是恢复的,但是确没有恢复,因为原纪录都还没插入数据库;
所以必须要将相同的告警或恢复对象发送到同一个partition中,下游在消费的时候,默认情况下同一个customer会消费固定的partition中的消息,所以这样就能保证消息的顺序消费。
2、解决过程:
1)、在A和B服务中都是使用的spring.cloud.stream来进行kafka消息的发送和接收,版本是3.1.2,而根据官方的方式,我使用了如下配置进行指定partition:
# 配置获取 Message 对象的哪个字段作为分区 key,如根据实例里边的 id 作为 key 则写成 payload.entityId spring.cloud.stream.bindings.<channelName>.producer.partitionKeyExpression=payload.entityId
代码中发送的代码写成如下:
Message<?> payload = MessageBuilder.withPayload(entity).build(); this.streamBridge.send( "ASSET_ALARM_RECORD_BINDING_NAME", payload );
2)、但是在进行测试时,又一直提示我报错 SpelEvaluationException: EL1008E,后来查找原因发现是在并发量很大的情况下,不能使用payload来进行分区,推荐使用headers,具体可以查看:https://github.com/spring-cloud/spring-cloud-stream/issues/2213 , 所以修改为如下:
# 配置获取 Message 对象的哪个字段作为分区 key,如根据实例里边的 id 作为 key 则写成headers['entityId']
spring.cloud.stream.bindings.<channelName>.producer.partitionKeyExpression=headers['entityId']
spring.cloud.stream.bindings.<channelName>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
代码中发送的代码写成如下:
Message<?> payload = MessageBuilder.withPayload(entity).setHeader("partition", entity.getEntityId()).build(); this.streamBridge.send( "ASSET_ALARM_RECORD_BINDING_NAME", payload );
3)、但是还是会报错,我在A服务中要发送两种消息到kafka的不同topic上,其中一个进行了上述配置,另外一个不需要,但是发现另外一个报了空指针异常,提示我在发送消息的时候从headers中找不到partition,这就很奇怪了,然后从github上找到了答案:https://github.com/spring-cloud/spring-cloud-stream/issues/2249 ,从其回答中可以看出,作者在3.2.2以后修复了这个问题,所以我将spring.cloud.stream 的版本改为3.2.10,再进行测试,发现已经可以正常的发送消息,并且消息也能根据entityId发送到指定的partition上。
标签:某字,stream,spring,partition,kafka,消息,cloud From: https://www.cnblogs.com/Silentness/p/18183894