# math-backend / core_engine.py
# engineportf's picture
# Upload folder using huggingface_hub
# 558db1e verified
# Raw
# History Blame Contribute Delete
# 28.1 kB
import os
import sys
# Force UTF-8 on stdout so the box-drawing characters printed by the CLI
# wizard do not raise UnicodeEncodeError on consoles with a legacy code page.
# stdout may be None, may report no encoding, or may be a replacement stream
# without reconfigure() (host applications, test harnesses), so probe first.
_stdout_encoding = getattr(sys.stdout, "encoding", None) or ""
if _stdout_encoding.lower() != 'utf-8' and hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding='utf-8')
import time
import json
import pandas as pd
import numpy as np
from dataclasses import dataclass, field
from typing import Dict, Any, List
# ─────────────────────────────────────────────
# IMPORT OUR CUSTOM MODULES
# ─────────────────────────────────────────────
from config import load_config, save_config, Color, hr, MODEL_NAMES, SPREAD_BY_SECTOR, COST_BASIS_FILE, logger, OUTPUT_DIR
from core_types import PortfolioState
from data import fetch_risk_free_rate, fetch_fama_french_factors, fetch_data, fetch_risk_free_series, build_monthly_returns
from solver import build_and_optimize
from analytics import (
portfolio_sensitivity, portfolio_stress_test, backtest,
behavioral_diagnostics, build_macro
)
from utils.metrics import portfolio_gross_metrics, israelsen_sharpe
from backtest import (
monte_carlo, expanding_window_backtest
)
from report import _ensure_chartjs, generate_html_report
from exports import export_csv, export_excel
from server import serve_report
from futures_overlay import optimize_futures_overlay
from overlay_analytics import aggregate_overlay_returns, simulate_margin_calls
# Advanced Quant Modules
from risk_attribution import factor_exposure, marginal_var, cvar_attribution, stress_correlation
from regime_detection import detect_volatility_regime, dynamic_risk_aversion
from validation import (
christoffersen_test,
diebold_mariano_test,
probabilistic_sharpe_ratio,
deflated_sharpe_ratio,
print_validation_report
)
# Unified Database Access
from database import get_pg_engine
# ─────────────────────────────────────────────────────────────────
# DYNAMIC TICKER UNIVERSE & CLI WIZARD
# ─────────────────────────────────────────────────────────────────
def _print_universe(cfg):
    """Print a boxed table of the selectable symbols, one row per asset class.

    The table comes from ``cfg["universe_categories"]``; when that key is
    missing or empty a small built-in universe is shown instead. At most the
    first 12 symbols of each category are printed.
    """
    fallback_universe = {
        "Core Equities": ["SPY", "QQQ", "DIA", "IWM"],
        "Bonds & Rates": ["TLT", "IEF", "SHY", "AGG"],
        "Tech & Growth": ["AAPL", "MSFT", "NVDA", "TSLA"],
    }
    universe = cfg.get("universe_categories", {}) or fallback_universe

    print(f"\n{Color.DIM} ┌─ Available symbols by asset class ──────────────────────────────────────────┐")
    for asset_class in universe:
        shown = universe[asset_class][:12]
        # Each symbol is left-aligned in an 11-character column.
        symbols_row = " ".join(format(symbol, "<11") for symbol in shown)
        print(f" │ {Color.CYAN}{asset_class:<20}{Color.RESET}{Color.DIM} {symbols_row}")
    print(f" └──────────────────────────────────────────────────────────────────────────┘{Color.RESET}")
def _section(title, color=Color.CYAN):
    """Print ``title`` between two 58-character em-dash rules in ``color``."""
    rule = chr(8212) * 58  # U+2014 EM DASH
    opening = f"\n{color}{Color.BOLD}{rule}"
    heading = f" {title}"
    closing = f"{rule}{Color.RESET}"
    for line in (opening, heading, closing):
        print(line)
def _prompt_or_blank(prompt):
    """Read one line from stdin, returning ``""`` when stdin is exhausted.

    A blank answer makes every wizard prompt fall back to its default, so a
    closed or piped stdin behaves like the user pressing Enter instead of
    aborting the run with ``EOFError``.
    """
    try:
        return input(prompt)
    except EOFError:
        return ""


def setup_portfolio(cfg):
    """Run the interactive five-step CLI wizard and collect the run parameters.

    ``cfg`` is mutated in place: currency symbol, trading days per year,
    ``sector_map`` entries for unknown tickers (set to ``"Other"``), tax
    rates/flags, the long-only constraint set and the GARCH/CVaR switches.

    Invalid or blank answers never abort the wizard; each prompt falls back to
    its default (capital 100,000; risk aversion 5 -> factor 3.0; return model
    1; allocation engine 1; tickers SPY/TLT/GLD).

    Args:
        cfg: Mutable configuration mapping loaded by the caller.

    Returns:
        tuple: ``(tickers, capital, risk_input, risk_factor, model,
        allocation_engine, current_weights_raw)``. ``current_weights_raw`` maps
        ticker -> weight (normalised to sum to 1 when the total is positive)
        and is empty when the user entered no holdings.
    """
    import math  # local import: only needed for input validation in this wizard

    # Risk-aversion score (1-10) -> risk factor handed to the optimizer.
    risk_map = {1: 0.1, 2: 0.5, 3: 1.0, 4: 2.0, 5: 3.0, 6: 5.0, 7: 7.5, 8: 10.0, 9: 15.0, 10: 25.0}

    try:
        os.system("cls" if os.name == "nt" else "clear")
    except Exception:
        # Clearing the screen is purely cosmetic; never let it abort the wizard.
        pass

    print(f"\n{Color.BOLD}{Color.MAGENTA}╔══════════════════════════════════════════════════════╗")
    print("║ QUANTITATIVE PORTFOLIO BUILDER v8.0 ║")
    print("║ Global Institutional Optimization Engine ║")
    print(f"╚══════════════════════════════════════════════════════╝{Color.RESET}")

    _section("STEP 1 OF 5 — Regional & Market Settings")
    curr_in = _prompt_or_blank(f" {Color.BOLD}Base Currency Symbol [{cfg.get('currency_symbol', '$')}]:{Color.RESET} ").strip()
    if curr_in:
        cfg['currency_symbol'] = curr_in
    days_in = _prompt_or_blank(f" {Color.BOLD}Trading Days per Year [{cfg.get('trading_days_per_year', 252)}]:{Color.RESET} ").strip()
    # Reject 0: the value is used as a divisor when annualising returns.
    if days_in.isdecimal() and int(days_in) > 0:
        cfg['trading_days_per_year'] = int(days_in)

    _section("STEP 2 OF 5 — Your Current Portfolio")
    _print_universe(cfg)
    current_weights_raw = {}
    while True:
        # One "TICKER WEIGHT" pair per line; a blank line, EOF or Ctrl-C ends entry.
        try:
            line = input(f" {Color.BOLD}>{Color.RESET} ").strip().upper()
        except (EOFError, KeyboardInterrupt):
            break
        if not line:
            break
        parts = line.replace(",", " ").split()
        if len(parts) < 2:
            continue
        try:
            weight = float(parts[1].replace("%", ""))
        except ValueError:
            continue
        # The line is upper-cased, so "nan"/"inf" would parse as floats; a
        # non-finite weight would poison the normalisation below.
        if not math.isfinite(weight):
            continue
        current_weights_raw[parts[0]] = weight

    if current_weights_raw:
        total_w = sum(current_weights_raw.values())
        if total_w > 0:
            current_weights_raw = {t: w / total_w for t, w in current_weights_raw.items()}
        tickers = list(current_weights_raw.keys())
    else:
        raw = _prompt_or_blank(f" {Color.BOLD}Tickers (comma-separated):{Color.RESET} ").upper().replace(" ", "")
        # dict.fromkeys drops duplicate tickers while keeping the entry order.
        tickers = list(dict.fromkeys(t for t in raw.split(",") if t)) or ["SPY", "TLT", "GLD"]

    # Every selected ticker needs a sector; unknown ones are filed under "Other".
    sector_map = cfg.setdefault("sector_map", {})
    for t in tickers:
        sector_map.setdefault(t, "Other")

    try:
        capital = float(_prompt_or_blank(f" {Color.BOLD}Portfolio value:{Color.RESET} ").replace(",", ""))
    except ValueError:
        capital = 100_000.0
    # Capital is used as a divisor downstream; only a finite positive value is usable.
    if not (math.isfinite(capital) and capital > 0):
        capital = 100_000.0

    _section("STEP 3 OF 5 — Risk Aversion")
    try:
        ri = int(_prompt_or_blank(f"\n {Color.BOLD}Risk Aversion (1-10):{Color.RESET} "))
        if ri not in risk_map:
            raise ValueError
        risk_input, risk_factor = ri, risk_map[ri]
    except ValueError:
        risk_input, risk_factor = 5, 3.0

    _section("STEP 4 OF 5 — Expected Return Model")
    try:
        mi_in = int(_prompt_or_blank(f" {Color.BOLD}Return Model (1-5):{Color.RESET} "))
        if mi_in not in MODEL_NAMES:
            raise ValueError
        model = mi_in
    except ValueError:
        model = 1
    try:
        ae_in = int(_prompt_or_blank(f" {Color.BOLD}Allocation Engine (1-2):{Color.RESET} "))
    except ValueError:
        ae_in = 1
    allocation_engine = ae_in if ae_in in [1, 2] else 1

    _section("STEP 5 OF 5 — Tax & Advanced Risk Constraints")
    # Tax rates are not prompted for: existing values (or defaults) are written back.
    tax_lt = cfg.get("tax_rate_lt", 0.20)
    tax_st = cfg.get("tax_rate_st", 0.35)
    cfg["tax_rate_lt"], cfg["tax_rate_st"] = tax_lt, tax_st
    cfg["tax_enabled"] = tax_lt > 0
    cfg["_use_saved_basis"] = cfg["tax_enabled"] and os.path.exists(COST_BASIS_FILE)

    # Short selling is hard-disabled in the wizard; the True branch documents
    # the constraint set that would apply if it were switched on.
    allow_ss = False
    if allow_ss:
        cfg["single_asset_min"] = -0.30
        cfg["gross_leverage_cap"] = 1.5
        cfg["short_borrow_cost"] = 0.015
    else:
        cfg["single_asset_min"] = 0.0
        cfg["gross_leverage_cap"] = 1.0
        cfg["short_borrow_cost"] = 0.0
    cfg["garch_enabled"] = True
    cfg["cvar_enabled"] = True

    return tickers, capital, risk_input, risk_factor, model, allocation_engine, current_weights_raw
def build_spread_map(tickers, sector_map):
    """Map each ticker to the spread configured for its sector.

    Tickers missing from ``sector_map`` are treated as sector ``"Other"``;
    sectors missing from ``SPREAD_BY_SECTOR`` get a spread of 0.0008.
    """
    spreads = {}
    for ticker in tickers:
        sector = sector_map.get(ticker, "Other")
        spreads[ticker] = SPREAD_BY_SECTOR.get(sector, 0.0008)
    return spreads
def load_portfolio_state_dict():
    """Load the persisted position / cost-basis state from ``COST_BASIS_FILE``.

    Loading is best-effort: a missing, unreadable or malformed file yields an
    empty state rather than an exception, but the failure is logged so that a
    corrupted file is not silently discarded.

    Returns:
        dict: The parsed JSON object, or ``{}`` when the file does not exist,
        cannot be read or parsed, or does not contain a JSON object.
    """
    if not os.path.exists(COST_BASIS_FILE):
        return {}
    try:
        with open(COST_BASIS_FILE, encoding="utf-8") as f:
            state = json.load(f)
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("Could not read portfolio state from %s: %s", COST_BASIS_FILE, exc)
        return {}
    if not isinstance(state, dict):
        # Callers iterate the state with .items(); anything else is unusable.
        logger.warning("Ignoring portfolio state in %s: expected a JSON object.", COST_BASIS_FILE)
        return {}
    return state
def save_portfolio_state_dict(weights, prices, capital, existing_state=None):
    """Update the per-ticker position state for new target weights and persist it.

    For every ticker in ``weights`` the target share count is
    ``capital * weight / price``. Positions that shrink to (almost) nothing are
    removed; positions that grow get a share-weighted average cost; positions
    that shrink or stay equal keep their previous average cost. Tickers without
    a usable price are left untouched. The result is written to
    ``COST_BASIS_FILE`` as JSON together with a ``_metadata`` entry.

    Args:
        weights: Mapping/Series of ticker -> target weight.
        prices: Mapping/Series of ticker -> price (anything with ``.get``).
        capital: Portfolio value used to convert weights into allocations.
        existing_state: Previously saved state; keys starting with ``_`` and
            non-dict entries are ignored. The input is not mutated.

    Returns:
        dict: The new state, including the ``_metadata`` entry.
    """
    # Copy the per-ticker dicts so the caller's state is never mutated.
    state = {
        k: dict(v)
        for k, v in (existing_state or {}).items()
        if not k.startswith('_') and isinstance(v, dict)
    }
    today = time.strftime('%Y-%m-%d')

    for t, w in weights.items():
        price = prices.get(t, 0.0)
        # "not price > 0" also rejects NaN, which "price <= 0" lets through and
        # which would end up as a NaN share count in the JSON file.
        if not price > 0:
            continue
        new_alloc = capital * w
        # Weights within 0.1% of zero are treated as "no position".
        new_shares = new_alloc / price if abs(w) > 0.001 else 0.0
        if new_shares < 0.0001:
            state.pop(t, None)
            continue

        held = state.get(t)
        if held is not None and held.get('shares', 0) > 0:
            old_shares = held['shares']
            # Entries written by hand or by older versions may lack avg_cost;
            # fall back to the current price instead of raising KeyError.
            old_avg_cost = held.get('avg_cost', price)
            if new_shares > old_shares:
                # Only the added shares are bought at the current price.
                new_avg_cost = (old_shares * old_avg_cost + (new_shares - old_shares) * price) / new_shares
            else:
                new_avg_cost = old_avg_cost
            held.update({'avg_cost': round(new_avg_cost, 4), 'shares': round(new_shares, 6), 'last_updated': today})
        else:
            state[t] = {'avg_cost': round(price, 4), 'shares': round(new_shares, 6), 'purchase_date': today, 'last_updated': today}

    state['_metadata'] = {
        "disclaimer": "Note: Prices used here may be synthetic or stale based on the last backtest. Do not use for live accounting.",
        "last_generated": today
    }
    with open(COST_BASIS_FILE, 'w') as f:
        json.dump(state, f, indent=2)
    return state
def _force_liquidate_for_margin(weights, prices, shortfall, capital, cfg):
if shortfall >= 0 or weights.empty: return weights
weights = weights.copy()
long_positions = sorted([t for t in weights.index if t != "CASH" and weights.get(t, 0.0) > 0], key=lambda t: -float(abs(weights.get(t, 0.0))))
remaining = -shortfall
for ticker in long_positions:
price = prices.get(ticker, 0.0)
if price <= 0: continue
max_reducible = float(weights[ticker]) * capital
sell_amount = min(max_reducible, remaining)
weight_reduction = sell_amount / capital
weights[ticker] = float(weights[ticker]) - weight_reduction
weights["CASH"] = float(weights.get("CASH", 0.0)) + weight_reduction
remaining -= sell_amount
if remaining <= 0: break
return weights
# ─────────────────────────────────────────────
# PIPELINE DATA STRUCTURES
# ─────────────────────────────────────────────
@dataclass
class ValidationBundle:
    """Outputs of the out-of-sample validation stage (see ``PortfolioPipeline.run_validation``)."""
    oos_eq: pd.Series  # out-of-sample equity curve returned by expanding_window_backtest
    oos_bench_curve: pd.Series  # benchmark curve returned alongside oos_eq
    oos_port_rets: pd.Series  # oos_eq.pct_change().dropna()
    wf_ann_ret: float  # annualised return of oos_eq relative to the starting capital
    var_results: dict  # result of christoffersen_test on the rolling VaR series
    dm_results: dict  # result of diebold_mariano_test; 'winner' is relabelled by the pipeline
    psr_results: dict  # result of probabilistic_sharpe_ratio
    dsr_results: dict  # result of deflated_sharpe_ratio
@dataclass
class OptimizationBundle:
    """Outputs of the optimization stage (see ``PortfolioPipeline.optimize``)."""
    weights: pd.Series  # optimizer result: .weights (may contain a 'CASH' entry)
    exp_rets: pd.Series  # optimizer result: .expected_returns
    cov_mat: pd.DataFrame  # optimizer result: .covariance_matrix
    vol: float  # optimizer result: .volatility
    corr_matrix: pd.DataFrame  # optimizer result: .correlation_matrix
    betas: pd.Series  # optimizer result: .betas
    model_info: dict  # optimizer result: .model_info (e.g. 'ff_betas', 'ef_curve' when present)
    sens_report: dict  # result of portfolio_sensitivity for the first optimization pass
    stress_report: dict  # result of portfolio_stress_test for the first optimization pass
    n_fragile: int  # number of assets whose sensitivity 'spread' exceeds 0.15
# ─────────────────────────────────────────────
# PIPELINE ORCHESTRATOR
# ─────────────────────────────────────────────
class PortfolioPipeline:
    """Orchestrates one portfolio run: data load, validation, optimization, reporting.

    The stages are meant to be called in this order: ``load_data()``, then
    ``run_validation()`` and/or ``optimize()``, then ``generate_reports()``.
    ``load_data()`` populates the attributes the later stages read.
    """

    def __init__(self, overrides=None):
        """Collect the run parameters, either from ``overrides`` or interactively.

        Args:
            overrides: Optional dict enabling headless mode. Recognised keys:
                ``tickers``, ``capital``, ``risk_input``, ``risk_factor``,
                ``model``, ``allocation_engine``, ``current_weights_raw`` and
                ``cfg_overrides`` (a dict merged into the loaded config). When
                empty or ``None`` the CLI wizard ``setup_portfolio`` is run.
        """
        self.cfg = load_config()
        self.overrides = overrides or {}

        # Determine execution mode
        if overrides:
            logger.info("Executing in Headless Orchestrator Mode with overrides.")
            self.tickers = self.overrides.get('tickers', ["SPY", "TLT", "GLD"])
            self.capital = self.overrides.get('capital', 100000.0)
            self.risk_input = self.overrides.get('risk_input', 5)
            self.risk_factor = self.overrides.get('risk_factor', 3.0)
            self.model = self.overrides.get('model', 1)
            self.allocation_engine = self.overrides.get('allocation_engine', 1)
            # An explicit None override is treated like "no current holdings".
            self.current_weights_raw = self.overrides.get('current_weights_raw') or {}
            for k, v in self.overrides.get('cfg_overrides', {}).items():
                self.cfg[k] = v
            # list(...) so that a tuple of tickers can be concatenated as well.
            for t in list(self.tickers) + list(self.current_weights_raw.keys()):
                if t not in self.cfg.get("sector_map", {}):
                    self.cfg.setdefault("sector_map", {})[t] = "Other"
        else:
            t, c, ri, rf, m, ae, cw = setup_portfolio(self.cfg)
            self.tickers, self.capital, self.risk_input, self.risk_factor = t, c, ri, rf
            self.model, self.allocation_engine, self.current_weights_raw = m, ae, cw
            # NOTE(review): persisting the config is assumed to belong to the
            # interactive branch only (wizard answers); confirm against history.
            save_config(self.cfg)

        self.chartjs_js = _ensure_chartjs()
        self.trading_days = self.cfg.get("trading_days_per_year", 252)
        # State populated through pipeline
        self.data_bundle = {}

    def load_data(self) -> None:
        """Fetches market data, sets up benchmarks and populates legacy state via DataRepository.

        Copies the fields of the repository snapshot onto ``self``, derives the
        out-of-sample window lengths in trading days, optionally adjusts the
        risk aversion from the latest volatility reading (``dynamic_risk``) and
        optionally detects the volatility regime (``hmm_regime``).
        """
        from data_repository import DataRepository

        repo = DataRepository(self.cfg)
        snap = repo.fetch_all(self.tickers, self.model)

        # NOTE(review): ff_df and opt_ff_df are both taken from snap.opt_ff_df;
        # confirm that no separate full-frequency factor frame is intended.
        self.ff_df = snap.opt_ff_df
        self.rfr = snap.rfr
        self.spread_map = snap.spread_map
        self.legacy_state_dict = snap.master_state.to_dict() if hasattr(snap.master_state, 'to_dict') else {}
        self.vol_raw = snap.vol_raw
        self.opt_tickers = snap.opt_tickers
        self.opt_returns_df = snap.opt_returns_df
        self.bench_rets_monthly = snap.bench_rets_monthly
        self.opt_ff_df = snap.opt_ff_df
        self.display_df = snap.display_df
        self.bench_display = snap.bench_display
        self.final_tickers = snap.master_state.tickers
        self.master_state = snap.master_state
        self.train_yrs = snap.train_yrs
        self.OOS_TEST_DAYS = int(snap.test_yrs * self.trading_days)
        self.OOS_TRAIN_DAYS = int(snap.train_yrs * self.trading_days)
        self.test_yrs = snap.test_yrs
        # Observations per asset in the optimization sample (0 when there is no sample).
        self.tn_ratio = (len(snap.opt_returns_df) if snap.opt_returns_df is not None else 0) / max(len(snap.opt_tickers), 1)

        vix_current = float(self.vol_raw.iloc[-1]) if self.vol_raw is not None and not self.vol_raw.empty else 0.0
        self.risk_adj = None
        if self.cfg.get("dynamic_risk", True):
            orig_ri, orig_rf = self.risk_input, self.risk_factor
            self.risk_input, self.risk_factor = dynamic_risk_aversion(vix_current, orig_ri, orig_rf, silent=False)
            self.risk_adj = {"original_input": orig_ri, "adjusted_input": self.risk_input, "vix_val": vix_current}

        self.regime_info = detect_volatility_regime(snap.bench_rets, cfg=self.cfg, silent=False) if self.cfg.get("hmm_regime", True) else None

        self.data_bundle = {
            "returns_df": snap.returns_df, "bench_rets": snap.bench_rets,
            "raw": snap.raw, "prices": snap.prices, "eq_bench": snap.eq_bench,
            "vol_bench": snap.vol_bench, "rfr_bench": snap.rfr_bench
        }

    def run_validation(self) -> ValidationBundle:
        """Run the expanding-window backtest and the statistical validation tests.

        Steps: expanding-window out-of-sample backtest (quarterly rebalance),
        Christoffersen test on a rolling historical VaR, Diebold-Mariano test of
        the selected return model against a naive historical-mean forecast, and
        probabilistic / deflated Sharpe ratios. Prints the validation report.

        Returns:
            ValidationBundle: Curves, returns and test results of this stage.
        """
        returns_df = self.data_bundle["returns_df"]
        bench_rets = self.data_bundle["bench_rets"]
        reb_freq = int(self.trading_days / 4)

        # The backtest reads the risk settings from the config.
        self.cfg['_risk_input'] = self.risk_input
        self.cfg['_risk_factor'] = self.risk_factor

        oos_eq, oos_bench_curve = expanding_window_backtest(
            returns_df, bench_rets, self.capital, self.rfr, self.cfg, self.model, self.allocation_engine,
            self.spread_map, initial_train_days=self.OOS_TRAIN_DAYS, rebalance_freq=reb_freq, ff_df=self.ff_df
        )
        oos_port_rets = oos_eq.pct_change().dropna()
        oos_rets_arr = oos_port_rets.values
        total_days = len(oos_rets_arr)
        n_yrs_wf = total_days / self.trading_days if total_days > 0 else 1.0
        wf_ann_ret = float((oos_eq.iloc[-1] / self.capital) ** (1 / max(n_yrs_wf, 0.01)) - 1.0)

        cvar_alpha = self.cfg.get('cvar_alpha', 0.95)
        # Rolling empirical quantile as VaR (sign flipped to a positive loss);
        # the warm-up window is back-filled with the first available value.
        rolling_var = -oos_port_rets.rolling(window=self.trading_days).quantile(1 - cvar_alpha).bfill().values
        var_results = christoffersen_test(oos_rets_arr, rolling_var, target_alpha=round(1.0 - cvar_alpha, 2))

        # Re-optimize on the initial training window only, from an empty state.
        sim_state = PortfolioState.empty(self.final_tickers)
        temp_macro = {"hmm_regime": self.regime_info} if self.regime_info else {}
        opt_res_cv = build_and_optimize(
            returns_df.iloc[:self.OOS_TRAIN_DAYS], bench_rets.iloc[:self.OOS_TRAIN_DAYS],
            self.risk_input, self.risk_factor, sim_state, self.cfg, self.model, self.allocation_engine,
            self.ff_df, spread_map=self.spread_map, macro=temp_macro, silent=True,
            opt_rets_df=returns_df.iloc[:self.OOS_TRAIN_DAYS], opt_spy_rets=bench_rets.iloc[:self.OOS_TRAIN_DAYS], opt_ff_df=self.ff_df
        )
        oos_w_risky = opt_res_cv.weights.drop(labels=['CASH'], errors='ignore')
        oos_cash_w = float(opt_res_cv.weights.get('CASH', 0.0))
        rfr_scalar = self.rfr.mean() if isinstance(self.rfr, pd.Series) else self.rfr
        oos_opt_ret = float(oos_w_risky @ opt_res_cv.expected_returns.reindex(oos_w_risky.index).fillna(0.0)) + (oos_cash_w * rfr_scalar)
        naive_exp_rets = returns_df.iloc[:self.OOS_TRAIN_DAYS].mean() * self.trading_days
        naive_opt_ret = float(oos_w_risky @ naive_exp_rets.reindex(oos_w_risky.index).fillna(0.0)) + (oos_cash_w * rfr_scalar)

        # Constant daily forecasts: model-implied vs. naive historical mean.
        pred_model = np.full(len(oos_rets_arr), oos_opt_ret / self.trading_days)
        pred_naive = np.full(len(oos_rets_arr), naive_opt_ret / self.trading_days)
        dm_results = diebold_mariano_test(oos_rets_arr, pred_model, pred_naive, h=1, loss_type='MAE')

        # Headless callers may pass a model id that has no display name.
        model_label = str(MODEL_NAMES.get(self.model) or f"Model{self.model}").split(' ')[0]
        dm_results['winner'] = f"{model_label}" if dm_results['winner'] == "Model 1" else "Naive Mean"

        psr_results = probabilistic_sharpe_ratio(oos_rets_arr, benchmark_sharpe=0.0, periods=self.trading_days)
        dsr_results = deflated_sharpe_ratio(oos_rets_arr, num_trials=len(MODEL_NAMES), variance_of_trials=0.5, periods=self.trading_days)
        print_validation_report(dm_results, var_results, psr_results, dsr_results, model_name=f"{model_label}")

        return ValidationBundle(oos_eq, oos_bench_curve, oos_port_rets, wf_ann_ret, var_results, dm_results, psr_results, dsr_results)

    def optimize(self) -> OptimizationBundle:
        """Optimize the portfolio, then re-optimize with a stability penalty if needed.

        After the first pass the sensitivity and stress reports are computed.
        When at least one asset has a sensitivity spread above 0.15 and
        allocation engine 1 is selected, the spreads and a penalty weight are
        written to the config and the optimization is run a second time; the
        second result replaces the first. The sensitivity and stress reports in
        the returned bundle always refer to the first pass.

        Returns:
            OptimizationBundle: Final weights, moments and diagnostics.
        """
        returns_df = self.data_bundle["returns_df"]
        bench_rets = self.data_bundle["bench_rets"]
        raw = self.data_bundle["raw"]
        temp_macro = {"hmm_regime": self.regime_info} if self.regime_info else {}

        opt_res = build_and_optimize(
            returns_df, bench_rets, self.risk_input, self.risk_factor, self.master_state, self.cfg,
            self.model, self.allocation_engine, self.ff_df, spread_map=self.spread_map, macro=temp_macro, silent=False,
            opt_rets_df=self.opt_returns_df, opt_spy_rets=self.bench_rets_monthly, opt_ff_df=self.opt_ff_df
        )
        weights = opt_res.weights
        exp_rets = opt_res.expected_returns
        cov_mat = opt_res.covariance_matrix
        vol = opt_res.volatility
        corr_matrix = opt_res.correlation_matrix
        betas = opt_res.betas
        model_info = opt_res.model_info

        sens_report = portfolio_sensitivity(weights, returns_df, bench_rets, exp_rets, cov_mat, self.risk_factor, self.risk_input, self.cfg, betas, self.spread_map)
        stress_report = portfolio_stress_test(weights, returns_df, raw, betas)

        stab_spreads = np.array([sens_report.get(t, {}).get('spread', 0.0) for t in returns_df.columns], dtype=float)
        fragile_mask = stab_spreads > 0.15
        n_fragile = int(fragile_mask.sum())

        try:
            if n_fragile > 0 and self.allocation_engine == 1:
                self.cfg['_stability_spreads'] = stab_spreads.tolist()
                # Penalty scales with the risk factor and the share of fragile assets.
                self.cfg['_stab_lambda'] = float(self.risk_factor * 0.5 * (n_fragile / len(stab_spreads)))
                opt_res_fragile = build_and_optimize(
                    returns_df, bench_rets, self.risk_input, self.risk_factor, self.master_state, self.cfg,
                    self.model, self.allocation_engine, self.ff_df, spread_map=self.spread_map, macro=temp_macro, silent=False,
                    opt_rets_df=self.opt_returns_df, opt_spy_rets=self.bench_rets_monthly, opt_ff_df=self.opt_ff_df
                )
                weights, exp_rets, cov_mat = opt_res_fragile.weights, opt_res_fragile.expected_returns, opt_res_fragile.covariance_matrix
                vol, corr_matrix, betas = opt_res_fragile.volatility, opt_res_fragile.correlation_matrix, opt_res_fragile.betas
                model_info = opt_res_fragile.model_info
        finally:
            # Always switch the penalty off again, even if the second pass
            # raises, so later optimizer calls sharing this config are unaffected.
            self.cfg['_stab_lambda'] = 0.0

        return OptimizationBundle(weights, exp_rets, cov_mat, vol, corr_matrix, betas, model_info, sens_report, stress_report, n_fragile)

    def generate_reports(self, val: ValidationBundle, opt: OptimizationBundle) -> None:
        """Compute risk analytics, write the exports and build (and serve) the HTML report.

        Side effects: may replace ``opt.weights`` when the futures overlay
        needs cash (forced liquidation), writes the CSV (and optionally Excel)
        export, persists the position state, generates the HTML report and,
        unless ``cfg['_serve']`` is false, serves it. The server call blocks
        only in interactive mode (no overrides).

        Args:
            val: Result of ``run_validation``.
            opt: Result of ``optimize``.
        """
        prices = self.data_bundle["prices"]
        raw = self.data_bundle["raw"]
        returns_df = self.data_bundle["returns_df"]

        # Risk attribution is computed on the risky sleeve only (CASH excluded).
        w_risky = opt.weights.drop(labels=['CASH'], errors='ignore')
        mvar_series = marginal_var(w_risky, opt.cov_mat, alpha=0.95)
        if 'CASH' in opt.weights:
            mvar_series['CASH'] = 0.0
        c_cvar, t_cvar = cvar_attribution(w_risky, self.display_df, alpha=0.95)
        _, s_vol = stress_correlation(w_risky, opt.cov_mat, shock_corr=0.30)
        factor_exposures = factor_exposure(w_risky, opt.model_info['ff_betas']) if self.model == 4 and 'ff_betas' in opt.model_info else None

        # Add benchmark / volatility / rate proxies to the correlation matrix
        # when they are available in the raw data and not already an asset.
        macro_series = []
        for label, key in [(self.data_bundle["eq_bench"], self.data_bundle["eq_bench"]), ("VIX_PROXY", self.data_bundle["vol_bench"]), ("RFR_PROXY", self.data_bundle["rfr_bench"])]:
            if key in raw and label not in returns_df.columns:
                macro_series.append(raw[key].pct_change().rename(label))
        corr_matrix_html = pd.concat([self.display_df] + macro_series, axis=1, sort=False).dropna().corr() if macro_series else opt.corr_matrix
        if 'CASH' in corr_matrix_html.columns:
            corr_matrix_html = corr_matrix_html.drop(index=['CASH'], columns=['CASH'], errors='ignore')

        equity, bench_curve_full, port_rets, bt_stats = backtest(self.display_df, opt.weights, self.capital, self.rfr, self.bench_display, self.spread_map, self.cfg, state=self.master_state, betas=opt.betas)
        macro = build_macro(prices, raw, self.rfr, self.display_df, opt.weights.values, self.vol_raw, self.cfg)
        if self.regime_info:
            macro["hmm_regime"] = self.regime_info
        mc_paths, mc_stats = monte_carlo(opt.weights, opt.exp_rets, opt.cov_mat, self.capital, self.cfg, macro, seed=42)
        diags = behavioral_diagnostics(opt.weights, self.display_df, opt.cov_mat, self.risk_input, bt_stats["max_dd"])

        overlay_html = ""
        if self.cfg.get("with_futures", False):
            overlay_result = optimize_futures_overlay(opt.weights, opt.betas, self.capital, self.cfg, equity_returns=returns_df, prices=prices)
            if overlay_result.cash_reserve < 0:
                # Not enough cash for margin: sell down long positions into CASH.
                opt.weights = _force_liquidate_for_margin(opt.weights, prices, overlay_result.cash_reserve, self.capital, self.cfg)
            contract_list = ", ".join(f"{v:+d} {k}" for k, v in overlay_result.contracts.items()) if overlay_result.contracts else "None"
            overlay_html = f'<div class="card"><h3>Futures Overlay</h3><div class="mg"><div class="mc"><div class="ml">Contracts</div><div class="mv">{contract_list}</div></div></div></div>'

        export_csv(opt.weights, opt.exp_rets, opt.vol, prices, self.capital, opt.betas, self.spread_map, self.cfg,
                   mvar_series=mvar_series, cvar_components=(c_cvar, t_cvar), factor_exp=factor_exposures, tax_meta={})
        if self.cfg.get("export_excel", False):
            export_excel(opt.weights, opt.exp_rets, opt.vol, prices, self.capital, opt.betas, self.spread_map, self.cfg,
                         mvar_series=mvar_series, cvar_components=(c_cvar, t_cvar), factor_exp=factor_exposures, tax_meta={})
        save_portfolio_state_dict(opt.weights, prices, self.capital, self.legacy_state_dict)

        # Optional comparison against the user's current holdings.
        curr_w_series, current_stats = None, None
        if self.current_weights_raw:
            ok = {t: w for t, w in self.current_weights_raw.items() if t in returns_df.columns}
            tot = sum(ok.values())
            if ok and tot > 0:
                curr_w_series = pd.Series({t: w/tot for t, w in ok.items()}, dtype=float).reindex(returns_df.columns).fillna(0.0)
                curr_exp_ret = float(curr_w_series @ opt.exp_rets)
                curr_vol_val = float(np.sqrt(curr_w_series @ opt.cov_mat.values @ curr_w_series))
                rfr_scalar = self.rfr.iloc[-1] if isinstance(self.rfr, pd.Series) else self.rfr
                curr_sr = israelsen_sharpe(curr_exp_ret - rfr_scalar, curr_vol_val)
                curr_bt_full = backtest(self.display_df, curr_w_series, self.capital, self.rfr, self.bench_display, self.spread_map, self.cfg, state=self.master_state, betas=opt.betas)
                _, curr_mc_stats = monte_carlo(curr_w_series, opt.exp_rets, opt.cov_mat, self.capital, self.cfg, macro, seed=42)
                current_stats = {"exp_ret": curr_exp_ret, "exp_vol": curr_vol_val, "exp_sr": curr_sr, "beta": float(curr_w_series @ opt.betas), "bt": curr_bt_full, "mc": curr_mc_stats}
            elif ok:
                # Headless callers pass un-normalised weights; a non-positive
                # total cannot be normalised, so the comparison is skipped.
                logger.warning("Skipping current-portfolio comparison: weights sum to %s.", tot)

        generate_html_report(
            opt.weights, opt.exp_rets, opt.cov_mat, opt.vol, corr_matrix_html, opt.betas,
            equity, bench_curve_full, port_rets, val.oos_eq, val.oos_bench_curve,
            mc_paths, mc_stats, bt_stats, None,
            self.capital, self.cfg, prices, macro, opt.model_info,
            self.spread_map, opt.sens_report, opt.stress_report,
            diags=diags, tax_meta={},
            tn_ratio=self.tn_ratio, n_fragile=opt.n_fragile,
            train_yrs=self.train_yrs, test_yrs=self.test_yrs,
            returns_df=self.display_df, chartjs_js=self.chartjs_js,
            current_weights=curr_w_series, current_stats=current_stats,
            risk_input=self.risk_input, mvar_series=mvar_series,
            cvar_components=(c_cvar, t_cvar), stressed_vol=s_vol,
            factor_exp=factor_exposures, regime_info=self.regime_info,
            risk_adj=self.risk_adj, dm_results=val.dm_results,
            var_results=val.var_results, overlay_html=overlay_html
        )
        if self.cfg.get('_serve', True):
            serve_report(block=not bool(self.overrides))
def run_engine(overrides=None, serve=True, preview_only=False):
    """
    Build a PortfolioPipeline and drive it through its stages.

    Args:
        overrides: Passed to ``PortfolioPipeline``; a non-empty dict selects
            headless mode.
        serve: Stored as ``cfg['_serve']``; controls whether the generated
            report is served.
        preview_only: When true, only data loading and optimization run
            (no validation, no reports) and the result additionally carries
            the ``efficient_frontier`` entry.

    Returns:
        dict: ``target_weights``, ``expected_returns``, ``volatility`` and
        ``prices`` (plus ``efficient_frontier`` in preview mode).
    """
    pipeline = PortfolioPipeline(overrides=overrides)
    pipeline.cfg['_serve'] = serve
    pipeline.load_data()

    def _summarize(bundle):
        # Common payload for testing/api downstream hooks.
        return {
            "target_weights": bundle.weights.to_dict(),
            "expected_returns": bundle.exp_rets.to_dict(),
            "volatility": bundle.vol,
            "prices": pipeline.data_bundle["prices"],
        }

    if not preview_only:
        validation = pipeline.run_validation()
        optimized = pipeline.optimize()
        pipeline.generate_reports(validation, optimized)
        return _summarize(optimized)

    # Preview mode: skip validation and report generation.
    optimized = pipeline.optimize()
    preview = _summarize(optimized)
    preview["efficient_frontier"] = optimized.model_info.get('ef_curve', {"vols": [], "rets": []})
    return preview
if __name__ == "__main__":
    # Script entry point: run the interactive engine and report failures both
    # on the console and through the process exit status, so that shell
    # scripts and schedulers can detect an unsuccessful run.
    try:
        run_engine()
    except KeyboardInterrupt:
        print(f"\n{Color.YELLOW}Interrupted.{Color.RESET}")
        sys.exit(130)  # conventional status for termination by Ctrl-C (128 + SIGINT)
    except SystemExit as e:
        print(e)
        # Keep the requested status; a message-style code was already printed
        # above, so map it to the generic failure status instead of echoing it.
        code = e.code
        sys.exit(code if code is None or isinstance(code, int) else 1)
    except Exception as e:
        import traceback
        traceback.print_exc()
        print(f"\nFatal error during headless execution: {e}")
        sys.exit(1)