首页 > 其他分享 >11celery介绍架构和安装,celery执行异步任务,包结构celery,celery执行延迟任务和定时任务,django中使用celery,接口缓存

11celery介绍架构和安装,celery执行异步任务,包结构celery,celery执行延迟任务和定时任务,django中使用celery,接口缓存

时间:2023-06-29 19:22:05浏览次数:53  
标签:task app django celery 任务 print import

1 celery介绍架构和安装

# celery :分布式的异步任务框架,主要用来做:
	- 异步任务
    - 延迟任务
    - 定时任务---》如果只想做定时任务,可以不使用celery,有别的选择
    
# celery 框架,原理
1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

人是一个独立运行的服务 | 医院也是一个独立运行的服务
	正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
	人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
    
    
# celery架构
消息中间件(broker):消息队列:可以使用redis,rabbitmq,咱们使用redis
任务执行单元(worker):真正的执行 提交的任务
任务执行结果存储(banckend):可以使用mysql,redis,咱们使用redis

    
# 安装celery
	-pip install Celery
    -释放出可执行文件:celery,由于 python解释器的script文件夹再环境变量,任意路径下执行celery都能找到
    
    
    
# celery不支持win,所以想再win上运行,需要额外安装eventlet
windows系统需要eventlet支持:pip3 install eventlet
Linux与MacOS直接执行:
	3.x,4.x版本:celery worker -A demo -l info
    5.x版本:     celery -A demo worker -l info -P eventlet

image

2 celery执行异步任务

# 基本使用()
	1 再虚拟环境中装celery和eventlet
    2 写个py文件,实例化得到app对象,注册任务
        from celery import Celery
        import time
        broker = 'redis://127.0.0.1:6379/1'  # 消息中间件 redis
        backend = 'redis://127.0.0.1:6379/2'  # 结果存储 redis
        app = Celery(__name__, broker=broker, backend=backend)
        @app.task # 变成celery的任务了
        def add(a, b):
            print('运算结果是',a + b)
            time.sleep(1)
            return a + b
    3 启动worker(worker监听消息队列,等待别人提交任务,如果没有就卡再这)
    	 celery -A demo worker -l info -P eventlet
        
    4 别人 提交任务,提交完成会返回一个id号,后期使用id号查询,至于这个任务有没有被执行,取决于worker有没有启动
    	from demo import add
		res=add.delay(77,66)
        
    5 提交任务的人,再查看结果
    	from demo import app
        # celery的包下
        from celery.result import AsyncResult

        id = '042a8fc1-6b0f-4ad6-bf72-edefa657a52f'
        if __name__ == '__main__':
            a = AsyncResult(id=id, app=app)
            if a.successful():  # 正常执行完成
                result = a.get()  # 任务返回的结果
                print(result)
            elif a.failed():
                print('任务失败')
            elif a.status == 'PENDING':
                print('任务等待中被执行')
            elif a.status == 'RETRY':
                print('任务异常后正在重试')
            elif a.status == 'STARTED':
                print('任务已经开始被执行')

3 包结构celery

# 使用步骤:
	1 新建包:celery_task
    2 再包下新建 celery.py 必须叫它,里面实例化得到app对象
    from celery import Celery
    broker = 'redis://127.0.0.1:6379/1'  # 消息中间件 redis
    backend = 'redis://127.0.0.1:6379/2'  # 结果存储 redis
    app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.course_task','celery_task.home_task','celery_task.user_task'])
    
    3 新建任务py文件:user_task.py   course_task.py  home_task.py
    	-以后跟谁相关的任务,就写在谁里面
        
        '''
from celery import app    表示从celery第三方包包下导入app,没有
from .celery import app   表示从当前路径下的celery.py 中导入app

'''
        from .celery import app
        import time
        @app.task
        def send_sms(mobile, code):
            time.sleep(2)
            print('%s手机号,发送短信成功,验证码是:%s' % (mobile, code))
            return True
	4 启动worker,以包启动,来到包所在路径下
        celery -A 包名 worker -l info -P eventlet
        celery -A celery_task worker -l info -P eventlet

	5 其它程序,导入任务,提交任务即可
        from celery_task.user_task import send_sms
        res = send_sms.delay(1999999333, 8888)
        print(res)  # f33ba3c5-9b78-467a-94d6-17b9074e8533
    6 其它程序,查询结果
    from celery_task.celery import app
    # celery的包下
    from celery.result import AsyncResult

    id = '51a669a3-c96c-4f8c-a5fc-e1b8e2189ed0'
    if __name__ == '__main__':
        a = AsyncResult(id=id, app=app)
        if a.successful():  # 正常执行完成
            result = a.get()  # 任务返回的结果
            print(result)
        elif a.failed():
            print('任务失败')
        elif a.status == 'PENDING':
            print('任务等待中被执行')
        elif a.status == 'RETRY':
            print('任务异常后正在重试')
        elif a.status == 'STARTED':
            print('任务已经开始被执行')

4 celery执行延迟任务和定时任务

# celery 可以做
	-异步任务
    -延迟任务---》延迟多长时间干任务
    -定时任务:每天12点钟,每隔几秒。。。
   		-如果只做定时任务,不需要使用celery这么重,apscheduler(自己去研究)
        

        
 # 异步任务
	-导入异步任务的函数
    -函数.delay(参数)
 # 延迟任务
	-导入异步任务的函数
    eta = datetime.utcnow() + timedelta(seconds=5)
    #5秒后执行
    -函数.apply_async(kwargs={'mobile':'1896334234','code':8888},eta=时间对象)
# 定时任务:在app所在的文件下配置
	- 1 配置(celery.py)
	app.conf.beat_schedule = {
        'send_sms': {
            'task': 'celery_task.user_task.send_sms',
            'schedule': timedelta(seconds=5),  #每隔5秒提交一次
            'args': ('1822344343', 8888),
        },
        'add_course': {
            'task': 'celery_task.course_task.add_course',
            # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
            'schedule': crontab(hour=11, minute=38),  # 每天11点35,执行
            'args': (),
        }
    }
	-2 启动beat,启动worker
	celery  -A celery_task beat -l info 
    -3 到了时间,beat进程负责提交任务到消息队列---》worker执行

5 django中使用celery

# 使用步骤
	1 把之前写好的包,copy到项目根路径下
    2 在xx_task.py 中写任务
    	from .celery import app
        @app.task
        def add_banner():
            from home.models import Banner
            Banner.objects.create(title='测试', image='/1.png', link='/banner', info='xxx',orders=99)
            return 'banner增加成功'
        
   3 在celery.py 中加载django配置
	# 一、加载django配置环境
    import os
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffy_api.settings.dev")

    4 视图函数中,导入任务,提交即可
    	class CeleryView(APIView):
            def get(self, request):
                res = add_banner.delay()
                return APIResponse(msg='新增banner的任务已经提交了')
     #worker的根路径要和copy的包一致
    5 启动worker,等待运行即可

5.1 解释

#1  celery中要使用djagno的东西,才要加这句话    
	import os
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffy_api.settings.dev")

6 接口缓存

# 所有接口都可以改造,尤其是查询所有的这种接口,如果加入缓存,会极大的提高查询速度

# 首页轮播图接口:获取轮播图数据,加缓存---》咱们只是以它为例


class BannerView(GenericViewSet, ListModelMixin):
    queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
    serializer_class = BannerSerializer

    def list(self, request, *args, **kwargs):
        '''
          1 先去缓存中查一下有没有数据
          2 如果有,直接返回,不走父类的list了(list在走数据库)
          3 如果没有,走父类list,查询数据库
          4 把返回的数据,放到缓存中
        '''

        data = cache.get('home_banner_list')
        if not data:  # 缓存中没有
            print('走了数据库')
            res = super().list(request, *args, **kwargs)  # 查询数据库
            # 返回的数据,放到缓存中
            data = res.data.get('data')  # {code:100,msg:成功,data:[{},{}]}
            cache.set('home_banner_list', data)
        return APIResponse(data=data)


    
# 公司里可能会这么写
	-写一个查询所有带缓存的基类
    -写个装饰器,只要一配置,就自动带缓存
    
    
# 双写一致性问题:缓存数据和数据库数据不一致了
	-写入数据库,删除缓存
    -写入数据库,更新缓存
    -定时更新缓存

标签:task,app,django,celery,任务,print,import
From: https://www.cnblogs.com/whxx/p/17515036.html

相关文章

  • celery
    1Celery架构,介绍#Celery:芹菜(跟翻译没有任何关系),分布式异步任务框架,框架(跟其他web框架无关)#Celeryisaprojectwithminimalfunding,sowedon’tsupportMicrosoftWindows.Pleasedon’topenanyissuesrelatedtothatplatform.#架构 -broker:任务中间件,用户提......
  • celery介绍
    简介Celery是使用python编写的分布式任务调度框架。celery能做什么Celery是一个强大的分布式任务队列框架,它可以与Python应用程序一起使用,提供了异步任务处理和分布式消息传递的能力。以下是Celery框架的一些主要功能和用途:异步任务处理Celery可以将耗时的任务放......
  • rsync备份任务练习
    06-备份任务实战今天的任务主要以实际备份任务入手,完成综合练习,完成对rsync的综合运用。先看需求再讲解再次动手实践  客户端需求客户端需求:1.客户端每天凌晨1点在服务器本地打包备份(/etc目录和/var/log目录)2.客户端备份的数据必须存放至以"主机名_ip地......
  • django使用gunicorn框架,客户端请求耗时接口被中断问题
    项目使用的是django,使用了gunicorn作为动态web服务,使用的是supervisor作为进程管理工具。由于特殊原因,最近上线了一个非常耗时的http接口,一段时间后开始有用户陆续反馈他们的代码调用这个接口会返回502错误,经过一段时间的排查排除了网关的问题,确认是系统问题。经过......
  • django离线脚本的使用(就是需要借助django的功能,然后写一些脚本)
    在django中,我们可以创建一些离线脚本,它的意思是我们并没有启动django服务,而只是运行了django的一些配置以便我们在脚本里完成一些,需要django环境支持的脚本操作下面是详细用法:#启动djangoimportosimportsysimportdjango#首先,一定要把当前项目的路径加到python模块搜......
  • 10redis列表操作,其他操作,redis管道,django中使用redis,django缓存,序列化json和pickle,cel
    字符串和字节转换的两种方式#字符串和字节转换的两种方式 -decode,encode-直接类型转换-bytes格式的16进制,2进制,10进制的显示#字符串需要用encode,bytes格式需要用decode,但是有时候忘了#可以直接进行强转b1=bytes(s,encoding='utf-8') print(......
  • django中ORM的相关操作
    ORM-操作<br>基本操作包括增删改查<br>ORMCRUD核心->模型类.管理器对象一、管理器对象每个继承下自models.Model的模型类,都会有一个object对象被同样继承下来,这个对象叫管理器对象<br>数据库的增删改查可以通过模型的管理器实现二、创建数据DjangoORM使用一种直观的......
  • python实现定时任务
    第一种方式:立即执行,间隔时间 第二种方式:具体到某个时间 参考:https://blog.csdn.net/weixin_44799217/article/details/127352531https://blog.csdn.net/u013302168/article/details/123420582 ......
  • django项目在windows的部署(apach+Mod_wsgi+django)
    如果django项目如果要正式使用,我们需要将项目部署到开发环境上去。django项目自带的服务不支持多线程,会出现多个用户访问时,页面卡死,半天打不开的问题。所以,该如何部署django项目呢?下边是我的部署经验,实测有效。如果可以的话,尽量部署到linux上,但是我的系统中涉及到一些window文件......
  • django缓存的使用
    缓存:可以把django中的一个变量(数据),存放到某个位置,下次还可以取出来之前用过:默认放在:内存中,其实可以放在文件中,数据库,redis。。。。fromdjango.core.cacheimportcachecache.set('key','value',5)#存放值res=cache.get('key')#取值通过配置,控制存放在哪,只要如下写,就会......