File size: 8,351 Bytes
43cc579
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
#!/usr/bin/env python3
"""
Phase 5: Entity Network Analysis

1. Entity resolution: group similar PERSON names using fuzzy matching
2. Co-occurrence: count entity pairs that appear in the same document
3. Store in entity_aliases and entity_relationships tables

Focuses on PERSON and ORG entities that appear in 3+ documents.

Runs on: Hetzner CPU
"""

import logging
from collections import defaultdict
from difflib import SequenceMatcher

import psycopg2
import psycopg2.extras

from db import get_conn

logging.basicConfig(level=logging.INFO, format="%(asctime)s  %(levelname)-8s  %(message)s")
log = logging.getLogger(__name__)

# Tuning knobs for resolution and co-occurrence.
MIN_DOCS = 3  # minimum documents for an entity to be included
FUZZY_THRESHOLD = 0.88  # SequenceMatcher ratio for alias detection
MAX_ENTITIES_PER_DOC = 50  # limit entity pairs per document
BATCH_SIZE = 1000  # NOTE(review): not referenced in this file — execute_batch calls pass page_size literally; confirm intended use


def get_frequent_entities(conn, entity_type, min_docs=MIN_DOCS):
    """Fetch entities of *entity_type* seen in >= *min_docs* documents.

    Names shorter than 3 or longer than 100 characters are excluded.
    Returns a list of (entity_text, doc_count) tuples ordered most
    frequent first.
    """
    sql = """
            SELECT entity_text, COUNT(DISTINCT document_id) as doc_count
            FROM entities
            WHERE entity_type = %s
              AND LENGTH(entity_text) >= 3
              AND LENGTH(entity_text) <= 100
            GROUP BY entity_text
            HAVING COUNT(DISTINCT document_id) >= %s
            ORDER BY doc_count DESC
        """
    with conn.cursor() as cursor:
        cursor.execute(sql, (entity_type, min_docs))
        return cursor.fetchall()


def resolve_entities(entities, threshold=None):
    """Find aliases among entity names using fuzzy matching.

    The most frequent spelling in each fuzzy-matched group becomes the
    group's canonical name.  Two names are considered aliases when one
    contains the other (scored 0.92) or their SequenceMatcher ratio
    meets the threshold; pairs whose lengths differ by more than 30%
    are skipped cheaply without a full comparison.

    Args:
        entities: sequence of (name, doc_count) tuples.
        threshold: minimum similarity ratio for alias detection;
            defaults to FUZZY_THRESHOLD.  Exposed as a parameter so
            callers can tune resolution strictness.

    Returns:
        (canonical_map, groups): canonical_map maps every name —
        including canonical ones — to its canonical spelling; groups
        maps each canonical name to the list of its aliases (empty
        list when it has none).
    """
    if threshold is None:
        threshold = FUZZY_THRESHOLD

    doc_counts = {name: count for name, count in entities}

    # Sort by frequency (most common = canonical); stable sort keeps
    # the input order among equal counts.
    names = sorted(doc_counts, key=lambda n: doc_counts.get(n, 0), reverse=True)

    canonical_map = {}  # alias -> canonical
    groups = {}  # canonical -> [aliases]

    for name in names:
        if name in canonical_map:
            continue

        name_lower = name.lower().strip()

        # Check against existing canonical names.
        best_match = None
        best_ratio = 0.0

        for canonical in groups:
            canonical_lower = canonical.lower().strip()

            # Quick length pre-filter: >30% length difference cannot
            # reach the threshold, so skip the expensive ratio.
            if abs(len(name_lower) - len(canonical_lower)) > max(len(name_lower), len(canonical_lower)) * 0.3:
                continue

            # Containment first (faster than a full diff).
            if name_lower in canonical_lower or canonical_lower in name_lower:
                ratio = 0.92
            else:
                ratio = SequenceMatcher(None, name_lower, canonical_lower).ratio()

            if ratio > best_ratio and ratio >= threshold:
                best_ratio = ratio
                best_match = canonical

        if best_match:
            canonical_map[name] = best_match
            groups[best_match].append(name)
        else:
            groups[name] = []
            canonical_map[name] = name

    return canonical_map, groups


def store_aliases(conn, canonical_map, entity_type):
    """Upsert alias -> canonical rows into the entity_aliases table.

    Identity mappings (alias == canonical) are skipped.  Commits once
    after the batch and returns the number of alias rows written; when
    there is nothing to store, returns 0 without touching the database.
    """
    rows = [
        (canonical, alias, entity_type, 0.9)
        for alias, canonical in canonical_map.items()
        if alias != canonical
    ]

    if not rows:
        return 0

    with conn.cursor() as cur:
        psycopg2.extras.execute_batch(
            cur,
            """INSERT INTO entity_aliases (canonical_name, alias_name, entity_type, confidence)
               VALUES (%s, %s, %s, %s)
               ON CONFLICT (alias_name, entity_type) DO UPDATE SET
                 canonical_name = EXCLUDED.canonical_name""",
            rows,
            page_size=1000,
        )
    conn.commit()
    return len(rows)


def build_cooccurrence(conn, entity_type, canonical_map):
    """Build co-occurrence relationships per source_section.

    For each section of the documents table, counts how often pairs of
    canonical entity names of *entity_type* appear in the same document
    and upserts pairs co-occurring in 2+ documents into
    entity_relationships.  Commits once per section.

    Args:
        conn: open psycopg2 connection.
        entity_type: NER label to process (e.g. 'PERSON', 'ORG').
        canonical_map: alias -> canonical mapping; names missing from
            the map are used as-is.

    Returns:
        Total number of relationship rows written across all sections.
    """
    log.info(f"Building co-occurrence for {entity_type}...")

    # Get all sections
    with conn.cursor() as cur:
        cur.execute("SELECT DISTINCT source_section FROM documents ORDER BY source_section")
        sections = [r[0] for r in cur.fetchall()]

    total_rels = 0

    for section in sections:
        log.info(f"  Processing section: {section}")

        # Get entities per document for this section; only documents
        # with 2+ distinct entities can contribute a pair.
        with conn.cursor() as cur:
            cur.execute("""
                SELECT e.document_id, array_agg(DISTINCT e.entity_text) as entities
                FROM entities e
                JOIN documents d ON d.id = e.document_id
                WHERE e.entity_type = %s AND d.source_section = %s
                  AND LENGTH(e.entity_text) >= 3
                GROUP BY e.document_id
                HAVING COUNT(DISTINCT e.entity_text) >= 2
            """, (entity_type, section))
            doc_entities = cur.fetchall()

        if not doc_entities:
            continue

        # Count co-occurrences
        pair_counts = defaultdict(lambda: {'count': 0, 'docs': set()})

        for doc_id, ent_list in doc_entities:
            # Resolve to canonical names; dedupe (several aliases may
            # collapse to one canonical) and sort so pair keys are
            # ordered consistently across documents.
            resolved = list(set(canonical_map.get(e, e) for e in ent_list))
            resolved.sort()

            # Limit pairs per document
            if len(resolved) > MAX_ENTITIES_PER_DOC:
                resolved = resolved[:MAX_ENTITIES_PER_DOC]

            for i in range(len(resolved)):
                for j in range(i + 1, len(resolved)):
                    key = (resolved[i], resolved[j])
                    pair_counts[key]['count'] += 1
                    # Keep at most 10 doc ids per pair to bound memory.
                    # NOTE(review): document_count stored below is
                    # len(docs), so it is capped at 10 even when
                    # 'count' is larger — confirm this is intended.
                    if len(pair_counts[key]['docs']) < 10:
                        pair_counts[key]['docs'].add(doc_id)

        # Filter: keep pairs with 2+ co-occurrences
        significant = {k: v for k, v in pair_counts.items() if v['count'] >= 2}

        if not significant:
            continue

        # Insert (entity_a sorts before entity_b thanks to the sort above)
        rows = []
        for (ea, eb), data in significant.items():
            sample_ids = sorted(list(data['docs']))[:5]
            rows.append((
                ea, entity_type, eb, entity_type,
                data['count'], len(data['docs']),
                section, sample_ids,
            ))

        with conn.cursor() as cur:
            psycopg2.extras.execute_batch(
                cur,
                """INSERT INTO entity_relationships
                   (entity_a, entity_a_type, entity_b, entity_b_type,
                    co_occurrence_count, document_count, source_section, sample_doc_ids)
                   VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                   ON CONFLICT (entity_a, entity_a_type, entity_b, entity_b_type, source_section)
                   DO UPDATE SET
                     co_occurrence_count = EXCLUDED.co_occurrence_count,
                     document_count = EXCLUDED.document_count,
                     sample_doc_ids = EXCLUDED.sample_doc_ids""",
                rows,
                page_size=500,
            )
        conn.commit()
        total_rels += len(rows)
        log.info(f"    {section}: {len(rows)} relationships ({len(doc_entities)} docs)")

    return total_rels


def main():
    """Run the full phase for PERSON and ORG entities.

    For each type: fetch frequent entities, resolve aliases (skipped
    when the set is too large for O(n^2) fuzzy matching), store the
    alias mappings, then build section-level co-occurrence networks.
    """
    conn = get_conn()

    for entity_type in ('PERSON', 'ORG'):
        log.info(f"=== Processing {entity_type} entities ===")

        # Step 1: frequent entities of this type.
        frequent = get_frequent_entities(conn, entity_type)
        log.info(f"Found {len(frequent)} frequent {entity_type} entities (>= {MIN_DOCS} docs)")
        if not frequent:
            continue

        # Step 2: entity resolution, only when the candidate set is
        # small enough for pairwise fuzzy matching.
        if len(frequent) > 50000:
            log.info(f"Too many entities ({len(frequent)}) for fuzzy matching, using exact names")
            canonical_map = {name: name for name, _count in frequent}
        else:
            log.info("Running entity resolution...")
            canonical_map, groups = resolve_entities(frequent)
            n_aliases = sum(1 for alias, canon in canonical_map.items() if alias != canon)
            log.info(f"Found {n_aliases} aliases across {len(groups)} canonical entities")
            n_stored = store_aliases(conn, canonical_map, entity_type)
            log.info(f"Stored {n_stored} alias mappings")

        # Step 3: co-occurrence network.
        n_rels = build_cooccurrence(conn, entity_type, canonical_map)
        log.info(f"Total {entity_type} relationships: {n_rels}")

    conn.close()
    log.info("Done.")


if __name__ == "__main__":
    main()