首页 > 其他分享 >Go使用asynq

Go使用asynq

时间:2023-12-02 15:12:05浏览次数:19  
标签:task err fmt asynq time func 使用 Go

asynq是基于reids的队列,支持多种形式

消费者

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/hibiken/asynq"
)

// HandleMsg 处理msg
func HandleMsg(ctx context.Context, t *asynq.Task) error {
	//fmt.Println("------HandleMsg start------")

	log.Printf("type: %v, payload: %s", t.Type(), string(t.Payload()))

	return nil
}

func main() {
	// asynq server
	srv := asynq.NewServer(
		asynq.RedisClientOpt{
			Addr: "192.168.252.128:6379",
			DB:   0,
		},
		asynq.Config{Concurrency: 20},
	)

	mux := asynq.NewServeMux()

	// some middlewares
	mux.Use(func(next asynq.Handler) asynq.Handler {
		return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
			// just record a log
			fmt.Println(fmt.Printf("[%s] log - %v", time.Now().Format("2006-01-02 15:04:05"), string(t.Payload())))

			return next.ProcessTask(ctx, t)
		})
	})

	// some workers
	mux.HandleFunc("msg", HandleMsg)

	// start server
	if err := srv.Start(mux); err != nil {
		log.Fatalf("could not start server: %v", err)
	}

	// Wait for termination signal.
	c := make(chan os.Signal, 1)

	signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

	for {
		s := <-c
		switch s {
		case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT:
			fmt.Println("Program Exit...", s)
			srv.Shutdown()
			srv.Stop()
			return
		default:
			fmt.Println("other signal", s)
		}
	}
}

测试生产者

package main

import (
	"encoding/json"
	"fmt"
	"os"
	"testing"
	"time"

	"github.com/hibiken/asynq"
)

var c *asynq.Client

func TestMain(m *testing.M) {
	r := asynq.RedisClientOpt{
		Addr: "192.168.252.128:6379",
		DB:   0,
	}
	c = asynq.NewClient(r)
	ret := m.Run()
	c.Close()
	os.Exit(ret)
}

// 即时消费
func Test_Enqueue(t *testing.T) {
	payload := map[string]interface{}{"user_id": 1, "message": "i'm immediately message"}
	bytes, err := json.Marshal(payload)
	if err != nil {
		fmt.Println("转换失败:", err)
		return
	}
	task := asynq.NewTask("msg", bytes)
	res, err := c.Enqueue(task)
	if err != nil {
		t.Errorf("could not enqueue task: %v", err)
		t.FailNow()
	}
	fmt.Printf("Enqueued Result: %+v\n", res)
}

// 延时消费
func Test_EnqueueDelay(t *testing.T) {
	payload := map[string]interface{}{"user_id": 1, "message": "i'm delay 5 seconds message"}
	bytes, err := json.Marshal(payload)
	if err != nil {
		fmt.Println("转换失败:", err)
		return
	}
	task := asynq.NewTask("msg", bytes)
	res, err := c.Enqueue(task, asynq.ProcessIn(5*time.Second))
	// res, err := c.Enqueue(task, asynq.ProcessAt(time.Now().Add(5*time.Second)))
	if err != nil {
		t.Errorf("could not enqueue task: %v", err)
		t.FailNow()
	}
	fmt.Printf("Enqueued Result: %+v\n", res)
}

// 超时、重试、过期
func Test_EnqueueOther(t *testing.T) {
	payload := map[string]interface{}{"user_id": 1, "message": "i'm delay,try,timeout 5 seconds message"}
	bytes, err := json.Marshal(payload)
	if err != nil {
		fmt.Println("转换失败:", err)
		return
	}
	task := asynq.NewTask("msg", bytes)
	// 10秒超时,最多重试3次,20秒后过期
	res, err := c.Enqueue(task, asynq.MaxRetry(3), asynq.Timeout(10*time.Second), asynq.Deadline(time.Now().Add(20*time.Second)))
	if err != nil {
		t.Errorf("could not enqueue task: %v", err)
		t.FailNow()
	}
	fmt.Printf("Enqueued Result: %+v\n", res)
}

标签:task,err,fmt,asynq,time,func,使用,Go
From: https://www.cnblogs.com/qcy-blog/p/17871622.html

相关文章

  • vue3使用富文本编辑器wangEditor 5,增加自定义下拉框,并动态改变下拉框内容
    官方资料wangEditor官网效果展示准备工作这里按照wangEditor官网提供的Vue3Demo操作就行,下面的内容可以直接跳过安装yarnadd@wangeditor/editor#或者npminstall@wangeditor/editor--saveyarnadd@wangeditor/editor-for-vue@next#或者npminstall@w......
  • 在OI类竞赛中经常使用的C++STL模板类
    vector变长数组vector的初始化vector<int>a;//定义一个空的vector,且元素类型为intvector<int>a(n);//定义一个长度为n,元素类型为int的vector,且每个元素都是0vector<int>a(n,x);//定义一个长度为n,元素类型为int,且每个元素都是x的vectorvector<int>b(a);//定义一......
  • Java 8 仍被广泛使用,占比 50%
    调查中,更多的开发人员选择在生产中使用Java17,而不是Java11。Docker逐渐成为打包Web应用程序的首选,且Spring和SpringBoot的使用率遥遥领先。具体而言,开发者最常使用的 Java版本是 Java8,占比高达 50%;其次分别是 Java17(45%)、Java11(38%)以及 Java20(11%)......
  • Mongoose介绍
    官网Mongoose.js中文网(mongoosejs.net)基本使用安装最新的是mongoose8.0.0版本,基于Promise,以前的版本是基于回调函数。npmnpmimongooseyarnyarnaddmongoose使用以mongoose8.0.0举例://1-引入mongooseconstmongoose=require("mongoose");//2-连接......
  • Java流Stream使用详解(中)
    一、Stream流的中间方法名称说明Stream<T> filter(Predicate<?superT> predicate)过滤Stream<T> limit(longmaxSize)获取前几个元素Stream<T> skip(longn)跳过前几个元素Stream<T> distinct()元素去重,依赖(hashCode和equals方法)static<T> Stream<T> concat(Stream......
  • 使用Navicat For MSSQL连接绿色版SQLServer2008R2问题解决
    问题1、创建连接时出现错误:[IM002][Microsoft][ODBC驱动程序管理器]未发现数据源名称并且未指定默认驱动程序(0)Navicat来连接SQLserver,这里确实有点麻烦,出现错误[IM002][Microsoft][ODBC驱动程序管理器]未发现数据源名称并且未指定默认驱动程序(0),解决方法:进入Navicat的安装......
  • 使用极限网关助力 ES 集群无缝升级、迁移上/下云
    在工作中大家可能会遇到以下这些场景:自建ES集群需要平滑迁移到XX云;从XX云将ES集群迁移到自建机房;ES集群进行跨版本升级,同时保留回退能力;这些场景往往都还有个共同的需求:迁移过程要保证业务的最小停机时间。幸运的是,在这三个场景中,我们都能使用极限网关来帮助我们......
  • Linux 下使用命令将图片反色
    #单张图片反色convert-negateimage.pngimage_ne.png#单张图片反色(替换)convert-negateimage.pngimage.png#单张图片反色,修复格式不兼容convertimage.pngimage.png&&convert-negateimage.pngimage_ne.png#单张图片反色,修复格式不兼容(替换)convertimage.p......
  • 拓数派受邀参加由Google举办的“深度探索 LLM / Generative AI的生态与应用”主题活动
    大语言模型(LLM)可谓是当下国内科创界最热门的话题。近日,拓数派创始人兼CEO冯雷(RayVon)受邀参加由Google举办的“深度探索LLM/GenerativeAI的生态与应用”主题活动,与现场嘉宾共话科技行业发展新趋势。图为:活动现场照片在圆桌讨论环节中,冯雷与主持人及几位创业公司高管,进行了一场......
  • HarmonyOS之ArkTS-常用基本数据类型及使用
    ArtTS基本数据类型:包括number、string、boolean、array、枚举类型、unknown等number:数字类型,在程序中定义一个变量指定类型一定要小写number      看了截图大家肯定有点疑惑为什么变量后面要加一个;number这就是TS的缘故,这样是为了防止后面发生变异(可被用来放......