# 企业级数据连接框架：从 RAG 系统到多智能体工作流编排平台
# (Enterprise data-connection framework: from RAG systems to multi-agent workflow orchestration.)
# Standard library
from datetime import datetime

# Third-party: LlamaIndex
from llama_index.core.workflow import Workflow, step, Context
from llama_index.core.agent import ReActAgent
from llama_index.core.tools import QueryEngineTool
from llama_index.core.tools import ToolMetadata
from llama_index.llms.openai import OpenAI
class ResearchWorkflow(Workflow):
    """Multi-agent workflow: research phase -> writing phase -> review phase.

    Two specialised ReAct agents (a researcher and a writer) cooperate;
    the review phase scores the draft and requests a rewrite when the
    quality score falls below 0.8.
    """

    def __init__(self):
        super().__init__()
        self.llm = OpenAI(model="gpt-4")
        self.setup_agents()

    def setup_agents(self):
        """Create the specialised researcher and writer agents."""
        # NOTE(review): create_query_engine is not defined in this class or
        # file — it must be supplied by a subclass/mixin. TODO confirm.
        # Researcher agent.
        self.researcher = ReActAgent.from_tools(
            tools=[QueryEngineTool(self.create_query_engine("research_docs"))],
            llm=self.llm,
            system_prompt="""你是一个专业研究员,负责:
1. 深度分析技术文档
2. 提取关键洞察和数据
3. 生成研究报告摘要
请保持客观和准确性。"""
        )
        # Writer agent.
        self.writer = ReActAgent.from_tools(
            tools=[QueryEngineTool(self.create_query_engine("style_guide"))],
            llm=self.llm,
            system_prompt="""你是一个专业技术写手,负责:
1. 基于研究结果创作内容
2. 确保内容结构清晰
3. 遵循企业写作风格
请生成高质量的技术文档。"""
        )

    @step
    async def research_phase(self, ctx: Context, query: str):
        """Research phase: the researcher agent analyses the topic.

        Stores the original query and the research findings on the context.
        """
        ctx.data["original_query"] = query
        # The researcher agent performs the research.
        research_result = await self.researcher.achat(
            f"请对以下主题进行深入研究:{query}"
        )
        ctx.data["research_findings"] = research_result.response
        return research_result

    @step
    async def writing_phase(self, ctx: Context, research_result):
        """Writing phase: the writer agent drafts a report from the findings."""
        research_findings = ctx.data["research_findings"]
        # The writer agent creates content based on the research findings.
        writing_result = await self.writer.achat(
            f"""基于以下研究结果,创作一份专业报告:
研究发现:{research_findings}
要求:
1. 结构清晰,逻辑严谨
2. 包含执行摘要和详细分析
3. 提供可行的建议
"""
        )
        ctx.data["final_report"] = writing_result.response
        return writing_result

    @step
    async def review_phase(self, ctx: Context, writing_result):
        """Review phase: score the draft; re-run writing once if too low.

        Returns the final report plus its quality score and metadata.
        """
        final_report = ctx.data["final_report"]
        quality_score = self.assess_report_quality(final_report)
        if quality_score < 0.8:
            # Below threshold: send the draft back for one rewrite pass.
            return await self.writing_phase(ctx, writing_result)
        return {
            "report": final_report,
            "quality_score": quality_score,
            "metadata": {
                "research_findings": ctx.data["research_findings"],
                "original_query": ctx.data["original_query"]
            }
        }

    def assess_report_quality(self, report: str) -> float:
        """Heuristic quality score in [0.0, 1.0] for a drafted report.

        The original code called this method without ever defining it,
        so review_phase raised AttributeError at runtime.  This simple
        length/structure heuristic makes the workflow runnable; replace
        with an LLM-based evaluator for production use.
        """
        if not report:
            return 0.0
        # Reward longer reports (capped) and visible multi-line structure.
        length_score = min(len(report) / 1500.0, 1.0)
        structure_score = 1.0 if "\n" in report else 0.5
        return 0.5 * length_score + 0.5 * structure_score
# Run the workflow.  `await` is only valid inside a coroutine, so the
# original top-level `await workflow.run(...)` was a SyntaxError in a
# plain script; drive the coroutine with asyncio.run() instead.
import asyncio

workflow = ResearchWorkflow()
result = asyncio.run(workflow.run(query="AI在金融领域的最新应用"))
from llama_index.core.workflow import Context
from pydantic import BaseModel, Field
from typing import Dict, Any, List
class WorkflowState(BaseModel):
    """Workflow state model.

    Serializable snapshot of a running workflow, suitable for
    checkpointing to an external store.
    """
    # Unique task identifier (required field).
    task_id: str = Field(..., description="任务唯一标识")
    # Name of the step the workflow is currently executing.
    current_step: str = Field(default="init", description="当前步骤")
    # Data sources consumed so far.
    data_sources: List[str] = Field(default_factory=list, description="数据源列表")
    # Arbitrary intermediate results keyed by step/name.
    intermediate_results: Dict[str, Any] = Field(default_factory=dict, description="中间结果")
    # Accumulated error records for diagnostics/retry decisions.
    error_history: List[Dict] = Field(default_factory=list, description="错误历史")
class StatefulWorkflow(Workflow):
    """Workflow whose state can be checkpointed and restored by task id."""

    def __init__(self):
        super().__init__()
        # In-memory store; swap for Redis or another persistent backend.
        self.state_store = {}

    async def save_state(self, ctx: Context, state: WorkflowState):
        """Persist the state snapshot and cache it on the context."""
        self.state_store[state.task_id] = state.dict()
        ctx.data["workflow_state"] = state

    async def restore_state(self, ctx: Context, task_id: str) -> WorkflowState:
        """Load a previously saved state, or start a fresh one."""
        stored = self.state_store.get(task_id)
        if stored is None:
            # Nothing saved under this id: begin with a blank state.
            return WorkflowState(task_id=task_id)
        restored = WorkflowState(**stored)
        ctx.data["workflow_state"] = restored
        return restored

    @step
    async def atomic_update(self, ctx: Context, key: str, value: Any):
        """Record one intermediate result and checkpoint the state."""
        current = ctx.data.get("workflow_state")
        if current:
            # Pydantic model keeps the structure validated.
            current.intermediate_results[key] = value
            await self.save_state(ctx, current)
        return value
from llama_index.core.indices import MultiModalVectorStoreIndex
from llama_index.core.schema import ImageDocument, TextNode
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.multi_modal_llms.openai import OpenAIMultiModal
import chromadb
class MultiModalRAGSystem:
    """Multi-modal RAG over separate text and image vector stores."""

    def __init__(self):
        self.setup_vector_stores()
        self.setup_multimodal_llm()
        self.setup_index()

    def setup_vector_stores(self):
        """Create separate Chroma-backed stores for text and images."""
        # Text vector store (persisted to a local directory).
        text_client = chromadb.PersistentClient(path="./text_vectordb")
        text_collection = text_client.get_or_create_collection("text_documents")
        self.text_vector_store = ChromaVectorStore(chroma_collection=text_collection)
        # Image vector store (separate persistence path).
        image_client = chromadb.PersistentClient(path="./image_vectordb")
        image_collection = image_client.get_or_create_collection("image_documents")
        self.image_vector_store = ChromaVectorStore(chroma_collection=image_collection)

    def setup_multimodal_llm(self):
        """Configure the multi-modal (vision) LLM."""
        self.multimodal_llm = OpenAIMultiModal(
            model="gpt-4-vision-preview",
            max_new_tokens=1000,
            temperature=0.1
        )

    def setup_index(self):
        """Create the combined multi-modal index.

        NOTE(review): MultiModalVectorStoreIndex is usually built via
        from_documents / a StorageContext — verify this constructor
        signature against the installed llama_index version.
        """
        self.index = MultiModalVectorStoreIndex(
            text_vector_store=self.text_vector_store,
            image_vector_store=self.image_vector_store,
            multimodal_llm=self.multimodal_llm
        )

    def ingest_documents(self, document_path: str):
        """Ingest a PDF, splitting it into text nodes and image documents.

        Returns:
            dict with counts: {"text_nodes": int, "image_documents": int}.
        """
        from llama_index.readers.file import PDFReader
        from llama_index.core.node_parser import SentenceSplitter
        # Parse the PDF (text + embedded images).
        reader = PDFReader(extract_images=True)
        documents = reader.load_data(document_path)
        text_nodes = []
        image_documents = []
        for doc in documents:
            if hasattr(doc, 'image'):
                # Image page: wrap it as an ImageDocument.
                image_doc = ImageDocument(
                    image_path=doc.image_path,
                    text=doc.text or "",
                    metadata=doc.metadata
                )
                image_documents.append(image_doc)
            else:
                # Text page: split into overlapping sentence chunks.
                splitter = SentenceSplitter(chunk_size=512, chunk_overlap=50)
                nodes = splitter.get_nodes_from_documents([doc])
                text_nodes.extend(nodes)
        # Insert both modalities into the index.
        self.index.insert_nodes(text_nodes)
        self.index.insert_images(image_documents)
        return {
            "text_nodes": len(text_nodes),
            "image_documents": len(image_documents)
        }

    def multimodal_query(self, query: str, query_type="text_to_multimodal"):
        """Run a multi-modal query and return the response plus sources.

        NOTE(review): `query_type` is accepted but never used — confirm
        the intended query modes or remove the parameter.
        """
        from llama_index.core.retrievers import MultiModalRetriever
        from llama_index.core.query_engine import SimpleMultiModalQueryEngine
        # Retriever over both text and image stores.
        retriever = MultiModalRetriever(
            index=self.index,
            similarity_top_k=5,
            image_similarity_top_k=3
        )
        # Query engine combining retrieval with the vision LLM.
        query_engine = SimpleMultiModalQueryEngine(
            retriever=retriever,
            multi_modal_llm=self.multimodal_llm
        )
        # Execute the query.
        response = query_engine.query(query)
        return {
            "response": response.response,
            "source_nodes": [
                {
                    "text": node.text,
                    "image_path": getattr(node, 'image_path', None),
                    "score": node.score,
                    "metadata": node.metadata
                }
                for node in response.source_nodes
            ]
        }
# Usage example.  Guarded so that importing this module does not trigger
# expensive ingestion/query side effects at import time.
if __name__ == "__main__":
    rag_system = MultiModalRAGSystem()
    result = rag_system.ingest_documents("./company_reports.pdf")
    response = rag_system.multimodal_query("展示销售数据的图表,并解释趋势")
from llama_index.graph_stores.neo4j import Neo4jGraphStore
from llama_index.core.indices import KnowledgeGraphIndex
from llama_index.core.query_engine import KnowledgeGraphQueryEngine
class GraphRAGSystem:
    """Knowledge-graph-backed RAG on top of a Neo4j graph store."""

    def __init__(self, neo4j_uri, username, password):
        self.setup_graph_store(neo4j_uri, username, password)
        self.setup_kg_index()

    def setup_graph_store(self, uri, username, password):
        """Connect to the Neo4j graph store."""
        self.graph_store = Neo4jGraphStore(
            username=username,
            password=password,
            url=uri,
            database="neo4j"
        )

    def setup_kg_index(self):
        """Create the knowledge-graph index (empty; documents added later)."""
        self.kg_index = KnowledgeGraphIndex.from_documents(
            documents=[],  # populated later via build_knowledge_graph
            graph_store=self.graph_store,
            max_triplets_per_chunk=10,
            include_embeddings=True
        )

    def build_knowledge_graph(self, documents):
        """Add documents to the knowledge graph and summarise its contents.

        NOTE(review): insert_documents / get_entities / get_relationships
        are not part of the documented KnowledgeGraphIndex API — verify
        these method names against the installed llama_index version.
        """
        # Add documents to the knowledge graph.
        self.kg_index.insert_documents(documents)
        # Extract entities and relationships.
        entities = self.kg_index.get_entities()
        relationships = self.kg_index.get_relationships()
        return {
            "entities_count": len(entities),
            "relationships_count": len(relationships),
            "graph_stats": self.get_graph_statistics()
        }

    def hybrid_query(self, query: str, query_mode="hybrid"):
        """Run the query through several strategies (vector + graph).

        NOTE(review): `query_mode` is accepted but all four engines are
        always executed — confirm whether it should select one of them.

        Returns a dict keyed by strategy name; failing strategies map to
        {"error": str} instead of raising.
        """
        # Configure the individual query strategies.
        query_engines = {
            "vector": self.kg_index.as_query_engine(
                query_mode="embedding",
                similarity_top_k=10
            ),
            "keyword": self.kg_index.as_query_engine(
                query_mode="keyword",
                similarity_top_k=10
            ),
            "hybrid": self.kg_index.as_query_engine(
                query_mode="hybrid",
                similarity_top_k=10
            ),
            "kg": KnowledgeGraphQueryEngine(
                graph_store=self.graph_store,
                query_mode="cypher"
            )
        }
        # Execute every strategy, collecting per-strategy results/errors.
        results = {}
        for mode, engine in query_engines.items():
            try:
                response = engine.query(query)
                results[mode] = {
                    "response": response.response,
                    "source_nodes": [
                        {
                            "text": node.text,
                            "score": getattr(node, 'score', 0),
                            "metadata": node.metadata
                        }
                        for node in response.source_nodes
                    ]
                }
            except Exception as e:
                # Best-effort: record the failure, keep other strategies.
                results[mode] = {"error": str(e)}
        return results

    def get_graph_statistics(self):
        """Collect node/relationship counts and type breakdowns via Cypher."""
        cypher_queries = {
            "node_count": "MATCH (n) RETURN count(n) as count",
            "relationship_count": "MATCH ()-[r]->() RETURN count(r) as count",
            "node_types": "MATCH (n) RETURN labels(n) as types, count(n) as count",
            "relationship_types": "MATCH ()-[r]->() RETURN type(r) as type, count(r) as count"
        }
        stats = {}
        for key, query in cypher_queries.items():
            try:
                result = self.graph_store.query(query)
                stats[key] = result
            except Exception as e:
                # Keep partial stats rather than failing the whole call.
                stats[key] = f"Error: {e}"
        return stats
from llama_index.readers.database import DatabaseReader
from llama_index.readers.s3 import S3Reader
from llama_index.readers.notion import NotionPageReader
from llama_index.readers.slack import SlackReader
from llama_index.readers.github import GithubRepositoryReader
class EnterpriseDataHub:
    """Unified ingestion hub over enterprise data sources.

    Wires up connectors for databases, S3, Notion, Slack and GitHub and
    exposes a single ingestion entry point plus index construction.
    """

    def __init__(self):
        self.data_connectors = {}
        self.setup_connectors()

    def setup_connectors(self):
        """Configure the enterprise data connectors.

        SECURITY NOTE: the tokens below are hard-coded placeholders;
        load real credentials from the environment or a secret manager,
        never from source control.
        """
        # NOTE(review): create_sql_connection is not defined in this
        # class — it must be supplied elsewhere. TODO confirm.
        self.data_connectors = {
            # Database connector.
            "database": DatabaseReader(
                sql_database=self.create_sql_connection()
            ),
            # Cloud-storage connector.
            "s3": S3Reader(
                bucket="company-documents",
                prefix="knowledge-base/"
            ),
            # Collaboration-tool connectors.
            "notion": NotionPageReader(
                integration_token="your-notion-token"
            ),
            "slack": SlackReader(
                slack_token="your-slack-token",
                earliest_date="2024-01-01"
            ),
            # Code-repository connector.
            "github": GithubRepositoryReader(
                github_token="your-github-token",
                owner="your-org",
                repo="knowledge-repo"
            )
        }

    def unified_data_ingestion(self, source_configs):
        """Ingest documents from each configured source.

        Args:
            source_configs: mapping of source name -> per-source options.

        Returns:
            Combined list of documents, each tagged with its source name
            and ingestion timestamp in metadata.
        """
        all_documents = []
        for source_name, config in source_configs.items():
            try:
                connector = self.data_connectors[source_name]
                documents = self._load_from_source(source_name, connector, config)
                if documents is None:
                    # Bug fix: the original code left `documents` unbound
                    # for unknown source names (NameError swallowed by the
                    # broad except, or silently reusing a stale value).
                    print(f"Unknown source type: {source_name}")
                    continue
                # Tag every document with provenance metadata.
                for doc in documents:
                    doc.metadata["source"] = source_name
                    doc.metadata["ingestion_time"] = datetime.now().isoformat()
                all_documents.extend(documents)
            except Exception as e:
                # Best-effort: skip failing sources, keep ingesting the rest.
                print(f"Error ingesting from {source_name}: {e}")
        return all_documents

    def _load_from_source(self, source_name, connector, config):
        """Dispatch to the connector-specific load call; None if unknown."""
        if source_name == "database":
            return connector.load_data(
                query=config.get("query", "SELECT * FROM knowledge_articles")
            )
        if source_name == "s3":
            return connector.load_data(
                prefix=config.get("prefix", "")
            )
        if source_name == "notion":
            return connector.load_data(
                page_ids=config.get("page_ids", [])
            )
        if source_name == "slack":
            return connector.load_data(
                channel_ids=config.get("channel_ids", []),
                reverse_chronological=True
            )
        if source_name == "github":
            return connector.load_data(
                branch="main",
                filter_directories=config.get("directories", ["docs/"])
            )
        return None

    def create_unified_index(self, documents):
        """Build one Pinecone-backed vector index over all documents."""
        from llama_index.core import VectorStoreIndex, StorageContext
        from llama_index.vector_stores.pinecone import PineconeVectorStore

        # NOTE(review): create_pinecone_index is not defined in this
        # class — it must be supplied elsewhere. TODO confirm.
        vector_store = PineconeVectorStore(
            pinecone_index=self.create_pinecone_index(),
            namespace="enterprise_knowledge"
        )
        storage_context = StorageContext.from_defaults(
            vector_store=vector_store
        )
        # Create the index over all ingested documents.
        index = VectorStoreIndex.from_documents(
            documents,
            storage_context=storage_context
        )
        return index
from llama_index.core.query_engine import RouterQueryEngine
from llama_index.core.selectors import LLMSingleSelector
from llama_index.core.tools import QueryEngineTool
class IntelligentQueryRouter:
    """LLM-selected routing across specialised query engines.

    Expects `indices` to contain "vector", "database" and
    "knowledge_graph" entries — presumably the indices built elsewhere
    in this module; verify against the caller.
    """

    def __init__(self, indices):
        self.indices = indices
        self.setup_query_engines()
        self.setup_router()

    def setup_query_engines(self):
        """Build the specialised query engines from the supplied indices."""
        self.query_engines = {
            "vector_search": self.indices["vector"].as_query_engine(
                similarity_top_k=10,
                response_mode="tree_summarize"
            ),
            # NOTE(review): "keyword_search" is built here but never
            # registered with the router below — confirm if intentional.
            "keyword_search": self.indices["vector"].as_query_engine(
                similarity_top_k=15,
                response_mode="accumulate"
            ),
            "sql_search": self.indices["database"].as_query_engine(),
            "graph_search": self.indices["knowledge_graph"].as_query_engine(
                query_mode="cypher"
            )
        }

    def setup_router(self):
        """Create the router that picks an engine per query via the LLM.

        NOTE(review): ToolMetadata must be imported from
        llama_index.core.tools for this to run.
        """
        query_engine_tools = [
            QueryEngineTool(
                query_engine=self.query_engines["vector_search"],
                metadata=ToolMetadata(
                    name="vector_search",
                    description="用于语义相似性搜索,适合概念性问题和内容理解"
                )
            ),
            QueryEngineTool(
                query_engine=self.query_engines["sql_search"],
                metadata=ToolMetadata(
                    name="sql_search",
                    description="用于结构化数据查询,适合统计和数据分析问题"
                )
            ),
            QueryEngineTool(
                query_engine=self.query_engines["graph_search"],
                metadata=ToolMetadata(
                    name="graph_search",
                    description="用于实体关系查询,适合复杂关联和推理问题"
                )
            )
        ]
        self.router = RouterQueryEngine(
            selector=LLMSingleSelector.from_defaults(),
            query_engine_tools=query_engine_tools
        )

    def intelligent_query(self, query: str):
        """Route the query to the best engine and return the response.

        Returns a dict with the response text, which engine was selected,
        and the retrieved source nodes.
        """
        # Execute the routed query.
        response = self.router.query(query)
        return {
            "response": response.response,
            "selected_engine": response.metadata.get("selector_result"),
            "source_nodes": [
                {
                    "text": node.text,
                    "score": getattr(node, 'score', 0),
                    "metadata": node.metadata
                }
                for node in response.source_nodes
            ]
        }
from llama_index.core import Settings
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
class LlamaIndexOptimizer:
    """Centralised LlamaIndex tuning: global settings, caching, batching."""

    def __init__(self):
        self.setup_global_settings()
        self.setup_caching()

    def setup_global_settings(self):
        """Apply global LLM, embedding and chunking configuration."""
        # LLM configuration.
        Settings.llm = OpenAI(
            model="gpt-4",
            temperature=0.1,
            max_tokens=2000,
            timeout=30,
            max_retries=3,
        )
        # Embedding model configuration.
        Settings.embed_model = OpenAIEmbedding(
            model="text-embedding-3-large",
            dimensions=1536,
        )
        # Chunking configuration.
        Settings.chunk_size = 1024
        Settings.chunk_overlap = 200
        Settings.context_window = 4096

    def setup_caching(self):
        """Enable the simple global handler and register callbacks."""
        from llama_index.core.callbacks import CallbackManager
        from llama_index.callbacks.aim import AimCallback
        # Enable caching via the simple global handler.
        import llama_index.core
        llama_index.core.global_handler = "simple"
        # Install the callback manager.
        Settings.callback_manager = CallbackManager([AimCallback()])

    def batch_processing(self, documents, batch_size=50):
        """Embed documents in batches of `batch_size` and return them."""
        embedded = []
        for start in range(0, len(documents), batch_size):
            chunk = documents[start:start + batch_size]
            # One embedding call per batch instead of per document.
            vectors = Settings.embed_model.get_text_embedding_batch(
                [d.text for d in chunk]
            )
            for d, vec in zip(chunk, vectors):
                d.embedding = vec
                embedded.append(d)
        return embedded
from llama_index.core.callbacks import CBEventType, EventPayload
import logging
import time
class ProductionDeployment:
    """Production logging and monitoring setup for LlamaIndex."""

    def __init__(self):
        self.setup_logging()
        self.setup_monitoring()

    def setup_logging(self):
        """Configure production logging to a file and to stdout."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('llamaindex.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def setup_monitoring(self):
        """Register a callback handler that logs event timings."""
        from llama_index.core.callbacks import BaseCallbackHandler

        class ProductionCallback(BaseCallbackHandler):
            """Logs event start/end and measures per-event duration."""

            def __init__(self, logger):
                super().__init__(event_starts_to_ignore=[], event_ends_to_ignore=[])
                self.logger = logger
                self.start_times = {}

            # Bug fix: BaseCallbackHandler declares start_trace/end_trace
            # as abstract methods; without these overrides the original
            # class could not be instantiated (TypeError).
            def start_trace(self, trace_id=None):
                pass

            def end_trace(self, trace_id=None, trace_map=None):
                pass

            def on_event_start(self, event_type: CBEventType, payload: EventPayload, **kwargs):
                self.start_times[event_type] = time.time()
                self.logger.info(f"Event started: {event_type}")

            def on_event_end(self, event_type: CBEventType, payload: EventPayload, **kwargs):
                if event_type in self.start_times:
                    duration = time.time() - self.start_times[event_type]
                    self.logger.info(f"Event completed: {event_type}, Duration: {duration:.2f}s")
                    # Record performance metrics for query events.
                    if event_type == CBEventType.QUERY:
                        self.log_query_metrics(payload, duration)

            def log_query_metrics(self, payload, duration):
                """Log per-query performance metrics."""
                self.logger.info(f"Query metrics: duration={duration:.2f}s")

        Settings.callback_manager.add_handler(ProductionCallback(self.logger))