服务端:
from fastapi import FastAPI from fastapi.responses import StreamingResponse from fastapi.middleware.cors import CORSMiddleware import time app = FastAPI() # 允许所有来源的跨域请求 app.add_middleware( CORSMiddleware, allow_origins=["*"], # 允许所有来源 allow_credentials=True, allow_methods=["*"], # 允许所有HTTP方法 allow_headers=["*"] # 允许所有请求头 ) def generate_stream(): for i in range(1, 11): yield f"data: Message {i}\n\n" time.sleep(1) @app.get("/stream") async def stream(): return StreamingResponse(generate_stream(), media_type="text/event-stream") #uvicorn sse_server:app --port 1090 --reload
客户端:
import requests from requests.auth import HTTPBasicAuth # 定义事件流客户端类 class EventStreamClient: def __init__(self, url, username=None, password=None): self.url = url self.session = requests.Session() if username and password: self.session.auth = HTTPBasicAuth(username, password) def connect(self): # 使用requests.Session来保持连接 self.response = self.session.get(self.url, stream=True) self.response.raise_for_status() # 检查是否连接成功 def events(self): # 生成器:逐行读取响应内容并逐一生成事件 for line in self.response.iter_lines(): if line: yield line.decode('utf-8').rstrip() # 去掉行尾的换行符 def close(self): # 关闭会话 self.session.close() # 使用示例 url = 'http://127.0.0.1:1090/stream' # 替换为实际的事件流URL client = EventStreamClient(url, username='', password='') try: client.connect() for event in client.events(): print(event) # 处理接收到的每个事件 finally: client.close()
输出:
标签:stream,python,self,编程,url,session,sse,import,def From: https://www.cnblogs.com/xuxiaobo/p/18624944