LangChain 提供了一套模块化的工具和抽象,使开发者能够将 LLM 与外部数据源、记忆机制、工具调用和逻辑控制流结合起来,从而构建可扩展、可维护、具有上下文感知能力的智能应用,比如聊天机器人、问答系统、文档摘要器、自动化代理(Agents)等。

场景 使用的 LangChain 组件
智能客服 Memory + Chain + LLM
知识库问答 Retrieval + RAG + Vector DB
自动化办公助理 Agents + Tools(如 Gmail、Excel、API)
代码生成助手 LLM + Prompt + OutputParser
多轮推理任务 Agents + Plan-and-execute

LangChain 的架构围绕以下几个核心概念展开:

模型管理(LLM)

支持多种 LLM 提供商(如 OpenAI、qWen、Hugging Face、Google、Ollama 等)。

提供统一接口调用不同模型:

  • LLM(文本生成)ChatOpenAI, ChatTongyi(Qwen), ChatOllama, ChatAnthropic

  • Embedding(向量化)OpenAIEmbeddings, DashScopeEmbeddings(Qwen), OllamaEmbeddings

  • 多模态模型ChatGoogleVertexAI(支持图像+文本)

Ollama

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from langchain_ollama import OllamaEmbeddings, ChatOllama

llm = ChatOllama(base_url="http://localhost:11434", model="qwen3:4b", temperature=0)

# 加载检索知识库
embedding = OllamaEmbeddings(base_url="http://localhost:11434", model="nomic-embed-text")

loader = DirectoryLoader("./knowledge_base", glob="*.txt")
docs = loader.load()
text_splitter = RecursiveCharacterTextSplitter(chunk_size=300, chunk_overlap=50)
splits = text_splitter.split_documents(docs)

vectorstore = Chroma.from_documents(
documents=splits,
embedding=embedding,
persist_directory="./data/vectorstore"
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

# 调用模型
query = "推荐几种最新款式的衣服"
docs = retriever.invoke(query)
context = "\n\n".join([doc.page_content for doc in docs])
if not context.strip():
return "知识库中未找到相关信息。"

prompt = f"你是一个客服,根据以下资料回答问题:\n\n{context}\n\n问题:{query}\n回答:"
response = llm.invoke(prompt)

OpenAI

1
2
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

千问

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from langchain_community.chat_models import ChatTongyi
from langchain_community.embeddings import DashScopeEmbeddings

llm = ChatTongyi(
model=os.getenv("MODEL_NAME", "qwen-turbo"), # 可选: qwen-turbo, qwen-plus, qwen-max, qwen-max-latest
streaming=True,
model_kwargs={"temperature": 0},
)

# 加载知识库文档
loader = DirectoryLoader("./knowledge_base", glob="*.txt")
docs = loader.load()
splits = RecursiveCharacterTextSplitter(chunk_size=300, chunk_overlap=50).split_documents(docs)
vectorstore = Chroma.from_documents(splits, DashScopeEmbeddings(model="text-embedding-v1"), persist_directory="./data")

retriever = vectorstore.as_retriever(k=3)
query = "最新的商品类型有哪些"
docs = retriever.invoke(query)
context = "\n\n".join([d.page_content for d in docs])

prompt = f"你是客服,请根据以下资料回答:\n{context}\n\n问题:{query}\n回答:"
response = llm.invoke(prompt)

提示工程(Prompt)

构建和管理提示词模板,支持动态填充、少量示例(few-shot)等。

  • PromptTemplate:传统字符串模板

  • ChatPromptTemplate:支持多轮对话消息(System/Human/AI)

  • FewShotPromptTemplate:带示例的提示

  • MessagesPlaceholder:用于 Agent 中的动态消息插槽

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

prompt = ChatPromptTemplate.from_messages([
("system", "你是一个订单查询助手。数据库包含 users(id, name) 和 orders(user_id, product, status) 表。\n"
"当用户提到姓名(如'张三'),请先从 users 表查 id,再关联 orders 表。\n"
"只用简洁明确的中文返回最终答案,不要输出 SQL 信息。"),
("human", "{input}"),
MessagesPlaceholder("agent_scratchpad")
])

db = SQLDatabase.from_uri(f"sqlite:///{DB_PATH}")

agent = create_sql_agent(llm=llm, db=db, extra_prompt_messages=prompt.messages, agent_type="openai-tools")

result = agent.invoke({"input": query})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from langchain_core.prompts import PromptTemplate
# 创建自动摘要记忆
summary_prompt = PromptTemplate.from_template(
"请将以下对话压缩成一段简洁的中文摘要,保留关键信息(如用户姓名、订单、问题类型):\n\n{summary}\n\n当前对话:\nHuman: {new_lines}\nAI:"
)

memory = ConversationSummaryMemory(
llm=llm,
memory_key="chat_history",
return_messages=True, # 以消息对象形式返回,兼容 Agent
human_prefix = "Human",
ai_prefix = "AI",
prompt = summary_prompt # 自定义摘要 prompt
)

链式编排(Chain)

将多个组件串联成流水线。

  • LLMChain:Prompt + LLM

  • SequentialChain:多步骤顺序执行

  • RetrievalQA:RAG 问答链

  • ConversationalRetrievalChain:带记忆的 RAG

  • 自定义 Chain:通过 RunnableSequenceRunnableLambda

1
2
3
4
5
6
7
from langchain.chains import RetrievalQA
qa_chain = RetrievalQA.from_chain_type(
llm=llm,
retriever=retriever,
chain_type="stuff"
)
qa_chain.invoke("公司退货政策是什么?")

直接手动构建链

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from langchain_community.chat_models import ChatTongyi
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

#################### llm
llm = ChatTongyi(
model=os.getenv("MODEL_NAME", "qwen-turbo"),
streaming=True,
model_kwargs={"temperature": 0},
)

######################## 构建路由 Chain
router_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个分类器,请严格输出以下三者之一:order_db, knowledge_base, general\n"
"规则:\n"
"- 涉及订单、用户、发货 → order_db\n"
"- 涉及退货、政策、说明书 → knowledge_base\n"
"- 其他 → general"),
("human", "问题:{input}")
])

router_chain = router_prompt | llm | StrOutputParser()
route = router_chain.invoke({"input": user_input}).strip()
print(f"[路由结果: {route}]")

智能决策(Agents)

让 LLM 自主调用工具(如搜索、数据库、API)。

  • Tools:封装函数为工具(@tool 装饰器)

  • Agent Types:

    • openai-tools:兼容 OpenAI Function Calling(推荐)
    • react:ReAct 思维链
    • self-ask-with-search:自问自答
  • AgentExecutor:执行循环(Thought → Action → Observation)

  • 调用和执行DB

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    from langchain_community.agent_toolkits import create_sql_agent

    db = SQLDatabase.from_uri(f"sqlite:///{DB_PATH}")
    llm = ChatOllama(base_url="http://localhost:11434", model="qwen3:4b", temperature=0)
    agent = create_sql_agent(llm=llm, db=db, agent_type="openai-tools")
    try:
    result = agent.invoke({"input": query})
    return result["output"]
    except Exception as e:
    return f"数据库查询失败: {str(e)}"
  • 调用和执行Tool

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    from langchain.agents import create_openai_tools_agent, AgentExecutor
    from langchain_ollama import OllamaEmbeddings, ChatOllama

    llm = ChatOllama(base_url="http://localhost:11434", model="qwen3:4b", temperature=0)

    # 定义 Tool
    tools = [
    Tool(
    name="订单与用户查询",
    func=query_database,
    description="用于查询用户的订单状态、购买记录、个人信息等结构化数据。"
    ),
    Tool(
    name="客服知识库查询",
    func=query_knowledge_base,
    description="用于查询退换货政策、产品使用说明、常见问题等非结构化知识。"
    )
    ]

    prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个智能客服助手。请根据用户问题选择合适的工具:\n"
    "- 如果问题涉及订单、发货、用户信息等,请使用‘订单与用户查询’工具。\n"
    "- 如果问题涉及退货、政策、产品说明等,请使用‘客服知识库查询’工具。\n"
    "请用简洁、友好的中文回答。"),
    MessagesPlaceholder("chat_history"),
    ("human", "{input}"),
    MessagesPlaceholder("agent_scratchpad")
    ])

    # 管理 LLM、工具 和 提示词
    agent = create_openai_tools_agent(llm, tools, prompt)
    executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)
    response = executor.invoke({
    "input": user_input,
    "chat_history": chat_history.messages
    })

工具调用(Tool)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from langchain_core.tools import Tool
from langchain_ollama import OllamaEmbeddings, ChatOllama

tools = [
Tool(
name="订单与用户查询",
func=query_database,
description="用于查询用户的订单状态、购买记录、个人信息等结构化数据。"
),
Tool(
name="客服知识库查询",
func=query_knowledge_base,
description="用于查询退换货政策、产品使用说明、常见问题等非结构化知识。"
)
]

llm = ChatOllama(base_url="http://localhost:11434", model="qwen3:4b", temperature=0)

agent = create_openai_tools_agent(llm, tools, "prompt 模板")
executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True,
handle_parsing_errors=True)

response = executor.invoke({"input": "你好"})
print("智能客服:", response["output"])
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from langchain.tools import tool

# @tool 装饰器用于将函数定义为一个工具
@tool
def detect_scenes(video_path: str) -> List[Tuple[float, float]]:
try:
# 此处为具体的工具实现
return scenes
except Exception as e:
# 发生错误时返回模拟数据
return [(0.0, 5.0), (5.0, 10.0), (10.0, 15.0)]

# 使用@tool装饰器标记函数为工具
@tool
def get_current_weather(location: str) -> str:
"""获取指定地点的当前天气"""
# 这里应该是实际调用天气API的逻辑,简化为返回模拟数据
return f"{location}的当前天气是晴天,温度25°C"


# 在Agent或其他链中可以直接调用工具函数
if __name__ == "__main__":
result = get_current_weather("北京")
print(result)

状态记忆(Memory)

  • 保存对话历史:让 LLM 知道上下文

  • 维护状态:跟踪用户偏好、任务进度

  • 支持多轮交互:如填表、分步任务

类型 适用场景 特点
ConversationBufferMemory 简单对话 保存全部历史(可能超 token)
ConversationBufferWindowMemory 长对话 仅保存最近 N 轮
ConversationSummaryMemory 超长对话 用 LLM 生成摘要
EntityMemory 实体跟踪 记住人物/物品属性
VectorStoreRetrieverMemory 知识库问答 从向量库检索相关记忆

使用场景

场景 推荐 Memory 类型
简单聊天机器人 ConversationBufferMemory
长对话(>10轮) ConversationBufferWindowMemory(k=5)
超长对话(>50轮) ConversationSummaryMemory
角色扮演/实体跟踪 ConversationEntityMemory
知识库问答 VectorStoreRetrieverMemory
生产环境持久化 RedisChatMessageHistory

基础记忆

ConversationBufferMemory 的 简单对话链

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o")
memory = ConversationBufferMemory()

conversation = ConversationChain(
llm=llm,
memory=memory,
verbose=True
)

# 多轮对话
response1 = conversation.predict(input="你好!")
response2 = conversation.predict(input="我叫林然")
response3 = conversation.predict(input="我刚才说了什么?")

print(response3) # → "你刚才说你叫林然"
1
2
3
4
5
6
# 查看记忆内容
print(memory.load_memory_variables({}))
# {'history': 'Human: 你好!\nAI: 你好!有什么我可以帮你的吗?\nHuman: 我叫林然\nAI: 很高兴认识你,林然!'}

# 清空记忆
memory.clear()

窗口记忆(长对话)

避免 token 超限,只保留最近 3 轮:Window Memory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from langchain.memory import ConversationBufferWindowMemory

memory = ConversationBufferWindowMemory(k=3) # 保留最近3轮

conversation = ConversationChain(
llm=ChatOpenAI(),
memory=memory,
verbose=True
)

# 对话超过3轮后,最早的内容会被丢弃
for i in range(5):
conversation.predict(input=f"消息{i+1}")

# 查看当前记忆(应只有最后3轮)
history = memory.load_memory_variables({})["history"]
print(f"\n当前记忆轮数: {history.count('Human:')}")
print(f"记忆内容:\n{history}")

摘要记忆(超长对话)

用 LLM 自动摘要历史:Summary Memory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from langchain.memory import ConversationSummaryMemory

memory = ConversationSummaryMemory(
llm=ChatOpenAI(),
memory_key="chat_history",
return_messages=True
)

conversation = ConversationChain(
llm=ChatOpenAI(),
memory=memory,
verbose=True
)

# 随着对话增长,memory 会自动摘要
conversation.predict(input="我们来玩角色扮演")
conversation.predict(input="你扮演医生")
conversation.predict(input="我头痛三天了")

特别注意:

  • 如果 return_messages=True 则存储的对象为 List[BaseMessage],获取时需要转换为 文本内容,一般需要注入 记忆注入点MessagesPlaceholder("chat_history"),还有压缩时summary_prompt的影响,是独立存储时才需要获取最近几轮的切片操作。

    1
    2
    3
    4
    5
    # 将消息转为文本
    history = "\n".join(
    f"{'用户' if m.type == 'human' else '客服'}: {m.content}"
    for m in self.get_summary_recent(6) # 最近6轮
    )
  • 如果 return_messages=False ,不需要注入 MessagesPlaceholder("chat_history")

实体记忆

记住对话中提到的实体属性(如人物、物品):EntityMemory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from langchain.memory import ConversationEntityMemory
from langchain.memory.prompt import ENTITY_MEMORY_CONVERSATION_TEMPLATE

entity_memory = ConversationEntityMemory(llm=ChatOpenAI())

conversation = ConversationChain(
llm=ChatOpenAI(),
prompt=ENTITY_MEMORY_CONVERSATION_TEMPLATE,
memory=entity_memory,
verbose=True
)

conversation.predict(input="我叫林然,32岁")
conversation.predict(input="我喜欢喝红茶")
conversation.predict(input="我住在北京")

# 查看实体记忆
print(entity_memory.entity_cache)
# {'林然': '32岁,喜欢喝红茶,住在北京'}

向量记忆(知识库)

向量数据库中检索相关记忆(适合知识库),向量记忆:VectorStoreRetrieverMemory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings

# 1. 创建向量库
embedding = OpenAIEmbeddings()
vectorstore = FAISS.from_texts(
["林然喜欢红茶", "陈默是林然的前男友", "北京今天下雨"],
embedding
)
retriever = vectorstore.as_retriever()

# 2. 创建记忆
memory = VectorStoreRetrieverMemory(retriever=retriever)

# 3. 在链中使用
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate

template = """你是一个助手,会参考相关记忆回答问题。
记忆: {retrieved_memories}
问题: {input}
回答:"""

prompt = PromptTemplate.from_template(template)
chain = LLMChain(
llm=ChatOpenAI(),
prompt=prompt,
memory=memory,
verbose=True
)

chain.predict(input="林然喜欢什么?") # → 从向量库检索"林然喜欢红茶"

Agent 工具

让 Agent 记住工具调用历史:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from langchain.agents import AgentType, initialize_agent
from langchain.memory import ConversationBufferMemory

memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True
)

agent = initialize_agent(
tools=[],
llm=ChatOpenAI(),
agent=AgentType.CONVERSATIONAL_REACT_DESCRIPTION,
memory=memory,
verbose=True
)

agent.run("记住我的名字是林然")
agent.run("我叫什么?") # → "你叫林然"

持久化记忆

保存到文件

1
2
3
4
5
6
7
8
9
import pickle

# 保存
with open('memory.pkl', 'wb') as f:
pickle.dump(memory, f)

# 加载
with open('memory.pkl', 'rb') as f:
memory = pickle.load(f)

保存到 Redis(生产推荐)

1
2
3
4
5
6
7
8
9
10
11
12
from langchain.memory import ConversationBufferMemory
from langchain_community.storage import RedisChatMessageHistory

message_history = RedisChatMessageHistory(
session_id="user_123",
url="redis://localhost:6379/0"
)

memory = ConversationBufferMemory(
chat_memory=message_history,
return_messages=True
)

保存到 PostgreSQL

1
2
3
4
5
6
7
from langchain_community.chat_message_histories import PostgresChatMessageHistory

history = PostgresChatMessageHistory(
session_id="user_123",
connection_string="postgresql://user:pass@localhost/db"
)
memory = ConversationBufferMemory(chat_memory=history)

高级技巧

  1. 自定义 Memory Key

    1
    2
    3
    4
    memory = ConversationBufferMemory(
    memory_key="conversation_history", # 自定义 key
    input_key="user_input" # 指定输入 key
    )
  2. LangGraph 中使用 Memory

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    from langgraph.graph import StateGraph, END

    class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
    memory: dict # 自定义 memory 字段

    def call_model(state: AgentState):
    # 从 state["memory"] 读取历史
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

    workflow = StateGraph(AgentState)
    workflow.add_node("agent", call_model)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    # LangGraph 中的自定义记忆
    from typing import Annotated, Sequence
    from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
    from langgraph.graph import StateGraph, START, END
    from langgraph.graph.message import add_messages
    from langchain_openai import ChatOpenAI
    import os
    from dotenv import load_dotenv

    load_dotenv()

    # 定义状态
    class GraphState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], add_messages]
    user_name: str # 自定义记忆字段

    # 节点函数
    def call_model(state: GraphState):
    llm = ChatOpenAI(model="gpt-4o")
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

    def extract_name(state: GraphState):
    """从对话中提取用户名"""
    if not state.get("user_name"):
    for msg in reversed(state["messages"]):
    if isinstance(msg, HumanMessage):
    if "我叫" in msg.content:
    name = msg.content.split("我叫")[-1].strip("。!")
    return {"user_name": name}
    return {"user_name": state.get("user_name", "")}

    # 构建图
    def create_graph():
    workflow = StateGraph(GraphState)
    workflow.add_node("extract_name", extract_name)
    workflow.add_node("call_model", call_model)

    workflow.set_entry_point("extract_name")
    workflow.add_edge("extract_name", "call_model")
    workflow.add_edge("call_model", END)

    return workflow.compile()

    def main():
    app = create_graph()

    print("=== LangGraph 记忆测试 ===")
    inputs = {"messages": [HumanMessage(content="你好!我叫林然")]}
    result = app.invoke(inputs)

    print(f"\nAI 回答: {result['messages'][-1].content}")
    print(f"提取的用户名: {result['user_name']}")

    # 第二轮对话
    inputs2 = {
    "messages": result["messages"] + [HumanMessage(content="你还记得我吗?")],
    "user_name": result["user_name"]
    }
    result2 = app.invoke(inputs2)
    print(f"\n第二轮回答: {result2['messages'][-1].content}")

    if __name__ == "__main__":
    main()
  3. 记忆过滤(敏感信息)

    1
    2
    3
    4
    5
    class SafeMemory(ConversationBufferMemory):
    def save_context(self, inputs, outputs):
    # 过滤身份证/手机号
    inputs = {k: v.replace("138****1234", "[PHONE]") for k, v in inputs.items()}
    super().save_context(inputs, outputs)

上下文理解

从之前的历史记录中获取信息,并注入到当前对话中。

  • 从 langgraph 的 AgentState 对象中获取 summary,然后通过LLM提取关键上下文数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    # 1. 获取原始问题
    original_query = "介绍一下这个商品"

    # 2. 结合摘要和最近消息,重写查询
    summary = state.get("summary", "")
    recent_msgs = state["messages"][-3:] # 最近2-3条

    # 构建上下文字符串
    context_parts = []
    if summary:
    context_parts.append(f"背景摘要:{summary}")
    for msg in recent_msgs[:-1]: # 不包含当前问题
    role = "用户" if isinstance(msg, HumanMessage) else "客服"
    context_parts.append(f"{role}: {msg.content}")

    context_str = "\n".join(context_parts)

    # 3. 让 LLM 重写查询(消除指代)
    if context_str.strip():
    rewrite_prompt = [
    SystemMessage(content="你是一个查询重写助手。请结合对话上下文,将用户的当前问题改写为一个独立、明确、无指代的完整问题。"),
    HumanMessage(content=f"上下文:\n{context_str}\n\n当前问题:{original_query}\n\n改写后的问题:")
    ]
    rewritten = llm.invoke(rewrite_prompt)
    enhanced_query = rewritten.content.strip()
    else:
    enhanced_query = original_query

    print(f"[RAG 调试] 原始查询: '{original_query}' → 增强查询: '{enhanced_query}'")

    # 4. 用增强查询执行 RAG
    retriever = init_rag().as_retriever(k=3)
    docs = retriever.invoke(enhanced_query)
    context = "\n\n".join([d.page_content for d in docs]) or "知识库中未找到相关信息。"

    answer_prompt = ChatPromptTemplate.from_messages([
    ("system", "你是客服,请根据以下资料简洁回答问题,不要反问。"),
    ("human", f"资料:{context}\n\n问题:{enhanced_query}")
    ])
    rag_chain = answer_prompt | llm | StrOutputParser()

    result = rag_chain.invoke({"context": context, "query": enhanced_query})
  • 从 langgraph 的 AgentState 对象中获取 summary,然后自己解析提取关键上下文数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    # 1. 从对话历史中提取用户身份
    user_name = None
    for msg in state["messages"]:
    if isinstance(msg, HumanMessage):
    content = msg.content
    if "我是" in content or "我叫" in content or "我的名字是" in content:
    # 简单提取姓名(可扩展为 NER)
    for keyword in ["我是", "我叫", "我的名字是"]:
    if keyword in content:
    name = content.split(keyword)[-1].strip("。!?,. ")
    if name:
    user_name = name
    break
    if user_name:
    break

    # 2. 改写用户查询,显式包含用户名
    original_query = state["messages"][-1].content
    enhanced_query = f"(当前用户姓名:{user_name}{original_query}" if user_name else original_query
    print(f"[DB 调试] 增强后查询: {enhanced_query}")
  • ConversationSummaryMemory 对象中获取 chat_history,然后通过LLM解析提取关键上下文数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    # 创建自动摘要记忆
    summary_prompt = PromptTemplate.from_template(
    "请将以下对话压缩成一段简洁的中文摘要,保留关键信息(如用户姓名、订单、问题类型):\n\n{summary}\n\n当前对话:\nHuman: {new_lines}\nAI:"
    )

    summary_memory = ConversationSummaryMemory(
    llm=self.llm,
    memory_key="chat_history",
    return_messages=False, # 如果是 True 以消息对象形式返回,兼容 Agent。 如果 False 则返回纯文本摘要
    human_prefix="Human",
    ai_prefix="AI",
    prompt=summary_prompt # 自定义摘要 prompt
    )


    def get_summary_recent(self, recent_n=6):
    """获取最近 N 条对话消息"""
    messages = summary_memory.load_memory_variables({})["chat_history"]
    if len(messages) > recent_n:
    return messages[-recent_n:] # 最近 N 条
    return messages


    def extract_last_user_name(self) -> str:
    recent_messages = self.get_summary_recent()
    prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个信息提取助手。请仔细阅读以下对话历史,"
    "并提取用户在**最近一次发言**中提到的关键信息。"
    "只返回所需内容,不要解释。"),
    ("human", "对话历史:{history}\n\n"
    "问题:用户最近提到的姓名是什么?如果没有,返回“未知”。\n"
    "姓名:")
    ])

    chain = prompt | self.llm | StrOutputParser()
    return chain.invoke({"history": recent_messages})

状态图(LangGraph)

用于构建 可控、循环、分支、多智能体协作 的复杂工作流。

  • 基于状态(State)驱动

  • 支持条件分支、循环、并行

  • 天然支持多智能体路由

  • 可中断、可恢复、可调试

1
2
3
4
5
6
7
8
9
10
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
messages: Annotated[list, add_messages]

builder = StateGraph(State)
builder.add_node("chatbot", chatbot_node)
builder.add_edge(START, "chatbot")
builder.add_edge("chatbot", END)
graph = builder.compile()

基础 Demo

加载、解析、检索文档,构建问答链和模板,输出响应

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 1. 加载文档
loader = PyPDFLoader("C:\\Users\\haeng\\Desktop\\大数据技术详解.pdf")
docs = loader.load()

# 2. 分块
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = text_splitter.split_documents(docs)

# 3. 嵌入并存入向量库
vectorstore = Chroma.from_documents(documents=splits
, embedding=OllamaEmbeddings(base_url="http://localhost:11434", model="nomic-embed-text")
, persist_directory="./data/chroma_db")

# 4. 创建检索器
retriever = vectorstore.as_retriever()

# 5. 构建问答链
llm = ChatOllama(base_url="http://localhost:11434", model="qwen3:0.6b", temperature=0)

system_prompt = (
"你是一个乐于助人的智能助手,请根据以下上下文回答用户问题。"
"\n\n"
"{context}"
)
# 构建提示模板
prompt = ChatPromptTemplate.from_messages([
("system", system_prompt),
("human", "{input}"),
])

# 创建 chain
question_answer_chain = create_stuff_documents_chain(llm, prompt)
rag_chain = create_retrieval_chain(retriever, question_answer_chain)

# 6. 提问
response = rag_chain.invoke({"input": "总结一下这份文档主要讲了什么内容?"})
print(response["answer"])

智能客服机器人

LangChain Agent(路由决策):

  • 若问题涉及订单/用户/状态 → 调用 SQL DB 工具

  • 若问题涉及政策/说明/FAQ → 调用 RAG 知识库工具(knowledge_base 文件夹下)

    • return_policy.txt(退换货政策)
    • product_guide.txt(产品使用指南)
  • 其他问题直接调用LLM

单智能体

支持多轮对话,记忆上下文

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# ----------------------------
# 1. 初始化数据库
# ----------------------------
DB_PATH = "../data/customer_service.db"

def init_database():
if os.path.exists(DB_PATH):
return
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)
''')
cursor.execute('''
CREATE TABLE orders (
id INTEGER PRIMARY KEY,
user_id INTEGER,
product TEXT,
status TEXT,
order_date TEXT,
FOREIGN KEY(user_id) REFERENCES users(id)
)
''')
cursor.executemany("INSERT INTO users VALUES (?, ?, ?)", [
(1, "张三", "zhangsan@example.com"),
(2, "李四", "lisi@example.com"),
])
cursor.executemany("INSERT INTO orders VALUES (?, ?, ?, ?, ?)", [
(1, 1, "无线蓝牙耳机", "delivered", "2024-10-01"),
(2, 2, "机械键盘", "shipped", "2025-11-01"),
(3, 1, "智能手机", "shipped", "2025-09-01"),
(4, 2, "显示器支架", "pending", "2025-11-11"),
])
conn.commit()
conn.close()


# ----------------------------
# 2. 初始化 RAG 知识库
# ----------------------------
KNOWLEDGE_DIR = "./knowledge_base"
VECTOR_STORE_PATH = "../data/vectorstore"
def init_rag():
embedding = OllamaEmbeddings(base_url="http://localhost:11434", model="nomic-embed-text")

if os.path.exists(VECTOR_STORE_PATH):
vectorstore = Chroma(
persist_directory=VECTOR_STORE_PATH,
embedding_function=embedding
)
return vectorstore.as_retriever(search_kwargs={"k": 3})

loader = DirectoryLoader(KNOWLEDGE_DIR, glob="*.txt")
docs = loader.load()
text_splitter = RecursiveCharacterTextSplitter(chunk_size=300, chunk_overlap=50)
splits = text_splitter.split_documents(docs)

vectorstore = Chroma.from_documents(
documents=splits,
embedding=embedding,
persist_directory=VECTOR_STORE_PATH
)
return vectorstore.as_retriever(search_kwargs={"k": 3})


# ----------------------------
# 3. 定义工具函数
# ----------------------------
def query_database(query: str) -> str:
"""查询订单、用户等结构化数据"""
db = SQLDatabase.from_uri(f"sqlite:///{DB_PATH}")
llm = ChatOllama(base_url="http://localhost:11434", model="qwen3:4b", temperature=0)
from langchain_community.agent_toolkits import create_sql_agent
agent = create_sql_agent(llm=llm, db=db, agent_type="openai-tools")
try:
result = agent.invoke({"input": query})
return result["output"]
except Exception as e:
return f"数据库查询失败: {str(e)}"


def query_knowledge_base(query: str) -> str:
"""查询退换货政策、产品说明等非结构化知识"""
retriever = init_rag()
docs = retriever.invoke(query)
context = "\n\n".join([doc.page_content for doc in docs])
if not context.strip():
return "知识库中未找到相关信息。"
# 使用 LLM 生成自然语言回答
llm = ChatOllama(base_url="http://localhost:11434", model="qwen3:4b", temperature=0)
prompt = f"你是一个客服,根据以下资料回答问题:\n\n{context}\n\n问题:{query}\n回答:"
response = llm.invoke(prompt)
return response.content


# ----------------------------
# 4. 创建混合 Agent,指定 Tool
# ----------------------------
def create_hybrid_agent():
tools = [
Tool(
name="订单与用户查询",
func=query_database,
description="用于查询用户的订单状态、购买记录、个人信息等结构化数据。"
),
Tool(
name="客服知识库查询",
func=query_knowledge_base,
description="用于查询退换货政策、产品使用说明、常见问题等非结构化知识。"
)
]

llm = ChatOllama(base_url="http://localhost:11434", model="qwen3:4b", temperature=0)

# Agent 系统提示
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个智能客服助手。请根据用户问题选择合适的工具:\n"
"- 如果问题涉及订单、发货、用户信息等,请使用‘订单与用户查询’工具。\n"
"- 如果问题涉及退货、政策、产品说明等,请使用‘客服知识库查询’工具。\n"
"请用简洁、友好的中文回答。"),
MessagesPlaceholder("chat_history"), # ←←← 记忆注入点
("human", "{input}"),
MessagesPlaceholder("agent_scratchpad")
])

agent = create_openai_tools_agent(llm, tools, prompt)
executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)
return executor


# ----------------------------
# 5. 主程序
# ----------------------------
def main():
init_database()
print("智能客服机器人(数据库 + RAG 知识库 + 多轮记忆)已启动!")
print("你可以提问:")
print(" - 张三最新的订单状态?")
print(" - 退货政策是什么?")
print(" - 蓝牙耳机怎么用?")
print("输入 'quit' 退出。\n")

agent = create_hybrid_agent()
chat_history = ChatMessageHistory() # 存储对话历史

while True:
user_input = input("用户: ").strip()
if not user_input:
continue

if user_input and user_input.lower() in {"quit", "exit"}:
print("再见!")
break

try:
# response = agent.invoke({"input": user_input})
response = agent.invoke({
"input": user_input,
"chat_history": chat_history.messages
})
# 保存本轮对话到历史
chat_history.add_user_message(user_input)
chat_history.add_ai_message(response["output"])

print("智能客服:", response["output"])
except Exception as e:
error_msg = "抱歉,我暂时无法处理这个问题,需要人工介入。"
print("智能客服:", error_msg, str(e))
# 也保存错误回复到历史,避免后续混乱
chat_history.add_user_message(user_input)
chat_history.add_ai_message(error_msg)


if __name__ == "__main__":
main()

多智能体(LangGraph)

将单体客服系统拆分为多个专业智能体(Agents),并通过 LangGraph 实现协作控制流,能显著提升系统的模块化、可维护性和推理能力

本智能体不支持上下文理解,更多代码和使用方式,参照 GitHub(自动摘要记忆,支持多轮对话,上下文理解)

多智能体 + LangGraph 协作流程图:

pengline-converted-image
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# ----------------------------
# 1. 初始化数据库 & 知识库
# ----------------------------
DB_PATH = "../data/customer_service.db"
KNOWLEDGE_DIR = "knowledge_base"
VECTOR_STORE_PATH = "../data/vectorstore"

def init_database():
# 此处省略(详细实现可参照GitHub 代码)
conn.commit(); conn.close()

def init_rag():
# 此处省略(详细实现可参照GitHub 代码)
return vs

# ----------------------------
# 2. 定义状态图结构
# ----------------------------
class AgentState(TypedDict):
messages: Annotated[list, add_messages]
next: str # 下一步执行的 agent

# ----------------------------
# 3. 定义各智能体
# ----------------------------
llm = ChatOllama(base_url="http://localhost:11434", model="qwen3:8b", temperature=0)

# --- Router Agent ---
class RouteQuery(BaseModel):
datasource: Literal["order_db", "knowledge_base", "general"] = Field(
..., description="问题类型:order_db(订单/用户)、knowledge_base(政策/说明)、general(其他)"
)

def router_agent(state: AgentState):
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个路由智能体,请判断用户问题属于哪一类:\n"
"- order_db:涉及订单、发货、用户信息等\n"
"- knowledge_base:涉及退货、产品说明、FAQ等\n"
"- general:闲聊、无法分类的问题"),
("human", "{input}")
])
router_llm = llm.with_structured_output(RouteQuery)
response = router_llm.invoke(prompt.format_messages(input=state["messages"][-1].content))
return {"next": response.datasource}

# --- DB Agent ---
def db_agent(state: AgentState):
db = SQLDatabase.from_uri(f"sqlite:///{DB_PATH}")
agent = create_sql_agent(
llm=llm,
db=db,
agent_type="openai-tools", # LangChain 会自动适配 Ollama 的 function calling
handle_parsing_errors=True
)
last_msg = state["messages"][-1].content
try:
result = agent.invoke({"input": last_msg})
answer = result["output"]
except Exception as e:
answer = f"数据库查询失败:{str(e)}"
return {"messages": [AIMessage(content=answer)]}

# --- RAG Agent ---
def rag_agent(state: AgentState):
vectorstore = init_rag()
retriever = vectorstore.as_retriever(k=3)
query = state["messages"][-1].content
docs = retriever.invoke(query)
context = "\n\n".join([d.page_content for d in docs])
if not context.strip():
return {"messages": [AIMessage(content="知识库中未找到相关信息。")]}
prompt = f"你是客服,请根据以下资料回答:\n{context}\n\n问题:{query}\n回答:"
response = llm.invoke(prompt)
return {"messages": [AIMessage(content=response.content)]}

# --- General Agent ---
def general_agent(state: AgentState):
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个友好客服,但无法访问订单或知识库。请礼貌回应。"),
("human", "{input}")
])
response = llm.invoke(prompt.format_messages(input=state["messages"][-1].content))
return {"messages": [AIMessage(content=response.content)]}

# ----------------------------
# 4. 构建 LangGraph 工作流
# ----------------------------
def route_to_agent(state: AgentState) -> Literal["db_agent", "rag_agent", "general_agent"]:
return state["next"]

graph_builder = StateGraph(AgentState)
graph_builder.add_node("router", router_agent)
graph_builder.add_node("db_agent", db_agent)
graph_builder.add_node("rag_agent", rag_agent)
graph_builder.add_node("general_agent", general_agent)

graph_builder.add_conditional_edges(
"router",
route_to_agent,
{"order_db": "db_agent", "knowledge_base": "rag_agent", "general": "general_agent"}
)

graph_builder.add_edge("db_agent", END)
graph_builder.add_edge("rag_agent", END)
graph_builder.add_edge("general_agent", END)

graph_builder.set_entry_point("router")
app = graph_builder.compile()

多智能体(Chain)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# ----------------------------
# 1. 初始化 & 环境
# ----------------------------
DB_PATH = "../data/customer_service.db"
KNOWLEDGE_DIR = "knowledge_base"
VECTOR_STORE_PATH = "../data/vectorstore"

# 加载 .env 文件中的环境变量
load_dotenv()

# 全局 LLM(用于路由 + General + RAG 生成)
llm = ChatTongyi(
model=os.getenv("MODEL_NAME", "qwen-turbo"),
streaming=True,
model_kwargs={"temperature": 0},
)

# ----------------------------
# 2. 初始化数据库
# ----------------------------
def init_database():
# 此处省略(详细实现可参照GitHub 代码)
conn.commit(); conn.close()

# ----------------------------
# 3. 初始化 RAG(Qwen Embedding)
# ----------------------------
def init_rag():
# 此处省略(详细实现可参照GitHub 代码)
return vs.as_retriever(k=3)

# ----------------------------
# 4. 子系统实现(封装为 Runnable)
# ----------------------------

# --- 路由 Chain ---
router_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个分类器,请严格输出以下三者之一:order_db, knowledge_base, general\n"
"规则:\n"
"- 涉及订单、用户、发货 → order_db\n"
"- 涉及退货、政策、说明书 → knowledge_base\n"
"- 其他 → general"),
("human", "问题:{input}")
])
router_chain = router_prompt | llm | StrOutputParser()

# --- DB Agent(封装为 Runnable)---
def get_db_response(query: str) -> str:
db = SQLDatabase.from_uri(f"sqlite:///{DB_PATH}")
agent = create_sql_agent(llm=llm, db=db, agent_type="openai-tools", handle_parsing_errors=True, verbose=True)

try:
return agent.invoke({"input": query})
except Exception as e:
return f"数据库查询失败:{str(e)}"

db_runnable = RunnableLambda(get_db_response)

# --- RAG Chain ---
def get_rag_response(query: str) -> str:
retriever = init_rag()
docs = retriever.invoke(query)
context = "\n\n".join([d.page_content for d in docs])
if not context.strip():
return "知识库中未找到相关信息。"
rag_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个智能客服,请根据以下资料简洁回答用户问题。"),
("human", "资料:{context}\n\n问题:{query}")
])
chain = rag_prompt | llm | StrOutputParser()
return chain.invoke({"context": context, "query": query})

rag_runnable = RunnableLambda(get_rag_response)

# --- General Chain ---
general_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个友好但功能有限的客服,无法访问订单或知识库。请礼貌回应用户的问题。"),
("human", "{input}")
])
general_chain = general_prompt | llm | StrOutputParser()

# ----------------------------
# 5. 主执行逻辑(Chain 风格路由)
# ----------------------------
def route_and_run(user_input: str) -> str:
# Step 1: 路由
route = router_chain.invoke({"input": user_input}).strip()
print(f"[路由结果: {route}]")

# Step 2: 根据路由调用对应子系统
if route == "order_db":
return db_runnable.invoke(user_input)
elif route == "knowledge_base":
return rag_runnable.invoke(user_input)
else:
return general_chain.invoke({"input": user_input})