ABSA / src /utils /data_processor.py
parthnuwal7's picture
Fix PyABSA v2.4.2 API
2e2ec5c
"""
Enhanced data processing pipeline for advanced sentiment analysis application.
Handles translation, ABSA, intent classification, priority scoring, and co-occurrence analysis.
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Any, Optional
import re
from datetime import datetime, timedelta
import logging
from langdetect import detect
import streamlit as st
from collections import Counter, defaultdict
from itertools import combinations
import networkx as nx
import requests
import os
import time
# Silence GitPython's start-up refresh warning; the variable is set before
# PyABSA is imported (that import happens later, inside ABSAProcessor).
os.environ['GIT_PYTHON_REFRESH'] = 'quiet'
# Configure root logging at INFO and create the logger shared by every class
# in this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DataValidator:
    """Validates uploaded CSV data format and content."""

    # Columns every uploaded CSV must provide.
    REQUIRED_COLUMNS = ['id', 'reviews_title', 'review', 'date', 'user_id']

    @staticmethod
    def validate_csv(df: pd.DataFrame) -> Tuple[bool, List[str]]:
        """
        Validate CSV format and content.

        Args:
            df: Uploaded dataframe

        Returns:
            Tuple of (is_valid, error_messages)
        """
        errors = []

        # Check required columns. The content checks below index into those
        # columns, so stop here when any of them is missing.
        missing_cols = set(DataValidator.REQUIRED_COLUMNS) - set(df.columns)
        if missing_cols:
            errors.append(f"Missing required columns: {missing_cols}")
            return False, errors

        # Check for empty values
        if df['review'].isnull().any() or (df['review'] == '').any():
            errors.append("Found empty review entries")

        # Validate date format. The *parsed* result must be inspected:
        # errors='coerce' turns unparseable values into NaT, and the original
        # column would only reveal values that were already missing.
        try:
            parsed_dates = pd.to_datetime(df['date'], errors='coerce')
        except Exception as e:
            errors.append(f"Date validation error: {str(e)}")
        else:
            if parsed_dates.isnull().any():
                errors.append("Invalid date format detected")

        return len(errors) == 0, errors

    @staticmethod
    def clean_data(df: pd.DataFrame) -> pd.DataFrame:
        """
        Clean and preprocess the data.

        Args:
            df: Validated dataframe containing the required columns

        Returns:
            A new dataframe with parsed dates, stripped text, rows with
            missing reviews removed, duplicate reviews removed (first
            occurrence kept) and a fresh 0..n-1 index. The input is not
            modified.
        """
        # Remove rows with null reviews first: astype(str) would otherwise
        # turn NaN/None into the strings 'nan'/'None', which dropna can no
        # longer recognise as missing.
        df = df.dropna(subset=['review']).copy()

        # Convert date column (unparseable values become NaT)
        df['date'] = pd.to_datetime(df['date'], errors='coerce')

        # Clean text data; a missing title becomes '' rather than 'nan'
        df['review'] = df['review'].astype(str).str.strip()
        df['reviews_title'] = df['reviews_title'].fillna('').astype(str).str.strip()

        # Remove duplicate reviews
        df = df.drop_duplicates(subset=['review'], keep='first')
        return df.reset_index(drop=True)
class TranslationService:
    """Handles translation using HF Inference API - much more reliable than local models."""

    def __init__(self):
        """Read the HF token and record the translation model settings."""
        self.api_token = self._get_hf_token()
        # Using AI4Bharat IndicTrans2 - specialized for Indian languages
        # NOTE(review): this model id names the en->indic direction, while the
        # service is used to translate Hindi *to* English - confirm the id.
        self.translation_model = "ai4bharat/indictrans2-en-indic-1.3B"
        # Not referenced by the request code below, which builds its own
        # api-inference URL; kept so existing attribute readers keep working.
        self.base_url = "https://router.huggingface.co"
        logger.info("Initialized HF Inference API for translation (IndicTrans2)")

    def _get_hf_token(self) -> Optional[str]:
        """
        Get HF token from Streamlit secrets or the HF_TOKEN environment variable.

        Returns:
            The token, or None when neither source provides one.
        """
        try:
            return st.secrets["HF_TOKEN"]
        except Exception:
            # Best effort: a missing secrets file or key simply means we fall
            # back to the environment. (Was a bare `except:`, which would also
            # have swallowed KeyboardInterrupt/SystemExit.)
            pass

        token = os.getenv("HF_TOKEN")
        if not token:
            logger.warning("No HF_TOKEN found. Translation will be limited.")
        else:
            # Never write any part of the credential to the logs.
            logger.info("HF_TOKEN loaded successfully")
        return token

    def _call_hf_translation_api(self, text: str, source_lang: str = "hi", target_lang: str = "en") -> str:
        """
        Call HF Translation API with fallback.

        Args:
            text: Text to translate
            source_lang: Source language code (accepted for interface
                compatibility; not sent with the request)
            target_lang: Target language code (accepted for interface
                compatibility; not sent with the request)

        Returns:
            The translated text, or the original text when no token is
            available, the API does not answer with a usable result, or the
            request raises.
        """
        if not self.api_token:
            logger.debug("No API token, skipping translation")
            return text

        try:
            headers = {"Authorization": f"Bearer {self.api_token}"}
            # Try using serverless inference endpoint
            url = f"https://api-inference.huggingface.co/models/{self.translation_model}"
            # IndicTrans2 requires simple input format
            payload = {"inputs": text}
            response = requests.post(url, headers=headers, json=payload, timeout=10)

            if response.status_code == 200:
                result = response.json()
                # Accepted shapes: [{"translation_text": "..."}] or {"generated_text": "..."}
                if isinstance(result, list) and len(result) > 0:
                    translated = result[0].get("translation_text", "") or result[0].get("generated_text", "")
                    if translated:
                        return translated
                elif isinstance(result, dict):
                    translated = result.get("generated_text", "") or result.get("translation_text", "")
                    if translated:
                        return translated

            # Silently fallback to original text (translation is optional)
            logger.debug(f"Translation unavailable, using original text (status: {response.status_code})")
            return text
        except Exception as e:
            # Translation is optional, so any failure degrades to the input.
            logger.debug(f"Translation skipped: {str(e)}")
            return text

    def detect_language(self, text: str) -> str:
        """
        Detect language of the text.

        Returns:
            The language code reported by langdetect, or 'unknown' when
            detection fails for any reason.
        """
        try:
            return detect(text)
        except Exception:
            return 'unknown'

    def translate_to_english(self, text: str, source_lang: str = 'hi') -> str:
        """
        Translate text to English using HF API.

        Args:
            text: Text to translate
            source_lang: Source language code

        Returns:
            Translated text (the input itself for 'en' and 'unknown')
        """
        if source_lang in ('en', 'unknown'):
            return text
        return self._call_hf_translation_api(text, source_lang, "en")

    def process_reviews(self, reviews: List[str]) -> Tuple[List[str], List[str]]:
        """
        Process list of reviews for translation.

        Args:
            reviews: List of review texts

        Returns:
            Tuple of (translated_reviews, detected_languages)
        """
        translated_reviews = []
        detected_languages = []
        total = len(reviews)

        for i, review in enumerate(reviews):
            if i % 20 == 0:  # Progress logging
                logger.info(f"Processing translation {i+1}/{total}")

            lang = self.detect_language(review)
            detected_languages.append(lang)

            # Only Hindi is translated; every other language is kept as-is.
            if lang == 'hi':
                translated_reviews.append(self.translate_to_english(review, 'hi'))
            else:
                translated_reviews.append(review)

        return translated_reviews, detected_languages
class ABSAProcessor:
    """Enhanced ABSA using PyABSA for accurate aspect extraction and sentiment analysis."""

    # Settings read by the HF Inference API helpers (_call_hf_api,
    # _get_hf_sentiment). Nothing in this class assigns them, so they default
    # to None here; with these defaults the helpers degrade to the rule-based
    # path instead of raising AttributeError.
    api_token: Optional[str] = None
    base_url: Optional[str] = None
    sentiment_model: Optional[str] = None

    def __init__(self):
        """Load the PyABSA model (falling back to rule-based extraction on failure)."""
        self.model = None
        self.task_manager = None
        self._load_pyabsa_model()
        logger.info("Initialized PyABSA for ABSA processing")

    def _load_pyabsa_model(self):
        """
        Load PyABSA multilingual model with caching.

        On any failure the error is logged and self.model is left as None, in
        which case extraction uses the rule-based fallback.
        """
        try:
            # Suppress additional git warnings
            import warnings
            warnings.filterwarnings('ignore', category=DeprecationWarning)

            # PyABSA v2.4.2 correct API - use AspectTermExtraction module
            from pyabsa import AspectTermExtraction as ATEPC

            # Check if git is available (diagnostic logging only)
            try:
                import subprocess
                git_check = subprocess.run(['git', '--version'], capture_output=True, timeout=5)
                if git_check.returncode == 0:
                    logger.info(f"Git available: {git_check.stdout.decode().strip()}")
                else:
                    logger.warning("Git command failed, PyABSA may have issues downloading checkpoints")
            except Exception as git_err:
                logger.warning(f"Git not found or not executable: {str(git_err)}")

            # Load aspect extractor using correct PyABSA v2.4.2 API
            logger.info("Loading PyABSA ATEPC multilingual model...")
            logger.info("This may take a few minutes on first run (downloading checkpoint)...")
            # Use AspectExtractor class - the correct way in v2.4.2
            self.model = ATEPC.AspectExtractor('multilingual')
            logger.info("✅ PyABSA model loaded successfully")
        except Exception as e:
            import traceback
            logger.error(f"❌ Failed to load PyABSA model: {str(e)}")
            logger.error(f"Traceback: {traceback.format_exc()}")
            logger.warning("⚠️ Using fallback method for aspect extraction")
            self.model = None

    def set_task_manager(self, task_manager):
        """Set task manager for cancellation support."""
        self.task_manager = task_manager

    def _get_hf_token(self) -> Optional[str]:
        """Get HF token from Streamlit secrets or the HF_TOKEN environment variable."""
        # Try Streamlit secrets first
        try:
            return st.secrets["HF_TOKEN"]
        except Exception:
            # Best effort: fall through to the environment. (Was a bare
            # `except:`, which would also swallow KeyboardInterrupt.)
            pass

        # Try environment variable
        token = os.getenv("HF_TOKEN")
        if not token:
            logger.warning("No HF_TOKEN found. Some features may be limited.")
        return token

    def _call_hf_api(self, model_name: str, inputs: str, max_retries: int = 3) -> Dict:
        """
        Call HF Inference API with retry logic.

        Args:
            model_name: Model path appended to self.base_url
            inputs: Text sent as the "inputs" payload field
            max_retries: Maximum number of attempts

        Returns:
            The decoded JSON response, or {"error": "..."} when the endpoint
            is not configured, a request error persists through the last
            attempt, or every attempt answered 503.
        """
        if not self.base_url:
            return {"error": "HF Inference API base URL is not configured"}

        headers = {}
        if self.api_token:
            headers["Authorization"] = f"Bearer {self.api_token}"
        url = f"{self.base_url}/{model_name}"
        payload = {"inputs": inputs}

        for attempt in range(max_retries):
            try:
                response = requests.post(url, headers=headers, json=payload, timeout=30)
                if response.status_code == 503:
                    # Model is loading, wait and retry
                    wait_time = 2 ** attempt  # Exponential backoff
                    logger.info(f"Model loading, waiting {wait_time}s before retry...")
                    time.sleep(wait_time)
                    continue
                response.raise_for_status()
                return response.json()
            except requests.exceptions.RequestException as e:
                logger.error(f"API call failed (attempt {attempt + 1}): {str(e)}")
                if attempt == max_retries - 1:
                    return {"error": str(e)}
                time.sleep(1)

        return {"error": "Max retries exceeded"}

    def extract_aspects_and_sentiments(self, reviews: List[str], task_id: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Extract aspects and sentiments using PyABSA with fallback and cancellation support.

        Args:
            reviews: List of review texts (preferably in English after translation)
            task_id: Optional task ID for cancellation tracking

        Returns:
            List of dictionaries containing extracted information. When the
            task is cancelled, a single dict {'status': 'cancelled', ...} is
            returned instead of a list, so callers must check the type.
        """
        import gc

        total = len(reviews)
        logger.info(f"Processing {total} reviews with PyABSA")
        processed_results = []
        batch_size = 5  # Process 5 reviews at a time for responsive cancellation

        for batch_start in range(0, total, batch_size):
            # Check cancellation before each batch
            if task_id and self.task_manager and self.task_manager.is_cancelled(task_id):
                logger.info(f"ABSA processing cancelled at review {batch_start}/{total}")
                del processed_results
                gc.collect()
                return {'status': 'cancelled', 'message': 'ABSA processing cancelled by user'}

            batch_end = min(batch_start + batch_size, total)

            # Process batch
            for review_idx in range(batch_start, batch_end):
                review = reviews[review_idx]
                if review_idx % 10 == 0:  # Progress logging
                    logger.info(f"Processing review {review_idx+1}/{total}")

                # Try PyABSA first, fallback to rule-based if unavailable
                if self.model is not None:
                    try:
                        result = self._extract_with_pyabsa(review)
                    except Exception as e:
                        logger.warning(f"PyABSA failed for review {review_idx}: {str(e)}, using fallback")
                        result = self._extract_with_fallback(review)
                else:
                    result = self._extract_with_fallback(review)
                processed_results.append(result)

            # Update progress after each batch (50-90% range)
            if task_id and self.task_manager:
                progress = 50 + int((batch_end / total) * 40)
                self.task_manager.update_task(task_id, progress=progress)

        logger.info(f"Successfully processed {len(processed_results)} reviews")
        return processed_results

    def _extract_with_pyabsa(self, review: str) -> Dict[str, Any]:
        """
        Extract aspects and sentiments using PyABSA model.

        Falls back to the rule-based extractor when the model reports no
        aspects for the review.
        """
        result = self.model.predict(review, print_result=False, save_result=False)

        # Fields read from the prediction: aspect, sentiment, confidence, position
        aspects = result.get('aspect', [])
        sentiments = result.get('sentiment', [])
        positions = result.get('position', [])
        confidence_scores = result.get('confidence', [])

        # Handle single aspect case: wrap scalar values into one-element lists
        if not isinstance(aspects, list):
            aspects = [aspects] if aspects else []
            sentiments = [sentiments] if sentiments else []
            positions = [positions] if positions else []
            confidence_scores = [confidence_scores] if confidence_scores else []

        # If no aspects found, use fallback
        if not aspects:
            return self._extract_with_fallback(review)

        tokens = review.split()
        return {
            'sentence': review,
            'aspects': aspects,
            'sentiments': sentiments,
            'positions': positions,
            'confidence_scores': confidence_scores,
            'tokens': tokens,
            'iob_tags': ['O'] * len(tokens)
        }

    def _extract_with_fallback(self, review: str) -> Dict[str, Any]:
        """
        Fallback rule-based extraction when PyABSA is unavailable.

        Every keyword-matched aspect receives the review-level sentiment, a
        position spanning the whole review and a fixed confidence of 0.7.
        """
        sentiment = self._get_rule_based_sentiment(review)
        aspects = self._extract_simple_aspects(review)
        tokens = review.split()
        return {
            'sentence': review,
            'aspects': aspects,
            'sentiments': [sentiment] * len(aspects),
            # One independent list per aspect: `[[...]] * n` would make every
            # entry the same object, so editing one would edit all of them.
            'positions': [[0, len(review)] for _ in aspects],
            'confidence_scores': [0.7] * len(aspects),  # Lower confidence for rule-based
            'tokens': tokens,
            'iob_tags': ['O'] * len(tokens)
        }

    def _get_hf_sentiment(self, text: str) -> str:
        """
        Get sentiment from HF Inference API with fallback.

        Returns:
            'Positive', 'Negative' or 'Neutral'. The rule-based classifier is
            used when no token or sentiment model is configured, when the API
            reports an error, or when its answer cannot be parsed.
        """
        if not self.api_token or not self.sentiment_model:
            # Fallback to rule-based if the API is not configured
            return self._get_rule_based_sentiment(text)

        try:
            result = self._call_hf_api(self.sentiment_model, text)
            if "error" in result:
                logger.warning(f"API error, using rule-based fallback: {result['error']}")
                return self._get_rule_based_sentiment(text)

            # Parse HF sentiment result: expected as [[{label, score}, ...]]
            if isinstance(result, list) and len(result) > 0:
                predictions = result[0]
                if isinstance(predictions, list) and len(predictions) > 0:
                    top_prediction = max(predictions, key=lambda x: x.get('score', 0))
                    label = top_prediction.get('label', 'NEUTRAL').upper()
                    # Map HF labels to our format
                    if 'POSITIVE' in label or 'POS' in label:
                        return 'Positive'
                    elif 'NEGATIVE' in label or 'NEG' in label:
                        return 'Negative'
                    else:
                        return 'Neutral'

            # Fallback if parsing fails
            return self._get_rule_based_sentiment(text)
        except Exception as e:
            logger.error(f"HF API error: {str(e)}, using rule-based fallback")
            return self._get_rule_based_sentiment(text)

    def _get_rule_based_sentiment(self, review: str) -> str:
        """
        Fallback rule-based sentiment analysis with enhanced negative detection.

        Keywords are matched as substrings of the lower-cased review, so a
        keyword also matches inside longer words.
        """
        review_lower = review.lower()

        # Enhanced sentiment words
        positive_words = ['good', 'great', 'excellent', 'amazing', 'love', 'best', 'awesome',
                          'fantastic', 'wonderful', 'perfect', 'satisfied', 'happy', 'pleased',
                          'outstanding', 'brilliant', 'superb', 'delighted', 'impressed', 'working',
                          'अच्छा', 'बढ़िया', 'शानदार', 'बेहतरीन']
        negative_words = ['bad', 'terrible', 'awful', 'hate', 'worst', 'horrible', 'poor',
                          'disappointing', 'frustrated', 'angry', 'broken', 'failed', 'useless',
                          'pathetic', 'disgusting', 'annoying', 'waste', 'regret', 'problem', 'issue',
                          'not working', 'doesn\'t work', 'never', 'delayed', 'late', 'slow', 'error',
                          'खराब', 'समस्या', 'देर', 'नहीं', 'बुरा']
        # Strong negative phrases (count as 2 points)
        negative_phrases = ['too late', 'never comes', 'not received', 'doesn\'t arrive',
                            'delayed', 'not working', 'बहुत देर', 'नहीं आता']

        pos_count = sum(1 for word in positive_words if word in review_lower)
        neg_count = sum(1 for word in negative_words if word in review_lower)
        # Check for negative phrases (stronger signal)
        neg_count += 2 * sum(1 for phrase in negative_phrases if phrase in review_lower)

        if pos_count > neg_count:
            return 'Positive'
        elif neg_count > pos_count:
            return 'Negative'
        else:
            return 'Neutral'

    def _extract_simple_aspects(self, review: str) -> List[str]:
        """
        Extract aspects using enhanced keyword matching.

        Returns:
            Names of all aspect categories with at least one keyword occurring
            as a substring of the lower-cased review, in table order, or
            ['General'] when nothing matches.
        """
        review_lower = review.lower()

        # Enhanced aspect keywords with Hindi/English variants
        aspect_keywords = {
            'OTP/Verification': ['otp', 'atp', 'verification', 'verify', 'code', 'pin', 'authentication', 'ओटीपी', 'कोड', 'सत्यापन'],
            'Login/Account': ['login', 'sign in', 'signin', 'account', 'password', 'username', 'register', 'signup', 'लॉगिन', 'खाता'],
            'App Performance': ['app', 'application', 'crash', 'freeze', 'hang', 'loading', 'lag', 'slow', 'एप', 'एप्लिकेशन'],
            'Payment': ['payment', 'pay', 'transaction', 'refund', 'money', 'bank', 'upi', 'wallet', 'भुगतान', 'पैसा'],
            'Quality': ['quality', 'build', 'material', 'construction', 'durability', 'solid', 'sturdy', 'cheap', 'flimsy', 'गुणवत्ता'],
            'Price': ['price', 'cost', 'expensive', 'cheap', 'value', 'money', 'affordable', 'budget', 'worth', 'कीमत', 'दाम'],
            'Service': ['service', 'support', 'help', 'staff', 'customer', 'response', 'team', 'representative', 'सेवा', 'सहायता'],
            'Delivery': ['delivery', 'shipping', 'fast', 'quick', 'slow', 'delayed', 'arrive', 'package', 'डिलीवरी', 'शिपिंग'],
            'Design': ['design', 'look', 'appearance', 'beautiful', 'ugly', 'style', 'color', 'aesthetic', 'डिज़ाइन', 'रूप'],
            'Performance': ['performance', 'speed', 'fast', 'slow', 'efficiency', 'works', 'function', 'smooth', 'प्रदर्शन'],
            'Usability': ['easy', 'difficult', 'user', 'interface', 'intuitive', 'complex', 'simple', 'confusing', 'उपयोग'],
            'Features': ['feature', 'function', 'capability', 'option', 'setting', 'mode', 'tool', 'फीचर', 'सुविधा'],
            'Size': ['size', 'big', 'small', 'large', 'compact', 'tiny', 'huge', 'dimension', 'आकार'],
            'Battery': ['battery', 'charge', 'power', 'energy', 'last', 'drain', 'life', 'बैटरी', 'चार्ज']
        }

        aspects = [
            aspect
            for aspect, keywords in aspect_keywords.items()
            if any(keyword in review_lower for keyword in keywords)
        ]

        # Default aspect if none found
        return aspects if aspects else ['General']
class IntentClassifier:
    """Keyword-driven intent classifier that also grades complaints and praise."""

    # Complaint and praise map a graded level to its keywords; every other
    # intent maps straight to a flat keyword list. Order matters: ties between
    # levels and the scan over the flat intents both follow insertion order.
    INTENT_KEYWORDS = {
        'complaint': {
            'high_severity': ['terrible', 'worst', 'horrible', 'awful', 'hate', 'useless', 'trash', 'garbage', 'waste', 'pathetic'],
            'medium_severity': ['bad', 'disappointed', 'frustrated', 'annoying', 'poor', 'defective', 'broken', 'failed'],
            'low_severity': ['problem', 'issue', 'concern', 'slow', 'okay but', 'could be better']
        },
        'praise': {
            'high_positive': ['excellent', 'amazing', 'fantastic', 'wonderful', 'perfect', 'outstanding', 'brilliant', 'superb'],
            'medium_positive': ['good', 'great', 'love', 'best', 'awesome', 'nice', 'satisfied', 'happy'],
            'low_positive': ['fine', 'decent', 'acceptable', 'adequate', 'reasonable']
        },
        'question': ['how', 'what', 'when', 'where', 'why', 'which', 'who', '?', 'can you', 'could you', 'is it possible'],
        'suggestion': ['should', 'could', 'would', 'recommend', 'suggest', 'improve', 'better', 'enhancement', 'feature request'],
        'comparison': ['better than', 'worse than', 'compared to', 'versus', 'vs', 'similar to', 'different from'],
        'neutral': ['okay', 'fine', 'average', 'normal', 'standard', 'typical', 'nothing special']
    }

    @staticmethod
    def _hits(text: str, keywords: List[str]) -> int:
        """Count how many of the keywords occur as substrings of text."""
        return sum(keyword in text for keyword in keywords)

    @classmethod
    def _strongest_level(cls, category: str, text: str) -> Tuple[Optional[str], int]:
        """Return (level, hits) for the best-matching level of a graded category.

        The first level (in table order) with the highest hit count wins;
        (None, 0) means no keyword of the category matched.
        """
        winner, winner_hits = None, 0
        for level, keywords in cls.INTENT_KEYWORDS[category].items():
            hits = cls._hits(text, keywords)
            if hits > winner_hits:
                winner, winner_hits = level, hits
        return winner, winner_hits

    @classmethod
    def classify_intent_with_severity(cls, review: str) -> Tuple[str, str, float]:
        """
        Determine the intent of a review together with its severity/type.

        Complaints take precedence over praise, which takes precedence over
        the remaining intents (checked in table order).

        Args:
            review: Review text

        Returns:
            Tuple of (intent, severity/type, confidence_score)
        """
        text = review.lower()

        # Graded intents: confidence saturates at three keyword hits.
        for graded_intent in ('complaint', 'praise'):
            level, hits = cls._strongest_level(graded_intent, text)
            if level is not None:
                return graded_intent, level, min(hits / 3.0, 1.0)

        # Flat intents: the first one with any hit wins; two hits saturate.
        for intent, keywords in cls.INTENT_KEYWORDS.items():
            if intent in ('complaint', 'praise'):
                continue
            hits = cls._hits(text, keywords)
            if hits > 0:
                return intent, 'standard', min(hits / 2.0, 1.0)

        return 'neutral', 'standard', 0.1

    @classmethod
    def classify_batch_enhanced(cls, reviews: List[str]) -> List[Dict[str, Any]]:
        """Classify every review and return one result dict per review."""
        classified = []
        for text in reviews:
            label, level, score = cls.classify_intent_with_severity(text)
            classified.append({'intent': label, 'severity_type': level, 'confidence': score})
        return classified
class AspectAnalytics:
    """Advanced analytics for aspect analysis including priority scoring and co-occurrence."""

    # Output schemas, shared by the empty and non-empty result paths so that
    # callers always see the same columns.
    _IMPROVEMENT_COLUMNS = ['aspect', 'negativity_pct', 'intent_severity', 'frequency', 'priority_score']
    _STRENGTH_COLUMNS = ['aspect', 'positivity_pct', 'intent_type', 'frequency', 'strength_score']
    # Numeric weight of each complaint severity label.
    _SEVERITY_WEIGHTS = {'high_severity': 3, 'medium_severity': 2, 'low_severity': 1, 'standard': 1}

    @staticmethod
    def _row_lists(row) -> Tuple[list, list]:
        """Return (aspects, sentiments) of a row, using [] for any non-list cell."""
        aspects = row['aspects'] if isinstance(row['aspects'], list) else []
        sentiments = row['aspect_sentiments'] if isinstance(row['aspect_sentiments'], list) else []
        return aspects, sentiments

    @staticmethod
    def calculate_aspect_scores(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Calculate priority scores for negative aspects and strength scores for positive aspects.

        Args:
            df: Processed dataframe with 'aspects', 'aspect_sentiments' and
                'intent' columns; 'intent_severity' is optional and defaults
                to 'standard'

        Returns:
            Tuple of (areas_of_improvement_df, strength_anchors_df), each
            sorted by its score in descending order. Both frames always carry
            their full column set, also when they are empty.
        """
        # Extract aspect-sentiment pairs with intent information
        records = []
        for _, row in df.iterrows():
            aspects, sentiments = AspectAnalytics._row_lists(row)
            for aspect, sentiment in zip(aspects, sentiments):
                records.append({
                    'aspect': aspect,
                    'sentiment': sentiment,
                    'intent': row['intent'],
                    'intent_severity': row.get('intent_severity', 'standard'),
                })

        if not records:
            # Two separate frames with the same schema as the non-empty case.
            return (pd.DataFrame(columns=AspectAnalytics._IMPROVEMENT_COLUMNS),
                    pd.DataFrame(columns=AspectAnalytics._STRENGTH_COLUMNS))

        aspect_df = pd.DataFrame(records)
        improvement_data = []
        strength_data = []

        # sort=False keeps aspects in order of first appearance.
        for aspect, group in aspect_df.groupby('aspect', sort=False):
            total_count = len(group)
            positivity_ratio = int((group['sentiment'] == 'Positive').sum()) / total_count
            negativity_ratio = int((group['sentiment'] == 'Negative').sum()) / total_count

            # Average severity weight over this aspect's complaint rows.
            # Labels missing from the table count as 'standard' (1) so that
            # the mean can never turn into NaN.
            complaint_levels = group.loc[group['intent'] == 'complaint', 'intent_severity']
            severity_weight = 0
            if len(complaint_levels) > 0:
                severity_weight = complaint_levels.map(AspectAnalytics._SEVERITY_WEIGHTS).fillna(1).mean()

            priority_score = negativity_ratio * total_count * (1 + severity_weight)
            strength_score = positivity_ratio * total_count * (1 + (positivity_ratio * 2))

            if negativity_ratio > 0.1:  # Only include aspects with >10% negativity
                improvement_data.append({
                    'aspect': aspect,
                    'negativity_pct': round(negativity_ratio * 100, 1),
                    'intent_severity': round(severity_weight, 2),
                    'frequency': total_count,
                    'priority_score': round(priority_score, 2)
                })
            if positivity_ratio > 0.3:  # Only include aspects with >30% positivity
                strength_data.append({
                    'aspect': aspect,
                    'positivity_pct': round(positivity_ratio * 100, 1),
                    'intent_type': 'praise',  # Simplified for now
                    'frequency': total_count,
                    'strength_score': round(strength_score, 2)
                })

        if improvement_data:
            areas_of_improvement = pd.DataFrame(improvement_data).sort_values('priority_score', ascending=False)
        else:
            areas_of_improvement = pd.DataFrame(columns=AspectAnalytics._IMPROVEMENT_COLUMNS)

        if strength_data:
            strength_anchors = pd.DataFrame(strength_data).sort_values('strength_score', ascending=False)
        else:
            strength_anchors = pd.DataFrame(columns=AspectAnalytics._STRENGTH_COLUMNS)

        return areas_of_improvement, strength_anchors

    @staticmethod
    def calculate_aspect_cooccurrence(df: pd.DataFrame) -> "nx.Graph":
        """
        Calculate aspect co-occurrence for network analysis.

        Args:
            df: Processed dataframe with 'aspects' and 'aspect_sentiments'

        Returns:
            NetworkX graph. Nodes are aspects with the attributes frequency,
            sentiment_score, color, positive_pct and negative_pct; edges join
            aspects that appear together in at least two reviews, with the
            number of such reviews as weight.
        """
        G = nx.Graph()
        cooccurrence_counts = defaultdict(int)
        aspect_sentiments = defaultdict(list)

        for _, row in df.iterrows():
            aspects, sentiments = AspectAnalytics._row_lists(row)

            # Collect the sentiments seen for each aspect
            for aspect, sentiment in zip(aspects, sentiments):
                aspect_sentiments[aspect].append(sentiment)

            # Count co-occurrences once per review. De-duplicating first
            # prevents self-loops and double counting when a review lists the
            # same aspect more than once.
            for pair in combinations(sorted(set(aspects)), 2):
                cooccurrence_counts[pair] += 1

        # Add nodes with attributes
        for aspect, sentiments in aspect_sentiments.items():
            frequency = len(sentiments)
            positive_pct = sentiments.count('Positive') / frequency if sentiments else 0
            negative_pct = sentiments.count('Negative') / frequency if sentiments else 0

            # Determine overall sentiment color
            if positive_pct > negative_pct:
                color = 'green'
                sentiment_score = positive_pct
            elif negative_pct > positive_pct:
                color = 'red'
                sentiment_score = -negative_pct
            else:
                color = 'gray'
                sentiment_score = 0

            G.add_node(aspect,
                       frequency=frequency,
                       sentiment_score=sentiment_score,
                       color=color,
                       positive_pct=positive_pct,
                       negative_pct=negative_pct)

        # Add edges with weights
        for (aspect1, aspect2), count in cooccurrence_counts.items():
            if count >= 2:  # Only include co-occurrences that happen at least twice
                G.add_edge(aspect1, aspect2, weight=count)

        return G

    @staticmethod
    def detect_sentiment_spikes(df: pd.DataFrame, window_days: int = 7) -> List[Dict[str, Any]]:
        """
        Detect week-over-week spikes in negative sentiment for aspects.

        For each aspect the mean daily count of negative mentions in the last
        `window_days` days is compared with the mean of the `window_days` days
        before that.

        Args:
            df: Processed dataframe with 'date', 'aspects' and 'aspect_sentiments'
            window_days: Number of days in each of the two compared windows

        Returns:
            List of alerts for aspects with significant negative spikes,
            sorted by spike magnitude in descending order
        """
        alerts = []
        if len(df) < 2:
            return alerts

        # Extract aspect-sentiment data with dates
        records = []
        for _, row in df.iterrows():
            aspects, sentiments = AspectAnalytics._row_lists(row)
            for aspect, sentiment in zip(aspects, sentiments):
                records.append({'aspect': aspect, 'sentiment': sentiment, 'date': row['date']})

        if not records:
            return alerts

        aspect_df = pd.DataFrame(records)
        aspect_df['date'] = pd.to_datetime(aspect_df['date'])

        # Group by aspect and analyze trends
        for aspect in aspect_df['aspect'].unique():
            aspect_subset = aspect_df[aspect_df['aspect'] == aspect]
            if len(aspect_subset) < 4:  # Need minimum data points
                continue

            # Rows without a usable date cannot be placed on the calendar; an
            # aspect with no dated rows at all would otherwise crash date_range.
            dated = aspect_subset.dropna(subset=['date'])
            if dated.empty:
                continue

            # Daily negative counts over every calendar day between the first
            # and last mention, with 0 for days without negative mentions.
            calendar = pd.date_range(dated['date'].min().date(),
                                     dated['date'].max().date()).date
            negatives = dated[dated['sentiment'] == 'Negative']
            daily_negative = negatives.groupby(negatives['date'].dt.date).size().reindex(
                calendar, fill_value=0
            )

            if len(daily_negative) < window_days * 2:
                continue

            recent_avg = daily_negative.tail(window_days).mean()
            previous_avg = daily_negative.iloc[-(window_days * 2):-window_days].mean()

            # Check for spike (>50% increase and at least 2 more complaints)
            if recent_avg > previous_avg * 1.5 and (recent_avg - previous_avg) >= 2:
                # With an empty baseline the relative change is undefined, so
                # the magnitude is fixed at 100.
                spike_magnitude = ((recent_avg - previous_avg) / previous_avg * 100) if previous_avg > 0 else 100
                alerts.append({
                    'aspect': aspect,
                    'spike_magnitude': round(spike_magnitude, 1),
                    'recent_avg_negative': round(recent_avg, 1),
                    'previous_avg_negative': round(previous_avg, 1),
                    'alert_severity': 'high' if spike_magnitude > 100 else 'medium'
                })

        return sorted(alerts, key=lambda x: x['spike_magnitude'], reverse=True)
class SummaryGenerator:
    """Generates macro and micro-level summaries for aspects and overall sentiment."""

    @staticmethod
    def generate_macro_summary(df: pd.DataFrame, areas_of_improvement: pd.DataFrame,
                               strength_anchors: pd.DataFrame) -> Dict[str, str]:
        """
        Generate high-level summary of sentiment analysis.

        Args:
            df: Processed dataframe with 'overall_sentiment' and 'intent' columns
            areas_of_improvement: Problem areas dataframe (first row = top issue)
            strength_anchors: Strength areas dataframe (first row = top strength)

        Returns:
            Dictionary with macro-level insights under the keys
            'overall_sentiment', 'top_issues', 'top_strengths',
            'intent_insights' and 'recommendation'
        """
        total_reviews = len(df)

        def share_pct(column: str, label: str) -> float:
            # The mean of an empty mask is NaN, which would be rendered as
            # "nan%" in the text; report 0.0 for an empty dataframe instead.
            if total_reviews == 0:
                return 0.0
            return float((df[column] == label).mean() * 100)

        positive_pct = share_pct('overall_sentiment', 'Positive')
        negative_pct = share_pct('overall_sentiment', 'Negative')

        # Top issues and strengths
        top_issue = areas_of_improvement.iloc[0]['aspect'] if len(areas_of_improvement) > 0 else "None identified"
        top_strength = strength_anchors.iloc[0]['aspect'] if len(strength_anchors) > 0 else "None identified"

        # Intent distribution
        complaint_pct = share_pct('intent', 'complaint')

        return {
            'overall_sentiment': f"Out of {total_reviews} reviews, {positive_pct:.1f}% are positive and {negative_pct:.1f}% are negative.",
            'top_issues': f"Primary concern: '{top_issue}' - requires immediate attention based on complaint frequency and severity.",
            'top_strengths': f"Greatest strength: '{top_strength}' - leverage this positive aspect in marketing and product positioning.",
            'intent_insights': f"{complaint_pct:.1f}% of reviews contain complaints, indicating specific areas for product improvement.",
            'recommendation': "Focus on addressing top negative aspects while maintaining and promoting strengths."
        }

    @staticmethod
    def generate_aspect_micro_summaries(df: pd.DataFrame, top_aspects: List[str],
                                        max_aspects: int = 5) -> Dict[str, str]:
        """
        Generate detailed summaries for specific aspects.

        Args:
            df: Processed dataframe with 'aspects' and 'aspect_sentiments'
            top_aspects: List of aspects to analyze
            max_aspects: Maximum number of aspects to summarize

        Returns:
            Dictionary mapping each aspect that is mentioned at least once to
            its summary sentence; unmentioned aspects are omitted
        """
        summaries = {}

        for aspect in top_aspects[:max_aspects]:
            # Sentiment of this aspect in every review that mentions it. Only
            # the first occurrence within a review is considered, and only if
            # a sentiment exists at that position.
            aspect_sentiments = []
            for _, row in df.iterrows():
                aspects = row['aspects'] if isinstance(row['aspects'], list) else []
                sentiments = row['aspect_sentiments'] if isinstance(row['aspect_sentiments'], list) else []
                if aspect in aspects:
                    aspect_idx = aspects.index(aspect)
                    if aspect_idx < len(sentiments):
                        aspect_sentiments.append(sentiments[aspect_idx])

            if not aspect_sentiments:
                continue

            positive_count = aspect_sentiments.count('Positive')
            negative_count = aspect_sentiments.count('Negative')
            total_count = len(aspect_sentiments)

            # Generate contextual summary
            if negative_count > positive_count:
                sentiment_trend = "predominantly negative"
                key_issues = "Issues include quality concerns, performance problems, and user dissatisfaction."
            elif positive_count > negative_count:
                sentiment_trend = "predominantly positive"
                key_issues = "Users appreciate the quality, performance, and overall experience."
            else:
                sentiment_trend = "mixed"
                key_issues = "Reviews show both satisfaction and areas for improvement."

            summaries[aspect] = f"'{aspect}' mentioned in {total_count} reviews with {sentiment_trend} sentiment ({positive_count} positive, {negative_count} negative). {key_issues}"

        return summaries
class DataProcessor:
"""Enhanced main data processing pipeline coordinator."""
def __init__(self):
# Build one instance of every pipeline stage up front. Constructing
# TranslationService looks up the HF token and constructing ABSAProcessor
# attempts to load the PyABSA model, so creating a DataProcessor may be slow.
self.translator = TranslationService()
self.absa_processor = ABSAProcessor()
self.validator = DataValidator()
self.analytics = AspectAnalytics()
self.summary_generator = SummaryGenerator()
# No task manager yet: progress and cancellation checks are skipped until
# set_task_manager() supplies one.
self.task_manager = None # Will be set by API
def set_task_manager(self, task_manager):
"""Set task manager for cancellation support."""
# Store the manager for this pipeline's own cancellation/progress checks.
self.task_manager = task_manager
# Also set it for ABSA processor, which runs its own per-batch
# cancellation check and progress update.
self.absa_processor.set_task_manager(task_manager)
def process_uploaded_data(self, df: pd.DataFrame, task_id: Optional[str] = None) -> Dict[str, Any]:
"""
Complete enhanced processing pipeline for uploaded data with cancellation support.
Args:
df: Uploaded dataframe
task_id: Optional task ID for cancellation tracking
Returns:
Dictionary containing all processed results with advanced analytics
"""
# Import gc for cleanup
import gc
# Check cancellation at start
if task_id and self.task_manager and self.task_manager.is_cancelled(task_id):
return {'status': 'cancelled', 'message': 'Task cancelled by user'}
# Validate data
if task_id and self.task_manager:
self.task_manager.update_task(task_id, stage='validation', progress=5, status='processing')
is_valid, errors = self.validator.validate_csv(df)
if not is_valid:
return {'error': errors}
# Clean data
df_clean = self.validator.clean_data(df)
reviews = df_clean['review'].tolist()
# Check cancellation before translation
if task_id and self.task_manager and self.task_manager.is_cancelled(task_id):
del df_clean, reviews
gc.collect()
return {'status': 'cancelled', 'message': 'Task cancelled during validation'}
# Translation with batch processing and cancellation checks
if task_id and self.task_manager:
self.task_manager.update_task(task_id, stage='translation', progress=10)
with st.spinner("Translating reviews..."):
translated_reviews = []
detected_languages = []
# Process in batches to allow cancellation
batch_size = 10
for i in range(0, len(reviews), batch_size):
# Check cancellation
if task_id and self.task_manager and self.task_manager.is_cancelled(task_id):
del df_clean, reviews, translated_reviews, detected_languages
gc.collect()
return {'status': 'cancelled', 'message': 'Task cancelled during translation'}
batch = reviews[i:i+batch_size]
batch_translated, batch_langs = self.translator.process_reviews(batch)
translated_reviews.extend(batch_translated)
detected_languages.extend(batch_langs)
# Update progress (10-40%)
if task_id and self.task_manager:
progress = 10 + int((i / len(reviews)) * 30)
self.task_manager.update_task(task_id, progress=progress)
# Check cancellation before intent classification
if task_id and self.task_manager and self.task_manager.is_cancelled(task_id):
del df_clean, reviews, translated_reviews, detected_languages
gc.collect()
return {'status': 'cancelled', 'message': 'Task cancelled after translation'}
# Enhanced intent classification
if task_id and self.task_manager:
self.task_manager.update_task(task_id, stage='intent_classification', progress=40)
with st.spinner("Classifying intents with severity analysis..."):
intent_results = IntentClassifier.classify_batch_enhanced(translated_reviews)
# Check cancellation before ABSA (most expensive step)
if task_id and self.task_manager and self.task_manager.is_cancelled(task_id):
del df_clean, reviews, translated_reviews, detected_languages, intent_results
gc.collect()
return {'status': 'cancelled', 'message': 'Task cancelled after intent classification'}
# ABSA processing with cancellation support
if task_id and self.task_manager:
self.task_manager.update_task(task_id, stage='aspect_extraction', progress=50)
with st.spinner("Extracting aspects and sentiments..."):
absa_results = self.absa_processor.extract_aspects_and_sentiments(
translated_reviews,
task_id=task_id
)
# Check if ABSA was cancelled
if isinstance(absa_results, dict) and absa_results.get('status') == 'cancelled':
del df_clean, reviews, translated_reviews, detected_languages, intent_results
gc.collect()
return absa_results
# Combine results with enhanced structure
if task_id and self.task_manager:
self.task_manager.update_task(task_id, stage='combining_results', progress=90)
df_processed = df_clean.copy()
df_processed['translated_review'] = translated_reviews
df_processed['detected_language'] = detected_languages
df_processed['intent'] = [r['intent'] for r in intent_results]
df_processed['intent_severity'] = [r['severity_type'] for r in intent_results]
df_processed['intent_confidence'] = [r['confidence'] for r in intent_results]
# Add ABSA results with enhanced structure
aspects_list = []
aspect_sentiments_list = []
overall_sentiment = []
for result in absa_results:
aspects_list.append(result['aspects'])
aspect_sentiments_list.append(result['sentiments'])
# Calculate overall sentiment
if result['sentiments']:
positive_count = result['sentiments'].count('Positive')
negative_count = result['sentiments'].count('Negative')
if positive_count > negative_count:
overall_sentiment.append('Positive')
elif negative_count > positive_count:
overall_sentiment.append('Negative')
else:
overall_sentiment.append('Neutral')
else:
overall_sentiment.append('Neutral')
df_processed['aspects'] = aspects_list
df_processed['aspect_sentiments'] = aspect_sentiments_list
df_processed['overall_sentiment'] = overall_sentiment
# Advanced analytics
if task_id and self.task_manager:
self.task_manager.update_task(task_id, stage='analytics', progress=95)
with st.spinner("Calculating aspect analytics and priority scores..."):
areas_of_improvement, strength_anchors = self.analytics.calculate_aspect_scores(df_processed)
aspect_network = self.analytics.calculate_aspect_cooccurrence(df_processed)
sentiment_alerts = self.analytics.detect_sentiment_spikes(df_processed)
# Generate summaries
with st.spinner("Generating AI-powered summaries..."):
macro_summary = self.summary_generator.generate_macro_summary(
df_processed, areas_of_improvement, strength_anchors
)
# Get top aspects for micro summaries
top_negative_aspects = areas_of_improvement['aspect'].head(3).tolist() if len(areas_of_improvement) > 0 else []
top_positive_aspects = strength_anchors['aspect'].head(3).tolist() if len(strength_anchors) > 0 else []
top_aspects = top_negative_aspects + top_positive_aspects
micro_summaries = self.summary_generator.generate_aspect_micro_summaries(
df_processed, top_aspects
)
# Mark task as complete
if task_id and self.task_manager:
self.task_manager.update_task(task_id, stage='completed', progress=100)
# ========== NEW: ASPECT-LEVEL DATA TRANSFORMATION ==========
aspect_level_data = []
mixed_sentiment_reviews = []
for idx, row in df_processed.iterrows():
aspects = row['aspects'] if isinstance(row['aspects'], list) else []
aspect_sentiments = row['aspect_sentiments'] if isinstance(row['aspect_sentiments'], list) else []
# Check for mixed sentiments (conflicting aspect sentiments)
unique_sentiments = set(aspect_sentiments)
is_mixed = ('Positive' in unique_sentiments and 'Negative' in unique_sentiments)
if is_mixed:
mixed_sentiment_reviews.append({
'review_id': row['id'],
'review': row['review'],
'aspects': aspects,
'aspect_sentiments': aspect_sentiments,
'intent': row['intent'],
'date': row['date']
})
# Create aspect-level records
for aspect, sentiment in zip(aspects, aspect_sentiments):
aspect_level_data.append({
'review_id': row['id'],
'review': row['review'],
'aspect': aspect,
'aspect_sentiment': sentiment,
'overall_sentiment': row['overall_sentiment'],
'intent': row['intent'],
'intent_severity': row['intent_severity'],
'date': row['date'],
'language': row['detected_language']
})
aspect_level_df = pd.DataFrame(aspect_level_data) if aspect_level_data else pd.DataFrame()
mixed_sentiment_df = pd.DataFrame(mixed_sentiment_reviews) if mixed_sentiment_reviews else pd.DataFrame()
return {
'processed_data': df_processed,
'aspect_level_data': aspect_level_df, # NEW: Aspect-level granular data
'mixed_sentiment_reviews': mixed_sentiment_df, # NEW: Mixed sentiment detection
'absa_details': absa_results,
'areas_of_improvement': areas_of_improvement,
'strength_anchors': strength_anchors,
'aspect_network': aspect_network,
'sentiment_alerts': sentiment_alerts,
'macro_summary': macro_summary,
'micro_summaries': micro_summaries,
'summary': {
'total_reviews': len(df_processed),
'total_aspects': len(aspect_level_df),
'mixed_sentiment_count': len(mixed_sentiment_df),
'mixed_sentiment_pct': round(len(mixed_sentiment_df) / len(df_processed) * 100, 1) if len(df_processed) > 0 else 0,
'languages_detected': list(set(detected_languages)),
'intents_distribution': pd.Series([r['intent'] for r in intent_results]).value_counts().to_dict(),
'sentiment_distribution': pd.Series(overall_sentiment).value_counts().to_dict(),
'top_problem_areas': len(areas_of_improvement),
'top_strength_anchors': len(strength_anchors),
'active_alerts': len(sentiment_alerts)
}
}