| |
| """ |
| Phase 5: Entity Network Analysis |
| |
| 1. Entity resolution: group similar PERSON names using fuzzy matching |
| 2. Co-occurrence: count entity pairs that appear in the same document |
| 3. Store in entity_aliases and entity_relationships tables |
| |
| Focuses on PERSON and ORG entities that appear in 3+ documents. |
| |
| Runs on: Hetzner CPU |
| """ |
|
|
| import logging |
| from collections import defaultdict |
| from difflib import SequenceMatcher |
|
|
| import psycopg2 |
| import psycopg2.extras |
|
|
| from db import get_conn |
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-8s %(message)s")
log = logging.getLogger(__name__)

# An entity must appear in at least this many distinct documents to be analyzed.
MIN_DOCS = 3
# Minimum SequenceMatcher ratio for two names to be merged as aliases.
FUZZY_THRESHOLD = 0.88
# Cap on entities considered per document when forming pairs (bounds the
# O(n^2) pair expansion for entity-dense documents).
MAX_ENTITIES_PER_DOC = 50
# NOTE(review): BATCH_SIZE appears unused — the execute_batch call sites
# hard-code page_size=1000 and page_size=500; confirm before removing.
BATCH_SIZE = 1000
|
|
|
|
def get_frequent_entities(conn, entity_type, min_docs=MIN_DOCS):
    """Return (entity_text, doc_count) rows for frequent entities of one type.

    Only names between 3 and 100 characters are considered (shorter/longer
    strings are likely extraction noise), and only those appearing in at
    least ``min_docs`` distinct documents.  Results come back ordered by
    document count, most frequent first.
    """
    query = """
            SELECT entity_text, COUNT(DISTINCT document_id) as doc_count
            FROM entities
            WHERE entity_type = %s
            AND LENGTH(entity_text) >= 3
            AND LENGTH(entity_text) <= 100
            GROUP BY entity_text
            HAVING COUNT(DISTINCT document_id) >= %s
            ORDER BY doc_count DESC
        """
    with conn.cursor() as cur:
        cur.execute(query, (entity_type, min_docs))
        return cur.fetchall()
|
|
|
|
def resolve_entities(entities, threshold=None):
    """Group similar entity names via fuzzy matching.

    Names are processed from most to least frequent, so the most common
    spelling becomes the canonical form and rarer variants attach to it
    as aliases.  A cheap length pre-filter (>30% length difference) skips
    the expensive ratio computation; containment (one lowercased name
    inside the other) is treated as a strong match (0.92).

    Args:
        entities: sequence of (name, doc_count) tuples.
        threshold: minimum similarity ratio required to merge two names;
            defaults to the module-level FUZZY_THRESHOLD.

    Returns:
        (canonical_map, groups): canonical_map maps every input name to its
        canonical form (canonical names map to themselves); groups maps each
        canonical name to the list of its aliases (empty list if none).
    """
    if threshold is None:
        threshold = FUZZY_THRESHOLD

    names = [e[0] for e in entities]
    doc_counts = {e[0]: e[1] for e in entities}

    # Most frequent first; ties keep input order (sort is stable).
    names.sort(key=lambda n: doc_counts.get(n, 0), reverse=True)

    canonical_map = {}
    groups = {}
    # canonical name -> lowercased/stripped form, computed once per canonical
    # (previously recomputed inside the inner loop for every comparison).
    lowered = {}

    for name in names:
        if name in canonical_map:  # duplicate input name already handled
            continue

        name_lower = name.lower().strip()
        best_match = None
        best_ratio = 0.0

        for canonical, canonical_lower in lowered.items():
            # Pre-filter: names whose lengths differ by more than 30% cannot
            # be close enough to merge — skip the ratio computation.
            if abs(len(name_lower) - len(canonical_lower)) > max(len(name_lower), len(canonical_lower)) * 0.3:
                continue

            # Containment (e.g. "Microsoft Corp" / "Microsoft Corp.") is a
            # strong signal independent of edit distance.
            if name_lower in canonical_lower or canonical_lower in name_lower:
                ratio = 0.92
            else:
                ratio = SequenceMatcher(None, name_lower, canonical_lower).ratio()

            if ratio > best_ratio and ratio >= threshold:
                best_ratio = ratio
                best_match = canonical

        if best_match:
            canonical_map[name] = best_match
            groups[best_match].append(name)
        else:
            canonical_map[name] = name
            groups[name] = []
            lowered[name] = name_lower

    return canonical_map, groups
|
|
|
|
def store_aliases(conn, canonical_map, entity_type):
    """Persist alias -> canonical mappings into entity_aliases.

    Self-mappings (canonical names mapped to themselves) are skipped.  An
    existing row for the same (alias_name, entity_type) is repointed at the
    new canonical name.  Returns the number of alias rows written; commits
    on success.
    """
    rows = [
        (canonical, alias, entity_type, 0.9)
        for alias, canonical in canonical_map.items()
        if alias != canonical
    ]
    if not rows:
        return 0

    upsert_sql = """INSERT INTO entity_aliases (canonical_name, alias_name, entity_type, confidence)
               VALUES (%s, %s, %s, %s)
               ON CONFLICT (alias_name, entity_type) DO UPDATE SET
                   canonical_name = EXCLUDED.canonical_name"""
    with conn.cursor() as cur:
        psycopg2.extras.execute_batch(cur, upsert_sql, rows, page_size=1000)
    conn.commit()
    return len(rows)
|
|
|
|
def _count_pairs(doc_entities, canonical_map):
    """Count unordered co-occurring entity pairs across documents.

    Entity names are resolved through canonical_map and deduplicated within
    each document, so a given pair is counted at most once per document.

    Returns a dict mapping (entity_a, entity_b) — sorted order — to
    {'count': <number of documents containing the pair>,
     'docs': <set of up to 10 sample document ids>}.
    """
    pair_counts = defaultdict(lambda: {'count': 0, 'docs': set()})

    for doc_id, ent_list in doc_entities:
        # Resolve aliases to canonical names and dedupe within the doc.
        resolved = sorted({canonical_map.get(e, e) for e in ent_list})

        # Bound the O(n^2) pair expansion for entity-dense documents.
        # NOTE: truncating after the sort keeps the alphabetically-first
        # names; this matches the original behavior.
        if len(resolved) > MAX_ENTITIES_PER_DOC:
            resolved = resolved[:MAX_ENTITIES_PER_DOC]

        for i, ent_a in enumerate(resolved):
            for ent_b in resolved[i + 1:]:
                entry = pair_counts[(ent_a, ent_b)]
                entry['count'] += 1
                # 'docs' is only a sample pool for sample_doc_ids, capped at 10.
                if len(entry['docs']) < 10:
                    entry['docs'].add(doc_id)

    return pair_counts


def _upsert_relationships(conn, rows):
    """Upsert relationship rows into entity_relationships and commit."""
    with conn.cursor() as cur:
        psycopg2.extras.execute_batch(
            cur,
            """INSERT INTO entity_relationships
               (entity_a, entity_a_type, entity_b, entity_b_type,
                co_occurrence_count, document_count, source_section, sample_doc_ids)
               VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
               ON CONFLICT (entity_a, entity_a_type, entity_b, entity_b_type, source_section)
               DO UPDATE SET
                   co_occurrence_count = EXCLUDED.co_occurrence_count,
                   document_count = EXCLUDED.document_count,
                   sample_doc_ids = EXCLUDED.sample_doc_ids""",
            rows,
            page_size=500,
        )
    conn.commit()


def build_cooccurrence(conn, entity_type, canonical_map):
    """Build and store entity co-occurrence relationships per source_section.

    For every document in a section, each unordered pair of (resolved,
    deduplicated) entities that appear together is counted once.  Pairs
    seen in at least two documents are upserted into entity_relationships.

    Args:
        conn: open psycopg2 connection.
        entity_type: entity label to process (e.g. 'PERSON', 'ORG').
        canonical_map: alias -> canonical name mapping from resolve_entities().

    Returns:
        Total number of relationship rows written across all sections.
    """
    log.info(f"Building co-occurrence for {entity_type}...")

    with conn.cursor() as cur:
        cur.execute("SELECT DISTINCT source_section FROM documents ORDER BY source_section")
        sections = [r[0] for r in cur.fetchall()]

    total_rels = 0

    for section in sections:
        log.info(f"  Processing section: {section}")

        # Per-document entity lists; only docs with >= 2 distinct entities
        # can contribute a pair.
        with conn.cursor() as cur:
            cur.execute("""
                SELECT e.document_id, array_agg(DISTINCT e.entity_text) as entities
                FROM entities e
                JOIN documents d ON d.id = e.document_id
                WHERE e.entity_type = %s AND d.source_section = %s
                AND LENGTH(e.entity_text) >= 3
                GROUP BY e.document_id
                HAVING COUNT(DISTINCT e.entity_text) >= 2
            """, (entity_type, section))
            doc_entities = cur.fetchall()

        if not doc_entities:
            continue

        pair_counts = _count_pairs(doc_entities, canonical_map)

        # Keep only pairs that co-occur in at least 2 documents.
        significant = {k: v for k, v in pair_counts.items() if v['count'] >= 2}
        if not significant:
            continue

        rows = []
        for (ea, eb), data in significant.items():
            sample_ids = sorted(data['docs'])[:5]
            rows.append((
                ea, entity_type, eb, entity_type,
                data['count'],
                # BUGFIX: document_count was previously len(data['docs']),
                # which silently saturated at the 10-id sampling cap.  Since
                # pairs are deduplicated per document, 'count' already equals
                # the distinct document count — store that instead.
                data['count'],
                section, sample_ids,
            ))

        _upsert_relationships(conn, rows)
        total_rels += len(rows)
        log.info(f"  {section}: {len(rows)} relationships ({len(doc_entities)} docs)")

    return total_rels
|
|
|
|
def main():
    """Run entity resolution + co-occurrence analysis for PERSON and ORG."""
    conn = get_conn()

    for entity_type in ('PERSON', 'ORG'):
        log.info(f"=== Processing {entity_type} entities ===")

        entities = get_frequent_entities(conn, entity_type)
        log.info(f"Found {len(entities)} frequent {entity_type} entities (>= {MIN_DOCS} docs)")
        if not entities:
            continue

        if len(entities) > 50000:
            # Fuzzy matching is quadratic; above this size fall back to an
            # identity mapping (every name is its own canonical form).
            log.info(f"Too many entities ({len(entities)}) for fuzzy matching, using exact names")
            canonical_map = {name: name for name, _count in entities}
        else:
            log.info("Running entity resolution...")
            canonical_map, groups = resolve_entities(entities)
            alias_count = sum(1 for alias, canon in canonical_map.items() if alias != canon)
            log.info(f"Found {alias_count} aliases across {len(groups)} canonical entities")
            stored = store_aliases(conn, canonical_map, entity_type)
            log.info(f"Stored {stored} alias mappings")

        total_rels = build_cooccurrence(conn, entity_type, canonical_map)
        log.info(f"Total {entity_type} relationships: {total_rels}")

    conn.close()
    log.info("Done.")


if __name__ == "__main__":
    main()
|
|