Spaces:
Sleeping
Sleeping
| """Pydantic data models for the Cybersec environment. | |
| This module defines the public contract between the environment, agents, | |
| and any downstream training/evaluation code. All field names and types are | |
| stable; downstream tooling (the LLM prompt formatter, the reward shaping in | |
| training scripts, the heuristic baseline) reads these classes directly. | |
| """ | |
| from __future__ import annotations | |
| from enum import Enum | |
| from typing import Any, Dict, List, Optional | |
| from pydantic import BaseModel, ConfigDict, Field, model_validator | |
| from openenv.core.env_server.types import Action, Observation, State | |
| # --------------------------------------------------------------------------- | |
| # Enums | |
| # --------------------------------------------------------------------------- | |
class ActionType(str, Enum):
    """Closed set of defender action verbs offered to the policy.

    The surface is deliberately limited to six verbs so that a
    1.5B-parameter LLM can reliably emit valid JSON and reason about target
    selection. Because the enum mixes in ``str``, every member compares
    equal to its wire-format string value.
    """

    # Declaration order is the iteration order; keep it stable.
    MONITOR = "MONITOR"
    INVESTIGATE = "INVESTIGATE"
    ISOLATE_ASSET = "ISOLATE_ASSET"
    REVOKE_IDENTITY = "REVOKE_IDENTITY"
    BLOCK_EGRESS = "BLOCK_EGRESS"
    PATCH_ASSET = "PATCH_ASSET"
class AttackerPersonality(str, Enum):
    """Scripted attacker archetypes; one of the three is sampled per episode.

    Members mix in ``str``, so each compares equal to its lowercase
    serialized value.
    """

    STEALTHY = "stealthy"
    AGGRESSIVE = "aggressive"
    OPPORTUNISTIC = "opportunistic"
class AlertSignal(str, Enum):
    """Coarse category carried by every alert.

    A real SOC surfaces dozens of signal kinds. This model keeps only five
    buckets so the policy can learn distinct response patterns without a
    combinatorial blow-up. Members mix in ``str`` and compare equal to
    their snake_case serialized value.
    """

    AUTH_ANOMALY = "auth_anomaly"
    LATERAL_MOVEMENT = "lateral_movement"
    DATA_STAGING = "data_staging"
    EGRESS_ANOMALY = "egress_anomaly"
    BACKGROUND_NOISE = "background_noise"
| # --------------------------------------------------------------------------- | |
| # Public data records (used inside Observation) | |
| # --------------------------------------------------------------------------- | |
class AlertEvent(BaseModel):
    """A single alert visible to the defender at a given tick.

    The telemetry engine produces alerts. Genuine adversary actions only
    surface as alerts after a stochastic detection delay, and benign
    background noise produces alerts as well. ``severity`` is the only
    signal the defender has for telling the two apart.
    """

    # Unknown keys are rejected at validation time instead of being ignored.
    model_config = ConfigDict(extra="forbid")

    tick: int = Field(
        ...,
        description="Tick at which the alert became visible",
    )
    signal: AlertSignal = Field(
        ...,
        description="Coarse signal category",
    )
    # Either, both, or neither of asset/identity may be set.
    asset: Optional[str] = Field(
        default=None,
        description="Asset implicated, if any",
    )
    identity: Optional[str] = Field(
        default=None,
        description="Identity implicated, if any",
    )
    # Constrained to the closed interval [0.0, 1.0].
    severity: float = Field(
        ...,
        ge=0.0,
        le=1.0,
        description="0..1 confidence proxy",
    )
    description: str = Field(
        default="",
        description="Short human-readable hint",
    )
class ForensicResult(BaseModel):
    """Result returned for one INVESTIGATE action.

    Forensics give a noisy view of the ground truth: confidence rises when
    the investigated asset/identity really is on the active attack path,
    but it is never exactly 1.0, so the defender cannot blindly trust a
    single query.
    """

    # Unknown keys are rejected at validation time instead of being ignored.
    model_config = ConfigDict(extra="forbid")

    tick: int = Field(
        ...,
        description="Tick the investigation completed",
    )
    target: str = Field(
        ...,
        description="Asset or identity that was investigated",
    )
    # Free-form string; the model does not restrict it to the two documented
    # values.
    target_kind: str = Field(
        ...,
        description="'asset' or 'identity'",
    )
    is_compromised: bool = Field(
        ...,
        description="Investigator's verdict",
    )
    # Constrained to the closed interval [0.0, 1.0].
    confidence: float = Field(
        ...,
        ge=0.0,
        le=1.0,
        description="Confidence in the verdict",
    )
class RewardBreakdown(BaseModel):
    """Decomposition of a single step's reward.

    Exposed so analysis and reward-shaping code can inspect the individual
    components. Every component defaults to ``0.0``.
    """

    # Unknown keys are rejected at validation time instead of being ignored.
    model_config = ConfigDict(extra="forbid")

    # Positive-sounding components.
    detection: float = 0.0
    containment: float = 0.0
    evidence_bonus: float = 0.0

    # Penalty components. The sign convention is not fixed by this model;
    # verify against the environment's reward code.
    false_positive_penalty: float = 0.0
    disruption_penalty: float = 0.0
    invalid_action_penalty: float = 0.0

    terminal_score: float = 0.0

    # Presumably the aggregate of the components above. This model does not
    # compute or check it; confirm with the producer.
    total: float = 0.0
| # --------------------------------------------------------------------------- | |
| # Action | |
| # --------------------------------------------------------------------------- | |
class CybersecAction(Action):
    """Defender action.

    All six action types take at most a single string ``target``; this keeps
    the JSON the policy must emit minimal:

        {"action_type": "ISOLATE_ASSET", "target": "asset-web-01"}

    Validation rules (enforced when the model is constructed):

    * MONITOR forbids a target.
    * INVESTIGATE requires a target (asset or identity).
    * ISOLATE_ASSET / BLOCK_EGRESS / PATCH_ASSET require an asset target.
    * REVOKE_IDENTITY requires an identity target.

    This model only checks that a target is present or absent. Checking
    that the target is of the right kind, and cross-checking it against the
    live ``valid_targets`` dictionary in the observation, is the
    environment's responsibility, not Pydantic's.
    """

    action_type: ActionType = Field(..., description="One of the six action verbs")
    target: Optional[str] = Field(default=None, description="Asset or identity id")

    # Registered as an "after" validator so the rules run on every
    # construction. Without the decorator this method is never invoked and
    # the target rules above are silently skipped.
    @model_validator(mode="after")
    def _check_target_presence(self) -> "CybersecAction":
        """Enforce the target-presence rules once field validation has passed.

        Returns:
            The validated instance itself, as Pydantic "after" validators
            are expected to.

        Raises:
            ValueError: If a verb that needs a target has a missing or empty
                one, or if MONITOR is given a target. Pydantic surfaces this
                to the caller as a validation error.
        """
        requires_target = {
            ActionType.INVESTIGATE,
            ActionType.ISOLATE_ASSET,
            ActionType.REVOKE_IDENTITY,
            ActionType.BLOCK_EGRESS,
            ActionType.PATCH_ASSET,
        }
        # Both ``None`` and the empty string count as "no target".
        has_target = bool(self.target)
        if self.action_type in requires_target and not has_target:
            raise ValueError(f"{self.action_type.value} requires a non-empty target")
        if self.action_type is ActionType.MONITOR and has_target:
            raise ValueError("MONITOR must not be given a target")
        return self
| # --------------------------------------------------------------------------- | |
| # Observation | |
| # --------------------------------------------------------------------------- | |
class CybersecObservation(Observation):
    """What the defender can see of the world at one tick.

    The fields fall into five groups:

    * Episode meta: ``tick``, ``horizon``, ``scenario_id`` and
      ``attacker_personality``.
    * Telemetry: ``alerts`` (recent, lag-delayed) and ``forensics``
      (responses to earlier INVESTIGATE actions).
    * Defender controls: which assets/identities are isolated, revoked,
      blocked, patched, or confirmed compromised.
    * Action grounding: ``available_actions`` and ``valid_targets``, so the
      policy can be hard-constrained to the legal action space.
    * ``info``: free-form dict carrying ``reward_breakdown`` on every step
      and ``terminal`` info on the final tick.
    """

    # --- Episode meta -----------------------------------------------------
    tick: int = Field(
        ...,
        description="Current tick (0-indexed)",
    )
    horizon: int = Field(
        ...,
        description="Maximum number of ticks in the episode",
    )
    scenario_id: str = Field(
        ...,
        description="Active scenario identifier",
    )
    attacker_personality: AttackerPersonality = Field(
        ...,
        description="Sampled attacker archetype for this episode",
    )

    # --- Telemetry --------------------------------------------------------
    alerts: List[AlertEvent] = Field(
        default_factory=list,
        description="Alerts visible to the defender (most recent last)",
    )
    forensics: List[ForensicResult] = Field(
        default_factory=list,
        description="Investigation results received so far",
    )

    # --- Defender controls ------------------------------------------------
    # Each list gets its own fresh empty list per instance.
    isolated_assets: List[str] = Field(default_factory=list)
    revoked_identities: List[str] = Field(default_factory=list)
    blocked_egress_assets: List[str] = Field(default_factory=list)
    patched_assets: List[str] = Field(default_factory=list)
    confirmed_compromised: List[str] = Field(
        default_factory=list,
        description="Targets the defender has positively identified as compromised",
    )

    # --- Action grounding -------------------------------------------------
    valid_targets: Dict[str, List[str]] = Field(
        default_factory=dict,
        description="{'assets': [...], 'identities': [...]} - canonical legal targets",
    )
    available_actions: List[ActionType] = Field(
        default_factory=list,
        description="Action verbs currently legal (filtered by remaining cooldowns/etc.)",
    )

    # --- Diagnostics ------------------------------------------------------
    info: Dict[str, Any] = Field(
        default_factory=dict,
        description="Per-step diagnostics: reward_breakdown, terminal stats, debug",
    )
| # --------------------------------------------------------------------------- | |
| # State (server-side only; never returned to clients verbatim) | |
| # --------------------------------------------------------------------------- | |
class CybersecState(State):
    """Lightweight state snapshot for debugging / GET /state.

    Kept small on purpose: the full ground-truth WorldState lives in
    ``cybersec.server.cybersec_environment`` and stays private. Only fields
    that are safe to expose on the OpenEnv state endpoint belong here.
    """

    # Episode identity; both are None until set by the producer.
    scenario_id: Optional[str] = None
    attacker_personality: Optional[AttackerPersonality] = None

    # Episode progress.
    tick: int = 0
    horizon: int = 0
    completed_attack_stages: List[str] = Field(default_factory=list)

    # Running outcome.
    cumulative_reward: float = 0.0
    done: bool = False
# Public API of this module, listed in definition order.
__all__ = [
    # Enums
    "ActionType",
    "AttackerPersonality",
    "AlertSignal",
    # Records embedded in observations
    "AlertEvent",
    "ForensicResult",
    "RewardBreakdown",
    # Environment contract types
    "CybersecAction",
    "CybersecObservation",
    "CybersecState",
]