rag-api-node-1 / src /infrastructure /adapters /feedback_tracker.py
Peterase's picture
feat: Add query enhancements and flexible prompting (v2.1)
6246bba
"""
User Feedback Tracking System
Tracks user feedback on search results for continuous improvement:
- Thumbs up/down on answers
- Relevance ratings on sources
- Intent classification accuracy
- Search strategy effectiveness
Stores feedback in ClickHouse for analysis and model improvement.
"""
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime
from dataclasses import dataclass, asdict
import json
logger = logging.getLogger(__name__)
@dataclass
class FeedbackEvent:
"""User feedback event"""
# Identifiers
session_id: str
query_id: str
user_id: Optional[int]
# Query info
query: str
expanded_query: Optional[str]
# Classification info
intent_classified: str
intent_confidence: float
intent_method: str
# Search info
search_strategy: str
live_results_count: int
db_results_count: int
total_sources: int
# Feedback
feedback_type: str # "thumbs_up", "thumbs_down", "source_rating", "intent_correction"
feedback_value: Any # True/False for thumbs, 1-5 for rating, corrected intent for correction
feedback_comment: Optional[str]
# Metadata
timestamp: str
response_time_ms: float
cache_hit: bool
class FeedbackTracker:
"""
Track and store user feedback for continuous improvement.
Features:
- Multiple feedback types (thumbs, ratings, corrections)
- ClickHouse storage for analytics
- Async logging (non-blocking)
- Aggregation and reporting
"""
def __init__(self, analytics_db=None):
"""
Initialize feedback tracker.
Args:
analytics_db: ClickHouse analytics database adapter
"""
self.analytics_db = analytics_db
self._ensure_table_exists()
def _ensure_table_exists(self):
"""Create feedback table if it doesn't exist"""
if not self.analytics_db:
return
try:
create_table_query = """
CREATE TABLE IF NOT EXISTS user_feedback (
session_id String,
query_id String,
user_id Nullable(Int32),
query String,
expanded_query Nullable(String),
intent_classified String,
intent_confidence Float32,
intent_method String,
search_strategy String,
live_results_count Int32,
db_results_count Int32,
total_sources Int32,
feedback_type String,
feedback_value String,
feedback_comment Nullable(String),
timestamp DateTime,
response_time_ms Float32,
cache_hit UInt8
) ENGINE = MergeTree()
ORDER BY (timestamp, session_id)
"""
self.analytics_db.execute(create_table_query)
logger.info("βœ… Feedback table ensured")
except Exception as e:
logger.error(f"Failed to create feedback table: {e}")
def record_feedback(
self,
session_id: str,
query: str,
feedback_type: str,
feedback_value: Any,
query_metadata: Dict[str, Any],
feedback_comment: Optional[str] = None,
user_id: Optional[int] = None
):
"""
Record user feedback.
Args:
session_id: User session ID
query: Original query
feedback_type: Type of feedback (thumbs_up, thumbs_down, etc.)
feedback_value: Feedback value
query_metadata: Metadata about the query and response
feedback_comment: Optional comment from user
user_id: Optional user ID
"""
try:
# Create feedback event
event = FeedbackEvent(
session_id=session_id,
query_id=query_metadata.get("query_id", f"{session_id}_{int(datetime.utcnow().timestamp())}"),
user_id=user_id,
query=query,
expanded_query=query_metadata.get("expanded_query"),
intent_classified=query_metadata.get("intent", "UNKNOWN"),
intent_confidence=query_metadata.get("intent_confidence", 0.0),
intent_method=query_metadata.get("intent_method", "unknown"),
search_strategy=query_metadata.get("search_strategy", "unknown"),
live_results_count=query_metadata.get("live_results_count", 0),
db_results_count=query_metadata.get("db_results_count", 0),
total_sources=query_metadata.get("total_sources", 0),
feedback_type=feedback_type,
feedback_value=str(feedback_value),
feedback_comment=feedback_comment,
timestamp=datetime.utcnow().isoformat(),
response_time_ms=query_metadata.get("response_time_ms", 0.0),
cache_hit=query_metadata.get("cache_hit", False)
)
# Store in ClickHouse
if self.analytics_db:
self._store_feedback(event)
# Log feedback
logger.info(
f"Feedback recorded: {feedback_type}={feedback_value} "
f"for query='{query}' (intent={event.intent_classified})"
)
except Exception as e:
logger.error(f"Failed to record feedback: {e}")
def _store_feedback(self, event: FeedbackEvent):
"""Store feedback event in ClickHouse"""
try:
insert_query = """
INSERT INTO user_feedback (
session_id, query_id, user_id,
query, expanded_query,
intent_classified, intent_confidence, intent_method,
search_strategy, live_results_count, db_results_count, total_sources,
feedback_type, feedback_value, feedback_comment,
timestamp, response_time_ms, cache_hit
) VALUES
"""
values = (
event.session_id,
event.query_id,
event.user_id,
event.query,
event.expanded_query,
event.intent_classified,
event.intent_confidence,
event.intent_method,
event.search_strategy,
event.live_results_count,
event.db_results_count,
event.total_sources,
event.feedback_type,
event.feedback_value,
event.feedback_comment,
event.timestamp,
event.response_time_ms,
1 if event.cache_hit else 0
)
self.analytics_db.execute(insert_query, [values])
except Exception as e:
logger.error(f"Failed to store feedback in ClickHouse: {e}")
def get_feedback_stats(self, days: int = 7) -> Dict[str, Any]:
"""
Get feedback statistics for the last N days.
Args:
days: Number of days to analyze
Returns:
Dictionary with feedback statistics
"""
if not self.analytics_db:
return {}
try:
query = f"""
SELECT
feedback_type,
COUNT(*) as count,
AVG(intent_confidence) as avg_confidence,
AVG(response_time_ms) as avg_response_time,
SUM(cache_hit) / COUNT(*) as cache_hit_rate
FROM user_feedback
WHERE timestamp >= now() - INTERVAL {days} DAY
GROUP BY feedback_type
ORDER BY count DESC
"""
results = self.analytics_db.query(query)
stats = {
"total_feedback": sum(r["count"] for r in results),
"by_type": {
r["feedback_type"]: {
"count": r["count"],
"avg_confidence": r["avg_confidence"],
"avg_response_time": r["avg_response_time"],
"cache_hit_rate": r["cache_hit_rate"]
}
for r in results
},
"period_days": days
}
return stats
except Exception as e:
logger.error(f"Failed to get feedback stats: {e}")
return {}
def get_intent_accuracy(self, days: int = 7) -> Dict[str, Any]:
"""
Get intent classification accuracy based on user corrections.
Args:
days: Number of days to analyze
Returns:
Dictionary with accuracy metrics
"""
if not self.analytics_db:
return {}
try:
query = f"""
SELECT
intent_classified,
COUNT(*) as total,
SUM(CASE WHEN feedback_type = 'intent_correction' THEN 1 ELSE 0 END) as corrections,
AVG(intent_confidence) as avg_confidence
FROM user_feedback
WHERE timestamp >= now() - INTERVAL {days} DAY
GROUP BY intent_classified
ORDER BY total DESC
"""
results = self.analytics_db.query(query)
accuracy = {
"by_intent": {
r["intent_classified"]: {
"total": r["total"],
"corrections": r["corrections"],
"accuracy": 1.0 - (r["corrections"] / r["total"]) if r["total"] > 0 else 0.0,
"avg_confidence": r["avg_confidence"]
}
for r in results
},
"period_days": days
}
return accuracy
except Exception as e:
logger.error(f"Failed to get intent accuracy: {e}")
return {}
def get_low_confidence_queries(self, threshold: float = 0.7, limit: int = 100) -> List[Dict[str, Any]]:
"""
Get queries with low intent classification confidence.
Args:
threshold: Confidence threshold (queries below this)
limit: Maximum number of queries to return
Returns:
List of low-confidence queries
"""
if not self.analytics_db:
return []
try:
query = f"""
SELECT
query,
intent_classified,
intent_confidence,
intent_method,
COUNT(*) as occurrences
FROM user_feedback
WHERE intent_confidence < {threshold}
GROUP BY query, intent_classified, intent_confidence, intent_method
ORDER BY occurrences DESC, intent_confidence ASC
LIMIT {limit}
"""
results = self.analytics_db.query(query)
return results
except Exception as e:
logger.error(f"Failed to get low confidence queries: {e}")
return []
# ═══════════════════════════════════════════════════════════════════════════
# SINGLETON INSTANCE
# ═══════════════════════════════════════════════════════════════════════════
# Will be initialized with dependencies in main.py
feedback_tracker: Optional[FeedbackTracker] = None
def initialize_feedback_tracker(analytics_db=None):
"""Initialize global feedback tracker instance"""
global feedback_tracker
feedback_tracker = FeedbackTracker(analytics_db)
logger.info("Feedback tracker initialized")