| """ |
| hmc_learner.py – Hamiltonian Monte Carlo offline learner for ARF. |
| Trains a hierarchical Bayesian model on historical incidents to produce: |
| - Posterior coefficients for interpretable risk factors |
| - A fast approximator (logistic regression) for real-time use |
| - Feature importance for weighting semantic embeddings |
| """ |
|
|
import json
import os
import sqlite3
from datetime import datetime
from typing import Any, Dict, Optional, Tuple

import arviz as az
import numpy as np
import pandas as pd
import pymc as pm
|
|
def fetch_incident_data(db_path: str) -> pd.DataFrame:
    """Load all evaluated incidents from the SQLite database.

    Parameters
    ----------
    db_path : str
        Path to the SQLite file containing an ``incidents`` table.

    Returns
    -------
    pd.DataFrame
        One row per incident with the columns selected below; rows whose
        ``allowed`` outcome is NULL (never evaluated) are excluded.
    """
    query = """
    SELECT action, risk_score, risk_level, confidence, allowed,
           environment, user_role, requires_human, rollback_feasible,
           hour_of_day, action_category, timestamp
    FROM incidents
    WHERE allowed IS NOT NULL  -- only include evaluated incidents
    """
    conn = sqlite3.connect(db_path)
    try:
        df = pd.read_sql_query(query, conn)
    finally:
        # Close even if the query raises; the original leaked the handle
        # on error.
        conn.close()
    return df
|
|
def preprocess_data(df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[int, str]]:
    """Encode categorical columns and add engineered features.

    Operates on a copy, so the caller's DataFrame is never mutated.

    Parameters
    ----------
    df : pd.DataFrame
        Raw incident rows as returned by ``fetch_incident_data``.

    Returns
    -------
    (pd.DataFrame, Dict[int, str])
        The enriched DataFrame and a mapping from integer category code
        to the original ``action_category`` label.
    """
    # The original mutated the caller's frame in place; work on a copy.
    df = df.copy()

    # Binary target: was the action allowed?
    df['allowed'] = df['allowed'].astype(int)

    # Indicator features for the two key categoricals.
    df['env_prod'] = (df['environment'] == 'production').astype(int)
    df['role_junior'] = (df['user_role'] == 'junior').astype(int)

    # Integer-encode the action category once; keep code -> label so
    # coefficients can be reported in human-readable terms.
    action_cat = df['action_category'].astype('category')
    df['action_cat_code'] = action_cat.cat.codes
    cat_mapping = dict(enumerate(action_cat.cat.categories))

    df['risk_score'] = df['risk_score'].astype(float)
    df['confidence'] = df['confidence'].astype(float)

    # Cyclic encoding of the hour so 23:00 and 00:00 are close together.
    hours = df['hour_of_day'].values
    df['hour_sin'] = np.sin(2 * np.pi * hours / 24)
    df['hour_cos'] = np.cos(2 * np.pi * hours / 24)

    return df, cat_mapping
|
|
def build_model(df: pd.DataFrame):
    """Build and sample a hierarchical logistic regression with PyMC.

    Partial pooling: each action category gets its own intercept drawn
    from a shared hyperprior; continuous features enter as fixed-effect
    slopes with standard-normal priors.

    Returns the model and the sampled trace (InferenceData).
    """
    coords = {"action_cat": np.unique(df['action_cat_code'])}

    with pm.Model(coords=coords) as model:
        # Hierarchical per-category intercepts.
        alpha_mu = pm.Normal("α_mu", mu=0, sigma=1)
        alpha_sigma = pm.HalfNormal("α_sigma", sigma=1)
        alpha_cat = pm.Normal("α_cat", mu=alpha_mu, sigma=alpha_sigma, dims="action_cat")

        # Fixed-effect slopes, all with standard-normal priors.
        slope = {
            name: pm.Normal(name, mu=0, sigma=1)
            for name in ("β_env", "β_role", "β_risk",
                         "β_hour_sin", "β_hour_cos", "β_conf")
        }

        # Linear predictor on the logit scale; risk_score and confidence
        # are centered at 0.5.
        logit_p = (
            alpha_cat[df['action_cat_code'].values]
            + slope["β_env"] * df['env_prod'].values
            + slope["β_role"] * df['role_junior'].values
            + slope["β_risk"] * (df['risk_score'].values - 0.5)
            + slope["β_hour_sin"] * df['hour_sin'].values
            + slope["β_hour_cos"] * df['hour_cos'].values
            + slope["β_conf"] * (df['confidence'].values - 0.5)
        )

        # Bernoulli likelihood on the observed allow/deny outcome.
        pm.Bernoulli("success", logit_p=logit_p, observed=df['allowed'].values)

        trace = pm.sample(1000, tune=1000, chains=4, cores=4,
                          return_inferencedata=True, progressbar=False)

    return model, trace
|
|
def extract_coefficients(trace, cat_mapping: Dict) -> Dict[str, Any]:
    """Extract posterior means, sds and 95% HDIs for every model variable.

    Parameters
    ----------
    trace
        InferenceData returned by ``pm.sample``.
    cat_mapping : Dict
        Code -> label mapping from ``preprocess_data``.

    Returns
    -------
    Dict[str, Any]
        Per-variable summary statistics plus the category mapping under
        the ``"action_cat_mapping"`` key.
    """
    summary = az.summary(trace, hdi_prob=0.95)
    coeffs: Dict[str, Any] = {}
    for var in summary.index:
        # Cast numpy scalars to built-in float: json.dump cannot
        # serialize np.float64, so save_model would otherwise crash.
        coeffs[var] = {
            "mean": float(summary.loc[var, "mean"]),
            "sd": float(summary.loc[var, "sd"]),
            "hdi_low": float(summary.loc[var, "hdi_2.5%"]),
            "hdi_high": float(summary.loc[var, "hdi_97.5%"]),
        }

    coeffs["action_cat_mapping"] = cat_mapping
    return coeffs
|
|
def save_model(coeffs: Dict, output_dir: str) -> str:
    """Serialize coefficients to ``<output_dir>/hmc_model.json``.

    Creates the output directory if it does not yet exist (the original
    raised FileNotFoundError on a fresh install) and returns the path of
    the written file.
    """
    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, "hmc_model.json")
    with open(path, 'w') as f:
        json.dump(coeffs, f, indent=2)
    return path
|
|
def train_hmc_model(db_path: str, output_dir: str) -> Dict[str, Any]:
    """End-to-end training: load incidents, fit the model, persist coefficients.

    Raises
    ------
    ValueError
        If fewer than 10 evaluated incidents are available.
    """
    incidents = fetch_incident_data(db_path)
    if len(incidents) < 10:
        raise ValueError("Insufficient data for training (need at least 10 incidents)")

    processed, cat_mapping = preprocess_data(incidents)
    _, trace = build_model(processed)  # the model object itself is not needed downstream
    coeffs = extract_coefficients(trace, cat_mapping)
    save_model(coeffs, output_dir)
    return coeffs