
Connecting to Kafka from Go


Required libraries

github.com/Shopify/sarama     // the main kafka client library
github.com/bsm/sarama-cluster // consumer group support
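
Both code samples below import a kafkaDemo/define package that the post never shows. A minimal sketch of what it presumably contains (the broker address is an assumption; the topic name is taken from the test at the end):

package define

// Shared constants for the demo (values are placeholders).
const (
	SERVER_LIST = "127.0.0.1:9092" // kafka broker address
	TOPIC       = "test_log"       // topic used throughout the demo
)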

Producer

package producer

import (
	"fmt"
	"github.com/HappyTeemo7569/teemoKit/tlog"
	"github.com/Shopify/sarama"
	"kafkaDemo/define"
)

var (
	ProducerId = 1
)

type Producer struct {
	Producer   sarama.SyncProducer
	Topic      string // topic name
	ProducerID int    // producer ID
	MessageId  int
}

func (p *Producer) InitProducer() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // the leader and its followers must all ack each message
	config.Producer.Partitioner = sarama.NewRandomPartitioner // pick a random partition for each message
	config.Producer.Return.Successes = true                   // successfully delivered messages are returned on the Successes channel

	// connect to kafka
	client, err := sarama.NewSyncProducer([]string{define.SERVER_LIST}, config)
	if err != nil {
		tlog.Error("producer closed, err:", err)
		return
	}

	p.Producer = client
	p.Topic = define.TOPIC
	p.ProducerID = ProducerId
	p.MessageId = 1

	ProducerId++
}

func (p *Producer) SendMessage() {
	// build a message
	msg := &sarama.ProducerMessage{}
	msg.Topic = p.Topic
	txt := fmt.Sprintf("ProducerID:%d  this is a test log %d",
		p.ProducerID, p.MessageId)
	msg.Value = sarama.StringEncoder(txt)

	// send the message and get back the partition and offset it landed at
	pid, offset, err := p.Producer.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed, err:", err)
		return
	}
	tlog.Info(fmt.Sprintf("ProducerID:%d pid:%v offset:%v msg:%s",
		p.ProducerID, pid, offset, txt))

	p.MessageId++
}

func (p *Producer) Close() {
	p.Producer.Close()
}

Consumer

package consumer

import (
	"github.com/HappyTeemo7569/teemoKit/tlog"
	"github.com/Shopify/sarama"
	"kafkaDemo/define"
)

var ConsumerId = 1

type Consumer struct {
	Consumer   sarama.Consumer
	Topic      string
	ConsumerId int // consumer ID
}

func (c *Consumer) InitConsumer() error {
	consumer, err := sarama.NewConsumer([]string{define.SERVER_LIST}, nil)
	if err != nil {
		return err
	}
	c.Consumer = consumer
	c.Topic = define.TOPIC
	c.ConsumerId = ConsumerId
	ConsumerId++
	return nil
}

// GetMessage consumes one specific partition.
// The offset can be given explicitly; pass -1 to start from the newest offset.
func (c *Consumer) GetMessage(partitionId int32, offset int64) {
	if offset == -1 {
		offset = sarama.OffsetNewest
	}
	pc, err := c.Consumer.ConsumePartition(c.Topic, partitionId, offset)
	if err != nil {
		tlog.Error("failed to start consumer for partition %d,err:%v", partitionId, err)
		//That topic/partition is already being consumed
		return
	}

	// consume messages from this partition asynchronously
	go func(pc sarama.PartitionConsumer) {
		for msg := range pc.Messages() {
			tlog.Info("ConsumerId:%d Partition:%d Offset:%d Key:%v Value:%v", c.ConsumerId, msg.Partition, msg.Offset, msg.Key, string(msg.Value))
		}
	}(pc)
}

// GetMessageToAll consumes every partition of the topic.
func (c *Consumer) GetMessageToAll(offset int64) {
	partitionList, err := c.Consumer.Partitions(c.Topic) // fetch all partition IDs for the topic
	if err != nil {
		tlog.Error("failed to get partition list, err:%v", err)
		return
	}
	tlog.Info("all partitions:", partitionList)

	for _, partition := range partitionList { // iterate over every partition ID
		c.GetMessage(partition, offset)
	}
}
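
The library list at the top also mentions github.com/bsm/sarama-cluster for consumer groups, but the demo never uses it. With the partition-level API above, each process has to track offsets and partition assignment itself; a consumer group handles both. A minimal sketch with that library (broker address, group ID and topic are placeholders):

package main

import (
	"log"

	cluster "github.com/bsm/sarama-cluster"
)

func main() {
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true

	// consumers that share a group ID split the topic's partitions among themselves
	consumer, err := cluster.NewConsumer(
		[]string{"127.0.0.1:9092"}, "demo-group", []string{"test_log"}, config)
	if err != nil {
		log.Fatal("NewConsumer err: ", err)
	}
	defer consumer.Close()

	for msg := range consumer.Messages() {
		log.Printf("partition:%d offset:%d value:%s", msg.Partition, msg.Offset, msg.Value)
		consumer.MarkOffset(msg, "") // mark as processed so the group resumes here after a restart
	}
}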

Main function

func main() {
	tlog.Info("start")

	go producer.Put()
	go consumer.Get()

	// block forever so the producer and consumer goroutines keep running
	for {
		time.Sleep(time.Hour * 60)
	}
}


func Put() {
	producer := new(Producer)
	producer.InitProducer()
	go func() {
		for {
			producer.SendMessage()
			time.Sleep(1 * time.Second)
		}
	}()
}

func Get() {
	offset := int64(0)

	consumer := new(Consumer)
	err := consumer.InitConsumer()
	if err != nil {
		tlog.Error("fail to init consumer, err:%v", err)
		return
	}
	consumer.GetMessageToAll(offset)
}

The full source code is available at:
kafka_demo

Optimizations for production

  • Persist each partition's consumed offset externally, e.g. in Redis (the consumer below does this).
  • Messages that must be consumed in order should go to one partition, or be routed by key with a hash partitioner (see the sketch after this list).
  • Pass in a channel so the business logic is decoupled from the transport layer.
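
Key-based routing deserves a concrete example. A minimal sketch with sarama (broker address, topic and key are placeholders): the hash partitioner maps every message with the same Key to the same partition, which preserves per-key ordering.

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Return.Successes = true
	config.Producer.Partitioner = sarama.NewHashPartitioner // partition = hash(key) % numPartitions

	producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		log.Fatal("NewSyncProducer err: ", err)
	}
	defer producer.Close()

	msg := &sarama.ProducerMessage{
		Topic: "order_events",
		Key:   sarama.StringEncoder("user_1024"), // all messages for user_1024 land in one partition
		Value: sarama.StringEncoder("order created"),
	}
	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		log.Fatal("SendMessage err: ", err)
	}
	log.Printf("delivered to partition %d at offset %d", partition, offset)
}

The consumer below implements the other two points: consumed offsets are persisted in Redis, and each message is handed to the business logic over a channel.
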
package kafka

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"github.com/Shopify/sarama"
	"io/ioutil"
	"log"
	"sync"
	"vliao.com/stellar/internal/core"
)

type KafkaConsumer struct {
	Node         []string
	Consumer     sarama.Consumer
	Topic        string
	MessageQueue chan []byte
}

func NewKafkaConsumer(topic string) KafkaConsumer {
	return KafkaConsumer{
		Node:  core.GetKafkaConn().Conn,
		Topic: core.GetServerMode() + "_" + topic,
	}
}

// Consume consumes all partitions of the topic.
func (c *KafkaConsumer) Consume() {
	config := sarama.NewConfig()

	config.Net.SASL.Enable = true
	config.Net.SASL.User = core.GetKafkaConn().SASLUser
	config.Net.SASL.Password = core.GetKafkaConn().SASLPassword
	config.Net.SASL.Handshake = true

	certBytes, err := ioutil.ReadFile(GetFullPath("only-4096-ca-cert"))
	if err != nil {
		fmt.Println("kafka client read cert file failed ", err.Error())
		return
	}
	clientCertPool := x509.NewCertPool()
	ok := clientCertPool.AppendCertsFromPEM(certBytes)
	if !ok {
		fmt.Println("kafka client failed to parse root certificate")
		return
	}
	config.Net.TLS.Config = &tls.Config{
		RootCAs:            clientCertPool,
		InsecureSkipVerify: true,
	}
	config.Net.TLS.Enable = true

	consumer, err := sarama.NewConsumer(c.Node, config)
	if err != nil {
		log.Fatal("NewConsumer err: ", err)
	}
	defer consumer.Close()

	// first, query how many partitions the topic has
	partitions, err := consumer.Partitions(c.Topic)
	if err != nil {
		log.Fatal("Partitions err: ", err)
	}
	var wg sync.WaitGroup
	wg.Add(len(partitions))
	// then start one goroutine per partition
	for _, partitionId := range partitions {
		// without a goroutine per partition, one partition would only be consumed after the previous one finished
		go c.consumeByPartition(consumer, c.Topic, partitionId, &wg)
	}
	wg.Wait()
}

// For now the mapping is one-to-one: a message from one producer never triggers changes in more than one piece of business logic,
// but several consumers can be started to increase processing capacity.
func getOffsetCacheKey(topic string, partitionId int32) string {
	return fmt.Sprintf("kafka_offset_%s_%d", topic, partitionId)
}

func setConsumeOffset(topic string, partitionId int32, offset int64) {
	core.RedisBy(core.RedisTypeServer).SetInt64(getOffsetCacheKey(topic, partitionId), offset)
}
func getConsumeOffset(topic string, partitionId int32) (offset int64) {
	key := getOffsetCacheKey(topic, partitionId)
	if core.RedisBy(core.RedisTypeServer).Exists(key) {
		// resume one past the last processed message
		return core.RedisBy(core.RedisTypeServer).GetInt64(key) + 1
	}

	// no stored offset yet: start from the newest.
	// (Persisting sarama.OffsetNewest here would be a bug: -1 + 1 reads back as offset 0.)
	return sarama.OffsetNewest
}

func (c *KafkaConsumer) consumeByPartition(consumer sarama.Consumer, topic string, partitionId int32, wg *sync.WaitGroup) {
	defer wg.Done()
	offset := getConsumeOffset(topic, partitionId)
	partitionConsumer, err := consumer.ConsumePartition(topic, partitionId, offset)
	if err != nil {
		log.Fatal("ConsumePartition err: ", err)
	}
	defer partitionConsumer.Close()
	for message := range partitionConsumer.Messages() {
		log.Printf("[Consumer] topic: %s ; partitionid: %d; offset:%d, value: %s\n", topic, message.Partition, message.Offset, string(message.Value))
		setConsumeOffset(topic, partitionId, message.Offset)
		c.MessageQueue <- message.Value
	}
}
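
A test that wires the consumer to the channel and drains it:
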
package kafka

import (
	"log"
	"testing"
	"vliao.com/stellar/internal/core"
)

func Test_Get(t *testing.T) {
	core.TestMain()
	topic := "test_log"
	var kafkaConsumer = NewKafkaConsumer(topic)

	kafkaConsumer.MessageQueue = make(chan []byte, 1000)
	go kafkaConsumer.Consume()

	for {
		msg := <-kafkaConsumer.MessageQueue
		deal(msg)
	}

}

func deal(msg []byte) {
	log.Printf("%s", msg) // don't pass raw message bytes as the format string
}

From: https://blog.51cto.com/u_12840595/6203984
