"""
Dynamic API Request Handler for COGNEXA ML Service

Provides:
- Flexible request validation with fallback defaults
- Automatic field coercion and type conversion
- Better error messages
- Graceful schema validation

Version: 1.0.0
"""

import copy
import json
import logging
from datetime import datetime
from functools import wraps
from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar, Union

T = TypeVar('T')

logger = logging.getLogger(__name__)

# Strings (compared case-insensitively, surrounding whitespace ignored) that
# coerce to True when the target type is bool; any other string is False.
_TRUTHY_STRINGS = frozenset({'true', '1', 'yes', 'on'})

# Endpoint name accepted by ``flexible_endpoint`` -> name of the
# ``DynamicRequestValidator`` classmethod that validates it.  The method is
# looked up by name at call time so overriding it on the class still works.
_ENDPOINT_VALIDATORS = {
    "personality_analysis": "validate_personality_analysis",
    "productivity_forecast": "validate_productivity_forecast",
    "task_prediction": "validate_task_prediction",
    "personality_responses": "validate_personality_responses",
    "engagement_prediction": "validate_engagement_prediction",
}


class ValidationError(Exception):
    """Validation error with detailed information.

    The constructor arguments are stored on the instance as ``field``,
    ``value``, ``expected_type`` and ``message``.  When no message is given,
    a generic "Field '<field>' failed validation" message is used.
    """

    def __init__(self, field: str, value: Any, expected_type: str,
                 message: Optional[str] = None):
        self.field = field
        self.value = value
        self.expected_type = expected_type
        self.message = message or f"Field '{field}' failed validation"
        super().__init__(self.message)


class FlexibleRequestHandler:
    """Handles flexible request validation with fallbacks."""

    @staticmethod
    def _coerce_number(value: Any, target_type: type) -> Any:
        """Convert *value* to ``int`` or ``float``; may raise on bad input."""
        if isinstance(value, str):
            text = value.strip()
            if target_type is int:
                try:
                    # Parse integers directly so large values keep full
                    # precision instead of round-tripping through float.
                    return int(text)
                except ValueError:
                    # "3.7" style input: truncate towards zero.
                    return int(float(text))
            return float(text)
        return target_type(value)

    @staticmethod
    def _coerce_dict(value: Any) -> dict:
        """Return *value* as a dict, or ``{}`` when it cannot be one."""
        if isinstance(value, dict):
            return value
        if isinstance(value, str):
            try:
                parsed = json.loads(value)
            except (ValueError, RecursionError):
                return {}
            # Valid JSON that is not an object (e.g. "[1, 2]") is not a dict.
            return parsed if isinstance(parsed, dict) else {}
        return {}

    @staticmethod
    def _coerce_list(value: Any) -> list:
        """Return *value* as a list, wrapping scalars in a one-item list."""
        if isinstance(value, list):
            return value
        if isinstance(value, str):
            try:
                parsed = json.loads(value)
            except (ValueError, RecursionError):
                return [value]
            # A string that is not a JSON array is kept as a single item.
            return parsed if isinstance(parsed, list) else [value]
        return [value] if value else []

    @staticmethod
    def coerce_value(value: Any, target_type: Type[T],
                     field_name: str = "field") -> Optional[T]:
        """Coerce *value* to *target_type* with intelligent fallbacks.

        Args:
            value: The raw value; ``None`` is passed through unchanged.
            target_type: ``int``, ``float``, ``bool``, ``dict``, ``list`` or
                any other callable type, which is invoked as
                ``target_type(value)``.
            field_name: Only used in the warning logged on failure.

        Returns:
            The coerced value, or ``None`` when *value* is ``None`` or the
            conversion raised ``ValueError``, ``TypeError`` or
            ``OverflowError`` (a warning is logged in that case).
        """
        if value is None:
            return None

        handler = FlexibleRequestHandler
        try:
            if target_type is int or target_type is float:
                return handler._coerce_number(value, target_type)
            if target_type is bool:
                if isinstance(value, str):
                    return value.strip().lower() in _TRUTHY_STRINGS
                return bool(value)
            if target_type is dict:
                return handler._coerce_dict(value)
            if target_type is list:
                return handler._coerce_list(value)
            # Direct conversion for every other type.
            return target_type(value)
        except (ValueError, TypeError, OverflowError) as e:
            # OverflowError covers int(float("inf")), which is neither a
            # ValueError nor a TypeError.
            type_name = getattr(target_type, '__name__', repr(target_type))
            logger.warning("Could not coerce %s=%s to %s: %s",
                           field_name, value, type_name, e)
            return None

    @staticmethod
    def validate_dict(data: Dict[str, Any],
                      schema: Dict[str, tuple]) -> Tuple[Dict[str, Any], List[str]]:
        """
        Validate and coerce dictionary against schema.

        Schema format:
        {
            'field_name': (required, type, default_value),
            'name': (True, str, None),      # required string
            'age': (False, int, 18),        # optional int, default 18
        }

        Returns:
            A ``(result, errors)`` tuple.  ``result`` maps each schema field
            to its coerced value or to a copy of its default; ``errors`` is a
            list of human-readable messages.  A required field that is
            missing (or ``None``) is always reported in ``errors``; it still
            appears in ``result`` when the schema supplies a non-None
            default, and is omitted otherwise.
        """
        result: Dict[str, Any] = {}
        errors: List[str] = []

        for field_name, (required, field_type, default) in schema.items():
            if field_name in data:
                value = data[field_name]
            else:
                # Copy so callers can never mutate the schema's own default.
                value = copy.deepcopy(default)

            absent = field_name not in data or data[field_name] is None
            if required and absent:
                errors.append(f"Required field '{field_name}' is missing")
                if value is None:
                    continue
                # Otherwise fall through and use the schema default.

            if value is None:
                result[field_name] = copy.deepcopy(default)
                continue

            coerced = FlexibleRequestHandler.coerce_value(value, field_type, field_name)
            if coerced is None:
                # Coercion failed: fall back to the default.
                coerced = copy.deepcopy(default)
                logger.warning("Field %s: coercion failed, using default: %s",
                               field_name, default)
                if required:
                    errors.append(
                        f"Required field '{field_name}' has an invalid value"
                    )
            result[field_name] = coerced

        return result, errors


class DynamicRequestValidator:
    """Validates and coerces API requests dynamically.

    Each ``*_SCHEMA`` maps a field name to ``(required, type, default)`` as
    consumed by ``FlexibleRequestHandler.validate_dict``.
    """

    # Define schemas for common request types
    PERSONALITY_ANALYSIS_SCHEMA = {
        'responses': (True, dict, {}),            # required dict
        'response_scale_max': (False, int, 5),    # optional int, default 5
    }

    PRODUCTIVITY_FORECAST_SCHEMA = {
        'user_id': (True, str, None),
        'historical_data': (False, list, []),
        'forecast_days': (False, int, 7),
        'personality': (False, dict, None),
    }

    TASK_PREDICTION_SCHEMA = {
        'title': (True, str, None),
        'description': (False, str, ""),
        'category': (False, str, "PERSONAL"),
        'priority': (False, str, "MEDIUM"),
        'complexity': (False, int, 3),
        'estimated_duration': (False, int, 60),
        'due_date': (False, str, None),
        'personality': (False, dict, None),
    }

    PERSONALITY_RESPONSE_SCHEMA = {
        'responses': (True, dict, {}),
        'response_scale_max': (False, int, 5),
    }

    ENGAGEMENT_PREDICTION_SCHEMA = {
        'user_id': (True, str, None),
        'task_id': (True, str, None),
        'task': (False, dict, {}),
        'user_profile': (False, dict, {}),
        'recent_interactions': (False, list, []),
        'similar_tasks': (False, list, []),
    }

    @classmethod
    def validate_personality_analysis(cls, data: Dict) -> Tuple[Dict, List[str]]:
        """Validate personality analysis request"""
        return FlexibleRequestHandler.validate_dict(data, cls.PERSONALITY_ANALYSIS_SCHEMA)

    @classmethod
    def validate_productivity_forecast(cls, data: Dict) -> Tuple[Dict, List[str]]:
        """Validate productivity forecast request"""
        return FlexibleRequestHandler.validate_dict(data, cls.PRODUCTIVITY_FORECAST_SCHEMA)

    @classmethod
    def validate_task_prediction(cls, data: Dict) -> Tuple[Dict, List[str]]:
        """Validate task prediction request"""
        return FlexibleRequestHandler.validate_dict(data, cls.TASK_PREDICTION_SCHEMA)

    @classmethod
    def validate_personality_responses(cls, data: Dict) -> Tuple[Dict, List[str]]:
        """Validate personality responses request"""
        return FlexibleRequestHandler.validate_dict(data, cls.PERSONALITY_RESPONSE_SCHEMA)

    @classmethod
    def validate_engagement_prediction(cls, data: Dict) -> Tuple[Dict, List[str]]:
        """Validate engagement prediction request"""
        return FlexibleRequestHandler.validate_dict(data, cls.ENGAGEMENT_PREDICTION_SCHEMA)


def _request_to_dict(request: Any) -> Dict[str, Any]:
    """Return a plain dict for a model-like object or a mapping."""
    # Prefer ``model_dump`` when the object offers it, then the older
    # ``dict`` method; anything else is handed to the ``dict`` constructor.
    for method_name in ("model_dump", "dict"):
        method = getattr(request, method_name, None)
        if callable(method):
            return method()
    return dict(request)


def flexible_endpoint(endpoint_name: Optional[str] = None):
    """
    Decorator for API endpoints that validates and coerces requests.

    The wrapped coroutine receives the validated dict instead of the raw
    request.  Validation problems are logged as warnings and do not abort
    the call.  An unrecognised (or omitted) ``endpoint_name`` skips
    validation and passes the request through as a plain dict.

    Usage:
        @flexible_endpoint("personality_analysis")
        async def my_endpoint(request: dict):
            ...
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(request):
            try:
                data = _request_to_dict(request)

                validator_name = _ENDPOINT_VALIDATORS.get(endpoint_name)
                if validator_name is None:
                    validated, errors = data, []
                else:
                    validate = getattr(DynamicRequestValidator, validator_name)
                    validated, errors = validate(data)

                # Log warnings if there were validation issues
                if errors:
                    logger.warning("Validation issues for %s: %s",
                                   endpoint_name, errors)

                # Call function with validated data
                return await func(validated)
            except Exception as e:
                # logger.exception keeps the traceback; the error is re-raised
                # unchanged for the caller to handle.
                logger.exception("Error in flexible_endpoint: %s", e)
                raise

        return wrapper
    return decorator


class DynamicModelConfigurator:
    """Configures models dynamically based on input parameters"""

    @staticmethod
    def get_model_config(model_type: str, **kwargs) -> Dict[str, Any]:
        """Get dynamic model configuration.

        Builds a base config (``verbose``, ``random_seed``,
        ``scale_features``, ``track_metrics``) and, for a recognised
        *model_type*, extends it with type-specific keys.  Every value can
        be overridden through ``kwargs``.  An unrecognised *model_type*
        returns the base config only (without a ``model_type`` key).
        """
        base_config = {
            'verbose': kwargs.get('verbose', False),
            'random_seed': kwargs.get('random_seed', 42),
            'scale_features': kwargs.get('scale_features', True),
            'track_metrics': kwargs.get('track_metrics', True),
        }

        if model_type == 'task_completion':
            return {
                **base_config,
                'model_type': 'task_completion',
                'task_type': 'classification',
                'threshold': kwargs.get('threshold', 0.5),
                'min_confidence': kwargs.get('min_confidence', 0.3),
            }
        if model_type == 'duration_prediction':
            return {
                **base_config,
                'model_type': 'duration_prediction',
                'task_type': 'regression',
                'confidence_interval': kwargs.get('confidence_interval', 0.95),
                'max_forecast_hours': kwargs.get('max_forecast_hours', 8),
            }
        if model_type == 'stress_prediction':
            return {
                **base_config,
                'model_type': 'stress_prediction',
                'task_type': 'regression',
                'stress_scale_max': kwargs.get('stress_scale_max', 10),
                'alert_threshold': kwargs.get('alert_threshold', 7),
            }
        if model_type == 'personality_clustering':
            return {
                **base_config,
                'model_type': 'personality_clustering',
                'task_type': 'clustering',
                'n_clusters': kwargs.get('n_clusters', 5),
                'personality_traits': kwargs.get(
                    'personality_traits',
                    ['openness', 'conscientiousness', 'extraversion',
                     'agreeableness', 'neuroticism'],
                ),
            }
        if model_type == 'engagement_prediction':
            return {
                **base_config,
                'model_type': 'engagement_prediction',
                'task_type': 'classification',
                'engagement_scale': kwargs.get('engagement_scale', 100),
                'positive_threshold': kwargs.get('positive_threshold', 50),
            }
        return base_config


class ParameterizedModelTrainer:
    """Train models with dynamic parameters"""

    @staticmethod
    def get_training_config(model_type: str, **kwargs) -> Dict[str, Any]:
        """Get dynamic training configuration.

        Builds a base config (``epochs``, ``batch_size``, ``learning_rate``,
        ``early_stopping_patience``, ``validation_split``, ``random_seed``)
        and extends it for ``'xgboost'``, ``'random_forest'`` or
        ``'gradient_boosting'``.  Every value can be overridden through
        ``kwargs``.  Any other *model_type* returns the base config only.
        """
        base_config = {
            'epochs': kwargs.get('epochs', 100),
            'batch_size': kwargs.get('batch_size', 32),
            'learning_rate': kwargs.get('learning_rate', 0.01),
            'early_stopping_patience': kwargs.get('early_stopping_patience', 10),
            'validation_split': kwargs.get('validation_split', 0.2),
            'random_seed': kwargs.get('random_seed', 42),
        }

        if model_type == 'xgboost':
            return {
                **base_config,
                'max_depth': kwargs.get('max_depth', 6),
                'subsample': kwargs.get('subsample', 0.8),
                'colsample_bytree': kwargs.get('colsample_bytree', 0.8),
                'min_child_weight': kwargs.get('min_child_weight', 1),
                'gamma': kwargs.get('gamma', 0),
                # 'lambda' is a Python keyword, so callers can only supply
                # it via dict unpacking: get_training_config(..., **{'lambda': 2})
                'lambda': kwargs.get('lambda', 1),
            }
        if model_type == 'random_forest':
            return {
                **base_config,
                'n_estimators': kwargs.get('n_estimators', 100),
                'max_depth': kwargs.get('max_depth', 15),
                'min_samples_split': kwargs.get('min_samples_split', 5),
                'min_samples_leaf': kwargs.get('min_samples_leaf', 2),
                'max_features': kwargs.get('max_features', 'sqrt'),
            }
        if model_type == 'gradient_boosting':
            return {
                **base_config,
                'n_estimators': kwargs.get('n_estimators', 100),
                'learning_rate': kwargs.get('learning_rate', 0.01),
                'max_depth': kwargs.get('max_depth', 5),
                'min_samples_split': kwargs.get('min_samples_split', 5),
                'min_samples_leaf': kwargs.get('min_samples_leaf', 2),
            }
        return base_config