首页 > 其他分享 >Golang - kafka 的使用

Golang - kafka 的使用

时间:2023-06-20 11:03:06浏览次数:35  
标签:nil err sarama kafka Golang failed 使用 msg log


producer

package main

import (
	"fmt"
	"github.com/Shopify/sarama"
	"log"
	"strconv"
)

const (
	BROKER = "ip:port"
	TOPIC = "xx"
)

// sendMsg 发送到 kfk
func sendMsg(client sarama.SyncProducer, msg *sarama.ProducerMessage) error {
	partID, offset, err := client.SendMessage(msg)
	if err != nil {
		return err
	}

	// 打印信息
	fmt.Printf("pid:%v offset:%v\n", partID, offset)

	return nil
}

// newMsg 返回一个 ProducerMessage 结构体
func newMsg(topic, key, content string) *sarama.ProducerMessage {
	return &sarama.ProducerMessage{
		Topic: topic,
		Key: sarama.StringEncoder(key),
		Value: sarama.StringEncoder(content),
	}
}

// 基于sarama第三方库开发的kafka client
func main() {

	// Client 配置
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
	config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
	config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回

	// 连接kafka, 集群外部使用 8000 端口, 内部使用 9092 端口
	c, err := sarama.NewSyncProducer([]string{BROKER}, config)
	if err != nil {
		log.Fatalf("[create client failed] err: %v", err)
	}
	defer c.Close()

	// 构造消息
	msg := newMsg(TOPIC, "ttt", `{"name":"Tim", "age":"18"}`)

	// 发送单个消息
	if err := sendMsg(c, msg); err != nil {
		log.Fatalf("[send msg failed] err: %v", err)
	}

	// 循环发送消息
	for k, word := range []string{"Welcome11", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
		msg := newMsg("yy", strconv.Itoa(k), word)

		if err := sendMsg(c, msg); err != nil {
			fmt.Printf("[send msg failed] err: %v\n", err)
			continue
		}
	}
}

consumer

package main

import (
	"fmt"
	"github.com/Shopify/sarama"
	"log"
)

const (
	BROKER = "service-epoch-kafka.epoch-kafka.svc.manager.ucbj.kuber.thc:8000"
	TOPIC = "yangkaiyue-test"
)

func main() {

	consumer, err := sarama.NewConsumer([]string{BROKER}, nil)
	if err != nil {
		log.Fatalf("[new consumer failed] err: %v", err)
	}

	// 根据topic取到所有的分区
	partitionList, err := consumer.Partitions(TOPIC)
	if err != nil {
		log.Fatalf("[get partition list failed] err: %v", err)
	}

	// 遍历所有的分区
	for partition := range partitionList {
		// 针对每个分区创建一个对应的分区消费者
		pc, err := consumer.ConsumePartition(TOPIC, int32(partition), sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
			return
		}
		defer pc.AsyncClose()

		// 异步从每个分区消费信息
		go func(sarama.PartitionConsumer) {
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
			}
		}(pc)
	}

	//阻塞进程
	select{}
}


标签:nil,err,sarama,kafka,Golang,failed,使用,msg,log
From: https://blog.51cto.com/u_16165803/6521280

相关文章

  • Dstat 的使用
    概述安装yum-yinstalldstatdstat命令如果不添加参数,会默认添加-cdngy,并会有提示Youdidnotselectanystats,using-cdngybydefault.语法:dstat[-afv][options..][delay][count]参数CPU查看CPU使用率-c,--cpu#查看cpu使用率汇总信息[root@192-168-248-80......
  • Golang - net/http 笔记
    Serverpackagemainimport( "fmt" "log" "net/http")//模拟实现Handler接口typeBarstruct{}func(bBar)ServeHTTP(whttp.ResponseWriter,req*http.Request){ tgt:=req.URL.Query().Get("name") fmt.Fprintf(w,......
  • Golang - 日志
    官方Log包方法输出到logger.out:log.Print(),log.Printf(),log.Println()输出到logger.out,再执行os.Exit(1):log.Fatal(),log.Fatalln(),log.Fatalf()输出到logger.out,再执行panic():log.Panic(),log.Panicln(),log.Panicf()logger结构体typeLoggerstruc......
  • Golang - Structs 包的使用
    packagemain////主要用于struct转map//还可以判断结构体是否有空属性等功能//import( "fmt" "github.com/fatih/structs")//struct-->maptypeStustruct{ Namestring Ageint}funcmain(){ //创建一个Age属性为空的struct实例 u1:=Stu{......
  • 使用信捷PLC和台达变频器之间的通信程序,通过信捷XC3的Modbus通信控制台达VFD-M变频器
    使用信捷PLC和台达变频器之间的通信程序,通过信捷XC3的Modbus通信控制台达VFD-M变频器的正转、反转、加减速和停止。同时,可以使用威纶触摸屏监控变频器的运转频率、电压、电流以及详细的资料,包括参数设置和PLC程序的接线。您无需添加通信扩展模块,因为信捷PLC自带该功能,简单、方便且......
  • Python开发系列课程(7) - 函数和模块的使用
    函数和模块的使用在讲解本章节的内容之前,我们先来研究一道数学题,请说出下面的方程有多少组正整数解。x1+x2+x3+x4=8x1+x2......
  • 规则引擎调研及初步使用
    一、产生的背景生产过程中,线上的业务规则内嵌在系统的各处代码中,每次策略的调整都需要更新线上系统,进行从需求->设计->编码->测试->上线这种长周期的流程,满足不了业务规则的快速变化以及低成本的更新试错迭代。因此需要有一种解决方案将商业决策逻辑和应用开发者的技术决策分离......
  • C++使用ranges库解析INI文件
    C++使用ranges库解析INI文件引言C++20引入了<ranges>头文件,C++23对其进行了完善,本文将使用该头文件提供的adaptor编写一个简单的ini解析器。ini文件格式介绍一般的ini文件由section和entry部分,比如[section]key=value;Thisisentry.section和entry均独占一行,其中sectio......
  • 宝塔面板中使用Mysql命令快速导入大容量数据库
    在宝塔面板中,文件栏目中打开/www/backup/database文件夹,把我们的数据库上传到此文件夹中。(一般导入的数据库都会上传到这个文件夹,方便以后管理)点击旁边的终端,打开宝塔终端界面。输入账号及密码进入终端管理。(如果忘记密码,在软件商店里面找到Linux工具箱1.4里面可以设置......
  • pytorch 使用多GPU训练模型测试出现:TypeError: forward() missing 1 required positio
    转载:https://blog.csdn.net/lingyunxianhe/article/details/119454778?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522168718901716800227455818%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fblog.%2522%257D&request_id=16871890171680022745......