前言
celery 作为python中非常受欢迎的异步消息队列,而项目中操作数据库使用较多的也是ORM的框架,Tortoise也是使用较多的异步的ORM框架,当celery+Tortoise能碰撞成什么火花呢。
1.安装依赖
pip install celery
pip install tortoise-orm
pip install aerich
pip install aiomysql
pip install redis
pip install gevent
2. 实现代码
文件结构
│ application.py │ config.py │ models.py │ public_task.py └─celery_task | user_task.py
application.py
import asyncio
from celery import Celery
from tortoise import Tortoise
import config
app = Celery('celery_tortoise', include=["celery_task.user_task"])
app.config_from_object("config")
async def init_db():
await Tortoise.init(
config=config.TORTOISE_ORM
)
# 数据库操作使用当前的事件循环
asyncio.run(init_db())
"""
1.启动worker任务
celery -A application worker -l info -P gevent
2.启动beat 任务
celery -A application beat
"""
config.py
from datetime import timedelta
broker_url = 'redis://192.168.1.200:6379/4' # 使用Redis作为消息代理
result_backend = 'redis://192.168.1.200:6379/5' # 把任务结果存在了Redis
task_serializer = 'msgpack' # 任务序列化和反序列化使用msgpack方案
result_serializer = 'json' # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
result_expires = 60 * 60 * 24 # 任务过期时间
accept_content = ['json', 'msgpack'] # 指定接受的内容类型
# 使用beat进程自动生成任务
beat_schedule = {
'get_users': {
'task': 'celery_task.user_task.beat_task',
'schedule': timedelta(seconds=10),
'args': ()
},
}
TORTOISE_ORM = {
'connections': {
'default': {
'engine': 'tortoise.backends.mysql',
'credentials': {
'host': '192.168.1.200',
'port': '3306',
'user': 'root',
'password': '123456',
'database': 'celery',
'minsize': 1,
'maxsize': 5,
'charset': 'utf8mb4',
"echo": True
}
},
},
'apps': {
'models': {
'models': ['models', "aerich.models"],
'default_connection': 'default',
}
},
'use_tz': False,
'timezone': 'Asia/Shanghai'
}
models.py
from tortoise.models import Model
from tortoise import fields
class User(Model):
"""用户表"""
id = fields.IntField(pk=True, description="用户id")
username = fields.CharField(max_length=255, description="姓名")
pwd = fields.CharField(max_length=255, description="密码")
class Meta:
table = "user" # 数据库中的表名称
table_description = '用户表'
public_task.py
from celery_task.user_task import public_task
result = public_task.delay(2)
print(result)
user_task.py
import asyncio
from application import app
from models import User
# 处理任务需要开启一个新的事件循环
event_loop = asyncio.new_event_loop()
async def get_users():
user = await User.all().first()
print(f"get_users查询到用户: {user.username}", )
async def get_user(user_id):
user = await User.get(id=user_id)
print(f"get_user查询到用户:{user.username}")
@app.task
def beat_task():
event_loop.run_until_complete(get_users())
@app.task
def public_task(user_id):
event_loop.run_until_complete(get_user(user_id))
注意: 异步async中当前代码中一共用到了两个事件循环,操作数据库tortoise使用一个事件循环,而celery处理异步的任务是用到了一个新的事件循环,两个不同的处理过程需要两个不同的事件循环。
3.初始化数据库
# 1.生成配置文件
aerich init -t config.TORTOISE_ORM
# 2.在数据库中创建celery的库
# 3.迁移数据库, 会在数据库中生成user表
aerich init-db
# 数据库插入数据
insert into celery.`user`(username, pwd) values("test1", "123456"),("test2", "123456"),("test2", "123456")
4. 启动
启动定时任务
celery -A application beat
启动work
celery -A application worker -l info -P gevent
发布任务
python public_task.py
执行的结果
标签:task,get,py,celery,ORM,user,使用,import From: https://blog.csdn.net/weixin_43413871/article/details/136999546