File size: 3,347 Bytes
8c486a8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
49d1c75
8c486a8
 
 
 
 
 
 
 
 
 
 
 
 
49d1c75
8c486a8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
49d1c75
 
 
8c486a8
 
 
49d1c75
 
 
 
 
 
8c486a8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
"""Check 10: Realism review — LLM advisory on scenario plausibility.

Uses LiteLLM to review task briefings for leakage and overall realism.
Always ``advisory=True``: can trigger a retry but never overrides a
mechanical pass.

The LLM never sees flag values or golden-path commands — only summaries
and briefings.
"""

from __future__ import annotations

import json
import logging
import os
from typing import Any

from open_range.builder.prompts import REALISM_REVIEW_PROMPT
from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec

logger = logging.getLogger(__name__)


class RealismReviewCheck:
    """LLM-based realism review.  Always advisory.

    Sends a redacted summary of the snapshot (briefings, vuln types/hosts,
    topology hosts, golden-path length, tier) to an LLM through LiteLLM and
    turns its JSON verdict into a ``CheckResult``.  Every result returned by
    :meth:`check` carries ``advisory=True``.
    """

    # Textual verdicts that count as a failed review.  LLMs occasionally emit
    # the ``pass`` field as a string, and ``bool("false")`` would be ``True``.
    _NEGATIVE_VERDICTS = frozenset(
        {"", "false", "no", "0", "fail", "failed", "none", "null"}
    )

    def __init__(self, model: str | None = None) -> None:
        """Select the reviewer model.

        Args:
            model: LiteLLM model identifier.  When falsy, the
                ``OPENRANGE_VALIDATOR_MODEL`` environment variable is used,
                falling back to ``"azure/gpt-5.2-codex"``.
        """
        self.model = model or os.environ.get(
            "OPENRANGE_VALIDATOR_MODEL",
            "azure/gpt-5.2-codex",
        )

    @staticmethod
    def _coerce_verdict(value: Any) -> bool:
        """Interpret the reviewer's ``pass`` field as a boolean.

        Strings are compared case-insensitively against a set of explicit
        negatives; every other value keeps ordinary truthiness.
        """
        if isinstance(value, str):
            return value.strip().lower() not in RealismReviewCheck._NEGATIVE_VERDICTS
        return bool(value)

    @staticmethod
    def _normalise_issues(value: Any) -> list[Any]:
        """Return the reviewer's ``issues`` field as a list.

        A bare string becomes a one-element list (instead of being iterated
        character by character), ``None`` becomes an empty list, and any
        other non-sequence value is wrapped in a list.
        """
        if value is None:
            return []
        if isinstance(value, str):
            return [value] if value.strip() else []
        if isinstance(value, (list, tuple)):
            return list(value)
        return [value]

    async def check(self, snapshot: SnapshotSpec, containers: ContainerSet) -> CheckResult:
        """Run the advisory realism review for *snapshot*.

        Args:
            snapshot: Snapshot whose briefings, vulnerabilities and topology
                are summarised for the reviewer.
            containers: Unused by this check; accepted so the signature
                matches how the check is invoked.

        Returns:
            A ``CheckResult`` named ``"realism_review"`` with
            ``advisory=True``.  It is a pass when LiteLLM is not installed or
            when the LLM call / response parsing fails (the review is then
            skipped); otherwise ``passed`` reflects the reviewer's verdict
            and ``error`` lists the reported issues.
        """
        try:
            import litellm  # noqa: F811
        except ImportError:
            return CheckResult(
                name="realism_review",
                passed=True,
                advisory=True,
                details={"note": "litellm not installed — skipping advisory review"},
            )

        # Build a redacted summary — never expose flag values or golden-path
        # commands to the reviewer LLM.
        tier = snapshot.topology.get("tier", 1)
        summary = {
            "task_briefings": {
                "red_briefing": snapshot.task.red_briefing,
                "blue_briefing": snapshot.task.blue_briefing,
            },
            "vuln_types": [v.type for v in snapshot.truth_graph.vulns],
            "vuln_hosts": [v.host for v in snapshot.truth_graph.vulns],
            "topology_hosts": snapshot.topology.get("hosts", []),
            "golden_path_length": len(snapshot.golden_path),
            "tier": tier,
        }

        try:
            kwargs: dict[str, Any] = {
                "model": self.model,
                "messages": [
                    {"role": "system", "content": REALISM_REVIEW_PROMPT},
                    {"role": "user", "content": json.dumps(summary)},
                ],
                "response_format": {"type": "json_object"},
            }
            # Codex models don't support temperature
            if "codex" not in self.model.lower():
                kwargs["temperature"] = 0.0
            response = await litellm.acompletion(**kwargs)
            review = json.loads(response.choices[0].message.content)
            # A JSON array/scalar has no "pass"/"issues" keys; report that
            # explicitly rather than via an opaque AttributeError.
            if not isinstance(review, dict):
                raise TypeError(
                    f"expected a JSON object, got {type(review).__name__}"
                )
            passed = self._coerce_verdict(review.get("pass", False))
            issues = self._normalise_issues(review.get("issues"))
        except Exception as exc:  # noqa: BLE001
            # LLM failure should not block validation — degrade gracefully.
            logger.warning("Realism review LLM call failed: %s", exc)
            return CheckResult(
                name="realism_review",
                passed=True,
                advisory=True,
                details={"note": f"LLM review failed ({exc}) — skipping"},
            )

        if passed:
            error = ""
        else:
            # Never report a failed review with an empty error message.
            error = (
                "; ".join(str(i) for i in issues)
                or "realism review failed without listing issues"
            )

        return CheckResult(
            name="realism_review",
            passed=passed,
            advisory=True,
            details={"issues": issues, "model": self.model},
            error=error,
        )