#!/usr/bin/env python3
"""
Phase 6: Duplicate Detection

Finds near-duplicate documents by comparing page 1 embeddings using
pgvector cosine similarity with IVFFlat index.
For each first-page, finds top-K nearest neighbors with similarity > threshold.

Runs on: Hetzner (PostgreSQL pgvector)
"""

import logging

import psycopg2
import psycopg2.extras

from db import get_conn

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-8s %(message)s")
log = logging.getLogger(__name__)

# Minimum cosine similarity for two first pages to be recorded as duplicates.
SIMILARITY_THRESHOLD = 0.95
# Equivalent cosine-distance bound; not referenced by the code in this file.
DISTANCE_THRESHOLD = 1 - SIMILARITY_THRESHOLD  # 0.05
# Number of nearest neighbors fetched per page before thresholding.
TOP_K = 5
# Number of pages processed between commits.
BATCH_SIZE = 100


def _fetch_unchecked_page_ids(conn):
    """Return ids of first pages with an embedding that have no pair row yet.

    A page counts as "already checked" only if it appears as ``page_id_a`` in
    ``duplicate_pairs``.
    """
    with conn.cursor() as cur:
        cur.execute("""
            SELECT p.id
            FROM pages p
            WHERE p.page_number = 1
              AND p.embedding IS NOT NULL
              AND p.id NOT IN (SELECT DISTINCT page_id_a FROM duplicate_pairs)
            ORDER BY p.id
        """)
        return [row[0] for row in cur.fetchall()]


def _find_duplicate_rows(conn, page_id):
    """Return ``duplicate_pairs`` rows for neighbors of *page_id* above threshold.

    Only neighbors with a larger id are considered, so each unordered pair is
    produced once (from its lower-id member). Each returned tuple is
    ``(page_id, neighbor_id, similarity, 'embedding')``.
    """
    with conn.cursor() as cur:
        # ORDER BY <=> (cosine distance) ... LIMIT is the nearest-neighbor
        # query shape.
        # NOTE(review): the original comment claimed this uses the IVFFlat
        # index; the query orders by distance to a joined column and filters
        # on id, so confirm with EXPLAIN that the index is actually chosen.
        cur.execute("""
            SELECT p2.id, 1 - (p1.embedding <=> p2.embedding) as sim
            FROM pages p1, pages p2
            WHERE p1.id = %s
              AND p2.page_number = 1
              AND p2.embedding IS NOT NULL
              AND p2.id > p1.id
            ORDER BY p1.embedding <=> p2.embedding
            LIMIT %s
        """, (page_id, TOP_K))
        return [
            (page_id, neighbor_id, similarity, 'embedding')
            for neighbor_id, similarity in cur.fetchall()
            if similarity >= SIMILARITY_THRESHOLD
        ]


def _insert_pairs(conn, rows):
    """Insert *rows* into ``duplicate_pairs``, ignoring pairs already present."""
    with conn.cursor() as cur:
        psycopg2.extras.execute_batch(
            cur,
            """INSERT INTO duplicate_pairs (page_id_a, page_id_b, similarity, method)
               VALUES (%s, %s, %s, %s)
               ON CONFLICT (page_id_a, page_id_b, method) DO NOTHING""",
            rows,
            page_size=500,
        )


def main():
    """Scan unchecked first pages and record near-duplicate pairs.

    Work is committed once per batch of ``BATCH_SIZE`` pages, so an interrupted
    run keeps the pairs found so far. The connection is closed even when a
    query fails; the exception then propagates to the caller.
    """
    conn = get_conn()
    try:
        with conn.cursor() as cur:
            cur.execute("SET ivfflat.probes = 10;")

        # Get all first-page IDs that haven't been checked yet
        page_ids = _fetch_unchecked_page_ids(conn)
        log.info(f"Checking {len(page_ids)} first-page embeddings for duplicates")

        found = 0
        checked = 0

        for start in range(0, len(page_ids), BATCH_SIZE):
            batch_ids = page_ids[start:start + BATCH_SIZE]

            insert_rows = []
            for pid in batch_ids:
                insert_rows.extend(_find_duplicate_rows(conn, pid))

            # Batch insert
            if insert_rows:
                _insert_pairs(conn, insert_rows)
                # Counts candidate rows sent to the INSERT; rows skipped by
                # ON CONFLICT DO NOTHING are still included in this total.
                found += len(insert_rows)

            # NOTE: no marker row is written for pages without duplicates, so
            # such pages are selected again on the next run.
            checked += len(batch_ids)
            conn.commit()

            if checked % 1000 == 0 or insert_rows:
                log.info(f" Checked {checked}/{len(page_ids)}, {found} duplicate pairs found")
    finally:
        # Always release the connection; uncommitted work from a failed batch
        # is discarded when the connection closes.
        conn.close()

    log.info(f"Done. {found} duplicate pairs found from {checked} pages checked.")


if __name__ == "__main__":
    main()