在开发实时语音聊天机器人(Chatbot)时,最核心的体验指标是低延迟流畅性

系统链路通常是:LLM生成文字 -> TTS合成语音 -> 前端播放。这就引入了一个经典的并发协调问题:

  1. 流式衔接:LLM 还在蹦字,TTS 就得开始合成,不能等整段说完。
  2. 乱序与重组:TTS 合成速度不均匀(短句快、长句慢),导致任务并发完成的顺序是乱序的,后端必须在推送前将其重组,确保前端按顺序接收和播放。
  3. 全链路保活:如何精准判断 LLM 是否输出结束,并确保所有 TTS 任务都执行完毕后再关闭连接。

本文是一篇技术练习笔记,记录了如何使用 Python asyncio,通过生成器、Future 占位符和双指针队列,解决上述流式并发问题。

1. LLM 流式处理:从 Token 到 Sentence

LLM 按字(Token)输出,而 TTS 按句合成。为了让 LLM 输出后能立刻开始 TTS 处理,这里的做法是让 LLM 流式输出,在缓冲块(buffer)里面以句子为单位构建 TTS 任务文本,然后使用 yield 按句子返回。

# 创建流式请求(将网卡缓冲区内容读入本地)
stream = await self.client.chat.completions.create(..., stream=True)
buffer = ""

# 流式接收 token
async for chunk in stream:
    # 获取 token
    if chunk.choices[0].delta.content is not None:
        token = chunk.choices[0].delta.content
        buffer += token

        # 检查token中是否包含句子结束符(可能一次返回多个句子)
        while buffer:
            # 查找第一个句子结束符
            min_pos = -1
            for ending in self.sentence_endings:
                pos = buffer.find(ending)
                if pos != -1 and (min_pos == -1 or pos < min_pos):
                    min_pos = pos

            if min_pos != -1:
                # 找到句子结束符,提取句子
                sentence = buffer[:min_pos + 1].strip()
                buffer = buffer[min_pos + 1:]  # 保留剩余部分

                if sentence:  # 非空句子才yield
                    yield sentence
            else:
                # 没有找到句子结束符,等待更多token
                break

# 处理剩余的buffer(LLM输出结束但没有句号)
if buffer.strip():
    yield buffer.strip()

逻辑解析

  • stream = await ...:这里使用 AsyncOpenAI 创建了一个异步流。它的主要目的是将网卡缓冲区中的内容以 chunk 为单位读入本地内存。
  • async for chunk in stream:这个循环实现了 LLM 解析的关键逻辑。它会异步地从 stream 中拉取一个 chunk,然后和 buffer 进行合并。
  • if sentence: yield sentence:当凑出一个完整句子时,程序通过 yield 将句子抛出。关键点在于yield 会导致当前协程(chat_stream 函数)暂停执行(Pause),等待外部调用(__anext__())后才会继续循环,从而实现了 LLM 每输出一个句子就立刻生成一个对应的任务单元。
  • 效果:这就实现了 LLM 每输出一个句子,就立刻生成一个对应的任务单元,并暂停等待,而不是一口气生成所有句子。

2. 核心调度:主循环的生产消费逻辑

这里有两个问题要处理:

  1. TTS 返回是按照句子长度(乱序),而非句子先后顺序。
  2. TTS 返回内容本身不包含“是否最后一句”的标识。

此处的做法是在主循环中协调 LLM(生产者)和 TTS(消费者):

  1. 通过捕获 LLM 生成器的 StopAsyncIteration 异常,得知 LLM 已结束,此时手动注入一个“End 任务”给 TTS。
  2. 设置 next_id 指针,强制按顺序推送音频,实现后端的乱序重组。
# 状态变量
task_id = 0       # 当前分配的任务ID
next_id = 0       # 期望播放的下一个ID(保证顺序)
pending = {}      # 进行中的任务 {id: future}
results = {}      # 已完成但乱序的结果 {id: result}
llm_finished = False
llm_next_task = None

while True:
    # 1. 非阻塞拉取 LLM
    if not llm_finished:
        if llm_next_task is None:
            # 手动步进迭代器,包装成 Task
            # 相当于创建了一个 Worker 去拉取 LLM 的下一句
            llm_next_task = asyncio.create_task(llm_iterator.__anext__())

        # 检查 Worker 是否完成
        if llm_next_task.done():
            try:
                # 生产者:从 Task 中取出句子
                sentence = llm_next_task.result()

                # 消费者:立刻提交 TTS 任务
                future = await self.tts_client.submit(
                    task_id=task_id,
                    text=sentence,
                    is_end=False
                )
                # 登记“进行中”的任务
                pending[task_id] = future
                task_id += 1
                llm_next_task = None # 重置,准备拉下一句
                
            except StopAsyncIteration:
                # LLM 结束,提交一个特殊的 End 任务
                future = await self.tts_client.submit(
                    task_id=task_id,
                    text="",
                    is_end=True
                )
                pending[task_id] = future
                llm_finished = True
                llm_next_task = None
            except Exception as e:
                # 处理 LLM 异常...
                break
    
    # ... 后续步骤 ...

逻辑解析

  • 生产消费模型:主循环实现了一个生产消费模型,sentence = llm_next_task.result() 这里是 LLM 模块作为生产者不断产生新的句子。
  • llm_next_task = asyncio.create_task(...):这是非阻塞拉取的核心。此处不直接 await LLM,而是创建一个后台任务去 await。主循环可以继续往下跑(去检查 TTS 结果)。
  • if llm_next_task.done():在主循环的下一次轮询时,检查任务是否完成。
  • except StopAsyncIteration:这是 LLM 生成器(chat_stream)正常结束的信号(不是错误)。捕获它,就能知道 LLM 已说完,此时提交一个 is_end=True 的特殊任务,下游就能据此判断何时结束。

3. TTS 任务:用 Future 作为“taskid”

主循环(ChatHandler)和 TTS Worker 是解耦的,Worker 完成任务后,如何将结果准确地送回给主循环?

这里的做法是不使用全局字典轮询,而是在提交任务时,将 asyncio.Future 对象作为“回执单”(或“内存地址”)随任务一起放入队列。

async def submit(self, session_id, task_id, text, is_end=False, ...) -> asyncio.Future:
    # 1. 创建一个空的 "Future" 占位符
    future = asyncio.Future()
    
    # 2. 将 future 随任务一起发给 Worker
    await self.global_queue.put({
        'session_id': session_id,
        'task_id': task_id,
        'text': text,
        'is_end': is_end,
        'future': future  # <--- 关键点:把回执单塞进去
    })
    
    # 3. 立刻返回这个占位符
    return future

Worker 线程(或协程)处理完音频后,不需要关心是谁派发的任务,直接填充手中的 future

# Worker 逻辑 (伪代码)
async def _worker(self, ...):
    task = await self.global_queue.get()
    
    if task['is_end']:
        result = TTSResult(is_end=True)
    else:
        result = await self._do_tts(task['text'])
    
    # 4. 填充回执单
    # 外部持有该 future 引用的地方会立刻变为 "done" 状态
    task['future'].set_result(result)

逻辑解析

  • submit 函数返回的 futureglobal_queue 里的 future同一个内存对象。主循环的 pending[task_id] = future 持有了这个对象的引用。
  • Worker 通过 future.set_result(result) 填充数据,主循环在轮询 pending 字典时,通过 future.done() 就能立刻感知到任务已完成。
  • tts 维护了一个队列,考虑到并发时异步任务混乱,一般来说外部要追踪自己的任务需要一个 task_id 之类的键,去一个大字典里面轮询查看任务状态。但是此设计的精妙之处在于将 future 占位符作为任务的一个元素,队列里任务的 future 和外部 submit 返回的 future 是一个变量(其实就是一种 task_id,只不过内存本身被巧妙地用作了那个大字典),当 worker 进程处理后,直接使用 future.set_result(result)future 填充上音频和句子文本,这样外部轮询 future 时就自然而然的获取到了任务结果。

4. 乱序重组:用 next_id 指针保证顺序

TTS Worker 是并发执行的(task_id=2 可能比 task_id=1 先完成),但推送时必须按 0, 1, 2... 的顺序。

这里的实现是引入一个“已完成但乱序”的 results 缓存区,并使用 next_id 指针(类似 TCP 的偏移量机制)来保证按序推送。

    # 2. 收集已完成的任务
    # 遍历所有“进行中”的任务
    for tid in list(pending.keys()):
        if pending[tid].done():
            try:
                # 从 future 取出结果,放入 results 缓存
                results[tid] = pending.pop(tid).result()
            except Exception as e:
                # 处理 Future 本身的异常
                results[tid] = TTSResult(success=False, error=str(e))

    # 3. 按序推送 (Reordering)
    # 关键:这是一个 while 循环,而不是 if
    while next_id in results:
        # 只有期望的 next_id 存在于 results 中,才处理
        result = results.pop(next_id)
        
        if result.is_end:
            await websocket.send_json({"type": "end"})
            return # 完美退出
            
        if result.success:
            await websocket.send_json({
                "type": "audio", 
                "audio": base64.b64encode(result.audio_data).decode()
            })
        
        # 指针下移,准备处理下一个
        next_id += 1 

    # ... 循环末尾的 sleep 和退出检查 ...
    if llm_finished and not pending and not results:
        # 兜底退出:LLM 结束了,也没任务在跑了,缓存也空了
        break
        
    await asyncio.sleep(0.01) # 避免忙循环

逻辑解析

  • 主循环最后,在轮询完所有的 future 任务后进入推送状态,next_id 递增,按顺序推送下一个句子+音频,如果下一个句子是 end,那么也要推送给前端(前端也根据 end 判断是否结束)然后退出主循环。
  • 举例:如果 task_id=2 先完成了,它会被放入 results
  • 此时 next_id=0while next_id in results 条件不满足,不推送。
  • 几轮循环后,task_id=0 完成了,被放入 results
  • while 循环启动,推送 id=0next_id 变为 1
  • 循环继续,发现 results 里没有 id=1,退出 while
  • 再过几轮,task_id=1 完成,while 循环启动,推送 id=1next_id 变为 2
  • 循环继续,发现 results 里有 id=2,推送 id=2next_id 变为 3
  • 这样就保证了严格的顺序。