"""Streamlit app for exploring company press releases.

Two pages are offered: a multi-company "live feed" and a single-company view.
Each page shows a table of press releases with a VADER sentiment score, a
word cloud, and LDA topic modeling over the release text.
"""

import logging
import os
from datetime import datetime, timedelta
from urllib.parse import quote

import matplotlib.pyplot as plt
import nltk
import pandas as pd
import requests
import streamlit as st
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from sklearn.decomposition import LatentDirichletAllocation
from sklearn.feature_extraction.text import CountVectorizer, ENGLISH_STOP_WORDS
from wordcloud import WordCloud

logger = logging.getLogger(__name__)

# Download the VADER lexicon only when it is missing. Streamlit re-executes
# this script on every interaction, so an unconditional download would hit
# the network on each rerun.
try:
    nltk.data.find("sentiment/vader_lexicon.zip")
except LookupError:
    nltk.download("vader_lexicon", quiet=True)

# Global VADER instance
sia = SentimentIntensityAnalyzer()

# Environment variable for the API key
API_KEY = os.getenv("FMP_API_KEY")

# Maximum pages to fetch
MAX_PAGES = 3

# Base endpoint shared by the "all companies" and "single company" fetchers.
PRESS_RELEASES_URL = "https://financialmodelingprep.com/api/v3/press-releases"

# Abort a request that stalls instead of hanging the whole app run.
REQUEST_TIMEOUT_SECONDS = 10

# Cached API responses expire after this long so the feed can refresh and a
# transient failure is not remembered forever.
CACHE_TTL_SECONDS = 600

# Default English stop words extended with common press release terms.
CUSTOM_STOP_WORDS = sorted(
    ENGLISH_STOP_WORDS.union(
        {
            "said", "reuters", "inc", "llc", "corp", "co", "company",
            "news", "press", "release",
        }
    )
)

# Store stateful data: seed each session key once, keeping existing values.
_SESSION_DEFAULTS = {
    "all_run": False,
    "symbol_run": False,
    "selected_symbol": "AAPL",
    "selected_date": datetime.now().date() - timedelta(days=30),
    "selected_topics_all": 10,
    "selected_topics_symbol": 10,
}
for _key, _default in _SESSION_DEFAULTS.items():
    if _key not in st.session_state:
        st.session_state[_key] = _default


#############################
# Utility Functions
#############################
def _text_documents(df: pd.DataFrame) -> list:
    """Return the non-missing string entries of the 'text' column.

    Returns an empty list when the column is absent.
    """
    if "text" not in df.columns:
        return []
    return [doc for doc in df["text"].dropna() if isinstance(doc, str)]


def _compound_sentiment(text) -> float:
    """Return VADER's compound score for *text*, or NaN for non-string input."""
    if not isinstance(text, str):
        return float("nan")
    return sia.polarity_scores(text)["compound"]


def process_press_releases_df(df: pd.DataFrame) -> pd.DataFrame:
    """Add a VADER sentiment score for each press release row.

    Args:
        df: Press releases; the release body is read from the 'text' column.

    Returns:
        A copy of *df* with a 'sentiment' column holding the compound score
        (NaN where the text is missing). *df* itself is returned unchanged
        when it is empty or has no 'text' column.
    """
    if df.empty or "text" not in df.columns:
        return df
    # Work on a copy: callers may pass a filtered slice, and assigning a new
    # column onto a slice triggers pandas' SettingWithCopy warning.
    scored = df.copy()
    scored["sentiment"] = scored["text"].apply(_compound_sentiment)
    return scored


def generate_wordcloud(df: pd.DataFrame):
    """Generate and display a word cloud from the 'text' column.

    Writes a short notice instead of a figure when there is no usable text.
    """
    all_text = " ".join(_text_documents(df))
    if not all_text.strip():
        st.write("No text found for generating a word cloud.")
        return

    try:
        wc = WordCloud(width=800, height=400, background_color="white").generate(all_text)
    except ValueError:
        # WordCloud raises ValueError when no words survive its filtering.
        st.write("No text found for generating a word cloud.")
        return

    fig, ax = plt.subplots(figsize=(10, 5))
    ax.imshow(wc, interpolation="bilinear")
    ax.axis("off")
    st.pyplot(fig)
    # pyplot keeps every figure alive until closed; without this, figures
    # pile up in memory across Streamlit reruns.
    plt.close(fig)


def run_topic_modeling(df: pd.DataFrame, n_topics=10, n_top_words=10):
    """Perform topic modeling using LDA and display top words for each topic.

    Args:
        df: Press releases; documents are read from the 'text' column.
        n_topics: Number of LDA topics to fit.
        n_top_words: Number of highest-weighted words listed per topic.
    """
    texts = _text_documents(df)
    if not texts:
        st.write("No text available for topic modeling.")
        return

    vectorizer = CountVectorizer(stop_words=CUSTOM_STOP_WORDS)
    try:
        X = vectorizer.fit_transform(texts)
    except ValueError:
        # Raised when the vocabulary is empty, e.g. only stop words remain.
        st.write("No text available for topic modeling.")
        return

    lda = LatentDirichletAllocation(n_components=int(n_topics), random_state=42)
    lda.fit(X)

    # Look the vocabulary up once rather than once per word per topic.
    feature_names = vectorizer.get_feature_names_out()

    st.write("### Topic Modeling Results")
    for topic_number, weights in enumerate(lda.components_, start=1):
        # argsort is ascending; the reversed tail slice picks the largest weights.
        top_indices = weights.argsort()[:-n_top_words - 1:-1]
        words = [feature_names[i] for i in top_indices]
        st.write(f"**Topic {topic_number}:** {', '.join(words)}")


def _fetch_press_release_pages(url: str) -> pd.DataFrame:
    """Fetch up to MAX_PAGES pages of press releases from *url*.

    Paging stops at the first empty page or the first failure. Pages fetched
    before a failure are kept. Failures are logged by exception type only,
    because the exception text can contain the request URL and API key.

    Returns:
        The concatenated pages with 'date' parsed to datetimes (unparseable
        values become NaT), or an empty DataFrame when nothing was fetched.
    """
    if not API_KEY:
        logger.warning("API key is not configured; skipping press release fetch")
        return pd.DataFrame()

    frames = []
    for page in range(MAX_PAGES):
        try:
            response = requests.get(
                url,
                params={"page": page, "apikey": API_KEY},
                timeout=REQUEST_TIMEOUT_SECONDS,
            )
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as exc:
            # Fail gracefully without naming the data source
            logger.warning(
                "Press release fetch failed on page %d (%s)",
                page,
                type(exc).__name__,
            )
            break
        if not data:
            break
        if not isinstance(data, list):
            # A non-list payload is not a page of records; treat it as a
            # failure rather than building a frame from it.
            logger.warning("Unexpected press release payload on page %d", page)
            break
        frames.append(pd.DataFrame(data))

    if not frames:
        return pd.DataFrame()

    df = pd.concat(frames, ignore_index=True)
    if "date" in df.columns:
        df["date"] = pd.to_datetime(df["date"], errors="coerce")
    return df


#############################
# PAGE 1: Press Releases Live Feed
#############################
@st.cache_data(show_spinner=False, ttl=CACHE_TTL_SECONDS)
def fetch_press_releases_all() -> pd.DataFrame:
    """Fetch recent press releases from multiple companies across several pages.

    Returns:
        A combined DataFrame, empty when nothing could be fetched.
    """
    return _fetch_press_release_pages(PRESS_RELEASES_URL)


def run_all_press_releases():
    """Render the live-feed page: table, word cloud, and topic modeling."""
    st.write("**Press Releases Live Feed**")
    st.write(
        "Here, you will see the latest press releases aggregated from various companies. "
        "Explore the table for publication dates, text content, and automated sentiment. "
        "Use the Word Cloud and Topic Modeling below to uncover common themes."
    )

    df = fetch_press_releases_all()
    if df.empty:
        st.error("No press releases found.")
        return

    # Process text for sentiment
    df = process_press_releases_df(df)
    st.dataframe(df, use_container_width=True)

    st.subheader("Word Cloud")
    generate_wordcloud(df)

    st.subheader("Topic Modeling")
    run_topic_modeling(df, n_topics=st.session_state.selected_topics_all)


#############################
# PAGE 2: Press Releases by Company
#############################
@st.cache_data(show_spinner=False, ttl=CACHE_TTL_SECONDS)
def fetch_press_releases_by_symbol(symbol: str) -> pd.DataFrame:
    """Fetch recent press releases for a single company symbol across several pages.

    Args:
        symbol: Ticker symbol as typed by the user. It is stripped,
            upper-cased and percent-encoded before being placed in the URL.

    Returns:
        A combined DataFrame, empty when the symbol is blank or nothing
        could be fetched.
    """
    ticker = symbol.strip().upper()
    if not ticker:
        return pd.DataFrame()
    # The symbol is free-form user input, so encode it before it becomes a
    # path segment.
    return _fetch_press_release_pages(f"{PRESS_RELEASES_URL}/{quote(ticker, safe='')}")


def run_symbol_press_releases(symbol: str, start_date, n_topics):
    """Render the single-company page: table, word cloud, and topic modeling.

    Args:
        symbol: Ticker symbol to look up.
        start_date: Only releases dated on or after this date are shown.
        n_topics: Number of LDA topics to fit.
    """
    st.write("**Press Releases by Company**")
    st.write(
        f"Browse recent press releases for **{symbol}**, starting from {start_date}. "
        "View release text, publication dates, and sentiment analysis. "
        "Below, discover prevalent words and recurring topics for these press releases."
    )

    df = fetch_press_releases_by_symbol(symbol)
    if df.empty:
        st.error(f"No press releases found for {symbol}.")
        return

    # Filter by user-chosen date
    if "date" in df.columns:
        # Drop rows without a usable date first; NaT cannot be compared
        # against a datetime.date.
        df = df[df["date"].notna()]
        df = df[df["date"].dt.date >= start_date]

    if df.empty:
        st.warning(f"No press releases found for {symbol} on or after {start_date}.")
        return

    # Process text for sentiment
    df = process_press_releases_df(df)
    st.dataframe(df, use_container_width=True)

    st.subheader("Word Cloud")
    generate_wordcloud(df)

    st.subheader("Topic Modeling")
    run_topic_modeling(df, n_topics=n_topics)


#############################
# MAIN APP
#############################
def main():
    """Configure the page, render the sidebar controls, and show the selected page."""
    st.set_page_config(page_title="Press Releases", layout="wide")
    st.title("Press Releases Analysis")
    st.write(
        "Explore recent press releases from multiple companies or focus on a single company. "
        "Each page provides a table of press releases, sentiment analysis, a word cloud, and topic modeling."
    )

    # Sidebar navigation
    with st.sidebar.expander("Navigation and Options", expanded=True):
        page = st.radio(
            "Select Page",
            ("Press Releases Live Feed", "Press Releases by Company"),
            help="Choose between a broad overview or a single company's releases."
        )

        if page == "Press Releases Live Feed":
            st.session_state.selected_topics_all = st.number_input(
                "Number of Topics for Live Feed",
                value=st.session_state.selected_topics_all,
                min_value=1,
                max_value=20,
                help="Choose how many topics you want to see in the topic model."
            )
            # The flag stays set after the first click, so later reruns keep
            # showing the results.
            if st.button("Run"):
                st.session_state.all_run = True

        elif page == "Press Releases by Company":
            symbol = st.text_input(
                "Ticker Symbol",
                value=st.session_state.selected_symbol,
                help="Type the company's ticker symbol."
            )
            st.session_state.selected_symbol = symbol

            start_date = st.date_input(
                "Start Date",
                value=st.session_state.selected_date,
                help="Only press releases on or after this date will appear."
            )
            st.session_state.selected_date = start_date

            st.session_state.selected_topics_symbol = st.number_input(
                "Number of Topics for Company",
                value=st.session_state.selected_topics_symbol,
                min_value=1,
                max_value=20,
                help="Choose how many topics you want to see in the topic model."
            )
            if st.button("Run"):
                st.session_state.symbol_run = True

    # Main body content
    if page == "Press Releases Live Feed":
        st.header("Press Releases Live Feed")
        if st.session_state.all_run:
            run_all_press_releases()
        else:
            # The hint names the button by its actual label ("Run").
            st.info("Pick how many topics to show, then click 'Run'.")

    elif page == "Press Releases by Company":
        st.header("Press Releases by Company")
        if st.session_state.symbol_run:
            run_symbol_press_releases(
                st.session_state.selected_symbol,
                st.session_state.selected_date,
                st.session_state.selected_topics_symbol
            )
        else:
            st.info("Enter a ticker symbol, date, and number of topics, then click 'Run'.")


if __name__ == "__main__":
    main()

# NOTE(review): the style payload below is blank in this view; presumably it
# was meant to carry CSS that hides Streamlit chrome. Confirm against the
# intended markup.
hide_streamlit_style = """ """
st.markdown(hide_streamlit_style, unsafe_allow_html=True)