"""Operon Watcher Dashboard — interactive signal classification and intervention timeline."""
from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum
from html import escape
from typing import Any

import gradio as gr
# ---------------------------------------------------------------------------
# Inline simulation (avoids heavy operon import for HF Space cold start)
# ---------------------------------------------------------------------------
class SignalCategory(Enum):
    """Families of watcher signal emitted for every simulated stage.

    The string values are also used as lookup keys into ``_CAT_COLORS``.
    """

    # Built from a stage's epiplexity score.
    EPISTEMIC = "epistemic"
    # Built from a stage's ATP fraction.
    SOMATIC = "somatic"
    # Built from a stage's immune threat level.
    SPECIES_SPECIFIC = "species"
class InterventionKind(Enum):
    """Actions the watcher can take on a stage.

    The string values are also used as lookup keys into ``_INTV_COLORS``.
    """

    # Issued when a stage reports a failure.
    RETRY = "retry"
    # Issued when a stage's epiplexity is too low on a model that can be upgraded.
    ESCALATE = "escalate"
    # Stops the simulation; no later stage is processed.
    HALT = "halt"
@dataclass(frozen=True)
class WatcherSignal:
    """Immutable reading taken from one stage by the signal classifier."""

    # Signal family this reading belongs to.
    category: SignalCategory
    # Name of the probe that produced it ("epiplexity", "atp_store", "immune").
    source: str
    # Name of the stage the reading was taken from.
    stage_name: str
    # Numeric reading; its meaning depends on ``source``.
    value: float
    # Extra per-source context (e.g. a status label); a fresh dict per instance.
    detail: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True)
class Intervention:
    """Immutable record of an action the watcher decided to take on a stage."""

    # What the watcher does (retry / escalate / halt).
    kind: InterventionKind
    # Name of the stage the action applies to.
    stage_name: str
    # Human-readable explanation shown in the timeline.
    reason: str
@dataclass
class SimulatedStage:
    """One pre-scripted workflow stage fed to the simulated watcher."""

    # Stage identifier; signals and interventions are matched to stages by it.
    name: str
    # Descriptive role label (e.g. "Router", "Analyst").
    role: str
    # Model tier: "fast", "deep", or "deterministic" (as used by the presets).
    model: str
    # Canned output text for the stage.
    output: str
    # Epistemic score; values below the thresholds count as stagnant/critical.
    epiplexity: float
    # ATP fraction — presumably the remaining budget on a 0–1 scale (TODO confirm).
    atp_fraction: float
    # Immune verdict: "none", "suspicious", "confirmed", or "critical".
    immune_threat: str
    # Outcome marker; "FAILURE" makes the watcher request a retry.
    action_type: str = "EXECUTE"
# ---------------------------------------------------------------------------
# Preset scenarios
# ---------------------------------------------------------------------------
def _build_normal_run() -> list[SimulatedStage]:
    """Build the healthy four-stage preset.

    Every stage has epiplexity >= 0.5, ATP fraction >= 0.7 and no immune
    threat, so the default thresholds of ``_run_simulation`` trigger nothing.
    """
    # Columns: name, role, model, output, epiplexity, atp_fraction
    table = [
        ("intake", "Normalizer", "deterministic", "Parsed request", 0.7, 0.9),
        ("router", "Router", "fast", "Route: billing", 0.6, 0.85),
        ("analyst", "Analyst", "deep", "Risk assessment: low", 0.5, 0.75),
        ("reviewer", "Reviewer", "fast", "Approved", 0.55, 0.7),
    ]
    return [
        SimulatedStage(name, role, model, output, epi, atp, "none")
        for name, role, model, output, epi, atp in table
    ]
def _build_stagnant_agent() -> list[SimulatedStage]:
    """Build the preset in which the planner stalls.

    The "planner" stage runs on the fast model with epiplexity 0.12, which is
    below the fixed 0.15 critical floor and therefore leads to an escalation.
    """
    # Columns: name, role, model, output, epiplexity, atp_fraction
    table = [
        ("intake", "Normalizer", "deterministic", "Parsed request", 0.7, 0.9),
        ("planner", "Planner", "fast", "...", 0.12, 0.8),  # critical epiplexity
        ("executor", "Executor", "deep", "Task completed (escalated)", 0.5, 0.7),
        ("checker", "Checker", "fast", "Verified", 0.6, 0.65),
    ]
    return [
        SimulatedStage(name, role, model, output, epi, atp, "none")
        for name, role, model, output, epi, atp in table
    ]
def _build_budget_exhaustion() -> list[SimulatedStage]:
    """Build the preset whose ATP fraction drains from 0.6 down to 0.05.

    Epiplexity stays at or above 0.4 and no stage carries an immune threat;
    only the ATP fraction degrades across the four worker steps.
    """
    # Columns: name, model, output, epiplexity, atp_fraction
    table = [
        ("s1", "fast", "Step 1 done", 0.5, 0.6),
        ("s2", "deep", "Step 2 done", 0.45, 0.3),
        ("s3", "deep", "Step 3 started", 0.4, 0.08),  # ATP critical
        ("s4", "fast", "Step 4 skipped", 0.5, 0.05),
    ]
    return [
        SimulatedStage(name, "Worker", model, output, epi, atp, "none")
        for name, model, output, epi, atp in table
    ]
def _build_immune_alert() -> list[SimulatedStage]:
    """Build the preset whose immune threat rises to "critical".

    The second stage is merely "suspicious"; the third is "critical", which is
    one of the default halting levels of ``_run_simulation``.
    """
    # Columns: name, role, model, output, epiplexity, atp_fraction, immune_threat
    table = [
        ("intake", "Parser", "deterministic", "Input parsed", 0.7, 0.9, "none"),
        ("agent", "Agent", "fast", "Suspicious output", 0.5, 0.85, "suspicious"),
        ("agent2", "Agent", "fast", "Malicious pattern", 0.45, 0.8, "critical"),
    ]
    return [SimulatedStage(*row) for row in table]
# Maps the label shown in the preset dropdowns to a zero-argument builder that
# returns a fresh list of stages for that scenario. The callbacks call the
# builder on every run, so stage lists are never shared between runs.
PRESETS = {
    "Normal Run": _build_normal_run,
    "Stagnant Agent (→ Escalate)": _build_stagnant_agent,
    "Budget Exhaustion (→ Low ATP)": _build_budget_exhaustion,
    "Immune Alert (→ Halt)": _build_immune_alert,
}
# ---------------------------------------------------------------------------
# Simulation engine
# ---------------------------------------------------------------------------
def _classify_signals(
    stage: SimulatedStage,
    epiplexity_thresh: float,
    atp_thresh: float,
    immune_levels: tuple[str, ...],
) -> list[WatcherSignal]:
    """Produce the epistemic, somatic and species signals for one stage.

    Args:
        stage: The stage to inspect.
        epiplexity_thresh: Epiplexity below this value (but not below the
            fixed 0.15 critical floor) is labelled "stagnant".
        atp_thresh: ATP fraction below this value labels the somatic signal
            "low"; otherwise it is labelled "ok".
        immune_levels: Not consulted here. Whether a threat level halts the
            run is decided in ``_decide_intervention``; the parameter is kept
            so existing callers do not break.

    Returns:
        Exactly three signals, in the order epistemic, somatic, species.
    """
    # Below this floor the stage is "critical" regardless of the configurable
    # stagnation threshold.
    critical_floor = 0.15
    if stage.epiplexity < critical_floor:
        ep_status = "critical"
    elif stage.epiplexity < epiplexity_thresh:
        ep_status = "stagnant"
    else:
        ep_status = "healthy"

    # Previously atp_thresh was ignored, so low-ATP stages were never flagged.
    atp_status = "low" if stage.atp_fraction < atp_thresh else "ok"

    # Unknown threat labels fall back to 0.0.
    threat_scale = {"none": 0.0, "suspicious": 0.3, "confirmed": 0.7, "critical": 1.0}

    return [
        WatcherSignal(
            SignalCategory.EPISTEMIC,
            "epiplexity",
            stage.name,
            stage.epiplexity,
            {"status": ep_status},
        ),
        WatcherSignal(
            SignalCategory.SOMATIC,
            "atp_store",
            stage.name,
            # The signal value is the consumed share, i.e. 1 - fraction.
            1.0 - stage.atp_fraction,
            {"fraction": stage.atp_fraction, "status": atp_status},
        ),
        WatcherSignal(
            SignalCategory.SPECIES_SPECIFIC,
            "immune",
            stage.name,
            threat_scale.get(stage.immune_threat, 0.0),
            {"threat_level": stage.immune_threat},
        ),
    ]
def _decide_intervention(
    stage: SimulatedStage,
    signals: list[WatcherSignal],
    intervention_count: int,
    total_stages: int,
    max_rate: float,
    immune_levels: tuple[str, ...],
) -> Intervention | None:
    """Pick the intervention for one stage, or ``None`` if none is needed.

    Rules are checked in priority order: non-convergence, immune threat,
    epistemic status, then stage failure. The first match wins.

    Args:
        stage: The stage being judged.
        signals: Signals classified for this stage.
        intervention_count: Interventions issued before this stage.
        total_stages: Stages processed so far, including this one.
        max_rate: Halt once ``intervention_count / total_stages`` exceeds this.
        immune_levels: Threat labels that halt the run.
    """
    # 1. Non-convergence: too many interventions relative to stages seen.
    if total_stages > 0:
        observed_rate = intervention_count / total_stages
        if observed_rate > max_rate:
            return Intervention(
                InterventionKind.HALT,
                stage.name,
                "Non-convergence: intervention rate exceeded",
            )

    # 2. Immune threat at a halting level.
    if stage.immune_threat in immune_levels:
        return Intervention(
            InterventionKind.HALT,
            stage.name,
            f"Immune threat: {stage.immune_threat}",
        )

    # 3. Epistemic status; if several epistemic signals exist the last one wins.
    statuses = [
        sig.detail.get("status")
        for sig in signals
        if sig.category == SignalCategory.EPISTEMIC
    ]
    ep_status = statuses[-1] if statuses else None

    if ep_status == "critical":
        # A deep model has nowhere left to escalate to, so it halts instead.
        if stage.model == "deep":
            kind, reason = InterventionKind.HALT, "Critical epiplexity on deep model"
        else:
            kind, reason = InterventionKind.ESCALATE, "Critical epiplexity → escalate"
        return Intervention(kind, stage.name, reason)

    if stage.model == "fast" and ep_status == "stagnant":
        return Intervention(
            InterventionKind.ESCALATE, stage.name, "Stagnant on fast → escalate"
        )

    # 4. Explicit stage failure.
    if stage.action_type == "FAILURE":
        return Intervention(InterventionKind.RETRY, stage.name, "Stage failure → retry")

    return None
def _run_simulation(
    stages: list[SimulatedStage],
    epiplexity_thresh: float = 0.3,
    atp_thresh: float = 0.1,
    max_rate: float = 0.5,
    immune_levels: tuple[str, ...] = ("confirmed", "critical"),
) -> tuple[list[WatcherSignal], list[Intervention]]:
    """Run the watcher over ``stages`` in order, stopping at the first HALT.

    Returns:
        ``(signals, interventions)``. Signals cover every stage processed,
        including the halting one; stages after a HALT contribute nothing.
    """
    signal_log: list[WatcherSignal] = []
    intervention_log: list[Intervention] = []

    # ``seen`` is the 1-based count of stages processed so far.
    for seen, stage in enumerate(stages, start=1):
        stage_signals = _classify_signals(
            stage, epiplexity_thresh, atp_thresh, immune_levels
        )
        signal_log += stage_signals

        decision = _decide_intervention(
            stage,
            stage_signals,
            len(intervention_log),
            seen,
            max_rate,
            immune_levels,
        )
        if decision is None:
            continue

        intervention_log.append(decision)
        if decision.kind == InterventionKind.HALT:
            break

    return signal_log, intervention_log
# ---------------------------------------------------------------------------
# HTML helpers
# ---------------------------------------------------------------------------
# CSS hex color per signal category, keyed by ``SignalCategory`` value.
# Lookups fall back to "#888" for unknown keys.
_CAT_COLORS = {
    "epistemic": "#2563eb",
    "somatic": "#16a34a",
    "species": "#dc2626",
}
# CSS hex color per intervention kind, keyed by ``InterventionKind`` value.
# Lookups fall back to "#888" for unknown keys.
_INTV_COLORS = {
    "retry": "#eab308",
    "escalate": "#f97316",
    "halt": "#ef4444",
}
def _signal_table_html(signals: list[WatcherSignal]) -> str:
    """Render ``signals`` as an HTML table, one row per signal.

    The category cell is tinted with the matching ``_CAT_COLORS`` entry
    ("#888" for unknown categories). All text values are HTML-escaped so a
    stage name or detail payload cannot inject markup.

    Args:
        signals: Signals to display, in display order.

    Returns:
        An HTML ``<table>`` string; with no signals it has a header row only.
    """
    cell_css = "padding:4px 8px;border-bottom:1px solid #ddd;text-align:left"

    body_rows = []
    for sig in signals:
        category = sig.category.value
        color = _CAT_COLORS.get(category, "#888")
        body_rows.append(
            "<tr>"
            f'<td style="{cell_css};color:{color};font-weight:600">{escape(category)}</td>'
            f'<td style="{cell_css}">{escape(sig.source)}</td>'
            f'<td style="{cell_css}">{escape(sig.stage_name)}</td>'
            f'<td style="{cell_css}">{sig.value:.2f}</td>'
            f'<td style="{cell_css}"><code>{escape(str(sig.detail))}</code></td>'
            "</tr>"
        )

    header = "".join(
        f'<th style="{cell_css}">{title}</th>'
        for title in ("Category", "Source", "Stage", "Value", "Detail")
    )
    # Join once instead of growing a string with += inside the loop.
    body = "".join(body_rows)
    return (
        '<table style="width:100%;border-collapse:collapse">'
        f"<thead><tr>{header}</tr></thead>"
        f"<tbody>{body}</tbody>"
        "</table>"
    )
def _timeline_html(
    stages: list[SimulatedStage],
    signals: list[WatcherSignal],
    interventions: list[Intervention],
) -> str:
    """Render a per-stage timeline as an HTML table with an intervention rate.

    Each row shows the stage name, model, epiplexity, ATP fraction and the
    intervention applied to the stage ("OK" when there is none). Stages that
    have no signals show "—" for epiplexity and ATP. Text values are
    HTML-escaped, and the intervention kind is tinted via ``_INTV_COLORS``.

    Args:
        stages: All stages of the scenario, in order.
        signals: Signals produced by the run; matched to stages by name.
        interventions: Interventions produced by the run; matched by stage
            name (if a name repeats, the last intervention wins).

    Returns:
        An HTML fragment: a rate paragraph followed by a ``<table>``.
    """
    intv_by_stage = {intv.stage_name: intv for intv in interventions}

    # Index signals once instead of rescanning the full list for every stage.
    # setdefault keeps the first reading per stage.
    ep_by_stage: dict[str, float] = {}
    atp_by_stage: dict[str, float | None] = {}
    for sig in signals:
        if sig.category == SignalCategory.EPISTEMIC:
            ep_by_stage.setdefault(sig.stage_name, sig.value)
        elif sig.category == SignalCategory.SOMATIC:
            atp_by_stage.setdefault(sig.stage_name, sig.detail.get("fraction"))

    cell_css = "padding:4px 8px;border-bottom:1px solid #ddd;text-align:left"

    body_rows = []
    for stage in stages:
        intv = intv_by_stage.get(stage.name)
        if intv is None:
            intv_cell = "OK"
        else:
            color = _INTV_COLORS.get(intv.kind.value, "#888")
            intv_cell = (
                f'<span style="color:{color};font-weight:600">'
                f"{escape(intv.kind.value.upper())}</span> {escape(intv.reason)}"
            )

        ep_val = ep_by_stage.get(stage.name)
        atp_val = atp_by_stage.get(stage.name)
        ep_str = f"{ep_val:.2f}" if ep_val is not None else "—"
        atp_str = f"{atp_val:.0%}" if atp_val is not None else "—"

        body_rows.append(
            "<tr>"
            f'<td style="{cell_css}">{escape(stage.name)}</td>'
            f'<td style="{cell_css}">{escape(stage.model)}</td>'
            f'<td style="{cell_css}">{ep_str}</td>'
            f'<td style="{cell_css}">{atp_str}</td>'
            f'<td style="{cell_css}">{intv_cell}</td>'
            "</tr>"
        )

    rate = f"{len(interventions)}/{len(stages)}" if stages else "0/0"
    header = "".join(
        f'<th style="{cell_css}">{title}</th>'
        for title in ("Stage", "Model", "Epiplexity", "ATP", "Intervention")
    )
    body = "".join(body_rows)
    return (
        f"<p><strong>Intervention rate:</strong> {rate}</p>"
        '<table style="width:100%;border-collapse:collapse">'
        f"<thead><tr>{header}</tr></thead>"
        f"<tbody>{body}</tbody>"
        "</table>"
    )
# ---------------------------------------------------------------------------
# Gradio callbacks
# ---------------------------------------------------------------------------
def _load_preset(preset_name):
    """Run a preset with default thresholds.

    Returns a ``(signal_table_html, timeline_html)`` pair; for an unknown
    preset name it returns a prompt message and an empty timeline.
    """
    build_stages = PRESETS.get(preset_name)
    if build_stages is None:
        return "Select a preset.", ""

    stages = build_stages()
    signals, interventions = _run_simulation(stages)
    return (
        _signal_table_html(signals),
        _timeline_html(stages, signals, interventions),
    )
def _run_custom(preset_name, ep_thresh, atp_thresh, max_rate):
    """Run a preset with user-supplied thresholds and return the timeline HTML.

    For an unknown preset name a prompt message is returned instead.
    """
    build_stages = PRESETS.get(preset_name)
    if build_stages is None:
        return "Select a preset first."

    stages = build_stages()
    overrides = {
        "epiplexity_thresh": ep_thresh,
        "atp_thresh": atp_thresh,
        "max_rate": max_rate,
    }
    signals, interventions = _run_simulation(stages, **overrides)
    return _timeline_html(stages, signals, interventions)
# ---------------------------------------------------------------------------
# App
# ---------------------------------------------------------------------------
def build_app():
    """Assemble the Gradio Blocks UI and return it without launching.

    Tabs:
        * Signal Classification — pick a preset and render both the signal
          table and the intervention timeline with default thresholds.
        * Intervention Timeline — a pointer to where the timeline is rendered.
        * Live Configuration — re-run a preset with custom thresholds and
          render the resulting timeline.
    """
    with gr.Blocks(title="Operon Watcher Dashboard", theme=gr.themes.Base()) as demo:
        gr.Markdown(
            "# Operon Watcher Dashboard\n"
            "Signal classification and intervention timeline for multi-stage workflows."
        )

        with gr.Tab("Signal Classification"):
            preset_dd = gr.Dropdown(
                choices=list(PRESETS), label="Preset Scenario", value="Normal Run"
            )
            load_btn = gr.Button("Load & Run")
            signal_out = gr.HTML()
            timeline_out = gr.HTML()
            load_btn.click(
                _load_preset, inputs=[preset_dd], outputs=[signal_out, timeline_out]
            )

        with gr.Tab("Intervention Timeline"):
            # The timeline component lives in the first tab, so point there
            # rather than to content "above", which this tab does not have.
            gr.Markdown(
                "The intervention timeline is rendered in the "
                "**Signal Classification** tab, below the signal table, "
                "after you click **Load & Run**."
            )

        with gr.Tab("Live Configuration"):
            gr.Markdown("Adjust thresholds and re-run the selected scenario.")
            preset_dd2 = gr.Dropdown(
                choices=list(PRESETS), label="Preset", value="Normal Run"
            )
            ep_slider = gr.Slider(
                minimum=0.05,
                maximum=0.8,
                value=0.3,
                step=0.05,
                label="Epiplexity Stagnant Threshold",
            )
            atp_slider = gr.Slider(
                minimum=0.01, maximum=0.5, value=0.1, step=0.01, label="ATP Low Fraction"
            )
            rate_slider = gr.Slider(
                minimum=0.1, maximum=1.0, value=0.5, step=0.1, label="Max Intervention Rate"
            )
            run_btn = gr.Button("Run with Custom Config")
            custom_out = gr.HTML()
            run_btn.click(
                _run_custom,
                inputs=[preset_dd2, ep_slider, atp_slider, rate_slider],
                outputs=[custom_out],
            )

    return demo
# Script entry point: build the UI and start the Gradio server only when the
# file is executed directly, not when it is imported.
if __name__ == "__main__":
    app = build_app()
    app.launch()