OpenTriage_AI / services /sentiment_analysis_service.py
KrishnaCosmic's picture
checking changes
6337b71
"""
Sentiment Analysis Service for OpenTriage
Uses local Hugging Face DistilBERT model for fast, offline sentiment analysis
of PR comments. Detects sentiment scores and prominent language patterns.
Features:
- DistilBERT sentiment classification (local, no API calls)
- Keyword-based prominent language detection
- In-memory result caching (10-minute TTL)
- Stage 3 RAG prompt integration-ready
"""
import logging
import time
from typing import Dict, Any, Optional, List, Tuple
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
# Lazy-load transformers (only when needed)
_sentiment_pipeline = None
_cache = {} # {comment_id: {"sentiment": {...}, "timestamp": float}}
CACHE_TTL = 600 # 10 minutes
# Keyword patterns for prominent language detection
LANGUAGE_PATTERNS = {
"technical": ["bug", "error", "crash", "fix", "optimize", "refactor", "api", "database", "performance", "memory", "cpu"],
"positive": ["great", "excellent", "amazing", "love", "perfect", "awesome", "wonderful", "fantastic", "brilliant"],
"negative": ["bad", "horrible", "terrible", "hate", "useless", "broken", "awful", "pathetic", "worst"],
"urgent": ["critical", "urgent", "asap", "immediately", "emergency", "blocker", "must", "breaking"],
"discussion": ["thought", "idea", "suggestion", "question", "wondering", "propose", "consider", "discuss"],
"documentation": ["doc", "readme", "guide", "tutorial", "example", "comment", "explain"],
"testing": ["test", "coverage", "regression", "edge case", "unit test", "integration test", "quality"]
}
def _get_sentiment_pipeline():
"""Lazy-load the sentiment analysis pipeline on first use."""
global _sentiment_pipeline
if _sentiment_pipeline is None:
try:
from transformers import pipeline
logger.info("[Sentiment] Loading DistilBERT sentiment-analysis model...")
_sentiment_pipeline = pipeline(
"sentiment-analysis",
model="distilbert-base-uncased-finetuned-sst-2-english",
device=-1 # CPU mode (set to 0 for GPU if available)
)
logger.info("[Sentiment] ✅ DistilBERT model loaded successfully")
except Exception as e:
logger.error(f"[Sentiment] Failed to load DistilBERT: {e}")
raise
return _sentiment_pipeline
def _detect_prominent_language(text: str) -> str:
"""
Detect prominent language patterns from comment text.
Returns the most relevant category.
"""
if not text:
return "neutral"
text_lower = text.lower()
pattern_scores = {}
for pattern, keywords in LANGUAGE_PATTERNS.items():
# Count keyword matches
matches = sum(1 for keyword in keywords if keyword in text_lower)
if matches > 0:
pattern_scores[pattern] = matches
# Return the category with most matches, or "neutral" if none found
if not pattern_scores:
return "neutral"
return max(pattern_scores.items(), key=lambda x: x[1])[0]
def _is_cache_valid(timestamp: float) -> bool:
"""Check if cached entry is still valid (not expired)."""
return (time.time() - timestamp) < CACHE_TTL
def analyze_comment_sentiment(
comment_id: str,
comment_text: str,
author: str = "unknown",
force_recalc: bool = False
) -> Dict[str, Any]:
"""
Analyze the sentiment of a PR comment using DistilBERT.
Args:
comment_id: Unique comment identifier
comment_text: The comment body text
author: Comment author (for logging)
force_recalc: Force recalculation even if cached
Returns:
Dict with:
- sentiment_label: "POSITIVE" or "NEGATIVE"
- sentiment_score: Confidence score (0.0-1.0)
- prominent_language: Detected language category
- raw_scores: Full model output (all labels with scores)
- cached: Whether result came from cache
- analyzed_at: ISO timestamp
"""
# Check cache first
if not force_recalc and comment_id in _cache:
cache_entry = _cache[comment_id]
if _is_cache_valid(cache_entry["timestamp"]):
logger.info(f"[Sentiment] Cache HIT for comment {comment_id} by {author}")
result = cache_entry["result"].copy()
result["cached"] = True
return result
else:
# Cache expired, remove it
del _cache[comment_id]
logger.info(f"[Sentiment] Cache expired for comment {comment_id}")
logger.info(f"[Sentiment] Analyzing comment {comment_id} by {author}")
try:
# Get sentiment pipeline
pipeline = _get_sentiment_pipeline()
# Truncate very long comments (keep first 512 tokens for DistilBERT)
truncated_text = comment_text[:512] if len(comment_text) > 512 else comment_text
# Run sentiment analysis
results = pipeline(truncated_text)
if not results:
logger.warning(f"[Sentiment] No results from model for comment {comment_id}")
return {
"sentiment_label": "NEUTRAL",
"sentiment_score": 0.5,
"prominent_language": "neutral",
"raw_scores": [],
"cached": False,
"analyzed_at": datetime.now(timezone.utc).isoformat(),
"error": "Model returned no results"
}
# Extract sentiment info
primary_result = results[0]
sentiment_label = primary_result["label"] # "POSITIVE" or "NEGATIVE"
sentiment_score = primary_result["score"] # Confidence (0.0-1.0)
# Detect prominent language patterns
prominent_language = _detect_prominent_language(comment_text)
# Build response
response = {
"sentiment_label": sentiment_label,
"sentiment_score": round(sentiment_score, 3),
"prominent_language": prominent_language,
"raw_scores": [
{
"label": r["label"],
"score": round(r["score"], 3)
} for r in results
],
"cached": False,
"analyzed_at": datetime.now(timezone.utc).isoformat()
}
# Cache the result
_cache[comment_id] = {
"result": response.copy(),
"timestamp": time.time()
}
logger.info(
f"[Sentiment] ✅ Comment {comment_id}: {sentiment_label} "
f"(score: {sentiment_score:.3f}, language: {prominent_language})"
)
return response
except Exception as e:
logger.error(f"[Sentiment] Error analyzing comment {comment_id}: {e}")
return {
"sentiment_label": "NEUTRAL",
"sentiment_score": 0.5,
"prominent_language": "neutral",
"raw_scores": [],
"cached": False,
"analyzed_at": datetime.now(timezone.utc).isoformat(),
"error": str(e)
}
def analyze_batch_comments(comments: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Analyze sentiment for multiple comments at once.
Args:
comments: List of dicts with keys: id, body, author (optional)
Returns:
List of sentiment analysis results
"""
results = []
for comment in comments:
comment_id = comment.get("id", f"comment_{len(results)}")
comment_text = comment.get("body", "")
author = comment.get("author", "unknown")
if not comment_text:
logger.warning(f"Skipping comment {comment_id} with empty body")
continue
result = analyze_comment_sentiment(
comment_id=comment_id,
comment_text=comment_text,
author=author
)
result["comment_id"] = comment_id
result["author"] = author
results.append(result)
return results
def get_sentiment_summary(comments: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Get aggregate sentiment summary from multiple comments.
Useful for Stage 3 prompt: "What's the overall mood of reviewers?"
Args:
comments: List of sentiment analysis results
Returns:
Summary dict with:
- overall_sentiment: Dominant sentiment
- average_score: Mean sentiment score
- positive_count: Number of positive comments
- negative_count: Number of negative comments
- prominent_languages: Top language categories
- mood_description: Human-readable description
"""
if not comments:
return {
"overall_sentiment": "NEUTRAL",
"average_score": 0.5,
"positive_count": 0,
"negative_count": 0,
"prominent_languages": [],
"mood_description": "No comments to analyze"
}
positive_count = sum(1 for c in comments if c.get("sentiment_label") == "POSITIVE")
negative_count = sum(1 for c in comments if c.get("sentiment_label") == "NEGATIVE")
# Calculate average sentiment score
scores = [c.get("sentiment_score", 0.5) for c in comments]
average_score = sum(scores) / len(scores) if scores else 0.5
# Count prominent languages
language_counts = {}
for comment in comments:
lang = comment.get("prominent_language", "neutral")
language_counts[lang] = language_counts.get(lang, 0) + 1
top_languages = sorted(language_counts.items(), key=lambda x: x[1], reverse=True)[:3]
# Determine overall sentiment
if positive_count > negative_count * 1.5:
overall = "POSITIVE"
mood = "Reviewers are enthusiastic and supportive"
elif negative_count > positive_count * 1.5:
overall = "NEGATIVE"
mood = "Reviewers have concerns or objections"
else:
overall = "MIXED"
mood = "Reviewers have mixed feedback with discussion"
return {
"overall_sentiment": overall,
"average_score": round(average_score, 3),
"positive_count": positive_count,
"negative_count": negative_count,
"neutral_count": len(comments) - positive_count - negative_count,
"prominent_languages": [lang for lang, _ in top_languages],
"mood_description": mood,
"total_comments": len(comments)
}
def clear_cache():
"""Clear the sentiment analysis cache."""
global _cache
_cache.clear()
logger.info("[Sentiment] Cache cleared")
def get_cache_stats() -> Dict[str, Any]:
"""Get cache statistics."""
valid_entries = sum(1 for e in _cache.values() if _is_cache_valid(e["timestamp"]))
return {
"total_entries": len(_cache),
"valid_entries": valid_entries,
"expired_entries": len(_cache) - valid_entries,
"cache_ttl_seconds": CACHE_TTL,
"model_loaded": _sentiment_pipeline is not None
}
# Service instance (singleton)
sentiment_analysis_service = type('SentimentAnalysisService', (), {
'analyze_comment': analyze_comment_sentiment,
'analyze_batch': analyze_batch_comments,
'get_summary': get_sentiment_summary,
'clear_cache': clear_cache,
'get_cache_stats': get_cache_stats
})()