File size: 11,402 Bytes
6337b71
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
"""
Sentiment Analysis Service for OpenTriage

Uses a local Hugging Face DistilBERT model for fast, offline sentiment analysis
of PR comments. Detects sentiment scores and prominent language patterns.

Features:
- DistilBERT sentiment classification (local, no API calls)
- Keyword-based prominent language detection
- In-memory result caching (10-minute TTL)
- Stage 3 RAG prompt integration-ready
"""

import logging
import time
from typing import Dict, Any, Optional, List, Tuple
from datetime import datetime, timezone

logger = logging.getLogger(__name__)

# --- Module-level state -----------------------------------------------------
# Shared Hugging Face pipeline. It stays None until the first analysis so that
# importing this module does not load the model.
_sentiment_pipeline = None

# In-memory result cache keyed by comment id. Every entry is a dict that holds
# at least the analysis payload under "result" and the time.time() value at
# which it was stored under "timestamp".
_cache = {}

# Lifetime of a cache entry, in seconds (10 minutes).
CACHE_TTL = 10 * 60

# Keyword lists used to pick the "prominent language" category of a comment.
# Category order matters: when two categories score the same number of keyword
# hits, the one listed first wins.
LANGUAGE_PATTERNS = {
    "technical": [
        "bug", "error", "crash", "fix", "optimize", "refactor",
        "api", "database", "performance", "memory", "cpu",
    ],
    "positive": [
        "great", "excellent", "amazing", "love", "perfect",
        "awesome", "wonderful", "fantastic", "brilliant",
    ],
    "negative": [
        "bad", "horrible", "terrible", "hate", "useless",
        "broken", "awful", "pathetic", "worst",
    ],
    "urgent": [
        "critical", "urgent", "asap", "immediately",
        "emergency", "blocker", "must", "breaking",
    ],
    "discussion": [
        "thought", "idea", "suggestion", "question",
        "wondering", "propose", "consider", "discuss",
    ],
    "documentation": [
        "doc", "readme", "guide", "tutorial", "example", "comment", "explain",
    ],
    "testing": [
        "test", "coverage", "regression", "edge case",
        "unit test", "integration test", "quality",
    ],
}


def _get_sentiment_pipeline():
    """
    Return the shared sentiment pipeline, building it on the first call.

    The ``transformers`` import is deferred to this function so the dependency
    is only needed once an analysis is actually requested.

    Returns:
        The module-wide pipeline object (created once, then reused).

    Raises:
        Exception: whatever the import or pipeline construction raised; the
            failure is logged and then re-raised unchanged.
    """
    global _sentiment_pipeline

    # Fast path: an earlier call already built the pipeline.
    if _sentiment_pipeline is not None:
        return _sentiment_pipeline

    try:
        from transformers import pipeline

        logger.info("[Sentiment] Loading DistilBERT sentiment-analysis model...")
        _sentiment_pipeline = pipeline(
            "sentiment-analysis",
            model="distilbert-base-uncased-finetuned-sst-2-english",
            device=-1,  # CPU mode (set to 0 for GPU if available)
        )
        logger.info("[Sentiment] ✅ DistilBERT model loaded successfully")
    except Exception as e:
        logger.error(f"[Sentiment] Failed to load DistilBERT: {e}")
        raise

    return _sentiment_pipeline


def _detect_prominent_language(text: str) -> str:
    """
    Pick the LANGUAGE_PATTERNS category whose keywords occur most in ``text``.

    Matching is case-insensitive and anchored at the start of a word: a keyword
    counts when a word in the text begins with it. Inflected forms therefore
    still match ("tests", "fixed", "docs"), while keywords that merely appear
    inside another word do not ("latest" is not "test", "rapid" is not "api",
    "prefix" is not "fix").

    Each keyword contributes at most one hit per category, no matter how many
    times it appears.

    Args:
        text: The comment body to inspect.

    Returns:
        The category name with the most keyword hits, or "neutral" when the
        text is empty or no keyword matches. On a tie the category listed
        first in LANGUAGE_PATTERNS wins.
    """
    # Local import keeps this function self-contained; the module's import
    # block does not provide `re`.
    import re

    if not text:
        return "neutral"

    text_lower = text.lower()
    best_category = "neutral"
    best_matches = 0

    for category, keywords in LANGUAGE_PATTERNS.items():
        # \b anchors the keyword to a word start; re.escape keeps multi-word
        # keywords such as "edge case" literal.
        matches = sum(
            1
            for keyword in keywords
            if re.search(r"\b" + re.escape(keyword), text_lower)
        )
        # Strict ">" keeps the earliest category when counts are equal.
        if matches > best_matches:
            best_category, best_matches = category, matches

    return best_category


def _is_cache_valid(timestamp: float) -> bool:
    """
    Tell whether a cache entry stored at ``timestamp`` is still fresh.

    Args:
        timestamp: The time.time() value recorded when the entry was stored.

    Returns:
        True while the entry is younger than CACHE_TTL seconds.
    """
    age_seconds = time.time() - timestamp
    return age_seconds < CACHE_TTL


def _neutral_sentiment_result(error: str) -> Dict[str, Any]:
    """Build the NEUTRAL fallback payload returned when analysis cannot complete."""
    return {
        "sentiment_label": "NEUTRAL",
        "sentiment_score": 0.5,
        "prominent_language": "neutral",
        "raw_scores": [],
        "cached": False,
        "analyzed_at": datetime.now(timezone.utc).isoformat(),
        "error": error,
    }


def _copy_sentiment_result(result: Dict[str, Any]) -> Dict[str, Any]:
    """Copy a result dict together with its nested ``raw_scores`` entries."""
    duplicate = dict(result)
    duplicate["raw_scores"] = [dict(item) for item in result.get("raw_scores", [])]
    return duplicate


def _purge_expired_cache_entries() -> None:
    """Remove every cache entry older than CACHE_TTL so the cache stays bounded."""
    # Iterate over a snapshot so entries can be removed while scanning.
    for cached_id, entry in list(_cache.items()):
        if not _is_cache_valid(entry["timestamp"]):
            _cache.pop(cached_id, None)


def analyze_comment_sentiment(
    comment_id: str,
    comment_text: str,
    author: str = "unknown",
    force_recalc: bool = False
) -> Dict[str, Any]:
    """
    Analyze the sentiment of a PR comment using DistilBERT.

    Results are cached in memory per ``comment_id`` for CACHE_TTL seconds. A
    cached result is reused only while it is fresh AND was computed for the
    same text, so an edited comment is re-analyzed instead of returning the
    sentiment of its previous version.

    Args:
        comment_id: Unique comment identifier (used as the cache key)
        comment_text: The comment body text
        author: Comment author (for logging)
        force_recalc: Force recalculation even if cached

    Returns:
        Dict with:
        - sentiment_label: Label reported by the model ("POSITIVE" or
          "NEGATIVE"), or "NEUTRAL" when the analysis failed
        - sentiment_score: Confidence score of that label (0.0-1.0), rounded
          to 3 decimals; 0.5 when the analysis failed
        - prominent_language: Detected language category
        - raw_scores: The model's output entries (label + rounded score)
        - cached: Whether result came from cache
        - analyzed_at: ISO timestamp (UTC) of the original analysis
        - error: Present only when the analysis failed; the failure message

    Errors raised while loading or running the model are not propagated: they
    are logged and reported through the NEUTRAL fallback result.
    """
    # Check cache first
    if not force_recalc and comment_id in _cache:
        cache_entry = _cache[comment_id]
        if not _is_cache_valid(cache_entry["timestamp"]):
            # Cache expired, remove it
            _cache.pop(comment_id, None)
            logger.info(f"[Sentiment] Cache expired for comment {comment_id}")
        elif cache_entry.get("text", comment_text) != comment_text:
            # Same id but different body (e.g. the comment was edited): the
            # stored sentiment no longer describes this text.
            _cache.pop(comment_id, None)
            logger.info(f"[Sentiment] Cache stale for comment {comment_id} (text changed)")
        else:
            logger.info(f"[Sentiment] Cache HIT for comment {comment_id} by {author}")
            # Hand out a copy (including the nested raw_scores) so callers
            # that mutate the result cannot corrupt the cached entry.
            result = _copy_sentiment_result(cache_entry["result"])
            result["cached"] = True
            return result

    logger.info(f"[Sentiment] Analyzing comment {comment_id} by {author}")

    try:
        classifier = _get_sentiment_pipeline()

        # Cap very long comments at 512 *characters* before handing them to
        # the model. This is a character cap, not a token cap.
        truncated_text = comment_text[:512]

        # Run sentiment analysis
        results = classifier(truncated_text)

        if not results:
            logger.warning(f"[Sentiment] No results from model for comment {comment_id}")
            return _neutral_sentiment_result("Model returned no results")

        # The first entry carries the label and confidence that get reported.
        primary_result = results[0]
        sentiment_label = primary_result["label"]
        sentiment_score = primary_result["score"]

        # Keyword detection runs on the full text, not the truncated one.
        prominent_language = _detect_prominent_language(comment_text)

        response = {
            "sentiment_label": sentiment_label,
            "sentiment_score": round(sentiment_score, 3),
            "prominent_language": prominent_language,
            "raw_scores": [
                {
                    "label": r["label"],
                    "score": round(r["score"], 3)
                } for r in results
            ],
            "cached": False,
            "analyzed_at": datetime.now(timezone.utc).isoformat()
        }

        # Evict expired entries before adding a new one; otherwise entries for
        # ids that are never requested again would stay in memory forever.
        _purge_expired_cache_entries()

        # Cache an independent copy plus the text it was computed from.
        _cache[comment_id] = {
            "result": _copy_sentiment_result(response),
            "text": comment_text,
            "timestamp": time.time()
        }

        logger.info(
            f"[Sentiment] ✅ Comment {comment_id}: {sentiment_label} "
            f"(score: {sentiment_score:.3f}, language: {prominent_language})"
        )

        return response

    except Exception as e:
        # Best-effort service: report the failure in the payload instead of
        # raising, so one bad comment does not abort the caller.
        logger.error(f"[Sentiment] Error analyzing comment {comment_id}: {e}")
        return _neutral_sentiment_result(str(e))


def analyze_batch_comments(comments: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Analyze sentiment for multiple comments at once.

    Comments with an empty body are skipped (with a warning) and produce no
    entry in the output.

    A comment without an ``id`` (missing or None) gets a positional placeholder
    id of the form ``comment_<n>``. Placeholder ids repeat from one batch to
    the next, and the sentiment cache is keyed by id, so such comments are
    always analyzed with ``force_recalc=True``. Otherwise they could be served
    the cached result of an unrelated comment from an earlier batch.

    Args:
        comments: List of dicts with keys: id, body, author (optional)

    Returns:
        List of sentiment analysis results, each extended with ``comment_id``
        and ``author``.
    """
    results = []

    for comment in comments:
        comment_id = comment.get("id")
        has_real_id = comment_id is not None
        if not has_real_id:
            comment_id = f"comment_{len(results)}"

        comment_text = comment.get("body", "")
        author = comment.get("author", "unknown")

        if not comment_text:
            logger.warning(f"Skipping comment {comment_id} with empty body")
            continue

        result = analyze_comment_sentiment(
            comment_id=comment_id,
            comment_text=comment_text,
            author=author,
            # Never trust the cache for a synthesized id (see docstring).
            force_recalc=not has_real_id,
        )

        result["comment_id"] = comment_id
        result["author"] = author
        results.append(result)

    return results


def get_sentiment_summary(comments: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Get aggregate sentiment summary from multiple comments.

    Useful for Stage 3 prompt: "What's the overall mood of reviewers?"

    Args:
        comments: List of sentiment analysis results (dicts carrying
            ``sentiment_label``, ``sentiment_score`` and ``prominent_language``;
            missing keys fall back to a neutral default)

    Returns:
        Summary dict with the same keys for every input, including an empty one:
        - overall_sentiment: "POSITIVE", "NEGATIVE", "MIXED" or "NEUTRAL"
        - average_score: Mean of the per-comment ``sentiment_score`` values
          (the confidence of each comment's own label), rounded to 3 decimals
        - positive_count: Number of positive comments
        - negative_count: Number of negative comments
        - neutral_count: Number of comments that are neither
        - prominent_languages: Up to three most frequent language categories
        - mood_description: Human-readable description
        - total_comments: Number of results summarized
    """
    if not comments:
        return {
            "overall_sentiment": "NEUTRAL",
            "average_score": 0.5,
            "positive_count": 0,
            "negative_count": 0,
            "neutral_count": 0,
            "prominent_languages": [],
            "mood_description": "No comments to analyze",
            "total_comments": 0,
        }

    total = len(comments)
    positive_count = sum(1 for c in comments if c.get("sentiment_label") == "POSITIVE")
    negative_count = sum(1 for c in comments if c.get("sentiment_label") == "NEGATIVE")

    average_score = sum(c.get("sentiment_score", 0.5) for c in comments) / total

    # Count how often each language category occurs.
    language_counts = {}
    for comment in comments:
        lang = comment.get("prominent_language", "neutral")
        language_counts[lang] = language_counts.get(lang, 0) + 1

    # sorted() is stable, so equally frequent categories keep first-seen order.
    top_languages = sorted(language_counts, key=language_counts.get, reverse=True)[:3]

    # One side must outnumber the other by more than 1.5x to dominate.
    if positive_count == 0 and negative_count == 0:
        # Nothing carried a POSITIVE/NEGATIVE label (e.g. every analysis
        # failed); "mixed feedback" would be a misleading description.
        overall = "NEUTRAL"
        mood = "No clear sentiment detected in the comments"
    elif positive_count > negative_count * 1.5:
        overall = "POSITIVE"
        mood = "Reviewers are enthusiastic and supportive"
    elif negative_count > positive_count * 1.5:
        overall = "NEGATIVE"
        mood = "Reviewers have concerns or objections"
    else:
        overall = "MIXED"
        mood = "Reviewers have mixed feedback with discussion"

    return {
        "overall_sentiment": overall,
        "average_score": round(average_score, 3),
        "positive_count": positive_count,
        "negative_count": negative_count,
        "neutral_count": total - positive_count - negative_count,
        "prominent_languages": top_languages,
        "mood_description": mood,
        "total_comments": total,
    }


def clear_cache():
    """Drop every cached sentiment result, fresh or expired."""
    # The shared dict is emptied in place, so the module-level name keeps
    # pointing at the same object.
    _cache.clear()
    logger.info("[Sentiment] Cache cleared")


def get_cache_stats() -> Dict[str, Any]:
    """
    Report the current state of the sentiment cache.

    Returns:
        Dict with:
        - total_entries: Number of entries currently stored
        - valid_entries: Entries still within the TTL
        - expired_entries: Entries past the TTL that are still stored
        - cache_ttl_seconds: The configured TTL
        - model_loaded: Whether the sentiment pipeline has been created
    """
    still_valid = 0
    for entry in _cache.values():
        if _is_cache_valid(entry["timestamp"]):
            still_valid += 1

    total = len(_cache)
    model_ready = _sentiment_pipeline is not None

    return {
        "total_entries": total,
        "valid_entries": still_valid,
        "expired_entries": total - still_valid,
        "cache_ttl_seconds": CACHE_TTL,
        "model_loaded": model_ready,
    }


class SentimentAnalysisService:
    """
    Namespace object exposing the module's functions under short names.

    Every attribute is wrapped in ``staticmethod``. A plain function stored on
    a class is bound to the instance on attribute access, which would pass the
    service object as the first argument: ``service.clear_cache()`` would raise
    TypeError and ``service.analyze_comment(id, text)`` would shift every
    argument by one position.
    """

    analyze_comment = staticmethod(analyze_comment_sentiment)
    analyze_batch = staticmethod(analyze_batch_comments)
    get_summary = staticmethod(get_sentiment_summary)
    clear_cache = staticmethod(clear_cache)
    get_cache_stats = staticmethod(get_cache_stats)


# Service instance (singleton)
sentiment_analysis_service = SentimentAnalysisService()