# Portfolio-Risk-Platform/backend/services/scenario_engine.py
# sheikhkmmtahmid - Initial commit: ML-Powered Portfolio Stress Testing Platform (031a2d6)
from __future__ import annotations
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional
import numpy as np
import pandas as pd
def configure_logging(level: int = logging.INFO) -> None:
    """Install the root logging configuration used by this module.

    Args:
        level: Threshold passed straight to ``logging.basicConfig``.

    The record layout is ``timestamp | level | logger name | message``.
    """
    record_layout = "%(asctime)s | %(levelname)s | %(name)s | %(message)s"
    logging.basicConfig(level=level, format=record_layout)
# Module-level logger, named after this module via ``__name__``; used by
# ScenarioEngine for progress and diagnostic messages.
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class ScenarioDefinition:
    """Immutable description of one stress scenario.

    The dataclass is frozen, so attributes cannot be reassigned after
    construction (the dictionaries themselves remain mutable objects).
    """

    # Label written to the ``scenario_name`` output column.
    name: str
    # Free-text explanation written to ``scenario_description``.
    description: str
    # Column name -> amount added to every value of that column.
    additive_shocks: Dict[str, float]
    # Column name -> relative move; values are multiplied by (1 + shock).
    multiplicative_shocks: Dict[str, float]
    # Flag name -> truthy value enabling the matching ``propagate_*`` rule.
    rules: Dict[str, Any]
class ScenarioEngine:
    """
    Phase 4 deterministic scenario engine.

    Loads a monthly feature history, applies every configured scenario to a
    fresh copy of it, stacks the shocked copies into one dataset and writes
    that dataset together with a per-scenario summary (mean level and mean
    delta of the tracked columns).

    Input (as used by the pipeline; the actual path is ``input_path``):
        data/features/features_monthly_full_history.csv
    Outputs (written under ``output_dir``):
        data/scenarios/scenario_dataset.csv
        data/scenarios/scenario_summary.csv

    Every column is optional except ``date``: shocks and propagation rules
    silently skip columns that are absent from the input.
    """

    # Column groups shared by the propagation rules, clipping and reporting.
    _EQUITY_RETURN_COLS = ("spx_return", "ndx_return")
    _FX_RETURN_COLS = ("eurusd_return", "gbpusd_return")
    _GOLD_RETURN_COLS = ("gold_return",)
    _EQUITY_VOL_COLS = ("spx_vol_3m", "spx_vol_6m", "ndx_vol_3m", "ndx_vol_6m")
    _GOLD_VOL_COLS = ("gold_vol_3m", "gold_vol_6m")
    _EQUITY_SPOT_DRAWDOWN_COLS = ("spx_drawdown", "ndx_drawdown")
    _EQUITY_DRAWDOWN_COLS = ("spx_drawdown", "ndx_drawdown", "spx_max_dd_6m", "ndx_max_dd_6m")
    _GOLD_DRAWDOWN_COLS = ("gold_drawdown", "gold_max_dd_6m")
    _YIELD_COLS = ("us2y_yield", "us10y_yield")

    # Columns that get a ``<col>_delta`` companion and appear in the summary.
    _TRACKED_COLS = (
        "us2y_yield",
        "us10y_yield",
        "yield_spread",
        "us_cpi_yoy",
        "vix_level",
        "high_yield_spread",
        "spx_return",
        "ndx_return",
        "gold_return",
        "eurusd_return",
        "gbpusd_return",
    )

    # Grouping keys of the summary table.
    _SUMMARY_KEYS = ("scenario_name", "scenario_description")

    def __init__(
        self,
        input_path: str | Path,
        output_dir: str | Path,
        scenario_definitions: Optional[List[ScenarioDefinition]] = None,
    ) -> None:
        """Store paths and scenarios; create ``output_dir`` if needed.

        Args:
            input_path: CSV file with the feature history (must have ``date``).
            output_dir: Directory for the two output CSVs; created eagerly,
                including parents.
            scenario_definitions: Scenarios to run. ``None`` selects the
                built-in set from ``_default_scenarios``.
        """
        self.input_path = Path(input_path)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.scenario_definitions = (
            scenario_definitions if scenario_definitions is not None else self._default_scenarios()
        )
        self.df: Optional[pd.DataFrame] = None

    def run(self) -> tuple[pd.DataFrame, pd.DataFrame]:
        """Run all scenarios and persist the results.

        Returns:
            ``(scenario_dataset, scenario_summary)`` - the stacked shocked
            data and the per-scenario means, exactly as written to disk.

        Raises:
            ValueError: If no scenarios are configured, or the input fails
                validation in ``_load_input_data``.
            FileNotFoundError: If ``input_path`` does not exist.
        """
        # Fail with a clear message instead of pandas' "No objects to
        # concatenate" when an explicit empty list was supplied.
        if not self.scenario_definitions:
            raise ValueError("No scenario definitions configured; nothing to run")

        logger.info("Starting Phase 4 scenario engine")
        self.df = self._load_input_data()

        scenario_frames: List[pd.DataFrame] = []
        for scenario in self.scenario_definitions:
            logger.info("Applying scenario: %s", scenario.name)
            # Each scenario starts from an untouched copy of the input.
            scenario_frames.append(self._apply_scenario(self.df.copy(deep=True), scenario))

        scenario_dataset = pd.concat(scenario_frames, axis=0, ignore_index=True)
        scenario_dataset = self._finalise_output_columns(scenario_dataset)
        scenario_summary = self._build_summary(scenario_dataset)

        scenario_dataset_path = self.output_dir / "scenario_dataset.csv"
        scenario_summary_path = self.output_dir / "scenario_summary.csv"
        scenario_dataset.to_csv(scenario_dataset_path, index=False)
        scenario_summary.to_csv(scenario_summary_path, index=False)
        logger.info("Scenario dataset saved to: %s", scenario_dataset_path)
        logger.info("Scenario summary saved to: %s", scenario_summary_path)
        return scenario_dataset, scenario_summary

    def _load_input_data(self) -> pd.DataFrame:
        """Read, validate and date-sort the input CSV.

        Raises:
            FileNotFoundError: ``input_path`` is missing.
            ValueError: The file is empty, lacks a ``date`` column, or has
                at least one unparseable date.
        """
        if not self.input_path.exists():
            raise FileNotFoundError(f"Input dataset not found: {self.input_path}")

        df = pd.read_csv(self.input_path)
        if df.empty:
            raise ValueError("Input dataset is empty")
        if "date" not in df.columns:
            raise ValueError("Input dataset must contain a 'date' column")

        # Coerce first so that bad dates become NaT and can be reported
        # with a domain-specific error rather than a parser exception.
        df["date"] = pd.to_datetime(df["date"], errors="coerce")
        if df["date"].isna().any():
            raise ValueError("Some rows contain invalid dates")

        df = df.sort_values("date").reset_index(drop=True)
        logger.info("Loaded input data with shape: %s", df.shape)
        logger.info("Date range: %s to %s", df["date"].min(), df["date"].max())
        logger.info("Columns: %s", list(df.columns))
        return df

    # ------------------------------------------------------------------
    # Small column helpers shared by every shock / propagation step
    # ------------------------------------------------------------------
    @staticmethod
    def _numeric(df: pd.DataFrame, col: str) -> pd.Series:
        """Return ``df[col]`` as numbers; unparseable entries become NaN."""
        return pd.to_numeric(df[col], errors="coerce")

    @classmethod
    def _shift(cls, df: pd.DataFrame, cols: tuple[str, ...], amount: Any) -> None:
        """Add ``amount`` (scalar or row-aligned Series) to each present column."""
        for col in cols:
            if col in df.columns:
                df[col] = cls._numeric(df, col) + amount

    @classmethod
    def _scale(cls, df: pd.DataFrame, cols: tuple[str, ...], factor: Any) -> None:
        """Multiply each present column by ``factor`` (scalar or Series)."""
        for col in cols:
            if col in df.columns:
                df[col] = cls._numeric(df, col) * factor

    @classmethod
    def _driver_delta(cls, df: pd.DataFrame, original_df: pd.DataFrame, col: str) -> pd.Series:
        """Shocked-minus-original move of a driver column, NaN treated as 0.

        A row whose driver value is missing has no measurable move; using 0
        leaves the dependent columns of that row untouched instead of
        overwriting valid data with NaN.
        """
        return (cls._numeric(df, col) - cls._numeric(original_df, col)).fillna(0.0)

    # ------------------------------------------------------------------
    # Scenario application
    # ------------------------------------------------------------------
    def _apply_scenario(self, df: pd.DataFrame, scenario: ScenarioDefinition) -> pd.DataFrame:
        """Apply one scenario to ``df`` in place and return it.

        Order matters: direct shocks, then correlated propagation rules,
        then derived fields, range clipping and finally delta columns
        (so deltas reflect the clipped, final values).
        """
        df["scenario_name"] = scenario.name
        df["scenario_description"] = scenario.description
        # Snapshot taken before any shock; the reference for all deltas.
        original_df = df.copy(deep=True)

        self._apply_additive_shocks(df, scenario.additive_shocks)
        self._apply_multiplicative_shocks(df, scenario.multiplicative_shocks)
        self._apply_correlated_rules(df, original_df, scenario.rules)
        self._recompute_dependent_fields(df)
        self._clip_ranges(df)
        self._add_delta_columns(df, original_df)
        return df

    def _apply_additive_shocks(self, df: pd.DataFrame, shocks: Dict[str, float]) -> None:
        """Add each shock to its column; unknown columns are ignored."""
        for col, shock in shocks.items():
            self._shift(df, (col,), shock)

    def _apply_multiplicative_shocks(self, df: pd.DataFrame, shocks: Dict[str, float]) -> None:
        """Scale each column by ``1 + shock``; unknown columns are ignored."""
        for col, shock in shocks.items():
            self._scale(df, (col,), 1.0 + shock)

    def _apply_correlated_rules(
        self,
        df: pd.DataFrame,
        original_df: pd.DataFrame,
        rules: Dict[str, Any],
    ) -> None:
        """Run the propagation rules whose flag is truthy in ``rules``.

        Rules always execute in the fixed order below, regardless of the
        key order in ``rules``. Later rules see the output of earlier ones.
        """
        # Rules that scale their effect by the driver's move vs. baseline.
        delta_driven = (
            ("propagate_rates_to_risk", self._propagate_rates_to_risk),
            ("propagate_inflation_to_rates", self._propagate_inflation_to_rates),
            ("propagate_vix_to_volatility", self._propagate_vix_to_volatility),
            ("propagate_credit_to_risk", self._propagate_credit_to_risk),
        )
        # Rules that apply fixed-size regime moves.
        fixed_size = (
            ("propagate_usd_squeeze", self._propagate_usd_squeeze),
            ("propagate_systemic_crisis", self._propagate_systemic_crisis),
            ("propagate_growth_scare", self._propagate_growth_scare),
        )
        for flag, handler in delta_driven:
            if rules.get(flag, False):
                handler(df, original_df)
        for flag, handler in fixed_size:
            if rules.get(flag, False):
                handler(df)

    def _propagate_rates_to_risk(self, df: pd.DataFrame, original_df: pd.DataFrame) -> None:
        """Translate the 2Y/10Y yield move into vol, equity, gold and FX moves.

        Effects are linear in the summed yield move, normalised so that a
        parallel +100 bps shock to both tenors gives ``scaled == 1``.
        A missing yield column contributes a zero move.
        """
        delta_2y = (
            self._driver_delta(df, original_df, "us2y_yield")
            if "us2y_yield" in df.columns else 0.0
        )
        delta_10y = (
            self._driver_delta(df, original_df, "us10y_yield")
            if "us10y_yield" in df.columns else 0.0
        )
        scaled = (delta_2y + delta_10y) / 0.02  # 2 x 100bps total reference move

        self._scale(df, self._EQUITY_VOL_COLS, 1.0 + 0.08 * scaled)
        self._shift(df, self._EQUITY_RETURN_COLS, -0.010 * scaled)
        self._shift(df, self._GOLD_RETURN_COLS, -0.003 * scaled)
        self._shift(df, self._FX_RETURN_COLS, -0.005 * scaled)

    def _propagate_inflation_to_rates(self, df: pd.DataFrame, original_df: pd.DataFrame) -> None:
        """Translate the CPI move into yield, equity and gold moves.

        No-op when ``us_cpi_yoy`` is absent. ``scaled == 1`` corresponds to
        a +0.01 move in ``us_cpi_yoy``.
        """
        if "us_cpi_yoy" not in df.columns:
            return
        scaled = self._driver_delta(df, original_df, "us_cpi_yoy") / 0.01  # +1% inflation reference

        self._shift(df, ("us2y_yield",), 0.0025 * scaled)
        self._shift(df, ("us10y_yield",), 0.0015 * scaled)
        self._shift(df, self._EQUITY_RETURN_COLS, -0.006 * scaled)
        self._shift(df, self._GOLD_RETURN_COLS, 0.005 * scaled)

    def _propagate_vix_to_volatility(self, df: pd.DataFrame, original_df: pd.DataFrame) -> None:
        """Translate the VIX move into vol, return and drawdown moves.

        No-op when ``vix_level`` is absent. ``scaled == 1`` corresponds to
        +10 VIX points. When a ``vix_spike`` column exists it is set to 1
        for every row, independent of the size of the move.
        """
        if "vix_level" not in df.columns:
            return
        scaled = self._driver_delta(df, original_df, "vix_level") / 10.0  # +10 VIX points reference

        self._scale(df, self._EQUITY_VOL_COLS, 1.0 + 0.20 * scaled)
        self._shift(df, self._EQUITY_RETURN_COLS, -0.015 * scaled)
        self._shift(df, self._GOLD_RETURN_COLS, 0.002 * scaled)
        # Gold drawdowns react far less to a vol shock than equity ones.
        self._shift(df, self._EQUITY_DRAWDOWN_COLS, -0.030 * scaled)
        self._shift(df, self._GOLD_DRAWDOWN_COLS, -0.005 * scaled)
        if "vix_spike" in df.columns:
            df["vix_spike"] = 1

    def _propagate_credit_to_risk(self, df: pd.DataFrame, original_df: pd.DataFrame) -> None:
        """Translate the high-yield spread move into equity, gold and vol moves.

        No-op when ``high_yield_spread`` is absent. ``scaled == 1``
        corresponds to a +0.02 (200 bps) widening.
        """
        if "high_yield_spread" not in df.columns:
            return
        scaled = self._driver_delta(df, original_df, "high_yield_spread") / 0.02  # +200bps reference

        self._shift(df, self._EQUITY_RETURN_COLS, -0.012 * scaled)
        self._shift(df, self._GOLD_RETURN_COLS, 0.003 * scaled)
        self._scale(df, self._EQUITY_VOL_COLS, 1.0 + 0.12 * scaled)
        self._shift(df, self._EQUITY_SPOT_DRAWDOWN_COLS, -0.020 * scaled)

    def _propagate_usd_squeeze(self, df: pd.DataFrame) -> None:
        """Apply fixed dollar-squeeze moves: weaker FX, equities and gold, VIX +5."""
        self._shift(df, self._FX_RETURN_COLS, -0.020)
        self._shift(df, self._EQUITY_RETURN_COLS, -0.010)
        self._shift(df, self._GOLD_RETURN_COLS, -0.005)
        self._shift(df, ("vix_level",), 5.0)

    def _propagate_systemic_crisis(self, df: pd.DataFrame) -> None:
        """Apply fixed crisis moves on top of any direct shocks.

        Equities fall, gold gains, equity vols are scaled by 1.35, VIX and
        the high-yield spread rise further, drawdowns deepen, and
        ``vix_spike`` (if present) is forced to 1.
        """
        self._shift(df, self._EQUITY_RETURN_COLS, -0.030)
        self._shift(df, self._GOLD_RETURN_COLS, 0.010)
        self._scale(df, self._EQUITY_VOL_COLS, 1.35)
        self._shift(df, ("vix_level",), 15.0)
        self._shift(df, ("high_yield_spread",), 0.03)
        self._shift(df, self._EQUITY_DRAWDOWN_COLS, -0.050)
        self._shift(df, self._GOLD_DRAWDOWN_COLS, -0.010)
        if "vix_spike" in df.columns:
            df["vix_spike"] = 1

    def _propagate_growth_scare(self, df: pd.DataFrame) -> None:
        """Apply fixed growth-scare moves: weaker equities, firmer gold, lower yields."""
        self._shift(df, self._EQUITY_RETURN_COLS, -0.012)
        self._shift(df, self._GOLD_RETURN_COLS, 0.004)
        self._shift(df, self._YIELD_COLS, -0.0025)

    def _recompute_dependent_fields(self, df: pd.DataFrame) -> None:
        """Refresh fields derived from shocked columns.

        ``yield_spread`` is rebuilt as 10Y minus 2Y when both exist (the
        column is created if it was missing). ``vix_spike`` becomes 1 where
        ``vix_level`` is at least 30 and otherwise keeps its current value
        (missing values count as 0).
        """
        if "us10y_yield" in df.columns and "us2y_yield" in df.columns:
            df["yield_spread"] = self._numeric(df, "us10y_yield") - self._numeric(df, "us2y_yield")

        if "vix_level" in df.columns and "vix_spike" in df.columns:
            current_flag = self._numeric(df, "vix_spike").fillna(0).astype(int)
            df["vix_spike"] = np.where(self._numeric(df, "vix_level") >= 30.0, 1, current_flag)

    def _clip_ranges(self, df: pd.DataFrame) -> None:
        """Keep volatilities non-negative and drawdowns non-positive."""
        for col in self._GOLD_VOL_COLS + self._EQUITY_VOL_COLS:
            if col in df.columns:
                df[col] = self._numeric(df, col).clip(lower=0.0)
        for col in self._GOLD_DRAWDOWN_COLS + self._EQUITY_DRAWDOWN_COLS:
            if col in df.columns:
                df[col] = self._numeric(df, col).clip(upper=0.0)

    def _add_delta_columns(self, df: pd.DataFrame, original_df: pd.DataFrame) -> None:
        """Add ``<col>_delta`` (final minus original) for each tracked column.

        Columns missing from either frame are skipped. NaN is kept here on
        purpose: a delta that cannot be measured is reported as missing.
        """
        for col in self._TRACKED_COLS:
            if col in df.columns and col in original_df.columns:
                df[f"{col}_delta"] = self._numeric(df, col) - self._numeric(original_df, col)

    def _finalise_output_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Move the identifying columns to the front, keeping the rest in order."""
        leading = [c for c in ("date", "scenario_name", "scenario_description") if c in df.columns]
        trailing = [c for c in df.columns if c not in leading]
        return df[leading + trailing]

    def _build_summary(self, scenario_dataset: pd.DataFrame) -> pd.DataFrame:
        """Average tracked levels and deltas per scenario.

        Returns one row per ``(scenario_name, scenario_description)`` pair,
        sorted by scenario name, with level columns first and their
        ``_delta`` companions after. Values are coerced to numeric before
        averaging so that a column left as text by an unshocked scenario
        does not break the aggregation; unparseable entries are ignored by
        the mean. If no tracked column is present, only the key columns
        are returned.
        """
        keys = list(self._SUMMARY_KEYS)
        level_cols = [c for c in self._TRACKED_COLS if c in scenario_dataset.columns]
        delta_cols = [
            f"{c}_delta" for c in level_cols if f"{c}_delta" in scenario_dataset.columns
        ]
        value_cols = level_cols + delta_cols

        if not value_cols:
            # Nothing to average: still report which scenarios were run.
            return (
                scenario_dataset[keys]
                .drop_duplicates()
                .sort_values("scenario_name")
                .reset_index(drop=True)
            )

        numeric_view = scenario_dataset[keys].copy()
        for col in value_cols:
            numeric_view[col] = pd.to_numeric(scenario_dataset[col], errors="coerce")

        summary = (
            numeric_view
            .groupby(keys, dropna=False)[value_cols]
            .mean()
            .reset_index()
            .sort_values("scenario_name")
            .reset_index(drop=True)
        )
        return summary

    @staticmethod
    def _default_scenarios() -> List[ScenarioDefinition]:
        """Return the built-in scenario set, starting with an unshocked baseline.

        Yield, CPI and spread shocks are expressed as decimals (0.01 is the
        "+100 bps" scenario); VIX shocks are in index points.
        """
        return [
            ScenarioDefinition(
                name="baseline",
                description="No shock applied. Control scenario.",
                additive_shocks={},
                multiplicative_shocks={},
                rules={},
            ),
            ScenarioDefinition(
                name="rates_up_100bps",
                description="Parallel +100 bps shock to US 2Y and US 10Y yields.",
                additive_shocks={
                    "us2y_yield": 0.01,
                    "us10y_yield": 0.01,
                },
                multiplicative_shocks={},
                rules={
                    "propagate_rates_to_risk": True,
                },
            ),
            ScenarioDefinition(
                name="inflation_up_100bps",
                description="US inflation shock of +1 percentage point YoY.",
                additive_shocks={
                    "us_cpi_yoy": 0.01,
                },
                multiplicative_shocks={},
                rules={
                    "propagate_inflation_to_rates": True,
                },
            ),
            ScenarioDefinition(
                name="vix_spike_10pt",
                description="Volatility shock with VIX rising by 10 points.",
                additive_shocks={
                    "vix_level": 10.0,
                },
                multiplicative_shocks={},
                rules={
                    "propagate_vix_to_volatility": True,
                },
            ),
            ScenarioDefinition(
                name="credit_spread_widening_200bps",
                description="High-yield spread widening by 200 bps.",
                additive_shocks={
                    "high_yield_spread": 0.02,
                },
                multiplicative_shocks={},
                rules={
                    "propagate_credit_to_risk": True,
                },
            ),
            ScenarioDefinition(
                name="hawkish_policy_shock",
                description="Correlated hawkish shock: higher yields, higher inflation, moderately higher VIX, wider spreads.",
                additive_shocks={
                    "us2y_yield": 0.0125,
                    "us10y_yield": 0.0075,
                    "us_cpi_yoy": 0.0075,
                    "vix_level": 4.0,
                    "high_yield_spread": 0.005,
                },
                multiplicative_shocks={},
                rules={
                    "propagate_rates_to_risk": True,
                    "propagate_inflation_to_rates": True,
                    "propagate_vix_to_volatility": True,
                    "propagate_credit_to_risk": True,
                },
            ),
            ScenarioDefinition(
                name="stagflation_regime",
                description="Higher inflation, wider spreads, higher volatility, weaker equities, supportive gold.",
                additive_shocks={
                    "us_cpi_yoy": 0.015,
                    "vix_level": 8.0,
                    "high_yield_spread": 0.015,
                    "gold_return": 0.010,
                },
                multiplicative_shocks={
                    "spx_vol_3m": 0.15,
                    "spx_vol_6m": 0.15,
                    "ndx_vol_3m": 0.18,
                    "ndx_vol_6m": 0.18,
                },
                rules={
                    "propagate_inflation_to_rates": True,
                    "propagate_vix_to_volatility": True,
                    "propagate_credit_to_risk": True,
                },
            ),
            ScenarioDefinition(
                name="disinflation_growth_scare",
                description="Lower inflation, lower yields, weaker growth, weaker equities, mild gold support.",
                additive_shocks={
                    "us_cpi_yoy": -0.01,
                    "vix_level": 5.0,
                },
                multiplicative_shocks={},
                rules={
                    "propagate_growth_scare": True,
                    "propagate_vix_to_volatility": True,
                },
            ),
            ScenarioDefinition(
                name="systemic_crisis",
                description="Severe risk-off crisis with spread widening, volatility spike, large drawdowns, gold support.",
                additive_shocks={
                    "vix_level": 20.0,
                    "high_yield_spread": 0.03,
                },
                multiplicative_shocks={
                    "spx_vol_3m": 0.25,
                    "spx_vol_6m": 0.25,
                    "ndx_vol_3m": 0.30,
                    "ndx_vol_6m": 0.30,
                },
                rules={
                    "propagate_systemic_crisis": True,
                    "propagate_vix_to_volatility": True,
                    "propagate_credit_to_risk": True,
                },
            ),
            ScenarioDefinition(
                name="usd_dollar_squeeze",
                description="Dollar strength regime with weaker EUR/USD and GBP/USD, weaker risk assets, higher stress.",
                additive_shocks={},
                multiplicative_shocks={},
                rules={
                    "propagate_usd_squeeze": True,
                },
            ),
        ]