首页 > 其他分享 >Go项目实战—RabbitMq篇

Go项目实战—RabbitMq篇

时间:2024-07-09 12:10:33浏览次数:21  
标签:实战 return err nil RabbitMq Error Go article id

技术栈

  • gin
  • gorm
  • rabbitmq

数据库表结构:

CREATE TABLE `article` (
  `id` int NOT NULL AUTO_INCREMENT,
  `article_name` varchar(64) COLLATE utf8mb4_unicode_ci NOT NULL,
  `content` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
  `user_id` int NOT NULL,
  `desc` varchar(20) COLLATE utf8mb4_unicode_ci NOT NULL,
  `status` smallint NOT NULL COMMENT '状态',
  `created_at` datetime NOT NULL COMMENT '创建时间',
  `updated_at` datetime NOT NULL COMMENT '更新时间',
  `deleted_at` datetime DEFAULT NULL COMMENT '删除时间',
  PRIMARY KEY (`id`),
  KEY `idx_article_deleted_at` (`deleted_at`),
  KEY `idx_article_user_id` (`user_id`),
  KEY `idx_article_status` (`status`),
  KEY `idx_content` (`article_name`,`content`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci

向队列中插入数据

image


err := db.Table("article").Where("id= ?", id).Update(&article).Error

消费数据

image
image


db.Transaction(func(tx *gorm.DB) error {
  if err := tx.Select("update article set `status`=1 where `id` =?", id).Error; err != nil {
    return err
  }
  return nil
})

具体代码

// ProductMq 生产数据
func (t RabbitMqController) ProductMq(c *gin.Context) {

	var entity database.MessageEntity
	if err := c.ShouldBindJSON(&entity); err != nil {
		c.JSON(400, Response{
			Code: 0,
			Msg:  err.Error(),
			Data: nil,
		})
		return
	}

	go func() {
		defer func() {
			if err := recover(); err != nil {
				logrus.Errorf("publish message fail :%v ", err)
			}
		}()
		var articles []models.Article
		db := database.GetDb()
		err := db.Where("status=1").Limit(10).Select("id").Find(&articles).Error
		if err != nil {
			if errors.Is(err, gorm.ErrRecordNotFound) {
				logrus.Errorf("article no data err:%v", err.Error())
				return
			}
			logrus.Errorf("database err:%v", err.Error())
			return
		}
		rabbit := database.GetRabbitMqConn()
		err = rabbit.Channel.Confirm(false)
		if err != nil {
			logrus.Errorf("Failed to enable publisher confirms:%v", err.Error())
			return
		}
		confirms := rabbit.Channel.NotifyPublish(make(chan amqp.Confirmation, len(articles)))
		//defer rabbit.Close()
		for _, data := range articles {
			res, err := json.Marshal(data.Id)
			if err != nil {
				logrus.Errorf("json marshal err:%v", err.Error())
				return
			}
			// TODO: 需要开启confirm模式 手动ack
			if err = rabbit.Channel.Publish(entity.Exchange, entity.Key, false, false, amqp.Publishing{
				DeliveryMode:    amqp.Persistent,
				Headers:         amqp.Table{},
				ContentType:     "text/plain",
				ContentEncoding: "",
				Priority:        entity.Priority,
				Body:            res,
			}); err != nil {
				logrus.Errorf("database err:%v", err.Error())
				return
			}
		}
		select {
		case confirm := <-confirms:
			if !confirm.Ack {
				logrus.Errorf("Failed delivery of message with body...", confirm)
				// 可以在这里实现重试逻辑
				return
			}
		}
	}()

	c.JSON(200, Response{
		Code: 0,
		Msg:  "ok",
		Data: nil,
	})
	return
}

type ConsumeMqReq struct {
	QueueName        string `json:"queueName"`
	Consumer         string `json:"consumer"` // 指定消费者
	HasNewConnection bool   `json:"hasNewConnection"`
}

// ConsumeMq 消费
func (t RabbitMqController) ConsumeMq(c *gin.Context) {

	var req ConsumeMqReq
	if err := c.ShouldBindJSON(&req); err != nil {
		c.JSON(400, Response{
			Code: 0,
			Msg:  err.Error(),
			Data: nil,
		})
		return
	}
	redisClient := database.GetRedisClient()
	go database.ConsumeMessagesWithAck(req.QueueName, func(body amqp.Delivery) error {
		var data int
		err := json.Unmarshal(body.Body, &data)
		if err != nil {
			logrus.Errorf("err:%v", err)
			return err
		}
		// 避免重复消费 可以使用全局唯一字段判断,或者将消费的数据id存入redis
		result, err := redisClient.Get(strconv.Itoa(data)).Result()
		if !errors.Is(err, redis.Nil) {
			logrus.Infof("消息已被消费,忽略 %s", strconv.Itoa(data))
			// _ = body.Reject(false)
			return nil
		}
		if len(result) > 0 {
			logrus.Infof("redis data: %s", result)
		}
		if err := updateArticleData(data); err != nil {
			logrus.Errorf("updateArticleData err:%v", err)
			return err
		}
		return nil
	}, func() {})

	c.JSON(200, Response{
		Code: 0,
		Msg:  "ok",
		Data: nil,
	})
	return
}

标签:实战,return,err,nil,RabbitMq,Error,Go,article,id
From: https://www.cnblogs.com/cnzt/p/17911802.html

相关文章

  • Go 中空结构体的用法,我帮你总结全了!
    Go中空结构体的用法,我帮你总结全了!原创 江湖十年 Go编程世界 2024年06月05日07:51 浙江 4人听过在Go语言中,空结构体 struct{} 是一个非常特殊的类型,它不包含任何字段并且不占用任何内存空间。虽然听起来似乎没什么用,但空结构体在Go编程中实际上有着广泛的应......
  • Java版Flink使用指南——定制RabbitMQ数据源的序列化器
    大纲新建工程新增依赖数据对象序列化器接入数据源测试修改Slot个数打包、提交、运行工程代码在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们从RabbitMQ队列中读取了字符串型数据。如果我们希望读取的数据被自动化转换为一个对象,则需要定制序......
  • 实战篇——XSS漏洞xss-labs-master靶场实战一
    实战篇——XSS漏洞xss-labs-master靶场实战(1)XSS的分类(1)反射型XSS攻击者通过电子邮件等方式将恶意链接发送给目标用户。当目标用户点击该链接时,服务器接收该目标用户的请求并把带有恶意脚本的页面发送给目标用户的浏览器,浏览器解析页面时就会执行恶意脚本。(2)存储型XSS......
  • SNN Algorithm
    SpikingNeuralNetworkAlgorithmSpikingNeuralNetworks(SNNs)areatypeofneuralnetworkthataimtomorecloselymimicthebehaviorofbiologicalneuronscomparedtotraditionalartificialneuralnetworks.ThekeydifferenceisthatSNNsusespike-ba......
  • YC311A [ 20240701 CQYC省选模拟赛 T1 ] 好串(good)
    题意给定一个长度为\(n\)的\(01\)串。定义一个串是好的当且仅当该串的所有前缀以及所有后缀的\(1\)的数量大于等于\(0\)的数量。你需要维护\(q\)个查询,每次求\(S_{l,...,r}\)的子串最少添加的\(1\)的个数使得该子串是好的。Sol首先不难发现一个正确的贪心,也......
  • vk-data-goods-sku-popup uniapp vue3示例
    效果图组件简介vk-data-goods-sku-popup是一个uniapp上面方便好用的sku组件,使用场景包括但不限于商品详情页、购物车页面、订单结算页、搜索结果页下面就上代码了,完整vue页面的代码如下<scriptsetup>import{ref,defineEmits,defineProps,computed}from'vue'//显示......
  • go 使用websocket
    packagechatimport( "encoding/json" "github.com/gorilla/websocket" "github.com/zeromicro/go-zero/core/logx" "log" "net/http" "sync")typeClientstruct{ conn*websocket.Conn......
  • google adsense verify失败
    现象Wecouldn'tverifyyoursite.MakesurethechangesyoumadetoyoursitearepublishedandaccessiblebytheGoogleAdSensecrawler.Ifyou’restillhavingissuestryanothermethod.背景笔者网站接入googleadsense时接入方式选择Ads.txt接入方式,新增Ad......
  • HarmonyOS NEXT开发实战:Navigation页面跳转对象传递案例
    介绍本示例主要介绍在使用Navigation实现页面跳转时,如何在跳转页面得到转入页面传的类对象的方法。实现过程中使用了第三方插件class-transformer,传递对象经过该插件的plainToClass方法转换后可以直接调用对象的方法,效果图预览使用说明从首页进入本页面时,会传递一个类对......
  • HarmonyOS NEXT开发实战:发布图片评论案例
    介绍本示例将通过发布图片评论场景,介绍如何使用startAbilityForResult接口拉起相机拍照,并获取相机返回的数据。效果图预览使用说明通过startAbilityForResult接口拉起相机,拍照后获取图片地址。实现思路创建CommentData类,实现IDataSource接口的对象,用于评论列表使用Lazy......