celery介绍架构和安装:
celery :分布式的异步任务框架,主要用来做:
- 异步任务 - 延迟任务 - 定时任务---》如果只想做定时任务,可以不使用celery,有别的选择
celery 框架,原理
1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket) 2)celery服务为为其他项目服务提供异步解决任务需求的 注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求
人是一个独立运行的服务 | 医院也是一个独立运行的服务
正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
celery架构
消息中间件(broker):消息队列:可以使用redis,rabbitmq
#提交的任务在消息中间件里存着但是没有执行 任务执行单元(worker):真正的执行 提交的任务 任务执行结果存储(banckend):可以使用mysql,redis
Celery 官网:http://www.celeryproject.org/
Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html
Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/
安装celery
pip install Celery
celery不支持win,所以想再win上运行,需要额外安装eventlet
windows系统需要eventlet支持:pip3 install eventlet Linux与MacOS直接执行: 3.x,4.x版本:celery worker -A demo -l info 5.x版本: celery -A demo worker -l info -P eventlet
celery执行异步任务
1 在虚拟环境中装celery和eventlet
pip install Celery
pip3 install eventlet
2 写个py文件(demo.py),实例化得到app对象,注册任务
from celery import Celery import time broker = 'redis://127.0.0.1:6379/1' # 消息中间件 backend = 'redis://127.0.0.1:6379/2' # 结果存储 app = Celery(__name__, broker=broker, backend=backend) @app.task # 加了app.task就成为了celery的任务 def add(a, b): print(a + b,'-----------') time.sleep(1) return a + b
3 启动worker(worker监听消息队列,等待别人提交任务,如果没有就卡再这)
celery -A demo worker -l info -P eventlet #这条命名需要在环境变量下执行才会生效
4 别人 提交任务(add_task.py),提交完成会返回一个id号,后期使用id号查询,至于这个任务有没有被执行,取决于worker有没有启动
from demo import add # 异步使用------>提交任务到消息中间件中,并没有执行,返回的格式是一个id号 res = add.delay(999, 999) print(res) # 输出结果:28c75c45-1ee8-4eb9-8d35-38e316c5d858
5 提交任务的人,再查看结果(get_result.py)
from demo import app # celery的包下 from celery.result import AsyncResult id = '98655b23-883e-4376-b5db-cc90e48323cd' if __name__ == '__main__': a = AsyncResult(id=id, app=app) if a.successful(): # 正常执行完成 result = a.get() # 任务返回的结果 print(result) elif a.failed(): print('任务失败') elif a.status == 'PENDING': print('任务等待中被执行') elif a.status == 'RETRY': print('任务异常后正在重试') elif a.status == 'STARTED': print('任务已经开始被执行')
包结构celery:
1 新建包:celery_task
2 再包下新建 celery.py 必须叫它,里面实例化得到app对象
from celery import Celery broker = 'redis://127.0.0.1:6379/1' # 消息中间件 backend = 'redis://127.0.0.1:6379/2' # 结果存储 app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.course_task', 'celery_task.home_task', 'celery_task.user_task']) #include表示管理的任务
3 新建任务py文件:user_task.py course_task.py home_task.py
home_task.py中:
import time from .celery import app @app.task def send_sms(mobile, code): time.sleep(2) print('%s手机号发送成功,验证码是%s' % (mobile, code)) return True
4 启动worker,以包启动,来到包所在路径下
celery -A 包名 worker -l info -P eventlet celery -A celery_task worker -l info -P eventlet
5 其它程序,导入任务,提交任务即可
from celery_task.home_task import send_sms res = send_sms.delay('16655145917', '1234') print(res)
6 其它程序,查询结果
from celery_task.celery import app # celery的包下 from celery.result import AsyncResult id = '51a669a3-c96c-4f8c-a5fc-e1b8e2189ed0' if __name__ == '__main__': a = AsyncResult(id=id, app=app) if a.successful(): # 正常执行完成 result = a.get() # 任务返回的结果 print(result) elif a.failed(): print('任务失败') elif a.status == 'PENDING': print('任务等待中被执行') elif a.status == 'RETRY': print('任务异常后正在重试') elif a.status == 'STARTED': print('任务已经开始被执行')
celery执行延迟任务:
from datetime import timedelta
#timedelta的解释
timedelta类型为对象类型
print(timedelta(seconds=2)) #输出结果为:0:00:02
print(timedelta(minutes=3)) #输出结果为:0:03:00
print(timedelta(hours=5)) #输出结果为:5:00:00
print(timedelta(hours=3,minutes=5)) #输出结果为:3:05:00
延迟任务实现:
from datetime import timedelta, datetime eta=datetime.utcnow()+timedelta(seconds=5)
#函数.apply_async(kwargs={'mobile':'1896334234','code':8888},eta=时间对象) # res=send_sms.apply_async(args=['16655145917','1234'],eta=eta) res=send_sms.apply_async(kwargs={'mobile':'16655144907','code':'1234'},eta=eta) print(res)
celery执行定时任务:
1.在celery.py中配置:
# 时区 app.conf.timezone = 'Asia/Shanghai' # 是否使用UTC app.conf.enable_utc = False from datetime import timedelta from celery.schedules import crontab # 定时任务配置 app.conf.beat_schedule = { 'send_sms': { 'task': 'celery_task.home_task.send_sms', 'schedule': timedelta(seconds=5), #每隔5秒执行一次 'args': ('1822344343', 8888), }, 'add_course': { 'task': 'celery_task.course_task.add_coures', # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点 'schedule': crontab(hour=11, minute=38), # 每天11点35,执行 'args': (), } }
2.启动beat,启动worker
celery -A celery_task beat -l info
3.到了时间,beat进程负责提交任务到消息队列---》worker执行
django中使用celery:
1 把写好的包,copy到项目根路径下
2.在home_task.py中写任务
from .celery import app @app.task def add_banner(): from home.models import Banner Banner.objects.create(title='测试',image='/1.png',link='3',info='xxx',orders=89) return '增加成功'
3 在celery.py 中加载django配置
import os os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffy_api.settings.dev")
4. 视图函数中,导入任务,提交即可
from rest_framework.views import APIView
from celery_task.home_task import add_banner
class CeleryView(APIView):
def get(self, request):
add_banner.delay()
return APIResponse()
5. 启动worker,等待运行即可
celery -A celery_task worker -l info -P eventlet
接口缓存:
给查询接口加入缓存
# 查询所有 轮播图接口 from rest_framework.viewsets import GenericViewSetfrom .serialzier import BannerSerializer from .models import Banner from utils.common_mixin import CommonListModelMixin as ListModelMixin from django.conf import settings from django.core.cache import cache from utils.common_response import APIResponse class BannerView(GenericViewSet, ListModelMixin): queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT] serializer_class = BannerSerializer def list(self, request, *args, **kwargs):
'''
1 先去缓存中查一下有没有数据
2 如果有,直接返回,不走父类的list了(list在走数据库)
3 如果没有,走父类list,查询数据库
4 把返回的数据,放到缓存中
'''
data = cache.get('home_banner_list') if not data: print(111) res = super().list(request, *args, **kwargs) data = res.data.get('data') cache.set('home_banner_list', data) return APIResponse(data=data)
标签:task,app,celery,任务,print,import From: https://www.cnblogs.com/Hao12345/p/17515230.html