Celery 是一个用 Python 开发的简单、灵活、可靠的分布式任务队列(task queue),用于处理大量异步消息,也支持定时任务调度。
安装:
pip install celery # 安装celery库
pip install redis # celery依赖于redis
pip install eventlet # Windows下需要安装
项目结构:
主项目目录下celery.py
from __future__ import absolute_import, unicode_literals import os from celery import Celery # django_test是我的项目名 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_test.settings') # 设置django环境 app = Celery('django_test') app.config_from_object('django.conf:settings', namespace='CELERY') # 使用CELERY_ 作为前缀,在settings中写配置 app.autodiscover_tasks()
在主项目下__init__.py添加下面代码
# Ensure the Celery app is imported when Django starts, so that the
# @shared_task decorator binds tasks to it.
from .celery import app as celery_app

__all__ = ['celery_app']
在settings.py添加celery配置
# Celery configuration (read by config_from_object with namespace='CELERY';
# the CELERY_ prefix is stripped and the remainder lowercased).
# NOTE(review): redispwd / redishost / redisport are assumed to be defined
# earlier in settings.py — confirm.
redis_host = "redis://:" + redispwd + "@" + redishost + ":" + redisport + "/3"
CELERY_BROKER_URL = redis_host  # message broker: redis DB 3

redis_host = "redis://:" + redispwd + "@" + redishost + ":" + redisport + "/4"
CELERY_RESULT_BACKEND = redis_host  # task results: redis DB 4

CELERY_RESULT_SERIALIZER = 'json'
# Expire stored task results after one day.
# BUG FIX: the old pre-4.0 name CELERY_TASK_RESULT_EXPIRES maps (after
# namespace stripping) to "task_result_expires", which Celery 4+ does not
# recognise — the setting was silently ignored.  The correct namespaced key
# is CELERY_RESULT_EXPIRES (-> result_expires).
CELERY_RESULT_EXPIRES = 24 * 60 * 60
# CELERY_MAX_TASKS_PER_CHILD = 10
CELERY_TIMEZONE = 'Asia/Shanghai'
在app下面新建tasks.py
from __future__ import absolute_import, unicode_literals import time from celery import shared_task from django.conf import settings @shared_task def add(x, y): time.sleep(30) print("celery结果") return x + y @shared_task def mul(x, y): print('x*y') return x * y @shared_task def reduce(number): with open(settings.BASE_DIR + '/data.py', 'r') as f: tot = int(f.read()) print(tot) print('celery_t') if tot == 0: return 0 else: data = tot - number with open(settings.BASE_DIR+'/data.py', 'w') as f: f.write(str(data)) return 1
我的views.py文件
# views.py — POST queues a stock-decrement task; GET polls a task by id.
import datetime
import json
import traceback

from celery import result
from django.conf import settings
from django.contrib.auth.hashers import make_password
from django.core.cache import cache
from django.core.paginator import Paginator
from django.http import JsonResponse
# BUG FIX: APIView lives in rest_framework.views, not
# rest_framework.decorators (which only provides function decorators such
# as api_view) — the original import raises ImportError.
from rest_framework.views import APIView
from rest_framework.response import Response

from . import tasks


class Test(APIView):
    def get(self, request):
        """Poll a Celery task: ?task_id=<id> -> {"success": 1, "data": ...}.

        "data" is the task's return value on SUCCESS, otherwise 0
        (pending, failed, or revoked).
        """
        params = request.query_params.dict()
        task_id = params.get('task_id')
        ar = result.AsyncResult(task_id)
        print("--------", ar)
        print(ar.ready(), ar.state)
        data = 0
        # BUG FIX: only call ar.get() on SUCCESS — on a FAILURE result
        # ar.get() re-raises the task's exception, so the original
        # print(ar.state, ar.get()) crashed the view for failed tasks.
        if ar.ready() and ar.state == 'SUCCESS':
            data = ar.get()
        return Response({"success": 1, "data": data})

    def post(self, request):
        """Queue a stock-decrement task and return its task id."""
        t = tasks.reduce.delay(1)
        print(t.id)
        return Response({"success": 1, "data": t.id})
post请求创建新的任务,get请求通过任务id查询任务状态。
我的data.py文件写入了一个数字,代表商品所剩数量,每次post请求创建新的任务,则会打开data.py文件判断里面数字是否为0,为0表示商品已被抢完,否则数字减1,表示抢到该商品。
标签:py,django,celery,ar,秒杀,import,data From: https://www.cnblogs.com/moon3496694/p/17135373.html