在开发实时语音聊天机器人(Chatbot)时,最核心的体验指标是低延迟和流畅性。
系统链路通常是:LLM生成文字 -> TTS合成语音 -> 前端播放。这就引入了一个经典的并发协调问题:
- 流式衔接:LLM 还在蹦字,TTS 就得开始合成,不能等整段说完。
- 乱序与重组:TTS 合成速度不均匀(短句快、长句慢),导致任务并发完成的顺序是乱序的,后端必须在推送前将其重组,确保前端按顺序接收和播放。
- 全链路保活:如何精准判断 LLM 是否输出结束,并确保所有 TTS 任务都执行完毕后再关闭连接。
本文是一篇技术练习笔记,记录了如何使用 Python asyncio,通过生成器、Future 占位符和双指针队列,解决上述流式并发问题。
1. LLM 流式处理:从 Token 到 Sentence
LLM 按字(Token)输出,而 TTS 按句合成。为了让 LLM 输出后能立刻开始 TTS 处理,这里的做法是让 LLM 流式输出,在缓冲块(buffer)里面以句子为单位构建 TTS 任务文本,然后使用 yield 按句子返回。
# 创建流式请求(将网卡缓冲区内容读入本地)
stream = await self.client.chat.completions.create(..., stream=True)
buffer = ""
# 流式接收 token
async for chunk in stream:
# 获取 token
if chunk.choices[0].delta.content is not None:
token = chunk.choices[0].delta.content
buffer += token
# 检查token中是否包含句子结束符(可能一次返回多个句子)
while buffer:
# 查找第一个句子结束符
min_pos = -1
for ending in self.sentence_endings:
pos = buffer.find(ending)
if pos != -1 and (min_pos == -1 or pos < min_pos):
min_pos = pos
if min_pos != -1:
# 找到句子结束符,提取句子
sentence = buffer[:min_pos + 1].strip()
buffer = buffer[min_pos + 1:] # 保留剩余部分
if sentence: # 非空句子才yield
yield sentence
else:
# 没有找到句子结束符,等待更多token
break
# 处理剩余的buffer(LLM输出结束但没有句号)
if buffer.strip():
yield buffer.strip()
逻辑解析:
stream = await ...:这里使用AsyncOpenAI创建了一个异步流。它的主要目的是将网卡缓冲区中的内容以chunk为单位读入本地内存。async for chunk in stream:这个循环实现了 LLM 解析的关键逻辑。它会异步地从stream中拉取一个chunk,然后和buffer进行合并。if sentence: yield sentence:当凑出一个完整句子时,程序通过yield将句子抛出。关键点在于:yield会导致当前协程(chat_stream函数)暂停执行(Pause),等待外部调用(__anext__())后才会继续循环,从而实现了 LLM 每输出一个句子就立刻生成一个对应的任务单元。- 效果:这就实现了 LLM 每输出一个句子,就立刻生成一个对应的任务单元,并暂停等待,而不是一口气生成所有句子。
2. 核心调度:主循环的生产消费逻辑
这里有两个问题要处理:
- TTS 返回是按照句子长度(乱序),而非句子先后顺序。
- TTS 返回内容本身不包含“是否最后一句”的标识。
此处的做法是在主循环中协调 LLM(生产者)和 TTS(消费者):
- 通过捕获 LLM 生成器的
StopAsyncIteration异常,得知 LLM 已结束,此时手动注入一个“End 任务”给 TTS。 - 设置
next_id指针,强制按顺序推送音频,实现后端的乱序重组。
# 状态变量
task_id = 0 # 当前分配的任务ID
next_id = 0 # 期望播放的下一个ID(保证顺序)
pending = {} # 进行中的任务 {id: future}
results = {} # 已完成但乱序的结果 {id: result}
llm_finished = False
llm_next_task = None
while True:
# 1. 非阻塞拉取 LLM
if not llm_finished:
if llm_next_task is None:
# 手动步进迭代器,包装成 Task
# 相当于创建了一个 Worker 去拉取 LLM 的下一句
llm_next_task = asyncio.create_task(llm_iterator.__anext__())
# 检查 Worker 是否完成
if llm_next_task.done():
try:
# 生产者:从 Task 中取出句子
sentence = llm_next_task.result()
# 消费者:立刻提交 TTS 任务
future = await self.tts_client.submit(
task_id=task_id,
text=sentence,
is_end=False
)
# 登记“进行中”的任务
pending[task_id] = future
task_id += 1
llm_next_task = None # 重置,准备拉下一句
except StopAsyncIteration:
# LLM 结束,提交一个特殊的 End 任务
future = await self.tts_client.submit(
task_id=task_id,
text="",
is_end=True
)
pending[task_id] = future
llm_finished = True
llm_next_task = None
except Exception as e:
# 处理 LLM 异常...
break
# ... 后续步骤 ...
逻辑解析:
- 生产消费模型:主循环实现了一个生产消费模型,
sentence = llm_next_task.result()这里是 LLM 模块作为生产者不断产生新的句子。 llm_next_task = asyncio.create_task(...):这是非阻塞拉取的核心。此处不直接awaitLLM,而是创建一个后台任务去await。主循环可以继续往下跑(去检查 TTS 结果)。if llm_next_task.done():在主循环的下一次轮询时,检查任务是否完成。except StopAsyncIteration:这是 LLM 生成器(chat_stream)正常结束的信号(不是错误)。捕获它,就能知道 LLM 已说完,此时提交一个is_end=True的特殊任务,下游就能据此判断何时结束。
3. TTS 任务:用 Future 作为“taskid”
主循环(ChatHandler)和 TTS Worker 是解耦的,Worker 完成任务后,如何将结果准确地送回给主循环?
这里的做法是不使用全局字典轮询,而是在提交任务时,将 asyncio.Future 对象作为“回执单”(或“内存地址”)随任务一起放入队列。
async def submit(self, session_id, task_id, text, is_end=False, ...) -> asyncio.Future:
# 1. 创建一个空的 "Future" 占位符
future = asyncio.Future()
# 2. 将 future 随任务一起发给 Worker
await self.global_queue.put({
'session_id': session_id,
'task_id': task_id,
'text': text,
'is_end': is_end,
'future': future # <--- 关键点:把回执单塞进去
})
# 3. 立刻返回这个占位符
return future
Worker 线程(或协程)处理完音频后,不需要关心是谁派发的任务,直接填充手中的 future:
# Worker 逻辑 (伪代码)
async def _worker(self, ...):
task = await self.global_queue.get()
if task['is_end']:
result = TTSResult(is_end=True)
else:
result = await self._do_tts(task['text'])
# 4. 填充回执单
# 外部持有该 future 引用的地方会立刻变为 "done" 状态
task['future'].set_result(result)
逻辑解析:
submit函数返回的future和global_queue里的future是同一个内存对象。主循环的pending[task_id] = future持有了这个对象的引用。- Worker 通过
future.set_result(result)填充数据,主循环在轮询pending字典时,通过future.done()就能立刻感知到任务已完成。 - tts 维护了一个队列,考虑到并发时异步任务混乱,一般来说外部要追踪自己的任务需要一个
task_id之类的键,去一个大字典里面轮询查看任务状态。但是此设计的精妙之处在于将future占位符作为任务的一个元素,队列里任务的future和外部submit返回的future是一个变量(其实就是一种task_id,只不过内存本身被巧妙地用作了那个大字典),当 worker 进程处理后,直接使用future.set_result(result)将future填充上音频和句子文本,这样外部轮询future时就自然而然的获取到了任务结果。
4. 乱序重组:用 next_id 指针保证顺序
TTS Worker 是并发执行的(task_id=2 可能比 task_id=1 先完成),但推送时必须按 0, 1, 2... 的顺序。
这里的实现是引入一个“已完成但乱序”的 results 缓存区,并使用 next_id 指针(类似 TCP 的偏移量机制)来保证按序推送。
# 2. 收集已完成的任务
# 遍历所有“进行中”的任务
for tid in list(pending.keys()):
if pending[tid].done():
try:
# 从 future 取出结果,放入 results 缓存
results[tid] = pending.pop(tid).result()
except Exception as e:
# 处理 Future 本身的异常
results[tid] = TTSResult(success=False, error=str(e))
# 3. 按序推送 (Reordering)
# 关键:这是一个 while 循环,而不是 if
while next_id in results:
# 只有期望的 next_id 存在于 results 中,才处理
result = results.pop(next_id)
if result.is_end:
await websocket.send_json({"type": "end"})
return # 完美退出
if result.success:
await websocket.send_json({
"type": "audio",
"audio": base64.b64encode(result.audio_data).decode()
})
# 指针下移,准备处理下一个
next_id += 1
# ... 循环末尾的 sleep 和退出检查 ...
if llm_finished and not pending and not results:
# 兜底退出:LLM 结束了,也没任务在跑了,缓存也空了
break
await asyncio.sleep(0.01) # 避免忙循环
逻辑解析:
- 主循环最后,在轮询完所有的
future任务后进入推送状态,next_id递增,按顺序推送下一个句子+音频,如果下一个句子是end,那么也要推送给前端(前端也根据end判断是否结束)然后退出主循环。 - 举例:如果
task_id=2先完成了,它会被放入results。 - 此时
next_id=0,while next_id in results条件不满足,不推送。 - 几轮循环后,
task_id=0完成了,被放入results。 while循环启动,推送id=0,next_id变为1。- 循环继续,发现
results里没有id=1,退出while。 - 再过几轮,
task_id=1完成,while循环启动,推送id=1,next_id变为2。 - 循环继续,发现
results里有id=2,推送id=2,next_id变为3。 - 这样就保证了严格的顺序。
评论区