Spaces:
Sleeping
Sleeping
| """Telemetry simulation for GPU cluster with NCCL-faithful log formats.""" | |
| from __future__ import annotations | |
| from enum import Enum | |
| from random import Random | |
| from typing import Any | |
| from app.config import NUM_NODES | |
| from simulation.cluster import ClusterStateMachine | |
class NCCLSubsystem(str, Enum):
    """NCCL subsystems for which INFO log lines can be generated.

    Each member's value is the subsystem tag written after ``NCCL INFO`` in a
    generated line. Because members also subclass ``str``, a member compares
    equal to its plain string value.
    """

    NET_IB = "NET/IB"
    GRAPH = "GRAPH/Search"
    WATCHDOG = "Watchdog"
    INIT = "Init"
    TRANSPORT = "Transport"
    SOCKET = "Socket"
class NCCLLogEngine:
    """Deterministic generator of NCCL ``INFO`` log lines for a simulated cluster.

    Every line is derived from the engine seed, the node/rank being reported and
    the scenario ``failure_type`` read from the cluster. None of the line
    builders draw from ``self._random``, so output depends only on those inputs.
    """

    # Collective sequence id reported at offset 0 of a time window.
    _BASE_SEQ_ID = 1198

    def __init__(self, seed: int) -> None:
        """Initialize the engine with ``seed``."""
        self._seed = seed
        # Kept in sync with the seed by reset(); no line builder below uses it.
        self._random = Random(seed)

    def reset(self, seed: int) -> None:
        """Re-seed the engine; subsequent prefixes (pid/tid) derive from ``seed``."""
        self._seed = seed
        self._random = Random(seed)

    def generate(
        self,
        cluster: ClusterStateMachine,
        subsystem: NCCLSubsystem,
        time_window: int,
    ) -> list[str]:
        """Generate subsystem-specific NCCL INFO logs.

        Args:
            cluster: Simulated cluster; ``cluster._scenario.failure_type``
                selects which diagnostic lines are emitted.
            subsystem: Subsystem whose lines are produced.
            time_window: Number of simulated steps; values below 1 are treated
                as 1. Step ``offset`` reports on rank ``offset % NUM_NODES``.

        Returns:
            The lines for every step, in step order. The list is empty if
            ``subsystem`` matches none of the known members.
        """
        window = max(1, time_window)
        failure_type = cluster._scenario.failure_type
        logs: list[str] = []
        for offset in range(window):
            rank_id = offset % NUM_NODES
            if subsystem == NCCLSubsystem.NET_IB:
                logs.extend(self._net_ib_logs(cluster, rank_id, failure_type))
            elif subsystem == NCCLSubsystem.GRAPH:
                logs.extend(self._graph_logs(cluster, rank_id, failure_type, offset))
            elif subsystem == NCCLSubsystem.WATCHDOG:
                logs.extend(self._watchdog_logs(cluster, rank_id, failure_type, offset))
            elif subsystem == NCCLSubsystem.INIT:
                logs.extend(self._init_logs(cluster, rank_id, failure_type))
            elif subsystem == NCCLSubsystem.TRANSPORT:
                logs.extend(self._transport_logs(cluster, rank_id, failure_type))
            elif subsystem == NCCLSubsystem.SOCKET:
                logs.extend(self._socket_logs(cluster, rank_id, failure_type))
        return logs

    def _prefix(self, node_id: int, rank_offset: int = 0) -> str:
        """Build the ``hostname:pid:tid`` prefix for a node.

        The pid is a pure function of the seed and ``node_id`` (range 1000-9999).
        The tid is the pid shifted by ``rank_offset``.
        """
        hostname = f"gpu-node-{node_id:02d}"
        pid = 1000 + ((self._seed * 131 + node_id * 977) % 9000)
        tid = pid + rank_offset
        return f"{hostname}:{pid}:{tid}"

    def _line(
        self,
        node_id: int,
        subsystem: NCCLSubsystem,
        message: str,
        rank_offset: int = 0,
    ) -> str:
        """Format one line as ``<prefix> NCCL INFO <subsystem tag> <message>``."""
        prefix = self._prefix(node_id=node_id, rank_offset=rank_offset)
        return f"{prefix} NCCL INFO {subsystem.value} {message}"

    def _net_ib_logs(
        self, cluster: ClusterStateMachine, rank_id: int, failure_type: str
    ) -> list[str]:
        """Generate NET/IB interface and bandwidth lines for one rank.

        Reported bandwidth is 200.0 scaled by the throughput/target ratio, with
        the target floored at 1.0. Congestion and cascade scenarios report a
        degraded link; all other scenarios report a stable one.
        """
        node_id = rank_id % NUM_NODES
        ratio = cluster.training.throughput_tokens_per_sec / max(
            1.0, cluster.training.target_throughput
        )
        current_gbps = 200.0 * ratio
        logs = [
            self._line(
                node_id,
                NCCLSubsystem.NET_IB,
                ": Using [0]mlx5_0:1/IB [1]mlx5_1:1/IB",
                rank_offset=rank_id,
            ),
        ]
        if failure_type in {"congestion", "cascade"}:
            message = (
                f": Link degraded on mlx5_1:1, effective bandwidth "
                f"{current_gbps:.1f} GB/s (< 160.0 GB/s target)"
            )
        else:
            message = f": Link stable, effective bandwidth {current_gbps:.1f} GB/s"
        logs.append(
            self._line(node_id, NCCLSubsystem.NET_IB, message, rank_offset=rank_id)
        )
        return logs

    def _graph_logs(
        self,
        cluster: ClusterStateMachine,
        rank_id: int,
        failure_type: str,
        offset: int,
    ) -> list[str]:
        """Generate topology-search lines for one rank.

        A summary line (node/GPU/ring counts) is always emitted. Congestion and
        cascade add a spine-oversubscription line; desync and cascade add a
        per-rank collective-sequence mismatch line.
        """
        node_id = rank_id % NUM_NODES
        rings = 4
        logs = [
            self._line(
                node_id,
                NCCLSubsystem.GRAPH,
                f": {NUM_NODES} nodes, {NUM_NODES * 8} GPUs, {rings} rings",
                rank_offset=rank_id,
            )
        ]
        if failure_type in {"congestion", "cascade"}:
            logs.append(
                self._line(
                    node_id,
                    NCCLSubsystem.GRAPH,
                    ": ring 2 crosses oversubscribed spine links (rack-locality violated)",
                    rank_offset=rank_id,
                )
            )
        if failure_type in {"desync", "cascade"}:
            base_seq = self._BASE_SEQ_ID + offset
            # Drift cycles -1, 0, +1 with rank_id; ranks with drift 0 report
            # seq_id equal to the expected value.
            drift = (rank_id % 3) - 1
            logs.append(
                self._line(
                    node_id,
                    NCCLSubsystem.GRAPH,
                    (
                        ": compiled different collectives across ranks "
                        f"(rank={rank_id}, seq_id={base_seq + drift}, expected={base_seq})"
                    ),
                    rank_offset=rank_id,
                )
            )
        return logs

    @staticmethod
    def _watchdog_messages(
        failure_type: str, failing_rank: int, seq_id: int
    ) -> list[str]:
        """Return the watchdog message bodies for one step of ``failure_type``.

        oom/desync/cascade report a collective timeout on ``failing_rank``, and
        oom/cascade additionally report the Xid 79 stall. Every other scenario
        reports that no timeout was seen. The two outcomes are mutually
        exclusive.
        """
        if failure_type not in {"oom", "desync", "cascade"}:
            return [": no collective timeout detected in current window"]
        messages = [
            f": collective operation timeout on rank {failing_rank} (seq_id={seq_id})"
        ]
        if failure_type in {"oom", "cascade"}:
            messages.append(f": rank {failing_rank} stalled after CUDA Xid 79 (OOM)")
        return messages

    def _watchdog_logs(
        self,
        cluster: ClusterStateMachine,
        rank_id: int,
        failure_type: str,
        offset: int,
    ) -> list[str]:
        """Generate watchdog timeout / rank-stall lines for one rank and step."""
        node_id = rank_id % NUM_NODES
        failing_rank = cluster._scenario.failing_rank_id
        seq_id = self._BASE_SEQ_ID + offset
        return [
            self._line(node_id, NCCLSubsystem.WATCHDOG, message, rank_offset=rank_id)
            for message in self._watchdog_messages(failure_type, failing_rank, seq_id)
        ]

    def _init_logs(
        self, cluster: ClusterStateMachine, rank_id: int, failure_type: str
    ) -> list[str]:
        """Generate bootstrap lines, plus a library-version mismatch for cascade."""
        node_id = rank_id % NUM_NODES
        logs = [
            self._line(
                node_id,
                NCCLSubsystem.INIT,
                ": Bootstrap : Using eth0:10.10.0.0<0>",
                rank_offset=rank_id,
            )
        ]
        if failure_type == "cascade":
            logs.append(
                self._line(
                    node_id,
                    NCCLSubsystem.INIT,
                    (
                        ": Loaded NCCL 2.21.5 but expected 2.27.0 - "
                        "LD_LIBRARY_PATH version mismatch; message truncated errors may occur"
                    ),
                    rank_offset=rank_id,
                )
            )
        return logs

    def _transport_logs(
        self, cluster: ClusterStateMachine, rank_id: int, failure_type: str
    ) -> list[str]:
        """Generate transport-level completion lines for one rank.

        oom/cascade yield a completion error plus an Xid 79 line, and
        desync/congestion yield a proxy-retry line. Anything else yields a
        healthy line.
        """
        node_id = rank_id % NUM_NODES
        if failure_type in {"oom", "cascade"}:
            messages = [
                "Got completion with error 12 (Remote access error)",
                "CUDA driver reported Xid 79 on receive queue",
            ]
        elif failure_type in {"desync", "congestion"}:
            messages = ["send proxy retries increased due to delayed completions"]
        else:
            messages = ["all transport channels healthy"]
        return [
            self._line(node_id, NCCLSubsystem.TRANSPORT, message, rank_offset=rank_id)
            for message in messages
        ]

    def _socket_logs(
        self, cluster: ClusterStateMachine, rank_id: int, failure_type: str
    ) -> list[str]:
        """Generate a single socket-connect line for one rank."""
        node_id = rank_id % NUM_NODES
        failing_rank = cluster._scenario.failing_rank_id
        if failure_type in {"oom", "desync", "cascade"}:
            message = f"socketPollConnect: Connection refused on rank {failing_rank}"
        else:
            message = "socketPollConnect: connection established for all peers"
        return [
            self._line(node_id, NCCLSubsystem.SOCKET, message, rank_offset=rank_id)
        ]
class TelemetryStream:
    """Simulated cluster telemetry: noisy surface logs plus on-demand NCCL diagnostics.

    Surface logs are rebuilt on every update() call and are shaped by two
    curriculum probabilities. The mask probability can hide degraded nodes
    behind an "all nodes nominal" line. The red-herring probability can add an
    alert blaming a random healthy node other than the scenario's failing node.
    Deep NCCL logs come from an internal NCCLLogEngine created with the same seed.
    """

    def __init__(
        self,
        seed: int,
        red_herring_probability: float = 0.30,
        telemetry_mask_probability: float = 0.20,
    ) -> None:
        """Initialize the stream with deterministic randomness derived from ``seed``."""
        self._seed = seed
        self._random = Random(seed)
        self._surface_log_buffer: list[str] = []
        self._step = 0
        self._nccl_engine = NCCLLogEngine(seed=seed)
        self._red_herring_probability = red_herring_probability
        self._telemetry_mask_probability = telemetry_mask_probability

    def reset(self, seed: int) -> None:
        """Reset RNG, buffered logs, step counter and the NCCL engine to ``seed``.

        Curriculum probabilities are left unchanged.
        """
        self._seed = seed
        self._random = Random(seed)
        self._surface_log_buffer = []
        self._step = 0
        self._nccl_engine.reset(seed=seed)

    def configure_difficulty(
        self,
        red_herring_probability: float,
        telemetry_mask_probability: float,
    ) -> None:
        """Apply adaptive curriculum probabilities.

        Args:
            red_herring_probability: Per-update chance of a spurious alert about
                a healthy node.
            telemetry_mask_probability: Per-update chance that node health is
                reported as "all nodes nominal" regardless of actual state.
        """
        self._red_herring_probability = red_herring_probability
        self._telemetry_mask_probability = telemetry_mask_probability

    def update(self, cluster: ClusterStateMachine) -> None:
        """Rebuild the surface log buffer from the current cluster state.

        Increments the step counter. RNG draws happen in a fixed order: jitter,
        mask check, then the red-herring check and pick. The red-herring check
        only draws when a candidate node exists.
        """
        self._step += 1
        self._surface_log_buffer.clear()
        throughput = cluster.training.throughput_tokens_per_sec
        target = cluster.training.target_throughput
        ratio = throughput / max(1.0, target)
        jitter_pct = self._random.uniform(0.5, 2.5)
        self._surface_log_buffer.append(
            f"cluster_status: training={cluster.training.job_status} step={cluster.training.current_step}"
        )
        self._surface_log_buffer.append(
            f"throughput: {throughput:.1f}/{target:.1f} tok/s ({ratio * 100:.1f}% of target)"
        )
        degraded_nodes = [
            node.node_id for node in cluster.nodes if node.health_status != "healthy"
        ]
        # Curriculum masking: sometimes hide real degradation from the agent.
        if self._random.random() < self._telemetry_mask_probability:
            self._surface_log_buffer.append("node_health: all nodes nominal")
        else:
            self._surface_log_buffer.append(
                f"node_health: degraded_nodes={degraded_nodes if degraded_nodes else 'none'}"
            )
        healthy_non_failing_nodes = [
            node.node_id
            for node in cluster.nodes
            if node.health_status == "healthy"
            and node.node_id != cluster._scenario.failing_node_id
        ]
        if (
            healthy_non_failing_nodes
            and self._random.random() < self._red_herring_probability
        ):
            random_healthy_node = self._random.choice(healthy_non_failing_nodes)
            self._surface_log_buffer.append(
                "alert: node "
                f"{random_healthy_node} elevated retransmit rate (may be noise)"
            )
        if cluster.training.job_status in {"stalled", "failed"}:
            self._surface_log_buffer.append(
                "alert: distributed collective progress stalled; diagnostics required"
            )
        elif ratio < 0.8:
            self._surface_log_buffer.append(
                "alert: interconnect throughput below expected envelope"
            )
        self._surface_log_buffer.append(
            f"sampling_noise: telemetry jitter {jitter_pct:.2f}%"
        )
        # At most six lines are produced above, so this cap is a safety net.
        self._surface_log_buffer = self._surface_log_buffer[:7]

    def visible_logs(self) -> list[str]:
        """Return a copy of the current surface logs visible to the agent."""
        return list(self._surface_log_buffer)

    @staticmethod
    def _normalize_subsystem_name(name: str) -> str:
        """Canonicalize a subsystem name: trimmed, lower-case, separators as ``_``."""
        return (
            name.strip()
            .lower()
            .replace("-", "_")
            .replace("/", "_")
            .replace(" ", "_")
        )

    def generate_nccl_subsystem_logs(
        self,
        cluster: ClusterStateMachine,
        subsystem: str,
        time_window: int = 10,
    ) -> list[str]:
        """Return deep subsystem logs for targeted diagnostics.

        ``subsystem`` may be a member name (``"net_ib"``, ``"graph"``, ...), a
        member value (``"NET/IB"``, ``"GRAPH/Search"``, ...) or an
        ``NCCLSubsystem`` member. Matching ignores case, and ``-``, ``/`` and
        spaces are all treated as ``_``. Unrecognised names fall back to the
        watchdog subsystem.
        """
        normalized = self._normalize_subsystem_name(subsystem)
        subsystem_map: dict[str, NCCLSubsystem] = {}
        for member in NCCLSubsystem:
            subsystem_map[member.name.lower()] = member
            # Values such as "GRAPH/Search" normalize to "graph_search", which
            # differs from the member name and must be mapped explicitly.
            subsystem_map[self._normalize_subsystem_name(member.value)] = member
        selected = subsystem_map.get(normalized, NCCLSubsystem.WATCHDOG)
        return self._nccl_engine.generate(
            cluster=cluster,
            subsystem=selected,
            time_window=time_window,
        )

    def generate_nccl_logs(
        self, cluster: ClusterStateMachine, time_window: int = 10
    ) -> list[str]:
        """Backward-compatible deep-log helper.

        Concatenates watchdog, graph and NET/IB logs, each generated over
        ``max(1, time_window // 3)`` steps.
        """
        logs: list[str] = []
        for subsystem in (
            NCCLSubsystem.WATCHDOG,
            NCCLSubsystem.GRAPH,
            NCCLSubsystem.NET_IB,
        ):
            logs.extend(
                self._nccl_engine.generate(
                    cluster=cluster,
                    subsystem=subsystem,
                    time_window=max(1, time_window // 3),
                )
            )
        return logs

    def get_investigation_output(
        self,
        cluster: ClusterStateMachine,
        action_type: str,
        rank_id: int | None = None,
    ) -> dict[str, Any]:
        """Return action-specific deep output with realistic noise.

        Args:
            cluster: Simulated cluster to inspect.
            action_type: ``"inspect_flight_recorder"`` or ``"query_nccl_logs"``.
            rank_id: Rank to inspect. Required for ``"inspect_flight_recorder"``
                and must be convertible with ``int()``.

        Returns:
            One of:
            - ``{"flight_recorder": payload}`` with 1-2 decoy "completed"
              entries from other ranks appended;
            - ``{"nccl_logs": [...]}`` with three misleading watchdog lines
              prepended, naming a random rank other than the failing one when
              such a rank exists;
            - ``{"error": message}`` for a missing or non-integer ``rank_id``,
              or for an unsupported action.
        """
        if action_type == "inspect_flight_recorder":
            if rank_id is None:
                return {"error": "rank_id parameter required"}
            try:
                target_rank = int(rank_id)
            except (TypeError, ValueError):
                return {"error": "rank_id must be an integer"}
            payload = cluster._generate_flight_recorder_data(target_rank)
            entries = list(payload.get("entries", []))
            wrong_ranks = [
                node.node_id
                for node in cluster.nodes
                if node.node_id != target_rank
            ]
            self._random.shuffle(wrong_ranks)
            noise_count = self._random.randint(1, 2)
            base_ns = self._seed * 1_000_000
            for wrong_rank in wrong_ranks[:noise_count]:
                created_ns = base_ns + wrong_rank * 10_000
                entries.append(
                    {
                        "profiling_name": "nccl:all_reduce",
                        "rank": wrong_rank,
                        "collective_seq_id": 2000 + wrong_rank,
                        "p2p_seq_id": 0,
                        "op_id": 2000 + wrong_rank,
                        "state": "completed",
                        "input_sizes": [[1024, 2048]],
                        "output_sizes": [[1024, 2048]],
                        "input_dtypes": ["Float"],
                        "output_dtypes": ["Float"],
                        "timeout_ms": 1800000,
                        "time_created_ns": created_ns,
                        "time_started_ns": created_ns + 100,
                        "time_finished_ns": created_ns + 600,
                        "frames": [
                            {
                                "name": "all_reduce",
                                "filename": "torch/distributed/distributed_c10d.py",
                                "line": 2891,
                            }
                        ],
                    }
                )
            payload["entries"] = entries
            return {"flight_recorder": payload}
        if action_type == "query_nccl_logs":
            logs = self.generate_nccl_subsystem_logs(cluster, "watchdog", time_window=10)
            wrong_rank_candidates = [
                node.node_id
                for node in cluster.nodes
                if node.node_id != cluster._scenario.failing_rank_id
            ]
            wrong_rank = (
                self._random.choice(wrong_rank_candidates)
                if wrong_rank_candidates
                else cluster._scenario.failing_rank_id
            )
            misleading = [
                (
                    f"[t-{i}][rank{wrong_rank}] NCCL INFO Watchdog: "
                    "brief timeout observed; auto-recovered in 12ms"
                )
                for i in range(3, 0, -1)
            ]
            return {"nccl_logs": [*misleading, *logs]}
        return {"error": "unsupported investigation action"}