首页 > 其他分享 >6.29 celery分布式异步任务框架

6.29 celery分布式异步任务框架

时间:2023-07-05 09:35:56浏览次数:35  
标签:异步 task app celery 任务 print import 6.29

1.celery:分步式异步任务框架 

/1  异步任务

/2  延迟任务

/3  定时任务

/4  celery架构

  消息中间件(broker):消息队列:可以使用redis,rabbitmq
  任务执行单元(worker):执行单元 执行提交的任务
  任务执行结果存储(banckend):可以使用mysql,redis

/5 安装celery模块:

  cmd 中输入 -pip install Celery

  celery不支持win,所以想再win上运行,需要额外安装eventlet

/6  安装eventlet : pip3 install eventlet

   补充操作:退出虚拟环境  deactivate

          卸载安装的模块:pip3 uninstall celery

2.celery执行异步任务 

/1  新建py文件,注册celery任务

  # 导入Celery类
  from celery import Celery
  broker = 'redis://127.0.0.1:6379/1' # 消息中间件 redis
  backend = 'redis://127.0.0.1:6379/2' # 结果存储 redis
  # 实例化得出对象
  app = Celery(__name__,broker=broker, backend=backend,)

  @app.task # 加上这个装饰器,让add函数变成了celery的任务
  def add(a, b):
    c = a + b
    return c

/2  新建文件,在文件中写入异步调用操作 

  # 1 异步调用 : 执行下面的函数,把任务提交消息队列中,并没有执行,等待work的执行
  # 返回给我们一个id号,我们之后通过id号来查结果
  res = add.delay(2,3)
  print(res) # 得到了一个ID号49e33190-3499-48a3-968b-5f1f59888011

/3  在文科路径下启动worker 

  cmd中进入到提交消息队列代码所在的路径中

  celery -A demo worker -l info -P eventlet

  启动worker和提交消息并没有先后顺序

/4  新建一个 文件来执行结果储存

  # 执行结果存储
  from demo import app
  # 从celery的包下
  from celery.result import AsyncResult

  id = '49e33190-3499-48a3-968b-5f1f59888011'
  if __name__ == '__main__':
  a = AsyncResult(id=id, app=app)
  if a.successful(): # 正常执行完成
  result = a.get() # 任务返回的结果
  print(result)
  elif a.failed():
  print('任务失败')
  elif a.status == 'PENDING':
  print('任务等待中被执行')
  elif a.status == 'RETRY':
  print('任务异常后正在重试')
  elif a.status == 'STARTED':
  print('任务已经开始被执行')

3.celery的包结构使用 

/1  创建包,在里面创建celery.py(名字固定)

  from celery import Celery
  broker = 'redis://127.0.0.1:6379/1' # 消息中间件 redis
  backend = 'redis://127.0.0.1:6379/2' # 结果存储 redis

  # 在括号中添加include参数,include的中括号中写入需要被app管理的文件,

  #  这样写可以免去在上面加装饰器进行管理
  app = Celery(__name__,broker=broker, backend=backend,
  include=['celery_task.user_task','celery_task.home_task','celery_task.course_task'])

/2 在包中新建任务文件

  例如user_task 

  # 新建任务文件
  # 需要在celery前面加点,不加点就表示从第三方包导入app
  # celery前面加点表示从当前路径下导入
  from .celery import app
  import time

  @app.task
  def send_sms(mobile,code):
  time.sleep(3)
  print('成功发送短信给%s,验证码为%s' % (mobile,code))
  return True

/3  在包的外边创建提交任务的代码 

  # 提交任务
  from celery_task.user_task import send_sms
  res = send_sms.delay(88888888,77777)
  print(res)

/4  启动worker

  启动worker
  要在包所在的路径下启动
  celery -A 包名 worker -l info -P eventlet    

/5  查看结果  

  # 查询结果
  from celery_task.celery import app
  # 从celery的包下
  from celery.result import AsyncResult

  id = '190ffcc5-9cb5-4781-978f-9d1949c68d86'
  if __name__ == '__main__':
  a = AsyncResult(id=id, app=app)
  if a.successful(): # 正常执行完成
  result = a.get() # 任务返回的结果
  print(result)
  elif a.failed():
  print('任务失败')
  elif a.status == 'PENDING':
  print('任务等待中被执行')
  elif a.status == 'RETRY':
  print('任务异常后正在重试')
  elif a.status == 'STARTED':
  print('任务已经开始被执行')

4.celery执行延时任务和定时任务

/1  定义:

  延时任务:等待一定的时间再执行任务

  定时任务:规定例如每天几点执行一次

/2  延时任务

  先导入异步任务的函数

  创建时间对象

  例:eta=datetime.utcnow() + timedelta(seconds=5)   #  datetime.utcnow() 拿取的是UTC时间(是个对象),timedelta(seconds=5)(变为对象)可以和上面拿到的UTC时间相加

  # 使用apply_async传参数加上时间对象
  res = send_sms.apply_async(kwargs={'mobile':88888888,'code':77777},eta=eta)

/3 定时任务 (写在celery文件中)

  # 配置时区
  app.conf.timezone = 'Asia/Shanghai'
  # 是否使用UTC
  app.conf.enable_utc = False  

  # 定时任务的配置
  from datetime import timedelta
  from celery.schedules import crontab
  app.conf.beat_schedule = {
    'send_sms': {
    'task': 'celery_task.user_task.send_sms',
    'schedule': timedelta(seconds=5),
    'args': ('99999999', 8888),
    },
    'obtain_app': {
    'task': 'celery_task.home_task.obtain_app',
    # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
    'schedule': crontab(hour=11, minute=38), # 每天11点35,执行
    'args': (),
    }
  }

5.django中使用celery  

/1  首先把建立的celery模块的包文件夹移动到根路径下

/2  增加新的任务文件banner_task,并添加新任务

  from .celery import app

  @app.task
  def add_banner():
    from home.models import Banner
    Banner.objects.create(title='测试', image='/1.png', link='/banner', info='xxx',orders=99)
    return 'banner增加成功'

/3  在celery.py 中加载django配置

  # 注:只有在需要用到django中的

  import os
  os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffy_01.settings.dev")

/4  视图函数中添加接口,导入任务,提交即可 

  class CeleryView(APIView):
    def get(self, request):
    res = add_banner.delay()
    return APIResponse(msg='新增banner的任务已经提交了')

/5 启动worker

  在根路径下启动就行

6.接口的缓存 

/1  双写一致性:

  缓存数据和数据库的数据不一致了

  写入数据库,删除缓存

  写入数据库,更新缓存

/2  在轮播图接口下添加方法 

  def list(self, request, *args, **kwargs):
  #  接口中接入缓存的逻辑
  1 先去缓存中查一下有没有数据
  2 如果有,直接返回,不走父类的list了(list在走数据库)
  3 如果没有,走父类list,查询数据库
  4 把返回的数据,放到缓存中

  data = cache.get('home_banner_list')
  if not data: # 缓存中没有
  print('走了数据库')
  res = super().list(request, *args, **kwargs) # 查询数据库
  # 返回的数据,放到缓存中
  data = res.data.get('data') # {code:100,msg:成功,data:[{},{}]}
  cache.set('home_banner_list', data)
  return APIResponse(data=data)  

标签:异步,task,app,celery,任务,print,import,6.29
From: https://www.cnblogs.com/abc683871/p/17514456.html

相关文章

  • 接口缓存、定时更新、异步发送短信
    目录一、接口缓存二、双写一致性之定时更新celery的定时任务代码celery_task/home_task.pycelery_task/celery.py启动worker、beta三、异步发送短信步骤视图函数user/views.py任务celery_task/user_task.py四、异步秒杀逻辑前后端4.1前端Sckill.vue4.2后端视图类路由任务:celery......
  • 012双写一致性之定时更新,异步发送短信,异步秒杀逻辑前后端,课程页面前端,课程相关表分析,
    0双写一致性之定时更新#一旦加入缓存,就会出现数据不一致的请请求#双写一致性问题 -1改数据,删缓存-2改数据,改缓存-3定时更新#首页轮播图存在双写一致性问题这个问题 -以现在的技术水平(信号),做不到:改数据删缓存 -能选择的就是定时更新 -轮播......
  • 多任务异步协程实现
    1.未实现异步操作代码: 2.实现异步操作代码: ......
  • 异步编程
    1.greenlet实现协程过程详细: 2.yield关键字实现协程for循环流程: 3.asyncio模块实现协程操作: 4.async&await关键字实现协程: ......
  • [GPT] 网页中某些dom内容是通过 js 数据异步渲染的,nodejs 怎么获取网页解析这些数据
     要处理使用JavaScript异步渲染内容的网页,您可以在JavaScript蜘蛛中使用Puppeter或Playwright等无头浏览器来获取网页,然后与动态渲染的内容进行交互。 下面是一个使用Puppeteer的例子:constpuppeteer=require('puppeteer');(async()=>{//Launchaheadles......
  • 2023年暑假集训总结/6.29
    6-27T1有毒爱排列有毒让你求长度为n且逆序对个数对p取余为k的排列的个数,答案对998244353取模。考试时我考虑到设fi,j表示放了数1∼i,此时逆序对个数modp=j的排列个数。转移显然,枚举i+1放到哪个位置即可,时间复杂度O(n^2p)。得了60分,而后通过观察性......
  • 异步爬虫之线程池案例应用
    1.爬取梨视频缓存本地视频_1: 2.爬取梨视频缓存本地视频_2: 3.下载至本地视频:  ......
  • 文章发布审核——同步调用与异步调用
    同步:就是在发出一个调用时,在没有得到结果之前,该调用就不返回(实时处理)异步:调用在发出之后,这个调用就直接返回了,没有返回结果(分时处理)发布文章——》异步调用——》审核文章异步线程的方式审核文章 SpringBoot集成异步线程调用:①:在自动审核的方法上加上@Async注解(标明要异步......
  • 异步爬虫之线程池的基本使用
    1.串行方式执行程序记录: 2.串行使用8秒,使用线程池用了2秒: ......
  • 异步
    Promise所谓Promise,简单说就是一个容器,里面保存着某个未来才会结束的事件(通常是一个异步操作)的结果。从语法上说,Promise是一个对象,从它可以获取异步操作的消息。Promise提供统一的API,各种异步操作都可以用同样的方法进行处理。promise对象的状态不受外界的影响.有三种状态:p......