Spaces:
Runtime error
Runtime error
| """ | |
| Risk service – integrates ARF Bayesian risk engine, policy engine, and decision engine. | |
| Deterministic, no random fallbacks, explicit error handling. Tenant‑aware. | |
| Version: 2026-06-07 – added tenant_id propagation, improved Rust enforcer integration. | |
| """ | |
| import json | |
| import logging | |
| import os | |
| import time | |
| from typing import Optional, List, Dict, Any | |
| from agentic_reliability_framework.core.governance.risk_engine import RiskEngine | |
| from agentic_reliability_framework.core.governance.intents import InfrastructureIntent | |
| from agentic_reliability_framework.core.models.event import ReliabilityEvent, HealingAction | |
| from agentic_reliability_framework.core.governance.policy_engine import PolicyEngine | |
| from agentic_reliability_framework.core.decision.decision_engine import DecisionEngine | |
| from agentic_reliability_framework.runtime.memory.rag_graph import RAGGraphMemory | |
| from agentic_reliability_framework.core.research.eclipse_probe import compute_epistemic_risk | |
# ── optional tracing ─────────────────────────────────────────
# OpenTelemetry is an optional dependency. Start from the "disabled" state and
# only flip to enabled once both the import and tracer lookup succeed. The
# evaluation functions in this module check `OTEL_AVAILABLE and _tracer`
# before starting any span.
OTEL_AVAILABLE = False
_tracer = None
try:
    from opentelemetry import trace

    _tracer = trace.get_tracer(__name__)
    OTEL_AVAILABLE = True
except ImportError:
    # Tracing is best-effort; absence of the package simply disables it.
    pass
| # ── Prometheus metrics (always registered; no‑op if not scraped) ─ | |
| from prometheus_client import Counter, Histogram | |
# Metrics are created against prometheus_client's default registry at import
# time. NOTE(review): re-importing/reloading this module in the same process
# would attempt a second registration of the same names — confirm the hosting
# runtime never reloads it.
_LATENCY_BUCKETS = (0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0)

# Count of evaluation calls, labelled by engine and outcome status.
_EVAL_COUNTER = Counter(
    name="arf_evaluations_total",
    documentation="Total evaluation calls (intent + healing), partitioned by engine and status.",
    labelnames=["engine", "status"],
)

# Wall-clock latency of evaluation calls, labelled by engine.
_EVAL_DURATION = Histogram(
    name="arf_evaluation_duration_seconds",
    documentation="End‑to‑end latency of evaluation calls.",
    labelnames=["engine"],
    buckets=_LATENCY_BUCKETS,
)

# Shadow-mode comparison outcome between the Rust enforcer and the Python
# policy violations; the "result" label is "agreed" or "diverged".
_RUST_AGREEMENT = Counter(
    name="arf_rust_agreement_total",
    documentation="Agreement between Rust enforcer and Python policy evaluation.",
    labelnames=["result"],
)
# ── optional Rust enforcer (shadow mode) ──────────────────────
# The Rust enforcer is opt-in via ARF_USE_RUST_ENFORCER=true (case-insensitive).
# It is only ever used in shadow mode: its verdicts are compared with the Python
# policy violations but never change the result.
_RUST_ENFORCER_AVAILABLE = False
_rust_evaluator = None  # process-wide singleton, created lazily on first use
_rust_policy_json: Optional[str] = None  # policy JSON the singleton was built from
if os.getenv("ARF_USE_RUST_ENFORCER", "false").lower() == "true":
    try:
        import arf_enforcer
    except ImportError as _import_err:
        # The operator explicitly enabled the enforcer, so say why it is off
        # instead of silently degrading. The module-level `logger` is not yet
        # bound at this point of the import, so fetch the same logger by name.
        logging.getLogger(__name__).warning(
            "ARF_USE_RUST_ENFORCER=true but arf_enforcer could not be imported (%s); "
            "Rust shadow enforcement is disabled.",
            _import_err,
        )
    else:
        _RUST_ENFORCER_AVAILABLE = True
# Default OSS policy tree handed to the Rust enforcer, serialised once at import.
# It is intended to mirror the hard-coded rules of the Python PolicyEvaluator.
# NOTE(review): that evaluator is not visible here — keep both in sync by hand.
def _atomic_rule(rule_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
    """Wrap one rule as an ``{"Atomic": {rule_name: params}}`` policy-tree node."""
    return {"Atomic": {rule_name: params}}


_OSS_POLICY_TREE_JSON = json.dumps(
    {
        "And": [
            _atomic_rule("RegionAllowed", {"allowed_regions": ["eastus"]}),
            _atomic_rule(
                "ResourceTypeRestricted",
                {
                    "forbidden_types": [
                        "DATABASE_DROP",
                        "FULL_ROLLOUT",
                        "SYSTEM_SHUTDOWN",
                        "SECRET_ROTATION",
                    ]
                },
            ),
            _atomic_rule("MaxPermissionLevel", {"max_level": "admin"}),
        ]
    }
)
logger = logging.getLogger(__name__)

# Set once an evaluator construction attempt has failed. The policy JSON is a
# module constant, so a retry would almost certainly fail the same way; this
# prevents re-trying (and re-logging) on every evaluation call.
_rust_init_failed = False


def _ensure_rust_evaluator() -> bool:
    """
    Lazily create the process-wide Rust policy evaluator.

    Returns
    -------
    bool
        True if an evaluator is available (already built or built now);
        False if the Rust enforcer module is unavailable or construction failed.

    Notes
    -----
    Construction is attempted at most once per process. A failure is logged
    with its traceback and remembered, after which this returns False
    immediately. ``_rust_policy_json`` is only set when construction succeeds.
    """
    global _rust_evaluator, _rust_policy_json, _rust_init_failed
    if _rust_evaluator is not None:
        return True
    if not _RUST_ENFORCER_AVAILABLE or _rust_init_failed:
        return False
    try:
        evaluator = arf_enforcer.PyPolicyEvaluator(_OSS_POLICY_TREE_JSON)
    except Exception:
        # Shadow enforcement is best-effort: never propagate, but do not hide it.
        _rust_init_failed = True
        _rust_policy_json = None
        logger.warning(
            "Failed to initialise Rust policy evaluator; shadow enforcement disabled.",
            exc_info=True,
        )
        return False
    _rust_policy_json = _OSS_POLICY_TREE_JSON
    _rust_evaluator = evaluator
    return True
def _shadow_compare_with_rust(
    intent: InfrastructureIntent,
    cost_estimate: Optional[float],
    policy_violations: List[str],
    tenant_id: Optional[str],
    span: Any,
) -> None:
    """Best-effort shadow run of the Rust enforcer, recording agreement with Python.

    Never raises: any failure (including serialisation of the intent) is logged
    at DEBUG level and ignored, so the real evaluation is unaffected.
    """
    if not (_RUST_ENFORCER_AVAILABLE and _ensure_rust_evaluator()):
        return
    try:
        rust_intent = {
            "action": getattr(intent, "intent_type", "unknown"),
            "component": getattr(intent, "service_name", "unknown"),
            "region": getattr(intent, "region", None),
            "resource_type": getattr(intent, "resource_type", None),
            "permission_level": getattr(intent, "permission_level", None),
            "tenant_id": tenant_id,  # forwarded for context on the Rust side
            "extra": {},
        }
        rust_raw = _rust_evaluator.evaluate(json.dumps(rust_intent), cost_estimate)
        rust_violations = json.loads(rust_raw)
        # Order-insensitive comparison of the two violation sets.
        agreed = set(rust_violations) == set(policy_violations)
        _RUST_AGREEMENT.labels(result="agreed" if agreed else "diverged").inc()
        if not agreed:
            logger.warning(
                "Rust enforcer divergence for tenant %s: Rust=%s Python=%s",
                tenant_id,
                sorted(rust_violations),
                sorted(policy_violations),
            )
            if span:
                span.add_event("rust_enforcer_divergence", {
                    "rust_violations": rust_violations,
                    "python_violations": policy_violations,
                })
    except Exception as exc:
        logger.debug("Rust enforcer shadow evaluation failed: %s", exc)


def evaluate_intent(
    engine: RiskEngine,
    intent: InfrastructureIntent,
    cost_estimate: Optional[float],
    policy_violations: List[str],
    tenant_id: Optional[str] = None,
) -> dict:
    """
    Evaluate an infrastructure intent using the Bayesian risk engine.

    If the Rust enforcer is enabled, it is first run in shadow mode and its
    violations are compared with ``policy_violations`` (metrics/logs only).
    The score itself always comes from ``engine.calculate_risk``.

    Parameters
    ----------
    engine : RiskEngine
        Initialised ARF Bayesian risk engine.
    intent : InfrastructureIntent
        The infrastructure request to evaluate.
    cost_estimate : float or None
        Estimated monthly cost (used by cost-threshold policies).
    policy_violations : list[str]
        Pre-computed policy violation strings (from the Python evaluator).
    tenant_id : str, optional
        Tenant UUID. If the engine exposes ``set_tenant``, it is called with
        this value (including ``None``) before scoring. Otherwise a warning is
        logged when a tenant_id is given, because state is then shared across
        tenants.

    Returns
    -------
    dict
        Keys: risk_score, explanation, contributions.

    Raises
    ------
    Exception
        Anything raised by ``set_tenant`` or ``calculate_risk`` is re-raised
        after the error metric is recorded and the span is closed.
    """
    t0 = time.monotonic()
    span = None
    if OTEL_AVAILABLE and _tracer:
        span = _tracer.start_span("risk_service.evaluate_intent")
        span.set_attribute("intent_type", type(intent).__name__)
        if tenant_id:
            span.set_attribute("tenant_id", tenant_id)

    _shadow_compare_with_rust(intent, cost_estimate, policy_violations, tenant_id, span)

    # ── Core risk evaluation ──────────────────────────────────
    try:
        # tenant_id is NOT passed to calculate_risk. Tenant selection happens only via
        # the engine's optional set_tenant hook.
        # NOTE(review): set_tenant mutates the engine, so a RiskEngine shared across
        # concurrent requests could score one tenant with another's state — confirm
        # whether engines are per-request or guarded elsewhere.
        if hasattr(engine, "set_tenant"):
            engine.set_tenant(tenant_id)
        elif tenant_id:
            logger.warning(
                "RiskEngine does not yet support tenant_id; evaluations will be shared across tenants."
            )
        score, explanation, contributions = engine.calculate_risk(
            intent=intent,
            cost_estimate=cost_estimate,
            policy_violations=policy_violations,
        )
    except Exception as exc:
        _EVAL_COUNTER.labels(engine="python", status="error").inc()
        _EVAL_DURATION.labels(engine="python").observe(time.monotonic() - t0)
        if span:
            # Close the span on the error path too, so it is not leaked.
            span.record_exception(exc)
            span.set_status(trace.Status(trace.StatusCode.ERROR, str(exc)))
            span.end()
        raise

    _EVAL_COUNTER.labels(engine="python", status="success").inc()
    _EVAL_DURATION.labels(engine="python").observe(time.monotonic() - t0)
    if span:
        span.set_attribute("risk_score", score)
        if _RUST_ENFORCER_AVAILABLE:
            span.set_attribute("rust_enforcer_available", True)
        span.end()
    return {
        "risk_score": score,
        "explanation": explanation,
        "contributions": contributions,
    }
def _zero_epistemic_signals() -> Dict[str, float]:
    """Return a fresh all-zero epistemic signal dict (neutral fallback)."""
    return {
        "entropy": 0.0,
        "contradiction": 0.0,
        "evidence_lift": 0.0,
        "hallucination_risk": 0.0,
    }


def _candidate_actions(policy_engine: PolicyEngine, event: ReliabilityEvent) -> Any:
    """Return the policy engine's raw candidate actions with its decision engine bypassed."""
    # The flag is flipped on the caller's policy_engine and restored in `finally`.
    # NOTE(review): not safe if one PolicyEngine instance serves concurrent calls.
    original_flag = policy_engine.use_decision_engine
    try:
        policy_engine.use_decision_engine = False
        return policy_engine.evaluate_policies(event)
    finally:
        policy_engine.use_decision_engine = original_flag


def _reasoning_text(policy_engine: PolicyEngine, raw_actions: Any) -> str:
    """Describe each policy that contributed at least one of ``raw_actions``."""
    parts = []
    for policy in policy_engine.policies:
        if not any(a in policy.actions for a in raw_actions):
            continue
        conditions_str = ", ".join(
            f"{c.metric} {c.operator} {c.threshold}" for c in policy.conditions
        )
        parts.append(
            f"Policy {policy.name} triggered by {conditions_str} → actions {[a.value for a in policy.actions]}"
        )
    return " ".join(parts)


def _evidence_text(event: ReliabilityEvent) -> str:
    """Summarise the event's telemetry fields as plain-text evidence."""
    return (
        f"Component: {event.component}, "
        f"latency_p99: {event.latency_p99}, "
        f"error_rate: {event.error_rate}, "
        f"cpu_util: {event.cpu_util}, "
        f"memory_util: {event.memory_util}"
    )


def _epistemic_signals(reasoning_text: str, evidence_text: str, model, tokenizer) -> Any:
    """Run the eclipse probe; fall back to zero signals if no model or on failure."""
    if model is None or tokenizer is None:
        logger.debug("Epistemic model/tokenizer not provided; using zero signals")
        return _zero_epistemic_signals()
    try:
        return compute_epistemic_risk(reasoning_text, evidence_text, model, tokenizer)
    except Exception as exc:
        # Best-effort: a probe failure degrades to neutral signals, not a failed decision.
        logger.error("Failed to compute epistemic risk: %s", exc, exc_info=True)
        return _zero_epistemic_signals()


def _selected_action_risk(decision_engine: DecisionEngine, decision: Any, event: ReliabilityEvent) -> Any:
    """Risk of ``decision.best_action``: first matching alternative, else recomputed."""
    risk = next(
        (alt.risk for alt in decision.alternatives if alt.action == decision.best_action),
        None,
    )
    if risk is None:
        risk = decision_engine.compute_risk(decision.best_action, event, event.component)
    return risk


def _decide_healing(
    event: ReliabilityEvent,
    policy_engine: PolicyEngine,
    decision_engine: Optional[DecisionEngine],
    rag_graph: Optional[RAGGraphMemory],
    model,
    tokenizer,
) -> Dict[str, Any]:
    """Run the healing pipeline and build the response dict (no metrics or tracing)."""
    # Prefer an explicit engine, then the policy engine's own, then a default one.
    if decision_engine is None and hasattr(policy_engine, "decision_engine"):
        decision_engine = policy_engine.decision_engine
    if decision_engine is None:
        logger.debug("No DecisionEngine provided; creating default instance")
        decision_engine = DecisionEngine(rag_graph=rag_graph)

    raw_actions = _candidate_actions(policy_engine, event)
    if not raw_actions or raw_actions == [HealingAction.NO_ACTION]:
        return {
            "risk_score": 0.0,
            "selected_action": HealingAction.NO_ACTION.value,
            "expected_utility": 0.0,
            "alternatives": [],
            "explanation": "No candidate actions triggered.",
            "raw_decision": None,  # same key set as the decision path
            "epistemic_signals": None,
        }

    signals = _epistemic_signals(
        _reasoning_text(policy_engine, raw_actions), _evidence_text(event), model, tokenizer
    )
    decision = decision_engine.select_optimal_action(
        raw_actions, event, component=event.component,
        epistemic_signals=signals,
    )
    return {
        "risk_score": _selected_action_risk(decision_engine, decision, event),
        "selected_action": decision.best_action.value,
        "expected_utility": decision.expected_utility,
        # Only the first three alternatives, in the decision engine's order.
        "alternatives": [
            {"action": alt.action.value, "expected_utility": alt.utility, "risk": alt.risk}
            for alt in decision.alternatives[:3]
        ],
        "explanation": decision.explanation,
        "raw_decision": decision.raw_data,
        "epistemic_signals": signals,
    }


def evaluate_healing_decision(
    event: ReliabilityEvent,
    policy_engine: PolicyEngine,
    decision_engine: Optional[DecisionEngine] = None,
    rag_graph: Optional[RAGGraphMemory] = None,
    model=None,
    tokenizer=None,
    tenant_id: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Evaluate healing actions for a reliability event using decision-theoretic selection.

    Candidate actions come from ``policy_engine`` with its own decision engine
    temporarily disabled. Epistemic signals from the eclipse probe (or zeros)
    are then fed to the decision engine to pick the best action.

    Parameters
    ----------
    event : ReliabilityEvent
        The incident event (component, latency_p99, error_rate, cpu_util, memory_util).
    policy_engine : PolicyEngine
        The ARF healing policy engine with configured policies.
    decision_engine : DecisionEngine, optional
        If omitted, ``policy_engine.decision_engine`` is used when present,
        otherwise a default ``DecisionEngine(rag_graph=rag_graph)`` is created.
    rag_graph : RAGGraphMemory, optional
        Only used when a default DecisionEngine has to be created.
    model, tokenizer : optional
        Model and tokenizer for epistemic risk computation. Zero signals are
        used if either is missing or the probe raises.
    tenant_id : str, optional
        Added to the span for tracing; not used in core logic yet.

    Returns
    -------
    dict
        Keys: risk_score, selected_action, expected_utility, alternatives,
        explanation, raw_decision, epistemic_signals. When no actions
        trigger, selected_action is NO_ACTION, risk/utility are 0.0,
        alternatives is empty, and raw_decision and epistemic_signals are None.

    Raises
    ------
    Exception
        Anything raised while evaluating is re-raised after the error metric
        is recorded and the span is closed.
    """
    t0 = time.monotonic()
    span = None
    if OTEL_AVAILABLE and _tracer:
        span = _tracer.start_span("risk_service.evaluate_healing")
        span.set_attribute("component", event.component)
        if tenant_id:
            span.set_attribute("tenant_id", tenant_id)

    try:
        result = _decide_healing(event, policy_engine, decision_engine, rag_graph, model, tokenizer)
    except Exception as exc:
        _EVAL_COUNTER.labels(engine="python", status="error").inc()
        _EVAL_DURATION.labels(engine="python").observe(time.monotonic() - t0)
        if span:
            # Close the span on the error path too, so it is not leaked.
            span.record_exception(exc)
            span.set_status(trace.Status(trace.StatusCode.ERROR, str(exc)))
            span.end()
        raise

    # ── Metrics & span finalisation ───────────────────────────
    _EVAL_COUNTER.labels(engine="python", status="success").inc()
    _EVAL_DURATION.labels(engine="python").observe(time.monotonic() - t0)
    if span:
        span.set_attribute("risk_score", result["risk_score"])
        span.set_attribute("selected_action", result["selected_action"])
        span.set_attribute("expected_utility", result["expected_utility"])
        span.end()
    return result
def get_system_risk() -> float:
    """
    Deprecated: a single system-wide risk score is no longer produced.

    Evaluate risk per component instead.

    Raises
    ------
    NotImplementedError
        Always, because this entry point is deprecated.
    """
    message = "get_system_risk is deprecated. Use component‑level risk evaluation instead."
    raise NotImplementedError(message)