"""

Deduplication: MinHash LSH for near-duplicate detection.

"""

import hashlib
import random
from typing import List, Set, Tuple, Optional
from dataclasses import dataclass


@dataclass
class MinHashSignature:
    """MinHash signature for a document."""
    # One minhash value per hash permutation (length == num_permutations).
    hash_values: List[int]
    # Identifier of the document this signature was computed from
    # (empty string for ad-hoc signatures not tied to an indexed document).
    doc_id: str


class MinHashLSH:
    """MinHash LSH for near-duplicate detection.

    Documents are shingled into character k-grams, hashed into MinHash
    signatures, and indexed into LSH bands so that near-duplicates land in
    the same buckets with high probability.  Similarity is estimated as the
    fraction of matching MinHash values, an unbiased estimator of the
    Jaccard similarity of the underlying shingle sets.
    """

    # 32-bit mask used to clamp hash outputs; also serves as the sentinel
    # minhash value for an empty shingle set (text shorter than k).
    _MAX_HASH = 0xFFFFFFFF
    # Mersenne prime for the universal hash family (a*x + b) mod p.
    _PRIME = 2**61 - 1

    def __init__(
        self,
        num_permutations: int = 128,
        threshold: float = 0.8,
        bands: int = 16,
        rows_per_band: int = 8,
        seed: Optional[int] = None,
    ):
        """
        Initialize MinHash LSH.

        Args:
            num_permutations: Number of hash permutations for MinHash
            threshold: Similarity threshold for considering duplicates
            bands: Number of bands for LSH
            rows_per_band: Rows per band (bands * rows_per_band must equal
                num_permutations)
            seed: Optional RNG seed for reproducible hash functions

        Raises:
            ValueError: If bands * rows_per_band != num_permutations.
        """
        # Validate with an exception rather than `assert`: asserts are
        # stripped under `python -O`, silently disabling the check.
        if bands * rows_per_band != num_permutations:
            raise ValueError(
                f"bands * rows_per_band ({bands} * {rows_per_band}) must "
                f"equal num_permutations ({num_permutations})"
            )

        self.num_permutations = num_permutations
        self.threshold = threshold
        self.bands = bands
        self.rows_per_band = rows_per_band

        # Private RNG: a given seed yields reproducible signatures without
        # perturbing the global `random` state.
        self._rng = random.Random(seed)

        # Generate random hash functions
        self.hash_functions = self._generate_hash_functions(num_permutations)

        # LSH index: band_id -> {bucket_hash -> set of doc_ids}
        self.index = [dict() for _ in range(bands)]

        # Store signatures for similarity computation
        self.signatures = {}  # doc_id -> MinHashSignature

    def _generate_hash_functions(self, n: int) -> List[Tuple[int, int]]:
        """Generate n random (a, b) coefficient pairs for universal hashing."""
        return [
            (self._rng.randint(1, 2**32 - 1), self._rng.randint(0, 2**32 - 1))
            for _ in range(n)
        ]

    def _hash(self, x: int, a: int, b: int) -> int:
        """Universal hash function: (a*x + b) mod prime, clamped to 32 bits."""
        return ((a * x + b) % self._PRIME) & self._MAX_HASH

    def _compute_minhash(self, shingles: Set[int]) -> List[int]:
        """
        Compute MinHash signature for a set of shingles.

        Args:
            shingles: Set of shingle hash values (may be empty when the
                source text is shorter than the shingle size)

        Returns:
            List of minhash values (one per permutation).  An empty
            shingle set yields the sentinel max value in every slot
            instead of raising ValueError on `min()` of an empty sequence.
        """
        return [
            min((self._hash(s, a, b) for s in shingles), default=self._MAX_HASH)
            for a, b in self.hash_functions
        ]

    def _shingle_text(
        self,
        text: str,
        k: int = 5,
    ) -> Set[int]:
        """
        Extract k-gram shingles from text.

        Args:
            text: Input text (lowercased before shingling)
            k: Shingle size (characters)

        Returns:
            Set of 32-bit shingle hashes (empty if len(text) < k)
        """
        text = text.lower()
        shingles = set()
        for i in range(len(text) - k + 1):
            shingle = text[i:i+k]
            # md5 is used as a fast, stable (non-cryptographic) hash here.
            shingle_hash = int(hashlib.md5(shingle.encode()).hexdigest()[:8], 16)
            shingles.add(shingle_hash)
        return shingles

    def _band_buckets(self, signature: List[int]) -> List[int]:
        """Hash each band-sized slice of a signature into its bucket key."""
        buckets = []
        for band_idx in range(self.bands):
            start = band_idx * self.rows_per_band
            band = tuple(signature[start:start + self.rows_per_band])
            buckets.append(hash(band))
        return buckets

    def add_document(
        self,
        doc_id: str,
        text: str,
        compute_signature: bool = True,
    ) -> MinHashSignature:
        """
        Add a document to the LSH index.

        Args:
            doc_id: Unique document ID
            text: Document text
            compute_signature: Whether to compute signature (or use precomputed)

        Returns:
            MinHash signature

        Raises:
            ValueError: If compute_signature is False (precomputed
                signatures are not currently supported).
        """
        if not compute_signature:
            raise ValueError("Must compute signature")

        signature = self._compute_minhash(self._shingle_text(text))

        # Store signature
        signature_obj = MinHashSignature(hash_values=signature, doc_id=doc_id)
        self.signatures[doc_id] = signature_obj

        # Index into bands
        for band_idx, bucket_hash in enumerate(self._band_buckets(signature)):
            self.index[band_idx].setdefault(bucket_hash, set()).add(doc_id)

        return signature_obj

    def query(
        self,
        text: str,
        candidate_doc_ids: Optional[Set[str]] = None,
    ) -> List[Tuple[str, float]]:
        """
        Query for near-duplicate documents.

        Args:
            text: Query text
            candidate_doc_ids: Optional set of candidate doc IDs to check

        Returns:
            List of (doc_id, similarity) above threshold, most similar first
        """
        query_signature = self._compute_minhash(self._shingle_text(text))

        # Union of all candidates sharing at least one LSH bucket.
        candidates: Set[str] = set()
        for band_idx, bucket_hash in enumerate(self._band_buckets(query_signature)):
            candidates.update(self.index[band_idx].get(bucket_hash, ()))

        if candidate_doc_ids is not None:
            candidates &= candidate_doc_ids

        # Estimate similarity from stored signatures (in practice the exact
        # Jaccard could be recomputed from stored shingle sets).
        results = []
        for doc_id in candidates:
            similarity = self._estimate_similarity(query_signature, doc_id)
            if similarity >= self.threshold:
                results.append((doc_id, similarity))

        return sorted(results, key=lambda x: x[1], reverse=True)

    def _estimate_similarity(
        self,
        signature1: List[int],
        doc_id2: str,
    ) -> float:
        """
        Estimate Jaccard similarity between two signatures.

        Uses MinHash similarity: proportion of matching hash values.

        Args:
            signature1: First MinHash signature
            doc_id2: Second document ID (signature retrieved from storage)

        Returns:
            Estimated Jaccard similarity (0.0 if doc_id2 is unknown)
        """
        if not signature1 or doc_id2 not in self.signatures:
            return 0.0

        signature2 = self.signatures[doc_id2].hash_values

        # Count matching values
        matches = sum(1 for h1, h2 in zip(signature1, signature2) if h1 == h2)
        return matches / len(signature1)

    def compute_signature(self, text: str) -> MinHashSignature:
        """Compute MinHash signature for text without indexing it."""
        return MinHashSignature(
            hash_values=self._compute_minhash(self._shingle_text(text)),
            doc_id="",
        )


def test_deduplication():
    """Smoke-test MinHash LSH on a tiny corpus with known near-duplicates."""
    lsh = MinHashLSH(num_permutations=64, threshold=0.7, bands=8, rows_per_band=8)

    # Two near-duplicates of doc1 plus one unrelated document.
    corpus = {
        "doc1": "The quick brown fox jumps over the lazy dog.",
        "doc2": "The quick brown fox jumps over the lazy dog!!!",  # near duplicate
        "doc3": "The quick brown fox leaps over the lazy dog.",  # near duplicate
        "doc4": "Completely unrelated text about science and experiments.",
    }

    signatures = {name: lsh.add_document(name, body) for name, body in corpus.items()}

    # Query with doc1's text; doc2 and doc3 should surface as similar.
    results = lsh.query(corpus["doc1"])
    print(f"Query results for doc1: {results}")

    print("Deduplication test passed!")


if __name__ == "__main__":
    test_deduplication()