import streamlit as st
from transformers import pipeline
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import pandas as pd
from datetime import datetime, timedelta
import plotly.express as px
from sklearn.ensemble import RandomForestRegressor
from wordcloud import WordCloud
import base64
from io import BytesIO
import nltk
from textblob import TextBlob
import praw
from googleapiclient.discovery import build
import os
from statsmodels.tsa.arima.model import ARIMA
from prophet import Prophet
# --------------------------
# Initial Setup & Configuration
# --------------------------
# Set page config
st.set_page_config(
    page_title="SentimentSync: Live Sentiment Analysis & Prediction Dashboard",
    layout="wide"
)
# --------------------------
# NLTK Data Download
# --------------------------
def download_nltk_data():
    try:
        nltk_data_dir = os.path.join(os.path.expanduser("~"), "nltk_data")
        if not os.path.exists(nltk_data_dir):
            os.makedirs(nltk_data_dir)
        nltk.download('punkt', download_dir=nltk_data_dir)
        nltk.download('stopwords', download_dir=nltk_data_dir)
        nltk.download('punkt_tab', download_dir=nltk_data_dir)
        nltk.data.path.append(nltk_data_dir)
    except Exception as e:
        st.error(f"Error downloading NLTK data: {str(e)}")
        return False
    return True

if not download_nltk_data():
    st.warning("Some NLTK features may not work properly without the required data files.")
# --------------------------
# Model Initialization
# --------------------------
@st.cache_resource  # load the transformer once, not on every Streamlit rerun
def load_models():
    try:
        # Initialize sentiment models
        bert_sentiment = pipeline(
            "sentiment-analysis",
            model="nlptown/bert-base-multilingual-uncased-sentiment"
        )
        vader_analyzer = SentimentIntensityAnalyzer()
        return bert_sentiment, vader_analyzer
    except Exception as e:
        st.error(f"Error loading models: {str(e)}")
        return None, None

bert_sentiment, vader_analyzer = load_models()
if bert_sentiment is None or vader_analyzer is None:
    st.stop()
# --------------------------
# API Clients Setup
# --------------------------
def setup_api_clients():
    try:
        # Credentials are read from environment variables (e.g. host/Space
        # secrets) so they are never committed in source
        reddit = praw.Reddit(
            client_id=os.environ["REDDIT_CLIENT_ID"],
            client_secret=os.environ["REDDIT_CLIENT_SECRET"],
            user_agent="SoundaryaR_Bot/1.0"
        )
        youtube = build('youtube', 'v3', developerKey=os.environ["YOUTUBE_API_KEY"])
        return reddit, youtube
    except Exception as e:
        st.error(f"Error setting up API clients: {str(e)}")
        return None, None

reddit, youtube = setup_api_clients()
if reddit is None or youtube is None:
    st.stop()
# --------------------------
# Helper Functions
# --------------------------
def bert_score(result):
    """Convert BERT label to numerical score"""
    label_map = {
        '1 star': -1,
        '2 stars': -0.5,
        '3 stars': 0,
        '4 stars': 0.5,
        '5 stars': 1
    }
    return label_map.get(result['label'], 0)
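
# Illustrative examples of the mapping above (confidence scores are hypothetical):
#   bert_score({'label': '5 stars', 'score': 0.91}) -> 1.0
#   bert_score({'label': '2 stars', 'score': 0.77}) -> -0.5
# Any label outside the five-star scheme falls back to 0 (neutral).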

def analyze_text(text):
    """Analyze sentiment using multiple models"""
    try:
        vader_score = vader_analyzer.polarity_scores(text)['compound']
        bert_result = bert_sentiment(text[:512])[0]  # Truncate to avoid token limits
        bert_num = bert_score(bert_result)
        textblob_score = TextBlob(text).sentiment.polarity
        return vader_score, bert_num, textblob_score, bert_result
    except Exception as e:
        st.error(f"Error analyzing text: {str(e)}")
        return 0, 0, 0, {'label': 'Error', 'score': 0}
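
# All three scores land on a comparable [-1, 1] scale: VADER's 'compound'
# score and TextBlob's polarity are both defined on [-1, 1], and the BERT
# star rating is mapped onto [-1, 1] by bert_score(), which is what makes
# the simple three-way average used later meaningful.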

def generate_wordcloud(text):
    """Generate word cloud image"""
    try:
        wordcloud = WordCloud(
            width=800,
            height=400,
            background_color='white',
            stopwords=set(nltk.corpus.stopwords.words('english'))
        ).generate(text)
        img = BytesIO()
        wordcloud.to_image().save(img, format='PNG')
        return base64.b64encode(img.getvalue()).decode()
    except Exception as e:
        st.error(f"Error generating word cloud: {str(e)}")
        return ""

def prepare_time_series_data(df):
    """Prepare time series data for forecasting"""
    try:
        # Resample to daily data
        ts_df = df.set_index('date').resample('D').agg({
            'Average': 'mean',
            'VADER': 'mean',
            'BERT': 'mean',
            'TextBlob': 'mean'
        }).ffill().reset_index()
        # Create features
        ts_df['day_of_week'] = ts_df['date'].dt.dayofweek
        ts_df['day_of_month'] = ts_df['date'].dt.day
        ts_df['days_since_start'] = (ts_df['date'] - ts_df['date'].min()).dt.days
        return ts_df
    except Exception as e:
        st.error(f"Error preparing time series data: {str(e)}")
        return None
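
# The engineered calendar features (days_since_start, day_of_week,
# day_of_month) exist solely for the Random Forest model below; Prophet
# and ARIMA work directly from the resampled 'Average' series.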

def predict_sentiment_prophet(df, periods=15):
    """Predict future sentiment using Facebook Prophet"""
    try:
        # Prepare data for Prophet
        prophet_df = df[['date', 'Average']].rename(columns={'date': 'ds', 'Average': 'y'})
        # Initialize and fit model; daily seasonality is disabled because the
        # series is resampled to one observation per day (no intra-day cycle)
        model = Prophet(
            daily_seasonality=False,
            weekly_seasonality=True,
            yearly_seasonality=False
        )
        model.fit(prophet_df)
        # Make future dataframe
        future = model.make_future_dataframe(periods=periods)
        # Predict
        forecast = model.predict(future)
        return forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].rename(columns={
            'ds': 'date',
            'yhat': 'predicted_sentiment',
            'yhat_lower': 'lower_bound',
            'yhat_upper': 'upper_bound'
        })
    except Exception as e:
        st.error(f"Error with Prophet prediction: {str(e)}")
        return None
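
# Prophet requires its input under the fixed column names 'ds' (datestamp)
# and 'y' (value); yhat_lower/yhat_upper form the model's uncertainty
# interval, computed at 80% width by default.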

def predict_sentiment_arima(df, periods=15):
    """Predict future sentiment using ARIMA"""
    try:
        # Fit ARIMA model
        model = ARIMA(df['Average'], order=(2, 1, 2))
        model_fit = model.fit()
        # Make predictions
        forecast = model_fit.forecast(steps=periods)
        # Create future dates
        last_date = df['date'].max()
        future_dates = [last_date + timedelta(days=i) for i in range(1, periods + 1)]
        # list() drops the forecast's offset index so values align
        # positionally with the future dates
        return pd.DataFrame({
            'date': future_dates,
            'predicted_sentiment': list(forecast),
            'model': 'ARIMA'
        })
    except Exception as e:
        st.error(f"Error with ARIMA prediction: {str(e)}")
        return None
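
# ARIMA(2, 1, 2) means: 2 autoregressive lags, 1 round of differencing, and
# 2 moving-average lags. The fixed order is a pragmatic default for a short
# daily series; a more careful fit would select the order by an information
# criterion such as AIC.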

def predict_sentiment_rf(df, periods=15):
    """Predict future sentiment using Random Forest"""
    try:
        # Prepare features
        ts_df = prepare_time_series_data(df)
        if ts_df is None or len(ts_df) < 10:
            return None
        X = ts_df[['days_since_start', 'day_of_week', 'day_of_month']]
        y = ts_df['Average']
        # Train model
        model = RandomForestRegressor(n_estimators=100, random_state=42)
        model.fit(X, y)
        # Create future features
        last_date = ts_df['date'].max()
        future_dates = [last_date + timedelta(days=i) for i in range(1, periods + 1)]
        future_days_since = [(d - ts_df['date'].min()).days for d in future_dates]
        future_X = pd.DataFrame({
            'days_since_start': future_days_since,
            'day_of_week': [d.weekday() for d in future_dates],
            'day_of_month': [d.day for d in future_dates]
        })
        # Make predictions
        predictions = model.predict(future_X)
        return pd.DataFrame({
            'date': future_dates,
            'predicted_sentiment': predictions,
            'model': 'Random Forest'
        })
    except Exception as e:
        st.error(f"Error with Random Forest prediction: {str(e)}")
        return None
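
# Caveat: tree ensembles cannot extrapolate beyond the target range seen in
# training, so this forecast tends to flatten for dates far past the history.
# It is best read as a cross-check against the Prophet and ARIMA forecasts
# rather than as a standalone prediction.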

def plot_sentiment_predictions(history_df, predictions):
    """Plot historical data and predictions"""
    try:
        # Prepare historical data
        history_df = history_df.set_index('date').resample('D')['Average'].mean().reset_index()
        # Create figure
        fig = px.line(history_df, x='date', y='Average',
                      title='Historical Sentiment & Future Predictions',
                      labels={'Average': 'Sentiment Score'})
        # Add prediction traces
        for model_name, pred_df in predictions.items():
            if pred_df is not None:
                fig.add_scatter(x=pred_df['date'], y=pred_df['predicted_sentiment'],
                                mode='lines', name=f'{model_name} Prediction',
                                line=dict(dash='dot'))
                # Add confidence interval if available: draw the upper bound
                # first, then fill the lower-bound trace up to it
                if 'lower_bound' in pred_df.columns and 'upper_bound' in pred_df.columns:
                    fig.add_scatter(x=pred_df['date'], y=pred_df['upper_bound'],
                                    mode='lines', line=dict(width=0),
                                    showlegend=False, hoverinfo='skip')
                    fig.add_scatter(x=pred_df['date'], y=pred_df['lower_bound'],
                                    mode='lines', line=dict(width=0),
                                    fill='tonexty', fillcolor='rgba(99, 110, 250, 0.2)',
                                    name=f'{model_name} interval')
        fig.update_layout(hovermode="x unified", showlegend=True)
        return fig
    except Exception as e:
        st.error(f"Error plotting predictions: {str(e)}")
        return None

# --------------------------
# Data Fetching Functions
# --------------------------
@st.cache_data(ttl=3600)  # Cache for 1 hour
def fetch_reddit_data(keyword, limit=50):
    """Fetch Reddit posts containing the keyword"""
    try:
        subreddit = reddit.subreddit("all")
        posts = subreddit.search(keyword, limit=limit)
        data = []
        for post in posts:
            data.append({
                'date': datetime.fromtimestamp(post.created_utc),
                'text': f"{post.title}\n{post.selftext}",
                'source': 'Reddit',
                'url': f"https://reddit.com{post.permalink}"
            })
        return pd.DataFrame(data)
    except Exception as e:
        st.error(f"Error fetching Reddit data: {str(e)}")
        return pd.DataFrame()
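
# PRAW paginates and rate-limits the search internally; `limit` caps the
# number of submissions returned from r/all, sorted by relevance by default.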

@st.cache_data(ttl=3600)  # Cache for 1 hour
def fetch_youtube_data(keyword, limit=50):
    """Fetch YouTube videos containing the keyword"""
    try:
        request = youtube.search().list(
            q=keyword,
            part="snippet",
            maxResults=min(limit, 50),  # the API caps maxResults at 50 per page
            type="video",
            order="relevance"
        )
        response = request.execute()
        data = []
        for item in response['items']:
            data.append({
                'date': datetime.strptime(item['snippet']['publishedAt'], '%Y-%m-%dT%H:%M:%SZ'),
                'text': f"{item['snippet']['title']}\n{item['snippet']['description']}",
                'source': 'YouTube',
                'url': f"https://youtube.com/watch?v={item['id']['videoId']}"
            })
        return pd.DataFrame(data)
    except Exception as e:
        st.error(f"Error fetching YouTube data: {str(e)}")
        return pd.DataFrame()
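
# Each search().list call costs 100 units of the YouTube Data API quota
# (10,000 units/day by default), so the hour-long cache above also keeps
# quota consumption in check.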

# --------------------------
# Visualization Functions
# --------------------------
def plot_sentiment_trends(df, keyword):
    """Plot sentiment trends over time"""
    try:
        fig = px.line(
            df,
            x='date',
            y=["VADER", "BERT", "TextBlob", "Average"],
            title=f'Sentiment Over Time for "{keyword}"',
            labels={'value': 'Sentiment Score', 'date': 'Date'},
            color_discrete_map={
                "VADER": "#636EFA",
                "BERT": "#EF553B",
                "TextBlob": "#00CC96",
                "Average": "#AB63FA"
            }
        )
        fig.update_layout(hovermode="x unified")
        st.plotly_chart(fig, use_container_width=True)
    except Exception as e:
        st.error(f"Error plotting sentiment trends: {str(e)}")

def plot_sentiment_distribution(df, keyword):
    """Plot sentiment distribution"""
    try:
        dist_values = [
            (df['Average'] > 0.1).sum(),                               # Positive
            (df['Average'] < -0.1).sum(),                              # Negative
            ((df['Average'] >= -0.1) & (df['Average'] <= 0.1)).sum()   # Neutral
        ]
        fig = px.pie(
            values=dist_values,
            names=['Positive', 'Negative', 'Neutral'],
            title=f'Sentiment Distribution for "{keyword}"',
            color=['Positive', 'Negative', 'Neutral'],
            color_discrete_map={
                'Positive': '#00CC96',
                'Negative': '#EF553B',
                'Neutral': '#636EFA'
            },
            hole=0.3
        )
        st.plotly_chart(fig, use_container_width=True)
    except Exception as e:
        st.error(f"Error plotting sentiment distribution: {str(e)}")

# --------------------------
# Main App Interface
# --------------------------
def main():
    st.title("SentimentSync: Live Sentiment Analysis & Prediction Dashboard")
    # Sidebar controls
    with st.sidebar:
        st.header("Analysis Controls")
        analysis_mode = st.radio(
            "Analysis Mode",
            ["Manual Text", "Live Data (Reddit & YouTube)"],
            index=0
        )
        if analysis_mode == "Manual Text":
            user_input = st.text_area(
                "Enter text for sentiment analysis",
                height=200,
                placeholder="Type or paste your text here..."
            )
            analyze_btn = st.button("Analyze Text")
        else:
            keyword = st.text_input(
                "Enter keyword for live data",
                placeholder="e.g., Tesla, Bitcoin, etc."
            )
            analyze_btn = st.button("Fetch & Analyze Data")
        st.markdown("---")
        st.markdown("### Settings")
        show_raw_data = st.checkbox("Show raw data", value=False)
        enable_prediction = st.checkbox("Enable sentiment prediction", value=True)
        st.markdown("---")
        # Clicking any button triggers a rerun with analyze_btn False,
        # which clears the current results
        st.button("Reset Analysis")
    # Main content area
    if analyze_btn:
        with st.spinner("Analyzing..."):
            if analysis_mode == "Manual Text":
                if not user_input or not any(c.isalpha() for c in user_input):
                    st.warning("Please enter valid text for analysis")
                    return
                # Analyze the text
                vader_score, bert_num, textblob_score, bert_result = analyze_text(user_input)
                # Display results
                st.subheader("Sentiment Analysis Results")
                cols = st.columns(3)
                cols[0].metric("VADER Score", f"{vader_score:.2f}",
                               "Positive" if vader_score > 0 else "Negative" if vader_score < 0 else "Neutral")
                cols[1].metric("BERT Sentiment", bert_result['label'], f"Confidence: {bert_result['score']:.2f}")
                cols[2].metric("TextBlob Polarity", f"{textblob_score:.2f}",
                               "Positive" if textblob_score > 0 else "Negative" if textblob_score < 0 else "Neutral")
                # Word cloud (skip rendering if generation failed)
                st.subheader("Word Cloud")
                wordcloud_b64 = generate_wordcloud(user_input)
                if wordcloud_b64:
                    st.image(f'data:image/png;base64,{wordcloud_b64}', use_container_width=True)
                # Sentence-level analysis
                try:
                    sentences = nltk.sent_tokenize(user_input)
                    if len(sentences) > 1:
                        st.subheader("Sentence-level Analysis")
                        # Synthetic one-minute-apart timestamps so the trend plot has an x-axis
                        dates = [datetime.now() - timedelta(minutes=len(sentences) - i) for i in range(len(sentences))]
                        sentence_data = [analyze_text(s) for s in sentences]
                        df = pd.DataFrame({
                            "date": dates,
                            "Sentence": sentences,
                            "VADER": [d[0] for d in sentence_data],
                            "BERT": [d[1] for d in sentence_data],
                            "TextBlob": [d[2] for d in sentence_data]
                        })
                        df["Average"] = df[["VADER", "BERT", "TextBlob"]].mean(axis=1)
                        st.dataframe(df.style.background_gradient(
                            cmap='RdYlGn',
                            subset=["VADER", "BERT", "TextBlob", "Average"],
                            vmin=-1, vmax=1
                        ), use_container_width=True)
                        plot_sentiment_trends(df, "Your Text")
                except Exception as e:
                    st.error(f"Error in sentence analysis: {str(e)}")
            else:  # Live Data mode
                if not keyword:
                    st.warning("Please enter a keyword to search")
                    return
                # Fetch data
                with st.spinner(f"Fetching data for '{keyword}'..."):
                    reddit_df = fetch_reddit_data(keyword)
                    youtube_df = fetch_youtube_data(keyword)
                if reddit_df.empty and youtube_df.empty:
                    st.error("No data found. Try a different keyword.")
                    return
                df = pd.concat([reddit_df, youtube_df], ignore_index=True)
                # Analyze sentiment for each item
                with st.spinner("Analyzing sentiment..."):
                    results = []
                    for _, row in df.iterrows():
                        vader, bert, textblob, _ = analyze_text(row['text'])
                        results.append((vader, bert, textblob))
                    df['VADER'] = [r[0] for r in results]
                    df['BERT'] = [r[1] for r in results]
                    df['TextBlob'] = [r[2] for r in results]
                    df['Average'] = df[['VADER', 'BERT', 'TextBlob']].mean(axis=1)
                # Display results
                st.subheader(f"Overall Sentiment for '{keyword}'")
                # Metrics
                avg_sentiment = df['Average'].mean()
                pos_pct = len(df[df['Average'] > 0.1]) / len(df) * 100
                neg_pct = len(df[df['Average'] < -0.1]) / len(df) * 100
                cols = st.columns(3)
                cols[0].metric("Average Sentiment", f"{avg_sentiment:.2f}",
                               "Positive" if avg_sentiment > 0 else "Negative" if avg_sentiment < 0 else "Neutral")
                cols[1].metric("Positive Content", f"{pos_pct:.1f}%")
                cols[2].metric("Negative Content", f"{neg_pct:.1f}%")
                # Word cloud (skip rendering if generation failed)
                st.subheader("Word Cloud")
                combined_text = " ".join(df['text'])
                wordcloud_b64 = generate_wordcloud(combined_text)
                if wordcloud_b64:
                    st.image(f'data:image/png;base64,{wordcloud_b64}', use_container_width=True)
                # Filter recent data (last 14 days)
                df['date'] = pd.to_datetime(df['date'])
                cutoff_date = datetime.now() - timedelta(days=14)
                df_recent = df[df['date'] >= cutoff_date].sort_values('date')
                if not df_recent.empty:
                    # Sentiment trends
                    st.subheader("Sentiment Trends (Last 14 Days)")
                    plot_sentiment_trends(df_recent, keyword)
                    # Sentiment distribution
                    st.subheader("Sentiment Distribution")
                    plot_sentiment_distribution(df_recent, keyword)
                    # Sentiment prediction
                    if enable_prediction and len(df_recent) >= 7:  # Need at least 7 days of data
                        st.subheader("Sentiment Prediction (Next 15 Days)")
                        with st.spinner("Training prediction models..."):
                            # Prepare time series data
                            ts_df = prepare_time_series_data(df_recent)
                            if ts_df is not None and len(ts_df) >= 7:
                                # Get predictions from different models
                                predictions = {
                                    'Prophet': predict_sentiment_prophet(ts_df),
                                    'ARIMA': predict_sentiment_arima(ts_df),
                                    'Random Forest': predict_sentiment_rf(ts_df)
                                }
                                # Filter out None predictions
                                valid_predictions = {k: v for k, v in predictions.items() if v is not None}
                                if valid_predictions:
                                    # Plot predictions
                                    fig = plot_sentiment_predictions(df_recent, valid_predictions)
                                    if fig:
                                        st.plotly_chart(fig, use_container_width=True)
                                    # Show prediction details; format only numeric
                                    # columns, since some frames carry a string
                                    # 'model' column
                                    st.subheader("Prediction Details")
                                    for model_name, pred_df in valid_predictions.items():
                                        st.markdown(f"**{model_name} Prediction**")
                                        detail_df = pred_df.set_index('date')
                                        numeric_cols = list(detail_df.select_dtypes(include='number').columns)
                                        st.dataframe(detail_df.style.format("{:.2f}", subset=numeric_cols),
                                                     use_container_width=True)
                                else:
                                    st.warning("Could not generate predictions with the available data.")
                            else:
                                st.warning("Not enough data points for reliable prediction. Need at least 7 days of data.")
                    # Raw data (if enabled)
                    if show_raw_data:
                        st.subheader("Raw Data")
                        st.dataframe(df_recent[['date', 'source', 'text', 'Average']], use_container_width=True)
                else:
                    st.info("No recent data found (within last 14 days).")

if __name__ == "__main__":
    main()