| |
| """ |
| Phase 6: Duplicate Detection |
| |
Finds near-duplicate documents by comparing their page-1 embeddings
using pgvector cosine similarity, accelerated by an IVFFlat index.

For each first page, finds the top-K nearest neighbors with a higher page id
and records those whose similarity is >= the threshold.
| |
| Runs on: Hetzner (PostgreSQL pgvector) |
| """ |
|
|
| import logging |
| import psycopg2 |
| import psycopg2.extras |
| from db import get_conn |
|
|
# Root logging: timestamp, padded level name, message.
logging.basicConfig(
    format="%(asctime)s %(levelname)-8s %(message)s",
    level=logging.INFO,
)
log = logging.getLogger(__name__)

# Minimum cosine similarity (1 - pgvector cosine distance) for two first
# pages to be recorded as a duplicate pair.
SIMILARITY_THRESHOLD = 0.95
# The same cut-off expressed as a cosine distance.
DISTANCE_THRESHOLD = 1 - SIMILARITY_THRESHOLD
# Nearest neighbours requested per page.
TOP_K = 5
# Pages processed between commits.
BATCH_SIZE = 100
|
|
|
|
def main():
    """Record near-duplicate first pages in ``duplicate_pairs``.

    Selects every first page (``page_number = 1``) that has an embedding and
    does not yet appear as ``page_id_a`` of an ``'embedding'`` pair. For each
    one, it asks pgvector for the ``TOP_K`` nearest first pages with a larger
    id, ordered by cosine distance (``<=>``). Neighbours whose similarity
    (``1 - distance``) is at least ``SIMILARITY_THRESHOLD`` are inserted as
    ``(page_id_a, page_id_b, similarity, 'embedding')``. Rows that already
    exist are left alone (``ON CONFLICT ... DO NOTHING``).

    Work is committed once per ``BATCH_SIZE`` pages. Because only higher-id
    neighbours are searched, each unordered pair is produced from its
    lower-id page only. The connection is always closed, even on error; the
    uncommitted batch is then discarded.
    """
    conn = get_conn()
    try:
        with conn.cursor() as cur:
            # Probe 10 IVFFlat lists per query: more probes trade speed for recall.
            cur.execute("SET ivfflat.probes = 10;")

        page_ids = _pending_page_ids(conn)
        total = len(page_ids)
        log.info(f"Checking {total} first-page embeddings for duplicates")

        found = 0
        checked = 0
        for start in range(0, total, BATCH_SIZE):
            batch_ids = page_ids[start:start + BATCH_SIZE]
            insert_rows = []

            # One cursor serves every neighbour lookup and the insert of this batch.
            with conn.cursor() as cur:
                for pid in batch_ids:
                    for neighbor_id, similarity in _nearest_neighbors(cur, pid):
                        if similarity >= SIMILARITY_THRESHOLD:
                            insert_rows.append((pid, neighbor_id, similarity, 'embedding'))

                if insert_rows:
                    psycopg2.extras.execute_batch(
                        cur,
                        """INSERT INTO duplicate_pairs (page_id_a, page_id_b, similarity, method)
                           VALUES (%s, %s, %s, %s)
                           ON CONFLICT (page_id_a, page_id_b, method) DO NOTHING""",
                        insert_rows,
                        page_size=500,
                    )

            # Counts candidate pairs submitted. A row skipped by ON CONFLICT is
            # still counted here.
            found += len(insert_rows)
            previously_checked = checked
            checked += len(batch_ids)
            # Commit per batch so an interruption loses at most one batch.
            conn.commit()

            # Report progress each time another 1000 pages have been checked,
            # whatever BATCH_SIZE is, and also after any batch that found pairs.
            if checked // 1000 > previously_checked // 1000 or insert_rows:
                log.info(f"  Checked {checked}/{total}, {found} duplicate pairs found")
    finally:
        conn.close()

    log.info(f"Done. {found} duplicate pairs found from {checked} pages checked.")


def _pending_page_ids(conn):
    """Return ids of first pages still to be checked, in ascending order.

    A page is pending when it has an embedding and has no ``'embedding'`` row
    in ``duplicate_pairs`` with it as ``page_id_a``. Pairs recorded by other
    methods do not exclude a page. A page that was checked before and had no
    near-duplicates leaves no row behind, so it is checked again on each run.
    """
    with conn.cursor() as cur:
        # NOT EXISTS (unlike NOT IN) does not collapse to "no rows" if the
        # subquery ever yields a NULL page_id_a.
        cur.execute("""
            SELECT p.id FROM pages p
            WHERE p.page_number = 1 AND p.embedding IS NOT NULL
              AND NOT EXISTS (
                  SELECT 1 FROM duplicate_pairs d
                  WHERE d.page_id_a = p.id AND d.method = 'embedding'
              )
            ORDER BY p.id
        """)
        return [row[0] for row in cur.fetchall()]


def _nearest_neighbors(cur, page_id):
    """Return up to ``TOP_K`` ``(neighbor_id, similarity)`` rows for ``page_id``.

    Candidates are first pages with an embedding and an id greater than
    ``page_id``. They are ordered by ascending cosine distance from
    ``page_id``'s embedding, and similarity is ``1 - distance``.
    """
    # The query vector comes from a scalar subquery, which is pgvector's
    # documented form for "nearest neighbours of an existing row". It keeps the
    # ORDER BY operand constant for the index. The subquery assumes pages.id is
    # unique.
    # NOTE(review): with IVFFlat, WHERE filters are applied after the index
    # scan, so fewer than TOP_K rows may come back when many probed candidates
    # have a lower id.
    cur.execute("""
        SELECT p2.id,
               1 - (p2.embedding <=> (SELECT embedding FROM pages WHERE id = %(pid)s)) AS sim
        FROM pages p2
        WHERE p2.page_number = 1
          AND p2.embedding IS NOT NULL
          AND p2.id > %(pid)s
        ORDER BY p2.embedding <=> (SELECT embedding FROM pages WHERE id = %(pid)s)
        LIMIT %(k)s
    """, {"pid": page_id, "k": TOP_K})
    return cur.fetchall()
|
|
|
|
if __name__ == "__main__":
    # Run the duplicate-detection phase only when executed as a script.
    main()
|
|