petter2025 commited on
Commit
7e061fe
·
verified ·
1 Parent(s): 6b1b475

Update ai_risk_engine.py

Browse files
Files changed (1) hide show
  1. ai_risk_engine.py +83 -39
ai_risk_engine.py CHANGED
@@ -1,49 +1,93 @@
1
  """
2
- Local Bayesian risk engine for AI tasks – uses conjugate Beta priors.
3
  """
 
 
 
 
 
 
4
 
5
- import threading
6
- import numpy as np
7
- from typing import Dict, Tuple
8
 
9
- AI_CATEGORIES = ["chat", "code", "summary", "image", "audio", "iot"]
 
 
 
 
10
 
11
- DEFAULT_PRIORS = {
12
- "chat": (1.0, 10.0),
13
- "code": (0.5, 8.0),
14
- "summary": (1.0, 12.0),
15
- "image": (1.0, 15.0),
16
- "audio": (1.0, 15.0),
17
- "iot": (1.0, 10.0),
18
- }
19
 
20
- class AIRiskEngine:
21
- def __init__(self):
22
- self._lock = threading.RLock()
23
- self._priors: Dict[str, Tuple[float, float]] = DEFAULT_PRIORS.copy()
24
- self._counts: Dict[str, Tuple[int, int]] = {cat: (0, 0) for cat in AI_CATEGORIES}
25
 
26
- def get_posterior(self, category: str) -> Tuple[float, float]:
27
- prior_a, prior_b = self._priors[category]
28
- succ, trials = self._counts[category]
29
- return prior_a + succ, prior_b + (trials - succ)
30
 
31
- def risk_score(self, category: str) -> Dict[str, float]:
32
- with self._lock:
33
- alpha, beta = self.get_posterior(category)
34
- mean = alpha / (alpha + beta)
35
- samples = np.random.beta(alpha, beta, size=10000)
36
- return {
37
- "mean": float(mean),
38
- "p5": float(np.percentile(samples, 5)),
39
- "p50": float(np.percentile(samples, 50)),
40
- "p95": float(np.percentile(samples, 95)),
41
- }
 
 
 
 
 
 
 
 
 
 
 
42
 
43
  def update_outcome(self, category: str, success: bool):
44
- with self._lock:
45
- succ, trials = self._counts[category]
46
- trials += 1
47
- if success:
48
- succ += 1
49
- self._counts[category] = (succ, trials)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  """
2
+ Bayesian risk engine with hyperpriors (hierarchical Beta‑binomial).
3
  """
4
+ import logging
5
+ import pyro
6
+ import pyro.distributions as dist
7
+ from pyro.infer import SVI, Trace_ELBO, Predictive
8
+ import torch
9
+ from typing import Dict, Optional
10
 
11
+ logger = logging.getLogger(__name__)
 
 
12
 
13
+ class AIRiskEngine:
14
+ """
15
+ Hierarchical Bayesian model for task‑specific risk.
16
+ Each task category has its own Beta parameters, but they share a common hyperprior.
17
+ """
18
 
19
+ def __init__(self, num_categories: int = 10):
20
+ self.num_categories = num_categories
21
+ self.category_names = ["chat", "code", "summary", "image", "audio", "iot", "switch", "server", "service", "unknown"]
22
+ self._init_model()
23
+ self._history = [] # store (category, success) for later updates
 
 
 
24
 
25
+ def _init_model(self):
26
+ # Hyperpriors
27
+ self.alpha0 = pyro.param("alpha0", torch.tensor(2.0), constraint=dist.constraints.positive)
28
+ self.beta0 = pyro.param("beta0", torch.tensor(2.0), constraint=dist.constraints.positive)
29
+ # We'll learn these via SVI when update is called
30
 
31
+ def model(self, observations=None):
32
+ # Global hyperprior (concentration parameters)
33
+ alpha0 = pyro.sample("alpha0", dist.Gamma(2.0, 1.0))
34
+ beta0 = pyro.sample("beta0", dist.Gamma(2.0, 1.0))
35
 
36
+ with pyro.plate("categories", self.num_categories):
37
+ # Category‑specific success probabilities drawn from Beta(alpha0, beta0)
38
+ p = pyro.sample("p", dist.Beta(alpha0, beta0))
39
+
40
+ if observations is not None:
41
+ # Observations: list of (category_idx, success)
42
+ cat_idx = torch.tensor([obs[0] for obs in observations])
43
+ successes = torch.tensor([obs[1] for obs in observations])
44
+ with pyro.plate("data", len(observations)):
45
+ pyro.sample("obs", dist.Bernoulli(p[cat_idx]), obs=successes)
46
+
47
+ def guide(self, observations=None):
48
+ # Variational parameters
49
+ alpha0_q = pyro.param("alpha0_q", torch.tensor(2.0), constraint=dist.constraints.positive)
50
+ beta0_q = pyro.param("beta0_q", torch.tensor(2.0), constraint=dist.constraints.positive)
51
+ pyro.sample("alpha0", dist.Gamma(alpha0_q, 1.0))
52
+ pyro.sample("beta0", dist.Gamma(beta0_q, 1.0))
53
+
54
+ with pyro.plate("categories", self.num_categories):
55
+ p_alpha = pyro.param("p_alpha", torch.ones(self.num_categories) * 2.0, constraint=dist.constraints.positive)
56
+ p_beta = pyro.param("p_beta", torch.ones(self.num_categories) * 2.0, constraint=dist.constraints.positive)
57
+ pyro.sample("p", dist.Beta(p_alpha, p_beta))
58
 
59
  def update_outcome(self, category: str, success: bool):
60
+ """Store observation and optionally trigger a learning step."""
61
+ cat_idx = self.category_names.index(category) if category in self.category_names else -1
62
+ if cat_idx == -1:
63
+ return
64
+ self._history.append((cat_idx, 1.0 if success else 0.0))
65
+ # Run a few steps of SVI
66
+ if len(self._history) > 5:
67
+ self._run_svi(steps=10)
68
+
69
+ def _run_svi(self, steps=50):
70
+ if len(self._history) == 0:
71
+ return
72
+ optimizer = pyro.optim.Adam({"lr": 0.01})
73
+ svi = SVI(self.model, self.guide, optimizer, loss=Trace_ELBO())
74
+ for _ in range(steps):
75
+ loss = svi.step(self._history)
76
+ if steps % 10 == 0:
77
+ logger.debug(f"SVI loss: {loss}")
78
+
79
+ def risk_score(self, category: str) -> Dict[str, float]:
80
+ """Return posterior predictive risk metrics for a category."""
81
+ cat_idx = self.category_names.index(category) if category in self.category_names else -1
82
+ if cat_idx == -1 or len(self._history) == 0:
83
+ return {"mean": 0.5, "p5": 0.1, "p50": 0.5, "p95": 0.9}
84
+ # Generate posterior samples for p[cat_idx]
85
+ predictive = Predictive(self.model, guide=self.guide, num_samples=500)
86
+ samples = predictive(self._history)
87
+ p_samples = samples["p"][:, cat_idx].detach().numpy()
88
+ return {
89
+ "mean": float(p_samples.mean()),
90
+ "p5": float(np.percentile(p_samples, 5)),
91
+ "p50": float(np.percentile(p_samples, 50)),
92
+ "p95": float(np.percentile(p_samples, 95))
93
+ }