# research-document-archive / ml / 10_entity_network.py
# Uploaded by datamatters24 via huggingface_hub (commit 43cc579, verified)
#!/usr/bin/env python3
"""
Phase 5: Entity Network Analysis
1. Entity resolution: group similar PERSON names using fuzzy matching
2. Co-occurrence: count entity pairs that appear in the same document
3. Store in entity_aliases and entity_relationships tables
Focuses on PERSON and ORG entities that appear in 3+ documents.
Runs on: Hetzner CPU
"""
import logging
from collections import defaultdict
from difflib import SequenceMatcher
import psycopg2
import psycopg2.extras
from db import get_conn
# Configure logging once at import time; timestamps help correlate per-section progress.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-8s %(message)s")
log = logging.getLogger(__name__)
# Tuning knobs for entity filtering, fuzzy matching, and pair generation.
MIN_DOCS = 3 # minimum documents for an entity to be included
FUZZY_THRESHOLD = 0.88 # SequenceMatcher ratio for alias detection
MAX_ENTITIES_PER_DOC = 50 # limit entity pairs per document (caps O(n^2) pair blowup)
BATCH_SIZE = 1000 # NOTE(review): unused below — page_size is hard-coded at the execute_batch call sites
def get_frequent_entities(conn, entity_type, min_docs=MIN_DOCS):
    """Fetch (entity_text, doc_count) rows for frequent entities of one type.

    Only names between 3 and 100 characters are considered, and only those
    appearing in at least ``min_docs`` distinct documents.  Rows come back
    ordered from most to least frequent, which downstream code relies on
    when picking canonical names.
    """
    query = """
            SELECT entity_text, COUNT(DISTINCT document_id) as doc_count
            FROM entities
            WHERE entity_type = %s
            AND LENGTH(entity_text) >= 3
            AND LENGTH(entity_text) <= 100
            GROUP BY entity_text
            HAVING COUNT(DISTINCT document_id) >= %s
            ORDER BY doc_count DESC
        """
    with conn.cursor() as cur:
        cur.execute(query, (entity_type, min_docs))
        rows = cur.fetchall()
    return rows
def resolve_entities(entities, fuzzy_threshold=None):
    """Group similar entity names into alias clusters via fuzzy matching.

    Args:
        entities: sequence of (entity_text, doc_count) pairs, e.g. the rows
            returned by get_frequent_entities().
        fuzzy_threshold: minimum SequenceMatcher ratio for two names to be
            treated as aliases; defaults to the module-level FUZZY_THRESHOLD.

    Returns:
        (canonical_map, groups): canonical_map maps every name (alias or
        canonical) to its canonical form; groups maps each canonical name to
        the list of its aliases (empty list if none).

    The most frequent name in a cluster becomes its canonical form, because
    names are scanned in descending document-count order and new names only
    match against already-accepted canonicals.
    """
    if fuzzy_threshold is None:
        fuzzy_threshold = FUZZY_THRESHOLD
    doc_counts = {name: count for name, count in entities}
    # Sort by frequency (most common = canonical). Names are unique (GROUP BY
    # upstream), so sorting the dict keys is equivalent to sorting the list.
    names = sorted(doc_counts, key=doc_counts.get, reverse=True)
    canonical_map = {}  # alias -> canonical (identity for canonical names)
    groups = {}         # canonical -> [aliases]
    # Cache lowercased canonical forms so each canonical is normalized once
    # instead of once per candidate name (was O(n^2) redundant string work).
    lowered_canonicals = {}  # canonical -> canonical.lower().strip()
    for name in names:
        if name in canonical_map:
            continue
        name_lower = name.lower().strip()
        name_len = len(name_lower)
        best_match = None
        best_ratio = 0.0
        for canonical, canonical_lower in lowered_canonicals.items():
            # Quick reject: names differing in length by >30% can't be aliases.
            if abs(name_len - len(canonical_lower)) > max(name_len, len(canonical_lower)) * 0.3:
                continue
            # Containment ("Smith" in "Smiths") is a cheap, strong signal.
            if name_lower in canonical_lower or canonical_lower in name_lower:
                ratio = 0.92
            else:
                ratio = SequenceMatcher(None, name_lower, canonical_lower).ratio()
            if ratio > best_ratio and ratio >= fuzzy_threshold:
                best_ratio = ratio
                best_match = canonical
        if best_match:
            canonical_map[name] = best_match
            groups[best_match].append(name)
        else:
            groups[name] = []
            lowered_canonicals[name] = name_lower
            canonical_map[name] = name
    return canonical_map, groups
def store_aliases(conn, canonical_map, entity_type):
    """Upsert alias -> canonical mappings into entity_aliases.

    Identity entries (alias == canonical) are skipped. Returns the number of
    alias rows written; returns 0 without touching the database when there
    are no aliases.
    """
    rows = [
        (canonical, alias, entity_type, 0.9)
        for alias, canonical in canonical_map.items()
        if alias != canonical
    ]
    if not rows:
        return 0
    with conn.cursor() as cur:
        psycopg2.extras.execute_batch(
            cur,
            """INSERT INTO entity_aliases (canonical_name, alias_name, entity_type, confidence)
               VALUES (%s, %s, %s, %s)
               ON CONFLICT (alias_name, entity_type) DO UPDATE SET
                   canonical_name = EXCLUDED.canonical_name""",
            rows,
            page_size=1000,
        )
    conn.commit()
    return len(rows)
def build_cooccurrence(conn, entity_type, canonical_map):
    """Count pairwise entity co-occurrence per document, per source_section.

    For each section, every document's entities (of the given type) are
    resolved to canonical names and all unordered pairs are counted; pairs
    seen in 2+ documents are upserted into entity_relationships.

    Args:
        conn: open psycopg2 connection; committed once per section.
        entity_type: entity_type value to process (e.g. 'PERSON').
        canonical_map: alias -> canonical mapping from resolve_entities().

    Returns:
        Total number of relationship rows written across all sections.
    """
    log.info(f"Building co-occurrence for {entity_type}...")
    # Get all sections
    with conn.cursor() as cur:
        cur.execute("SELECT DISTINCT source_section FROM documents ORDER BY source_section")
        sections = [r[0] for r in cur.fetchall()]
    total_rels = 0
    for section in sections:
        log.info(f" Processing section: {section}")
        # Get entities per document for this section (docs with 2+ entities,
        # since a single entity yields no pairs).
        with conn.cursor() as cur:
            cur.execute("""
                SELECT e.document_id, array_agg(DISTINCT e.entity_text) as entities
                FROM entities e
                JOIN documents d ON d.id = e.document_id
                WHERE e.entity_type = %s AND d.source_section = %s
                AND LENGTH(e.entity_text) >= 3
                GROUP BY e.document_id
                HAVING COUNT(DISTINCT e.entity_text) >= 2
            """, (entity_type, section))
            doc_entities = cur.fetchall()
        if not doc_entities:
            continue
        # 'count' == number of documents containing the pair in this section,
        # because each document contributes each pair at most once (the
        # resolved list is deduped). 'docs' keeps only a bounded sample of
        # doc ids as evidence, NOT the full document set.
        pair_counts = defaultdict(lambda: {'count': 0, 'docs': set()})
        for doc_id, ent_list in doc_entities:
            # Resolve to canonical names and dedupe
            resolved = sorted(set(canonical_map.get(e, e) for e in ent_list))
            # Limit pairs per document (caps O(n^2) blowup on entity-dense docs)
            if len(resolved) > MAX_ENTITIES_PER_DOC:
                resolved = resolved[:MAX_ENTITIES_PER_DOC]
            for i in range(len(resolved)):
                for j in range(i + 1, len(resolved)):
                    key = (resolved[i], resolved[j])
                    pair_counts[key]['count'] += 1
                    if len(pair_counts[key]['docs']) < 10:
                        pair_counts[key]['docs'].add(doc_id)
        # Filter: keep pairs with 2+ co-occurrences
        significant = {k: v for k, v in pair_counts.items() if v['count'] >= 2}
        if not significant:
            continue
        # Insert
        rows = []
        for (ea, eb), data in significant.items():
            sample_ids = sorted(data['docs'])[:5]
            rows.append((
                ea, entity_type, eb, entity_type,
                data['count'],
                # BUG FIX: was len(data['docs']), but that set is capped at 10
                # entries above, so document_count under-reported pairs found
                # in >10 documents. The true per-section document count is
                # exactly data['count'] (one increment per document per pair).
                data['count'],
                section, sample_ids,
            ))
        with conn.cursor() as cur:
            psycopg2.extras.execute_batch(
                cur,
                """INSERT INTO entity_relationships
                   (entity_a, entity_a_type, entity_b, entity_b_type,
                    co_occurrence_count, document_count, source_section, sample_doc_ids)
                   VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                   ON CONFLICT (entity_a, entity_a_type, entity_b, entity_b_type, source_section)
                   DO UPDATE SET
                       co_occurrence_count = EXCLUDED.co_occurrence_count,
                       document_count = EXCLUDED.document_count,
                       sample_doc_ids = EXCLUDED.sample_doc_ids""",
                rows,
                page_size=500,
            )
        conn.commit()
        total_rels += len(rows)
        log.info(f" {section}: {len(rows)} relationships ({len(doc_entities)} docs)")
    return total_rels
def main():
    """Run alias resolution and co-occurrence analysis for PERSON and ORG."""
    conn = get_conn()
    for entity_type in ['PERSON', 'ORG']:
        log.info(f"=== Processing {entity_type} entities ===")
        # Step 1: frequent entities for this type
        entities = get_frequent_entities(conn, entity_type)
        log.info(f"Found {len(entities)} frequent {entity_type} entities (>= {MIN_DOCS} docs)")
        if not entities:
            continue
        # Step 2: entity resolution (identity fallback above the size cutoff,
        # where O(n^2) fuzzy matching would be too slow)
        if len(entities) > 50000:
            log.info(f"Too many entities ({len(entities)}) for fuzzy matching, using exact names")
            canonical_map = {name: name for name, _ in entities}
        else:
            log.info("Running entity resolution...")
            canonical_map, groups = resolve_entities(entities)
            alias_count = sum(1 for a, c in canonical_map.items() if a != c)
            log.info(f"Found {alias_count} aliases across {len(groups)} canonical entities")
            stored = store_aliases(conn, canonical_map, entity_type)
            log.info(f"Stored {stored} alias mappings")
        # Step 3: co-occurrence relationships
        total_rels = build_cooccurrence(conn, entity_type, canonical_map)
        log.info(f"Total {entity_type} relationships: {total_rels}")
    conn.close()
    log.info("Done.")


if __name__ == "__main__":
    main()