首页 > 其他分享 >celery中使用ORM

celery中使用ORM

时间:2024-03-25 22:29:05浏览次数:22  
标签:task get py celery ORM user 使用 import

前言

celery 作为python中非常受欢迎的异步消息队列,而项目中操作数据库使用较多的也是ORM的框架,Tortoise也是使用较多的异步的ORM框架,当celery+Tortoise能碰撞成什么火花呢。

1.安装依赖

pip install celery
pip install tortoise-orm 
pip install aerich
pip install aiomysql
pip install redis
pip install gevent

2. 实现代码

文件结构

│ application.py
│ config.py
│ models.py
│ public_task.py
└─celery_task
|  user_task.py

application.py

import asyncio
from celery import Celery
from tortoise import Tortoise
import config
​
app = Celery('celery_tortoise', include=["celery_task.user_task"])
​
app.config_from_object("config")
​
​
async def init_db():
    await Tortoise.init(
        config=config.TORTOISE_ORM
    )
​
# 数据库操作使用当前的事件循环
asyncio.run(init_db())
​
"""
1.启动worker任务
celery -A  application worker -l info -P gevent
2.启动beat 任务
celery -A application beat
"""
​
​

config.py

from datetime import timedelta
​
​
broker_url = 'redis://192.168.1.200:6379/4'     # 使用Redis作为消息代理
​
result_backend = 'redis://192.168.1.200:6379/5'     # 把任务结果存在了Redis
​
task_serializer = 'msgpack'     # 任务序列化和反序列化使用msgpack方案
​
result_serializer = 'json'  # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
​
result_expires = 60 * 60 * 24   # 任务过期时间
​
accept_content = ['json', 'msgpack']    # 指定接受的内容类型
​
​
# 使用beat进程自动生成任务
​
beat_schedule = {
​
    'get_users': {
        'task': 'celery_task.user_task.beat_task',
        'schedule': timedelta(seconds=10),
        'args': ()
​
    },
}
​
TORTOISE_ORM = {
    'connections': {
        'default': {
            'engine': 'tortoise.backends.mysql',
            'credentials': {
                'host': '192.168.1.200',
                'port': '3306',
                'user': 'root',
                'password': '123456',
                'database': 'celery',
                'minsize': 1,
                'maxsize': 5,
                'charset': 'utf8mb4',
                "echo": True
            }
        },
    },
    'apps': {
        'models': {
            'models': ['models', "aerich.models"],
            'default_connection': 'default',
​
        }
    },
    'use_tz': False,
    'timezone': 'Asia/Shanghai'
}
​
​
​
​

models.py

from tortoise.models import Model
from tortoise import fields
​
​
class User(Model):
    """用户表"""
    id = fields.IntField(pk=True, description="用户id")
    username = fields.CharField(max_length=255, description="姓名")
    pwd = fields.CharField(max_length=255, description="密码")
​
    class Meta:
        table = "user"  # 数据库中的表名称
        table_description = '用户表'
​
​

public_task.py

from celery_task.user_task import public_task
​
​
result = public_task.delay(2)
​
print(result)
​

user_task.py

import asyncio
from application import app
from models import User
​
# 处理任务需要开启一个新的事件循环
event_loop = asyncio.new_event_loop()
​
​
async def get_users():
    user = await User.all().first()
    print(f"get_users查询到用户: {user.username}", )
​
​
async def get_user(user_id):
    user = await User.get(id=user_id)
    print(f"get_user查询到用户:{user.username}")
​
​
@app.task
def beat_task():
    event_loop.run_until_complete(get_users())
​
​
@app.task
def public_task(user_id):
    event_loop.run_until_complete(get_user(user_id))

注意: 异步async中当前代码中一共用到了两个事件循环,操作数据库tortoise使用一个事件循环,而celery处理异步的任务是用到了一个新的事件循环,两个不同的处理过程需要两个不同的事件循环。

3.初始化数据库

# 1.生成配置文件
aerich init -t config.TORTOISE_ORM
# 2.在数据库中创建celery的库
# 3.迁移数据库, 会在数据库中生成user表
aerich init-db
# 数据库插入数据
insert into celery.`user`(username, pwd) values("test1", "123456"),("test2", "123456"),("test2", "123456")

4. 启动

启动定时任务

celery -A application beat

启动work

celery -A  application worker -l info -P gevent

发布任务

python public_task.py

执行的结果

标签:task,get,py,celery,ORM,user,使用,import
From: https://blog.csdn.net/weixin_43413871/article/details/136999546

相关文章

  • 学会在 C++ 中使用变量:从定义到实践
    C++变量变量是用于存储数据值的容器。在C++中,有不同类型的变量(使用不同的关键字定义),例如:int-存储整数(没有小数点),例如123或-123double-存储浮点数,带有小数点,例如19.99或-19.99char-存储单个字符,例如'a'或'B'。字符值用单引号括起来string-存储文本,例如......
  • 使用libvirt的端口转发,实现虚拟机跟外界互通
    参考Dynamicport-forwardingforNAT-edkvm/libvirtnetworksForwardingIncomingConnections在/etc/libvirt/hooks添加qemu脚本,模板如下,需要根据自己的配置进行修改:#!/bin/bash#usedsomefromadvancedscripttohavemultipleports:useanequalnumberofguest......
  • 灰色预测模型以及matlab软件使用
    1,灰色系统简介著名学者邓聚龙教授于20世纪70年代末、80年代初提出:“The诞生标志:邓教授第一篇灰色系统论文ControlProblemsofGreySystems”,发表于北荷兰出版公司期刊System&ControlLetter,1982,No.5.1.1灰色系统,白色系统,黑色系统白色系统是指一个系统的内部特征......
  • 2-12. 使用 UI Toolkit 和 UI Builder 制作物品编辑器
    Editor在Assets目录下创建Editor目录,然后在Editor目录下创建UIBuilder然后在UIBuilder下面右键创建UIToolkitEditor,起名叫ItemEditor然后就会打开一个ItemEditor窗口这个窗口也可以通过Window->UIToolkit->ItemEditor进行打开打开UIBuilder下......
  • ts里面的 ! 和 ? 还有 as以及各种符号的意义以及使用
    ts里面的!和?还有as以及各种符号的意义以及使用 https://www.jianshu.com/p/dd304d5cb3dc类型断言---as把一个大的范围断言成小的、精确的范围typeMethod='GET'|'POST'functiondd(url:string,method:Method){console.log('lll');};letoption=......
  • C++调用C# (使用C++/CLI)
    简介C++/CLI简介见上文 C#调用C++(使用C++/CLI)-咸鱼翻身?-博客园(cnblogs.com)新建解决方案->CPlusCSharpSolution新建C#类库->CSharpNative新建类NativeCalculate1publicclassNativeCalculate2{3publicintAdd(inta,intb)4{......
  • C#调用C++ (使用C++/CLI)
    简介C++/CLI(C++CommonLanguageInfrastructure)是一种允许在.NET平台上创建托管代码(managedcode)和非托管代码(unmanagedcode)之间互操作的语言。其原理可以概括如下:编译器支持:C++/CLI编译器能够编译同时包含托管和非托管代码的源文件。这使得在同一个项目中可以混合使用......
  • Typora安装及使用
    【一】下载Typora软件【1】下载官网https://typoraio.cn/系统重装会失效【2】破解方法(1)破解步骤!C:\ProgramFiles\Typora\resources中替换app.asar文件点击安装包一直下一步使用key.txt激活密钥【3】使用方法及快捷键(1)标题语法及快捷键最多6级标题代码方......
  • Pytorch之transforms
    文章目录一、transforms是什么?二、Tensor1.为何引入tensor类型的变量呢2.代码示例二、Normalize类三、Resize三、Compose三、Random_Crop一、transforms是什么?transforms对应transform.py文件。可以理解成一个工具箱,里面定义的各种类和方法,可以处理图片,对图片进......
  • Typora安装及使用方式
    【一】下载Typora软件【1】下载官网https://typoraio.cn/系统重装会失效【2】破解方法(1)破解步骤!C:\ProgramFiles\Typora\resources中替换app.asar文件点击安装包一直下一步使用key.txt激活密钥【3】使用方法及快捷键(1)标题语法及快捷键最多6级标题代码方......