open-range / src /open_range /protocols.py
Lars Talian
fix(validator): add structured exec results and strict patchability outcomes
595e190
"""Agent protocols and shared Pydantic models for OpenRange.
Three pluggable infrastructure components:
- SnapshotBuilder: generates candidate snapshot specs from manifests
- NPCBehavior: decides NPC response to stimuli
- ValidatorCheck: single admission check in the validation pipeline
"""
from __future__ import annotations
from enum import Enum
from typing import Any, Literal, Protocol, runtime_checkable
from pydantic import AliasChoices, BaseModel, ConfigDict, Field
# ---------------------------------------------------------------------------
# Pydantic models — service lifecycle
# ---------------------------------------------------------------------------
class ReadinessCheck(BaseModel):
    """How to verify a service is ready after starting.

    Supports three probe types:
    - ``tcp``: connect to *port* on localhost.
    - ``http``: GET *url* and expect a 2xx response.
    - ``command``: run *command* and expect exit code 0.
    """

    type: Literal["tcp", "http", "command"] = "tcp"  # probe kind, see docstring
    port: int = 0  # used by ``tcp`` probes
    url: str = ""  # used by ``http`` probes
    command: str = ""  # used by ``command`` probes
    # NOTE(review): presumably the overall readiness deadline and the pause
    # between probe attempts, both in seconds (per the ``_s`` suffix). The
    # probe loop lives outside this module — confirm there.
    timeout_s: int = 30
    interval_s: float = 1.0
class ServiceSpec(BaseModel):
    """Declarative service lifecycle for subprocess mode.

    Generated by the Renderer alongside docker-compose.yml.
    Consumed by ``RangeEnvironment._start_snapshot_services()``.
    Each entry describes one daemon that must be running for the snapshot
    to function. The *host* field links back to the topology host name
    so that stop/restart logic can correlate services to logical hosts.
    """

    host: str  # topology host name this daemon belongs to
    daemon: str
    # NOTE(review): the fields below are interpreted by the consumer named in
    # the docstring, not here. Presumably they are: packages to install, setup
    # commands run before start, the launch command, a log directory, and
    # extra environment variables — confirm in the consumer.
    packages: list[str] = Field(default_factory=list)
    init_commands: list[str] = Field(default_factory=list)
    start_command: str
    readiness: ReadinessCheck = Field(default_factory=ReadinessCheck)  # how to tell the daemon is up
    log_dir: str = ""
    env_vars: dict[str, str] = Field(default_factory=dict)
# ---------------------------------------------------------------------------
# Pydantic models — build context & topology
# ---------------------------------------------------------------------------
class BuildContext(BaseModel):
    """Runtime context passed to the Builder on each build() call.

    ``SnapshotBuilder.build()`` receives this alongside the manifest. It
    bundles an optional seed, a tier, feedback signals, and curriculum
    guidance for the next snapshot.
    """

    seed: int | None = None
    tier: int = 1
    # NOTE(review): the next fields look like feedback aggregated from earlier
    # episodes: vuln classes already used, Red solve / Blue detect rates, weak
    # areas, recently used attack surfaces, and episodes so far. Their
    # producer is not in this module — confirm semantics and rate ranges there.
    previous_vuln_classes: list[str] = Field(default_factory=list)
    red_solve_rate: float = 0.0
    blue_detect_rate: float = 0.0
    weak_areas: list[str] = Field(default_factory=list)
    recent_attack_surfaces: list[str] = Field(default_factory=list)
    episode_count: int = 0
    # Narrative guidance from curriculum
    narrative_hints: list[str] = Field(
        default_factory=list,
        description="Curriculum-driven hints, e.g. 'include lateral movement via credential reuse'",
    )
    require_chain_length: int = Field(
        default=0,
        description="If > 0, force multi-hop exploit chains of at least this length",
    )
    focus_layer: str = Field(
        default="",
        description="Which realism layer to emphasize: 'infra', 'app', 'identity', 'process'",
    )
class MutationOp(BaseModel):
    """Single typed edit applied to derive a child snapshot from a parent.

    This model only describes the edit. It has no logic of its own, so the
    code that applies it lives elsewhere.
    """

    mutation_id: str
    op_type: str  # kind of edit; the valid values are defined by the consumer
    # NOTE(review): presumably selects which snapshot element(s) the op edits.
    # The key schema is not fixed here — confirm with the mutator.
    target_selector: dict[str, str] = Field(default_factory=dict)
    magnitude: int = 1
    params: dict[str, Any] = Field(default_factory=dict)  # op-specific, free-form arguments
    expected_effects: list[str] = Field(default_factory=list)
    risk_tags: list[str] = Field(default_factory=list)
class MutationPlan(BaseModel):
    """Ordered list of mutations used to produce a child snapshot.

    It also carries predicted effects (``predicted_*``) and metadata about
    the policy that produced the plan (``policy_*``, ``score_breakdown``).
    """

    parent_snapshot_id: str | None = None
    ops: list[MutationOp] = Field(default_factory=list)  # in application order
    # NOTE(review): predicted_* look like estimates recorded at planning
    # time, not measured outcomes — confirm with the planner that fills them.
    predicted_complexity_delta: int = 0
    predicted_chain_delta: int = 0
    predicted_novelty: float = 0.0
    policy_name: str = ""
    policy_score: float = 0.0
    score_breakdown: dict[str, float] = Field(default_factory=dict)  # presumably per-criterion parts of policy_score
class LineageMetadata(BaseModel):
    """Lineage and mutation provenance for a stored snapshot.

    Carried by ``SnapshotSpec`` in its ``lineage`` field.
    """

    snapshot_id: str = ""
    parent_snapshot_id: str | None = None  # presumably None for root snapshots
    root_snapshot_id: str = ""
    manifest_id: str = ""
    generation_depth: int = 0  # presumably mutation generations from the root (0 = root)
    mutation_ids: list[str] = Field(default_factory=list)
    mutation_summary: list[str] = Field(default_factory=list)
class Vulnerability(BaseModel):
    """Single planted vulnerability in the truth graph.

    ``id`` is presumably what ``ExploitStep.vuln_id`` refers to.
    """

    id: str
    type: str  # e.g. sqli, xss, idor, ssrf
    host: str
    service: str = ""
    injection_point: str = ""
    vulnerable_code: str | dict[str, str] = ""  # str or {file_path: snippet}
    root_cause: str = ""
    blast_radius: str = ""
    remediation: str = ""
class ExploitStep(BaseModel):
    """Single step in an exploit chain.

    Each field also accepts an alternate input key: ``vuln`` for
    ``vuln_id``, ``action`` for ``command``, and ``yields`` for
    ``description``.
    """

    model_config = ConfigDict(populate_by_name=True)

    # The aliases are validation-only: serialization (model_dump) emits the
    # canonical field names.
    vuln_id: str = Field(validation_alias=AliasChoices("vuln_id", "vuln"))
    command: str = Field(validation_alias=AliasChoices("command", "action"))
    description: str = Field(default="", validation_alias=AliasChoices("description", "yields"))
class TruthGraph(BaseModel):
    """Ground truth about planted vulnerabilities and exploit chains."""

    vulns: list[Vulnerability] = Field(default_factory=list)
    exploit_chain: list[ExploitStep] = Field(default_factory=list)  # presumably in execution order
class GoldenPathStep(BaseModel):
    """Single step in the golden path walkthrough.

    Alternate input keys: ``cmd`` for ``command`` and ``expect_stdout``
    for ``expect_in_stdout``. These are validation-only aliases.
    """

    model_config = ConfigDict(populate_by_name=True)

    step: int
    command: str = Field(validation_alias=AliasChoices("command", "cmd"))
    # NOTE(review): presumably text that must appear in the command's stdout
    # for the step to pass; "" likely means "no check" — confirm in the validator.
    expect_in_stdout: str = Field(
        default="",
        validation_alias=AliasChoices("expect_in_stdout", "expect_stdout"),
    )
    host: str = "attacker"  # presumably the host the command runs on
    description: str = ""
class FlagSpec(BaseModel):
    """Flag definition: value and where it lives.

    The flag *value* lives at *path* on *host*.
    """

    id: str
    value: str
    path: str
    host: str
class EvidenceItem(BaseModel):
    """Expected evidence artifact for Blue to find."""

    type: str  # log_entry, alert, file
    location: str
    pattern: str = ""  # presumably what to match at *location*; "" = unspecified
class NPCPersona(BaseModel):
    """NPC persona card for LLM-driven NPC behavior."""

    name: str
    role: str = ""
    department: str = ""
    reports_to: str = ""
    communication_style: str = ""
    security_awareness: float = 0.5  # 0.0-1.0 (documented range; not validated here)
    # NOTE(review): susceptibility presumably maps a lure/attack type to a
    # score; routine and accounts are free-form — confirm with producers.
    susceptibility: dict[str, float] = Field(default_factory=dict)
    routine: dict[str, Any] = Field(default_factory=dict)
    accounts: dict[str, Any] = Field(default_factory=dict)
class NPCTrafficSpec(BaseModel):
    """NPC traffic configuration."""

    level: int = 0  # 0=shell scripts, 1=LLM personas
    rate_lambda: float = 10.0  # requests/minute
    scripts: list[str] = Field(default_factory=list)
class TaskType(str, Enum):
    """Types of tasks agents can be assigned.

    Members subclass ``str``, so they compare equal to their values
    (``TaskType.EXPLOIT == "exploit"``). Note that ``TaskSpec.task_type``
    stores a plain string rather than this enum.
    """

    EXPLOIT = "exploit"
    INVESTIGATE = "investigate"
    PATCH = "patch"
    REPORT = "report"
    ENDPOINT_QUERY = "endpoint_query"
    MULTI_STEP = "multi_step"
class TaskSpec(BaseModel):
    """Agent-facing task descriptions (no leakage of internals).

    Holds separate briefings for the Red and Blue agents.
    """

    red_briefing: str = ""
    blue_briefing: str = ""
    task_type: str = "exploit"  # Use str not enum for flexibility (values presumably mirror TaskType)
    milestones: list[str] = Field(default_factory=list)  # For multi_step tasks
    success_conditions: list[dict[str, Any]] = Field(
        default_factory=list,
    )  # [{type: "flag", value: "..."}, {type: "endpoint", url: "...", expect: "..."}]
class SnapshotSpec(BaseModel):
    """Complete specification for a generated range snapshot.

    ``SnapshotBuilder.build()`` produces it and each ``ValidatorCheck``
    checks it. Every field has a default, so ``SnapshotSpec()`` is a valid
    empty spec.
    """

    topology: dict[str, Any] = Field(default_factory=dict)
    truth_graph: TruthGraph = Field(default_factory=TruthGraph)
    golden_path: list[GoldenPathStep] = Field(default_factory=list)
    flags: list[FlagSpec] = Field(default_factory=list)
    evidence_spec: list[EvidenceItem] = Field(default_factory=list)
    npc_personas: list[NPCPersona] = Field(default_factory=list)
    npc_traffic: NPCTrafficSpec = Field(default_factory=NPCTrafficSpec)
    task: TaskSpec = Field(default_factory=TaskSpec)
    compose: dict[str, Any] = Field(default_factory=dict)  # rendered docker-compose
    files: dict[str, str] = Field(default_factory=dict)  # path -> content
    services: list[ServiceSpec] = Field(default_factory=list)  # subprocess-mode daemons
    lineage: LineageMetadata = Field(default_factory=LineageMetadata)
    mutation_plan: MutationPlan | None = None  # presumably None when not derived by mutation
class Stimulus(BaseModel):
    """Incoming stimulus for an NPC to react to.

    Passed to ``NPCBehavior.decide()`` together with the target persona.
    """

    type: str = "email"  # email, chat, file_access, voice
    sender: str = ""
    subject: str = ""
    content: str = ""
    attachments: list[str] = Field(default_factory=list)
    plausibility: float = 0.5  # 0.0-1.0 (documented range; not validated here)
class NPCAction(BaseModel):
    """NPC's decided response to a stimulus.

    Returned by ``NPCBehavior.decide()``.
    """

    # Expected values: click_link, open_attachment, reply, share_credentials,
    # ignore, report_to_IT, forward (plain str; not validated here).
    action: str = "ignore"
    response_content: str = ""
    side_effects: list[str] = Field(default_factory=list)
class CheckResult(BaseModel):
    """Result of a single validator check.

    Returned by ``ValidatorCheck.check()``.
    """

    name: str = ""
    passed: bool = False
    time_s: float = 0.0  # presumably the check's duration in seconds
    details: dict[str, Any] = Field(default_factory=dict)  # free-form, check-specific data
    error: str = ""
    advisory: bool = False  # if True, failure triggers retry but never blocks
class ExecResult(BaseModel):
    """Structured outcome of running one command.

    Captures both output streams, the process exit status, and whether the
    run was abandoned because it exceeded its time limit.
    """

    stdout: str = ""
    stderr: str = ""
    exit_code: int = 0
    timed_out: bool = False

    @property
    def combined_output(self) -> str:
        """Return stdout followed by stderr as one trimmed string.

        Empty streams are skipped so no stray newline separator is added.
        Leading and trailing whitespace of the joined text is removed.
        """
        pieces = [stream for stream in (self.stdout, self.stderr) if stream]
        return "\n".join(pieces).strip()
class ContainerSet(BaseModel):
    """Handle to live Docker containers for a snapshot.

    Every operation shells out to the ``docker`` CLI through an asyncio
    subprocess. A *container* argument is first looked up in
    ``container_ids``. Names that are not keys there are passed to
    ``docker`` unchanged, so a raw container id or name also works.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    project_name: str = ""
    container_ids: dict[str, str] = Field(default_factory=dict)  # service -> id

    @staticmethod
    async def _run_docker(*args: str, timeout: float | None = None) -> ExecResult:
        """Run ``docker *args`` and capture its output (shared by all operations).

        Args:
            *args: Arguments for the ``docker`` CLI, e.g. ``("exec", cid, ...)``.
            timeout: Seconds to wait for the CLI to finish. ``None`` waits
                indefinitely.

        Returns:
            An ``ExecResult`` with stdout/stderr decoded as text (undecodable
            bytes replaced) and the exit code. If *timeout* elapses, the CLI
            process is killed and reaped, and the result is
            ``ExecResult(stderr="<timeout>", exit_code=124, timed_out=True)``.
            Exit code 124 mirrors the status used by coreutils ``timeout``.
        """
        import asyncio

        proc = await asyncio.create_subprocess_exec(
            "docker",
            *args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
        except asyncio.TimeoutError:
            try:
                proc.kill()
            except ProcessLookupError:
                # The process exited between the timeout firing and the kill.
                pass
            # Reap the killed process and drain its pipes so no zombie or
            # open pipe transport is left behind. This is best-effort because
            # the outcome is already decided, so errors are deliberately ignored.
            try:
                await proc.communicate()
            except Exception:  # noqa: BLE001
                pass
            return ExecResult(stderr="<timeout>", exit_code=124, timed_out=True)
        return ExecResult(
            stdout=(stdout or b"").decode(errors="replace"),
            stderr=(stderr or b"").decode(errors="replace"),
            exit_code=int(proc.returncode or 0),
        )

    async def exec_run(self, container: str, cmd: str, timeout: float = 30.0) -> ExecResult:
        """Run *cmd* inside *container* and return structured output + status.

        The command runs as ``docker exec <id> sh -c <cmd>``.

        Args:
            container: Service name (key of ``container_ids``) or raw container id.
            cmd: Shell command string, interpreted by ``sh -c`` in the container.
            timeout: Seconds to wait before giving up.

        Returns:
            ``ExecResult`` with decoded output and exit code. On timeout it is
            ``stderr="<timeout>"``, ``exit_code=124``, ``timed_out=True``.
        """
        cid = self.container_ids.get(container, container)
        # NOTE(review): on timeout only the local ``docker exec`` client is
        # killed; the command may keep running inside the container.
        return await self._run_docker("exec", cid, "sh", "-c", cmd, timeout=timeout)

    async def exec(self, container: str, cmd: str, timeout: float = 30.0) -> str:
        """Backward-compatible string output helper around ``exec_run``.

        Returns ``ExecResult.combined_output``: stdout then stderr, joined by
        a newline and stripped. The exit code is discarded, and a timeout
        yields ``"<timeout>"``.
        """
        result = await self.exec_run(container, cmd, timeout=timeout)
        return result.combined_output

    async def is_healthy(self, container: str, timeout: float = 30.0) -> bool:
        """Return True when *container* is running and its healthcheck passes.

        ``docker inspect`` reports the healthcheck status when one is
        configured, and the plain container state otherwise. The result is
        True only when that value is ``healthy`` or ``running``. A failed
        inspect (empty stdout) or exceeding *timeout* seconds yields False.
        """
        cid = self.container_ids.get(container, container)
        result = await self._run_docker(
            "inspect",
            "--format",
            "{{if .State.Health}}{{.State.Health.Status}}{{else}}{{.State.Status}}{{end}}",
            cid,
            timeout=timeout,
        )
        if result.timed_out:
            return False
        return result.stdout.strip() in {"running", "healthy"}

    async def cp(
        self,
        container: str,
        src: str,
        dest: str,
        timeout: float | None = None,
    ) -> None:
        """Copy a file into a container: ``docker cp src container:dest``.

        *timeout* (seconds) bounds the copy. ``None``, the default, waits
        indefinitely.

        NOTE(review): the ``docker cp`` exit status and stderr are discarded,
        so a failed copy is not reported to the caller.
        """
        cid = self.container_ids.get(container, container)
        await self._run_docker("cp", src, f"{cid}:{dest}", timeout=timeout)

    async def restart(self, container: str, timeout: float = 30.0) -> None:
        """Restart *container* via ``docker restart``.

        ``docker restart`` stops and starts the *same* container, so its
        writable filesystem layer is preserved. That includes files written
        via ``cp`` or ``exec_run``; only process state is reset.

        NOTE(review): confirm no caller relies on this to undo file-level
        patches.

        If *timeout* seconds elapse, the CLI client is killed and reaped and
        the method returns without raising. The outcome is not reported.
        """
        cid = self.container_ids.get(container, container)
        await self._run_docker("restart", cid, timeout=timeout)
# ---------------------------------------------------------------------------
# Protocols
# ---------------------------------------------------------------------------
@runtime_checkable
class SnapshotBuilder(Protocol):
    """Generate a candidate snapshot spec from a manifest.

    Structural interface: any object with a matching async ``build`` method
    qualifies. Because of ``@runtime_checkable``,
    ``isinstance(obj, SnapshotBuilder)`` only checks that a ``build``
    attribute exists, not its signature or that it is async.
    """

    async def build(
        self,
        manifest: dict,
        context: BuildContext,
    ) -> SnapshotSpec:
        """Return a candidate ``SnapshotSpec`` built from *manifest*.

        *context* carries per-call build inputs (see ``BuildContext``).

        NOTE(review): the manifest schema is defined outside this module.
        """
        ...
@runtime_checkable
class NPCBehavior(Protocol):
    """Decide how an NPC responds to a stimulus.

    Structural interface. ``isinstance`` checks via ``@runtime_checkable``
    only verify that a ``decide`` attribute exists.
    """

    async def decide(
        self,
        persona: NPCPersona,
        stimulus: Stimulus,
    ) -> NPCAction:
        """Return the ``NPCAction`` that *persona* takes in response to *stimulus*."""
        ...
@runtime_checkable
class ValidatorCheck(Protocol):
    """Single check in the validator admission pipeline.

    Structural interface. ``isinstance`` checks via ``@runtime_checkable``
    only verify that a ``check`` attribute exists.
    """

    async def check(
        self,
        snapshot: SnapshotSpec,
        containers: ContainerSet,
    ) -> CheckResult:
        """Evaluate *snapshot* against its live *containers*.

        Returns a ``CheckResult``. If ``advisory`` is set there, a failure
        triggers a retry but never blocks admission.
        """
        ...