petter2025's picture
Update hmc_learner.py
c81c838 verified
raw
history blame
5.09 kB
"""
hmc_learner.py – Hamiltonian Monte Carlo offline learner for ARF.
Trains a hierarchical Bayesian model on historical incidents to produce:
- Posterior coefficients for interpretable risk factors
- A fast approximator (logistic regression) for real-time use
- Feature importance for weighting semantic embeddings
"""
import json
import os
import sqlite3
import numpy as np
import pandas as pd
import pymc as pm
import arviz as az
from datetime import datetime
from typing import Dict, Any, Optional
def fetch_incident_data(db_path: str) -> pd.DataFrame:
    """Load evaluated incidents from the SQLite store as a DataFrame.

    Parameters
    ----------
    db_path : str
        Path to the SQLite database containing the ``incidents`` table.

    Returns
    -------
    pd.DataFrame
        One row per incident whose ``allowed`` verdict has been recorded.
        Columns added to the schema after old rows were written come back
        as NULL/NaN.
    """
    query = """
    SELECT action, risk_score, risk_level, confidence, allowed,
           environment, user_role, requires_human, rollback_feasible,
           hour_of_day, action_category, timestamp
    FROM incidents
    WHERE allowed IS NOT NULL  -- only include evaluated incidents
    """
    conn = sqlite3.connect(db_path)
    try:
        # try/finally guarantees the connection is released even when the
        # query fails (e.g. missing table); the original leaked it then.
        return pd.read_sql_query(query, conn)
    finally:
        conn.close()
def preprocess_data(df: pd.DataFrame) -> "tuple[pd.DataFrame, Dict[int, Any]]":
    """Convert categorical variables to numeric codes and create features.

    Parameters
    ----------
    df : pd.DataFrame
        Raw incident frame from ``fetch_incident_data``. Not modified
        (the original mutated the caller's frame in place).

    Returns
    -------
    (pd.DataFrame, dict)
        The feature frame and a ``code -> action_category label`` mapping.
        (The original annotated the return as a bare DataFrame, which was
        wrong — it has always returned this 2-tuple.)
    """
    df = df.copy()  # never mutate the caller's frame
    # Ensure allowed is integer (0/1)
    df['allowed'] = df['allowed'].astype(int)
    # Binary indicator: production environment vs everything else
    df['env_prod'] = (df['environment'] == 'production').astype(int)
    # Binary indicator: junior role vs everything else
    df['role_junior'] = (df['user_role'] == 'junior').astype(int)
    # action_category as categorical codes; keep the code->label mapping
    # so posterior intercepts stay interpretable downstream.
    cats = df['action_category'].astype('category')
    df['action_cat_code'] = cats.cat.codes
    cat_mapping = dict(enumerate(cats.cat.categories))
    # risk_score as is (continuous)
    df['risk_score'] = df['risk_score'].astype(float)
    # Cyclical encoding of hour so 23:00 and 00:00 are adjacent
    hours = df['hour_of_day'].values
    df['hour_sin'] = np.sin(2 * np.pi * hours / 24)
    df['hour_cos'] = np.cos(2 * np.pi * hours / 24)
    # confidence
    df['confidence'] = df['confidence'].astype(float)
    return df, cat_mapping
def build_model(df: pd.DataFrame):
    """Build hierarchical logistic regression model using PyMC.

    Parameters
    ----------
    df : pd.DataFrame
        Preprocessed frame from ``preprocess_data``; must provide the
        columns ``action_cat_code``, ``env_prod``, ``role_junior``,
        ``risk_score``, ``hour_sin``, ``hour_cos``, ``confidence`` and
        the 0/1 target ``allowed``.

    Returns
    -------
    (model, trace)
        The PyMC model and the NUTS posterior (``arviz.InferenceData``).
    """
    coords = {
        # one partial-pooling group per distinct action category code
        "action_cat": np.unique(df['action_cat_code']),
    }
    with pm.Model(coords=coords) as model:
        # Hierarchical intercepts for action categories: each category has
        # its own baseline log-odds, shrunk toward the shared mean α_mu.
        α_mu = pm.Normal("α_mu", mu=0, sigma=1)
        α_sigma = pm.HalfNormal("α_sigma", sigma=1)
        α_cat = pm.Normal("α_cat", mu=α_mu, sigma=α_sigma, dims="action_cat")
        # Coefficients for fixed effects — weakly-informative N(0, 1) priors
        β_env = pm.Normal("β_env", mu=0, sigma=1)
        β_role = pm.Normal("β_role", mu=0, sigma=1)
        β_risk = pm.Normal("β_risk", mu=0, sigma=1)
        β_hour_sin = pm.Normal("β_hour_sin", mu=0, sigma=1)
        β_hour_cos = pm.Normal("β_hour_cos", mu=0, sigma=1)
        β_conf = pm.Normal("β_conf", mu=0, sigma=1)
        # Linear predictor on the logit scale; category code indexes into
        # the per-category intercept vector.
        logit_p = (α_cat[df['action_cat_code'].values] +
                   β_env * df['env_prod'].values +
                   β_role * df['role_junior'].values +
                   β_risk * (df['risk_score'].values - 0.5) +  # center risk around 0.5
                   β_hour_sin * df['hour_sin'].values +
                   β_hour_cos * df['hour_cos'].values +
                   β_conf * (df['confidence'].values - 0.5))  # center confidence
        # Likelihood: allowed/denied verdict as a Bernoulli outcome
        pm.Bernoulli("success", logit_p=logit_p, observed=df['allowed'].values)
        # Sample using NUTS (HMC); 4 chains x 1000 draws after 1000 tuning steps
        trace = pm.sample(1000, tune=1000, chains=4, cores=4, return_inferencedata=True, progressbar=False)
    return model, trace
def extract_coefficients(trace, cat_mapping: Dict) -> Dict[str, Any]:
    """Summarize the posterior into per-variable statistics.

    For every variable in the ArviZ summary table, record its posterior
    mean, standard deviation, and 95% HDI bounds; the action-category
    code->label mapping is stored alongside under "action_cat_mapping".
    """
    summary = az.summary(trace, hdi_prob=0.95)
    # Output key -> az.summary column (HDI columns are named after hdi_prob).
    columns = (
        ("mean", "mean"),
        ("sd", "sd"),
        ("hdi_low", "hdi_2.5%"),
        ("hdi_high", "hdi_97.5%"),
    )
    coeffs: Dict[str, Any] = {
        var: {key: summary.loc[var, col] for key, col in columns}
        for var in summary.index
    }
    # Also store action category mapping
    coeffs["action_cat_mapping"] = cat_mapping
    return coeffs
def save_model(coeffs: Dict, output_dir: str) -> str:
    """Persist posterior coefficients as ``hmc_model.json``.

    Parameters
    ----------
    coeffs : Dict
        JSON-serializable coefficient dict from ``extract_coefficients``.
    output_dir : str
        Directory to write into; created if missing (the original raised
        FileNotFoundError on a fresh output directory).

    Returns
    -------
    str
        Full path of the file written.
    """
    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, "hmc_model.json")
    with open(path, 'w') as f:
        json.dump(coeffs, f, indent=2)
    return path
def train_hmc_model(db_path: str, output_dir: str) -> Dict[str, Any]:
    """End-to-end training: load incidents, fit the HMC model, save results.

    Raises ValueError when fewer than 10 evaluated incidents exist, since
    the hierarchical model cannot be fit meaningfully on less data.
    Returns the extracted coefficient dictionary.
    """
    incidents = fetch_incident_data(db_path)
    if len(incidents) < 10:
        raise ValueError("Insufficient data for training (need at least 10 incidents)")
    processed, mapping = preprocess_data(incidents)
    _, trace = build_model(processed)  # model object itself is not needed here
    posterior = extract_coefficients(trace, mapping)
    save_model(posterior, output_dir)
    return posterior