| | import os, io, math, json, traceback, warnings |
| | warnings.filterwarnings("ignore") |
| |
|
| | from typing import List, Tuple, Dict, Optional |
| |
|
| | import numpy as np |
| | import pandas as pd |
| | import matplotlib.pyplot as plt |
| | from PIL import Image |
| | import gradio as gr |
| | import requests |
| | import yfinance as yf |
| |
|
| | from sentence_transformers import SentenceTransformer, util as st_util |
| |
|
| | |
| | |
| | |
# Local scratch directory (created on import).
DATA_DIR = "data"
os.makedirs(DATA_DIR, exist_ok=True)

# Default history window (years) used for beta / sigma estimation.
DEFAULT_LOOKBACK_YEARS = 5
# Hard cap on the number of positions in the table.
MAX_TICKERS = 30
# Proxy for "the market" in CAPM / CML computations.
MARKET_TICKER = "VOO"
# Placeholder symbol for the risk-free leg in suggested mixes.
BILLS_TICKER = "BILLS"

# Local sentence-embedding model used to rank portfolio suggestions.
EMBED_MODEL_NAME = "BAAI/bge-base-en-v1.5"

# Column layouts for the Gradio tables.
POS_COLS = ["ticker", "amount_usd", "weight_exposure", "beta"]  # computed positions
SUG_COLS = ["ticker", "weight_%", "amount_$"]                   # suggestion picks
EFF_COLS = ["asset", "weight_%", "amount_$"]                    # efficient alternatives

# Synthetic-dataset size and MMR re-ranking parameters.
N_SYNTH = 1000       # number of random portfolios generated per run
MMR_K = 40           # shortlist size fed into MMR
MMR_LAMBDA = 0.65    # MMR trade-off: 1.0 = pure relevance, 0.0 = pure diversity

# When True, tracebacks are appended to user-facing error messages.
DEBUG = True

# Horizon (years, inclusive upper bound) -> FRED constant-maturity series code.
# The final (100, "DGS30") entry makes any horizon up to the 100-year cap
# fall through to the 30-year series.
FRED_MAP = [
    (1, "DGS1"),
    (2, "DGS2"),
    (3, "DGS3"),
    (5, "DGS5"),
    (7, "DGS7"),
    (10, "DGS10"),
    (20, "DGS20"),
    (30, "DGS30"),
    (100, "DGS30"),
]
| |
|
def fred_series_for_horizon(years: float) -> str:
    """Map an investment horizon (in years) to a FRED constant-maturity series code."""
    horizon = min(100.0, max(1.0, float(years)))
    for max_years, series in FRED_MAP:
        if horizon <= max_years:
            return series
    # Unreachable given the 100-year cap in FRED_MAP; kept as a safety net.
    return "DGS30"
| |
|
def fetch_fred_yield_annual(code: str) -> float:
    """Fetch the latest value of a FRED yield series as an annual decimal rate.

    Falls back to 3% (0.03) on any network or parsing failure, or when the
    series contains no numeric observations.
    """
    csv_url = f"https://fred.stlouisfed.org/graph/fredgraph.csv?id={code}"
    try:
        resp = requests.get(csv_url, timeout=10)
        resp.raise_for_status()
        frame = pd.read_csv(io.StringIO(resp.text))
        # Second column holds the yield in percent; coerce junk ("." rows) to NaN.
        observations = pd.to_numeric(frame.iloc[:, 1], errors="coerce").dropna()
        if len(observations):
            return float(observations.iloc[-1] / 100.0)
        return 0.03
    except Exception:
        return 0.03
| |
|
| | |
| | |
| | |
def _to_cols_close(df: pd.DataFrame, tickers: List[str]) -> pd.DataFrame:
    """Coerce a yfinance download into a single-level frame of close prices.

    Accepts a Series, a single-level frame, or a (ticker, field) MultiIndex
    frame (yfinance with group_by="ticker"). Returns an empty frame when
    nothing usable is present.
    """
    if df is None or df.empty:
        return pd.DataFrame()

    # A bare Series is treated as the close column of a single symbol.
    if isinstance(df, pd.Series):
        df = df.to_frame("Close")

    # MultiIndex columns: (ticker, field) — pick the best available field.
    if isinstance(df.columns, pd.MultiIndex):
        available = df.columns.get_level_values(1).unique().tolist()
        if "Adj Close" in available:
            field = "Adj Close"
        elif "Close" in available:
            field = "Close"
        else:
            field = available[0]
        collected = {
            t: pd.to_numeric(df[(t, field)], errors="coerce")
            for t in dict.fromkeys(tickers)      # de-dup, keep request order
            if (t, field) in df.columns
        }
        return pd.DataFrame(collected)

    # Single-level columns: prefer "Adj Close", then "Close".
    default_name = tickers[0] if tickers else "SINGLE"
    for candidate in ("Adj Close", "Close"):
        if candidate in df.columns:
            series = pd.to_numeric(df[candidate], errors="coerce")
            series.name = default_name
            return series.to_frame()

    # Last resort: first numeric column, renamed to the requested symbol.
    numeric_cols = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])]
    if numeric_cols:
        series = pd.to_numeric(df[numeric_cols[0]], errors="coerce")
        series.name = default_name
        return series.to_frame()

    return pd.DataFrame()
| |
|
def fetch_prices_monthly(tickers: List[str], years: int) -> pd.DataFrame:
    """Download monthly close prices for `tickers` over the last `years` years.

    Returns a frame with one column per ticker that actually has data
    (request order preserved, duplicates dropped), or an empty frame when
    nothing could be fetched.
    """
    tickers = [t for t in dict.fromkeys(tickers) if t]
    if not tickers:
        return pd.DataFrame()

    # Compute "today" once so start and end are consistent; the extra 7 days
    # pads the window so the first monthly bar survives the return diff.
    today = pd.Timestamp.today(tz="UTC")
    start = (today - pd.DateOffset(years=int(years), days=7)).date()
    end = today.date()

    df_raw = yf.download(
        tickers, start=start, end=end,
        interval="1mo", auto_adjust=True, progress=False, group_by="ticker",
        threads=True,
    )
    df = _to_cols_close(df_raw, tickers)
    if df.empty:
        return df
    # Forward-fill gaps. Note: `fillna(method="ffill")` is deprecated and
    # removed in pandas 3.0 — use DataFrame.ffill() instead.
    df = df.dropna(how="all").ffill()

    keep = [t for t in tickers if t in df.columns]
    if not keep and df.shape[1] == 1:
        # Single unnamed column: yfinance sometimes drops the symbol name.
        df.columns = [tickers[0]]
        keep = [tickers[0]]
    return df[keep] if keep else pd.DataFrame()
| |
|
def monthly_returns(prices: pd.DataFrame) -> pd.DataFrame:
    """Percent-change monthly returns; rows that are entirely NaN are dropped."""
    if prices is None or prices.empty:
        return pd.DataFrame()
    returns = prices.pct_change()
    return returns.dropna(how="all")
| |
|
def validate_tickers(symbols: List[str], years: int) -> List[str]:
    """Return subset of symbols that have monthly data."""
    cleaned = [s.strip().upper() for s in symbols if s and isinstance(s, str)]
    if not cleaned:
        return []
    non_market = [s for s in cleaned if s != MARKET_TICKER]
    prices = fetch_prices_monthly(non_market + [MARKET_TICKER], years)
    if prices.empty:
        # Nothing downloaded: only the market proxy (if requested) survives.
        return [s for s in cleaned if s == MARKET_TICKER]
    return [s for s in cleaned if s in prices.columns]
| |
|
| | |
| | |
| | |
def annualize_mean(m):
    """Scale a monthly mean return (scalar or array) to an annual figure (×12)."""
    return np.asarray(m, dtype=float) * 12.0


def annualize_sigma(s):
    """Scale a monthly volatility (scalar or array) to an annual figure (×√12)."""
    return np.asarray(s, dtype=float) * math.sqrt(12.0)
| |
|
def get_aligned_monthly_returns(symbols: List[str], years: int) -> pd.DataFrame:
    """Monthly returns for `symbols` (always including the market proxy),
    aligned on dates where every column has data; duplicate columns dropped."""
    wanted = list(dict.fromkeys(symbols))
    if MARKET_TICKER not in wanted:
        wanted.append(MARKET_TICKER)
    prices = fetch_prices_monthly(wanted, years)
    rets = monthly_returns(prices)
    if rets.empty:
        return pd.DataFrame()
    present = [c for c in wanted if c in rets.columns]
    aligned = rets[present].dropna(how="any")
    return aligned.loc[:, ~aligned.columns.duplicated()]
| |
|
def estimate_all_moments_aligned(symbols: List[str], years: int, rf_ann: float):
    """Estimate CAPM betas, annualized covariance, ERP and market sigma.

    Returns a dict with keys "betas", "cov_ann", "erp_ann", "sigma_m_ann".
    Raises ValueError when fewer than 3 aligned monthly observations exist.
    """
    R = get_aligned_monthly_returns(symbols + [MARKET_TICKER], years)
    if R.empty or MARKET_TICKER not in R.columns or R.shape[0] < 3:
        raise ValueError("Not enough aligned data to estimate moments.")
    rf_m = rf_ann / 12.0  # monthly risk-free rate

    m = R[MARKET_TICKER]
    if isinstance(m, pd.DataFrame):  # duplicated market column -> take the first
        m = m.iloc[:, 0].squeeze()

    mu_m_ann = float(annualize_mean(m.mean()))
    sigma_m_ann = float(annualize_sigma(m.std(ddof=1)))
    erp_ann = float(mu_m_ann - rf_ann)  # equity risk premium

    ex_m = m - rf_m  # market excess returns
    var_m = float(np.var(ex_m.values, ddof=1))
    var_m = max(var_m, 1e-9)  # guard against zero variance

    # Beta of each asset vs the market from excess-return covariances.
    betas: Dict[str, float] = {}
    for s in [c for c in R.columns if c != MARKET_TICKER]:
        ex_s = R[s] - rf_m
        cov_sm = float(np.cov(ex_s.values, ex_m.values, ddof=1)[0, 1])
        betas[s] = cov_sm / var_m
    betas[MARKET_TICKER] = 1.0

    # Annualized covariance matrix of the non-market assets.
    # Bug fix: np.cov squeezes a single-variable result to a 0-d array,
    # which breaks the DataFrame constructor below for one-asset
    # portfolios — force a 2-D matrix with atleast_2d.
    asset_cols = [c for c in R.columns if c != MARKET_TICKER]
    if asset_cols:
        cov_m = np.atleast_2d(np.cov(R[asset_cols].values.T, ddof=1))
    else:
        cov_m = np.zeros((0, 0))
    covA = pd.DataFrame(cov_m * 12.0, index=asset_cols, columns=asset_cols)

    return {"betas": betas, "cov_ann": covA, "erp_ann": erp_ann, "sigma_m_ann": sigma_m_ann}
| |
|
def capm_er(beta: float, rf_ann: float, erp_ann: float) -> float:
    """CAPM / SML expected annual return: rf + beta * equity-risk-premium."""
    expected = rf_ann + beta * erp_ann
    return float(expected)
| |
|
def portfolio_stats(weights: Dict[str, float],
                    cov_ann: pd.DataFrame,
                    betas: Dict[str, float],
                    rf_ann: float,
                    erp_ann: float) -> Tuple[float, float, float]:
    """Return (beta, CAPM expected return, historical sigma) for a weight map.

    Weights are renormalized by gross exposure (sum of absolute weights);
    an empty or all-zero map yields (0, rf_ann, 0).
    """
    names = list(weights.keys())
    if not names:
        return 0.0, rf_ann, 0.0
    raw = np.array([weights[n] for n in names], dtype=float)
    gross = float(np.sum(np.abs(raw)))
    if gross <= 1e-12:
        return 0.0, rf_ann, 0.0
    expo = raw / gross
    # Portfolio beta is the exposure-weighted sum of asset betas (missing -> 0).
    beta_vec = [betas.get(n, 0.0) for n in names]
    beta_p = float(np.dot(beta_vec, expo))
    er_capm = capm_er(beta_p, rf_ann, erp_ann)
    # Historical sigma from the annualized covariance (missing pairs -> 0).
    cov = cov_ann.reindex(index=names, columns=names).fillna(0.0).to_numpy()
    variance = float(expo.T @ cov @ expo)
    sigma_p = math.sqrt(max(variance, 0.0))
    return beta_p, er_capm, sigma_p
| |
|
| | |
| | |
| | |
def efficient_same_sigma(sigma_target: float, rf_ann: float, erp_ann: float, sigma_mkt: float):
    """CML mix matching a target sigma.

    Returns (market weight, bills weight, expected return); all-bills when the
    market sigma is degenerate.
    """
    if sigma_mkt <= 1e-12:
        return 0.0, 1.0, rf_ann
    market_w = sigma_target / sigma_mkt
    bills_w = 1.0 - market_w
    expected = rf_ann + market_w * erp_ann
    return market_w, bills_w, expected
| |
|
def efficient_same_return(mu_target: float, rf_ann: float, erp_ann: float, sigma_mkt: float):
    """CML mix matching a target expected return.

    Returns (market weight, bills weight, sigma); degenerate (all-bills,
    zero sigma) when the equity risk premium is ~0.
    """
    if abs(erp_ann) <= 1e-12:
        return 0.0, 1.0, 0.0
    market_w = (mu_target - rf_ann) / erp_ann
    return market_w, 1.0 - market_w, abs(market_w) * sigma_mkt
| |
|
| | |
| | |
| | |
def _pct_arr(x):
    """Convert a decimal value or array to percentage points as a float ndarray."""
    return 100.0 * np.asarray(x, dtype=float)
| |
|
def plot_cml(rf_ann, erp_ann, sigma_mkt,
             pt_sigma_hist, pt_mu_capm,
             same_sigma_sigma, same_sigma_mu,
             same_mu_sigma, same_mu_mu) -> Image.Image:
    """Render the CML, the user's portfolio, and its two efficient alternatives.

    Both axes are in percent; the chart is returned as a PIL image.
    """
    fig = plt.figure(figsize=(6.6, 4.4), dpi=130)

    # X range generously covers the market, the portfolio, and both efficient points.
    sigma_max = max(0.3, sigma_mkt * 2.0, pt_sigma_hist * 1.4,
                    same_mu_sigma * 1.4, same_sigma_sigma * 1.4)
    sigmas = np.linspace(0, sigma_max, 160)
    sharpe = erp_ann / max(sigma_mkt, 1e-12)  # CML slope
    cml_line = rf_ann + sharpe * sigmas

    plt.plot(_pct_arr(sigmas), _pct_arr(cml_line), label="CML via VOO", linewidth=1.8)

    # Scatter points in the same order as before so the color cycle matches.
    markers = [
        (0.0, rf_ann, "Risk-free", 5),
        (sigma_mkt, rf_ann + erp_ann, "Market (VOO)", 5),
        (pt_sigma_hist, pt_mu_capm, "Your portfolio (CAPM)", 6),
        (same_sigma_sigma, same_sigma_mu, "Efficient: same σ", 5),
        (same_mu_sigma, same_mu_mu, "Efficient: same μ", 5),
    ]
    for sx, sy, lbl, z in markers:
        plt.scatter([_pct_arr(sx)], [_pct_arr(sy)], label=lbl, zorder=z)

    # Dashed connectors from the portfolio to each efficient alternative.
    for ex, ey in ((same_sigma_sigma, same_sigma_mu), (same_mu_sigma, same_mu_mu)):
        plt.plot([_pct_arr(pt_sigma_hist), _pct_arr(ex)],
                 [_pct_arr(pt_mu_capm), _pct_arr(ey)],
                 ls="--", lw=1.1, alpha=0.7, color="gray")

    plt.xlabel("σ (annual, %)")
    plt.ylabel("E[return] (annual, %)")
    plt.legend(loc="best", fontsize=8)
    plt.tight_layout()

    # Round-trip through a PNG buffer to hand Gradio a PIL image.
    buf = io.BytesIO()
    plt.savefig(buf, format="png")
    plt.close(fig)
    buf.seek(0)
    return Image.open(buf)
| |
|
| | |
| | |
| | |
def dirichlet_signed(k, rng):
    """Draw k signed exposure weights.

    Signs are +1 with probability 0.75 / -1 with 0.25, magnitudes come from a
    flat Dirichlet, and the vector is scaled by a random gross leverage of
    1 + Gamma(2, 0.5), so total absolute exposure always exceeds 1.
    (RNG call order matters for reproducibility — keep signs, Dirichlet, gamma.)
    """
    signs = rng.choice([-1.0, 1.0], size=k, p=[0.25, 0.75])
    magnitudes = rng.dirichlet(np.ones(k))
    leverage = 1.0 + float(rng.gamma(2.0, 0.5))
    return leverage * signs * magnitudes
| |
|
def build_synth_dataset(universe: List[str],
                        cov_ann: pd.DataFrame,
                        betas: Dict[str, float],
                        rf_ann: float, erp_ann: float,
                        n_rows: int = N_SYNTH,
                        seed: int = 123) -> pd.DataFrame:
    """Generate a dataset of random portfolios over `universe` with CAPM stats.

    Each row records the chosen tickers, their gross-normalized exposure
    weights, and the resulting beta, CAPM expected return and historical sigma.
    """
    rng = np.random.default_rng(seed)
    pool = [u for u in universe if u != MARKET_TICKER] + [MARKET_TICKER]
    if not pool:
        return pd.DataFrame()
    records = []
    for row_id in range(n_rows):
        # Pick between 2 and 8 symbols (clamped to the pool size, min 1).
        size = int(rng.integers(low=max(1, min(2, len(pool))), high=min(8, len(pool)) + 1))
        picks = list(rng.choice(pool, size=size, replace=False))
        raw = dirichlet_signed(size, rng)
        gross = float(np.sum(np.abs(raw)))
        if gross <= 1e-12:
            continue
        expo = raw / gross
        weight_map = {picks[j]: float(expo[j]) for j in range(size)}
        beta_i, er_capm_i, sigma_i = portfolio_stats(weight_map, cov_ann, betas, rf_ann, erp_ann)
        records.append({
            "id": int(row_id),
            "tickers": ",".join(picks),
            "weights": ",".join(f"{x:.6f}" for x in expo),
            "beta": float(beta_i),
            "er_capm": float(er_capm_i),
            "sigma": float(sigma_i),
        })
    return pd.DataFrame(records)
| |
|
| | |
| | |
| | |
# Lazily-created singleton embedder (loading the model is expensive).
_embedder = None
def get_embedder():
    """Return the shared SentenceTransformer instance, loading it on first use."""
    global _embedder
    if _embedder is None:
        _embedder = SentenceTransformer(EMBED_MODEL_NAME)
    return _embedder
| |
|
def row_to_sentence(row: pd.Series) -> str:
    """Serialize a synthetic-portfolio row into a short text for embedding."""
    try:
        symbols = row["tickers"].split(",")
        exposures = [float(x) for x in row["weights"].split(",")]
        paired = ", ".join(
            f"{symbols[i]} {exposures[i]:+.2f}"
            for i in range(min(len(symbols), len(exposures)))
        )
    except Exception:
        # Malformed row: embed stats only, with an empty exposure list.
        paired = ""
    return (f"portfolio with sigma {row['sigma']:.4f}, "
            f"capm_return {row['er_capm']:.4f}, "
            f"beta {row['beta']:.3f}, "
            f"exposures {paired}")
| |
|
def mmr_select(query_emb, cand_embs, k: int = 3, lambda_param: float = MMR_LAMBDA) -> List[int]:
    """Maximal Marginal Relevance selection over candidate embeddings.

    Greedily picks up to `k` indices balancing similarity to the query
    (weight `lambda_param`) against similarity to already-chosen items
    (weight `1 - lambda_param`). Returns indices into `cand_embs`.
    Assumes embeddings are suitable for cosine similarity (they are
    normalized by the caller).
    """
    # Fewer candidates than requested: return them all.
    if cand_embs.shape[0] <= k:
        return list(range(cand_embs.shape[0]))
    sim_to_query = st_util.cos_sim(query_emb, cand_embs).cpu().numpy().reshape(-1)
    chosen = []
    candidate_indices = list(range(cand_embs.shape[0]))
    # Seed with the single most query-similar candidate.
    first = int(np.argmax(sim_to_query))
    chosen.append(first)
    candidate_indices.remove(first)
    while len(chosen) < k and candidate_indices:
        max_score = -1e9
        max_idx = candidate_indices[0]

        chosen_stack = cand_embs[chosen]
        for idx in candidate_indices:
            sim_q = sim_to_query[idx]
            # Diversity penalty: closest similarity to anything already selected.
            sim_d = float(st_util.cos_sim(cand_embs[idx], chosen_stack).max().cpu().numpy())
            mmr_score = lambda_param * sim_q - (1.0 - lambda_param) * sim_d
            if mmr_score > max_score:
                max_score = mmr_score
                max_idx = idx
        chosen.append(max_idx)
        candidate_indices.remove(max_idx)
    return chosen
| |
|
| | |
| | |
| | |
def yahoo_search(query: str):
    """Search Yahoo Finance for symbols matching `query`.

    Returns up to 10 "SYMBOL | name | exchange" strings. On network failure
    or when nothing matches, falls back to treating the query itself as a
    typed symbol.
    """
    if not query or len(query.strip()) == 0:
        return []
    endpoint = "https://query1.finance.yahoo.com/v1/finance/search"
    params = {"q": query.strip(), "quotesCount": 10, "newsCount": 0}
    headers = {"User-Agent": "Mozilla/5.0"}
    fallback = [f"{query.strip().upper()} | typed symbol | n/a"]
    try:
        resp = requests.get(endpoint, params=params, headers=headers, timeout=10)
        resp.raise_for_status()
        payload = resp.json()
        results = []
        for quote in payload.get("quotes", []):
            symbol = quote.get("symbol")
            label = quote.get("shortname") or quote.get("longname") or ""
            exchange = quote.get("exchDisp") or ""
            # Skip non-ASCII symbols (unsupported downstream).
            if symbol and symbol.isascii():
                results.append(f"{symbol} | {label} | {exchange}")
        return results[:10] if results else fallback
    except Exception:
        return fallback
| |
|
# Cache of the most recent Yahoo search results ("SYM | name | exch" strings),
# refreshed by do_search().
_last_matches = []
| |
|
| | |
| | |
| | |
def fmt_pct(x: float) -> str:
    """Format a decimal fraction as a percent string, e.g. 0.1234 -> '12.34%'.

    Returns 'n/a' for anything that cannot be coerced to float.
    """
    try:
        value = float(x)
    except Exception:
        return "n/a"
    return f"{value * 100:.2f}%"
| |
|
def fmt_money(x: float) -> str:
    """Format a number as whole dollars with thousands separators.

    Returns 'n/a' for anything that cannot be coerced to float.
    """
    try:
        value = float(x)
    except Exception:
        return "n/a"
    return f"${value:,.0f}"
| |
|
| | |
| | |
| | |
# Horizon-dependent risk-free state, initialized once at import time and
# updated by set_horizon() when the user moves the horizon slider.
HORIZON_YEARS = 5.0
RF_CODE = fred_series_for_horizon(HORIZON_YEARS)
RF_ANN = fetch_fred_yield_annual(RF_CODE)
| |
|
def do_search(query):
    """Run a symbol search and refresh the Matches dropdown choices."""
    global _last_matches
    _last_matches = yahoo_search(query)
    hint = "Select a symbol from Matches, then click Add."
    return hint, gr.update(choices=_last_matches, value=None)
| |
|
def add_symbol(selection: str, table: pd.DataFrame):
    """Append the selected search result to the positions table.

    Preserves dollar amounts already typed, re-validates every ticker against
    market data, and enforces the MAX_TICKERS cap. Returns (table, message).
    """
    # Extract the symbol from a "SYM | name | exchange" row or a raw string.
    if selection and " | " in selection:
        symbol = selection.split(" | ")[0].strip().upper()
    elif isinstance(selection, str) and selection.strip():
        symbol = selection.strip().upper()
    else:
        return table, "Pick a row from Matches first."

    existing = []
    if isinstance(table, pd.DataFrame) and len(table) > 0 and "ticker" in table.columns:
        existing = [str(x).upper() for x in table["ticker"].tolist() if str(x) != "nan"]

    candidates = existing if symbol in existing else existing + [symbol]
    candidates = [t for t in candidates if t]

    valid = validate_tickers(candidates, years=DEFAULT_LOOKBACK_YEARS)
    candidates = [t for t in candidates if t in valid]

    # Carry over any dollar amounts already typed for surviving tickers.
    amounts = {}
    if isinstance(table, pd.DataFrame) and len(table) > 0:
        for _, row in table.iterrows():
            tk = str(row.get("ticker", "")).upper()
            if tk in candidates:
                amounts[tk] = float(pd.to_numeric(row.get("amount_usd", 0.0), errors="coerce") or 0.0)

    new_table = pd.DataFrame({"ticker": candidates,
                              "amount_usd": [amounts.get(t, 0.0) for t in candidates]})
    msg = f"Added {symbol}" if symbol in candidates else f"{symbol} not valid or no data"
    if len(new_table) > MAX_TICKERS:
        new_table = new_table.iloc[:MAX_TICKERS]
        msg = f"Reached max of {MAX_TICKERS}"
    return new_table, msg
| |
|
def lock_ticker_column(tb: pd.DataFrame):
    """Sanitize the editable positions table.

    Uppercases tickers, coerces amounts to numbers, and drops tickers that
    fail market-data validation.

    Bug fix: each amount is paired with its ticker BEFORE invalid tickers are
    removed. The previous code truncated the amounts list by length after
    filtering, which shifted dollar amounts onto the wrong tickers whenever a
    row other than the last was dropped.
    """
    if tb is None or len(tb) == 0:
        return pd.DataFrame(columns=["ticker", "amount_usd"])
    tickers = [str(x).upper() for x in tb["ticker"].tolist()]
    amounts = pd.to_numeric(tb["amount_usd"], errors="coerce").fillna(0.0).tolist()
    valid = set(validate_tickers(tickers, years=DEFAULT_LOOKBACK_YEARS))
    kept = [(t, a) for t, a in zip(tickers, amounts) if t in valid]
    return pd.DataFrame({"ticker": [t for t, _ in kept],
                         "amount_usd": [a for _, a in kept]})
| |
|
def set_horizon(years: float):
    """Update module-level horizon / risk-free state from the horizon slider."""
    clamped = min(100.0, max(1.0, float(years)))
    series = fred_series_for_horizon(clamped)
    rate = fetch_fred_yield_annual(series)
    global HORIZON_YEARS, RF_CODE, RF_ANN
    HORIZON_YEARS = clamped
    RF_CODE = series
    RF_ANN = rate
    return f"Risk-free series {series}. Latest annual rate {rate:.2%}. Computations will use this."
| |
|
def _table_from_weights(weights: Dict[str, float], gross_amt: float) -> pd.DataFrame:
    """Turn a {ticker: weight} map into a display table of % exposure and
    $ amounts, sorted by absolute weight (largest first)."""
    records = [
        {"ticker": symbol,
         "weight_%": round(float(w) * 100.0, 2),
         "amount_$": round(float(w) * gross_amt, 2)}
        for symbol, w in weights.items()
    ]
    df = pd.DataFrame(records, columns=SUG_COLS)
    if df.empty:
        return pd.DataFrame(columns=SUG_COLS)
    df["absw"] = df["weight_%"].abs()
    return df.sort_values("absw", ascending=False).drop(columns=["absw"])
| |
|
def _weights_dict_from_row(r: pd.Series) -> Dict[str, float]:
    """Parse a synthetic-dataset row back into a {ticker: weight} map.

    Weights are renormalized so gross (absolute) exposure sums to 1; returns
    an empty dict for degenerate rows (no tickers or all-zero weights).
    """
    symbols = [t.strip().upper() for t in str(r.get("tickers", "")).split(",") if t]
    parsed = []
    for token in str(r.get("weights", "")).split(","):
        try:
            parsed.append(float(token))
        except Exception:
            parsed.append(0.0)  # unparsable weight contributes nothing
    pair_count = min(len(symbols), len(parsed))
    wmap = {symbols[i]: parsed[i] for i in range(pair_count)}
    gross = sum(abs(v) for v in wmap.values())
    if gross <= 1e-12:
        return {}
    return {sym: val / gross for sym, val in wmap.items()}
| |
|
def compute(lookback_years: int,
            table: Optional[pd.DataFrame],
            risk_bucket: str,
            horizon_years: float):
    """End-to-end computation behind the Compute button.

    Validates the positions table, estimates CAPM moments over the lookback
    window, plots the CML, derives the two efficient alternatives, and ranks
    synthetic portfolios with embeddings to produce three suggestions.

    Returns a 12-tuple aligned with the Gradio outputs wired in run_btn.click:
    (CML plot image, summary markdown, universe status, positions table,
     suggestion table #1, #2, #3, efficient same-σ table, efficient same-μ
     table, suggestion metadata JSON, default pick index, default pick
     message). On any failure a same-shaped tuple carries the error message.
    """

    try:
        # ---- Input guards -------------------------------------------------
        if table is None or len(table) == 0:
            empty = pd.DataFrame(columns=POS_COLS)
            emptyS = pd.DataFrame(columns=SUG_COLS)
            emptyE = pd.DataFrame(columns=EFF_COLS)
            return (None, "Add at least one ticker", "", empty,
                    emptyS, emptyS, emptyS, emptyE, emptyE, "[]", 1, "No suggestions yet.")

        df = table.copy().dropna(how="all")
        if df.empty or "ticker" not in df.columns or "amount_usd" not in df.columns:
            empty = pd.DataFrame(columns=POS_COLS)
            emptyS = pd.DataFrame(columns=SUG_COLS)
            emptyE = pd.DataFrame(columns=EFF_COLS)
            return (None, "Positions table is empty or malformed.", "", empty,
                    emptyS, emptyS, emptyS, emptyE, emptyE, "[]", 1, "No suggestions yet.")

        # Normalize user input: uppercase tickers, numeric dollar amounts.
        df["ticker"] = df["ticker"].astype(str).str.upper().str.strip()
        df["amount_usd"] = pd.to_numeric(df["amount_usd"], errors="coerce").fillna(0.0)

        # Keep only tickers that actually have market data.
        symbols = [t for t in df["ticker"].tolist() if t]
        symbols = validate_tickers(symbols, lookback_years)
        if len(symbols) == 0:
            empty = pd.DataFrame(columns=POS_COLS)
            emptyS = pd.DataFrame(columns=SUG_COLS)
            emptyE = pd.DataFrame(columns=EFF_COLS)
            return (None, "Could not validate any tickers", "Universe invalid",
                    empty, emptyS, emptyS, emptyS, emptyE, emptyE, "[]", 1, "No suggestions.")

        # ---- Portfolio weights from dollar amounts ------------------------
        universe = sorted(set([s for s in symbols if s != MARKET_TICKER] + [MARKET_TICKER]))
        df = df[df["ticker"].isin(symbols)].copy()
        amounts = {r["ticker"]: float(r["amount_usd"]) for _, r in df.iterrows()}
        gross_amt = sum(abs(v) for v in amounts.values())
        if gross_amt <= 1e-9:
            empty = pd.DataFrame(columns=POS_COLS)
            emptyS = pd.DataFrame(columns=SUG_COLS)
            emptyE = pd.DataFrame(columns=EFF_COLS)
            return (None, "All amounts are zero", "Universe ok",
                    empty, emptyS, emptyS, emptyS, emptyE, emptyE, "[]", 1, "No suggestions.")

        # Exposure weights normalized by gross dollars; shorts stay negative.
        weights = {k: v / gross_amt for k, v in amounts.items()}

        # ---- CAPM moments for the selected horizon ------------------------
        rf_code = fred_series_for_horizon(horizon_years)
        rf_ann = fetch_fred_yield_annual(rf_code)
        moms = estimate_all_moments_aligned(universe, lookback_years, rf_ann)
        betas, covA, erp_ann, sigma_mkt = moms["betas"], moms["cov_ann"], moms["erp_ann"], moms["sigma_m_ann"]

        # Portfolio beta, CAPM expected return, and historical sigma.
        beta_p, er_capm_p, sigma_p = portfolio_stats(weights, covA, betas, rf_ann, erp_ann)

        # Efficient CML alternatives: match the portfolio's sigma, then its mu.
        a_sigma, b_sigma, mu_eff_sigma = efficient_same_sigma(sigma_p, rf_ann, erp_ann, sigma_mkt)
        a_mu, b_mu, sigma_eff_mu = efficient_same_return(er_capm_p, rf_ann, erp_ann, sigma_mkt)

        eff_same_sigma_tbl = _table_from_weights({MARKET_TICKER: a_sigma, BILLS_TICKER: b_sigma}, gross_amt)
        eff_same_mu_tbl = _table_from_weights({MARKET_TICKER: a_mu, BILLS_TICKER: b_mu}, gross_amt)

        # ---- Synthetic candidate portfolios -------------------------------
        synth = build_synth_dataset(universe, covA, betas, rf_ann, erp_ann, n_rows=N_SYNTH, seed=777)
        if synth.empty:
            # Degenerate case: fall back to three simple market/bills mixes.
            fallback = []
            for a in [0.2, 0.5, 0.8]:
                w = {MARKET_TICKER: a, BILLS_TICKER: 1-a}
                beta_i, er_capm_i, sigma_i = portfolio_stats(w, pd.DataFrame(), {MARKET_TICKER: 1.0}, rf_ann, erp_ann)
                fallback.append({"tickers": ",".join(w.keys()),
                                 "weights": ",".join(f"{v:.6f}" for v in w.values()),
                                 "beta": beta_i, "er_capm": er_capm_i, "sigma": sigma_i})
            synth = pd.DataFrame(fallback)

        # Bucket candidates by sigma: Low / Medium / High around the median
        # (±5 annual-sigma percentage points).
        median_sigma = float(synth["sigma"].median())
        low_max = max(float(synth["sigma"].min()), median_sigma - 0.05)
        high_min = median_sigma + 0.05

        if risk_bucket == "Low":
            cand_df = synth[synth["sigma"] <= low_max].copy()
        elif risk_bucket == "High":
            cand_df = synth[synth["sigma"] >= high_min].copy()
        else:
            cand_df = synth[(synth["sigma"] > low_max) & (synth["sigma"] < high_min)].copy()
        if len(cand_df) == 0:
            cand_df = synth.copy()  # bucket empty -> search all candidates

        # ---- Embedding-based ranking (cosine relevance + MMR diversity) ----
        embed = get_embedder()
        cand_sentences = cand_df.apply(row_to_sentence, axis=1).tolist()
        cur_pairs = ", ".join([f"{k}:{v:+.2f}" for k, v in sorted(weights.items())])
        q_sentence = f"user portfolio ({risk_bucket} risk); capm_target {er_capm_p:.4f}; sigma_hist {sigma_p:.4f}; exposures {cur_pairs}"

        cand_embs = embed.encode(cand_sentences, convert_to_tensor=True, normalize_embeddings=True, batch_size=64, show_progress_bar=False)
        q_emb = embed.encode([q_sentence], convert_to_tensor=True, normalize_embeddings=True)[0]

        # Shortlist the MMR_K most similar rows, then pick 3 diverse ones.
        sims = st_util.cos_sim(q_emb, cand_embs)[0]
        top_idx = sims.topk(k=min(MMR_K, len(cand_df))).indices.cpu().numpy().tolist()
        shortlist_embs = cand_embs[top_idx]
        mmr_local = mmr_select(q_emb, shortlist_embs, k=3, lambda_param=MMR_LAMBDA)
        chosen = [top_idx[i] for i in mmr_local]
        recs = cand_df.iloc[chosen].reset_index(drop=True)

        # Materialize the suggestion tables plus their stats metadata.
        sugg_tables = []
        sugg_meta = []
        for _, r in recs.iterrows():
            wmap = _weights_dict_from_row(r)
            sugg_tables.append(_table_from_weights(wmap, gross_amt))
            sugg_meta.append({"er_capm": float(r["er_capm"]), "sigma": float(r["sigma"]), "beta": float(r["beta"])})

        # ---- CML plot ------------------------------------------------------
        img = plot_cml(
            rf_ann, erp_ann, sigma_mkt,
            sigma_p, er_capm_p,
            same_sigma_sigma=sigma_p, same_sigma_mu=mu_eff_sigma,
            same_mu_sigma=sigma_eff_mu, same_mu_mu=er_capm_p
        )

        # ---- Computed positions table (market proxy excluded) --------------
        rows = []
        for t in universe:
            if t == MARKET_TICKER:
                continue
            rows.append({
                "ticker": t,
                "amount_usd": round(amounts.get(t, 0.0), 2),
                "weight_exposure": round(weights.get(t, 0.0), 6),
                # NOTE(review): the conditional is always True here — the
                # market ticker is skipped by the `continue` above.
                "beta": round(betas.get(t, np.nan), 4) if t != MARKET_TICKER else 1.0
            })
        pos_table = pd.DataFrame(rows, columns=POS_COLS)

        # ---- Markdown summary ----------------------------------------------
        info_lines = []
        info_lines.append("### Inputs")
        info_lines.append(f"- Lookback years **{int(lookback_years)}**")
        info_lines.append(f"- Horizon years **{int(round(horizon_years))}**")
        info_lines.append(f"- Risk-free **{fmt_pct(rf_ann)}** from **{rf_code}**")
        info_lines.append(f"- Market ERP **{fmt_pct(erp_ann)}**")
        info_lines.append(f"- Market σ **{fmt_pct(sigma_mkt)}**")
        info_lines.append("")
        info_lines.append("### Your portfolio (plotted as CAPM return, historical σ)")
        info_lines.append(f"- Beta **{beta_p:.2f}**")
        info_lines.append(f"- σ (historical) **{fmt_pct(sigma_p)}**")
        info_lines.append(f"- E[return] (CAPM / SML) **{fmt_pct(er_capm_p)}**")
        info_lines.append("")
        info_lines.append("### Efficient alternatives on CML")
        info_lines.append(f"- Same σ → Market **{a_sigma:.2f}**, Bills **{b_sigma:.2f}**, Return **{fmt_pct(mu_eff_sigma)}**")
        info_lines.append(f"- Same μ → Market **{a_mu:.2f}**, Bills **{b_mu:.2f}**, σ **{fmt_pct(sigma_eff_mu)}**")
        info_lines.append("")
        info_lines.append(f"### Dataset-based suggestions (risk: **{risk_bucket}**)")
        info_lines.append("Use the selector to flip between **Pick #1 / #2 / #3**. Table shows % exposure and $ amounts.")

        # Pad to exactly 3 suggestion tables so the UI outputs always line up.
        while len(sugg_tables) < 3:
            sugg_tables.append(pd.DataFrame(columns=SUG_COLS))

        pick_idx_default = 1
        pick_msg_default = (f"Pick #1 — E[μ] {fmt_pct(sugg_meta[0]['er_capm'])}, "
                            f"σ {fmt_pct(sugg_meta[0]['sigma'])}, β {sugg_meta[0]['beta']:.2f}") if sugg_meta else "No suggestion."

        return (img,
                "\n".join(info_lines),
                f"Universe set to {', '.join(universe)}",
                pos_table,
                sugg_tables[0], sugg_tables[1], sugg_tables[2],
                eff_same_sigma_tbl, eff_same_mu_tbl,
                json.dumps(sugg_meta), pick_idx_default, pick_msg_default)

    except Exception as e:
        # Any failure collapses to an error tuple of the same shape; with
        # DEBUG the traceback is appended as a fenced code block.
        empty = pd.DataFrame(columns=POS_COLS)
        emptyS = pd.DataFrame(columns=SUG_COLS)
        emptyE = pd.DataFrame(columns=EFF_COLS)
        msg = f"⚠️ Compute failed: {e}"
        if DEBUG:
            msg += "\n\n```\n" + traceback.format_exc() + "\n```"
        return (None, msg, "Error", empty, emptyS, emptyS, emptyS, emptyE, emptyE, "[]", 1, "No suggestions.")
| |
|
def on_pick_change(idx: int, meta_json: str):
    """Show stats for the suggestion selected by the carousel slider."""
    try:
        picks = json.loads(meta_json)
    except Exception:
        picks = []
    if not picks:
        return "No suggestion."
    pos = max(0, min(int(idx) - 1, len(picks) - 1))  # clamp to valid range
    stats = picks[pos]
    return f"Pick #{pos+1} — E[μ] {fmt_pct(stats['er_capm'])}, σ {fmt_pct(stats['sigma'])}, β {stats['beta']:.2f}"
| |
|
| | |
| | |
| | |
# ---------------------------------------------------------------------------
# Gradio UI: search/build tab, results tab, and an about tab.
# ---------------------------------------------------------------------------
with gr.Blocks(title="Efficient Portfolio Advisor", css="#small-note {font-size: 12px; color:#666;}") as demo:

    gr.Markdown("## Efficient Portfolio Advisor\n"
                "Search symbols, enter **$ amounts**, set your **horizon**. "
                "The plot shows your **CAPM expected return** vs **historical σ**, alongside the **CML**. "
                "Recommendations are generated from a **synthetic dataset (1000 portfolios)** and ranked with **local embeddings (BGE-base)** for relevance + diversity.")

    # --- Tab 1: portfolio construction ---
    with gr.Tab("Build Portfolio"):
        with gr.Row():
            with gr.Column(scale=1):
                # Symbol search widgets.
                q = gr.Textbox(label="Search symbol")
                search_note = gr.Markdown(elem_id="small-note")
                matches = gr.Dropdown(choices=[], label="Matches", value=None)
                search_btn = gr.Button("Search")
                add_btn = gr.Button("Add selected to portfolio")

                gr.Markdown("### Positions (enter dollars; negatives allowed for shorts)")
                # Editable positions table: ticker + dollar amount.
                table = gr.Dataframe(
                    headers=["ticker", "amount_usd"],
                    datatype=["str", "number"],
                    row_count=0,
                    col_count=(2, "fixed"),
                    wrap=True
                )

            with gr.Column(scale=1):
                horizon = gr.Slider(1, 30, value=DEFAULT_LOOKBACK_YEARS, step=1, label="Investment horizon (years)")
                lookback = gr.Slider(1, 10, value=DEFAULT_LOOKBACK_YEARS, step=1, label="Lookback (years) for β and σ")
                risk_bucket = gr.Radio(["Low", "Medium", "High"], value="Medium", label="Recommendation risk level")
                run_btn = gr.Button("Compute")

        # Wiring: search, add-to-table, table sanitation on every edit, and
        # horizon-driven refresh of the risk-free rate.
        rf_msg = gr.Textbox(label="Risk-free source / status", interactive=False)
        search_btn.click(fn=do_search, inputs=q, outputs=[search_note, matches])
        add_btn.click(fn=add_symbol, inputs=[matches, table], outputs=[table, search_note])
        table.change(fn=lock_ticker_column, inputs=table, outputs=table)
        horizon.change(fn=set_horizon, inputs=horizon, outputs=[rf_msg])

    # --- Tab 2: computed results ---
    with gr.Tab("Results"):
        with gr.Row():
            with gr.Column(scale=1):
                plot = gr.Image(label="Capital Market Line", type="pil")
                summary = gr.Markdown(label="Summary")
                universe_msg = gr.Textbox(label="Universe status", interactive=False)

            with gr.Column(scale=1):
                positions = gr.Dataframe(
                    label="Computed positions",
                    headers=POS_COLS,
                    datatype=["str", "number", "number", "number"],
                    col_count=(len(POS_COLS), "fixed"),
                    interactive=False
                )

        gr.Markdown("### Recommendations (always from embeddings)")
        with gr.Row():
            sugg1 = gr.Dataframe(label="Pick #1", interactive=False)
            sugg2 = gr.Dataframe(label="Pick #2", interactive=False)
            sugg3 = gr.Dataframe(label="Pick #3", interactive=False)

        with gr.Row():
            # Carousel over the three picks; pick_meta holds their JSON stats.
            pick_idx = gr.Slider(1, 3, value=1, step=1, label="Carousel: show Pick #")
            pick_meta = gr.Textbox(value="[]", visible=False)
            pick_msg = gr.Markdown("")

        gr.Markdown("### Efficient alternatives on the CML")
        eff_same_sigma_tbl = gr.Dataframe(label="Efficient: Same σ", interactive=False)
        eff_same_mu_tbl = gr.Dataframe(label="Efficient: Same μ", interactive=False)

        # Main pipeline: outputs must stay aligned with compute()'s 12-tuple.
        run_btn.click(
            fn=compute,
            inputs=[lookback, table, risk_bucket, horizon],
            outputs=[
                plot, summary, universe_msg, positions,
                sugg1, sugg2, sugg3,
                eff_same_sigma_tbl, eff_same_mu_tbl,
                pick_meta, pick_idx, pick_msg
            ]
        )
        pick_idx.change(fn=on_pick_change, inputs=[pick_idx, pick_meta], outputs=pick_msg)

    # --- Tab 3: static about text ---
    with gr.Tab("About"):
        gr.Markdown(
            "### Modality & Model\n"
            "- **Modality**: Text (portfolio → text descriptions) powering **embeddings**\n"
            "- **Embedding model**: `BAAI/bge-base-en-v1.5` (local, downloaded once; no API)\n\n"
            "### Use case\n"
            "Given a portfolio, we build a synthetic dataset of 1,000 alternative mixes **using the same tickers**, "
            "compute each mix’s **CAPM return, σ, and β**, and rank candidates with embeddings to return **3 diverse, relevant suggestions** "
            "for **Low / Medium / High** risk.\n\n"
            "### Theory links\n"
            "- Portfolio expected return in the plot uses **CAPM (SML)**, while σ is historical.\n"
            "- The **CML** and the two **efficient alternatives** (same σ, same μ) use a mix of **Market (VOO)** and **Bills**."
        )

if __name__ == "__main__":
    demo.launch()
| |
|