| | """ |
| | Data loader module for Sentiment Analysis Visualization |
| | Handles Snowflake connection and data loading with caching |
| | """ |
| | import sys |
| | import os |
| | import re |
| | import pandas as pd |
| | import numpy as np |
| | import streamlit as st |
| | from pathlib import Path |
| | import json |
| | from datetime import datetime, timedelta |
| | from dateutil.relativedelta import relativedelta |
| |
|
| | |
# Put the project root (three directory levels above this file) on sys.path so
# the ``visualization`` package imported next can be resolved.
parent_dir = Path(__file__).resolve().parent.parent.parent
sys.path.append(os.fspath(parent_dir))
| |
|
| | from visualization.SnowFlakeConnection import SnowFlakeConn |
| |
|
| |
|
class SentimentDataLoader:
    """
    Loads sentiment analysis data from Snowflake with Streamlit caching.

    Three data loading modes:
      - load_dashboard_data()      : lightweight (no text), cached 24h
      - load_sa_data(...)          : top-N content stats + sampled comments, on-demand
      - load_reply_required_data() : reply-queue comments with text, on-demand

    Cached methods name their instance argument ``_self`` so ``st.cache_data``
    skips hashing it: cache entries are keyed only on the remaining arguments
    and are shared by every loader instance.
    """

    # Lifetime (seconds) of every st.cache_data entry created by this class: 24h.
    _CACHE_TTL_SECONDS = 86400

    # display_text longer than this is truncated (plus '...') in display_text_short.
    _SHORT_TEXT_LEN = 100

    # Count of negative / very_negative comments within a GROUP BY group.
    _NEGATIVE_COUNT_SQL = (
        "SUM(CASE WHEN SENTIMENT_POLARITY IN ('negative','very_negative') THEN 1 ELSE 0 END)"
    )

    # ORDER BY expressions for the SA content ranking, keyed by ``sort_by``.
    # 'severity_score' is the SQL twin of the pandas formula in
    # _process_sa_content_stats: negative % scaled by sqrt(total comments).
    _SA_SORT_EXPRESSIONS = {
        'severity_score': f"({_NEGATIVE_COUNT_SQL} * 100.0 / COUNT(*)) * SQRT(COUNT(*))",
        'sentiment_percentage': f"{_NEGATIVE_COUNT_SQL} * 100.0 / COUNT(*)",
        'sentiment_count': _NEGATIVE_COUNT_SQL,
        'total_comments': "COUNT(*)",
    }

    def __init__(self, config_path=None):
        """
        Read the visualization config and extract the Snowflake queries.

        Args:
            config_path: Path to the JSON config file. Defaults to
                ``config/viz_config.json`` under this module's parent directory.

        Raises:
            OSError / json.JSONDecodeError: if the config cannot be read or parsed.
            KeyError: if ``snowflake`` or ``snowflake.query`` is missing.
        """
        if config_path is None:
            config_path = Path(__file__).parent.parent / "config" / "viz_config.json"

        with open(config_path, 'r', encoding='utf-8') as f:
            self.config = json.load(f)

        snowflake_cfg = self.config['snowflake']
        self.query = snowflake_cfg['query']
        # Fall back to the full query when no lightweight dashboard query is configured.
        self.dashboard_query = snowflake_cfg.get('dashboard_query', self.query)
        self.demographics_query = snowflake_cfg.get('demographics_query', None)

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _run_query(query, description):
        """Run one read query on a fresh Snowflake connection, always closing it."""
        conn = SnowFlakeConn()
        try:
            return conn.run_read_query(query, description)
        finally:
            conn.close_connection()

    @staticmethod
    def _date_range_key(date_range):
        """Turn a (start, end) pair into a hashable cache key of strings, or ()."""
        if date_range and len(date_range) == 2:
            return (str(date_range[0]), str(date_range[1]))
        return ()

    @staticmethod
    def _normalize_common_columns(df):
        """
        Apply the normalisation shared by every comment-level frame.

        Lower-cases column names, parses timestamp columns (invalid -> NaT),
        fills missing sentiment/intent/platform/brand with 'unknown',
        lower-cases platform/brand, and coerces requires_reply to bool.
        """
        df.columns = df.columns.str.lower()

        for col in ('comment_timestamp', 'processed_at'):
            if col in df.columns:
                df[col] = pd.to_datetime(df[col], errors='coerce')

        df['sentiment_polarity'] = df['sentiment_polarity'].fillna('unknown')
        df['intent'] = df['intent'].fillna('unknown')
        df['platform'] = df['platform'].fillna('unknown').str.lower()
        if 'brand' in df.columns:
            df['brand'] = df['brand'].fillna('unknown').str.lower()

        if 'requires_reply' in df.columns:
            flags = df['requires_reply']
            # astype(bool) alone turns NaN into True; treat NULL as "no reply needed".
            df['requires_reply'] = flags.notna() & flags.astype(bool)

        return df

    @classmethod
    def _add_display_text_short(cls, df):
        """Add ``display_text_short``: display_text cut to _SHORT_TEXT_LEN chars + '...'."""
        limit = cls._SHORT_TEXT_LEN
        # fillna first so NULL text shows as '' rather than the string 'None'/'nan'.
        text = df['display_text'].fillna('').astype(str)
        df['display_text_short'] = text.where(text.str.len() <= limit, text.str[:limit] + '...')
        return df

    def _load_comment_frame(self, query, description, process, empty_message, error_prefix):
        """
        Shared body of load_dashboard_data() and load_data().

        Runs ``query``, post-processes it with ``process`` and, when a
        demographics query is configured, merges demographics in. Any failure
        is reported via st.error and yields an empty DataFrame.
        """
        try:
            df = self._run_query(query, description)

            if df is None or df.empty:
                st.error(empty_message)
                return pd.DataFrame()

            df = process(df)

            if self.demographics_query:
                demographics_df = self.load_demographics_data()
                df = self.merge_demographics_with_comments(df, demographics_df)

            return df

        except Exception as e:
            st.error(f"{error_prefix}: {e}")
            return pd.DataFrame()

    # ------------------------------------------------------------------
    # Dashboard / full data
    # ------------------------------------------------------------------

    @st.cache_data(ttl=_CACHE_TTL_SECONDS)
    def load_dashboard_data(_self):
        """
        Load lightweight dashboard data from Snowflake (no text columns).
        Includes demographics merge if demographics_query is configured.

        NOTE: failures are reported via st.error and return an empty frame;
        because that happens inside the cached function, the empty result is
        cached for the TTL too.

        Returns:
            pd.DataFrame
        """
        return _self._load_comment_frame(
            _self.dashboard_query,
            "dashboard data",
            _self._process_dashboard_dataframe,
            empty_message="No dashboard data returned from Snowflake",
            error_prefix="Error loading dashboard data from Snowflake",
        )

    def _process_dashboard_dataframe(self, df):
        """Process lightweight dashboard dataframe (no text columns)."""
        return self._normalize_common_columns(df)

    @st.cache_data(ttl=_CACHE_TTL_SECONDS)
    def load_data(_self, reload=False):
        """
        Load full sentiment data (with text). Kept for compatibility.
        Prefer load_dashboard_data() for dashboard views.

        Args:
            reload: Not read by the body; it only takes part in the cache key,
                so a different value maps to a separate cache entry.

        Returns:
            pd.DataFrame (empty on failure or when no rows are returned).
        """
        return _self._load_comment_frame(
            _self.query,
            "sentiment features",
            _self._process_dataframe,
            empty_message="No data returned from Snowflake",
            error_prefix="Error loading data from Snowflake",
        )

    def _process_dataframe(self, df):
        """
        Process the full dataframe, including vectorized display_text.

        display_text is the translated text for rows where is_english is False
        and a translation exists, otherwise the original text ('' when absent).
        """
        df = self._normalize_common_columns(df)

        if 'translated_text' in df.columns and 'is_english' in df.columns:
            if 'original_text' in df.columns:
                original = df['original_text']
            else:
                original = pd.Series('', index=df.index)
            df['display_text'] = original.fillna('')
            use_translation = df['is_english'].eq(False) & df['translated_text'].notna()
            df.loc[use_translation, 'display_text'] = df.loc[use_translation, 'translated_text']
        elif 'original_text' in df.columns:
            df['display_text'] = df['original_text'].fillna('')
        else:
            df['display_text'] = ''

        return self._add_display_text_short(df)

    # ------------------------------------------------------------------
    # Sentiment Analysis page
    # ------------------------------------------------------------------

    def load_sa_data(self, platform, brand, top_n=10, min_comments=10,
                     sort_by='severity_score', sentiments=None, intents=None,
                     date_range=None):
        """
        Load Sentiment Analysis page data:
          1. Content aggregation stats for top-N contents
          2. Sampled comments (up to 50 neg + 50 pos + 50 other per content)

        Args:
            platform: Selected platform string
            brand: Selected brand string
            top_n: Max number of contents to return
            min_comments: Minimum comment threshold for inclusion
            sort_by: 'severity_score' | 'sentiment_percentage' | 'sentiment_count' | 'total_comments'
            sentiments: List of sentiments to filter by (dominant_sentiment)
            intents: List of intents to filter by
            date_range: Tuple (start_date, end_date) or None

        Returns:
            tuple: (contents_df, comments_df); both empty on failure.
        """
        # List arguments are turned into sorted tuples so they are hashable
        # and order-insensitive as st.cache_data keys.
        sentiments_key = tuple(sorted(sentiments)) if sentiments else ()
        intents_key = tuple(sorted(intents)) if intents else ()
        date_key = self._date_range_key(date_range)

        try:
            return self._fetch_sa_data(
                platform, brand, top_n, min_comments, sort_by,
                sentiments_key, intents_key, date_key
            )
        except Exception as e:
            # Handled outside the cached function: st.cache_data does not cache
            # exceptions, so a transient failure is retried on the next call.
            st.error(f"Error loading SA data: {e}")
            return pd.DataFrame(), pd.DataFrame()

    @st.cache_data(ttl=_CACHE_TTL_SECONDS)
    def _fetch_sa_data(_self, platform, brand, top_n, min_comments, sort_by,
                       sentiments, intents, date_range):
        """
        Cached SA data fetch — returns (contents_df, comments_df).

        Exceptions propagate to load_sa_data(), which reports them.
        """
        content_query = _self._build_sa_content_query(
            platform, brand, min_comments, sort_by, date_range
        )

        conn = SnowFlakeConn()
        try:
            contents_df = conn.run_read_query(content_query, "SA content aggregation")
            if contents_df is None or contents_df.empty:
                return pd.DataFrame(), pd.DataFrame()

            contents_df = _self._process_sa_content_stats(contents_df)

            # The sentiment filter applies to each content's dominant sentiment.
            if sentiments:
                contents_df = contents_df[contents_df['dominant_sentiment'].isin(sentiments)]

            # Rows arrive sorted by the SQL ORDER BY, so head() keeps the top N.
            contents_df = contents_df.head(int(top_n))
            if contents_df.empty:
                return pd.DataFrame(), pd.DataFrame()

            comments_query = _self._build_sa_comments_query(
                platform, brand, contents_df['content_sk'].tolist(), date_range
            )
            comments_df = conn.run_read_query(comments_query, "SA sampled comments")
        finally:
            # Release the connection on every path, including query errors.
            conn.close_connection()

        if comments_df is None or comments_df.empty:
            # With an intent filter, no content can be confirmed without comments.
            if intents:
                return pd.DataFrame(), pd.DataFrame()
            return contents_df, pd.DataFrame()

        comments_df = _self._process_sa_comments(comments_df)

        if intents:
            # NOTE: matching only sees the *sampled* comments, so a content whose
            # matching comments were not sampled is dropped.
            pattern = '|'.join(re.escape(i) for i in intents)
            matches = comments_df['intent'].str.contains(pattern, na=False, case=False)
            valid_sks = comments_df.loc[matches, 'content_sk'].unique()
            contents_df = contents_df[contents_df['content_sk'].isin(valid_sks)]
            comments_df = comments_df[comments_df['content_sk'].isin(valid_sks)]

        return contents_df, comments_df

    def _build_sa_content_query(self, platform, brand, min_comments, sort_by, date_range):
        """
        Build dynamic SQL for content-level aggregation (no text columns).

        Reads MUSORA_COMMENT_SENTIMENT_FEATURES when platform is 'musora_app',
        otherwise COMMENT_SENTIMENT_FEATURES filtered by platform and joined to
        DIM_CONTENT for the permalink. Groups by CONTENT_SK, keeps groups with at
        least ``min_comments`` rows and orders them by ``sort_by`` descending
        (unknown values fall back to 'severity_score').
        """
        safe_brand = self._sanitize_value(brand.lower())
        safe_platform = self._sanitize_value(platform.lower())
        sort_expr = self._SA_SORT_EXPRESSIONS.get(
            sort_by, self._SA_SORT_EXPRESSIONS['severity_score']
        )

        if platform == 'musora_app':
            date_clause = self._build_date_clause(date_range)
            source_sql = f"""
            SELECT
                COMMENT_SK, CONTENT_SK, CONTENT_DESCRIPTION,
                PERMALINK_URL, THUMBNAIL_URL,
                SENTIMENT_POLARITY, INTENT, REQUIRES_REPLY, COMMENT_TIMESTAMP
            FROM SOCIAL_MEDIA_DB.ML_FEATURES.MUSORA_COMMENT_SENTIMENT_FEATURES
            WHERE LOWER(CHANNEL_NAME) = '{safe_brand}'
            {date_clause}
            """
        else:
            # The fact table is aliased ``s`` because DIM_CONTENT is joined in
            # and unqualified column names could be ambiguous.
            date_clause = self._build_date_clause(date_range, table_alias='s')
            source_sql = f"""
            SELECT
                s.COMMENT_SK, s.CONTENT_SK, s.CONTENT_DESCRIPTION,
                c.PERMALINK_URL, CAST(NULL AS VARCHAR) AS THUMBNAIL_URL,
                s.SENTIMENT_POLARITY, s.INTENT, s.REQUIRES_REPLY, s.COMMENT_TIMESTAMP
            FROM SOCIAL_MEDIA_DB.ML_FEATURES.COMMENT_SENTIMENT_FEATURES s
            LEFT JOIN SOCIAL_MEDIA_DB.CORE.DIM_CONTENT c ON s.CONTENT_SK = c.CONTENT_SK
            WHERE LOWER(s.CHANNEL_NAME) = '{safe_brand}'
              AND LOWER(s.PLATFORM) = '{safe_platform}'
              {date_clause}
            """

        return f"""
        WITH combined AS ({source_sql})
        SELECT
            CONTENT_SK,
            MAX(CONTENT_DESCRIPTION) AS CONTENT_DESCRIPTION,
            MAX(PERMALINK_URL) AS PERMALINK_URL,
            MAX(THUMBNAIL_URL) AS THUMBNAIL_URL,
            COUNT(*) AS TOTAL_COMMENTS,
            SUM(CASE WHEN REQUIRES_REPLY THEN 1 ELSE 0 END) AS REPLY_REQUIRED_COUNT,
            SUM(CASE WHEN SENTIMENT_POLARITY = 'very_negative' THEN 1 ELSE 0 END) AS VERY_NEGATIVE_COUNT,
            SUM(CASE WHEN SENTIMENT_POLARITY = 'negative' THEN 1 ELSE 0 END) AS NEGATIVE_COUNT_RAW,
            SUM(CASE WHEN SENTIMENT_POLARITY = 'neutral' THEN 1 ELSE 0 END) AS NEUTRAL_COUNT,
            SUM(CASE WHEN SENTIMENT_POLARITY = 'positive' THEN 1 ELSE 0 END) AS POSITIVE_COUNT_RAW,
            SUM(CASE WHEN SENTIMENT_POLARITY = 'very_positive' THEN 1 ELSE 0 END) AS VERY_POSITIVE_COUNT
        FROM combined
        GROUP BY CONTENT_SK
        HAVING COUNT(*) >= {int(min_comments)}
        ORDER BY {sort_expr} DESC
        """

    def _process_sa_content_stats(self, df):
        """
        Derive all columns expected by the existing SA page UI from the
        raw content-aggregation result.

        Adds combined negative/positive counts and percentages, severity_score
        (negative % * sqrt(total_comments)), the "selected sentiment" aliases
        and dominant_sentiment (most frequent polarity; ties go to the first
        column in very_negative -> very_positive order).
        """
        df.columns = df.columns.str.lower()

        # Coerce counts to integers so arithmetic and idxmax also work when the
        # driver returns numeric columns as objects (e.g. Decimal).
        count_cols = [
            'total_comments', 'reply_required_count', 'very_negative_count',
            'negative_count_raw', 'neutral_count', 'positive_count_raw',
            'very_positive_count',
        ]
        for col in count_cols:
            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype('int64')

        df['negative_count'] = df['very_negative_count'] + df['negative_count_raw']
        df['positive_count'] = df['positive_count_raw'] + df['very_positive_count']

        total = df['total_comments']
        df['negative_percentage'] = (df['negative_count'] / total * 100).round(2)
        df['positive_percentage'] = (df['positive_count'] / total * 100).round(2)
        df['severity_score'] = (df['negative_percentage'] * (total ** 0.5)).round(2)

        # The UI's "dynamic"/"selected" columns always describe negative sentiment here.
        df['dynamic_severity_score'] = df['severity_score']
        df['selected_sentiment_count'] = df['negative_count']
        df['selected_sentiment_percentage'] = df['negative_percentage']

        sentiment_counts = pd.DataFrame({
            'very_negative': df['very_negative_count'],
            'negative': df['negative_count_raw'],
            'neutral': df['neutral_count'],
            'positive': df['positive_count_raw'],
            'very_positive': df['very_positive_count'],
        })
        df['dominant_sentiment'] = sentiment_counts.idxmax(axis=1)

        return df

    def _build_sa_comments_query(self, platform, brand, content_sk_list, date_range):
        """
        Build SQL for sampled comments for a list of content_sks.

        Samples up to 50 rows per (content_sk, sentiment group) — neg, pos,
        other. Within a group the more extreme polarity comes first, then rows
        are taken in random order. display_text is computed in SQL (translated
        text for non-English comments when available, else the original), so
        both text columns need not be post-processed.

        Raises:
            ValueError: if content_sk_list is empty (``IN ()`` is invalid SQL).
        """
        if not content_sk_list:
            raise ValueError("content_sk_list must not be empty")

        safe_brand = self._sanitize_value(brand.lower())
        safe_platform = self._sanitize_value(platform.lower())
        content_sks_str = ", ".join(
            f"'{self._sanitize_value(str(sk))}'" for sk in content_sk_list
        )

        if platform == 'musora_app':
            date_clause = self._build_date_clause(date_range)
            source_sql = f"""
            SELECT
                COMMENT_SK, COMMENT_ID, CONTENT_SK, CONTENT_DESCRIPTION,
                CASE WHEN IS_ENGLISH = FALSE AND TRANSLATED_TEXT IS NOT NULL
                     THEN TRANSLATED_TEXT ELSE ORIGINAL_TEXT END AS DISPLAY_TEXT,
                ORIGINAL_TEXT,
                'musora_app' AS PLATFORM,
                LOWER(CHANNEL_NAME) AS BRAND,
                COMMENT_TIMESTAMP, AUTHOR_NAME,
                DETECTED_LANGUAGE, SENTIMENT_POLARITY, INTENT,
                REQUIRES_REPLY, SENTIMENT_CONFIDENCE, IS_ENGLISH,
                PERMALINK_URL
            FROM SOCIAL_MEDIA_DB.ML_FEATURES.MUSORA_COMMENT_SENTIMENT_FEATURES
            WHERE CONTENT_SK IN ({content_sks_str})
              AND LOWER(CHANNEL_NAME) = '{safe_brand}'
              {date_clause}
            """
        else:
            # Platform filter keeps the sample consistent with the content query.
            date_clause = self._build_date_clause(date_range, table_alias='s')
            source_sql = f"""
            SELECT
                s.COMMENT_SK, s.COMMENT_ID, s.CONTENT_SK, s.CONTENT_DESCRIPTION,
                CASE WHEN s.IS_ENGLISH = FALSE AND s.TRANSLATED_TEXT IS NOT NULL
                     THEN s.TRANSLATED_TEXT ELSE s.ORIGINAL_TEXT END AS DISPLAY_TEXT,
                s.ORIGINAL_TEXT,
                LOWER(s.PLATFORM) AS PLATFORM,
                LOWER(s.CHANNEL_NAME) AS BRAND,
                s.COMMENT_TIMESTAMP, s.AUTHOR_NAME,
                s.DETECTED_LANGUAGE, s.SENTIMENT_POLARITY, s.INTENT,
                s.REQUIRES_REPLY, s.SENTIMENT_CONFIDENCE, s.IS_ENGLISH,
                c.PERMALINK_URL
            FROM SOCIAL_MEDIA_DB.ML_FEATURES.COMMENT_SENTIMENT_FEATURES s
            LEFT JOIN SOCIAL_MEDIA_DB.CORE.DIM_CONTENT c ON s.CONTENT_SK = c.CONTENT_SK
            WHERE s.CONTENT_SK IN ({content_sks_str})
              AND LOWER(s.CHANNEL_NAME) = '{safe_brand}'
              AND LOWER(s.PLATFORM) = '{safe_platform}'
              {date_clause}
            """

        return f"""
        WITH combined AS ({source_sql})
        SELECT *
        FROM combined
        QUALIFY ROW_NUMBER() OVER (
            PARTITION BY CONTENT_SK,
                CASE
                    WHEN SENTIMENT_POLARITY IN ('negative', 'very_negative') THEN 'neg'
                    WHEN SENTIMENT_POLARITY IN ('positive', 'very_positive') THEN 'pos'
                    ELSE 'other'
                END
            ORDER BY
                CASE SENTIMENT_POLARITY
                    WHEN 'very_negative' THEN 1 WHEN 'negative' THEN 2
                    WHEN 'very_positive' THEN 1 WHEN 'positive' THEN 2
                    ELSE 3
                END,
                RANDOM()
        ) <= 50
        """

    def _process_sa_comments(self, df):
        """Process sampled comments dataframe for the SA page."""
        df = self._normalize_common_columns(df)
        if 'display_text' in df.columns:
            df = self._add_display_text_short(df)
        return df

    # ------------------------------------------------------------------
    # Reply Required page
    # ------------------------------------------------------------------

    def load_reply_required_data(self, platforms=None, brands=None, date_range=None):
        """
        Load comments requiring reply, filtered by platform/brand/date.

        Args:
            platforms: List of platform strings (or None for all)
            brands: List of brand strings (or None for all)
            date_range: Tuple (start_date, end_date) or None

        Returns:
            pd.DataFrame, newest comments first; empty on failure.
        """
        platforms_key = tuple(sorted(platforms)) if platforms else ()
        brands_key = tuple(sorted(brands)) if brands else ()
        date_key = self._date_range_key(date_range)

        try:
            return self._fetch_rr_data(platforms_key, brands_key, date_key)
        except Exception as e:
            # Handled outside the cached function so failures are not cached.
            st.error(f"Error loading reply required data: {e}")
            return pd.DataFrame()

    @st.cache_data(ttl=_CACHE_TTL_SECONDS)
    def _fetch_rr_data(_self, platforms, brands, date_range):
        """Cached Reply Required data fetch; exceptions propagate to the caller."""
        query = _self._build_rr_query(platforms, brands, date_range)
        if not query:
            return pd.DataFrame()

        df = _self._run_query(query, "reply required comments")
        if df is None or df.empty:
            return pd.DataFrame()

        df = _self._normalize_common_columns(df)

        if 'comment_timestamp' in df.columns:
            df = df.sort_values('comment_timestamp', ascending=False)

        if 'display_text' in df.columns:
            df = _self._add_display_text_short(df)

        return df

    def _build_rr_query(self, platforms, brands, date_range):
        """
        Build dynamic SQL for the Reply Required page.

        Selects REQUIRES_REPLY = TRUE rows from the social table and/or the
        Musora app table depending on ``platforms`` ('musora_app' selects the
        app table; any other value selects the social table filtered to those
        platforms; empty means both). Returns None if neither table is selected.
        """
        social_date_clause = self._build_date_clause(date_range, table_alias='s')
        musora_date_clause = self._build_date_clause(date_range)

        social_brand_clause = ""
        musora_brand_clause = ""
        if brands:
            brands_str = "', '".join(self._sanitize_value(b.lower()) for b in brands)
            social_brand_clause = f"AND LOWER(s.CHANNEL_NAME) IN ('{brands_str}')"
            musora_brand_clause = f"AND LOWER(CHANNEL_NAME) IN ('{brands_str}')"

        include_social = True
        include_musora = True
        social_platform_clause = ""
        if platforms:
            non_musora = [p for p in platforms if p != 'musora_app']
            include_musora = 'musora_app' in platforms
            include_social = bool(non_musora)
            if non_musora:
                plat_str = "', '".join(self._sanitize_value(p.lower()) for p in non_musora)
                social_platform_clause = f"AND LOWER(s.PLATFORM) IN ('{plat_str}')"

        if not include_social and not include_musora:
            return None

        parts = []

        if include_social:
            parts.append(f"""
            SELECT
                s.COMMENT_SK, s.COMMENT_ID, s.CONTENT_SK, s.CONTENT_DESCRIPTION,
                CASE WHEN s.IS_ENGLISH = FALSE AND s.TRANSLATED_TEXT IS NOT NULL
                     THEN s.TRANSLATED_TEXT ELSE s.ORIGINAL_TEXT END AS DISPLAY_TEXT,
                s.ORIGINAL_TEXT,
                LOWER(s.PLATFORM) AS PLATFORM,
                LOWER(s.CHANNEL_NAME) AS BRAND,
                s.COMMENT_TIMESTAMP, s.AUTHOR_NAME,
                s.DETECTED_LANGUAGE, s.SENTIMENT_POLARITY, s.INTENT,
                s.REQUIRES_REPLY, s.SENTIMENT_CONFIDENCE, s.IS_ENGLISH,
                c.PERMALINK_URL
            FROM SOCIAL_MEDIA_DB.ML_FEATURES.COMMENT_SENTIMENT_FEATURES s
            LEFT JOIN SOCIAL_MEDIA_DB.CORE.DIM_CONTENT c ON s.CONTENT_SK = c.CONTENT_SK
            WHERE s.REQUIRES_REPLY = TRUE
              {social_platform_clause}
              {social_brand_clause}
              {social_date_clause}
            """)

        if include_musora:
            parts.append(f"""
            SELECT
                COMMENT_SK, COMMENT_ID, CONTENT_SK, CONTENT_DESCRIPTION,
                CASE WHEN IS_ENGLISH = FALSE AND TRANSLATED_TEXT IS NOT NULL
                     THEN TRANSLATED_TEXT ELSE ORIGINAL_TEXT END AS DISPLAY_TEXT,
                ORIGINAL_TEXT,
                'musora_app' AS PLATFORM,
                LOWER(CHANNEL_NAME) AS BRAND,
                COMMENT_TIMESTAMP, AUTHOR_NAME,
                DETECTED_LANGUAGE, SENTIMENT_POLARITY, INTENT,
                REQUIRES_REPLY, SENTIMENT_CONFIDENCE, IS_ENGLISH,
                PERMALINK_URL
            FROM SOCIAL_MEDIA_DB.ML_FEATURES.MUSORA_COMMENT_SENTIMENT_FEATURES
            WHERE REQUIRES_REPLY = TRUE
              {musora_brand_clause}
              {musora_date_clause}
            """)

        combined = " UNION ALL ".join(parts)
        return f"""
        WITH combined AS ({combined})
        SELECT * FROM combined
        ORDER BY COMMENT_TIMESTAMP DESC
        """

    # ------------------------------------------------------------------
    # Demographics
    # ------------------------------------------------------------------

    @st.cache_data(ttl=_CACHE_TTL_SECONDS)
    def load_demographics_data(_self):
        """
        Load user demographic data from Snowflake.

        Returns an empty DataFrame when no demographics query is configured,
        when the query returns nothing, or on error (reported via st.warning).
        """
        if not _self.demographics_query:
            return pd.DataFrame()

        try:
            # Fetch BIRTHDAY as text so the driver never has to convert it;
            # it is parsed in _process_demographics_dataframe.
            query_with_cast = _self.demographics_query.replace(
                "u.birthday as BIRTHDAY",
                "TO_VARCHAR(u.birthday, 'YYYY-MM-DD HH24:MI:SS.FF6 TZHTZM') as BIRTHDAY"
            )
            df = _self._run_query(query_with_cast, "user demographics")

            if df is None or df.empty:
                return pd.DataFrame()

            return _self._process_demographics_dataframe(df)

        except Exception as e:
            st.warning(f"Could not load demographic data: {str(e)}")
            return pd.DataFrame()

    def _process_demographics_dataframe(self, df):
        """
        Process and enrich demographic dataframe.

        Drops rows without user_id and derives age/age_group from birthday,
        timezone_region from timezone and experience_group from
        experience_level, each only when the source column is present.
        """
        df.columns = df.columns.str.lower()

        # Rows without a user_id can never be merged; drop them up front
        # (copy so later column assignments do not hit a slice).
        if 'user_id' in df.columns:
            df = df[df['user_id'].notna()].copy()

        if 'birthday' in df.columns:
            # Parse as UTC, then drop the tz so ages compare with naive datetime.now().
            birthdays = pd.to_datetime(df['birthday'].astype(str), errors='coerce', utc=True)
            df['birthday'] = birthdays.dt.tz_localize(None)
            df['age'] = df['birthday'].apply(self._calculate_age)
            df['age_group'] = df['age'].apply(self._categorize_age)

        if 'timezone' in df.columns:
            df['timezone_region'] = df['timezone'].apply(self._extract_timezone_region)

        if 'experience_level' in df.columns:
            df['experience_group'] = df['experience_level'].apply(self._categorize_experience)

        return df

    @staticmethod
    def _calculate_age(birthday):
        """Return whole years since ``birthday``, or None if missing/implausible (outside 0-120)."""
        if pd.isna(birthday):
            return None
        try:
            age = relativedelta(datetime.now(), birthday).years
        except Exception:
            return None
        return age if 0 <= age <= 120 else None

    def _match_group(self, value, groups_key):
        """
        Return the first configured group whose inclusive [min, max] range
        contains ``value``, or 'Unknown'.

        Groups come from ``config['demographics'][groups_key]`` as
        ``{name: [min, max]}``.
        """
        groups = self.config.get('demographics', {}).get(groups_key, {})
        for group_name, (low, high) in groups.items():
            if low <= value <= high:
                return group_name
        return 'Unknown'

    def _categorize_age(self, age):
        """Map an age to its configured age group ('Unknown' if missing/unmatched)."""
        if age is None or pd.isna(age):
            return 'Unknown'
        return self._match_group(age, 'age_groups')

    @staticmethod
    def _extract_timezone_region(timezone):
        """Return the part of a tz name before the first '/' (e.g. 'Europe'), or 'Unknown'."""
        if pd.isna(timezone) or not isinstance(timezone, str):
            return 'Unknown'
        region = timezone.split('/', 1)[0]
        return region or 'Unknown'

    def _categorize_experience(self, experience_level):
        """Map a numeric experience level to its configured group ('Unknown' otherwise)."""
        if pd.isna(experience_level):
            return 'Unknown'
        try:
            exp_level = float(experience_level)
        except (TypeError, ValueError, OverflowError):
            return 'Unknown'
        return self._match_group(exp_level, 'experience_groups')

    def merge_demographics_with_comments(self, comments_df, demographics_df):
        """
        Attach user demographics to comments by matching author_id to user_id.

        Intended for musora_app comments (demographics describe app users);
        rows from other platforms only receive values if their author_id
        string happens to equal a demographic user_id.

        Args:
            comments_df: Comment rows; needs an ``author_id`` column to merge.
            demographics_df: Output of load_demographics_data().

        Returns:
            A new DataFrame with the demographic columns added. When
            demographics_df is empty the columns are filled with None/'Unknown'
            placeholders; when a key column is missing, comments_df is returned
            unchanged. Neither input is modified.
        """
        if demographics_df.empty:
            placeholders = {
                'age': None, 'age_group': 'Unknown',
                'timezone': None, 'timezone_region': 'Unknown',
                'experience_level': None, 'experience_group': 'Unknown',
            }
            comments_df = comments_df.copy()
            for col, val in placeholders.items():
                comments_df[col] = val
            return comments_df

        if 'author_id' not in comments_df.columns or 'user_id' not in demographics_df.columns:
            return comments_df

        wanted = ('age', 'age_group', 'timezone', 'timezone_region',
                  'experience_level', 'experience_group')
        demo_cols = [c for c in wanted if c in demographics_df.columns]
        demographics = demographics_df[demo_cols].copy()
        # Keys are compared as strings; NOTE(review): a float-typed id column
        # (e.g. ints with NULLs) stringifies as '123.0' and would not match.
        demographics['user_id_str'] = demographics_df['user_id'].astype(str)
        # Keep one row per user: duplicate ids would multiply comment rows.
        demographics = demographics.drop_duplicates(subset='user_id_str')

        merged_df = comments_df.assign(
            author_id_str=comments_df['author_id'].astype(str)
        ).merge(
            demographics,
            left_on='author_id_str',
            right_on='user_id_str',
            how='left',
        )
        merged_df = merged_df.drop(columns=['author_id_str', 'user_id_str'], errors='ignore')

        for col in ('age_group', 'timezone_region', 'experience_group'):
            if col in merged_df.columns:
                merged_df[col] = merged_df[col].fillna('Unknown')

        return merged_df

    # ------------------------------------------------------------------
    # Filtering helpers
    # ------------------------------------------------------------------

    @staticmethod
    def get_filter_options(df):
        """Get sorted unique values for sidebar filters (missing values skipped)."""
        def sorted_unique(column):
            return sorted(df[column].dropna().unique().tolist())

        return {
            'platforms': sorted_unique('platform'),
            'brands': sorted_unique('brand'),
            'sentiments': sorted_unique('sentiment_polarity'),
            'languages': sorted_unique('detected_language')
            if 'detected_language' in df.columns else [],
        }

    @staticmethod
    def apply_filters(df, platforms=None, brands=None, sentiments=None,
                      date_range=None, languages=None):
        """
        Apply sidebar filters to a dataframe (no copy — boolean indexing only).

        Empty/None filters are skipped. For date_range, a bare ``date`` end
        bound includes that whole day; datetime/Timestamp bounds are inclusive
        as given.
        """
        from datetime import date

        filtered_df = df

        if platforms:
            filtered_df = filtered_df[filtered_df['platform'].isin(platforms)]

        if brands:
            filtered_df = filtered_df[filtered_df['brand'].isin(brands)]

        if sentiments:
            filtered_df = filtered_df[filtered_df['sentiment_polarity'].isin(sentiments)]

        if languages and 'detected_language' in filtered_df.columns:
            filtered_df = filtered_df[filtered_df['detected_language'].isin(languages)]

        if date_range and len(date_range) == 2 and 'comment_timestamp' in filtered_df.columns:
            start_date, end_date = date_range
            timestamps = filtered_df['comment_timestamp']
            in_range = timestamps >= pd.Timestamp(start_date)
            if isinstance(end_date, date) and not isinstance(end_date, datetime):
                # A bare date means "through the end of that day" (midnight would drop it).
                in_range &= timestamps < pd.Timestamp(end_date) + pd.Timedelta(days=1)
            else:
                in_range &= timestamps <= pd.Timestamp(end_date)
            filtered_df = filtered_df[in_range]

        return filtered_df

    @staticmethod
    def get_date_range(df, default_days=30):
        """
        Get a default (start, end) date range.

        Ends at the newest comment_timestamp in ``df`` and starts
        ``default_days`` earlier; falls back to the last ``default_days`` up to
        now when the column is missing or ``df`` is empty.
        """
        if 'comment_timestamp' in df.columns and not df.empty:
            max_date = df['comment_timestamp'].max()
            return (max_date - timedelta(days=default_days), max_date)
        now = datetime.now()
        return (now - timedelta(days=default_days), now)

    # ------------------------------------------------------------------
    # SQL helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _sanitize_value(value):
        """
        Strip characters that could break out of a SQL string literal.

        Single/double quotes, semicolons and backslashes are removed (not
        escaped), so values containing them will not match literally.
        """
        return re.sub(r"['\";\\]", '', str(value))

    @staticmethod
    def _build_date_clause(date_range, table_alias=None):
        """
        Build a SQL ``AND COMMENT_TIMESTAMP ...`` clause, or an empty string.

        Args:
            date_range: tuple of (start, end) or None
            table_alias: optional table alias prefix (e.g. 's') to avoid
                ambiguous column errors when a JOIN is present

        A date-only end bound ('YYYY-MM-DD') covers that whole day, i.e. it is
        rendered as ``< next day``; any other end bound is inclusive (``<=``).
        """
        if not date_range or len(date_range) != 2:
            return ""

        col = f"{table_alias}.COMMENT_TIMESTAMP" if table_alias else "COMMENT_TIMESTAMP"
        start = SentimentDataLoader._sanitize_value(date_range[0])
        end = SentimentDataLoader._sanitize_value(date_range[1])

        if re.fullmatch(r"\d{4}-\d{2}-\d{2}", end):
            try:
                next_day = (datetime.strptime(end, "%Y-%m-%d") + timedelta(days=1)).strftime("%Y-%m-%d")
            except ValueError:
                next_day = None
            if next_day is not None:
                return f"AND {col} >= '{start}' AND {col} < '{next_day}'"

        return f"AND {col} >= '{start}' AND {col} <= '{end}'"