袋鼠云数栈产品中 AI+ 实现原理剖析

我们是袋鼠云数栈 UED 团队,致力于打造优秀的一站式数据中台产品。我们始终保持工匠精神,探索前端道路,为社区积累并传播经验价值。

本文作者:修能

生产力工具 + AI 是不可逆转的趋势,慢慢的大模型能力通过 AI Agent 落地的工程化能力也开始趋于成熟。作为大数据产品的数栈也必然是需要借助 AI 能力提升产品竞争力。
去年 12 月,我们在产品中上线了 AI+ 的功能,借助已经开源的大模型的能力,帮助我们探索和落地更多地应用场景。在初版 AI+ 的功能中,我们实现了基础功能的通话。

SSE

在 ChatGPT 中,我们在等待大模型生成回答的时间通常不需要很久。这是因为 ChatGPT 通过 server-sent events(SSE)来实现将生成的部分回答通过事件流传递到前端。而这就让前端不必等回答全部生成后再获取,也就使得不需要请求等待很久。

SSE 是一种基于 HTTP 协议的单向通信机制,用于服务端向客户端推送数据。

SSE WebSocket
基于 HTTP 协议 基于 TCP 连接,本身是一种协议
单向通信 双向通信
简单易用 复杂

入门使用

// 创建 SSE 的实例
const evtSource = new EventSource("//api.example.com/ssedemo.php", {
  withCredentials: true,
});

// 添加监听事件
evtSource.onmessage = (event) => {
  const newElement = document.createElement("li");
  const eventList = document.getElementById("list");

  newElement.textContent = `message: ${event.data}`;
  eventList.appendChild(newElement);
};

// 错误处理
evtSource.onerror = (err) => {
  console.error("EventSource failed:", err);
};

// 关闭事件流
evtSource.close();

需要注意的是,SSE 请求的服务端响应信息头的 MIME 类型必须是text/event-stream,否则会无法监听到事件。
另外,由于是基于 HTTP 协议的,所以在 HTTP/1.1 或更低的时候,会受浏览器最大连接数的限制。


Fields

收到的消息格式一定是具有以下字段的某种组合,其他字段名都将忽略,每行一个:

  • event
  • data
  • id
  • retry
: this is a test stream // 第一条消息,这会被解析会注释

data: some text // 第二条消息

data: another message // 第三条消息
data: with two lines

event: userconnect // 第四条消息
data: {"username": "bobby", "time": "02:33:48"}

如上所示,默认浏览器的 EventSource API 虽然可用,但是限制比较多。

  1. 只支持 url 和 withCredentials 参数。不支持往 body 里传参数。而通常来说 URL 是有最大长度限制的。
  2. 无法自定义请求头。
  3. 只能发起 GET 请求。

其实,我们也可以通过 Fetch 来实现 SSE 的通信,只不过需要额外自行处理数据流的传递。

实现

首先,我们借助 Fetch 的能力来实现请求。

const response = await fetch(url, options);

通过接受用户提供的 url 和 options 发起一个 fetch 的请求。
然后,我们需要排除掉非 SSE 的请求类型,我们可以直接拿响应的 header 中拿 content-type进行判断。

const contentType = response.headers.get('content-type');
if (!contentType?.startsWith('text/event-stream')) {
    throw new Error('SSE 请求必须设置 content-type 为 text/event-stream');
}

接着,我们业务场景中通常直接通过 response.json()获取 JSON 格式的数据了,但这里我们由于是事件流,所以我们通过 response.body 拿到的是一个 ReadableStream。我们需要借助相关的 API 进行流的读取。

const reader = response.body.getReader();
let result: ReadableStreamDefaultReadResult<Uint8Array>;
while (!(result = await reader.read()).done) {
  	// 假定每一次 read 的 value 都是完整的消息
    onmessage(onChunk(result.value));
}

其中 onChunk 函数就是处理事件流中的每一份数据的。

// 伪代码
function onChunk(arr: Uint8Array){
  const links = seekLinks();
  // 待完善
}

在实现 seekLinks 方法之前,我们需要先知道到什么时候算每一行的结束。


从 Fields 可以知道,每一行是以\n作为区分的。

function seekLinks(arr: Uint8Array){
  const lines = [];
  const buffer = arr;
  const bufLength = buffer.length;
  let position = 0;
  let lineStart = 0;
  while(position < bufLength){
    // '\n'.charCodeAt() === 10;
    if(buffer[position] === 10){
      lines.push(buffer.slice(lineStart, position));
      lineStart = position;
    };
    position += 1;
  }
  return lines;
}

在获取到所有行后,针对每一行做处理。

// 伪代码
function onChunk(arr: Uint8Array){
  const links = seekLinks();
  const decoder = new TextDecoder();
  let message = {
    data: '',
    event: '',
    id: '',
    retry: undefined,
  }:
  links.forEach((line) => {
    // ':'.charCodeAt() === 58;
    const colon = line.findIndex(l => l === 58);
    const fieldArr = line.slice(0, colon);
    const valueArr = line.slice(colon);
    if(colon === -1){
      // 当冒号作为开头的时候,解析成注释
      return;
    }
    const field = decoder.decode(fieldArr);
    const value = decoder.decode(valueArr);
    switch (field) {
      case 'data':
          message.data = message.data
              ? message.data + '\n' + value
              : value;
          break;
      case 'event':
          message.event = value;
          break;
      case 'id':
          message.id = value;
          break;
      case 'retry':
          const retry = parseInt(value, 10);
          message.retry = retry
          break;
  	}
  });
  return message;
}

大致完成了最简单的基础功能的解析,而以上伪代码参考 fetch-event-source 的源码。


借助 fetch-event-source 的能力,在数栈产品中调用的方式和 HTTP 请求基本保持一致。

function sse(url: string, params: any, options: FetchEventSourceInit) {
  const headers = {
    'Content-Type': 'application/json',
    accept: 'text/event-stream',
  };
  fetchEventSource(url, {
    method: 'POST',
    body: JSON.stringify(params),
    headers,
    ...options,
  });
}

打字机效果

接着,我们实现具备科技感的打字机效果:

输出

这里我们不能直接将响应的消息直接打印到屏幕上,因为响应的消息通常是好多字,这样子会导致打字机效果显得非常卡顿,用户体验不佳。
在数栈产品中,我们通过将响应的消息收集到暂存区中,然后通过每秒从暂存区中取出若干个字符打印到屏幕上,优化打字机卡顿的效果。

function AIGC(){
   const typing = useTyping({
      // 暂存区启动后,每个 delay 的时间都会执行该方法将消息打印到屏幕上
      onTyping(val) {
        // ...
      },
  });
	const handleChat = (message: string) => {
      // 标志暂存区需要开始存响应的消息了
      typing.start();
      requestChat(params, {
        onmessage(event: { data: string }) {
           	const { data } = event;
            // 把响应的消息存入暂存区中
            typing.push(data);
        },
        onclose() {
            // 关闭或失败的话,释放暂存区的数据
            typing.close();
        },
        onerror() {
            typing.close();
        },
    });
  };
}

其中,相关暂存区的代码整理成 useTyping 实现。

export default function useTyping({
    onTyping,
    onEnd,
}: {
    onTyping: (val: string) => void;
    onEnd: () => void;
}) {
    const interval = useRef<number>();
    const queue = useRef<string>('');
    const isStart = useRef<boolean>(false);

    function startTyping() {
        if (interval.current) return;
        let index = 0;
        interval.current = window.setInterval(() => {
            if (index < queue.current.length) {
                const str = queue.current;
                onTyping(str.slice(0, index + 1));
                index++;
            } else if (!isStart.current) {
                // 如果发送了全部的消息且信号关闭,则清空队列
                window.clearInterval(interval.current);
                interval.current = 0;
                onEnd();
            }
            // 如果发送了全部的消息,但是信号没有关闭,则什么都不做继续轮训等待新的消息
        }, 50);
    }

    useEffect(() => {
        return () => {
            window.clearInterval(interval.current);
            interval.current = 0;
        };
    }, []);

    function start() {
        isStart.current = true;
        window.clearInterval(interval.current);
        interval.current = 0;
        queue.current = '';
    }

    function push(str: string) {
        if (!isStart.current) return;
        queue.current += str.replace(/\\n/g, '\n');
        startTyping();
    }

    // 关闭的时候不需要清空队列,因为可能还有一些消息没有发送完毕,统一等消息发送完毕后关闭
    function close() {
        isStart.current = false;
    }

    return { start, push, close };
}

光标

在实现了打字机效果后,我们还需要添加一个闪烁的光标。
原理比较简单,就是在消息区域的最后一个元素的末尾添加元素即可。

.markdown {
  >*:last-child::after {
    content: " ";
    width: 2px;
    height: 13px;
    transform: translate(1px, 2px);
    font-family: Menlo, Monaco, "Courier New", monospace;
    font-weight: normal;
    font-size: 0;
    font-feature-settings: "liga" 0, "calt" 0;
    line-height: 13px;
    letter-spacing: 0;
    display: inline-block;
    visibility: hidden;
    animation: blinker 1s step-end infinite;
    background: #000;
  }

  @keyframes blinker {
    0% {
      visibility: inherit;
    }
    50% {
      visibility: hidden;
    }
    100% {
      visibility: inherit;
    }
  }
}

当然,这里有一些问题,在 markdown 解析出 Code Block 的时候会导致光标错位,这个问题 ChatGPT 同样也有。


那么到这里,我们就实现了一个具备基础功能的 AI+ 的需求。

最后

欢迎关注【袋鼠云数栈UED团队】~
袋鼠云数栈 UED 团队持续为广大开发者分享技术成果,相继参与开源了欢迎 star

热门相关:重生隐婚:Hi,高冷权少!   鬼王绝宠:逆天废材妃   少林武僧在异界   陆爷的小祖宗又撩又飒   陆爷的小祖宗又撩又飒