""" HandwrittenOCR - نظام المزامنة v5.0 ===================================== وحدة متكاملة تدعم: - قفل الملفات (File Locking) لمنع التعارضات بين الأجهزة - إدارة حالة المزامنة (Sync Status) - تكامل مع Syncthing للمزامنة التلقائية أوفلاين/أونلاين - دعم العمل المتزامن من جهازين (جوال + حاسوب) """ import os import json import time import logging import platform import socket from datetime import datetime from pathlib import Path from contextlib import contextmanager from typing import Optional logger = logging.getLogger("HandwrittenOCR.Sync") # ========================================================================= # 1. نظام قفل الملفات (File Locking) # ========================================================================= class FileLock: """ قفل ملفات لمنع التعارضات عند العمل من عدة أجهزة متزامنة. يستخدم fcntl على Linux/macOS و msvcrt على Windows. الاستخدام: lock = FileLock(config.lock_file_path, timeout=30) with lock: # عمليات قاعدة البيانات هنا db.insert_word(...) """ def __init__(self, lock_path: str, timeout: int = 30): self.lock_path = Path(lock_path) self.timeout = timeout self._lock_file = None self._system = platform.system() def acquire(self) -> bool: """الحصول على القفل مع مهلة محددة""" self.lock_path.parent.mkdir(parents=True, exist_ok=True) self._lock_file = open(self.lock_path, "w") start = time.time() while True: try: if self._system == "Windows": import msvcrt msvcrt.locking(self._lock_file.fileno(), msvcrt.LK_NBLCK, 1) else: import fcntl fcntl.flock(self._lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) # كتابة معلومات القفل lock_info = { "pid": os.getpid(), "hostname": socket.gethostname(), "timestamp": datetime.now().isoformat(), "user": os.environ.get("USER", "unknown"), } self._lock_file.seek(0) self._lock_file.truncate() self._lock_file.write(json.dumps(lock_info, indent=2)) self._lock_file.flush() logger.debug(f"تم الحصول على القفل: {self.lock_path}") return True except (BlockingIOError, OSError, ImportError): if time.time() - start > self.timeout: self._lock_file.close() self._lock_file = None raise TimeoutError( "تعذر الحصول على قفل الملف - جهاز آخر يعمل حالياً. " "حاول بعد قليل أو تحقق من أن الجهاز الآخر أوقف المعالجة." ) time.sleep(0.5) def release(self) -> None: """تحرير القفل""" if self._lock_file is None: return try: if self._system == "Windows": import msvcrt self._lock_file.seek(0) msvcrt.locking(self._lock_file.fileno(), msvcrt.LK_UNLCK, 1) else: import fcntl fcntl.flock(self._lock_file.fileno(), fcntl.LOCK_UN) except Exception as e: logger.debug(f"خطأ في تحرير القفل: {e}") finally: try: self._lock_file.close() except Exception: pass self._lock_file = None def get_lock_info(self) -> Optional[dict]: """قراءة معلومات القفل الحالي (من جهاز آخر)""" if not self.lock_path.exists(): return None try: with open(self.lock_path, "r") as f: return json.load(f) except Exception: return None def is_locked(self) -> bool: """التحقق مما إذا كان القفل مفعلاً بواسطة عملية أخرى""" info = self.get_lock_info() if info is None: return False # التحقق مما إذا كانت العملية لا تزال نشطة pid = info.get("pid") if pid is None: return False try: os.kill(pid, 0) # لا يرسل إشارة، فقط يتحقق return True except (ProcessLookupError, PermissionError): # العملية غير موجودة - القفل قديم try: self.lock_path.unlink() except Exception: pass return False def __enter__(self): self.acquire() return self def __exit__(self, exc_type, exc_val, exc_tb): self.release() return False # ========================================================================= # 2. إدارة حالة المزامنة (Sync Status) # ========================================================================= class SyncManager: """ مدير المزامنة الشامل - يتتبع حالة التزامن بين الأجهزة. يدعم Syncthing ويمكن تمديد لدعم خدمات أخرى. """ # الملفات والمجلدات التي يجب مزامنتها SYNC_PATTERNS = [ "database/handwriting_data.db", "logs/user_corrections_feedback.csv", "artifacts/correction_dict.json", "exports/", "input_pdfs/", "sync_status.json", "artifacts/ocr_checkpoint.json", "logs/processing_stats.json", "logs/runs_history.csv", ] # المجلدات التي لا يجب مزامنتها (كبيرة جداً) NO_SYNC_PATTERNS = [ "models_cache/", "runs/", # TensorBoard logs ".EasyOCR/", "backups/", ] def __init__(self, config): """ Args: config: كائن Config من config.py """ self.config = config self.status_path = config.sync_status_path self.lock = FileLock(config.lock_file_path, timeout=config.sync_lock_timeout) self._device_id = self._generate_device_id() def _generate_device_id(self) -> str: """إنشاء معرف فريد للجهاز الحالي""" import uuid try: mac = uuid.getnode() return f"{socket.gethostname()}-{mac:012x}" except Exception: return f"device-{uuid.uuid4().hex[:8]}" @property def device_id(self) -> str: """معرف الجهاز الحالي""" return self._device_id def get_status(self) -> dict: """ قراءة حالة المزامنة الحالية. يُرجع dict يحتوي على آخر تحديث من كل جهاز. """ if not os.path.exists(self.status_path): return { "last_sync": None, "devices": {}, "current_device": self._device_id, "conflicts": [], } try: with open(self.status_path, "r", encoding="utf-8") as f: return json.load(f) except (json.JSONDecodeError, KeyError): return { "last_sync": None, "devices": {}, "current_device": self._device_id, "conflicts": [], } def update_device_status(self, action: str = "process", details: dict = None) -> None: """ تحديث حالة الجهاز الحالي بعد كل عملية مهمة. يُكتب تلقائياً عند: معالجة PDF، مراجعة كلمات، تصحيح، تصدير. Args: action: نوع العملية (process, review, correct, export, sync) details: تفاصيل إضافية اختيارية """ status = self.get_status() now = datetime.now().isoformat() if "devices" not in status: status["devices"] = {} device_info = status["devices"].get(self._device_id, {}) device_info.update({ "hostname": socket.gethostname(), "platform": platform.system(), "last_action": action, "last_update": now, "details": details or {}, }) status["devices"][self._device_id] = device_info status["current_device"] = self._device_id status["last_sync"] = now # حفظ الحالة os.makedirs(os.path.dirname(self.status_path), exist_ok=True) with open(self.status_path, "w", encoding="utf-8") as f: json.dump(status, f, indent=2, ensure_ascii=False) logger.debug(f"تم تحديث حالة الجهاز: {action}") def detect_conflicts(self) -> list: """ كشف التعارضات بين الأجهزة. يتحقق مما إذا كان أكثر من جهاز قد عدّل نفس البيانات. """ status = self.get_status() conflicts = [] if not status.get("devices"): return conflicts devices = status["devices"] device_ids = list(devices.keys()) # التحقق من تعارضات المعالجة for i, d1 in enumerate(device_ids): for d2 in device_ids[i + 1:]: dev1 = devices[d1] dev2 = devices[d2] # إذا كلاهما عملا على نفس الملف في وقت قريب if (dev1.get("last_action") == "process" and dev2.get("last_action") == "process"): t1 = dev1.get("last_update", "") t2 = dev2.get("last_update", "") if t1 and t2: try: dt1 = datetime.fromisoformat(t1) dt2 = datetime.fromisoformat(t2) diff = abs((dt1 - dt2).total_seconds()) if diff < 300: # أقل من 5 دقائق conflicts.append({ "type": "concurrent_processing", "devices": [d1, d2], "time_diff_sec": diff, "message": ( "كلا الجهازين عالجا نفس الملف في وقت قريب. " "قد تحتاج لإعادة المعالجة." ), }) except (ValueError, TypeError): pass status["conflicts"] = conflicts return conflicts def get_syncthing_config(self) -> dict: """ توليد إعدادات Syncthing للمشروع. يُرجع JSON جاهز للاستخدام أو العرض. """ root = Path(self.config.project_root) sync_folders = [] for pattern in self.SYNC_PATTERNS: full_path = root / pattern if full_path.exists() or pattern.endswith("/"): sync_folders.append({ "path": pattern, "exists": full_path.exists(), "type": "folder" if pattern.endswith("/") else "file", }) ignore_folders = [] for pattern in self.NO_SYNC_PATTERNS: ignore_folders.append({ "path": pattern, "reason": "large_files" if "models" in pattern or "runs" in pattern else "cache", }) return { "project_name": "HandwrittenOCR", "project_root": str(root), "sync_folders": sync_folders, "ignore_folders": ignore_folders, "setup_instructions": { "linux": ( "sudo pacman -S syncthing # Manjaro/Arch\n" "systemctl --user enable --now syncthing\n" f"# ثم أضف المجلد: {root}\n" "# افتح الواجهة: http://127.0.0.1:8384" ), "android": ( "1. نزّل Syncthing من F-Droid أو Google Play\n" "2. امنح صلاحية الوصول لمجلد المشروع\n" "3. اقترن مع الحاسوب عبر QR Code\n" "4. فعّل 'Sync when on Wi-Fi only'" ), }, } def generate_syncthing_stignore(self) -> str: """توليد محتوى ملف .stignore لـ Syncthing""" lines = ["# HandwrittenOCR - Syncthing ignore patterns", ""] for pattern in self.NO_SYNC_PATTERNS: lines.append(pattern) lines.append("") lines.append("// temp files") lines.append("*.pyc") lines.append("__pycache__/") lines.append(".git/") lines.append("*.lock") lines.append("ocr_env/") return "\n".join(lines) def get_network_info(self) -> dict: """الحصول على معلومات الشبكة المحلية (للوصول من الجوال)""" try: hostname = socket.gethostname() local_ip = socket.gethostbyname(hostname) # محاولة الحصول على IP الشبكة المحلية بشكل أدق s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: s.connect(("8.8.8.8", 80)) local_ip = s.getsockname()[0] finally: s.close() return { "hostname": hostname, "local_ip": local_ip, "server_url": f"http://{local_ip}:{self.config.gradio_port}", "api_url": f"http://{local_ip}:8000", } except Exception as e: return { "hostname": socket.gethostname(), "local_ip": "127.0.0.1", "server_url": f"http://127.0.0.1:{self.config.gradio_port}", "api_url": "http://127.0.0.1:8000", "error": str(e), } # ========================================================================= # 3. Context Manager مريح للاستخدام # ========================================================================= @contextmanager def sync_lock(config, action: str = "process", details: dict = None): """ Context Manager يجمع بين القفل وتحديث حالة المزامنة. الاستخدام: with sync_lock(config, action="process", details={"words": 150}): processor.process() """ sync_mgr = SyncManager(config) # كشف التعارضات قبل البدء conflicts = sync_mgr.detect_conflicts() if conflicts: for conflict in conflicts: logger.warning(f"تعارض مزامنة: {conflict['message']}") # الحصول على القفل lock = FileLock(config.lock_file_path, timeout=config.sync_lock_timeout) lock.acquire() try: yield sync_mgr # تحديث حالة المزامنة بنجاح sync_mgr.update_device_status(action=action, details=details) except Exception as e: # تحديث حالة المزامنة بالفشل sync_mgr.update_device_status( action=f"{action}_failed", details={"error": str(e)} ) raise finally: lock.release()