Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Qdrant Migration Verification Script | |
| This script compares the source and destination Qdrant collections to verify | |
| that the migration was successful. It: | |
| 1. Compares collection configurations | |
| 2. Fetches sample points from source | |
| 3. Retrieves same points from destination using IDs | |
| 4. Compares vectors, metadata, and all attributes | |
| """ | |
| import os | |
| import sys | |
| from typing import List, Dict, Any, Optional | |
| from pathlib import Path | |
| from qdrant_client import QdrantClient | |
| import json | |
# Optional dependencies. Neither is required: the flags below record which
# ones could be imported so the rest of the script can degrade gracefully.
try:
    from src.config.loader import load_config
except ImportError:
    CONFIG_AVAILABLE = False
else:
    CONFIG_AVAILABLE = True

try:
    from dotenv import load_dotenv
except ImportError:
    DOTENV_AVAILABLE = False
else:
    DOTENV_AVAILABLE = True

# When python-dotenv is present, populate the environment at import time.
# A .env file sitting next to this script is loaded explicitly; otherwise
# load_dotenv() is called without a path. Values from the file override
# variables that are already set (override=True).
if DOTENV_AVAILABLE:
    project_root = Path(__file__).parent
    env_file = project_root / ".env"
    if not env_file.exists():
        load_dotenv(override=True)
    else:
        load_dotenv(env_file, override=True)
def _vector_param(params: Any, field: str) -> Any:
    """Read ``field`` from a vector-params object or a plain dict (None if absent)."""
    if isinstance(params, dict):
        return params.get(field)
    return getattr(params, field, None)


def _sampled_vector_size(vector: Any) -> Any:
    """Derive the size of a sampled point's vector.

    Returns an int for a single unnamed vector, a ``{name: size}`` dict for
    named vectors, or None when the point carries no vector.
    """
    if not vector:
        return None
    if isinstance(vector, dict):
        return {name: len(values) for name, values in vector.items()}
    return len(vector)


def get_collection_info(client: "QdrantClient", collection_name: str) -> Optional[Dict[str, Any]]:
    """Summarize a collection's vector configuration and point counts.

    The lookup tolerates several response layouts: ``info.config.params.vectors``
    (object or dict), ``info.config.vector_size`` / ``distance``, or the same
    attributes directly on the response. When ``vectors`` is a dict that has no
    ``'size'`` key it is treated as a mapping of vector name -> params, and
    ``vector_size`` / ``distance`` are reported as ``{name: value}`` dicts.

    If no size can be read from the configuration, one point is scrolled and
    the size is inferred from its vector(s); this is best effort and a failure
    only produces a warning.

    Args:
        client: Object exposing ``get_collection`` and ``scroll``.
        collection_name: Name of the collection to inspect.

    Returns:
        A dict with keys ``vector_size``, ``distance`` (``"Cosine"`` when the
        response carries none), ``points_count`` and ``indexed_vectors_count``
        (both coerced to 0 when missing or None), or ``None`` if the collection
        could not be read (the error is printed).
    """
    try:
        collection_info = client.get_collection(collection_name)

        # Handle different Qdrant client versions and response formats
        if hasattr(collection_info, 'config'):
            config = collection_info.config
            if hasattr(config, 'params') and hasattr(config.params, 'vectors'):
                vectors_config = config.params.vectors
                if isinstance(vectors_config, dict) and 'size' not in vectors_config:
                    # Named vectors: keys are vector names, values are params.
                    vector_size = {
                        name: _vector_param(params, 'size')
                        for name, params in vectors_config.items()
                    } or None
                    distance = {
                        name: _vector_param(params, 'distance')
                        for name, params in vectors_config.items()
                    } or None
                else:
                    vector_size = _vector_param(vectors_config, 'size')
                    distance = _vector_param(vectors_config, 'distance')
            else:
                vector_size = getattr(config, 'vector_size', None)
                distance = getattr(config, 'distance', None)
        else:
            vector_size = getattr(collection_info, 'vector_size', None)
            distance = getattr(collection_info, 'distance', None)

        # The attributes may exist but hold None; callers format these with
        # ``:,`` which would raise on None, so coerce to 0.
        points_count = getattr(collection_info, 'points_count', 0) or 0
        indexed_vectors_count = getattr(collection_info, 'indexed_vectors_count', 0) or 0

        if vector_size is None:
            try:
                result, _ = client.scroll(collection_name=collection_name, limit=1, with_vectors=True)
                if result:
                    vector_size = _sampled_vector_size(getattr(result[0], 'vector', None))
            except Exception as e:
                # Best effort only: report and carry on with an unknown size.
                print(f"Warning: could not infer vector size from a sample point: {e}")

        return {
            "vector_size": vector_size,
            "distance": distance or "Cosine",
            "points_count": points_count,
            "indexed_vectors_count": indexed_vectors_count,
        }
    except Exception as e:
        print(f"β Error getting collection info: {e}")
        return None
def fetch_points_by_ids(client: "QdrantClient", collection_name: str, point_ids: List, batch_size: int = 256) -> Dict:
    """Fetch points (payload and vectors included) by ID, keyed by point ID.

    IDs are requested in slices of ``batch_size`` so that a large sample does
    not have to travel in a single request/response.

    Args:
        client: Object exposing ``retrieve(collection_name=, ids=, with_payload=,
            with_vectors=)``.
        collection_name: Collection to read from.
        point_ids: IDs to look up. An empty sequence returns ``{}`` without
            contacting the server.
        batch_size: Maximum number of IDs per ``retrieve`` call; values below 1
            are treated as 1.

    Returns:
        ``{point.id: point}`` for every point the server returned. IDs that do
        not exist are simply absent. If any request raises, the error is
        printed and an empty dict is returned, so callers cannot tell a failed
        fetch from "nothing found" by the return value alone.
    """
    ids = list(point_ids)
    if not ids:
        return {}

    step = max(1, batch_size)
    found = {}
    try:
        for start in range(0, len(ids), step):
            batch = client.retrieve(
                collection_name=collection_name,
                ids=ids[start:start + step],
                with_payload=True,
                with_vectors=True,
            )
            found.update((point.id, point) for point in batch)
    except Exception as e:
        print(f"β Error fetching points by IDs: {e}")
        return {}
    return found
def _compare_vectors(source_vec, dest_vec, tolerance, label="Vector"):
    """Compare one vector pair and return ``(matches, differences)`` message lists.

    Handles missing vectors, named vectors (dicts of name -> vector, compared
    name by name) and plain numeric sequences (compared element-wise).
    """
    if source_vec is None and dest_vec is None:
        return [f"{label} (both None)"], []

    if isinstance(source_vec, dict) and isinstance(dest_vec, dict):
        matches, differences = [], []
        source_names, dest_names = set(source_vec), set(dest_vec)
        missing = sorted(source_names - dest_names, key=str)
        extra = sorted(dest_names - source_names, key=str)
        if missing:
            differences.append(f"{label} names missing in dest: {missing}")
        if extra:
            differences.append(f"{label} names extra in dest: {extra}")
        for name in sorted(source_names & dest_names, key=str):
            sub_matches, sub_diffs = _compare_vectors(
                source_vec[name], dest_vec[name], tolerance, f"{label}[{name}]"
            )
            matches.extend(sub_matches)
            differences.extend(sub_diffs)
        return matches, differences

    if isinstance(source_vec, dict) or isinstance(dest_vec, dict):
        return [], [f"{label}: one side uses named vectors and the other does not"]

    try:
        if source_vec is None or dest_vec is None:
            source_desc = 'None' if source_vec is None else f'len={len(source_vec)}'
            dest_desc = 'None' if dest_vec is None else f'len={len(dest_vec)}'
            return [], [f"{label}: source={source_desc}, dest={dest_desc}"]
        if len(source_vec) != len(dest_vec):
            return [], [f"{label} length: source={len(source_vec)}, dest={len(dest_vec)}"]

        # Imported here so a missing numpy is reported as a comparison error
        # for this point instead of aborting the whole run.
        import numpy as np
        delta = np.abs(np.asarray(source_vec, dtype=float) - np.asarray(dest_vec, dtype=float))
        # np.max raises on an empty array; two empty vectors are identical.
        max_diff = float(np.max(delta)) if delta.size else 0.0
    except Exception as e:
        return [], [f"{label} comparison error: {e}"]

    # A NaN difference fails this test and is therefore reported as differing.
    if max_diff < tolerance:
        return [f"{label} (max diff: {max_diff:.2e})"], []
    return [], [f"{label} values differ (max diff: {max_diff:.2e})"]


def compare_points(source_point, dest_point, point_id, tolerance: float = 1e-6) -> Dict[str, Any]:
    """Compare a source point with its migrated counterpart.

    Three aspects are checked: the ``id`` attributes, the ``vector`` attributes
    (element-wise within ``tolerance``; named vectors are compared per name),
    and the ``payload`` mappings (key sets, then exact equality per shared key).

    Args:
        source_point: Point read from the source; must expose ``id`` and may
            expose ``vector`` and ``payload``.
        dest_point: Point read from the destination, same shape.
        point_id: Identifier echoed back in the result.
        tolerance: Largest absolute per-component difference still counted as
            a vector match.

    Returns:
        A dict with ``point_id``, ``matches`` and ``differences`` (lists of
        human-readable strings), plus ``match_count`` and ``diff_count``.
    """
    differences = []
    matches = []

    # Compare IDs
    if source_point.id == dest_point.id:
        matches.append("ID")
    else:
        differences.append(f"ID: source={source_point.id}, dest={dest_point.id}")

    # Compare vectors
    vector_matches, vector_diffs = _compare_vectors(
        getattr(source_point, 'vector', None),
        getattr(dest_point, 'vector', None),
        tolerance,
    )
    matches.extend(vector_matches)
    differences.extend(vector_diffs)

    # Compare payloads (a missing or None payload counts as empty)
    source_payload = getattr(source_point, 'payload', {}) or {}
    dest_payload = getattr(dest_point, 'payload', {}) or {}
    # Payload objects that are not plain dicts are compared via their attributes
    if hasattr(source_payload, '__dict__'):
        source_payload = source_payload.__dict__
    if hasattr(dest_payload, '__dict__'):
        dest_payload = dest_payload.__dict__

    source_keys = set(source_payload.keys())
    dest_keys = set(dest_payload.keys())
    # Sorted so the report is stable from run to run.
    missing_in_dest = sorted(source_keys - dest_keys, key=str)
    extra_in_dest = sorted(dest_keys - source_keys, key=str)
    if missing_in_dest:
        differences.append(f"Payload keys missing in dest: {missing_in_dest}")
    if extra_in_dest:
        differences.append(f"Payload keys extra in dest: {extra_in_dest}")

    # Compare payload values for keys present on both sides
    for key in sorted(source_keys & dest_keys, key=str):
        source_val = source_payload[key]
        dest_val = dest_payload[key]
        if source_val == dest_val:
            matches.append(f"Payload.{key}")
        elif isinstance(source_val, dict) and isinstance(dest_val, dict):
            differences.append(f"Payload.{key}: dicts differ")
        elif isinstance(source_val, list) and isinstance(dest_val, list):
            differences.append(f"Payload.{key}: lists differ (len: {len(source_val)} vs {len(dest_val)})")
        else:
            differences.append(f"Payload.{key}: '{source_val}' != '{dest_val}'")

    return {
        "point_id": point_id,
        "matches": matches,
        "differences": differences,
        "match_count": len(matches),
        "diff_count": len(differences)
    }
def _format_count(value: Any) -> str:
    """Render a count with thousands separators; non-integers (e.g. None) as-is."""
    if isinstance(value, int):
        return f"{value:,}"
    return str(value)


def _resolve_source_settings():
    """Return ``(url, api_key, collection)`` for the source instance.

    Environment variables win; the optional project config fills in whatever
    is still unset; the collection finally defaults to ``'docling'``.
    """
    source_url = os.getenv('QDRANT_URL')
    source_key = os.getenv('QDRANT_API_KEY')
    # Deliberately no default here: defaulting to 'docling' up front would
    # make the config-file fallback below unreachable.
    source_collection = os.getenv('QDRANT_COLLECTION')
    if CONFIG_AVAILABLE:
        try:
            config = load_config()
            qdrant_config = config.get('qdrant', {})
            if not source_url:
                source_url = qdrant_config.get('url')
            if not source_key:
                source_key = qdrant_config.get('api_key')
            if not source_collection:
                source_collection = qdrant_config.get('collection_name')
        except Exception as e:
            print(f"β οΈ Could not load config: {e}")
    return source_url, source_key, source_collection or 'docling'


def _pick_destination_collection(dest_client, preferred: str) -> Optional[str]:
    """Choose the destination collection when none was configured.

    A single collection is used as-is. With several, the one named like the
    source (``preferred``) is chosen if present, otherwise the first listed.
    Returns None (after printing the reason) when nothing can be chosen.
    """
    try:
        collections = dest_client.get_collections().collections
        collection_names = [c.name for c in collections]
    except Exception as e:
        print(f"β Could not list destination collections: {e}")
        return None

    if not collection_names:
        print("β No collections found in destination!")
        return None
    if len(collection_names) == 1:
        print(f"\nπ Auto-detected destination collection: '{collection_names[0]}'")
        return collection_names[0]

    print(f"\nβ οΈ Found {len(collection_names)} collections in destination:")
    for name in collection_names:
        print(f" - {name}")
    if preferred in collection_names:
        print(f"\n Using collection matching source name: '{preferred}'")
        return preferred
    print(f"\n Using first collection: '{collection_names[0]}'")
    return collection_names[0]


def _print_collection_info(side: str, collection: str, info: Dict[str, Any]) -> None:
    """Print the summary block for one collection (``side`` is the heading prefix)."""
    print(f"\n{side} Collection ('{collection}'):")
    print(f" Vector size: {info['vector_size']}")
    print(f" Distance: {info['distance']}")
    print(f" Points: {_format_count(info['points_count'])}")
    print(f" Indexed: {_format_count(info['indexed_vectors_count'])}")


def main(sample_size: int = 2000):
    """Verify a Qdrant migration by comparing source and destination collections.

    Settings come from the environment (optionally pre-populated from ``.env``):
    ``QDRANT_URL``, ``QDRANT_API_KEY``, ``QDRANT_COLLECTION`` for the source
    (with the project config as fallback) and ``DEST_QDRANT_URL``,
    ``DEST_QDRANT_API_KEY``, ``DEST_COLLECTION`` for the destination.

    Steps: print and compare collection-level settings, scroll up to
    ``sample_size`` points from the source, retrieve the same IDs from the
    destination, compare them one by one and print a report.

    Args:
        sample_size: Maximum number of source points to sample.

    Returns:
        0 when every sampled point exists in the destination and matches;
        1 on missing credentials, connection/listing/fetch errors, or when any
        sampled point is missing or differs. Collection-level differences are
        reported but do not change the exit code.
    """
    import traceback

    print("="*70)
    print("Qdrant Migration Verification Script")
    print("="*70)

    # Auto-detect source from config and .env file
    source_url, source_key, source_collection = _resolve_source_settings()

    # Get destination from env
    dest_url = os.getenv('DEST_QDRANT_URL')
    dest_key = os.getenv('DEST_QDRANT_API_KEY')
    dest_collection = os.getenv('DEST_COLLECTION')  # Optional, will auto-detect

    # Validate
    if not source_url or not source_key:
        print("β Source Qdrant credentials missing!")
        print(" Set QDRANT_URL and QDRANT_API_KEY in .env or environment")
        return 1
    if not dest_url or not dest_key:
        print("β Destination Qdrant credentials missing!")
        print(" Set DEST_QDRANT_URL and DEST_QDRANT_API_KEY in .env or environment")
        return 1

    print(f"\nπ Configuration:")
    print(f" Source: {source_url}")
    print(f" Source Collection: {source_collection}")
    print(f" Destination: {dest_url}")
    if dest_collection:
        print(f" Destination Collection: {dest_collection} (specified)")
    else:
        print(f" Destination Collection: (auto-detect)")

    # Connect to Qdrant instances
    print(f"\nπ Connecting to Qdrant instances...")
    try:
        source_client = QdrantClient(url=source_url, api_key=source_key, timeout=120)
        print(f" β Connected to source")
    except Exception as e:
        print(f" β Failed to connect to source: {e}")
        return 1
    try:
        dest_client = QdrantClient(url=dest_url, api_key=dest_key, timeout=120)
        print(f" β Connected to destination")
    except Exception as e:
        print(f" β Failed to connect to destination: {e}")
        return 1

    # Auto-detect destination collection if not specified
    if not dest_collection:
        dest_collection = _pick_destination_collection(dest_client, source_collection)
        if not dest_collection:
            return 1

    # Get collection info
    print(f"\nπ Collection Information Comparison")
    print("="*70)
    source_info = get_collection_info(source_client, source_collection)
    dest_info = get_collection_info(dest_client, dest_collection)
    if not source_info:
        print("β Could not get source collection info")
        return 1
    if not dest_info:
        print("β Could not get destination collection info")
        return 1

    _print_collection_info("Source", source_collection, source_info)
    _print_collection_info("Destination", dest_collection, dest_info)

    # Compare configs
    print(f"\nπ Configuration Comparison:")
    config_matches = []
    config_diffs = []
    if source_info['vector_size'] == dest_info['vector_size']:
        config_matches.append(f"Vector size: {source_info['vector_size']}")
    else:
        config_diffs.append(f"Vector size: source={source_info['vector_size']}, dest={dest_info['vector_size']}")
    # Compared as strings so an enum on one side matches its text form on the other
    if str(source_info['distance']) == str(dest_info['distance']):
        config_matches.append(f"Distance: {source_info['distance']}")
    else:
        config_diffs.append(f"Distance: source={source_info['distance']}, dest={dest_info['distance']}")
    if source_info['points_count'] == dest_info['points_count']:
        config_matches.append(f"Points count: {_format_count(source_info['points_count'])}")
    else:
        config_diffs.append(
            f"Points count: source={_format_count(source_info['points_count'])}, "
            f"dest={_format_count(dest_info['points_count'])}"
        )

    if config_matches:
        print(f" β Matches: {len(config_matches)}")
        for match in config_matches:
            print(f" - {match}")
    if config_diffs:
        print(f" β Differences: {len(config_diffs)}")
        for diff in config_diffs:
            print(f" - {diff}")

    # Fetch sample points from source (at most ``sample_size`` of them)
    print(f"\nπ₯ Fetching sample points from source...")
    try:
        source_points_result, _ = source_client.scroll(
            collection_name=source_collection,
            limit=sample_size,
            with_payload=True,
            with_vectors=True
        )
        if not source_points_result:
            print("β No points found in source collection!")
            return 1
        print(f" β Fetched {len(source_points_result)} points from source")
        # Extract point IDs
        source_point_ids = [point.id for point in source_points_result]
        print(f" Point IDs: {source_point_ids[:5]}{'...' if len(source_point_ids) > 5 else ''}")
    except Exception as e:
        print(f"β Error fetching source points: {e}")
        traceback.print_exc()
        return 1

    # Fetch same points from destination
    print(f"\nπ₯ Fetching same points from destination by ID...")
    try:
        dest_points_dict = fetch_points_by_ids(dest_client, dest_collection, source_point_ids)
        print(f" β Fetched {len(dest_points_dict)} points from destination")
        missing_ids = set(source_point_ids) - set(dest_points_dict.keys())
        if missing_ids:
            print(f" β οΈ Missing {len(missing_ids)} points in destination: {list(missing_ids)[:5]}{'...' if len(missing_ids) > 5 else ''}")
    except Exception as e:
        print(f"β Error fetching destination points: {e}")
        traceback.print_exc()
        return 1

    # Compare points
    print(f"\nπ Point-by-Point Comparison")
    print("="*70)
    comparison_results = []
    for source_point in source_points_result:
        point_id = source_point.id
        dest_point = dest_points_dict.get(point_id)
        if dest_point is None:
            comparison_results.append({
                "point_id": point_id,
                "status": "MISSING",
                "matches": [],
                "differences": [f"Point not found in destination"]
            })
        else:
            comparison = compare_points(source_point, dest_point, point_id)
            comparison["status"] = "MATCH" if comparison["diff_count"] == 0 else "DIFF"
            comparison_results.append(comparison)

    # Summary
    matches = [r for r in comparison_results if r["status"] == "MATCH"]
    diffs = [r for r in comparison_results if r["status"] == "DIFF"]
    missing = [r for r in comparison_results if r["status"] == "MISSING"]
    print(f"\nπ Comparison Summary:")
    print(f" Total points compared: {len(comparison_results)}")
    print(f" β Perfect matches: {len(matches)}")
    print(f" β οΈ Differences found: {len(diffs)}")
    print(f" β Missing in destination: {len(missing)}")

    # Show details for points with differences
    if diffs:
        print(f"\nβ οΈ Points with differences:")
        for diff_result in diffs[:10]:  # Show first 10
            print(f"\n Point ID: {diff_result['point_id']}")
            if diff_result['matches']:
                print(f" β Matches ({len(diff_result['matches'])}): {', '.join(diff_result['matches'][:5])}")
            if diff_result['differences']:
                print(f" β Differences ({len(diff_result['differences'])}):")
                for d in diff_result['differences'][:5]:
                    print(f" - {d}")
    if missing:
        print(f"\nβ Missing points in destination:")
        for missing_result in missing[:10]:  # Show first 10
            print(f" - Point ID: {missing_result['point_id']}")

    # Final verdict (based on the sampled points only)
    print(f"\n" + "="*70)
    if config_diffs:
        # The verdict below does not take these into account, so make sure
        # they are not overlooked next to a "passed" line.
        print(f" Note: {len(config_diffs)} collection-level difference(s) were reported above.")
    if not missing and not diffs:
        print("β VERIFICATION PASSED: All points match perfectly!")
        return 0
    elif not missing:
        print(f"β οΈ VERIFICATION PARTIAL: All points present but {len(diffs)} have differences")
        return 1
    else:
        print(f"β VERIFICATION FAILED: {len(missing)} points missing, {len(diffs)} have differences")
        return 1
# Script entry point: run the verification and hand main()'s return value
# to the interpreter as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())