import streamlit as st
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import nltk
from nltk.sentiment import SentimentIntensityAnalyzer
from textblob import TextBlob
from transformers import pipeline
from wordcloud import WordCloud
import base64
from io import BytesIO
import plotly.express as px
import praw
from googleapiclient.discovery import build
import os
# --------------------------
# Initial Setup
# --------------------------
# Configure page
st.set_page_config(
    page_title="SentimentSync Pro",
    page_icon="📈",
    layout="wide",
    initial_sidebar_state="expanded"
)
# --------------------------
# Configuration
# --------------------------
class Config:
    # API keys - read from environment variables; the defaults are placeholders
    YOUTUBE_API_KEY = os.getenv("YT_API_KEY", "your_youtube_api_key_here")
    REDDIT_CLIENT_ID = os.getenv("REDDIT_CLIENT_ID", "your_reddit_client_id")
    REDDIT_CLIENT_SECRET = os.getenv("REDDIT_CLIENT_SECRET", "your_reddit_secret")
    REDDIT_USER_AGENT = "SentimentAnalysisBot/1.0"

    # NLTK data path
    NLTK_DATA_PATH = os.path.join(os.path.expanduser("~"), "nltk_data")

    # Sentiment thresholds for classifying content as positive or negative
    POSITIVE_THRESHOLD = 0.1
    NEGATIVE_THRESHOLD = -0.1
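
# The keys above are read from the environment, so real credentials never need
# to be committed. Illustrative shell setup (not part of this file):
#   export YT_API_KEY="..." REDDIT_CLIENT_ID="..." REDDIT_CLIENT_SECRET="..."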
# --------------------------
# Initialize Resources
# --------------------------
def initialize_resources():
    """Initialize all required resources with proper error handling."""
    # Streamlit reruns this script on every interaction; skip the expensive
    # setup (NLTK downloads, BERT pipeline load) if it already succeeded.
    if st.session_state.get("initialized"):
        return True

    try:
        # Setup NLTK data
        os.makedirs(Config.NLTK_DATA_PATH, exist_ok=True)
        nltk.data.path.append(Config.NLTK_DATA_PATH)
        # Each package lives under a different NLTK data directory, so look
        # it up by its actual path before deciding to download.
        required_nltk = {
            'punkt': 'tokenizers/punkt',
            'stopwords': 'corpora/stopwords',
            'vader_lexicon': 'sentiment/vader_lexicon'
        }
        for package, path in required_nltk.items():
            try:
                nltk.data.find(path)
            except LookupError:
                nltk.download(package, download_dir=Config.NLTK_DATA_PATH)
    except Exception as e:
        st.error(f"NLTK initialization failed: {str(e)}")
        return False

    # Initialize sentiment analyzers
    try:
        st.session_state.vader = SentimentIntensityAnalyzer()
        st.session_state.bert = pipeline(
            "sentiment-analysis",
            model="nlptown/bert-base-multilingual-uncased-sentiment"
        )
    except Exception as e:
        st.error(f"Model initialization failed: {str(e)}")
        return False

    # Initialize API clients
    try:
        st.session_state.reddit = praw.Reddit(
            client_id=Config.REDDIT_CLIENT_ID,
            client_secret=Config.REDDIT_CLIENT_SECRET,
            user_agent=Config.REDDIT_USER_AGENT
        )
    except Exception as e:
        st.error(f"Reddit client initialization failed: {str(e)}")
        st.session_state.reddit = None

    try:
        if Config.YOUTUBE_API_KEY.startswith("your_"):
            st.session_state.youtube = None
        else:
            st.session_state.youtube = build(
                'youtube',
                'v3',
                developerKey=Config.YOUTUBE_API_KEY,
                cache_discovery=False
            )
    except Exception as e:
        st.error(f"YouTube client initialization failed: {str(e)}")
        st.session_state.youtube = None

    st.session_state.initialized = True
    return True
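
# Run the one-time setup as soon as the script loads; the check below stops
# the Streamlit run if anything critical failed.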
if not initialize_resources():
    st.error("Critical initialization failed. Check error messages above.")
    st.stop()
# --------------------------
# Core Functions
# --------------------------
def analyze_sentiment(text):
    """Analyze text using multiple sentiment models."""
    results = {
        'vader': 0,
        'bert': 0,
        'textblob': 0,
        'bert_label': 'Error',
        'bert_score': 0
    }
    try:
        # VADER compound score, already in [-1, 1]
        results['vader'] = st.session_state.vader.polarity_scores(text)['compound']

        # BERT (crude character truncation to keep input within the model's limit)
        bert_result = st.session_state.bert(text[:512])[0]
        results['bert_label'] = bert_result['label']
        results['bert_score'] = bert_result['score']

        # Map the 1-5 star label onto [-1, 1] so the models are comparable
        label_map = {
            '1 star': -1,
            '2 stars': -0.5,
            '3 stars': 0,
            '4 stars': 0.5,
            '5 stars': 1
        }
        results['bert'] = label_map.get(bert_result['label'], 0)

        # TextBlob polarity, also in [-1, 1]
        results['textblob'] = TextBlob(text).sentiment.polarity
    except Exception as e:
        st.error(f"Sentiment analysis error: {str(e)}")
    return results
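
# Illustrative only (exact scores depend on the models), but the returned dict
# looks like:
#   analyze_sentiment("Great product, works perfectly!")
#   -> {'vader': 0.84, 'bert': 1, 'textblob': 0.55,
#       'bert_label': '5 stars', 'bert_score': 0.92}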
def fetch_youtube_data(keyword, max_results=25):
    """Fetch YouTube videos matching a keyword, with enhanced error handling."""
    if st.session_state.youtube is None:
        st.warning("YouTube API not configured")
        return pd.DataFrame()
    try:
        # Search for videos
        search_response = st.session_state.youtube.search().list(
            q=keyword,
            part="snippet",
            maxResults=max_results,
            type="video",
            order="relevance",
            safeSearch="moderate"
        ).execute()

        video_ids = [item['id']['videoId'] for item in search_response['items']]
        if not video_ids:
            return pd.DataFrame()

        # Get video details and statistics
        videos_response = st.session_state.youtube.videos().list(
            part="snippet,statistics",
            id=",".join(video_ids)
        ).execute()

        # Process results
        data = []
        for item in videos_response['items']:
            snippet = item['snippet']
            stats = item.get('statistics', {})
            data.append({
                'source': 'YouTube',
                'date': datetime.strptime(snippet['publishedAt'], '%Y-%m-%dT%H:%M:%SZ'),
                'title': snippet['title'],
                'text': f"{snippet['title']}\n{snippet['description']}",
                'url': f"https://youtu.be/{item['id']}",
                'views': int(stats.get('viewCount', 0)),
                'likes': int(stats.get('likeCount', 0)),
                'comments': int(stats.get('commentCount', 0)),
                'thumbnail': snippet['thumbnails']['default']['url']
            })
        return pd.DataFrame(data)
    except Exception as e:
        st.error(f"Error fetching YouTube data: {str(e)}")
        return pd.DataFrame()
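
# Usage sketch (hypothetical keyword): fetch_youtube_data("electric cars", 10)
# yields one row per matching video, or an empty DataFrame on any API failure.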
def fetch_reddit_data(keyword, limit=50):
    """Fetch Reddit posts with error handling."""
    if st.session_state.reddit is None:
        st.warning("Reddit API not configured")
        return pd.DataFrame()
    try:
        posts = st.session_state.reddit.subreddit("all").search(
            query=keyword,
            limit=limit,
            time_filter="month"
        )
        data = []
        for post in posts:
            data.append({
                'source': 'Reddit',
                'date': datetime.fromtimestamp(post.created_utc),
                'title': post.title,
                'text': f"{post.title}\n\n{post.selftext}",
                'url': f"https://reddit.com{post.permalink}",
                'upvotes': post.score,
                'comments': post.num_comments,
                'thumbnail': post.thumbnail if post.thumbnail not in ['self', 'default'] else None
            })
        return pd.DataFrame(data)
    except Exception as e:
        st.error(f"Error fetching Reddit data: {str(e)}")
        return pd.DataFrame()
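
# Both fetchers share the core columns 'source', 'date', 'title', 'text',
# 'url', 'comments' and 'thumbnail' (plus source-specific engagement counts),
# which is what lets analyze_live_data() concatenate them into one frame.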
# --------------------------
# Visualization Functions
# --------------------------
def create_wordcloud(text):
    """Generate a word cloud and return it as a base64-encoded PNG."""
    try:
        wc = WordCloud(
            width=800,
            height=400,
            background_color='white',
            stopwords=set(nltk.corpus.stopwords.words('english')),
            collocations=False
        ).generate(text)
        img = BytesIO()
        wc.to_image().save(img, format='PNG')
        return base64.b64encode(img.getvalue()).decode()
    except Exception as e:
        st.error(f"Word cloud error: {str(e)}")
        return None
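
# The returned base64 string is meant for an inline data URI, e.g.
#   st.image(f"data:image/png;base64,{wc_img}")
# Callers should also guard against the None returned on failure, as done below.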
def plot_sentiment_timeline(df):
    """Interactive timeline plot of sentiment."""
    try:
        fig = px.line(
            df,
            x='date',
            y='average_sentiment',
            color='source',
            title='Sentiment Over Time',
            labels={'average_sentiment': 'Sentiment Score', 'date': 'Date'},
            hover_data=['title', 'source', 'url'],
            template='plotly_white'
        )
        fig.update_traces(mode='markers+lines')
        fig.update_layout(hovermode='x unified')
        st.plotly_chart(fig, use_container_width=True)
    except Exception as e:
        st.error(f"Plotting error: {str(e)}")
# --------------------------
# UI Components
# --------------------------
def sidebar_controls():
    """Render sidebar controls."""
    with st.sidebar:
        st.title("🔧 Controls")
        # The key stores the selection in st.session_state.analysis_mode
        st.radio(
            "Analysis Mode",
            ["Text Input", "Live Data"],
            index=0,
            key='analysis_mode'
        )
        if st.session_state.analysis_mode == "Text Input":
            st.session_state.user_text = st.text_area(
                "Enter your text:",
                height=200,
                placeholder="Type or paste text here..."
            )
        else:
            st.session_state.search_keyword = st.text_input(
                "Search keyword:",
                placeholder="e.g., Tesla, AI, etc."
            )
            col1, col2 = st.columns(2)
            st.session_state.use_reddit = col1.checkbox("Reddit", True)
            st.session_state.use_youtube = col2.checkbox("YouTube", True)
            st.session_state.max_results = st.slider(
                "Max results per source:",
                10, 100, 25
            )
        st.markdown("---")
        if st.button("Analyze", type="primary"):
            st.session_state.analyze_clicked = True
        if st.button("Reset"):
            st.session_state.clear()
            st.rerun()
# --------------------------
# Main App
# --------------------------
def main():
    st.title("📊 SentimentSync Pro")
    st.caption("Advanced sentiment analysis across multiple platforms")
    sidebar_controls()

    if not st.session_state.get('analyze_clicked'):
        st.info("Configure your analysis using the sidebar controls")
        return

    # Perform analysis based on selected mode
    if st.session_state.analysis_mode == "Text Input":
        analyze_text_input()
    else:
        analyze_live_data()
def analyze_text_input():
    """Analyze manually entered text."""
    if not st.session_state.user_text or len(st.session_state.user_text.strip()) < 10:
        st.warning("Please enter at least 10 characters of text")
        return
    with st.spinner("Analyzing text..."):
        # Overall sentiment
        sentiment = analyze_sentiment(st.session_state.user_text)

        # Display results
        col1, col2, col3 = st.columns(3)
        col1.metric("VADER Score", f"{sentiment['vader']:.2f}",
                    delta_color="inverse" if sentiment['vader'] < 0 else "normal")
        col2.metric("BERT Sentiment", sentiment['bert_label'], f"{sentiment['bert_score']:.2f}")
        col3.metric("TextBlob Score", f"{sentiment['textblob']:.2f}")

        # Word cloud
        st.subheader("Word Cloud")
        wc_img = create_wordcloud(st.session_state.user_text)
        if wc_img:
            st.image(f"data:image/png;base64,{wc_img}", use_container_width=True)

        # Sentence-level analysis
        try:
            sentences = nltk.sent_tokenize(st.session_state.user_text)
            if len(sentences) > 1:
                st.subheader("Sentence Breakdown")
                sent_data = []
                for sent in sentences:
                    sent_sentiment = analyze_sentiment(sent)
                    sent_data.append({
                        'Sentence': sent[:150] + ("..." if len(sent) > 150 else ""),
                        'VADER': sent_sentiment['vader'],
                        'BERT': sent_sentiment['bert'],
                        'TextBlob': sent_sentiment['textblob'],
                        'Average': np.mean([
                            sent_sentiment['vader'],
                            sent_sentiment['bert'],
                            sent_sentiment['textblob']
                        ])
                    })
                sent_df = pd.DataFrame(sent_data)
                # Color each score cell on a red-to-green scale over [-1, 1]
                styled_df = sent_df.style.background_gradient(
                    cmap='RdYlGn',
                    subset=['VADER', 'BERT', 'TextBlob', 'Average'],
                    vmin=-1,
                    vmax=1
                )
                st.dataframe(
                    styled_df,
                    use_container_width=True,
                    height=min(400, 35 * len(sent_df))
                )
        except Exception as e:
            st.error(f"Sentence analysis error: {str(e)}")
def analyze_live_data():
    """Analyze live data from the Reddit and YouTube APIs."""
    if not st.session_state.search_keyword:
        st.warning("Please enter a search keyword")
        return
    if not st.session_state.use_reddit and not st.session_state.use_youtube:
        st.warning("Please select at least one data source")
        return

    with st.spinner(f"Fetching data for '{st.session_state.search_keyword}'..."):
        # Fetch data from each selected source
        dfs = []
        if st.session_state.use_reddit:
            reddit_df = fetch_reddit_data(
                st.session_state.search_keyword,
                st.session_state.max_results
            )
            if not reddit_df.empty:
                dfs.append(reddit_df)
        if st.session_state.use_youtube:
            youtube_df = fetch_youtube_data(
                st.session_state.search_keyword,
                st.session_state.max_results
            )
            if not youtube_df.empty:
                dfs.append(youtube_df)
        if not dfs:
            st.error("No data found. Try different keywords or sources.")
            return
        df = pd.concat(dfs, ignore_index=True)

    # Analyze sentiment
    with st.spinner("Analyzing sentiment..."):
        sentiment_results = []
        for text in df['text']:
            res = analyze_sentiment(text)
            sentiment_results.append({
                'vader': res['vader'],
                'bert': res['bert'],
                'textblob': res['textblob'],
                'average_sentiment': np.mean([res['vader'], res['bert'], res['textblob']])
            })
        sentiment_df = pd.DataFrame(sentiment_results)
        df = pd.concat([df, sentiment_df], axis=1)

        # Keep only the last 60 days and sort chronologically
        df = df[df['date'] >= (datetime.now() - timedelta(days=60))]
        df = df.sort_values('date')

        # 7-point moving average to smooth the timeline
        df['rolling_sentiment'] = df['average_sentiment'].rolling(
            window=7,
            min_periods=1
        ).mean()

    # Display results
    st.subheader(f"Results for: '{st.session_state.search_keyword}'")

    # Overall metrics
    avg_sentiment = df['average_sentiment'].mean()
    pos_pct = (df['average_sentiment'] > Config.POSITIVE_THRESHOLD).mean() * 100
    neg_pct = (df['average_sentiment'] < Config.NEGATIVE_THRESHOLD).mean() * 100
    col1, col2, col3 = st.columns(3)
    col1.metric("Average Sentiment", f"{avg_sentiment:.2f}")
    col2.metric("Positive Content", f"{pos_pct:.1f}%")
    col3.metric("Negative Content", f"{neg_pct:.1f}%")

    # Word cloud
    st.subheader("Word Cloud")
    combined_text = " ".join(df['text'])
    wc_img = create_wordcloud(combined_text)
    if wc_img:
        st.image(f"data:image/png;base64,{wc_img}", use_container_width=True)

    # Timeline visualization
    st.subheader("Sentiment Timeline")
    plot_sentiment_timeline(df)

    # Raw data
    with st.expander("View Raw Data"):
        st.dataframe(df, use_container_width=True)
if __name__ == "__main__":
main()