Spaces:
Sleeping
Sleeping
| import os | |
| import datetime | |
| import requests | |
| import pandas as pd | |
| import streamlit as st | |
| from streamlit_calendar import calendar | |
| API_KEY = os.getenv("FMP_API_KEY") | |
| # ---------- Utilities ---------- | |
| def _normalize_date(date_val): | |
| """Return YYYY-MM-DD or None if invalid.""" | |
| if isinstance(date_val, datetime.datetime): | |
| return date_val.date().strftime("%Y-%m-%d") | |
| if isinstance(date_val, datetime.date): | |
| return date_val.strftime("%Y-%m-%d") | |
| if isinstance(date_val, str): | |
| s = date_val.strip() | |
| if not s: | |
| return None | |
| # handle strings like "2025-08-18 00:00:00" | |
| if len(s) >= 10: | |
| return s[:10] | |
| return None | |
| def parse_time_field(date_val, time_val): | |
| """ | |
| Convert an input date plus a 'time' label into ISO start/end strings. | |
| Handles None/missing time safely. Returns (start_iso, end_iso) or (None, None) if date invalid. | |
| """ | |
| date_str = _normalize_date(date_val) | |
| if not date_str: | |
| return None, None | |
| t = (time_val or "").strip().lower() | |
| # If a clock time is provided (e.g., "09:30" or "09:30:00") | |
| if ":" in t: | |
| # normalize to HH:MM:SS if we can | |
| parts = t.split(":") | |
| if len(parts) == 2: | |
| t = f"{t}:00" | |
| elif len(parts) >= 3: | |
| t = ":".join(parts[:3]) | |
| else: | |
| t = "00:00:00" | |
| chosen = t | |
| else: | |
| # Common labels from FMP (and synonyms) | |
| time_map = { | |
| "bmo": "06:00:00", "before market": "06:00:00", "pre-market": "06:00:00", | |
| "pre market": "07:00:00", # keep your original mapping too | |
| "amc": "18:00:00", "after market": "18:00:00", "post-market": "18:00:00", | |
| "post market": "16:00:00", # keep original | |
| "during market": "10:00:00", | |
| } | |
| chosen = time_map.get(t, "00:00:00") | |
| start_iso = f"{date_str}T{chosen}" | |
| end_iso = f"{date_str}T{chosen}" | |
| return start_iso, end_iso | |
| # ---------- API calls ---------- | |
| def fetch_earnings(from_date, to_date, limit): | |
| url = ( | |
| "https://financialmodelingprep.com/api/v4/earning-calendar-confirmed" | |
| f"?from={_normalize_date(from_date)}&to={_normalize_date(to_date)}&limit={int(limit)}&apikey={API_KEY}" | |
| ) | |
| r = requests.get(url) | |
| return r.json() if r.status_code == 200 else [] | |
| def fetch_dividends(from_date, to_date): | |
| url = ( | |
| "https://financialmodelingprep.com/api/v3/stock_dividend_calendar" | |
| f"?from={_normalize_date(from_date)}&to={_normalize_date(to_date)}&apikey={API_KEY}" | |
| ) | |
| r = requests.get(url) | |
| return r.json() if r.status_code == 200 else [] | |
| def fetch_splits(from_date, to_date): | |
| url = ( | |
| "https://financialmodelingprep.com/api/v3/stock_split_calendar" | |
| f"?from={_normalize_date(from_date)}&to={_normalize_date(to_date)}&apikey={API_KEY}" | |
| ) | |
| r = requests.get(url) | |
| return r.json() if r.status_code == 200 else [] | |
| def fetch_earnings_ticker(symbol, limit): | |
| url = ( | |
| "https://financialmodelingprep.com/api/v3/historical/earning_calendar/" | |
| f"{symbol}?limit={int(limit)}&apikey={API_KEY}" | |
| ) | |
| r = requests.get(url) | |
| return r.json() if r.status_code == 200 else [] | |
| def fetch_dividends_ticker(symbol): | |
| url = ( | |
| "https://financialmodelingprep.com/api/v3/historical-price-full/stock_dividend/" | |
| f"{symbol}?apikey={API_KEY}" | |
| ) | |
| r = requests.get(url) | |
| return r.json() if r.status_code == 200 else {} | |
| def fetch_splits_ticker(symbol): | |
| url = ( | |
| "https://financialmodelingprep.com/api/v3/historical-price-full/stock_split/" | |
| f"{symbol}?apikey={API_KEY}" | |
| ) | |
| r = requests.get(url) | |
| return r.json() if r.status_code == 200 else {} | |
| # ---------- App ---------- | |
| def main(): | |
| st.set_page_config(page_title="Earnings, Dividends, and Splits Calendar", layout="wide") | |
| if not API_KEY: | |
| st.warning("FMP_API_KEY is not set. Please add it in your Space secrets.") | |
| if "general_data" not in st.session_state: | |
| st.session_state["general_data"] = [] | |
| if "ticker_data" not in st.session_state: | |
| st.session_state["ticker_data"] = [] | |
| st.title("Corporate Events Calendar") | |
| st.write("This calendar shows earnings, dividends, and stock splits. Set parameters and click run.") | |
| st.sidebar.title("Input Parameters") | |
| with st.sidebar.expander("How to Use", expanded=False): | |
| st.write( | |
| """ | |
| 1. Select "General Calendar" or "Ticker Calendar." | |
| 2. Check which event types you want (earnings, dividends, splits). | |
| 3. For the general view, choose a date range and limit. | |
| 4. For the ticker view, choose a symbol and limit. | |
| 5. Click the button to see results. | |
| """ | |
| ) | |
| with st.sidebar.expander("", expanded=True): | |
| page_choice = st.radio("Page", ["General Calendar", "Ticker Calendar"]) | |
| today = datetime.date.today() | |
| one_month_later = today + datetime.timedelta(days=30) | |
| if page_choice == "General Calendar": | |
| with st.sidebar.expander("Event Type", expanded=True): | |
| include_earnings = st.checkbox("Include Earnings", value=True) | |
| include_dividends = st.checkbox("Include Dividends", value=True) | |
| include_splits = st.checkbox("Include Stock Splits", value=True) | |
| with st.sidebar.expander("Parameters", expanded=True): | |
| from_date = st.date_input("From Date", value=today) | |
| to_date = st.date_input("To Date", value=one_month_later) | |
| limit_val = st.number_input("Limit", value=200, min_value=1) | |
| if st.sidebar.button("Retrieve Calendar", key="fetch_general"): | |
| all_events = [] | |
| if include_earnings: | |
| for item in fetch_earnings(from_date, to_date, limit_val): | |
| date_raw = item.get("date") | |
| if not _normalize_date(date_raw): | |
| continue # skip invalid date | |
| time_raw = item.get("time") | |
| start_dt, end_dt = parse_time_field(date_raw, time_raw) | |
| if not start_dt: | |
| continue | |
| sym = item.get("symbol", "") | |
| event_entry = { | |
| "start": start_dt, | |
| "end": end_dt, | |
| "title": f"[Earnings] {sym}", | |
| "color": "#3D9DF3", | |
| "eventType": "Earnings", | |
| } | |
| event_entry.update(item) | |
| all_events.append(event_entry) | |
| if include_dividends: | |
| for item in fetch_dividends(from_date, to_date): | |
| date_raw = item.get("date") | |
| d = _normalize_date(date_raw) | |
| if not d: | |
| continue | |
| event_entry = { | |
| "start": f"{d}T00:00:00", | |
| "end": f"{d}T23:59:59", | |
| "title": f"[Dividend] {item.get('symbol','')}", | |
| "color": "#80C080", | |
| "eventType": "Dividend", | |
| } | |
| event_entry.update(item) | |
| all_events.append(event_entry) | |
| if include_splits: | |
| for item in fetch_splits(from_date, to_date): | |
| date_raw = item.get("date") | |
| d = _normalize_date(date_raw) | |
| if not d: | |
| continue | |
| event_entry = { | |
| "start": f"{d}T00:00:00", | |
| "end": f"{d}T23:59:59", | |
| "title": f"[Split] {item.get('symbol','')}", | |
| "color": "#FFC870", | |
| "eventType": "Split", | |
| } | |
| event_entry.update(item) | |
| all_events.append(event_entry) | |
| st.session_state["general_data"] = all_events | |
| st.subheader("General Calendar Results") | |
| data_general = st.session_state["general_data"] | |
| if data_general: | |
| calendar_events = [ | |
| {"title": ev["title"], "start": ev["start"], "end": ev["end"], "color": ev["color"]} | |
| for ev in data_general | |
| ] | |
| cal_options = { | |
| "initialView": "dayGridMonth", | |
| "headerToolbar": { | |
| "left": "today prev,next", | |
| "center": "title", | |
| "right": "dayGridDay,dayGridWeek,dayGridMonth", | |
| }, | |
| "navLinks": True, | |
| } | |
| calendar(events=calendar_events, options=cal_options, key="general_cal") | |
| st.write("Data Table") | |
| st.dataframe(pd.DataFrame(data_general), use_container_width=True) | |
| else: | |
| st.write("No data retrieved. Select event types and click the button.") | |
| else: | |
| with st.sidebar.expander("Event Type", expanded=True): | |
| include_earnings_t = st.checkbox("Include Earnings", value=True) | |
| include_dividends_t = st.checkbox("Include Dividends", value=True) | |
| include_splits_t = st.checkbox("Include Splits", value=True) | |
| with st.sidebar.expander("Parameters", expanded=True): | |
| symbol = st.text_input("Symbol", value="AAPL") | |
| limit_val_ticker = st.number_input("Limit", value=50, min_value=1) | |
| if st.sidebar.button("Retrieve Ticker Calendar", key="fetch_ticker"): | |
| ticker_events = [] | |
| if include_earnings_t: | |
| for item in fetch_earnings_ticker(symbol, limit_val_ticker): | |
| date_raw = item.get("date") | |
| if not _normalize_date(date_raw): | |
| continue | |
| time_raw = item.get("time") | |
| start_dt, end_dt = parse_time_field(date_raw, time_raw) | |
| if not start_dt: | |
| continue | |
| event_info = { | |
| "start": start_dt, | |
| "end": end_dt, | |
| "title": f"[Earnings] {symbol}", | |
| "color": "#3D9DF3", | |
| "eventType": "Earnings", | |
| } | |
| event_info.update(item) | |
| ticker_events.append(event_info) | |
| if include_dividends_t: | |
| hist = fetch_dividends_ticker(symbol).get("historical", []) | |
| for item in hist: | |
| d = _normalize_date(item.get("date")) | |
| if not d: | |
| continue | |
| event_info = { | |
| "start": f"{d}T00:00:00", | |
| "end": f"{d}T23:59:59", | |
| "title": f"[Dividend] {symbol}", | |
| "color": "#80C080", | |
| "eventType": "Dividend", | |
| } | |
| event_info.update(item) | |
| ticker_events.append(event_info) | |
| if include_splits_t: | |
| hist = fetch_splits_ticker(symbol).get("historical", []) | |
| for item in hist: | |
| d = _normalize_date(item.get("date")) | |
| if not d: | |
| continue | |
| event_info = { | |
| "start": f"{d}T00:00:00", | |
| "end": f"{d}T23:59:59", | |
| "title": f"[Split] {symbol}", | |
| "color": "#FFC870", | |
| "eventType": "Split", | |
| } | |
| event_info.update(item) | |
| ticker_events.append(event_info) | |
| st.session_state["ticker_data"] = ticker_events | |
| st.subheader("Ticker Calendar Results") | |
| data_ticker = st.session_state["ticker_data"] | |
| if data_ticker: | |
| calendar_events_t = [ | |
| {"title": ev["title"], "start": ev["start"], "end": ev["end"], "color": ev["color"]} | |
| for ev in data_ticker | |
| ] | |
| cal_options_ticker = { | |
| "initialView": "dayGridMonth", | |
| "headerToolbar": { | |
| "left": "today prev,next", | |
| "center": "title", | |
| "right": "dayGridDay,dayGridWeek,dayGridMonth", | |
| }, | |
| "navLinks": True, | |
| } | |
| calendar(events=calendar_events_t, options=cal_options_ticker, key="ticker_cal") | |
| st.write("Data Table") | |
| st.dataframe(pd.DataFrame(data_ticker), use_container_width=True) | |
| else: | |
| st.write(" ") | |
| if __name__ == "__main__": | |
| main() | |
| hide_streamlit_style = """ | |
| <style> | |
| #MainMenu {visibility: hidden;} | |
| footer {visibility: hidden;} | |
| </style> | |
| """ | |
| st.markdown(hide_streamlit_style, unsafe_allow_html=True) | |