petter2025's picture
Update app.py
3c5135b verified
import gradio as gr
import asyncio
import json
import logging
import traceback
import os
import numpy as np
import pandas as pd
from datetime import datetime
from typing import Dict, Any, List, Optional
import threading
import urllib.request
import time
from scipy.stats import beta
# ----------------------------------------------------------------------
# Memory monitoring (no external dependencies)
# ----------------------------------------------------------------------
def get_memory_usage():
"""Return current process memory usage in MB (RSS)."""
try:
import resource
rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
if rss < 1e9:
return rss / 1024.0
else:
return rss / (1024.0 * 1024.0)
except ImportError:
try:
with open("/proc/self/status") as f:
for line in f:
if line.startswith("VmRSS:"):
parts = line.split()
if len(parts) >= 2:
return int(parts[1]) / 1024.0
except Exception:
pass
return None
def log_memory_usage():
mem_mb = get_memory_usage()
if mem_mb is not None:
logging.info(f"Process memory: {mem_mb:.1f} MB")
else:
logging.info("Process memory: unknown")
threading.Timer(60, log_memory_usage).start()
# ----------------------------------------------------------------------
# Plotly
# ----------------------------------------------------------------------
import plotly.graph_objects as go
# ----------------------------------------------------------------------
# Logging
# ----------------------------------------------------------------------
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# ----------------------------------------------------------------------
# Bayesian Risk Engine (Beta‑Binomial)
# ----------------------------------------------------------------------
class BayesianRiskEngine:
def __init__(self, alpha=1.0, beta=1.0):
self.alpha = alpha
self.beta = beta
def update(self, failures, successes):
self.alpha += failures
self.beta += successes
def risk(self):
return self.alpha / (self.alpha + self.beta)
def risk_interval(self, prob=0.95):
lo = beta.ppf((1 - prob) / 2, self.alpha, self.beta)
hi = beta.ppf((1 + prob) / 2, self.alpha, self.beta)
return lo, hi
# ----------------------------------------------------------------------
# Policy Engine
# ----------------------------------------------------------------------
class PolicyEngine:
def __init__(self, thresholds={"low": 0.2, "high": 0.8}):
self.thresholds = thresholds
def evaluate(self, risk):
if risk < self.thresholds["low"]:
return "approve", "Risk within safe limits"
elif risk > self.thresholds["high"]:
return "deny", f"Risk exceeds high threshold ({self.thresholds['high']})"
else:
return "escalate", f"Risk in escalation zone ({self.thresholds['low']}-{self.thresholds['high']})"
# ----------------------------------------------------------------------
# History
# ----------------------------------------------------------------------
decision_history = []
risk_history = []
def update_dashboard_data(decision, risk):
decision_history.append((datetime.utcnow().isoformat(), decision, risk))
risk_history.append((datetime.utcnow().isoformat(), risk))
if len(decision_history) > 100:
decision_history.pop(0)
if len(risk_history) > 100:
risk_history.pop(0)
def autonomous_control_decision(risk, risk_engine, policy_engine):
action, reason = policy_engine.evaluate(risk)
decision = {
"timestamp": datetime.utcnow().isoformat(),
"approved": action == "approve",
"actions": ["escalate_human"] if action == "escalate" else [],
"reason": reason,
"risk_level": "low" if risk < 0.2 else "medium" if risk < 0.8 else "high"
}
update_dashboard_data(decision, risk)
return decision
# ----------------------------------------------------------------------
# Infrastructure analysis
# ----------------------------------------------------------------------
async def handle_infra_with_governance(fault_type, context_window, session_state):
fault_map = {
"none": (1, 99),
"switch_down": (20, 80),
"server_overload": (35, 65),
"cascade": (60, 40)
}
failures, successes = fault_map.get(fault_type, (1, 99))
severity = "low" if failures < 10 else "medium" if failures < 40 else "high"
risk_engine = BayesianRiskEngine(alpha=1, beta=1)
risk_engine.update(failures, successes)
risk = risk_engine.risk()
ci_low, ci_high = risk_engine.risk_interval(0.95)
policy_engine = PolicyEngine(thresholds={"low": 0.2, "high": 0.8})
action, reason = policy_engine.evaluate(risk)
control_decision = autonomous_control_decision(risk, risk_engine, policy_engine)
analysis_result = {
"risk": risk,
"risk_ci": [ci_low, ci_high],
"decision": action,
"justification": reason,
"healing_actions": ["restart"] if action == "deny" else ["monitor"],
"posterior_parameters": {
"alpha": risk_engine.alpha,
"beta": risk_engine.beta
}
}
output = {
**analysis_result,
"governance": {
"policy_evaluation": {
"action": action,
"reason": reason,
"thresholds": policy_engine.thresholds
},
"control_plane_decision": control_decision
}
}
return output, session_state
# ----------------------------------------------------------------------
# MCMC (Metropolis‑Hastings)
# ----------------------------------------------------------------------
class MHMCMC:
def __init__(self, log_target, proposal_sd=0.1):
self.log_target = log_target
self.proposal_sd = proposal_sd
def sample(self, n_samples, initial_state, burn_in=0):
samples = np.zeros((n_samples, len(initial_state)))
current = np.array(initial_state)
current_log = self.log_target(current)
accepted = 0
for i in range(n_samples + burn_in):
proposal = current + np.random.normal(0, self.proposal_sd, size=len(current))
proposal_log = self.log_target(proposal)
accept_prob = min(1, np.exp(proposal_log - current_log))
if np.random.rand() < accept_prob:
current = proposal
current_log = proposal_log
accepted += 1
if i >= burn_in:
samples[i - burn_in] = current
acceptance_rate = accepted / (n_samples + burn_in)
return samples, acceptance_rate
def run_hmc_mcmc(samples, warmup):
# Generate data: 10 observations with mean 0.5, std 0.2
data = np.random.normal(0.5, 0.2, 10)
def log_prior(mu):
return -0.5 * (mu ** 2) # prior N(0,1)
def log_likelihood(mu):
return -0.5 * np.sum(((data - mu) / 0.2) ** 2)
def log_posterior(mu):
return log_prior(mu) + log_likelihood(mu)
sampler = MHMCMC(log_posterior, proposal_sd=0.05)
mu_samples, acceptance = sampler.sample(samples, initial_state=[0.0], burn_in=warmup)
mu_samples = mu_samples.flatten()
mean = np.mean(mu_samples)
median = np.median(mu_samples)
credible_interval = np.percentile(mu_samples, [2.5, 97.5])
fig_trace = go.Figure()
fig_trace.add_trace(go.Scatter(y=mu_samples, mode='lines', name='μ', line=dict(width=1)))
fig_trace.update_layout(title="Trace of μ (Metropolis-Hastings)", xaxis_title="Iteration", yaxis_title="μ")
fig_hist = go.Figure()
fig_hist.add_trace(go.Histogram(x=mu_samples, nbinsx=50, name='Posterior'))
fig_hist.update_layout(title="Posterior Distribution of μ", xaxis_title="μ", yaxis_title="Density")
summary = {
"mean": mean,
"median": median,
"credible_interval_95": f"[{credible_interval[0]:.3f}, {credible_interval[1]:.3f}]",
"acceptance_rate": f"{acceptance:.2%}"
}
return summary, fig_trace, fig_hist
# ----------------------------------------------------------------------
# Dashboard plots
# ----------------------------------------------------------------------
def generate_risk_gauge():
if not risk_history:
return go.Figure()
latest_risk = risk_history[-1][1]
fig = go.Figure(go.Indicator(
mode="gauge+number",
value=latest_risk,
title={'text': "Current Risk"},
gauge={
'axis': {'range': [0, 1]},
'bar': {'color': "darkblue"},
'steps': [
{'range': [0, 0.2], 'color': "lightgreen"},
{'range': [0.2, 0.8], 'color': "yellow"},
{'range': [0.8, 1], 'color': "red"}
]
}))
return fig
def generate_decision_pie():
if not decision_history:
return go.Figure()
approved = sum(1 for _, d, _ in decision_history if d.get("approved", False))
blocked = len(decision_history) - approved
fig = go.Figure(data=[go.Pie(labels=["Approved", "Blocked"], values=[approved, blocked])])
fig.update_layout(title="Policy Decisions")
return fig
def generate_action_timeline():
if not decision_history:
return go.Figure()
times = [d["timestamp"] for _, d, _ in decision_history]
approvals = [1 if d.get("approved", False) else 0 for _, d, _ in decision_history]
fig = go.Figure()
fig.add_trace(go.Scatter(x=times, y=approvals, mode='markers+lines', name='Approvals'))
fig.update_layout(title="Autonomous Actions Timeline", xaxis_title="Time", yaxis_title="Approved (1) / Blocked (0)")
return fig
def refresh_dashboard():
total = len(decision_history)
approved = sum(1 for _, d, _ in decision_history if d.get("approved", False))
blocked = total - approved
avg_risk = np.mean([r for _, r in risk_history]) if risk_history else 0.5
control_stats = {
"total_decisions": total,
"approved_actions": approved,
"blocked_actions": blocked,
"average_risk": float(avg_risk)
}
return (
control_stats,
generate_risk_gauge(),
generate_decision_pie(),
generate_action_timeline()
)
# ----------------------------------------------------------------------
# OSS capabilities (mocked)
# ----------------------------------------------------------------------
oss_caps = {
"edition": "OSS (Demo)",
"version": "4.0.0-bayesian",
"license": "Apache 2.0",
"execution": {"modes": ["advisory"], "max_incidents": 100},
"memory": {"type": "in-memory", "faiss_index_type": "flat", "max_incident_nodes": 100},
"enterprise_features": ["Real-time HMC (using PyMC)", "Hyperpriors", "Decision Engine"]
}
# ----------------------------------------------------------------------
# Start memory monitoring
# ----------------------------------------------------------------------
log_memory_usage()
# ----------------------------------------------------------------------
# Gradio UI
# ----------------------------------------------------------------------
with gr.Blocks(title="ARF v4 – Bayesian Risk Scoring Demo") as demo:
gr.Markdown("""
# 🧠 ARF v4 – Bayesian Risk Scoring for AI Reliability (Demo)
**Mathematically rigorous risk estimation using conjugate priors and MCMC**
This demo showcases:
- **Bayesian conjugate prior (Beta-Binomial)** – online risk update from observed failures/successes.
- **Policy thresholds** – approve (<0.2), escalate (0.2‑0.8), deny (>0.8).
- **Metropolis-Hastings MCMC** – sampling from a posterior distribution (simulating HMC concepts).
- **Autonomous control decisions** – based on the current risk estimate.
All components are implemented with only `numpy`, `scipy`, and standard libraries.
""")
with gr.Tabs():
with gr.TabItem("Control Plane Dashboard"):
gr.Markdown("### 🎮 Control Plane")
with gr.Row():
with gr.Column():
system_status = gr.JSON(label="System Status", value={
"edition": oss_caps["edition"],
"version": oss_caps["version"],
"governance_mode": "advisory",
"policies_loaded": 2,
"risk_threshold_low": 0.2,
"risk_threshold_high": 0.8
})
with gr.Column():
control_stats = gr.JSON(label="Control Statistics", value={
"total_decisions": 0,
"approved_actions": 0,
"blocked_actions": 0,
"average_risk": 0.5
})
with gr.Row():
risk_gauge = gr.Plot(label="Current Risk Gauge")
decision_pie = gr.Plot(label="Policy Decisions")
with gr.Row():
action_timeline = gr.Plot(label="Autonomous Actions Timeline")
refresh_dash_btn = gr.Button("Refresh Dashboard")
refresh_dash_btn.click(
fn=refresh_dashboard,
outputs=[control_stats, risk_gauge, decision_pie, action_timeline]
)
with gr.TabItem("Infrastructure Reliability"):
gr.Markdown("### 🏗️ Infrastructure Intent Evaluation with Bayesian Risk")
infra_state = gr.State(value={})
with gr.Row():
with gr.Column():
infra_fault = gr.Dropdown(
["none", "switch_down", "server_overload", "cascade"],
value="none",
label="Inject Fault"
)
infra_btn = gr.Button("Evaluate Intent")
with gr.Column():
infra_output = gr.JSON(label="Analysis Result")
with gr.TabItem("Deep Analysis (MCMC)"):
gr.Markdown("### Markov Chain Monte Carlo (Metropolis‑Hastings)")
with gr.Row():
with gr.Column():
hmc_samples = gr.Slider(500, 10000, value=5000, step=500, label="Number of Samples")
hmc_warmup = gr.Slider(100, 2000, value=1000, step=100, label="Burn‑in Steps")
hmc_run_btn = gr.Button("Run MCMC")
with gr.Column():
hmc_summary = gr.JSON(label="Posterior Summary")
with gr.Row():
hmc_trace_plot = gr.Plot(label="Trace Plot")
hmc_pair_plot = gr.Plot(label="Posterior Histogram")
with gr.TabItem("Policy Management"):
gr.Markdown("### 📋 Execution Policies")
policies_json = [
{"name": "Low Risk Policy", "conditions": ["risk < 0.2"], "action": "approve", "priority": 1},
{"name": "Medium Risk Policy", "conditions": ["0.2 ≤ risk ≤ 0.8"], "action": "escalate", "priority": 2},
{"name": "High Risk Policy", "conditions": ["risk > 0.8"], "action": "deny", "priority": 3}
]
gr.JSON(label="Active Policies", value=policies_json)
with gr.TabItem("Enterprise / OSS"):
gr.Markdown(f"""
## 🚀 ARF {oss_caps['edition'].upper()} Edition
**Version:** {oss_caps['version']}
**License:** {oss_caps['license']}
### OSS Capabilities (Demo)
- **Bayesian conjugate prior** – Beta-Binomial risk scoring
- **Policy thresholds** – configurable approve/escalate/deny
- **MCMC sampling** – Metropolis-Hastings (simulates HMC concepts)
- **In-memory storage** – no persistence
### Enterprise Features (not included)
{chr(10).join('- ' + f for f in oss_caps['enterprise_features'])}
[📅 Book a Demo](https://calendly.com/petter2025us/30min) | [📧 Contact Sales](mailto:petter2025us@outlook.com)
""")
# Wire events
infra_btn.click(
fn=lambda f, w, s: handle_infra_with_governance(f, w, s),
inputs=[infra_fault, gr.State(50), infra_state],
outputs=[infra_output, infra_state]
)
hmc_run_btn.click(
fn=run_hmc_mcmc,
inputs=[hmc_samples, hmc_warmup],
outputs=[hmc_summary, hmc_trace_plot, hmc_pair_plot]
)
if __name__ == "__main__":
demo.launch(theme="soft")