#!/usr/bin/env python3
"""
Phase 5: Entity Network Analysis

1. Entity resolution: group similar PERSON names using fuzzy matching
2. Co-occurrence: count entity pairs that appear in the same document
3. Store in entity_aliases and entity_relationships tables

Focuses on PERSON and ORG entities that appear in 3+ documents.
Runs on: Hetzner CPU
"""
import logging
from collections import defaultdict
from difflib import SequenceMatcher

import psycopg2
import psycopg2.extras

from db import get_conn

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-8s %(message)s")
log = logging.getLogger(__name__)

MIN_DOCS = 3              # minimum documents for an entity to be included
FUZZY_THRESHOLD = 0.88    # SequenceMatcher ratio for alias detection
MAX_ENTITIES_PER_DOC = 50  # limit entity pairs per document
BATCH_SIZE = 1000


def get_frequent_entities(conn, entity_type, min_docs=MIN_DOCS):
    """Get entities appearing in at least min_docs documents.

    Returns a list of (entity_text, doc_count) tuples, most frequent first.
    Very short (<3 chars) and very long (>100 chars) entity strings are
    excluded as likely extraction noise.
    """
    with conn.cursor() as cur:
        cur.execute("""
            SELECT entity_text, COUNT(DISTINCT document_id) as doc_count
            FROM entities
            WHERE entity_type = %s
              AND LENGTH(entity_text) >= 3
              AND LENGTH(entity_text) <= 100
            GROUP BY entity_text
            HAVING COUNT(DISTINCT document_id) >= %s
            ORDER BY doc_count DESC
        """, (entity_type, min_docs))
        return cur.fetchall()


def resolve_entities(entities):
    """Find aliases among entity names using fuzzy matching.

    Args:
        entities: list of (name, doc_count) tuples.

    Returns:
        (canonical_map, groups) where canonical_map maps every name
        (including canonicals, which map to themselves) to its canonical
        form, and groups maps each canonical name to its list of aliases.

    Names are processed most-frequent-first, so the most common spelling
    of a group becomes its canonical form. A candidate pair is compared
    only if the length difference is within 30%; containment of one
    lowercased name in the other scores a fixed 0.92, otherwise
    SequenceMatcher.ratio() is used against FUZZY_THRESHOLD.
    """
    names = [e[0] for e in entities]
    doc_counts = {e[0]: e[1] for e in entities}
    # Sort by frequency (most common = canonical)
    names.sort(key=lambda n: doc_counts.get(n, 0), reverse=True)

    canonical_map = {}       # alias -> canonical
    groups = {}              # canonical -> [aliases]
    canonical_lowered = {}   # canonical -> lower().strip() form, computed once

    for name in names:
        if name in canonical_map:
            continue
        # Check against existing canonical names
        best_match = None
        best_ratio = 0.0
        name_lower = name.lower().strip()
        for canonical in groups:
            canonical_lower = canonical_lowered[canonical]
            # Quick length check: skip pairs whose lengths differ by >30%
            if abs(len(name_lower) - len(canonical_lower)) > max(len(name_lower), len(canonical_lower)) * 0.3:
                continue
            # Check containment first (faster than a full ratio)
            if name_lower in canonical_lower or canonical_lower in name_lower:
                ratio = 0.92
            else:
                ratio = SequenceMatcher(None, name_lower, canonical_lower).ratio()
            if ratio > best_ratio and ratio >= FUZZY_THRESHOLD:
                best_ratio = ratio
                best_match = canonical
        if best_match:
            canonical_map[name] = best_match
            groups[best_match].append(name)
        else:
            groups[name] = []
            canonical_map[name] = name
            canonical_lowered[name] = name_lower
    return canonical_map, groups


def store_aliases(conn, canonical_map, entity_type):
    """Store alias mappings in entity_aliases table.

    Only non-identity mappings (alias != canonical) are written. Returns
    the number of rows inserted/updated. Confidence is a fixed 0.9 since
    aliases come from fuzzy matching, not exact confirmation.
    """
    rows = []
    for alias, canonical in canonical_map.items():
        if alias != canonical:
            rows.append((canonical, alias, entity_type, 0.9))
    if not rows:
        return 0
    with conn.cursor() as cur:
        psycopg2.extras.execute_batch(
            cur,
            """INSERT INTO entity_aliases (canonical_name, alias_name, entity_type, confidence)
               VALUES (%s, %s, %s, %s)
               ON CONFLICT (alias_name, entity_type)
               DO UPDATE SET canonical_name = EXCLUDED.canonical_name""",
            rows,
            page_size=1000,
        )
    conn.commit()
    return len(rows)


def build_cooccurrence(conn, entity_type, canonical_map):
    """Build co-occurrence relationships per source_section.

    For each section, counts how often pairs of (canonical) entities
    appear in the same document. Pairs co-occurring in 2+ documents are
    upserted into entity_relationships with up to 5 sample doc ids.
    Returns the total number of relationship rows written.
    """
    log.info(f"Building co-occurrence for {entity_type}...")

    # Get all sections
    with conn.cursor() as cur:
        cur.execute("SELECT DISTINCT source_section FROM documents ORDER BY source_section")
        sections = [r[0] for r in cur.fetchall()]

    total_rels = 0
    for section in sections:
        log.info(f" Processing section: {section}")
        # Get entities per document for this section
        with conn.cursor() as cur:
            cur.execute("""
                SELECT e.document_id, array_agg(DISTINCT e.entity_text) as entities
                FROM entities e
                JOIN documents d ON d.id = e.document_id
                WHERE e.entity_type = %s
                  AND d.source_section = %s
                  AND LENGTH(e.entity_text) >= 3
                GROUP BY e.document_id
                HAVING COUNT(DISTINCT e.entity_text) >= 2
            """, (entity_type, section))
            doc_entities = cur.fetchall()

        if not doc_entities:
            continue

        # Count co-occurrences; 'docs' is capped at 10 ids to bound memory
        pair_counts = defaultdict(lambda: {'count': 0, 'docs': set()})
        for doc_id, ent_list in doc_entities:
            # Resolve to canonical names; dedupe, then sort so pair keys
            # are order-independent (a < b)
            resolved = sorted({canonical_map.get(e, e) for e in ent_list})
            # Limit pairs per document (quadratic in entity count)
            if len(resolved) > MAX_ENTITIES_PER_DOC:
                resolved = resolved[:MAX_ENTITIES_PER_DOC]
            for i in range(len(resolved)):
                for j in range(i + 1, len(resolved)):
                    key = (resolved[i], resolved[j])
                    pair_counts[key]['count'] += 1
                    if len(pair_counts[key]['docs']) < 10:
                        pair_counts[key]['docs'].add(doc_id)

        # Filter: keep pairs with 2+ co-occurrences
        significant = {k: v for k, v in pair_counts.items() if v['count'] >= 2}
        if not significant:
            continue

        # Insert
        rows = []
        for (ea, eb), data in significant.items():
            sample_ids = sorted(data['docs'])[:5]
            rows.append((
                ea, entity_type, eb, entity_type,
                data['count'], len(data['docs']), section, sample_ids,
            ))
        with conn.cursor() as cur:
            psycopg2.extras.execute_batch(
                cur,
                """INSERT INTO entity_relationships
                   (entity_a, entity_a_type, entity_b, entity_b_type,
                    co_occurrence_count, document_count, source_section, sample_doc_ids)
                   VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                   ON CONFLICT (entity_a, entity_a_type, entity_b, entity_b_type, source_section)
                   DO UPDATE SET co_occurrence_count = EXCLUDED.co_occurrence_count,
                                 document_count = EXCLUDED.document_count,
                                 sample_doc_ids = EXCLUDED.sample_doc_ids""",
                rows,
                page_size=500,
            )
        conn.commit()
        total_rels += len(rows)
        log.info(f" {section}: {len(rows)} relationships ({len(doc_entities)} docs)")
    return total_rels


def main():
    """Run entity resolution and co-occurrence for PERSON and ORG entities."""
    conn = get_conn()
    try:  # ensure the connection is closed even if a step raises
        for entity_type in ['PERSON', 'ORG']:
            log.info(f"=== Processing {entity_type} entities ===")

            # Step 1: Get frequent entities
            entities = get_frequent_entities(conn, entity_type)
            log.info(f"Found {len(entities)} frequent {entity_type} entities (>= {MIN_DOCS} docs)")
            if not entities:
                continue

            # Step 2: Entity resolution (fuzzy matching is O(n^2), so only
            # attempt it when the entity count is manageable)
            if len(entities) <= 50000:
                log.info("Running entity resolution...")
                canonical_map, groups = resolve_entities(entities)
                alias_count = sum(1 for a, c in canonical_map.items() if a != c)
                log.info(f"Found {alias_count} aliases across {len(groups)} canonical entities")
                stored = store_aliases(conn, canonical_map, entity_type)
                log.info(f"Stored {stored} alias mappings")
            else:
                log.info(f"Too many entities ({len(entities)}) for fuzzy matching, using exact names")
                canonical_map = {e[0]: e[0] for e in entities}

            # Step 3: Co-occurrence
            total_rels = build_cooccurrence(conn, entity_type, canonical_map)
            log.info(f"Total {entity_type} relationships: {total_rels}")
    finally:
        conn.close()
    log.info("Done.")


if __name__ == "__main__":
    main()