Spaces:
Running
Running
File size: 3,719 Bytes
b3a84d6 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 | """
BharatGraph - Alias Graph
Maps every known name variant of an entity to its canonical ID.
Loaded at startup and used during all entity lookups so that
"RAHUL KUMAR", "Rahul Kumar", and "R. Kumar" all resolve to one node.
Pure ASCII.
"""
import json
import os
from loguru import logger
class AliasGraph:
    """
    In-memory lookup: alias_name.lower().strip() -> canonical_id

    Built from EntityResolverV2.build_alias_graph() output and
    persisted at data/processed/alias_graph.json.

    Every write path (load, add, merge, bulk_add) stores keys through
    _normalise(), and every read path (resolve, ``in``) looks them up
    through the same helper, so a stored alias is always reachable.
    """

    DEFAULT_PATH = "data/processed/alias_graph.json"

    def __init__(self):
        self._graph = {}  # normalised alias -> canonical_id

    @staticmethod
    def _normalise(name: str) -> str:
        """Return the lookup key for *name*: lower-cased, outer whitespace removed."""
        return name.lower().strip()

    @staticmethod
    def _clean_field(value) -> str:
        """
        Convert a record field to a stripped string.

        None is mapped to "" instead of the text "None", so that a
        missing value is never registered as a real alias or ID.
        """
        if value is None:
            return ""
        return str(value).strip()

    def load(self, path: str = None) -> int:
        """
        Load alias graph from disk, replacing the current contents.

        Args:
            path: JSON file to read; defaults to DEFAULT_PATH.

        Returns:
            Number of entries loaded. 0 when the file is missing,
            unreadable, not valid JSON, or not a JSON object; in those
            cases the current graph is left unchanged.
        """
        path = path or self.DEFAULT_PATH
        if not os.path.exists(path):
            logger.warning(
                f"[AliasGraph] File not found: {path}. "
                "Run pipeline first to build alias graph."
            )
            return 0
        try:
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
            # A top-level list/number/string would break every later
            # dict operation, so reject it here.
            if not isinstance(data, dict):
                raise ValueError(
                    f"expected a JSON object, got {type(data).__name__}"
                )
            # Normalise keys so entries written by other tools (or edited
            # by hand) with mixed case / padding remain resolvable.
            loaded = {self._normalise(k): v for k, v in data.items()}
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.error(f"[AliasGraph] Load failed: {e}")
            return 0
        self._graph = loaded
        logger.success(
            f"[AliasGraph] Loaded {len(self._graph)} aliases from {path}"
        )
        return len(self._graph)

    def save(self, path: str = None):
        """
        Persist current alias graph to disk as indented JSON.

        Args:
            path: Destination file; defaults to DEFAULT_PATH. Missing
                parent directories are created.
        """
        path = path or self.DEFAULT_PATH
        # dirname is "" for a bare filename; fall back to the current dir.
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(self._graph, f, indent=2, ensure_ascii=False)
        logger.success(
            f"[AliasGraph] Saved {len(self._graph)} aliases to {path}"
        )

    def resolve(self, name: str) -> str:
        """
        Resolve a name variant to its canonical ID.

        Returns the canonical ID if found, else empty string. A
        non-string *name* (e.g. None) is treated as not found.
        """
        if not isinstance(name, str):
            return ""
        return self._graph.get(self._normalise(name), "")

    def add(self, alias: str, canonical_id: str):
        """Add or update a single alias -> canonical_id mapping."""
        self._graph[self._normalise(alias)] = canonical_id

    def merge(self, other: dict):
        """
        Merge another {alias: canonical_id} dict into this graph.

        Keys are normalised first; on a clash the incoming value wins.
        """
        normalised = {self._normalise(k): v for k, v in other.items()}
        self._graph.update(normalised)
        logger.info(
            f"[AliasGraph] Merged {len(other)} entries. "
            f"Total: {len(self._graph)}"
        )

    def bulk_add(self, records: list, name_field: str,
                 canonical_id_field: str):
        """
        Add aliases from a list of records.

        Each record[name_field] becomes an alias for
        record[canonical_id_field]. Records where either field is
        missing, None, or blank are skipped.
        """
        added = 0
        for rec in records:
            name = self._clean_field(rec.get(name_field))
            cid = self._clean_field(rec.get(canonical_id_field))
            if name and cid:
                self._graph[self._normalise(name)] = cid
                added += 1
        logger.info(f"[AliasGraph] Bulk-added {added} aliases")

    def __len__(self) -> int:
        """Return the number of stored aliases."""
        return len(self._graph)

    def __contains__(self, name: str) -> bool:
        """Return True if *name* (after normalisation) is a known alias."""
        if not isinstance(name, str):
            return False
        return self._normalise(name) in self._graph

    def stats(self) -> dict:
        """Return basic statistics about the alias graph."""
        canonical_ids = set(self._graph.values())
        return {
            "total_aliases": len(self._graph),
            "unique_canonical_ids": len(canonical_ids),
            # max(..., 1) avoids dividing by zero on an empty graph.
            "avg_aliases_per_id": round(
                len(self._graph) / max(len(canonical_ids), 1), 2
            ),
        }
|