Spaces:
Sleeping
Sleeping
| """Deduplication module for fabric-to-espanso.""" | |
| import logging | |
| from typing import List, Dict, Any, Tuple, Set | |
| import difflib | |
| from qdrant_client import QdrantClient | |
| from qdrant_client.http.models import Filter, PointIdsList | |
| from .config import config | |
| from .database import get_dense_vector_name, get_sparse_vector_name | |
| logger = logging.getLogger('fabric_to_espanso') | |
def calculate_text_difference_percentage(text1: str, text2: str) -> float:
    """
    Return how different two strings are, as a fraction.

    Uses difflib.SequenceMatcher: the similarity ratio is inverted so that
    0.0 means the strings are identical and 1.0 means completely different.

    Args:
        text1: First text string
        text2: Second text string

    Returns:
        Percentage difference as a float between 0.0 (identical) and 1.0 (completely different)
    """
    return 1.0 - difflib.SequenceMatcher(None, text1, text2).ratio()
| # TODO: Consider moving the vector similarity search functionality to database_query.py and import it here | |
| # This would create a more structured codebase with search functionality centralized in one place | |
def find_duplicates(client: QdrantClient, collection_name: str = config.embedding.collection_name) -> List[Tuple[str, List[str]]]:
    """
    Find duplicate entries in the database based on semantic similarity and text difference.

    Two-step verification:
    1. Vector search for points with semantic similarity >= SIMILARITY_THRESHOLD.
    2. Of those, only points whose text differs by <= DIFFERENCE_THRESHOLD count as duplicates.

    Args:
        client: Initialized Qdrant client
        collection_name: Name of the collection to query

    Returns:
        List of tuples containing (kept_point_id, [duplicate_point_ids])
    """
    # Constants for duplicate detection
    SIMILARITY_THRESHOLD = 0.85  # Minimum semantic similarity to consider as potential duplicate
    DIFFERENCE_THRESHOLD = 0.1   # Maximum text difference (10%) to consider as duplicate

    # Get all points from the database
    all_points = client.scroll(
        collection_name=collection_name,
        with_vectors=True,  # Include vector data, else no vector will be available
        limit=10000  # Adjust based on expected file count
    )[0]
    logger.info(f"Checking {len(all_points)} entries for duplicates")

    # Hoisted out of the loop: the vector name is a property of the collection,
    # not of an individual point, so one lookup suffices.
    dense_vector_name = get_dense_vector_name(client, collection_name)

    # O(1) content lookup by point id; the previous implementation rescanned
    # all_points for every similar point (O(n^2) overall).
    content_by_id = {p.id: p.payload.get('content', '') for p in all_points}

    # Track processed points to avoid redundant comparisons
    processed_points = set()
    # Store duplicates as (kept_id, [duplicate_ids])
    duplicates = []

    # For each point, find semantically similar points
    for point in all_points:
        if point.id in processed_points:
            continue

        point_id = point.id
        point_content = content_by_id[point_id]
        logger.debug(f"Checking point {point_id} for duplicates")
        logger.debug(f"Content: {point_content}")

        # Skip if no content
        if not point_content:
            logger.debug(f"Skipping point {point_id} as it has no content")
            continue

        # Skip points without vector or without the required vector type
        if not point.vector or dense_vector_name not in point.vector:
            logger.debug(f"Skipping point {point_id} as it has no valid vector")
            continue

        # Find semantically similar points using Qdrant's search
        similar_points = client.search(
            collection_name=collection_name,
            query_vector=(dense_vector_name, point.vector.get(dense_vector_name)),
            limit=100,
            score_threshold=SIMILARITY_THRESHOLD  # Only consider points with similarity > threshold
        )

        # Skip the first result (which is the point itself)
        similar_points = [p for p in similar_points if p.id != point_id]
        if not similar_points:
            continue

        logger.debug(f"Found {len(similar_points)} semantically similar points for {point.payload.get('filename', 'unknown')}")

        # Check text difference for each similar point
        duplicate_ids = []
        for similar_point in similar_points:
            similar_id = similar_point.id

            # Skip if already processed
            if similar_id in processed_points:
                continue

            # Get content of similar point (may be missing if the search
            # returned a point outside the scrolled window)
            similar_content = content_by_id.get(similar_id)
            if not similar_content:
                continue

            # Calculate text difference percentage
            diff_percentage = calculate_text_difference_percentage(point_content, similar_content)

            # If difference is less than threshold, consider it a duplicate
            if diff_percentage <= DIFFERENCE_THRESHOLD:
                duplicate_ids.append(similar_id)
                processed_points.add(similar_id)
                logger.debug(f"Found duplicate: {similar_id} (diff: {diff_percentage:.2%})")

        if duplicate_ids:
            duplicates.append((point_id, duplicate_ids))
        processed_points.add(point_id)

    logger.info(f"Found {sum(len(dups) for _, dups in duplicates)} duplicate entries in {len(duplicates)} groups")
    return duplicates
def remove_duplicates(client: QdrantClient, collection_name: str = config.embedding.collection_name) -> int:
    """
    Remove duplicate entries from the database based on semantic similarity and text difference.

    Uses a two-step verification process (see find_duplicates):
    1. Find entries with semantic similarity >= 0.85 (using vector search)
    2. For those entries, keep only those with text difference <= 10%

    Note: the thresholds above mirror the constants defined in find_duplicates;
    the previous docstring (0.9 / 5%) did not match the implementation.

    Args:
        client: Initialized Qdrant client
        collection_name: Name of the collection to query

    Returns:
        Number of removed duplicate entries
    """
    # Find duplicates
    duplicate_groups = find_duplicates(client, collection_name)
    if not duplicate_groups:
        logger.info("No duplicates found")
        return 0

    # Count total duplicates
    total_duplicates = sum(len(dups) for _, dups in duplicate_groups)

    # Remove duplicates; the kept id in each group is left untouched
    for _, duplicate_ids in duplicate_groups:
        if duplicate_ids:
            client.delete(
                collection_name=collection_name,
                points_selector=PointIdsList(points=duplicate_ids)
            )

    logger.info(f"Removed {total_duplicates} duplicate entries from the database")
    return total_duplicates