Spaces:
Runtime error
Runtime error
| """NPC traffic orchestrator. | |
| Starts Level 0 shell-script traffic generators and (optionally) Level 1 | |
| LLM-driven NPC agents for a given snapshot. Multimodal NPC channels | |
| (chat, voice, document) are initialised at start and their activity logs | |
| are available for SIEM consumption. | |
| In **mock mode** (``mock_mode=True``), no Docker exec or LLM calls are | |
| made. Only synthetic chat traffic is generated from the | |
| ``chat_traffic`` module, so unit tests can exercise the NPC pipeline | |
| without infrastructure. | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import base64 | |
| import logging | |
| from pathlib import Path | |
| from typing import Any | |
| from open_range.builder.npc.channels import ChatChannel, DocumentChannel, VoiceChannel | |
| from open_range.protocols import ContainerSet, SnapshotSpec | |
| logger = logging.getLogger(__name__) | |
| _SCRIPT_DIR = Path(__file__).parent | |
| # --------------------------------------------------------------------------- | |
| # Service keyword mappings used to match script prefixes to topology hosts | |
| # and to resolve well-known env-var roles from service lists. | |
| # --------------------------------------------------------------------------- | |
| # Map a script filename keyword to service keywords that indicate a host | |
| # can run that script. Order matters for priority within each entry. | |
| _SCRIPT_SERVICE_KEYWORDS: dict[str, list[str]] = { | |
| "http": ["nginx", "apache", "httpd", "web", "php-fpm"], | |
| "db": ["mysql", "mariadb", "postgres", "postgresql", "mongodb", "redis"], | |
| "ssh": ["nmap", "hydra", "nikto", "ssh-client", "attacker", "sshd"], | |
| "smtp": ["postfix", "sendmail", "exim", "dovecot", "mail"], | |
| } | |
| # Map an env-var role (e.g. WEB_HOST) to service keywords that identify the | |
| # host fulfilling that role. | |
| _ROLE_SERVICE_KEYWORDS: dict[str, list[str]] = { | |
| "WEB_HOST": ["nginx", "apache", "httpd", "web", "php-fpm"], | |
| "DB_HOST": ["mysql", "mariadb", "postgres", "postgresql", "mongodb"], | |
| "MAIL_HOST": ["postfix", "sendmail", "dovecot", "mail"], | |
| "LDAP_HOST": ["openldap", "ldap", "slapd"], | |
| "SIEM_HOST": ["rsyslog", "elasticsearch", "siem", "splunk"], | |
| } | |
| def _hosts_from_topology(topology: dict[str, Any]) -> list[dict[str, Any]]: | |
| """Return normalized host dicts for compiled or manifest-style topology. | |
| ``compile_manifest_topology()`` canonicalizes ``topology["hosts"]`` to a | |
| list of host names and keeps the richer metadata in ``host_catalog`` / | |
| ``host_details``. NPC helpers need the richer dict shape, so normalize the | |
| compiled form back into ``{"name": ..., "services": ...}`` records here. | |
| """ | |
| raw_hosts = topology.get("hosts") or [] | |
| host_catalog = topology.get("host_catalog") | |
| if not isinstance(host_catalog, dict): | |
| host_catalog = {} | |
| host_details = topology.get("host_details") | |
| if not isinstance(host_details, dict): | |
| host_details = {} | |
| hosts: list[dict[str, Any]] = [] | |
| seen: set[str] = set() | |
| def _append_host(raw_host: Any) -> None: | |
| if isinstance(raw_host, dict): | |
| name = str(raw_host.get("name", "")).strip() | |
| else: | |
| name = str(raw_host).strip() | |
| if not name or name in seen: | |
| return | |
| merged: dict[str, Any] = {} | |
| catalog_detail = host_catalog.get(name) | |
| if isinstance(catalog_detail, dict): | |
| merged.update(catalog_detail) | |
| detailed_detail = host_details.get(name) | |
| if isinstance(detailed_detail, dict): | |
| merged.update(detailed_detail) | |
| if isinstance(raw_host, dict): | |
| merged.update(raw_host) | |
| merged["name"] = name | |
| services = merged.get("services") | |
| merged["services"] = list(services) if isinstance(services, list) else [] | |
| seen.add(name) | |
| hosts.append(merged) | |
| if isinstance(raw_hosts, list): | |
| for raw_host in raw_hosts: | |
| _append_host(raw_host) | |
| for name in host_catalog: | |
| _append_host(name) | |
| for name in host_details: | |
| _append_host(name) | |
| return hosts | |
| def _host_matches_keywords(host: dict[str, Any], keywords: list[str]) -> bool: | |
| """Return True if the host's name or any of its services match *keywords*.""" | |
| host_name = (host.get("name") or "").lower() | |
| services = [s.lower() for s in (host.get("services") or [])] | |
| for kw in keywords: | |
| kw_lower = kw.lower() | |
| if kw_lower in host_name or any(kw_lower in svc for svc in services): | |
| return True | |
| return False | |
def _container_for_script(script_name: str, topology: dict[str, Any]) -> str:
    """Determine which container a script should run inside.

    Matches the script filename against service keywords in the topology
    hosts.  Falls back to the first host if nothing matches (only the
    first matching prefix is tried; if no host fits it, we go straight
    to the default rather than trying further prefixes).
    """
    hosts = _hosts_from_topology(topology)
    if not hosts:
        return "web"  # legacy fallback when topology is empty

    lowered = script_name.lower()
    for prefix, keywords in _SCRIPT_SERVICE_KEYWORDS.items():
        if prefix not in lowered:
            continue
        match = next(
            (h for h in hosts if _host_matches_keywords(h, keywords)), None
        )
        if match is not None:
            return match["name"]
        break  # prefix matched but no host found; fall through to default

    # Default: first host in topology
    return hosts[0].get("name", "web")
def _resolve_env_vars(topology: dict[str, Any], rate_lambda: float) -> dict[str, str]:
    """Build environment variables by resolving roles and credentials from topology.

    Resolves host roles (WEB_HOST, DB_HOST, etc.) and credentials (DB_USER,
    DB_PASS, SSH_USER, SSH_PASS) from the topology so shell scripts don't
    need hardcoded values.

    Args:
        topology: Compiled or manifest-style topology dict.
        rate_lambda: Traffic rate for the scripts; exported truncated to an
            integer as ``RATE_LAMBDA``.

    Returns:
        Mapping of env-var names to string values for the traffic scripts.
    """
    hosts = _hosts_from_topology(topology)
    env: dict[str, str] = {"RATE_LAMBDA": str(int(rate_lambda))}
    for role, keywords in _ROLE_SERVICE_KEYWORDS.items():
        for host in hosts:
            if _host_matches_keywords(host, keywords):
                env[role] = host["name"]
                break  # first matching host wins for this role
    # Pass DB and SSH credentials from topology to shell scripts.
    # ``or []`` guards against an explicit ``users: null`` / ``hosts: null``
    # in the manifest (the ``hosts`` key above already gets this treatment).
    users = topology.get("users") or []
    for user in users:
        if not isinstance(user, dict):
            continue
        hosts_list = user.get("hosts") or []
        if "db" in hosts_list and "DB_USER" not in env:
            env["DB_USER"] = user.get("username", "app_user")
            env["DB_PASS"] = user.get("password", "AppUs3r!2024")
        if any(h in hosts_list for h in ("web", "files", "ldap", "siem")):
            # Distinct name so we don't shadow the ``role`` loop variable
            # used for host-role resolution above.
            user_role = user.get("role", "")
            if user_role in ("admin", "sysadmin", "root") and "SSH_USER" not in env:
                env["SSH_USER"] = user.get("username", "admin")
                env["SSH_PASS"] = user.get("password", "Adm1n!2024")
    return env
def _derive_scripts_from_topology(topology: dict[str, Any]) -> list[str]:
    """Derive available NPC scripts from topology services.

    Scans the topology hosts and checks which script prefixes have a
    matching host.  Only returns scripts that actually exist on disk.
    """
    hosts = _hosts_from_topology(topology)
    available: list[str] = []
    for prefix, keywords in _SCRIPT_SERVICE_KEYWORDS.items():
        # One matching host is enough to enable this prefix's script.
        if not any(_host_matches_keywords(host, keywords) for host in hosts):
            continue
        script_file = f"{prefix}_traffic.sh"
        if (_SCRIPT_DIR / script_file).exists():
            available.append(script_file)
    return available
class NPCManager:
    """Start and stop NPC background traffic for a snapshot.

    Manages three kinds of NPC activity: Level 0 shell-script traffic loops
    (run via docker exec or, without containers, on the host), Level 1
    LLM-driven NPC agent tasks, and in-memory multimodal channels (chat,
    voice, document) whose logs feed SIEM/reward consumers.

    Args:
        mock_mode: When True, skip Docker exec and LLM calls (unit tests).
        model: LiteLLM model string for Level 1 NPC agents.
            Defaults to ``OPENRANGE_NPC_MODEL`` env var, then
            ``azure/gpt-5.2-codex``. Any LiteLLM-supported model works
            (e.g. ``openai/gpt-4o``, ``anthropic/claude-haiku-4-5-20251001``,
            ``ollama/llama3``).
    """

    def __init__(self, mock_mode: bool = False, model: str | None = None) -> None:
        self._mock_mode = mock_mode
        self._model = model  # passed to LLMNPCAgent
        # Host-mode subprocesses (fallback path when no ContainerSet given)
        self._processes: list[asyncio.subprocess.Process] = []
        # Async tasks driving Level 1 LLM NPC agent loops
        self._tasks: list[asyncio.Task[Any]] = []
        self._running = False
        self._npc_agents: list[Any] = []  # LLMNPCAgent instances
        # Containers where scripts were deployed (for cleanup)
        self._script_containers: list[str] = []
        self._containers: ContainerSet | None = None
        # Multimodal NPC communication channels
        self.channels: dict[str, ChatChannel | VoiceChannel | DocumentChannel] = {
            "chat": ChatChannel(),
            "voice": VoiceChannel(),
            "document": DocumentChannel(),
        }

    # -----------------------------------------------------------------
    # Async start / stop (used when an event loop is available)
    # -----------------------------------------------------------------
    async def start(
        self,
        snapshot: SnapshotSpec,
        containers: ContainerSet | None = None,
    ) -> None:
        """Start NPC traffic generators.

        Level 0: shell scripts (http, ssh, db traffic loops).
        Level 1: LLM NPC agents (deferred to npc_agent.py).
        In mock mode, only synthetic chat traffic is generated.

        Args:
            snapshot: Episode specification (topology, personas, npc_traffic).
            containers: Target container set; when None, scripts fall back
                to running on the host.
        """
        # Restarting? Tear everything down first so state doesn't leak
        # across episodes.
        if self._running:
            await self.stop()
        self._running = True
        self._containers = containers
        npc_cfg = snapshot.npc_traffic
        # Re-initialise channels for the new episode
        self.channels = {
            "chat": ChatChannel(),
            "voice": VoiceChannel(),
            "document": DocumentChannel(),
        }
        # Generate Level 0 chat traffic if personas are available
        # (needs at least two personas to have a conversation).
        if snapshot.npc_personas and len(snapshot.npc_personas) >= 2:
            from open_range.builder.npc.chat_traffic import generate_chat_traffic
            chat_ch = self.channels["chat"]
            assert isinstance(chat_ch, ChatChannel)
            generate_chat_traffic(
                personas=snapshot.npc_personas,
                channel=chat_ch,
                num_messages=10,
            )
            logger.info(
                "Generated %d chat messages for %d personas",
                len(chat_ch.get_channel_log()),
                len(snapshot.npc_personas),
            )
        # In mock mode, skip Docker exec and LLM agent loops
        if self._mock_mode:
            logger.info("NPC manager running in mock mode (no Docker/LLM)")
            return
        topology = snapshot.topology
        # Determine which scripts to run -- derive from topology when
        # the snapshot does not specify scripts explicitly.
        scripts = npc_cfg.scripts or _derive_scripts_from_topology(topology)
        # Resolve environment variables (WEB_HOST, DB_HOST, etc.) from
        # the topology instead of hardcoding host names.
        env_vars = _resolve_env_vars(topology, npc_cfg.rate_lambda)
        for script_name in scripts:
            script_path = _SCRIPT_DIR / script_name
            if not script_path.exists():
                logger.warning("NPC script not found: %s", script_path)
                continue
            container = _container_for_script(script_name, topology)
            logger.info(
                "Starting NPC script: %s in container %s (rate=%s)",
                script_name, container, npc_cfg.rate_lambda,
            )
            if containers is not None:
                # Run script inside the target container via docker exec.
                # The script is shipped base64-encoded so its contents
                # survive shell quoting, then launched detached via nohup.
                try:
                    script_content = script_path.read_text()
                    encoded = base64.b64encode(script_content.encode()).decode()
                    env_prefix = " ".join(
                        f"{k}={v}" for k, v in env_vars.items()
                    )
                    await containers.exec(
                        container,
                        f"echo {encoded} | base64 -d > /tmp/{script_name} "
                        f"&& chmod +x /tmp/{script_name} "
                        f"&& {env_prefix} nohup bash /tmp/{script_name} "
                        f"> /dev/null 2>&1 &",
                    )
                    self._script_containers.append(container)
                except Exception as exc:
                    # Best-effort: one failed script must not abort the rest.
                    logger.warning(
                        "Failed to start NPC script %s in container %s: %s",
                        script_name, container, exc,
                    )
            else:
                # Fallback: run on host (original behavior)
                try:
                    proc = await asyncio.create_subprocess_exec(
                        "bash",
                        str(script_path),
                        stdout=asyncio.subprocess.DEVNULL,
                        stderr=asyncio.subprocess.DEVNULL,
                        env=env_vars,
                    )
                    self._processes.append(proc)
                except OSError as exc:
                    logger.warning("Failed to start NPC script %s: %s", script_name, exc)
        # Level 1 LLM NPCs -- start async agent loops if personas are present
        if npc_cfg.level >= 1 and snapshot.npc_personas and containers is not None:
            from open_range.builder.npc.npc_agent import LLMNPCAgent
            for persona in snapshot.npc_personas:
                agent = LLMNPCAgent(model=self._model)
                task = asyncio.create_task(
                    agent.run_loop(persona, containers, snapshot),
                    name=f"npc_{persona.name}",
                )
                self._tasks.append(task)
                self._npc_agents.append(agent)
                logger.info("Started LLM NPC agent: %s", persona.name)

    async def stop(self) -> None:
        """Stop all NPC traffic generators and agents.

        Cancels agent tasks, terminates host-mode script processes,
        best-effort kills in-container script loops, and clears channel
        state.  Safe to call even when nothing was started.
        """
        # Cancel async NPC agent tasks
        for task in self._tasks:
            task.cancel()
        if self._tasks:
            # Gather so cancellations are fully processed before we clear;
            # return_exceptions=True swallows the expected CancelledErrors.
            await asyncio.gather(*self._tasks, return_exceptions=True)
        self._tasks.clear()
        self._npc_agents.clear()
        # Terminate shell script processes (host-mode fallback)
        for proc in self._processes:
            try:
                proc.terminate()
                await asyncio.wait_for(proc.wait(), timeout=5.0)
            except (ProcessLookupError, asyncio.TimeoutError):
                # Already gone, or didn't exit within 5s -- escalate to kill.
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass
        self._processes.clear()
        # Kill background scripts inside containers
        if self._containers is not None:
            for container in set(self._script_containers):
                try:
                    await self._containers.exec(
                        container,
                        "pkill -f 'npc.*traffic' 2>/dev/null || true",
                    )
                except Exception:
                    # Container may already be gone; cleanup is best-effort.
                    pass
        self._script_containers.clear()
        self._containers = None
        # Clear channel state
        for ch in self.channels.values():
            ch.clear()
        self._running = False
        logger.info("All NPC traffic stopped.")

    # -----------------------------------------------------------------
    # Synchronous wrappers (for callers without an event loop)
    # -----------------------------------------------------------------
    def start_sync(self, snapshot: SnapshotSpec, containers: ContainerSet | None = None) -> None:
        """Synchronous wrapper around :meth:`start`.

        Uses the running event loop if available, otherwise creates a new one.
        Inside a running loop, only the sync-safe subset (channels + chat
        traffic) is started -- see :meth:`_start_sync_inner`.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None
        if loop and loop.is_running():
            # We're inside an async context -- schedule and return.
            # Since we can't await here, run the coroutine eagerly using
            # loop.run_until_complete which won't work if a loop is running.
            # Instead, just call the sync-safe parts directly.
            self._start_sync_inner(snapshot, containers)
        else:
            asyncio.run(self.start(snapshot, containers))

    def stop_sync(self) -> None:
        """Synchronous wrapper around :meth:`stop`."""
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None
        if loop and loop.is_running():
            self._stop_sync_inner()
        else:
            asyncio.run(self.stop())

    def _start_sync_inner(self, snapshot: SnapshotSpec, containers: ContainerSet | None = None) -> None:
        """Synchronous start that avoids asyncio for mock mode and chat traffic.

        Mirrors the channel/chat setup of :meth:`start`; live script and
        LLM-agent startup is NOT performed here (no awaiting possible).
        """
        if self._running:
            self._stop_sync_inner()
        self._running = True
        self._containers = containers
        # Re-initialise channels for the new episode
        self.channels = {
            "chat": ChatChannel(),
            "voice": VoiceChannel(),
            "document": DocumentChannel(),
        }
        # Generate Level 0 chat traffic if personas are available
        if snapshot.npc_personas and len(snapshot.npc_personas) >= 2:
            from open_range.builder.npc.chat_traffic import generate_chat_traffic
            chat_ch = self.channels["chat"]
            assert isinstance(chat_ch, ChatChannel)
            generate_chat_traffic(
                personas=snapshot.npc_personas,
                channel=chat_ch,
                num_messages=10,
            )
            logger.info(
                "Generated %d chat messages for %d personas",
                len(chat_ch.get_channel_log()),
                len(snapshot.npc_personas),
            )
        if self._mock_mode:
            logger.info("NPC manager running in mock mode (no Docker/LLM)")
            return
        # In live mode with an active event loop, schedule async start
        # for scripts and LLM agents. This is best-effort -- if it
        # fails, the chat traffic is already available.
        if containers is not None:
            logger.info(
                "NPC live scripts deferred (use async start() for full support)"
            )

    def _stop_sync_inner(self) -> None:
        """Synchronous stop for mock mode (no async cleanup needed).

        Tasks are cancelled but not awaited, and processes/containers are
        only dropped from tracking -- full teardown needs :meth:`stop`.
        """
        # Cancel any asyncio tasks that may exist
        for task in self._tasks:
            task.cancel()
        self._tasks.clear()
        self._npc_agents.clear()
        self._processes.clear()
        self._script_containers.clear()
        self._containers = None
        for ch in self.channels.values():
            ch.clear()
        self._running = False

    # -----------------------------------------------------------------
    # Traffic log for reward computation
    # -----------------------------------------------------------------
    def get_traffic_log(self) -> list[dict[str, Any]]:
        """Return all NPC activity for reward computation.

        Combines SIEM channel logs with LLM NPC agent action logs,
        sorted by the ``timestamp`` field (missing timestamps sort first).
        """
        logs = self.get_siem_log()
        # Append LLM NPC agent actions
        for agent in self._npc_agents:
            try:
                logs.extend(agent.get_actions())
            except Exception:
                # Best-effort: a misbehaving agent must not break reward logs.
                pass
        logs.sort(key=lambda e: e.get("timestamp", 0))
        return logs

    def running(self) -> bool:
        """Whether NPC traffic is currently active."""
        return self._running

    def get_siem_log(self) -> list[dict[str, Any]]:
        """Aggregate activity logs from all channels for SIEM consumption.

        Returns a new list; callers may mutate it freely.
        """
        logs: list[dict[str, Any]] = []
        chat_ch = self.channels.get("chat")
        if isinstance(chat_ch, ChatChannel):
            logs.extend(chat_ch.get_channel_log())
        voice_ch = self.channels.get("voice")
        if isinstance(voice_ch, VoiceChannel):
            logs.extend(voice_ch.get_call_log())
        doc_ch = self.channels.get("document")
        if isinstance(doc_ch, DocumentChannel):
            logs.extend(doc_ch.get_document_log())
        # Sort by timestamp
        logs.sort(key=lambda e: e.get("timestamp", 0))
        return logs