首页 > 其他分享 >go高并发之路——消息中间件kafka(下)

go高并发之路——消息中间件kafka(下)

时间:2024-08-07 15:50:37浏览次数:7  
标签:副本 err 分区 领导者 Kafka 消息中间件 go kafka

一、kafka副本机制

所谓的副本机制(Replication),也可以称之为备份机制,通常是指分布式系统在多台网络互联的机器上保存有相同的数据拷贝。kafka的副本概念实际上是在分区(partition)层级下定义的,每个分区配置有若干个副本。根据 Kafka 副本机制的定义,同一个分区下的所有副本保存有相同的消息序列,这些副本分散保存在不同的Broker 上,从而能够对抗部分 Broker 宕机带来的数据不可用

如下图所示,展示了一个有 3 台 Broker 的 Kafka 集群上的副本分布情况,可以看到,主题 1 分区 0 的 2 个副本分散在 2 台 Broker 上,其他主题分区的副本也类似,都散落在不同的 Broker 上,从而实现数据冗余。

既然副本有多个,而多个副本的数据都要保持一致,那这是怎么实现的呢?
针对这个问题,最常见的就是采用基于领导者(leader)的副本机制了,而kafka也是这样设计的。下面是kafka基于领导者的工作原理图:

关于领导者副本机制,我们需要清楚几个点:
(1)在 Kafka 中,副本分成两类:领导者副本(Leader Replica)和追随者副本(Follower Replica)。每个分区在创建时都要选举一个副本,称为领导者副本,其余的副本都称为追随者副本。

(2)在 Kafka 中,追随者副本是不对外提供服务的,任何一个追随者副本都不能响应消费者和生产者的读写请求。这个很特殊,和其他的分布式系统不太一样,例如MySQL,一般备库都是可以提供对外服务的,比如读操作。kafka追随者副本唯一的任务就是从领导者副本异步拉取消息,并写入到自己的提交日志中,从而实现与领导者副本的同步。

(3)当领导者副本挂掉了,或者说领导者副本所在的 Broker 宕机时,kafka会依托于ZooKeeper组件监控到,然后开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。而老 Leader 副本重启回来后,只能作为追随者副本加入到集群中。

(4)kafka的领导者副本机制为什么要这么设计?
有一个好处就是,领导者副本做完了所有操作,不会存在所谓的数据读取延迟,毕竟像MySQL那样的话,从写副本到其它读副本,肯定会有数据延迟,那么连接读副本的客户端就要容忍数据延迟带来的业务影响。

(5)既然追随者副本不提供服务,只是保持与领导者副本的数据同步,那么追随者副本到底在什么条件下才算与 Leader 同步呢?
kafka中有个概念 In-sync Replicas,也就是所谓的 ISR 副本集合,ISR 中的副本都是与 Leader 同步的副本,相反,不在 ISR 中的追随者副本就被认为是与 Leader 不同步的。当然,Leader副本天然就在ISR副本中。至于Kafka怎样判断某个副本是不是ISR,这就比较复杂了,有兴趣的同学可以去了解下。

如下图所示,我们可以看下腾讯云的ckafka的管理台,就有分区、leader、副本、ISR、未同步副本等这些字段,说明这些也是比较基础的概念,我们必须得清楚。

二、kafka为什么这么快

使用过kafka的同学应该都清楚,kafak处理数据的速度是飞一般的感觉。那为啥kafak会这么快呢?
1、顺序写入:Kafka的数据存储是基于日志的,消息被追加到分区的末尾。顺序写入在磁盘上比随机写入更高效,因为现代磁盘在顺序写入时能够达到更高的吞吐量。

2、零拷贝
Kafka在发送消息时使用了零拷贝技术,即操作系统在从磁盘读取数据并将其发送到网络时,不需要将数据拷贝到用户空间。这减少了数据的拷贝次数,从而提升了性能。

3、分区和并行处理
Kafka的主题被分成多个分区,每个分区可以独立于其他分区进行读写操作。这种分区机制允许Kafka在多个服务器上分布负载,从而实现高吞吐量。消费者可以并行地从多个分区中读取数据,这进一步提高了系统的处理能力。

4、批量处理
Kafka支持批量发送消息。生产者可以将多条消息批量发送到Kafka服务器,消费者也可以批量拉取消息。这种批量处理方式减少了网络往返次数,提高了效率。

5、数据压缩
Kafka支持对消息进行压缩,如GZIP、Snappy、LZ4等。压缩可以减少数据传输和存储的大小,进一步提高效率。

三、kafka如何重设消费者组位移

一般kafka中的消息是有保留时长的,有两个维度,一个是broker,另一个是可以针对broker下的各个topic分别设置保留时长。一般我们现网会设置保留时长1-7天,根据具体情况而定。

假设现在有个业务场景,我们修改了某个新功能,然后上线了两天了,突然发现这个上线的新功能有bug,需要重新消费上线之后的数据,那么这时候我们能怎么做呢?
很简单,我们可以创建一个新的消费者组,然后把位移指定到上线的那个时间点的offset,最后重新跑下老代码脚本,就可以搞定了。

那么我们如何去重设新消费者组的位移呢?
有两个方式:
1、假设我们使用腾讯云、阿里云之类的云kafka,那我们直接去后台设置一下就好,如下图所示就是腾讯云kafka重设消费者组位移的界面,可以看到有很多种设置的方式,可以以Topic为维度,也可以以Partition为维度

2、另一种方式就是通过代码去修改提交的位移,然后再重新消费,下面是重置消费者组位移的一个demo,实现了重置分区2的offset到20,重置之后,就可以从这个位点开始消费数据了:

// 重置消费者组位移
// 有条件(kafka)的跑下这段代码,也可以得出一个结论,消费者组位移重置过后,一旦被消费,那么就不可能再被重置了。
// 想要消费那段数据,得整一个新的消费者组
func main() {
	// Kafka集群的地址
	brokers := []string{"172.88.88.88:9092"}

	// 创建配置
	config := sarama.NewConfig()
	config.Version = sarama.V1_1_1_0 // 你需要设置你的Kafka版本

	// 创建客户端
	client, err := sarama.NewClient(brokers, config)
	if err != nil {
		log.Fatalf("Error creating client: %v", err)
	}
	defer client.Close()

	// 消费者组
	groupID := "goLangAndKafka"
	topic := "xxx-cash"
	partition := int32(2) // 指定分区2

	// 获取消费者组的位移
	offsetManager, err := sarama.NewOffsetManagerFromClient(groupID, client)
	if err != nil {
		log.Fatalf("Error creating offset manager: %v", err)
	}
	defer offsetManager.Close()

	partitionOffsetManager, err := offsetManager.ManagePartition(topic, partition)
	if err != nil {
		log.Fatalf("Error creating partition offset manager: %v", err)
	}
	defer partitionOffsetManager.Close()

	// 重置位移到指定的偏移量
	newOffset := int64(20) // 你想要设置的新位移,这里重置偏移量到20
	partitionOffsetManager.MarkOffset(newOffset, "")

	log.Printf("Successfully reset offset for group %s, topic %s, partition %d to %d", groupID, topic, partition, newOffset)

	// 逻辑:重新消费20位点之后的数据...

}

以上,就是对kafka副本机制、kafka为什么这么高性能、kafka如何重设消费者组位移内容的一些讲解。至此,kafka的那些事儿基本上就讲完了,都是我们工作中需要弄懂整明白的东西。当然,这些是比较常见的知识,有些关于kafka更加深入的东西还需要各位去探索、去学习。

标签:副本,err,分区,领导者,Kafka,消息中间件,go,kafka
From: https://www.cnblogs.com/lmz-blogs/p/18347172

相关文章

  • 通过go自定义alertmanager 发送通知的webhook
    本文主要是大体介绍webhook的代码以及涉及到的服务部署,详细配置需要自己弄Prometheus、alertmanager部署先创建alertmanager、Prometheus的docker-composeyaml文件,下面只是把服务运行起来,具体功能需要自己配置,如果有就跳过version:'3'services:prometheus:ima......
  • 基于django+vue的小说阅读系统【开题报告+程序+论文】-计算机毕设
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着互联网技术的飞速发展,数字阅读已成为人们获取信息与娱乐的重要方式之一。小说作为文学的重要分支,拥有庞大的读者群体。然而,传统的小说......
  • 基于django+vue的小区物业管理系统【开题报告+程序+论文】-计算机毕设
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着城市化进程的加速,住宅小区作为城市居民生活的重要载体,其管理与服务水平直接影响到居民的生活质量和幸福感。传统的物业管理方式往往依......
  • 部署argo-rollouts
    https://github.com/argoproj/argo-rollouts/releaseskubectlcreatenamespaceargo-rolloutskubectlapply-nargo-rollouts-fhttps://github.com/argoproj/argo-rollouts/releases/download/v1.3.1/install.yamlkubectlapply-fhttps://github.com/argoproj/argo-r......
  • ssh 远程登录报错:Unable to negotiate with IP port 22: no matching host key type f
    最近在Mac上想要远程一台Linux服务器,结果不知怎么的就不能使用以前的ssh登录了iot@ios-iMac~%[email protected]:nomatchinghostkeytypefound.Theiroffer:ssh-rsa,ssh-dss ......
  • kafka疑问合集
    1.kafka重启会丢数据吗?在Kafka中,重启不会导致数据丢失。Kafka使用持久化的方式将数据存储在磁盘上,因此即使重启Kafka,数据仍然会被保留。当Kafka重启后,它会从存储的数据中恢复状态,确保数据的持久性和一致性。因此,用户不必担心数据丢失的问题。2.kafka如何保证数据不丢失?Kafka通......
  • go开发要不要拆分独立的包
    在Go语言中,业务开发时是否使用独立的Go包(package)主要取决于项目的结构和需求。以下是使用独立Go包的一些区别和考虑因素:1. 模块化:独立的Go包有助于将代码分割成更小的、可管理的模块。这有助于提高代码的可读性和可维护性。2. 重用性:通过将功能封装在独立的包中,你可以......
  • 计算机毕业设计django+vue基于的勤工助学服务系统【开题+程序+论文】
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着高等教育的普及和社会经济的发展,勤工助学已成为许多在校学生缓解经济压力、提升个人能力的重要途径。然而,传统勤工助学服务往往依赖于......
  • golang sliver二次开发自定义命令(格式乱后面再调)
    准备工作sliver客户端和服务端之间命令的通信采用的grpc,服务端和和植入程序通信使用的protobuf,所以在开发之前需要了解grpc和protobuf,相关文档:https://jergoo.gitbooks.io/go-grpc-practice-guide/content/chapter2/hello-grpc.htmlhttps://jergoo.gitbooks.io/go-grpc-pra......
  • 国产麒麟:安装 google 浏览器
    1、下载安装包https://www.google.cn/intl/zh-CN/chrome/browser-tools/  2、安装sudo yumlocalinstallgoogle-chrome-stable_current_x86_64.rpm 3、访问  ......