| |
| import os, io, math, time, warnings |
| warnings.filterwarnings("ignore") |
|
|
| from typing import List, Tuple, Dict, Optional |
|
|
| import numpy as np |
| import pandas as pd |
| import matplotlib.pyplot as plt |
| from PIL import Image |
| import requests |
| import yfinance as yf |
| import gradio as gr |
|
|
| |
| os.environ.setdefault("MPLCONFIGDIR", os.getenv("MPLCONFIGDIR", "/home/user/.config/matplotlib")) |
| os.makedirs(os.environ["MPLCONFIGDIR"], exist_ok=True) |
| for d in [ |
| "/home/user/.cache", |
| "/home/user/.cache/huggingface", |
| "/home/user/.cache/huggingface/hub", |
| "/home/user/.cache/sentencetransformers", |
| ]: |
| os.makedirs(d, exist_ok=True) |
|
|
| |
| DATA_DIR = "data" |
| os.makedirs(DATA_DIR, exist_ok=True) |
|
|
| MAX_TICKERS = 30 |
| DEFAULT_LOOKBACK_YEARS = 10 |
| MARKET_TICKER = "VOO" |
|
|
| SYNTH_ROWS = 1000 |
|
|
| |
| HORIZON_YEARS = 10 |
| RF_CODE = "DGS10" |
| RF_ANN = 0.0375 |
|
|
| |
| def fred_series_for_horizon(years: float) -> str: |
| y = max(1.0, min(100.0, float(years))) |
| if y <= 2: return "DGS2" |
| if y <= 3: return "DGS3" |
| if y <= 5: return "DGS5" |
| if y <= 7: return "DGS7" |
| if y <= 10: return "DGS10" |
| if y <= 20: return "DGS20" |
| return "DGS30" |
|
|
| def fetch_fred_yield_annual(code: str) -> float: |
| url = f"https://fred.stlouisfed.org/graph/fredgraph.csv?id={code}" |
| try: |
| r = requests.get(url, timeout=10) |
| r.raise_for_status() |
| df = pd.read_csv(io.StringIO(r.text)) |
| s = pd.to_numeric(df.iloc[:, 1], errors="coerce").dropna() |
| return float(s.iloc[-1] / 100.0) if len(s) else 0.03 |
| except Exception: |
| return 0.03 |
|
|
| def fetch_prices_monthly(tickers: List[str], years: int) -> pd.DataFrame: |
| tickers = list(dict.fromkeys([t.upper().strip() for t in tickers])) |
| start = (pd.Timestamp.today(tz="UTC") - pd.DateOffset(years=years, days=7)).date() |
| end = pd.Timestamp.today(tz="UTC").date() |
|
|
| df = yf.download( |
| tickers, |
| start=start, |
| end=end, |
| interval="1mo", |
| auto_adjust=True, |
| actions=False, |
| progress=False, |
| group_by="column", |
| threads=False, |
| ) |
|
|
| if isinstance(df, pd.Series): |
| df = df.to_frame() |
|
|
| if isinstance(df.columns, pd.MultiIndex): |
| lvl0 = [str(x) for x in df.columns.get_level_values(0).unique()] |
| if "Close" in lvl0: |
| df = df["Close"] |
| elif "Adj Close" in lvl0: |
| df = df["Adj Close"] |
| else: |
| df = df.xs(df.columns.levels[0][-1], axis=1, level=0, drop_level=True) |
|
|
| cols = [c for c in tickers if c in df.columns] |
| out = df[cols].dropna(how="all").fillna(method="ffill") |
| return out |
|
|
| def monthly_returns(prices: pd.DataFrame) -> pd.DataFrame: |
| return prices.pct_change().dropna() |
|
|
| def yahoo_search(query: str): |
| if not query or not str(query).strip(): |
| return [] |
| url = "https://query1.finance.yahoo.com/v1/finance/search" |
| params = {"q": query.strip(), "quotesCount": 10, "newsCount": 0} |
| headers = {"User-Agent": "Mozilla/5.0"} |
| try: |
| r = requests.get(url, params=params, headers=headers, timeout=10) |
| r.raise_for_status() |
| data = r.json() |
| out = [] |
| for q in data.get("quotes", []): |
| sym = q.get("symbol") |
| name = q.get("shortname") or q.get("longname") or "" |
| exch = q.get("exchDisp") or "" |
| if sym and sym.isascii(): |
| out.append(f"{sym} | {name} | {exch}") |
| if not out: |
| out = [f"{query.strip().upper()} | typed symbol | n/a"] |
| return out[:10] |
| except Exception: |
| return [f"{query.strip().upper()} | typed symbol | n/a"] |
|
|
| def validate_tickers(symbols: List[str], years: int) -> List[str]: |
| base = [s for s in dict.fromkeys([t.upper().strip() for t in symbols]) if s] |
| px = fetch_prices_monthly(base + [MARKET_TICKER], years) |
| ok = [s for s in base if s in px.columns] |
| |
| if MARKET_TICKER not in px.columns: |
| return [] |
| return ok |
|
|
| |
| def get_aligned_monthly_returns(symbols: List[str], years: int) -> pd.DataFrame: |
| uniq = [c for c in dict.fromkeys(symbols) if c != MARKET_TICKER] |
| tickers = uniq + [MARKET_TICKER] |
| px = fetch_prices_monthly(tickers, years) |
| rets = monthly_returns(px) |
| cols = [c for c in uniq if c in rets.columns] + ([MARKET_TICKER] if MARKET_TICKER in rets.columns else []) |
| R = rets[cols].dropna(how="any") |
| return R.loc[:, ~R.columns.duplicated()] |
|
|
| def estimate_all_moments_aligned(symbols: List[str], years: int, rf_ann: float): |
| R = get_aligned_monthly_returns(symbols, years) |
| if MARKET_TICKER not in R.columns or len(R) < 3: |
| raise ValueError("Not enough aligned data with market proxy.") |
|
|
| m = R[MARKET_TICKER] |
| if isinstance(m, pd.DataFrame): |
| m = m.iloc[:, 0].squeeze() |
|
|
| mu_m_ann = float(m.mean() * 12.0) |
| sigma_m_ann = float(m.std(ddof=1) * math.sqrt(12.0)) |
| erp_ann = float(mu_m_ann - rf_ann) |
|
|
| rf_m = rf_ann / 12.0 |
| ex_m = m - rf_m |
| var_m = float(np.var(ex_m.values, ddof=1)) |
| var_m = max(var_m, 1e-9) |
|
|
| betas: Dict[str, float] = {} |
| for s in [c for c in R.columns if c != MARKET_TICKER]: |
| ex_s = R[s] - rf_m |
| cov_sm = float(np.cov(ex_s.values, ex_m.values, ddof=1)[0, 1]) |
| betas[s] = cov_sm / var_m |
| betas[MARKET_TICKER] = 1.0 |
|
|
| |
| asset_cols_all = list(R.columns) |
| cov_m_all = np.cov(R[asset_cols_all].values.T, ddof=1) if asset_cols_all else np.zeros((0, 0)) |
| covA = pd.DataFrame(cov_m_all * 12.0, index=asset_cols_all, columns=asset_cols_all) |
|
|
| return {"betas": betas, "cov_ann": covA, "erp_ann": erp_ann, "sigma_m_ann": sigma_m_ann} |
|
|
| def capm_er(beta: float, rf_ann: float, erp_ann: float) -> float: |
| return float(rf_ann + beta * erp_ann) |
|
|
| def portfolio_stats(weights: Dict[str, float], |
| cov_ann: pd.DataFrame, |
| betas: Dict[str, float], |
| rf_ann: float, |
| erp_ann: float) -> Tuple[float, float, float]: |
| tickers = list(weights.keys()) |
| w = np.array([weights[t] for t in tickers], dtype=float) |
| gross = float(np.sum(np.abs(w))) |
| if gross <= 1e-12: |
| return 0.0, rf_ann, 0.0 |
| w_expo = w / gross |
| beta_p = float(np.dot([betas.get(t, 0.0) for t in tickers], w_expo)) |
| mu_capm = capm_er(beta_p, rf_ann, erp_ann) |
| cov = cov_ann.reindex(index=tickers, columns=tickers).fillna(0.0).to_numpy() |
| sigma_hist = float(max(w_expo.T @ cov @ w_expo, 0.0)) ** 0.5 |
| return beta_p, mu_capm, sigma_hist |
|
|
| def efficient_same_sigma(sigma_target: float, rf_ann: float, erp_ann: float, sigma_mkt: float): |
| if sigma_mkt <= 1e-12: |
| return 0.0, 1.0, rf_ann |
| a = sigma_target / sigma_mkt |
| return a, 1.0 - a, rf_ann + a * erp_ann |
|
|
| def efficient_same_return(mu_target: float, rf_ann: float, erp_ann: float, sigma_mkt: float): |
| if abs(erp_ann) <= 1e-12: |
| return 0.0, 1.0, rf_ann |
| a = (mu_target - rf_ann) / erp_ann |
| return a, 1.0 - a, abs(a) * sigma_mkt |
|
|
| |
| def _pct(x): |
| return np.asarray(x, dtype=float) * 100.0 |
|
|
| def plot_cml(rf_ann, erp_ann, sigma_mkt, |
| sigma_hist_p, mu_capm_p, |
| same_sigma_mu, same_mu_sigma, |
| sugg_sigma_hist=None, sugg_mu_capm=None) -> Image.Image: |
|
|
| fig = plt.figure(figsize=(6.5, 4.3), dpi=120) |
|
|
| xmax = max(0.3, sigma_mkt * 2.4, (sigma_hist_p or 0.0) * 1.6, (sugg_sigma_hist or 0.0) * 1.6) |
| xs = np.linspace(0, xmax, 200) |
| slope = erp_ann / max(sigma_mkt, 1e-9) |
| cml = rf_ann + slope * xs |
|
|
| plt.plot(_pct(xs), _pct(cml), label="CML (Market/Bills)", linewidth=1.8) |
| plt.scatter([_pct(0)], [_pct(rf_ann)], label="Risk-free") |
| plt.scatter([_pct(sigma_mkt)], [_pct(rf_ann + erp_ann)], label="Market") |
|
|
| |
| y_cml_at_sigma_p = rf_ann + slope * max(0.0, float(sigma_hist_p)) |
| y_you = min(float(mu_capm_p), y_cml_at_sigma_p) |
| plt.scatter([_pct(sigma_hist_p)], [_pct(y_you)], label="Your CAPM point") |
|
|
| |
| plt.scatter([_pct(sigma_hist_p)], [_pct(same_sigma_mu)], marker="^", label="Efficient (same σ)") |
| plt.scatter([_pct(same_mu_sigma)], [_pct(mu_capm_p)], marker="^", label="Efficient (same E[r])") |
|
|
| |
| if sugg_sigma_hist is not None and sugg_mu_capm is not None: |
| y_cml_at_sugg = rf_ann + slope * max(0.0, float(sugg_sigma_hist)) |
| y_sugg = min(float(sugg_mu_capm), y_cml_at_sugg) |
| plt.scatter([_pct(sugg_sigma_hist)], [_pct(y_sugg)], label="Selected Suggestion", marker="X", s=60) |
|
|
| plt.xlabel("σ (historical, annualized, %)") |
| plt.ylabel("CAPM E[r] (annual, %)") |
| plt.legend(loc="best", fontsize=8) |
| plt.tight_layout() |
|
|
| buf = io.BytesIO() |
| plt.savefig(buf, format="png") |
| plt.close(fig) |
| buf.seek(0) |
| return Image.open(buf) |
|
|
| |
| def build_synthetic_dataset(universe_user: List[str], |
| covA: pd.DataFrame, |
| betas: Dict[str, float], |
| rf_ann: float, |
| erp_ann: float, |
| sigma_mkt: float, |
| n_rows: int = SYNTH_ROWS) -> pd.DataFrame: |
| """ |
| Generate long-only mixes **from exactly the user's tickers** (VOO included only if the user holds it). |
| """ |
| rng = np.random.default_rng(12345) |
| assets = list(universe_user) |
| if len(assets) == 0: |
| return pd.DataFrame(columns=["tickers", "weights", "beta", "mu_capm", "sigma_hist"]) |
|
|
| rows = [] |
| for _ in range(n_rows): |
| k = int(rng.integers(low=1, high=min(8, len(assets)) + 1)) |
| picks = list(rng.choice(assets, size=k, replace=False)) |
| w = rng.dirichlet(np.ones(k)) |
| beta_p = float(np.dot([betas.get(t, 0.0) for t in picks], w)) |
| mu_capm = capm_er(beta_p, rf_ann, erp_ann) |
| sub = covA.reindex(index=picks, columns=picks).fillna(0.0).to_numpy() |
| sigma_hist = float(max(w.T @ sub @ w, 0.0)) ** 0.5 |
|
|
| rows.append({ |
| "tickers": ",".join(picks), |
| "weights": ",".join(f"{x:.6f}" for x in w), |
| "beta": beta_p, |
| "mu_capm": mu_capm, |
| "sigma_hist": sigma_hist |
| }) |
| return pd.DataFrame(rows) |
|
|
| def _band_bounds(sigma_mkt: float, band: str) -> Tuple[float, float]: |
| band = (band or "Medium").strip().lower() |
| if band.startswith("low"): |
| return 0.0, 0.8 * sigma_mkt |
| if band.startswith("high"): |
| return 1.2 * sigma_mkt, 3.0 * sigma_mkt |
| return 0.8 * sigma_mkt, 1.2 * sigma_mkt |
|
|
| def _exposure_vec(row: pd.Series, universe: List[str]) -> np.ndarray: |
| vec = np.zeros(len(universe)) |
| idx_map = {t: i for i, t in enumerate(universe)} |
| ts = [t.strip() for t in str(row["tickers"]).split(",") if t.strip()] |
| ws = [float(x) for x in str(row["weights"]).split(",")] |
| s = sum(ws) or 1.0 |
| ws = [max(0.0, w) / s for w in ws] |
| for t, w in zip(ts, ws): |
| if t in idx_map: |
| vec[idx_map[t]] = w |
| return vec |
|
|
| def rerank_and_pick_one(df_band: pd.DataFrame, |
| universe: List[str], |
| desired_band: str, |
| alpha: float = 0.6) -> pd.Series: |
| if df_band.empty: |
| return pd.Series(dtype=object) |
|
|
| exp_target = np.ones(len(universe)) |
| exp_target = exp_target / np.sum(exp_target) |
|
|
| embs_ok = True |
| try: |
| from sentence_transformers import SentenceTransformer |
| model = SentenceTransformer("FinLang/finance-embeddings-investopedia") |
| prompt_map = { |
| "low": "low risk conservative diversified stable portfolio", |
| "medium": "balanced medium risk diversified portfolio", |
| "high": "high risk growth aggressive portfolio higher expected return", |
| } |
| prompt = prompt_map.get(desired_band.lower(), prompt_map["medium"]) |
| q = model.encode([prompt]) |
| except Exception: |
| embs_ok = False |
| q = None |
|
|
| def _cos(a, b): |
| an = np.linalg.norm(a) + 1e-12 |
| bn = np.linalg.norm(b) + 1e-12 |
| return float(np.dot(a, b) / (an * bn)) |
|
|
| X_exp = np.stack([_exposure_vec(r, universe) for _, r in df_band.iterrows()], axis=0) |
| exp_sims = np.array([_cos(x, exp_target) for x in X_exp]) |
|
|
| if embs_ok: |
| cand_texts = [] |
| for _, r in df_band.iterrows(): |
| cand_texts.append( |
| f"portfolio with tickers {r['tickers']} having beta {float(r['beta']):.2f}, " |
| f"expected return {float(r['mu_capm']):.3f}, sigma {float(r['sigma_hist']):.3f}" |
| ) |
| C = model.encode(cand_texts) |
| qv = q.reshape(-1) |
| coss = (C @ qv) / (np.linalg.norm(C, axis=1) * (np.linalg.norm(qv) + 1e-12)) |
| coss = np.nan_to_num(coss, nan=0.0) |
| else: |
| coss = np.zeros(len(df_band)) |
|
|
| base = alpha * exp_sims + (1 - alpha) * coss |
| order = np.argsort(-base) |
| best_idx = int(order[0]) |
| return df_band.iloc[best_idx] |
|
|
| def suggest_one_per_band(synth: pd.DataFrame, sigma_mkt: float, universe_user: List[str]) -> Dict[str, pd.Series]: |
| out: Dict[str, pd.Series] = {} |
| for band in ["Low", "Medium", "High"]: |
| lo, hi = _band_bounds(sigma_mkt, band) |
| pool = synth[(synth["sigma_hist"] >= lo) & (synth["sigma_hist"] <= hi)].copy() |
| if pool.empty: |
| if band.lower() == "low": |
| pool = synth.nsmallest(50, "sigma_hist").copy() |
| elif band.lower() == "high": |
| pool = synth.nlargest(50, "sigma_hist").copy() |
| else: |
| tmp = synth.copy() |
| tmp["dist_med"] = (tmp["sigma_hist"] - sigma_mkt).abs() |
| pool = tmp.nsmallest(100, "dist_med").drop(columns=["dist_med"]) |
| chosen = rerank_and_pick_one(pool, universe_user, band) |
| out[band.lower()] = chosen |
| return out |
|
|
| |
| def empty_positions_df(): |
| return pd.DataFrame(columns=["ticker", "amount_usd", "weight_exposure", "beta"]) |
|
|
| def empty_suggestion_df(): |
| return pd.DataFrame(columns=["ticker", "weight_%", "amount_$"]) |
|
|
| def set_horizon(years: float): |
| y = max(1.0, min(100.0, float(years))) |
| code = fred_series_for_horizon(y) |
| rf = fetch_fred_yield_annual(code) |
| global HORIZON_YEARS, RF_CODE, RF_ANN |
| HORIZON_YEARS = y |
| RF_CODE = code |
| RF_ANN = rf |
| |
|
|
| def search_tickers_cb(q: str): |
| opts = yahoo_search(q) |
| note = "Select a symbol and click 'Add selected to portfolio'." if opts else "No matches." |
| return note, gr.update(choices=opts, value=None) |
|
|
| def add_symbol(selection: str, table: Optional[pd.DataFrame]): |
| if not selection: |
| return table if isinstance(table, pd.DataFrame) else pd.DataFrame(columns=["ticker","amount_usd"]), "Pick a row in Matches first." |
| symbol = selection.split("|")[0].strip().upper() |
|
|
| current = [] |
| if isinstance(table, pd.DataFrame) and not table.empty: |
| current = [str(x).upper() for x in table["ticker"].tolist() if str(x) != "nan"] |
| tickers = current if symbol in current else current + [symbol] |
|
|
| val = validate_tickers(tickers, years=DEFAULT_LOOKBACK_YEARS) |
| tickers = [t for t in tickers if t in val] |
|
|
| amt_map = {} |
| if isinstance(table, pd.DataFrame) and not table.empty: |
| for _, r in table.iterrows(): |
| t = str(r.get("ticker", "")).upper() |
| if t in tickers: |
| amt_map[t] = float(pd.to_numeric(r.get("amount_usd", 0.0), errors="coerce") or 0.0) |
|
|
| new_table = pd.DataFrame({"ticker": tickers, "amount_usd": [amt_map.get(t, 0.0) for t in tickers]}) |
| if len(new_table) > MAX_TICKERS: |
| new_table = new_table.iloc[:MAX_TICKERS] |
| return new_table, f"Reached max of {MAX_TICKERS}." |
| return new_table, f"Added {symbol}." |
|
|
| def lock_ticker_column(tb: Optional[pd.DataFrame]): |
| if not isinstance(tb, pd.DataFrame) or tb.empty: |
| return pd.DataFrame(columns=["ticker", "amount_usd"]) |
| tickers = [str(x).upper() for x in tb["ticker"].tolist()] |
| amounts = pd.to_numeric(tb["amount_usd"], errors="coerce").fillna(0.0).tolist() |
| val = validate_tickers(tickers, years=DEFAULT_LOOKBACK_YEARS) |
| tickers = [t for t in tickers if t in val] |
| amounts = amounts[:len(tickers)] + [0.0] * max(0, len(tickers) - len(amounts)) |
| return pd.DataFrame({"ticker": tickers, "amount_usd": amounts}) |
|
|
| |
| UNIVERSE: List[str] = [MARKET_TICKER, "QQQ", "VTI", "SOXX", "IBIT"] |
|
|
| def _holdings_table_from_row(row: pd.Series, budget: float) -> pd.DataFrame: |
| ts = [t.strip() for t in str(row["tickers"]).split(",") if t.strip()] |
| ws = [float(x) for x in str(row["weights"]).split(",")] |
| s = sum(ws) if ws else 1.0 |
| ws = [max(0.0, w) / s for w in ws] |
| return pd.DataFrame( |
| [{"ticker": t, "weight_%": round(w*100.0, 2), "amount_$": round(w*budget, 0)} for t, w in zip(ts, ws)], |
| columns=["ticker", "weight_%", "amount_$"] |
| ) |
|
|
| def compute( |
| years_lookback: int, |
| table: Optional[pd.DataFrame], |
| pick_band_to_show: str |
| ): |
| |
| if isinstance(table, pd.DataFrame): |
| df = table.copy() |
| else: |
| df = pd.DataFrame(columns=["ticker", "amount_usd"]) |
| df = df.dropna(how="all") |
| if "ticker" not in df.columns: df["ticker"] = [] |
| if "amount_usd" not in df.columns: df["amount_usd"] = [] |
| df["ticker"] = df["ticker"].astype(str).str.upper().str.strip() |
| df["amount_usd"] = pd.to_numeric(df["amount_usd"], errors="coerce").fillna(0.0) |
|
|
| symbols = [t for t in df["ticker"].tolist() if t] |
| if len(symbols) == 0: |
| return ( |
| None, "Add at least one ticker.", empty_positions_df(), empty_suggestion_df(), None, |
| "", "", "", |
| None, None, None, None, None, None, None, None, None |
| ) |
|
|
| symbols = validate_tickers(symbols, years_lookback) |
| if len(symbols) == 0: |
| return ( |
| None, "Could not validate any tickers.", empty_positions_df(), empty_suggestion_df(), None, |
| "", "", "", |
| None, None, None, None, None, None, None, None, None |
| ) |
|
|
| global UNIVERSE |
| UNIVERSE = list(sorted(set(symbols)))[:MAX_TICKERS] |
|
|
| df = df[df["ticker"].isin(symbols)].copy() |
| amounts = {r["ticker"]: float(r["amount_usd"]) for _, r in df.iterrows()} |
| rf_ann = RF_ANN |
|
|
| |
| moms = estimate_all_moments_aligned(symbols, years_lookback, rf_ann) |
| betas, covA, erp_ann, sigma_mkt = moms["betas"], moms["cov_ann"], moms["erp_ann"], moms["sigma_m_ann"] |
|
|
| |
| gross = sum(abs(v) for v in amounts.values()) |
| if gross <= 1e-12: |
| return ( |
| None, "All amounts are zero.", empty_positions_df(), empty_suggestion_df(), None, |
| "", "", "", |
| rf_ann, erp_ann, sigma_mkt, None, None, None, None, None, None |
| ) |
| weights = {k: v / gross for k, v in amounts.items()} |
|
|
| |
| beta_p, mu_capm, sigma_hist = portfolio_stats(weights, covA, betas, rf_ann, erp_ann) |
|
|
| |
| a_sigma, b_sigma, mu_eff_same_sigma = efficient_same_sigma(sigma_hist, rf_ann, erp_ann, sigma_mkt) |
| a_mu, b_mu, sigma_eff_same_mu = efficient_same_return(mu_capm, rf_ann, erp_ann, sigma_mkt) |
|
|
| |
| user_universe = list(symbols) |
| synth = build_synthetic_dataset(user_universe, covA, betas, rf_ann, erp_ann, sigma_mkt, n_rows=SYNTH_ROWS) |
| csv_path = os.path.join(DATA_DIR, f"investor_profiles_{int(time.time())}.csv") |
| try: |
| synth.to_csv(csv_path, index=False) |
| except Exception: |
| csv_path = None |
|
|
| picks = suggest_one_per_band(synth, sigma_mkt, user_universe) |
|
|
| def _fmt(row: pd.Series) -> str: |
| if row is None or row.empty: |
| return "No pick available." |
| return f"CAPM E[r] {row['mu_capm']*100:.2f}%, σ(h) {row['sigma_hist']*100:.2f}%" |
|
|
| txt_low = _fmt(picks.get("low", pd.Series(dtype=object))) |
| txt_med = _fmt(picks.get("medium", pd.Series(dtype=object))) |
| txt_high = _fmt(picks.get("high", pd.Series(dtype=object))) |
|
|
| chosen_band = (pick_band_to_show or "Medium").strip().lower() |
| chosen = picks.get(chosen_band, pd.Series(dtype=object)) |
| if chosen is None or chosen.empty: |
| chosen_sigma = None |
| chosen_mu = None |
| sugg_table = empty_suggestion_df() |
| else: |
| chosen_sigma = float(chosen["sigma_hist"]) |
| chosen_mu = float(chosen["mu_capm"]) |
| |
| sugg_table = _holdings_table_from_row(chosen, budget=gross) |
|
|
| pos_table = pd.DataFrame( |
| [{ |
| "ticker": t, |
| "amount_usd": amounts.get(t, 0.0), |
| "weight_exposure": weights.get(t, 0.0), |
| "beta": 1.0 if t == MARKET_TICKER else betas.get(t, np.nan) |
| } for t in symbols], |
| columns=["ticker", "amount_usd", "weight_exposure", "beta"] |
| ) |
|
|
| img = plot_cml( |
| rf_ann, erp_ann, sigma_mkt, |
| sigma_hist, mu_capm, |
| mu_eff_same_sigma, sigma_eff_same_mu, |
| sugg_sigma_hist=chosen_sigma, sugg_mu_capm=chosen_mu |
| ) |
|
|
| |
| info = "\n".join([ |
| "### Inputs", |
| f"- Lookback years {years_lookback}", |
| f"- Horizon years {int(round(HORIZON_YEARS))}", |
| f"- Risk-free {rf_ann:.2%} from {RF_CODE}", |
| f"- Market ERP {erp_ann:.2%}", |
| f"- Market σ (hist) {sigma_mkt:.2%}", |
| "", |
| "### Your portfolio", |
| f"- CAPM E[r] {mu_capm:.2%}", |
| f"- σ (historical) {sigma_hist:.2%}", |
| "", |
| "### Efficient market/bills mixes (replication weights)", |
| f"- **Same σ as your portfolio** → Market weight **{a_sigma:.2f}**, Bills weight **{b_sigma:.2f}** → E[r] **{mu_eff_same_sigma:.2%}**", |
| f"- **Same E[r] as your portfolio** → Market weight **{a_mu:.2f}**, Bills weight **{b_mu:.2f}** → σ **{sigma_eff_same_mu:.2%}**", |
| "", |
| "_How to replicate:_ use a broad market ETF (e.g., VOO) for the **Market** leg and a T-bill/money-market fund for **Bills**. ", |
| "Weights can be >1 or negative (e.g., Market > 1 and Bills < 0 implies leverage/borrowing). ", |
| "If leverage isn’t allowed, scale both weights proportionally toward 1.0 to fit your constraints.", |
| ]) |
| |
|
|
| return ( |
| img, info, pos_table, sugg_table, csv_path, |
| txt_low, txt_med, txt_high, |
| rf_ann, erp_ann, sigma_mkt, sigma_hist, mu_capm, mu_eff_same_sigma, sigma_eff_same_mu, |
| chosen_sigma, chosen_mu |
| ) |
|
|
| |
| with gr.Blocks(title="Efficient Portfolio Advisor") as demo: |
| gr.Markdown( |
| "## Efficient Portfolio Advisor\n" |
| "Search symbols, enter **dollar amounts**, set horizon. Returns use Yahoo Finance monthly data; risk-free from FRED. " |
| "Plot shows **your CAPM point on the CML** plus efficient market/bills points." |
| ) |
|
|
| with gr.Row(): |
| with gr.Column(scale=1): |
| q = gr.Textbox(label="Search symbol") |
| search_note = gr.Markdown() |
| matches = gr.Dropdown(choices=[], label="Matches") |
| with gr.Row(): |
| search_btn = gr.Button("Search") |
| add_btn = gr.Button("Add selected to portfolio") |
|
|
| gr.Markdown("### Portfolio positions") |
| table = gr.Dataframe( |
| headers=["ticker", "amount_usd"], |
| datatype=["str", "number"], |
| row_count=0, |
| col_count=(2, "fixed") |
| ) |
|
|
| horizon = gr.Number(label="Horizon in years (1–100)", value=HORIZON_YEARS, precision=0) |
| lookback = gr.Slider(1, 15, value=DEFAULT_LOOKBACK_YEARS, step=1, label="Lookback years for betas & covariances") |
|
|
| gr.Markdown("### Suggestions") |
| with gr.Row(): |
| btn_low = gr.Button("Show Low") |
| btn_med = gr.Button("Show Medium") |
| btn_high = gr.Button("Show High") |
| low_txt = gr.Markdown() |
| med_txt = gr.Markdown() |
| high_txt = gr.Markdown() |
|
|
| run_btn = gr.Button("Compute (build dataset & suggest)") |
| with gr.Column(scale=1): |
| plot = gr.Image(label="Capital Market Line (CAPM)", type="pil") |
| summary = gr.Markdown(label="Inputs & Results") |
| positions = gr.Dataframe( |
| label="Computed positions", |
| headers=["ticker", "amount_usd", "weight_exposure", "beta"], |
| datatype=["str", "number", "number", "number"], |
| col_count=(4, "fixed"), |
| value=empty_positions_df(), |
| interactive=False |
| ) |
| sugg_table = gr.Dataframe( |
| label="Selected suggestion holdings (% / $)", |
| headers=["ticker", "weight_%", "amount_$"], |
| datatype=["str", "number", "number"], |
| col_count=(3, "fixed"), |
| value=empty_suggestion_df(), |
| interactive=False |
| ) |
| dl = gr.File(label="Generated dataset CSV", value=None, visible=True) |
|
|
| |
| search_btn.click(fn=search_tickers_cb, inputs=q, outputs=[search_note, matches]) |
| add_btn.click(fn=add_symbol, inputs=[matches, table], outputs=[table, search_note]) |
| table.change(fn=lock_ticker_column, inputs=table, outputs=table) |
|
|
| |
| horizon.change(fn=set_horizon, inputs=horizon, outputs=[]) |
|
|
| |
| run_btn.click( |
| fn=compute, |
| inputs=[lookback, table, gr.State("Medium")], |
| outputs=[ |
| plot, summary, positions, sugg_table, dl, |
| low_txt, med_txt, high_txt, |
| gr.State(), gr.State(), gr.State(), gr.State(), gr.State(), gr.State(), gr.State(), |
| gr.State(), gr.State() |
| ] |
| ) |
|
|
| |
| btn_low.click( |
| fn=compute, |
| inputs=[lookback, table, gr.State("Low")], |
| outputs=[ |
| plot, summary, positions, sugg_table, dl, |
| low_txt, med_txt, high_txt, |
| gr.State(), gr.State(), gr.State(), gr.State(), gr.State(), gr.State(), gr.State(), |
| gr.State(), gr.State() |
| ] |
| ) |
| btn_med.click( |
| fn=compute, |
| inputs=[lookback, table, gr.State("Medium")], |
| outputs=[ |
| plot, summary, positions, sugg_table, dl, |
| low_txt, med_txt, high_txt, |
| gr.State(), gr.State(), gr.State(), gr.State(), gr.State(), gr.State(), gr.State(), |
| gr.State(), gr.State() |
| ] |
| ) |
| btn_high.click( |
| fn=compute, |
| inputs=[lookback, table, gr.State("High")], |
| outputs=[ |
| plot, summary, positions, sugg_table, dl, |
| low_txt, med_txt, high_txt, |
| gr.State(), gr.State(), gr.State(), gr.State(), gr.State(), gr.State(), gr.State(), |
| gr.State(), gr.State() |
| ] |
| ) |
|
|
| |
| RF_CODE = fred_series_for_horizon(HORIZON_YEARS) |
| RF_ANN = fetch_fred_yield_annual(RF_CODE) |
|
|
| if __name__ == "__main__": |
| demo.launch(server_name="0.0.0.0", server_port=7860, show_api=False) |
|
|