# open-range/src/open_range/builder/npc/npc_agent.py
# Aaron Brown
# Remove hardcoded fallbacks, add snapshot-driven service lifecycle
# 7fedc25
"""LLM-driven NPC agent (Level 1).
Each NPC autonomously lives their workday -- browsing pages, emailing
colleagues, querying records, accessing shares. Available actions are
derived from the SnapshotSpec so they adapt to whatever environment the
Builder LLM generated. NPCs also react to incoming stimuli (phishing,
social engineering) based on their security_awareness profile.
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import random
import re
import shlex
import time
from typing import Any
import litellm
from open_range.protocols import ContainerSet, NPCAction, NPCPersona, SnapshotSpec, Stimulus
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Prompts
# ---------------------------------------------------------------------------
# System prompt for the proactive path (LLMNPCAgent.next_routine_action).
# It asks the model for a JSON object with "action", "target", "detail" and
# "email_body" keys; run_loop reads exactly those keys from the reply.
# The text below is sent to the model verbatim -- edit with care.
NPC_ROUTINE_PROMPT = """\
You are simulating a corporate employee going about their normal workday. \
You will receive your persona, the available resources in this environment, \
and your recent activity.
Pick the NEXT natural work action. Stay in character for your role.
Return valid JSON:
{
"action": "<browse|send_email|lookup|access_share|login|query_db|idle>",
"target": "<a specific target from the available resources>",
"detail": "<1-sentence description of what you're doing and why>",
"email_body": "<if send_email, the message; otherwise empty>"
}
Guidelines:
- Pick targets from the AVAILABLE RESOURCES provided (real pages, shares, users)
- Vary your actions -- don't repeat the same thing twice in a row
- Be mundane and realistic: checking records, status updates, reviewing docs
- Your role determines what you'd naturally do (clinical staff look up records, \
admin checks billing, IT reviews systems, executives read reports)
"""
# System prompt for the reactive path (LLMNPCAgent.decide).
# It asks the model for a JSON object with "action", "response_content" and
# "side_effects" keys, which decide() copies into an NPCAction.
# The text below is sent to the model verbatim -- edit with care.
NPC_REACT_PROMPT = """\
You are simulating an employee who received an unexpected message. \
Based on your security_awareness and susceptibility, decide how to respond. \
Stay in character. Return valid JSON:
{
"action": "<click_link|open_attachment|reply|share_credentials|ignore|report_to_IT|forward>",
"response_content": "<your reply text if replying, otherwise empty>",
"side_effects": ["<what happens as a result>"]
}
- security_awareness > 0.7: verify sender, check URLs, report suspicious messages
- security_awareness < 0.3: trusting, clicks links, may share credentials if asked
"""
class LLMNPCAgent:
    """Async NPC agent that autonomously lives its workday via LLM.

    The agent has two modes:

    * reactive -- ``decide`` asks the LLM how the persona responds to a
      ``Stimulus`` (e.g. an incoming email);
    * proactive -- ``next_routine_action`` asks the LLM for the next mundane
      work action, and ``run_loop`` executes those actions forever while
      polling the mail host for new messages.

    Every executed action's log entry is appended to an in-memory list that
    ``get_actions`` exposes.
    """

    def __init__(
        self,
        model: str | None = None,
        temperature: float = 0.3,
    ) -> None:
        """Configure the model and sampling temperature.

        Args:
            model: litellm model identifier.  When falsy, the
                ``OPENRANGE_NPC_MODEL`` environment variable is used, and
                failing that ``"azure/gpt-5.2-codex"``.
            temperature: Sampling temperature passed to the LLM.  It is
                dropped (stored as ``None`` and never sent) when the model
                name contains ``"codex"``.
        """
        self.model = model or os.environ.get(
            "OPENRANGE_NPC_MODEL", "azure/gpt-5.2-codex"
        )
        # NOTE(review): presumably codex models reject a temperature
        # parameter -- confirm against the provider documentation.
        if "codex" in self.model.lower():
            self.temperature: float | None = None
        else:
            self.temperature = temperature
        self._actions: list[dict[str, Any]] = []

    def get_actions(self) -> list[dict[str, Any]]:
        """Return all recorded NPC actions for SIEM consumption.

        A shallow copy is returned, so callers may mutate the list itself
        without affecting the agent's internal record.
        """
        return list(self._actions)

    # ------------------------------------------------------------------
    # LLM plumbing
    # ------------------------------------------------------------------
    async def _complete_json(self, system_prompt: str, user_payload: str) -> Any:
        """Send one system+user exchange to the LLM and parse the JSON reply.

        Raises whatever ``litellm.acompletion`` or ``json.loads`` raise;
        callers decide how to degrade.
        """
        kwargs: dict[str, Any] = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_payload},
            ],
            "response_format": {"type": "json_object"},
        }
        if self.temperature is not None:
            kwargs["temperature"] = self.temperature
        response = await litellm.acompletion(**kwargs)
        return json.loads(response.choices[0].message.content)

    # ------------------------------------------------------------------
    # Reactive: respond to external stimulus
    # ------------------------------------------------------------------
    async def decide(self, persona: NPCPersona, stimulus: Stimulus) -> NPCAction:
        """Decide how to respond to a stimulus (NPCBehavior protocol).

        Args:
            persona: The NPC whose reaction is being simulated.
            stimulus: The incoming message or event.

        Returns:
            The action chosen by the LLM.  On any failure (LLM error,
            malformed reply, validation error) the failure is logged and an
            ``"ignore"`` action is returned instead of raising.
        """
        try:
            user_payload = (
                "Respond as this NPC employee in valid JSON.\n\n"
                + json.dumps({
                    "persona": persona.model_dump(),
                    "stimulus": stimulus.model_dump(),
                })
            )
            raw = await self._complete_json(NPC_REACT_PROMPT, user_payload)
            return NPCAction(
                action=raw.get("action", "ignore"),
                response_content=raw.get("response_content", ""),
                side_effects=raw.get("side_effects", []),
            )
        except Exception as exc:
            logger.warning("NPC %s react failed: %s", persona.name, exc)
            return NPCAction(action="ignore")

    # ------------------------------------------------------------------
    # Proactive: what to do next at work (derived from snapshot)
    # ------------------------------------------------------------------
    async def next_routine_action(
        self, persona: NPCPersona, env_context: dict[str, Any],
    ) -> dict[str, str]:
        """Ask LLM what this NPC would naturally do next.

        Args:
            persona: The NPC picking an action; only name, role and
                department are sent to the model.
            env_context: Available resources (pages, shares, ...) derived
                from the SnapshotSpec so the LLM picks real targets.

        Returns:
            A dict with the keys requested by ``NPC_ROUTINE_PROMPT``.  If the
            LLM call fails, or the reply is not a JSON object, the result of
            ``_fallback_action`` is returned instead.
        """
        # The last five actions give the model enough context to vary itself.
        recent = [
            f"{a.get('action','?')}: {a.get('detail','')}"
            for a in self._actions[-5:]
        ]
        try:
            user_payload = (
                "Pick this employee's next work action in valid JSON.\n\n"
                + json.dumps({
                    "persona": {
                        "name": persona.name,
                        "role": persona.role,
                        "department": persona.department,
                    },
                    "available_resources": env_context,
                    "recent_actions": recent,
                })
            )
            parsed = await self._complete_json(NPC_ROUTINE_PROMPT, user_payload)
            # Callers use ``.get`` on the result; a JSON list/string/number
            # would blow up there, so treat it like any other LLM failure.
            if not isinstance(parsed, dict):
                raise ValueError(
                    f"expected a JSON object, got {type(parsed).__name__}"
                )
            return parsed
        except Exception as exc:
            logger.debug("NPC %s routine LLM failed: %s", persona.name, exc)
            return _fallback_action(persona, env_context)

    # ------------------------------------------------------------------
    # Mailbox helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _mail_user_for(persona: NPCPersona) -> str:
        """Derive a shell/path-safe mailbox user name for *persona*.

        Uses the local part of ``persona.accounts["email"]`` when it contains
        an ``@``; otherwise the first word of the lower-cased name.  Every
        character outside ``[a-zA-Z0-9._-]`` is replaced by ``_``.

        Returns:
            The sanitized name, or ``""`` when nothing usable can be derived
            (empty name / empty local part / a dots-only value such as
            ``..`` that would act as a path component).
        """
        email_acct = persona.accounts.get("email") or ""
        if "@" in email_acct:
            candidate = email_acct.split("@")[0]
        else:
            name_parts = persona.name.lower().split()
            candidate = name_parts[0] if name_parts else ""
        # Substitute unconditionally: a ``^...$`` pre-check would let a
        # trailing newline through because ``$`` matches just before it.
        candidate = re.sub(r"[^a-zA-Z0-9._-]", "_", candidate)
        if not candidate.strip("."):
            return ""
        return candidate

    @staticmethod
    def _parse_mail_headers(content: str) -> tuple[str, str]:
        """Extract ``(sender, subject)`` from the header section of a message.

        Only the first 20 lines are examined and scanning stops at the first
        blank line after the headers begin, so ``From:``/``Subject:`` text
        inside the body cannot override the real headers.  Missing headers
        yield ``"unknown"`` and ``"Incoming message"`` respectively.
        """
        sender = "unknown"
        subject = "Incoming message"
        in_headers = False
        for line in content.split("\n")[:20]:
            if not line.strip():
                if in_headers:
                    break  # blank line terminates the header section
                continue  # tolerate leading blank lines
            in_headers = True
            lowered = line.lower()
            if lowered.startswith("from:"):
                sender = line.split(":", 1)[1].strip()
            elif lowered.startswith("subject:"):
                subject = line.split(":", 1)[1].strip()
        return sender, subject

    async def _check_mailbox(
        self,
        persona: NPCPersona,
        containers: ContainerSet,
        executor: Any,
        mail_user: str,
    ) -> None:
        """Poll the mail host for new messages and react to up to three.

        Each message found is turned into an email ``Stimulus``, passed to
        ``decide``, executed, and recorded with ``reactive=True``.  Any error
        is logged at debug level and swallowed: mail polling is best-effort
        and must not interrupt the routine loop.
        """
        try:
            safe_mail_user = shlex.quote(mail_user)
            mail_host = executor._host_mail
            # Files newer than the per-user marker are treated as new mail.
            # Presumably the first cycle finds nothing because the marker
            # does not exist yet and find's error output is discarded.
            # NOTE(review): this scans every file under the shared spool
            # directories, not only this NPC's mailbox -- confirm intended.
            mail_output = await containers.exec(
                mail_host,
                f"{{ find /var/spool/mail/ /var/mail/ "
                f"/home/{safe_mail_user}/Maildir/new/ "
                f"-newer /tmp/.npc_check_{safe_mail_user} "
                f"-type f 2>/dev/null || true; }} | head -3",
            )
            await containers.exec(mail_host, f"touch /tmp/.npc_check_{safe_mail_user}")
            if not mail_output or not mail_output.strip():
                return
            for email_file in mail_output.strip().split("\n")[:3]:
                email_file = email_file.strip()
                if not email_file:
                    continue
                content = await containers.exec(
                    mail_host, f"head -50 {shlex.quote(email_file)} 2>/dev/null || true",
                )
                if not content or not content.strip():
                    continue
                sender, subject = self._parse_mail_headers(content)
                stimulus = Stimulus(
                    type="email",
                    sender=sender,
                    subject=subject,
                    content=content[:500],
                    plausibility=0.7,
                )
                react = await self.decide(persona, stimulus)
                react_log = await executor.execute(persona, react)
                react_log["stimulus_type"] = "email"
                react_log["reactive"] = True
                self._actions.append(react_log)
        except Exception as mail_exc:
            logger.debug("NPC %s mail check: %s", persona.name, mail_exc)

    # ------------------------------------------------------------------
    # Main loop
    # ------------------------------------------------------------------
    async def run_loop(
        self,
        persona: NPCPersona,
        containers: ContainerSet,
        snapshot: SnapshotSpec,
    ) -> None:
        """Run the NPC's autonomous workday until the task is cancelled.

        Each cycle:
        1. Pick and execute a routine work action
        2. Check mailbox for incoming stimuli (phishing)
        3. React to any stimuli found

        The pause between cycles is ``persona.routine["action_interval_min"]``
        minutes (default 2) with +/-30% jitter.  An unexpected error in a
        cycle is logged and followed by a 30 second back-off.  Cancellation
        at any point -- including during the back-off -- is logged and ends
        the loop with a normal return.
        """
        # Imported lazily, as in the original; presumably to avoid an import
        # cycle with the actions module -- TODO confirm.
        from open_range.builder.npc.actions import NPCActionExecutor

        executor = NPCActionExecutor(containers, snapshot)
        # Build environment context once from snapshot
        env_context = {
            "pages": executor._pages,
            "shares": executor._shares,
            "db_tables": executor._db_tables,
            "colleagues": executor._users,
        }
        mail_user = self._mail_user_for(persona)
        if not mail_user:
            logger.warning(
                "NPC %s has no usable mailbox name; mailbox checks disabled",
                persona.name,
            )
        base_interval = persona.routine.get("action_interval_min", 2)
        interval_s = base_interval * 60
        logger.info(
            "NPC %s (%s) starting workday (every %dm, %d pages, %d shares)",
            persona.name, persona.role, base_interval,
            len(env_context["pages"]), len(env_context["shares"]),
        )
        try:
            while True:
                try:
                    # --- Phase 1: Routine work action ---
                    routine = await self.next_routine_action(persona, env_context)
                    log_entry = await executor.execute_routine(
                        persona,
                        routine.get("action", "idle"),
                        routine.get("target", ""),
                        routine.get("detail", ""),
                        routine.get("email_body", ""),
                    )
                    self._actions.append(log_entry)
                    logger.debug("NPC %s: %s", persona.name, log_entry.get("detail", ""))
                    # --- Phase 2: Check mailbox for incoming stimuli ---
                    # Red may send real phishing emails via SMTP.
                    if mail_user:
                        await self._check_mailbox(
                            persona, containers, executor, mail_user
                        )
                    # --- Sleep with jitter ---
                    await asyncio.sleep(interval_s * random.uniform(0.7, 1.3))
                except Exception as exc:
                    logger.warning("NPC %s loop error: %s", persona.name, exc)
                    await asyncio.sleep(30)
        except asyncio.CancelledError:
            # Wrapping the whole loop means a cancel during the error
            # back-off sleep is handled the same way as any other cancel.
            logger.info("NPC %s workday ended", persona.name)
# ---------------------------------------------------------------------------
# Fallback routine (no LLM, picks from snapshot-derived resources)
# ---------------------------------------------------------------------------
def _fallback_action(persona: NPCPersona, env: dict[str, Any]) -> dict[str, str]:
"""Pick a routine action without LLM, using available resources."""
pages = env.get("pages", ["/"])
shares = env.get("shares", ["general"])
colleagues = env.get("colleagues", [])
actions = [
{"action": "browse", "target": random.choice(pages) if pages else "/", "detail": "Checking portal"},
{"action": "browse", "target": random.choice(pages) if pages else "/", "detail": "Reviewing page"},
{"action": "idle", "target": "", "detail": "Reading documents at desk"},
]
if shares:
actions.append({"action": "access_share", "target": random.choice(shares), "detail": "Checking files"})
if colleagues:
actions.append({"action": "send_email", "target": random.choice(colleagues), "detail": "Status update", "email_body": "Quick check-in on today's items."})
return random.choice(actions)
# ---------------------------------------------------------------------------
# Simpler behavior classes (Level 0, no LLM)
# ---------------------------------------------------------------------------
class NullNPCBehavior:
    """Level 0 behavior: the NPC never reacts to anything."""

    async def decide(self, persona: NPCPersona, stimulus: Stimulus) -> NPCAction:
        """Return an ``"ignore"`` action regardless of persona or stimulus."""
        ignored = NPCAction(action="ignore")
        return ignored
class RuleBasedNPCBehavior:
    """Threshold-based NPC reactions driven by susceptibility scores (no LLM)."""

    async def decide(self, persona: NPCPersona, stimulus: Stimulus) -> NPCAction:
        """Choose a reaction from plausibility x susceptibility.

        The susceptibility is looked up by ``stimulus.type``, falling back to
        the persona's ``"phishing_email"`` value and finally to 0.5.  The
        resulting score is checked in this order:

        * security_awareness > 0.7 and score < 0.8 -> report to IT
        * score > 0.6 -> click the link
        * score > 0.3 -> ignore
        * otherwise -> report to IT
        """
        profile = persona.susceptibility
        generic_weight = profile.get("phishing_email", 0.5)
        weight = profile.get(stimulus.type, generic_weight)
        score = stimulus.plausibility * weight

        # Guard clauses, evaluated in the same order as the threshold ladder.
        if persona.security_awareness > 0.7 and score < 0.8:
            return NPCAction(
                action="report_to_IT",
                side_effects=["reported suspicious email to IT"],
            )
        if score > 0.6:
            return NPCAction(
                action="click_link",
                side_effects=["clicked link in email"],
            )
        if score > 0.3:
            return NPCAction(action="ignore")
        return NPCAction(
            action="report_to_IT",
            side_effects=["forwarded to security team"],
        )