#!/usr/bin/env python3
"""
Qdrant Collection Restore Script

Restores collection data from JSON backup files. Expects a backup directory
containing ``points.json`` (list of point dicts with ``id``, ``payload``,
``vector``) and ``collection_info.json`` (original collection metadata), as
produced by the companion backup script.
"""
import json
import os

from qdrant_client.models import PointStruct

from core.clients import get_qdrant
from core.config import QDRANT_COLLECTION


def restore_collection(client, collection_name, backup_dir, batch_size=100):
    """
    Restore a Qdrant collection from backup files.

    Args:
        client: QdrantClient instance
        collection_name: Name of collection to restore to (must already exist)
        backup_dir: Directory containing backup files
        batch_size: Number of points to upload per batch

    Returns:
        True on success, False on any failure (missing files, missing
        target collection, or an upload error).
    """
    print(f"Starting restore of collection '{collection_name}' from {backup_dir}...")

    # Check if backup files exist
    points_file = os.path.join(backup_dir, "points.json")
    collection_info_file = os.path.join(backup_dir, "collection_info.json")

    if not os.path.exists(points_file):
        print(f"❌ Points file not found: {points_file}")
        return False
    if not os.path.exists(collection_info_file):
        print(f"❌ Collection info file not found: {collection_info_file}")
        return False

    try:
        # 1. Load backup metadata
        with open(collection_info_file, "r") as f:
            backup_metadata = json.load(f)
        print("✓ Loaded collection metadata")
        print(f" Original vectors: {list(backup_metadata.get('vectors_config', {}).keys())}")

        # 2. Load points data
        with open(points_file, "r") as f:
            points_data = json.load(f)
        print(f"✓ Loaded {len(points_data)} points from backup")

        # 3. Verify the target collection exists and get its current config.
        # NOTE(review): assumes the collection uses NAMED vectors (a dict of
        # name -> params); a single unnamed vector config would not have
        # .keys() — confirm against how collections are created in this project.
        try:
            current_collection = client.get_collection(collection_name)
            current_vectors = current_collection.config.params.vectors
            print(f"✓ Target collection exists with vectors: {list(current_vectors.keys())}")
        except Exception as e:
            print(f"❌ Target collection '{collection_name}' not found or accessible: {e}")
            return False

        # 4. Convert backup points to PointStruct objects
        points_to_restore = []
        skipped_points = 0
        for point_data in points_data:
            try:
                point_id = point_data["id"]
                payload = point_data.get("payload", {})
                vectors = point_data.get("vector", {})

                # Filter vectors to only include those that exist in the
                # target collection, so upsert does not reject the point.
                filtered_vectors = {}
                for vector_name, vector_data in vectors.items():
                    if vector_name in current_vectors:
                        filtered_vectors[vector_name] = vector_data
                    else:
                        print(f" Skipping vector '{vector_name}' for point {point_id} (not in target collection)")

                # Only add the point if it has at least one valid vector
                if filtered_vectors:
                    # For named vectors, always pass a dict, never a bare vector
                    point_struct = PointStruct(
                        id=point_id,
                        payload=payload,
                        vector=filtered_vectors,
                    )
                    points_to_restore.append(point_struct)
                else:
                    skipped_points += 1
                    print(f" Skipping point {point_id} (no valid vectors)")
            except Exception as e:
                # Best-effort: a malformed point is skipped, not fatal
                print(f" Error processing point {point_data.get('id', 'unknown')}: {e}")
                skipped_points += 1
                continue

        print(f"✓ Prepared {len(points_to_restore)} points for restore")
        if skipped_points > 0:
            print(f" Skipped {skipped_points} points")

        # 5. Upload points in batches
        total_uploaded = 0
        for i in range(0, len(points_to_restore), batch_size):
            batch = points_to_restore[i:i + batch_size]
            try:
                client.upsert(
                    collection_name=collection_name,
                    points=batch,
                    wait=True,  # Wait for completion before the next batch
                )
                total_uploaded += len(batch)
                print(f" Uploaded batch {i//batch_size + 1}: {len(batch)} points (Total: {total_uploaded}/{len(points_to_restore)})")
            except Exception as e:
                print(f"❌ Error uploading batch {i//batch_size + 1}: {e}")
                return False

        # 6. Verify restoration.
        # (Renamed from the original's `collection_info`, which shadowed the
        # backup metadata loaded in step 1.)
        final_info = client.get_collection(collection_name)
        point_count = client.count(collection_name=collection_name).count
        print("✓ Restore completed successfully!")
        print(f" Points in collection: {point_count}")
        print(f" Points uploaded: {total_uploaded}")
        print(f" Collection status: {final_info.status}")
        return True

    except Exception as e:
        # Top-level boundary: report and signal failure to the caller
        print(f"❌ Restore failed: {str(e)}")
        return False


def find_latest_backup(backup_root_dir, collection_name):
    """
    Find the most recent backup directory for a collection.

    Args:
        backup_root_dir: Root directory containing backups
        collection_name: Name of the collection

    Returns:
        Path to the latest backup directory (by modification time) or None
        if the root does not exist or no matching directory is found.
    """
    if not os.path.exists(backup_root_dir):
        return None

    # Look for directories matching pattern: {collection_name}_{timestamp}
    backup_dirs = []
    for item in os.listdir(backup_root_dir):
        item_path = os.path.join(backup_root_dir, item)
        if os.path.isdir(item_path) and item.startswith(f"{collection_name}_"):
            backup_dirs.append(item_path)

    if not backup_dirs:
        return None

    # Sort by modification time, return most recent
    backup_dirs.sort(key=os.path.getmtime, reverse=True)
    return backup_dirs[0]


def main():
    # Configuration
    COLLECTION_NAME = QDRANT_COLLECTION  # Replace with your collection name
    BACKUP_ROOT_DIR = "./backups"  # Directory containing backup folders

    # You can specify exact backup directory or let the script find the latest
    SPECIFIC_BACKUP_DIR = None  # e.g., "./backups/my_collection_20250610_143022"

    # Initialize client
    client = get_qdrant()

    # Find backup directory
    if SPECIFIC_BACKUP_DIR:
        backup_dir = SPECIFIC_BACKUP_DIR
    else:
        backup_dir = find_latest_backup(BACKUP_ROOT_DIR, COLLECTION_NAME)

    if not backup_dir or not os.path.exists(backup_dir):
        print(f"❌ No backup found for collection '{COLLECTION_NAME}'")
        print(f" Looked in: {BACKUP_ROOT_DIR}")
        return

    # Restore into a sibling "_v2" collection, not the original
    RESTORE_TO_DB = COLLECTION_NAME + '_v2'
    print(f"Using backup: {backup_dir}")
    print(f'Restoring to: {RESTORE_TO_DB}')  # fixed typo: was "Resotring"

    # Confirm before proceeding
    response = input(f"Restore collection '{RESTORE_TO_DB}' from backup? (y/N): ")
    if response.lower() not in ['y', 'yes']:
        print("Restore cancelled.")
        return

    # Perform restore
    success = restore_collection(client, RESTORE_TO_DB, backup_dir)

    if success:
        print("\n🎉 Restore completed successfully!")
    else:
        print("\n❌ Restore failed!")


if __name__ == "__main__":
    main()