""" Data processing utilities for sentiment analysis Handles aggregation, grouping, and transformation operations """ import pandas as pd import numpy as np from typing import List, Dict, Tuple class SentimentDataProcessor: """ Processes sentiment data for visualization """ @staticmethod def aggregate_by_dimensions(df, group_by_cols, agg_cols=None): """ Aggregate data by specified dimensions Args: df: Sentiment dataframe group_by_cols: List of columns to group by agg_cols: Dictionary of columns and aggregation functions Returns: pd.DataFrame: Aggregated dataframe """ if agg_cols is None: agg_cols = { 'comment_sk': 'count', 'requires_reply': 'sum' } return df.groupby(group_by_cols, as_index=False).agg(agg_cols) @staticmethod def get_sentiment_distribution(df, group_by=None): """ Calculate sentiment distribution Args: df: Sentiment dataframe group_by: Optional column(s) to group by Returns: pd.DataFrame: Sentiment distribution """ if group_by: # Group by specified columns and sentiment if isinstance(group_by, str): group_by = [group_by] sentiment_counts = df.groupby( group_by + ['sentiment_polarity'], as_index=False ).size().rename(columns={'size': 'count'}) # Calculate percentages within each group sentiment_counts['percentage'] = sentiment_counts.groupby(group_by)['count'].transform( lambda x: (x / x.sum() * 100).round(2) ) else: # Overall sentiment distribution sentiment_counts = df['sentiment_polarity'].value_counts().reset_index() sentiment_counts.columns = ['sentiment_polarity', 'count'] sentiment_counts['percentage'] = ( sentiment_counts['count'] / sentiment_counts['count'].sum() * 100 ).round(2) return sentiment_counts @staticmethod def get_intent_distribution(df, group_by=None): """ Calculate intent distribution (handles multi-label) Args: df: Sentiment dataframe group_by: Optional column(s) to group by Returns: pd.DataFrame: Intent distribution """ # Explode intents (split comma-separated values) df_exploded = df.copy() df_exploded['intent'] = df_exploded['intent'].str.split(',') df_exploded = df_exploded.explode('intent') df_exploded['intent'] = df_exploded['intent'].str.strip() if group_by: # Group by specified columns and intent if isinstance(group_by, str): group_by = [group_by] intent_counts = df_exploded.groupby( group_by + ['intent'], as_index=False ).size().rename(columns={'size': 'count'}) # Calculate percentages within each group intent_counts['percentage'] = intent_counts.groupby(group_by)['count'].transform( lambda x: (x / x.sum() * 100).round(2) ) else: # Overall intent distribution intent_counts = df_exploded['intent'].value_counts().reset_index() intent_counts.columns = ['intent', 'count'] intent_counts['percentage'] = ( intent_counts['count'] / intent_counts['count'].sum() * 100 ).round(2) return intent_counts @staticmethod def get_content_summary(df): """ Get summary statistics for each content Args: df: Sentiment dataframe Returns: pd.DataFrame: Content summary with statistics """ # Group by content (dropna=False to include records with NULL permalink_url, e.g., YouTube) content_summary = df.groupby(['content_sk', 'content_description', 'permalink_url'], dropna=False).agg({ 'comment_sk': 'count', 'requires_reply': 'sum', 'sentiment_polarity': lambda x: x.mode()[0] if len(x.mode()) > 0 else 'unknown' }).reset_index() content_summary.columns = [ 'content_sk', 'content_description', 'permalink_url', 'total_comments', 'reply_required_count', 'dominant_sentiment' ] # Calculate negative sentiment percentage for each content negative_sentiments = ['negative', 'very_negative'] content_negative = df[df['sentiment_polarity'].isin(negative_sentiments)].groupby( 'content_sk' ).size().reset_index(name='negative_count') content_summary = content_summary.merge(content_negative, on='content_sk', how='left') content_summary['negative_count'] = content_summary['negative_count'].fillna(0) content_summary['negative_percentage'] = ( content_summary['negative_count'] / content_summary['total_comments'] * 100 ).round(2) # Calculate severity score (balances percentage and volume) # Formula: negative_percentage * sqrt(total_comments) # This gives weight to both high negative % and high comment volume content_summary['severity_score'] = ( content_summary['negative_percentage'] * (content_summary['total_comments'] ** 0.5) ).round(2) return content_summary @staticmethod def get_top_poor_sentiment_contents(df, top_n=10, min_comments=1, sort_by='severity_score'): """ Get contents with highest poor sentiment based on selected criteria Args: df: Sentiment dataframe top_n: Number of top contents to return min_comments: Minimum number of comments a content must have to be included sort_by: Sorting criteria - 'severity_score', 'negative_percentage', 'negative_count', 'total_comments' Returns: pd.DataFrame: Top contents with poor sentiment """ content_summary = SentimentDataProcessor.get_content_summary(df) # Filter by minimum comments content_summary = content_summary[content_summary['total_comments'] >= min_comments] # Determine sort columns based on sort_by parameter if sort_by == 'severity_score': # Sort by severity score (balanced), then by negative percentage as tie-breaker sort_columns = ['severity_score', 'negative_percentage'] elif sort_by == 'negative_percentage': # Sort by negative percentage, then by total comments sort_columns = ['negative_percentage', 'total_comments'] elif sort_by == 'negative_count': # Sort by absolute negative count, then by negative percentage sort_columns = ['negative_count', 'negative_percentage'] elif sort_by == 'total_comments': # Sort by total comments volume sort_columns = ['total_comments', 'negative_count'] else: # Default to severity score sort_columns = ['severity_score', 'negative_percentage'] # Sort and get top N top_poor = content_summary.sort_values( by=sort_columns, ascending=[False, False] ).head(top_n) return top_poor @staticmethod def get_comments_requiring_reply(df): """ Get all comments that require reply Args: df: Sentiment dataframe Returns: pd.DataFrame: Comments requiring reply """ reply_df = df[df['requires_reply'] == True].copy() # Sort by timestamp (most recent first) if 'comment_timestamp' in reply_df.columns: reply_df = reply_df.sort_values('comment_timestamp', ascending=False) return reply_df @staticmethod def get_platform_brand_summary(df): """ Get summary statistics by platform and brand Args: df: Sentiment dataframe Returns: pd.DataFrame: Platform and brand summary """ summary = df.groupby(['platform', 'brand']).agg({ 'comment_sk': 'count', 'requires_reply': 'sum' }).reset_index() summary.columns = ['platform', 'brand', 'total_comments', 'reply_required'] # Add sentiment distribution sentiment_dist = SentimentDataProcessor.get_sentiment_distribution( df, group_by=['platform', 'brand'] ) # Pivot sentiment distribution sentiment_pivot = sentiment_dist.pivot_table( index=['platform', 'brand'], columns='sentiment_polarity', values='count', fill_value=0 ).reset_index() # Merge with summary summary = summary.merge(sentiment_pivot, on=['platform', 'brand'], how='left') return summary @staticmethod def get_temporal_trends(df, freq='D'): """ Get temporal trends of sentiment over time Args: df: Sentiment dataframe freq: Frequency for aggregation ('D'=daily, 'W'=weekly, 'M'=monthly) Returns: pd.DataFrame: Temporal sentiment trends """ if 'comment_timestamp' not in df.columns: return pd.DataFrame() df_temporal = df.copy() df_temporal['date'] = pd.to_datetime(df_temporal['comment_timestamp']).dt.to_period(freq) # Aggregate by date and sentiment trends = df_temporal.groupby(['date', 'sentiment_polarity']).size().reset_index(name='count') trends['date'] = trends['date'].dt.to_timestamp() return trends @staticmethod def calculate_sentiment_score(df): """ Calculate weighted sentiment score Args: df: Sentiment dataframe Returns: float: Average sentiment score (-2 to +2) """ sentiment_weights = { 'very_negative': -2, 'negative': -1, 'neutral': 0, 'positive': 1, 'very_positive': 2 } df['sentiment_score'] = df['sentiment_polarity'].map(sentiment_weights) return df['sentiment_score'].mean() @staticmethod def get_language_distribution(df): """ Get distribution of detected languages Args: df: Sentiment dataframe Returns: pd.DataFrame: Language distribution """ if 'detected_language' not in df.columns: return pd.DataFrame() lang_dist = df['detected_language'].value_counts().reset_index() lang_dist.columns = ['language', 'count'] lang_dist['percentage'] = (lang_dist['count'] / lang_dist['count'].sum() * 100).round(2) return lang_dist @staticmethod def get_sentiment_filtered_contents(df, selected_sentiments=None, selected_intents=None, top_n=10, min_comments=1, sort_by='severity_score'): """ Get contents filtered by selected sentiments and intents with dynamic sorting Args: df: Sentiment dataframe selected_sentiments: List of sentiments to filter by (filters by dominant sentiment) selected_intents: List of intents to filter by (content must have at least one comment with these intents) top_n: Number of top contents to return min_comments: Minimum number of comments a content must have sort_by: Sorting criteria - 'severity_score', 'sentiment_percentage', 'sentiment_count', 'total_comments' Returns: pd.DataFrame: Filtered and sorted contents """ content_summary = SentimentDataProcessor.get_content_summary(df) # Filter by minimum comments content_summary = content_summary[content_summary['total_comments'] >= min_comments] # If no sentiments selected, default to all sentiments if not selected_sentiments: selected_sentiments = df['sentiment_polarity'].unique().tolist() # Filter by dominant sentiment content_summary = content_summary[content_summary['dominant_sentiment'].isin(selected_sentiments)] # Filter by intents if specified if selected_intents: # Get content_sks that have at least one comment with the selected intents content_sks_with_intent = set() for intent in selected_intents: matching_contents = df[df['intent'].str.contains(intent, na=False, case=False)]['content_sk'].unique() content_sks_with_intent.update(matching_contents) content_summary = content_summary[content_summary['content_sk'].isin(content_sks_with_intent)] # Calculate percentage and count for selected sentiments sentiment_counts = df[df['sentiment_polarity'].isin(selected_sentiments)].groupby( 'content_sk' ).size().reset_index(name='selected_sentiment_count') content_summary = content_summary.merge(sentiment_counts, on='content_sk', how='left') content_summary['selected_sentiment_count'] = content_summary['selected_sentiment_count'].fillna(0) content_summary['selected_sentiment_percentage'] = ( content_summary['selected_sentiment_count'] / content_summary['total_comments'] * 100 ).round(2) # Calculate dynamic severity score based on selected sentiments content_summary['dynamic_severity_score'] = ( content_summary['selected_sentiment_percentage'] * (content_summary['total_comments'] ** 0.5) ).round(2) # Determine sort columns based on sort_by parameter if sort_by == 'severity_score': sort_columns = ['dynamic_severity_score', 'selected_sentiment_percentage'] elif sort_by == 'sentiment_percentage': sort_columns = ['selected_sentiment_percentage', 'total_comments'] elif sort_by == 'sentiment_count': sort_columns = ['selected_sentiment_count', 'selected_sentiment_percentage'] elif sort_by == 'total_comments': sort_columns = ['total_comments', 'selected_sentiment_count'] else: sort_columns = ['dynamic_severity_score', 'selected_sentiment_percentage'] # Sort and get top N filtered_contents = content_summary.sort_values( by=sort_columns, ascending=[False, False] ).head(top_n) return filtered_contents @staticmethod def get_demographics_distribution(df, demographic_field, filter_platform='musora_app'): """ Get distribution of a demographic field (only for specified platform) Args: df: Sentiment dataframe with demographic fields demographic_field: Field to analyze ('age_group', 'timezone', 'timezone_region', 'experience_level', 'experience_group') filter_platform: Platform to filter (default: 'musora_app') Returns: pd.DataFrame: Distribution with count and percentage """ # Filter for specified platform only if filter_platform and 'platform' in df.columns: df_filtered = df[df['platform'] == filter_platform].copy() else: df_filtered = df.copy() if df_filtered.empty or demographic_field not in df_filtered.columns: return pd.DataFrame() # Remove 'Unknown' and null values df_filtered = df_filtered[ (df_filtered[demographic_field].notna()) & (df_filtered[demographic_field] != 'Unknown') ] if df_filtered.empty: return pd.DataFrame() # Count distribution distribution = df_filtered[demographic_field].value_counts().reset_index() distribution.columns = [demographic_field, 'count'] # Calculate percentage distribution['percentage'] = ( distribution['count'] / distribution['count'].sum() * 100 ).round(2) # Sort by count descending distribution = distribution.sort_values('count', ascending=False) return distribution @staticmethod def get_demographics_by_sentiment(df, demographic_field, filter_platform='musora_app'): """ Get sentiment distribution for each demographic group Args: df: Sentiment dataframe with demographic fields demographic_field: Field to analyze filter_platform: Platform to filter (default: 'musora_app') Returns: pd.DataFrame: Sentiment distribution per demographic group """ # Filter for specified platform only if filter_platform and 'platform' in df.columns: df_filtered = df[df['platform'] == filter_platform].copy() else: df_filtered = df.copy() if df_filtered.empty or demographic_field not in df_filtered.columns: return pd.DataFrame() # Remove 'Unknown' and null values df_filtered = df_filtered[ (df_filtered[demographic_field].notna()) & (df_filtered[demographic_field] != 'Unknown') ] if df_filtered.empty: return pd.DataFrame() # Group by demographic field and sentiment sentiment_by_demo = df_filtered.groupby( [demographic_field, 'sentiment_polarity'], as_index=False ).size().rename(columns={'size': 'count'}) # Calculate percentage within each demographic group sentiment_by_demo['percentage'] = sentiment_by_demo.groupby(demographic_field)['count'].transform( lambda x: (x / x.sum() * 100).round(2) ) return sentiment_by_demo @staticmethod def get_top_timezones(df, top_n=15, filter_platform='musora_app'): """ Get top N timezones with most comments Args: df: Sentiment dataframe with timezone field top_n: Number of top timezones to return filter_platform: Platform to filter (default: 'musora_app') Returns: pd.DataFrame: Top timezones with counts """ return SentimentDataProcessor.get_demographics_distribution( df, 'timezone', filter_platform ).head(top_n) @staticmethod def get_timezone_regions_distribution(df, filter_platform='musora_app'): """ Get distribution of timezone regions Args: df: Sentiment dataframe with timezone_region field filter_platform: Platform to filter (default: 'musora_app') Returns: pd.DataFrame: Region distribution with counts """ return SentimentDataProcessor.get_demographics_distribution( df, 'timezone_region', filter_platform ) @staticmethod def get_experience_level_distribution(df, filter_platform='musora_app', use_groups=False): """ Get distribution of experience levels Args: df: Sentiment dataframe with experience fields filter_platform: Platform to filter (default: 'musora_app') use_groups: If True, use grouped experience levels, otherwise use raw values Returns: pd.DataFrame: Experience distribution """ field = 'experience_group' if use_groups else 'experience_level' return SentimentDataProcessor.get_demographics_distribution( df, field, filter_platform ) @staticmethod def get_demographics_summary(df, filter_platform='musora_app'): """ Get summary statistics for demographic data Args: df: Sentiment dataframe with demographic fields filter_platform: Platform to filter (default: 'musora_app') Returns: dict: Summary statistics """ # Filter for specified platform only if filter_platform and 'platform' in df.columns: df_filtered = df[df['platform'] == filter_platform].copy() else: df_filtered = df.copy() if df_filtered.empty: return { 'total_comments': 0, 'users_with_demographics': 0, 'avg_age': None, 'most_common_age_group': 'Unknown', 'most_common_region': 'Unknown', 'avg_experience': None } # Remove records without demographic data df_with_demo = df_filtered[ (df_filtered['age'].notna()) | (df_filtered['timezone'].notna()) | (df_filtered['experience_level'].notna()) ].copy() summary = { 'total_comments': len(df_filtered), 'users_with_demographics': len(df_with_demo), 'coverage_percentage': round(len(df_with_demo) / len(df_filtered) * 100, 2) if len(df_filtered) > 0 else 0 } # Age statistics if 'age' in df_with_demo.columns: valid_ages = df_with_demo['age'].dropna() summary['avg_age'] = round(valid_ages.mean(), 1) if len(valid_ages) > 0 else None age_groups = df_with_demo['age_group'].value_counts() summary['most_common_age_group'] = age_groups.index[0] if len(age_groups) > 0 else 'Unknown' # Timezone statistics if 'timezone_region' in df_with_demo.columns: regions = df_with_demo[df_with_demo['timezone_region'] != 'Unknown']['timezone_region'].value_counts() summary['most_common_region'] = regions.index[0] if len(regions) > 0 else 'Unknown' # Experience statistics if 'experience_level' in df_with_demo.columns: valid_exp = df_with_demo['experience_level'].dropna() summary['avg_experience'] = round(valid_exp.mean(), 2) if len(valid_exp) > 0 else None exp_groups = df_with_demo['experience_group'].value_counts() summary['most_common_experience'] = exp_groups.index[0] if len(exp_groups) > 0 else 'Unknown' return summary