E:\song\agv_fastapi_socket_v3\app.py
import json
# fastapi
from fastapi import FastAPI, WebSocket, Request, WebSocketDisconnect
from fastapi.responses import RedirectResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.openapi.docs import get_swagger_ui_html
from fastapi.templating import Jinja2Templates
from fastapi.middleware.cors import CORSMiddleware
# 启动数据库
# from config.config import initiate_database
from routes.scheduler import scheduler_router
from routes.status import status_router
from routes.config import config_router
from routes.control import control_router
from routes.navigate import navigate_router
from websocket.manager import websocketmanager
# websocket dispatch
from websocket.dispatch import dispatch_socket
# scheduler
from scheduler.scheduler import scheduler
# 启动定时器
scheduler.start()
# docs_url一定要设置成None,才会使用本地的 swagger-ui 的静态文件
app = FastAPI(docs_url=None)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
templates = Jinja2Templates(directory="templates")
app.mount('/static', StaticFiles(directory='static'), name='static')
@app.get('/docs', include_in_schema=False)
async def custom_swagger_ui_html():
return get_swagger_ui_html(
openapi_url=app.openapi_url,
title=app.title + " - Swagger UI ",
oauth2_redirect_url=app.swagger_ui_oauth2_redirect_url,
swagger_js_url="/static/swagger-ui/swagger-ui-bundle.js",
swagger_css_url="/static/swagger-ui/swagger-ui.css",
swagger_favicon_url="/static/swagger-ui/favicon.png"
)
# 初始化事件
@app.on_event("startup")
async def start_database():
# await initiate_database()
pass
# docs文档
@app.get("/", tags=["Docs"])
async def get_docs():
return RedirectResponse("/docs")
# jinjia2的测试网页
@app.get("/jinjia2", response_class=HTMLResponse)
async def read_jinjia2(request: Request):
return templates.TemplateResponse("index.html", {"request": request, "name": "Alice"})
# websocket的测试网页
@app.get("/websocket", response_class=HTMLResponse)
async def websocket_test(request: Request):
return templates.TemplateResponse("websocket.html", {"request": request})
@app.websocket("/wstest")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"send from serve: {data}")
app.include_router(scheduler_router, tags=["Scheduler"], prefix="/scheduler")
app.include_router(status_router, tags=["Status"], prefix="/status")
app.include_router(config_router, tags=["Config"], prefix="/config")
app.include_router(control_router, tags=["Control"], prefix="/control")
app.include_router(navigate_router, tags=["Navigate"], prefix="/navigate")
# client_id代表用户的唯一标识,用来区分不同的用户
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
# 将新的socket连接存储起来
await websocketmanager.connect(websocket)
try:
while True:
# data = await websocket.receive_json()
data = await websocket.receive_text()
data_dict = eval(data)
if 'websocket' in data_dict: # 处理心跳检测
await websocket.send_text("pong")
else:
await dispatch_socket(data_dict['msg_type'], data_dict["msg"], websocket)
except WebSocketDisconnect:
websocketmanager.disconnect(websocket)
await websocketmanager.broadcast(f"Client left the chat")
E:\song\agv_fastapi_socket_v3\main.py
import uvicorn
if __name__ == '__main__':
uvicorn.run('app:app', host="127.0.0.1", port=9000, reload=True)```
# `E:\song\agv_fastapi_socket_v3\config\config.py`
```py
from typing import Optional
from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseSettings
from models.student import Student
class Settings(BaseSettings):
# database configurations
DATABASE_URL: Optional[str] = None
class Config:
env_file = ".env"
orm_mode = True
async def initiate_database():
client = AsyncIOMotorClient(Settings().DATABASE_URL)
await init_beanie(database=client['agv'],
document_models=[Student])
E:\song\agv_fastapi_socket_v3\dao\student.py
from typing import List, Union
from beanie import PydanticObjectId
from models.student import Student
student_collection = Student
async def retrieve_students() -> List[Student]:
students = await student_collection.all().to_list()
return students
async def add_student(new_student: Student) -> Student:
student = await new_student.create()
return student
async def retrieve_student(id: PydanticObjectId) -> Student:
student = await student_collection.get(id)
if student:
return student
async def delete_student(id: PydanticObjectId) -> bool:
student = await student_collection.get(id)
if student:
await student.delete()
return True
async def update_student_data(id: PydanticObjectId, data: dict) -> Union[bool, Student]:
des_body = {k: v for k, v in data.items() if v is not None}
update_query = {"$set": {
field: value for field, value in des_body.items()
}}
student = await student_collection.get(id)
if student:
await student.update(update_query)
return student
return False
E:\song\agv_fastapi_socket_v3\models\student.py
from typing import Optional, Any
from beanie import Document
from pydantic import BaseModel, EmailStr
class Student(Document):
fullname: str
email: EmailStr
course_of_study: str
year: int
gpa: float
class Config:
schema_extra = {
"example": {
"fullname": "Abdulazeez Abdulazeez Adeshina",
"email": "abdul@school.com",
"course_of_study": "Water resources engineering",
"year": 4,
"gpa": "3.76"
}
}
class UpdateStudentModel(BaseModel):
fullname: Optional[str]
email: Optional[EmailStr]
course_of_study: Optional[str]
year: Optional[int]
gpa: Optional[float]
class Collection:
name = "student"
class Config:
schema_extra = {
"example": {
"fullname": "Abdulazeez Abdulazeez",
"email": "abdul@school.com",
"course_of_study": "Water resources and environmental engineering",
"year": 4,
"gpa": "5.0"
}
}
class Response(BaseModel):
status_code: int
response_type: str
description: str
data: Optional[Any]
class Config:
schema_extra = {
"example": {
"status_code": 200,
"response_type": "success",
"description": "Operation successful",
"data": "Sample data"
}
}
E:\song\agv_fastapi_socket_v3\routes\config.py
from fastapi import APIRouter
from typing import Optional
from pydantic import BaseModel
# agv socket 工厂方法
from tcpsocket.factory import agv_socket_factory
# status的端口号
from tcpsocket.params import Port
# 创建status router路由
config_router = APIRouter()
# 创建agv status的socket
agv_status_socket = agv_socket_factory.create(Port.STATUS.value)
agv_config_socket = agv_socket_factory.create(Port.CONFIG.value)
class StatusParams(BaseModel):
msg_type: int
msg: Optional[dict]
class Config:
schema_extra = {
"example": {
"msg_type": 1000,
"msg": {"参数名称": "xx参数"}
}
}
@config_router.get("/", response_description="编号 查询结果")
async def get_config():
return "地图"
@config_router.post("/", response_description="编号 查询结果")
async def post_config(params: StatusParams):
print(params)
return {
"map": '地图'
}
# try:
# agv_socket.send(1,Status(params.msg_type).value,{})
# res = agv_socket.receive()
# return res
# except Exception as e:
# print(e)
# raise HTTPException(
# status_code=409,
# detail="查询编号不正确"
# )
E:\song\agv_fastapi_socket_v3\routes\control.py
from fastapi import APIRouter
from typing import Optional
from pydantic import BaseModel
# agv socket 工厂方法
from tcpsocket.factory import agv_socket_factory
# status的端口号
from tcpsocket.params import Port
# 创建status router路由
control_router = APIRouter()
# 创建agv status的socket
agv_control_socket = agv_socket_factory.create(Port.CONTROL.value)
class StatusParams(BaseModel):
msg_type: int
msg: Optional[dict]
class Config:
schema_extra = {
"example": {
"msg_type": 1000,
"msg": {"参数名称": "xx参数"}
}
}
@control_router.get("/", response_description="编号 查询结果")
async def get_control():
return "地图"
@control_router.post("/", response_description="编号 查询结果")
async def post_control(params: StatusParams):
print(params)
return {
"map": '地图'
}
# try:
# agv_socket.send(1,Status(params.msg_type).value,{})
# res = agv_socket.receive()
# return res
# except Exception as e:
# print(e)
# raise HTTPException(
# status_code=409,
# detail="查询编号不正确"
# )
E:\song\agv_fastapi_socket_v3\routes\navigate.py
from fastapi import APIRouter
from typing import Optional
from pydantic import BaseModel
# agv socket 工厂方法
from tcpsocket.factory import agv_socket_factory
# status的端口号
from tcpsocket.params import Port
# 创建status router路由
navigate_router = APIRouter()
# 创建agv status的socket
agv_navigate_socket = agv_socket_factory.create(Port.NAVIGATE.value)
class StatusParams(BaseModel):
msg_type: int
msg: Optional[dict]
class Config:
schema_extra = {
"example": {
"msg_type": 1000,
"msg": {"参数名称": "xx参数"}
}
}
@navigate_router.get("/", response_description="编号 查询结果")
async def get_navigate():
return "地图"
@navigate_router.post("/", response_description="编号 查询结果")
async def post_navigate(params: StatusParams):
print(params)
return {
"map": '地图'
}
# try:
# agv_socket.send(1,Status(params.msg_type).value,{})
# res = agv_socket.receive()
# return res
# except Exception as e:
# print(e)
# raise HTTPException(
# status_code=409,
# detail="查询编号不正确"
# )
E:\song\agv_fastapi_socket_v3\routes\scheduler.py
from fastapi import APIRouter, Body
from scheduler.scheduler import scheduler
scheduler_router = APIRouter()
@scheduler_router.get("/", response_description="Toggle the scheduler")
async def toggle_scheduler(flag: bool):
if flag:
scheduler.resume()
else:
scheduler.pause()
return {
"status_code": 200,
"response_type": "success",
"description": "Students data retrieved successfully",
"data": "ok"
}
E:\song\agv_fastapi_socket_v3\routes\status.py
from fastapi import APIRouter
from typing import Optional
from pydantic import BaseModel
# agv socket 工厂方法
from tcpsocket.factory import agv_socket_factory
# status的端口号
from tcpsocket.params import Port
# 创建status router路由
status_router = APIRouter()
# 创建agv status的socket
agv_status_socket = agv_socket_factory.create(Port.STATUS.value)
class StatusParams(BaseModel):
msg_type: int
msg: Optional[dict]
class Config:
schema_extra = {
"example": {
"msg_type": 1000,
"msg": {"参数名称": "xx参数"}
}
}
@status_router.get("/")
async def get_status(msg_type: int):
print(msg_type)
return 'ok'
@status_router.post("/", response_description="编号 查询结果")
async def post_status(params: StatusParams):
return {
"name": 'Alice',
"age": 19,
"sex": "male"
}
# try:
# agv_socket.send(1,Status(params.msg_type).value,{})
# res = agv_socket.receive()
# return res
# except Exception as e:
# print(e)
# raise HTTPException(
# status_code=409,
# detail="查询编号不正确"
# )
E:\song\agv_fastapi_socket_v3\routes\student.py
from fastapi import APIRouter, Body
from dao.student import *
from models.student import *
student_router = APIRouter()
@student_router.get("/", response_description="Students retrieved", response_model=Response)
async def get_students():
return {
"status_code": 200,
"response_type": "success",
"description": "Students data retrieved successfully",
"data": "test"
}
E:\song\agv_fastapi_socket_v3\scheduler\scheduler.py
# 定时器任务计划
from apscheduler.schedulers.asyncio import AsyncIOScheduler
scheduler = AsyncIOScheduler()
E:\song\agv_fastapi_socket_v3\socket_routes\status.py
import json
from websocket.manager import websocketmanager
from scheduler.scheduler import scheduler
# dispatch status,根据msg_type的值,来决定哪些查询添加定时器任务,哪些查询直接查询返回结果
async def dispatch_status(msg_type, msg, websocket):
if msg_type == 1020:
await add_schdulers(msg_type, msg, websocket)
print("添加任务计划")
else:
print("直接查询返回")
# 添加定时器任务
async def add_schdulers(msg_type, msg, websocket):
scheduler.add_job(job_status_task, 'interval',
seconds=5, args=[websocket])
scheduler.add_job(job_status_loc, 'interval',
seconds=5, args=[websocket])
# 定时获取路径导航信息
async def job_status_task(websocket):
await websocketmanager.send_personal_json(json.dumps({"robot_status_task_res": "name"}), websocket)
# 定时获取agv的位置坐标
async def job_status_loc(websocket):
await websocketmanager.send_personal_json(json.dumps({"robot_status_loc_res": "name"}), websocket)
E:\song\agv_fastapi_socket_v3\tcpsocket\agvsocket.py
import json
import socket
import struct
class AgvSocket:
__PACK_FMT_STR = '!BBHLH6s'
def __init__(self, ip, port):
so = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
so.connect((ip, port))
so.settimeout(5)
self.so = so
def send(self, req_id, msg_type, msg={}):
raw_msg = self.__pack_msg(req_id, msg_type, msg={})
self.so.send(raw_msg)
def receive(self):
global data
data_all = b''
try:
data = self.so.recv(16)
except socket.timeout:
print('timeout')
self.so.close()
json_data_len = 0
back_req_num = 0
if len(data) < 16:
print('pack head error')
print(data)
self.so.close()
else:
header = struct.unpack(AgvSocket.__PACK_FMT_STR, data)
json_data_len = header[3]
back_req_num = header[4]
data_all += data
data = b''
read_size = 1024
try:
while json_data_len > 0:
recv = self.so.recv(read_size)
data += recv
json_data_len -= len(recv)
if json_data_len < read_size:
read_size = json_data_len
data_all += data
return json.loads(data_all[16:].decode())
except socket.timeout:
print('timeout')
def close(self):
self.so.close()
@staticmethod
def __pack_msg(req_id, msg_type, msg={}):
msg_len = 0
json_str = json.dumps(msg)
if msg != {}:
msg_len = len(json_str)
raw_msg = struct.pack(AgvSocket.__PACK_FMT_STR, 0x5A, 0x01, req_id,
msg_len, msg_type, b'\x00\x00\x00\x00\x00\x00')
if msg != {}:
raw_msg += bytearray(json_str, 'ascii')
return raw_msg
E:\song\agv_fastapi_socket_v3\tcpsocket\factory.py
from .params import IP, Port
from .agvsocket import AgvSocket
class FactoryTcpSocket:
def __init__(self, ip: str):
self.ip = ip
self.socket_dic = {}
def create(self, port):
pass
# res = self.socket_dic.get(port)
# if res is None:
# agv_socket = AgvSocket(self.ip, port)
# self.socket_dic[port]=agv_socket
# return agv_socket
# else:
# return res
agv_socket_factory = FactoryTcpSocket(IP)
E:\song\agv_fastapi_socket_v3\tcpsocket\params.py
from enum import Enum
IP = "192.168.192.5"
class Port(Enum):
STATUS = 19204
CONTROL = 19205
NAVIGATE = 19206
CONFIG = 19207
OTHER = 19210
PUSH = 192301
class Status(Enum):
INFO = 1000 # 查询机器人信息
RUN = 1002 # 机器人运行信息
LOC = 1004 # 机器人位置
SPEED = 1005 # 机器人速度
BLOCK = 1006 # 机器人的被阻挡状态
BATTERY = 1007 # 机器人电池状态
MOTOR = 1040 # 电机状态信息
LASER = 1009 # 激光点云数据
AREA = 1011 # 机器人当前所在区域
EMERGENCY = 1012 # 机器人急停状态
IO = 1013 # 查询机器人I/O状态
IMU = 1014 # 查询机器人IMU数据
RFID = 1015 # RFID数据
ULTRASONIC = 1016 # 超声传感器数据
PGV = 1017 # 二维码数据
ENCODER = 1018 # 编码器数据
TASK = 1020 # 机器人导航状态
TASK_STATUS_PACKAGE = 1110 # 机器人任务状态
RELOC = 1021 # 机器人定位状态
LOADMAP = 1022 # 地图载入状态
SLAM = 1025 # 机器人扫图状态
DISPATCH = 1030 # 机器人当前的调度状态
ALARM = 1050 # 机器人报警状态
CURRENT_LOCK = 1060 # 当前控制权所有者
MAP = 1300 # 机器人载入的地图以及存储的地图
STATION = 1301 # 查询机器人当前载入地图中的站点信息
STARTDMXSCRIPT = 1903 # 运行氛围灯脚本
STOPBATTERYSCRIPT = 1904 # 停止氛围灯脚本
ALL1 = 1100 # 批量查询数据1
ALL2 = 1101 # 批量查询数据2
ALL3 = 1102 # 批量查询数据3
class Control(Enum):
pass
class Navigate(Enum):
pass
class Config(Enum):
# 从机器人上下载地图
DOWN_LOAD_MAP = 4011
class Other(Enum):
pass
class Push(Enum):
pass
E:\song\agv_fastapi_socket_v3\tcpsocket\socket_config.py
import socket
import json
import time
import struct
PACK_FMT_STR = '!BBHLH6s'
IP = '192.168.192.5'
Port = 19207
def packMasg(reqId, msgType, msg={}):
msgLen = 0
jsonStr = json.dumps(msg)
if (msg != {}):
msgLen = len(jsonStr)
rawMsg = struct.pack(PACK_FMT_STR, 0x5A, 0x01, reqId,
msgLen, msgType, b'\x00\x00\x00\x00\x00\x00')
if (msg != {}):
rawMsg += bytearray(jsonStr, 'ascii')
print(msg)
return rawMsg
so = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
so.connect((IP, Port))
so.settimeout(5)
# test_msg = packMasg(1,1110,{"task_ids":["SEER78914"]})
# 查询agv中所有的地图信息
test_msg = packMasg(1, 1300)
# 查询motor
# test_msg = packMasg(1,1040,{"motor_names":["left motor"]})
# 下载地图
test_msg = packMasg(1, 4011, {"map_name": "20221116_biaotou"})
so.send(test_msg)
dataall = b''
# while True:
try:
data = so.recv(16)
except socket.timeout:
print('timeout')
so.close
jsonDataLen = 0
backReqNum = 0
if (len(data) < 16):
print('pack head error')
print(data)
so.close()
else:
header = struct.unpack(PACK_FMT_STR, data)
jsonDataLen = header[3]
backReqNum = header[4]
dataall += data
data = b''
readSize = 1024
try:
while (jsonDataLen > 0):
recv = so.recv(readSize)
data += recv
jsonDataLen -= len(recv)
if jsonDataLen < readSize:
readSize = jsonDataLen
dataall += data
with open('map.json', 'w', encoding='utf-8') as f:
f.write(dataall[16:].decode())
except socket.timeout:
print('timeout')
so.close()
E:\song\agv_fastapi_socket_v3\templates\index.html
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>jinjia2</title>
<link href="{{ url_for('static', path='/index/style.css') }}" rel="stylesheet">
</head>
<body>
<h1>this is jinja2 template</h1>
<h2>{{name}}</h2>
</body>
</html>```
# `E:\song\agv_fastapi_socket_v3\templates\websocket.html`
```html
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off" />
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
// socket的连接地址
const ws = new WebSocket("ws://localhost:9000/status/ws");
// socket 连接成功的回调
ws.addEventListener('open', (e) => {
console.log(`socket 连接成功`)
})
// socket 连接失败的回调
ws.addEventListener('error', (e) => {
console.log(`socket 连接失败`)
})
// socket 连接断开时候的回调
ws.addEventListener('close', (e) => {
console.log(`socket 连接断开`)
})
// 采用这中属性赋值的方法,只能有一个回调函数,如果想要有多个回调函数,可以采用上面的addEventListener的方法,来添加回调函数
ws.onmessage = function (event) {
receMessage(event.data)
};
// 将接受到内容渲染到界面上
function receMessage(msg) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(msg)
message.appendChild(content)
messages.appendChild(message)
}
// 发送内容
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>```
# `E:\song\agv_fastapi_socket_v3\websocket\dispatch.py`
```py
from socket_routes.status import dispatch_status
# 分发任务
async def dispatch_socket(msg_type, msg, websocket):
# 查询机器人状态
if msg_type >= 1000 and msg_type <= 1999:
print("状态api")
await dispatch_status(msg_type, msg, websocket)
elif msg_type >= 2000 and msg_type <= 2999:
print("控制api")
elif msg_type >= 3000 and msg_type <= 3999:
print("导航api")
elif msg_type >= 4000 and msg_type <= 4999:
print("配置api")
elif msg_type >= 6000 and msg_type <= 6999:
print("其他api")
elif msg_type == 9300 or msg_type == 19301:
print("推送api")
else:
print("暂无此api")
E:\song\agv_fastapi_socket_v3\websocket\manager.py
from typing import List
from fastapi.websockets import WebSocket
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
# 直接发送,就是单个发送
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def send_personal_json(self, message: str, websocket: WebSocket):
await websocket.send_json(message)
# 广播,遍历所有的connection连接,然后发送消息
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
websocketmanager = ConnectionManager()
标签:websocket,socket,fastapi,离线,agv,msg,import,data
From: https://www.cnblogs.com/zhuoss/p/16963090.html