首页 > 其他分享 >Go操作Kafka

Go操作Kafka

时间:2024-03-21 16:45:56浏览次数:34  
标签:err sarama fmt Kafka msg Go 操作 kafka

目录

一、Go操作之kafka

  • Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能、持久化、多副本备份、横向扩展等特点。本文介绍了如何使用Go语言发送和接收kafka消息。

二、sarama

1. 下载及安装

go get github.com/Shopify/sarama

2. 注意事项

  • sarama v1.20之后的版本加入了zstd压缩算法,需要用到cgo,在Windows平台编译时会提示类似如下错误:
# github.com/DataDog/zstd
exec: "gcc":executable file not found in %PATH%

- 所以在Windows平台请使用v1.19版本的sarama

三、连接使用kafka

1. 发送消息

package main
 
import (
	"fmt"
 
	"github.com/Shopify/sarama"
)
 
// 基于sarama第三方库开发的kafka client
 
func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
	config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
	config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回
 
	// 构造一个消息
	msg := &sarama.ProducerMessage{}
	msg.Topic = "web_log"
	msg.Value = sarama.StringEncoder("this is a test log")
	// 连接kafka
	client, err := sarama.NewSyncProducer([]string{"192.168.1.7:9092"}, config)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return
	}
	defer client.Close()
	// 发送消息
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed, err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

2. 消费消息

package main
 
import (
	"fmt"
 
	"github.com/Shopify/sarama"
)
 
// kafka consumer
 
func main() {
	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
	if err != nil {
		fmt.Printf("fail to start consumer, err:%v\n", err)
		return
	}
	partitionList, err := consumer.Partitions("web_log") // 根据topic取到所有的分区
	if err != nil {
		fmt.Printf("fail to get list of partition:err%v\n", err)
		return
	}
	fmt.Println(partitionList)
	for partition := range partitionList { // 遍历所有的分区
		// 针对每个分区创建一个对应的分区消费者
		pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
			return
		}
		defer pc.AsyncClose()
		// 异步从每个分区消费信息
		go func(sarama.PartitionConsumer) {
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
			}
		}(pc)
	}
}

标签:err,sarama,fmt,Kafka,msg,Go,操作,kafka
From: https://www.cnblogs.com/Mcoming/p/18080315

相关文章

  • python操作kafka
    目录一、python操作kafka1.python使用kafka生产者2.python使用kafka消费者3.使用docker中的kafka二、python操作kafka细节2.1生产者demo2.2消费者demo2.3消费者(消费群组)2.4消费者(读取目前最早可读的消息)2.5消费者(手动设置偏移量)2.6消费者(订阅多个主题)......
  • Go+Gin 接口防止用户频繁访问
    在Go+Gin框架中,可以利用中间件实现API防洪(防止用户频繁访问)功能。一种常见的防洪技术是利用Go的漏桶算法或令牌桶算法进行流量控制。以下面的Go代码为例,演示了一个简单的令牌桶方式的中间件:packagemainimport("net/http""time""github.com/gin-gonic/gin"......
  • Kafka集群部署
    目录Kafka集群部署1.1服务器资源1.1.1安装JDK(所有设备)1.1.2配置ip和主机名映射(所有服务器)(可不做)1.1.3配置主机名(所有设备)(可不做)1.2在node1上安装、配置kafka1.2.1安装kafka1.2.2修改配置文件1.2.2.1修改zookeeper.properties1.2.2.2配置Zookeeper的id1.2.2.3......
  • ElasticSearch中使用ik分词器进行实现分词操作
    简介:在默认的情况下,ES中只存在Stander分词器,但是这个分词器往往不满足我们的分词需求,这里通过ik分词器进行自定义我们的分词操作1、第一步将ik分词器进行下载下载地址:https://github.com/medcl/elasticsearch-analysis-ik需要注意,需要选择和自己的ES版本对应的版本2、将ik分词......
  • Step by Step Data Replication Using Oracle GoldenGate
    1、Quickstarts2、ConfigureDeployments3、ManageDeploymentsfromtheServiceManager 4、ConfigureDataReplicationProcessesfromtheAdministrationService 5、ConfigurePathstoTransportTraiData 6、MonitorPathsandTrailsfromtheReceiver......
  • 操作系统综合题之“用记录型信号量机制的wait和signal操作来解决了由北向南和由南向北
    1.问题:假设系统有三个并发进程read、move和print共享缓冲区B1和B2。进程read负责从输入设备上读取信息,每读取一条记录后把它存如缓冲区B1中;进程move负责从缓冲区B1中取出一条记录,整理后放入缓冲区B2;进程print负责将缓冲区B2中的记录取出并打印输出。缓冲区B1和B2每次只能存放1个......
  • 从零开始学Spring Boot系列-集成Kafka
    Kafka简介ApacheKafka是一个开源的分布式流处理平台,由LinkedIn公司开发和维护,后来捐赠给了Apache软件基金会。Kafka主要用于构建实时数据管道和流应用。它类似于一个分布式、高吞吐量的发布-订阅消息系统,可以处理消费者网站的所有动作流数据。这种动作流数据包括页面浏览、搜索......
  • 【软考】关系代数篇(基础操作、关系公式、各种连接)
    【软考】关系代数篇一、关系代数简介二、五个基本运算1、选择(Selection):2、投影(Projection):3、连接(Join):4、并(Union):5、差(Difference):三、其他操作和表达式以及结果集1、笛卡尔积(CartesianProduct):2、交集(Intersection):3、除法(Division):4、自然连接(NaturalJoin):5、全连接(FullO......
  • Zookeeper+Kafka单节点部署
    一.Zookeeper部署启动1.下载zookeeperApacheZooKeeper(http://zookeeper.apache.org/releases.html) 2.文件上传通过Xftp软件上传到Linux系统中若在外部解压可直接传入相应的文件地址,若未解压则传入压缩包在Linux系统中进行解压操作tar-zxvfzookeeper-3.9.2.tar.gz......
  • 使用元类实现Django的ORM
    一、ORM基本介绍ORM是python编程语言后端web框架Django的核心思想,“ObjectRelationalMapping”,即对象-关系映射,简称ORM。二、实现目标创建一个实例对象,用创建它的类名当做数据表名,用创建它的类属性对应数据表的字段,当对这个实例对象操作时,能够对应MySQL语句如图:三、......