首页 > 其他分享 >使用spring.cloud.stream来发送kafka消息,并根据某字段将消息发送到固定partition上

使用spring.cloud.stream来发送kafka消息,并根据某字段将消息发送到固定partition上

时间:2024-05-10 13:22:05浏览次数:26  
标签:某字 stream spring partition kafka 消息 cloud

1、问题:

在进行功能开发的时候遇到一个需求,具体需求如下:

  在某个服务A中接收到消息,对消息体进行校验,判断消息体中的数据是否需要产生告警,若产生告警,则将告警信息发送到kafka中,由另一个服务B接收到消息并记录到mongo中;

  当A服务在此接收到消息,发现以前的某个告警已经恢复,则再次发送消息到kafka中,B服务接收到消息后对原来的告警进行状态恢复;

  而假如在进行告警状态更新时,告警的消息发到了某个partition中,而恢复的消息发送到了另一个partition中,中间间隔时间非常短,恢复的消息先被消费,就会导致出现问题,告警应该是恢复的,但是确没有恢复,因为原纪录都还没插入数据库;

  所以必须要将相同的告警或恢复对象发送到同一个partition中,下游在消费的时候,默认情况下同一个customer会消费固定的partition中的消息,所以这样就能保证消息的顺序消费。

2、解决过程:

1)、在A和B服务中都是使用的spring.cloud.stream来进行kafka消息的发送和接收,版本是3.1.2,而根据官方的方式,我使用了如下配置进行指定partition:

# 配置获取 Message 对象的哪个字段作为分区 key,如根据实例里边的 id 作为 key 则写成 payload.entityId
spring.cloud.stream.bindings.<channelName>.producer.partitionKeyExpression=payload.entityId

 

代码中发送的代码写成如下:

Message<?> payload = MessageBuilder.withPayload(entity).build();
        this.streamBridge.send(
                "ASSET_ALARM_RECORD_BINDING_NAME",
                payload
        );

2)、但是在进行测试时,又一直提示我报错 SpelEvaluationException: EL1008E,后来查找原因发现是在并发量很大的情况下,不能使用payload来进行分区,推荐使用headers,具体可以查看:https://github.com/spring-cloud/spring-cloud-stream/issues/2213 , 所以修改为如下:

# 配置获取 Message 对象的哪个字段作为分区 key,如根据实例里边的 id 作为 key 则写成headers['entityId']
spring.cloud.stream.bindings.<channelName>.producer.partitionKeyExpression=headers['entityId']
spring.cloud.stream.bindings.<channelName>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer

代码中发送的代码写成如下:

Message<?> payload = MessageBuilder.withPayload(entity).setHeader("partition", entity.getEntityId()).build();
this.streamBridge.send( "ASSET_ALARM_RECORD_BINDING_NAME", payload );

3)、但是还是会报错,我在A服务中要发送两种消息到kafka的不同topic上,其中一个进行了上述配置,另外一个不需要,但是发现另外一个报了空指针异常,提示我在发送消息的时候从headers中找不到partition,这就很奇怪了,然后从github上找到了答案:https://github.com/spring-cloud/spring-cloud-stream/issues/2249 ,从其回答中可以看出,作者在3.2.2以后修复了这个问题,所以我将spring.cloud.stream 的版本改为3.2.10,再进行测试,发现已经可以正常的发送消息,并且消息也能根据entityId发送到指定的partition上。

 

标签:某字,stream,spring,partition,kafka,消息,cloud
From: https://www.cnblogs.com/Silentness/p/18183894

相关文章

  • SpringBoot中全部注解归纳解释
    https://blog.csdn.net/weixin_55772633/article/details/131882825https://www.cnblogs.com/jingzh/p/14620211.html1springboot注解1.1引言1.2基本注解1.3JPA注解1.4SpringMVC相关注解1.5全局异常处理1.6项目中具体配置解析和使用环境1.7Lombok注解1.8数......
  • SpringBoot+使用过滤器链执行风控决策
    风控流程下单前进行风控校验//1.begin---风控处理---前置处理{黑白名单校验}RiskControlRuleEnumcontrolRuleEnum=riskControlHandlerService.preHandle(mappingObj.getMerchantGoodsType(),thatUser);if(controlRuleEnum!=null){log......
  • Spring 中 bean 的循环依赖
    什么是循环依赖A直接或间接依赖B的同时B又间接或直接依赖A,此时我们可以称A和B之间存在循环依赖关系。在使用Spring的过程中应该尽量避免循环引用关系的出现。生命周期简述在阅读下面的样例之前,需要先了解一下Spring中bean的生命周期,简单来说bean的生命周期分......
  • Springboot项目镜像制作&传递环境变量、设置hostname、动态设置JVM参数、cmd&entrypoi
    实现制作一个springboot的镜像,并且可以传递环境变量实现动态JVM参数和端口。0.准备&cmd、entrypoint区别1.准备springboot项目一个简单的springboot项目,默认启动8001端口,里面只有一个接口。xxx%curllocalhost:8081indexdocker环境2.CMD、entrypoint区......
  • springboot3.2.3如何集成swagger
    在SpringBoot中集成Swagger可以方便地生成API文档并进行接口测试。要在SpringBoot3.2.3中集成Swagger,你可以按照以下步骤进行操作:1.添加Swagger依赖到pom.xml文件中:点击查看代码<dependency><groupId>io.springfox</groupId><artifactId>springfox-boot-starter<......
  • openfeign接口Springboot启动Bean报错未找到Singleton bean creation not allowed whi
    检查步骤检查springboot启动类是否标注@EnableFeignClients注解,未标注该注解会导致无法注入bean检查远程调用模块是否标注注解@FeignClient检查@FeignClient注解中是否写了正确的微服务名称(区分大小写)检查@FeignClient注解中标识的微服务是否启动​​原因:此处接......
  • springboot+vue项目
    1MyBatisPlus的分页插件是怎么生效的?体现在哪里?PaginationInnerInterceptor是通过拦截数据库操作来实现分页功能的。 MyBatisPlus的分页插件PaginationInnerInterceptor是通过拦截数据库操作来实现分页功能的。它的工作原理如下:配置分页插件:在你的SpringBoot应用......
  • SpringBoot随手笔记
    SpringBoot随手笔记0关于火狐浏览器什么时候会发出http请求的说明在抓包的情况下(按下F12后的模式),不管是刷新页面还是在浏览器地址栏回车,该页面中的图片都会发出http请求;但如果不是抓包的模式下,如果访问的页面和上一次访问的页面相同(地址栏的地址没有更改),不管是刷新页面还......
  • springboot seata 全局捕获异常失效
    问题:Springboot使用@ControllerAdvice或@RestControllerAdvice全局捕获异常时,捕获不到自己抛出的相应异常首先看一下全局异常组件有么有被扫描到如何查看,很简单只需要写一段类加载打印代码,如下 如果启动时,打印了你写的字符串就说明时烧苗到了 这就说明是其他的问题了,那就......
  • Springboot项目的jar包的运行方式以及使用yum安装java后忘记了位置
    SpringBoot项目打包后的jar的部署方式这里我写了五种部署方式1.直接启动java-jarxxx.jar这种方式就只适合自己在测试时用一下,关闭会话就能停止运行属实是方便。2.后台启动java-jarxxx.jar&在后台静默启动,同样关闭会话也会停止,优点是和上面一样,日志是打印在窗口的3......