Spaces:
Paused
Paused
| # models/sentiment/sentiment_utils.py | |
| """ | |
| Sentiment Analysis Model Utilities for PENNY Project | |
| Handles text sentiment classification for user input analysis and content moderation. | |
| Provides async sentiment analysis with structured error handling and logging. | |
| """ | |
| import asyncio | |
| import time | |
| from typing import Dict, Any, Optional, List | |
| # --- Logging Imports --- | |
| from app.logging_utils import log_interaction, sanitize_for_logging | |
| # --- Model Loader Import --- | |
| try: | |
| from app.model_loader import load_model_pipeline | |
| MODEL_LOADER_AVAILABLE = True | |
| except ImportError: | |
| MODEL_LOADER_AVAILABLE = False | |
| import logging | |
| logging.getLogger(__name__).warning("Could not import load_model_pipeline. Sentiment service unavailable.") | |
# Agent identifier handed to load_model_pipeline when the model is loaded.
AGENT_NAME = "penny-sentiment-agent"

# Cached pipeline shared by every call; remains None until a load succeeds.
SENTIMENT_PIPELINE: Optional[Any] = None

# Flipped to True by the first load attempt so the load is never repeated.
INITIALIZATION_ATTEMPTED = False


def _initialize_sentiment_pipeline() -> bool:
    """
    Load the sentiment pipeline on the first call and cache it module-wide.

    Later calls never reload; they only report whether a pipeline is
    cached. Each step of the first attempt is reported through
    log_interaction under the "sentiment_initialization" intent.

    Returns:
        bool: True if a pipeline is loaded, False otherwise.
    """
    global SENTIMENT_PIPELINE, INITIALIZATION_ATTEMPTED

    def _report(**fields: Any) -> None:
        # Every initialization log entry shares the same intent.
        log_interaction(intent="sentiment_initialization", **fields)

    if INITIALIZATION_ATTEMPTED:
        return SENTIMENT_PIPELINE is not None
    INITIALIZATION_ATTEMPTED = True

    if not MODEL_LOADER_AVAILABLE:
        _report(success=False, error="model_loader unavailable")
        return False

    try:
        # success=None marks the "load in progress" entry.
        _report(success=None, details=f"Loading {AGENT_NAME}")
        SENTIMENT_PIPELINE = load_model_pipeline(AGENT_NAME)
        loaded = SENTIMENT_PIPELINE is not None
        if loaded:
            _report(
                success=True,
                details=f"Model {AGENT_NAME} loaded successfully"
            )
        else:
            _report(success=False, error="Pipeline returned None")
        return loaded
    except Exception as exc:
        # Any failure while loading (or logging) leaves the service unavailable.
        _report(success=False, error=str(exc))
        return False


# Attempt initialization at module load
_initialize_sentiment_pipeline()
def is_sentiment_available() -> bool:
    """
    Report whether a sentiment pipeline is currently cached.

    Returns:
        bool: False while SENTIMENT_PIPELINE is None, True otherwise.
    """
    if SENTIMENT_PIPELINE is None:
        return False
    return True
# Longest input, in characters, accepted by get_sentiment_analysis.
_MAX_SENTIMENT_TEXT_CHARS = 10000
# Analyses taking longer than this many milliseconds get an extra "slow" log entry.
_SLOW_SENTIMENT_ANALYSIS_MS = 3000


def _sentiment_failure(
    message: str,
    label: str = "ERROR",
    available: bool = True
) -> Dict[str, Any]:
    """Build the payload get_sentiment_analysis returns when no prediction is produced."""
    return {
        "label": label,
        "score": 0.0,
        "available": available,
        "message": message
    }


async def get_sentiment_analysis(
    text: str,
    tenant_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Runs sentiment analysis on the input text using the loaded pipeline.

    The pipeline call is executed in the event loop's default executor so
    the coroutine does not block the loop while the model runs. Failures
    are reported in the returned dictionary rather than raised; only
    cancellation is re-raised.

    Args:
        text: The string of text to analyze. Must be a non-blank string of
            at most 10,000 characters; whitespace-only text is rejected,
            matching the filtering done by analyze_sentiment_batch.
        tenant_id: Optional tenant identifier for logging.

    Returns:
        A dictionary containing:
            - label (str): Label reported by the pipeline, "UNKNOWN" when
              the pipeline is not loaded, or "ERROR" on any failure
            - score (float): Confidence score (0.0 when no prediction)
            - available (bool): Whether the service was available
            - message (str, optional): Error message if analysis failed
            - error (str, optional): Exception text if inference raised
            - response_time_ms (int, optional): Analysis time in milliseconds

    Raises:
        asyncio.CancelledError: Re-raised after logging if the task is cancelled.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed
    # by wall-clock adjustments.
    started = time.perf_counter()

    # Bind the pipeline once so the worker thread calls the same object that
    # passed the availability check, even if the global is replaced meanwhile.
    pipeline = SENTIMENT_PIPELINE

    # Check availability
    if pipeline is None:
        log_interaction(
            intent="sentiment_analysis",
            tenant_id=tenant_id,
            success=False,
            error="Sentiment pipeline not available",
            fallback_used=True
        )
        return _sentiment_failure(
            "Sentiment analysis is temporarily unavailable.",
            label="UNKNOWN",
            available=False
        )

    # Validate input (type first, so .strip() is only called on strings)
    if not isinstance(text, str) or not text.strip():
        log_interaction(
            intent="sentiment_analysis",
            tenant_id=tenant_id,
            success=False,
            error="Invalid text input"
        )
        return _sentiment_failure("Invalid text input provided.")

    # Check text length (prevent processing extremely long texts)
    if len(text) > _MAX_SENTIMENT_TEXT_CHARS:
        log_interaction(
            intent="sentiment_analysis",
            tenant_id=tenant_id,
            success=False,
            error=f"Text too long: {len(text)} characters",
            text_preview=sanitize_for_logging(text[:100])
        )
        return _sentiment_failure(
            "Text is too long for sentiment analysis (max 10,000 characters)."
        )

    try:
        loop = asyncio.get_running_loop()

        # Run model inference in the default thread executor. The pipeline
        # is called with a one-element list, so a list is expected back.
        results = await loop.run_in_executor(None, pipeline, [text])
        response_time_ms = int((time.perf_counter() - started) * 1000)

        # Validate results
        if not isinstance(results, list) or not results:
            log_interaction(
                intent="sentiment_analysis",
                tenant_id=tenant_id,
                success=False,
                error="Empty or invalid model output",
                response_time_ms=response_time_ms,
                text_preview=sanitize_for_logging(text[:100])
            )
            return _sentiment_failure(
                "Sentiment analysis returned unexpected format."
            )

        result = results[0]

        # Validate result structure
        if not isinstance(result, dict) or 'label' not in result or 'score' not in result:
            log_interaction(
                intent="sentiment_analysis",
                tenant_id=tenant_id,
                success=False,
                error="Invalid result structure",
                response_time_ms=response_time_ms,
                text_preview=sanitize_for_logging(text[:100])
            )
            return _sentiment_failure(
                "Sentiment analysis returned unexpected format."
            )

        # Convert before logging so a non-numeric score is reported as a
        # failure instead of being logged as a success first.
        label = result['label']
        score = float(result['score'])

        # Log slow analysis
        if response_time_ms > _SLOW_SENTIMENT_ANALYSIS_MS:
            log_interaction(
                intent="sentiment_analysis_slow",
                tenant_id=tenant_id,
                success=True,
                response_time_ms=response_time_ms,
                details="Slow sentiment analysis detected",
                text_length=len(text)
            )

        log_interaction(
            intent="sentiment_analysis",
            tenant_id=tenant_id,
            success=True,
            response_time_ms=response_time_ms,
            sentiment_label=label,
            sentiment_score=score,
            text_length=len(text)
        )
        return {
            "label": label,
            "score": score,
            "available": True,
            "response_time_ms": response_time_ms
        }
    except asyncio.CancelledError:
        # Cancellation must propagate to the caller; only record it.
        log_interaction(
            intent="sentiment_analysis",
            tenant_id=tenant_id,
            success=False,
            error="Analysis cancelled"
        )
        raise
    except Exception as e:
        response_time_ms = int((time.perf_counter() - started) * 1000)
        log_interaction(
            intent="sentiment_analysis",
            tenant_id=tenant_id,
            success=False,
            error=str(e),
            response_time_ms=response_time_ms,
            text_preview=sanitize_for_logging(text[:100]),
            fallback_used=True
        )
        return {
            "label": "ERROR",
            "score": 0.0,
            "available": False,
            "message": "An error occurred during sentiment analysis.",
            "error": str(e),
            "response_time_ms": response_time_ms
        }
# Largest number of texts sent to the pipeline in a single batch call.
_MAX_SENTIMENT_BATCH_SIZE = 100


async def analyze_sentiment_batch(
    texts: List[str],
    tenant_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Runs sentiment analysis on a batch of texts for efficiency.

    Entries that are not non-blank strings are dropped, and only the first
    100 remaining texts are analyzed. The returned results therefore line
    up with that filtered list, not with the positions in ``texts``.
    Failures are reported in the returned dictionary rather than raised;
    only cancellation is re-raised.

    Args:
        texts: List of text strings to analyze.
        tenant_id: Optional tenant identifier for logging.

    Returns:
        A dictionary containing:
            - results (list): Pipeline output for the analyzed texts
            - available (bool): Whether the service was available
            - total_analyzed (int): Number of results returned by the pipeline
            - message (str, optional): Error message if analysis failed
            - error (str, optional): Exception text if inference raised
            - response_time_ms (int, optional): Total batch analysis time

    Raises:
        asyncio.CancelledError: Re-raised after logging if the task is cancelled.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed
    # by wall-clock adjustments.
    started = time.perf_counter()

    # Bind the pipeline once so the worker thread calls the same object that
    # passed the availability check, even if the global is replaced meanwhile.
    pipeline = SENTIMENT_PIPELINE

    # Check availability
    if pipeline is None:
        log_interaction(
            intent="sentiment_batch_analysis",
            tenant_id=tenant_id,
            success=False,
            error="Sentiment pipeline not available",
            # Input is not validated yet: only call len() on sized objects so
            # a stray int/float argument cannot raise TypeError here.
            batch_size=len(texts) if hasattr(texts, "__len__") else 0
        )
        return {
            "results": [],
            "available": False,
            "total_analyzed": 0,
            "message": "Sentiment analysis is temporarily unavailable."
        }

    # Validate input (type first, so truthiness is only tested on real lists)
    if not isinstance(texts, list) or not texts:
        log_interaction(
            intent="sentiment_batch_analysis",
            tenant_id=tenant_id,
            success=False,
            error="Invalid texts input"
        )
        return {
            "results": [],
            "available": True,
            "total_analyzed": 0,
            "message": "Invalid batch input provided."
        }

    # Filter valid texts and limit batch size
    valid_texts = [
        t for t in texts if isinstance(t, str) and t.strip()
    ][:_MAX_SENTIMENT_BATCH_SIZE]

    if not valid_texts:
        log_interaction(
            intent="sentiment_batch_analysis",
            tenant_id=tenant_id,
            success=False,
            error="No valid texts in batch"
        )
        return {
            "results": [],
            "available": True,
            "total_analyzed": 0,
            "message": "No valid texts provided for analysis."
        }

    try:
        loop = asyncio.get_running_loop()

        # Run batch inference in the default thread executor
        results = await loop.run_in_executor(None, pipeline, valid_texts)
        response_time_ms = int((time.perf_counter() - started) * 1000)

        # A falsy pipeline result (None, empty list) is reported as no results.
        analyzed = results if results else []
        total_analyzed = len(analyzed)

        log_interaction(
            intent="sentiment_batch_analysis",
            tenant_id=tenant_id,
            success=True,
            response_time_ms=response_time_ms,
            batch_size=len(valid_texts),
            total_analyzed=total_analyzed
        )
        return {
            "results": analyzed,
            "available": True,
            "total_analyzed": total_analyzed,
            "response_time_ms": response_time_ms
        }
    except asyncio.CancelledError:
        # Cancellation must propagate to the caller; only record it, the
        # same way get_sentiment_analysis does.
        log_interaction(
            intent="sentiment_batch_analysis",
            tenant_id=tenant_id,
            success=False,
            error="Analysis cancelled",
            batch_size=len(valid_texts)
        )
        raise
    except Exception as e:
        response_time_ms = int((time.perf_counter() - started) * 1000)
        log_interaction(
            intent="sentiment_batch_analysis",
            tenant_id=tenant_id,
            success=False,
            error=str(e),
            response_time_ms=response_time_ms,
            batch_size=len(valid_texts)
        )
        return {
            "results": [],
            "available": False,
            "total_analyzed": 0,
            "message": "An error occurred during batch sentiment analysis.",
            "error": str(e),
            "response_time_ms": response_time_ms
        }