"""Level 0 chat traffic generator. Generates realistic internal chat messages between NPC personas, based on their roles, departments, and daily activities. This is the chat equivalent of http_traffic.sh -- deterministic background noise for the Blue team to filter. """ from __future__ import annotations import random import time from typing import Any from open_range.builder.npc.channels import ChatChannel from open_range.protocols import NPCPersona # Pre-defined message templates keyed by topic category. # Each template is a (sender_role_hint, content) tuple. _CHAT_TEMPLATES: list[dict[str, str]] = [ { "topic": "IT", "messages": [ "Has anyone else had trouble connecting to the VPN this morning?", "The printer on 3rd floor is jammed again.", "Can someone approve my access request for the staging server?", "Reminder: system maintenance window is tonight 11pm-2am.", "Is Jira down for anyone else right now?", ], }, { "topic": "work", "messages": [ "Can you review the Q4 report when you get a chance?", "Meeting moved to 2pm, same room.", "The client wants the deliverable by Friday.", "I've updated the spreadsheet with the latest numbers.", "Anyone have the link to the project wiki?", ], }, { "topic": "security", "messages": [ "Got a weird email from 'IT support' asking me to reset my password. Legit?", "Reminder: security awareness training is due by end of month.", "Please don't share credentials over chat.", "New phishing campaign targeting our domain. Be careful with attachments.", "Password rotation is happening next Monday.", ], }, { "topic": "social", "messages": [ "Anyone want to grab lunch at the new place down the street?", "Happy birthday! Cake in the break room at 3.", "Who's coming to the team outing on Saturday?", "The coffee machine is finally fixed!", "Does anyone have a phone charger I can borrow?", ], }, ] def _pick_message(topic: str | None = None) -> str: """Pick a random chat message, optionally from a specific topic.""" if topic: for group in _CHAT_TEMPLATES: if group["topic"] == topic: return random.choice(group["messages"]) # Random topic group = random.choice(_CHAT_TEMPLATES) return random.choice(group["messages"]) def generate_chat_traffic( personas: list[NPCPersona], channel: ChatChannel, num_messages: int = 10, seed: int | None = None, ) -> list[dict[str, Any]]: """Generate a batch of realistic internal chat messages. Args: personas: List of NPC personas to generate chat between. channel: ChatChannel instance to send messages through. num_messages: Number of messages to generate. seed: Optional random seed for reproducibility. Returns: List of message dicts (same as channel log entries). """ if seed is not None: random.seed(seed) if len(personas) < 2: return [] generated: list[dict[str, Any]] = [] for _ in range(num_messages): # Pick sender and recipient (different people) sender_persona = random.choice(personas) recipient_candidates = [p for p in personas if p.name != sender_persona.name] if not recipient_candidates: continue recipient_persona = random.choice(recipient_candidates) # Pick a topic based on sender's role topic = None if "security" in sender_persona.department.lower(): topic = random.choice(["security", "IT", "work"]) elif "IT" in sender_persona.department.lower(): topic = random.choice(["IT", "work"]) # Otherwise random content = _pick_message(topic) msg = channel.send_message( sender=sender_persona.name, recipient=recipient_persona.name, content=content, ) generated.append({ "sender": msg.sender, "recipient": msg.recipient, "content": msg.content, "timestamp": msg.timestamp, }) return generated