# --- Hugging Face Space page header (scrape residue, kept as comments) ---
# KYTHY's picture
# Update app.py
# 023dc07 verified
# raw / history blame
# 12.9 kB
import streamlit as st
from transformers import pipeline
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import pandas as pd
from datetime import datetime, timedelta
import plotly.graph_objects as go
from wordcloud import WordCloud
import base64
from io import BytesIO
import nltk
from textblob import TextBlob
import os
import time
from functools import lru_cache
import numpy as np
from sklearn.linear_model import Ridge
from sklearn.preprocessing import PolynomialFeatures
from sklearn.pipeline import make_pipeline
import feedparser
# --------------------------
# Initial Setup
# --------------------------
# Must run before any other Streamlit call; sets the browser tab
# title/icon and switches to the full-width page layout.
# NOTE(review): the emoji strings look mojibake-encoded ("πŸš€") — left
# byte-identical here; confirm intended characters against the original file.
st.set_page_config(
    page_title="πŸš€ SentimentSync Pro",
    page_icon="πŸ“ˆ",
    layout="wide"
)
# --------------------------
# Performance Optimizations
# --------------------------
@st.cache_resource
def load_models():
    """Load and cache the sentiment models used by the app.

    Returns:
        tuple: (bert_pipeline, vader_analyzer), or (None, None) when
        loading fails (the error is surfaced in the Streamlit UI).
    """
    bar = st.progress(0, text="Loading sentiment models...")
    try:
        with st.spinner("Loading BERT model..."):
            bert = pipeline(
                "sentiment-analysis",
                model="nlptown/bert-base-multilingual-uncased-sentiment"
            )
        bar.progress(50)
        with st.spinner("Loading VADER analyzer..."):
            vader = SentimentIntensityAnalyzer()
        bar.progress(100)
        return bert, vader
    except Exception as exc:
        st.error(f"Model loading failed: {str(exc)}")
        return None, None
# --------------------------
# Fetch Financial News
# --------------------------
@st.cache_data(ttl=3600, show_spinner="Fetching financial news...")
def fetch_financial_news(keyword, limit=30):
    """Fetch recent financial news (past 7 days) using Google News RSS.

    Args:
        keyword: Search term (company, ticker, asset name...).
        limit: Maximum number of articles to return.

    Returns:
        pandas.DataFrame with 'date', 'text', 'source' and 'url' columns;
        empty DataFrame on failure or when nothing recent is found.
    """
    try:
        base_url = "https://news.google.com/rss/search"
        query = f"{keyword}+finance+stock"
        feed_url = f"{base_url}?q={query}&hl=en-US&gl=US&ceid=US:en"
        parsed = feedparser.parse(feed_url)
        cutoff = datetime.now() - timedelta(days=7)
        rows = []
        for item in parsed.entries:
            # Prefer the published timestamp, fall back to the updated one;
            # skip entries that carry neither.
            stamp = getattr(item, 'published_parsed', None) or \
                    getattr(item, 'updated_parsed', None)
            if not stamp:
                continue
            published = datetime(*stamp[:6])
            if published < cutoff:
                continue
            if hasattr(item, 'summary'):
                body = f"{item.title}\n{item.summary}"
            else:
                body = item.title
            rows.append({
                'date': published,
                'text': body,
                'source': 'Financial News',
                'url': item.link
            })
            if len(rows) >= limit:
                break
        return pd.DataFrame(rows)
    except Exception as exc:
        st.error(f"News fetch error: {str(exc)}")
        return pd.DataFrame()
# --------------------------
# Sentiment Analysis
# --------------------------
def analyze_text(text, models):
    """Score *text* with VADER, BERT and TextBlob.

    Args:
        text: Raw text to analyze (may be None or blank).
        models: (bert_pipeline, vader_analyzer) as returned by load_models().

    Returns:
        dict with 'vader', 'bert' and 'textblob' polarity scores (roughly
        in [-1, 1]) plus the raw BERT label and its confidence. Blank input
        yields an all-zero neutral result; errors yield a zeroed result
        labelled 'Error'.
    """
    bert_model, vader = models
    snippet = text[:2000] if text else ""  # cap length to keep analysis fast
    try:
        if not snippet.strip():
            return {
                'vader': 0,
                'bert': 0,
                'textblob': 0,
                'bert_label': 'Neutral',
                'bert_confidence': 0
            }
        vader_compound = vader.polarity_scores(snippet)['compound']
        blob_polarity = TextBlob(snippet).sentiment.polarity
        prediction = bert_model(snippet[:512])[0]  # BERT token/length limit
        # Map the model's star-rating labels onto a [-1, 1] scale.
        star_to_score = {
            '1 star': -1,
            '2 stars': -0.5,
            '3 stars': 0,
            '4 stars': 0.5,
            '5 stars': 1
        }
        return {
            'vader': vader_compound,
            'bert': star_to_score.get(prediction['label'], 0),
            'textblob': blob_polarity,
            'bert_label': prediction['label'],
            'bert_confidence': prediction['score']
        }
    except Exception as exc:
        st.error(f"Analysis error: {str(exc)}")
        return {
            'vader': 0,
            'bert': 0,
            'textblob': 0,
            'bert_label': 'Error',
            'bert_confidence': 0
        }
# --------------------------
# Visualization
# --------------------------
def generate_wordcloud(text):
    """Render *text* as a word cloud and return it as a base64 PNG string.

    Returns an empty string for blank input or on any rendering error
    (the error is shown in the Streamlit UI).
    """
    try:
        if not text.strip():
            return ""
        cloud = WordCloud(
            width=800,
            height=400,
            background_color='white',
            collocations=False,  # avoid duplicated bigram tokens
            stopwords=nltk.corpus.stopwords.words('english')
        ).generate(text)
        buffer = BytesIO()
        cloud.to_image().save(buffer, format='PNG')
        return base64.b64encode(buffer.getvalue()).decode()
    except Exception as exc:
        st.error(f"Word cloud generation error: {str(exc)}")
        return ""
# --------------------------
# Prediction & Plotting
# --------------------------
def prepare_data_for_prediction(data):
    """Aggregate article-level sentiment into a daily series for modelling.

    Args:
        data: DataFrame with at least 'date' and 'average' columns.

    Returns:
        DataFrame with 'date', daily-mean 'average' and a numeric 'days'
        offset column (days since the first date), or None when fewer
        than 5 valid daily points are available.
    """
    try:
        if data.empty:
            st.warning("No data available for prediction")
            return None
        ordered = data.sort_values('date').dropna(subset=['average'])
        # Collapse to one mean sentiment value per calendar day.
        daily = (
            ordered.groupby(pd.Grouper(key='date', freq='D'))['average']
            .mean()
            .reset_index()
            .dropna(subset=['average'])
        )
        if len(daily) < 5:
            st.warning("Insufficient valid data points for prediction (minimum 5 required)")
            return None
        daily['days'] = (daily['date'] - daily['date'].min()).dt.days
        return daily
    except Exception as exc:
        st.error(f"Data preparation error: {str(exc)}")
        return None
def train_sentiment_model(data):
    """Fit a degree-2 polynomial ridge regression of sentiment vs. day index.

    Args:
        data: Output of prepare_data_for_prediction() ('days', 'average').

    Returns:
        (fitted_model, data) on success, (None, None) on bad input or failure.
    """
    try:
        if data is None or len(data) < 5:
            return None, None
        features = data['days'].values.reshape(-1, 1)
        targets = data['average'].values
        # Ridge regularization keeps the quadratic fit stable on few points.
        regressor = make_pipeline(PolynomialFeatures(degree=2), Ridge(alpha=1.0))
        regressor.fit(features, targets)
        return regressor, data
    except Exception as exc:
        st.error(f"Model training error: {str(exc)}")
        return None, None
def predict_future_sentiment(model, training_data, days_to_predict=15):
    """Extend the daily sentiment series *days_to_predict* days ahead.

    Args:
        model: Fitted regressor exposing predict(X) on day offsets.
        training_data: Daily DataFrame with 'date', 'days', 'average'.
        days_to_predict: Forecast horizon in days.

    Returns:
        Training rows (type='actual') concatenated with forecast rows
        (type='prediction'), or None when inputs are missing or
        prediction fails.
    """
    try:
        if model is None or training_data is None:
            return None
        start = training_data['date'].min()
        horizon = [training_data['date'].max() + timedelta(days=offset)
                   for offset in range(1, days_to_predict + 1)]
        # Express future dates on the same day-offset axis used for training.
        day_offsets = np.array([(d - start).days for d in horizon]).reshape(-1, 1)
        forecast = model.predict(day_offsets)
        future_df = pd.DataFrame({
            'date': horizon,
            'average': forecast,
            'type': 'prediction'
        })
        history_df = training_data.copy()
        history_df['type'] = 'actual'
        return pd.concat([history_df, future_df], ignore_index=True)
    except Exception as exc:
        st.error(f"Prediction error: {str(exc)}")
        return None
def plot_sentiment(data, keyword):
    """Build a Plotly chart of actual vs. predicted daily sentiment.

    Args:
        data: Combined DataFrame from predict_future_sentiment() with a
            'type' column of 'actual'/'prediction' rows.
        keyword: Search term used in the chart title.

    Returns:
        A plotly Figure, or None when there is nothing to plot or an
        error occurs (a message is shown in the Streamlit UI instead).
    """
    try:
        if data is None or data.empty:
            st.warning("No data available for plotting sentiment trends")
            return None
        history = data[data['type'] == 'actual']
        forecast = data[data['type'] == 'prediction']
        fig = go.Figure()
        if not history.empty:
            fig.add_trace(go.Scatter(
                x=history['date'],
                y=history['average'],
                name='Actual Sentiment',
                mode='lines+markers',
                line=dict(color='#636EFA')
            ))
        if not forecast.empty:
            # Dotted line visually separates the forecast from observed data.
            fig.add_trace(go.Scatter(
                x=forecast['date'],
                y=forecast['average'],
                name='Predicted Sentiment',
                mode='lines+markers',
                line=dict(color='#EF553B', dash='dot')
            ))
        fig.update_layout(
            title=f'Sentiment Analysis and Prediction for "{keyword}"',
            xaxis_title="Date",
            yaxis_title="Sentiment Score",
            hovermode="x unified",
            legend_title="Data Type"
        )
        return fig
    except Exception as exc:
        st.error(f"Plotting error: {str(exc)}")
        return None
# --------------------------
# Main App
# --------------------------
def main():
    """Streamlit entry point: sidebar controls plus two analysis modes
    (free-text analysis, or fetch-and-analyze financial news)."""
    st.title("πŸš€ SentimentSync Pro - Financial News Sentiment Dashboard")
    # --- Sidebar: mode selection and options ------------------------------
    with st.sidebar:
        st.header("πŸ”§ Analysis Controls")
        analysis_mode = st.radio(
            "Mode",
            ["Text Analysis", "Financial News Analysis"],
            index=1
        )
        if analysis_mode == "Text Analysis":
            user_input = st.text_area("Enter text to analyze", height=200, placeholder="Paste your content here...")
            analyze_btn = st.button("Analyze Now")
        else:
            keyword = st.text_input("Enter keyword (e.g., Apple, Tesla, Bitcoin)")
            analyze_btn = st.button("Fetch & Analyze")
        st.markdown("---")
        show_details = st.checkbox("Show detailed results", value=False)
        enable_prediction = st.checkbox("Enable sentiment prediction", value=True)
        st.markdown("---")
    if analyze_btn:
        models = load_models()
        if not all(models):
            st.error("Model loading failed")
            return
        if analysis_mode == "Text Analysis":
            # --- Single-text mode ---------------------------------------
            if not user_input.strip():
                st.warning("Please enter some text")
                return
            with st.spinner("Analyzing..."):
                result = analyze_text(user_input, models)
            st.success("βœ… Analysis completed")
            cols = st.columns(3)
            cols[0].metric("VADER Score", f"{result['vader']:.2f}")
            cols[1].metric("BERT Label", result['bert_label'])
            cols[2].metric("TextBlob", f"{result['textblob']:.2f}")
            st.subheader("πŸ“Š Word Cloud")
            # NOTE(review): generate_wordcloud() returns "" on failure, which
            # makes this an invalid data URI — confirm desired fallback.
            wc_img = f"data:image/png;base64,{generate_wordcloud(user_input)}"
            st.image(wc_img, use_column_width=True)
        else:
            # --- Financial-news mode ------------------------------------
            if not keyword.strip():
                st.warning("Please enter a keyword")
                return
            with st.spinner(f"Fetching financial news for '{keyword}'..."):
                start_time = time.time()
                news_data = fetch_financial_news(keyword)
                if news_data.empty:
                    st.error("No news found for the past 7 days.")
                    return
                # Score every fetched article with all three analyzers.
                analysis_results = []
                for _, row in news_data.iterrows():
                    analysis_results.append(analyze_text(row['text'], models))
                news_data['vader'] = [r['vader'] for r in analysis_results]
                news_data['bert'] = [r['bert'] for r in analysis_results]
                news_data['textblob'] = [r['textblob'] for r in analysis_results]
                # Ensemble score: plain mean of the three analyzers.
                news_data['average'] = news_data[['vader', 'bert', 'textblob']].mean(axis=1)
                processing_time = time.time() - start_time
                st.success(f"Analyzed {len(news_data)} articles in {processing_time:.2f}s")
            # Headline metrics; |score| <= 0.1 is treated as neutral.
            avg_sentiment = news_data['average'].mean()
            cols = st.columns(3)
            cols[0].metric("Avg Sentiment", f"{avg_sentiment:.2f}")
            cols[1].metric("Positive", f"{(news_data['average'] > 0.1).mean() * 100:.1f}%")
            cols[2].metric("Negative", f"{(news_data['average'] < -0.1).mean() * 100:.1f}%")
            all_text = " ".join(news_data['text'])
            wc_img = f"data:image/png;base64,{generate_wordcloud(all_text)}"
            st.subheader("πŸ“Š Word Cloud")
            st.image(wc_img, use_column_width=True)
            if enable_prediction:
                # Aggregate to daily points, fit a trend, forecast ahead.
                daily_data = prepare_data_for_prediction(news_data)
                model, training_data = train_sentiment_model(daily_data)
                if model is not None:
                    full_data = predict_future_sentiment(model, training_data)
                    fig = plot_sentiment(full_data, keyword)
                    st.plotly_chart(fig, use_container_width=True)
            if show_details:
                st.subheader("πŸ“° Detailed News Data")
                st.dataframe(news_data[['date', 'source', 'text', 'average', 'url']], use_container_width=True)
if __name__ == "__main__":
    # Best-effort NLTK setup: expose the per-user data dir and fetch the
    # English stopwords used by generate_wordcloud(). Failure is non-fatal
    # (word-cloud generation has its own error handling), but the original
    # bare `except:` also swallowed KeyboardInterrupt/SystemExit — narrow
    # it to Exception while keeping the deliberate best-effort behavior.
    try:
        nltk.data.path.append(os.path.join(os.path.expanduser("~"), "nltk_data"))
        nltk.download('stopwords', quiet=True)
    except Exception:
        pass
    main()