概念定义

LlamaIndex是专为LLM应用设计的数据框架,从最初的RAG(检索增强生成)框架演进为多智能体编排平台,提供企业级数据连接、向量存储、知识图谱和智能体工作流的统一解决方案。

详细解释

LlamaIndex在2025年完成了从RAG框架到多智能体框架的重大转型,重新定义了企业AI应用的构建方式。框架的核心价值在于将各种数据源(文档、数据库、API、知识图谱)与LLM能力无缝连接,通过AgentWorkflow系统实现复杂的多智能体协作。2025年版本的LlamaIndex不再仅仅是检索增强生成工具,而是一个完整的企业AI基础设施,支持多模态数据处理、状态管理、工作流编排等高级功能。平台通过200+数据连接器、50+向量数据库集成,以及先进的查询优化引擎,为企业提供了构建知识助手的完整工具链。

核心架构体系

1. AgentWorkflow编排系统

多智能体协作框架
from llama_index.core.workflow import Workflow, step, Context
from llama_index.core.agent import ReActAgent
from llama_index.core.tools import QueryEngineTool
from llama_index.llms.openai import OpenAI

class ResearchWorkflow(Workflow):
    """Two-agent research pipeline: a researcher agent gathers findings, a
    writer agent drafts a report, and a review step gates the result on a
    quality score.

    NOTE(review): `create_query_engine` and `assess_report_quality` are
    called below but not defined in this snippet — presumably provided
    elsewhere; confirm before running.
    """

    def __init__(self):
        super().__init__()
        self.llm = OpenAI(model="gpt-4")
        self.setup_agents()
    
    def setup_agents(self):
        """Create the two specialized agents (researcher and writer)."""
        # Researcher agent.
        # NOTE(review): QueryEngineTool is constructed positionally here; the
        # usual API is QueryEngineTool.from_defaults(query_engine=..., ...)
        # with ToolMetadata — confirm against the installed version.
        self.researcher = ReActAgent.from_tools(
            tools=[QueryEngineTool(self.create_query_engine("research_docs"))],
            llm=self.llm,
            system_prompt="""你是一个专业研究员,负责:
            1. 深度分析技术文档
            2. 提取关键洞察和数据
            3. 生成研究报告摘要
            请保持客观和准确性。"""
        )
        
        # Writer agent.
        self.writer = ReActAgent.from_tools(
            tools=[QueryEngineTool(self.create_query_engine("style_guide"))],
            llm=self.llm,
            system_prompt="""你是一个专业技术写手,负责:
            1. 基于研究结果创作内容
            2. 确保内容结构清晰
            3. 遵循企业写作风格
            请生成高质量的技术文档。"""
        )
    
    @step
    async def research_phase(self, ctx: Context, query: str):
        """Research phase: record the query and collect findings."""
        ctx.data["original_query"] = query
        
        # The researcher agent performs the research chat turn.
        research_result = await self.researcher.achat(
            f"请对以下主题进行深入研究:{query}"
        )
        
        ctx.data["research_findings"] = research_result.response
        return research_result
    
    @step
    async def writing_phase(self, ctx: Context, research_result):
        """Writing phase: draft a report from the stored research findings."""
        research_findings = ctx.data["research_findings"]
        
        # The writer agent composes the report from the researcher's output.
        writing_result = await self.writer.achat(
            f"""基于以下研究结果,创作一份专业报告:
            
            研究发现:{research_findings}
            
            要求:
            1. 结构清晰,逻辑严谨
            2. 包含执行摘要和详细分析
            3. 提供可行的建议
            """
        )
        
        ctx.data["final_report"] = writing_result.response
        return writing_result
    
    @step  
    async def review_phase(self, ctx: Context, writing_result):
        """Review phase: score the report and redo the writing step if weak."""
        # Quality check on the drafted report.
        final_report = ctx.data["final_report"]
        
        # NOTE(review): assess_report_quality is not defined in this snippet.
        quality_score = self.assess_report_quality(final_report)
        
        if quality_score < 0.8:
            # Re-run the writing phase. Beware: this can loop indefinitely if
            # the score never crosses the threshold — no retry cap is visible.
            return await self.writing_phase(ctx, writing_result)
        
        return {
            "report": final_report,
            "quality_score": quality_score,
            "metadata": {
                "research_findings": ctx.data["research_findings"],
                "original_query": ctx.data["original_query"]
            }
        }

# Example usage.
# The original used a bare top-level `await`, which is a SyntaxError in a
# plain Python script; drive the coroutine with asyncio.run() instead.
import asyncio

workflow = ResearchWorkflow()

async def _run_research():
    """Run the workflow; `await` must live inside a coroutine."""
    return await workflow.run(query="AI在金融领域的最新应用")

result = asyncio.run(_run_research())
状态管理和上下文
from llama_index.core.workflow import Context
from pydantic import BaseModel, Field
from typing import Dict, Any, List

class WorkflowState(BaseModel):
    """Workflow state model: everything needed to persist and resume a run."""
    # Unique identifier of the task this state belongs to (required).
    task_id: str = Field(..., description="任务唯一标识")
    # Name of the step currently executing.
    current_step: str = Field(default="init", description="当前步骤")
    # Data sources consumed by the workflow.
    data_sources: List[str] = Field(default_factory=list, description="数据源列表")
    # Step outputs keyed by an arbitrary name.
    intermediate_results: Dict[str, Any] = Field(default_factory=dict, description="中间结果")
    # Errors encountered so far, for diagnostics and retry decisions.
    error_history: List[Dict] = Field(default_factory=list, description="错误历史")
    
class StatefulWorkflow(Workflow):
    """Workflow with save/restore of WorkflowState keyed by task_id."""

    def __init__(self):
        super().__init__()
        self.state_store = {}  # in-memory; swap for Redis etc. for persistence
    
    async def save_state(self, ctx: Context, state: WorkflowState):
        """Persist the state into the store and mirror it into the context.

        NOTE(review): `.dict()` is the pydantic v1 API; v2 renamed it to
        `model_dump()` — confirm which pydantic version this targets.
        """
        self.state_store[state.task_id] = state.dict()
        ctx.data["workflow_state"] = state
    
    async def restore_state(self, ctx: Context, task_id: str) -> WorkflowState:
        """Load a previously saved state, or start fresh for an unknown task."""
        if task_id in self.state_store:
            state_dict = self.state_store[task_id]
            state = WorkflowState(**state_dict)
            ctx.data["workflow_state"] = state
            return state
        return WorkflowState(task_id=task_id)
    
    @step
    async def atomic_update(self, ctx: Context, key: str, value: Any):
        """Update one intermediate result and re-save the whole state.

        NOTE(review): no locking is involved — the update is "atomic" only
        for a single-task, single-coroutine caller.
        """
        state = ctx.data.get("workflow_state")
        if state:
            # NOTE(review): plain dict item assignment bypasses pydantic
            # field validation; only the model fields themselves are typed.
            state.intermediate_results[key] = value
            await self.save_state(ctx, state)
        
        return value

2. 多模态数据处理

统一多模态索引
from llama_index.core.indices import MultiModalVectorStoreIndex
from llama_index.core.schema import ImageDocument, TextNode
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.multi_modal_llms.openai import OpenAIMultiModal
import chromadb

class MultiModalRAGSystem:
    """RAG system over separate text and image vector stores, using a
    multimodal LLM for answer synthesis."""

    def __init__(self):
        self.setup_vector_stores()
        self.setup_multimodal_llm()
        self.setup_index()
    
    def setup_vector_stores(self):
        """Create two separate Chroma-backed vector stores (text vs. image)."""
        # Text vector store.
        text_client = chromadb.PersistentClient(path="./text_vectordb")
        text_collection = text_client.get_or_create_collection("text_documents")
        self.text_vector_store = ChromaVectorStore(chroma_collection=text_collection)
        
        # Image vector store.
        image_client = chromadb.PersistentClient(path="./image_vectordb")  
        image_collection = image_client.get_or_create_collection("image_documents")
        self.image_vector_store = ChromaVectorStore(chroma_collection=image_collection)
    
    def setup_multimodal_llm(self):
        """Configure the multimodal LLM used to answer queries."""
        self.multimodal_llm = OpenAIMultiModal(
            model="gpt-4-vision-preview",
            max_new_tokens=1000,
            temperature=0.1
        )
    
    def setup_index(self):
        """Build the combined multimodal index over both stores.

        NOTE(review): verify these constructor kwargs against the installed
        MultiModalVectorStoreIndex — the published API typically takes
        nodes/storage_context rather than these keywords.
        """
        self.index = MultiModalVectorStoreIndex(
            text_vector_store=self.text_vector_store,
            image_vector_store=self.image_vector_store,
            multimodal_llm=self.multimodal_llm
        )
    
    def ingest_documents(self, document_path: str):
        """Parse a PDF into text nodes and image documents and index both.

        Returns a dict with the counts of indexed text nodes and images.
        """
        from llama_index.readers.file import PDFReader
        from llama_index.core.node_parser import SentenceSplitter
        
        # Parse the PDF (text + images).
        reader = PDFReader(extract_images=True)
        documents = reader.load_data(document_path)
        
        text_nodes = []
        image_documents = []
        
        for doc in documents:
            if hasattr(doc, 'image'):
                # Image document.
                image_doc = ImageDocument(
                    image_path=doc.image_path,
                    text=doc.text or "",
                    metadata=doc.metadata
                )
                image_documents.append(image_doc)
            else:
                # Text document: split into chunks before indexing.
                splitter = SentenceSplitter(chunk_size=512, chunk_overlap=50)
                nodes = splitter.get_nodes_from_documents([doc])
                text_nodes.extend(nodes)
        
        # Insert into the index.
        # NOTE(review): `insert_images` is not a standard index method —
        # confirm it exists on the installed version.
        self.index.insert_nodes(text_nodes)
        self.index.insert_images(image_documents)
        
        return {
            "text_nodes": len(text_nodes),
            "image_documents": len(image_documents)
        }
    
    def multimodal_query(self, query: str, query_type="text_to_multimodal"):
        """Run a multimodal query; return the answer plus source nodes.

        NOTE(review): `query_type` is accepted but never used below.
        """
        from llama_index.core.retrievers import MultiModalRetriever
        from llama_index.core.query_engine import SimpleMultiModalQueryEngine
        
        # Build the multimodal retriever over the combined index.
        retriever = MultiModalRetriever(
            index=self.index,
            similarity_top_k=5,
            image_similarity_top_k=3
        )
        
        # Build the query engine.
        query_engine = SimpleMultiModalQueryEngine(
            retriever=retriever,
            multi_modal_llm=self.multimodal_llm
        )
        
        # Execute the query.
        response = query_engine.query(query)
        
        return {
            "response": response.response,
            "source_nodes": [
                {
                    "text": node.text,
                    "image_path": getattr(node, 'image_path', None),
                    "score": node.score,
                    "metadata": node.metadata
                }
                for node in response.source_nodes
            ]
        }

# Example usage: ingest a PDF, then run a multimodal query against it.
rag_system = MultiModalRAGSystem()
result = rag_system.ingest_documents("./company_reports.pdf")
response = rag_system.multimodal_query("展示销售数据的图表,并解释趋势")

3. 高级RAG策略

知识图谱增强RAG
from llama_index.graph_stores.neo4j import Neo4jGraphStore
from llama_index.core.indices import KnowledgeGraphIndex
from llama_index.core.query_engine import KnowledgeGraphQueryEngine

class GraphRAGSystem:
    """Knowledge-graph RAG backed by Neo4j, with multiple query strategies."""

    def __init__(self, neo4j_uri, username, password):
        self.setup_graph_store(neo4j_uri, username, password)
        self.setup_kg_index()
    
    def setup_graph_store(self, uri, username, password):
        """Connect to the Neo4j graph store."""
        self.graph_store = Neo4jGraphStore(
            username=username,
            password=password,
            url=uri,
            database="neo4j"
        )
    
    def setup_kg_index(self):
        """Create an (initially empty) knowledge-graph index."""
        self.kg_index = KnowledgeGraphIndex.from_documents(
            documents=[],  # populated later via build_knowledge_graph
            graph_store=self.graph_store,
            max_triplets_per_chunk=10,
            include_embeddings=True
        )
    
    def build_knowledge_graph(self, documents):
        """Ingest documents and report entity/relationship counts.

        NOTE(review): `insert_documents`, `get_entities` and
        `get_relationships` are not part of the documented
        KnowledgeGraphIndex API (insertion is usually `insert`) — verify
        against the installed LlamaIndex version.
        """
        # Add the documents to the knowledge graph.
        self.kg_index.insert_documents(documents)
        
        # Extract entities and relationships.
        entities = self.kg_index.get_entities()
        relationships = self.kg_index.get_relationships()
        
        return {
            "entities_count": len(entities),
            "relationships_count": len(relationships),
            "graph_stats": self.get_graph_statistics()
        }
    
    def hybrid_query(self, query: str, query_mode="hybrid"):
        """Run the query under every strategy (vector/keyword/hybrid/kg).

        NOTE(review): `query_mode` is accepted but all four engines are run
        regardless; per-engine failures are captured in the result dict.
        """
        # Configure the available query strategies.
        query_engines = {
            "vector": self.kg_index.as_query_engine(
                query_mode="embedding",
                similarity_top_k=10
            ),
            "keyword": self.kg_index.as_query_engine(
                query_mode="keyword", 
                similarity_top_k=10
            ),
            "hybrid": self.kg_index.as_query_engine(
                query_mode="hybrid",
                similarity_top_k=10
            ),
            "kg": KnowledgeGraphQueryEngine(
                graph_store=self.graph_store,
                query_mode="cypher"
            )
        }
        
        # Execute the query with each engine, collecting errors per mode so
        # one failing strategy does not hide the others.
        results = {}
        for mode, engine in query_engines.items():
            try:
                response = engine.query(query)
                results[mode] = {
                    "response": response.response,
                    "source_nodes": [
                        {
                            "text": node.text,
                            "score": getattr(node, 'score', 0),
                            "metadata": node.metadata
                        }
                        for node in response.source_nodes
                    ]
                }
            except Exception as e:
                results[mode] = {"error": str(e)}
        
        return results
    
    def get_graph_statistics(self):
        """Collect basic node/relationship statistics via Cypher."""
        cypher_queries = {
            "node_count": "MATCH (n) RETURN count(n) as count",
            "relationship_count": "MATCH ()-[r]->() RETURN count(r) as count",
            "node_types": "MATCH (n) RETURN labels(n) as types, count(n) as count",
            "relationship_types": "MATCH ()-[r]->() RETURN type(r) as type, count(r) as count"
        }
        
        # Run each statistics query independently so one failure does not
        # hide the rest.
        stats = {}
        for key, query in cypher_queries.items():
            try:
                result = self.graph_store.query(query)
                stats[key] = result
            except Exception as e:
                stats[key] = f"Error: {e}"
        
        return stats

4. 企业级数据连接

200+数据连接器集成
from llama_index.readers.database import DatabaseReader
from llama_index.readers.s3 import S3Reader
from llama_index.readers.notion import NotionPageReader
from llama_index.readers.slack import SlackReader
from llama_index.readers.github import GithubRepositoryReader

class EnterpriseDataHub:
    """Aggregates enterprise data sources behind a single ingestion API.

    Connectors are created in setup_connectors(); unified_data_ingestion()
    pulls documents from each configured source and tags them with
    provenance metadata; create_unified_index() builds one vector index.
    """

    def __init__(self):
        self.data_connectors = {}
        self.setup_connectors()

    def setup_connectors(self):
        """Instantiate one connector per supported enterprise source.

        NOTE(review): tokens below are hard-coded placeholders — load real
        credentials from the environment or a secret manager, never from
        source code. `create_sql_connection` is not defined in this snippet.
        """
        self.data_connectors = {
            # Database connector
            "database": DatabaseReader(
                sql_database=self.create_sql_connection()
            ),

            # Cloud-storage connector
            "s3": S3Reader(
                bucket="company-documents",
                prefix="knowledge-base/"
            ),

            # Collaboration-tool connectors
            "notion": NotionPageReader(
                integration_token="your-notion-token"
            ),

            "slack": SlackReader(
                slack_token="your-slack-token",
                earliest_date="2024-01-01"
            ),

            # Code-repository connector
            "github": GithubRepositoryReader(
                github_token="your-github-token",
                owner="your-org",
                repo="knowledge-repo"
            )
        }

    def unified_data_ingestion(self, source_configs):
        """Load documents from every configured source.

        Args:
            source_configs: mapping of source name -> per-source options.

        Returns:
            Flat list of documents, each tagged with its source name and an
            ingestion timestamp. Failures are printed and skipped so one bad
            source cannot abort the whole ingestion.
        """
        # Fix: the original referenced `datetime` without importing it,
        # raising NameError at runtime. Local import keeps the snippet
        # self-contained.
        from datetime import datetime

        all_documents = []

        for source_name, config in source_configs.items():
            try:
                connector = self.data_connectors[source_name]

                # Fix: start empty so a connector whose name matches no
                # branch below cannot raise UnboundLocalError.
                documents = []
                if source_name == "database":
                    documents = connector.load_data(
                        query=config.get("query", "SELECT * FROM knowledge_articles")
                    )
                elif source_name == "s3":
                    documents = connector.load_data(
                        prefix=config.get("prefix", "")
                    )
                elif source_name == "notion":
                    documents = connector.load_data(
                        page_ids=config.get("page_ids", [])
                    )
                elif source_name == "slack":
                    documents = connector.load_data(
                        channel_ids=config.get("channel_ids", []),
                        reverse_chronological=True
                    )
                elif source_name == "github":
                    documents = connector.load_data(
                        branch="main",
                        filter_directories=config.get("directories", ["docs/"])
                    )

                # Tag provenance so downstream consumers can trace documents.
                for doc in documents:
                    doc.metadata["source"] = source_name
                    doc.metadata["ingestion_time"] = datetime.now().isoformat()

                all_documents.extend(documents)

            except Exception as e:
                # Best-effort ingestion: report and continue with the rest.
                print(f"Error ingesting from {source_name}: {e}")

        return all_documents

    def create_unified_index(self, documents):
        """Build a single Pinecone-backed vector index over the documents.

        NOTE(review): `create_pinecone_index` is not defined in this snippet.
        """
        from llama_index.core import VectorStoreIndex, StorageContext
        from llama_index.vector_stores.pinecone import PineconeVectorStore

        # Pinecone as the vector store, namespaced for enterprise knowledge.
        vector_store = PineconeVectorStore(
            pinecone_index=self.create_pinecone_index(),
            namespace="enterprise_knowledge"
        )

        storage_context = StorageContext.from_defaults(
            vector_store=vector_store
        )

        # Build the index over all ingested documents.
        index = VectorStoreIndex.from_documents(
            documents,
            storage_context=storage_context
        )

        return index

高级查询优化

1. 智能查询路由

from llama_index.core.query_engine import RouterQueryEngine
from llama_index.core.selectors import LLMSingleSelector
from llama_index.core.tools import QueryEngineTool, ToolMetadata

class IntelligentQueryRouter:
    """Routes a natural-language query to the best specialized engine
    (vector / SQL / graph) via an LLM-based selector."""

    def __init__(self, indices):
        self.indices = indices
        self.setup_query_engines()
        self.setup_router()
    
    def setup_query_engines(self):
        """Create the specialized query engines from the supplied indices."""
        self.query_engines = {
            "vector_search": self.indices["vector"].as_query_engine(
                similarity_top_k=10,
                response_mode="tree_summarize"
            ),
            "keyword_search": self.indices["vector"].as_query_engine(
                similarity_top_k=15,
                response_mode="accumulate"
            ),
            "sql_search": self.indices["database"].as_query_engine(),
            "graph_search": self.indices["knowledge_graph"].as_query_engine(
                query_mode="cypher"
            )
        }
    
    def setup_router(self):
        """Wrap the engines as tools and build the LLM-selector router.

        NOTE(review): "keyword_search" is built above but never exposed as a
        routing tool — confirm whether that is intentional.
        """
        query_engine_tools = [
            QueryEngineTool(
                query_engine=self.query_engines["vector_search"],
                metadata=ToolMetadata(
                    name="vector_search",
                    description="用于语义相似性搜索,适合概念性问题和内容理解"
                )
            ),
            QueryEngineTool(
                query_engine=self.query_engines["sql_search"],
                metadata=ToolMetadata(
                    name="sql_search",
                    description="用于结构化数据查询,适合统计和数据分析问题"
                )
            ),
            QueryEngineTool(
                query_engine=self.query_engines["graph_search"],
                metadata=ToolMetadata(
                    name="graph_search",
                    description="用于实体关系查询,适合复杂关联和推理问题"
                )
            )
        ]
        
        self.router = RouterQueryEngine(
            selector=LLMSingleSelector.from_defaults(),
            query_engine_tools=query_engine_tools
        )
    
    def intelligent_query(self, query: str):
        """Route and execute the query, returning answer plus provenance."""
        # Let the router pick the engine and run the query.
        response = self.router.query(query)
        
        return {
            "response": response.response,
            "selected_engine": response.metadata.get("selector_result"),
            "source_nodes": [
                {
                    "text": node.text,
                    "score": getattr(node, 'score', 0),
                    "metadata": node.metadata
                }
                for node in response.source_nodes
            ]
        }

最佳实践指南

1. 性能优化策略

from llama_index.core import Settings
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI

class LlamaIndexOptimizer:
    """Applies global LLM/embedding/chunking settings and caching hooks."""

    def __init__(self):
        self.setup_global_settings()
        self.setup_caching()
    
    def setup_global_settings(self):
        """Configure global LLM, embedding, and chunking defaults."""
        # LLM configuration.
        Settings.llm = OpenAI(
            model="gpt-4",
            temperature=0.1,
            max_tokens=2000,
            timeout=30,
            max_retries=3
        )
        
        # Embedding model configuration.
        # NOTE(review): text-embedding-3-large natively outputs 3072 dims;
        # dimensions=1536 requests a truncated embedding — confirm intended.
        Settings.embed_model = OpenAIEmbedding(
            model="text-embedding-3-large",
            dimensions=1536
        )
        
        # Chunking configuration.
        Settings.chunk_size = 1024
        Settings.chunk_overlap = 200
        Settings.context_window = 4096
    
    def setup_caching(self):
        """Enable the global handler and observability callbacks.

        NOTE(review): AimCallback comes from a separate integration package
        (llama_index.callbacks.aim); ensure it is installed.
        """
        from llama_index.core.callbacks import CallbackManager
        from llama_index.callbacks.aim import AimCallback
        
        # Enable the simple global handler.
        import llama_index.core
        llama_index.core.global_handler = "simple"
        
        # Install the callback manager globally.
        Settings.callback_manager = CallbackManager([
            AimCallback()
        ])
    
    def batch_processing(self, documents, batch_size=50):
        """Embed documents in batches to cut round-trips to the API.

        Args:
            documents: objects with a `.text` attribute; each is given an
                `.embedding` attribute in place.
            batch_size: number of documents embedded per batched call.

        Returns:
            The same documents, now carrying embeddings.
        """
        processed_docs = []
        
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]
            
            # One batched embedding call per slice of documents.
            batch_embeddings = Settings.embed_model.get_text_embedding_batch(
                [doc.text for doc in batch]
            )
            
            for doc, embedding in zip(batch, batch_embeddings):
                doc.embedding = embedding
                processed_docs.append(doc)
        
        return processed_docs

2. 部署和监控

from llama_index.core.callbacks import CBEventType, EventPayload
import logging
import time

class ProductionDeployment:
    """Production wiring: structured logging plus a callback handler that
    times LlamaIndex events and logs per-query latency."""

    def __init__(self):
        self.setup_logging()
        self.setup_monitoring()

    def setup_logging(self):
        """Configure file + console logging for production use."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('llamaindex.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def setup_monitoring(self):
        """Register a timing callback on the global Settings callback manager."""
        # Fix: `Settings` was referenced below but never imported in this
        # snippet (NameError at runtime); import it locally, together with
        # the handler base class.
        from llama_index.core import Settings
        from llama_index.core.callbacks import BaseCallbackHandler

        class ProductionCallback(BaseCallbackHandler):
            # NOTE(review): BaseCallbackHandler also declares
            # start_trace/end_trace; if they are abstract in the installed
            # version, instantiation will fail until they are implemented.
            def __init__(self, logger):
                super().__init__(event_starts_to_ignore=[], event_ends_to_ignore=[])
                self.logger = logger
                # event_type -> wall-clock start time.
                # NOTE(review): concurrent events of the same type clobber
                # each other's start time; fine for single-threaded use.
                self.start_times = {}

            def on_event_start(self, event_type: CBEventType, payload: EventPayload, **kwargs):
                self.start_times[event_type] = time.time()
                self.logger.info(f"Event started: {event_type}")

            def on_event_end(self, event_type: CBEventType, payload: EventPayload, **kwargs):
                if event_type in self.start_times:
                    duration = time.time() - self.start_times[event_type]
                    self.logger.info(f"Event completed: {event_type}, Duration: {duration:.2f}s")

                    # Record per-query latency separately.
                    if event_type == CBEventType.QUERY:
                        self.log_query_metrics(payload, duration)

            def log_query_metrics(self, payload, duration):
                """Log query-level performance metrics."""
                self.logger.info(f"Query metrics: duration={duration:.2f}s")

        Settings.callback_manager.add_handler(ProductionCallback(self.logger))

相关概念

延伸阅读