AgentSpan实战:Python构建可持久化AI Agent,崩溃恢复与人工审批

AgentSpan通过状态持久化解决AI Agent崩溃恢复和人工审批等待问题
AgentSpan是一个开源平台,通过将AI Agent的执行状态存储在独立服务器上,解决了Agent崩溃后需从头执行的问题。它支持崩溃恢复(通过执行ID从断点继续)和人工审批长时间等待两大核心能力,还能零改动包装现有LangGraph工作流,为其补充持久化能力。
文章正文
构建AI Agent工作流时,有一个常被忽视但至关重要的问题:如果Agent执行到一半崩溃了怎么办? 某个步骤需要人工审批,而审批人出差了三周,整个流程是否要从头再来?
AgentSpan是一个开源平台,专门解决AI Agent的"持久化"问题。它通过将Agent状态与执行逻辑解耦,让Agent具备崩溃恢复和长时间等待的能力。更重要的是,它不仅可以独立使用,还能无缝包装现有的LangGraph工作流,无需改动任何业务代码。
什么是持久化AI Agent?
传统的Agent执行模式是线性的:调用LLM → 执行工具 → 再调用LLM → 直到完成。所有状态都保存在进程内存中。一旦进程崩溃,所有中间结果全部丢失,下次启动只能从头开始。
在分布式系统领域,状态持久化是一个经典难题。传统微服务通过数据库、消息队列等外部存储解决这一问题。但AI Agent的特殊性在于其状态不仅包含结构化数据,还包含LLM的上下文窗口、工具调用链、推理中间步骤等复杂信息。这使得直接套用传统持久化方案并不简单。AgentSpan的解法本质上借鉴了分布式工作流引擎(如Apache Airflow、Temporal)的思路:将执行状态外化到独立存储,通过唯一执行ID实现状态追踪,而不要求开发者重新设计业务逻辑。
AgentSpan的核心思路是将Agent的状态存储在独立的服务器上。Agent的工具执行结果、LLM调用记录、工作流进度等信息都持久化到服务端。即使客户端进程崩溃,状态依然保留在服务器上。重新启动程序后,只需通过执行ID重新连接,就能从断点处继续执行。

这种架构带来两个关键能力:
- 崩溃恢复(Crash & Resume):进程意外终止后,重启即可从断点继续
- 人工审批等待(Human-in-the-Loop):支持长时间等待人工介入,无需保持进程运行
崩溃恢复机制实战
安装与基本工具定义
首先安装AgentSpan:pip install agentspan。然后定义两个模拟工具——一个用于分块分析数据(带随机延迟),一个用于聚合结果:
from agentspan.agents import agent, tool, start, AgentRuntime, AgentHandle
@tool
def analyze_chunk(chunk_id: int) -> dict:
"""A simple function that processes a data chunk and returns metrics."""
time.sleep(random.randint(3, 8))
return {"chunk_id": chunk_id, "processed": True,
"metrics": {"count": random.randint(5, 8)}}
@tool
def aggregate_results(results: list[dict] = None) -> dict:
"""Aggregates metrics from all chunks into a final report."""
results = results or []
return {"total_count": sum(x["metrics"]["count"] for x in results),
"summary": "analysis complete"}
你可能没注意到,默认参数不要使用可变对象(如空列表),这是Python的经典陷阱。应使用None作为默认值,在函数体内处理。
执行ID持久化策略
崩溃恢复的核心在于执行ID的管理。AgentSpan为每次执行分配唯一ID,只要持有这个ID,就能重新连接到对应的执行状态。这一设计与分布式事务中的"幂等性"原则一脉相承——相同的执行ID对应唯一确定的执行上下文,无论重连多少次,系统都能准确定位到正确的断点位置,而不会产生重复执行或状态混乱的问题。
if not os.path.exists("interrupted_execution_id"):
handle = start(agent, "Analyze the chunks with IDs 1, 2, and 3")
status = handle.get_status()
execution_id = status.execution_id
with open("interrupted_execution_id", "w") as f:
f.write(execution_id)
程序启动时检查是否存在中断记录文件。如果不存在,说明是全新执行,创建新任务并保存ID;如果存在,说明上次执行被中断,读取ID后重新连接:
with AgentRuntime() as runtime:
with open("interrupted_execution_id", "r") as f:
execution_id = f.read()
runtime.serve(agent, blocking=False)
handle = AgentHandle(execution_id=execution_id, runtime=runtime)

实际演示效果
运行程序后,AgentSpan会启动本地服务器(默认端口6767),提供Web监控界面。你可以清晰看到每个LLM调用、工具执行的完整链路。
当在工具执行过程中按Ctrl+C强制终止程序时,服务器上的执行状态仍然显示为"running"。重新运行程序,它会自动检测到中断记录,重新连接到之前的执行,从断点处继续完成剩余的工具调用,而不是重新执行已完成的步骤。

人工审批的持久化等待
Human-in-the-Loop(HITL)是AI系统设计中的重要模式,指在自动化流程的关键节点引入人工判断。在金融风控、医疗诊断、法律合规等高风险场景中,HITL不是可选项而是监管要求。传统实现方式通常依赖消息队列(如RabbitMQ)或工作流引擎来挂起任务,但这些方案与AI Agent框架的集成成本较高,往往需要开发者在业务代码之外维护一套独立的状态机。AgentSpan通过approval_required参数将HITL能力直接内嵌到工具定义层,降低了实现门槛,同时保证了审批等待期间的状态完整性——即便等待时间长达数天,整个执行上下文依然完好保存在服务端。
标记需要审批的工具
AgentSpan通过approval_required参数标记需要人工审批的工具:
@tool(approval_required=True)
def process_refund(order_id: str, amount: float) -> dict:
"""Issue a refund for the given order."""
return {"refund_id": "R_" + order_id, "status": "processed", "amount": amount}
当Agent执行到这个工具时,会自动暂停并等待人工审批。
轮询与审批机制
通过轮询检查执行状态,当检测到is_waiting时,提示用户进行审批操作:
for _ in range(60):
time.sleep(2)
status = handle.get_status()
if status.is_waiting:
choice = input(f"Do you want to approve: {status.pending_tool}? ")
if choice.lower() == "yes":
handle.approve()
else:
handle.reject()
break
if status.is_complete:
print(f"Completed: {status.output['result']}")
break

这里的关键在于:即使在等待审批期间程序崩溃或主动关闭,重新启动后仍然可以通过执行ID重新连接,继续等待或完成审批流程。之前的所有工具调用结果都不会丢失。
与LangGraph无缝集成
LangGraph框架简介
LangGraph是LangChain团队推出的有状态、多Actor应用构建框架,基于有向图(DAG)模型定义Agent工作流。它将Agent的执行过程建模为节点(Node)和边(Edge)的组合,支持条件分支、并行执行、循环等复杂控制流。LangGraph的核心优势在于其状态管理机制——通过TypedDict定义状态模式,每个节点接收并更新状态,天然适合多步骤Agent任务。然而LangGraph本身并不原生支持跨进程的状态持久化:一旦运行进程终止,图的执行状态随之消失。这正是AgentSpan的切入点——它在不侵入LangGraph内部实现的前提下,为其补充了持久化能力。
零改动包装现有LangGraph工作流
AgentSpan最实用的特性之一是能够包装现有的LangGraph工作流,无需修改任何业务逻辑代码。
假设你已经有一个完整的LangGraph状态图,包含节点、边、工具等定义。只需要改动执行层的代码:
from agentspan.agents import AgentRuntime, AgentHandle
with AgentRuntime() as runtime:
runtime.deploy(app)
runtime.serve(app, blocking=False)
if not os.path.exists("interrupted_execution_id_lg"):
handle = runtime.start(app, {"messages": [HumanMessage(content="...")]})
with open("interrupted_execution_id_lg", "w") as f:
f.write(handle.execution_id)
else:
with open("interrupted_execution_id_lg", "r\
相关推荐
教程攻略Cursor+Codex双IDE协同:开源项目二开实战方法论
基于实战经验总结的开源项目二次开发完整方法论,详解Cursor+Codex双IDE协同工作流,涵盖二开七环节、MVP验证、AI读源码技巧,帮助开发者三天跑通项目、两周完成业务集成。
教程攻略Cursor多Agent实战:50分钟搭建Next.js全栈博客
使用Cursor IDE多Agent协作模式,50分钟内从零搭建全栈博客。涵盖Next.js、Clerk认证、Supabase数据库集成,详解4个AI Agent分阶段开发流程与关键避坑经验。
教程攻略从零搭建AI软件工厂:Cursor工程师的多Agent协作实战经验
Cursor工程师Eric分享AI软件工厂构建实战:从自动化六层级、护栏设计、并行Agent管理到规模化扩展,详解如何用多Agent协作实现7×24小时高效软件开发。