Spaces:
Sleeping
Sleeping
| """ | |
| مدير النسخ الاحتياطية (Backup Manager) | |
| ========================================= | |
| ينشئ نسخاً احتياطية مُرقّمة لمشروع البيانات مع دعم النسخ التزايدي | |
| والاستعادة والتنظيف. | |
| القدرات: | |
| - إنشاء نسخ احتياطية مرقّمة بالتاريخ والوقت | |
| - إنشاء نسخ مضغوطة (ZIP) | |
| - دعم النسخ التزايدي (الملفات المعدّلة فقط) | |
| - استعادة من نسخة احتياطية | |
| - عرض قائمة النسخ الاحتياطية | |
| - تنظيف النسخ القديمة | |
| - تسجيل جميع العمليات | |
| """ | |
| import hashlib | |
| import json | |
| import logging | |
| import shutil | |
| import tarfile | |
| import zipfile | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Optional | |
| logger = logging.getLogger(__name__) | |
| class BackupManager: | |
| """ | |
| مدير النسخ الاحتياطية — ينشئ ويستعيد النسخ الاحتياطية المرقّمة. | |
| الاستخدام: | |
| manager = BackupManager() | |
| backup_path = manager.create_backup("/my/project", "/backups") | |
| manager.restore_backup(backup_path, "/restore/target") | |
| """ | |
| # ======== ثوابت ======== | |
| MANIFEST_FILENAME: str = ".backup_manifest.json" | |
| HASH_ALGORITHM: str = "sha256" | |
| BACKUP_TIMESTAMP_FORMAT: str = "%Y%m%d_%H%M%S" | |
| DEFAULT_MAX_SIZE_ZIP: int = 4 * 1024 * 1024 * 1024 # 4 جيجابايت | |
| def __init__( | |
| self, | |
| compress: bool = True, | |
| incremental: bool = False, | |
| exclude_patterns: Optional[list[str]] = None, | |
| log_file: Optional[str] = None, | |
| ) -> None: | |
| """ | |
| تهيئة مدير النسخ الاحتياطية. | |
| المعاملات: | |
| compress: ضغط النسخة الاحتياطية كـ ZIP | |
| incremental: تفعيل النسخ التزايدي (الملفات المعدّلة فقط) | |
| exclude_patterns: أنماط glob لاستبعاد ملفات معينة | |
| log_file: مسار ملف سجل العمليات (اختياري) | |
| """ | |
| self.compress: bool = compress | |
| self.incremental: bool = incremental | |
| self.exclude_patterns: list[str] = exclude_patterns or [] | |
| # إعداد تسجيل مخصص | |
| self._operation_log: list[dict] = [] | |
| if log_file: | |
| try: | |
| file_handler = logging.FileHandler(log_file, encoding="utf-8") | |
| file_handler.setLevel(logging.INFO) | |
| file_handler.setFormatter( | |
| logging.Formatter( | |
| "%(asctime)s | %(levelname)s | %(message)s", | |
| datefmt="%Y-%m-%d %H:%M:%S", | |
| ) | |
| ) | |
| logger.addHandler(file_handler) | |
| logger.info("تم تفعيل تسجيل العمليات في: %s", log_file) | |
| except PermissionError as exc: | |
| logger.warning("لا صلاحية لإنشاء ملف السجل: %s", exc) | |
| # تخزين التجزئات للنسخ التزايدي | |
| self._file_hashes: dict[str, str] = {} | |
| logger.info( | |
| "تم تهيئة مدير النسخ الاحتياطية — ضغط: %s | تزايدي: %s", | |
| compress, incremental, | |
| ) | |
| # =================================================================== | |
| # إنشاء نسخة احتياطية | |
| # =================================================================== | |
| def create_backup( | |
| self, | |
| source_dir: str | Path, | |
| backup_dir: str | Path, | |
| label: Optional[str] = None, | |
| ) -> str: | |
| """ | |
| ينشئ نسخة احتياطية كاملة لمجلد المصدر. | |
| المعاملات: | |
| source_dir: مجلد المصدر المراد نسخه | |
| backup_dir: مجلد حفظ النسخ الاحتياطية | |
| label: تسمية اختيارية للنسخة | |
| المعاد: | |
| مسار النسخة الاحتياطية المنشأة | |
| """ | |
| source = Path(source_dir).resolve() | |
| dest = Path(backup_dir).resolve() | |
| if not source.is_dir(): | |
| raise FileNotFoundError(f"مجلد المصدر غير موجود: {source}") | |
| # إنشاء مجلد النسخ الاحتياطية | |
| try: | |
| dest.mkdir(parents=True, exist_ok=True) | |
| except PermissionError as exc: | |
| raise PermissionError(f"لا صلاحية لإنشاء {dest}: {exc}") from exc | |
| # اسم النسخة الاحتياطية | |
| timestamp = datetime.now().strftime(self.BACKUP_TIMESTAMP_FORMAT) | |
| source_name = source.name | |
| label_suffix = f"_{label}" if label else "" | |
| if self.compress: | |
| backup_filename = f"{source_name}_backup_{timestamp}{label_suffix}.zip" | |
| backup_path = dest / backup_filename | |
| else: | |
| backup_filename = f"{source_name}_backup_{timestamp}{label_suffix}" | |
| backup_path = dest / backup_filename | |
| logger.info("بدء إنشاء نسخة احتياطية: %s", backup_path.name) | |
| start_time = datetime.now() | |
| total_files = 0 | |
| total_size = 0 | |
| skipped = 0 | |
| try: | |
| if self.compress: | |
| total_files, total_size, skipped = self._create_zip_backup( | |
| source, backup_path, | |
| ) | |
| else: | |
| total_files, total_size, skipped = self._create_dir_backup( | |
| source, backup_path, | |
| ) | |
| except Exception as exc: | |
| logger.error("فشل إنشاء النسخة الاحتياطية: %s", exc) | |
| # حذف الملف الجزئي | |
| if backup_path.exists(): | |
| try: | |
| if backup_path.is_dir(): | |
| shutil.rmtree(backup_path) | |
| else: | |
| backup_path.unlink() | |
| except OSError: | |
| pass | |
| raise | |
| elapsed = (datetime.now() - start_time).total_seconds() | |
| # تسجيل العملية | |
| operation = { | |
| "timestamp": start_time.isoformat(), | |
| "action": "create_backup", | |
| "source": str(source), | |
| "backup_path": str(backup_path.resolve()), | |
| "total_files": total_files, | |
| "total_size": total_size, | |
| "skipped": skipped, | |
| "elapsed_seconds": round(elapsed, 2), | |
| "compressed": self.compress, | |
| "incremental": self.incremental, | |
| "label": label, | |
| } | |
| self._operation_log.append(operation) | |
| self._save_manifest(backup_path, operation) | |
| logger.info( | |
| "تم إنشاء نسخة احتياطية بنجاح — المسار: %s | الملفات: %d | الحجم: %s | الوقت: %.1f ثانية", | |
| backup_path, total_files, self._format_size(total_size), elapsed, | |
| ) | |
| return str(backup_path.resolve()) | |
| def _create_zip_backup( | |
| self, | |
| source: Path, | |
| backup_path: Path, | |
| ) -> tuple[int, int, int]: | |
| """ | |
| ينشئ نسخة احتياطية مضغوطة كـ ZIP. | |
| المعاد: | |
| (عدد_الملفات، الحجم_الإجمالي، عدد_المتخطّى) | |
| """ | |
| total_files = 0 | |
| total_size = 0 | |
| skipped = 0 | |
| # جمع الملفات المراد نسخها | |
| files_to_backup: list[Path] = [] | |
| for item in source.rglob("*"): | |
| if not item.is_file(): | |
| continue | |
| if self._should_exclude(item, source): | |
| skipped += 1 | |
| continue | |
| if self.incremental and not self._is_file_changed(item, source): | |
| skipped += 1 | |
| continue | |
| files_to_backup.append(item) | |
| # إنشاء أرشيف ZIP | |
| try: | |
| with zipfile.ZipFile( | |
| backup_path, "w", | |
| zipfile.ZIP_DEFLATED, | |
| compresslevel=6, | |
| ) as zf: | |
| for file_path in files_to_backup: | |
| try: | |
| arcname = file_path.relative_to(source) | |
| zf.write(file_path, arcname) | |
| file_size = file_path.stat().st_size | |
| total_size += file_size | |
| total_files += 1 | |
| # تحديث التجزئة | |
| if self.incremental: | |
| rel = str(arcname) | |
| self._file_hashes[rel] = self._hash_file(file_path) | |
| except PermissionError as exc: | |
| logger.warning("تخطي %s (لا صلاحية): %s", file_path, exc) | |
| skipped += 1 | |
| except OSError as exc: | |
| logger.warning("تخطي %s (خطأ): %s", file_path, exc) | |
| skipped += 1 | |
| except PermissionError as exc: | |
| raise PermissionError(f"لا صلاحية لإنشاء {backup_path}: {exc}") from exc | |
| return total_files, total_size, skipped | |
| def _create_dir_backup( | |
| self, | |
| source: Path, | |
| backup_path: Path, | |
| ) -> tuple[int, int, int]: | |
| """ | |
| ينشئ نسخة احتياطية كمجلد (بدون ضغط). | |
| المعاد: | |
| (عدد_الملفات، الحجم_الإجمالي، عدد_المتخطّى) | |
| """ | |
| total_files = 0 | |
| total_size = 0 | |
| skipped = 0 | |
| try: | |
| backup_path.mkdir(parents=True, exist_ok=True) | |
| except PermissionError as exc: | |
| raise PermissionError(f"لا صلاحية لإنشاء {backup_path}: {exc}") from exc | |
| for item in source.rglob("*"): | |
| if self._should_exclude(item, source): | |
| skipped += 1 | |
| continue | |
| rel = item.relative_to(source) | |
| target = backup_path / rel | |
| try: | |
| if item.is_dir(): | |
| target.mkdir(parents=True, exist_ok=True) | |
| elif item.is_file(): | |
| if self.incremental and not self._is_file_changed(item, source): | |
| skipped += 1 | |
| continue | |
| target.parent.mkdir(parents=True, exist_ok=True) | |
| shutil.copy2(item, target) | |
| file_size = item.stat().st_size | |
| total_size += file_size | |
| total_files += 1 | |
| if self.incremental: | |
| self._file_hashes[str(rel)] = self._hash_file(item) | |
| except PermissionError as exc: | |
| logger.warning("تخطي %s (لا صلاحية): %s", item, exc) | |
| skipped += 1 | |
| except OSError as exc: | |
| logger.warning("تخطي %s (خطأ): %s", item, exc) | |
| skipped += 1 | |
| return total_files, total_size, skipped | |
| # =================================================================== | |
| # استعادة نسخة احتياطية | |
| # =================================================================== | |
| def restore_backup( | |
| self, | |
| backup_path: str | Path, | |
| target_dir: str | Path, | |
| overwrite: bool = False, | |
| ) -> dict: | |
| """ | |
| يستعيد نسخة احتياطية إلى مجلد الهدف. | |
| المعاملات: | |
| backup_path: مسار النسخة الاحتياطية | |
| target_dir: مجلد الاستعادة | |
| overwrite: الكتابة فوق الملفات الموجودة | |
| المعاد: | |
| تقرير الاستعادة: | |
| {"restored_files": int, "skipped": int, "errors": list} | |
| """ | |
| backup = Path(backup_path).resolve() | |
| target = Path(target_dir).resolve() | |
| if not backup.exists(): | |
| raise FileNotFoundError(f"النسخة الاحتياطية غير موجودة: {backup}") | |
| # إنشاء مجلد الهدف | |
| try: | |
| target.mkdir(parents=True, exist_ok=True) | |
| except PermissionError as exc: | |
| raise PermissionError(f"لا صلاحية لإنشاء {target}: {exc}") from exc | |
| report: dict = { | |
| "restored_files": 0, | |
| "skipped": 0, | |
| "errors": [], | |
| } | |
| start_time = datetime.now() | |
| logger.info("بدء استعادة من: %s إلى %s", backup.name, target) | |
| try: | |
| if backup.is_file() and backup.suffix.lower() == ".zip": | |
| self._restore_from_zip(backup, target, overwrite, report) | |
| elif backup.is_dir(): | |
| self._restore_from_dir(backup, target, overwrite, report) | |
| else: | |
| raise ValueError(f"نوع نسخة احتياطية غير معروف: {backup}") | |
| except Exception as exc: | |
| logger.error("فشل استعادة النسخة الاحتياطية: %s", exc) | |
| raise | |
| elapsed = (datetime.now() - start_time).total_seconds() | |
| # تسجيل العملية | |
| operation = { | |
| "timestamp": start_time.isoformat(), | |
| "action": "restore_backup", | |
| "backup_path": str(backup), | |
| "target": str(target), | |
| "restored_files": report["restored_files"], | |
| "skipped": report["skipped"], | |
| "errors_count": len(report["errors"]), | |
| "elapsed_seconds": round(elapsed, 2), | |
| } | |
| self._operation_log.append(operation) | |
| logger.info( | |
| "اكتملت الاستعادة — مستعادة: %d | متخطاة: %d | أخطاء: %d | الوقت: %.1f ثانية", | |
| report["restored_files"], report["skipped"], | |
| len(report["errors"]), elapsed, | |
| ) | |
| return report | |
| def _restore_from_zip( | |
| self, | |
| backup: Path, | |
| target: Path, | |
| overwrite: bool, | |
| report: dict, | |
| ) -> None: | |
| """يستعيد من أرشيف ZIP.""" | |
| try: | |
| with zipfile.ZipFile(backup, "r") as zf: | |
| for info in zf.infolist(): | |
| if info.is_dir(): | |
| (target / info.filename).mkdir(parents=True, exist_ok=True) | |
| continue | |
| # حماية من Path Traversal | |
| member_path = Path(info.filename) | |
| if member_path.is_absolute() or ".." in member_path.parts: | |
| report["errors"].append({"file": info.filename, "error": "مسار خطر"}) | |
| continue | |
| dest_file = target / info.filename | |
| if dest_file.exists() and not overwrite: | |
| report["skipped"] += 1 | |
| continue | |
| try: | |
| dest_file.parent.mkdir(parents=True, exist_ok=True) | |
| with zf.open(info) as src, open(dest_file, "wb") as dst: | |
| shutil.copyfileobj(src, dst) | |
| report["restored_files"] += 1 | |
| except PermissionError as exc: | |
| report["errors"].append({"file": info.filename, "error": str(exc)}) | |
| except OSError as exc: | |
| report["errors"].append({"file": info.filename, "error": str(exc)}) | |
| except zipfile.BadZipFile: | |
| raise ValueError(f"ملف ZIP تالف: {backup}") | |
| def _restore_from_dir( | |
| self, | |
| backup: Path, | |
| target: Path, | |
| overwrite: bool, | |
| report: dict, | |
| ) -> None: | |
| """يستعيذ من مجلد نسخة احتياطية.""" | |
| for item in backup.rglob("*"): | |
| rel = item.relative_to(backup) | |
| dest = target / rel | |
| try: | |
| if item.is_dir(): | |
| dest.mkdir(parents=True, exist_ok=True) | |
| elif item.is_file(): | |
| if dest.exists() and not overwrite: | |
| report["skipped"] += 1 | |
| continue | |
| dest.parent.mkdir(parents=True, exist_ok=True) | |
| shutil.copy2(item, dest) | |
| report["restored_files"] += 1 | |
| except PermissionError as exc: | |
| report["errors"].append({"file": str(item), "error": str(exc)}) | |
| except OSError as exc: | |
| report["errors"].append({"file": str(item), "error": str(exc)}) | |
| # =================================================================== | |
| # عرض النسخ الاحتياطية | |
| # =================================================================== | |
| def list_backups(self, backup_dir: str | Path) -> list[dict]: | |
| """ | |
| يعرض قائمة بجميع النسخ الاحتياطية في مجلد. | |
| المعاملات: | |
| backup_dir: مجلد النسخ الاحتياطية | |
| المعاد: | |
| قائمة بمعلومات النسخ الاحتياطية: | |
| [{"name": str, "path": str, "size": int, "created": str, "type": str}, ...] | |
| """ | |
| dir_path = Path(backup_dir).resolve() | |
| if not dir_path.is_dir(): | |
| raise FileNotFoundError(f"مجلد النسخ الاحتياطية غير موجود: {dir_path}") | |
| backups: list[dict] = [] | |
| for item in sorted(dir_path.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True): | |
| try: | |
| if item.is_file() and item.suffix.lower() == ".zip": | |
| size = item.stat().st_size | |
| created = datetime.fromtimestamp(item.stat().st_ctime).isoformat() | |
| # محاولة قراءة البيانات الوصفية من ملف ZIP | |
| manifest = self._read_manifest_from_zip(item) | |
| extra_info = {} | |
| if manifest: | |
| extra_info = { | |
| "source": manifest.get("source", ""), | |
| "total_files": manifest.get("total_files", 0), | |
| "label": manifest.get("label", ""), | |
| } | |
| backups.append({ | |
| "name": item.name, | |
| "path": str(item.resolve()), | |
| "size": size, | |
| "size_formatted": self._format_size(size), | |
| "created": created, | |
| "type": "zip", | |
| **extra_info, | |
| }) | |
| elif item.is_dir() and "backup_" in item.name.lower(): | |
| # حساب حجم المجلد | |
| size = sum(f.stat().st_size for f in item.rglob("*") if f.is_file()) | |
| created = datetime.fromtimestamp(item.stat().st_ctime).isoformat() | |
| backups.append({ | |
| "name": item.name, | |
| "path": str(item.resolve()), | |
| "size": size, | |
| "size_formatted": self._format_size(size), | |
| "created": created, | |
| "type": "directory", | |
| }) | |
| except PermissionError as exc: | |
| logger.warning("لا صلاحية لقراءة %s: %s", item, exc) | |
| except OSError as exc: | |
| logger.warning("خطأ في %s: %s", item, exc) | |
| logger.info("عرض %d نسخة احتياطية في %s", len(backups), dir_path) | |
| return backups | |
| # =================================================================== | |
| # تنظيف النسخ القديمة | |
| # =================================================================== | |
| def cleanup_old_backups( | |
| self, | |
| backup_dir: str | Path, | |
| keep_last: int = 5, | |
| dry_run: bool = False, | |
| ) -> dict: | |
| """ | |
| يحذف النسخ الاحتياطية القديمة مع الاحتفاظ بأحدثها. | |
| المعاملات: | |
| backup_dir: مجلد النسخ الاحتياطية | |
| keep_last: عدد النسخ المراد الاحتفاظ بها | |
| dry_run: عرض ما سيُحذف بدون حذف فعلًا | |
| المعاد: | |
| تقرير التنظيف: | |
| {"kept": [str, ...], "deleted": [str, ...], "freed_space": int} | |
| """ | |
| dir_path = Path(backup_dir).resolve() | |
| if not dir_path.is_dir(): | |
| raise FileNotFoundError(f"مجلد النسخ الاحتياطية غير موجود: {dir_path}") | |
| all_backups = self.list_backups(dir_path) | |
| report: dict = { | |
| "kept": [], | |
| "deleted": [], | |
| "freed_space": 0, | |
| } | |
| if len(all_backups) <= keep_last: | |
| report["kept"] = [b["path"] for b in all_backups] | |
| logger.info("لا حاجة للتنظيف — النسخ (%d) ≤ الحد المطلوب (%d)", len(all_backups), keep_last) | |
| return report | |
| # النسخ المراد حذفها (الأقدم) | |
| to_delete = all_backups[keep_last:] | |
| to_keep = all_backups[:keep_last] | |
| for backup_info in to_delete: | |
| backup_path = Path(backup_info["path"]) | |
| report["deleted"].append(str(backup_path)) | |
| report["freed_space"] += backup_info["size"] | |
| if not dry_run: | |
| try: | |
| if backup_path.is_dir(): | |
| shutil.rmtree(backup_path) | |
| elif backup_path.is_file(): | |
| backup_path.unlink() | |
| logger.info("تم حذف: %s", backup_path.name) | |
| except PermissionError as exc: | |
| logger.error("لا صلاحية لحذف %s: %s", backup_path, exc) | |
| except OSError as exc: | |
| logger.error("خطأ أثناء حذف %s: %s", backup_path, exc) | |
| else: | |
| logger.info("[محاكاة] سيتم حذف: %s", backup_path.name) | |
| report["kept"] = [b["path"] for b in to_keep] | |
| # تسجيل العملية | |
| operation = { | |
| "timestamp": datetime.now().isoformat(), | |
| "action": "cleanup_old_backups", | |
| "backup_dir": str(dir_path), | |
| "keep_last": keep_last, | |
| "deleted_count": len(report["deleted"]), | |
| "freed_space": report["freed_space"], | |
| "dry_run": dry_run, | |
| } | |
| self._operation_log.append(operation) | |
| logger.info( | |
| "اكتمل التنظيف — محذوف: %d | محفوظ: %d | مساحة محررة: %s", | |
| len(report["deleted"]), len(report["kept"]), | |
| self._format_size(report["freed_space"]), | |
| ) | |
| return report | |
| # =================================================================== | |
| # حجم النسخة الاحتياطية | |
| # =================================================================== | |
| def get_backup_size(self, backup_path: str | Path) -> dict: | |
| """ | |
| يعرض معلومات الحجم لنسخة احتياطية. | |
| المعاملات: | |
| backup_path: مسار النسخة الاحتياطية | |
| المعاد: | |
| {"path": str, "size_bytes": int, "size_formatted": str, "type": str} | |
| """ | |
| path = Path(backup_path).resolve() | |
| if not path.exists(): | |
| raise FileNotFoundError(f"النسخة الاحتياطية غير موجودة: {path}") | |
| try: | |
| if path.is_file(): | |
| size = path.stat().st_size | |
| btype = "zip" | |
| elif path.is_dir(): | |
| size = sum(f.stat().st_size for f in path.rglob("*") if f.is_file()) | |
| btype = "directory" | |
| else: | |
| return {"path": str(path), "size_bytes": 0, "size_formatted": "0 B", "type": "unknown"} | |
| except PermissionError as exc: | |
| logger.error("لا صلاحية لقراءة: %s", path) | |
| raise | |
| return { | |
| "path": str(path), | |
| "size_bytes": size, | |
| "size_formatted": self._format_size(size), | |
| "type": btype, | |
| } | |
| # =================================================================== | |
| # بيانات التجزئة (للنسخ التزايدي) | |
| # =================================================================== | |
| def _hash_file(self, file_path: Path) -> str: | |
| """ | |
| يحسب تجزئة SHA-256 لملف. | |
| المعاملات: | |
| file_path: مسار الملف | |
| المعاد: | |
| سلسلة التجزئة السداسية عشرية | |
| """ | |
| hasher = hashlib.new(self.HASH_ALGORITHM) | |
| try: | |
| with open(file_path, "rb") as f: | |
| # قراءة بأجزاء للمفاتيح الكبيرة | |
| for chunk in iter(lambda: f.read(65536), b""): | |
| hasher.update(chunk) | |
| return hasher.hexdigest() | |
| except PermissionError: | |
| logger.warning("لا صلاحية لتجزئة: %s", file_path) | |
| return "" | |
| except OSError as exc: | |
| logger.warning("خطأ أثناء تجزئة %s: %s", file_path, exc) | |
| return "" | |
| def _is_file_changed(self, file_path: Path, source: Path) -> bool: | |
| """ | |
| يتحقق مما إذا كان الملف قد تغيّر منذ آخر نسخة احتياطية. | |
| المعاملات: | |
| file_path: مسار الملف الحالي | |
| source: مجلد المصدر | |
| المعاد: | |
| True إذا كان الملف جديداً أو معدّلاً | |
| """ | |
| rel = str(file_path.relative_to(source)) | |
| current_hash = self._hash_file(file_path) | |
| if not current_hash: | |
| return True # في حالة الخطأ، نأخذ الاحتياط | |
| stored_hash = self._file_hashes.get(rel) | |
| if stored_hash is None or stored_hash != current_hash: | |
| return True | |
| return False | |
| def load_hashes(self, backup_dir: str | Path) -> None: | |
| """ | |
| يحمّل تجزئات الملفات من آخر نسخة احتياطية لتمكين النسخ التزايدي. | |
| المعاملات: | |
| backup_dir: مجلد النسخ الاحتياطية | |
| """ | |
| dir_path = Path(backup_dir).resolve() | |
| if not dir_path.is_dir(): | |
| return | |
| # البحث عن أحدث نسخة احتياطية | |
| backups = self.list_backups(dir_path) | |
| if not backups: | |
| logger.info("لا توجد نسخ احتياطية سابقة لتحميل التجزئات") | |
| return | |
| latest = backups[0] | |
| latest_path = Path(latest["path"]) | |
| if latest["type"] == "zip": | |
| hashes = self._read_hashes_from_zip(latest_path) | |
| elif latest["type"] == "directory": | |
| hashes = self._read_hashes_from_dir(latest_path) | |
| else: | |
| return | |
| if hashes: | |
| self._file_hashes = hashes | |
| logger.info("تم تحميل %d تجزئة من النسخة السابقة", len(hashes)) | |
| def _read_hashes_from_zip(self, zip_path: Path) -> dict[str, str]: | |
| """يقرأ تجزئات الملفات من أرشيف ZIP.""" | |
| hashes: dict[str, str] = {} | |
| try: | |
| with zipfile.ZipFile(zip_path, "r") as zf: | |
| manifest_name = self.MANIFEST_FILENAME | |
| if manifest_name in zf.namelist(): | |
| data = json.loads(zf.read(manifest_name)) | |
| hashes = data.get("file_hashes", {}) | |
| except (zipfile.BadZipFile, json.JSONDecodeError, KeyError) as exc: | |
| logger.warning("تعذرت قراءة التجزئات من ZIP: %s", exc) | |
| return hashes | |
| def _read_hashes_from_dir(self, dir_path: Path) -> dict[str, str]: | |
| """يقرأ تجزئات الملفات من مجلد نسخة احتياطية.""" | |
| manifest_path = dir_path / self.MANIFEST_FILENAME | |
| if manifest_path.is_file(): | |
| try: | |
| data = json.loads(manifest_path.read_text(encoding="utf-8")) | |
| return data.get("file_hashes", {}) | |
| except (json.JSONDecodeError, OSError) as exc: | |
| logger.warning("تعذرت قراءة التجزئات من المجلد: %s", exc) | |
| return {} | |
| # =================================================================== | |
| # البيانات الوصفية (Manifest) | |
| # =================================================================== | |
| def _save_manifest(self, backup_path: Path, info: dict) -> None: | |
| """يحفظ بيانات وصفية للنسخة الاحتياطية.""" | |
| manifest = { | |
| "version": "1.0", | |
| "created": info["timestamp"], | |
| "source": info["source"], | |
| "total_files": info["total_files"], | |
| "total_size": info["total_size"], | |
| "compressed": info["compressed"], | |
| "incremental": info["incremental"], | |
| "label": info.get("label", ""), | |
| "file_hashes": dict(self._file_hashes) if self.incremental else {}, | |
| } | |
| manifest_json = json.dumps(manifest, ensure_ascii=False, indent=2) | |
| try: | |
| if backup_path.is_file() and backup_path.suffix.lower() == ".zip": | |
| # إضافة البيانات الوصفية إلى أرشيف ZIP | |
| with zipfile.ZipFile(backup_path, "a") as zf: | |
| zf.writestr(self.MANIFEST_FILENAME, manifest_json) | |
| else: | |
| # حفظ كملف في مجلد النسخة الاحتياطية | |
| manifest_file = backup_path / self.MANIFEST_FILENAME | |
| manifest_file.write_text(manifest_json, encoding="utf-8") | |
| except Exception as exc: | |
| logger.warning("تعذر حفظ البيانات الوصفية: %s", exc) | |
| def _read_manifest_from_zip(self, zip_path: Path) -> Optional[dict]: | |
| """يقرأ البيانات الوصفية من أرشيف ZIP.""" | |
| try: | |
| with zipfile.ZipFile(zip_path, "r") as zf: | |
| if self.MANIFEST_FILENAME in zf.namelist(): | |
| data = json.loads(zf.read(self.MANIFEST_FILENAME)) | |
| return data | |
| except (zipfile.BadZipFile, json.JSONDecodeError, OSError) as exc: | |
| logger.debug("تعذرت قراءة البيانات الوصفية: %s", exc) | |
| return None | |
| # =================================================================== | |
| # أدوات مساعدة | |
| # =================================================================== | |
| def _should_exclude(self, path: Path, base: Path) -> bool: | |
| """ | |
| يتحقق مما إذا كان يجب استبعاد الملف/المجلد. | |
| المعاملات: | |
| path: مسار الملف | |
| base: مجلد المصدر الأساسي | |
| المعاد: | |
| True إذا كان يجب الاستبعاد | |
| """ | |
| import fnmatch | |
| # استبعاد مجلدات معينة | |
| skip_dirs = { | |
| "__pycache__", ".git", ".svn", ".hg", "node_modules", | |
| ".tox", ".venv", "venv", "env", | |
| ".mypy_cache", ".pytest_cache", ".ruff_cache", | |
| "dist", "build", ".eggs", | |
| } | |
| rel = path.relative_to(base) | |
| for part in rel.parts: | |
| if part in skip_dirs: | |
| return True | |
| # استبعاد الملف المخفي للنسخ الاحتياطية | |
| if path.name == self.MANIFEST_FILENAME: | |
| return True | |
| # فحص أنماط الاستبعاد | |
| for pattern in self.exclude_patterns: | |
| try: | |
| if fnmatch.fnmatch(str(rel), pattern) or fnmatch.fnmatch(path.name, pattern): | |
| return True | |
| except Exception: | |
| continue | |
| return False | |
| def _format_size(size_bytes: int) -> str: | |
| """ | |
| يحوّل الحجم بالبايت إلى صيغة مقروءة. | |
| المعاملات: | |
| size_bytes: الحجم بالبايت | |
| المعاد: | |
| الحجم بصيغة مقروءة (مثل "15.3 MB") | |
| """ | |
| if size_bytes < 0: | |
| return "0 B" | |
| units = ["B", "KB", "MB", "GB", "TB"] | |
| unit_index = 0 | |
| size = float(size_bytes) | |
| while size >= 1024.0 and unit_index < len(units) - 1: | |
| size /= 1024.0 | |
| unit_index += 1 | |
| if unit_index == 0: | |
| return f"{int(size)} {units[unit_index]}" | |
| else: | |
| return f"{size:.1f} {units[unit_index]}" | |
| def get_operation_log(self) -> list[dict]: | |
| """ | |
| يعرض سجل جميع عمليات النسخ الاحتياطي. | |
| المعاد: | |
| قائمة بعمليات النسخ والاستعادة | |
| """ | |
| return list(self._operation_log) | |
| def clear_operation_log(self) -> None: | |
| """يمسح سجل العمليات.""" | |
| self._operation_log.clear() | |
| logger.info("تم مسح سجل العمليات") | |