首页 > 其他分享 >Go接入kafka

Go接入kafka

时间:2023-03-27 17:22:20浏览次数:48  
标签:err Producer sarama 接入 kafka msg func Go Consumer

需要借助的库

github.com/Shopify/sarama // kafka主要的库*
github.com/bsm/sarama-cluster // kafka消费组

生产者

package producer

import (
	"fmt"
	"github.com/HappyTeemo7569/teemoKit/tlog"
	"github.com/Shopify/sarama"
	"kafkaDemo/define"
)

var (
	ProducerId = 1
)

type Producer struct {
	Producer   sarama.SyncProducer
	Topic      string //主题
	ProducerID int    //生产者Id
	MessageId  int
}

func (p *Producer) InitProducer() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
	config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
	config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回

	// 连接kafka
	client, err := sarama.NewSyncProducer([]string{define.SERVER_LIST}, config)
	if err != nil {
		tlog.Error("producer closed, err:", err)
		return
	}

	p.Producer = client
	p.Topic = define.TOPIC
	p.ProducerID = ProducerId
	p.MessageId = 1

	ProducerId++
}

func (p *Producer) SendMessage() {
	// 构造一个消息
	msg := &sarama.ProducerMessage{}
	msg.Topic = p.Topic
	txt := fmt.Sprintf("ProducerID:%d  this is a test log %d",
		p.ProducerID, p.MessageId)
	msg.Value = sarama.StringEncoder(txt)

	// 发送消息
	pid, offset, err := p.Producer.SendMessage(msg)
	//_, _, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed, err:", err)
		return
	}
	tlog.Info(fmt.Sprintf("ProducerID:%d pid:%v offset:%v msg:%s",
		p.ProducerID, pid, offset, txt))

	p.MessageId++
}

func (p *Producer) Close() {
	p.Producer.Close()
}

消费者

package consumer

import (
	"github.com/HappyTeemo7569/teemoKit/tlog"
	"github.com/Shopify/sarama"
	"kafkaDemo/define"
)

type Consumer struct {
	Consumer   sarama.Consumer
	Topic      string
	ConsumerId int //消费者Id
}

func (c *Consumer) InitConsumer() error {
	consumer, err := sarama.NewConsumer([]string{define.SERVER_LIST}, nil)
	if err != nil {
		return err
	}
	c.Consumer = consumer
	c.Topic = define.TOPIC
	c.ConsumerId = ConsumerId
	ConsumerId++
	return nil
}

//指定partition
//offset 可以指定,传-1为获取最新offest
func (c *Consumer) GetMessage(partitionId int32, offset int64) {
	if offset == -1 {
		offset = sarama.OffsetNewest
	}
	pc, err := c.Consumer.ConsumePartition(c.Topic, partitionId, offset)
	if err != nil {
		tlog.Error("failed to start consumer for partition %d,err:%v", partitionId, err)
		//That topic/partition is already being consumed
		return
	}

	// 异步从每个分区消费信息
	go func(sarama.PartitionConsumer) {
		for msg := range pc.Messages() {
			tlog.Info("ConsumerId:%d Partition:%d Offset:%d Key:%v Value:%v", c.ConsumerId, msg.Partition, msg.Offset, msg.Key, string(msg.Value))
		}
	}(pc)
}

//遍历所有分区
func (c *Consumer) GetMessageToAll(offset int64) {
	partitionList, err := c.Consumer.Partitions(c.Topic) // 根据topic取到所有的分区
	if err != nil {
		tlog.Error("fail to get list of partition:err%v", err)
		return
	}
	tlog.Info("所有partition:", partitionList)

	for partition := range partitionList { // 遍历所有的分区
		c.GetMessage(int32(partition), offset)
	}
}


主函数

func main() {
	tlog.Info("开始")

	go producer.Put()
	go consumer.Get()

	for {
		time.Sleep(time.Hour * 60)
	}
}


func Put() {
	producer := new(Producer)
	producer.InitProducer()
	go func() {
		for {
			producer.SendMessage()
			time.Sleep(1 * time.Second)
		}
	}()
}

func Get() {

	offest := int64(0)

	consumer := new(Consumer)
	err := consumer.InitConsumer()
	if err != nil {
		tlog.Error("fail to init consumer, err:%v", err)
		return
	}
	consumer.GetMessageToAll(offest)
}


具体源码可以查看:
kafka_demo

标签:err,Producer,sarama,接入,kafka,msg,func,Go,Consumer
From: https://www.cnblogs.com/HappyTeemo/p/17262261.html

相关文章

  • ChatGPT接入微信 C#完整源码
    1.无需搭建服务器2.winform运行程序扫码登录,就可以充当机器人调用chatGPT可实现自动回复(可以识别会话消息和群聊消息,拉入群聊@机器人可以进行群聊的消息回复),可以申请小......
  • Google+ Android版本更新绚丽界面 支持手机Hangout群聊
    继 Google+ 升级iOS版本之后,Android版本也进行了重大更新,增加了Hangout 视频聊天功能;新的Google+ Android 2.6版本也拥有了与iOS版同样炫目的界面。现在用户可......
  • Google 血泪收购史:1/3的收购是失败的
    Google企业发展部门的副总裁DavidLawee今早在TCDisruptNYC上跟MGSiegler进行了面对面交流,坦诚Google近1/3的收购是失败的,“自2003年起我们进行了120起收购,其中2/3的......
  • Google因Google+错失收购Twitter良机?
    Facebook最近可谓风光无限,先是成为有史以来上市时市值最高的公司,接着是扎克伯格想低调完婚都不行,依旧吸引了全世界的目光。而这边,作为Facebook的竞争对手,Google+的日子是真......
  • Google赶在WWDC前发布3D地图 对苹果“先发制人”
    TNW刚刚收到Google邀请,Google说6月6号即将有一个Google地图演示,这个事情相当滑稽,因为WWDC马上就要来了,而苹果被指望在大会上介绍自己的3D地图,但木已成舟,我们只能期望Google......
  • pwn学习笔记-ROP和hijack GOT
    前情提要修改返回地址,让其指向溢出数据中的一段指令(shellcode)修改返回地址,让其指向内存中已有的某个函数(return2libc)修改返回地址,让其指向内存中已有的一段指令(ROP)修......
  • 亚马逊收购3D地图初创公司UpNext 欲与苹果和Google一分天下
    大家都知道,在使用KindleFire时如果想查看地图,必须下载第三方地图应用或通过浏览器访问在线地图服务,十分不方便。而为了打造功能更为全面的产品,亚马逊在周一宣布收购3D地图......
  • Google推短信收发邮件服务Gmail SMS 造福非洲人民
    Google刚刚在非洲推出了一项新的邮件服务GmailSMS,即用户可以通过手机内置的短信功能(SMS)收发Gmail邮件。该服务将首先在加纳、尼日利亚和肯尼亚三国试用。这一举措意味着,......
  • 在开源RunnerGo 中体验高效的性能测试解决方案
    性能测试是软件质量保障的关键环节之一,性能测试可以评估应用的可靠性、稳定性和响应时间。然而,性能测试通常需要大量的时间和资源,因此需要使用高效的性能测试工具来减少测试......
  • 2023最新微信公众号免费接入饿了么外卖优惠券教程
    今年3月份淘宝联盟下线了饿了么外卖CPS活动,在我的圈子里很多推客都说饿了么外卖CPS不能做了。作为一个有着七八年经验的老推客,凭借我的经验一般平台都喜欢用pub、union、......