"""Telemetry simulation for GPU cluster with NCCL-faithful log formats.""" from __future__ import annotations from enum import Enum from random import Random from typing import Any from app.config import NUM_NODES from simulation.cluster import ClusterStateMachine class NCCLSubsystem(str, Enum): """Supported NCCL INFO subsystems.""" NET_IB = "NET/IB" GRAPH = "GRAPH/Search" WATCHDOG = "Watchdog" INIT = "Init" TRANSPORT = "Transport" SOCKET = "Socket" class NCCLLogEngine: """Deterministic NCCL INFO log generator.""" def __init__(self, seed: int) -> None: """Initialize engine with deterministic randomness.""" self._seed = seed self._random = Random(seed) def reset(self, seed: int) -> None: """Reset engine randomness with a new seed.""" self._seed = seed self._random = Random(seed) def generate( self, cluster: ClusterStateMachine, subsystem: NCCLSubsystem, time_window: int, ) -> list[str]: """Generate subsystem-specific NCCL INFO logs.""" window = max(1, time_window) failure_type = cluster._scenario.failure_type logs: list[str] = [] for offset in range(window): rank_id = offset % NUM_NODES if subsystem == NCCLSubsystem.NET_IB: logs.extend(self._net_ib_logs(cluster, rank_id, failure_type)) elif subsystem == NCCLSubsystem.GRAPH: logs.extend(self._graph_logs(cluster, rank_id, failure_type, offset)) elif subsystem == NCCLSubsystem.WATCHDOG: logs.extend(self._watchdog_logs(cluster, rank_id, failure_type, offset)) elif subsystem == NCCLSubsystem.INIT: logs.extend(self._init_logs(cluster, rank_id, failure_type)) elif subsystem == NCCLSubsystem.TRANSPORT: logs.extend(self._transport_logs(cluster, rank_id, failure_type)) elif subsystem == NCCLSubsystem.SOCKET: logs.extend(self._socket_logs(cluster, rank_id, failure_type)) return logs def _prefix(self, node_id: int, rank_offset: int = 0) -> str: """Build deterministic hostname/pid/tid prefix for a node.""" hostname = f"gpu-node-{node_id:02d}" pid = 1000 + ((self._seed * 131 + node_id * 977) % 9000) tid = pid + rank_offset return f"{hostname}:{pid}:{tid}" def _line( self, node_id: int, subsystem: NCCLSubsystem, message: str, rank_offset: int = 0, ) -> str: """Format one NCCL INFO line in production style.""" prefix = self._prefix(node_id=node_id, rank_offset=rank_offset) return f"{prefix} NCCL INFO {subsystem.value} {message}" def _net_ib_logs( self, cluster: ClusterStateMachine, rank_id: int, failure_type: str ) -> list[str]: """Generate NET/IB logs for throughput and interface health.""" node_id = rank_id % NUM_NODES ratio = cluster.training.throughput_tokens_per_sec / max( 1.0, cluster.training.target_throughput ) current_gbps = 200.0 * ratio logs = [ self._line( node_id, NCCLSubsystem.NET_IB, ": Using [0]mlx5_0:1/IB [1]mlx5_1:1/IB", rank_offset=rank_id, ), ] if failure_type in {"congestion", "cascade"}: logs.append( self._line( node_id, NCCLSubsystem.NET_IB, ( f": Link degraded on mlx5_1:1, effective bandwidth " f"{current_gbps:.1f} GB/s (< 160.0 GB/s target)" ), rank_offset=rank_id, ) ) else: logs.append( self._line( node_id, NCCLSubsystem.NET_IB, f": Link stable, effective bandwidth {current_gbps:.1f} GB/s", rank_offset=rank_id, ) ) return logs def _graph_logs( self, cluster: ClusterStateMachine, rank_id: int, failure_type: str, offset: int, ) -> list[str]: """Generate graph search and topology logs.""" node_id = rank_id % NUM_NODES rings = 4 logs = [ self._line( node_id, NCCLSubsystem.GRAPH, f": {NUM_NODES} nodes, {NUM_NODES * 8} GPUs, {rings} rings", rank_offset=rank_id, ) ] if failure_type in {"congestion", "cascade"}: logs.append( self._line( node_id, NCCLSubsystem.GRAPH, ": ring 2 crosses oversubscribed spine links (rack-locality violated)", rank_offset=rank_id, ) ) if failure_type in {"desync", "cascade"}: base_seq = 1198 + offset drift = (rank_id % 3) - 1 logs.append( self._line( node_id, NCCLSubsystem.GRAPH, ( ": compiled different collectives across ranks " f"(rank={rank_id}, seq_id={base_seq + drift}, expected={base_seq})" ), rank_offset=rank_id, ) ) return logs def _watchdog_logs( self, cluster: ClusterStateMachine, rank_id: int, failure_type: str, offset: int, ) -> list[str]: """Generate watchdog timeout and rank stall diagnostics.""" node_id = rank_id % NUM_NODES failing_rank = cluster._scenario.failing_rank_id seq_id = 1198 + offset logs: list[str] = [] if failure_type in {"oom", "desync", "cascade"}: logs.append( self._line( node_id, NCCLSubsystem.WATCHDOG, ( ": collective operation timeout on rank " f"{failing_rank} (seq_id={seq_id})" ), rank_offset=rank_id, ) ) if failure_type in {"oom", "cascade"}: logs.append( self._line( node_id, NCCLSubsystem.WATCHDOG, f": rank {failing_rank} stalled after CUDA Xid 79 (OOM)", rank_offset=rank_id, ) ) else: logs.append( self._line( node_id, NCCLSubsystem.WATCHDOG, ": no collective timeout detected in current window", rank_offset=rank_id, ) ) return logs def _init_logs( self, cluster: ClusterStateMachine, rank_id: int, failure_type: str ) -> list[str]: """Generate NCCL init and compatibility diagnostics.""" node_id = rank_id % NUM_NODES logs = [ self._line( node_id, NCCLSubsystem.INIT, ": Bootstrap : Using eth0:10.10.0.0<0>", rank_offset=rank_id, ) ] if failure_type == "cascade": logs.append( self._line( node_id, NCCLSubsystem.INIT, ( ": Loaded NCCL 2.21.5 but expected 2.27.0 - " "LD_LIBRARY_PATH version mismatch; message truncated errors may occur" ), rank_offset=rank_id, ) ) return logs def _transport_logs( self, cluster: ClusterStateMachine, rank_id: int, failure_type: str ) -> list[str]: """Generate transport-level completion and error logs.""" node_id = rank_id % NUM_NODES logs: list[str] = [] if failure_type in {"oom", "cascade"}: logs.append( self._line( node_id, NCCLSubsystem.TRANSPORT, "Got completion with error 12 (Remote access error)", rank_offset=rank_id, ) ) logs.append( self._line( node_id, NCCLSubsystem.TRANSPORT, "CUDA driver reported Xid 79 on receive queue", rank_offset=rank_id, ) ) elif failure_type in {"desync", "congestion"}: logs.append( self._line( node_id, NCCLSubsystem.TRANSPORT, "send proxy retries increased due to delayed completions", rank_offset=rank_id, ) ) else: logs.append( self._line( node_id, NCCLSubsystem.TRANSPORT, "all transport channels healthy", rank_offset=rank_id, ) ) return logs def _socket_logs( self, cluster: ClusterStateMachine, rank_id: int, failure_type: str ) -> list[str]: """Generate socket connect and retry logs.""" node_id = rank_id % NUM_NODES failing_rank = cluster._scenario.failing_rank_id if failure_type in {"oom", "desync", "cascade"}: return [ self._line( node_id, NCCLSubsystem.SOCKET, f"socketPollConnect: Connection refused on rank {failing_rank}", rank_offset=rank_id, ) ] return [ self._line( node_id, NCCLSubsystem.SOCKET, "socketPollConnect: connection established for all peers", rank_offset=rank_id, ) ] class TelemetryStream: """Simulates surface telemetry and hidden NCCL subsystem diagnostics.""" def __init__( self, seed: int, red_herring_probability: float = 0.30, telemetry_mask_probability: float = 0.20, ) -> None: """Initialize telemetry stream with deterministic randomness.""" self._seed = seed self._random = Random(seed) self._surface_log_buffer: list[str] = [] self._step = 0 self._nccl_engine = NCCLLogEngine(seed=seed) self._red_herring_probability = red_herring_probability self._telemetry_mask_probability = telemetry_mask_probability def reset(self, seed: int) -> None: """Reset telemetry state.""" self._seed = seed self._random = Random(seed) self._surface_log_buffer = [] self._step = 0 self._nccl_engine.reset(seed=seed) def configure_difficulty( self, red_herring_probability: float, telemetry_mask_probability: float, ) -> None: """Apply adaptive curriculum probabilities.""" self._red_herring_probability = red_herring_probability self._telemetry_mask_probability = telemetry_mask_probability def update(self, cluster: ClusterStateMachine) -> None: """Update surface telemetry based on cluster state.""" self._step += 1 self._surface_log_buffer.clear() throughput = cluster.training.throughput_tokens_per_sec target = cluster.training.target_throughput ratio = throughput / max(1.0, target) jitter_pct = self._random.uniform(0.5, 2.5) self._surface_log_buffer.append( f"cluster_status: training={cluster.training.job_status} step={cluster.training.current_step}" ) self._surface_log_buffer.append( f"throughput: {throughput:.1f}/{target:.1f} tok/s ({ratio * 100:.1f}% of target)" ) degraded_nodes = [node.node_id for node in cluster.nodes if node.health_status != "healthy"] if self._random.random() < self._telemetry_mask_probability: self._surface_log_buffer.append("node_health: all nodes nominal") else: self._surface_log_buffer.append( f"node_health: degraded_nodes={degraded_nodes if degraded_nodes else 'none'}" ) healthy_non_failing_nodes = [ node.node_id for node in cluster.nodes if node.health_status == "healthy" and node.node_id != cluster._scenario.failing_node_id ] if ( healthy_non_failing_nodes and self._random.random() < self._red_herring_probability ): random_healthy_node = self._random.choice(healthy_non_failing_nodes) self._surface_log_buffer.append( "alert: node " f"{random_healthy_node} elevated retransmit rate (may be noise)" ) if cluster.training.job_status in {"stalled", "failed"}: self._surface_log_buffer.append( "alert: distributed collective progress stalled; diagnostics required" ) elif ratio < 0.8: self._surface_log_buffer.append( "alert: interconnect throughput below expected envelope" ) self._surface_log_buffer.append( f"sampling_noise: telemetry jitter {jitter_pct:.2f}%" ) self._surface_log_buffer = self._surface_log_buffer[:7] def visible_logs(self) -> list[str]: """Return visible surface logs for the agent.""" return list(self._surface_log_buffer) def generate_nccl_subsystem_logs( self, cluster: ClusterStateMachine, subsystem: str, time_window: int = 10, ) -> list[str]: """Return deep subsystem logs for targeted diagnostics.""" normalized = subsystem.strip().lower().replace("-", "_").replace("/", "_") subsystem_map: dict[str, NCCLSubsystem] = { "net_ib": NCCLSubsystem.NET_IB, "graph": NCCLSubsystem.GRAPH, "watchdog": NCCLSubsystem.WATCHDOG, "init": NCCLSubsystem.INIT, "transport": NCCLSubsystem.TRANSPORT, "socket": NCCLSubsystem.SOCKET, } selected = subsystem_map.get(normalized) if selected is None: selected = NCCLSubsystem.WATCHDOG return self._nccl_engine.generate( cluster=cluster, subsystem=selected, time_window=time_window, ) def generate_nccl_logs( self, cluster: ClusterStateMachine, time_window: int = 10 ) -> list[str]: """Backward-compatible deep logs helper across major subsystems.""" logs: list[str] = [] for subsystem in ( NCCLSubsystem.WATCHDOG, NCCLSubsystem.GRAPH, NCCLSubsystem.NET_IB, ): logs.extend( self._nccl_engine.generate( cluster=cluster, subsystem=subsystem, time_window=max(1, time_window // 3), ) ) return logs def get_investigation_output( self, cluster: ClusterStateMachine, action_type: str, rank_id: int | None = None, ) -> dict[str, Any]: """Return action-specific deep output with realistic noise.""" if action_type == "inspect_flight_recorder": if rank_id is None: return {"error": "rank_id parameter required"} payload = cluster._generate_flight_recorder_data(int(rank_id)) entries = list(payload.get("entries", [])) wrong_ranks = [ node.node_id for node in cluster.nodes if node.node_id != int(rank_id) ] self._random.shuffle(wrong_ranks) noise_count = self._random.randint(1, 2) for wrong_rank in wrong_ranks[:noise_count]: noise_entry = { "profiling_name": "nccl:all_reduce", "rank": wrong_rank, "collective_seq_id": 2000 + wrong_rank, "p2p_seq_id": 0, "op_id": 2000 + wrong_rank, "state": "completed", "input_sizes": [[1024, 2048]], "output_sizes": [[1024, 2048]], "input_dtypes": ["Float"], "output_dtypes": ["Float"], "timeout_ms": 1800000, "time_created_ns": (self._seed * 1_000_000) + (wrong_rank * 10_000), "time_started_ns": (self._seed * 1_000_000) + (wrong_rank * 10_000) + 100, "time_finished_ns": (self._seed * 1_000_000) + (wrong_rank * 10_000) + 600, "frames": [ { "name": "all_reduce", "filename": "torch/distributed/distributed_c10d.py", "line": 2891, } ], } entries.append(noise_entry) payload["entries"] = entries return {"flight_recorder": payload} if action_type == "query_nccl_logs": logs = self.generate_nccl_subsystem_logs(cluster, "watchdog", time_window=10) wrong_rank_candidates = [ node.node_id for node in cluster.nodes if node.node_id != cluster._scenario.failing_rank_id ] wrong_rank = ( self._random.choice(wrong_rank_candidates) if wrong_rank_candidates else cluster._scenario.failing_rank_id ) misleading = [ ( f"[t-{i}][rank{wrong_rank}] NCCL INFO Watchdog: " "brief timeout observed; auto-recovered in 12ms" ) for i in range(3, 0, -1) ] return {"nccl_logs": [*misleading, *logs]} return {"error": "unsupported investigation action"}