首页 > 编程语言 >python网络编程之sse

python网络编程之sse

时间:2024-12-23 20:22:16浏览次数:4  
标签:stream python self 编程 url session sse import def

服务端:

 

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import time

app = FastAPI()

# 允许所有来源的跨域请求
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # 允许所有来源
    allow_credentials=True,
    allow_methods=["*"],   # 允许所有HTTP方法
    allow_headers=["*"]    # 允许所有请求头
)

def generate_stream():
    for i in range(1, 11):
        yield f"data: Message {i}\n\n"
        time.sleep(1)

@app.get("/stream")
async def stream():
    return StreamingResponse(generate_stream(), media_type="text/event-stream")


#uvicorn sse_server:app --port 1090 --reload

  

客户端:

import requests
from requests.auth import HTTPBasicAuth
 
# 定义事件流客户端类
class EventStreamClient:
    def __init__(self, url, username=None, password=None):
        self.url = url
        self.session = requests.Session()
        if username and password:
            self.session.auth = HTTPBasicAuth(username, password)
 
    def connect(self):
        # 使用requests.Session来保持连接
        self.response = self.session.get(self.url, stream=True)
        self.response.raise_for_status()  # 检查是否连接成功
 
    def events(self):
        # 生成器:逐行读取响应内容并逐一生成事件
        for line in self.response.iter_lines():
            if line:
                yield line.decode('utf-8').rstrip()  # 去掉行尾的换行符
 
    def close(self):
        # 关闭会话
        self.session.close()
 
# 使用示例
url = 'http://127.0.0.1:1090/stream'  # 替换为实际的事件流URL
client = EventStreamClient(url, username='', password='')
try:
    client.connect()
    for event in client.events():
        print(event)  # 处理接收到的每个事件
finally:
    client.close()

  

输出:

 

标签:stream,python,self,编程,url,session,sse,import,def
From: https://www.cnblogs.com/xuxiaobo/p/18624944

相关文章

  • Qt编程快速入门(1)Qt结构简介
    Qt构成Qt(/ˈkjuːt/,发音同“cute”)是一个跨平台的C++应用程序开发框架。广泛用于开发GUI程序,这种情况下又被称为部件工具箱。也可用于开发非GUI程序,例如控制台工具和服务器。-摘自维基百科Qt可以在多个平台编译运行,包括Windows、Linux、MacOS,其运行时的表现取决于操作系统的U......
  • python网络编程之websocket
    服务端: importasyncioimportwebsockets#保存已连接的客户端列表connected_clients=set()asyncdefhandle_websocket(websocket):#将新的客户端添加到已连接客户端列表remote_address=websocket.remote_addressconnected_clients.add(websocke......
  • python网络编程之tcp
    服务端: importsocketimportstructsk=socket.socket()sk.bind(('127.0.0.1',9501))#申请操作系统的资源sk.listen()whileTrue:#print(f'sk:{sk}')#conn里存储的是一个客户端和服务端的连接信息conn,adder=sk.accept()#能够和多个客户端......
  • python网络编程之udp
    服务端: importsocketsk=socket.socket(type=socket.SOCK_DGRAM)#表示一个udp协议sk.bind(('127.0.0.1',9504))#服务端不能先发送消息,因为服务端不知道客户端的ipwhileTrue:msg,addr=sk.recvfrom(1024)print(f"接收到客户端数据:{msg.decode('utf-8......
  • python网络编程之http longpull
    服务端:fromflaskimportFlask,request,jsonifyimporttimeapp=Flask(__name__)@app.route('/stream',methods=['GET'])defpoll():#假设这里有一个方法来检查是否有新数据#为了示例,我们简单地模拟等待数据time.sleep(5)#模拟处理时间或等待......
  • Python中指数概率分布函数的绘图详解
    在数据科学和统计学中,指数分布是一种应用广泛的连续概率分布,通常用于建模独立随机事件发生的时间间隔。通过Python,我们可以方便地计算和绘制指数分布的概率密度函数(PDF)。本文将详细介绍指数分布的原理、应用场景,并提供详细的代码示例,展示如何在Python中绘制指数分布的概率密度函数......
  • Python进阶之opencv图片和视频基本读取关闭
    opencv目录opencvpip下载图片基本读取关闭导入前提读取显示和关闭图片属性视频读取显示和关闭视频读取pip下载在终端下载已经修改pip源可直接下载,未修改为下面代码+-i镜像网址代码展示:pipinstallopencv-python==3.4.18.65pipinstallopencv-cont......
  • Python企业公寓后勤管理系统(Pycharm Flask Django Vue mysql)
    文章目录项目介绍和开发技术介绍具体实现截图开发技术开发与测试:设计思路系统测试可行性分析核心代码部分展示文章目录/写作提纲参考源码/演示视频获取方式项目介绍和开发技术介绍论文主要是对后勤管理系统进行了介绍,包括研究的现状,还有涉及的开发背景,然后还对系统......
  • JAVA基础教程-(二)JAVA面向对象编程
    教程目录JAVA基础教程JAVA面向对象编程(二)一、类的成员1.1、属性1.2、方法1.2.1、方法的重载(overload)1.2.2、可变个数形参1.2.3、方法参数的值传递的机制1.3、构造器(构造方法)1.4、总结:属性赋值过程二、面向对象特征:封装和隐藏JAVA基础......
  • Python+Vue3+Django中国戏曲文化传播系统
    文章目录具体实现截图项目介绍和开发技术介绍开发技术核心代码部分展示项目结构分析文章目录/写作提纲参考源码/演示视频获取方式具体实现截图项目介绍和开发技术介绍(Pycharm毕业设计mysql)拟解决的主要问题和技术关键(1)如何将前端页面与数据库进行互连;(2)......