import streamlit as st
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import nltk
from nltk.sentiment import SentimentIntensityAnalyzer
from textblob import TextBlob
from transformers import pipeline
from wordcloud import WordCloud
import base64
from io import BytesIO
import plotly.express as px
import praw
from googleapiclient.discovery import build
import os
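# Third-party dependencies assumed by the imports above (PyPI names;
# transformers additionally needs a backend such as torch installed):
#   pip install streamlit pandas numpy nltk textblob transformers torch \
#       wordcloud plotly praw google-api-python-client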
# --------------------------
# Initial Setup
# --------------------------

# Configure page
st.set_page_config(
    page_title="SentimentSync Pro",
    page_icon="📈",
    layout="wide",
    initial_sidebar_state="expanded"
)
# --------------------------
# Configuration
# --------------------------
class Config:
    # API keys - replace with your actual keys or use environment variables
    YOUTUBE_API_KEY = os.getenv("YT_API_KEY", "your_youtube_api_key_here")
    REDDIT_CLIENT_ID = os.getenv("REDDIT_CLIENT_ID", "your_reddit_client_id")
    REDDIT_CLIENT_SECRET = os.getenv("REDDIT_CLIENT_SECRET", "your_reddit_secret")
    REDDIT_USER_AGENT = "SentimentAnalysisBot/1.0"

    # NLTK data path
    NLTK_DATA_PATH = os.path.join(os.path.expanduser("~"), "nltk_data")

    # Sentiment thresholds
    POSITIVE_THRESHOLD = 0.1
    NEGATIVE_THRESHOLD = -0.1
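
# The keys above are read from the environment. A minimal way to supply them
# locally (shell commands for illustration; the script filename is assumed):
#
#   export YT_API_KEY="..."
#   export REDDIT_CLIENT_ID="..."
#   export REDDIT_CLIENT_SECRET="..."
#   streamlit run app.py
#
# On hosted platforms (e.g., Hugging Face Spaces) the same variables can be
# configured as secrets instead of being hard-coded.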
# --------------------------
# Initialize Resources
# --------------------------
def initialize_resources():
    """Initialize all required resources with proper error handling"""
    try:
        # Set up NLTK data
        os.makedirs(Config.NLTK_DATA_PATH, exist_ok=True)
        nltk.data.path.append(Config.NLTK_DATA_PATH)
        # Each package lives under a different NLTK resource category, so map
        # package name -> lookup path instead of assuming 'tokenizers/' for all
        required_nltk = {
            'punkt': 'tokenizers/punkt',
            'stopwords': 'corpora/stopwords',
            'vader_lexicon': 'sentiment/vader_lexicon',
        }
        for package, resource_path in required_nltk.items():
            try:
                nltk.data.find(resource_path)
            except LookupError:
                nltk.download(package, download_dir=Config.NLTK_DATA_PATH)
    except Exception as e:
        st.error(f"NLTK initialization failed: {str(e)}")
        return False

    # Initialize sentiment analyzers
    try:
        st.session_state.vader = SentimentIntensityAnalyzer()
        st.session_state.bert = pipeline(
            "sentiment-analysis",
            model="nlptown/bert-base-multilingual-uncased-sentiment"
        )
    except Exception as e:
        st.error(f"Model initialization failed: {str(e)}")
        return False

    # Initialize API clients
    try:
        st.session_state.reddit = praw.Reddit(
            client_id=Config.REDDIT_CLIENT_ID,
            client_secret=Config.REDDIT_CLIENT_SECRET,
            user_agent=Config.REDDIT_USER_AGENT
        )
    except Exception as e:
        st.error(f"Reddit client initialization failed: {str(e)}")
        st.session_state.reddit = None

    try:
        # Skip the YouTube client if the key is still the placeholder
        if Config.YOUTUBE_API_KEY.startswith("your_"):
            st.session_state.youtube = None
        else:
            st.session_state.youtube = build(
                'youtube',
                'v3',
                developerKey=Config.YOUTUBE_API_KEY,
                cache_discovery=False
            )
    except Exception as e:
        st.error(f"YouTube client initialization failed: {str(e)}")
        st.session_state.youtube = None

    return True


if not initialize_resources():
    st.error("Critical initialization failed. Check error messages above.")
    st.stop()
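
# Note: the module-level call above runs on every Streamlit rerun, so the
# BERT pipeline is re-created each time. A sketch of loading it once per
# process with Streamlit's cache_resource decorator (hypothetical refactor):
#
#   @st.cache_resource
#   def load_bert():
#       return pipeline(
#           "sentiment-analysis",
#           model="nlptown/bert-base-multilingual-uncased-sentiment"
#       )
#
#   st.session_state.bert = load_bert()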
# --------------------------
# Core Functions
# --------------------------
def analyze_sentiment(text):
    """Analyze text using multiple sentiment models"""
    results = {
        'vader': 0,
        'bert': 0,
        'textblob': 0,
        'bert_label': 'Error',
        'bert_score': 0
    }
    try:
        # VADER compound score, already in [-1, 1]
        results['vader'] = st.session_state.vader.polarity_scores(text)['compound']

        # BERT: the char slice is a cheap guard against very long inputs;
        # truncation=True enforces the model's 512-token limit properly
        bert_result = st.session_state.bert(text[:512], truncation=True)[0]
        results['bert_label'] = bert_result['label']
        results['bert_score'] = bert_result['score']

        # Map the model's 1-5 star label onto the same [-1, 1] scale
        label_map = {
            '1 star': -1,
            '2 stars': -0.5,
            '3 stars': 0,
            '4 stars': 0.5,
            '5 stars': 1
        }
        results['bert'] = label_map.get(bert_result['label'], 0)

        # TextBlob polarity, also in [-1, 1]
        results['textblob'] = TextBlob(text).sentiment.polarity
    except Exception as e:
        st.error(f"Sentiment analysis error: {str(e)}")
    return results
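
# All three numeric scores share the [-1, 1] scale (VADER compound, the
# star-label mapping, TextBlob polarity), which is why callers can average
# them directly. Illustrative call (values vary by model version, not fixed):
#   analyze_sentiment("I love this!")
#   # -> {'vader': 0.64, 'bert': 1, 'textblob': 0.5,
#   #     'bert_label': '5 stars', 'bert_score': 0.9}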
def fetch_youtube_data(keyword, max_results=25):
    """Fetch YouTube data with enhanced error handling"""
    if st.session_state.youtube is None:
        st.warning("YouTube API not configured")
        return pd.DataFrame()
    try:
        # Search for videos (search.list caps maxResults at 50)
        search_response = st.session_state.youtube.search().list(
            q=keyword,
            part="snippet",
            maxResults=min(max_results, 50),
            type="video",
            order="relevance",
            safeSearch="moderate"
        ).execute()

        # Get video details
        video_ids = [item['id']['videoId'] for item in search_response['items']]
        videos_response = st.session_state.youtube.videos().list(
            part="snippet,statistics",
            id=",".join(video_ids)
        ).execute()

        # Process results
        data = []
        for item in videos_response['items']:
            snippet = item['snippet']
            stats = item.get('statistics', {})
            data.append({
                'source': 'YouTube',
                'date': datetime.strptime(snippet['publishedAt'], '%Y-%m-%dT%H:%M:%SZ'),
                'title': snippet['title'],
                'text': f"{snippet['title']}\n{snippet['description']}",
                # In videos().list responses item['id'] is the plain video ID string
                'url': f"https://youtu.be/{item['id']}",
                'views': int(stats.get('viewCount', 0)),
                'likes': int(stats.get('likeCount', 0)),
                'comments': int(stats.get('commentCount', 0)),
                'thumbnail': snippet['thumbnails']['default']['url']
            })
        return pd.DataFrame(data)
    except Exception as e:
        st.error(f"Error fetching YouTube data: {str(e)}")
        return pd.DataFrame()
def fetch_reddit_data(keyword, limit=50):
    """Fetch Reddit posts with error handling"""
    if st.session_state.reddit is None:
        st.warning("Reddit API not configured")
        return pd.DataFrame()
    try:
        posts = st.session_state.reddit.subreddit("all").search(
            query=keyword,
            limit=limit,
            time_filter="month"
        )
        data = []
        for post in posts:
            data.append({
                'source': 'Reddit',
                'date': datetime.fromtimestamp(post.created_utc),
                'title': post.title,
                'text': f"{post.title}\n\n{post.selftext}",
                'url': f"https://reddit.com{post.permalink}",
                'upvotes': post.score,
                'comments': post.num_comments,
                'thumbnail': post.thumbnail if post.thumbnail not in ['self', 'default'] else None
            })
        return pd.DataFrame(data)
    except Exception as e:
        st.error(f"Error fetching Reddit data: {str(e)}")
        return pd.DataFrame()
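
# Both fetchers return frames sharing the columns used downstream ('source',
# 'date', 'title', 'text', 'url', 'thumbnail'), which is what lets
# analyze_live_data() pd.concat them and treat the rows uniformly; the
# source-specific engagement columns ('views'/'likes' vs 'upvotes') simply
# come out as NaN for rows from the other source.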
# --------------------------
# Visualization Functions
# --------------------------
def create_wordcloud(text):
    """Generate a word cloud with proper error handling"""
    try:
        wc = WordCloud(
            width=800,
            height=400,
            background_color='white',
            stopwords=set(nltk.corpus.stopwords.words('english')),
            collocations=False
        ).generate(text)
        img = BytesIO()
        wc.to_image().save(img, format='PNG')
        return base64.b64encode(img.getvalue()).decode()
    except Exception as e:
        st.error(f"Word cloud error: {str(e)}")
        return None
def plot_sentiment_timeline(df):
    """Interactive timeline plot of sentiment"""
    try:
        fig = px.line(
            df,
            x='date',
            y='average_sentiment',
            color='source',
            title='Sentiment Over Time',
            labels={'average_sentiment': 'Sentiment Score', 'date': 'Date'},
            hover_data=['title', 'source', 'url'],
            template='plotly_white'
        )
        fig.update_traces(mode='markers+lines')
        fig.update_layout(hovermode='x unified')
        st.plotly_chart(fig, use_container_width=True)
    except Exception as e:
        st.error(f"Plotting error: {str(e)}")
# --------------------------
# UI Components
# --------------------------
def sidebar_controls():
    """Render sidebar controls"""
    with st.sidebar:
        st.title("🔧 Controls")
        st.radio(
            "Analysis Mode",
            ["Text Input", "Live Data"],
            index=0,
            key='analysis_mode'
        )
        if st.session_state.analysis_mode == "Text Input":
            st.session_state.user_text = st.text_area(
                "Enter your text:",
                height=200,
                placeholder="Type or paste text here..."
            )
        else:
            st.session_state.search_keyword = st.text_input(
                "Search keyword:",
                placeholder="e.g., Tesla, AI, etc."
            )
            col1, col2 = st.columns(2)
            st.session_state.use_reddit = col1.checkbox("Reddit", True)
            st.session_state.use_youtube = col2.checkbox("YouTube", True)
            st.session_state.max_results = st.slider(
                "Max results per source:",
                10, 100, 25
            )
        st.markdown("---")
        if st.button("Analyze", type="primary"):
            st.session_state.analyze_clicked = True
        if st.button("Reset"):
            st.session_state.clear()
            st.rerun()
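
# Note: analyze_clicked is kept in st.session_state, which persists across
# reruns, so after one click the analysis re-runs on every widget change
# until Reset clears the state; making it one-shot would mean resetting the
# flag, e.g. st.session_state.analyze_clicked = False at the end of main().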
# --------------------------
# Main App
# --------------------------
def main():
    st.title("📊 SentimentSync Pro")
    st.caption("Advanced sentiment analysis across multiple platforms")
    sidebar_controls()

    if not st.session_state.get('analyze_clicked'):
        st.info("Configure your analysis using the sidebar controls")
        return

    # Perform analysis based on the selected mode
    if st.session_state.analysis_mode == "Text Input":
        analyze_text_input()
    else:
        analyze_live_data()
def analyze_text_input():
    """Analyze manually entered text"""
    if not st.session_state.user_text or len(st.session_state.user_text.strip()) < 10:
        st.warning("Please enter at least 10 characters of text")
        return

    with st.spinner("Analyzing text..."):
        # Overall sentiment
        sentiment = analyze_sentiment(st.session_state.user_text)

        # Display results
        col1, col2, col3 = st.columns(3)
        col1.metric("VADER Score", f"{sentiment['vader']:.2f}",
                    delta_color="inverse" if sentiment['vader'] < 0 else "normal")
        col2.metric("BERT Sentiment", sentiment['bert_label'], f"{sentiment['bert_score']:.2f}")
        col3.metric("TextBlob Score", f"{sentiment['textblob']:.2f}")

        # Word cloud
        st.subheader("Word Cloud")
        wc_img = create_wordcloud(st.session_state.user_text)
        if wc_img:
            st.image(f"data:image/png;base64,{wc_img}", use_container_width=True)

        # Sentence-level analysis
        try:
            sentences = nltk.sent_tokenize(st.session_state.user_text)
            if len(sentences) > 1:
                st.subheader("Sentence Breakdown")
                sent_data = []
                for sent in sentences:
                    sent_sentiment = analyze_sentiment(sent)
                    sent_data.append({
                        'Sentence': sent[:150] + ("..." if len(sent) > 150 else ""),
                        'VADER': sent_sentiment['vader'],
                        'BERT': sent_sentiment['bert'],
                        'TextBlob': sent_sentiment['textblob'],
                        'Average': np.mean([
                            sent_sentiment['vader'],
                            sent_sentiment['bert'],
                            sent_sentiment['textblob']
                        ])
                    })
                sent_df = pd.DataFrame(sent_data)
                styled_df = sent_df.style.background_gradient(
                    cmap='RdYlGn',
                    subset=['VADER', 'BERT', 'TextBlob', 'Average'],
                    vmin=-1,
                    vmax=1
                )
                st.dataframe(
                    styled_df,
                    use_container_width=True,
                    height=min(400, 35 * len(sent_df))
                )
        except Exception as e:
            st.error(f"Sentence analysis error: {str(e)}")
def analyze_live_data():
    """Analyze live data from APIs"""
    if not st.session_state.search_keyword:
        st.warning("Please enter a search keyword")
        return
    if not st.session_state.use_reddit and not st.session_state.use_youtube:
        st.warning("Please select at least one data source")
        return

    with st.spinner(f"Fetching data for '{st.session_state.search_keyword}'..."):
        # Fetch data
        dfs = []
        if st.session_state.use_reddit:
            reddit_df = fetch_reddit_data(
                st.session_state.search_keyword,
                st.session_state.max_results
            )
            if not reddit_df.empty:
                dfs.append(reddit_df)
        if st.session_state.use_youtube:
            youtube_df = fetch_youtube_data(
                st.session_state.search_keyword,
                st.session_state.max_results
            )
            if not youtube_df.empty:
                dfs.append(youtube_df)
        if not dfs:
            st.error("No data found. Try different keywords or sources.")
            return
        df = pd.concat(dfs, ignore_index=True)

    # Analyze sentiment
    with st.spinner("Analyzing sentiment..."):
        sentiment_results = []
        for text in df['text']:
            res = analyze_sentiment(text)
            sentiment_results.append({
                'vader': res['vader'],
                'bert': res['bert'],
                'textblob': res['textblob'],
                'average_sentiment': np.mean([res['vader'], res['bert'], res['textblob']])
            })
        sentiment_df = pd.DataFrame(sentiment_results)
        df = pd.concat([df, sentiment_df], axis=1)

        # Keep only content from the last 60 days
        df = df[df['date'] >= (datetime.now() - timedelta(days=60))]
        df = df.sort_values('date')

        # Rolling mean over the last 7 posts in date order (a row-based window)
        df['rolling_sentiment'] = df['average_sentiment'].rolling(
            window=7,
            min_periods=1
        ).mean()
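        # The row-based window above treats the last 7 posts alike regardless
        # of their spacing in time; for a true 7-day window, a time-based
        # rolling mean is one option (sketch; assumes 'date' is a datetime
        # column sorted ascending, as it is here):
        #   df['rolling_sentiment'] = (
        #       df.rolling('7D', on='date')['average_sentiment'].mean()
        #   )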
    # Display results
    st.subheader(f"Results for: '{st.session_state.search_keyword}'")

    # Overall metrics
    avg_sentiment = df['average_sentiment'].mean()
    pos_pct = (df['average_sentiment'] > Config.POSITIVE_THRESHOLD).mean() * 100
    neg_pct = (df['average_sentiment'] < Config.NEGATIVE_THRESHOLD).mean() * 100
    col1, col2, col3 = st.columns(3)
    col1.metric("Average Sentiment", f"{avg_sentiment:.2f}")
    col2.metric("Positive Content", f"{pos_pct:.1f}%")
    col3.metric("Negative Content", f"{neg_pct:.1f}%")

    # Word cloud
    st.subheader("Word Cloud")
    combined_text = " ".join(df['text'])
    wc_img = create_wordcloud(combined_text)
    if wc_img:
        st.image(f"data:image/png;base64,{wc_img}", use_container_width=True)

    # Timeline visualization
    st.subheader("Sentiment Timeline")
    plot_sentiment_timeline(df)

    # Raw data
    with st.expander("View Raw Data"):
        st.dataframe(df, use_container_width=True)

if __name__ == "__main__":
    main()