# Space48 / app.py
# QuantumLearner's picture
# Update app.py
# d8195e6 verified
import os
import streamlit as st
import requests
import pandas as pd
import nltk
from wordcloud import WordCloud
import matplotlib.pyplot as plt
from sklearn.feature_extraction.text import CountVectorizer, ENGLISH_STOP_WORDS
from sklearn.decomposition import LatentDirichletAllocation
from datetime import datetime, timedelta
# Download the VADER lexicon only if it is not already installed. The lookup
# raises LookupError when the resource is missing, so the downloader is skipped
# entirely whenever the lexicon is present on this machine.
try:
    nltk.data.find("sentiment/vader_lexicon.zip")
except LookupError:
    nltk.download('vader_lexicon')
from nltk.sentiment.vader import SentimentIntensityAnalyzer

# Global VADER instance, shared by every sentiment computation in this module
sia = SentimentIntensityAnalyzer()
# API key read from the FMP_API_KEY environment variable (None when unset)
API_KEY = os.getenv("FMP_API_KEY")
# Maximum number of result pages requested by each fetch function
MAX_PAGES = 3
# Seed st.session_state with defaults. A key that already exists (set during an
# earlier rerun of the script) is left untouched.
for _state_key, _state_default in (
    ("all_run", False),
    ("symbol_run", False),
    ("selected_symbol", "AAPL"),
    ("selected_date", datetime.now().date() - timedelta(days=30)),
    ("selected_topics_all", 10),
    ("selected_topics_symbol", 10),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default
#############################
# Utility Functions
#############################
def process_press_releases_df(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add a VADER compound sentiment score for each press release row.

    Args:
        df: Press releases with a "text" column.

    Returns:
        ``df`` itself when it is empty; otherwise a copy of ``df`` with an
        extra "sentiment" column holding the VADER compound score of each
        row's text. Rows whose text is not a string (e.g. missing values)
        get NaN instead of a score.

    Raises:
        KeyError: If a non-empty ``df`` has no "text" column.
    """
    if df.empty:
        return df

    def _compound(text):
        # VADER expects a string; missing values would otherwise crash the
        # whole apply.
        if not isinstance(text, str):
            return float("nan")
        return sia.polarity_scores(text)["compound"]

    # Work on a copy so the caller's frame (possibly a filtered slice of
    # another frame) is never modified in place.
    scored = df.copy()
    scored["sentiment"] = scored["text"].apply(_compound)
    return scored
def generate_wordcloud(df: pd.DataFrame):
    """
    Generate and display a word cloud from the 'text' column.

    Missing values are dropped and remaining values are converted to strings
    before being joined. When there is nothing to plot, a short message is
    written to the page instead of a figure.

    Args:
        df: Press releases with a "text" column.
    """
    all_text = " ".join(str(value) for value in df["text"].dropna()).strip()
    if not all_text:
        st.write("No text found for generating a word cloud.")
        return
    try:
        wc = WordCloud(width=800, height=400, background_color="white").generate(all_text)
    except ValueError:
        # WordCloud raises ValueError when no plottable word remains
        # (for example when the text consists only of stop words).
        st.write("No text found for generating a word cloud.")
        return
    fig, ax = plt.subplots(figsize=(10, 5))
    try:
        ax.imshow(wc, interpolation="bilinear")
        ax.axis("off")
        st.pyplot(fig)
    finally:
        # pyplot keeps every figure it creates until it is closed; release it
        # so figures do not pile up across script reruns.
        plt.close(fig)
def run_topic_modeling(df: pd.DataFrame, n_topics=10, n_top_words=10):
    """
    Perform topic modeling using LDA and display the top words of each topic.

    Args:
        df: Press releases with a "text" column; missing values are ignored.
        n_topics: Number of LDA components (topics) to fit.
        n_top_words: Number of highest-weighted words listed per topic.
    """
    texts = df["text"].dropna().astype(str).tolist()
    if not texts:
        st.write("No text available for topic modeling.")
        return
    # Extend default English stop words with common press release terms.
    custom_stop_words = list(ENGLISH_STOP_WORDS.union({
        "said", "reuters", "inc", "llc", "corp", "co", "company", "news", "press", "release"
    }))
    vectorizer = CountVectorizer(stop_words=custom_stop_words)
    try:
        X = vectorizer.fit_transform(texts)
    except ValueError:
        # CountVectorizer raises ValueError on an empty vocabulary, e.g. when
        # the documents contain nothing but stop words.
        st.write("No text available for topic modeling.")
        return
    lda = LatentDirichletAllocation(n_components=n_topics, random_state=42)
    lda.fit(X)
    # Look the vocabulary up once instead of once per word per topic.
    feature_names = vectorizer.get_feature_names_out()
    # Build a dictionary of topic names -> top words
    topics = {}
    for topic_number, weights in enumerate(lda.components_, start=1):
        # argsort is ascending; the reversed slice takes the n_top_words
        # largest weights, highest first.
        top_features_ind = weights.argsort()[:-n_top_words - 1:-1]
        topics[f"Topic {topic_number}"] = [feature_names[i] for i in top_features_ind]
    st.write("### Topic Modeling Results")
    for topic_label, words in topics.items():
        st.write(f"**{topic_label}:** {', '.join(words)}")
#############################
# PAGE 1: Press Releases Live Feed
#############################
# ttl bounds how long a result (including an empty one returned after a failed
# request) is served from the cache before the feed is fetched again.
@st.cache_data(show_spinner=False, ttl=300)
def fetch_press_releases_all() -> pd.DataFrame:
    """
    Fetch recent press releases from multiple companies across several pages.

    Requests pages 0 .. MAX_PAGES - 1 and stops early at the first page that
    returns no data.

    Returns:
        A combined DataFrame of all fetched pages, with the "date" column (when
        present) parsed to datetimes. An empty DataFrame is returned when no
        data was fetched or when any page request fails.
    """
    frames = []
    for page in range(MAX_PAGES):
        try:
            response = requests.get(
                "https://financialmodelingprep.com/api/v3/press-releases",
                params={"page": page, "apikey": API_KEY},
                # requests waits indefinitely without an explicit timeout.
                timeout=10,
            )
            response.raise_for_status()
            data = response.json()
            if not data:
                break
            frames.append(pd.DataFrame(data))
        except Exception:
            # Deliberate best-effort boundary: fail gracefully without naming
            # the data source.
            return pd.DataFrame()
    if not frames:
        return pd.DataFrame()
    df = pd.concat(frames, ignore_index=True)
    if "date" in df.columns:
        df["date"] = pd.to_datetime(df["date"])
    return df
def run_all_press_releases():
    """
    Render the live-feed page: intro text, the press release table with
    sentiment scores, a word cloud, and topic modeling results.

    Shows an error and stops when no press releases were fetched.
    """
    intro_text = (
        "Here, you will see the latest press releases aggregated from various companies. "
        "Explore the table for publication dates, text content, and automated sentiment. "
        "Use the Word Cloud and Topic Modeling below to uncover common themes."
    )
    st.write("**Press Releases Live Feed**")
    st.write(intro_text)

    releases = fetch_press_releases_all()
    if releases.empty:
        st.error("No press releases found.")
        return

    # Attach the sentiment column before the table is displayed.
    scored_releases = process_press_releases_df(releases)
    st.dataframe(scored_releases, use_container_width=True)

    st.subheader("Word Cloud")
    generate_wordcloud(scored_releases)

    st.subheader("Topic Modeling")
    run_topic_modeling(
        scored_releases,
        n_topics=st.session_state.selected_topics_all,
    )
#############################
# PAGE 2: Press Releases by Company
#############################
# ttl bounds how long a result (including an empty one returned after a failed
# request) is served from the cache before the symbol is fetched again.
@st.cache_data(show_spinner=False, ttl=300)
def fetch_press_releases_by_symbol(symbol: str) -> pd.DataFrame:
    """
    Fetch recent press releases for a single company symbol across several pages.

    Requests pages 0 .. MAX_PAGES - 1 and stops early at the first page that
    returns no data.

    Args:
        symbol: Ticker symbol; surrounding whitespace is ignored.

    Returns:
        A combined DataFrame of all fetched pages, with the "date" column (when
        present) parsed to datetimes. An empty DataFrame is returned when the
        symbol is blank, when no data was fetched, or when any page request
        fails.
    """
    from urllib.parse import quote

    cleaned_symbol = symbol.strip()
    if not cleaned_symbol:
        # A blank symbol would leave the URL path without a company segment.
        return pd.DataFrame()
    # The symbol comes from free-text user input: percent-encode it so it can
    # only ever occupy a single path segment of the URL.
    url = (
        "https://financialmodelingprep.com/api/v3/press-releases/"
        + quote(cleaned_symbol, safe="")
    )
    frames = []
    for page in range(MAX_PAGES):
        try:
            response = requests.get(
                url,
                params={"page": page, "apikey": API_KEY},
                # requests waits indefinitely without an explicit timeout.
                timeout=10,
            )
            response.raise_for_status()
            data = response.json()
            if not data:
                break
            frames.append(pd.DataFrame(data))
        except Exception:
            # Deliberate best-effort boundary: fail gracefully without naming
            # the data source.
            return pd.DataFrame()
    if not frames:
        return pd.DataFrame()
    df = pd.concat(frames, ignore_index=True)
    if "date" in df.columns:
        df["date"] = pd.to_datetime(df["date"])
    return df
def run_symbol_press_releases(symbol: str, start_date, n_topics):
    """
    Render the single-company page: intro text, the press release table with
    sentiment scores, a word cloud, and topic modeling results.

    Args:
        symbol: Ticker symbol whose press releases are fetched.
        start_date: Only releases dated on or after this date are kept; it is
            compared against the date part of the "date" column.
        n_topics: Number of topics passed to the topic model.
    """
    st.write("**Press Releases by Company**")
    st.write(
        f"Browse recent press releases for **{symbol}**, starting from {start_date}. "
        "View release text, publication dates, and sentiment analysis. "
        "Below, discover prevalent words and recurring topics for these press releases."
    )
    df = fetch_press_releases_by_symbol(symbol)
    if df.empty:
        st.error(f"No press releases found for {symbol}.")
        return
    # Filter by user-chosen date
    if "date" in df.columns:
        # Copy the filtered rows so the sentiment column added below is written
        # to an independent frame rather than to a slice of the fetched one.
        df = df[df["date"].dt.date >= start_date].copy()
        if df.empty:
            # Everything was older than the chosen date: say so instead of
            # rendering an empty table and empty analyses.
            st.info(f"No press releases found for {symbol} on or after {start_date}.")
            return
    # Process text for sentiment
    df = process_press_releases_df(df)
    st.dataframe(df, use_container_width=True)
    st.subheader("Word Cloud")
    generate_wordcloud(df)
    st.subheader("Topic Modeling")
    run_topic_modeling(df, n_topics=n_topics)
#############################
# MAIN APP
#############################
def main():
    """
    Build the Streamlit page: title, sidebar navigation/options, and the body
    of whichever page is selected.

    The sidebar stores the user's choices in st.session_state and sets a
    per-page "run" flag when the Run button is clicked; the body renders the
    analysis only once that flag is set, and an instruction message otherwise.
    """
    st.set_page_config(page_title="Press Releases", layout="wide")
    st.title("Press Releases Analysis")
    st.write(
        "Explore recent press releases from multiple companies or focus on a single company. "
        "Each page provides a table of press releases, sentiment analysis, a word cloud, and topic modeling."
    )
    # Sidebar navigation
    with st.sidebar.expander("Navigation and Options", expanded=True):
        page = st.radio(
            "Select Page",
            ("Press Releases Live Feed", "Press Releases by Company"),
            help="Choose between a broad overview or a single company's releases."
        )
        if page == "Press Releases Live Feed":
            st.session_state.selected_topics_all = st.number_input(
                "Number of Topics for Live Feed",
                value=st.session_state.selected_topics_all,
                min_value=1,
                max_value=20,
                help="Choose how many topics you want to see in the topic model."
            )
            if st.button("Run"):
                st.session_state.all_run = True
        elif page == "Press Releases by Company":
            symbol = st.text_input(
                "Ticker Symbol",
                value=st.session_state.selected_symbol,
                help="Type the company's ticker symbol."
            )
            # Drop stray whitespace so a padded or blank entry is not sent on
            # as a ticker.
            st.session_state.selected_symbol = symbol.strip()
            start_date = st.date_input(
                "Start Date",
                value=st.session_state.selected_date,
                help="Only press releases on or after this date will appear."
            )
            st.session_state.selected_date = start_date
            st.session_state.selected_topics_symbol = st.number_input(
                "Number of Topics for Company",
                value=st.session_state.selected_topics_symbol,
                min_value=1,
                max_value=20,
                help="Choose how many topics you want to see in the topic model."
            )
            if st.button("Run"):
                st.session_state.symbol_run = True
    # Main body content
    if page == "Press Releases Live Feed":
        st.header("Press Releases Live Feed")
        if st.session_state.all_run:
            run_all_press_releases()
        else:
            # The instruction names the button by its actual label, "Run".
            st.info("Pick how many topics to show, then click 'Run' in the sidebar.")
    elif page == "Press Releases by Company":
        st.header("Press Releases by Company")
        if not st.session_state.symbol_run:
            # The instruction names the button by its actual label, "Run".
            st.info("Enter a ticker symbol, date, and number of topics, then click 'Run' in the sidebar.")
        elif not st.session_state.selected_symbol:
            st.warning("Please enter a ticker symbol.")
        else:
            run_symbol_press_releases(
                st.session_state.selected_symbol,
                st.session_state.selected_date,
                st.session_state.selected_topics_symbol
            )
# Script entry point: build the page when this file is executed as the main
# module.
if __name__ == "__main__":
    main()

# CSS that sets visibility: hidden on the elements matched by the #MainMenu
# and footer selectors. It is injected through st.markdown with
# unsafe_allow_html=True so the <style> tag is passed through as HTML.
# NOTE(review): the original indentation was lost; these statements are
# assumed to sit at module level rather than inside the __main__ guard.
hide_streamlit_style = """
<style>
#MainMenu {visibility: hidden;}
footer {visibility: hidden;}
</style>
"""
st.markdown(hide_streamlit_style, unsafe_allow_html=True)