首页 > 其他分享 >go连接kafka

go连接kafka

时间:2023-02-13 11:07:13浏览次数:33  
标签:nil err sarama producer kafka Error go config 连接


Part1前言

本文主要介绍如何通过go语言连接kafka。这里采用的是sarama库。​​https://github.com/Shopify/sarama​


go连接kafka_golang

Part2库的安装

go get -u github.com/Shopify/sarama

go get相关定义

参数介绍:
-d 只下载不安装
-f 只有在你包含了 -u 参数的时候才有效,不让 -u 去验证 import 中的每一个都已经获取了,这对于本地 fork 的包特别有用
-fix 在获取源码之后先运行 fix,然后再去做其他的事情
-t 同时也下载需要为运行测试所需要的包
-u 强制使用网络去更新包和它的依赖包
-v 显示执行的命令

Part3生产消息

参考代码​​https://github.com/Shopify/sarama/tree/main/examples/txn_producer​​这里为了提升性能不考虑事务的概念。

1创建消费者

version, err := sarama.ParseKafkaVersion("2.8.1")
if err != nil {
Error(err.Error())
return
}
config := sarama.NewConfig()
config.Version = version
config.Producer.Idempotent = true
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewManualPartitioner
config.Net.MaxOpenRequests = 1

brokers := G_Configjson.KafkaBroker
producer, err := sarama.NewSyncProducer(strings.Split(brokers, ","), config)
if err != nil {
Error(err.Error())
return
}

代码解读:
ParseKafkaVersion :填写当前kafka的版本
Idempotent:kafka 0.11.0.0版本引入了idempotent producer机制,在这个机制中,同一消息可能被producer发送多次,但是在broker端只会写入一次,他为每一条消息编号去重,而且对kafka开销影响不大。需要设置producer端的新参数 enable.idempotent 为true。
WaitForAll:等待所有副本完成提交
MaxOpenRequests:允许有多少未完成请求的发送,这个值越大,性能越好,但是如果Idempotent为false,不能确保消息顺序。
NewSyncProducer:创建生产者,可以有多个brokers。

2发送消息

msg := &sarama.ProducerMessage{Topic: kafkaProduce.sipTopic, Key: nil, Partition: kafkaProduce.sipPartition,
Value: sarama.ByteEncoder(packet.Data()),
}
partition, offset, err := kafkaProduce.sipProduce.SendMessage(msg)
if nil != err {
logger.Error(err.Error())
logger.Info(fmt.Sprintf("Kafka SendMessage partition=%d offset=%d", partition, offset))
}

代码解读:
发送消息需要指定发送的Topic,Partition,以及数据等内容。

Part4消费消息

代码参考​​https://github.com/Shopify/sarama/blob/main/examples/consumergroup/main.go​

3创建消费者

version, err := sarama.ParseKafkaVersion("2.8.1")
if err != nil {
log.Panicf("Error parsing Kafka version: %v", err)
}
config := sarama.NewConfig()
config.Version = version
//设置初始offset
config.Consumer.Offsets.Initial = sarama.OffsetOldest
client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)

代码解读:初始化信息和之前的创建生产者类似,不过通过Offsets.Initial来设置是消费最新的消息还是之前未消费完的消息
NewConsumerGroup:创建消费者

4消费数据

这里需要创建一个struct让其来消费,struct定义如下

type Consumer struct {
}

func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
return nil
}

func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for {
select {
case message := <-claim.Messages():
log.Printf("Message claimed: timestamp = %v, topic = %s offset=%d", message.Timestamp, message.Topic, message.Offset)
session.MarkMessage(message, "")
case <-session.Context().Done():
return nil
}
}
}

重点函数是ConsumeClaim来消费数据。
之后将struct对象设置给之前创建的消费者,使用示例如下

if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
log.Panicf("Error from consumer: %v", err)
}

Part5总结

本文主要解决go语言通过sarama库来链接kafka进行消费者和生产者的创建。

标签:nil,err,sarama,producer,kafka,Error,go,config,连接
From: https://blog.51cto.com/u_12701820/6053721

相关文章

  • golang 切片 slice
    1.基本介绍切片是数组的一个引用,因此切片是引用类型。切片的使用与数组类似,遍历,访问切片元素等都一样。切片是长度是可以变化的,因此切片可以看做是一个动态数组。slice内......
  • MixGo CE及外接模块管脚简单介绍
    之前用习惯了Arduino系列的板卡,也习惯了上面的管脚使用方法,换成MixGo系列的板卡,都是TypeC接口,这个管脚应该怎么看呢?MixGo系列的外接模块管脚又是怎么看,程序里面应该怎么......
  • 10 Django中间件
    Django中间件中间件介绍django中间件类似于是django的保安,请求来的时候需要先经过中间件才能到达django后端,响应走的时候也需要经过中间件才能到达web服务网关接口dja......
  • 06 Django与Ajax
    Django与Ajax什么是JSONJSON是轻量级的文本数据交换格式,JSON使用JavaScript语法来描述数据对象,但是JSON仍然独立于语言和平台。JSON解析器和JSON库支持许多不......
  • 01 Django简介
    前戏Wsgiref模块封装了socket代码请求来的时候将http数据格式拆封成一个大字典响应走的时候将数据打包成符合http协议要求的数据格式#模块封装了socket代码并将请求......
  • Docker第四章:Dockerfile、微服务、网络连接、compose容器编排、容器监控
    Dockerfile是用来构建Docker镜像的文本文件,是由一条条构建镜像所需的指令和参数构成的脚本、 执行流程1:docker从基础镜像运行一个容器2:执行一条指令并对容器作出修改......
  • kafka如何开启kerberos认证
    参考:     https://www.cnblogs.com/wuyongyin/p/15624452.html kerberos基本原理       https://www.cnblogs.com/wuyongyin/p/15634397.html kerb......
  • goReplay流量回放
    基本原理:goreplay监听一个端口的流量然后记录下来在另一个服务上去回放这个记录下来的流量文件。1先安装go的运行环境2.1go环境搭建tar-C/usr/local-zxvf......
  • Go GRPC之拦截器
    grpc服务端和客户端都提供了interceptor功能,功能类似middleware,很适合在这里处理验证、日志等流程,话不多说直接上代码1.编写helloworld.proto  并用命令生成相应的g......
  • Go Grpc的四种流
    srteam顾名思义就是一种流,可以源源不断的推送数据,很适合传输一些大数据,或者服务端和客户端长时间数据交互,比如客户端可以向服务端订阅一个数据,服务端就......