首页 > 数据库 >Apscheduler 使用Redis集群做为任务存储

Apscheduler 使用Redis集群做为任务存储

时间:2024-02-28 09:46:26浏览次数:22  
标签:jobs redis self Redis Apscheduler job 集群 key run

背景

由于原生的apscheduler仅支持redis单节点连接,不支持redis集群,所以本人基于原生的RedisJobStore自己修改了一个专门用于连接redis集群的类RedisClusterJobStore
修改点有以下内容:

  1. 修改类名RedisJobStoreRedisClusterJobStore
  2. 将原始导入的redis替换为rediscluster
  3. 将原始代码中的pipe.multi()注释掉,此处注释掉是否会有其他的影响暂未测试

文件

# encoding:utf8
from __future__ import absolute_import
from datetime import datetime

from pytz import utc
import six

from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError
from apscheduler.util import datetime_to_utc_timestamp, utc_timestamp_to_datetime
from apscheduler.job import Job

try:
    import cPickle as pickle
except ImportError:  # pragma: nocover
    import pickle

try:
    # from redis import Redis
    from rediscluster import RedisCluster # 将原本的redis库替换为rediscluster
except ImportError:  # pragma: nocover
    raise ImportError('RedisJobStore requires redis installed')


class RedisClusterJobStore(BaseJobStore):
    """
    Stores jobs in a Redis database. Any leftover keyword arguments are directly passed to redis's
    :class:`~redis.StrictRedis`.

    Plugin alias: ``redis``

    :param int db: the database number to store jobs in
    :param str jobs_key: key to store jobs in
    :param str run_times_key: key to store the jobs' run times in
    :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the
        highest available
    """

    def __init__(self, db=0, jobs_key='apscheduler.jobs', run_times_key='apscheduler.run_times',
                 pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args):
        super(RedisClusterJobStore, self).__init__()

        if db is None:
            raise ValueError('The "db" parameter must not be empty')
        if not jobs_key:
            raise ValueError('The "jobs_key" parameter must not be empty')
        if not run_times_key:
            raise ValueError('The "run_times_key" parameter must not be empty')

        self.pickle_protocol = pickle_protocol
        self.jobs_key = jobs_key
        self.run_times_key = run_times_key
        # self.redis = Redis(db=int(db), **connect_args)
        self.redis = RedisCluster(**connect_args)

    def lookup_job(self, job_id):
        job_state = self.redis.hget(self.jobs_key, job_id)
        return self._reconstitute_job(job_state) if job_state else None

    def get_due_jobs(self, now):
        timestamp = datetime_to_utc_timestamp(now)
        job_ids = self.redis.zrangebyscore(self.run_times_key, 0, timestamp)
        if job_ids:
            job_states = self.redis.hmget(self.jobs_key, *job_ids)
            return self._reconstitute_jobs(six.moves.zip(job_ids, job_states))
        return []

    def get_next_run_time(self):
        next_run_time = self.redis.zrange(self.run_times_key, 0, 0, withscores=True)
        if next_run_time:
            return utc_timestamp_to_datetime(next_run_time[0][1])

    def get_all_jobs(self):
        job_states = self.redis.hgetall(self.jobs_key)
        jobs = self._reconstitute_jobs(six.iteritems(job_states))
        paused_sort_key = datetime(9999, 12, 31, tzinfo=utc)
        return sorted(jobs, key=lambda job: job.next_run_time or paused_sort_key)

    def add_job(self, job):
        if self.redis.hexists(self.jobs_key, job.id):
            raise ConflictingIdError(job.id)

        with self.redis.pipeline() as pipe:
            # pipe.multi()
            pipe.hset(self.jobs_key, job.id, pickle.dumps(job.__getstate__(),
                                                          self.pickle_protocol))
            if job.next_run_time:
                pipe.zadd(self.run_times_key,
                          {job.id: datetime_to_utc_timestamp(job.next_run_time)})

            pipe.execute()

    def update_job(self, job):
        if not self.redis.hexists(self.jobs_key, job.id):
            raise JobLookupError(job.id)

        with self.redis.pipeline() as pipe:
            pipe.hset(self.jobs_key, job.id, pickle.dumps(job.__getstate__(),
                                                          self.pickle_protocol))
            if job.next_run_time:
                pipe.zadd(self.run_times_key,
                          {job.id: datetime_to_utc_timestamp(job.next_run_time)})
            else:
                pipe.zrem(self.run_times_key, job.id)

            pipe.execute()

    def remove_job(self, job_id):
        if not self.redis.hexists(self.jobs_key, job_id):
            raise JobLookupError(job_id)

        with self.redis.pipeline() as pipe:
            pipe.hdel(self.jobs_key, job_id)
            pipe.zrem(self.run_times_key, job_id)
            pipe.execute()

    def remove_all_jobs(self):
        with self.redis.pipeline() as pipe:
            pipe.delete(self.jobs_key)
            pipe.delete(self.run_times_key)
            pipe.execute()

    def shutdown(self):
        self.redis.connection_pool.disconnect()

    def _reconstitute_job(self, job_state):
        job_state = pickle.loads(job_state)
        job = Job.__new__(Job)
        job.__setstate__(job_state)
        job._scheduler = self._scheduler
        job._jobstore_alias = self._alias
        return job

    def _reconstitute_jobs(self, job_states):
        jobs = []
        failed_job_ids = []
        for job_id, job_state in job_states:
            try:
                jobs.append(self._reconstitute_job(job_state))
            except BaseException:
                self._logger.exception('Unable to restore job "%s" -- removing it', job_id)
                failed_job_ids.append(job_id)

        # Remove all the jobs we failed to restore
        if failed_job_ids:
            with self.redis.pipeline() as pipe:
                pipe.hdel(self.jobs_key, *failed_job_ids)
                pipe.zrem(self.run_times_key, *failed_job_ids)
                pipe.execute()

        return jobs

    def __repr__(self):
        return '<%s>' % self.__class__.__name__

如何使用

from aaa import RedisClusterJobStore

# 配置 Redis集群 作为 JobStore
jobstore = {
    'default': RedisClusterJobStore(startup_nodes=[{"host": "127.0.0.1", "port": 7000}],passoword="test1234")
}
executors={
	'default': ThreadPoolExecutor()
}

scheduler = BackgroundScheduler(jobstores=jobstore, executors=executors)
# 定义一个任务
def job_function():
    print("Hello World!")

# 添加任务到调度器
scheduler.add_job(job_function, 'interval', seconds=5)
scheduler.start()

标签:jobs,redis,self,Redis,Apscheduler,job,集群,key,run
From: https://www.cnblogs.com/jruing/p/18039022

相关文章

  • 小白的学习记录——Redis的简单使用
    Redis是什么?不同于MySql,MySql是基于二维表存储数据,而Redis是一个基于内存的key-value键值对结构数据库我们为什么要用Redis?基于内存存储,读写性很高适合存储热点数据,(热点商品,资讯,新闻)企业应用广泛Redis是对MySql的补充,与其共存,大部分数据放在MySql中。Redis官网:https:/re......
  • Redis加Lua脚本实现分布式锁
    先讲一下为什么使用分布式锁:在传统的单体应用中,我们可以使用Java并发处理相关的API(如ReentrantLock或synchronized)来实现对共享资源的互斥控制,确保在高并发情况下同一时间只有一个线程能够执行特定方法。然而,随着业务的发展,单体应用逐渐演化为分布式系统,多线程、多进程分布在不同......
  • redis自学(6)SkipList
    SkipListSkipList(跳表)首先是链表,但与传统链表相比有几点差异:  元素按照升序排列存储  节点可能包含多个指针,指针跨度不同(最多允许32级指针,跨度成倍数递增)    SkipList的特点:  跳跃表是一个双向链表,每个节点都包含score和ele值  节点按照score值排序,sc......
  • Redis集群在线迁移
    一、redis集群迁移的几种方式离线迁移:通过rdb或者aof文件的方式,实现离线迁移缺点:版本限制,不同版本启动时,可能会出现覆盖数据文件的情况(也可能是集群模式的限制);需要停机,会造成数据不一致问题。使用shell脚本,单库对单库,性能极低,生产环境不能使用。主从同步:成为从节点slaveofip......
  • Redis部署-主从复制
    原理分master和slave;master以写为主,slave只支持读,master的数据发生变化时自动同步到slave。作用读写分离:主机提供写,从机提供读,可提高性能,分散负载压力容灾恢复:主机与从机数据一致,其中一台机器宕机后,另一台机器能正常使用数据备份:主机与从机分散在不同的机器,数据一致,起到了数......
  • 【进阶篇】使用 Redis 实现分布式缓存的全过程思考(一)
    目录前言一、关于缓存二、基本数据结构三、缓存注解3.1自定义注解3.2定义切点(拦截器)3.3AOP实现3.4使用示例四、数据一致性4.1缓存更新策略4.2缓存读写过程五、高可用5.1缓存穿透5.2缓存击穿5.3缓存雪崩5.4Redis集群六、文章小结前言写在前面,让我们从3个问题开始今天的文章:......
  • 项目开发中 Redis 缓存和数据库一致性问题及解决方案
    引入Redis缓存提高性能如果公司的项目业务处于起步阶段,流量非常小,那无论是读请求还是写请求,直接操作数据库即可,这时架构模型是这样的:但随着业务量的增长,你的项目业务请求量越来越大,这时如果每次都从数据库中读数据,那肯定会有性能问题。这个阶段通常的做法是,引入缓存来提高读性......
  • Rancher 无法删除集群的Solution
    Rancher无法删除集群的Solution不同版本的Rancher都能遇到该问题,此问题中,Rancher版本为v2.6.0当我们先删除节点,并在节点宿主机上删除了对应的服务器,再通过Rancher界面去删除托管/自建立集群时,往往这个操作会卡住,并出现报错:{"type":"error","links":{},"code":"PermissionDe......
  • 使用python批量删除redis key
     比如我的业务。刚上线默认为超级管理员新增权限--请导出id用于清缓存svc格式请注意分页需要导出全部selectCONCAT('@rbac/ent/aclgr/',e.id)as需要清理缓存的rediskeyfroment_rbac_groupewherenotexists(selectp.`groupid`froment_rbac_group_permissionp......
  • Redis扩展功能
    Redis事务一次操作执行多条命令,本质是一组命令的集合。一个事务中的所有命令都会序列化,按顺序地串行化执行而不会被其它命令插入、不许加塞。由于redis只能在执行前检查一组命令的语法错误,在命令执行时出现异常没法全体回滚,所以是弱一致性。multi+exec组合正常执行执行前队......