Pozify / scripts /exercise_router_modal.py
tthhanh's picture
docs: reorganize documentation structure and workflow
2a5c0c7
Raw
History Blame Contribute Delete
28.4 kB
from __future__ import annotations
import json
import os
from pathlib import Path
import shutil
import sys
from typing import Any
import zipfile
import modal
# --- Modal app / Hugging Face identifiers ------------------------------------
APP_NAME = "pozify-exercise-router"
DATASET_ID = "RickyRiccio/Real_Time_Exercise_Recognition_Dataset"
DEFAULT_HF_REPO_ID = "build-small-hackathon/pozify-exercise-router"
# Environment variables that override the publish target and its visibility.
HF_REPO_ID_ENV = "POZIFY_ROUTER_HF_REPO_ID"
HF_PRIVATE_ENV = "POZIFY_ROUTER_HF_PRIVATE"

# --- Container mount points --------------------------------------------------
DATA_ROOT = Path("/data")
MODEL_ROOT = Path("/models")
DOCS_ROOT = Path("/root/docs")

# --- Locations derived from DATA_ROOT ----------------------------------------
RAW_ROOT = DATA_ROOT.joinpath("raw")
RICCIO_ROOT = RAW_ROOT.joinpath("riccio")
CUSTOM_UNKNOWN_ROOT = RAW_ROOT.joinpath("custom_unknown")
MANIFEST_PATH = DATA_ROOT.joinpath("manifests", "router_examples.jsonl")
FEATURE_MANIFEST_PATH = DATA_ROOT.joinpath("features", "feature_manifest.jsonl")

# --- File-type filters (matched against lower-cased path suffixes) -----------
VIDEO_SUFFIXES = {".avi", ".m4v", ".mov", ".mp4", ".mpeg", ".mpg", ".webm"}
ARCHIVE_SUFFIXES = {".zip"}

# Collection folders inside the Riccio dataset that hold per-class video dirs.
RICCIO_VIDEO_COLLECTIONS = (
    "final_kaggle_with_additional_video",
    "my_test_video_1",
)

# Files copied from MODEL_ROOT into the Hugging Face model repository.
HF_ARTIFACT_FILENAMES = (
    "baseline.joblib",
    "router.joblib",
    "router_selection.json",
    "temporal.pt",
    "evaluation.json",
    "baseline_metrics.json",
    "temporal_metrics.json",
)
# Container image shared by every function of the Modal app defined below:
# Debian slim with Python 3.10, system GL/EGL/GLib libraries (presumably for
# MediaPipe / OpenCV — TODO confirm), pinned Python dependencies, the local
# `src` tree (functions import it by inserting "/root/src" into sys.path) and
# two documentation files copied under DOCS_ROOT.
image = (
    modal.Image.debian_slim(python_version="3.10")
    .apt_install("libegl1", "libgl1", "libgles2", "libglib2.0-0")
    .pip_install(
        "huggingface-hub>=0.24.0",
        "joblib==1.5.3",
        "mediapipe>=0.10.35",
        "numpy==1.26.4",
        "opencv-python-headless>=4.10.0",
        "scikit-learn==1.7.2",
        "scipy==1.15.3",
        "torch==2.11.0",
    )
    # Local source paths are relative to where `modal run` is invoked
    # (presumably the repository root — TODO confirm).
    .add_local_dir("src", "/root/src", copy=True)
    .add_local_file(
        "docs/22-router-huggingface-model-card.md",
        "/root/docs/22-router-huggingface-model-card.md",
        copy=True,
    )
    .add_local_file(
        "docs/20-router-training-report.md",
        "/root/docs/20-router-training-report.md",
        copy=True,
    )
)
app = modal.App(APP_NAME, image=image)
# Persistent storage, created on first use. The data volume is mounted at
# DATA_ROOT (raw videos, manifests, feature files); the model volume is mounted
# at MODEL_ROOT (trained artifacts, metrics, selection/upload reports).
data_volume = modal.Volume.from_name(
    "pozify-router-data", create_if_missing=True, version=2
)
model_volume = modal.Volume.from_name(
    "pozify-router-models", create_if_missing=True, version=2
)
def _write_json(path: Path, payload: Any) -> None:
    """Write *payload* to *path* as indented, non-ASCII-preserving UTF-8 JSON.

    Parent directories are created first; an existing file is overwritten.
    """
    destination_dir = path.parent
    destination_dir.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(payload, ensure_ascii=False, indent=2)
    path.write_text(serialized, encoding="utf-8")
def _write_jsonl(path: Path, rows: list[dict[str, Any]]) -> None:
    """Write *rows* to *path* as JSON Lines: one compact object per line.

    The output always ends with a newline, so an empty *rows* list yields a
    file containing just "\\n". Parent directories are created as needed.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    body = "\n".join(json.dumps(record, ensure_ascii=False) for record in rows)
    path.write_text(f"{body}\n", encoding="utf-8")
def _read_jsonl(path: Path) -> list[dict[str, Any]]:
    """Parse a JSON Lines file into a list of objects.

    Blank or whitespace-only lines are ignored; a missing file yields [].
    """
    records: list[dict[str, Any]] = []
    if not path.exists():
        return records
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        if not raw_line.strip():
            continue
        records.append(json.loads(raw_line))
    return records
def _env_truthy(value: str | None) -> bool:
return value is not None and value.strip().lower() in {"1", "true", "yes", "on"}
def _video_files(root: Path) -> list[Path]:
    """Return every video file below *root* (recursively), sorted.

    A path counts as a video when its lower-cased suffix is in VIDEO_SUFFIXES
    *and* it is a regular file. The file check matters: `_extract_archives`
    unpacks ``clip.mp4.zip`` into a directory named ``clip.mp4`` (via
    ``with_suffix("")``). Without the check, that directory would be handed
    to the pose pipeline as if it were a video.

    Returns [] when *root* does not exist.
    """
    if not root.exists():
        return []
    return sorted(
        path
        for path in root.rglob("*")
        if path.suffix.lower() in VIDEO_SUFFIXES and path.is_file()
    )
def _riccio_video_files(root: Path) -> list[Path]:
    """Collect videos from the known Riccio collection folders under *root*.

    Each name in RICCIO_VIDEO_COLLECTIONS is looked up under three candidate
    bases: the "Real-Time Exercise Recognition Dataset" folder, that folder
    nested twice, and *root* itself. Within a collection, only its immediate
    sub-directories (presumably one per exercise class) are scanned.

    Returns the sorted video paths. If no collection produced anything, it
    returns every video under *root* instead.
    """
    nested_name = "Real-Time Exercise Recognition Dataset"
    candidate_bases = (
        root / nested_name,
        root / nested_name / nested_name,
        root,
    )
    collected: list[Path] = []
    for collection in RICCIO_VIDEO_COLLECTIONS:
        for base in candidate_bases:
            collection_dir = base / collection
            if not collection_dir.exists():
                continue
            class_dirs = sorted(
                child for child in collection_dir.iterdir() if child.is_dir()
            )
            for class_dir in class_dirs:
                collected.extend(_video_files(class_dir))
    return sorted(collected) if collected else _video_files(root)
def _extract_archives(root: Path) -> list[str]:
    """Unpack every archive file under *root* into a sibling directory, once.

    ``foo.zip`` is extracted into ``foo/``, and a ``.pozify_extracted``
    marker is written there so later runs skip it. A target directory that
    already has content but no marker is treated as pre-extracted: it is
    marked, not re-extracted. If extraction fails, the partially written
    target directory is removed before the error propagates, so the next
    run retries instead of mistaking the partial output for a finished
    extraction.

    Returns:
        String paths of the archives actually extracted during this call.

    Raises:
        zipfile.BadZipFile (or any other extraction error) for a corrupt
        archive.
    """
    extracted: list[str] = []
    for archive_path in sorted(root.rglob("*")):
        suffix = archive_path.suffix.lower()
        # Directories can carry an archive-like suffix too; handing one to
        # ZipFile would raise, so only regular files qualify.
        if suffix not in ARCHIVE_SUFFIXES or not archive_path.is_file():
            continue
        if suffix != ".zip":
            # Listed in ARCHIVE_SUFFIXES but no extractor is implemented here:
            # leave it alone rather than marking it as extracted.
            continue
        target_dir = archive_path.with_suffix("")
        marker = target_dir / ".pozify_extracted"
        if marker.exists():
            continue
        target_dir.mkdir(parents=True, exist_ok=True)
        if any(target_dir.iterdir()):
            # Content already present (presumably shipped pre-extracted).
            marker.write_text("ok\n", encoding="utf-8")
            continue
        try:
            with zipfile.ZipFile(archive_path) as archive:
                archive.extractall(target_dir)
        except Exception:
            # target_dir was empty before extracting, so this only discards
            # partial output.
            shutil.rmtree(target_dir, ignore_errors=True)
            raise
        marker.write_text("ok\n", encoding="utf-8")
        extracted.append(str(archive_path))
    return extracted
def _label_from_path(path: Path) -> str:
    """Infer a router label for a Riccio video from its path components.

    Components are examined from the file name upwards. The first one that
    `normalize_router_label` maps to something other than "unknown" wins.
    A component that maps to "unknown" but contains one of the stop tokens
    below ends the search early with "unknown", without consulting parent
    directories. If nothing matches, "unknown" is returned.
    """
    if "/root/src" not in sys.path:
        # Called once per video during ingest; don't grow sys.path each time.
        sys.path.insert(0, "/root/src")
    from pozify.ml.exercise_router_features import normalize_router_label

    stop_tokens = ("unknown", "curl", "idle", "walk", "stretch")
    for part in reversed(path.parts):
        label = normalize_router_label(part)
        if label != "unknown":
            return label
        lowered = part.lower()
        if any(token in lowered for token in stop_tokens):
            break
    return "unknown"
def _example_id(path: Path) -> str:
    """Derive a filesystem-safe example id from *path* relative to RAW_ROOT.

    Path components are joined with "_". Every character that is not
    alphanumeric, "_", "-" or "." becomes "_". Raises ValueError
    (from ``Path.relative_to``) when *path* is not under RAW_ROOT. Note that
    distinct paths can map to the same id after sanitising.
    """
    joined = "_".join(path.relative_to(RAW_ROOT).parts)
    allowed_punctuation = {"_", "-", "."}
    cleaned = [
        ch if (ch.isalnum() or ch in allowed_punctuation) else "_" for ch in joined
    ]
    return "".join(cleaned)
@app.function(
    volumes={str(DATA_ROOT): data_volume},
    timeout=60 * 60,
    secrets=[modal.Secret.from_name("huggingface-secret")],
)
def ingest() -> dict[str, Any]:
    """Download the Riccio dataset and write the labelled example manifest.

    Steps:
    1. Snapshot DATASET_ID into RICCIO_ROOT and unpack any archives.
    2. List every Riccio video plus any clips under CUSTOM_UNKNOWN_ROOT.
    3. Record one example per video: id, absolute path, label and source.
       Riccio clips get a path-derived label; custom clips are always
       "unknown".
    4. Write MANIFEST_PATH and ``manifests/ingest_summary.json`` to the data
       volume.

    No GPU is requested: this stage only downloads, unzips and lists files.

    Returns:
        The example count and the manifest path.
    """
    from huggingface_hub import snapshot_download

    RICCIO_ROOT.mkdir(parents=True, exist_ok=True)
    # `local_dir` downloads are materialised in place. The former
    # `local_dir_use_symlinks` flag is deprecated (and ignored) in the
    # huggingface-hub versions this image installs (>=0.24), so it is omitted.
    snapshot_download(
        repo_id=DATASET_ID,
        repo_type="dataset",
        local_dir=RICCIO_ROOT,
    )
    extracted_archives = _extract_archives(RICCIO_ROOT)

    examples: list[dict[str, Any]] = []
    for video_path in [
        *_riccio_video_files(RICCIO_ROOT),
        *_video_files(CUSTOM_UNKNOWN_ROOT),
    ]:
        is_custom = CUSTOM_UNKNOWN_ROOT in video_path.parents
        examples.append(
            {
                "id": _example_id(video_path),
                "video_path": str(video_path),
                "label": "unknown" if is_custom else _label_from_path(video_path),
                "source": "custom_unknown" if is_custom else "riccio",
            }
        )

    _write_jsonl(MANIFEST_PATH, examples)
    _write_json(
        DATA_ROOT / "manifests" / "ingest_summary.json",
        {
            "dataset_id": DATASET_ID,
            "example_count": len(examples),
            "extracted_archives": extracted_archives,
            "labels": sorted({example["label"] for example in examples}),
            "custom_unknown_root": str(CUSTOM_UNKNOWN_ROOT),
        },
    )
    data_volume.commit()
    return {"example_count": len(examples), "manifest_path": str(MANIFEST_PATH)}
@app.function(volumes={str(DATA_ROOT): data_volume})
def load_feature_examples(limit: int | None = None) -> list[dict[str, Any]]:
    """Return the ingest manifest rows, optionally only the first *limit*.

    A falsy *limit* (None or 0) returns every row.
    """
    rows = _read_jsonl(MANIFEST_PATH)
    if not limit:
        return rows
    return rows[:limit]
@app.function(volumes={str(DATA_ROOT): data_volume}, timeout=30 * 60)
def extract_example_features(example: dict[str, Any]) -> dict[str, Any]:
    """Turn one manifest example into saved router windows.

    The video goes through video QC, pose landmarking and pose cleaning, and
    the cleaned sequence is cut into router windows. These are stored in
    ``features/<id>.npz`` on the data volume together with:
    - the label,
    - both schema identifiers,
    - the vector/tensor feature names.
    `_load_feature_arrays` uses the schema identifiers to reject stale files.
    When no window is produced, empty float32 arrays are stored so the file
    layout stays the same.

    Returns:
        A feature-manifest row for this example.
    """
    sys.path.insert(0, "/root/src")
    import numpy as np
    from pozify.ml.exercise_router_features import (
        FEATURE_SCHEMA,
        ROUTER_LANDMARK_SCHEMA,
        extract_router_windows,
        window_tensor_feature_names,
        window_vector_feature_names,
    )
    from pozify.steps import pose_cleaning, pose_landmarker, video_qc

    video_path = Path(example["video_path"])
    qc_manifest = video_qc.run(str(video_path))
    landmarks = pose_landmarker.run(qc_manifest, mock=False)
    sequence = pose_cleaning.run(landmarks)
    windows = extract_router_windows(sequence)

    feature_path = DATA_ROOT / "features" / f"{example['id']}.npz"
    feature_path.parent.mkdir(parents=True, exist_ok=True)
    if windows:
        stacked_vectors = np.stack([window.vector for window in windows])
        stacked_tensors = np.stack([window.tensor for window in windows])
    else:
        stacked_vectors = np.empty((0, 0), dtype=np.float32)
        stacked_tensors = np.empty((0, 0, 0), dtype=np.float32)
    np.savez_compressed(
        feature_path,
        vectors=stacked_vectors,
        tensors=stacked_tensors,
        label=example["label"],
        feature_schema=FEATURE_SCHEMA,
        landmark_schema=ROUTER_LANDMARK_SCHEMA,
        vector_feature_names=np.asarray(window_vector_feature_names()),
        tensor_feature_names=np.asarray(window_tensor_feature_names()),
    )
    data_volume.commit()

    return {
        "id": example["id"],
        "label": example["label"],
        "video_path": str(video_path),
        "feature_path": str(feature_path),
        "window_count": len(windows),
        "pose_valid_ratio": sequence.pose_valid_ratio,
        "feature_schema": FEATURE_SCHEMA,
        "landmark_schema": ROUTER_LANDMARK_SCHEMA,
    }
@app.function(volumes={str(DATA_ROOT): data_volume})
def write_feature_manifest(rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Persist per-example extraction rows plus an aggregate summary.

    Writes FEATURE_MANIFEST_PATH (one row per example) and
    ``features/feature_summary.json`` (example/window/failure counts and the
    schema identifiers), then commits the data volume.

    Returns:
        The manifest path and the number of rows written.
    """
    _write_jsonl(FEATURE_MANIFEST_PATH, rows)
    # Failed rows (ok=False) carry no schema fields, so read the schemas from
    # the first successful row instead of from rows[0]. Otherwise the summary
    # reports null schemas whenever the first example happened to fail.
    successful = [row for row in rows if row.get("ok", True)]
    reference = successful[0] if successful else {}
    _write_json(
        DATA_ROOT / "features" / "feature_summary.json",
        {
            "example_count": len(rows),
            "window_count": sum(int(row.get("window_count", 0)) for row in rows),
            "failed_count": len(rows) - len(successful),
            "feature_schema": reference.get("feature_schema"),
            "landmark_schema": reference.get("landmark_schema"),
        },
    )
    data_volume.commit()
    return {
        "feature_manifest_path": str(FEATURE_MANIFEST_PATH),
        "example_count": len(rows),
    }
def _load_feature_arrays() -> tuple[Any, Any, Any]:
    """Load every usable feature window listed in FEATURE_MANIFEST_PATH.

    Rows are skipped when they failed, produced no windows, or were written
    with a FEATURE_SCHEMA / ROUTER_LANDMARK_SCHEMA different from the current
    one.

    Returns:
        ``(vectors, tensors, labels)``:
        - vectors and tensors concatenate the per-file window arrays along
          axis 0;
        - labels holds one label string per window.
        Returns ``(None, None, [])`` when no window is usable.
    """
    import numpy as np

    sys.path.insert(0, "/root/src")
    from pozify.ml.exercise_router_features import (
        FEATURE_SCHEMA,
        ROUTER_LANDMARK_SCHEMA,
    )

    vector_chunks: list[np.ndarray] = []
    tensor_chunks: list[np.ndarray] = []
    labels: list[str] = []
    for row in _read_jsonl(FEATURE_MANIFEST_PATH):
        if not row.get("ok", True) or int(row.get("window_count", 0)) <= 0:
            continue
        # np.load returns an NpzFile that holds the archive open; `with`
        # closes it deterministically. Arrays read inside stay valid after.
        with np.load(row["feature_path"], allow_pickle=False) as data:
            feature_schema = (
                str(data["feature_schema"]) if "feature_schema" in data else ""
            )
            landmark_schema = (
                str(data["landmark_schema"]) if "landmark_schema" in data else ""
            )
            if (
                feature_schema != FEATURE_SCHEMA
                or landmark_schema != ROUTER_LANDMARK_SCHEMA
            ):
                continue
            row_vectors = data["vectors"]
            row_tensors = data["tensors"]
            label = str(data["label"])
        window_count = int(row_vectors.shape[0])
        if window_count == 0:
            continue
        vector_chunks.append(row_vectors)
        tensor_chunks.append(row_tensors)
        labels.extend([label] * window_count)
    if not vector_chunks:
        return None, None, []
    # Concatenating whole per-file arrays equals stacking every window row
    # individually, without a Python-level loop over all windows.
    return np.concatenate(vector_chunks), np.concatenate(tensor_chunks), labels
@app.function(
    volumes={str(DATA_ROOT): data_volume, str(MODEL_ROOT): model_volume},
    timeout=30 * 60,
)
def train_baseline() -> dict[str, Any]:
    """Fit the scaler + gradient-boosting baseline on window feature vectors.

    Uses an 80/20 split with random_state=42, stratified only when every
    label has at least two windows. Writes ``baseline.joblib`` (model plus
    metadata) and ``baseline_metrics.json`` to the model volume and returns
    the metrics. When fewer than two labels have windows, only an error
    payload is written and returned.

    NOTE(review): the split is per window, not per video, so windows from the
    same clip can fall on both sides and flatter the reported accuracy.
    """
    import joblib
    from sklearn.ensemble import HistGradientBoostingClassifier
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import train_test_split
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import StandardScaler

    sys.path.insert(0, "/root/src")
    from pozify.ml.exercise_router_features import (
        FEATURE_SCHEMA,
        ROUTER_LANDMARK_SCHEMA,
        window_vector_feature_names,
    )

    metrics_path = MODEL_ROOT / "baseline_metrics.json"
    vectors, _unused_tensors, labels = _load_feature_arrays()
    distinct_labels = sorted(set(labels))
    if vectors is None or len(distinct_labels) < 2:
        failure = {
            "ok": False,
            "error": "At least two labels with extracted windows are required",
        }
        _write_json(metrics_path, failure)
        model_volume.commit()
        return failure

    label_counts = {name: labels.count(name) for name in distinct_labels}
    can_stratify = min(label_counts.values()) >= 2
    train_x, valid_x, train_y, valid_y = train_test_split(
        vectors,
        labels,
        test_size=0.2,
        random_state=42,
        stratify=labels if can_stratify else None,
    )

    booster = HistGradientBoostingClassifier(
        max_iter=200,
        learning_rate=0.08,
        random_state=42,
    )
    model = Pipeline(steps=[("scaler", StandardScaler()), ("classifier", booster)])
    model.fit(train_x, train_y)
    validation_accuracy = float(accuracy_score(valid_y, model.predict(valid_x)))

    metrics = {
        "ok": True,
        "model_kind": "baseline",
        "accuracy": round(validation_accuracy, 4),
        "label_counts": label_counts,
        "window_count": int(vectors.shape[0]),
    }
    artifact = {
        "model": model,
        "labels": [str(name) for name in model.classes_],
        "model_kind": "baseline",
        "feature_names": window_vector_feature_names(),
        "feature_schema": FEATURE_SCHEMA,
        "landmark_schema": ROUTER_LANDMARK_SCHEMA,
        "input_size": int(vectors.shape[-1]),
        "metrics": metrics,
    }
    joblib.dump(artifact, MODEL_ROOT / "baseline.joblib")
    _write_json(metrics_path, metrics)
    model_volume.commit()
    return metrics
@app.function(
    gpu="A10",
    volumes={str(DATA_ROOT): data_volume, str(MODEL_ROOT): model_volume},
    timeout=60 * 60,
)
def train_temporal(epochs: int = 73) -> dict[str, Any]:
    """Train the temporal router on per-window landmark tensors.

    Architecture and hyper-parameters come from
    ``pozify.ml.exercise_router_temporal``; the config below is built with
    ``bidirectional=True``. Class indices are positions in ``ROUTER_LABELS``,
    and a label not in that tuple is trained as "unknown". An 80/20 split
    with random_state=42 provides the validation set. It is stratified only
    when every label has at least two windows.

    Writes ``temporal.pt`` (state dict plus hyper-parameters) and
    ``temporal_metrics.json`` to the model volume and returns the metrics.
    When fewer than two labels have windows, only an error payload is written.

    Args:
        epochs: Number of full passes over the training split.

    NOTE(review): validation compares predictions with the *original* label
    strings (``valid_labels``). Training, however, used the remapped indices,
    so a label outside ROUTER_LABELS is learned as "unknown" but scored
    against its raw name. The split is also per window, so windows from one
    clip can appear in both train and validation.
    """
    import torch
    from torch import nn
    from torch.utils.data import DataLoader, TensorDataset
    from sklearn.model_selection import train_test_split
    sys.path.insert(0, "/root/src")
    from pozify.ml.exercise_router_features import (
        FEATURE_SCHEMA,
        ROUTER_LABELS,
        ROUTER_LANDMARK_SCHEMA,
        window_tensor_feature_names,
    )
    from pozify.ml.exercise_router_evaluation import (
        evaluation_to_dict,
        evaluate_router_predictions,
    )
    from pozify.ml.exercise_router_temporal import (
        TEMPORAL_ARCHITECTURE,
        TEMPORAL_BATCH_SIZE,
        TEMPORAL_DROPOUT_RATE,
        TEMPORAL_HIDDEN_SIZE,
        TEMPORAL_LEARNING_RATE,
        TEMPORAL_NUM_LAYERS,
        TemporalRouterConfig,
        build_temporal_router_model,
    )
    _vectors, tensors, labels = _load_feature_arrays()
    if tensors is None or len(set(labels)) < 2:
        result = {
            "ok": False,
            "error": "At least two labels with extracted windows are required",
        }
        _write_json(MODEL_ROOT / "temporal_metrics.json", result)
        model_volume.commit()
        return result
    # The `.get` default is evaluated eagerly, so "unknown" must be one of
    # ROUTER_LABELS (otherwise KeyError even when every label is known).
    label_to_index = {label: index for index, label in enumerate(ROUTER_LABELS)}
    label_indices = [
        label_to_index.get(label, label_to_index["unknown"]) for label in labels
    ]
    label_counts = {label: labels.count(label) for label in sorted(set(labels))}
    stratify = labels if min(label_counts.values()) >= 2 else None
    # Splitting the raw label strings alongside the indices keeps the
    # validation labels available for string-based evaluation below.
    train_x, valid_x, train_y, valid_y, _train_labels, valid_labels = train_test_split(
        tensors,
        label_indices,
        labels,
        test_size=0.2,
        random_state=42,
        stratify=stratify,
    )
    train_dataset = TensorDataset(
        torch.tensor(train_x, dtype=torch.float32),
        torch.tensor(train_y, dtype=torch.long),
    )
    # Dedicated seeded generator: shuffle order is reproducible.
    generator = torch.Generator().manual_seed(42)
    loader = DataLoader(
        train_dataset, batch_size=TEMPORAL_BATCH_SIZE, shuffle=True, generator=generator
    )
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # Global RNGs are seeded right before the model is built, so weight init
    # (and presumably dropout masks) is reproducible; reordering these calls
    # changes the trained weights.
    torch.manual_seed(42)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(42)
    config = TemporalRouterConfig(
        input_size=int(tensors.shape[-1]),
        label_count=len(ROUTER_LABELS),
        hidden_size=TEMPORAL_HIDDEN_SIZE,
        num_layers=TEMPORAL_NUM_LAYERS,
        bidirectional=True,
        dropout_rate=TEMPORAL_DROPOUT_RATE,
    )
    model = build_temporal_router_model(config).to(device)
    optimizer = torch.optim.AdamW(
        model.parameters(), lr=TEMPORAL_LEARNING_RATE, weight_decay=1e-4
    )
    loss_fn = nn.CrossEntropyLoss()
    model.train()
    # Reported as "final_training_loss": the loss of the last batch of the
    # last epoch, not an epoch average.
    last_loss = 0.0
    for _epoch in range(epochs):
        for batch_x, batch_y in loader:
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device)
            optimizer.zero_grad()
            loss = loss_fn(model(batch_x), batch_y)
            loss.backward()
            optimizer.step()
            last_loss = float(loss.detach().cpu())
    # _predict_temporal_labels switches the model to eval mode.
    valid_predictions = _predict_temporal_labels(
        model=model,
        tensors=valid_x,
        labels=ROUTER_LABELS,
        device=device,
    )
    validation = evaluation_to_dict(
        evaluate_router_predictions(list(valid_labels), valid_predictions)
    )
    # model.cpu() moves the model in place; it is not used on `device` after this.
    torch.save(
        {
            "model_kind": "temporal",
            "architecture": TEMPORAL_ARCHITECTURE,
            "state_dict": model.cpu().state_dict(),
            "labels": list(ROUTER_LABELS),
            "input_size": int(tensors.shape[-1]),
            "feature_names": window_tensor_feature_names(),
            "feature_schema": FEATURE_SCHEMA,
            "landmark_schema": ROUTER_LANDMARK_SCHEMA,
            "hidden_size": TEMPORAL_HIDDEN_SIZE,
            "num_layers": TEMPORAL_NUM_LAYERS,
            "bidirectional": True,
            "dropout_rate": TEMPORAL_DROPOUT_RATE,
            "learning_rate": TEMPORAL_LEARNING_RATE,
            "batch_size": TEMPORAL_BATCH_SIZE,
        },
        MODEL_ROOT / "temporal.pt",
    )
    metrics = {
        "ok": True,
        "model_kind": "temporal",
        "architecture": TEMPORAL_ARCHITECTURE,
        "epochs": epochs,
        "final_training_loss": round(last_loss, 4),
        "window_count": int(tensors.shape[0]),
        "label_counts": label_counts,
        "learning_rate": TEMPORAL_LEARNING_RATE,
        "batch_size": TEMPORAL_BATCH_SIZE,
        "hidden_size": TEMPORAL_HIDDEN_SIZE,
        "dropout_rate": TEMPORAL_DROPOUT_RATE,
        "validation": validation,
    }
    _write_json(MODEL_ROOT / "temporal_metrics.json", metrics)
    model_volume.commit()
    return metrics
def _predict_temporal_labels(
    *,
    model: Any,
    tensors: Any,
    labels: tuple[str, ...],
    device: Any,
) -> list[str]:
    """Return the arg-max label name for every window in *tensors*.

    Switches *model* to eval mode and runs it without gradients on *device*,
    in unshuffled batches of 128. Each predicted class index is mapped
    through *labels*, so the output order matches the input order.
    """
    import torch
    from torch.utils.data import DataLoader, TensorDataset

    batches = DataLoader(
        TensorDataset(torch.tensor(tensors, dtype=torch.float32)),
        batch_size=128,
        shuffle=False,
    )
    model.eval()
    predicted: list[str] = []
    with torch.no_grad():
        for batch in batches:
            (inputs,) = batch
            class_ids = model(inputs.to(device)).argmax(dim=1).cpu().tolist()
            predicted += [labels[class_id] for class_id in class_ids]
    return predicted
def _evaluate_temporal_checkpoint(checkpoint_path: Path, tensors: Any) -> list[str]:
    """Rebuild a temporal router from *checkpoint_path* and label *tensors*.

    Hyper-parameters absent from the checkpoint fall back to hidden_size=64,
    num_layers=1, bidirectional=True and dropout_rate=0.2174. Inference runs
    on CUDA when available, otherwise on CPU.
    """
    import torch

    sys.path.insert(0, "/root/src")
    from pozify.ml.exercise_router_temporal import (
        TemporalRouterConfig,
        build_temporal_router_model,
    )

    # weights_only=False unpickles arbitrary objects. In this file the only
    # caller passes MODEL_ROOT/temporal.pt, which train_temporal writes;
    # never point this at untrusted checkpoints.
    checkpoint = torch.load(checkpoint_path, map_location="cpu", weights_only=False)
    label_names = tuple(str(name) for name in checkpoint["labels"])
    setting = checkpoint.get
    config = TemporalRouterConfig(
        input_size=int(checkpoint["input_size"]),
        label_count=len(label_names),
        hidden_size=int(setting("hidden_size", 64)),
        num_layers=int(setting("num_layers", 1)),
        bidirectional=bool(setting("bidirectional", True)),
        dropout_rate=float(setting("dropout_rate", 0.2174)),
    )
    if torch.cuda.is_available():
        target_device = torch.device("cuda")
    else:
        target_device = torch.device("cpu")
    network = build_temporal_router_model(config).to(target_device)
    network.load_state_dict(checkpoint["state_dict"])
    return _predict_temporal_labels(
        model=network,
        tensors=tensors,
        labels=label_names,
        device=target_device,
    )
@app.function(volumes={str(DATA_ROOT): data_volume, str(MODEL_ROOT): model_volume})
def evaluate() -> dict[str, Any]:
    """Score each trained router artifact and record the selected model.

    Steps:
    1. Score whichever of ``baseline.joblib`` and ``temporal.pt`` exist.
    2. Let `select_router_candidate` choose between them.
    3. Refresh ``router.joblib`` from the baseline (whenever it exists).
    4. Write ``router_selection.json`` and ``evaluation.json`` to the model
       volume.

    Returns:
        The evaluation summary, or an error payload when features or
        artifacts are missing.

    NOTE(review): predictions are scored on *all* extracted windows,
    including those each model was trained on, so these numbers are more
    optimistic than the held-out metrics from the training stages.
    """
    import joblib

    sys.path.insert(0, "/root/src")
    from pozify.ml.exercise_router_evaluation import (
        evaluation_to_dict,
        evaluate_router_predictions,
        select_router_candidate,
    )

    vectors, tensors, labels = _load_feature_arrays()
    baseline_path = MODEL_ROOT / "baseline.joblib"
    temporal_path = MODEL_ROOT / "temporal.pt"
    evaluation_path = MODEL_ROOT / "evaluation.json"

    def _fail(message: str) -> dict[str, Any]:
        failure = {"ok": False, "error": message}
        _write_json(evaluation_path, failure)
        model_volume.commit()
        return failure

    if vectors is None or tensors is None:
        return _fail("Feature arrays are required")

    candidates: list[dict[str, Any]] = []
    model_results: dict[str, Any] = {}

    def _register(
        name: str,
        artifact_name: str,
        source_path: Path,
        selected_artifact: str,
        predictions: list[Any],
    ) -> None:
        scored = evaluation_to_dict(evaluate_router_predictions(labels, predictions))
        model_results[name] = {"ok": True, "artifact": artifact_name, **scored}
        candidates.append(
            {
                "name": name,
                "source_artifact": source_path,
                "selected_artifact": selected_artifact,
                **scored,
            }
        )

    if baseline_path.exists():
        baseline_model = joblib.load(baseline_path)["model"]
        _register(
            "baseline",
            "baseline.joblib",
            baseline_path,
            "router.joblib",
            list(baseline_model.predict(vectors)),
        )
    if temporal_path.exists():
        _register(
            "temporal",
            "temporal.pt",
            temporal_path,
            "temporal.pt",
            _evaluate_temporal_checkpoint(temporal_path, tensors),
        )

    if not candidates:
        return _fail("At least one trained router artifact is required")

    selected = select_router_candidate(candidates)
    if baseline_path.exists():
        # router.joblib is always refreshed from the baseline, whichever
        # candidate is selected.
        shutil.copyfile(baseline_path, MODEL_ROOT / "router.joblib")
    extension = "joblib" if selected["name"] == "baseline" else "pt"
    selection = {
        "selected_model": f"{selected['name']}.{extension}",
        "selected_artifact": selected["selected_artifact"],
        "reason": "prefer BiLSTM temporal when available; baseline falls back when temporal is missing",
    }
    _write_json(MODEL_ROOT / "router_selection.json", selection)

    summary_keys = (
        "accuracy",
        "precision",
        "recall",
        "unknown_rejection_rate",
        "confusion_matrix",
    )
    result: dict[str, Any] = {
        "ok": True,
        "selected_model": selection["selected_model"],
        "selected_artifact": selection["selected_artifact"],
        "models": model_results,
    }
    result.update({key: selected[key] for key in summary_keys})
    _write_json(evaluation_path, result)
    model_volume.commit()
    return result
def _upload_hf_file(
    api: Any,
    *,
    repo_id: str,
    local_path: Path,
    path_in_repo: str,
) -> dict[str, Any]:
    """Upload *local_path* to the model repo *repo_id* if the file exists.

    Returns a status record containing the local path, the target path and
    an ``uploaded`` flag. A missing file is reported with
    ``reason: "missing"`` instead of raising. Errors from
    ``api.upload_file`` propagate.
    """
    record: dict[str, Any] = {"path": str(local_path), "path_in_repo": path_in_repo}
    if local_path.exists():
        api.upload_file(
            repo_id=repo_id,
            repo_type="model",
            path_or_fileobj=str(local_path),
            path_in_repo=path_in_repo,
        )
        record["uploaded"] = True
    else:
        record["uploaded"] = False
        record["reason"] = "missing"
    return record
@app.function(
    volumes={str(MODEL_ROOT): model_volume},
    secrets=[modal.Secret.from_name("huggingface-secret")],
    timeout=20 * 60,
)
def publish_to_hf(
    repo_id: str | None = None,
    private: bool | None = None,
) -> dict[str, Any]:
    """Publish the router docs and trained artifacts to a Hugging Face model repo.

    Args:
        repo_id: Target repository. Falls back to the HF_REPO_ID_ENV
            environment variable, then DEFAULT_HF_REPO_ID.
        private: Passed to ``create_repo``. Falls back to the truthiness of
            the HF_PRIVATE_ENV environment variable (false when unset).

    Returns:
        A summary with per-file upload status, also written to
        ``hf_upload.json`` on the model volume. If HF_TOKEN is not set,
        returns an error dict instead and uploads nothing.
    """
    from huggingface_hub import HfApi

    repo_id = repo_id or os.getenv(HF_REPO_ID_ENV) or DEFAULT_HF_REPO_ID
    private = _env_truthy(os.getenv(HF_PRIVATE_ENV)) if private is None else private
    if not os.getenv("HF_TOKEN"):
        return {
            "ok": False,
            "error": "HF_TOKEN is required in the Modal environment or local .env",
            "repo_id": repo_id,
        }
    api = HfApi()
    api.create_repo(repo_id=repo_id, repo_type="model", private=private, exist_ok=True)
    # These names must match the `add_local_file` targets in the image
    # definition (copied under DOCS_ROOT). The previous names
    # ("huggingface-router-model-card.md", "exercise-router-training-report.md")
    # do not exist in the container, so the README and the training report
    # were always skipped as "missing".
    doc_files = (
        ("22-router-huggingface-model-card.md", "README.md"),
        ("20-router-training-report.md", "training_report.md"),
    )
    uploads = [
        _upload_hf_file(
            api,
            repo_id=repo_id,
            local_path=DOCS_ROOT / source_name,
            path_in_repo=target_name,
        )
        for source_name, target_name in doc_files
    ]
    uploads.extend(
        _upload_hf_file(
            api,
            repo_id=repo_id,
            local_path=MODEL_ROOT / filename,
            path_in_repo=filename,
        )
        for filename in HF_ARTIFACT_FILENAMES
    )
    result = {
        "ok": any(item["uploaded"] for item in uploads),
        "repo_id": repo_id,
        "private": private,
        "uploads": uploads,
    }
    _write_json(MODEL_ROOT / "hf_upload.json", result)
    model_volume.commit()
    return result
def _run_feature_stage(limit: int | None) -> dict[str, Any]:
    """Extract features for the manifest examples in parallel and write the manifest.

    A failed example becomes a row with ``ok: False`` plus its id, video path
    and the error repr, so the failure can be traced back to a clip.
    """
    examples = load_feature_examples.remote(limit)
    rows: list[dict[str, Any]] = []
    # `.map` yields outputs in input order, so zip pairs each result with the
    # example that produced it.
    for example, result in zip(
        examples,
        extract_example_features.map(examples, return_exceptions=True),
    ):
        if isinstance(result, Exception):
            rows.append(
                {
                    "ok": False,
                    "id": example.get("id"),
                    "video_path": example.get("video_path"),
                    "error": repr(result),
                }
            )
        else:
            rows.append({"ok": True, **result})
    return write_feature_manifest.remote(rows)


@app.local_entrypoint()
def main(
    stage: str = "evaluate",
    limit: int | None = None,
    epochs: int = 73,
    repo_id: str | None = None,
    private: bool | None = None,
) -> None:
    """Run one pipeline stage, or all of them, from the local CLI.

    Stages: ingest, features, train-baseline, train-temporal, evaluate,
    publish, all. Only ``publish`` and ``all`` push to Hugging Face.

    Raises:
        ValueError: for an unrecognised *stage*.
    """
    if stage == "ingest":
        print(ingest.remote())
    elif stage == "features":
        print(_run_feature_stage(limit))
    elif stage == "train-baseline":
        print(train_baseline.remote())
    elif stage == "train-temporal":
        print(train_temporal.remote(epochs=epochs))
    elif stage == "evaluate":
        # Evaluating (the default stage) must not have the side effect of
        # publishing, possibly to a public repo; use `publish` or `all`.
        print(evaluate.remote())
    elif stage == "publish":
        print(publish_to_hf.remote(repo_id=repo_id, private=private))
    elif stage == "all":
        print(ingest.remote())
        print(_run_feature_stage(limit))
        print(train_baseline.remote())
        print(train_temporal.remote(epochs=epochs))
        print(evaluate.remote())
        print(publish_to_hf.remote(repo_id=repo_id, private=private))
    else:
        raise ValueError(
            "stage must be one of: ingest, features, train-baseline, train-temporal, evaluate, publish, all"
        )