Spaces:
Sleeping
Sleeping
| import os | |
| import streamlit as st | |
| import requests | |
| import pandas as pd | |
| import nltk | |
| from wordcloud import WordCloud | |
| import matplotlib.pyplot as plt | |
| from sklearn.feature_extraction.text import CountVectorizer, ENGLISH_STOP_WORDS | |
| from sklearn.decomposition import LatentDirichletAllocation | |
| from datetime import datetime, timedelta | |
# Make sure the VADER lexicon is available before building the analyzer.
# Check the local NLTK data directories first and only fall back to a download
# when the lexicon is missing: an unconditional nltk.download() is attempted
# (and logs a status line) every time this script executes.
try:
    nltk.data.find("sentiment/vader_lexicon.zip")
except LookupError:
    nltk.download("vader_lexicon", quiet=True)
from nltk.sentiment.vader import SentimentIntensityAnalyzer

# Global VADER instance used by process_press_releases_df.
sia = SentimentIntensityAnalyzer()

# API key for the press-release endpoints, read from the environment.
# os.getenv returns None when FMP_API_KEY is not set.
API_KEY = os.getenv("FMP_API_KEY")

# Maximum number of result pages requested per fetch.
MAX_PAGES = 3
# Seed st.session_state with default values. Keys that are already present
# keep their current value; only missing keys are created.
_SESSION_DEFAULTS = {
    "all_run": False,
    "symbol_run": False,
    "selected_symbol": "AAPL",
    "selected_date": datetime.now().date() - timedelta(days=30),
    "selected_topics_all": 10,
    "selected_topics_symbol": 10,
}
for _state_key, _state_default in _SESSION_DEFAULTS.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default
| ############################# | |
| # Utility Functions | |
| ############################# | |
def process_press_releases_df(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add a VADER compound sentiment score for each press release row.

    Args:
        df: Press releases; the release body is read from the "text" column.

    Returns:
        The input frame itself when it is empty. Otherwise a copy of the
        frame with an added "sentiment" column holding the VADER compound
        score of each row's text. Rows whose text is not a string
        (for example None or NaN) receive NaN instead of a score.

    Raises:
        KeyError: If a non-empty frame has no "text" column.
    """
    if df.empty:
        return df

    def _compound(text):
        # polarity_scores expects a string; missing values get no score
        # rather than crashing the whole column.
        if not isinstance(text, str):
            return float("nan")
        return sia.polarity_scores(text)["compound"]

    # Work on a copy so the caller's frame (possibly a filtered slice of
    # another frame) is never mutated.
    scored = df.copy()
    scored["sentiment"] = scored["text"].apply(_compound)
    return scored
def generate_wordcloud(df: pd.DataFrame):
    """
    Generate and display a word cloud from the 'text' column.

    Args:
        df: Press releases; the release body is read from the "text" column.

    Missing and non-string entries are skipped. When no usable text remains,
    a short notice is written to the page instead of a figure.
    """
    # Keep only real strings so str.join cannot fail on stray values.
    pieces = [t for t in df["text"].dropna().tolist() if isinstance(t, str)]
    all_text = " ".join(pieces)
    if not all_text.strip():
        st.write("No text found for generating a word cloud.")
        return

    try:
        wc = WordCloud(width=800, height=400, background_color="white").generate(all_text)
    except ValueError:
        # WordCloud raises ValueError when no word is left to draw
        # (for example, the text consists only of stop words).
        st.write("No text found for generating a word cloud.")
        return

    fig, ax = plt.subplots(figsize=(10, 5))
    try:
        ax.imshow(wc, interpolation="bilinear")
        ax.axis("off")
        st.pyplot(fig)
    finally:
        # pyplot keeps every figure it creates alive until it is closed;
        # close it explicitly so figures do not pile up across reruns.
        plt.close(fig)
def run_topic_modeling(df: pd.DataFrame, n_topics=10, n_top_words=10):
    """
    Perform topic modeling using LDA. Display top words for each topic.

    Args:
        df: Press releases; the release body is read from the "text" column.
        n_topics: Number of LDA components (topics) to fit.
        n_top_words: Number of highest-weighted words listed per topic.

    Missing and non-string texts are skipped. When there is nothing to
    model, a short notice is written to the page instead of results.
    """
    texts = [t for t in df["text"].dropna().tolist() if isinstance(t, str)]
    if not texts:
        st.write("No text available for topic modeling.")
        return

    # Extend default English stop words with common press release terms.
    custom_stop_words = list(ENGLISH_STOP_WORDS.union({
        "said", "reuters", "inc", "llc", "corp", "co", "company", "news", "press", "release"
    }))
    vectorizer = CountVectorizer(stop_words=custom_stop_words)
    try:
        X = vectorizer.fit_transform(texts)
    except ValueError:
        # CountVectorizer raises ValueError for an empty vocabulary,
        # e.g. when every document contains only stop words.
        st.write("No text available for topic modeling.")
        return

    lda = LatentDirichletAllocation(n_components=n_topics, random_state=42)
    lda.fit(X)

    # Look the vocabulary up once instead of once per displayed word.
    feature_names = vectorizer.get_feature_names_out()

    # Build a dictionary of topic names -> top words
    topics = {}
    for topic_idx, topic in enumerate(lda.components_):
        # Indices of the n_top_words largest weights, largest first.
        top_features_ind = topic.argsort()[:-n_top_words - 1:-1]
        topics[f"Topic {topic_idx+1}"] = [feature_names[i] for i in top_features_ind]

    st.write("### Topic Modeling Results")
    for topic_label, words in topics.items():
        st.write(f"**{topic_label}:** {', '.join(words)}")
| ############################# | |
| # PAGE 1: Press Releases Live Feed | |
| ############################# | |
def fetch_press_releases_all() -> pd.DataFrame:
    """
    Fetch recent press releases from multiple companies across several pages.

    Requests up to MAX_PAGES pages and stops at the first empty page or the
    first failure. Pages fetched before a failure are kept.

    Returns:
        A combined DataFrame of all fetched pages, with the "date" column
        (when present) converted to datetimes. An empty DataFrame when
        nothing could be fetched.
    """
    url = "https://financialmodelingprep.com/api/v3/press-releases"
    frames = []
    for page in range(MAX_PAGES):
        try:
            response = requests.get(
                url,
                params={"page": page, "apikey": API_KEY},
                # Without a timeout a stalled connection would block the app.
                timeout=10,
            )
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError):
            # Fail gracefully without naming the data source; keep whatever
            # earlier pages already returned.
            break
        # An empty page ends the feed; a non-list payload (such as an error
        # object) is treated the same way.
        if not isinstance(data, list) or not data:
            break
        frames.append(pd.DataFrame(data))

    if not frames:
        return pd.DataFrame()

    df = pd.concat(frames, ignore_index=True)
    if "date" in df.columns:
        df["date"] = pd.to_datetime(df["date"])
    return df
def run_all_press_releases():
    """
    Render the live-feed page.

    Writes the introduction, fetches the aggregated press releases, adds
    sentiment scores, and shows the table followed by the word cloud and
    topic-modeling sections. Shows an error and stops when the fetch
    returns no rows.
    """
    intro = (
        "Here, you will see the latest press releases aggregated from various companies. "
        "Explore the table for publication dates, text content, and automated sentiment. "
        "Use the Word Cloud and Topic Modeling below to uncover common themes."
    )
    st.write("**Press Releases Live Feed**")
    st.write(intro)

    releases = fetch_press_releases_all()
    if releases.empty:
        st.error("No press releases found.")
        return

    # Score sentiment, then show the full table.
    scored = process_press_releases_df(releases)
    st.dataframe(scored, use_container_width=True)

    st.subheader("Word Cloud")
    generate_wordcloud(scored)

    st.subheader("Topic Modeling")
    run_topic_modeling(scored, n_topics=st.session_state.selected_topics_all)
| ############################# | |
| # PAGE 2: Press Releases by Company | |
| ############################# | |
def fetch_press_releases_by_symbol(symbol: str) -> pd.DataFrame:
    """
    Fetch recent press releases for a single company symbol across several pages.

    Requests up to MAX_PAGES pages and stops at the first empty page or the
    first failure. Pages fetched before a failure are kept.

    Args:
        symbol: Ticker symbol as typed by the user. Surrounding whitespace
            is ignored and the value is percent-encoded before it is placed
            in the URL path.

    Returns:
        A combined DataFrame of all fetched pages, with the "date" column
        (when present) converted to datetimes. An empty DataFrame when the
        symbol is blank or nothing could be fetched.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import quote

    ticker = symbol.strip() if isinstance(symbol, str) else ""
    if not ticker:
        # A blank symbol would turn the URL into a different endpoint.
        return pd.DataFrame()

    # The symbol is free-form user input: escape it so characters such as
    # "/", "?" or "#" cannot alter the request path or query.
    encoded_ticker = quote(ticker, safe="")
    url = f"https://financialmodelingprep.com/api/v3/press-releases/{encoded_ticker}"

    frames = []
    for page in range(MAX_PAGES):
        try:
            response = requests.get(
                url,
                params={"page": page, "apikey": API_KEY},
                # Without a timeout a stalled connection would block the app.
                timeout=10,
            )
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError):
            # Fail gracefully without naming the data source; keep whatever
            # earlier pages already returned.
            break
        # An empty page ends the feed; a non-list payload (such as an error
        # object) is treated the same way.
        if not isinstance(data, list) or not data:
            break
        frames.append(pd.DataFrame(data))

    if not frames:
        return pd.DataFrame()

    df = pd.concat(frames, ignore_index=True)
    if "date" in df.columns:
        df["date"] = pd.to_datetime(df["date"])
    return df
def run_symbol_press_releases(symbol: str, start_date, n_topics):
    """
    Render the single-company page.

    Args:
        symbol: Ticker symbol whose press releases are fetched.
        start_date: Earliest publication date to keep; compared against the
            date part of the "date" column.
        n_topics: Number of topics passed to the topic model.

    Writes the introduction, fetches the company's press releases, keeps
    those on or after start_date, adds sentiment scores, and shows the table
    followed by the word cloud and topic-modeling sections. Shows a message
    and stops when there is nothing to display.
    """
    st.write("**Press Releases by Company**")
    st.write(
        f"Browse recent press releases for **{symbol}**, starting from {start_date}. "
        "View release text, publication dates, and sentiment analysis. "
        "Below, discover prevalent words and recurring topics for these press releases."
    )
    df = fetch_press_releases_by_symbol(symbol)
    if df.empty:
        st.error(f"No press releases found for {symbol}.")
        return

    # Filter by user-chosen date
    if "date" in df.columns:
        # .copy() detaches the filtered rows from the fetched frame so the
        # sentiment column added below is written to an independent frame.
        df = df[df["date"].dt.date >= start_date].copy()
        if df.empty:
            # Everything was older than the chosen date: say so once instead
            # of rendering an empty table and two "no text" notices.
            st.warning(f"No press releases found for {symbol} on or after {start_date}.")
            return

    # Process text for sentiment
    df = process_press_releases_df(df)
    st.dataframe(df, use_container_width=True)
    st.subheader("Word Cloud")
    generate_wordcloud(df)
    st.subheader("Topic Modeling")
    run_topic_modeling(df, n_topics=n_topics)
| ############################# | |
| # MAIN APP | |
| ############################# | |
def main():
    """
    Build the Streamlit page: title, sidebar controls, and the selected view.

    The sidebar offers two pages. Each page has its own options and a "Run"
    button; pressing it sets a flag in st.session_state (all_run or
    symbol_run), and the page body is rendered on every later run while
    that flag is set. Until then an instruction message is shown.
    """
    # Page names are used as radio options, headers, and branch conditions.
    live_feed_page = "Press Releases Live Feed"
    company_page = "Press Releases by Company"

    st.set_page_config(page_title="Press Releases", layout="wide")
    st.title("Press Releases Analysis")
    st.write(
        "Explore recent press releases from multiple companies or focus on a single company. "
        "Each page provides a table of press releases, sentiment analysis, a word cloud, and topic modeling."
    )

    # Sidebar navigation
    with st.sidebar.expander("Navigation and Options", expanded=True):
        page = st.radio(
            "Select Page",
            (live_feed_page, company_page),
            help="Choose between a broad overview or a single company's releases."
        )
        # NOTE(review): the widgets below take their `value=` from session
        # state and write the result straight back; confirm this round trip
        # behaves as intended on the Streamlit version in use.
        if page == live_feed_page:
            st.session_state.selected_topics_all = st.number_input(
                "Number of Topics for Live Feed",
                value=st.session_state.selected_topics_all,
                min_value=1,
                max_value=20,
                help="Choose how many topics you want to see in the topic model."
            )
            if st.button("Run"):
                st.session_state.all_run = True
        elif page == company_page:
            symbol = st.text_input(
                "Ticker Symbol",
                value=st.session_state.selected_symbol,
                help="Type the company's ticker symbol."
            )
            st.session_state.selected_symbol = symbol
            start_date = st.date_input(
                "Start Date",
                value=st.session_state.selected_date,
                help="Only press releases on or after this date will appear."
            )
            st.session_state.selected_date = start_date
            st.session_state.selected_topics_symbol = st.number_input(
                "Number of Topics for Company",
                value=st.session_state.selected_topics_symbol,
                min_value=1,
                max_value=20,
                help="Choose how many topics you want to see in the topic model."
            )
            if st.button("Run"):
                st.session_state.symbol_run = True

    # Main body content
    if page == live_feed_page:
        st.header(live_feed_page)
        if st.session_state.all_run:
            run_all_press_releases()
        else:
            # The sidebar button is labelled "Run"; the instruction names it exactly.
            st.info("Pick how many topics to show, then click 'Run'.")
    elif page == company_page:
        st.header(company_page)
        if st.session_state.symbol_run:
            run_symbol_press_releases(
                st.session_state.selected_symbol,
                st.session_state.selected_date,
                st.session_state.selected_topics_symbol
            )
        else:
            # The sidebar button is labelled "Run"; the instruction names it exactly.
            st.info("Enter a ticker symbol, date, and number of topics, then click 'Run'.")
# Script entry point: build the page when this file runs as the main module.
if __name__ == "__main__":
    main()

# CSS rules that set the #MainMenu element and the footer to hidden.
hide_streamlit_style = """
<style>
#MainMenu {visibility: hidden;}
footer {visibility: hidden;}
</style>
"""
# Injected at module level, after the entry-point guard above.
# unsafe_allow_html=True lets the raw <style> tag through as HTML.
st.markdown(hide_streamlit_style, unsafe_allow_html=True)