"""Backup service: create, restore, list, and delete system backups."""

import json
import os
import shutil
import tarfile
import tempfile
from datetime import datetime
from typing import Any, Dict, List

from bson import ObjectId

from ..db.database import db
from ..utils.logger import logger


class BackupService:
    """Create, restore, list, and delete ``.tar.gz`` system backups.

    A backup archive contains a single top-level directory holding
    ``database.json`` (a dump of every collection, keyed by collection name,
    with each ``_id`` stringified) and, optionally, an ``uploads`` directory
    copied from the uploads directory on disk.  Each created backup is also
    recorded in the ``backup_history`` collection.

    NOTE(review): ``db.db`` is assumed to be an async (awaitable) MongoDB
    database handle -- confirm against ``..db.database``.
    """

    # Collection in which created backups are recorded.  Its documents are
    # written with *string* ids by ``create_backup`` and queried with string
    # ids by ``delete_backup``.
    HISTORY_COLLECTION = "backup_history"
    # Name of the database dump file inside a backup.
    DATABASE_DUMP_NAME = "database.json"
    # Name of the uploads directory inside a backup.
    ARCHIVED_UPLOADS_NAME = "uploads"

    def __init__(
        self, backup_dir: str = "backups", uploads_dir: str = "uploads"
    ) -> None:
        """Remember the working directories and make sure *backup_dir* exists.

        Args:
            backup_dir: Directory in which backup archives are written.
            uploads_dir: Directory whose content is included in / restored
                from backups.
        """
        self.backup_dir = backup_dir
        self.uploads_dir = uploads_dir
        self._ensure_backup_dir()

    def _ensure_backup_dir(self) -> None:
        """Ensure backup directory exists."""
        # exist_ok avoids the check-then-create race of a separate exists().
        os.makedirs(self.backup_dir, exist_ok=True)

    @staticmethod
    def _safe_extract(archive: tarfile.TarFile, destination: str) -> None:
        """Extract *archive* into *destination*, refusing unsafe members.

        Raises:
            tarfile.TarError: If the ``data`` extraction filter rejects a
                member (interpreters that provide extraction filters).
            ValueError: If a member is a link/special file or would be
                written outside *destination* (interpreters without filters).
        """
        if hasattr(tarfile, "data_filter"):
            # Documented feature check for extraction-filter support.
            archive.extractall(destination, filter="data")
            return

        root = os.path.realpath(destination)
        for member in archive.getmembers():
            if not (member.isfile() or member.isdir()):
                raise ValueError(f"Unsafe entry in backup archive: {member.name}")
            target = os.path.realpath(os.path.join(root, member.name))
            if os.path.commonpath([root, target]) != root:
                raise ValueError(f"Unsafe path in backup archive: {member.name}")
        archive.extractall(destination)

    def _revive_object_ids(self, db_backup: Dict[str, Any]) -> None:
        """Turn stringified ``_id`` values back into ``ObjectId`` in place.

        Only strings that are valid ObjectIds are converted; anything else is
        left untouched instead of raising.  Documents of the history
        collection keep their string ids, because that is how this service
        writes and queries them.
        """
        for collection, docs in db_backup.items():
            if collection == self.HISTORY_COLLECTION:
                continue
            for doc in docs or []:
                raw_id = doc.get("_id")
                if isinstance(raw_id, str) and ObjectId.is_valid(raw_id):
                    doc["_id"] = ObjectId(raw_id)

    async def create_backup(self, include_files: bool = True) -> Dict[str, Any]:
        """Create a new system backup.

        Dumps every collection to ``database.json``, optionally copies the
        uploads directory, packs both into
        ``<backup_dir>/backup_<timestamp>_<id>.tar.gz`` and records the backup
        in the history collection.

        Args:
            include_files: Also include the uploads directory (when it
                exists).

        Returns:
            A dict with the keys ``id``, ``path``, ``size`` and
            ``created_at``.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        created_at = datetime.utcnow()
        backup_id = str(ObjectId())
        backup_name = f"backup_{created_at.strftime('%Y%m%d_%H%M%S')}_{backup_id}"
        staging_dir = os.path.join(self.backup_dir, backup_name)
        archive_path = f"{staging_dir}.tar.gz"
        archive_complete = False

        try:
            os.makedirs(staging_dir, exist_ok=True)

            # Backup database collections
            db_backup = {}
            for collection in await db.db.list_collection_names():
                docs = await db.db[collection].find().to_list(None)
                db_backup[collection] = [
                    {**doc, "_id": str(doc["_id"])} for doc in docs
                ]

            dump_path = os.path.join(staging_dir, self.DATABASE_DUMP_NAME)
            with open(dump_path, "w", encoding="utf-8") as dump_file:
                # default=str stringifies values json cannot encode natively.
                json.dump(db_backup, dump_file, default=str)

            # Backup files if requested
            if include_files and os.path.exists(self.uploads_dir):
                shutil.copytree(
                    self.uploads_dir,
                    os.path.join(staging_dir, self.ARCHIVED_UPLOADS_NAME),
                    dirs_exist_ok=True,
                )

            # Create archive
            with tarfile.open(archive_path, "w:gz") as tar:
                tar.add(staging_dir, arcname=backup_name)
            archive_complete = True

            # Record backup in database
            backup_info = {
                "_id": backup_id,
                "filename": f"{backup_name}.tar.gz",
                "path": archive_path,
                "created_at": created_at,
                "size": os.path.getsize(archive_path),
                "includes_files": include_files,
            }
            await db.db[self.HISTORY_COLLECTION].insert_one(backup_info)

            return {
                "id": backup_id,
                "path": archive_path,
                "size": backup_info["size"],
                "created_at": backup_info["created_at"],
            }
        except Exception as e:
            logger.error(f"Backup creation failed: {str(e)}")
            if not archive_complete:
                # Do not leave a truncated archive behind.  Best effort: the
                # original error is the one that must reach the caller.
                try:
                    os.remove(archive_path)
                except OSError:
                    pass
            raise
        finally:
            # The staging directory is temporary on success and failure alike.
            shutil.rmtree(staging_dir, ignore_errors=True)

    async def restore_backup(self, backup_path: str) -> Dict[str, Any]:
        """Restore system from a backup.

        The archive is extracted and its database dump fully loaded and
        prepared *before* any existing data is deleted, so an unreadable
        backup does not wipe the database.  Every existing collection is then
        emptied, the dumped documents are inserted, and the uploads directory
        is replaced when the backup contains one.

        Args:
            backup_path: Path of the ``.tar.gz`` archive to restore.

        Returns:
            A dict with ``success``, ``collections_restored`` (number of
            collections in the dump) and ``files_restored`` (whether the
            backup contained an uploads directory).

        Raises:
            FileNotFoundError: If *backup_path* or the dump inside the
                archive does not exist.
            ValueError: If the archive is empty, unsafe, or the dump is not a
                mapping of collections.
            Exception: Any other failure is logged and re-raised.
        """
        restore_dir = None
        try:
            if not os.path.exists(backup_path):
                raise FileNotFoundError("Backup file not found")

            # A unique directory: never reuse leftovers of an earlier restore.
            restore_dir = tempfile.mkdtemp(prefix="restore_", dir=os.curdir)

            with tarfile.open(backup_path, "r:gz") as tar:
                self._safe_extract(tar, restore_dir)

            entries = sorted(os.listdir(restore_dir))
            if not entries:
                raise ValueError("Backup archive is empty")
            backup_root = os.path.join(restore_dir, entries[0])

            dump_path = os.path.join(backup_root, self.DATABASE_DUMP_NAME)
            with open(dump_path, "r", encoding="utf-8") as dump_file:
                db_backup = json.load(dump_file)
            if not isinstance(db_backup, dict):
                raise ValueError("Backup database dump has an unexpected format")

            # Convert ids while the current data is still intact.
            self._revive_object_ids(db_backup)

            # Clear existing collections
            for collection in await db.db.list_collection_names():
                await db.db[collection].delete_many({})

            # Restore collections (insert_many rejects an empty list)
            for collection, docs in db_backup.items():
                if docs:
                    await db.db[collection].insert_many(docs)

            # Restore files if present.  The flag is captured here because the
            # extracted tree is gone by the time the result is built.
            uploads_source = os.path.join(backup_root, self.ARCHIVED_UPLOADS_NAME)
            files_restored = os.path.exists(uploads_source)
            if files_restored:
                if os.path.exists(self.uploads_dir):
                    shutil.rmtree(self.uploads_dir)
                shutil.copytree(uploads_source, self.uploads_dir)

            return {
                "success": True,
                "collections_restored": len(db_backup),
                "files_restored": files_restored,
            }
        except Exception as e:
            logger.error(f"Backup restoration failed: {str(e)}")
            raise
        finally:
            # restore_dir stays None when the failure happened before creation.
            if restore_dir is not None:
                shutil.rmtree(restore_dir, ignore_errors=True)

    async def list_backups(self) -> List[Dict[str, Any]]:
        """List all recorded backups, newest first.

        Returns:
            One dict per history record with the keys ``id``, ``filename``,
            ``created_at``, ``size`` and ``includes_files``.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            cursor = db.db[self.HISTORY_COLLECTION].find().sort("created_at", -1)
            records = await cursor.to_list(None)
            return [
                {
                    "id": str(record["_id"]),
                    "filename": record["filename"],
                    "created_at": record["created_at"],
                    "size": record["size"],
                    "includes_files": record["includes_files"],
                }
                for record in records
            ]
        except Exception as e:
            logger.error(f"Failed to list backups: {str(e)}")
            raise

    async def delete_backup(self, backup_id: str) -> bool:
        """Delete a backup archive and its history record.

        Args:
            backup_id: Id of the backup as returned by ``create_backup`` /
                ``list_backups``.

        Returns:
            ``True`` when the record existed and was removed, ``False`` when
            no such backup is recorded.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            # History ids are strings, but a restore performed by an older
            # version of this service may have stored them as ObjectId.
            candidate_ids: List[Any] = [backup_id]
            if ObjectId.is_valid(backup_id):
                candidate_ids.append(ObjectId(backup_id))

            history = db.db[self.HISTORY_COLLECTION]
            record = await history.find_one({"_id": {"$in": candidate_ids}})
            if not record:
                return False

            # Delete the physical backup file; it may already be gone.
            try:
                os.remove(record["path"])
            except FileNotFoundError:
                pass

            # Remove from database
            await history.delete_one({"_id": record["_id"]})
            return True
        except Exception as e:
            logger.error(f"Failed to delete backup: {str(e)}")
            raise


backup = BackupService()