首页 > 其他分享 >Kafka测试初探【Go】

Kafka测试初探【Go】

时间:2023-05-25 17:03:57浏览次数:60  
标签:Java err sarama Kafka 初探 Go config

上周分享了Kafka性能测试初探的Java版本,有读者留言说太简单,内容比较水。这里澄清一下,是我学得比较水。文章定位就是一篇使用Java语言的Kafka Client客户端进行简单操作演示,然后模拟一下简单场景的性能测试。其中深入学习Kafka的可以随处搜到很权威实用的资料,有深入学习需求的可以自行寻找。

好久没有写Go了,这才突然觉察到,又重新复习了一波Go语言的基础语法。顺带着之前留下的好习惯,每个学习的框架和工具都用Java和Go写一遍。这次也分享一下Go语言的Kafka基础入门,以及生产者的简单测试场景。

我用的是shopify出的sarama,依赖如下github.com/Shopify/sarama v1.38.1。在搜资料的过程中,还发现有使用其他客户端的,选择挺多。

Kafka配置

Sarama框架中的生产者和消费者的配置类是一个,不太清楚这么设计的意图,两个配置重合度并不高,在Sarama中也是分开配置,但使用了同一个配置类。

生产者配置:

	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	config.Producer.RequiredAcks = sarama.NoResponse
	config.Producer.Compression = sarama.CompressionLZ4
	config.Producer.Timeout = time.Duration(50) * time.Millisecond
	config.Producer.Retry.Max = 3

消费者配置:

	config := sarama.NewConfig()
	config.Consumer.Offsets.AutoCommit.Enable = true
	config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Consumer.Offsets.Retry.Max = 3

这里只选择部分参数,详细的配置项和注释都可以在源码中找到,Sarama的一个好处就是注释非常全,甚至不用看官方API文档。

生产者

下面是生产者的代码,相比较Java来说,我这里增加了header的实践,其实Java也是支持的,只是当时学的时候漏掉了这个知识点。

producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
		return
	}
	defer func() {
		// 关闭生产者
		if err = producer.Close(); err != nil {
			log.Fatal(err)
			return
		}
	}()
	// 定义需要发送的消息
	headers := []sarama.RecordHeader{sarama.RecordHeader{
		Key:   []byte("funtest"),
		Value: []byte("have fun ~"),
	}}

	msg := &sarama.ProducerMessage{
		Topic:   "topic_test",
		Key:     sarama.StringEncoder("test"),
		Value:   sarama.StringEncoder("ddddddddddddddddd"),
		Headers: headers,
	}
	// 发送消息,并获取该消息的分片、偏移量
	for i := 0; i < 100; i++ {
		ftool.Sleep(1000)
		partition, offset, err := producer.SendMessage(msg)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("partition:%d offset:%d\n", partition, offset)
	}

这里官方给的实践代码中感觉ProducerMessage类似于Java的org.apache.kafka.clients.producer.ProducerRecord#ProducerRecord,也是可以指定partitionid和时间戳,以及单独设置retries次数的。还有一个比较重要的类AsyncProducer,暂时不探索了。

消费者

消费者使用上Go和Java差异比较大,Sarama用了channel的概念,可以一直不停止从服务端获取消息对象,不像Java可以指定一次接受的消息数量,单次最大等待时间等。盲猜这里channel的性能太好了吧,不需要复杂设计也能满足需求。

	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
	if err != nil {
		fmt.Printf("fail to start consumer, err:%v\n", err)
		return
	}
	topic := "topic_test"
	partitionList, err := consumer.Partitions(topic) // 根据topic取到所有的分区
	if err != nil {
		fmt.Printf("fail to get list of partition:err%v\n", err)
		return
	}
	fmt.Println(partitionList)
	defer consumer.Close()
	for partition := range partitionList { // 遍历所有的分区
		// 针对每个分区创建一个对应的分区消费者
		log.Println(partition)
		pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetOldest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
		}
		for msg := range pc.Messages() {
			log.Println(string(msg.Value))
			//log.Println(string(msg.Headers[0].Value))
		}
		for {
			msg := <-pc.Messages()
			log.Println(string(msg.Value))
		}
	}

后来我写了两种接受方式,其实都是阻塞的,如果是性能测试的时候可以使用Go中的go关键字起routine来执行。

性能测试

我这之展示性能测试简单的例子,即生产者不停地往Kafka发消息的Demo,复用了上文中的生产者代码,下面只展示执行部分。

execute.ExecuteRoutineTimes(func() {
	_, _, _ := producer.SendMessage(msg)
}, 100, 10)

执行方法ExecuteRoutineTimes是我写的一个基于线程模型的压测执行方法,内容如下:


// ExecuteRoutineTimes
// @Description: FunTester性能测试执行框架
// @param fun 待执行方法
// @param times 次数
// @param routine 线程数
func ExecuteRoutineTimes(fun func(), times, routine int) {
	c := make(chan int) //确认所有线程都结束
	key := false        //用于控制所有线程一起结束
	start := ftool.Milli()
	for i := 0; i < routine; i++ {
		go func() {
			sum := 0
			for i := 0; i < times; i++ {
				if key {
					break
				}
				fun()
				sum++
			}
			key = true
			c <- sum
		}()
	}
	total := 0
	for i := 0; i < routine; i++ {
		num := <-c
		total += num
	}
	end := ftool.Milli()
	diff := end - start
	//total := thread * times
	log.Printf("总耗时: %f", float64(diff)/1000)

	log.Printf("请求总数: %d", total)
	log.Printf("QPS: %f", float64(total)/float64(diff)*1000.0)
}

总结起来,相比Java,Go语言相对简单一些。如果习惯了Go语言的习惯,对于做测试来说上手要比Java快一些。再买个坑,改天测试一下两者之间的性能差异。理论上Go要比Java好一些。

Sarama是一个用于Apache Kafka的Go语言库。Kafka是一个分布式流处理平台,它可以处理大规模的数据流,并将其发布到主题中,供其他应用程序使用。Sarama库允许Go应用程序与Kafka集群进行通信。它支持多个版本的Kafka协议,并提供了生产者和消费者API,以便应用程序可以轻松地将消息发布到Kafka主题或从中读取消息。Sarama还提供了一些有用的工具,如分区选择器和负载平衡器,以帮助开发人员更好地管理Kafka消费者。

标签:Java,err,sarama,Kafka,初探,Go,config
From: https://blog.51cto.com/FunTester/6350076

相关文章

  • Django——中间件
    我们在前面的课程中已经学会了给视图函数加装饰器来判断是用户是否登录,把没有登录的用户请求跳转到登录页面。我们通过给几个特定视图函数加装饰器实现了这个需求。但是以后添加的视图函数可能也需要加上装饰器,这样是不是稍微有点繁琐。学完今天的内容之后呢,我们就可以用更适宜的......
  • django 使用mysql数据库
    1.手动创建mysql数据库,比如xadmincreatedatabasexadmincharset=utf8;2.配置django项目setting.py文件中的数据库设置DATABASES={'default':{'ENGINE':'django.db.backends.mysql',#数据库后端'NAME':'xadmin',......
  • Go语言中的数组以及其相关特性
    在Go语言中,数组是一种固定长度、相同类型元素的序列。可以将数组视为一个盒子,其中每个元素都有自己的位置(索引)和值。数组的长度是在声明时指定的,一旦定义后,其长度将是固定的,不能动态改变。数组的类型由元素类型和长度决定,例如,[5]int表示一个包含5个整数元素的数组。要声明和初始......
  • 【GiraKoo】在U盘中安装Windows11系统(WindowsToGo)
    在U盘中安装Windows11系统(WindowsToGo)本文介绍如何利用Rufus工具,将Windows安装到U盘中。在尝试过多款所谓的WindowsToGo工具,均遇到了无法引导的情况。最终使用Rufus工具成功安装启动。下载RufusRufus是非常棒的U盘格式化,制作启动盘,系统盘的优秀工具。并且当前已经支持WindowsT......
  • 用go封装一下封禁功能
    用go封装一下封禁功能本篇为用go设计开发一个自己的轻量级登录库/框架吧-秋玻-博客园(cnblogs.com)的封禁业务篇,会讲讲封禁业务的实现,给库/框架增加新的功能。源码:https://github.com/weloe/token-go思路封禁业务也是在一般项目中比较常见的业务。我们也将它封装在库中作......
  • GoldenEye靶机
    知识点1.gcc编译和cc编译gcc(GNUCompilerCollection)是一套用于编译C、C++、Ada、Fortran和其他编程语言的编译器。它是由自由软件基金会(FSF)开发的一个自由软件,它支持几乎所有主流的操作系统和硬件平台。gcc支持多种不同的编程语言,并且它是许多操心系统的默认编译器。cc编译器......
  • 1004.Django项目用户功能之关联序列化及访问限流
    一、路由器1.SimpleRouter该路由器包括标准集合——list、create、retrieve、update、partial_update、destroy动作的路由。视图集中还可以使用@detail_route或@list_route装饰器标记要被路由的其他方法;2.DefaultRouter这个路由器类似于上面的SimpleRouter,但是还包括一个默......
  • django之对FileField字段的upload_to的设定
       用django开发,经常要处理用户上传的文件,比如user模型里面如果又个人头像的字段ImageField等等,而django在FielField字段(包括ImageField)的支持和扩展是做的很好的,首先一个问题,是上传的文件,django是放到哪里去了,(note:文件流是不会放到数据库里面的,该字段在数据库中只存储路......
  • hackthebox --aragog
    主机发现与爆破nmap-sT--min-rate1000010.10.10.78nmap-sT-sV-sC-O-p22,21,8010.10.10.78 发现有ftp匿名登陆└─$ftp10.10.10.78Connectedto10.10.10.78.220(vsFTPd3.0.3)Name(10.10.10.78:kali):anonymous230Loginsuccessful.Remotesystemtypeis......
  • tracee源码初探(二)TCP处理流程
    handleEvents(ctx),processNetCaptureEvents(若开启Capture.Net)协程一直常驻,并等待netCapChannel消息通知.当有事件传过来时,程序先看该事件是否需要处理,也就是说tracee是上报所有事件的,然后过滤来处理事件。在tracee.go中的initBPF函数里t.bpfModule.InitPerfBuf( "net_cap_......