producer
package main
import (
"fmt"
"github.com/Shopify/sarama"
"log"
"strconv"
)
const (
// BROKER is the Kafka broker address handed to sarama.NewSyncProducer in main.
// NOTE(review): "ip:port" looks like a placeholder that must be replaced with
// a real host:port before running — confirm.
BROKER = "ip:port"
// TOPIC is the topic the single demonstration message in main is sent to.
// NOTE(review): "xx" looks like a placeholder — confirm.
TOPIC = "xx"
)
// sendMsg publishes msg through the given synchronous producer.
//
// On success it prints the partition and offset returned by SendMessage to
// stdout and returns nil. On failure nothing is printed and the SendMessage
// error is returned unchanged.
func sendMsg(client sarama.SyncProducer, msg *sarama.ProducerMessage) error {
	partition, off, sendErr := client.SendMessage(msg)
	if sendErr == nil {
		// Report where the message was stored.
		fmt.Printf("pid:%v offset:%v\n", partition, off)
	}
	return sendErr
}
// newMsg builds a ProducerMessage addressed to topic. The key and content
// strings are wrapped in sarama.StringEncoder and stored as the message's Key
// and Value; all other fields keep their zero values.
func newMsg(topic, key, content string) *sarama.ProducerMessage {
	m := new(sarama.ProducerMessage)
	m.Topic = topic
	m.Key = sarama.StringEncoder(key)
	m.Value = sarama.StringEncoder(content)
	return m
}
// main is the entry point of a sample Kafka producer built on the third-party
// sarama client library. All work happens in produce; main only turns a
// returned error into a fatal log line (exit status 1).
//
// log.Fatal calls os.Exit, which skips deferred functions, so it is called
// here — after produce has returned and its deferred Close has already run.
func main() {
	if err := produce(); err != nil {
		log.Fatal(err)
	}
}

// produce connects a SyncProducer to BROKER, sends one JSON message to TOPIC,
// then sends a short list of words one by one, and finally closes the
// producer.
//
// It returns an error if the producer cannot be created or if the first
// message cannot be sent. Failures while sending the word list are printed to
// stdout and skipped. The producer is closed on every return path.
func produce() error {
	// Client configuration.
	config := sarama.NewConfig()
	// Wait for the leader and its followers to acknowledge each message.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Pick a random partition for each message.
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// Successfully delivered messages are reported on the success channel.
	config.Producer.Return.Successes = true

	// Connect to Kafka. Per the original author's note, clients outside the
	// cluster use port 8000 and clients inside it use port 9092.
	c, err := sarama.NewSyncProducer([]string{BROKER}, config)
	if err != nil {
		return fmt.Errorf("[create client failed] err: %w", err)
	}
	defer func() {
		if cerr := c.Close(); cerr != nil {
			log.Printf("[close client failed] err: %v", cerr)
		}
	}()

	// Send a single message; a failure here aborts the program.
	msg := newMsg(TOPIC, "ttt", `{"name":"Tim", "age":"18"}`)
	if err := sendMsg(c, msg); err != nil {
		return fmt.Errorf("[send msg failed] err: %w", err)
	}

	// Send several messages in a loop, keyed by their index. A failed send is
	// reported and the loop moves on to the next word.
	// NOTE(review): these go to the hard-coded topic "yy" rather than TOPIC —
	// confirm that is intentional.
	words := []string{"Welcome11", "to", "the", "Confluent", "Kafka", "Golang", "client"}
	for k, word := range words {
		if err := sendMsg(c, newMsg("yy", strconv.Itoa(k), word)); err != nil {
			fmt.Printf("[send msg failed] err: %v\n", err)
		}
	}
	return nil
}
consumer
package main
import (
"fmt"
"github.com/Shopify/sarama"
"log"
)
const (
// BROKER is the Kafka broker address handed to sarama.NewConsumer in main.
BROKER = "service-epoch-kafka.epoch-kafka.svc.manager.ucbj.kuber.thc:8000"
// TOPIC is the topic whose partitions main lists and consumes.
TOPIC = "yangkaiyue-test"
)
// main is the entry point of a sample Kafka consumer built on the sarama
// client library. It connects to BROKER, looks up every partition of TOPIC,
// starts one partition consumer per partition at sarama.OffsetNewest, and
// prints each received message to stdout from a dedicated goroutine. Once all
// partition consumers are running it blocks forever.
//
// Any setup failure closes whatever was already opened and terminates the
// process through log.Fatalf (exit status 1).
func main() {
	consumer, err := sarama.NewConsumer([]string{BROKER}, nil)
	if err != nil {
		log.Fatalf("[new consumer failed] err: %v", err)
	}

	// started collects the partition consumers opened so far so they can be
	// released if a later step fails.
	var started []sarama.PartitionConsumer

	// shutdown closes every started partition consumer and then the parent
	// consumer. It is called explicitly before log.Fatalf because log.Fatalf
	// exits the process without running deferred functions.
	shutdown := func() {
		for _, pc := range started {
			if cerr := pc.Close(); cerr != nil {
				log.Printf("[close partition consumer failed] err: %v", cerr)
			}
		}
		if cerr := consumer.Close(); cerr != nil {
			log.Printf("[close consumer failed] err: %v", cerr)
		}
	}

	// Fetch all partitions of the topic.
	partitionList, err := consumer.Partitions(TOPIC)
	if err != nil {
		shutdown()
		log.Fatalf("[get partition list failed] err: %v", err)
	}

	// Iterate over the partition IDs themselves (the slice values), not the
	// slice indexes.
	for _, partition := range partitionList {
		// Create one partition consumer per partition.
		pc, err := consumer.ConsumePartition(TOPIC, partition, sarama.OffsetNewest)
		if err != nil {
			shutdown()
			log.Fatalf("failed to start consumer for partition %d,err:%v", partition, err)
		}
		started = append(started, pc)

		// Consume each partition asynchronously. The partition consumer is
		// passed as an argument so the goroutine owns its own reference.
		go func(pc sarama.PartitionConsumer) {
			for msg := range pc.Messages() {
				// Key and Value are byte slices; convert both so they print as
				// text rather than as a list of byte values.
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
			}
		}(pc)
	}

	// Block the main goroutine so the consuming goroutines keep running.
	select {}
}