首页 > 其他分享 >基于 go-zero 框架的项目中集成 Kafka

基于 go-zero 框架的项目中集成 Kafka

时间:2024-08-09 15:17:38浏览次数:9  
标签:return err nil sarama Kafka zero func go

Kafka 封装

Kafka 集成指南

本文档描述了如何在基于 go-zero 框架的项目中集成 Kafka。

1. 项目结构

在项目中添加以下文件和目录:

└── consts
    └── kafka.go
└── pkg
    └── kafka
        ├── consumer.go
        └── producer.go

2. 常量定义

consts/kafka.go 中定义主题和消费者组:

package consts

const (
    TopicExample1 = "example_topic_1"
    TopicExample2 = "example_topic_2"

    ConsumerGroupExample1 = "example_group_1"
    ConsumerGroupExample2 = "example_group_2"
)

3. 配置

internal/config/config.go 中添加 Kafka 配置:

type Config struct {
    rest.RestConf
    Kafka struct {
        Brokers []string
    }
    // 其他配置...
}

etc/standard-api.yaml 中添加 Kafka 配置:

Kafka:
  Brokers:
    - localhost:9092
# 其他配置...

4. 生产者实现

pkg/kafka/producer.go 中实现生产者:

package kafka

import (
    "github.com/Shopify/sarama"
    "github.com/zeromicro/go-zero/core/logx"
)

type Producer struct {
    producer sarama.SyncProducer
}

func NewProducer(brokers []string) (*Producer, error) {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    producer, err := sarama.NewSyncProducer(brokers, config)
    if err != nil {
        return nil, err
    }
    return &Producer{producer: producer}, nil
}

func (p *Producer) Produce(topic string, message string) error {
    msg := &sarama.ProducerMessage{
        Topic: topic,
        Value: sarama.StringEncoder(message),
    }
    _, _, err := p.producer.SendMessage(msg)
    if err != nil {
        logx.Errorf("Failed to send message: %v", err)
        return err
    }
    return nil
}

func (p *Producer) Close() error {
    return p.producer.Close()
}

5. 消费者实现

pkg/kafka/consumer.go 中实现消费者:

package kafka

import (
    "context"
    "github.com/Shopify/sarama"
    "github.com/zeromicro/go-zero/core/logx"
)

type MessageHandler func(message *sarama.ConsumerMessage) error

type Consumer struct {
    consumer sarama.ConsumerGroup
}

func NewConsumer(brokers []string, groupID string) (*Consumer, error) {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true
    consumer, err := sarama.NewConsumerGroup(brokers, groupID, config)
    if err != nil {
        return nil, err
    }
    return &Consumer{consumer: consumer}, nil
}

func (c *Consumer) Consume(ctx context.Context, topics []string, handler MessageHandler) error {
    for {
        err := c.consumer.Consume(ctx, topics, &consumerGroupHandler{handler: handler})
        if err != nil {
            logx.Errorf("Error from consumer: %v", err)
            return err
        }
        if ctx.Err() != nil {
            return ctx.Err()
        }
    }
}

func (c *Consumer) Close() error {
    return c.consumer.Close()
}

type consumerGroupHandler struct {
    handler MessageHandler
}

func (h *consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (h *consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for message := range claim.Messages() {
        if err := h.handler(message); err != nil {
            logx.Errorf("Error handling message: %v", err)
        } else {
            session.MarkMessage(message, "")
        }
    }
    return nil
}

6. 服务上下文集成

internal/svc/service_context.go 中初始化 Kafka 生产者:

type ServiceContext struct {
    Config   config.Config
    Producer *kafka.Producer
    // 其他服务...
}

func NewServiceContext(c config.Config) *ServiceContext {
    producer, err := kafka.NewProducer(c.Kafka.Brokers)
    if err != nil {
        logx.Fatalf("Failed to create Kafka producer: %v", err)
    }

    return &ServiceContext{
        Config:   c,
        Producer: producer,
        // 初始化其他服务...
    }
}

7. 在 Logic 层中使用

internal/logic/standard.go 中使用生产者和消费者:

package logic

import (
    "context"
    "your-project/consts"
    "your-project/internal/svc"
    "your-project/internal/types"
    "your-project/pkg/kafka"

    "github.com/zeromicro/go-zero/core/logx"
)

type StandardLogic struct {
    logx.Logger
    ctx    context.Context
    svcCtx *svc.ServiceContext
}

func NewStandardLogic(ctx context.Context, svcCtx *svc.ServiceContext) *StandardLogic {
    return &StandardLogic{
        Logger: logx.WithContext(ctx),
        ctx:    ctx,
        svcCtx: svcCtx,
    }
}

func (l *StandardLogic) Produce(req *types.ProduceRequest) error {
    return l.svcCtx.Producer.Produce(consts.TopicExample1, req.Message)
}

func (l *StandardLogic) StartConsumer() error {
    consumer, err := kafka.NewConsumer(l.svcCtx.Config.Kafka.Brokers, consts.ConsumerGroupExample1)
    if err != nil {
        return err
    }
    defer consumer.Close()

    return consumer.Consume(l.ctx, []string{consts.TopicExample1}, func(message *sarama.ConsumerMessage) error {
        // 处理消息
        l.Infof("Received message: %s", string(message.Value))
        return nil
    })
}

8. 启动消费者

main 函数中启动消费者:

func main() {
    // ... 其他初始化代码 ...

    ctx := svc.NewServiceContext(c)
    
    // 启动消费者
    go func() {
        logic := logic.NewStandardLogic(context.Background(), ctx)
        if err := logic.StartConsumer(); err != nil {
            logx.Errorf("Failed to start consumer: %v", err)
        }
    }()

    // ... 启动 HTTP 服务器 ...
}

这样,您就可以在项目中灵活地使用 Kafka 生产者和消费者了。主题和消费者组可以在 consts/kafka.go 中定义和管理。

标签:return,err,nil,sarama,Kafka,zero,func,go
From: https://www.cnblogs.com/lwhzj/p/18350796

相关文章

  • 基于 go-zero 框架的项目中集成 WebSocket
    WebSocket集成指南本文档描述了如何在基于go-zero框架的项目中集成WebSocket。1.安装依赖首先,安装gorilla/websocket库:gogetgithub.com/gorilla/websocket2.项目结构在项目中添加以下文件和目录:└──pkg└──websocket└──websocket.go3......
  • 编写 Django 单元测试的更优雅的方法
    我目前正在使用Django的单元测试(基于Python标准库模块:unittest)编写测试。我已经为我的Contact模型编写了这个测试,它通过了:classContactTestCase(TestCase):defsetUp(self):"""Createmodelobjects."""Contact.objects.create(nam......
  • 介绍一款新奇的开源操作系统:GodoOS
    在快节奏的现代办公环境中,一款高效、集成化的操作系统无疑是提升工作效率的利器。今天,我们要为您隆重介绍——GodoOS,一款专为内网办公环境设计的全能操作系统。它不仅仅是一个工具,更是您团队协作与文件管理的得力助手,将彻底改变您的工作方式,带来前所未有的便捷体验! 【全能办......
  • KubeSphere 部署 Kafka 集群实战指南
    本文档将详细阐述如何利用Helm这一强大的工具,快速而高效地在K8s集群上安装并配置一个Kafka集群。实战服务器配置(架构1:1复刻小规模生产环境,配置略有不同)主机名IPCPU内存系统盘数据盘用途ksp-registry192.168.9.904840200Harbor镜像仓库ksp-co......
  • 在两个大文件中找出相同的记录,用golang如何写?
    在两个大文件中找出相同的记录,可以使用Golang实现高效的算法。这里主要涉及以下几个步骤:读取文件:逐行读取两个大文件。使用数据结构存储记录:可以使用Go的map数据结构来存储其中一个文件的记录,之后遍历另一个文件,检查其记录是否在map中,若在则记录下该相同记录。输出结果:将......
  • SpringBoot集成Kafka详解
    1、构建项目1.1、引入依赖<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.5.RELEASE</version> </parent> <dependencies> <dependency> ......
  • Django+记账管理系统-计算机毕设定制-附项目源码(可白嫖)50377
    摘 要本文课题研究的记账管理系统,系统的主要功能模块包括记账信息、企业类型、公告信息、公告类型等,采取面对对象的开发模式进行软件的开发和硬体的架设,能很好的满足实际使用的需求,完善了对应的软体架设以及程序编码的工作,采用Django开发框架,MySQL数据库,Ajax异步交互,根据Aj......
  • 【优秀python毕设案例】基于python django的新媒体网络舆情数据爬取与分析
    摘   要如今在互联网时代下,微博成为了一种新的流行社交形式,是体现网络舆情的媒介之一。现如今微博舆论多带有虚假不实、恶意造谣等负面舆论,为了营造更好的网络环境,本设计提出了基于新媒体的网络舆情数据爬取与分析,主要对微博热点话题进行处理。本设计首先以Python为环......
  • OCR即时翻译:DeepL和Google两大翻译接口支持,轻便、高效!
    哈喽,大家好!欢迎来到【程序视点】前言OCR识别是日常工作中常用的功能了。大家最常用的应该就是微信截图的OCR功能了。平时用用还好,一旦遇到复杂的,就无法满足我们的要求了。比如今天要说的OCR翻译软件介绍STranslateSTranslate是一款多功能的OCR即时翻译软件,它提供了一种高......
  • Godot遍历目录下文件,并创建按钮
    想用Godot做一个一站式的文本编辑器核心:funcdir_contents(path): vardir=DirAccess.open(path) varfiles=[] ifdir: dir.list_dir_begin() varfile_name=dir.get_next() whilefile_name!="": ifdir.current_is_dir(): break else: files.......