diff --git "a/src/streamlit_app.py" "b/src/streamlit_app.py" --- "a/src/streamlit_app.py" +++ "b/src/streamlit_app.py" @@ -1,1675 +1,926 @@ +#!/usr/bin/env python3 +""" +Twitter Content Analyzer +A comprehensive Twitter data collection and analysis tool with automated scheduling capabilities. +""" + import os +import logging +from datetime import datetime, timedelta +from typing import Dict, List, Optional, Tuple, Any +from collections import Counter + import streamlit as st import pandas as pd -import json -from datetime import datetime, timedelta import plotly.express as px -import numpy as np -from collections import Counter import pytz from pymongo import MongoClient -import schedule -import threading -import time -#test -# Try to import Google Generative AI, but handle it gracefully if not installed -try: - import google.generativeai as genai - GENAI_AVAILABLE = True -except ImportError: - GENAI_AVAILABLE = False - +import google.generativeai as genai from apify_client import ApifyClient from dotenv import load_dotenv -# Set page config to wide mode with a custom title and icon -st.set_page_config( - page_title="Twitter Scraper", - page_icon="đŸĻ", - layout="wide", - initial_sidebar_state="collapsed" +# ============================================================================= +# CONSTANTS +# ============================================================================= + +DEFAULT_USERNAME = "narendramodi" +DEFAULT_DAYS_BACK = 7 +IST_TIMEZONE = 'Asia/Kolkata' +UTC_TIMEZONE = 'UTC' + +# Twitter API date format +TWITTER_DATE_FORMAT = "%a %b %d %H:%M:%S %z %Y" + +# MongoDB collection names +TWEETS_COLLECTION = "tweets" +SCHEDULER_USERS_COLLECTION = "scheduler_users" + +# Streamlit page config +PAGE_CONFIG = { + "page_title": "Twitter Scraper & Analyzer", + "page_icon": "đŸĻ", + "layout": "wide", + "initial_sidebar_state": "expanded" +} + +# ============================================================================= +# LOGGING CONFIGURATION +# 
# =============================================================================

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# =============================================================================
# UTILITY FUNCTIONS
# =============================================================================

def convert_to_ist(utc_dt: Optional[datetime]) -> Optional[datetime]:
    """Convert a datetime to Indian Standard Time.

    Args:
        utc_dt: The datetime to convert. A naive value is interpreted as UTC;
            an aware value is converted from its own timezone.

    Returns:
        The equivalent timezone-aware datetime in ``IST_TIMEZONE``, or ``None``
        when ``utc_dt`` is ``None``.
    """
    # The previous implementation of this helper tolerated a missing value;
    # keep that contract so callers can pass through absent timestamps.
    if utc_dt is None:
        return None
    if utc_dt.tzinfo is None:
        utc_dt = pytz.utc.localize(utc_dt)
    return utc_dt.astimezone(pytz.timezone(IST_TIMEZONE))


def safe_get_nested(data: Dict, keys: List[str], default=None):
    """Walk a chain of keys through nested dictionaries.

    Args:
        data: The outermost dictionary.
        keys: Keys to follow, outermost first.
        default: Value returned when the path cannot be followed.

    Returns:
        The value found at the end of the key path, or ``default`` as soon as
        an intermediate value is not a dict or does not contain the next key.
        An empty ``keys`` list returns ``data`` itself.
    """
    current = data
    for key in keys:
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]
    return current
def format_large_number(num: Optional[int]) -> str:
    """Format a count with thousands separators for display.

    Args:
        num: The number to format. ``None`` and non-numeric values are
            accepted and treated as "no data".

    Returns:
        The number rendered with comma separators (e.g. ``"1,234,567"``), or
        ``"N/A"`` when the value is missing, not comparable to a number, zero
        or negative.
    """
    try:
        is_positive = num > 0
    except TypeError:
        # None / strings / other non-numeric payload values: nothing to show.
        return "N/A"
    return f"{num:,}" if is_positive else "N/A"
# =============================================================================
# CONFIGURATION MANAGEMENT
# =============================================================================

class AppConfig:
    """Centralized configuration management.

    Loads environment variables from a dotenv file on construction and exposes
    the keys the application needs as read-only properties.
    """

    def __init__(self, env_path: str = ".env.local"):
        """Load ``env_path`` into the environment and validate required keys.

        Raises:
            ValueError: If ``APIFY_API_KEY`` is not set after loading.
        """
        load_dotenv(dotenv_path=env_path)
        self._validate_config()

    @property
    def mongodb_uri(self) -> Optional[str]:
        """MongoDB connection URI from ``MONGODB_URI``, or ``None`` if unset."""
        return os.getenv("MONGODB_URI")

    @property
    def apify_api_key(self) -> Optional[str]:
        """Apify API token from ``APIFY_API_KEY``, or ``None`` if unset."""
        return os.getenv("APIFY_API_KEY")

    @property
    def gemini_api_key(self) -> Optional[str]:
        """Gemini API key from ``GEMINI_API_KEY``, or ``None`` if unset."""
        return os.getenv("GEMINI_API_KEY")

    def _validate_config(self) -> None:
        """Validate essential configuration.

        Raises:
            ValueError: If ``APIFY_API_KEY`` is missing or empty.
        """
        if not self.apify_api_key:
            raise ValueError("APIFY_API_KEY is required but not found in environment variables")

# =============================================================================
# DATABASE MANAGEMENT
# =============================================================================

class DatabaseManager:
    """Handles all MongoDB operations.

    After construction ``tweets_collection`` and ``scheduler_users_collection``
    are always set: either to real MongoDB collections (``is_connected`` is
    True) or to no-op stand-ins (``is_connected`` is False).
    """

    def __init__(self, uri: Optional[str]):
        """Try to connect to ``uri``; fall back to offline mode on failure."""
        self.client = None
        self.db = None
        self.is_connected = False
        self._connect(uri)

    def _connect(self, uri: Optional[str]) -> None:
        """Establish MongoDB connection.

        With no URI, or when the server cannot be reached, dummy collections
        are installed instead and ``client``/``db`` stay ``None``.
        """
        if not uri:
            logger.warning("No MongoDB URI provided. Running in offline mode.")
            self._setup_dummy_collections()
            return

        client = None
        try:
            client = MongoClient(uri, serverSelectionTimeoutMS=5000)
            # Constructing the client does not contact the server; ping does.
            client.admin.command('ping')
            db = client["DataCollector"]
        except Exception as e:
            logger.error(f"⚠️ MongoDB connection failed: {e}")
            logger.info("🔄 Running in offline mode - data will not be stored")
            # Do not leave a half-initialised client behind: release it so its
            # background resources are freed and self.client stays None.
            if client is not None:
                client.close()
            self._setup_dummy_collections()
            return

        self.client = client
        self.db = db
        self.tweets_collection = db[TWEETS_COLLECTION]
        self.scheduler_users_collection = db[SCHEDULER_USERS_COLLECTION]
        self.is_connected = True
        logger.info("✅ MongoDB connected successfully")

    def _setup_dummy_collections(self) -> None:
        """Setup dummy collections for offline mode.

        The stand-ins accept the same calls the application makes on real
        collections and silently discard writes / return empty results.
        """
        class DummyCollection:
            def update_one(self, *args, **kwargs): pass
            def find(self, *args, **kwargs): return []
            def find_one(self, *args, **kwargs): return None
            def insert_one(self, *args, **kwargs): pass

        self.tweets_collection = DummyCollection()
        self.scheduler_users_collection = DummyCollection()
        self.is_connected = False
# =============================================================================
# API SERVICES
# =============================================================================

class ApifyService:
    """Handles Apify API interactions for Twitter data collection."""

    ACTOR_ID = "CJdippxWmn9uRfooo"

    def __init__(self, api_key: str):
        """Create the Apify client used for all actor runs."""
        self.client = ApifyClient(api_key)

    @staticmethod
    def _date_window(since: str, until: str) -> Dict[str, str]:
        """Build the since/until query terms covering whole UTC days."""
        return {
            "since": f"{since}_00:00:00_UTC",
            "until": f"{until}_23:59:59_UTC",
        }

    def _run_actor(self, run_input: Dict[str, Any]) -> Tuple[List[Dict], str]:
        """Execute Apify actor and retrieve dataset.

        Args:
            run_input: Actor input (search query terms).

        Returns:
            A ``(items, dataset_id)`` tuple with every item of the run's
            default dataset.

        Raises:
            RuntimeError: If the actor call yields no run object.
            Exception: Any error raised by the Apify client is logged with its
                traceback and re-raised unchanged.
        """
        try:
            run = self.client.actor(self.ACTOR_ID).call(run_input=run_input)
            # Guard against an empty result so the failure is explicit rather
            # than a TypeError from subscripting None.
            if not run:
                raise RuntimeError("Apify actor run returned no result")
            dataset_id = run["defaultDatasetId"]
            data = list(self.client.dataset(dataset_id).iterate_items())
        except Exception as e:
            logger.exception("Apify actor execution failed: %s", e)
            raise
        return data, dataset_id

    def fetch_account_tweets(self, username: str, since: str, until: str) -> Tuple[List[Dict], str]:
        """Fetch tweets posted by a specific account.

        Args:
            username: Account handle without the leading ``@``.
            since: First day of the window, ``YYYY-MM-DD``.
            until: Last day of the window, ``YYYY-MM-DD``.

        Returns:
            ``(items, dataset_id)`` as produced by ``_run_actor``.
        """
        run_input = {
            "from": username,
            **self._date_window(since, until),
            "queryType": "Latest",
            "include:nativeretweets": True,
        }

        with st.spinner(f"Fetching tweets for @{username} from {since} to {until}..."):
            data, dataset_id = self._run_actor(run_input)
            st.info(f"🔍 Query Details: from:{username} | Raw results: {len(data)} tweets")

        return data, dataset_id

    def fetch_account_comments(self, username: str, since: str, until: str) -> Tuple[List[Dict], str]:
        """Fetch comments/replies directed to a specific account.

        Args:
            username: Account handle without the leading ``@``.
            since: First day of the window, ``YYYY-MM-DD``.
            until: Last day of the window, ``YYYY-MM-DD``.

        Returns:
            ``(items, dataset_id)`` as produced by ``_run_actor``.
        """
        run_input = {
            "to": username,
            **self._date_window(since, until),
            "queryType": "Latest",
        }

        with st.spinner(f"Fetching comments for @{username} from {since} to {until}..."):
            data, dataset_id = self._run_actor(run_input)
            st.info(f"🔍 Query Details: to:@{username} | Raw results: {len(data)} comments")

        return data, dataset_id
class GeminiService:
    """Handles Google Generative AI interactions."""

    def __init__(self, api_key: str):
        """Configure the Gemini SDK and select the model used for analysis."""
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel('gemini-1.5-flash')

    def generate_analysis(self, tweets_df: pd.DataFrame, context: str) -> str:
        """Generate AI-powered analysis of tweets.

        Args:
            tweets_df: Tweets with ``Username``, ``Text``, ``Likes`` and
                ``Retweets`` columns.
            context: Free-text framing placed at the top of the prompt.

        Returns:
            The model's response text, a fixed message when ``tweets_df`` is
            empty, or an ``"Error generating summary: ..."`` string when the
            request fails (errors are logged, never raised).
        """
        if tweets_df.empty:
            return "No tweets provided for analysis."

        with st.spinner("Generating AI summary with Gemini..."):
            try:
                tweets_text = self._format_tweets_for_analysis(tweets_df)
                prompt = self._create_analysis_prompt(context, tweets_text)
                response = self.model.generate_content(prompt)
                return response.text
            except Exception as e:
                logger.error(f"Gemini analysis failed: {e}")
                return f"Error generating summary: {str(e)}"

    def _format_tweets_for_analysis(self, tweets_df: pd.DataFrame) -> str:
        """Render each tweet as a numbered line, separated by blank lines."""
        return "\n\n".join(
            f"{i}. @{row.Username}: {row.Text} (Likes: {row.Likes}, Retweets: {row.Retweets})"
            for i, row in enumerate(tweets_df.itertuples(), 1)
        )

    def _create_analysis_prompt(self, context: str, tweets_text: str) -> str:
        """Create analysis prompt for Gemini from the context and tweet list."""
        return f"""
        {context}

        Here are the tweets to analyze:
        {tweets_text}

        Please provide a comprehensive analysis covering:
        1. **Main Themes & Topics:** What are the key subjects of discussion?
        2. **Overall Sentiment:** What is the general tone (positive, negative, neutral)?
        3. **Key Insights & Patterns:** Are there any notable trends or surprising findings?
        4. **Top Recommendations:** Provide 5 actionable suggestions for the brand/party to improve their strategy based on this feedback.

        Format the response clearly using Markdown.
        """

# =============================================================================
# DATA PROCESSING
# =============================================================================

class TweetDataProcessor:
    """Processes raw tweet data into structured format."""

    # Punctuation commonly glued to the end of a hashtag/mention in prose.
    _TRAILING_PUNCTUATION = ".,;:!?)]}\"'"

    def process_tweets(self, raw_data: List[Dict[str, Any]]) -> Tuple[pd.DataFrame, Dict[str, Any]]:
        """Transform raw API data into clean DataFrame and metrics.

        Args:
            raw_data: Tweet items as returned by the scraping actor.

        Returns:
            ``(df, metrics)`` where ``df`` has one row per successfully
            processed tweet and ``metrics`` holds ``top_hashtags``,
            ``top_mentions`` (five most common each) and ``account_details``.
            Items that cannot be processed are counted and reported through
            Streamlit messages instead of raising.
        """
        processed_data = []
        hashtags_counter = Counter()
        mentions_counter = Counter()
        all_author_data = []

        skipped_count = 0
        error_count = 0

        for item in raw_data:
            try:
                processed_tweet = self._process_single_tweet(
                    item, hashtags_counter, mentions_counter, all_author_data
                )
                if processed_tweet:
                    processed_data.append(processed_tweet)
                else:
                    skipped_count += 1
            except Exception as e:
                error_count += 1
                # Only log individual errors in debug mode
                if st.session_state.get('debug_mode', False):
                    logger.warning(f"Skipping tweet due to processing error: {e}")
                    st.warning(f"Skipping a tweet due to processing error: {e}")

        # Show summary of skipped items only if significant
        if skipped_count > 0 and st.session_state.get('debug_mode', False):
            st.info(f"ℹ️ Skipped {skipped_count} items (likely mock/invalid data)")

        if error_count > 0:
            st.warning(f"⚠️ {error_count} items had processing errors")

        # Extract best account details
        account_details = self._extract_best_account_details(all_author_data)

        # Create DataFrame and metrics
        df = pd.DataFrame(processed_data)
        metrics = {
            "top_hashtags": hashtags_counter.most_common(5),
            "top_mentions": mentions_counter.most_common(5),
            "account_details": account_details
        }

        return df, metrics

    def _is_mock_tweet(self, item: Dict) -> bool:
        """Detect if a tweet is mock/invalid data that should be ignored.

        An item is treated as mock when it lacks at least two of
        ``createdAt``/``text``/``author``, has empty or placeholder text, has
        no author username, or the username contains a test-like word.
        Null-valued fields are handled the same as absent ones.
        """
        # Check for missing essential fields that real tweets should have
        essential_fields = ['createdAt', 'text', 'author']
        missing_fields = sum(1 for field in essential_fields if not item.get(field))

        # If missing multiple essential fields, likely mock data
        if missing_fields >= 2:
            return True

        # Check for empty or placeholder text ("or" guards a null-valued key)
        text = (item.get("text") or "").strip()
        if not text or text.lower() in ["null", "undefined", "test", "placeholder"]:
            return True

        # Check for missing or empty author data
        author = item.get("author") or {}
        username = (author.get("userName") or "").strip().lower()
        if not username:
            return True

        # Check for obviously fake/test usernames.
        # NOTE(review): this is a substring match, so handles that merely
        # contain e.g. "test" are also flagged; the result currently only
        # controls whether warnings are shown.
        test_patterns = ["test", "mock", "fake", "placeholder", "example"]
        return any(pattern in username for pattern in test_patterns)

    @classmethod
    def _extract_tagged_words(cls, text: str, marker: str) -> List[str]:
        """Return words in ``text`` that start with ``marker`` (``#`` or ``@``).

        The leading marker and trailing sentence punctuation are removed and
        words that are nothing but the marker are dropped.
        """
        tagged = []
        for word in text.split():
            if not word.startswith(marker):
                continue
            cleaned = word.lstrip(marker).rstrip(cls._TRAILING_PUNCTUATION)
            if cleaned:
                tagged.append(cleaned)
        return tagged

    @staticmethod
    def _has_media(item: Dict) -> bool:
        """Return True only when the item carries a non-empty media list."""
        entities = item.get("extendedEntities")
        return isinstance(entities, dict) and bool(entities.get("media"))

    def _process_single_tweet(self, item: Dict, hashtags_counter: Counter,
                              mentions_counter: Counter, all_author_data: List) -> Optional[Dict]:
        """Process a single tweet item.

        Args:
            item: Raw tweet item.
            hashtags_counter: Updated in place with the tweet's hashtags.
            mentions_counter: Updated in place with the tweet's mentions.
            all_author_data: Receives the item's author object when present.

        Returns:
            A flat dict describing the tweet (dates in IST), or ``None`` when
            the item has no usable ``createdAt`` value.
        """
        # Extract author data ("or" guards a null-valued key)
        author = item.get("author") or {}
        if author:
            all_author_data.append(author)

        # Check if this is a mock/invalid tweet (has minimal or no real data)
        is_mock_tweet = self._is_mock_tweet(item)

        # Validate date information
        created_at = item.get("createdAt", "")
        if not created_at:
            # Only show warning for real tweets missing dates, and only in debug mode
            if not is_mock_tweet and st.session_state.get('debug_mode', False):
                st.warning("Skipping a tweet due to missing date information")
            return None

        # Parse date; TypeError covers a non-string createdAt payload
        try:
            date_obj_utc = datetime.strptime(created_at, TWITTER_DATE_FORMAT)
            date_obj_ist = convert_to_ist(date_obj_utc)
        except (ValueError, TypeError):
            # Only log/warn for real tweets with invalid dates
            if not is_mock_tweet:
                if st.session_state.get('debug_mode', False):
                    st.warning(f"Skipping tweet due to invalid date format: {created_at}")
                logger.warning(f"Invalid date format: {created_at}")
            return None

        # Extract text and analyze
        text = item.get("text") or ""
        hashtags = self._extract_tagged_words(text, "#")
        mentions = self._extract_tagged_words(text, "@")

        # Update counters
        hashtags_counter.update(hashtags)
        mentions_counter.update(mentions)

        return {
            "Date": date_obj_ist.strftime("%Y-%m-%d %H:%M:%S"),
            "Date_Only": date_obj_ist.strftime("%Y-%m-%d"),
            "Hour": date_obj_ist.hour,
            "Day_of_Week": date_obj_ist.strftime("%A"),
            "Username": author.get("userName", ""),
            "Text": text,
            "Likes": item.get("likeCount", 0),
            "Retweets": item.get("retweetCount", 0),
            "Replies": item.get("replyCount", 0),
            "Views": item.get("viewCount", 0),
            "URL": item.get("url", ""),
            "Has_Media": self._has_media(item),
            "Hashtags": ", ".join(hashtags),
            "Mentions": ", ".join(mentions),
        }
@{username} CSV", - csv, - f"twitter_data_{username}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv", - "text/csv", - key=f"download-csv-compare-{username}", - use_container_width=True - ) + def _extract_best_account_details(self, all_author_data: List[Dict]) -> Dict: + """Extract the most complete account details from author data.""" + if not all_author_data: + return {} + + # Find the author data with the most complete information + best_author = self._find_most_complete_author(all_author_data) + + # Debug information + if st.session_state.get('debug_mode', False): + st.write("Debug - Found", len(all_author_data), "author objects") + st.write("Debug - Best author data keys:", list(best_author.keys())) + st.write("Debug - Best author data sample:", { + k: v for k, v in best_author.items() + if k in ['name', 'userName', 'followers', 'following', 'statusesCount'] + }) + + return self._standardize_account_details(best_author) + + def _find_most_complete_author(self, all_author_data: List[Dict]) -> Dict: + """Find the author data object with the most complete information.""" + best_author = {} + best_score = -1 + + for author in all_author_data: + score = self._calculate_author_completeness_score(author) + if score > best_score: + best_score = score + best_author = author + + return best_author if best_score > 0 else (all_author_data[0] if all_author_data else {}) + + def _calculate_author_completeness_score(self, author: Dict) -> int: + """Calculate completeness score for author data.""" + score = 0 + + # Check for follower metrics (high priority) + if author.get("followers", 0) > 0 or author.get("followersCount", 0) > 0: + score += 3 + if author.get("following", 0) > 0 or author.get("followingCount", 0) > 0: + score += 2 + if author.get("statusesCount", 0) > 0: + score += 2 + + # Check for profile information (lower priority) + if author.get("description") or author.get("profile_bio"): + score += 1 + if author.get("verified") or author.get("isVerified"): + score += 
1 + + return score + + def _standardize_account_details(self, author_data: Dict) -> Dict: + """Standardize account details from various possible field names.""" + # Try multiple possible field names for metrics + followers_count = ( + author_data.get("followers") or + author_data.get("followersCount") or + author_data.get("followers_count") or + safe_get_nested(author_data, ["publicMetrics", "followers_count"]) or + 0 + ) + + following_count = ( + author_data.get("following") or + author_data.get("followingCount") or + author_data.get("following_count") or + author_data.get("friends_count") or + safe_get_nested(author_data, ["publicMetrics", "following_count"]) or + 0 + ) + + tweet_count = ( + author_data.get("statusesCount") or + author_data.get("statuses_count") or + author_data.get("tweet_count") or + safe_get_nested(author_data, ["publicMetrics", "tweet_count"]) or + 0 + ) + + return { + "name": author_data.get("name", ""), + "username": author_data.get("userName", "") or author_data.get("username", ""), + "bio": author_data.get("description", "") or author_data.get("bio", ""), + "followers_count": followers_count, + "following_count": following_count, + "tweet_count": tweet_count, + "verified": author_data.get("verified", False) or author_data.get("isVerified", False), + "profile_image_url": author_data.get("profileImageUrl", "") or author_data.get("profile_image_url", ""), + } -# Function to analyze and display the tweet data -def analyze_and_display_data(data, dataset_id, analysis_type="Account"): - raw_data = None - if not isinstance(data, pd.DataFrame): # If raw data is passed - # Store raw data for sentiment analysis - raw_data = data - # Process the data into a dataframe - ENHANCED: Extract account details - df, metrics, _ = process_tweet_data(data, extract_account_info=True) - else: # If DataFrame is already processed (e.g. after retry) - df = data - # Recalculate metrics if df might have changed (e.g. 
class UIComponents:
    """Reusable Streamlit UI components for the dashboard.

    Every method is static and renders directly into the current Streamlit
    container; nothing is returned.
    """

    @staticmethod
    def _engagement(df: pd.DataFrame) -> pd.Series:
        """Return per-tweet engagement: likes + retweets + replies."""
        return df["Likes"] + df["Retweets"] + df["Replies"]

    @staticmethod
    def display_account_info(account_details: Dict) -> None:
        """Display the account header: handle, avatar, name, bio and metrics.

        Renders nothing when *account_details* is empty. Missing keys are
        shown as blanks instead of raising ``KeyError``.
        """
        if not account_details:
            return

        st.subheader(f"👤 Account: @{account_details.get('username', '')}")

        # Profile image
        if account_details.get('profile_image_url'):
            st.image(account_details['profile_image_url'], width=80)

        # Account name and verification
        verification_badge = '✅' if account_details.get('verified') else ''
        st.markdown(f"**{account_details.get('name') or ''}** {verification_badge}")

        # Bio
        if account_details.get('bio'):
            st.caption(account_details.get('bio'))

        # Metrics
        UIComponents._display_account_metrics(account_details)
        st.divider()

    @staticmethod
    def _display_account_metrics(account_details: Dict) -> None:
        """Display followers, following and post counts in three columns."""
        m1, m2, m3 = st.columns(3)

        followers = account_details.get('followers_count', 0)
        following = account_details.get('following_count', 0)
        posts = account_details.get('tweet_count', 0)

        m1.metric(
            "Followers",
            format_large_number(followers),
            help="Follower count from Twitter API"
        )
        m2.metric(
            "Following",
            format_large_number(following),
            help="Following count from Twitter API"
        )
        m3.metric(
            "Total Posts",
            format_large_number(posts),
            help="Total tweet count from Twitter API"
        )

        # All three being zero is taken to mean the metrics were not returned.
        if followers == 0 and following == 0 and posts == 0:
            st.warning("âš ī¸ Account metrics unavailable - this may be due to API limitations or account privacy settings")

    @staticmethod
    def display_key_metrics(df: pd.DataFrame) -> None:
        """Display tweet totals and engagement figures for *df*.

        Renders nothing when *df* is empty.
        """
        if df.empty:
            return

        st.subheader("📈 Key Metrics")

        # Basic metrics
        c1, c2, c3 = st.columns(3)
        c1.metric("Total Tweets Scanned", f"{len(df):,}")
        c2.metric("Total Likes", f"{df['Likes'].sum():,}")
        c3.metric("Total Retweets", f"{df['Retweets'].sum():,}")

        # Engagement metrics
        st.subheader("⚡ Engagement")
        total_engagement = UIComponents._engagement(df).sum()
        avg_engagement = total_engagement / len(df)  # df is non-empty here
        total_views = df["Views"].sum()
        # Guard against division by zero when no view counts are available.
        engagement_rate = (total_engagement / total_views * 100) if total_views > 0 else 0

        e1, e2 = st.columns(2)
        e1.metric("Avg. Engagement/Tweet", f"{avg_engagement:.1f}")
        e2.metric("Engagement Rate (vs Views)", f"{engagement_rate:.2f}%")
        st.divider()

    @staticmethod
    def display_content_analysis(metrics: Dict) -> None:
        """Display the top hashtags and mentions found in *metrics*."""
        st.subheader("🔍 Content Analysis")

        top_hashtags = metrics.get("top_hashtags", [])
        top_mentions = metrics.get("top_mentions", [])

        if top_hashtags:
            st.markdown("**Top Hashtags**")
            st.write(", ".join([f"`#{tag}` ({count})" for tag, count in top_hashtags]))

        if top_mentions:
            st.markdown("**Top Mentions**")
            st.write(", ".join([f"`@{user}` ({count})" for user, count in top_mentions]))

    @staticmethod
    def display_ai_summary(gemini_summary: Optional[str]) -> None:
        """Display the AI-generated summary, if one was produced."""
        if gemini_summary:
            st.subheader("🧠 AI Summary & Recommendations")
            st.markdown(gemini_summary)
            st.divider()

    @staticmethod
    def display_most_engaging_tweet(df: pd.DataFrame) -> None:
        """Display the tweet with the highest likes + retweets + replies.

        Renders nothing when *df* is empty.
        """
        if df.empty:
            return

        st.subheader("🌟 Most Engaging Tweet")

        # idxmax returns the index label of the first maximum.
        most_engaging = df.loc[UIComponents._engagement(df).idxmax()]

        with st.container(border=True):
            st.markdown(f"**{most_engaging['Text']}**")
            stats = (f"â¤ī¸ {most_engaging['Likes']} | 🔄 {most_engaging['Retweets']} | "
                     f"đŸ’Ŧ {most_engaging['Replies']} | đŸ‘ī¸ {most_engaging['Views']}")
            st.markdown(f"**{stats}** | [{most_engaging['Date']}]({most_engaging['URL']})")
        st.divider()

    @staticmethod
    def display_charts(df: pd.DataFrame) -> None:
        """Display a line chart of tweets per day.

        Renders nothing when *df* is empty.
        """
        if df.empty:
            return

        st.subheader("📅 Posting Patterns")

        # Tweets by day: count rows per calendar date.
        df_by_day = df.groupby('Date_Only')['Text'].count().reset_index()
        df_by_day['Date_Only'] = pd.to_datetime(df_by_day['Date_Only'])

        fig_day = px.line(
            df_by_day,
            x='Date_Only',
            y='Text',
            title="Tweets per Day",
            labels={'Date_Only': 'Date', 'Text': 'Count'}
        )
        st.plotly_chart(fig_day, use_container_width=True)

    @staticmethod
    def display_data_download(df: pd.DataFrame, key: str = "download-csv") -> None:
        """Display the raw data table and, when non-empty, a CSV download button.

        Args:
            df: Processed tweets to show and export.
            key: Streamlit widget key for the download button. Pass a distinct
                value when more than one download button is rendered in the
                same script run, since widget keys must be unique.
        """
        st.subheader("📊 Raw Data")
        st.dataframe(df)

        if not df.empty:
            csv = df.to_csv(index=False).encode('utf-8')
            st.download_button(
                "đŸ“Ĩ Download as CSV",
                csv,
                f"twitter_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
                "text/csv",
                key=key,
                use_container_width=True
            )
See sidebar for instructions.") - - # Most engaging tweet - if most_engaging_tweet is not None: - st.subheader("🌟 Most Engaging Tweet") - with st.container(): - st.write(f"**@{most_engaging_tweet['Username']}** â€ĸ {most_engaging_tweet['Author']} â€ĸ {most_engaging_tweet['Date']}") - st.write(most_engaging_tweet['Text']) - - # Display metrics in a row - cols = st.columns(5) - with cols[0]: - st.write(f"đŸ’Ŧ {most_engaging_tweet['Replies']}") - with cols[1]: - st.write(f"🔄 {most_engaging_tweet['Retweets']}") - with cols[2]: - st.write(f"â¤ī¸ {most_engaging_tweet['Likes']}") - with cols[3]: - st.write(f"🔖 {most_engaging_tweet['Bookmarks']}") - with cols[4]: - st.write(f"đŸ‘ī¸ {most_engaging_tweet['Views']}") - - # Link to original tweet - st.write(f"[View on Twitter]({most_engaging_tweet['URL']})") - st.divider() - - # Temporal analysis visualizations - st.subheader("📅 Posting Patterns") - - # Tweets by day - if not df_by_day.empty and len(df_by_day) > 1: - fig_by_day = px.line(df_by_day, x="Date_Only", y="Count", - title="Tweets by Day", - labels={"Date_Only": "Date", "Count": "Number of Tweets"}) - st.plotly_chart(fig_by_day, use_container_width=True) - - # Tweets by hour of day - if not df_by_hour.empty: - fig_by_hour = px.bar(df_by_hour, x="Hour", y="Count", - title="Tweets by Hour of Day (Indian Time)", - labels={"Hour": "Hour (24h format)", "Count": "Number of Tweets"}) - st.plotly_chart(fig_by_hour, use_container_width=True) - - # Tweets by day of week - if not df_by_weekday.empty: - # Sort by days of week properly - days_order = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"] - df_by_weekday["Day_of_Week"] = pd.Categorical(df_by_weekday["Day_of_Week"], categories=days_order, ordered=True) - df_by_weekday = df_by_weekday.sort_values("Day_of_Week") - - fig_by_weekday = px.bar(df_by_weekday, x="Day_of_Week", y="Count", - title="Tweets by Day of Week", - labels={"Day_of_Week": "Day of Week", "Count": "Number of Tweets"}) - 
st.plotly_chart(fig_by_weekday, use_container_width=True) - - # Advanced views in expandable sections - with st.expander("View as Table"): - st.dataframe(df, use_container_width=True) - - with st.expander("View Raw JSON"): - st.json(data) - - # Display tweets list without pagination - st.subheader("đŸĻ Tweets") - display_tweet_list(df) - else: - st.warning("No results found. Try a different query or date range.") - -# Function to handle tweet list display without pagination -def display_tweet_list(df): - # Display all tweets from the dataframe - st.write(f"Displaying all {len(df)} tweets:") - # Add a toggle to show/hide tweets for better performance - if len(df) > 50: - show_all = st.checkbox("Show all tweets (may slow down the app)", value=False) - display_count = len(df) if show_all else min(50, len(df)) - st.info(f"Showing {display_count} of {len(df)} tweets. {'' if show_all else 'Check the box above to see all tweets.'}") - display_df = df.iloc[:display_count].copy() - else: - display_df = df - - # Display each tweet - for i, row in display_df.iterrows(): - with st.container(): - st.write(f"**@{row['Username']}** â€ĸ {row['Author']} â€ĸ {row['Date']}") - st.write(row['Text']) - - # Display metrics in a row - cols = st.columns(5) - with cols[0]: - st.write(f"đŸ’Ŧ {row['Replies']}") - with cols[1]: - st.write(f"🔄 {row['Retweets']}") - with cols[2]: - st.write(f"â¤ī¸ {row['Likes']}") - with cols[3]: - st.write(f"🔖 {row['Bookmarks']}") - with cols[4]: - st.write(f"đŸ‘ī¸ {row['Views']}") - - # Indicate if tweet has media without showing it - if row['Has_Media']: - st.write("📷 Contains media") - - # Link to original tweet - st.write(f"[View on Twitter]({row['URL']})") - st.divider() +# ============================================================================= +# DASHBOARD MANAGEMENT +# ============================================================================= -# Function to display tweets in a compact format for comparison -def 
class TwitterDashboard:
    """Two-column Streamlit dashboard for one set of analysis results."""

    def __init__(self, df: pd.DataFrame, metrics: Dict, dataset_id: str,
                 analysis_type: str = "Account's Tweets", gemini_summary: Optional[str] = None):
        """Store the data to render.

        Args:
            df: Processed tweets, one row per tweet.
            metrics: Aggregates; ``account_details`` is read here and the
                whole dict is passed to the content-analysis component.
            dataset_id: Identifier of the source dataset (stored only).
            analysis_type: "Account's Tweets" shows the account header; any
                other value shows the comments header instead.
            gemini_summary: Optional AI-generated summary text.
        """
        self.df = df
        self.metrics = metrics
        self.dataset_id = dataset_id
        self.analysis_type = analysis_type
        self.gemini_summary = gemini_summary

    def render(self) -> None:
        """Render the complete dashboard, or a warning when there is no data."""
        if self.df.empty:
            st.warning("No data available to display.")
            return

        columns = st.columns([1, 1], gap="large")

        with columns[0]:
            self._render_header()
            UIComponents.display_key_metrics(self.df)
            UIComponents.display_content_analysis(self.metrics)

        with columns[1]:
            UIComponents.display_ai_summary(self.gemini_summary)
            UIComponents.display_most_engaging_tweet(self.df)
            UIComponents.display_charts(self.df)

        # Full-width section below both columns.
        UIComponents.display_data_download(self.df)

    def _render_header(self) -> None:
        """Render the account header, or the comments header for other analyses."""
        if self.analysis_type != "Account's Tweets":
            st.subheader("đŸ’Ŧ Comments Analysis")
            st.info("Analyzing comments and replies directed to the account")
            st.divider()
            return

        UIComponents.display_account_info(self.metrics.get("account_details", {}))
Account Analysis tab -with tabs[0]: - # Create a container for inputs - with st.container(): - st.header("Account Analysis") - st.write("Analyze tweets from a specific Twitter account") - - # Input fields in a cleaner layout - col1, col2, col3 = st.columns([3, 2, 2]) - with col1: - st.session_state.username = st.text_input("Enter Twitter username (without @)", - value=st.session_state.username, - key="account_username", - placeholder="e.g. elonmusk") - with col2: - st.session_state.since = st.date_input("Start date", - value=datetime.strptime(st.session_state.since, "%Y-%m-%d") - if isinstance(st.session_state.since, str) - else st.session_state.since, - key="account_since") - with col3: - st.session_state.until = st.date_input("End date", - value=datetime.strptime(st.session_state.until, "%Y-%m-%d") - if isinstance(st.session_state.until, str) - else st.session_state.until, - key="account_until") - - # Optional engagement filters - with st.expander("âš™ī¸ Advanced Filters (Optional)", expanded=False): - st.info("All filters are set to 0 by default to capture maximum tweets. 
Increase values to filter for more engaging content.") - col1, col2, col3 = st.columns(3) - with col1: - st.session_state.min_faves = st.number_input("Minimum Likes", - min_value=0, - max_value=10000, - value=st.session_state.min_faves, - step=10, - key="account_min_faves") - with col2: - st.session_state.min_retweets = st.number_input("Minimum Retweets", - min_value=0, - max_value=1000, - value=st.session_state.min_retweets, - step=5, - key="account_min_retweets") - with col3: - st.session_state.min_replies = st.number_input("Minimum Replies", - min_value=0, - max_value=1000, - value=st.session_state.min_replies, - step=5, - key="account_min_replies") - - # Convert dates to string format - if not isinstance(st.session_state.since, str): - st.session_state.since = st.session_state.since.strftime("%Y-%m-%d") - if not isinstance(st.session_state.until, str): - st.session_state.until = st.session_state.until.strftime("%Y-%m-%d") - - # Run button - run_button = st.button("🔍 Analyze Account Tweets", key="run_account", use_container_width=True) +class SchedulerManager: + """Manages scheduled users and automation settings.""" - # Run analysis when button is clicked - if run_button: - if st.session_state.username: - # Validate date range - if st.session_state.since > st.session_state.until: - st.error("Start date must be before end date.") - else: - st.session_state.results, st.session_state.dataset_id = run_apify_account_analysis({ - "username": st.session_state.username, - "since": st.session_state.since, - "until": st.session_state.until, - "min_faves": st.session_state.min_faves, - "min_retweets": st.session_state.min_retweets, - "min_replies": st.session_state.min_replies - }) - - # Process results to check for mock data - processed_df, metrics, mock_data_detected = process_tweet_data(st.session_state.results, extract_account_info=True) - - if mock_data_detected: - st.warning("Mock data detected in the response, indicating limited results. 
This may be due to strict filters or no tweets in the date range.") - - if not processed_df.empty: - date_range = f"{st.session_state.since} to {st.session_state.until}" - st.success(f"Analysis complete! Found {len(processed_df)} tweets from {date_range}.") - st.balloons() - # Pass raw data to preserve account details - analyze_and_display_data(st.session_state.results, st.session_state.dataset_id, "Account") - else: - st.warning("No results found. Try a different date range or reduce the engagement filters.") - else: - st.error("Please enter a Twitter username") - -# Comment Analysis tab -with tabs[1]: - with st.container(): - st.header("Comment Analysis") - st.write("Analyze comments directed at a specific Twitter account") - - # Input fields in a cleaner layout - col1, col2, col3 = st.columns([3, 2, 2]) - with col1: - tweet_id = st.text_input("Enter Twitter ID", - key="comment_id", - placeholder="e.g. YSJaganTrends") - with col2: - comment_since = st.date_input("Start date", - value=datetime.strptime(st.session_state.since, "%Y-%m-%d") - if isinstance(st.session_state.since, str) - else st.session_state.since, - key="comment_since") - with col3: - comment_until = st.date_input("End date", - value=datetime.strptime(st.session_state.until, "%Y-%m-%d") - if isinstance(st.session_state.until, str) - else st.session_state.until, - key="comment_until") - - # Run button - comment_button = st.button("🔍 Analyze Comments", key="run_comment", use_container_width=True) + def __init__(self, db: DatabaseManager): + self.db = db - # Run analysis when button is clicked - if comment_button: - if tweet_id: - # Validate date range - if comment_since > comment_until: - st.error("Start date must be before end date.") - else: - raw_results, dataset_id = run_apify_comment_analysis({ - "id": tweet_id, - "since": comment_since.strftime("%Y-%m-%d"), - "until": comment_until.strftime("%Y-%m-%d") - }) - - # Process data to remove mock tweets and get the actual count - processed_df, _, 
mock_data_detected = process_tweet_data(raw_results) - - if not processed_df.empty: - date_range = f"{comment_since.strftime('%Y-%m-%d')} to {comment_until.strftime('%Y-%m-%d')}" - st.success(f"Analysis complete! Found {len(processed_df)} actual comments from {date_range}.") - st.balloons() - # Display the results using the processed DataFrame - analyze_and_display_data(processed_df, dataset_id, "Comment") - elif mock_data_detected and processed_df.empty: - st.warning("Mock data was returned by the API, indicating no specific comments were found for your query. Please try adjusting your date range.") - else: # No mock data, but still empty (or raw_results was empty) - st.warning("No results found. Try a different query or date range.") - else: - st.error("Please enter a Twitter ID") - -# Compare Accounts tab -with tabs[2]: - with st.container(): - st.header("Compare Accounts") - st.write("Analyze two Twitter accounts side-by-side") + def render_controls(self) -> None: + """Render scheduler management interface.""" + st.header("🕒 Scheduler Management") - # Input fields - col1, col2 = st.columns(2) - with col1: - st.session_state.username1 = st.text_input( - "Enter first Twitter username (without @)", - value=st.session_state.username1, - key="compare_username1", - placeholder="e.g. narendramodi" - ) - with col2: - st.session_state.username2 = st.text_input( - "Enter second Twitter username (without @)", - value=st.session_state.username2, - key="compare_username2", - placeholder="e.g. RahulGandhi" - ) + if not self.db.is_connected: + st.warning("âš ī¸ Database not connected. 
Scheduler features unavailable.") + return - # Shared settings - col1, col2 = st.columns([1, 1]) - with col1: - # Use a different key for the date input to avoid conflicts - compare_since_date = st.date_input( - "Start date", - value=datetime.strptime(st.session_state.compare_since, "%Y-%m-%d"), - key="compare_since_dateinput" - ) - st.session_state.compare_since = compare_since_date.strftime("%Y-%m-%d") - with col2: - compare_until_date = st.date_input( - "End date", - value=datetime.strptime(st.session_state.compare_until, "%Y-%m-%d"), - key="compare_until_dateinput" - ) - st.session_state.compare_until = compare_until_date.strftime("%Y-%m-%d") + self._display_current_users() + st.divider() + self._display_add_user_form() + st.divider() + self._display_scheduler_info() + + def _display_current_users(self) -> None: + """Display currently scheduled users.""" + st.subheader("📋 Current Scheduled Users") - compare_button = st.button("âš–ī¸ Compare Accounts", key="run_compare", use_container_width=True) - - if compare_button: - if st.session_state.username1 and st.session_state.username2: - # Validate date range - if st.session_state.compare_since > st.session_state.compare_until: - st.error("Start date must be before end date.") - else: - def fetch_and_process_user_data(username, since, until): - date_range = f"{since} to {until}" - with st.spinner(f"Fetching tweets for @{username} from {date_range}..."): - results, dataset_id = run_apify_account_analysis({ - "username": username, - "since": since, - "until": until, - "min_faves": 0, - "min_retweets": 0, - "min_replies": 0 - }) - processed_df, metrics, mock_data = process_tweet_data(results, extract_account_info=True) - - if mock_data: - st.warning(f"Mock data detected for @{username}, indicating limited results in the date range.") - - if not processed_df.empty: - account_details = metrics.get("account_details", {}) - followers_info = f" | {account_details.get('followers_count', 'N/A')} followers" if 
account_details.get('followers_count') else "" - following_info = f" | {account_details.get('following_count', 'N/A')} following" if account_details.get('following_count') else "" - st.success(f"Found {len(processed_df)} tweets for @{username} from {date_range}{followers_info}{following_info}.") - - # ENHANCED: Debug mode for account details - if account_details: - with st.expander(f"🔍 Debug Account Info for @{username}"): - st.json(account_details) - else: - st.warning(f"No results for @{username} in the specified date range.") - - return processed_df, metrics, dataset_id - - col1, col2 = st.columns(2) - - with col1: - df1, metrics1, dsid1 = fetch_and_process_user_data( - st.session_state.username1, - st.session_state.compare_since, - st.session_state.compare_until - ) - if not df1.empty: - display_compact_analysis(df1, metrics1, st.session_state.username1, dsid1) - - with col2: - df2, metrics2, dsid2 = fetch_and_process_user_data( - st.session_state.username2, - st.session_state.compare_since, - st.session_state.compare_until - ) - if not df2.empty: - display_compact_analysis(df2, metrics2, st.session_state.username2, dsid2) - - # Display tweets side by side after the analysis - if not df1.empty or not df2.empty: - st.divider() - st.subheader("đŸĻ Tweets Comparison") - - col1, col2 = st.columns(2) - + try: + scheduled_users = list(self.db.scheduler_users_collection.find({"active": True})) + usernames = [user["username"] for user in scheduled_users] + except Exception as e: + st.error(f"Error fetching scheduled users: {e}") + return + + if usernames: + for username in usernames: + col1, col2 = st.columns([3, 1]) with col1: - if not df1.empty: - st.markdown(f"### @{st.session_state.username1} Tweets") - display_tweet_list_compact(df1) - else: - st.info(f"No tweets found for @{st.session_state.username1}") - + st.write(f"@{username}") with col2: - if not df2.empty: - st.markdown(f"### @{st.session_state.username2} Tweets") - display_tweet_list_compact(df2) - else: 
- st.info(f"No tweets found for @{st.session_state.username2}") - + if st.button("đŸ—‘ī¸", key=f"remove_{username}", help=f"Remove @{username}"): + if self._remove_user(username): + st.rerun() else: - st.error("Please enter both Twitter usernames to compare.") - - - -# Scheduler tab -with tabs[3]: - st.header("⏰ Daily Scheduler") - st.write("Configure daily automatic fetching of tweets and storage to MongoDB.") - - # Existing stored usernames - existing_users = get_scheduler_usernames() - if existing_users: - st.markdown("**Current usernames:** " + ", ".join(existing_users)) - - # Remove usernames section - st.subheader("đŸ—‘ī¸ Manage Usernames") - col1, col2 = st.columns([3, 1]) + st.info("No users currently scheduled.") + + def _display_add_user_form(self) -> None: + """Display form to add new users.""" + st.subheader("➕ Add New User") + new_username = st.text_input("Username to schedule (without @)", key="new_scheduled_user") + + col1, col2 = st.columns(2) with col1: - username_to_remove = st.selectbox("Select username to remove", [""] + existing_users, key="username_to_remove") - with col2: - st.write("") # Empty space for alignment - if st.button("đŸ—‘ī¸ Remove", key="remove_username_btn"): - if username_to_remove: - remove_scheduler_username(username_to_remove) - st.success(f"@{username_to_remove} removed from scheduler.") + if st.button("Add User", use_container_width=True, disabled=not new_username): + if self._add_user(new_username): + st.success(f"✅ Added @{new_username} to scheduler") st.rerun() - else: - st.error("Please select a username to remove.") - # Clear all button - if st.button("đŸ—‘ī¸ Clear All Usernames", key="clear_all_btn", type="secondary"): - clear_all_scheduler_usernames() - st.success("All usernames cleared from scheduler.") - st.rerun() - - # Clear database button - st.divider() - st.subheader("đŸ—„ī¸ Database Management") - st.warning("âš ī¸ This will permanently delete all stored tweet data and AI summaries!") - if 
st.button("đŸ—‘ī¸ Clear All Tweet Data", key="clear_db_btn", type="secondary"): - if MONGODB_AVAILABLE: - deleted_count = clear_all_tweets_data() - if deleted_count > 0: - st.success(f"✅ Cleared {deleted_count} account records from database.") - else: - st.info("Database was already empty.") - else: - st.error("MongoDB not available - cannot clear database.") - else: - st.info("No usernames stored yet.") - - # Add single username - st.subheader("➕ Add Username") - new_user = st.text_input("Add a new Twitter username", key="sched_single_add") - if st.button("➕ Add Username", key="sched_add_btn", use_container_width=True): - if new_user.strip(): - save_scheduler_usernames([new_user.strip()]) - st.success(f"@{new_user.strip()} added to scheduler list.") - st.rerun() - else: - st.error("Enter a valid username.") - - st.divider() - - # Scheduler configuration - st.subheader("âš™ī¸ Scheduler Configuration") - usernames_input = st.text_area("Usernames to schedule (one per line)", value="\n".join(existing_users), key="sched_usernames") - - col1, col2, col3 = st.columns(3) - with col1: - sched_since = st.date_input("Start date", value=(datetime.now() - timedelta(days=1)).date(), key="sched_since") - with col2: - sched_until = st.date_input("End date", value=datetime.now().date(), key="sched_until") - with col3: - sched_time = st.time_input("Run at (24h format)", datetime.now().replace(hour=2, minute=0, second=0, microsecond=0).time(), key="sched_time") - - # Buttons row - col1, col2 = st.columns(2) - with col1: - if st.button("â–ļī¸ Start Scheduler", key="start_scheduler", use_container_width=True): - usernames = [u.strip() for u in usernames_input.split("\n") if u.strip()] - if usernames: - # Validate date range - if sched_since > sched_until: - st.error("Start date must be before end date.") - else: - # Save/update usernames in DB - save_scheduler_usernames(usernames) - - # Clear existing jobs with tag - schedule.clear('tweet_jobs') - - def scheduled_job(): - 
schedule_fetch(usernames, sched_since.strftime("%Y-%m-%d"), sched_until.strftime("%Y-%m-%d")) - - schedule.every().day.at(sched_time.strftime("%H:%M")).tag('tweet_jobs').do(scheduled_job) - date_range = f"{sched_since.strftime('%Y-%m-%d')} to {sched_until.strftime('%Y-%m-%d')}" - st.success(f"Scheduler started for {len(usernames)} accounts daily at {sched_time.strftime('%H:%M')} for date range {date_range}.") - - # Launch scheduler loop thread if not already running - if 'scheduler_thread' not in st.session_state: - thread = threading.Thread(target=_run_schedule_loop, daemon=True) - thread.start() - st.session_state.scheduler_thread = thread - else: - st.error("Please input at least one username.") + with col2: + if st.button("🔄 Refresh List", use_container_width=True): + st.rerun() + + def _display_scheduler_info(self) -> None: + """Display scheduler information.""" + st.subheader("â„šī¸ Scheduler Info") + st.info(""" + **GitHub Actions Automation:** + - Runs daily at 12:00 AM IST automatically + - Can be triggered manually from GitHub Actions tab + - Scrapes only the previous day's data (no overlap) + - Stores results in MongoDB with duplicate detection + """) + + def _add_user(self, username: str) -> bool: + """Add user to scheduled scraping list.""" + try: + # Check if user already exists + existing_users = list(self.db.scheduler_users_collection.find({"active": True})) + if username in [user["username"] for user in existing_users]: + st.warning("User already scheduled") + return False + + user_doc = { + "username": username, + "active": True, + "added_at": datetime.utcnow(), + "last_scraped": None + } + self.db.scheduler_users_collection.update_one( + {"username": username}, + {"$set": user_doc}, + upsert=True + ) + return True + except Exception as e: + st.error(f"Error adding user: {e}") + return False - with col2: - if st.button("🚀 Scrape Now", key="run_now_btn", use_container_width=True, type="secondary"): - usernames = [u.strip() for u in 
usernames_input.split("\n") if u.strip()] - if usernames: - # Validate date range - if sched_since > sched_until: - st.error("Start date must be before end date.") - else: - date_range = f"{sched_since.strftime('%Y-%m-%d')} to {sched_until.strftime('%Y-%m-%d')}" - with st.spinner(f"Scraping tweets for {len(usernames)} accounts from {date_range}..."): - try: - total_tweets = 0 - for username in usernames: - with st.spinner(f"Scraping @{username} from {date_range}..."): - results, _ = run_apify_account_analysis({ - "username": username, - "since": sched_since.strftime("%Y-%m-%d"), - "until": sched_until.strftime("%Y-%m-%d"), - "min_faves": 0, - "min_retweets": 0, - "min_replies": 0 - }) - df, metrics, _ = process_tweet_data(results, extract_account_info=True) - if not df.empty: - # Generate AI summary - ai_summary = None - if GENAI_AVAILABLE and GEMINI_API_KEY: - with st.spinner(f"Generating AI summary for @{username}..."): - try: - context = f"The following are account tweets for Twitter account @{username}" - ai_summary = get_gemini_summary(df, context) - except Exception as e: - st.warning(f"AI summary generation failed for @{username}: {e}") - - # ENHANCED: Store with raw data and account details - account_details = metrics.get("account_details", {}) - store_to_mongodb(df, "Account", ai_summary, raw_data=results, account_details=account_details) - total_tweets += len(df) - summary_status = " (with AI summary)" if ai_summary else "" - account_info = f" | Followers: {account_details.get('followers_count', 'N/A')}" if account_details.get('followers_count') else "" - st.success(f"✅ @{username}: {len(df)} tweets scraped and stored from {date_range}{summary_status}{account_info}") - else: - st.warning(f"âš ī¸ @{username}: No tweets found in the specified date range") - - if total_tweets > 0: - st.success(f"🎉 Successfully scraped and stored {total_tweets} tweets from {len(usernames)} accounts in date range {date_range}!") - st.info("Data has been stored in your MongoDB 
DataCollector database.") - else: - st.warning("No tweets were found for any of the accounts in the specified date range.") - except Exception as e: - st.error(f"❌ Error during scraping: {str(e)}") - else: - st.error("Please input at least one username.") + def _remove_user(self, username: str) -> bool: + """Remove user from scheduled scraping list.""" + try: + self.db.scheduler_users_collection.update_one( + {"username": username}, + {"$set": {"active": False}} + ) + return True + except Exception as e: + st.error(f"Error removing user: {e}") + return False - # Display currently scheduled jobs - jobs = schedule.get_jobs('tweet_jobs') - if jobs: - st.subheader("📅 Scheduled Jobs") - for job in jobs: - st.write(str(job)) - st.info(f"Next run at: {jobs[0].next_run.strftime('%Y-%m-%d %H:%M:%S')}") - - # Stop scheduler button - if jobs: - if st.button("âšī¸ Stop Scheduler", key="stop_scheduler", type="secondary"): - schedule.clear('tweet_jobs') - st.success("Scheduler stopped. All scheduled jobs cleared.") - st.rerun() +# ============================================================================= +# MAIN APPLICATION +# ============================================================================= -# ENHANCED: Show API limitations and setup instructions -st.sidebar.title("📋 API Notes & Features") -st.sidebar.info( - """ - **New Features:** +class TwitterAnalyzerApp: + """Main Twitter Analyzer application.""" - ✅ **Date Range Fetching:** All tweets between start and end dates are fetched (no max limit) + def __init__(self): + self._setup_page() + self._initialize_services() - ✅ **Account Analysis:** Comprehensive account details shown in all analysis views + def _setup_page(self) -> None: + """Configure Streamlit page settings.""" + st.set_page_config(**PAGE_CONFIG) + st.title("đŸĻ Twitter Content Analyzer") - ✅ **Zero Engagement Filters:** Default engagement filters set to 0 for maximum tweet capture - - âš™ī¸ **Optional Filters:** Users can set custom engagement 
thresholds if desired - - **Known Limitations:** - - đŸšĢ **Tweet-level comment replies** are not available due to Twitter API restrictions. Only direct comments to the main account are fetched. - - âš ī¸ **Tweet count discrepancies** may occur due to: - - Private/protected tweets - - Deleted tweets - - API rate limiting - - Account restrictions - - Language filtering (now disabled by default) - - Time zone differences (API uses UTC, display shows IST) + def _initialize_services(self) -> None: + """Initialize all required services.""" + try: + self.config = AppConfig() + self.db = DatabaseManager(self.config.mongodb_uri) + self.apify = ApifyService(self.config.apify_api_key) + self.gemini = GeminiService(self.config.gemini_api_key) if self.config.gemini_api_key else None + self.processor = TweetDataProcessor() + self.scheduler = SchedulerManager(self.db) + except ValueError as e: + st.error(f"Initialization failed: {e}. Please check your .env.local file.") + st.stop() + + def run(self) -> None: + """Execute the main application.""" + self._render_sidebar() + + if not hasattr(self, 'run_button') or not self.run_button or not self.username: + st.info("Please enter a Twitter username and click 'Analyze' to begin.") + return + + self._perform_analysis() + + def _render_sidebar(self) -> None: + """Render the application sidebar.""" + with st.sidebar: + self._render_analysis_controls() + self._render_debug_options() + st.divider() + self.scheduler.render_controls() - 💡 **Tips for better results:** - - Use appropriate date ranges - - Keep engagement filters at 0 (default) for maximum capture - - Use broader time periods for more comprehensive data - - Check the debug info shown with query results - - Compare against multiple time ranges for consistency + def _render_analysis_controls(self) -> None: + """Render analysis control widgets.""" + st.header("âš™ī¸ Analysis Controls") + + self.analysis_type = st.radio( + "Analysis Type", + ["Account's Tweets", "Comments to 
Account"], + horizontal=True + ) + self.username = st.text_input("Twitter Username (without @)", DEFAULT_USERNAME) + + # Date inputs + today = datetime.now() + last_week = today - timedelta(days=DEFAULT_DAYS_BACK) + + self.since_date = st.date_input("Start Date", last_week) + self.until_date = st.date_input("End Date", today) + + self.run_button = st.button("🚀 Analyze", use_container_width=True, type="primary") - 🔧 **Troubleshooting discrepancies:** - - Twitter's web interface may include/exclude different content types - - Retweets are now included by default for better accuracy - - Language filter removed to capture all tweets - - Check the raw results count vs processed count - """ -) - -# Show instructions for setting up Gemini -if not GENAI_AVAILABLE or not GEMINI_API_KEY: - st.sidebar.title("Setup Gemini API") + def _render_debug_options(self) -> None: + """Render debug options.""" + with st.expander("🔧 Debug Options"): + st.session_state['debug_mode'] = st.checkbox( + "Show API Debug Info", + help="Shows raw API data for troubleshooting" + ) - if not GENAI_AVAILABLE: - st.sidebar.error( - """ - The Google Generative AI package is not installed. + def _perform_analysis(self) -> None: + """Perform the main analysis workflow.""" + since_str = self.since_date.strftime("%Y-%m-%d") + until_str = self.until_date.strftime("%Y-%m-%d") + + # Fetch data based on analysis type + try: + if self.analysis_type == "Account's Tweets": + raw_data, dataset_id = self.apify.fetch_account_tweets(self.username, since_str, until_str) + context = f"This is an analysis of tweets by the Twitter account @{self.username}." + else: + raw_data, dataset_id = self.apify.fetch_account_comments(self.username, since_str, until_str) + context = f"This is an analysis of comments/replies sent to the Twitter account @{self.username}." - Install it by running: - ``` - pip install google-generativeai - ``` - Then restart the application. 
- """ - ) - - if GENAI_AVAILABLE and not GEMINI_API_KEY: - st.sidebar.info( - """ - To enable the Gemini summarization feature: - 1. Get an API key from [Google AI Studio](https://aistudio.google.com/) - 2. Add the key to your .env.local file as: - ``` - GEMINI_API_KEY=your_api_key_here - ``` - 3. Restart the application - """ - ) - -# Show MongoDB status -st.sidebar.title("Database Status") -if MONGODB_AVAILABLE: - st.sidebar.success("✅ MongoDB Connected") -else: - st.sidebar.error("âš ī¸ MongoDB Offline") - st.sidebar.info( - """ - Running in offline mode. - Data will not be stored to database. - - To connect to MongoDB: - 1. Check your internet connection - 2. Verify MongoDB Atlas cluster is running - 3. Check MONGODB_URI in .env.local - """ - ) + if not raw_data: + st.error("No data was returned from the API. The account may be private, have no tweets in the selected range, or there might be an API issue.") + return + + # Process data + df, metrics = self.processor.process_tweets(raw_data) + + # Generate AI summary if available + gemini_summary = None + if self.gemini: + gemini_summary = self.gemini.generate_analysis(df.head(100), context) + else: + st.warning("GEMINI_API_KEY not found. 
AI summary will be skipped.") + + # Display results + dashboard = TwitterDashboard(df, metrics, dataset_id, self.analysis_type, gemini_summary) + dashboard.render() + + except Exception as e: + logger.error(f"Analysis failed: {e}") + st.error(f"Analysis failed: {str(e)}") -# Update requirements.txt file if it exists and does not contain the package -try: - with open("requirements.txt", "r") as f: - requirements = f.read() - - updated_requirements = False - - if "google-generativeai" not in requirements: - with open("requirements.txt", "a") as f: - f.write("\ngoogle-generativeai>=0.3.0\n") - updated_requirements = True - - if "pytz" not in requirements: - with open("requirements.txt", "a") as f: - f.write("\npytz\n") - updated_requirements = True - - if "pymongo" not in requirements: - with open("requirements.txt", "a") as f: - f.write("\npymongo>=4.6.0\n") - updated_requirements = True - - if "schedule" not in requirements: - with open("requirements.txt", "a") as f: - f.write("\nschedule\n") - updated_requirements = True -except: - pass +# ============================================================================= +# APPLICATION ENTRY POINT +# ============================================================================= +def main(): + """Application entry point.""" + app = TwitterAnalyzerApp() + app.run() -# Footer with attribution -st.divider() -st.caption("Powered by Apify Twitter Scraper API â€ĸ Created with Streamlit â€ĸ AI Summaries by Google Gemini â€ĸ Times in Indian Standard Time (IST)") \ No newline at end of file +if __name__ == "__main__": + main()