首页 > 其他分享 >kafka消费者提交方式(代码演示)

kafka消费者提交方式(代码演示)

时间:2024-05-02 12:33:44浏览次数:22  
标签:演示 key value kafka record flag 提交 consumer properties

自动提交,手动提交(异步提交,同步提交,异步同步结合提交),原理:提交后,重新消费消息位移发生变化。

  1 public class MyConsumer {
  2 
  3     private static KafkaConsumer<String, String> consumer;
  4     private static Properties properties;
  5 
  6     static {
  7 
  8         properties = new Properties();
  9 
 10         properties.put("bootstrap.servers", "127.0.0.1:9092");
 11         properties.put("key.deserializer",
 12                 "org.apache.kafka.common.serialization.StringDeserializer");
 13         properties.put("value.deserializer",
 14                 "org.apache.kafka.common.serialization.StringDeserializer");
 15         properties.put("group.id", "KafkaStudy");
 16     }
 17 
 18     private static void generalConsumeMessageAutoCommit() {
 19 
 20         properties.put("enable.auto.commit", true);
 21         consumer = new KafkaConsumer<>(properties);
 22         consumer.subscribe(Collections.singleton("imooc-kafka-study-x"));
 23 
 24         try {
 25             while (true) {
 26 
 27                 boolean flag = true;
 28                 ConsumerRecords<String, String> records = consumer.poll(100);
 29 
 30                 for (ConsumerRecord<String, String> record : records) {
 31                     System.out.println(String.format(
 32                             "topic = %s, partition = %s, key = %s, value = %s",
 33                             record.topic(), record.partition(),
 34                             record.key(), record.value()
 35                     ));
 36                     if (record.value().equals("done")) {
 37                         flag = false;
 38                     }
 39                 }
 40 
 41                 if (!flag) {
 42                     break;
 43                 }
 44             }
 45         } finally {
 46             consumer.close();
 47         }
 48     }
 49 
 50     private static void generalConsumeMessageSyncCommit() {
 51 
 52         properties.put("auto.commit.offset", false);
 53         consumer = new KafkaConsumer<>(properties);
 54         consumer.subscribe(Collections.singletonList("imooc-kafka-study-x"));
 55 
 56         while (true) {
 57             boolean flag = true;
 58 
 59             ConsumerRecords<String, String> records =
 60                     consumer.poll(100);
 61             for (ConsumerRecord<String, String> record : records) {
 62                 System.out.println(String.format(
 63                         "topic = %s, partition = %s, key = %s, value = %s",
 64                         record.topic(), record.partition(),
 65                         record.key(), record.value()
 66                 ));
 67                 if (record.value().equals("done")) {
 68                     flag = false;
 69                 }
 70             }
 71 
 72             try {
 73                 consumer.commitSync();
 74             } catch (CommitFailedException ex) {
 75                 System.out.println("commit failed error: "
 76                         + ex.getMessage());
 77             }
 78 
 79             if (!flag) {
 80                 break;
 81             }
 82         }
 83     }
 84 
 85     private static void generalConsumeMessageAsyncCommit() {
 86 
 87         properties.put("auto.commit.offset", false);
 88         consumer = new KafkaConsumer<>(properties);
 89         consumer.subscribe(Collections.singletonList("imooc-kafka-study-x"));
 90 
 91         while (true) {
 92             boolean flag = true;
 93 
 94             ConsumerRecords<String, String> records =
 95                     consumer.poll(100);
 96             for (ConsumerRecord<String, String> record : records) {
 97                 System.out.println(String.format(
 98                         "topic = %s, partition = %s, key = %s, value = %s",
 99                         record.topic(), record.partition(),
100                         record.key(), record.value()
101                 ));
102                 if (record.value().equals("done")) {
103                     flag = false;
104                 }
105             }
106 
107             // commit A, offset 2000
108             // commit B, offset 3000
109             consumer.commitAsync();
110 
111             if (!flag) {
112                 break;
113             }
114         }
115     }
116 
117     private static void generalConsumeMessageAsyncCommitWithCallback() {
118 
119         properties.put("auto.commit.offset", false);
120         consumer = new KafkaConsumer<>(properties);
121         consumer.subscribe(Collections.singletonList("imooc-kafka-study-x"));
122 
123         while (true) {
124             boolean flag = true;
125 
126             ConsumerRecords<String, String> records =
127                     consumer.poll(100);
128             for (ConsumerRecord<String, String> record : records) {
129                 System.out.println(String.format(
130                         "topic = %s, partition = %s, key = %s, value = %s",
131                         record.topic(), record.partition(),
132                         record.key(), record.value()
133                 ));
134                 if (record.value().equals("done")) {
135                     flag = false;
136                 }
137             }
138 
139             consumer.commitAsync((map, e) -> {
140                 if (e != null) {
141                     System.out.println("commit failed for offsets: " +
142                     e.getMessage());
143                 }
144             });
145 
146             if (!flag) {
147                 break;
148             }
149         }
150     }
151 
152     @SuppressWarnings("all")
153     private static void mixSyncAndAsyncCommit() {
154 
155         properties.put("auto.commit.offset", false);
156         consumer = new KafkaConsumer<>(properties);
157         consumer.subscribe(Collections.singletonList("imooc-kafka-study-x"));
158 
159         try {
160 
161             while (true) {
162                 ConsumerRecords<String, String> records =
163                         consumer.poll(100);
164 
165                 for (ConsumerRecord<String, String> record : records) {
166                     System.out.println(String.format(
167                             "topic = %s, partition = %s, key = %s, " +
168                                     "value = %s",
169                             record.topic(), record.partition(),
170                             record.key(), record.value()
171                     ));
172                 }
173 
174                 consumer.commitAsync();
175             }
176         } catch (Exception ex) {
177             System.out.println("commit async error: " + ex.getMessage());
178         } finally {
179             try {
180                 consumer.commitSync();
181             } finally {
182                 consumer.close();
183             }
184         }
185     }
186 
187     public static void main(String[] args) {
188         generalConsumeMessageAutoCommit();
189     }
190 }

 

标签:演示,key,value,kafka,record,flag,提交,consumer,properties
From: https://www.cnblogs.com/15078480385zyc/p/18170097

相关文章

  • kafka核心概念Broker、Topic、Partition和Replication
    在Kafka中,Broker、Topic、Partition和Replication是四个核心概念,它们各自扮演了不同的角色并共同协作以确保数据的可靠性、可扩展性和高性能。以下是关于这四个概念的详细解释:Broker(代理)*Broker是Kafka集群中的一个节点,负责存储和转发消息。Kafka集群由多个Broker组成。*Brok......
  • Kafka SASL认证与ACL配置
    ​ Kafka版本2.12-2.2.0,Zookeeper版本:3.4.14,认证方式:SASL/PLAIN,这种方式其实就是一个账号/密码的认证方式,不过它有很多缺陷,比如用户名密码是存储在文件中,不能动态添加,密码明文等等!建议大家用SASL/SCRAM的方式,这种方式用户名/密码是存储在zookeeper中,能够支持动态添加用户。该种......
  • 4-kafka安装
    上传Kafka压缩包将kafka_2.12-3.6.1.tgz文件上传到三台虚拟机的/opt/software目录中解压Kafka压缩包进入/opt/software目录cd/opt/software解压缩文件到指定目录tar-zxvfkafka_2.12-3.6.1.tgz-C/opt/module/进入/opt/module目录cd/opt/module修改文件目录名称m......
  • [转]Git清除贡献者信息和历史提交记录,将开源项目拉取二次开发时可用到
     如果我们用git与github扒了别人的开源代码,想拿来用到自己项目中,但是提交过后,会发现仓库的历史记录又臭又长,贡献者里还有别人的名字,打算把历史记录全部清除并且让目前所有文件全部变成首次commit的状态。可以试试以下这个方法,包你百试百灵!1.Checkout检出新的分支#orphan参......
  • 在flink消费一段时间kafka后,kafka-group的offset被重置了是怎么回事?
    一、背景腾讯Flink使用KafkaSourceAPI创建source端,源码中默认开启了checkpoint的时候提交offset到kafka-broker。读取kafka数据写入到iceberg目前发现一个问题,就是消费数据的时候,消费一段时间后,kafka-group的offset就重置了,看起来像重置到earliest了,导致消费数据激增二......
  • kafka是如何保证数据不丢失的
    Kafka通过一系列机制来确保数据不丢失,这些机制涵盖了生产者、Broker和消费者等关键环节。以下是Kafka保证数据不丢失的主要方式:生产者生产数据不丢失:同步方式:生产者发送数据给Kafka后,会等待Kafka的确认。如果在一定时间内(如10秒)没有收到Broker的ack响应,生产者会认为发送失败......
  • 面试常问问题-中间件一kafka
    kafka是一个分布式发布订阅消息系统名词:生产者(producer)、消费者(consumer)、topic(一个消息又一个topic)、partition(分区)作用1:传输数据(ey:埋点)kafka发送消息失败的可能:1、网络问题导致kafka的服务器无法连接;2、生产者配置错误;3、消息大小超过服务器配置的限制;4、主......
  • 面试常问问题-中间件一kafka
    kafka是一个分布式发布订阅消息系统名词:生产者(producer)、消费者(consumer)、topic(一个消息又一个topic)、partition(分区)作用1:传输数据(ey:埋点)kafka发送消息失败的可能:1、网络问题导致kafka的服务器无法连接;2、生产者配置错误;3、消息大小超过服务器配置的限制;4、主题......
  • git不同项目提交时,显示不同的用户名
    场景在使用git时,不同项目想使用不同的名称和邮箱解决方法每个项目独立设置不同的名称和邮箱项目clone下来后,使用如下命令gitconfig--localuser.name'yourname'gitconfig--localuser.name'youremail'或者直接修改.git/config文件,加入下列配置[user]name......
  • 利用selenium自动提交表单
    安装seleniumpip--trusted-hostpypi.tuna.tsinghua.edu.cninstallseleniumpip--trusted-hostmirrors.aliyun.cominstallcookielibpip--trusted-hostpypi.douban.cominstallcookielib安装chromedriverchromedriver下载地址http://npm.taobao.org/mirrors/chr......