| from datetime import datetime, timedelta |
| from functools import lru_cache |
| import time |
|
|
| import gradio as gr |
| import numpy as np |
| import pandas as pd |
| import yfinance as yf |
|
|
# Version string rendered in the UI header.
APP_VERSION = "0.1.1"

# Retry policy used when downloading an option chain.
RETRY_ATTEMPTS = 3
RETRY_DELAY_SECONDS = 1.5

# In-process caches. Each value pairs the downloaded payload with the
# "%Y-%m-%d %H:%M:%S" timestamp recorded when it was fetched.
#   option_chain_cache: (symbol, expiration) -> (option chain, downloaded_at)
#   price_cache:        symbol               -> (last price, downloaded_at)
option_chain_cache: dict[tuple[str, str], tuple[object, str]] = {}
price_cache: dict[str, tuple[float, str]] = {}
|
|
|
|
@lru_cache(maxsize=128)
def get_ticker(symb: str) -> yf.Ticker:
    """Return the ``yf.Ticker`` for *symb*, memoised per symbol.

    The ``lru_cache`` keeps up to 128 distinct symbols, so repeated calls
    with the same symbol hand back the same ``Ticker`` object.
    """
    ticker = yf.Ticker(symb)
    return ticker
|
|
|
|
@lru_cache(maxsize=128)
def get_expirations(symb: str) -> tuple[str, ...]:
    """Return the option expirations listed for *symb* as a tuple.

    The result is memoised per symbol (up to 128 entries) for the lifetime
    of the process, so it is not re-read once it has been fetched.
    """
    ticker = get_ticker(symb)
    available = ticker.options
    return tuple(available)
|
|
|
|
def get_option_chain_with_retry(ticker: yf.Ticker, target_time: str):
    """Call ``ticker.option_chain(target_time)``, retrying on any exception.

    At most ``RETRY_ATTEMPTS`` calls are made. After every failed attempt
    except the last, the function sleeps for ``RETRY_DELAY_SECONDS``
    multiplied by the attempt number, so the pause grows linearly.

    Returns:
        Whatever ``ticker.option_chain`` returns on the first success.

    Raises:
        The exception captured from the final failed attempt.
    """
    failure = None
    for attempt_no in range(1, RETRY_ATTEMPTS + 1):
        try:
            chain = ticker.option_chain(target_time)
        except Exception as exc:
            failure = exc
            # No point sleeping after the last attempt; we are about to raise.
            if attempt_no < RETRY_ATTEMPTS:
                time.sleep(RETRY_DELAY_SECONDS * attempt_no)
        else:
            return chain
    raise failure
|
|
|
|
def get_option_chain(symb: str, target_time: str, force_refresh: bool = False):
    """Return ``(option_chain, downloaded_at)`` for a symbol and expiration.

    A previously stored entry in ``option_chain_cache`` is returned unless
    ``force_refresh`` is true. Otherwise the chain is downloaded (with
    retries), stamped with the current local time and stored in the cache.

    Args:
        symb: Ticker symbol.
        target_time: Expiration passed through to the option-chain download.
        force_refresh: When true, ignore any cached entry and download again.
    """
    key = (symb, target_time)
    if key in option_chain_cache and not force_refresh:
        return option_chain_cache[key]

    chain = get_option_chain_with_retry(get_ticker(symb), target_time)
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    entry = (chain, stamp)
    option_chain_cache[key] = entry
    return entry
|
|
|
|
def get_current_price(symb: str, force_refresh: bool = False) -> tuple[float, str]:
    """Return ``(last_price, downloaded_at)`` for *symb*.

    A previously stored entry in ``price_cache`` is returned unless
    ``force_refresh`` is true. Otherwise ``fast_info["lastPrice"]`` is read,
    stamped with the current local time and stored in the cache.

    Args:
        symb: Ticker symbol.
        force_refresh: When true, ignore the cached price and read it again
            from a newly constructed ``yf.Ticker``.

    Raises:
        ValueError: If no usable (non-missing) price is reported.
    """
    if not force_refresh and symb in price_cache:
        return price_cache[symb]

    # get_ticker() hands back one memoised Ticker per symbol.
    # NOTE(review): yfinance is understood to memoise fast_info values on the
    # Ticker instance, which would make a "refresh" through the shared object
    # return the old price - confirm against the installed yfinance version.
    # A fresh Ticker on refresh sidesteps the question either way.
    ticker = yf.Ticker(symb) if force_refresh else get_ticker(symb)
    current_price = ticker.fast_info["lastPrice"]

    # Fail here with a clear message instead of caching a missing value and
    # failing later inside the interval arithmetic.
    if current_price is None or pd.isna(current_price):
        raise ValueError(f"No current price available for {symb}.")

    downloaded_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    price_cache[symb] = (current_price, downloaded_at)
    return current_price, downloaded_at
|
|
|
|
def _bracketing_iv(side: pd.DataFrame, head_is_itm: bool, label: str) -> float:
    """Average the implied volatility of the two strikes around the money.

    The "head" group is the rows whose ``inTheMoney`` flag equals
    ``head_is_itm``; its last strike is paired with the first strike of the
    remaining rows.
    NOTE(review): this assumes rows are ordered by ascending strike, so the
    two picks sit next to the current price - confirm for the data source.

    Raises:
        ValueError: If either group is empty, i.e. the chain has no strike
            on one side of the money.
    """
    itm = side.inTheMoney
    head_mask = itm if head_is_itm else ~itm
    head_strikes = side.strike[head_mask]
    tail_strikes = side.strike[~head_mask]
    if head_strikes.empty or tail_strikes.empty:
        raise ValueError(
            f"Option chain for {label} has no strikes on one side of the current price."
        )

    near_head = head_strikes.iloc[-1]
    near_tail = tail_strikes.iloc[0]
    iv_head = side.loc[side.strike == near_head, "impliedVolatility"].item()
    iv_tail = side.loc[side.strike == near_tail, "impliedVolatility"].item()
    return (iv_head + iv_tail) / 2


def conf_int_ind(
    symb: str, target_time: str | None = None, force_refresh: bool = False
) -> tuple[pd.DataFrame, list[str]]:
    """Build one row of implied-volatility price bands for a single expiration.

    The expiration used is the first listed one when ``target_time`` is
    ``None``, otherwise the latest listed expiration on or before
    ``target_time``. For puts and calls separately, the implied volatility
    of the two strikes around the money is averaged and scaled by
    ``sqrt(t_days / 365)``; the bands are the current price moved down
    (puts) or up (calls) by 1, 1.5 and 2 times that scaled volatility.

    Args:
        symb: Ticker symbol.
        target_time: Latest acceptable expiration date, or ``None``.
        force_refresh: Passed to the option-chain and price downloads.

    Returns:
        ``(frame, [option_downloaded_at, price_downloaded_at])`` where
        ``frame`` is a one-row DataFrame indexed by ``symbol``.

    Raises:
        ValueError: If no expirations exist, ``target_time`` cannot be
            parsed, no expiration is on or before ``target_time``, the
            selected expiration is already past, or the chain has no strikes
            on one side of the money.
    """
    ticker = get_ticker(symb)
    expirations = get_expirations(symb)
    if not expirations:
        raise ValueError(f"No option expirations found for {symb}.")

    df_time = pd.to_datetime(expirations)
    if target_time is None:
        target_time_dt = df_time[0]
    else:
        target_time_dt = pd.to_datetime(target_time)
        # A missing timestamp would otherwise blow up inside the f-string
        # below (strftime on NaT) and hide the real problem.
        if pd.isna(target_time_dt):
            raise ValueError(f"Invalid target_time: {target_time!r}.")
        valid_times = df_time[df_time <= target_time_dt]
        if valid_times.empty:
            raise ValueError(
                f"No expiration on or before {target_time_dt.strftime('%Y-%m-%d')} for {symb}."
            )
        target_time_dt = valid_times[-1]

    target_time_r = target_time_dt.strftime("%Y-%m-%d")
    time_now = datetime.now().strftime("%Y-%m-%d")
    # Whole calendar days between the two dates, plus one so that an
    # expiration dated today counts as one day.
    t_days = (pd.to_datetime(target_time_r) - pd.to_datetime(time_now)).days + 1
    if t_days < 1:
        # sqrt of a non-positive horizon would give NaN or zero-width bands.
        raise ValueError(f"Expiration {target_time_r} for {symb} is already in the past.")

    opt_chain, option_downloaded_at = get_option_chain(symb, target_time_r, force_refresh)
    current_price, price_downloaded_at = get_current_price(symb, force_refresh)

    horizon = np.sqrt(t_days / 365)
    # Puts: last out-of-the-money strike paired with first in-the-money one.
    ivp = _bracketing_iv(opt_chain.puts, False, f"{symb} {target_time_r} puts")
    sdp = ivp * horizon
    # Calls: last in-the-money strike paired with first out-of-the-money one.
    ivc = _bracketing_iv(opt_chain.calls, True, f"{symb} {target_time_r} calls")
    sdc = ivc * horizon

    row = {
        # Fall back to the requested symbol when the info payload lacks one.
        "symbol": [ticker.info.get("symbol", symb)],
        "days": [f"{target_time_r}: ({t_days}) days"],
        "-2.5%(p)": [current_price * (1 - 2 * sdp)],
        "-6.5%(p)": [current_price * (1 - 1.5 * sdp)],
        "-16.5%(p)": [current_price * (1 - 1 * sdp)],
        "current": [current_price],
        "+16.5%(c)": [current_price * (1 + 1 * sdc)],
        "+6.5%(c)": [current_price * (1 + 1.5 * sdc)],
        "+2.5%(c)": [current_price * (1 + 2 * sdc)],
    }

    return pd.DataFrame(row).set_index("symbol"), [option_downloaded_at, price_downloaded_at]
|
|
|
|
def conf_int_duo(
    symb: str, target_time: str | None = None, force_refresh: bool = False
) -> tuple[pd.DataFrame, list[str]]:
    """Return price bands for one symbol across one or several expirations.

    With an explicit ``target_time`` this simply delegates to
    ``conf_int_ind``. Without one, every listed expiration that is not
    before the current time and at most 14 days ahead gets its own row; if
    none qualifies, ``conf_int_ind`` is called with no target instead.

    NOTE(review): expirations are parsed from date strings and compared with
    the current clock time, so an expiration dated today presumably compares
    as midnight and is filtered out - confirm that this is intended.

    Returns:
        ``(frame, timestamps)`` - the concatenated rows and the flattened
        download timestamps of every row.
    """
    if target_time is not None:
        return conf_int_ind(symb, target_time, force_refresh)

    df_time = pd.to_datetime(get_expirations(symb))
    within_window = df_time - datetime.now() <= timedelta(days=14)
    not_before_now = df_time >= datetime.now()
    dlist = list(df_time[within_window & not_before_now].strftime("%Y-%m-%d"))
    if not dlist:
        return conf_int_ind(symb, None, force_refresh)

    frames = []
    timestamps = []
    for expiry in dlist:
        frame, stamps = conf_int_ind(symb, expiry, force_refresh)
        frames.append(frame)
        timestamps.extend(stamps)
    return pd.concat(frames), timestamps
|
|
|
|
| def conf_int( |
| symblist: str | list[str], target_time: str | None = None, force_refresh: bool = False |
| ) -> tuple[pd.DataFrame, list[str]]: |
| if isinstance(symblist, str): |
| return conf_int_duo(symblist, target_time, force_refresh) |
| if isinstance(symblist, list): |
| results = [conf_int_duo(symb, target_time, force_refresh) for symb in symblist] |
| frames = [result[0] for result in results] |
| timestamps = [stamp for result in results for stamp in result[1]] |
| return pd.concat(frames), timestamps |
| raise TypeError("symblist must be a string or a list of strings.") |
|
|
|
|
def parse_symbols(symbs_text: str) -> list[str]:
    """Split user input into a list of unique, upper-cased symbols.

    Commas and newlines both act as separators. Surrounding whitespace is
    stripped, blank entries are dropped, and repeated symbols are kept only
    once (first occurrence wins) so that the same symbol is not downloaded
    and reported twice.

    Args:
        symbs_text: Raw text from the symbols box; ``None`` is treated as
            empty.

    Raises:
        gr.Error: If no symbol remains after cleaning.
    """
    pieces = (symbs_text or "").replace("\n", ",").split(",")
    cleaned = (piece.strip().upper() for piece in pieces)
    # dict.fromkeys de-duplicates while preserving first-seen order.
    symbs = list(dict.fromkeys(token for token in cleaned if token))
    if not symbs:
        raise gr.Error("Provide at least one symbol.")
    return symbs
|
|
|
|
def run_app(symbs_text: str, disable_target_time: bool, target_time: str):
    """Gradio callback: compute the price bands for the entered symbols.

    Args:
        symbs_text: Raw text from the symbols box.
        disable_target_time: When true, ``target_time`` is ignored and
            ``conf_int`` is called without a target.
        target_time: Expiration cut-off typed by the user; surrounding
            whitespace is ignored.

    Returns:
        ``(frame, message)`` - the result with its index reset, and a line
        reporting the latest download timestamp.

    Raises:
        gr.Error: If no symbols are given, if ``target_time`` is required
            but blank, or if the computation fails for any reason.
    """
    symbs = parse_symbols(symbs_text)

    if disable_target_time:
        resolved_target_time = None
    else:
        # Strip first so that whitespace-only input (or None) is treated as
        # blank and gets the helpful message below rather than a later
        # parsing failure.
        resolved_target_time = (target_time or "").strip() or None
        if resolved_target_time is None:
            raise gr.Error("Provide target_time in YYYY-MM-DD format, or disable it.")

    try:
        # Always force a refresh so each click shows freshly downloaded data.
        df, download_timestamps = conf_int(symbs, resolved_target_time, True)
    except Exception as exc:
        # Surface any failure in the UI instead of a server-side traceback.
        raise gr.Error(str(exc)) from exc

    # Timestamps share the "%Y-%m-%d %H:%M:%S" layout, so the string maximum
    # is also the most recent one.
    latest_download = max(download_timestamps) if download_timestamps else "Unknown"
    return df.reset_index(), f"Last data download: {latest_download}"
|
|
|
|
def toggle_target_time(disable_target_time: bool):
    """Return the update for the target_time box when the checkbox changes.

    Disabling makes the box non-interactive and sets its value to ``""``;
    enabling makes it interactive and passes ``value=None``.
    """
    if disable_target_time:
        return gr.update(interactive=False, value="")
    return gr.update(interactive=True, value=None)
|
|
|
|
# UI definition: inputs, Run button, result table and timestamp box.
# NOTE(review): the Row/Column nesting below is reconstructed from the
# statement order - confirm it matches the intended layout.
with gr.Blocks(title="Options Confidence Interval") as demo:
    gr.Markdown("## Options Confidence Interval")
    gr.Markdown(f"Version: `{APP_VERSION}`")
    gr.Markdown("Enter one or more symbols. Use a comma or a new line to separate multiple symbols.")

    with gr.Row():
        # Symbols entry.
        symbs_input = gr.Textbox(
            label="symbs",
            value="SOXL, SPY",
            placeholder="SOXL, SPY",
            lines=3,
        )
        # Target-date controls; the text box starts locked because the
        # checkbox starts ticked.
        with gr.Column():
            disable_target_time_input = gr.Checkbox(
                value=True,
                label="Disable target_time to include all expiration dates within 14 days",
            )
            target_time_input = gr.Textbox(
                label="target_time",
                interactive=False,
                placeholder="YYYY-MM-DD",
            )

    submit_btn = gr.Button("Run")
    output_df = gr.Dataframe(label="Result")
    refresh_timestamp = gr.Textbox(label="Data download timestamp", interactive=False)

    # Ticking/unticking the checkbox locks/unlocks the target_time box.
    disable_target_time_input.change(
        fn=toggle_target_time,
        inputs=disable_target_time_input,
        outputs=target_time_input,
    )
    # The Run button feeds the three inputs to run_app and shows its outputs.
    submit_btn.click(
        fn=run_app,
        inputs=[symbs_input, disable_target_time_input, target_time_input],
        outputs=[output_df, refresh_timestamp],
    )
|
|
|
|
if __name__ == "__main__":
    # Start the app only when run as a script, not when imported.
    # share=True asks Gradio for a shareable link; SSR mode is switched off.
    demo.launch(share=True, ssr_mode=False)
|
|