# object-memory / qdrant_utils / qdrant_restore.py
# (GitHub page residue preserved as comments: author russ4stall, "fresh history", commit 24f3fb6)
#!/usr/bin/env python3
"""
Qdrant Collection Restore Script
Restores collection data from JSON backup files
"""
import json
import os
from datetime import datetime
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct
from typing import List, Dict, Any
from qdrant_client import QdrantClient
from core.clients import get_qdrant
from core.config import QDRANT_COLLECTION
def _build_point_structs(points_data, current_vectors):
    """Convert raw backup dicts into PointStruct objects.

    Keeps only the named vectors that exist in the target collection's
    config; points left with no valid vectors are skipped, as are points
    that raise during conversion.

    Args:
        points_data: List of dicts loaded from points.json, each with
            "id", optional "payload", and a "vector" dict of named vectors.
        current_vectors: Mapping of vector names configured on the target
            collection (only its keys are consulted).

    Returns:
        tuple[list[PointStruct], int]: (points to upload, skipped count).
    """
    points_to_restore = []
    skipped_points = 0
    for point_data in points_data:
        try:
            point_id = point_data["id"]
            payload = point_data.get("payload", {})
            vectors = point_data.get("vector", {})
            # Keep only vectors the target collection actually defines.
            filtered_vectors = {}
            for vector_name, vector_data in vectors.items():
                if vector_name in current_vectors:
                    filtered_vectors[vector_name] = vector_data
                else:
                    print(f" Skipping vector '{vector_name}' for point {point_id} (not in target collection)")
            if filtered_vectors:
                # For named vectors, always pass as dict, never as single vector.
                points_to_restore.append(PointStruct(
                    id=point_id,
                    payload=payload,
                    vector=filtered_vectors
                ))
            else:
                skipped_points += 1
                print(f" Skipping point {point_id} (no valid vectors)")
        except Exception as e:
            print(f" Error processing point {point_data.get('id', 'unknown')}: {e}")
            skipped_points += 1
    return points_to_restore, skipped_points


def _upload_in_batches(client, collection_name, points, batch_size):
    """Upsert *points* into *collection_name* in batches, printing progress.

    Returns:
        int | None: Total points uploaded, or None if any batch failed
        (the failure aborts the remaining batches).
    """
    total_uploaded = 0
    for i in range(0, len(points), batch_size):
        batch = points[i:i + batch_size]
        try:
            client.upsert(
                collection_name=collection_name,
                points=batch,
                wait=True  # Wait for completion so failures surface here
            )
        except Exception as e:
            print(f"❌ Error uploading batch {i//batch_size + 1}: {e}")
            return None
        total_uploaded += len(batch)
        print(f" Uploaded batch {i//batch_size + 1}: {len(batch)} points (Total: {total_uploaded}/{len(points)})")
    return total_uploaded


def restore_collection(client, collection_name, backup_dir, batch_size=100):
    """
    Restore a Qdrant collection from JSON backup files.

    Reads points.json and collection_info.json from *backup_dir*, filters
    each point's named vectors down to those configured on the (already
    existing) target collection, and upserts the points in batches.

    Args:
        client: QdrantClient instance.
        collection_name: Name of collection to restore to (must exist).
        backup_dir: Directory containing backup files.
        batch_size: Number of points to upload per batch.

    Returns:
        bool: True on success; False on missing files, a missing target
        collection, or any failed batch upload.
    """
    print(f"Starting restore of collection '{collection_name}' from {backup_dir}...")
    # Check if backup files exist before touching the server.
    points_file = os.path.join(backup_dir, "points.json")
    collection_info_file = os.path.join(backup_dir, "collection_info.json")
    if not os.path.exists(points_file):
        print(f"❌ Points file not found: {points_file}")
        return False
    if not os.path.exists(collection_info_file):
        print(f"❌ Collection info file not found: {collection_info_file}")
        return False
    try:
        # 1. Load backup metadata.
        with open(collection_info_file, "r") as f:
            collection_info = json.load(f)
        print("βœ“ Loaded collection metadata")
        print(f" Original vectors: {list(collection_info.get('vectors_config', {}).keys())}")
        # 2. Load points data.
        with open(points_file, "r") as f:
            points_data = json.load(f)
        print(f"βœ“ Loaded {len(points_data)} points from backup")
        # 3. Verify collection exists and get current config.
        # NOTE(review): assumes the target uses *named* vectors (a dict);
        # an unnamed single-vector collection would break .keys() — confirm.
        try:
            current_collection = client.get_collection(collection_name)
            current_vectors = current_collection.config.params.vectors
            print(f"βœ“ Target collection exists with vectors: {list(current_vectors.keys())}")
        except Exception as e:
            print(f"❌ Target collection '{collection_name}' not found or accessible: {e}")
            return False
        # 4. Convert backup points to PointStruct objects.
        points_to_restore, skipped_points = _build_point_structs(points_data, current_vectors)
        print(f"βœ“ Prepared {len(points_to_restore)} points for restore")
        if skipped_points > 0:
            print(f" Skipped {skipped_points} points")
        # 5. Upload points in batches; abort on the first failure.
        total_uploaded = _upload_in_batches(client, collection_name, points_to_restore, batch_size)
        if total_uploaded is None:
            return False
        # 6. Verify restoration by re-reading the collection state.
        collection_info = client.get_collection(collection_name)
        point_count = client.count(collection_name=collection_name).count
        print("βœ“ Restore completed successfully!")
        print(f" Points in collection: {point_count}")
        print(f" Points uploaded: {total_uploaded}")
        print(f" Collection status: {collection_info.status}")
        return True
    except Exception as e:
        print(f"❌ Restore failed: {str(e)}")
        return False
def find_latest_backup(backup_root_dir, collection_name):
    """
    Find the most recent backup directory for a collection.

    Scans *backup_root_dir* for subdirectories named
    ``{collection_name}_{timestamp}`` and picks the one with the newest
    modification time.

    Args:
        backup_root_dir: Root directory containing backups
        collection_name: Name of the collection
    Returns:
        Path to the latest backup directory or None
    """
    if not os.path.exists(backup_root_dir):
        return None
    prefix = f"{collection_name}_"
    # Collect only directories matching the {collection_name}_{timestamp} pattern.
    candidates = [
        os.path.join(backup_root_dir, entry)
        for entry in os.listdir(backup_root_dir)
        if entry.startswith(prefix) and os.path.isdir(os.path.join(backup_root_dir, entry))
    ]
    if not candidates:
        return None
    # Most recently modified candidate wins.
    return max(candidates, key=os.path.getmtime)
def main():
    """Interactive entry point: locate a backup and restore it into a
    ``{collection}_v2`` collection after a y/N confirmation prompt.

    Reads configuration from module constants / core.config; prints
    progress and errors rather than raising.
    """
    # Configuration
    COLLECTION_NAME = QDRANT_COLLECTION  # Replace with your collection name
    BACKUP_ROOT_DIR = "./backups"  # Directory containing backup folders
    # You can specify exact backup directory or let script find latest
    SPECIFIC_BACKUP_DIR = None  # e.g., "./backups/my_collection_20250610_143022"
    # Initialize client
    client = get_qdrant()
    # Find backup directory: explicit override wins, else newest on disk.
    if SPECIFIC_BACKUP_DIR:
        backup_dir = SPECIFIC_BACKUP_DIR
    else:
        backup_dir = find_latest_backup(BACKUP_ROOT_DIR, COLLECTION_NAME)
    if not backup_dir or not os.path.exists(backup_dir):
        print(f"❌ No backup found for collection '{COLLECTION_NAME}'")
        print(f" Looked in: {BACKUP_ROOT_DIR}")
        return
    # Restore into a sibling "_v2" collection, never the original.
    RESTORE_TO_DB = COLLECTION_NAME + '_v2'
    print(f"Using backup: {backup_dir}")
    # Fixed typo: "Resotring" -> "Restoring"
    print(f'Restoring to: {RESTORE_TO_DB}')
    # Confirm before proceeding
    response = input(f"Restore collection '{RESTORE_TO_DB}' from backup? (y/N): ")
    if response.lower() not in ['y', 'yes']:
        print("Restore cancelled.")
        return
    # Perform restore
    success = restore_collection(client, RESTORE_TO_DB, backup_dir)
    if success:
        print("\nπŸŽ‰ Restore completed successfully!")
    else:
        print("\n❌ Restore failed!")
# Run the interactive restore only when executed as a script (not on import).
if __name__ == "__main__":
    main()