"""prepare.py Utilities to prepare documents and knowledge-graph artifacts for a RAG (Retrieval-Augmented Generation) pipeline. This module implements: - safe file loading for text-like files (UTF-8 tolerant) - dataset creation from a `context` directory using various loaders - chunking, embedding and upserting to Pinecone via `prepare_RAG` - building/updating a Knowledge Graph and generating hierarchical community summaries Main public functions: - create_dataset(directory_path: str) -> List[Document] - prepare_RAG(pinecone_API, index_name, ...) -> (index, pc, llm, documents) - build_knowledge_graph(documents, llm, pc, index, info=True) -> KnowledgeGraphIndex Note: many helper functions are nested; this docstring highlights the high-level purpose and responsibilities only. """ import os import pathlib import time import re from pinecone import Pinecone from langchain_mistralai import ChatMistralAI from langchain_openai import ChatOpenAI from langchain_core.messages import HumanMessage, SystemMessage from langchain.schema import Document from langchain_community.document_loaders import ( CSVLoader, PyPDFLoader, UnstructuredWordDocumentLoader, UnstructuredPowerPointLoader, UnstructuredMarkdownLoader, UnstructuredHTMLLoader, NotebookLoader ) from langchain_text_splitters import RecursiveCharacterTextSplitter from llama_index.core.memory import Memory import pickle import json from typing import List, Any from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, BaseMessage from typing import List, Any from pydantic import BaseModel, ValidationError memory = Memory(token_limit=2048) # ------------------------- # UTF-8 safe Text Loader # ------------------------- class SafeTextLoader: """Loads a text file as a single Document, safely handling UTF-8 decoding errors.""" def __init__(self, file_path): self.file_path = file_path def load(self): """Load the file and return a list containing a single LangChain `Document`. 
The loader is UTF-8 tolerant: it reads raw bytes and decodes using UTF-8 with 'ignore' on errors to avoid failing on files containing invalid sequences. Returns: List[Document]: a list with one Document (page_content and metadata['source']) or an empty list on error. """ try: with open(self.file_path, "rb") as f: raw_bytes = f.read() text = raw_bytes.decode("utf-8", errors="ignore") return [Document(page_content=text, metadata={"source": str(self.file_path)})] except Exception as e: print(f"[Error] Failed to read {self.file_path}: {e}") return [] # ------------------------- # Loader mapping # ------------------------- LOADER_MAPPING = { ".txt": SafeTextLoader, ".json": SafeTextLoader, ".md": UnstructuredMarkdownLoader, ".csv": CSVLoader, ".yaml": SafeTextLoader, ".yml": SafeTextLoader, ".pdf": PyPDFLoader, ".docx": UnstructuredWordDocumentLoader, ".pptx": UnstructuredPowerPointLoader, ".html": UnstructuredHTMLLoader, ".htm": UnstructuredHTMLLoader, ".ipynb": NotebookLoader, ".py": SafeTextLoader, ".js": SafeTextLoader, ".sql": SafeTextLoader, } CONTEXT_ROOT = pathlib.Path(__file__).parent / "context" def create_dataset(directory_path: str = "context"): """Recursively load files under `directory_path` using extension-specific loaders.""" target_dir = pathlib.Path(directory_path).resolve() if not target_dir.exists() or not target_dir.is_dir(): print(f"[Error] Target directory does not exist: {target_dir}") return [] documents = [] for file_path in target_dir.rglob("*"): if not file_path.is_file(): continue ext = file_path.suffix.lower() loader_cls = LOADER_MAPPING.get(ext) if loader_cls is None: print(f"[Skip] Unsupported file type: {file_path}") continue try: loader = loader_cls(str(file_path)) docs = loader.load() documents.extend(docs) print(f"[Loaded] {file_path} ({len(docs)} docs)") except Exception as e: print(f"[Error] Failed to load {file_path}: {e}") print(f"[Done] Finished scanning {target_dir}") print(f"Total documents loaded: {len(documents)}") return 
from llama_index.core import KnowledgeGraphIndex
from llama_index.core import Document as LlamaDocument
import hashlib


def fetch_existing_ids(index, namespace, ids, batch_size=100):
    """Fetch IDs from Pinecone in safe batches to avoid URI-too-large errors.

    Args:
        index: Pinecone index handle.
        namespace: Namespace to query within the index.
        ids: Candidate vector IDs to look up.
        batch_size: Maximum number of IDs per fetch request.

    Returns:
        set: The subset of `ids` already present in the namespace.
    """
    existing_ids = set()
    for start in range(0, len(ids), batch_size):
        batch_ids = ids[start:start + batch_size]
        result = index.fetch(ids=batch_ids, namespace=namespace)
        # Defensive: only read `.vectors` if this client version exposes it.
        if hasattr(result, "vectors"):
            existing_ids.update(result.vectors.keys())
    return existing_ids


# NOTE(review): the original file repeated two whole import paragraphs here
# (hashlib/time/Pinecone/splitters, already imported at the top of the module);
# the duplicates were removed.

# -------------------------
# Internal helper: build & upsert community summaries (incremental inside; same signature)
# -------------------------
def _build_and_index_community_summaries(
    kg_index,
    pc,
    index,
    llm,
    impacted_nodes=None,
    info=True,
):
    """
    This function implements a hierarchical community detection and summarization pipeline:

    1. COMMUNITY DETECTION:
       - Uses NetworkX's greedy_modularity_communities to find natural clusters in the KG
       - Filters communities by minimum size (COMMUNITY_MIN_SIZE) to avoid noise
    2. HIERARCHY CONSTRUCTION:
       - Builds a multi-level tree structure (max depth = MAX_HIERARCHY_DEPTH)
       - Recursively splits large communities using the same modularity algorithm
       - Creates parent-child relationships between community levels
    3. AFFECTED NODE TRACKING:
       - Marks communities as "_affected" if they contain new/updated nodes
       - Propagates affected status upward to parent communities
       - Enables incremental updates by only processing changed regions
    4. BOTTOM-UP SUMMARIZATION:
       - Leaf communities: Generate detailed reports from entity relationships
       - Parent communities: Synthesize child summaries into higher-level overviews
       - Uses sampling (LIMIT_NODES_PER_SUMMARY) to handle large communities
    5. VECTOR STORAGE:
       - Creates stable IDs using SHA-256 hashes of community composition
       - Embeds summaries using Pinecone's llama-text-embed-v2 model
       - Stores in dedicated "community-summaries" namespace

    Args:
        kg_index: KnowledgeGraphIndex exposing get_networkx_graph().
        pc: Pinecone client (used for pc.inference.embed).
        index: Pinecone index receiving the summary vectors.
        llm: LangChain-style LLM with .invoke(prompt) -> message with .content.
        impacted_nodes: Iterable of node names that changed; None means
            "first run — summarize everything".
        info: Enable verbose logging.
    """
    import hashlib
    from networkx.algorithms.community import greedy_modularity_communities

    COMMUNITY_NAMESPACE = "community-summaries"
    COMMUNITY_MIN_SIZE = 3
    MAX_HIERARCHY_DEPTH = 2
    LIMIT_NODES_PER_SUMMARY = 60
    LIMIT_TRIPLES_PER_SUMMARY = 120

    try:
        nxg = kg_index.get_networkx_graph()
    except Exception as e:
        print(f"[Error] Unable to extract NetworkX graph from KG: {e}")
        return

    if nxg.number_of_nodes() == 0 or nxg.number_of_edges() == 0:
        if info:
            print("[Community] KG empty or trivial; skipping community summarization.")
        return

    # impacted_nodes=None => first run (everything affected); otherwise only
    # communities intersecting the impacted set get (re)summarized.
    first_run = impacted_nodes is None
    impacted_nodes = set(impacted_nodes or [])

    if info:
        print(f"[Community] Starting summarization. First run: {first_run}")
        print(f"[Community] Impacted nodes: {len(impacted_nodes)}")

    # ---- community detection ----
    if info:
        print("[Community] Detecting top-level communities (greedy modularity)...")
    try:
        communities = list(greedy_modularity_communities(nxg))
    except Exception as e:
        print(f"[Error] Community detection failed: {e}")
        return

    large_communities = [c for c in communities if len(c) >= max(2, COMMUNITY_MIN_SIZE)]
    small_communities = [c for c in communities if len(c) < max(2, COMMUNITY_MIN_SIZE)]
    if info:
        print(f"[Community] Found {len(communities)} communities; "
              f"{len(large_communities)} large, {len(small_communities)} small.")

    # ---- build hierarchy and mark affected ----
    hierarchy = []
    for idx, comm in enumerate(large_communities):
        subgraph = nxg.subgraph(comm).copy()
        node_set = set(subgraph.nodes())
        node = {
            "id": f"C{idx}",
            "level": 0,
            "nodes": node_set,
            "children": [],
            "_affected": first_run or bool(impacted_nodes & node_set),
        }
        # simple frontier-based recursive splitting
        frontier = [(node, subgraph, 1)]
        while frontier:
            parent, g, depth = frontier.pop()
            # Stop splitting when too deep or the community is too small to split.
            if depth > MAX_HIERARCHY_DEPTH or g.number_of_nodes() < max(2, COMMUNITY_MIN_SIZE * 2):
                continue
            try:
                subs = list(greedy_modularity_communities(g))
            except Exception:
                subs = []
            # Reject degenerate splits (whole community or empty) and tiny pieces.
            subs = [s for s in subs if 1 <= len(s) <= len(g) - 1]
            subs = [s for s in subs if len(s) >= max(2, COMMUNITY_MIN_SIZE)]
            for j, s in enumerate(subs):
                sg = g.subgraph(s).copy()
                child = {
                    "id": f"{parent['id']}.{j}",
                    "level": depth,
                    "nodes": set(s),
                    "children": [],
                    "_affected": first_run or bool(impacted_nodes & set(s)),
                }
                parent["children"].append(child)
                if depth + 1 <= MAX_HIERARCHY_DEPTH and sg.number_of_nodes() >= max(2, COMMUNITY_MIN_SIZE * 2):
                    frontier.append((child, sg, depth + 1))
        hierarchy.append(node)

    # propagate affected upward: a parent is affected if any descendant is
    def mark_ancestors(n):
        any_child = False
        for c in n["children"]:
            if mark_ancestors(c):
                any_child = True
        if any_child:
            n["_affected"] = True
        return n["_affected"]

    for root in hierarchy:
        mark_ancestors(root)

    # ---- summarization helpers ----
    def triples_within(node_ids, graph):
        # Collect (subject, relation, object) triples fully inside the community.
        res = []
        for (u, v, data) in graph.edges(data=True):
            if u in node_ids and v in node_ids:
                rel = data.get("label") or data.get("relationship") or "related_to"
                res.append((u, rel, v))
        return res

    def sample_for_prompt(nodes_set, triples_list,
                          max_nodes=LIMIT_NODES_PER_SUMMARY,
                          max_triples=LIMIT_TRIPLES_PER_SUMMARY):
        # Truncate to keep the LLM prompt bounded for very large communities.
        nodes_list = list(nodes_set)[:max_nodes]
        triples_list = triples_list[:max_triples]
        return nodes_list, triples_list

    def summarize_leaf(nodes_set, graph):
        nodes_list, tri_list = sample_for_prompt(
            nodes_set,
            triples_within(nodes_set, graph)
        )
        prompt = (
            "You are creating a concise community report from a knowledge graph.\n"
            "Given the following entity list and intra-community relationships, produce:\n"
            " - Title\n"
            " - Key Themes (bullet points)\n"
            " - Notable Entities\n"
            " - Important Relationships (summarize patterns rather than listing all)\n"
            " - Outliers or Cross-links (if any)\n"
            " - 3-5 Answerable Questions this community can address\n"
            "Keep it under ~250-300 words.\n\n"
            f"Entities (sample): {nodes_list}\n"
            f"Relationships (sample triples): {[f'{u} --[{r}]--> {v}' for (u,r,v) in tri_list]}\n"
        )
        resp = llm.invoke(prompt)
        return resp.content.strip()

    def summarize_parent(child_summaries):
        join_text = "\n\n".join([f"[Child {i+1}]\n{txt}" for i, txt in enumerate(child_summaries)])
        prompt = (
            "You are creating a higher-level summary that unifies several community reports.\n"
            "Synthesize the following child community reports into a coherent parent-level summary:\n"
            " - Overarching Title\n"
            " - Cross-community Key Themes\n"
            " - How the sub-communities relate and differ\n"
            " - Cross-cutting entities/relationships\n"
            " - 3-5 high-level questions the parent community can answer\n"
            "Target length: 250-350 words.\n\n"
            f"{join_text}\n"
        )
        resp = llm.invoke(prompt)
        return resp.content.strip()

    # bottom-up, only affected subtrees
    def build_summaries(node, graph):
        if not node["_affected"]:
            return None
        if not node["children"]:
            node["summary"] = summarize_leaf(node["nodes"], graph)
            return node["summary"]
        child_summaries = []
        for ch in node["children"]:
            s = build_summaries(ch, graph)
            if s is not None:
                child_summaries.append(s)
        if child_summaries:
            node["summary"] = summarize_parent(child_summaries)
            return node["summary"]
        # No child produced a summary (e.g. none affected) — summarize directly.
        node["summary"] = summarize_leaf(node["nodes"], graph)
        return node["summary"]

    for node in hierarchy:
        build_summaries(node, nxg)

    # flatten affected nodes w/ new summaries
    flat_nodes = []

    def flatten(n):
        if n.get("_affected") and "summary" in n:
            flat_nodes.append({
                "id": n["id"],
                "level": n["level"],
                "size": len(n["nodes"]),
                "nodes": list(n["nodes"]),
                "summary": n["summary"]
            })
        for c in n["children"]:
            flatten(c)

    for n in hierarchy:
        flatten(n)

    if not flat_nodes:
        if info:
            print("[Community] No affected community summaries to upsert.")
        return

    if info:
        print(f"[Community] Upserting {len(flat_nodes)} community summaries to namespace: {COMMUNITY_NAMESPACE}")

    def summary_vec_id(node_rec):
        # Stable ID derived from community identity + a sorted node sample, so
        # the same composition re-upserts to the same vector.
        key = f"{node_rec['id']}|{node_rec['level']}|{','.join(sorted(node_rec['nodes'])[:20])}"
        return "comm_" + hashlib.sha256(key.encode("utf-8")).hexdigest()[:24]

    # batch embed + upsert
    B = 96  # Pinecone inference batch limit for this embed call
    texts = [rec["summary"] for rec in flat_nodes]
    ids = [summary_vec_id(rec) for rec in flat_nodes]
    metas = [{
        "type": "community_summary",
        "community_id": rec["id"],
        "level": rec["level"],
        "size": rec["size"],
        "node_sample": rec["nodes"][:20],
        "text": rec["summary"]
    } for rec in flat_nodes]

    for start in range(0, len(texts), B):
        batch_texts = texts[start:start+B]
        batch_ids = ids[start:start+B]
        batch_metas = metas[start:start+B]
        emb = pc.inference.embed(
            model="llama-text-embed-v2",
            inputs=batch_texts,
            parameters={"input_type": "passage", "truncate": "END"}
        )
        vectors = [
            {"id": vid, "values": e["values"], "metadata": meta}
            for vid, e, meta in zip(batch_ids, emb, batch_metas)
        ]
        index.upsert(vectors=vectors, namespace=COMMUNITY_NAMESPACE)

    if info:
        print("[Community] Community summaries upsert completed.")
print("[Community] Community summaries upsert completed.") def prepare_RAG( pinecone_API, index_name, chunk_size=400, chunk_overlap=30, llm_model="gpt-4.1-nano", dir_name="context", info=True ): """ Steps: 1) Select LLM wrapper (OpenAI vs. Mistral) by `llm_model` string. 2) Create dataset with `create_dataset(dir_name)`. 3) Connect to Pinecone and obtain `index`. 4) Split documents into chunks; normalize `metadata['source']` to be path-relative to the `context` anchor (stable across machines). 5) Compute stable vector IDs per chunk from source+content hashes. 6) Use `fetch_existing_ids` to identify and skip already-indexed chunks. 7) Embed only new chunks via `pc.inference.embed` (retry with backoff). 8) Upsert embeddings and metadata into a fixed namespace (`example-namespace`). """ import os, re, hashlib, time from pinecone import Pinecone from langchain_mistralai import ChatMistralAI from langchain_openai import ChatOpenAI from langchain_text_splitters import RecursiveCharacterTextSplitter if info: print(f"Preparing RAG with LLM: {llm_model}, Index: {index_name}, Dir: {dir_name}") llm = ChatOpenAI(model=llm_model, streaming=True) if "gpt" in llm_model else ChatMistralAI(model=llm_model, streaming=True) documents = create_dataset(dir_name) pc = Pinecone(api_key=pinecone_API) index = pc.Index(index_name) if not documents: print(f"[Warning] No documents found. 
Using existing Pinecone index.") return index, pc, llm, None splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap) all_splits = splitter.split_documents(documents) def path_after_context(full_path: str, anchor: str = "context") -> str: if not full_path: return "" parts = re.split(r"[\\/]+", str(full_path)) idx = None for i, part in enumerate(parts): if part.lower() == anchor.lower(): idx = i if idx is not None and idx < len(parts) - 1: return "/".join(parts[idx + 1 :]) return os.path.basename(str(full_path)) for chunk in all_splits: if "source" in chunk.metadata and chunk.metadata["source"]: chunk.metadata["source"] = path_after_context(chunk.metadata["source"], anchor="context") if info: print(f"Total chunks: {len(all_splits)}") def chunk_id(chunk, prefix="vec"): text_hash = hashlib.sha256(chunk.page_content.encode("utf-8")).hexdigest()[:16] source = chunk.metadata.get("source", "unknown") file_hash = hashlib.sha256(source.encode("utf-8")).hexdigest()[:8] return f"{prefix}_{file_hash}_{text_hash}" # ------------------------- # Vector ID & Namespace Architecture # ------------------------- """ VECTOR ID GENERATION STRATEGY: For Document Chunks: Pattern: "vec_{file_hash}_{content_hash}" - file_hash: SHA-256 of normalized source path (8 chars) - content_hash: SHA-256 of chunk content (16 chars) - Enables exact duplicate detection across runs - Stable across different machine paths due to source normalization For Community Summaries: Pattern: "comm_{community_hash}" - community_hash: SHA-256 of "community_id|level|sorted_node_sample" - Ensures stable IDs for the same community composition - Allows updates when community structure changes NAMESPACE STRATEGY: - "example-namespace": Stores document chunk embeddings - "community-summaries": Stores hierarchical community summaries - Separation enables independent update/query strategies - Prevents interference between document and summary vectors IDEMPOTENCY GUARANTEE: - 
fetch_existing_ids() checks Pinecone before embedding - Prevents duplicate embeddings for identical content - Enables safe re-runs without data duplication - Reduces embedding costs and storage usage """ namespace = "example-namespace" all_ids = [chunk_id(c) for c in all_splits] existing = fetch_existing_ids(index, namespace, all_ids, batch_size=100) new_chunks = [(c, i) for c, i in zip(all_splits, all_ids) if i not in existing] if info: print(f"Chunks already indexed: {len(all_splits) - len(new_chunks)}") print(f"New chunks to embed: {len(new_chunks)}") if not new_chunks: print("[Info] Nothing new to index. Skipping embedding/upsert.") else: batch_size = 94 def retry_forever(func, *args, **kwargs): attempt = 1 while True: try: return func(*args, **kwargs) except Exception as e: wait = min(60, 2 ** min(attempt, 6)) print(f"[Retry] {func.__name__} failed (attempt {attempt}): {e}. Sleeping {wait}s") time.sleep(wait) attempt += 1 for start_idx in range(0, len(new_chunks), batch_size): print(f"[Info] Embedding and upserting batch {start_idx // batch_size + 1}...") batch, ids = zip(*new_chunks[start_idx:start_idx + batch_size]) texts = [chunk.page_content for chunk in batch] metas = [chunk.metadata or {} for chunk in batch] embeddings = retry_forever( pc.inference.embed, model="llama-text-embed-v2", inputs=texts, parameters={"input_type": "passage", "truncate": "END"} ) batch_records = [ {"id": i, "values": e['values'], "metadata": {"text": t, **m}} for i, e, t, m in zip(ids, embeddings, texts, metas) ] retry_forever(index.upsert, vectors=batch_records, namespace=namespace) if info: print(f"Completed upsert of {len(new_chunks)} new vectors.") return index, pc, llm, documents # Return documents for KG construction def build_knowledge_graph(documents, llm, pc, index, info=True): """ Build/update the Knowledge Graph (KG) from documents, persist it, merge deltas, and (re)generate community summaries for changed regions. 
def build_knowledge_graph(documents, llm, pc, index, info=True):
    """
    Build/update the Knowledge Graph (KG) from documents, persist it, merge deltas,
    and (re)generate community summaries for changed regions.

    Args:
        documents: List of LangChain Documents from prepare_RAG (may be empty).
        llm: LangChain-compatible LLM used via LlamaIndex.
        pc: Pinecone client (for embeddings).
        index: Pinecone index to store community summary vectors.
        info: Enable verbose logging.

    Returns:
        KnowledgeGraphIndex | None

    Flow:
        1) Identify new/changed docs via source+content hashing (seen file cache).
        2) Load existing KG from pickle or build a fresh one.
        3) If there is a delta, build a delta KG and merge nodes/edges.
        4) Summarize impacted communities and upsert summaries to Pinecone.
        5) Export `knowledge_graph.json` and update the seen-file signatures.
    """
    import os, pickle, json, hashlib, re
    from llama_index.core import Document, KnowledgeGraphIndex
    from llama_index.llms.langchain import LangChainLLM

    # ---- duplicate detection "like in prepare_RAG" (signature unchanged) ----
    def path_after_context(full_path: str, anchor: str = "context") -> str:
        # Mirror of the normalizer in prepare_RAG: keep components after the
        # LAST "context" directory so signatures are machine-independent.
        if not full_path:
            return ""
        parts = re.split(r"[\\/]+", str(full_path))
        idx = None
        for i, part in enumerate(parts):
            if part.lower() == anchor.lower():
                idx = i
        if idx is not None and idx < len(parts) - 1:
            return "/".join(parts[idx + 1:])
        return os.path.basename(str(full_path))

    def file_sig(doc_like):
        """Return (sig_id, normalized_source) using source+content hashing similar to prepare_RAG."""
        meta = getattr(doc_like, "metadata", {}) or {}
        text = getattr(doc_like, "page_content", "") or getattr(doc_like, "text", "") or ""
        src = meta.get("source", "unknown")
        if src:
            src = path_after_context(src, anchor="context")
        src_hash = hashlib.sha256(src.encode("utf-8")).hexdigest()[:8]
        text_hash = hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]
        return f"kg_{src_hash}_{text_hash}", src

    def load_seen_sigs(path="kg_seen_files.json"):
        # Best-effort cache read; any failure degrades to "nothing seen".
        try:
            if os.path.exists(path):
                with open(path, "r", encoding="utf-8") as f:
                    data = json.load(f)
                    return set(data if isinstance(data, list) else [])
        except Exception as e:
            print(f"[Warn] Failed to load seen file sigs: {e}")
        return set()

    def save_seen_sigs(sigs, path="kg_seen_files.json"):
        try:
            with open(path, "w", encoding="utf-8") as f:
                json.dump(sorted(list(sigs)), f, indent=2)
        except Exception as e:
            print(f"[Warn] Failed to save seen file sigs: {e}")

    # -------------------------
    # Incremental KG Update Strategy
    # -------------------------
    # Signature-based filtering identifies only new/changed documents; a
    # "delta KG" is built from those and merged into the base graph:
    #   new_nodes = delta_nodes - base_nodes
    #   new_edges = delta_edges - base_edges
    # Merge uses NetworkX add_nodes_from/add_edges_from (attributes preserved).
    # This avoids rebuilding the entire KG on every run and lets community
    # summarization touch only the changed regions.
    seen_sigs = load_seen_sigs()

    # Identify new/changed docs by signature
    all_docs = documents or []
    new_docs = []
    new_sigs = []
    for d in all_docs:
        sig, _ = file_sig(d)
        if sig not in seen_sigs:
            new_docs.append(d)
            new_sigs.append(sig)

    if info:
        print(f"[KG] Total input docs: {len(all_docs)} | New/changed docs detected: {len(new_docs)}")

    # ---- prepare LlamaIndex objects ----
    llama_docs_all = [Document(text=doc.page_content, metadata=doc.metadata) for doc in all_docs]
    llama_docs_delta = [Document(text=doc.page_content, metadata=doc.metadata) for doc in new_docs]

    llm_for_kg = LangChainLLM(llm)
    persist_file = os.path.abspath("./kg_index.pkl")

    def _build_and_persist(docs):
        # Build a fresh KG and pickle it atomically enough for this pipeline.
        kg = KnowledgeGraphIndex.from_documents(
            documents=docs,
            max_triplets_per_chunk=20,
            extract_relations=True,
            include_embeddings=True,
            llm=llm_for_kg
        )
        with open(persist_file, "wb") as f:
            pickle.dump(kg, f)
        return kg

    def _load_existing():
        with open(persist_file, "rb") as f:
            return pickle.load(f)

    kg_index = None
    graph_exists = False
    try:
        if os.path.exists(persist_file):
            if info:
                print("[Info] Found persisted KG pickle file.")
            graph_exists = True
            kg_index = _load_existing()
            if info:
                print("[Info] Loaded Knowledge Graph from pickle.")
        elif llama_docs_all:
            if info:
                print("[Info] No persisted KG found. Building new KG from all documents...")
            kg_index = _build_and_persist(llama_docs_all)
            if info:
                print("[Info] Built and persisted Knowledge Graph via pickle.")
        else:
            if info:
                print("[Info] No persisted KG found and no documents to build from.")
    except Exception as e:
        print(f"[Error] GraphRAG init/load failed: {e}")
        kg_index = None

    # ---- incremental insertion (signature unchanged) ----
    inserted_any = False
    graph_override = None  # if we need merge fallback for community detection
    new_nodes = set()
    new_edges = set()

    if kg_index and graph_exists and llama_docs_delta:
        if info:
            print(f"[Info] Incrementally inserting {len(llama_docs_delta)} new/changed docs into KG...")
        try:
            # Build delta KG from new/changed docs
            kg_delta = KnowledgeGraphIndex.from_documents(
                documents=llama_docs_delta,
                max_triplets_per_chunk=20,
                extract_relations=True,
                include_embeddings=False,
                llm=llm_for_kg
            )
            nxg_base = kg_index.get_networkx_graph()
            nxg_delta = kg_delta.get_networkx_graph()

            # Diagnostic: node/edge sets before merge
            base_nodes_before = set(nxg_base.nodes())
            base_edges_before = set(nxg_base.edges())
            delta_nodes = set(nxg_delta.nodes())
            delta_edges = set(nxg_delta.edges())
            print(f"\n[Diagnostic] Base graph nodes before merge: {len(base_nodes_before)}")
            print(f"[Diagnostic] Base graph edges before merge: {len(base_edges_before)}")
            print(f"[Diagnostic] Delta graph nodes: {len(delta_nodes)}")
            print(f"[Diagnostic] Delta graph edges: {len(delta_edges)}")

            # Truly-new sets drive incremental community summarization.
            new_nodes = delta_nodes - base_nodes_before
            new_edges = delta_edges - base_edges_before
            already_existing_nodes = delta_nodes & base_nodes_before
            already_existing_edges = delta_edges & base_edges_before
            print(f"[Diagnostic] Delta nodes already in base: {len(already_existing_nodes)}")
            print(f"[Diagnostic] Delta edges already in base: {len(already_existing_edges)}")
            print(f"[Diagnostic] Truly new nodes to add: {len(new_nodes)}")
            print(f"[Diagnostic] Truly new edges to add: {len(new_edges)}")

            # Merge delta into base (attributes preserved)
            nxg_base.add_nodes_from(nxg_delta.nodes(data=True))
            nxg_base.add_edges_from(nxg_delta.edges(data=True))
            graph_override = nxg_base
            inserted_any = True

            # Diagnostic: node/edge sets after merge
            base_nodes_after = set(nxg_base.nodes())
            base_edges_after = set(nxg_base.edges())
            print(f"\n[Diagnostic] Base graph nodes after merge: {len(base_nodes_after)}")
            print(f"[Diagnostic] Base graph edges after merge: {len(base_edges_after)}")
            print(f"[Diagnostic] Nodes added: {len(base_nodes_after - base_nodes_before)}")
            print(f"[Diagnostic] Edges added: {len(base_edges_after - base_edges_before)}")

            # Delta graph summary
            num_nodes = nxg_delta.number_of_nodes()
            num_edges = nxg_delta.number_of_edges()
            print("\n[Delta Graph Summary]")
            print(f" - Total Nodes: {num_nodes}")
            print(f" - Total Edges: {num_edges}")

            print("\n[Delta Graph Nodes] (showing up to 10):")
            for i, (node, data) in enumerate(nxg_delta.nodes(data=True)):
                if i >= 10:
                    print(" ...")
                    break
                print(f" {i+1}. {node}: {data}")

            print("\n[Delta Graph Edges] (showing up to 10):")
            for i, (source, target, data) in enumerate(nxg_delta.edges(data=True)):
                if i >= 10:
                    print(" ...")
                    break
                print(f" {i+1}. {source} -> {target}: {data}")

            if len(new_nodes) == 0 and len(new_edges) == 0:
                print("[Warning] All delta nodes/edges already existed in the base graph. No actual change.")
            if info:
                print("[Info] Merged delta KG into existing graph (override used for summaries).")
        except Exception as e:
            print(f"[Error] Fallback merge failed: {e}")

    # Persist KG if mutated via API.
    # NOTE(review): graph_override is always set whenever inserted_any is True
    # in the merge path above, so this branch currently never fires; the merge
    # only mutates the NetworkX view, not kg_index itself — confirm intent.
    if inserted_any and graph_override is None:
        try:
            with open(persist_file, "wb") as f:
                pickle.dump(kg_index, f)
        except Exception as e:
            print(f"[Warn] Failed to persist updated KG: {e}")

    # ---- Community summaries (incremental occurs inside the helper) ----
    if kg_index:
        # Only trigger summaries when: first build or we actually inserted/merged something
        if not graph_exists or inserted_any:
            # BUGFIX: on a first build there is no delta, so pass None
            # ("summarize everything"). Previously an empty set was passed,
            # which the helper treated as "nothing affected" and produced no
            # summaries at all for a fresh KG.
            if graph_exists:
                impacted = new_nodes.union(u for u, v in new_edges).union(v for u, v in new_edges)
            else:
                impacted = None
            _build_and_index_community_summaries(
                kg_index=kg_index,
                pc=pc,
                index=index,
                llm=llm,
                impacted_nodes=impacted,
                info=info  # was hard-coded True; now honors the caller's flag
            )

    # Optional: save graph for visualization (post-update)
    try:
        nxg = graph_override if graph_override is not None else kg_index.get_networkx_graph()
        graph_dict = {}
        for u, v, attrs in nxg.edges(data=True):
            rel = attrs.get("label") or attrs.get("relationship") or "related_to"
            if u not in graph_dict:
                graph_dict[u] = []
            graph_dict[u].append([rel, v])
        output_file = "knowledge_graph.json"
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(graph_dict, f, indent=4, ensure_ascii=False)
        if info:
            print(f"[Info] Knowledge graph saved to {output_file}")
    except Exception as e:
        print(f"[Error] Failed to save knowledge graph: {e}")

    # ---- mark seen signatures only after successful insertion or first build ----
    if (not graph_exists and llama_docs_all) or inserted_any:
        # Add only the new ones we processed this run
        seen_sigs.update(new_sigs)
        save_seen_sigs(seen_sigs)

    return kg_index