Spaces:
Paused
Paused
| import os | |
| import shutil | |
| import json | |
| import tarfile | |
| from datetime import datetime | |
| from typing import Dict, Any, List | |
| from bson import ObjectId | |
| from ..db.database import db | |
| from ..utils.logger import logger | |
class BackupService:
    """Create, restore, list, and delete full-system backups.

    A backup is a gzipped tar archive containing a JSON dump of every
    MongoDB collection (``database.json``) plus, optionally, a copy of
    the ``uploads`` directory. Backup metadata is tracked in the
    ``backup_history`` collection with string ``_id`` keys.
    """

    def __init__(self):
        # Directory where finished archives are written (relative to CWD).
        self.backup_dir = "backups"
        self._ensure_backup_dir()

    def _ensure_backup_dir(self):
        """Ensure backup directory exists."""
        # exist_ok avoids the check-then-create race of the exists() guard.
        os.makedirs(self.backup_dir, exist_ok=True)

    @staticmethod
    def _safe_extract(tar: tarfile.TarFile, dest: str) -> None:
        """Extract *tar* into *dest*, rejecting path-traversal members.

        Plain ``extractall`` will happily write entries such as
        ``../../etc/passwd`` outside *dest*; every member path is
        resolved and checked before anything is extracted.

        Raises:
            ValueError: If any archive member would land outside *dest*.
        """
        dest_root = os.path.realpath(dest)
        for member in tar.getmembers():
            target = os.path.realpath(os.path.join(dest, member.name))
            if target != dest_root and not target.startswith(dest_root + os.sep):
                raise ValueError(f"Unsafe path in backup archive: {member.name}")
        tar.extractall(dest)

    async def create_backup(self, include_files: bool = True) -> Dict[str, Any]:
        """Create a new system backup.

        Args:
            include_files: When True, also copy the ``uploads`` directory
                into the archive.

        Returns:
            Dict with the backup ``id``, archive ``path``, ``size`` in
            bytes, and ``created_at`` timestamp.

        Raises:
            Re-raises any database or filesystem error after logging it.
        """
        backup_path = None  # defined up-front so cleanup is always safe
        try:
            timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
            backup_id = str(ObjectId())
            backup_name = f"backup_{timestamp}_{backup_id}"
            backup_path = os.path.join(self.backup_dir, backup_name)
            os.makedirs(backup_path, exist_ok=True)

            # Dump every collection; _id values are stringified so the
            # dump is JSON-serializable (original id type is lost here).
            db_backup = {}
            for collection in await db.db.list_collection_names():
                docs = await db.db[collection].find().to_list(None)
                db_backup[collection] = [
                    {**doc, "_id": str(doc["_id"])} for doc in docs
                ]

            # default=str stringifies datetimes and other non-JSON types.
            with open(os.path.join(backup_path, "database.json"), "w") as f:
                json.dump(db_backup, f, default=str)

            if include_files:
                uploads_dir = "uploads"
                if os.path.exists(uploads_dir):
                    shutil.copytree(
                        uploads_dir,
                        os.path.join(backup_path, "uploads"),
                        dirs_exist_ok=True,
                    )

            # Pack the staging directory into a single .tar.gz archive.
            archive_path = f"{backup_path}.tar.gz"
            with tarfile.open(archive_path, "w:gz") as tar:
                tar.add(backup_path, arcname=os.path.basename(backup_path))

            backup_info = {
                "_id": backup_id,
                "filename": f"{backup_name}.tar.gz",
                "path": archive_path,
                "created_at": datetime.utcnow(),
                "size": os.path.getsize(archive_path),
                "includes_files": include_files,
            }
            await db.db["backup_history"].insert_one(backup_info)

            return {
                "id": backup_id,
                "path": archive_path,
                "size": backup_info["size"],
                "created_at": backup_info["created_at"],
            }
        except Exception as e:
            logger.error(f"Backup creation failed: {str(e)}")
            raise
        finally:
            # Remove the staging directory even on failure; previously it
            # was only cleaned up on the success path and leaked otherwise.
            if backup_path and os.path.isdir(backup_path):
                shutil.rmtree(backup_path, ignore_errors=True)

    async def restore_backup(self, backup_path: str) -> Dict[str, Any]:
        """Restore system from a backup.

        WARNING: destructive — every existing collection is emptied
        before the dump is re-inserted, and ``uploads`` is replaced.

        Args:
            backup_path: Path to a ``.tar.gz`` archive produced by
                :meth:`create_backup`.

        Returns:
            Dict with ``success``, ``collections_restored`` count, and
            whether ``files_restored``.

        Raises:
            FileNotFoundError: If *backup_path* does not exist.
            ValueError: If the archive contains unsafe member paths.
        """
        # Must be bound before the try block: the original assigned it
        # inside try, so an early failure (e.g. missing archive) raised
        # NameError from the finally clause, masking the real error.
        restore_dir = None
        try:
            if not os.path.exists(backup_path):
                raise FileNotFoundError("Backup file not found")

            restore_dir = f"restore_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"
            os.makedirs(restore_dir, exist_ok=True)

            # Validate members before extraction (path-traversal guard).
            with tarfile.open(backup_path, "r:gz") as tar:
                self._safe_extract(tar, restore_dir)

            # The archive contains a single top-level backup directory.
            backup_contents = os.listdir(restore_dir)[0]
            backup_root = os.path.join(restore_dir, backup_contents)

            with open(os.path.join(backup_root, "database.json"), "r") as f:
                db_backup = json.load(f)

            # Clear existing collections before re-inserting the dump.
            for collection in await db.db.list_collection_names():
                await db.db[collection].delete_many({})

            for collection, docs in db_backup.items():
                if docs:
                    # Convert string ids back to ObjectId where possible;
                    # ids that were never ObjectIds are kept as strings
                    # instead of crashing the whole restore.
                    # NOTE(review): backup_history ids are created as
                    # strings but round-trip here as ObjectIds — confirm
                    # post-restore delete_backup lookups still match.
                    for doc in docs:
                        if ObjectId.is_valid(doc["_id"]):
                            doc["_id"] = ObjectId(doc["_id"])
                    await db.db[collection].insert_many(docs)

            # Capture this BEFORE cleanup: the original checked
            # os.path.exists(uploads_source) after deleting restore_dir,
            # so files_restored was always False.
            uploads_source = os.path.join(backup_root, "uploads")
            files_restored = os.path.exists(uploads_source)
            if files_restored:
                if os.path.exists("uploads"):
                    shutil.rmtree("uploads")
                shutil.copytree(uploads_source, "uploads")

            return {
                "success": True,
                "collections_restored": len(db_backup),
                "files_restored": files_restored,
            }
        except Exception as e:
            logger.error(f"Backup restoration failed: {str(e)}")
            raise
        finally:
            # Single cleanup point for the temporary extraction directory.
            if restore_dir and os.path.exists(restore_dir):
                shutil.rmtree(restore_dir, ignore_errors=True)

    async def list_backups(self) -> List[Dict[str, Any]]:
        """List all available backups, newest first.

        Returns:
            List of dicts with id, filename, created_at, size, and
            includes_files for each recorded backup.
        """
        try:
            backups = await db.db["backup_history"].find().sort("created_at", -1).to_list(None)
            return [
                {
                    "id": str(backup["_id"]),
                    "filename": backup["filename"],
                    "created_at": backup["created_at"],
                    "size": backup["size"],
                    "includes_files": backup["includes_files"],
                }
                for backup in backups
            ]
        except Exception as e:
            logger.error(f"Failed to list backups: {str(e)}")
            raise

    async def delete_backup(self, backup_id: str) -> bool:
        """Delete a backup archive and its history record.

        Args:
            backup_id: String id as stored in ``backup_history``.

        Returns:
            True if the backup existed and was deleted, False otherwise.
        """
        try:
            backup = await db.db["backup_history"].find_one({"_id": backup_id})
            if not backup:
                return False
            # Delete the physical archive first, then the record.
            if os.path.exists(backup["path"]):
                os.remove(backup["path"])
            await db.db["backup_history"].delete_one({"_id": backup_id})
            return True
        except Exception as e:
            logger.error(f"Failed to delete backup: {str(e)}")
            raise
| backup = BackupService() |