| |
| """ |
| Phase 8: Cross-Collection Analysis |
| |
| 1. Bridge entities: entities shared across 2+ collections (multi-word names for quality) |
| 2. Collection similarity: Jaccard similarity based on shared entities |
| 3. Topic overlap: compare topic distributions across collections |
| 4. Store results in document_features |
| |
| Runs on: Hetzner CPU (PostgreSQL queries) |
| """ |
|
|
| import json |
| import logging |
| from collections import defaultdict |
| from itertools import combinations |
|
|
| import psycopg2.extras |
| from db import get_conn |
|
|
| logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-8s %(message)s") |
| log = logging.getLogger(__name__) |
|
|
|
|
def find_bridge_entities(conn):
    """Find entities that appear across multiple collections.

    For each entity type (PERSON, ORG), selects entities seen in >= 2
    distinct ``documents.source_section`` values, caches the top 200 per
    type in ``analytics_cache`` under ``bridge_entities_<type>``, and
    commits after each type.

    Args:
        conn: open psycopg2 connection.

    Returns:
        list of raw result rows (entity_text, num_collections,
        collections, doc_count) accumulated across ALL entity types.
        (Previously only the last type's rows were returned.)
    """
    log.info("Finding bridge entities...")

    # Per-type quality filter: PERSON must be multi-word (contain a space),
    # ORG must be at least 4 characters. The SQL fragment is interpolated
    # with an f-string, which is safe here because it comes from this fixed
    # dict, never from user input; its values are bound as parameters.
    quality_filters = {
        'PERSON': ("entity_text LIKE %s", ('% %',)),
        'ORG': ("LENGTH(entity_text) >= %s", (4,)),
    }

    all_bridges = []
    for entity_type, (extra_where, extra_params) in quality_filters.items():
        with conn.cursor() as cur:
            cur.execute(f"""
                SELECT entity_text,
                       COUNT(DISTINCT d.source_section) as num_collections,
                       array_agg(DISTINCT d.source_section ORDER BY d.source_section) as collections,
                       COUNT(DISTINCT d.id) as doc_count
                FROM entities e
                JOIN documents d ON d.id = e.document_id
                WHERE e.entity_type = %s AND {extra_where}
                GROUP BY entity_text
                HAVING COUNT(DISTINCT d.source_section) >= 2
                ORDER BY num_collections DESC, doc_count DESC
                LIMIT 500
            """, (entity_type, *extra_params))
            bridges = cur.fetchall()

        log.info(f"  {entity_type}: {len(bridges)} bridge entities found")
        all_bridges.extend(bridges)

        bridge_data = [
            {
                'entity': text,
                'type': entity_type,
                'collections': num_cols,
                # array_agg arrives as a Python list from psycopg2, but
                # normalize defensively in case a different adapter is used.
                'sections': cols if isinstance(cols, list) else list(cols),
                'doc_count': doc_count,
            }
            for text, num_cols, cols, doc_count in bridges
        ]

        # Cache only the top 200 per type to keep the JSON payload small.
        with conn.cursor() as cur:
            cur.execute("""
                INSERT INTO analytics_cache (key, value)
                VALUES (%s, %s::jsonb)
                ON CONFLICT (key) DO UPDATE SET
                    value = EXCLUDED.value,
                    updated_at = NOW()
            """, (
                f'bridge_entities_{entity_type.lower()}',
                json.dumps(bridge_data[:200]),
            ))
        conn.commit()

    return all_bridges
|
|
|
|
def collection_similarity(conn):
    """Compute Jaccard similarity between collections based on shared entities.

    Builds, per source_section, the set of lower-cased multi-word
    PERSON/ORG entity names, scores every section pair by Jaccard
    similarity, caches the sorted results under 'collection_similarity',
    and returns them.
    """
    log.info("Computing collection similarity...")

    with conn.cursor() as cur:
        cur.execute("SELECT DISTINCT source_section FROM documents ORDER BY source_section")
        sections = [row[0] for row in cur.fetchall()]

    # One entity-name set per section; names are lower-cased so that
    # comparisons are case-insensitive across collections.
    entity_sets = {}
    for section in sections:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT DISTINCT LOWER(e.entity_text)
                FROM entities e
                JOIN documents d ON d.id = e.document_id
                WHERE d.source_section = %s
                  AND e.entity_type IN ('PERSON', 'ORG')
                  AND LENGTH(e.entity_text) >= 4
                  AND e.entity_text LIKE '%% %%'
            """, (section,))
            entity_sets[section] = {row[0] for row in cur.fetchall()}

    log.info(f"  Entity set sizes: {', '.join(f'{s}={len(v)}' for s, v in entity_sets.items())}")

    similarities = []
    for left, right in combinations(sections, 2):
        left_set = entity_sets.get(left, set())
        right_set = entity_sets.get(right, set())
        if not (left_set and right_set):
            # An empty side would always yield zero; skip the pair entirely.
            continue

        shared = left_set & right_set
        union_size = len(left_set | right_set)
        jaccard = len(shared) / union_size if union_size > 0 else 0

        # Longest shared names first — longer names tend to be more specific.
        top_shared = sorted(shared, key=len, reverse=True)[:20]

        similarities.append({
            'section_a': left,
            'section_b': right,
            'jaccard': round(jaccard, 4),
            'shared_count': len(shared),
            'union_count': union_size,
            'top_shared': top_shared,
        })

    similarities.sort(key=lambda pair: pair['jaccard'], reverse=True)

    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO analytics_cache (key, value)
            VALUES (%s, %s::jsonb)
            ON CONFLICT (key) DO UPDATE SET
                value = EXCLUDED.value, updated_at = NOW()
        """, ('collection_similarity', json.dumps(similarities)))
    conn.commit()

    for s in similarities[:10]:
        log.info(f"  {s['section_a']} <-> {s['section_b']}: Jaccard={s['jaccard']:.4f} ({s['shared_count']} shared)")

    return similarities
|
|
|
|
def topic_overlap(conn):
    """Compare topic distributions across collections.

    Counts, per source_section, how many documents have each topic with a
    weight above 0.5 in their 'topic_distribution' feature, normalizes the
    counts into per-section proportions, caches the profiles under
    'topic_profiles', and returns them.
    """
    log.info("Computing topic overlap...")

    with conn.cursor() as cur:
        cur.execute("""
            SELECT d.source_section, key as topic, COUNT(*) as doc_count
            FROM document_features df
            JOIN documents d ON d.id = df.document_id
            CROSS JOIN LATERAL jsonb_each(df.feature_json::jsonb)
            WHERE df.feature_name = 'topic_distribution'
              AND (value::text)::float > 0.5
            GROUP BY d.source_section, key
            ORDER BY d.source_section, doc_count DESC
        """)
        topic_rows = cur.fetchall()

    # section -> {topic: document count with weight > 0.5}
    profiles = defaultdict(dict)
    for section, topic, doc_count in topic_rows:
        profiles[section][topic] = doc_count

    # Normalize each section's counts to proportions summing to ~1.
    for section, counts in list(profiles.items()):
        total = sum(counts.values())
        if total > 0:
            profiles[section] = {
                topic: round(count / total, 4) for topic, count in counts.items()
            }

    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO analytics_cache (key, value)
            VALUES (%s, %s::jsonb)
            ON CONFLICT (key) DO UPDATE SET
                value = EXCLUDED.value, updated_at = NOW()
        """, ('topic_profiles', json.dumps(dict(profiles))))
    conn.commit()

    for section, topics in profiles.items():
        top3 = sorted(topics.items(), key=lambda x: x[1], reverse=True)[:3]
        log.info(f"  {section}: {', '.join(f'{t}={p:.1%}' for t, p in top3)}")

    return profiles
|
|
|
|
def collection_stats_summary(conn):
    """Build comprehensive stats per collection.

    Aggregates per source_section: document count, total pages, number of
    documents with a positive 'redaction_summary' feature, and documents
    whose 'forensic_metadata' JSON mentions TOP SECRET / SECRET markings.
    Caches the summary under 'collection_summary' and returns it.
    """
    log.info("Building collection stats summary...")

    with conn.cursor() as cur:
        # NOTE(review): two LEFT JOINs on document_features can fan out rows
        # per document. The COUNT(DISTINCT ...) aggregates are immune, but
        # SUM(d.total_pages) would over-count pages if a document has more
        # than one matching df/df2 row — confirm document_features is unique
        # per (document_id, feature_name).
        # The LIKE patterns use '%%' because no query parameters are passed,
        # so psycopg2 does not unescape them; two consecutive LIKE wildcards
        # match the same as one, so behavior is as intended either way.
        # The 'secret' bucket deliberately excludes anything also matching
        # 'TOP SECRET'.
        cur.execute("""
            SELECT d.source_section,
                   COUNT(DISTINCT d.id) as docs,
                   SUM(d.total_pages) as pages,
                   COUNT(DISTINCT CASE WHEN df.feature_name = 'redaction_summary' AND df.feature_value > 0 THEN d.id END) as redacted_docs,
                   COUNT(DISTINCT CASE WHEN df2.feature_json::text LIKE '%%TOP SECRET%%' THEN d.id END) as top_secret,
                   COUNT(DISTINCT CASE WHEN df2.feature_json::text LIKE '%%SECRET%%' AND df2.feature_json::text NOT LIKE '%%TOP SECRET%%' THEN d.id END) as secret
            FROM documents d
            LEFT JOIN document_features df ON df.document_id = d.id AND df.feature_name = 'redaction_summary'
            LEFT JOIN document_features df2 ON df2.document_id = d.id AND df2.feature_name = 'forensic_metadata'
            GROUP BY d.source_section
            ORDER BY docs DESC
        """)
        stats = cur.fetchall()

    # Reshape rows into {section: {metric: value}} for the JSON cache.
    summary = {}
    for section, docs, pages, redacted, top_secret, secret in stats:
        summary[section] = {
            'docs': docs, 'pages': pages,
            'redacted_docs': redacted,
            'top_secret': top_secret, 'secret': secret,
        }
        log.info(f"  {section}: {docs} docs, {pages} pages, {redacted} redacted, {top_secret} TOP SECRET, {secret} SECRET")

    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO analytics_cache (key, value)
            VALUES (%s, %s::jsonb)
            ON CONFLICT (key) DO UPDATE SET
                value = EXCLUDED.value, updated_at = NOW()
        """, ('collection_summary', json.dumps(summary)))
    conn.commit()

    return summary
|
|
|
|
def main():
    """Run all cross-collection analysis phases on a single DB connection."""
    conn = get_conn()
    try:
        find_bridge_entities(conn)
        collection_similarity(conn)
        topic_overlap(conn)
        collection_stats_summary(conn)
    finally:
        # Release the connection even if an analysis step raises, so a
        # failed run does not leak a server-side connection slot.
        conn.close()
    log.info("Cross-collection analysis complete.")
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|