# Source: research-document-archive / ml / 08_detect_duplicates.py
# Uploaded by datamatters24 via huggingface_hub (commit 7534111).
# NOTE: these header lines were web-page residue from the hosting site and
# have been converted to comments; the shebang below is no longer line 1,
# so run this script via `python3 08_detect_duplicates.py` explicitly.
#!/usr/bin/env python3
"""
Phase 6: Duplicate Detection
Finds near-duplicate documents by comparing page 1 embeddings
using pgvector cosine similarity with IVFFlat index.
For each first-page, finds top-K nearest neighbors with similarity > threshold.
Runs on: Hetzner (PostgreSQL pgvector)
"""
import logging
import psycopg2
import psycopg2.extras
from db import get_conn
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-8s %(message)s")
log = logging.getLogger(__name__)
SIMILARITY_THRESHOLD = 0.95
DISTANCE_THRESHOLD = 1 - SIMILARITY_THRESHOLD # 0.05
TOP_K = 5
BATCH_SIZE = 100
def main():
conn = get_conn()
with conn.cursor() as cur:
cur.execute("SET ivfflat.probes = 10;")
# Get all first-page IDs that haven't been checked yet
with conn.cursor() as cur:
cur.execute("""
SELECT p.id FROM pages p
WHERE p.page_number = 1 AND p.embedding IS NOT NULL
AND p.id NOT IN (SELECT DISTINCT page_id_a FROM duplicate_pairs)
ORDER BY p.id
""")
page_ids = [r[0] for r in cur.fetchall()]
log.info(f"Checking {len(page_ids)} first-page embeddings for duplicates")
found = 0
checked = 0
for i in range(0, len(page_ids), BATCH_SIZE):
batch_ids = page_ids[i:i + BATCH_SIZE]
insert_rows = []
for pid in batch_ids:
with conn.cursor() as cur:
# Use the IVFFlat index: ORDER BY <=> finds nearest neighbors
cur.execute("""
SELECT p2.id, 1 - (p1.embedding <=> p2.embedding) as sim
FROM pages p1, pages p2
WHERE p1.id = %s
AND p2.page_number = 1
AND p2.embedding IS NOT NULL
AND p2.id > p1.id
ORDER BY p1.embedding <=> p2.embedding
LIMIT %s
""", (pid, TOP_K))
for row in cur.fetchall():
neighbor_id, similarity = row
if similarity >= SIMILARITY_THRESHOLD:
insert_rows.append((pid, neighbor_id, similarity, 'embedding'))
# Batch insert
if insert_rows:
with conn.cursor() as cur:
psycopg2.extras.execute_batch(
cur,
"""INSERT INTO duplicate_pairs (page_id_a, page_id_b, similarity, method)
VALUES (%s, %s, %s, %s)
ON CONFLICT (page_id_a, page_id_b, method) DO NOTHING""",
insert_rows,
page_size=500,
)
found += len(insert_rows)
# Mark checked pages (insert self-pair as marker if no duplicates found)
checked += len(batch_ids)
conn.commit()
if checked % 1000 == 0 or insert_rows:
log.info(f" Checked {checked}/{len(page_ids)}, {found} duplicate pairs found")
conn.close()
log.info(f"Done. {found} duplicate pairs found from {checked} pages checked.")
if __name__ == "__main__":
main()