File size: 4,411 Bytes
bb6a031
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""Helpers for the Gradio demo: load the pre-baked before-vs-after JSON
file and render each section as readable markdown.

The demo is intentionally read-only and deterministic: judges click "Next
incident" and see one of N pre-computed (alert, baseline-response,
trained-response, ground-truth) tuples.  The expensive part — running
the baseline and trained model on each incident — happens once on a GPU
in `eval.bake_demo` and is committed to `data/demo_examples.json`.

This file is small, fast, and carries no GPU dependency, so the deployed
HF Space can stay on the free CPU tier and still cold-start in <30s.
"""

from __future__ import annotations

import json
import os
from typing import Any, Dict, List


def load_demo_examples(path: str) -> List[Dict[str, Any]]:
    """Read demo examples.  Returns [] if the file isn't present yet so
    the Space still boots before the user has run training + bake_demo."""
    if not os.path.exists(path):
        return []
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    if isinstance(data, dict) and "examples" in data:
        return data["examples"]
    return data  # type: ignore[return-value]


def _format_event(e: Dict[str, Any]) -> str:
    fields = e.get("fields") or {}
    field_strs = []
    for k, v in fields.items():
        if v in (None, ""):
            continue
        field_strs.append(f"`{k}`={v}")
    fields_md = " ".join(field_strs)
    et = e.get("event_type", "?")
    if hasattr(et, "value"):
        et = et.value
    return (
        f"- `{e.get('log_id')}` · {e.get('timestamp')} · "
        f"src=`{e.get('source')}` · type=`{et}` · {fields_md}"
    ).rstrip()


def format_alert_card(alert: Dict[str, Any], events: List[Dict[str, Any]]) -> str:
    """Render the SIEM alert + log window as a markdown card."""
    lines = [
        f"### Alert `{alert.get('alert_id', '?')}`",
        f"- **category**: {alert.get('category')}",
        f"- **severity**: {alert.get('severity')}",
        f"- **host / user**: {alert.get('host')} / {alert.get('user')}",
        f"- **summary**: {alert.get('summary', '')}",
        "",
        f"**Log window ({len(events)} event(s))**",
    ]
    for e in events:
        lines.append(_format_event(e))
    return "\n".join(lines)


def format_response_card(title: str, response: Dict[str, Any]) -> str:
    """Render a model response (parsed action + reward + breakdown)."""
    action = response.get("action", "—")
    cited = response.get("cited_log_id", "—")
    rationale = response.get("rationale", "")
    reward = response.get("reward")
    correct = response.get("correct")
    raw = response.get("raw_text", "")

    correct_emoji = "OK" if correct else ("MISS" if correct is False else "?")
    reward_str = f"{reward:+.2f}" if isinstance(reward, (int, float)) else "—"

    lines = [
        f"### {title}",
        f"- **action**: `{action}` ({correct_emoji})",
        f"- **cited_log**: `{cited}`",
        f"- **reward**: `{reward_str}`",
        "",
        f"> {rationale}",
    ]
    breakdown = response.get("reward_breakdown") or {}
    if breakdown:
        bk = ", ".join(f"`{k}={v:+.2f}`" for k, v in breakdown.items())
        lines.append("")
        lines.append(f"_{bk}_")
    if raw and raw != rationale:
        lines.append("")
        lines.append("<details><summary>raw model output</summary>")
        lines.append("")
        lines.append("```")
        lines.append(raw.strip())
        lines.append("```")
        lines.append("</details>")
    return "\n".join(lines)


def format_truth_card(ex: Dict[str, Any]) -> str:
    return (
        f"**Ground truth**: `{ex.get('ground_truth')}`  ·  "
        f"**Triggering log**: `{ex.get('triggering_log_id')}`  ·  "
        f"**Stage**: `{ex.get('stage')}`  ·  **Seed**: `{ex.get('seed')}`"
    )


def empty_state_message() -> str:
    return (
        "### No demo examples baked yet\n\n"
        "Run `python -m eval.bake_demo --placeholder` (no GPU required) "
        "or, after training, "
        "`python -m eval.bake_demo --baseline unsloth/Qwen2.5-3B-Instruct "
        "--trained-adapter checkpoints/defender_grpo/stage4_adversarial/adapter` "
        "to populate `data/demo_examples.json`."
    )


__all__ = [
    "load_demo_examples",
    "format_alert_card",
    "format_response_card",
    "format_truth_card",
    "empty_state_message",
]