| """ |
| Graph RAG — Knowledge Graph Builder |
| Builds a NetworkX graph where: |
| - Nodes = chunks (from doc1 & doc2) |
| - Edges = relationships between chunks: |
| * sequential : consecutive chunks in same document |
| * same_section : chunks sharing the same heading/section |
| * cross_similar: high cosine similarity between doc1 chunk & doc2 chunk |
| * entity_link : chunks sharing important noun phrases (entities) |
| """ |
import re
from collections import Counter, deque
from itertools import combinations
from typing import Any, Dict, List, Tuple

import networkx as nx
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

from .chunker import Chunk
|
|
|
|
# Name of the sentence-transformers model used to embed chunk text.
_EMBED_MODEL_NAME = "all-MiniLM-L6-v2"
# Minimum cosine similarity for adding a cross-document "cross_similar" edge.
_CROSS_SIM_THRESHOLD = 0.55
# Shortest candidate string (in characters) kept as an entity.
_ENTITY_MIN_LEN = 4
|
|
|
|
def _extract_noun_phrases(text: str) -> set:
    """
    Approximate entity extraction with regex heuristics (no spaCy needed).

    Three candidate patterns are applied: Title-Case multi-word phrases,
    all-caps acronyms, and any word of five or more characters. Candidates
    shorter than ``_ENTITY_MIN_LEN`` are dropped, everything is lowercased,
    and a small stopword list is subtracted before returning the set.
    """
    candidate_patterns = (
        r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)+\b',  # Title-Case phrases
        r'\b[A-Z]{2,}\b',                        # all-caps acronyms
        r'\b\w{5,}\b',                           # long single words
    )
    ignored = {'which', 'these', 'those', 'their', 'there', 'where', 'about',
               'would', 'could', 'should', 'other', 'being', 'using', 'having'}
    candidates = {
        match.strip().lower()
        for pattern in candidate_patterns
        for match in re.findall(pattern, text)
        if len(match) >= _ENTITY_MIN_LEN
    }
    return candidates - ignored
|
|
|
|
class GraphBuilder:
    """
    Builds and queries a knowledge graph over chunks of two documents.

    Nodes are chunk ids; edges carry a ``relation`` attribute naming one of
    four types (sequential, same_section, cross_similar, entity_link) and a
    ``weight`` used by graph-aware retrieval. See the module docstring for
    the meaning of each relation.
    """

    def __init__(self):
        # Embedding model used only for cross-document similarity edges.
        self._model = SentenceTransformer(_EMBED_MODEL_NAME)
        self.graph: nx.Graph = nx.Graph()
        # chunk_id -> Chunk, so retrieve() can return full (untruncated) text.
        self._chunk_map: Dict[str, Chunk] = {}

    def build(self, doc1_chunks: List[Chunk], doc2_chunks: List[Chunk]) -> nx.Graph:
        """
        Full graph construction pipeline.

        Adds one node per chunk (with a truncated text preview and the
        chunk's extracted entities), then layers on the four edge types.
        Rebuilds from scratch on every call. Returns the built graph.
        """
        self.graph = nx.Graph()
        self._chunk_map = {}

        all_chunks = doc1_chunks + doc2_chunks

        for chunk in all_chunks:
            self._chunk_map[chunk.chunk_id] = chunk
            # Entities are extracted once here; _add_entity_edges reuses
            # the node attribute instead of running the regexes again.
            self.graph.add_node(
                chunk.chunk_id,
                text=chunk.text[:200],  # preview only; full text in _chunk_map
                doc_id=chunk.doc_id,
                section=chunk.section,
                chunk_index=chunk.chunk_index,
                entities=list(_extract_noun_phrases(chunk.text)),
            )

        self._add_sequential_edges(doc1_chunks)
        self._add_sequential_edges(doc2_chunks)
        self._add_section_edges(all_chunks)
        self._add_cross_similarity_edges(doc1_chunks, doc2_chunks)
        self._add_entity_edges(all_chunks)

        return self.graph

    def _add_sequential_edges(self, chunks: List[Chunk]) -> None:
        """Link consecutive chunks of one document in reading order."""
        ordered = sorted(chunks, key=lambda c: c.chunk_index)
        for a, b in zip(ordered, ordered[1:]):
            self.graph.add_edge(
                a.chunk_id, b.chunk_id,
                relation="sequential",
                weight=0.9,
            )

    def _add_section_edges(self, chunks: List[Chunk]) -> None:
        """Fully connect chunks sharing the same (doc_id, section) heading."""
        section_map: Dict[str, List[str]] = {}
        for chunk in chunks:
            # Key on doc AND section so identically named sections in the
            # two documents are not merged together.
            key = f"{chunk.doc_id}::{chunk.section}"
            section_map.setdefault(key, []).append(chunk.chunk_id)

        for ids in section_map.values():
            for a, b in combinations(ids, 2):
                # Don't demote an existing (e.g. sequential) edge.
                if not self.graph.has_edge(a, b):
                    self.graph.add_edge(
                        a, b,
                        relation="same_section",
                        weight=0.6,
                    )

    def _add_cross_similarity_edges(
        self, doc1_chunks: List[Chunk], doc2_chunks: List[Chunk]
    ) -> None:
        """
        Connect semantically similar chunk pairs across the two documents.

        Embeds every chunk, computes the full doc1 x doc2 cosine-similarity
        matrix, and adds a "cross_similar" edge wherever the score reaches
        _CROSS_SIM_THRESHOLD. The similarity doubles as the edge weight.
        """
        if not doc1_chunks or not doc2_chunks:
            return  # nothing to compare against

        emb1 = self._model.encode(
            [c.text for c in doc1_chunks], batch_size=32, show_progress_bar=False
        )
        emb2 = self._model.encode(
            [c.text for c in doc2_chunks], batch_size=32, show_progress_bar=False
        )
        sim_matrix = cosine_similarity(emb1, emb2)

        for i, c1 in enumerate(doc1_chunks):
            for j, c2 in enumerate(doc2_chunks):
                sim = float(sim_matrix[i, j])
                if sim >= _CROSS_SIM_THRESHOLD:
                    self.graph.add_edge(
                        c1.chunk_id, c2.chunk_id,
                        relation="cross_similar",
                        weight=round(sim, 4),
                        similarity=round(sim, 4),
                    )

    def _add_entity_edges(self, chunks: List[Chunk]) -> None:
        """
        Link chunks from different documents that mention the same entity.

        For each entity seen in more than one document, one representative
        chunk per document is kept (the last one seen), bounding the number
        of edges per entity. Only cross-document entities produce edges.
        """
        entity_to_chunks: Dict[str, List[str]] = {}
        for chunk in chunks:
            # Reuse the entities computed in build() (stored on the node)
            # to avoid a second regex pass; fall back to fresh extraction
            # if this method is called on chunks not yet in the graph.
            if chunk.chunk_id in self.graph:
                entities = self.graph.nodes[chunk.chunk_id].get("entities", [])
            else:
                entities = _extract_noun_phrases(chunk.text)
            for ent in entities:
                entity_to_chunks.setdefault(ent, []).append(chunk.chunk_id)

        for ent, ids in entity_to_chunks.items():
            if len(ids) < 2:
                continue
            # doc_id -> last chunk id mentioning this entity in that doc.
            doc_ids = {self._chunk_map[cid].doc_id: cid for cid in ids}
            if len(doc_ids) >= 2:
                for a, b in combinations(doc_ids.values(), 2):
                    if not self.graph.has_edge(a, b):
                        self.graph.add_edge(
                            a, b,
                            relation="entity_link",
                            entity=ent,
                            weight=0.5,
                        )

    def retrieve(
        self,
        query: str,
        seed_chunks: List[Dict[str, Any]],
        hops: int = 2,
        max_nodes: int = 10,
    ) -> List[Dict[str, Any]]:
        """
        Graph-aware retrieval.

        1. Map the vector-search seed hits to graph node ids.
        2. BFS outwards up to `hops` hops, following at most the 4
           highest-weight edges from each node; relevance decays
           multiplicatively by edge weight along the path.
        3. Return up to `max_nodes` unique chunks ranked by relevance.

        Note: `query` is currently unused — ranking relies purely on the
        seed set and edge weights.
        """
        visited = set()
        result_nodes: List[Dict[str, Any]] = []

        # Seed ids follow the chunker's "<doc_id>_chunk_<index>" convention.
        seed_ids = [
            f"{s['doc_id']}_chunk_{s['chunk_index']}"
            for s in seed_chunks
            if s.get('chunk_index') is not None
        ]

        # deque gives O(1) popleft; list.pop(0) was O(n) per dequeue.
        queue = deque((nid, hops, 1.0) for nid in seed_ids if nid in self.graph)

        while queue and len(result_nodes) < max_nodes:
            node_id, remaining, acc_weight = queue.popleft()
            if node_id in visited:
                continue
            visited.add(node_id)

            chunk = self._chunk_map.get(node_id)
            if chunk:
                result_nodes.append({
                    "chunk_id": node_id,
                    "text": chunk.text,
                    "doc_id": chunk.doc_id,
                    "section": chunk.section,
                    "relevance": round(acc_weight, 4),
                })

            if remaining > 0:
                # Expand via the strongest few edges first.
                neighbors = sorted(
                    self.graph[node_id].items(),
                    key=lambda x: x[1].get("weight", 0),
                    reverse=True,
                )
                for neighbor_id, edge_data in neighbors[:4]:
                    if neighbor_id not in visited:
                        queue.append((
                            neighbor_id,
                            remaining - 1,
                            acc_weight * edge_data.get("weight", 0.5),
                        ))

        result_nodes.sort(key=lambda x: x["relevance"], reverse=True)
        return result_nodes[:max_nodes]

    def get_stats(self) -> Dict[str, Any]:
        """Return node/edge counts and a histogram of edge relation types."""
        edge_types = Counter(
            data.get("relation", "unknown")
            for _, _, data in self.graph.edges(data=True)
        )
        return {
            "nodes": self.graph.number_of_nodes(),
            "edges": self.graph.number_of_edges(),
            "edge_types": dict(edge_types),
        }