已到时间的消息转移到 ready
err := q.pending2Ready()
if err != nil {
return err
}
// 循环调用 ready2Unack 拉取消息进行消费
ids := make([]string, 0, q.fetchLimit)
for {
idStr, err := q.ready2Unack()
if err == redis.Nil { // consumed all
break
}
if err != nil {
return err
}
ids = append(ids, idStr)
if q.fetchLimit > 0 && len(ids) >= int(q.fetchLimit) {
break
}
}
if len(ids) > 0 {
q.batchCallback(ids)
}
// 将 nack 或超时的消息放入重试队列
err = q.unack2Retry()
if err != nil {
return err
}
// 清理已达到最大重试次数的消息
err = q.garbageCollect()
if err != nil {
return err
}
// 消费重试队列
ids = make([]string, 0, q.fetchLimit)
for {
idStr, err := q.retry2Unack()
if err == redis.Nil { // consumed all
break
}
if err != nil {
return err
}
ids = append(ids, idStr)
if q.fetchLimit > 0 && len(ids) >= int(q.fetchLimit) {
break
}
}
if len(ids) > 0 {
q.batchCallback(ids)
}
return nil
}
至此一个简单可靠的延时队列就做好了,现在就开始试用吧
标签:return,nil,err,队列,ids,可靠,fetchLimit,idStr,延迟 From: https://www.cnblogs.com/Leo_wl/p/16783264.html