Flask 的异步用法案例。
案例 1:异步视图
异步视图允许你使用 async def
定义路由处理函数,这在处理 I/O 密集型任务时非常有用。
from flask import Flask
import asyncio
app = Flask(__name__)
@app.route('/async-data')
async def get_async_data():
# 模拟异步操作,例如从外部API获取数据
await asyncio.sleep(2) # 模拟网络延迟
return {'data': 'This is async data'}
案例 2:异步数据库操作
使用异步数据库库,如 aiosqlite
,来执行数据库查询,这可以提高应用的响应性能。
from flask import Flask, jsonify
import aiosqlite
app = Flask(__name__)
DATABASE = 'your_database.db'
async def get_db():
db = await aiosqlite.connect(DATABASE)
return db
@app.route('/data')
async def get_data():
async with get_db() as db:
cursor = await db.execute('SELECT * FROM your_table')
data = await cursor.fetchall()
return jsonify(data)
案例 3:异步任务队列
使用 Celery
与 Flask
结合,可以创建和管理后台任务,如发送异步邮件。
from flask import Flask
from celery import Celery
app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
@celery.task
def send_email(to, subject, body):
# 这里实现发送邮件的逻辑
print(f'Email sent to {to}')
@app.route('/send-email')
def schedule_email():
send_email.delay('[email protected]', 'Hello', 'This is an async email.')
return 'Email scheduled to be sent.'
案例 4:异步错误处理
使用异步错误处理器来异步地处理错误,例如,记录错误日志到外部服务。
from flask import Flask
app = Flask(__name__)
@app.errorhandler(404)
async def page_not_found(e):
# 可以在这里执行异步操作,例如记录日志
await log_error_to_service(str(e))
return "Page not found", 404
async def log_error_to_service(error_message):
# 模拟异步日志记录
await asyncio.sleep(1)
print(f'Logged error: {error_message}')
标签:__,异步,Flask,app,用法,async,def
From: https://www.cnblogs.com/full-stack-linux-new/p/18250541