import os import datetime import requests import pandas as pd import streamlit as st from streamlit_calendar import calendar API_KEY = os.getenv("FMP_API_KEY") # ---------- Utilities ---------- def _normalize_date(date_val): """Return YYYY-MM-DD or None if invalid.""" if isinstance(date_val, datetime.datetime): return date_val.date().strftime("%Y-%m-%d") if isinstance(date_val, datetime.date): return date_val.strftime("%Y-%m-%d") if isinstance(date_val, str): s = date_val.strip() if not s: return None # handle strings like "2025-08-18 00:00:00" if len(s) >= 10: return s[:10] return None def parse_time_field(date_val, time_val): """ Convert an input date plus a 'time' label into ISO start/end strings. Handles None/missing time safely. Returns (start_iso, end_iso) or (None, None) if date invalid. """ date_str = _normalize_date(date_val) if not date_str: return None, None t = (time_val or "").strip().lower() # If a clock time is provided (e.g., "09:30" or "09:30:00") if ":" in t: # normalize to HH:MM:SS if we can parts = t.split(":") if len(parts) == 2: t = f"{t}:00" elif len(parts) >= 3: t = ":".join(parts[:3]) else: t = "00:00:00" chosen = t else: # Common labels from FMP (and synonyms) time_map = { "bmo": "06:00:00", "before market": "06:00:00", "pre-market": "06:00:00", "pre market": "07:00:00", # keep your original mapping too "amc": "18:00:00", "after market": "18:00:00", "post-market": "18:00:00", "post market": "16:00:00", # keep original "during market": "10:00:00", } chosen = time_map.get(t, "00:00:00") start_iso = f"{date_str}T{chosen}" end_iso = f"{date_str}T{chosen}" return start_iso, end_iso # ---------- API calls ---------- def fetch_earnings(from_date, to_date, limit): url = ( "https://financialmodelingprep.com/api/v4/earning-calendar-confirmed" f"?from={_normalize_date(from_date)}&to={_normalize_date(to_date)}&limit={int(limit)}&apikey={API_KEY}" ) r = requests.get(url) return r.json() if r.status_code == 200 else [] def fetch_dividends(from_date, to_date): url = ( "https://financialmodelingprep.com/api/v3/stock_dividend_calendar" f"?from={_normalize_date(from_date)}&to={_normalize_date(to_date)}&apikey={API_KEY}" ) r = requests.get(url) return r.json() if r.status_code == 200 else [] def fetch_splits(from_date, to_date): url = ( "https://financialmodelingprep.com/api/v3/stock_split_calendar" f"?from={_normalize_date(from_date)}&to={_normalize_date(to_date)}&apikey={API_KEY}" ) r = requests.get(url) return r.json() if r.status_code == 200 else [] def fetch_earnings_ticker(symbol, limit): url = ( "https://financialmodelingprep.com/api/v3/historical/earning_calendar/" f"{symbol}?limit={int(limit)}&apikey={API_KEY}" ) r = requests.get(url) return r.json() if r.status_code == 200 else [] def fetch_dividends_ticker(symbol): url = ( "https://financialmodelingprep.com/api/v3/historical-price-full/stock_dividend/" f"{symbol}?apikey={API_KEY}" ) r = requests.get(url) return r.json() if r.status_code == 200 else {} def fetch_splits_ticker(symbol): url = ( "https://financialmodelingprep.com/api/v3/historical-price-full/stock_split/" f"{symbol}?apikey={API_KEY}" ) r = requests.get(url) return r.json() if r.status_code == 200 else {} # ---------- App ---------- def main(): st.set_page_config(page_title="Earnings, Dividends, and Splits Calendar", layout="wide") if not API_KEY: st.warning("FMP_API_KEY is not set. Please add it in your Space secrets.") if "general_data" not in st.session_state: st.session_state["general_data"] = [] if "ticker_data" not in st.session_state: st.session_state["ticker_data"] = [] st.title("Corporate Events Calendar") st.write("This calendar shows earnings, dividends, and stock splits. Set parameters and click run.") st.sidebar.title("Input Parameters") with st.sidebar.expander("How to Use", expanded=False): st.write( """ 1. Select "General Calendar" or "Ticker Calendar." 2. Check which event types you want (earnings, dividends, splits). 3. For the general view, choose a date range and limit. 4. For the ticker view, choose a symbol and limit. 5. Click the button to see results. """ ) with st.sidebar.expander("", expanded=True): page_choice = st.radio("Page", ["General Calendar", "Ticker Calendar"]) today = datetime.date.today() one_month_later = today + datetime.timedelta(days=30) if page_choice == "General Calendar": with st.sidebar.expander("Event Type", expanded=True): include_earnings = st.checkbox("Include Earnings", value=True) include_dividends = st.checkbox("Include Dividends", value=True) include_splits = st.checkbox("Include Stock Splits", value=True) with st.sidebar.expander("Parameters", expanded=True): from_date = st.date_input("From Date", value=today) to_date = st.date_input("To Date", value=one_month_later) limit_val = st.number_input("Limit", value=200, min_value=1) if st.sidebar.button("Retrieve Calendar", key="fetch_general"): all_events = [] if include_earnings: for item in fetch_earnings(from_date, to_date, limit_val): date_raw = item.get("date") if not _normalize_date(date_raw): continue # skip invalid date time_raw = item.get("time") start_dt, end_dt = parse_time_field(date_raw, time_raw) if not start_dt: continue sym = item.get("symbol", "") event_entry = { "start": start_dt, "end": end_dt, "title": f"[Earnings] {sym}", "color": "#3D9DF3", "eventType": "Earnings", } event_entry.update(item) all_events.append(event_entry) if include_dividends: for item in fetch_dividends(from_date, to_date): date_raw = item.get("date") d = _normalize_date(date_raw) if not d: continue event_entry = { "start": f"{d}T00:00:00", "end": f"{d}T23:59:59", "title": f"[Dividend] {item.get('symbol','')}", "color": "#80C080", "eventType": "Dividend", } event_entry.update(item) all_events.append(event_entry) if include_splits: for item in fetch_splits(from_date, to_date): date_raw = item.get("date") d = _normalize_date(date_raw) if not d: continue event_entry = { "start": f"{d}T00:00:00", "end": f"{d}T23:59:59", "title": f"[Split] {item.get('symbol','')}", "color": "#FFC870", "eventType": "Split", } event_entry.update(item) all_events.append(event_entry) st.session_state["general_data"] = all_events st.subheader("General Calendar Results") data_general = st.session_state["general_data"] if data_general: calendar_events = [ {"title": ev["title"], "start": ev["start"], "end": ev["end"], "color": ev["color"]} for ev in data_general ] cal_options = { "initialView": "dayGridMonth", "headerToolbar": { "left": "today prev,next", "center": "title", "right": "dayGridDay,dayGridWeek,dayGridMonth", }, "navLinks": True, } calendar(events=calendar_events, options=cal_options, key="general_cal") st.write("Data Table") st.dataframe(pd.DataFrame(data_general), use_container_width=True) else: st.write("No data retrieved. Select event types and click the button.") else: with st.sidebar.expander("Event Type", expanded=True): include_earnings_t = st.checkbox("Include Earnings", value=True) include_dividends_t = st.checkbox("Include Dividends", value=True) include_splits_t = st.checkbox("Include Splits", value=True) with st.sidebar.expander("Parameters", expanded=True): symbol = st.text_input("Symbol", value="AAPL") limit_val_ticker = st.number_input("Limit", value=50, min_value=1) if st.sidebar.button("Retrieve Ticker Calendar", key="fetch_ticker"): ticker_events = [] if include_earnings_t: for item in fetch_earnings_ticker(symbol, limit_val_ticker): date_raw = item.get("date") if not _normalize_date(date_raw): continue time_raw = item.get("time") start_dt, end_dt = parse_time_field(date_raw, time_raw) if not start_dt: continue event_info = { "start": start_dt, "end": end_dt, "title": f"[Earnings] {symbol}", "color": "#3D9DF3", "eventType": "Earnings", } event_info.update(item) ticker_events.append(event_info) if include_dividends_t: hist = fetch_dividends_ticker(symbol).get("historical", []) for item in hist: d = _normalize_date(item.get("date")) if not d: continue event_info = { "start": f"{d}T00:00:00", "end": f"{d}T23:59:59", "title": f"[Dividend] {symbol}", "color": "#80C080", "eventType": "Dividend", } event_info.update(item) ticker_events.append(event_info) if include_splits_t: hist = fetch_splits_ticker(symbol).get("historical", []) for item in hist: d = _normalize_date(item.get("date")) if not d: continue event_info = { "start": f"{d}T00:00:00", "end": f"{d}T23:59:59", "title": f"[Split] {symbol}", "color": "#FFC870", "eventType": "Split", } event_info.update(item) ticker_events.append(event_info) st.session_state["ticker_data"] = ticker_events st.subheader("Ticker Calendar Results") data_ticker = st.session_state["ticker_data"] if data_ticker: calendar_events_t = [ {"title": ev["title"], "start": ev["start"], "end": ev["end"], "color": ev["color"]} for ev in data_ticker ] cal_options_ticker = { "initialView": "dayGridMonth", "headerToolbar": { "left": "today prev,next", "center": "title", "right": "dayGridDay,dayGridWeek,dayGridMonth", }, "navLinks": True, } calendar(events=calendar_events_t, options=cal_options_ticker, key="ticker_cal") st.write("Data Table") st.dataframe(pd.DataFrame(data_ticker), use_container_width=True) else: st.write(" ") if __name__ == "__main__": main() hide_streamlit_style = """ """ st.markdown(hide_streamlit_style, unsafe_allow_html=True)