| | """
|
| | Deduplication: MinHash LSH for near-duplicate detection.
|
| | """
|
| |
|
| | import hashlib
|
| | import random
|
| | from typing import List, Set, Tuple, Optional
|
| | from dataclasses import dataclass
|
| |
|
| |
|
@dataclass
class MinHashSignature:
    """MinHash signature computed for a single document.

    Attributes:
        hash_values: One min-hash value per hash permutation.
        doc_id: Identifier of the document this signature belongs to.
    """

    # One entry per permutation; length equals the index's num_permutations.
    hash_values: List[int]
    # Owning document ID (empty string for ad-hoc, unindexed signatures).
    doc_id: str
|
| |
|
| |
|
class MinHashLSH:
    """
    MinHash LSH for near-duplicate detection.

    Documents are shingled into character k-grams, summarized into MinHash
    signatures (which estimate Jaccard similarity), and indexed with
    locality-sensitive hashing (banding) so near-duplicates can be found
    without comparing against every stored document.
    """

    # Mersenne prime used by the universal hash family (a*x + b) mod p.
    _PRIME = 2**61 - 1
    # Sentinel min-hash value for documents that produce no shingles
    # (e.g. text shorter than the shingle size).
    _MAX_HASH = 0xFFFFFFFF

    def __init__(
        self,
        num_permutations: int = 128,
        threshold: float = 0.8,
        bands: int = 16,
        rows_per_band: int = 8,
        seed: Optional[int] = None,
    ):
        """
        Initialize MinHash LSH.

        Args:
            num_permutations: Number of hash permutations for MinHash.
            threshold: Similarity threshold for considering duplicates.
            bands: Number of bands for LSH.
            rows_per_band: Rows per band (bands * rows_per_band must equal
                num_permutations).
            seed: Optional RNG seed for reproducible hash functions.
                Default None keeps the previous (entropy-seeded) behavior.

        Raises:
            ValueError: If bands * rows_per_band != num_permutations.
        """
        # Validate with an exception rather than `assert` (asserts are
        # stripped under `python -O`).
        if bands * rows_per_band != num_permutations:
            raise ValueError(
                f"bands * rows_per_band ({bands} * {rows_per_band}) "
                f"must equal num_permutations ({num_permutations})"
            )

        self.num_permutations = num_permutations
        self.threshold = threshold
        self.bands = bands
        self.rows_per_band = rows_per_band

        # Dedicated RNG so a fixed seed yields reproducible signatures.
        self._rng = random.Random(seed)
        self.hash_functions = self._generate_hash_functions(num_permutations)

        # One bucket table per band: band signature (tuple) -> set of doc IDs.
        self.index: List[dict] = [dict() for _ in range(bands)]

        # doc_id -> MinHashSignature, kept for similarity estimation.
        self.signatures = {}

    def _generate_hash_functions(self, n: int) -> List[Tuple[int, int]]:
        """Generate n random (a, b) coefficient pairs for universal hashing."""
        return [
            (self._rng.randint(1, 2**32 - 1), self._rng.randint(0, 2**32 - 1))
            for _ in range(n)
        ]

    def _hash(self, x: int, a: int, b: int) -> int:
        """Universal hash: ((a*x + b) mod prime), folded to 32 bits."""
        return ((a * x + b) % self._PRIME) & 0xFFFFFFFF

    def _compute_minhash(self, shingles: Set[int]) -> List[int]:
        """
        Compute MinHash signature for a set of shingles.

        Args:
            shingles: Set of shingle hash values.

        Returns:
            List of minhash values (one per permutation). An empty shingle
            set yields a signature of _MAX_HASH sentinels instead of raising
            (min() over an empty iterable would be a ValueError).
        """
        if not shingles:
            return [self._MAX_HASH] * self.num_permutations
        return [
            min(self._hash(shingle, a, b) for shingle in shingles)
            for a, b in self.hash_functions
        ]

    def _shingle_text(
        self,
        text: str,
        k: int = 5,
    ) -> Set[int]:
        """
        Extract k-gram shingles from text.

        Args:
            text: Input text.
            k: Shingle size (characters).

        Returns:
            Set of 32-bit shingle hashes (empty when len(text) < k).
        """
        text = text.lower()
        shingles = set()
        for i in range(len(text) - k + 1):
            shingle = text[i:i + k]
            # MD5 is used as a cheap, stable (non-cryptographic) hash;
            # the first 8 hex digits give a 32-bit value.
            shingle_hash = int(hashlib.md5(shingle.encode()).hexdigest()[:8], 16)
            shingles.add(shingle_hash)
        return shingles

    def add_document(
        self,
        doc_id: str,
        text: str,
        compute_signature: bool = True,
    ) -> MinHashSignature:
        """
        Add a document to the LSH index.

        Args:
            doc_id: Unique document ID.
            text: Document text.
            compute_signature: Whether to compute signature (or use precomputed).

        Returns:
            MinHash signature.

        Raises:
            ValueError: If compute_signature is False (precomputed
                signatures are not supported yet).
        """
        if compute_signature:
            shingles = self._shingle_text(text)
            signature = self._compute_minhash(shingles)
        else:
            raise ValueError("Must compute signature")

        signature_obj = MinHashSignature(hash_values=signature, doc_id=doc_id)
        self.signatures[doc_id] = signature_obj

        # Insert the document into one bucket per band; the band key is the
        # tuple of signature rows itself (collision-free, unlike hash(tuple)).
        for band_idx in range(self.bands):
            start = band_idx * self.rows_per_band
            band_key = tuple(signature[start:start + self.rows_per_band])
            self.index[band_idx].setdefault(band_key, set()).add(doc_id)

        return signature_obj

    def query(
        self,
        text: str,
        candidate_doc_ids: Optional[Set[str]] = None,
    ) -> List[Tuple[str, float]]:
        """
        Query for near-duplicate documents.

        Args:
            text: Query text.
            candidate_doc_ids: Optional set of candidate doc IDs to check.

        Returns:
            List of (doc_id, similarity) above threshold, sorted by
            descending similarity.
        """
        shingles = self._shingle_text(text)
        query_signature = self._compute_minhash(shingles)

        # Gather LSH candidates: any doc sharing at least one band bucket.
        candidates: Set[str] = set()
        for band_idx in range(self.bands):
            start = band_idx * self.rows_per_band
            band_key = tuple(query_signature[start:start + self.rows_per_band])
            bucket = self.index[band_idx].get(band_key)
            if bucket:
                candidates.update(bucket)

        if candidate_doc_ids is not None:
            candidates = candidates.intersection(candidate_doc_ids)

        # Verify candidates against the full signatures.
        results = []
        for doc_id in candidates:
            similarity = self._estimate_similarity(query_signature, doc_id)
            if similarity >= self.threshold:
                results.append((doc_id, similarity))

        return sorted(results, key=lambda x: x[1], reverse=True)

    def _estimate_similarity(
        self,
        signature1: List[int],
        doc_id2: str,
    ) -> float:
        """
        Estimate Jaccard similarity between a signature and a stored document.

        Uses MinHash similarity: the proportion of matching hash values.

        Args:
            signature1: First MinHash signature.
            doc_id2: Second document ID (signature retrieved from storage).

        Returns:
            Estimated Jaccard similarity (0.0 if doc_id2 is unknown).
        """
        if doc_id2 not in self.signatures:
            return 0.0

        signature2 = self.signatures[doc_id2].hash_values
        matches = sum(1 for h1, h2 in zip(signature1, signature2) if h1 == h2)
        return matches / len(signature1)

    def compute_signature(self, text: str) -> MinHashSignature:
        """Compute a MinHash signature for text without indexing it."""
        shingles = self._shingle_text(text)
        signature = self._compute_minhash(shingles)
        return MinHashSignature(hash_values=signature, doc_id="")
|
| |
|
| |
|
def test_deduplication():
    """Smoke-test MinHash LSH: an exact duplicate is found, unrelated text is not.

    The original version printed results but asserted nothing, so it could
    never fail; it now checks the two outcomes that are deterministic even
    with randomly drawn hash functions.
    """
    lsh = MinHashLSH(num_permutations=64, threshold=0.7, bands=8, rows_per_band=8)

    docs = [
        ("doc1", "The quick brown fox jumps over the lazy dog."),
        ("doc2", "The quick brown fox jumps over the lazy dog!!!"),
        ("doc3", "The quick brown fox leaps over the lazy dog."),
        ("doc4", "Completely unrelated text about science and experiments."),
    ]

    signatures = {}
    for doc_id, text in docs:
        signatures[doc_id] = lsh.add_document(doc_id, text)

    results = lsh.query(docs[0][1])
    print(f"Query results for doc1: {results}")

    matched = dict(results)
    # Querying with doc1's own text reproduces its signature exactly.
    assert matched.get("doc1") == 1.0, "exact duplicate not found"
    # doc4 shares essentially no shingles with doc1 and must stay below threshold.
    assert "doc4" not in matched, "unrelated document reported as duplicate"

    print("Deduplication test passed!")
|
| |
|
| |
|
| | if __name__ == "__main__":
|
| | test_deduplication()
|
| |
|