| |
"""Cheap HF Jobs boot/log/runtime smoke test for HYDRA/Feather images.

This command is intentionally non-training and non-secret-printing. It exists so
we can verify that an HF image starts, emits logs, sees the requested runtime
environment, and carries the checkpoint symbols needed by the real training
entrypoint before spending on data prep or training.
"""
| from __future__ import annotations |
|
|
| import importlib |
| import json |
| import os |
| import sys |
| from pathlib import Path |
|
|
|
|
# Allowlist of environment variable names whose values may be echoed to the
# job log. safe_env_summary() reads only these names from os.environ.
SAFE_ENV_KEYS: list[str] = [
    # Feather / HF job selection
    "FEATHER_GPU_PROFILE",
    "FEATHER_HF_FLAVOR",
    "FEATHER_RUNTIME_MODE",
    "FEATHER_HF_STRICT_RUNTIME_PREFLIGHT",
    # HYDRA runtime profile and data/training knobs
    "HYDRA_RUNTIME_PROFILE",
    "HYDRA_STRICT_OPTIMAL_COMPONENTS",
    "HYDRA_USE_NEMOTRON",
    "HYDRA_NEMOTRON_SINGLE_CONFIG",
    "HYDRA_LOCAL_SHARDS_ONLY",
    "HYDRA_TARGET_SHARDS",
    "HYDRA_TIME_BUDGET",
    "HYDRA_CKPT_INTERVAL",
    "HYDRA_EVAL_TOKENS",
    "HYDRA_HYENA_LAYERS",
    # HYDRA HTM / fused-kernel / token-cache toggles
    "HYDRA_FORCE_HTM_CPU",
    "HYDRA_HTM_FUSED",
    "HYDRA_HTM_BATCHED_FUSED",
    "HYDRA_DISABLE_FUSED_SDR_TRITON",
    "HYDRA_TOKEN_CACHE_GB",
    "HYDRA_DISABLE_TOKEN_CACHE",
    "HYDRA_HTM_STRICT_SCALE_FREE",
    "HYDRA_HTM_REGION_POOL_SIZE",
    "HYDRA_HTM_CHUNK_B",
    # CUDA architecture selectors
    "HTM_CUDA_ARCH",
    "TORCH_CUDA_ARCH_LIST",
]
|
|
|
|
def _repo_candidates() -> list[Path]:
    """Return the directories probed for the repo checkout, in priority order.

    The first two entries are fixed absolute paths. The last entry is derived
    from this file's resolved location: its grandparent directory when the
    path has more than one ancestor, otherwise its parent directory.
    """
    script_path = Path(__file__).resolve()
    ancestors = script_path.parents
    if len(ancestors) > 1:
        relative_root = ancestors[1]
    else:
        relative_root = script_path.parent
    return [Path("/workspace/feather"), Path("/app"), relative_root]
|
|
|
|
def ensure_repo_on_path(candidates: list[Path] | None = None) -> None:
    """Make the repo checkout importable and log which location was chosen.

    Walks the candidate directories in order and stops at the first one that
    contains a ``hydra`` entry. That directory is inserted at the front of
    ``sys.path`` unless it is already listed there. When no candidate
    qualifies, ``sys.path`` is left untouched and a ``<not-found>`` line is
    logged.

    Args:
        candidates: Directories to probe, highest priority first. Defaults to
            the result of ``_repo_candidates()``.
    """
    if candidates is None:
        candidates = _repo_candidates()
    for candidate in candidates:
        if not (candidate / "hydra").exists():
            continue
        entry = str(candidate)
        # A checkout that is already on sys.path still counts as found:
        # stopping here keeps a lower-priority candidate from being pushed in
        # front of it and avoids a misleading "<not-found>" log line.
        if entry not in sys.path:
            sys.path.insert(0, entry)
        print(f"[boot_smoke] repo_path={candidate}", flush=True)
        return
    print("[boot_smoke] repo_path=<not-found>; using existing sys.path", flush=True)
|
|
|
|
def safe_env_summary() -> dict[str, str]:
    """Collect the allowlisted environment variables that are currently set.

    Only names listed in ``SAFE_ENV_KEYS`` are read from ``os.environ``; names
    that are not set are omitted from the result.
    """
    summary: dict[str, str] = {}
    for name in SAFE_ENV_KEYS:
        value = os.environ.get(name)
        if value is not None:
            summary[name] = value
    return summary
|
|
|
|
def _truthy_env(name: str) -> bool:
    """Report whether environment variable *name* holds an affirmative flag.

    The value is stripped and lowercased before comparison; ``1``, ``true``,
    ``yes`` and ``on`` count as true. An unset variable is read as ``"0"``.
    """
    raw_value = os.environ.get(name, "0")
    normalized = raw_value.strip().lower()
    return normalized in ("1", "true", "yes", "on")
|
|
|
|
def strict_optimal_preflight_requested() -> bool:
    """Return True when the environment asks for the strict runtime preflight.

    Any one of three triggers is enough, checked in this order:
    ``FEATHER_HF_STRICT_RUNTIME_PREFLIGHT`` is truthy (per ``_truthy_env``),
    ``HYDRA_STRICT_OPTIMAL_COMPONENTS`` equals ``"1"``, or
    ``HYDRA_RUNTIME_PROFILE`` is ``optimal-strict`` after stripping and
    lowercasing.
    """
    if _truthy_env("FEATHER_HF_STRICT_RUNTIME_PREFLIGHT"):
        return True
    # Exact match only: values such as "true" do not trigger this branch.
    if os.environ.get("HYDRA_STRICT_OPTIMAL_COMPONENTS", "0") == "1":
        return True
    profile = os.environ.get("HYDRA_RUNTIME_PROFILE", "")
    return profile.strip().lower() == "optimal-strict"
|
|
|
|
def _import_required_module(module_name: str):
    """Import *module_name* and log the outcome under the strict-preflight tag.

    Returns:
        The imported module, or ``None`` when the import raised. The exception
        type and message are logged rather than propagated.
    """
    try:
        imported = importlib.import_module(module_name)
    except Exception as exc:
        # Any exception raised during import (not only ImportError) is
        # reported as a failed import.
        print(f"[strict_preflight] {module_name}=failed {type(exc).__name__}: {exc}", flush=True)
        imported = None
    else:
        print(f"[strict_preflight] {module_name}=ok", flush=True)
    return imported
|
|
|
|
def _preflight_torch_cuda() -> str | None:
    """Check that torch imports and reports at least one CUDA device."""
    torch = _import_required_module("torch")
    if torch is None:
        return "torch_import"
    try:
        cuda_available = bool(torch.cuda.is_available())
        device_count = int(torch.cuda.device_count()) if cuda_available else 0
        device_name = torch.cuda.get_device_name(0) if cuda_available and device_count else "<none>"
    except Exception as exc:
        print(f"[strict_preflight] torch_cuda=failed {type(exc).__name__}: {exc}", flush=True)
        return "torch_cuda"
    if not cuda_available or device_count < 1:
        print(
            f"[strict_preflight] torch_cuda=failed cuda_available={int(cuda_available)} device_count={device_count}",
            flush=True,
        )
        return "torch_cuda"
    print(
        f"[strict_preflight] torch_cuda=ok device_count={device_count} device0={device_name}",
        flush=True,
    )
    return None


def _preflight_triton() -> str | None:
    """Check that triton imports and its active driver can name a current device."""
    triton = _import_required_module("triton")
    if triton is None:
        return "triton_import"
    try:
        active = triton.runtime.driver.active
        device = active.get_current_device()
    except Exception as exc:
        print(f"[strict_preflight] triton_driver=failed {type(exc).__name__}: {exc}", flush=True)
        return "triton_driver"
    print(f"[strict_preflight] triton_driver=ok device={device}", flush=True)
    return None


def _preflight_mamba() -> str | None:
    """Check that ``mamba_ssm`` imports and exposes a ``Mamba3`` attribute."""
    mamba = _import_required_module("mamba_ssm")
    if mamba is None or not hasattr(mamba, "Mamba3"):
        print("[strict_preflight] mamba=missing Mamba3", flush=True)
        return "mamba"
    print("[strict_preflight] mamba=ok Mamba3=True", flush=True)
    return None


def _preflight_fused_sdr() -> str | None:
    """Check that a candidate module imports and exposes ``FusedSDRProject``."""
    for module_name in ("subsystems.fused_sdr_project",):
        try:
            module = importlib.import_module(module_name)
        except Exception as exc:
            print(
                f"[strict_preflight] fused_sdr_candidate={module_name} failed {type(exc).__name__}: {exc}",
                flush=True,
            )
            continue
        if hasattr(module, "FusedSDRProject"):
            print(f"[strict_preflight] fused_sdr=ok module={module_name}", flush=True)
            return None
    print("[strict_preflight] fused_sdr=missing FusedSDRProject", flush=True)
    return "fused_sdr"


def _preflight_htm_rust() -> str | None:
    """Check that ``htm_rust`` imports and exposes the three required symbols."""
    htm = _import_required_module("htm_rust")
    if htm is None:
        return "htm_rust"
    has_region = hasattr(htm, "HTMRegion")
    has_gpu = hasattr(htm, "HTMRegionGpu")
    has_fused = hasattr(htm, "step_batch_fused_cuda")
    if not (has_region and has_gpu and has_fused):
        print(
            "[strict_preflight] htm_rust=failed "
            f"HTMRegion={has_region} HTMRegionGpu={has_gpu} step_batch_fused_cuda={has_fused}",
            flush=True,
        )
        return "htm_rust"
    print(
        "[strict_preflight] htm_rust=ok "
        f"HTMRegion={has_region} HTMRegionGpu={has_gpu} step_batch_fused_cuda={has_fused}",
        flush=True,
    )
    return None


def run_strict_optimal_preflight() -> int:
    """Fail before training if a strict fast-path component is unavailable.

    This is a runtime/image preflight, not a CPU fallback. Five independent
    checks run in a fixed order: torch with a visible CUDA device, the triton
    driver, ``mamba_ssm.Mamba3``, ``FusedSDRProject``, and the ``htm_rust``
    GPU symbols. Every check always runs, so one log shows all missing
    pieces. No specific GPU model is verified.

    Returns:
        0 when every check passes; 5 when at least one fails, after logging a
        comma-separated list of failure tags.
    """
    print("[strict_preflight] phase=start", flush=True)
    checks = (
        _preflight_torch_cuda,
        _preflight_triton,
        _preflight_mamba,
        _preflight_fused_sdr,
        _preflight_htm_rust,
    )
    # Each check returns its failure tag, or None on success; no short-circuit.
    failures: list[str] = [tag for tag in (check() for check in checks) if tag is not None]

    if failures:
        print(f"[strict_preflight] phase=failed failures={','.join(failures)}", flush=True)
        return 5
    print("[strict_preflight] phase=done", flush=True)
    return 0
|
|
|
|
def main() -> int:
    """Run the boot smoke sequence and return a process exit code.

    Stages, in order: put the repo on ``sys.path``, log the interpreter and
    the allowlisted environment, probe torch/CUDA, optionally run the strict
    preflight, then verify the checkpoint symbols on ``hydra.training``.

    Returns:
        0 when every stage passes; 2 when importing torch or querying CUDA
        raises; 3 when ``hydra.training`` lacks a required symbol; 4 when
        importing or inspecting ``hydra.training`` raises; otherwise the
        non-zero code returned by ``run_strict_optimal_preflight()``.
    """
    print("[boot_smoke] phase=start", flush=True)
    ensure_repo_on_path()
    print(f"[boot_smoke] python={sys.version.split()[0]} executable={sys.executable}", flush=True)
    print(f"[boot_smoke] env={json.dumps(safe_env_summary(), sort_keys=True)}", flush=True)

    # Stage: torch must import and answer basic CUDA queries.
    try:
        torch = importlib.import_module("torch")
        cuda_available = bool(torch.cuda.is_available())
        device_count = 0
        device_name = "<none>"
        if cuda_available:
            device_count = int(torch.cuda.device_count())
            if device_count:
                device_name = torch.cuda.get_device_name(0)
        print(
            f"[boot_smoke] torch={torch.__version__} cuda_available={int(cuda_available)} "
            f"device_count={device_count} device0={device_name}",
            flush=True,
        )
    except Exception as exc:
        print(f"[boot_smoke] torch_import_failed={type(exc).__name__}: {exc}", flush=True)
        return 2

    # Stage: strict preflight, only when the environment requests it.
    preflight_rc = run_strict_optimal_preflight() if strict_optimal_preflight_requested() else 0
    if preflight_rc:
        return preflight_rc

    # Stage: the training module must expose the checkpoint symbols.
    try:
        training = importlib.import_module("hydra.training")
        required = ("LATEST_CKPT", "PRETRAIN_FINAL_CKPT", "save_ckpt", "maybe_resume_ckpt")
        missing = [name for name in required if not hasattr(training, name)]
        if not missing:
            print(
                "[boot_smoke] training_contract=ok "
                f"LATEST_CKPT={getattr(training, 'LATEST_CKPT')} "
                f"PRETRAIN_FINAL_CKPT={getattr(training, 'PRETRAIN_FINAL_CKPT')}",
                flush=True,
            )
        else:
            print(f"[boot_smoke] training_contract=missing {missing}", flush=True)
            return 3
    except Exception as exc:
        print(f"[boot_smoke] training_import_failed={type(exc).__name__}: {exc}", flush=True)
        return 4

    print("[boot_smoke] phase=done", flush=True)
    return 0
|
|
|
|
# Script entry point: the integer returned by main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())
|
|