| | """
|
| | Feedback Loop System
|
| | Author: AI Generated
|
| | Created: 2025-11-24
|
| | Purpose: Collect feedback metrics to improve AI models over time
|
| | """
|
| |
|
from datetime import datetime, timezone
from typing import Dict, Optional

from bson import ObjectId

from database import db
|
| |
|
| |
|
class FeedbackCollector:
    """Collect feedback on AI outputs for continuous improvement.

    Every feedback event is stored as one document in the ``AIFeedback``
    MongoDB collection and discriminated by its ``feedback_type`` field:
    ``email_engagement``, ``sentiment_correction``, ``segment_interaction``,
    or ``insight_rating``.
    """

    def __init__(self):
        # Name of the MongoDB collection shared by all feedback documents.
        self.collection = "AIFeedback"

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _coll(self):
        """Return the pymongo collection handle for feedback documents."""
        return db.get_collection(self.collection)

    @staticmethod
    def _now() -> datetime:
        """Return a timezone-aware UTC timestamp.

        ``datetime.utcnow()`` is deprecated (Python 3.12+); an aware UTC
        datetime serializes to the same BSON date in MongoDB.
        """
        return datetime.now(timezone.utc)

    # ------------------------------------------------------------------
    # Recording
    # ------------------------------------------------------------------

    def record_email_engagement(self,
                                segment_id: str,
                                user_id: str,
                                opened: bool = False,
                                clicked: bool = False,
                                converted: bool = False,
                                unsubscribed: bool = False):
        """Record email engagement metrics for one delivered email.

        Used to evaluate email generation quality.

        Args:
            segment_id: Hex string id of the targeted segment.
            user_id: Hex string id of the recipient.
            opened: Whether the email was opened.
            clicked: Whether a link in the email was clicked.
            converted: Whether the recipient converted.
            unsubscribed: Whether the recipient unsubscribed.
        """
        self._coll().insert_one({
            "feedback_type": "email_engagement",
            "segment_id": ObjectId(segment_id),
            "user_id": ObjectId(user_id),
            "opened": opened,
            "clicked": clicked,
            "converted": converted,
            "unsubscribed": unsubscribed,
            "timestamp": self._now(),
        })

    def record_sentiment_correction(self,
                                    analysis_id: str,
                                    original_label: str,
                                    corrected_label: str,
                                    corrected_by: str):
        """Record a manual correction to a sentiment analysis result.

        These corrections are later used to fine-tune PhoBERT (see
        ``get_retaining_dataset``).

        Args:
            analysis_id: Hex string id of the sentiment analysis document.
            original_label: Label the model originally produced.
            corrected_label: Human-corrected label.
            corrected_by: Identifier of the person making the correction.
        """
        self._coll().insert_one({
            "feedback_type": "sentiment_correction",
            "analysis_id": ObjectId(analysis_id),
            "original_label": original_label,
            "corrected_label": corrected_label,
            "corrected_by": corrected_by,
            "timestamp": self._now(),
        })

    def record_segment_feedback(self,
                                segment_id: str,
                                user_id: str,
                                interaction_type: str,
                                value: Optional[float] = None):
        """Record a user interaction with a segment-targeted campaign.

        Args:
            segment_id: Hex string id of the segment.
            user_id: Hex string id of the user.
            interaction_type: 'purchase', 'view', 'ignore', etc.
            value: Optional revenue/engagement metric for the interaction.
        """
        self._coll().insert_one({
            "feedback_type": "segment_interaction",
            "segment_id": ObjectId(segment_id),
            "user_id": ObjectId(user_id),
            "interaction_type": interaction_type,
            "value": value,
            "timestamp": self._now(),
        })

    def record_insight_usefulness(self,
                                  insight_report_id: str,
                                  user_id: str,
                                  rating: int,
                                  implemented: bool = False):
        """Record how useful an insight report was.

        Args:
            insight_report_id: Hex string id of the insight report.
            user_id: Id of the rating user.
            rating: 1-5 stars (not validated here; callers should enforce).
            implemented: Whether the insight was acted upon.

        NOTE(review): unlike the other recorders, ``user_id`` is stored as a
        raw string here rather than wrapped in ObjectId — confirm this
        asymmetry is intentional before changing it, since existing queries
        may depend on the stored type.
        """
        self._coll().insert_one({
            "feedback_type": "insight_rating",
            "insight_report_id": ObjectId(insight_report_id),
            "user_id": user_id,
            "rating": rating,
            "implemented": implemented,
            "timestamp": self._now(),
        })

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    def get_email_performance(self, segment_id: str) -> Dict:
        """Get aggregated email performance for a segment.

        Returns:
            Dict with ``total_sent`` and ``open_rate`` / ``click_rate`` /
            ``conversion_rate`` / ``unsubscribe_rate`` fractions, or
            ``{"error": "No data"}`` when nothing was recorded.
        """
        pipeline = [
            {
                "$match": {
                    "feedback_type": "email_engagement",
                    "segment_id": ObjectId(segment_id),
                }
            },
            {
                "$group": {
                    "_id": None,
                    "total_sent": {"$sum": 1},
                    "opened": {"$sum": {"$cond": ["$opened", 1, 0]}},
                    "clicked": {"$sum": {"$cond": ["$clicked", 1, 0]}},
                    "converted": {"$sum": {"$cond": ["$converted", 1, 0]}},
                    "unsubscribed": {"$sum": {"$cond": ["$unsubscribed", 1, 0]}},
                }
            },
        ]

        results = list(self._coll().aggregate(pipeline))
        if not results:
            return {"error": "No data"}

        data = results[0]
        total = data["total_sent"]

        def rate(field: str) -> float:
            # Guard against division by zero when no emails were sent.
            return data[field] / total if total > 0 else 0

        return {
            "total_sent": total,
            "open_rate": rate("opened"),
            "click_rate": rate("clicked"),
            "conversion_rate": rate("converted"),
            "unsubscribe_rate": rate("unsubscribed"),
        }

    def get_sentiment_accuracy(self) -> Dict:
        """Calculate sentiment analysis accuracy based on corrections.

        A correction whose ``original_label`` equals its ``corrected_label``
        counts as "correct" (the human confirmed the model's output).

        Returns:
            Dict with ``overall_accuracy``, ``total_corrections`` and a
            per-original-label ``by_label`` breakdown, or an ``error`` dict
            when no corrections exist.
        """
        corrections = list(self._coll().find({
            "feedback_type": "sentiment_correction"
        }))

        if not corrections:
            return {"error": "No corrections recorded"}

        total = len(corrections)
        correct = sum(
            1 for c in corrections
            if c["original_label"] == c["corrected_label"]
        )

        # Per-label breakdown keyed by the model's original label.
        by_label: Dict = {}
        for c in corrections:
            stats = by_label.setdefault(
                c["original_label"], {"total": 0, "correct": 0}
            )
            stats["total"] += 1
            if c["original_label"] == c["corrected_label"]:
                stats["correct"] += 1

        for stats in by_label.values():
            stats["accuracy"] = stats["correct"] / stats["total"]

        return {
            "overall_accuracy": correct / total,
            "total_corrections": total,
            "by_label": by_label,
        }

    def get_retaining_dataset(self) -> tuple:
        """Build a dataset for retraining the sentiment model from corrections.

        (The method name keeps the historical 'retaining' typo for backward
        compatibility; prefer the ``get_retraining_dataset`` alias.)

        Returns:
            (texts, labels) — parallel lists of comment texts and their
            human-corrected sentiment labels.
        """
        corrections = list(self._coll().find({
            "feedback_type": "sentiment_correction"
        }))

        if not corrections:
            # Early exit avoids issuing two pointless $in queries.
            print("✓ Built retraining dataset: 0 samples")
            return [], []

        # Resolve each correction's analysis document.
        analysis_ids = [c["analysis_id"] for c in corrections]
        analyses = {
            str(a["_id"]): a
            for a in db.sentiment_results.find({"_id": {"$in": analysis_ids}})
        }

        # Resolve the source comments referenced by those analyses.
        source_ids = [
            analyses[str(c["analysis_id"])]["source_id"]
            for c in corrections
            if str(c["analysis_id"]) in analyses
        ]
        comments = {
            str(c["_id"]): c.get("CommentText", "")
            for c in db.user_comment_post.find({"_id": {"$in": source_ids}})
        }

        texts = []
        labels = []
        for c in corrections:
            analysis = analyses.get(str(c["analysis_id"]))
            if analysis is None:
                continue  # Analysis document was deleted; skip.
            text = comments.get(str(analysis["source_id"]))
            if text is None:
                continue  # Source comment no longer exists; skip.
            texts.append(text)
            labels.append(c["corrected_label"])

        print(f"✓ Built retraining dataset: {len(texts)} samples")
        return texts, labels

    # Correctly spelled alias; the misspelled name above is kept for callers.
    get_retraining_dataset = get_retaining_dataset
|
| |
|
| |
|
| |
|
# Module-level singleton shared by the rest of the application.
feedback = FeedbackCollector()
|
| |
|