#!/usr/bin/env python3
"""
Qdrant Migration Verification Script

This script compares the source and destination Qdrant collections to verify
that the migration was successful. It:
1. Compares collection configurations
2. Fetches sample points from source
3. Retrieves same points from destination using IDs
4. Compares vectors, metadata, and all attributes
"""

import os
import sys
import traceback
from typing import List, Dict, Any, Optional, Tuple
from pathlib import Path
import json

# The Qdrant client is required to actually run the verification, but it is
# imported defensively (same pattern as the optional imports below) so that a
# missing package is reported by main() instead of crashing at import time.
try:
    from qdrant_client import QdrantClient
    QDRANT_AVAILABLE = True
except ImportError:
    QdrantClient = None
    QDRANT_AVAILABLE = False

# numpy speeds up the vector comparison; a pure-Python fallback is used
# when it is not installed.
try:
    import numpy as np
except ImportError:
    np = None

# Try to import config loader and dotenv for automatic source detection
try:
    from src.config.loader import load_config
    CONFIG_AVAILABLE = True
except ImportError:
    CONFIG_AVAILABLE = False

try:
    from dotenv import load_dotenv
    DOTENV_AVAILABLE = True
except ImportError:
    DOTENV_AVAILABLE = False

# Load .env file automatically if available
if DOTENV_AVAILABLE:
    project_root = Path(__file__).parent
    env_file = project_root / ".env"
    if env_file.exists():
        load_dotenv(env_file, override=True)
    else:
        load_dotenv(override=True)

# Maximum absolute per-component difference for two vectors to count as equal.
VECTOR_TOLERANCE = 1e-6

# Number of points sampled from the source when VERIFY_SAMPLE_SIZE is not set.
DEFAULT_SAMPLE_SIZE = 2000


def _get_field(obj: Any, name: str, default: Any = None) -> Any:
    """Read *name* from either a dict or an attribute-style object."""
    if isinstance(obj, dict):
        return obj.get(name, default)
    return getattr(obj, name, default)


def _extract_vector_params(collection_info: Any) -> Tuple[Any, Any]:
    """Return ``(vector_size, distance)`` read from a collection-info object.

    Looks at ``collection_info.config.params.vectors`` first, then falls back
    to ``vector_size`` / ``distance`` attributes on the config or on the info
    object itself. When the vectors config is a mapping of vector names to
    parameter objects, both returned values are dicts keyed by vector name
    (sorted by name so their string forms are comparable).
    """
    if not hasattr(collection_info, 'config'):
        return (
            getattr(collection_info, 'vector_size', None),
            getattr(collection_info, 'distance', None),
        )

    config = collection_info.config
    params = getattr(config, 'params', None)
    if params is None or not hasattr(params, 'vectors'):
        return (
            getattr(config, 'vector_size', None),
            getattr(config, 'distance', None),
        )

    vectors_config = params.vectors
    is_named = (
        isinstance(vectors_config, dict)
        and vectors_config
        and 'size' not in vectors_config
        and 'distance' not in vectors_config
    )
    if is_named:
        names = sorted(vectors_config, key=str)
        sizes = {n: _get_field(vectors_config[n], 'size') for n in names}
        distances = {n: _get_field(vectors_config[n], 'distance') for n in names}
        return sizes, distances

    return _get_field(vectors_config, 'size'), _get_field(vectors_config, 'distance')


def _vector_dimensions(vector: Any) -> Any:
    """Return the length of *vector*, or a per-name dict of lengths for a dict."""
    if isinstance(vector, dict):
        dims = {}
        for name in sorted(vector, key=str):
            try:
                dims[name] = len(vector[name])
            except TypeError:
                # Value has no length (not a plain sequence).
                dims[name] = None
        return dims
    return len(vector)


def get_collection_info(client: QdrantClient, collection_name: str) -> Optional[Dict[str, Any]]:
    """Get collection information including vector size and point count.

    Args:
        client: Object exposing ``get_collection`` and ``scroll``.
        collection_name: Name of the collection to inspect.

    Returns:
        A dict with keys ``vector_size``, ``distance``, ``points_count`` and
        ``indexed_vectors_count``, or ``None`` if the collection info could
        not be retrieved (the error is printed).
    """
    try:
        collection_info = client.get_collection(collection_name)

        # Handle different Qdrant client versions and response formats
        vector_size, distance = _extract_vector_params(collection_info)

        # Counts may be reported as None; normalise to 0 so callers can
        # format them with thousands separators.
        points_count = getattr(collection_info, 'points_count', 0) or 0
        indexed_vectors_count = getattr(collection_info, 'indexed_vectors_count', 0) or 0

        if vector_size is None:
            # Fall back to measuring one stored vector.
            try:
                result, _ = client.scroll(
                    collection_name=collection_name,
                    limit=1,
                    with_vectors=True,
                )
                sample_vector = getattr(result[0], 'vector', None) if result else None
                if sample_vector:
                    vector_size = _vector_dimensions(sample_vector)
            except Exception as e:
                print(f"⚠️ Could not infer vector size from a sample point: {e}")

        return {
            "vector_size": vector_size,
            "distance": distance or "Cosine",
            "points_count": points_count,
            "indexed_vectors_count": indexed_vectors_count,
        }
    except Exception as e:
        print(f"❌ Error getting collection info: {e}")
        return None


def fetch_points_by_ids(client: QdrantClient, collection_name: str, point_ids: List) -> Dict:
    """Fetch points by their IDs from a collection.

    Args:
        client: Object exposing ``retrieve``.
        collection_name: Collection to read from.
        point_ids: IDs of the points to fetch.

    Returns:
        A dict mapping point ID to the retrieved point. Empty when
        *point_ids* is empty or when retrieval fails (the error is printed).
    """
    if not point_ids:
        # Nothing to look up; avoid a pointless remote call.
        return {}
    try:
        points = client.retrieve(
            collection_name=collection_name,
            ids=point_ids,
            with_payload=True,
            with_vectors=True,
        )
        return {point.id: point for point in points}
    except Exception as e:
        print(f"❌ Error fetching points by IDs: {e}")
        return {}


def _describe_vector(vec: Any) -> str:
    """Return a short description of *vec* for difference messages."""
    if vec is None:
        return 'None'
    try:
        return f'len={len(vec)}'
    except TypeError:
        return type(vec).__name__


def _max_abs_diff(source_vec: Any, dest_vec: Any) -> float:
    """Return the largest absolute element-wise difference of two vectors."""
    if np is not None:
        delta = np.abs(np.array(source_vec) - np.array(dest_vec))
        return float(np.max(delta)) if delta.size else 0.0

    deltas = [abs(s - d) for s, d in zip(source_vec, dest_vec)]
    # Propagate NaN explicitly: max() over NaN is order-dependent.
    if any(d != d for d in deltas):
        return float('nan')
    return float(max(deltas, default=0.0))


def _compare_vectors(source_vec: Any, dest_vec: Any, label: str = "Vector") -> Tuple[List[str], List[str]]:
    """Compare two vectors and return ``(matches, differences)`` messages.

    Dict-valued vectors are compared per key, with each entry labelled
    ``"<label>[<name>]"``.
    """
    matches: List[str] = []
    differences: List[str] = []

    if source_vec is None and dest_vec is None:
        matches.append(f"{label} (both None)")
        return matches, differences

    if source_vec is None or dest_vec is None:
        differences.append(
            f"{label}: source={_describe_vector(source_vec)}, dest={_describe_vector(dest_vec)}"
        )
        return matches, differences

    source_is_dict = isinstance(source_vec, dict)
    dest_is_dict = isinstance(dest_vec, dict)
    if source_is_dict or dest_is_dict:
        if not (source_is_dict and dest_is_dict):
            differences.append(
                f"{label} type: source={type(source_vec).__name__}, dest={type(dest_vec).__name__}"
            )
            return matches, differences

        source_names = set(source_vec)
        dest_names = set(dest_vec)
        missing_names = sorted(source_names - dest_names, key=str)
        extra_names = sorted(dest_names - source_names, key=str)
        if missing_names:
            differences.append(f"{label} names missing in dest: {missing_names}")
        if extra_names:
            differences.append(f"{label} names extra in dest: {extra_names}")
        for name in sorted(source_names & dest_names, key=str):
            sub_matches, sub_diffs = _compare_vectors(
                source_vec[name], dest_vec[name], f"{label}[{name}]"
            )
            matches.extend(sub_matches)
            differences.extend(sub_diffs)
        return matches, differences

    try:
        source_len, dest_len = len(source_vec), len(dest_vec)
    except TypeError:
        # Values without a length cannot be compared element-wise; fall
        # back to plain equality.
        if source_vec == dest_vec:
            matches.append(f"{label} (equal)")
        else:
            differences.append(f"{label} values differ")
        return matches, differences

    if source_len != dest_len:
        differences.append(f"{label} length: source={source_len}, dest={dest_len}")
        return matches, differences

    # Compare vector values (with tolerance for floating point)
    try:
        max_diff = _max_abs_diff(source_vec, dest_vec)
    except Exception as e:
        differences.append(f"{label} comparison error: {e}")
    else:
        if max_diff < VECTOR_TOLERANCE:
            matches.append(f"{label} (max diff: {max_diff:.2e})")
        else:
            differences.append(f"{label} values differ (max diff: {max_diff:.2e})")
    return matches, differences


def compare_points(source_point, dest_point, point_id) -> Dict[str, Any]:
    """Compare two points and return differences.

    Args:
        source_point: Point from the source; its ``id``, ``vector`` and
            ``payload`` attributes are read.
        dest_point: Point from the destination, same attributes.
        point_id: ID echoed back in the result.

    Returns:
        A dict with ``point_id``, ``matches`` (list of str), ``differences``
        (list of str), ``match_count`` and ``diff_count``.
    """
    differences = []
    matches = []

    # Compare IDs
    if source_point.id == dest_point.id:
        matches.append("ID")
    else:
        differences.append(f"ID: source={source_point.id}, dest={dest_point.id}")

    # Compare vectors
    vector_matches, vector_diffs = _compare_vectors(
        getattr(source_point, 'vector', None),
        getattr(dest_point, 'vector', None),
    )
    matches.extend(vector_matches)
    differences.extend(vector_diffs)

    # Compare payloads
    source_payload = getattr(source_point, 'payload', {}) or {}
    dest_payload = getattr(dest_point, 'payload', {}) or {}

    # Convert to dicts if needed
    if hasattr(source_payload, '__dict__'):
        source_payload = source_payload.__dict__
    if hasattr(dest_payload, '__dict__'):
        dest_payload = dest_payload.__dict__

    source_keys = set(source_payload.keys())
    dest_keys = set(dest_payload.keys())

    # Sorted lists keep the messages stable from run to run.
    missing_in_dest = sorted(source_keys - dest_keys, key=str)
    extra_in_dest = sorted(dest_keys - source_keys, key=str)
    if missing_in_dest:
        differences.append(f"Payload keys missing in dest: {missing_in_dest}")
    if extra_in_dest:
        differences.append(f"Payload keys extra in dest: {extra_in_dest}")

    # Compare payload values
    for key in sorted(source_keys & dest_keys, key=str):
        source_val = source_payload[key]
        dest_val = dest_payload[key]

        if source_val == dest_val:
            matches.append(f"Payload.{key}")
        elif isinstance(source_val, dict) and isinstance(dest_val, dict):
            differences.append(f"Payload.{key}: dicts differ")
        elif isinstance(source_val, list) and isinstance(dest_val, list):
            differences.append(
                f"Payload.{key}: lists differ (len: {len(source_val)} vs {len(dest_val)})"
            )
        else:
            differences.append(f"Payload.{key}: '{source_val}' != '{dest_val}'")

    return {
        "point_id": point_id,
        "matches": matches,
        "differences": differences,
        "match_count": len(matches),
        "diff_count": len(differences),
    }


def _resolve_source_settings() -> Tuple[Optional[str], Optional[str], str]:
    """Return ``(url, api_key, collection)`` for the source instance.

    Environment variables take precedence; values missing there are filled
    from the ``qdrant`` section of the project config when the loader is
    importable. The collection finally defaults to ``'docling'``.
    """
    source_url = os.getenv('QDRANT_URL')
    source_key = os.getenv('QDRANT_API_KEY')
    # No default here: a default would make the config fallback unreachable.
    source_collection = os.getenv('QDRANT_COLLECTION')

    if CONFIG_AVAILABLE:
        try:
            config = load_config()
            qdrant_config = config.get('qdrant', {}) or {}
            if not source_url:
                source_url = qdrant_config.get('url')
            if not source_key:
                source_key = qdrant_config.get('api_key')
            if not source_collection:
                source_collection = qdrant_config.get('collection_name', 'docling')
        except Exception as e:
            print(f"⚠️ Could not load config: {e}")

    return source_url, source_key, source_collection or 'docling'


def _detect_dest_collection(dest_client: Any) -> Optional[str]:
    """Pick the destination collection when none was specified.

    Returns the only collection, or the first one listed when several exist;
    ``None`` when there are no collections or listing fails.
    """
    try:
        collections = dest_client.get_collections().collections
        collection_names = [c.name for c in collections]
    except Exception as e:
        print(f"❌ Could not list destination collections: {e}")
        return None

    if not collection_names:
        print("❌ No collections found in destination!")
        return None

    if len(collection_names) == 1:
        print(f"\n📋 Auto-detected destination collection: '{collection_names[0]}'")
    else:
        print(f"\n⚠️ Found {len(collection_names)} collections in destination:")
        for name in collection_names:
            print(f" - {name}")
        print(f"\n Using first collection: '{collection_names[0]}'")
    return collection_names[0]


def _print_collection_info(label: str, collection_name: str, info: Dict[str, Any]) -> None:
    """Print the summary lines for one collection."""
    print(f"\n{label} Collection ('{collection_name}'):")
    print(f" Vector size: {info['vector_size']}")
    print(f" Distance: {info['distance']}")
    print(f" Points: {info['points_count']:,}")
    print(f" Indexed: {info['indexed_vectors_count']:,}")


def _compare_configs(source_info: Dict[str, Any], dest_info: Dict[str, Any]) -> Tuple[List[str], List[str]]:
    """Compare vector size, distance and points count; return (matches, diffs)."""
    config_matches = []
    config_diffs = []

    if source_info['vector_size'] == dest_info['vector_size']:
        config_matches.append(f"Vector size: {source_info['vector_size']}")
    else:
        config_diffs.append(
            f"Vector size: source={source_info['vector_size']}, dest={dest_info['vector_size']}"
        )

    # Compared as strings so differing representations of the same value agree.
    if str(source_info['distance']) == str(dest_info['distance']):
        config_matches.append(f"Distance: {source_info['distance']}")
    else:
        config_diffs.append(
            f"Distance: source={source_info['distance']}, dest={dest_info['distance']}"
        )

    if source_info['points_count'] == dest_info['points_count']:
        config_matches.append(f"Points count: {source_info['points_count']:,}")
    else:
        config_diffs.append(
            f"Points count: source={source_info['points_count']:,}, dest={dest_info['points_count']:,}"
        )

    return config_matches, config_diffs


def _get_sample_size() -> int:
    """Return the sample size, honouring a positive VERIFY_SAMPLE_SIZE override."""
    raw = os.getenv('VERIFY_SAMPLE_SIZE')
    if not raw:
        return DEFAULT_SAMPLE_SIZE
    try:
        value = int(raw)
    except ValueError:
        value = 0
    if value <= 0:
        print(f"⚠️ Ignoring invalid VERIFY_SAMPLE_SIZE={raw!r}; using {DEFAULT_SAMPLE_SIZE}")
        return DEFAULT_SAMPLE_SIZE
    return value


def main():
    """Run the verification and return a process exit code.

    Returns:
        0 when every sampled point is present in the destination and matches;
        1 on any configuration/connection error, missing point or difference.
    """
    print("="*70)
    print("Qdrant Migration Verification Script")
    print("="*70)

    if not QDRANT_AVAILABLE:
        print("❌ qdrant-client is not installed (pip install qdrant-client)")
        return 1

    # Auto-detect source from config and .env file
    source_url, source_key, source_collection = _resolve_source_settings()

    # Get destination from env
    dest_url = os.getenv('DEST_QDRANT_URL')
    dest_key = os.getenv('DEST_QDRANT_API_KEY')
    dest_collection = os.getenv('DEST_COLLECTION')  # Optional, will auto-detect

    # Validate
    if not source_url or not source_key:
        print("❌ Source Qdrant credentials missing!")
        print(" Set QDRANT_URL and QDRANT_API_KEY in .env or environment")
        return 1

    if not dest_url or not dest_key:
        print("❌ Destination Qdrant credentials missing!")
        print(" Set DEST_QDRANT_URL and DEST_QDRANT_API_KEY in .env or environment")
        return 1

    print("\n📋 Configuration:")
    print(f" Source: {source_url}")
    print(f" Source Collection: {source_collection}")
    print(f" Destination: {dest_url}")
    if dest_collection:
        print(f" Destination Collection: {dest_collection} (specified)")
    else:
        print(" Destination Collection: (auto-detect)")

    # Connect to Qdrant instances
    print("\n🔌 Connecting to Qdrant instances...")
    try:
        source_client = QdrantClient(url=source_url, api_key=source_key, timeout=120)
        print(" ✅ Connected to source")
    except Exception as e:
        print(f" ❌ Failed to connect to source: {e}")
        return 1

    try:
        dest_client = QdrantClient(url=dest_url, api_key=dest_key, timeout=120)
        print(" ✅ Connected to destination")
    except Exception as e:
        print(f" ❌ Failed to connect to destination: {e}")
        return 1

    # Auto-detect destination collection if not specified
    if not dest_collection:
        dest_collection = _detect_dest_collection(dest_client)
        if not dest_collection:
            return 1

    # Get collection info
    print("\n📊 Collection Information Comparison")
    print("="*70)

    source_info = get_collection_info(source_client, source_collection)
    dest_info = get_collection_info(dest_client, dest_collection)

    if not source_info:
        print("❌ Could not get source collection info")
        return 1

    if not dest_info:
        print("❌ Could not get destination collection info")
        return 1

    _print_collection_info("Source", source_collection, source_info)
    _print_collection_info("Destination", dest_collection, dest_info)

    # Compare configs
    print("\n🔍 Configuration Comparison:")
    config_matches, config_diffs = _compare_configs(source_info, dest_info)

    if config_matches:
        print(f" ✅ Matches: {len(config_matches)}")
        for match in config_matches:
            print(f" - {match}")

    if config_diffs:
        print(f" ❌ Differences: {len(config_diffs)}")
        for diff in config_diffs:
            print(f" - {diff}")

    # Fetch sample points from source
    print("\n📥 Fetching sample points from source...")
    sample_size = _get_sample_size()

    try:
        source_points_result, _ = source_client.scroll(
            collection_name=source_collection,
            limit=sample_size,
            with_payload=True,
            with_vectors=True,
        )

        if not source_points_result:
            print("❌ No points found in source collection!")
            return 1

        print(f" ✅ Fetched {len(source_points_result)} points from source")

        # Extract point IDs
        source_point_ids = [point.id for point in source_points_result]
        print(f" Point IDs: {source_point_ids[:5]}{'...' if len(source_point_ids) > 5 else ''}")

    except Exception as e:
        print(f"❌ Error fetching source points: {e}")
        traceback.print_exc()
        return 1

    # Fetch same points from destination
    print("\n📥 Fetching same points from destination by ID...")
    try:
        dest_points_dict = fetch_points_by_ids(dest_client, dest_collection, source_point_ids)
        print(f" ✅ Fetched {len(dest_points_dict)} points from destination")

        missing_ids = set(source_point_ids) - set(dest_points_dict.keys())
        if missing_ids:
            print(f" ⚠️ Missing {len(missing_ids)} points in destination: {list(missing_ids)[:5]}{'...' if len(missing_ids) > 5 else ''}")

    except Exception as e:
        print(f"❌ Error fetching destination points: {e}")
        traceback.print_exc()
        return 1

    # Compare points
    print("\n🔍 Point-by-Point Comparison")
    print("="*70)

    comparison_results = []
    for source_point in source_points_result:
        point_id = source_point.id
        dest_point = dest_points_dict.get(point_id)

        if dest_point is None:
            comparison_results.append({
                "point_id": point_id,
                "status": "MISSING",
                "matches": [],
                "differences": ["Point not found in destination"],
            })
        else:
            comparison = compare_points(source_point, dest_point, point_id)
            comparison["status"] = "MATCH" if comparison["diff_count"] == 0 else "DIFF"
            comparison_results.append(comparison)

    # Summary
    matches = [r for r in comparison_results if r["status"] == "MATCH"]
    diffs = [r for r in comparison_results if r["status"] == "DIFF"]
    missing = [r for r in comparison_results if r["status"] == "MISSING"]

    print("\n📊 Comparison Summary:")
    print(f" Total points compared: {len(comparison_results)}")
    print(f" ✅ Perfect matches: {len(matches)}")
    print(f" ⚠️ Differences found: {len(diffs)}")
    print(f" ❌ Missing in destination: {len(missing)}")

    # Show details for points with differences
    if diffs:
        print("\n⚠️ Points with differences:")
        for diff_result in diffs[:10]:  # Show first 10
            print(f"\n Point ID: {diff_result['point_id']}")
            if diff_result['matches']:
                print(f" ✅ Matches ({len(diff_result['matches'])}): {', '.join(diff_result['matches'][:5])}")
            if diff_result['differences']:
                print(f" ❌ Differences ({len(diff_result['differences'])}):")
                for d in diff_result['differences'][:5]:
                    print(f" - {d}")

    if missing:
        print("\n❌ Missing points in destination:")
        for missing_result in missing[:10]:
            print(f" - Point ID: {missing_result['point_id']}")

    # Final verdict
    print("\n" + "="*70)
    if not missing and not diffs:
        print("✅ VERIFICATION PASSED: All points match perfectly!")
        if config_diffs:
            # Only a sample of points is compared, so collection-level
            # mismatches deserve a reminder next to the verdict.
            print("⚠️ Note: collection-level differences were reported above; only a sample of points was compared.")
        return 0
    elif not missing:
        print(f"⚠️ VERIFICATION PARTIAL: All points present but {len(diffs)} have differences")
        return 1
    else:
        print(f"❌ VERIFICATION FAILED: {len(missing)} points missing, {len(diffs)} have differences")
        return 1


if __name__ == "__main__":
    sys.exit(main())