Spaces:
Build error
Build error
| """ | |
| 回滚管理器 | |
| 负责修复前的自动备份、失败时的自动回滚、状态恢复和审计日志 | |
| """ | |
| import asyncio | |
| import json | |
| import sqlite3 | |
| import shutil | |
| import hashlib | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional, Any, Tuple | |
| from pathlib import Path | |
| from dataclasses import dataclass, field | |
| from enum import Enum | |
| import logging | |
| import threading | |
| import zipfile | |
| import tarfile | |
| from data_models import SpaceInfo, ErrorInfo, RepairStrategy, RepairHistory | |
| from config import get_config | |
class BackupType(Enum):
    """Categories of artifacts the backup subsystem can snapshot."""
    FILE = "file"  # single file, copied verbatim
    DIRECTORY = "directory"  # directory tree, archived as tar.gz
    GIT_STATE = "git_state"  # JSON snapshot of the working repo's git state
    DATABASE = "database"  # SQLite database file, copied verbatim
    CONFIGURATION = "configuration"  # config file, format-validated before copy
class RollbackStatus(Enum):
    """Lifecycle states of a rollback attempt."""
    PENDING = "pending"  # recorded in the DB, restore not yet finished
    IN_PROGRESS = "in_progress"  # NOTE(review): never set in this file — confirm intended use
    COMPLETED = "completed"  # restore reported success
    FAILED = "failed"  # restore reported failure or raised
    PARTIAL = "partial"  # NOTE(review): never set in this file — presumably for partial restores
@dataclass
class BackupInfo:
    """Catalogue record describing one stored backup.

    Bug fix: the original class had field annotations and
    ``field(default_factory=dict)`` but no ``@dataclass`` decorator, so
    ``BackupInfo(backup_id=...)`` (as called throughout BackupStrategy)
    raised TypeError and ``metadata`` was a raw ``Field`` object.
    Enum-typed fields use forward-reference annotations.
    """
    backup_id: str          # unique id, "<space>_<type>_<timestamp>"
    space_id: str           # owning space
    backup_type: "BackupType"
    original_path: str      # absolute path of the backed-up artifact
    backup_path: str        # absolute path of the stored copy/archive
    timestamp: datetime
    file_hash: Optional[str] = None   # sha256 hex digest, if computed
    size_bytes: Optional[int] = None
    description: str = ""
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class RollbackInfo:
    """Catalogue record describing one rollback attempt.

    Bug fix: the original class was missing the ``@dataclass`` decorator,
    so keyword construction (as done in get_rollback_history) raised
    TypeError and the list/dict defaults were raw ``Field`` objects.
    Enum-typed fields use forward-reference annotations.
    """
    rollback_id: str        # unique id, "rollback_<timestamp>"
    backup_id: str          # backup this rollback restored from
    space_id: str
    rollback_type: "BackupType"
    status: "RollbackStatus"
    timestamp: datetime     # when the rollback started
    completed_at: Optional[datetime] = None
    error_message: Optional[str] = None
    affected_files: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)
class BackupStrategy:
    """Creates, catalogues and lists backups.

    Backups are stored beneath ``backup_dir`` in one sub-directory per
    category; every backup is recorded in a SQLite catalogue
    (``rollback.db``) that also holds rollback records and the audit log.

    Bug fix: ``create_backup`` calls ``self._log_audit_event(...)`` (and
    StateRecovery calls ``backup_strategy._log_audit_event``), but the
    original file only defined that method on AuditLogger — every call
    raised AttributeError. The method is now defined here as well.
    """

    def __init__(self, backup_dir: str = "backups"):
        self.logger = logging.getLogger(__name__)
        self.backup_dir = Path(backup_dir)
        # parents=True so a nested backup_dir such as "var/backups" works too.
        self.backup_dir.mkdir(parents=True, exist_ok=True)
        # One sub-directory per backup category.
        (self.backup_dir / "files").mkdir(exist_ok=True)
        (self.backup_dir / "git_states").mkdir(exist_ok=True)
        (self.backup_dir / "databases").mkdir(exist_ok=True)
        (self.backup_dir / "configs").mkdir(exist_ok=True)
        # SQLite catalogue for backup / rollback / audit records.
        self.db_path = self.backup_dir / "rollback.db"
        self._init_database()

    def _init_database(self) -> None:
        """Create the catalogue tables if they do not exist yet."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            # Backup records.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS backups (
                    backup_id TEXT PRIMARY KEY,
                    space_id TEXT NOT NULL,
                    backup_type TEXT NOT NULL,
                    original_path TEXT NOT NULL,
                    backup_path TEXT NOT NULL,
                    timestamp TEXT NOT NULL,
                    file_hash TEXT,
                    size_bytes INTEGER,
                    description TEXT,
                    metadata TEXT
                )
            """)
            # Rollback records.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS rollbacks (
                    rollback_id TEXT PRIMARY KEY,
                    backup_id TEXT NOT NULL,
                    space_id TEXT NOT NULL,
                    rollback_type TEXT NOT NULL,
                    status TEXT NOT NULL,
                    timestamp TEXT NOT NULL,
                    completed_at TEXT,
                    error_message TEXT,
                    affected_files TEXT,
                    metadata TEXT,
                    FOREIGN KEY (backup_id) REFERENCES backups (backup_id)
                )
            """)
            # Audit trail.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS audit_log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    event_type TEXT NOT NULL,
                    space_id TEXT NOT NULL,
                    timestamp TEXT NOT NULL,
                    actor TEXT,
                    action TEXT NOT NULL,
                    details TEXT,
                    success BOOLEAN
                )
            """)
            conn.commit()

    async def create_backup(self, space_id: str, target_path: str,
                            backup_type: BackupType, description: str = "") -> str:
        """Create one backup of ``target_path`` and record it.

        Returns the new backup id; raises (after audit-logging the failure)
        if the backup could not be produced.
        """
        backup_id = self._generate_backup_id(space_id, backup_type)
        backup_info = None
        try:
            # Dispatch on backup category.
            if backup_type == BackupType.FILE:
                backup_info = await self._backup_file(space_id, target_path, backup_id, description)
            elif backup_type == BackupType.DIRECTORY:
                backup_info = await self._backup_directory(space_id, target_path, backup_id, description)
            elif backup_type == BackupType.GIT_STATE:
                # Git backups always snapshot the current working repo.
                backup_info = await self._backup_git_state(space_id, backup_id, description)
            elif backup_type == BackupType.DATABASE:
                backup_info = await self._backup_database(space_id, target_path, backup_id, description)
            elif backup_type == BackupType.CONFIGURATION:
                backup_info = await self._backup_configuration(space_id, target_path, backup_id, description)
            if backup_info:
                await self._save_backup_info(backup_info)
                await self._log_audit_event("backup_created", space_id, "创建备份", {
                    "backup_id": backup_id,
                    "type": backup_type.value,
                    "target": target_path
                }, True)
                self.logger.info(f"备份创建成功: {backup_id}")
                return backup_id
            else:
                raise Exception("备份创建失败")
        except Exception as e:
            self.logger.error(f"创建备份失败: {e}")
            await self._log_audit_event("backup_failed", space_id, "创建备份失败", {
                "target": target_path,
                "type": backup_type.value,
                "error": str(e)
            }, False)
            raise

    async def _backup_file(self, space_id: str, file_path: str, backup_id: str, description: str) -> BackupInfo:
        """Copy a single file into the per-space files directory."""
        source_path = Path(file_path)
        if not source_path.exists():
            raise FileNotFoundError(f"源文件不存在: {file_path}")
        # parents=True: the per-space directory may not exist yet.
        backup_subdir = self.backup_dir / "files" / space_id
        backup_subdir.mkdir(parents=True, exist_ok=True)
        backup_filename = f"{backup_id}_{source_path.name}"
        backup_path = backup_subdir / backup_filename
        # copy2 preserves file metadata (mtime, mode).
        shutil.copy2(source_path, backup_path)
        file_hash = await self._calculate_file_hash(source_path)
        size_bytes = source_path.stat().st_size
        return BackupInfo(
            backup_id=backup_id,
            space_id=space_id,
            backup_type=BackupType.FILE,
            original_path=str(source_path.absolute()),
            backup_path=str(backup_path.absolute()),
            timestamp=datetime.now(),
            file_hash=file_hash,
            size_bytes=size_bytes,
            description=description,
            metadata={"original_filename": source_path.name}
        )

    async def _backup_directory(self, space_id: str, dir_path: str, backup_id: str, description: str) -> BackupInfo:
        """Archive a directory tree as tar.gz; hash/size refer to the archive."""
        source_path = Path(dir_path)
        if not source_path.exists() or not source_path.is_dir():
            raise FileNotFoundError(f"源目录不存在或不是目录: {dir_path}")
        backup_subdir = self.backup_dir / "files" / space_id
        backup_subdir.mkdir(parents=True, exist_ok=True)
        backup_filename = f"{backup_id}_dir.tar.gz"
        backup_path = backup_subdir / backup_filename
        # arcname keeps only the directory's own name inside the archive.
        with tarfile.open(backup_path, "w:gz") as tar:
            tar.add(source_path, arcname=source_path.name)
        file_hash = await self._calculate_file_hash(backup_path)
        size_bytes = backup_path.stat().st_size
        return BackupInfo(
            backup_id=backup_id,
            space_id=space_id,
            backup_type=BackupType.DIRECTORY,
            original_path=str(source_path.absolute()),
            backup_path=str(backup_path.absolute()),
            timestamp=datetime.now(),
            file_hash=file_hash,
            size_bytes=size_bytes,
            description=description,
            metadata={"original_dirname": source_path.name}
        )

    async def _backup_git_state(self, space_id: str, backup_id: str, description: str) -> BackupInfo:
        """Snapshot the current repo's git state (branch, commit, file lists) to JSON."""
        import git  # third-party GitPython; imported lazily so other backup types work without it
        try:
            repo = git.Repo(".")
            backup_subdir = self.backup_dir / "git_states" / space_id
            backup_subdir.mkdir(parents=True, exist_ok=True)
            backup_path = backup_subdir / f"{backup_id}_git_state.json"
            # Capture enough to restore/checkout later (see StateRecovery).
            git_state = {
                "current_branch": repo.active_branch.name,
                "current_commit": repo.head.commit.hexsha,
                "untracked_files": repo.untracked_files,
                "modified_files": [item.a_path for item in repo.index.diff(None)],
                "staged_files": [item.a_path for item in repo.index.diff("HEAD")],
                "remote_urls": {remote.name: remote.url for remote in repo.remotes},
                "timestamp": datetime.now().isoformat()
            }
            with open(backup_path, 'w', encoding='utf-8') as f:
                json.dump(git_state, f, indent=2, ensure_ascii=False)
            file_hash = await self._calculate_file_hash(backup_path)
            size_bytes = backup_path.stat().st_size
            return BackupInfo(
                backup_id=backup_id,
                space_id=space_id,
                backup_type=BackupType.GIT_STATE,
                original_path=".",
                backup_path=str(backup_path.absolute()),
                timestamp=datetime.now(),
                file_hash=file_hash,
                size_bytes=size_bytes,
                description=description,
                metadata=git_state
            )
        except Exception as e:
            raise Exception(f"备份 Git 状态失败: {e}")

    async def _backup_database(self, space_id: str, db_path: str, backup_id: str, description: str) -> BackupInfo:
        """Copy a SQLite database file.

        NOTE(review): a plain file copy of a live SQLite DB can capture a
        mid-transaction state; the sqlite3 backup API would be safer — confirm
        callers only back up quiescent databases.
        """
        source_path = Path(db_path)
        if not source_path.exists():
            raise FileNotFoundError(f"数据库文件不存在: {db_path}")
        backup_subdir = self.backup_dir / "databases" / space_id
        backup_subdir.mkdir(parents=True, exist_ok=True)
        backup_filename = f"{backup_id}_db.sqlite"
        backup_path = backup_subdir / backup_filename
        shutil.copy2(source_path, backup_path)
        file_hash = await self._calculate_file_hash(backup_path)
        size_bytes = backup_path.stat().st_size
        return BackupInfo(
            backup_id=backup_id,
            space_id=space_id,
            backup_type=BackupType.DATABASE,
            original_path=str(source_path.absolute()),
            backup_path=str(backup_path.absolute()),
            timestamp=datetime.now(),
            file_hash=file_hash,
            size_bytes=size_bytes,
            description=description,
            metadata={"database_type": "sqlite"}
        )

    async def _backup_configuration(self, space_id: str, config_path: str, backup_id: str, description: str) -> BackupInfo:
        """Copy a config file, validating JSON syntax first (YAML not yet validated)."""
        source_path = Path(config_path)
        if not source_path.exists():
            raise FileNotFoundError(f"配置文件不存在: {config_path}")
        backup_subdir = self.backup_dir / "configs" / space_id
        backup_subdir.mkdir(parents=True, exist_ok=True)
        backup_filename = f"{backup_id}_config.json"
        backup_path = backup_subdir / backup_filename
        # Refuse to back up a syntactically broken config (JSON only for now).
        if source_path.suffix in ['.json', '.yaml', '.yml']:
            try:
                with open(source_path, 'r', encoding='utf-8') as f:
                    if source_path.suffix == '.json':
                        json.load(f)
                    # TODO: YAML validation could be added here.
            except Exception as e:
                raise Exception(f"配置文件格式错误: {e}")
        shutil.copy2(source_path, backup_path)
        file_hash = await self._calculate_file_hash(backup_path)
        size_bytes = backup_path.stat().st_size
        return BackupInfo(
            backup_id=backup_id,
            space_id=space_id,
            backup_type=BackupType.CONFIGURATION,
            original_path=str(source_path.absolute()),
            backup_path=str(backup_path.absolute()),
            timestamp=datetime.now(),
            file_hash=file_hash,
            size_bytes=size_bytes,
            description=description,
            metadata={"config_type": source_path.suffix}
        )

    async def _calculate_file_hash(self, file_path: Path) -> str:
        """Return the SHA-256 hex digest of a file, read in 4 KiB chunks."""
        hash_sha256 = hashlib.sha256()
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_sha256.update(chunk)
        return hash_sha256.hexdigest()

    def _generate_backup_id(self, space_id: str, backup_type: BackupType) -> str:
        """Build a backup id: "<space>_<type>_<YYYYmmdd_HHMMSS>"."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        return f"{space_id}_{backup_type.value}_{timestamp}"

    async def _save_backup_info(self, backup_info: BackupInfo) -> None:
        """Persist one BackupInfo row into the backups table."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO backups
                (backup_id, space_id, backup_type, original_path, backup_path, timestamp,
                 file_hash, size_bytes, description, metadata)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                backup_info.backup_id,
                backup_info.space_id,
                backup_info.backup_type.value,
                backup_info.original_path,
                backup_info.backup_path,
                backup_info.timestamp.isoformat(),
                backup_info.file_hash,
                backup_info.size_bytes,
                backup_info.description,
                json.dumps(backup_info.metadata)
            ))
            conn.commit()

    async def _log_audit_event(self, event_type: str, space_id: str, action: str,
                               details: Dict[str, Any], success: bool,
                               actor: str = "system") -> None:
        """Append one row to the audit_log table; failures are logged, never raised.

        Added here because create_backup() and StateRecovery invoke this on
        the BackupStrategy instance, while the original file only defined it
        on AuditLogger (AttributeError at runtime).
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    INSERT INTO audit_log
                    (event_type, space_id, timestamp, actor, action, details, success)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                """, (
                    event_type,
                    space_id,
                    datetime.now().isoformat(),
                    actor,
                    action,
                    json.dumps(details),
                    success
                ))
                conn.commit()
        except Exception as e:
            self.logger.error(f"记录审计日志失败: {e}")

    async def get_backup_info(self, backup_id: str) -> Optional[BackupInfo]:
        """Load one backup record by id, or None if it does not exist."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT backup_id, space_id, backup_type, original_path, backup_path, timestamp,
                       file_hash, size_bytes, description, metadata
                FROM backups WHERE backup_id = ?
            """, (backup_id,))
            row = cursor.fetchone()
            if row:
                return BackupInfo(
                    backup_id=row[0],
                    space_id=row[1],
                    backup_type=BackupType(row[2]),
                    original_path=row[3],
                    backup_path=row[4],
                    timestamp=datetime.fromisoformat(row[5]),
                    file_hash=row[6],
                    size_bytes=row[7],
                    description=row[8] or "",
                    metadata=json.loads(row[9]) if row[9] else {}
                )
            return None

    async def list_backups(self, space_id: Optional[str] = None,
                           backup_type: Optional[BackupType] = None,
                           limit: int = 100) -> List[BackupInfo]:
        """List backups, newest first, optionally filtered by space and type."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            # WHERE 1=1 lets the optional filters append uniformly.
            query = "SELECT * FROM backups WHERE 1=1"
            params = []
            if space_id:
                query += " AND space_id = ?"
                params.append(space_id)
            if backup_type:
                query += " AND backup_type = ?"
                params.append(backup_type.value)
            query += " ORDER BY timestamp DESC LIMIT ?"
            params.append(limit)
            cursor.execute(query, params)
            rows = cursor.fetchall()
            backups = []
            for row in rows:
                backups.append(BackupInfo(
                    backup_id=row[0],
                    space_id=row[1],
                    backup_type=BackupType(row[2]),
                    original_path=row[3],
                    backup_path=row[4],
                    timestamp=datetime.fromisoformat(row[5]),
                    file_hash=row[6],
                    size_bytes=row[7],
                    description=row[8] or "",
                    metadata=json.loads(row[9]) if row[9] else {}
                ))
            return backups
class StateRecovery:
    """Restores files, directories and git state from catalogued backups."""

    def __init__(self, backup_strategy: BackupStrategy):
        self.logger = logging.getLogger(__name__)
        self.backup_strategy = backup_strategy

    async def restore_from_backup(self, backup_id: str, target_path: Optional[str] = None) -> bool:
        """Restore the artifact recorded under ``backup_id``.

        ``target_path`` overrides the recorded original location. Returns
        True on success; every failure is logged and reported as False
        rather than raised.
        """
        try:
            backup_info = await self.backup_strategy.get_backup_info(backup_id)
            if not backup_info:
                raise FileNotFoundError(f"备份不存在: {backup_id}")
            backup_path = Path(backup_info.backup_path)
            if not backup_path.exists():
                raise FileNotFoundError(f"备份文件不存在: {backup_path}")
            # Restore over the original location unless the caller overrides it.
            if target_path:
                restore_path = Path(target_path)
            else:
                restore_path = Path(backup_info.original_path)
            success = False
            if backup_info.backup_type == BackupType.FILE:
                success = await self._restore_file(backup_path, restore_path)
            elif backup_info.backup_type == BackupType.DIRECTORY:
                success = await self._restore_directory(backup_path, restore_path)
            elif backup_info.backup_type == BackupType.GIT_STATE:
                success = await self._restore_git_state(backup_info, restore_path)
            elif backup_info.backup_type == BackupType.DATABASE:
                # Databases and config files were stored as plain copies.
                success = await self._restore_file(backup_path, restore_path)
            elif backup_info.backup_type == BackupType.CONFIGURATION:
                success = await self._restore_file(backup_path, restore_path)
            if success:
                await self.backup_strategy._log_audit_event(
                    "restore_success", backup_info.space_id, "从备份恢复", {
                        "backup_id": backup_id,
                        "target": str(restore_path)
                    }, True
                )
                self.logger.info(f"恢复成功: {backup_id} -> {restore_path}")
            else:
                await self.backup_strategy._log_audit_event(
                    "restore_failed", backup_info.space_id, "恢复失败", {
                        "backup_id": backup_id,
                        "target": str(restore_path)
                    }, False
                )
                self.logger.error(f"恢复失败: {backup_id}")
            return success
        except Exception as e:
            self.logger.error(f"恢复异常: {e}")
            return False

    async def _restore_file(self, backup_path: Path, restore_path: Path) -> bool:
        """Copy the backup over ``restore_path``, keeping a timestamped safety copy."""
        try:
            restore_path.parent.mkdir(parents=True, exist_ok=True)
            if restore_path.exists():
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                # Append the marker instead of with_suffix(): with_suffix()
                # replaced the file's real extension, so "config.json" was
                # saved as "config.<ts>.bak" and lost its ".json".
                current_backup = restore_path.with_name(f"{restore_path.name}.{timestamp}.bak")
                shutil.copy2(restore_path, current_backup)
            shutil.copy2(backup_path, restore_path)
            return True
        except Exception as e:
            self.logger.error(f"恢复文件失败: {e}")
            return False

    async def _restore_directory(self, backup_path: Path, restore_path: Path) -> bool:
        """Unpack a tar.gz backup next to ``restore_path``; the old tree is moved aside."""
        try:
            # Move the current directory aside (if present) as a safety copy.
            if restore_path.exists():
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                current_backup = restore_path.with_name(f"{restore_path.name}_{timestamp}_bak")
                shutil.move(str(restore_path), str(current_backup))
            # The archive stores the tree under its own directory name, so
            # extracting into the parent recreates restore_path.
            # NOTE(review): extractall on an untrusted archive can write
            # outside the target dir; these archives are self-produced, but
            # consider tarfile's filter="data" (Python 3.12+) anyway.
            with tarfile.open(backup_path, "r:gz") as tar:
                tar.extractall(restore_path.parent)
            return True
        except Exception as e:
            self.logger.error(f"恢复目录失败: {e}")
            return False

    async def _restore_git_state(self, backup_info: BackupInfo, repo_path: Path) -> bool:
        """Check out the recorded commit and discard changes made since the backup."""
        try:
            import git  # third-party GitPython, imported lazily
            git_state = backup_info.metadata
            repo = git.Repo(str(repo_path))
            # Return to the recorded commit.
            if 'current_commit' in git_state:
                commit = git_state['current_commit']
                repo.git.checkout(commit)
            # Remove files that were untracked at backup time.
            # NOTE(review): unlink() handles plain files only — confirm no
            # untracked directories are expected here.
            if git_state.get('untracked_files'):
                for untracked_file in git_state['untracked_files']:
                    file_path = repo_path / untracked_file
                    if file_path.exists():
                        file_path.unlink()
            # Discard local modifications recorded at backup time.
            if git_state.get('modified_files'):
                repo.git.reset('--hard', 'HEAD')
            return True
        except Exception as e:
            self.logger.error(f"恢复 Git 状态失败: {e}")
            return False
class AuditLogger:
    """Reads and writes the audit trail stored in the backup catalogue DB."""

    def __init__(self, backup_strategy: BackupStrategy):
        self.backup_strategy = backup_strategy
        self.logger = logging.getLogger(__name__)

    async def _log_audit_event(self, event_type: str, space_id: str, action: str,
                               details: Dict[str, Any], success: bool, actor: str = "system") -> None:
        """Insert one audit row; failures are logged, never raised."""
        row = (
            event_type,
            space_id,
            datetime.now().isoformat(),
            actor,
            action,
            json.dumps(details),
            success,
        )
        try:
            with sqlite3.connect(self.backup_strategy.db_path) as conn:
                conn.cursor().execute("""
                    INSERT INTO audit_log
                    (event_type, space_id, timestamp, actor, action, details, success)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                """, row)
                conn.commit()
        except Exception as e:
            self.logger.error(f"记录审计日志失败: {e}")

    async def get_audit_logs(self, space_id: Optional[str] = None,
                             event_type: Optional[str] = None,
                             limit: int = 100) -> List[Dict[str, Any]]:
        """Return recent audit rows as dicts, newest first; [] on any error."""
        try:
            with sqlite3.connect(self.backup_strategy.db_path) as conn:
                # WHERE 1=1 lets the optional filters append uniformly.
                sql = "SELECT * FROM audit_log WHERE 1=1"
                args: List[Any] = []
                if space_id:
                    sql += " AND space_id = ?"
                    args.append(space_id)
                if event_type:
                    sql += " AND event_type = ?"
                    args.append(event_type)
                sql += " ORDER BY timestamp DESC LIMIT ?"
                args.append(limit)
                fetched = conn.cursor().execute(sql, args).fetchall()
                return [
                    {
                        "id": r[0],
                        "event_type": r[1],
                        "space_id": r[2],
                        "timestamp": r[3],
                        "actor": r[4],
                        "action": r[5],
                        "details": json.loads(r[6]) if r[6] else {},
                        "success": bool(r[7]),
                    }
                    for r in fetched
                ]
        except Exception as e:
            self.logger.error(f"获取审计日志失败: {e}")
            return []

    async def generate_audit_report(self, space_id: str,
                                    start_date: Optional[datetime] = None,
                                    end_date: Optional[datetime] = None) -> Dict[str, Any]:
        """Aggregate per-event-type success/failure counts for one space.

        Returns a report dict with a per-event summary, overall totals and
        success rates; {} on any error.
        """
        try:
            with sqlite3.connect(self.backup_strategy.db_path) as conn:
                sql = """
                    SELECT event_type, COUNT(*) as count,
                           SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) as success_count,
                           SUM(CASE WHEN success = 0 THEN 1 ELSE 0 END) as failure_count
                    FROM audit_log
                    WHERE space_id = ?
                """
                args: List[Any] = [space_id]
                if start_date:
                    sql += " AND timestamp >= ?"
                    args.append(start_date.isoformat())
                if end_date:
                    sql += " AND timestamp <= ?"
                    args.append(end_date.isoformat())
                sql += " GROUP BY event_type"
                grouped = conn.cursor().execute(sql, args).fetchall()
                summary = {
                    kind: {
                        "total": total,
                        "success": ok,
                        "failure": bad,
                        "success_rate": ok / total if total > 0 else 0,
                    }
                    for kind, total, ok, bad in grouped
                }
                events = sum(total for _, total, _, _ in grouped)
                ok_sum = sum(ok for _, _, ok, _ in grouped)
                bad_sum = sum(bad for _, _, _, bad in grouped)
                return {
                    "space_id": space_id,
                    "period": {
                        "start": start_date.isoformat() if start_date else None,
                        "end": end_date.isoformat() if end_date else None,
                    },
                    "summary": summary,
                    "total_events": events,
                    "total_success": ok_sum,
                    "total_failure": bad_sum,
                    "overall_success_rate": ok_sum / events if events > 0 else 0,
                }
        except Exception as e:
            self.logger.error(f"生成审计报告失败: {e}")
            return {}
class RollbackManager:
    """Facade tying together backup creation, state restore and audit logging."""

    def __init__(self, backup_dir: str = "backups"):
        self.logger = logging.getLogger(__name__)
        self.backup_strategy = BackupStrategy(backup_dir)
        self.state_recovery = StateRecovery(self.backup_strategy)
        self.audit_logger = AuditLogger(self.backup_strategy)
        # In-memory counters only; persisted history lives in the rollbacks table.
        self.rollback_stats = {
            "total_rollbacks": 0,
            "successful_rollbacks": 0,
            "failed_rollbacks": 0
        }

    async def create_backup_set(self, space_id: str, targets: List[Tuple[str, BackupType]],
                                description: str = "") -> List[str]:
        """Back up several targets in order; raises on the first failure.

        NOTE(review): backups created before a failing target are not cleaned
        up, so a partial set may remain on disk — confirm this is intended.
        """
        backup_ids = []
        try:
            for target_path, backup_type in targets:
                backup_id = await self.backup_strategy.create_backup(
                    space_id, target_path, backup_type, f"{description} - {backup_type.value}"
                )
                backup_ids.append(backup_id)
            self.logger.info(f"备份集合创建成功: {space_id} - {len(backup_ids)} 个备份")
            return backup_ids
        except Exception as e:
            self.logger.error(f"创建备份集合失败: {e}")
            raise

    async def execute_rollback(self, backup_id: str, target_path: Optional[str] = None) -> bool:
        """Run one rollback from ``backup_id``, recording it in the DB and stats."""
        rollback_id = self._generate_rollback_id()
        try:
            # Record the attempt before touching anything.
            await self._record_rollback_start(rollback_id, backup_id)
            success = await self.state_recovery.restore_from_backup(backup_id, target_path)
            await self._record_rollback_complete(rollback_id, success)
            self.rollback_stats["total_rollbacks"] += 1
            if success:
                self.rollback_stats["successful_rollbacks"] += 1
                self.logger.info(f"回滚成功: {backup_id}")
            else:
                self.rollback_stats["failed_rollbacks"] += 1
                self.logger.error(f"回滚失败: {backup_id}")
            return success
        except Exception as e:
            self.logger.error(f"回滚执行异常: {e}")
            await self._record_rollback_complete(rollback_id, False, str(e))
            self.rollback_stats["total_rollbacks"] += 1
            self.rollback_stats["failed_rollbacks"] += 1
            return False

    def _generate_rollback_id(self) -> str:
        """Build a rollback id from a microsecond-resolution timestamp."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        return f"rollback_{timestamp}"

    async def _record_rollback_start(self, rollback_id: str, backup_id: str) -> None:
        """Insert a PENDING rollback row; raises if the backup is unknown."""
        backup_info = await self.backup_strategy.get_backup_info(backup_id)
        if not backup_info:
            raise Exception(f"备份信息不存在: {backup_id}")
        with sqlite3.connect(self.backup_strategy.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO rollbacks
                (rollback_id, backup_id, space_id, rollback_type, status, timestamp)
                VALUES (?, ?, ?, ?, ?, ?)
            """, (
                rollback_id,
                backup_id,
                backup_info.space_id,
                backup_info.backup_type.value,
                RollbackStatus.PENDING.value,
                datetime.now().isoformat()
            ))
            conn.commit()

    async def _record_rollback_complete(self, rollback_id: str, success: bool,
                                        error_message: Optional[str] = None) -> None:
        """Mark a rollback row COMPLETED or FAILED with completion time and error."""
        status = RollbackStatus.COMPLETED if success else RollbackStatus.FAILED
        with sqlite3.connect(self.backup_strategy.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute("""
                UPDATE rollbacks
                SET status = ?, completed_at = ?, error_message = ?
                WHERE rollback_id = ?
            """, (
                status.value,
                datetime.now().isoformat(),
                error_message,
                rollback_id
            ))
            conn.commit()

    async def cleanup_old_backups(self, days: int = 30) -> None:
        """Delete backup files and DB records older than ``days``; best-effort."""
        try:
            cutoff_date = datetime.now() - timedelta(days=days)
            backups = await self.backup_strategy.list_backups()
            old_backups = [
                backup for backup in backups
                if backup.timestamp < cutoff_date
            ]
            for backup in old_backups:
                try:
                    backup_path = Path(backup.backup_path)
                    if backup_path.exists():
                        backup_path.unlink()
                    # Remove a sidecar .json info file if one exists.
                    # NOTE(review): nothing in this file writes such a sidecar —
                    # presumably produced elsewhere; confirm.
                    info_file = backup_path.with_suffix('.json')
                    if info_file.exists():
                        info_file.unlink()
                    # Drop the catalogue rows (backups plus dependent rollbacks).
                    with sqlite3.connect(self.backup_strategy.db_path) as conn:
                        cursor = conn.cursor()
                        cursor.execute("DELETE FROM backups WHERE backup_id = ?", (backup.backup_id,))
                        cursor.execute("DELETE FROM rollbacks WHERE backup_id = ?", (backup.backup_id,))
                        conn.commit()
                    self.logger.info(f"清理旧备份: {backup.backup_id}")
                except Exception as e:
                    # Keep cleaning the rest even if one backup fails.
                    self.logger.error(f"清理备份失败 {backup.backup_id}: {e}")
        except Exception as e:
            self.logger.error(f"清理旧备份异常: {e}")

    def get_rollback_stats(self) -> Dict[str, Any]:
        """Return a copy of the in-memory rollback counters."""
        return self.rollback_stats.copy()

    async def get_rollback_history(self, space_id: Optional[str] = None,
                                   limit: int = 50) -> List[RollbackInfo]:
        """Return recent rollback records, newest first; [] on any error."""
        try:
            with sqlite3.connect(self.backup_strategy.db_path) as conn:
                cursor = conn.cursor()
                query = "SELECT * FROM rollbacks WHERE 1=1"
                params = []
                if space_id:
                    query += " AND space_id = ?"
                    params.append(space_id)
                query += " ORDER BY timestamp DESC LIMIT ?"
                params.append(limit)
                cursor.execute(query, params)
                rows = cursor.fetchall()
                rollbacks = []
                for row in rows:
                    rollbacks.append(RollbackInfo(
                        rollback_id=row[0],
                        backup_id=row[1],
                        space_id=row[2],
                        rollback_type=BackupType(row[3]),
                        # Bug fix: the field is named "status" — the original
                        # passed rollback_status=, a TypeError on every call.
                        status=RollbackStatus(row[4]),
                        timestamp=datetime.fromisoformat(row[5]),
                        completed_at=datetime.fromisoformat(row[6]) if row[6] else None,
                        error_message=row[7],
                        affected_files=json.loads(row[8]) if row[8] else [],
                        metadata=json.loads(row[9]) if row[9] else {}
                    ))
                return rollbacks
        except Exception as e:
            self.logger.error(f"获取回滚历史失败: {e}")
            return []
if __name__ == "__main__":
    # Small end-to-end smoke test: back up a file, roll it back,
    # then print stats and the audit trail.
    async def main():
        manager = RollbackManager("test_backups")

        backup_id = await manager.backup_strategy.create_backup(
            "test-space",
            "example.txt",
            BackupType.FILE,
            "测试备份"
        )
        print(f"创建备份: {backup_id}")

        ok = await manager.execute_rollback(backup_id)
        print(f"回滚结果: {ok}")

        print(f"统计信息: {manager.get_rollback_stats()}")

        audit_rows = await manager.audit_logger.get_audit_logs("test-space")
        print(f"审计日志数量: {len(audit_rows)}")

    asyncio.run(main())