技术栈
- gin
- gorm
- rabbitmq
- redis(消费端用于判断消息是否已被消费)
- logrus(日志)
数据库表结构:
-- Article table. Rows are soft-deleted through `deleted_at`, and `status`
-- is the column the RabbitMQ producer/consumer code filters on and updates.
CREATE TABLE `article` (
  -- Surrogate primary key.
  `id` int NOT NULL AUTO_INCREMENT,
  `article_name` varchar(64) COLLATE utf8mb4_unicode_ci NOT NULL,
  `content` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
  -- Owning user; indexed below for per-user lookups.
  `user_id` int NOT NULL,
  -- `desc` is a reserved word in MySQL, so it must stay back-quoted.
  `desc` varchar(20) COLLATE utf8mb4_unicode_ci NOT NULL,
  `status` smallint NOT NULL COMMENT '状态',
  `created_at` datetime NOT NULL COMMENT '创建时间',
  `updated_at` datetime NOT NULL COMMENT '更新时间',
  -- NULL means the row has not been deleted.
  `deleted_at` datetime DEFAULT NULL COMMENT '删除时间',
  PRIMARY KEY (`id`),
  -- Secondary indexes.
  KEY `idx_article_deleted_at` (`deleted_at`),
  KEY `idx_article_user_id` (`user_id`),
  KEY `idx_article_status` (`status`),
  -- Composite index over name + content.
  KEY `idx_content` (`article_name`,`content`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
向队列中插入数据
// Update the row of the `article` table whose id equals `id`, taking the new
// values from `article`; any failure is captured in err.
// NOTE(review): in GORM v2 a struct-based update is spelled Updates(&article)
// (Update takes a column name and a value) — confirm which GORM version is used.
err := db.Table("article").Where("id= ?", id).Update(&article).Error
消费数据
// Set the article's status to 1 inside a transaction.
//
// Exec is required here because the statement is a raw UPDATE: Select only
// records which columns a later query should fetch and never sends SQL to the
// database, so passing the UPDATE text to Select silently changed nothing.
//
// NOTE(review): the error returned by Transaction itself is discarded in this
// snippet; real callers should check it.
db.Transaction(func(tx *gorm.DB) error {
	// A non-nil return value rolls the transaction back; nil commits it.
	return tx.Exec("update article set `status`=1 where `id` =?", id).Error
})
具体代码
// ProductMq publishes the ids of up to 10 articles with status=1 to RabbitMQ.
//
// The JSON request body is bound to database.MessageEntity, which supplies the
// exchange, routing key and priority used for publishing. A body that fails to
// bind is answered with HTTP 400 carrying the bind error. Otherwise the handler
// answers HTTP 200 immediately; the database query and the publishing run in a
// background goroutine, so failures there are only logged and never reach the
// HTTP client.
func (t RabbitMqController) ProductMq(c *gin.Context) {
	var entity database.MessageEntity
	if err := c.ShouldBindJSON(&entity); err != nil {
		c.JSON(400, Response{
			Code: 0,
			Msg:  err.Error(),
			Data: nil,
		})
		return
	}

	go func() {
		// Convert a panic in this goroutine into a log line.
		defer func() {
			if err := recover(); err != nil {
				logrus.Errorf("publish message fail :%v ", err)
			}
		}()

		var articles []models.Article
		db := database.GetDb()
		if err := db.Where("status=1").Limit(10).Select("id").Find(&articles).Error; err != nil {
			logrus.Errorf("database err:%v", err.Error())
			return
		}
		// Find into a slice reports "no rows" as an empty slice, not as
		// gorm.ErrRecordNotFound. Bail out here: with nothing published no
		// confirmation would ever arrive and the goroutine would block forever.
		if len(articles) == 0 {
			logrus.Warn("article no data")
			return
		}

		rabbit := database.GetRabbitMqConn()
		// Put the channel into publisher-confirm mode so the broker acks/nacks
		// every message published below.
		if err := rabbit.Channel.Confirm(false); err != nil {
			logrus.Errorf("Failed to enable publisher confirms:%v", err.Error())
			return
		}
		// The buffer holds one confirmation per article, so the AMQP client is
		// never blocked delivering confirmations for this batch.
		// NOTE(review): rabbit.Channel appears to be shared across requests and
		// every call registers another listener on it; consider a dedicated
		// channel per batch — confirm against database.GetRabbitMqConn.
		confirms := rabbit.Channel.NotifyPublish(make(chan amqp.Confirmation, len(articles)))

		published := 0
		for _, data := range articles {
			res, err := json.Marshal(data.Id)
			if err != nil {
				logrus.Errorf("json marshal err:%v", err.Error())
				break
			}
			if err = rabbit.Channel.Publish(entity.Exchange, entity.Key, false, false, amqp.Publishing{
				DeliveryMode:    amqp.Persistent,
				Headers:         amqp.Table{},
				ContentType:     "text/plain",
				ContentEncoding: "",
				Priority:        entity.Priority,
				Body:            res,
			}); err != nil {
				logrus.Errorf("publish err:%v", err.Error())
				break
			}
			published++
		}

		// Wait for exactly one confirmation per message that was actually
		// published, instead of stopping after the first one.
		for i := 0; i < published; i++ {
			confirm, ok := <-confirms
			if !ok {
				// The listener channel was closed before all confirmations arrived.
				logrus.Errorf("confirm channel closed with %d confirmation(s) outstanding", published-i)
				return
			}
			if !confirm.Ack {
				// Retry logic for nacked messages can be added here.
				logrus.Errorf("Failed delivery of message, confirmation: %+v", confirm)
			}
		}
	}()

	c.JSON(200, Response{
		Code: 0,
		Msg:  "ok",
		Data: nil,
	})
}
// ConsumeMqReq is the JSON request body bound by the ConsumeMq handler.
type ConsumeMqReq struct {
	// QueueName is the name of the queue to consume from.
	QueueName string `json:"queueName"`

	// Consumer designates the consumer to use.
	Consumer string `json:"consumer"`

	// HasNewConnection is carried in the request body.
	// NOTE(review): presumably asks for a dedicated connection — confirm with callers.
	HasNewConnection bool `json:"hasNewConnection"`
}
// ConsumeMq starts a background consumer for the queue named in the request.
//
// The JSON request body is bound to ConsumeMqReq; a body that fails to bind is
// answered with HTTP 400 carrying the bind error. Otherwise the handler hands
// a per-message callback to database.ConsumeMessagesWithAck in a new goroutine
// and answers HTTP 200 immediately.
//
// For each delivery the callback decodes the body as a JSON integer (the
// article id), checks Redis for a key equal to that id to skip messages that
// were already handled, and otherwise calls updateArticleData. It returns a
// non-nil error when decoding, the Redis lookup, or the update fails.
// NOTE(review): presumably ConsumeMessagesWithAck acks on nil and nacks or
// requeues on error — confirm against its implementation.
func (t RabbitMqController) ConsumeMq(c *gin.Context) {
	var req ConsumeMqReq
	if err := c.ShouldBindJSON(&req); err != nil {
		c.JSON(400, Response{
			Code: 0,
			Msg:  err.Error(),
			Data: nil,
		})
		return
	}

	redisClient := database.GetRedisClient()
	go database.ConsumeMessagesWithAck(req.QueueName, func(body amqp.Delivery) error {
		var data int
		if err := json.Unmarshal(body.Body, &data); err != nil {
			logrus.Errorf("err:%v", err)
			return err
		}

		// Avoid duplicate consumption: a key named after the article id marks
		// a message that has already been handled.
		// NOTE(review): nothing in this handler writes that key; verify that
		// updateArticleData (or another component) sets it.
		key := strconv.Itoa(data)
		result, err := redisClient.Get(key).Result()
		switch {
		case err == nil:
			// The key exists, so the message was consumed before.
			logrus.Infof("消息已被消费,忽略 %s", key)
			if len(result) > 0 {
				logrus.Infof("redis data: %s", result)
			}
			return nil
		case !errors.Is(err, redis.Nil):
			// A real Redis failure tells us nothing about whether the message
			// was consumed; report it instead of silently dropping the message.
			logrus.Errorf("redis get err:%v", err)
			return err
		}

		// redis.Nil: the key is absent, so this message has not been handled yet.
		if err := updateArticleData(data); err != nil {
			logrus.Errorf("updateArticleData err:%v", err)
			return err
		}
		return nil
	}, func() {})

	c.JSON(200, Response{
		Code: 0,
		Msg:  "ok",
		Data: nil,
	})
}
标签:实战,return,err,nil,RabbitMq,Error,Go,article,id
From: https://www.cnblogs.com/cnzt/p/17911802.html