袋鼠云数栈产品中 AI+ 实现原理剖析
我们是袋鼠云数栈 UED 团队,致力于打造优秀的一站式数据中台产品。我们始终保持工匠精神,探索前端道路,为社区积累并传播经验价值。
本文作者:修能
生产力工具 + AI 是不可逆转的趋势,慢慢的大模型能力通过 AI Agent 落地的工程化能力也开始趋于成熟。作为大数据产品的数栈也必然是需要借助 AI 能力提升产品竞争力。
去年 12 月,我们在产品中上线了 AI+ 的功能,借助已经开源的大模型的能力,帮助我们探索和落地更多地应用场景。在初版 AI+ 的功能中,我们实现了基础功能的通话。
SSE
在 ChatGPT 中,我们在等待大模型生成回答的时间通常不需要很久。这是因为 ChatGPT 通过 server-sent events(SSE)
来实现将生成的部分回答通过事件流传递到前端。而这就让前端不必等回答全部生成后再获取,也就使得不需要请求等待很久。
SSE 是一种基于 HTTP 协议的单向通信机制,用于服务端向客户端推送数据。
SSE | WebSocket |
---|---|
基于 HTTP 协议 | 基于 TCP 连接,本身是一种协议 |
单向通信 | 双向通信 |
简单易用 | 复杂 |
入门使用
// 创建 SSE 的实例
const evtSource = new EventSource("//api.example.com/ssedemo.php", {
withCredentials: true,
});
// 添加监听事件
evtSource.onmessage = (event) => {
const newElement = document.createElement("li");
const eventList = document.getElementById("list");
newElement.textContent = `message: ${event.data}`;
eventList.appendChild(newElement);
};
// 错误处理
evtSource.onerror = (err) => {
console.error("EventSource failed:", err);
};
// 关闭事件流
evtSource.close();
需要注意的是,SSE 请求的服务端响应信息头的 MIME 类型必须是text/event-stream
,否则会无法监听到事件。
另外,由于是基于 HTTP 协议的,所以在 HTTP/1.1 或更低的时候,会受浏览器最大连接数的限制。
Fields
收到的消息格式一定是具有以下字段的某种组合,其他字段名都将忽略,每行一个:
event
data
id
retry
: this is a test stream // 第一条消息,这会被解析会注释
data: some text // 第二条消息
data: another message // 第三条消息
data: with two lines
event: userconnect // 第四条消息
data: {"username": "bobby", "time": "02:33:48"}
如上所示,默认浏览器的 EventSource API 虽然可用,但是限制比较多。
- 只支持 url 和 withCredentials 参数。不支持往 body 里传参数。而通常来说 URL 是有最大长度限制的。
- 无法自定义请求头。
- 只能发起 GET 请求。
其实,我们也可以通过 Fetch 来实现 SSE 的通信,只不过需要额外自行处理数据流的传递。
实现
首先,我们借助 Fetch 的能力来实现请求。
const response = await fetch(url, options);
通过接受用户提供的 url 和 options 发起一个 fetch 的请求。
然后,我们需要排除掉非 SSE 的请求类型,我们可以直接拿响应的 header 中拿 content-type
进行判断。
const contentType = response.headers.get('content-type');
if (!contentType?.startsWith('text/event-stream')) {
throw new Error('SSE 请求必须设置 content-type 为 text/event-stream');
}
接着,我们业务场景中通常直接通过 response.json()
获取 JSON 格式的数据了,但这里我们由于是事件流,所以我们通过 response.body
拿到的是一个 ReadableStream
。我们需要借助相关的 API 进行流的读取。
const reader = response.body.getReader();
let result: ReadableStreamDefaultReadResult<Uint8Array>;
while (!(result = await reader.read()).done) {
// 假定每一次 read 的 value 都是完整的消息
onmessage(onChunk(result.value));
}
其中 onChunk 函数就是处理事件流中的每一份数据的。
// 伪代码
function onChunk(arr: Uint8Array){
const links = seekLinks();
// 待完善
}
在实现 seekLinks
方法之前,我们需要先知道到什么时候算每一行的结束。
从 Fields 可以知道,每一行是以\n
作为区分的。
function seekLinks(arr: Uint8Array){
const lines = [];
const buffer = arr;
const bufLength = buffer.length;
let position = 0;
let lineStart = 0;
while(position < bufLength){
// '\n'.charCodeAt() === 10;
if(buffer[position] === 10){
lines.push(buffer.slice(lineStart, position));
lineStart = position;
};
position += 1;
}
return lines;
}
在获取到所有行后,针对每一行做处理。
// 伪代码
function onChunk(arr: Uint8Array){
const links = seekLinks();
const decoder = new TextDecoder();
let message = {
data: '',
event: '',
id: '',
retry: undefined,
}:
links.forEach((line) => {
// ':'.charCodeAt() === 58;
const colon = line.findIndex(l => l === 58);
const fieldArr = line.slice(0, colon);
const valueArr = line.slice(colon);
if(colon === -1){
// 当冒号作为开头的时候,解析成注释
return;
}
const field = decoder.decode(fieldArr);
const value = decoder.decode(valueArr);
switch (field) {
case 'data':
message.data = message.data
? message.data + '\n' + value
: value;
break;
case 'event':
message.event = value;
break;
case 'id':
message.id = value;
break;
case 'retry':
const retry = parseInt(value, 10);
message.retry = retry
break;
}
});
return message;
}
大致完成了最简单的基础功能的解析,而以上伪代码参考 fetch-event-source 的源码。
借助 fetch-event-source 的能力,在数栈产品中调用的方式和 HTTP 请求基本保持一致。
function sse(url: string, params: any, options: FetchEventSourceInit) {
const headers = {
'Content-Type': 'application/json',
accept: 'text/event-stream',
};
fetchEventSource(url, {
method: 'POST',
body: JSON.stringify(params),
headers,
...options,
});
}
打字机效果
接着,我们实现具备科技感的打字机效果:
输出
这里我们不能直接将响应的消息直接打印到屏幕上,因为响应的消息通常是好多字,这样子会导致打字机效果显得非常卡顿,用户体验不佳。
在数栈产品中,我们通过将响应的消息收集到暂存区中,然后通过每秒从暂存区中取出若干个字符打印到屏幕上,优化打字机卡顿的效果。
function AIGC(){
const typing = useTyping({
// 暂存区启动后,每个 delay 的时间都会执行该方法将消息打印到屏幕上
onTyping(val) {
// ...
},
});
const handleChat = (message: string) => {
// 标志暂存区需要开始存响应的消息了
typing.start();
requestChat(params, {
onmessage(event: { data: string }) {
const { data } = event;
// 把响应的消息存入暂存区中
typing.push(data);
},
onclose() {
// 关闭或失败的话,释放暂存区的数据
typing.close();
},
onerror() {
typing.close();
},
});
};
}
其中,相关暂存区的代码整理成 useTyping
实现。
export default function useTyping({
onTyping,
onEnd,
}: {
onTyping: (val: string) => void;
onEnd: () => void;
}) {
const interval = useRef<number>();
const queue = useRef<string>('');
const isStart = useRef<boolean>(false);
function startTyping() {
if (interval.current) return;
let index = 0;
interval.current = window.setInterval(() => {
if (index < queue.current.length) {
const str = queue.current;
onTyping(str.slice(0, index + 1));
index++;
} else if (!isStart.current) {
// 如果发送了全部的消息且信号关闭,则清空队列
window.clearInterval(interval.current);
interval.current = 0;
onEnd();
}
// 如果发送了全部的消息,但是信号没有关闭,则什么都不做继续轮训等待新的消息
}, 50);
}
useEffect(() => {
return () => {
window.clearInterval(interval.current);
interval.current = 0;
};
}, []);
function start() {
isStart.current = true;
window.clearInterval(interval.current);
interval.current = 0;
queue.current = '';
}
function push(str: string) {
if (!isStart.current) return;
queue.current += str.replace(/\\n/g, '\n');
startTyping();
}
// 关闭的时候不需要清空队列,因为可能还有一些消息没有发送完毕,统一等消息发送完毕后关闭
function close() {
isStart.current = false;
}
return { start, push, close };
}
光标
在实现了打字机效果后,我们还需要添加一个闪烁的光标。
原理比较简单,就是在消息区域的最后一个元素的末尾添加元素即可。
.markdown {
>*:last-child::after {
content: " ";
width: 2px;
height: 13px;
transform: translate(1px, 2px);
font-family: Menlo, Monaco, "Courier New", monospace;
font-weight: normal;
font-size: 0;
font-feature-settings: "liga" 0, "calt" 0;
line-height: 13px;
letter-spacing: 0;
display: inline-block;
visibility: hidden;
animation: blinker 1s step-end infinite;
background: #000;
}
@keyframes blinker {
0% {
visibility: inherit;
}
50% {
visibility: hidden;
}
100% {
visibility: inherit;
}
}
}
当然,这里有一些问题,在 markdown 解析出 Code Block 的时候会导致光标错位,这个问题 ChatGPT 同样也有。
那么到这里,我们就实现了一个具备基础功能的 AI+ 的需求。
最后
欢迎关注【袋鼠云数栈UED团队】~
袋鼠云数栈 UED 团队持续为广大开发者分享技术成果,相继参与开源了欢迎 star