# NOTE: the three lines that were here ("Spaces:", "Running on T4" x2) are
# Hugging Face Spaces UI banner residue captured with the file, not code;
# kept as this comment so the file parses as Python.
#!/usr/bin/env python3
"""
Bundle a Luth model on this device with LEAP (leap-bundle) and investigate the result.
Per Liquid AI docs: leap-bundle create produces .gguf (default) or .bundle (--executorch).
We inspect both artifact types.
Steps:
1. Download the Luth model from Hugging Face to a local directory.
2. Validate the directory with leap-bundle validate.
3. Create a bundle with leap-bundle create (requires LEAP auth).
4. Poll until the bundle is completed, then download the output.
5. Investigate: report file sizes (.gguf / .bundle) and optionally run inference on .gguf.
Requires: pip install leap-bundle huggingface_hub
LEAP auth: leap-bundle login <api-key> (from https://leap.liquid.ai/profile#/api-keys)
"""
import argparse
import json
import os
import re
import subprocess
import sys
import time
from pathlib import Path
| def _leap_env() -> dict[str, str]: | |
| """Environment for leap-bundle subprocess so UTF-8 is used (avoids Windows cp1252 + checkmark).""" | |
| env = os.environ.copy() | |
| env["PYTHONUTF8"] = "1" | |
| return env | |
# Luth model repo IDs (LFM2-based are most likely LEAP-compatible)
LUTH_REPOS = [
    "kurakurai/Luth-LFM2-350M",
    "kurakurai/Luth-LFM2-700M",
    "kurakurai/Luth-LFM2-1.2B",
    "kurakurai/Luth-0.6B-Instruct",
    "kurakurai/Luth-1.7B-Instruct",
]
# Default model: the smallest LFM2 repo — quickest to download and bundle.
DEFAULT_REPO = LUTH_REPOS[0]
# All downloads and bundle artifacts are placed under this directory.
DEFAULT_WORK_DIR = Path("./luth_bundle_work")
# NOTE(review): reserved value only — the --quantization CLI flag is a
# placeholder; current leap-bundle create has no quantization option.
DEFAULT_QUANTIZATION = "Q4_K_M"
# Bundle-status polling cadence and overall timeout for wait_for_bundle().
POLL_INTERVAL_SEC = 60
POLL_MAX_MINUTES = 30
def run(cmd: list[str], capture: bool = True, cwd: Path | None = None) -> subprocess.CompletedProcess:
    """Execute *cmd* and return the completed process.

    When *capture* is True, stdout/stderr are captured and a non-zero exit
    raises RuntimeError with both streams in the message. When False, the
    child's output flows to the console and the caller checks the code.
    """
    opts: dict = {
        "cwd": str(cwd) if cwd else None,
        "text": True,
        "encoding": "utf-8",
        "errors": "replace",
        "env": _leap_env(),
    }
    if capture:
        opts["capture_output"] = True
    proc = subprocess.run(cmd, **opts)
    if capture and proc.returncode != 0:
        raise RuntimeError(f"Command failed: {' '.join(cmd)}\nstdout: {proc.stdout}\nstderr: {proc.stderr}")
    return proc
def has_leap_bundle() -> bool:
    """Return True when the leap-bundle CLI is installed and runnable."""
    try:
        run(["leap-bundle", "--version"], capture=True)
    except (FileNotFoundError, RuntimeError):
        return False
    return True
def download_model(repo_id: str, work_dir: Path) -> Path:
    """Download Hugging Face model to work_dir/models/<repo_slug>. Returns path to model dir."""
    try:
        from huggingface_hub import snapshot_download
    except ImportError:
        raise SystemExit("Install huggingface_hub: pip install huggingface_hub")
    # "/" is illegal in a directory name on some platforms; slugify the repo ID.
    target = work_dir / "models" / repo_id.replace("/", "--")
    target.mkdir(parents=True, exist_ok=True)
    print(f"Downloading {repo_id} to {target} ...")
    snapshot_download(repo_id=repo_id, local_dir=str(target))
    return target
def validate_bundle(model_path: Path) -> bool:
    """Run leap-bundle validate. Returns True if valid, False otherwise.

    Bug fix: run(capture=True) raises RuntimeError on any non-zero exit, so
    the previous ``return r.returncode == 0`` could never evaluate to False —
    a failed validation escaped as an exception instead of reaching main()'s
    "Validation failed" handling. Catch the failure and report it as False.
    """
    try:
        run(["leap-bundle", "validate", str(model_path)], capture=True)
    except RuntimeError:
        return False
    return True
| def _parse_request_id(out: str) -> str | None: | |
| """Parse request_id from JSON output; API may return integer or string.""" | |
| try: | |
| # Handle single line or multi-line JSON | |
| data = json.loads(out.strip()) | |
| rid = data.get("request_id") | |
| if rid is not None: | |
| return str(rid) | |
| except (json.JSONDecodeError, TypeError): | |
| pass | |
| match = re.search(r'"request_id"\s*:\s*("([^"]+)"|(\d+))', out) | |
| if match: | |
| return match.group(2) or match.group(3) | |
| return None | |
def create_bundle(model_path: Path, work_dir: Path) -> tuple[str | None, str | None]:
    """Run leap-bundle create --json. Returns (request_id, pending_id).

    On success: (request_id, None). On a 'pending request' error, the ID of
    the blocking request: (None, pending_id). Anything else: (None, None).
    """
    proc = subprocess.run(
        ["leap-bundle", "create", str(model_path), "--json"],
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
        cwd=work_dir,
        env=_leap_env(),
    )
    out = (proc.stdout or proc.stderr or "").strip()

    if proc.returncode != 0:
        print("Create failed:", out or f"exit code {proc.returncode}")
        blocker = _parse_pending_request_id(out)
        if blocker:
            return None, blocker
        lowered = out.lower()
        if "login" in lowered or "authenticat" in lowered:
            print("Run: leap-bundle login <api-key> (get key from https://leap.liquid.ai/profile#/api-keys)")
        return None, None

    # Success path: the API can return {"request_id": 1, "status": "success"}.
    rid = _parse_request_id(out)
    if rid:
        return rid, None
    if "exists" in out:  # also covers the "already exists" wording
        print("Bundle request already exists for this model (same hash). Check leap-bundle list.")
        return None, None
    print("Create output:", out)
    return None, None
| def _parse_pending_request_id(out: str) -> str | None: | |
| """Extract pending request ID from error message.""" | |
| match = re.search(r"pending request\s*\(ID:\s*(\d+)\)", out, re.IGNORECASE) | |
| return match.group(1) if match else None | |
def get_request_status(request_id: str) -> str:
    """Query `leap-bundle list <id>` and map its text output to a status.

    Returns one of: "completed", "failed", "processing", "unknown".
    """
    proc = subprocess.run(
        ["leap-bundle", "list", str(request_id)],
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
        env=_leap_env(),
    )
    text = (proc.stdout or proc.stderr or "").lower()
    # Order matters: terminal states are checked before in-flight ones.
    if "completed" in text:
        return "completed"
    if "failed" in text:
        return "failed"
    if any(token in text for token in ("processing", "upload", "pending")):
        return "processing"
    return "unknown"
def wait_for_bundle(request_id: str) -> bool:
    """Poll the request's status until it finishes. True only on 'completed'."""
    stop_at = time.monotonic() + POLL_MAX_MINUTES * 60
    while time.monotonic() < stop_at:
        state = get_request_status(request_id)
        print(f" Status: {state}")
        if state == "completed":
            return True
        if state == "failed":
            print("Bundle request failed. Run: leap-bundle list", request_id)
            return False
        time.sleep(POLL_INTERVAL_SEC)
    print("Timed out waiting for bundle.")
    return False
| # Per Liquid AI docs: create output is .gguf (default) or .bundle (--executorch) | |
| BUNDLE_EXTENSIONS = (".gguf", ".bundle") | |
| def _find_bundle_artifact(work_dir: Path) -> Path | None: | |
| """Return first .gguf or .bundle file under work_dir or cwd.""" | |
| for d in [work_dir, Path.cwd()]: | |
| for ext in BUNDLE_EXTENSIONS: | |
| for f in d.glob(f"*{ext}"): | |
| return f | |
| return None | |
def download_bundle(request_id: str, work_dir: Path) -> Path | None:
    """Run leap-bundle download <request_id>. Returns path to downloaded bundle artifact if found.

    Bug fix: run(capture=True) raises RuntimeError on any non-zero exit, so
    the previous ``r.returncode != 0`` fallback (including the 'signed_url'
    hint) was unreachable dead code, and a transient download failure crashed
    the caller's per-model loop. Catch the failure, surface the hint, and
    fall back to any artifact already present on disk.
    """
    try:
        run(["leap-bundle", "download", request_id], capture=True, cwd=work_dir)
    except RuntimeError as exc:
        artifact = _find_bundle_artifact(work_dir)
        if artifact is None and "signed_url" in str(exc):
            print(" (LEAP download failed: 'signed_url' – try later: python download_bundles.py --request-ids", request_id + ")", file=sys.stderr)
        return artifact
    return _find_bundle_artifact(work_dir)
def investigate(bundle_path: Path | None, model_path: Path) -> None:
    """Report sizes for source dir and bundle artifact (.gguf or .bundle); run inference only on .gguf.

    Args:
        bundle_path: Downloaded bundle artifact, or None when no bundle exists.
        model_path: Local Hugging Face snapshot directory of the source model.
    """
    print("\n--- Investigation ---")
    if model_path.exists():
        # Sum every regular file so the total covers shards, tokenizer, config, etc.
        total = sum(f.stat().st_size for f in model_path.rglob("*") if f.is_file())
        print(f" Source model dir: {model_path} total size: {total / (1024**2):.1f} MB")
    if bundle_path and bundle_path.exists():
        size_mb = bundle_path.stat().st_size / (1024**2)
        kind = "GGUF" if bundle_path.suffix == ".gguf" else "ExecuTorch (.bundle)"
        print(f" Bundle file: {bundle_path} size: {size_mb:.1f} MB [{kind}]")
        if bundle_path.suffix == ".gguf":
            # llama-cpp-python is optional; skip inference gracefully when absent.
            try:
                from llama_cpp import Llama
                print(" Running short inference (llama_cpp)...")
                llm = Llama(model_path=str(bundle_path), n_ctx=256, verbose=False)
                out = llm("Bonjour, dis-moi une phrase courte en français.\n", max_tokens=32, temperature=0.3)
                text = out["choices"][0]["text"].strip()
                # Truncate to keep the console report compact.
                print(f" Sample output: {text[:200]}")
            except ImportError:
                print(" (Install llama-cpp-python to run a sample inference on the GGUF)")
        else:
            # ExecuTorch bundles need the LEAP runtime; no local inference here.
            print(" (ExecuTorch .bundle; use LEAP SDK for inference)")
    else:
        print(" No bundle file (.gguf or .bundle) found to inspect.")
def main() -> int:
    """CLI entry point: download -> validate -> create bundle -> download -> investigate.

    Returns a process exit code: 0 when every selected model succeeded,
    1 when any model failed validation or raised during processing.
    """
    p = argparse.ArgumentParser(
        description="Bundle a Luth model with LEAP and investigate the result.",
        epilog="Requires: pip install leap-bundle huggingface_hub. Auth: leap-bundle login <api-key>",
    )
    p.add_argument(
        "--model",
        default=DEFAULT_REPO,
        choices=LUTH_REPOS,
        help="Luth model repo ID (default: %(default)s); ignored if --all",
    )
    p.add_argument(
        "--all",
        action="store_true",
        help="Bundle and inspect every Luth model in sequence (5 models; LEAP free tier = 5 requests/24h)",
    )
    p.add_argument(
        "--work-dir",
        type=Path,
        default=DEFAULT_WORK_DIR,
        help="Working directory for download and bundle output (default: %(default)s)",
    )
    p.add_argument(
        "--quantization",
        default=DEFAULT_QUANTIZATION,
        help="(Reserved; current leap-bundle create has no --quantization option)",
    )
    p.add_argument(
        "--dry-run",
        action="store_true",
        help="Only download and validate; do not create or download bundle",
    )
    p.add_argument(
        "--skip-create",
        action="store_true",
        help="Skip bundle create (use existing local model dir only); still run investigate",
    )
    p.add_argument(
        "--request-id",
        type=str,
        metavar="ID",
        help="If bundle already created, download by request ID and then investigate",
    )
    args = p.parse_args()
    args.work_dir = args.work_dir.resolve()
    args.work_dir.mkdir(parents=True, exist_ok=True)
    # Fail fast: every later step shells out to the leap-bundle CLI.
    if not has_leap_bundle():
        print("leap-bundle CLI not found. Install: pip install leap-bundle", file=sys.stderr)
        return 1
    models_to_run = LUTH_REPOS if args.all else [args.model]
    # --request-id targets a single existing request; it is meaningless with --all.
    if args.all and args.request_id:
        print("--request-id is ignored when using --all.", file=sys.stderr)
        args.request_id = None
    if args.all:
        print(f"Running for all {len(models_to_run)} Luth models: {', '.join(models_to_run)}")
        print("Note: LEAP free tier allows 5 bundle requests per 24h.\n")
    exit_code = 0
    for repo_id in models_to_run:
        print(f"\n{'='*60}\n {repo_id}\n{'='*60}")
        try:
            # 1. Download
            model_path = download_model(repo_id, args.work_dir)
            # 2. Validate
            print("Validating directory for LEAP bundle...")
            if not validate_bundle(model_path):
                print("Validation failed. Fix the model directory and retry.", file=sys.stderr)
                exit_code = 1
                continue
            print("Validation passed.")
            if args.dry_run:
                investigate(None, model_path)
                continue
            gguf_path: Path | None = None
            if args.request_id and not args.all:
                # Download existing bundle by ID (single-model only)
                print(f"Downloading bundle request {args.request_id}...")
                gguf_path = download_bundle(args.request_id, args.work_dir)
            elif not args.skip_create:
                # 3. Create bundle (LEAP allows only one pending request; wait for it if needed)
                request_id: str | None = None
                pending_id: str | None = None
                print("Creating bundle...")
                request_id, pending_id = create_bundle(model_path, args.work_dir)
                if pending_id:
                    # A previous request blocks the queue: wait for it, collect
                    # its artifact, then retry the create for the current model.
                    print(f"Waiting for previous bundle request {pending_id} to complete...")
                    if wait_for_bundle(pending_id):
                        download_bundle(pending_id, args.work_dir)
                    print("Retrying create for this model...")
                    request_id, pending_id = create_bundle(model_path, args.work_dir)
                    if pending_id:
                        print("Still pending; skipping create for this model.", file=sys.stderr)
                        request_id = None
                if request_id:
                    # 4. Wait and download
                    print(f"Waiting for bundle request {request_id} (poll every {POLL_INTERVAL_SEC}s)...")
                    if wait_for_bundle(request_id):
                        gguf_path = download_bundle(request_id, args.work_dir)
                elif not pending_id:
                    print("No new request created. Use --request-id <id> to download an existing bundle.")
            else:
                print("Skipping bundle create (--skip-create).")
            # 5. Investigate
            investigate(gguf_path, model_path)
        except Exception as e:
            # Keep iterating the remaining models; record the failure in the exit code.
            print(f"Error processing {repo_id}: {e}", file=sys.stderr)
            exit_code = 1
    return exit_code
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit code.
    raise SystemExit(main())