""" وحدة سجل التدقيق (Audit Logger Module) ========================================= تسجيل جميع العمليات المهمة في النظام (من قام بماذا ومتى). يدعم: تسجيل في ملف، قاعدة بيانات، و Redis. Audit logging module for tracking all important system operations. Supports: file logging, database logging, and Redis logging. OmniFile AI Processor v3.0 المصدر: اقتراح من مراجعة Mistral - 2026-05-03 """ import json import logging import os import time import uuid from datetime import datetime, timezone from enum import Enum from typing import Any, Optional logger = logging.getLogger(__name__) class AuditAction(str, Enum): """أنواع العمليات المدققة / Types of audited actions.""" # OCR Operations OCR_PROCESS = "ocr_process" OCR_CORRECT = "ocr_correct" OCR_FUSION = "ocr_fusion" # NLP Operations NLP_TRANSLATE = "nlp_translate" NLP_SUMMARIZE = "nlp_summarize" NLP_SPELL_CHECK = "nlp_spell_check" NLP_NER = "nlp_ner" NLP_CLASSIFY = "nlp_classify" # Security Operations SECURITY_ENCRYPT = "security_encrypt" SECURITY_DECRYPT = "security_decrypt" SECURITY_SCAN = "security_scan" SECURITY_PII_DETECT = "security_pii_detect" # File Operations FILE_UPLOAD = "file_upload" FILE_DOWNLOAD = "file_download" FILE_DELETE = "file_delete" FILE_EXPORT = "file_export" # System Operations SYSTEM_LOGIN = "system_login" SYSTEM_LOGOUT = "system_logout" SYSTEM_CONFIG_CHANGE = "system_config_change" SYSTEM_ERROR = "system_error" # AI Operations AI_CORRECT = "ai_correct" AI_REFINE = "ai_refine" class AuditLevel(str, Enum): """مستوى أهمية العملية / Operation severity level.""" INFO = "info" WARNING = "warning" CRITICAL = "critical" SECURITY = "security" class AuditLogger: """ سجل تدقيق شامل للنظام. Comprehensive system audit logger. الميزات / Features: - تسجيل كل عملية مع التفاصيل الكاملة - دعم إخراج متعدد (ملف، Redis، قاعدة بيانات) - تحديد المستخدم وعنوان IP - تصفية حسب المستوى والنوع - دعم البحث والاستعلام """ def __init__( self, log_file: Optional[str] = None, redis_url: Optional[str] = None, enable_file: bool = True, enable_redis: bool = False, enable_db: bool = False, ): """ تهيئة سجل التدقيق. Args: log_file: مسار ملف السجل (الافتراضي: logs/audit.log) redis_url: عنوان Redis لتخزين السجلات enable_file: تفعيل تسجيل في ملف enable_redis: تفعيل تسجيل في Redis enable_db: تفعيل تسجيل في قاعدة البيانات """ self.enable_file = enable_file self.enable_redis = enable_redis self.enable_db = enable_db self._redis = None self._redis_list = "omnifile:audit_log" # إعداد ملف السجل self.log_file = log_file or os.path.join("logs", "audit.log") if self.enable_file: os.makedirs(os.path.dirname(self.log_file), exist_ok=True) # إعداد Redis if self.enable_redis and redis_url: self._init_redis(redis_url) def _init_redis(self, redis_url: str) -> None: """تهيئة اتصال Redis.""" try: import redis self._redis = redis.from_url(redis_url, decode_responses=True) self._redis.ping() logger.info("تم الاتصال بـ Redis للسجل التدقيقي") except ImportError: logger.warning("مكتبة redis غير مثبتة") self.enable_redis = False except Exception as e: logger.warning("فشل الاتصال بـ Redis: %s", e) self.enable_redis = False def log( self, action: AuditAction, level: AuditLevel = AuditLevel.INFO, user: Optional[str] = None, ip_address: Optional[str] = None, details: Optional[dict] = None, status: str = "success", duration_ms: Optional[float] = None, resource: Optional[str] = None, ) -> dict: """ تسجيل عملية في السجل التدقيقي. Args: action: نوع العملية level: مستوى الأهمية user: اسم المستخدم (إن وجد) ip_address: عنوان IP details: تفاصيل إضافية status: حالة العملية (success, failed, pending) duration_ms: مدة التنفيذ بالميلي ثانية resource: المورد المتعلق (مثلاً: اسم الملف) Returns: قاموس السجل المدخل """ entry = { "id": str(uuid.uuid4())[:12], "timestamp": datetime.now(timezone.utc).isoformat(), "action": action.value, "level": level.value, "user": user or "anonymous", "ip_address": ip_address or "unknown", "details": details or {}, "status": status, "duration_ms": round(duration_ms, 2) if duration_ms else None, "resource": resource, "version": "3.0.0", } # تسجيل في ملف if self.enable_file: self._log_to_file(entry) # تسجيل في Redis if self.enable_redis and self._redis: self._log_to_redis(entry) # تسجيل في Python logger self._log_to_logger(entry) return entry def _log_to_file(self, entry: dict) -> None: """كتابة السجل في ملف.""" try: with open(self.log_file, "a", encoding="utf-8") as f: f.write(json.dumps(entry, ensure_ascii=False) + "\n") except Exception as e: logger.error("فشل كتابة السجل التدقيقي: %s", e) def _log_to_redis(self, entry: dict) -> None: """كتابة السجل في Redis.""" try: self._redis.lpush(self._redis_list, json.dumps(entry, ensure_ascii=False)) # الاحتفاظ بآخر 10000 سجل فقط self._redis.ltrim(self._redis_list, 0, 9999) except Exception as e: logger.error("فشل كتابة السجل في Redis: %s", e) def _log_to_logger(self, entry: dict) -> None: """كتابة السجل في Python logger.""" msg = ( f"[AUDIT] {entry['action']} | " f"user={entry['user']} | " f"ip={entry['ip_address']} | " f"status={entry['status']}" ) if entry.get("duration_ms"): msg += f" | duration={entry['duration_ms']}ms" if entry.get("resource"): msg += f" | resource={entry['resource']}" if entry["level"] == AuditLevel.CRITICAL.value: logger.critical(msg) elif entry["level"] == AuditLevel.SECURITY.value: logger.warning(msg) elif entry["level"] == AuditLevel.WARNING.value: logger.warning(msg) else: logger.info(msg) def get_logs( self, action: Optional[str] = None, user: Optional[str] = None, level: Optional[str] = None, limit: int = 100, ) -> list[dict]: """ استعلام السجلات. Args: action: تصفية حسب نوع العملية user: تصفية حسب المستخدم level: تصفية حسب المستوى limit: أقصى عدد من السجلات Returns: قائمة السجلات """ logs = [] # قراءة من Redis أولاً (أسرع) if self.enable_redis and self._redis: try: raw_logs = self._redis.lrange(self._redis_list, 0, limit - 1) logs = [json.loads(log) for log in raw_logs] except Exception as e: logger.error("فشل قراءة السجلات من Redis: %s", e) # قراءة من ملف كاحتياط if not logs and self.enable_file: try: with open(self.log_file, "r", encoding="utf-8") as f: lines = f.readlines()[-limit:] logs = [json.loads(line.strip()) for line in lines if line.strip()] except FileNotFoundError: pass except Exception as e: logger.error("فشل قراءة السجلات من الملف: %s", e) # تطبيق التصفية if action: logs = [l for l in logs if l.get("action") == action] if user: logs = [l for l in logs if l.get("user") == user] if level: logs = [l for l in logs if l.get("level") == level] return logs def get_stats(self) -> dict: """إحصائيات السجلات.""" logs = self.get_logs(limit=10000) stats = { "total_logs": len(logs), "by_action": {}, "by_level": {}, "by_status": {}, "by_user": {}, "errors_count": 0, } for log in logs: action = log.get("action", "unknown") level = log.get("level", "info") status = log.get("status", "unknown") user = log.get("user", "anonymous") stats["by_action"][action] = stats["by_action"].get(action, 0) + 1 stats["by_level"][level] = stats["by_level"].get(level, 0) + 1 stats["by_status"][status] = stats["by_status"].get(status, 0) + 1 stats["by_user"][user] = stats["by_user"].get(user, 0) + 1 if status == "failed": stats["errors_count"] += 1 return stats def clear_logs(self, older_than_days: int = 30) -> int: """ حذف السجلات القديمة. Args: older_than_days: حذف السجلات الأقدم من هذا العدد من الأيام Returns: عدد السجلات المحذوفة """ cutoff = time.time() - (older_than_days * 86400) deleted = 0 if self.enable_file and os.path.exists(self.log_file): try: with open(self.log_file, "r", encoding="utf-8") as f: lines = f.readlines() kept_lines = [] for line in lines: try: entry = json.loads(line.strip()) ts = datetime.fromisoformat(entry["timestamp"]).timestamp() if ts >= cutoff: kept_lines.append(line) else: deleted += 1 except (json.JSONDecodeError, KeyError): kept_lines.append(line) with open(self.log_file, "w", encoding="utf-8") as f: f.writelines(kept_lines) logger.info("تم حذف %d سجل تدقيقي قديم", deleted) except Exception as e: logger.error("فشل حذف السجلات القديمة: %s", e) return deleted # ============================================================================= # Global Audit Logger Instance # ============================================================================= def get_audit_logger() -> AuditLogger: """الحصول على مثيل سجل التدقيق العام.""" return AuditLogger( redis_url=os.getenv("REDIS_URL"), enable_redis=bool(os.getenv("REDIS_URL")), )