Spaces:
Running
Running
| """ | |
| Core initialization and configuration for MorphGuard. | |
| This module provides centralized initialization of all major components and | |
| manages global application state. | |
| """ | |
import json
import logging
import os
import threading
from datetime import datetime
from typing import Any, Dict, List, Optional, Type, Union

# Import our custom modules
from error_handling import MGError, ErrorCode, ErrorSeverity, ErrorCategory
from telemetry import get_telemetry, TelemetryManager, EventCategory
from status_tracker import get_tracker, StatusTracker, JobStatus, JobType
class MorphGuardCore:
    """
    Central core class for MorphGuard application.

    This class initializes and manages access to all major components:
    - Configuration
    - Telemetry & Logging
    - Status Tracking
    - Error Handling
    """

    def __init__(
        self,
        config_file: Optional[str] = None,
        log_dir: Optional[str] = None,
        telemetry_dir: Optional[str] = None,
        status_dir: Optional[str] = None,
        debug: bool = False
    ):
        """
        Initialize the core system.

        Creates the log, telemetry and status directories if needed, loads
        the configuration, then obtains the telemetry manager and the status
        tracker.

        Args:
            config_file: Path to configuration file (defaults to
                ``<base_dir>/config.json``)
            log_dir: Directory for log files (defaults to ``<base_dir>/logs``)
            telemetry_dir: Directory for telemetry data (defaults to
                ``<base_dir>/telemetry``)
            status_dir: Directory for status data (defaults to
                ``<base_dir>/status``)
            debug: Whether to enable debug mode (selects DEBUG log level)
        """
        # Stays False until every component below has been set up.
        self.initialized = False
        # Naive UTC timestamp of construction.
        self.startup_time = datetime.utcnow()
        self.debug = debug

        # Set default directories; base_dir is the parent of the directory
        # that contains this file.
        self.base_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
        self.log_dir = log_dir or os.path.join(self.base_dir, "logs")
        self.telemetry_dir = telemetry_dir or os.path.join(self.base_dir, "telemetry")
        self.status_dir = status_dir or os.path.join(self.base_dir, "status")
        self.config_file = config_file or os.path.join(self.base_dir, "config.json")

        # Create directories if they don't exist
        os.makedirs(self.log_dir, exist_ok=True)
        os.makedirs(self.telemetry_dir, exist_ok=True)
        os.makedirs(self.status_dir, exist_ok=True)

        # Load configuration (must happen before telemetry, which reads the
        # "telemetry.enabled" setting)
        self.config = self._load_config()

        # Initialize telemetry
        log_level = logging.DEBUG if self.debug else logging.INFO
        self.telemetry = get_telemetry(
            app_name="morphguard",
            log_dir=self.log_dir,
            telemetry_dir=self.telemetry_dir,
            log_level=log_level,
            console_logging=True,
            file_logging=True,
            enable_telemetry=self.config.get("telemetry", {}).get("enabled", True)
        )

        # Initialize status tracker
        self.status_tracker = get_tracker(self.status_dir)

        # Log initialization
        self.telemetry.info(
            "MorphGuard core initialized",
            category=EventCategory.SYSTEM,
            context={
                "debug_mode": self.debug,
                "config_file": self.config_file,
                "base_dir": self.base_dir
            }
        )
        self.initialized = True

    def _load_config(self) -> Dict[str, Any]:
        """
        Load configuration from file.

        Starts from the built-in defaults and deep-merges the JSON object
        found in ``self.config_file`` on top of them. A missing file is not
        an error. An unreadable file, invalid JSON, or a JSON document whose
        root is not an object is reported through the standard ``logging``
        module and the defaults are returned unchanged.

        Returns:
            Configuration dictionary
        """
        # Default configuration
        default_config = {
            "app": {
                "name": "MorphGuard",
                "version": "1.0.0",
                "environment": "development"
            },
            "api": {
                "host": "localhost",
                "port": 5000,
                "debug": self.debug,
                "timeout": 30,
                "max_content_length": 16 * 1024 * 1024  # 16MB
            },
            "telemetry": {
                "enabled": True,
                "export_metrics": False,
                "system_metrics_interval": 60
            },
            "storage": {
                "upload_dir": os.path.join(self.base_dir, "uploads"),
                "results_dir": os.path.join(self.base_dir, "results"),
                "max_file_size": 10 * 1024 * 1024  # 10MB
            },
            "security": {
                "enable_auth": False,
                "token_expiry": 3600,  # 1 hour
                "refresh_token_expiry": 2592000  # 30 days
            },
            "processing": {
                "max_workers": 4,
                "timeout": 300,  # 5 minutes
                "detect_batch_size": 16,
                "demorph_batch_size": 4
            },
            "models": {
                "detector_path": os.path.join(self.base_dir, "models/detector"),
                "demorpher_path": os.path.join(self.base_dir, "models/demorpher"),
                "face_recognition_path": os.path.join(self.base_dir, "models/face_recognition"),
                "liveness_path": os.path.join(self.base_dir, "models/liveness")
            }
        }

        config = default_config
        if not os.path.exists(self.config_file):
            return config

        # Telemetry is not available yet at this point of __init__, so
        # problems are reported through the standard logging module.
        log = logging.getLogger(__name__)

        try:
            with open(self.config_file, "r", encoding="utf-8") as f:
                file_config = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            log.warning("Error loading configuration: %s", e)
            return config

        if not isinstance(file_config, dict):
            log.warning(
                "Error loading configuration: root of %s is not a JSON object",
                self.config_file,
            )
            return config

        # Deep merge configurations
        self._deep_merge(config, file_config)
        return config

    def _deep_merge(self, target: Dict[str, Any], source: Dict[str, Any]) -> None:
        """
        Deep merge source dictionary into target dictionary.

        When both sides hold a dictionary under the same key the merge
        recurses; in every other case the source value replaces the target
        value. ``target`` is modified in place.

        Args:
            target: Target dictionary
            source: Source dictionary
        """
        for key, value in source.items():
            if key in target and isinstance(target[key], dict) and isinstance(value, dict):
                self._deep_merge(target[key], value)
            else:
                target[key] = value

    def get_config(self, section: Optional[str] = None, key: Optional[str] = None) -> Any:
        """
        Get configuration value.

        Args:
            section: Configuration section; ``None`` returns the whole
                configuration dictionary
            key: Configuration key within section; ``None`` returns the
                whole section (an empty dict if the section is unknown)

        Returns:
            Configuration value, or ``None`` if the key is not present
        """
        if section is None:
            return self.config
        if key is None:
            return self.config.get(section, {})
        return self.config.get(section, {}).get(key)

    def set_config(self, section: str, key: str, value: Any) -> None:
        """
        Set configuration value.

        The section is created if it does not exist yet. The change is kept
        in memory only until ``save_config`` is called.

        Args:
            section: Configuration section
            key: Configuration key within section
            value: Configuration value
        """
        if section not in self.config:
            self.config[section] = {}
        self.config[section][key] = value

    def save_config(self) -> None:
        """
        Save configuration to file.

        Failures are reported through telemetry and are not raised.
        """
        try:
            # Serialize before opening the file: opening with "w" truncates
            # it, so a value that cannot be serialized would otherwise
            # destroy the existing configuration file.
            payload = json.dumps(self.config, indent=2)
            with open(self.config_file, "w", encoding="utf-8") as f:
                f.write(payload)
        except Exception as e:
            self.telemetry.error(
                f"Failed to save configuration: {e}",
                category=EventCategory.CONFIGURATION,
                context={"config_file": self.config_file}
            )

    def create_job(
        self,
        job_type: JobType,
        description: str,
        user_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Create a new processing job.

        Args:
            job_type: Type of job
            description: Description of the job
            user_id: ID of the user who created the job
            metadata: Additional metadata for the job

        Returns:
            Job ID
        """
        job_id = self.status_tracker.create_job(
            job_type=job_type,
            description=description,
            user_id=user_id,
            metadata=metadata
        )
        self.telemetry.info(
            f"Created {job_type.value} job: {description}",
            category=EventCategory.JOB,
            context={
                "job_id": job_id,
                "job_type": job_type.value,
                "user_id": user_id
            }
        )
        return job_id

    def start_job(self, job_id: str) -> None:
        """
        Start a job.

        Args:
            job_id: ID of the job to start

        Raises:
            Exception: Any error from the status tracker or telemetry is
                logged and re-raised.
        """
        try:
            self.status_tracker.start_job(job_id)
            job = self.status_tracker.get_job_status(job_id)
            self.telemetry.info(
                f"Started job: {job['description']}",
                category=EventCategory.JOB,
                context={
                    "job_id": job_id,
                    "job_type": job["type"]
                }
            )
        except Exception as e:
            self.telemetry.error(
                f"Failed to start job {job_id}: {e}",
                category=EventCategory.JOB,
                context={"job_id": job_id}
            )
            raise

    def update_job_progress(self, job_id: str, progress: float, message: Optional[str] = None) -> None:
        """
        Update job progress.

        A debug event is emitted when a message is supplied or when the
        update crosses a 20% boundary. Failures are logged as warnings and
        are not raised.

        Args:
            job_id: ID of the job
            progress: Progress value (0-1)
            message: Optional progress message
        """
        try:
            # Read the stored progress BEFORE writing the new value;
            # comparing after the update would compare the new value with
            # itself and the 20% milestone would never be detected.
            previous = self.status_tracker.get_job_status(job_id).get("progress", 0) or 0
            self.status_tracker.update_progress(job_id, progress, message)

            # Only log if message is provided or every 20% progress
            crossed_milestone = int(progress * 5) > int(previous * 5)
            if message or crossed_milestone:
                self.telemetry.debug(
                    f"Job {job_id} progress: {progress:.1%}" + (f" - {message}" if message else ""),
                    category=EventCategory.JOB,
                    context={"job_id": job_id, "progress": progress}
                )
        except Exception as e:
            self.telemetry.warning(
                f"Failed to update job progress {job_id}: {e}",
                category=EventCategory.JOB,
                context={"job_id": job_id, "progress": progress}
            )

    def complete_job(
        self,
        job_id: str,
        results: Optional[Dict[str, Any]] = None,
        message: Optional[str] = None
    ) -> None:
        """
        Complete a job.

        Logs the completion and, when the job has a start timestamp, records
        a ``job.<type>.duration`` timer metric.

        Args:
            job_id: ID of the job
            results: Job results
            message: Optional completion message

        Raises:
            Exception: Any error from the status tracker or telemetry is
                logged and re-raised.
        """
        try:
            self.status_tracker.complete_job(job_id, results, message)
            job = self.status_tracker.get_job_status(job_id)

            # Computed once and shared by the log entry and the metric.
            duration = job["completed_at"] - job["started_at"] if job["started_at"] else None

            self.telemetry.info(
                f"Completed job: {job['description']}" + (f" - {message}" if message else ""),
                category=EventCategory.JOB,
                context={
                    "job_id": job_id,
                    "job_type": job["type"],
                    "duration": duration
                }
            )

            # Record job duration metric
            if duration is not None:
                self.telemetry.timer(
                    f"job.{job['type']}.duration",
                    duration,
                    tags={"job_type": job["type"]}
                )
        except Exception as e:
            self.telemetry.error(
                f"Failed to complete job {job_id}: {e}",
                category=EventCategory.JOB,
                context={"job_id": job_id}
            )
            raise

    def fail_job(self, job_id: str, error: Union[str, Exception], error_details: Optional[Dict[str, Any]] = None) -> None:
        """
        Mark a job as failed.

        When ``error`` is an ``MGError`` its code, category and details are
        added to the error details. The caller's ``error_details`` dictionary
        is never modified; enrichment happens on a copy.

        Args:
            job_id: ID of the job
            error: Error message or exception
            error_details: Additional error details

        Raises:
            Exception: Any error raised while recording the failure is
                logged and re-raised.
        """
        try:
            error_message = str(error)

            # Work on a copy so the caller's dictionary is left untouched.
            details = dict(error_details) if error_details is not None else None

            if isinstance(error, MGError):
                if details is None:
                    details = {}
                details["error_code"] = error.code
                details["error_category"] = error.category
                if error.details:
                    # Copy the nested dict as well before extending it.
                    nested = dict(details.get("error_details", {}))
                    nested.update(error.details)
                    details["error_details"] = nested

            self.status_tracker.fail_job(job_id, error_message, details)
            job = self.status_tracker.get_job_status(job_id)
            self.telemetry.error(
                f"Job failed: {job['description']} - {error_message}",
                category=EventCategory.JOB,
                context={
                    "job_id": job_id,
                    "job_type": job["type"],
                    "error": error_message,
                    "error_details": details
                },
                exc_info=isinstance(error, Exception)
            )
        except Exception as e:
            self.telemetry.error(
                f"Failed to mark job {job_id} as failed: {e}",
                category=EventCategory.JOB,
                context={"job_id": job_id, "original_error": str(error)}
            )
            raise

    def get_job_status(self, job_id: str) -> Dict[str, Any]:
        """
        Get job status.

        Args:
            job_id: ID of the job

        Returns:
            Job status data
        """
        return self.status_tracker.get_job_status(job_id)

    def get_jobs_by_status(self, status: JobStatus) -> List[Dict[str, Any]]:
        """
        Get jobs by status.

        Args:
            status: Job status

        Returns:
            List of job data
        """
        return self.status_tracker.get_jobs_by_status(status)

    def get_jobs_by_user(self, user_id: str) -> List[Dict[str, Any]]:
        """
        Get jobs by user.

        Args:
            user_id: User ID

        Returns:
            List of job data
        """
        return self.status_tracker.get_jobs_by_user(user_id)

    def get_all_jobs(self) -> List[Dict[str, Any]]:
        """
        Get all jobs.

        Returns:
            List of all job data
        """
        return self.status_tracker.get_all_jobs()

    def handle_error(self, error: Exception, context: Optional[Dict[str, Any]] = None) -> MGError:
        """
        Handle an exception and convert it to a MGError.

        The optional context is set on telemetry only for the duration of
        the conversion and is cleared again even if the conversion raises.

        Args:
            error: Exception to handle
            context: Additional context for the error

        Returns:
            MGError instance
        """
        from error_handling import handle_exception

        # Add context to telemetry
        if context:
            self.telemetry.set_context(**context)
        try:
            mg_error = handle_exception(error, logger=self.telemetry.logger)
        finally:
            # Clear context, including when handle_exception raised, so it
            # does not leak into unrelated later events.
            if context:
                self.telemetry.clear_context(*context.keys())
        return mg_error

    def shutdown(self) -> None:
        """Shutdown the core system."""
        self.telemetry.info("Shutting down MorphGuard core", category=EventCategory.SYSTEM)
        self.telemetry.shutdown()
# Holder for the lazily created, process-wide core instance
_instance = None


def get_core(
    config_file: Optional[str] = None,
    log_dir: Optional[str] = None,
    telemetry_dir: Optional[str] = None,
    status_dir: Optional[str] = None,
    debug: bool = False
) -> MorphGuardCore:
    """
    Get the global core instance.

    The instance is built on the first call. Every later call returns that
    same object, so the arguments only take effect on the first call.

    Args:
        config_file: Path to configuration file
        log_dir: Directory for log files
        telemetry_dir: Directory for telemetry data
        status_dir: Directory for status data
        debug: Whether to enable debug mode

    Returns:
        MorphGuardCore instance
    """
    global _instance

    # Fast path: the core already exists, the arguments are ignored.
    if _instance is not None:
        return _instance

    core_options = {
        "config_file": config_file,
        "log_dir": log_dir,
        "telemetry_dir": telemetry_dir,
        "status_dir": status_dir,
        "debug": debug,
    }
    _instance = MorphGuardCore(**core_options)
    return _instance
| # Import missing dependencies | |
| from typing import Union |