pip install vibora tortoise-orm asyncpg pydantic python-jose
"""ORM model and derived Pydantic schema for application users."""
from tortoise.contrib.pydantic import pydantic_model_creator
from tortoise import fields, models


class User(models.Model):
    """Tortoise ORM model describing one user record.

    Both ``username`` and ``email`` are unique text columns limited to
    255 characters; ``id`` is the integer primary key.
    """

    id = fields.IntField(pk=True)
    username = fields.CharField(unique=True, max_length=255)
    email = fields.CharField(unique=True, max_length=255)


# Pydantic model generated from ``User`` so request handlers can use it.
UserPydantic = pydantic_model_creator(User, name="User")
from tortoise import Tortoise


async def init_db():
    """Initialise Tortoise ORM against PostgreSQL and create missing tables.

    The connection settings are hard-coded below. The ``engine`` value is
    the dotted path of the Tortoise backend module and the connection
    parameters live under ``credentials``, which is the layout Tortoise's
    config dict expects (a bare ``'asyncpg'`` engine with flat credentials
    is not a valid backend reference).
    """
    # Database connection and app/model registration.
    db_config = {
        'connections': {
            'default': {
                'engine': 'tortoise.backends.asyncpg',
                'credentials': {
                    'host': '127.0.0.1',
                    'port': '5432',
                    'user': 'user',
                    'password': 'password',
                    'database': 'test_db',
                },
            },
        },
        'apps': {
            'models': {
                'models': ['models'],  # register our ``models`` module
                'default_connection': 'default',
            },
        },
    }
    await Tortoise.init(config=db_config)
    await Tortoise.generate_schemas()
from vibora import Vibora, Request
from pydantic import BaseModel
from tortoise.exceptions import DoesNotExist

from models import User, UserPydantic
from db_config import init_db

# NOTE(review): the listener hook, the ``{user_id}`` placeholder syntax, the
# body-model parameter injection and the dict / (dict, status) return values
# are kept exactly as written; confirm them against the Vibora documentation.

app = Vibora()


@app.listener('before_server_start')
async def setup_db(app, loop):
    """Initialise the database before the server starts."""
    await init_db()


class CreateUser(BaseModel):
    """Request model used to validate user create/update input."""

    username: str
    email: str


@app.route('/users', methods=['POST'])
async def create_user(request: Request, user: CreateUser):
    """Insert a new user and return its id, username and email."""
    # Use a distinct name so the validated input is not shadowed by the row.
    created = await User.create(username=user.username, email=user.email)
    return {'id': created.id, 'username': created.username, 'email': created.email}


@app.route('/users/{user_id}', methods=['GET'])
async def get_user(request: Request, user_id: int):
    """Return the user with ``user_id``, or a 404 payload if absent."""
    try:
        # ``User`` declares no relations, so there is nothing to prefetch.
        user = await User.get(id=user_id)
    except DoesNotExist:
        return {'error': 'User not found'}, 404
    return {'id': user.id, 'username': user.username, 'email': user.email}


@app.route('/users/{user_id}', methods=['PUT'])
async def update_user(request: Request, user_id: int, user: CreateUser):
    """Overwrite username/email of ``user_id``; 404 payload if absent."""
    # A queryset update never raises DoesNotExist; it returns the number of
    # affected rows, so a missing user is detected via a zero count.
    updated = await User.filter(id=user_id).update(
        username=user.username, email=user.email
    )
    if not updated:
        return {'error': 'User not found'}, 404
    return {'message': 'User updated successfully'}


@app.route('/users/{user_id}', methods=['DELETE'])
async def delete_user(request: Request, user_id: int):
    """Delete ``user_id``; 404 payload if no such row existed."""
    # Same as update: the delete query reports a row count, not an exception.
    deleted = await User.filter(id=user_id).delete()
    if not deleted:
        return {'error': 'User not found'}, 404
    return {'message': 'User deleted successfully'}


if __name__ == '__main__':
    app.run(debug=True)
from vibora import Vibora

from routes import setup_routes
from db_config import init_db

app = Vibora()


@app.listener('before_server_start')
async def setup_db(app, loop):
    """Run the database initialisation before the server starts."""
    await init_db()


# Attach the route handlers to the application.
setup_routes(app)


if __name__ == '__main__':
    app.run(debug=True)
from pydantic import BaseModel
from tortoise.exceptions import DoesNotExist

from models import User, UserPydantic
from vibora import Request, route

# NOTE(review): the module-level ``route`` decorator, ``app.add_route``, the
# ``{user_id}`` placeholder syntax and the dict / (dict, status) return values
# are kept exactly as written; confirm them against the Vibora documentation.


class CreateUser(BaseModel):
    """Request model used to validate user create/update input."""

    username: str
    email: str


@route('/users', methods=['POST'])
async def create_user(request: Request, user: CreateUser):
    """Insert a new user and return its id, username and email."""
    # Use a distinct name so the validated input is not shadowed by the row.
    created = await User.create(username=user.username, email=user.email)
    return {'id': created.id, 'username': created.username, 'email': created.email}


@route('/users/{user_id}', methods=['GET'])
async def get_user(request: Request, user_id: int):
    """Return the user with ``user_id``, or a 404 payload if absent."""
    try:
        # ``User`` declares no relations, so there is nothing to prefetch.
        user = await User.get(id=user_id)
    except DoesNotExist:
        return {'error': 'User not found'}, 404
    return {'id': user.id, 'username': user.username, 'email': user.email}


@route('/users/{user_id}', methods=['PUT'])
async def update_user(request: Request, user_id: int, user: CreateUser):
    """Overwrite username/email of ``user_id``; 404 payload if absent."""
    # A queryset update never raises DoesNotExist; it returns the number of
    # affected rows, so a missing user is detected via a zero count.
    updated = await User.filter(id=user_id).update(
        username=user.username, email=user.email
    )
    if not updated:
        return {'error': 'User not found'}, 404
    return {'message': 'User updated successfully'}


@route('/users/{user_id}', methods=['DELETE'])
async def delete_user(request: Request, user_id: int):
    """Delete ``user_id``; 404 payload if no such row existed."""
    # Same as update: the delete query reports a row count, not an exception.
    deleted = await User.filter(id=user_id).delete()
    if not deleted:
        return {'error': 'User not found'}, 404
    return {'message': 'User deleted successfully'}


def setup_routes(app):
    """Register the four user CRUD handlers on ``app``."""
    for handler in (create_user, get_user, update_user, delete_user):
        app.add_route(handler)
import asyncio
import datetime

from vibora import Vibora, Request, Response
# Vibora's JSON response class is spelled ``JsonResponse``.
from vibora.responses import JsonResponse

from db_config import init_db
from routes import setup_routes
from middleware import token_middleware

# NOTE(review): ``app.listener`` and ``app.middleware`` are kept exactly as
# written; confirm these hook names against the Vibora documentation.

app = Vibora()


@app.listener('before_server_start')
async def setup_db(app, loop):
    """Initialise the database before the server starts."""
    await init_db()


# Register routes.
setup_routes(app)


async def _run_every(interval_seconds, message):
    """Print ``message`` once per ``interval_seconds``, forever."""
    while True:
        await asyncio.sleep(interval_seconds)
        print(message)


async def _run_in_nightly_window(message, start_hour=1, end_hour=5, poll_seconds=60):
    """Print ``message`` once per day while local time is in the window.

    The local clock is polled every ``poll_seconds``. Once the job fires
    inside ``[start_hour, end_hour)`` the loop sleeps past the end of the
    window so it cannot fire a second time the same night.
    """
    while True:
        now = datetime.datetime.now()
        if start_hour <= now.hour < end_hour:
            print(message)
            window_end = now.replace(
                hour=end_hour, minute=0, second=0, microsecond=0
            )
            # +1s guards against waking a hair before the window closes.
            await asyncio.sleep((window_end - now).total_seconds() + 1)
        else:
            await asyncio.sleep(poll_seconds)


async def schedule_transactions(app):
    """Run the three periodic jobs concurrently, each on its own schedule.

    Running them in one sequential loop would make every job wait for the
    others' sleeps (the 6-second job would fire only about every 33
    minutes), so each job gets its own loop and they are awaited together.

    Args:
        app: The application instance; accepted for the caller's benefit
            and not used by the jobs themselves.
    """
    await asyncio.gather(
        # First job: every 6 seconds.
        _run_every(6, "Running the first transaction..."),
        # Second job: every 33 minutes.
        _run_every(33 * 60, "Running the second transaction..."),
        # Third job: once per day between 01:00 and 05:00 local time.
        _run_in_nightly_window("Running the third transaction..."),
    )


# CORS middleware.
@app.middleware
async def cors_middleware(request: Request, handler):
    """Call ``handler`` and add permissive CORS headers to its response."""
    response = await handler(request)
    response.headers['Access-Control-Allow-Origin'] = '*'  # allow any origin
    response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, DELETE, OPTIONS'
    response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization'
    return response


# Validate tokens via middleware.
app.middleware(token_middleware)


# Start the periodic jobs once the server is up.
@app.listener('after_server_start')
async def start_scheduling(app, loop):
    """Schedule ``schedule_transactions`` as a background task on ``loop``."""
    loop.create_task(schedule_transactions(app))


if __name__ == '__main__':
    app.run(debug=True)
import os
from typing import Awaitable

from jose import JWTError, jwt  # python-jose (not PyJWT)
from vibora import Request, Response
# The response class was previously used without being imported.
from vibora.responses import JsonResponse

_BEARER_PREFIX = 'Bearer '


async def token_middleware(request: Request, handler: Awaitable) -> Response:
    """Reject requests lacking a valid HS256 JWT; otherwise call ``handler``.

    The token is read from the ``Authorization`` header, with a leading
    ``'Bearer '`` prefix removed when present. On success the decoded claims
    are stored on ``request.state.user_info`` before ``handler`` is awaited.

    Returns:
        A 401 JSON response when the token is missing or fails to decode,
        otherwise whatever ``handler(request)`` returns.
    """
    header = request.headers.get('Authorization', '')
    # Strip the scheme only as a prefix, not wherever the text occurs.
    if header.startswith(_BEARER_PREFIX):
        token = header[len(_BEARER_PREFIX):]
    else:
        token = header
    if not token:
        return JsonResponse({'error': 'Token is missing'}, status_code=401)

    # SECURITY: the signing key is read from the JWT_SECRET_KEY environment
    # variable; the literal fallback preserves the previous behaviour and
    # must not be relied on in production.
    secret = os.environ.get('JWT_SECRET_KEY', 'secret_key')
    try:
        user_info = jwt.decode(token, secret, algorithms=['HS256'])
    except JWTError:
        # Only decoding/validation failures mean "invalid token"; any other
        # error is a bug and is allowed to propagate.
        return JsonResponse({'error': 'Invalid token'}, status_code=401)

    # Expose the decoded claims to downstream handlers.
    # NOTE(review): assumes the request object offers a ``state`` attribute,
    # as the original code did; confirm against the Vibora Request API.
    request.state.user_info = user_info
    return await handler(request)
标签:经验,vibora,app,email,user,import,id,User From: https://www.cnblogs.com/pearlcity/p/17975028