首页 > 其他分享 >延迟消息队列beanstalkd

延迟消息队列beanstalkd

时间:2022-08-31 19:24:55浏览次数:88  
标签:队列 tube ready job 2207011 beanstalkd root 延迟

beanstalkd是简单、快速的工作队列!

Beanstalkd安装

安装它的v1.12版本

[root@2207011 resource]#  wget https://github.com/beanstalkd/beanstalkd/archive/refs/tags/v1.12.tar.gz
[root@2207011 resource]# tar -zxvf v1.12.tar.gz
[root@2207011 resource]# cd beanstalkd-1.12
[root@2207011 beanstalkd-1.12]# make install PREFIX=/usr/local/beanstalkd-1.1
[root@2207011 beanstalkd-1.12]# ln -s /usr/local/beanstalkd-1.12/ /usr/local/beanstalkd
[root@2207011 beanstalkd-1.12]# cd /usr/local/beanstalkd
[root@2207011 beanstalkd]# ./bin/beanstalkd -h
[root@2207011 beanstalkd]# ./bin/beanstalkd & # 后台运行服务
[root@2207011 beanstalkd]# ps -ef | grep beanstalkd | grep -v grep
root      4367  4144  0 10:25 pts/0    00:00:00 ./bin/beanstalkd

最终看到了beanstalkd进程,安装完成。

 

Beanstalkd工作原理

我们可以在beanstalkd工作队列服务上创建各种工作job,然后给相应的消费者去预定job处理。

job 的生命周期

Client 使用 put 命令创建一个工作任务 job. 在整个生命周期中 job 可能有四个工作状态:ready、reserved、delayed、buried. 在 put 操作之后,一个 job 的典型状态是 ready,在 ready 队列中,它将等待一个 worker 取出此 job 并设置为其为 reserved 状态. worker占有此 job 并执行,当 job 执行完毕,worker 可以发送一个 delete 指令删除此 job.

Status Description
ready 等待被取出并处理
reserved 如果 job 被 worker 取出,将被此 worker 预订,worker 将执行此 job
delayed 等待特定时间之后,状态再迁移为 ready 状态
buried 等待唤醒,通常在 job 处理失败时进入该状态

job 典型的生命周期

   put            reserve               delete
  -----> [READY] ---------> [RESERVED] --------> *poof*

job 可能的状态迁移

   put with delay               release with delay
  ----------------> [DELAYED] <------------.
                        |                   |
                 kick   | (time passes)     |
                        |                   |
   put                  v     reserve       |       delete
  -----------------> [READY] ---------> [RESERVED] --------> *poof*
                       ^  ^                |  |
                       |   \  release      |  |
                       |    `-------------'   |
                       |                      |
                       | kick                 |
                       |                      |
                       |       bury           |
                    [BURIED] <---------------'
                       |
                       |  delete
                        `--------> *poof*

Tubes

一个 beanstalkd 实例服务可能有一个或者多个 tube,用来储存统一类型的 job.每个 tube 由一个就绪 (ready) 队列与延迟 (delayed) 队列组成.每个 job 所有的状态迁移在一个 tube 中完成.通过发送 watch 指令, 消费者 consumers 可以监控感兴趣的 tube.通过发送 ignore 指令, 消费者 consumers 可以取消监控 tube.通过 list-tubes-watched命令返回所有监控的 tubes,当客户端预订 (reserved) 一个 job,此 job 可能来自任何一个它监控的 tube.

当一个客户端连接上服务器时,客户端监控的 tube 默认为 defaut,如果客户端提交 job 时,没有使用 use 命令,那么这些 job 就存于名为default的 tube 中.

tube 按需求创建,无论他们在什么时候被引用到.如果一个 tube 变为空(即 no ready jobs,no delayed jobs,no buried jobs)和没有任何客户端引用(being watched),它将会被自动删除.

 

应用示例:

使用pheanstalk4.0.4的php客户端测试发布延迟工作

$beanstalkd = Pheanstalk::create('beanstalkd服务地址');
// 在success管道上创建一个延迟6s的工作
$beanstalkd->useTube('success')->put("content: cooking dinner, create time: " . date('Y-m-d H:i:s'), 1024, 6, 3);
// 消费该工作
$beanstalkd->watch('success');
while (true) {
    echo sprintf("time: %s\n", date('Y-m-d H:i:s'));
    try {
        // 每秒预定一次工作(实际这里是堵塞1s 根据实际情况设置)
        $job = $beanstalkd->reserveWithTimeout(1);
        if ($job) {
            echo sprintf("time: %s, do job\n%s\n", date('Y-m-d H:i:s'), $job->getData());
            // ...处理工作
            // 处理完工作删除它
            $beanstalkd->useTube('success')->delete($job);
        }
    } catch (\Throwable $e) {
        echo sprintf("demo exception: %s\n" . $e->getMessage());
    }
}

 

标签:队列,tube,ready,job,2207011,beanstalkd,root,延迟
From: https://www.cnblogs.com/xiaoxiaobug/p/16555436.html

相关文章

  • RabbitMQ 入门系列:9、扩展内容:死信队列:真不适合当延时队列。
    系列目录RabbitMQ入门系列:1、MQ的应用场景的选择与RabbitMQ安装。RabbitMQ入门系列:2、基础含义:链接、通道、队列、交换机。RabbitMQ入门系列:3、基础含义:持久化、......
  • MyBatis-分步查询的优势(延迟加载)
    分步查询的优点:可以实现延迟加载  但是必须在核心配置文件中设置全局配置信息(mybatis-config.xml):lazyLoadingEnabled:延迟加载的全局开关。当开启时,所有关联对象都会延......
  • Docker-Compose 安装 RabbitMQ 并安装延迟队列插件 rabbitmq-delayed-message-exchang
    1、Docker-Compose 安装RabbitMQdocker-compose.ymlversion:'3'services:rabbitmq:restart:alwaysimage:rabbitmq:3.8.5-managementcontainer_......
  • 树的难题 BJOI2017 点分治 单调队列
    P3714[BJOI2017]树的难题没时间码先口胡。明显有一个n^2的暴力。可以拿到20分。链的情况也非常容易一个简单的单调队列就可以解决当然可以暴力的采用线段树。这样......
  • redis zset 延迟合并任务处理
    rediszset延迟合并任务处理@AutowiredpublicRedisTemplateredisTemplate;##1.发送端:在接口中收集任务ID,累计时间段之后,合并处理。##rediszset主键,任......
  • 队列
    什么是队列(Queue)?队列(queue)是一种采用先进先出(FIFO,firstinfirstout)策略的抽象数据结构。比如生活中排队,总是按照先来的先服务,后来的后服务。队列在数据结构中举足轻重,其......
  • 厚积薄发--一文带您了解阿里云 RocketMQ 轻量版消息队列(MNS)
    作者:周新宇&陈涛&李凯阿里云RocketMQ轻量版(MNS)消息队列是一个轻量、可靠、可扩展且完全托管的分布式消息队列服务。MNS能够帮助应用开发者在他们应用的分布式组件上更......
  • 如何解决消息队列的延时以及过期失效问题?
    如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?面试官心理分析你看这问法,其实本质针对的场景,都是说,......
  • 共享栈和双端队列
    一、算法设计思想1.ABCD顺序入栈,任意时刻出栈,共多少种排列(Catalan数:(1/n+1)·C2nn)       一定不存在这种情况:i<j<k,Str[i]>Str[k]>Str[j]。只需要在全排列的基......
  • 如何保证消息队列的高可用?
    如何保证消息队列的高可用?面试官心理分析如果有人问到你MQ的知识,高可用是必问的。上一讲提到,MQ会导致系统可用性降低。所以只要你用了MQ,接下来问的一些要点肯......