Spaces:
Running
Running
fix(test): rewrite normalise_indian_name -- add Sh. to honorifics, while-loop for stacked honorifics, fix broken function body
9195906 | """ | |
| BharatGraph - Entity Resolution v2: Canonical Identity Engine | |
| Replaces the basic Jaccard resolver with a multi-signal pipeline: | |
| 1. Exact key match (CIN, PAN, GSTIN, Aadhaar prefix) | |
| 2. Jaro-Winkler string similarity | |
| 3. Jaccard token overlap | |
| 4. Embedding cosine similarity (optional, skipped if model unavailable) | |
| The same real-world entity appearing under multiple scraped IDs is | |
| collapsed into one canonical ID without deleting source records. | |
| Pure ASCII. | |
| """ | |
| import re | |
| import os | |
| import json | |
| import hashlib | |
| from datetime import datetime | |
| from loguru import logger | |
| from processing.cleaner import NameCleaner | |
| from processing.canonical_id import canonical_id | |
| # ---- Jaro-Winkler (no external dependency) ---------------------------- | |
| def _jaro(s1: str, s2: str) -> float: | |
| if s1 == s2: | |
| return 1.0 | |
| l1, l2 = len(s1), len(s2) | |
| if l1 == 0 or l2 == 0: | |
| return 0.0 | |
| match_dist = max(l1, l2) // 2 - 1 | |
| if match_dist < 0: | |
| match_dist = 0 | |
| s1_matches = [False] * l1 | |
| s2_matches = [False] * l2 | |
| matches = 0 | |
| transpositions = 0 | |
| for i in range(l1): | |
| start = max(0, i - match_dist) | |
| end = min(i + match_dist + 1, l2) | |
| for j in range(start, end): | |
| if s2_matches[j] or s1[i] != s2[j]: | |
| continue | |
| s1_matches[i] = True | |
| s2_matches[j] = True | |
| matches += 1 | |
| break | |
| if matches == 0: | |
| return 0.0 | |
| k = 0 | |
| for i in range(l1): | |
| if not s1_matches[i]: | |
| continue | |
| while not s2_matches[k]: | |
| k += 1 | |
| if s1[i] != s2[k]: | |
| transpositions += 1 | |
| k += 1 | |
| return (matches / l1 + matches / l2 + | |
| (matches - transpositions / 2) / matches) / 3 | |
| def jaro_winkler(s1: str, s2: str, p: float = 0.1) -> float: | |
| j = _jaro(s1, s2) | |
| prefix = 0 | |
| for c1, c2 in zip(s1[:4], s2[:4]): | |
| if c1 == c2: | |
| prefix += 1 | |
| else: | |
| break | |
| return j + prefix * p * (1 - j) | |
| # ---- Jaccard on tokens ------------------------------------------------ | |
| def _tokenize(name: str) -> set: | |
| name = name.lower() | |
| name = re.sub(r"[^\w\s]", " ", name) | |
| tokens = set(name.split()) | |
| long_t = {t for t in tokens if len(t) > 1} | |
| return long_t if long_t else tokens | |
| def jaccard(name1: str, name2: str) -> float: | |
| t1, t2 = _tokenize(name1), _tokenize(name2) | |
| if not t1 or not t2: | |
| return 0.0 | |
| return len(t1 & t2) / len(t1 | t2) | |
| # ---- Indian name normalisation ---------------------------------------- | |
| _HONORIFICS = [ | |
| "shri", "smt", "dr", "prof", "mr", "mrs", "ms", "adv", "er", | |
| "hon", "honble", "col", "gen", "brig", "maj", "capt", "late", | |
| "sri", "kumari", "km", | |
| "sh", "kum", "shr", "retd", "rtd", "ex", | |
| ] | |
| _COMPANY_SUFFIXES = [ | |
| ("private limited", "pvt ltd"), | |
| ("pvt. ltd.", "pvt ltd"), | |
| ("pvt.ltd.", "pvt ltd"), | |
| ("pvt ltd.", "pvt ltd"), | |
| ("p. ltd.", "pvt ltd"), | |
| ("p.ltd.", "pvt ltd"), | |
| ("limited", "ltd"), | |
| ("ltd.", "ltd"), | |
| ("(india)", ""), | |
| ("india pvt", "pvt"), | |
| ("llp", "llp"), | |
| ] | |
| def normalise_indian_name(name: str, kind: str = "person") -> str: | |
| name = str(name).strip().lower() | |
| name = re.sub(r"^m\s*/\s*s\.?\s*", "", name) | |
| if kind == "person": | |
| _changed = True | |
| while _changed: | |
| _prev = name | |
| for h in _HONORIFICS: | |
| name = re.sub(rf"^{re.escape(h)}\.?\s+", "", name) | |
| name = re.sub(rf"^{re.escape(h)}\.?\s*$", "", name) | |
| _changed = name != _prev | |
| else: | |
| for old, new in _COMPANY_SUFFIXES: | |
| name = name.replace(old, new) | |
| name = re.sub(r"[^\w\s\-]", " ", name) | |
| return re.sub(r"\s+", " ", name).strip() | |
| # ---- Optional sentence-transformers embedding ------------------------- | |
| _embedding_model = None | |
| _embedding_available = None | |
| def _get_embedding_model(): | |
| global _embedding_model, _embedding_available | |
| if _embedding_available is False: | |
| return None | |
| if _embedding_model is not None: | |
| return _embedding_model | |
| try: | |
| from sentence_transformers import SentenceTransformer, util | |
| _embedding_model = SentenceTransformer( | |
| "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2" | |
| ) | |
| _embedding_available = True | |
| logger.success("[ResolverV2] Embedding model loaded") | |
| return _embedding_model | |
| except Exception as e: | |
| _embedding_available = False | |
| logger.warning(f"[ResolverV2] Embedding model unavailable: {e}. " | |
| "Using Jaro-Winkler + Jaccard only.") | |
| return None | |
| def _embedding_cosine(name1: str, name2: str) -> float: | |
| model = _get_embedding_model() | |
| if model is None: | |
| return 0.0 | |
| try: | |
| from sentence_transformers import util | |
| embs = model.encode([name1, name2], convert_to_tensor=True) | |
| return float(util.cos_sim(embs[0], embs[1])) | |
| except Exception: | |
| return 0.0 | |
| # ---- Main resolver class ---------------------------------------------- | |
| class EntityResolverV2: | |
| def __init__(self, threshold: float = 0.82): | |
| self.threshold = threshold | |
| self.cleaner = NameCleaner() | |
| logger.info(f"[ResolverV2] threshold={threshold}") | |
| def _exact_key_score(self, rec_a: dict, rec_b: dict) -> float: | |
| key_fields = ["cin", "pan", "gstin", "wikidata_id", "darpan_id", | |
| "bond_number", "order_id", "tender_id", "icij_node_id"] | |
| for field in key_fields: | |
| val_a = str(rec_a.get(field) or "").strip().upper() | |
| val_b = str(rec_b.get(field) or "").strip().upper() | |
| if val_a and val_b: | |
| return 1.0 if val_a == val_b else 0.0 | |
| return 0.0 | |
| def combined_score(self, name_a: str, name_b: str, | |
| rec_a: dict = None, rec_b: dict = None, | |
| kind: str = "person") -> float: | |
| if rec_a and rec_b: | |
| ek = self._exact_key_score(rec_a, rec_b) | |
| if ek == 1.0: | |
| return 1.0 | |
| if ek == 0.0 and any( | |
| str(rec_a.get(f) or "").strip() | |
| for f in ["cin", "pan", "gstin"] | |
| ): | |
| return 0.0 | |
| a = normalise_indian_name(name_a, kind) | |
| b = normalise_indian_name(name_b, kind) | |
| if not a or not b: | |
| return 0.0 | |
| if a == b: | |
| return 1.0 | |
| jw = jaro_winkler(a, b) | |
| jacc = jaccard(a, b) | |
| model = _get_embedding_model() | |
| if model is not None: | |
| cos = _embedding_cosine(name_a, name_b) | |
| return 0.30 * jw + 0.20 * jacc + 0.50 * cos | |
| else: | |
| return 0.60 * jw + 0.40 * jacc | |
| def is_same_entity(self, name_a: str, name_b: str, | |
| rec_a: dict = None, rec_b: dict = None, | |
| kind: str = "person") -> bool: | |
| return self.combined_score(name_a, name_b, rec_a, rec_b, kind) >= self.threshold | |
| def resolve_dataset(self, records: list, | |
| name_field: str = "name", | |
| kind: str = "person") -> list: | |
| if not records: | |
| return [] | |
| resolved = [] | |
| used = set() | |
| for i, rec in enumerate(records): | |
| if i in used: | |
| continue | |
| canon = dict(rec) | |
| canon["aliases"] = [] | |
| canon["duplicates"] = [] | |
| canon["_resolved_v2"] = True | |
| name_i = rec.get(name_field, "") | |
| for j in range(i + 1, len(records)): | |
| if j in used: | |
| continue | |
| name_j = records[j].get(name_field, "") | |
| score = self.combined_score(name_i, name_j, rec, records[j], kind) | |
| if score >= self.threshold: | |
| alias_entry = { | |
| "name": name_j, | |
| "id": records[j].get("id", ""), | |
| "score": round(score, 3), | |
| "source": records[j].get("_source", ""), | |
| } | |
| canon["aliases"].append(alias_entry) | |
| canon["duplicates"].append({"record": records[j], "score": round(score, 3)}) | |
| used.add(j) | |
| used.add(i) | |
| resolved.append(canon) | |
| merged = len(records) - len(resolved) | |
| logger.info(f"[ResolverV2] {len(records)} records -> {len(resolved)} canonical ({merged} merged)") | |
| return resolved | |
| def cross_dataset_match(self, dataset_a: list, dataset_b: list, | |
| name_field_a: str = "name", | |
| name_field_b: str = "name", | |
| kind: str = "person") -> list: | |
| matches = [] | |
| for rec_a in dataset_a: | |
| name_a = rec_a.get(name_field_a, "") | |
| if not name_a: | |
| continue | |
| for rec_b in dataset_b: | |
| name_b = rec_b.get(name_field_b, "") | |
| if not name_b: | |
| continue | |
| score = self.combined_score(name_a, name_b, rec_a, rec_b, kind) | |
| if score >= self.threshold: | |
| cid = canonical_id(normalise_indian_name(name_a, kind), rec_a.get("_source", "")) | |
| matches.append({ | |
| "record_a": rec_a, "record_b": rec_b, | |
| "name_a": name_a, "name_b": name_b, | |
| "score": round(score, 3), | |
| "canonical_id": cid, | |
| "match_type": "cross_dataset_v2", | |
| "matched_at": datetime.now().isoformat(), | |
| }) | |
| logger.info(f"[ResolverV2] Cross-dataset: {len(dataset_a)} x {len(dataset_b)} -> {len(matches)} matches") | |
| return matches | |
| def build_alias_graph(self, canonical_records: list, name_field: str = "name") -> dict: | |
| graph = {} | |
| for rec in canonical_records: | |
| cid = rec.get("id") or canonical_id(normalise_indian_name(rec.get(name_field, ""))) | |
| main_name = normalise_indian_name(rec.get(name_field, "")).lower() | |
| if main_name: | |
| graph[main_name] = cid | |
| for alias in rec.get("aliases", []): | |
| alias_name = normalise_indian_name(alias.get("name", "")).lower() | |
| if alias_name: | |
| graph[alias_name] = cid | |
| return graph | |
| def save_alias_graph(self, graph: dict, path: str): | |
| os.makedirs(os.path.dirname(path) or ".", exist_ok=True) | |
| with open(path, "w", encoding="utf-8") as f: | |
| json.dump(graph, f, indent=2, ensure_ascii=False) | |
| logger.success(f"[ResolverV2] Alias graph: {len(graph)} entries -> {path}") | |
| # ---- Backward-compatible alias ------------------------------------ | |
| EntityResolver = EntityResolverV2 | |
| if __name__ == "__main__": | |
| resolver = EntityResolverV2(threshold=0.72) | |
| tests = [ | |
| ("Sh. Ram Kumar", "Ramkumar", "person"), | |
| ("Narendra Modi", "N. Modi", "person"), | |
| ("Sample Pvt Ltd", "Sample Private Limited", "company"), | |
| ] | |
| for a, b, kind in tests: | |
| score = resolver.combined_score(a, b, kind=kind) | |
| print(f"{score:.3f} {a!r} vs {b!r}") | |