datamatters24 committed on
Commit
cf5f4c0
·
verified ·
1 Parent(s): aa8f7e4

Upload notebooks/04_forensic/42_duplicate_detection.ipynb with huggingface_hub

Browse files
notebooks/04_forensic/42_duplicate_detection.ipynb ADDED
@@ -0,0 +1,214 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ {
2
+ "cells": [
3
+ {
4
+ "cell_type": "markdown",
5
+ "metadata": {},
6
+ "source": [
7
+ "# 42 - Duplicate Detection via Page Embeddings\n",
8
+ "\n",
9
+ "Pipeline notebook that finds near-duplicate pages using pgvector cosine distance.\n",
10
+ "\n",
11
+ "Uses `CROSS JOIN LATERAL` to efficiently find the top-5 nearest neighbours per page (searching only pages with a higher `id`, so each pair is reported at most once)\n",
12
+ "and filters by a cosine similarity threshold.\n",
13
+ "\n",
14
+ "**Outputs:**\n",
15
+ "- `duplicate_pairs` table rows\n",
16
+ "- `page_features`: `is_duplicate` = 1.0 for flagged pages"
17
+ ]
18
+ },
19
+ {
20
+ "cell_type": "code",
21
+ "execution_count": null,
22
+ "metadata": {
23
+ "tags": [
24
+ "parameters"
25
+ ]
26
+ },
27
+ "outputs": [],
28
+ "source": [
29
+ "# Parameters\n",
30
+ "source_section = None\n",
31
+ "similarity_threshold = 0.95\n",
32
+ "batch_size = 10000"
33
+ ]
34
+ },
35
+ {
36
+ "cell_type": "code",
37
+ "execution_count": null,
38
+ "metadata": {},
39
+ "outputs": [],
40
+ "source": [
41
+ "import sys, warnings, time\n",
42
+ "sys.path.insert(0, '/opt/epstein_env/research')\n",
43
+ "warnings.filterwarnings('ignore')\n",
44
+ "\n",
45
+ "import pandas as pd\n",
46
+ "import numpy as np\n",
47
+ "\n",
48
+ "from research_lib.config import COLLECTIONS\n",
49
+ "from research_lib.db import fetch_df, fetch_all, get_conn, bulk_insert, upsert_feature\n",
50
+ "from research_lib.incremental import start_run, finish_run\n",
51
+ "\n",
52
+ "print('Libraries loaded.')"
53
+ ]
54
+ },
55
+ {
56
+ "cell_type": "code",
57
+ "execution_count": null,
58
+ "metadata": {},
59
+ "outputs": [],
60
+ "source": [
61
+ "# ---- Start run ----\n",
62
+ "PIPELINE = 'duplicate_detection'\n",
63
+ "run_id = start_run(PIPELINE, source_section=source_section, parameters={\n",
64
+ " 'similarity_threshold': similarity_threshold,\n",
65
+ " 'batch_size': batch_size,\n",
66
+ "})\n",
67
+ "\n",
68
+ "# Get page IDs with embeddings\n",
69
+ "section_filter = ''\n",
70
+ "params = []\n",
71
+ "if source_section:\n",
72
+ " section_filter = 'AND d.source_section = %s'\n",
73
+ " params.append(source_section)\n",
74
+ "\n",
75
+ "page_ids_df = fetch_df(f\"\"\"\n",
76
+ " SELECT p.id\n",
77
+ " FROM pages p\n",
78
+ " JOIN documents d ON d.id = p.document_id\n",
79
+ " WHERE p.embedding IS NOT NULL\n",
80
+ " {section_filter}\n",
81
+ " ORDER BY p.id\n",
82
+ "\"\"\", params or None)\n",
83
+ "\n",
84
+ "all_page_ids = page_ids_df['id'].tolist()\n",
85
+ "print(f'Total pages with embeddings: {len(all_page_ids)}')"
86
+ ]
87
+ },
88
+ {
89
+ "cell_type": "code",
90
+ "execution_count": null,
91
+ "metadata": {},
92
+ "outputs": [],
93
+ "source": [
94
+ "# ---- Process in batches using CROSS JOIN LATERAL ----\n",
95
+ "total_pairs = 0\n",
96
+ "total_batches = (len(all_page_ids) + batch_size - 1) // batch_size\n",
97
+ "all_duplicate_page_ids = set()\n",
98
+ "\n",
99
+ "for batch_idx in range(total_batches):\n",
100
+ " start = batch_idx * batch_size\n",
101
+ " end = min(start + batch_size, len(all_page_ids))\n",
102
+ " batch_ids = all_page_ids[start:end]\n",
103
+ " print(f'Batch {batch_idx + 1}/{total_batches}: pages {start}-{end - 1}')\n",
104
+ "\n",
105
+ " t0 = time.time()\n",
106
+ "\n",
107
+ " # Build the query -- for each page in the batch, find its top-5 nearest neighbours among pages with a higher id\n",
108
+ " id_list = ','.join(str(i) for i in batch_ids)\n",
109
+ " sql = f\"\"\"\n",
110
+ " SELECT p1.id AS page_id_a, p2.id AS page_id_b,\n",
111
+ " 1 - (p1.embedding <=> p2.embedding) AS similarity\n",
112
+ " FROM pages p1\n",
113
+ " CROSS JOIN LATERAL (\n",
114
+ " SELECT id, embedding FROM pages\n",
115
+ " WHERE id > p1.id AND embedding IS NOT NULL\n",
116
+ " ORDER BY p1.embedding <=> embedding\n",
117
+ " LIMIT 5\n",
118
+ " ) p2\n",
119
+ " WHERE p1.id IN ({id_list})\n",
120
+ " AND (1 - (p1.embedding <=> p2.embedding)) >= %s\n",
121
+ " \"\"\"\n",
122
+ "\n",
123
+ " pairs_df = fetch_df(sql, [similarity_threshold])\n",
124
+ " elapsed = time.time() - t0\n",
125
+ " print(f' Found {len(pairs_df)} pairs in {elapsed:.1f}s')\n",
126
+ "\n",
127
+ " if len(pairs_df) > 0:\n",
128
+ " # Insert into duplicate_pairs\n",
129
+ " pair_rows = [\n",
130
+ " (int(r.page_id_a), int(r.page_id_b), float(r.similarity))\n",
131
+ " for r in pairs_df.itertuples()\n",
132
+ " ]\n",
133
+ " n = bulk_insert(\n",
134
+ " 'duplicate_pairs',\n",
135
+ " ['page_id_a', 'page_id_b', 'similarity'],\n",
136
+ " pair_rows,\n",
137
+ " on_conflict='(page_id_a, page_id_b) DO NOTHING',\n",
138
+ " )\n",
139
+ " print(f' Inserted {n} duplicate_pairs rows')\n",
140
+ " total_pairs += n\n",
141
+ "\n",
142
+ " # Track duplicate page IDs\n",
143
+ " all_duplicate_page_ids.update(pairs_df['page_id_a'].tolist())\n",
144
+ " all_duplicate_page_ids.update(pairs_df['page_id_b'].tolist())\n",
145
+ "\n",
146
+ "print(f'\\nTotal duplicate pairs inserted: {total_pairs}')\n",
147
+ "print(f'Unique pages flagged as duplicates: {len(all_duplicate_page_ids)}')"
148
+ ]
149
+ },
150
+ {
151
+ "cell_type": "code",
152
+ "execution_count": null,
153
+ "metadata": {},
154
+ "outputs": [],
155
+ "source": [
156
+ "# ---- Flag pages in page_features ----\n",
157
+ "if all_duplicate_page_ids:\n",
158
+ " dup_rows = [\n",
159
+ " (int(pid), 'is_duplicate', 1.0, None)\n",
160
+ " for pid in all_duplicate_page_ids\n",
161
+ " ]\n",
162
+ " n = upsert_feature(\n",
163
+ " 'page_features',\n",
164
+ " ['page_id', 'feature_name'],\n",
165
+ " ['feature_value', 'feature_json'],\n",
166
+ " dup_rows,\n",
167
+ " )\n",
168
+ " print(f'Flagged {n} pages as is_duplicate in page_features')\n",
169
+ "\n",
170
+ "finish_run(run_id, documents_processed=len(all_page_ids))\n",
171
+ "print(f'Run {run_id} complete.')"
172
+ ]
173
+ },
174
+ {
175
+ "cell_type": "code",
176
+ "execution_count": null,
177
+ "metadata": {},
178
+ "outputs": [],
179
+ "source": [
180
+ "# ---- Stats: documents with duplicates ----\n",
181
+ "doc_dup_df = fetch_df(\"\"\"\n",
182
+ " SELECT d.source_section,\n",
183
+ " COUNT(DISTINCT p.document_id) AS docs_with_duplicates,\n",
184
+ " COUNT(*) AS duplicate_pages\n",
185
+ " FROM page_features pf\n",
186
+ " JOIN pages p ON p.id = pf.page_id\n",
187
+ " JOIN documents d ON d.id = p.document_id\n",
188
+ " WHERE pf.feature_name = 'is_duplicate' AND pf.feature_value = 1.0\n",
189
+ " GROUP BY d.source_section\n",
190
+ " ORDER BY docs_with_duplicates DESC\n",
191
+ "\"\"\")\n",
192
+ "\n",
193
+ "print('Documents with duplicate pages by collection:')\n",
194
+ "print(doc_dup_df.to_string(index=False))\n",
195
+ "\n",
196
+ "total_dup_count = fetch_df('SELECT COUNT(*) AS cnt FROM duplicate_pairs')\n",
197
+ "print(f\"\\nTotal duplicate pairs in database: {total_dup_count['cnt'].iloc[0]}\")"
198
+ ]
199
+ }
200
+ ],
201
+ "metadata": {
202
+ "kernelspec": {
203
+ "display_name": "Python 3",
204
+ "language": "python",
205
+ "name": "python3"
206
+ },
207
+ "language_info": {
208
+ "name": "python",
209
+ "version": "3.10.0"
210
+ }
211
+ },
212
+ "nbformat": 4,
213
+ "nbformat_minor": 5
214
+ }