File size: 3,719 Bytes
b3a84d6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
"""
BharatGraph - Alias Graph
Maps every known name variant of an entity to its canonical ID.
Loaded at startup and used during all entity lookups so that
"RAHUL KUMAR", "Rahul Kumar", and "R. Kumar" all resolve to one node.
Pure ASCII.
"""
import json
import os
from loguru import logger


class AliasGraph:
    """
    In-memory lookup: alias_name.lower() -> canonical_id

    Built from EntityResolverV2.build_alias_graph() output and
    persisted at data/processed/alias_graph.json.
    """

    DEFAULT_PATH = "data/processed/alias_graph.json"

    def __init__(self):
        self._graph = {}   # alias_lower -> canonical_id

    def load(self, path: str = None) -> int:
        """Load alias graph from disk. Returns number of entries loaded."""
        path = path or self.DEFAULT_PATH
        if not os.path.exists(path):
            logger.warning(
                f"[AliasGraph] File not found: {path}. "
                "Run pipeline first to build alias graph."
            )
            return 0
        try:
            with open(path, "r", encoding="utf-8") as f:
                self._graph = json.load(f)
            logger.success(
                f"[AliasGraph] Loaded {len(self._graph)} aliases from {path}"
            )
            return len(self._graph)
        except Exception as e:
            logger.error(f"[AliasGraph] Load failed: {e}")
            return 0

    def save(self, path: str = None):
        """Persist current alias graph to disk."""
        path = path or self.DEFAULT_PATH
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(self._graph, f, indent=2, ensure_ascii=False)
        logger.success(
            f"[AliasGraph] Saved {len(self._graph)} aliases to {path}"
        )

    def resolve(self, name: str) -> str:
        """
        Resolve a name variant to its canonical ID.
        Returns the canonical ID if found, else empty string.
        """
        return self._graph.get(name.lower().strip(), "")

    def add(self, alias: str, canonical_id: str):
        """Add or update a single alias -> canonical_id mapping."""
        self._graph[alias.lower().strip()] = canonical_id

    def merge(self, other: dict):
        """Merge another {alias: canonical_id} dict into this graph."""
        normalised = {k.lower().strip(): v for k, v in other.items()}
        self._graph.update(normalised)
        logger.info(
            f"[AliasGraph] Merged {len(other)} entries. "
            f"Total: {len(self._graph)}"
        )

    def bulk_add(self, records: list, name_field: str,
                  canonical_id_field: str):
        """
        Add aliases from a list of records.
        Each record[name_field] becomes an alias for record[canonical_id_field].
        """
        added = 0
        for rec in records:
            name = str(rec.get(name_field, "")).strip()
            cid  = str(rec.get(canonical_id_field, "")).strip()
            if name and cid:
                self._graph[name.lower()] = cid
                added += 1
        logger.info(f"[AliasGraph] Bulk-added {added} aliases")

    def __len__(self) -> int:
        return len(self._graph)

    def __contains__(self, name: str) -> bool:
        return name.lower().strip() in self._graph

    def stats(self) -> dict:
        """Return basic statistics about the alias graph."""
        canonical_ids = set(self._graph.values())
        return {
            "total_aliases":        len(self._graph),
            "unique_canonical_ids": len(canonical_ids),
            "avg_aliases_per_id":   round(
                len(self._graph) / max(len(canonical_ids), 1), 2
            ),
        }