OmniFile-Processor / modules /security /backup_manager.py
Dr. Abdulmalek
deploy: OmniFile AI Processor v4.3.0
900df0b
"""
مدير النسخ الاحتياطية (Backup Manager)
=========================================
ينشئ نسخاً احتياطية مُرقّمة لمشروع البيانات مع دعم النسخ التزايدي
والاستعادة والتنظيف.
القدرات:
- إنشاء نسخ احتياطية مرقّمة بالتاريخ والوقت
- إنشاء نسخ مضغوطة (ZIP)
- دعم النسخ التزايدي (الملفات المعدّلة فقط)
- استعادة من نسخة احتياطية
- عرض قائمة النسخ الاحتياطية
- تنظيف النسخ القديمة
- تسجيل جميع العمليات
"""
import hashlib
import json
import logging
import shutil
import tarfile
import zipfile
from datetime import datetime
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
class BackupManager:
"""
مدير النسخ الاحتياطية — ينشئ ويستعيد النسخ الاحتياطية المرقّمة.
الاستخدام:
manager = BackupManager()
backup_path = manager.create_backup("/my/project", "/backups")
manager.restore_backup(backup_path, "/restore/target")
"""
# ======== ثوابت ========
MANIFEST_FILENAME: str = ".backup_manifest.json"
HASH_ALGORITHM: str = "sha256"
BACKUP_TIMESTAMP_FORMAT: str = "%Y%m%d_%H%M%S"
DEFAULT_MAX_SIZE_ZIP: int = 4 * 1024 * 1024 * 1024 # 4 جيجابايت
def __init__(
self,
compress: bool = True,
incremental: bool = False,
exclude_patterns: Optional[list[str]] = None,
log_file: Optional[str] = None,
) -> None:
"""
تهيئة مدير النسخ الاحتياطية.
المعاملات:
compress: ضغط النسخة الاحتياطية كـ ZIP
incremental: تفعيل النسخ التزايدي (الملفات المعدّلة فقط)
exclude_patterns: أنماط glob لاستبعاد ملفات معينة
log_file: مسار ملف سجل العمليات (اختياري)
"""
self.compress: bool = compress
self.incremental: bool = incremental
self.exclude_patterns: list[str] = exclude_patterns or []
# إعداد تسجيل مخصص
self._operation_log: list[dict] = []
if log_file:
try:
file_handler = logging.FileHandler(log_file, encoding="utf-8")
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(
logging.Formatter(
"%(asctime)s | %(levelname)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
)
logger.addHandler(file_handler)
logger.info("تم تفعيل تسجيل العمليات في: %s", log_file)
except PermissionError as exc:
logger.warning("لا صلاحية لإنشاء ملف السجل: %s", exc)
# تخزين التجزئات للنسخ التزايدي
self._file_hashes: dict[str, str] = {}
logger.info(
"تم تهيئة مدير النسخ الاحتياطية — ضغط: %s | تزايدي: %s",
compress, incremental,
)
# ===================================================================
# إنشاء نسخة احتياطية
# ===================================================================
def create_backup(
self,
source_dir: str | Path,
backup_dir: str | Path,
label: Optional[str] = None,
) -> str:
"""
ينشئ نسخة احتياطية كاملة لمجلد المصدر.
المعاملات:
source_dir: مجلد المصدر المراد نسخه
backup_dir: مجلد حفظ النسخ الاحتياطية
label: تسمية اختيارية للنسخة
المعاد:
مسار النسخة الاحتياطية المنشأة
"""
source = Path(source_dir).resolve()
dest = Path(backup_dir).resolve()
if not source.is_dir():
raise FileNotFoundError(f"مجلد المصدر غير موجود: {source}")
# إنشاء مجلد النسخ الاحتياطية
try:
dest.mkdir(parents=True, exist_ok=True)
except PermissionError as exc:
raise PermissionError(f"لا صلاحية لإنشاء {dest}: {exc}") from exc
# اسم النسخة الاحتياطية
timestamp = datetime.now().strftime(self.BACKUP_TIMESTAMP_FORMAT)
source_name = source.name
label_suffix = f"_{label}" if label else ""
if self.compress:
backup_filename = f"{source_name}_backup_{timestamp}{label_suffix}.zip"
backup_path = dest / backup_filename
else:
backup_filename = f"{source_name}_backup_{timestamp}{label_suffix}"
backup_path = dest / backup_filename
logger.info("بدء إنشاء نسخة احتياطية: %s", backup_path.name)
start_time = datetime.now()
total_files = 0
total_size = 0
skipped = 0
try:
if self.compress:
total_files, total_size, skipped = self._create_zip_backup(
source, backup_path,
)
else:
total_files, total_size, skipped = self._create_dir_backup(
source, backup_path,
)
except Exception as exc:
logger.error("فشل إنشاء النسخة الاحتياطية: %s", exc)
# حذف الملف الجزئي
if backup_path.exists():
try:
if backup_path.is_dir():
shutil.rmtree(backup_path)
else:
backup_path.unlink()
except OSError:
pass
raise
elapsed = (datetime.now() - start_time).total_seconds()
# تسجيل العملية
operation = {
"timestamp": start_time.isoformat(),
"action": "create_backup",
"source": str(source),
"backup_path": str(backup_path.resolve()),
"total_files": total_files,
"total_size": total_size,
"skipped": skipped,
"elapsed_seconds": round(elapsed, 2),
"compressed": self.compress,
"incremental": self.incremental,
"label": label,
}
self._operation_log.append(operation)
self._save_manifest(backup_path, operation)
logger.info(
"تم إنشاء نسخة احتياطية بنجاح — المسار: %s | الملفات: %d | الحجم: %s | الوقت: %.1f ثانية",
backup_path, total_files, self._format_size(total_size), elapsed,
)
return str(backup_path.resolve())
def _create_zip_backup(
self,
source: Path,
backup_path: Path,
) -> tuple[int, int, int]:
"""
ينشئ نسخة احتياطية مضغوطة كـ ZIP.
المعاد:
(عدد_الملفات، الحجم_الإجمالي، عدد_المتخطّى)
"""
total_files = 0
total_size = 0
skipped = 0
# جمع الملفات المراد نسخها
files_to_backup: list[Path] = []
for item in source.rglob("*"):
if not item.is_file():
continue
if self._should_exclude(item, source):
skipped += 1
continue
if self.incremental and not self._is_file_changed(item, source):
skipped += 1
continue
files_to_backup.append(item)
# إنشاء أرشيف ZIP
try:
with zipfile.ZipFile(
backup_path, "w",
zipfile.ZIP_DEFLATED,
compresslevel=6,
) as zf:
for file_path in files_to_backup:
try:
arcname = file_path.relative_to(source)
zf.write(file_path, arcname)
file_size = file_path.stat().st_size
total_size += file_size
total_files += 1
# تحديث التجزئة
if self.incremental:
rel = str(arcname)
self._file_hashes[rel] = self._hash_file(file_path)
except PermissionError as exc:
logger.warning("تخطي %s (لا صلاحية): %s", file_path, exc)
skipped += 1
except OSError as exc:
logger.warning("تخطي %s (خطأ): %s", file_path, exc)
skipped += 1
except PermissionError as exc:
raise PermissionError(f"لا صلاحية لإنشاء {backup_path}: {exc}") from exc
return total_files, total_size, skipped
def _create_dir_backup(
self,
source: Path,
backup_path: Path,
) -> tuple[int, int, int]:
"""
ينشئ نسخة احتياطية كمجلد (بدون ضغط).
المعاد:
(عدد_الملفات، الحجم_الإجمالي، عدد_المتخطّى)
"""
total_files = 0
total_size = 0
skipped = 0
try:
backup_path.mkdir(parents=True, exist_ok=True)
except PermissionError as exc:
raise PermissionError(f"لا صلاحية لإنشاء {backup_path}: {exc}") from exc
for item in source.rglob("*"):
if self._should_exclude(item, source):
skipped += 1
continue
rel = item.relative_to(source)
target = backup_path / rel
try:
if item.is_dir():
target.mkdir(parents=True, exist_ok=True)
elif item.is_file():
if self.incremental and not self._is_file_changed(item, source):
skipped += 1
continue
target.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(item, target)
file_size = item.stat().st_size
total_size += file_size
total_files += 1
if self.incremental:
self._file_hashes[str(rel)] = self._hash_file(item)
except PermissionError as exc:
logger.warning("تخطي %s (لا صلاحية): %s", item, exc)
skipped += 1
except OSError as exc:
logger.warning("تخطي %s (خطأ): %s", item, exc)
skipped += 1
return total_files, total_size, skipped
# ===================================================================
# استعادة نسخة احتياطية
# ===================================================================
def restore_backup(
self,
backup_path: str | Path,
target_dir: str | Path,
overwrite: bool = False,
) -> dict:
"""
يستعيد نسخة احتياطية إلى مجلد الهدف.
المعاملات:
backup_path: مسار النسخة الاحتياطية
target_dir: مجلد الاستعادة
overwrite: الكتابة فوق الملفات الموجودة
المعاد:
تقرير الاستعادة:
{"restored_files": int, "skipped": int, "errors": list}
"""
backup = Path(backup_path).resolve()
target = Path(target_dir).resolve()
if not backup.exists():
raise FileNotFoundError(f"النسخة الاحتياطية غير موجودة: {backup}")
# إنشاء مجلد الهدف
try:
target.mkdir(parents=True, exist_ok=True)
except PermissionError as exc:
raise PermissionError(f"لا صلاحية لإنشاء {target}: {exc}") from exc
report: dict = {
"restored_files": 0,
"skipped": 0,
"errors": [],
}
start_time = datetime.now()
logger.info("بدء استعادة من: %s إلى %s", backup.name, target)
try:
if backup.is_file() and backup.suffix.lower() == ".zip":
self._restore_from_zip(backup, target, overwrite, report)
elif backup.is_dir():
self._restore_from_dir(backup, target, overwrite, report)
else:
raise ValueError(f"نوع نسخة احتياطية غير معروف: {backup}")
except Exception as exc:
logger.error("فشل استعادة النسخة الاحتياطية: %s", exc)
raise
elapsed = (datetime.now() - start_time).total_seconds()
# تسجيل العملية
operation = {
"timestamp": start_time.isoformat(),
"action": "restore_backup",
"backup_path": str(backup),
"target": str(target),
"restored_files": report["restored_files"],
"skipped": report["skipped"],
"errors_count": len(report["errors"]),
"elapsed_seconds": round(elapsed, 2),
}
self._operation_log.append(operation)
logger.info(
"اكتملت الاستعادة — مستعادة: %d | متخطاة: %d | أخطاء: %d | الوقت: %.1f ثانية",
report["restored_files"], report["skipped"],
len(report["errors"]), elapsed,
)
return report
def _restore_from_zip(
self,
backup: Path,
target: Path,
overwrite: bool,
report: dict,
) -> None:
"""يستعيد من أرشيف ZIP."""
try:
with zipfile.ZipFile(backup, "r") as zf:
for info in zf.infolist():
if info.is_dir():
(target / info.filename).mkdir(parents=True, exist_ok=True)
continue
# حماية من Path Traversal
member_path = Path(info.filename)
if member_path.is_absolute() or ".." in member_path.parts:
report["errors"].append({"file": info.filename, "error": "مسار خطر"})
continue
dest_file = target / info.filename
if dest_file.exists() and not overwrite:
report["skipped"] += 1
continue
try:
dest_file.parent.mkdir(parents=True, exist_ok=True)
with zf.open(info) as src, open(dest_file, "wb") as dst:
shutil.copyfileobj(src, dst)
report["restored_files"] += 1
except PermissionError as exc:
report["errors"].append({"file": info.filename, "error": str(exc)})
except OSError as exc:
report["errors"].append({"file": info.filename, "error": str(exc)})
except zipfile.BadZipFile:
raise ValueError(f"ملف ZIP تالف: {backup}")
def _restore_from_dir(
self,
backup: Path,
target: Path,
overwrite: bool,
report: dict,
) -> None:
"""يستعيذ من مجلد نسخة احتياطية."""
for item in backup.rglob("*"):
rel = item.relative_to(backup)
dest = target / rel
try:
if item.is_dir():
dest.mkdir(parents=True, exist_ok=True)
elif item.is_file():
if dest.exists() and not overwrite:
report["skipped"] += 1
continue
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(item, dest)
report["restored_files"] += 1
except PermissionError as exc:
report["errors"].append({"file": str(item), "error": str(exc)})
except OSError as exc:
report["errors"].append({"file": str(item), "error": str(exc)})
# ===================================================================
# عرض النسخ الاحتياطية
# ===================================================================
def list_backups(self, backup_dir: str | Path) -> list[dict]:
"""
يعرض قائمة بجميع النسخ الاحتياطية في مجلد.
المعاملات:
backup_dir: مجلد النسخ الاحتياطية
المعاد:
قائمة بمعلومات النسخ الاحتياطية:
[{"name": str, "path": str, "size": int, "created": str, "type": str}, ...]
"""
dir_path = Path(backup_dir).resolve()
if not dir_path.is_dir():
raise FileNotFoundError(f"مجلد النسخ الاحتياطية غير موجود: {dir_path}")
backups: list[dict] = []
for item in sorted(dir_path.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True):
try:
if item.is_file() and item.suffix.lower() == ".zip":
size = item.stat().st_size
created = datetime.fromtimestamp(item.stat().st_ctime).isoformat()
# محاولة قراءة البيانات الوصفية من ملف ZIP
manifest = self._read_manifest_from_zip(item)
extra_info = {}
if manifest:
extra_info = {
"source": manifest.get("source", ""),
"total_files": manifest.get("total_files", 0),
"label": manifest.get("label", ""),
}
backups.append({
"name": item.name,
"path": str(item.resolve()),
"size": size,
"size_formatted": self._format_size(size),
"created": created,
"type": "zip",
**extra_info,
})
elif item.is_dir() and "backup_" in item.name.lower():
# حساب حجم المجلد
size = sum(f.stat().st_size for f in item.rglob("*") if f.is_file())
created = datetime.fromtimestamp(item.stat().st_ctime).isoformat()
backups.append({
"name": item.name,
"path": str(item.resolve()),
"size": size,
"size_formatted": self._format_size(size),
"created": created,
"type": "directory",
})
except PermissionError as exc:
logger.warning("لا صلاحية لقراءة %s: %s", item, exc)
except OSError as exc:
logger.warning("خطأ في %s: %s", item, exc)
logger.info("عرض %d نسخة احتياطية في %s", len(backups), dir_path)
return backups
# ===================================================================
# تنظيف النسخ القديمة
# ===================================================================
def cleanup_old_backups(
self,
backup_dir: str | Path,
keep_last: int = 5,
dry_run: bool = False,
) -> dict:
"""
يحذف النسخ الاحتياطية القديمة مع الاحتفاظ بأحدثها.
المعاملات:
backup_dir: مجلد النسخ الاحتياطية
keep_last: عدد النسخ المراد الاحتفاظ بها
dry_run: عرض ما سيُحذف بدون حذف فعلًا
المعاد:
تقرير التنظيف:
{"kept": [str, ...], "deleted": [str, ...], "freed_space": int}
"""
dir_path = Path(backup_dir).resolve()
if not dir_path.is_dir():
raise FileNotFoundError(f"مجلد النسخ الاحتياطية غير موجود: {dir_path}")
all_backups = self.list_backups(dir_path)
report: dict = {
"kept": [],
"deleted": [],
"freed_space": 0,
}
if len(all_backups) <= keep_last:
report["kept"] = [b["path"] for b in all_backups]
logger.info("لا حاجة للتنظيف — النسخ (%d) ≤ الحد المطلوب (%d)", len(all_backups), keep_last)
return report
# النسخ المراد حذفها (الأقدم)
to_delete = all_backups[keep_last:]
to_keep = all_backups[:keep_last]
for backup_info in to_delete:
backup_path = Path(backup_info["path"])
report["deleted"].append(str(backup_path))
report["freed_space"] += backup_info["size"]
if not dry_run:
try:
if backup_path.is_dir():
shutil.rmtree(backup_path)
elif backup_path.is_file():
backup_path.unlink()
logger.info("تم حذف: %s", backup_path.name)
except PermissionError as exc:
logger.error("لا صلاحية لحذف %s: %s", backup_path, exc)
except OSError as exc:
logger.error("خطأ أثناء حذف %s: %s", backup_path, exc)
else:
logger.info("[محاكاة] سيتم حذف: %s", backup_path.name)
report["kept"] = [b["path"] for b in to_keep]
# تسجيل العملية
operation = {
"timestamp": datetime.now().isoformat(),
"action": "cleanup_old_backups",
"backup_dir": str(dir_path),
"keep_last": keep_last,
"deleted_count": len(report["deleted"]),
"freed_space": report["freed_space"],
"dry_run": dry_run,
}
self._operation_log.append(operation)
logger.info(
"اكتمل التنظيف — محذوف: %d | محفوظ: %d | مساحة محررة: %s",
len(report["deleted"]), len(report["kept"]),
self._format_size(report["freed_space"]),
)
return report
# ===================================================================
# حجم النسخة الاحتياطية
# ===================================================================
def get_backup_size(self, backup_path: str | Path) -> dict:
"""
يعرض معلومات الحجم لنسخة احتياطية.
المعاملات:
backup_path: مسار النسخة الاحتياطية
المعاد:
{"path": str, "size_bytes": int, "size_formatted": str, "type": str}
"""
path = Path(backup_path).resolve()
if not path.exists():
raise FileNotFoundError(f"النسخة الاحتياطية غير موجودة: {path}")
try:
if path.is_file():
size = path.stat().st_size
btype = "zip"
elif path.is_dir():
size = sum(f.stat().st_size for f in path.rglob("*") if f.is_file())
btype = "directory"
else:
return {"path": str(path), "size_bytes": 0, "size_formatted": "0 B", "type": "unknown"}
except PermissionError as exc:
logger.error("لا صلاحية لقراءة: %s", path)
raise
return {
"path": str(path),
"size_bytes": size,
"size_formatted": self._format_size(size),
"type": btype,
}
# ===================================================================
# بيانات التجزئة (للنسخ التزايدي)
# ===================================================================
def _hash_file(self, file_path: Path) -> str:
"""
يحسب تجزئة SHA-256 لملف.
المعاملات:
file_path: مسار الملف
المعاد:
سلسلة التجزئة السداسية عشرية
"""
hasher = hashlib.new(self.HASH_ALGORITHM)
try:
with open(file_path, "rb") as f:
# قراءة بأجزاء للمفاتيح الكبيرة
for chunk in iter(lambda: f.read(65536), b""):
hasher.update(chunk)
return hasher.hexdigest()
except PermissionError:
logger.warning("لا صلاحية لتجزئة: %s", file_path)
return ""
except OSError as exc:
logger.warning("خطأ أثناء تجزئة %s: %s", file_path, exc)
return ""
def _is_file_changed(self, file_path: Path, source: Path) -> bool:
"""
يتحقق مما إذا كان الملف قد تغيّر منذ آخر نسخة احتياطية.
المعاملات:
file_path: مسار الملف الحالي
source: مجلد المصدر
المعاد:
True إذا كان الملف جديداً أو معدّلاً
"""
rel = str(file_path.relative_to(source))
current_hash = self._hash_file(file_path)
if not current_hash:
return True # في حالة الخطأ، نأخذ الاحتياط
stored_hash = self._file_hashes.get(rel)
if stored_hash is None or stored_hash != current_hash:
return True
return False
def load_hashes(self, backup_dir: str | Path) -> None:
"""
يحمّل تجزئات الملفات من آخر نسخة احتياطية لتمكين النسخ التزايدي.
المعاملات:
backup_dir: مجلد النسخ الاحتياطية
"""
dir_path = Path(backup_dir).resolve()
if not dir_path.is_dir():
return
# البحث عن أحدث نسخة احتياطية
backups = self.list_backups(dir_path)
if not backups:
logger.info("لا توجد نسخ احتياطية سابقة لتحميل التجزئات")
return
latest = backups[0]
latest_path = Path(latest["path"])
if latest["type"] == "zip":
hashes = self._read_hashes_from_zip(latest_path)
elif latest["type"] == "directory":
hashes = self._read_hashes_from_dir(latest_path)
else:
return
if hashes:
self._file_hashes = hashes
logger.info("تم تحميل %d تجزئة من النسخة السابقة", len(hashes))
def _read_hashes_from_zip(self, zip_path: Path) -> dict[str, str]:
"""يقرأ تجزئات الملفات من أرشيف ZIP."""
hashes: dict[str, str] = {}
try:
with zipfile.ZipFile(zip_path, "r") as zf:
manifest_name = self.MANIFEST_FILENAME
if manifest_name in zf.namelist():
data = json.loads(zf.read(manifest_name))
hashes = data.get("file_hashes", {})
except (zipfile.BadZipFile, json.JSONDecodeError, KeyError) as exc:
logger.warning("تعذرت قراءة التجزئات من ZIP: %s", exc)
return hashes
def _read_hashes_from_dir(self, dir_path: Path) -> dict[str, str]:
"""يقرأ تجزئات الملفات من مجلد نسخة احتياطية."""
manifest_path = dir_path / self.MANIFEST_FILENAME
if manifest_path.is_file():
try:
data = json.loads(manifest_path.read_text(encoding="utf-8"))
return data.get("file_hashes", {})
except (json.JSONDecodeError, OSError) as exc:
logger.warning("تعذرت قراءة التجزئات من المجلد: %s", exc)
return {}
# ===================================================================
# البيانات الوصفية (Manifest)
# ===================================================================
def _save_manifest(self, backup_path: Path, info: dict) -> None:
"""يحفظ بيانات وصفية للنسخة الاحتياطية."""
manifest = {
"version": "1.0",
"created": info["timestamp"],
"source": info["source"],
"total_files": info["total_files"],
"total_size": info["total_size"],
"compressed": info["compressed"],
"incremental": info["incremental"],
"label": info.get("label", ""),
"file_hashes": dict(self._file_hashes) if self.incremental else {},
}
manifest_json = json.dumps(manifest, ensure_ascii=False, indent=2)
try:
if backup_path.is_file() and backup_path.suffix.lower() == ".zip":
# إضافة البيانات الوصفية إلى أرشيف ZIP
with zipfile.ZipFile(backup_path, "a") as zf:
zf.writestr(self.MANIFEST_FILENAME, manifest_json)
else:
# حفظ كملف في مجلد النسخة الاحتياطية
manifest_file = backup_path / self.MANIFEST_FILENAME
manifest_file.write_text(manifest_json, encoding="utf-8")
except Exception as exc:
logger.warning("تعذر حفظ البيانات الوصفية: %s", exc)
def _read_manifest_from_zip(self, zip_path: Path) -> Optional[dict]:
"""يقرأ البيانات الوصفية من أرشيف ZIP."""
try:
with zipfile.ZipFile(zip_path, "r") as zf:
if self.MANIFEST_FILENAME in zf.namelist():
data = json.loads(zf.read(self.MANIFEST_FILENAME))
return data
except (zipfile.BadZipFile, json.JSONDecodeError, OSError) as exc:
logger.debug("تعذرت قراءة البيانات الوصفية: %s", exc)
return None
# ===================================================================
# أدوات مساعدة
# ===================================================================
def _should_exclude(self, path: Path, base: Path) -> bool:
"""
يتحقق مما إذا كان يجب استبعاد الملف/المجلد.
المعاملات:
path: مسار الملف
base: مجلد المصدر الأساسي
المعاد:
True إذا كان يجب الاستبعاد
"""
import fnmatch
# استبعاد مجلدات معينة
skip_dirs = {
"__pycache__", ".git", ".svn", ".hg", "node_modules",
".tox", ".venv", "venv", "env",
".mypy_cache", ".pytest_cache", ".ruff_cache",
"dist", "build", ".eggs",
}
rel = path.relative_to(base)
for part in rel.parts:
if part in skip_dirs:
return True
# استبعاد الملف المخفي للنسخ الاحتياطية
if path.name == self.MANIFEST_FILENAME:
return True
# فحص أنماط الاستبعاد
for pattern in self.exclude_patterns:
try:
if fnmatch.fnmatch(str(rel), pattern) or fnmatch.fnmatch(path.name, pattern):
return True
except Exception:
continue
return False
@staticmethod
def _format_size(size_bytes: int) -> str:
"""
يحوّل الحجم بالبايت إلى صيغة مقروءة.
المعاملات:
size_bytes: الحجم بالبايت
المعاد:
الحجم بصيغة مقروءة (مثل "15.3 MB")
"""
if size_bytes < 0:
return "0 B"
units = ["B", "KB", "MB", "GB", "TB"]
unit_index = 0
size = float(size_bytes)
while size >= 1024.0 and unit_index < len(units) - 1:
size /= 1024.0
unit_index += 1
if unit_index == 0:
return f"{int(size)} {units[unit_index]}"
else:
return f"{size:.1f} {units[unit_index]}"
def get_operation_log(self) -> list[dict]:
"""
يعرض سجل جميع عمليات النسخ الاحتياطي.
المعاد:
قائمة بعمليات النسخ والاستعادة
"""
return list(self._operation_log)
def clear_operation_log(self) -> None:
"""يمسح سجل العمليات."""
self._operation_log.clear()
logger.info("تم مسح سجل العمليات")