1、解决消息重复发送
当使用Kafka生产者发送消息时,可以设置消息的Key,使用Key来保证相同Key的消息不会被重复发送。在发送消息时,可以使用带Key的消息发送方式,如下所示:
msg := &sarama.ProducerMessage{ Topic: "test", Partition: int32(0), Key: sarama.StringEncoder("unique-key"), Value: sarama.StringEncoder("test-message"), } partition, offset, err := producer.SendMessage(msg)
如果相同Key的消息已经被发送过,那么Kafka会将新的消息发送到相同分区的同一个Offset,从而实现去重。
2、解决消息重复消费
当使用Kafka消费者消费消息时,需要保存已经消费的消息的Offset,以避免重复消费。在Go语言中,可以使用Kafka的Offset来标识已经消费的消息。
在消费消息时,可以使用高级消费者API的方式,手动提交Offset。当消费完一个分区的消息后,可以手动提交Offset,表示已经成功消费了这些消息。在下次启动消费者时,可以从上次提交的Offset处开始消费,从而避免重复消费。
示例代码如下所示:
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil) if err != nil { panic(err) } defer consumer.Close() partitionConsumer, err := consumer.ConsumePartition("test", 0, sarama.OffsetNewest) if err != nil { panic(err) } defer partitionConsumer.Close() for msg := range partitionConsumer.Messages() { // 处理消息 fmt.Println(string(msg.Value)) // 手动提交Offset partitionConsumer.MarkOffset(msg, "") }
在上面的代码中,通过调用MarkOffset
方法来手动提交Offset。如果程序崩溃或者消费过程中发生错误,已经成功消费的消息的Offset将会被提交,从而避免重复消费。