package main //生产者代码 import ( "fmt" "github.com/IBM/sarama" "time" ) // 基于sarama第三方库开发的kafka client var brokers = []string{"127.0.0.1:9092"} var topic = "hello_kafka0" // 同步消息模式 func syncProducer(config *sarama.Config) { // 连接kafka,使用配置构建一个同步生产者 syncProducer, err := sarama.NewSyncProducer(brokers, config) if err != nil { fmt.Println("syncProducer closed,err:", err) return } defer syncProducer.Close() //构建发送消息 srcValue := "test syncProducer send msg, i = %d" for i := 0; i < 5000; i++ { value := fmt.Sprintf(srcValue, i) msg := &sarama.ProducerMessage{ Topic: topic, Value: sarama.ByteEncoder(value), } // 发送消息,并获取消息存储的分区和偏移 partition, offset, err := syncProducer.SendMessage(msg) if err != nil { fmt.Println("send msg failed,err:", err) return } fmt.Printf("send success, partition:%v offset:%v\n", partition, offset) } } // 异步消息模式 func asyncProducer(config *sarama.Config) { // 连接kafka,使用配置构建一个异步的生产者 asyncProducer, err := sarama.NewAsyncProducer(brokers, config) if err != nil { fmt.Println("asyncProducer closed,err:", err) return } defer asyncProducer.AsyncClose() //异步关闭 fmt.Println("start goroutine...") // 异步发送,因此接收需要先启动协程,从通道中进行接收 go func(producer sarama.AsyncProducer) { for { select { case suc := <-producer.Successes(): fmt.Println("offset: ", suc.Offset, "timestamp:", suc.Timestamp.String(), "partition:", suc.Partition) case fail := <-producer.Errors(): fmt.Println("err: ", fail.Err) } } }(asyncProducer) //每500ms构建一条消息进行发送,注意消息每次都需要重新构建 for i := 0; i < 50; i++ { time.Sleep(500 * time.Millisecond) timeNow := time.Now() value := "this is a message " + timeNow.Format("14:49:05") msg := &sarama.ProducerMessage{ //消息需要每次进行构建 Topic: topic, Value: sarama.ByteEncoder(value), //将字符串转化为字节数组 } asyncProducer.Input() <- msg // 使用通道进行发送 } } func main() { config := sarama.NewConfig() //创建一个sarama的config对象 config.Producer.RequiredAcks = sarama.WaitForAll //发送完数据需要isr中的节点,理解为leader和flower都需要回复确认 config.Producer.Partitioner = sarama.NewRandomPartitioner //新选一个patition //是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用. config.Producer.Return.Errors = true //接收错误 config.Producer.Return.Successes = true //成功交付的消息将在success channel返回 config.Version = sarama.V3_2_0_0 //指定版本 config.Producer.Retry.Max = 10 //最大重试时间 config.Producer.MaxMessageBytes = 32 * 1024 * 1024 // 最大的消息缓冲字节 默认为100*1024*1024 //syncProducer(config) asyncProducer(config) }
消费者代码:
package main import ( "context" "fmt" "github.com/IBM/sarama" "os" "os/signal" "sync" "time" ) // kafka消费者消费消息 var topic string = "hello_kafka0" var brokers = []string{"127.0.0.1:9092"} var topics = []string{"hello_kafka0"} // 普通消费者 func ordinaryConsumer(wg *sync.WaitGroup, groupId string) { defer wg.Done() //计数减1 config := sarama.NewConfig() config.Consumer.Return.Errors = true //是否接收错误 config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange //消费者组的消费策略 config.Consumer.MaxWaitTime = 500 * time.Second //消费者拉取的最大等待时间 config.Version = sarama.V3_2_0_0 config.Consumer.Group.InstanceId = groupId consumer, err := sarama.NewConsumer(brokers, config) if err != nil { fmt.Println("fail to start consumer,err:%v\n", err) return } defer consumer.Close() partitionList, err := consumer.Partitions(topic) //根据topic获取到所有的分区 if err != nil { fmt.Printf("fail to get list of partition:err%v\n", err) return } for partition := range partitionList { //遍历所有的分区 //对每个分区创建一个分区消费者,Offset这里指定为获取所有消息,只获取最新的采用OffsetNewest partConsumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetOldest) if err != nil { fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err) return } defer partConsumer.AsyncClose() // 方式1、采用for range方式获取,获取完毕就结束 /*go func(sarama.PartitionConsumer) { for msg := range partConsumer.Messages() { fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value)) } }(partConsumer) time.Sleep(3 * time.Second) //延迟主线程,防止协程还没运行*/ // 方式2、采用for select方式获取,一直阻塞等待获取 //信号关闭触发 signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) Loop: for { select { case msg := <-partConsumer.Messages(): fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value)) case err := <-partConsumer.Errors(): fmt.Println(err.Err) case <-signals: break Loop } } } } // 消费者组,ConsumerGroup负责将主题和分区的处理划分为一组进程(consumer组的成员) type consumerGroupHandler struct{} // ConsumerGroupClaim 负责处理来自消费者组中给定主题和分区的Kafka消息 // ConsumerGroupHandler 实例用于处理单个主题/分区声明。 它还为您的消费者组会话生命周期提供钩子,并允许您在消费循环之前或之后触发逻辑。 func (consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (handler consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { fmt.Printf("Message topic:%q partition:%d offset:%d value:%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value) sess.MarkMessage(msg, "") //标记这条消息已经消费 } return nil } func groupConsumer(wg *sync.WaitGroup, groupId string) { defer wg.Done() config := sarama.NewConfig() config.Version = sarama.V3_2_0_0 config.Consumer.Return.Errors = true consumerGroup, err := sarama.NewConsumerGroup(brokers, groupId, config) if err != nil { fmt.Println("consumerGroup start failed", err) return } defer func() { _ = consumerGroup.Close() }() // 启动协程从错误通道中接收错误信息 go func() { for err := range consumerGroup.Errors() { fmt.Println("ERROR", err) } }() // 迭代消费者会话 ctx := context.Background() //`应该在无限循环中调用Consume,当服务器端重新平衡发生时,需要重新创建consumer会话以获取新的声明 for { handler := consumerGroupHandler{} err := consumerGroup.Consume(ctx, topics, handler) if err != nil { fmt.Println("the Consume failed", err) return } } } func main() { var wg = &sync.WaitGroup{} wg.Add(2) //go ordinaryConsumer(wg, "tt") go groupConsumer(wg, "cc") //通过mark消息已经消费,因此相同消费者组中不会有两个消费者消费到相同的消息 go groupConsumer(wg, "cc") wg.Wait() }
标签:err,sarama,fmt,partition,kafka,golang,例子,msg,config From: https://www.cnblogs.com/fly-fly-fly-fly/p/18220870