""" Feedback Loop System Author: AI Generated Created: 2025-11-24 Purpose: Collect feedback metrics to improve AI models over time """ from datetime import datetime from typing import Dict, Optional from bson import ObjectId from database import db class FeedbackCollector: """ Collect feedback on AI outputs for continuous improvement. """ def __init__(self): self.collection = "AIFeedback" def record_email_engagement(self, segment_id: str, user_id: str, opened: bool = False, clicked: bool = False, converted: bool = False, unsubscribed: bool = False): """ Record email engagement metrics. Used to evaluate email generation quality. """ doc = { "feedback_type": "email_engagement", "segment_id": ObjectId(segment_id), "user_id": ObjectId(user_id), "opened": opened, "clicked": clicked, "converted": converted, "unsubscribed": unsubscribed, "timestamp": datetime.utcnow() } db.get_collection(self.collection).insert_one(doc) def record_sentiment_correction(self, analysis_id: str, original_label: str, corrected_label: str, corrected_by: str): """ Record manual corrections to sentiment analysis. Used to fine-tune PhoBERT. """ doc = { "feedback_type": "sentiment_correction", "analysis_id": ObjectId(analysis_id), "original_label": original_label, "corrected_label": corrected_label, "corrected_by": corrected_by, "timestamp": datetime.utcnow() } db.get_collection(self.collection).insert_one(doc) def record_segment_feedback(self, segment_id: str, user_id: str, interaction_type: str, value: Optional[float] = None): """ Record user interactions with segment-targeted campaigns. interaction_type: 'purchase', 'view', 'ignore', etc. value: revenue/engagement metric """ doc = { "feedback_type": "segment_interaction", "segment_id": ObjectId(segment_id), "user_id": ObjectId(user_id), "interaction_type": interaction_type, "value": value, "timestamp": datetime.utcnow() } db.get_collection(self.collection).insert_one(doc) def record_insight_usefulness(self, insight_report_id: str, user_id: str, rating: int, implemented: bool = False): """ Record how useful an insight report was. rating: 1-5 stars """ doc = { "feedback_type": "insight_rating", "insight_report_id": ObjectId(insight_report_id), "user_id": user_id, "rating": rating, "implemented": implemented, "timestamp": datetime.utcnow() } db.get_collection(self.collection).insert_one(doc) def get_email_performance(self, segment_id: str) -> Dict: """ Get aggregated email performance for a segment. """ pipeline = [ { "$match": { "feedback_type": "email_engagement", "segment_id": ObjectId(segment_id) } }, { "$group": { "_id": None, "total_sent": {"$sum": 1}, "opened": {"$sum": {"$cond": ["$opened", 1, 0]}}, "clicked": {"$sum": {"$cond": ["$clicked", 1, 0]}}, "converted": {"$sum": {"$cond": ["$converted", 1, 0]}}, "unsubscribed": {"$sum": {"$cond": ["$unsubscribed", 1, 0]}} } } ] results = list(db.get_collection(self.collection).aggregate(pipeline)) if not results: return {"error": "No data"} data = results[0] total = data["total_sent"] return { "total_sent": total, "open_rate": data["opened"] / total if total > 0 else 0, "click_rate": data["clicked"] / total if total > 0 else 0, "conversion_rate": data["converted"] / total if total > 0 else 0, "unsubscribe_rate": data["unsubscribed"] / total if total > 0 else 0 } def get_sentiment_accuracy(self) -> Dict: """ Calculate sentiment analysis accuracy based on corrections. """ corrections = list(db.get_collection(self.collection).find({ "feedback_type": "sentiment_correction" })) if not corrections: return {"error": "No corrections recorded"} total = len(corrections) correct = sum(1 for c in corrections if c["original_label"] == c["corrected_label"]) accuracy = correct / total # Breakdown by label by_label = {} for c in corrections: label = c["original_label"] if label not in by_label: by_label[label] = {"total": 0, "correct": 0} by_label[label]["total"] += 1 if c["original_label"] == c["corrected_label"]: by_label[label]["correct"] += 1 for label in by_label: data = by_label[label] by_label[label]["accuracy"] = data["correct"] / data["total"] return { "overall_accuracy": accuracy, "total_corrections": total, "by_label": by_label } def get_retaining_dataset(self) -> tuple: """ Get dataset for retraining sentiment model from corrections. Returns: (texts, labels) """ corrections = list(db.get_collection(self.collection).find({ "feedback_type": "sentiment_correction" })) # Fetch original texts analysis_ids = [c["analysis_id"] for c in corrections] analyses = { str(a["_id"]): a for a in db.sentiment_results.find({"_id": {"$in": analysis_ids}}) } # Get comment texts source_ids = [analyses[str(c["analysis_id"])]["source_id"] for c in corrections if str(c["analysis_id"]) in analyses] comments = { str(c["_id"]): c.get("CommentText", "") for c in db.user_comment_post.find({"_id": {"$in": source_ids}}) } # Build training data texts = [] labels = [] for c in corrections: analysis_id_str = str(c["analysis_id"]) if analysis_id_str in analyses: source_id_str = str(analyses[analysis_id_str]["source_id"]) if source_id_str in comments: texts.append(comments[source_id_str]) labels.append(c["corrected_label"]) print(f"✓ Built retraining dataset: {len(texts)} samples") return texts, labels # Global feedback collector feedback = FeedbackCollector()