| """ |
| Bayesian risk engine with hyperpriors (hierarchical Beta-binomial). |
| Uses Pyro for stochastic variational inference (SVI). |
| """ |
| import logging |
| import pyro |
| import pyro.distributions as dist |
| from pyro.infer import SVI, Trace_ELBO, Predictive |
| import torch |
| import numpy as np |
| from typing import Dict, Optional, List, Tuple |
|
|
| logger = logging.getLogger(__name__) |
|
|
class AIRiskEngine:
    """
    Hierarchical Bayesian model for task-specific risk.

    Generative model (see ``model``): every category ``c`` has a latent
    probability ``p[c] ~ Beta(alpha0, beta0)``; the two shared parameters
    carry ``Gamma(2, 1)`` hyperpriors, and each recorded outcome is a
    Bernoulli draw from the ``p`` of its category. The posterior is
    approximated with SVI against the factorised ``guide``.

    Notes:
        * ``pyro.param`` registers parameters under fixed names in Pyro's
          global parameter store, so engines created in the same process
          share "alpha0_q", "beta0_q", "p_alpha" and "p_beta".
        * ``p`` is the probability of an outcome recorded as 1.0, i.e. of
          ``success=True`` in ``update_outcome``; ``risk_score`` reports
          summaries of that quantity.
        * ``update_outcome`` only starts optimising once more than five
          outcomes are stored; before that ``risk_score`` samples from
          whatever the guide parameters currently are.
    """

    # Online-learning schedule and sampling budget.
    _MIN_HISTORY_FOR_SVI = 5
    _ONLINE_SVI_STEPS = 10
    _LEARNING_RATE = 0.01
    _NUM_POSTERIOR_SAMPLES = 500

    def __init__(self, num_categories: int = 10):
        """
        Create an engine and register its variational parameters.

        Args:
            num_categories: Size of the "categories" plate. Names in
                ``category_names`` whose position is >= this value have no
                slot in the model and are treated as unknown.
        """
        self.num_categories = num_categories
        self.category_names = ["chat", "code", "summary", "image", "audio", "iot", "switch", "server", "service", "unknown"]
        # (category index, outcome) pairs; outcome is 1.0 or 0.0.
        self._history: List[Tuple[int, float]] = []
        # Built on first use by _run_svi so optimizer state survives calls.
        self._svi: Optional[SVI] = None
        self._init_model()

    def _init_model(self):
        """
        Register the initial parameter values in Pyro's parameter store.

        The attributes assigned here keep the values returned at
        construction time; they are not reassigned anywhere in this class.
        """
        # NOTE(review): "alpha0" / "beta0" are stored as params here, but
        # neither model nor guide reads them -- the guide learns
        # "alpha0_q" / "beta0_q". Kept for backward compatibility.
        self.alpha0 = pyro.param("alpha0", torch.tensor(2.0), constraint=dist.constraints.positive)
        self.beta0 = pyro.param("beta0", torch.tensor(2.0), constraint=dist.constraints.positive)

        # Per-category Beta parameters used by the guide for "p".
        self.p_alpha = pyro.param("p_alpha", torch.ones(self.num_categories) * 2.0, constraint=dist.constraints.positive)
        self.p_beta = pyro.param("p_beta", torch.ones(self.num_categories) * 2.0, constraint=dist.constraints.positive)

    def _category_index(self, category: str) -> int:
        """Return the model index of *category*, or -1 when it has none."""
        try:
            idx = self.category_names.index(category)
        except ValueError:
            return -1
        # A name positioned past num_categories has no entry in ``p``;
        # using its index would fail inside the model.
        return idx if idx < self.num_categories else -1

    def model(self, observations: Optional[List[Tuple[int, float]]] = None):
        """
        Generative model.

        Args:
            observations: Sized sequence of ``(category_index, outcome)``
                pairs. ``None`` or an empty sequence samples the latent
                sites only.
        """
        # Shared hyperpriors on the Beta parameters.
        alpha0 = pyro.sample("alpha0", dist.Gamma(2.0, 1.0))
        beta0 = pyro.sample("beta0", dist.Gamma(2.0, 1.0))

        with pyro.plate("categories", self.num_categories):
            # One probability per category, all drawn from the shared Beta.
            p = pyro.sample("p", dist.Beta(alpha0, beta0))

        # An empty sequence would give a float index tensor and fail on
        # p[cat_idx]; treat it the same as "no data".
        if observations is None or len(observations) == 0:
            return

        cat_idx = torch.tensor(
            [obs[0] for obs in observations], dtype=torch.long, device=p.device
        )
        # Match p's dtype so integer / boolean outcomes are accepted too.
        successes = torch.tensor(
            [obs[1] for obs in observations], dtype=p.dtype, device=p.device
        )
        with pyro.plate("data", len(observations)):
            pyro.sample("obs", dist.Bernoulli(p[cat_idx]), obs=successes)

    def guide(self, observations=None):
        """
        Variational family: independent Gamma factors for the hyperparameters
        and one Beta factor per category.

        ``observations`` is accepted only so the signature matches ``model``;
        it is not read.
        """
        # Only the Gamma concentration is learned; the rate is fixed at 1.0.
        alpha0_q = pyro.param("alpha0_q", torch.tensor(2.0), constraint=dist.constraints.positive)
        beta0_q = pyro.param("beta0_q", torch.tensor(2.0), constraint=dist.constraints.positive)
        pyro.sample("alpha0", dist.Gamma(alpha0_q, 1.0))
        pyro.sample("beta0", dist.Gamma(beta0_q, 1.0))

        with pyro.plate("categories", self.num_categories):
            p_alpha = pyro.param("p_alpha", torch.ones(self.num_categories) * 2.0, constraint=dist.constraints.positive)
            p_beta = pyro.param("p_beta", torch.ones(self.num_categories) * 2.0, constraint=dist.constraints.positive)
            pyro.sample("p", dist.Beta(p_alpha, p_beta))

    def update_outcome(self, category: str, success: bool):
        """
        Store one observation and, once enough data exists, run a short
        optimisation burst.

        Args:
            category: One of ``category_names``. Anything else (including a
                name with no slot in the model) is logged and ignored.
            success: Outcome to record; stored as 1.0 / 0.0.
        """
        cat_idx = self._category_index(category)
        if cat_idx < 0:
            logger.warning("Unknown category: %s", category)
            return
        self._history.append((cat_idx, 1.0 if success else 0.0))

        if len(self._history) > self._MIN_HISTORY_FOR_SVI:
            self._run_svi(steps=self._ONLINE_SVI_STEPS)

    def _run_svi(self, steps=50):
        """
        Run ``steps`` SVI updates on the full history.

        The SVI object (and with it the Adam state) is created once and
        reused, so consecutive short bursts continue from the accumulated
        moment estimates instead of restarting them on every call.
        """
        if not self._history:
            return
        # getattr: tolerate instances that were created without __init__.
        svi = getattr(self, "_svi", None)
        if svi is None:
            optimizer = pyro.optim.Adam({"lr": self._LEARNING_RATE})
            svi = SVI(self.model, self.guide, optimizer, loss=Trace_ELBO())
            self._svi = svi
        for step in range(steps):
            loss = svi.step(self._history)
            if step % 10 == 0:
                logger.debug("SVI step %s, loss: %s", step, loss)

    def risk_score(self, category: str) -> Dict[str, float]:
        """
        Return summary statistics of the approximate posterior of ``p`` for
        *category*.

        Returns:
            Dict with keys "mean", "p5", "p50" and "p95". A fixed default
            is returned when the category has no slot in the model or no
            outcome has been recorded yet.
        """
        cat_idx = self._category_index(category)
        if cat_idx < 0 or not self._history:
            return {"mean": 0.5, "p5": 0.1, "p50": 0.5, "p95": 0.9}

        predictive = Predictive(
            self.model,
            guide=self.guide,
            num_samples=self._NUM_POSTERIOR_SAMPLES,
            return_sites=("p",),
        )
        # "p" is drawn from the guide, which never reads the observations,
        # so the history is not passed: that avoids rebuilding the
        # observation tensors on every one of the sampled model runs.
        samples = predictive()
        # .cpu() keeps this working when the parameters live on another device.
        p_samples = samples["p"][:, cat_idx].detach().cpu().numpy()
        p5, p50, p95 = np.percentile(p_samples, [5, 50, 95])
        return {
            "mean": float(p_samples.mean()),
            "p5": float(p5),
            "p50": float(p50),
            "p95": float(p95),
        }