首页 > 其他分享 >【Go系列】集成RabbitMQ和Micro框架

【Go系列】集成RabbitMQ和Micro框架

时间:2022-12-13 19:22:37 浏览次数:45
标签:string fmt broker RabbitMQ var Micro rabbitmq Go msg

package mmicro

import (
    "encoding/json"
    "fmt"
    "math/rand"
    "net/url"
    "sync/atomic"
    "time"

    "github.com/go-micro/plugins/v4/broker/rabbitmq"
    "go-micro.dev/v4"
    "go-micro.dev/v4/broker"
)

/*
A small demo that integrates RabbitMQ with the Micro framework.
*/

// RabbitBroker is the package-level broker used by this demo. It is assigned
// by SetupRabbitMQ.
var RabbitBroker broker.Broker

// SetupRabbitMQ builds the broker connection URL from its parts and assigns a
// new RabbitMQ broker to the package-level RabbitBroker.
//
// mqType is used as the URL scheme. user and password are percent-encoded by
// net/url, so reserved characters such as '@', '/' or ':' in the credentials
// cannot corrupt the URL. host and port are joined as "host:port". The broker
// is created with the given exchange name plus the DurableExchange and
// PrefetchCount(5) options.
func SetupRabbitMQ(mqType string, user, password string, host, port string, exchange string) {
    addr := url.URL{
        Scheme: mqType,
        User:   url.UserPassword(user, password),
        Host:   host + ":" + port,
        Path:   "/",
    }
    // Log the redacted form so the password never reaches stdout.
    fmt.Printf("rabbitmqURL %v\n", addr.Redacted())
    RabbitBroker = rabbitmq.NewBroker(
        broker.Addrs(addr.String()),
        rabbitmq.ExchangeName(exchange),
        rabbitmq.DurableExchange(),
        rabbitmq.PrefetchCount(5),
    )
}

// init configures RabbitBroker with the demo's hard-coded connection settings
// by calling SetupRabbitMQ.
func init() {
    const (
        scheme   = "amqp"
        user     = "guest"
        password = "guest"
        host     = "127.0.0.1"
        port     = "5672"
        exchange = "amq.topic"
    )
    SetupRabbitMQ(scheme, user, password, host, port, exchange)
}

func PublishMsg(topic string, data interface{}) {
    bytes, _ := json.Marshal(data)
    msg := &broker.Message{Body: bytes}
    fmt.Printf("msg is %+v\n", string(bytes))
    err := RabbitBroker.Publish(topic, msg)
    if err != nil {
        fmt.Println("publish err:", err)
    } else {
        fmt.Println("publish msg ok")
    }
}

var (
    // workCh caps the number of goroutines the consumer may have running
    // at once; its buffer size is the limit.
    workCh = make(chan struct{}, 50)

    // cnt tracks the current number of concurrent workers.
    cnt atomic.Int64

    // msgCnt tracks how many messages have been consumed so far.
    msgCnt atomic.Int64
)

// CoProcessMsg handles a message concurrently.
//
// The body is decoded as a JSON integer holding a Unix timestamp in seconds.
// A body that does not decode is rejected with an error instead of being
// processed as timestamp 0. Otherwise the handler takes a slot from workCh
// (blocking while all slots are in use), starts a goroutine that simulates
// 0-9 seconds of work, and returns nil without waiting for it.
//
// NOTE(review): the handler returns before the work finishes; with
// AckOnSuccess on the subscription the message is presumably acknowledged at
// that point rather than on completion. Confirm that is acceptable.
func CoProcessMsg(event broker.Event) error {
    var eventTime int64
    if err := json.Unmarshal(event.Message().Body, &eventTime); err != nil {
        return fmt.Errorf("decoding event time: %w", err)
    }
    fmt.Printf("ProcessMsg data [%v] latency %+vms\n", eventTime, time.Since(time.Unix(eventTime, 0)).Milliseconds())

    // Take a worker slot; this blocks the handler once the limit is reached.
    workCh <- struct{}{}
    // Count the worker before it starts so the figure printed below always
    // includes it.
    cnt.Add(1)
    go func(d int64) {
        // Release the slot even if the work below panics.
        defer func() {
            cnt.Add(-1)
            <-workCh
        }()
        cost := rand.Intn(10)
        time.Sleep(time.Duration(cost) * time.Second)
        // Use the value returned by Add so the printed total is this
        // message's own ordinal, not one read after another worker's update.
        fmt.Printf("done: [%v], cost [%vs], msg cnt is %v\n", d, cost, msgCnt.Add(1))
    }(eventTime)
    fmt.Printf("num is %v\n", cnt.Load())
    return nil
}

// ProcessMsg handles a message synchronously.
//
// The body is decoded as a JSON integer holding a Unix timestamp in seconds.
// A body that does not decode is rejected with an error instead of being
// processed as timestamp 0. Otherwise the handler simulates 0-9 seconds of
// work inline, increments msgCnt, and returns nil.
func ProcessMsg(event broker.Event) error {
    var eventTime int64
    if err := json.Unmarshal(event.Message().Body, &eventTime); err != nil {
        return fmt.Errorf("decoding event time: %w", err)
    }
    fmt.Printf("ProcessMsg data [%v] latency %+vms\n", eventTime, time.Since(time.Unix(eventTime, 0)).Milliseconds())

    cost := rand.Intn(10)
    time.Sleep(time.Duration(cost) * time.Second)
    fmt.Printf("ProcessMsg [%v], cost [%vs]\n", eventTime, cost)
    // Print the value returned by Add so the total shown belongs to this
    // message even if other handlers update the counter concurrently.
    fmt.Printf("msg cnt is %v\n", msgCnt.Add(1))
    return nil
}

func start() error {
    go func() {
        // 生产者
        for i := 1; i <= 100; i++ {
            n := rand.Intn(500)
            time.Sleep(time.Millisecond * time.Duration(n))
            PublishMsg("test", time.Now().Unix())
        }
    }()

    // 消费者
    go func() {
        RabbitBroker.Subscribe("test",
            // ProcessMsg,
            CoProcessMsg,
            rabbitmq.DurableQueue(),
            broker.Queue("test_queue"),
            rabbitmq.AckOnSuccess(),
        )
    }()
    return nil
}

// Main2 runs the demo: it resets the concurrency counter, creates a micro
// service that uses RabbitBroker and registers start as its AfterStart hook,
// then initializes and runs the service. An error returned by Run is printed
// to stdout instead of being dropped.
func Main2() {
    cnt.Store(0)
    srv := micro.NewService(
        micro.Broker(RabbitBroker),
        micro.AfterStart(start),
    )
    srv.Init()
    if err := srv.Run(); err != nil {
        fmt.Println("service run err:", err)
    }
}

 

标签:string,fmt,broker,RabbitMQ,var,Micro,rabbitmq,Go,msg
From: https://www.cnblogs.com/bfstudy2022/p/16979670.html

相关文章