""" معالج الأرشيفات (Archive Handler) ==================================== يتعامل مع الأرشيفات المضغوطة بما في ذلك المحمية بكلمات مرور. القدرات: - استخراج الأرشيفات (zip, tar.gz, tar.bz2, 7z, rar) - إنشاء أرشيفات محمية بكلمات مرور - كشف نوع الأرشيف تلقائياً - التعامل مع الأرشيفات المتداخلة - عرض محتويات الأرشيف - كشف الحماية بكلمة مرور """ import logging import os import shutil import subprocess import tarfile import zipfile from pathlib import Path from typing import Optional, Callable logger = logging.getLogger(__name__) class ArchiveHandler: """ معالج الأرشيفات — يستخرج وينشئ أرشيفات متنوعة مع دعم كلمات المرور. الاستخدام: handler = ArchiveHandler() files = handler.extract_archive("backup.zip", "output/", password="123") """ # ======== الأنواع المدعومة ======== SUPPORTED_EXTENSIONS: set[str] = { ".zip", ".tar", ".gz", ".bz2", ".xz", ".7z", ".rar", ".tgz", ".tar.gz", ".tar.bz2", ".tar.xz", ".lzma", ".tbz2", ".txz", } # توقيعات سحرية لتحديد النوع MAGIC_TYPES: list[tuple[bytes, str]] = [ (b"PK\x03\x04", "zip"), (b"\x1f\x8b", "gzip"), (b"7z\xbc\xaf\x27\x1c", "7z"), (b"Rar!\x1a\x07", "rar"), (b"BZh", "bzip2"), (b"\xfd7zXZ\x00", "xz"), ] def __init__( self, progress_callback: Optional[Callable[[str, int, int], None]] = None, max_nested_depth: int = 5, ) -> None: """ تهيئة معالج الأرشيفات. المعاملات: progress_callback: دالة تُستدعى أثناء التقدم (اسم_الملف، الحالي، الإجمالي) max_nested_depth: أقصى عمق للأرشيفات المتداخلة """ self.progress_callback: Optional[Callable[[str, int, int], None]] = progress_callback self.max_nested_depth: int = max_nested_depth logger.info("تم تهيئة معالج الأرشيفات (أقصى عمق تداخل: %d)", max_nested_depth) # =================================================================== # كشف نوع الأرشيف # =================================================================== def detect_archive_type(self, archive_path: str | Path) -> str: """ يكشف نوع الأرشيف من الامتداد والتوقيع السحري. المعاملات: archive_path: مسار الأرشيف المعاد: نوع الأرشيف: 'zip', 'tar', 'tar.gz', 'tar.bz2', 'tar.xz', '7z', 'rar', أو 'unknown' """ path = Path(archive_path) # 1) كشف بالامتداد name_lower = path.name.lower() ext_map = { ".zip": "zip", ".tar": "tar", ".tar.gz": "tar.gz", ".tgz": "tar.gz", ".tar.bz2": "tar.bz2", ".tbz2": "tar.bz2", ".tar.xz": "tar.xz", ".txz": "tar.xz", ".gz": "gzip", ".bz2": "bzip2", ".xz": "xz", ".7z": "7z", ".rar": "rar", } # فحص الامتدادات المركبة أولاً for compound_ext in (".tar.gz", ".tar.bz2", ".tar.xz"): if name_lower.endswith(compound_ext): detected = ext_map[compound_ext] logger.debug("كشف نوع أرشيف %s -> %s (امتداد مركب)", path.name, detected) return detected suffix = path.suffix.lower() detected = ext_map.get(suffix) if detected: logger.debug("كشف نوع أرشيف %s -> %s (بالامتداد)", path.name, detected) return detected # 2) كشف بالتوقيع السحري try: with open(path, "rb") as f: header = f.read(16) for magic, atype in self.MAGIC_TYPES: if header.startswith(magic): logger.debug("كشف نوع أرشيف %s -> %s (بالتوقيع السحري)", path.name, atype) return atype except PermissionError: logger.warning("لا صلاحية لقراءة: %s", path) except OSError as exc: logger.warning("خطأ أثناء قراءة %s: %s", path, exc) logger.warning("نوع أرشيف غير معروف: %s", path.name) return "unknown" # =================================================================== # عرض المحتويات # =================================================================== def list_contents(self, archive_path: str | Path) -> list[dict]: """ يعرض قائمة بملفات الأرشيف مع معلوماتها. المعاملات: archive_path: مسار الأرشيف المعاد: قائمة بقواميس: [{"name": str, "size": int, "is_dir": bool, "date_time": tuple}, ...] """ path = Path(archive_path) if not path.exists(): raise FileNotFoundError(f"الأرشيف غير موجود: {path}") archive_type = self.detect_archive_type(path) contents: list[dict] = [] try: if archive_type == "zip": contents = self._list_zip(path) elif archive_type in ("tar", "tar.gz", "tar.bz2", "tar.xz"): contents = self._list_tar(path) elif archive_type == "7z": contents = self._list_7z(path) elif archive_type == "rar": contents = self._list_rar(path) else: logger.warning("نوع أرشيف غير مدعوم للعرض: %s", archive_type) except Exception as exc: logger.error("خطأ أثناء عرض محتويات %s: %s", path, exc) raise logger.info("عرض محتويات %s: %d ملف", path.name, len(contents)) return contents def _list_zip(self, path: Path) -> list[dict]: """يعرض محتويات أرشيف ZIP.""" contents = [] try: with zipfile.ZipFile(path, "r") as zf: for info in zf.infolist(): contents.append({ "name": info.filename, "size": info.file_size, "compressed_size": info.compress_size, "is_dir": info.is_dir(), "date_time": info.date_time, }) except zipfile.BadZipFile: logger.error("ملف ZIP تالف: %s", path) raise return contents def _list_tar(self, path: Path) -> list[dict]: """يعرض محتويات أرشيف TAR.""" contents = [] try: with tarfile.open(path, "r:*") as tf: for member in tf.getmembers(): contents.append({ "name": member.name, "size": member.size, "is_dir": member.isdir(), "date_time": member.mtime, "mode": member.mode, }) except tarfile.TarError as exc: logger.error("خطأ في أرشيف TAR %s: %s", path, exc) raise return contents def _list_7z(self, path: Path) -> list[dict]: """يعرض محتويات أرشيف 7Z (يتطلب 7z command).""" try: result = subprocess.run( ["7z", "l", "-slt", str(path)], capture_output=True, text=True, timeout=60, ) except FileNotFoundError: logger.error("الأمر '7z' غير متوفر. قم بتثبيت p7zip-full") raise RuntimeError("الأمر '7z' غير متوفر في النظام") except subprocess.TimeoutExpired: logger.error("انتهت مهلة عرض محتويات 7z") raise if result.returncode != 0: logger.error("خطأ في 7z: %s", result.stderr) raise RuntimeError(f"خطأ في 7z: {result.stderr}") contents = [] current_file: dict = {} for line in result.stdout.splitlines(): line = line.strip() if line.startswith("---"): if current_file.get("name"): contents.append(current_file) current_file = {} elif line.startswith("Path = "): current_file["name"] = line[7:] elif line.startswith("Size = "): try: current_file["size"] = int(line[7:]) except ValueError: current_file["size"] = 0 elif line.startswith("Folder = "): current_file["is_dir"] = line[9:] == "1" elif line.startswith("Attributes = "): current_file["is_dir"] = current_file.get("is_dir", "D" in line[13:]) if current_file.get("name"): contents.append(current_file) # تعبئة الحقول الافتراضية for item in contents: item.setdefault("size", 0) item.setdefault("is_dir", False) return contents def _list_rar(self, path: Path) -> list[dict]: """يعرض محتويات أرشيف RAR (يتطلب unrar).""" try: result = subprocess.run( ["unrar", "lt", "-p-", str(path)], capture_output=True, text=True, timeout=60, ) except FileNotFoundError: logger.error("الأمر 'unrar' غير متوفر") raise RuntimeError("الأمر 'unrar' غير متوفر في النظام") except subprocess.TimeoutExpired: logger.error("انتهت مهلة عرض محتويات RAR") raise if result.returncode not in (0, 10): # 10 = هناك تحذيرات logger.error("خطأ في unrar: %s", result.stderr) raise RuntimeError(f"خطأ في unrar: {result.stderr}") contents = [] for line in result.stdout.splitlines(): parts = line.split() if len(parts) >= 6 and parts[0].isdigit(): try: size = int(parts[1].replace(",", "")) except ValueError: size = 0 name = " ".join(parts[5:]) is_dir = parts[-1].upper() == "D" if parts else False contents.append({ "name": name, "size": size, "is_dir": is_dir, }) return contents # =================================================================== # فحص الحماية بكلمة مرور # =================================================================== def is_password_protected(self, archive_path: str | Path) -> bool: """ يتحقق مما إذا كان الأرشيف محمياً بكلمة مرور. المعاملات: archive_path: مسار الأرشيف المعاد: True إذا كان محمياً، False إذا لم يكن كذلك """ path = Path(archive_path) archive_type = self.detect_archive_type(path) try: if archive_type == "zip": return self._zip_is_encrypted(path) elif archive_type in ("tar", "tar.gz", "tar.bz2", "tar.xz"): # أرشيفات TAR عادية لا تدعم كلمات المرور # لكن يمكن حمايتها ببرامج خارجية return False elif archive_type == "7z": return self._7z_is_encrypted(path) elif archive_type == "rar": return self._rar_is_encrypted(path) else: logger.warning("لا يمكن فحص الحماية للنوع: %s", archive_type) return False except Exception as exc: logger.error("خطأ أثناء فحص الحماية: %s", exc) return False def _zip_is_encrypted(self, path: Path) -> bool: """يتحقق مما إذا كان ZIP محمياً.""" try: with zipfile.ZipFile(path, "r") as zf: for info in zf.infolist(): if info.flag_bits & 0x1: # بت التشفير return True except zipfile.BadZipFile: logger.error("ملف ZIP تالف: %s", path) return False def _7z_is_encrypted(self, path: Path) -> bool: """يتحقق مما إذا كان 7Z محمياً.""" try: result = subprocess.run( ["7z", "l", "-slt", str(path)], capture_output=True, text=True, timeout=30, ) return "Encrypted = +" in result.stdout except (FileNotFoundError, subprocess.TimeoutExpired): return False def _rar_is_encrypted(self, path: Path) -> bool: """يتحقق مما إذا كان RAR محمياً.""" try: result = subprocess.run( ["unrar", "lt", "-p-", str(path)], capture_output=True, text=True, timeout=30, ) return "*" in result.stdout.splitlines()[0] if result.stdout else False except (FileNotFoundError, subprocess.TimeoutExpired): return False # =================================================================== # استخراج الأرشيفات # =================================================================== def extract_archive( self, archive_path: str | Path, dest_dir: str | Path, password: Optional[str] = None, extract_nested: bool = False, ) -> list[str]: """ يستخرج أرشيفاً إلى مجلد الوجهة. المعاملات: archive_path: مسار الأرشيف dest_dir: مجلد الوجهة password: كلمة المرور (إذا كان محمياً) extract_nested: استخراج الأرشيفات المتداخلة المعاد: قائمة بمسارات الملفات المستخرجة """ path = Path(archive_path) dest = Path(dest_dir) if not path.exists(): raise FileNotFoundError(f"الأرشيف غير موجود: {path}") # إنشاء مجلد الوجهة try: dest.mkdir(parents=True, exist_ok=True) except PermissionError as exc: raise PermissionError(f"لا صلاحية لإنشاء {dest}: {exc}") from exc archive_type = self.detect_archive_type(path) extracted_files: list[str] = [] logger.info( "استخراج %s (النوع: %s) إلى %s", path.name, archive_type, dest, ) try: if archive_type == "zip": extracted_files = self._extract_zip(path, dest, password) elif archive_type in ("tar", "tar.gz", "tar.bz2", "tar.xz"): extracted_files = self._extract_tar(path, dest) elif archive_type == "7z": extracted_files = self._extract_7z(path, dest, password) elif archive_type == "rar": extracted_files = self._extract_rar(path, dest, password) else: raise ValueError(f"نوع أرشيف غير مدعوم: {archive_type}") except Exception as exc: logger.error("فشل استخراج %s: %s", path, exc) raise # استخراج الأرشيفات المتداخلة if extract_nested: nested_files = self._extract_nested(dest, depth=1) extracted_files.extend(nested_files) logger.info("تم استخراج %d ملف من %s", len(extracted_files), path.name) return extracted_files def _extract_zip( self, path: Path, dest: Path, password: Optional[str], ) -> list[str]: """يستخرج أرشيف ZIP.""" extracted: list[str] = [] pwd_bytes = password.encode("utf-8") if password else None try: with zipfile.ZipFile(path, "r") as zf: members = zf.infolist() total = len(members) for i, member in enumerate(members, 1): if self.progress_callback: self.progress_callback(member.filename, i, total) try: # حماية من Path Traversal member_path = Path(member.filename) if member_path.is_absolute() or ".." in member.parts: logger.warning("تخطي مسار خطر: %s", member.filename) continue target = dest / member.filename if member.is_dir(): target.mkdir(parents=True, exist_ok=True) continue target.parent.mkdir(parents=True, exist_ok=True) if pwd_bytes: zf.extract(member, dest, pwd=pwd_bytes) else: # محاولة بدون كلمة مرور try: zf.extract(member, dest) except RuntimeError: raise RuntimeError( "الأرشيف محمي بكلمة مرور. يرجى توفير كلمة المرور." ) extracted.append(str(target.resolve())) except RuntimeError as exc: if "password" in str(exc).lower() or "decrypt" in str(exc).lower(): raise RuntimeError( "الأرشيف محمي بكلمة مرور. يرجى توفير كلمة المرور." ) from exc raise except zipfile.BadZipFile: logger.error("ملف ZIP تالف: %s", path) raise return extracted def _extract_tar(self, path: Path, dest: Path) -> list[str]: """يستخرج أرشيف TAR.""" extracted: list[str] = [] try: with tarfile.open(path, "r:*") as tf: members = tf.getmembers() total = len(members) for i, member in enumerate(members, 1): if self.progress_callback: self.progress_callback(member.name, i, total) # حماية من Path Traversal member_path = Path(member.name) if member_path.is_absolute() or ".." in member_path.parts: logger.warning("تخطي مسار خطر في TAR: %s", member.name) continue try: tf.extract(member, dest) extracted.append(str((dest / member.name).resolve())) except PermissionError as exc: logger.warning("لا صلاحية لاستخراج %s: %s", member.name, exc) except OSError as exc: logger.warning("خطأ أثناء استخراج %s: %s", member.name, exc) except tarfile.TarError as exc: logger.error("خطأ في أرشيف TAR %s: %s", path, exc) raise return extracted def _extract_7z( self, path: Path, dest: Path, password: Optional[str], ) -> list[str]: """يستخرج أرشيف 7Z.""" cmd = ["7z", "x", f"-o{dest}", "-aoa", str(path)] if password: cmd.append(f"-p{password}") try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=300, ) except FileNotFoundError: raise RuntimeError("الأمر '7z' غير متوفر. قم بتثبيت p7zip-full") except subprocess.TimeoutExpired: raise RuntimeError("انتهت مهلة استخراج 7z") if result.returncode != 0: if password is None and "Wrong password" in result.stderr: raise RuntimeError("كلمة المرور خاطئة أو الأرشيف محمي بكلمة مرور") raise RuntimeError(f"خطأ في 7z: {result.stderr}") # جمع الملفات المستخرجة extracted = [ str(f.resolve()) for f in dest.rglob("*") if f.is_file() ] return extracted def _extract_rar( self, path: Path, dest: Path, password: Optional[str], ) -> list[str]: """يستخرج أرشيف RAR.""" cmd = ["unrar", "x", "-o+", f"{dest}/", str(path)] if password: cmd.insert(2, f"-p{password}") else: cmd.insert(2, "-p-") try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=300, ) except FileNotFoundError: raise RuntimeError("الأمر 'unrar' غير متوفر") except subprocess.TimeoutExpired: raise RuntimeError("انتهت مهلة استخراج RAR") if result.returncode not in (0, 10): if password is None: raise RuntimeError("الأرشيف محمي بكلمة مرور أو تالف") raise RuntimeError(f"خطأ في unrar: {result.stderr}") extracted = [ str(f.resolve()) for f in dest.rglob("*") if f.is_file() ] return extracted def _extract_nested(self, directory: Path, depth: int) -> list[str]: """ يبحث عن أرشيفات متداخلة ويستخرجها. المعاملات: directory: المجلد المراد فحصه depth: عمق التداخل الحالي المعاد: قائمة بالملفات المستخرجة من الأرشيفات المتداخلة """ if depth > self.max_nested_depth: logger.warning("تم تجاوز أقصى عمق تداخل: %d", self.max_nested_depth) return [] nested_extracted: list[str] = [] for item in directory.rglob("*"): if not item.is_file(): continue archive_type = self.detect_archive_type(item) if archive_type == "unknown": continue logger.info( "اكتشاف أرشيف متداخل (العمق %d): %s", depth, item.name, ) # مجلد خاص للأرشيف المتداخل nested_dest = item.parent / f"{item.stem}_extracted" try: files = self.extract_archive( item, nested_dest, extract_nested=False, ) nested_extracted.extend(files) # حذف الأرشيف المتداخل بعد الاستخراج try: item.unlink() logger.debug("تم حذف الأرشيف المتداخل: %s", item.name) except OSError as exc: logger.warning("تعذر حذف %s: %s", item.name, exc) # استمرار البحث في المجلد الجديد nested_extracted.extend( self._extract_nested(nested_dest, depth + 1) ) except Exception as exc: logger.warning( "فشل استخراج الأرشيف المتداخل %s: %s", item.name, exc, ) return nested_extracted # =================================================================== # إنشاء الأرشيفات # =================================================================== def create_archive( self, files: list[str | Path], output_path: str | Path, password: Optional[str] = None, archive_type: Optional[str] = None, ) -> str: """ ينشئ أرشيفاً من قائمة ملفات. المعاملات: files: قائمة مسارات الملفات/Mجلدات output_path: مسار الأرشيف الناتج password: كلمة المرور (اختياري) archive_type: نوع الأرشيف ('zip', 'tar.gz', '7z'). يُكتشف تلقائياً إن لم يحدد. المعاد: مسار الأرشيف المنشأ """ output = Path(output_path) total = len(files) # كشف النوع if archive_type is None: suffix = output.suffix.lower() type_by_ext = { ".zip": "zip", ".tar.gz": "tar.gz", ".tgz": "tar.gz", ".tar.bz2": "tar.bz2", ".tbz2": "tar.bz2", ".tar.xz": "tar.xz", ".txz": "tar.xz", ".7z": "7z", } for ext, atype in sorted(type_by_ext.items(), key=lambda x: -len(x[0])): if output.name.lower().endswith(ext): archive_type = atype break if archive_type is None: archive_type = "zip" # إنشاء المجلد الأب output.parent.mkdir(parents=True, exist_ok=True) logger.info( "إنشاء أرشيف %s (النوع: %s, الملفات: %d, محمي: %s)", output.name, archive_type, total, password is not None, ) try: if archive_type == "zip": self._create_zip(files, output, password) elif archive_type in ("tar.gz", "tar.bz2", "tar.xz", "tar"): self._create_tar(files, output, archive_type) elif archive_type == "7z": self._create_7z(files, output, password) else: raise ValueError(f"نوع أرشيف غير مدعوم للإنشاء: {archive_type}") except Exception as exc: logger.error("فشل إنشاء الأرشيف: %s", exc) # حذف الملف الجزئي if output.exists(): try: output.unlink() except OSError: pass raise logger.info("تم إنشاء الأرشيف بنجاح: %s", output) return str(output.resolve()) def _create_zip( self, files: list[str | Path], output: Path, password: Optional[str], ) -> None: """ينشئ أرشيف ZIP.""" pwd_bytes = password.encode("utf-8") if password else None with zipfile.ZipFile(output, "w", zipfile.ZIP_DEFLATED) as zf: for i, file_path in enumerate(files, 1): path = Path(file_path) if self.progress_callback: self.progress_callback(str(path), i, len(files)) if path.is_file(): try: zf.write(path, path.name) except PermissionError as exc: logger.warning("تخطي %s (لا صلاحية): %s", path, exc) elif path.is_dir(): for item in path.rglob("*"): if item.is_file(): try: arcname = item.relative_to(path.parent) zf.write(item, arcname) except PermissionError as exc: logger.warning("تخطي %s: %s", item, exc) def _create_tar( self, files: list[str | Path], output: Path, archive_type: str, ) -> None: """ينشئ أرشيف TAR.""" mode_map = { "tar": "w:", "tar.gz": "w:gz", "tar.bz2": "w:bz2", "tar.xz": "w:xz", } mode = mode_map.get(archive_type, "w:gz") with tarfile.open(output, mode) as tf: for i, file_path in enumerate(files, 1): path = Path(file_path) if self.progress_callback: self.progress_callback(str(path), i, len(files)) if path.is_file(): try: tf.add(path, arcname=path.name) except PermissionError as exc: logger.warning("تخطي %s: %s", path, exc) elif path.is_dir(): try: tf.add(path, arcname=path.name) except PermissionError as exc: logger.warning("تخطي %s: %s", path, exc) def _create_7z( self, files: list[str | Path], output: Path, password: Optional[str], ) -> None: """ينشئ أرشيف 7Z.""" cmd = ["7z", "a", str(output)] if password: cmd.append(f"-p{password}") file_args = [str(Path(f).resolve()) for f in files] cmd.extend(file_args) try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=600, ) except FileNotFoundError: raise RuntimeError("الأمر '7z' غير متوفر") except subprocess.TimeoutExpired: raise RuntimeError("انتهت مهلة إنشاء 7z") if result.returncode != 0: raise RuntimeError(f"خطأ في 7z: {result.stderr}")