# open-range / src / open_range / builder / npc / npc_manager.py
# Lars Talian
# fix(npc): normalize compiled topology hosts (#103)
# bafb155 unverified
"""NPC traffic orchestrator.
Starts Level 0 shell-script traffic generators and (optionally) Level 1
LLM-driven NPC agents for a given snapshot. Multimodal NPC channels
(chat, voice, document) are initialised at start and their activity logs
are available for SIEM consumption.
In **mock mode** (``mock_mode=True``), no Docker exec or LLM calls are
made. Only synthetic chat traffic is generated from the
``chat_traffic`` module, so unit tests can exercise the NPC pipeline
without infrastructure.
"""
from __future__ import annotations

import asyncio
import base64
import logging
import os
import shlex
from pathlib import Path
from typing import Any

from open_range.builder.npc.channels import ChatChannel, DocumentChannel, VoiceChannel
from open_range.protocols import ContainerSet, SnapshotSpec
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Directory containing this module; NPC traffic shell scripts are resolved
# relative to it (``_SCRIPT_DIR / script_name``).
_SCRIPT_DIR = Path(__file__).parent
# ---------------------------------------------------------------------------
# Service keyword mappings used to match script prefixes to topology hosts
# and to resolve well-known env-var roles from service lists.
# ---------------------------------------------------------------------------
# Map a script filename keyword to service keywords that indicate a host
# can run that script. Order matters for priority within each entry.
# The dict order matters too: ``_container_for_script`` uses the first key
# that occurs as a substring of the script filename, and
# ``_derive_scripts_from_topology`` emits ``<key>_traffic.sh`` candidates in
# this order. Keywords are matched case-insensitively as substrings of the
# host name or of any service string.
_SCRIPT_SERVICE_KEYWORDS: dict[str, list[str]] = {
"http": ["nginx", "apache", "httpd", "web", "php-fpm"],
"db": ["mysql", "mariadb", "postgres", "postgresql", "mongodb", "redis"],
"ssh": ["nmap", "hydra", "nikto", "ssh-client", "attacker", "sshd"],
"smtp": ["postfix", "sendmail", "exim", "dovecot", "mail"],
}
# Map an env-var role (e.g. WEB_HOST) to service keywords that identify the
# host fulfilling that role.
# ``_resolve_env_vars`` exports each role as an environment variable whose
# value is the name of the first topology host matching any keyword; roles
# with no matching host are simply not exported.
# NOTE(review): these lists are not identical to the script lists above
# (e.g. "redis" and "exim" appear only there) -- confirm that is intentional.
_ROLE_SERVICE_KEYWORDS: dict[str, list[str]] = {
"WEB_HOST": ["nginx", "apache", "httpd", "web", "php-fpm"],
"DB_HOST": ["mysql", "mariadb", "postgres", "postgresql", "mongodb"],
"MAIL_HOST": ["postfix", "sendmail", "dovecot", "mail"],
"LDAP_HOST": ["openldap", "ldap", "slapd"],
"SIEM_HOST": ["rsyslog", "elasticsearch", "siem", "splunk"],
}
def _hosts_from_topology(topology: dict[str, Any]) -> list[dict[str, Any]]:
"""Return normalized host dicts for compiled or manifest-style topology.
``compile_manifest_topology()`` canonicalizes ``topology["hosts"]`` to a
list of host names and keeps the richer metadata in ``host_catalog`` /
``host_details``. NPC helpers need the richer dict shape, so normalize the
compiled form back into ``{"name": ..., "services": ...}`` records here.
"""
raw_hosts = topology.get("hosts") or []
host_catalog = topology.get("host_catalog")
if not isinstance(host_catalog, dict):
host_catalog = {}
host_details = topology.get("host_details")
if not isinstance(host_details, dict):
host_details = {}
hosts: list[dict[str, Any]] = []
seen: set[str] = set()
def _append_host(raw_host: Any) -> None:
if isinstance(raw_host, dict):
name = str(raw_host.get("name", "")).strip()
else:
name = str(raw_host).strip()
if not name or name in seen:
return
merged: dict[str, Any] = {}
catalog_detail = host_catalog.get(name)
if isinstance(catalog_detail, dict):
merged.update(catalog_detail)
detailed_detail = host_details.get(name)
if isinstance(detailed_detail, dict):
merged.update(detailed_detail)
if isinstance(raw_host, dict):
merged.update(raw_host)
merged["name"] = name
services = merged.get("services")
merged["services"] = list(services) if isinstance(services, list) else []
seen.add(name)
hosts.append(merged)
if isinstance(raw_hosts, list):
for raw_host in raw_hosts:
_append_host(raw_host)
for name in host_catalog:
_append_host(name)
for name in host_details:
_append_host(name)
return hosts
def _host_matches_keywords(host: dict[str, Any], keywords: list[str]) -> bool:
"""Return True if the host's name or any of its services match *keywords*."""
host_name = (host.get("name") or "").lower()
services = [s.lower() for s in (host.get("services") or [])]
for kw in keywords:
kw_lower = kw.lower()
if kw_lower in host_name or any(kw_lower in svc for svc in services):
return True
return False
def _container_for_script(script_name: str, topology: dict[str, Any]) -> str:
    """Pick the container in which *script_name* should be executed.

    The first key of ``_SCRIPT_SERVICE_KEYWORDS`` found inside the lowercased
    script filename selects a keyword list; the first topology host matching
    that list wins. When no key or no host matches, the first host in the
    topology is used, and ``"web"`` when the topology has no hosts at all.
    """
    hosts = _hosts_from_topology(topology)
    if not hosts:
        return "web"  # legacy fallback when topology is empty

    lowered = script_name.lower()
    # Only the first prefix contained in the filename is ever considered.
    selected_keywords = next(
        (kws for prefix, kws in _SCRIPT_SERVICE_KEYWORDS.items() if prefix in lowered),
        None,
    )
    if selected_keywords is not None:
        for candidate in hosts:
            if _host_matches_keywords(candidate, selected_keywords):
                return candidate["name"]

    # Default: first host in topology
    return hosts[0].get("name", "web")
def _resolve_env_vars(topology: dict[str, Any], rate_lambda: float) -> dict[str, str]:
    """Assemble the environment handed to the NPC shell scripts.

    ``RATE_LAMBDA`` is always set (the rate truncated to an integer). Each
    role in ``_ROLE_SERVICE_KEYWORDS`` (WEB_HOST, DB_HOST, ...) is set to the
    name of the first matching topology host, if any. DB_USER/DB_PASS come
    from the first user whose ``hosts`` include ``"db"``; SSH_USER/SSH_PASS
    come from the first admin-like user with access to one of the
    web/files/ldap/siem hosts.
    """
    hosts = _hosts_from_topology(topology)
    env: dict[str, str] = {"RATE_LAMBDA": str(int(rate_lambda))}

    for role_var, keywords in _ROLE_SERVICE_KEYWORDS.items():
        role_host = next(
            (h for h in hosts if _host_matches_keywords(h, keywords)),
            None,
        )
        if role_host is not None:
            env[role_var] = role_host["name"]

    # Pass DB and SSH credentials from topology to shell scripts
    ssh_capable_hosts = ("web", "files", "ldap", "siem")
    admin_roles = ("admin", "sysadmin", "root")
    for account in topology.get("users", []):
        if not isinstance(account, dict):
            continue
        reachable = account.get("hosts", [])

        if "db" in reachable and "DB_USER" not in env:
            env["DB_USER"] = account.get("username", "app_user")
            env["DB_PASS"] = account.get("password", "AppUs3r!2024")

        if (
            any(h in reachable for h in ssh_capable_hosts)
            and account.get("role", "") in admin_roles
            and "SSH_USER" not in env
        ):
            env["SSH_USER"] = account.get("username", "admin")
            env["SSH_PASS"] = account.get("password", "Adm1n!2024")

    return env
def _derive_scripts_from_topology(topology: dict[str, Any]) -> list[str]:
    """Work out which NPC scripts apply to the given topology.

    For every prefix in ``_SCRIPT_SERVICE_KEYWORDS`` that has at least one
    matching host, ``<prefix>_traffic.sh`` is included, provided that file
    exists next to this module. Order follows the keyword table.
    """
    hosts = _hosts_from_topology(topology)
    available: list[str] = []
    for prefix, keywords in _SCRIPT_SERVICE_KEYWORDS.items():
        # One matching host per prefix is enough.
        if not any(_host_matches_keywords(h, keywords) for h in hosts):
            continue
        script_file = f"{prefix}_traffic.sh"
        if (_SCRIPT_DIR / script_file).exists():
            available.append(script_file)
    return available
class NPCManager:
    """Start and stop NPC background traffic for a snapshot.

    Args:
        mock_mode: When True, skip Docker exec and LLM calls (unit tests).
        model: LiteLLM model string for Level 1 NPC agents.
            Defaults to ``OPENRANGE_NPC_MODEL`` env var, then
            ``azure/gpt-5.2-codex``. Any LiteLLM-supported model works
            (e.g. ``openai/gpt-4o``, ``anthropic/claude-haiku-4-5-20251001``,
            ``ollama/llama3``).
    """

    # Scripts copied into containers are stored under this prefix so that the
    # ``pkill -f 'npc.*traffic'`` issued by ``stop()`` matches their command
    # line regardless of the script's own file name.
    _REMOTE_SCRIPT_PREFIX = "/tmp/npc_traffic_"

    def __init__(self, mock_mode: bool = False, model: str | None = None) -> None:
        self._mock_mode = mock_mode
        self._model = model  # passed to LLMNPCAgent
        self._processes: list[asyncio.subprocess.Process] = []
        self._tasks: list[asyncio.Task[Any]] = []
        self._running = False
        self._npc_agents: list[Any] = []  # LLMNPCAgent instances
        # Containers where scripts were deployed (for cleanup)
        self._script_containers: list[str] = []
        self._containers: ContainerSet | None = None
        # Multimodal NPC communication channels
        self.channels: dict[str, ChatChannel | VoiceChannel | DocumentChannel] = (
            self._new_channels()
        )

    # -----------------------------------------------------------------
    # Shared helpers
    # -----------------------------------------------------------------
    @staticmethod
    def _new_channels() -> dict[str, ChatChannel | VoiceChannel | DocumentChannel]:
        """Return a fresh set of empty chat/voice/document channels."""
        return {
            "chat": ChatChannel(),
            "voice": VoiceChannel(),
            "document": DocumentChannel(),
        }

    def _generate_chat_traffic(self, snapshot: SnapshotSpec) -> None:
        """Fill the chat channel with Level 0 traffic when >= 2 personas exist."""
        personas = snapshot.npc_personas
        if not personas or len(personas) < 2:
            return
        from open_range.builder.npc.chat_traffic import generate_chat_traffic

        chat_ch = self.channels["chat"]
        assert isinstance(chat_ch, ChatChannel)
        generate_chat_traffic(
            personas=personas,
            channel=chat_ch,
            num_messages=10,
        )
        logger.info(
            "Generated %d chat messages for %d personas",
            len(chat_ch.get_channel_log()),
            len(personas),
        )

    def _begin_episode(
        self,
        snapshot: SnapshotSpec,
        containers: ContainerSet | None,
    ) -> None:
        """Mark the manager running, reset channels and generate chat traffic."""
        self._running = True
        self._containers = containers
        # Re-initialise channels for the new episode
        self.channels = self._new_channels()
        # Generate Level 0 chat traffic if personas are available
        self._generate_chat_traffic(snapshot)

    async def _launch_in_container(
        self,
        containers: ContainerSet,
        container: str,
        script_name: str,
        script_path: Path,
        env_vars: dict[str, str],
    ) -> None:
        """Copy a script into *container* and start it in the background.

        Failures are logged and swallowed so one bad container does not
        prevent the remaining scripts from starting.
        """
        remote_path = shlex.quote(
            f"{self._REMOTE_SCRIPT_PREFIX}{Path(script_name).name}"
        )
        try:
            script_content = script_path.read_text()
            encoded = base64.b64encode(script_content.encode()).decode()
            # Values (e.g. passwords taken from the topology) may contain
            # shell metacharacters, so every value is quoted.
            env_prefix = " ".join(
                f"{k}={shlex.quote(str(v))}" for k, v in env_vars.items()
            )
            await containers.exec(
                container,
                f"echo {encoded} | base64 -d > {remote_path} "
                f"&& chmod +x {remote_path} "
                f"&& {env_prefix} nohup bash {remote_path} "
                f"> /dev/null 2>&1 &",
            )
            self._script_containers.append(container)
        except Exception as exc:
            logger.warning(
                "Failed to start NPC script %s in container %s: %s",
                script_name, container, exc,
            )

    async def _launch_on_host(
        self,
        script_name: str,
        script_path: Path,
        env_vars: dict[str, str],
    ) -> None:
        """Run a script as a local ``bash`` subprocess (no container set)."""
        # Extend, rather than replace, the inherited environment so the
        # script keeps PATH, HOME, etc.; values must be strings.
        child_env = {**os.environ, **{k: str(v) for k, v in env_vars.items()}}
        try:
            proc = await asyncio.create_subprocess_exec(
                "bash",
                str(script_path),
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL,
                env=child_env,
            )
            self._processes.append(proc)
        except OSError as exc:
            logger.warning("Failed to start NPC script %s: %s", script_name, exc)

    # -----------------------------------------------------------------
    # Async start / stop (used when an event loop is available)
    # -----------------------------------------------------------------
    async def start(
        self,
        snapshot: SnapshotSpec,
        containers: ContainerSet | None = None,
    ) -> None:
        """Start NPC traffic generators.

        Level 0: shell scripts (http, ssh, db traffic loops).
        Level 1: LLM NPC agents (deferred to npc_agent.py).
        In mock mode, only synthetic chat traffic is generated.

        Args:
            snapshot: Snapshot providing ``npc_traffic``, ``npc_personas``
                and ``topology``.
            containers: When given, scripts run inside the matching
                container via ``containers.exec``; otherwise they run as
                local subprocesses. Level 1 agents need a container set.
        """
        if self._running:
            await self.stop()
        self._begin_episode(snapshot, containers)
        npc_cfg = snapshot.npc_traffic

        # In mock mode, skip Docker exec and LLM agent loops
        if self._mock_mode:
            logger.info("NPC manager running in mock mode (no Docker/LLM)")
            return

        topology = snapshot.topology
        # Determine which scripts to run -- derive from topology when
        # the snapshot does not specify scripts explicitly.
        scripts = npc_cfg.scripts or _derive_scripts_from_topology(topology)
        # Resolve environment variables (WEB_HOST, DB_HOST, etc.) from
        # the topology instead of hardcoding host names.
        env_vars = _resolve_env_vars(topology, npc_cfg.rate_lambda)

        for script_name in scripts:
            script_path = _SCRIPT_DIR / script_name
            if not script_path.exists():
                logger.warning("NPC script not found: %s", script_path)
                continue
            container = _container_for_script(script_name, topology)
            logger.info(
                "Starting NPC script: %s in container %s (rate=%s)",
                script_name, container, npc_cfg.rate_lambda,
            )
            if containers is not None:
                # Run script inside the target container via docker exec
                await self._launch_in_container(
                    containers, container, script_name, script_path, env_vars
                )
            else:
                # Fallback: run on host (original behavior)
                await self._launch_on_host(script_name, script_path, env_vars)

        # Level 1 LLM NPCs -- start async agent loops if personas are present
        if npc_cfg.level >= 1 and snapshot.npc_personas and containers is not None:
            from open_range.builder.npc.npc_agent import LLMNPCAgent

            for persona in snapshot.npc_personas:
                agent = LLMNPCAgent(model=self._model)
                task = asyncio.create_task(
                    agent.run_loop(persona, containers, snapshot),
                    name=f"npc_{persona.name}",
                )
                self._tasks.append(task)
                self._npc_agents.append(agent)
                logger.info("Started LLM NPC agent: %s", persona.name)

    async def stop(self) -> None:
        """Stop all NPC traffic generators and agents."""
        # Cancel async NPC agent tasks
        for task in self._tasks:
            task.cancel()
        if self._tasks:
            await asyncio.gather(*self._tasks, return_exceptions=True)
        self._tasks.clear()
        self._npc_agents.clear()

        # Terminate shell script processes (host-mode fallback)
        for proc in self._processes:
            try:
                proc.terminate()
                await asyncio.wait_for(proc.wait(), timeout=5.0)
            except (ProcessLookupError, asyncio.TimeoutError):
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass
        self._processes.clear()

        # Kill background scripts inside containers
        if self._containers is not None:
            for container in set(self._script_containers):
                try:
                    await self._containers.exec(
                        container,
                        "pkill -f 'npc.*traffic' 2>/dev/null || true",
                    )
                except Exception as exc:
                    # Best effort: the container may already be gone.
                    logger.debug(
                        "Failed to stop NPC scripts in container %s: %s",
                        container, exc,
                    )
        self._script_containers.clear()
        self._containers = None

        # Clear channel state
        for ch in self.channels.values():
            ch.clear()
        self._running = False
        logger.info("All NPC traffic stopped.")

    # -----------------------------------------------------------------
    # Synchronous wrappers (for callers without an event loop)
    # -----------------------------------------------------------------
    def start_sync(self, snapshot: SnapshotSpec, containers: ContainerSet | None = None) -> None:
        """Synchronous wrapper around :meth:`start`.

        Without a running event loop this runs :meth:`start` to completion
        via ``asyncio.run``. When called from inside a running loop it can
        neither await nor start a nested loop, so only the synchronous
        subset (channel reset and chat traffic) is performed.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None
        if loop and loop.is_running():
            self._start_sync_inner(snapshot, containers)
        else:
            asyncio.run(self.start(snapshot, containers))

    def stop_sync(self) -> None:
        """Synchronous wrapper around :meth:`stop`.

        Inside a running event loop only the non-awaiting cleanup in
        :meth:`_stop_sync_inner` is performed.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None
        if loop and loop.is_running():
            self._stop_sync_inner()
        else:
            asyncio.run(self.stop())

    def _start_sync_inner(self, snapshot: SnapshotSpec, containers: ContainerSet | None = None) -> None:
        """Synchronous start that avoids asyncio for mock mode and chat traffic."""
        if self._running:
            self._stop_sync_inner()
        self._begin_episode(snapshot, containers)

        if self._mock_mode:
            logger.info("NPC manager running in mock mode (no Docker/LLM)")
            return

        # Scripts and LLM agents need awaiting and are therefore not
        # started on this path; the chat traffic is already available.
        if containers is not None:
            logger.info(
                "NPC live scripts deferred (use async start() for full support)"
            )

    def _stop_sync_inner(self) -> None:
        """Synchronous stop: everything that can be cleaned up without awaiting.

        Agent tasks are cancelled and host-mode processes are sent SIGTERM
        (neither is awaited). Scripts running inside containers cannot be
        stopped from here; use :meth:`stop` for that.
        """
        # Cancel any asyncio tasks that may exist
        for task in self._tasks:
            task.cancel()
        self._tasks.clear()
        self._npc_agents.clear()

        # Do not leak host-mode script processes when dropping their handles.
        for proc in self._processes:
            try:
                proc.terminate()
            except ProcessLookupError:
                pass  # already exited
        self._processes.clear()

        self._script_containers.clear()
        self._containers = None
        for ch in self.channels.values():
            ch.clear()
        self._running = False

    # -----------------------------------------------------------------
    # Traffic log for reward computation
    # -----------------------------------------------------------------
    def get_traffic_log(self) -> list[dict[str, Any]]:
        """Return all NPC activity for reward computation.

        Combines SIEM channel logs with LLM NPC agent action logs, sorted
        by each entry's ``timestamp`` (entries without one sort as 0).
        """
        logs = self.get_siem_log()
        # Append LLM NPC agent actions
        for agent in self._npc_agents:
            try:
                logs.extend(agent.get_actions())
            except Exception as exc:
                # One misbehaving agent must not hide the remaining logs.
                logger.debug("Could not read NPC agent actions: %s", exc)
        logs.sort(key=lambda e: e.get("timestamp", 0))
        return logs

    @property
    def running(self) -> bool:
        """Whether NPC traffic is currently active."""
        return self._running

    def get_siem_log(self) -> list[dict[str, Any]]:
        """Aggregate activity logs from all channels for SIEM consumption.

        Returns the chat, voice and document channel logs merged into one
        list sorted by ``timestamp`` (entries without one sort as 0).
        """
        logs: list[dict[str, Any]] = []
        chat_ch = self.channels.get("chat")
        if isinstance(chat_ch, ChatChannel):
            logs.extend(chat_ch.get_channel_log())
        voice_ch = self.channels.get("voice")
        if isinstance(voice_ch, VoiceChannel):
            logs.extend(voice_ch.get_call_log())
        doc_ch = self.channels.get("document")
        if isinstance(doc_ch, DocumentChannel):
            logs.extend(doc_ch.get_document_log())
        # Sort by timestamp
        logs.sort(key=lambda e: e.get("timestamp", 0))
        return logs