nervousystem-env / simulation /telemetry.py
vx7sh's picture
feat(curriculum): adaptive difficulty for telemetry, masking, and secondary failures
3928ed0
"""Telemetry simulation for GPU cluster with NCCL-faithful log formats."""
from __future__ import annotations
from enum import Enum
from random import Random
from typing import Any
from app.config import NUM_NODES
from simulation.cluster import ClusterStateMachine
class NCCLSubsystem(str, Enum):
    """Supported NCCL INFO subsystems.

    Each member's value is the tag text that ``NCCLLogEngine._line`` inserts
    after ``NCCL INFO`` when it formats a log line. Because of the ``str``
    mixin, members can be used wherever a plain string is expected.
    """

    # Interface selection and link bandwidth lines.
    NET_IB = "NET/IB"
    # Topology summary, ring placement and collective sequence mismatch lines.
    GRAPH = "GRAPH/Search"
    # Collective timeout and stalled-rank lines.
    WATCHDOG = "Watchdog"
    # Bootstrap interface and NCCL version mismatch lines.
    INIT = "Init"
    # Completion error, proxy retry and channel health lines.
    TRANSPORT = "Transport"
    # Peer connection result lines.
    SOCKET = "Socket"
class NCCLLogEngine:
    """Seeded generator of NCCL INFO-style log lines for a simulated cluster.

    The engine reads the scenario's failure type (and, for some subsystems,
    the failing rank and training throughput) from the cluster it is given
    and renders matching log lines. The seed feeds the pid/tid part of the
    line prefix.
    """

    def __init__(self, seed: int) -> None:
        """Create an engine bound to ``seed``."""
        self.reset(seed)

    def reset(self, seed: int) -> None:
        """Rebind the engine to ``seed`` and recreate its random source."""
        self._seed, self._random = seed, Random(seed)

    def generate(
        self,
        cluster: ClusterStateMachine,
        subsystem: NCCLSubsystem,
        time_window: int,
    ) -> list[str]:
        """Return the log lines of ``subsystem`` for ``time_window`` steps.

        Args:
            cluster: Cluster whose scenario and training state drive the
                log content.
            subsystem: Subsystem whose lines are rendered.
            time_window: Number of steps to render; values below 1 are
                treated as 1.

        Returns:
            The lines of every step, in step order. Empty when ``subsystem``
            matches none of the known members.
        """
        span = max(1, time_window)
        failure = cluster._scenario.failure_type

        # One builder per subsystem; only GRAPH and WATCHDOG consume the step.
        builders = (
            (
                NCCLSubsystem.NET_IB,
                lambda rank, step: self._net_ib_logs(cluster, rank, failure),
            ),
            (
                NCCLSubsystem.GRAPH,
                lambda rank, step: self._graph_logs(cluster, rank, failure, step),
            ),
            (
                NCCLSubsystem.WATCHDOG,
                lambda rank, step: self._watchdog_logs(cluster, rank, failure, step),
            ),
            (
                NCCLSubsystem.INIT,
                lambda rank, step: self._init_logs(cluster, rank, failure),
            ),
            (
                NCCLSubsystem.TRANSPORT,
                lambda rank, step: self._transport_logs(cluster, rank, failure),
            ),
            (
                NCCLSubsystem.SOCKET,
                lambda rank, step: self._socket_logs(cluster, rank, failure),
            ),
        )
        # Unknown subsystems still walk the window but contribute no lines.
        build = next(
            (builder for tag, builder in builders if subsystem == tag),
            lambda rank, step: [],
        )

        collected: list[str] = []
        for step in range(span):
            collected.extend(build(step % NUM_NODES, step))
        return collected

    def _prefix(self, node_id: int, rank_offset: int = 0) -> str:
        """Return the ``hostname:pid:tid`` prefix for ``node_id``.

        The pid is derived from the seed and the node id; the tid is the pid
        shifted by ``rank_offset``.
        """
        pid = 1000 + ((self._seed * 131 + node_id * 977) % 9000)
        return f"gpu-node-{node_id:02d}:{pid}:{pid + rank_offset}"

    def _line(
        self,
        node_id: int,
        subsystem: NCCLSubsystem,
        message: str,
        rank_offset: int = 0,
    ) -> str:
        """Render one ``<prefix> NCCL INFO <subsystem> <message>`` line."""
        head = self._prefix(node_id=node_id, rank_offset=rank_offset)
        tag = subsystem.value
        return f"{head} NCCL INFO {tag} {message}"

    def _emitter(self, subsystem: NCCLSubsystem, rank_id: int) -> Any:
        """Return a ``message -> line`` callable bound to one rank and subsystem."""
        node_id = rank_id % NUM_NODES

        def emit(message: str) -> str:
            return self._line(node_id, subsystem, message, rank_offset=rank_id)

        return emit

    def _net_ib_logs(
        self, cluster: ClusterStateMachine, rank_id: int, failure_type: str
    ) -> list[str]:
        """Return the NET/IB interface line plus a link status line.

        The reported bandwidth is 200.0 scaled by the throughput/target
        ratio; the link is reported degraded for congestion and cascade
        failures, stable otherwise.
        """
        emit = self._emitter(NCCLSubsystem.NET_IB, rank_id)
        training = cluster.training
        gbps = 200.0 * (
            training.throughput_tokens_per_sec
            / max(1.0, training.target_throughput)
        )

        if failure_type in {"congestion", "cascade"}:
            status = (
                f": Link degraded on mlx5_1:1, effective bandwidth "
                f"{gbps:.1f} GB/s (< 160.0 GB/s target)"
            )
        else:
            status = f": Link stable, effective bandwidth {gbps:.1f} GB/s"

        return [emit(": Using [0]mlx5_0:1/IB [1]mlx5_1:1/IB"), emit(status)]

    def _graph_logs(
        self,
        cluster: ClusterStateMachine,
        rank_id: int,
        failure_type: str,
        offset: int,
    ) -> list[str]:
        """Return the topology summary plus failure-specific graph lines."""
        emit = self._emitter(NCCLSubsystem.GRAPH, rank_id)
        ring_count = 4
        produced = [
            emit(f": {NUM_NODES} nodes, {NUM_NODES * 8} GPUs, {ring_count} rings")
        ]

        if failure_type in {"congestion", "cascade"}:
            produced.append(
                emit(
                    ": ring 2 crosses oversubscribed spine links "
                    "(rack-locality violated)"
                )
            )

        if failure_type in {"desync", "cascade"}:
            expected_seq = 1198 + offset
            # Per-rank drift cycles through -1, 0, +1.
            observed_seq = expected_seq + ((rank_id % 3) - 1)
            produced.append(
                emit(
                    ": compiled different collectives across ranks "
                    f"(rank={rank_id}, seq_id={observed_seq}, expected={expected_seq})"
                )
            )

        return produced

    def _watchdog_logs(
        self,
        cluster: ClusterStateMachine,
        rank_id: int,
        failure_type: str,
        offset: int,
    ) -> list[str]:
        """Return watchdog timeout/stall lines, or a single all-clear line."""
        emit = self._emitter(NCCLSubsystem.WATCHDOG, rank_id)
        culprit = cluster._scenario.failing_rank_id
        sequence = 1198 + offset

        if failure_type not in {"oom", "desync", "cascade"}:
            return [emit(": no collective timeout detected in current window")]

        produced = [
            emit(
                ": collective operation timeout on rank "
                f"{culprit} (seq_id={sequence})"
            )
        ]
        if failure_type in {"oom", "cascade"}:
            produced.append(emit(f": rank {culprit} stalled after CUDA Xid 79 (OOM)"))
        return produced

    def _init_logs(
        self, cluster: ClusterStateMachine, rank_id: int, failure_type: str
    ) -> list[str]:
        """Return the bootstrap line, plus a version mismatch line on cascade."""
        emit = self._emitter(NCCLSubsystem.INIT, rank_id)
        produced = [emit(": Bootstrap : Using eth0:10.10.0.0<0>")]
        if failure_type == "cascade":
            produced.append(
                emit(
                    ": Loaded NCCL 2.21.5 but expected 2.27.0 - "
                    "LD_LIBRARY_PATH version mismatch; message truncated errors may occur"
                )
            )
        return produced

    def _transport_logs(
        self, cluster: ClusterStateMachine, rank_id: int, failure_type: str
    ) -> list[str]:
        """Return the transport lines that match ``failure_type``."""
        emit = self._emitter(NCCLSubsystem.TRANSPORT, rank_id)

        if failure_type in {"oom", "cascade"}:
            messages = (
                "Got completion with error 12 (Remote access error)",
                "CUDA driver reported Xid 79 on receive queue",
            )
        elif failure_type in {"desync", "congestion"}:
            messages = ("send proxy retries increased due to delayed completions",)
        else:
            messages = ("all transport channels healthy",)

        return [emit(message) for message in messages]

    def _socket_logs(
        self, cluster: ClusterStateMachine, rank_id: int, failure_type: str
    ) -> list[str]:
        """Return one socket line: refused on the failing rank, or all connected."""
        emit = self._emitter(NCCLSubsystem.SOCKET, rank_id)
        culprit = cluster._scenario.failing_rank_id

        if failure_type in {"oom", "desync", "cascade"}:
            text = f"socketPollConnect: Connection refused on rank {culprit}"
        else:
            text = "socketPollConnect: connection established for all peers"
        return [emit(text)]
class TelemetryStream:
    """Simulates surface telemetry and hidden NCCL subsystem diagnostics.

    Surface telemetry is a short list of summary lines rebuilt on every
    ``update`` call; it may mask degraded nodes and may include a
    red-herring alert, each with a configurable probability. Deep
    diagnostics are rendered on demand by an ``NCCLLogEngine``.
    """

    # Upper bound on the number of surface lines kept after an update.
    _MAX_SURFACE_LINES = 7

    def __init__(
        self,
        seed: int,
        red_herring_probability: float = 0.30,
        telemetry_mask_probability: float = 0.20,
    ) -> None:
        """Initialize telemetry stream with deterministic randomness.

        Args:
            seed: Seed for this stream's random source and its log engine.
            red_herring_probability: Chance per update of adding an alert
                about a healthy, non-failing node.
            telemetry_mask_probability: Chance per update of reporting all
                nodes as nominal regardless of their health.
        """
        self._seed = seed
        self._random = Random(seed)
        self._surface_log_buffer: list[str] = []
        self._step = 0
        self._nccl_engine = NCCLLogEngine(seed=seed)
        self._red_herring_probability = red_herring_probability
        self._telemetry_mask_probability = telemetry_mask_probability

    def reset(self, seed: int) -> None:
        """Reset telemetry state for a new seed.

        The difficulty probabilities are left as they are.
        """
        self._seed = seed
        self._random = Random(seed)
        self._surface_log_buffer = []
        self._step = 0
        self._nccl_engine.reset(seed=seed)

    def configure_difficulty(
        self,
        red_herring_probability: float,
        telemetry_mask_probability: float,
    ) -> None:
        """Apply adaptive curriculum probabilities."""
        self._red_herring_probability = red_herring_probability
        self._telemetry_mask_probability = telemetry_mask_probability

    def update(self, cluster: ClusterStateMachine) -> None:
        """Rebuild the surface telemetry lines from the current cluster state.

        Random draws happen in a fixed order (jitter, mask, red herring) so
        that the output for a given seed and cluster history is stable.
        """
        self._step += 1

        training = cluster.training
        throughput = training.throughput_tokens_per_sec
        target = training.target_throughput
        ratio = throughput / max(1.0, target)
        # Drawn before the other random decisions; reported on the last line.
        jitter_pct = self._random.uniform(0.5, 2.5)

        lines = [
            f"cluster_status: training={training.job_status} step={training.current_step}",
            f"throughput: {throughput:.1f}/{target:.1f} tok/s ({ratio * 100:.1f}% of target)",
        ]

        degraded_nodes = [
            node.node_id for node in cluster.nodes if node.health_status != "healthy"
        ]
        if self._random.random() < self._telemetry_mask_probability:
            # Masked update: degraded nodes are hidden from the surface view.
            lines.append("node_health: all nodes nominal")
        else:
            lines.append(
                f"node_health: degraded_nodes={degraded_nodes if degraded_nodes else 'none'}"
            )

        failing_node_id = cluster._scenario.failing_node_id
        healthy_non_failing_nodes = [
            node.node_id
            for node in cluster.nodes
            if node.health_status == "healthy" and node.node_id != failing_node_id
        ]
        # No random draw is consumed when there is no candidate node.
        if (
            healthy_non_failing_nodes
            and self._random.random() < self._red_herring_probability
        ):
            random_healthy_node = self._random.choice(healthy_non_failing_nodes)
            lines.append(
                "alert: node "
                f"{random_healthy_node} elevated retransmit rate (may be noise)"
            )

        if training.job_status in {"stalled", "failed"}:
            lines.append(
                "alert: distributed collective progress stalled; diagnostics required"
            )
        elif ratio < 0.8:
            lines.append("alert: interconnect throughput below expected envelope")

        lines.append(f"sampling_noise: telemetry jitter {jitter_pct:.2f}%")
        self._surface_log_buffer = lines[: self._MAX_SURFACE_LINES]

    def visible_logs(self) -> list[str]:
        """Return a copy of the surface lines built by the last update."""
        return list(self._surface_log_buffer)

    @staticmethod
    def _normalize_subsystem_name(name: str) -> str:
        """Lower-case ``name``, trim it and map ``-`` and ``/`` to ``_``."""
        return name.strip().lower().replace("-", "_").replace("/", "_")

    def generate_nccl_subsystem_logs(
        self,
        cluster: ClusterStateMachine,
        subsystem: str,
        time_window: int = 10,
    ) -> list[str]:
        """Return deep subsystem logs for targeted diagnostics.

        Args:
            cluster: Cluster whose state drives the log content.
            subsystem: Subsystem name. Case, surrounding whitespace and the
                choice of ``-``, ``/`` or ``_`` as separator are ignored.
                Short aliases (``net_ib``, ``graph``, ``watchdog``, ``init``,
                ``transport``, ``socket``) and every ``NCCLSubsystem`` value
                (for example ``GRAPH/Search``) are accepted.
            time_window: Number of steps to render.

        Returns:
            The rendered lines. Unrecognized names fall back to the watchdog
            subsystem.
        """
        subsystem_map: dict[str, NCCLSubsystem] = {
            "net_ib": NCCLSubsystem.NET_IB,
            "graph": NCCLSubsystem.GRAPH,
            "watchdog": NCCLSubsystem.WATCHDOG,
            "init": NCCLSubsystem.INIT,
            "transport": NCCLSubsystem.TRANSPORT,
            "socket": NCCLSubsystem.SOCKET,
        }
        # Also accept each member's own value, so "GRAPH/Search" (or a passed
        # NCCLSubsystem.GRAPH member) resolves to GRAPH instead of silently
        # falling back to WATCHDOG.
        for member in NCCLSubsystem:
            subsystem_map.setdefault(
                self._normalize_subsystem_name(member.value), member
            )

        selected = subsystem_map.get(
            self._normalize_subsystem_name(subsystem), NCCLSubsystem.WATCHDOG
        )
        return self._nccl_engine.generate(
            cluster=cluster,
            subsystem=selected,
            time_window=time_window,
        )

    def generate_nccl_logs(
        self, cluster: ClusterStateMachine, time_window: int = 10
    ) -> list[str]:
        """Backward-compatible deep logs helper across major subsystems.

        Renders watchdog, graph and NET/IB lines, in that order, giving each
        a third of ``time_window`` (at least one step).
        """
        per_subsystem_window = max(1, time_window // 3)
        logs: list[str] = []
        for subsystem in (
            NCCLSubsystem.WATCHDOG,
            NCCLSubsystem.GRAPH,
            NCCLSubsystem.NET_IB,
        ):
            logs.extend(
                self._nccl_engine.generate(
                    cluster=cluster,
                    subsystem=subsystem,
                    time_window=per_subsystem_window,
                )
            )
        return logs

    def _flight_recorder_noise_entry(self, wrong_rank: int) -> dict[str, Any]:
        """Build one completed all_reduce entry attributed to ``wrong_rank``."""
        created_ns = (self._seed * 1_000_000) + (wrong_rank * 10_000)
        return {
            "profiling_name": "nccl:all_reduce",
            "rank": wrong_rank,
            "collective_seq_id": 2000 + wrong_rank,
            "p2p_seq_id": 0,
            "op_id": 2000 + wrong_rank,
            "state": "completed",
            "input_sizes": [[1024, 2048]],
            "output_sizes": [[1024, 2048]],
            "input_dtypes": ["Float"],
            "output_dtypes": ["Float"],
            "timeout_ms": 1800000,
            "time_created_ns": created_ns,
            "time_started_ns": created_ns + 100,
            "time_finished_ns": created_ns + 600,
            "frames": [
                {
                    "name": "all_reduce",
                    "filename": "torch/distributed/distributed_c10d.py",
                    "line": 2891,
                }
            ],
        }

    def get_investigation_output(
        self,
        cluster: ClusterStateMachine,
        action_type: str,
        rank_id: int | None = None,
    ) -> dict[str, Any]:
        """Return action-specific deep output with realistic noise.

        Args:
            cluster: Cluster being investigated.
            action_type: ``"inspect_flight_recorder"`` or
                ``"query_nccl_logs"``.
            rank_id: Rank to inspect; required for
                ``"inspect_flight_recorder"``.

        Returns:
            ``{"flight_recorder": ...}`` with one or two extra entries from
            other ranks, ``{"nccl_logs": ...}`` with three misleading lines
            ahead of the watchdog logs, or ``{"error": ...}`` when the
            action is unsupported or ``rank_id`` is missing.
        """
        if action_type == "inspect_flight_recorder":
            if rank_id is None:
                return {"error": "rank_id parameter required"}

            target_rank = int(rank_id)
            source_payload = cluster._generate_flight_recorder_data(target_rank)
            entries = list(source_payload.get("entries", []))

            wrong_ranks = [
                node.node_id for node in cluster.nodes if node.node_id != target_rank
            ]
            self._random.shuffle(wrong_ranks)
            noise_count = self._random.randint(1, 2)
            entries.extend(
                self._flight_recorder_noise_entry(wrong_rank)
                for wrong_rank in wrong_ranks[:noise_count]
            )

            # Work on a shallow copy so the cluster's own dict never picks up
            # the noise entries (they would otherwise pile up if the cluster
            # hands out the same dict on a later call).
            payload = dict(source_payload)
            payload["entries"] = entries
            return {"flight_recorder": payload}

        if action_type == "query_nccl_logs":
            logs = self.generate_nccl_subsystem_logs(
                cluster, "watchdog", time_window=10
            )
            failing_rank = cluster._scenario.failing_rank_id
            wrong_rank_candidates = [
                node.node_id for node in cluster.nodes if node.node_id != failing_rank
            ]
            wrong_rank = (
                self._random.choice(wrong_rank_candidates)
                if wrong_rank_candidates
                else failing_rank
            )
            misleading = [
                (
                    f"[t-{i}][rank{wrong_rank}] NCCL INFO Watchdog: "
                    "brief timeout observed; auto-recovered in 12ms"
                )
                for i in range(3, 0, -1)
            ]
            return {"nccl_logs": [*misleading, *logs]}

        return {"error": "unsupported investigation action"}