Spaces:
Sleeping
Sleeping
| """ | |
| Enhanced data processing pipeline for advanced sentiment analysis application. | |
| Handles translation, ABSA, intent classification, priority scoring, and co-occurrence analysis. | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| from typing import Dict, List, Tuple, Any, Optional | |
| import re | |
| from datetime import datetime, timedelta | |
| import logging | |
| from langdetect import detect | |
| import streamlit as st | |
| from collections import Counter, defaultdict | |
| from itertools import combinations | |
| import networkx as nx | |
| import requests | |
| import os | |
| import time | |
| # Suppress GitPython warnings for PyABSA | |
| os.environ['GIT_PYTHON_REFRESH'] = 'quiet' | |
| # Set up logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
class DataValidator:
    """Validates uploaded CSV data format and content."""

    # Columns every uploaded review CSV must contain.
    REQUIRED_COLUMNS = ['id', 'reviews_title', 'review', 'date', 'user_id']

    @staticmethod
    def validate_csv(df: pd.DataFrame) -> Tuple[bool, List[str]]:
        """
        Validate CSV format and content.

        Args:
            df: Uploaded dataframe

        Returns:
            Tuple of (is_valid, error_messages)
        """
        errors = []

        # Check required columns
        missing_cols = set(DataValidator.REQUIRED_COLUMNS) - set(df.columns)
        if missing_cols:
            errors.append(f"Missing required columns: {missing_cols}")

        if not errors:
            # Check for empty values
            if df['review'].isnull().any() or (df['review'] == '').any():
                errors.append("Found empty review entries")

            # Validate date format. Inspect the *parsed* series: with
            # errors='coerce' unparseable dates become NaT instead of raising,
            # so checking the raw column (as the code previously did) misses them.
            try:
                parsed_dates = pd.to_datetime(df['date'], errors='coerce')
                if parsed_dates.isnull().any():
                    errors.append("Invalid date format detected")
            except Exception as e:
                errors.append(f"Date validation error: {str(e)}")

        return len(errors) == 0, errors

    @staticmethod
    def clean_data(df: pd.DataFrame) -> pd.DataFrame:
        """Clean and preprocess the data: parse dates, strip text, drop null/duplicate reviews."""
        df = df.copy()

        # Convert date column (invalid entries become NaT)
        df['date'] = pd.to_datetime(df['date'], errors='coerce')

        # Clean text data
        df['review'] = df['review'].astype(str).str.strip()
        df['reviews_title'] = df['reviews_title'].astype(str).str.strip()

        # Remove rows with null reviews
        df = df.dropna(subset=['review'])

        # Remove duplicate reviews
        df = df.drop_duplicates(subset=['review'], keep='first')

        return df.reset_index(drop=True)
class TranslationService:
    """Handles translation using HF Inference API - much more reliable than local models."""

    def __init__(self):
        # Token is optional; without it translation degrades to a pass-through.
        self.api_token = self._get_hf_token()
        # Using AI4Bharat IndicTrans2 - specialized for Indian languages
        self.translation_model = "ai4bharat/indictrans2-en-indic-1.3B"
        self.base_url = "https://router.huggingface.co"
        logger.info("Initialized HF Inference API for translation (IndicTrans2)")

    def _get_hf_token(self) -> Optional[str]:
        """Get HF token from Streamlit secrets, falling back to the environment.

        Returns:
            The token string, or None when not configured.
        """
        try:
            return st.secrets["HF_TOKEN"]
        except Exception:
            # st.secrets may be missing entirely (e.g. running outside Streamlit).
            # Was a bare `except:`; narrowed so SystemExit/KeyboardInterrupt pass through.
            pass
        token = os.getenv("HF_TOKEN")
        if not token:
            logger.warning("No HF_TOKEN found. Translation will be limited.")
        else:
            # Log first 10 chars to verify token is loaded (without exposing full token)
            logger.info(f"HF_TOKEN loaded successfully (starts with: {token[:10]}...)")
        return token

    def _call_hf_translation_api(self, text: str, source_lang: str = "hi", target_lang: str = "en") -> str:
        """Call HF Translation API; on any failure return the input unchanged.

        NOTE(review): source_lang/target_lang are currently unused — the
        endpoint is called with a bare {"inputs": text} payload. Confirm the
        model infers direction, or wire the language codes into the payload.
        """
        if not self.api_token:
            logger.debug("No API token, skipping translation")
            return text
        try:
            headers = {"Authorization": f"Bearer {self.api_token}"}
            # Try using serverless inference endpoint
            url = f"https://api-inference.huggingface.co/models/{self.translation_model}"
            # IndicTrans2 requires simple input format
            payload = {"inputs": text}
            response = requests.post(url, headers=headers, json=payload, timeout=10)
            if response.status_code == 200:
                result = response.json()
                # IndicTrans2 returns: [{"translation_text": "..."}] or {"generated_text": "..."}
                if isinstance(result, list) and len(result) > 0:
                    translated = result[0].get("translation_text", "") or result[0].get("generated_text", "")
                    if translated:
                        return translated
                elif isinstance(result, dict):
                    translated = result.get("generated_text", "") or result.get("translation_text", "")
                    if translated:
                        return translated
            # Silently fallback to original text (translation is optional)
            logger.debug(f"Translation unavailable, using original text (status: {response.status_code})")
            return text
        except Exception as e:
            logger.debug(f"Translation skipped: {str(e)}")
            return text

    def detect_language(self, text: str) -> str:
        """Detect language of the text; returns 'unknown' when detection fails."""
        try:
            return detect(text)
        except Exception:
            # langdetect raises on empty/ambiguous input; was a bare `except:`.
            return 'unknown'

    def translate_to_english(self, text: str, source_lang: str = 'hi') -> str:
        """
        Translate text to English using HF API.

        Args:
            text: Text to translate
            source_lang: Source language code

        Returns:
            Translated text (or the original text for English/unknown input)
        """
        if source_lang == 'en' or source_lang == 'unknown':
            return text
        return self._call_hf_translation_api(text, source_lang, "en")

    def process_reviews(self, reviews: List[str]) -> Tuple[List[str], List[str]]:
        """
        Process list of reviews for translation.

        Args:
            reviews: List of review texts

        Returns:
            Tuple of (translated_reviews, detected_languages)
        """
        translated_reviews = []
        detected_languages = []
        for i, review in enumerate(reviews):
            if i % 20 == 0:  # Progress logging
                logger.info(f"Processing translation {i+1}/{len(reviews)}")
            lang = self.detect_language(review)
            detected_languages.append(lang)
            if lang == 'hi':  # Hindi detected
                translated = self.translate_to_english(review, 'hi')
                translated_reviews.append(translated)
            else:
                translated_reviews.append(review)  # Keep original if not Hindi
        return translated_reviews, detected_languages
class ABSAProcessor:
    """Enhanced ABSA using PyABSA for accurate aspect extraction and sentiment analysis.

    Primary path is a PyABSA ATEPC multilingual model; when it cannot be loaded
    (or fails on a single review) a keyword/rule-based fallback is used. A
    separate HF Inference API fallback exists for sentiment classification only.
    """

    def __init__(self):
        self.model = None          # PyABSA extractor, or None when the load failed
        self.task_manager = None   # optional cooperative-cancellation hook
        # State read by the HF Inference API sentiment fallback
        # (_call_hf_api / _get_hf_sentiment). These attributes were previously
        # never initialized, so reaching that path raised AttributeError.
        self.api_token = self._get_hf_token()
        self.base_url = "https://api-inference.huggingface.co/models"
        # Default sentiment model for the API fallback — TODO confirm this choice
        self.sentiment_model = "distilbert-base-uncased-finetuned-sst-2-english"
        self._load_pyabsa_model()
        logger.info("Initialized PyABSA for ABSA processing")

    def _load_pyabsa_model(self):
        """Load PyABSA multilingual model; on any failure leave self.model as None."""
        try:
            # Suppress additional git warnings
            import warnings
            warnings.filterwarnings('ignore', category=DeprecationWarning)
            # PyABSA v2.4.2 correct API - use AspectTermExtraction module
            from pyabsa import AspectTermExtraction as ATEPC
            # Check if git is available (PyABSA uses it to fetch checkpoints)
            try:
                import subprocess
                git_check = subprocess.run(['git', '--version'], capture_output=True, timeout=5)
                if git_check.returncode == 0:
                    logger.info(f"Git available: {git_check.stdout.decode().strip()}")
                else:
                    logger.warning("Git command failed, PyABSA may have issues downloading checkpoints")
            except Exception as git_err:
                logger.warning(f"Git not found or not executable: {str(git_err)}")
            # Load aspect extractor using correct PyABSA v2.4.2 API
            logger.info("Loading PyABSA ATEPC multilingual model...")
            logger.info("This may take a few minutes on first run (downloading checkpoint)...")
            # Use AspectExtractor class - the correct way in v2.4.2
            self.model = ATEPC.AspectExtractor('multilingual')
            logger.info("✅ PyABSA model loaded successfully")
        except Exception as e:
            import traceback
            logger.error(f"❌ Failed to load PyABSA model: {str(e)}")
            logger.error(f"Traceback: {traceback.format_exc()}")
            logger.warning("⚠️ Using fallback method for aspect extraction")
            self.model = None

    def set_task_manager(self, task_manager):
        """Set task manager for cancellation support."""
        self.task_manager = task_manager

    def _get_hf_token(self) -> Optional[str]:
        """Get HF token from environment or Streamlit secrets."""
        # Try Streamlit secrets first
        try:
            return st.secrets["HF_TOKEN"]
        except Exception:
            # st.secrets unavailable outside Streamlit; fall through to env var.
            pass
        # Try environment variable
        token = os.getenv("HF_TOKEN")
        if not token:
            logger.warning("No HF_TOKEN found. Some features may be limited.")
        return token

    def _call_hf_api(self, model_name: str, inputs: str, max_retries: int = 3) -> Dict:
        """Call HF Inference API with retry logic.

        Returns the parsed JSON response, or {"error": ...} after exhausting
        retries. A 503 means the model is still loading, so we back off
        exponentially and retry.
        """
        headers = {}
        if self.api_token:
            headers["Authorization"] = f"Bearer {self.api_token}"
        url = f"{self.base_url}/{model_name}"
        payload = {"inputs": inputs}
        for attempt in range(max_retries):
            try:
                response = requests.post(url, headers=headers, json=payload, timeout=30)
                if response.status_code == 503:
                    # Model is loading, wait and retry
                    wait_time = 2 ** attempt  # Exponential backoff
                    logger.info(f"Model loading, waiting {wait_time}s before retry...")
                    time.sleep(wait_time)
                    continue
                response.raise_for_status()
                return response.json()
            except requests.exceptions.RequestException as e:
                logger.error(f"API call failed (attempt {attempt + 1}): {str(e)}")
                if attempt == max_retries - 1:
                    return {"error": str(e)}
                time.sleep(1)
        return {"error": "Max retries exceeded"}

    def extract_aspects_and_sentiments(self, reviews: List[str], task_id: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Extract aspects and sentiments using PyABSA with fallback and cancellation support.

        Args:
            reviews: List of review texts (preferably in English after translation)
            task_id: Optional task ID for cancellation tracking

        Returns:
            List of dictionaries containing extracted information, or a
            {'status': 'cancelled', ...} dict when the task was cancelled.
        """
        import gc
        logger.info(f"Processing {len(reviews)} reviews with PyABSA")
        processed_results = []
        batch_size = 5  # Process 5 reviews at a time for responsive cancellation
        for batch_start in range(0, len(reviews), batch_size):
            # Check cancellation before each batch
            if task_id and self.task_manager and self.task_manager.is_cancelled(task_id):
                logger.info(f"ABSA processing cancelled at review {batch_start}/{len(reviews)}")
                # Free the partial results eagerly before bailing out.
                del processed_results
                gc.collect()
                return {'status': 'cancelled', 'message': 'ABSA processing cancelled by user'}
            batch_end = min(batch_start + batch_size, len(reviews))
            batch_reviews = reviews[batch_start:batch_end]
            # Process batch
            for i, review in enumerate(batch_reviews):
                review_idx = batch_start + i
                if review_idx % 10 == 0:  # Progress logging
                    logger.info(f"Processing review {review_idx+1}/{len(reviews)}")
                # Try PyABSA first, fallback to rule-based if unavailable
                if self.model is not None:
                    try:
                        result = self._extract_with_pyabsa(review)
                    except Exception as e:
                        logger.warning(f"PyABSA failed for review {review_idx}: {str(e)}, using fallback")
                        result = self._extract_with_fallback(review)
                else:
                    result = self._extract_with_fallback(review)
                processed_results.append(result)
            # Update progress after each batch (50-90% range)
            if task_id and self.task_manager:
                progress = 50 + int((batch_end / len(reviews)) * 40)
                self.task_manager.update_task(task_id, progress=progress)
        logger.info(f"Successfully processed {len(processed_results)} reviews")
        return processed_results

    def _extract_with_pyabsa(self, review: str) -> Dict[str, Any]:
        """Extract aspects and sentiments using the PyABSA model.

        Falls back to the rule-based extractor when the model finds no aspects.
        """
        result = self.model.predict(review, print_result=False, save_result=False)
        # PyABSA returns: aspect, sentiment, confidence, position
        aspects = result.get('aspect', [])
        sentiments = result.get('sentiment', [])
        positions = result.get('position', [])
        confidence_scores = result.get('confidence', [])
        # Handle single aspect case (model may return scalars instead of lists)
        if not isinstance(aspects, list):
            aspects = [aspects] if aspects else []
            sentiments = [sentiments] if sentiments else []
            positions = [positions] if positions else []
            confidence_scores = [confidence_scores] if confidence_scores else []
        # If no aspects found, use fallback
        if not aspects:
            return self._extract_with_fallback(review)
        return {
            'sentence': review,
            'aspects': aspects,
            'sentiments': sentiments,
            'positions': positions,
            'confidence_scores': confidence_scores,
            'tokens': review.split(),
            'iob_tags': ['O'] * len(review.split())
        }

    def _extract_with_fallback(self, review: str) -> Dict[str, Any]:
        """Fallback rule-based extraction when PyABSA is unavailable.

        Every extracted aspect shares one review-level sentiment, since the
        rule-based scorer works on the whole sentence.
        """
        sentiment = self._get_rule_based_sentiment(review)
        aspects = self._extract_simple_aspects(review)
        return {
            'sentence': review,
            'aspects': aspects,
            'sentiments': [sentiment] * len(aspects),
            'positions': [[0, len(review)]] * len(aspects),
            'confidence_scores': [0.7] * len(aspects),  # Lower confidence for rule-based
            'tokens': review.split(),
            'iob_tags': ['O'] * len(review.split())
        }

    def _get_hf_sentiment(self, text: str) -> str:
        """Get sentiment from HF Inference API, with rule-based fallback on any failure."""
        if not self.api_token:
            # Fallback to rule-based if no API token
            return self._get_rule_based_sentiment(text)
        try:
            result = self._call_hf_api(self.sentiment_model, text)
            if "error" in result:
                logger.warning(f"API error, using rule-based fallback: {result['error']}")
                return self._get_rule_based_sentiment(text)
            # Parse HF sentiment result
            if isinstance(result, list) and len(result) > 0:
                predictions = result[0]
                if isinstance(predictions, list) and len(predictions) > 0:
                    top_prediction = max(predictions, key=lambda x: x.get('score', 0))
                    label = top_prediction.get('label', 'NEUTRAL').upper()
                    # Map HF labels to our format
                    if 'POSITIVE' in label or 'POS' in label:
                        return 'Positive'
                    elif 'NEGATIVE' in label or 'NEG' in label:
                        return 'Negative'
                    else:
                        return 'Neutral'
            # Fallback if parsing fails
            return self._get_rule_based_sentiment(text)
        except Exception as e:
            logger.error(f"HF API error: {str(e)}, using rule-based fallback")
            return self._get_rule_based_sentiment(text)

    def _get_rule_based_sentiment(self, review: str) -> str:
        """Fallback rule-based sentiment analysis with enhanced negative detection.

        Counts keyword hits per polarity (substring match, Hindi + English);
        strong negative phrases count double. Ties resolve to Neutral.
        """
        review_lower = review.lower()
        # Enhanced sentiment words
        positive_words = ['good', 'great', 'excellent', 'amazing', 'love', 'best', 'awesome',
                          'fantastic', 'wonderful', 'perfect', 'satisfied', 'happy', 'pleased',
                          'outstanding', 'brilliant', 'superb', 'delighted', 'impressed', 'working',
                          'अच्छा', 'बढ़िया', 'शानदार', 'बेहतरीन']
        negative_words = ['bad', 'terrible', 'awful', 'hate', 'worst', 'horrible', 'poor',
                          'disappointing', 'frustrated', 'angry', 'broken', 'failed', 'useless',
                          'pathetic', 'disgusting', 'annoying', 'waste', 'regret', 'problem', 'issue',
                          'not working', 'doesn\'t work', 'never', 'delayed', 'late', 'slow', 'error',
                          'खराब', 'समस्या', 'देर', 'नहीं', 'बुरा']
        # Strong negative phrases (count as 2 points)
        negative_phrases = ['too late', 'never comes', 'not received', 'doesn\'t arrive',
                            'delayed', 'not working', 'बहुत देर', 'नहीं आता']
        pos_count = sum(1 for word in positive_words if word in review_lower)
        neg_count = sum(1 for word in negative_words if word in review_lower)
        # Check for negative phrases (stronger signal)
        for phrase in negative_phrases:
            if phrase in review_lower:
                neg_count += 2
        if pos_count > neg_count:
            return 'Positive'
        elif neg_count > pos_count:
            return 'Negative'
        else:
            return 'Neutral'

    def _extract_simple_aspects(self, review: str) -> List[str]:
        """Extract aspects using enhanced keyword matching (Hindi + English variants).

        Returns ['General'] when no keyword matches, so callers always get at
        least one aspect.
        """
        review_lower = review.lower()
        aspects = []
        # Enhanced aspect keywords with Hindi/English variants
        aspect_keywords = {
            'OTP/Verification': ['otp', 'atp', 'verification', 'verify', 'code', 'pin', 'authentication', 'ओटीपी', 'कोड', 'सत्यापन'],
            'Login/Account': ['login', 'sign in', 'signin', 'account', 'password', 'username', 'register', 'signup', 'लॉगिन', 'खाता'],
            'App Performance': ['app', 'application', 'crash', 'freeze', 'hang', 'loading', 'lag', 'slow', 'एप', 'एप्लिकेशन'],
            'Payment': ['payment', 'pay', 'transaction', 'refund', 'money', 'bank', 'upi', 'wallet', 'भुगतान', 'पैसा'],
            'Quality': ['quality', 'build', 'material', 'construction', 'durability', 'solid', 'sturdy', 'cheap', 'flimsy', 'गुणवत्ता'],
            'Price': ['price', 'cost', 'expensive', 'cheap', 'value', 'money', 'affordable', 'budget', 'worth', 'कीमत', 'दाम'],
            'Service': ['service', 'support', 'help', 'staff', 'customer', 'response', 'team', 'representative', 'सेवा', 'सहायता'],
            'Delivery': ['delivery', 'shipping', 'fast', 'quick', 'slow', 'delayed', 'arrive', 'package', 'डिलीवरी', 'शिपिंग'],
            'Design': ['design', 'look', 'appearance', 'beautiful', 'ugly', 'style', 'color', 'aesthetic', 'डिज़ाइन', 'रूप'],
            'Performance': ['performance', 'speed', 'fast', 'slow', 'efficiency', 'works', 'function', 'smooth', 'प्रदर्शन'],
            'Usability': ['easy', 'difficult', 'user', 'interface', 'intuitive', 'complex', 'simple', 'confusing', 'उपयोग'],
            'Features': ['feature', 'function', 'capability', 'option', 'setting', 'mode', 'tool', 'फीचर', 'सुविधा'],
            'Size': ['size', 'big', 'small', 'large', 'compact', 'tiny', 'huge', 'dimension', 'आकार'],
            'Battery': ['battery', 'charge', 'power', 'energy', 'last', 'drain', 'life', 'बैटरी', 'चार्ज']
        }
        for aspect, keywords in aspect_keywords.items():
            if any(keyword in review_lower for keyword in keywords):
                aspects.append(aspect)
        # Default aspect if none found
        if not aspects:
            aspects = ['General']
        return aspects
class IntentClassifier:
    """Enhanced intent classifier with severity scoring for complaints and type classification."""

    # Keyword lexicon. 'complaint' and 'praise' are tiered dicts
    # (severity/positivity level -> keywords); the rest map straight to lists.
    INTENT_KEYWORDS = {
        'complaint': {
            'high_severity': ['terrible', 'worst', 'horrible', 'awful', 'hate', 'useless', 'trash', 'garbage', 'waste', 'pathetic'],
            'medium_severity': ['bad', 'disappointed', 'frustrated', 'annoying', 'poor', 'defective', 'broken', 'failed'],
            'low_severity': ['problem', 'issue', 'concern', 'slow', 'okay but', 'could be better']
        },
        'praise': {
            'high_positive': ['excellent', 'amazing', 'fantastic', 'wonderful', 'perfect', 'outstanding', 'brilliant', 'superb'],
            'medium_positive': ['good', 'great', 'love', 'best', 'awesome', 'nice', 'satisfied', 'happy'],
            'low_positive': ['fine', 'decent', 'acceptable', 'adequate', 'reasonable']
        },
        'question': ['how', 'what', 'when', 'where', 'why', 'which', 'who', '?', 'can you', 'could you', 'is it possible'],
        'suggestion': ['should', 'could', 'would', 'recommend', 'suggest', 'improve', 'better', 'enhancement', 'feature request'],
        'comparison': ['better than', 'worse than', 'compared to', 'versus', 'vs', 'similar to', 'different from'],
        'neutral': ['okay', 'fine', 'average', 'normal', 'standard', 'typical', 'nothing special']
    }

    @classmethod
    def classify_intent_with_severity(cls, review: str) -> Tuple[str, str, float]:
        """
        Classify intent with severity/type scoring.

        Both methods take `cls` but were not declared @classmethod, so
        class-level calls raised TypeError; the decorator fixes that while
        keeping instance calls working.

        Args:
            review: Review text

        Returns:
            Tuple of (intent, severity/type, confidence_score)
        """
        review_lower = review.lower()
        # Check for complaints with severity (complaints take precedence)
        complaint_scores = {}
        for severity, keywords in cls.INTENT_KEYWORDS['complaint'].items():
            score = sum(1 for keyword in keywords if keyword in review_lower)
            if score > 0:
                complaint_scores[severity] = score
        if complaint_scores:
            severity = max(complaint_scores, key=complaint_scores.get)
            confidence = min(complaint_scores[severity] / 3.0, 1.0)  # Normalize confidence
            return 'complaint', severity, confidence
        # Check for praise with positivity level
        praise_scores = {}
        for positivity, keywords in cls.INTENT_KEYWORDS['praise'].items():
            score = sum(1 for keyword in keywords if keyword in review_lower)
            if score > 0:
                praise_scores[positivity] = score
        if praise_scores:
            positivity = max(praise_scores, key=praise_scores.get)
            confidence = min(praise_scores[positivity] / 3.0, 1.0)
            return 'praise', positivity, confidence
        # Check other intents (question/suggestion/comparison/neutral)
        for intent, keywords in cls.INTENT_KEYWORDS.items():
            if intent not in ['complaint', 'praise']:
                score = sum(1 for keyword in keywords if keyword in review_lower)
                if score > 0:
                    confidence = min(score / 2.0, 1.0)
                    return intent, 'standard', confidence
        return 'neutral', 'standard', 0.1

    @classmethod
    def classify_batch_enhanced(cls, reviews: List[str]) -> List[Dict[str, Any]]:
        """Classify intents with enhanced information for a batch of reviews."""
        results = []
        for review in reviews:
            intent, severity_type, confidence = cls.classify_intent_with_severity(review)
            results.append({
                'intent': intent,
                'severity_type': severity_type,
                'confidence': confidence
            })
        return results
class AspectAnalytics:
    """Advanced analytics for aspect analysis including priority scoring and co-occurrence."""

    @staticmethod
    def calculate_aspect_scores(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Calculate priority scores for negative aspects and strength scores for positive aspects.

        Args:
            df: Processed dataframe with aspects and sentiments

        Returns:
            Tuple of (areas_of_improvement_df, strength_anchors_df)
        """
        aspect_data = []
        # Flatten each review into one record per (aspect, sentiment) pair,
        # carrying along intent/severity/date for weighting.
        for idx, row in df.iterrows():
            aspects = row['aspects'] if isinstance(row['aspects'], list) else []
            sentiments = row['aspect_sentiments'] if isinstance(row['aspect_sentiments'], list) else []
            intent = row['intent']
            intent_severity = row.get('intent_severity', 'standard')
            date = row['date']
            for aspect, sentiment in zip(aspects, sentiments):
                aspect_data.append({
                    'aspect': aspect,
                    'sentiment': sentiment,
                    'intent': intent,
                    'intent_severity': intent_severity,
                    'date': date,
                    'review_id': row['id']
                })
        if not aspect_data:
            empty_df = pd.DataFrame(columns=['aspect', 'score', 'frequency', 'sentiment_ratio'])
            return empty_df, empty_df
        aspect_df = pd.DataFrame(aspect_data)
        # Calculate metrics for each aspect
        aspect_metrics = {}
        for aspect in aspect_df['aspect'].unique():
            aspect_subset = aspect_df[aspect_df['aspect'] == aspect]
            total_count = len(aspect_subset)
            positive_count = len(aspect_subset[aspect_subset['sentiment'] == 'Positive'])
            negative_count = len(aspect_subset[aspect_subset['sentiment'] == 'Negative'])
            # Calculate sentiment ratios
            positivity_ratio = positive_count / total_count if total_count > 0 else 0
            negativity_ratio = negative_count / total_count if total_count > 0 else 0
            # Calculate intent severity weighting (mean over complaint rows only)
            complaint_subset = aspect_subset[aspect_subset['intent'] == 'complaint']
            severity_weight = 0
            if len(complaint_subset) > 0:
                severity_mapping = {'high_severity': 3, 'medium_severity': 2, 'low_severity': 1, 'standard': 1}
                severity_weight = complaint_subset['intent_severity'].map(severity_mapping).mean()
            # Priority grows with negativity share, volume, and complaint severity
            priority_score = negativity_ratio * total_count * (1 + severity_weight)
            # Strength grows super-linearly with positivity share
            strength_score = positivity_ratio * total_count * (1 + (positivity_ratio * 2))
            aspect_metrics[aspect] = {
                'frequency': total_count,
                'positivity_ratio': positivity_ratio,
                'negativity_ratio': negativity_ratio,
                'priority_score': priority_score,
                'strength_score': strength_score,
                'intent_severity': severity_weight
            }
        # Create Areas of Improvement DataFrame (top negative aspects)
        improvement_data = []
        for aspect, metrics in aspect_metrics.items():
            if metrics['negativity_ratio'] > 0.1:  # Only include aspects with >10% negativity
                improvement_data.append({
                    'aspect': aspect,
                    'negativity_pct': round(metrics['negativity_ratio'] * 100, 1),
                    'intent_severity': round(metrics['intent_severity'], 2),
                    'frequency': metrics['frequency'],
                    'priority_score': round(metrics['priority_score'], 2)
                })
        # Handle empty DataFrame case
        if improvement_data:
            areas_of_improvement = pd.DataFrame(improvement_data).sort_values('priority_score', ascending=False)
        else:
            # Return empty DataFrame with correct columns
            areas_of_improvement = pd.DataFrame(columns=['aspect', 'negativity_pct', 'intent_severity', 'frequency', 'priority_score'])
        # Create Strength Anchors DataFrame (top positive aspects)
        strength_data = []
        for aspect, metrics in aspect_metrics.items():
            if metrics['positivity_ratio'] > 0.3:  # Only include aspects with >30% positivity
                strength_data.append({
                    'aspect': aspect,
                    'positivity_pct': round(metrics['positivity_ratio'] * 100, 1),
                    'intent_type': 'praise',  # Simplified for now
                    'frequency': metrics['frequency'],
                    'strength_score': round(metrics['strength_score'], 2)
                })
        # Handle empty DataFrame case
        if strength_data:
            strength_anchors = pd.DataFrame(strength_data).sort_values('strength_score', ascending=False)
        else:
            # Return empty DataFrame with correct columns
            strength_anchors = pd.DataFrame(columns=['aspect', 'positivity_pct', 'intent_type', 'frequency', 'strength_score'])
        return areas_of_improvement, strength_anchors

    @staticmethod
    def calculate_aspect_cooccurrence(df: pd.DataFrame) -> "nx.Graph":
        """
        Calculate aspect co-occurrence for network analysis.

        The return annotation is a forward-reference string so the class can be
        imported/inspected without networkx resolved at definition time.

        Args:
            df: Processed dataframe with aspects

        Returns:
            NetworkX graph with aspect co-occurrence data
        """
        G = nx.Graph()
        # Calculate co-occurrence matrix
        cooccurrence_counts = defaultdict(int)
        aspect_sentiments = defaultdict(list)
        aspect_frequencies = defaultdict(int)
        for idx, row in df.iterrows():
            aspects = row['aspects'] if isinstance(row['aspects'], list) else []
            sentiments = row['aspect_sentiments'] if isinstance(row['aspect_sentiments'], list) else []
            # Count individual aspects
            for aspect, sentiment in zip(aspects, sentiments):
                aspect_frequencies[aspect] += 1
                aspect_sentiments[aspect].append(sentiment)
            # Count co-occurrences (unordered pairs within one review)
            for aspect1, aspect2 in combinations(aspects, 2):
                pair = tuple(sorted([aspect1, aspect2]))
                cooccurrence_counts[pair] += 1
        # Add nodes with attributes
        for aspect, frequency in aspect_frequencies.items():
            sentiments = aspect_sentiments[aspect]
            positive_pct = sentiments.count('Positive') / len(sentiments) if sentiments else 0
            negative_pct = sentiments.count('Negative') / len(sentiments) if sentiments else 0
            # Determine overall sentiment color
            if positive_pct > negative_pct:
                color = 'green'
                sentiment_score = positive_pct
            elif negative_pct > positive_pct:
                color = 'red'
                sentiment_score = -negative_pct
            else:
                color = 'gray'
                sentiment_score = 0
            G.add_node(aspect,
                       frequency=frequency,
                       sentiment_score=sentiment_score,
                       color=color,
                       positive_pct=positive_pct,
                       negative_pct=negative_pct)
        # Add edges with weights
        for (aspect1, aspect2), count in cooccurrence_counts.items():
            if count >= 2:  # Only include co-occurrences that happen at least twice
                G.add_edge(aspect1, aspect2, weight=count)
        return G

    @staticmethod
    def detect_sentiment_spikes(df: pd.DataFrame, window_days: int = 7) -> List[Dict[str, Any]]:
        """
        Detect week-over-week spikes in negative sentiment for aspects.

        Args:
            df: Processed dataframe with date and aspect information
            window_days: Number of days for the rolling window

        Returns:
            List of alerts for aspects with significant negative spikes,
            sorted by spike magnitude (descending)
        """
        alerts = []
        if len(df) < 2:
            return alerts
        # Extract aspect-sentiment data with dates
        aspect_data = []
        for idx, row in df.iterrows():
            aspects = row['aspects'] if isinstance(row['aspects'], list) else []
            sentiments = row['aspect_sentiments'] if isinstance(row['aspect_sentiments'], list) else []
            date = row['date']
            for aspect, sentiment in zip(aspects, sentiments):
                aspect_data.append({
                    'aspect': aspect,
                    'sentiment': sentiment,
                    'date': date
                })
        if not aspect_data:
            return alerts
        aspect_df = pd.DataFrame(aspect_data)
        aspect_df['date'] = pd.to_datetime(aspect_df['date'])
        # Group by aspect and analyze trends
        for aspect in aspect_df['aspect'].unique():
            aspect_subset = aspect_df[aspect_df['aspect'] == aspect]
            if len(aspect_subset) < 4:  # Need minimum data points
                continue
            # Daily negative counts over the aspect's full date span (gaps = 0)
            daily_negative = aspect_subset[aspect_subset['sentiment'] == 'Negative'].groupby(
                aspect_subset['date'].dt.date
            ).size().reindex(
                pd.date_range(aspect_subset['date'].min().date(),
                              aspect_subset['date'].max().date()).date,
                fill_value=0
            )
            if len(daily_negative) >= window_days * 2:
                # Compare the latest window against the one before it
                recent_avg = daily_negative.tail(window_days).mean()
                previous_avg = daily_negative.iloc[-(window_days*2):-window_days].mean()
                # Check for spike (>50% increase and at least 2 more complaints)
                if recent_avg > previous_avg * 1.5 and (recent_avg - previous_avg) >= 2:
                    spike_magnitude = ((recent_avg - previous_avg) / previous_avg * 100) if previous_avg > 0 else 100
                    alerts.append({
                        'aspect': aspect,
                        'spike_magnitude': round(spike_magnitude, 1),
                        'recent_avg_negative': round(recent_avg, 1),
                        'previous_avg_negative': round(previous_avg, 1),
                        'alert_severity': 'high' if spike_magnitude > 100 else 'medium'
                    })
        return sorted(alerts, key=lambda x: x['spike_magnitude'], reverse=True)
class SummaryGenerator:
    """Generates macro and micro-level summaries for aspects and overall sentiment."""

    # Both methods are stateless; they are declared @staticmethod so that the
    # instance calls made by DataProcessor (self.summary_generator.generate_*)
    # do not bind the instance as the first positional argument.
    @staticmethod
    def generate_macro_summary(df: pd.DataFrame, areas_of_improvement: pd.DataFrame,
                               strength_anchors: pd.DataFrame) -> Dict[str, str]:
        """
        Generate high-level summary of sentiment analysis.

        Args:
            df: Processed dataframe with 'overall_sentiment' and 'intent' columns
            areas_of_improvement: Problem areas dataframe ('aspect' column;
                assumed ranked with row 0 = most important — TODO confirm)
            strength_anchors: Strength areas dataframe (same layout)

        Returns:
            Dictionary with macro-level insight strings
        """
        total_reviews = len(df)
        # Guard the empty-dataframe case: .mean() on an empty boolean series
        # is NaN, which would render as "nan%" in the summary text.
        if total_reviews > 0:
            positive_pct = (df['overall_sentiment'] == 'Positive').mean() * 100
            negative_pct = (df['overall_sentiment'] == 'Negative').mean() * 100
            complaint_pct = (df['intent'] == 'complaint').mean() * 100
        else:
            positive_pct = negative_pct = complaint_pct = 0.0
        # Top issues and strengths (first row of each ranked frame)
        top_issue = areas_of_improvement.iloc[0]['aspect'] if len(areas_of_improvement) > 0 else "None identified"
        top_strength = strength_anchors.iloc[0]['aspect'] if len(strength_anchors) > 0 else "None identified"
        summary = {
            'overall_sentiment': f"Out of {total_reviews} reviews, {positive_pct:.1f}% are positive and {negative_pct:.1f}% are negative.",
            'top_issues': f"Primary concern: '{top_issue}' - requires immediate attention based on complaint frequency and severity.",
            'top_strengths': f"Greatest strength: '{top_strength}' - leverage this positive aspect in marketing and product positioning.",
            'intent_insights': f"{complaint_pct:.1f}% of reviews contain complaints, indicating specific areas for product improvement.",
            'recommendation': "Focus on addressing top negative aspects while maintaining and promoting strengths."
        }
        return summary

    @staticmethod
    def generate_aspect_micro_summaries(df: pd.DataFrame, top_aspects: List[str],
                                        max_aspects: int = 5) -> Dict[str, str]:
        """
        Generate detailed summaries for specific aspects.

        Args:
            df: Processed dataframe with parallel-list columns 'aspects'
                and 'aspect_sentiments'
            top_aspects: List of aspects to analyze
            max_aspects: Maximum number of aspects to summarize

        Returns:
            Dictionary mapping aspect name to a one-paragraph insight string;
            aspects never mentioned in df are omitted.
        """
        summaries = {}
        for aspect in top_aspects[:max_aspects]:
            # Collect the sentiment recorded for this aspect in every review
            # that mentions it ('aspects'/'aspect_sentiments' are parallel lists).
            aspect_sentiments = []
            for _, row in df.iterrows():
                aspects = row['aspects'] if isinstance(row['aspects'], list) else []
                sentiments = row['aspect_sentiments'] if isinstance(row['aspect_sentiments'], list) else []
                if aspect in aspects:
                    aspect_idx = aspects.index(aspect)
                    # Defensive: skip rows where the two lists are misaligned.
                    if aspect_idx < len(sentiments):
                        aspect_sentiments.append(sentiments[aspect_idx])
            if not aspect_sentiments:
                continue
            positive_count = aspect_sentiments.count('Positive')
            negative_count = aspect_sentiments.count('Negative')
            total_count = len(aspect_sentiments)
            # Generate contextual summary; ties (including all-neutral) read as "mixed".
            if negative_count > positive_count:
                sentiment_trend = "predominantly negative"
                key_issues = "Issues include quality concerns, performance problems, and user dissatisfaction."
            elif positive_count > negative_count:
                sentiment_trend = "predominantly positive"
                key_issues = "Users appreciate the quality, performance, and overall experience."
            else:
                sentiment_trend = "mixed"
                key_issues = "Reviews show both satisfaction and areas for improvement."
            summaries[aspect] = f"'{aspect}' mentioned in {total_count} reviews with {sentiment_trend} sentiment ({positive_count} positive, {negative_count} negative). {key_issues}"
        return summaries
class DataProcessor:
    """Enhanced main data processing pipeline coordinator."""

    def __init__(self) -> None:
        # Each pipeline stage is delegated to a dedicated service object
        # (all defined elsewhere in this module).
        self.translator = TranslationService()       # translation + language detection (process_reviews)
        self.absa_processor = ABSAProcessor()        # aspect extraction and per-aspect sentiment
        self.validator = DataValidator()             # CSV schema validation and cleaning
        self.analytics = AspectAnalytics()           # priority scores, co-occurrence, spike alerts
        self.summary_generator = SummaryGenerator()  # macro/micro text summaries
        self.task_manager = None  # Will be set by API via set_task_manager (cancellation support)
| def set_task_manager(self, task_manager): | |
| """Set task manager for cancellation support.""" | |
| self.task_manager = task_manager | |
| # Also set it for ABSA processor | |
| self.absa_processor.set_task_manager(task_manager) | |
    def process_uploaded_data(self, df: pd.DataFrame, task_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Complete enhanced processing pipeline for uploaded data with cancellation support.

        Stages (reported progress): validation (5%), translation (10-40%,
        batched), intent classification (40%), aspect extraction (50%),
        result combination (90%), analytics (95%), summaries, completed
        (100%).  Cancellation is polled between stages and once per
        translation batch; every cancellation path deletes the large
        intermediates and forces a gc pass before returning.

        Args:
            df: Uploaded dataframe (must pass DataValidator.validate_csv)
            task_id: Optional task ID for cancellation tracking

        Returns:
            Dictionary containing all processed results with advanced
            analytics; {'error': [...]} when validation fails; or
            {'status': 'cancelled', 'message': ...} when cancelled.
        """
        # Local import: gc is only needed on the cancellation cleanup paths.
        import gc
        # Check cancellation at start
        if task_id and self.task_manager and self.task_manager.is_cancelled(task_id):
            return {'status': 'cancelled', 'message': 'Task cancelled by user'}
        # Validate data
        if task_id and self.task_manager:
            self.task_manager.update_task(task_id, stage='validation', progress=5, status='processing')
        is_valid, errors = self.validator.validate_csv(df)
        if not is_valid:
            return {'error': errors}
        # Clean data
        df_clean = self.validator.clean_data(df)
        reviews = df_clean['review'].tolist()
        # Check cancellation before translation
        if task_id and self.task_manager and self.task_manager.is_cancelled(task_id):
            del df_clean, reviews
            gc.collect()
            return {'status': 'cancelled', 'message': 'Task cancelled during validation'}
        # Translation with batch processing and cancellation checks
        if task_id and self.task_manager:
            self.task_manager.update_task(task_id, stage='translation', progress=10)
        # NOTE(review): st.spinner is used although this method may run from a
        # background/API task rather than a Streamlit script run — confirm the
        # spinner calls are safe in that context.
        with st.spinner("Translating reviews..."):
            translated_reviews = []
            detected_languages = []
            # Process in small batches so cancellation is honoured mid-stage.
            batch_size = 10
            for i in range(0, len(reviews), batch_size):
                # Check cancellation once per batch
                if task_id and self.task_manager and self.task_manager.is_cancelled(task_id):
                    del df_clean, reviews, translated_reviews, detected_languages
                    gc.collect()
                    return {'status': 'cancelled', 'message': 'Task cancelled during translation'}
                batch = reviews[i:i+batch_size]
                batch_translated, batch_langs = self.translator.process_reviews(batch)
                translated_reviews.extend(batch_translated)
                detected_languages.extend(batch_langs)
                # Update progress linearly across the 10-40% band
                if task_id and self.task_manager:
                    progress = 10 + int((i / len(reviews)) * 30)
                    self.task_manager.update_task(task_id, progress=progress)
        # Check cancellation before intent classification
        if task_id and self.task_manager and self.task_manager.is_cancelled(task_id):
            del df_clean, reviews, translated_reviews, detected_languages
            gc.collect()
            return {'status': 'cancelled', 'message': 'Task cancelled after translation'}
        # Enhanced intent classification
        if task_id and self.task_manager:
            self.task_manager.update_task(task_id, stage='intent_classification', progress=40)
        with st.spinner("Classifying intents with severity analysis..."):
            intent_results = IntentClassifier.classify_batch_enhanced(translated_reviews)
        # Check cancellation before ABSA (most expensive step)
        if task_id and self.task_manager and self.task_manager.is_cancelled(task_id):
            del df_clean, reviews, translated_reviews, detected_languages, intent_results
            gc.collect()
            return {'status': 'cancelled', 'message': 'Task cancelled after intent classification'}
        # ABSA processing with cancellation support
        if task_id and self.task_manager:
            self.task_manager.update_task(task_id, stage='aspect_extraction', progress=50)
        with st.spinner("Extracting aspects and sentiments..."):
            # task_id is forwarded so the ABSA processor can poll cancellation
            # internally and hand back a {'status': 'cancelled'} dict.
            absa_results = self.absa_processor.extract_aspects_and_sentiments(
                translated_reviews,
                task_id=task_id
            )
        # Check if ABSA was cancelled (a dict here means cancellation, not results)
        if isinstance(absa_results, dict) and absa_results.get('status') == 'cancelled':
            del df_clean, reviews, translated_reviews, detected_languages, intent_results
            gc.collect()
            return absa_results
        # Combine results with enhanced structure
        if task_id and self.task_manager:
            self.task_manager.update_task(task_id, stage='combining_results', progress=90)
        df_processed = df_clean.copy()
        df_processed['translated_review'] = translated_reviews
        df_processed['detected_language'] = detected_languages
        df_processed['intent'] = [r['intent'] for r in intent_results]
        df_processed['intent_severity'] = [r['severity_type'] for r in intent_results]
        df_processed['intent_confidence'] = [r['confidence'] for r in intent_results]
        # Add ABSA results with enhanced structure
        aspects_list = []
        aspect_sentiments_list = []
        overall_sentiment = []
        for result in absa_results:
            aspects_list.append(result['aspects'])
            aspect_sentiments_list.append(result['sentiments'])
            # Overall sentiment is a majority vote over the per-aspect
            # sentiments; ties (or no detected aspects) fall back to 'Neutral'.
            if result['sentiments']:
                positive_count = result['sentiments'].count('Positive')
                negative_count = result['sentiments'].count('Negative')
                if positive_count > negative_count:
                    overall_sentiment.append('Positive')
                elif negative_count > positive_count:
                    overall_sentiment.append('Negative')
                else:
                    overall_sentiment.append('Neutral')
            else:
                overall_sentiment.append('Neutral')
        df_processed['aspects'] = aspects_list
        df_processed['aspect_sentiments'] = aspect_sentiments_list
        df_processed['overall_sentiment'] = overall_sentiment
        # Advanced analytics
        if task_id and self.task_manager:
            self.task_manager.update_task(task_id, stage='analytics', progress=95)
        with st.spinner("Calculating aspect analytics and priority scores..."):
            areas_of_improvement, strength_anchors = self.analytics.calculate_aspect_scores(df_processed)
            aspect_network = self.analytics.calculate_aspect_cooccurrence(df_processed)
            sentiment_alerts = self.analytics.detect_sentiment_spikes(df_processed)
        # Generate summaries
        with st.spinner("Generating AI-powered summaries..."):
            macro_summary = self.summary_generator.generate_macro_summary(
                df_processed, areas_of_improvement, strength_anchors
            )
            # Micro summaries cover the top 3 problem aspects + top 3 strengths
            top_negative_aspects = areas_of_improvement['aspect'].head(3).tolist() if len(areas_of_improvement) > 0 else []
            top_positive_aspects = strength_anchors['aspect'].head(3).tolist() if len(strength_anchors) > 0 else []
            top_aspects = top_negative_aspects + top_positive_aspects
            micro_summaries = self.summary_generator.generate_aspect_micro_summaries(
                df_processed, top_aspects
            )
        # Mark task as complete
        if task_id and self.task_manager:
            self.task_manager.update_task(task_id, stage='completed', progress=100)
        # ========== NEW: ASPECT-LEVEL DATA TRANSFORMATION ==========
        # Explode each review into one record per (aspect, sentiment) pair and
        # flag reviews whose aspects carry conflicting sentiments.
        aspect_level_data = []
        mixed_sentiment_reviews = []
        for idx, row in df_processed.iterrows():
            aspects = row['aspects'] if isinstance(row['aspects'], list) else []
            aspect_sentiments = row['aspect_sentiments'] if isinstance(row['aspect_sentiments'], list) else []
            # Check for mixed sentiments (both polarities present in one review)
            unique_sentiments = set(aspect_sentiments)
            is_mixed = ('Positive' in unique_sentiments and 'Negative' in unique_sentiments)
            if is_mixed:
                mixed_sentiment_reviews.append({
                    'review_id': row['id'],
                    'review': row['review'],
                    'aspects': aspects,
                    'aspect_sentiments': aspect_sentiments,
                    'intent': row['intent'],
                    'date': row['date']
                })
            # Create aspect-level records (zip truncates to the shorter list
            # if aspects/sentiments are ever misaligned)
            for aspect, sentiment in zip(aspects, aspect_sentiments):
                aspect_level_data.append({
                    'review_id': row['id'],
                    'review': row['review'],
                    'aspect': aspect,
                    'aspect_sentiment': sentiment,
                    'overall_sentiment': row['overall_sentiment'],
                    'intent': row['intent'],
                    'intent_severity': row['intent_severity'],
                    'date': row['date'],
                    'language': row['detected_language']
                })
        aspect_level_df = pd.DataFrame(aspect_level_data) if aspect_level_data else pd.DataFrame()
        mixed_sentiment_df = pd.DataFrame(mixed_sentiment_reviews) if mixed_sentiment_reviews else pd.DataFrame()
        return {
            'processed_data': df_processed,
            'aspect_level_data': aspect_level_df,  # NEW: Aspect-level granular data
            'mixed_sentiment_reviews': mixed_sentiment_df,  # NEW: Mixed sentiment detection
            'absa_details': absa_results,
            'areas_of_improvement': areas_of_improvement,
            'strength_anchors': strength_anchors,
            'aspect_network': aspect_network,
            'sentiment_alerts': sentiment_alerts,
            'macro_summary': macro_summary,
            'micro_summaries': micro_summaries,
            'summary': {
                'total_reviews': len(df_processed),
                'total_aspects': len(aspect_level_df),
                'mixed_sentiment_count': len(mixed_sentiment_df),
                'mixed_sentiment_pct': round(len(mixed_sentiment_df) / len(df_processed) * 100, 1) if len(df_processed) > 0 else 0,
                'languages_detected': list(set(detected_languages)),
                'intents_distribution': pd.Series([r['intent'] for r in intent_results]).value_counts().to_dict(),
                'sentiment_distribution': pd.Series(overall_sentiment).value_counts().to_dict(),
                'top_problem_areas': len(areas_of_improvement),
                'top_strength_anchors': len(strength_anchors),
                'active_alerts': len(sentiment_alerts)
            }
        }