Concept Definition

LangChain is an open-source framework for AI application development. Through standardized interfaces and component-based design, it lets developers rapidly build complex applications on top of large language models, including chatbots, document Q&A systems, and AI agents.

Detailed Explanation

By 2025 LangChain has become the de facto standard for AI application development, with over 70 million monthly downloads. The framework's core idea is to act as "universal glue" between LLMs and business systems, lowering the barrier to AI application development through modular design. It addresses the key challenges of LLM application development: prompt management, external data integration, conversation memory, tool calling, and multi-step reasoning. The 2025 releases focus on multi-agent collaboration (LangGraph), the expression language (LCEL), and human-in-the-loop workflows, forming a complete development path from prototype to production.

Core Component Architecture

1. LangChain Expression Language (LCEL)

Declarative chain construction
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

# Build a chain declaratively with LCEL
prompt = ChatPromptTemplate.from_template(
    "Translate the following into {language}: {text}"
)
model = ChatOpenAI()
output_parser = StrOutputParser()

# Compose the chain with the | operator
chain = prompt | model | output_parser

# Run the chain
result = chain.invoke({
    "language": "French",
    "text": "Hello, how are you?"
})
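
Because every LCEL chain implements the Runnable interface, the chain built above also supports streaming and batched execution with no extra wiring. A minimal usage sketch reusing `chain`:

# Stream output tokens as they are generated
for token in chain.stream({"language": "French", "text": "Good morning"}):
    print(token, end="", flush=True)

# Run several inputs concurrently (a thread pool by default)
results = chain.batch([
    {"language": "French", "text": "Hello"},
    {"language": "German", "text": "Hello"},
])

The async counterparts .ainvoke, .astream, and .abatch follow the same pattern.
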
Composability by design
# Composing a more complex chain
from langchain_core.runnables import RunnableLambda, RunnablePassthrough

def analyze_sentiment(inputs):
    # Sentiment analysis logic (stubbed out for the example);
    # RunnablePassthrough.assign passes the whole input dict here
    return {"sentiment": "positive", "confidence": 0.95}

def generate_response(inputs):
    # The analysis result was merged into the input under "sentiment"
    sentiment = inputs["sentiment"]["sentiment"]
    original_text = inputs["text"]
    # Generate a reply conditioned on the detected sentiment
    return f"Detected {sentiment} sentiment, generating a matching reply..."

# Chain: sentiment analysis followed by response generation
sentiment_chain = (
    RunnablePassthrough.assign(
        sentiment=RunnableLambda(analyze_sentiment)
    )
    | RunnableLambda(generate_response)
)

result = sentiment_chain.invoke({"text": "What a beautiful day!"})
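
Where steps are independent, RunnableParallel fans the same input out to several runnables and collects the results into a dict. A sketch built on the same placeholder functions:

from langchain_core.runnables import RunnableParallel

# Run sentiment analysis and a character count over the same input
parallel_chain = RunnableParallel(
    sentiment=RunnableLambda(analyze_sentiment),
    length=RunnableLambda(lambda inputs: len(inputs["text"])),
)

result = parallel_chain.invoke({"text": "What a beautiful day!"})
# {"sentiment": {"sentiment": "positive", "confidence": 0.95}, "length": 21}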

2. LangGraph Multi-Agent Framework

State graph definition
from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage
from typing import TypedDict, List

# conduct_research, write_article, and review_article are placeholder
# helpers assumed to be defined elsewhere

class AgentState(TypedDict):
    messages: List[HumanMessage]
    current_task: str
    completed_tasks: List[str]
    next_agent: str

def researcher_agent(state: AgentState):
    """Researcher agent"""
    # Carry out the research task
    research_result = conduct_research(state["current_task"])

    return {
        "messages": state["messages"] + [HumanMessage(content=research_result)],
        "completed_tasks": state["completed_tasks"] + ["research"],
        "next_agent": "writer"
    }

def writer_agent(state: AgentState):
    """Writer agent"""
    # Write an article based on the research result
    article = write_article(state["messages"][-1].content)

    return {
        "messages": state["messages"] + [HumanMessage(content=article)],
        "completed_tasks": state["completed_tasks"] + ["writing"],
        "next_agent": "reviewer"
    }

def reviewer_agent(state: AgentState):
    """Reviewer agent"""
    # Review the article's quality
    review_result = review_article(state["messages"][-1].content)

    if review_result["approved"]:
        return {"next_agent": END}
    else:
        return {"next_agent": "writer"}  # Send back to the writer for revision

# Build the agent workflow
workflow = StateGraph(AgentState)
workflow.add_node("researcher", researcher_agent)
workflow.add_node("writer", writer_agent)
workflow.add_node("reviewer", reviewer_agent)

# Define state transitions
workflow.add_edge("researcher", "writer")
workflow.add_edge("writer", "reviewer")
workflow.add_conditional_edges(
    "reviewer",
    lambda x: x["next_agent"],
    {"writer": "writer", END: END}
)

workflow.set_entry_point("researcher")
app = workflow.compile()
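
Running the compiled graph requires an initial state that supplies every AgentState key. A usage sketch (the task text is illustrative, and the placeholder helpers above must be supplied):

# Kick off the workflow with an initial state
final_state = app.invoke({
    "messages": [],
    "current_task": "Survey the 2025 LLM agent landscape",
    "completed_tasks": [],
    "next_agent": "researcher"
})

print(final_state["completed_tasks"])  # e.g. ["research", "writing"]
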
Human-in-the-loop collaboration
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver
from typing import TypedDict, List

# llm and contains_sensitive_content are placeholders assumed to be
# defined elsewhere

class HumanInLoopState(TypedDict):
    messages: List[str]
    human_feedback: str
    requires_approval: bool

def ai_agent(state):
    """AI agent step"""
    response = llm.invoke(state["messages"][-1])

    # Decide whether human review is required
    if contains_sensitive_content(response):
        return {
            "messages": state["messages"] + [response],
            "requires_approval": True
        }

    return {
        "messages": state["messages"] + [response],
        "requires_approval": False
    }

def human_review(state):
    """Human review node"""
    if state["requires_approval"]:
        # Wait for human review
        return {"human_feedback": "pending"}
    return {"human_feedback": "approved"}

# Configure the checkpoint saver (in recent langgraph releases
# from_conn_string is a context manager rather than a plain constructor)
memory = SqliteSaver.from_conn_string(":memory:")

workflow = StateGraph(HumanInLoopState)
workflow.add_node("ai", ai_agent)
workflow.add_node("human", human_review)
workflow.set_entry_point("ai")

workflow.add_conditional_edges(
    "ai",
    lambda x: "human" if x["requires_approval"] else END,
    {"human": "human", END: END}
)
workflow.add_edge("human", END)

app = workflow.compile(checkpointer=memory)
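
With a checkpointer attached, every run is keyed by a thread_id, so an execution that pauses for review can be inspected and resumed from the saved state. A minimal sketch (the thread_id value is arbitrary):

config = {"configurable": {"thread_id": "review-session-1"}}

state = app.invoke({
    "messages": ["Please draft the product announcement"],
    "human_feedback": "",
    "requires_approval": False
}, config)

# Later: read the checkpointed state for the same thread
snapshot = app.get_state(config)
print(snapshot.values["human_feedback"])
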

3. Memory Management System

Conversation memory compression
from langchain.memory import ConversationSummaryBufferMemory

class EnhancedConversationMemory:
    def __init__(self, llm, max_token_limit=4000):
        self.memory = ConversationSummaryBufferMemory(
            llm=llm,
            max_token_limit=max_token_limit,
            return_messages=True
        )
        self.conversation_history = []

    def add_conversation_turn(self, human_input: str, ai_response: str):
        """Record one conversation turn"""
        # Add to memory
        self.memory.chat_memory.add_user_message(human_input)
        self.memory.chat_memory.add_ai_message(ai_response)
        self.conversation_history.append((human_input, ai_response))

        # Automatically compress older history
        if self._should_compress():
            self.memory.prune()

    def get_relevant_context(self, current_input: str):
        """Fetch the relevant context"""
        # Current buffer contents
        buffer_messages = self.memory.chat_memory.messages

        # Compressed running summary
        summary = self.memory.moving_summary_buffer

        return {
            "summary": summary,
            "recent_messages": buffer_messages,
            "total_interactions": len(self.conversation_history)
        }

    def _should_compress(self):
        """Decide whether compression is needed"""
        current_tokens = self.memory.llm.get_num_tokens_from_messages(
            self.memory.chat_memory.messages
        )
        return current_tokens > self.memory.max_token_limit
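
A usage sketch for the wrapper (the model choice and message texts are illustrative):

memory = EnhancedConversationMemory(llm=ChatOpenAI(model="gpt-4"))

memory.add_conversation_turn(
    "What deployment options does LangChain support?",
    "You can self-host or use a managed platform..."
)

context = memory.get_relevant_context("Tell me more about self-hosting")
print(context["summary"], len(context["recent_messages"]))
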
Vector memory store
from datetime import datetime

from langchain.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain.memory import VectorStoreRetrieverMemory

class VectorMemoryManager:
    def __init__(self):
        self.embeddings = OpenAIEmbeddings()
        self.vectorstore = Chroma(embedding_function=self.embeddings)

        # VectorStoreRetrieverMemory takes a retriever, not a raw vector store
        self.memory = VectorStoreRetrieverMemory(
            retriever=self.vectorstore.as_retriever(),
            memory_key="relevant_history",
            input_key="input"
        )

    def store_interaction(self, user_input: str, ai_response: str, metadata: dict):
        """Persist one interaction"""
        interaction_text = f"User: {user_input}\nAI: {ai_response}"

        # Add to the vector store
        self.vectorstore.add_texts(
            [interaction_text],
            metadatas=[{
                "timestamp": datetime.now().isoformat(),
                "user_id": metadata.get("user_id"),
                "session_id": metadata.get("session_id"),
                "interaction_type": metadata.get("type", "chat")
            }]
        )

    def retrieve_relevant_memories(self, query: str, k: int = 5):
        """Retrieve relevant memories"""
        relevant_docs = self.vectorstore.similarity_search(query, k=k)
        return [doc.page_content for doc in relevant_docs]
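
A usage sketch (all identifier values are illustrative):

manager = VectorMemoryManager()

manager.store_interaction(
    user_input="How do I enable response caching?",
    ai_response="Use set_llm_cache with a Redis or in-memory backend.",
    metadata={"user_id": "u-42", "session_id": "s-1", "type": "chat"}
)

memories = manager.retrieve_relevant_memories("caching setup", k=3)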

4. Tool Integration System

MCP protocol integration
# NOTE: MCPToolkit and connect_to_server below are illustrative; the exact
# client API depends on the MCP integration package in use
# (e.g. langchain-mcp-adapters)
from langchain_mcp import MCPToolkit

class MCPToolIntegration:
    def __init__(self):
        self.mcp_toolkit = MCPToolkit()
        self.available_tools = {}

    def register_mcp_server(self, server_url: str, server_name: str):
        """Register an MCP server"""
        tools = self.mcp_toolkit.connect_to_server(server_url)
        self.available_tools[server_name] = tools

        return f"Connected to {server_name}, obtained {len(tools)} tools"

    def create_tool_calling_agent(self):
        """Create a tool-calling agent"""
        from langchain.agents import create_tool_calling_agent, AgentExecutor
        from langchain_core.prompts import ChatPromptTemplate

        # Merge all available tools
        all_tools = []
        for server_tools in self.available_tools.values():
            all_tools.extend(server_tools)

        # Agent prompt
        prompt = ChatPromptTemplate.from_messages([
            ("system", "You are a helpful assistant that can use a variety of tools."),
            ("human", "{input}"),
            ("placeholder", "{agent_scratchpad}")
        ])

        # Create the agent
        agent = create_tool_calling_agent(
            llm=ChatOpenAI(model="gpt-4"),
            tools=all_tools,
            prompt=prompt
        )

        return AgentExecutor(agent=agent, tools=all_tools, verbose=True)

# Usage example
mcp_integration = MCPToolIntegration()
mcp_integration.register_mcp_server("http://localhost:3001", "database_tools")
mcp_integration.register_mcp_server("http://localhost:3002", "file_tools")

agent_executor = mcp_integration.create_tool_calling_agent()
result = agent_executor.invoke({
    "input": "Query the user records in the database and save the results to a file"
})

Practical Application Scenarios

1. Building a RAG System

from langchain.document_loaders import DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

class LangChainRAGSystem:
    def __init__(self, documents_path: str):
        self.documents_path = documents_path
        self.setup_rag_pipeline()

    def setup_rag_pipeline(self):
        """Set up the RAG pipeline"""
        # 1. Load documents
        loader = DirectoryLoader(
            self.documents_path,
            glob="**/*.md",
            show_progress=True
        )
        documents = loader.load()

        # 2. Split text
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            separators=["\n\n", "\n", " ", ""]
        )
        chunks = text_splitter.split_documents(documents)

        # 3. Embed and store
        self.vectorstore = Chroma.from_documents(
            documents=chunks,
            embedding=OpenAIEmbeddings(),
            persist_directory="./chroma_db"
        )

        # 4. Build the retrieval chain
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=ChatOpenAI(model="gpt-4"),
            chain_type="stuff",
            retriever=self.vectorstore.as_retriever(
                search_kwargs={"k": 5}
            ),
            return_source_documents=True
        )

    def query(self, question: str):
        """Query the RAG system"""
        result = self.qa_chain.invoke({"query": question})

        return {
            "answer": result["result"],
            "sources": [doc.metadata for doc in result["source_documents"]],
            # calculate_confidence is a placeholder left to the reader
            "confidence": self.calculate_confidence(result)
        }
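
A usage sketch (the documents path and question are illustrative):

rag = LangChainRAGSystem("./knowledge_base")

result = rag.query("How do I change the retriever's top-k?")
print(result["answer"])
for source in result["sources"]:
    print(source.get("source"))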

2. Conversational Chatbot System

from langchain.chains import ConversationChain
from langchain.memory import ConversationSummaryBufferMemory
from langchain.prompts import PromptTemplate
from langchain_openai import ChatOpenAI

class LangChainChatbot:
    def __init__(self):
        self.memory = ConversationSummaryBufferMemory(
            llm=ChatOpenAI(model="gpt-3.5-turbo"),
            max_token_limit=2000,
            return_messages=False  # string history for the string template below
        )

        self.setup_conversation_chain()

    def setup_conversation_chain(self):
        """Set up the conversation chain"""
        template = """
        You are a helpful, friendly, and knowledgeable AI assistant. Answer the user's question based on the conversation history below.

        Conversation history summary:
        {history}

        Current exchange:
        Human: {input}
        AI assistant:
        """

        prompt = PromptTemplate(
            input_variables=["history", "input"],
            template=template
        )

        self.conversation = ConversationChain(
            llm=ChatOpenAI(model="gpt-4"),
            prompt=prompt,
            memory=self.memory,
            verbose=True
        )

    def chat(self, user_input: str, user_id: str = None):
        """Handle one user turn"""
        try:
            # Get the AI reply
            response = self.conversation.predict(input=user_input)

            # Log the exchange (log_conversation is a placeholder)
            self.log_conversation(user_id, user_input, response)

            return {
                "response": response,
                "status": "success",
                "memory_usage": self.get_memory_stats()
            }

        except Exception as e:
            return {
                "response": "Sorry, I ran into a problem. Please try again later.",
                "status": "error",
                "error": str(e)
            }

    def get_memory_stats(self):
        """Memory statistics"""
        return {
            "total_messages": len(self.memory.chat_memory.messages),
            "summary_exists": bool(self.memory.moving_summary_buffer)
        }
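
A usage sketch (the user id is illustrative; log_conversation must be supplied):

bot = LangChainChatbot()

reply = bot.chat("Summarize what we discussed about caching.", user_id="u-42")
print(reply["response"])
print(reply["memory_usage"])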

3. Document Processing Workflow

import time

from langchain.chains import LLMChain
from langchain.chains.summarize import load_summarize_chain
from langchain.document_loaders import UnstructuredFileLoader
from langchain.prompts import PromptTemplate
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import ChatOpenAI

class DocumentProcessingWorkflow:
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4")
        self.setup_processing_chains()

    def setup_processing_chains(self):
        """Set up the document processing chains"""
        # Summarization chain
        self.summary_chain = load_summarize_chain(
            llm=self.llm,
            chain_type="map_reduce",
            verbose=True
        )

        # Keyword extraction chain
        keyword_prompt = PromptTemplate(
            template="Extract the 5 most important keywords from the following text:\n{text}\nKeywords:",
            input_variables=["text"]
        )

        self.keyword_chain = LLMChain(
            llm=self.llm,
            prompt=keyword_prompt
        )

    def process_document(self, document_path: str):
        """Process a document"""
        start_time = time.time()

        # Load the document
        loader = UnstructuredFileLoader(document_path)
        document = loader.load()[0]

        # Split text
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=2000,
            chunk_overlap=200
        )
        chunks = text_splitter.split_documents([document])

        # Generate a summary
        summary = self.summary_chain.run(chunks)

        # Extract keywords
        keywords = self.keyword_chain.run(document.page_content[:2000])

        # Sentiment analysis (analyze_sentiment is a placeholder)
        sentiment = self.analyze_sentiment(document.page_content)

        return {
            "summary": summary,
            "keywords": keywords.split(", "),
            "sentiment": sentiment,
            "word_count": len(document.page_content.split()),
            "processing_time": time.time() - start_time
        }
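
A usage sketch (the file path is illustrative):

workflow = DocumentProcessingWorkflow()
report = workflow.process_document("./reports/q3_review.pdf")
print(report["summary"])
print(report["keywords"], report["processing_time"])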

Deployment and Optimization

1. Production Environment Configuration

# langchain_config.yaml
langchain:
  # LLM configuration
  llm:
    provider: "openai"
    model: "gpt-4"
    temperature: 0.7
    max_tokens: 2000
    request_timeout: 30

  # Cache configuration
  cache:
    type: "redis"
    url: "redis://localhost:6379"
    ttl: 3600

  # Callback configuration
  callbacks:
    - langsmith_tracer
    - wandb_tracer
    - custom_logger

  # Error handling
  retry:
    max_retries: 3
    backoff_factor: 2

  # Concurrency limits
  concurrency:
    max_concurrent_requests: 10
    rate_limit: "100/minute"
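
LangChain does not consume this file natively; the usual pattern is to load it at startup and build components from the values. A minimal sketch, assuming the YAML above is saved as langchain_config.yaml:

import yaml
from langchain_openai import ChatOpenAI

with open("langchain_config.yaml") as f:
    config = yaml.safe_load(f)["langchain"]

llm_cfg = config["llm"]
llm = ChatOpenAI(
    model=llm_cfg["model"],
    temperature=llm_cfg["temperature"],
    max_tokens=llm_cfg["max_tokens"],
    timeout=llm_cfg["request_timeout"],
    max_retries=config["retry"]["max_retries"],
)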

2. Performance Optimization Strategies

import time
from datetime import datetime

import redis
from langchain.cache import RedisCache
from langchain.globals import set_llm_cache, set_debug, set_verbose
from langchain_core.callbacks import BaseCallbackHandler

# Enable response caching (RedisCache takes a redis client instance)
set_llm_cache(RedisCache(redis.Redis.from_url("redis://localhost:6379")))

# Enable debugging
set_debug(True)
set_verbose(True)

# Custom callback
class PerformanceCallback(BaseCallbackHandler):
    def on_llm_start(self, serialized, prompts, **kwargs):
        self.start_time = time.time()

    def on_llm_end(self, response, **kwargs):
        duration = time.time() - self.start_time
        self.log_performance_metrics(duration, response)

    def log_performance_metrics(self, duration, response):
        llm_output = response.llm_output or {}
        metrics = {
            "duration": duration,
            "token_count": llm_output.get("token_usage", {}),
            "model": llm_output.get("model_name"),
            "timestamp": datetime.now().isoformat()
        }
        # Forward to the monitoring system (send_to_monitoring is a placeholder)
        self.send_to_monitoring(metrics)

Best Practice Recommendations

1. Choosing a Development Mode

  • Prototyping: use LCEL to build an MVP quickly
  • Production applications: combine with LangGraph to build complex workflows
  • Enterprise grade: integrate monitoring, caching, and error handling

2. Performance Optimization

  • Caching strategy: enable LLM response caching
  • Batching: merge multiple requests to reduce latency
  • Async processing: use async/await to raise concurrency (see the sketch below)
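
A minimal sketch of batched async execution, reusing the `chain` from the LCEL section; the max_concurrency value is illustrative:

import asyncio

async def translate_many(texts):
    # abatch fans the requests out concurrently instead of serially
    inputs = [{"language": "French", "text": t} for t in texts]
    return await chain.abatch(inputs, config={"max_concurrency": 5})

results = asyncio.run(translate_many(["Hello", "Goodbye"]))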

3. Error Handling

  • Retry mechanism: configure exponential backoff retries (a sketch follows the list)
  • Fallbacks: prepare backup models and responses
  • Monitoring and alerting: watch chain execution status in real time
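
Retries and fallbacks are built into the Runnable interface. A minimal sketch reusing the `prompt` and StrOutputParser from the LCEL section (model names illustrative):

primary = ChatOpenAI(model="gpt-4")
backup = ChatOpenAI(model="gpt-3.5-turbo")

# Exponential backoff on transient errors, then fall back to the backup model
robust_llm = primary.with_retry(
    stop_after_attempt=3,
    wait_exponential_jitter=True
).with_fallbacks([backup])

resilient_chain = prompt | robust_llm | StrOutputParser()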

Related Concepts

  • AI Agent - agent systems built with LangGraph
  • Function Calling - tool integration and external API calls
  • RAG - implementing retrieval-augmented generation systems

Further Reading