import os
import sys

# Force UTF-8 console output so the box-drawing characters printed by the CLI
# wizard do not fail on consoles with another default encoding. Guarded because
# a replaced stdout may expose no encoding or no ``reconfigure`` method.
_stdout_encoding = (getattr(sys.stdout, "encoding", None) or "").lower()
if _stdout_encoding != 'utf-8' and hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding='utf-8')

import time
import json
import pandas as pd
import numpy as np
from dataclasses import dataclass, field
from typing import Dict, Any, List

# ─────────────────────────────────────────────
# IMPORT OUR CUSTOM MODULES
# ─────────────────────────────────────────────
from config import load_config, save_config, Color, hr, MODEL_NAMES, SPREAD_BY_SECTOR, COST_BASIS_FILE, logger, OUTPUT_DIR
from core_types import PortfolioState
from data import fetch_risk_free_rate, fetch_fama_french_factors, fetch_data, fetch_risk_free_series, build_monthly_returns
from solver import build_and_optimize
from analytics import (
    portfolio_sensitivity, portfolio_stress_test, backtest,
    behavioral_diagnostics, build_macro
)
from utils.metrics import portfolio_gross_metrics, israelsen_sharpe
from backtest import (
    monte_carlo, expanding_window_backtest
)
from report import _ensure_chartjs, generate_html_report
from exports import export_csv, export_excel
from server import serve_report
from futures_overlay import optimize_futures_overlay
from overlay_analytics import aggregate_overlay_returns, simulate_margin_calls

# Advanced Quant Modules
from risk_attribution import factor_exposure, marginal_var, cvar_attribution, stress_correlation
from regime_detection import detect_volatility_regime, dynamic_risk_aversion
from validation import (
    christoffersen_test, diebold_mariano_test,
    probabilistic_sharpe_ratio, deflated_sharpe_ratio,
    print_validation_report
)

# Unified Database Access
from database import get_pg_engine


# ─────────────────────────────────────────────────────────────────
# DYNAMIC TICKER UNIVERSE & CLI WIZARD
# ─────────────────────────────────────────────────────────────────
def _print_universe(cfg):
    """Print a boxed table of available symbols grouped by asset class.

    Categories come from ``cfg["universe_categories"]``; when that key is
    missing or empty a small built-in default universe is shown instead.
    At most 12 symbols are printed per category.
    """
    print(f"\n{Color.DIM} ┌─ Available symbols by asset class ──────────────────────────────────────────┐")
    categories = cfg.get("universe_categories", {})
    if not categories:
        categories = {
            "Core Equities": ["SPY", "QQQ", "DIA", "IWM"],
            "Bonds & Rates": ["TLT", "IEF", "SHY", "AGG"],
            "Tech & Growth": ["AAPL", "MSFT", "NVDA", "TSLA"],
        }
    for cat, tks in categories.items():
        display_tks = tks[:12]
        row = " ".join(f"{t:<11}" for t in display_tks)
        print(f" │ {Color.CYAN}{cat:<20}{Color.RESET}{Color.DIM} {row}")
    print(f" └──────────────────────────────────────────────────────────────────────────┘{Color.RESET}")


def _section(title, color=Color.CYAN):
    """Print ``title`` between two 58-character em-dash rules in ``color``."""
    print(f"\n{color}{Color.BOLD}{chr(8212)*58}")
    print(f" {title}")
    print(f"{chr(8212)*58}{Color.RESET}")


def setup_portfolio(cfg):
    """Run the interactive five-step CLI wizard.

    ``cfg`` is mutated in place: currency symbol, trading days, sector map
    entries for unknown tickers, tax settings, short-selling limits and the
    GARCH/CVaR switches are written into it.

    Returns:
        A 7-tuple ``(tickers, capital, risk_input, risk_factor, model,
        allocation_engine, current_weights_raw)``. ``current_weights_raw`` is
        normalised to sum to 1 when the entered weights have a positive total.
        Invalid answers fall back to defaults (capital 100000, risk 5 / 3.0,
        model 1, allocation engine 1, tickers SPY/TLT/GLD).
    """
    # Questionnaire answer (1-10) -> risk factor handed to the optimizer.
    risk_map = {1: 0.1, 2: 0.5, 3: 1.0, 4: 2.0, 5: 3.0, 6: 5.0, 7: 7.5, 8: 10.0, 9: 15.0, 10: 25.0}

    # Clearing the screen is cosmetic only, so any failure is ignored on purpose.
    try:
        os.system("cls" if os.name == "nt" else "clear")
    except Exception:
        pass

    print(f"\n{Color.BOLD}{Color.MAGENTA}╔══════════════════════════════════════════════════════╗")
    print("║ QUANTITATIVE PORTFOLIO BUILDER v8.0 ║")
    print("║ Global Institutional Optimization Engine ║")
    print(f"╚══════════════════════════════════════════════════════╝{Color.RESET}")

    _section("STEP 1 OF 5 — Regional & Market Settings")
    curr_in = input(f" {Color.BOLD}Base Currency Symbol [{cfg.get('currency_symbol', '$')}]:{Color.RESET} ").strip()
    if curr_in:
        cfg['currency_symbol'] = curr_in
    days_in = input(f" {Color.BOLD}Trading Days per Year [{cfg.get('trading_days_per_year', 252)}]:{Color.RESET} ").strip()
    # Zero is rejected: the pipeline later divides by this value.
    if days_in.isdigit() and int(days_in) > 0:
        cfg['trading_days_per_year'] = int(days_in)

    _section("STEP 2 OF 5 — Your Current Portfolio")
    _print_universe(cfg)
    current_weights_raw = {}
    # Read "TICKER WEIGHT" lines until a blank line, EOF or Ctrl-C.
    while True:
        try:
            line = input(f" {Color.BOLD}>{Color.RESET} ").strip().upper()
        except (EOFError, KeyboardInterrupt):
            break
        if not line:
            break
        parts = line.replace(",", " ").split()
        if len(parts) >= 2:
            try:
                t, w = parts[0], float(parts[1].replace("%", ""))
                current_weights_raw[t] = w
            except ValueError:
                # Unparseable weight: ignore the line and keep prompting.
                pass

    if current_weights_raw:
        total_w = sum(current_weights_raw.values())
        if total_w > 0:
            current_weights_raw = {t: w / total_w for t, w in current_weights_raw.items()}
        tickers = list(current_weights_raw.keys())
    else:
        raw = input(f" {Color.BOLD}Tickers (comma-separated):{Color.RESET} ").upper().replace(" ", "")
        tickers = [t for t in raw.split(",") if t] or ["SPY", "TLT", "GLD"]

    # Every selected ticker needs a sector; unknown ones are filed under "Other".
    for t in tickers:
        if t not in cfg.get("sector_map", {}):
            cfg.setdefault("sector_map", {})[t] = "Other"

    try:
        capital = float(input(f" {Color.BOLD}Portfolio value:{Color.RESET} ").replace(",", ""))
    except ValueError:
        capital = 100_000.0

    _section("STEP 3 OF 5 — Risk Aversion")
    try:
        ri = int(input(f"\n {Color.BOLD}Risk Aversion (1-10):{Color.RESET} "))
        if ri not in risk_map:
            raise ValueError
        risk_input, risk_factor = ri, risk_map[ri]
    except ValueError:
        risk_input, risk_factor = 5, 3.0

    _section("STEP 4 OF 5 — Expected Return Model")
    try:
        mi_in = int(input(f" {Color.BOLD}Return Model (1-5):{Color.RESET} "))
        if mi_in not in MODEL_NAMES:
            raise ValueError
        model = mi_in
    except ValueError:
        model = 1
    try:
        ae_in = int(input(f" {Color.BOLD}Allocation Engine (1-2):{Color.RESET} "))
    except ValueError:
        ae_in = 1
    allocation_engine = ae_in if ae_in in [1, 2] else 1

    _section("STEP 5 OF 5 — Tax & Advanced Risk Constraints")
    tax_lt = cfg.get("tax_rate_lt", 0.20)
    tax_st = cfg.get("tax_rate_st", 0.35)
    cfg["tax_rate_lt"], cfg["tax_rate_st"] = tax_lt, tax_st
    cfg["tax_enabled"] = tax_lt > 0
    cfg["_use_saved_basis"] = cfg["tax_enabled"] and os.path.exists(COST_BASIS_FILE)

    # Short selling is hard-wired off here, so the first branch is currently
    # unreachable; it is kept so the limits stay documented in one place.
    allow_ss = False
    if allow_ss:
        cfg["single_asset_min"] = -0.30
        cfg["gross_leverage_cap"] = 1.5
        cfg["short_borrow_cost"] = 0.015
    else:
        cfg["single_asset_min"] = 0.0
        cfg["gross_leverage_cap"] = 1.0
        cfg["short_borrow_cost"] = 0.0
    cfg["garch_enabled"] = True
    cfg["cvar_enabled"] = True

    return tickers, capital, risk_input, risk_factor, model, allocation_engine, current_weights_raw


def build_spread_map(tickers, sector_map):
    """Map each ticker to its sector's spread from ``SPREAD_BY_SECTOR``.

    Tickers without a sector are treated as "Other"; sectors without a
    configured spread get 0.0008.
    """
    return {t: SPREAD_BY_SECTOR.get(sector_map.get(t, "Other"), 0.0008) for t in tickers}


def load_portfolio_state_dict():
    """Load the saved cost-basis state from ``COST_BASIS_FILE``.

    Returns:
        The parsed JSON content, or ``{}`` when the file does not exist or
        cannot be read/parsed (a warning is logged in the latter case).
    """
    if not os.path.exists(COST_BASIS_FILE):
        return {}
    try:
        with open(COST_BASIS_FILE, encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses. Loading stays best-effort, but no longer fails silently.
        logger.warning("Could not load cost basis file %s: %s", COST_BASIS_FILE, exc)
        return {}


def save_portfolio_state_dict(weights, prices, capital, existing_state=None):
    """Update per-ticker cost-basis lots for the new weights and persist them.

    Args:
        weights: Mapping/Series of ticker -> target weight.
        prices: Mapping/Series of ticker -> price; tickers with a missing or
            non-positive price are skipped.
        capital: Portfolio value used to convert weights into share counts.
        existing_state: Previously saved state; keys starting with ``_``
            (metadata) are dropped before updating.

    Returns:
        The state dict that was written to ``COST_BASIS_FILE``.
    """
    state = {k: dict(v) for k, v in (existing_state or {}).items() if not k.startswith('_')}
    today = time.strftime('%Y-%m-%d')

    for t, w in weights.items():
        price = prices.get(t, 0.0)
        if price <= 0:
            continue
        new_alloc = capital * w
        new_shares = new_alloc / price if abs(w) > 0.001 else 0.0
        if new_shares < 0.0001:
            # Position closed (or not long): drop any stored lot.
            state.pop(t, None)
            continue
        if t in state and state[t].get('shares', 0) > 0:
            old_shares = state[t]['shares']
            # Fall back to the current price if a saved lot lacks its cost.
            old_avg_cost = state[t].get('avg_cost', price)
            if new_shares > old_shares:
                # Blend the added shares at today's price into the average.
                new_avg_cost = (old_shares * old_avg_cost + (new_shares - old_shares) * price) / new_shares
            else:
                # Reducing a position leaves the average cost unchanged.
                new_avg_cost = old_avg_cost
            state[t].update({'avg_cost': round(new_avg_cost, 4), 'shares': round(new_shares, 6), 'last_updated': today})
        else:
            state[t] = {'avg_cost': round(price, 4), 'shares': round(new_shares, 6), 'purchase_date': today, 'last_updated': today}

    state['_metadata'] = {
        "disclaimer": "Note: Prices used here may be synthetic or stale based on the last backtest. Do not use for live accounting.",
        "last_generated": today
    }
    with open(COST_BASIS_FILE, 'w', encoding="utf-8") as f:
        json.dump(state, f, indent=2)
    return state


def _force_liquidate_for_margin(weights, prices, shortfall, capital, cfg):
    """Shift weight from long positions into CASH to cover a cash shortfall.

    Args:
        weights: pandas Series of target weights (may contain "CASH").
        prices: Mapping/Series of prices; positions without a positive price
            are not sold.
        shortfall: Cash reserve; only a negative value triggers liquidation.
        capital: Portfolio value used to convert amounts to weights.
        cfg: Unused; kept for interface compatibility.

    Returns:
        ``weights`` unchanged when nothing has to be sold, otherwise a modified
        copy. Largest long positions are reduced first.
    """
    # A non-positive capital cannot be converted to weights (division by zero).
    if shortfall >= 0 or weights.empty or capital <= 0:
        return weights

    weights = weights.copy()
    long_positions = sorted(
        [t for t in weights.index if t != "CASH" and weights.get(t, 0.0) > 0],
        key=lambda t: -float(abs(weights.get(t, 0.0)))
    )
    remaining = -shortfall
    for ticker in long_positions:
        price = prices.get(ticker, 0.0)
        if price <= 0:
            continue
        max_reducible = float(weights[ticker]) * capital
        sell_amount = min(max_reducible, remaining)
        weight_reduction = sell_amount / capital
        weights[ticker] = float(weights[ticker]) - weight_reduction
        weights["CASH"] = float(weights.get("CASH", 0.0)) + weight_reduction
        remaining -= sell_amount
        if remaining <= 0:
            break
    return weights


# ─────────────────────────────────────────────
# PIPELINE DATA STRUCTURES
# ─────────────────────────────────────────────
@dataclass
class ValidationBundle:
    """Results produced by ``PortfolioPipeline.run_validation``."""
    oos_eq: pd.Series            # out-of-sample equity curve
    oos_bench_curve: pd.Series   # benchmark curve from the same backtest
    oos_port_rets: pd.Series     # pct_change of oos_eq
    wf_ann_ret: float            # annualised return of the OOS equity curve
    var_results: dict            # christoffersen_test output
    dm_results: dict             # diebold_mariano_test output
    psr_results: dict            # probabilistic_sharpe_ratio output
    dsr_results: dict            # deflated_sharpe_ratio output


@dataclass
class OptimizationBundle:
    """Results produced by ``PortfolioPipeline.optimize``."""
    weights: pd.Series
    exp_rets: pd.Series
    cov_mat: pd.DataFrame
    vol: float
    corr_matrix: pd.DataFrame
    betas: pd.Series
    model_info: dict
    sens_report: dict            # portfolio_sensitivity output
    stress_report: dict          # portfolio_stress_test output
    n_fragile: int               # number of assets with sensitivity spread > 0.15


# ─────────────────────────────────────────────
# PIPELINE ORCHESTRATOR
# ─────────────────────────────────────────────
class PortfolioPipeline:
    """Orchestrates data loading, validation, optimization and reporting."""

    def __init__(self, overrides=None):
        """Configure the run from ``overrides`` or from the interactive wizard.

        Args:
            overrides: Optional dict. When non-empty the pipeline runs headless
                and reads ``tickers``, ``capital``, ``risk_input``,
                ``risk_factor``, ``model``, ``allocation_engine``,
                ``current_weights_raw`` and ``cfg_overrides`` from it (each
                with a default). When empty/None, ``setup_portfolio`` prompts
                the user and the resulting config is saved.
        """
        self.cfg = load_config()
        self.overrides = overrides or {}

        # Determine execution mode
        if overrides:
            logger.info("Executing in Headless Orchestrator Mode with overrides.")
            self.tickers = self.overrides.get('tickers', ["SPY", "TLT", "GLD"])
            self.capital = self.overrides.get('capital', 100000.0)
            self.risk_input = self.overrides.get('risk_input', 5)
            self.risk_factor = self.overrides.get('risk_factor', 3.0)
            self.model = self.overrides.get('model', 1)
            self.allocation_engine = self.overrides.get('allocation_engine', 1)
            self.current_weights_raw = self.overrides.get('current_weights_raw', {})
            for k, v in self.overrides.get('cfg_overrides', {}).items():
                self.cfg[k] = v
            for t in self.tickers + list(self.current_weights_raw.keys()):
                if t not in self.cfg.get("sector_map", {}):
                    self.cfg.setdefault("sector_map", {})[t] = "Other"
        else:
            t, c, ri, rf, m, ae, cw = setup_portfolio(self.cfg)
            self.tickers, self.capital, self.risk_input, self.risk_factor = t, c, ri, rf
            self.model, self.allocation_engine, self.current_weights_raw = m, ae, cw
            # NOTE(review): assumed to persist only wizard answers, not
            # headless overrides — confirm against version control.
            save_config(self.cfg)

        self.chartjs_js = _ensure_chartjs()
        self.trading_days = self.cfg.get("trading_days_per_year", 252)

        # State populated through pipeline
        self.data_bundle = {}

    def load_data(self) -> None:
        """Fetches market data, sets up benchmarks and populates legacy state via DataRepository."""
        from data_repository import DataRepository

        repo = DataRepository(self.cfg)
        snap = repo.fetch_all(self.tickers, self.model)

        # NOTE(review): ff_df and opt_ff_df are both taken from
        # snap.opt_ff_df — confirm this is intended.
        self.ff_df = snap.opt_ff_df
        self.rfr = snap.rfr
        self.spread_map = snap.spread_map
        self.legacy_state_dict = snap.master_state.to_dict() if hasattr(snap.master_state, 'to_dict') else {}
        self.vol_raw = snap.vol_raw
        self.opt_tickers = snap.opt_tickers
        self.opt_returns_df = snap.opt_returns_df
        self.bench_rets_monthly = snap.bench_rets_monthly
        self.opt_ff_df = snap.opt_ff_df
        self.display_df = snap.display_df
        self.bench_display = snap.bench_display
        self.final_tickers = snap.master_state.tickers
        self.master_state = snap.master_state
        self.train_yrs = snap.train_yrs
        self.OOS_TEST_DAYS = int(snap.test_yrs * self.trading_days)
        self.OOS_TRAIN_DAYS = int(snap.train_yrs * self.trading_days)
        self.test_yrs = snap.test_yrs
        # Observations-per-asset ratio; max(..., 1) avoids dividing by zero.
        self.tn_ratio = (len(snap.opt_returns_df) if snap.opt_returns_df is not None else 0) / max(len(snap.opt_tickers), 1)

        vix_current = float(self.vol_raw.iloc[-1]) if self.vol_raw is not None and not self.vol_raw.empty else 0.0

        self.risk_adj = None
        if self.cfg.get("dynamic_risk", True):
            orig_ri, orig_rf = self.risk_input, self.risk_factor
            self.risk_input, self.risk_factor = dynamic_risk_aversion(vix_current, orig_ri, orig_rf, silent=False)
            self.risk_adj = {"original_input": orig_ri, "adjusted_input": self.risk_input, "vix_val": vix_current}

        self.regime_info = detect_volatility_regime(snap.bench_rets, cfg=self.cfg, silent=False) if self.cfg.get("hmm_regime", True) else None

        self.data_bundle = {
            "returns_df": snap.returns_df,
            "bench_rets": snap.bench_rets,
            "raw": snap.raw,
            "prices": snap.prices,
            "eq_bench": snap.eq_bench,
            "vol_bench": snap.vol_bench,
            "rfr_bench": snap.rfr_bench
        }

    def run_validation(self) -> ValidationBundle:
        """Run the expanding-window backtest and the statistical validation tests.

        Requires ``load_data`` to have been called. Writes ``_risk_input`` and
        ``_risk_factor`` into ``self.cfg`` and prints the validation report.
        """
        returns_df = self.data_bundle["returns_df"]
        bench_rets = self.data_bundle["bench_rets"]

        # Rebalance every quarter of a trading year.
        reb_freq = int(self.trading_days / 4)
        self.cfg['_risk_input'] = self.risk_input
        self.cfg['_risk_factor'] = self.risk_factor

        oos_eq, oos_bench_curve = expanding_window_backtest(
            returns_df, bench_rets, self.capital, self.rfr, self.cfg,
            self.model, self.allocation_engine, self.spread_map,
            initial_train_days=self.OOS_TRAIN_DAYS, rebalance_freq=reb_freq, ff_df=self.ff_df
        )

        oos_port_rets = oos_eq.pct_change().dropna()
        oos_rets_arr = oos_port_rets.values
        total_days = len(oos_rets_arr)
        n_yrs_wf = total_days / self.trading_days if total_days > 0 else 1.0
        wf_ann_ret = float((oos_eq.iloc[-1] / self.capital) ** (1 / max(n_yrs_wf, 0.01)) - 1.0)

        cvar_alpha = self.cfg.get('cvar_alpha', 0.95)
        # Rolling historical VaR (positive number); leading NaNs are back-filled.
        rolling_var = -oos_port_rets.rolling(window=self.trading_days).quantile(1 - cvar_alpha).bfill().values
        var_results = christoffersen_test(oos_rets_arr, rolling_var, target_alpha=round(1.0 - cvar_alpha, 2))

        # Re-optimize on the initial training window from an empty portfolio.
        sim_state = PortfolioState.empty(self.final_tickers)
        temp_macro = {"hmm_regime": self.regime_info} if self.regime_info else {}
        train_rets = returns_df.iloc[:self.OOS_TRAIN_DAYS]
        train_bench = bench_rets.iloc[:self.OOS_TRAIN_DAYS]
        opt_res_cv = build_and_optimize(
            train_rets, train_bench,
            self.risk_input, self.risk_factor, sim_state, self.cfg, self.model,
            self.allocation_engine, self.ff_df, spread_map=self.spread_map, macro=temp_macro, silent=True,
            opt_rets_df=train_rets, opt_spy_rets=train_bench, opt_ff_df=self.ff_df
        )

        oos_w_risky = opt_res_cv.weights.drop(labels=['CASH'], errors='ignore')
        oos_cash_w = float(opt_res_cv.weights.get('CASH', 0.0))
        rfr_scalar = self.rfr.mean() if isinstance(self.rfr, pd.Series) else self.rfr

        # Annual return forecast of the chosen model vs. a naive historical mean.
        oos_opt_ret = float(oos_w_risky @ opt_res_cv.expected_returns.reindex(oos_w_risky.index).fillna(0.0)) + (oos_cash_w * rfr_scalar)
        naive_exp_rets = train_rets.mean() * self.trading_days
        naive_opt_ret = float(oos_w_risky @ naive_exp_rets.reindex(oos_w_risky.index).fillna(0.0)) + (oos_cash_w * rfr_scalar)

        pred_model = np.full(len(oos_rets_arr), oos_opt_ret / self.trading_days)
        pred_naive = np.full(len(oos_rets_arr), naive_opt_ret / self.trading_days)

        # Short label of the return model; falls back when the id is unknown
        # (headless overrides are not validated against MODEL_NAMES).
        model_label = str(MODEL_NAMES.get(self.model, f"Model {self.model}")).split(' ')[0]

        dm_results = diebold_mariano_test(oos_rets_arr, pred_model, pred_naive, h=1, loss_type='MAE')
        dm_results['winner'] = model_label if dm_results['winner'] == "Model 1" else "Naive Mean"

        psr_results = probabilistic_sharpe_ratio(oos_rets_arr, benchmark_sharpe=0.0, periods=self.trading_days)
        dsr_results = deflated_sharpe_ratio(oos_rets_arr, num_trials=len(MODEL_NAMES), variance_of_trials=0.5, periods=self.trading_days)

        print_validation_report(dm_results, var_results, psr_results, dsr_results, model_name=model_label)

        return ValidationBundle(oos_eq, oos_bench_curve, oos_port_rets, wf_ann_ret, var_results, dm_results, psr_results, dsr_results)

    def optimize(self) -> OptimizationBundle:
        """Optimize on the full sample, re-solving once if fragile assets are found.

        After the first solve, assets whose sensitivity ``spread`` exceeds 0.15
        are counted as fragile. If any exist and allocation engine 1 is used,
        the spreads and a stability penalty are written into ``self.cfg`` and
        the optimization is repeated. The sensitivity and stress reports
        returned are those of the first solve.
        """
        returns_df = self.data_bundle["returns_df"]
        bench_rets = self.data_bundle["bench_rets"]
        raw = self.data_bundle["raw"]
        temp_macro = {"hmm_regime": self.regime_info} if self.regime_info else {}

        opt_res = build_and_optimize(
            returns_df, bench_rets, self.risk_input, self.risk_factor, self.master_state, self.cfg, self.model,
            self.allocation_engine, self.ff_df, spread_map=self.spread_map, macro=temp_macro, silent=False,
            opt_rets_df=self.opt_returns_df, opt_spy_rets=self.bench_rets_monthly, opt_ff_df=self.opt_ff_df
        )

        weights = opt_res.weights
        exp_rets = opt_res.expected_returns
        cov_mat = opt_res.covariance_matrix
        vol = opt_res.volatility
        corr_matrix = opt_res.correlation_matrix
        betas = opt_res.betas
        model_info = opt_res.model_info

        sens_report = portfolio_sensitivity(weights, returns_df, bench_rets, exp_rets, cov_mat, self.risk_factor, self.risk_input, self.cfg, betas, self.spread_map)
        stress_report = portfolio_stress_test(weights, returns_df, raw, betas)

        stab_spreads = np.array([sens_report.get(t, {}).get('spread', 0.0) for t in returns_df.columns], dtype=float)
        fragile_mask = stab_spreads > 0.15
        n_fragile = int(fragile_mask.sum())

        if n_fragile > 0 and self.allocation_engine == 1:
            self.cfg['_stability_spreads'] = stab_spreads.tolist()
            self.cfg['_stab_lambda'] = float(self.risk_factor * 0.5 * (n_fragile / len(stab_spreads)))
            try:
                opt_res_fragile = build_and_optimize(
                    returns_df, bench_rets, self.risk_input, self.risk_factor, self.master_state, self.cfg, self.model,
                    self.allocation_engine, self.ff_df, spread_map=self.spread_map, macro=temp_macro, silent=False,
                    opt_rets_df=self.opt_returns_df, opt_spy_rets=self.bench_rets_monthly, opt_ff_df=self.opt_ff_df
                )
            finally:
                # Always clear the penalty so a failed solve cannot leak it
                # into later optimizations that share this cfg.
                self.cfg['_stab_lambda'] = 0.0
            weights, exp_rets, cov_mat = opt_res_fragile.weights, opt_res_fragile.expected_returns, opt_res_fragile.covariance_matrix
            vol, corr_matrix, betas = opt_res_fragile.volatility, opt_res_fragile.correlation_matrix, opt_res_fragile.betas
            model_info = opt_res_fragile.model_info

        return OptimizationBundle(weights, exp_rets, cov_mat, vol, corr_matrix, betas, model_info, sens_report, stress_report, n_fragile)

    def generate_reports(self, val: ValidationBundle, opt: OptimizationBundle) -> None:
        """Compute risk analytics, write exports and build (optionally serve) the HTML report.

        Side effects: writes the CSV (and optionally Excel) export, updates the
        cost-basis file, may replace ``opt.weights`` when the futures overlay
        reports a negative cash reserve, and serves the report unless
        ``cfg['_serve']`` is false.
        """
        prices = self.data_bundle["prices"]
        raw = self.data_bundle["raw"]
        returns_df = self.data_bundle["returns_df"]

        # Risk attribution is computed on the risky sleeve only (CASH removed).
        w_risky = opt.weights.drop(labels=['CASH'], errors='ignore')
        mvar_series = marginal_var(w_risky, opt.cov_mat, alpha=0.95)
        if 'CASH' in opt.weights:
            mvar_series['CASH'] = 0.0

        c_cvar, t_cvar = cvar_attribution(w_risky, self.display_df, alpha=0.95)
        _, s_vol = stress_correlation(w_risky, opt.cov_mat, shock_corr=0.30)
        factor_exposures = factor_exposure(w_risky, opt.model_info['ff_betas']) if self.model == 4 and 'ff_betas' in opt.model_info else None

        # Add benchmark/macro return series to the correlation matrix when they
        # are available in the raw data and not already portfolio columns.
        macro_series = []
        macro_sources = [
            (self.data_bundle["eq_bench"], self.data_bundle["eq_bench"]),
            ("VIX_PROXY", self.data_bundle["vol_bench"]),
            ("RFR_PROXY", self.data_bundle["rfr_bench"]),
        ]
        for label, key in macro_sources:
            if key in raw and label not in returns_df.columns:
                macro_series.append(raw[key].pct_change().rename(label))

        corr_matrix_html = pd.concat([self.display_df] + macro_series, axis=1, sort=False).dropna().corr() if macro_series else opt.corr_matrix
        if 'CASH' in corr_matrix_html.columns:
            corr_matrix_html = corr_matrix_html.drop(index=['CASH'], columns=['CASH'], errors='ignore')

        equity, bench_curve_full, port_rets, bt_stats = backtest(self.display_df, opt.weights, self.capital, self.rfr, self.bench_display, self.spread_map, self.cfg, state=self.master_state, betas=opt.betas)
        macro = build_macro(prices, raw, self.rfr, self.display_df, opt.weights.values, self.vol_raw, self.cfg)
        if self.regime_info:
            macro["hmm_regime"] = self.regime_info

        mc_paths, mc_stats = monte_carlo(opt.weights, opt.exp_rets, opt.cov_mat, self.capital, self.cfg, macro, seed=42)
        diags = behavioral_diagnostics(opt.weights, self.display_df, opt.cov_mat, self.risk_input, bt_stats["max_dd"])

        overlay_html = ""
        if self.cfg.get("with_futures", False):
            overlay_result = optimize_futures_overlay(opt.weights, opt.betas, self.capital, self.cfg, equity_returns=returns_df, prices=prices)
            if overlay_result.cash_reserve < 0:
                opt.weights = _force_liquidate_for_margin(opt.weights, prices, overlay_result.cash_reserve, self.capital, self.cfg)
            contract_list = ", ".join(f"{v:+d} {k}" for k, v in overlay_result.contracts.items()) if overlay_result.contracts else "None"
            # NOTE(review): this literal arrived split over several lines with
            # no markup, which suggests its HTML tags were stripped. The
            # visible text is preserved verbatim; restore the original markup
            # from version control.
            overlay_html = f'\n\nFutures Overlay\n\nContracts\n{contract_list}\n'

        export_csv(opt.weights, opt.exp_rets, opt.vol, prices, self.capital, opt.betas, self.spread_map, self.cfg, mvar_series=mvar_series, cvar_components=(c_cvar, t_cvar), factor_exp=factor_exposures, tax_meta={})
        if self.cfg.get("export_excel", False):
            export_excel(opt.weights, opt.exp_rets, opt.vol, prices, self.capital, opt.betas, self.spread_map, self.cfg, mvar_series=mvar_series, cvar_components=(c_cvar, t_cvar), factor_exp=factor_exposures, tax_meta={})

        save_portfolio_state_dict(opt.weights, prices, self.capital, self.legacy_state_dict)

        # Optional "current vs. optimized" comparison for user-supplied weights.
        curr_w_series, current_stats = None, None
        if self.current_weights_raw:
            ok = {t: w for t, w in self.current_weights_raw.items() if t in returns_df.columns}
            tot = sum(ok.values())
            if ok and tot == 0:
                # Weights that sum to zero cannot be normalised.
                logger.warning("Current portfolio weights sum to zero; skipping current-portfolio comparison.")
            elif ok:
                curr_w_series = pd.Series({t: w / tot for t, w in ok.items()}, dtype=float).reindex(returns_df.columns).fillna(0.0)
                curr_exp_ret = float(curr_w_series @ opt.exp_rets)
                curr_vol_val = float(np.sqrt(curr_w_series @ opt.cov_mat.values @ curr_w_series))
                rfr_scalar = self.rfr.iloc[-1] if isinstance(self.rfr, pd.Series) else self.rfr
                curr_sr = israelsen_sharpe(curr_exp_ret - rfr_scalar, curr_vol_val)
                curr_bt_full = backtest(self.display_df, curr_w_series, self.capital, self.rfr, self.bench_display, self.spread_map, self.cfg, state=self.master_state, betas=opt.betas)
                _, curr_mc_stats = monte_carlo(curr_w_series, opt.exp_rets, opt.cov_mat, self.capital, self.cfg, macro, seed=42)
                current_stats = {"exp_ret": curr_exp_ret, "exp_vol": curr_vol_val, "exp_sr": curr_sr, "beta": float(curr_w_series @ opt.betas), "bt": curr_bt_full, "mc": curr_mc_stats}

        generate_html_report(
            opt.weights, opt.exp_rets, opt.cov_mat, opt.vol, corr_matrix_html, opt.betas,
            equity, bench_curve_full, port_rets, val.oos_eq, val.oos_bench_curve,
            mc_paths, mc_stats, bt_stats, None, self.capital, self.cfg, prices, macro,
            opt.model_info, self.spread_map, opt.sens_report, opt.stress_report,
            diags=diags, tax_meta={}, tn_ratio=self.tn_ratio, n_fragile=opt.n_fragile,
            train_yrs=self.train_yrs, test_yrs=self.test_yrs, returns_df=self.display_df,
            chartjs_js=self.chartjs_js, current_weights=curr_w_series, current_stats=current_stats,
            risk_input=self.risk_input, mvar_series=mvar_series, cvar_components=(c_cvar, t_cvar),
            stressed_vol=s_vol, factor_exp=factor_exposures, regime_info=self.regime_info,
            risk_adj=self.risk_adj, dm_results=val.dm_results, var_results=val.var_results,
            overlay_html=overlay_html
        )

        if self.cfg.get('_serve', True):
            # Block on the server only in interactive (non-override) runs.
            serve_report(block=not bool(self.overrides))


def run_engine(overrides=None, serve=True, preview_only=False):
    """
    Main orchestration logic decomposed into a Pipeline pattern.

    Args:
        overrides: Optional dict forwarded to ``PortfolioPipeline``; a
            non-empty dict selects headless mode.
        serve: Stored as ``cfg['_serve']``; controls whether the report is served.
        preview_only: When true, only data loading and optimization run and
            the result additionally contains ``efficient_frontier``.

    Returns:
        Dict with ``target_weights``, ``expected_returns``, ``volatility`` and
        ``prices`` (plus ``efficient_frontier`` in preview mode).
    """
    pipeline = PortfolioPipeline(overrides=overrides)
    pipeline.cfg['_serve'] = serve

    pipeline.load_data()

    if preview_only:
        # In preview mode, skip validation and report generation
        opt_bundle = pipeline.optimize()
        return {
            "target_weights": opt_bundle.weights.to_dict(),
            "expected_returns": opt_bundle.exp_rets.to_dict(),
            "volatility": opt_bundle.vol,
            "prices": pipeline.data_bundle["prices"],
            "efficient_frontier": opt_bundle.model_info.get('ef_curve', {"vols": [], "rets": []})
        }

    val_bundle = pipeline.run_validation()
    opt_bundle = pipeline.optimize()
    pipeline.generate_reports(val_bundle, opt_bundle)

    # Return useful attributes for testing/api downstream hooks
    return {
        "target_weights": opt_bundle.weights.to_dict(),
        "expected_returns": opt_bundle.exp_rets.to_dict(),
        "volatility": opt_bundle.vol,
        "prices": pipeline.data_bundle["prices"]
    }


if __name__ == "__main__":
    try:
        run_engine()
    except KeyboardInterrupt:
        print(f"\n{Color.YELLOW}Interrupted.{Color.RESET}")
    except SystemExit as e:
        print(e)
    except Exception as e:
        import traceback
        traceback.print_exc()
        print(f"\nFatal error during headless execution: {e}")