首页 > 其他分享 >Flask项目配置Celery

Flask项目配置Celery

时间:2024-06-06 14:43:56浏览次数:19  
标签:celery task Flask app 配置 Celery path distribution os

 service_task/async_tasks/celery_app.py

import os
import sys
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
env = os.getenv("env", "dev")
path = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
sys.path.insert(0, path)
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
from flask import Flask
from celery import Celery
from service_task.create_app_depend import app_conf
print("-----------------------------service_task---当前环境-----------------------------")
print(env)
print("-----------------------------service_task---当前环境-----------------------------")

cache_config = app_conf.env_data['RedisCache']

broker = "redis://:{password}@{host}:{port}/0".format(password=cache_config['Password'], host=cache_config['Host'],
                                                      port=cache_config['Port'])
backend = "redis://:{password}@{host}:{port}/1".format(password=cache_config['Password'], host=cache_config['Host'],
                                                      port=cache_config['Port'])

def make_celery(app):
    celery_app = Celery(
        main="task_notify",
        backend=backend,
        broker=broker,

    )
    celery_app.autodiscover_tasks(['async_tasks.task_relate'])
    celery_app.conf.task_serializer = "json"
    celery_app.conf.result_serializer = "json"
    celery_app.conf.accept_content = ["json"]
    celery_app.conf.broker_transport_options = {'visibility_timeout': 3600, 'max_retries': 0}
    celery_app.conf.timezone = "Asia/Shanghai"  # 时区
    celery_app.conf.enable_utc = False  # 是否使用UTC
    # celery_app.conf.update(app.config)
    class ContextTask(celery_app.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_app.Task = ContextTask
    return celery_app

falsk_app = Flask(__name__)
falsk_app.config.from_object(app_conf.config)
from service_task.create_app_depend.sql_db import db
# 连接数据库
db.init_app(falsk_app)
celery_app = make_celery(falsk_app)

 service_task/async_tasks/task_relate.py

from service_task.async_tasks.celery_app import celery_app
from service_task.apps.issued_task_school.bll.payment_task_distribution_celery_bll import payment_task_distribution_celery_bll


# 学校缴费任务派发
@celery_app.task(name="school_payment_task_distribution", queue="school_task")
def school_payment_task_distribution(data):
    payment_task_distribution_celery_bll.school_payment_task_distribution_process(data)


# 机构缴费任务派发
@celery_app.task(name="institution_payment_task_distribution", queue="school_task")
def institution_payment_task_distribution(data):
    payment_task_distribution_celery_bll.institution_payment_task_distribution_process(data)


# 再次提醒
@celery_app.task(name="again_distribution_task", queue="school_task")
def again_distribution_task(data):
    payment_task_distribution_celery_bll.again_distribution_task(data)

supervisor/celery配置

 启动命令

 

标签:celery,task,Flask,app,配置,Celery,path,distribution,os
From: https://www.cnblogs.com/Gaohx/p/18235131

相关文章

  • 电脑局域网内让其他电脑通过IP访问配置
    依次点击桌面左下角“开始菜单”>“所有应用”>“Windows系统”>“控制面板”,如图所示 在控制面板界面,选择“查看方式”为“大图标”,然后点击打开window防火墙,如图所示 然后点击“高级设置”,如图所示  在高级安全Windows防火墙界面,右侧点击Windows防火墙“属性”,如......
  • nginx 多个域名使用同一个通配符证书,并使用 include 指令来避免在多个 server 块中重
    1.创建SSL配置文件首先,创建一个包含SSL配置的文件,例如ssl_params.conf:#ssl_params.confssl_certificate/path/to/your/wildcard_certificate.pem;ssl_certificate_key/path/to/your/wildcard_private_key.key;ssl_session_timeout5m;ssl_ciphersECDHE-......
  • Spring运维之boot项目打包jar和插件运行并且设置启动时临时属性和自定义配置文件
    打包与运行window版本上制作我们要把idea中的程序抽取出来作为一个独立的jar包把jar包放到服务器上服务器是长期运行的我们就能随时访问了在Maven的生命周期中选择package打包功能在资源管理器中查找java-jar文件名.jarjava-jar文件名.jar但是我们在测试里面......
  • 普鲁米预览失败:出现 1 个错误:* 配置 Terraform AWS 提供程序:未找到 Terraform AWS 提
    我遇到了这样一个问题:当我尝试运行命令将资源导入现有的pulumi堆栈时,预览总是失败错误如下类型名称计划信息pulumi:pulumi:堆栈dev1错误-aws:ebs:Volumeidimport1错误诊断:pulumi:pulumi:Stack(dev):error:预览失败:出现1个错误:*配置TerraformAWS提......
  • 简单实现Viper配置管理
    本文由ChatMoney团队出品简介前面实现的一个简易suno-api。是使用cookie来获取suno-token发起请求的。当时并没有通过配置的方式来获取cookie,而是直接在代码中写死了cookie的值,这种做法并不好,所以现在打算把cookie值改造为一个配置,通过viper来读取。什么是viperViper是一......
  • Laravel Sail别名配置
    LaravelSail是Laravel的官方开发环境,它提供了一种轻松的方式来运行Laravel应用。开发推荐使用Sail环境。基于Docker又无需学习Docker。aliassail='sh$([-fsail]&&echosail||echovendor/bin/sail)'alias:这是一个shell命令,它可以用来为一个命令创建一个别......
  • mybatis逆向工程generatorConfiguration配置
      <?xmlversion="1.0"encoding="UTF-8"?><!DOCTYPEgeneratorConfigurationPUBLIC"-//mybatis.org//DTDMyBatisGeneratorConfiguration1.0//EN""http://mybatis.org/dtd/mybatis-generator-config_1_0.dtd"&......
  • k8s配置节点亲和性yaml示例:根据节点名称来配置节点亲和性(node affinity)
    在Kubernetes中,根据节点名称来配置节点亲和性(nodeaffinity)通常不是直接通过节点名称实现的,而是通过为节点添加特定的标签,然后在Pod的亲和性规则中匹配这些标签。不过,有一种特殊情况是使用NodeAffinity的nodeSelectorTerms中的matchExpressions,通过设置operator为In并使用......
  • Python Flask实现蓝图Blueprint配置和模块渲染
     Python基础学习:Pyhton语法基础Python变量Python控制流Python函数与类PythonException处理Python文件操作Python日期与时间PythonSocket的使用......
  • 新手上路:Linux虚拟机创建与Hadoop集群配置指南①(未完)
    一、基础阶段Linux操作系统:创建虚拟机1.创建虚拟机打开VM,点击文件,新建虚拟机,点击自定义,下一步下一步这里可以选择安装程序光盘映像文件,我选择稍后安装选择linux系统位置不选C盘,创建一个新的文件夹VM来放置虚拟机,将虚拟机名字改为master方便后续识别(也可以改为其他......