一、依赖
<dependency>
<groupId>org.asynchttpclient</groupId>
<artifactId>async-http-client</artifactId>
<version>2.12.3</version>
</dependency>
二、事件流处理器
/**
 * Streamed async handler for the OpenAI chat-completion SSE response.
 *
 * <p>Hands every received body part to a {@code ChatGptSubscriber} via the
 * Reactive Streams {@code Publisher}, so SSE fragments can be reassembled
 * into complete JSON events as they arrive.
 */
@Slf4j
public class ChatGptStreamedHandler implements StreamedAsyncHandler<Object> {

    /**
     * Called once the response body starts streaming. Subscribes a
     * {@code ChatGptSubscriber} that consumes each {@code HttpResponseBodyPart}.
     */
    @Override
    public State onStream(Publisher<HttpResponseBodyPart> publisher) {
        publisher.subscribe(new ChatGptSubscriber());
        log.info("》》》》》 onStream");
        return State.CONTINUE;
    }

    /** Aborts the request on any non-200 HTTP status. */
    @Override
    public State onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
        log.info("》》》》》onStatusReceived");
        return responseStatus.getStatusCode() == 200 ? State.CONTINUE : State.ABORT;
    }

    /** Response headers received — nothing to inspect here, just continue. */
    @Override
    public State onHeadersReceived(HttpHeaders headers) throws Exception {
        log.info("》》》》》 头信息处理");
        return State.CONTINUE;
    }

    /**
     * With a streamed handler the body parts are delivered through
     * {@link #onStream}; this callback only logs for tracing.
     */
    @Override
    public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
        log.info("》》》》》 onBodyPartReceived");
        return State.CONTINUE;
    }

    @Override
    public void onThrowable(Throwable t) {
        log.error("发生异常", t);
    }

    /** Stream finished — returning here completes the request's future. */
    @Override
    public Object onCompleted() throws Exception {
        log.info("》》》》》 完成");
        return State.ABORT;
    }
}
三、事件订阅者
{
"bodyByteBuffer": {
"array": "eyJlcnJvcl9jb2RlIjoxMDAsImVycm9yX21zZyI6IkludmFsaWQgcGFyYW1ldGVyIn0=",
"limit": 50,
"position": 0
},
"bodyPartBytes": "eyJlcnJvcl9jb2RlIjoxMDAsImVycm9yX21zZyI6IkludmFsaWQgcGFyYW1ldGVyIn0=",
"last": false
}
{
"id": "chatcmpl-7WHp5USDDqtzkqXXXXXXXXXXXXX",
"object": "chat.completion.chunk",
"created": 1687929363,
"model": "gpt-3.5-turbo-0613",
"choices": [
{
"index": 0,
"delta": {
"content": "你"
},
"finish_reason": null
}
]
}
@Slf4j
public class ChatGptSubscriber implements Subscriber {
private final static String DATA_SEPARATOR = "data: ";
private final static String FINISH_MARK = "[DONE]";
@Override
public void onSubscribe(Subscription s) {
log.info(">>>>> onSubscribe {}", JSON.toJSONString(s));
s.request(Long.MAX_VALUE);
}
// 暂存字段
private String tempStorage = Strings.EMPTY;
@Override
public void onNext(Object o) {
String bodyDatas = new String(JSON.parseObject(JSON.toJSONString(o)).getBytes("bodyPartBytes"));
log.info(">>>>> onNext {}", bodyDatas);
boolean firstLine = false;
StringBuilder words = new StringBuilder();
if(bodyDatas.startsWith(DATA_SEPARATOR)){
firstLine = true;
// 结尾刚好为完整JSON
if(StringUtil.isNotEmpty(tempStorage)){
// 通用处理
words.append(getWord(tempStorage));
}
}
String[] singleWords = bodyDatas.split(DATA_SEPARATOR);
for(int i=0; i<singleWords.length; i++){
String singleWordJsonStr = singleWords[i];
// 校验
if(StringUtil.isEmpty(singleWordJsonStr)){
continue;
}
if(FINISH_MARK.equals(singleWordJsonStr)){
break;
}
// 结尾处理
if(i == singleWords.length -1){
tempStorage = singleWordJsonStr;
break;
}
// 开头处理
if(!firstLine && i==0 && StringUtil.isNotEmpty(tempStorage)){
singleWordJsonStr = tempStorage + singleWordJsonStr;
}
// 通用处理
String word = getWord(singleWordJsonStr);
if(StringUtil.isNotEmpty(word)){
words.append(word);
}
}
log.info(">>>>> ai says: {}", words);
}
private String getWord(String singleWordJsonStr) {
ChatCompletionResponseDTO chatCompletionResponseDTO = OpenAiUtil.checkAndGetResponse(singleWordJsonStr, ChatCompletionResponseDTO.class);
return chatCompletionResponseDTO.getChoices().get(0).getDelta().getContent();
}
@Override
public void one rror(Throwable t) {
//这里就是监听到的响应
log.info(">>>>> one rror {}", JSON.toJSONString(t));
}
@Override
public void onComplete() {
log.info(">>>>> onComplete ");
}
}
四、请求调用
/**
 * Demo entry point: sends a streamed chat-completion request to the OpenAI
 * API and lets {@code ChatGptStreamedHandler} consume the SSE response.
 */
public static void main(String[] args) throws Exception {
    // 原始地址
    String apiUrl = "https://api.openai.com/v1/chat/completions";
    // NOTE(review): supply a real key (e.g. from an environment variable);
    // never hard-code secrets in source.
    String apiKey = "";
    List<JSONObject> msgData = new ArrayList<>();
    msgData.add(
            new JSONObject()
                    .fluentPut("role", "user")
                    .fluentPut("content", "200字夸奖我一下")
    );
    JSONObject req = new JSONObject()
            // Maximum tokens to generate; prompt tokens + max_tokens must not
            // exceed the model's context length (2048 for most models,
            // 4096 for the newest ones).
            .fluentPut("max_tokens", 512)
            // Alternative model: "gpt-3.5-turbo"
            .fluentPut("model", "gpt-4")
            // Sampling temperature in [0, 2], default 1. Higher (e.g. 0.8) is
            // more random, lower (e.g. 0.2) more focused/deterministic.
            // Tune this OR top_p, not both.
            .fluentPut("temperature", 0.6)
            // stream=true: the server pushes SSE chunks instead of one big
            // (slow) JSON response.
            .fluentPut("stream", Boolean.TRUE)
            .fluentPut("messages", msgData);
    log.info("》》 {}", JSON.toJSONString(req));
    // try-with-resources: the client is closed even when execute()/get()
    // throws — the original straight-line close() leaked it on failure.
    try (AsyncHttpClient asyncHttpClient = new DefaultAsyncHttpClient()) {
        asyncHttpClient.preparePost(apiUrl)
                .addHeader("Content-Type", "application/json")
                .addHeader("Authorization", "Bearer " + apiKey)
                .addHeader("Accept", "text/event-stream")
                .setBody(JSON.toJSONString(req))
                .execute(new ChatGptStreamedHandler())
                .get();
    }
}
五、输出
通过输出我们可以发现,实际上也是先获取全部结果,再按照流式进行返回。并没有缩短等待时长
参考:
JAVA 监听 text/event-stream ,事件流 SSE_java text/event-stream_高冷滴互联网农民工的博客-CSDN博客