Spaces:
Runtime error
Runtime error
| """Agent protocols and shared Pydantic models for OpenRange. | |
| Three pluggable infrastructure components: | |
| - SnapshotBuilder: generates candidate snapshot specs from manifests | |
| - NPCBehavior: decides NPC response to stimuli | |
| - ValidatorCheck: single admission check in the validation pipeline | |
| """ | |
| from __future__ import annotations | |
| from enum import Enum | |
| from typing import Any, Literal, Protocol, runtime_checkable | |
| from pydantic import AliasChoices, BaseModel, ConfigDict, Field | |
| # --------------------------------------------------------------------------- | |
| # Pydantic models — service lifecycle | |
| # --------------------------------------------------------------------------- | |
class ReadinessCheck(BaseModel):
    """Probe settings used to confirm a service is ready after it starts.

    The ``type`` field selects one of three probes:

    - ``tcp``: connect to *port* on localhost.
    - ``http``: GET *url* and expect a 2xx response.
    - ``command``: run *command* and expect exit code 0.

    This model only carries the settings; the probing itself is not
    implemented here.
    """

    # Which probe to run; defaults to the TCP connect check.
    type: Literal["tcp", "http", "command"] = "tcp"

    # Probe targets. Presumably only the one matching ``type`` is consulted --
    # confirm against the consumer.
    port: int = 0
    url: str = ""
    command: str = ""

    # Timing knobs in seconds (per the ``_s`` suffix). Presumably the overall
    # wait budget and the pause between attempts -- confirm against the consumer.
    timeout_s: int = 30
    interval_s: float = 1.0
class ServiceSpec(BaseModel):
    """Declarative lifecycle description of one daemon in subprocess mode.

    Generated by the Renderer alongside docker-compose.yml and consumed by
    ``RangeEnvironment._start_snapshot_services()``.

    Each entry describes a single daemon that must be running for the
    snapshot to function. The *host* field links back to the topology host
    name so that stop/restart logic can correlate services to logical hosts.
    """

    # Topology host name this daemon belongs to (required).
    host: str
    # Name of the daemon (required).
    daemon: str

    # Optional setup data; each defaults to a fresh empty list.
    packages: list[str] = Field(default_factory=list)
    init_commands: list[str] = Field(default_factory=list)

    # Command that launches the daemon (required).
    start_command: str

    # Readiness probe; defaults to a ``ReadinessCheck`` with its own defaults.
    readiness: ReadinessCheck = Field(default_factory=ReadinessCheck)

    log_dir: str = ""
    env_vars: dict[str, str] = Field(default_factory=dict)
| # --------------------------------------------------------------------------- | |
| # Pydantic models — build context & topology | |
| # --------------------------------------------------------------------------- | |
class BuildContext(BaseModel):
    """Runtime context handed to the Builder on every ``build()`` call."""

    # ``None`` means no explicit seed was supplied.
    seed: int | None = None
    tier: int = 1

    # Feedback from earlier episodes.
    previous_vuln_classes: list[str] = Field(default_factory=list)
    red_solve_rate: float = 0.0
    blue_detect_rate: float = 0.0
    weak_areas: list[str] = Field(default_factory=list)
    recent_attack_surfaces: list[str] = Field(default_factory=list)
    episode_count: int = 0

    # Narrative guidance from curriculum.
    narrative_hints: list[str] = Field(
        default_factory=list,
        description="Curriculum-driven hints, e.g. 'include lateral movement via credential reuse'",
    )
    require_chain_length: int = Field(
        default=0,
        description="If > 0, force multi-hop exploit chains of at least this length",
    )
    focus_layer: str = Field(
        default="",
        description="Which realism layer to emphasize: 'infra', 'app', 'identity', 'process'",
    )
class MutationOp(BaseModel):
    """One typed edit applied to a parent snapshot to derive a child."""

    # Required identity of the edit and its kind.
    mutation_id: str
    op_type: str

    # Where and how strongly the edit applies; all optional.
    target_selector: dict[str, str] = Field(default_factory=dict)
    magnitude: int = 1
    params: dict[str, Any] = Field(default_factory=dict)

    # Free-form annotations about the edit.
    expected_effects: list[str] = Field(default_factory=list)
    risk_tags: list[str] = Field(default_factory=list)
class MutationPlan(BaseModel):
    """Ordered mutations that turn a parent snapshot into a child snapshot."""

    # ``None`` when no parent snapshot is recorded.
    parent_snapshot_id: str | None = None
    # Mutations in application order.
    ops: list[MutationOp] = Field(default_factory=list)

    # Predictions attached to the plan.
    predicted_complexity_delta: int = 0
    predicted_chain_delta: int = 0
    predicted_novelty: float = 0.0

    # Policy bookkeeping: name, overall score and a per-component breakdown.
    policy_name: str = ""
    policy_score: float = 0.0
    score_breakdown: dict[str, float] = Field(default_factory=dict)
class LineageMetadata(BaseModel):
    """Ancestry and mutation provenance recorded for a stored snapshot."""

    # Identifiers; empty strings / ``None`` are the unset defaults.
    snapshot_id: str = ""
    parent_snapshot_id: str | None = None
    root_snapshot_id: str = ""
    manifest_id: str = ""

    generation_depth: int = 0

    # Mutations recorded for this snapshot: ids plus text summaries.
    mutation_ids: list[str] = Field(default_factory=list)
    mutation_summary: list[str] = Field(default_factory=list)
class Vulnerability(BaseModel):
    """A single planted vulnerability recorded in the truth graph."""

    # Required identity.
    id: str
    type: str  # e.g. sqli, xss, idor, ssrf
    host: str

    # Optional location details.
    service: str = ""
    injection_point: str = ""
    # Either one snippet as a string, or a mapping of {file_path: snippet}.
    vulnerable_code: str | dict[str, str] = ""

    # Optional free-text analysis.
    root_cause: str = ""
    blast_radius: str = ""
    remediation: str = ""
class ExploitStep(BaseModel):
    """One step of an exploit chain.

    Each field accepts an alternative input key during validation:
    ``vuln`` for ``vuln_id``, ``action`` for ``command`` and ``yields`` for
    ``description``.
    """

    model_config = ConfigDict(populate_by_name=True)

    vuln_id: str = Field(
        validation_alias=AliasChoices("vuln_id", "vuln"),
    )
    command: str = Field(
        validation_alias=AliasChoices("command", "action"),
    )
    description: str = Field(
        default="",
        validation_alias=AliasChoices("description", "yields"),
    )
class TruthGraph(BaseModel):
    """Ground truth for a snapshot: planted vulnerabilities and exploit chain."""

    # Both collections default to fresh empty lists.
    vulns: list[Vulnerability] = Field(default_factory=list)
    exploit_chain: list[ExploitStep] = Field(default_factory=list)
class GoldenPathStep(BaseModel):
    """One step of the golden path walkthrough.

    During validation ``cmd`` is accepted for ``command`` and
    ``expect_stdout`` for ``expect_in_stdout``.
    """

    model_config = ConfigDict(populate_by_name=True)

    # Step number (required).
    step: int
    command: str = Field(
        validation_alias=AliasChoices("command", "cmd"),
    )
    expect_in_stdout: str = Field(
        default="",
        validation_alias=AliasChoices("expect_in_stdout", "expect_stdout"),
    )
    # Host associated with the step; defaults to "attacker".
    host: str = "attacker"
    description: str = ""
class FlagSpec(BaseModel):
    """Definition of a flag: its value and where it lives.

    All four fields are required.
    """

    id: str
    value: str
    # Location of the flag: a path on a host.
    path: str
    host: str
class EvidenceItem(BaseModel):
    """An evidence artifact Blue is expected to find."""

    type: str  # log_entry, alert, file
    location: str
    # Optional pattern; empty by default.
    pattern: str = ""
class NPCPersona(BaseModel):
    """Persona card describing an NPC for LLM-driven NPC behavior."""

    # The only required field.
    name: str

    # Organisational placement and manner.
    role: str = ""
    department: str = ""
    reports_to: str = ""
    communication_style: str = ""

    security_awareness: float = 0.5  # 0.0-1.0
    susceptibility: dict[str, float] = Field(default_factory=dict)

    # Loosely typed extras; their schema is not defined in this module.
    routine: dict[str, Any] = Field(default_factory=dict)
    accounts: dict[str, Any] = Field(default_factory=dict)
class NPCTrafficSpec(BaseModel):
    """Configuration of NPC traffic."""

    # 0 = shell scripts, 1 = LLM personas.
    level: int = 0
    # Requests per minute.
    rate_lambda: float = 10.0
    scripts: list[str] = Field(default_factory=list)
class TaskType(str, Enum):
    """Kinds of task an agent can be assigned.

    Members are also ``str`` instances, so each compares equal to its
    plain string value.
    """

    EXPLOIT = "exploit"
    INVESTIGATE = "investigate"
    PATCH = "patch"
    REPORT = "report"
    ENDPOINT_QUERY = "endpoint_query"
    MULTI_STEP = "multi_step"
class TaskSpec(BaseModel):
    """Agent-facing task descriptions (no leakage of internals)."""

    red_briefing: str = ""
    blue_briefing: str = ""

    # Kept as a plain str rather than an enum for flexibility.
    task_type: str = "exploit"

    # For multi_step tasks.
    milestones: list[str] = Field(default_factory=list)

    # Example entries:
    # [{type: "flag", value: "..."}, {type: "endpoint", url: "...", expect: "..."}]
    success_conditions: list[dict[str, Any]] = Field(
        default_factory=list,
    )
class SnapshotSpec(BaseModel):
    """Complete specification of a generated range snapshot.

    Every field is optional; containers default to fresh empty values and
    nested models to their own defaults.
    """

    topology: dict[str, Any] = Field(default_factory=dict)

    # Ground truth and walkthrough data.
    truth_graph: TruthGraph = Field(default_factory=TruthGraph)
    golden_path: list[GoldenPathStep] = Field(default_factory=list)
    flags: list[FlagSpec] = Field(default_factory=list)
    evidence_spec: list[EvidenceItem] = Field(default_factory=list)

    # NPC configuration.
    npc_personas: list[NPCPersona] = Field(default_factory=list)
    npc_traffic: NPCTrafficSpec = Field(default_factory=NPCTrafficSpec)

    task: TaskSpec = Field(default_factory=TaskSpec)

    # Rendered docker-compose document.
    compose: dict[str, Any] = Field(default_factory=dict)
    # Mapping of path -> content.
    files: dict[str, str] = Field(default_factory=dict)
    # Subprocess-mode daemons.
    services: list[ServiceSpec] = Field(default_factory=list)

    # Provenance; ``mutation_plan`` stays ``None`` when no plan is attached.
    lineage: LineageMetadata = Field(default_factory=LineageMetadata)
    mutation_plan: MutationPlan | None = None
class Stimulus(BaseModel):
    """An incoming stimulus that an NPC reacts to."""

    # One of: email, chat, file_access, voice.
    type: str = "email"

    sender: str = ""
    subject: str = ""
    content: str = ""
    attachments: list[str] = Field(default_factory=list)

    plausibility: float = 0.5  # 0.0-1.0
class NPCAction(BaseModel):
    """The response an NPC decided on for a stimulus."""

    # Known values: click_link, open_attachment, reply, share_credentials,
    # ignore, report_to_IT, forward.
    action: str = "ignore"

    response_content: str = ""
    side_effects: list[str] = Field(default_factory=list)
class CheckResult(BaseModel):
    """Outcome of a single validator check."""

    name: str = ""
    # Defaults to a failing result until a check sets it.
    passed: bool = False
    # Duration in seconds (per the ``_s`` suffix).
    time_s: float = 0.0

    details: dict[str, Any] = Field(default_factory=dict)
    error: str = ""

    # If True, failure triggers retry but never blocks.
    advisory: bool = False
class ExecResult(BaseModel):
    """Structured command execution result."""

    stdout: str = ""
    stderr: str = ""
    exit_code: int = 0
    timed_out: bool = False

    # Exposed as a property because ``ContainerSet.exec`` reads it as an
    # attribute (``result.combined_output``) and promises to return ``str``;
    # as a plain method that access would yield a bound method object.
    @property
    def combined_output(self) -> str:
        """Return stdout and stderr merged into a single string.

        Empty streams are skipped, the remaining ones are joined with a
        newline (stdout first), and leading/trailing whitespace is stripped.
        """
        streams = (self.stdout, self.stderr)
        return "\n".join(text for text in streams if text).strip()
class ContainerSet(BaseModel):
    """Handle to live Docker containers for a snapshot.

    Every helper shells out to the ``docker`` CLI. A *container* argument is
    first looked up in ``container_ids``; when it is not a known key the
    value is handed to docker unchanged.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    project_name: str = ""
    container_ids: dict[str, str] = Field(default_factory=dict)  # service -> id

    async def exec_run(self, container: str, cmd: str, timeout: float = 30.0) -> ExecResult:
        """Run *cmd* inside *container* and return structured output + status.

        The command is executed through ``sh -c`` via ``docker exec``.

        Args:
            container: Key in ``container_ids``, or a value passed to docker as-is.
            cmd: Shell command line to run.
            timeout: Seconds to wait before the ``docker exec`` child is killed.

        Returns:
            An ``ExecResult``. On timeout it has ``stderr="<timeout>"``,
            ``exit_code=124`` and ``timed_out=True``.
        """
        import asyncio
        import contextlib

        cid = self.container_ids.get(container, container)
        proc = await asyncio.create_subprocess_exec(
            "docker", "exec", cid, "sh", "-c", cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
        except asyncio.TimeoutError:
            # The child may have exited on its own between the timeout and the kill.
            with contextlib.suppress(ProcessLookupError):
                proc.kill()
            # Best-effort reap of the killed child; failures here are ignored.
            try:
                await proc.communicate()
            except Exception:  # noqa: BLE001
                pass
            return ExecResult(stderr="<timeout>", exit_code=124, timed_out=True)
        return ExecResult(
            stdout=(stdout or b"").decode(errors="replace"),
            stderr=(stderr or b"").decode(errors="replace"),
            exit_code=int(proc.returncode or 0),
        )

    async def exec(self, container: str, cmd: str, timeout: float = 30.0) -> str:
        """Backward-compatible string output helper around ``exec_run``.

        Returns:
            The non-empty streams of the result (stdout first, then stderr)
            joined with a newline and stripped of surrounding whitespace.
        """
        result = await self.exec_run(container, cmd, timeout=timeout)
        # Assemble the text from the result fields so a plain ``str`` is
        # always returned, as the signature promises.
        streams = (result.stdout, result.stderr)
        return "\n".join(text for text in streams if text).strip()

    async def is_healthy(self, container: str) -> bool:
        """Return True when *container* is running and its healthcheck passes.

        Uses ``docker inspect``: the health status is reported when the
        container has one, otherwise the plain state status. Only
        ``"healthy"`` and ``"running"`` count as healthy; empty output
        therefore yields False.
        """
        import asyncio

        cid = self.container_ids.get(container, container)
        proc = await asyncio.create_subprocess_exec(
            "docker",
            "inspect",
            "--format",
            "{{if .State.Health}}{{.State.Health.Status}}{{else}}{{.State.Status}}{{end}}",
            cid,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, _ = await proc.communicate()
        # Decode leniently, consistent with ``exec_run``.
        status = (stdout or b"").decode(errors="replace").strip()
        return status in {"running", "healthy"}

    async def cp(self, container: str, src: str, dest: str) -> None:
        """Copy a file into a container: ``docker cp src container:dest``.

        NOTE(review): the exit status and output of ``docker cp`` are
        discarded, so a failed copy is silent -- confirm this best-effort
        behavior is intended.
        """
        import asyncio

        cid = self.container_ids.get(container, container)
        proc = await asyncio.create_subprocess_exec(
            "docker", "cp", src, f"{cid}:{dest}",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        await proc.communicate()

    async def restart(self, container: str, timeout: float = 30.0) -> None:
        """Restart *container* via ``docker restart``.

        If the command does not finish within *timeout* seconds the
        ``docker`` child is killed and reaped; no error is raised.

        NOTE(review): the previous docstring claimed this restores the
        pre-patched state; nothing in this method resets container contents
        -- confirm what callers rely on.
        """
        import asyncio
        import contextlib

        cid = self.container_ids.get(container, container)
        proc = await asyncio.create_subprocess_exec(
            "docker", "restart", cid,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            await asyncio.wait_for(proc.communicate(), timeout=timeout)
        except asyncio.TimeoutError:
            # The child may already be gone by the time we try to kill it.
            with contextlib.suppress(ProcessLookupError):
                proc.kill()
            # Reap the killed child (as ``exec_run`` does) so it is not left
            # behind as a zombie with open pipes.
            try:
                await proc.communicate()
            except Exception:  # noqa: BLE001
                pass
| # --------------------------------------------------------------------------- | |
| # Protocols | |
| # --------------------------------------------------------------------------- | |
# ``runtime_checkable`` lets callers validate pluggable implementations with
# ``isinstance()``; only the presence of ``build`` is checked, not its signature.
@runtime_checkable
class SnapshotBuilder(Protocol):
    """Generate a candidate snapshot spec from a manifest.

    Implementations provide an async ``build`` that receives the manifest
    dict and the current ``BuildContext`` and returns a ``SnapshotSpec``.
    """

    async def build(
        self,
        manifest: dict,
        context: BuildContext,
    ) -> SnapshotSpec: ...
# ``runtime_checkable`` lets callers validate pluggable implementations with
# ``isinstance()``; only the presence of ``decide`` is checked, not its signature.
@runtime_checkable
class NPCBehavior(Protocol):
    """Decide how an NPC responds to a stimulus.

    Implementations provide an async ``decide`` that receives the NPC's
    persona and the incoming stimulus and returns an ``NPCAction``.
    """

    async def decide(
        self,
        persona: NPCPersona,
        stimulus: Stimulus,
    ) -> NPCAction: ...
# ``runtime_checkable`` lets callers validate pluggable implementations with
# ``isinstance()``; only the presence of ``check`` is checked, not its signature.
@runtime_checkable
class ValidatorCheck(Protocol):
    """Single check in the validator admission pipeline.

    Implementations provide an async ``check`` that receives the snapshot
    spec and the live ``ContainerSet`` and returns a ``CheckResult``.
    """

    async def check(
        self,
        snapshot: SnapshotSpec,
        containers: ContainerSet,
    ) -> CheckResult: ...