Kafka 封装
Kafka 集成指南
本文档描述了如何在基于 go-zero 框架的项目中集成 Kafka。
1. 项目结构
在项目中添加以下文件和目录:
├── consts
│   └── kafka.go
└── pkg
    └── kafka
        ├── consumer.go
        └── producer.go
2. 常量定义
在 `consts/kafka.go` 中定义主题和消费者组:
package consts
// Kafka topic names shared by producers and consumers.
const (
	TopicExample1 = "example_topic_1"
	TopicExample2 = "example_topic_2"
)

// Kafka consumer group IDs.
const (
	ConsumerGroupExample1 = "example_group_1"
	ConsumerGroupExample2 = "example_group_2"
)
3. 配置
在 `internal/config/config.go` 中添加 Kafka 配置:
// Config is the service configuration; it embeds the REST server settings
// and adds the Kafka connection settings.
type Config struct {
	rest.RestConf

	// Kafka holds the Kafka connection settings.
	Kafka struct {
		// Brokers lists the Kafka broker addresses to connect to.
		Brokers []string
	}

	// Other configuration...
}
在 `etc/standard-api.yaml` 中添加 Kafka 配置:
Kafka:
  Brokers:
    - localhost:9092
# 其他配置...
4. 生产者实现
在 `pkg/kafka/producer.go` 中实现生产者:
package kafka
import (
"github.com/Shopify/sarama"
"github.com/zeromicro/go-zero/core/logx"
)
// Producer wraps a sarama synchronous producer and exposes a small
// string-message publishing API.
type Producer struct {
	// producer is the underlying sarama client used to send messages.
	producer sarama.SyncProducer
}
// NewProducer builds a Producer backed by a sarama SyncProducer connected to
// the given brokers. It returns the error from sarama unchanged when the
// producer cannot be created.
func NewProducer(brokers []string) (*Producer, error) {
	cfg := sarama.NewConfig()
	// sarama's SyncProducer requires successes to be returned.
	cfg.Producer.Return.Successes = true

	syncProducer, err := sarama.NewSyncProducer(brokers, cfg)
	if err != nil {
		return nil, err
	}

	p := &Producer{producer: syncProducer}
	return p, nil
}
// Produce sends message as a string payload to topic and waits for the send
// to complete. A failed send is logged and the error is returned to the caller.
func (p *Producer) Produce(topic string, message string) error {
	record := &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder(message)}

	// Partition and offset of the stored message are not needed here.
	if _, _, err := p.producer.SendMessage(record); err != nil {
		logx.Errorf("Failed to send message: %v", err)
		return err
	}
	return nil
}
// Close shuts down the underlying sarama producer and returns its error, if any.
func (p *Producer) Close() error {
	err := p.producer.Close()
	return err
}
5. 消费者实现
在 `pkg/kafka/consumer.go` 中实现消费者:
package kafka
import (
"context"
"github.com/Shopify/sarama"
"github.com/zeromicro/go-zero/core/logx"
)
// MessageHandler processes a single consumed Kafka message. Returning a
// non-nil error tells the consumer that the message was not handled.
type MessageHandler func(msg *sarama.ConsumerMessage) error
// Consumer wraps a sarama consumer group and dispatches received messages to
// a MessageHandler.
type Consumer struct {
	// consumer is the underlying sarama consumer group client.
	consumer sarama.ConsumerGroup
}
// NewConsumer creates a Consumer that joins the consumer group groupID on the
// given brokers. It returns the error from sarama unchanged when the consumer
// group cannot be created.
func NewConsumer(brokers []string, groupID string) (*Consumer, error) {
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true

	group, err := sarama.NewConsumerGroup(brokers, groupID, config)
	if err != nil {
		return nil, err
	}

	// With Consumer.Return.Errors enabled, sarama reports consumer errors on
	// the Errors() channel instead of logging them itself, so the channel has
	// to be read or those errors are never seen. The loop ends when the
	// channel is closed, which sarama does when the group is closed.
	go func() {
		for groupErr := range group.Errors() {
			logx.Errorf("Kafka consumer group error: %v", groupErr)
		}
	}()

	return &Consumer{consumer: group}, nil
}
// Consume joins the consumer group for topics and passes every received
// message to handler. It blocks until the group reports an error (which is
// logged and returned) or ctx is done (in which case ctx.Err() is returned).
func (c *Consumer) Consume(ctx context.Context, topics []string, handler MessageHandler) error {
	// The underlying Consume call returns when a session ends, so it is
	// re-entered in a loop until an error occurs or ctx is cancelled.
	for {
		groupHandler := &consumerGroupHandler{handler: handler}
		if err := c.consumer.Consume(ctx, topics, groupHandler); err != nil {
			logx.Errorf("Error from consumer: %v", err)
			return err
		}

		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}
	}
}
// Close shuts down the underlying consumer group and returns its error, if any.
func (c *Consumer) Close() error {
	err := c.consumer.Close()
	return err
}
// consumerGroupHandler adapts a MessageHandler to the handler value that is
// passed to the sarama consumer group's Consume call.
type consumerGroupHandler struct {
	// handler is invoked once for every message received on a claim.
	handler MessageHandler
}
// Setup is the session-start hook of the consumer group handler. There is
// nothing to initialize, so it always returns nil.
func (h *consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// Cleanup is the session-end hook of the consumer group handler. There is
// nothing to release, so it always returns nil.
func (h *consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim runs the message loop for one claim. Each message is passed to
// the handler; it is marked as consumed only when the handler succeeds, and a
// handler error is logged without stopping the loop. The method returns nil
// when the claim's message channel is closed or the session context is done.
//
// NOTE(review): a failed message is not marked, but marking a later message
// on the same partition moves the committed offset past it, so failed
// messages are effectively skipped rather than retried — confirm this is
// the intended delivery semantics.
func (h *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				// The claim was released; finish the loop.
				return nil
			}
			if err := h.handler(message); err != nil {
				logx.Errorf("Error handling message: %v", err)
				continue
			}
			session.MarkMessage(message, "")
		case <-session.Context().Done():
			// Exit promptly when the session ends (rebalance or shutdown)
			// instead of waiting for the message channel to be closed.
			return nil
		}
	}
}
6. 服务上下文集成
在 `internal/svc/service_context.go` 中初始化 Kafka 生产者:
// ServiceContext bundles the dependencies shared by the logic layer.
type ServiceContext struct {
	// Config is the loaded service configuration.
	Config config.Config

	// Producer is the Kafka producer shared by all requests.
	Producer *kafka.Producer

	// Other services...
}
// NewServiceContext builds the ServiceContext from the loaded configuration
// and creates the shared Kafka producer. The process is aborted when the
// producer cannot be created, because the service cannot work without it.
func NewServiceContext(c config.Config) *ServiceContext {
	producer, err := kafka.NewProducer(c.Kafka.Brokers)
	if err != nil {
		// Record what failed, then abort through logx.Must, go-zero's
		// fail-fast helper for a non-nil error.
		logx.Errorf("Failed to create Kafka producer: %v", err)
		logx.Must(err)
	}

	return &ServiceContext{
		Config:   c,
		Producer: producer,
		// Initialize other services...
	}
}
7. 在 Logic 层中使用
在 `internal/logic/standard.go` 中使用生产者和消费者:
package logic
import (
	"context"

	"github.com/Shopify/sarama"
	"github.com/zeromicro/go-zero/core/logx"

	"your-project/consts"
	"your-project/internal/svc"
	"your-project/internal/types"
	"your-project/pkg/kafka"
)
// StandardLogic holds the logic that produces to and consumes from Kafka.
type StandardLogic struct {
	// Logger is embedded so logging methods can be called directly on the logic value.
	logx.Logger

	// ctx is the context handed to NewStandardLogic; it also bounds the
	// lifetime of the consumer started by StartConsumer.
	ctx context.Context

	// svcCtx gives access to the configuration and the shared producer.
	svcCtx *svc.ServiceContext
}
// NewStandardLogic creates a StandardLogic bound to ctx and svcCtx, with a
// logger derived from ctx.
func NewStandardLogic(ctx context.Context, svcCtx *svc.ServiceContext) *StandardLogic {
	l := new(StandardLogic)
	l.Logger = logx.WithContext(ctx)
	l.ctx = ctx
	l.svcCtx = svcCtx
	return l
}
// Produce publishes req.Message to consts.TopicExample1 through the shared
// producer and returns the producer's error, if any.
func (l *StandardLogic) Produce(req *types.ProduceRequest) error {
	producer := l.svcCtx.Producer
	return producer.Produce(consts.TopicExample1, req.Message)
}
// StartConsumer creates a consumer in consts.ConsumerGroupExample1 and
// consumes consts.TopicExample1, logging every received message. It blocks
// until consuming stops and returns the error from creating the consumer or
// from Consume. The consumer is closed before returning.
func (l *StandardLogic) StartConsumer() error {
	consumer, err := kafka.NewConsumer(l.svcCtx.Config.Kafka.Brokers, consts.ConsumerGroupExample1)
	if err != nil {
		return err
	}
	// Report a failed Close instead of discarding its error.
	defer func() {
		if closeErr := consumer.Close(); closeErr != nil {
			l.Errorf("Failed to close Kafka consumer: %v", closeErr)
		}
	}()

	return consumer.Consume(l.ctx, []string{consts.TopicExample1}, func(message *sarama.ConsumerMessage) error {
		// Handle the message.
		l.Infof("Received message: %s", string(message.Value))
		return nil
	})
}
8. 启动消费者
在 `main` 函数中启动消费者:
// main wires up the service context, runs the Kafka consumer in the
// background for the lifetime of the process, and starts the HTTP server.
func main() {
	// ... other initialization code ...
	ctx := svc.NewServiceContext(c)
	// Release the shared producer when main returns.
	defer func() {
		if err := ctx.Producer.Close(); err != nil {
			logx.Errorf("Failed to close Kafka producer: %v", err)
		}
	}()

	// Start the consumer with a cancellable context so it can be stopped, and
	// a done channel so main can wait for it to close its consumer group.
	consumerCtx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		defer close(done)
		l := logic.NewStandardLogic(consumerCtx, ctx)
		// An error returned after cancellation is the expected shutdown
		// path, so only errors seen while still running are reported.
		if err := l.StartConsumer(); err != nil && consumerCtx.Err() == nil {
			logx.Errorf("Failed to start consumer: %v", err)
		}
	}()
	// Runs before the producer is closed: stop the consumer and wait for it.
	defer func() {
		cancel()
		<-done
	}()

	// ... start the HTTP server ...
}
这样,您就可以在项目中灵活地使用 Kafka 生产者和消费者了。主题和消费者组可以在 `consts/kafka.go` 中定义和管理。