# SPG_ML / feedback.py
# meetmendapara's picture
# Initial commit for ML space
# df31aa1
"""
Feedback Loop Module for Cognexa ML Service
This module provides:
- User feedback collection and processing
- Model retraining triggers
- Prediction correction tracking
- Learning from mistakes
- Adaptive model updates
"""
import os
import json
import logging
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass, asdict
from enum import Enum
import numpy as np
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class FeedbackType(str, Enum):
    """Kinds of feedback a user can attach to a prediction.

    Members are also ``str`` instances, so each one compares equal to its
    raw value (``FeedbackType.CORRECTION == "correction"``).
    """

    # The user supplied a different value than the one predicted.
    CORRECTION = "correction"
    # The user agreed with the prediction.
    CONFIRMATION = "confirmation"
    # The user dismissed the prediction.
    REJECTION = "rejection"
    # The user scored the prediction.
    RATING = "rating"
    # Free-form suggestion.
    SUGGESTION = "suggestion"
class FeedbackSeverity(str, Enum):
    """How important a piece of feedback is, from least to most urgent.

    Members are also ``str`` instances and compare equal to their raw value.
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
@dataclass
class UserFeedback:
    """A single piece of user feedback about one prediction.

    Instances are serialized with ``dataclasses.asdict`` into the JSONL
    feedback log and rebuilt from those records with ``UserFeedback(**data)``.
    """

    # Identifier of this feedback record.
    feedback_id: str
    # User who gave the feedback.
    user_id: str
    # Task the prediction was made for.
    task_id: str
    # Identifier of the prediction being judged.
    prediction_id: str
    # What kind of feedback this is (correction, confirmation, ...).
    feedback_type: FeedbackType
    # How important the feedback is.
    severity: FeedbackSeverity
    # The prediction exactly as it was shown to the user.
    original_prediction: Dict[str, Any]
    # Replacement value supplied with the feedback, if any.
    corrected_value: Optional[Any]
    # Numeric score supplied with the feedback, if any.
    rating: Optional[int]
    # Free-text remark, if any.
    comment: Optional[str]
    # ISO-8601 timestamp string of when the feedback was recorded.
    timestamp: str
    # True once the record has been marked as consumed.
    processed: bool = False
@dataclass
class FeedbackStats:
    """Aggregate figures describing the feedback collected so far."""

    # Number of feedback records of any type.
    total_feedback: int
    # Number of records per feedback type.
    corrections: int
    confirmations: int
    rejections: int
    # Mean of all ratings received.
    average_rating: float
    # Improvement figure; how it is derived is not defined in this class.
    improvement_rate: float
    # (feature name, correction count) pairs.
    most_corrected_features: List[Tuple[str, int]]
class FeedbackCollector:
    """Collect user feedback and persist it on disk.

    Two files are kept inside ``feedback_dir``:

    * ``feedback_log.jsonl`` - one JSON object per line, one line per
      ``UserFeedback`` record.
    * ``feedback_stats.json`` - running counters and the running average
      rating, updated on every ``collect_feedback`` call.
    """

    def __init__(self, feedback_dir: str = "feedback"):
        """Create the storage directory if needed and resolve the file paths.

        Args:
            feedback_dir: Directory that holds the log and stats files.
        """
        self.feedback_dir = Path(feedback_dir)
        # parents=True so a nested path such as "data/ml/feedback" also works.
        self.feedback_dir.mkdir(parents=True, exist_ok=True)
        self.feedback_file = self.feedback_dir / "feedback_log.jsonl"
        self.stats_file = self.feedback_dir / "feedback_stats.json"
        self.logger = logging.getLogger(__name__)

    def collect_feedback(
        self,
        user_id: str,
        task_id: str,
        prediction_id: str,
        feedback_type: FeedbackType,
        original_prediction: Dict[str, Any],
        corrected_value: Optional[Any] = None,
        rating: Optional[int] = None,
        comment: Optional[str] = None,
        severity: FeedbackSeverity = FeedbackSeverity.MEDIUM,
    ) -> UserFeedback:
        """Record one piece of feedback and update the running statistics.

        Args:
            user_id: User giving the feedback.
            task_id: Task the prediction belongs to.
            prediction_id: Prediction being judged.
            feedback_type: Kind of feedback.
            original_prediction: The prediction as shown to the user.
            corrected_value: Replacement value, if any.
            rating: Numeric score, if any.
            comment: Free-text remark, if any.
            severity: Importance of the feedback.

        Returns:
            The ``UserFeedback`` record that was appended to the log.
        """
        import uuid

        feedback_id = str(uuid.uuid4())[:12]
        feedback = UserFeedback(
            feedback_id=feedback_id,
            user_id=user_id,
            task_id=task_id,
            prediction_id=prediction_id,
            feedback_type=feedback_type,
            severity=severity,
            original_prediction=original_prediction,
            corrected_value=corrected_value,
            rating=rating,
            comment=comment,
            timestamp=datetime.now().isoformat(),
            processed=False,
        )
        self._save_feedback(feedback)
        self._update_stats(feedback)
        self.logger.info(f"Feedback collected: {feedback_id} ({feedback_type})")
        return feedback

    def _save_feedback(self, feedback: UserFeedback):
        """Append one feedback record to the JSONL log."""
        with open(self.feedback_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(asdict(feedback)) + "\n")

    def _update_stats(self, feedback: UserFeedback):
        """Fold one feedback record into the persisted statistics."""
        stats = self.load_stats()
        # .get() keeps this working when the stats file lacks a counter.
        stats["total_feedback"] = stats.get("total_feedback", 0) + 1
        if feedback.feedback_type == FeedbackType.CORRECTION:
            stats["corrections"] = stats.get("corrections", 0) + 1
        elif feedback.feedback_type == FeedbackType.CONFIRMATION:
            stats["confirmations"] = stats.get("confirmations", 0) + 1
        elif feedback.feedback_type == FeedbackType.REJECTION:
            stats["rejections"] = stats.get("rejections", 0) + 1
        if feedback.rating is not None:
            # Incremental mean: previous sum is average * count.
            current_avg = stats.get("average_rating", 0.0)
            rating_count = stats.get("rating_count", 0)
            stats["average_rating"] = (
                current_avg * rating_count + feedback.rating
            ) / (rating_count + 1)
            stats["rating_count"] = rating_count + 1
        self._save_stats(stats)

    def load_stats(self) -> Dict:
        """Load the persisted statistics.

        Returns:
            The stats dictionary. Counters missing from the file are filled
            with zero defaults; a missing or unreadable file yields all
            defaults.
        """
        stats: Dict[str, Any] = {
            "total_feedback": 0,
            "corrections": 0,
            "confirmations": 0,
            "rejections": 0,
            "average_rating": 0.0,
            "rating_count": 0,
            "improvement_rate": 0.0,
        }
        if not self.stats_file.exists():
            return stats
        try:
            with open(self.stats_file, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except json.JSONDecodeError:
            # A corrupt stats file must not make feedback collection fail
            # after the record has already been appended to the log.
            self.logger.warning(
                "Stats file %s is not valid JSON; starting from defaults",
                self.stats_file,
            )
            return stats
        if not isinstance(loaded, dict):
            self.logger.warning(
                "Stats file %s does not contain a JSON object; "
                "starting from defaults",
                self.stats_file,
            )
            return stats
        stats.update(loaded)
        return stats

    def _save_stats(self, stats: Dict):
        """Persist the statistics dictionary as indented JSON."""
        self._replace_file(self.stats_file, json.dumps(stats, indent=2))

    def _replace_file(self, target: Path, text: str):
        """Write ``text`` to a sibling temp file, then rename it over ``target``.

        Writing to a temp file first means a crash mid-write cannot leave
        ``target`` truncated.
        """
        tmp_path = target.with_name(target.name + ".tmp")
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.write(text)
        tmp_path.replace(target)

    def _iter_records(self):
        """Yield each JSON object stored in the feedback log.

        Blank lines, lines that are not valid JSON and lines that do not
        hold a JSON object are skipped so one bad line cannot break every
        reader of the log.
        """
        if not self.feedback_file.exists():
            return
        with open(self.feedback_file, "r", encoding="utf-8") as f:
            for line_no, line in enumerate(f, start=1):
                if not line.strip():
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    self.logger.warning(
                        "Skipping malformed feedback record at line %d", line_no
                    )
                    continue
                if isinstance(record, dict):
                    yield record

    def _to_feedback(self, record: Dict) -> Optional[UserFeedback]:
        """Build a ``UserFeedback`` from a log record, or None if it does not fit."""
        try:
            return UserFeedback(**record)
        except TypeError:
            # Raised when the record has missing or unexpected fields.
            self.logger.warning(
                "Skipping feedback record with unexpected fields: %s",
                record.get("feedback_id"),
            )
            return None

    def get_feedback_for_task(self, task_id: str) -> List[UserFeedback]:
        """Return every logged feedback record whose ``task_id`` matches."""
        matches = []
        for record in self._iter_records():
            if record.get("task_id") != task_id:
                continue
            feedback = self._to_feedback(record)
            if feedback is not None:
                matches.append(feedback)
        return matches

    def get_unprocessed_feedback(self) -> List[UserFeedback]:
        """Return every logged feedback record not yet marked as processed."""
        pending = []
        for record in self._iter_records():
            if record.get("processed", False):
                continue
            feedback = self._to_feedback(record)
            if feedback is not None:
                pending.append(feedback)
        return pending

    def mark_processed(self, feedback_ids: List[str]):
        """Set ``processed`` to True on the log records with the given ids.

        The log is rewritten through a temp file. Lines that cannot be
        parsed are carried over unchanged rather than dropped. Nothing is
        written when the log does not exist or ``feedback_ids`` is empty.

        Args:
            feedback_ids: Identifiers of the records to mark.
        """
        targets = set(feedback_ids)
        if not targets or not self.feedback_file.exists():
            return
        rewritten = []
        with open(self.feedback_file, "r", encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue
                try:
                    data = json.loads(line)
                except json.JSONDecodeError:
                    # Keep unreadable lines verbatim; do not lose data.
                    rewritten.append(line if line.endswith("\n") else line + "\n")
                    continue
                if isinstance(data, dict):
                    record_id = data.get("feedback_id")
                    if isinstance(record_id, str) and record_id in targets:
                        data["processed"] = True
                rewritten.append(json.dumps(data) + "\n")
        self._replace_file(self.feedback_file, "".join(rewritten))
class FeedbackProcessor:
    """Derive statistics, weight adjustments and retraining signals from feedback."""

    def __init__(self, feedback_collector: FeedbackCollector):
        """Keep a reference to the collector whose log and stats are analysed."""
        self.collector = feedback_collector
        self.logger = logging.getLogger(__name__)

    def analyze_feedback_patterns(self, days: int = 30) -> Dict:
        """Summarise the feedback recorded during the last ``days`` days.

        Args:
            days: Size of the look-back window.

        Returns:
            ``{"message": ...}`` when the window holds no feedback. Otherwise
            a dictionary with per-type counts and rates, the five most
            corrected features, their mean absolute correction and a list of
            textual recommendations.
        """
        cutoff = datetime.now() - timedelta(days=days)
        feedbacks = self._get_recent_feedback(cutoff)
        if not feedbacks:
            return {"message": "No feedback in the specified period"}

        corrections_by_feature: Dict[str, int] = {}
        magnitudes_by_feature: Dict[str, List[float]] = {}
        for fb in feedbacks:
            if fb.feedback_type != FeedbackType.CORRECTION:
                continue
            orig = fb.original_prediction
            corr = fb.corrected_value
            if not (isinstance(orig, dict) and isinstance(corr, dict)):
                continue
            # Union of both key sets in a deterministic order, so ties in
            # the ranking below do not depend on set iteration order.
            keys = list(orig) + [k for k in corr if k not in orig]
            for key in keys:
                corrections_by_feature[key] = corrections_by_feature.get(key, 0) + 1
                if key in orig and key in corr:
                    try:
                        magnitude = abs(float(corr[key]) - float(orig[key]))
                    except (TypeError, ValueError):
                        # Non-numeric values have no magnitude.
                        continue
                    magnitudes_by_feature.setdefault(key, []).append(magnitude)

        most_corrected = sorted(
            corrections_by_feature.items(), key=lambda item: item[1], reverse=True
        )[:5]
        # Plain floats, matching the other methods of this class.
        avg_correction_by_feature = {
            key: float(np.mean(magnitudes_by_feature[key]))
            for key, _ in most_corrected
            if magnitudes_by_feature.get(key)
        }

        total = len(feedbacks)
        corrections = sum(
            1 for fb in feedbacks if fb.feedback_type == FeedbackType.CORRECTION
        )
        confirmations = sum(
            1 for fb in feedbacks if fb.feedback_type == FeedbackType.CONFIRMATION
        )
        rejections = sum(
            1 for fb in feedbacks if fb.feedback_type == FeedbackType.REJECTION
        )
        correction_rate = corrections / total if total > 0 else 0
        return {
            "period_days": days,
            "total_feedback": total,
            "corrections": corrections,
            "confirmations": confirmations,
            "rejections": rejections,
            "correction_rate": correction_rate,
            "confirmation_rate": confirmations / total if total > 0 else 0,
            "rejection_rate": rejections / total if total > 0 else 0,
            "most_corrected_features": most_corrected,
            "average_correction_magnitude": avg_correction_by_feature,
            "recommendations": self._generate_recommendations(
                most_corrected, correction_rate
            ),
        }

    def _get_recent_feedback(self, cutoff: datetime) -> List[UserFeedback]:
        """Return the logged feedback whose timestamp is at or after ``cutoff``.

        Blank lines and records that cannot be parsed, lack a usable
        timestamp or do not match ``UserFeedback`` are skipped.
        """
        feedbacks: List[UserFeedback] = []
        log_path = self.collector.feedback_file
        if not log_path.exists():
            return feedbacks
        with open(log_path, "r", encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue
                try:
                    # json.JSONDecodeError is a ValueError subclass, so
                    # parsing is covered by the same handler.
                    data = json.loads(line)
                    fb_time = datetime.fromisoformat(data["timestamp"])
                    if fb_time >= cutoff:
                        feedbacks.append(UserFeedback(**data))
                except (ValueError, KeyError, TypeError):
                    self.logger.debug("Skipping unreadable feedback record")
                    continue
        return feedbacks

    def _generate_recommendations(
        self, most_corrected: List[Tuple[str, int]], correction_rate: float
    ) -> List[str]:
        """Translate correction statistics into human-readable advice.

        Args:
            most_corrected: (feature, count) pairs, most corrected first.
            correction_rate: Share of feedback that were corrections.

        Returns:
            Recommendation strings, possibly empty.
        """
        recommendations = []
        if correction_rate > 0.3:
            recommendations.append(
                "High correction rate detected - consider model retraining"
            )
        for feature, count in most_corrected[:3]:
            if count > 10:
                recommendations.append(
                    f"Review feature weighting for '{feature}' - frequently corrected"
                )
        if correction_rate < 0.1:
            recommendations.append("Low correction rate - predictions are accurate")
        return recommendations

    def calculate_model_adjustments(self) -> Dict[str, float]:
        """Compute the mean signed correction per feature from unprocessed feedback.

        Only corrections whose original and corrected values are both
        dictionaries are used, and only for keys present in both with
        values convertible to ``float``.

        Returns:
            Mapping of feature name to mean of ``corrected - original``.
        """
        errors_by_feature: Dict[str, List[float]] = {}
        for fb in self.collector.get_unprocessed_feedback():
            if fb.feedback_type != FeedbackType.CORRECTION:
                continue
            orig = fb.original_prediction
            corr = fb.corrected_value
            if not (isinstance(orig, dict) and isinstance(corr, dict)):
                continue
            for key in orig:
                if key not in corr:
                    continue
                try:
                    error = float(corr[key]) - float(orig[key])
                except (TypeError, ValueError):
                    continue
                errors_by_feature.setdefault(key, []).append(error)
        return {
            key: float(np.mean(errors))
            for key, errors in errors_by_feature.items()
            if errors
        }

    def should_trigger_retraining(self) -> Tuple[bool, str]:
        """Decide whether the collected feedback warrants retraining.

        Returns:
            ``(decision, reason)``. Retraining is recommended when, over the
            last 7 days, the correction rate exceeds 40% or the rejection
            rate exceeds 30%, or when the stored average rating is positive
            and below 3.0. With fewer than 50 feedback records in the stats
            the decision is always False.
        """
        stats = self.collector.load_stats()
        # Cheap volume gate first: skip the log scan when it cannot matter.
        if stats.get("total_feedback", 0) < 50:
            return False, "Insufficient feedback for retraining decision"

        patterns = self.analyze_feedback_patterns(days=7)
        correction_rate = patterns.get("correction_rate", 0)
        if correction_rate > 0.4:
            return (
                True,
                f"High correction rate ({correction_rate:.1%}) - retraining recommended",
            )
        rejection_rate = patterns.get("rejection_rate", 0)
        if rejection_rate > 0.3:
            return (
                True,
                f"High rejection rate ({rejection_rate:.1%}) - retraining recommended",
            )
        avg_rating = stats.get("average_rating", 0)
        # An average of 0 means no ratings were recorded yet.
        if 0 < avg_rating < 3.0:
            return (
                True,
                f"Low average rating ({avg_rating:.1f}) - retraining recommended",
            )
        return False, "Model performance is acceptable"
class AdaptiveLearner:
    """Adjust predictions per user based on that user's past corrections."""

    def __init__(self, feedback_collector: FeedbackCollector):
        """Bind to a collector and start with an empty in-memory history."""
        self.collector = feedback_collector
        self.processor = FeedbackProcessor(feedback_collector)
        self.logger = logging.getLogger(__name__)
        self.adaptation_history: List[Dict] = []

    def adapt_predictions(
        self, user_id: str, base_prediction: Dict[str, Any], features: Dict[str, float]
    ) -> Dict[str, Any]:
        """Return a copy of ``base_prediction`` nudged by the user's corrections.

        Each numeric field for which the user has a correction pattern is
        shifted by 10% of the user's mean signed correction. When the
        prediction contains ``completion_probability``, a
        ``confidence_level`` entry is added as well.

        Args:
            user_id: User whose feedback history is consulted.
            base_prediction: Prediction to adapt; not modified.
            features: Accepted for interface compatibility; currently unused.

        Returns:
            A shallow copy of ``base_prediction`` with the adapted values.
        """
        user_corrections = self._get_user_correction_patterns(user_id)
        adapted_prediction = base_prediction.copy()
        for key, adjustment in user_corrections.items():
            if key not in adapted_prediction:
                continue
            try:
                current = float(adapted_prediction[key])
            except (TypeError, ValueError):
                # Non-numeric fields are left untouched.
                continue
            # 10% weight to historical adjustment
            adapted_prediction[key] = current + adjustment * 0.1
        if "completion_probability" in adapted_prediction:
            adapted_prediction["confidence_level"] = self._adjust_confidence(
                adapted_prediction["completion_probability"], user_corrections
            )
        return adapted_prediction

    def _get_user_correction_patterns(self, user_id: str) -> Dict[str, float]:
        """Return the user's mean signed correction per feature.

        A feature is included only when the user corrected it at least
        three times with values convertible to ``float``.
        """
        errors_by_feature: Dict[str, List[float]] = {}
        for fb in self._get_user_feedback(user_id):
            if fb.feedback_type != FeedbackType.CORRECTION:
                continue
            orig = fb.original_prediction
            corr = fb.corrected_value
            if not (isinstance(orig, dict) and isinstance(corr, dict)):
                continue
            for key in orig:
                if key not in corr:
                    continue
                try:
                    error = float(corr[key]) - float(orig[key])
                except (TypeError, ValueError):
                    continue
                errors_by_feature.setdefault(key, []).append(error)
        return {
            key: float(np.mean(errors))
            for key, errors in errors_by_feature.items()
            if len(errors) >= 3
        }

    def _get_user_feedback(self, user_id: str) -> List[UserFeedback]:
        """Return every logged feedback record written by ``user_id``.

        Blank lines and records that cannot be parsed, lack ``user_id`` or
        do not match ``UserFeedback`` are skipped.
        """
        feedbacks: List[UserFeedback] = []
        log_path = self.collector.feedback_file
        if not log_path.exists():
            return feedbacks
        with open(log_path, "r", encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue
                try:
                    # json.JSONDecodeError is a ValueError subclass.
                    data = json.loads(line)
                    if data["user_id"] == user_id:
                        feedbacks.append(UserFeedback(**data))
                except (ValueError, KeyError, TypeError):
                    self.logger.debug("Skipping unreadable feedback record")
                    continue
        return feedbacks

    def _adjust_confidence(
        self, probability: float, corrections: Dict[str, float]
    ) -> float:
        """Lower the base confidence according to how much the user corrects.

        Args:
            probability: Accepted for interface compatibility; currently
                unused.
            corrections: Mean signed correction per feature.

        Returns:
            0.7 when there are no corrections. Otherwise 0.7 minus a penalty
            of half the mean absolute correction (capped at 0.2), never
            below 0.4.
        """
        base_confidence = 0.7
        if not corrections:
            return base_confidence
        # float() so callers get a plain Python float, not a NumPy scalar.
        avg_correction = float(np.mean([abs(v) for v in corrections.values()]))
        confidence_penalty = min(0.2, avg_correction * 0.5)
        adjusted_confidence = base_confidence - confidence_penalty
        return max(0.4, min(0.95, adjusted_confidence))

    def record_adaptation(
        self,
        user_id: str,
        task_id: str,
        original_prediction: Dict,
        adapted_prediction: Dict,
        adjustments: Dict[str, float],
    ):
        """Append one adaptation event to the in-memory history and log it.

        Args:
            user_id: User the prediction was adapted for.
            task_id: Task the prediction belongs to.
            original_prediction: Prediction before adaptation.
            adapted_prediction: Prediction after adaptation.
            adjustments: Per-feature adjustments that were applied.
        """
        self.adaptation_history.append(
            {
                "timestamp": datetime.now().isoformat(),
                "user_id": user_id,
                "task_id": task_id,
                "original_prediction": original_prediction,
                "adapted_prediction": adapted_prediction,
                "adjustments": adjustments,
            }
        )
        self.logger.info(f"Adaptation recorded for user {user_id}, task {task_id}")
class FeedbackIngestor:
    """Convert events from different sources into collected feedback."""

    def __init__(self, collector: FeedbackCollector):
        """Bind to the collector that stores the resulting feedback."""
        self.collector = collector
        self.logger = logging.getLogger(__name__)

    def ingest_task_completion(
        self,
        user_id: str,
        task_id: str,
        prediction_id: str,
        prediction: Dict,
        actual_outcome: Dict,
    ) -> UserFeedback:
        """Record whether a completion prediction matched the real outcome.

        The prediction counts as "will complete" when its
        ``completion_probability`` is above 0.5. A match is stored as a
        low-severity confirmation, a mismatch as a high-severity correction.
        The actual outcome is stored as the corrected value in both cases.

        Args:
            user_id: User who owns the task.
            task_id: Task that finished.
            prediction_id: Prediction being evaluated.
            prediction: The prediction; ``completion_probability`` is read.
            actual_outcome: Observed result; ``completed``, ``duration`` and
                ``stress_level`` are read.

        Returns:
            The stored ``UserFeedback`` record.
        """
        probability = prediction.get("completion_probability", 0.5)
        if probability is None:
            # Key present but null: treat like a missing probability
            # instead of failing on the comparison below.
            probability = 0.5
        predicted_completed = probability > 0.5
        actual_completed = actual_outcome.get("completed", False)
        # Compare truthiness so a non-bool outcome (e.g. None) is not
        # misread as a mismatch.
        if predicted_completed == bool(actual_completed):
            feedback_type = FeedbackType.CONFIRMATION
            severity = FeedbackSeverity.LOW
        else:
            feedback_type = FeedbackType.CORRECTION
            severity = FeedbackSeverity.HIGH
        corrected_value = {
            "completed": actual_completed,
            "actual_duration": actual_outcome.get("duration"),
            "actual_stress": actual_outcome.get("stress_level"),
        }
        return self.collector.collect_feedback(
            user_id=user_id,
            task_id=task_id,
            prediction_id=prediction_id,
            feedback_type=feedback_type,
            original_prediction=prediction,
            corrected_value=corrected_value,
            severity=severity,
        )

    def ingest_user_rating(
        self,
        user_id: str,
        task_id: str,
        prediction_id: str,
        prediction: Dict,
        rating: int,
        comment: Optional[str] = None,
    ) -> UserFeedback:
        """Record an explicit rating of a prediction.

        Severity is HIGH for ratings up to 2, MEDIUM for ratings up to 3
        and LOW otherwise.

        Args:
            user_id: User giving the rating.
            task_id: Task the prediction belongs to.
            prediction_id: Prediction being rated.
            prediction: The rated prediction.
            rating: Numeric score.
            comment: Optional free-text remark.

        Returns:
            The stored ``UserFeedback`` record.
        """
        if rating <= 2:
            severity = FeedbackSeverity.HIGH
        elif rating <= 3:
            severity = FeedbackSeverity.MEDIUM
        else:
            severity = FeedbackSeverity.LOW
        return self.collector.collect_feedback(
            user_id=user_id,
            task_id=task_id,
            prediction_id=prediction_id,
            feedback_type=FeedbackType.RATING,
            original_prediction=prediction,
            rating=rating,
            comment=comment,
            severity=severity,
        )

    def ingest_rejection(
        self,
        user_id: str,
        task_id: str,
        prediction_id: str,
        prediction: Dict,
        reason: Optional[str] = None,
    ) -> UserFeedback:
        """Record that the user rejected a prediction (always HIGH severity).

        Args:
            user_id: User rejecting the prediction.
            task_id: Task the prediction belongs to.
            prediction_id: Prediction being rejected.
            prediction: The rejected prediction.
            reason: Optional explanation, stored as the feedback comment.

        Returns:
            The stored ``UserFeedback`` record.
        """
        return self.collector.collect_feedback(
            user_id=user_id,
            task_id=task_id,
            prediction_id=prediction_id,
            feedback_type=FeedbackType.REJECTION,
            original_prediction=prediction,
            comment=reason,
            severity=FeedbackSeverity.HIGH,
        )
# Shared module-level instances, all backed by the same collector.
# NOTE: FeedbackCollector() runs at import time; with its default argument
# it creates a "feedback" directory relative to the current working directory.
feedback_collector = FeedbackCollector()
feedback_processor = FeedbackProcessor(feedback_collector)
adaptive_learner = AdaptiveLearner(feedback_collector)
feedback_ingestor = FeedbackIngestor(feedback_collector)
# Public API: the enums, data classes, service classes and the shared
# module-level instances defined above.
__all__ = [
"FeedbackType",
"FeedbackSeverity",
"UserFeedback",
"FeedbackStats",
"FeedbackCollector",
"FeedbackProcessor",
"AdaptiveLearner",
"FeedbackIngestor",
"feedback_collector",
"feedback_processor",
"adaptive_learner",
"feedback_ingestor",
]