package mmicro
import (
"encoding/json"
"fmt"
"github.com/go-micro/plugins/v4/broker/rabbitmq"
"go-micro.dev/v4"
"go-micro.dev/v4/broker"
"math/rand"
"sync/atomic"
"time"
)
/*
集成RabbitMQ和Micro框架的小Demo
*/
var RabbitBroker broker.Broker
func SetupRabbitMQ(mqType string, user, password string, host, port string, exchange string) {
rabbitmqURL := fmt.Sprintf("%v://%v:%v@%v:%v/", mqType, user, password, host, port)
fmt.Printf("rabbitmqURL %v\n", rabbitmqURL)
RabbitBroker = rabbitmq.NewBroker(
broker.Addrs(rabbitmqURL),
rabbitmq.ExchangeName(exchange),
rabbitmq.DurableExchange(),
rabbitmq.PrefetchCount(5),
)
}
func init() {
SetupRabbitMQ("amqp", "guest", "guest",
"127.0.0.1", "5672", "amq.topic")
}
func PublishMsg(topic string, data interface{}) {
bytes, _ := json.Marshal(data)
msg := &broker.Message{Body: bytes}
fmt.Printf("msg is %+v\n", string(bytes))
err := RabbitBroker.Publish(topic, msg)
if err != nil {
fmt.Println("publish err:", err)
} else {
fmt.Println("publish msg ok")
}
}
// 限制消费者最多创建的协程数量
var workCh = make(chan struct{}, 50)
// 统计当前的并发数
var cnt atomic.Int64
// 统计当前消费了多少消息
var msgCnt atomic.Int64
// CoProcessMsg 并发方式处理消息
func CoProcessMsg(event broker.Event) error {
msg := event.Message()
var eventTime int64
json.Unmarshal(msg.Body, &eventTime)
fmt.Printf("ProcessMsg data [%v] latency %+vms\n", eventTime, time.Since(time.Unix(eventTime, 0)).Milliseconds())
workCh <- struct{}{}
go func(d int64) {
cnt.Add(1)
cost := rand.Intn(10)
time.Sleep(time.Duration(cost) * time.Second)
msgCnt.Add(1)
fmt.Printf("done: [%v], cost [%vs], msg cnt is %v\n", d, cost, msgCnt.Load())
cnt.Add(-1)
<-workCh
}(eventTime)
fmt.Printf("num is %v\n", cnt.Load())
return nil
}
// ProcessMsg 非并发方式处理消息
func ProcessMsg(event broker.Event) error {
msg := event.Message()
var eventTime int64
json.Unmarshal(msg.Body, &eventTime)
fmt.Printf("ProcessMsg data [%v] latency %+vms\n", eventTime, time.Since(time.Unix(eventTime, 0)).Milliseconds())
n := rand.Intn(10)
time.Sleep(time.Duration(n) * time.Second)
fmt.Printf("ProcessMsg [%v], cost [%vs]\n", eventTime, n)
msgCnt.Add(1)
fmt.Printf("msg cnt is %v\n", msgCnt.Load())
return nil
}
func start() error {
go func() {
// 生产者
for i := 1; i <= 100; i++ {
n := rand.Intn(500)
time.Sleep(time.Millisecond * time.Duration(n))
PublishMsg("test", time.Now().Unix())
}
}()
// 消费者
go func() {
RabbitBroker.Subscribe("test",
// ProcessMsg,
CoProcessMsg,
rabbitmq.DurableQueue(),
broker.Queue("test_queue"),
rabbitmq.AckOnSuccess(),
)
}()
return nil
}
func Main2() {
cnt.Store(0)
srv := micro.NewService(
micro.Broker(RabbitBroker),
micro.AfterStart(start),
)
srv.Init()
srv.Run()
}
标签:string,fmt,broker,RabbitMQ,var,Micro,rabbitmq,Go,msg From: https://www.cnblogs.com/bfstudy2022/p/16979670.html