LangChain流式输出与异步实战:Agent场景完整实现

深入解析LangChain中流式输出与异步处理的实现原理及实践方法
本文阐述了流式输出和异步处理对AI应用的重要性:异步编程通过事件循环机制解决LLM API调用的阻塞问题,流式输出利用LLM自回归生成特性实时传递token。文章详细介绍了LangChain的astream方法、Agent场景下的流式实现方案,以及基于AsyncQueueCallbackHandler和asyncio.Queue的生产者-消费者架构设计。
在构建对话式AI应用时,流式输出(Streaming)和异步处理(Async)是两个不可或缺的核心能力。没有流式输出,用户需要漫长等待才能看到完整回复;没有异步处理,应用在等待LLM API响应时会完全阻塞,无法扩展。本文将深入解析LangChain中流式输出与异步的实现原理,并展示如何在实际API中集成这些能力。
流式输出和异步为什么对AI应用至关重要
异步编程:解决LLM API等待的性能瓶颈
大语言模型的调用几乎都通过API完成,每次调用可能需要数秒甚至更长时间。如果你的应用使用同步代码,在等待LLM响应期间,整个线程会被阻塞,什么都做不了。这意味着你的应用无法同时处理多个用户请求,扩展性极差。
Python的异步编程基于**事件循环(Event Loop)机制,由asyncio标准库提供支持。与多线程不同,异步编程是单线程的,通过协程(Coroutine)**在I/O等待期间主动让出控制权,让事件循环调度其他任务执行。这种模型特别适合I/O密集型场景——LLM API调用正是典型的I/O密集型操作:网络请求发出后,CPU几乎什么都不做,只是在等待远端服务器的响应。异步模型让这段"空窗期"得到充分利用,事件循环可以在此期间处理其他用户的请求。
异步代码的核心优势在于:当一个请求在等待LLM响应时,程序可以去处理其他请求。对于AI应用来说,大量时间都花在等待API调用上,异步处理带来的性能提升尤为显著。在实践中,一个异步FastAPI服务可以用单进程同时处理数十个并发LLM请求,而同步版本在同等条件下可能只能串行处理。
流式输出:不仅是体验优化,更是功能机制
LLM生成文本的本质是逐token产出——模型基于Transformer架构,采用**自回归(Autoregressive)**方式生成文本:每次前向传播只预测一个token,然后将该token追加到输入序列,再次前向传播预测下一个token。这个过程的延迟由模型大小、序列长度和硬件性能共同决定,通常每个token需要数十毫秒,生成一段完整回复可能需要数秒。流式输出就是将这个逐token生成的过程实时传递给用户,而不是等到所有token都生成完毕后再一次性返回。

流式输出的价值体现在三个层面:
- 用户体验提升:想象使用GPT-4生成一个长故事,如果不流式输出,用户可能需要盯着空白页面等待十几秒甚至更久,这在聊天场景中是不可接受的。
- 中间步骤可视化:在Agent场景中,流式输出不仅传递最终文本,还能展示中间决策过程——比如"正在搜索网页"、"正在调用计算工具"等。Perplexity的ProSearch就是一个优秀案例,用户可以实时看到搜索、分析的全过程。
- 功能特性实现:ChatGPT接收到LLM输出的"使用搜索工具"token后,不会直接显示这些token,而是将其转化为"正在搜索网页"的UI提示。这说明流式输出本身就是一种功能机制。
LangChain基础流式输出:astream方法详解
使用astream进行异步流式输出
LangChain中,带有a前缀的方法都是对应方法的异步版本。astream就是stream的异步版本,使用方式非常简洁:
tokens = []
async for token in llm.astream("Tell me about NLP"):
tokens.append(token)
print(token.content, end="|", flush=True)
每个返回的对象都是AIMessageChunk类型,包含一小段内容。一个有趣的特性是,这些chunk可以通过加法运算合并:
tk = tokens[0]
for token in tokens[1:]:
tk += token
合并后的结果仍然是一个AIMessageChunk,但内容已经拼接完整。这个特性不仅适用于文本内容,也适用于工具调用参数等其他字段,在处理Agent的流式输出时非常实用。

需要注意的是,flush=True参数会强制控制台立即更新显示内容,使流式效果更加平滑。不设置的话,输出会有明显的"一块一块"的感觉。
LangChain Agent流式输出实现方案
构建可配置的Agent实例
Agent场景比简单的LLM调用复杂得多,因为涉及多轮LLM调用和工具执行。为了在API中使用流式输出,我们需要在每次查询时提供新的回调处理器。通过LangChain的configurable_fields机制,可以在LLM上添加可配置字段:
llm = ChatOpenAI(streaming=True)
llm = llm.configurable_fields(
callbacks=ConfigurableField(description="callbacks")
)
注意streaming=True这个参数——虽然看起来不起眼,但它是启用流式输出的前提。配置好后,每次调用Agent时都可以传入不同的回调处理器。
自定义AsyncQueueCallbackHandler回调处理器
LangChain的回调系统是一套观察者模式(Observer Pattern)的实现,允许开发者在链(Chain)或Agent执行的各个生命周期节点注入自定义逻辑。回调处理器可以监听的事件包括:LLM开始/结束、工具调用开始/结束、链开始/结束等。AsyncCallbackHandler是其异步版本,所有方法都是协程,不会阻塞事件循环。我们正是利用这套机制,在每个token产生时将其捕获并转发。
核心思路是:回调处理器将LLM产生的token放入异步队列(asyncio.Queue),然后在外部从队列中取出token进行处理。asyncio.Queue是Python异步编程中实现生产者-消费者模式的核心数据结构——当队列为空时get()会自动挂起等待,不占用CPU资源,完美契合"LLM慢速生产token、外部快速消费token"的场景。

这个自定义处理器继承自AsyncCallbackHandler,包含几个关键组件:
__aiter__方法:作为异步生成器,持续从队列中取出token。如果队列为空,使用asyncio.sleep(0.1)等待——这里必须使用异步sleep,否则会阻塞整个线程。当收到"done"标记时停止迭代,否则yield当前token。
on_llm_new_token方法:LangChain在LLM返回每个token时调用此方法。它从chunk的additional_kwargs中提取工具调用信息,使用Python的**海象运算符(:=)**同时完成赋值和条件判断:
if tool_calls := chunk.additional_kwargs.get("tool_calls"):
if tool_calls[0]["function"]["name"] == "final_answer":
self.final_answer_seen = True
else:
await self.queue.put(chunk)
final_answer_seen标志用于区分工具调用token和最终回答token,以便进行不同的处理。
on_llm_end方法:在Agent执行过程中,LLM会被调用多次(选择工具、生成最终答案等),每次结束都会触发此方法。如果当前是工具调用结束,发送"step_end"标记;如果是最终答案结束,发送"done"标记以停止生成器。
流式Agent执行器的架构设计
用astream替代invoke实现流式调用

将流式输出集成到Agent执行器中,核心是使用astream替代invoke来调用Agent:
async def stream(self, query):
async for token in self.agent.astream(query, config={"callbacks": [streamer]}):
# 合并chunk到完整输出
if output is None:
output = token
else:
output += token
# 提取工具调用信息
tool_calls = token.additional_kwargs.get("tool_calls")
if tool_calls:
# 处理工具名称和参数
...
每个chunk被逐步合并到output中,同时实时提取工具调用的名称和参数。由于Agent使用了tool_choice="any",所有输出都通过工具调用返回,content字段始终为空。
用asyncio.Queue解耦Agent执行与token消费
关键的架构设计在于:不要在Agent执行器内部处理token的最终去向。Agent执行器只负责将token放入队列,而在外部通过异步循环从队列中取出token,决定如何使用它们:
queue = asyncio.Queue()
streamer = QueueCallbackHandler(queue)
task = asyncio.create_task(agent_executor.run(query))
async for token in streamer:
if token == "step_end":
print("\
")
elif "function" in str(token):
tool_name = extract_tool_name(token)
print(f"Calling {tool_name}")
else:
print(token, end="\
相关推荐
教程攻略Cursor+Codex双IDE协同:开源项目二开实战方法论
基于实战经验总结的开源项目二次开发完整方法论,详解Cursor+Codex双IDE协同工作流,涵盖二开七环节、MVP验证、AI读源码技巧,帮助开发者三天跑通项目、两周完成业务集成。
教程攻略Cursor多Agent实战:50分钟搭建Next.js全栈博客
使用Cursor IDE多Agent协作模式,50分钟内从零搭建全栈博客。涵盖Next.js、Clerk认证、Supabase数据库集成,详解4个AI Agent分阶段开发流程与关键避坑经验。
教程攻略从零搭建AI软件工厂:Cursor工程师的多Agent协作实战经验
Cursor工程师Eric分享AI软件工厂构建实战:从自动化六层级、护栏设计、并行Agent管理到规模化扩展,详解如何用多Agent协作实现7×24小时高效软件开发。