Spaces:
Configuration error
Configuration error
| #!/usr/bin/env python3 | |
| """ | |
| Qdrant Collection Restore Script | |
| Restores collection data from JSON backup files | |
| """ | |
| import json | |
| import os | |
| from datetime import datetime | |
| from qdrant_client import QdrantClient | |
| from qdrant_client.models import PointStruct | |
| from typing import List, Dict, Any | |
| from qdrant_client import QdrantClient | |
| from core.clients import get_qdrant | |
| from core.config import QDRANT_COLLECTION | |
def _load_json(path):
    """Read and parse a UTF-8 encoded JSON file."""
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def _select_vectors(point_id, raw_vectors, vectors_config):
    """
    Pick the part of a backed-up point's vector data that the target collection accepts.

    Args:
        point_id: ID of the point (used only in log messages)
        raw_vectors: The point's "vector" value from the backup
        vectors_config: Target collection's ``config.params.vectors`` -- a dict
            keyed by vector name for named-vector collections, a single params
            object for single (unnamed) vector collections

    Returns:
        A dict of named vectors, a bare vector list for single-vector
        collections, or None when nothing usable remains (point is skipped).
    """
    if not isinstance(vectors_config, dict):
        # Single unnamed-vector collection: the point carries one bare vector.
        if isinstance(raw_vectors, list) and raw_vectors:
            return raw_vectors
        return None
    if not isinstance(raw_vectors, dict):
        # An unnamed (list) vector cannot be mapped onto named vectors.
        return None
    filtered = {}
    for vector_name, vector_data in raw_vectors.items():
        if vector_name in vectors_config:
            filtered[vector_name] = vector_data
        else:
            print(f" Skipping vector '{vector_name}' for point {point_id} (not in target collection)")
    return filtered or None


def restore_collection(client, collection_name, backup_dir, batch_size=100):
    """
    Restore a Qdrant collection from backup files.

    Reads ``collection_info.json`` and ``points.json`` from ``backup_dir`` and
    upserts the points into ``collection_name``. The target collection must
    already exist; it is not created here.

    Both named-vector collections (vectors config is a dict) and single
    unnamed-vector collections are supported. For named collections, vectors
    whose names are absent from the target config are dropped. Points left
    with no usable vector are skipped.

    Args:
        client: QdrantClient instance
        collection_name: Name of collection to restore to
        backup_dir: Directory containing backup files
        batch_size: Number of points to upload per batch

    Returns:
        True when every prepared point was upserted, False on any failure.
        NOTE: if a batch fails mid-way, earlier batches remain in the collection.
    """
    print(f"Starting restore of collection '{collection_name}' from {backup_dir}...")
    # Check if backup files exist
    points_file = os.path.join(backup_dir, "points.json")
    collection_info_file = os.path.join(backup_dir, "collection_info.json")
    if not os.path.exists(points_file):
        print(f"β Points file not found: {points_file}")
        return False
    if not os.path.exists(collection_info_file):
        print(f"β Collection info file not found: {collection_info_file}")
        return False
    try:
        # 1. Load backup metadata
        backup_info = _load_json(collection_info_file)
        original_vectors = backup_info.get("vectors_config", {})
        print(f"β Loaded collection metadata")
        # Metadata is informational only; don't abort if it isn't a dict.
        if isinstance(original_vectors, dict):
            original_vectors = list(original_vectors.keys())
        print(f" Original vectors: {original_vectors}")

        # 2. Load points data
        points_data = _load_json(points_file)
        print(f"β Loaded {len(points_data)} points from backup")

        # 3. Verify collection exists and get current config
        try:
            target_collection = client.get_collection(collection_name)
            vectors_config = target_collection.config.params.vectors
        except Exception as e:
            print(f"β Target collection '{collection_name}' not found or accessible: {e}")
            return False
        if vectors_config is None:
            # No dense vectors configured: treat as a named layout with no names.
            vectors_config = {}
        if isinstance(vectors_config, dict):
            print(f"β Target collection exists with vectors: {list(vectors_config.keys())}")
        else:
            print("β Target collection exists with a single unnamed vector")

        # 4. Convert backup points to PointStruct objects
        points_to_restore = []
        skipped_points = 0
        for point_data in points_data:
            try:
                point_id = point_data["id"]
                vector = _select_vectors(point_id, point_data.get("vector") or {}, vectors_config)
                if vector is None:
                    skipped_points += 1
                    print(f" Skipping point {point_id} (no valid vectors)")
                    continue
                points_to_restore.append(
                    PointStruct(
                        id=point_id,
                        payload=point_data.get("payload", {}),
                        vector=vector,
                    )
                )
            except Exception as e:
                print(f" Error processing point {point_data.get('id', 'unknown')}: {e}")
                skipped_points += 1
        print(f"β Prepared {len(points_to_restore)} points for restore")
        if skipped_points > 0:
            print(f" Skipped {skipped_points} points")

        # 5. Upload points in batches
        total_uploaded = 0
        for start in range(0, len(points_to_restore), batch_size):
            batch = points_to_restore[start:start + batch_size]
            batch_number = start // batch_size + 1
            try:
                client.upsert(
                    collection_name=collection_name,
                    points=batch,
                    wait=True,  # Wait for completion
                )
            except Exception as e:
                print(f"β Error uploading batch {batch_number}: {e}")
                return False
            total_uploaded += len(batch)
            print(f" Uploaded batch {batch_number}: {len(batch)} points (Total: {total_uploaded}/{len(points_to_restore)})")

        # 6. Verify restoration
        final_info = client.get_collection(collection_name)
        point_count = client.count(collection_name=collection_name).count
        print(f"β Restore completed successfully!")
        print(f" Points in collection: {point_count}")
        print(f" Points uploaded: {total_uploaded}")
        print(f" Collection status: {final_info.status}")
        return True
    except Exception as e:
        print(f"β Restore failed: {str(e)}")
        return False
def find_latest_backup(backup_root_dir, collection_name):
    """
    Find the most recent backup directory for a collection.

    Backup directories are expected to be named ``{collection_name}_{timestamp}``
    (e.g. ``my_collection_20250610_143022``). Only entries whose text after
    ``{collection_name}_`` starts with a digit are considered. This keeps
    backups of a different collection that merely shares the prefix (e.g.
    ``my_collection_v2_...`` when looking for ``my_collection``) from being
    picked up.

    Args:
        backup_root_dir: Root directory containing backups
        collection_name: Name of the collection

    Returns:
        Path to the matching backup directory with the newest modification
        time, or None if ``backup_root_dir`` is not a directory or holds no
        matching backups.
    """
    # isdir (not exists) so a regular file here doesn't make os.listdir raise.
    if not os.path.isdir(backup_root_dir):
        return None
    prefix = f"{collection_name}_"
    candidates = [
        os.path.join(backup_root_dir, entry)
        for entry in os.listdir(backup_root_dir)
        if entry.startswith(prefix)
        and entry[len(prefix):][:1].isdigit()
        and os.path.isdir(os.path.join(backup_root_dir, entry))
    ]
    if not candidates:
        return None
    # Most recently modified directory wins.
    return max(candidates, key=os.path.getmtime)
def main():
    """
    Interactively restore the configured collection's latest backup into ``<collection>_v2``.

    Uses ``SPECIFIC_BACKUP_DIR`` when set. Otherwise it picks the most recently
    modified ``{collection}_{timestamp}`` directory under ``BACKUP_ROOT_DIR``.
    Asks for confirmation before creating a client or writing anything; an
    empty or closed stdin counts as "no".
    """
    # Configuration
    COLLECTION_NAME = QDRANT_COLLECTION  # Source collection, from core.config
    BACKUP_ROOT_DIR = "./backups"  # Directory containing backup folders
    # You can specify exact backup directory or let script find latest
    SPECIFIC_BACKUP_DIR = None  # e.g., "./backups/my_collection_20250610_143022"
    # Restore target is a separate "<collection>_v2" collection, not the source.
    # restore_collection() returns False if this collection does not exist yet.
    RESTORE_TO_DB = COLLECTION_NAME + "_v2"

    # Find backup directory
    if SPECIFIC_BACKUP_DIR:
        backup_dir = SPECIFIC_BACKUP_DIR
    else:
        backup_dir = find_latest_backup(BACKUP_ROOT_DIR, COLLECTION_NAME)
    if not backup_dir or not os.path.exists(backup_dir):
        print(f"β No backup found for collection '{COLLECTION_NAME}'")
        # Report the location that was actually checked.
        print(f" Looked in: {SPECIFIC_BACKUP_DIR or BACKUP_ROOT_DIR}")
        return

    print(f"Using backup: {backup_dir}")
    print(f"Restoring to: {RESTORE_TO_DB}")

    # Confirm before proceeding; EOF (non-interactive stdin) means "no".
    try:
        response = input(f"Restore collection '{RESTORE_TO_DB}' from backup? (y/N): ")
    except EOFError:
        response = ""
    if response.strip().lower() not in ("y", "yes"):
        print("Restore cancelled.")
        return

    # Initialize client only once we actually intend to write.
    client = get_qdrant()

    # Perform restore
    success = restore_collection(client, RESTORE_TO_DB, backup_dir)
    if success:
        print(f"\nπ Restore completed successfully!")
    else:
        print(f"\nβ Restore failed!")


if __name__ == "__main__":
    main()