基于django来实现sse
最近在实现通过post调用三方的接口得到sse流数据,并且自己需要用拿到的数据用sse流在返回给前端
from rest_framework.views import APIView
from django.http import StreamingHttpResponse, JsonResponse
class ChatMessageViewSet(APIView):
def post(self, request):
"""
获取列表
:param request:
:return:
"""
# 发送POST请求到第三方API,并开启流式响应
try:
response = requests.post(url, headers=headers, json=payload, stream=True)
if response.status_code == 200:
# 创建一个StreamingHttpResponse来逐块传递数据
def stream_response(ai_answer_str: str):
for chunk in response.iter_lines(): # 使用iter_lines处理行
try:
yield f"{chunk}\n\n" # 格式化为SSE事件
except json.JSONDecodeError as e:
print(2222, e)
# 如果解析 JSON 失败,直接发送原始数据
yield f"{chunk}\n\n" # 格式化为SSE事件
# 返回StreamingHttpResponse
return StreamingHttpResponse(stream_response(), content_type='text/event-stream')
else:
# 如果请求失败,返回错误信息
return JsonResponse({"code": 400, "message": response.json(), "data": {}}, status=200)
except Exception as e:
# 处理请求异常
return JsonResponse({"code": 400, "message": str(e), "data": {}}, status=200)
这里必须注意的是 sse返回的格式,一定要是 (data: \n\n),开头是data冒号和两个空格,最后以两个换行结尾
标签:return,stream,StreamingHttpResponse,接口,django,sse,response From: https://www.cnblogs.com/up-k/p/18464452