File size: 20,162 Bytes
8c486a8
 
 
c30abe9
 
 
a24d0f2
 
 
 
 
8c486a8
 
 
 
 
a24d0f2
8c486a8
 
 
 
c30abe9
8c486a8
 
 
 
 
 
a24d0f2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bafb155
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a24d0f2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f016eb7
a24d0f2
f016eb7
 
 
a24d0f2
 
 
 
 
 
 
 
 
 
f016eb7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a24d0f2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8c486a8
 
f016eb7
 
 
 
 
 
 
 
 
 
8c486a8
f016eb7
a24d0f2
f016eb7
8c486a8
 
 
a24d0f2
 
 
 
 
8c486a8
c30abe9
 
 
 
 
 
 
a24d0f2
 
 
 
8c486a8
 
 
a24d0f2
8c486a8
 
 
 
 
a24d0f2
 
8c486a8
 
 
 
 
a24d0f2
8c486a8
 
c30abe9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a24d0f2
 
 
 
 
 
 
 
 
 
 
 
 
 
8c486a8
 
 
 
 
 
 
a24d0f2
 
 
 
 
8c486a8
a24d0f2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8c486a8
 
a24d0f2
8c486a8
 
 
f016eb7
8c486a8
f016eb7
8c486a8
 
 
a24d0f2
8c486a8
 
 
 
 
 
 
 
 
 
a24d0f2
8c486a8
a24d0f2
8c486a8
 
 
 
 
 
 
 
 
 
 
a24d0f2
 
 
 
 
 
 
 
 
 
 
 
 
c30abe9
 
 
 
8c486a8
 
 
a24d0f2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8c486a8
 
 
 
c30abe9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
"""NPC traffic orchestrator.

Starts Level 0 shell-script traffic generators and (optionally) Level 1
LLM-driven NPC agents for a given snapshot.  Multimodal NPC channels
(chat, voice, document) are initialised at start and their activity logs
are available for SIEM consumption.

In **mock mode** (``mock_mode=True``), no Docker exec or LLM calls are
made.  Only synthetic chat traffic is generated from the
``chat_traffic`` module, so unit tests can exercise the NPC pipeline
without infrastructure.
"""

from __future__ import annotations

import asyncio
import base64
import logging
from pathlib import Path
from typing import Any

from open_range.builder.npc.channels import ChatChannel, DocumentChannel, VoiceChannel
from open_range.protocols import ContainerSet, SnapshotSpec

logger = logging.getLogger(__name__)

_SCRIPT_DIR = Path(__file__).parent

# ---------------------------------------------------------------------------
# Service keyword mappings used to match script prefixes to topology hosts
# and to resolve well-known env-var roles from service lists.
# ---------------------------------------------------------------------------

# Map a script filename keyword to service keywords that indicate a host
# can run that script.  Order matters for priority within each entry.
_SCRIPT_SERVICE_KEYWORDS: dict[str, list[str]] = {
    "http": ["nginx", "apache", "httpd", "web", "php-fpm"],
    "db": ["mysql", "mariadb", "postgres", "postgresql", "mongodb", "redis"],
    "ssh": ["nmap", "hydra", "nikto", "ssh-client", "attacker", "sshd"],
    "smtp": ["postfix", "sendmail", "exim", "dovecot", "mail"],
}

# Map an env-var role (e.g. WEB_HOST) to service keywords that identify the
# host fulfilling that role.
_ROLE_SERVICE_KEYWORDS: dict[str, list[str]] = {
    "WEB_HOST": ["nginx", "apache", "httpd", "web", "php-fpm"],
    "DB_HOST": ["mysql", "mariadb", "postgres", "postgresql", "mongodb"],
    "MAIL_HOST": ["postfix", "sendmail", "dovecot", "mail"],
    "LDAP_HOST": ["openldap", "ldap", "slapd"],
    "SIEM_HOST": ["rsyslog", "elasticsearch", "siem", "splunk"],
}


def _hosts_from_topology(topology: dict[str, Any]) -> list[dict[str, Any]]:
    """Return normalized host dicts for compiled or manifest-style topology.

    ``compile_manifest_topology()`` canonicalizes ``topology["hosts"]`` to a
    list of host names and keeps the richer metadata in ``host_catalog`` /
    ``host_details``. NPC helpers need the richer dict shape, so normalize the
    compiled form back into ``{"name": ..., "services": ...}`` records here.
    """
    raw_hosts = topology.get("hosts") or []
    host_catalog = topology.get("host_catalog")
    if not isinstance(host_catalog, dict):
        host_catalog = {}
    host_details = topology.get("host_details")
    if not isinstance(host_details, dict):
        host_details = {}

    hosts: list[dict[str, Any]] = []
    seen: set[str] = set()

    def _append_host(raw_host: Any) -> None:
        if isinstance(raw_host, dict):
            name = str(raw_host.get("name", "")).strip()
        else:
            name = str(raw_host).strip()
        if not name or name in seen:
            return

        merged: dict[str, Any] = {}
        catalog_detail = host_catalog.get(name)
        if isinstance(catalog_detail, dict):
            merged.update(catalog_detail)
        detailed_detail = host_details.get(name)
        if isinstance(detailed_detail, dict):
            merged.update(detailed_detail)
        if isinstance(raw_host, dict):
            merged.update(raw_host)

        merged["name"] = name
        services = merged.get("services")
        merged["services"] = list(services) if isinstance(services, list) else []
        seen.add(name)
        hosts.append(merged)

    if isinstance(raw_hosts, list):
        for raw_host in raw_hosts:
            _append_host(raw_host)

    for name in host_catalog:
        _append_host(name)

    for name in host_details:
        _append_host(name)

    return hosts


def _host_matches_keywords(host: dict[str, Any], keywords: list[str]) -> bool:
    """Return True if the host's name or any of its services match *keywords*."""
    host_name = (host.get("name") or "").lower()
    services = [s.lower() for s in (host.get("services") or [])]
    for kw in keywords:
        kw_lower = kw.lower()
        if kw_lower in host_name or any(kw_lower in svc for svc in services):
            return True
    return False


def _container_for_script(script_name: str, topology: dict[str, Any]) -> str:
    """Determine which container a script should run inside.

    Matches the script filename against service keywords in the topology
    hosts.  Falls back to the first host if nothing matches.
    """
    hosts = _hosts_from_topology(topology)
    if not hosts:
        return "web"  # legacy fallback when topology is empty

    for prefix, keywords in _SCRIPT_SERVICE_KEYWORDS.items():
        if prefix in script_name.lower():
            for host in hosts:
                if _host_matches_keywords(host, keywords):
                    return host["name"]
            break  # prefix matched but no host found; fall through

    # Default: first host in topology
    return hosts[0].get("name", "web")


def _resolve_env_vars(topology: dict[str, Any], rate_lambda: float) -> dict[str, str]:
    """Build environment variables by resolving roles and credentials from topology.

    Resolves host roles (WEB_HOST, DB_HOST, etc.) and credentials (DB_USER,
    DB_PASS, SSH_USER, SSH_PASS) from the topology so shell scripts don't
    need hardcoded values.
    """
    hosts = _hosts_from_topology(topology)
    env: dict[str, str] = {"RATE_LAMBDA": str(int(rate_lambda))}

    for role, keywords in _ROLE_SERVICE_KEYWORDS.items():
        for host in hosts:
            if _host_matches_keywords(host, keywords):
                env[role] = host["name"]
                break

    # Pass DB and SSH credentials from topology to shell scripts
    users = topology.get("users", [])
    for user in users:
        if not isinstance(user, dict):
            continue
        hosts_list = user.get("hosts", [])
        if "db" in hosts_list and "DB_USER" not in env:
            env["DB_USER"] = user.get("username", "app_user")
            env["DB_PASS"] = user.get("password", "AppUs3r!2024")
        if any(h in hosts_list for h in ("web", "files", "ldap", "siem")):
            role = user.get("role", "")
            if role in ("admin", "sysadmin", "root") and "SSH_USER" not in env:
                env["SSH_USER"] = user.get("username", "admin")
                env["SSH_PASS"] = user.get("password", "Adm1n!2024")

    return env


def _derive_scripts_from_topology(topology: dict[str, Any]) -> list[str]:
    """Derive available NPC scripts from topology services.

    Scans the topology hosts and checks which script prefixes have a
    matching host.  Only returns scripts that actually exist on disk.
    """
    hosts = _hosts_from_topology(topology)
    scripts: list[str] = []

    for prefix, keywords in _SCRIPT_SERVICE_KEYWORDS.items():
        for host in hosts:
            if _host_matches_keywords(host, keywords):
                candidate = f"{prefix}_traffic.sh"
                if (_SCRIPT_DIR / candidate).exists():
                    scripts.append(candidate)
                break  # one match per prefix is enough

    return scripts


class NPCManager:
    """Start and stop NPC background traffic for a snapshot.

    Args:
        mock_mode: When True, skip Docker exec and LLM calls (unit tests).
        model: LiteLLM model string for Level 1 NPC agents.
            Defaults to ``OPENRANGE_NPC_MODEL`` env var, then
            ``azure/gpt-5.2-codex``.  Any LiteLLM-supported model works
            (e.g. ``openai/gpt-4o``, ``anthropic/claude-haiku-4-5-20251001``,
            ``ollama/llama3``).
    """

    def __init__(self, mock_mode: bool = False, model: str | None = None) -> None:
        self._mock_mode = mock_mode
        self._model = model  # passed to LLMNPCAgent
        self._processes: list[asyncio.subprocess.Process] = []
        self._tasks: list[asyncio.Task[Any]] = []
        self._running = False
        self._npc_agents: list[Any] = []  # LLMNPCAgent instances

        # Containers where scripts were deployed (for cleanup)
        self._script_containers: list[str] = []
        self._containers: ContainerSet | None = None

        # Multimodal NPC communication channels
        self.channels: dict[str, ChatChannel | VoiceChannel | DocumentChannel] = {
            "chat": ChatChannel(),
            "voice": VoiceChannel(),
            "document": DocumentChannel(),
        }

    # -----------------------------------------------------------------
    # Async start / stop (used when an event loop is available)
    # -----------------------------------------------------------------

    async def start(
        self,
        snapshot: SnapshotSpec,
        containers: ContainerSet | None = None,
    ) -> None:
        """Start NPC traffic generators.

        Level 0: shell scripts (http, ssh, db traffic loops).
        Level 1: LLM NPC agents (deferred to npc_agent.py).

        In mock mode, only synthetic chat traffic is generated.
        """
        if self._running:
            await self.stop()

        self._running = True
        self._containers = containers
        npc_cfg = snapshot.npc_traffic

        # Re-initialise channels for the new episode
        self.channels = {
            "chat": ChatChannel(),
            "voice": VoiceChannel(),
            "document": DocumentChannel(),
        }

        # Generate Level 0 chat traffic if personas are available
        if snapshot.npc_personas and len(snapshot.npc_personas) >= 2:
            from open_range.builder.npc.chat_traffic import generate_chat_traffic

            chat_ch = self.channels["chat"]
            assert isinstance(chat_ch, ChatChannel)
            generate_chat_traffic(
                personas=snapshot.npc_personas,
                channel=chat_ch,
                num_messages=10,
            )
            logger.info(
                "Generated %d chat messages for %d personas",
                len(chat_ch.get_channel_log()),
                len(snapshot.npc_personas),
            )

        # In mock mode, skip Docker exec and LLM agent loops
        if self._mock_mode:
            logger.info("NPC manager running in mock mode (no Docker/LLM)")
            return

        topology = snapshot.topology

        # Determine which scripts to run -- derive from topology when
        # the snapshot does not specify scripts explicitly.
        scripts = npc_cfg.scripts or _derive_scripts_from_topology(topology)

        # Resolve environment variables (WEB_HOST, DB_HOST, etc.) from
        # the topology instead of hardcoding host names.
        env_vars = _resolve_env_vars(topology, npc_cfg.rate_lambda)

        for script_name in scripts:
            script_path = _SCRIPT_DIR / script_name
            if not script_path.exists():
                logger.warning("NPC script not found: %s", script_path)
                continue

            container = _container_for_script(script_name, topology)
            logger.info(
                "Starting NPC script: %s in container %s (rate=%s)",
                script_name, container, npc_cfg.rate_lambda,
            )

            if containers is not None:
                # Run script inside the target container via docker exec
                try:
                    script_content = script_path.read_text()
                    encoded = base64.b64encode(script_content.encode()).decode()
                    env_prefix = " ".join(
                        f"{k}={v}" for k, v in env_vars.items()
                    )
                    await containers.exec(
                        container,
                        f"echo {encoded} | base64 -d > /tmp/{script_name} "
                        f"&& chmod +x /tmp/{script_name} "
                        f"&& {env_prefix} nohup bash /tmp/{script_name} "
                        f"> /dev/null 2>&1 &",
                    )
                    self._script_containers.append(container)
                except Exception as exc:
                    logger.warning(
                        "Failed to start NPC script %s in container %s: %s",
                        script_name, container, exc,
                    )
            else:
                # Fallback: run on host (original behavior)
                try:
                    proc = await asyncio.create_subprocess_exec(
                        "bash",
                        str(script_path),
                        stdout=asyncio.subprocess.DEVNULL,
                        stderr=asyncio.subprocess.DEVNULL,
                        env=env_vars,
                    )
                    self._processes.append(proc)
                except OSError as exc:
                    logger.warning("Failed to start NPC script %s: %s", script_name, exc)

        # Level 1 LLM NPCs -- start async agent loops if personas are present
        if npc_cfg.level >= 1 and snapshot.npc_personas and containers is not None:
            from open_range.builder.npc.npc_agent import LLMNPCAgent

            for persona in snapshot.npc_personas:
                agent = LLMNPCAgent(model=self._model)
                task = asyncio.create_task(
                    agent.run_loop(persona, containers, snapshot),
                    name=f"npc_{persona.name}",
                )
                self._tasks.append(task)
                self._npc_agents.append(agent)
                logger.info("Started LLM NPC agent: %s", persona.name)

    async def stop(self) -> None:
        """Stop all NPC traffic generators and agents."""
        # Cancel async NPC agent tasks
        for task in self._tasks:
            task.cancel()
        if self._tasks:
            await asyncio.gather(*self._tasks, return_exceptions=True)
        self._tasks.clear()
        self._npc_agents.clear()

        # Terminate shell script processes (host-mode fallback)
        for proc in self._processes:
            try:
                proc.terminate()
                await asyncio.wait_for(proc.wait(), timeout=5.0)
            except (ProcessLookupError, asyncio.TimeoutError):
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass
        self._processes.clear()

        # Kill background scripts inside containers
        if self._containers is not None:
            for container in set(self._script_containers):
                try:
                    await self._containers.exec(
                        container,
                        "pkill -f 'npc.*traffic' 2>/dev/null || true",
                    )
                except Exception:
                    pass
        self._script_containers.clear()
        self._containers = None

        # Clear channel state
        for ch in self.channels.values():
            ch.clear()

        self._running = False
        logger.info("All NPC traffic stopped.")

    # -----------------------------------------------------------------
    # Synchronous wrappers (for callers without an event loop)
    # -----------------------------------------------------------------

    def start_sync(self, snapshot: SnapshotSpec, containers: ContainerSet | None = None) -> None:
        """Synchronous wrapper around :meth:`start`.

        Uses the running event loop if available, otherwise creates a new one.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        if loop and loop.is_running():
            # We're inside an async context -- schedule and return.
            # Since we can't await here, run the coroutine eagerly using
            # loop.run_until_complete which won't work if a loop is running.
            # Instead, just call the sync-safe parts directly.
            self._start_sync_inner(snapshot, containers)
        else:
            asyncio.run(self.start(snapshot, containers))

    def stop_sync(self) -> None:
        """Synchronous wrapper around :meth:`stop`."""
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        if loop and loop.is_running():
            self._stop_sync_inner()
        else:
            asyncio.run(self.stop())

    def _start_sync_inner(self, snapshot: SnapshotSpec, containers: ContainerSet | None = None) -> None:
        """Synchronous start that avoids asyncio for mock mode and chat traffic."""
        if self._running:
            self._stop_sync_inner()

        self._running = True
        self._containers = containers
        # Re-initialise channels for the new episode
        self.channels = {
            "chat": ChatChannel(),
            "voice": VoiceChannel(),
            "document": DocumentChannel(),
        }

        # Generate Level 0 chat traffic if personas are available
        if snapshot.npc_personas and len(snapshot.npc_personas) >= 2:
            from open_range.builder.npc.chat_traffic import generate_chat_traffic

            chat_ch = self.channels["chat"]
            assert isinstance(chat_ch, ChatChannel)
            generate_chat_traffic(
                personas=snapshot.npc_personas,
                channel=chat_ch,
                num_messages=10,
            )
            logger.info(
                "Generated %d chat messages for %d personas",
                len(chat_ch.get_channel_log()),
                len(snapshot.npc_personas),
            )

        if self._mock_mode:
            logger.info("NPC manager running in mock mode (no Docker/LLM)")
            return

        # In live mode with an active event loop, schedule async start
        # for scripts and LLM agents. This is best-effort -- if it
        # fails, the chat traffic is already available.
        if containers is not None:
            logger.info(
                "NPC live scripts deferred (use async start() for full support)"
            )

    def _stop_sync_inner(self) -> None:
        """Synchronous stop for mock mode (no async cleanup needed)."""
        # Cancel any asyncio tasks that may exist
        for task in self._tasks:
            task.cancel()
        self._tasks.clear()
        self._npc_agents.clear()
        self._processes.clear()
        self._script_containers.clear()
        self._containers = None

        for ch in self.channels.values():
            ch.clear()

        self._running = False

    # -----------------------------------------------------------------
    # Traffic log for reward computation
    # -----------------------------------------------------------------

    def get_traffic_log(self) -> list[dict[str, Any]]:
        """Return all NPC activity for reward computation.

        Combines SIEM channel logs with LLM NPC agent action logs.
        """
        logs = self.get_siem_log()

        # Append LLM NPC agent actions
        for agent in self._npc_agents:
            try:
                logs.extend(agent.get_actions())
            except Exception:
                pass

        logs.sort(key=lambda e: e.get("timestamp", 0))
        return logs

    @property
    def running(self) -> bool:
        """Whether NPC traffic is currently active."""
        return self._running

    def get_siem_log(self) -> list[dict[str, Any]]:
        """Aggregate activity logs from all channels for SIEM consumption."""
        logs: list[dict[str, Any]] = []
        chat_ch = self.channels.get("chat")
        if isinstance(chat_ch, ChatChannel):
            logs.extend(chat_ch.get_channel_log())
        voice_ch = self.channels.get("voice")
        if isinstance(voice_ch, VoiceChannel):
            logs.extend(voice_ch.get_call_log())
        doc_ch = self.channels.get("document")
        if isinstance(doc_ch, DocumentChannel):
            logs.extend(doc_ch.get_document_log())
        # Sort by timestamp
        logs.sort(key=lambda e: e.get("timestamp", 0))
        return logs