celery介绍和安装
# Celery 是什么 -翻译过来是 芹菜 的意思,跟芹菜没有关系 -框架:服务,python的框架,跟django无关 -能用来做什么 -1 异步任务 -2 定时任务 -3 延迟任务 # 理解celery的运行原理 """ 1) 可以不依赖任何服务器,通过自身命令,启动服务’ 2)celery服务为其他项目服务提供异步解决任务需求的 - 注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求 - 是一个独立运行的服务 | 医院也是一个独立运行的服务 - 正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题 - 人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求 """ # celery架构(Broker,backend 都用redis) -1. 任务中间件Broker(中间件),其他服务提交的异步任务,放在里面排队 - 需要借助于第三方redis rabbitmq -2. 任务执行单元worker,真正执行异步任务的进程 - celery提供的 -3. 结果存储backend,结果存储函数的返回结果,存到backend中 - 需要借助于第三方:redis,mysql # 使用场景 异步执行:解决耗时任务 延迟执行:解决延迟任务 定时执行:解决周期任务 # celery 不支持win,通过eventlet支持在win上运行
celery快速使用
# 1.安装 - 安装完成后会有一个可执行文件celery pip install celery # 不支持win系统,不代表不可以用 """ win:pip install eventlet """ # 2.快速使用 ### 第一步:新建main.py from celery import Celery # 提交的异步任务,放在里面 broker = 'redis://127.0.0.1:6379/1' # 执行完的结果放在这里 backend = 'redis://127.0.0.1:6379/2' app =Celery('test',broker=broker,backend=backend) @app.task def add(a,b): import time time.sleep(3) print('------',a+b) return a+b ### 第二步:其他程序,提交任务 res =add.delay(5,6) # 原来add的参数,直接放在delay中传入即可 print(res) # f150d8a5-c955-478d-9343-f3b60d0d5bdb ### 第三步:启动worker #启动worker命令,win需要安装eventlet win: -4.x之前的版本 celery worker -A main -l info -P eventlet -4.x之后 celery -A main worker - l info -P eventlet mac: celery -A main worker -l info ### 第四步:worker会执行消息中间件中的任务,把结果存起来 ### 第五步:看执行结果,拿到执行的结果 from main import app from celery.result import AsyncResult id='51611be7-4914-4bd2-992d-749008e9c1a6' if __name__ == '__main__': a=AsyncResult(id=id,app=app) if a.successful(): # 执行完了 result = a.get() print(result) elif a.failed(): print('任务失败') elif a.status == 'PENDING': print('任务等待中被执行') elif a.status == 'RETRY': print('任务异常后正在重试') elif a.status == 'STARTED': print('任务已经开始被执行')
celery包结构
project ├── celery_task # celery包 │ ├── __init__.py # 包文件 │ ├── celery.py # celery连接和配置相关文件,且名字必须交celery.py │ └── tasks.py # 所有任务函数 ├── add_task.py # 添加任务 └── get_result.py # 获取结果 ### 第一步:新建包celery_task # 在包下新建【必须叫celery.py】文件,在这个下面写代码 from celery import Celery broker = 'redis://127.0.0.1:6379/1' backend = 'redis://127.0.0.1:6379/2' app = Celery('test', broker=broker, backend=backend, include=['celery_task.order_task', 'celery_task.user_task']) ### 第二步: 在包内部,写task,任务异步任务 -【order_task】 from .celery impport app import time @app.task def add(a,b): print('-----',a+b) time.sleep(2) return a+b - 【user_task】 from .celery import app import time @app.task def send_sms(phone,code): print("给%s发送短信成功,验证码为:%s" % (phone, code)) time.sleep(2) return True ### 第三步:启动worker,包在目录下 celery -A celery_task worker -l info -P eventlet ### 第四步:其他程序,提交任务,被提交到中间件中,等待worker执行,因为worker启动了,就会被worker执行 from celery_task import send_sms res=send_sms.delay('1999999',8888) print(res) # 7d39033c-4cc7-4af2-8d78-e62c277db183 ### 第五步:worker执行完,结果存到backend中 ### 第六步:查看结构 from celery_task import app from celery.result import AsyncResult id = '7d39033c-4cc7-4af2-8d78-e62c277db183' if __name__ == '__main__': a = AsyncResult(id=id, app=app) if a.successful(): # 执行完了 result = a.get() # print(result) elif a.failed(): print('任务失败') elif a.status == 'PENDING': print('任务等待中被执行') elif a.status == 'RETRY': print('任务异常后正在重试') elif a.status == 'STARTED': print('任务已经开始被执行')
标签:task,app,worker,celery,任务,print From: https://www.cnblogs.com/juzijunjun/p/17195899.html