Spaces:
Paused
Paused
| #!/usr/bin/env python3 | |
| """ | |
| Twitter Content Analyzer | |
| A comprehensive Twitter data collection and analysis tool with automated scheduling capabilities. | |
| """ | |
| import os | |
| import logging | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional, Tuple, Any | |
| from collections import Counter | |
| import streamlit as st | |
| import pandas as pd | |
| import plotly.express as px | |
| import pytz | |
| from pymongo import MongoClient | |
| import google.generativeai as genai | |
| from apify_client import ApifyClient | |
| from dotenv import load_dotenv | |
| # ============================================================================= | |
| # CONSTANTS | |
| # ============================================================================= | |
# Fallback account handle and look-back window
DEFAULT_USERNAME = "narendramodi"
DEFAULT_DAYS_BACK = 7

# Time zone identifiers (as understood by pytz)
IST_TIMEZONE = 'Asia/Kolkata'
UTC_TIMEZONE = 'UTC'

# strptime pattern for Twitter timestamps, e.g. "Mon Jul 08 09:31:59 +0000 2013"
TWITTER_DATE_FORMAT = "%a %b %d %H:%M:%S %z %Y"

# Names of the MongoDB collections
TWEETS_COLLECTION = "tweets"
SCHEDULER_USERS_COLLECTION = "scheduler_users"

# Streamlit page settings
PAGE_CONFIG = dict(
    page_title="Twitter Scraper & Analyzer",
    page_icon="π¦",
    layout="wide",
    initial_sidebar_state="expanded",
)

# =============================================================================
# LOGGING CONFIGURATION
# =============================================================================
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)
| # ============================================================================= | |
| # UTILITY FUNCTIONS | |
| # ============================================================================= | |
def convert_to_ist(utc_dt: datetime) -> datetime:
    """Return *utc_dt* expressed in the IST_TIMEZONE zone.

    A naive datetime is treated as UTC; an aware datetime is converted
    from whatever zone it already carries.
    """
    ist_zone = pytz.timezone(IST_TIMEZONE)
    aware_dt = utc_dt if utc_dt.tzinfo is not None else pytz.utc.localize(utc_dt)
    return aware_dt.astimezone(ist_zone)
def safe_get_nested(data: Dict, keys: List[str], default=None):
    """Walk *keys* down through nested dicts and return the value found.

    Returns *default* as soon as a level is not a dict or lacks the key.
    An empty *keys* list returns *data* itself.
    """
    current = data
    for step in keys:
        if not isinstance(current, dict) or step not in current:
            return default
        current = current[step]
    return current
def format_large_number(num: int) -> str:
    """Render a positive number with thousands separators; otherwise "N/A"."""
    if num > 0:
        return format(num, ",")
    return "N/A"
| # ============================================================================= | |
| # CONFIGURATION MANAGEMENT | |
| # ============================================================================= | |
class AppConfig:
    """Centralized configuration management.

    Loads a dotenv file on construction and exposes the individual settings
    as read-only properties backed by environment variables.
    """

    def __init__(self, env_path: str = ".env.local"):
        """Load variables from *env_path* and validate the required ones.

        Raises:
            ValueError: if APIFY_API_KEY is not set after loading.
        """
        load_dotenv(dotenv_path=env_path)
        self._validate_config()

    # The accessors below must be properties: the class itself reads them as
    # plain attributes (``self.apify_api_key``). As ordinary methods the
    # attribute would be a bound method, which is always truthy, so the
    # validation could never fail.
    @property
    def mongodb_uri(self) -> Optional[str]:
        """Value of MONGODB_URI, or None when unset."""
        return os.getenv("MONGODB_URI")

    @property
    def apify_api_key(self) -> Optional[str]:
        """Value of APIFY_API_KEY, or None when unset."""
        return os.getenv("APIFY_API_KEY")

    @property
    def gemini_api_key(self) -> Optional[str]:
        """Value of GEMINI_API_KEY, or None when unset."""
        return os.getenv("GEMINI_API_KEY")

    def _validate_config(self) -> None:
        """Validate essential configuration.

        Raises:
            ValueError: if APIFY_API_KEY is missing or empty.
        """
        if not self.apify_api_key:
            raise ValueError("APIFY_API_KEY is required but not found in environment variables")
| # ============================================================================= | |
| # DATABASE MANAGEMENT | |
| # ============================================================================= | |
class DatabaseManager:
    """Handles all MongoDB operations.

    When no URI is given or the server cannot be reached, the manager falls
    back to "offline mode": both collection attributes are replaced by no-op
    stand-ins and ``is_connected`` stays False.
    """

    class _OfflineCollection:
        """No-op stand-in exposing the collection methods this app calls."""

        def update_one(self, *args, **kwargs):
            pass

        def find(self, *args, **kwargs):
            return []

        def find_one(self, *args, **kwargs):
            return None

        def insert_one(self, *args, **kwargs):
            pass

    def __init__(self, uri: Optional[str]):
        """Try to connect to *uri*; fall back to offline mode on failure."""
        self.client = None
        self.db = None
        self.is_connected = False
        self._connect(uri)

    def _connect(self, uri: Optional[str]) -> None:
        """Establish MongoDB connection.

        Never raises: any connection error is logged and the manager is put
        into offline mode instead.
        """
        if not uri:
            logger.warning("No MongoDB URI provided. Running in offline mode.")
            self._setup_dummy_collections()
            return
        try:
            self.client = MongoClient(uri, serverSelectionTimeoutMS=5000)
            # Force a round trip so an unreachable server fails here, not later.
            self.client.admin.command('ping')
            self.db = self.client["DataCollector"]
            self.tweets_collection = self.db[TWEETS_COLLECTION]
            self.scheduler_users_collection = self.db[SCHEDULER_USERS_COLLECTION]
            self.is_connected = True
            logger.info("β MongoDB connected successfully")
        except Exception as e:
            # Deliberately broad: the app must stay usable without a database.
            logger.error(f"β οΈ MongoDB connection failed: {e}")
            logger.info("π Running in offline mode - data will not be stored")
            self._discard_client()
            self._setup_dummy_collections()

    def _discard_client(self) -> None:
        """Close and forget a client whose connection attempt failed.

        Without this the half-initialised client stays referenced (and open)
        even though the manager reports itself as disconnected.
        """
        if self.client is not None:
            self.client.close()
            self.client = None
        self.db = None

    def _setup_dummy_collections(self) -> None:
        """Setup dummy collections for offline mode."""
        self.tweets_collection = self._OfflineCollection()
        self.scheduler_users_collection = self._OfflineCollection()
        self.is_connected = False
| # ============================================================================= | |
| # API SERVICES | |
| # ============================================================================= | |
class ApifyService:
    """Handles Apify API interactions for Twitter data collection."""

    ACTOR_ID = "CJdippxWmn9uRfooo"

    def __init__(self, api_key: str):
        """Create the underlying Apify client with *api_key*."""
        self.client = ApifyClient(api_key)

    @staticmethod
    def _format_date_bound(value: str, default_time: str) -> str:
        """Turn a date bound into the ``..._UTC`` form sent to the actor.

        Accepts either a plain date (YYYY-MM-DD), which gets *default_time*
        appended, or a full timestamp (YYYY-MM-DD_HH:MM:SS), which is used
        as-is. Both get the ``_UTC`` suffix.
        """
        if "_" in value:
            return f"{value}_UTC"
        return f"{value}_{default_time}_UTC"

    def _run_actor(self, run_input: Dict[str, Any]) -> Tuple[List[Dict], str]:
        """Execute Apify actor and retrieve dataset.

        Returns:
            (items, dataset_id) for the run's default dataset.

        Raises:
            Exception: any failure is logged and re-raised to the caller.
        """
        try:
            run = self.client.actor(self.ACTOR_ID).call(run_input=run_input)
            if not run:
                # Surface a readable error instead of "NoneType is not subscriptable".
                raise RuntimeError("Apify actor call returned no run information")
            dataset_id = run["defaultDatasetId"]
            data = list(self.client.dataset(dataset_id).iterate_items())
            return data, dataset_id
        except Exception as e:
            logger.error(f"Apify actor execution failed: {e}")
            raise

    def fetch_account_tweets(self, username: str, since: str, until: str) -> Tuple[List[Dict], str]:
        """Fetch tweets posted by a specific account.

        *since* / *until* may be plain dates or full timestamps; see
        ``_format_date_bound``. Returns ``(items, dataset_id)``.
        """
        run_input = {
            "from": username.strip(),
            "since": self._format_date_bound(since, "00:00:00"),
            "until": self._format_date_bound(until, "23:59:59"),
            "queryType": "Latest",
            "include:nativeretweets": True,
        }
        with st.spinner(f"Fetching tweets for @{username} from {since} to {until}..."):
            data, dataset_id = self._run_actor(run_input)
        st.info(f"π Query Details: from:{username} | Raw results: {len(data)} tweets")
        return data, dataset_id

    def fetch_account_comments(self, username: str, since: str, until: str) -> Tuple[List[Dict], str]:
        """Fetch comments/replies directed to a specific account.

        *since* / *until* may be plain dates or full timestamps; see
        ``_format_date_bound``. Returns ``(items, dataset_id)``.
        """
        run_input = {
            "to": username.strip(),
            "since": self._format_date_bound(since, "00:00:00"),
            "until": self._format_date_bound(until, "23:59:59"),
            "queryType": "Latest",
        }
        with st.spinner(f"Fetching comments for @{username} from {since} to {until}..."):
            data, dataset_id = self._run_actor(run_input)
        st.info(f"π Query Details: to:@{username} | Raw results: {len(data)} comments")
        return data, dataset_id
class GeminiService:
    """Handles Google Generative AI interactions."""

    def __init__(self, api_key: str, model_name: str = 'gemini-1.5-flash'):
        """Configure the Gemini SDK and select the model.

        Args:
            api_key: key passed to ``genai.configure``.
            model_name: model identifier; defaults to the previously
                hard-coded 'gemini-1.5-flash'.
        """
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel(model_name)

    def generate_analysis(self, tweets_df: pd.DataFrame, context: str) -> str:
        """Generate AI-powered analysis of tweets.

        Returns the model's text, a fixed notice when *tweets_df* is empty,
        or an "Error generating summary: ..." string when the call fails
        (the failure is logged, never raised).
        """
        if tweets_df.empty:
            return "No tweets provided for analysis."
        with st.spinner("Generating AI summary with Gemini..."):
            try:
                tweets_text = self._format_tweets_for_analysis(tweets_df)
                prompt = self._create_analysis_prompt(context, tweets_text)
                response = self.model.generate_content(prompt)
                return response.text
            except Exception as e:
                # Best-effort feature: report the problem in the UI text.
                logger.error(f"Gemini analysis failed: {e}")
                return f"Error generating summary: {str(e)}"

    def _format_tweets_for_analysis(self, tweets_df: pd.DataFrame) -> str:
        """Format tweets for AI analysis.

        Produces one numbered entry per row, separated by blank lines. Reads
        the Username, Text, Likes and Retweets columns.
        """
        return "\n\n".join(
            f"{i}. @{row.Username}: {row.Text} (Likes: {row.Likes}, Retweets: {row.Retweets})"
            for i, row in enumerate(tweets_df.itertuples(), 1)
        )

    def _create_analysis_prompt(self, context: str, tweets_text: str) -> str:
        """Create analysis prompt for Gemini from *context* and the tweet list."""
        return f"""
{context}
Here are the tweets to analyze:
{tweets_text}
Please provide a comprehensive analysis covering:
1. **Main Themes & Topics:** What are the key subjects of discussion?
2. **Overall Sentiment:** What is the general tone (positive, negative, neutral)?
3. **Key Insights & Patterns:** Are there any notable trends or surprising findings?
4. **Top Recommendations:** Provide 5 actionable suggestions for the brand/party to improve their strategy based on this feedback.
Format the response clearly using Markdown.
"""
| # ============================================================================= | |
| # DATA PROCESSING | |
| # ============================================================================= | |
class TweetDataProcessor:
    """Processes raw tweet data into structured format.

    Converts raw scraper items (dicts) into a pandas DataFrame and derives
    summary metrics: top hashtags/mentions, standardized account details and
    engagement figures computed from the tweets themselves.
    """

    # Flat author-dict keys that may carry each profile counter, in lookup order.
    _FOLLOWER_KEYS = ("followers", "followersCount", "followers_count")
    _FOLLOWING_KEYS = ("following", "followingCount", "following_count", "friends_count")
    _TWEET_COUNT_KEYS = ("statusesCount", "statuses_count", "tweet_count")
    # Sub-dicts that may carry the same counters one level down.
    _NESTED_METRIC_CONTAINERS = ("publicMetrics", "public_metrics")

    def process_tweets(self, raw_data: List[Dict[str, Any]], target_username: Optional[str] = None) -> Tuple[pd.DataFrame, Dict[str, Any]]:
        """Transform raw API data into clean DataFrame and metrics.

        Args:
            raw_data: raw tweet items.
            target_username: when given, account details and engagement
                metrics are restricted to this user (case-insensitive).

        Returns:
            (df, metrics) where metrics has the keys "top_hashtags",
            "top_mentions" and "account_details".
        """
        processed_data = []
        hashtags_counter = Counter()
        mentions_counter = Counter()
        all_author_data = []
        skipped_count = 0
        error_count = 0
        for item in raw_data:
            try:
                processed_tweet = self._process_single_tweet(
                    item, hashtags_counter, mentions_counter, all_author_data, target_username
                )
                if processed_tweet:
                    processed_data.append(processed_tweet)
                else:
                    skipped_count += 1
            except Exception as e:
                # One malformed item must not abort the whole batch.
                error_count += 1
                # Only log individual errors in debug mode
                if st.session_state.get('debug_mode', False):
                    logger.warning(f"Skipping tweet due to processing error: {e}")
                    st.warning(f"Skipping a tweet due to processing error: {e}")
        # Show summary of skipped items only if significant
        if skipped_count > 0 and st.session_state.get('debug_mode', False):
            st.info(f"βΉοΈ Skipped {skipped_count} items (likely mock/invalid data)")
        if error_count > 0:
            st.warning(f"β οΈ {error_count} items had processing errors")
        # Extract best account details
        account_details = self._extract_best_account_details(all_author_data, target_username)
        # Create DataFrame and calculate engagement metrics from tweet data
        df = pd.DataFrame(processed_data)
        engagement_metrics = self._calculate_engagement_metrics(df, target_username)
        # Add engagement metrics to account_details
        if account_details:
            account_details.update(engagement_metrics)
        metrics = {
            "top_hashtags": hashtags_counter.most_common(5),
            "top_mentions": mentions_counter.most_common(5),
            "account_details": account_details,
        }
        return df, metrics

    @staticmethod
    def _count_csv_items(value: Any) -> int:
        """Count the non-blank entries of a comma-separated string."""
        return sum(1 for part in str(value).split(',') if part.strip())

    def _calculate_engagement_metrics(self, df: pd.DataFrame, target_username: Optional[str] = None) -> Dict:
        """Calculate comprehensive engagement metrics from tweet data.

        Rows are limited to *target_username* (case-insensitive) when given.
        Returns the same key set as ``_get_empty_metrics``; that all-zero
        structure is returned when there are no matching rows.
        """
        if df.empty:
            return self._get_empty_metrics()
        # Filter to only tweets from the target user if specified
        if target_username:
            user_tweets = df[df['Username'].str.lower() == target_username.lower()]
        else:
            user_tweets = df
        if user_tweets.empty:
            return self._get_empty_metrics()
        columns = user_tweets.columns
        # Basic engagement totals
        likes_count = user_tweets['Likes'].sum() if 'Likes' in columns else 0
        views_count = user_tweets['Views'].sum() if 'Views' in columns else 0
        reply_count = user_tweets['Replies'].sum() if 'Replies' in columns else 0
        repost_count = user_tweets['Retweets'].sum() if 'Retweets' in columns else 0
        tweet_count = len(user_tweets)
        # Content quality metrics
        avg_likes_per_tweet = likes_count / tweet_count if tweet_count > 0 else 0
        avg_views_per_tweet = views_count / tweet_count if tweet_count > 0 else 0
        avg_engagement_rate = ((likes_count + repost_count) / views_count * 100) if views_count > 0 else 0
        # Content length analysis
        if 'Text' in columns:
            text_lengths = user_tweets['Text'].astype(str).str.len()
            avg_tweet_length = text_lengths.mean()
            longest_tweet_length = text_lengths.max()
            shortest_tweet_length = text_lengths.min()
        else:
            avg_tweet_length = longest_tweet_length = shortest_tweet_length = 0
        # Media usage metrics
        if 'Has_Media' in columns:
            tweets_with_media = user_tweets['Has_Media'].sum()
            media_usage_percentage = (tweets_with_media / tweet_count * 100) if tweet_count > 0 else 0
            # Media effectiveness
            media_tweets = user_tweets[user_tweets['Has_Media'] == True]
            no_media_tweets = user_tweets[user_tweets['Has_Media'] == False]
            avg_likes_with_media = media_tweets['Likes'].mean() if len(media_tweets) > 0 else 0
            avg_likes_without_media = no_media_tweets['Likes'].mean() if len(no_media_tweets) > 0 else 0
        else:
            tweets_with_media = media_usage_percentage = 0
            avg_likes_with_media = avg_likes_without_media = 0
        # Hashtag and mention analysis
        if 'Hashtags' in columns:
            # Count hashtags from the Hashtags field (comma-separated string)
            hashtag_counts = user_tweets['Hashtags'].astype(str).apply(self._count_csv_items)
            total_hashtags_used = hashtag_counts.sum()
            avg_hashtags_per_tweet = hashtag_counts.mean()
            tweets_with_hashtags_percentage = ((hashtag_counts > 0).sum() / tweet_count * 100) if tweet_count > 0 else 0
        elif 'Hashtag_Count' in columns:
            # Fallback to Hashtag_Count if available
            total_hashtags_used = user_tweets['Hashtag_Count'].sum()
            avg_hashtags_per_tweet = user_tweets['Hashtag_Count'].mean()
            tweets_with_hashtags_percentage = ((user_tweets['Hashtag_Count'] > 0).sum() / tweet_count * 100) if tweet_count > 0 else 0
        else:
            total_hashtags_used = avg_hashtags_per_tweet = tweets_with_hashtags_percentage = 0
        if 'Mentions' in columns:
            # Count mentions from the Mentions field (comma-separated string)
            mention_counts = user_tweets['Mentions'].astype(str).apply(self._count_csv_items)
            total_mentions_used = mention_counts.sum()
            avg_mentions_per_tweet = mention_counts.mean()
        elif 'Mention_Count' in columns:
            # Fallback to Mention_Count if available
            total_mentions_used = user_tweets['Mention_Count'].sum()
            avg_mentions_per_tweet = user_tweets['Mention_Count'].mean()
        else:
            total_mentions_used = avg_mentions_per_tweet = 0
        # Timing and activity patterns
        if 'Hour' in columns:
            hour_modes = user_tweets['Hour'].mode()
            most_active_hour = hour_modes.values[0] if len(hour_modes) > 0 else 0
            hourly_distribution = user_tweets['Hour'].value_counts().head(3).to_dict()
        else:
            most_active_hour = 0
            hourly_distribution = {}
        if 'Day_of_Week' in columns:
            day_modes = user_tweets['Day_of_Week'].mode()
            most_active_day = day_modes.values[0] if len(day_modes) > 0 else "Unknown"
        else:
            most_active_day = "Unknown"
        # Performance metrics
        if 'Likes' in columns and not user_tweets.empty:
            highest_likes = user_tweets['Likes'].max()
            top_tweet_idx = user_tweets['Likes'].idxmax()
            # str() guards against a non-string Text cell (e.g. NaN).
            top_tweet_text = str(user_tweets.loc[top_tweet_idx, 'Text'])[:100] + "..." if 'Text' in columns else ""
            top_tweet_url = user_tweets.loc[top_tweet_idx, 'URL'] if 'URL' in columns else ""
            # Viral content (top 10% threshold)
            viral_threshold = user_tweets['Likes'].quantile(0.9)
            viral_tweets_count = (user_tweets['Likes'] > viral_threshold).sum()
            viral_content_percentage = (viral_tweets_count / tweet_count * 100) if tweet_count > 0 else 0
        else:
            highest_likes = viral_tweets_count = viral_content_percentage = 0
            top_tweet_text = top_tweet_url = ""
        # Audience engagement ratios
        like_to_view_ratio = (likes_count / views_count * 100) if views_count > 0 else 0
        retweet_to_like_ratio = (repost_count / likes_count * 100) if likes_count > 0 else 0
        reply_to_like_ratio = (reply_count / likes_count * 100) if likes_count > 0 else 0
        # Engagement score (weighted: likes=1, retweets=2, replies=3)
        total_engagement = likes_count + repost_count + reply_count
        engagement_score = (likes_count * 1 + repost_count * 2 + reply_count * 3) / tweet_count if tweet_count > 0 else 0
        return {
            # Basic metrics
            "likes_count": int(likes_count),
            "views_count": int(views_count),
            "reply_count": int(reply_count),
            "repost_count": int(repost_count),
            # Content quality metrics
            "avg_likes_per_tweet": round(avg_likes_per_tweet, 1),
            "avg_views_per_tweet": round(avg_views_per_tweet, 1),
            "avg_engagement_rate": round(avg_engagement_rate, 2),
            "avg_tweet_length": round(avg_tweet_length, 1),
            "longest_tweet_length": int(longest_tweet_length),
            "shortest_tweet_length": int(shortest_tweet_length),
            # Media usage metrics
            "tweets_with_media_count": int(tweets_with_media),
            "media_usage_percentage": round(media_usage_percentage, 1),
            "avg_likes_with_media": round(avg_likes_with_media, 1),
            "avg_likes_without_media": round(avg_likes_without_media, 1),
            # Hashtag and mention metrics
            "total_hashtags_used": int(total_hashtags_used),
            "avg_hashtags_per_tweet": round(avg_hashtags_per_tweet, 1),
            "tweets_with_hashtags_percentage": round(tweets_with_hashtags_percentage, 1),
            "total_mentions_used": int(total_mentions_used),
            "avg_mentions_per_tweet": round(avg_mentions_per_tweet, 1),
            # Activity patterns
            "most_active_hour": int(most_active_hour),
            "most_active_day": str(most_active_day),
            "top_activity_hours": list(hourly_distribution.keys())[:3],
            # Performance metrics
            "highest_likes": int(highest_likes),
            "top_tweet_text": str(top_tweet_text),
            "top_tweet_url": str(top_tweet_url),
            "viral_tweets_count": int(viral_tweets_count),
            "viral_content_percentage": round(viral_content_percentage, 1),
            # Engagement ratios
            "like_to_view_ratio": round(like_to_view_ratio, 2),
            "retweet_to_like_ratio": round(retweet_to_like_ratio, 2),
            "reply_to_like_ratio": round(reply_to_like_ratio, 2),
            "engagement_score": round(engagement_score, 1),
            "total_engagement": int(total_engagement),
        }

    def _get_empty_metrics(self) -> Dict:
        """Return empty metrics structure (same keys as the computed one)."""
        return {
            # Basic metrics
            "likes_count": 0, "views_count": 0, "reply_count": 0, "repost_count": 0,
            # Content quality metrics
            "avg_likes_per_tweet": 0, "avg_views_per_tweet": 0, "avg_engagement_rate": 0,
            "avg_tweet_length": 0, "longest_tweet_length": 0, "shortest_tweet_length": 0,
            # Media usage metrics
            "tweets_with_media_count": 0, "media_usage_percentage": 0,
            "avg_likes_with_media": 0, "avg_likes_without_media": 0,
            # Hashtag and mention metrics
            "total_hashtags_used": 0, "avg_hashtags_per_tweet": 0, "tweets_with_hashtags_percentage": 0,
            "total_mentions_used": 0, "avg_mentions_per_tweet": 0,
            # Activity patterns
            "most_active_hour": 0, "most_active_day": "Unknown", "top_activity_hours": [],
            # Performance metrics
            "highest_likes": 0, "top_tweet_text": "", "top_tweet_url": "",
            "viral_tweets_count": 0, "viral_content_percentage": 0,
            # Engagement ratios
            "like_to_view_ratio": 0, "retweet_to_like_ratio": 0, "reply_to_like_ratio": 0,
            "engagement_score": 0, "total_engagement": 0,
        }

    def _is_mock_tweet(self, item: Dict) -> bool:
        """Detect if a tweet is mock/invalid data that should be ignored.

        An item counts as mock when two or more of createdAt/text/author are
        missing, when its text is blank or a placeholder word, when it has no
        author username, or when the username contains a test-like word.
        Fields that are present but None are treated as missing.
        """
        # Check for missing essential fields that real tweets should have
        essential_fields = ['createdAt', 'text', 'author']
        missing_fields = sum(1 for field in essential_fields if not item.get(field))
        # If missing multiple essential fields, likely mock data
        if missing_fields >= 2:
            return True
        # Check for empty or placeholder text ("or" covers an explicit None)
        text = str(item.get("text") or "").strip()
        if not text or text.lower() in ("null", "undefined", "test", "placeholder"):
            return True
        # Check for missing or empty author data
        author = item.get("author") or {}
        if not isinstance(author, dict):
            return True
        username = str(author.get("userName") or "").strip().lower()
        if not username:
            return True
        # Check for obviously fake/test usernames
        test_patterns = ("test", "mock", "fake", "placeholder", "example")
        return any(pattern in username for pattern in test_patterns)

    def _process_single_tweet(self, item: Dict, hashtags_counter: Counter,
                              mentions_counter: Counter, all_author_data: List,
                              target_username: Optional[str] = None) -> Optional[Dict]:
        """Process a single tweet item.

        Appends the author dict to *all_author_data* (only the target user's
        when *target_username* is given), updates the two counters, and
        returns the flattened row, or None when the item has no usable date.
        """
        # Extract author data; an explicit None is treated like a missing key
        author = item.get("author") or {}
        if author:
            # Only collect author data from the target user if target_username is specified
            # This prevents random accounts from being saved in replies data
            if target_username:
                author_username = (author.get("userName") or "").lower()
                if author_username == target_username.lower():
                    all_author_data.append(author)
            else:
                all_author_data.append(author)
        # Check if this is a mock/invalid tweet (has minimal or no real data);
        # the flag only decides whether warnings are worth showing.
        is_mock_tweet = self._is_mock_tweet(item)
        # Validate date information
        created_at = item.get("createdAt", "")
        if not created_at:
            # Only show warning for real tweets missing dates, and only in debug mode
            if not is_mock_tweet and st.session_state.get('debug_mode', False):
                st.warning("Skipping a tweet due to missing date information")
            return None
        # Parse date
        try:
            date_obj_utc = datetime.strptime(created_at, TWITTER_DATE_FORMAT)
            date_obj_ist = convert_to_ist(date_obj_utc)
        except ValueError:
            # Only log/warn for real tweets with invalid dates
            if not is_mock_tweet:
                if st.session_state.get('debug_mode', False):
                    st.warning(f"Skipping tweet due to invalid date format: {created_at}")
                logger.warning(f"Invalid date format: {created_at}")
            return None
        # Extract text and analyze
        text = item.get("text") or ""
        words = text.split()
        # Drop tokens that are only the marker itself (a bare "#" or "@").
        hashtags = [tag for tag in (w.strip("#") for w in words if w.startswith('#')) if tag]
        mentions = [name for name in (w.strip("@") for w in words if w.startswith('@')) if name]
        # Update counters
        hashtags_counter.update(hashtags)
        mentions_counter.update(mentions)
        return {
            "Date": date_obj_ist.strftime("%Y-%m-%d %H:%M:%S"),
            "Date_Only": date_obj_ist.strftime("%Y-%m-%d"),
            "Hour": date_obj_ist.hour,
            "Day_of_Week": date_obj_ist.strftime("%A"),
            "Username": author.get("userName", ""),
            "Text": text,
            # "or 0" keeps the numeric columns numeric when the API sends null
            "Likes": item.get("likeCount") or 0,
            "Retweets": item.get("retweetCount") or 0,
            "Replies": item.get("replyCount") or 0,
            "Views": item.get("viewCount") or 0,
            "URL": item.get("url", ""),
            "Has_Media": "extendedEntities" in item,
            "Hashtags": ", ".join(hashtags),
            "Mentions": ", ".join(mentions),
        }

    def _extract_best_account_details(self, all_author_data: List[Dict], target_username: Optional[str] = None) -> Dict:
        """Extract the most complete account details from author data.

        With no author data, returns a minimal placeholder for
        *target_username* (or an empty dict when no username is given).
        """
        if not all_author_data:
            # If no author data and we have a target username, create a basic structure
            if target_username:
                return {
                    "name": target_username,
                    "username": target_username,
                    "bio": "",
                    "followers_count": 0,
                    "following_count": 0,
                    "tweet_count": 0,
                    "verified": False,
                    "profile_image_url": "",
                    "account_create_date": "",
                }
            return {}
        # Find the author data with the most complete information
        best_author = self._find_most_complete_author(all_author_data)
        # Debug information
        if st.session_state.get('debug_mode', False):
            st.write("Debug - Found", len(all_author_data), "author objects")
            st.write("Debug - Best author data keys:", list(best_author.keys()))
            st.write("Debug - Best author data sample:", {
                k: v for k, v in best_author.items()
                if k in ['name', 'userName', 'followers', 'following', 'statusesCount']
            })
        return self._standardize_account_details(best_author)

    def _find_most_complete_author(self, all_author_data: List[Dict]) -> Dict:
        """Find the author data object with the most complete information.

        Ties go to the earliest entry; an empty list yields ``{}``.
        """
        # max() returns the first maximal element, so when every score is 0
        # this is simply the first author.
        return max(all_author_data, key=self._calculate_author_completeness_score, default={})

    @classmethod
    def _lookup_metric(cls, author: Dict, flat_keys: Tuple[str, ...], nested_key: str) -> Any:
        """Return the first truthy counter found for a profile metric, else 0.

        Looks at *flat_keys* directly on *author*, then at *nested_key*
        inside each known metrics sub-dict. A sub-dict that is missing,
        None, or not a dict is skipped rather than raising.
        """
        for key in flat_keys:
            value = author.get(key)
            if value:
                return value
        for container in cls._NESTED_METRIC_CONTAINERS:
            nested = author.get(container)
            if isinstance(nested, dict) and nested.get(nested_key):
                return nested[nested_key]
        return 0

    def _calculate_author_completeness_score(self, author: Dict) -> int:
        """Calculate completeness score for author data.

        Followers count 3 points, following and tweet count 2 each, a bio
        and a verified flag 1 each. Non-numeric counters score nothing.
        """
        score = 0
        weighted_metrics = (
            (self._FOLLOWER_KEYS, "followers_count", 3),
            (self._FOLLOWING_KEYS, "following_count", 2),
            (self._TWEET_COUNT_KEYS, "tweet_count", 2),
        )
        for flat_keys, nested_key, weight in weighted_metrics:
            value = self._lookup_metric(author, flat_keys, nested_key)
            if isinstance(value, (int, float)) and value > 0:
                score += weight
        # Check for profile information (lower priority)
        if author.get("description") or author.get("profile_bio"):
            score += 1
        if author.get("verified") or author.get("isVerified"):
            score += 1
        return score

    def _convert_to_ist_format(self, twitter_date_str: str) -> str:
        """Convert Twitter date string to IST format.

        Returns "" for empty input and the input unchanged (as text) when it
        cannot be parsed.
        """
        if not twitter_date_str:
            return ""
        try:
            # Parse the Twitter date format: "Mon Jul 08 09:31:59 +0000 2013"
            utc_dt = datetime.strptime(twitter_date_str, TWITTER_DATE_FORMAT)
        except (ValueError, TypeError):
            # Unparseable or non-string input: hand it back as text
            return str(twitter_date_str)
        ist_dt = utc_dt.astimezone(pytz.timezone(IST_TIMEZONE))
        # Readable form, e.g. "08 July 2013, 03:01 PM IST"
        return ist_dt.strftime("%d %B %Y, %I:%M %p IST")

    def _standardize_account_details(self, author_data: Dict) -> Dict:
        """Standardize account details from various possible field names.

        The engagement fields are zero placeholders here; they are filled in
        later from tweet data.
        """
        # Debug: Print raw author data keys (only in debug mode)
        if st.session_state.get('debug_mode', False):
            st.write(f"Debug - Author data keys: {list(author_data.keys())}")
        # Try multiple possible field names for metrics
        followers_count = self._lookup_metric(author_data, self._FOLLOWER_KEYS, "followers_count")
        following_count = self._lookup_metric(author_data, self._FOLLOWING_KEYS, "following_count")
        tweet_count = self._lookup_metric(author_data, self._TWEET_COUNT_KEYS, "tweet_count")
        # Extract account creation date
        raw_create_date = (
            author_data.get("createdAt") or
            author_data.get("created_at") or
            author_data.get("account_create_date") or
            ""
        )
        # Convert to IST format if we have a valid date
        account_create_date = self._convert_to_ist_format(raw_create_date)
        return {
            "name": author_data.get("name", ""),
            "username": author_data.get("userName", "") or author_data.get("username", ""),
            "bio": author_data.get("description", "") or author_data.get("bio", ""),
            "followers_count": followers_count,
            "following_count": following_count,
            "tweet_count": tweet_count,
            "verified": author_data.get("verified", False) or author_data.get("isVerified", False),
            "profile_image_url": author_data.get("profileImageUrl", "") or author_data.get("profile_image_url", ""),
            "account_create_date": account_create_date,
            # Engagement metrics will be calculated from tweet data and added later
            "likes_count": 0,
            "views_count": 0,
            "reply_count": 0,
            "repost_count": 0,
        }
| # ============================================================================= | |
| # UI COMPONENTS | |
| # ============================================================================= | |
| class UIComponents: | |
| """Reusable UI components for the dashboard.""" | |
def display_account_info(account_details: Dict) -> None:
    """Render the account header: handle, avatar, name, bio and metrics.

    Does nothing when *account_details* is empty. Expects the 'username'
    key to be present; every other field is optional.
    """
    if account_details:
        handle = account_details['username']
        st.subheader(f"π€ Account: @{handle}")

        # Avatar, when a URL is available
        avatar_url = account_details.get('profile_image_url')
        if avatar_url:
            st.image(account_details['profile_image_url'], width=80)

        # Display name followed by the verification marker (if verified)
        if account_details.get('verified'):
            verification_badge = 'β '
        else:
            verification_badge = ''
        display_name = account_details.get('name')
        st.markdown(f"**{display_name}** {verification_badge}")

        # Bio text
        bio_text = account_details.get('bio')
        if bio_text:
            st.caption(account_details.get('bio'))

        # Follower / engagement figures, then a separator line
        UIComponents._display_account_metrics(account_details)
        st.divider()
| def _display_account_metrics(account_details: Dict) -> None: | |
| """Display account metrics (followers, following, posts).""" | |
| # Account creation date | |
| create_date = account_details.get('account_create_date', '') | |
| if create_date: | |
| st.caption(f"π Account created: {create_date}") | |
| # Basic metrics | |
| m1, m2, m3 = st.columns(3) | |
| followers = account_details.get('followers_count', 0) | |
| following = account_details.get('following_count', 0) | |
| posts = account_details.get('tweet_count', 0) | |
| m1.metric( | |
| "Followers", | |
| format_large_number(followers), | |
| help="Follower count from Twitter API" | |
| ) | |
| m2.metric( | |
| "Following", | |
| format_large_number(following), | |
| help="Following count from Twitter API" | |
| ) | |
| m3.metric( | |
| "Total Posts", | |
| format_large_number(posts), | |
| help="Total tweet count from Twitter API" | |
| ) | |
| # Engagement metrics | |
| likes = account_details.get('likes_count', 0) | |
| views = account_details.get('views_count', 0) | |
| replies = account_details.get('reply_count', 0) | |
| reposts = account_details.get('repost_count', 0) | |
| if likes > 0 or views > 0 or replies > 0 or reposts > 0: | |
| st.caption("**π Total Engagement:**") | |
| e1, e2, e3, e4 = st.columns(4) | |
| e1.metric( | |
| "Likes", | |
| format_large_number(likes), | |
| help="Total likes count" | |
| ) | |
| e2.metric( | |
| "Views", | |
| format_large_number(views), | |
| help="Total views/impressions count" | |
| ) | |
| e3.metric( | |
| "Replies", | |
| format_large_number(replies), | |
| help="Total replies count" | |
| ) | |
| e4.metric( | |
| "Reposts", | |
| format_large_number(reposts), | |
| help="Total reposts/retweets count" | |
| ) | |
| # Advanced metrics sections | |
| UIComponents._display_content_quality_metrics(account_details) | |
| UIComponents._display_media_usage_metrics(account_details) | |
| UIComponents._display_activity_patterns(account_details) | |
| UIComponents._display_performance_metrics(account_details) | |
| UIComponents._display_engagement_ratios(account_details) | |
| # Warning for missing data | |
| if followers == 0 and following == 0 and posts == 0: | |
| st.warning("β οΈ Account metrics unavailable - this may be due to API limitations or account privacy settings") | |
| def _display_content_quality_metrics(account_details: Dict) -> None: | |
| """Display content quality metrics.""" | |
| avg_likes = account_details.get('avg_likes_per_tweet', 0) | |
| avg_views = account_details.get('avg_views_per_tweet', 0) | |
| engagement_rate = account_details.get('avg_engagement_rate', 0) | |
| avg_length = account_details.get('avg_tweet_length', 0) | |
| if avg_likes > 0 or avg_views > 0 or engagement_rate > 0: | |
| st.caption("**π Content Quality:**") | |
| q1, q2, q3, q4 = st.columns(4) | |
| q1.metric( | |
| "Avg Likes/Tweet", | |
| f"{avg_likes:.1f}", | |
| help="Average likes per tweet" | |
| ) | |
| q2.metric( | |
| "Avg Views/Tweet", | |
| format_large_number(int(avg_views)), | |
| help="Average views per tweet" | |
| ) | |
| q3.metric( | |
| "Engagement Rate", | |
| f"{engagement_rate:.1f}%", | |
| help="(Likes + Retweets) / Views * 100" | |
| ) | |
| q4.metric( | |
| "Avg Tweet Length", | |
| f"{avg_length:.0f} chars", | |
| help="Average character length per tweet" | |
| ) | |
| def _display_media_usage_metrics(account_details: Dict) -> None: | |
| """Display media usage metrics.""" | |
| media_count = account_details.get('tweets_with_media_count', 0) | |
| media_percentage = account_details.get('media_usage_percentage', 0) | |
| likes_with_media = account_details.get('avg_likes_with_media', 0) | |
| likes_without_media = account_details.get('avg_likes_without_media', 0) | |
| if media_count > 0 or media_percentage > 0: | |
| st.caption("**π¬ Media Usage:**") | |
| m1, m2, m3, m4 = st.columns(4) | |
| m1.metric( | |
| "Tweets with Media", | |
| f"{media_count}", | |
| help="Number of tweets with media attachments" | |
| ) | |
| m2.metric( | |
| "Media Usage", | |
| f"{media_percentage:.1f}%", | |
| help="Percentage of tweets with media" | |
| ) | |
| m3.metric( | |
| "Avg Likes (Media)", | |
| f"{likes_with_media:.1f}", | |
| help="Average likes for tweets with media" | |
| ) | |
| m4.metric( | |
| "Avg Likes (No Media)", | |
| f"{likes_without_media:.1f}", | |
| help="Average likes for tweets without media" | |
| ) | |
| def _display_activity_patterns(account_details: Dict) -> None: | |
| """Display activity pattern metrics.""" | |
| most_active_hour = account_details.get('most_active_hour', 0) | |
| most_active_day = account_details.get('most_active_day', 'Unknown') | |
| top_hours = account_details.get('top_activity_hours', []) | |
| if most_active_hour > 0 or most_active_day != 'Unknown': | |
| st.caption("**β° Activity Patterns:**") | |
| a1, a2, a3, a4 = st.columns(4) | |
| a1.metric( | |
| "Most Active Hour", | |
| f"{most_active_hour}:00", | |
| help="Hour of day with most tweets" | |
| ) | |
| a2.metric( | |
| "Most Active Day", | |
| most_active_day, | |
| help="Day of week with most tweets" | |
| ) | |
| a3.metric( | |
| "Top Hours", | |
| ", ".join([f"{h}:00" for h in top_hours[:2]]), | |
| help="Top active hours" | |
| ) | |
| # Hashtag and mention usage | |
| hashtags = account_details.get('total_hashtags_used', 0) | |
| mentions = account_details.get('total_mentions_used', 0) | |
| a4.metric( | |
| "Hashtags Used", | |
| f"{hashtags}", | |
| help="Total hashtags used in tweets" | |
| ) | |
| def _display_performance_metrics(account_details: Dict) -> None: | |
| """Display performance metrics.""" | |
| highest_likes = account_details.get('highest_likes', 0) | |
| viral_count = account_details.get('viral_tweets_count', 0) | |
| viral_percentage = account_details.get('viral_content_percentage', 0) | |
| top_tweet_text = account_details.get('top_tweet_text', '') | |
| top_tweet_url = account_details.get('top_tweet_url', '') | |
| if highest_likes > 0 or viral_count > 0: | |
| st.caption("**π Performance:**") | |
| p1, p2, p3, p4 = st.columns(4) | |
| p1.metric( | |
| "Highest Likes", | |
| format_large_number(highest_likes), | |
| help="Most likes on a single tweet" | |
| ) | |
| p2.metric( | |
| "Viral Tweets", | |
| f"{viral_count}", | |
| help="Tweets in top 10% by likes" | |
| ) | |
| p3.metric( | |
| "Viral Content %", | |
| f"{viral_percentage:.1f}%", | |
| help="Percentage of viral tweets" | |
| ) | |
| p4.metric( | |
| "Engagement Score", | |
| f"{account_details.get('engagement_score', 0):.1f}", | |
| help="Weighted engagement score (likesΓ1 + retweetsΓ2 + repliesΓ3)" | |
| ) | |
| # Show top tweet if available | |
| if top_tweet_text and top_tweet_url: | |
| st.caption("**π Top Performing Tweet:**") | |
| with st.expander("View top tweet"): | |
| st.write(f"**Likes:** {format_large_number(highest_likes)}") | |
| st.write(f"**Text:** {top_tweet_text}") | |
| st.write(f"**URL:** {top_tweet_url}") | |
| def _display_engagement_ratios(account_details: Dict) -> None: | |
| """Display engagement ratio metrics.""" | |
| like_to_view = account_details.get('like_to_view_ratio', 0) | |
| retweet_to_like = account_details.get('retweet_to_like_ratio', 0) | |
| reply_to_like = account_details.get('reply_to_like_ratio', 0) | |
| total_engagement = account_details.get('total_engagement', 0) | |
| if like_to_view > 0 or retweet_to_like > 0 or reply_to_like > 0: | |
| st.caption("**π Engagement Ratios:**") | |
| r1, r2, r3, r4 = st.columns(4) | |
| r1.metric( | |
| "Like Rate", | |
| f"{like_to_view:.2f}%", | |
| help="Likes per view percentage" | |
| ) | |
| r2.metric( | |
| "Retweet Rate", | |
| f"{retweet_to_like:.2f}%", | |
| help="Retweets per like percentage" | |
| ) | |
| r3.metric( | |
| "Reply Rate", | |
| f"{reply_to_like:.2f}%", | |
| help="Replies per like percentage" | |
| ) | |
| r4.metric( | |
| "Total Engagement", | |
| format_large_number(total_engagement), | |
| help="Total likes + retweets + replies" | |
| ) | |
| def display_key_metrics(df: pd.DataFrame) -> None: | |
| """Display key engagement metrics.""" | |
| if df.empty: | |
| return | |
| st.subheader("π Key Metrics") | |
| # Basic metrics | |
| c1, c2, c3 = st.columns(3) | |
| c1.metric("Total Tweets Scanned", f"{len(df):,}") | |
| c2.metric("Total Likes", f"{df['Likes'].sum():,}") | |
| c3.metric("Total Retweets", f"{df['Retweets'].sum():,}") | |
| # Engagement metrics | |
| st.subheader("β‘ Engagement") | |
| df_copy = df.copy() | |
| df_copy["Engagement"] = df_copy["Likes"] + df_copy["Retweets"] + df_copy["Replies"] | |
| total_engagement = df_copy["Engagement"].sum() | |
| avg_engagement = total_engagement / len(df) if len(df) > 0 else 0 | |
| total_views = df["Views"].sum() | |
| engagement_rate = (total_engagement / total_views * 100) if total_views > 0 else 0 | |
| e1, e2 = st.columns(2) | |
| e1.metric("Avg. Engagement/Tweet", f"{avg_engagement:.1f}") | |
| e2.metric("Engagement Rate (vs Views)", f"{engagement_rate:.2f}%") | |
| st.divider() | |
| def display_content_analysis(metrics: Dict) -> None: | |
| """Display content analysis section.""" | |
| st.subheader("π Content Analysis") | |
| top_hashtags = metrics.get("top_hashtags", []) | |
| top_mentions = metrics.get("top_mentions", []) | |
| if top_hashtags: | |
| st.markdown("**Top Hashtags**") | |
| st.write(", ".join([f"`#{tag}` ({count})" for tag, count in top_hashtags])) | |
| if top_mentions: | |
| st.markdown("**Top Mentions**") | |
| st.write(", ".join([f"`@{user}` ({count})" for user, count in top_mentions])) | |
| def display_ai_summary(gemini_summary: Optional[str]) -> None: | |
| """Display AI-generated summary section.""" | |
| if gemini_summary: | |
| st.subheader("π§ AI Summary & Recommendations") | |
| st.markdown(gemini_summary) | |
| st.divider() | |
| def display_most_engaging_tweet(df: pd.DataFrame) -> None: | |
| """Display the most engaging tweet.""" | |
| if df.empty: | |
| return | |
| st.subheader("π Most Engaging Tweet") | |
| df_copy = df.copy() | |
| df_copy["Engagement"] = df_copy["Likes"] + df_copy["Retweets"] + df_copy["Replies"] | |
| most_engaging = df_copy.loc[df_copy["Engagement"].idxmax()] | |
| with st.container(border=True): | |
| st.markdown(f"**{most_engaging['Text']}**") | |
| stats = (f"β€οΈ {most_engaging['Likes']} | π {most_engaging['Retweets']} | " | |
| f"π¬ {most_engaging['Replies']} | ποΈ {most_engaging['Views']}") | |
| st.markdown(f"**{stats}** | [{most_engaging['Date']}]({most_engaging['URL']})") | |
| st.divider() | |
| def display_charts(df: pd.DataFrame) -> None: | |
| """Display data visualization charts.""" | |
| if df.empty: | |
| return | |
| st.subheader("π Posting Patterns") | |
| # Tweets by day | |
| df_by_day = df.groupby('Date_Only')['Text'].count().reset_index() | |
| df_by_day['Date_Only'] = pd.to_datetime(df_by_day['Date_Only']) | |
| fig_day = px.line( | |
| df_by_day, | |
| x='Date_Only', | |
| y='Text', | |
| title="Tweets per Day", | |
| labels={'Date_Only': 'Date', 'Text': 'Count'} | |
| ) | |
| st.plotly_chart(fig_day, use_container_width=True) | |
| def display_data_download(df: pd.DataFrame) -> None: | |
| """Display raw data table with download option.""" | |
| st.subheader("π Raw Data") | |
| st.dataframe(df) | |
| if not df.empty: | |
| csv = df.to_csv(index=False).encode('utf-8') | |
| st.download_button( | |
| "π₯ Download as CSV", | |
| csv, | |
| f"twitter_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv", | |
| "text/csv", | |
| key="download-csv", | |
| use_container_width=True | |
| ) | |
# =============================================================================
# DASHBOARD MANAGEMENT
# =============================================================================
class TwitterDashboard:
    """Main dashboard for displaying Twitter analysis results."""

    def __init__(self, df: pd.DataFrame, metrics: Dict, dataset_id: str,
                 analysis_type: str = "Account's Tweets", gemini_summary: Optional[str] = None):
        """Store the analysis results to be rendered.

        Args:
            df: Processed tweet table.
            metrics: Aggregated metrics; ``account_details`` is read from it
                when rendering an "Account's Tweets" analysis.
            dataset_id: Identifier of the source dataset (stored only; it is
                not used while rendering).
            analysis_type: Either "Account's Tweets" or any other label, which
                is treated as a comments analysis.
            gemini_summary: Optional AI-generated summary in Markdown.
        """
        self.df = df
        self.metrics = metrics
        self.dataset_id = dataset_id
        self.analysis_type = analysis_type
        self.gemini_summary = gemini_summary

    def render(self) -> None:
        """Render the complete dashboard, or a warning when there is no data."""
        if self.df.empty:
            st.warning("No data available to display.")
            return

        # Main layout
        left_col, right_col = st.columns([1, 1], gap="large")

        with left_col:
            # Only show account info for "Account's Tweets" analysis
            if self.analysis_type == "Account's Tweets":
                UIComponents.display_account_info(self.metrics.get("account_details", {}))
            else:
                # For "Comments to Account", show a different header
                st.subheader("π¬ Comments Analysis")
                st.info("Analyzing comments and replies directed to the account")
                st.divider()
            UIComponents.display_key_metrics(self.df)
            UIComponents.display_content_analysis(self.metrics)

        with right_col:
            UIComponents.display_ai_summary(self.gemini_summary)
            UIComponents.display_most_engaging_tweet(self.df)
            UIComponents.display_charts(self.df)

        # Full-width sections
        UIComponents.display_data_download(self.df)
# =============================================================================
# SCHEDULER MANAGEMENT
# =============================================================================
class SchedulerManager:
    """Manages scheduled users and automation settings."""

    def __init__(self, db: "DatabaseManager"):
        """Keep a reference to the database manager.

        Args:
            db: Database manager exposing ``is_connected`` and
                ``scheduler_users_collection``.
        """
        self.db = db

    @staticmethod
    def _normalize_username(username: str) -> str:
        """Return *username* without surrounding whitespace or a leading '@'."""
        return username.strip().lstrip("@")

    def render_controls(self) -> None:
        """Render scheduler management interface."""
        st.header("π Scheduler Management")

        if not self.db.is_connected:
            st.warning("β οΈ Database not connected. Scheduler features unavailable.")
            return

        self._display_current_users()
        st.divider()
        self._display_add_user_form()
        st.divider()
        self._display_scheduler_info()

    def _display_current_users(self) -> None:
        """Display currently scheduled (active) users with a remove button each."""
        st.subheader("π Current Scheduled Users")

        try:
            scheduled_users = self.db.scheduler_users_collection.find({"active": True})
            usernames = [user["username"] for user in scheduled_users]
        except Exception as e:
            st.error(f"Error fetching scheduled users: {e}")
            return

        if not usernames:
            st.info("No users currently scheduled.")
            return

        for username in usernames:
            col1, col2 = st.columns([3, 1])
            with col1:
                st.write(f"@{username}")
            with col2:
                # The widget key embeds the username so every row gets its
                # own button state.
                if st.button("ποΈ", key=f"remove_{username}", help=f"Remove @{username}"):
                    if self._remove_user(username):
                        st.rerun()

    def _display_add_user_form(self) -> None:
        """Display form to add new users."""
        st.subheader("β Add New User")
        new_username = st.text_input("Username to schedule (without @)", key="new_scheduled_user")
        # Normalise once so the button state, the stored name and the success
        # message all agree.
        username = self._normalize_username(new_username)

        col1, col2 = st.columns(2)
        with col1:
            if st.button("Add User", use_container_width=True, disabled=not username):
                if self._add_user(username):
                    st.success(f"β Added @{username} to scheduler")
                    st.rerun()
        with col2:
            if st.button("π Refresh List", use_container_width=True):
                st.rerun()

    def _display_scheduler_info(self) -> None:
        """Display scheduler information."""
        st.subheader("βΉοΈ Scheduler Info")
        st.info(
            "\n"
            "**GitHub Actions Automation:**\n"
            "- Runs daily at 12:00 AM IST automatically\n"
            "- Can be triggered manually from GitHub Actions tab\n"
            "- Scrapes only the previous day's data (no overlap)\n"
            "- Stores results in MongoDB with duplicate detection\n"
        )

    def _add_user(self, username: str) -> bool:
        """Add user to scheduled scraping list.

        The username is stripped of whitespace and a leading '@' before use.
        A user who is already active is rejected with a warning. An inactive
        record with the same username is reactivated by the upsert, and its
        ``added_at`` / ``last_scraped`` fields are reset.

        Args:
            username: Twitter handle to schedule.

        Returns:
            ``True`` when the user was stored, ``False`` when the name is
            empty, already scheduled, or the database call failed.
        """
        # Function-scope import: the module only imports datetime/timedelta.
        from datetime import timezone

        username = self._normalize_username(username)
        if not username:
            st.warning("Username cannot be empty")
            return False

        try:
            collection = self.db.scheduler_users_collection

            # Check if user already exists (single indexed-style lookup
            # instead of loading every active user).
            if collection.find_one({"username": username, "active": True}) is not None:
                st.warning("User already scheduled")
                return False

            user_doc = {
                "username": username,
                "active": True,
                # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated.
                "added_at": datetime.now(timezone.utc),
                "last_scraped": None,
            }
            collection.update_one(
                {"username": username},
                {"$set": user_doc},
                upsert=True
            )
            return True
        except Exception as e:
            st.error(f"Error adding user: {e}")
            return False

    def _remove_user(self, username: str) -> bool:
        """Remove user from scheduled scraping list.

        The record is kept and only flagged ``active: False``. The username
        is matched exactly as stored, so it is deliberately not normalised.

        Args:
            username: Stored username of the user to deactivate.

        Returns:
            ``True`` when the update call completed, ``False`` on error.
        """
        try:
            self.db.scheduler_users_collection.update_one(
                {"username": username},
                {"$set": {"active": False}}
            )
            return True
        except Exception as e:
            st.error(f"Error removing user: {e}")
            return False
# =============================================================================
# MAIN APPLICATION
# =============================================================================
class TwitterAnalyzerApp:
    """Main Twitter Analyzer application.

    Construction configures the Streamlit page and creates the services;
    ``run`` renders the sidebar and, once the user clicks "Analyze", performs
    the fetch -> process -> summarise -> render workflow.
    """

    def __init__(self) -> None:
        self._setup_page()
        self._initialize_services()

    def _setup_page(self) -> None:
        """Configure Streamlit page settings."""
        st.set_page_config(**PAGE_CONFIG)
        st.title("π¦ Twitter Content Analyzer")

    def _initialize_services(self) -> None:
        """Initialize all required services.

        A ``ValueError`` raised while building any service is reported in the
        UI and stops the script run. The Gemini service is optional and is
        ``None`` when no API key is configured.
        """
        try:
            self.config = AppConfig()
            self.db = DatabaseManager(self.config.mongodb_uri)
            self.apify = ApifyService(self.config.apify_api_key)
            self.gemini = GeminiService(self.config.gemini_api_key) if self.config.gemini_api_key else None
            self.processor = TweetDataProcessor()
            self.scheduler = SchedulerManager(self.db)
        except ValueError as e:
            st.error(f"Initialization failed: {e}. Please check your .env.local file.")
            st.stop()

    def run(self) -> None:
        """Execute the main application."""
        self._render_sidebar()

        # Nothing to analyse until the button was clicked with a username set.
        if not getattr(self, 'run_button', False) or not self.username:
            st.info("Please enter a Twitter username and click 'Analyze' to begin.")
            return

        self._perform_analysis()

    def _render_sidebar(self) -> None:
        """Render the application sidebar."""
        with st.sidebar:
            self._render_analysis_controls()
            self._render_debug_options()
            st.divider()
            self.scheduler.render_controls()

    def _render_analysis_controls(self) -> None:
        """Render analysis control widgets and store their values on ``self``."""
        st.header("βοΈ Analysis Controls")

        self.analysis_type = st.radio(
            "Analysis Type",
            ["Account's Tweets", "Comments to Account"],
            horizontal=True
        )
        # Tolerate pasted handles such as " @name ": the label asks for the
        # name without '@', but the value is forwarded to the API as typed.
        raw_username = st.text_input("Twitter Username (without @)", DEFAULT_USERNAME)
        self.username = raw_username.strip().lstrip("@")

        # Date inputs
        today = datetime.now()
        last_week = today - timedelta(days=DEFAULT_DAYS_BACK)
        self.since_date = st.date_input("Start Date", last_week)
        self.until_date = st.date_input("End Date", today)

        self.run_button = st.button("π Analyze", use_container_width=True, type="primary")

    def _render_debug_options(self) -> None:
        """Render debug options and mirror the checkbox into session state."""
        with st.expander("π§ Debug Options"):
            st.session_state['debug_mode'] = st.checkbox(
                "Show API Debug Info",
                help="Shows raw API data for troubleshooting"
            )

    def _perform_analysis(self) -> None:
        """Perform the main analysis workflow.

        Fetches raw data for the selected analysis type, processes it,
        optionally generates an AI summary, and renders the dashboard. Any
        exception is logged with its traceback and shown in the UI.
        """
        # Reject an inverted range up front instead of spending an API call
        # that cannot return anything.
        if self.since_date > self.until_date:
            st.error("Start Date must be on or before End Date.")
            return

        since_str = self.since_date.strftime("%Y-%m-%d")
        until_str = self.until_date.strftime("%Y-%m-%d")

        try:
            # Fetch data based on analysis type
            if self.analysis_type == "Account's Tweets":
                raw_data, dataset_id = self.apify.fetch_account_tweets(self.username, since_str, until_str)
                context = f"This is an analysis of tweets by the Twitter account @{self.username}."
            else:
                raw_data, dataset_id = self.apify.fetch_account_comments(self.username, since_str, until_str)
                context = f"This is an analysis of comments/replies sent to the Twitter account @{self.username}."

            if not raw_data:
                st.error("No data was returned from the API. The account may be private, have no tweets in the selected range, or there might be an API issue.")
                return

            # Process data
            df, metrics = self.processor.process_tweets(raw_data, self.username)

            # Generate AI summary if available (only the first 100 rows are
            # passed to the model).
            gemini_summary = None
            if self.gemini:
                gemini_summary = self.gemini.generate_analysis(df.head(100), context)
            else:
                st.warning("GEMINI_API_KEY not found. AI summary will be skipped.")

            # Display results
            dashboard = TwitterDashboard(df, metrics, dataset_id, self.analysis_type, gemini_summary)
            dashboard.render()
        except Exception as e:
            # logger.exception keeps the traceback, which logger.error dropped.
            logger.exception("Analysis failed: %s", e)
            st.error(f"Analysis failed: {str(e)}")
# =============================================================================
# APPLICATION ENTRY POINT
# =============================================================================
def main():
    """Application entry point: build the analyzer app and run it."""
    app = TwitterAnalyzerApp()
    app.run()


# Run the app only when executed as a script (e.g. via `streamlit run`),
# not when the module is imported.
if __name__ == "__main__":
    main()