CyberSOC / server /task_generator.py
Ajayyy00
Initial commit: CyberSOC Enterprise Environment Baseline
bb0d7fd
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
Procedural Task Generator for CyberSOCEnv.
Generates 1000+ unique, deterministic attack scenarios from a task_id seed.
Each task_id (e.g. 'gen_0001') always produces the exact same scenario.
Design:
- hash(task_id) -> deterministic seed -> random.Random instance
- Seed drives ALL choices: attack type, hosts, processes, IOCs, alerts
- 12 attack categories, 50+ malware names, 40+ C2 domains
- 3 difficulty tiers based on task number
No actual randomness — reproducible across runs and platforms.
"""
from __future__ import annotations
import hashlib
import random
from typing import Any, Dict, List, Tuple
# =============================================================================
# Template Pools (the "vocabulary" of the generator)
# =============================================================================
# --- Malware process names by category ---
MALWARE_PROCESSES = {
"ransomware": [
"cryptolocker.exe", "wannacry.exe", "blackcat_ransom.exe",
"lockbit3.exe", "revil_encrypt.exe", "hive_locker.exe",
"conti_crypt.exe", "ryuk_payload.exe", "maze_encrypt.exe",
"darkside_enc.exe", "babuk_lock.exe", "avaddon_crypt.exe",
],
"phishing": [
"outlook_macro.exe", "word_dropper.exe", "macro_loader.exe",
"vba_agent.exe", "pdf_exploit.exe", "html_smuggler.exe",
"iso_mounter.exe", "lnk_runner.exe",
],
"credential_theft": [
"mimikatz.exe", "lazagne.exe", "hashdump.exe",
"procdump_lsass.exe", "rubeus.exe", "kerbrute.exe",
"sharphound.exe", "bloodhound_collect.exe",
],
"lateral_movement": [
"svchost_backdoor.exe", "psexec_svc.exe", "wmic_lateral.exe",
"rdp_hijack.exe", "ssh_brute.exe", "evil_winrm.exe",
"dcom_exec.exe", "smb_relay.exe",
],
"c2_communication": [
"svchost_c2.exe", "cobalt_beacon.exe", "sliver_implant.exe",
"meterpreter.exe", "covenant_grunt.exe", "mythic_agent.exe",
"dns_tunnel.exe", "icmp_beacon.exe",
],
"privilege_escalation": [
"exploit_kernel.exe", "potato_exploit.exe", "uac_bypass.exe",
"printspoofer.exe", "juicy_potato.exe", "named_pipe_exploit.exe",
"token_impersonate.exe", "dll_hijack.exe",
],
"data_exfiltration": [
"data_pump.exe", "rclone_sync.exe", "mega_upload.exe",
"ftp_exfil.exe", "dns_exfil.exe", "cloud_sync_mal.exe",
"archive_send.exe", "stealer_agent.exe",
],
"cryptomining": [
"xmrig_miner.exe", "ethminer.exe", "cpuminer.exe",
"nicehash_mal.exe", "coinhive_svc.exe", "monero_mine.exe",
],
"supply_chain": [
"update_agent_mal.exe", "npm_backdoor.exe", "pip_trojan.exe",
"vscode_ext_mal.exe", "docker_implant.exe", "nuget_poison.exe",
],
"insider_threat": [
"usb_copy.exe", "screen_capture.exe", "keylogger_svc.exe",
"email_forward.exe", "cloud_upload.exe", "print_spooler_mal.exe",
],
"webshell": [
"cmd_webshell.php", "asp_backdoor.exe", "jsp_shell.exe",
"python_rshell.exe", "nodejs_shell.exe", "perl_cgi_shell.exe",
],
"botnet": [
"mirai_bot.exe", "emotet_loader.exe", "trickbot_svc.exe",
"qbot_agent.exe", "dridex_dll.exe", "zloader_inject.exe",
],
}
# --- C2 domains ---
C2_DOMAINS = [
"cdn-update.malware-c2.net", "api.darkc2.io", "telemetry-svc.ru",
"secure-update.evil.net", "cdn.payload-delivery.com", "api.shadownet.io",
"sync.cloud-c2.xyz", "update.legit-looking.com", "beacon.covert-ops.net",
"dns.tunnel-relay.org", "img.cdn-malware.com", "static.evil-cdn.net",
"api.stealthc2.io", "ws.encrypted-relay.net", "feed.darkweb-proxy.com",
"auth.phish-server.net", "login.fake-portal.com", "mail.spoof-relay.org",
"git.supply-chain.dev", "npm.compromised-pkg.io", "pypi.trojan-lib.org",
"dl.ransomware-pay.onion", "tor.exit-node-c2.net", "i2p.covert-chan.net",
"iot.botnet-c2.xyz", "cam.mirai-variant.net", "mqtt.iot-exploit.io",
"ftp.exfil-server.ru", "sftp.data-steal.com", "mega.cloud-drop.io",
"gist.code-exfil.dev", "paste.data-dump.xyz", "bin.steganography.net",
"vpn.tunnel-c2.com", "proxy.relay-beacon.org", "socks.covert-proxy.io",
"wpad.evil-config.net", "ntp.time-beacon.com", "ldap.ad-exploit.org",
"kerberos.ticket-steal.net",
]
# --- C2 IPs (RFC 5737 documentation ranges + realistic-looking) ---
C2_IPS = [
"198.51.100.10", "198.51.100.22", "198.51.100.33", "198.51.100.44",
"198.51.100.55", "198.51.100.66", "198.51.100.77", "198.51.100.88",
"198.51.100.99", "198.51.100.110", "198.51.100.121", "198.51.100.132",
"203.0.113.10", "203.0.113.21", "203.0.113.32", "203.0.113.43",
"203.0.113.54", "203.0.113.65", "203.0.113.76", "203.0.113.87",
"203.0.113.98", "203.0.113.109", "203.0.113.120", "203.0.113.131",
"192.0.2.10", "192.0.2.21", "192.0.2.32", "192.0.2.43",
"192.0.2.54", "192.0.2.65", "192.0.2.76", "192.0.2.87",
"100.64.0.10", "100.64.0.22", "100.64.0.33", "100.64.0.44",
]
# --- Subnet definitions (must match build_network() in tasks.py) ---
SUBNETS = {
"corporate": {"prefix": "WS", "count": 150, "criticality": 0.3},
"engineering": {"prefix": "DEV", "count": 100, "criticality": 0.5},
"finance": {"prefix": "FIN", "count": 50, "criticality": 0.8},
"dmz": {"prefix": "DMZ", "count": 30, "criticality": 0.6},
"datacenter": {"prefix": "SRV", "count": 50, "criticality": 0.9},
"executive": {"prefix": "EXEC", "count": 20, "criticality": 1.0},
}
# --- Attack phases in kill-chain order ---
ATTACK_PHASES = [
"initial_access", "execution", "persistence", "privilege_escalation",
"credential_access", "lateral_movement", "command_and_control",
"exfiltration", "impact",
]
# --- Alert description templates ---
ALERT_TEMPLATES = {
"ransomware": [
"EDR detected file encryption activity on {host}. Process '{proc}' is encrypting files in user directories.",
"Anomalous file system activity: {count} files renamed with .{ext} extension in {secs} seconds on {host}.",
"Ransomware signature detected in process '{proc}' on {host}. Volume shadow copies being deleted.",
],
"phishing": [
"User on {host} clicked suspicious link in email. {proc} execution detected downloading payload from {domain}.",
"Macro-enabled document opened on {host}. Outbound connection to {domain} detected.",
"Suspicious email attachment executed on {host}. Process '{proc}' spawned child processes.",
],
"credential_theft": [
"LSASS memory access detected on {host} — possible credential dumping via {proc}.",
"Kerberos ticket request anomaly on {host}. Process '{proc}' attempting ticket manipulation.",
"SAM database access detected on {host}. Credential harvesting tool '{proc}' identified.",
],
"lateral_movement": [
"Suspicious RDP login to {host} from compromised source using admin credentials. Process '{proc}' spawned.",
"SMB lateral movement detected: '{proc}' deployed on {host} via remote service creation.",
"WMI remote execution detected on {host}. Process '{proc}' launched from external host.",
],
"c2_communication": [
"Periodic beaconing detected from {host} to {ip} every {interval} seconds. Encrypted payload exchange observed.",
"DNS tunneling activity from {host}. Suspicious queries to {domain} with encoded payloads.",
"Cobalt Strike beacon profile detected on {host}. Process '{proc}' communicating with {ip}.",
],
"privilege_escalation": [
"Kernel exploit attempt on {host}. Process '{proc}' gained SYSTEM privileges.",
"UAC bypass detected on {host}. Process '{proc}' elevated to admin without user consent.",
"Token impersonation attack on {host}. Process '{proc}' obtained domain admin token.",
],
"data_exfiltration": [
"Large data transfer ({size} GB) to external IP {ip} from {host}. Possible exfiltration of {data_type}.",
"Staging activity detected on {host}. Process '{proc}' archiving sensitive directories for extraction.",
"Cloud storage upload from {host} to unauthorized account. Process '{proc}' transferring {data_type}.",
],
"cryptomining": [
"High CPU usage (98%) on {host}. Process '{proc}' identified as cryptocurrency miner.",
"Mining pool connection from {host} to {ip}:{port}. Process '{proc}' consuming all available cores.",
"Stratum protocol detected on {host}. Unauthorized mining process '{proc}' active.",
],
"supply_chain": [
"Compromised package detected in CI/CD pipeline on {host}. Process '{proc}' executing post-install scripts.",
"Backdoored update agent on {host}. Process '{proc}' downloading payloads from {domain}.",
"Malicious dependency loaded on {host}. Process '{proc}' establishing covert communication channels.",
],
"insider_threat": [
"Unusual data access pattern on {host}. Process '{proc}' accessing files outside user's normal scope.",
"USB mass storage device connected on {host}. Process '{proc}' copying sensitive files to removable media.",
"After-hours bulk file download on {host}. Process '{proc}' archiving {data_type} documents.",
],
"webshell": [
"Web shell detected on {host}. Process '{proc}' executing system commands via HTTP POST requests.",
"Suspicious file upload on {host}. Process '{proc}' created in web-accessible directory with bash capabilities.",
"Remote code execution on {host}. Process '{proc}' spawned from web server with SYSTEM context.",
],
"botnet": [
"Bot agent detected on {host}. Process '{proc}' joining command pool at {ip}.",
"DDoS toolkit loaded on {host}. Process '{proc}' ready to receive attack instructions from {domain}.",
"Worm propagation from {host}. Process '{proc}' scanning network for vulnerable hosts.",
],
}
# --- Severity levels with weights ---
SEVERITIES = ["low", "medium", "high", "critical"]
SEVERITY_WEIGHTS = {"easy": [0.1, 0.4, 0.4, 0.1], "medium": [0.0, 0.2, 0.5, 0.3], "hard": [0.0, 0.1, 0.3, 0.6]}
# --- Data types for exfil descriptions ---
DATA_TYPES = [
"customer PII", "financial records", "employee credentials",
"source code", "trade secrets", "medical records",
"encryption keys", "database backups", "API tokens",
"board meeting minutes", "M&A documents", "patent filings",
]
# --- File extensions for ransomware ---
RANSOM_EXTENSIONS = [
"locked", "encrypted", "crypted", "crypt", "enc", "pay",
"ransom", "darkside", "blackcat", "hive", "lockbit", "ryuk",
]
# =============================================================================
# Deterministic Seed Helper
# =============================================================================
def _seed_from_task_id(task_id: str) -> int:
"""Create a deterministic integer seed from a task_id string."""
h = hashlib.sha256(task_id.encode("utf-8")).hexdigest()
return int(h[:16], 16)
def _make_hash(rng: random.Random) -> str:
"""Generate a fake MD5-like hash deterministically."""
return "".join(rng.choice("0123456789abcdef") for _ in range(32))
# =============================================================================
# Difficulty Classification
# =============================================================================
def _get_difficulty(task_id: str, rng: random.Random) -> str:
"""Determine difficulty from task_id pattern or seed."""
# If task_id has an explicit difficulty prefix, use it
if task_id.startswith("easy_") or task_id.startswith("gen_easy_"):
return "easy"
if task_id.startswith("medium_") or task_id.startswith("gen_medium_"):
return "medium"
if task_id.startswith("hard_") or task_id.startswith("gen_hard_"):
return "hard"
# For gen_NNNN pattern, use number ranges
if task_id.startswith("gen_"):
try:
num = int(task_id.split("_")[1])
if num <= 333:
return "easy"
elif num <= 666:
return "medium"
else:
return "hard"
except (ValueError, IndexError):
pass
# Fallback: use seed-based distribution
return rng.choice(["easy", "medium", "hard"])
# =============================================================================
# Core Generator
# =============================================================================
def _pick_hosts(rng: random.Random, subnet: str, count: int) -> List[str]:
"""Pick `count` unique host names from a subnet."""
info = SUBNETS[subnet]
prefix = info["prefix"]
max_idx = info["count"]
indices = rng.sample(range(1, max_idx + 1), min(count, max_idx))
return [f"{prefix}-{idx:03d}" for idx in indices]
def _pick_subnets(rng: random.Random, count: int) -> List[str]:
"""Pick `count` unique subnet names."""
all_subnets = list(SUBNETS.keys())
return rng.sample(all_subnets, min(count, len(all_subnets)))
def _generate_threat(
rng: random.Random,
threat_id: str,
attack_type: str,
phase: str,
available_subnets: List[str],
used_hosts: set,
) -> Tuple[Dict[str, Any], List[str]]:
"""Generate a single threat in the attack chain.
Returns:
(threat_dict, list_of_compromised_hosts)
"""
# Pick target subnet and hosts
subnet = rng.choice(available_subnets)
num_hosts = rng.randint(1, 3) if attack_type != "ransomware" else rng.randint(1, 2)
hosts = _pick_hosts(rng, subnet, num_hosts + 3) # Pick extra to avoid collisions
hosts = [h for h in hosts if h not in used_hosts][:num_hosts]
if not hosts:
# Fallback: pick from any subnet
fallback_subnet = rng.choice(list(SUBNETS.keys()))
hosts = _pick_hosts(rng, fallback_subnet, num_hosts + 5)
hosts = [h for h in hosts if h not in used_hosts][:max(1, num_hosts)]
# Pick malware process
procs = MALWARE_PROCESSES.get(attack_type, MALWARE_PROCESSES["lateral_movement"])
proc = rng.choice(procs)
# Generate IOCs
num_hashes = rng.randint(1, 2)
hashes = [_make_hash(rng) for _ in range(num_hashes)]
num_ips = rng.randint(0, 2) if attack_type in ("c2_communication", "data_exfiltration", "cryptomining", "botnet") else rng.randint(0, 1)
ips = rng.sample(C2_IPS, min(num_ips, len(C2_IPS))) if num_ips > 0 else []
num_domains = rng.randint(0, 2) if attack_type in ("c2_communication", "phishing", "supply_chain", "botnet") else rng.randint(0, 1)
domains = rng.sample(C2_DOMAINS, min(num_domains, len(C2_DOMAINS))) if num_domains > 0 else []
# C2 servers (subset of IPs for c2/exfil types)
c2_servers = ips[:1] if attack_type in ("c2_communication", "data_exfiltration", "botnet") else []
# Lateral targets (for movement-type threats)
lateral_targets: List[str] = []
if attack_type in ("lateral_movement", "credential_theft", "c2_communication"):
lat_subnet = rng.choice(list(SUBNETS.keys()))
lat_hosts = _pick_hosts(rng, lat_subnet, 2)
lateral_targets = [h for h in lat_hosts if h not in used_hosts and h not in hosts][:rng.randint(0, 2)]
# Exfil targets
exfil_targets: List[str] = []
if attack_type == "data_exfiltration":
exfil_targets = list(hosts)
threat = {
"threat_id": threat_id,
"threat_type": attack_type,
"phase": phase,
"compromised_hosts": hosts,
"malicious_processes": [proc],
"c2_servers": c2_servers,
"iocs": {
"hashes": hashes,
"ips": ips,
"domains": domains,
},
"lateral_targets": lateral_targets,
"exfil_targets": exfil_targets,
}
return threat, hosts
def _generate_alert(
rng: random.Random,
alert_idx: int,
task_prefix: str,
threat: Dict[str, Any],
timestamp_base: int,
) -> Dict[str, Any]:
"""Generate a single SIEM alert for a threat."""
attack_type = threat["threat_type"]
host = rng.choice(threat["compromised_hosts"])
proc = threat["malicious_processes"][0]
# Pick template
templates = ALERT_TEMPLATES.get(attack_type, ALERT_TEMPLATES["lateral_movement"])
template = rng.choice(templates)
# Fill template
description = template.format(
host=host,
proc=proc,
domain=rng.choice(threat["iocs"]["domains"]) if threat["iocs"]["domains"] else "unknown.example.com",
ip=rng.choice(threat["iocs"]["ips"]) if threat["iocs"]["ips"] else "0.0.0.0",
count=rng.randint(50, 500),
ext=rng.choice(RANSOM_EXTENSIONS),
secs=rng.randint(10, 120),
interval=rng.choice([30, 60, 90, 120, 300]),
size=round(rng.uniform(0.5, 15.0), 1),
data_type=rng.choice(DATA_TYPES),
port=rng.choice([3333, 4444, 5555, 8080, 8443, 9090]),
)
# Collect IOC indicators for the alert
ioc_indicators = []
if threat["iocs"]["hashes"]:
ioc_indicators.append(rng.choice(threat["iocs"]["hashes"]))
if threat["iocs"]["ips"]:
ioc_indicators.append(rng.choice(threat["iocs"]["ips"]))
if threat["iocs"]["domains"]:
ioc_indicators.append(rng.choice(threat["iocs"]["domains"]))
# Determine subnet from host prefix
subnet = "corporate"
for sn, info in SUBNETS.items():
if host.startswith(info["prefix"]):
subnet = sn
break
# Severity
severity_weights = SEVERITY_WEIGHTS.get(
"hard" if attack_type in ("data_exfiltration", "ransomware", "privilege_escalation") else "medium",
SEVERITY_WEIGHTS["medium"]
)
severity = rng.choices(SEVERITIES, weights=severity_weights, k=1)[0]
# Timestamp (spread across a few hours)
minutes_offset = timestamp_base + alert_idx * rng.randint(5, 45)
hour = 6 + (minutes_offset // 60)
minute = minutes_offset % 60
timestamp = f"2025-01-15T{hour:02d}:{minute:02d}:00Z"
return {
"alert_id": f"ALERT-{task_prefix}{alert_idx + 1:03d}",
"timestamp": timestamp,
"source_host": host,
"severity": severity,
"threat_type": attack_type,
"description": description,
"ioc_indicators": ioc_indicators,
"subnet": subnet,
"is_acknowledged": False,
}
# =============================================================================
# Main Generator Function
# =============================================================================
def generate_task(task_id: str) -> Dict[str, Any]:
"""Generate a complete, deterministic task definition from a task_id.
The task_id is hashed to create a seed, ensuring the same task_id
always produces the exact same scenario.
Args:
task_id: Any string (e.g. 'gen_0001', 'gen_0500', 'phishing_test')
Returns:
A task_def dict compatible with CyberSOCEnvironment.reset()
"""
seed = _seed_from_task_id(task_id)
rng = random.Random(seed)
# Determine difficulty
difficulty = _get_difficulty(task_id, rng)
# Configure parameters based on difficulty
if difficulty == "easy":
num_threats = 1
max_steps = rng.randint(12, 18)
initial_impact = round(rng.uniform(0.02, 0.08), 2)
impact_per_step = round(rng.uniform(0.01, 0.03), 3)
num_subnets = rng.randint(1, 2)
elif difficulty == "medium":
num_threats = rng.randint(2, 3)
max_steps = rng.randint(20, 28)
initial_impact = round(rng.uniform(0.08, 0.15), 2)
impact_per_step = round(rng.uniform(0.02, 0.04), 3)
num_subnets = rng.randint(2, 4)
else: # hard
num_threats = rng.randint(3, 6)
max_steps = rng.randint(25, 35)
initial_impact = round(rng.uniform(0.15, 0.25), 2)
impact_per_step = round(rng.uniform(0.03, 0.05), 3)
num_subnets = rng.randint(3, 6)
# Pick attack types for this scenario
all_attack_types = list(MALWARE_PROCESSES.keys())
if difficulty == "easy":
# Easy: single focused attack
attack_types = [rng.choice(all_attack_types)]
elif difficulty == "medium":
# Medium: multi-stage, pick a plausible chain
chains = [
["phishing", "credential_theft", "lateral_movement"],
["phishing", "c2_communication", "data_exfiltration"],
["webshell", "privilege_escalation", "lateral_movement"],
["supply_chain", "c2_communication", "credential_theft"],
["botnet", "cryptomining", "lateral_movement"],
["insider_threat", "data_exfiltration"],
]
chain = rng.choice(chains)
attack_types = chain[:num_threats]
else:
# Hard: complex multi-phase APT
chains = [
["phishing", "c2_communication", "privilege_escalation", "data_exfiltration", "ransomware"],
["supply_chain", "c2_communication", "lateral_movement", "credential_theft", "data_exfiltration", "ransomware"],
["webshell", "privilege_escalation", "c2_communication", "lateral_movement", "data_exfiltration"],
["phishing", "credential_theft", "lateral_movement", "cryptomining", "botnet"],
["insider_threat", "privilege_escalation", "data_exfiltration", "c2_communication"],
["botnet", "lateral_movement", "privilege_escalation", "ransomware", "data_exfiltration"],
]
chain = rng.choice(chains)
attack_types = chain[:num_threats]
# Pick subnets involved
involved_subnets = _pick_subnets(rng, num_subnets)
# Generate attack chain
attack_chain: List[Dict[str, Any]] = []
used_hosts: set = set()
task_prefix = task_id.replace("gen_", "G").upper()[:6]
for i, attack_type in enumerate(attack_types):
phase_idx = min(i, len(ATTACK_PHASES) - 1)
# Use realistic phase based on attack type
phase_map = {
"phishing": "initial_access",
"webshell": "initial_access",
"supply_chain": "initial_access",
"credential_theft": "credential_access",
"privilege_escalation": "privilege_escalation",
"lateral_movement": "lateral_movement",
"c2_communication": "command_and_control",
"data_exfiltration": "exfiltration",
"ransomware": "impact",
"cryptomining": "impact",
"insider_threat": "exfiltration",
"botnet": "command_and_control",
}
phase = phase_map.get(attack_type, ATTACK_PHASES[phase_idx])
threat_id = f"T-{task_prefix}-{i + 1:03d}"
threat, new_hosts = _generate_threat(
rng, threat_id, attack_type, phase, involved_subnets, used_hosts
)
attack_chain.append(threat)
used_hosts.update(new_hosts)
# Generate alerts (1-2 per threat)
initial_alerts: List[Dict[str, Any]] = []
timestamp_base = rng.randint(0, 60)
for i, threat in enumerate(attack_chain):
num_alerts = rng.randint(1, 2)
for j in range(num_alerts):
alert = _generate_alert(
rng, len(initial_alerts), task_prefix, threat, timestamp_base
)
initial_alerts.append(alert)
# Generate containment requirements
must_kill = []
must_block_iocs = []
must_forensics = []
must_not_isolate = []
for threat in attack_chain:
for host in threat["compromised_hosts"]:
for proc in threat["malicious_processes"]:
must_kill.append({"hostname": host, "process": proc})
if host not in must_forensics:
must_forensics.append(host)
# Collect all IOCs as required blocks
for h in threat["iocs"]["hashes"]:
if h not in must_block_iocs:
must_block_iocs.append(h)
for ip in threat["iocs"]["ips"]:
if ip not in must_block_iocs:
must_block_iocs.append(ip)
for d in threat["iocs"]["domains"]:
if d not in must_block_iocs:
must_block_iocs.append(d)
# Subnets that should NOT be isolated (business-critical ones not in the attack)
non_involved = [s for s in SUBNETS if s not in involved_subnets]
if difficulty == "easy":
must_not_isolate = non_involved
elif difficulty == "medium":
must_not_isolate = [s for s in non_involved if SUBNETS[s]["criticality"] >= 0.8]
# Build description
type_names = list(set(t["threat_type"] for t in attack_chain))
host_count = len(used_hosts)
desc = (
f"[{difficulty.upper()}] {', '.join(type_names).replace('_', ' ').title()} "
f"across {host_count} host(s) in {', '.join(involved_subnets)}."
)
return {
"description": desc,
"max_steps": max_steps,
"initial_business_impact": initial_impact,
"impact_per_step": impact_per_step,
"attack_chain": attack_chain,
"initial_alerts": initial_alerts,
"optimal_actions": [
"run_forensics", "kill_process", "block_ioc", "submit_containment_plan"
],
"containment_requirements": {
"must_kill": must_kill,
"must_block_iocs": must_block_iocs,
"must_forensics": must_forensics,
"must_not_isolate": must_not_isolate,
},
}
# =============================================================================
# Batch Generation (for openenv.yaml and validation)
# =============================================================================
def list_generated_task_ids(count: int = 1000) -> List[str]:
"""Return the list of generated task IDs."""
return [f"gen_{i:04d}" for i in range(1, count + 1)]
def get_task_summary(task_id: str) -> Dict[str, str]:
"""Get a short summary of a generated task (for openenv.yaml)."""
task_def = generate_task(task_id)
difficulty = _get_difficulty(task_id, random.Random(_seed_from_task_id(task_id)))
return {
"description": task_def["description"],
"max_steps": task_def["max_steps"],
"difficulty": difficulty,
}