首页 > 编程语言 >一个练习项目,好玩的bbs-python-fastapi

一个练习项目,好玩的bbs-python-fastapi

时间:2024-09-02 14:04:44浏览次数:13  
标签:return python fastapi data cursor sessionId bbs id conn

代码:

from fastapi import FastAPI, Response, Cookie, Depends
from fastapi.responses import JSONResponse
from fastapi.responses import HTMLResponse
import os.path
import MySQLdb
import json
import hashlib
import random
import math
import os
from datetime import datetime
import uvicorn

#uvicorn start_web_fastapi:app --port 1090 --reload

class DateEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.strftime("%Y-%m-%d %H:%M:%S")
        else:
            return json.JSONEncoder.default(self, obj)

secretKey = 'saacac3423@21212'
pagesize = 20
app = FastAPI()

def getConn():
    conn = MySQLdb.Connection('127.0.0.1', 'root', '123456', 'my_bbs')
    cursor = conn.cursor(cursorclass = MySQLdb.cursors.DictCursor)
    return (conn, cursor)

def getloginuserinfo(sessionIdx, sessionId: str | None = Cookie(None)):
    (conn, cursor) = getConn()
    if sessionId is not None and sessionId != '' and sessionId.extra != {}:
        sessionIdx = sessionId
        
    sql = "select id,username,nickname,addTime,sessionId from user where sessionId='%s'" % sessionIdx
    cursor.execute(sql)
    data = cursor.fetchone()
    if data is None:
        data = {'id' : 0, 'username' : '', 'nickname' : '', 'addTime' : '', 'sessionId' : ''}
    
    return data
    
def response(code, msg, data):
    if code != 0:
        result = {'code' : code, 'msg' : msg, 'data' : None}
    else:
        result = {'code' : 0, 'msg' : '', 'data' : data}
        
    result = json.dumps(result, cls = DateEncoder, ensure_ascii = False)
    response = HTMLResponse(
        content=result,
        headers={"Server": "python-fastapi"},
    )
    return response

def error(code, msg):
    return response(code, msg, None)

def success( data = {}):
    return response(0, '', data)

@app.get("/")
async def index():
    (conn, cursor) = getConn()
    result = "此站接口使用python.fastapi实现,<a href='api.html' target='_blank'>接口列表</a>"
    response = HTMLResponse(
        content=result,
        headers={"Server": "python-fastapi"},
    )
    return response

@app.get('/user/register')
async def register(username: str = "", password: str = "", nickname: str = ""):
    (conn, cursor) = getConn()
    sql = "select id,username,nickname,addTime from user where username='%s'" % username
    cursor.execute(sql)
    data = cursor.fetchone()
    if data != None:
        return error(1, '用户名已经存在')

    try:
        passwordMd5 = hashlib.md5(password.encode(encoding='utf-8')).hexdigest()
        sql = "insert into user(username, password, nickname) value('%s', '%s', '%s')" % (username, passwordMd5, nickname)
        cursor.execute(sql)
        conn.commit()
        insertId = cursor.lastrowid
        return success(insertId)
    except MySQLdb.Error as e:
        conn.rollback()
        return error(1, '注册失败')

@app.get('/user/login')
async def login(username: str = "", password: str = ""):
    (conn, cursor) = getConn()
    passwordMd5 = hashlib.md5(password.encode(encoding='utf-8')).hexdigest()
    sql = "select id,username,nickname,addTime from user where username='%s' and password='%s'" % (username, passwordMd5)
    cursor.execute(sql)
    data = cursor.fetchone()
    if data == None:
        return error(1, '用户名或者密码错误')

    tmpSessionId = secretKey + str(data['id']) + str(data['addTime'])
    tmpSessionId = hashlib.md5(tmpSessionId.encode(encoding='utf-8')).hexdigest()
    try:
        sql = "update user set sessionId='%s' where id=%s" % (tmpSessionId, data['id'])
        cursor.execute(sql)
        conn.commit()
        data['sessionId'] = tmpSessionId
        return success(data)
    except MySQLdb.Error as e:
        conn.rollback()
        return error(1, '保存会话id失败')

@app.get('/user/logout')        
async def logout(sessionId: str = ""):
    (conn, cursor) = getConn()
    data = getloginuserinfo(sessionId)
    
    if data == None:
        return success(None)
    
    if data['sessionId'] == '':
        return success(data)

    try:
        sql = "update user set sessionId='' where sessionId='%s'" % sessionId
        cursor.execute(sql)
        conn.commit()
        data['sessionId'] = ''
        return success(data)
    except MySQLdb.Error as e:
        conn.rollback()
        return error(1, '删除会话id失败')

@app.get('/user/getuserinfo') 
async def getuserinfo(sessionId: str = ""):
    (conn, cursor) = getConn()
    userinfo = getloginuserinfo(sessionId)
    return success(userinfo)
    
@app.get('/post/list')
async def postlist(page: int = 1, keyword: str = ""):
    (conn, cursor) = getConn()
    page = int(page)
    if page <= 0:
        page = 1
    addsql = " isDel=0 "
    if keyword is not None and keyword != '':
        addsql = " isDel=0 and title like '%"+keyword+"%' "
        
    start = (page - 1) * pagesize
    
    sql1 = "select count(1) as count from content where %s" % addsql
    cursor.execute(sql1)
    countdata = cursor.fetchone()
    totalpage = math.ceil(countdata['count'] / float(pagesize))
    
    data = []
    if totalpage > 0:
        sql2 = "select id,title,userId,userNickename,replyNum,updateTime from content where %s order by updateTime desc limit %s,%s" % (addsql, start, pagesize)
        cursor.execute(sql2)
        data = cursor.fetchall()
    
    return success({'totalpage' : totalpage, 'data' : data})

@app.get('/post/detail')
async def postdetail(id: int = 0):
    (conn, cursor) = getConn()
    sql = "select id,title,content,userId,userNickename,replyNum,updateTime from content where isDel=0 and id=%s" % id
    cursor.execute(sql)
    data = cursor.fetchone()
    
    return success(data)

@app.get('/post/add')  
async def postadd(title: str = "", content: str = "", sessionId: str = ""):
    (conn, cursor) = getConn()
    userinfo = getloginuserinfo(sessionId)
    userId = userinfo['id']
    userNickename = userinfo['nickname']
    
    if userId <= 0:
        return error(1, '请先登录')

    try:
        sql = "insert into content(title, content, userId, userNickename) value('%s', '%s', %s, '%s')" % (title, content, userId, userNickename)
        cursor.execute(sql)
        conn.commit()
        insertId = cursor.lastrowid
        return success(insertId)
    except MySQLdb.Error as e:
        conn.rollback()
        return error(1, '发帖失败')

@app.get('/post/edit') 
async def postedit(id: int = 0, title: str = "", content: str = "", sessionId: str = ""):
    (conn, cursor) = getConn()
    userinfo = getloginuserinfo(sessionId)
    userId = userinfo['id']
    userNickename = userinfo['nickname']
    
    if userId <= 0:
        return error(1, '请先登录')

    try:
        sql = "update content set title='%s',content='%s',userId=%s,userNickename='%s' where id=%s and userId=%s" % (title, content, userId, userNickename, id, userId)
        cursor.execute(sql)
        conn.commit()
        return success(None)
    except MySQLdb.Error as e:
        conn.rollback()
        return error(1, '编辑帖子失败')

@app.get('/post/delete') 
async def postdelete(id: int = 0, sessionId: str = ""):
    (conn, cursor) = getConn()
    userinfo = getloginuserinfo(sessionId)
    userId = userinfo['id']
    userNickename = userinfo['nickname']
    
    if userId <= 0:
        return error(1, '请先登录')

    try:
        sql = "update content set isDel=1 where id=%s and userId=%s" % (id, userId)
        cursor.execute(sql)
        conn.commit()
        return success(None)
    except MySQLdb.Error as e:
        conn.rollback()
        return error(1, '删除帖子失败')
    
@app.get('/reply/list') 
async def replylist(page: int = 0, contentId: int = 0):
    (conn, cursor) = getConn()
    page = int(page)
    if page <= 0:
        page = 1
    start = (page - 1) * pagesize
    
    sql1 = "select count(1) as count from reply where isDel=0 and contentId=%s" % contentId
    cursor.execute(sql1)
    countdata = cursor.fetchone()
    totalpage = math.ceil(countdata['count'] / float(pagesize))
    
    data = []
    if totalpage > 0:
        sql2 = "select id,content,replyUserId,replyUserNickename,addTime from reply where isDel=0 and contentId=%s order by id asc limit %s,%s" % (contentId, start, pagesize)
        cursor.execute(sql2)
        data = cursor.fetchall()
    
    return success({'totalpage' : totalpage, 'data' : data})
    
@app.get('/reply/detail') 
async def replydetail(id: int = 0):
    (conn, cursor) = getConn()
    sql = "select id,content,replyUserId,replyUserNickename,addTime from reply where isDel=0 and id=%s" % id
    cursor.execute(sql)
    data = cursor.fetchone()
    
    return success(data)
           
@app.get('/reply/add') 
async def replyadd(contentId: int = 0, content: str = "", sessionId: str = ""):
    (conn, cursor) = getConn()
    userinfo = getloginuserinfo(sessionId)
    userId = userinfo['id']
    userNickename = userinfo['nickname']
    
    if userId <= 0:
        return error(1, '请先登录')

    try:
        sql2 = "update content set replyNum=replyNum+1 where id=%s" % contentId
        cursor.execute(sql2)
        sql1 = "insert into reply(contentId, content, replyUserId, replyUserNickename) value(%s, '%s', %s, '%s')" % (contentId, content, userId, userNickename)
        cursor.execute(sql1)
        
        conn.commit()
        insertId = cursor.lastrowid
        return success(insertId)
    except MySQLdb.Error as e:
        conn.rollback()
        return error(1, '回复失败')
    
@app.get('/reply/edit') 
async def replyedit(id: int = 0, content: str = "", sessionId: str = ""):
    (conn, cursor) = getConn()
    userinfo = getloginuserinfo(sessionId)
    userId = userinfo['id']
    userNickename = userinfo['nickname']
    
    if userId <= 0:
        return error(1, '请先登录')

    try:
        sql = "update reply set content='%s',replyUserId=%s,replyUserNickename='%s' where id=%s and replyUserId=%s" % (content, userId, userNickename, id, userId)
        cursor.execute(sql)
        conn.commit()
        return success(None)
    except MySQLdb.Error as e:
        conn.rollback()
        return error(1, '编辑回复失败')
    
@app.get('/reply/delete') 
async def replydelete(id: int = 0, sessionId: str = ""):
    (conn, cursor) = getConn()
    userinfo = getloginuserinfo(sessionId)
    userId = userinfo['id']
    userNickename = userinfo['nickname']
    
    if userId <= 0:
        return error(1, '请先登录')

    sql = "select id,content,replyUserId,replyUserNickename,addTime,contentId from reply where isDel=0 and id=%s" % id
    cursor.execute(sql)
    contentdata = cursor.fetchone()
    
    if contentdata is None:
        return error(1, '回复不存在')

    try:
        sql2 = "update content set replyNum=replyNum-1 where id=%s" % contentdata['contentId']
        cursor.execute(sql2)
        sql1 = "update reply set isDel=1 where id=%s and replyUserId=%s" % (id, userId)
        cursor.execute(sql1)
        
        conn.commit()
        return success(None)
    except MySQLdb.Error as e:
        conn.rollback()
        return error(1, '删除回复失败')
    
if __name__ == "__main__":
    uvicorn.run(app="start_web_fastapi:app", host="127.0.0.1", port=1090, reload=True)

 

输出:

D:\workspace\studys\study_pys\pc_app\dist>D:\software\Python310\python.exe D:\workspace\studys\study_bbs\start_web_fastapi.py
INFO:     Will watch for changes in these directories: ['D:\\workspace\\studys\\study_pys\\pc_app\\dist']
INFO:     Uvicorn running on http://127.0.0.1:1090 (Press CTRL+C to quit)
INFO:     Started reloader process [16692] using WatchFiles
INFO:     Started server process [3692]
INFO:     Waiting for application startup.
INFO:     Application startup complete.

 

标签:return,python,fastapi,data,cursor,sessionId,bbs,id,conn
From: https://www.cnblogs.com/xuxiaobo/p/18392591

相关文章

  • 一个练习项目,好玩的bbs-python-bottle
    代码:frombottleimportroute,run,templatefrombottleimportBottle,request,responseimportos.pathimportMySQLdbimportjsonimporthashlibimportrandomimportmathimportosfromdatetimeimportdatetimeclassDateEncoder(json.JSONEncoder):......
  • 一个练习项目,好玩的bbs-python-pyramid
    代码:fromwsgiref.simple_serverimportmake_serverfrompyramid.configimportConfiguratorfrompyramid.viewimportview_configfrompyramid.responseimportResponseimportos.pathimportMySQLdbimportjsonimporthashlibimportrandomimportmathimport......
  • 一个练习项目,好玩的bbs-python-webpy
    代码:importwebimportos.pathimportMySQLdbimportjsonimporthashlibimportrandomimportmathimportosfromdatetimeimportdatetimeclassDateEncoder(json.JSONEncoder):defdefault(self,obj):ifisinstance(obj,datetime):......
  • [开题报告]flask框架的高校医务室管理系统(程序+论文+python)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着高校规模的不断扩大和学生健康意识的增强,高校医务室作为维护师生健康的重要机构,其管理效率与服务质量直接影响到师生的日常生活与学习......
  • 推荐一个Python打造的开源自动驾驶平台:Donkeycar!
    1、引言随着人工智能和自动驾驶技术的飞速发展,自动驾驶车辆的研究和开发成为了科技领域的热点。对于初学者、爱好者和学生而言,一款易于上手且功能强大的自动驾驶平台显得尤为重要。Donkeycar正是这样一款开源项目,它提供了一个轻量级、模块化的Python自驾车库,旨在促进快速实验和社区......
  • 如何使用 Python 调用 DPAPI ?
    在Windows环境下,DPAPI(DataProtectionAPI)是一种用于加密和解密数据的API,可以保护数据,使其只能由当前用户或计算机访问。在Python中,可以通过Cryptography或pywin32等库来使用DPAPI进行数据加密和解密。以下是我我做项目时使用Python调用DPAPI进行数据加密和解密的示......
  • [开题报告]flask框架的机电配件管理系统(程序+论文+python)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景在快速发展的机电行业中,配件管理作为保障设备高效运行与维护的重要环节,其重要性日益凸显。随着企业规模的扩大和生产复杂度的提升,传统的配......
  • 基于micropython的ESP8266控制触摸传感器的设计方案
       以下是一个基于MicroPython的ESP8266控制触摸传感器的设计方案:一、硬件准备1. ESP8266开发板(如NodeMCU)       2. 触摸传感器模块(如TTP223触摸传感器)   3. 杜邦线若干                    ......
  • 0基础学习Python路径(40)operator模块
    operator模块operator模块提供了一套与Python的内置运算符对应的高效率函数。函数的种类函数包含的种类有:对象的比较运算、逻辑运算、数学运算和序列运算比较运算运算函数语法小于lt(a,b)a<b小于等于le(a,b)a<=b大于gt(a,b)a>b大于等于ge(a,b)a>=b等于eq(......
  • Python股票程序交易接口查账,提交订单,自动交易(2)
    Python股票接口实现查询账户,提交订单,自动交易(1)上一篇是获取数据,获取数据不难,有很多第三方库都可以获取,不一定非要用券商官方的接口,程序交易主要是交易的执行,这个没有官方接口是很难实现的。券商的接口不用担心安全和稳定的问题,相当于就是普通股票账户,开通了程序化交易的权......