首页 > 其他分享 >golang解决kafka消息重复发送和重复消费

golang解决kafka消息重复发送和重复消费

时间:2023-03-22 17:59:05浏览次数:42  
标签:消费 err 重复 kafka golang 消息 Key Offset

1、解决消息重复发送

当使用Kafka生产者发送消息时,可以设置消息的Key,使用Key来保证相同Key的消息不会被重复发送。在发送消息时,可以使用带Key的消息发送方式,如下所示:

msg := &sarama.ProducerMessage{
    Topic:     "test",
    Partition: int32(0),
    Key:       sarama.StringEncoder("unique-key"),
    Value:     sarama.StringEncoder("test-message"),
}
partition, offset, err := producer.SendMessage(msg)

如果相同Key的消息已经被发送过,那么Kafka会将新的消息发送到相同分区的同一个Offset,从而实现去重。

2、解决消息重复消费

当使用Kafka消费者消费消息时,需要保存已经消费的消息的Offset,以避免重复消费。在Go语言中,可以使用Kafka的Offset来标识已经消费的消息。

在消费消息时,可以使用高级消费者API的方式,手动提交Offset。当消费完一个分区的消息后,可以手动提交Offset,表示已经成功消费了这些消息。在下次启动消费者时,可以从上次提交的Offset处开始消费,从而避免重复消费。

示例代码如下所示:

consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
    panic(err)
}
defer consumer.Close()

partitionConsumer, err := consumer.ConsumePartition("test", 0, sarama.OffsetNewest)
if err != nil {
    panic(err)
}
defer partitionConsumer.Close()

for msg := range partitionConsumer.Messages() {
    // 处理消息
    fmt.Println(string(msg.Value))

    // 手动提交Offset
    partitionConsumer.MarkOffset(msg, "")
}

在上面的代码中,通过调用MarkOffset方法来手动提交Offset。如果程序崩溃或者消费过程中发生错误,已经成功消费的消息的Offset将会被提交,从而避免重复消费。

标签:消费,err,重复,kafka,golang,消息,Key,Offset
From: https://www.cnblogs.com/lwhzj/p/17244912.html

相关文章

  • 解决Kafka总是丢消息的方法和原理
    注:本文转自:https://www.toutiao.com/article/7210953985497678347/?log_from=f0ecce317abb8_1679450040551引入MQ消息中间件最直接的目的:系统解耦以及流量控制(削峰填谷)......
  • python处理kafka数据
    1、程序作用:从多个topic中读取数据--处理数据--写入新的kafkatopic中pip3installkafka-pythonimportjsonfromkafkaimportKafkaProducerfromkafkaimportKafk......
  • Spring MVC拦截器+注解方式实现防止表单重复提交
    原理:在新建页面中Session保存token随机码,当保存时验证,通过后删除,当再次点击保存时由于服务器端的Session中已经不存在了,所有无法验证通过。注,如果是集群的方式,则需要将tok......
  • docker安装kafka并测试
    #1.下载docker镜像dockerpullwurstmeister/zookeeperdockerpullwurstmeister/kafka#2.启动zookeeper(单机方式)dockerrun-d--namezookeeper-p2181:2181-tw......
  • kafka消费消息-java版-demo
    @SpringBootApplicationpublicclassCcApplication{publicstaticvoidmain(String[]args){SpringApplication.run(CcApplication.class,args);/......
  • 解决Hyperledger Fabric通道重复创建问题( readset expected key [Group] /Channel/App
    运行如下几行代码即可dockerrm$(dockerps-a|grep"hyperledger/*"|awk"{print\$1}")&&\docker-composedown--volumes--remove-orphans&&\dockervolumep......
  • kafka简介
    Kafka特点Kafka已被多家不同类型的公司作为多种类型的数据管道和消息系统使用。行为流数据是几乎所有站点在对其网站使用情况做报表时都要用到的数据中最常规的部分。包......
  • 华为OD机试 找出重复代码
    本期题目:找出重复代码......
  • golang 版本管理(windows版本)
    golang版本管理(windows版本) 一、下载版本管理器使用的开源项目地址:https://github.com/voidint/g下载release安装包https://github.com/voidint/g/releases/tag/v1.......
  • 太坑了,我竟然从RocketMQ源码中扒出了7种导致消息重复消费的原因
    大家好,我是三友~~在众多关于MQ的面试八股文中有这么一道题,“如何保证MQ消息消费的幂等性”。为什么需要保证幂等性呢?是因为消息会重复消费。为什么消息会重复消费?明明......