"""Agent protocols and shared Pydantic models for OpenRange. Three pluggable infrastructure components: - SnapshotBuilder: generates candidate snapshot specs from manifests - NPCBehavior: decides NPC response to stimuli - ValidatorCheck: single admission check in the validation pipeline """ from __future__ import annotations from enum import Enum from typing import Any, Literal, Protocol, runtime_checkable from pydantic import AliasChoices, BaseModel, ConfigDict, Field # --------------------------------------------------------------------------- # Pydantic models — service lifecycle # --------------------------------------------------------------------------- class ReadinessCheck(BaseModel): """How to verify a service is ready after starting. Supports three probe types: - ``tcp``: connect to *port* on localhost. - ``http``: GET *url* and expect a 2xx response. - ``command``: run *command* and expect exit code 0. """ type: Literal["tcp", "http", "command"] = "tcp" port: int = 0 url: str = "" command: str = "" timeout_s: int = 30 interval_s: float = 1.0 class ServiceSpec(BaseModel): """Declarative service lifecycle for subprocess mode. Generated by the Renderer alongside docker-compose.yml. Consumed by ``RangeEnvironment._start_snapshot_services()``. Each entry describes one daemon that must be running for the snapshot to function. The *host* field links back to the topology host name so that stop/restart logic can correlate services to logical hosts. 
""" host: str daemon: str packages: list[str] = Field(default_factory=list) init_commands: list[str] = Field(default_factory=list) start_command: str readiness: ReadinessCheck = Field(default_factory=ReadinessCheck) log_dir: str = "" env_vars: dict[str, str] = Field(default_factory=dict) # --------------------------------------------------------------------------- # Pydantic models — build context & topology # --------------------------------------------------------------------------- class BuildContext(BaseModel): """Runtime context passed to the Builder on each build() call.""" seed: int | None = None tier: int = 1 previous_vuln_classes: list[str] = Field(default_factory=list) red_solve_rate: float = 0.0 blue_detect_rate: float = 0.0 weak_areas: list[str] = Field(default_factory=list) recent_attack_surfaces: list[str] = Field(default_factory=list) episode_count: int = 0 # Narrative guidance from curriculum narrative_hints: list[str] = Field( default_factory=list, description="Curriculum-driven hints, e.g. 
'include lateral movement via credential reuse'", ) require_chain_length: int = Field( default=0, description="If > 0, force multi-hop exploit chains of at least this length", ) focus_layer: str = Field( default="", description="Which realism layer to emphasize: 'infra', 'app', 'identity', 'process'", ) class MutationOp(BaseModel): """Single typed edit applied to derive a child snapshot from a parent.""" mutation_id: str op_type: str target_selector: dict[str, str] = Field(default_factory=dict) magnitude: int = 1 params: dict[str, Any] = Field(default_factory=dict) expected_effects: list[str] = Field(default_factory=list) risk_tags: list[str] = Field(default_factory=list) class MutationPlan(BaseModel): """Ordered list of mutations used to produce a child snapshot.""" parent_snapshot_id: str | None = None ops: list[MutationOp] = Field(default_factory=list) predicted_complexity_delta: int = 0 predicted_chain_delta: int = 0 predicted_novelty: float = 0.0 policy_name: str = "" policy_score: float = 0.0 score_breakdown: dict[str, float] = Field(default_factory=dict) class LineageMetadata(BaseModel): """Lineage and mutation provenance for a stored snapshot.""" snapshot_id: str = "" parent_snapshot_id: str | None = None root_snapshot_id: str = "" manifest_id: str = "" generation_depth: int = 0 mutation_ids: list[str] = Field(default_factory=list) mutation_summary: list[str] = Field(default_factory=list) class Vulnerability(BaseModel): """Single planted vulnerability in the truth graph.""" id: str type: str # e.g. 
sqli, xss, idor, ssrf host: str service: str = "" injection_point: str = "" vulnerable_code: str | dict[str, str] = "" # str or {file_path: snippet} root_cause: str = "" blast_radius: str = "" remediation: str = "" class ExploitStep(BaseModel): """Single step in an exploit chain.""" model_config = ConfigDict(populate_by_name=True) vuln_id: str = Field(validation_alias=AliasChoices("vuln_id", "vuln")) command: str = Field(validation_alias=AliasChoices("command", "action")) description: str = Field(default="", validation_alias=AliasChoices("description", "yields")) class TruthGraph(BaseModel): """Ground truth about planted vulnerabilities and exploit chains.""" vulns: list[Vulnerability] = Field(default_factory=list) exploit_chain: list[ExploitStep] = Field(default_factory=list) class GoldenPathStep(BaseModel): """Single step in the golden path walkthrough.""" model_config = ConfigDict(populate_by_name=True) step: int command: str = Field(validation_alias=AliasChoices("command", "cmd")) expect_in_stdout: str = Field( default="", validation_alias=AliasChoices("expect_in_stdout", "expect_stdout"), ) host: str = "attacker" description: str = "" class FlagSpec(BaseModel): """Flag definition: value and where it lives.""" id: str value: str path: str host: str class EvidenceItem(BaseModel): """Expected evidence artifact for Blue to find.""" type: str # log_entry, alert, file location: str pattern: str = "" class NPCPersona(BaseModel): """NPC persona card for LLM-driven NPC behavior.""" name: str role: str = "" department: str = "" reports_to: str = "" communication_style: str = "" security_awareness: float = 0.5 # 0.0-1.0 susceptibility: dict[str, float] = Field(default_factory=dict) routine: dict[str, Any] = Field(default_factory=dict) accounts: dict[str, Any] = Field(default_factory=dict) class NPCTrafficSpec(BaseModel): """NPC traffic configuration.""" level: int = 0 # 0=shell scripts, 1=LLM personas rate_lambda: float = 10.0 # requests/minute scripts: list[str] = 
Field(default_factory=list) class TaskType(str, Enum): """Types of tasks agents can be assigned.""" EXPLOIT = "exploit" INVESTIGATE = "investigate" PATCH = "patch" REPORT = "report" ENDPOINT_QUERY = "endpoint_query" MULTI_STEP = "multi_step" class TaskSpec(BaseModel): """Agent-facing task descriptions (no leakage of internals).""" red_briefing: str = "" blue_briefing: str = "" task_type: str = "exploit" # Use str not enum for flexibility milestones: list[str] = Field(default_factory=list) # For multi_step tasks success_conditions: list[dict[str, Any]] = Field( default_factory=list, ) # [{type: "flag", value: "..."}, {type: "endpoint", url: "...", expect: "..."}] class SnapshotSpec(BaseModel): """Complete specification for a generated range snapshot.""" topology: dict[str, Any] = Field(default_factory=dict) truth_graph: TruthGraph = Field(default_factory=TruthGraph) golden_path: list[GoldenPathStep] = Field(default_factory=list) flags: list[FlagSpec] = Field(default_factory=list) evidence_spec: list[EvidenceItem] = Field(default_factory=list) npc_personas: list[NPCPersona] = Field(default_factory=list) npc_traffic: NPCTrafficSpec = Field(default_factory=NPCTrafficSpec) task: TaskSpec = Field(default_factory=TaskSpec) compose: dict[str, Any] = Field(default_factory=dict) # rendered docker-compose files: dict[str, str] = Field(default_factory=dict) # path -> content services: list[ServiceSpec] = Field(default_factory=list) # subprocess-mode daemons lineage: LineageMetadata = Field(default_factory=LineageMetadata) mutation_plan: MutationPlan | None = None class Stimulus(BaseModel): """Incoming stimulus for an NPC to react to.""" type: str = "email" # email, chat, file_access, voice sender: str = "" subject: str = "" content: str = "" attachments: list[str] = Field(default_factory=list) plausibility: float = 0.5 # 0.0-1.0 class NPCAction(BaseModel): """NPC's decided response to a stimulus.""" action: str = "ignore" # click_link, open_attachment, reply, 
share_credentials, # ignore, report_to_IT, forward response_content: str = "" side_effects: list[str] = Field(default_factory=list) class CheckResult(BaseModel): """Result of a single validator check.""" name: str = "" passed: bool = False time_s: float = 0.0 details: dict[str, Any] = Field(default_factory=dict) error: str = "" advisory: bool = False # if True, failure triggers retry but never blocks class ExecResult(BaseModel): """Structured command execution result.""" stdout: str = "" stderr: str = "" exit_code: int = 0 timed_out: bool = False @property def combined_output(self) -> str: parts = [self.stdout, self.stderr] return "\n".join(part for part in parts if part).strip() class ContainerSet(BaseModel): """Handle to live Docker containers for a snapshot.""" model_config = ConfigDict(arbitrary_types_allowed=True) project_name: str = "" container_ids: dict[str, str] = Field(default_factory=dict) # service -> id async def exec_run(self, container: str, cmd: str, timeout: float = 30.0) -> ExecResult: """Run *cmd* inside *container* and return structured output + status.""" import asyncio cid = self.container_ids.get(container, container) proc = await asyncio.create_subprocess_exec( "docker", "exec", cid, "sh", "-c", cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) except asyncio.TimeoutError: proc.kill() try: await proc.communicate() except Exception: # noqa: BLE001 pass return ExecResult(stderr="", exit_code=124, timed_out=True) return ExecResult( stdout=(stdout or b"").decode(errors="replace"), stderr=(stderr or b"").decode(errors="replace"), exit_code=int(proc.returncode or 0), ) async def exec(self, container: str, cmd: str, timeout: float = 30.0) -> str: """Backward-compatible string output helper around ``exec_run``.""" result = await self.exec_run(container, cmd, timeout=timeout) return result.combined_output async def is_healthy(self, container: str) 
-> bool: """Return True when *container* is running and its healthcheck passes.""" import asyncio cid = self.container_ids.get(container, container) proc = await asyncio.create_subprocess_exec( "docker", "inspect", "--format", "{{if .State.Health}}{{.State.Health.Status}}{{else}}{{.State.Status}}{{end}}", cid, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, _ = await proc.communicate() status = (stdout or b"").decode().strip() return status in {"running", "healthy"} async def cp(self, container: str, src: str, dest: str) -> None: """Copy a file into a container: ``docker cp src container:dest``.""" import asyncio cid = self.container_ids.get(container, container) proc = await asyncio.create_subprocess_exec( "docker", "cp", src, f"{cid}:{dest}", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) await proc.communicate() async def restart(self, container: str, timeout: float = 30.0) -> None: """Restart *container* via ``docker restart``, restoring pre-patched state.""" import asyncio cid = self.container_ids.get(container, container) proc = await asyncio.create_subprocess_exec( "docker", "restart", cid, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: await asyncio.wait_for(proc.communicate(), timeout=timeout) except asyncio.TimeoutError: proc.kill() # --------------------------------------------------------------------------- # Protocols # --------------------------------------------------------------------------- @runtime_checkable class SnapshotBuilder(Protocol): """Generate a candidate snapshot spec from a manifest.""" async def build( self, manifest: dict, context: BuildContext, ) -> SnapshotSpec: ... @runtime_checkable class NPCBehavior(Protocol): """Decide how an NPC responds to a stimulus.""" async def decide( self, persona: NPCPersona, stimulus: Stimulus, ) -> NPCAction: ... 
@runtime_checkable
class ValidatorCheck(Protocol):
    """Structural interface for one step of the validator admission pipeline.

    An implementation receives the candidate *snapshot* plus a handle to its
    running *containers* and reports the outcome as a ``CheckResult``.
    """

    async def check(
        self,
        snapshot: SnapshotSpec,
        containers: ContainerSet,
    ) -> CheckResult:
        """Evaluate *snapshot* (with access to *containers*) and return the result."""