fr-on-device / bundle_luth.py
Joseph Pollack
adds real lfm and pleais numbers
935bdc8 unverified
#!/usr/bin/env python3
"""
Bundle a Luth model on this device with LEAP (leap-bundle) and investigate the result.
Per Liquid AI docs: leap-bundle create produces .gguf (default) or .bundle (--executorch).
We inspect both artifact types.
Steps:
1. Download the Luth model from Hugging Face to a local directory.
2. Validate the directory with leap-bundle validate.
3. Create a bundle with leap-bundle create (requires LEAP auth).
4. Poll until the bundle is completed, then download the output.
5. Investigate: report file sizes (.gguf / .bundle) and optionally run inference on .gguf.
Requires: pip install leap-bundle huggingface_hub
LEAP auth: leap-bundle login <api-key> (from https://leap.liquid.ai/profile#/api-keys)
"""
import argparse
import json
import os
import re
import subprocess
import sys
import time
from pathlib import Path
def _leap_env() -> dict[str, str]:
    """Build the environment passed to every leap-bundle subprocess.

    Returns a fresh dict holding the current process environment plus
    ``PYTHONUTF8=1``, so the child uses UTF-8 (avoids Windows cp1252 choking
    on the checkmark character). ``os.environ`` itself is not modified.
    """
    return {**os.environ, "PYTHONUTF8": "1"}
# Luth model repo IDs (LFM2-based are most likely LEAP-compatible)
# This list is both the set of allowed --model choices and the run order for --all.
LUTH_REPOS = [
    "kurakurai/Luth-LFM2-350M",
    "kurakurai/Luth-LFM2-700M",
    "kurakurai/Luth-LFM2-1.2B",
    "kurakurai/Luth-0.6B-Instruct",
    "kurakurai/Luth-1.7B-Instruct",
]
# Model used when --model is not given: the first entry of LUTH_REPOS.
DEFAULT_REPO = LUTH_REPOS[0]
# Default for --work-dir; main() resolves it to an absolute path and creates it.
DEFAULT_WORK_DIR = Path("./luth_bundle_work")
# Default for --quantization, which is parsed but not forwarded to leap-bundle (reserved).
DEFAULT_QUANTIZATION = "Q4_K_M"
# Seconds slept between two status polls in wait_for_bundle().
POLL_INTERVAL_SEC = 60
# Upper bound, in minutes, on how long wait_for_bundle() keeps polling.
POLL_MAX_MINUTES = 30
def run(cmd: list[str], capture: bool = True, cwd: Path | None = None) -> subprocess.CompletedProcess:
    """Execute *cmd* as a subprocess in the UTF-8 LEAP environment.

    Args:
        cmd: Program and arguments (no shell involved).
        capture: When true, stdout/stderr are captured and a non-zero exit
            raises; when false, output goes to the terminal and the exit code
            is left for the caller to inspect.
        cwd: Optional working directory for the child process.

    Returns:
        The ``subprocess.CompletedProcess`` of the finished command.

    Raises:
        RuntimeError: If ``capture`` is true and the command exits non-zero;
            the message carries the command line, stdout and stderr.
    """
    proc = subprocess.run(
        cmd,
        cwd=None if not cwd else str(cwd),
        env=_leap_env(),
        capture_output=bool(capture),
        text=True,
        encoding="utf-8",
        errors="replace",
    )
    if capture and proc.returncode != 0:
        raise RuntimeError(f"Command failed: {' '.join(cmd)}\nstdout: {proc.stdout}\nstderr: {proc.stderr}")
    return proc
def has_leap_bundle() -> bool:
    """Return True when ``leap-bundle --version`` can be run successfully.

    A missing executable (FileNotFoundError) or a non-zero exit (RuntimeError
    raised by ``run``) both count as "not available".
    """
    try:
        run(["leap-bundle", "--version"], capture=True)
    except (FileNotFoundError, RuntimeError):
        return False
    return True
def download_model(repo_id: str, work_dir: Path) -> Path:
    """Fetch a Hugging Face model snapshot into ``work_dir/models/<repo_slug>``.

    The slug is the repo ID with every ``/`` replaced by ``--``.

    Args:
        repo_id: Hugging Face repository ID, e.g. ``owner/name``.
        work_dir: Base working directory; the target directory is created
            underneath it if needed.

    Returns:
        Path of the local model directory.

    Raises:
        SystemExit: If ``huggingface_hub`` is not installed.
    """
    try:
        from huggingface_hub import snapshot_download
    except ImportError:
        raise SystemExit("Install huggingface_hub: pip install huggingface_hub")

    target = work_dir / "models" / "--".join(repo_id.split("/"))
    target.mkdir(parents=True, exist_ok=True)
    print(f"Downloading {repo_id} to {target} ...")
    snapshot_download(repo_id=repo_id, local_dir=str(target))
    return target
def validate_bundle(model_path: Path) -> bool:
    """Run ``leap-bundle validate`` on a model directory.

    Args:
        model_path: Local directory holding the downloaded model.

    Returns:
        True if the validator exits with status 0, False if it exits non-zero.
        In the failing case the validator's output is printed to stderr.

    Raises:
        FileNotFoundError: If the ``leap-bundle`` executable cannot be found.
    """
    try:
        run(["leap-bundle", "validate", str(model_path)], capture=True)
    except RuntimeError as exc:
        # run() raises on a non-zero exit when capturing, so without this
        # handler the function could never return False and the caller's
        # "validation failed" branch would be unreachable.
        print(exc, file=sys.stderr)
        return False
    return True
def _parse_request_id(out: str) -> str | None:
"""Parse request_id from JSON output; API may return integer or string."""
try:
# Handle single line or multi-line JSON
data = json.loads(out.strip())
rid = data.get("request_id")
if rid is not None:
return str(rid)
except (json.JSONDecodeError, TypeError):
pass
match = re.search(r'"request_id"\s*:\s*("([^"]+)"|(\d+))', out)
if match:
return match.group(2) or match.group(3)
return None
def create_bundle(model_path: Path, work_dir: Path) -> tuple[str | None, str | None]:
    """Submit a bundle request with ``leap-bundle create <model_path> --json``.

    Args:
        model_path: Local model directory to bundle.
        work_dir: Directory the command is run from.

    Returns:
        A ``(request_id, pending_id)`` pair:
        ``(request_id, None)`` when a new request was created;
        ``(None, pending_id)`` when the command failed because another
        request is still pending;
        ``(None, None)`` in every other case (a diagnostic is printed).
    """
    proc = subprocess.run(
        ["leap-bundle", "create", str(model_path), "--json"],
        cwd=work_dir,
        env=_leap_env(),
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
    )
    # stdout wins when non-empty; stderr is only consulted as a fallback.
    output = (proc.stdout or proc.stderr or "").strip()

    if proc.returncode != 0:
        print("Create failed:", output or f"exit code {proc.returncode}")
        blocking_id = _parse_pending_request_id(output)
        if blocking_id:
            return None, blocking_id
        lowered = output.lower()
        if "login" in lowered or "authenticat" in lowered:
            print("Run: leap-bundle login <api-key> (get key from https://leap.liquid.ai/profile#/api-keys)")
        return None, None

    # Parse request_id (API can return {"request_id": 1, "status": "success"})
    new_id = _parse_request_id(output)
    if new_id:
        return new_id, None

    # A match on "exists" also covers the longer phrase "already exists".
    if "exists" in output:
        print("Bundle request already exists for this model (same hash). Check leap-bundle list.")
    else:
        print("Create output:", output)
    return None, None
def _parse_pending_request_id(out: str) -> str | None:
"""Extract pending request ID from error message."""
match = re.search(r"pending request\s*\(ID:\s*(\d+)\)", out, re.IGNORECASE)
return match.group(1) if match else None
def get_request_status(request_id: str) -> str:
    """Classify a bundle request from the text of ``leap-bundle list <id>``.

    The command output (stdout, else stderr) is lower-cased and searched for
    keywords; the first category with a hit wins, in this order:
    ``"completed"``, ``"failed"``, ``"processing"``. Anything else yields
    ``"unknown"``. The command's exit code is not inspected.
    """
    proc = subprocess.run(
        ["leap-bundle", "list", str(request_id)],
        env=_leap_env(),
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
    )
    text = (proc.stdout or proc.stderr or "").lower()

    # Ordered: earlier categories take precedence over later ones.
    keyword_table = (
        ("completed", ("completed",)),
        ("failed", ("failed",)),
        ("processing", ("processing", "upload", "pending")),
    )
    for status, keywords in keyword_table:
        if any(word in text for word in keywords):
            return status
    return "unknown"
def wait_for_bundle(request_id: str) -> bool:
    """Poll a bundle request until it finishes or the time budget runs out.

    The status is queried every ``POLL_INTERVAL_SEC`` seconds for at most
    ``POLL_MAX_MINUTES`` minutes; each observed status is printed.

    Returns:
        True once the request reports ``completed``; False if it reports
        ``failed`` or the deadline passes first.
    """
    give_up_at = time.monotonic() + POLL_MAX_MINUTES * 60
    while True:
        if time.monotonic() >= give_up_at:
            print("Timed out waiting for bundle.")
            return False

        state = get_request_status(request_id)
        print(f" Status: {state}")
        if state == "failed":
            print("Bundle request failed. Run: leap-bundle list", request_id)
            return False
        if state == "completed":
            return True

        # "processing" and "unknown" both mean: wait and ask again.
        time.sleep(POLL_INTERVAL_SEC)
# Per Liquid AI docs: create output is .gguf (default) or .bundle (--executorch)
BUNDLE_EXTENSIONS = (".gguf", ".bundle")
def _find_bundle_artifact(work_dir: Path) -> Path | None:
"""Return first .gguf or .bundle file under work_dir or cwd."""
for d in [work_dir, Path.cwd()]:
for ext in BUNDLE_EXTENSIONS:
for f in d.glob(f"*{ext}"):
return f
return None
def download_bundle(request_id: str, work_dir: Path) -> Path | None:
    """Run ``leap-bundle download <request_id>`` inside work_dir.

    Args:
        request_id: ID of a bundle request.
        work_dir: Directory the command is run from and where the artifact is
            then looked up.

    Returns:
        Path of the bundle artifact located by ``_find_bundle_artifact`` after
        a successful download, or None if no artifact file is found.

    Raises:
        RuntimeError: If the download command exits non-zero. When the
            failure output mentions ``signed_url``, a retry hint is printed to
            stderr before the error is re-raised.
    """
    try:
        run(["leap-bundle", "download", request_id], capture=True, cwd=work_dir)
    except RuntimeError as exc:
        # run() raises on a non-zero exit when capturing, so a returncode
        # check after the call can never fire; the failure text (stdout and
        # stderr are embedded in the message) has to be inspected here.
        if "signed_url" in str(exc):
            print(
                " (LEAP download failed: 'signed_url' – try later: python download_bundles.py --request-ids",
                request_id + ")",
                file=sys.stderr,
            )
        raise
    return _find_bundle_artifact(work_dir)
def investigate(bundle_path: Path | None, model_path: Path) -> None:
"""Report sizes for source dir and bundle artifact (.gguf or .bundle); run inference only on .gguf."""
print("\n--- Investigation ---")
if model_path.exists():
total = sum(f.stat().st_size for f in model_path.rglob("*") if f.is_file())
print(f" Source model dir: {model_path} total size: {total / (1024**2):.1f} MB")
if bundle_path and bundle_path.exists():
size_mb = bundle_path.stat().st_size / (1024**2)
kind = "GGUF" if bundle_path.suffix == ".gguf" else "ExecuTorch (.bundle)"
print(f" Bundle file: {bundle_path} size: {size_mb:.1f} MB [{kind}]")
if bundle_path.suffix == ".gguf":
try:
from llama_cpp import Llama
print(" Running short inference (llama_cpp)...")
llm = Llama(model_path=str(bundle_path), n_ctx=256, verbose=False)
out = llm("Bonjour, dis-moi une phrase courte en français.\n", max_tokens=32, temperature=0.3)
text = out["choices"][0]["text"].strip()
print(f" Sample output: {text[:200]}")
except ImportError:
print(" (Install llama-cpp-python to run a sample inference on the GGUF)")
else:
print(" (ExecuTorch .bundle; use LEAP SDK for inference)")
else:
print(" No bundle file (.gguf or .bundle) found to inspect.")
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for this script."""
    parser = argparse.ArgumentParser(
        description="Bundle a Luth model with LEAP and investigate the result.",
        epilog="Requires: pip install leap-bundle huggingface_hub. Auth: leap-bundle login <api-key>",
    )
    parser.add_argument(
        "--model",
        default=DEFAULT_REPO,
        choices=LUTH_REPOS,
        help="Luth model repo ID (default: %(default)s); ignored if --all",
    )
    parser.add_argument(
        "--all",
        action="store_true",
        help="Bundle and inspect every Luth model in sequence (5 models; LEAP free tier = 5 requests/24h)",
    )
    parser.add_argument(
        "--work-dir",
        type=Path,
        default=DEFAULT_WORK_DIR,
        help="Working directory for download and bundle output (default: %(default)s)",
    )
    parser.add_argument(
        "--quantization",
        default=DEFAULT_QUANTIZATION,
        help="(Reserved; current leap-bundle create has no --quantization option)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Only download and validate; do not create or download bundle",
    )
    parser.add_argument(
        "--skip-create",
        action="store_true",
        help="Skip bundle create (use existing local model dir only); still run investigate",
    )
    parser.add_argument(
        "--request-id",
        type=str,
        metavar="ID",
        help="If bundle already created, download by request ID and then investigate",
    )
    return parser


def _create_and_download(model_path: Path, work_dir: Path) -> Path | None:
    """Create a bundle request, wait for it, and download the result (steps 3-4).

    LEAP allows only one pending request: if one is already pending, wait for
    it, download it, and retry the create once. Returns the downloaded
    artifact path, or None when nothing was downloaded for this model.
    """
    print("Creating bundle...")
    request_id, pending_id = create_bundle(model_path, work_dir)

    if pending_id:
        print(f"Waiting for previous bundle request {pending_id} to complete...")
        if wait_for_bundle(pending_id):
            download_bundle(pending_id, work_dir)
            print("Retrying create for this model...")
            request_id, pending_id = create_bundle(model_path, work_dir)
        if pending_id:
            print("Still pending; skipping create for this model.", file=sys.stderr)
            request_id = None

    if request_id:
        print(f"Waiting for bundle request {request_id} (poll every {POLL_INTERVAL_SEC}s)...")
        if wait_for_bundle(request_id):
            return download_bundle(request_id, work_dir)
        return None

    if not pending_id:
        print("No new request created. Use --request-id <id> to download an existing bundle.")
    return None


def _process_model(repo_id: str, args: argparse.Namespace) -> bool:
    """Run download, validate, bundle and investigate for one model.

    Returns False only when validation fails; other failures surface as
    exceptions for the caller to handle.
    """
    # 1. Download
    model_path = download_model(repo_id, args.work_dir)

    # 2. Validate
    print("Validating directory for LEAP bundle...")
    if not validate_bundle(model_path):
        print("Validation failed. Fix the model directory and retry.", file=sys.stderr)
        return False
    print("Validation passed.")

    if args.dry_run:
        investigate(None, model_path)
        return True

    artifact: Path | None = None
    if args.request_id and not args.all:
        # Download existing bundle by ID (single-model only)
        print(f"Downloading bundle request {args.request_id}...")
        artifact = download_bundle(args.request_id, args.work_dir)
    elif args.skip_create:
        print("Skipping bundle create (--skip-create).")
    else:
        artifact = _create_and_download(model_path, args.work_dir)

    # 5. Investigate
    investigate(artifact, model_path)
    return True


def main() -> int:
    """Command-line entry point.

    Parses arguments, checks that the leap-bundle CLI is available, then
    processes either the selected model or (with ``--all``) every model in
    ``LUTH_REPOS``. An error in one model is reported and does not stop the
    remaining ones.

    Returns:
        0 if every model was processed without a validation failure or
        exception, 1 otherwise (including when leap-bundle is missing).
    """
    args = _build_arg_parser().parse_args()
    args.work_dir = args.work_dir.resolve()
    args.work_dir.mkdir(parents=True, exist_ok=True)

    if not has_leap_bundle():
        print("leap-bundle CLI not found. Install: pip install leap-bundle", file=sys.stderr)
        return 1

    if args.all:
        repos = LUTH_REPOS
        if args.request_id:
            print("--request-id is ignored when using --all.", file=sys.stderr)
            args.request_id = None
        print(f"Running for all {len(repos)} Luth models: {', '.join(repos)}")
        print("Note: LEAP free tier allows 5 bundle requests per 24h.\n")
    else:
        repos = [args.model]

    any_failed = False
    rule = "=" * 60
    for repo_id in repos:
        print(f"\n{rule}\n {repo_id}\n{rule}")
        try:
            if not _process_model(repo_id, args):
                any_failed = True
        except Exception as exc:
            # Top-level boundary: report and move on to the next model.
            print(f"Error processing {repo_id}: {exc}", file=sys.stderr)
            any_failed = True
    return 1 if any_failed else 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())