首页 > 其他分享 >kafk 消费者配置

kafk 消费者配置

时间:2023-08-23 11:36:19浏览次数:24  
标签:消费者 配置 kafka kafk 提交 setProperty org poll properties

一、客户端方式:

1、导入相关依赖

<!--pom 导入依赖,根据实际情况选择版本 -->
<dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>2.3.0</version>
</dependency>
    

2、代码实现

public class KafkaConfig {private static Properties getProperties(){
        Properties properties = new Properties();

        properties.setProperty("bootstrap.servers","localhost:9092");
        //消费者组
        properties.setProperty("group.id",GROUP_ID);
        //序列化
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //反序列化
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //安全协议,根据实际需求是否需要
        properties.setProperty("security.protocol","SASL_PLAINTEXT");
        //Sasl 机制,根据实际需求是否需要
        properties.setProperty("sasl.mechanism","SCRAM-SHA-256");
        //Jaas 配置
        properties.setProperty("sasl.jaas.config","org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";");

        return properties;
    }

    public static void main(String[] args){
        Properties properties = getProperties();
        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);
     //订阅主题
        kafkaConsumer.subscribe(Arrays.asList("topicName"));

        while (true){
       //poll(),轮询拉取消息 ConsumerRecords<String, String> poll = kafkaConsumer.poll(Duration.ofMillis(1000)); System.out.println("======================================================="); for (ConsumerRecord data : poll) { System.out.println("=========="+data.value()+"=================");

         //业务数据处理 } System.out.println("======================================================="); } } }

 

二、使用@KafkaListener 注解监听

1、导入依赖

<!--根据实际情况选择版本-->
<dependency>   <groupId>org.springframework.kafka</groupId>   <artifactId>spring-kafka</artifactId>   <version>2.3.7.RELEASE</version> </dependency>

 

2、yml配置

spring:
  flyway:
    enabled: true
    baseline-on-migrate: true
    validate-on-migrate: false
    out-of-order: false
    clean-disabled: true
  kafka:
    bootstrap-servers: 127.0.0.1:9092
#properties 安全机制根据实际情况选择是否配置 properties: security: protocol: SASL_PLAINTEXT sasl: mechanism: SCRAM-SHA-256 jaas: config: org.apache.kafka.common.security.scram.ScramLoginModule required username="10322022155S866XTKPOH" password="FKWmtz15htCpsvV9"; consumer: # 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名 group-id: gp_103GkyHEKEP #key value 的反序列化 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest #auto-offset-reset: smallest #关闭自动提交 enable-auto-commit: false listener: # RECORD 当每一条记录被消费者监听器(ListenerConsumer)处理后提交 # BATCH 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理后提交 # TIME 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理后,距离上次提交时间大于TIME时提交 # COUNT 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理后,被处理record数量大于COUNT时提交 # COUNT_TIME TIME | COUMT 有一个条件满足时提交 # MANUAL 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理后,手动调用 Acknowledgment.acknowledge()后提交 # MANUAL_IMMEDIATE 手动调用 Acknowledgment.acknowledge() 之后 立即提交 ack-mode: manual_immediate # 消费监听接口监听的主题不存在时,默认会报错 missing-topics-fatal: false

#kafka自定义消息发送配置 #autostartup 是否启动监听 kafka: autostartup: false groupId: groupId topicCharge: topicName

3、代码实现

@KafkaListener(topics = "${kafka.topicCharge}",groupId = "${kafka.groupId}",autoStartup="${kafka.autostartup}")
public void listenGroup(ConsumerRecord<String,String> record, Acknowledgment ack){
  System.out.println(record.value());
  //业务数据处理

  //提交消费   ack.acknowledge(); }

 

标签:消费者,配置,kafka,kafk,提交,setProperty,org,poll,properties
From: https://www.cnblogs.com/yangjcBlog/p/17650737.html

相关文章

  • Zookeeper对于Kafka的作用和意义
    Zookeeper在ApacheKafka中扮演着关键的角色,它提供了分布式协调和配置管理服务,对于Kafka集群的正常运行和高可用性至关重要。以下是具体介绍。配置管理Zookeeper负责存储和管理Kafka集群的配置信息,包括主题(topics)和分区(partitions)的分配、副本(replicas)的分布、消费者组(consumergro......
  • RuoYi-vue配置记录
    如果这个项目能顺利运行,标志着Springboot+vue的前后端环境都配好了。一、官方文档若依官方文档:介绍|RuoYi,在这个地方克隆/下载项目源代码https://gitee.com/y_project/RuoYi-Vue解压到自己的目录下 首先根据官方文档的环境部署所说,检查一下自己的这些是否都满足要求了:J......
  • 国标视频云服务EasyGBS国标平台进行内网映射两个公网设备配置的详细步骤
    国标视频云服务EasyGBS支持设备/平台通过国标GB28181协议注册接入,并能实现视频的实时监控直播、录像、检索与回看、语音对讲、云存储、告警、平台级联等功能。平台部署简单、可拓展性强,支持将接入的视频流进行全终端、全平台分发,分发的视频流包括RTSP、RTMP、FLV、HLS、WebRTC等格......
  • Ubuntu16.04+CUDA8.0+OpenCV3.1+python+caffe+faster-rcnn环境配置
    前言Ubuntu1604注意事项CUDA80安装显卡驱动安装CUDA80编译CUDASampleOpenCV31pythonCaffe安装CaffeMNIST数据集测试faster-rcnn后记前言经过大概两个星期的配置,终于将faster-rcnn安装好了,期间重装了大概十次系统,查阅了无数多文献博客,遇到了无数多坑。本人写这篇文章就是希望读者......
  • webman:配置端口/日志等(v1.5.7)
     一,文档地址:https://www.workerman.net/doc/webman/others/security.htmlhttps://www.workerman.net/doc/webman/request.htmlhttps://www.workerman.net/doc/webman/config.html说明:刘宏缔的架构森林—专注it技术的博客,网站:https://blog.imgtouch.com原文: https://b......
  • webman:配置异常处理返回json格式(v1.5.7)
    一,添加一个除0错的异常代码:页面显示效果如图:二,配置:php代码1,config/123456789101112131415161718<?php/** *Thisfileispartofwebman. * *LicensedunderTheMITLicense *Forfullcopyrightandlicenseinformation......
  • 如何配置VScode的C++环境
    你需要:VScodeMinGW没了安装VScode在VScode官网下载VScode。打开VScode,点击扩展,输入Chinese,下载Chinese(Simplified)(简体中文)LanguagePackforVisualStudioCode,安装中文包并重启。MinGW在MinGW官网下载MinGW。下载x86_64-posix-sjlj压缩包速度快,解压到......
  • 第九章 Nacos Config--服务配置
    9.1服务配置中心介绍首先我们来看一下,微服务架构下关于配置文件的一些问题:配置文件相对分散。在一个微服务架构下,配置文件会随着微服务的增多变的越来越多,而且分散在各个微服务中,不好统一配置和管理。配置文件无法区分环境。微服务项目可能会有多个环境,例如:测试环境、预发......
  • 在2021应该怎样配置 Favicon:用六个文件来适配大多数需求
    转载:HowtoFaviconin2023:Sixfilesthatfitmostneeds—MartianChronicles,EvilMartians’teamblog是时候重新思考如何为现代浏览器配置一套favicon并且阻止发疯的图标生成器。今天,仅仅只是为了在浏览器tab栏和触控屏上显示一个小小的网站logo,前端开发者就必须......
  • pyqt5 QtDesigner 和 PyUIC 的环境配置
    参考:https://zhuanlan.zhihu.com/p/4251489591.安装pyqt5 模块#pyqt5安装pip3installpyqt5-ihttps://mirrors.aliuyun.com/pypi/simple#QtDesignerpip3installpyqt5-tools-ihttps://mirrors.aliuyun.com/pypi/simple运行PyCharm;从顶部菜单栏选择:File-......