首页 > 其他分享 >go高并发之路——消息中间件kafka(上)

go高并发之路——消息中间件kafka(上)

时间:2024-07-30 22:28:36浏览次数:13  
标签:err sarama 分区 kafka 版本 消息中间件 go Kafka

一般高并发的业务都是某个时间段的请求量特别大,比如本人负责的直播业务,基本上一天就两个高峰段:早上和晚上的特定时间段。其它的时间里,流量基本都比较平稳。那么面对流量高峰,我们可以采取哪些措施呢?常见的有服务器和DB提前扩容、监控告警(盯监控)、流量削峰、加缓存、网关限流、服务降级等措施,具体问题具体分析。
接下来,我们就学习下常见的抵御流量洪峰的一个手段——消息中间件。市面上常见的消息中间件有很多种类,比如Kafka、RabbitMQ、RocketMQ、ActiveMQ、ZeroMQ等。今天就和大家一起学习下kafka的一些常见的知识点和一些坑点。

一、kafka的类库

go常用的kafka库有 https://github.com/IBM/sarama (推荐)、 https://github.com/confluentinc/confluent-kafka-go 等。 我基本上用的都是第一个类,但是有一个比较大的坑:就是在v1.32.0中,会出现生产kafka消息超时的一个问题。这个当时也是定位了蛮久的,通过日志排查、升级Kafka服务器版本、换代码写法、找腾讯云厂商定位等等措施,最终才定位到是这个版本有问题,被官方所废弃了。最终是将SDK升级到更高的版本才解决了此问题。

二、kafka的版本号

Kafka 目前总共演进了 7 个大版本,分别是 0.7、0.8、0.9、0.10、0.11、1.0 和 2.0、3.0,其中的小版本和 Patch 版本很多。有兴趣的可以去了解下各个版本都更新了什么:https://kafka.apache.org/downloads。

建议使用0.10.0.0之后的版本,因为这是里程碑式的大版本,该版本引入了 Kafka Streams。从这个版本起,Kafka 正式升级成分布式流处理平台,虽然此时的 Kafka Streams 还基本不能线上部署使用。0.10 大版本包含两个小版本:0.10.1 和 0.10.2,它们的主要功能变更都是在 Kafka Streams 组件上。自 0.10.2.1 版本起,新版本 Consumer API 算是比较稳定了。据我了解,目前公司现网环境使用的最低的版本也是这个0.10.2.1。

最后强烈建议,不论你用的是哪个版本,都得尽量保持服务器端版本和客户端版本一致,否则你将损失很多 Kafka 为你提供的性能优化收益。而且可能会出现很多莫名其妙的问题,比如kafka进程假死、连接不上broker等问题。

三、kafka生产者分区策略

使用过kafka的小伙伴都应该比较清楚,kafka下真实存储数据的地方是topic(主题)之下的partition(分区),而topic下的每条消息只会保存在某一个partition中,不会在多个分区中被保存多份。之所以topic之下还有partition,主要作用是为了提高kafka负载均衡的能力,提高系统的吞吐性。

标题中提到的分区策略就是决定生产者将消息发送到哪个分区的算法,那么kafka分区都有哪些策略呢?
主要有四个:

1、轮询策略,即按顺序分配,默认分区策略。举个例子,假设一个主题包含3个分区。第一条消息会被发送到分区0,第二条消息会被发送到分区1,第三条消息会被发送到分区2。接着,当生产第4条消息时,分配将重新开始,这条消息会被发送到分区0。以此类推。

2、随机策略,就是随意地将消息放置到任何一个分区,这个本质上和轮询差不多,也是为了将数据打散,使其均匀分布,但是打散效果比轮询差一点,好像新版本的kafka已经废弃了,改为默认是轮询分配了。

3、按key消息建保存策略。Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key可以是某个业务的标识划分比如公司、部门、业务ID等等。只要消息定义了key,那么就可以保证同一个key的所有消息都进入相同的分区里面。如果指定了 Key,那么默认实现按消息键保存策略;如果没有指定 Key,则使用轮询策略。这个方式作用非常强大,当你需要实现消息的顺序消费的时候,就可以指定这个key。

举个实际的使用场景,我这里有一个业务,用户会有两种行为,新增和更改,这两种行为我这边都会生产kafka消息给下游消费,那么这种情况下能使用上面的轮询和随机策略吗?很明显不行,假如新增和更改只隔了很短的一个时间间隔,然后这两条消息被推送到不同的分区,那么就可能出现这样的情况:消费者先消费了更改的数据,然后再消费到新增的数据,这样数据就乱了啊。那这时候,按key分区的策略就派上用场了,我可以将用户ID设置成一个key,那么该用户的数据都会落到同一个分区,且有先后顺序了,这样就不会出问题了。

下面是使用sarama实现的一个demo:

package main

import (
	"fmt"
	"log"
	"strconv"

	"github.com/IBM/sarama"
)

func main() {
	// 创建生产者配置
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Version = sarama.V1_1_1_0 //kafka指定版本号,与broker保持一致

	// 创建生产者
	producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		log.Fatalf("Failed to create Kafka producer: %v", err)
	}
	defer producer.Close()

	for i := 0; i < 100; i++ { //生产100条消息
		// 创建消息并指定分区
		message := &sarama.ProducerMessage{
			Topic: "live-task-reward",
			Key:   sarama.StringEncoder("jay"), //指定key,那么该key的100条消息都会落在同一个分区,落在哪个分区根据这个key计算出来
			Value: sarama.StringEncoder("Hello, Kafka!" + strconv.Itoa(i)),
		}

		// 发送消息
		partition, offset, err := producer.SendMessage(message)
		if err != nil {
			log.Fatalf("Failed to send message: %v", err)
		}

		fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)
	}

}

4、指定分区。就是在生产消息的时候可以直接指定分区生产,使消息落入到具体的某个分区中。下面是使用sarama实现的一个demo:

package main

import (
	"fmt"
	"log"
	"strconv"

	"github.com/IBM/sarama"
)

func main() {
	// 创建生产者配置
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Partitioner = sarama.NewManualPartitioner //如果需要指定分区的时候,这个参数必须设置
	config.Version = sarama.V1_1_1_0                          //kafka指定版本号,与broker保持一致

	// 创建生产者
	producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		log.Fatalf("Failed to create Kafka producer: %v", err)
	}
	defer producer.Close()

	for i := 0; i < 100; i++ { //生产100条消息
		// 创建消息并指定分区
		message := &sarama.ProducerMessage{
			Topic:     "live-task-reward",
			Key:       sarama.StringEncoder("jay"), //即使这里指定了key,但kafka不会去计算该key。因为下面指定了分区1,那么所有数据都会落在分区1
			Value:     sarama.StringEncoder("Hello, Kafka!" + strconv.Itoa(i)),
			Partition: 1, // 指定分区为 1
		}

		// 发送消息
		partition, offset, err := producer.SendMessage(message)
		if err != nil {
			log.Fatalf("Failed to send message: %v", err)
		}

		fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)
	}

}

以上,分别讲了kafka的常用类库、版本和生产者分区策略的一些知识点和踩过的一些坑,这也都是用好kafka必须掌握的一些基础知识。好了,下篇我将会继续为大家讲解kafak的剩余知识。回见~

标签:err,sarama,分区,kafka,版本,消息中间件,go,Kafka
From: https://www.cnblogs.com/lmz-blogs/p/18333465

相关文章

  • Go语言---sort 包中sort.Ints()、sort.Strings()、sort.Slice()、sort.SliceStable()、s
    在每一种编程语言中,都会涉及到排序操作。而在Go语言中,其中内置的sort包中提供了根据一些排序函数来对任何序列进行排序的功能。通过这个包中的一些方法,我们可以对一些基本的可以比较大小的类型的切片进行排序,也可以通过实现排序接口的几个特定方法实现自定义排序。sort.I......
  • Python面试题:如何使用Django Rest Framework构建RESTful API
    使用DjangoRestFramework(DRF)构建RESTfulAPI是一个常见且强大的方法。以下是一个详细的指南,帮助你从头开始创建一个基本的Django项目,并使用DRF构建一个RESTfulAPI。环境准备安装Django和DjangoRestFramework:pipinstalldjangodjangorestframewor......
  • golang面试题:json包变量不加tag会怎么样?
    问题json包里使用的时候,结构体里的变量不加tag能不能正常转成json里的字段?怎么答如果变量首字母小写,则为private。无论如何不能转,因为取不到反射信息。如果变量首字母大写,则为public。不加tag,可以正常转为json里的字段,json内字段名跟结构体内字段原名一致。加了tag,从str......
  • 7/30 go协程
    协程是逻辑上优化的线程,不用切换CPU和内核态     组合访问   TRANSLATEwithxEnglishArabicHebrewPolishBulgarianHindiPortugueseCatalanHmongDawRomanianChineseSimplifiedHungarianRussianChineseTraditi......
  • 理解 Go 语言的池Pool
            Go是一种有自动垃圾回收机制的编程语言,采用三色并发标记算法标记对象并回收。和其他没有自动垃圾回收机制的编程语言不同,使用Go语言创建对象时,我们没有回收/释放的心理负担,想创建对象就创建,想用对象就用。        但是,如果想使用Go语言开发一个......
  • 深入浅出Django的路由系统:全面指南与实战示例
    Django的路由系统用于将用户请求的URL与相应的视图函数匹配,它决定了用户访问特定URL时会执行哪个视图函数。通过URL配置,路径匹配,路径参数,命名路由,路由命名空间,包含其他URL配置,以及反向解析,Django实现了高效且灵活的URL路由管理。1.URL配置URL配置是Django项目中定义URL与视......
  • 计算机毕业设计django/flask+uniapp私人定制商品订单系统hbuiderx微信小程序
    私人订制订单发布与对应商品出售平台方面的任务繁琐,以至于每年都在私人订制订单发布与对应商品出售平台这方面投入较多的精力却效果甚微,私人订制订单发布与对应商品出售平台的目标就是为了能够缓解私人订制订单发布与对应商品出售平台管理面临的压力,让私人订制订单发布与对......
  • debezium source端同步海量数据库数据vastbase到kafka
    debeziumsource端同步海量数据库数据vastbase到kafkaOriginal 韦家富 心流时刻  2024年01月30日15:17 北京本文用于debeziumsource端同步海量vastbase数据库的数据到kafka,sink端同理。 1、基础环境安装 1、kafka2、zookeeper3、海量数据库vastbase 自行......
  • go-zero 自定义中间件的几种方式
    目录1.通过api文件生成并填入具体逻辑定义api生成对应的模板填充中间件逻辑2.在server启动前完成中间件的注册定义中间件:注册到server中首先go-zero已经为我们提供了很多的中间件的实现,但有时难免有需求需要自定义,这里介绍几种自定义的方法,供参考。1.通过api文件生......
  • Kafka的人工智能与机器学习应用
    Kafka的人工智能与机器学习应用作者:禅与计算机程序设计艺术/ZenandtheArtofComputerProgramming1.背景介绍1.1问题的由来随着互联网的快速发展,数据量呈爆炸式增长,如何高效地处理和分析这些数据成为了企业和研究机构面临的挑战。Kafka作为一款高吞吐量的分布式......