首页 > 其他分享 >Celery - 分布式任务队列

Celery - 分布式任务队列

时间:2023-05-04 17:15:52浏览次数:34  
标签:celery task 队列 app worker Celery 任务 import 分布式

Celery - 分布式任务队列


目录

1 celery简介

1.1 什么是celery

Celery是一个用Python编写的分布式任务队列框架。它允许开发者将任务分发到多台服务器或进程中执行,从而实现高效的异步任务处理。

Celery框架基于消息传递实现,任务可以在多个节点之间异步传递和执行,这大大提高了任务的可靠性和可伸缩性。Celery还支持多种消息传递中间件,如RabbitMQ、Redis、Amazon SQS等。

  • 注意:

1)celery可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)

2)celery服务为为其他项目服务提供异步解决任务需求的

注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

在django中想用异步,交个celery来做,其他不需要的则django自己来做

celery和django是两个项目服务

1.2 celery架构

Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

image-20230301162038433

(1)消息中间件 message broker

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

(2)任务执行单元 worker

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

(3)任务结果存储 task result store

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

任务中间件 broker:其他项目服务提交的异步任务,放在里面排队 【需要借助第三方:redis,rabbitmq】

任务执行单元 worker:真正执行异步任务的进程

结果存储 backend:结果存储,函数的返回结果存到backend中,【需要借助第三方:redis,mysql】

(4)使用场景

异步执行:解决耗时任务

延迟执行:解决延迟任务

定时执行:解决周期(周期)任务

2 Celery安装与使用

2.1 安装

pip install celery

消息中间件:RabbitMQ/Redis

windows系统上要安装这个模块

pip install eventlet

2.2 快速使用

① 第1步:创建celery app与创建任务

  • celery_test.py中
from celery import Celery

# 先配置好
# 消息中间件broker:提交的异步任务,放在broker中
broker = 'redis://127.0.0.1:6379/3'
# 任务结果存储backend:执行完的结果,放在backend中
backend = 'redis://127.0.0.1:6379/4'

# 第1步:类实例化得到对象

celery_app = Celery('test1', broker=broker, backend=backend)


# 第一个参数是名字:可以用__name__
# 第二个、第三个参数是:broker、backend

# 第2步:写任务,用装饰器的形式
@celery_app.task
def add_test(a, b):
    import time
    time.sleep(3)
    print(a + b)
    return a + b

# 第3步:提交任务,在别的程序中写提交任务

② 第2步:提交任务:在别的程序中

  • django - task.py 中
# 在这里编写celery的提交任务


from celery_test import add_test

# 同步调用
# res = add_test(5,6)
# print(res)

# 异步调用

res = add_test.delay(1, 3)
print(res)
# 这时候,任务还没执行,说明celery的worker还没执行,只是提交上去了
# 422c683a-a255-4c7c-be27-ccb76e7ee19f


# 启动 worker

image-20230310155800097

③ 第3步:启动celery(app)服务 【也就是worker】

  • 非windows
4.x之前版本
celery worker -A celery的包名 -l info

5.x之后的版本
celery -A celery的包名 worker -l info
  • windows
pip3 install eventlet

4.x之前版本
celery worker -A celery的包名 -l info -P eventlet

5.x之后的版本
celery -A celery的包名 worker -l info -P eventlet

④ 第4步:worker会执行消息中间件中的任务,并把结果存起来

image-20230310155941135

worker执行完会将结果存储在redis中

image-20230310155822455

⑤ 第5步:查看结果【通过程序获得结果】

  • django - get_result.py中
# 查看celery的worker结果
from celery_test import celery_app

from celery.result import AsyncResult

id = '17e170a4-0c74-455d-afa7-4c5c893de152'
if __name__ == '__main__':
    async_obj = AsyncResult(id=id, app=celery_app)
    if async_obj.successful():
        result = async_obj.get()
        print(result)
    elif async_obj.failed():
        print('任务失败')
    elif async_obj.status == 'PENDING':
        print('任务等待中被执行')
    elif async_obj.status == 'RETRY':
        print('任务异常后正在重试')
    elif async_obj.status == 'STARTED':
        print('任务已经开始被执行')

3 celery包结构 - 在项目中使用celery

project
    ├── celery_task  	# celery包
    │   ├── __init__.py # 包文件
    │   ├── celery.py   # celery连接和配置相关文件,且名字必须交celery.py
    │   └── tasks.py    # 所有任务函数
    ├── add_task.py  	# 添加任务
    └── get_result.py   # 获取结果

3.1 在项目中使用celery操作步骤:

① 第1步:新建包 - 编写celery项目:创建、注册、配置

在celery项目的包结构中

  • celery.py中,注册任务和配置
from celery import Celery

# 先配置好
# 消息中间件broker:提交的异步任务,放在broker中
broker = 'redis://127.0.0.1:6379/3'
# 任务结果存储backend:执行完的结果,放在backend中
backend = 'redis://127.0.0.1:6379/4'

# 第1步:类实例化得到对象
# 不要忘记include
celery_app = Celery('test1', broker=broker, backend=backend,
                    include=['celery_project.order_task', 'celery_project.user_task'])

# 新建任务py文件

② 第2步:在包内部task.py中,编写异步任务

# 发送短信任务


from .celery import celery_app
import time


@celery_app.task
def send_sms(mobile, code):
    print("发送%s短信成功:验证码为%s" % (mobile, code))
    time.sleep(2)
    return True

③ 第3步:在celery项目的路径外,启动worker

/Luffy/luffy_api/script   celery -A celery_test worker -l info

④ 第4步:在其他项目中,提交任务

被提交到中间件中,等待worker执行,因为worker启动了,就会被worker执行

"提交celery任务"


from celery_project.user_task import send_sms


# 同步调用
# res = send_sms('13900000000','1234')
# print(res) # 发送13900000000短信成功:验证码为1234


# 异步调用
task_id = send_sms.delay('13900000000','1234')
print(task_id)
# 8d1aea99-1ba0-4248-a91f-786ac9ffa1c9

⑤ 第5步:worker执行完,结果存到backend

这里使用的是redis

image-20230312193155609

⑥ 第6步:在其他项目中,查看结果

# 查看celery的worker结果
from celery_project.celery import celery_app

from celery.result import AsyncResult

id = '8d1aea99-1ba0-4248-a91f-786ac9ffa1c9'
if __name__ == '__main__':
    async_obj = AsyncResult(id=id, app=celery_app)
    if async_obj.successful():
        result = async_obj.get()
        print(result)
    elif async_obj.failed():
        print('任务失败')
    elif async_obj.status == 'PENDING':
        print('任务等待中被执行')
    elif async_obj.status == 'RETRY':
        print('任务异常后正在重试')
    elif async_obj.status == 'STARTED':
        print('任务已经开始被执行')

4 异步任务、延迟任务、定时任务

4.1 异步任务

任务名.delay(args=[参数])

4.2 延迟任务

app.apply_async(args=[参数],eta=时间对象)
  • 演示
# 延时任务:延迟20秒
from datetime import datetime, timedelta

eta = datetime.utcnow() + timedelta(seconds=20)
# celery默认配置文件中使用的utc的时间
res2 = send_sms.apply_async(args=['222222', '8888'], eta=eta)

4.3 定时任务

每间隔一段时间,执行某个任务

定时任务需要启动beat

beat:【定时提交任务的进程:配置在app.conf.beat_schedule 中的任务

worker:执行任务

① 第一步:任务的定时配置

配置中可以配置多个

from celery.schedules import crontab
# 新建任务py文件
celery_app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
celery_app.conf.enable_utc = False
# celery的配置文件#####
# 任务的定时配置
celery_app.conf.beat_schedule = {
    'send_sms': {
        'task': 'celery_project.user_task.send_sms',  # 导包,一定从celery包开始导入
        # 'schedule': timedelta(seconds=3),  # 时间对象
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'schedule': crontab(hour=23, minute=55),  # 每天9点43
        'args': ('18888888', '6666'),
    },
}

② 第二步:启动beat

beat 是定时提交任务的进程,执行配置在app.conf.beat_schedule中的任务

celery -A celery_project beat -l info

③ 第三步:启动worker

celery -A celery_project worker -l info 

win:celery -A celery_task worker -l info -P eventlet

注意:

  • 启动命令的执行位置,如果是包结构一定要在包这层,也就是创建了celery包的上一层

  • include=['xxx'],路径是从包名下开始导的

image-20230313000248451

5 django中使用celery

使用定时任务,除了celery还可以使用别的:APSchedule第三方模块执行定时任务

5.1 使用步骤

① 第一步:将写好的包放在项目路径下

-luffy_api
    -celery_task #celery的包路径
       -luffy_api  #源代码路径

② 第二步:在使用提交一步任务的位置,导入使用即可

视图函数中使用,导入任务

任务.delay()  # 提交任务

③ 第三步:启动worker,如果有定时任务,启动beat

celery -A celery包 worker -l info

④ 第四步:等待任务被worker执行

⑤ 在视图函数中,查询任务执行的结果

5.2 案例:秒杀功能

异步操作做秒杀,提高并发量

① 前端逻辑:

1) 秒杀按钮,用户点击 发送ajax请求到后端

② 后端逻辑:

2) 视图函数---》提交秒杀任务---》借助于celery,提交到中间件中了

3) 当次秒杀的请求,就回去了,携带者任务id号在前端

4) 前端开启定时任务,每隔3s钟,带着任务,向后端发送请求,查看是否秒杀成功

5) 后端还可能出现的情况

​ I) 任务还在等待被执行----》返回给前端,前端继续每隔3s发送一次请求
​ II) 任务执行完了,秒杀成功了---》返回给前端,恭喜您秒杀成功--》关闭前端定时器
​ III) 任务执行完了,秒杀失败了---》返回给前端,秒杀失败--》关闭前端定时器

	-1 把咱们写的包,复制到项目目录下
    	-luffy_api
        	-celery_task #celery的包路径
            	celery.py  # 一定不要忘了一句话
                import os
                os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
            -luffy_api  #源代码路径
            
    -2 在使用提交异步任务的位置,导入使用即可
    	-视图函数中使用,导入任务
        -任务.delay()  # 提交任务
        
        
    -3 启动worker,如果有定时任务,启动beat
    
    -4 等待任务被worker执行
    
    -5 在视图函数中,查询任务执行的结果
    
    
    
 # 重点:celery中使用djagno,有时候,任务中会使用django的orm,缓存,表模型。。。。一定要加
	os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
  • 视图
#### 秒杀逻辑,CBV
from rest_framework.viewsets import ViewSet

from celery_task.order_task import sckill_task
from celery_task.celery import app
from celery.result import AsyncResult


class SckillView(ViewSet):
    @action(methods=['GET'], detail=False)
    def sckill(self, request):
        a = request.query_params.get('id')
        # 使用异步,提交一个秒杀任务
        res = sckill_task.delay(a)
        return APIResponse(task_id=res.id)

    @action(methods=['GET'], detail=False)
    def get_result(self, request):
        task_id = request.query_params.get('task_id')
        a = AsyncResult(id=task_id, app=app)
        if a.successful():
            result = a.get()
            if result:
                return APIResponse(msg='秒杀成功')
            else:
                return APIResponse(code=101, msg='秒杀失败')
        elif a.status == 'PENDING':
            print('任务等待中被执行')
            return APIResponse(code=666, msg='还在秒杀中')
  • 任务 order_task.py
# 秒杀任务
import random
import time


@app.task
def sckill_task(good_id):
    # 生成订单,减库存,都要在一个事务中
    print("商品%s:秒杀开始" % good_id)
    # 这个过程,可能是1,2,3s中的任意一个
    time.sleep(random.choice([6, 7, 9]))
    print('商品%s秒杀结束' % good_id)

    return random.choice([True, False])

  • 前端Sckill.vue
<template>
  <div>

    <button @click="handleSckill">秒杀</button>
  </div>
</template>

<script>
import Header from '@/components/Header';
import Banner from '@/components/Banner';
import Footer from '@/components/Footer';

export default {
  name: 'Sckill',
  data() {
    return {
      task_id: '',
      t: null
    }
  },
  methods: {
    handleSckill() {
      this.$axios.get(this.$settings.BASE_URL + '/user/sckill/sckill/?id=999').then(res => {
        this.task_id = res.data.task_id
        this.t = setInterval(() => {
          this.$axios.get(this.$settings.BASE_URL + '/user/sckill/get_result/?task_id=' + this.task_id).then(res => {
            if (res.data.code == 666) {
              //如果秒杀任务还没执行,定时任务继续执行
              console.log(res.data.msg)
            } else {
              // 秒杀结束,无论成功失败,这个定时任务都结束
              clearInterval(this.t)
              this.t = null
              this.$message(res.data.msg)
            }

          })
        }, 2000)
      }).catch(res => {

      })
    }
  }

}
</script>

6 轮播图接口加缓存

# 1 网站首页被访问的频率很高,瞬间 1w个人在访问,首页的轮播图接口会执行1w次,1w次查询轮播图标的sql在执行,轮播图基本不变
# 2 想一种方式,让这1w个访问,效率更高一些,不查数据库了,直接走缓存--》redis--》效率高

# 3 现在的逻辑变成了
	1 轮播图接口请求来了,先去缓存中看,如果有,直接返回
    2 如果没有,查数据库,然后把轮播图数据,放到redis中,缓存起来
    
    
    
    
# 改接口
    # 加入缓存的轮播图接口
    def list(self, request, *args, **kwargs):
        # 查看缓存有没有数据,如果没有,再走数据库
        banner_list = cache.get('banner_list')
        if banner_list:
            print('走了缓存')
            return APIResponse(data=banner_list)
        else:  # 走数据库
            print('走了数据库')
            res = super().list(request, *args, **kwargs)
            # 把序列化后的数据存到缓存中,redis中
            cache.set('banner_list', res.data)
            return APIResponse(data=res.data)

7 双写不一致

加入缓存后,如果缓存中有数据,先去缓存中拿。

但是如果mysql中的数据修改了,缓存中的数据则不会修改,则数据不一致

双写一致性:****写入mysql,redis没动,数据不一致存在问题

7.1 解决方式

① 修改数据,删除缓存

② 修改数据,更新缓存

③ 定时更新缓存:实时性很差

7.2 轮播图定时更新

# 第一步:在celery配置定时任务
app.conf.beat_schedule = {
    'update_banner': {
        'task': 'celery_task.home_task.update_banner',
        'schedule': timedelta(seconds=3),  # 时间对象
    },
}

# 第二步:启动worker,启动beat


# update_banner任务的代码
from home.models import Banner
from home.serializer import BannerSerializer
from django.core.cache import cache
from django.conf import settings
@app.task
def update_banner():
    # 只要这个任务一执行,就更新轮播图的缓存
    banners = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')
    ser = BannerSerializer(instance=banners, many=True)
    for item in ser.data:
        item['image'] = settings.BACKEND_URL + item['image']

    cache.set('banner_list', ser.data)  # 会出问题,轮播图地址显示不全
    return True

标签:celery,task,队列,app,worker,Celery,任务,import,分布式
From: https://www.cnblogs.com/DuoDuosg/p/17371844.html

相关文章

  • 分布式有状态服务的调度技术预研报告
    1.研究项目背景平台版本建设中,为了充分发挥视频分析引擎性能,需要针对业务特点,现有的分布式调用方式无法满足需求,需要研究分布式服务的有状态调用实现。2.技术现状分析2.1分布式有状态服务调度技术的发展历程固定分配最初的分布式有状态服务调度技术采用固定分配的方式,......
  • 《分布式任务调度平台XXL-JOB》
    文档地址https://www.xuxueli.com/xxl-job/中文文档EnglishDocumentation源码仓库地址源码仓库地址ReleaseDownloadhttps://github.com/xuxueli/xxl-jobDownloadhttp://gitee.com/xuxueli0323/xxl-jobDownload中央仓库地址<!--http://repo1.maven.org/......
  • Flask中使用Celery教程
    不管是使用什么编程语言,使用什么框架。在服务器上执行耗时操作,比如网络请求、视频转码、图片处理等。如果想实现快速响应客户端的需求,则必须使用任务队列。任务队列是一个单独的程序,和网站没有直接关系,任务队列提供了接口,能在网站中通过代码操作任务队列,比如:添加任务,查看任务结......
  • 消息队列
    sys/msg.h#include<sys/msg.h>intmain(void){//创建消息队列//通过key创建或获取消息队列返回消息队列ID失败返回-1/**msgget创建或获取消息队列*key:ftok函数返回的key*msgflg标志位置*0-获取不......
  • 云原生技术实践营「微服务X消息队列专场」
    微服务和消息队列都是当前比较流行的架构模式,可以帮助开发者在实际业务中解决大型复杂分布式系统面临的各种挑战:微服务架构是一种云原生架构方法,目的是提高系统的扩展性、可靠性和灵活性,它提倡将单一的应用程序划分成一组小的服务,服务之间互相协调、互相配合,每个服务运行在其独立......
  • 分布式kv存储系统之etcd集群管理
    etcd简介etcd是CoreOS团队于2013年6月发起的开源项目,它的目标是构建一个高可用的分布式键值(key-value)数据库。etcd内部采用raft协议作为一致性算法,etcd基于Go语言实现。 官方网站:https://etcd.io/;github地址:https://github.com/etcd-io/etcd;官方硬件......
  • 分布式锁
    一、什么是分布式锁?为什么需要分布式锁锁,是用来保证线程或进程同步的工具,用于控制对共享资源的访问。分布式锁也是锁的一种。普通的锁(例如Java中的Synchronized和ReentrantLock)无法用在多个进程中,此时就需要分布式锁来控制分布式系统对共享资源的访问。在Java开发的分布式系统......
  • 终于有人把openGauss3.0.0分布式原理讲透了,openGauss X ShardingSphere分布式原理和部
    本文为原理精讲,部署文章链接如下https://www.cnblogs.com/opengauss/p/17364285.html一、opengauss的背景和行业现状2022年,七大openGauss商业版发布,是基于openGauss3.0推出商业发行版目前海量数据库Vastbase表现最佳,一直是TOP1作者认为之所以海量数据库Vastbase......
  • 分布式共识如何工作?
    英文原文链接:https://medium.com/s/story/lets-take-a-crack-at-understanding-distributed-consensus-dad23d0dc95HowDoesDistributedConsensusWork?目录HowDoesDistributedConsensusWork?什么是分布式系统?分布式系统的属性在分布式系统中达成共识意味着什么区块链技......
  • 分布式事务
    分布式理论CAP理论在一个分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partitiontolerance),这三个要素最多只能同时实现两点,不可能三者兼顾。由于P(分区容错)是必选项,所以只能在AP或者CP中选择。一致性(Consistency):在分布式系统中的所有数据备份,在同一时刻是......