# opensoc-env / demo_data.py
# shivam2k3's picture
# OpenSOC v1
# bb6a031
"""Helpers for the Gradio demo: load the pre-baked before-vs-after JSON
file and render each section as readable markdown.
The demo is intentionally read-only and deterministic: judges click "Next
incident" and see one of N pre-computed (alert, baseline-response,
trained-response, ground-truth) tuples. The expensive part — running
the baseline and trained model on each incident — happens once on a GPU
in `eval.bake_demo` and is committed to `data/demo_examples.json`.
This file is small, fast, and carries no GPU dependency, so the deployed
HF Space can stay on the free CPU tier and still cold-start in <30s.
"""
from __future__ import annotations
import json
import os
from typing import Any, Dict, List
def load_demo_examples(path: str) -> List[Dict[str, Any]]:
    """Load the pre-baked demo examples from a JSON file.

    The file may contain either a bare JSON list of examples or a JSON
    object that wraps that list under an ``"examples"`` key.

    Args:
        path: Location of the JSON file to read.

    Returns:
        The list of examples. An empty list is returned when the file is
        not present yet (so the Space still boots before the user has run
        training + bake_demo) or when the decoded payload is not a list.

    Raises:
        json.JSONDecodeError: If the file exists but does not hold valid JSON.
    """
    # Open directly instead of checking existence first: this avoids the
    # window in which the file could disappear between the check and the open.
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (FileNotFoundError, NotADirectoryError):
        return []

    if isinstance(data, dict) and "examples" in data:
        data = data["examples"]

    # Honour the declared return type: anything that is not a list (a dict
    # without "examples", a null "examples" entry, a scalar) yields no examples.
    return data if isinstance(data, list) else []
def _format_event(e: Dict[str, Any]) -> str:
    """Render one log event as a single markdown bullet.

    Args:
        e: Event mapping. Reads ``log_id``, ``timestamp``, ``source``,
            ``event_type`` and an optional ``fields`` mapping.

    Returns:
        A bullet whose segments are separated by `` · ``. The field segment
        is omitted entirely when no field has a printable value, so the
        bullet never ends with a dangling separator.
    """
    fields = e.get("fields") or {}
    # Fields whose value is None or an empty string carry no information.
    fields_md = " ".join(
        f"`{k}`={v}" for k, v in fields.items() if v not in (None, "")
    )

    et = e.get("event_type", "?")
    # Enum-like objects expose their printable form through `.value`.
    if hasattr(et, "value"):
        et = et.value

    segments = [
        f"- `{e.get('log_id')}`",
        f"{e.get('timestamp')}",
        f"src=`{e.get('source')}`",
        f"type=`{et}`",
    ]
    if fields_md:
        segments.append(fields_md)
    return " · ".join(segments).rstrip()


def format_alert_card(alert: Dict[str, Any], events: List[Dict[str, Any]]) -> str:
    """Render the SIEM alert + log window as a markdown card.

    Args:
        alert: Alert mapping. Reads ``alert_id``, ``category``, ``severity``,
            ``host``, ``user`` and ``summary``. ``None`` is treated as empty.
        events: Log events shown under the alert, one bullet each. ``None``
            is treated as an empty window.

    Returns:
        Markdown text: a heading, the alert attributes as bullets, a blank
        line, the log-window caption with the event count, then the events.
    """
    alert = alert or {}
    events = events or []
    lines = [
        f"### Alert `{alert.get('alert_id', '?')}`",
        f"- **category**: {alert.get('category')}",
        f"- **severity**: {alert.get('severity')}",
        f"- **host / user**: {alert.get('host')} / {alert.get('user')}",
        f"- **summary**: {alert.get('summary', '')}",
        "",
        f"**Log window ({len(events)} event(s))**",
    ]
    lines.extend(_format_event(e) for e in events)
    return "\n".join(lines)
def _signed(value: Any, fallback: str) -> str:
    """Format a number as a signed two-decimal string, else return *fallback*."""
    if isinstance(value, (int, float)):
        return f"{value:+.2f}"
    return fallback


def _code_fence(text: str) -> str:
    """Return a backtick fence strictly longer than any backtick run in *text*."""
    longest = run = 0
    for ch in text:
        run = run + 1 if ch == "`" else 0
        longest = max(longest, run)
    return "`" * max(3, longest + 1)


def format_response_card(title: str, response: Dict[str, Any]) -> str:
    """Render a model response (parsed action + reward + breakdown).

    Args:
        title: Heading shown at the top of the card.
        response: Response mapping. Reads ``action``, ``cited_log_id``,
            ``rationale``, ``reward``, ``correct``, ``raw_text`` and the
            optional ``reward_breakdown`` mapping.

    Returns:
        Markdown text: heading, action / cited log / reward bullets, the
        rationale as a blockquote, an italic reward-breakdown line when a
        breakdown is present, and a collapsible block with the raw model
        output when it is non-empty and differs from the rationale.
    """
    action = response.get("action", "—")
    cited = response.get("cited_log_id", "—")
    rationale = response.get("rationale", "")
    reward = response.get("reward")
    correct = response.get("correct")
    raw = response.get("raw_text", "")

    # Three-way verdict: truthy -> OK, exactly False -> MISS, anything else
    # (e.g. a missing key) -> unknown.
    correct_emoji = "OK" if correct else ("MISS" if correct is False else "?")
    reward_str = _signed(reward, "—")

    lines = [
        f"### {title}",
        f"- **action**: `{action}` ({correct_emoji})",
        f"- **cited_log**: `{cited}`",
        f"- **reward**: `{reward_str}`",
        "",
        f"> {rationale}",
    ]

    breakdown = response.get("reward_breakdown") or {}
    if breakdown:
        # Non-numeric components (e.g. a null in the JSON) are shown as-is
        # instead of raising from the numeric format spec.
        bk = ", ".join(
            f"`{k}={_signed(v, str(v))}`" for k, v in breakdown.items()
        )
        lines.append("")
        lines.append(f"_{bk}_")

    if raw and raw != rationale:
        body = raw.strip()
        # Raw model output may itself contain ``` fences; a longer outer
        # fence keeps the whole text inside one code block.
        fence = _code_fence(body)
        lines.extend(
            [
                "",
                "<details><summary>raw model output</summary>",
                "",
                fence,
                body,
                fence,
                "</details>",
            ]
        )
    return "\n".join(lines)
def format_truth_card(ex: Dict[str, Any]) -> str:
    """Render the ground-truth summary line for one demo example.

    Args:
        ex: Example mapping. Reads ``ground_truth``, ``triggering_log_id``,
            ``stage`` and ``seed``; a missing key is rendered as ``None``.

    Returns:
        One markdown line of bold-label / code-value pairs joined by `` · ``.
    """
    labelled_keys = (
        ("Ground truth", "ground_truth"),
        ("Triggering log", "triggering_log_id"),
        ("Stage", "stage"),
        ("Seed", "seed"),
    )
    return " · ".join(
        f"**{label}**: `{ex.get(key)}`" for label, key in labelled_keys
    )
def empty_state_message() -> str:
    """Return the markdown shown when no demo examples are available.

    The text names the two `eval.bake_demo` invocations (placeholder mode
    and the baseline-vs-trained-adapter mode) and the JSON file they populate.
    """
    placeholder_cmd = "python -m eval.bake_demo --placeholder"
    full_cmd = (
        "python -m eval.bake_demo --baseline unsloth/Qwen2.5-3B-Instruct "
        "--trained-adapter checkpoints/defender_grpo/stage4_adversarial/adapter"
    )
    heading = "### No demo examples baked yet"
    body = (
        f"Run `{placeholder_cmd}` (no GPU required) "
        f"or, after training, `{full_cmd}` "
        "to populate `data/demo_examples.json`."
    )
    return f"{heading}\n\n{body}"
# Names exported by `from <module> import *`. The `_format_event` helper is
# not listed here and therefore is not part of the exported surface.
__all__ = [
    "load_demo_examples",
    "format_alert_card",
    "format_response_card",
    "format_truth_card",
    "empty_state_message",
]