"""Vuln mutation logic -- swap vulnerabilities between resets. The Mutator wraps a SnapshotBuilder and adds mutation-specific context: ensuring vuln diversity, targeting weak areas, and feeding back error context from failed validations. Each call to ``mutate()`` produces a snapshot with different vulnerabilities than recent episodes. """ from __future__ import annotations import logging import random import re from copy import deepcopy from pathlib import PurePosixPath from typing import Any from open_range.builder.builder import TemplateOnlyBuilder, render_template_payloads from open_range.builder.manifest_graph import compile_manifest_topology from open_range.builder.mutation_policy import PopulationMutationPolicy from open_range.protocols import ( BuildContext, EvidenceItem, ExploitStep, FlagSpec, GoldenPathStep, LineageMetadata, MutationOp, MutationPlan, SnapshotBuilder, SnapshotSpec, Vulnerability, ) logger = logging.getLogger(__name__) _SUPPORTED_MUTATION_OPS = { "add_service", "add_user", "add_dependency_edge", "add_trust_edge", "seed_vuln", "add_benign_noise", } _GENERIC_SERVICES = { "", "sh", "bash", "ssh", "sshd", "ssh-client", "cron", "nmap", "curl", "hydra", "nikto", "sqlmap", } _LIVE_MUTATION_SUPPORTED_VULNS = {"sqli", "path_traversal"} class Mutator: """Orchestrate vuln mutation across resets. Tracks episode history and feeds it into the Builder's context so that each reset produces a genuinely different challenge. """ def __init__( self, builder: SnapshotBuilder, max_retries: int = 3, policy: PopulationMutationPolicy | None = None, ) -> None: """Initialize the mutator with a builder and retry limit. Args: builder: Any SnapshotBuilder implementation. max_retries: Maximum build attempts (passed through to builder). """ self.builder = builder self.max_retries = max_retries self.policy = policy or PopulationMutationPolicy() self._history: list[str] = [] # recent vuln classes self._attack_surfaces: list[str] = [] # recent injection points self._episode_count: int = 0 async def mutate( self, manifest: dict, context: BuildContext | None = None, error: dict[str, Any] | None = None, parent_snapshot: SnapshotSpec | None = None, parent_snapshot_id: str | None = None, ) -> SnapshotSpec: """Generate a root or child snapshot, avoiding recent vuln classes. Args: manifest: Parsed manifest dict. context: Optional base context (curriculum stats, etc.). error: Error feedback from a failed validation attempt. parent_snapshot: Admitted parent snapshot to mutate forward. parent_snapshot_id: Persisted ID for *parent_snapshot*. Returns: A new SnapshotSpec. Root snapshots are compiled from the manifest; child snapshots are mutated from the parent. """ if context is None: context = BuildContext() # Inject episode history into context context.previous_vuln_classes = list(self._history[-3:]) context.recent_attack_surfaces = list(self._attack_surfaces[-5:]) context.episode_count = self._episode_count logger.debug( "Mutator: preparing mutation for episode %d (avoiding vulns: %s, surfaces: %s)", self._episode_count + 1, context.previous_vuln_classes, context.recent_attack_surfaces, ) if error is not None: logger.warning( "Mutator: retrying with error feedback: %s", list(error.keys()) if isinstance(error, dict) else error, ) # error field may or may not exist on BuildContext try: context.error = error # type: ignore[attr-defined] except (AttributeError, ValueError): pass # protocol version without error field # Build with diversity enforcement -- retry up to 3 times if the # snapshot repeats recent vuln classes or injection points. max_diversity_retries = 3 snapshot: SnapshotSpec | None = None last_reason = "" for attempt in range(1, max_diversity_retries + 1): if parent_snapshot is None: candidate = await self.builder.build(manifest, context) candidate = self._hydrate_root_snapshot(candidate, manifest) else: candidate = self._mutate_parent_snapshot( manifest=manifest, parent_snapshot=parent_snapshot, parent_snapshot_id=parent_snapshot_id, context=context, ) passes, reason = self._check_diversity(candidate, manifest) if passes: snapshot = candidate break last_reason = reason logger.info( "Mutator: diversity check failed on attempt %d/%d: %s", attempt, max_diversity_retries, reason, ) if snapshot is None: # Exhausted retries -- accept last candidate with a warning logger.warning( "Mutator: accepting snapshot after %d diversity retries; last failure: %s", max_diversity_retries, last_reason, ) snapshot = candidate # type: ignore[possibly-undefined] # Update history new_classes = [v.type for v in snapshot.truth_graph.vulns] self._history.extend(new_classes) new_surfaces = [v.injection_point for v in snapshot.truth_graph.vulns] self._attack_surfaces.extend(new_surfaces) self._episode_count += 1 logger.info( "Mutator: episode %d complete, vuln classes: %s, total history: %d entries", self._episode_count, new_classes, len(self._history), ) return snapshot @property def episode_count(self) -> int: """Number of episodes (mutations) completed so far.""" return self._episode_count @property def history(self) -> list[str]: """All vuln classes used so far, in order.""" return list(self._history) def _check_diversity( self, snapshot: SnapshotSpec, manifest: dict[str, Any], ) -> tuple[bool, str]: """Check whether *snapshot* meets vuln diversity constraints. Returns: ``(passes, reason)`` -- *passes* is ``True`` when the snapshot satisfies the diversity rules; *reason* explains why it failed. """ new_classes = [v.type for v in snapshot.truth_graph.vulns] new_surfaces = [v.injection_point for v in snapshot.truth_graph.vulns] recent_classes = set(self._history[-3:]) if self._history else set() recent_surfaces = set(self._attack_surfaces[-5:]) if self._attack_surfaces else set() all_families = {str(v) for v in manifest.get("bug_families", []) if v} # --- vuln class check --- if new_classes and recent_classes: new_class_set = set(new_classes) if new_class_set and new_class_set.issubset(recent_classes): # Only reject if there ARE alternative families we could use alternatives = all_families - recent_classes if alternatives: return ( False, f"All vuln classes {sorted(new_class_set)} repeat recent history " f"{sorted(recent_classes)}; alternatives available: {sorted(alternatives)}", ) # --- injection point check --- if new_surfaces and recent_surfaces: new_surface_set = set(new_surfaces) if new_surface_set and new_surface_set.issubset(recent_surfaces): # Only reject if the manifest has enough families to allow # different surfaces (any alternative family would produce a # different dynamic injection point) alternatives = all_families - set(new_classes) if alternatives: return ( False, f"All injection points {sorted(new_surface_set)} repeat recent surfaces " f"{sorted(recent_surfaces)}; alternatives available: {sorted(alternatives)}", ) return (True, "") def _hydrate_root_snapshot( self, snapshot: SnapshotSpec, manifest: dict[str, Any], ) -> SnapshotSpec: root = snapshot.model_copy(deep=True) root.topology = compile_manifest_topology(manifest, root.topology) root.lineage = LineageMetadata( manifest_id=str(manifest.get("name", "")), generation_depth=0, mutation_summary=["compile_base_snapshot"], ) root.mutation_plan = None normalization = root.topology.get("manifest_normalization", {}) if isinstance(normalization, dict): notes = normalization.get("notes", []) if isinstance(notes, list): for note in notes: logger.info("Mutator: manifest normalization applied: %s", note) return root def _mutate_parent_snapshot( self, *, manifest: dict[str, Any], parent_snapshot: SnapshotSpec, parent_snapshot_id: str | None, context: BuildContext, ) -> SnapshotSpec: rng = random.Random(context.seed if context.seed is not None else self._episode_count + 1) child = parent_snapshot.model_copy(deep=True) child.topology = _ensure_mutable_topology(child.topology, manifest) plan = self._plan_mutations( manifest=manifest, snapshot=child, parent_snapshot_id=parent_snapshot_id, context=context, rng=rng, ) self._validate_plan_legality(manifest, plan) self._apply_plan(child, plan, manifest, context) child.files = render_template_payloads(child, manifest=manifest) lineage = parent_snapshot.lineage.model_copy(deep=True) child.lineage = LineageMetadata( parent_snapshot_id=parent_snapshot_id or parent_snapshot.lineage.snapshot_id or None, root_snapshot_id=lineage.root_snapshot_id or parent_snapshot_id or "", manifest_id=lineage.manifest_id or str(manifest.get("name", "")), generation_depth=lineage.generation_depth + 1, mutation_ids=[op.mutation_id for op in plan.ops], mutation_summary=[_mutation_summary(op) for op in plan.ops], ) child.mutation_plan = plan return child def _plan_mutations( self, *, manifest: dict[str, Any], snapshot: SnapshotSpec, parent_snapshot_id: str | None, context: BuildContext, rng: random.Random, ) -> MutationPlan: ops: list[MutationOp] = [] structural_candidates = [] op = self._candidate_add_service(manifest, snapshot, rng) if op is not None: structural_candidates.append(op) op = self._candidate_add_user(manifest, snapshot, context, rng) if op is not None: structural_candidates.append(op) op = self._candidate_add_dependency_edge(manifest, snapshot, rng) if op is not None: structural_candidates.append(op) op = self._candidate_add_trust_edge(manifest, snapshot, rng) if op is not None: structural_candidates.append(op) security_candidates = [] op = self._candidate_seed_vuln(manifest, snapshot, context, rng) if op is not None: security_candidates.append(op) op = self._candidate_add_benign_noise(snapshot, rng) if op is not None: security_candidates.append(op) ops, policy_score, score_breakdown = self.policy.choose_mutations( structural_candidates=structural_candidates, security_candidates=security_candidates, snapshot=snapshot, context=context, rng=rng, ) if ops: logger.info( "Mutator policy %s chose ops=%s score=%.3f breakdown=%s", self.policy.name, [op.mutation_id for op in ops], policy_score, score_breakdown, ) if not ops: fallback = self._candidate_add_benign_noise(snapshot, rng) if fallback is not None: ops.append(fallback) return MutationPlan( parent_snapshot_id=parent_snapshot_id, ops=ops, predicted_complexity_delta=len(ops), predicted_chain_delta=sum(1 for op in ops if op.op_type == "seed_vuln"), predicted_novelty=round(0.2 * len({op.op_type for op in ops}), 2), policy_name=self.policy.name, policy_score=policy_score, score_breakdown=score_breakdown, ) def _validate_plan_legality( self, manifest: dict[str, Any], plan: MutationPlan, ) -> None: manifest_hosts = _manifest_hosts(manifest) allowed_bug_families = {str(v) for v in manifest.get("bug_families", []) if str(v)} allowed_users = _manifest_users(manifest) allowed_principals = _manifest_principals(manifest) allowed_services = _manifest_services(manifest) allowed_dependency_edges = _manifest_dependency_edges(manifest) allowed_trust_edges = _manifest_trust_edges(manifest) for op in plan.ops: prefix = f"Illegal mutation op {op.mutation_id!r} ({op.op_type})" if op.op_type not in _SUPPORTED_MUTATION_OPS: raise ValueError(f"{prefix}: unsupported op_type") if op.op_type == "add_service": host = op.target_selector.get("host", "") service = str(op.params.get("service", "")).strip() if host not in manifest_hosts: raise ValueError(f"{prefix}: add_service targets unknown host {host!r}") if service and service not in allowed_services.get(host, frozenset()): raise ValueError( f"{prefix}: add_service introduces illegal service {service!r} on {host!r}" ) elif op.op_type == "add_user": username = str(op.params.get("username", "")).strip() if username and username not in allowed_users: raise ValueError( f"{prefix}: add_user introduces unknown manifest user {username!r}" ) elif op.op_type == "add_dependency_edge": source = op.target_selector.get("source", "") target = op.target_selector.get("target", "") if (source, target) not in allowed_dependency_edges: raise ValueError( f"{prefix}: add_dependency_edge introduces illegal edge {source!r}->{target!r}" ) elif op.op_type == "add_trust_edge": source = op.target_selector.get("source", "") target = op.target_selector.get("target", "") edge_type = str(op.params.get("type", "")).strip() if source and source not in allowed_principals: raise ValueError( f"{prefix}: add_trust_edge introduces unknown principal {source!r}" ) if target and target not in allowed_principals: raise ValueError( f"{prefix}: add_trust_edge introduces unknown principal {target!r}" ) if (source, target, edge_type) not in allowed_trust_edges: raise ValueError( f"{prefix}: add_trust_edge introduces illegal edge " f"{source!r}->{target!r} ({edge_type!r})" ) elif op.op_type == "seed_vuln": host = op.target_selector.get("host", "") vuln_type = str(op.params.get("vuln_type", "")).strip() required_services = { str(service).strip() for service in op.params.get("required_services", []) if str(service).strip() } if host not in manifest_hosts: raise ValueError(f"{prefix}: seed_vuln targets unknown host {host!r}") if vuln_type and vuln_type not in allowed_bug_families: raise ValueError( f"{prefix}: seed_vuln uses illegal family {vuln_type!r}" ) if required_services: host_services = allowed_services.get(host, frozenset()) if not required_services.intersection(host_services): raise ValueError( f"{prefix}: seed_vuln host {host!r} incompatible with required " f"services {sorted(required_services)}" ) def _candidate_add_service( self, manifest: dict[str, Any], snapshot: SnapshotSpec, rng: random.Random, ) -> MutationOp | None: topology = snapshot.topology host_catalog = topology.get("host_catalog", {}) host_details = topology.get("host_details", {}) candidates: list[tuple[str, str]] = [] if not isinstance(host_catalog, dict) or not isinstance(host_details, dict): return None for host, raw_catalog in host_catalog.items(): if not isinstance(raw_catalog, dict): continue allowed = raw_catalog.get("services", []) detail = host_details.get(host, {}) current = detail.get("services", []) if isinstance(detail, dict) else [] if not isinstance(allowed, list) or not isinstance(current, list): continue for service in allowed: if service and service not in current: candidates.append((str(host), str(service))) if not candidates: return None host, service = rng.choice(candidates) return MutationOp( mutation_id=f"mut_add_service_{host}_{service}", op_type="add_service", target_selector={"host": host}, params={"service": service}, expected_effects=[f"service {service} added to {host}"], risk_tags=["surface_expansion"], ) def _candidate_add_user( self, manifest: dict[str, Any], snapshot: SnapshotSpec, context: BuildContext, rng: random.Random, ) -> MutationOp | None: existing = _existing_usernames(snapshot) candidates = [ raw for raw in manifest.get("users", []) if isinstance(raw, dict) and raw.get("username") not in existing ] if not candidates: return None user = deepcopy(rng.choice(candidates)) username = str(user.get("username", "")).strip() if not username: return None password = _predictable_password(username, context.seed) return MutationOp( mutation_id=f"mut_add_user_{username}", op_type="add_user", target_selector={"user": username}, params={ "username": username, "password": password, "hosts": deepcopy(user.get("hosts", [])), "groups": [str(user.get("department", "") or "users").lower().replace(" ", "_")], "email": str(user.get("email", "")), "full_name": str(user.get("full_name", "")), "department": str(user.get("department", "")), "role": str(user.get("role", "")), }, expected_effects=[f"user {username} added to snapshot accounts"], risk_tags=["identity_expansion"], ) def _candidate_add_dependency_edge( self, manifest: dict[str, Any], snapshot: SnapshotSpec, rng: random.Random, ) -> MutationOp | None: topology = snapshot.topology current = { (str(edge.get("source", "")), str(edge.get("target", ""))) for edge in topology.get("dependency_edges", []) if isinstance(edge, dict) } candidates: list[tuple[str, str]] = [] for raw in manifest.get("topology", {}).get("hosts", []): if not isinstance(raw, dict): continue source = str(raw.get("name", "")).strip() raw_targets = raw.get("connects_to", []) if not source or not isinstance(raw_targets, list): continue for target_raw in raw_targets: target = str(target_raw).strip() if target and (source, target) not in current: candidates.append((source, target)) if not candidates: return None source, target = rng.choice(candidates) return MutationOp( mutation_id=f"mut_add_dep_{source}_{target}", op_type="add_dependency_edge", target_selector={"source": source, "target": target}, params={}, expected_effects=[f"dependency edge {source}->{target} added"], risk_tags=["topology_expansion"], ) def _candidate_add_trust_edge( self, manifest: dict[str, Any], snapshot: SnapshotSpec, rng: random.Random, ) -> MutationOp | None: topology = snapshot.topology current = { ( str(edge.get("source", "")), str(edge.get("target", "")), str(edge.get("type", "")), ) for edge in topology.get("trust_edges", []) if isinstance(edge, dict) } candidates: list[dict[str, str]] = [] for raw in manifest.get("trust_relationships", []): if not isinstance(raw, dict): continue source = str(raw.get("source") or raw.get("from") or "").strip() target = str(raw.get("target") or raw.get("to") or "").strip() edge_type = str(raw.get("type", "")).strip() if source and target and (source, target, edge_type) not in current: candidates.append( { "source": source, "target": target, "type": edge_type, "context": str(raw.get("context") or raw.get("description") or ""), } ) if not candidates: return None choice = rng.choice(candidates) return MutationOp( mutation_id=f"mut_add_trust_{choice['source']}_{choice['target']}_{choice['type']}", op_type="add_trust_edge", target_selector={"source": choice["source"], "target": choice["target"]}, params={"type": choice["type"], "context": choice["context"]}, expected_effects=[f"trust edge {choice['source']}->{choice['target']} added"], risk_tags=["trust_expansion"], ) def _candidate_seed_vuln( self, manifest: dict[str, Any], snapshot: SnapshotSpec, context: BuildContext, rng: random.Random, ) -> MutationOp | None: allowed = [str(v) for v in manifest.get("bug_families", []) if v] if not allowed: return None existing = {v.type for v in snapshot.truth_graph.vulns} templates = self._compatible_vuln_templates(snapshot, context) if not templates: return None preferred_types = [v for v in context.weak_areas if v in allowed and v not in existing] remaining_types = [v for v in allowed if v not in existing] candidate_types = preferred_types or remaining_types or allowed compatible = [ template for template in templates if str(template.get("type", "")) in candidate_types ] if not compatible: return None template = rng.choice(compatible) vuln_type = str(template.get("type", "")).strip() host = str(template.get("host", "")).strip() service = str(template.get("service", "")).strip() required_services = sorted(self._template_required_services(snapshot, template)) return MutationOp( mutation_id=f"mut_seed_vuln_{vuln_type}_{host}_{len(snapshot.truth_graph.vulns)+1}", op_type="seed_vuln", target_selector={"host": host}, params={ "vuln_type": vuln_type, "service": service, "template_id": str(template.get("id", vuln_type)), "required_services": required_services, }, expected_effects=[f"new {vuln_type} foothold on {host}"], risk_tags=["security_condition"], ) def _candidate_add_benign_noise( self, snapshot: SnapshotSpec, rng: random.Random, ) -> MutationOp | None: locations = [item.location for item in snapshot.evidence_spec if item.location] location = rng.choice(locations) if locations else "siem:background.log" return MutationOp( mutation_id=f"mut_add_noise_{len(snapshot.evidence_spec)+1}", op_type="add_benign_noise", target_selector={"location": location}, params={"location": location}, expected_effects=[f"benign evidence noise added at {location}"], risk_tags=["observability_noise"], ) def _apply_plan( self, snapshot: SnapshotSpec, plan: MutationPlan, manifest: dict[str, Any], context: BuildContext, ) -> None: topology = snapshot.topology host_details = topology.setdefault("host_details", {}) dependency_edges = topology.setdefault("dependency_edges", []) trust_edges = topology.setdefault("trust_edges", []) principal_catalog = topology.setdefault("principal_catalog", {}) users = topology.setdefault("users", []) if not isinstance(host_details, dict): host_details = {} topology["host_details"] = host_details if not isinstance(dependency_edges, list): dependency_edges = [] topology["dependency_edges"] = dependency_edges if not isinstance(trust_edges, list): trust_edges = [] topology["trust_edges"] = trust_edges if not isinstance(principal_catalog, dict): principal_catalog = {} topology["principal_catalog"] = principal_catalog if not isinstance(users, list): users = [] topology["users"] = users for op in plan.ops: if op.op_type not in _SUPPORTED_MUTATION_OPS: raise ValueError(f"Unsupported mutation op {op.op_type!r}") if op.op_type == "add_service": host = op.target_selector["host"] detail = host_details.setdefault(host, {"services": [], "connects_to": []}) services = detail.setdefault("services", []) service = str(op.params.get("service", "")).strip() if service and service not in services: services.append(service) elif op.op_type == "add_user": username = str(op.params.get("username", "")) user_record = { "username": username, "password": str(op.params.get("password", "")), "groups": deepcopy(op.params.get("groups", [])), "hosts": deepcopy(op.params.get("hosts", [])), "email": str(op.params.get("email", "")), "full_name": str(op.params.get("full_name", "")), "department": str(op.params.get("department", "")), "role": str(op.params.get("role", "")), } users.append(user_record) principal_catalog[username] = { "username": username, "kind": "user", "is_login_account": True, "hosts": deepcopy(op.params.get("hosts", [])), "department": str(op.params.get("department", "")), "role": str(op.params.get("role", "")), "email": str(op.params.get("email", "")), "full_name": str(op.params.get("full_name", "")), } elif op.op_type == "add_dependency_edge": dependency_edges.append( { "source": op.target_selector["source"], "target": op.target_selector["target"], } ) elif op.op_type == "add_trust_edge": trust_edges.append( { "source": op.target_selector["source"], "target": op.target_selector["target"], "type": str(op.params.get("type", "")), "context": str(op.params.get("context", "")), } ) elif op.op_type == "seed_vuln": template = self._resolve_vuln_template(op) instantiated = _instantiate_seed_vuln_from_template( template=template, host=op.target_selector["host"], index=len(snapshot.truth_graph.vulns) + 1, step_offset=len(snapshot.golden_path), ) snapshot.truth_graph.vulns.append(instantiated["vuln"]) snapshot.truth_graph.exploit_chain.append(instantiated["exploit_step"]) snapshot.flags.append(instantiated["flag"]) snapshot.golden_path.extend(instantiated["golden_path"]) snapshot.evidence_spec.extend(instantiated["evidence"]) _append_task_path(snapshot, instantiated["flag"], instantiated["milestone"]) op.params.update( { "service": instantiated["vuln"].service, "instantiated_vuln_id": instantiated["vuln"].id, "instantiated_flag_id": instantiated["flag"].id, "instantiated_flag_value": instantiated["flag"].value, "instantiated_flag_host": instantiated["flag"].host, "instantiated_exploit_command": instantiated["exploit_step"].command, } ) elif op.op_type == "add_benign_noise": location = str(op.params.get("location", "siem:background.log")) snapshot.evidence_spec.append( EvidenceItem( type="log_entry", location=location, pattern=( f"Benign background activity {context.episode_count + len(snapshot.evidence_spec)}" ), ) ) snapshot.topology = topology def _compatible_vuln_templates( self, snapshot: SnapshotSpec, context: BuildContext, ) -> list[dict[str, Any]]: templates = [] live_only = "prefer_live_admission_compatible_vulns" in context.narrative_hints for template in self._vuln_pool(): if not isinstance(template, dict): continue vuln_type = str(template.get("type", "")).strip() host = str(template.get("host", "")).strip() if not vuln_type or not host: continue if live_only and vuln_type not in _LIVE_MUTATION_SUPPORTED_VULNS: continue if not self._template_has_task_path(template): continue if host not in _existing_hosts(snapshot): continue required_services = self._template_required_services(snapshot, template) host_services = _host_services(snapshot.topology, host) if required_services and not required_services.intersection(host_services): continue templates.append(template) return templates def _template_required_services( self, snapshot: SnapshotSpec, template: dict[str, Any], ) -> set[str]: template_host = str(template.get("host", "")).strip() host_services = _host_services(snapshot.topology, template_host) if host_services: return host_services service_text = str(template.get("service", "")).strip().lower() if not service_text: return set() parts = { token for token in re.split(r"[^a-z0-9_-]+", service_text) if token and token not in _GENERIC_SERVICES } return parts def _template_has_task_path(self, template: dict[str, Any]) -> bool: raw_steps = template.get("golden_path_steps", []) if not isinstance(raw_steps, list) or not raw_steps: return False flag_value = str(template.get("flag_value", "")).strip() if not flag_value: return False return any( isinstance(step, dict) and flag_value in str(step.get("cmd", "")) and str(step.get("cmd", "")).strip().startswith("submit_flag ") for step in raw_steps ) def _resolve_vuln_template(self, op: MutationOp) -> dict[str, Any]: template_id = str(op.params.get("template_id", "")).strip() vuln_type = str(op.params.get("vuln_type", "")).strip() host = str(op.target_selector.get("host", "")).strip() for template in self._vuln_pool(): if not isinstance(template, dict): continue if template_id and str(template.get("id", "")).strip() == template_id: return template for template in self._vuln_pool(): if not isinstance(template, dict): continue if ( str(template.get("type", "")).strip() == vuln_type and str(template.get("host", "")).strip() == host ): return template raise ValueError( f"No vulnerability template found for mutation op {op.mutation_id!r}" ) def _vuln_pool(self) -> list[dict[str, Any]]: raw_pool = getattr(self.builder, "vuln_pool", None) if isinstance(raw_pool, list) and raw_pool: return raw_pool return TemplateOnlyBuilder().vuln_pool def _manifest_hosts(manifest: dict[str, Any]) -> set[str]: hosts: set[str] = set() for raw in manifest.get("topology", {}).get("hosts", []): if not isinstance(raw, dict): continue name = str(raw.get("name", "")).strip() if name: hosts.add(name) return hosts def _manifest_users(manifest: dict[str, Any]) -> set[str]: users: set[str] = set() for raw in manifest.get("users", []): if not isinstance(raw, dict): continue username = str(raw.get("username", "")).strip() if username: users.add(username) return users def _manifest_principals(manifest: dict[str, Any]) -> set[str]: principals = set(_manifest_users(manifest)) for raw in manifest.get("trust_relationships", []): if not isinstance(raw, dict): continue source = str(raw.get("source") or raw.get("from") or "").strip() target = str(raw.get("target") or raw.get("to") or "").strip() if source: principals.add(source) if target: principals.add(target) return principals def _manifest_services(manifest: dict[str, Any]) -> dict[str, frozenset[str]]: services: dict[str, frozenset[str]] = {} for raw in manifest.get("topology", {}).get("hosts", []): if not isinstance(raw, dict): continue host = str(raw.get("name", "")).strip() if not host: continue raw_services = raw.get("services", []) if not isinstance(raw_services, list): raw_services = [] services[host] = frozenset(str(service).strip() for service in raw_services if str(service).strip()) return services def _manifest_dependency_edges(manifest: dict[str, Any]) -> set[tuple[str, str]]: edges: set[tuple[str, str]] = set() for raw in manifest.get("topology", {}).get("hosts", []): if not isinstance(raw, dict): continue source = str(raw.get("name", "")).strip() raw_targets = raw.get("connects_to", []) if not source or not isinstance(raw_targets, list): continue for raw_target in raw_targets: target = str(raw_target).strip() if target: edges.add((source, target)) return edges def _manifest_trust_edges(manifest: dict[str, Any]) -> set[tuple[str, str, str]]: edges: set[tuple[str, str, str]] = set() for raw in manifest.get("trust_relationships", []): if not isinstance(raw, dict): continue source = str(raw.get("source") or raw.get("from") or "").strip() target = str(raw.get("target") or raw.get("to") or "").strip() edge_type = str(raw.get("type", "")).strip() if source and target: edges.add((source, target, edge_type)) return edges def _ensure_mutable_topology( topology: dict[str, Any], manifest: dict[str, Any], ) -> dict[str, Any]: return compile_manifest_topology(manifest, topology) def _existing_hosts(snapshot: SnapshotSpec) -> set[str]: hosts: set[str] = set() for raw in snapshot.topology.get("hosts", []): if isinstance(raw, dict): name = str(raw.get("name", "")).strip() if name: hosts.add(name) else: name = str(raw).strip() if name: hosts.add(name) return hosts def _existing_usernames(snapshot: SnapshotSpec) -> set[str]: usernames: set[str] = set() for raw in snapshot.topology.get("users", []): if not isinstance(raw, dict): continue username = str(raw.get("username", "")).strip() if username: usernames.add(username) return usernames def _predictable_password(username: str, seed: int | None) -> str: suffix = 2025 if seed is None else 2025 + (seed % 3) base = username.split("@", 1)[0] or "Welcome" return f"{base.capitalize()}!{suffix}" def _mutation_summary(op: MutationOp) -> str: if op.op_type == "add_service": return f"add service {op.params.get('service', '')} to {op.target_selector.get('host', '')}" if op.op_type == "add_user": return f"add user {op.params.get('username', '')}" if op.op_type == "add_dependency_edge": return ( f"add dependency {op.target_selector.get('source', '')}->" f"{op.target_selector.get('target', '')}" ) if op.op_type == "add_trust_edge": return ( f"add trust {op.target_selector.get('source', '')}->" f"{op.target_selector.get('target', '')}" ) if op.op_type == "seed_vuln": return ( f"seed {op.params.get('vuln_type', '')} on " f"{op.target_selector.get('host', '')}" ) if op.op_type == "add_benign_noise": return f"add benign noise at {op.params.get('location', '')}" return op.op_type def _host_services(topology: dict[str, Any], host: str) -> set[str]: services: set[str] = set() host_details = topology.get("host_details", {}) host_catalog = topology.get("host_catalog", {}) for raw_map in (host_details, host_catalog): if not isinstance(raw_map, dict): continue raw_detail = raw_map.get(host, {}) if not isinstance(raw_detail, dict): continue raw_services = raw_detail.get("services", []) if not isinstance(raw_services, list): continue for raw_service in raw_services: service = str(raw_service).strip().lower() if service and service not in _GENERIC_SERVICES: services.add(service) return services def _instantiate_seed_vuln_from_template( *, template: dict[str, Any], host: str, index: int, step_offset: int, ) -> dict[str, Any]: vuln_type = str(template.get("type", "")).strip() template_host = str(template.get("host", host)).strip() or host raw_steps = template.get("golden_path_steps", []) if not isinstance(raw_steps, list) or not raw_steps: raise ValueError(f"Template {template.get('id', vuln_type)!r} has no golden path") flag_id = f"flag{index}" template_flag_value = str(template.get("flag_value", f"FLAG{{{vuln_type}}}")).strip() flag_value = _mutation_flag_value(template_flag_value, vuln_type, index) template_flag_path = str(template.get("flag_path", f"/var/flags/flag{index}.txt")).strip() flag_path = _mutation_flag_path(template_flag_path, index) service = str(template.get("service", "")).strip() vuln_id = f"{vuln_type}_{index}" exploit_command = _template_exploit_command(raw_steps) vuln = Vulnerability( id=vuln_id, type=vuln_type, host=host, service=service, injection_point=_rewrite_template_text( exploit_command, template_host=template_host, target_host=host, template_flag_value=template_flag_value, flag_value=flag_value, template_flag_path=template_flag_path, flag_path=flag_path, ), vulnerable_code=template.get("vulnerable_code", ""), root_cause=str(template.get("root_cause", "")), blast_radius=str(template.get("blast_radius", "")), remediation=str(template.get("remediation", "")), ) exploit_step = ExploitStep( vuln_id=vuln_id, command=vuln.injection_point, description=f"Exploit {vuln_type} on {host}", ) flag = FlagSpec( id=flag_id, value=flag_value, path=flag_path, host=host, ) golden_path: list[GoldenPathStep] = [] for raw_step in raw_steps: if not isinstance(raw_step, dict): continue command = _rewrite_template_text( str(raw_step.get("cmd", "")), template_host=template_host, target_host=host, template_flag_value=template_flag_value, flag_value=flag_value, template_flag_path=template_flag_path, flag_path=flag_path, ) expect = _rewrite_template_text( str(raw_step.get("expect_stdout", "")), template_host=template_host, target_host=host, template_flag_value=template_flag_value, flag_value=flag_value, template_flag_path=template_flag_path, flag_path=flag_path, ) golden_path.append( GoldenPathStep( step=step_offset + len(golden_path) + 1, command=command, expect_in_stdout=expect, host=str(raw_step.get("host", "attacker") or "attacker"), description=str(raw_step.get("description", "")), ) ) evidence = [ EvidenceItem( type="log_entry", location=f"{host}:/var/log/app/access.log", pattern=f"{vuln_type} exploitation on {host}", ), EvidenceItem( type="alert", location="siem:/var/log/siem/consolidated/all.log", pattern=f"{vuln_type} alert for {host}", ), ] return { "vuln": vuln, "exploit_step": exploit_step, "flag": flag, "golden_path": golden_path, "evidence": evidence, "milestone": f"Capture {flag.id} by exploiting {vuln_type} on {host}", } def _append_task_path(snapshot: SnapshotSpec, flag: FlagSpec, milestone: str) -> None: if milestone and milestone not in snapshot.task.milestones: snapshot.task.milestones.append(milestone) condition = {"type": "flag", "value": flag.value} if condition not in snapshot.task.success_conditions: snapshot.task.success_conditions.append(condition) def _mutation_flag_value(template_value: str, vuln_type: str, index: int) -> str: if template_value.startswith("FLAG{") and template_value.endswith("}"): inner = template_value[5:-1] else: inner = f"{vuln_type}_{index}" return f"FLAG{{{inner}_mut{index}}}" def _mutation_flag_path(template_path: str, index: int) -> str: if template_path.startswith("db:"): return template_path path = PurePosixPath(template_path or f"/var/flags/flag{index}.txt") stem = path.stem or f"flag{index}" suffix = path.suffix renamed = path.with_name(f"{stem}_mut{index}{suffix}") return renamed.as_posix() def _template_exploit_command(raw_steps: list[Any]) -> str: non_submit = [ str(raw.get("cmd", "")).strip() for raw in raw_steps if isinstance(raw, dict) and not str(raw.get("cmd", "")).strip().startswith("submit_flag ") ] if non_submit: return non_submit[-1] return "" def _rewrite_template_text( text: str, *, template_host: str, target_host: str, template_flag_value: str, flag_value: str, template_flag_path: str, flag_path: str, ) -> str: updated = text.replace(template_flag_value, flag_value) if template_flag_path and flag_path and template_flag_path != flag_path: updated = updated.replace(template_flag_path, flag_path) updated = updated.replace( PurePosixPath(template_flag_path).name, PurePosixPath(flag_path).name, ) if template_host and target_host and template_host != target_host: replacements = { f"http://{template_host}/": f"http://{target_host}/", f"http://{template_host}": f"http://{target_host}", f"ldap://{template_host}": f"ldap://{target_host}", f"//{template_host}/": f"//{target_host}/", f"-h {template_host} ": f"-h {target_host} ", f"@{template_host} ": f"@{target_host} ", f"@{template_host}'": f"@{target_host}'", f"@{template_host}\"": f"@{target_host}\"", } for old, new in replacements.items(): updated = updated.replace(old, new) return updated