# Source: Vortex-7b-V1 repository — data/deduplication.py
# (uploaded by Zandy-Wandy, commit bf64b03, verified)
"""
Deduplication: MinHash LSH for near-duplicate detection.
"""
import hashlib
import random
from typing import List, Set, Tuple, Optional
from dataclasses import dataclass
@dataclass
class MinHashSignature:
    """MinHash signature for a single document.

    Produced by ``MinHashLSH``: ``hash_values`` holds one minhash value per
    hash permutation (its length equals the index's ``num_permutations``),
    and ``doc_id`` identifies the source document.
    """
    hash_values: List[int]  # one minhash per permutation; order is significant for band slicing
    doc_id: str  # unique document ID ("" when computed ad hoc without indexing)
class MinHashLSH:
    """
    MinHash LSH index for near-duplicate text detection.

    Documents are shingled into character k-grams, hashed into MinHash
    signatures (which estimate Jaccard similarity), and indexed into LSH
    bands so near-duplicates collide in at least one bucket with high
    probability.
    """

    # Maximum value _hash can produce (it masks to 32 bits); used as the
    # sentinel minhash for empty shingle sets so empty/too-short documents
    # do not crash and all hash into the same buckets.
    _EMPTY_MINHASH = 0xFFFFFFFF

    def __init__(
        self,
        num_permutations: int = 128,
        threshold: float = 0.8,
        bands: int = 16,
        rows_per_band: int = 8,
    ):
        """
        Initialize MinHash LSH.

        Args:
            num_permutations: Number of hash permutations for MinHash.
            threshold: Similarity threshold for considering duplicates.
            bands: Number of bands for LSH.
            rows_per_band: Rows per band (bands * rows_per_band must equal
                num_permutations).

        Raises:
            ValueError: If bands * rows_per_band != num_permutations.
        """
        # Explicit raise instead of assert: asserts are stripped under -O.
        if bands * rows_per_band != num_permutations:
            raise ValueError(
                "bands * rows_per_band must equal num_permutations"
            )
        self.num_permutations = num_permutations
        self.threshold = threshold
        self.bands = bands
        self.rows_per_band = rows_per_band
        # Random (a, b) coefficients for the universal hash family.
        self.hash_functions = self._generate_hash_functions(num_permutations)
        # LSH index: one dict per band, mapping bucket hash -> set of doc_ids.
        self.index = [dict() for _ in range(bands)]
        # doc_id -> MinHashSignature, kept for similarity estimation at query time.
        self.signatures = {}

    def _generate_hash_functions(self, n: int) -> List[Tuple[int, int]]:
        """Generate n random (a, b) coefficient pairs for universal hashing."""
        return [
            (random.randint(1, 2**32 - 1), random.randint(0, 2**32 - 1))
            for _ in range(n)
        ]

    def _hash(self, x: int, a: int, b: int) -> int:
        """Universal hash: (a*x + b) mod prime, folded into 32 bits."""
        prime = 2**61 - 1  # Mersenne prime
        return ((a * x + b) % prime) & 0xFFFFFFFF

    def _compute_minhash(self, shingles: Set[int]) -> List[int]:
        """
        Compute the MinHash signature for a set of shingles.

        Args:
            shingles: Set of shingle hash values (may be empty).

        Returns:
            List of minhash values, one per permutation. An empty shingle
            set yields a constant sentinel signature instead of raising
            ValueError from min() on an empty sequence.
        """
        signature = []
        for a, b in self.hash_functions:
            min_hash = min(
                (self._hash(shingle, a, b) for shingle in shingles),
                default=self._EMPTY_MINHASH,
            )
            signature.append(min_hash)
        return signature

    def _shingle_text(
        self,
        text: str,
        k: int = 5,
    ) -> Set[int]:
        """
        Extract character k-gram shingles from text.

        Args:
            text: Input text (lowercased before shingling).
            k: Shingle size in characters.

        Returns:
            Set of 32-bit shingle hashes; empty when len(text) < k.
        """
        text = text.lower()
        shingles = set()
        for i in range(len(text) - k + 1):
            shingle = text[i:i + k]
            # First 8 hex digits of MD5 -> stable 32-bit shingle hash
            # (non-cryptographic use; stability across runs is what matters).
            shingle_hash = int(hashlib.md5(shingle.encode()).hexdigest()[:8], 16)
            shingles.add(shingle_hash)
        return shingles

    def add_document(
        self,
        doc_id: str,
        text: str,
        compute_signature: bool = True,
    ) -> MinHashSignature:
        """
        Add a document to the LSH index.

        Args:
            doc_id: Unique document ID.
            text: Document text.
            compute_signature: Must be True; precomputed signatures are
                not supported by this implementation.

        Returns:
            MinHash signature of the document.

        Raises:
            ValueError: If compute_signature is False.
        """
        if compute_signature:
            shingles = self._shingle_text(text)
            signature = self._compute_minhash(shingles)
        else:
            raise ValueError("Must compute signature")
        # Store signature so queries can estimate similarity later.
        signature_obj = MinHashSignature(hash_values=signature, doc_id=doc_id)
        self.signatures[doc_id] = signature_obj
        # Index each band's slice of the signature into its bucket.
        for band_idx in range(self.bands):
            start = band_idx * self.rows_per_band
            band_signature = tuple(signature[start:start + self.rows_per_band])
            bucket_hash = hash(band_signature)
            self.index[band_idx].setdefault(bucket_hash, set()).add(doc_id)
        return signature_obj

    def query(
        self,
        text: str,
        candidate_doc_ids: Optional[Set[str]] = None,
    ) -> List[Tuple[str, float]]:
        """
        Query for near-duplicate documents.

        Args:
            text: Query text.
            candidate_doc_ids: Optional set of candidate doc IDs to restrict
                the search to.

        Returns:
            List of (doc_id, estimated_similarity) pairs at or above the
            threshold, sorted by similarity descending.
        """
        shingles = self._shingle_text(text)
        query_signature = self._compute_minhash(shingles)
        # Gather LSH candidates: any doc sharing at least one band bucket.
        candidates = set()
        for band_idx in range(self.bands):
            start = band_idx * self.rows_per_band
            band_signature = tuple(query_signature[start:start + self.rows_per_band])
            bucket = self.index[band_idx].get(hash(band_signature))
            if bucket:
                candidates.update(bucket)
        if candidate_doc_ids is not None:
            candidates = candidates.intersection(candidate_doc_ids)
        # Score candidates via MinHash similarity (exact Jaccard would need
        # the stored shingle sets, which are not retained).
        results = []
        for doc_id in candidates:
            similarity = self._estimate_similarity(query_signature, doc_id)
            if similarity >= self.threshold:
                results.append((doc_id, similarity))
        return sorted(results, key=lambda x: x[1], reverse=True)

    def _estimate_similarity(
        self,
        signature1: List[int],
        doc_id2: str,
    ) -> float:
        """
        Estimate Jaccard similarity between a signature and a stored document.

        Uses the MinHash estimator: the proportion of matching hash values.

        Args:
            signature1: Query MinHash signature.
            doc_id2: Stored document ID whose signature to compare against.

        Returns:
            Estimated Jaccard similarity in [0, 1]; 0.0 if doc_id2 is unknown.
        """
        if doc_id2 not in self.signatures:
            return 0.0
        signature2 = self.signatures[doc_id2].hash_values
        matches = sum(1 for h1, h2 in zip(signature1, signature2) if h1 == h2)
        return matches / len(signature1)

    def compute_signature(self, text: str) -> MinHashSignature:
        """Compute a MinHash signature for text without indexing it."""
        shingles = self._shingle_text(text)
        signature = self._compute_minhash(shingles)
        return MinHashSignature(hash_values=signature, doc_id="")
def test_deduplication():
    """Smoke-test MinHash LSH on a handful of tiny documents."""
    lsh = MinHashLSH(num_permutations=64, threshold=0.7, bands=8, rows_per_band=8)

    # Corpus: two near-duplicates of doc1 plus one unrelated document.
    corpus = {
        "doc1": "The quick brown fox jumps over the lazy dog.",
        "doc2": "The quick brown fox jumps over the lazy dog!!!",
        "doc3": "The quick brown fox leaps over the lazy dog.",
        "doc4": "Completely unrelated text about science and experiments.",
    }
    signatures = {
        doc_id: lsh.add_document(doc_id, text)
        for doc_id, text in corpus.items()
    }

    # Querying with doc1's own text should surface doc2/doc3 as similar.
    results = lsh.query(corpus["doc1"])
    print(f"Query results for doc1: {results}")
    print("Deduplication test passed!")


if __name__ == "__main__":
    test_deduplication()