feather-a10g-large-runtime / overlay /scripts /launch_feather_hf_job.py
icarus112's picture
Update Feather a10g-large training runtime image
65cd644 verified
#!/usr/bin/env python3
from __future__ import annotations
import json
import os
import shlex
import shutil
import subprocess
import sys
import time
from pathlib import Path
from huggingface_hub import HfApi
# Repository root: the directory one level above the folder holding this script.
REPO_ROOT = Path(__file__).resolve().parents[1]
# Put the root at the front of the import path (only once) so the
# project-local `configs` and `scripts` packages imported right after this
# resolve when the file is executed directly.
if sys.path.count(str(REPO_ROOT)) == 0:
    sys.path.insert(0, str(REPO_ROOT))
from configs.harness_config import HarnessConfig
from scripts.hf_routing import resolve_routing
def _truthy_env(name: str, default: str = '0') -> bool:
    """Return True when environment variable *name* holds a truthy flag value.

    Truthy values are ``1``, ``true``, ``yes`` and ``on``; the comparison is
    case-insensitive and ignores surrounding whitespace.

    Args:
        name: Environment variable to read.
        default: Value assumed when the variable is unset (``'0'`` = off).
    """
    return os.environ.get(name, default).strip().lower() in {'1', 'true', 'yes', 'on'}


# Target shard count (integer string); forwarded to the job as HYDRA_TARGET_SHARDS.
TARGET_SHARDS = os.environ.get('HYDRA_TARGET_SHARDS', '2048')
# Training time budget (integer string); forwarded as HYDRA_TIME_BUDGET.
# NOTE(review): presumably seconds -- confirm against the trainer.
TIME_BUDGET = os.environ.get('HYDRA_TIME_BUDGET', '43200')
# Flavor the caller asked for; the effective flavor is derived from it later.
REQUESTED_GPU_FLAVOR = os.environ.get('FEATHER_HF_FLAVOR', 'a10g-large')
# HF flavor name -> (HTM_CUDA_ARCH, TORCH_CUDA_ARCH_LIST) values exported to the job.
GPU_ARCH_BY_FLAVOR = {
    'a10g-small': ('sm_86', '8.6'),
    'a10g-large': ('sm_86', '8.6'),
    'a10g-largex2': ('sm_86', '8.6'),
    'a10g-largex4': ('sm_86', '8.6'),
    'a100-large': ('sm_80', '8.0'),
    'a100x4': ('sm_80', '8.0'),
    'a100x8': ('sm_80', '8.0'),
    'h200': ('sm_90a', '9.0'),
    'h200x2': ('sm_90a', '9.0'),
    'h200x4': ('sm_90a', '9.0'),
    'h200x8': ('sm_90a', '9.0'),
}
# Optional namespace override; no other use of it is visible in this file.
HF_NAMESPACE = os.environ.get('FEATHER_HF_NAMESPACE')
# Prebuilt image used when the Space-built image is not selected.
DEFAULT_IMAGE = os.environ.get('FEATHER_HF_IMAGE', 'ghcr.io/slapglif/feather-hf-runtime:a10g-large')
# Docker Space image context that gets uploaded to the Space repo.
IMAGE_DIR = Path(__file__).resolve().parents[1] / 'hf_jobs' / 'feather_h200_image'
# Job timeout string handed to HfApi.run_job.
TIMEOUT = os.environ.get('FEATHER_HF_JOB_TIMEOUT', '12h')
# Repo visibility flags. They default to private, and every flag below is
# parsed with _truthy_env so that 'true'/'yes'/'on' behave like '1' instead
# of being silently treated as off (which used to make a repo public or turn
# a requested dry run into a real submission).
SPACE_PRIVATE = _truthy_env('FEATHER_HF_SPACE_PRIVATE', '1')
OUTPUT_PRIVATE = _truthy_env('FEATHER_HF_OUTPUT_PRIVATE', '1')
# Forwarded to the job as HYDRA_DOWNLOAD_WORKERS / HYDRA_CKPT_INTERVAL.
DOWNLOAD_WORKERS = os.environ.get('HYDRA_DOWNLOAD_WORKERS', '16')
CKPT_INTERVAL = os.environ.get('HYDRA_CKPT_INTERVAL', '1000')
# When true, skip repo creation, upload and job submission.
DRY_RUN = _truthy_env('FEATHER_HF_DRY_RUN')
# When true, run the job from the Space-built image instead of DEFAULT_IMAGE.
USE_SPACE_IMAGE = _truthy_env('FEATHER_HF_USE_SPACE_IMAGE')
# When true, assume the Space image has already been built by a previous
# invocation and skip the upload+build wait. Used by sweep drivers that fan
# out many jobs against a single pre-uploaded image.
SKIP_UPLOAD = _truthy_env('FEATHER_HF_SKIP_UPLOAD')
# When true, refresh the overlay from the repo before uploading the image context.
SYNC_OVERLAY = _truthy_env('FEATHER_HF_SYNC_OVERLAY', '1')
def should_enable_fast_start_streaming(target_shards: str, time_budget: str) -> bool:
    """Decide whether a launch qualifies for the streaming fast-start data path.

    Both arguments are integer strings. The launch qualifies only when the
    shard count lies in 1..256 and the time budget lies in 1..1800; a value
    that does not parse as an integer disqualifies it.
    """
    try:
        shard_count, budget_value = int(target_shards), int(time_budget)
    except ValueError:
        return False
    if not 0 < shard_count <= 256:
        return False
    return 0 < budget_value <= 1800
def resolve_effective_gpu_flavor(requested_flavor: str, target_shards: str, time_budget: str) -> str:
    """Map the requested flavor to the one that will actually be used.

    Remote HYDRA/Feather launches stay on A10-class GPUs by default. An
    ``h200*`` request is honoured only when FEATHER_HF_ALLOW_H200_EXPERIMENT
    is truthy (a break-glass diagnostic switch, kept separate from the older
    canary override so stale scripts cannot keep landing on H200). Otherwise
    it is rerouted to FEATHER_HF_A10_FLAVOR, then FEATHER_HF_CANARY_FLAVOR,
    then ``a10g-large``. Every non-H200 request is returned unchanged.

    ``target_shards`` and ``time_budget`` are accepted but not consulted by
    the current policy.
    """
    wants_h200 = requested_flavor.startswith('h200')
    if not wants_h200 or _truthy_env('FEATHER_HF_ALLOW_H200_EXPERIMENT'):
        return requested_flavor
    canary_flavor = os.environ.get('FEATHER_HF_CANARY_FLAVOR', 'a10g-large')
    return os.environ.get('FEATHER_HF_A10_FLAVOR', canary_flavor)
# Effective flavor after the A10-first policy has been applied to the request.
GPU_FLAVOR = resolve_effective_gpu_flavor(REQUESTED_GPU_FLAVOR, TARGET_SHARDS, TIME_BUDGET)
# Profile label; follows the effective flavor unless FEATHER_GPU_PROFILE is set.
GPU_PROFILE = os.environ.get('FEATHER_GPU_PROFILE', GPU_FLAVOR)
# CUDA arch pair for the effective flavor; unknown flavors fall back to sm_86 / 8.6.
HTM_CUDA_ARCH, TORCH_CUDA_ARCH = GPU_ARCH_BY_FLAVOR.get(GPU_FLAVOR, ('sm_86', '8.6'))
def sync_overlay_from_repo() -> None:
    """Mirror the required project files from the repo into the Space overlay.

    Each listed path that exists under REPO_ROOT is copied to
    ``IMAGE_DIR/overlay``; paths missing from the repo are skipped. An
    existing destination is deleted first, except for ``htm_rust`` when the
    overlay already carries ``src/gpu/mod.rs``. Afterwards every ``*.sh``
    under ``overlay/scripts`` is rewritten with LF line endings, and a
    summary of the copied paths is printed.
    """
    overlay_root = IMAGE_DIR / 'overlay'
    overlay_root.mkdir(parents=True, exist_ok=True)
    wanted = (
        'hydra',
        'subsystems',
        'scripts',
        'htm_rust',
        'harness',
        'configs',
        'prepare.py',
        'prepare_nemotron.py',
        'train.py',
        'pyproject.toml',
        'uv.lock',
    )
    skip_patterns = shutil.ignore_patterns(
        '__pycache__',
        '.pytest_cache',
        '.ruff_cache',
        '.venv',
        '.git',
        'target',
        '*.pyc',
    )
    synced: list[str] = []
    for rel_path in wanted:
        source = REPO_ROOT / rel_path
        if not source.exists():
            continue
        target = overlay_root / rel_path
        # htm_rust is overlay-extended: the repo copy lacks the full GPU
        # backend module set, while the overlay carries mod.rs/sp_gpu/tm_gpu
        # and auxiliary kernels needed for --features gpu. It is therefore
        # merged instead of replaced; deleting it would make a fresh no-cache
        # rebuild silently drop the step_batch_fused_cuda Python export.
        keep_existing = rel_path == 'htm_rust' and (target / 'src' / 'gpu' / 'mod.rs').exists()
        if not keep_existing and target.exists():
            if target.is_dir():
                shutil.rmtree(target)
            else:
                target.unlink()
        if not source.is_dir():
            target.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(source, target)
        else:
            shutil.copytree(source, target, dirs_exist_ok=True, ignore=skip_patterns)
        synced.append(rel_path)
    shell_root = overlay_root / 'scripts'
    if shell_root.exists():
        # Convert CRLF and bare CR to LF in every shell script.
        for script in shell_root.rglob('*.sh'):
            raw = script.read_bytes()
            script.write_bytes(raw.replace(b'\r\n', b'\n').replace(b'\r', b'\n'))
    print(f'[launch] overlay synced from repo ({len(synced)} paths): {synced}', flush=True)
def load_hf_token() -> str | None:
    """Return the Hugging Face token, or None when none is available.

    Thin wrapper over load_hf_token_with_source() that drops the source
    label; the token is neither printed nor stored here.
    """
    return load_hf_token_with_source()[0]
def build_job_command() -> list[str]:
    """Choose the command the HF Job runs.

    Precedence:
      1. FEATHER_HF_JOB_COMMAND, split with shell quoting rules.
      2. The boot-smoke script when FEATHER_HF_BOOT_SMOKE or
         FEATHER_HF_STRICT_RUNTIME_PREFLIGHT is truthy.
      3. The checkpoint-eval script when FEATHER_HF_CHECKPOINT_EVAL is truthy.
      4. The default entrypoint.
    """
    custom = os.environ.get('FEATHER_HF_JOB_COMMAND')
    if custom:
        return shlex.split(custom)
    smoke_flags = ('FEATHER_HF_BOOT_SMOKE', 'FEATHER_HF_STRICT_RUNTIME_PREFLIGHT')
    if any(_truthy_env(flag) for flag in smoke_flags):
        script = '/app/scripts/hf_boot_smoke.py'
    elif _truthy_env('FEATHER_HF_CHECKPOINT_EVAL'):
        script = '/app/scripts/hf_checkpoint_eval.py'
    else:
        script = '/app/entrypoint.py'
    return ['python', script]
def load_hf_token_with_source() -> tuple[str | None, str]:
    """Load a Hugging Face token and return it with a non-secret source label.

    Lookup order: the HF_TOKEN and HUGGINGFACE_HUB_TOKEN environment
    variables, then the token file (HF_TOKEN_PATH, defaulting to
    ``~/.cache/huggingface/token``). Values are stripped of surrounding
    whitespace wherever they come from.

    Returns:
        ``(token, label)`` where ``label`` is one of ``'provided'`` (taken
        from the environment), ``'token_file'``, ``'missing'`` (no file),
        ``'unreadable'`` (file could not be read or decoded) or
        ``'empty_file'``. ``token`` is None for the last three labels.
    """
    for env_name in ('HF_TOKEN', 'HUGGINGFACE_HUB_TOKEN'):
        # Strip so a value exported with a trailing newline, or a blank
        # value, is treated the same way as the contents of the token file.
        token = os.environ.get(env_name, '').strip()
        if token:
            return token, 'provided'
    token_file = Path(os.environ.get('HF_TOKEN_PATH', Path.home() / '.cache' / 'huggingface' / 'token')).expanduser()
    try:
        token = token_file.read_text(encoding='utf-8').strip()
    except FileNotFoundError:
        return None, 'missing'
    except (OSError, UnicodeDecodeError):
        # UnicodeDecodeError is not an OSError; a corrupt or binary token
        # file must be reported as unreadable rather than crash the launcher.
        return None, 'unreadable'
    return (token, 'token_file') if token else (None, 'empty_file')
def require_token() -> str:
    """Return the Hugging Face token or abort the launcher.

    Raises:
        SystemExit: When no token is found in the environment or token file.
    """
    token = load_hf_token_with_source()[0]
    if token:
        return token
    raise SystemExit(
        'HF token required: set HF_TOKEN/HUGGINGFACE_HUB_TOKEN or run `huggingface-cli login` '
        'so ~/.cache/huggingface/token exists'
    )
def wait_for_space(api: HfApi, repo_id: str, timeout_s: int = 1800) -> None:
    """Poll a Space until its image can be used by HF Jobs.

    The Space runtime is queried every 20 seconds and each observed stage is
    printed.

    Args:
        api: Client used to query the Space runtime.
        repo_id: Space repository id.
        timeout_s: Maximum time to wait, in seconds.

    Returns:
        None once the stage is RUNNING, PAUSED or SLEEPING, or once the Space
        reports RUNTIME_ERROR / APP_STARTING_ERROR after build activity
        (BUILDING or APP_STARTING) was seen by this call.

    Raises:
        RuntimeError: The stage is BUILD_ERROR, CONFIG_ERROR or NO_APP_FILE.
        TimeoutError: No terminal stage was reached within ``timeout_s``.
    """
    # Monotonic clock: wall-clock adjustments (NTP, suspend/resume) must not
    # shorten or extend the timeout window.
    started = time.monotonic()
    seen_build_completion = False
    seen_building = False
    while True:
        runtime = api.get_space_runtime(repo_id, token=load_hf_token())
        stage = getattr(runtime, 'stage', None)
        hardware = getattr(runtime, 'hardware', None)
        print(f'[space] stage={stage} hardware={hardware}', flush=True)
        if stage == 'BUILDING':
            seen_building = True
        if stage in {'APP_STARTING', 'RUNNING', 'PAUSED', 'SLEEPING'}:
            seen_build_completion = True
        if stage in {'RUNNING', 'PAUSED', 'SLEEPING'}:
            return
        # Image is built -- Jobs can use it regardless of Space boot outcome.
        # A boot failure is accepted only after this call has observed build
        # activity, so an error stage with no observed build keeps polling
        # until the timeout.
        if (seen_build_completion or seen_building) and stage in {'RUNTIME_ERROR', 'APP_STARTING_ERROR'}:
            print(f'[space] Space boot failed with {stage} but built image is '
                  f'available in the Space registry and is usable by HF Jobs.',
                  flush=True)
            return
        # Hard build failures -- no image was produced.
        if stage in {'BUILD_ERROR', 'CONFIG_ERROR', 'NO_APP_FILE'}:
            raise RuntimeError(f'Space {repo_id} build failed: stage={stage}')
        if time.monotonic() - started > timeout_s:
            raise TimeoutError(f'Space {repo_id} did not become ready in {timeout_s}s (last stage={stage})')
        time.sleep(20)
def _configure_line_buffered_output(stdout=None, stderr=None) -> None:
    """Make launch progress visible immediately when stdout/stderr are pipes.

    Args:
        stdout: Stream to reconfigure; None means the current ``sys.stdout``.
        stderr: Stream to reconfigure; None means the current ``sys.stderr``.

    The standard streams are looked up at call time rather than bound as
    default arguments, so a stream installed after this module was imported
    (redirection, capture, wrappers) is the one that gets reconfigured.
    Streams without ``reconfigure`` are skipped.
    """
    streams = (
        sys.stdout if stdout is None else stdout,
        sys.stderr if stderr is None else stderr,
    )
    for stream in streams:
        reconfigure = getattr(stream, 'reconfigure', None)
        if reconfigure is None:
            continue
        try:
            reconfigure(line_buffering=True)
        except (TypeError, ValueError):
            # Deliberately best-effort: some wrapped streams expose
            # reconfigure() but reject it at runtime.
            pass
def apply_optimal_env_profile(env: dict[str, str]) -> None:
    """Fill *env* with the full-component optimal runtime profile.

    For every profile key, a value present in ``os.environ`` wins. Otherwise
    a value already in *env* is kept, and only then does the profile default
    apply. A one-line summary of the resulting values is printed.
    """
    defaults = {
        'HYDRA_RUNTIME_PROFILE': 'optimal-strict',
        'HYDRA_STRICT_OPTIMAL_COMPONENTS': '1',
        'HYDRA_FORCE_HTM_CPU': '0',
        'HYDRA_HTM_FUSED': '1',
        'HYDRA_HTM_BATCHED_FUSED': '1',
        'HYDRA_DISABLE_FUSED_SDR_TRITON': '0',
        # An empty layer list leaves every layer on the intended Mamba3
        # backbone rather than a Hyena/GDN fallback or substitution.
        'HYDRA_HYENA_LAYERS': '',
        'HYDRA_GDN_LAYERS': '',
        'HYDRA_TOKEN_CACHE_GB': '0',
        'HYDRA_DISABLE_TOKEN_CACHE': '1',
    }
    for key, fallback in defaults.items():
        caller_value = os.environ.get(key)
        if caller_value is None:
            env.setdefault(key, fallback)
        else:
            env[key] = caller_value
    reported = (
        'HYDRA_RUNTIME_PROFILE',
        'HYDRA_STRICT_OPTIMAL_COMPONENTS',
        'HYDRA_FORCE_HTM_CPU',
        'HYDRA_HTM_FUSED',
        'HYDRA_HTM_BATCHED_FUSED',
        'HYDRA_DISABLE_FUSED_SDR_TRITON',
        'HYDRA_HYENA_LAYERS',
        'HYDRA_GDN_LAYERS',
    )
    summary = ', '.join(f'{key}={env[key]}' for key in reported)
    print(f'[launch] applied optimal runtime profile ({summary})', flush=True)
def apply_a10_compromise_telemetry_profile(env: dict[str, str]) -> None:
    """Write the A10-friendly compromise telemetry profile into *env*.

    The profile keeps the stable all-Hyena / non-fused HTM / fused-SDR-disabled
    runtime adopted after the fused HTM blocker, routed to A10-class GPUs
    instead of H200. It is intentionally not the full optimal architecture.

    For every profile key, a value present in ``os.environ`` wins; otherwise
    the profile default replaces whatever *env* already holds for that key.
    A one-line summary of the resulting values is printed.
    """
    defaults = {
        'HYDRA_BATCH_SIZE': '16',
        'HYDRA_TOTAL_BATCH': '32768',
        'HYDRA_INERT_MAMBA': '1',
        'HYDRA_HYENA_LAYERS': '0,1,2,3',
        'HYDRA_DISABLE_FUSED_SDR_TRITON': '1',
        'HYDRA_HTM_FUSED': '0',
        'HYDRA_HTM_BATCHED_FUSED': '0',
        'HYDRA_HTM_SUBSAMPLE': '128',
        # Non-corpus ablations/evals are standardized on the full Nemotron
        # blend so that only the intended architecture/runtime parameter
        # varies between runs. The caller's environment can still override
        # this for corpus or data-path ablations.
        'HYDRA_USE_FULL_BLEND': '1',
        'HYDRA_NEMOTRON_SINGLE_CONFIG': '',
        'HYDRA_LOCAL_SHARDS_ONLY': '0',
        'HYDRA_USE_NEMOTRON': '1',
        'HYDRA_STREAM_PREFETCH': '64',
        'HYDRA_STREAM_SHUFFLE_BUFFER': '16',
        # Without this, full-blend mode can keep downloading large background
        # shards after a short canary hits its time budget, ending in HF job
        # ERRORs with no useful metrics or checkpoint finalization.
        'HYDRA_BACKGROUND_PREFETCH': '0',
        'HYDRA_HYENA_FILTER_CACHE': '1',
        'HYDRA_HYENA_TRAIN_CACHE': '1',
        # A10 validation runs sit near the memory cliff: skip Muon
        # torch.compile/Inductor scratch state and keep the final eval at the
        # smallest batch unless the caller opts into a larger one.
        'HYDRA_MUON_COMPILE': '0',
        'HYDRA_EVAL_BATCH': '1',
        'PYTORCH_ALLOC_CONF': 'expandable_segments:True',
        'HYDRA_MID_VAL_INTERVAL': '0',
        # Bounded A10 canaries should not trip mid-run checkpoint/image-drift
        # failures before emitting validation telemetry. Longer runs can opt
        # back into periodic checkpoints through the caller's environment.
        'HYDRA_CKPT_INTERVAL': '0',
        'HYDRA_EVAL_TOKENS': '1000000',
        'HYDRA_TOKEN_CACHE_GB': '0',
        'HYDRA_DISABLE_TOKEN_CACHE': '1',
    }
    for key, fallback in defaults.items():
        env[key] = os.environ.get(key, fallback)
    reported = (
        'HYDRA_BATCH_SIZE',
        'HYDRA_TOTAL_BATCH',
        'HYDRA_INERT_MAMBA',
        'HYDRA_HYENA_LAYERS',
        'HYDRA_DISABLE_FUSED_SDR_TRITON',
        'HYDRA_HTM_FUSED',
        'HYDRA_HTM_BATCHED_FUSED',
        'HYDRA_HTM_SUBSAMPLE',
        'HYDRA_USE_FULL_BLEND',
        'HYDRA_NEMOTRON_SINGLE_CONFIG',
        'HYDRA_STREAM_PREFETCH',
        'HYDRA_STREAM_SHUFFLE_BUFFER',
        'HYDRA_BACKGROUND_PREFETCH',
        'HYDRA_MUON_COMPILE',
        'HYDRA_EVAL_BATCH',
        'HYDRA_CKPT_INTERVAL',
        'HYDRA_EVAL_TOKENS',
    )
    summary = ', '.join(f'{key}={env[key]}' for key in reported)
    print(f'[launch] applied A10 compromise telemetry profile ({summary})', flush=True)
def apply_a10_env_profile(env: dict[str, str]) -> None:
    """Apply the operational A10 canary defaults to *env*.

    Does nothing unless the effective GPU flavor starts with ``a10``. For
    every profile key, a value present in ``os.environ`` wins. Otherwise a
    value already in *env* is kept, and only then does the default apply.
    When HYDRA_INERT_MAMBA ends up as ``'0'`` and the caller did not set
    HYDRA_FASTPATH, the fast path is switched off. A one-line summary is
    printed.
    """
    if not GPU_FLAVOR.startswith('a10'):
        return
    defaults = {
        'HYDRA_MUON_COMPILE': '0',
        'HYDRA_FORCE_HTM_CPU': '1',
        'HYDRA_INERT_MAMBA': '1',
        'HYDRA_HYENA_LAYERS': '0,1,2,3',
        'HYDRA_DISABLE_FUSED_SDR_TRITON': '1',
        'HYDRA_ALLOW_SYNTHETIC_RETINA': '1',
        'HYDRA_FASTPATH': '1',
        'HYDRA_TOKEN_CACHE_GB': '0',
        'HYDRA_DISABLE_TOKEN_CACHE': '1',
    }
    for key, fallback in defaults.items():
        caller_value = os.environ.get(key)
        if caller_value is None:
            env.setdefault(key, fallback)
        else:
            env[key] = caller_value
    if 'HYDRA_FASTPATH' not in os.environ and env.get('HYDRA_INERT_MAMBA') == '0':
        env['HYDRA_FASTPATH'] = '0'
    reported = (
        'HYDRA_MUON_COMPILE',
        'HYDRA_FORCE_HTM_CPU',
        'HYDRA_INERT_MAMBA',
        'HYDRA_HYENA_LAYERS',
        'HYDRA_DISABLE_FUSED_SDR_TRITON',
        'HYDRA_ALLOW_SYNTHETIC_RETINA',
        'HYDRA_FASTPATH',
    )
    summary = ', '.join(f'{key}={env[key]}' for key in reported)
    print(f'[launch] applied A10 env profile ({summary})', flush=True)
def apply_caller_env_overrides(env: dict[str, str]) -> None:
    """Copy the caller's HYDRA_* / FEATHER_* variables into the job env.

    Only keys that *env* does not already contain are added, so values set
    earlier in the launch keep precedence.
    """
    passthrough = {
        name: value
        for name, value in os.environ.items()
        if name.startswith(('HYDRA_', 'FEATHER_'))
    }
    for name, value in passthrough.items():
        env.setdefault(name, value)
def _int_env(env: dict[str, str], key: str) -> int | None:
    """Return ``env[key]`` parsed as an int.

    Returns None when the key is absent, the value is empty, or the value
    does not parse as an integer.
    """
    raw = env.get(key)
    if raw is None or raw == '':
        return None
    try:
        parsed = int(str(raw))
    except (TypeError, ValueError):
        return None
    return parsed
def apply_scale_free_a10g_proof_defaults(env: dict[str, str], *, gpu_flavor: str, runtime_profile: str | None) -> None:
"""Convert generic A10 defaults to faithful bounded HTM defaults when proof mode is requested."""
profile = (runtime_profile or '').strip().lower()
proof_requested = gpu_flavor.startswith('a10') and (
_truthy_env('FEATHER_HF_SCALE_FREE_PROOF')
or env.get('HYDRA_HTM_STRICT_SCALE_FREE') == '1'
or profile in {'optimal-strict', 'a10g-scale-free-proof'}
)
if not proof_requested:
return
proof_defaults = {
'HYDRA_FORCE_HTM_CPU': '0',
'HYDRA_HTM_FUSED': '1',
'HYDRA_HTM_BATCHED_FUSED': '1',
'HYDRA_TOKEN_CACHE_GB': '0',
'HYDRA_DISABLE_TOKEN_CACHE': '1',
}
for key, value in proof_defaults.items():
if key not in os.environ:
env[key] = value
def validate_scale_free_a10g_launch_env(
    env: dict[str, str],
    *,
    gpu_flavor: str,
    runtime_profile: str | None,
) -> dict:
    """Fail-closed check of *env* for a bounded A10G scale-free HTM proof launch.

    Proof mode is detected as follows: the flavor starts with ``a10`` and one
    of FEATHER_HF_SCALE_FREE_PROOF (truthy),
    ``HYDRA_HTM_STRICT_SCALE_FREE == '1'`` or a runtime profile of
    ``optimal-strict`` / ``a10g-scale-free-proof`` is present. Outside proof
    mode nothing is checked.

    Returns:
        A dict with ``scale_free_a10g_proof`` (whether proof mode applies),
        ``valid`` (True when no violation was found), ``reasons`` (violation
        messages, in check order) and ``diagnostic_override`` (whether
        FEATHER_HF_ALLOW_SCALE_FREE_DIAGNOSTIC_OVERRIDE is truthy).
    """
    profile = (runtime_profile or '').strip().lower()
    proof_requested = gpu_flavor.startswith('a10') and (
        _truthy_env('FEATHER_HF_SCALE_FREE_PROOF')
        or env.get('HYDRA_HTM_STRICT_SCALE_FREE') == '1'
        or profile in {'optimal-strict', 'a10g-scale-free-proof'}
    )
    diagnostic_override = _truthy_env('FEATHER_HF_ALLOW_SCALE_FREE_DIAGNOSTIC_OVERRIDE')
    problems: list[str] = []

    def expect(key: str, wanted: str, reason: str) -> None:
        # Record *reason* unless env[key] is exactly the required value.
        if env.get(key) != wanted:
            problems.append(reason)

    if proof_requested:
        expect('HYDRA_TARGET_SHARDS', '0',
               'HYDRA_TARGET_SHARDS=0 required for streaming/no-materialized-shard A10G proof')
        expect('HYDRA_HTM_STRICT_SCALE_FREE', '1',
               'HYDRA_HTM_STRICT_SCALE_FREE=1 required for scale-free HTM proof')
        expect('HYDRA_FORCE_HTM_CPU', '0',
               'HYDRA_FORCE_HTM_CPU=0 required; CPU fallback is forbidden for A10G proof')
        expect('HYDRA_HTM_FUSED', '1',
               'HYDRA_HTM_FUSED=1 required for faithful HTM GPU proof')
        expect('HYDRA_HTM_BATCHED_FUSED', '1',
               'HYDRA_HTM_BATCHED_FUSED=1 required for faithful HTM GPU proof')

        pool_size = _int_env(env, 'HYDRA_HTM_REGION_POOL_SIZE')
        chunk_size = _int_env(env, 'HYDRA_HTM_CHUNK_B')
        if pool_size is None:
            problems.append('HYDRA_HTM_REGION_POOL_SIZE is required for bounded A10G proof')
        elif pool_size > 4 and not diagnostic_override:
            problems.append('HYDRA_HTM_REGION_POOL_SIZE<=4 required unless FEATHER_HF_ALLOW_SCALE_FREE_DIAGNOSTIC_OVERRIDE=1')
        if chunk_size is None:
            problems.append('HYDRA_HTM_CHUNK_B is required for bounded A10G proof')
        elif pool_size is not None and chunk_size > pool_size:
            problems.append('HYDRA_HTM_CHUNK_B<=HYDRA_HTM_REGION_POOL_SIZE required for bounded A10G proof')

        expect('HYDRA_TOKEN_CACHE_GB', '0',
               'HYDRA_TOKEN_CACHE_GB=0 required; token cache/materialization is forbidden')
        expect('HYDRA_DISABLE_TOKEN_CACHE', '1',
               'HYDRA_DISABLE_TOKEN_CACHE=1 required; token cache/materialization is forbidden')

        # Any switch that derives topology from VRAM must be off.
        vram_switches = (
            'HYDRA_HTM_REGION_POOL_SIZE_FROM_VRAM',
            'HYDRA_HTM_SCALE_TO_VRAM',
            'HYDRA_VRAM_TOPOLOGY_SCALE',
            'FEATHER_VRAM_TOPOLOGY_SCALE',
        )
        problems.extend(
            f'{key} must be off; VRAM-derived topology scaling is forbidden'
            for key in vram_switches
            if str(env.get(key, '')).strip().lower() in {'1', 'true', 'yes', 'on'}
        )
    return {
        'scale_free_a10g_proof': proof_requested,
        'valid': not problems,
        'reasons': problems,
        'diagnostic_override': diagnostic_override,
    }
def _git_sha() -> str:
    """Return the 12-character abbreviated commit id of HEAD in REPO_ROOT.

    Any failure (git unavailable, not a repository, non-zero exit, the
    5-second timeout, ...) yields ``'unknown'`` instead of an exception.
    """
    command = ['git', 'rev-parse', '--short=12', 'HEAD']
    try:
        completed = subprocess.run(
            command,
            cwd=REPO_ROOT,
            text=True,
            capture_output=True,
            check=True,
            timeout=5,
        )
        return completed.stdout.strip()
    except Exception:
        return 'unknown'
def build_dry_run_manifest(
    *,
    routing,
    env: dict[str, str],
    secondary_gates: dict,
    fast_start_streaming: bool,
    launch_guard: dict,
) -> dict:
    """Assemble the auditable no-submit manifest for an HF/A10G launch review.

    The manifest records the run identifiers, the current git commit, the
    requested and effective hardware, the resolved repos, the job command,
    the launch parameters, the guard result, the receipts required before a
    paid launch, and *env* sorted by key.
    """
    runtime_profile = os.environ.get('FEATHER_HF_RUNTIME_PROFILE') or env.get('HYDRA_RUNTIME_PROFILE') or GPU_PROFILE
    hardware = {
        'requested_flavor': REQUESTED_GPU_FLAVOR,
        'flavor': GPU_FLAVOR,
        'cuda_arch': HTM_CUDA_ARCH,
        'torch_cuda_arch_list': TORCH_CUDA_ARCH,
    }
    receipts_required = {
        'space_stage': 'verify before paid launch',
        'duplicate_active_job_check': '0 active Feather A10G jobs before launch',
        'strict_runtime_preflight': 'for optimal-strict: run FEATHER_HF_STRICT_RUNTIME_PREFLIGHT=1 and require torch_cuda/triton_driver/mamba/fused_sdr/htm_rust ok before train',
        'htm_gpu': 'HTMRegionGpu=True and no CPU fallback for faithful rows',
        'profile_forward': '0 for TPS rows; 1 only for attribution rows',
        'graph_breaks': 'TORCH_LOGS=graph_breaks attached for compile probes',
        'tps_window': 'median/p90/max after warmup',
        'quality': 'MID_VAL or fresh_checkpoint_eval row with eval tokens/batch/corpus profile',
    }
    manifest = {
        'task_id': os.environ.get('HERMES_KANBAN_TASK', ''),
        'run_id': os.environ.get('FEATHER_RUN_ID', 'dry-run'),
        'git_sha': _git_sha(),
        'hardware': hardware,
        'runtime_profile': runtime_profile,
        'space_repo': routing.space_repo,
        'output_repo': routing.output_repo,
        'retina_cache_repo': routing.retina_cache_repo,
        'image_mode': 'space' if USE_SPACE_IMAGE else 'ghcr',
        'job_command': build_job_command(),
        'target_shards': TARGET_SHARDS,
        'time_budget': TIME_BUDGET,
        'timeout': TIMEOUT,
        'fast_start_streaming': fast_start_streaming,
        'secondary_gates': secondary_gates,
        'launch_guard': launch_guard,
        'no_paid_launch_without_gate': True,
        'paid_launch_confirmed': _truthy_env('FEATHER_HF_CONFIRM_PAID_LAUNCH'),
        # A dry run performs no HF query, so the duplicate check is recorded as skipped.
        'duplicate_active_job_check': {'performed': False, 'reason': 'dry_run_no_hf_query'},
        'receipts_required': receipts_required,
        'env': dict(sorted(env.items())),
    }
    return manifest
def maybe_write_dry_run_manifest(manifest: dict) -> None:
    """Emit the dry-run manifest to a file or to stdout.

    When FEATHER_HF_DRY_RUN_MANIFEST names a path, the manifest is written
    there as indented, key-sorted JSON (parent directories are created) and
    the path is printed. Otherwise the manifest is printed on a single line.
    """
    destination = os.environ.get('FEATHER_HF_DRY_RUN_MANIFEST')
    if destination:
        out_path = Path(destination)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        payload = json.dumps(manifest, indent=2, sort_keys=True)
        out_path.write_text(payload + '\n', encoding='utf-8')
        print(f'[launch] dry-run manifest written: {out_path}', flush=True)
    else:
        print(f'[launch] dry-run manifest={json.dumps(manifest, sort_keys=True)}', flush=True)
def _log_launch_plan(routing, secondary_gates: dict) -> None:
    """Print the resolved routing, hardware and image selection for the launch log."""
    print(f'[launch] image_dir={IMAGE_DIR}', flush=True)
    print(f'[launch] owner={routing.owner}', flush=True)
    print(f'[launch] space_repo={routing.space_repo}', flush=True)
    print(f'[launch] output_repo={routing.output_repo}', flush=True)
    print(f'[launch] retina_cache_repo={routing.retina_cache_repo}', flush=True)
    print(f'[launch] target_shards={TARGET_SHARDS} time_budget={TIME_BUDGET} timeout={TIMEOUT}', flush=True)
    print(f'[launch] namespace={routing.job_namespace}', flush=True)
    print(f'[launch] requested_flavor={REQUESTED_GPU_FLAVOR} effective_flavor={GPU_FLAVOR}', flush=True)
    if REQUESTED_GPU_FLAVOR != GPU_FLAVOR:
        print(
            '[launch] A10-first policy: requested H200 but using '
            f'{GPU_FLAVOR} instead (set FEATHER_HF_ALLOW_H200_EXPERIMENT=1 only for an explicit break-glass diagnostic)',
            flush=True,
        )
    print(f'[launch] flavor={GPU_FLAVOR} profile={GPU_PROFILE} htm_cuda_arch={HTM_CUDA_ARCH} torch_cuda_arch={TORCH_CUDA_ARCH}', flush=True)
    print(f'[launch] image_mode={"space" if USE_SPACE_IMAGE else "ghcr"}', flush=True)
    print(f'[launch] secondary_gates={json.dumps(secondary_gates, sort_keys=True)}', flush=True)
    if not USE_SPACE_IMAGE:
        print(f'[launch] image={DEFAULT_IMAGE}', flush=True)


def _fast_start_overrides(fast_start_streaming: bool) -> dict[str, str]:
    """Announce and return the env entries auto-enabled for the fast-start profile.

    A key is included only when the fast-start profile is active and the
    caller did not set that key in ``os.environ``.
    """
    overrides: dict[str, str] = {}
    if not fast_start_streaming:
        return overrides
    if 'HYDRA_USE_NEMOTRON' not in os.environ:
        overrides['HYDRA_USE_NEMOTRON'] = '1'
        print('[launch] auto-enabled HYDRA_USE_NEMOTRON=1 for short-budget fast-start profile', flush=True)
    if 'HYDRA_LOCAL_SHARDS_ONLY' not in os.environ:
        overrides['HYDRA_LOCAL_SHARDS_ONLY'] = '0'
        print('[launch] auto-enabled HYDRA_LOCAL_SHARDS_ONLY=0 for Nemotron streaming fast-start profile', flush=True)
    return overrides


def _apply_profile_and_guard(env: dict[str, str]) -> dict:
    """Apply the runtime profile, caller overrides and proof guard to *env*.

    Shared by the dry-run and the submit paths so both validate an env with
    exactly the same sequence of steps. Returns the guard result; raises
    SystemExit when the scale-free A10G proof guard rejects the env.
    """
    runtime_profile = os.environ.get('FEATHER_HF_RUNTIME_PROFILE')
    if runtime_profile == 'h200-compromise-telemetry':
        print('[launch] deprecated profile h200-compromise-telemetry requested; applying A10 compromise telemetry defaults under A10-first policy', flush=True)
    if runtime_profile == 'optimal-strict':
        apply_optimal_env_profile(env)
    elif runtime_profile in {'a10-compromise-telemetry', 'h200-compromise-telemetry'}:
        apply_a10_compromise_telemetry_profile(env)
    elif GPU_FLAVOR.startswith('a10'):
        # A10 compatibility profile: avoid known PTX/compile runtime pitfalls
        # and keep the throughput path enabled. The caller can override each
        # key by setting it in the parent environment.
        apply_a10_env_profile(env)
    # Pass through any HYDRA_* / FEATHER_* variables from the caller's env so
    # sweep drivers can set e.g. HYDRA_N_LAYER or HYDRA_METRICS_OUT without
    # launcher edits. Keys already present in env take precedence.
    apply_caller_env_overrides(env)
    effective_runtime_profile = runtime_profile or env.get('HYDRA_RUNTIME_PROFILE') or GPU_PROFILE
    apply_scale_free_a10g_proof_defaults(
        env,
        gpu_flavor=GPU_FLAVOR,
        runtime_profile=effective_runtime_profile,
    )
    launch_guard = validate_scale_free_a10g_launch_env(
        env,
        gpu_flavor=GPU_FLAVOR,
        runtime_profile=effective_runtime_profile,
    )
    if not launch_guard['valid']:
        raise SystemExit('[launch] scale-free A10G proof guard failed: ' + '; '.join(launch_guard['reasons']))
    if launch_guard['scale_free_a10g_proof']:
        print(f'[launch] scale-free A10G proof guard passed: {json.dumps(launch_guard, sort_keys=True)}', flush=True)
    return launch_guard


def _prepare_space_image(api: HfApi, routing, token: str) -> str:
    """Upload and build the Space image (unless skipped) and return its image ref."""
    if SKIP_UPLOAD:
        print('[launch] FEATHER_HF_SKIP_UPLOAD=1; reusing existing Space image', flush=True)
    else:
        if SYNC_OVERLAY:
            sync_overlay_from_repo()
        print('[launch] uploading custom Docker Space image context...', flush=True)
        api.upload_folder(
            repo_id=routing.space_repo,
            repo_type='space',
            folder_path=str(IMAGE_DIR),
            commit_message=f'Update Feather {GPU_PROFILE} training runtime image',
            ignore_patterns=[
                '**/__pycache__/**',
                '**/*.py[cod]',
                '**/.pytest_cache/**',
                '**/.mypy_cache/**',
                '**/.ruff_cache/**',
                '**/.venv/**',
                '**/target/**',
                '**/logs/**',
                '**/*.log',
                '**/*.out',
                '**/*.pt',
                '**/*.safetensors',
                '**/*.parquet',
                '**/*.npz',
                '**/.git/**',
            ],
            token=token,
        )
        print('[launch] waiting for Space image build to become ready...', flush=True)
        wait_for_space(api, routing.space_repo)
    return f'hf.co/spaces/{routing.space_repo}'


def main() -> int:
    """Resolve routing, build the job environment and submit (or dry-run) an HF Job.

    Steps: load the token, resolve routing, log the launch plan, then either
    write a dry-run manifest, or create the repos, optionally build the
    Space image, and submit the job.

    Returns:
        0 on success.

    Raises:
        SystemExit: No HF token is available, or the scale-free A10G proof
            guard rejects the environment.
    """
    _configure_line_buffered_output()
    print(f'[launch] phase=start dry_run={int(DRY_RUN)} use_space_image={int(USE_SPACE_IMAGE)} skip_upload={int(SKIP_UPLOAD)} sync_overlay={int(SYNC_OVERLAY)}', flush=True)
    token, token_source = load_hf_token_with_source()
    if not token:
        raise SystemExit(
            'HF token required: set HF_TOKEN/HUGGINGFACE_HUB_TOKEN or run `huggingface-cli login` '
            'so ~/.cache/huggingface/token exists'
        )
    # Only the non-secret source label is logged, never the token itself.
    print(f'[launch] phase=token_loaded source={token_source}', flush=True)
    routing = resolve_routing(token=token)
    print('[launch] phase=routing_resolved', flush=True)
    print('[launch] phase=api_init', flush=True)
    api = HfApi(token=token)
    secondary_gates = HarnessConfig().to_secondary_gates()
    _log_launch_plan(routing, secondary_gates)
    fast_start_streaming = should_enable_fast_start_streaming(TARGET_SHARDS, TIME_BUDGET)

    if DRY_RUN:
        # The dry run only announces the fast-start overrides; as before, its
        # manifest env starts empty and does not include them.
        _fast_start_overrides(fast_start_streaming)
        dry_run_env: dict[str, str] = {}
        launch_guard = _apply_profile_and_guard(dry_run_env)
        print(f'[launch] dry-run job_command={build_job_command()}', flush=True)
        maybe_write_dry_run_manifest(
            build_dry_run_manifest(
                routing=routing,
                env=dry_run_env,
                secondary_gates=secondary_gates,
                fast_start_streaming=fast_start_streaming,
                launch_guard=launch_guard,
            )
        )
        print('[launch] dry-run mode; skipping repo creation, upload, and job submission', flush=True)
        return 0

    api.create_repo(repo_id=routing.space_repo, repo_type='space', space_sdk='docker', private=SPACE_PRIVATE, exist_ok=True, token=token)
    api.create_repo(repo_id=routing.output_repo, repo_type='model', private=OUTPUT_PRIVATE, exist_ok=True, token=token)
    image_ref = DEFAULT_IMAGE
    if USE_SPACE_IMAGE:
        image_ref = _prepare_space_image(api, routing, token)

    env = {
        'HF_REPO_ID': routing.output_repo,
        'FEATHER_HF_OWNER': routing.owner,
        'FEATHER_HF_SPACE_REPO': routing.space_repo,
        'FEATHER_HF_OUTPUT_REPO': routing.output_repo,
        'FEATHER_HF_RETINA_CACHE_REPO': routing.retina_cache_repo,
        'HYDRA_RETINA_CACHE_REPO': routing.retina_cache_repo,
        'HYDRA_TARGET_SHARDS': TARGET_SHARDS,
        'HYDRA_TIME_BUDGET': TIME_BUDGET,
        'HYDRA_DOWNLOAD_WORKERS': DOWNLOAD_WORKERS,
        'HYDRA_CKPT_INTERVAL': CKPT_INTERVAL,
        'PYTHONUNBUFFERED': '1',
        'FEATHER_RUNTIME_MODE': 'job',
        'FEATHER_GPU_PROFILE': GPU_PROFILE,
        'FEATHER_HF_FLAVOR': GPU_FLAVOR,
        'HTM_CUDA_ARCH': HTM_CUDA_ARCH,
        'TORCH_CUDA_ARCH_LIST': TORCH_CUDA_ARCH,
        'TRITON_CACHE_DIR': f'/workspace/triton_cache/{GPU_PROFILE}',
        'TRITON_CACHE_REPO': f'{routing.owner}/feather-triton-cache-{GPU_PROFILE}',
    }
    env.update(_fast_start_overrides(fast_start_streaming))
    _apply_profile_and_guard(env)

    # The token travels as a job secret, not as a plain env entry.
    secrets = {'HF_TOKEN': token}
    print(f'[launch] submitting HF Job on {GPU_FLAVOR} (single-GPU Feather path; A10G-large is 24GB VRAM / 12 vCPU / 46GB RAM)...', flush=True)
    job_command = build_job_command()
    if job_command != ['python', '/app/entrypoint.py']:
        print(f'[launch] using custom HF job command: {job_command}', flush=True)
    job = api.run_job(
        image=image_ref,
        command=job_command,
        env=env,
        secrets=secrets,
        flavor=GPU_FLAVOR,
        timeout=TIMEOUT,
        namespace=routing.job_namespace,
        token=token,
    )
    print(f'[launch] submitted job_id={job.id} status={job.status.stage} url={job.url}', flush=True)
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == '__main__':
    sys.exit(main())