"""Deduplication: MinHash LSH for near-duplicate detection."""

import hashlib
import random
from dataclasses import dataclass
from typing import List, Optional, Set, Tuple


@dataclass
class MinHashSignature:
    """MinHash signature for a document."""

    # One min-hash value per permutation.
    hash_values: List[int]
    doc_id: str


class MinHashLSH:
    """
    MinHash LSH for near-duplicate detection.

    Uses character shingling and MinHash to estimate Jaccard similarity,
    with banded LSH so candidate pairs are found without an all-pairs scan.
    """

    # Signature value used for an empty shingle set; also the upper bound of
    # the 32-bit range produced by _hash().
    _MAX_HASH = 0xFFFFFFFF

    def __init__(
        self,
        num_permutations: int = 128,
        threshold: float = 0.8,
        bands: int = 16,
        rows_per_band: int = 8,
        seed: Optional[int] = None,
    ):
        """
        Initialize MinHash LSH.

        Args:
            num_permutations: Number of hash permutations for MinHash
            threshold: Similarity threshold for considering duplicates
            bands: Number of bands for LSH
            rows_per_band: Rows per band (bands * rows_per_band must equal
                num_permutations)
            seed: Optional RNG seed for reproducible hash functions

        Raises:
            ValueError: If bands * rows_per_band != num_permutations.
                (Previously an ``assert``, which is stripped under ``-O``.)
        """
        if bands * rows_per_band != num_permutations:
            raise ValueError(
                f"bands * rows_per_band ({bands * rows_per_band}) must equal "
                f"num_permutations ({num_permutations})"
            )
        self.num_permutations = num_permutations
        self.threshold = threshold
        self.bands = bands
        self.rows_per_band = rows_per_band

        # Private RNG so signatures are reproducible when a seed is given
        # and independent of other users of the module-level random state.
        self._rng = random.Random(seed)
        self.hash_functions = self._generate_hash_functions(num_permutations)

        # LSH index: one dict per band, mapping band signature -> doc_ids.
        # The band tuple itself is the key; keying on hash(tuple) (as before)
        # added avoidable collision risk between distinct band signatures.
        self.index = [dict() for _ in range(bands)]

        # doc_id -> MinHashSignature, used for similarity estimation at query time.
        self.signatures = {}

    def _generate_hash_functions(self, n: int) -> List[Tuple[int, int]]:
        """Generate n random (a, b) coefficient pairs for universal hashing."""
        return [
            (self._rng.randint(1, 2**32 - 1), self._rng.randint(0, 2**32 - 1))
            for _ in range(n)
        ]

    def _hash(self, x: int, a: int, b: int) -> int:
        """Universal hash function: (a*x + b) mod prime, truncated to 32 bits."""
        prime = 2**61 - 1  # Mersenne prime keeps the family near-universal
        return ((a * x + b) % prime) & 0xFFFFFFFF

    def _compute_minhash(self, shingles: Set[int]) -> List[int]:
        """
        Compute MinHash signature for a set of shingles.

        Args:
            shingles: Set of shingle hash values

        Returns:
            List of minhash values (one per permutation). An empty shingle
            set (e.g. text shorter than the shingle size) yields an
            all-_MAX_HASH signature instead of raising ValueError from
            ``min()`` on an empty sequence, as the previous version did.
        """
        if not shingles:
            return [self._MAX_HASH] * self.num_permutations
        return [
            min(self._hash(shingle, a, b) for shingle in shingles)
            for a, b in self.hash_functions
        ]

    def _shingle_text(
        self,
        text: str,
        k: int = 5,
    ) -> Set[int]:
        """
        Extract k-gram character shingles from text.

        Args:
            text: Input text
            k: Shingle size (characters)

        Returns:
            Set of 32-bit shingle hashes; empty when len(text) < k.
        """
        text = text.lower()
        shingles = set()
        for i in range(len(text) - k + 1):
            shingle = text[i:i+k]
            # First 8 hex digits of MD5 -> stable 32-bit shingle hash.
            # Not security-sensitive; just a cheap, deterministic hash.
            shingle_hash = int(hashlib.md5(shingle.encode()).hexdigest()[:8], 16)
            shingles.add(shingle_hash)
        return shingles

    def add_document(
        self,
        doc_id: str,
        text: str,
        compute_signature: bool = True,
    ) -> MinHashSignature:
        """
        Add a document to the LSH index.

        Args:
            doc_id: Unique document ID
            text: Document text
            compute_signature: Whether to compute signature (must be True;
                precomputed-signature ingestion is not implemented)

        Returns:
            MinHash signature

        Raises:
            ValueError: If compute_signature is False.
        """
        if not compute_signature:
            raise ValueError("Must compute signature")
        shingles = self._shingle_text(text)
        signature = self._compute_minhash(shingles)

        # Store signature for later similarity estimation.
        signature_obj = MinHashSignature(hash_values=signature, doc_id=doc_id)
        self.signatures[doc_id] = signature_obj

        # Index each band's sub-signature into its bucket.
        for band_idx in range(self.bands):
            start = band_idx * self.rows_per_band
            band_signature = tuple(signature[start:start + self.rows_per_band])
            self.index[band_idx].setdefault(band_signature, set()).add(doc_id)

        return signature_obj

    def query(
        self,
        text: str,
        candidate_doc_ids: Optional[Set[str]] = None,
    ) -> List[Tuple[str, float]]:
        """
        Query for near-duplicate documents.

        Args:
            text: Query text
            candidate_doc_ids: Optional set of candidate doc IDs to check

        Returns:
            List of (doc_id, estimated_similarity) pairs at or above the
            threshold, sorted by descending similarity.
        """
        shingles = self._shingle_text(text)
        query_signature = self._compute_minhash(shingles)

        # Gather candidates: any document sharing at least one band bucket.
        candidates = set()
        for band_idx in range(self.bands):
            start = band_idx * self.rows_per_band
            band_signature = tuple(query_signature[start:start + self.rows_per_band])
            candidates.update(self.index[band_idx].get(band_signature, ()))

        if candidate_doc_ids is not None:
            candidates &= candidate_doc_ids

        # Estimate similarity from stored signatures (MinHash estimate of
        # Jaccard similarity, not an exact shingle-set comparison).
        results = [
            (doc_id, similarity)
            for doc_id in candidates
            if (similarity := self._estimate_similarity(query_signature, doc_id))
            >= self.threshold
        ]
        return sorted(results, key=lambda x: x[1], reverse=True)

    def _estimate_similarity(
        self,
        signature1: List[int],
        doc_id2: str,
    ) -> float:
        """
        Estimate Jaccard similarity between two signatures.

        Uses the MinHash estimator: the fraction of positions where the two
        signatures agree.

        Args:
            signature1: First MinHash signature
            doc_id2: Second document ID (signature retrieved from storage)

        Returns:
            Estimated Jaccard similarity in [0, 1]; 0.0 if doc_id2 is unknown.
        """
        if doc_id2 not in self.signatures:
            return 0.0
        signature2 = self.signatures[doc_id2].hash_values
        matches = sum(1 for h1, h2 in zip(signature1, signature2) if h1 == h2)
        return matches / len(signature1)

    def compute_signature(self, text: str) -> MinHashSignature:
        """Compute MinHash signature for text (without indexing it)."""
        shingles = self._shingle_text(text)
        signature = self._compute_minhash(shingles)
        return MinHashSignature(hash_values=signature, doc_id="")


def test_deduplication():
    """Test MinHash LSH."""
    lsh = MinHashLSH(num_permutations=64, threshold=0.7, bands=8, rows_per_band=8)

    # Add documents: doc2/doc3 are near-duplicates of doc1.
    docs = [
        ("doc1", "The quick brown fox jumps over the lazy dog."),
        ("doc2", "The quick brown fox jumps over the lazy dog!!!"),  # near duplicate
        ("doc3", "The quick brown fox leaps over the lazy dog."),  # near duplicate
        ("doc4", "Completely unrelated text about science and experiments."),
    ]

    signatures = {}
    for doc_id, text in docs:
        sig = lsh.add_document(doc_id, text)
        signatures[doc_id] = sig

    # Query with doc1's text; should find doc2 and doc3 as similar.
    results = lsh.query(docs[0][1])
    print(f"Query results for doc1: {results}")

    print("Deduplication test passed!")


if __name__ == "__main__":
    test_deduplication()