首页 > 其他分享 >golang之异步队列Asynq

golang之异步队列Asynq

时间:2023-10-12 13:12:13浏览次数:35  
标签:异步 asynq nil err Asynq golang task go log

Asynq[1]是一个Go实现的分布式任务队列和异步处理库,基于redis,类似Ruby的sidekiq[2]和Python的celery[3]。Go生态类似的还有machinery[4]和goworker

图片

同时提供一个WebUI asynqmon[5],可以源码形式安装或使用Docker image, 还可以和Prometheus集成

docker run --rm --name asynqmon -p 8080:8080 hibiken/asynqmon,如果使用的是主机上的redis,还需加上 --redis-addr=host.docker.internal:6379,否则会报错[6]

即 docker run --rm --name asynqmon -p 8080:8080 hibiken/asynqmon --redis-addr=host.docker.internal:6379


➜  asynq-demo git:(main) ✗ tree
.
├── client.go
├── const.go
├── go.mod
├── go.sum
└── server.go

0 directories, 5 files

其中const.go:

package main

const (
 redisAddr   = "127.0.0.1:6379"
 redisPasswd = ""
)

const (
 TypeExampleTask    = "shuang:asynq-task:example"
)

client.go:


package main

import (
 "encoding/json"
 "fmt"
 "log"
 "time"

 "github.com/hibiken/asynq"
)

type ExampleTaskPayload struct {
 UserID string
 Msg    string
 // 业务需要的其他字段
}

func NewExampleTask(userID string, msg string) (*asynq.Task, error) {
 payload, err := json.Marshal(ExampleTaskPayload{UserID: userID, Msg: msg})
 if err != nil {
  return nil, err
 }
 return asynq.NewTask(TypeExampleTask, payload), nil
}

var client *asynq.Client

func main() {

 client = asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr, Password: redisPasswd, DB: 0})
 defer client.Close()

 //go startExampleTask()
 startExampleTask()

 //startGithubUpdate() // 定时触发
}

func startExampleTask() {

 fmt.Println("开始执行一次性的任务")
 // 立刻执行
 task1, err := NewExampleTask("10001", "mashangzhixing!")
 if err != nil {
  log.Fatalf("could not create task: %v", err)
 }

 info, err := client.Enqueue(task1)
 if err != nil {
  log.Fatalf("could not enqueue task: %v", err)
 }
 log.Printf("task1 -> enqueued task: id=%s queue=%s", info.ID, info.Queue)

 // 10秒后执行(定时执行)
 task2, err := NewExampleTask("10002", "10s houzhixing")
 if err != nil {
  log.Fatalf("could not create task: %v", err)
 }

 info, err = client.Enqueue(task2, asynq.ProcessIn(10*time.Second))
 if err != nil {
  log.Fatalf("could not enqueue task: %v", err)
 }
 log.Printf("task2 -> enqueued task: id=%s queue=%s", info.ID, info.Queue)

 // 30s后执行(定时执行)
 task3, err := NewExampleTask("10003", "30s houzhixing")
 if err != nil {
  log.Fatalf("could not create task: %v", err)
 }

 theTime := time.Now().Add(30 * time.Second)
 info, err = client.Enqueue(task3, asynq.ProcessAt(theTime))
 if err != nil {
  log.Fatalf("could not enqueue task: %v", err)
 }
 log.Printf("task3 -> enqueued task: id=%s queue=%s", info.ID, info.Queue)
}

server.go:

package main

import (
 "context"
 "encoding/json"
 "fmt"
 "time"

 "github.com/davecgh/go-spew/spew"
 "github.com/hibiken/asynq"
)

var AsynqServer *asynq.Server // 异步任务server

func initTaskServer() error {
 // 初始化异步任务服务端
 AsynqServer = asynq.NewServer(
  asynq.RedisClientOpt{
   Addr:     redisAddr,
   Password: redisPasswd, //与client对应
   DB:       0,
  },
  asynq.Config{
   // Specify how many concurrent workers to use
   Concurrency: 100,
   // Optionally specify multiple queues with different priority.
   Queues: map[string]int{
    "critical": 6,
    "default":  3,
    "low":      1,
   },
   // See the godoc for other configuration options
  },
 )
 return nil
}

func main() {
 initTaskServer()
 mux := asynq.NewServeMux()

 mux.HandleFunc(TypeExampleTask, HandleExampleTask)
 // ...register other handlers...

 if err := AsynqServer.Run(mux); err != nil {
  fmt.Printf("could not run asynq server: %v", err)
 }
}

func HandleExampleTask(ctx context.Context, t *asynq.Task) error {

 res := make(map[string]string)

 spew.Dump("t.Payload() is:", t.Payload())
 err := json.Unmarshal(t.Payload(), &res)
 if err != nil {
  fmt.Printf("rum session, can not parse payload: %s,  err: %v", t.Payload(), err)
  return nil
 }
 //-----------具体处理逻辑------------
 spew.Println("拿到的入参为:", res, "接下来将进行具体处理")
 fmt.Println()
 // 模拟具体的处理
 time.Sleep(5 * time.Second)
 fmt.Println("--------------处理了5s,处理完成-----------------")

 return nil

}

执行redis-server


清除redis中所有的key:


执行docker run --rm --name asynqmon -p 8080:8080 hibiken/asynqmon --redis-addr=host.docker.internal:6379

图片

执行 go run client.go const.go (生产者,产生消息放入队列)

图片

此时能看到redis中多个几个key

图片

同时管理后台能看到队列的信息

图片

执行 go run server.go const.go (消费者,消费队列中的消息)

图片

可以看到都被处理了

图片

此时redis中的key:

图片

此处的业务处理为模拟,实际可能是某个被触发后不需要马上执行的操作

 

相关文档:

 

 

 

 

标签:异步,asynq,nil,err,Asynq,golang,task,go,log
From: https://www.cnblogs.com/xingxia/p/golang_asynq.html

相关文章

  • C++异步定时器设计与实现
    C++异步定时器设计与实现由于目前C++标准中没有现成的定时器,本设计使用C++11相关语法并进行封装。本定时器包含一个TimerManager类用于创建定时器和进行定时任务管理,TimerManager会创建两个线程(mTimerTickThread、mTimerCallbackThread)分别用于时间处理和函数回调。可以使用Ti......
  • golang 反斜杠替换
    难点主要是golang和Java类似双引号定义字符串pythonphp单双引号通吃只是上代码packagemainimport( "fmt" "strings")funcmain(){ str:="+++\\+++" replacedStr:=strings.Replace(str,"\\","",-1) fmt.Println(......
  • springboot启动后异步启动一个程序
    如果你想在SpringBoot启动后异步方式启动一个方法,你可以使用SpringFramework的异步支持和 @Async 注解来实现。以下是如何在SpringBoot应用程序中异步方式启动一个方法的步骤:配置异步支持: 首先,在应用程序的主类上添加 @EnableAsync 注解,以启用异步支持importor......
  • IT技术栈:Golang面试攻略详细总结,有的坑,原来真的可以躲过去
    IT技术栈:Golang面试攻略详细总结,有的坑,原来真的可以躲过去首发2023-10-1017:38·大侠技术栈make与new的异同  相同点:都是用来给变量分配内存的不同点:new一般给值类型的变量,例如:string、int、arr分配内存,make给slice、channel、map等引用类型的变量分配内存返回值......
  • AbortController创建一个可中断的异步任务执行函数---【已解决】
    1、需求背景使用异步操作(promise)或者多个循环时,遇到不能及时中断操作,回收资源时2、代码/***创建一个可中断的异步任务执行函数。*@param{function}taskFunction-要执行的异步任务函数,接受一个AbortSignal参数用于中断。*@returns{object}包含执行任务和中断......
  • golang之gRPC
    相关链接:grpc:https://grpc.io/docs/languages/go/quickstart/ protobuf:https://protobuf.dev/programming-guides/proto3/ protobuf语法: 示例:syntax="proto3";//声明请求参数messageSearchRequest{stringquery=1;int32page=2;int32pag......
  • 多线程使用场景三-异步调用
         ......
  • Golang chan 的实现原理
    Golangchan的实现原理Go语言中的chan(通道)是一种用于在不同的goroutines之间进行通信和同步的重要机制。chan的实现原理涉及到Go语言的运行时系统和底层的数据结构。以下是chan的主要实现原理:底层数据结构:chan的底层数据结构是一个用于存储数据的环形队列(circularqueue)或链......
  • golang map/sync.map 实现
    mapGo中的map是一种高效的散列表(hashtable)实现,它的底层实现细节包括以下重要方面:哈希表(HashTable):map的底层数据结构是一个哈希表。哈希表是一个数组,每个元素都是一个哈希桶,用于存储键值对。哈希函数(HashFunction):Go使用哈希函数将键映射到哈希桶。这个哈希函数是内......
  • 解决 golang 中 grep console 插件不生效问题
    日志多了以后不好找,idea中的神奇grepconsole在goland竟然不好使了,一番查找下,找到了一个解决方案cmd+shift+a找到Registry找到go.run.processes.with.pty,改为false大功告成原贴:https://github.com/krasa/GrepConsole/issues/175......