Spaces:
Running on Zero
Running on Zero
| from __future__ import annotations | |
| import json | |
| import inspect | |
| import os | |
| from pathlib import Path | |
| import shutil | |
| import sys | |
| from typing import Any | |
| import modal | |
| APP_NAME = "pozify-coach-summary" | |
| DEFAULT_HF_REPO_NAME = "pozify-coach-summary" | |
| HF_REPO_ID_ENV = "POZIFY_COACH_SUMMARY_HF_REPO_ID" | |
| HF_MERGED_REPO_ID_ENV = "POZIFY_COACH_SUMMARY_MERGED_HF_REPO_ID" | |
| HF_PRIVATE_ENV = "POZIFY_COACH_SUMMARY_HF_PRIVATE" | |
| RUNTIME_MODEL_ENV = "POZIFY_COACH_SUMMARY_MODEL" | |
| DATA_ROOT = Path("/data") | |
| MODEL_ROOT = Path("/models") | |
| ROOT_DATA = Path("/root/data") | |
| ROOT_CONFIGS = Path("/root/configs") | |
| SFT_ROOT = DATA_ROOT / "sft" | |
| MODEL_CARD_PATH = MODEL_ROOT / "README.md" | |
| DEFAULT_CONFIG_PATH = ROOT_CONFIGS / "coach_summary_lora.default.json" | |
| TRAINING_CONFIG_PATH = MODEL_ROOT / "training_config.json" | |
| TRAINING_SUMMARY_PATH = MODEL_ROOT / "training_summary.json" | |
| EVALUATION_PATH = MODEL_ROOT / "evaluation.json" | |
| HF_UPLOAD_PATH = MODEL_ROOT / "hf_upload.json" | |
| MERGE_SUMMARY_PATH = MODEL_ROOT / "merge_summary.json" | |
| HF_MERGED_UPLOAD_PATH = MODEL_ROOT / "hf_merged_upload.json" | |
| DEFAULT_ADAPTER_DIR = MODEL_ROOT / "adapter" | |
| DEFAULT_MERGED_DIR = MODEL_ROOT / "merged_model" | |
| HF_METADATA_FILENAMES = ( | |
| "training_config.json", | |
| "training_summary.json", | |
| "evaluation.json", | |
| "hf_upload.json", | |
| "merge_summary.json", | |
| "hf_merged_upload.json", | |
| ) | |
| HF_DATA_FILENAMES = ( | |
| "coach_summary_train.jsonl", | |
| "coach_summary_eval.jsonl", | |
| "public_fitness_style.jsonl", | |
| ) | |
| TRAINING_GPU = "A100-80GB" | |
| image = ( | |
| modal.Image.from_registry( | |
| "nvidia/cuda:13.0.0-devel-ubuntu22.04", | |
| add_python="3.10", | |
| ) | |
| .apt_install("build-essential", "git", "ninja-build") | |
| .env( | |
| { | |
| "CC": "/usr/bin/gcc", | |
| "CXX": "/usr/bin/g++", | |
| "CUDA_HOME": "/usr/local/cuda", | |
| "MAX_JOBS": "4", | |
| "TORCH_CUDA_ARCH_LIST": "8.0", | |
| } | |
| ) | |
| .pip_install( | |
| "accelerate==1.14.0", | |
| "bitsandbytes>=0.48.0", | |
| "datasets>=2.20.0", | |
| "huggingface-hub>=0.24.0", | |
| "packaging>=24.0", | |
| "peft==0.12.0", | |
| "setuptools>=69.0.0", | |
| "torch==2.11.0", | |
| "transformers==5.12.0", | |
| "wheel>=0.43.0", | |
| ) | |
| .pip_install("causal-conv1d>=1.5.0", extra_options="--no-build-isolation") | |
| .pip_install("mamba-ssm>=2.2.4", extra_options="--no-build-isolation") | |
| .add_local_dir("src", "/root/src", copy=True) | |
| .add_local_dir("data", "/root/data", copy=True) | |
| .add_local_dir("configs", "/root/configs", copy=True) | |
| ) | |
| app = modal.App(APP_NAME, image=image) | |
| data_volume = modal.Volume.from_name( | |
| "pozify-coach-summary-data", create_if_missing=True, version=2 | |
| ) | |
| model_volume = modal.Volume.from_name( | |
| "pozify-coach-summary-models", create_if_missing=True, version=2 | |
| ) | |
| def _load_local_env_vars(filename: str = ".env") -> dict[str, str]: | |
| candidates = ( | |
| Path.cwd() / filename, | |
| Path(__file__).resolve().parents[1] / filename, | |
| ) | |
| values: dict[str, str] = {} | |
| for path in candidates: | |
| if not path.is_file(): | |
| continue | |
| for line in path.read_text(encoding="utf-8").splitlines(): | |
| stripped = line.strip() | |
| if not stripped or stripped.startswith("#") or "=" not in stripped: | |
| continue | |
| key, value = stripped.split("=", 1) | |
| key = key.strip() | |
| if not key: | |
| continue | |
| values[key] = value.strip().strip("'").strip('"') | |
| return values | |
| def _hf_secret() -> modal.Secret: | |
| env_values = _load_local_env_vars() | |
| secret_payload: dict[str, str] = {} | |
| for key in ( | |
| "HF_TOKEN", | |
| HF_REPO_ID_ENV, | |
| HF_MERGED_REPO_ID_ENV, | |
| HF_PRIVATE_ENV, | |
| RUNTIME_MODEL_ENV, | |
| ): | |
| value = os.getenv(key, env_values.get(key)) | |
| if value is not None and str(value).strip(): | |
| secret_payload[key] = str(value).strip() | |
| return modal.Secret.from_dict(secret_payload) | |
| def _write_json(path: Path, payload: Any) -> None: | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") | |
| def _read_json(path: Path) -> dict[str, Any]: | |
| payload = json.loads(path.read_text(encoding="utf-8")) | |
| if not isinstance(payload, dict): | |
| raise ValueError(f"{path} must contain a JSON object") | |
| return payload | |
| def _read_jsonl(path: Path) -> list[dict[str, Any]]: | |
| if not path.exists(): | |
| return [] | |
| rows: list[dict[str, Any]] = [] | |
| for line in path.read_text(encoding="utf-8").splitlines(): | |
| stripped = line.strip() | |
| if not stripped: | |
| continue | |
| payload = json.loads(stripped) | |
| if not isinstance(payload, dict): | |
| raise ValueError(f"{path} contains a non-object JSONL row") | |
| rows.append(payload) | |
| return rows | |
| def _write_jsonl(path: Path, rows: list[dict[str, Any]]) -> None: | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| with path.open("w", encoding="utf-8") as handle: | |
| for row in rows: | |
| handle.write(json.dumps(row, ensure_ascii=False)) | |
| handle.write("\n") | |
| def _env_truthy(value: str | None) -> bool: | |
| return value is not None and value.strip().lower() in {"1", "true", "yes", "on"} | |
| def _supports_kwarg(callable_obj: Any, name: str) -> bool: | |
| try: | |
| parameters = inspect.signature(callable_obj).parameters | |
| except (TypeError, ValueError): | |
| return False | |
| return name in parameters | |
| def _filtered_kwargs(callable_obj: Any, kwargs: dict[str, Any]) -> dict[str, Any]: | |
| try: | |
| parameters = inspect.signature(callable_obj).parameters | |
| except (TypeError, ValueError): | |
| return dict(kwargs) | |
| return {key: value for key, value in kwargs.items() if key in parameters} | |
| def _load_config(*, include_saved_training_config: bool = True) -> dict[str, Any]: | |
| config = _read_json(DEFAULT_CONFIG_PATH) | |
| if include_saved_training_config and TRAINING_CONFIG_PATH.exists(): | |
| config.update(_read_json(TRAINING_CONFIG_PATH)) | |
| return config | |
| def _make_generation_config_greedy(model: Any) -> None: | |
| generation_config = getattr(model, "generation_config", None) | |
| if generation_config is None: | |
| return | |
| generation_config.do_sample = False | |
| for name in ("temperature", "top_p", "top_k", "typical_p", "epsilon_cutoff", "eta_cutoff"): | |
| if hasattr(generation_config, name): | |
| setattr(generation_config, name, None) | |
| def _render_messages(messages: list[dict[str, str]]) -> str: | |
| parts: list[str] = [] | |
| for message in messages: | |
| role = message["role"].capitalize() | |
| parts.append(f"{role}: {message['content'].strip()}") | |
| return "\n\n".join(parts) | |
| def _sample_style_rows( | |
| *, | |
| style_rows: list[dict[str, Any]], | |
| train_count: int, | |
| style_weight: float, | |
| ) -> list[dict[str, Any]]: | |
| if not style_rows or style_weight <= 0: | |
| return [] | |
| keep_count = min(len(style_rows), int(round(train_count * style_weight))) | |
| return style_rows[:keep_count] | |
| def _build_training_dataset_rows( | |
| *, | |
| train_rows: list[dict[str, Any]], | |
| style_rows: list[dict[str, Any]], | |
| style_weight: float, | |
| ) -> list[dict[str, str]]: | |
| selected_style_rows = _sample_style_rows( | |
| style_rows=style_rows, | |
| train_count=len(train_rows), | |
| style_weight=style_weight, | |
| ) | |
| merged = [*train_rows, *selected_style_rows] | |
| return [{"text": _render_messages(row["messages"])} for row in merged] | |
| def _build_eval_dataset_rows(rows: list[dict[str, Any]]) -> list[dict[str, str]]: | |
| return [{"text": _render_messages(row["messages"])} for row in rows] | |
| def _model_card_text( | |
| *, | |
| repo_id: str, | |
| config: dict[str, Any], | |
| training_summary: dict[str, Any] | None, | |
| evaluation: dict[str, Any] | None, | |
| ) -> str: | |
| lines = [ | |
| f"# {repo_id}", | |
| "", | |
| "LoRA adapter for Pozify grounded coach-summary generation.", | |
| "", | |
| "## Base Model", | |
| "", | |
| f"- `{config.get('base_model', 'unknown')}`", | |
| "", | |
| "## Training Data", | |
| "", | |
| f"- Train file: `{config.get('train_file')}`", | |
| f"- Eval file: `{config.get('eval_file')}`", | |
| f"- Style file: `{config.get('style_file', SFT_ROOT / 'public_fitness_style.jsonl')}`", | |
| "", | |
| "## Objective", | |
| "", | |
| "Generate grounded `coach_summary.json` outputs from Pozify structured evidence and knowledge cards.", | |
| "", | |
| ] | |
| if training_summary: | |
| lines.extend( | |
| [ | |
| "## Training Summary", | |
| "", | |
| f"- Train rows: `{training_summary.get('train_row_count')}`", | |
| f"- Eval rows: `{training_summary.get('eval_row_count')}`", | |
| f"- Style rows mixed in: `{training_summary.get('style_row_count')}`", | |
| f"- Output dir: `{training_summary.get('output_dir')}`", | |
| "", | |
| ] | |
| ) | |
| if evaluation: | |
| lines.extend( | |
| [ | |
| "## Evaluation", | |
| "", | |
| f"- JSON validity rate: `{evaluation.get('json_validity_rate')}`", | |
| f"- Verifier pass rate: `{evaluation.get('verifier_pass_rate')}`", | |
| f"- Section completeness rate: `{evaluation.get('section_completeness_rate')}`", | |
| "", | |
| ] | |
| ) | |
| return "\n".join(lines) | |
| def _resolve_repo_id( | |
| api: Any, | |
| repo_id: str | None, | |
| *, | |
| env_names: tuple[str, ...] = (HF_REPO_ID_ENV,), | |
| ) -> str: | |
| if repo_id: | |
| return repo_id | |
| for env_name in env_names: | |
| configured = os.getenv(env_name) | |
| if configured: | |
| return configured | |
| try: | |
| whoami = api.whoami() | |
| if isinstance(whoami, dict): | |
| username = whoami.get("name") or whoami.get("fullname") | |
| if isinstance(username, str) and username.strip(): | |
| return f"{username.strip()}/{DEFAULT_HF_REPO_NAME}" | |
| except Exception: | |
| pass | |
| return DEFAULT_HF_REPO_NAME | |
| def _verifier_inputs_from_evidence(payload: dict[str, Any]) -> tuple[Any, Any, Any, Any, Any]: | |
| sys.path.insert(0, "/root/src") | |
| from pozify.contracts import ( | |
| ExerciseClassification, | |
| IssueMarker, | |
| IssueMarkers, | |
| Rep, | |
| RepAnalysis, | |
| RepAnalysisItem, | |
| Reps, | |
| Variation, | |
| ) | |
| classification_payload = payload["exercise_classification"] | |
| variation_payload = payload["variation"] | |
| rep_summary_payload = payload["rep_summary"] | |
| issue_summary_payload = payload["issue_summary"] | |
| classification = ExerciseClassification( | |
| exercise=str(classification_payload["exercise"]), | |
| confidence=float(classification_payload["confidence"]), | |
| window_predictions=[], | |
| fallback_required=bool(classification_payload.get("fallback_required", False)), | |
| ) | |
| variation = Variation( | |
| exercise=str(variation_payload["exercise"]), | |
| detected_variation=str(variation_payload["detected_variation"]), | |
| variation_confidence=float(variation_payload["variation_confidence"]), | |
| not_issues=[str(item) for item in variation_payload.get("not_issues", [])], | |
| ) | |
| rep_metrics = rep_summary_payload.get("rep_metrics", []) | |
| reps = Reps( | |
| exercise=str(classification.exercise), | |
| reps=[ | |
| Rep( | |
| rep_id=int(item.get("rep_id", index + 1)), | |
| start_frame=0, | |
| mid_frame=0, | |
| end_frame=0, | |
| start_sec=0.0, | |
| mid_sec=0.0, | |
| end_sec=float(item.get("duration_sec", 0.0)), | |
| ) | |
| for index, item in enumerate(rep_metrics) | |
| ], | |
| partial_reps=[], | |
| ) | |
| analysis = RepAnalysis( | |
| exercise=str(classification.exercise), | |
| items=[ | |
| RepAnalysisItem( | |
| rep_id=int(item.get("rep_id", index + 1)), | |
| duration_sec=float(item.get("duration_sec", 0.0)), | |
| range_of_motion_score=float(item.get("range_of_motion_score", 0.0)), | |
| stability_score=float(item.get("stability_score", 0.0)), | |
| symmetry_score=float(item.get("symmetry_score", 0.0)), | |
| metrics=dict(item.get("metrics", {})), | |
| variation_hints=[str(value) for value in item.get("variation_hints", [])], | |
| ) | |
| for index, item in enumerate(rep_metrics) | |
| ], | |
| aggregate_metrics=dict(rep_summary_payload.get("aggregate_metrics", {})), | |
| ) | |
| issues = IssueMarkers( | |
| issues=[ | |
| IssueMarker( | |
| rep_id=int(item.get("rep_id", 0)), | |
| issue=str(item["issue"]), | |
| severity=float(item.get("severity", 0.0)), | |
| start_frame=int(item.get("start_frame", 0)), | |
| end_frame=int(item.get("end_frame", 0)), | |
| start_sec=float(item.get("start_sec", 0.0)), | |
| end_sec=float(item.get("end_sec", 0.0)), | |
| affected_joints=[str(value) for value in item.get("affected_joints", [])], | |
| evidence=dict(item.get("evidence", {})), | |
| ) | |
| for item in issue_summary_payload.get("issues", []) | |
| ] | |
| ) | |
| return classification, variation, reps, analysis, issues | |
| def _generate_json_only( | |
| *, | |
| model: Any, | |
| tokenizer: Any, | |
| prompt: str, | |
| max_new_tokens: int, | |
| ) -> str: | |
| import torch | |
| inputs = tokenizer(prompt, return_tensors="pt").to(model.device) | |
| with torch.no_grad(): | |
| outputs = model.generate( | |
| **inputs, | |
| max_new_tokens=max_new_tokens, | |
| do_sample=False, | |
| temperature=0.0, | |
| ) | |
| generated = outputs[0][inputs["input_ids"].shape[1] :] | |
| return tokenizer.decode(generated, skip_special_tokens=True).strip() | |
| def prepare_data() -> dict[str, Any]: | |
| SFT_ROOT.mkdir(parents=True, exist_ok=True) | |
| copied: list[str] = [] | |
| counts: dict[str, int] = {} | |
| for filename in HF_DATA_FILENAMES: | |
| source = ROOT_DATA / "sft" / filename | |
| if not source.exists(): | |
| continue | |
| target = SFT_ROOT / filename | |
| shutil.copyfile(source, target) | |
| copied.append(filename) | |
| counts[filename] = len(_read_jsonl(target)) | |
| summary = { | |
| "ok": bool(copied), | |
| "copied_files": copied, | |
| "row_counts": counts, | |
| "sft_root": str(SFT_ROOT), | |
| } | |
| _write_json(DATA_ROOT / "prepare_data_summary.json", summary) | |
| data_volume.commit() | |
| return summary | |
| def train( | |
| epochs: int | None = None, | |
| style_weight: float = 0.2, | |
| output_subdir: str = "adapter", | |
| ) -> dict[str, Any]: | |
| os.environ.setdefault("PYTORCH_CUDA_ALLOC_CONF", "expandable_segments:True") | |
| import torch | |
| from datasets import Dataset | |
| from peft import LoraConfig, get_peft_model | |
| from transformers import ( | |
| AutoModelForCausalLM, | |
| AutoTokenizer, | |
| DataCollatorForLanguageModeling, | |
| Trainer, | |
| TrainingArguments, | |
| ) | |
| config = _load_config(include_saved_training_config=False) | |
| if epochs is not None: | |
| config["num_train_epochs"] = epochs | |
| config["style_weight"] = style_weight | |
| config["style_file"] = str(SFT_ROOT / "public_fitness_style.jsonl") | |
| train_rows = _read_jsonl(SFT_ROOT / "coach_summary_train.jsonl") | |
| eval_rows = _read_jsonl(SFT_ROOT / "coach_summary_eval.jsonl") | |
| style_rows = _read_jsonl(SFT_ROOT / "public_fitness_style.jsonl") | |
| if not train_rows or not eval_rows: | |
| result = { | |
| "ok": False, | |
| "error": "Missing SFT train/eval rows. Run prepare_data first.", | |
| } | |
| _write_json(TRAINING_SUMMARY_PATH, result) | |
| model_volume.commit() | |
| return result | |
| training_rows = _build_training_dataset_rows( | |
| train_rows=train_rows, | |
| style_rows=style_rows, | |
| style_weight=style_weight, | |
| ) | |
| eval_dataset_rows = _build_eval_dataset_rows(eval_rows) | |
| tokenizer = AutoTokenizer.from_pretrained(str(config["base_model"])) | |
| if tokenizer.pad_token is None: | |
| tokenizer.pad_token = tokenizer.eos_token | |
| tokenizer.padding_side = "right" | |
| max_seq_length = int(config.get("max_seq_length", 2048)) | |
| def tokenize_rows(rows: list[dict[str, str]]) -> tuple[list[dict[str, list[int]]], dict[str, Any]]: | |
| tokenized_rows: list[dict[str, list[int]]] = [] | |
| lengths: list[int] = [] | |
| truncated_count = 0 | |
| head_tokens = min(256, max_seq_length // 4) | |
| tail_tokens = max_seq_length - head_tokens | |
| for row in rows: | |
| input_ids = tokenizer( | |
| str(row["text"]), | |
| add_special_tokens=False, | |
| truncation=False, | |
| )["input_ids"] | |
| if tokenizer.eos_token_id is not None: | |
| input_ids = [*input_ids, int(tokenizer.eos_token_id)] | |
| lengths.append(len(input_ids)) | |
| if len(input_ids) > max_seq_length: | |
| truncated_count += 1 | |
| input_ids = [*input_ids[:head_tokens], *input_ids[-tail_tokens:]] | |
| tokenized_rows.append( | |
| { | |
| "input_ids": input_ids, | |
| "attention_mask": [1] * len(input_ids), | |
| } | |
| ) | |
| stats = { | |
| "max_seq_length": max_seq_length, | |
| "truncated_row_count": truncated_count, | |
| "max_input_tokens_before_truncation": max(lengths) if lengths else 0, | |
| "avg_input_tokens_before_truncation": round(sum(lengths) / len(lengths), 2) | |
| if lengths | |
| else 0, | |
| } | |
| return tokenized_rows, stats | |
| tokenized_training_rows, train_token_stats = tokenize_rows(training_rows) | |
| tokenized_eval_rows, eval_token_stats = tokenize_rows(eval_dataset_rows) | |
| train_dataset = Dataset.from_list(tokenized_training_rows) | |
| eval_dataset = Dataset.from_list(tokenized_eval_rows) | |
| model_kwargs: dict[str, Any] = { | |
| "dtype": torch.bfloat16, | |
| "device_map": "auto", | |
| "attn_implementation": "sdpa", | |
| } | |
| try: | |
| model = AutoModelForCausalLM.from_pretrained( | |
| str(config["base_model"]), | |
| **model_kwargs, | |
| ) | |
| except (TypeError, ValueError): | |
| model_kwargs.pop("attn_implementation", None) | |
| model = AutoModelForCausalLM.from_pretrained( | |
| str(config["base_model"]), | |
| **model_kwargs, | |
| ) | |
| model.config.use_cache = False | |
| peft_config = LoraConfig( | |
| r=int(config.get("lora_r", 16)), | |
| lora_alpha=int(config.get("lora_alpha", 32)), | |
| lora_dropout=float(config.get("lora_dropout", 0.05)), | |
| bias="none", | |
| task_type="CAUSAL_LM", | |
| target_modules=[ | |
| "q_proj", | |
| "k_proj", | |
| "v_proj", | |
| "o_proj", | |
| "gate_proj", | |
| "up_proj", | |
| "down_proj", | |
| ], | |
| ) | |
| model = get_peft_model(model, peft_config) | |
| adapter_dir = MODEL_ROOT / output_subdir | |
| if adapter_dir.exists(): | |
| shutil.rmtree(adapter_dir) | |
| adapter_dir.mkdir(parents=True, exist_ok=True) | |
| training_args_kwargs = { | |
| "output_dir": str(adapter_dir), | |
| "learning_rate": float(config["learning_rate"]), | |
| "num_train_epochs": float(config["num_train_epochs"]), | |
| "per_device_train_batch_size": int(config["per_device_train_batch_size"]), | |
| "per_device_eval_batch_size": 1, | |
| "gradient_accumulation_steps": int(config["gradient_accumulation_steps"]), | |
| "save_strategy": "epoch", | |
| "logging_steps": 10, | |
| "bf16": True, | |
| "gradient_checkpointing": True, | |
| "remove_unused_columns": False, | |
| "prediction_loss_only": True, | |
| "optim": "paged_adamw_8bit", | |
| "report_to": [], | |
| } | |
| if _supports_kwarg(TrainingArguments.__init__, "eval_strategy"): | |
| training_args_kwargs["eval_strategy"] = "epoch" | |
| elif _supports_kwarg(TrainingArguments.__init__, "evaluation_strategy"): | |
| training_args_kwargs["evaluation_strategy"] = "epoch" | |
| if _supports_kwarg(TrainingArguments.__init__, "gradient_checkpointing_kwargs"): | |
| training_args_kwargs["gradient_checkpointing_kwargs"] = {"use_reentrant": False} | |
| data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False) | |
| trainer = Trainer( | |
| model=model, | |
| train_dataset=train_dataset, | |
| eval_dataset=eval_dataset, | |
| args=TrainingArguments( | |
| **_filtered_kwargs(TrainingArguments.__init__, training_args_kwargs) | |
| ), | |
| data_collator=data_collator, | |
| ) | |
| train_result = trainer.train() | |
| trainer.save_model(str(adapter_dir)) | |
| tokenizer.save_pretrained(str(adapter_dir)) | |
| summary = { | |
| "ok": True, | |
| "base_model": config["base_model"], | |
| "adapter_dir": str(adapter_dir), | |
| "train_row_count": len(train_rows), | |
| "eval_row_count": len(eval_rows), | |
| "style_row_count": len(training_rows) - len(train_rows), | |
| "merged_train_row_count": len(training_rows), | |
| "style_weight": style_weight, | |
| "epochs": config["num_train_epochs"], | |
| "global_step": int(getattr(train_result, "global_step", 0)), | |
| "training_loss": float(getattr(train_result, "training_loss", 0.0)), | |
| "output_dir": str(adapter_dir), | |
| "train_token_stats": train_token_stats, | |
| "eval_token_stats": eval_token_stats, | |
| } | |
| _write_json(TRAINING_CONFIG_PATH, config) | |
| _write_json(TRAINING_SUMMARY_PATH, summary) | |
| model_volume.commit() | |
| return summary | |
| def evaluate( | |
| adapter_subdir: str = "adapter", | |
| limit: int | None = None, | |
| ) -> dict[str, Any]: | |
| import torch | |
| from peft import PeftModel | |
| from transformers import AutoModelForCausalLM, AutoTokenizer | |
| sys.path.insert(0, "/root/src") | |
| from pozify.steps import verifier | |
| from pozify.steps.coach_summary import _extract_json_object, _summary_from_payload | |
| config = _load_config() | |
| adapter_dir = MODEL_ROOT / adapter_subdir | |
| eval_rows = _read_jsonl(SFT_ROOT / "coach_summary_eval.jsonl") | |
| if limit is not None: | |
| eval_rows = eval_rows[:limit] | |
| if not adapter_dir.exists(): | |
| result = {"ok": False, "error": f"Adapter dir not found: {adapter_dir}"} | |
| _write_json(EVALUATION_PATH, result) | |
| model_volume.commit() | |
| return result | |
| tokenizer = AutoTokenizer.from_pretrained(str(config["base_model"])) | |
| if tokenizer.pad_token is None: | |
| tokenizer.pad_token = tokenizer.eos_token | |
| base_model = AutoModelForCausalLM.from_pretrained( | |
| str(config["base_model"]), | |
| dtype=torch.bfloat16, | |
| device_map="auto", | |
| ) | |
| model = PeftModel.from_pretrained(base_model, str(adapter_dir)) | |
| json_valid_count = 0 | |
| verifier_pass_count = 0 | |
| section_complete_count = 0 | |
| evaluated = 0 | |
| failures: list[dict[str, Any]] = [] | |
| required_sections = { | |
| "summary", | |
| "what_you_did", | |
| "what_looked_good", | |
| "what_changed_across_reps", | |
| "valid_variation_vs_issue", | |
| "top_fixes", | |
| "next_session_plan", | |
| "confidence_notes", | |
| } | |
| for index, row in enumerate(eval_rows): | |
| evaluated += 1 | |
| prompt = _render_messages(row["messages"][:2]) | |
| try: | |
| generated_text = _generate_json_only( | |
| model=model, | |
| tokenizer=tokenizer, | |
| prompt=prompt, | |
| max_new_tokens=int(config.get("max_new_tokens", 700)), | |
| ) | |
| payload = _extract_json_object(generated_text) | |
| json_valid_count += 1 | |
| if required_sections <= payload.keys(): | |
| section_complete_count += 1 | |
| summary = _summary_from_payload(payload) | |
| evidence_payload = json.loads(row["messages"][1]["content"]) | |
| classification, variation, reps, analysis, issues = _verifier_inputs_from_evidence( | |
| evidence_payload | |
| ) | |
| verification = verifier.run( | |
| summary, | |
| issues, | |
| variation, | |
| classification=classification, | |
| analysis=analysis, | |
| reps=reps, | |
| ) | |
| if verification.passed: | |
| verifier_pass_count += 1 | |
| else: | |
| failures.append( | |
| { | |
| "index": index, | |
| "reason": "verifier_failed", | |
| "notes": verification.notes, | |
| } | |
| ) | |
| except Exception as exc: # pragma: no cover - runtime failure path | |
| failures.append({"index": index, "reason": repr(exc)}) | |
| result = { | |
| "ok": True, | |
| "adapter_dir": str(adapter_dir), | |
| "evaluated_count": evaluated, | |
| "json_valid_count": json_valid_count, | |
| "json_validity_rate": round(json_valid_count / evaluated, 4) if evaluated else 0.0, | |
| "verifier_pass_count": verifier_pass_count, | |
| "verifier_pass_rate": round(verifier_pass_count / evaluated, 4) if evaluated else 0.0, | |
| "section_completeness_rate": round(section_complete_count / evaluated, 4) | |
| if evaluated | |
| else 0.0, | |
| "failure_count": len(failures), | |
| "failures": failures[:20], | |
| } | |
| _write_json(EVALUATION_PATH, result) | |
| model_volume.commit() | |
| return result | |
| def _upload_hf_file( | |
| api: Any, | |
| *, | |
| repo_id: str, | |
| local_path: Path, | |
| path_in_repo: str, | |
| ) -> dict[str, Any]: | |
| if not local_path.exists(): | |
| return { | |
| "path": str(local_path), | |
| "path_in_repo": path_in_repo, | |
| "uploaded": False, | |
| "reason": "missing", | |
| } | |
| api.upload_file( | |
| repo_id=repo_id, | |
| repo_type="model", | |
| path_or_fileobj=str(local_path), | |
| path_in_repo=path_in_repo, | |
| ) | |
| return { | |
| "path": str(local_path), | |
| "path_in_repo": path_in_repo, | |
| "uploaded": True, | |
| } | |
| def merge( | |
| adapter_subdir: str = "adapter", | |
| merged_subdir: str = "merged_model", | |
| ) -> dict[str, Any]: | |
| import torch | |
| from peft import PeftModel | |
| from transformers import AutoModelForCausalLM, AutoTokenizer | |
| config = _load_config() | |
| adapter_dir = MODEL_ROOT / adapter_subdir | |
| merged_dir = MODEL_ROOT / merged_subdir | |
| if not adapter_dir.exists(): | |
| result = {"ok": False, "error": f"Adapter dir not found: {adapter_dir}"} | |
| _write_json(MERGE_SUMMARY_PATH, result) | |
| model_volume.commit() | |
| return result | |
| if merged_dir.exists(): | |
| shutil.rmtree(merged_dir) | |
| merged_dir.mkdir(parents=True, exist_ok=True) | |
| tokenizer = AutoTokenizer.from_pretrained(str(adapter_dir)) | |
| base_model = AutoModelForCausalLM.from_pretrained( | |
| str(config["base_model"]), | |
| dtype=torch.bfloat16, | |
| device_map="auto", | |
| low_cpu_mem_usage=True, | |
| ) | |
| model = PeftModel.from_pretrained(base_model, str(adapter_dir)) | |
| merged_model = model.merge_and_unload() | |
| _make_generation_config_greedy(merged_model) | |
| merged_model.save_pretrained(str(merged_dir), safe_serialization=True) | |
| tokenizer.save_pretrained(str(merged_dir)) | |
| result = { | |
| "ok": True, | |
| "base_model": config["base_model"], | |
| "adapter_dir": str(adapter_dir), | |
| "merged_dir": str(merged_dir), | |
| "dtype": "bfloat16", | |
| } | |
| _write_json(MERGE_SUMMARY_PATH, result) | |
| model_volume.commit() | |
| return result | |
| def publish_to_hf( | |
| repo_id: str | None = None, | |
| private: bool | None = None, | |
| adapter_subdir: str = "adapter", | |
| ) -> dict[str, Any]: | |
| from huggingface_hub import HfApi | |
| from huggingface_hub.errors import HfHubHTTPError | |
| private = _env_truthy(os.getenv(HF_PRIVATE_ENV)) if private is None else private | |
| if not os.getenv("HF_TOKEN"): | |
| return { | |
| "ok": False, | |
| "error": "HF_TOKEN is required in the Modal environment or local .env", | |
| "repo_id": repo_id or os.getenv(HF_REPO_ID_ENV) or DEFAULT_HF_REPO_NAME, | |
| } | |
| config = _read_json(TRAINING_CONFIG_PATH) if TRAINING_CONFIG_PATH.exists() else {} | |
| training_summary = _read_json(TRAINING_SUMMARY_PATH) if TRAINING_SUMMARY_PATH.exists() else None | |
| evaluation = _read_json(EVALUATION_PATH) if EVALUATION_PATH.exists() else None | |
| api = HfApi() | |
| repo_id = _resolve_repo_id(api, repo_id, env_names=(HF_REPO_ID_ENV,)) | |
| MODEL_CARD_PATH.write_text( | |
| _model_card_text( | |
| repo_id=repo_id, | |
| config=config, | |
| training_summary=training_summary, | |
| evaluation=evaluation, | |
| ), | |
| encoding="utf-8", | |
| ) | |
| try: | |
| api.create_repo(repo_id=repo_id, repo_type="model", private=private, exist_ok=True) | |
| except HfHubHTTPError as exc: | |
| message = str(exc) | |
| guidance = ( | |
| "Publish failed while creating or accessing the Hugging Face model repo. " | |
| "If your token does not have org-level write access, publish to a personal repo id " | |
| "such as `<your-username>/pozify-coach-summary`, or set " | |
| f"`{HF_REPO_ID_ENV}` in `.env` to a repo you control." | |
| ) | |
| return { | |
| "ok": False, | |
| "repo_id": repo_id, | |
| "private": private, | |
| "error": message, | |
| "guidance": guidance, | |
| } | |
| uploads = [ | |
| _upload_hf_file( | |
| api, | |
| repo_id=repo_id, | |
| local_path=MODEL_CARD_PATH, | |
| path_in_repo="README.md", | |
| ) | |
| ] | |
| adapter_dir = MODEL_ROOT / adapter_subdir | |
| if adapter_dir.exists(): | |
| api.upload_folder( | |
| repo_id=repo_id, | |
| repo_type="model", | |
| folder_path=str(adapter_dir), | |
| path_in_repo="adapter", | |
| ) | |
| uploads.append( | |
| { | |
| "path": str(adapter_dir), | |
| "path_in_repo": "adapter/", | |
| "uploaded": True, | |
| } | |
| ) | |
| else: | |
| uploads.append( | |
| { | |
| "path": str(adapter_dir), | |
| "path_in_repo": "adapter/", | |
| "uploaded": False, | |
| "reason": "missing", | |
| } | |
| ) | |
| uploads.extend( | |
| _upload_hf_file( | |
| api, | |
| repo_id=repo_id, | |
| local_path=MODEL_ROOT / filename, | |
| path_in_repo=filename, | |
| ) | |
| for filename in HF_METADATA_FILENAMES | |
| ) | |
| result = { | |
| "ok": any(item["uploaded"] for item in uploads), | |
| "repo_id": repo_id, | |
| "private": private, | |
| "uploads": uploads, | |
| } | |
| _write_json(HF_UPLOAD_PATH, result) | |
| model_volume.commit() | |
| return result | |
| def publish_merged_to_hf( | |
| repo_id: str | None = None, | |
| private: bool | None = None, | |
| merged_subdir: str = "merged_model", | |
| ) -> dict[str, Any]: | |
| from huggingface_hub import HfApi | |
| from huggingface_hub.errors import HfHubHTTPError | |
| private = _env_truthy(os.getenv(HF_PRIVATE_ENV)) if private is None else private | |
| resolved_repo_hint = ( | |
| repo_id | |
| or os.getenv(RUNTIME_MODEL_ENV) | |
| or os.getenv(HF_MERGED_REPO_ID_ENV) | |
| or DEFAULT_HF_REPO_NAME | |
| ) | |
| if not os.getenv("HF_TOKEN"): | |
| return { | |
| "ok": False, | |
| "error": "HF_TOKEN is required in the Modal environment or local .env", | |
| "repo_id": resolved_repo_hint, | |
| } | |
| config = _read_json(TRAINING_CONFIG_PATH) if TRAINING_CONFIG_PATH.exists() else {} | |
| training_summary = _read_json(TRAINING_SUMMARY_PATH) if TRAINING_SUMMARY_PATH.exists() else None | |
| evaluation = _read_json(EVALUATION_PATH) if EVALUATION_PATH.exists() else None | |
| merge_summary = _read_json(MERGE_SUMMARY_PATH) if MERGE_SUMMARY_PATH.exists() else None | |
| merged_dir = MODEL_ROOT / merged_subdir | |
| if not merged_dir.exists(): | |
| result = { | |
| "ok": False, | |
| "error": f"Merged model dir not found: {merged_dir}", | |
| "repo_id": resolved_repo_hint, | |
| } | |
| _write_json(HF_MERGED_UPLOAD_PATH, result) | |
| model_volume.commit() | |
| return result | |
| api = HfApi() | |
| repo_id = _resolve_repo_id( | |
| api, | |
| repo_id, | |
| env_names=(RUNTIME_MODEL_ENV, HF_MERGED_REPO_ID_ENV, HF_REPO_ID_ENV), | |
| ) | |
| MODEL_CARD_PATH.write_text( | |
| _model_card_text( | |
| repo_id=repo_id, | |
| config=config, | |
| training_summary=training_summary, | |
| evaluation=evaluation, | |
| ) | |
| + "\n## Packaging\n\n- Published as a merged, inference-ready Transformers checkpoint.\n", | |
| encoding="utf-8", | |
| ) | |
| try: | |
| api.create_repo(repo_id=repo_id, repo_type="model", private=private, exist_ok=True) | |
| except HfHubHTTPError as exc: | |
| message = str(exc) | |
| guidance = ( | |
| "Publish failed while creating or accessing the merged Hugging Face model repo. " | |
| "Set `POZIFY_COACH_SUMMARY_MODEL` or pass `--repo-id <your-username>/pozify-coach-summary` " | |
| "to publish to a repo your token can write to." | |
| ) | |
| result = { | |
| "ok": False, | |
| "repo_id": repo_id, | |
| "private": private, | |
| "error": message, | |
| "guidance": guidance, | |
| } | |
| _write_json(HF_MERGED_UPLOAD_PATH, result) | |
| model_volume.commit() | |
| return result | |
| api.upload_folder( | |
| repo_id=repo_id, | |
| repo_type="model", | |
| folder_path=str(merged_dir), | |
| ) | |
| uploads = [ | |
| { | |
| "path": str(merged_dir), | |
| "path_in_repo": "./", | |
| "uploaded": True, | |
| }, | |
| _upload_hf_file( | |
| api, | |
| repo_id=repo_id, | |
| local_path=MODEL_CARD_PATH, | |
| path_in_repo="README.md", | |
| ), | |
| ] | |
| uploads.extend( | |
| _upload_hf_file( | |
| api, | |
| repo_id=repo_id, | |
| local_path=MODEL_ROOT / filename, | |
| path_in_repo=filename, | |
| ) | |
| for filename in ( | |
| "training_config.json", | |
| "training_summary.json", | |
| "evaluation.json", | |
| "merge_summary.json", | |
| ) | |
| ) | |
| result = { | |
| "ok": True, | |
| "repo_id": repo_id, | |
| "private": private, | |
| "merge_summary": merge_summary, | |
| "uploads": uploads, | |
| } | |
| _write_json(HF_MERGED_UPLOAD_PATH, result) | |
| model_volume.commit() | |
| return result | |
| def main( | |
| stage: str = "evaluate", | |
| epochs: int | None = None, | |
| style_weight: float = 0.2, | |
| limit: int | None = None, | |
| repo_id: str | None = None, | |
| private: bool | None = None, | |
| ) -> None: | |
| if stage == "prepare-data": | |
| print(prepare_data.remote()) | |
| elif stage == "train": | |
| print(train.remote(epochs=epochs, style_weight=style_weight)) | |
| elif stage == "evaluate": | |
| print(evaluate.remote(limit=limit)) | |
| elif stage == "merge": | |
| print(merge.remote()) | |
| elif stage == "publish": | |
| print(publish_to_hf.remote(repo_id=repo_id, private=private)) | |
| elif stage == "publish-merged": | |
| print(publish_merged_to_hf.remote(repo_id=repo_id, private=private)) | |
| elif stage == "all": | |
| print(prepare_data.remote()) | |
| print(train.remote(epochs=epochs, style_weight=style_weight)) | |
| print(evaluate.remote(limit=limit)) | |
| print(merge.remote()) | |
| print(publish_merged_to_hf.remote(repo_id=repo_id, private=private)) | |
| else: | |
| raise ValueError( | |
| "stage must be one of: prepare-data, train, evaluate, merge, publish, publish-merged, all" | |
| ) | |