MorphGuard / src /core.py
juanquy's picture
Initial clean commit of modular MorphGuard
2978bba
Raw
History Blame Contribute Delete
16.6 kB
"""
Core initialization and configuration for MorphGuard.
This module provides centralized initialization of all major components and
manages global application state.
"""
import os
import logging
import json
from typing import Dict, Any, Optional, List, Type
import threading
from datetime import datetime
# Import our custom modules
from error_handling import MGError, ErrorCode, ErrorSeverity, ErrorCategory
from telemetry import get_telemetry, TelemetryManager, EventCategory
from status_tracker import get_tracker, StatusTracker, JobStatus, JobType
class MorphGuardCore:
"""
Central core class for MorphGuard application.
This class initializes and manages access to all major components:
- Configuration
- Telemetry & Logging
- Status Tracking
- Error Handling
"""
def __init__(
self,
config_file: Optional[str] = None,
log_dir: Optional[str] = None,
telemetry_dir: Optional[str] = None,
status_dir: Optional[str] = None,
debug: bool = False
):
"""
Initialize the core system.
Args:
config_file: Path to configuration file
log_dir: Directory for log files
telemetry_dir: Directory for telemetry data
status_dir: Directory for status data
debug: Whether to enable debug mode
"""
self.initialized = False
self.startup_time = datetime.utcnow()
self.debug = debug
# Set default directories
self.base_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
self.log_dir = log_dir or os.path.join(self.base_dir, "logs")
self.telemetry_dir = telemetry_dir or os.path.join(self.base_dir, "telemetry")
self.status_dir = status_dir or os.path.join(self.base_dir, "status")
self.config_file = config_file or os.path.join(self.base_dir, "config.json")
# Create directories if they don't exist
os.makedirs(self.log_dir, exist_ok=True)
os.makedirs(self.telemetry_dir, exist_ok=True)
os.makedirs(self.status_dir, exist_ok=True)
# Load configuration
self.config = self._load_config()
# Initialize telemetry
log_level = logging.DEBUG if self.debug else logging.INFO
self.telemetry = get_telemetry(
app_name="morphguard",
log_dir=self.log_dir,
telemetry_dir=self.telemetry_dir,
log_level=log_level,
console_logging=True,
file_logging=True,
enable_telemetry=self.config.get("telemetry", {}).get("enabled", True)
)
# Initialize status tracker
self.status_tracker = get_tracker(self.status_dir)
# Log initialization
self.telemetry.info(
"MorphGuard core initialized",
category=EventCategory.SYSTEM,
context={
"debug_mode": self.debug,
"config_file": self.config_file,
"base_dir": self.base_dir
}
)
self.initialized = True
def _load_config(self) -> Dict[str, Any]:
"""
Load configuration from file.
Returns:
Configuration dictionary
"""
# Default configuration
default_config = {
"app": {
"name": "MorphGuard",
"version": "1.0.0",
"environment": "development"
},
"api": {
"host": "localhost",
"port": 5000,
"debug": self.debug,
"timeout": 30,
"max_content_length": 16 * 1024 * 1024 # 16MB
},
"telemetry": {
"enabled": True,
"export_metrics": False,
"system_metrics_interval": 60
},
"storage": {
"upload_dir": os.path.join(self.base_dir, "uploads"),
"results_dir": os.path.join(self.base_dir, "results"),
"max_file_size": 10 * 1024 * 1024 # 10MB
},
"security": {
"enable_auth": False,
"token_expiry": 3600, # 1 hour
"refresh_token_expiry": 2592000 # 30 days
},
"processing": {
"max_workers": 4,
"timeout": 300, # 5 minutes
"detect_batch_size": 16,
"demorph_batch_size": 4
},
"models": {
"detector_path": os.path.join(self.base_dir, "models/detector"),
"demorpher_path": os.path.join(self.base_dir, "models/demorpher"),
"face_recognition_path": os.path.join(self.base_dir, "models/face_recognition"),
"liveness_path": os.path.join(self.base_dir, "models/liveness")
}
}
# Try to load configuration from file
config = default_config
try:
if os.path.exists(self.config_file):
with open(self.config_file, "r") as f:
file_config = json.load(f)
# Deep merge configurations
self._deep_merge(config, file_config)
except Exception as e:
if self.debug:
print(f"Error loading configuration: {e}")
return config
def _deep_merge(self, target: Dict[str, Any], source: Dict[str, Any]) -> None:
"""
Deep merge source dictionary into target dictionary.
Args:
target: Target dictionary
source: Source dictionary
"""
for key, value in source.items():
if key in target and isinstance(target[key], dict) and isinstance(value, dict):
self._deep_merge(target[key], value)
else:
target[key] = value
def get_config(self, section: Optional[str] = None, key: Optional[str] = None) -> Any:
"""
Get configuration value.
Args:
section: Configuration section
key: Configuration key within section
Returns:
Configuration value
"""
if section is None:
return self.config
if key is None:
return self.config.get(section, {})
return self.config.get(section, {}).get(key)
def set_config(self, section: str, key: str, value: Any) -> None:
"""
Set configuration value.
Args:
section: Configuration section
key: Configuration key within section
value: Configuration value
"""
if section not in self.config:
self.config[section] = {}
self.config[section][key] = value
def save_config(self) -> None:
"""Save configuration to file."""
try:
with open(self.config_file, "w") as f:
json.dump(self.config, f, indent=2)
except Exception as e:
self.telemetry.error(
f"Failed to save configuration: {e}",
category=EventCategory.CONFIGURATION,
context={"config_file": self.config_file}
)
def create_job(
self,
job_type: JobType,
description: str,
user_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None
) -> str:
"""
Create a new processing job.
Args:
job_type: Type of job
description: Description of the job
user_id: ID of the user who created the job
metadata: Additional metadata for the job
Returns:
Job ID
"""
job_id = self.status_tracker.create_job(
job_type=job_type,
description=description,
user_id=user_id,
metadata=metadata
)
self.telemetry.info(
f"Created {job_type.value} job: {description}",
category=EventCategory.JOB,
context={
"job_id": job_id,
"job_type": job_type.value,
"user_id": user_id
}
)
return job_id
def start_job(self, job_id: str) -> None:
"""
Start a job.
Args:
job_id: ID of the job to start
"""
try:
self.status_tracker.start_job(job_id)
job = self.status_tracker.get_job_status(job_id)
self.telemetry.info(
f"Started job: {job['description']}",
category=EventCategory.JOB,
context={
"job_id": job_id,
"job_type": job["type"]
}
)
except Exception as e:
self.telemetry.error(
f"Failed to start job {job_id}: {e}",
category=EventCategory.JOB,
context={"job_id": job_id}
)
raise
def update_job_progress(self, job_id: str, progress: float, message: Optional[str] = None) -> None:
"""
Update job progress.
Args:
job_id: ID of the job
progress: Progress value (0-1)
message: Optional progress message
"""
try:
self.status_tracker.update_progress(job_id, progress, message)
# Only log if message is provided or every 20% progress
if message or int(progress * 5) > int(self.status_tracker.get_job_status(job_id).get("progress", 0) * 5):
self.telemetry.debug(
f"Job {job_id} progress: {progress:.1%}" + (f" - {message}" if message else ""),
category=EventCategory.JOB,
context={"job_id": job_id, "progress": progress}
)
except Exception as e:
self.telemetry.warning(
f"Failed to update job progress {job_id}: {e}",
category=EventCategory.JOB,
context={"job_id": job_id, "progress": progress}
)
def complete_job(
self,
job_id: str,
results: Optional[Dict[str, Any]] = None,
message: Optional[str] = None
) -> None:
"""
Complete a job.
Args:
job_id: ID of the job
results: Job results
message: Optional completion message
"""
try:
self.status_tracker.complete_job(job_id, results, message)
job = self.status_tracker.get_job_status(job_id)
self.telemetry.info(
f"Completed job: {job['description']}" + (f" - {message}" if message else ""),
category=EventCategory.JOB,
context={
"job_id": job_id,
"job_type": job["type"],
"duration": job["completed_at"] - job["started_at"] if job["started_at"] else None
}
)
# Record job duration metric
if job["started_at"]:
duration = job["completed_at"] - job["started_at"]
self.telemetry.timer(
f"job.{job['type']}.duration",
duration,
tags={"job_type": job["type"]}
)
except Exception as e:
self.telemetry.error(
f"Failed to complete job {job_id}: {e}",
category=EventCategory.JOB,
context={"job_id": job_id}
)
raise
def fail_job(self, job_id: str, error: Union[str, Exception], error_details: Optional[Dict[str, Any]] = None) -> None:
"""
Mark a job as failed.
Args:
job_id: ID of the job
error: Error message or exception
error_details: Additional error details
"""
try:
error_message = str(error)
if isinstance(error, MGError):
if error_details is None:
error_details = {}
error_details["error_code"] = error.code
error_details["error_category"] = error.category
if error.details:
if "error_details" not in error_details:
error_details["error_details"] = {}
error_details["error_details"].update(error.details)
self.status_tracker.fail_job(job_id, error_message, error_details)
job = self.status_tracker.get_job_status(job_id)
self.telemetry.error(
f"Job failed: {job['description']} - {error_message}",
category=EventCategory.JOB,
context={
"job_id": job_id,
"job_type": job["type"],
"error": error_message,
"error_details": error_details
},
exc_info=isinstance(error, Exception)
)
except Exception as e:
self.telemetry.error(
f"Failed to mark job {job_id} as failed: {e}",
category=EventCategory.JOB,
context={"job_id": job_id, "original_error": str(error)}
)
raise
def get_job_status(self, job_id: str) -> Dict[str, Any]:
"""
Get job status.
Args:
job_id: ID of the job
Returns:
Job status data
"""
return self.status_tracker.get_job_status(job_id)
def get_jobs_by_status(self, status: JobStatus) -> List[Dict[str, Any]]:
"""
Get jobs by status.
Args:
status: Job status
Returns:
List of job data
"""
return self.status_tracker.get_jobs_by_status(status)
def get_jobs_by_user(self, user_id: str) -> List[Dict[str, Any]]:
"""
Get jobs by user.
Args:
user_id: User ID
Returns:
List of job data
"""
return self.status_tracker.get_jobs_by_user(user_id)
def get_all_jobs(self) -> List[Dict[str, Any]]:
"""
Get all jobs.
Returns:
List of all job data
"""
return self.status_tracker.get_all_jobs()
def handle_error(self, error: Exception, context: Optional[Dict[str, Any]] = None) -> MGError:
"""
Handle an exception and convert it to a MGError.
Args:
error: Exception to handle
context: Additional context for the error
Returns:
MGError instance
"""
from error_handling import handle_exception
# Add context to telemetry
if context:
self.telemetry.set_context(**context)
mg_error = handle_exception(error, logger=self.telemetry.logger)
# Clear context
if context:
self.telemetry.clear_context(*context.keys())
return mg_error
def shutdown(self) -> None:
"""Shutdown the core system."""
self.telemetry.info("Shutting down MorphGuard core", category=EventCategory.SYSTEM)
self.telemetry.shutdown()
# Create a singleton instance
_instance = None
def get_core(
config_file: Optional[str] = None,
log_dir: Optional[str] = None,
telemetry_dir: Optional[str] = None,
status_dir: Optional[str] = None,
debug: bool = False
) -> MorphGuardCore:
"""
Get the global core instance.
Args:
config_file: Path to configuration file
log_dir: Directory for log files
telemetry_dir: Directory for telemetry data
status_dir: Directory for status data
debug: Whether to enable debug mode
Returns:
MorphGuardCore instance
"""
global _instance
if _instance is None:
_instance = MorphGuardCore(
config_file=config_file,
log_dir=log_dir,
telemetry_dir=telemetry_dir,
status_dir=status_dir,
debug=debug
)
return _instance
# Import missing dependencies
from typing import Union