"""
Enhanced visualization components for the advanced analytics dashboard.
Creates interactive charts, plots, network graphs, Sankey diagrams, and visual analytics.
"""
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import seaborn as sns
import matplotlib.pyplot as plt
from wordcloud import WordCloud
import streamlit as st
from typing import Dict, List, Any, Tuple, Optional
from datetime import datetime, timedelta
import base64
from io import BytesIO
import networkx as nx
from collections import defaultdict
class KPIEngine:
"""Handles KPI calculations and display for the dashboard header."""
@staticmethod
def calculate_kpis(df: pd.DataFrame, areas_of_improvement: pd.DataFrame,
strength_anchors: pd.DataFrame) -> Dict[str, Any]:
"""
Calculate all KPIs for the dashboard header.
Args:
df: Processed dataframe
areas_of_improvement: Problem areas dataframe
strength_anchors: Strength areas dataframe
Returns:
Dictionary with all KPI values
"""
total_reviews = len(df)
# Sentiment percentages
positive_count = (df['overall_sentiment'] == 'Positive').sum()
negative_count = (df['overall_sentiment'] == 'Negative').sum()
neutral_count = (df['overall_sentiment'] == 'Neutral').sum()
positive_pct = (positive_count / total_reviews * 100) if total_reviews > 0 else 0
negative_pct = (negative_count / total_reviews * 100) if total_reviews > 0 else 0
neutral_pct = (neutral_count / total_reviews * 100) if total_reviews > 0 else 0
# Problem areas and strength anchors counts
problem_areas_count = len(areas_of_improvement)
strength_anchors_count = len(strength_anchors)
# Language distribution
languages = df['detected_language'].value_counts().to_dict()
# Intent distribution
intent_distribution = df['intent'].value_counts().to_dict()
complaint_pct = (df['intent'] == 'complaint').mean() * 100
return {
'total_reviews': total_reviews,
'positive_pct': round(positive_pct, 1),
'negative_pct': round(negative_pct, 1),
'neutral_pct': round(neutral_pct, 1),
'positive_count': positive_count,
'negative_count': negative_count,
'neutral_count': neutral_count,
'problem_areas_count': problem_areas_count,
'strength_anchors_count': strength_anchors_count,
'languages': languages,
'intent_distribution': intent_distribution,
'complaint_pct': round(complaint_pct, 1)
}
@staticmethod
def create_kpi_header(kpis: Dict[str, Any]) -> None:
"""
Create and display the KPI header section.
Args:
kpis: Dictionary containing all KPI values
"""
# Main KPI Row
col1, col2, col3, col4, col5 = st.columns(5)
with col1:
st.metric(
label="📊 Total Reviews",
value=f"{kpis['total_reviews']:,}",
delta=None
)
with col2:
st.metric(
label="😊 Positive %",
value=f"{kpis['positive_pct']}%",
delta=f"{kpis['positive_count']} reviews"
)
with col3:
st.metric(
label="😞 Negative %",
value=f"{kpis['negative_pct']}%",
delta=f"{kpis['negative_count']} reviews"
)
with col4:
st.metric(
label="🔴 Problem Areas",
value=kpis['problem_areas_count'],
delta="Aspects needing attention"
)
with col5:
st.metric(
label="🟢 Strength Anchors",
value=kpis['strength_anchors_count'],
delta="Positive aspects"
)
class AdvancedVisualizationEngine:
"""Enhanced visualization engine with advanced charts and network analysis."""
def __init__(self):
self.color_schemes = {
'sentiment': {'Positive': '#2E8B57', 'Negative': '#DC143C', 'Neutral': '#708090'},
'intent': px.colors.qualitative.Set3,
'default': px.colors.qualitative.Plotly,
'priority': px.colors.sequential.Reds,
'strength': px.colors.sequential.Greens
}
def create_dual_ranking_tables(self, areas_of_improvement: pd.DataFrame,
strength_anchors: pd.DataFrame) -> None:
"""
Create and display the dual ranking tables.
Args:
areas_of_improvement: Problem areas dataframe
strength_anchors: Strength areas dataframe
"""
col1, col2 = st.columns(2)
with col1:
st.subheader("🔴 Areas of Improvement")
if len(areas_of_improvement) > 0:
# Format the dataframe for display
display_improvement = areas_of_improvement.copy()
display_improvement['Negativity %'] = display_improvement['negativity_pct'].astype(str) + '%'
display_improvement['Intent Severity'] = display_improvement['intent_severity']
display_improvement['Frequency'] = display_improvement['frequency']
display_improvement['Priority Score'] = display_improvement['priority_score']
# Select columns for display
display_cols = ['aspect', 'Negativity %', 'Intent Severity', 'Frequency', 'Priority Score']
display_improvement = display_improvement[display_cols].rename(columns={'aspect': 'Aspect'})
st.dataframe(
display_improvement,
use_container_width=True,
hide_index=True
)
else:
st.info("No significant problem areas identified.")
with col2:
st.subheader("🟢 Strength Anchors")
if len(strength_anchors) > 0:
# Format the dataframe for display
display_strength = strength_anchors.copy()
display_strength['Positivity %'] = display_strength['positivity_pct'].astype(str) + '%'
display_strength['Intent Type'] = display_strength['intent_type']
display_strength['Frequency'] = display_strength['frequency']
display_strength['Strength Score'] = display_strength['strength_score']
# Select columns for display
display_cols = ['aspect', 'Positivity %', 'Intent Type', 'Frequency', 'Strength Score']
display_strength = display_strength[display_cols].rename(columns={'aspect': 'Aspect'})
st.dataframe(
display_strength,
use_container_width=True,
hide_index=True
)
else:
st.info("No significant strength anchors identified.")
def create_aspect_network_graph(self, aspect_network: nx.Graph) -> go.Figure:
"""
Create interactive network graph showing aspect relationships.
Args:
aspect_network: NetworkX graph with aspect co-occurrence data
Returns:
Plotly figure object
"""
if len(aspect_network.nodes()) == 0:
fig = go.Figure()
fig.add_annotation(
text="No aspect relationships found",
xref="paper", yref="paper",
x=0.5, y=0.5, xanchor='center', yanchor='middle',
showarrow=False, font_size=16
)
fig.update_layout(title="Aspect Co-occurrence Network")
return fig
# Calculate positions using spring layout
pos = nx.spring_layout(aspect_network, k=3, iterations=50)
# Extract node information
node_x = []
node_y = []
node_text = []
node_size = []
node_color = []
node_info = []
for node in aspect_network.nodes():
x, y = pos[node]
node_x.append(x)
node_y.append(y)
# Get node attributes
freq = aspect_network.nodes[node].get('frequency', 1)
sentiment_score = aspect_network.nodes[node].get('sentiment_score', 0)
color = aspect_network.nodes[node].get('color', 'gray')
positive_pct = aspect_network.nodes[node].get('positive_pct', 0) * 100
negative_pct = aspect_network.nodes[node].get('negative_pct', 0) * 100
node_text.append(node)
node_size.append(max(20, freq * 5)) # Size based on frequency
# Color based on sentiment
if color == 'green':
node_color.append('#2E8B57')
elif color == 'red':
node_color.append('#DC143C')
else:
node_color.append('#708090')
node_info.append(f"Aspect: {node}
" +
f"Frequency: {freq}
" +
f"Positive: {positive_pct:.1f}%
" +
f"Negative: {negative_pct:.1f}%")
# Extract edge information
edge_x = []
edge_y = []
edge_weights = []
for edge in aspect_network.edges():
x0, y0 = pos[edge[0]]
x1, y1 = pos[edge[1]]
edge_x.extend([x0, x1, None])
edge_y.extend([y0, y1, None])
weight = aspect_network.edges[edge].get('weight', 1)
edge_weights.append(weight)
# Create edge trace
edge_trace = go.Scatter(
x=edge_x, y=edge_y,
line=dict(width=2, color='#888'),
hoverinfo='none',
mode='lines'
)
# Create node trace
node_trace = go.Scatter(
x=node_x, y=node_y,
mode='markers+text',
hoverinfo='text',
text=node_text,
hovertext=node_info,
textposition="middle center",
marker=dict(
size=node_size,
color=node_color,
line=dict(width=2, color='white')
)
)
# Create figure
fig = go.Figure(data=[edge_trace, node_trace],
layout=go.Layout(
title=dict(text="Aspect Co-occurrence Network", font=dict(size=16)),
showlegend=False,
hovermode='closest',
margin=dict(b=20,l=5,r=5,t=40),
annotations=[ dict(
text="Node size = frequency, Color = sentiment (green=positive, red=negative)",
showarrow=False,
xref="paper", yref="paper",
x=0.005, y=-0.002,
xanchor='left', yanchor='bottom',
font=dict(color='gray', size=10)
)],
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
height=600
))
return fig
def create_intent_aspect_sankey(self, df: pd.DataFrame) -> go.Figure:
"""
Create Sankey diagram showing Intent → Aspect → Sentiment flow.
Args:
df: Processed dataframe
Returns:
Plotly figure object
"""
# Prepare data for Sankey diagram
sankey_data = []
for idx, row in df.iterrows():
aspects = row['aspects'] if isinstance(row['aspects'], list) else []
sentiments = row['aspect_sentiments'] if isinstance(row['aspect_sentiments'], list) else []
intent = row['intent']
for aspect, sentiment in zip(aspects, sentiments):
sankey_data.append({
'intent': intent,
'aspect': aspect,
'sentiment': sentiment
})
if not sankey_data:
fig = go.Figure()
fig.add_annotation(
text="No data available for Sankey diagram",
xref="paper", yref="paper",
x=0.5, y=0.5, xanchor='center', yanchor='middle',
showarrow=False, font_size=16
)
fig.update_layout(title="Intent → Aspect → Sentiment Flow")
return fig
sankey_df = pd.DataFrame(sankey_data)
# Create node lists
intents = sorted(sankey_df['intent'].unique())
aspects = sorted(sankey_df['aspect'].unique())
sentiments = sorted(sankey_df['sentiment'].unique())
# Create labels and indices
all_labels = intents + aspects + sentiments
label_to_idx = {label: idx for idx, label in enumerate(all_labels)}
# Create flows
flows = []
# Intent → Aspect flows
intent_aspect_flows = sankey_df.groupby(['intent', 'aspect']).size().reset_index(name='count')
for _, row in intent_aspect_flows.iterrows():
flows.append({
'source': label_to_idx[row['intent']],
'target': label_to_idx[row['aspect']],
'value': row['count']
})
# Aspect → Sentiment flows
aspect_sentiment_flows = sankey_df.groupby(['aspect', 'sentiment']).size().reset_index(name='count')
for _, row in aspect_sentiment_flows.iterrows():
flows.append({
'source': label_to_idx[row['aspect']],
'target': label_to_idx[row['sentiment']],
'value': row['count']
})
# Create colors
intent_colors = ['rgba(255, 127, 14, 0.8)'] * len(intents)
aspect_colors = ['rgba(31, 119, 180, 0.8)'] * len(aspects)
sentiment_colors = []
for sentiment in sentiments:
if sentiment == 'Positive':
sentiment_colors.append('rgba(46, 139, 87, 0.8)')
elif sentiment == 'Negative':
sentiment_colors.append('rgba(220, 20, 60, 0.8)')
else:
sentiment_colors.append('rgba(112, 128, 144, 0.8)')
node_colors = intent_colors + aspect_colors + sentiment_colors
# Create Sankey diagram
fig = go.Figure(data=[go.Sankey(
node=dict(
pad=15,
thickness=20,
line=dict(color="black", width=0.5),
label=all_labels,
color=node_colors
),
link=dict(
source=[flow['source'] for flow in flows],
target=[flow['target'] for flow in flows],
value=[flow['value'] for flow in flows]
)
)])
fig.update_layout(
title=dict(text="Intent → Aspect → Sentiment Flow", font=dict(size=12)),
font_size=12,
height=600
)
return fig
def create_enhanced_timeline_chart(self, df: pd.DataFrame,
annotations: Optional[List[Dict]] = None) -> go.Figure:
"""
Create enhanced timeline chart with annotation support.
Args:
df: Processed dataframe with date and sentiment columns
annotations: List of event annotations to add to chart
Returns:
Plotly figure object
"""
try:
# Ensure we have data
if df.empty:
fig = go.Figure()
fig.add_annotation(
text="No data available for timeline chart",
xref="paper", yref="paper",
x=0.5, y=0.5, xanchor='center', yanchor='middle',
showarrow=False, font_size=16
)
fig.update_layout(title="Sentiment Trends Over Time")
return fig
# Ensure date column is properly formatted
df_copy = df.copy()
df_copy['date'] = pd.to_datetime(df_copy['date'], errors='coerce')
# Remove any rows with invalid dates
df_copy = df_copy.dropna(subset=['date'])
if df_copy.empty:
fig = go.Figure()
fig.add_annotation(
text="No valid dates found in data",
xref="paper", yref="paper",
x=0.5, y=0.5, xanchor='center', yanchor='middle',
showarrow=False, font_size=16
)
fig.update_layout(title="Sentiment Trends Over Time")
return fig
# Group by date and sentiment
timeline_data = df_copy.groupby([df_copy['date'].dt.date, 'overall_sentiment']).size().reset_index(name='count')
timeline_data['date'] = pd.to_datetime(timeline_data['date'])
# Sort by date for proper line chart
timeline_data = timeline_data.sort_values('date')
# Create figure manually
fig = go.Figure()
# Add traces for each sentiment
sentiments = timeline_data['overall_sentiment'].unique()
for sentiment in sentiments:
sentiment_data = timeline_data[timeline_data['overall_sentiment'] == sentiment].copy()
fig.add_trace(go.Scatter(
x=sentiment_data['date'],
y=sentiment_data['count'],
mode='lines+markers',
name=sentiment,
line=dict(
color=self.color_schemes['sentiment'].get(sentiment, '#999999'),
width=2
),
marker=dict(size=6),
hovertemplate=f'{sentiment}
Date: %{{x|%Y-%m-%d}}
Count: %{{y}}'
))
# Add simple text annotations (no arrows or complex shapes)
if annotations and len(timeline_data) > 0:
max_count = timeline_data['count'].max()
y_range = max_count * 0.3 # Space for annotations
for i, annotation in enumerate(annotations):
try:
# Convert annotation date
if isinstance(annotation['date'], str):
ann_date = pd.to_datetime(annotation['date'])
else:
ann_date = pd.to_datetime(annotation['date'])
# Add simple text annotation
fig.add_annotation(
x=ann_date,
y=max_count + y_range * (0.2 + i * 0.3),
text=f"📌 {annotation['text']}",
showarrow=False,
bgcolor="rgba(255,255,255,0.9)",
bordercolor="purple",
borderwidth=1,
font=dict(size=10, color="purple"),
xanchor="center"
)
except Exception:
# Skip problematic annotations silently
continue
fig.update_layout(
title="Sentiment Trends Over Time",
xaxis_title="Date",
yaxis_title="Number of Reviews",
template='plotly_white',
height=500,
showlegend=True,
xaxis=dict(
tickangle=45,
type='date'
),
yaxis=dict(
rangemode='tozero'
),
hovermode='x unified'
)
return fig
except Exception as e:
# Fallback: return empty chart with error message
fig = go.Figure()
fig.add_annotation(
text=f"Error creating timeline chart: {str(e)}",
xref="paper", yref="paper",
x=0.5, y=0.5, xanchor='center', yanchor='middle',
showarrow=False, font_size=14, font_color="red"
)
fig.update_layout(title="Sentiment Trends Over Time")
return fig
def create_regional_language_analysis(self, df: pd.DataFrame) -> go.Figure:
"""
Create comprehensive language-wise sentiment and intent analysis.
Args:
df: Processed dataframe
Returns:
Plotly figure object with subplots
"""
# Create subplots
fig = make_subplots(
rows=2, cols=2,
subplot_titles=('Language Distribution', 'Sentiment by Language',
'Intent by Language', 'Language Trends'),
specs=[[{"type": "pie"}, {"type": "bar"}],
[{"type": "bar"}, {"type": "scatter"}]]
)
# 1. Language Distribution (Pie Chart)
lang_counts = df['detected_language'].value_counts()
fig.add_trace(
go.Pie(labels=lang_counts.index, values=lang_counts.values, name="Languages"),
row=1, col=1
)
# 2. Sentiment by Language (Stacked Bar)
sentiment_lang = df.groupby(['detected_language', 'overall_sentiment']).size().unstack(fill_value=0)
for sentiment in sentiment_lang.columns:
fig.add_trace(
go.Bar(
name=sentiment,
x=sentiment_lang.index,
y=sentiment_lang[sentiment],
marker_color=self.color_schemes['sentiment'].get(sentiment, '#808080')
),
row=1, col=2
)
# 3. Intent by Language (Stacked Bar)
intent_lang = df.groupby(['detected_language', 'intent']).size().unstack(fill_value=0)
for intent in intent_lang.columns:
fig.add_trace(
go.Bar(
name=intent,
x=intent_lang.index,
y=intent_lang[intent],
showlegend=False
),
row=2, col=1
)
# 4. Language Trends Over Time
df_copy = df.copy()
df_copy['date'] = pd.to_datetime(df_copy['date'])
df_copy['date_str'] = df_copy['date'].dt.strftime('%Y-%m-%d')
daily_lang = df_copy.groupby(['date_str', 'detected_language']).size().unstack(fill_value=0)
daily_lang.index = pd.to_datetime(daily_lang.index)
for lang in daily_lang.columns:
fig.add_trace(
go.Scatter(
x=daily_lang.index,
y=daily_lang[lang],
mode='lines',
name=f'{lang} trend',
showlegend=False
),
row=2, col=2
)
fig.update_layout(
height=800,
title=dict(text="Regional/Language Analysis Dashboard", font=dict(size=16)),
showlegend=True
)
return fig
def create_alert_dashboard(self, sentiment_alerts: List[Dict[str, Any]]) -> None:
"""
Create alert dashboard showing sentiment spikes.
Args:
sentiment_alerts: List of sentiment spike alerts
"""
st.subheader("🚨 Sentiment Alerts")
if not sentiment_alerts:
st.info("No sentiment spikes detected in recent data.")
return
# Display alerts
for i, alert in enumerate(sentiment_alerts[:5]): # Show top 5 alerts
severity_color = "🔴" if alert['alert_severity'] == 'high' else "🟡"
with st.expander(f"{severity_color} {alert['aspect']} - {alert['spike_magnitude']}% increase"):
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Recent Avg Negative", f"{alert['recent_avg_negative']:.1f}")
with col2:
st.metric("Previous Avg Negative", f"{alert['previous_avg_negative']:.1f}")
with col3:
st.metric("Spike Magnitude", f"{alert['spike_magnitude']}%")
st.warning(f"Aspect '{alert['aspect']}' showing {alert['alert_severity']} severity spike in negative sentiment.")
def create_impact_simulation_tool(self, df: pd.DataFrame) -> None:
"""
Create what-if analysis tool for aspect improvements.
Args:
df: Processed dataframe
"""
st.subheader("🎯 Impact Simulation")
st.write("Simulate the impact of fixing specific aspects on overall sentiment.")
# Get list of negative aspects
negative_aspects = []
for idx, row in df.iterrows():
aspects = row['aspects'] if isinstance(row['aspects'], list) else []
sentiments = row['aspect_sentiments'] if isinstance(row['aspect_sentiments'], list) else []
for aspect, sentiment in zip(aspects, sentiments):
if sentiment == 'Negative':
negative_aspects.append(aspect)
if not negative_aspects:
st.info("No negative aspects found for simulation.")
return
unique_negative_aspects = list(set(negative_aspects))
# Aspect selection
selected_aspects = st.multiselect(
"Select aspects to 'fix' (simulate removing negative reviews):",
unique_negative_aspects,
default=unique_negative_aspects[:3] if len(unique_negative_aspects) >= 3 else unique_negative_aspects
)
if selected_aspects:
# Calculate current sentiment distribution
current_sentiment = df['overall_sentiment'].value_counts()
current_positive_pct = (current_sentiment.get('Positive', 0) / len(df)) * 100
current_negative_pct = (current_sentiment.get('Negative', 0) / len(df)) * 100
# Simulate fixing aspects (remove reviews with selected negative aspects)
df_simulated = df.copy()
for aspect in selected_aspects:
# Remove reviews that mention this aspect negatively
mask = df_simulated.apply(lambda row: not (
isinstance(row['aspects'], list) and
isinstance(row['aspect_sentiments'], list) and
any(asp == aspect and sent == 'Negative'
for asp, sent in zip(row['aspects'], row['aspect_sentiments']))
), axis=1)
df_simulated = df_simulated[mask]
# Calculate new sentiment distribution
if len(df_simulated) > 0:
new_sentiment = df_simulated['overall_sentiment'].value_counts()
new_positive_pct = (new_sentiment.get('Positive', 0) / len(df_simulated)) * 100
new_negative_pct = (new_sentiment.get('Negative', 0) / len(df_simulated)) * 100
# Display results
col1, col2 = st.columns(2)
with col1:
st.metric("Current Positive %", f"{current_positive_pct:.1f}%")
st.metric("Current Negative %", f"{current_negative_pct:.1f}%")
with col2:
positive_change = new_positive_pct - current_positive_pct
negative_change = new_negative_pct - current_negative_pct
st.metric(
"Simulated Positive %",
f"{new_positive_pct:.1f}%",
delta=f"{positive_change:+.1f}%"
)
st.metric(
"Simulated Negative %",
f"{new_negative_pct:.1f}%",
delta=f"{negative_change:+.1f}%"
)
reviews_removed = len(df) - len(df_simulated)
st.info(f"Simulation removed {reviews_removed} negative reviews mentioning the selected aspects.")
def create_summary_sections(self, macro_summary: Dict[str, str],
micro_summaries: Dict[str, str]) -> None:
"""
Create and display macro and micro summary sections.
Args:
macro_summary: High-level insights
micro_summaries: Aspect-specific insights
"""
# Macro Summary
st.subheader("📊 Executive Summary")
for category, summary in macro_summary.items():
st.write(f"**{category.replace('_', ' ').title()}:** {summary}")
st.divider()
# Micro Summaries
st.subheader("🔍 Aspect-Level Insights")
if micro_summaries:
for aspect, summary in micro_summaries.items():
with st.expander(f"📌 {aspect.title()} Analysis"):
st.write(summary)
else:
st.info("No detailed aspect summaries available.")
class ExportEngine:
"""Handles export functionality for reports and insights."""
@staticmethod
def generate_pdf_report(df: pd.DataFrame, kpis: Dict[str, Any],
areas_of_improvement: pd.DataFrame,
strength_anchors: pd.DataFrame) -> bytes:
"""
Generate PDF report with key insights.
Args:
df: Processed dataframe
kpis: KPI dictionary
areas_of_improvement: Problem areas dataframe
strength_anchors: Strength areas dataframe
Returns:
PDF bytes
"""
# Placeholder for PDF generation
# In a real implementation, you would use libraries like reportlab or weasyprint
pdf_content = f"""
SENTIMENT ANALYSIS REPORT
========================
Executive Summary:
- Total Reviews: {kpis['total_reviews']}
- Positive Sentiment: {kpis['positive_pct']}%
- Negative Sentiment: {kpis['negative_pct']}%
- Problem Areas: {kpis['problem_areas_count']}
- Strength Anchors: {kpis['strength_anchors_count']}
Top Issues:
{areas_of_improvement[['aspect', 'priority_score']].to_string() if len(areas_of_improvement) > 0 else 'None'}
Top Strengths:
{strength_anchors[['aspect', 'strength_score']].to_string() if len(strength_anchors) > 0 else 'None'}
"""
return pdf_content.encode('utf-8')
@staticmethod
def generate_excel_export(df: pd.DataFrame, areas_of_improvement: pd.DataFrame,
strength_anchors: pd.DataFrame) -> bytes:
"""
Generate Excel export with multiple sheets.
Args:
df: Processed dataframe
areas_of_improvement: Problem areas dataframe
strength_anchors: Strength areas dataframe
Returns:
Excel bytes
"""
from io import BytesIO
import pandas as pd
output = BytesIO()
with pd.ExcelWriter(output, engine='openpyxl') as writer:
df.to_excel(writer, sheet_name='Raw Data', index=False)
areas_of_improvement.to_excel(writer, sheet_name='Problem Areas', index=False)
strength_anchors.to_excel(writer, sheet_name='Strengths', index=False)
return output.getvalue()
def create_timeline_chart(self, df: pd.DataFrame) -> go.Figure:
"""
Create timeline chart showing sentiment trends over time.
Args:
df: Processed dataframe with date and sentiment columns
Returns:
Plotly figure object
"""
# Ensure date column is properly formatted
df_copy = df.copy()
df_copy['date'] = pd.to_datetime(df_copy['date'])
# Group by date and sentiment - convert to string dates to avoid timestamp issues
df_copy['date_str'] = df_copy['date'].dt.strftime('%Y-%m-%d')
timeline_data = df_copy.groupby(['date_str', 'overall_sentiment']).size().reset_index(name='count')
# Convert date strings back to datetime objects for plotting
timeline_data['date'] = pd.to_datetime(timeline_data['date_str'])
# Create the line chart
fig = go.Figure()
# Add traces for each sentiment
for sentiment in timeline_data['overall_sentiment'].unique():
sentiment_data = timeline_data[timeline_data['overall_sentiment'] == sentiment]
fig.add_trace(go.Scatter(
x=sentiment_data['date'],
y=sentiment_data['count'],
mode='lines+markers',
name=sentiment,
line=dict(color=self.color_schemes['sentiment'].get(sentiment, '#999999')),
marker=dict(size=6)
))
fig.update_layout(
title="Sentiment Trends Over Time",
xaxis_title="Date",
yaxis_title="Number of Reviews",
hovermode='x unified',
template='plotly_white',
height=400,
showlegend=True
)
return fig
def create_sentiment_distribution(self, df: pd.DataFrame) -> go.Figure:
"""Create pie chart for sentiment distribution."""
sentiment_counts = df['overall_sentiment'].value_counts()
fig = go.Figure(data=[go.Pie(
labels=sentiment_counts.index,
values=sentiment_counts.values,
hole=.3,
marker_colors=[self.color_schemes['sentiment'].get(label, '#808080') for label in sentiment_counts.index]
)])
fig.update_traces(textposition='inside', textinfo='percent+label')
fig.update_layout(
title="Overall Sentiment Distribution",
template='plotly_white'
)
return fig
def create_intent_distribution(self, df: pd.DataFrame) -> go.Figure:
"""Create bar chart for intent distribution."""
intent_counts = df['intent'].value_counts()
fig = px.bar(
x=intent_counts.index,
y=intent_counts.values,
title="Intent Classification Distribution",
labels={'x': 'Intent Type', 'y': 'Number of Reviews'},
color=intent_counts.index,
color_discrete_sequence=self.color_schemes['intent']
)
fig.update_layout(
showlegend=False,
template='plotly_white'
)
return fig
def create_aspect_sentiment_heatmap(self, df: pd.DataFrame) -> go.Figure:
"""
Create heatmap showing sentiment distribution across aspects.
Args:
df: Processed dataframe with aspects and aspect_sentiments
Returns:
Plotly figure object
"""
# Extract all aspects and their sentiments
aspect_sentiment_data = []
for idx, row in df.iterrows():
aspects = row['aspects'] if isinstance(row['aspects'], list) else []
sentiments = row['aspect_sentiments'] if isinstance(row['aspect_sentiments'], list) else []
for aspect, sentiment in zip(aspects, sentiments):
aspect_sentiment_data.append({'aspect': aspect, 'sentiment': sentiment})
if not aspect_sentiment_data:
# Return empty heatmap if no aspect data
fig = go.Figure()
fig.add_annotation(
text="No aspect data available for heatmap",
xref="paper", yref="paper",
x=0.5, y=0.5, xanchor='center', yanchor='middle',
showarrow=False, font_size=16
)
fig.update_layout(title="Aspect-Sentiment Heatmap")
return fig
# Create dataframe and pivot table
aspect_df = pd.DataFrame(aspect_sentiment_data)
heatmap_data = aspect_df.groupby(['aspect', 'sentiment']).size().unstack(fill_value=0)
# Create heatmap
fig = go.Figure(data=go.Heatmap(
z=heatmap_data.values,
x=heatmap_data.columns,
y=heatmap_data.index,
colorscale='RdYlGn',
text=heatmap_data.values,
texttemplate="%{text}",
textfont={"size": 12},
))
fig.update_layout(
title="Aspect-Sentiment Correlation Heatmap",
xaxis_title="Sentiment",
yaxis_title="Aspects",
template='plotly_white'
)
return fig
def create_wordcloud(self, df: pd.DataFrame, sentiment_filter: str = None) -> str:
"""
Create word cloud from reviews.
Args:
df: Processed dataframe
sentiment_filter: Filter by sentiment ('Positive', 'Negative', 'Neutral')
Returns:
Base64 encoded image string
"""
# Filter data if sentiment filter is provided
if sentiment_filter:
filtered_df = df[df['overall_sentiment'] == sentiment_filter]
else:
filtered_df = df
# Combine all translated reviews
text = ' '.join(filtered_df['translated_review'].astype(str))
if not text.strip():
return None
# Generate word cloud
wordcloud = WordCloud(
width=800,
height=400,
background_color='white',
max_words=100,
colormap='viridis'
).generate(text)
# Convert to base64 for display
img = BytesIO()
wordcloud.to_image().save(img, format='PNG')
img.seek(0)
img_b64 = base64.b64encode(img.read()).decode()
return img_b64
def create_language_distribution(self, df: pd.DataFrame) -> go.Figure:
"""Create pie chart for language distribution."""
lang_counts = df['detected_language'].value_counts()
fig = go.Figure(data=[go.Pie(
labels=lang_counts.index,
values=lang_counts.values,
hole=.3
)])
fig.update_traces(textposition='inside', textinfo='percent+label')
fig.update_layout(
title="Language Distribution",
template='plotly_white'
)
return fig
def create_correlation_matrix(self, df: pd.DataFrame) -> go.Figure:
"""
Create correlation matrix for numerical relationships.
Args:
df: Processed dataframe
Returns:
Plotly figure object
"""
# Create numerical features for correlation
correlation_df = pd.DataFrame()
# Add sentiment as numerical
sentiment_mapping = {'Positive': 1, 'Neutral': 0, 'Negative': -1}
correlation_df['sentiment_score'] = df['overall_sentiment'].map(sentiment_mapping)
# Add intent as categorical numerical
intent_mapping = {intent: idx for idx, intent in enumerate(df['intent'].unique())}
correlation_df['intent_code'] = df['intent'].map(intent_mapping)
# Add review length
correlation_df['review_length'] = df['translated_review'].str.len()
# Add number of aspects
correlation_df['aspect_count'] = df['aspects'].apply(lambda x: len(x) if isinstance(x, list) else 0)
# Calculate correlation matrix
corr_matrix = correlation_df.corr()
# Create heatmap
fig = go.Figure(data=go.Heatmap(
z=corr_matrix.values,
x=corr_matrix.columns,
y=corr_matrix.columns,
colorscale='RdBu',
zmid=0,
text=np.round(corr_matrix.values, 2),
texttemplate="%{text}",
textfont={"size": 12},
))
fig.update_layout(
title="Feature Correlation Matrix",
template='plotly_white'
)
return fig
def create_aspect_frequency_chart(self, df: pd.DataFrame) -> go.Figure:
"""Create bar chart showing most frequent aspects."""
# Extract all aspects
all_aspects = []
for aspects in df['aspects']:
if isinstance(aspects, list):
all_aspects.extend(aspects)
if not all_aspects:
fig = go.Figure()
fig.add_annotation(
text="No aspects extracted from reviews",
xref="paper", yref="paper",
x=0.5, y=0.5, xanchor='center', yanchor='middle',
showarrow=False, font_size=16
)
fig.update_layout(title="Most Frequent Aspects")
return fig
# Count frequency
aspect_counts = pd.Series(all_aspects).value_counts().head(15)
fig = px.bar(
x=aspect_counts.values,
y=aspect_counts.index,
orientation='h',
title="Most Frequent Aspects (Top 15)",
labels={'x': 'Frequency', 'y': 'Aspects'}
)
fig.update_layout(
template='plotly_white',
height=500
)
return fig
def create_daily_volume_chart(self, df: pd.DataFrame) -> go.Figure:
"""Create chart showing daily review volume."""
df_copy = df.copy()
df_copy['date'] = pd.to_datetime(df_copy['date'])
df_copy['date_str'] = df_copy['date'].dt.strftime('%Y-%m-%d')
daily_counts = df_copy.groupby('date_str').size().reset_index(name='count')
daily_counts['date'] = pd.to_datetime(daily_counts['date_str'])
daily_counts.columns = ['date_str', 'review_count', 'date']
fig = px.bar(
daily_counts,
x='date',
y='review_count',
title="Daily Review Volume",
labels={'review_count': 'Number of Reviews', 'date': 'Date'}
)
fig.update_layout(
template='plotly_white',
xaxis_title="Date",
yaxis_title="Number of Reviews"
)
return fig
class FilterEngine:
"""Handles data filtering based on user selections."""
@staticmethod
def apply_filters(df: pd.DataFrame, filters: Dict[str, Any]) -> pd.DataFrame:
"""
Apply multiple filters to the dataframe.
Args:
df: Original dataframe
filters: Dictionary containing filter criteria
Returns:
Filtered dataframe
"""
filtered_df = df.copy()
# Date range filter
if filters.get('date_range'):
start_date, end_date = filters['date_range']
# Convert to pandas datetime for comparison
filtered_df['date'] = pd.to_datetime(filtered_df['date'])
start_date_pd = pd.to_datetime(start_date)
end_date_pd = pd.to_datetime(end_date)
filtered_df = filtered_df[
(filtered_df['date'] >= start_date_pd) &
(filtered_df['date'] <= end_date_pd)
]
# Sentiment filter
if filters.get('sentiment') and filters['sentiment'] != 'All':
filtered_df = filtered_df[filtered_df['overall_sentiment'] == filters['sentiment']]
# Intent filter
if filters.get('intent') and filters['intent'] != 'All':
filtered_df = filtered_df[filtered_df['intent'] == filters['intent']]
# Language filter
if filters.get('language') and filters['language'] != 'All':
filtered_df = filtered_df[filtered_df['detected_language'] == filters['language']]
# Aspect filter
if filters.get('aspect') and filters['aspect'] != 'All':
filtered_df = filtered_df[
filtered_df['aspects'].apply(
lambda x: filters['aspect'] in x if isinstance(x, list) else False
)
]
return filtered_df
@staticmethod
def get_filter_options(df: pd.DataFrame) -> Dict[str, List]:
"""
Extract available filter options from the dataframe.
Args:
df: Processed dataframe
Returns:
Dictionary containing filter options
"""
# Extract unique aspects
all_aspects = set()
for aspects in df['aspects']:
if isinstance(aspects, list):
all_aspects.update(aspects)
return {
'sentiments': ['All'] + sorted(df['overall_sentiment'].unique().tolist()),
'intents': ['All'] + sorted(df['intent'].unique().tolist()),
'languages': ['All'] + sorted(df['detected_language'].unique().tolist()),
'aspects': ['All'] + sorted(list(all_aspects)),
'date_range': (df['date'].min().date(), df['date'].max().date())
}