"""
Core initialization and configuration for MorphGuard.

This module provides centralized initialization of all major components
and manages global application state.
"""

import os
import logging
import json
from typing import Dict, Any, Optional, List, Type, Union
import threading
from datetime import datetime

# Import our custom modules
from error_handling import MGError, ErrorCode, ErrorSeverity, ErrorCategory
from telemetry import get_telemetry, TelemetryManager, EventCategory
from status_tracker import get_tracker, StatusTracker, JobStatus, JobType


class MorphGuardCore:
    """
    Central core class for MorphGuard application.

    This class initializes and manages access to all major components:
    - Configuration
    - Telemetry & Logging
    - Status Tracking
    - Error Handling
    """

    def __init__(
        self,
        config_file: Optional[str] = None,
        log_dir: Optional[str] = None,
        telemetry_dir: Optional[str] = None,
        status_dir: Optional[str] = None,
        debug: bool = False
    ):
        """
        Initialize the core system.

        Creates the log/telemetry/status directories, loads the configuration,
        then obtains the telemetry manager and the status tracker.

        Args:
            config_file: Path to configuration file
                (default: ``<base_dir>/config.json``)
            log_dir: Directory for log files (default: ``<base_dir>/logs``)
            telemetry_dir: Directory for telemetry data
                (default: ``<base_dir>/telemetry``)
            status_dir: Directory for status data
                (default: ``<base_dir>/status``)
            debug: Whether to enable debug mode (selects DEBUG log level)
        """
        self.initialized = False
        self.startup_time = datetime.utcnow()
        self.debug = debug

        # Set default directories; base_dir is the parent of this file's directory
        self.base_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
        self.log_dir = log_dir or os.path.join(self.base_dir, "logs")
        self.telemetry_dir = telemetry_dir or os.path.join(self.base_dir, "telemetry")
        self.status_dir = status_dir or os.path.join(self.base_dir, "status")
        self.config_file = config_file or os.path.join(self.base_dir, "config.json")

        # Create directories if they don't exist
        os.makedirs(self.log_dir, exist_ok=True)
        os.makedirs(self.telemetry_dir, exist_ok=True)
        os.makedirs(self.status_dir, exist_ok=True)

        # Load configuration (must happen before telemetry, which reads it)
        self.config = self._load_config()

        # Initialize telemetry
        log_level = logging.DEBUG if self.debug else logging.INFO
        self.telemetry = get_telemetry(
            app_name="morphguard",
            log_dir=self.log_dir,
            telemetry_dir=self.telemetry_dir,
            log_level=log_level,
            console_logging=True,
            file_logging=True,
            enable_telemetry=self.config.get("telemetry", {}).get("enabled", True)
        )

        # Initialize status tracker
        self.status_tracker = get_tracker(self.status_dir)

        # Log initialization
        self.telemetry.info(
            "MorphGuard core initialized",
            category=EventCategory.SYSTEM,
            context={
                "debug_mode": self.debug,
                "config_file": self.config_file,
                "base_dir": self.base_dir
            }
        )

        self.initialized = True

    def _default_config(self) -> Dict[str, Any]:
        """Build the built-in default configuration dictionary."""
        return {
            "app": {
                "name": "MorphGuard",
                "version": "1.0.0",
                "environment": "development"
            },
            "api": {
                "host": "localhost",
                "port": 5000,
                "debug": self.debug,
                "timeout": 30,
                "max_content_length": 16 * 1024 * 1024  # 16MB
            },
            "telemetry": {
                "enabled": True,
                "export_metrics": False,
                "system_metrics_interval": 60
            },
            "storage": {
                "upload_dir": os.path.join(self.base_dir, "uploads"),
                "results_dir": os.path.join(self.base_dir, "results"),
                "max_file_size": 10 * 1024 * 1024  # 10MB
            },
            "security": {
                "enable_auth": False,
                "token_expiry": 3600,  # 1 hour
                "refresh_token_expiry": 2592000  # 30 days
            },
            "processing": {
                "max_workers": 4,
                "timeout": 300,  # 5 minutes
                "detect_batch_size": 16,
                "demorph_batch_size": 4
            },
            "models": {
                "detector_path": os.path.join(self.base_dir, "models/detector"),
                "demorpher_path": os.path.join(self.base_dir, "models/demorpher"),
                "face_recognition_path": os.path.join(self.base_dir, "models/face_recognition"),
                "liveness_path": os.path.join(self.base_dir, "models/liveness")
            }
        }

    def _load_config(self) -> Dict[str, Any]:
        """
        Load configuration from file.

        Starts from the built-in defaults and deep-merges the contents of
        ``self.config_file`` over them when that file exists. A missing,
        unreadable or malformed file never raises: the problem is logged and
        the defaults are returned.

        Returns:
            Configuration dictionary
        """
        config = self._default_config()

        # Telemetry is not set up yet at this point, so report through the
        # standard logging module instead of self.telemetry.
        logger = logging.getLogger(__name__)

        if not os.path.exists(self.config_file):
            return config

        try:
            with open(self.config_file, "r", encoding="utf-8") as f:
                file_config = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError
            logger.warning("Error loading configuration: %s", e)
            return config

        if not isinstance(file_config, dict):
            logger.warning(
                "Error loading configuration: top-level JSON value in %s is not an object",
                self.config_file,
            )
            return config

        # Deep merge configurations (file values override defaults)
        self._deep_merge(config, file_config)
        return config

    def _deep_merge(self, target: Dict[str, Any], source: Dict[str, Any]) -> None:
        """
        Deep merge source dictionary into target dictionary.

        Nested dictionaries present on both sides are merged recursively;
        any other value from ``source`` replaces the one in ``target``.
        ``target`` is modified in place.

        Args:
            target: Target dictionary
            source: Source dictionary
        """
        for key, value in source.items():
            if key in target and isinstance(target[key], dict) and isinstance(value, dict):
                self._deep_merge(target[key], value)
            else:
                target[key] = value

    def get_config(self, section: Optional[str] = None, key: Optional[str] = None) -> Any:
        """
        Get configuration value.

        Args:
            section: Configuration section; ``None`` returns the whole config
            key: Configuration key within section; ``None`` returns the
                whole section (``{}`` when the section is unknown)

        Returns:
            Configuration value, or ``None`` when the key is not present
        """
        if section is None:
            return self.config

        if key is None:
            return self.config.get(section, {})

        return self.config.get(section, {}).get(key)

    def set_config(self, section: str, key: str, value: Any) -> None:
        """
        Set configuration value.

        The section is created when it does not exist yet. The change is
        in-memory only until ``save_config`` is called.

        Args:
            section: Configuration section
            key: Configuration key within section
            value: Configuration value
        """
        if section not in self.config:
            self.config[section] = {}

        self.config[section][key] = value

    def save_config(self) -> None:
        """
        Save configuration to file.

        Failures are reported through telemetry and not re-raised.
        """
        try:
            # Serialize before opening the file: opening with "w" truncates it,
            # so a non-serializable value must not be allowed to wipe the
            # existing configuration.
            serialized = json.dumps(self.config, indent=2)
            with open(self.config_file, "w", encoding="utf-8") as f:
                f.write(serialized)
        except Exception as e:
            self.telemetry.error(
                f"Failed to save configuration: {e}",
                category=EventCategory.CONFIGURATION,
                context={"config_file": self.config_file}
            )

    def create_job(
        self,
        job_type: JobType,
        description: str,
        user_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Create a new processing job.

        Args:
            job_type: Type of job
            description: Description of the job
            user_id: ID of the user who created the job
            metadata: Additional metadata for the job

        Returns:
            Job ID
        """
        job_id = self.status_tracker.create_job(
            job_type=job_type,
            description=description,
            user_id=user_id,
            metadata=metadata
        )

        self.telemetry.info(
            f"Created {job_type.value} job: {description}",
            category=EventCategory.JOB,
            context={
                "job_id": job_id,
                "job_type": job_type.value,
                "user_id": user_id
            }
        )

        return job_id

    def start_job(self, job_id: str) -> None:
        """
        Start a job.

        Args:
            job_id: ID of the job to start

        Raises:
            Exception: Any error from the status tracker is logged and re-raised
        """
        try:
            self.status_tracker.start_job(job_id)
            job = self.status_tracker.get_job_status(job_id)

            self.telemetry.info(
                f"Started job: {job['description']}",
                category=EventCategory.JOB,
                context={
                    "job_id": job_id,
                    "job_type": job["type"]
                }
            )
        except Exception as e:
            self.telemetry.error(
                f"Failed to start job {job_id}: {e}",
                category=EventCategory.JOB,
                context={"job_id": job_id}
            )
            raise

    def update_job_progress(self, job_id: str, progress: float, message: Optional[str] = None) -> None:
        """
        Update job progress.

        Failures are logged as warnings and not re-raised.

        Args:
            job_id: ID of the job
            progress: Progress value (0-1)
            message: Optional progress message
        """
        try:
            # Snapshot the stored progress BEFORE updating it. Reading it after
            # the update would compare the new value with itself (assuming the
            # tracker stores what update_progress receives), so the "every 20%"
            # log below would never fire.
            previous_progress = 0
            if not message:
                previous_progress = self.status_tracker.get_job_status(job_id).get("progress", 0) or 0

            self.status_tracker.update_progress(job_id, progress, message)

            # Only log if message is provided or a new 20% bucket was entered
            if message or int(progress * 5) > int(previous_progress * 5):
                self.telemetry.debug(
                    f"Job {job_id} progress: {progress:.1%}" + (f" - {message}" if message else ""),
                    category=EventCategory.JOB,
                    context={"job_id": job_id, "progress": progress}
                )
        except Exception as e:
            self.telemetry.warning(
                f"Failed to update job progress {job_id}: {e}",
                category=EventCategory.JOB,
                context={"job_id": job_id, "progress": progress}
            )

    def complete_job(
        self,
        job_id: str,
        results: Optional[Dict[str, Any]] = None,
        message: Optional[str] = None
    ) -> None:
        """
        Complete a job.

        Logs the completion and, when the job has a start time, records a
        ``job.<type>.duration`` timer metric.

        Args:
            job_id: ID of the job
            results: Job results
            message: Optional completion message

        Raises:
            Exception: Any error is logged and re-raised
        """
        try:
            self.status_tracker.complete_job(job_id, results, message)
            job = self.status_tracker.get_job_status(job_id)

            # Duration is only defined for jobs that were actually started
            started_at = job["started_at"]
            duration = job["completed_at"] - started_at if started_at else None

            self.telemetry.info(
                f"Completed job: {job['description']}" + (f" - {message}" if message else ""),
                category=EventCategory.JOB,
                context={
                    "job_id": job_id,
                    "job_type": job["type"],
                    "duration": duration
                }
            )

            # Record job duration metric
            if started_at:
                self.telemetry.timer(
                    f"job.{job['type']}.duration",
                    duration,
                    tags={"job_type": job["type"]}
                )
        except Exception as e:
            self.telemetry.error(
                f"Failed to complete job {job_id}: {e}",
                category=EventCategory.JOB,
                context={"job_id": job_id}
            )
            raise

    def fail_job(self, job_id: str, error: Union[str, Exception], error_details: Optional[Dict[str, Any]] = None) -> None:
        """
        Mark a job as failed.

        When ``error`` is an ``MGError``, its code, category and details are
        added to the details stored with the job. The caller's
        ``error_details`` dictionary is never modified.

        Args:
            job_id: ID of the job
            error: Error message or exception
            error_details: Additional error details

        Raises:
            Exception: Any error while recording the failure is logged and
                re-raised
        """
        try:
            error_message = str(error)

            # Work on a copy so the caller's dictionary is left untouched
            details = dict(error_details) if error_details is not None else None

            if isinstance(error, MGError):
                if details is None:
                    details = {}

                details["error_code"] = error.code
                details["error_category"] = error.category

                if error.details:
                    # Copy the nested dict as well before merging into it
                    nested = dict(details.get("error_details") or {})
                    nested.update(error.details)
                    details["error_details"] = nested

            self.status_tracker.fail_job(job_id, error_message, details)
            job = self.status_tracker.get_job_status(job_id)

            self.telemetry.error(
                f"Job failed: {job['description']} - {error_message}",
                category=EventCategory.JOB,
                context={
                    "job_id": job_id,
                    "job_type": job["type"],
                    "error": error_message,
                    "error_details": details
                },
                exc_info=isinstance(error, Exception)
            )
        except Exception as e:
            self.telemetry.error(
                f"Failed to mark job {job_id} as failed: {e}",
                category=EventCategory.JOB,
                context={"job_id": job_id, "original_error": str(error)}
            )
            raise

    def get_job_status(self, job_id: str) -> Dict[str, Any]:
        """
        Get job status.

        Args:
            job_id: ID of the job

        Returns:
            Job status data
        """
        return self.status_tracker.get_job_status(job_id)

    def get_jobs_by_status(self, status: JobStatus) -> List[Dict[str, Any]]:
        """
        Get jobs by status.

        Args:
            status: Job status

        Returns:
            List of job data
        """
        return self.status_tracker.get_jobs_by_status(status)

    def get_jobs_by_user(self, user_id: str) -> List[Dict[str, Any]]:
        """
        Get jobs by user.

        Args:
            user_id: User ID

        Returns:
            List of job data
        """
        return self.status_tracker.get_jobs_by_user(user_id)

    def get_all_jobs(self) -> List[Dict[str, Any]]:
        """
        Get all jobs.

        Returns:
            List of all job data
        """
        return self.status_tracker.get_all_jobs()

    def handle_error(self, error: Exception, context: Optional[Dict[str, Any]] = None) -> MGError:
        """
        Handle an exception and convert it to a MGError.

        ``context`` is set on the telemetry manager only for the duration of
        the conversion and is always cleared afterwards.

        Args:
            error: Exception to handle
            context: Additional context for the error

        Returns:
            MGError instance
        """
        # Imported here rather than at module level, as in the original code
        from error_handling import handle_exception

        # Add context to telemetry
        if context:
            self.telemetry.set_context(**context)

        try:
            return handle_exception(error, logger=self.telemetry.logger)
        finally:
            # Clear context even if the conversion itself raises, so the
            # temporary keys do not leak into unrelated log records
            if context:
                self.telemetry.clear_context(*context.keys())

    def shutdown(self) -> None:
        """Shutdown the core system."""
        self.telemetry.info("Shutting down MorphGuard core", category=EventCategory.SYSTEM)
        self.telemetry.shutdown()


# Create a singleton instance
_instance = None


def get_core(
    config_file: Optional[str] = None,
    log_dir: Optional[str] = None,
    telemetry_dir: Optional[str] = None,
    status_dir: Optional[str] = None,
    debug: bool = False
) -> MorphGuardCore:
    """
    Get the global core instance.

    The instance is created on the first call; the arguments are only used
    then and are ignored on every later call. Creation is not synchronized.

    Args:
        config_file: Path to configuration file
        log_dir: Directory for log files
        telemetry_dir: Directory for telemetry data
        status_dir: Directory for status data
        debug: Whether to enable debug mode

    Returns:
        MorphGuardCore instance
    """
    global _instance

    if _instance is None:
        _instance = MorphGuardCore(
            config_file=config_file,
            log_dir=log_dir,
            telemetry_dir=telemetry_dir,
            status_dir=status_dir,
            debug=debug
        )

    return _instance