Spaces:
Paused
Paused
| """ | |
| GraphRAG索引器 | |
| 负责构建层次化的知识图谱索引,包括实体提取、图谱构建、社区检测和摘要生成 | |
| """ | |
| from typing import List, Dict, Optional | |
| import asyncio | |
| try: | |
| from langchain_core.documents import Document | |
| except ImportError: | |
| try: | |
| from langchain_core.documents import Document | |
| except ImportError: | |
| try: | |
| from langchain_core.documents import Document | |
| except ImportError: | |
| from langchain.schema import Document | |
| try: | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| except ImportError: | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from entity_extractor import EntityExtractor, EntityDeduplicator | |
| from knowledge_graph import KnowledgeGraph, CommunitySummarizer | |
class GraphRAGIndexer:
    """GraphRAG indexer implementing the Microsoft GraphRAG indexing pipeline.

    Pipeline: entity/relation extraction -> entity deduplication and merging
    -> knowledge-graph construction -> community detection -> community
    summary generation.
    """

    def __init__(self, enable_async: bool = True, async_batch_size: int = 8):
        """Initialize the GraphRAG indexer.

        Args:
            enable_async: use the asynchronous extraction path (default True).
            async_batch_size: number of documents extracted concurrently per
                async batch (default 8).
        """
        print("🚀 初始化GraphRAG索引器...")
        self.entity_extractor = EntityExtractor(enable_async=enable_async)
        self.entity_deduplicator = EntityDeduplicator()
        self.knowledge_graph = KnowledgeGraph()
        self.community_summarizer = CommunitySummarizer()
        self.enable_async = enable_async
        self.async_batch_size = async_batch_size
        # Set to True only after a successful index_documents()/load_index().
        self.indexed = False
        mode = "异步模式" if enable_async else "同步模式"
        print(f"✅ GraphRAG索引器初始化完成 ({mode}, 并发数={async_batch_size})")

    def index_documents(self, documents: List[Document],
                        batch_size: int = 10,
                        save_path: Optional[str] = None) -> KnowledgeGraph:
        """Build the GraphRAG index over a document collection.

        Workflow (following Microsoft GraphRAG; chunking is assumed to have
        happened upstream in document_processor):
        1. Entity and relation extraction
        2. Entity deduplication and merging
        3. Knowledge-graph construction
        4. Community detection
        5. Community summary generation

        Args:
            documents: pre-chunked documents to index.
            batch_size: batch size for the synchronous extraction path.
            save_path: optional path; when given, the graph is saved there.

        Returns:
            The constructed KnowledgeGraph (also kept on self.knowledge_graph).
        """
        print(f"\n{'='*50}")
        print(f"📊 开始GraphRAG索引流程")
        print(f" 文档数量: {len(documents)}")
        print(f"{'='*50}\n")

        # Step 1: entity and relation extraction (async or sync path).
        print("📍 步骤 1/5: 实体和关系提取")
        if self.enable_async:
            print(f"🚀 使用异步处理模式,并发数={self.async_batch_size}")
            extraction_results = self._extract_async(documents)
        else:
            print("🔄 使用同步处理模式")
            extraction_results = self._extract_sync(documents, batch_size)

        # Step 2: deduplicate entities and remap relation endpoints.
        print("\n📍 步骤 2/5: 实体去重和合并")
        unique_entities, mapped_relations = self._deduplicate(extraction_results)

        # Step 3: build the knowledge graph from the cleaned extractions.
        print("\n📍 步骤 3/5: 构建知识图谱")
        self.knowledge_graph.build_from_extractions([{
            "entities": unique_entities,
            "relations": mapped_relations
        }])

        # Step 4: community detection via the Louvain algorithm.
        print("\n📍 步骤 4/5: 社区检测")
        self.knowledge_graph.detect_communities(algorithm="louvain")

        # Step 5: generate one summary per detected community.
        print("\n📍 步骤 5/5: 生成社区摘要")
        self.community_summarizer.summarize_all_communities(self.knowledge_graph)

        if save_path:
            self.knowledge_graph.save_to_file(save_path)
        self.indexed = True

        self._print_statistics()
        return self.knowledge_graph

    def _extract_sync(self, documents: List[Document],
                      batch_size: int) -> List[Dict]:
        """Synchronously extract entities/relations, one document at a time.

        A document that raises is replaced by an empty result so that the
        result list stays index-aligned with the input documents.
        """
        extraction_results: List[Dict] = []
        total_batches = (len(documents) - 1) // batch_size + 1
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i+batch_size]
            batch_num = i // batch_size + 1
            print(f"\n⚙️ === 批次 {batch_num}/{total_batches} (文档 {i+1}-{min(i+batch_size, len(documents))}) ===")
            for idx, doc in enumerate(batch):
                doc_global_index = i + idx
                try:
                    result = self.entity_extractor.extract_from_document(
                        doc.page_content,
                        doc_index=doc_global_index
                    )
                    extraction_results.append(result)
                except Exception as e:
                    print(f" ❌ 文档 #{doc_global_index + 1} 处理失败: {e}")
                    # Empty placeholder keeps result/document indices aligned.
                    extraction_results.append({"entities": [], "relations": []})
            print(f"✅ 批次 {batch_num}/{total_batches} 完成")
        return extraction_results

    def _deduplicate(self, extraction_results: List[Dict]):
        """Merge all extractions, deduplicate entities, remap relations.

        Returns:
            Tuple (unique_entities, mapped_relations) where each relation's
            source/target has been rewritten to its canonical entity name.
        """
        all_entities: List = []
        all_relations: List = []
        for result in extraction_results:
            all_entities.extend(result.get("entities", []))
            all_relations.extend(result.get("relations", []))
        dedup_result = self.entity_deduplicator.deduplicate_entities(all_entities)
        unique_entities = dedup_result["entities"]
        entity_mapping = dedup_result["mapping"]
        # Names absent from the mapping pass through unchanged.
        mapped_relations = [
            {
                **relation,
                "source": entity_mapping.get(relation["source"], relation["source"]),
                "target": entity_mapping.get(relation["target"], relation["target"]),
            }
            for relation in all_relations
        ]
        return unique_entities, mapped_relations

    def _print_statistics(self) -> None:
        """Print the completion banner with graph statistics."""
        print(f"\n{'='*50}")
        print("✅ GraphRAG索引构建完成!")
        stats = self.knowledge_graph.get_statistics()
        print(f"\n📊 统计信息:")
        print(f" - 节点数: {stats['num_nodes']}")
        print(f" - 边数: {stats['num_edges']}")
        print(f" - 社区数: {stats['num_communities']}")
        print(f" - 图密度: {stats['density']:.4f}")
        print(f"\n 实体类型分布:")
        for etype, count in stats['entity_types'].items():
            print(f" • {etype}: {count}")
        print(f"{'='*50}\n")

    def _extract_async(self, documents: List[Document]) -> List[Dict]:
        """Extract entities/relations in concurrent batches via asyncio.

        Args:
            documents: documents to process.

        Returns:
            One extraction result dict per document; a failed batch is padded
            with empty results to keep indices aligned with the input.
        """
        total_docs = len(documents)
        extraction_results: List[Dict] = []
        # Loop-invariant batch count, hoisted out of the loop.
        total_batches = (total_docs - 1) // self.async_batch_size + 1
        for i in range(0, total_docs, self.async_batch_size):
            batch_end = min(i + self.async_batch_size, total_docs)
            batch_num = i // self.async_batch_size + 1
            print(f"\n⚡ === 异步批次 {batch_num}/{total_batches} (文档 {i+1}-{batch_end}) ===")
            async_batch = [
                (documents[idx].page_content, idx)
                for idx in range(i, batch_end)
            ]
            try:
                # Pass the coroutine positionally: asyncio.run's first
                # parameter is conventionally positional, and the keyword
                # form is fragile across Python versions.
                batch_results = asyncio.run(
                    self.entity_extractor.extract_batch_async(async_batch)
                )
                extraction_results.extend(batch_results)
                print(f"✅ 异步批次 {batch_num}/{total_batches} 完成")
            except Exception as e:
                print(f"❌ 异步批次 {batch_num} 失败: {e}")
                # Pad with empty results to preserve index alignment.
                extraction_results.extend(
                    {"entities": [], "relations": []} for _ in async_batch
                )
        return extraction_results

    def get_graph(self) -> KnowledgeGraph:
        """Return the knowledge graph, warning if it was never built."""
        if not self.indexed:
            print("⚠️ 图谱尚未构建,请先调用 index_documents()")
        return self.knowledge_graph

    def load_index(self, filepath: str) -> KnowledgeGraph:
        """Load a previously saved graph index from *filepath*."""
        print(f"📂 从文件加载图谱索引: {filepath}")
        self.knowledge_graph.load_from_file(filepath)
        self.indexed = True
        return self.knowledge_graph
def initialize_graph_indexer():
    """Factory helper: construct a GraphRAGIndexer with default settings."""
    indexer = GraphRAGIndexer()
    return indexer