首页 > 其他分享 >HTTP事件流 text/event-stream

HTTP事件流 text/event-stream

时间:2023-12-19 10:14:08浏览次数:36  
标签:info HTTP log stream text State Override public String

GitHub All-in-one OpenAI Demo

一、依赖

<dependency>
  <groupId>org.asynchttpclient</groupId>
  <artifactId>async-http-client</artifactId>
  <version>2.12.3</version>
</dependency>

二、事件流处理器

@Slf4j
public class ChatGptStreamedHandler implements StreamedAsyncHandler {
    @Override
    public State onStream(Publisher publisher) {
        publisher.subscribe(new ChatGptSubscriber());
        log.info("》》》》》 onStream");
        return State.CONTINUE;
    }

    @Override
    public State onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
        log.info("》》》》》onStatusReceived");
        return responseStatus.getStatusCode() == 200 ? State.CONTINUE : State.ABORT;
    }

    @Override
    public State onHeadersReceived(HttpHeaders headers) throws Exception {
        log.info("》》》》》 头信息处理");
        return State.CONTINUE;
    }

    @Override
    public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
        log.info("》》》》》 onBodyPartReceived");
        return State.CONTINUE;
    }

    @Override
    public void onThrowable(Throwable t) {
        log.error("发生异常", t);
    }

    @Override
    public Object onCompleted() throws Exception {
        log.info("》》》》》 完成");
        return State.ABORT;
    }
}

三、事件订阅者

{
  "bodyByteBuffer": {
    "array": "eyJlcnJvcl9jb2RlIjoxMDAsImVycm9yX21zZyI6IkludmFsaWQgcGFyYW1ldGVyIn0=",
    "limit": 50,
    "position": 0
  },
  "bodyPartBytes": "eyJlcnJvcl9jb2RlIjoxMDAsImVycm9yX21zZyI6IkludmFsaWQgcGFyYW1ldGVyIn0=",
  "last": false
}
{
  "id": "chatcmpl-7WHp5USDDqtzkqXXXXXXXXXXXXX",
  "object": "chat.completion.chunk",
  "created": 1687929363,
  "model": "gpt-3.5-turbo-0613",
  "choices": [
    {
      "index": 0,
      "delta": {
        "content": "你"
      },
      "finish_reason": null
    }
  ]
 }
@Slf4j
public class ChatGptSubscriber implements Subscriber {

    private final static String DATA_SEPARATOR = "data: ";
    private final static String FINISH_MARK = "[DONE]";

    @Override
    public void onSubscribe(Subscription s) {
        log.info(">>>>> onSubscribe  {}", JSON.toJSONString(s));
        s.request(Long.MAX_VALUE);
    }

    // 暂存字段
    private String tempStorage = Strings.EMPTY;

    @Override
    public void onNext(Object o) {
        String bodyDatas = new String(JSON.parseObject(JSON.toJSONString(o)).getBytes("bodyPartBytes"));
        log.info(">>>>> onNext  {}", bodyDatas);
        boolean firstLine = false;
        StringBuilder words = new StringBuilder();
        if(bodyDatas.startsWith(DATA_SEPARATOR)){
            firstLine = true;
            // 结尾刚好为完整JSON
            if(StringUtil.isNotEmpty(tempStorage)){
                // 通用处理
                words.append(getWord(tempStorage));
            }
        }
        String[] singleWords = bodyDatas.split(DATA_SEPARATOR);

        for(int i=0; i<singleWords.length; i++){
            String singleWordJsonStr = singleWords[i];
            // 校验
            if(StringUtil.isEmpty(singleWordJsonStr)){
                continue;
            }
            if(FINISH_MARK.equals(singleWordJsonStr)){
                break;
            }
            // 结尾处理
            if(i == singleWords.length -1){
                tempStorage = singleWordJsonStr;
                break;
            }
            // 开头处理
            if(!firstLine && i==0 && StringUtil.isNotEmpty(tempStorage)){
                singleWordJsonStr = tempStorage + singleWordJsonStr;
            }
            // 通用处理
            String word = getWord(singleWordJsonStr);
            if(StringUtil.isNotEmpty(word)){
                words.append(word);
            }
        }

        log.info(">>>>> ai says: {}", words);
    }

    private String getWord(String singleWordJsonStr) {
        ChatCompletionResponseDTO chatCompletionResponseDTO = OpenAiUtil.checkAndGetResponse(singleWordJsonStr, ChatCompletionResponseDTO.class);
        return chatCompletionResponseDTO.getChoices().get(0).getDelta().getContent();
    }

    @Override
    public void one rror(Throwable t) {
        //这里就是监听到的响应
        log.info(">>>>> one rror  {}", JSON.toJSONString(t));
    }

    @Override
    public void onComplete() {
        log.info(">>>>> onComplete ");
    }
}

四、请求调用

public static void main(String[] args) throws Exception {
    //原始地址
    String apiUrl = "https://api.openai.com/v1/chat/completions";
    String apiKey = "";
    List<JSONObject> msgData = new ArrayList<>();
    msgData.add(
            new JSONObject()
                    .fluentPut("role", "user")
                    .fluentPut("content", "200字夸奖我一下")
    );
    JSONObject req = new JSONObject()
            // 完成时要生成的最大token数量。
            // 提示的token计数加上max_tokens不能超过模型的上下文长度。大多数模型的上下文长度为2048个token(最新模型除外,支持4096个)。
            .fluentPut("max_tokens", 512)
//                .fluentPut("model", "gpt-3.5-turbo")
            .fluentPut("model", "gpt-4")
            // 可选 默认值1
            // 使用什么样的采样温度,介于0和2之间。较高的值(如0.8)将使输出更加随机,而较低的值(例如0.2)将使其更加集中和确定。
            // 通常建议更改它或top_p,但不能同时更改两者。
            .fluentPut("temperature", 0.6)
            //告诉ChatGpt 使用流式返回,传FALSE 则返回完整的JSON(有点慢)
            .fluentPut("stream", Boolean.TRUE)
            .fluentPut("messages", msgData);

    log.info("》》 {}", JSON.toJSONString(req));
    AsyncHttpClient asyncHttpClient = new DefaultAsyncHttpClient();
    asyncHttpClient.preparePost(apiUrl)
            .addHeader("Content-Type", "application/json")
            .addHeader("Authorization", "Bearer " + apiKey)
            .addHeader("Accept", "text/event-stream")
            .setBody(JSON.toJSONString(req))
            .execute(new ChatGptStreamedHandler())
            .get();
    asyncHttpClient.close();
}

五、输出

image.png
通过输出我们可以发现,实际上也是先获取全部结果,再按照流式进行返回。并没有缩短等待时长


参考:
JAVA 监听 text/event-stream ,事件流 SSE_java text/event-stream_高冷滴互联网农民工的博客-CSDN博客

标签:info,HTTP,log,stream,text,State,Override,public,String
From: https://www.cnblogs.com/meidanlong/p/17912974.html

相关文章

  • HTTP事件流 text/event-stream
    GitHubAll-in-oneOpenAIDemo一、依赖<dependency><groupId>org.asynchttpclient</groupId><artifactId>async-http-client</artifactId><version>2.12.3</version></dependency>二、事件流处理器@Slf4jpublicclass......
  • StreamUtils
    packagecom.redis.utils;importcom.SpringUtils;importcom.StringUtils;importlombok.extern.slf4j.Slf4j;importorg.springframework.data.domain.Range;importorg.springframework.data.redis.connection.RedisConnection;importorg.springframework.data.red......
  • Adaptive Sparse Convolutional Networks with Global Context Enhancement for Faste
    AdaptiveSparseConvolutionalNetworkswithGlobalContextEnhancementforFasterObjectDetectiononDroneImages*Authors:[[BoweiDu]],[[YechengHuang]],[[JiaxinChen]],[[DiHuang]]初读印象comment::提出了一种新型全局上下文增强自适应稀疏卷积网络(CEAS......
  • stream流的概述以及idea与stream
    前面自己学过一些流的概念,比如IO流,用于读写本地的数据。stream流主要是用于对集合/数组进行操作 idea现在已经很好的支持Stream流操作,在debug的时候可以很好的看到详细内容 下面以一个我的简单demo为例1.distinct进行去重 2.filter去重之后的过滤//是一个中间操作 3......
  • wgcloud运维部署 - 配置使用https证书
    https://www.wgstart.com/help/docs137.html......
  • Istio从入门到精通—— 流量治理的原理 —— VirutalService —— HTTPRedirect
    流量治理的原理——VirutalService——HTTPRedirecthttps://istio.io/latest/docs/reference/config/networking/virtual-service/#HTTPRedirect HTTPRedirectcanbeusedtosenda301redirectresponsetothecaller,wheretheAuthority/HostandtheURIinthe......
  • Ubuntu20.04下DeepStream Python环境安装
    引子最近工作学习中遇到多路视频解码抽帧的需求,考虑到项目上大多数用到的都是Nvidia的显卡,常规CPU软解显然无法满足多路的需求,故考虑使用N卡的硬解码功能。然后我就毫不犹豫的去找轮子了,ChatGPT这么火,那就先问问它吧。嗯,呃,貌似下图红框里答案只有那么一点点靠谱(不要问我为啥......
  • HTTP 3.0之QUIC优势和TCP弊端
    1HTTP3.01.1引言从HTTP/1.1到HTTP/2,HTTP协议一直都是使用TCP作为传输协议。然而,就在最新的HTTP/3,HTTP就直接把TCP抛弃了,向孤立无援的UDP伸出了援手,基于UDP协议的基础上,在应用层实现了一个可靠的传输协议——QUIC。很多同学可能就好奇了,HTTP都用TCP都用了几......
  • 鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之Text文本组件
    鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之文本组件一、操作环境操作系统: Windows10专业版IDE:DevEcoStudio3.1SDK:HarmonyOS3.1编辑二、文本组件Text 是显示文本的基础组件之一,它可以包含子组件 Span ,当包含 Span 时不生效,只显示 Span 的内容。Text('我是Text'){Span('......
  • boost beast http::read 一直阻塞不返回,问题解决, 使用parser对象的skip(true) 来解
    用beast作为客户端发送http请求后读web服务端返回的数据,遇到了http::read或http::async_read一直阻塞着,不返回,直到连接过期后被强制网络断开后read函数才返回。看了官方文档,文档里这么描述的,read要一直等到end_of_stream时才回退出阻塞状态。也就是连接失效后才行。但我们的......