# Penny_V2 / sentiment_utils.py
# (Uploaded by pythonprincess — "Upload 5 files", commit a9113e0 verified)
# models/sentiment/sentiment_utils.py
"""
Sentiment Analysis Model Utilities for PENNY Project
Handles text sentiment classification for user input analysis and content moderation.
Provides async sentiment analysis with structured error handling and logging.
"""
import asyncio
import time
import os
import httpx
from typing import Dict, Any, Optional, List
# --- Logging Imports ---
from app.logging_utils import log_interaction, sanitize_for_logging
# --- Hugging Face API Configuration ---
# Inference endpoint for the CardiffNLP Twitter RoBERTa sentiment model
# (3-class: negative / neutral / positive).
HF_API_URL = "https://api-inference.huggingface.co/models/cardiffnlp/twitter-roberta-base-sentiment"
# API token, read once at import time; a process restart is needed to pick
# up a rotated token.
HF_TOKEN = os.getenv("HF_TOKEN")
# Agent identifier; not referenced elsewhere in this module — presumably
# consumed by callers/logging config (TODO confirm).
AGENT_NAME = "penny-sentiment-agent"
def is_sentiment_available() -> bool:
    """
    Check if sentiment analysis service is available.

    Returns:
        bool: True if sentiment API is configured and ready (i.e. a
        non-empty HF_TOKEN was present in the environment at import time).
    """
    # bool() covers both the None and empty-string cases in one idiomatic
    # truthiness check (equivalent to `is not None and len(...) > 0`).
    return bool(HF_TOKEN)
async def get_sentiment_analysis(
    text: str,
    tenant_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Run sentiment analysis on the input text via the Hugging Face Inference API.

    Args:
        text: The string of text to analyze (at most 10,000 characters).
        tenant_id: Optional tenant identifier for logging.

    Returns:
        A dictionary containing:
        - label (str): Sentiment label ("POSITIVE", "NEGATIVE", "NEUTRAL"),
          or "ERROR"/"UNKNOWN" when analysis could not be performed.
        - score (float): Confidence score for the sentiment prediction.
        - available (bool): Whether the service was available.
        - message (str, optional): Error message if analysis failed.
        - response_time_ms (int, optional): Analysis time in milliseconds.

    Raises:
        asyncio.CancelledError: Re-raised after logging so task cancellation
        propagates normally; all other failures are returned as error dicts.
    """
    start_time = time.time()

    # Fail fast when the API token is not configured.
    if not is_sentiment_available():
        log_interaction(
            intent="sentiment_analysis",
            tenant_id=tenant_id,
            success=False,
            error="Sentiment API not configured (missing HF_TOKEN)",
            fallback_used=True
        )
        return {
            "label": "UNKNOWN",
            "score": 0.0,
            "available": False,
            "message": "Sentiment analysis is temporarily unavailable."
        }

    # Reject empty or non-string input before spending an API call.
    if not text or not isinstance(text, str):
        log_interaction(
            intent="sentiment_analysis",
            tenant_id=tenant_id,
            success=False,
            error="Invalid text input"
        )
        return {
            "label": "ERROR",
            "score": 0.0,
            "available": True,
            "message": "Invalid text input provided."
        }

    # Cap input size to avoid shipping extremely long texts to the API.
    if len(text) > 10000:  # 10k character limit
        log_interaction(
            intent="sentiment_analysis",
            tenant_id=tenant_id,
            success=False,
            error=f"Text too long: {len(text)} characters",
            text_preview=sanitize_for_logging(text[:100])
        )
        return {
            "label": "ERROR",
            "score": 0.0,
            "available": True,
            "message": "Text is too long for sentiment analysis (max 10,000 characters)."
        }

    try:
        headers = {"Authorization": f"Bearer {HF_TOKEN}"}
        payload = {"inputs": text}

        # Call the Hugging Face Inference API.
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(HF_API_URL, json=payload, headers=headers)

        response_time_ms = int((time.time() - start_time) * 1000)

        if response.status_code != 200:
            log_interaction(
                intent="sentiment_analysis",
                tenant_id=tenant_id,
                success=False,
                error=f"API returned status {response.status_code}",
                response_time_ms=response_time_ms,
                text_preview=sanitize_for_logging(text[:100]),
                fallback_used=True
            )
            return {
                "label": "ERROR",
                "score": 0.0,
                "available": False,
                "message": f"Sentiment API error: {response.status_code}",
                "response_time_ms": response_time_ms
            }

        results = response.json()

        # Expected shape: [[{"label": "LABEL_2", "score": 0.95}, ...]]
        if not results or not isinstance(results, list) or len(results) == 0:
            log_interaction(
                intent="sentiment_analysis",
                tenant_id=tenant_id,
                success=False,
                error="Empty or invalid model output",
                response_time_ms=response_time_ms,
                text_preview=sanitize_for_logging(text[:100])
            )
            return {
                "label": "ERROR",
                "score": 0.0,
                "available": True,
                "message": "Sentiment analysis returned unexpected format.",
                "response_time_ms": response_time_ms
            }

        # Tolerate both the nested ([[...]]) and flat ([...]) response shapes.
        result_list = results[0] if isinstance(results[0], list) else results
        if not result_list or len(result_list) == 0:
            log_interaction(
                intent="sentiment_analysis",
                tenant_id=tenant_id,
                success=False,
                error="Empty result list",
                response_time_ms=response_time_ms,
                text_preview=sanitize_for_logging(text[:100])
            )
            return {
                "label": "ERROR",
                "score": 0.0,
                "available": True,
                "message": "Sentiment analysis returned unexpected format.",
                "response_time_ms": response_time_ms
            }

        # First entry is treated as the top-scoring prediction.
        result = result_list[0]
        if not isinstance(result, dict) or 'label' not in result or 'score' not in result:
            log_interaction(
                intent="sentiment_analysis",
                tenant_id=tenant_id,
                success=False,
                error="Invalid result structure",
                response_time_ms=response_time_ms,
                text_preview=sanitize_for_logging(text[:100])
            )
            return {
                "label": "ERROR",
                "score": 0.0,
                "available": True,
                "message": "Sentiment analysis returned unexpected format.",
                "response_time_ms": response_time_ms
            }

        # Map RoBERTa's raw labels to readable form.
        # LABEL_0 = NEGATIVE, LABEL_1 = NEUTRAL, LABEL_2 = POSITIVE
        label_mapping = {
            "LABEL_0": "NEGATIVE",
            "LABEL_1": "NEUTRAL",
            "LABEL_2": "POSITIVE"
        }
        raw_label = result['label']
        # Unmapped labels are uppercased so callers always receive the
        # uppercase labels promised in the docstring (some API/model
        # revisions return e.g. "positive" directly — TODO confirm).
        label = label_mapping.get(raw_label, str(raw_label).upper())

        # Flag slow round-trips separately for monitoring.
        if response_time_ms > 3000:  # 3 seconds
            log_interaction(
                intent="sentiment_analysis_slow",
                tenant_id=tenant_id,
                success=True,
                response_time_ms=response_time_ms,
                details="Slow sentiment analysis detected",
                text_length=len(text)
            )

        log_interaction(
            intent="sentiment_analysis",
            tenant_id=tenant_id,
            success=True,
            response_time_ms=response_time_ms,
            sentiment_label=label,
            sentiment_score=result.get('score'),
            text_length=len(text)
        )
        return {
            "label": label,
            "score": float(result['score']),
            "available": True,
            "response_time_ms": response_time_ms
        }

    except httpx.TimeoutException:
        response_time_ms = int((time.time() - start_time) * 1000)
        log_interaction(
            intent="sentiment_analysis",
            tenant_id=tenant_id,
            success=False,
            error="Sentiment analysis request timed out",
            response_time_ms=response_time_ms,
            text_preview=sanitize_for_logging(text[:100]),
            fallback_used=True
        )
        return {
            "label": "ERROR",
            "score": 0.0,
            "available": False,
            "message": "Sentiment analysis request timed out.",
            "response_time_ms": response_time_ms
        }
    except asyncio.CancelledError:
        # Log, then re-raise so cancellation propagates to the caller.
        log_interaction(
            intent="sentiment_analysis",
            tenant_id=tenant_id,
            success=False,
            error="Analysis cancelled"
        )
        raise
    except Exception as e:
        response_time_ms = int((time.time() - start_time) * 1000)
        log_interaction(
            intent="sentiment_analysis",
            tenant_id=tenant_id,
            success=False,
            error=str(e),
            response_time_ms=response_time_ms,
            text_preview=sanitize_for_logging(text[:100]),
            fallback_used=True
        )
        return {
            "label": "ERROR",
            "score": 0.0,
            "available": False,
            "message": "An error occurred during sentiment analysis.",
            "error": str(e),
            "response_time_ms": response_time_ms
        }
async def analyze_sentiment_batch(
    texts: List[str],
    tenant_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Run sentiment analysis on a batch of texts for efficiency.

    Args:
        texts: List of text strings to analyze. Non-string and blank entries
            are dropped; at most the first 100 valid texts are analyzed.
        tenant_id: Optional tenant identifier for logging.

    Returns:
        A dictionary containing:
        - results (list): List of {"label", "score"} dicts, in input order
          of the valid texts (malformed per-text results are skipped).
        - available (bool): Whether the service was available.
        - total_analyzed (int): Number of texts successfully analyzed.
        - response_time_ms (int, optional): Total batch analysis time.
        - message (str, optional): Error message if analysis failed.
    """
    start_time = time.time()

    # Fail fast when the API token is not configured.
    if not is_sentiment_available():
        log_interaction(
            intent="sentiment_batch_analysis",
            tenant_id=tenant_id,
            success=False,
            error="Sentiment API not configured (missing HF_TOKEN)",
            batch_size=len(texts) if texts else 0
        )
        return {
            "results": [],
            "available": False,
            "total_analyzed": 0,
            "message": "Sentiment analysis is temporarily unavailable."
        }

    # Reject non-list or empty input.
    if not texts or not isinstance(texts, list):
        log_interaction(
            intent="sentiment_batch_analysis",
            tenant_id=tenant_id,
            success=False,
            error="Invalid texts input"
        )
        return {
            "results": [],
            "available": True,
            "total_analyzed": 0,
            "message": "Invalid batch input provided."
        }

    # Keep only non-blank strings; silently truncate oversized batches.
    valid_texts = [t for t in texts if isinstance(t, str) and t.strip()]
    if len(valid_texts) > 100:  # Batch size limit
        valid_texts = valid_texts[:100]

    if not valid_texts:
        log_interaction(
            intent="sentiment_batch_analysis",
            tenant_id=tenant_id,
            success=False,
            error="No valid texts in batch"
        )
        return {
            "results": [],
            "available": True,
            "total_analyzed": 0,
            "message": "No valid texts provided for analysis."
        }

    try:
        headers = {"Authorization": f"Bearer {HF_TOKEN}"}
        payload = {"inputs": valid_texts}

        # Call the Hugging Face Inference API (longer timeout for batch).
        async with httpx.AsyncClient(timeout=60.0) as client:
            response = await client.post(HF_API_URL, json=payload, headers=headers)

        response_time_ms = int((time.time() - start_time) * 1000)

        if response.status_code != 200:
            log_interaction(
                intent="sentiment_batch_analysis",
                tenant_id=tenant_id,
                success=False,
                error=f"API returned status {response.status_code}",
                response_time_ms=response_time_ms,
                batch_size=len(valid_texts)
            )
            return {
                "results": [],
                "available": False,
                "total_analyzed": 0,
                "message": f"Sentiment API error: {response.status_code}",
                "response_time_ms": response_time_ms
            }

        results = response.json()

        # Map RoBERTa's raw labels to readable form.
        # LABEL_0 = NEGATIVE, LABEL_1 = NEUTRAL, LABEL_2 = POSITIVE
        label_mapping = {
            "LABEL_0": "NEGATIVE",
            "LABEL_1": "NEUTRAL",
            "LABEL_2": "POSITIVE"
        }

        processed_results = []
        if results and isinstance(results, list):
            for item in results:
                # Usual batch shape is one list of {label, score} dicts per
                # text; the single-text path tolerates a flat list of dicts,
                # so accept that shape here too rather than dropping results.
                if isinstance(item, list) and len(item) > 0:
                    top_result = item[0]
                elif isinstance(item, dict):
                    top_result = item
                else:
                    continue
                if isinstance(top_result, dict) and 'label' in top_result:
                    raw_label = top_result['label']
                    processed_results.append({
                        # Uppercase unmapped labels so output matches the
                        # single-text API's uppercase label convention.
                        "label": label_mapping.get(raw_label, str(raw_label).upper()),
                        "score": float(top_result.get('score', 0.0))
                    })

        log_interaction(
            intent="sentiment_batch_analysis",
            tenant_id=tenant_id,
            success=True,
            response_time_ms=response_time_ms,
            batch_size=len(valid_texts),
            total_analyzed=len(processed_results)
        )
        return {
            "results": processed_results,
            "available": True,
            "total_analyzed": len(processed_results),
            "response_time_ms": response_time_ms
        }

    except httpx.TimeoutException:
        response_time_ms = int((time.time() - start_time) * 1000)
        log_interaction(
            intent="sentiment_batch_analysis",
            tenant_id=tenant_id,
            success=False,
            error="Batch sentiment analysis timed out",
            response_time_ms=response_time_ms,
            batch_size=len(valid_texts)
        )
        return {
            "results": [],
            "available": False,
            "total_analyzed": 0,
            "message": "Batch sentiment analysis timed out.",
            "error": "Request timeout",
            "response_time_ms": response_time_ms
        }
    except Exception as e:
        response_time_ms = int((time.time() - start_time) * 1000)
        log_interaction(
            intent="sentiment_batch_analysis",
            tenant_id=tenant_id,
            success=False,
            error=str(e),
            response_time_ms=response_time_ms,
            batch_size=len(valid_texts)
        )
        return {
            "results": [],
            "available": False,
            "total_analyzed": 0,
            "message": "An error occurred during batch sentiment analysis.",
            "error": str(e),
            "response_time_ms": response_time_ms
        }