# Page header captured together with the file (not program code):
#   icarus112's picture
#   Initial public Feather runtime image for HF Jobs
#   e2fabcd verified
#!/usr/bin/env python3
from __future__ import annotations
import json
import os
import subprocess
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from threading import Thread
from huggingface_hub import HfApi
# Filesystem layout: the job runs from REPO_ROOT, writes its log there, and
# the checkpoint files that get uploaded are looked up under CACHE_ROOT.
REPO_ROOT = Path('/workspace/feather')
CACHE_ROOT = Path.home().joinpath('.cache', 'autoresearch')
LOG_FILE = REPO_ROOT.joinpath('run_domain_expanded.log')

# Environment-driven settings; each falls back to the default shown here.
JOB_ID = os.getenv('JOB_ID', 'local-job')
OUTPUT_REPO = os.getenv('HF_REPO_ID', 'icarus112/feather-pretrain-checkpoints')
TOKEN = os.getenv('HF_TOKEN')  # None when unset; uploads are skipped then
RUNTIME_MODE = os.getenv('FEATHER_RUNTIME_MODE', 'space')  # 'job' selects job mode
APP_PORT = int(os.getenv('PORT', '7860'))  # port the health server binds to
class _HealthHandler(BaseHTTPRequestHandler):
    """HTTP handler that answers health probes with a small JSON document.

    GET requests for ``/``, ``/health``, ``/healthz`` or ``/ready`` receive a
    200 response whose body reports the status, runtime mode and job id.
    Every other GET path receives an empty 404.
    """

    # Routes (query string excluded) that are answered with 200.
    _HEALTH_PATHS = frozenset({'/', '/health', '/healthz', '/ready'})

    def do_GET(self):
        """Serve the health payload for known routes, 404 for anything else."""
        # self.path is the raw request target and may carry a "?query" part;
        # match on the route alone so "/health?probe=1" is still recognised.
        route = self.path.partition('?')[0]
        if route not in self._HEALTH_PATHS:
            self.send_response(404)
            self.end_headers()
            return

        payload = {
            'status': 'ok',
            'mode': RUNTIME_MODE,
            'job_id': JOB_ID,
        }
        body = json.dumps(payload).encode('utf-8')
        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        """Discard the base class's per-request log output."""
        return
def _start_health_server() -> HTTPServer:
    """Start the health endpoint on a background daemon thread.

    The server binds to all interfaces on ``APP_PORT`` and is returned so the
    caller can shut it down later.
    """
    bind_address = ('0.0.0.0', APP_PORT)
    httpd = HTTPServer(bind_address, _HealthHandler)
    # Daemon thread: it will not keep the process alive on its own.
    Thread(target=httpd.serve_forever, daemon=True).start()
    print(f'[space] health server listening on 0.0.0.0:{APP_PORT}', flush=True)
    return httpd
def upload_artifact(api: HfApi, path: Path, dest: str) -> None:
    """Upload one local file to ``OUTPUT_REPO`` if it exists.

    Args:
        api: Client whose ``upload_file`` method performs the transfer.
        path: Local file to upload.
        dest: Destination path inside the repository.

    A missing ``path`` is reported on stdout and otherwise ignored. Errors
    raised by ``api.upload_file`` are not caught here.
    """
    if path.exists():
        api.upload_file(
            repo_type='model',
            repo_id=OUTPUT_REPO,
            path_in_repo=dest,
            path_or_fileobj=str(path),
        )
        print(f'[upload] uploaded {path} -> {OUTPUT_REPO}/{dest}', flush=True)
    else:
        print(f'[upload] skip missing {path}', flush=True)
def run_job_mode() -> int:
    """Run the pretraining script, then upload its log and checkpoints.

    Steps:
      1. Change into ``REPO_ROOT`` and fill in defaults for the ``HYDRA_*``
         environment variables (values already set are left untouched).
      2. Run ``./scripts/run_domain_expanded_pretrain.sh`` through ``bash``
         and wait for it to finish; a non-zero exit does not raise.
      3. If ``HF_TOKEN`` is set, upload the log file and the two checkpoint
         files to ``OUTPUT_REPO`` under ``jobs/<JOB_ID>/``. Upload problems
         are printed as warnings and never change the return value.

    Returns:
        The exit status of the training script.

    Raises:
        OSError: if ``REPO_ROOT`` cannot be entered or ``bash`` cannot be
            started.
    """
    os.chdir(REPO_ROOT)

    # Defaults only: anything already present in the environment wins.
    # NOTE(review): presumably read by the training script - confirm there.
    env_defaults = {
        'HYDRA_TIME_BUDGET': '43200',
        'HYDRA_TARGET_SHARDS': '2048',
        'HYDRA_DOWNLOAD_WORKERS': '16',
        'HYDRA_CKPT_INTERVAL': '1000',
        'HYDRA_RESUME_CKPT': str(CACHE_ROOT / 'latest.pt'),
    }
    for key, value in env_defaults.items():
        os.environ.setdefault(key, value)

    cmd = [
        'bash',
        './scripts/run_domain_expanded_pretrain.sh',
        '--target-shards', os.environ['HYDRA_TARGET_SHARDS'],
        '--download-workers', os.environ['HYDRA_DOWNLOAD_WORKERS'],
    ]
    print('[job] starting Feather domain-expanded pretrain', flush=True)
    print(f'[job] command={cmd}', flush=True)
    # check=False: the exit status is returned to the caller, and uploads
    # still run after a failed training script.
    proc = subprocess.run(cmd, check=False)

    if not TOKEN:
        print('[upload] HF_TOKEN not set; skipping artifact upload', flush=True)
        return proc.returncode

    api = HfApi(token=TOKEN)
    try:
        api.create_repo(repo_id=OUTPUT_REPO, repo_type='model', private=True, exist_ok=True)
    except Exception as e:
        # Best effort: the uploads below are attempted even if this fails.
        print(f'[upload] create_repo warning: {type(e).__name__}: {e}', flush=True)

    prefix = f'jobs/{JOB_ID}'
    artifacts = (
        (LOG_FILE, 'run_domain_expanded.log'),
        (CACHE_ROOT / 'latest.pt', 'latest.pt'),
        (CACHE_ROOT / 'pretrain_final.pt', 'pretrain_final.pt'),
    )
    # Each artifact is attempted independently so that one failed upload
    # (for example the log) does not prevent the checkpoints from being sent.
    for path, name in artifacts:
        try:
            upload_artifact(api, path, f'{prefix}/{name}')
        except Exception as e:
            print(f'[upload] upload warning: {type(e).__name__}: {e}', flush=True)

    return proc.returncode
def run_space_mode() -> int:
    """Serve the health endpoint and idle until the process is interrupted.

    The idle loop has no normal exit, so this function only ends when an
    exception (such as ``KeyboardInterrupt``) propagates; the health server
    is shut down and closed on the way out.
    """
    idle_interval_seconds = 3600
    health_server = _start_health_server()
    print('[space] Feather runtime image ready', flush=True)
    try:
        # Nothing else to do in this mode; the daemon thread serves requests.
        while True:
            time.sleep(idle_interval_seconds)
    finally:
        health_server.shutdown()
        health_server.server_close()
def main() -> int:
    """Run job mode when ``RUNTIME_MODE`` is ``'job'``, otherwise space mode.

    Returns whatever the selected mode function returns.
    """
    entry_point = run_job_mode if RUNTIME_MODE == 'job' else run_space_mode
    return entry_point()
if __name__ == '__main__':
    # Use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)