"""
Data backup and persistence utilities for the Knowledge Assistant RAG application.

Provides automated backup, restore, and data persistence verification capabilities.
"""

import os
import shutil
import json
import sqlite3
import asyncio
import logging
import zipfile
import hashlib
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple
from pathlib import Path
from dataclasses import dataclass, asdict

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text, select

from .database import User, DocumentMetadata, get_async_session, engine
from .vector_store import get_qdrant_client

logger = logging.getLogger(__name__)


@dataclass
class BackupMetadata:
    """Backup metadata information"""
    backup_id: str
    timestamp: datetime
    backup_type: str
    file_path: str
    file_size_bytes: int
    checksum: str
    database_records: int
    vector_collections: List[str]
    status: str
    error_message: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization"""
        result = asdict(self)
        result['timestamp'] = self.timestamp.isoformat()
        return result
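
    # Illustrative to_dict() output for a completed full backup (the concrete
    # values below are placeholders, not taken from a real run):
    # {
    #     "backup_id": "backup_20240101_120000",
    #     "timestamp": "2024-01-01T12:00:00",
    #     "backup_type": "full",
    #     "file_path": "backups/backup_20240101_120000.zip",
    #     "file_size_bytes": 1048576,
    #     "checksum": "<sha-256 hex digest>",
    #     "database_records": 42,
    #     "vector_collections": ["documents"],
    #     "status": "completed",
    #     "error_message": None,
    # }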


class BackupManager:
    """Comprehensive backup and restore manager"""

    def __init__(self, backup_dir: str = "backups"):
        self.backup_dir = Path(backup_dir)
        self.backup_dir.mkdir(exist_ok=True)
        self.database_path = "knowledge_assistant.db"
        self.max_backups = 10

    def _generate_backup_id(self) -> str:
        """Generate unique backup ID"""
        timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
        return f"backup_{timestamp}"

    def _calculate_file_checksum(self, file_path: str) -> str:
        """Calculate SHA-256 checksum of a file"""
        hash_sha256 = hashlib.sha256()
        try:
            with open(file_path, "rb") as f:
                for chunk in iter(lambda: f.read(4096), b""):
                    hash_sha256.update(chunk)
            return hash_sha256.hexdigest()
        except Exception as e:
            logger.error(f"Failed to calculate checksum for {file_path}: {str(e)}")
            return ""

    async def backup_database(self, backup_id: str, backup_dir: Path) -> Tuple[str, int]:
        """Back up the SQLite database and return the backup path and record count."""
        try:
            db_backup_path = backup_dir / "database.db"

            if os.path.exists(self.database_path):
                source_conn = sqlite3.connect(self.database_path)
                backup_conn = sqlite3.connect(str(db_backup_path))
                source_conn.backup(backup_conn)
                backup_conn.close()
                source_conn.close()

                conn = sqlite3.connect(str(db_backup_path))
                cursor = conn.cursor()

                cursor.execute("SELECT COUNT(*) FROM users")
                user_count = cursor.fetchone()[0]

                cursor.execute("SELECT COUNT(*) FROM documents")
                doc_count = cursor.fetchone()[0]

                conn.close()

                total_records = user_count + doc_count
                logger.info(f"Database backup completed: {total_records} records")

                return str(db_backup_path), total_records
            else:
                logger.warning(f"Database file not found: {self.database_path}")
                return "", 0

        except Exception as e:
            logger.error(f"Database backup failed: {str(e)}")
            raise

    async def backup_qdrant_collections(self, backup_id: str, backup_dir: Path) -> List[str]:
        """Back up Qdrant vector collections and return the names that were backed up."""
        try:
            client = get_qdrant_client()
            collections_info = client.get_collections()
            backed_up_collections = []

            vectors_dir = backup_dir / "vectors"
            vectors_dir.mkdir(exist_ok=True)

            for collection in collections_info.collections:
                collection_name = collection.name

                try:
                    collection_info = client.get_collection(collection_name)

                    points, _ = client.scroll(
                        collection_name=collection_name,
                        limit=10000,
                        with_payload=True,
                        with_vectors=True
                    )

                    collection_backup = {
                        "collection_info": {
                            "name": collection_name,
                            "config": collection_info.config.dict() if hasattr(collection_info, 'config') else {},
                            "points_count": len(points)
                        },
                        "points": []
                    }

                    for point in points:
                        point_data = {
                            "id": str(point.id),
                            "vector": point.vector.tolist() if hasattr(point.vector, 'tolist') else point.vector,
                            "payload": point.payload
                        }
                        collection_backup["points"].append(point_data)

                    collection_file = vectors_dir / f"{collection_name}.json"
                    with open(collection_file, 'w') as f:
                        json.dump(collection_backup, f, indent=2)

                    backed_up_collections.append(collection_name)
                    logger.info(f"Backed up collection '{collection_name}' with {len(points)} points")

                except Exception as e:
                    logger.error(f"Failed to backup collection '{collection_name}': {str(e)}")
                    continue

            return backed_up_collections

        except Exception as e:
            logger.error(f"Qdrant backup failed: {str(e)}")
            return []
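
    # On disk, each collection is written to vectors/<collection_name>.json in
    # the backup working directory, with the shape built above:
    #   {"collection_info": {"name": ..., "config": ..., "points_count": ...},
    #    "points": [{"id": ..., "vector": ..., "payload": ...}, ...]}
    # Note that client.scroll() is called once with limit=10000, so collections
    # with more points than that are only partially captured here.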

    async def create_full_backup(self) -> BackupMetadata:
        """Create a full backup of database and vector collections"""
        backup_id = self._generate_backup_id()
        backup_start = datetime.utcnow()

        logger.info(f"Starting full backup: {backup_id}")

        try:
            backup_dir = self.backup_dir / backup_id
            backup_dir.mkdir(exist_ok=True)

            db_path, db_records = await self.backup_database(backup_id, backup_dir)

            vector_collections = await self.backup_qdrant_collections(backup_id, backup_dir)

            metadata = {
                "backup_id": backup_id,
                "timestamp": backup_start.isoformat(),
                "backup_type": "full",
                "database_records": db_records,
                "vector_collections": vector_collections,
                "status": "completed"
            }

            metadata_file = backup_dir / "metadata.json"
            with open(metadata_file, 'w') as f:
                json.dump(metadata, f, indent=2)

            archive_path = self.backup_dir / f"{backup_id}.zip"
            with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                for file_path in backup_dir.rglob('*'):
                    if file_path.is_file():
                        arcname = file_path.relative_to(backup_dir)
                        zipf.write(file_path, arcname)

            shutil.rmtree(backup_dir)

            file_size = archive_path.stat().st_size
            checksum = self._calculate_file_checksum(str(archive_path))

            backup_metadata = BackupMetadata(
                backup_id=backup_id,
                timestamp=backup_start,
                backup_type="full",
                file_path=str(archive_path),
                file_size_bytes=file_size,
                checksum=checksum,
                database_records=db_records,
                vector_collections=vector_collections,
                status="completed"
            )

            await self._update_backup_registry(backup_metadata)

            logger.info(f"Full backup completed: {backup_id} ({file_size} bytes)")
            return backup_metadata

        except Exception as e:
            logger.error(f"Full backup failed: {str(e)}")

            backup_dir = self.backup_dir / backup_id
            if backup_dir.exists():
                shutil.rmtree(backup_dir)

            archive_path = self.backup_dir / f"{backup_id}.zip"
            if archive_path.exists():
                archive_path.unlink()

            return BackupMetadata(
                backup_id=backup_id,
                timestamp=backup_start,
                backup_type="full",
                file_path="",
                file_size_bytes=0,
                checksum="",
                database_records=0,
                vector_collections=[],
                status="failed",
                error_message=str(e)
            )
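
    # Resulting archive layout (one zip per backup under self.backup_dir):
    #   <backup_id>.zip
    #     database.db      - SQLite snapshot produced by backup_database()
    #     metadata.json    - summary dict written by create_full_backup()
    #     vectors/*.json   - one file per Qdrant collection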

    async def restore_from_backup(self, backup_id: str) -> bool:
        """Restore data from a backup"""
        try:
            logger.info(f"Starting restore from backup: {backup_id}")

            archive_path = self.backup_dir / f"{backup_id}.zip"
            if not archive_path.exists():
                raise FileNotFoundError(f"Backup file not found: {archive_path}")

            restore_dir = self.backup_dir / f"restore_{backup_id}"
            restore_dir.mkdir(exist_ok=True)

            try:
                with zipfile.ZipFile(archive_path, 'r') as zipf:
                    zipf.extractall(restore_dir)

                metadata_file = restore_dir / "metadata.json"
                if metadata_file.exists():
                    with open(metadata_file, 'r') as f:
                        metadata = json.load(f)
                    logger.info(f"Restoring backup from {metadata['timestamp']}")

                db_backup_path = restore_dir / "database.db"
                if db_backup_path.exists():
                    if os.path.exists(self.database_path):
                        backup_current = f"{self.database_path}.backup_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"
                        shutil.copy2(self.database_path, backup_current)
                        logger.info(f"Current database backed up to: {backup_current}")

                    shutil.copy2(db_backup_path, self.database_path)
                    logger.info("Database restored successfully")

                vectors_dir = restore_dir / "vectors"
                if vectors_dir.exists():
                    await self._restore_qdrant_collections(vectors_dir)

                logger.info(f"Restore completed successfully: {backup_id}")
                return True

            finally:
                if restore_dir.exists():
                    shutil.rmtree(restore_dir)

        except Exception as e:
            logger.error(f"Restore failed: {str(e)}")
            return False

    async def _restore_qdrant_collections(self, vectors_dir: Path):
        """Restore Qdrant collections from backup"""
        try:
            client = get_qdrant_client()

            for collection_file in vectors_dir.glob("*.json"):
                collection_name = collection_file.stem

                try:
                    with open(collection_file, 'r') as f:
                        collection_backup = json.load(f)

                    collection_info = collection_backup["collection_info"]
                    points_data = collection_backup["points"]

                    try:
                        client.delete_collection(collection_name)
                    except Exception:
                        pass

                    from qdrant_client.models import Distance, VectorParams, PointStruct

                    vector_size = len(points_data[0]["vector"]) if points_data else 384

                    client.create_collection(
                        collection_name=collection_name,
                        vectors_config=VectorParams(size=vector_size, distance=Distance.COSINE)
                    )

                    batch_size = 100
                    for i in range(0, len(points_data), batch_size):
                        batch = points_data[i:i + batch_size]

                        points = []
                        for point_data in batch:
                            point = PointStruct(
                                id=point_data["id"],
                                vector=point_data["vector"],
                                payload=point_data["payload"]
                            )
                            points.append(point)

                        client.upsert(collection_name=collection_name, points=points)

                    logger.info(f"Restored collection '{collection_name}' with {len(points_data)} points")

                except Exception as e:
                    logger.error(f"Failed to restore collection '{collection_name}': {str(e)}")
                    continue

        except Exception as e:
            logger.error(f"Vector collections restore failed: {str(e)}")

    async def _update_backup_registry(self, backup_metadata: BackupMetadata):
        """Update backup registry with new backup information"""
        registry_file = self.backup_dir / "backup_registry.json"

        try:
            if registry_file.exists():
                with open(registry_file, 'r') as f:
                    registry = json.load(f)
            else:
                registry = {"backups": []}

            registry["backups"].append(backup_metadata.to_dict())

            registry["backups"].sort(key=lambda x: x["timestamp"], reverse=True)

            registry["backups"] = registry["backups"][:self.max_backups]

            with open(registry_file, 'w') as f:
                json.dump(registry, f, indent=2)

        except Exception as e:
            logger.error(f"Failed to update backup registry: {str(e)}")

    async def list_backups(self) -> List[BackupMetadata]:
        """List all available backups"""
        registry_file = self.backup_dir / "backup_registry.json"

        try:
            if registry_file.exists():
                with open(registry_file, 'r') as f:
                    registry = json.load(f)

                backups = []
                for backup_data in registry.get("backups", []):
                    backup_metadata = BackupMetadata(
                        backup_id=backup_data["backup_id"],
                        timestamp=datetime.fromisoformat(backup_data["timestamp"]),
                        backup_type=backup_data["backup_type"],
                        file_path=backup_data["file_path"],
                        file_size_bytes=backup_data["file_size_bytes"],
                        checksum=backup_data["checksum"],
                        database_records=backup_data["database_records"],
                        vector_collections=backup_data["vector_collections"],
                        status=backup_data["status"],
                        error_message=backup_data.get("error_message")
                    )
                    backups.append(backup_metadata)

                return backups
            else:
                return []

        except Exception as e:
            logger.error(f"Failed to list backups: {str(e)}")
            return []

    async def cleanup_old_backups(self, keep_count: Optional[int] = None):
        """Clean up old backup files"""
        if keep_count is None:
            keep_count = self.max_backups

        try:
            backups = await self.list_backups()

            backups.sort(key=lambda x: x.timestamp, reverse=True)

            for backup in backups[keep_count:]:
                try:
                    if os.path.exists(backup.file_path):
                        os.remove(backup.file_path)
                        logger.info(f"Removed old backup: {backup.backup_id}")
                except Exception as e:
                    logger.error(f"Failed to remove backup {backup.backup_id}: {str(e)}")

            kept_backups = backups[:keep_count]
            registry = {"backups": [backup.to_dict() for backup in kept_backups]}

            registry_file = self.backup_dir / "backup_registry.json"
            with open(registry_file, 'w') as f:
                json.dump(registry, f, indent=2)

        except Exception as e:
            logger.error(f"Backup cleanup failed: {str(e)}")

    async def verify_backup_integrity(self, backup_id: str) -> bool:
        """Verify backup file integrity"""
        try:
            backups = await self.list_backups()
            backup = next((b for b in backups if b.backup_id == backup_id), None)

            if not backup:
                logger.error(f"Backup not found: {backup_id}")
                return False

            if not os.path.exists(backup.file_path):
                logger.error(f"Backup file not found: {backup.file_path}")
                return False

            actual_size = os.path.getsize(backup.file_path)
            if actual_size != backup.file_size_bytes:
                logger.error(f"Backup file size mismatch: expected {backup.file_size_bytes}, got {actual_size}")
                return False

            actual_checksum = self._calculate_file_checksum(backup.file_path)
            if actual_checksum != backup.checksum:
                logger.error(f"Backup checksum mismatch: expected {backup.checksum}, got {actual_checksum}")
                return False

            logger.info(f"Backup integrity verified: {backup_id}")
            return True

        except Exception as e:
            logger.error(f"Backup verification failed: {str(e)}")
            return False


backup_manager = BackupManager()


async def create_backup() -> BackupMetadata:
    """Create a full backup - main entry point"""
    return await backup_manager.create_full_backup()


async def restore_backup(backup_id: str) -> bool:
    """Restore from backup - main entry point"""
    return await backup_manager.restore_from_backup(backup_id)


async def list_available_backups() -> List[BackupMetadata]:
    """List available backups - main entry point"""
    return await backup_manager.list_backups()


async def verify_backup(backup_id: str) -> bool:
    """Verify backup integrity - main entry point"""
    return await backup_manager.verify_backup_integrity(backup_id)
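

# Minimal usage sketch: how the entry points above might be exercised from a
# one-off script or a scheduled job. The surrounding application normally
# drives these from its own hooks, so this block is illustrative only.
if __name__ == "__main__":
    async def _demo() -> None:
        # Create a full backup, then verify the archive it produced.
        metadata = await create_backup()
        print(f"Backup {metadata.backup_id}: {metadata.status}")
        if metadata.status == "completed":
            ok = await verify_backup(metadata.backup_id)
            print(f"Integrity check passed: {ok}")

        # Show everything currently recorded in the backup registry.
        for backup in await list_available_backups():
            print(backup.backup_id, backup.timestamp.isoformat(), backup.status)

    asyncio.run(_demo())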