#!/usr/bin/env python3 """ Twitter Content Analyzer A comprehensive Twitter data collection and analysis tool with automated scheduling capabilities. """ import os import logging from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple, Any from collections import Counter import streamlit as st import pandas as pd import plotly.express as px import pytz from pymongo import MongoClient import google.generativeai as genai from apify_client import ApifyClient from dotenv import load_dotenv # ============================================================================= # CONSTANTS # ============================================================================= DEFAULT_USERNAME = "narendramodi" DEFAULT_DAYS_BACK = 7 IST_TIMEZONE = 'Asia/Kolkata' UTC_TIMEZONE = 'UTC' # Twitter API date format TWITTER_DATE_FORMAT = "%a %b %d %H:%M:%S %z %Y" # MongoDB collection names TWEETS_COLLECTION = "tweets" SCHEDULER_USERS_COLLECTION = "scheduler_users" # Streamlit page config PAGE_CONFIG = { "page_title": "Twitter Scraper & Analyzer", "page_icon": "🐦", "layout": "wide", "initial_sidebar_state": "expanded" } # ============================================================================= # LOGGING CONFIGURATION # ============================================================================= logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # ============================================================================= # UTILITY FUNCTIONS # ============================================================================= def convert_to_ist(utc_dt: datetime) -> datetime: """Convert UTC datetime to Indian Standard Time.""" if utc_dt.tzinfo is None: utc_dt = pytz.utc.localize(utc_dt) return utc_dt.astimezone(pytz.timezone(IST_TIMEZONE)) def safe_get_nested(data: Dict, keys: List[str], default=None): """Safely get nested dictionary values.""" for key in keys: if isinstance(data, dict) 
and key in data: data = data[key] else: return default return data def format_large_number(num: int) -> str: """Format large numbers with commas.""" return f"{num:,}" if num > 0 else "N/A" # ============================================================================= # CONFIGURATION MANAGEMENT # ============================================================================= class AppConfig: """Centralized configuration management.""" def __init__(self, env_path: str = ".env.local"): load_dotenv(dotenv_path=env_path) self._validate_config() @property def mongodb_uri(self) -> Optional[str]: return os.getenv("MONGODB_URI") @property def apify_api_key(self) -> Optional[str]: return os.getenv("APIFY_API_KEY") @property def gemini_api_key(self) -> Optional[str]: return os.getenv("GEMINI_API_KEY") def _validate_config(self) -> None: """Validate essential configuration.""" if not self.apify_api_key: raise ValueError("APIFY_API_KEY is required but not found in environment variables") # ============================================================================= # DATABASE MANAGEMENT # ============================================================================= class DatabaseManager: """Handles all MongoDB operations.""" def __init__(self, uri: Optional[str]): self.client = None self.db = None self.is_connected = False self._connect(uri) def _connect(self, uri: Optional[str]) -> None: """Establish MongoDB connection.""" if not uri: logger.warning("No MongoDB URI provided. 
Running in offline mode.") self._setup_dummy_collections() return try: self.client = MongoClient(uri, serverSelectionTimeoutMS=5000) self.client.admin.command('ping') self.db = self.client["DataCollector"] self.tweets_collection = self.db[TWEETS_COLLECTION] self.scheduler_users_collection = self.db[SCHEDULER_USERS_COLLECTION] self.is_connected = True logger.info("βœ… MongoDB connected successfully") except Exception as e: logger.error(f"⚠️ MongoDB connection failed: {e}") logger.info("πŸ”„ Running in offline mode - data will not be stored") self._setup_dummy_collections() def _setup_dummy_collections(self) -> None: """Setup dummy collections for offline mode.""" class DummyCollection: def update_one(self, *args, **kwargs): pass def find(self, *args, **kwargs): return [] def find_one(self, *args, **kwargs): return None def insert_one(self, *args, **kwargs): pass self.tweets_collection = DummyCollection() self.scheduler_users_collection = DummyCollection() self.is_connected = False # ============================================================================= # API SERVICES # ============================================================================= class ApifyService: """Handles Apify API interactions for Twitter data collection.""" ACTOR_ID = "CJdippxWmn9uRfooo" def __init__(self, api_key: str): self.client = ApifyClient(api_key) def _run_actor(self, run_input: Dict[str, Any]) -> Tuple[List[Dict], str]: """Execute Apify actor and retrieve dataset.""" try: run = self.client.actor(self.ACTOR_ID).call(run_input=run_input) dataset_id = run["defaultDatasetId"] data = list(self.client.dataset(dataset_id).iterate_items()) return data, dataset_id except Exception as e: logger.error(f"Apify actor execution failed: {e}") raise def fetch_account_tweets(self, username: str, since: str, until: str) -> Tuple[List[Dict], str]: """Fetch tweets posted by a specific account.""" # Handle both simple date (YYYY-MM-DD) and full timestamp (YYYY-MM-DD_HH:MM:SS) formats 
since_formatted = f"{since}_UTC" if "_" in since else f"{since}_00:00:00_UTC" until_formatted = f"{until}_UTC" if "_" in until else f"{until}_23:59:59_UTC" run_input = { "from": username.strip(), "since": since_formatted, "until": until_formatted, "queryType": "Latest", "include:nativeretweets": True, } with st.spinner(f"Fetching tweets for @{username} from {since} to {until}..."): data, dataset_id = self._run_actor(run_input) st.info(f"πŸ” Query Details: from:{username} | Raw results: {len(data)} tweets") return data, dataset_id def fetch_account_comments(self, username: str, since: str, until: str) -> Tuple[List[Dict], str]: """Fetch comments/replies directed to a specific account.""" # Handle both simple date (YYYY-MM-DD) and full timestamp (YYYY-MM-DD_HH:MM:SS) formats since_formatted = f"{since}_UTC" if "_" in since else f"{since}_00:00:00_UTC" until_formatted = f"{until}_UTC" if "_" in until else f"{until}_23:59:59_UTC" run_input = { "to": username.strip(), "since": since_formatted, "until": until_formatted, "queryType": "Latest", } with st.spinner(f"Fetching comments for @{username} from {since} to {until}..."): data, dataset_id = self._run_actor(run_input) st.info(f"πŸ” Query Details: to:@{username} | Raw results: {len(data)} comments") return data, dataset_id class GeminiService: """Handles Google Generative AI interactions.""" def __init__(self, api_key: str): genai.configure(api_key=api_key) self.model = genai.GenerativeModel('gemini-1.5-flash') def generate_analysis(self, tweets_df: pd.DataFrame, context: str) -> str: """Generate AI-powered analysis of tweets.""" if tweets_df.empty: return "No tweets provided for analysis." 
with st.spinner("Generating AI summary with Gemini..."): try: tweets_text = self._format_tweets_for_analysis(tweets_df) prompt = self._create_analysis_prompt(context, tweets_text) response = self.model.generate_content(prompt) return response.text except Exception as e: logger.error(f"Gemini analysis failed: {e}") return f"Error generating summary: {str(e)}" def _format_tweets_for_analysis(self, tweets_df: pd.DataFrame) -> str: """Format tweets for AI analysis.""" return "\n\n".join([ f"{i}. @{row.Username}: {row.Text} (Likes: {row.Likes}, Retweets: {row.Retweets})" for i, row in enumerate(tweets_df.itertuples(), 1) ]) def _create_analysis_prompt(self, context: str, tweets_text: str) -> str: """Create analysis prompt for Gemini.""" return f""" {context} Here are the tweets to analyze: {tweets_text} Please provide a comprehensive analysis covering: 1. **Main Themes & Topics:** What are the key subjects of discussion? 2. **Overall Sentiment:** What is the general tone (positive, negative, neutral)? 3. **Key Insights & Patterns:** Are there any notable trends or surprising findings? 4. **Top Recommendations:** Provide 5 actionable suggestions for the brand/party to improve their strategy based on this feedback. Format the response clearly using Markdown. 
""" # ============================================================================= # DATA PROCESSING # ============================================================================= class TweetDataProcessor: """Processes raw tweet data into structured format.""" def process_tweets(self, raw_data: List[Dict[str, Any]], target_username: str = None) -> Tuple[pd.DataFrame, Dict[str, Any]]: """Transform raw API data into clean DataFrame and metrics.""" processed_data = [] hashtags_counter = Counter() mentions_counter = Counter() all_author_data = [] skipped_count = 0 error_count = 0 for item in raw_data: try: processed_tweet = self._process_single_tweet(item, hashtags_counter, mentions_counter, all_author_data, target_username) if processed_tweet: processed_data.append(processed_tweet) else: skipped_count += 1 except Exception as e: error_count += 1 # Only log individual errors in debug mode if st.session_state.get('debug_mode', False): logger.warning(f"Skipping tweet due to processing error: {e}") st.warning(f"Skipping a tweet due to processing error: {e}") # Show summary of skipped items only if significant if skipped_count > 0 and st.session_state.get('debug_mode', False): st.info(f"ℹ️ Skipped {skipped_count} items (likely mock/invalid data)") if error_count > 0: st.warning(f"⚠️ {error_count} items had processing errors") # Extract best account details account_details = self._extract_best_account_details(all_author_data, target_username) # Create DataFrame and calculate engagement metrics from tweet data df = pd.DataFrame(processed_data) engagement_metrics = self._calculate_engagement_metrics(df, target_username) # Add engagement metrics to account_details if account_details: account_details.update(engagement_metrics) metrics = { "top_hashtags": hashtags_counter.most_common(5), "top_mentions": mentions_counter.most_common(5), "account_details": account_details } return df, metrics def _calculate_engagement_metrics(self, df: pd.DataFrame, target_username: str = None) 
-> Dict: """Calculate comprehensive engagement metrics from tweet data.""" if df.empty: return self._get_empty_metrics() # Filter to only tweets from the target user if specified if target_username: user_tweets = df[df['Username'].str.lower() == target_username.lower()] else: user_tweets = df if user_tweets.empty: return self._get_empty_metrics() # Basic engagement totals likes_count = user_tweets['Likes'].sum() if 'Likes' in user_tweets.columns else 0 views_count = user_tweets['Views'].sum() if 'Views' in user_tweets.columns else 0 reply_count = user_tweets['Replies'].sum() if 'Replies' in user_tweets.columns else 0 repost_count = user_tweets['Retweets'].sum() if 'Retweets' in user_tweets.columns else 0 tweet_count = len(user_tweets) # Content quality metrics avg_likes_per_tweet = likes_count / tweet_count if tweet_count > 0 else 0 avg_views_per_tweet = views_count / tweet_count if tweet_count > 0 else 0 avg_engagement_rate = ((likes_count + repost_count) / views_count * 100) if views_count > 0 else 0 # Content length analysis if 'Text' in user_tweets.columns: text_lengths = user_tweets['Text'].astype(str).str.len() avg_tweet_length = text_lengths.mean() longest_tweet_length = text_lengths.max() shortest_tweet_length = text_lengths.min() else: avg_tweet_length = longest_tweet_length = shortest_tweet_length = 0 # Media usage metrics if 'Has_Media' in user_tweets.columns: tweets_with_media = user_tweets['Has_Media'].sum() media_usage_percentage = (tweets_with_media / tweet_count * 100) if tweet_count > 0 else 0 # Media effectiveness media_tweets = user_tweets[user_tweets['Has_Media'] == True] no_media_tweets = user_tweets[user_tweets['Has_Media'] == False] avg_likes_with_media = media_tweets['Likes'].mean() if len(media_tweets) > 0 else 0 avg_likes_without_media = no_media_tweets['Likes'].mean() if len(no_media_tweets) > 0 else 0 else: tweets_with_media = media_usage_percentage = 0 avg_likes_with_media = avg_likes_without_media = 0 # Hashtag and mention analysis if 
'Hashtags' in user_tweets.columns: # Count hashtags from the Hashtags field (comma-separated string) hashtag_counts = user_tweets['Hashtags'].astype(str).apply(lambda x: len([h.strip() for h in x.split(',') if h.strip()])) total_hashtags_used = hashtag_counts.sum() avg_hashtags_per_tweet = hashtag_counts.mean() tweets_with_hashtags_percentage = ((hashtag_counts > 0).sum() / tweet_count * 100) if tweet_count > 0 else 0 elif 'Hashtag_Count' in user_tweets.columns: # Fallback to Hashtag_Count if available total_hashtags_used = user_tweets['Hashtag_Count'].sum() avg_hashtags_per_tweet = user_tweets['Hashtag_Count'].mean() tweets_with_hashtags_percentage = ((user_tweets['Hashtag_Count'] > 0).sum() / tweet_count * 100) if tweet_count > 0 else 0 else: total_hashtags_used = avg_hashtags_per_tweet = tweets_with_hashtags_percentage = 0 if 'Mentions' in user_tweets.columns: # Count mentions from the Mentions field (comma-separated string) mention_counts = user_tweets['Mentions'].astype(str).apply(lambda x: len([m.strip() for m in x.split(',') if m.strip()])) total_mentions_used = mention_counts.sum() avg_mentions_per_tweet = mention_counts.mean() elif 'Mention_Count' in user_tweets.columns: # Fallback to Mention_Count if available total_mentions_used = user_tweets['Mention_Count'].sum() avg_mentions_per_tweet = user_tweets['Mention_Count'].mean() else: total_mentions_used = avg_mentions_per_tweet = 0 # Timing and activity patterns if 'Hour' in user_tweets.columns: most_active_hour = user_tweets['Hour'].mode().values[0] if len(user_tweets['Hour'].mode()) > 0 else 0 hourly_distribution = user_tweets['Hour'].value_counts().head(3).to_dict() else: most_active_hour = 0 hourly_distribution = {} if 'Day_of_Week' in user_tweets.columns: most_active_day = user_tweets['Day_of_Week'].mode().values[0] if len(user_tweets['Day_of_Week'].mode()) > 0 else "Unknown" else: most_active_day = "Unknown" # Performance metrics if 'Likes' in user_tweets.columns and not user_tweets.empty: 
highest_likes = user_tweets['Likes'].max() top_tweet_idx = user_tweets['Likes'].idxmax() top_tweet_text = user_tweets.loc[top_tweet_idx, 'Text'][:100] + "..." if 'Text' in user_tweets.columns else "" top_tweet_url = user_tweets.loc[top_tweet_idx, 'URL'] if 'URL' in user_tweets.columns else "" # Viral content (top 10% threshold) viral_threshold = user_tweets['Likes'].quantile(0.9) viral_tweets_count = (user_tweets['Likes'] > viral_threshold).sum() viral_content_percentage = (viral_tweets_count / tweet_count * 100) if tweet_count > 0 else 0 else: highest_likes = viral_tweets_count = viral_content_percentage = 0 top_tweet_text = top_tweet_url = "" # Audience engagement ratios like_to_view_ratio = (likes_count / views_count * 100) if views_count > 0 else 0 retweet_to_like_ratio = (repost_count / likes_count * 100) if likes_count > 0 else 0 reply_to_like_ratio = (reply_count / likes_count * 100) if likes_count > 0 else 0 # Engagement score (weighted: likes=1, retweets=2, replies=3) total_engagement = likes_count + repost_count + reply_count engagement_score = (likes_count * 1 + repost_count * 2 + reply_count * 3) / tweet_count if tweet_count > 0 else 0 return { # Basic metrics "likes_count": int(likes_count), "views_count": int(views_count), "reply_count": int(reply_count), "repost_count": int(repost_count), # Content quality metrics "avg_likes_per_tweet": round(avg_likes_per_tweet, 1), "avg_views_per_tweet": round(avg_views_per_tweet, 1), "avg_engagement_rate": round(avg_engagement_rate, 2), "avg_tweet_length": round(avg_tweet_length, 1), "longest_tweet_length": int(longest_tweet_length), "shortest_tweet_length": int(shortest_tweet_length), # Media usage metrics "tweets_with_media_count": int(tweets_with_media), "media_usage_percentage": round(media_usage_percentage, 1), "avg_likes_with_media": round(avg_likes_with_media, 1), "avg_likes_without_media": round(avg_likes_without_media, 1), # Hashtag and mention metrics "total_hashtags_used": int(total_hashtags_used), 
"avg_hashtags_per_tweet": round(avg_hashtags_per_tweet, 1), "tweets_with_hashtags_percentage": round(tweets_with_hashtags_percentage, 1), "total_mentions_used": int(total_mentions_used), "avg_mentions_per_tweet": round(avg_mentions_per_tweet, 1), # Activity patterns "most_active_hour": int(most_active_hour), "most_active_day": str(most_active_day), "top_activity_hours": list(hourly_distribution.keys())[:3], # Performance metrics "highest_likes": int(highest_likes), "top_tweet_text": str(top_tweet_text), "top_tweet_url": str(top_tweet_url), "viral_tweets_count": int(viral_tweets_count), "viral_content_percentage": round(viral_content_percentage, 1), # Engagement ratios "like_to_view_ratio": round(like_to_view_ratio, 2), "retweet_to_like_ratio": round(retweet_to_like_ratio, 2), "reply_to_like_ratio": round(reply_to_like_ratio, 2), "engagement_score": round(engagement_score, 1), "total_engagement": int(total_engagement), } def _get_empty_metrics(self) -> Dict: """Return empty metrics structure.""" return { # Basic metrics "likes_count": 0, "views_count": 0, "reply_count": 0, "repost_count": 0, # Content quality metrics "avg_likes_per_tweet": 0, "avg_views_per_tweet": 0, "avg_engagement_rate": 0, "avg_tweet_length": 0, "longest_tweet_length": 0, "shortest_tweet_length": 0, # Media usage metrics "tweets_with_media_count": 0, "media_usage_percentage": 0, "avg_likes_with_media": 0, "avg_likes_without_media": 0, # Hashtag and mention metrics "total_hashtags_used": 0, "avg_hashtags_per_tweet": 0, "tweets_with_hashtags_percentage": 0, "total_mentions_used": 0, "avg_mentions_per_tweet": 0, # Activity patterns "most_active_hour": 0, "most_active_day": "Unknown", "top_activity_hours": [], # Performance metrics "highest_likes": 0, "top_tweet_text": "", "top_tweet_url": "", "viral_tweets_count": 0, "viral_content_percentage": 0, # Engagement ratios "like_to_view_ratio": 0, "retweet_to_like_ratio": 0, "reply_to_like_ratio": 0, "engagement_score": 0, "total_engagement": 0, } def 
_is_mock_tweet(self, item: Dict) -> bool: """Detect if a tweet is mock/invalid data that should be ignored.""" # Check for missing essential fields that real tweets should have essential_fields = ['createdAt', 'text', 'author'] missing_fields = sum(1 for field in essential_fields if not item.get(field)) # If missing multiple essential fields, likely mock data if missing_fields >= 2: return True # Check for empty or placeholder text text = item.get("text", "").strip() if not text or text.lower() in ["", "null", "undefined", "test", "placeholder"]: return True # Check for missing or empty author data author = item.get("author", {}) if not author or not author.get("userName", "").strip(): return True # Check for obviously fake/test usernames username = author.get("userName", "").lower() test_patterns = ["test", "mock", "fake", "placeholder", "example"] if any(pattern in username for pattern in test_patterns): return True return False def _process_single_tweet(self, item: Dict, hashtags_counter: Counter, mentions_counter: Counter, all_author_data: List, target_username: str = None) -> Optional[Dict]: """Process a single tweet item.""" # Extract author data author = item.get("author", {}) if author: # Only collect author data from the target user if target_username is specified # This prevents random accounts from being saved in replies data if target_username: author_username = author.get("userName", "").lower() if author_username == target_username.lower(): all_author_data.append(author) else: all_author_data.append(author) # Check if this is a mock/invalid tweet (has minimal or no real data) is_mock_tweet = self._is_mock_tweet(item) # Validate date information created_at = item.get("createdAt", "") if not created_at: # Only show warning for real tweets missing dates, and only in debug mode if not is_mock_tweet and st.session_state.get('debug_mode', False): st.warning("Skipping a tweet due to missing date information") return None # Parse date try: date_obj_utc = 
datetime.strptime(created_at, TWITTER_DATE_FORMAT) date_obj_ist = convert_to_ist(date_obj_utc) except ValueError as e: # Only log/warn for real tweets with invalid dates if not is_mock_tweet: if st.session_state.get('debug_mode', False): st.warning(f"Skipping tweet due to invalid date format: {created_at}") logger.warning(f"Invalid date format: {created_at}") return None # Extract text and analyze text = item.get("text", "") hashtags = [word.strip("#") for word in text.split() if word.startswith('#')] mentions = [word.strip("@") for word in text.split() if word.startswith('@')] # Update counters hashtags_counter.update(hashtags) mentions_counter.update(mentions) return { "Date": date_obj_ist.strftime("%Y-%m-%d %H:%M:%S"), "Date_Only": date_obj_ist.strftime("%Y-%m-%d"), "Hour": date_obj_ist.hour, "Day_of_Week": date_obj_ist.strftime("%A"), "Username": author.get("userName", ""), "Text": text, "Likes": item.get("likeCount", 0), "Retweets": item.get("retweetCount", 0), "Replies": item.get("replyCount", 0), "Views": item.get("viewCount", 0), "URL": item.get("url", ""), "Has_Media": "extendedEntities" in item, "Hashtags": ", ".join(hashtags), "Mentions": ", ".join(mentions), } def _extract_best_account_details(self, all_author_data: List[Dict], target_username: str = None) -> Dict: """Extract the most complete account details from author data.""" if not all_author_data: # If no author data and we have a target username, create a basic structure if target_username: return { "name": target_username, "username": target_username, "bio": "", "followers_count": 0, "following_count": 0, "tweet_count": 0, "verified": False, "profile_image_url": "" } return {} # Find the author data with the most complete information best_author = self._find_most_complete_author(all_author_data) # Debug information if st.session_state.get('debug_mode', False): st.write("Debug - Found", len(all_author_data), "author objects") st.write("Debug - Best author data keys:", list(best_author.keys())) 
st.write("Debug - Best author data sample:", { k: v for k, v in best_author.items() if k in ['name', 'userName', 'followers', 'following', 'statusesCount'] }) return self._standardize_account_details(best_author) def _find_most_complete_author(self, all_author_data: List[Dict]) -> Dict: """Find the author data object with the most complete information.""" best_author = {} best_score = -1 for author in all_author_data: score = self._calculate_author_completeness_score(author) if score > best_score: best_score = score best_author = author return best_author if best_score > 0 else (all_author_data[0] if all_author_data else {}) def _calculate_author_completeness_score(self, author: Dict) -> int: """Calculate completeness score for author data.""" score = 0 # Check for follower metrics (high priority) followers = (author.get("followers") or author.get("followersCount") or author.get("followers_count") or author.get("publicMetrics", {}).get("followers_count") or safe_get_nested(author, ["publicMetrics", "followers_count"]) or safe_get_nested(author, ["public_metrics", "followers_count"]) or 0) if followers > 0: score += 3 following = (author.get("following") or author.get("followingCount") or author.get("following_count") or author.get("friends_count") or author.get("publicMetrics", {}).get("following_count") or safe_get_nested(author, ["publicMetrics", "following_count"]) or safe_get_nested(author, ["public_metrics", "following_count"]) or 0) if following > 0: score += 2 tweet_count = (author.get("statusesCount") or author.get("statuses_count") or author.get("tweet_count") or author.get("publicMetrics", {}).get("tweet_count") or safe_get_nested(author, ["publicMetrics", "tweet_count"]) or safe_get_nested(author, ["public_metrics", "tweet_count"]) or 0) if tweet_count > 0: score += 2 # Check for profile information (lower priority) if author.get("description") or author.get("profile_bio"): score += 1 if author.get("verified") or author.get("isVerified"): score += 1 
return score def _convert_to_ist_format(self, twitter_date_str: str) -> str: """Convert Twitter date string to IST format.""" if not twitter_date_str or twitter_date_str == "": return "" try: # Parse the Twitter date format: "Mon Jul 08 09:31:59 +0000 2013" utc_dt = datetime.strptime(twitter_date_str, TWITTER_DATE_FORMAT) # Convert to IST ist_tz = pytz.timezone(IST_TIMEZONE) ist_dt = utc_dt.astimezone(ist_tz) # Format as a more readable IST date # Format: "8 July 2013, 3:01 PM IST" formatted_date = ist_dt.strftime("%d %B %Y, %I:%M %p IST") return formatted_date except ValueError: # If parsing fails, return the original string return twitter_date_str def _standardize_account_details(self, author_data: Dict) -> Dict: """Standardize account details from various possible field names.""" # Debug: Print raw author data keys (only in debug mode) if st.session_state.get('debug_mode', False): st.write(f"Debug - Author data keys: {list(author_data.keys())}") # Try multiple possible field names for metrics with additional variations followers_count = ( author_data.get("followers") or author_data.get("followersCount") or author_data.get("followers_count") or author_data.get("publicMetrics", {}).get("followers_count") or safe_get_nested(author_data, ["publicMetrics", "followers_count"]) or safe_get_nested(author_data, ["public_metrics", "followers_count"]) or 0 ) following_count = ( author_data.get("following") or author_data.get("followingCount") or author_data.get("following_count") or author_data.get("friends_count") or author_data.get("publicMetrics", {}).get("following_count") or safe_get_nested(author_data, ["publicMetrics", "following_count"]) or safe_get_nested(author_data, ["public_metrics", "following_count"]) or 0 ) tweet_count = ( author_data.get("statusesCount") or author_data.get("statuses_count") or author_data.get("tweet_count") or author_data.get("publicMetrics", {}).get("tweet_count") or safe_get_nested(author_data, ["publicMetrics", "tweet_count"]) or 
safe_get_nested(author_data, ["public_metrics", "tweet_count"]) or 0 ) # Extract account creation date raw_create_date = ( author_data.get("createdAt") or author_data.get("created_at") or author_data.get("account_create_date") or "" ) # Convert to IST format if we have a valid date account_create_date = self._convert_to_ist_format(raw_create_date) return { "name": author_data.get("name", ""), "username": author_data.get("userName", "") or author_data.get("username", ""), "bio": author_data.get("description", "") or author_data.get("bio", ""), "followers_count": followers_count, "following_count": following_count, "tweet_count": tweet_count, "verified": author_data.get("verified", False) or author_data.get("isVerified", False), "profile_image_url": author_data.get("profileImageUrl", "") or author_data.get("profile_image_url", ""), "account_create_date": account_create_date, # Engagement metrics will be calculated from tweet data and added later "likes_count": 0, "views_count": 0, "reply_count": 0, "repost_count": 0, } # ============================================================================= # UI COMPONENTS # ============================================================================= class UIComponents: """Reusable UI components for the dashboard.""" @staticmethod def display_account_info(account_details: Dict) -> None: """Display account information section.""" if not account_details: return st.subheader(f"πŸ‘€ Account: @{account_details['username']}") # Profile image if account_details.get('profile_image_url'): st.image(account_details['profile_image_url'], width=80) # Account name and verification verification_badge = 'βœ…' if account_details.get('verified') else '' st.markdown(f"**{account_details.get('name')}** {verification_badge}") # Bio if account_details.get('bio'): st.caption(account_details.get('bio')) # Metrics UIComponents._display_account_metrics(account_details) st.divider() @staticmethod def _display_account_metrics(account_details: Dict) -> 
None: """Display account metrics (followers, following, posts).""" # Account creation date create_date = account_details.get('account_create_date', '') if create_date: st.caption(f"πŸ“… Account created: {create_date}") # Basic metrics m1, m2, m3 = st.columns(3) followers = account_details.get('followers_count', 0) following = account_details.get('following_count', 0) posts = account_details.get('tweet_count', 0) m1.metric( "Followers", format_large_number(followers), help="Follower count from Twitter API" ) m2.metric( "Following", format_large_number(following), help="Following count from Twitter API" ) m3.metric( "Total Posts", format_large_number(posts), help="Total tweet count from Twitter API" ) # Engagement metrics likes = account_details.get('likes_count', 0) views = account_details.get('views_count', 0) replies = account_details.get('reply_count', 0) reposts = account_details.get('repost_count', 0) if likes > 0 or views > 0 or replies > 0 or reposts > 0: st.caption("**πŸ“Š Total Engagement:**") e1, e2, e3, e4 = st.columns(4) e1.metric( "Likes", format_large_number(likes), help="Total likes count" ) e2.metric( "Views", format_large_number(views), help="Total views/impressions count" ) e3.metric( "Replies", format_large_number(replies), help="Total replies count" ) e4.metric( "Reposts", format_large_number(reposts), help="Total reposts/retweets count" ) # Advanced metrics sections UIComponents._display_content_quality_metrics(account_details) UIComponents._display_media_usage_metrics(account_details) UIComponents._display_activity_patterns(account_details) UIComponents._display_performance_metrics(account_details) UIComponents._display_engagement_ratios(account_details) # Warning for missing data if followers == 0 and following == 0 and posts == 0: st.warning("⚠️ Account metrics unavailable - this may be due to API limitations or account privacy settings") @staticmethod def _display_content_quality_metrics(account_details: Dict) -> None: """Display content 
quality metrics.""" avg_likes = account_details.get('avg_likes_per_tweet', 0) avg_views = account_details.get('avg_views_per_tweet', 0) engagement_rate = account_details.get('avg_engagement_rate', 0) avg_length = account_details.get('avg_tweet_length', 0) if avg_likes > 0 or avg_views > 0 or engagement_rate > 0: st.caption("**πŸ“ˆ Content Quality:**") q1, q2, q3, q4 = st.columns(4) q1.metric( "Avg Likes/Tweet", f"{avg_likes:.1f}", help="Average likes per tweet" ) q2.metric( "Avg Views/Tweet", format_large_number(int(avg_views)), help="Average views per tweet" ) q3.metric( "Engagement Rate", f"{engagement_rate:.1f}%", help="(Likes + Retweets) / Views * 100" ) q4.metric( "Avg Tweet Length", f"{avg_length:.0f} chars", help="Average character length per tweet" ) @staticmethod def _display_media_usage_metrics(account_details: Dict) -> None: """Display media usage metrics.""" media_count = account_details.get('tweets_with_media_count', 0) media_percentage = account_details.get('media_usage_percentage', 0) likes_with_media = account_details.get('avg_likes_with_media', 0) likes_without_media = account_details.get('avg_likes_without_media', 0) if media_count > 0 or media_percentage > 0: st.caption("**🎬 Media Usage:**") m1, m2, m3, m4 = st.columns(4) m1.metric( "Tweets with Media", f"{media_count}", help="Number of tweets with media attachments" ) m2.metric( "Media Usage", f"{media_percentage:.1f}%", help="Percentage of tweets with media" ) m3.metric( "Avg Likes (Media)", f"{likes_with_media:.1f}", help="Average likes for tweets with media" ) m4.metric( "Avg Likes (No Media)", f"{likes_without_media:.1f}", help="Average likes for tweets without media" ) @staticmethod def _display_activity_patterns(account_details: Dict) -> None: """Display activity pattern metrics.""" most_active_hour = account_details.get('most_active_hour', 0) most_active_day = account_details.get('most_active_day', 'Unknown') top_hours = account_details.get('top_activity_hours', []) if most_active_hour > 
0 or most_active_day != 'Unknown': st.caption("**⏰ Activity Patterns:**") a1, a2, a3, a4 = st.columns(4) a1.metric( "Most Active Hour", f"{most_active_hour}:00", help="Hour of day with most tweets" ) a2.metric( "Most Active Day", most_active_day, help="Day of week with most tweets" ) a3.metric( "Top Hours", ", ".join([f"{h}:00" for h in top_hours[:2]]), help="Top active hours" ) # Hashtag and mention usage hashtags = account_details.get('total_hashtags_used', 0) mentions = account_details.get('total_mentions_used', 0) a4.metric( "Hashtags Used", f"{hashtags}", help="Total hashtags used in tweets" ) @staticmethod def _display_performance_metrics(account_details: Dict) -> None: """Display performance metrics.""" highest_likes = account_details.get('highest_likes', 0) viral_count = account_details.get('viral_tweets_count', 0) viral_percentage = account_details.get('viral_content_percentage', 0) top_tweet_text = account_details.get('top_tweet_text', '') top_tweet_url = account_details.get('top_tweet_url', '') if highest_likes > 0 or viral_count > 0: st.caption("**πŸš€ Performance:**") p1, p2, p3, p4 = st.columns(4) p1.metric( "Highest Likes", format_large_number(highest_likes), help="Most likes on a single tweet" ) p2.metric( "Viral Tweets", f"{viral_count}", help="Tweets in top 10% by likes" ) p3.metric( "Viral Content %", f"{viral_percentage:.1f}%", help="Percentage of viral tweets" ) p4.metric( "Engagement Score", f"{account_details.get('engagement_score', 0):.1f}", help="Weighted engagement score (likesΓ—1 + retweetsΓ—2 + repliesΓ—3)" ) # Show top tweet if available if top_tweet_text and top_tweet_url: st.caption("**πŸ† Top Performing Tweet:**") with st.expander("View top tweet"): st.write(f"**Likes:** {format_large_number(highest_likes)}") st.write(f"**Text:** {top_tweet_text}") st.write(f"**URL:** {top_tweet_url}") @staticmethod def _display_engagement_ratios(account_details: Dict) -> None: """Display engagement ratio metrics.""" like_to_view = 
account_details.get('like_to_view_ratio', 0) retweet_to_like = account_details.get('retweet_to_like_ratio', 0) reply_to_like = account_details.get('reply_to_like_ratio', 0) total_engagement = account_details.get('total_engagement', 0) if like_to_view > 0 or retweet_to_like > 0 or reply_to_like > 0: st.caption("**πŸ“Š Engagement Ratios:**") r1, r2, r3, r4 = st.columns(4) r1.metric( "Like Rate", f"{like_to_view:.2f}%", help="Likes per view percentage" ) r2.metric( "Retweet Rate", f"{retweet_to_like:.2f}%", help="Retweets per like percentage" ) r3.metric( "Reply Rate", f"{reply_to_like:.2f}%", help="Replies per like percentage" ) r4.metric( "Total Engagement", format_large_number(total_engagement), help="Total likes + retweets + replies" ) @staticmethod def display_key_metrics(df: pd.DataFrame) -> None: """Display key engagement metrics.""" if df.empty: return st.subheader("πŸ“ˆ Key Metrics") # Basic metrics c1, c2, c3 = st.columns(3) c1.metric("Total Tweets Scanned", f"{len(df):,}") c2.metric("Total Likes", f"{df['Likes'].sum():,}") c3.metric("Total Retweets", f"{df['Retweets'].sum():,}") # Engagement metrics st.subheader("⚑ Engagement") df_copy = df.copy() df_copy["Engagement"] = df_copy["Likes"] + df_copy["Retweets"] + df_copy["Replies"] total_engagement = df_copy["Engagement"].sum() avg_engagement = total_engagement / len(df) if len(df) > 0 else 0 total_views = df["Views"].sum() engagement_rate = (total_engagement / total_views * 100) if total_views > 0 else 0 e1, e2 = st.columns(2) e1.metric("Avg. 
Engagement/Tweet", f"{avg_engagement:.1f}") e2.metric("Engagement Rate (vs Views)", f"{engagement_rate:.2f}%") st.divider() @staticmethod def display_content_analysis(metrics: Dict) -> None: """Display content analysis section.""" st.subheader("πŸ” Content Analysis") top_hashtags = metrics.get("top_hashtags", []) top_mentions = metrics.get("top_mentions", []) if top_hashtags: st.markdown("**Top Hashtags**") st.write(", ".join([f"`#{tag}` ({count})" for tag, count in top_hashtags])) if top_mentions: st.markdown("**Top Mentions**") st.write(", ".join([f"`@{user}` ({count})" for user, count in top_mentions])) @staticmethod def display_ai_summary(gemini_summary: Optional[str]) -> None: """Display AI-generated summary section.""" if gemini_summary: st.subheader("🧠 AI Summary & Recommendations") st.markdown(gemini_summary) st.divider() @staticmethod def display_most_engaging_tweet(df: pd.DataFrame) -> None: """Display the most engaging tweet.""" if df.empty: return st.subheader("🌟 Most Engaging Tweet") df_copy = df.copy() df_copy["Engagement"] = df_copy["Likes"] + df_copy["Retweets"] + df_copy["Replies"] most_engaging = df_copy.loc[df_copy["Engagement"].idxmax()] with st.container(border=True): st.markdown(f"**{most_engaging['Text']}**") stats = (f"❀️ {most_engaging['Likes']} | πŸ”„ {most_engaging['Retweets']} | " f"πŸ’¬ {most_engaging['Replies']} | πŸ‘οΈ {most_engaging['Views']}") st.markdown(f"**{stats}** | [{most_engaging['Date']}]({most_engaging['URL']})") st.divider() @staticmethod def display_charts(df: pd.DataFrame) -> None: """Display data visualization charts.""" if df.empty: return st.subheader("πŸ“… Posting Patterns") # Tweets by day df_by_day = df.groupby('Date_Only')['Text'].count().reset_index() df_by_day['Date_Only'] = pd.to_datetime(df_by_day['Date_Only']) fig_day = px.line( df_by_day, x='Date_Only', y='Text', title="Tweets per Day", labels={'Date_Only': 'Date', 'Text': 'Count'} ) st.plotly_chart(fig_day, use_container_width=True) @staticmethod def 
display_data_download(df: pd.DataFrame) -> None: """Display raw data table with download option.""" st.subheader("πŸ“Š Raw Data") st.dataframe(df) if not df.empty: csv = df.to_csv(index=False).encode('utf-8') st.download_button( "πŸ“₯ Download as CSV", csv, f"twitter_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv", "text/csv", key="download-csv", use_container_width=True ) # ============================================================================= # DASHBOARD MANAGEMENT # ============================================================================= class TwitterDashboard: """Main dashboard for displaying Twitter analysis results.""" def __init__(self, df: pd.DataFrame, metrics: Dict, dataset_id: str, analysis_type: str = "Account's Tweets", gemini_summary: Optional[str] = None): self.df = df self.metrics = metrics self.dataset_id = dataset_id self.analysis_type = analysis_type self.gemini_summary = gemini_summary def render(self) -> None: """Render the complete dashboard.""" if self.df.empty: st.warning("No data available to display.") return # Main layout left_col, right_col = st.columns([1, 1], gap="large") with left_col: # Only show account info for "Account's Tweets" analysis if self.analysis_type == "Account's Tweets": UIComponents.display_account_info(self.metrics.get("account_details", {})) else: # For "Comments to Account", show a different header st.subheader(f"πŸ’¬ Comments Analysis") st.info("Analyzing comments and replies directed to the account") st.divider() UIComponents.display_key_metrics(self.df) UIComponents.display_content_analysis(self.metrics) with right_col: UIComponents.display_ai_summary(self.gemini_summary) UIComponents.display_most_engaging_tweet(self.df) UIComponents.display_charts(self.df) # Full-width sections UIComponents.display_data_download(self.df) # ============================================================================= # SCHEDULER MANAGEMENT # 
# =============================================================================

class SchedulerManager:
    """Manages scheduled users and automation settings."""

    def __init__(self, db: "DatabaseManager"):
        """Keep a reference to the database manager used for all lookups.

        Args:
            db: Object exposing ``is_connected`` and
                ``scheduler_users_collection``.
        """
        self.db = db

    @staticmethod
    def _normalize_username(username: Optional[str]) -> str:
        """Return *username* without surrounding whitespace or leading '@'."""
        return (username or "").strip().lstrip("@").strip()

    def render_controls(self) -> None:
        """Render the scheduler management interface.

        Shows a warning and nothing else when the database is not connected.
        """
        st.header("🕒 Scheduler Management")

        if not self.db.is_connected:
            st.warning("⚠️ Database not connected. Scheduler features unavailable.")
            return

        self._display_current_users()
        st.divider()
        self._display_add_user_form()
        st.divider()
        self._display_scheduler_info()

    def _display_current_users(self) -> None:
        """List the active scheduled users, each with a remove button."""
        st.subheader("📋 Current Scheduled Users")

        try:
            scheduled_users = self.db.scheduler_users_collection.find({"active": True})
            usernames = [user["username"] for user in scheduled_users]
        except Exception as e:
            st.error(f"Error fetching scheduled users: {e}")
            return

        if not usernames:
            st.info("No users currently scheduled.")
            return

        for username in usernames:
            col1, col2 = st.columns([3, 1])
            with col1:
                st.write(f"@{username}")
            with col2:
                if st.button("🗑️", key=f"remove_{username}", help=f"Remove @{username}"):
                    if self._remove_user(username):
                        st.rerun()

    def _display_add_user_form(self) -> None:
        """Render the text input and buttons used to schedule a new user."""
        st.subheader("➕ Add New User")

        raw_username = st.text_input("Username to schedule (without @)", key="new_scheduled_user")
        # Normalize once so the button state, the stored value and the success
        # message all refer to the same name.
        new_username = self._normalize_username(raw_username)

        col1, col2 = st.columns(2)
        with col1:
            if st.button("Add User", use_container_width=True, disabled=not new_username):
                if self._add_user(new_username):
                    st.success(f"✅ Added @{new_username} to scheduler")
                    st.rerun()

        with col2:
            if st.button("🔄 Refresh List", use_container_width=True):
                st.rerun()

    def _display_scheduler_info(self) -> None:
        """Render the static description of the automated schedule."""
        st.subheader("ℹ️ Scheduler Info")
        st.info("""
        **GitHub Actions Automation:**
        - Runs daily at 12:00 AM IST automatically
        - Can be triggered manually from GitHub Actions tab
        - Scrapes only the previous day's data (no overlap)
        - Stores results in MongoDB with duplicate detection
        """)

    def _add_user(self, username: str) -> bool:
        """Add *username* to the scheduled scraping list.

        The name is normalized (whitespace and leading '@' removed) before it
        is looked up and stored.

        Returns:
            True when the user document was upserted; False when the name is
            empty, the user is already active, or the database call failed
            (a message is shown in the UI in each of those cases).
        """
        # Local import: the module header imports only datetime and timedelta.
        from datetime import timezone

        username = self._normalize_username(username)
        if not username:
            st.warning("Please enter a username")
            return False

        try:
            collection = self.db.scheduler_users_collection

            # Single indexed-style lookup instead of loading every active user.
            if collection.find_one({"username": username, "active": True}) is not None:
                st.warning("User already scheduled")
                return False

            user_doc = {
                "username": username,
                "active": True,
                # Timezone-aware replacement for the deprecated utcnow().
                "added_at": datetime.now(timezone.utc),
                "last_scraped": None
            }

            # Upsert so a previously deactivated user is reactivated in place.
            collection.update_one(
                {"username": username},
                {"$set": user_doc},
                upsert=True
            )
            return True
        except Exception as e:
            st.error(f"Error adding user: {e}")
            return False

    def _remove_user(self, username: str) -> bool:
        """Deactivate *username* in the scheduled scraping list.

        The document is kept and only its ``active`` flag is set to False.

        Returns:
            True when the update call succeeded, False when it raised.
        """
        try:
            self.db.scheduler_users_collection.update_one(
                {"username": username},
                {"$set": {"active": False}}
            )
            return True
        except Exception as e:
            st.error(f"Error removing user: {e}")
            return False


# =============================================================================
# MAIN APPLICATION
# =============================================================================

class TwitterAnalyzerApp:
    """Main Twitter Analyzer application."""

    def __init__(self):
        """Configure the page, then build the services the app depends on."""
        self._setup_page()
        self._initialize_services()

    def _setup_page(self) -> None:
        """Configure Streamlit page settings."""
        st.set_page_config(**PAGE_CONFIG)
        st.title("🐦 Twitter Content Analyzer")

    def _initialize_services(self) -> None:
        """Initialize all required services.

        A ``ValueError`` raised during construction is reported in the UI and
        stops the Streamlit script.
        """
        try:
            self.config = AppConfig()
            self.db = DatabaseManager(self.config.mongodb_uri)
            self.apify = ApifyService(self.config.apify_api_key)
            # The AI summary is optional: no key means no Gemini service.
            self.gemini = GeminiService(self.config.gemini_api_key) if self.config.gemini_api_key else None
            self.processor = TweetDataProcessor()
            self.scheduler = SchedulerManager(self.db)
        except ValueError as e:
            st.error(f"Initialization failed: {e}. Please check your .env.local file.")
            st.stop()

    def run(self) -> None:
        """Render the sidebar and run the analysis once it is requested."""
        self._render_sidebar()

        if not getattr(self, 'run_button', False) or not self.username:
            st.info("Please enter a Twitter username and click 'Analyze' to begin.")
            return

        self._perform_analysis()

    def _render_sidebar(self) -> None:
        """Render the application sidebar."""
        with st.sidebar:
            self._render_analysis_controls()
            self._render_debug_options()
            st.divider()
            self.scheduler.render_controls()

    def _render_analysis_controls(self) -> None:
        """Render the analysis widgets and store their values on ``self``."""
        st.header("⚙️ Analysis Controls")

        self.analysis_type = st.radio(
            "Analysis Type",
            ["Account's Tweets", "Comments to Account"],
            horizontal=True
        )

        # Tolerate stray whitespace or a pasted '@' even though the label asks
        # for the bare handle.
        raw_username = st.text_input("Twitter Username (without @)", DEFAULT_USERNAME)
        self.username = (raw_username or "").strip().lstrip("@").strip()

        # Date inputs
        today = datetime.now()
        last_week = today - timedelta(days=DEFAULT_DAYS_BACK)
        self.since_date = st.date_input("Start Date", last_week)
        self.until_date = st.date_input("End Date", today)

        self.run_button = st.button("🚀 Analyze", use_container_width=True, type="primary")

    def _render_debug_options(self) -> None:
        """Render debug options."""
        with st.expander("🔧 Debug Options"):
            st.session_state['debug_mode'] = st.checkbox(
                "Show API Debug Info",
                help="Shows raw API data for troubleshooting"
            )

    def _perform_analysis(self) -> None:
        """Fetch, process and display data for the selected account.

        Any exception raised while fetching, processing or rendering is logged
        with its traceback and reported in the UI.
        """
        # Reject an inverted range up front instead of sending it to the API.
        if self.since_date > self.until_date:
            st.error("Start Date must not be after End Date.")
            return

        since_str = self.since_date.strftime("%Y-%m-%d")
        until_str = self.until_date.strftime("%Y-%m-%d")

        # Fetch data based on analysis type
        try:
            if self.analysis_type == "Account's Tweets":
                raw_data, dataset_id = self.apify.fetch_account_tweets(self.username, since_str, until_str)
                context = f"This is an analysis of tweets by the Twitter account @{self.username}."
            else:
                raw_data, dataset_id = self.apify.fetch_account_comments(self.username, since_str, until_str)
                context = f"This is an analysis of comments/replies sent to the Twitter account @{self.username}."

            if not raw_data:
                st.error("No data was returned from the API. The account may be private, have no tweets in the selected range, or there might be an API issue.")
                return

            # Process data
            df, metrics = self.processor.process_tweets(raw_data, self.username)

            # Generate AI summary if available
            gemini_summary = None
            if self.gemini:
                # Only the first 100 rows are sent to the summarizer.
                gemini_summary = self.gemini.generate_analysis(df.head(100), context)
            else:
                st.warning("GEMINI_API_KEY not found. AI summary will be skipped.")

            # Display results
            dashboard = TwitterDashboard(df, metrics, dataset_id, self.analysis_type, gemini_summary)
            dashboard.render()

        except Exception as e:
            # logger.exception keeps the traceback that logger.error dropped.
            logger.exception("Analysis failed: %s", e)
            st.error(f"Analysis failed: {str(e)}")


# =============================================================================
# APPLICATION ENTRY POINT
# =============================================================================

def main():
    """Application entry point."""
    app = TwitterAnalyzerApp()
    app.run()


if __name__ == "__main__":
    main()