"""Error handling and validation utilities for the optimization system."""

import logging
from typing import Dict, List, Optional, Any
from datetime import datetime
import json
from collections.abc import Mapping


class OptimizationError(Exception):
    """Base exception for optimization errors."""


class DataValidationError(OptimizationError):
    """Raised when input data is invalid or malformed."""


class ConstraintViolationError(OptimizationError):
    """Raised when optimization constraints cannot be satisfied."""


class ConfigurationError(OptimizationError):
    """Raised when optimization configuration is invalid."""


class DataValidator:
    """Validates input data for optimization.

    All entry points are classmethods that return a list of human-readable
    error strings instead of raising, so callers can report every problem
    found in one pass.
    """

    # Fields that must be present and non-empty in every record of a section.
    REQUIRED_FIELDS = {
        'trainset_status': ['trainset_id', 'operational_status'],
        'fitness_certificates': ['trainset_id', 'department', 'status'],
        'job_cards': ['trainset_id', 'priority', 'status'],
        'component_health': ['trainset_id', 'component', 'status']
    }

    # Accept both legacy and new backend formats
    VALID_STATUSES = {
        'operational': [
            # Legacy format
            'Available', 'In-Service', 'Maintenance', 'Standby', 'Out-of-Order',
            # New backend format
            'IN_SERVICE', 'STANDBY', 'MAINTENANCE', 'OUT_OF_SERVICE', 'TESTING'
        ],
        'certificate': [
            # Legacy format
            'Valid', 'Expired', 'Expiring-Soon', 'Suspended',
            # New backend format
            'PENDING', 'IN_PROGRESS', 'ISSUED', 'EXPIRED', 'SUSPENDED',
            'REVOKED', 'RENEWED', 'CANCELLED'
        ],
        'job': ['Open', 'In-Progress', 'Closed', 'Pending-Parts'],
        'component': [
            # Legacy format
            'Good', 'Fair', 'Warning', 'Critical',
            # New backend format
            'EXCELLENT', 'GOOD', 'FAIR', 'POOR', 'CRITICAL', 'FAILED'
        ]
    }

    # Mapping from backend format to internal format for optimization logic
    STATUS_MAPPINGS = {
        'operational': {
            'IN_SERVICE': 'In-Service',
            'STANDBY': 'Standby',
            'MAINTENANCE': 'Maintenance',
            'OUT_OF_SERVICE': 'Out-of-Order',
            'TESTING': 'Maintenance',  # Treat testing as maintenance for optimization
        },
        'certificate': {
            'PENDING': 'Expiring-Soon',
            'IN_PROGRESS': 'Expiring-Soon',
            'ISSUED': 'Valid',
            'EXPIRED': 'Expired',
            'SUSPENDED': 'Suspended',
            'REVOKED': 'Expired',
            'RENEWED': 'Valid',
            'CANCELLED': 'Expired',
        },
        'component': {
            'EXCELLENT': 'Good',
            'GOOD': 'Good',
            'FAIR': 'Fair',
            'POOR': 'Warning',
            'CRITICAL': 'Critical',
            'FAILED': 'Critical',
        }
    }

    # Accepted job-card priorities.
    VALID_PRIORITIES = ['Critical', 'High', 'Medium', 'Low']

    # Operational statuses counted as "available" during cross-validation
    # (both legacy and new formats).
    AVAILABLE_STATUSES = frozenset(
        {'Available', 'In-Service', 'Standby', 'IN_SERVICE', 'STANDBY'}
    )

    # Minimum fleet sizes enforced by cross-validation.
    MIN_TRAINSETS = 10
    MIN_AVAILABLE_TRAINSETS = 15

    @staticmethod
    def _is_number(value: Any) -> bool:
        """Return True for int/float values, excluding bool.

        bool is a subclass of int, so a plain isinstance check would accept
        True/False as numeric measurements.
        """
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    @classmethod
    def validate_data(cls, data: Dict) -> List[str]:
        """Validate input data structure and content.

        Args:
            data: Mapping of section name to a list of record dicts.
                'trainset_status', 'fitness_certificates' and
                'component_health' are required; 'job_cards' is optional and
                is skipped when missing or falsy.

        Returns:
            List of validation errors (empty if valid)
        """
        if not isinstance(data, Mapping):
            return ["Input data must be a dictionary"]

        errors: List[str] = []
        try:
            # Check required top-level keys (job_cards is now optional)
            required_keys = ['trainset_status', 'fitness_certificates', 'component_health']
            optional_keys = ['job_cards']

            for key in required_keys:
                if key not in data:
                    errors.append(f"Missing required data section: {key}")
                    continue
                if not isinstance(data[key], list):
                    errors.append(f"Data section {key} must be a list")
                    continue
                # Validate individual records
                errors.extend(cls._validate_section(data[key], key))

            # Validate optional keys if present
            for key in optional_keys:
                section = data.get(key)
                if not section:
                    continue
                if not isinstance(section, list):
                    errors.append(f"Data section {key} must be a list")
                    continue
                errors.extend(cls._validate_section(section, key))

            # Cross-validation only makes sense when the basic structure is valid
            if not errors:
                errors.extend(cls._cross_validate(data))
        except Exception as e:
            # Validation reports problems instead of raising.
            errors.append(f"Unexpected error during validation: {str(e)}")
        return errors

    @classmethod
    def _validate_section(cls, section_data: List[Dict], section_name: str) -> List[str]:
        """Validate every record of a specific data section.

        Args:
            section_data: Records of the section.
            section_name: Section key, used to look up REQUIRED_FIELDS and to
                prefix error messages.

        Returns:
            List of error strings for this section.
        """
        errors: List[str] = []
        required_fields = cls.REQUIRED_FIELDS.get(section_name, [])

        for i, record in enumerate(section_data):
            if not isinstance(record, dict):
                errors.append(f"{section_name}[{i}]: Record must be a dictionary")
                continue

            # Check required fields
            for field in required_fields:
                if field not in record:
                    errors.append(f"{section_name}[{i}]: Missing required field '{field}'")
                elif record[field] is None or record[field] == "":
                    errors.append(f"{section_name}[{i}]: Field '{field}' cannot be empty")

            # Validate specific fields
            errors.extend(cls._validate_record_fields(record, section_name, i))
        return errors

    @classmethod
    def _validate_record_fields(cls, record: Dict, section_name: str, index: int) -> List[str]:
        """Validate the section-specific fields of a single record.

        Args:
            record: The record to check.
            section_name: Section the record belongs to; selects the rules.
            index: Position of the record in its section (for messages).

        Returns:
            List of error strings for this record.
        """
        errors: List[str] = []
        prefix = f"{section_name}[{index}]"
        try:
            if section_name == 'trainset_status':
                if 'operational_status' in record:
                    if record['operational_status'] not in cls.VALID_STATUSES['operational']:
                        errors.append(f"{prefix}: Invalid operational_status")
                if 'total_mileage_km' in record:
                    mileage = record['total_mileage_km']
                    if not cls._is_number(mileage) or mileage < 0:
                        errors.append(f"{prefix}: total_mileage_km must be non-negative number")

            elif section_name == 'fitness_certificates':
                if 'status' in record:
                    if record['status'] not in cls.VALID_STATUSES['certificate']:
                        errors.append(f"{prefix}: Invalid certificate status")
                # Validate dates
                for date_field in ['issue_date', 'expiry_date']:
                    if date_field in record and record[date_field] is not None:
                        try:
                            datetime.fromisoformat(record[date_field])
                        except (ValueError, TypeError):
                            # TypeError: the value is not a string at all.
                            errors.append(f"{prefix}: Invalid {date_field} format")

            elif section_name == 'job_cards':
                if 'status' in record:
                    if record['status'] not in cls.VALID_STATUSES['job']:
                        errors.append(f"{prefix}: Invalid job status")
                if 'priority' in record:
                    if record['priority'] not in cls.VALID_PRIORITIES:
                        errors.append(f"{prefix}: Invalid priority")

            elif section_name == 'component_health':
                if 'status' in record:
                    if record['status'] not in cls.VALID_STATUSES['component']:
                        errors.append(f"{prefix}: Invalid component status")
                if 'wear_level' in record:
                    wear = record['wear_level']
                    if not cls._is_number(wear) or not (0 <= wear <= 100):
                        errors.append(f"{prefix}: wear_level must be between 0-100")
        except Exception as e:
            errors.append(f"{prefix}: Error validating record: {str(e)}")
        return errors

    @classmethod
    def _cross_validate(cls, data: Dict) -> List[str]:
        """Cross-validate data consistency across sections.

        Checks that the other sections only reference trainset IDs present in
        'trainset_status', and that the fleet meets MIN_TRAINSETS and
        MIN_AVAILABLE_TRAINSETS.

        Args:
            data: Input data whose per-section structure was already validated.

        Returns:
            List of consistency errors.
        """
        errors: List[str] = []
        try:
            # Get all trainset IDs
            trainset_ids = {record['trainset_id'] for record in data['trainset_status']}

            # Check if all other sections reference valid trainset IDs
            for section_name in ['fitness_certificates', 'job_cards', 'component_health']:
                # A missing or falsy (e.g. None) section is treated as empty,
                # matching how validate_data skips optional sections.
                for record in data.get(section_name) or []:
                    if 'trainset_id' not in record:
                        continue
                    trainset_id = record['trainset_id']
                    if trainset_id not in trainset_ids:
                        errors.append(
                            f"{section_name}: References unknown trainset_id '{trainset_id}'"
                        )

            # Check minimum data requirements
            if len(trainset_ids) < cls.MIN_TRAINSETS:
                errors.append(
                    f"Insufficient trainsets for optimization (minimum {cls.MIN_TRAINSETS} required)"
                )

            # Count available trainsets (both legacy and new formats)
            available_trainsets = sum(
                1 for record in data['trainset_status']
                if record.get('operational_status') in cls.AVAILABLE_STATUSES
            )
            if available_trainsets < cls.MIN_AVAILABLE_TRAINSETS:
                errors.append(
                    f"Insufficient available trainsets for optimization "
                    f"({available_trainsets} available, need at least {cls.MIN_AVAILABLE_TRAINSETS})"
                )
        except Exception as e:
            errors.append(f"Error in cross-validation: {str(e)}")
        return errors


class ErrorHandler:
    """Centralized error handling for optimization system."""

    def __init__(self, log_file: Optional[str] = None):
        """Create the handler and (re)configure the 'optimization' logger.

        Args:
            log_file: Optional path; when given, log records are also written
                to this file.
        """
        self.logger = self._setup_logger(log_file)

    def _setup_logger(self, log_file: Optional[str]) -> logging.Logger:
        """Setup logging configuration.

        Reconfigures the shared logger named 'optimization': existing handlers
        are removed and closed, then a console handler (and a file handler if
        log_file is given) is attached.

        Args:
            log_file: Optional path of a log file.

        Returns:
            The configured logger.
        """
        logger = logging.getLogger('optimization')
        # NOTE: the logger level is INFO, so the file handler's DEBUG
        # threshold below only takes effect if this level is lowered.
        logger.setLevel(logging.INFO)

        # Remove existing handlers and close them so that file handles opened
        # by a previous ErrorHandler are released rather than leaked.
        for old_handler in list(logger.handlers):
            logger.removeHandler(old_handler)
            old_handler.close()

        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        console_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        console_handler.setFormatter(console_format)
        logger.addHandler(console_handler)

        # File handler if specified
        if log_file:
            # Messages contain non-ASCII characters (e.g. the bullet "•"), so
            # do not depend on the platform default encoding.
            file_handler = logging.FileHandler(log_file, encoding='utf-8')
            file_handler.setLevel(logging.DEBUG)
            file_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            file_handler.setFormatter(file_format)
            logger.addHandler(file_handler)

        return logger

    def validate_and_prepare_data(self, data: Dict) -> Dict:
        """Validate data and prepare for optimization.

        Args:
            data: Raw input data dictionary.

        Returns:
            A cleaned shallow copy of the input data.

        Raises:
            DataValidationError: If data validation fails
        """
        self.logger.info("Validating input data...")
        try:
            # Validate data structure
            validation_errors = DataValidator.validate_data(data)
            if validation_errors:
                error_msg = "Data validation failed:\n" + "\n".join(
                    f" • {error}" for error in validation_errors
                )
                self.logger.error(error_msg)
                raise DataValidationError(error_msg)

            # Data cleanup and preparation
            cleaned_data = self._clean_data(data)
            self.logger.info("Data validation successful")
            return cleaned_data
        except DataValidationError:
            raise
        except Exception as e:
            self.logger.error(f"Unexpected error during data validation: {str(e)}")
            raise DataValidationError(f"Unexpected validation error: {str(e)}") from e

    def _clean_data(self, data: Dict) -> Dict:
        """Clean and prepare data for optimization.

        Works on a shallow copy: section lists are replaced, never mutated in
        place, so the caller's lists are left untouched.

        Args:
            data: Validated input data.

        Returns:
            Copy of the data without records that lack a trainset_id or that
            reference a trainset_id absent from 'trainset_status'. A section
            that is present but falsy (e.g. None) becomes an empty list.

        Raises:
            DataValidationError: If cleaning fails for any reason.
        """
        cleaned_data = data.copy()
        try:
            # Remove records with missing critical data
            for section_name in ['trainset_status', 'fitness_certificates',
                                 'job_cards', 'component_health']:
                if section_name not in cleaned_data:
                    continue
                records = cleaned_data[section_name] or []
                kept = [
                    record for record in records
                    if record.get('trainset_id') is not None
                ]
                cleaned_data[section_name] = kept
                removed_count = len(records) - len(kept)
                if removed_count > 0:
                    self.logger.warning(
                        f"Removed {removed_count} records from {section_name} due to missing trainset_id"
                    )

            # Ensure consistent trainset IDs across sections
            valid_trainset_ids = {
                record['trainset_id'] for record in cleaned_data['trainset_status']
            }
            for section_name in ['fitness_certificates', 'job_cards', 'component_health']:
                if section_name not in cleaned_data:
                    continue
                records = cleaned_data[section_name]
                kept = [
                    record for record in records
                    if record.get('trainset_id') in valid_trainset_ids
                ]
                cleaned_data[section_name] = kept
                removed_count = len(records) - len(kept)
                if removed_count > 0:
                    self.logger.warning(
                        f"Removed {removed_count} records from {section_name} with invalid trainset_id"
                    )

            return cleaned_data
        except Exception as e:
            self.logger.error(f"Error during data cleaning: {str(e)}")
            raise DataValidationError(f"Data cleaning failed: {str(e)}") from e

    def handle_optimization_error(self, error: Exception, context: str = "") -> None:
        """Handle optimization errors with appropriate logging and re-raising.

        This method always raises and never returns normally.

        Args:
            error: The exception to handle.
            context: Optional description of where the error occurred.

        Raises:
            OptimizationError: `error` itself when it already is an
                OptimizationError (or subclass); otherwise a new
                OptimizationError chained to `error`.
        """
        error_msg = f"Optimization error{' in ' + context if context else ''}: {str(error)}"
        if isinstance(error, OptimizationError):
            self.logger.error(error_msg)
            # Raise the instance explicitly: a bare `raise` only works while
            # an exception is being handled.
            raise error
        # exc_info=error logs the traceback of `error` even when this method
        # is called outside of an `except` block.
        self.logger.error(f"Unexpected {error_msg}", exc_info=error)
        raise OptimizationError(f"Unexpected error: {str(error)}") from error

    def log_optimization_start(self, method: str, config: Any) -> None:
        """Log optimization start with parameters.

        Args:
            method: Name of the optimization method.
            config: Configuration object; its instance attributes are logged
                when it has a __dict__.
        """
        self.logger.info(f"Starting optimization with method: {method}")
        if hasattr(config, '__dict__'):
            for key, value in config.__dict__.items():
                self.logger.info(f" {key}: {value}")

    def log_optimization_result(self, result: Any, method: str) -> None:
        """Log optimization results.

        Detailed figures are logged only when the result exposes both
        'fitness_score' and 'selected_trainsets'.

        Args:
            result: Result object returned by the optimizer.
            method: Name of the optimization method.
        """
        self.logger.info(f"Optimization completed with {method}")
        if hasattr(result, 'fitness_score') and hasattr(result, 'selected_trainsets'):
            self.logger.info(f" Fitness score: {result.fitness_score:.2f}")
            self.logger.info(f" Service trainsets: {len(result.selected_trainsets)}")
            self.logger.info(f" Standby trainsets: {len(getattr(result, 'standby_trainsets', []))}")
            self.logger.info(f" Maintenance trainsets: {len(getattr(result, 'maintenance_trainsets', []))}")


def safe_optimize(data: Dict, method: str = 'ga', config: Any = None,
                  log_file: Optional[str] = None, **kwargs) -> Any:
    """Safely run optimization with comprehensive error handling.

    Args:
        data: Input data dictionary
        method: Optimization method
        config: Optimization configuration
        log_file: Path to log file (optional)
        **kwargs: Additional method-specific parameters

    Returns:
        OptimizationResult

    Raises:
        OptimizationError: For any optimization-related errors
    """
    error_handler = ErrorHandler(log_file)
    try:
        # Validate and prepare data
        cleaned_data = error_handler.validate_and_prepare_data(data)

        # Import here to avoid circular imports
        from .scheduler import optimize_trainset_schedule

        # Log optimization start
        error_handler.log_optimization_start(method, config)

        # Run optimization
        result = optimize_trainset_schedule(cleaned_data, method, config, **kwargs)

        # Log results
        error_handler.log_optimization_result(result, method)
        return result
    except (DataValidationError, ConstraintViolationError, ConfigurationError):
        raise
    except Exception as e:
        # Always raises: logs the error and converts it to OptimizationError.
        error_handler.handle_optimization_error(e, f"method={method}")


# Usage example
if __name__ == "__main__":
    # Test data validation
    test_data = {
        "trainset_status": [
            {"trainset_id": "TS-001", "operational_status": "Available", "total_mileage_km": 150000},
            {"trainset_id": "TS-002", "operational_status": "Invalid"}  # This should cause an error
        ],
        "fitness_certificates": [
            {"trainset_id": "TS-001", "department": "Rolling Stock", "status": "Valid"}
        ],
        "job_cards": [],
        "component_health": []
    }

    # Validate data
    errors = DataValidator.validate_data(test_data)
    if errors:
        print("Validation errors found:")
        for error in errors:
            print(f" • {error}")
    else:
        print("✅ Data validation passed!")

    # Test error handler
    try:
        handler = ErrorHandler()
        cleaned_data = handler.validate_and_prepare_data(test_data)
        print("✅ Data preparation successful!")
    except DataValidationError as e:
        print(f"❌ Data validation failed: {e}")