Spaces:
Sleeping
Sleeping
| """ | |
| Feedback Loop Module for Cognexa ML Service | |
| This module provides: | |
| - User feedback collection and processing | |
| - Model retraining triggers | |
| - Prediction correction tracking | |
| - Learning from mistakes | |
| - Adaptive model updates | |
| """ | |
| import os | |
| import json | |
| import logging | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple, Any | |
| from dataclasses import dataclass, asdict | |
| from enum import Enum | |
| import numpy as np | |
# Module-wide logger, named after this module.
logger: logging.Logger = logging.getLogger(__name__)
class FeedbackType(str, Enum):
    """Kind of feedback given on a prediction.

    Members subclass ``str``, so each compares equal to its plain string value.
    """

    CORRECTION = "correction"  # prediction was wrong; a corrected value accompanies it
    CONFIRMATION = "confirmation"  # prediction matched the outcome
    REJECTION = "rejection"  # user dismissed the prediction (optionally with a reason)
    RATING = "rating"  # explicit numeric score
    SUGGESTION = "suggestion"  # not produced by any ingestor in this module
class FeedbackSeverity(str, Enum):
    """How important a piece of feedback is, from least to most severe.

    Members subclass ``str``, so each compares equal to its plain string value.
    """

    LOW = "low"
    MEDIUM = "medium"  # default severity used by FeedbackCollector.collect_feedback
    HIGH = "high"
    CRITICAL = "critical"  # not assigned by any ingestor in this module
@dataclass
class UserFeedback:
    """One piece of user feedback about a single prediction.

    Records are written to the feedback log with ``dataclasses.asdict`` and
    rebuilt with ``UserFeedback(**record)`` when read back. Both require the
    ``@dataclass`` decorator, which generates ``__init__`` from the fields below.
    """

    feedback_id: str
    user_id: str
    task_id: str
    prediction_id: str
    # String annotations: resolved lazily, so this class does not depend on the
    # enum definitions at class-creation time. Values reloaded from JSON arrive
    # as plain strings, which still compare equal to the str-based members.
    feedback_type: "FeedbackType"
    severity: "FeedbackSeverity"
    original_prediction: Dict[str, Any]
    corrected_value: Optional[Any]  # corrected value(s), or None when not supplied
    rating: Optional[int]
    comment: Optional[str]
    timestamp: str  # ISO-8601 string produced by datetime.isoformat()
    processed: bool = False  # flipped to True by FeedbackCollector.mark_processed
@dataclass
class FeedbackStats:
    """Aggregate statistics over collected feedback.

    ``@dataclass`` supplies the field-based ``__init__``, ``__eq__`` and
    ``__repr__``.
    """

    total_feedback: int
    corrections: int
    confirmations: int
    rejections: int
    average_rating: float
    improvement_rate: float
    most_corrected_features: List[Tuple[str, int]]  # (feature name, correction count)
class FeedbackCollector:
    """Collect user feedback and persist it on disk.

    Feedback records are appended, one JSON object per line, to
    ``<feedback_dir>/feedback_log.jsonl``. Aggregate counters are kept in
    ``<feedback_dir>/feedback_stats.json``.
    """

    def __init__(self, feedback_dir: str = "feedback"):
        """Set up file paths and create ``feedback_dir`` if it is missing.

        Args:
            feedback_dir: Directory that holds the log and stats files.
                Missing parent directories are created as well.
        """
        self.feedback_dir = Path(feedback_dir)
        # parents=True so a nested location such as "data/feedback" works on
        # first run instead of raising FileNotFoundError.
        self.feedback_dir.mkdir(parents=True, exist_ok=True)
        self.feedback_file = self.feedback_dir / "feedback_log.jsonl"
        self.stats_file = self.feedback_dir / "feedback_stats.json"
        self.logger = logging.getLogger(__name__)

    def collect_feedback(
        self,
        user_id: str,
        task_id: str,
        prediction_id: str,
        feedback_type: FeedbackType,
        original_prediction: Dict[str, Any],
        corrected_value: Optional[Any] = None,
        rating: Optional[int] = None,
        comment: Optional[str] = None,
        severity: FeedbackSeverity = FeedbackSeverity.MEDIUM,
    ) -> UserFeedback:
        """Record feedback on a prediction.

        The record is appended to the feedback log, and the aggregate stats are
        updated before it is returned.

        Args:
            user_id: User who gave the feedback.
            task_id: Task the prediction was about.
            prediction_id: Identifier of the prediction being judged.
            feedback_type: Kind of feedback.
            original_prediction: The prediction as it was shown.
            corrected_value: Corrected value(s), if any.
            rating: Numeric rating, if any; folded into the running average.
            comment: Free-text comment, if any.
            severity: Importance of the feedback.

        Returns:
            The stored ``UserFeedback`` with ``processed=False``.
        """
        import uuid

        # Prefix of the canonical UUID4 string: 8 hex chars, '-', 3 hex chars.
        feedback_id = str(uuid.uuid4())[:12]

        feedback = UserFeedback(
            feedback_id=feedback_id,
            user_id=user_id,
            task_id=task_id,
            prediction_id=prediction_id,
            feedback_type=feedback_type,
            severity=severity,
            original_prediction=original_prediction,
            corrected_value=corrected_value,
            rating=rating,
            comment=comment,
            timestamp=datetime.now().isoformat(),
            processed=False,
        )

        self._save_feedback(feedback)
        self._update_stats(feedback)

        self.logger.info(f"Feedback collected: {feedback_id} ({feedback_type})")
        return feedback

    def _save_feedback(self, feedback: UserFeedback):
        """Append ``feedback`` to the JSONL log as a single line."""
        with open(self.feedback_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(asdict(feedback)) + "\n")

    def _update_stats(self, feedback: UserFeedback):
        """Fold one feedback record into the persisted aggregate stats.

        Only CORRECTION, CONFIRMATION and REJECTION have dedicated counters.
        A non-None ``rating`` updates the running mean incrementally, using
        ``rating_count`` as the number of ratings seen so far.
        """
        stats = self.load_stats()

        stats["total_feedback"] += 1

        if feedback.feedback_type == FeedbackType.CORRECTION:
            stats["corrections"] += 1
        elif feedback.feedback_type == FeedbackType.CONFIRMATION:
            stats["confirmations"] += 1
        elif feedback.feedback_type == FeedbackType.REJECTION:
            stats["rejections"] += 1

        if feedback.rating is not None:
            current_avg = stats["average_rating"]
            rating_count = stats["rating_count"]
            stats["average_rating"] = (current_avg * rating_count + feedback.rating) / (
                rating_count + 1
            )
            stats["rating_count"] = rating_count + 1

        self._save_stats(stats)

    @staticmethod
    def _default_stats() -> Dict[str, Any]:
        """Return a fresh stats dict with every counter at zero."""
        return {
            "total_feedback": 0,
            "corrections": 0,
            "confirmations": 0,
            "rejections": 0,
            "average_rating": 0.0,
            "rating_count": 0,
            "improvement_rate": 0.0,
        }

    def load_stats(self) -> Dict:
        """Load feedback statistics.

        Any counter missing from the stats file (e.g. a file written by an older
        version) is filled in with its zero default, so callers can index every
        key directly.

        Raises:
            json.JSONDecodeError: if the stats file exists but is not valid JSON.
        """
        stats = self._default_stats()
        if self.stats_file.exists():
            with open(self.stats_file, "r", encoding="utf-8") as f:
                stats.update(json.load(f))
        return stats

    def _save_stats(self, stats: Dict):
        """Persist ``stats`` as indented JSON, replacing the file atomically."""
        self._atomic_write(self.stats_file, json.dumps(stats, indent=2))

    @staticmethod
    def _atomic_write(path: Path, text: str) -> None:
        """Write ``text`` to ``path`` via a sibling temp file and ``os.replace``.

        The temp file lives in the same directory so the final rename stays on
        one filesystem. On POSIX that rename is atomic, so a crash mid-write
        leaves the previous contents intact instead of a truncated file.
        """
        tmp_path = path.with_name(path.name + ".tmp")
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.write(text)
        os.replace(tmp_path, path)

    def _iter_records(self):
        """Yield each non-blank line of the feedback log, decoded from JSON.

        Yields nothing if the log file does not exist yet.

        Raises:
            json.JSONDecodeError: on a non-blank line that is not valid JSON.
        """
        if not self.feedback_file.exists():
            return
        with open(self.feedback_file, "r", encoding="utf-8") as f:
            for line in f:
                if line.strip():
                    yield json.loads(line)

    def get_feedback_for_task(self, task_id: str) -> List[UserFeedback]:
        """Return every logged feedback record for ``task_id``, in log order."""
        return [
            UserFeedback(**data)
            for data in self._iter_records()
            if data.get("task_id") == task_id
        ]

    def get_unprocessed_feedback(self) -> List[UserFeedback]:
        """Return every logged feedback record not yet marked as processed."""
        return [
            UserFeedback(**data)
            for data in self._iter_records()
            if not data.get("processed", False)
        ]

    def mark_processed(self, feedback_ids: List[str]):
        """Set ``processed=True`` on the log records whose id is in ``feedback_ids``.

        The whole log is rewritten through a temp file and ``os.replace``, so a
        crash mid-rewrite does not truncate it. Blank lines are dropped.

        NOTE(review): the log is read fully before being replaced, so records
        appended by ``collect_feedback`` between the read and the replace are lost.
        """
        if not self.feedback_file.exists():
            return

        wanted = set(feedback_ids)  # O(1) membership per record
        lines = []
        for data in self._iter_records():
            if data.get("feedback_id") in wanted:
                data["processed"] = True
            lines.append(json.dumps(data) + "\n")

        self._atomic_write(self.feedback_file, "".join(lines))
class FeedbackProcessor:
    """Analyze stored feedback and derive model-improvement signals."""

    def __init__(self, feedback_collector: "FeedbackCollector"):
        """
        Args:
            feedback_collector: Source of the feedback log (``feedback_file``),
                unprocessed records and aggregate stats (``load_stats``).
        """
        self.collector = feedback_collector
        self.logger = logging.getLogger(__name__)

    @staticmethod
    def _numeric_errors(
        orig: Dict[str, Any], corr: Dict[str, Any]
    ) -> Dict[str, float]:
        """Return the signed error ``corr[key] - orig[key]`` for each shared key.

        Only keys present in both dicts whose values both convert to ``float``
        are included. Everything else (``None``, non-numeric strings, nested
        containers) is skipped. Keys appear in ``orig``'s iteration order.
        """
        errors: Dict[str, float] = {}
        for key in orig:
            if key not in corr:
                continue
            try:
                errors[key] = float(corr[key]) - float(orig[key])
            except (TypeError, ValueError):
                continue
        return errors

    @staticmethod
    def _corrected_keys(orig: Dict[str, Any], corr: Dict[str, Any]) -> List[str]:
        """Return the keys of ``corr`` that the correction actually changed.

        A key counts when it is new (absent from ``orig``) or its value differs
        from the original. Keys only present in ``orig`` were never touched by
        the correction and are not counted, nor are keys left at the same value.
        """
        return [key for key in corr if key not in orig or corr[key] != orig[key]]

    def analyze_feedback_patterns(self, days: int = 30) -> Dict:
        """Summarize feedback logged within the last ``days`` days.

        Args:
            days: Size of the look-back window.

        Returns:
            ``{"message": ...}`` if the window holds no feedback. Otherwise a dict
            with the per-type counts and rates, the five most frequently
            corrected features as ``(name, count)`` pairs, the mean absolute
            numeric correction for those features (where numeric), and textual
            recommendations.
        """
        cutoff = datetime.now() - timedelta(days=days)
        feedbacks = self._get_recent_feedback(cutoff)

        if not feedbacks:
            return {"message": "No feedback in the specified period"}

        corrections_by_feature: Dict[str, int] = {}
        magnitudes_by_feature: Dict[str, List[float]] = {}

        for fb in feedbacks:
            if fb.feedback_type != FeedbackType.CORRECTION:
                continue
            orig, corr = fb.original_prediction, fb.corrected_value
            if not (isinstance(orig, dict) and isinstance(corr, dict)):
                continue

            errors = self._numeric_errors(orig, corr)
            for key in self._corrected_keys(orig, corr):
                corrections_by_feature[key] = corrections_by_feature.get(key, 0) + 1
                if key in errors:
                    magnitudes_by_feature.setdefault(key, []).append(abs(errors[key]))

        most_corrected = sorted(
            corrections_by_feature.items(), key=lambda item: item[1], reverse=True
        )[:5]

        avg_correction_by_feature = {
            key: float(np.mean(magnitudes_by_feature[key]))
            for key, _ in most_corrected
            if magnitudes_by_feature.get(key)
        }

        total = len(feedbacks)  # > 0: the empty case returned above
        corrections = sum(
            1 for fb in feedbacks if fb.feedback_type == FeedbackType.CORRECTION
        )
        confirmations = sum(
            1 for fb in feedbacks if fb.feedback_type == FeedbackType.CONFIRMATION
        )
        rejections = sum(
            1 for fb in feedbacks if fb.feedback_type == FeedbackType.REJECTION
        )
        correction_rate = corrections / total

        return {
            "period_days": days,
            "total_feedback": total,
            "corrections": corrections,
            "confirmations": confirmations,
            "rejections": rejections,
            "correction_rate": correction_rate,
            "confirmation_rate": confirmations / total,
            "rejection_rate": rejections / total,
            "most_corrected_features": most_corrected,
            "average_correction_magnitude": avg_correction_by_feature,
            "recommendations": self._generate_recommendations(
                most_corrected, correction_rate
            ),
        }

    def _get_recent_feedback(self, cutoff: datetime) -> List["UserFeedback"]:
        """Return logged feedback whose timestamp is at or after ``cutoff``.

        Best effort: a line is skipped (and logged at debug level) when it is
        blank, is not valid JSON, has a missing or unparsable ``timestamp``, has
        a timezone-aware timestamp that cannot be compared with the naive
        ``cutoff``, or does not match ``UserFeedback``'s fields.
        """
        feedbacks = []
        feedback_file = self.collector.feedback_file
        if not feedback_file.exists():
            return feedbacks

        with open(feedback_file, "r", encoding="utf-8") as f:
            for line_no, line in enumerate(f, start=1):
                if not line.strip():
                    continue
                try:
                    data = json.loads(line)
                    fb_time = datetime.fromisoformat(data["timestamp"])
                    if fb_time >= cutoff:
                        feedbacks.append(UserFeedback(**data))
                except (ValueError, KeyError, TypeError) as exc:
                    self.logger.debug(
                        "Skipping feedback log line %d: %s", line_no, exc
                    )
        return feedbacks

    def _generate_recommendations(
        self, most_corrected: List[Tuple[str, int]], correction_rate: float
    ) -> List[str]:
        """Turn the correction rate and per-feature counts into advice strings.

        Args:
            most_corrected: ``(feature, count)`` pairs, most corrected first;
                only the first three are considered.
            correction_rate: Fraction of feedback that was a correction.
        """
        recommendations = []

        if correction_rate > 0.3:
            recommendations.append(
                "High correction rate detected - consider model retraining"
            )

        for feature, count in most_corrected[:3]:
            if count > 10:
                recommendations.append(
                    f"Review feature weighting for '{feature}' - frequently corrected"
                )

        if correction_rate < 0.1:
            recommendations.append("Low correction rate - predictions are accurate")

        return recommendations

    def calculate_model_adjustments(self) -> Dict[str, float]:
        """Return the mean signed error (corrected - original) per numeric feature.

        Only unprocessed CORRECTION feedback whose original and corrected values
        are both dicts contributes. Records are not marked processed here.
        """
        errors_by_feature: Dict[str, List[float]] = {}

        for fb in self.collector.get_unprocessed_feedback():
            if fb.feedback_type != FeedbackType.CORRECTION:
                continue
            orig, corr = fb.original_prediction, fb.corrected_value
            if not (isinstance(orig, dict) and isinstance(corr, dict)):
                continue
            for key, error in self._numeric_errors(orig, corr).items():
                errors_by_feature.setdefault(key, []).append(error)

        return {
            key: float(np.mean(errors)) for key, errors in errors_by_feature.items()
        }

    def should_trigger_retraining(
        self, *, min_feedback: int = 50, window_days: int = 7
    ) -> Tuple[bool, str]:
        """Decide whether the model should be retrained.

        Args:
            min_feedback: Minimum all-time feedback count before any decision.
            window_days: Look-back window for the correction/rejection rates.

        Returns:
            ``(should_retrain, reason)``.
        """
        stats = self.collector.load_stats()

        # Cheap gate first; the window analysis below reads the whole log.
        if stats.get("total_feedback", 0) < min_feedback:
            return False, "Insufficient feedback for retraining decision"

        # NOTE(review): the gate above uses the all-time total, while the rates
        # below come only from the last `window_days` days, so a handful of
        # recent records can still decide the outcome.
        patterns = self.analyze_feedback_patterns(days=window_days)

        correction_rate = patterns.get("correction_rate", 0)
        if correction_rate > 0.4:
            return (
                True,
                f"High correction rate ({correction_rate:.1%}) - retraining recommended",
            )

        rejection_rate = patterns.get("rejection_rate", 0)
        if rejection_rate > 0.3:
            return (
                True,
                f"High rejection rate ({rejection_rate:.1%}) - retraining recommended",
            )

        # average_rating stays 0.0 until the first rating arrives; ignore that case.
        avg_rating = stats.get("average_rating", 0)
        if 0 < avg_rating < 3.0:
            return (
                True,
                f"Low average rating ({avg_rating:.1f}) - retraining recommended",
            )

        return False, "Model performance is acceptable"
class AdaptiveLearner:
    """Personalize predictions using a user's own past corrections."""

    def __init__(self, feedback_collector: "FeedbackCollector"):
        """
        Args:
            feedback_collector: Source of the feedback log; also used to build
                the ``FeedbackProcessor`` kept on ``self.processor``.
        """
        self.collector = feedback_collector
        self.processor = FeedbackProcessor(feedback_collector)
        self.logger = logging.getLogger(__name__)
        self.adaptation_history: List[Dict] = []

    def adapt_predictions(
        self,
        user_id: str,
        base_prediction: Dict[str, Any],
        features: Dict[str, float],
        *,
        adaptation_weight: float = 0.1,
    ) -> Dict[str, Any]:
        """Nudge a prediction toward the user's historical corrections.

        Args:
            user_id: User whose correction history is consulted.
            base_prediction: Prediction to adapt; not modified in place.
            features: Currently unused; kept for interface compatibility.
            adaptation_weight: Fraction of the mean historical error added to
                each matching numeric field (default 10%).

        Returns:
            A shallow copy of ``base_prediction`` with adjusted numeric fields.
            ``completion_probability`` is clamped to [0, 1], and when that key is
            present a ``confidence_level`` is added.
        """
        user_corrections = self._get_user_correction_patterns(user_id)
        adapted_prediction = self._apply_corrections(
            base_prediction, user_corrections, adaptation_weight
        )

        if "completion_probability" in adapted_prediction:
            prob = adapted_prediction["completion_probability"]
            adapted_prediction["confidence_level"] = self._adjust_confidence(
                prob, user_corrections
            )

        return adapted_prediction

    @staticmethod
    def _apply_corrections(
        prediction: Dict[str, Any], patterns: Dict[str, float], weight: float
    ) -> Dict[str, Any]:
        """Return a copy of ``prediction`` with ``weight * patterns[key]`` added per key.

        Keys absent from ``prediction``, or whose value does not convert to
        ``float``, are left unchanged. ``completion_probability`` is clamped to
        [0, 1] so the adjustment cannot yield an invalid probability.
        """
        adapted = prediction.copy()
        for key, adjustment in patterns.items():
            if key not in adapted:
                continue
            try:
                current = float(adapted[key])
            except (TypeError, ValueError):
                continue
            value = current + adjustment * weight
            if key == "completion_probability":
                value = min(1.0, max(0.0, value))
            adapted[key] = value
        return adapted

    def _get_user_correction_patterns(
        self, user_id: str, min_samples: int = 3
    ) -> Dict[str, float]:
        """Return the user's mean signed error (corrected - original) per numeric key.

        Only CORRECTION feedback whose original and corrected values are both
        dicts contributes. Keys with fewer than ``min_samples`` observations are
        dropped as too noisy.
        """
        errors_by_key: Dict[str, List[float]] = {}

        for fb in self._get_user_feedback(user_id):
            if fb.feedback_type != FeedbackType.CORRECTION:
                continue
            orig, corr = fb.original_prediction, fb.corrected_value
            if not (isinstance(orig, dict) and isinstance(corr, dict)):
                continue
            for key in orig:
                if key not in corr:
                    continue
                try:
                    error = float(corr[key]) - float(orig[key])
                except (TypeError, ValueError):
                    continue
                errors_by_key.setdefault(key, []).append(error)

        return {
            key: float(np.mean(errors))
            for key, errors in errors_by_key.items()
            if len(errors) >= min_samples
        }

    def _get_user_feedback(self, user_id: str) -> List["UserFeedback"]:
        """Return all logged feedback for ``user_id``, in log order.

        Best effort: blank lines, lines that are not JSON objects, and records
        that do not match ``UserFeedback``'s fields are skipped (and logged at
        debug level).
        """
        feedbacks = []
        feedback_file = self.collector.feedback_file
        if not feedback_file.exists():
            return feedbacks

        with open(feedback_file, "r", encoding="utf-8") as f:
            for line_no, line in enumerate(f, start=1):
                if not line.strip():
                    continue
                try:
                    data = json.loads(line)
                    if not isinstance(data, dict) or data.get("user_id") != user_id:
                        continue
                    feedbacks.append(UserFeedback(**data))
                except (ValueError, TypeError) as exc:
                    self.logger.debug(
                        "Skipping feedback log line %d: %s", line_no, exc
                    )
        return feedbacks

    def _adjust_confidence(
        self, probability: float, corrections: Dict[str, float]
    ) -> float:
        """Return a confidence level lowered by the size of past corrections.

        Starts at 0.7 and subtracts ``min(0.2, 0.5 * mean(|correction|))``.
        ``probability`` is currently unused. Because the penalty is capped at
        0.2, the result always lies in [0.5, 0.7], so the final [0.4, 0.95]
        clamp never binds with these constants.
        """
        base_confidence = 0.7

        if len(corrections) == 0:
            return base_confidence

        avg_correction = np.mean([abs(v) for v in corrections.values()])
        confidence_penalty = min(0.2, avg_correction * 0.5)

        adjusted_confidence = base_confidence - confidence_penalty
        return max(0.4, min(0.95, adjusted_confidence))

    def record_adaptation(
        self,
        user_id: str,
        task_id: str,
        original_prediction: Dict,
        adapted_prediction: Dict,
        adjustments: Dict[str, float],
    ):
        """Append an adaptation record to the in-memory ``adaptation_history``.

        The history is not persisted and grows without bound for the lifetime of
        this instance.
        """
        adaptation = {
            "timestamp": datetime.now().isoformat(),
            "user_id": user_id,
            "task_id": task_id,
            "original_prediction": original_prediction,
            "adapted_prediction": adapted_prediction,
            "adjustments": adjustments,
        }
        self.adaptation_history.append(adaptation)
        self.logger.info(f"Adaptation recorded for user {user_id}, task {task_id}")
class FeedbackIngestor:
    """Convert task outcomes, explicit ratings and rejections into feedback records.

    Every ``ingest_*`` method delegates to ``FeedbackCollector.collect_feedback``
    and returns the stored ``UserFeedback``.
    """

    def __init__(self, collector: FeedbackCollector):
        self.logger = logging.getLogger(__name__)
        self.collector = collector

    def ingest_task_completion(
        self,
        user_id: str,
        task_id: str,
        prediction_id: str,
        prediction: Dict,
        actual_outcome: Dict,
    ) -> UserFeedback:
        """Compare a completion prediction with what actually happened.

        The prediction counts as "completed" when ``completion_probability``
        (default 0.5) exceeds 0.5. A match becomes a low-severity CONFIRMATION;
        a mismatch becomes a high-severity CORRECTION. In both cases the actual
        outcome is stored as the corrected value.
        """
        probability = prediction.get("completion_probability", 0.5)
        predicted_completed = probability > 0.5
        actual_completed = actual_outcome.get("completed", False)
        outcome_matches = predicted_completed == actual_completed

        feedback_type, severity = (
            (FeedbackType.CONFIRMATION, FeedbackSeverity.LOW)
            if outcome_matches
            else (FeedbackType.CORRECTION, FeedbackSeverity.HIGH)
        )

        observed = {
            "completed": actual_completed,
            "actual_duration": actual_outcome.get("duration"),
            "actual_stress": actual_outcome.get("stress_level"),
        }

        return self.collector.collect_feedback(
            user_id=user_id,
            task_id=task_id,
            prediction_id=prediction_id,
            feedback_type=feedback_type,
            original_prediction=prediction,
            corrected_value=observed,
            severity=severity,
        )

    def ingest_user_rating(
        self,
        user_id: str,
        task_id: str,
        prediction_id: str,
        prediction: Dict,
        rating: int,
        comment: Optional[str] = None,
    ) -> UserFeedback:
        """Store an explicit rating.

        Severity is derived from the score: ``<= 2`` is HIGH, ``3`` (or anything
        ``<= 3``) is MEDIUM, and higher scores are LOW.
        """
        if rating <= 2:
            severity = FeedbackSeverity.HIGH
        elif rating <= 3:
            severity = FeedbackSeverity.MEDIUM
        else:
            severity = FeedbackSeverity.LOW

        return self.collector.collect_feedback(
            user_id=user_id,
            task_id=task_id,
            prediction_id=prediction_id,
            feedback_type=FeedbackType.RATING,
            original_prediction=prediction,
            rating=rating,
            comment=comment,
            severity=severity,
        )

    def ingest_rejection(
        self,
        user_id: str,
        task_id: str,
        prediction_id: str,
        prediction: Dict,
        reason: Optional[str] = None,
    ) -> UserFeedback:
        """Store a rejected prediction as high-severity REJECTION feedback.

        The optional ``reason`` is saved as the feedback comment.
        """
        return self.collector.collect_feedback(
            user_id=user_id,
            task_id=task_id,
            prediction_id=prediction_id,
            feedback_type=FeedbackType.REJECTION,
            original_prediction=prediction,
            comment=reason,
            severity=FeedbackSeverity.HIGH,
        )
# Shared module-level instances, all backed by one collector. Constructing
# FeedbackCollector() with its default argument creates the "feedback"
# directory (relative to the current working directory) at import time.
feedback_collector = FeedbackCollector()
feedback_processor = FeedbackProcessor(feedback_collector)
adaptive_learner = AdaptiveLearner(feedback_collector)
feedback_ingestor = FeedbackIngestor(feedback_collector)

# Public API of this module.
__all__ = [
    # Enums and record types
    "FeedbackType",
    "FeedbackSeverity",
    "UserFeedback",
    "FeedbackStats",
    # Service classes
    "FeedbackCollector",
    "FeedbackProcessor",
    "AdaptiveLearner",
    "FeedbackIngestor",
    # Shared instances
    "feedback_collector",
    "feedback_processor",
    "adaptive_learner",
    "feedback_ingestor",
]