# research-document-archive / ml / 11_cross_collection.py
# Uploaded by datamatters24 via huggingface_hub (commit 303e71a, verified)
#!/usr/bin/env python3
"""
Phase 8: Cross-Collection Analysis
1. Bridge entities: entities shared across 2+ collections (multi-word names for quality)
2. Collection similarity: Jaccard similarity based on shared entities
3. Topic overlap: compare topic distributions across collections
4. Store results in document_features
Runs on: Hetzner CPU (PostgreSQL queries)
"""
import json
import logging
from collections import defaultdict
from itertools import combinations
import psycopg2.extras
from db import get_conn
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-8s %(message)s")
log = logging.getLogger(__name__)
def find_bridge_entities(conn):
    """Find entities that appear across multiple collections.

    For each entity type, selects entities that occur in at least two
    distinct collections (``documents.source_section``), ordered by how
    many collections they span, then by document count.  Quality filters:
    PERSONs must be multi-word names; ORGs must be at least 4 characters.
    The top 200 per type are cached in ``analytics_cache`` under
    ``bridge_entities_<type>``.

    Args:
        conn: open psycopg2 connection (committed after each type).

    Returns:
        list: row tuples ``(entity_text, num_collections, collections,
        doc_count)`` for ALL entity types combined.  (Previously only the
        last type processed — ORG — was returned, discarding PERSON rows.)
    """
    log.info("Finding bridge entities...")
    # Per-type quality filter: (SQL fragment, extra bind params).
    # The fragment is a hard-coded constant, never user input, so the
    # f-string interpolation below is not an injection risk.
    type_filters = {
        'PERSON': ("entity_text LIKE %s", ['% %']),   # multi-word names only
        'ORG': ("LENGTH(entity_text) >= 4", []),      # drop short/noisy acronyms
    }
    all_bridges = []
    for entity_type, (extra_where, extra_params) in type_filters.items():
        query = f"""
            SELECT entity_text,
                   COUNT(DISTINCT d.source_section) as num_collections,
                   array_agg(DISTINCT d.source_section ORDER BY d.source_section) as collections,
                   COUNT(DISTINCT d.id) as doc_count
            FROM entities e
            JOIN documents d ON d.id = e.document_id
            WHERE e.entity_type = %s AND {extra_where}
            GROUP BY entity_text
            HAVING COUNT(DISTINCT d.source_section) >= 2
            ORDER BY num_collections DESC, doc_count DESC
            LIMIT 500
        """
        with conn.cursor() as cur:
            cur.execute(query, tuple([entity_type] + extra_params))
            bridges = cur.fetchall()
        log.info(f" {entity_type}: {len(bridges)} bridge entities found")
        # Convert rows to JSON-serializable dicts for the cache.
        bridge_data = [
            {
                'entity': text,
                'type': entity_type,
                'collections': num_cols,
                # array_agg may come back as a non-list sequence; normalize.
                'sections': cols if isinstance(cols, list) else list(cols),
                'doc_count': doc_count,
            }
            for text, num_cols, cols, doc_count in bridges
        ]
        # Upsert the top bridges for this type into the analytics cache.
        with conn.cursor() as cur:
            cur.execute("""
                INSERT INTO analytics_cache (key, value)
                VALUES (%s, %s::jsonb)
                ON CONFLICT (key) DO UPDATE SET
                    value = EXCLUDED.value,
                    updated_at = NOW()
            """, (
                f'bridge_entities_{entity_type.lower()}',
                json.dumps(bridge_data[:200]),
            ))
        conn.commit()
        all_bridges.extend(bridges)
    return all_bridges
def collection_similarity(conn):
    """Compute Jaccard similarity between collections based on shared entities.

    Builds one set of high-quality entity mentions (lowercased, multi-word
    PERSON/ORG names of >= 4 chars) per ``source_section``, computes pairwise
    Jaccard similarity, caches the full sorted list in ``analytics_cache``
    under ``collection_similarity`` and returns it.
    """
    log.info("Computing collection similarity...")
    with conn.cursor() as cur:
        cur.execute("SELECT DISTINCT source_section FROM documents ORDER BY source_section")
        sections = [row[0] for row in cur.fetchall()]
    # One entity set per collection (multi-word PERSONs + ORGs).
    entity_sets = {}
    for sec in sections:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT DISTINCT LOWER(e.entity_text)
                FROM entities e
                JOIN documents d ON d.id = e.document_id
                WHERE d.source_section = %s
                  AND e.entity_type IN ('PERSON', 'ORG')
                  AND LENGTH(e.entity_text) >= 4
                  AND e.entity_text LIKE '%% %%'
            """, (sec,))
            entity_sets[sec] = {row[0] for row in cur.fetchall()}
    log.info(f" Entity set sizes: {', '.join(f'{s}={len(v)}' for s, v in entity_sets.items())}")
    # Pairwise Jaccard over every unordered pair of sections.
    similarities = []
    for left, right in combinations(sections, 2):
        ents_left = entity_sets.get(left, set())
        ents_right = entity_sets.get(right, set())
        if not ents_left or not ents_right:
            continue  # nothing to compare for an empty side
        shared = ents_left & ents_right
        union_size = len(ents_left | ents_right)
        score = len(shared) / union_size if union_size > 0 else 0
        similarities.append({
            'section_a': left,
            'section_b': right,
            'jaccard': round(score, 4),
            'shared_count': len(shared),
            'union_count': union_size,
            # Longest shared names first, as a rough quality proxy.
            'top_shared': sorted(shared, key=len, reverse=True)[:20],
        })
    similarities.sort(key=lambda item: item['jaccard'], reverse=True)
    # Persist the full result set for the dashboard.
    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO analytics_cache (key, value)
            VALUES (%s, %s::jsonb)
            ON CONFLICT (key) DO UPDATE SET
                value = EXCLUDED.value, updated_at = NOW()
        """, ('collection_similarity', json.dumps(similarities)))
    conn.commit()
    for s in similarities[:10]:
        log.info(f" {s['section_a']} <-> {s['section_b']}: Jaccard={s['jaccard']:.4f} ({s['shared_count']} shared)")
    return similarities
def topic_overlap(conn):
    """Compare topic distributions across collections.

    Counts, per ``source_section``, documents whose cached
    ``topic_distribution`` feature assigns a topic a weight above 0.5,
    normalizes counts to proportions, caches the per-section profiles in
    ``analytics_cache`` under ``topic_profiles`` and returns them.
    """
    log.info("Computing topic overlap...")
    with conn.cursor() as cur:
        cur.execute("""
            SELECT d.source_section, key as topic, COUNT(*) as doc_count
            FROM document_features df
            JOIN documents d ON d.id = df.document_id
            CROSS JOIN LATERAL jsonb_each(df.feature_json::jsonb)
            WHERE df.feature_name = 'topic_distribution'
              AND (value::text)::float > 0.5
            GROUP BY d.source_section, key
            ORDER BY d.source_section, doc_count DESC
        """)
        rows = cur.fetchall()
    # Raw per-section topic counts.
    profiles = defaultdict(dict)
    for section, topic, doc_count in rows:
        profiles[section][topic] = doc_count
    # Normalize each section's counts to proportions summing to ~1.
    for section in list(profiles):
        total = sum(profiles[section].values())
        if total > 0:
            profiles[section] = {
                topic: round(count / total, 4)
                for topic, count in profiles[section].items()
            }
    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO analytics_cache (key, value)
            VALUES (%s, %s::jsonb)
            ON CONFLICT (key) DO UPDATE SET
                value = EXCLUDED.value, updated_at = NOW()
        """, ('topic_profiles', json.dumps(dict(profiles))))
    conn.commit()
    for section, topics in profiles.items():
        top3 = sorted(topics.items(), key=lambda kv: kv[1], reverse=True)[:3]
        log.info(f" {section}: {', '.join(f'{t}={p:.1%}' for t, p in top3)}")
    return profiles
def collection_stats_summary(conn):
    """Build comprehensive stats per collection.

    For each ``source_section``, aggregates document and page counts, the
    number of documents with any redactions (``redaction_summary`` feature
    value > 0), and documents whose ``forensic_metadata`` feature text
    contains SECRET / TOP SECRET markings.  The summary is cached in
    ``analytics_cache`` under ``collection_summary`` and returned as a dict.
    """
    log.info("Building collection stats summary...")
    with conn.cursor() as cur:
        cur.execute("""
            SELECT d.source_section,
                   COUNT(DISTINCT d.id) as docs,
                   SUM(d.total_pages) as pages,
                   COUNT(DISTINCT CASE WHEN df.feature_name = 'redaction_summary' AND df.feature_value > 0 THEN d.id END) as redacted_docs,
                   COUNT(DISTINCT CASE WHEN df2.feature_json::text LIKE '%%TOP SECRET%%' THEN d.id END) as top_secret,
                   COUNT(DISTINCT CASE WHEN df2.feature_json::text LIKE '%%SECRET%%' AND df2.feature_json::text NOT LIKE '%%TOP SECRET%%' THEN d.id END) as secret
            FROM documents d
            LEFT JOIN document_features df ON df.document_id = d.id AND df.feature_name = 'redaction_summary'
            LEFT JOIN document_features df2 ON df2.document_id = d.id AND df2.feature_name = 'forensic_metadata'
            GROUP BY d.source_section
            ORDER BY docs DESC
        """)
        stats = cur.fetchall()
    summary = {}
    for row in stats:
        section, docs, pages, redacted, top_secret, secret = row
        summary[section] = {
            'docs': docs, 'pages': pages,
            'redacted_docs': redacted,
            'top_secret': top_secret, 'secret': secret,
        }
        log.info(f" {section}: {docs} docs, {pages} pages, {redacted} redacted, {top_secret} TOP SECRET, {secret} SECRET")
    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO analytics_cache (key, value)
            VALUES (%s, %s::jsonb)
            ON CONFLICT (key) DO UPDATE SET
                value = EXCLUDED.value, updated_at = NOW()
        """, ('collection_summary', json.dumps(summary)))
    conn.commit()
    return summary
def main():
    """Run all cross-collection analysis phases against one DB connection.

    Previously the connection leaked if any analysis step raised; the
    try/finally guarantees it is closed either way (the exception still
    propagates, matching the original behavior on failure).
    """
    conn = get_conn()
    try:
        find_bridge_entities(conn)
        collection_similarity(conn)
        topic_overlap(conn)
        collection_stats_summary(conn)
    finally:
        conn.close()
    log.info("Cross-collection analysis complete.")


if __name__ == "__main__":
    main()