# SentimentSync Pro — Streamlit dashboard for financial news sentiment analysis.
import base64
import os
import time
from datetime import datetime, timedelta
from functools import lru_cache
from io import BytesIO
from urllib.parse import quote_plus

import feedparser
import nltk
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import streamlit as st
from sklearn.linear_model import Ridge
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import PolynomialFeatures
from textblob import TextBlob
from transformers import pipeline
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from wordcloud import WordCloud
| # -------------------------- | |
| # Initial Setup | |
| # -------------------------- | |
| st.set_page_config( | |
| page_title="π SentimentSync Pro", | |
| page_icon="π", | |
| layout="wide" | |
| ) | |
| # -------------------------- | |
| # Performance Optimizations | |
| # -------------------------- | |
| def load_models(): | |
| """Load models with progress indicators""" | |
| progress = st.progress(0, text="Loading sentiment models...") | |
| try: | |
| with st.spinner("Loading BERT model..."): | |
| bert_sentiment = pipeline( | |
| "sentiment-analysis", | |
| model="nlptown/bert-base-multilingual-uncased-sentiment" | |
| ) | |
| progress.progress(50) | |
| with st.spinner("Loading VADER analyzer..."): | |
| vader_analyzer = SentimentIntensityAnalyzer() | |
| progress.progress(100) | |
| return bert_sentiment, vader_analyzer | |
| except Exception as e: | |
| st.error(f"Model loading failed: {str(e)}") | |
| return None, None | |
| # -------------------------- | |
| # Fetch Financial News | |
| # -------------------------- | |
| def fetch_financial_news(keyword, limit=30): | |
| """Fetch recent financial news (past 7 days) using Google News RSS""" | |
| try: | |
| base_url = "https://news.google.com/rss/search" | |
| query = f"{keyword}+finance+stock" | |
| feed_url = f"{base_url}?q={query}&hl=en-US&gl=US&ceid=US:en" | |
| feed = feedparser.parse(feed_url) | |
| seven_days_ago = datetime.now() - timedelta(days=7) | |
| articles = [] | |
| for entry in feed.entries: | |
| published = None | |
| if hasattr(entry, 'published_parsed') and entry.published_parsed: | |
| published = datetime(*entry.published_parsed[:6]) | |
| elif hasattr(entry, 'updated_parsed') and entry.updated_parsed: | |
| published = datetime(*entry.updated_parsed[:6]) | |
| else: | |
| continue | |
| if published < seven_days_ago: | |
| continue | |
| text = f"{entry.title}\n{entry.summary}" if hasattr(entry, 'summary') else entry.title | |
| articles.append({ | |
| 'date': published, | |
| 'text': text, | |
| 'source': 'Financial News', | |
| 'url': entry.link | |
| }) | |
| if len(articles) >= limit: | |
| break | |
| return pd.DataFrame(articles) | |
| except Exception as e: | |
| st.error(f"News fetch error: {str(e)}") | |
| return pd.DataFrame() | |
| # -------------------------- | |
| # Sentiment Analysis | |
| # -------------------------- | |
| def analyze_text(text, models): | |
| bert_sentiment, vader_analyzer = models | |
| truncated_text = text[:2000] if text else "" | |
| try: | |
| if not truncated_text.strip(): | |
| return { | |
| 'vader': 0, | |
| 'bert': 0, | |
| 'textblob': 0, | |
| 'bert_label': 'Neutral', | |
| 'bert_confidence': 0 | |
| } | |
| vader_score = vader_analyzer.polarity_scores(truncated_text)['compound'] | |
| textblob_score = TextBlob(truncated_text).sentiment.polarity | |
| bert_result = bert_sentiment(truncated_text[:512])[0] | |
| label_map = { | |
| '1 star': -1, | |
| '2 stars': -0.5, | |
| '3 stars': 0, | |
| '4 stars': 0.5, | |
| '5 stars': 1 | |
| } | |
| bert_num = label_map.get(bert_result['label'], 0) | |
| return { | |
| 'vader': vader_score, | |
| 'bert': bert_num, | |
| 'textblob': textblob_score, | |
| 'bert_label': bert_result['label'], | |
| 'bert_confidence': bert_result['score'] | |
| } | |
| except Exception as e: | |
| st.error(f"Analysis error: {str(e)}") | |
| return { | |
| 'vader': 0, | |
| 'bert': 0, | |
| 'textblob': 0, | |
| 'bert_label': 'Error', | |
| 'bert_confidence': 0 | |
| } | |
| # -------------------------- | |
| # Visualization | |
| # -------------------------- | |
| def generate_wordcloud(text): | |
| try: | |
| if not text.strip(): | |
| return "" | |
| wordcloud = WordCloud( | |
| width=800, | |
| height=400, | |
| background_color='white', | |
| collocations=False, | |
| stopwords=nltk.corpus.stopwords.words('english') | |
| ).generate(text) | |
| img = BytesIO() | |
| wordcloud.to_image().save(img, format='PNG') | |
| return base64.b64encode(img.getvalue()).decode() | |
| except Exception as e: | |
| st.error(f"Word cloud generation error: {str(e)}") | |
| return "" | |
| # -------------------------- | |
| # Prediction & Plotting | |
| # -------------------------- | |
| def prepare_data_for_prediction(data): | |
| try: | |
| if data.empty: | |
| st.warning("No data available for prediction") | |
| return None | |
| data = data.sort_values('date') | |
| data = data.dropna(subset=['average']) | |
| daily_data = data.groupby(pd.Grouper(key='date', freq='D'))['average'].mean().reset_index() | |
| daily_data = daily_data.dropna(subset=['average']) | |
| if len(daily_data) < 5: | |
| st.warning("Insufficient valid data points for prediction (minimum 5 required)") | |
| return None | |
| daily_data['days'] = (daily_data['date'] - daily_data['date'].min()).dt.days | |
| return daily_data | |
| except Exception as e: | |
| st.error(f"Data preparation error: {str(e)}") | |
| return None | |
| def train_sentiment_model(data): | |
| try: | |
| if data is None or len(data) < 5: | |
| return None, None | |
| X = data['days'].values.reshape(-1, 1) | |
| y = data['average'].values | |
| model = make_pipeline(PolynomialFeatures(degree=2), Ridge(alpha=1.0)) | |
| model.fit(X, y) | |
| return model, data | |
| except Exception as e: | |
| st.error(f"Model training error: {str(e)}") | |
| return None, None | |
| def predict_future_sentiment(model, training_data, days_to_predict=15): | |
| try: | |
| if model is None or training_data is None: | |
| return None | |
| last_date = training_data['date'].max() | |
| future_dates = [last_date + timedelta(days=i) for i in range(1, days_to_predict + 1)] | |
| min_date = training_data['date'].min() | |
| future_days = [(date - min_date).days for date in future_dates] | |
| X_future = np.array(future_days).reshape(-1, 1) | |
| predictions = model.predict(X_future) | |
| pred_df = pd.DataFrame({ | |
| 'date': future_dates, | |
| 'average': predictions, | |
| 'type': 'prediction' | |
| }) | |
| training_df = training_data.copy() | |
| training_df['type'] = 'actual' | |
| return pd.concat([training_df, pred_df], ignore_index=True) | |
| except Exception as e: | |
| st.error(f"Prediction error: {str(e)}") | |
| return None | |
| def plot_sentiment(data, keyword): | |
| try: | |
| if data is None or data.empty: | |
| st.warning("No data available for plotting sentiment trends") | |
| return None | |
| actual_data = data[data['type'] == 'actual'] | |
| pred_data = data[data['type'] == 'prediction'] | |
| fig = go.Figure() | |
| if not actual_data.empty: | |
| fig.add_trace(go.Scatter( | |
| x=actual_data['date'], | |
| y=actual_data['average'], | |
| name='Actual Sentiment', | |
| mode='lines+markers', | |
| line=dict(color='#636EFA') | |
| )) | |
| if not pred_data.empty: | |
| fig.add_trace(go.Scatter( | |
| x=pred_data['date'], | |
| y=pred_data['average'], | |
| name='Predicted Sentiment', | |
| mode='lines+markers', | |
| line=dict(color='#EF553B', dash='dot') | |
| )) | |
| fig.update_layout( | |
| title=f'Sentiment Analysis and Prediction for "{keyword}"', | |
| xaxis_title="Date", | |
| yaxis_title="Sentiment Score", | |
| hovermode="x unified", | |
| legend_title="Data Type" | |
| ) | |
| return fig | |
| except Exception as e: | |
| st.error(f"Plotting error: {str(e)}") | |
| return None | |
| # -------------------------- | |
| # Main App | |
| # -------------------------- | |
| def main(): | |
| st.title("π SentimentSync Pro - Financial News Sentiment Dashboard") | |
| with st.sidebar: | |
| st.header("π§ Analysis Controls") | |
| analysis_mode = st.radio( | |
| "Mode", | |
| ["Text Analysis", "Financial News Analysis"], | |
| index=1 | |
| ) | |
| if analysis_mode == "Text Analysis": | |
| user_input = st.text_area("Enter text to analyze", height=200, placeholder="Paste your content here...") | |
| analyze_btn = st.button("Analyze Now") | |
| else: | |
| keyword = st.text_input("Enter keyword (e.g., Apple, Tesla, Bitcoin)") | |
| analyze_btn = st.button("Fetch & Analyze") | |
| st.markdown("---") | |
| show_details = st.checkbox("Show detailed results", value=False) | |
| enable_prediction = st.checkbox("Enable sentiment prediction", value=True) | |
| st.markdown("---") | |
| if analyze_btn: | |
| models = load_models() | |
| if not all(models): | |
| st.error("Model loading failed") | |
| return | |
| if analysis_mode == "Text Analysis": | |
| if not user_input.strip(): | |
| st.warning("Please enter some text") | |
| return | |
| with st.spinner("Analyzing..."): | |
| result = analyze_text(user_input, models) | |
| st.success("β Analysis completed") | |
| cols = st.columns(3) | |
| cols[0].metric("VADER Score", f"{result['vader']:.2f}") | |
| cols[1].metric("BERT Label", result['bert_label']) | |
| cols[2].metric("TextBlob", f"{result['textblob']:.2f}") | |
| st.subheader("π Word Cloud") | |
| wc_img = f"data:image/png;base64,{generate_wordcloud(user_input)}" | |
| st.image(wc_img, use_column_width=True) | |
| else: | |
| if not keyword.strip(): | |
| st.warning("Please enter a keyword") | |
| return | |
| with st.spinner(f"Fetching financial news for '{keyword}'..."): | |
| start_time = time.time() | |
| news_data = fetch_financial_news(keyword) | |
| if news_data.empty: | |
| st.error("No news found for the past 7 days.") | |
| return | |
| analysis_results = [] | |
| for _, row in news_data.iterrows(): | |
| analysis_results.append(analyze_text(row['text'], models)) | |
| news_data['vader'] = [r['vader'] for r in analysis_results] | |
| news_data['bert'] = [r['bert'] for r in analysis_results] | |
| news_data['textblob'] = [r['textblob'] for r in analysis_results] | |
| news_data['average'] = news_data[['vader', 'bert', 'textblob']].mean(axis=1) | |
| processing_time = time.time() - start_time | |
| st.success(f"Analyzed {len(news_data)} articles in {processing_time:.2f}s") | |
| avg_sentiment = news_data['average'].mean() | |
| cols = st.columns(3) | |
| cols[0].metric("Avg Sentiment", f"{avg_sentiment:.2f}") | |
| cols[1].metric("Positive", f"{(news_data['average'] > 0.1).mean() * 100:.1f}%") | |
| cols[2].metric("Negative", f"{(news_data['average'] < -0.1).mean() * 100:.1f}%") | |
| all_text = " ".join(news_data['text']) | |
| wc_img = f"data:image/png;base64,{generate_wordcloud(all_text)}" | |
| st.subheader("π Word Cloud") | |
| st.image(wc_img, use_column_width=True) | |
| if enable_prediction: | |
| daily_data = prepare_data_for_prediction(news_data) | |
| model, training_data = train_sentiment_model(daily_data) | |
| if model is not None: | |
| full_data = predict_future_sentiment(model, training_data) | |
| fig = plot_sentiment(full_data, keyword) | |
| st.plotly_chart(fig, use_container_width=True) | |
| if show_details: | |
| st.subheader("π° Detailed News Data") | |
| st.dataframe(news_data[['date', 'source', 'text', 'average', 'url']], use_container_width=True) | |
| if __name__ == "__main__": | |
| try: | |
| nltk.data.path.append(os.path.join(os.path.expanduser("~"), "nltk_data")) | |
| nltk.download('stopwords', quiet=True) | |
| except: | |
| pass | |
| main() | |