代码:
from fastapi import FastAPI, Response, Cookie, Depends from fastapi.responses import JSONResponse from fastapi.responses import HTMLResponse import os.path import MySQLdb import json import hashlib import random import math import os from datetime import datetime import uvicorn #uvicorn start_web_fastapi:app --port 1090 --reload class DateEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime): return obj.strftime("%Y-%m-%d %H:%M:%S") else: return json.JSONEncoder.default(self, obj) secretKey = 'saacac3423@21212' pagesize = 20 app = FastAPI() def getConn(): conn = MySQLdb.Connection('127.0.0.1', 'root', '123456', 'my_bbs') cursor = conn.cursor(cursorclass = MySQLdb.cursors.DictCursor) return (conn, cursor) def getloginuserinfo(sessionIdx, sessionId: str | None = Cookie(None)): (conn, cursor) = getConn() if sessionId is not None and sessionId != '' and sessionId.extra != {}: sessionIdx = sessionId sql = "select id,username,nickname,addTime,sessionId from user where sessionId='%s'" % sessionIdx cursor.execute(sql) data = cursor.fetchone() if data is None: data = {'id' : 0, 'username' : '', 'nickname' : '', 'addTime' : '', 'sessionId' : ''} return data def response(code, msg, data): if code != 0: result = {'code' : code, 'msg' : msg, 'data' : None} else: result = {'code' : 0, 'msg' : '', 'data' : data} result = json.dumps(result, cls = DateEncoder, ensure_ascii = False) response = HTMLResponse( content=result, headers={"Server": "python-fastapi"}, ) return response def error(code, msg): return response(code, msg, None) def success( data = {}): return response(0, '', data) @app.get("/") async def index(): (conn, cursor) = getConn() result = "此站接口使用python.fastapi实现,<a href='api.html' target='_blank'>接口列表</a>" response = HTMLResponse( content=result, headers={"Server": "python-fastapi"}, ) return response @app.get('/user/register') async def register(username: str = "", password: str = "", nickname: str = ""): (conn, cursor) = getConn() sql = "select id,username,nickname,addTime from user where username='%s'" % username cursor.execute(sql) data = cursor.fetchone() if data != None: return error(1, '用户名已经存在') try: passwordMd5 = hashlib.md5(password.encode(encoding='utf-8')).hexdigest() sql = "insert into user(username, password, nickname) value('%s', '%s', '%s')" % (username, passwordMd5, nickname) cursor.execute(sql) conn.commit() insertId = cursor.lastrowid return success(insertId) except MySQLdb.Error as e: conn.rollback() return error(1, '注册失败') @app.get('/user/login') async def login(username: str = "", password: str = ""): (conn, cursor) = getConn() passwordMd5 = hashlib.md5(password.encode(encoding='utf-8')).hexdigest() sql = "select id,username,nickname,addTime from user where username='%s' and password='%s'" % (username, passwordMd5) cursor.execute(sql) data = cursor.fetchone() if data == None: return error(1, '用户名或者密码错误') tmpSessionId = secretKey + str(data['id']) + str(data['addTime']) tmpSessionId = hashlib.md5(tmpSessionId.encode(encoding='utf-8')).hexdigest() try: sql = "update user set sessionId='%s' where id=%s" % (tmpSessionId, data['id']) cursor.execute(sql) conn.commit() data['sessionId'] = tmpSessionId return success(data) except MySQLdb.Error as e: conn.rollback() return error(1, '保存会话id失败') @app.get('/user/logout') async def logout(sessionId: str = ""): (conn, cursor) = getConn() data = getloginuserinfo(sessionId) if data == None: return success(None) if data['sessionId'] == '': return success(data) try: sql = "update user set sessionId='' where sessionId='%s'" % sessionId cursor.execute(sql) conn.commit() data['sessionId'] = '' return success(data) except MySQLdb.Error as e: conn.rollback() return error(1, '删除会话id失败') @app.get('/user/getuserinfo') async def getuserinfo(sessionId: str = ""): (conn, cursor) = getConn() userinfo = getloginuserinfo(sessionId) return success(userinfo) @app.get('/post/list') async def postlist(page: int = 1, keyword: str = ""): (conn, cursor) = getConn() page = int(page) if page <= 0: page = 1 addsql = " isDel=0 " if keyword is not None and keyword != '': addsql = " isDel=0 and title like '%"+keyword+"%' " start = (page - 1) * pagesize sql1 = "select count(1) as count from content where %s" % addsql cursor.execute(sql1) countdata = cursor.fetchone() totalpage = math.ceil(countdata['count'] / float(pagesize)) data = [] if totalpage > 0: sql2 = "select id,title,userId,userNickename,replyNum,updateTime from content where %s order by updateTime desc limit %s,%s" % (addsql, start, pagesize) cursor.execute(sql2) data = cursor.fetchall() return success({'totalpage' : totalpage, 'data' : data}) @app.get('/post/detail') async def postdetail(id: int = 0): (conn, cursor) = getConn() sql = "select id,title,content,userId,userNickename,replyNum,updateTime from content where isDel=0 and id=%s" % id cursor.execute(sql) data = cursor.fetchone() return success(data) @app.get('/post/add') async def postadd(title: str = "", content: str = "", sessionId: str = ""): (conn, cursor) = getConn() userinfo = getloginuserinfo(sessionId) userId = userinfo['id'] userNickename = userinfo['nickname'] if userId <= 0: return error(1, '请先登录') try: sql = "insert into content(title, content, userId, userNickename) value('%s', '%s', %s, '%s')" % (title, content, userId, userNickename) cursor.execute(sql) conn.commit() insertId = cursor.lastrowid return success(insertId) except MySQLdb.Error as e: conn.rollback() return error(1, '发帖失败') @app.get('/post/edit') async def postedit(id: int = 0, title: str = "", content: str = "", sessionId: str = ""): (conn, cursor) = getConn() userinfo = getloginuserinfo(sessionId) userId = userinfo['id'] userNickename = userinfo['nickname'] if userId <= 0: return error(1, '请先登录') try: sql = "update content set title='%s',content='%s',userId=%s,userNickename='%s' where id=%s and userId=%s" % (title, content, userId, userNickename, id, userId) cursor.execute(sql) conn.commit() return success(None) except MySQLdb.Error as e: conn.rollback() return error(1, '编辑帖子失败') @app.get('/post/delete') async def postdelete(id: int = 0, sessionId: str = ""): (conn, cursor) = getConn() userinfo = getloginuserinfo(sessionId) userId = userinfo['id'] userNickename = userinfo['nickname'] if userId <= 0: return error(1, '请先登录') try: sql = "update content set isDel=1 where id=%s and userId=%s" % (id, userId) cursor.execute(sql) conn.commit() return success(None) except MySQLdb.Error as e: conn.rollback() return error(1, '删除帖子失败') @app.get('/reply/list') async def replylist(page: int = 0, contentId: int = 0): (conn, cursor) = getConn() page = int(page) if page <= 0: page = 1 start = (page - 1) * pagesize sql1 = "select count(1) as count from reply where isDel=0 and contentId=%s" % contentId cursor.execute(sql1) countdata = cursor.fetchone() totalpage = math.ceil(countdata['count'] / float(pagesize)) data = [] if totalpage > 0: sql2 = "select id,content,replyUserId,replyUserNickename,addTime from reply where isDel=0 and contentId=%s order by id asc limit %s,%s" % (contentId, start, pagesize) cursor.execute(sql2) data = cursor.fetchall() return success({'totalpage' : totalpage, 'data' : data}) @app.get('/reply/detail') async def replydetail(id: int = 0): (conn, cursor) = getConn() sql = "select id,content,replyUserId,replyUserNickename,addTime from reply where isDel=0 and id=%s" % id cursor.execute(sql) data = cursor.fetchone() return success(data) @app.get('/reply/add') async def replyadd(contentId: int = 0, content: str = "", sessionId: str = ""): (conn, cursor) = getConn() userinfo = getloginuserinfo(sessionId) userId = userinfo['id'] userNickename = userinfo['nickname'] if userId <= 0: return error(1, '请先登录') try: sql2 = "update content set replyNum=replyNum+1 where id=%s" % contentId cursor.execute(sql2) sql1 = "insert into reply(contentId, content, replyUserId, replyUserNickename) value(%s, '%s', %s, '%s')" % (contentId, content, userId, userNickename) cursor.execute(sql1) conn.commit() insertId = cursor.lastrowid return success(insertId) except MySQLdb.Error as e: conn.rollback() return error(1, '回复失败') @app.get('/reply/edit') async def replyedit(id: int = 0, content: str = "", sessionId: str = ""): (conn, cursor) = getConn() userinfo = getloginuserinfo(sessionId) userId = userinfo['id'] userNickename = userinfo['nickname'] if userId <= 0: return error(1, '请先登录') try: sql = "update reply set content='%s',replyUserId=%s,replyUserNickename='%s' where id=%s and replyUserId=%s" % (content, userId, userNickename, id, userId) cursor.execute(sql) conn.commit() return success(None) except MySQLdb.Error as e: conn.rollback() return error(1, '编辑回复失败') @app.get('/reply/delete') async def replydelete(id: int = 0, sessionId: str = ""): (conn, cursor) = getConn() userinfo = getloginuserinfo(sessionId) userId = userinfo['id'] userNickename = userinfo['nickname'] if userId <= 0: return error(1, '请先登录') sql = "select id,content,replyUserId,replyUserNickename,addTime,contentId from reply where isDel=0 and id=%s" % id cursor.execute(sql) contentdata = cursor.fetchone() if contentdata is None: return error(1, '回复不存在') try: sql2 = "update content set replyNum=replyNum-1 where id=%s" % contentdata['contentId'] cursor.execute(sql2) sql1 = "update reply set isDel=1 where id=%s and replyUserId=%s" % (id, userId) cursor.execute(sql1) conn.commit() return success(None) except MySQLdb.Error as e: conn.rollback() return error(1, '删除回复失败') if __name__ == "__main__": uvicorn.run(app="start_web_fastapi:app", host="127.0.0.1", port=1090, reload=True)
输出:
D:\workspace\studys\study_pys\pc_app\dist>D:\software\Python310\python.exe D:\workspace\studys\study_bbs\start_web_fastapi.py [32mINFO[0m: Will watch for changes in these directories: ['D:\\workspace\\studys\\study_pys\\pc_app\\dist'] [32mINFO[0m: Uvicorn running on [1mhttp://127.0.0.1:1090[0m (Press CTRL+C to quit) [32mINFO[0m: Started reloader process [[36m[1m16692[0m] using [36m[1mWatchFiles[0m [32mINFO[0m: Started server process [[36m3692[0m] [32mINFO[0m: Waiting for application startup. [32mINFO[0m: Application startup complete.
标签:return,python,fastapi,data,cursor,sessionId,bbs,id,conn From: https://www.cnblogs.com/xuxiaobo/p/18392591