"""hmc_learner.py – Hamiltonian Monte Carlo offline learner for ARF.

Trains a hierarchical Bayesian model on historical incidents to produce:
- Posterior coefficients for interpretable risk factors
- A fast approximator (logistic regression) for real-time use
- Feature importance for weighting semantic embeddings
"""
import json
import os
import sqlite3
from datetime import datetime
from typing import Any, Dict, Optional, Tuple

import numpy as np
import pandas as pd

# NOTE: pymc and arviz are imported lazily inside build_model() and
# extract_coefficients() so the data-prep and persistence utilities in this
# module stay importable in environments without the heavy sampling stack.


def fetch_incident_data(db_path: str) -> pd.DataFrame:
    """Query the incidents table and return the modelled columns as a DataFrame.

    Only evaluated incidents (``allowed IS NOT NULL``) are returned.

    Args:
        db_path: Path to the SQLite database holding the ``incidents`` table.

    Returns:
        DataFrame with one row per evaluated incident. Columns absent in
        older schemas come back as NULL/NaN.
    """
    conn = sqlite3.connect(db_path)
    try:
        query = """
            SELECT action, risk_score, risk_level, confidence, allowed,
                   environment, user_role, requires_human, rollback_feasible,
                   hour_of_day, action_category, timestamp
            FROM incidents
            WHERE allowed IS NOT NULL -- only include evaluated incidents
        """
        df = pd.read_sql_query(query, conn)
    finally:
        # Close even if the query fails — avoids leaking the connection.
        conn.close()
    return df


def preprocess_data(df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[int, str]]:
    """Convert categorical variables to numeric codes and create model features.

    The input frame is NOT mutated; a feature-augmented copy is returned.

    Args:
        df: Raw incident frame as produced by :func:`fetch_incident_data`.

    Returns:
        Tuple of (processed DataFrame, mapping of action-category code -> label).
    """
    df = df.copy()  # do not mutate the caller's frame

    # Ensure allowed is integer (0/1)
    df['allowed'] = df['allowed'].astype(int)

    # Binary indicator: production environment vs. everything else
    df['env_prod'] = (df['environment'] == 'production').astype(int)

    # Binary indicator: junior user role vs. everything else
    df['role_junior'] = (df['user_role'] == 'junior').astype(int)

    # action_category as categorical codes; keep the code->label mapping
    # so posterior intercepts can be interpreted later.
    df['action_cat_code'] = df['action_category'].astype('category').cat.codes
    cat_mapping = dict(enumerate(df['action_category'].astype('category').cat.categories))

    # risk_score stays continuous
    df['risk_score'] = df['risk_score'].astype(float)

    # Cyclical encoding of hour-of-day so 23:00 and 00:00 are adjacent
    hours = df['hour_of_day'].values
    df['hour_sin'] = np.sin(2 * np.pi * hours / 24)
    df['hour_cos'] = np.cos(2 * np.pi * hours / 24)

    df['confidence'] = df['confidence'].astype(float)

    return df, cat_mapping


def build_model(df: pd.DataFrame):
    """Build and sample a hierarchical logistic regression model using PyMC.

    Args:
        df: Preprocessed frame from :func:`preprocess_data`.

    Returns:
        Tuple of (pymc model, ArviZ InferenceData trace sampled via NUTS/HMC).
    """
    import pymc as pm  # lazy: only needed for training

    coords = {
        "action_cat": np.unique(df['action_cat_code']),
    }
    with pm.Model(coords=coords) as model:
        # Hierarchical (partially pooled) intercepts per action category
        α_mu = pm.Normal("α_mu", mu=0, sigma=1)
        α_sigma = pm.HalfNormal("α_sigma", sigma=1)
        α_cat = pm.Normal("α_cat", mu=α_mu, sigma=α_sigma, dims="action_cat")

        # Coefficients for fixed effects
        β_env = pm.Normal("β_env", mu=0, sigma=1)
        β_role = pm.Normal("β_role", mu=0, sigma=1)
        β_risk = pm.Normal("β_risk", mu=0, sigma=1)
        β_hour_sin = pm.Normal("β_hour_sin", mu=0, sigma=1)
        β_hour_cos = pm.Normal("β_hour_cos", mu=0, sigma=1)
        β_conf = pm.Normal("β_conf", mu=0, sigma=1)

        # Linear predictor on the logit scale
        logit_p = (α_cat[df['action_cat_code'].values] +
                   β_env * df['env_prod'].values +
                   β_role * df['role_junior'].values +
                   β_risk * (df['risk_score'].values - 0.5) +  # center risk around 0.5
                   β_hour_sin * df['hour_sin'].values +
                   β_hour_cos * df['hour_cos'].values +
                   β_conf * (df['confidence'].values - 0.5))  # center confidence

        # Likelihood
        pm.Bernoulli("success", logit_p=logit_p, observed=df['allowed'].values)

        # Sample using NUTS (HMC)
        trace = pm.sample(1000, tune=1000, chains=4, cores=4,
                          return_inferencedata=True, progressbar=False)

    return model, trace


def extract_coefficients(trace, cat_mapping: Dict) -> Dict[str, Any]:
    """Extract posterior means and 95% HDI intervals from a trace.

    Args:
        trace: ArviZ InferenceData from :func:`build_model`.
        cat_mapping: Code -> label mapping for action categories.

    Returns:
        Dict of per-variable summaries (mean/sd/hdi bounds as plain floats,
        so the result is directly JSON-serializable) plus the category mapping
        under the ``"action_cat_mapping"`` key.
    """
    import arviz as az  # lazy: only needed post-training

    summary = az.summary(trace, hdi_prob=0.95)
    coeffs = {}
    for var in summary.index:
        # Cast numpy scalars to builtin float so json.dump never chokes.
        coeffs[var] = {
            "mean": float(summary.loc[var, "mean"]),
            "sd": float(summary.loc[var, "sd"]),
            "hdi_low": float(summary.loc[var, "hdi_2.5%"]),
            "hdi_high": float(summary.loc[var, "hdi_97.5%"]),
        }
    # Also store action category mapping for interpretation downstream
    coeffs["action_cat_mapping"] = cat_mapping
    return coeffs


def save_model(coeffs: Dict, output_dir: str):
    """Save coefficients to ``<output_dir>/hmc_model.json``.

    Creates the directory if it does not exist. Returns the written path.
    """
    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, "hmc_model.json")
    with open(path, 'w') as f:
        json.dump(coeffs, f, indent=2)
    return path


def train_hmc_model(db_path: str, output_dir: str) -> Dict[str, Any]:
    """Main training routine: fetch data, build model, save coefficients.

    Args:
        db_path: SQLite database with the incidents table.
        output_dir: Directory where ``hmc_model.json`` is written.

    Returns:
        The extracted coefficient dictionary.

    Raises:
        ValueError: If fewer than 10 evaluated incidents are available.
    """
    df = fetch_incident_data(db_path)
    if len(df) < 10:
        raise ValueError("Insufficient data for training (need at least 10 incidents)")
    df_proc, cat_mapping = preprocess_data(df)
    model, trace = build_model(df_proc)
    coeffs = extract_coefficients(trace, cat_mapping)
    save_model(coeffs, output_dir)
    return coeffs