#!/usr/bin/env python3 from __future__ import annotations import json import os import shlex import shutil import subprocess import sys import time from pathlib import Path from huggingface_hub import HfApi REPO_ROOT = Path(__file__).resolve().parents[1] if str(REPO_ROOT) not in sys.path: sys.path.insert(0, str(REPO_ROOT)) from configs.harness_config import HarnessConfig from scripts.hf_routing import resolve_routing TARGET_SHARDS = os.environ.get('HYDRA_TARGET_SHARDS', '2048') TIME_BUDGET = os.environ.get('HYDRA_TIME_BUDGET', '43200') REQUESTED_GPU_FLAVOR = os.environ.get('FEATHER_HF_FLAVOR', 'a10g-large') GPU_ARCH_BY_FLAVOR = { 'a10g-small': ('sm_86', '8.6'), 'a10g-large': ('sm_86', '8.6'), 'a10g-largex2': ('sm_86', '8.6'), 'a10g-largex4': ('sm_86', '8.6'), 'a100-large': ('sm_80', '8.0'), 'a100x4': ('sm_80', '8.0'), 'a100x8': ('sm_80', '8.0'), 'h200': ('sm_90a', '9.0'), 'h200x2': ('sm_90a', '9.0'), 'h200x4': ('sm_90a', '9.0'), 'h200x8': ('sm_90a', '9.0'), } HF_NAMESPACE = os.environ.get('FEATHER_HF_NAMESPACE') DEFAULT_IMAGE = os.environ.get('FEATHER_HF_IMAGE', 'ghcr.io/slapglif/feather-hf-runtime:a10g-large') IMAGE_DIR = Path(__file__).resolve().parents[1] / 'hf_jobs' / 'feather_h200_image' TIMEOUT = os.environ.get('FEATHER_HF_JOB_TIMEOUT', '12h') SPACE_PRIVATE = os.environ.get('FEATHER_HF_SPACE_PRIVATE', '1') == '1' OUTPUT_PRIVATE = os.environ.get('FEATHER_HF_OUTPUT_PRIVATE', '1') == '1' DOWNLOAD_WORKERS = os.environ.get('HYDRA_DOWNLOAD_WORKERS', '16') CKPT_INTERVAL = os.environ.get('HYDRA_CKPT_INTERVAL', '1000') DRY_RUN = os.environ.get('FEATHER_HF_DRY_RUN', '0') == '1' USE_SPACE_IMAGE = os.environ.get('FEATHER_HF_USE_SPACE_IMAGE', '0') == '1' # When true, assume the Space image has already been built by a previous # invocation and skip the upload+build wait. Used by sweep drivers that fan # out many jobs against a single pre-uploaded image. SKIP_UPLOAD = os.environ.get('FEATHER_HF_SKIP_UPLOAD', '0') == '1' SYNC_OVERLAY = os.environ.get('FEATHER_HF_SYNC_OVERLAY', '1') == '1' def _truthy_env(name: str) -> bool: return os.environ.get(name, '0').strip().lower() in {'1', 'true', 'yes', 'on'} def should_enable_fast_start_streaming(target_shards: str, time_budget: str) -> bool: """Use streaming data path for short-budget launch profiles.""" try: shards = int(target_shards) budget = int(time_budget) except ValueError: return False return shards > 0 and shards <= 256 and budget > 0 and budget <= 1800 def resolve_effective_gpu_flavor(requested_flavor: str, target_shards: str, time_budget: str) -> str: """Keep HYDRA/Feather remote launches on A10 by default. H200 remains a break-glass diagnostic path, but normal training/canaries are now routed to A10-class GPUs. FEATHER_HF_ALLOW_H200_EXPERIMENT is intentionally separate from the older canary cost override so stale scripts cannot accidentally keep using H200. """ if requested_flavor.startswith('h200') and not _truthy_env('FEATHER_HF_ALLOW_H200_EXPERIMENT'): return os.environ.get('FEATHER_HF_A10_FLAVOR', os.environ.get('FEATHER_HF_CANARY_FLAVOR', 'a10g-large')) return requested_flavor GPU_FLAVOR = resolve_effective_gpu_flavor(REQUESTED_GPU_FLAVOR, TARGET_SHARDS, TIME_BUDGET) GPU_PROFILE = os.environ.get('FEATHER_GPU_PROFILE', GPU_FLAVOR) HTM_CUDA_ARCH, TORCH_CUDA_ARCH = GPU_ARCH_BY_FLAVOR.get(GPU_FLAVOR, ('sm_86', '8.6')) def sync_overlay_from_repo() -> None: """Refresh Space overlay with required project files.""" overlay = IMAGE_DIR / 'overlay' overlay.mkdir(parents=True, exist_ok=True) include_paths = [ 'hydra', 'subsystems', 'scripts', 'htm_rust', 'harness', 'configs', 'prepare.py', 'prepare_nemotron.py', 'train.py', 'pyproject.toml', 'uv.lock', ] ignore = shutil.ignore_patterns( '__pycache__', '.pytest_cache', '.ruff_cache', '.venv', '.git', 'target', '*.pyc', ) copied: list[str] = [] for rel in include_paths: src = REPO_ROOT / rel dst = overlay / rel if not src.exists(): continue preserve_overlay_dir = rel == 'htm_rust' and (dst / 'src' / 'gpu' / 'mod.rs').exists() if dst.exists() and not preserve_overlay_dir: if dst.is_dir(): shutil.rmtree(dst) else: dst.unlink() if src.is_dir(): # htm_rust is currently overlay-extended: repo-root lacks the full GPU # backend module set, while the HF overlay carries mod.rs/sp_gpu/tm_gpu # and auxiliary kernels required for --features gpu. Merge rather than # delete it, otherwise a fresh no-cache rebuild silently drops the # step_batch_fused_cuda Python export. shutil.copytree(src, dst, dirs_exist_ok=True, ignore=ignore) else: dst.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(src, dst) copied.append(rel) scripts_dir = overlay / 'scripts' if scripts_dir.exists(): for sh_path in scripts_dir.rglob('*.sh'): data = sh_path.read_bytes() data = data.replace(b'\r\n', b'\n').replace(b'\r', b'\n') sh_path.write_bytes(data) print(f'[launch] overlay synced from repo ({len(copied)} paths): {copied}', flush=True) def load_hf_token() -> str | None: """Load a Hugging Face token without printing or persisting secret values.""" token, _source = load_hf_token_with_source() return token def build_job_command() -> list[str]: """Return HF Jobs command, optionally overridden for diagnostics.""" override = os.environ.get('FEATHER_HF_JOB_COMMAND') if override: return shlex.split(override) if _truthy_env('FEATHER_HF_BOOT_SMOKE') or _truthy_env('FEATHER_HF_STRICT_RUNTIME_PREFLIGHT'): return ['python', '/app/scripts/hf_boot_smoke.py'] if _truthy_env('FEATHER_HF_CHECKPOINT_EVAL'): return ['python', '/app/scripts/hf_checkpoint_eval.py'] return ['python', '/app/entrypoint.py'] def load_hf_token_with_source() -> tuple[str | None, str]: """Load a Hugging Face token and return a non-secret source label.""" for env_name in ('HF_TOKEN', 'HUGGINGFACE_HUB_TOKEN'): token = os.environ.get(env_name) if token: return token, 'provided' token_file = Path(os.environ.get('HF_TOKEN_PATH', Path.home() / '.cache' / 'huggingface' / 'token')).expanduser() try: token = token_file.read_text(encoding='utf-8').strip() except FileNotFoundError: return None, 'missing' except OSError: return None, 'unreadable' return (token, 'token_file') if token else (None, 'empty_file') def require_token() -> str: token, _source = load_hf_token_with_source() if not token: raise SystemExit( 'HF token required: set HF_TOKEN/HUGGINGFACE_HUB_TOKEN or run `huggingface-cli login` ' 'so ~/.cache/huggingface/token exists' ) return token def wait_for_space(api: HfApi, repo_id: str, timeout_s: int = 1800) -> None: start = time.time() seen_build_completion = False seen_building = False while True: runtime = api.get_space_runtime(repo_id, token=load_hf_token()) stage = getattr(runtime, 'stage', None) hardware = getattr(runtime, 'hardware', None) print(f'[space] stage={stage} hardware={hardware}', flush=True) if stage == 'BUILDING': seen_building = True if stage in {'APP_STARTING', 'RUNNING', 'PAUSED', 'SLEEPING'}: seen_build_completion = True if stage in {'RUNNING', 'PAUSED', 'SLEEPING'}: return # Image is built — Jobs can use it regardless of Space boot outcome. # If we enter while the Space is already in RUNTIME_ERROR from a prior # successful build, we may not observe APP_STARTING in this process; do # not spin forever. This is the normal public-Space image-builder state. if (seen_build_completion or seen_building) and stage in {'RUNTIME_ERROR', 'APP_STARTING_ERROR'}: print(f'[space] Space boot failed with {stage} but built image is ' f'available in the Space registry and is usable by HF Jobs.', flush=True) return # Hard build failures — no image was produced. if stage in {'BUILD_ERROR', 'CONFIG_ERROR', 'NO_APP_FILE'}: raise RuntimeError(f'Space {repo_id} build failed: stage={stage}') if time.time() - start > timeout_s: raise TimeoutError(f'Space {repo_id} did not become ready in {timeout_s}s (last stage={stage})') time.sleep(20) def _configure_line_buffered_output(stdout=sys.stdout, stderr=sys.stderr) -> None: """Make launch progress visible immediately when stdout/stderr are pipes.""" for stream in (stdout, stderr): reconfigure = getattr(stream, 'reconfigure', None) if reconfigure is None: continue try: reconfigure(line_buffering=True) except (TypeError, ValueError): # Some wrapped streams do not support reconfigure at runtime. pass def apply_optimal_env_profile(env: dict[str, str]) -> None: """Apply full-component optimal runtime defaults unless caller supplied overrides.""" _optimal_defaults = { 'HYDRA_RUNTIME_PROFILE': 'optimal-strict', 'HYDRA_STRICT_OPTIMAL_COMPONENTS': '1', 'HYDRA_FORCE_HTM_CPU': '0', 'HYDRA_HTM_FUSED': '1', 'HYDRA_HTM_BATCHED_FUSED': '1', 'HYDRA_DISABLE_FUSED_SDR_TRITON': '0', # Empty layer override means every layer remains on the intended # Mamba3 backbone instead of a Hyena/GDN fallback/substitution. 'HYDRA_HYENA_LAYERS': '', 'HYDRA_GDN_LAYERS': '', 'HYDRA_TOKEN_CACHE_GB': '0', 'HYDRA_DISABLE_TOKEN_CACHE': '1', } for _k, _default in _optimal_defaults.items(): if _k in os.environ: env[_k] = os.environ[_k] else: env.setdefault(_k, _default) print( '[launch] applied optimal runtime profile ' f"(HYDRA_RUNTIME_PROFILE={env['HYDRA_RUNTIME_PROFILE']}, " f"HYDRA_STRICT_OPTIMAL_COMPONENTS={env['HYDRA_STRICT_OPTIMAL_COMPONENTS']}, " f"HYDRA_FORCE_HTM_CPU={env['HYDRA_FORCE_HTM_CPU']}, " f"HYDRA_HTM_FUSED={env['HYDRA_HTM_FUSED']}, " f"HYDRA_HTM_BATCHED_FUSED={env['HYDRA_HTM_BATCHED_FUSED']}, " f"HYDRA_DISABLE_FUSED_SDR_TRITON={env['HYDRA_DISABLE_FUSED_SDR_TRITON']}, " f"HYDRA_HYENA_LAYERS={env['HYDRA_HYENA_LAYERS']}, " f"HYDRA_GDN_LAYERS={env['HYDRA_GDN_LAYERS']})", flush=True, ) def apply_a10_compromise_telemetry_profile(env: dict[str, str]) -> None: """Apply A10-friendly compromise telemetry defaults. This keeps the stable all-Hyena/non-fused HTM/fused-SDR-disabled runtime used after the fused HTM blocker, but routes work to A10-class GPUs instead of H200. It is intentionally not the full optimal architecture. """ _a10_compromise_defaults = { 'HYDRA_BATCH_SIZE': '16', 'HYDRA_TOTAL_BATCH': '32768', 'HYDRA_INERT_MAMBA': '1', 'HYDRA_HYENA_LAYERS': '0,1,2,3', 'HYDRA_DISABLE_FUSED_SDR_TRITON': '1', 'HYDRA_HTM_FUSED': '0', 'HYDRA_HTM_BATCHED_FUSED': '0', 'HYDRA_HTM_SUBSAMPLE': '128', # Standardize non-corpus ablations/evals on the full Nemotron blend so # only the intended architecture/runtime parameter varies between runs. # Explicit caller env can still override for corpus/data-path ablations. 'HYDRA_USE_FULL_BLEND': '1', 'HYDRA_NEMOTRON_SINGLE_CONFIG': '', 'HYDRA_LOCAL_SHARDS_ONLY': '0', 'HYDRA_USE_NEMOTRON': '1', 'HYDRA_STREAM_PREFETCH': '64', 'HYDRA_STREAM_SHUFFLE_BUFFER': '16', # Full-blend mode can otherwise keep downloading large background shards # after a short canary hits its time budget, producing HF job ERRORs # without useful metrics/checkpoint finalization. 'HYDRA_BACKGROUND_PREFETCH': '0', 'HYDRA_HYENA_FILTER_CACHE': '1', 'HYDRA_HYENA_TRAIN_CACHE': '1', # A10 validation runs close to the memory cliff. Avoid Muon # torch.compile/Inductor scratch state and keep final eval at the # smallest batch unless the caller deliberately opts into a larger eval. 'HYDRA_MUON_COMPILE': '0', 'HYDRA_EVAL_BATCH': '1', 'PYTORCH_ALLOC_CONF': 'expandable_segments:True', 'HYDRA_MID_VAL_INTERVAL': '0', # Keep bounded A10 canaries from tripping mid-run checkpoint/image-drift # failures before they have emitted validation telemetry. Caller env can # still opt back into periodic checkpoints for longer runs. 'HYDRA_CKPT_INTERVAL': '0', 'HYDRA_EVAL_TOKENS': '1000000', 'HYDRA_TOKEN_CACHE_GB': '0', 'HYDRA_DISABLE_TOKEN_CACHE': '1', } for _k, _default in _a10_compromise_defaults.items(): if _k in os.environ: env[_k] = os.environ[_k] else: env[_k] = _default print( '[launch] applied A10 compromise telemetry profile ' f"(HYDRA_BATCH_SIZE={env['HYDRA_BATCH_SIZE']}, " f"HYDRA_TOTAL_BATCH={env['HYDRA_TOTAL_BATCH']}, " f"HYDRA_INERT_MAMBA={env['HYDRA_INERT_MAMBA']}, " f"HYDRA_HYENA_LAYERS={env['HYDRA_HYENA_LAYERS']}, " f"HYDRA_DISABLE_FUSED_SDR_TRITON={env['HYDRA_DISABLE_FUSED_SDR_TRITON']}, " f"HYDRA_HTM_FUSED={env['HYDRA_HTM_FUSED']}, " f"HYDRA_HTM_BATCHED_FUSED={env['HYDRA_HTM_BATCHED_FUSED']}, " f"HYDRA_HTM_SUBSAMPLE={env['HYDRA_HTM_SUBSAMPLE']}, " f"HYDRA_USE_FULL_BLEND={env['HYDRA_USE_FULL_BLEND']}, " f"HYDRA_NEMOTRON_SINGLE_CONFIG={env['HYDRA_NEMOTRON_SINGLE_CONFIG']}, " f"HYDRA_STREAM_PREFETCH={env['HYDRA_STREAM_PREFETCH']}, " f"HYDRA_STREAM_SHUFFLE_BUFFER={env['HYDRA_STREAM_SHUFFLE_BUFFER']}, " f"HYDRA_BACKGROUND_PREFETCH={env['HYDRA_BACKGROUND_PREFETCH']}, " f"HYDRA_MUON_COMPILE={env['HYDRA_MUON_COMPILE']}, " f"HYDRA_EVAL_BATCH={env['HYDRA_EVAL_BATCH']}, " f"HYDRA_CKPT_INTERVAL={env['HYDRA_CKPT_INTERVAL']}, " f"HYDRA_EVAL_TOKENS={env['HYDRA_EVAL_TOKENS']})", flush=True, ) def apply_a10_env_profile(env: dict[str, str]) -> None: """Apply operational A10 canary defaults unless caller supplied overrides.""" if not GPU_FLAVOR.startswith('a10'): return _a10_defaults = { 'HYDRA_MUON_COMPILE': '0', 'HYDRA_FORCE_HTM_CPU': '1', 'HYDRA_INERT_MAMBA': '1', 'HYDRA_HYENA_LAYERS': '0,1,2,3', 'HYDRA_DISABLE_FUSED_SDR_TRITON': '1', 'HYDRA_ALLOW_SYNTHETIC_RETINA': '1', 'HYDRA_FASTPATH': '1', 'HYDRA_TOKEN_CACHE_GB': '0', 'HYDRA_DISABLE_TOKEN_CACHE': '1', } for _k, _default in _a10_defaults.items(): if _k in os.environ: env[_k] = os.environ[_k] else: env.setdefault(_k, _default) if env.get('HYDRA_INERT_MAMBA') == '0' and 'HYDRA_FASTPATH' not in os.environ: env['HYDRA_FASTPATH'] = '0' print( '[launch] applied A10 env profile ' f"(HYDRA_MUON_COMPILE={env['HYDRA_MUON_COMPILE']}, " f"HYDRA_FORCE_HTM_CPU={env['HYDRA_FORCE_HTM_CPU']}, " f"HYDRA_INERT_MAMBA={env['HYDRA_INERT_MAMBA']}, " f"HYDRA_HYENA_LAYERS={env['HYDRA_HYENA_LAYERS']}, " f"HYDRA_DISABLE_FUSED_SDR_TRITON={env['HYDRA_DISABLE_FUSED_SDR_TRITON']}, " f"HYDRA_ALLOW_SYNTHETIC_RETINA={env['HYDRA_ALLOW_SYNTHETIC_RETINA']}, " f"HYDRA_FASTPATH={env['HYDRA_FASTPATH']})", flush=True, ) def apply_caller_env_overrides(env: dict[str, str]) -> None: """Pass through caller HYDRA_*/FEATHER_* launch controls into a job env.""" for _k, _v in os.environ.items(): if (_k.startswith('HYDRA_') or _k.startswith('FEATHER_')) and _k not in env: env[_k] = _v def _int_env(env: dict[str, str], key: str) -> int | None: value = env.get(key) if value in (None, ''): return None try: return int(str(value)) except (TypeError, ValueError): return None def apply_scale_free_a10g_proof_defaults(env: dict[str, str], *, gpu_flavor: str, runtime_profile: str | None) -> None: """Convert generic A10 defaults to faithful bounded HTM defaults when proof mode is requested.""" profile = (runtime_profile or '').strip().lower() proof_requested = gpu_flavor.startswith('a10') and ( _truthy_env('FEATHER_HF_SCALE_FREE_PROOF') or env.get('HYDRA_HTM_STRICT_SCALE_FREE') == '1' or profile in {'optimal-strict', 'a10g-scale-free-proof'} ) if not proof_requested: return proof_defaults = { 'HYDRA_FORCE_HTM_CPU': '0', 'HYDRA_HTM_FUSED': '1', 'HYDRA_HTM_BATCHED_FUSED': '1', 'HYDRA_TOKEN_CACHE_GB': '0', 'HYDRA_DISABLE_TOKEN_CACHE': '1', } for key, value in proof_defaults.items(): if key not in os.environ: env[key] = value def validate_scale_free_a10g_launch_env( env: dict[str, str], *, gpu_flavor: str, runtime_profile: str | None, ) -> dict: """Fail-closed guard for bounded A10G scale-free HTM proof launches.""" profile = (runtime_profile or '').strip().lower() proof_requested = gpu_flavor.startswith('a10') and ( _truthy_env('FEATHER_HF_SCALE_FREE_PROOF') or env.get('HYDRA_HTM_STRICT_SCALE_FREE') == '1' or profile in {'optimal-strict', 'a10g-scale-free-proof'} ) diagnostic_override = _truthy_env('FEATHER_HF_ALLOW_SCALE_FREE_DIAGNOSTIC_OVERRIDE') reasons: list[str] = [] if proof_requested: if env.get('HYDRA_TARGET_SHARDS') != '0': reasons.append('HYDRA_TARGET_SHARDS=0 required for streaming/no-materialized-shard A10G proof') if env.get('HYDRA_HTM_STRICT_SCALE_FREE') != '1': reasons.append('HYDRA_HTM_STRICT_SCALE_FREE=1 required for scale-free HTM proof') if env.get('HYDRA_FORCE_HTM_CPU') != '0': reasons.append('HYDRA_FORCE_HTM_CPU=0 required; CPU fallback is forbidden for A10G proof') if env.get('HYDRA_HTM_FUSED') != '1': reasons.append('HYDRA_HTM_FUSED=1 required for faithful HTM GPU proof') if env.get('HYDRA_HTM_BATCHED_FUSED') != '1': reasons.append('HYDRA_HTM_BATCHED_FUSED=1 required for faithful HTM GPU proof') region_pool = _int_env(env, 'HYDRA_HTM_REGION_POOL_SIZE') chunk_b = _int_env(env, 'HYDRA_HTM_CHUNK_B') if region_pool is None: reasons.append('HYDRA_HTM_REGION_POOL_SIZE is required for bounded A10G proof') elif region_pool > 4 and not diagnostic_override: reasons.append('HYDRA_HTM_REGION_POOL_SIZE<=4 required unless FEATHER_HF_ALLOW_SCALE_FREE_DIAGNOSTIC_OVERRIDE=1') if chunk_b is None: reasons.append('HYDRA_HTM_CHUNK_B is required for bounded A10G proof') elif region_pool is not None and chunk_b > region_pool: reasons.append('HYDRA_HTM_CHUNK_B<=HYDRA_HTM_REGION_POOL_SIZE required for bounded A10G proof') if env.get('HYDRA_TOKEN_CACHE_GB') != '0': reasons.append('HYDRA_TOKEN_CACHE_GB=0 required; token cache/materialization is forbidden') if env.get('HYDRA_DISABLE_TOKEN_CACHE') != '1': reasons.append('HYDRA_DISABLE_TOKEN_CACHE=1 required; token cache/materialization is forbidden') for key in ( 'HYDRA_HTM_REGION_POOL_SIZE_FROM_VRAM', 'HYDRA_HTM_SCALE_TO_VRAM', 'HYDRA_VRAM_TOPOLOGY_SCALE', 'FEATHER_VRAM_TOPOLOGY_SCALE', ): if str(env.get(key, '')).strip().lower() in {'1', 'true', 'yes', 'on'}: reasons.append(f'{key} must be off; VRAM-derived topology scaling is forbidden') return { 'scale_free_a10g_proof': proof_requested, 'valid': not reasons, 'reasons': reasons, 'diagnostic_override': diagnostic_override, } def _git_sha() -> str: try: return subprocess.run( ['git', 'rev-parse', '--short=12', 'HEAD'], cwd=REPO_ROOT, text=True, capture_output=True, check=True, timeout=5, ).stdout.strip() except Exception: return 'unknown' def build_dry_run_manifest( *, routing, env: dict[str, str], secondary_gates: dict, fast_start_streaming: bool, launch_guard: dict, ) -> dict: """Build an auditable no-submit manifest for HF/A10G launch review.""" runtime_profile = os.environ.get('FEATHER_HF_RUNTIME_PROFILE') or env.get('HYDRA_RUNTIME_PROFILE') or GPU_PROFILE return { 'task_id': os.environ.get('HERMES_KANBAN_TASK', ''), 'run_id': os.environ.get('FEATHER_RUN_ID', 'dry-run'), 'git_sha': _git_sha(), 'hardware': { 'requested_flavor': REQUESTED_GPU_FLAVOR, 'flavor': GPU_FLAVOR, 'cuda_arch': HTM_CUDA_ARCH, 'torch_cuda_arch_list': TORCH_CUDA_ARCH, }, 'runtime_profile': runtime_profile, 'space_repo': routing.space_repo, 'output_repo': routing.output_repo, 'retina_cache_repo': routing.retina_cache_repo, 'image_mode': 'space' if USE_SPACE_IMAGE else 'ghcr', 'job_command': build_job_command(), 'target_shards': TARGET_SHARDS, 'time_budget': TIME_BUDGET, 'timeout': TIMEOUT, 'fast_start_streaming': fast_start_streaming, 'secondary_gates': secondary_gates, 'launch_guard': launch_guard, 'no_paid_launch_without_gate': True, 'paid_launch_confirmed': _truthy_env('FEATHER_HF_CONFIRM_PAID_LAUNCH'), 'duplicate_active_job_check': {'performed': False, 'reason': 'dry_run_no_hf_query'}, 'receipts_required': { 'space_stage': 'verify before paid launch', 'duplicate_active_job_check': '0 active Feather A10G jobs before launch', 'strict_runtime_preflight': 'for optimal-strict: run FEATHER_HF_STRICT_RUNTIME_PREFLIGHT=1 and require torch_cuda/triton_driver/mamba/fused_sdr/htm_rust ok before train', 'htm_gpu': 'HTMRegionGpu=True and no CPU fallback for faithful rows', 'profile_forward': '0 for TPS rows; 1 only for attribution rows', 'graph_breaks': 'TORCH_LOGS=graph_breaks attached for compile probes', 'tps_window': 'median/p90/max after warmup', 'quality': 'MID_VAL or fresh_checkpoint_eval row with eval tokens/batch/corpus profile', }, 'env': dict(sorted(env.items())), } def maybe_write_dry_run_manifest(manifest: dict) -> None: manifest_path = os.environ.get('FEATHER_HF_DRY_RUN_MANIFEST') if not manifest_path: print(f'[launch] dry-run manifest={json.dumps(manifest, sort_keys=True)}', flush=True) return path = Path(manifest_path) path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(manifest, indent=2, sort_keys=True) + '\n', encoding='utf-8') print(f'[launch] dry-run manifest written: {path}', flush=True) def main() -> int: _configure_line_buffered_output() print(f'[launch] phase=start dry_run={int(DRY_RUN)} use_space_image={int(USE_SPACE_IMAGE)} skip_upload={int(SKIP_UPLOAD)} sync_overlay={int(SYNC_OVERLAY)}', flush=True) token, token_source = load_hf_token_with_source() if not token: raise SystemExit( 'HF token required: set HF_TOKEN/HUGGINGFACE_HUB_TOKEN or run `huggingface-cli login` ' 'so ~/.cache/huggingface/token exists' ) print(f'[launch] phase=token_loaded source={token_source}', flush=True) routing = resolve_routing(token=token) print('[launch] phase=routing_resolved', flush=True) print('[launch] phase=api_init', flush=True) api = HfApi(token=token) secondary_gates = HarnessConfig().to_secondary_gates() print(f'[launch] image_dir={IMAGE_DIR}', flush=True) print(f'[launch] owner={routing.owner}', flush=True) print(f'[launch] space_repo={routing.space_repo}', flush=True) print(f'[launch] output_repo={routing.output_repo}', flush=True) print(f'[launch] retina_cache_repo={routing.retina_cache_repo}', flush=True) print(f'[launch] target_shards={TARGET_SHARDS} time_budget={TIME_BUDGET} timeout={TIMEOUT}', flush=True) print(f'[launch] namespace={routing.job_namespace}', flush=True) print(f'[launch] requested_flavor={REQUESTED_GPU_FLAVOR} effective_flavor={GPU_FLAVOR}', flush=True) if REQUESTED_GPU_FLAVOR != GPU_FLAVOR: print( '[launch] A10-first policy: requested H200 but using ' f'{GPU_FLAVOR} instead (set FEATHER_HF_ALLOW_H200_EXPERIMENT=1 only for an explicit break-glass diagnostic)', flush=True, ) print(f'[launch] flavor={GPU_FLAVOR} profile={GPU_PROFILE} htm_cuda_arch={HTM_CUDA_ARCH} torch_cuda_arch={TORCH_CUDA_ARCH}', flush=True) print(f'[launch] image_mode={"space" if USE_SPACE_IMAGE else "ghcr"}', flush=True) print(f'[launch] secondary_gates={json.dumps(secondary_gates, sort_keys=True)}', flush=True) if not USE_SPACE_IMAGE: print(f'[launch] image={DEFAULT_IMAGE}', flush=True) fast_start_streaming = should_enable_fast_start_streaming(TARGET_SHARDS, TIME_BUDGET) if DRY_RUN: if 'HYDRA_USE_NEMOTRON' not in os.environ and fast_start_streaming: print('[launch] auto-enabled HYDRA_USE_NEMOTRON=1 for short-budget fast-start profile', flush=True) if 'HYDRA_LOCAL_SHARDS_ONLY' not in os.environ and fast_start_streaming: print('[launch] auto-enabled HYDRA_LOCAL_SHARDS_ONLY=0 for Nemotron streaming fast-start profile', flush=True) dry_run_env: dict[str, str] = {} runtime_profile = os.environ.get('FEATHER_HF_RUNTIME_PROFILE') if runtime_profile == 'h200-compromise-telemetry': print('[launch] deprecated profile h200-compromise-telemetry requested; applying A10 compromise telemetry defaults under A10-first policy', flush=True) if runtime_profile == 'optimal-strict': apply_optimal_env_profile(dry_run_env) elif runtime_profile in {'a10-compromise-telemetry', 'h200-compromise-telemetry'}: apply_a10_compromise_telemetry_profile(dry_run_env) else: apply_a10_env_profile(dry_run_env) apply_caller_env_overrides(dry_run_env) effective_runtime_profile = runtime_profile or dry_run_env.get('HYDRA_RUNTIME_PROFILE') or GPU_PROFILE apply_scale_free_a10g_proof_defaults( dry_run_env, gpu_flavor=GPU_FLAVOR, runtime_profile=effective_runtime_profile, ) launch_guard = validate_scale_free_a10g_launch_env( dry_run_env, gpu_flavor=GPU_FLAVOR, runtime_profile=effective_runtime_profile, ) if not launch_guard['valid']: raise SystemExit('[launch] scale-free A10G proof guard failed: ' + '; '.join(launch_guard['reasons'])) if launch_guard['scale_free_a10g_proof']: print(f'[launch] scale-free A10G proof guard passed: {json.dumps(launch_guard, sort_keys=True)}', flush=True) print(f'[launch] dry-run job_command={build_job_command()}', flush=True) maybe_write_dry_run_manifest( build_dry_run_manifest( routing=routing, env=dry_run_env, secondary_gates=secondary_gates, fast_start_streaming=fast_start_streaming, launch_guard=launch_guard, ) ) print('[launch] dry-run mode; skipping repo creation, upload, and job submission', flush=True) return 0 api.create_repo(repo_id=routing.space_repo, repo_type='space', space_sdk='docker', private=SPACE_PRIVATE, exist_ok=True, token=token) api.create_repo(repo_id=routing.output_repo, repo_type='model', private=OUTPUT_PRIVATE, exist_ok=True, token=token) image_ref = DEFAULT_IMAGE if USE_SPACE_IMAGE: if SKIP_UPLOAD: print('[launch] FEATHER_HF_SKIP_UPLOAD=1; reusing existing Space image', flush=True) else: if SYNC_OVERLAY: sync_overlay_from_repo() print('[launch] uploading custom Docker Space image context...', flush=True) api.upload_folder( repo_id=routing.space_repo, repo_type='space', folder_path=str(IMAGE_DIR), commit_message=f'Update Feather {GPU_PROFILE} training runtime image', ignore_patterns=[ '**/__pycache__/**', '**/*.py[cod]', '**/.pytest_cache/**', '**/.mypy_cache/**', '**/.ruff_cache/**', '**/.venv/**', '**/target/**', '**/logs/**', '**/*.log', '**/*.out', '**/*.pt', '**/*.safetensors', '**/*.parquet', '**/*.npz', '**/.git/**', ], token=token, ) print('[launch] waiting for Space image build to become ready...', flush=True) wait_for_space(api, routing.space_repo) image_ref = f'hf.co/spaces/{routing.space_repo}' env = { 'HF_REPO_ID': routing.output_repo, 'FEATHER_HF_OWNER': routing.owner, 'FEATHER_HF_SPACE_REPO': routing.space_repo, 'FEATHER_HF_OUTPUT_REPO': routing.output_repo, 'FEATHER_HF_RETINA_CACHE_REPO': routing.retina_cache_repo, 'HYDRA_RETINA_CACHE_REPO': routing.retina_cache_repo, 'HYDRA_TARGET_SHARDS': TARGET_SHARDS, 'HYDRA_TIME_BUDGET': TIME_BUDGET, 'HYDRA_DOWNLOAD_WORKERS': DOWNLOAD_WORKERS, 'HYDRA_CKPT_INTERVAL': CKPT_INTERVAL, 'PYTHONUNBUFFERED': '1', 'FEATHER_RUNTIME_MODE': 'job', 'FEATHER_GPU_PROFILE': GPU_PROFILE, 'FEATHER_HF_FLAVOR': GPU_FLAVOR, 'HTM_CUDA_ARCH': HTM_CUDA_ARCH, 'TORCH_CUDA_ARCH_LIST': TORCH_CUDA_ARCH, 'TRITON_CACHE_DIR': f'/workspace/triton_cache/{GPU_PROFILE}', 'TRITON_CACHE_REPO': f'{routing.owner}/feather-triton-cache-{GPU_PROFILE}', } if 'HYDRA_USE_NEMOTRON' not in os.environ and fast_start_streaming: env['HYDRA_USE_NEMOTRON'] = '1' print('[launch] auto-enabled HYDRA_USE_NEMOTRON=1 for short-budget fast-start profile', flush=True) if 'HYDRA_LOCAL_SHARDS_ONLY' not in os.environ and fast_start_streaming: env['HYDRA_LOCAL_SHARDS_ONLY'] = '0' print('[launch] auto-enabled HYDRA_LOCAL_SHARDS_ONLY=0 for Nemotron streaming fast-start profile', flush=True) # A10 compatibility profile: avoid known PTX/compile runtime pitfalls and # keep throughput path enabled. Caller can explicitly override each key by # setting it in the parent environment. runtime_profile = os.environ.get('FEATHER_HF_RUNTIME_PROFILE') if runtime_profile == 'h200-compromise-telemetry': print('[launch] deprecated profile h200-compromise-telemetry requested; applying A10 compromise telemetry defaults under A10-first policy', flush=True) if runtime_profile == 'optimal-strict': apply_optimal_env_profile(env) elif runtime_profile in {'a10-compromise-telemetry', 'h200-compromise-telemetry'}: apply_a10_compromise_telemetry_profile(env) elif GPU_FLAVOR.startswith('a10'): apply_a10_env_profile(env) # Pass through any HYDRA_* / FEATHER_* overrides from the caller's env so # sweep drivers can set HYDRA_N_LAYER, HYDRA_SDR_TARGET_ACTIVE, # HYDRA_LAYER_DIAGNOSTICS, HYDRA_METRICS_OUT, HYDRA_MID_VAL_INTERVAL, etc. # without needing launcher edits. Known keys above take precedence. apply_caller_env_overrides(env) effective_runtime_profile = runtime_profile or env.get('HYDRA_RUNTIME_PROFILE') or GPU_PROFILE apply_scale_free_a10g_proof_defaults( env, gpu_flavor=GPU_FLAVOR, runtime_profile=effective_runtime_profile, ) launch_guard = validate_scale_free_a10g_launch_env( env, gpu_flavor=GPU_FLAVOR, runtime_profile=effective_runtime_profile, ) if not launch_guard['valid']: raise SystemExit('[launch] scale-free A10G proof guard failed: ' + '; '.join(launch_guard['reasons'])) if launch_guard['scale_free_a10g_proof']: print(f'[launch] scale-free A10G proof guard passed: {json.dumps(launch_guard, sort_keys=True)}', flush=True) secrets = {'HF_TOKEN': token} print(f'[launch] submitting HF Job on {GPU_FLAVOR} (single-GPU Feather path; A10G-large is 24GB VRAM / 12 vCPU / 46GB RAM)...', flush=True) job_command = build_job_command() if job_command != ['python', '/app/entrypoint.py']: print(f'[launch] using custom HF job command: {job_command}', flush=True) job = api.run_job( image=image_ref, command=job_command, env=env, secrets=secrets, flavor=GPU_FLAVOR, timeout=TIMEOUT, namespace=routing.job_namespace, token=token, ) print(f'[launch] submitted job_id={job.id} status={job.status.stage} url={job.url}', flush=True) return 0 if __name__ == '__main__': raise SystemExit(main())