| """prepare.py |
| |
| Utilities to prepare documents and knowledge-graph artifacts for a RAG (Retrieval-Augmented |
| Generation) pipeline. |
| |
| This module implements: |
| - safe file loading for text-like files (UTF-8 tolerant) |
| - dataset creation from a `context` directory using various loaders |
| - chunking, embedding and upserting to Pinecone via `prepare_RAG` |
| - building/updating a Knowledge Graph and generating hierarchical community summaries |
| |
| Main public functions: |
| - create_dataset(directory_path: str) -> List[Document] |
| - prepare_RAG(pinecone_API, index_name, ...) -> (index, pc, llm, documents) |
| - build_knowledge_graph(documents, llm, pc, index, info=True) -> KnowledgeGraphIndex |
| |
| Note: many helper functions are nested; this docstring highlights the high-level |
| purpose and responsibilities only. |
| """ |
|
|
import hashlib
import json
import os
import pathlib
import pickle
import re
import time
from typing import Any, List

from pinecone import Pinecone
from pydantic import BaseModel, ValidationError

from langchain_mistralai import ChatMistralAI
from langchain_openai import ChatOpenAI
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, SystemMessage
from langchain.schema import Document
from langchain_community.document_loaders import (
    CSVLoader, PyPDFLoader, UnstructuredWordDocumentLoader,
    UnstructuredPowerPointLoader, UnstructuredMarkdownLoader,
    UnstructuredHTMLLoader, NotebookLoader
)
from langchain_text_splitters import RecursiveCharacterTextSplitter

from llama_index.core import KnowledgeGraphIndex
from llama_index.core import Document as LlamaDocument
from llama_index.core.memory import Memory


memory = Memory(token_limit=2048)
|
|
class SafeTextLoader:
    """Loads a text file as a single Document, safely handling UTF-8 decoding errors."""

    def __init__(self, file_path):
        self.file_path = file_path

    def load(self):
        """Load the file and return a list containing a single LangChain `Document`.

        The loader is UTF-8 tolerant: it reads raw bytes and decodes using UTF-8 with
        'ignore' on errors to avoid failing on files containing invalid sequences.

        Returns:
            List[Document]: a list with one Document (page_content and metadata['source'])
            or an empty list on error.
        """
        try:
            with open(self.file_path, "rb") as f:
                raw_bytes = f.read()
            text = raw_bytes.decode("utf-8", errors="ignore")
            return [Document(page_content=text, metadata={"source": str(self.file_path)})]
        except Exception as e:
            print(f"[Error] Failed to read {self.file_path}: {e}")
            return []
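# Usage sketch ("notes.txt" is an illustrative path, not a file this module ships with):
#   docs = SafeTextLoader("notes.txt").load()
#   -> [Document(page_content="...file text...", metadata={"source": "notes.txt"})]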
|
|
LOADER_MAPPING = {
    ".txt": SafeTextLoader,
    ".json": SafeTextLoader,
    ".md": UnstructuredMarkdownLoader,
    ".csv": CSVLoader,
    ".yaml": SafeTextLoader,
    ".yml": SafeTextLoader,
    ".pdf": PyPDFLoader,
    ".docx": UnstructuredWordDocumentLoader,
    ".pptx": UnstructuredPowerPointLoader,
    ".html": UnstructuredHTMLLoader,
    ".htm": UnstructuredHTMLLoader,
    ".ipynb": NotebookLoader,
    ".py": SafeTextLoader,
    ".js": SafeTextLoader,
    ".sql": SafeTextLoader,
}
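# The mapping is plain data, so supporting a new format is a one-line change; any
# text-like extension can fall back to SafeTextLoader (".toml" is an illustrative
# example, not a tested format):
#   LOADER_MAPPING[".toml"] = SafeTextLoader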
|
|
CONTEXT_ROOT = pathlib.Path(__file__).parent / "context"
|
|
def create_dataset(directory_path: str = "context"):
    """Recursively load files under `directory_path` using extension-specific loaders."""
    target_dir = pathlib.Path(directory_path).resolve()
    if not target_dir.exists() or not target_dir.is_dir():
        print(f"[Error] Target directory does not exist: {target_dir}")
        return []

    documents = []
    for file_path in target_dir.rglob("*"):
        if not file_path.is_file():
            continue
        ext = file_path.suffix.lower()
        loader_cls = LOADER_MAPPING.get(ext)
        if loader_cls is None:
            print(f"[Skip] Unsupported file type: {file_path}")
            continue
        try:
            loader = loader_cls(str(file_path))
            docs = loader.load()
            documents.extend(docs)
            print(f"[Loaded] {file_path} ({len(docs)} docs)")
        except Exception as e:
            print(f"[Error] Failed to load {file_path}: {e}")

    print(f"[Done] Finished scanning {target_dir}")
    print(f"Total documents loaded: {len(documents)}")
    return documents
|
|
|
|
|
|
|
|
def fetch_existing_ids(index, namespace, ids, batch_size=100):
    """Fetch IDs from Pinecone in safe batches to avoid "URI too large" errors."""
    existing_ids = set()
    for start in range(0, len(ids), batch_size):
        batch_ids = ids[start:start + batch_size]
        result = index.fetch(ids=batch_ids, namespace=namespace)
        if hasattr(result, "vectors"):
            existing_ids.update(result.vectors.keys())
    return existing_ids
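# Usage sketch (IDs illustrative; "example-namespace" is the namespace prepare_RAG uses below):
#   have = fetch_existing_ids(index, "example-namespace", all_ids, batch_size=100)
#   new_ids = [i for i in all_ids if i not in have]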
|
|
|
|
|
|
def path_after_context(full_path: str, anchor: str = "context") -> str:
    """Return the path portion after the last `anchor` directory, or the basename if absent."""
    if not full_path:
        return ""
    parts = re.split(r"[\\/]+", str(full_path))
    idx = None
    for i, part in enumerate(parts):
        if part.lower() == anchor.lower():
            idx = i
    if idx is not None and idx < len(parts) - 1:
        return "/".join(parts[idx + 1 :])
    return os.path.basename(str(full_path))
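# Behavior examples for path_after_context (derivable from the implementation above):
#   path_after_context(r"C:\proj\context\docs\a.md")  -> "docs/a.md"
#   path_after_context("/home/u/context/x/y.sql")     -> "x/y.sql"
#   path_after_context("/tmp/loose_file.txt")         -> "loose_file.txt"   (no anchor present)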
|
|
|
|
|
|
def _build_and_index_community_summaries(
    kg_index,
    pc,
    index,
    llm,
    impacted_nodes=None,
    info=True,
):
    """
    This function implements a hierarchical community detection and summarization pipeline:

    1. COMMUNITY DETECTION:
       - Uses NetworkX's greedy_modularity_communities to find natural clusters in the KG
       - Filters communities by minimum size (COMMUNITY_MIN_SIZE) to avoid noise

    2. HIERARCHY CONSTRUCTION:
       - Builds a multi-level tree structure (max depth = MAX_HIERARCHY_DEPTH)
       - Recursively splits large communities using the same modularity algorithm
       - Creates parent-child relationships between community levels

    3. AFFECTED NODE TRACKING:
       - Marks communities as "_affected" if they contain new/updated nodes
       - Propagates affected status upward to parent communities
       - Enables incremental updates by only processing changed regions

    4. BOTTOM-UP SUMMARIZATION:
       - Leaf communities: Generate detailed reports from entity relationships
       - Parent communities: Synthesize child summaries into higher-level overviews
       - Uses sampling (LIMIT_NODES_PER_SUMMARY) to handle large communities

    5. VECTOR STORAGE:
       - Creates stable IDs using SHA-256 hashes of community composition
       - Embeds summaries using Pinecone's llama-text-embed-v2 model
       - Stores in dedicated "community-summaries" namespace
    """
|
|
    from networkx.algorithms.community import greedy_modularity_communities

    COMMUNITY_NAMESPACE = "community-summaries"
    COMMUNITY_MIN_SIZE = 3
    MAX_HIERARCHY_DEPTH = 2
    LIMIT_NODES_PER_SUMMARY = 60
    LIMIT_TRIPLES_PER_SUMMARY = 120
|
|
    try:
        nxg = kg_index.get_networkx_graph()
    except Exception as e:
        print(f"[Error] Unable to extract NetworkX graph from KG: {e}")
        return
|
|
    if nxg.number_of_nodes() == 0 or nxg.number_of_edges() == 0:
        if info:
            print("[Community] KG empty or trivial; skipping community summarization.")
        return
|
|
    first_run = impacted_nodes is None
    impacted_nodes = set(impacted_nodes or [])
|
|
    if info:
        print(f"[Community] Starting summarization. First run: {first_run}")
        print(f"[Community] Impacted nodes: {len(impacted_nodes)}")
|
|
    if info:
        print("[Community] Detecting top-level communities (greedy modularity)...")
    try:
        communities = list(greedy_modularity_communities(nxg))
    except Exception as e:
        print(f"[Error] Community detection failed: {e}")
        return
|
|
    large_communities = [c for c in communities if len(c) >= max(2, COMMUNITY_MIN_SIZE)]
    small_communities = [c for c in communities if len(c) < max(2, COMMUNITY_MIN_SIZE)]
|
|
    if info:
        print(f"[Community] Found {len(communities)} communities; "
              f"{len(large_communities)} large, {len(small_communities)} small.")
|
|
    # Build the top-level hierarchy, then recursively split large communities.
    hierarchy = []
    for idx, comm in enumerate(large_communities):
        subgraph = nxg.subgraph(comm).copy()
        node_set = set(subgraph.nodes())
        node = {
            "id": f"C{idx}",
            "level": 0,
            "nodes": node_set,
            "children": [],
            "_affected": first_run or bool(impacted_nodes & node_set),
        }

        frontier = [(node, subgraph, 1)]
        while frontier:
            parent, g, depth = frontier.pop()
            if depth > MAX_HIERARCHY_DEPTH or g.number_of_nodes() < max(2, COMMUNITY_MIN_SIZE * 2):
                continue
            try:
                subs = list(greedy_modularity_communities(g))
            except Exception:
                subs = []

            # Drop the trivial partition (a "sub-community" equal to the whole graph)
            # and anything below the minimum size.
            subs = [s for s in subs if 1 <= len(s) <= len(g) - 1]
            subs = [s for s in subs if len(s) >= max(2, COMMUNITY_MIN_SIZE)]

            for j, s in enumerate(subs):
                sg = g.subgraph(s).copy()
                child = {
                    "id": f"{parent['id']}.{j}",
                    "level": depth,
                    "nodes": set(s),
                    "children": [],
                    "_affected": first_run or bool(impacted_nodes & set(s)),
                }
                parent["children"].append(child)
                if depth + 1 <= MAX_HIERARCHY_DEPTH and sg.number_of_nodes() >= max(2, COMMUNITY_MIN_SIZE * 2):
                    frontier.append((child, sg, depth + 1))

        hierarchy.append(node)
|
|
    # Propagate "_affected" upward so parents of changed children are re-summarized.
    def mark_ancestors(n):
        any_child = False
        for c in n["children"]:
            if mark_ancestors(c):
                any_child = True
        if any_child:
            n["_affected"] = True
        return n["_affected"]

    for root in hierarchy:
        mark_ancestors(root)
|
|
    def triples_within(node_ids, graph):
        res = []
        for (u, v, data) in graph.edges(data=True):
            if u in node_ids and v in node_ids:
                rel = data.get("label") or data.get("relationship") or "related_to"
                res.append((u, rel, v))
        return res

    def sample_for_prompt(nodes_set, triples_list, max_nodes=LIMIT_NODES_PER_SUMMARY, max_triples=LIMIT_TRIPLES_PER_SUMMARY):
        # Note: set order is arbitrary, so this is sampling, not a stable "top N".
        nodes_list = list(nodes_set)[:max_nodes]
        triples_list = triples_list[:max_triples]
        return nodes_list, triples_list
|
|
    def summarize_leaf(nodes_set, graph):
        nodes_list, tri_list = sample_for_prompt(
            nodes_set,
            triples_within(nodes_set, graph)
        )
        prompt = (
            "You are creating a concise community report from a knowledge graph.\n"
            "Given the following entity list and intra-community relationships, produce:\n"
            " - Title\n"
            " - Key Themes (bullet points)\n"
            " - Notable Entities\n"
            " - Important Relationships (summarize patterns rather than listing all)\n"
            " - Outliers or Cross-links (if any)\n"
            " - 3-5 Answerable Questions this community can address\n"
            "Keep it under ~250-300 words.\n\n"
            f"Entities (sample): {nodes_list}\n"
            f"Relationships (sample triples): {[f'{u} --[{r}]--> {v}' for (u, r, v) in tri_list]}\n"
        )
        resp = llm.invoke(prompt)
        return resp.content.strip()
|
|
    def summarize_parent(child_summaries):
        join_text = "\n\n".join([f"[Child {i+1}]\n{txt}" for i, txt in enumerate(child_summaries)])
        prompt = (
            "You are creating a higher-level summary that unifies several community reports.\n"
            "Synthesize the following child community reports into a coherent parent-level summary:\n"
            " - Overarching Title\n"
            " - Cross-community Key Themes\n"
            " - How the sub-communities relate and differ\n"
            " - Cross-cutting entities/relationships\n"
            " - 3-5 high-level questions the parent community can answer\n"
            "Target length: 250-350 words.\n\n"
            f"{join_text}\n"
        )
        resp = llm.invoke(prompt)
        return resp.content.strip()
|
|
    # Bottom-up: summarize affected leaves first, then synthesize parents from children.
    def build_summaries(node, graph):
        if not node["_affected"]:
            return None
        if not node["children"]:
            node["summary"] = summarize_leaf(node["nodes"], graph)
            return node["summary"]
        child_summaries = []
        for ch in node["children"]:
            s = build_summaries(ch, graph)
            if s is not None:
                child_summaries.append(s)
        if child_summaries:
            node["summary"] = summarize_parent(child_summaries)
            return node["summary"]
        node["summary"] = summarize_leaf(node["nodes"], graph)
        return node["summary"]

    for node in hierarchy:
        build_summaries(node, nxg)
|
|
    flat_nodes = []

    def flatten(n):
        if n.get("_affected") and "summary" in n:
            flat_nodes.append({
                "id": n["id"],
                "level": n["level"],
                "size": len(n["nodes"]),
                "nodes": list(n["nodes"]),
                "summary": n["summary"]
            })
        for c in n["children"]:
            flatten(c)

    for n in hierarchy:
        flatten(n)
|
|
    if not flat_nodes:
        if info:
            print("[Community] No affected community summaries to upsert.")
        return
|
|
    if info:
        print(f"[Community] Upserting {len(flat_nodes)} community summaries to namespace: {COMMUNITY_NAMESPACE}")
|
|
    def summary_vec_id(node_rec):
        key = f"{node_rec['id']}|{node_rec['level']}|{','.join(sorted(node_rec['nodes'])[:20])}"
        return "comm_" + hashlib.sha256(key.encode("utf-8")).hexdigest()[:24]
|
|
    B = 96  # embedding batch size per pc.inference.embed call
    texts = [rec["summary"] for rec in flat_nodes]
    ids = [summary_vec_id(rec) for rec in flat_nodes]
    metas = [{
        "type": "community_summary",
        "community_id": rec["id"],
        "level": rec["level"],
        "size": rec["size"],
        "node_sample": rec["nodes"][:20],
        "text": rec["summary"]
    } for rec in flat_nodes]
|
|
    for start in range(0, len(texts), B):
        batch_texts = texts[start:start+B]
        batch_ids = ids[start:start+B]
        batch_metas = metas[start:start+B]
        emb = pc.inference.embed(
            model="llama-text-embed-v2",
            inputs=batch_texts,
            parameters={"input_type": "passage", "truncate": "END"}
        )
        vectors = [
            {"id": vid, "values": e["values"], "metadata": meta}
            for vid, e, meta in zip(batch_ids, emb, batch_metas)
        ]
        index.upsert(vectors=vectors, namespace=COMMUNITY_NAMESPACE)
|
|
    if info:
        print("[Community] Community summaries upsert completed.")
|
|
def prepare_RAG(
    pinecone_API,
    index_name,
    chunk_size=400,
    chunk_overlap=30,
    llm_model="gpt-4.1-nano",
    dir_name="context",
    info=True
):
    """Prepare documents and a Pinecone index for retrieval.

    Steps:
    1) Select LLM wrapper (OpenAI vs. Mistral) by `llm_model` string.
    2) Create dataset with `create_dataset(dir_name)`.
    3) Connect to Pinecone and obtain `index`.
    4) Split documents into chunks; normalize `metadata['source']` to be path-relative
       to the `context` anchor (stable across machines).
    5) Compute stable vector IDs per chunk from source+content hashes.
    6) Use `fetch_existing_ids` to identify and skip already-indexed chunks.
    7) Embed only new chunks via `pc.inference.embed` (retry with backoff).
    8) Upsert embeddings and metadata into a fixed namespace (`example-namespace`).
    """
|
|
|
    if info:
        print(f"Preparing RAG with LLM: {llm_model}, Index: {index_name}, Dir: {dir_name}")
    llm = ChatOpenAI(model=llm_model, streaming=True) if "gpt" in llm_model else ChatMistralAI(model=llm_model, streaming=True)
|
|
    documents = create_dataset(dir_name)
    pc = Pinecone(api_key=pinecone_API)
    index = pc.Index(index_name)
|
|
    if not documents:
        print("[Warning] No documents found. Using existing Pinecone index.")
        return index, pc, llm, None
|
|
    splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    all_splits = splitter.split_documents(documents)
|
|
|
|
    # Normalize sources to context-relative paths (module-level helper above).
    for chunk in all_splits:
        if "source" in chunk.metadata and chunk.metadata["source"]:
            chunk.metadata["source"] = path_after_context(chunk.metadata["source"], anchor="context")
|
|
    if info:
        print(f"Total chunks: {len(all_splits)}")
|
|
    def chunk_id(chunk, prefix="vec"):
        text_hash = hashlib.sha256(chunk.page_content.encode("utf-8")).hexdigest()[:16]
        source = chunk.metadata.get("source", "unknown")
        file_hash = hashlib.sha256(source.encode("utf-8")).hexdigest()[:8]
        return f"{prefix}_{file_hash}_{text_hash}"
|
|
|
|
    """
    VECTOR ID GENERATION STRATEGY:

    For Document Chunks:
        Pattern: "vec_{file_hash}_{content_hash}"
        - file_hash: SHA-256 of normalized source path (8 chars)
        - content_hash: SHA-256 of chunk content (16 chars)
        - Enables exact duplicate detection across runs
        - Stable across different machine paths due to source normalization

    For Community Summaries:
        Pattern: "comm_{community_hash}"
        - community_hash: SHA-256 of "community_id|level|sorted_node_sample"
        - Ensures stable IDs for the same community composition
        - Allows updates when community structure changes

    NAMESPACE STRATEGY:
    - "example-namespace": Stores document chunk embeddings
    - "community-summaries": Stores hierarchical community summaries
    - Separation enables independent update/query strategies
    - Prevents interference between document and summary vectors

    IDEMPOTENCY GUARANTEE:
    - fetch_existing_ids() checks Pinecone before embedding
    - Prevents duplicate embeddings for identical content
    - Enables safe re-runs without data duplication
    - Reduces embedding costs and storage usage
    """
|
|
    namespace = "example-namespace"
    all_ids = [chunk_id(c) for c in all_splits]
    existing = fetch_existing_ids(index, namespace, all_ids, batch_size=100)
    new_chunks = [(c, i) for c, i in zip(all_splits, all_ids) if i not in existing]
|
|
    if info:
        print(f"Chunks already indexed: {len(all_splits) - len(new_chunks)}")
        print(f"New chunks to embed: {len(new_chunks)}")
|
|
    if not new_chunks:
        print("[Info] Nothing new to index. Skipping embedding/upsert.")
    else:
        batch_size = 94

        def retry_forever(func, *args, **kwargs):
            attempt = 1
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    wait = min(60, 2 ** min(attempt, 6))
                    print(f"[Retry] {func.__name__} failed (attempt {attempt}): {e}. Sleeping {wait}s")
                    time.sleep(wait)
                    attempt += 1
|
|
        for start_idx in range(0, len(new_chunks), batch_size):
            print(f"[Info] Embedding and upserting batch {start_idx // batch_size + 1}...")
            batch, ids = zip(*new_chunks[start_idx:start_idx + batch_size])
            texts = [chunk.page_content for chunk in batch]
            metas = [chunk.metadata or {} for chunk in batch]

            embeddings = retry_forever(
                pc.inference.embed,
                model="llama-text-embed-v2",
                inputs=texts,
                parameters={"input_type": "passage", "truncate": "END"}
            )

            batch_records = [
                {"id": i, "values": e['values'], "metadata": {"text": t, **m}}
                for i, e, t, m in zip(ids, embeddings, texts, metas)
            ]
            retry_forever(index.upsert, vectors=batch_records, namespace=namespace)
|
|
        if info:
            print(f"Completed upsert of {len(new_chunks)} new vectors.")
|
|
    return index, pc, llm, documents
|
|
|
|
def build_knowledge_graph(documents, llm, pc, index, info=True):
    """
    Build/update the Knowledge Graph (KG) from documents, persist it, merge deltas, and
    (re)generate community summaries for changed regions.

    Args:
        documents: List of LangChain Documents from prepare_RAG (may be empty).
        llm: LangChain-compatible LLM used via LlamaIndex.
        pc: Pinecone client (for embeddings).
        index: Pinecone index to store community summary vectors.
        info: Enable verbose logging.

    Returns:
        KnowledgeGraphIndex | None

    Flow:
        1) Identify new/changed docs via source+content hashing (seen-file cache).
        2) Load existing KG from pickle or build a fresh one.
        3) If there is a delta, build a delta KG and merge nodes/edges.
        4) Summarize impacted communities and upsert summaries to Pinecone.
        5) Export `knowledge_graph.json` and update the seen-file signatures.
    """
|
|
|
|
    from llama_index.llms.langchain import LangChainLLM
|
|
|
|
    def file_sig(doc_like):
        """Return (sig_id, normalized_source) using source+content hashing, mirroring prepare_RAG."""
        meta = getattr(doc_like, "metadata", {}) or {}
        text = getattr(doc_like, "page_content", "") or getattr(doc_like, "text", "") or ""
        src = meta.get("source", "unknown")
        if src:
            src = path_after_context(src, anchor="context")
        src_hash = hashlib.sha256(src.encode("utf-8")).hexdigest()[:8]
        text_hash = hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]
        return f"kg_{src_hash}_{text_hash}", src
|
|
    def load_seen_sigs(path="kg_seen_files.json"):
        try:
            if os.path.exists(path):
                with open(path, "r", encoding="utf-8") as f:
                    data = json.load(f)
                    return set(data if isinstance(data, list) else [])
        except Exception as e:
            print(f"[Warn] Failed to load seen file sigs: {e}")
        return set()
|
|
    def save_seen_sigs(sigs, path="kg_seen_files.json"):
        try:
            with open(path, "w", encoding="utf-8") as f:
                json.dump(sorted(list(sigs)), f, indent=2)
        except Exception as e:
            print(f"[Warn] Failed to save seen file sigs: {e}")
|
|
    """
    CRITICAL: This section handles the complex merge of new documents into existing knowledge graphs.

    KEY CHALLENGES ADDRESSED:
    - Duplicate Detection: Uses content+source hashing to identify truly new/changed documents
    - Delta Processing: Builds partial KG from only new documents, then merges
    - Conflict Resolution: Handles nodes/edges that may already exist in the base graph
    - Change Propagation: Tracks exactly which nodes/edges are new for community summarization

    MERGE STRATEGY:
    1. Signature-based filtering identifies only new/changed documents
    2. Builds a "delta KG" from new documents only
    3. Performs set operations to find truly new nodes/edges:
       - new_nodes = delta_nodes - base_nodes
       - new_edges = delta_edges - base_edges
    4. Merges using NetworkX's native add_nodes_from/add_edges_from
    5. Preserves all node/edge attributes during merge

    WHY THIS MATTERS:
    - Without proper incremental updates, the system would rebuild the entire KG every time
    - Enables efficient updates when only a few documents change
    - Maintains community summaries for unchanged parts of the graph
    """
|
|
    seen_sigs = load_seen_sigs()
|
|
    # Filter to docs whose (source, content) signature has not been seen before.
    all_docs = documents or []
    new_docs = []
    new_sigs = []
    for d in all_docs:
        sig, _ = file_sig(d)
        if sig not in seen_sigs:
            new_docs.append(d)
            new_sigs.append(sig)
|
|
    if info:
        print(f"[KG] Total input docs: {len(all_docs)} | New/changed docs detected: {len(new_docs)}")
|
|
    llama_docs_all = [LlamaDocument(text=doc.page_content, metadata=doc.metadata) for doc in all_docs]
    llama_docs_delta = [LlamaDocument(text=doc.page_content, metadata=doc.metadata) for doc in new_docs]
    llm_for_kg = LangChainLLM(llm)
    persist_file = os.path.abspath("./kg_index.pkl")
|
|
    def _build_and_persist(docs):
        kg = KnowledgeGraphIndex.from_documents(
            documents=docs,
            max_triplets_per_chunk=20,
            extract_relations=True,
            include_embeddings=True,
            llm=llm_for_kg
        )
        with open(persist_file, "wb") as f:
            pickle.dump(kg, f)
        return kg
|
|
    def _load_existing():
        with open(persist_file, "rb") as f:
            return pickle.load(f)
|
|
    kg_index = None
    graph_exists = False
|
|
    try:
        if os.path.exists(persist_file):
            if info:
                print("[Info] Found persisted KG pickle file.")
            graph_exists = True
            kg_index = _load_existing()
            if info:
                print("[Info] Loaded Knowledge Graph from pickle.")
        elif llama_docs_all:
            if info:
                print("[Info] No persisted KG found. Building new KG from all documents...")
            kg_index = _build_and_persist(llama_docs_all)
            if info:
                print("[Info] Built and persisted Knowledge Graph via pickle.")
        else:
            if info:
                print("[Info] No persisted KG found and no documents to build from.")
    except Exception as e:
        print(f"[Error] GraphRAG init/load failed: {e}")
        kg_index = None
|
|
    inserted_any = False
    graph_override = None

    new_nodes = set()
    new_edges = set()
|
|
    if kg_index and graph_exists and llama_docs_delta:
        if info:
            print(f"[Info] Incrementally inserting {len(llama_docs_delta)} new/changed docs into KG...")
|
|
        try:
            # Build a KG from only the new documents, then merge it into the base graph.
            kg_delta = KnowledgeGraphIndex.from_documents(
                documents=llama_docs_delta,
                max_triplets_per_chunk=20,
                extract_relations=True,
                include_embeddings=False,
                llm=llm_for_kg
            )
            nxg_base = kg_index.get_networkx_graph()
            nxg_delta = kg_delta.get_networkx_graph()
|
|
            base_nodes_before = set(nxg_base.nodes())
            base_edges_before = set(nxg_base.edges())
            delta_nodes = set(nxg_delta.nodes())
            delta_edges = set(nxg_delta.edges())

            print(f"\n[Diagnostic] Base graph nodes before merge: {len(base_nodes_before)}")
            print(f"[Diagnostic] Base graph edges before merge: {len(base_edges_before)}")
            print(f"[Diagnostic] Delta graph nodes: {len(delta_nodes)}")
            print(f"[Diagnostic] Delta graph edges: {len(delta_edges)}")
|
|
            new_nodes = delta_nodes - base_nodes_before
            new_edges = delta_edges - base_edges_before
            already_existing_nodes = delta_nodes & base_nodes_before
            already_existing_edges = delta_edges & base_edges_before

            print(f"[Diagnostic] Delta nodes already in base: {len(already_existing_nodes)}")
            print(f"[Diagnostic] Delta edges already in base: {len(already_existing_edges)}")
            print(f"[Diagnostic] Truly new nodes to add: {len(new_nodes)}")
            print(f"[Diagnostic] Truly new edges to add: {len(new_edges)}")
|
|
            # NetworkX union-style merge; attributes from the delta win on conflict.
            nxg_base.add_nodes_from(nxg_delta.nodes(data=True))
            nxg_base.add_edges_from(nxg_delta.edges(data=True))
            graph_override = nxg_base
            inserted_any = True
|
|
            base_nodes_after = set(nxg_base.nodes())
            base_edges_after = set(nxg_base.edges())
            print(f"\n[Diagnostic] Base graph nodes after merge: {len(base_nodes_after)}")
            print(f"[Diagnostic] Base graph edges after merge: {len(base_edges_after)}")
            print(f"[Diagnostic] Nodes added: {len(base_nodes_after - base_nodes_before)}")
            print(f"[Diagnostic] Edges added: {len(base_edges_after - base_edges_before)}")
|
|
            num_nodes = nxg_delta.number_of_nodes()
            num_edges = nxg_delta.number_of_edges()

            print("\n[Delta Graph Summary]")
            print(f" - Total Nodes: {num_nodes}")
            print(f" - Total Edges: {num_edges}")
|
|
            print("\n[Delta Graph Nodes] (showing up to 10):")
            for i, (node, data) in enumerate(nxg_delta.nodes(data=True)):
                if i >= 10:
                    print(" ...")
                    break
                print(f" {i+1}. {node}: {data}")
|
|
            print("\n[Delta Graph Edges] (showing up to 10):")
            for i, (source, target, data) in enumerate(nxg_delta.edges(data=True)):
                if i >= 10:
                    print(" ...")
                    break
                print(f" {i+1}. {source} -> {target}: {data}")
|
|
            if len(new_nodes) == 0 and len(new_edges) == 0:
                print("[Warning] All delta nodes/edges already existed in the base graph. No actual change.")

            if info:
                print("[Info] Merged delta KG into existing graph (override used for summaries).")
        except Exception as e:
            print(f"[Error] Delta KG merge failed: {e}")
|
|
    # Note: the merged graph lives in `graph_override` (a NetworkX graph); `kg_index`
    # itself is only re-pickled when it was mutated directly without an override.
    if inserted_any and graph_override is None:
        try:
            with open(persist_file, "wb") as f:
                pickle.dump(kg_index, f)
        except Exception as e:
            print(f"[Warn] Failed to persist updated KG: {e}")
|
|
    if kg_index:
        # Regenerate community summaries on a fresh build or after an incremental merge.
        if not graph_exists or inserted_any:
            # Fresh build: pass None so the summarizer treats it as a first run and
            # summarizes every community; after a merge, pass only the impacted nodes.
            impacted = None if not graph_exists else (
                new_nodes.union(u for u, v in new_edges).union(v for u, v in new_edges)
            )
            _build_and_index_community_summaries(
                kg_index=kg_index,
                pc=pc,
                index=index,
                llm=llm,
                impacted_nodes=impacted,
                info=info
            )
|
|
        # Export a simple adjacency representation of the (possibly merged) graph.
        try:
            nxg = graph_override if graph_override is not None else kg_index.get_networkx_graph()
            graph_dict = {}
            for u, v, attrs in nxg.edges(data=True):
                rel = attrs.get("label") or attrs.get("relationship") or "related_to"
                if u not in graph_dict:
                    graph_dict[u] = []
                graph_dict[u].append([rel, v])
            output_file = "knowledge_graph.json"
            with open(output_file, "w", encoding="utf-8") as f:
                json.dump(graph_dict, f, indent=4, ensure_ascii=False)
            if info:
                print(f"[Info] Knowledge graph saved to {output_file}")
        except Exception as e:
            print(f"[Error] Failed to save knowledge graph: {e}")
|
|
    # Record signatures once the KG has been freshly built or actually merged, so the
    # same documents are not re-processed on the next run.
    if (not graph_exists and llama_docs_all) or inserted_any:
        seen_sigs.update(new_sigs)
        save_seen_sigs(seen_sigs)
|
|
    return kg_index
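

if __name__ == "__main__":
    # End-to-end sketch (illustrative wiring, not part of the module's API):
    # assumes PINECONE_API_KEY is set, an index named "my-rag-index" already exists,
    # and OPENAI_API_KEY is available for the default ChatOpenAI model. All three
    # names are placeholders chosen for this example.
    api_key = os.environ["PINECONE_API_KEY"]
    index, pc, llm, docs = prepare_RAG(api_key, "my-rag-index", dir_name="context")
    kg = build_knowledge_graph(docs, llm, pc, index, info=True)
    print(f"KG built: {kg is not None}")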
|
|
|
|
|
|