"""
Feedback Loop Module for Cognexa ML Service

This module provides:
- User feedback collection and processing
- Model retraining triggers
- Prediction correction tracking
- Learning from mistakes
- Adaptive model updates

Storage layout (inside ``feedback_dir``):
- ``feedback_log.jsonl``: one JSON object per line, one line per feedback
- ``feedback_stats.json``: running aggregate counters
"""

import json
import logging
import os
import uuid
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple

import numpy as np

logger = logging.getLogger(__name__)


class FeedbackType(str, Enum):
    """Types of user feedback"""

    CORRECTION = "correction"
    CONFIRMATION = "confirmation"
    REJECTION = "rejection"
    RATING = "rating"
    SUGGESTION = "suggestion"


class FeedbackSeverity(str, Enum):
    """Severity/importance of feedback"""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


@dataclass
class UserFeedback:
    """User feedback on a prediction.

    Records loaded back from the JSONL log carry ``feedback_type`` and
    ``severity`` as plain strings; because both enums mix in ``str`` those
    strings still compare equal to the corresponding enum members.
    """

    feedback_id: str
    user_id: str
    task_id: str
    prediction_id: str
    feedback_type: FeedbackType
    severity: FeedbackSeverity
    original_prediction: Dict[str, Any]
    corrected_value: Optional[Any]
    rating: Optional[int]
    comment: Optional[str]
    timestamp: str
    processed: bool = False


@dataclass
class FeedbackStats:
    """Statistics on collected feedback"""

    total_feedback: int
    corrections: int
    confirmations: int
    rejections: int
    average_rating: float
    improvement_rate: float
    most_corrected_features: List[Tuple[str, int]]


def _json_default(value: Any) -> Any:
    """Serialize numpy scalars/arrays; reject everything else explicitly."""
    if isinstance(value, np.generic):
        return value.item()
    if isinstance(value, np.ndarray):
        return value.tolist()
    raise TypeError(
        f"Object of type {type(value).__name__} is not JSON serializable"
    )


def _atomic_write_text(path: Path, text: str) -> None:
    """Write *text* to *path* via a temp file so a crash cannot truncate it."""
    tmp_path = path.with_name(path.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        f.write(text)
    os.replace(tmp_path, path)


def _iter_log_records(path: Path) -> Iterator[Dict[str, Any]]:
    """Yield every JSON object stored in the JSONL log at *path*.

    Blank lines are ignored and lines that are not valid JSON objects are
    skipped with a warning, so one damaged line does not make the whole
    log unreadable. A missing file yields nothing.
    """
    if not path.exists():
        return
    with open(path, "r", encoding="utf-8") as f:
        for line_no, raw in enumerate(f, start=1):
            text = raw.strip()
            if not text:
                continue
            try:
                record = json.loads(text)
            except json.JSONDecodeError:
                logger.warning("Skipping corrupt line %d in %s", line_no, path)
                continue
            if isinstance(record, dict):
                yield record
            else:
                logger.warning(
                    "Skipping non-object line %d in %s", line_no, path
                )


def _feedback_from_record(record: Dict[str, Any]) -> Optional[UserFeedback]:
    """Build a ``UserFeedback`` from a log record, or None if fields mismatch."""
    try:
        return UserFeedback(**record)
    except TypeError:
        # Missing or unexpected keys (e.g. a record from another schema version).
        logger.warning(
            "Skipping malformed feedback record: %s", record.get("feedback_id")
        )
        return None


def _numeric_correction_errors(
    feedbacks: Iterable[UserFeedback],
) -> Dict[str, List[float]]:
    """Group signed numeric errors (corrected - original) by prediction key.

    Only CORRECTION feedback whose original prediction and corrected value
    are both dicts is considered, and only keys present in both with values
    convertible to ``float`` contribute.
    """
    errors: Dict[str, List[float]] = {}
    for fb in feedbacks:
        if fb.feedback_type != FeedbackType.CORRECTION:
            continue
        orig, corr = fb.original_prediction, fb.corrected_value
        if not (isinstance(orig, dict) and isinstance(corr, dict)):
            continue
        for key, orig_val in orig.items():
            if key not in corr:
                continue
            try:
                error = float(corr[key]) - float(orig_val)
            except (TypeError, ValueError):
                continue  # non-numeric value: nothing to measure
            errors.setdefault(key, []).append(error)
    return errors


class FeedbackCollector:
    """Collects and stores user feedback"""

    def __init__(self, feedback_dir: str = "feedback"):
        """Create the storage directory (including parents) if needed."""
        self.feedback_dir = Path(feedback_dir)
        # parents=True so nested locations such as "data/feedback" also work.
        self.feedback_dir.mkdir(parents=True, exist_ok=True)
        self.feedback_file = self.feedback_dir / "feedback_log.jsonl"
        self.stats_file = self.feedback_dir / "feedback_stats.json"
        self.logger = logging.getLogger(__name__)

    def collect_feedback(
        self,
        user_id: str,
        task_id: str,
        prediction_id: str,
        feedback_type: FeedbackType,
        original_prediction: Dict[str, Any],
        corrected_value: Optional[Any] = None,
        rating: Optional[int] = None,
        comment: Optional[str] = None,
        severity: FeedbackSeverity = FeedbackSeverity.MEDIUM,
    ) -> UserFeedback:
        """Collect user feedback on a prediction.

        The feedback gets a fresh 12-character id and the current local
        timestamp, is appended to the log and folded into the stats file.

        Returns:
            The stored ``UserFeedback`` (``processed`` is False).
        """
        feedback_id = str(uuid.uuid4())[:12]

        feedback = UserFeedback(
            feedback_id=feedback_id,
            user_id=user_id,
            task_id=task_id,
            prediction_id=prediction_id,
            feedback_type=feedback_type,
            severity=severity,
            original_prediction=original_prediction,
            corrected_value=corrected_value,
            rating=rating,
            comment=comment,
            timestamp=datetime.now().isoformat(),
            processed=False,
        )

        self._save_feedback(feedback)
        self._update_stats(feedback)

        # Log the enum *value*: formatting a str-mixin Enum member directly
        # renders differently depending on the Python version.
        type_label = getattr(feedback_type, "value", feedback_type)
        self.logger.info("Feedback collected: %s (%s)", feedback_id, type_label)
        return feedback

    def _save_feedback(self, feedback: UserFeedback):
        """Append one feedback record to the JSONL log."""
        line = json.dumps(asdict(feedback), default=_json_default)
        with open(self.feedback_file, "a", encoding="utf-8") as f:
            f.write(line + "\n")

    def _update_stats(self, feedback: UserFeedback):
        """Fold one feedback into the persisted aggregate statistics."""
        stats = self.load_stats()
        stats["total_feedback"] += 1

        if feedback.feedback_type == FeedbackType.CORRECTION:
            stats["corrections"] += 1
        elif feedback.feedback_type == FeedbackType.CONFIRMATION:
            stats["confirmations"] += 1
        elif feedback.feedback_type == FeedbackType.REJECTION:
            stats["rejections"] += 1

        if feedback.rating is not None:
            # Incremental mean: avoids re-reading every historical rating.
            current_avg = stats.get("average_rating", 0.0)
            rating_count = stats.get("rating_count", 0)
            stats["average_rating"] = (
                current_avg * rating_count + feedback.rating
            ) / (rating_count + 1)
            stats["rating_count"] = rating_count + 1

        self._save_stats(stats)

    def load_stats(self) -> Dict:
        """Load feedback statistics.

        Returns:
            The persisted counters merged over zeroed defaults, so every
            expected key is present even when the file is missing, partial
            or unreadable.
        """
        stats: Dict[str, Any] = {
            "total_feedback": 0,
            "corrections": 0,
            "confirmations": 0,
            "rejections": 0,
            "average_rating": 0.0,
            "rating_count": 0,
            "improvement_rate": 0.0,
        }
        if not self.stats_file.exists():
            return stats

        try:
            with open(self.stats_file, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except json.JSONDecodeError:
            self.logger.warning(
                "Stats file %s is corrupt; falling back to defaults",
                self.stats_file,
            )
            return stats

        if isinstance(loaded, dict):
            stats.update(loaded)
        return stats

    def _save_stats(self, stats: Dict):
        """Persist feedback statistics (atomic replace)."""
        _atomic_write_text(self.stats_file, json.dumps(stats, indent=2))

    def get_feedback_for_task(self, task_id: str) -> List[UserFeedback]:
        """Get all feedback for a specific task"""
        matches = (
            _feedback_from_record(record)
            for record in _iter_log_records(self.feedback_file)
            if record.get("task_id") == task_id
        )
        return [fb for fb in matches if fb is not None]

    def get_unprocessed_feedback(self) -> List[UserFeedback]:
        """Get all feedback whose ``processed`` flag is unset or false."""
        pending = (
            _feedback_from_record(record)
            for record in _iter_log_records(self.feedback_file)
            if not record.get("processed", False)
        )
        return [fb for fb in pending if fb is not None]

    def mark_processed(self, feedback_ids: List[str]):
        """Mark the given feedback ids as processed in the log.

        The log is rewritten through a temp file and atomically swapped in.
        Lines that cannot be decoded are kept verbatim rather than dropped.
        Nothing is written when there is no log or no ids.
        """
        wanted = set(feedback_ids)
        if not wanted or not self.feedback_file.exists():
            return

        rewritten: List[str] = []
        with open(self.feedback_file, "r", encoding="utf-8") as f:
            for raw in f:
                text = raw.strip()
                if not text:
                    continue
                try:
                    data = json.loads(text)
                except json.JSONDecodeError:
                    rewritten.append(text + "\n")  # never discard user data
                    continue
                if isinstance(data, dict) and data.get("feedback_id") in wanted:
                    data["processed"] = True
                rewritten.append(json.dumps(data) + "\n")

        _atomic_write_text(self.feedback_file, "".join(rewritten))


class FeedbackProcessor:
    """Processes feedback for model improvement"""

    def __init__(self, feedback_collector: FeedbackCollector):
        self.collector = feedback_collector
        self.logger = logging.getLogger(__name__)

    def analyze_feedback_patterns(self, days: int = 30) -> Dict:
        """Analyze patterns in feedback from the last *days* days.

        Returns:
            ``{"message": ...}`` when the window holds no feedback, otherwise
            a dict with counts, rates, the five most-corrected prediction
            keys, the mean absolute numeric correction for those keys, and
            textual recommendations.
        """
        cutoff = datetime.now() - timedelta(days=days)
        feedbacks = self._get_recent_feedback(cutoff)

        if not feedbacks:
            return {"message": "No feedback in the specified period"}

        correction_items = [
            fb for fb in feedbacks if fb.feedback_type == FeedbackType.CORRECTION
        ]

        # A key counts as "corrected" when it appears on either side of a
        # dict-to-dict correction.
        corrections_by_feature: Dict[str, int] = {}
        for fb in correction_items:
            orig, corr = fb.original_prediction, fb.corrected_value
            if isinstance(orig, dict) and isinstance(corr, dict):
                for key in set(orig) | set(corr):
                    corrections_by_feature[key] = (
                        corrections_by_feature.get(key, 0) + 1
                    )

        most_corrected = sorted(
            corrections_by_feature.items(), key=lambda x: x[1], reverse=True
        )[:5]

        errors_by_feature = _numeric_correction_errors(correction_items)
        avg_correction_by_feature = {
            key: float(np.mean([abs(err) for err in errors_by_feature[key]]))
            for key, _ in most_corrected
            if errors_by_feature.get(key)
        }

        total = len(feedbacks)
        corrections = len(correction_items)
        confirmations = sum(
            1 for fb in feedbacks if fb.feedback_type == FeedbackType.CONFIRMATION
        )
        rejections = sum(
            1 for fb in feedbacks if fb.feedback_type == FeedbackType.REJECTION
        )
        correction_rate = corrections / total

        return {
            "period_days": days,
            "total_feedback": total,
            "corrections": corrections,
            "confirmations": confirmations,
            "rejections": rejections,
            "correction_rate": correction_rate,
            "confirmation_rate": confirmations / total,
            "rejection_rate": rejections / total,
            "most_corrected_features": most_corrected,
            "average_correction_magnitude": avg_correction_by_feature,
            "recommendations": self._generate_recommendations(
                most_corrected, correction_rate
            ),
        }

    def _get_recent_feedback(self, cutoff: datetime) -> List[UserFeedback]:
        """Get feedback with a timestamp at or after *cutoff*.

        Records with a missing, unparsable or incomparable timestamp are
        skipped.
        """
        recent: List[UserFeedback] = []
        for record in _iter_log_records(self.collector.feedback_file):
            try:
                if datetime.fromisoformat(record["timestamp"]) < cutoff:
                    continue
            except (KeyError, TypeError, ValueError):
                continue
            fb = _feedback_from_record(record)
            if fb is not None:
                recent.append(fb)
        return recent

    def _generate_recommendations(
        self, most_corrected: List[Tuple[str, int]], correction_rate: float
    ) -> List[str]:
        """Generate recommendations based on feedback analysis"""
        recommendations = []

        if correction_rate > 0.3:
            recommendations.append(
                "High correction rate detected - consider model retraining"
            )

        for feature, count in most_corrected[:3]:
            if count > 10:
                recommendations.append(
                    f"Review feature weighting for '{feature}' - frequently corrected"
                )

        if correction_rate < 0.1:
            recommendations.append(
                "Low correction rate - predictions are accurate"
            )

        return recommendations

    def calculate_model_adjustments(self) -> Dict[str, float]:
        """Calculate per-key adjustments from unprocessed corrections.

        Returns:
            Mapping of prediction key to the mean signed error
            (corrected - original) across unprocessed CORRECTION feedback.
        """
        errors_by_key = _numeric_correction_errors(
            self.collector.get_unprocessed_feedback()
        )
        return {
            key: float(np.mean(errors))
            for key, errors in errors_by_key.items()
            if errors
        }

    def should_trigger_retraining(self) -> Tuple[bool, str]:
        """Determine if model retraining should be triggered.

        Returns:
            ``(decision, reason)``. Retraining is recommended when, over the
            last 7 days, the correction rate exceeds 40% or the rejection
            rate exceeds 30%, or when the all-time average rating is
            positive but below 3.0. Fewer than 50 total feedback items
            always yields ``False``.
        """
        stats = self.collector.load_stats()

        # Cheap check first: skip scanning the log when the answer is known.
        if stats.get("total_feedback", 0) < 50:
            return False, "Insufficient feedback for retraining decision"

        patterns = self.analyze_feedback_patterns(days=7)

        correction_rate = patterns.get("correction_rate", 0)
        if correction_rate > 0.4:
            return (
                True,
                f"High correction rate ({correction_rate:.1%}) - retraining recommended",
            )

        rejection_rate = patterns.get("rejection_rate", 0)
        if rejection_rate > 0.3:
            return (
                True,
                f"High rejection rate ({rejection_rate:.1%}) - retraining recommended",
            )

        avg_rating = stats.get("average_rating", 0)
        if 0 < avg_rating < 3.0:
            return (
                True,
                f"Low average rating ({avg_rating:.1f}) - retraining recommended",
            )

        return False, "Model performance is acceptable"


class AdaptiveLearner:
    """Implements adaptive learning from feedback"""

    def __init__(self, feedback_collector: FeedbackCollector):
        self.collector = feedback_collector
        self.processor = FeedbackProcessor(feedback_collector)
        self.logger = logging.getLogger(__name__)
        # In-memory only; not persisted by this class.
        self.adaptation_history: List[Dict] = []

    def adapt_predictions(
        self, user_id: str, base_prediction: Dict[str, Any], features: Dict[str, float]
    ) -> Dict[str, Any]:
        """Adapt predictions based on the user's historical corrections.

        Args:
            user_id: User whose correction history is applied.
            base_prediction: Model output; it is shallow-copied, not mutated.
            features: Currently unused; kept for interface compatibility.

        Returns:
            A copy of ``base_prediction`` where each numeric key with a
            known correction pattern is shifted by 10% of the user's mean
            error. When ``completion_probability`` is present a
            ``confidence_level`` entry is added.
        """
        user_corrections = self._get_user_correction_patterns(user_id)
        adapted_prediction = base_prediction.copy()

        for key, adjustment in user_corrections.items():
            if key not in adapted_prediction:
                continue
            try:
                current = float(adapted_prediction[key])
            except (TypeError, ValueError):
                continue  # non-numeric prediction value: leave untouched
            # 10% weight to historical adjustment
            adapted_prediction[key] = current + adjustment * 0.1

        if "completion_probability" in adapted_prediction:
            prob = adapted_prediction["completion_probability"]
            adapted_prediction["confidence_level"] = self._adjust_confidence(
                prob, user_corrections
            )

        return adapted_prediction

    def _get_user_correction_patterns(self, user_id: str) -> Dict[str, float]:
        """Get the mean signed correction per key for a specific user.

        Keys with fewer than three numeric corrections are left out, as too
        little evidence to act on.
        """
        errors_by_key = _numeric_correction_errors(
            self._get_user_feedback(user_id)
        )
        return {
            key: float(np.mean(errors))
            for key, errors in errors_by_key.items()
            if len(errors) >= 3
        }

    def _get_user_feedback(self, user_id: str) -> List[UserFeedback]:
        """Get all feedback for a user"""
        owned = (
            _feedback_from_record(record)
            for record in _iter_log_records(self.collector.feedback_file)
            if record.get("user_id") == user_id
        )
        return [fb for fb in owned if fb is not None]

    def _adjust_confidence(
        self, probability: float, corrections: Dict[str, float]
    ) -> float:
        """Adjust confidence based on historical correction patterns.

        Starts from 0.7 and subtracts up to 0.2 in proportion to the mean
        absolute correction; the result is clamped to [0.4, 0.95].
        ``probability`` is currently unused and kept for interface
        compatibility.
        """
        base_confidence = 0.7

        if not corrections:
            return base_confidence

        avg_correction = float(np.mean([abs(v) for v in corrections.values()]))
        confidence_penalty = min(0.2, avg_correction * 0.5)
        adjusted_confidence = base_confidence - confidence_penalty

        return float(max(0.4, min(0.95, adjusted_confidence)))

    def record_adaptation(
        self,
        user_id: str,
        task_id: str,
        original_prediction: Dict,
        adapted_prediction: Dict,
        adjustments: Dict[str, float],
    ):
        """Record adaptation for tracking (appended to the in-memory history)."""
        adaptation = {
            "timestamp": datetime.now().isoformat(),
            "user_id": user_id,
            "task_id": task_id,
            "original_prediction": original_prediction,
            "adapted_prediction": adapted_prediction,
            "adjustments": adjustments,
        }
        self.adaptation_history.append(adaptation)
        self.logger.info(
            "Adaptation recorded for user %s, task %s", user_id, task_id
        )


class FeedbackIngestor:
    """Ingests feedback from various sources"""

    def __init__(self, collector: FeedbackCollector):
        self.collector = collector
        self.logger = logging.getLogger(__name__)

    def ingest_task_completion(
        self,
        user_id: str,
        task_id: str,
        prediction_id: str,
        prediction: Dict,
        actual_outcome: Dict,
    ) -> UserFeedback:
        """Ingest feedback from task completion.

        A prediction counts as "completed" when its
        ``completion_probability`` is above 0.5 (default 0.5 when absent).
        Agreement with ``actual_outcome["completed"]`` is stored as a
        low-severity CONFIRMATION, disagreement as a high-severity
        CORRECTION.
        """
        predicted_completed = prediction.get("completion_probability", 0.5) > 0.5
        actual_completed = actual_outcome.get("completed", False)

        if predicted_completed == actual_completed:
            feedback_type = FeedbackType.CONFIRMATION
            severity = FeedbackSeverity.LOW
        else:
            feedback_type = FeedbackType.CORRECTION
            severity = FeedbackSeverity.HIGH

        corrected_value = {
            "completed": actual_completed,
            "actual_duration": actual_outcome.get("duration"),
            "actual_stress": actual_outcome.get("stress_level"),
        }

        return self.collector.collect_feedback(
            user_id=user_id,
            task_id=task_id,
            prediction_id=prediction_id,
            feedback_type=feedback_type,
            original_prediction=prediction,
            corrected_value=corrected_value,
            severity=severity,
        )

    def ingest_user_rating(
        self,
        user_id: str,
        task_id: str,
        prediction_id: str,
        prediction: Dict,
        rating: int,
        comment: Optional[str] = None,
    ) -> UserFeedback:
        """Ingest explicit user rating.

        Severity is HIGH for ratings of 2 or less, MEDIUM for 3, LOW above.
        """
        if rating <= 2:
            severity = FeedbackSeverity.HIGH
        elif rating <= 3:
            severity = FeedbackSeverity.MEDIUM
        else:
            severity = FeedbackSeverity.LOW

        return self.collector.collect_feedback(
            user_id=user_id,
            task_id=task_id,
            prediction_id=prediction_id,
            feedback_type=FeedbackType.RATING,
            original_prediction=prediction,
            rating=rating,
            comment=comment,
            severity=severity,
        )

    def ingest_rejection(
        self,
        user_id: str,
        task_id: str,
        prediction_id: str,
        prediction: Dict,
        reason: Optional[str] = None,
    ) -> UserFeedback:
        """Ingest prediction rejection (always high severity)."""
        return self.collector.collect_feedback(
            user_id=user_id,
            task_id=task_id,
            prediction_id=prediction_id,
            feedback_type=FeedbackType.REJECTION,
            original_prediction=prediction,
            comment=reason,
            severity=FeedbackSeverity.HIGH,
        )


# Module-level singletons sharing the default "feedback" directory.
feedback_collector = FeedbackCollector()
feedback_processor = FeedbackProcessor(feedback_collector)
adaptive_learner = AdaptiveLearner(feedback_collector)
feedback_ingestor = FeedbackIngestor(feedback_collector)

__all__ = [
    "FeedbackType",
    "FeedbackSeverity",
    "UserFeedback",
    "FeedbackStats",
    "FeedbackCollector",
    "FeedbackProcessor",
    "AdaptiveLearner",
    "FeedbackIngestor",
    "feedback_collector",
    "feedback_processor",
    "adaptive_learner",
    "feedback_ingestor",
]