# SPG_ML / dynamic_api_handler.py
# Commit 5059de5 by meetmendapara: "Added Personalization Models"
"""
Dynamic API Request Handler for COGNEXA ML Service
Provides:
- Flexible request validation with fallback defaults
- Automatic field coercion and type conversion
- Better error messages
- Graceful schema validation
Version: 1.0.0
"""
import copy
import json
import logging
from datetime import datetime
from functools import wraps
from typing import Any, Dict, List, Optional, Type, TypeVar, Union
# Generic type variable; used in FlexibleRequestHandler.coerce_value to tie the
# return type to the requested target type.
T = TypeVar('T')
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class ValidationError(Exception):
    """Validation error carrying details about the offending field.

    Attributes:
        field: Name of the field that failed validation.
        value: The value that was supplied for the field.
        expected_type: Name of the type the field was expected to have.
        message: Error text; also passed to ``Exception`` so ``str(err)``
            returns it.
    """

    def __init__(
        self,
        field: str,
        value: Any,
        expected_type: str,
        message: Optional[str] = None,
    ):
        """Store the failure details.

        Args:
            field: Name of the field that failed validation.
            value: The rejected value.
            expected_type: Name of the expected type.
            message: Custom error text. When omitted (or empty) a generic
                message naming the field is used.
        """
        self.field = field
        self.value = value
        self.expected_type = expected_type
        # An empty message is treated like a missing one.
        self.message = message or f"Field '{field}' failed validation"
        super().__init__(self.message)
class FlexibleRequestHandler:
    """Handles flexible request validation with fallbacks.

    Both methods are static; the class only serves as a namespace.
    """

    @staticmethod
    def coerce_value(
        value: Any,
        target_type: "Type[T]",
        field_name: str = "field",
    ) -> "Optional[T]":
        """Convert ``value`` to ``target_type``, returning ``None`` on failure.

        Rules by target type:

        * ``int`` / ``float``: strings are parsed. Plain integer strings are
          parsed exactly; other numeric strings go through ``float`` first,
          so ``"3.7"`` still truncates to ``3`` for ``int``.
        * ``bool``: the strings ``true``, ``1``, ``yes`` and ``on``
          (case-insensitive) are true and every other string is false;
          non-strings use ``bool(value)``.
        * ``dict``: dicts pass through unchanged. A string is parsed as JSON
          and kept only if it decodes to a dict. Everything else becomes
          ``{}``.
        * ``list``: lists pass through unchanged. A string is parsed as JSON
          and kept only if it decodes to a list; otherwise the raw string is
          wrapped in a one-element list. Other truthy values are wrapped and
          falsy ones become ``[]``.
        * any other type: ``target_type(value)``.

        Args:
            value: Raw value to convert. ``None`` is returned unchanged.
            target_type: Type to convert to.
            field_name: Name used in the warning logged when conversion fails.

        Returns:
            The converted value, or ``None`` if ``value`` is ``None`` or the
            conversion raised ``ValueError``, ``TypeError`` or
            ``OverflowError``.
        """
        if value is None:
            return None

        try:
            # String to number
            if target_type in (int, float):
                number = value
                if isinstance(value, str):
                    if target_type is int:
                        try:
                            # Exact parse: a float round trip would corrupt
                            # integers above 2**53.
                            return int(value)
                        except ValueError:
                            pass  # e.g. "3.7" or "1e3": fall back to float
                    number = float(value)
                return target_type(number)

            # String to bool
            if target_type is bool:
                if isinstance(value, str):
                    return value.lower() in ('true', '1', 'yes', 'on')
                return bool(value)

            # Dict preservation
            if target_type is dict:
                if isinstance(value, dict):
                    return value
                if isinstance(value, str):
                    try:
                        parsed = json.loads(value)
                    except (ValueError, RecursionError):
                        return {}
                    # Valid JSON that is not an object (list, number, null...)
                    # is treated like unparsable input.
                    return parsed if isinstance(parsed, dict) else {}
                return {}

            # List preservation
            if target_type is list:
                if isinstance(value, list):
                    return value
                if isinstance(value, str):
                    try:
                        parsed = json.loads(value)
                    except (ValueError, RecursionError):
                        return [value]
                    # Valid JSON that is not an array is treated like
                    # unparsable input: keep the raw string as one element.
                    return parsed if isinstance(parsed, list) else [value]
                return [value] if value else []

            # Direct conversion
            return target_type(value)

        except (ValueError, TypeError, OverflowError) as e:
            # OverflowError covers int(float("inf")) and similar.
            logger.warning(
                "Could not coerce %s=%s to %s: %s",
                field_name, value, target_type.__name__, e,
            )
            return None

    @staticmethod
    def validate_dict(
        data: Dict[str, Any],
        schema: Dict[str, tuple],
    ) -> tuple[Dict[str, Any], List[str]]:
        """Validate and coerce a dictionary against a schema.

        Schema format::

            {
                'field_name': (required, type, default_value),
                'name': (True, str, None),   # required string
                'age': (False, int, 18),     # optional int, default 18
            }

        For each schema field the value is looked up in ``data`` (falling
        back to the schema default when the key is absent) and coerced with
        ``coerce_value``. If coercion fails, the default is used instead and
        a warning is logged. Keys of ``data`` that are not in the schema are
        dropped.

        Note that a required field whose default is not ``None`` is never
        reported as missing, because the default is substituted before the
        required check.

        Args:
            data: Incoming request data. ``None`` is treated as empty.
            schema: Mapping of field name to ``(required, type, default)``.

        Returns:
            A ``(result, errors)`` tuple. ``result`` maps field names to
            coerced values; fields reported in ``errors`` are omitted from
            it. ``errors`` is a list of human-readable messages.
        """
        if data is None:
            data = {}

        result: Dict[str, Any] = {}
        errors: List[str] = []

        for field_name, (required, field_type, default) in schema.items():
            # Copy so mutable schema defaults ({} / []) are never shared
            # between results or mutated through them.
            fallback = copy.deepcopy(default)
            value = data.get(field_name, fallback)

            if value is None:
                if required:
                    errors.append(f"Required field '{field_name}' is missing")
                else:
                    result[field_name] = fallback
                continue

            coerced = FlexibleRequestHandler.coerce_value(value, field_type, field_name)
            if coerced is None:
                # coerce_value only returns None here when conversion failed.
                coerced = fallback
                logger.warning(
                    "Field %s: coercion failed, using default: %s",
                    field_name, default,
                )
            result[field_name] = coerced

        return result, errors
class DynamicRequestValidator:
    """Request schemas for the known endpoints, plus one validator per schema.

    Each schema maps a field name to a ``(required, type, default)`` tuple
    and is passed as-is to ``FlexibleRequestHandler.validate_dict``.
    """

    # --- Schemas: field name -> (required, type, default) -----------------

    PERSONALITY_ANALYSIS_SCHEMA = {
        "responses": (True, dict, {}),            # required dict
        "response_scale_max": (False, int, 5),    # optional int, default 5
    }

    PRODUCTIVITY_FORECAST_SCHEMA = {
        "user_id": (True, str, None),
        "historical_data": (False, list, []),
        "forecast_days": (False, int, 7),
        "personality": (False, dict, None),
    }

    TASK_PREDICTION_SCHEMA = {
        "title": (True, str, None),
        "description": (False, str, ""),
        "category": (False, str, "PERSONAL"),
        "priority": (False, str, "MEDIUM"),
        "complexity": (False, int, 3),
        "estimated_duration": (False, int, 60),
        "due_date": (False, str, None),
        "personality": (False, dict, None),
    }

    PERSONALITY_RESPONSE_SCHEMA = {
        "responses": (True, dict, {}),
        "response_scale_max": (False, int, 5),
    }

    ENGAGEMENT_PREDICTION_SCHEMA = {
        "user_id": (True, str, None),
        "task_id": (True, str, None),
        "task": (False, dict, {}),
        "user_profile": (False, dict, {}),
        "recent_interactions": (False, list, []),
        "similar_tasks": (False, list, []),
    }

    # --- Validators -------------------------------------------------------

    @staticmethod
    def _run(schema, payload):
        """Apply ``schema`` to ``payload`` via FlexibleRequestHandler."""
        return FlexibleRequestHandler.validate_dict(payload, schema)

    @classmethod
    def validate_personality_analysis(cls, data: Dict) -> tuple[Dict, List[str]]:
        """Check ``data`` against PERSONALITY_ANALYSIS_SCHEMA."""
        return cls._run(cls.PERSONALITY_ANALYSIS_SCHEMA, data)

    @classmethod
    def validate_productivity_forecast(cls, data: Dict) -> tuple[Dict, List[str]]:
        """Check ``data`` against PRODUCTIVITY_FORECAST_SCHEMA."""
        return cls._run(cls.PRODUCTIVITY_FORECAST_SCHEMA, data)

    @classmethod
    def validate_task_prediction(cls, data: Dict) -> tuple[Dict, List[str]]:
        """Check ``data`` against TASK_PREDICTION_SCHEMA."""
        return cls._run(cls.TASK_PREDICTION_SCHEMA, data)

    @classmethod
    def validate_personality_responses(cls, data: Dict) -> tuple[Dict, List[str]]:
        """Check ``data`` against PERSONALITY_RESPONSE_SCHEMA."""
        return cls._run(cls.PERSONALITY_RESPONSE_SCHEMA, data)

    @classmethod
    def validate_engagement_prediction(cls, data: Dict) -> tuple[Dict, List[str]]:
        """Check ``data`` against ENGAGEMENT_PREDICTION_SCHEMA."""
        return cls._run(cls.ENGAGEMENT_PREDICTION_SCHEMA, data)
# Endpoint name -> name of the DynamicRequestValidator classmethod that
# validates requests for it.
_ENDPOINT_VALIDATORS = {
    "personality_analysis": "validate_personality_analysis",
    "productivity_forecast": "validate_productivity_forecast",
    "task_prediction": "validate_task_prediction",
    "personality_responses": "validate_personality_responses",
    "engagement_prediction": "validate_engagement_prediction",
}


def flexible_endpoint(endpoint_name: Optional[str] = None):
    """Decorator for async API endpoints that validates and coerces requests.

    The wrapped coroutine receives the validated dict in place of the raw
    request. Validation problems are logged as warnings, not raised, and the
    wrapped coroutine is still called. If ``endpoint_name`` is ``None`` or
    not one of the known endpoint names, the request is converted to a dict
    and passed through without validation.

    Usage:
        @flexible_endpoint("personality_analysis")
        async def my_endpoint(request: dict):
            ...

    Args:
        endpoint_name: One of ``personality_analysis``,
            ``productivity_forecast``, ``task_prediction``,
            ``personality_responses`` or ``engagement_prediction``.

    Returns:
        A decorator producing an async wrapper. The wrapper takes the request
        as its first positional argument and forwards any further positional
        and keyword arguments to the wrapped coroutine unchanged.
    """
    validator_name = _ENDPOINT_VALIDATORS.get(endpoint_name)

    def decorator(func):
        @wraps(func)
        async def wrapper(request, *args, **kwargs):
            try:
                # Convert request to dict.
                # NOTE(review): presumably the request is a Pydantic model when
                # it is not a mapping; model_dump() is the v2 spelling and
                # .dict() the v1 one - confirm against callers.
                if hasattr(request, 'model_dump'):
                    data = request.model_dump()
                elif hasattr(request, 'dict'):
                    data = request.dict()
                else:
                    data = dict(request)

                # Validate based on endpoint
                if validator_name is None:
                    validated, errors = data, []
                else:
                    validate = getattr(DynamicRequestValidator, validator_name)
                    validated, errors = validate(data)

                # Log warnings if there were validation issues
                if errors:
                    logger.warning(f"Validation issues for {endpoint_name}: {errors}")

                # Call function with validated data
                return await func(validated, *args, **kwargs)
            except Exception as e:
                # logger.exception records the traceback along with the message.
                logger.exception("Error in flexible_endpoint: %s", e)
                raise

        return wrapper

    return decorator
class DynamicModelConfigurator:
    """Builds model configuration dicts from a model type and overrides."""

    @staticmethod
    def get_model_config(model_type: str, **kwargs) -> Dict[str, Any]:
        """Return the configuration for ``model_type``.

        Every result starts with the shared settings ``verbose``,
        ``random_seed``, ``scale_features`` and ``track_metrics``. For a
        recognised model type, ``model_type``, ``task_type`` and that model's
        own settings follow. Any setting can be overridden by passing it as
        a keyword argument; unrelated keyword arguments are ignored.

        Args:
            model_type: ``task_completion``, ``duration_prediction``,
                ``stress_prediction``, ``personality_clustering`` or
                ``engagement_prediction``. Any other value yields only the
                shared settings.
            **kwargs: Overrides for individual settings.

        Returns:
            A new configuration dict.
        """
        option = kwargs.get

        shared = {
            'verbose': option('verbose', False),
            'random_seed': option('random_seed', 42),
            'scale_features': option('scale_features', True),
            'track_metrics': option('track_metrics', True),
        }

        # (model type, task type, factory for the model-specific settings).
        # Factories are only invoked for the matching row, so each call gets
        # freshly built values.
        variants = (
            ('task_completion', 'classification', lambda: {
                'threshold': option('threshold', 0.5),
                'min_confidence': option('min_confidence', 0.3),
            }),
            ('duration_prediction', 'regression', lambda: {
                'confidence_interval': option('confidence_interval', 0.95),
                'max_forecast_hours': option('max_forecast_hours', 8),
            }),
            ('stress_prediction', 'regression', lambda: {
                'stress_scale_max': option('stress_scale_max', 10),
                'alert_threshold': option('alert_threshold', 7),
            }),
            ('personality_clustering', 'clustering', lambda: {
                'n_clusters': option('n_clusters', 5),
                'personality_traits': option(
                    'personality_traits',
                    ['openness', 'conscientiousness', 'extraversion', 'agreeableness', 'neuroticism'],
                ),
            }),
            ('engagement_prediction', 'classification', lambda: {
                'engagement_scale': option('engagement_scale', 100),
                'positive_threshold': option('positive_threshold', 50),
            }),
        )

        for name, task_type, specifics in variants:
            if model_type == name:
                return {
                    **shared,
                    'model_type': name,
                    'task_type': task_type,
                    **specifics(),
                }

        return shared
class ParameterizedModelTrainer:
    """Builds training configuration dicts from a model type and overrides."""

    @staticmethod
    def get_training_config(model_type: str, **kwargs) -> Dict[str, Any]:
        """Return the training configuration for ``model_type``.

        Every result starts with the shared settings ``epochs``,
        ``batch_size``, ``learning_rate``, ``early_stopping_patience``,
        ``validation_split`` and ``random_seed``. For a recognised model type
        the algorithm-specific settings follow. Any setting can be overridden
        by passing it as a keyword argument.

        Args:
            model_type: ``xgboost``, ``random_forest`` or
                ``gradient_boosting``. Any other value yields only the shared
                settings.
            **kwargs: Overrides for individual settings.

        Returns:
            A new configuration dict.
        """
        option = kwargs.get

        shared = {
            'epochs': option('epochs', 100),
            'batch_size': option('batch_size', 32),
            'learning_rate': option('learning_rate', 0.01),
            'early_stopping_patience': option('early_stopping_patience', 10),
            'validation_split': option('validation_split', 0.2),
            'random_seed': option('random_seed', 42),
        }

        # (model type, factory for the algorithm-specific settings); only the
        # matching factory is ever called.
        variants = (
            ('xgboost', lambda: {
                'max_depth': option('max_depth', 6),
                'subsample': option('subsample', 0.8),
                'colsample_bytree': option('colsample_bytree', 0.8),
                'min_child_weight': option('min_child_weight', 1),
                'gamma': option('gamma', 0),
                'lambda': option('lambda', 1),
            }),
            ('random_forest', lambda: {
                'n_estimators': option('n_estimators', 100),
                'max_depth': option('max_depth', 15),
                'min_samples_split': option('min_samples_split', 5),
                'min_samples_leaf': option('min_samples_leaf', 2),
                'max_features': option('max_features', 'sqrt'),
            }),
            ('gradient_boosting', lambda: {
                'n_estimators': option('n_estimators', 100),
                # Same key and value as in the shared settings.
                'learning_rate': option('learning_rate', 0.01),
                'max_depth': option('max_depth', 5),
                'min_samples_split': option('min_samples_split', 5),
                'min_samples_leaf': option('min_samples_leaf', 2),
            }),
        )

        for name, specifics in variants:
            if model_type == name:
                return {**shared, **specifics()}

        return shared