# Author: Nadezhda Komarova — first commit (4be6b01)
"""prepare.py
Utilities to prepare documents and knowledge-graph artifacts for a RAG (Retrieval-Augmented
Generation) pipeline.
This module implements:
- safe file loading for text-like files (UTF-8 tolerant)
- dataset creation from a `context` directory using various loaders
- chunking, embedding and upserting to Pinecone via `prepare_RAG`
- building/updating a Knowledge Graph and generating hierarchical community summaries
Main public functions:
- create_dataset(directory_path: str) -> List[Document]
- prepare_RAG(pinecone_API, index_name, ...) -> (index, pc, llm, documents)
- build_knowledge_graph(documents, llm, pc, index, info=True) -> KnowledgeGraphIndex
Note: many helper functions are nested; this docstring highlights the high-level
purpose and responsibilities only.
"""
import os
import pathlib
import time
import re
from pinecone import Pinecone
from langchain_mistralai import ChatMistralAI
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from langchain.schema import Document
from langchain_community.document_loaders import (
CSVLoader, PyPDFLoader, UnstructuredWordDocumentLoader,
UnstructuredPowerPointLoader, UnstructuredMarkdownLoader,
UnstructuredHTMLLoader, NotebookLoader
)
from langchain_text_splitters import RecursiveCharacterTextSplitter
from llama_index.core.memory import Memory
import pickle
import json
from typing import List, Any
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, BaseMessage
from typing import List, Any
from pydantic import BaseModel, ValidationError
# Token-capped conversation memory buffer.
# NOTE(review): `memory` is not referenced anywhere else in this module —
# confirm it is consumed by importers before removing.
memory = Memory(token_limit=2048)
# -------------------------
# UTF-8 safe Text Loader
# -------------------------
class SafeTextLoader:
    """Load a text-like file into a single LangChain Document, tolerating bad UTF-8.

    Unlike stricter loaders, decoding errors never raise: the file is read as
    raw bytes and decoded with ``errors="ignore"``; any failure while reading
    is logged and yields an empty list instead of propagating.
    """

    def __init__(self, file_path):
        self.file_path = file_path

    def load(self):
        """Return ``[Document]`` with the decoded text, or ``[]`` on failure.

        The single Document carries the file's text as ``page_content`` and
        the original path under ``metadata["source"]``.
        """
        try:
            with open(self.file_path, "rb") as handle:
                content = handle.read().decode("utf-8", errors="ignore")
            metadata = {"source": str(self.file_path)}
            return [Document(page_content=content, metadata=metadata)]
        except Exception as exc:
            print(f"[Error] Failed to read {self.file_path}: {exc}")
            return []
# -------------------------
# Loader mapping
# -------------------------
# Extension -> loader-class dispatch table used by create_dataset().
# Text-like formats (code, config, JSON) go through the UTF-8-tolerant
# SafeTextLoader; rich formats use dedicated langchain_community loaders.
# Files whose extension is absent here are skipped during the scan.
LOADER_MAPPING = {
    ".txt": SafeTextLoader,
    ".json": SafeTextLoader,
    ".md": UnstructuredMarkdownLoader,
    ".csv": CSVLoader,
    ".yaml": SafeTextLoader,
    ".yml": SafeTextLoader,
    ".pdf": PyPDFLoader,
    ".docx": UnstructuredWordDocumentLoader,
    ".pptx": UnstructuredPowerPointLoader,
    ".html": UnstructuredHTMLLoader,
    ".htm": UnstructuredHTMLLoader,
    ".ipynb": NotebookLoader,
    ".py": SafeTextLoader,
    ".js": SafeTextLoader,
    ".sql": SafeTextLoader,
}
# Default knowledge-base location, resolved relative to this file.
# NOTE(review): CONTEXT_ROOT is not referenced in this module (create_dataset
# resolves its own path) — confirm whether importers rely on it.
CONTEXT_ROOT = pathlib.Path(__file__).parent / "context"
def create_dataset(directory_path: str = "context"):
    """Recursively load every supported file under `directory_path`.

    Each file is dispatched to its extension-specific loader via
    LOADER_MAPPING; unsupported extensions are skipped and per-file load
    failures are logged without aborting the scan.

    Returns:
        List[Document]: all documents produced by the individual loaders,
        or an empty list when the directory is missing or not a directory.
    """
    target_dir = pathlib.Path(directory_path).resolve()
    if not (target_dir.exists() and target_dir.is_dir()):
        print(f"[Error] Target directory does not exist: {target_dir}")
        return []
    documents = []
    for candidate in target_dir.rglob("*"):
        if not candidate.is_file():
            continue
        loader_cls = LOADER_MAPPING.get(candidate.suffix.lower())
        if loader_cls is None:
            print(f"[Skip] Unsupported file type: {candidate}")
            continue
        try:
            loaded = loader_cls(str(candidate)).load()
            documents.extend(loaded)
            print(f"[Loaded] {candidate} ({len(loaded)} docs)")
        except Exception as exc:
            print(f"[Error] Failed to load {candidate}: {exc}")
    print(f"[Done] Finished scanning {target_dir}")
    print(f"Total documents loaded: {len(documents)}")
    return documents
from llama_index.core import KnowledgeGraphIndex
from llama_index.core import Document as LlamaDocument
import hashlib
def fetch_existing_ids(index, namespace, ids, batch_size=100):
    """Fetch IDs from Pinecone in safe batches to avoid URI too large errors.

    Args:
        index: Pinecone index client exposing ``fetch(ids=..., namespace=...)``.
        namespace: Namespace to query within the index.
        ids: Full list of candidate vector IDs to probe.
        batch_size: Maximum IDs per fetch call (keeps request URIs small).

    Returns:
        set: the subset of `ids` that already exist in the namespace.
    """
    found = set()
    batches = (ids[pos:pos + batch_size] for pos in range(0, len(ids), batch_size))
    for batch in batches:
        response = index.fetch(ids=batch, namespace=namespace)
        # Only responses exposing a `.vectors` mapping contribute hits.
        if hasattr(response, "vectors"):
            found.update(response.vectors.keys())
    return found
# -------------------------
# Prepare RAG
# -------------------------
import hashlib
import time
from langchain_text_splitters import RecursiveCharacterTextSplitter
from llama_index.core import Document as LlamaDocument
from pinecone import Pinecone
import os
import re
import time
import hashlib
from langchain_openai import ChatOpenAI
from langchain_mistralai import ChatMistralAI
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pinecone import Pinecone
from llama_index.core import Document as LlamaDocument
# You are assumed to already have:
# - create_dataset(dir_name)
# - fetch_existing_ids(index, namespace, all_ids, batch_size)
# -------------------------
# Internal helper: build & upsert community summaries (incremental inside; same signature)
# -------------------------
def _build_and_index_community_summaries(
    kg_index,
    pc,
    index,
    llm,
    impacted_nodes=None,
    info=True,
):
    """Detect KG communities, summarize them hierarchically, and upsert to Pinecone.

    This function implements a hierarchical community detection and summarization pipeline:
    1. COMMUNITY DETECTION:
       - Uses NetworkX's greedy_modularity_communities to find natural clusters in the KG
       - Filters communities by minimum size (COMMUNITY_MIN_SIZE) to avoid noise
    2. HIERARCHY CONSTRUCTION:
       - Builds a multi-level tree structure (max depth = MAX_HIERARCHY_DEPTH)
       - Recursively splits large communities using the same modularity algorithm
       - Creates parent-child relationships between community levels
    3. AFFECTED NODE TRACKING:
       - Marks communities as "_affected" if they contain new/updated nodes
       - Propagates affected status upward to parent communities
       - Enables incremental updates by only processing changed regions
    4. BOTTOM-UP SUMMARIZATION:
       - Leaf communities: Generate detailed reports from entity relationships
       - Parent communities: Synthesize child summaries into higher-level overviews
       - Uses sampling (LIMIT_NODES_PER_SUMMARY) to handle large communities
    5. VECTOR STORAGE:
       - Creates stable IDs using SHA-256 hashes of community composition
       - Embeds summaries using Pinecone's llama-text-embed-v2 model
       - Stores in dedicated "community-summaries" namespace

    Args:
        kg_index: LlamaIndex KnowledgeGraphIndex exposing get_networkx_graph().
        pc: Pinecone client (used for pc.inference.embed).
        index: Pinecone index receiving the summary vectors.
        llm: LangChain-style LLM with an .invoke(prompt) -> message interface.
        impacted_nodes: Iterable of node names changed this run; None means
            "first run" and every community is treated as affected.
        info: Enable verbose progress logging.

    Returns:
        None. Side effects only (LLM calls + Pinecone upserts).
    """
    import hashlib
    import networkx as nx
    from networkx.algorithms.community import greedy_modularity_communities
    # Tunables: namespace for summary vectors, minimum community size worth
    # summarizing, hierarchy depth, and prompt-size sampling caps.
    COMMUNITY_NAMESPACE = "community-summaries"
    COMMUNITY_MIN_SIZE = 3
    MAX_HIERARCHY_DEPTH = 2
    LIMIT_NODES_PER_SUMMARY = 60
    LIMIT_TRIPLES_PER_SUMMARY = 120
    try:
        nxg = kg_index.get_networkx_graph()
    except Exception as e:
        print(f"[Error] Unable to extract NetworkX graph from KG: {e}")
        return
    # Nothing to summarize on an empty/edge-less graph.
    if nxg.number_of_nodes() == 0 or nxg.number_of_edges() == 0:
        if info:
            print("[Community] KG empty or trivial; skipping community summarization.")
        return
    # impacted_nodes=None signals a first/full run: every community is affected.
    first_run = impacted_nodes is None
    impacted_nodes = set(impacted_nodes or [])
    if info:
        print(f"[Community] Starting summarization. First run: {first_run}")
        print(f"[Community] Impacted nodes: {len(impacted_nodes)}")
    # ---- community detection ----
    if info:
        print("[Community] Detecting top-level communities (greedy modularity)...")
    try:
        communities = list(greedy_modularity_communities(nxg))
    except Exception as e:
        print(f"[Error] Community detection failed: {e}")
        return
    # Communities below the size floor are dropped (noise filtering).
    large_communities = [c for c in communities if len(c) >= max(2, COMMUNITY_MIN_SIZE)]
    small_communities = [c for c in communities if len(c) < max(2, COMMUNITY_MIN_SIZE)]
    if info:
        print(f"[Community] Found {len(communities)} communities; "
              f"{len(large_communities)} large, {len(small_communities)} small.")
    # ---- build hierarchy and mark affected ----
    # Each entry is a dict-tree: id, level, node set, children, and an
    # "_affected" flag (True if this community intersects the impacted nodes).
    hierarchy = []
    for idx, comm in enumerate(large_communities):
        subgraph = nxg.subgraph(comm).copy()
        node_set = set(subgraph.nodes())
        node = {
            "id": f"C{idx}",
            "level": 0,
            "nodes": node_set,
            "children": [],
            "_affected": first_run or bool(impacted_nodes & node_set),
        }
        # simple frontier-based recursive splitting (iterative DFS; each
        # community is re-clustered until MAX_HIERARCHY_DEPTH or too small)
        frontier = [(node, subgraph, 1)]
        while frontier:
            parent, g, depth = frontier.pop()
            if depth > MAX_HIERARCHY_DEPTH or g.number_of_nodes() < max(2, COMMUNITY_MIN_SIZE * 2):
                continue
            try:
                subs = list(greedy_modularity_communities(g))
            except Exception:
                subs = []
            # Reject the degenerate split (whole graph) and undersized parts.
            subs = [s for s in subs if 1 <= len(s) <= len(g) - 1]
            subs = [s for s in subs if len(s) >= max(2, COMMUNITY_MIN_SIZE)]
            for j, s in enumerate(subs):
                sg = g.subgraph(s).copy()
                child = {
                    "id": f"{parent['id']}.{j}",
                    "level": depth,
                    "nodes": set(s),
                    "children": [],
                    "_affected": first_run or bool(impacted_nodes & set(s)),
                }
                parent["children"].append(child)
                if depth + 1 <= MAX_HIERARCHY_DEPTH and sg.number_of_nodes() >= max(2, COMMUNITY_MIN_SIZE * 2):
                    frontier.append((child, sg, depth + 1))
        hierarchy.append(node)
    # propagate affected upward: a parent is affected if any descendant is.
    def mark_ancestors(n):
        any_child = False
        for c in n["children"]:
            if mark_ancestors(c):
                any_child = True
        if any_child:
            n["_affected"] = True
        return n["_affected"]
    for root in hierarchy:
        mark_ancestors(root)
    # ---- summarization helpers ----
    # Collect intra-community triples; edge label falls back to "related_to".
    def triples_within(node_ids, graph):
        res = []
        for (u, v, data) in graph.edges(data=True):
            if u in node_ids and v in node_ids:
                rel = data.get("label") or data.get("relationship") or "related_to"
                res.append((u, rel, v))
        return res
    # Cap node/triple counts so prompts stay within a bounded size.
    def sample_for_prompt(nodes_set, triples_list, max_nodes=LIMIT_NODES_PER_SUMMARY, max_triples=LIMIT_TRIPLES_PER_SUMMARY):
        nodes_list = list(nodes_set)[:max_nodes]
        triples_list = triples_list[:max_triples]
        return nodes_list, triples_list
    # Leaf summary: one LLM call over the community's sampled entities/triples.
    def summarize_leaf(nodes_set, graph):
        nodes_list, tri_list = sample_for_prompt(
            nodes_set,
            triples_within(nodes_set, graph)
        )
        prompt = (
            "You are creating a concise community report from a knowledge graph.\n"
            "Given the following entity list and intra-community relationships, produce:\n"
            " - Title\n"
            " - Key Themes (bullet points)\n"
            " - Notable Entities\n"
            " - Important Relationships (summarize patterns rather than listing all)\n"
            " - Outliers or Cross-links (if any)\n"
            " - 3-5 Answerable Questions this community can address\n"
            "Keep it under ~250-300 words.\n\n"
            f"Entities (sample): {nodes_list}\n"
            f"Relationships (sample triples): {[f'{u} --[{r}]--> {v}' for (u,r,v) in tri_list]}\n"
        )
        resp = llm.invoke(prompt)
        return resp.content.strip()
    # Parent summary: one LLM call synthesizing the child reports.
    def summarize_parent(child_summaries):
        join_text = "\n\n".join([f"[Child {i+1}]\n{txt}" for i, txt in enumerate(child_summaries)])
        prompt = (
            "You are creating a higher-level summary that unifies several community reports.\n"
            "Synthesize the following child community reports into a coherent parent-level summary:\n"
            " - Overarching Title\n"
            " - Cross-community Key Themes\n"
            " - How the sub-communities relate and differ\n"
            " - Cross-cutting entities/relationships\n"
            " - 3-5 high-level questions the parent community can answer\n"
            "Target length: 250-350 words.\n\n"
            f"{join_text}\n"
        )
        resp = llm.invoke(prompt)
        return resp.content.strip()
    # bottom-up, only affected subtrees: unaffected nodes return None and get
    # no "summary" key, so they are skipped during the upsert flattening below.
    def build_summaries(node, graph):
        if not node["_affected"]:
            return None
        if not node["children"]:
            node["summary"] = summarize_leaf(node["nodes"], graph)
            return node["summary"]
        child_summaries = []
        for ch in node["children"]:
            s = build_summaries(ch, graph)
            if s is not None:
                child_summaries.append(s)
        if child_summaries:
            node["summary"] = summarize_parent(child_summaries)
            return node["summary"]
        # Affected parent whose children were all unaffected: summarize it
        # directly as if it were a leaf.
        node["summary"] = summarize_leaf(node["nodes"], graph)
        return node["summary"]
    for node in hierarchy:
        build_summaries(node, nxg)
    # flatten affected nodes w/ new summaries
    flat_nodes = []
    def flatten(n):
        if n.get("_affected") and "summary" in n:
            flat_nodes.append({
                "id": n["id"],
                "level": n["level"],
                "size": len(n["nodes"]),
                "nodes": list(n["nodes"]),
                "summary": n["summary"]
            })
        for c in n["children"]:
            flatten(c)
    for n in hierarchy:
        flatten(n)
    if not flat_nodes:
        if info:
            print("[Community] No affected community summaries to upsert.")
        return
    if info:
        print(f"[Community] Upserting {len(flat_nodes)} community summaries to namespace: {COMMUNITY_NAMESPACE}")
    # Stable vector ID: hash of community id, level, and a sorted node sample,
    # so re-running over an unchanged community overwrites rather than duplicates.
    def summary_vec_id(node_rec):
        key = f"{node_rec['id']}|{node_rec['level']}|{','.join(sorted(node_rec['nodes'])[:20])}"
        return "comm_" + hashlib.sha256(key.encode("utf-8")).hexdigest()[:24]
    # batch embed + upsert
    B = 96
    texts = [rec["summary"] for rec in flat_nodes]
    ids = [summary_vec_id(rec) for rec in flat_nodes]
    metas = [{
        "type": "community_summary",
        "community_id": rec["id"],
        "level": rec["level"],
        "size": rec["size"],
        "node_sample": rec["nodes"][:20],
        "text": rec["summary"]
    } for rec in flat_nodes]
    for start in range(0, len(texts), B):
        batch_texts = texts[start:start+B]
        batch_ids = ids[start:start+B]
        batch_metas = metas[start:start+B]
        emb = pc.inference.embed(
            model="llama-text-embed-v2",
            inputs=batch_texts,
            parameters={"input_type": "passage", "truncate": "END"}
        )
        # assumes iterating `emb` yields per-text embeddings that support
        # e["values"] item access — TODO confirm against the Pinecone SDK version
        vectors = [
            {"id": vid, "values": e["values"], "metadata": meta}
            for vid, e, meta in zip(batch_ids, emb, batch_metas)
        ]
        index.upsert(vectors=vectors, namespace=COMMUNITY_NAMESPACE)
    if info:
        print("[Community] Community summaries upsert completed.")
def prepare_RAG(
    pinecone_API,
    index_name,
    chunk_size=400,
    chunk_overlap=30,
    llm_model="gpt-4.1-nano",
    dir_name="context",
    info=True
):
    """Load documents, chunk + embed them, and upsert new vectors to Pinecone.

    Args:
        pinecone_API: Pinecone API key.
        index_name: Name of the Pinecone index to connect to.
        chunk_size: Character length per chunk for the recursive splitter.
        chunk_overlap: Character overlap between adjacent chunks.
        llm_model: Model name; any name containing "gpt" selects ChatOpenAI,
            otherwise ChatMistralAI.
        dir_name: Directory passed to create_dataset().
        info: Enable verbose progress logging.

    Returns:
        (index, pc, llm, documents); documents is None when nothing was loaded.

    Steps:
    1) Select LLM wrapper (OpenAI vs. Mistral) by `llm_model` string.
    2) Create dataset with `create_dataset(dir_name)`.
    3) Connect to Pinecone and obtain `index`.
    4) Split documents into chunks; normalize `metadata['source']` to be path-relative
       to the `context` anchor (stable across machines).
    5) Compute stable vector IDs per chunk from source+content hashes.
    6) Use `fetch_existing_ids` to identify and skip already-indexed chunks.
    7) Embed only new chunks via `pc.inference.embed` (retry with backoff).
    8) Upsert embeddings and metadata into a fixed namespace (`example-namespace`).
    """
    import os, re, hashlib, time
    from pinecone import Pinecone
    from langchain_mistralai import ChatMistralAI
    from langchain_openai import ChatOpenAI
    from langchain_text_splitters import RecursiveCharacterTextSplitter
    if info:
        print(f"Preparing RAG with LLM: {llm_model}, Index: {index_name}, Dir: {dir_name}")
    # Model routing: "gpt" substring -> OpenAI; anything else -> Mistral.
    llm = ChatOpenAI(model=llm_model, streaming=True) if "gpt" in llm_model else ChatMistralAI(model=llm_model, streaming=True)
    documents = create_dataset(dir_name)
    pc = Pinecone(api_key=pinecone_API)
    index = pc.Index(index_name)
    # No new documents: fall back to whatever is already indexed.
    if not documents:
        print(f"[Warning] No documents found. Using existing Pinecone index.")
        return index, pc, llm, None
    splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    all_splits = splitter.split_documents(documents)
    # Normalize an absolute path to the portion after the LAST "context"
    # directory component, so vector IDs are stable across machines.
    def path_after_context(full_path: str, anchor: str = "context") -> str:
        if not full_path:
            return ""
        parts = re.split(r"[\\/]+", str(full_path))
        idx = None
        for i, part in enumerate(parts):
            if part.lower() == anchor.lower():
                idx = i  # keep scanning: last occurrence wins
        if idx is not None and idx < len(parts) - 1:
            return "/".join(parts[idx + 1 :])
        return os.path.basename(str(full_path))
    for chunk in all_splits:
        if "source" in chunk.metadata and chunk.metadata["source"]:
            chunk.metadata["source"] = path_after_context(chunk.metadata["source"], anchor="context")
    if info:
        print(f"Total chunks: {len(all_splits)}")
    # Deterministic per-chunk vector ID from source-path hash + content hash.
    def chunk_id(chunk, prefix="vec"):
        text_hash = hashlib.sha256(chunk.page_content.encode("utf-8")).hexdigest()[:16]
        source = chunk.metadata.get("source", "unknown")
        file_hash = hashlib.sha256(source.encode("utf-8")).hexdigest()[:8]
        return f"{prefix}_{file_hash}_{text_hash}"
    # -------------------------
    # Vector ID & Namespace Architecture
    # -------------------------
    """
    VECTOR ID GENERATION STRATEGY:
    For Document Chunks:
    Pattern: "vec_{file_hash}_{content_hash}"
    - file_hash: SHA-256 of normalized source path (8 chars)
    - content_hash: SHA-256 of chunk content (16 chars)
    - Enables exact duplicate detection across runs
    - Stable across different machine paths due to source normalization
    For Community Summaries:
    Pattern: "comm_{community_hash}"
    - community_hash: SHA-256 of "community_id|level|sorted_node_sample"
    - Ensures stable IDs for the same community composition
    - Allows updates when community structure changes
    NAMESPACE STRATEGY:
    - "example-namespace": Stores document chunk embeddings
    - "community-summaries": Stores hierarchical community summaries
    - Separation enables independent update/query strategies
    - Prevents interference between document and summary vectors
    IDEMPOTENCY GUARANTEE:
    - fetch_existing_ids() checks Pinecone before embedding
    - Prevents duplicate embeddings for identical content
    - Enables safe re-runs without data duplication
    - Reduces embedding costs and storage usage
    """
    namespace = "example-namespace"
    all_ids = [chunk_id(c) for c in all_splits]
    # Idempotency: skip chunks whose IDs already exist in the namespace.
    existing = fetch_existing_ids(index, namespace, all_ids, batch_size=100)
    new_chunks = [(c, i) for c, i in zip(all_splits, all_ids) if i not in existing]
    if info:
        print(f"Chunks already indexed: {len(all_splits) - len(new_chunks)}")
        print(f"New chunks to embed: {len(new_chunks)}")
    if not new_chunks:
        print("[Info] Nothing new to index. Skipping embedding/upsert.")
    else:
        batch_size = 94
        # Unbounded retry with capped exponential backoff (max 60s).
        # NOTE(review): loops forever on a permanent failure — confirm intended.
        def retry_forever(func, *args, **kwargs):
            attempt = 1
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    wait = min(60, 2 ** min(attempt, 6))
                    print(f"[Retry] {func.__name__} failed (attempt {attempt}): {e}. Sleeping {wait}s")
                    time.sleep(wait)
                    attempt += 1
        for start_idx in range(0, len(new_chunks), batch_size):
            print(f"[Info] Embedding and upserting batch {start_idx // batch_size + 1}...")
            batch, ids = zip(*new_chunks[start_idx:start_idx + batch_size])
            texts = [chunk.page_content for chunk in batch]
            metas = [chunk.metadata or {} for chunk in batch]
            embeddings = retry_forever(
                pc.inference.embed,
                model="llama-text-embed-v2",
                inputs=texts,
                parameters={"input_type": "passage", "truncate": "END"}
            )
            # assumes iterating `embeddings` yields items supporting
            # e['values'] access — TODO confirm against the Pinecone SDK version
            batch_records = [
                {"id": i, "values": e['values'], "metadata": {"text": t, **m}}
                for i, e, t, m in zip(ids, embeddings, texts, metas)
            ]
            retry_forever(index.upsert, vectors=batch_records, namespace=namespace)
        if info:
            print(f"Completed upsert of {len(new_chunks)} new vectors.")
    return index, pc, llm, documents  # Return documents for KG construction
def build_knowledge_graph(documents, llm, pc, index, info=True):
    """
    Build/update the Knowledge Graph (KG) from documents, persist it, merge deltas, and
    (re)generate community summaries for changed regions.
    Args:
        documents: List of LangChain Documents from prepare_RAG (may be empty).
        llm: LangChain-compatible LLM used via LlamaIndex.
        pc: Pinecone client (for embeddings).
        index: Pinecone index to store community summary vectors.
        info: Enable verbose logging.
    Returns:
        KnowledgeGraphIndex | None
    Flow:
        1) Identify new/changed docs via source+content hashing (seen file cache).
        2) Load existing KG from pickle or build a fresh one.
        3) If there is a delta, build a delta KG and merge nodes/edges.
        4) Summarize impacted communities and upsert summaries to Pinecone.
        5) Export `knowledge_graph.json` and update the seen-file signatures.
    """
    import os, pickle, json, hashlib, re
    from llama_index.core import Document, KnowledgeGraphIndex
    from llama_index.llms.langchain import LangChainLLM
    # ---- duplicate detection "like in prepare_RAG" (signature unchanged) ----
    # Same normalization helper as in prepare_RAG: keep the path portion after
    # the LAST "context" component so signatures are machine-independent.
    def path_after_context(full_path: str, anchor: str = "context") -> str:
        if not full_path:
            return ""
        parts = re.split(r"[\\/]+", str(full_path))
        idx = None
        for i, part in enumerate(parts):
            if part.lower() == anchor.lower():
                idx = i  # last occurrence wins
        if idx is not None and idx < len(parts) - 1:
            return "/".join(parts[idx + 1 :])
        return os.path.basename(str(full_path))
    def file_sig(doc_like):
        """Return (sig_id, normalized_source) using source+content hashing similar to prepare_RAG."""
        meta = getattr(doc_like, "metadata", {}) or {}
        text = getattr(doc_like, "page_content", "") or getattr(doc_like, "text", "") or ""
        src = meta.get("source", "unknown")
        if src:
            src = path_after_context(src, anchor="context")
        src_hash = hashlib.sha256(src.encode("utf-8")).hexdigest()[:8]
        text_hash = hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]
        return f"kg_{src_hash}_{text_hash}", src
    # Load the persisted set of document signatures processed in earlier runs.
    def load_seen_sigs(path="kg_seen_files.json"):
        try:
            if os.path.exists(path):
                with open(path, "r", encoding="utf-8") as f:
                    data = json.load(f)
                return set(data if isinstance(data, list) else [])
        except Exception as e:
            print(f"[Warn] Failed to load seen file sigs: {e}")
        return set()
    # Persist the signature set; failures are logged but non-fatal.
    def save_seen_sigs(sigs, path="kg_seen_files.json"):
        try:
            with open(path, "w", encoding="utf-8") as f:
                json.dump(sorted(list(sigs)), f, indent=2)
        except Exception as e:
            print(f"[Warn] Failed to save seen file sigs: {e}")
    # -------------------------
    # Incremental KG Update Strategy
    # -------------------------
    """
    CRITICAL: This section handles the complex merge of new documents into existing knowledge graphs.
    KEY CHALLENGES ADDRESSED:
    - Duplicate Detection: Uses content+source hashing to identify truly new/changed documents
    - Delta Processing: Builds partial KG from only new documents, then merges
    - Conflict Resolution: Handles nodes/edges that may already exist in the base graph
    - Change Propagation: Tracks exactly which nodes/edges are new for community summarization
    MERGE STRATEGY:
    1. Signature-based filtering identifies only new/changed documents
    2. Builds a "delta KG" from new documents only
    3. Performs set operations to find truly new nodes/edges:
       - new_nodes = delta_nodes - base_nodes
       - new_edges = delta_edges - base_edges
    4. Merges using NetworkX's native add_nodes_from/add_edges_from
    5. Preserves all node/edge attributes during merge
    WHY THIS MATTERS:
    - Without proper incremental updates, the system would rebuild the entire KG every time
    - Enables efficient updates when only a few documents change
    - Maintains community summaries for unchanged parts of the graph
    """
    seen_sigs = load_seen_sigs()
    # Identify new/changed docs by signature
    all_docs = documents or []
    new_docs = []
    new_sigs = []
    for d in all_docs:
        sig, _ = file_sig(d)
        if sig not in seen_sigs:
            new_docs.append(d)
            new_sigs.append(sig)
    if info:
        print(f"[KG] Total input docs: {len(all_docs)} | New/changed docs detected: {len(new_docs)}")
    # ---- prepare LlamaIndex objects ----
    # Convert LangChain Documents to LlamaIndex Documents (full set + delta).
    llama_docs_all = [Document(text=doc.page_content, metadata=doc.metadata) for doc in all_docs]
    llama_docs_delta = [Document(text=doc.page_content, metadata=doc.metadata) for doc in new_docs]
    llm_for_kg = LangChainLLM(llm)
    persist_file = os.path.abspath("./kg_index.pkl")
    # Build a KG from `docs` and persist it via pickle.
    def _build_and_persist(docs):
        kg = KnowledgeGraphIndex.from_documents(
            documents=docs,
            max_triplets_per_chunk=20,
            extract_relations=True,
            include_embeddings=True,
            llm=llm_for_kg
        )
        with open(persist_file, "wb") as f:
            pickle.dump(kg, f)
        return kg
    # NOTE(review): pickle.load on a file that could be tampered with executes
    # arbitrary code — acceptable only for a trusted local cache.
    def _load_existing():
        with open(persist_file, "rb") as f:
            return pickle.load(f)
    kg_index = None
    graph_exists = False
    try:
        if os.path.exists(persist_file):
            if info:
                print("[Info] Found persisted KG pickle file.")
            graph_exists = True
            kg_index = _load_existing()
            if info:
                print("[Info] Loaded Knowledge Graph from pickle.")
        elif llama_docs_all:
            if info:
                print("[Info] No persisted KG found. Building new KG from all documents...")
            kg_index = _build_and_persist(llama_docs_all)
            if info:
                print("[Info] Built and persisted Knowledge Graph via pickle.")
        else:
            if info:
                print("[Info] No persisted KG found and no documents to build from.")
    except Exception as e:
        print(f"[Error] GraphRAG init/load failed: {e}")
        kg_index = None
    # ---- incremental insertion (signature unchanged) ----
    inserted_any = False
    graph_override = None  # if we need merge fallback for community detection
    new_nodes = set()
    new_edges = set()
    if kg_index and graph_exists and llama_docs_delta:
        if info:
            print(f"[Info] Incrementally inserting {len(llama_docs_delta)} new/changed docs into KG...")
        ######################################################################
        try:
            # Build delta KG from new/changed docs
            kg_delta = KnowledgeGraphIndex.from_documents(
                documents=llama_docs_delta,
                max_triplets_per_chunk=20,
                extract_relations=True,
                include_embeddings=False,
                llm=llm_for_kg
            )
            nxg_base = kg_index.get_networkx_graph()
            nxg_delta = kg_delta.get_networkx_graph()
            # Diagnostic: Print node/edge sets before merge
            base_nodes_before = set(nxg_base.nodes())
            base_edges_before = set(nxg_base.edges())
            delta_nodes = set(nxg_delta.nodes())
            delta_edges = set(nxg_delta.edges())
            print(f"\n[Diagnostic] Base graph nodes before merge: {len(base_nodes_before)}")
            print(f"[Diagnostic] Base graph edges before merge: {len(base_edges_before)}")
            print(f"[Diagnostic] Delta graph nodes: {len(delta_nodes)}")
            print(f"[Diagnostic] Delta graph edges: {len(delta_edges)}")
            # Show intersection and difference
            new_nodes = delta_nodes - base_nodes_before
            new_edges = delta_edges - base_edges_before
            already_existing_nodes = delta_nodes & base_nodes_before
            already_existing_edges = delta_edges & base_edges_before
            print(f"[Diagnostic] Delta nodes already in base: {len(already_existing_nodes)}")
            print(f"[Diagnostic] Delta edges already in base: {len(already_existing_edges)}")
            print(f"[Diagnostic] Truly new nodes to add: {len(new_nodes)}")
            print(f"[Diagnostic] Truly new edges to add: {len(new_edges)}")
            # Merge delta into base
            # NOTE(review): this mutates the NetworkX view returned by
            # get_networkx_graph(), not kg_index itself — the merged graph only
            # lives in `graph_override` for this run; confirm whether the delta
            # should also be inserted into kg_index so it survives in the pickle.
            nxg_base.add_nodes_from(nxg_delta.nodes(data=True))
            nxg_base.add_edges_from(nxg_delta.edges(data=True))
            graph_override = nxg_base
            inserted_any = True
            # Diagnostic: Print node/edge sets after merge
            base_nodes_after = set(nxg_base.nodes())
            base_edges_after = set(nxg_base.edges())
            print(f"\n[Diagnostic] Base graph nodes after merge: {len(base_nodes_after)}")
            print(f"[Diagnostic] Base graph edges after merge: {len(base_edges_after)}")
            print(f"[Diagnostic] Nodes added: {len(base_nodes_after - base_nodes_before)}")
            print(f"[Diagnostic] Edges added: {len(base_edges_after - base_edges_before)}")
            # Print delta graph summary
            num_nodes = nxg_delta.number_of_nodes()
            num_edges = nxg_delta.number_of_edges()
            print(f"\n[Delta Graph Summary]")
            print(f"  - Total Nodes: {num_nodes}")
            print(f"  - Total Edges: {num_edges}")
            # Print first 10 nodes
            print("\n[Delta Graph Nodes] (showing up to 10):")
            for i, (node, data) in enumerate(nxg_delta.nodes(data=True)):
                if i >= 10:
                    print("  ...")
                    break
                print(f"  {i+1}. {node}: {data}")
            # Print first 10 edges
            print("\n[Delta Graph Edges] (showing up to 10):")
            for i, (source, target, data) in enumerate(nxg_delta.edges(data=True)):
                if i >= 10:
                    print("  ...")
                    break
                print(f"  {i+1}. {source} -> {target}: {data}")
            # Warn if nothing new was actually added
            if len(new_nodes) == 0 and len(new_edges) == 0:
                print("[Warning] All delta nodes/edges already existed in the base graph. No actual change.")
            if info:
                print("[Info] Merged delta KG into existing graph (override used for summaries).")
        except Exception as e:
            print(f"[Error] Fallback merge failed: {e}")
        ######################################################################
    # Persist KG if mutated via API
    # NOTE(review): inserted_any is only ever set together with graph_override
    # above, so this branch appears unreachable in the current code path —
    # confirm whether the merged graph was meant to be persisted here.
    if inserted_any and graph_override is None:
        try:
            with open(persist_file, "wb") as f:
                pickle.dump(kg_index, f)
        except Exception as e:
            print(f"[Warn] Failed to persist updated KG: {e}")
    # First-time build already happened above (graph_exists==False and llama_docs_all not empty)
    # ---- Community summaries (incremental occurs inside the helper; same signature) ----
    if kg_index:
        # Only trigger summaries when: first build or we actually inserted/merged something
        if not graph_exists or inserted_any:
            # Impacted set = new nodes plus both endpoints of every new edge.
            _build_and_index_community_summaries(
                kg_index=kg_index,
                pc=pc,
                index=index,
                llm=llm,
                impacted_nodes=new_nodes.union(u for u, v in new_edges).union(v for u, v in new_edges),
                info=True
            )
        # Optional: save graph for visualization (post-update)
        try:
            nxg = graph_override if graph_override is not None else kg_index.get_networkx_graph()
            # Adjacency-list export: {source: [[relation, target], ...]}.
            graph_dict = {}
            for u, v, attrs in nxg.edges(data=True):
                rel = attrs.get("label") or attrs.get("relationship") or "related_to"
                if u not in graph_dict:
                    graph_dict[u] = []
                graph_dict[u].append([rel, v])
            output_file = "knowledge_graph.json"
            with open(output_file, "w", encoding="utf-8") as f:
                json.dump(graph_dict, f, indent=4, ensure_ascii=False)
            if info:
                print(f"[Info] Knowledge graph saved to {output_file}")
        except Exception as e:
            print(f"[Error] Failed to save knowledge graph: {e}")
    # ---- mark seen signatures only after successful insertion or first build ----
    if (not graph_exists and llama_docs_all) or inserted_any:
        # Add only the new ones we processed this run
        seen_sigs.update(new_sigs)
        save_seen_sigs(seen_sigs)
    return kg_index