首页 > 其他分享 >celery

celery

时间:2023-06-29 16:26:02浏览次数:33  
标签:task app py celery 任务 import

1 Celery架构,介绍

# Celery:芹菜(跟翻译没有任何关系),分布式异步任务框架,框架(跟其他web框架无关)
# Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
# 架构
	-broker:任务中间件,用户提交的任务,存在这个里面(redis,rabbitmq)
    -worker:任务执行者,消费者,真正执行任务的进程(真正干活的人)
    -backend:任务结果存储,任务执行后的结果(redis,rabbitmq)
	
# celery服务为为其他项目服务提供异步解决任务需求的
# celery能够做的事:
	-异步任务(区分同步任务)
    
    -延迟任务
    -定时任务(其他框架做)
    
# 更好的理解celery
注:会有两个服务同时运行,一个是项目服务(django服务),一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

人是一个独立运行的服务(django) | 医院也是一个独立运行的服务(celery)
	正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
	人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求

2 celery的简单使用

# 安装 windows
# pip install celery
# pip3 install eventlet

2.1 celery_task

# pip3 install celery
from celery import Celery

# app=Celery('test',)

# backend='redis://:密码@127.0.0.1:6379/1'  如果有密码,这么写
broker = 'redis://127.0.0.1:6379/1'  # redis地址
backend = 'redis://127.0.0.1:6379/2'  # redis地址

# 1 实例化得到celery对象
app = Celery(__name__, backend=backend, broker=broker)


# 2 写一堆任务(计算a+b,挖井,砍树),函数,
# 使用装饰器包裹任务(函数)

@app.task()
def add(a, b):
    import time
    time.sleep(2)
    return a + b

2.2 提交任务

# from celery_task import app
import celery_task

# 1 同步执行
# res = celery_task.add(2, 3)  # 普通的同步任务,同步执行任务
# print(res)

# 2 异步任务:
# 第一步:提交(使用任务名.apply_async(参数))
# 结果是任务id号,唯一标识这个任务
# res = celery_task.add.apply_async(args=[2, 3])
res = celery_task.add.apply_async(kwargs={'a':2,'b':3})
print(res)  # abab1ad3-0e58-4faa-bc05-14d157dc8217

# 第二步:让worker执行--->结果存到redis
# 通过命令启动
# 非windows
# 5.x之前这么启动
# 命令:celery worker -A celery_task -l info
# 5.x以后
# celery -A celery_task.main worker  -l info

# windows:
# pip3 install eventlet
# 5.x之前这么启动
# celery worker -A celery_task -l info -P eventlet
# 5.x以后
# celery -A celery_task.main worker  -l info -P eventlet

# 第三步:查看任务执行结果

2.3 查看任务结果

from celery_task import app

from celery.result import AsyncResult

id = 'abab1ad3-0e58-4faa-bc05-14d157dc8217'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():
        print('任务执行成功了')
        result = a.get()  # 异步任务执行的结果
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

3 celery包结构

# 目录结构
    -celery_task               # 包名
    	__init__.py
    	celery.py             # app所在py文件
    	course_task.py        # 任务
    	order_task.py         # 任务
    	user_task.py          # 任务
    提交任务.py                # 提交任务
    查看结果.py                # 查看结果

3.1 celery_task /celery.py

from celery import Celery

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
# include  是一个列表,放被管理的task 的py文件
app = Celery(__name__, backend=backend, broker=broker,include=[
    'celery_task.course_task',
    'celery_task.order_task',
    'celery_task.user_task',
])

# 原来,任务写在这个py文件中

# 后期任务非常多,可能有用户相关任务,课程相关任务,订单相关任务。。。

3.2 celery_task /任务.py

##########user_task.py
import time
from .celery import app

# 发送短信任务
@app.task()
def send_sms(phone, code):
    time.sleep(3)  # 模拟发送短信延迟
    print('短信发送成功,手机号是:%s,验证码是:%s' % (phone, code))
    return '短信发送成功'



############order_task.py
from .celery import app
# 生成订单任务
@app.task()
def make_order():
    with open(r'D:\py18\luffy_api\script\2 celery的包结构\celery_task\order.txt', 'a', encoding='utf-8') as f:
        f.write('生成一条订单\n')
    return True



############course_task.py
from .celery import  app
@app.task()
def add(a,b):
    return a+b

3.3 提交任务.py

from celery_task import user_task,order_task

# 提交一个发送短信任务
# res = user_task.send_sms.apply_async(args=['18972374345', '8888'])
# print(res)


# 提交一个生成订单任务

# res=order_task.make_order.apply_async()
# print(res)


############## celery 执行延迟任务 ##################

# 添加延迟任务
# from datetime import datetime, timedelta
# # datetime.utcnow()  获取当前的utc时间
# eta=datetime.utcnow() + timedelta(seconds=50) #  50s后的utc时间
# # 10s后,发送短信
# res=user_task.send_sms.apply_async(args=(200, 50), eta=eta)
# print(res)


### 使用第二种方式执行异步任务
# res=user_task.send_sms.apply_async(args=('12345566677', '8888'))


res=user_task.send_sms.delay('12345566677', '8888')
print(res)

3.4 查看结果.py

from celery_task.celery import app
from celery.result import AsyncResult

id = '0f283e22-e8d0-40a6-a8ed-8998038bc7a3'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    print(app.conf)
    if a.successful():
        print('任务执行成功了')
        result = a.get()  # 异步任务执行的结果
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

4 celery异步任务,延迟任务

4.1 执行延迟任务

from datetime import datetime, timedelta
# datetime.utcnow()  获取当前的utc时间
eta=datetime.utcnow() + timedelta(seconds=50) #  50s后的utc时间
# 10s后,发送短信
res=user_task.send_sms.apply_async(args=(200, 50), eta=eta)
print(res)

4.1 执行异步任务

# 方式一:不写时间,就表示立即执行
user_task.send_sms.apply_async(args=('12345566677', '8888'))
# 方式二:
res=user_task.send_sms.delay('12345566677', '8888')

5 celery定时任务

5.1 第一步:celey.py中写入

# 第一步,在包(celery_task)下的celey.py中写入
###修改celery的配置信息    app.conf整个celery的配置信息
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

####配置定时任务
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    'send_sms_every_3_seconds': {
        'task': 'celery_task.user_task.send_sms',  # 指定执行的是哪个任务
        'schedule': timedelta(seconds=3),
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': ('18953675221', '8888'),
    },
    'make_order_every_5_seconds': {
        'task': 'celery_task.order_task.make_order',  # 指定执行的是哪个任务
        'schedule': timedelta(seconds=5),
    },
    'add_every_1_seconds': {
        'task': 'celery_task.course_task.add',  # 指定执行的是哪个任务
        'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': (3, 5),
    },
}

5.2 第二步:启动worker

# celery worker -A 包名 -l info -P eventlet
celery worker -A celery_task.main -l info -P eventlet
# 如果beat没有启动,worker是没有活干的,需要启动beat,worker才能干活,和beat启动顺序无先后

5.3 第三步:启动beat

celery beat -A celery_task -l 

6 django中集成celery

# 0 了解
	-django-celery  # 第三方把django和celery集成起来,方便我们使用,但是,第三方写的包的版本,跟celery和django版本完全对应
    	
    -我们自己使用包结构集成到django中

# 第一步,把写好的包,直接复制到项目根路径
# 第二步,在视图类中(函数中)
from celery_task.user_task import send_sms
def test(request):
    mobile = request.GET.get('mobile')
    code = '9999'
    res = send_sms.delay(mobile, code)  # 同步发送假设3分支钟,异步发送,直接就返回id了,是否成功不知道,后期通过id查询
    print(res)

    return HttpResponse(res)

7 django集成celery实现定时任务(定时更新首页轮播图)

# 首页轮播图,加入缓存
	-因为如果不加缓存,每次用户访问首页,都去查一次数据库,对数据库压力大
    -第一次访问查数据库,拿到数据,放到缓存(redis),以后再访问,直接从redis中能够获取
    
    -双写一致性问题(redis缓存和mysql数据不同步)
    	-有方案
    -缓存穿透,缓存击穿,缓存雪崩问题(暂时不讲)
class BannerView(ViewSetMixin, ListAPIView):
    queryset = models.Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[
               :settings.BANNER_COUNT]
    serializer_class = serializer.BannerSerializer

    def list(self, request, *args, **kwargs):
        # 先去缓存中获取,如果缓存有,直接返回,如果没有,去数据库查询,放到缓存

        # 先去缓存中获取
        banner_list = cache.get('banner_list_cache')
        if not banner_list:  # 去数据库中获取
            # 没有走缓存
            print('查了数据库')
            res = super().list(request, *args, **kwargs)
            banner_list = res.data  # res是Response对象
            # 放入到缓存中
            cache.set('banner_list_cache', banner_list)
        return Response(data=banner_list)

8 双写一致性问题

# redis缓存和mysql数据不同步
# 缓存更新策略
	-先更新数据库,再更新缓存(可靠性高一些)
    -先更新数据库,再删缓存(可靠性高一些)
    
	-先删缓存,再更新数据库(缓存删了,数据库还没更新,来了一个请求,缓存了老数据)
    
    -定时更新(对实时性要求不高)
    	-每隔12个小时,更新一下缓存
    

8 首页轮播图缓存定时更新

8.1 home_task.py

# 更新轮播图缓存的任务

from celery_task.celery import app
from home import models
from django.conf import settings
from home import serializer
from django.core.cache import cache


@app.task()
def update_banner():
    # 1  从mysql中取出轮播图数据
    queryset = models.Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[
               :settings.BANNER_COUNT]
    # 2 序列化
    ser = serializer.BannerSerializer(instance=queryset, many=True)
    # 3 获取到字典,手动拼上前面的地址
    banner_list = ser.data
    for banner in banner_list:
        banner['image'] = settings.BACKEND_URL % str(banner['image'])

    # 4 放到缓存中
    cache.set('banner_list_cache', banner_list)
    return True

8.2 celery.py

from celery import Celery

# 由于celery和django 是独立的两个服务,要想在celery服务中使用django,必须加这两句
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffy_api.settings.dev")


# import django
# django.setup()

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
# include  是一个列表,放被管理的task 的py文件
app = Celery(__name__, backend=backend, broker=broker, include=[
    'celery_task.course_task',
    'celery_task.order_task',
    'celery_task.user_task',
    'celery_task.home_task',  #新写的task,一定要注册
])

# 原来,任务写在这个py文件中

# 后期任务非常多,可能有用户相关任务,课程相关任务,订单相关任务。。。


#### 注册定时任务

###修改celery的配置信息    app.conf整个celery的配置信息
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 配置定时任务
from datetime import timedelta

app.conf.beat_schedule = {
    'update_banner_every_3_seconds': {
        'task': 'celery_task.home_task.update_banner',  # 指定执行的是哪个任务
        'schedule': timedelta(seconds=3),
    },
}

8.3 启动djagno,启动beat,启动worker

python manage.py runserver 
celery -A celery_task beat -l info
celery -A celery_task worker -l info -P eventlet

celery定义一个1小时后执行的任务

from datetime import datetime, timedelta
from celery import Celery

app = Celery('my_tasks', broker='amqp://guest@localhost//')

@app.task
def my_task():
    # 任务的逻辑代码
    pass

# 计算一个小时后的时间
eta_time = datetime.now() + timedelta(hours=1)

# 使用 ETA 选项调用任务
result = my_task.apply_async(eta=eta_time)

标签:task,app,py,celery,任务,import
From: https://www.cnblogs.com/yangyucai/p/17514472.html

相关文章

  • celery介绍
    简介Celery是使用python编写的分布式任务调度框架。celery能做什么Celery是一个强大的分布式任务队列框架,它可以与Python应用程序一起使用,提供了异步任务处理和分布式消息传递的能力。以下是Celery框架的一些主要功能和用途:异步任务处理Celery可以将耗时的任务放......
  • 10redis列表操作,其他操作,redis管道,django中使用redis,django缓存,序列化json和pickle,cel
    字符串和字节转换的两种方式#字符串和字节转换的两种方式 -decode,encode-直接类型转换-bytes格式的16进制,2进制,10进制的显示#字符串需要用encode,bytes格式需要用decode,但是有时候忘了#可以直接进行强转b1=bytes(s,encoding='utf-8') print(......
  • celery笔记八之数据库操作定时任务
    本文首发于公众号:Hunter后端原文链接:celery笔记八之数据库操作定时任务前面我们介绍定时任务是在celery.py中的app.conf.beat_schedule定义,这一篇笔记我们介绍一下如何在Django系统中的表里来操作这些任务。依赖及migrate操作beat的启动表介绍手动操作定时任务1......
  • celery笔记七之周期/定时任务及crontab定义
    本文首发于公众号:Hunter后端原文链接:celery笔记七之周期/定时任务及crontab定义periodictask,即为周期,或者定时任务,比如说每天晚上零点零分需要运行一遍某个函数,或者每隔半小时运行一遍该函数,都是这种任务的范畴。在第一篇笔记的时候我们就介绍过celery的组件构成,其中有一......
  • celery笔记七之周期/定时任务及crontab定义
    本文首发于公众号:Hunter后端原文链接:celery笔记七之周期/定时任务及crontab定义periodictask,即为周期,或者定时任务,比如说每天晚上零点零分需要运行一遍某个函数,或者每隔半小时运行一遍该函数,都是这种任务的范畴。在第一篇笔记的时候我们就介绍过celery的组件构成,其中有一个......
  • celery 执行异步任务,延迟任务,定时任务
    celery执行异步任务,延迟任务,定时任务1异步任务 任务.delay(参数)2延迟任务 任务.app_async(args=[].eta=时间对象)#如果没有修改时区,需要使用utc时间3定时任务 需要启动beat和worker-beat定时提交任务进程---》配置在app.comf.beat_schedule的任务-worker......
  • celery封装与包结构
    celery封装与包结构project├──celery_task #celery包│├──__init__.py#包文件│├──celery.py#celery连接和配置相关文件,且名字必须交celery.py│└──tasks.py#所有任务函数├──add_task.py #添加任务......
  • celery笔记五之消息队列的介绍
    本文首发于公众号:Hunter后端原文链接:celery笔记五之消息队列的介绍前面我们介绍过task的处理方式,将task发送到队列queue,然后worker从queue中一个个的获取task进行处理。task的队列queue可以是多个,处理task的worker也可以是多个,worker可以处理任意queue......
  • Django与celery集成:异步任务原理和过程
    0.原理和架构a.客户发送请求到django;b.django产生任务(要执行的函数);c.django把任务丢给celery的brokerd.celery的worker从broker拿到任务并且执行;e.worker执行后保存结果到后端数据库;  1.在django里面配置celery的目录结构PSD:\djangotest\myrecrument>treeD:.├─.idea......
  • Celery的使用
    celery是什么Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。Celery的架构由三部分组成,消息中间件(messagebroker),任务执行单元(worker)和任务执行结果存储(taskresultstore)组成。消息中间件Celery本身不提供消息服......