Concept Definition
LangChain is an open-source framework for building AI applications. Through standardized interfaces and component-based design, it lets developers quickly build complex applications on top of large language models, including chatbots, document Q&A systems, and AI agents.
Detailed Explanation
By 2025, LangChain has become a de facto standard for AI application development, with reportedly more than 70 million downloads per month. The framework's core idea is to serve as "universal glue" between LLMs and business systems, lowering the barrier to AI application development through modular design. It tackles the key challenges of LLM application development: prompt management, external data integration, conversation memory, tool calling, and multi-step reasoning. The 2025 releases focus on multi-agent collaboration (LangGraph), the LangChain Expression Language (LCEL), and human-in-the-loop capabilities, forming a complete path from prototype to production.
Core Component Architecture
1. LangChain Expression Language (LCEL)
Declarative chain construction:
```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

# Build the pieces of an LCEL chain
prompt = ChatPromptTemplate.from_template(
    "Translate the following into {language}: {text}"
)
model = ChatOpenAI()
output_parser = StrOutputParser()

# Compose the chain with the | operator
chain = prompt | model | output_parser

# Run the chain
result = chain.invoke({
    "language": "French",
    "text": "Hello, how are you?"
})
```
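Because `prompt | model | output_parser` yields a Runnable, the same chain also supports streaming and batching without extra code; a quick sketch reusing `chain` from above:

```python
# Stream tokens as they are produced
for chunk in chain.stream({"language": "French", "text": "Good morning"}):
    print(chunk, end="", flush=True)

# Process several inputs concurrently
results = chain.batch([
    {"language": "French", "text": "Hello"},
    {"language": "German", "text": "Hello"},
])
```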
```python
# Composing more complex chains
from langchain_core.runnables import RunnableLambda, RunnablePassthrough

def analyze_sentiment(inputs: dict) -> str:
    # Sentiment-analysis logic (stubbed for the example);
    # receives the full input dict and returns a label
    return "positive"

def generate_response(inputs: dict) -> str:
    sentiment = inputs["sentiment"]
    original_text = inputs["text"]
    # Generate a reply based on the detected sentiment
    return f"Detected {sentiment} sentiment in '{original_text}', generating a matching reply..."

# Chain: sentiment analysis followed by response generation
sentiment_chain = (
    RunnablePassthrough.assign(
        sentiment=RunnableLambda(analyze_sentiment)
    )
    | RunnableLambda(generate_response)
)

result = sentiment_chain.invoke({"text": "What a beautiful day!"})
```
2. LangGraph Multi-Agent Framework
State graph definition:
```python
from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage
from typing import TypedDict, List

class AgentState(TypedDict):
    messages: List[HumanMessage]
    current_task: str
    completed_tasks: List[str]
    next_agent: str

def researcher_agent(state: AgentState):
    """Researcher agent."""
    # conduct_research is an application-specific helper, not shown here
    research_result = conduct_research(state["current_task"])
    return {
        "messages": state["messages"] + [HumanMessage(content=research_result)],
        "completed_tasks": state["completed_tasks"] + ["research"],
        "next_agent": "writer"
    }

def writer_agent(state: AgentState):
    """Writer agent."""
    # write_article is an application-specific helper, not shown here
    article = write_article(state["messages"][-1].content)
    return {
        "messages": state["messages"] + [HumanMessage(content=article)],
        "completed_tasks": state["completed_tasks"] + ["writing"],
        "next_agent": "reviewer"
    }

def reviewer_agent(state: AgentState):
    """Reviewer agent."""
    # review_article is an application-specific helper, not shown here
    review_result = review_article(state["messages"][-1].content)
    if review_result["approved"]:
        return {"next_agent": END}
    else:
        return {"next_agent": "writer"}  # Send back to the writer for revision

# Build the agent workflow
workflow = StateGraph(AgentState)
workflow.add_node("researcher", researcher_agent)
workflow.add_node("writer", writer_agent)
workflow.add_node("reviewer", reviewer_agent)

# Define state transitions
workflow.add_edge("researcher", "writer")
workflow.add_edge("writer", "reviewer")
workflow.add_conditional_edges(
    "reviewer",
    lambda x: x["next_agent"],
    {"writer": "writer", END: END}
)
workflow.set_entry_point("researcher")
app = workflow.compile()
```
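The compiled `app` is itself a Runnable. A usage sketch, with an initial state matching `AgentState` above (the task string is illustrative):

```python
result = app.invoke({
    "messages": [],
    "current_task": "Research the current state of multi-agent frameworks",
    "completed_tasks": [],
    "next_agent": "researcher"
})
print(result["completed_tasks"])  # e.g. ["research", "writing"]
```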
Human-in-the-loop with checkpointing:

```python
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver
from typing import TypedDict, List

class HumanInLoopState(TypedDict):
    messages: List[str]
    human_feedback: str
    requires_approval: bool

def ai_agent(state):
    """AI agent step."""
    # llm and contains_sensitive_content are assumed to be defined elsewhere
    response = llm.invoke(state["messages"][-1])
    # Decide whether human review is required
    if contains_sensitive_content(response):
        return {
            "messages": state["messages"] + [response],
            "requires_approval": True
        }
    return {
        "messages": state["messages"] + [response],
        "requires_approval": False
    }

def human_review(state):
    """Human review node."""
    if state["requires_approval"]:
        # Wait for a human reviewer
        return {"human_feedback": "pending"}
    return {"human_feedback": "approved"}

# Configure the checkpointer
# (note: in recent langgraph releases from_conn_string is a context manager)
memory = SqliteSaver.from_conn_string(":memory:")

workflow = StateGraph(HumanInLoopState)
workflow.add_node("ai", ai_agent)
workflow.add_node("human", human_review)
workflow.set_entry_point("ai")
workflow.add_conditional_edges(
    "ai",
    lambda x: "human" if x["requires_approval"] else END,
    {"human": "human", END: END}
)
workflow.add_edge("human", END)
app = workflow.compile(checkpointer=memory)
```
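Because a checkpointer is attached, every call needs a `thread_id` under which state is persisted and can be resumed; a minimal sketch (the input values are illustrative):

```python
config = {"configurable": {"thread_id": "session-1"}}  # thread id is arbitrary
result = app.invoke(
    {"messages": ["Draft a product announcement"], "human_feedback": "", "requires_approval": False},
    config=config,
)
print(result["human_feedback"])
```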
3. Memory Management System
Conversation memory compression:
```python
from langchain.memory import ConversationSummaryBufferMemory

class EnhancedConversationMemory:
    def __init__(self, llm, max_token_limit=4000):
        self.memory = ConversationSummaryBufferMemory(
            llm=llm,
            max_token_limit=max_token_limit,
            return_messages=True
        )
        self.conversation_history = []

    def add_conversation_turn(self, human_input: str, ai_response: str):
        """Add one conversation turn."""
        # Record the raw turn
        self.conversation_history.append((human_input, ai_response))
        # Add to memory
        self.memory.chat_memory.add_user_message(human_input)
        self.memory.chat_memory.add_ai_message(ai_response)
        # Automatically compress older history when over the limit
        if self._should_compress():
            self.memory.prune()

    def get_relevant_context(self, current_input: str):
        """Get relevant context."""
        # Current buffer contents
        buffer_messages = self.memory.chat_memory.messages
        # Compressed summary of older turns
        summary = self.memory.moving_summary_buffer
        return {
            "summary": summary,
            "recent_messages": buffer_messages,
            "total_interactions": len(self.conversation_history)
        }

    def _should_compress(self):
        """Decide whether compression is needed."""
        current_tokens = self.memory.llm.get_num_tokens_from_messages(
            self.memory.chat_memory.messages
        )
        return current_tokens > self.memory.max_token_limit
```
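A usage sketch for the class above (any chat model works as `llm`):

```python
from langchain_openai import ChatOpenAI

memory = EnhancedConversationMemory(llm=ChatOpenAI(), max_token_limit=2000)
memory.add_conversation_turn(
    "What is LCEL?",
    "LCEL is LangChain's declarative language for composing runnables."
)
context = memory.get_relevant_context("And how does it relate to LangGraph?")
print(context["summary"], len(context["recent_messages"]))
```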
Long-term semantic memory can be kept in a vector store instead:

```python
from datetime import datetime
from langchain.vectorstores import Chroma
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.memory import VectorStoreRetrieverMemory

class VectorMemoryManager:
    def __init__(self):
        self.embeddings = OpenAIEmbeddings()
        self.vectorstore = Chroma(embedding_function=self.embeddings)
        # VectorStoreRetrieverMemory takes a retriever, not the store itself
        self.memory = VectorStoreRetrieverMemory(
            retriever=self.vectorstore.as_retriever(),
            memory_key="relevant_history",
            input_key="input"
        )

    def store_interaction(self, user_input: str, ai_response: str, metadata: dict):
        """Store one interaction."""
        interaction_text = f"User: {user_input}\nAI: {ai_response}"
        # Add to the vector store
        self.vectorstore.add_texts(
            [interaction_text],
            metadatas=[{
                "timestamp": datetime.now().isoformat(),
                "user_id": metadata.get("user_id"),
                "session_id": metadata.get("session_id"),
                "interaction_type": metadata.get("type", "chat")
            }]
        )

    def retrieve_relevant_memories(self, query: str, k: int = 5):
        """Retrieve relevant memories."""
        relevant_docs = self.vectorstore.similarity_search(query, k=k)
        return [doc.page_content for doc in relevant_docs]
```
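And a brief usage sketch (the metadata values are illustrative):

```python
manager = VectorMemoryManager()
manager.store_interaction(
    "How do I enable caching?",
    "Call set_llm_cache with a cache backend such as RedisCache.",
    {"user_id": "u1", "session_id": "s1"},
)
print(manager.retrieve_relevant_memories("caching", k=1))
```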
4. Tool Integration System
MCP protocol integration:
```python
# Note: the MCPToolkit API sketched here is illustrative; the exact interface
# of MCP integrations (e.g. langchain-mcp / langchain-mcp-adapters) varies by package.
from langchain_mcp import MCPToolkit
from langchain_openai import ChatOpenAI

class MCPToolIntegration:
    def __init__(self):
        self.mcp_toolkit = MCPToolkit()
        self.available_tools = {}

    def register_mcp_server(self, server_url: str, server_name: str):
        """Register an MCP server."""
        tools = self.mcp_toolkit.connect_to_server(server_url)
        self.available_tools[server_name] = tools
        return f"Connected to {server_name}, obtained {len(tools)} tools"

    def create_tool_calling_agent(self):
        """Create a tool-calling agent."""
        from langchain.agents import create_tool_calling_agent, AgentExecutor
        from langchain_core.prompts import ChatPromptTemplate

        # Merge all available tools
        all_tools = []
        for server_tools in self.available_tools.values():
            all_tools.extend(server_tools)

        # Agent prompt
        prompt = ChatPromptTemplate.from_messages([
            ("system", "You are a helpful assistant that can use various tools to help the user."),
            ("human", "{input}"),
            ("placeholder", "{agent_scratchpad}")
        ])

        # Create the agent
        agent = create_tool_calling_agent(
            llm=ChatOpenAI(model="gpt-4"),
            tools=all_tools,
            prompt=prompt
        )
        return AgentExecutor(agent=agent, tools=all_tools, verbose=True)

# Usage example
mcp_integration = MCPToolIntegration()
mcp_integration.register_mcp_server("http://localhost:3001", "database_tools")
mcp_integration.register_mcp_server("http://localhost:3002", "file_tools")

agent_executor = mcp_integration.create_tool_calling_agent()
result = agent_executor.invoke({
    "input": "Query user records from the database and save the results to a file"
})
```
Practical Application Scenarios
1. Building a RAG System
```python
from langchain.document_loaders import DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain_openai import ChatOpenAI

class LangChainRAGSystem:
    def __init__(self, documents_path: str):
        self.documents_path = documents_path
        self.setup_rag_pipeline()

    def setup_rag_pipeline(self):
        """Set up the RAG pipeline."""
        # 1. Load documents
        loader = DirectoryLoader(
            self.documents_path,
            glob="**/*.md",
            show_progress=True
        )
        documents = loader.load()

        # 2. Split text
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            separators=["\n\n", "\n", " ", ""]
        )
        chunks = text_splitter.split_documents(documents)

        # 3. Embed and store
        self.vectorstore = Chroma.from_documents(
            documents=chunks,
            embedding=OpenAIEmbeddings(),
            persist_directory="./chroma_db"
        )

        # 4. Build the retrieval chain
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=ChatOpenAI(model="gpt-4"),
            chain_type="stuff",
            retriever=self.vectorstore.as_retriever(
                search_kwargs={"k": 5}
            ),
            return_source_documents=True
        )

    def query(self, question: str):
        """Query the RAG system."""
        result = self.qa_chain({"query": question})
        return {
            "answer": result["result"],
            "sources": [doc.metadata for doc in result["source_documents"]],
            "confidence": self.calculate_confidence(result)
        }

    def calculate_confidence(self, result) -> float:
        """Placeholder confidence score; replace with a real heuristic."""
        return 1.0 if result["source_documents"] else 0.0
```
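Usage sketch (the documents path is illustrative):

```python
rag = LangChainRAGSystem("./docs")  # directory of markdown files
result = rag.query("How do I configure the retriever?")
print(result["answer"])
print(result["sources"])
```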
2. Conversational Chatbot System
```python
from langchain.chains import ConversationChain
from langchain.prompts import PromptTemplate
from langchain.memory import ConversationSummaryBufferMemory
from langchain_openai import ChatOpenAI

class LangChainChatbot:
    def __init__(self):
        self.memory = ConversationSummaryBufferMemory(
            llm=ChatOpenAI(model="gpt-3.5-turbo"),
            max_token_limit=2000,
            return_messages=True
        )
        self.setup_conversation_chain()

    def setup_conversation_chain(self):
        """Set up the conversation chain."""
        template = """
You are a helpful, friendly, and knowledgeable AI assistant. Answer the user's question based on the conversation history below.

Conversation summary:
{history}

Current exchange:
Human: {input}
AI assistant:
"""
        prompt = PromptTemplate(
            input_variables=["history", "input"],
            template=template
        )
        self.conversation = ConversationChain(
            llm=ChatOpenAI(model="gpt-4"),
            prompt=prompt,
            memory=self.memory,
            verbose=True
        )

    def chat(self, user_input: str, user_id: str = None):
        """Handle one user turn."""
        try:
            # Get the AI reply
            response = self.conversation.predict(input=user_input)
            # Log the exchange (log_conversation is an application-specific helper)
            self.log_conversation(user_id, user_input, response)
            return {
                "response": response,
                "status": "success",
                "memory_usage": self.get_memory_stats()
            }
        except Exception as e:
            return {
                "response": "Sorry, something went wrong. Please try again later.",
                "status": "error",
                "error": str(e)
            }

    def get_memory_stats(self):
        """Memory statistics."""
        return {
            "total_messages": len(self.memory.chat_memory.messages),
            "summary_exists": bool(self.memory.moving_summary_buffer)
        }
```
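Usage sketch (assumes `log_conversation` is implemented):

```python
bot = LangChainChatbot()
reply = bot.chat("When should I reach for LangGraph instead of plain LCEL?", user_id="u1")
print(reply["response"], reply["memory_usage"])
```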
3. Document Processing Workflow
```python
import time
from langchain.chains.summarize import load_summarize_chain
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain.document_loaders import UnstructuredFileLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import ChatOpenAI

class DocumentProcessingWorkflow:
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4")
        self.setup_processing_chains()

    def setup_processing_chains(self):
        """Set up the document-processing chains."""
        # Summarization chain
        self.summary_chain = load_summarize_chain(
            llm=self.llm,
            chain_type="map_reduce",
            verbose=True
        )
        # Keyword-extraction chain
        keyword_prompt = PromptTemplate(
            template="Extract the 5 most important keywords from the following text:\n{text}\nKeywords:",
            input_variables=["text"]
        )
        self.keyword_chain = LLMChain(
            llm=self.llm,
            prompt=keyword_prompt
        )

    def process_document(self, document_path: str):
        """Process one document."""
        start_time = time.time()
        # Load the document
        loader = UnstructuredFileLoader(document_path)
        document = loader.load()[0]
        # Split text
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=2000,
            chunk_overlap=200
        )
        chunks = text_splitter.split_documents([document])
        # Summarize
        summary = self.summary_chain.run(chunks)
        # Extract keywords
        keywords = self.keyword_chain.run(document.page_content[:2000])
        # Sentiment analysis (analyze_sentiment is an application-specific helper)
        sentiment = self.analyze_sentiment(document.page_content)
        return {
            "summary": summary,
            "keywords": keywords.split(", "),
            "sentiment": sentiment,
            "word_count": len(document.page_content.split()),
            "processing_time": time.time() - start_time
        }
```
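Usage sketch (the file path is illustrative, and `analyze_sentiment` must be supplied):

```python
workflow = DocumentProcessingWorkflow()
report = workflow.process_document("./reports/q3_summary.md")  # illustrative path
print(report["summary"])
print(report["keywords"], report["processing_time"])
```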
Deployment and Optimization
1. Production Environment Configuration
```yaml
# langchain_config.yaml
langchain:
  # LLM settings
  llm:
    provider: "openai"
    model: "gpt-4"
    temperature: 0.7
    max_tokens: 2000
    request_timeout: 30
  # Cache settings
  cache:
    type: "redis"
    url: "redis://localhost:6379"
    ttl: 3600
  # Callback settings
  callbacks:
    - langsmith_tracer
    - wandb_tracer
    - custom_logger
  # Error handling
  retry:
    max_retries: 3
    backoff_factor: 2
  # Concurrency limits
  concurrency:
    max_concurrent_requests: 10
    rate_limit: "100/minute"
```
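This file is a project-level convention rather than something LangChain reads natively; a minimal sketch of loading it with PyYAML and constructing the model from the `llm` keys above:

```python
import yaml
from langchain_openai import ChatOpenAI

with open("langchain_config.yaml") as f:
    cfg = yaml.safe_load(f)["langchain"]["llm"]

llm = ChatOpenAI(
    model=cfg["model"],
    temperature=cfg["temperature"],
    max_tokens=cfg["max_tokens"],
    timeout=cfg["request_timeout"],
)
```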
2. Performance Optimization Strategies
```python
import time
from datetime import datetime
from redis import Redis
from langchain.cache import RedisCache
from langchain.globals import set_llm_cache
from langchain_core.callbacks import BaseCallbackHandler
import langchain

# Enable response caching (RedisCache takes a Redis client, not a URL)
set_llm_cache(RedisCache(Redis.from_url("redis://localhost:6379")))

# Enable debugging output
langchain.debug = True
langchain.verbose = True

# Custom callback for performance monitoring
class PerformanceCallback(BaseCallbackHandler):
    def on_llm_start(self, serialized, prompts, **kwargs):
        self.start_time = time.time()

    def on_llm_end(self, response, **kwargs):
        duration = time.time() - self.start_time
        self.log_performance_metrics(duration, response)

    def log_performance_metrics(self, duration, response):
        metrics = {
            "duration": duration,
            "token_count": response.llm_output.get("token_usage", {}),
            "model": response.llm_output.get("model_name"),
            "timestamp": datetime.now().isoformat()
        }
        # Ship to your monitoring system (send_to_monitoring is an assumed helper)
        self.send_to_monitoring(metrics)
```
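To put the callback to work, attach it to the model (or pass it per invocation); a brief sketch:

```python
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4", callbacks=[PerformanceCallback()])
llm.invoke("Ping")  # on_llm_start / on_llm_end fire around this call
```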
Best Practice Recommendations
1. Choosing a Development Mode
- Prototyping: use LCEL to build an MVP quickly
- Production applications: add LangGraph for complex workflows
- Enterprise: integrate monitoring, caching, and error handling
2. Performance Optimization
- Caching strategy: enable LLM response caching
- Batching: merge multiple requests to cut latency
- Async processing: use async/await for higher concurrency (see the sketch after this list)
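Since every LCEL chain is a Runnable, batching and async execution come built in; a minimal sketch reusing the translation `chain` from the LCEL section:

```python
import asyncio

# Batching: several inputs processed concurrently under the hood
results = chain.batch([
    {"language": "French", "text": "Good night"},
    {"language": "Spanish", "text": "Good night"},
])

# Async: await single calls, or gather many in parallel
async def main():
    return await asyncio.gather(
        chain.ainvoke({"language": "French", "text": "Thanks"}),
        chain.ainvoke({"language": "German", "text": "Thanks"}),
    )

replies = asyncio.run(main())
```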
3. Error Handling
- Retry mechanism: configure retries with exponential backoff
- Fallbacks: keep backup models and responses ready
- Monitoring and alerting: watch chain execution in real time (a retry/fallback sketch follows this list)
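Runnables ship with `.with_retry()` and `.with_fallbacks()` covering the first two points; a hedged sketch:

```python
from langchain_openai import ChatOpenAI

primary = ChatOpenAI(model="gpt-4")
backup = ChatOpenAI(model="gpt-3.5-turbo")

# Exponential-backoff retries, then fall back to the cheaper model
robust_llm = primary.with_retry(
    stop_after_attempt=3,
    wait_exponential_jitter=True,
).with_fallbacks([backup])

robust_llm.invoke("Summarize LCEL in one sentence.")
```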
Further Reading
- LangChain official documentation - the complete development guide
- LangGraph tutorials - multi-agent workflows
- LCEL guide - the expression language in depth
- LangChain Chinese community - learning resources in Chinese