Spaces:
Sleeping
Sleeping
| import uuid | |
| from enum import Enum | |
| from typing import List, Optional, Dict, Any | |
| from pydantic import BaseModel, Field | |
| from openenv.core import Action, Observation, State | |
| class ActionType(str, Enum): | |
| BLOCK_IP = "block_ip" | |
| QUARANTINE_FILE = "quarantine_file" | |
| INSPECT_IP = "inspect_ip" | |
| QUERY_LOGS = "query_logs" # Essential for Multi-Step Reasoning | |
| ALLOW = "allow" | |
| NOOP = "noop" | |
| class LogEntry(BaseModel): | |
| timestamp: str | |
| source_ip: str | |
| destination_ip: str | |
| port: int | |
| protocol: str | |
| message: str | |
| severity: str = "INFO" | |
| class SecurityAction(Action): | |
| action_type: ActionType | |
| target: Optional[str] = Field(None, description="The target IP, File, or Query string.") | |
| reason: Optional[str] = Field(None, description="Detailed justification for the action.") | |
| class SecurityObservation(Observation): | |
| # NATURAL LANGUAGE AMBIGUITY: The primary signal | |
| alert_text: str = Field(..., description="Ambiguous natural language alert.") | |
| # ACTIONABLE ERROR RECOVERY: Specific hints for the agent | |
| error_context: Optional[str] = Field(None, description="Feedback for malformed or unauthorized actions.") | |
| # SYSTEM STATE | |
| system_load: float = 0.0 | |
| blocked_ips: List[str] = Field(default_factory=list) | |
| inspection_result: Optional[str] = None | |
| # REVEALED DATA (Only populated after QUERY_LOGS) | |
| queried_logs: List[LogEntry] = Field(default_factory=list) | |
| # METRICS for RL training | |
| metrics: Dict[str, float] = Field(default_factory=dict) | |
| class SecurityState(State): | |
| """The 'Hidden Truth' - Never visible to the agent.""" | |
| episode_id: str = Field(default_factory=lambda: str(uuid.uuid4())) | |
| is_under_attack: bool = False | |
| attacker_ips: List[str] = Field(default_factory=list) | |
| infrastructure_health: float = 1.0 | |
| dwell_time: int = 0 | |
| logs_unlocked: bool = False | |