""" Enhanced data processing pipeline for advanced sentiment analysis application. Handles translation, ABSA, intent classification, priority scoring, and co-occurrence analysis. """ import pandas as pd import numpy as np from typing import Dict, List, Tuple, Any, Optional import re from datetime import datetime, timedelta import logging from langdetect import detect import streamlit as st from collections import Counter, defaultdict from itertools import combinations import networkx as nx import requests import os import time # Suppress GitPython warnings for PyABSA os.environ['GIT_PYTHON_REFRESH'] = 'quiet' # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class DataValidator: """Validates uploaded CSV data format and content.""" REQUIRED_COLUMNS = ['id', 'reviews_title', 'review', 'date', 'user_id'] @staticmethod def validate_csv(df: pd.DataFrame) -> Tuple[bool, List[str]]: """ Validate CSV format and content. Args: df: Uploaded dataframe Returns: Tuple of (is_valid, error_messages) """ errors = [] # Check required columns missing_cols = set(DataValidator.REQUIRED_COLUMNS) - set(df.columns) if missing_cols: errors.append(f"Missing required columns: {missing_cols}") if not errors: # Check for empty values if df['review'].isnull().any() or (df['review'] == '').any(): errors.append("Found empty review entries") # Validate date format try: pd.to_datetime(df['date'], errors='coerce') if df['date'].isnull().any(): errors.append("Invalid date format detected") except Exception as e: errors.append(f"Date validation error: {str(e)}") return len(errors) == 0, errors @staticmethod def clean_data(df: pd.DataFrame) -> pd.DataFrame: """Clean and preprocess the data.""" df = df.copy() # Convert date column df['date'] = pd.to_datetime(df['date'], errors='coerce') # Clean text data df['review'] = df['review'].astype(str).str.strip() df['reviews_title'] = df['reviews_title'].astype(str).str.strip() # Remove rows with null reviews df = df.dropna(subset=['review']) # Remove duplicate reviews df = df.drop_duplicates(subset=['review'], keep='first') return df.reset_index(drop=True) class TranslationService: """Handles translation using HF Inference API - much more reliable than local models.""" def __init__(self): self.api_token = self._get_hf_token() # Using AI4Bharat IndicTrans2 - specialized for Indian languages self.translation_model = "ai4bharat/indictrans2-en-indic-1.3B" self.base_url = "https://router.huggingface.co" logger.info("Initialized HF Inference API for translation (IndicTrans2)") def _get_hf_token(self) -> Optional[str]: """Get HF token from environment or Streamlit secrets.""" try: return st.secrets["HF_TOKEN"] except: pass token = os.getenv("HF_TOKEN") if not token: logger.warning("No HF_TOKEN found. Translation will be limited.") else: # Log first 10 chars to verify token is loaded (without exposing full token) logger.info(f"HF_TOKEN loaded successfully (starts with: {token[:10]}...)") return token def _call_hf_translation_api(self, text: str, source_lang: str = "hi", target_lang: str = "en") -> str: """Call HF Translation API with fallback.""" if not self.api_token: logger.debug("No API token, skipping translation") return text try: headers = {"Authorization": f"Bearer {self.api_token}"} # Try using serverless inference endpoint url = f"https://api-inference.huggingface.co/models/{self.translation_model}" # IndicTrans2 requires simple input format payload = {"inputs": text} response = requests.post(url, headers=headers, json=payload, timeout=10) if response.status_code == 200: result = response.json() # IndicTrans2 returns: [{"translation_text": "..."}] or {"generated_text": "..."} if isinstance(result, list) and len(result) > 0: translated = result[0].get("translation_text", "") or result[0].get("generated_text", "") if translated: return translated elif isinstance(result, dict): translated = result.get("generated_text", "") or result.get("translation_text", "") if translated: return translated # Silently fallback to original text (translation is optional) logger.debug(f"Translation unavailable, using original text (status: {response.status_code})") return text except Exception as e: logger.debug(f"Translation skipped: {str(e)}") return text def detect_language(self, text: str) -> str: """Detect language of the text.""" try: lang = detect(text) return lang except: return 'unknown' def translate_to_english(self, text: str, source_lang: str = 'hi') -> str: """ Translate text to English using HF API. Args: text: Text to translate source_lang: Source language code Returns: Translated text """ if source_lang == 'en' or source_lang == 'unknown': return text return self._call_hf_translation_api(text, source_lang, "en") def process_reviews(self, reviews: List[str]) -> Tuple[List[str], List[str]]: """ Process list of reviews for translation. Args: reviews: List of review texts Returns: Tuple of (translated_reviews, detected_languages) """ translated_reviews = [] detected_languages = [] for i, review in enumerate(reviews): if i % 20 == 0: # Progress logging logger.info(f"Processing translation {i+1}/{len(reviews)}") lang = self.detect_language(review) detected_languages.append(lang) if lang == 'hi': # Hindi detected translated = self.translate_to_english(review, 'hi') translated_reviews.append(translated) else: translated_reviews.append(review) # Keep original if not Hindi return translated_reviews, detected_languages class ABSAProcessor: """Enhanced ABSA using PyABSA for accurate aspect extraction and sentiment analysis.""" def __init__(self): self.model = None self.task_manager = None self._load_pyabsa_model() logger.info("Initialized PyABSA for ABSA processing") def _load_pyabsa_model(self): """Load PyABSA multilingual model with caching.""" try: # Suppress additional git warnings import warnings warnings.filterwarnings('ignore', category=DeprecationWarning) # PyABSA v2.4.2 correct API - use AspectTermExtraction module from pyabsa import AspectTermExtraction as ATEPC # Check if git is available try: import subprocess git_check = subprocess.run(['git', '--version'], capture_output=True, timeout=5) if git_check.returncode == 0: logger.info(f"Git available: {git_check.stdout.decode().strip()}") else: logger.warning("Git command failed, PyABSA may have issues downloading checkpoints") except Exception as git_err: logger.warning(f"Git not found or not executable: {str(git_err)}") # Load aspect extractor using correct PyABSA v2.4.2 API logger.info("Loading PyABSA ATEPC multilingual model...") logger.info("This may take a few minutes on first run (downloading checkpoint)...") # Use AspectExtractor class - the correct way in v2.4.2 self.model = ATEPC.AspectExtractor('multilingual') logger.info("✅ PyABSA model loaded successfully") except Exception as e: import traceback logger.error(f"❌ Failed to load PyABSA model: {str(e)}") logger.error(f"Traceback: {traceback.format_exc()}") logger.warning("⚠️ Using fallback method for aspect extraction") self.model = None def set_task_manager(self, task_manager): """Set task manager for cancellation support.""" self.task_manager = task_manager def _get_hf_token(self) -> Optional[str]: """Get HF token from environment or Streamlit secrets.""" # Try Streamlit secrets first try: return st.secrets["HF_TOKEN"] except: pass # Try environment variable token = os.getenv("HF_TOKEN") if not token: logger.warning("No HF_TOKEN found. Some features may be limited.") return token def _call_hf_api(self, model_name: str, inputs: str, max_retries: int = 3) -> Dict: """Call HF Inference API with retry logic.""" headers = {} if self.api_token: headers["Authorization"] = f"Bearer {self.api_token}" url = f"{self.base_url}/{model_name}" payload = {"inputs": inputs} for attempt in range(max_retries): try: response = requests.post(url, headers=headers, json=payload, timeout=30) if response.status_code == 503: # Model is loading, wait and retry wait_time = 2 ** attempt # Exponential backoff logger.info(f"Model loading, waiting {wait_time}s before retry...") time.sleep(wait_time) continue response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: logger.error(f"API call failed (attempt {attempt + 1}): {str(e)}") if attempt == max_retries - 1: return {"error": str(e)} time.sleep(1) return {"error": "Max retries exceeded"} def extract_aspects_and_sentiments(self, reviews: List[str], task_id: Optional[str] = None) -> List[Dict[str, Any]]: """ Extract aspects and sentiments using PyABSA with fallback and cancellation support. Args: reviews: List of review texts (preferably in English after translation) task_id: Optional task ID for cancellation tracking Returns: List of dictionaries containing extracted information or cancellation dict """ import gc logger.info(f"Processing {len(reviews)} reviews with PyABSA") processed_results = [] batch_size = 5 # Process 5 reviews at a time for responsive cancellation for batch_start in range(0, len(reviews), batch_size): # Check cancellation before each batch if task_id and self.task_manager and self.task_manager.is_cancelled(task_id): logger.info(f"ABSA processing cancelled at review {batch_start}/{len(reviews)}") del processed_results gc.collect() return {'status': 'cancelled', 'message': 'ABSA processing cancelled by user'} batch_end = min(batch_start + batch_size, len(reviews)) batch_reviews = reviews[batch_start:batch_end] # Process batch for i, review in enumerate(batch_reviews): review_idx = batch_start + i if review_idx % 10 == 0: # Progress logging logger.info(f"Processing review {review_idx+1}/{len(reviews)}") # Try PyABSA first, fallback to rule-based if unavailable if self.model is not None: try: result = self._extract_with_pyabsa(review) except Exception as e: logger.warning(f"PyABSA failed for review {review_idx}: {str(e)}, using fallback") result = self._extract_with_fallback(review) else: result = self._extract_with_fallback(review) processed_results.append(result) # Update progress after each batch (50-90% range) if task_id and self.task_manager: progress = 50 + int((batch_end / len(reviews)) * 40) self.task_manager.update_task(task_id, progress=progress) logger.info(f"Successfully processed {len(processed_results)} reviews") return processed_results def _extract_with_pyabsa(self, review: str) -> Dict[str, Any]: """Extract aspects and sentiments using PyABSA model.""" result = self.model.predict(review, print_result=False, save_result=False) # PyABSA returns: aspect, sentiment, confidence, position aspects = result.get('aspect', []) sentiments = result.get('sentiment', []) positions = result.get('position', []) confidence_scores = result.get('confidence', []) # Handle single aspect case if not isinstance(aspects, list): aspects = [aspects] if aspects else [] sentiments = [sentiments] if sentiments else [] positions = [positions] if positions else [] confidence_scores = [confidence_scores] if confidence_scores else [] # If no aspects found, use fallback if not aspects: return self._extract_with_fallback(review) return { 'sentence': review, 'aspects': aspects, 'sentiments': sentiments, 'positions': positions, 'confidence_scores': confidence_scores, 'tokens': review.split(), 'iob_tags': ['O'] * len(review.split()) } def _extract_with_fallback(self, review: str) -> Dict[str, Any]: """Fallback rule-based extraction when PyABSA is unavailable.""" sentiment = self._get_rule_based_sentiment(review) aspects = self._extract_simple_aspects(review) return { 'sentence': review, 'aspects': aspects, 'sentiments': [sentiment] * len(aspects), 'positions': [[0, len(review)]] * len(aspects), 'confidence_scores': [0.7] * len(aspects), # Lower confidence for rule-based 'tokens': review.split(), 'iob_tags': ['O'] * len(review.split()) } def _get_hf_sentiment(self, text: str) -> str: """Get sentiment from HF Inference API with fallback.""" if not self.api_token: # Fallback to rule-based if no API token return self._get_rule_based_sentiment(text) try: result = self._call_hf_api(self.sentiment_model, text) if "error" in result: logger.warning(f"API error, using rule-based fallback: {result['error']}") return self._get_rule_based_sentiment(text) # Parse HF sentiment result if isinstance(result, list) and len(result) > 0: predictions = result[0] if isinstance(predictions, list) and len(predictions) > 0: top_prediction = max(predictions, key=lambda x: x.get('score', 0)) label = top_prediction.get('label', 'NEUTRAL').upper() # Map HF labels to our format if 'POSITIVE' in label or 'POS' in label: return 'Positive' elif 'NEGATIVE' in label or 'NEG' in label: return 'Negative' else: return 'Neutral' # Fallback if parsing fails return self._get_rule_based_sentiment(text) except Exception as e: logger.error(f"HF API error: {str(e)}, using rule-based fallback") return self._get_rule_based_sentiment(text) def _get_rule_based_sentiment(self, review: str) -> str: """Fallback rule-based sentiment analysis with enhanced negative detection.""" review_lower = review.lower() # Enhanced sentiment words positive_words = ['good', 'great', 'excellent', 'amazing', 'love', 'best', 'awesome', 'fantastic', 'wonderful', 'perfect', 'satisfied', 'happy', 'pleased', 'outstanding', 'brilliant', 'superb', 'delighted', 'impressed', 'working', 'अच्छा', 'बढ़िया', 'शानदार', 'बेहतरीन'] negative_words = ['bad', 'terrible', 'awful', 'hate', 'worst', 'horrible', 'poor', 'disappointing', 'frustrated', 'angry', 'broken', 'failed', 'useless', 'pathetic', 'disgusting', 'annoying', 'waste', 'regret', 'problem', 'issue', 'not working', 'doesn\'t work', 'never', 'delayed', 'late', 'slow', 'error', 'खराब', 'समस्या', 'देर', 'नहीं', 'बुरा'] # Strong negative phrases (count as 2 points) negative_phrases = ['too late', 'never comes', 'not received', 'doesn\'t arrive', 'delayed', 'not working', 'बहुत देर', 'नहीं आता'] pos_count = sum(1 for word in positive_words if word in review_lower) neg_count = sum(1 for word in negative_words if word in review_lower) # Check for negative phrases (stronger signal) for phrase in negative_phrases: if phrase in review_lower: neg_count += 2 if pos_count > neg_count: return 'Positive' elif neg_count > pos_count: return 'Negative' else: return 'Neutral' def _extract_simple_aspects(self, review: str) -> List[str]: """Extract aspects using enhanced keyword matching.""" review_lower = review.lower() aspects = [] # Enhanced aspect keywords with Hindi/English variants aspect_keywords = { 'OTP/Verification': ['otp', 'atp', 'verification', 'verify', 'code', 'pin', 'authentication', 'ओटीपी', 'कोड', 'सत्यापन'], 'Login/Account': ['login', 'sign in', 'signin', 'account', 'password', 'username', 'register', 'signup', 'लॉगिन', 'खाता'], 'App Performance': ['app', 'application', 'crash', 'freeze', 'hang', 'loading', 'lag', 'slow', 'एप', 'एप्लिकेशन'], 'Payment': ['payment', 'pay', 'transaction', 'refund', 'money', 'bank', 'upi', 'wallet', 'भुगतान', 'पैसा'], 'Quality': ['quality', 'build', 'material', 'construction', 'durability', 'solid', 'sturdy', 'cheap', 'flimsy', 'गुणवत्ता'], 'Price': ['price', 'cost', 'expensive', 'cheap', 'value', 'money', 'affordable', 'budget', 'worth', 'कीमत', 'दाम'], 'Service': ['service', 'support', 'help', 'staff', 'customer', 'response', 'team', 'representative', 'सेवा', 'सहायता'], 'Delivery': ['delivery', 'shipping', 'fast', 'quick', 'slow', 'delayed', 'arrive', 'package', 'डिलीवरी', 'शिपिंग'], 'Design': ['design', 'look', 'appearance', 'beautiful', 'ugly', 'style', 'color', 'aesthetic', 'डिज़ाइन', 'रूप'], 'Performance': ['performance', 'speed', 'fast', 'slow', 'efficiency', 'works', 'function', 'smooth', 'प्रदर्शन'], 'Usability': ['easy', 'difficult', 'user', 'interface', 'intuitive', 'complex', 'simple', 'confusing', 'उपयोग'], 'Features': ['feature', 'function', 'capability', 'option', 'setting', 'mode', 'tool', 'फीचर', 'सुविधा'], 'Size': ['size', 'big', 'small', 'large', 'compact', 'tiny', 'huge', 'dimension', 'आकार'], 'Battery': ['battery', 'charge', 'power', 'energy', 'last', 'drain', 'life', 'बैटरी', 'चार्ज'] } for aspect, keywords in aspect_keywords.items(): if any(keyword in review_lower for keyword in keywords): aspects.append(aspect) # Default aspect if none found if not aspects: aspects = ['General'] return aspects class IntentClassifier: """Enhanced intent classifier with severity scoring for complaints and type classification.""" INTENT_KEYWORDS = { 'complaint': { 'high_severity': ['terrible', 'worst', 'horrible', 'awful', 'hate', 'useless', 'trash', 'garbage', 'waste', 'pathetic'], 'medium_severity': ['bad', 'disappointed', 'frustrated', 'annoying', 'poor', 'defective', 'broken', 'failed'], 'low_severity': ['problem', 'issue', 'concern', 'slow', 'okay but', 'could be better'] }, 'praise': { 'high_positive': ['excellent', 'amazing', 'fantastic', 'wonderful', 'perfect', 'outstanding', 'brilliant', 'superb'], 'medium_positive': ['good', 'great', 'love', 'best', 'awesome', 'nice', 'satisfied', 'happy'], 'low_positive': ['fine', 'decent', 'acceptable', 'adequate', 'reasonable'] }, 'question': ['how', 'what', 'when', 'where', 'why', 'which', 'who', '?', 'can you', 'could you', 'is it possible'], 'suggestion': ['should', 'could', 'would', 'recommend', 'suggest', 'improve', 'better', 'enhancement', 'feature request'], 'comparison': ['better than', 'worse than', 'compared to', 'versus', 'vs', 'similar to', 'different from'], 'neutral': ['okay', 'fine', 'average', 'normal', 'standard', 'typical', 'nothing special'] } @classmethod def classify_intent_with_severity(cls, review: str) -> Tuple[str, str, float]: """ Classify intent with severity/type scoring. Args: review: Review text Returns: Tuple of (intent, severity/type, confidence_score) """ review_lower = review.lower() # Check for complaints with severity complaint_scores = {} for severity, keywords in cls.INTENT_KEYWORDS['complaint'].items(): score = sum(1 for keyword in keywords if keyword in review_lower) if score > 0: complaint_scores[severity] = score if complaint_scores: severity = max(complaint_scores, key=complaint_scores.get) confidence = min(complaint_scores[severity] / 3.0, 1.0) # Normalize confidence return 'complaint', severity, confidence # Check for praise with positivity level praise_scores = {} for positivity, keywords in cls.INTENT_KEYWORDS['praise'].items(): score = sum(1 for keyword in keywords if keyword in review_lower) if score > 0: praise_scores[positivity] = score if praise_scores: positivity = max(praise_scores, key=praise_scores.get) confidence = min(praise_scores[positivity] / 3.0, 1.0) return 'praise', positivity, confidence # Check other intents for intent, keywords in cls.INTENT_KEYWORDS.items(): if intent not in ['complaint', 'praise']: score = sum(1 for keyword in keywords if keyword in review_lower) if score > 0: confidence = min(score / 2.0, 1.0) return intent, 'standard', confidence return 'neutral', 'standard', 0.1 @classmethod def classify_batch_enhanced(cls, reviews: List[str]) -> List[Dict[str, Any]]: """Classify intents with enhanced information for a batch of reviews.""" results = [] for review in reviews: intent, severity_type, confidence = cls.classify_intent_with_severity(review) results.append({ 'intent': intent, 'severity_type': severity_type, 'confidence': confidence }) return results class AspectAnalytics: """Advanced analytics for aspect analysis including priority scoring and co-occurrence.""" @staticmethod def calculate_aspect_scores(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]: """ Calculate priority scores for negative aspects and strength scores for positive aspects. Args: df: Processed dataframe with aspects and sentiments Returns: Tuple of (areas_of_improvement_df, strength_anchors_df) """ aspect_data = [] # Extract aspect-sentiment pairs with intent information for idx, row in df.iterrows(): aspects = row['aspects'] if isinstance(row['aspects'], list) else [] sentiments = row['aspect_sentiments'] if isinstance(row['aspect_sentiments'], list) else [] intent = row['intent'] intent_severity = row.get('intent_severity', 'standard') date = row['date'] for aspect, sentiment in zip(aspects, sentiments): aspect_data.append({ 'aspect': aspect, 'sentiment': sentiment, 'intent': intent, 'intent_severity': intent_severity, 'date': date, 'review_id': row['id'] }) if not aspect_data: empty_df = pd.DataFrame(columns=['aspect', 'score', 'frequency', 'sentiment_ratio']) return empty_df, empty_df aspect_df = pd.DataFrame(aspect_data) # Calculate metrics for each aspect aspect_metrics = {} for aspect in aspect_df['aspect'].unique(): aspect_subset = aspect_df[aspect_df['aspect'] == aspect] total_count = len(aspect_subset) positive_count = len(aspect_subset[aspect_subset['sentiment'] == 'Positive']) negative_count = len(aspect_subset[aspect_subset['sentiment'] == 'Negative']) # Calculate sentiment ratios positivity_ratio = positive_count / total_count if total_count > 0 else 0 negativity_ratio = negative_count / total_count if total_count > 0 else 0 # Calculate intent severity weighting complaint_subset = aspect_subset[aspect_subset['intent'] == 'complaint'] severity_weight = 0 if len(complaint_subset) > 0: severity_mapping = {'high_severity': 3, 'medium_severity': 2, 'low_severity': 1, 'standard': 1} severity_weight = complaint_subset['intent_severity'].map(severity_mapping).mean() # Calculate priority score for negative aspects priority_score = negativity_ratio * total_count * (1 + severity_weight) # Calculate strength score for positive aspects strength_score = positivity_ratio * total_count * (1 + (positivity_ratio * 2)) aspect_metrics[aspect] = { 'frequency': total_count, 'positivity_ratio': positivity_ratio, 'negativity_ratio': negativity_ratio, 'priority_score': priority_score, 'strength_score': strength_score, 'intent_severity': severity_weight } # Create Areas of Improvement DataFrame (top negative aspects) improvement_data = [] for aspect, metrics in aspect_metrics.items(): if metrics['negativity_ratio'] > 0.1: # Only include aspects with >10% negativity improvement_data.append({ 'aspect': aspect, 'negativity_pct': round(metrics['negativity_ratio'] * 100, 1), 'intent_severity': round(metrics['intent_severity'], 2), 'frequency': metrics['frequency'], 'priority_score': round(metrics['priority_score'], 2) }) # Handle empty DataFrame case if improvement_data: areas_of_improvement = pd.DataFrame(improvement_data).sort_values('priority_score', ascending=False) else: # Return empty DataFrame with correct columns areas_of_improvement = pd.DataFrame(columns=['aspect', 'negativity_pct', 'intent_severity', 'frequency', 'priority_score']) # Create Strength Anchors DataFrame (top positive aspects) strength_data = [] for aspect, metrics in aspect_metrics.items(): if metrics['positivity_ratio'] > 0.3: # Only include aspects with >30% positivity strength_data.append({ 'aspect': aspect, 'positivity_pct': round(metrics['positivity_ratio'] * 100, 1), 'intent_type': 'praise', # Simplified for now 'frequency': metrics['frequency'], 'strength_score': round(metrics['strength_score'], 2) }) # Handle empty DataFrame case if strength_data: strength_anchors = pd.DataFrame(strength_data).sort_values('strength_score', ascending=False) else: # Return empty DataFrame with correct columns strength_anchors = pd.DataFrame(columns=['aspect', 'positivity_pct', 'intent_type', 'frequency', 'strength_score']) return areas_of_improvement, strength_anchors @staticmethod def calculate_aspect_cooccurrence(df: pd.DataFrame) -> nx.Graph: """ Calculate aspect co-occurrence for network analysis. Args: df: Processed dataframe with aspects Returns: NetworkX graph with aspect co-occurrence data """ G = nx.Graph() # Calculate co-occurrence matrix cooccurrence_counts = defaultdict(int) aspect_sentiments = defaultdict(list) aspect_frequencies = defaultdict(int) for idx, row in df.iterrows(): aspects = row['aspects'] if isinstance(row['aspects'], list) else [] sentiments = row['aspect_sentiments'] if isinstance(row['aspect_sentiments'], list) else [] # Count individual aspects for aspect, sentiment in zip(aspects, sentiments): aspect_frequencies[aspect] += 1 aspect_sentiments[aspect].append(sentiment) # Count co-occurrences for aspect1, aspect2 in combinations(aspects, 2): pair = tuple(sorted([aspect1, aspect2])) cooccurrence_counts[pair] += 1 # Add nodes with attributes for aspect, frequency in aspect_frequencies.items(): sentiments = aspect_sentiments[aspect] positive_pct = sentiments.count('Positive') / len(sentiments) if sentiments else 0 negative_pct = sentiments.count('Negative') / len(sentiments) if sentiments else 0 # Determine overall sentiment color if positive_pct > negative_pct: color = 'green' sentiment_score = positive_pct elif negative_pct > positive_pct: color = 'red' sentiment_score = -negative_pct else: color = 'gray' sentiment_score = 0 G.add_node(aspect, frequency=frequency, sentiment_score=sentiment_score, color=color, positive_pct=positive_pct, negative_pct=negative_pct) # Add edges with weights for (aspect1, aspect2), count in cooccurrence_counts.items(): if count >= 2: # Only include co-occurrences that happen at least twice G.add_edge(aspect1, aspect2, weight=count) return G @staticmethod def detect_sentiment_spikes(df: pd.DataFrame, window_days: int = 7) -> List[Dict[str, Any]]: """ Detect week-over-week spikes in negative sentiment for aspects. Args: df: Processed dataframe with date and aspect information window_days: Number of days for the rolling window Returns: List of alerts for aspects with significant negative spikes """ alerts = [] if len(df) < 2: return alerts # Extract aspect-sentiment data with dates aspect_data = [] for idx, row in df.iterrows(): aspects = row['aspects'] if isinstance(row['aspects'], list) else [] sentiments = row['aspect_sentiments'] if isinstance(row['aspect_sentiments'], list) else [] date = row['date'] for aspect, sentiment in zip(aspects, sentiments): aspect_data.append({ 'aspect': aspect, 'sentiment': sentiment, 'date': date }) if not aspect_data: return alerts aspect_df = pd.DataFrame(aspect_data) aspect_df['date'] = pd.to_datetime(aspect_df['date']) # Group by aspect and analyze trends for aspect in aspect_df['aspect'].unique(): aspect_subset = aspect_df[aspect_df['aspect'] == aspect] if len(aspect_subset) < 4: # Need minimum data points continue # Create daily negative sentiment counts daily_negative = aspect_subset[aspect_subset['sentiment'] == 'Negative'].groupby( aspect_subset['date'].dt.date ).size().reindex( pd.date_range(aspect_subset['date'].min().date(), aspect_subset['date'].max().date()).date, fill_value=0 ) if len(daily_negative) >= window_days * 2: # Calculate rolling averages recent_avg = daily_negative.tail(window_days).mean() previous_avg = daily_negative.iloc[-(window_days*2):-window_days].mean() # Check for spike (>50% increase and at least 2 more complaints) if recent_avg > previous_avg * 1.5 and (recent_avg - previous_avg) >= 2: spike_magnitude = ((recent_avg - previous_avg) / previous_avg * 100) if previous_avg > 0 else 100 alerts.append({ 'aspect': aspect, 'spike_magnitude': round(spike_magnitude, 1), 'recent_avg_negative': round(recent_avg, 1), 'previous_avg_negative': round(previous_avg, 1), 'alert_severity': 'high' if spike_magnitude > 100 else 'medium' }) return sorted(alerts, key=lambda x: x['spike_magnitude'], reverse=True) class SummaryGenerator: """Generates macro and micro-level summaries for aspects and overall sentiment.""" @staticmethod def generate_macro_summary(df: pd.DataFrame, areas_of_improvement: pd.DataFrame, strength_anchors: pd.DataFrame) -> Dict[str, str]: """ Generate high-level summary of sentiment analysis. Args: df: Processed dataframe areas_of_improvement: Problem areas dataframe strength_anchors: Strength areas dataframe Returns: Dictionary with macro-level insights """ total_reviews = len(df) positive_pct = (df['overall_sentiment'] == 'Positive').mean() * 100 negative_pct = (df['overall_sentiment'] == 'Negative').mean() * 100 # Top issues and strengths top_issue = areas_of_improvement.iloc[0]['aspect'] if len(areas_of_improvement) > 0 else "None identified" top_strength = strength_anchors.iloc[0]['aspect'] if len(strength_anchors) > 0 else "None identified" # Intent distribution complaint_pct = (df['intent'] == 'complaint').mean() * 100 summary = { 'overall_sentiment': f"Out of {total_reviews} reviews, {positive_pct:.1f}% are positive and {negative_pct:.1f}% are negative.", 'top_issues': f"Primary concern: '{top_issue}' - requires immediate attention based on complaint frequency and severity.", 'top_strengths': f"Greatest strength: '{top_strength}' - leverage this positive aspect in marketing and product positioning.", 'intent_insights': f"{complaint_pct:.1f}% of reviews contain complaints, indicating specific areas for product improvement.", 'recommendation': "Focus on addressing top negative aspects while maintaining and promoting strengths." } return summary @staticmethod def generate_aspect_micro_summaries(df: pd.DataFrame, top_aspects: List[str], max_aspects: int = 5) -> Dict[str, str]: """ Generate detailed summaries for specific aspects. Args: df: Processed dataframe top_aspects: List of aspects to analyze max_aspects: Maximum number of aspects to summarize Returns: Dictionary with aspect-specific insights """ summaries = {} for aspect in top_aspects[:max_aspects]: # Filter reviews mentioning this aspect aspect_reviews = [] aspect_sentiments = [] for idx, row in df.iterrows(): aspects = row['aspects'] if isinstance(row['aspects'], list) else [] sentiments = row['aspect_sentiments'] if isinstance(row['aspect_sentiments'], list) else [] if aspect in aspects: aspect_idx = aspects.index(aspect) if aspect_idx < len(sentiments): aspect_reviews.append(row['translated_review']) aspect_sentiments.append(sentiments[aspect_idx]) if not aspect_reviews: continue positive_count = aspect_sentiments.count('Positive') negative_count = aspect_sentiments.count('Negative') total_count = len(aspect_sentiments) # Generate contextual summary if negative_count > positive_count: sentiment_trend = "predominantly negative" key_issues = "Issues include quality concerns, performance problems, and user dissatisfaction." elif positive_count > negative_count: sentiment_trend = "predominantly positive" key_issues = "Users appreciate the quality, performance, and overall experience." else: sentiment_trend = "mixed" key_issues = "Reviews show both satisfaction and areas for improvement." summaries[aspect] = f"'{aspect}' mentioned in {total_count} reviews with {sentiment_trend} sentiment ({positive_count} positive, {negative_count} negative). {key_issues}" return summaries class DataProcessor: """Enhanced main data processing pipeline coordinator.""" def __init__(self): self.translator = TranslationService() self.absa_processor = ABSAProcessor() self.validator = DataValidator() self.analytics = AspectAnalytics() self.summary_generator = SummaryGenerator() self.task_manager = None # Will be set by API def set_task_manager(self, task_manager): """Set task manager for cancellation support.""" self.task_manager = task_manager # Also set it for ABSA processor self.absa_processor.set_task_manager(task_manager) def process_uploaded_data(self, df: pd.DataFrame, task_id: Optional[str] = None) -> Dict[str, Any]: """ Complete enhanced processing pipeline for uploaded data with cancellation support. Args: df: Uploaded dataframe task_id: Optional task ID for cancellation tracking Returns: Dictionary containing all processed results with advanced analytics """ # Import gc for cleanup import gc # Check cancellation at start if task_id and self.task_manager and self.task_manager.is_cancelled(task_id): return {'status': 'cancelled', 'message': 'Task cancelled by user'} # Validate data if task_id and self.task_manager: self.task_manager.update_task(task_id, stage='validation', progress=5, status='processing') is_valid, errors = self.validator.validate_csv(df) if not is_valid: return {'error': errors} # Clean data df_clean = self.validator.clean_data(df) reviews = df_clean['review'].tolist() # Check cancellation before translation if task_id and self.task_manager and self.task_manager.is_cancelled(task_id): del df_clean, reviews gc.collect() return {'status': 'cancelled', 'message': 'Task cancelled during validation'} # Translation with batch processing and cancellation checks if task_id and self.task_manager: self.task_manager.update_task(task_id, stage='translation', progress=10) with st.spinner("Translating reviews..."): translated_reviews = [] detected_languages = [] # Process in batches to allow cancellation batch_size = 10 for i in range(0, len(reviews), batch_size): # Check cancellation if task_id and self.task_manager and self.task_manager.is_cancelled(task_id): del df_clean, reviews, translated_reviews, detected_languages gc.collect() return {'status': 'cancelled', 'message': 'Task cancelled during translation'} batch = reviews[i:i+batch_size] batch_translated, batch_langs = self.translator.process_reviews(batch) translated_reviews.extend(batch_translated) detected_languages.extend(batch_langs) # Update progress (10-40%) if task_id and self.task_manager: progress = 10 + int((i / len(reviews)) * 30) self.task_manager.update_task(task_id, progress=progress) # Check cancellation before intent classification if task_id and self.task_manager and self.task_manager.is_cancelled(task_id): del df_clean, reviews, translated_reviews, detected_languages gc.collect() return {'status': 'cancelled', 'message': 'Task cancelled after translation'} # Enhanced intent classification if task_id and self.task_manager: self.task_manager.update_task(task_id, stage='intent_classification', progress=40) with st.spinner("Classifying intents with severity analysis..."): intent_results = IntentClassifier.classify_batch_enhanced(translated_reviews) # Check cancellation before ABSA (most expensive step) if task_id and self.task_manager and self.task_manager.is_cancelled(task_id): del df_clean, reviews, translated_reviews, detected_languages, intent_results gc.collect() return {'status': 'cancelled', 'message': 'Task cancelled after intent classification'} # ABSA processing with cancellation support if task_id and self.task_manager: self.task_manager.update_task(task_id, stage='aspect_extraction', progress=50) with st.spinner("Extracting aspects and sentiments..."): absa_results = self.absa_processor.extract_aspects_and_sentiments( translated_reviews, task_id=task_id ) # Check if ABSA was cancelled if isinstance(absa_results, dict) and absa_results.get('status') == 'cancelled': del df_clean, reviews, translated_reviews, detected_languages, intent_results gc.collect() return absa_results # Combine results with enhanced structure if task_id and self.task_manager: self.task_manager.update_task(task_id, stage='combining_results', progress=90) df_processed = df_clean.copy() df_processed['translated_review'] = translated_reviews df_processed['detected_language'] = detected_languages df_processed['intent'] = [r['intent'] for r in intent_results] df_processed['intent_severity'] = [r['severity_type'] for r in intent_results] df_processed['intent_confidence'] = [r['confidence'] for r in intent_results] # Add ABSA results with enhanced structure aspects_list = [] aspect_sentiments_list = [] overall_sentiment = [] for result in absa_results: aspects_list.append(result['aspects']) aspect_sentiments_list.append(result['sentiments']) # Calculate overall sentiment if result['sentiments']: positive_count = result['sentiments'].count('Positive') negative_count = result['sentiments'].count('Negative') if positive_count > negative_count: overall_sentiment.append('Positive') elif negative_count > positive_count: overall_sentiment.append('Negative') else: overall_sentiment.append('Neutral') else: overall_sentiment.append('Neutral') df_processed['aspects'] = aspects_list df_processed['aspect_sentiments'] = aspect_sentiments_list df_processed['overall_sentiment'] = overall_sentiment # Advanced analytics if task_id and self.task_manager: self.task_manager.update_task(task_id, stage='analytics', progress=95) with st.spinner("Calculating aspect analytics and priority scores..."): areas_of_improvement, strength_anchors = self.analytics.calculate_aspect_scores(df_processed) aspect_network = self.analytics.calculate_aspect_cooccurrence(df_processed) sentiment_alerts = self.analytics.detect_sentiment_spikes(df_processed) # Generate summaries with st.spinner("Generating AI-powered summaries..."): macro_summary = self.summary_generator.generate_macro_summary( df_processed, areas_of_improvement, strength_anchors ) # Get top aspects for micro summaries top_negative_aspects = areas_of_improvement['aspect'].head(3).tolist() if len(areas_of_improvement) > 0 else [] top_positive_aspects = strength_anchors['aspect'].head(3).tolist() if len(strength_anchors) > 0 else [] top_aspects = top_negative_aspects + top_positive_aspects micro_summaries = self.summary_generator.generate_aspect_micro_summaries( df_processed, top_aspects ) # Mark task as complete if task_id and self.task_manager: self.task_manager.update_task(task_id, stage='completed', progress=100) # ========== NEW: ASPECT-LEVEL DATA TRANSFORMATION ========== aspect_level_data = [] mixed_sentiment_reviews = [] for idx, row in df_processed.iterrows(): aspects = row['aspects'] if isinstance(row['aspects'], list) else [] aspect_sentiments = row['aspect_sentiments'] if isinstance(row['aspect_sentiments'], list) else [] # Check for mixed sentiments (conflicting aspect sentiments) unique_sentiments = set(aspect_sentiments) is_mixed = ('Positive' in unique_sentiments and 'Negative' in unique_sentiments) if is_mixed: mixed_sentiment_reviews.append({ 'review_id': row['id'], 'review': row['review'], 'aspects': aspects, 'aspect_sentiments': aspect_sentiments, 'intent': row['intent'], 'date': row['date'] }) # Create aspect-level records for aspect, sentiment in zip(aspects, aspect_sentiments): aspect_level_data.append({ 'review_id': row['id'], 'review': row['review'], 'aspect': aspect, 'aspect_sentiment': sentiment, 'overall_sentiment': row['overall_sentiment'], 'intent': row['intent'], 'intent_severity': row['intent_severity'], 'date': row['date'], 'language': row['detected_language'] }) aspect_level_df = pd.DataFrame(aspect_level_data) if aspect_level_data else pd.DataFrame() mixed_sentiment_df = pd.DataFrame(mixed_sentiment_reviews) if mixed_sentiment_reviews else pd.DataFrame() return { 'processed_data': df_processed, 'aspect_level_data': aspect_level_df, # NEW: Aspect-level granular data 'mixed_sentiment_reviews': mixed_sentiment_df, # NEW: Mixed sentiment detection 'absa_details': absa_results, 'areas_of_improvement': areas_of_improvement, 'strength_anchors': strength_anchors, 'aspect_network': aspect_network, 'sentiment_alerts': sentiment_alerts, 'macro_summary': macro_summary, 'micro_summaries': micro_summaries, 'summary': { 'total_reviews': len(df_processed), 'total_aspects': len(aspect_level_df), 'mixed_sentiment_count': len(mixed_sentiment_df), 'mixed_sentiment_pct': round(len(mixed_sentiment_df) / len(df_processed) * 100, 1) if len(df_processed) > 0 else 0, 'languages_detected': list(set(detected_languages)), 'intents_distribution': pd.Series([r['intent'] for r in intent_results]).value_counts().to_dict(), 'sentiment_distribution': pd.Series(overall_sentiment).value_counts().to_dict(), 'top_problem_areas': len(areas_of_improvement), 'top_strength_anchors': len(strength_anchors), 'active_alerts': len(sentiment_alerts) } }