首页 > 其他分享 >go操作kafka

go操作kafka

时间:2023-08-01 09:35:25浏览次数:41  
标签:err sarama fmt kafka msg go 操作 config

go操作kafka

  1. ZooKeeper是一个分布式协调服务,它的主要作用是为分布式系统提供一致性服务,提供的功能包括:配置维护、命名服务、分布式同步、组服务等。Kafka的运行依赖ZooKeeper。目前kafka3.2.0以上版本(kafka_2.13-3.2.0.tgz)内就包含自带的ZooKeeper,因此直接下载Kafka就行。

  2. 解压Kafka压缩包,放到常用的目录下。打开终端,cd进入到kafka_2.13-3.2.0根目录下。

  3. 由于Kafka的运行依赖ZooKeeper,首先得运行zookeeper,并在后台挂载。

    .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
    
  4. 开启kafka

    .\bin\windows\kafka-server-start.bat .\config\server.properties
    
  5. 生产者代码:

    package main
    
    import (
    	"fmt"
    	"github.com/Shopify/sarama"
    )
    
    // 基于sarama第三方库开发的kafka client
    
    func main() {
    	config := sarama.NewConfig()
    	config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
    	config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
    	config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回
    
    	// 构造一个消息
    	msg := &sarama.ProducerMessage{}
    	msg.Topic = "web_log"
    	msg.Value = sarama.StringEncoder("hhhhhhhh啦啦啦")
    	// 连接kafka
    	client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
    	if err != nil {
    		fmt.Println("producer closed, err:", err)
    		return
    	}
    	defer client.Close()
    	// 发送消息
    	pid, offset, err := client.SendMessage(msg)
    	if err != nil {
    		fmt.Println("send msg failed, err:", err)
    		return
    	}
    	fmt.Printf("pid:%v offset:%v\n", pid, offset)
    }
    
    
  6. 消费者代码:

    package main
    
    import (
    	"fmt"
    	"github.com/Shopify/sarama"
    	"sync"
    )
    
    func consumer() {
    	var wg sync.WaitGroup
    	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
    	if err != nil {
    		fmt.Println("Failed to start consumer: %s", err)
    		return
    	}
    	partitionList, err := consumer.Partitions("web_log") //获得该topic所有的分区
    	if err != nil {
    		fmt.Println("Failed to get the list of partition:, ", err)
    		return
    	}
    
    	for partition := range partitionList {
    		pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
    		if err != nil {
    			fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)
    			return
    		}
    		wg.Add(1)
    		fmt.Println(pc.Messages())
    		go func(sarama.PartitionConsumer) { //为每个分区开一个go协程去取值
    			// OffsetNewest从最新的地方开始 ,OffsetOldest是从头开始,一般选择OffsetNewest
    			for msg := range pc.Messages() { //阻塞直到有值发送过来,然后再继续等待
    				fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
    			}
    			defer pc.AsyncClose()
    			wg.Done()
    		}(pc)
    	}
    	wg.Wait()
    }
    func main() {
    	consumer()
    }
    
    

标签:err,sarama,fmt,kafka,msg,go,操作,config
From: https://www.cnblogs.com/luoyingfenfei/p/17595602.html

相关文章

  • 《字符串篇》string类进行转换等操作
    C++中的string类用法简介原文链接:https://blog.csdn.net/liitdar/article/details/80498634概述string是C++标准库的一个重要的部分,主要用于字符串处理。c_str(),string转换为char*//方法一:使用c_str()方法,代码(stringsimple.cpp)如下:#include<string>#include<iostream......
  • sqlite命令操作数据库
    CommandLineShellForSQLite一、sqlite命令操作数据库保证模拟器打开状态1.cmd命令行进入android安装目录我的是D:\android-sdk-windows-1.5_r1\tools2.adbshell命令打开模拟器上的命令行界面3.#cd/data/data进入数据库目录4.#cdorg.yihu5.#ls;列出目录下所有文件6.#......
  • Day 7: Shell数组和循环操作
    学习目标学习内容1.数组的定义和赋值2.数组的遍历for循环遍历数组长度3.练习任务大树哥个人信息学习目标学习Shell中的数组和循环操作。掌握数组的定义、遍历和操作方法。练习编写脚本,使用数组存储和处理数据。学习内容今天我们将学习Shell中的数组和循环操作,这......
  • 【go语言】3.1.2 接口的定义和实现
    在Go中,接口是一种抽象类型,用来描述其他类型应该有哪些方法。它定义了一组方法,但没有实现。这些方法由其他类型实现。接口的定义接口定义的格式如下:typeInterfaceNameinterface{Method1(param1type1,param2type2)returntype1Method2(param1type1,param2ty......
  • mp3文件后128字节歌曲信息读写操作
    参照网上相关资料,实现向mp3文件后128字节写入指定数据,并读取出来.项目中为向MP3文件写入歌曲信息,如歌手、专辑、歌名等.第一步:创建关键类SongInfopublicclassSongInfo{privateStringTAG="TAG";//文件头1-3privateStringsongName="歌名";//歌曲名4-33priva......
  • WSL 常见操作
    WSL数据迁移:wsl--shutdown#关机wsl--exportUbuntu-20.04E:\WSL-Ubuntu-20.04#导出数据到文件wsl--unregisterUbuntu-20.04#注销原来的发行版wsl--importUbuntu-20.04E:\WSL-Ubuntu\E:\wsl-Ubuntu-20.04--version2#导入数据ubuntu2004config--defa......
  • 6、Mysql操作数据库以及数据表
    学习sql规则,可以让mysql服务器帮咱们做其他操作1、操作数据库(文件夹)createdatabase数据库名defaultcharsetutf8;表示整个数据库是utf8的格式 use数据库名;使用这个数据库 查看数据库showdatabases; 删除数据库dropdatabase数据库名;数据库没有修改这一说......
  • openGauss数据库常用操作命令
    1.以操作系统用户omm登录数据库主节点su-omm1.1启动服务分布式openGauss:gs_om-tstart启动服务gs_om-trestart重启服务集中式openGauss:gs_om-tstop关闭服务gs_om-tstart启动服务1.2使用“gs_om-tstatus–detail”命令查询openGauss各实例状......
  • MongoDB数据库的部署和应用
    推荐步骤:在Centos01上部署MongoDB服务器客户端登录验证在centos01的MongoDB配置文件通过配置文件控制MongoDB服务,配置MongoDB身份验证在centos01的MongoDB服务器配置身份验证管理和修改配置文件支持验证在centos01管理MongoDB管理数据,集合批量数据管理实验步骤创建管理MongoDB组和......
  • Django 动态操作model
    fromdjango.appsimportappsforoinoids_result:oid_result=snmp.snmpWalk(o.oid)ifo.model_filed_nameandlen(o.model_filed_name)>0:field_name=o.model_filed_name.get('field_......