現(xiàn)在市面上Java實(shí)現(xiàn)的流式輸出代碼很少,只能自己動(dòng)手豐衣足食。
一、為什么大語(yǔ)言模型使用流式輸出內(nèi)容
大語(yǔ)言模型采用流式輸出內(nèi)容的原因主要有以下幾點(diǎn):
提高用戶體驗(yàn):流式輸出使得模型的回復(fù)不是一次性生成整個(gè)回答,而是逐字逐句地生成。這種方式避免了用戶長(zhǎng)時(shí)間等待整個(gè)回復(fù)生成完畢的情況,從而提升了用戶體驗(yàn)。
提升交互響應(yīng)速度:通過逐字蹦出回復(fù),可以實(shí)現(xiàn)更快的交互響應(yīng)。這意味著在用戶輸入消息后,模型可以快速開始生成回答的開頭,并根據(jù)上下文逐漸細(xì)化回答。
增強(qiáng)對(duì)話透明度:流式輸出可以讓用戶看到模型逐步構(gòu)建回答的過程,這有助于用戶理解模型是如何形成回答的,提高了對(duì)話的透明度和可解釋性。
優(yōu)化性能表現(xiàn):對(duì)于大型語(yǔ)言模型來說,生成完整的內(nèi)容可能需要較長(zhǎng)的計(jì)算時(shí)間。流式輸出允許模型邊計(jì)算邊輸出,這樣即使模型推理效率不是很高,也能保證用戶體驗(yàn)不會(huì)受到太大影響。
實(shí)現(xiàn)動(dòng)畫效果:流式輸出還可以模仿打字機(jī)的動(dòng)畫效果,即一個(gè)字或一個(gè)詞的輸出,給用戶一種答案逐漸出現(xiàn)的視覺效果。
綜上所述,流式輸出是大語(yǔ)言模型在交互過程中的一種有效策略,它兼顧了效率和用戶體驗(yàn),同時(shí)也增強(qiáng)了模型的互動(dòng)性和透明度。
二、關(guān)于SSE技術(shù)
當(dāng)然WebSocket也可以達(dá)到效果,本文使用更輕量的SSE來實(shí)現(xiàn)。
SSE,全稱為Server-Sent Events(服務(wù)器發(fā)送事件),是一種允許服務(wù)器向?yàn)g覽器客戶端推送實(shí)時(shí)信息的Web技術(shù)。這種機(jī)制基于HTTP協(xié)議,利用長(zhǎng)輪詢的方式,讓服務(wù)器可以主動(dòng)向客戶端發(fā)送更新的數(shù)據(jù),而無(wú)需客戶端不斷地發(fā)起請(qǐng)求去詢問是否有新的數(shù)據(jù)。這個(gè)特點(diǎn)正好符合我都需求,看了這么多大語(yǔ)言模型的應(yīng)用,一直在琢磨底層實(shí)現(xiàn)。
SSE的工作原理是建立在傳統(tǒng)的HTTP請(qǐng)求之上,但與傳統(tǒng)HTTP請(qǐng)求不同的是,一旦建立連接,服務(wù)器就可以持續(xù)地向客戶端發(fā)送消息,直到連接被關(guān)閉??蛻舳私邮盏降南⑼ǔR訨SON或其他格式編碼,并且每條消息都包含一個(gè)事件類型和數(shù)據(jù)負(fù)載。
SSE具有以下特點(diǎn):
單向通信:SSE主要用于服務(wù)器向客戶端發(fā)送數(shù)據(jù),而不是雙向通信。如果需要客戶端向服務(wù)器發(fā)送數(shù)據(jù),通常需要另外的機(jī)制,如WebSocket。
簡(jiǎn)單高效:SSE使用標(biāo)準(zhǔn)的HTTP協(xié)議,不需要額外的庫(kù)或插件,且相比于WebSocket,它在某些情況下可能更加高效,因?yàn)樗恍枰?wù)器發(fā)送數(shù)據(jù),而不需要保持全雙工的連接。
自動(dòng)重連:如果連接中斷,SSE會(huì)自動(dòng)嘗試重新連接,這對(duì)于需要高可靠性的實(shí)時(shí)數(shù)據(jù)推送非常有用。
跨瀏覽器支持:大多數(shù)現(xiàn)代瀏覽器都支持SSE,包括Chrome、Firefox、Safari和Edge。
SSE常用于需要實(shí)時(shí)數(shù)據(jù)更新的應(yīng)用場(chǎng)景,如股票價(jià)格更新、新聞推送、社交媒體通知等。通過SSE,開發(fā)者可以輕松地構(gòu)建實(shí)時(shí)交互式的Web應(yīng)用程序,為用戶提供更加豐富和動(dòng)態(tài)的體驗(yàn)。
三、SSE代碼實(shí)現(xiàn)
在Java中,SseEmitter 是 Spring 框架提供的一個(gè)用于服務(wù)器發(fā)送事件(Server-Sent Events, SSE)的工具。SSE 允許服務(wù)器向客戶端推送實(shí)時(shí)信息,客戶端通過一個(gè)持久的HTTP連接接收這些信息。
SpringBoot的pom依賴如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
-->
在使用SSE之前也測(cè)試了webflux框架實(shí)現(xiàn)打字機(jī),效果不好放棄。
為了使用 SseEmitter 實(shí)現(xiàn)類似打字機(jī)效果的流式輸出,你需要?jiǎng)?chuàng)建一個(gè) SseEmitter 實(shí)例,然后逐步發(fā)送數(shù)據(jù)給客戶端。下面是一個(gè)簡(jiǎn)單的例子:
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TypewriterController {
@GetMapping("/typewriter")
public SseEmitter typewriter() {
SseEmitter emitter = new SseEmitter();
// 模擬從大語(yǔ)言模型獲取數(shù)據(jù)的過程
String[] data = {"Hello", "World", "from", "the", "large", "language", "model"};
for (String word : data) {
try {
// 模擬打字機(jī)效果,每個(gè)單詞之間暫停100毫秒
Thread.sleep(100);
emitter.send(SseEmitter.event().data(word));
} catch (InterruptedException e) {
emitter.completeWithError(e);
return emitter;
}
}
emitter.complete();
return emitter;
}
}
在這個(gè)例子中,我們定義了一個(gè) TypewriterController 類,它有一個(gè) /typewriter 端點(diǎn)。當(dāng)這個(gè)端點(diǎn)被訪問時(shí),它會(huì)創(chuàng)建一個(gè)新的 SseEmitter 對(duì)象,并逐個(gè)發(fā)送字符串?dāng)?shù)組中的單詞。每發(fā)送一個(gè)單詞后,線程會(huì)暫停100毫秒來模擬打字機(jī)的效果。
客戶端可以通過建立一個(gè)到 /typewriter 端點(diǎn)的持久連接來接收這些事件。例如,如果你使用JavaScript作為客戶端,你可以這樣寫:
<!DOCTYPE html>
<html>
<head>
<title>Typewriter Effect</title>
</head>
<body>
<div id="output"></div>
<script>
var source = new EventSource('/typewriter');
source.onmessage = function(event) {
var outputDiv = document.getElementById('output');
outputDiv.innerHTML += event.data + ' '; // 將接收到的數(shù)據(jù)添加到頁(yè)面中
};
</script>
</body>
</html>
這段HTML和JavaScript代碼會(huì)打開一個(gè)到服務(wù)器的SSE連接,并在接收到新數(shù)據(jù)時(shí)更新頁(yè)面的內(nèi)容。每次收到數(shù)據(jù)時(shí),都會(huì)將其追加到 <div id="output"> 元素中,從而實(shí)現(xiàn)類似于打字機(jī)逐字顯示文本的效果。
請(qǐng)注意,在實(shí)際應(yīng)用中,你可能需要處理更多的細(xì)節(jié),比如錯(cuò)誤處理、連接關(guān)閉時(shí)的清理工作等。此外,如果你的大語(yǔ)言模型是通過異步方式生成數(shù)據(jù)的,你可能還需要考慮如何與 SseEmitter 進(jìn)行集成,以確保數(shù)據(jù)能夠正確地流式傳輸?shù)娇蛻舳恕?/p>
四、API方式對(duì)接SSE服務(wù)端
上面基于JavaScript作為客戶端獲取返回的流式數(shù)據(jù),通過后端的Java代碼也可以對(duì)接SSE服務(wù)端,增加其他邏輯,比如:鑒權(quán)、計(jì)費(fèi)、敏感詞過濾等。
pom文件:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.8.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.guo.test</groupId>
<artifactId>streamClient</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>streamClient</name>
<description>streamClient</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.10.0</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp-sse</artifactId>
<version>4.10.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
要注意下面代碼設(shè)置接收媒體類型為:
text/event-stream,建議使用`MediaType.TEXT_EVENT_STREAM_VALUE`代替字符串編碼。
package com.guo.test.streamclient.client;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.springframework.http.MediaType;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class streamClient {
private static final String SSE_URL = "http://localhost:8080/llm/stream/query?query=地鐵安全門的規(guī)范"; // 替換為你的SSE端點(diǎn)地址
public static void main(String[] args) {
OkHttpClient client = new OkHttpClient.Builder()
.readTimeout(0, TimeUnit.MILLISECONDS) // 設(shè)置無(wú)限讀取超時(shí),因?yàn)镾SE是長(zhǎng)連接
.build();
Request request = new Request.Builder()
.url(SSE_URL)
.header("Accept", "text/event-stream") // 設(shè)置接收SSE媒體類型
.build();
try (Response response = client.newCall(request).execute()) {
if (!response.isSuccessful()) {
throw new IOException("Unexpected code " + response);
}
// 獲取響應(yīng)體并讀取流
ResponseBody responseBody = response.body();
if (responseBody == null) {
return;
}
try (java.io.Reader reader = responseBody.charStream()) {
char[] buffer = new char[1024];
int bytesRead;
while ((bytesRead = reader.read(buffer)) != -1) {
String data = new String(buffer, 0, bytesRead);
// 處理接收到的SSE數(shù)據(jù)
System.out.println("Received data: " + data);
// 查找SSE事件邊界(通常是"\n\n")
/*int eventBoundary = data.indexOf("\n\n");
if (eventBoundary != -1) {
String event = data.substring(0, eventBoundary);
String eventData = data.substring(eventBoundary + 2);
// 處理事件頭部和事件數(shù)據(jù)
System.out.println("Event: " + event);
System.out.println("Event Data: " + eventData);
}*/
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}