Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import pandas as pd | |
| import numpy as np | |
| from datetime import datetime, timedelta | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| from plotly.subplots import make_subplots | |
| from sklearn.linear_model import LinearRegression | |
| from sklearn.ensemble import RandomForestRegressor | |
| from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification | |
| from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer | |
| from wordcloud import WordCloud, STOPWORDS | |
| import matplotlib.pyplot as plt | |
| from io import BytesIO | |
| import base64 | |
| import nltk | |
| from nltk.corpus import stopwords | |
| from nltk.tokenize import word_tokenize | |
| from nltk.stem import WordNetLemmatizer | |
| import re | |
| import json | |
| import os | |
| import pickle | |
| from textblob import TextBlob | |
# Page configuration
# st.set_page_config is issued before anything else touches Streamlit: the
# NLTK bootstrap below may call st.info, and the page config is documented as
# having to be the first Streamlit command of the script.
st.set_page_config(
    page_title="SentiMind Pro - Advanced Sentiment Analysis",
    page_icon="📊",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Download necessary NLTK data
# Maps the downloadable package name to the path probed by nltk.data.find.
# NOTE(review): recent NLTK releases may additionally need 'punkt_tab' for
# word_tokenize — confirm against the pinned NLTK version.
_NLTK_RESOURCES = {
    'punkt': 'tokenizers/punkt',
    'stopwords': 'corpora/stopwords',
    'wordnet': 'corpora/wordnet',
}

# Probe each resource on its own so that only the missing ones are fetched.
_missing_nltk_packages = []
for _package, _resource_path in _NLTK_RESOURCES.items():
    try:
        nltk.data.find(_resource_path)
    except LookupError:
        _missing_nltk_packages.append(_package)

if _missing_nltk_packages:
    st.info("Downloading NLTK resources...")
    for _package in _missing_nltk_packages:
        nltk.download(_package)
# Custom CSS
# Stylesheet for the classes main-header, sub-header, description,
# results-container, metric-card, metric-value, metric-label and footer.
_CUSTOM_CSS = """
<style>
.main-header {
font-size: 2.5rem;
color: #1E88E5;
text-align: center;
margin-bottom: 1rem;
font-weight: bold;
}
.sub-header {
font-size: 1.5rem;
color: #0D47A1;
margin-top: 2rem;
margin-bottom: 1rem;
font-weight: bold;
}
.description {
font-size: 1rem;
color: #424242;
margin-bottom: 2rem;
}
.results-container {
background-color: #f5f5f5;
padding: 1.5rem;
border-radius: 10px;
margin-bottom: 2rem;
}
.metric-card {
background-color: white;
padding: 1rem;
border-radius: 10px;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
text-align: center;
}
.metric-value {
font-size: 1.8rem;
font-weight: bold;
color: #1E88E5;
}
.metric-label {
font-size: 0.9rem;
color: #616161;
}
.footer {
text-align: center;
margin-top: 3rem;
color: #616161;
}
</style>
"""

# Inject the stylesheet as raw HTML.
st.markdown(_CUSTOM_CSS, unsafe_allow_html=True)
# Session state initialization
# The defaults are written only while the 'initialized' key is absent, so
# values already stored in the session are never overwritten.
if 'initialized' not in st.session_state:
    _session_defaults = {
        'initialized': False,
        'user_input': "",
        'analysis_done': False,
        'historical_data': None,
        'sentiment_models': {},
        'historical_inputs': [],
        'historical_results': [],
    }
    for _state_key, _state_value in _session_defaults.items():
        st.session_state[_state_key] = _state_value
| # ----------- HELPER FUNCTIONS ----------- | |
def preprocess_text(text):
    """Normalise raw text into a space-joined string of lemmatised tokens.

    The text is lower-cased, then stripped of URLs, @mentions/#hashtags and
    punctuation, and its whitespace is collapsed. The result is tokenised,
    English stop words are dropped, and each remaining token is lemmatised.

    Args:
        text: The raw input string.

    Returns:
        The cleaned tokens joined by single spaces.
    """
    # Applied strictly in this order: URLs, mentions/hashtags, punctuation,
    # then whitespace runs.
    substitutions = (
        (r'http\S+|www\S+|https\S+', ''),
        (r'@\w+|#\w+', ''),
        (r'[^\w\s]', ''),
        (r'\s+', ' '),
    )
    cleaned = text.lower()
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    cleaned = cleaned.strip()

    words = word_tokenize(cleaned)
    english_stop_words = set(stopwords.words('english'))
    lemmatize = WordNetLemmatizer().lemmatize
    return ' '.join(
        lemmatize(word) for word in words if word not in english_stop_words
    )
def initialize_models():
    """Populate ``st.session_state.sentiment_models`` with all analysers.

    While a spinner is shown, three entries are stored:

    * ``'vader'``: a ``SentimentIntensityAnalyzer`` instance.
    * ``'bert'``: a "sentiment-analysis" pipeline built from the
      distilbert SST-2 checkpoint. If building it raises, the error is shown
      with ``st.error`` and the default "sentiment-analysis" pipeline is
      stored instead.
    * ``'textblob'``: the ``TextBlob`` class itself (not an instance).
    """
    with st.spinner('Initializing sentiment analysis models...'):
        registry = st.session_state.sentiment_models

        # VADER Sentiment Analysis
        registry['vader'] = SentimentIntensityAnalyzer()

        # BERT Sentiment Analysis
        try:
            checkpoint = "distilbert-base-uncased-finetuned-sst-2-english"
            bert_tokenizer = AutoTokenizer.from_pretrained(checkpoint)
            bert_model = AutoModelForSequenceClassification.from_pretrained(checkpoint)
            bert_pipeline = pipeline(
                "sentiment-analysis", model=bert_model, tokenizer=bert_tokenizer
            )
        except Exception as e:
            st.error(f"Error loading BERT model: {e}")
            # Fall back to the library's default sentiment pipeline.
            registry['bert'] = pipeline("sentiment-analysis")
        else:
            registry['bert'] = bert_pipeline

        # TextBlob for additional analysis
        registry['textblob'] = TextBlob
def generate_sample_data(days=30, seed=None):
    """Generate realistic sample sentiment data for demonstration.

    One row is produced per day from ``days`` days ago up to now (inclusive
    on both ends, so ``days + 1`` rows). The sentiment score is the sum of a
    weekend bump, a linear trend, a Monday/Friday effect and Gaussian noise,
    clipped to [-1, 1].

    Args:
        days: Length of the look-back window in days. Defaults to 30.
        seed: Optional seed for reproducible output. When ``None`` the global
            NumPy random state is used, as before.

    Returns:
        A DataFrame with the columns ``Date``, ``Sentiment Score``,
        ``Volume``, ``Day``, ``Hour``, ``Weekday`` and ``Month``.
    """
    # A private RandomState keeps seeded calls from disturbing global state.
    rng = np.random if seed is None else np.random.RandomState(seed)

    end_date = datetime.today()
    start_date = end_date - timedelta(days=days)
    dates = pd.date_range(start=start_date, end=end_date, freq='D')
    n_rows = len(dates)
    day_of_week = np.asarray(dates.dayofweek)

    # Generate more realistic sentiment patterns
    weekday_effect = np.where(day_of_week >= 5, 0.1, 0.0)  # weekend bump
    trend = np.linspace(-0.2, 0.3, n_rows)
    # Monday dip, Friday lift, nothing on the other days.
    seasonal = np.where(
        day_of_week == 0, -0.15, np.where(day_of_week == 4, 0.05, 0.0)
    )
    noise = rng.normal(0, 0.2, n_rows)
    sentiment_scores = np.clip(weekday_effect + trend + seasonal + noise, -1, 1)

    df = pd.DataFrame({
        "Date": dates,
        "Sentiment Score": sentiment_scores,
        "Volume": rng.randint(50, 500, n_rows),  # Simulated volume
    })
    df['Day'] = df['Date'].dt.dayofweek
    df['Hour'] = rng.randint(0, 24, len(df))
    df['Weekday'] = df['Date'].dt.day_name()
    df['Month'] = df['Date'].dt.month_name()
    return df
def train_prediction_models(df):
    """Fit the forecasting models on *df* and predict the following 14 days.

    A linear regression and a 100-tree random forest are trained on four
    calendar features derived from ``df['Date']`` (day of week, day of month,
    month, and a running index) against ``df['Sentiment Score']``.

    Args:
        df: DataFrame with a datetime ``Date`` column and a numeric
            ``Sentiment Score`` column.

    Returns:
        A tuple ``(model, predictions)``. ``model`` is always the fitted
        random forest; no model comparison is performed. ``predictions`` maps
        each model name to a DataFrame with the columns ``Date`` and
        ``Predicted Sentiment`` (clipped to [-1, 1]).
    """
    features = ['day_of_week', 'day_of_month', 'month', 'trend']

    # Derive the calendar features on a copy so the caller's frame is untouched.
    history = df.copy()
    history['day_of_week'] = history['Date'].dt.dayofweek
    history['day_of_month'] = history['Date'].dt.day
    history['month'] = history['Date'].dt.month
    history['trend'] = np.arange(len(history))

    X_train = history[features].values
    y_train = history['Sentiment Score'].values

    models = {
        'Linear Regression': LinearRegression(),
        'Random Forest': RandomForestRegressor(n_estimators=100, random_state=42)
    }
    for estimator in models.values():
        estimator.fit(X_train, y_train)

    # The forecast horizon starts the day after the last observed date.
    first_future_day = df['Date'].max() + timedelta(days=1)
    future_dates = pd.date_range(start=first_future_day, periods=14, freq='D')
    n_history = len(X_train)
    X_future = pd.DataFrame({
        'Date': future_dates,
        'day_of_week': future_dates.dayofweek,
        'day_of_month': future_dates.day,
        'month': future_dates.month,
        # Continue the running index where the training data stopped.
        'trend': np.arange(n_history, n_history + len(future_dates))
    })
    future_matrix = X_future[features].values

    predictions = {
        model_name: pd.DataFrame({
            'Date': future_dates,
            'Predicted Sentiment': np.clip(estimator.predict(future_matrix), -1, 1)
        })
        for model_name, estimator in models.items()
    }
    return models['Random Forest'], predictions
def generate_wordcloud(text, sentiment_score):
    """Generate a wordcloud colored by sentiment.

    The text is run through ``preprocess_text`` and rendered as a word cloud
    in which every word has the same colour, chosen from the score:
    above 0.5 green, above 0 light green, above -0.5 orange, otherwise red.

    If no words are left to draw (for example the input consisted only of
    stop words), a placeholder image with a short message is rendered instead
    of letting ``WordCloud.generate`` raise.

    Args:
        text: Raw input text.
        sentiment_score: Sentiment score used to pick the colour.

    Returns:
        The PNG image as a base64-encoded string.
    """
    cleaned_text = preprocess_text(text)

    # The colour depends only on the score, so it is computed once.
    if sentiment_score > 0.5:
        colour = "rgb(0, 128, 0)"  # Green
    elif sentiment_score > 0:
        colour = "rgb(0, 255, 0)"  # Light green
    elif sentiment_score > -0.5:
        colour = "rgb(255, 165, 0)"  # Orange
    else:
        colour = "rgb(255, 0, 0)"  # Red

    def color_func(word, font_size, position, orientation, random_state=None, **kwargs):
        return colour

    wc = WordCloud(
        width=800,
        height=400,
        background_color='white',
        max_words=100,
        # Local name chosen so the module-level NLTK `stopwords` is not shadowed.
        stopwords=set(STOPWORDS),
        contour_width=3,
        contour_color='steelblue'
    )

    # Work on an explicit figure and always close it, even if rendering fails.
    fig, ax = plt.subplots(figsize=(10, 5))
    try:
        try:
            cloud = wc.generate(cleaned_text)
        except ValueError:
            # Raised by WordCloud when there is not a single word to plot.
            ax.text(
                0.5, 0.5, "No words to display",
                ha='center', va='center', fontsize=20,
            )
        else:
            cloud.recolor(color_func=color_func)
            ax.imshow(cloud, interpolation='bilinear')
        ax.axis('off')
        fig.tight_layout()
        img = BytesIO()
        fig.savefig(img, format='PNG', bbox_inches='tight')
    finally:
        plt.close(fig)
    return base64.b64encode(img.getvalue()).decode()
def analyze_sentiment(text):
    """Perform sentiment analysis using multiple models.

    The raw text is scored by the VADER, BERT and TextBlob entries stored in
    ``st.session_state.sentiment_models``. The combined score weights them
    0.4 / 0.4 / 0.2 respectively. The BERT score is signed: the pipeline's
    confidence when the label is ``'POSITIVE'``, its negation otherwise.

    Args:
        text: Raw input text.

    Returns:
        A dict with the raw and preprocessed text, per-model results under
        ``'vader'``, ``'bert'`` and ``'textblob'``, the ``'combined_score'``,
        ``'key_phrases'``, ``'emotions'`` and a ``'timestamp'`` string
        formatted as ``%Y-%m-%d %H:%M:%S``.
    """
    models = st.session_state.sentiment_models
    processed_text = preprocess_text(text)

    vader_result = models['vader'].polarity_scores(text)
    vader_score = vader_result['compound']

    # truncation=True is forwarded to the tokenizer so that input longer than
    # the model's maximum sequence length is cut instead of raising.
    bert_result = models['bert'](text, truncation=True)[0]
    bert_score = bert_result['score'] if bert_result['label'] == 'POSITIVE' else -bert_result['score']

    blob = models['textblob'](text)
    textblob_score = blob.sentiment.polarity

    combined_score = (0.4 * vader_score + 0.4 * bert_score + 0.2 * textblob_score)

    key_phrases = extract_key_phrases(text)
    emotions = analyze_emotions(text)

    sentiment_results = {
        'raw_text': text,
        'processed_text': processed_text,
        'vader': {
            'score': vader_score,
            'breakdown': vader_result
        },
        'bert': {
            'score': bert_score,
            'label': bert_result['label'],
            'confidence': bert_result['score']
        },
        'textblob': {
            'score': textblob_score,
            'subjectivity': blob.sentiment.subjectivity
        },
        'combined_score': combined_score,
        'key_phrases': key_phrases,
        'emotions': emotions,
        'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }
    return sentiment_results
def extract_key_phrases(text, num_phrases=5):
    """Extract key phrases from text.

    TextBlob noun phrases are used when there are at least ``num_phrases`` of
    them. Otherwise they are topped up with word bigrams of the lower-cased
    text, and candidates made up solely of English stop words are dropped.

    Duplicates are removed while keeping first-occurrence order, so the same
    text always yields the same phrases in the same order.

    Args:
        text: Raw input text.
        num_phrases: Maximum number of phrases to return.

    Returns:
        A list of at most ``num_phrases`` distinct phrases.
    """
    noun_phrases = list(TextBlob(text).noun_phrases)

    if len(noun_phrases) >= num_phrases:
        # dict.fromkeys de-duplicates while preserving order (unlike set()).
        return list(dict.fromkeys(noun_phrases))[:num_phrases]

    tokens = word_tokenize(text.lower())
    bigram_phrases = [' '.join(bigram) for bigram in nltk.bigrams(tokens)]
    stop_words = set(stopwords.words('english'))
    filtered_phrases = [
        phrase for phrase in noun_phrases + bigram_phrases
        if not all(word in stop_words for word in phrase.split())
    ]
    return list(dict.fromkeys(filtered_phrases))[:num_phrases]
def analyze_emotions(text):
    """Analyze emotions in text.

    Counts how many words of *text* belong to each emotion's keyword list.
    Matching is case-insensitive and ignores surrounding punctuation, so
    "Happy!" counts as "happy".

    Args:
        text: Raw input text.

    Returns:
        A dict mapping each of ``'joy'``, ``'sadness'``, ``'anger'``,
        ``'fear'`` and ``'surprise'`` to its keyword hit count.
    """
    emotion_dict = {
        'joy': ['happy', 'delighted', 'pleased', 'glad', 'joy', 'love', 'excellent', 'wonderful'],
        'sadness': ['sad', 'unhappy', 'sorrow', 'depressed', 'down', 'gloomy'],
        'anger': ['angry', 'mad', 'furious', 'irritated', 'annoyed'],
        'fear': ['afraid', 'scared', 'fearful', 'terrified', 'worried'],
        'surprise': ['surprised', 'amazed', 'astonished', 'shocked'],
    }
    emotions = {emotion: 0 for emotion in emotion_dict}
    keyword_sets = {emotion: set(keywords) for emotion, keywords in emotion_dict.items()}

    # Lower-case first, then keep alphabetic runs only: this drops attached
    # punctuation that a plain str.split() would leave on the word.
    for word in re.findall(r"[a-z]+", text.lower()):
        for emotion, keywords in keyword_sets.items():
            if word in keywords:
                emotions[emotion] += 1
    return emotions
# Main application logic
def main():
    """Render the SentiMind Pro page.

    Loads the sentiment models the first time (tracked by
    ``st.session_state.initialized``), analyses the text area content when
    the button is pressed, shows the JSON result and a word cloud, and lists
    every analysis stored in the session history.

    Input that is empty or consists only of whitespace is rejected with a
    warning instead of being analysed.
    """
    st.title("SentiMind Pro - Advanced Sentiment Analysis")

    if not st.session_state.initialized:
        initialize_models()
        st.session_state.initialized = True

    st.subheader("Enter Text for Sentiment Analysis")
    user_input = st.text_area("Input Text", height=150)

    if st.button("Analyze Sentiment"):
        # strip() so that blank-looking input is treated as missing.
        if user_input and user_input.strip():
            sentiment_results = analyze_sentiment(user_input)
            st.session_state.historical_inputs.append(user_input)
            st.session_state.historical_results.append(sentiment_results)
            st.session_state.analysis_done = True

            # Display results
            st.markdown("### Sentiment Analysis Results")
            st.json(sentiment_results)

            # Generate Word Cloud
            wordcloud_image = generate_wordcloud(user_input, sentiment_results['combined_score'])
            st.image(f"data:image/png;base64,{wordcloud_image}", use_column_width=True)
        else:
            st.warning("Please enter some text for analysis.")

    if st.session_state.analysis_done:
        st.subheader("Historical Analysis")
        # Inputs and results are appended together, so they pair up by position.
        history = zip(
            st.session_state.historical_inputs,
            st.session_state.historical_results,
        )
        for entry_number, (past_input, past_result) in enumerate(history, start=1):
            st.markdown(f"**Input Text {entry_number}:** {past_input}")
            st.json(past_result)

    st.markdown("<footer class='footer'>© 2023 SentiMind Pro. All rights reserved.</footer>", unsafe_allow_html=True)


if __name__ == "__main__":
    main()