# Danialebrat's picture
# Deploying sentiment analysis project
# 9858829
"""
Data processing utilities for sentiment analysis
Handles aggregation, grouping, and transformation operations
"""
import pandas as pd
import numpy as np
from typing import List, Dict, Tuple
class SentimentDataProcessor:
    """
    Processes sentiment data for visualization.

    Every method is a static method that receives the sentiment dataframe as
    its first argument and returns a new object; the dataframe passed in is
    never modified.
    """

    # Numeric weight of each sentiment label, used by calculate_sentiment_score.
    SENTIMENT_WEIGHTS = {
        'very_negative': -2,
        'negative': -1,
        'neutral': 0,
        'positive': 1,
        'very_positive': 2
    }

    # Labels counted as "negative" when summarising a content item.
    NEGATIVE_SENTIMENTS = ['negative', 'very_negative']

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _as_column_list(group_by):
        """Return ``group_by`` as a list (accepts a single name or any iterable of names)."""
        if isinstance(group_by, str):
            return [group_by]
        return list(group_by)

    @staticmethod
    def _value_distribution(series, label):
        """Count the values of ``series`` and add a ``percentage`` column (0-100, 2 decimals)."""
        dist = series.value_counts().reset_index()
        # Assign names explicitly: the names produced by reset_index() differ
        # between pandas versions.
        dist.columns = [label, 'count']
        dist['percentage'] = (dist['count'] / dist['count'].sum() * 100).round(2)
        return dist

    @staticmethod
    def _grouped_distribution(df, group_cols, value_col):
        """Count ``value_col`` within each ``group_cols`` group, with per-group percentages."""
        counts = df.groupby(
            group_cols + [value_col],
            as_index=False
        ).size().rename(columns={'size': 'count'})
        # Percentages are relative to the group total, not the overall total.
        counts['percentage'] = counts.groupby(group_cols)['count'].transform(
            lambda x: (x / x.sum() * 100).round(2)
        )
        return counts

    @staticmethod
    def _filter_platform(df, filter_platform):
        """Return a copy of ``df`` restricted to ``filter_platform`` when that is possible."""
        if filter_platform and 'platform' in df.columns:
            return df[df['platform'] == filter_platform].copy()
        # No platform requested, or no 'platform' column to filter on.
        return df.copy()

    @staticmethod
    def _drop_unknown(df, field):
        """Drop rows whose ``field`` is null or the literal string 'Unknown'."""
        return df[df[field].notna() & (df[field] != 'Unknown')]

    @staticmethod
    def _dominant_label(values):
        """Return the first mode of ``values``, or 'unknown' when there is no mode."""
        modes = values.mode()
        return modes.iloc[0] if len(modes) > 0 else 'unknown'

    @staticmethod
    def _most_common(series, default='Unknown'):
        """Return the most frequent non-null value of ``series``, or ``default`` if none."""
        counts = series.value_counts()
        return counts.index[0] if len(counts) > 0 else default

    @staticmethod
    def _top_sorted(content_summary, sort_columns, top_n):
        """Sort by the two ``sort_columns`` (both descending) and keep the first ``top_n`` rows."""
        return content_summary.sort_values(
            by=sort_columns,
            ascending=[False, False]
        ).head(top_n)

    # ------------------------------------------------------------------
    # Aggregations and distributions
    # ------------------------------------------------------------------

    @staticmethod
    def aggregate_by_dimensions(df, group_by_cols, agg_cols=None):
        """
        Aggregate data by specified dimensions

        Args:
            df: Sentiment dataframe
            group_by_cols: List of columns to group by
            agg_cols: Dictionary of columns and aggregation functions.
                Defaults to counting 'comment_sk' and summing 'requires_reply'.

        Returns:
            pd.DataFrame: Aggregated dataframe
        """
        if agg_cols is None:
            agg_cols = {
                'comment_sk': 'count',
                'requires_reply': 'sum'
            }
        return df.groupby(group_by_cols, as_index=False).agg(agg_cols)

    @staticmethod
    def get_sentiment_distribution(df, group_by=None):
        """
        Calculate sentiment distribution

        Args:
            df: Sentiment dataframe
            group_by: Optional column name, or list/tuple of column names, to group by

        Returns:
            pd.DataFrame: One row per sentiment (per group when ``group_by`` is
            given) with 'count' and 'percentage' columns. With ``group_by``
            the percentage is relative to the group total.
        """
        if group_by:
            return SentimentDataProcessor._grouped_distribution(
                df,
                SentimentDataProcessor._as_column_list(group_by),
                'sentiment_polarity'
            )
        # Overall sentiment distribution
        return SentimentDataProcessor._value_distribution(
            df['sentiment_polarity'], 'sentiment_polarity'
        )

    @staticmethod
    def get_intent_distribution(df, group_by=None):
        """
        Calculate intent distribution (handles multi-label)

        The 'intent' column is split on commas, so a comment carrying several
        intents contributes one count to each of them.

        Args:
            df: Sentiment dataframe
            group_by: Optional column name, or list/tuple of column names, to group by

        Returns:
            pd.DataFrame: One row per intent (per group when ``group_by`` is
            given) with 'count' and 'percentage' columns.
        """
        # Explode intents (split comma-separated values) on a copy so the
        # caller's dataframe keeps its original 'intent' strings.
        df_exploded = df.copy()
        df_exploded['intent'] = df_exploded['intent'].str.split(',')
        df_exploded = df_exploded.explode('intent')
        df_exploded['intent'] = df_exploded['intent'].str.strip()

        if group_by:
            return SentimentDataProcessor._grouped_distribution(
                df_exploded,
                SentimentDataProcessor._as_column_list(group_by),
                'intent'
            )
        # Overall intent distribution
        return SentimentDataProcessor._value_distribution(df_exploded['intent'], 'intent')

    @staticmethod
    def get_language_distribution(df):
        """
        Get distribution of detected languages

        Args:
            df: Sentiment dataframe

        Returns:
            pd.DataFrame: Columns 'language', 'count', 'percentage'; an empty
            dataframe when there is no 'detected_language' column.
        """
        if 'detected_language' not in df.columns:
            return pd.DataFrame()
        return SentimentDataProcessor._value_distribution(df['detected_language'], 'language')

    # ------------------------------------------------------------------
    # Content-level summaries
    # ------------------------------------------------------------------

    @staticmethod
    def get_content_summary(df):
        """
        Get summary statistics for each content

        Args:
            df: Sentiment dataframe

        Returns:
            pd.DataFrame: One row per (content_sk, content_description,
            permalink_url) with total_comments, reply_required_count,
            dominant_sentiment, negative_count, negative_percentage and
            severity_score.
        """
        # Group by content (dropna=False to include records with NULL permalink_url, e.g., YouTube)
        content_summary = df.groupby(
            ['content_sk', 'content_description', 'permalink_url'], dropna=False
        ).agg({
            'comment_sk': 'count',
            'requires_reply': 'sum',
            'sentiment_polarity': SentimentDataProcessor._dominant_label
        }).reset_index()
        content_summary.columns = [
            'content_sk', 'content_description', 'permalink_url',
            'total_comments', 'reply_required_count', 'dominant_sentiment'
        ]

        # Calculate negative sentiment percentage for each content
        is_negative = df['sentiment_polarity'].isin(SentimentDataProcessor.NEGATIVE_SENTIMENTS)
        content_negative = df[is_negative].groupby(
            'content_sk'
        ).size().reset_index(name='negative_count')

        # Left merge: contents without any negative comment get NaN, turned into 0.
        content_summary = content_summary.merge(content_negative, on='content_sk', how='left')
        content_summary['negative_count'] = content_summary['negative_count'].fillna(0)
        content_summary['negative_percentage'] = (
            content_summary['negative_count'] / content_summary['total_comments'] * 100
        ).round(2)

        # Calculate severity score (balances percentage and volume)
        # Formula: negative_percentage * sqrt(total_comments)
        # This gives weight to both high negative % and high comment volume
        content_summary['severity_score'] = (
            content_summary['negative_percentage'] *
            (content_summary['total_comments'] ** 0.5)
        ).round(2)
        return content_summary

    @staticmethod
    def get_top_poor_sentiment_contents(df, top_n=10, min_comments=1, sort_by='severity_score'):
        """
        Get contents with highest poor sentiment based on selected criteria

        Args:
            df: Sentiment dataframe
            top_n: Number of top contents to return
            min_comments: Minimum number of comments a content must have to be included
            sort_by: Sorting criteria - 'severity_score', 'negative_percentage',
                'negative_count', 'total_comments'. Any other value falls back
                to 'severity_score'.

        Returns:
            pd.DataFrame: Top contents with poor sentiment
        """
        content_summary = SentimentDataProcessor.get_content_summary(df)

        # Filter by minimum comments
        content_summary = content_summary[content_summary['total_comments'] >= min_comments]

        # Primary sort column followed by its tie-breaker.
        sort_options = {
            'severity_score': ['severity_score', 'negative_percentage'],
            'negative_percentage': ['negative_percentage', 'total_comments'],
            'negative_count': ['negative_count', 'negative_percentage'],
            'total_comments': ['total_comments', 'negative_count'],
        }
        sort_columns = sort_options.get(sort_by, sort_options['severity_score'])

        return SentimentDataProcessor._top_sorted(content_summary, sort_columns, top_n)

    @staticmethod
    def get_sentiment_filtered_contents(df, selected_sentiments=None, selected_intents=None,
                                        top_n=10, min_comments=1, sort_by='severity_score'):
        """
        Get contents filtered by selected sentiments and intents with dynamic sorting

        Args:
            df: Sentiment dataframe
            selected_sentiments: List of sentiments to filter by (filters by dominant
                sentiment). When empty or None, every sentiment present in ``df`` is used.
            selected_intents: List of intents to filter by (content must have at least
                one comment with these intents). Each intent is matched as a literal,
                case-insensitive substring of the 'intent' column.
            top_n: Number of top contents to return
            min_comments: Minimum number of comments a content must have
            sort_by: Sorting criteria - 'severity_score', 'sentiment_percentage',
                'sentiment_count', 'total_comments'. Any other value falls back
                to 'severity_score'.

        Returns:
            pd.DataFrame: Filtered and sorted contents, with the extra columns
            selected_sentiment_count, selected_sentiment_percentage and
            dynamic_severity_score.
        """
        content_summary = SentimentDataProcessor.get_content_summary(df)

        # Filter by minimum comments
        content_summary = content_summary[content_summary['total_comments'] >= min_comments]

        # If no sentiments selected, default to all sentiments
        if not selected_sentiments:
            selected_sentiments = df['sentiment_polarity'].unique().tolist()

        # Filter by dominant sentiment
        content_summary = content_summary[
            content_summary['dominant_sentiment'].isin(selected_sentiments)
        ]

        # Filter by intents if specified
        if selected_intents:
            # Get content_sks that have at least one comment with the selected intents
            content_sks_with_intent = set()
            for intent in selected_intents:
                # regex=False: intent labels are plain text, so characters such as
                # '(' or '+' must be matched literally instead of being parsed as a
                # regular expression.
                has_intent = df['intent'].str.contains(
                    intent, na=False, case=False, regex=False
                )
                content_sks_with_intent.update(df.loc[has_intent, 'content_sk'].unique())
            content_summary = content_summary[
                content_summary['content_sk'].isin(content_sks_with_intent)
            ]

        # Calculate percentage and count for selected sentiments
        sentiment_counts = df[df['sentiment_polarity'].isin(selected_sentiments)].groupby(
            'content_sk'
        ).size().reset_index(name='selected_sentiment_count')
        content_summary = content_summary.merge(sentiment_counts, on='content_sk', how='left')
        content_summary['selected_sentiment_count'] = (
            content_summary['selected_sentiment_count'].fillna(0)
        )
        content_summary['selected_sentiment_percentage'] = (
            content_summary['selected_sentiment_count'] / content_summary['total_comments'] * 100
        ).round(2)

        # Calculate dynamic severity score based on selected sentiments
        # (same formula as severity_score: percentage * sqrt(total_comments))
        content_summary['dynamic_severity_score'] = (
            content_summary['selected_sentiment_percentage'] *
            (content_summary['total_comments'] ** 0.5)
        ).round(2)

        # Primary sort column followed by its tie-breaker.
        sort_options = {
            'severity_score': ['dynamic_severity_score', 'selected_sentiment_percentage'],
            'sentiment_percentage': ['selected_sentiment_percentage', 'total_comments'],
            'sentiment_count': ['selected_sentiment_count', 'selected_sentiment_percentage'],
            'total_comments': ['total_comments', 'selected_sentiment_count'],
        }
        sort_columns = sort_options.get(sort_by, sort_options['severity_score'])

        return SentimentDataProcessor._top_sorted(content_summary, sort_columns, top_n)

    @staticmethod
    def get_comments_requiring_reply(df):
        """
        Get all comments that require reply

        Args:
            df: Sentiment dataframe

        Returns:
            pd.DataFrame: Copy of the rows whose 'requires_reply' equals True,
            most recent first when a 'comment_timestamp' column exists.
        """
        reply_df = df[df['requires_reply'] == True].copy()
        # Sort by timestamp (most recent first)
        if 'comment_timestamp' in reply_df.columns:
            reply_df = reply_df.sort_values('comment_timestamp', ascending=False)
        return reply_df

    @staticmethod
    def get_platform_brand_summary(df):
        """
        Get summary statistics by platform and brand

        Args:
            df: Sentiment dataframe

        Returns:
            pd.DataFrame: One row per (platform, brand) with total_comments,
            reply_required and one count column per sentiment label.
        """
        summary = df.groupby(['platform', 'brand']).agg({
            'comment_sk': 'count',
            'requires_reply': 'sum'
        }).reset_index()
        summary.columns = ['platform', 'brand', 'total_comments', 'reply_required']

        # Add sentiment distribution
        sentiment_dist = SentimentDataProcessor.get_sentiment_distribution(
            df, group_by=['platform', 'brand']
        )

        # Pivot sentiment distribution: one column per sentiment label, 0 where
        # a platform/brand pair has no comment with that label.
        sentiment_pivot = sentiment_dist.pivot_table(
            index=['platform', 'brand'],
            columns='sentiment_polarity',
            values='count',
            fill_value=0
        ).reset_index()

        # Merge with summary
        return summary.merge(sentiment_pivot, on=['platform', 'brand'], how='left')

    @staticmethod
    def get_temporal_trends(df, freq='D'):
        """
        Get temporal trends of sentiment over time

        Args:
            df: Sentiment dataframe
            freq: Frequency for aggregation ('D'=daily, 'W'=weekly, 'M'=monthly)

        Returns:
            pd.DataFrame: Columns 'date', 'sentiment_polarity', 'count'; an
            empty dataframe when there is no 'comment_timestamp' column.
        """
        if 'comment_timestamp' not in df.columns:
            return pd.DataFrame()

        df_temporal = df.copy()
        # Bucket each timestamp into its period (day/week/month).
        df_temporal['date'] = pd.to_datetime(df_temporal['comment_timestamp']).dt.to_period(freq)

        # Aggregate by date and sentiment
        trends = df_temporal.groupby(['date', 'sentiment_polarity']).size().reset_index(name='count')
        # Convert periods back to timestamps.
        trends['date'] = trends['date'].dt.to_timestamp()
        return trends

    @staticmethod
    def calculate_sentiment_score(df):
        """
        Calculate weighted sentiment score

        Labels are mapped through SENTIMENT_WEIGHTS; labels missing from that
        mapping become NaN and are skipped by the mean. The mapping is done on
        a temporary Series, so no column is added to ``df``.

        Args:
            df: Sentiment dataframe

        Returns:
            float: Average sentiment score (-2 to +2)
        """
        scores = df['sentiment_polarity'].map(SentimentDataProcessor.SENTIMENT_WEIGHTS)
        return scores.mean()

    # ------------------------------------------------------------------
    # Demographics
    # ------------------------------------------------------------------

    @staticmethod
    def get_demographics_distribution(df, demographic_field, filter_platform='musora_app'):
        """
        Get distribution of a demographic field (only for specified platform)

        Args:
            df: Sentiment dataframe with demographic fields
            demographic_field: Field to analyze ('age_group', 'timezone', 'timezone_region',
                'experience_level', 'experience_group')
            filter_platform: Platform to filter (default: 'musora_app')

        Returns:
            pd.DataFrame: Distribution with count and percentage, sorted by
            count descending. Empty when the field is missing or no usable
            rows remain.
        """
        # Filter for specified platform only
        df_filtered = SentimentDataProcessor._filter_platform(df, filter_platform)
        if df_filtered.empty or demographic_field not in df_filtered.columns:
            return pd.DataFrame()

        # Remove 'Unknown' and null values
        df_filtered = SentimentDataProcessor._drop_unknown(df_filtered, demographic_field)
        if df_filtered.empty:
            return pd.DataFrame()

        distribution = SentimentDataProcessor._value_distribution(
            df_filtered[demographic_field], demographic_field
        )
        # Sort by count descending
        return distribution.sort_values('count', ascending=False)

    @staticmethod
    def get_demographics_by_sentiment(df, demographic_field, filter_platform='musora_app'):
        """
        Get sentiment distribution for each demographic group

        Args:
            df: Sentiment dataframe with demographic fields
            demographic_field: Field to analyze
            filter_platform: Platform to filter (default: 'musora_app')

        Returns:
            pd.DataFrame: Sentiment distribution per demographic group, with
            percentages relative to each group. Empty when the field is
            missing or no usable rows remain.
        """
        # Filter for specified platform only
        df_filtered = SentimentDataProcessor._filter_platform(df, filter_platform)
        if df_filtered.empty or demographic_field not in df_filtered.columns:
            return pd.DataFrame()

        # Remove 'Unknown' and null values
        df_filtered = SentimentDataProcessor._drop_unknown(df_filtered, demographic_field)
        if df_filtered.empty:
            return pd.DataFrame()

        # Group by demographic field and sentiment
        return SentimentDataProcessor._grouped_distribution(
            df_filtered, [demographic_field], 'sentiment_polarity'
        )

    @staticmethod
    def get_top_timezones(df, top_n=15, filter_platform='musora_app'):
        """
        Get top N timezones with most comments

        Args:
            df: Sentiment dataframe with timezone field
            top_n: Number of top timezones to return
            filter_platform: Platform to filter (default: 'musora_app')

        Returns:
            pd.DataFrame: Top timezones with counts
        """
        return SentimentDataProcessor.get_demographics_distribution(
            df, 'timezone', filter_platform
        ).head(top_n)

    @staticmethod
    def get_timezone_regions_distribution(df, filter_platform='musora_app'):
        """
        Get distribution of timezone regions

        Args:
            df: Sentiment dataframe with timezone_region field
            filter_platform: Platform to filter (default: 'musora_app')

        Returns:
            pd.DataFrame: Region distribution with counts
        """
        return SentimentDataProcessor.get_demographics_distribution(
            df, 'timezone_region', filter_platform
        )

    @staticmethod
    def get_experience_level_distribution(df, filter_platform='musora_app', use_groups=False):
        """
        Get distribution of experience levels

        Args:
            df: Sentiment dataframe with experience fields
            filter_platform: Platform to filter (default: 'musora_app')
            use_groups: If True, use grouped experience levels, otherwise use raw values

        Returns:
            pd.DataFrame: Experience distribution
        """
        field = 'experience_group' if use_groups else 'experience_level'
        return SentimentDataProcessor.get_demographics_distribution(
            df, field, filter_platform
        )

    @staticmethod
    def get_demographics_summary(df, filter_platform='musora_app'):
        """
        Get summary statistics for demographic data

        A row counts as "having demographics" when at least one of 'age',
        'timezone' or 'experience_level' is non-null. Columns that are absent
        from the dataframe are skipped and their statistics keep the default
        value.

        Args:
            df: Sentiment dataframe with demographic fields
            filter_platform: Platform to filter (default: 'musora_app')

        Returns:
            dict: Summary statistics. The keys are always total_comments,
            users_with_demographics, coverage_percentage, avg_age,
            most_common_age_group, most_common_region, avg_experience and
            most_common_experience.
        """
        # Filter for specified platform only
        df_filtered = SentimentDataProcessor._filter_platform(df, filter_platform)

        # Defaults double as the result for empty input, so both paths expose
        # the same keys.
        summary = {
            'total_comments': 0,
            'users_with_demographics': 0,
            'coverage_percentage': 0,
            'avg_age': None,
            'most_common_age_group': 'Unknown',
            'most_common_region': 'Unknown',
            'avg_experience': None,
            'most_common_experience': 'Unknown'
        }
        if df_filtered.empty:
            return summary

        # Remove records without demographic data, looking only at the
        # demographic columns that actually exist.
        demo_cols = [
            col for col in ('age', 'timezone', 'experience_level')
            if col in df_filtered.columns
        ]
        if demo_cols:
            has_demo = df_filtered[demo_cols].notna().any(axis=1)
            df_with_demo = df_filtered[has_demo].copy()
        else:
            # No demographic column at all: nothing qualifies.
            df_with_demo = df_filtered.iloc[0:0].copy()

        summary['total_comments'] = len(df_filtered)
        summary['users_with_demographics'] = len(df_with_demo)
        summary['coverage_percentage'] = round(len(df_with_demo) / len(df_filtered) * 100, 2)

        # Age statistics
        if 'age' in df_with_demo.columns:
            valid_ages = df_with_demo['age'].dropna()
            if len(valid_ages) > 0:
                summary['avg_age'] = round(valid_ages.mean(), 1)
        if 'age_group' in df_with_demo.columns:
            summary['most_common_age_group'] = SentimentDataProcessor._most_common(
                df_with_demo['age_group']
            )

        # Timezone statistics ('Unknown' regions are excluded from the ranking)
        if 'timezone_region' in df_with_demo.columns:
            known_regions = df_with_demo[
                df_with_demo['timezone_region'] != 'Unknown'
            ]['timezone_region']
            summary['most_common_region'] = SentimentDataProcessor._most_common(known_regions)

        # Experience statistics
        if 'experience_level' in df_with_demo.columns:
            valid_exp = df_with_demo['experience_level'].dropna()
            if len(valid_exp) > 0:
                summary['avg_experience'] = round(valid_exp.mean(), 2)
        if 'experience_group' in df_with_demo.columns:
            summary['most_common_experience'] = SentimentDataProcessor._most_common(
                df_with_demo['experience_group']
            )

        return summary