ShrishtiAI-backend / server /controllers /feature_engineering_controller.py
MEWTROS
My 6 can be your 9
5ccd893
"""
Feature Engineering Controller
Handles feature engineering operations and coordinates between service and API
"""
import logging
from typing import Dict, Any, List, Optional
from services.feature_engineering_service import FeatureEngineeringService
from models.feature_engineering_model import WeatherFeatureModel
from utils import create_error_response, create_success_response
class FeatureEngineeringController:
"""Controller for weather feature engineering operations"""
def __init__(self, feature_service: FeatureEngineeringService):
self.feature_service = feature_service
self.logger = logging.getLogger(__name__)
def process_features(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Process weather data to compute engineered features
Args:
data: Request data containing weather_data and optional parameters
Returns:
Feature engineering response
"""
try:
# Validate required parameters
if 'weather_data' not in data or not data['weather_data']:
return create_error_response(
"Missing required parameter: 'weather_data'",
{"required_fields": ["weather_data"]}
)
weather_data = data['weather_data']
event_duration = data.get('event_duration', 1.0)
include_metadata = data.get('include_metadata', True)
# Validate event duration
try:
event_duration = float(event_duration) if event_duration else 1.0
if event_duration <= 0:
event_duration = 1.0
except (ValueError, TypeError):
return create_error_response(
"Invalid event_duration: must be a positive number",
{"event_duration": event_duration}
)
self.logger.info(f"Processing features for weather data with {len(weather_data)} fields, "
f"event_duration: {event_duration} days")
# Process features
success, result = self.feature_service.process_weather_features(
weather_data, event_duration, include_metadata
)
if success:
return create_success_response(result)
else:
return create_error_response(
"Failed to process engineered features",
result
)
except Exception as e:
self.logger.error(f"Feature processing error: {str(e)}")
return create_error_response(
f"Failed to process features: {str(e)}"
)
def process_batch_features(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Process multiple weather datasets for feature engineering
Args:
data: Request data containing batch of weather datasets
Returns:
Batch feature engineering response
"""
try:
# Validate batch request
if 'batch_data' not in data or not isinstance(data['batch_data'], list):
return create_error_response(
"Invalid batch request: 'batch_data' array required"
)
batch_data = data['batch_data']
include_metadata = data.get('include_metadata', True)
if len(batch_data) > 100: # Limit batch size
return create_error_response(
"Batch size too large: maximum 100 items allowed",
{"max_allowed": 100, "requested": len(batch_data)}
)
self.logger.info(f"Processing batch feature engineering for {len(batch_data)} datasets")
# Process batch
success, result = self.feature_service.process_batch_features(
batch_data, include_metadata
)
if success:
return create_success_response(result)
else:
return create_error_response(
"Failed to process batch features",
result
)
except Exception as e:
self.logger.error(f"Batch feature processing error: {str(e)}")
return create_error_response(
f"Failed to process batch features: {str(e)}"
)
def create_feature_dataframe(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Create DataFrame with weather data and engineered features
Args:
data: Request data containing weather_data, disaster_date, and days_before
Returns:
DataFrame creation response
"""
try:
# Validate required parameters
required_fields = ['weather_data', 'disaster_date', 'days_before']
missing_fields = [field for field in required_fields if field not in data or data[field] is None]
if missing_fields:
return create_error_response(
f"Missing required fields: {', '.join(missing_fields)}",
{"missing_fields": missing_fields}
)
weather_data = data['weather_data']
disaster_date = str(data['disaster_date'])
event_duration = data.get('event_duration', 1.0)
try:
days_before = int(data['days_before'])
event_duration = float(event_duration)
except (ValueError, TypeError) as e:
return create_error_response(
f"Invalid parameter format: {str(e)}",
{"validation_error": str(e)}
)
self.logger.info(f"Creating feature DataFrame for {disaster_date}, "
f"{days_before} days, duration: {event_duration}")
# Process features first
success, feature_result = self.feature_service.process_weather_features(
weather_data, event_duration, include_metadata=True
)
if not success:
return create_error_response(
"Failed to process features for DataFrame",
feature_result
)
# Create DataFrame
try:
df = self.feature_service.create_feature_dataframe(
weather_data,
feature_result['engineered_features'],
disaster_date,
days_before
)
# Convert DataFrame to dict for JSON response
dataframe_data = {
'dates': df['date'].tolist(),
'weather_data': {
col: df[col].tolist()
for col in df.columns
if col in WeatherFeatureModel.WEATHER_FIELDS
},
'engineered_features': {
col: df[col].tolist()
for col in df.columns
if col in WeatherFeatureModel.ENGINEERED_FEATURES
}
}
return create_success_response({
'dataframe': dataframe_data,
'shape': df.shape,
'columns': list(df.columns),
'metadata': feature_result.get('metadata', {}),
'validation': feature_result.get('validation', {})
})
except Exception as e:
return create_error_response(
f"Failed to create DataFrame: {str(e)}"
)
except Exception as e:
self.logger.error(f"DataFrame creation error: {str(e)}")
return create_error_response(
f"Failed to create feature DataFrame: {str(e)}"
)
def get_feature_info(self) -> Dict[str, Any]:
"""Get information about available engineered features"""
try:
feature_info = self.feature_service.get_feature_info()
return create_success_response({
'feature_info': feature_info,
'service_status': self.feature_service.get_service_status()
})
except Exception as e:
self.logger.error(f"Feature info error: {str(e)}")
return create_error_response(
f"Failed to get feature info: {str(e)}"
)
def validate_weather_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Validate weather data for feature engineering
Args:
data: Request data containing weather_data
Returns:
Validation response
"""
try:
if 'weather_data' not in data or not data['weather_data']:
return create_error_response(
"Missing required parameter: 'weather_data'",
{"required_fields": ["weather_data"]}
)
weather_data = data['weather_data']
# Validate data
is_valid, validation = self.feature_service.validate_input_data(weather_data)
validation_result = {
'validation': validation,
'is_valid': is_valid,
'ready_for_processing': is_valid
}
if is_valid:
return create_success_response(validation_result)
else:
return create_error_response(
"Weather data validation failed",
validation_result
)
except Exception as e:
self.logger.error(f"Validation error: {str(e)}")
return create_error_response(
f"Failed to validate weather data: {str(e)}"
)
def process_and_export(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Process features and export in specified format
Args:
data: Request data with weather_data, disaster_date, days_before, and export options
Returns:
Export response
"""
try:
# Validate required parameters
required_fields = ['weather_data', 'disaster_date', 'days_before']
missing_fields = [field for field in required_fields if field not in data or data[field] is None]
if missing_fields:
return create_error_response(
f"Missing required fields: {', '.join(missing_fields)}",
{"missing_fields": missing_fields}
)
weather_data = data['weather_data']
disaster_date = str(data['disaster_date'])
event_duration = data.get('event_duration', 1.0)
export_format = data.get('export_format', 'dict').lower()
try:
days_before = int(data['days_before'])
event_duration = float(event_duration)
except (ValueError, TypeError) as e:
return create_error_response(
f"Invalid parameter format: {str(e)}",
{"validation_error": str(e)}
)
# Validate export format
valid_formats = ['dict', 'dataframe', 'json']
if export_format not in valid_formats:
return create_error_response(
f"Invalid export format: {export_format}",
{"valid_formats": valid_formats}
)
self.logger.info(f"Processing and exporting features in '{export_format}' format")
# Process and export
success, result = self.feature_service.process_and_export(
weather_data, disaster_date, days_before, event_duration, export_format
)
if success:
# Handle DataFrame special case for JSON response
if export_format == 'dataframe' and 'export' in result:
export_data = result['export']
if 'dataframe' in export_data:
# Convert DataFrame to dict for JSON serialization
df = export_data['dataframe']
export_data['dataframe_dict'] = df.to_dict(orient='list')
# Remove actual DataFrame object for JSON response
del export_data['dataframe']
return create_success_response(result)
else:
return create_error_response(
"Failed to process and export features",
result
)
except Exception as e:
self.logger.error(f"Process and export error: {str(e)}")
return create_error_response(
f"Failed to process and export: {str(e)}"
)
def get_service_status(self) -> Dict[str, Any]:
"""Get feature engineering service status and health"""
try:
service_status = self.feature_service.get_service_status()
return create_success_response({
'controller': 'Feature Engineering Controller',
'service': service_status,
'health': 'healthy' if service_status.get('initialized') else 'unhealthy',
'available_operations': [
'process_features',
'process_batch_features',
'create_feature_dataframe',
'validate_weather_data',
'process_and_export',
'get_feature_info'
]
})
except Exception as e:
self.logger.error(f"Service status error: {str(e)}")
return create_error_response(
f"Failed to get service status: {str(e)}"
)