# Aus_F/services/feedback.py
"""
Feedback Loop System
Author: AI Generated
Created: 2025-11-24
Purpose: Collect feedback metrics to improve AI models over time
"""
from datetime import datetime, timezone
from typing import Dict, Optional

from bson import ObjectId

from database import db
class FeedbackCollector:
    """
    Collect feedback on AI outputs for continuous improvement.

    Every feedback event is stored as one document in a single MongoDB
    collection ("AIFeedback"), discriminated by the "feedback_type" field.
    Aggregation helpers then turn those events into quality metrics for
    the email-generation, sentiment and insight pipelines.
    """

    def __init__(self):
        # Name of the MongoDB collection all feedback documents go into.
        self.collection = "AIFeedback"

    def _insert(self, doc: Dict) -> None:
        """Stamp *doc* with a timezone-aware UTC timestamp and insert it."""
        # datetime.utcnow() is deprecated; now(timezone.utc) stores an
        # aware datetime, which PyMongo persists as UTC.
        doc["timestamp"] = datetime.now(timezone.utc)
        db.get_collection(self.collection).insert_one(doc)

    def record_email_engagement(self,
                                segment_id: str,
                                user_id: str,
                                opened: bool = False,
                                clicked: bool = False,
                                converted: bool = False,
                                unsubscribed: bool = False):
        """
        Record email engagement metrics for one recipient.

        Used to evaluate email generation quality.

        Args:
            segment_id: Hex string id of the targeted segment.
            user_id: Hex string id of the recipient.
            opened/clicked/converted/unsubscribed: Engagement flags.
        """
        self._insert({
            "feedback_type": "email_engagement",
            "segment_id": ObjectId(segment_id),
            "user_id": ObjectId(user_id),
            "opened": opened,
            "clicked": clicked,
            "converted": converted,
            "unsubscribed": unsubscribed,
        })

    def record_sentiment_correction(self,
                                    analysis_id: str,
                                    original_label: str,
                                    corrected_label: str,
                                    corrected_by: str):
        """
        Record a manual correction to a sentiment analysis result.

        Used to fine-tune PhoBERT. Passing corrected_label equal to
        original_label records a confirmation of the model output.
        """
        self._insert({
            "feedback_type": "sentiment_correction",
            "analysis_id": ObjectId(analysis_id),
            "original_label": original_label,
            "corrected_label": corrected_label,
            "corrected_by": corrected_by,
        })

    def record_segment_feedback(self,
                                segment_id: str,
                                user_id: str,
                                interaction_type: str,
                                value: Optional[float] = None):
        """
        Record a user interaction with a segment-targeted campaign.

        Args:
            interaction_type: 'purchase', 'view', 'ignore', etc.
            value: Optional revenue/engagement metric for the interaction.
        """
        self._insert({
            "feedback_type": "segment_interaction",
            "segment_id": ObjectId(segment_id),
            "user_id": ObjectId(user_id),
            "interaction_type": interaction_type,
            "value": value,
        })

    def record_insight_usefulness(self,
                                  insight_report_id: str,
                                  user_id: str,
                                  rating: int,
                                  implemented: bool = False):
        """
        Record how useful an insight report was.

        Args:
            rating: 1-5 stars.
            implemented: Whether the recommendation was acted upon.
        """
        self._insert({
            "feedback_type": "insight_rating",
            "insight_report_id": ObjectId(insight_report_id),
            # Fix: wrap in ObjectId for consistency with the other
            # record_* methods, so user_id queries match across types.
            "user_id": ObjectId(user_id),
            "rating": rating,
            "implemented": implemented,
        })

    @staticmethod
    def _engagement_rates(data: Dict) -> Dict:
        """Convert raw engagement counts into rate metrics (0 when empty)."""
        total = data["total_sent"]

        def rate(key: str):
            return data[key] / total if total > 0 else 0

        return {
            "total_sent": total,
            "open_rate": rate("opened"),
            "click_rate": rate("clicked"),
            "conversion_rate": rate("converted"),
            "unsubscribe_rate": rate("unsubscribed"),
        }

    def get_email_performance(self, segment_id: str) -> Dict:
        """
        Get aggregated email performance for a segment.

        Returns:
            Dict of rate metrics, or {"error": "No data"} when no
            engagement events exist for the segment.
        """
        pipeline = [
            {
                "$match": {
                    "feedback_type": "email_engagement",
                    "segment_id": ObjectId(segment_id),
                }
            },
            {
                "$group": {
                    "_id": None,
                    "total_sent": {"$sum": 1},
                    # $cond with a bare boolean field counts True values.
                    "opened": {"$sum": {"$cond": ["$opened", 1, 0]}},
                    "clicked": {"$sum": {"$cond": ["$clicked", 1, 0]}},
                    "converted": {"$sum": {"$cond": ["$converted", 1, 0]}},
                    "unsubscribed": {"$sum": {"$cond": ["$unsubscribed", 1, 0]}},
                }
            },
        ]
        results = list(db.get_collection(self.collection).aggregate(pipeline))
        if not results:
            return {"error": "No data"}
        return self._engagement_rates(results[0])

    @staticmethod
    def _accuracy_breakdown(corrections) -> Dict:
        """Per-label {total, correct, accuracy} stats from correction docs."""
        by_label: Dict[str, Dict] = {}
        for c in corrections:
            stats = by_label.setdefault(c["original_label"],
                                        {"total": 0, "correct": 0})
            stats["total"] += 1
            if c["original_label"] == c["corrected_label"]:
                stats["correct"] += 1
        for stats in by_label.values():
            stats["accuracy"] = stats["correct"] / stats["total"]
        return by_label

    def get_sentiment_accuracy(self) -> Dict:
        """
        Calculate sentiment analysis accuracy from recorded corrections.

        A record counts as "correct" when the corrected label equals the
        original prediction (a reviewer confirmed the model output).
        """
        corrections = list(db.get_collection(self.collection).find({
            "feedback_type": "sentiment_correction"
        }))
        if not corrections:
            return {"error": "No corrections recorded"}
        total = len(corrections)
        correct = sum(
            1 for c in corrections
            if c["original_label"] == c["corrected_label"]
        )
        return {
            "overall_accuracy": correct / total,
            "total_corrections": total,
            "by_label": self._accuracy_breakdown(corrections),
        }

    def get_retaining_dataset(self) -> tuple:
        """
        Build a dataset for retraining the sentiment model from corrections.

        NOTE: the method name carries a historical typo ("retaining" for
        "retraining"); it is kept for backward compatibility with callers.

        Returns:
            (texts, labels) — parallel lists of comment texts and their
            human-corrected sentiment labels.
        """
        corrections = list(db.get_collection(self.collection).find({
            "feedback_type": "sentiment_correction"
        }))
        # Map analysis ids back to their analysis documents.
        analysis_ids = [c["analysis_id"] for c in corrections]
        analyses = {
            str(a["_id"]): a
            for a in db.sentiment_results.find({"_id": {"$in": analysis_ids}})
        }
        # Fetch the raw comment texts the analyses were run on.
        source_ids = [
            analyses[str(c["analysis_id"])]["source_id"]
            for c in corrections
            if str(c["analysis_id"]) in analyses
        ]
        comments = {
            str(c["_id"]): c.get("CommentText", "")
            for c in db.user_comment_post.find({"_id": {"$in": source_ids}})
        }
        # Pair each correction's text with its corrected label; skip
        # corrections whose analysis or source comment no longer exists.
        texts = []
        labels = []
        for c in corrections:
            analysis = analyses.get(str(c["analysis_id"]))
            if analysis is None:
                continue
            text = comments.get(str(analysis["source_id"]))
            if text is None:
                continue
            texts.append(text)
            labels.append(c["corrected_label"])
        print(f"✓ Built retraining dataset: {len(texts)} samples")
        return texts, labels
# Module-level singleton shared by importers of this module; it holds no
# connection state of its own (the db handle comes from `database.db`).
feedback = FeedbackCollector()