""" Feature Engineering Controller Handles feature engineering operations and coordinates between service and API """ import logging from typing import Dict, Any, List, Optional from services.feature_engineering_service import FeatureEngineeringService from models.feature_engineering_model import WeatherFeatureModel from utils import create_error_response, create_success_response class FeatureEngineeringController: """Controller for weather feature engineering operations""" def __init__(self, feature_service: FeatureEngineeringService): self.feature_service = feature_service self.logger = logging.getLogger(__name__) def process_features(self, data: Dict[str, Any]) -> Dict[str, Any]: """ Process weather data to compute engineered features Args: data: Request data containing weather_data and optional parameters Returns: Feature engineering response """ try: # Validate required parameters if 'weather_data' not in data or not data['weather_data']: return create_error_response( "Missing required parameter: 'weather_data'", {"required_fields": ["weather_data"]} ) weather_data = data['weather_data'] event_duration = data.get('event_duration', 1.0) include_metadata = data.get('include_metadata', True) # Validate event duration try: event_duration = float(event_duration) if event_duration else 1.0 if event_duration <= 0: event_duration = 1.0 except (ValueError, TypeError): return create_error_response( "Invalid event_duration: must be a positive number", {"event_duration": event_duration} ) self.logger.info(f"Processing features for weather data with {len(weather_data)} fields, " f"event_duration: {event_duration} days") # Process features success, result = self.feature_service.process_weather_features( weather_data, event_duration, include_metadata ) if success: return create_success_response(result) else: return create_error_response( "Failed to process engineered features", result ) except Exception as e: self.logger.error(f"Feature processing error: {str(e)}") return create_error_response( f"Failed to process features: {str(e)}" ) def process_batch_features(self, data: Dict[str, Any]) -> Dict[str, Any]: """ Process multiple weather datasets for feature engineering Args: data: Request data containing batch of weather datasets Returns: Batch feature engineering response """ try: # Validate batch request if 'batch_data' not in data or not isinstance(data['batch_data'], list): return create_error_response( "Invalid batch request: 'batch_data' array required" ) batch_data = data['batch_data'] include_metadata = data.get('include_metadata', True) if len(batch_data) > 100: # Limit batch size return create_error_response( "Batch size too large: maximum 100 items allowed", {"max_allowed": 100, "requested": len(batch_data)} ) self.logger.info(f"Processing batch feature engineering for {len(batch_data)} datasets") # Process batch success, result = self.feature_service.process_batch_features( batch_data, include_metadata ) if success: return create_success_response(result) else: return create_error_response( "Failed to process batch features", result ) except Exception as e: self.logger.error(f"Batch feature processing error: {str(e)}") return create_error_response( f"Failed to process batch features: {str(e)}" ) def create_feature_dataframe(self, data: Dict[str, Any]) -> Dict[str, Any]: """ Create DataFrame with weather data and engineered features Args: data: Request data containing weather_data, disaster_date, and days_before Returns: DataFrame creation response """ try: # Validate required parameters required_fields = ['weather_data', 'disaster_date', 'days_before'] missing_fields = [field for field in required_fields if field not in data or data[field] is None] if missing_fields: return create_error_response( f"Missing required fields: {', '.join(missing_fields)}", {"missing_fields": missing_fields} ) weather_data = data['weather_data'] disaster_date = str(data['disaster_date']) event_duration = data.get('event_duration', 1.0) try: days_before = int(data['days_before']) event_duration = float(event_duration) except (ValueError, TypeError) as e: return create_error_response( f"Invalid parameter format: {str(e)}", {"validation_error": str(e)} ) self.logger.info(f"Creating feature DataFrame for {disaster_date}, " f"{days_before} days, duration: {event_duration}") # Process features first success, feature_result = self.feature_service.process_weather_features( weather_data, event_duration, include_metadata=True ) if not success: return create_error_response( "Failed to process features for DataFrame", feature_result ) # Create DataFrame try: df = self.feature_service.create_feature_dataframe( weather_data, feature_result['engineered_features'], disaster_date, days_before ) # Convert DataFrame to dict for JSON response dataframe_data = { 'dates': df['date'].tolist(), 'weather_data': { col: df[col].tolist() for col in df.columns if col in WeatherFeatureModel.WEATHER_FIELDS }, 'engineered_features': { col: df[col].tolist() for col in df.columns if col in WeatherFeatureModel.ENGINEERED_FEATURES } } return create_success_response({ 'dataframe': dataframe_data, 'shape': df.shape, 'columns': list(df.columns), 'metadata': feature_result.get('metadata', {}), 'validation': feature_result.get('validation', {}) }) except Exception as e: return create_error_response( f"Failed to create DataFrame: {str(e)}" ) except Exception as e: self.logger.error(f"DataFrame creation error: {str(e)}") return create_error_response( f"Failed to create feature DataFrame: {str(e)}" ) def get_feature_info(self) -> Dict[str, Any]: """Get information about available engineered features""" try: feature_info = self.feature_service.get_feature_info() return create_success_response({ 'feature_info': feature_info, 'service_status': self.feature_service.get_service_status() }) except Exception as e: self.logger.error(f"Feature info error: {str(e)}") return create_error_response( f"Failed to get feature info: {str(e)}" ) def validate_weather_data(self, data: Dict[str, Any]) -> Dict[str, Any]: """ Validate weather data for feature engineering Args: data: Request data containing weather_data Returns: Validation response """ try: if 'weather_data' not in data or not data['weather_data']: return create_error_response( "Missing required parameter: 'weather_data'", {"required_fields": ["weather_data"]} ) weather_data = data['weather_data'] # Validate data is_valid, validation = self.feature_service.validate_input_data(weather_data) validation_result = { 'validation': validation, 'is_valid': is_valid, 'ready_for_processing': is_valid } if is_valid: return create_success_response(validation_result) else: return create_error_response( "Weather data validation failed", validation_result ) except Exception as e: self.logger.error(f"Validation error: {str(e)}") return create_error_response( f"Failed to validate weather data: {str(e)}" ) def process_and_export(self, data: Dict[str, Any]) -> Dict[str, Any]: """ Process features and export in specified format Args: data: Request data with weather_data, disaster_date, days_before, and export options Returns: Export response """ try: # Validate required parameters required_fields = ['weather_data', 'disaster_date', 'days_before'] missing_fields = [field for field in required_fields if field not in data or data[field] is None] if missing_fields: return create_error_response( f"Missing required fields: {', '.join(missing_fields)}", {"missing_fields": missing_fields} ) weather_data = data['weather_data'] disaster_date = str(data['disaster_date']) event_duration = data.get('event_duration', 1.0) export_format = data.get('export_format', 'dict').lower() try: days_before = int(data['days_before']) event_duration = float(event_duration) except (ValueError, TypeError) as e: return create_error_response( f"Invalid parameter format: {str(e)}", {"validation_error": str(e)} ) # Validate export format valid_formats = ['dict', 'dataframe', 'json'] if export_format not in valid_formats: return create_error_response( f"Invalid export format: {export_format}", {"valid_formats": valid_formats} ) self.logger.info(f"Processing and exporting features in '{export_format}' format") # Process and export success, result = self.feature_service.process_and_export( weather_data, disaster_date, days_before, event_duration, export_format ) if success: # Handle DataFrame special case for JSON response if export_format == 'dataframe' and 'export' in result: export_data = result['export'] if 'dataframe' in export_data: # Convert DataFrame to dict for JSON serialization df = export_data['dataframe'] export_data['dataframe_dict'] = df.to_dict(orient='list') # Remove actual DataFrame object for JSON response del export_data['dataframe'] return create_success_response(result) else: return create_error_response( "Failed to process and export features", result ) except Exception as e: self.logger.error(f"Process and export error: {str(e)}") return create_error_response( f"Failed to process and export: {str(e)}" ) def get_service_status(self) -> Dict[str, Any]: """Get feature engineering service status and health""" try: service_status = self.feature_service.get_service_status() return create_success_response({ 'controller': 'Feature Engineering Controller', 'service': service_status, 'health': 'healthy' if service_status.get('initialized') else 'unhealthy', 'available_operations': [ 'process_features', 'process_batch_features', 'create_feature_dataframe', 'validate_weather_data', 'process_and_export', 'get_feature_info' ] }) except Exception as e: self.logger.error(f"Service status error: {str(e)}") return create_error_response( f"Failed to get service status: {str(e)}" )