| |
| |
| |
| |
| |
|
|
| """ |
| Procedural Task Generator for CyberSOCEnv. |
| |
| Generates 1000+ unique, deterministic attack scenarios from a task_id seed. |
| Each task_id (e.g. 'gen_0001') always produces the exact same scenario. |
| |
| Design: |
| - hash(task_id) -> deterministic seed -> random.Random instance |
| - Seed drives ALL choices: attack type, hosts, processes, IOCs, alerts |
| - 12 attack categories, 50+ malware names, 40+ C2 domains |
| - 3 difficulty tiers based on task number |
| |
| No actual randomness — reproducible across runs and platforms. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import hashlib |
| import random |
| import itertools |
| from typing import Any, Dict, List, Tuple |
|
|
| |
| |
| |
|
|
| INCOMPATIBLE_THREATS = { |
| ("ransomware", "cryptomining"), |
| ("data_exfiltration", "ransomware"), |
| } |
|
|
| def validate_task_def(task_def: dict) -> List[str]: |
| errors = [] |
| compromised_hosts = set() |
| for threat in task_def.get("attack_chain", []): |
| compromised_hosts.update(threat.get("compromised_hosts", [])) |
|
|
| reqs = task_def.get("containment_requirements", {}) |
| for req in reqs.get("must_kill", []): |
| hostname = req.get("hostname") if isinstance(req, dict) else req.split(":")[0] |
| if hostname not in compromised_hosts: |
| errors.append(f"must_kill references non-compromised host: {hostname}") |
|
|
| for host in reqs.get("must_isolate", []): |
| if host not in compromised_hosts: |
| errors.append(f"must_isolate references non-compromised host: {host}") |
|
|
| return errors |
|
|
|
|
| |
| |
| |
|
|
| |
| MALWARE_PROCESSES = { |
| "ransomware": [ |
| "cryptolocker.exe", "wannacry.exe", "blackcat_ransom.exe", |
| "lockbit3.exe", "revil_encrypt.exe", "hive_locker.exe", |
| "conti_crypt.exe", "ryuk_payload.exe", "maze_encrypt.exe", |
| "darkside_enc.exe", "babuk_lock.exe", "avaddon_crypt.exe", |
| ], |
| "phishing": [ |
| "outlook_macro.exe", "word_dropper.exe", "macro_loader.exe", |
| "vba_agent.exe", "pdf_exploit.exe", "html_smuggler.exe", |
| "iso_mounter.exe", "lnk_runner.exe", |
| ], |
| "credential_theft": [ |
| "mimikatz.exe", "lazagne.exe", "hashdump.exe", |
| "procdump_lsass.exe", "rubeus.exe", "kerbrute.exe", |
| "sharphound.exe", "bloodhound_collect.exe", |
| ], |
| "lateral_movement": [ |
| "svchost_backdoor.exe", "psexec_svc.exe", "wmic_lateral.exe", |
| "rdp_hijack.exe", "ssh_brute.exe", "evil_winrm.exe", |
| "dcom_exec.exe", "smb_relay.exe", |
| ], |
| "c2_communication": [ |
| "svchost_c2.exe", "cobalt_beacon.exe", "sliver_implant.exe", |
| "meterpreter.exe", "covenant_grunt.exe", "mythic_agent.exe", |
| "dns_tunnel.exe", "icmp_beacon.exe", |
| ], |
| "privilege_escalation": [ |
| "exploit_kernel.exe", "potato_exploit.exe", "uac_bypass.exe", |
| "printspoofer.exe", "juicy_potato.exe", "named_pipe_exploit.exe", |
| "token_impersonate.exe", "dll_hijack.exe", |
| ], |
| "data_exfiltration": [ |
| "data_pump.exe", "rclone_sync.exe", "mega_upload.exe", |
| "ftp_exfil.exe", "dns_exfil.exe", "cloud_sync_mal.exe", |
| "archive_send.exe", "stealer_agent.exe", |
| ], |
| "cryptomining": [ |
| "xmrig_miner.exe", "ethminer.exe", "cpuminer.exe", |
| "nicehash_mal.exe", "coinhive_svc.exe", "monero_mine.exe", |
| ], |
| "supply_chain": [ |
| "update_agent_mal.exe", "npm_backdoor.exe", "pip_trojan.exe", |
| "vscode_ext_mal.exe", "docker_implant.exe", "nuget_poison.exe", |
| ], |
| "insider_threat": [ |
| "usb_copy.exe", "screen_capture.exe", "keylogger_svc.exe", |
| "email_forward.exe", "cloud_upload.exe", "print_spooler_mal.exe", |
| ], |
| "webshell": [ |
| "cmd_webshell.php", "asp_backdoor.exe", "jsp_shell.exe", |
| "python_rshell.exe", "nodejs_shell.exe", "perl_cgi_shell.exe", |
| ], |
| "botnet": [ |
| "mirai_bot.exe", "emotet_loader.exe", "trickbot_svc.exe", |
| "qbot_agent.exe", "dridex_dll.exe", "zloader_inject.exe", |
| ], |
| } |
|
|
| |
| C2_DOMAINS = [ |
| "cdn-update.malware-c2.net", "api.darkc2.io", "telemetry-svc.ru", |
| "secure-update.evil.net", "cdn.payload-delivery.com", "api.shadownet.io", |
| "sync.cloud-c2.xyz", "update.legit-looking.com", "beacon.covert-ops.net", |
| "dns.tunnel-relay.org", "img.cdn-malware.com", "static.evil-cdn.net", |
| "api.stealthc2.io", "ws.encrypted-relay.net", "feed.darkweb-proxy.com", |
| "auth.phish-server.net", "login.fake-portal.com", "mail.spoof-relay.org", |
| "git.supply-chain.dev", "npm.compromised-pkg.io", "pypi.trojan-lib.org", |
| "dl.ransomware-pay.onion", "tor.exit-node-c2.net", "i2p.covert-chan.net", |
| "iot.botnet-c2.xyz", "cam.mirai-variant.net", "mqtt.iot-exploit.io", |
| "ftp.exfil-server.ru", "sftp.data-steal.com", "mega.cloud-drop.io", |
| "gist.code-exfil.dev", "paste.data-dump.xyz", "bin.steganography.net", |
| "vpn.tunnel-c2.com", "proxy.relay-beacon.org", "socks.covert-proxy.io", |
| "wpad.evil-config.net", "ntp.time-beacon.com", "ldap.ad-exploit.org", |
| "kerberos.ticket-steal.net", |
| ] |
|
|
| |
| C2_IPS = [ |
| "198.51.100.10", "198.51.100.22", "198.51.100.33", "198.51.100.44", |
| "198.51.100.55", "198.51.100.66", "198.51.100.77", "198.51.100.88", |
| "198.51.100.99", "198.51.100.110", "198.51.100.121", "198.51.100.132", |
| "203.0.113.10", "203.0.113.21", "203.0.113.32", "203.0.113.43", |
| "203.0.113.54", "203.0.113.65", "203.0.113.76", "203.0.113.87", |
| "203.0.113.98", "203.0.113.109", "203.0.113.120", "203.0.113.131", |
| "192.0.2.10", "192.0.2.21", "192.0.2.32", "192.0.2.43", |
| "192.0.2.54", "192.0.2.65", "192.0.2.76", "192.0.2.87", |
| "100.64.0.10", "100.64.0.22", "100.64.0.33", "100.64.0.44", |
| ] |
|
|
| |
| SUBNETS = { |
| "corporate": {"prefix": "WS", "count": 90, "criticality": 0.3}, |
| "engineering": {"prefix": "DEV", "count": 36, "criticality": 0.5}, |
| "finance": {"prefix": "FIN", "count": 14, "criticality": 0.8}, |
| "dmz": {"prefix": "DMZ", "count": 3, "criticality": 0.6}, |
| "datacenter": {"prefix": "SRV", "count": 20, "criticality": 0.9}, |
| "executive": {"prefix": "EXEC", "count": 5, "criticality": 1.0}, |
| } |
|
|
| |
| ATTACK_PHASES = [ |
| "initial_access", "execution", "persistence", "privilege_escalation", |
| "credential_access", "lateral_movement", "command_and_control", |
| "exfiltration", "impact", |
| ] |
|
|
| |
| ALERT_TEMPLATES = { |
| "ransomware": [ |
| "EDR detected file encryption activity on {host}. Process '{proc}' is encrypting files in user directories.", |
| "Anomalous file system activity: {count} files renamed with .{ext} extension in {secs} seconds on {host}.", |
| "Ransomware signature detected in process '{proc}' on {host}. Volume shadow copies being deleted.", |
| ], |
| "phishing": [ |
| "User on {host} clicked suspicious link in email. {proc} execution detected downloading payload from {domain}.", |
| "Macro-enabled document opened on {host}. Outbound connection to {domain} detected.", |
| "Suspicious email attachment executed on {host}. Process '{proc}' spawned child processes.", |
| ], |
| "credential_theft": [ |
| "LSASS memory access detected on {host} — possible credential dumping via {proc}.", |
| "Kerberos ticket request anomaly on {host}. Process '{proc}' attempting ticket manipulation.", |
| "SAM database access detected on {host}. Credential harvesting tool '{proc}' identified.", |
| ], |
| "lateral_movement": [ |
| "Suspicious RDP login to {host} from compromised source using admin credentials. Process '{proc}' spawned.", |
| "SMB lateral movement detected: '{proc}' deployed on {host} via remote service creation.", |
| "WMI remote execution detected on {host}. Process '{proc}' launched from external host.", |
| ], |
| "c2_communication": [ |
| "Periodic beaconing detected from {host} to {ip} every {interval} seconds. Encrypted payload exchange observed.", |
| "DNS tunneling activity from {host}. Suspicious queries to {domain} with encoded payloads.", |
| "Cobalt Strike beacon profile detected on {host}. Process '{proc}' communicating with {ip}.", |
| ], |
| "privilege_escalation": [ |
| "Kernel exploit attempt on {host}. Process '{proc}' gained SYSTEM privileges.", |
| "UAC bypass detected on {host}. Process '{proc}' elevated to admin without user consent.", |
| "Token impersonation attack on {host}. Process '{proc}' obtained domain admin token.", |
| ], |
| "data_exfiltration": [ |
| "Large data transfer ({size} GB) to external IP {ip} from {host}. Possible exfiltration of {data_type}.", |
| "Staging activity detected on {host}. Process '{proc}' archiving sensitive directories for extraction.", |
| "Cloud storage upload from {host} to unauthorized account. Process '{proc}' transferring {data_type}.", |
| ], |
| "cryptomining": [ |
| "High CPU usage (98%) on {host}. Process '{proc}' identified as cryptocurrency miner.", |
| "Mining pool connection from {host} to {ip}:{port}. Process '{proc}' consuming all available cores.", |
| "Stratum protocol detected on {host}. Unauthorized mining process '{proc}' active.", |
| ], |
| "supply_chain": [ |
| "Compromised package detected in CI/CD pipeline on {host}. Process '{proc}' executing post-install scripts.", |
| "Backdoored update agent on {host}. Process '{proc}' downloading payloads from {domain}.", |
| "Malicious dependency loaded on {host}. Process '{proc}' establishing covert communication channels.", |
| ], |
| "insider_threat": [ |
| "Unusual data access pattern on {host}. Process '{proc}' accessing files outside user's normal scope.", |
| "USB mass storage device connected on {host}. Process '{proc}' copying sensitive files to removable media.", |
| "After-hours bulk file download on {host}. Process '{proc}' archiving {data_type} documents.", |
| ], |
| "webshell": [ |
| "Web shell detected on {host}. Process '{proc}' executing system commands via HTTP POST requests.", |
| "Suspicious file upload on {host}. Process '{proc}' created in web-accessible directory with bash capabilities.", |
| "Remote code execution on {host}. Process '{proc}' spawned from web server with SYSTEM context.", |
| ], |
| "botnet": [ |
| "Bot agent detected on {host}. Process '{proc}' joining command pool at {ip}.", |
| "DDoS toolkit loaded on {host}. Process '{proc}' ready to receive attack instructions from {domain}.", |
| "Worm propagation from {host}. Process '{proc}' scanning network for vulnerable hosts.", |
| ], |
| } |
|
|
| |
| SEVERITIES = ["low", "medium", "high", "critical"] |
| SEVERITY_WEIGHTS = {"easy": [0.1, 0.4, 0.4, 0.1], "medium": [0.0, 0.2, 0.5, 0.3], "hard": [0.0, 0.1, 0.3, 0.6]} |
|
|
| |
| DATA_TYPES = [ |
| "customer PII", "financial records", "employee credentials", |
| "source code", "trade secrets", "medical records", |
| "encryption keys", "database backups", "API tokens", |
| "board meeting minutes", "M&A documents", "patent filings", |
| ] |
|
|
| |
| RANSOM_EXTENSIONS = [ |
| "locked", "encrypted", "crypted", "crypt", "enc", "pay", |
| "ransom", "darkside", "blackcat", "hive", "lockbit", "ryuk", |
| ] |
|
|
|
|
| |
| |
| |
|
|
| def _seed_from_task_id(task_id: str) -> int: |
| """Create a deterministic integer seed from a task_id string.""" |
| h = hashlib.sha256(task_id.encode("utf-8")).hexdigest() |
| return int(h[:16], 16) |
|
|
|
|
| def _make_hash(rng: random.Random) -> str: |
| """Generate a fake MD5-like hash deterministically.""" |
| return "".join(rng.choice("0123456789abcdef") for _ in range(32)) |
|
|
|
|
| |
| |
| |
|
|
| def _get_difficulty(task_id: str, rng: random.Random) -> str: |
| """Determine difficulty from task_id pattern or seed.""" |
| |
| if task_id.startswith("easy_") or task_id.startswith("gen_easy_"): |
| return "easy" |
| if task_id.startswith("medium_") or task_id.startswith("gen_medium_"): |
| return "medium" |
| if task_id.startswith("hard_") or task_id.startswith("gen_hard_"): |
| return "hard" |
|
|
| |
| if task_id.startswith("gen_"): |
| try: |
| num = int(task_id.split("_")[1]) |
| if num <= 333: |
| return "easy" |
| elif num <= 666: |
| return "medium" |
| else: |
| return "hard" |
| except (ValueError, IndexError): |
| pass |
|
|
| |
| return rng.choice(["easy", "medium", "hard"]) |
|
|
|
|
| |
| |
| |
|
|
| def _pick_hosts(rng: random.Random, subnet: str, count: int) -> List[str]: |
| """Pick `count` unique host names from a subnet.""" |
| info = SUBNETS[subnet] |
| prefix = info["prefix"] |
| max_idx = info["count"] |
| indices = rng.sample(range(1, max_idx + 1), min(count, max_idx)) |
| return [f"{prefix}-{idx:03d}" for idx in indices] |
|
|
|
|
| def _pick_subnets(rng: random.Random, count: int) -> List[str]: |
| """Pick `count` unique subnet names.""" |
| all_subnets = list(SUBNETS.keys()) |
| return rng.sample(all_subnets, min(count, len(all_subnets))) |
|
|
|
|
| def _generate_threat( |
| rng: random.Random, |
| threat_id: str, |
| attack_type: str, |
| phase: str, |
| available_subnets: List[str], |
| used_hosts: set, |
| ) -> Tuple[Dict[str, Any], List[str]]: |
| """Generate a single threat in the attack chain. |
| |
| Returns: |
| (threat_dict, list_of_compromised_hosts) |
| """ |
| |
| subnet = rng.choice(available_subnets) |
| num_hosts = rng.randint(1, 3) if attack_type != "ransomware" else rng.randint(1, 2) |
|
|
| hosts = _pick_hosts(rng, subnet, num_hosts + 3) |
| hosts = [h for h in hosts if h not in used_hosts][:num_hosts] |
| if not hosts: |
| |
| fallback_subnet = rng.choice(list(SUBNETS.keys())) |
| hosts = _pick_hosts(rng, fallback_subnet, num_hosts + 5) |
| hosts = [h for h in hosts if h not in used_hosts][:max(1, num_hosts)] |
|
|
| |
| procs = MALWARE_PROCESSES.get(attack_type, MALWARE_PROCESSES["lateral_movement"]) |
| proc = rng.choice(procs) |
|
|
| |
| num_hashes = rng.randint(1, 2) |
| hashes = [_make_hash(rng) for _ in range(num_hashes)] |
|
|
| num_ips = rng.randint(0, 2) if attack_type in ("c2_communication", "data_exfiltration", "cryptomining", "botnet") else rng.randint(0, 1) |
| ips = rng.sample(C2_IPS, min(num_ips, len(C2_IPS))) if num_ips > 0 else [] |
|
|
| num_domains = rng.randint(0, 2) if attack_type in ("c2_communication", "phishing", "supply_chain", "botnet") else rng.randint(0, 1) |
| domains = rng.sample(C2_DOMAINS, min(num_domains, len(C2_DOMAINS))) if num_domains > 0 else [] |
|
|
| |
| c2_servers = ips[:1] if attack_type in ("c2_communication", "data_exfiltration", "botnet") else [] |
|
|
| |
| lateral_targets: List[str] = [] |
| if attack_type in ("lateral_movement", "credential_theft", "c2_communication"): |
| lat_subnet = rng.choice(list(SUBNETS.keys())) |
| lat_hosts = _pick_hosts(rng, lat_subnet, 2) |
| lateral_targets = [h for h in lat_hosts if h not in used_hosts and h not in hosts][:rng.randint(0, 2)] |
|
|
| |
| exfil_targets: List[str] = [] |
| if attack_type == "data_exfiltration": |
| exfil_targets = list(hosts) |
|
|
| threat = { |
| "threat_id": threat_id, |
| "threat_type": attack_type, |
| "phase": phase, |
| "compromised_hosts": hosts, |
| "malicious_processes": [proc], |
| "c2_servers": c2_servers, |
| "iocs": { |
| "hashes": hashes, |
| "ips": ips, |
| "domains": domains, |
| }, |
| "lateral_targets": lateral_targets, |
| "exfil_targets": exfil_targets, |
| } |
|
|
| return threat, hosts |
|
|
|
|
| def _generate_alert( |
| rng: random.Random, |
| alert_idx: int, |
| task_prefix: str, |
| threat: Dict[str, Any], |
| timestamp_base: int, |
| ) -> Dict[str, Any]: |
| """Generate a single SIEM alert for a threat.""" |
| attack_type = threat["threat_type"] |
| host = rng.choice(threat["compromised_hosts"]) |
| proc = threat["malicious_processes"][0] |
|
|
| |
| templates = ALERT_TEMPLATES.get(attack_type, ALERT_TEMPLATES["lateral_movement"]) |
| template = rng.choice(templates) |
|
|
| |
| description = template.format( |
| host=host, |
| proc=proc, |
| domain=rng.choice(threat["iocs"]["domains"]) if threat["iocs"]["domains"] else "unknown.example.com", |
| ip=rng.choice(threat["iocs"]["ips"]) if threat["iocs"]["ips"] else "0.0.0.0", |
| count=rng.randint(50, 500), |
| ext=rng.choice(RANSOM_EXTENSIONS), |
| secs=rng.randint(10, 120), |
| interval=rng.choice([30, 60, 90, 120, 300]), |
| size=round(rng.uniform(0.5, 15.0), 1), |
| data_type=rng.choice(DATA_TYPES), |
| port=rng.choice([3333, 4444, 5555, 8080, 8443, 9090]), |
| ) |
|
|
| |
| ioc_indicators = [] |
| if threat["iocs"]["hashes"]: |
| ioc_indicators.append(rng.choice(threat["iocs"]["hashes"])) |
| if threat["iocs"]["ips"]: |
| ioc_indicators.append(rng.choice(threat["iocs"]["ips"])) |
| if threat["iocs"]["domains"]: |
| ioc_indicators.append(rng.choice(threat["iocs"]["domains"])) |
|
|
| |
| subnet = "corporate" |
| for sn, info in SUBNETS.items(): |
| if host.startswith(info["prefix"]): |
| subnet = sn |
| break |
|
|
| |
| severity_weights = SEVERITY_WEIGHTS.get( |
| "hard" if attack_type in ("data_exfiltration", "ransomware", "privilege_escalation") else "medium", |
| SEVERITY_WEIGHTS["medium"] |
| ) |
| severity = rng.choices(SEVERITIES, weights=severity_weights, k=1)[0] |
|
|
| |
| minutes_offset = timestamp_base + alert_idx * rng.randint(5, 45) |
| hour = 6 + (minutes_offset // 60) |
| minute = minutes_offset % 60 |
| timestamp = f"2025-01-15T{hour:02d}:{minute:02d}:00Z" |
|
|
| return { |
| "alert_id": f"ALERT-{task_prefix}{alert_idx + 1:03d}", |
| "timestamp": timestamp, |
| "source_host": host, |
| "severity": severity, |
| "threat_type": attack_type, |
| "description": description, |
| "ioc_indicators": ioc_indicators, |
| "subnet": subnet, |
| "is_acknowledged": False, |
| } |
|
|
|
|
| |
| |
| |
|
|
| def generate_task(task_id: str, eval_mode: bool = False) -> Dict[str, Any]: |
| """Generate a complete, deterministic task definition from a task_id. |
| |
| The task_id is hashed to create a seed, ensuring the same task_id |
| always produces the exact same scenario. |
| |
| Args: |
| task_id: Any string (e.g. 'gen_0001', 'gen_0500', 'phishing_test') |
| |
| Returns: |
| A task_def dict compatible with CyberSOCEnvironment.reset() |
| """ |
| seed_offset = 0 |
| |
| if eval_mode: |
| seed_offset += 10000 |
|
|
| while True: |
| seed = _seed_from_task_id(task_id) + seed_offset |
| rng = random.Random(seed) |
|
|
| |
| difficulty = _get_difficulty(task_id, rng) |
|
|
| |
| if difficulty == "easy": |
| num_threats = 1 |
| max_steps = rng.randint(12, 18) |
| initial_impact = round(rng.uniform(0.02, 0.08), 2) |
| impact_per_step = round(rng.uniform(0.01, 0.03), 3) |
| num_subnets = rng.randint(1, 2) |
| elif difficulty == "medium": |
| num_threats = rng.randint(2, 3) |
| max_steps = rng.randint(20, 28) |
| initial_impact = round(rng.uniform(0.08, 0.15), 2) |
| impact_per_step = round(rng.uniform(0.02, 0.04), 3) |
| num_subnets = rng.randint(2, 4) |
| else: |
| num_threats = rng.randint(3, 6) |
| max_steps = rng.randint(25, 35) |
| initial_impact = round(rng.uniform(0.15, 0.25), 2) |
| impact_per_step = round(rng.uniform(0.03, 0.05), 3) |
| num_subnets = rng.randint(3, 6) |
|
|
| |
| all_attack_types = list(MALWARE_PROCESSES.keys()) |
| if difficulty == "easy": |
| |
| attack_types = [rng.choice(all_attack_types)] |
| elif difficulty == "medium": |
| |
| chains = [ |
| ["phishing", "credential_theft", "lateral_movement"], |
| ["phishing", "c2_communication", "data_exfiltration"], |
| ["webshell", "privilege_escalation", "lateral_movement"], |
| ["supply_chain", "c2_communication", "credential_theft"], |
| ["botnet", "cryptomining", "lateral_movement"], |
| ["insider_threat", "data_exfiltration"], |
| ] |
| chain = rng.choice(chains) |
| attack_types = chain[:num_threats] |
| else: |
| |
| chains = [ |
| ["phishing", "c2_communication", "privilege_escalation", "data_exfiltration", "ransomware"], |
| ["supply_chain", "c2_communication", "lateral_movement", "credential_theft", "data_exfiltration", "ransomware"], |
| ["webshell", "privilege_escalation", "c2_communication", "lateral_movement", "data_exfiltration"], |
| ["phishing", "credential_theft", "lateral_movement", "cryptomining", "botnet"], |
| ["insider_threat", "privilege_escalation", "data_exfiltration", "c2_communication"], |
| ["botnet", "lateral_movement", "privilege_escalation", "ransomware", "data_exfiltration"], |
| ] |
| chain = rng.choice(chains) |
| attack_types = chain[:num_threats] |
|
|
| |
| if any((t1, t2) in INCOMPATIBLE_THREATS or (t2, t1) in INCOMPATIBLE_THREATS |
| for t1, t2 in itertools.combinations(attack_types, 2)): |
| seed_offset += 1 |
| continue |
|
|
| |
| involved_subnets = _pick_subnets(rng, num_subnets) |
|
|
| |
| attack_chain: List[Dict[str, Any]] = [] |
| used_hosts: set = set() |
| task_prefix = task_id.replace("gen_", "G").upper()[:6] |
|
|
| for i, attack_type in enumerate(attack_types): |
| phase_idx = min(i, len(ATTACK_PHASES) - 1) |
| |
| phase_map = { |
| "phishing": "initial_access", |
| "webshell": "initial_access", |
| "supply_chain": "initial_access", |
| "credential_theft": "credential_access", |
| "privilege_escalation": "privilege_escalation", |
| "lateral_movement": "lateral_movement", |
| "c2_communication": "command_and_control", |
| "data_exfiltration": "exfiltration", |
| "ransomware": "impact", |
| "cryptomining": "impact", |
| "insider_threat": "exfiltration", |
| "botnet": "command_and_control", |
| } |
| phase = phase_map.get(attack_type, ATTACK_PHASES[phase_idx]) |
|
|
| threat_id = f"T-{task_prefix}-{i + 1:03d}" |
| threat, new_hosts = _generate_threat( |
| rng, threat_id, attack_type, phase, involved_subnets, used_hosts |
| ) |
| attack_chain.append(threat) |
| used_hosts.update(new_hosts) |
|
|
| |
| initial_alerts: List[Dict[str, Any]] = [] |
| timestamp_base = rng.randint(0, 60) |
| for i, threat in enumerate(attack_chain): |
| num_alerts = rng.randint(1, 2) |
| for j in range(num_alerts): |
| alert = _generate_alert( |
| rng, len(initial_alerts), task_prefix, threat, timestamp_base |
| ) |
| initial_alerts.append(alert) |
|
|
| |
| must_kill = [] |
| must_block_iocs = [] |
| must_forensics = [] |
| must_not_isolate = [] |
|
|
| for threat in attack_chain: |
| for host in threat["compromised_hosts"]: |
| for proc in threat["malicious_processes"]: |
| must_kill.append({"hostname": host, "process": proc}) |
| if host not in must_forensics: |
| must_forensics.append(host) |
|
|
| |
| for h in threat["iocs"]["hashes"]: |
| if h not in must_block_iocs: |
| must_block_iocs.append(h) |
| for ip in threat["iocs"]["ips"]: |
| if ip not in must_block_iocs: |
| must_block_iocs.append(ip) |
| for d in threat["iocs"]["domains"]: |
| if d not in must_block_iocs: |
| must_block_iocs.append(d) |
|
|
| |
| non_involved = [s for s in SUBNETS if s not in involved_subnets] |
| if difficulty == "easy": |
| must_not_isolate = non_involved |
| elif difficulty == "medium": |
| must_not_isolate = [s for s in non_involved if SUBNETS[s]["criticality"] >= 0.8] |
|
|
| |
| type_names = list(set(t["threat_type"] for t in attack_chain)) |
| host_count = len(used_hosts) |
| desc = ( |
| f"[{difficulty.upper()}] {', '.join(type_names).replace('_', ' ').title()} " |
| f"across {host_count} host(s) in {', '.join(involved_subnets)}." |
| ) |
|
|
| task_def = { |
| "description": desc, |
| "max_steps": max_steps, |
| "initial_business_impact": initial_impact, |
| "impact_per_step": impact_per_step, |
| "attack_chain": attack_chain, |
| "initial_alerts": initial_alerts, |
| "optimal_actions": [ |
| "run_forensics", "kill_process", "block_ioc", "submit_containment_plan" |
| ], |
| "containment_requirements": { |
| "must_kill": must_kill, |
| "must_block_iocs": must_block_iocs, |
| "must_forensics": must_forensics, |
| "must_not_isolate": must_not_isolate, |
| }, |
| } |
|
|
| errors = validate_task_def(task_def) |
| if not errors: |
| return task_def |
|
|
| |
| seed_offset += 1 |
|
|
|
|
| |
| |
| |
|
|
| def list_generated_task_ids(count: int = 1000) -> List[str]: |
| """Return the list of generated task IDs.""" |
| return [f"gen_{i:04d}" for i in range(1, count + 1)] |
|
|
|
|
| def get_task_summary(task_id: str) -> Dict[str, str]: |
| """Get a short summary of a generated task (for openenv.yaml).""" |
| task_def = generate_task(task_id) |
| difficulty = _get_difficulty(task_id, random.Random(_seed_from_task_id(task_id))) |
| return { |
| "description": task_def["description"], |
| "max_steps": task_def["max_steps"], |
| "difficulty": difficulty, |
| } |
|
|