# Space44 / app.py
# QuantumLearner's picture
# Update app.py
# ca7627a verified
import streamlit as st
import pandas as pd
import datetime
import re
import os
import asyncio
import aiohttp
import plotly.graph_objects as go
from dateutil.relativedelta import relativedelta
# Page chrome: browser-tab title and the wide layout.
st.set_page_config(layout="wide", page_title="Congress Trading Activity")

# The API key is read from the environment; it is None when FMP_API_KEY is unset.
API_KEY = os.environ.get("FMP_API_KEY")

# Feed endpoints queried for each chamber.
# NOTE(review): the House constant points at a path named
# "senate-disclosure-rss-feed" -- confirm against the provider's documentation.
SENATE_BASE_URL = "https://financialmodelingprep.com/api/v4/senate-trading-rss-feed"
HOUSE_BASE_URL = "https://financialmodelingprep.com/api/v4/senate-disclosure-rss-feed"
# ---------------------------
# ASYNC FUNCTIONS FOR FETCHING DATA
# ---------------------------
async def fetch_data_page(session, base_url, page):
    """Fetch one page of a feed and return its records as a list.

    Args:
        session: Object whose ``get(url)`` returns an async context manager
            yielding a response with a ``status`` attribute and an awaitable
            ``json()`` (the caller in this file passes an
            ``aiohttp.ClientSession``).
        base_url: Feed endpoint without a query string.
        page: Page index sent as the ``page`` query parameter.

    Returns:
        The decoded JSON payload when the response status is 200 and the
        payload is a list; ``[]`` in every other case (non-200 status,
        request/decoding failure, or a non-list payload such as an error
        object).
    """
    url = f"{base_url}?page={page}&apikey={API_KEY}"
    try:
        async with session.get(url) as response:
            if response.status != 200:
                return []  # Fail gracefully
            payload = await response.json()
    except Exception:
        # Deliberate best effort: one failed page must not abort the whole
        # batch, so any request or decoding error degrades to "no records".
        return []
    # The caller extends a list with this value; a dict payload would
    # silently contribute its keys as bogus records, so only lists pass.
    return payload if isinstance(payload, list) else []
async def fetch_all_data_async(base_url, pages=5):
    """Fetch pages ``0 .. pages-1`` of a feed concurrently and merge them.

    A Streamlit progress bar is advanced each time a page finishes. Records
    are appended in completion order, which is not necessarily page order.

    Args:
        base_url: Feed endpoint without a query string.
        pages: Number of pages to request.

    Returns:
        A single flat list holding the records of every fetched page.
    """
    collected = []
    async with aiohttp.ClientSession() as session:
        pending = [
            fetch_data_page(session, base_url, page_no)
            for page_no in range(pages)
        ]
        bar = st.progress(0)
        # enumerate from 1 so the counter is "pages finished so far".
        for finished_count, next_done in enumerate(asyncio.as_completed(pending), start=1):
            collected.extend(await next_done)
            bar.progress(finished_count / pages)
    return collected
def load_data_async(base_url, pages=5):
    """Download a feed and return it as a DataFrame, newest transactions first.

    Args:
        base_url: Feed endpoint without a query string.
        pages: Number of pages to request.

    Returns:
        An empty DataFrame when nothing was fetched. Otherwise a DataFrame of
        the fetched records; when a ``transactionDate`` column is present it
        is parsed to datetimes (unparseable values become NaT) and the rows
        are sorted by it in descending order.
    """
    records = asyncio.run(fetch_all_data_async(base_url, pages))
    if not records:
        return pd.DataFrame()

    frame = pd.DataFrame(records)
    if "transactionDate" not in frame.columns:
        return frame

    frame["transactionDate"] = pd.to_datetime(frame["transactionDate"], errors="coerce")
    return frame.sort_values(by="transactionDate", ascending=False)
# ---------------------------
# HELPER FUNCTION TO PARSE AMOUNT RANGE
# ---------------------------
def parse_amount_range(amount_str):
    """Convert a disclosed amount string into a single representative number.

    ``$`` signs and thousands separators are removed first. Then:

    * ``"<low> - <high>"`` yields the midpoint of the two integers, or
      ``None`` when either side is not an integer.
    * Anything else yields the first run of digits found anywhere in the
      string (so ``"Over $50,000,000"`` and an open-ended ``"$1,001 -"``
      both produce their single bound), or ``None`` when there are no digits.

    Args:
        amount_str: Raw amount text; non-string values are rejected.

    Returns:
        A float, or ``None`` when the value cannot be interpreted.
    """
    if not isinstance(amount_str, str):
        return None
    # Strip so a trailing "<low> - " is treated as an open-ended amount
    # rather than a range with an empty upper bound.
    clean_str = amount_str.replace("$", "").replace(",", "").strip()
    if " - " in clean_str:
        # partition never raises on extra separators; any leftover text in
        # ``high`` simply fails the int() conversion below.
        low, _, high = clean_str.partition(" - ")
        try:
            return (int(low) + int(high)) / 2
        except ValueError:
            return None
    # search (not match) so a textual prefix such as "Over" is tolerated.
    match = re.search(r"\d+", clean_str)
    return float(match.group()) if match else None
# ---------------------------
# MAIN APP CODE
# ---------------------------
# Sidebar: filter widgets grouped inside an expander that starts open.
st.sidebar.title("Filters")
with st.sidebar.expander("Parameters", expanded=True):
    # The date picker defaults to three months before today.
    default_start_date = datetime.date.today() + relativedelta(months=-3)
    start_date = st.date_input("Start transaction date", default_start_date)
    top_n = st.slider(
        "Top N stocks",
        1,
        20,
        10,
        help="Select the top N stock by trade amount and volume.",
    )

# The analysis below only runs on the rerun triggered by this button.
run_button = st.sidebar.button("Run Analysis")

# Main page heading and introduction.
st.title("Congress Trades Analysis")
st.write(
    "Analyze recent stock trades reported by members of Congress, including both the Senate and the House. "
    "This tool shows transaction-level data and summary charts."
)
# Function to standardize trade type
def standardize_trade_type(t):
    """Collapse a raw trade-type label into ``"sale"`` or ``"purchase"``.

    A label containing ``sale``, ``sold`` or ``sell`` (case-insensitively) is
    a sale; everything else is reported as a purchase.

    Args:
        t: Raw label. Non-string values (``None``, NaN from a missing cell)
            cannot name a sale, so they take the same ``"purchase"`` fallback
            as any unrecognised string instead of raising.

    Returns:
        ``"sale"`` or ``"purchase"``.
    """
    if not isinstance(t, str):
        return "purchase"
    label = t.lower()
    if any(marker in label for marker in ("sale", "sold", "sell")):
        return "sale"
    return "purchase"
def _keep_recent(df, disclosure_col, start):
    """Parse the two date columns of ``df`` in place and filter by ``start``.

    A row is kept when its ``transactionDate`` or its ``disclosure_col``
    value is on or after ``start``. An empty frame is returned untouched.
    """
    if df.empty:
        return df
    cutoff = pd.to_datetime(start)
    for col in ("transactionDate", disclosure_col):
        df[col] = pd.to_datetime(df[col], errors="coerce")
    return df[(df["transactionDate"] >= cutoff) | (df[disclosure_col] >= cutoff)]


def _lowercase_trade_type(df):
    """Return the ``type`` column as lower-case text, missing values as ``""``."""
    # Missing labels become "" so the classifier never receives NaN.
    return df["type"].fillna("").astype(str).str.lower()


def _time_series_frame(df, disclosure_col):
    """Return a copy of ``df`` with the per-trade columns the time chart needs."""
    frame = df.copy()
    frame["trade_date"] = frame[disclosure_col]
    frame["tradeType"] = _lowercase_trade_type(frame).apply(standardize_trade_type)
    frame["transactionVolume"] = 1
    frame["parsed_amount"] = frame["amount"].apply(parse_amount_range)
    return frame


def _daily_trade_summary(frames):
    """Aggregate per-trade frames into trade count and mean amount per day/type.

    Returns an empty frame carrying the expected columns when there is
    nothing to aggregate, so the caller can filter it without special cases.
    """
    summary_columns = ["trade_date", "tradeType", "total_transactions", "avg_amount"]
    if not frames:
        return pd.DataFrame(columns=summary_columns)
    trades = pd.concat(frames, ignore_index=True).dropna(
        subset=["trade_date", "transactionVolume", "parsed_amount"]
    )
    if trades.empty:
        return pd.DataFrame(columns=summary_columns)
    return (
        trades
        .groupby(["trade_date", "tradeType"], as_index=False)
        .agg(total_transactions=("transactionVolume", "sum"),
             avg_amount=("parsed_amount", "mean"))
        .sort_values("trade_date")
    )


def _ticker_frame(df, ticker_col, chamber):
    """Return the ticker / type / amount columns of ``df`` tagged with ``chamber``."""
    return pd.DataFrame({
        "ticker": df[ticker_col],
        "rawType": _lowercase_trade_type(df),
        "amount": df["amount"].apply(parse_amount_range),
        "chamber": chamber,
    })


def _preferred_columns_first(df, preferred):
    """Return ``df`` with the ``preferred`` columns (those present) moved to the front."""
    leading = [c for c in preferred if c in df.columns]
    trailing = [c for c in df.columns if c not in leading]
    return df[leading + trailing]


if run_button:
    # Use asynchronous fetching for both Senate and House
    senate_data = load_data_async(SENATE_BASE_URL, pages=5)
    house_data = load_data_async(HOUSE_BASE_URL, pages=5)

    # Keep only rows traded or disclosed on/after the selected start date.
    senate_data = _keep_recent(senate_data, "dateRecieved", start_date)
    house_data = _keep_recent(house_data, "disclosureDate", start_date)

    # (chamber label, data, disclosure-date column, ticker column)
    chambers = (
        ("Senate", senate_data, "dateRecieved", "symbol"),
        ("House", house_data, "disclosureDate", "ticker"),
    )

    # ---------------------------
    # Time Series Chart Data Preparation
    # ---------------------------
    # Only non-empty frames are concatenated: concatenating two column-less
    # frames and then calling dropna(subset=...) raises KeyError.
    time_frames = [
        _time_series_frame(chamber_df, date_col)
        for _label, chamber_df, date_col, _ticker_col in chambers
        if not chamber_df.empty
    ]
    time_grouped = _daily_trade_summary(time_frames)

    # Create Plotly time series bar chart
    fig_time = go.Figure()
    for trade in ("purchase", "sale"):
        df_subset = time_grouped[time_grouped["tradeType"] == trade]
        if df_subset.empty:
            continue
        fig_time.add_trace(go.Bar(
            x=df_subset["trade_date"],
            y=df_subset["total_transactions"],
            name=trade,
            customdata=df_subset["avg_amount"],
            hovertemplate="Date: %{x|%Y-%m-%d}<br>Total Transactions: %{y}<br>Avg Amount: %{customdata:.2f}<extra></extra>",
            marker_color="green" if trade == "purchase" else "red"
        ))
    fig_time.update_layout(
        barmode="group",
        xaxis=dict(
            tickformat="%Y-%m-%d",
            # Date axes take dtick in milliseconds (or "M<n>" for months);
            # 86400000 ms is one tick per day.
            dtick=86400000,
            tickangle=-45
        ),
        xaxis_title="Date",
        yaxis_title="Total Transaction Count",
        title="Time Series of Trade Volume",
        height=400
    )

    # ---------------------------
    # Display Time Series Chart in Container
    # ---------------------------
    with st.container(border=True):
        st.plotly_chart(fig_time, use_container_width=True)

    # ---------------------------
    # Chart: Total Amount per Ticker
    # ---------------------------
    ticker_frames = [
        _ticker_frame(chamber_df, ticker_col, label)
        for label, chamber_df, _date_col, ticker_col in chambers
        if not chamber_df.empty
    ]
    if ticker_frames:
        combined_data = pd.concat(ticker_frames, ignore_index=True)
        combined_data = combined_data.dropna(subset=["amount", "ticker"])
        # Copy so the column assignments below act on an independent frame.
        combined_data = combined_data[combined_data["amount"] > 0].copy()
        combined_data["tradeType"] = combined_data["rawType"].apply(standardize_trade_type)
        combined_data["count"] = 1
    else:
        combined_data = pd.DataFrame()

    if combined_data.empty:
        top_tickers = []
        filtered_data = combined_data
    else:
        # Get top N by sum
        sum_per_ticker = (
            combined_data
            .groupby("ticker", as_index=False)["amount"]
            .sum()
            .sort_values("amount", ascending=False)
            .head(top_n)
        )
        top_tickers = sum_per_ticker["ticker"].unique()
        filtered_data = combined_data[combined_data["ticker"].isin(top_tickers)]

    if filtered_data.empty:
        st.write("No data available for the selected filters.")
    else:
        chart_data = (
            filtered_data
            .groupby(["ticker", "chamber", "tradeType"], as_index=False)
            .agg({"amount": "sum", "count": "sum"})
        )
        fig = go.Figure()
        for chamber in chart_data["chamber"].unique():
            for trade in ("purchase", "sale"):
                df_subset = chart_data[
                    (chart_data["chamber"] == chamber) & (chart_data["tradeType"] == trade)
                ]
                if df_subset.empty:
                    continue
                fig.add_trace(go.Bar(
                    x=df_subset["ticker"],
                    y=df_subset["amount"],
                    text=df_subset["count"],
                    textposition="auto",
                    name=f"{chamber} {trade}",
                    # One offset group per chamber: purchases and sales of a
                    # chamber stack, the two chambers sit side by side.
                    offsetgroup=chamber,
                    marker_color="green" if trade == "purchase" else "red"
                ))
        fig.update_layout(
            barmode="stack",
            xaxis_tickangle=-45,
            xaxis_title="Ticker",
            yaxis_title="Total Amount",
            title="Total Amount per Ticker",
            width=40 * len(top_tickers),
            height=400
        )
        with st.container(border=True):
            st.plotly_chart(fig, use_container_width=True)

    # ---------------------------
    # Reorder and Display Senate Table
    # ---------------------------
    if not senate_data.empty:
        senate_data = _preferred_columns_first(
            senate_data,
            ["office", "dateRecieved", "symbol", "type", "amount", "assetDescription"],
        )
        with st.container(border=True):
            st.subheader("Senate Data")
            st.write("Latest Transaction in Senate. Please sort by **`disclosureDate`** and/or **`dateRecieved`**.")
            st.dataframe(senate_data, use_container_width=True)

    # ---------------------------
    # Reorder and Display House Table
    # ---------------------------
    if not house_data.empty:
        house_data = _preferred_columns_first(
            house_data,
            ["representative", "disclosureDate", "ticker", "type", "amount", "assetDescription"],
        )
        with st.container(border=True):
            st.subheader("House Data")
            st.write("Latest Transaction in House. Please sort by **`disclosureDate`** and/or **`transactionDate`**.")
            st.dataframe(house_data, use_container_width=True)
else:
    st.info("Set filters and press Run to load data.")
# CSS that hides the elements matched by `#MainMenu` and `footer`.
# It is injected as raw HTML, hence unsafe_allow_html below.
hide_streamlit_style = """
<style>
#MainMenu {visibility: hidden;}
footer {visibility: hidden;}
</style>
"""
st.markdown(body=hide_streamlit_style, unsafe_allow_html=True)