返回文章列表

LangGraph框架

131 min read
LangGraph框架

LangGraph是什么(What)

LangGraph将工作流表示为图结构(graph),提供了更高的灵活性和控制能力,特别适合需要循环逻辑、状态管理以及多主体协作的场景,比如智能代理(agent)和多代理工作流

核心概念

  1. State (状态):这是图形的“内存”。所有节点共享同一个数据结构,并根据执行结果不断更新它。

  2. Nodes (节点):通常是一个函数。它接收当前状态,执行一些逻辑(如调用 LLM),然后返回更新后的状态。

  3. Edges (边):定义节点之间的路径。分为普通边(直接从 A 到 B)和条件边(根据 LLM 的判断决定去 A 还是去 B)。

环境安装与配置

pip install -U langgraph

quick start

第一步:定义状态 (State)

相当于memory

通常使用 Python 的 TypedDict 来定义。

from typing import TypedDict, Annotated, List

class AgentState(TypedDict):
# 使用 Annotated 和 operator.add 可以让消息不断累加,而不是覆盖
    messages: Annotated[List[str], "add"] 

** 为什么 LangGraph 推荐用 TypedDict 定义状态?**

在 LangGraph 的 StateGraph 中,状态通常是一个字典。使用 TypedDict 的主要原因是:

  1. 轻量化:状态在节点之间频繁传递,原生 dict 的序列化和反序列化速度最快。

  2. 兼容性:LangGraph 的 Annotated 语法(如 operator.add)在 TypedDict 上表现最自然,方便实现状态的增量更新。

第二步:定义节点 (Nodes)

节点就是处理逻辑的地方。

def chatbot_node(state: AgentState):
# 这里可以调用模型
   return {"messages": ["这是 AI 的回答"]}

第三步:构建图 (Graph)

将节点连接起来。

from langgraph.graph import StateGraph, END

workflow = StateGraph(AgentState)

# 添加节点
workflow.add_node("chatbot", chatbot_node)

# 设置入口
workflow.set_entry_point("chatbot")

# 添加边(这里直接指向结束)
workflow.add_edge("chatbot", END)

# 编译
app = workflow.compile()

第四步:运行

app.invoke({"messages": ["你好!"]})

图结构可视化

model调用(和langchain一样)

tools调用

定义和使用

定义和绑定和langchain一样,但是工具调用不同

LangGraph 的核心优势在于它有一个内置的工具节点 ToolNode,可以自动处理模型生成的 tool_calls 并返回结果。

from langgraph.prebuilt import ToolNode
from langgraph.graph import StateGraph, START, END

# 1. 定义工具节点
tool_node = ToolNode([get_weather])

# 2. 定义逻辑:判断模型输出是否包含工具调用
def should_continue(state):
    messages = state["messages"]
    last_message = messages[-1]
    if last_message.tool_calls:
        return "tools"return END

# 3. 构建图
workflow = StateGraph(MessagesState)
workflow.add_node("agent", call_model) # 这里面调用 model_with_tools
workflow.add_node("tools", tool_node)  # 自动执行工具逻辑

workflow.add_edge(START, "agent")
# 根据模型结果决定:是去执行工具,还是直接结束
workflow.add_conditional_edges("agent", should_continue)
workflow.add_edge("tools", "agent") # 工具执行完后,回到模型进行总结

app = workflow.compile()

封装node内部

延伸问题:如果这个agent有多个节点,每个节点有多个工具,每个节点都要都要定义一个ToolNode节点吗?这样结构会很冗余?

可以使用 LangGraph 预置的 create_react_agent。它自带了工具调用的死循环,会自动处理“LLM请求 -> 执行工具 -> LLM总结”的过程,最后直接吐出你要的最终文本

from langgraph.prebuilt import create_react_agent
from langchain_core.messages import HumanMessage

def news_analyst_node_seq(state: SequentialState):
    console.print("--- (Sequential) CALLING NEWS ANALYST ---")
    prompt = f"Your task is to act as an expert News Analyst. Find the latest major news about the topic in the user's request and provide a concise summary.\n\nUser Request: {state['user_request']}"
    
    # 修复点:创建一个自带工具执行循环的 Agent
    # 只要它还在使用工具,它就会在内部循环,直到拿到结果写出最终答案
    agent = create_react_agent(llm, tools=[search_tool])
    
    # 触发内部 agent 执行
    response = agent.invoke({"messages": [HumanMessage(content=prompt)]})
    
    # 获取内部 agent 的最后一条消息(即最终的分析总结,而不是工具调用请求)
    final_answer = response["messages"][-1].content
    
    return {"news_report": final_answer}

but这样会导致问题,因为这样会导致双层agent嵌套,react会嵌套进在整体的agent架构中,比如某个节点会调用10次tool,不一定有必要,但是会增加rt和token,不是全部场景都适合这个问题,比如这个例子

官方文档明确说,create_agent 会“一直循环调用工具,直到满足停止条件”;也支持用 middleware 限制 model/tool 调用次数。https://docs.langchain.com/oss/python/deepagents/middleware

State 状态

State 的更新

状态(State)组件扮演着关键的载体角色,负责在图的各个节点之间传递信息。意味LangGraph框架的核心在于State的有效使用和掌握。在复杂的应用中,State组件需要存储和管理的信息量会显著增加。核心功能如工具使用、记忆能力和人机交互等,都依赖State来实现和维护

image

     图中的每个节点都具备访问、读取和写入状态State的权限。当某一个节点去修改状态时,它会将此信息广播到图中的所有其他节点。这种广播机制允许其他节点响应状态的变化并相应地调整其行为。从开发的角度来看,`State`实际上是一个共享的数据结构。如上图所示,状态表现为一个简单的字典。通过对这个字典进行读写操作,可以实现自左而右的数据流动,从而构建一个可运行的图结构   

image

可以在图完成编译后,进行state的初始化

from typing import TypedDict, Annotated
import operator

class AgentState(TypedDict):
    # 这是一个基本字段,新值会覆盖旧值
    input: str
    # 这是一个带“聚合器”的字段,新值会附加到旧列表中
    messages: Annotated[list, operator.add]

更新机制--覆盖和聚合

State 的更新不是手动的 state["key"] = value,而是通过节点的返回值自动触发的。

A. 覆盖(Default Overwrite)

对于普通字段,如果 Node 返回了同名的 Key,原有的值会被直接替换。

  • 旧状态: {"count": 1}

  • Node 返回: {"count": 2}

  • 新状态: {"count": 2}

B. 聚合(Reducer Functions)

如果你希望保留历史记录(比如对话记录),你需要使用 AnnotatedReducer 函数。最常用的是 operator.add

  • 旧状态: {"messages": ["Hello"]}

  • Node 返回: {"messages": ["World"]}

  • 新状态: {"messages": ["Hello", "World"]}

也可以自定义函数实现:

def merge_and_trim(existing: list, new: list) -> list:
    # 1. 合并新旧列表
    combined = existing + new
    # 2. 限制长度:只保留最后 3 个元素
    return combined[-3:]

class LimitedState(TypedDict):
    # 每次有新数据进入,都会触发 merge_and_trim 逻辑
    recent_actions: Annotated[list, merge_and_trim]

C. 智能处理(使用 add_messages

这是 LangGraph 专门为对话模型提供的工具。它比简单的 operator.add 更聪明:如果新消息的 ID 与旧消息相同,它会执行替换(更新);如果 ID 不同,则执行追加

Python

from langgraph.graph.message import add_messages

class AgentState(TypedDict):# 能够自动处理 AI 消息的更新(比如流式输出)和历史堆叠
    messages: Annotated[list, add_messages]

Annotated函数

Annotated(来自 typing 模块)主要用于添加元数据。但在 LangGraph 的语境下,它被赋予了特殊的使命:定义 State 字段的“合并协议”

image

from typing import Annotated, TypedDict
import operator

class MyState(TypedDict):
    # 场景 1:数字累加
    count: Annotated[int, operator.add]
    
    # 场景 2:列表追加(最常用)
    messages: Annotated[list, operator.add]

Reducer函数的机制

机制

  LangGraph内部原理是:State中的每个key都有自己独立的Reducer函数,通过指定的reducer函数应用状态值的更新。

image

  Reducer 函数用来根据当前的状态(state)和一个操作(action)来计算并返回新的状态。它是一种设计模式,用于将业务逻辑与状态变更解耦,使得状态的变更预测性更强并且容易追踪。这样的函数通常接收两个参数:当前的状态(state)和一个描述应用了什么操作的对象(action), 根据 action 类型来决定如何修改状态。比如,在一个购物车应用中,可能会有添加商品、删除商品、修改商品数量等操作。返回一个新的状态对象,而不是修改原始状态对象。简单理解,Reducer函数做的就是根据给定的输入(当前状态和操作)生成新的状态。

举例:

当你调用 app.invoke() 后,每一轮更新都遵循以下步骤:

  1. 节点输出:节点返回 {"my_key": "new_value"}

  2. 查找规则:LangGraph 检查 State 定义中 my_key 是否绑定了 Annotated[..., reducer]

  3. 触发计算

  • 如果没有 Reducer:执行 existing = updated(直接覆盖)。
  • 如果有 Reducer:调用 reducer(existing, updated)
  1. 状态持久化:将计算结果存入 Checkpoint(检查点)

为什么 Reducer 对“并行执行”至关重要?

这是 LangGraph 的高级特性。如果你的图中有并行节点(两个节点同时运行并返回结果),Reducer 是防止冲突的关键:

  • 场景:节点 A 和节点 B 同时运行,都想更新 logs 字段。

  • 过程

    1. 节点 A 返回 {"logs": ["A done"]}
    2. 节点 B 返回 {"logs": ["B done"]}
    3. LangGraph 会依次调用 Reducer:先合并 A 的结果,再在 A 的基础上合并 B 的结果。
  • 结果logs 最终变为 ["A done", "B done"],而不是互相覆盖。

state生命周期

  1. 初始化: 当你调用 app.invoke({"input": "hi"}) 时,初始数据被填入 State。

  2. 节点读取: 每一个 Node 函数接收当前的 state 作为参数。

  3. 节点返回: Node 处理完后,返回一个包含更新字段的字典

  4. 状态融合: LangGraph 根据你定义的 Reducer 规则,将 Node 返回的字典合并到全局 State 中。

  5. 流转: 更新后的 State 被传递给下一个节点。

messageState

简单来说,MessagesState 是 LangGraph 为了让你少写代码而提供的一个“全家桶”预设状态。

在开发聊天机器人(Chatbot)时,几乎每个人都要定义一个包含 messages 列表的状态。与其每次都手动写 Annotatedadd_messages,LangGraph 直接帮你打包好了。

from typing import Annotated, TypedDict
from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages

class MessagesState(TypedDict):
    # 核心就在这里:它预设了一个叫 messages 的字段
    # 并且已经绑定好了 add_messages 这个智能 Reducer
    messages: Annotated[list[AnyMessage], add_messages]

边和条件边

普通边 (Normal Edges)

普通边用于定义固定的顺序。一旦节点 A 执行完毕,程序总是会直接跳转到节点 B。

  • 定义方式workflow.add_edge("开始节点", "目标节点")

  • 使用场景:线性工作流。例如:数据处理 -> 数据分析 -> 生成报告。

from langgraph.graph import StateGraph, END

workflow = StateGraph(AgentState)

# 定义两个节点
workflow.add_node("fetch_data", fetch_data_node)
workflow.add_node("process_data", process_data_node)

# 定义普通边:fetch_data 执行完后,必须执行 process_data
workflow.add_edge("fetch_data", "process_data")

# 最后处理完直接结束
workflow.add_edge("process_data", END)

条件边 (Conditional Edges)

这是构建智能代理(Agent)的核心。它允许根据当前状态的内容(通常是 LLM 的判断)动态决定下一步去哪里。

  • 定义方式workflow.add_conditional_edges("源节点", 路由器函数, 映射字典)

  • 核心组件

    1. 源节点:决策发生的起点。
    2. 路由器函数 (Router Function):一个逻辑函数,接收当前 state,返回一个字符串(路径名)。
    3. 映射字典 (Path Map):将路由器返回的字符串映射到具体的节点名或 END
# 1. 定义路由器函数
def route_decision(state: AgentState):
    last_message = state["messages"][-1]
    # 模拟逻辑:如果消息里包含"天气",就去调用工具节点
    if "天气" in last_message:
        return "call_tool"
    else:
        return "reply_directly"

# 2. 在图中定义条件边
workflow.add_conditional_edges(
    "chatbot",           # 从 chatbot 节点出发
    route_decision,      # 使用这个函数做决定
    {                    # 映射关系:
        "call_tool": "weather_node", 
        "reply_directly": END
    }
)

Router Agent路由代理

     在`LangGraph`中,我们可以利用“条件边”这一概念来指导或约束大模型在处理特定任务时的逻辑流程。这种机制允许大模型在达到某一环节并满足预设条件时,根据不同的条件输出或数据,选择性地执行不同的逻辑路径       

LangGraph使用的是一个类似于 if-else语句的结构组件,称为Router(路由)。这个组件允许大模型从一组预设的选项中选择合适的步骤来进行执行。这个设计思路并不难理解,同时由于LangGraph的底层封装,实现起来也非常简单

  1. 如果想选择性地路由到 1 个或多个边,则需要使用add_conditional_edges**方法,需要定义一个 routing_function作为路由函数,并添加一个新的节点node_c **
from langgraph.graph import START, StateGraph, END
from langgraph.graph import StateGraph
from IPython.display import Image, display

def node_a(state):
    return {"x": state["x"] + 1}

def node_b(state):
    return {"x": state["x"] - 2}

def node_c(state):
    return {"x": state["x"] + 1}

def routing_function():
    if state["x"] == 10:
        return "node_b"
    else:
        return "node_c"

builder = StateGraph(dict)

builder.add_node("node_a", node_a)
builder.add_node("node_b", node_b)
builder.add_node("node_c", node_c)

builder.set_entry_point("node_a")

# 构建节点之间的边
builder.add_conditional_edges("node_a", routing_function)

graph = builder.compile()
  1. routing_function路由函数的返回值用作将状态发送到下一个的节点(或节点列表)的名称。除此之外,还可以使用path_map参数,通过一个字典的数据结构将routing_function的输出映射到下一个节点的名称
from langgraph.graph import START, StateGraph, END
from langgraph.graph import StateGraph

def node_a(state):
    return {"x": state["x"] + 1}

def node_b(state):
    return {"x": state["x"] - 2}

def node_c(state):
    return {"x": state["x"] + 1}

def routing_function():
    if state["x"] == 10:
        return True
    else:
        return False

builder = StateGraph(dict)

builder.add_node("node_a", node_a)
builder.add_node("node_b", node_b)
builder.add_node("node_c", node_c)

builder.set_entry_point("node_a")

# 构建节点之间的边
builder.add_conditional_edges("node_a", routing_function, {True: "node_b", False: "node_c"})

builder.add_edge("node_b", END)
builder.add_edge("node_c", END)

graph = builder.compile()

from IPython.display import Image, display

display(Image(graph.get_graph(xray=True).draw_mermaid_png()))

在构建实际的Agent时,Router fuction的定义才是最关键且最重要的。我们需要在这个函数中,基于特定的一些格式或者标识来区分该执行哪一条分支的逻辑。而对于消息的传递,大模型往往是通过结构化输出,引导其在响应的过程中应遵循哪种模式来工作,就类似于工具调用过程。Router就很好的利用到了这个特性,通过结构化输出的特性来控制接下来的分支路径。

  1. 更常使用的是用Pydantic来处理路由决策。在这种策略下,我们可以通过定义一个包含Union类型属性的父模型来灵活地从多种模式中选择适当的路由分支。例如,如果我们想根据输出决定是查询数据库还是直接回答问题,可以创建一个统一的模型来封装可能的输出类型。代码如下所示:
import getpass
import os
from typing import Union, Optional
from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI

'''
定义Pydantic模型以及用于生成格式化输出的大模型实例
'''
# 定义数据库插入的用户信息模型
class UserInfo(BaseModel):
    *"""Extracted user information, such as name, age, email, and phone number, if relevant."""*
*    *name: str = Field(description="The name of the user")
    age: Optional[int] = Field(description="The age of the user")
    email: str = Field(description="The email address of the user")
    phone: Optional[str] = Field(description="The phone number of the user")

# 定义正常生成模型回复的模型
class ConversationalResponse(BaseModel):
    *"""Respond to the user's query in a conversational manner. Be kind and helpful."""*
*    *response: str = Field(description="A conversational response to the user's query")

# 定义最终响应模型,可以是用户信息或一般响应
class FinalResponse(BaseModel):
    final_output: Union[UserInfo, ConversationalResponse]

if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter your OpenAI API key: ")

# 生成模型实例
llm = ChatOpenAI(model="gpt-4o-mini")

'''
考虑到设计的场景中用户需要执行数据库操作,我们首先需要实现一个连接数据库的函数。
在这个示例中,我们选择使用MySQL数据库。基于`UserInfo`模型的定义,构建一个相应的表结构,以便支持后续的数据插入和更新操作。
'''
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.orm import sessionmaker

# 创建基类
Base = declarative_base()

# 定义 UserInfo 模型
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    age = Column(Integer)
    email = Column(String(100))
    phone = Column(String(15))

# 数据库连接 URI,这里要替换成自己的Mysql 连接信息,以下是各个字段的对应解释:
# root:MySQL 数据库的用户名。
# snowball950123:MySQL 数据库的密码。
# 192.168.110.131:MySQL 服务器的 IP 地址。
# langgraph_agent:要连接的数据库的名称。
# charset=utf8mb4:设置数据库的字符集为 utf8mb4,支持更广泛的 Unicode 字符
DATABASE_URI = 'mysql+pymysql://root:snowball950123@192.168.110.131/langgraph_agent?charset=utf8mb4'
engine = create_engine(DATABASE_URI, echo=True)

# 如果表不存在,则创建表
Base.metadata.create_all(engine)

# 创建会话
Session = sessionmaker(bind=engine)
session = Session()

'''
定义节点函数,其中chat_with_model作为路由节点将用户输入的文本转化成格式化输出,搭配Router Function构建分支
'''

def chat_with_model(state):
    *"""generate structured output"""*
*    *print(state)
    print("-----------------")
    messages = state['messages']
    structured_llm = llm.with_structured_output(FinalResponse)
    response = structured_llm.invoke(messages)
    return {"messages": [response]}

# final_answer用于直接生成响应
def final_answer(state):
    *"""generate natural language responses"""*
*    *print(state)
    print("-----------------")
    messages = state['messages'][-1]
    response = messages.final_output.response
    return {"messages": [response]}

#insert_db用于执行数据库插入操作
def insert_db(state):
    *"""Insert user information into the database"""*
*    *session = Session()  # 确保为每次操作创建新的会话
    try:
        result = state['messages'][-1]
        output = result.final_output
        # 创建用户实例
        user = User(name=output.name, age=output.age, email=output.email, phone=output.phone)
        # 添加到会话
        session.add(user)
        # 提交事务
        session.commit()
        return {"messages": [f"数据已成功存储至Mysql数据库。"]}
    except Exception as e:
        session.rollback()  # 出错时回滚
        return {"messages": [f"数据存储失败,错误原因:{e}"]}
    finally:
        session.close()  # 关闭会话

'''
构建图,以及router
'''
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage

class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]

#定义generate_branch函数作为Router Function,根据经过chat_with_model节点后产生的不同Pydantic对象,
# 选择连接不同的节点,即final_answer或insert_db
def generate_branch(state: AgentState):
    result = state['messages'][-1]
    output = result.final_output

    if isinstance(output, UserInfo):
        return True
    elif isinstance(output, ConversationalResponse):
        return False

graph = StateGraph(AgentState)

# 添加三个节点
graph.add_node("chat_with_model", chat_with_model)
graph.add_node("final_answer", final_answer)
graph.add_node("insert_db", insert_db)

# 设置图的启动节点
graph.set_entry_point("chat_with_model")

# 设置条件边
graph.add_conditional_edges(
    "chat_with_model",
    generate_branch,
    {True: "insert_db", False: "final_answer"}
    )

# 设置终止节点
graph.set_finish_point("final_answer")
graph.set_finish_point("insert_db")

# 编译图
graph = graph.compile()

'''
测试
'''
query="我叫木羽,今年28岁,邮箱地址是snow@gmial.com,电话是1323521313"
input_message = {"messages": [HumanMessage(content=query)]}

result = graph.invoke(input_message)
print(result)

通过预定义的分支结构,可以根据用户的输入请求灵活适配不同的场景,在这个过程中,结构化输出对于路由至关重要,因为它们确保系统可以可靠地解释大模型的决定并采取行动。这种Router Agent(路由代理)的**优势就是可以精准的控制程序链路中的每一个细节,但同时也表现出来了这是一种相对有限的控制级别的代理架构,因为大模型通常只能控制单个决策。**想象一下上面的场景中,如果我们希望定义的insert_db不仅仅只是包含插入数据库,而是有一堆各式各样的工具,比如网络搜索,RAG等等,应该如何进一步的扩展呢? 难道要做对每一个工具在insert_db节点下再通过Router Function做分支判断吗?虽然可行,但总归并不是高效的做法。

Tool Calling Agent(工具调用代理)

Tool Calling Agent 是在Router Agent的基础上,大模型可以自主选择并使用多种工具来完成某个条件分支中的任务。**工具调用大家应该非常熟悉了,当我们希望代理与外部系统交互时,工具就非常有用。外部系统(例如API)通常需要特定的输入模式,而不是自然语言。例如,当我们绑定 API 作为工具时,我们赋予大模型对所需输入模式的感知,大模型就能根据用户的自然语言输入选择调用工具,并将返回符合该工具架构的输出

工具定义

@tool

通过 @tool 装饰器将一个函数转为工具

from langchain.tools import tool

@tool
def get_weather(location: str) -> str:
    """获取指定城市的天气信息"""
    # 模拟天气查询
    return f"{location} 的天气是晴朗,温度 25°C"

继承 BaseTool 来定义更复杂的工具

BaseTool 是 LangChain 中所有工具的抽象基类,定义了工具的核心接口。继承它需要实现:

  • name:工具的唯一标识符,供模型调用。

  • description:工具的描述,模型会根据它判断何时使用该工具。

  • _run:同步执行逻辑,必须实现。

  • _arun(可选):异步执行逻辑,如果不实现,异步调用时会回退到 _run

此外,你还可以覆盖 args_schema 来定义工具输入的结构(使用 Pydantic),从而实现参数校验和更清晰的文档。

ToolNodebind_tools 都支持任何继承自 BaseTool 的工具。后面用法和@tool的用法一致

from typing import Optional, Type
from langchain.tools import BaseTool
from pydantic import BaseModel, Field

# 1. 定义输入参数的 Pydantic 模型
class WeatherInput(BaseModel):
    location: str = Field(description="城市名称,例如北京")
    unit: Optional[str] = Field(default="celsius", description="温度单位,celsius 或 fahrenheit")

# 2. 继承 BaseTool
class WeatherTool(BaseTool):
    name: str = "get_weather"
    description: str = "获取指定城市的天气信息"
    args_schema: Type[BaseModel] = WeatherInput

    # 可以添加自定义属性(例如用于计数或缓存)
    call_count: int = 0

    def _run(self, location: str, unit: str = "celsius") -> str:
        """同步执行方法"""
        self.call_count += 1
        # 这里可以调用真实 API
        # 示例返回模拟数据
        return f"{location} 的天气是晴朗,温度 25°{unit}"

    async def _arun(self, location: str, unit: str = "celsius") -> str:
        """异步执行方法(可选)"""
        # 如果工具需要异步 I/O,可以在这里实现
        self.call_count += 1
        # 模拟异步 API 调用
        import asyncio
        await asyncio.sleep(0.1)
        return f"{location} 的天气是晴朗,温度 25°{unit}"

高级特性

参数验证

通过 args_schema 定义输入模型,LangChain 会在工具执行前自动校验参数,并给出清晰的错误提示,无需在 _run 中手动检查。

错误处理

你可以在 _run 中捕获异常,并返回用户友好的错误信息,或者抛出 ToolException 让框架处理。

工具状态与缓存

由于工具是对象,你可以保存内部状态,但注意在并发场景下可能引发竞态条件。如果需要在图的不同步骤中共享状态,推荐将状态存储在图的全局状态中,而不是工具内部。

异步工具

如果你使用异步运行时(如 FastAPI),实现 _arun 可以提升性能。在 LangGraph 中,ToolNode 在执行异步流(astream)时会优先使用 _arun,否则回退到 _run

工具的绑定

model = ChatOpenAI(model="gpt-4o")
model_with_tools = model.bind_tools(tools)

工具的使用

使用ToolNode自动执行工具

LangGraph 提供了一个预构建节点 ToolNode,它接收一个工具列表,能够自动执行由模型生成的工具调用。

  而Tool Calling Agent的本质原理是:让大模型根据用户的输入,自动的去判断应该使用哪个函数,并实际的执行,最后结合工具的响应结果 + 用户的原始问题作为完整的Prompt生成最终的问题。

from langgraph.graph import StateGraph, END, START, MessagesState
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_openai import ChatOpenAI

# 1. 定义工具
@tool
def get_weather(location: str) -> str:
    """获取指定城市的天气信息"""
    return f"{location} 的天气是晴朗,温度 25°C"

tools = [get_weather]

# 2. 创建模型并绑定工具
model = ChatOpenAI(model="gpt-4o")
model_with_tools = model.bind_tools(tools)

# 3. 定义图状态(通常使用 MessagesState 来管理消息)
workflow = StateGraph(MessagesState)

# 4. 定义节点
def call_model(state: MessagesState):
    response = model_with_tools.invoke(state["messages"])
    return {"messages": [response]}

# 创建工具节点,传入工具列表
tool_node = ToolNode(tools)

# 添加节点
workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)

# 设置入口
workflow.set_entry_point("agent")

# 添加条件边:根据模型输出中是否包含工具调用决定下一步
workflow.add_conditional_edges(
    "agent",
    tools_condition,  # 内置判断函数
    {"tools": "tools", END: END}
)

# 工具节点执行后返回 agent 节点
workflow.add_edge("tools", "agent")

# 编译图
app = workflow.compile()

手动调用工具

在节点定义中,手动调用工具

def call_tool_node(state: MessagesState):
    last_message = state["messages"][-1]
    tool_calls = last_message.tool_calls
    results = []
    for tc in tool_calls:
        # 根据工具名称选择工具并执行
        if tc["name"] == "get_weather":
            result = get_weather.invoke(tc["args"])
        else:
            result = "Unknown tool"
        results.append(
            ToolMessage(content=result, tool_call_id=tc["id"])
        )
    return {"messages": results}

基于 checkpointer 的短期记忆管理

     checkpointer通过一些数据结构来存储`State`状态中产生的信息,并且在每个`task`开始时去读取全局的状态。

checkPointer的作用

如果没有 Checkpointer,LangGraph 的图(Graph)在执行完一次任务后就会“失忆”。 有了它,你可以实现:

  • (单次会话中的)多轮对话(Memory): 自动存储与用户的历史对话。

  • 错误恢复(Fault Tolerance): 如果执行过程中报错,可以从上一个成功的节点重新开始。

  • 人机交互(Human-in-the-loop): 在执行到某个节点时暂停,等待人工审批,然后再继续执行。

  • 时间旅行(Time Travel): 允许开发者查看或回滚到图在过去的任意一个执行状态。

原理

LangGraph框架引入Thread(线程)概念来充当会话的角色。每个线程代表一个独特的交互或对话流。而thread_id是与特定执行线程关联的唯一标识符

Checkpointer 实际上是一个键值对数据库。它在图执行的每一个步骤(Step)之后,都会捕捉当前 State(状态) 的快照。

  • Thread ID: 每个对话或任务流都有一个唯一的 ID。

  • Checkpoint ID: 在同一个 Thread 中,每一步执行产生的唯一序列号。

当你再次运行同一个 thread_id 时,LangGraph 会自动从该线程最新的 Checkpoint 中加载 State。

分类

基于内存的-MemorySaver

LangGraph 框架有一个内置的持久层,通过checkpointer实现。当使用checkpointer编译图时,检查点会在每个超级步骤中保存图状态的checkpoint。这些checkpoint被保存到一个thread中,可以在图执行后访问。如下图所示:

超级步骤可以被认为是图节点上的单次迭代。并行运行的节点是同一超级步骤的一部分,而顺序运行的节点则属于单独的超级步骤。在图执行开始时,所有节点都开始处于inactive状态。当节点在其任何传入边缘(或“通道”)上接收到新消息(状态)时,该节点将变为active 。然后,活动节点运行其功能并以更新进行响应。在每个超级步骤结束时,没有传入消息的节点通过将自己标记为inactive 。当所有节点inactive并且没有消息在传输时,图执行终止。

image

 MemorySaver是实现上述流程的一种形式,它通过使用 defaultdict 将checkpointer存储在memory中

使用的方法非常简单,就是在创建任何 LangGraph图时,通过在编译图时添加MemorySaver来将其设置为保留其State状态中的数据,即:

from langgraph.checkpoint.memory import MemorySaver

# 1. 初始化存储器
memory = MemorySaver()

# 2. 编译图时加入 checkpointer
app = workflow.compile(checkpointer=memory)

# 3. 运行时指定 thread_id
config = {"configurable": {"thread_id": "user_123"}}
app.invoke(input_data, config=config)

持久化到数据库-SqliteSaver、PostgresSaver

   `SqliteSaver`是`checkponiter`的第二种实现形式,不同于`MemorySaver`仅通过字典的形式将状态信息存储在当前的运行环境下,`SqliteSaver`做的是持久化存储,这个方法会把`checkponiter`实际的存储在本地的`SQLite`数据库中,同时提供了异步环境下的实现`AsyncSqliteSaver`,适用于轻量级的应用落地场景。       
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph

# 1. 创建数据库连接
conn = sqlite3.connect("state_db.sqlite", check_same_thread=False)

# 2. 初始化 Checkpointer
memory = SqliteSaver(conn)

# 3. 编译图时挂载
app = workflow.compile(checkpointer=memory)

# 4. 运行(指定 thread_id,数据会自动存入 .sqlite 文件)
config = {"configurable": {"thread_id": "session_1"}}
app.invoke({"input": "你好"}, config)

基于 store 的长期记忆管理

仅使用checkpointer,我们无法做到跨线程共享信息。这激发了对Store的需求。LangGraph通过BaseStore接口提供内置文档存储。与通过线程 ID 保存状态的checkpointer不同,存储使用自定义命名空间来组织数据。常见用例包括存储用户配置文件、构建知识库以及管理所有线程的全局首选项。

核心概念

命名空间 (Namespace)

Store 使用层级化的命名空间来组织数据,类似于文件夹路径。例如:

  • ("users", "user_123", "preferences"):存储特定用户的偏好。

  • ("documents", "legal"):存储全局共享的法律文档索引。

存储内容 (Items)

每个存储条目包含:

  • Value: 具体的 JSON 数据(如 {"fav_drink": "coffee"})。

  • Key: 该条目的唯一标识。

代码使用

第一步:创建并挂载 Store

目前官方提供了 InMemoryStore(内存)和基于数据库的实现。

from langgraph.store.memory import InMemoryStore
from langgraph.graph import StateGraph

# 1. 初始化 Store
my_store = InMemoryStore()

# 2. 编译图时传入(可以同时拥有 checkpointer 和 store)
app = workflow.compile(checkpointer=memory_saver, store=my_store)

第二步:在节点(Node)中操作 Store

在定义节点函数时,只需在参数名中加入 store,LangGraph 会自动注入它

def remember_user_preference(state, config, *, store):
    user_id = config["configurable"]["user_id"]
    namespace = ("users", user_id)
    
    # 存入信息
    store.put(namespace, "bio", {"likes": "Python", "coffee": "latte"})
    
    return {"messages": ["记住了!"]}

def greet_user(state, config, *, store):
    user_id = config["configurable"]["user_id"]
    # 跨线程读取信息
    item = store.get(("users", user_id), "bio")
    
    greeting = f"欢迎回来,喜欢{item.value['coffee']}的开发者!"
    return {"messages": [greeting]}

Human-in-the-loop

核心机制:断点 (Breakpoints)

在编译图时,你可以通过 interrupt_beforeinterrupt_after 指定在哪些节点前后自动暂停。

from langgraph.checkpoint.memory import MemorySaver

# 1. 定义 Checkpointer(必须有,否则无法恢复中断)
memory = MemorySaver()

# 2. 编译图时设置断点# 假设我们有一个名为 'action' 的节点负责执行敏感操作(如扣款)
app = workflow.compile(
    checkpointer=memory,
    interrupt_before=["action"]  # 在执行 'action' 节点之前暂停
)

# 3. 第一次运行
config = {"configurable": {"thread_id": "user_trial_1"}}
initial_input = {"messages": ["我要购买这个商品"]}

# 运行到断点处会自动停止,并返回当前状态for event in app.stream(initial_input, config):
    print(event)

# 此时程序结束,但状态已持久化。'action' 节点尚未执行。

人工介入与恢复执行

当图进入中断状态后,你可以执行以下操作:

A. 直接继续(审批通过)

如果人工审核通过,只需传入 None 作为输入,图会从断点处继续。

# 传入 None 表示“继续执行之前挂起的任务”
app.invoke(None, config)

B. 修改状态后继续(人工干预)

如果人工认为 AI 的计划有误,可以先修改状态再继续。

# 获取当前状态
current_state = app.get_state(config)

# 手动修改状态中的数据(例如修改付款金额)
new_values = {"total_price": 99.0} 
app.update_state(config, new_values)

# 然后继续
app.invoke(None, config)

实现 HITL 的标准流程图

进阶:显式人工节点

有时你不希望通过编译配置来控制,而是想在图的逻辑里显式地询问用户。这时可以结合 interrupt 函数(LangGraph 0.2+ 推荐)

from langgraph.types import interrupt

def human_approval_node(state):# 这一行会产生一个特定的中断,并向调用者请求数据
    answer = interrupt("请确认是否执行该操作?(yes/no)")
    return {"is_approved": answer == "yes"}

runtime

待补充

参考资料

官方文档:https://langchain-ai.github.io/langgraph/

学习教程:https://github.com/NanGePlus/LangGraphChatBot