| """Human-in-the-loop Space helpers for staging training artifacts.""" |
|
|
| from __future__ import annotations |
|
|
| import json |
| import os |
| from datetime import UTC, datetime |
| from pathlib import Path |
| from typing import Any |
| from uuid import uuid4 |
|
|
| from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator |
|
|
| from maris_core.data.quality import ( |
| DatasetQualityGateConfig, |
| apply_quality_gate_to_records, |
| build_dataset_quality_report, |
| ) |
| from maris_core.training.preferences import ( |
| PreferenceExample, |
| build_blind_side_by_side_artifact, |
| build_human_eval_summary, |
| summarize_preference_dataset, |
| ) |
| from maris_core.training.space_ui import SAFE_OUTPUT_SEGMENT_RE |
| from maris_core.utils.env import validate_hf_model, validate_hf_repo_id |
|
|
# Directory name, directly under the persistent storage root, that holds one
# sub-directory per staged run.
HUMAN_TRAINING_STAGE_DIRNAME = "human-training-staging"
# Path segment inserted after "data/" and "artifacts/" in dataset-repo paths.
HUMAN_TRAINING_REPO_PREFIX = "human-training"
# Passed as ``min_text_chars`` to the dataset quality gate configuration.
HUMAN_TRAINING_MIN_TEXT_CHARS = 24
# Fixed seed handed to the blind side-by-side review artifact builder.
HUMAN_TRAINING_BLIND_REVIEW_SEED = 7
|
|
|
|
| def _timestamp() -> str: |
| return datetime.now(UTC).replace(microsecond=0).isoformat() |
|
|
|
|
| def _new_run_id() -> str: |
| timestamp = datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ") |
| return f"{timestamp}-{uuid4().hex[:8]}" |
|
|
|
|
| def _validate_output_subdir(value: str) -> str: |
| normalized = value.strip().strip("/") |
| if not normalized: |
| raise ValueError("Output apakšdirektorija nedrīkst būt tukša.") |
| parts = Path(normalized).parts |
| if ".." in parts or not SAFE_OUTPUT_SEGMENT_RE.fullmatch(normalized): |
| raise ValueError("Output apakšdirektorijā drīkst būt tikai droši ceļa segmenti.") |
| return normalized |
|
|
|
|
| def _normalize_lines(values: list[str]) -> list[str]: |
| normalized: list[str] = [] |
| seen: set[str] = set() |
| for value in values: |
| text = str(value or "").strip() |
| if not text: |
| continue |
| signature = text.casefold() |
| if signature in seen: |
| continue |
| seen.add(signature) |
| normalized.append(text) |
| return normalized |
|
|
|
|
class HumanTrainingConversationExample(BaseModel):
    """A single user/assistant exchange supplied as a training example."""

    # String inputs have surrounding whitespace stripped by pydantic.
    model_config = ConfigDict(str_strip_whitespace=True)

    # Both sides of the exchange are required and must be non-empty.
    user: str = Field(..., min_length=1)
    assistant: str = Field(..., min_length=1)
    # Optional extra context; defaults to an empty string.
    context: str = ""
|
|
|
|
class HumanTrainingPreferencePair(BaseModel):
    """A prompt with a preferred ("chosen") and a dispreferred ("rejected") answer."""

    # String inputs have surrounding whitespace stripped by pydantic.
    model_config = ConfigDict(str_strip_whitespace=True)

    # Required, non-empty texts.
    prompt: str = Field(..., min_length=1)
    chosen: str = Field(..., min_length=1)
    rejected: str = Field(..., min_length=1)
    # Optional extra context; defaults to an empty string.
    context: str = ""
    # Optional value constrained to the closed interval [0.0, 1.0].
    confidence: float | None = Field(default=None, ge=0.0, le=1.0)
|
|
|
|
class HumanTrainingEvalExample(BaseModel):
    """A prompt/completion pair supplied for the evaluation split."""

    # String inputs have surrounding whitespace stripped by pydantic.
    model_config = ConfigDict(str_strip_whitespace=True)

    # Required, non-empty texts.
    prompt: str = Field(..., min_length=1)
    completion: str = Field(..., min_length=1)
    # Optional extra context; defaults to an empty string.
    context: str = ""
|
|
|
|
def _validate_optional_hf_model(value: str, field_name: str) -> str:
    """Validate an optional model identifier; blank input passes through as ``""``.

    A ``RuntimeError`` from ``validate_hf_model`` is re-raised as ``ValueError``
    so that pydantic reports it as a validation error.
    """
    candidate = value.strip()
    if not candidate:
        return ""
    try:
        return validate_hf_model(candidate, field_name)
    except RuntimeError as exc:
        raise ValueError(str(exc)) from exc


class HumanTrainingRequest(BaseModel):
    """Request payload for building staged human-training artifacts.

    At least one of ``hub_model_id`` / ``model_repo`` must be given; after
    validation both hold the same value (``hub_model_id`` wins when both are
    set). At least one of the six input collections must be non-empty.
    """

    # String inputs have surrounding whitespace stripped by pydantic.
    model_config = ConfigDict(str_strip_whitespace=True)

    # --- repositories and model selection ---
    dataset_repo: str
    model_repo: str = ""
    hub_model_id: str = ""
    model_preset: str = "balanced"
    model_name: str = ""

    # --- training options ---
    num_epochs: int = Field(default=3, ge=1, le=100)
    all_branches: bool = False
    push_to_hub: bool = True
    output_subdir: str = "maris-human-training"
    continue_from_latest_artifact: bool = True
    continue_model_path: str = ""

    # --- human-provided training signal ---
    profile_facts: list[str] = Field(default_factory=list)
    profile_preferences: list[str] = Field(default_factory=list)
    response_instructions: list[str] = Field(default_factory=list)
    conversation_examples: list[HumanTrainingConversationExample] = Field(default_factory=list)
    preference_pairs: list[HumanTrainingPreferencePair] = Field(default_factory=list)
    eval_examples: list[HumanTrainingEvalExample] = Field(default_factory=list)

    @field_validator("dataset_repo")
    @classmethod
    def validate_dataset_repo(cls, value: str) -> str:
        """Validate the dataset repo id, converting ``RuntimeError`` to ``ValueError``."""
        try:
            repo_id = validate_hf_repo_id(value, "dataset_repo", label="dataset repo")
        except RuntimeError as exc:
            raise ValueError(str(exc)) from exc
        return repo_id

    @field_validator("model_repo")
    @classmethod
    def validate_model_repo(cls, value: str) -> str:
        """Validate ``model_repo`` when it is non-blank."""
        return _validate_optional_hf_model(value, "model_repo")

    @field_validator("hub_model_id")
    @classmethod
    def validate_hub_model_id(cls, value: str) -> str:
        """Validate ``hub_model_id`` when it is non-blank."""
        return _validate_optional_hf_model(value, "hub_model_id")

    @field_validator("model_name")
    @classmethod
    def validate_model_name(cls, value: str) -> str:
        """Validate ``model_name`` when it is non-blank."""
        return _validate_optional_hf_model(value, "model_name")

    @field_validator("output_subdir")
    @classmethod
    def validate_output_subdir(cls, value: str) -> str:
        """Normalise and validate the output sub-directory."""
        return _validate_output_subdir(value)

    @field_validator("continue_model_path")
    @classmethod
    def validate_continue_model_path(cls, value: str) -> str:
        """Validate ``continue_model_path`` with the sub-directory rules when non-blank."""
        candidate = value.strip()
        return _validate_output_subdir(candidate) if candidate else ""

    @field_validator("profile_facts", "profile_preferences", "response_instructions")
    @classmethod
    def dedupe_lines(cls, value: list[str]) -> list[str]:
        """Strip, drop blank and de-duplicate the free-text line lists."""
        return _normalize_lines(value)

    @model_validator(mode="after")
    def validate_has_training_signal(self) -> HumanTrainingRequest:
        """Resolve the model target and require at least one input block."""
        target_repo = self.hub_model_id or self.model_repo
        if not target_repo:
            raise ValueError("Jānorāda hub_model_id vai model_repo.")
        # Mirror the resolved target into both fields.
        self.hub_model_id = target_repo
        self.model_repo = target_repo

        input_blocks = (
            self.profile_facts,
            self.profile_preferences,
            self.response_instructions,
            self.conversation_examples,
            self.preference_pairs,
            self.eval_examples,
        )
        if not any(input_blocks):
            raise ValueError("Human training pieprasījumā jābūt vismaz vienam ievades blokam.")

        # Fall back to the default preset when neither a name nor a preset is set.
        if not (self.model_name or self.model_preset):
            self.model_preset = "balanced"
        return self
|
|
|
|
class HumanTrainingExecuteRequest(BaseModel):
    """Flags selecting what to do with a previously staged run.

    Starting training is only accepted together with publishing the artifacts.
    """

    # String inputs have surrounding whitespace stripped by pydantic.
    model_config = ConfigDict(str_strip_whitespace=True)

    run_id: str = Field(..., min_length=6)
    publish_artifacts: bool = True
    start_training: bool = False

    @model_validator(mode="after")
    def validate_execution_flags(self) -> HumanTrainingExecuteRequest:
        """Reject ``start_training`` when ``publish_artifacts`` is disabled."""
        if not self.start_training or self.publish_artifacts:
            return self
        raise ValueError(
            "Lai sāktu treniņu, artefakti vispirms jāpublicē dataset repozitorijā."
        )
|
|
|
|
class HumanTrainingLaunchSpec(BaseModel):
    """Training launch parameters stored in the manifest as ``training_request``.

    After validation ``hub_model_id`` and ``model_repo`` hold the same value
    (``hub_model_id`` wins when both are set).
    """

    # String inputs have surrounding whitespace stripped by pydantic.
    model_config = ConfigDict(str_strip_whitespace=True)

    dataset_repo: str
    model_repo: str
    hub_model_id: str = ""
    model_preset: str = ""
    model_name: str = ""
    num_epochs: int
    all_branches: bool
    push_to_hub: bool
    output_subdir: str
    continue_from_latest_artifact: bool = True
    continue_model_path: str = ""

    @model_validator(mode="after")
    def validate_model_target(self) -> HumanTrainingLaunchSpec:
        """Require a model target and mirror it into both repo fields."""
        target_repo = self.hub_model_id or self.model_repo
        if not target_repo:
            raise ValueError("Jānorāda hub_model_id vai model_repo.")
        self.hub_model_id = target_repo
        self.model_repo = target_repo
        return self
|
|
|
|
def resolve_human_training_stage_dir(persistent_dir: str, run_id: str) -> Path:
    """Resolve the staging directory of ``run_id`` under ``persistent_dir``.

    The result is ``<persistent_dir>/<HUMAN_TRAINING_STAGE_DIRNAME>/<run_id>``
    after user-home expansion and path resolution.

    ``run_id`` may come from a request payload, so the resolved path must be a
    strict descendant of the staging root. Checking only against the
    persistent root would accept values such as ``".."`` or ``"../other"``,
    which resolve outside the staging area while still inside storage.

    Args:
        persistent_dir: Root of the persistent storage.
        run_id: Identifier of the staged run (a single directory name).

    Returns:
        The absolute, resolved staging directory for the run.

    Raises:
        ValueError: If the staging root resolves outside ``persistent_dir``,
            or the run directory is not strictly inside the staging root.
    """
    root = Path(persistent_dir).expanduser().resolve()
    stage_root = (root / HUMAN_TRAINING_STAGE_DIRNAME).resolve()
    target = (stage_root / run_id).resolve()

    # A symlinked staging directory must not lead out of persistent storage.
    staging_outside_storage = not stage_root.is_relative_to(root)
    # The run directory must be below the staging root, never the root itself.
    run_outside_staging = target == stage_root or not target.is_relative_to(stage_root)
    if staging_outside_storage or run_outside_staging:
        raise ValueError(
            "Human training staging direktorijai jāatrodas persistent storage ietvaros."
        )
    return target
|
|
|
|
def stage_human_training_artifacts(
    request: HumanTrainingRequest,
    *,
    persistent_dir: str,
) -> dict[str, Any]:
    """Build, quality-gate and write the artifacts of one human-training run.

    A fresh run id is generated and its staging directory created. Train and
    eval records go through the quality gate, preference artifacts are derived
    when preference pairs exist, every artifact is written as JSON into the
    staging directory, and a ``manifest.json`` describing the run is written
    last.

    Args:
        request: Validated human-training request.
        persistent_dir: Root of the persistent storage used for staging.

    Returns:
        The manifest dictionary that was written to ``manifest.json``.
    """
    run_id = _new_run_id()
    stage_dir = resolve_human_training_stage_dir(persistent_dir, run_id)
    stage_dir.mkdir(parents=True, exist_ok=True)

    def artifact_repo_path(filename: str) -> str:
        # Repo location shared by everything except the train split.
        return f"artifacts/{HUMAN_TRAINING_REPO_PREFIX}/{run_id}/{filename}"

    train_records = _build_train_records(request)
    eval_records = _build_eval_records(request)
    preference_dataset = _build_preference_dataset(request)

    # --- quality gate -----------------------------------------------------
    gate_config = DatasetQualityGateConfig(min_text_chars=HUMAN_TRAINING_MIN_TEXT_CHARS)
    kept_train, train_report = apply_quality_gate_to_records(
        train_records,
        split_name="train",
        config=gate_config,
    )
    if eval_records:
        kept_eval, eval_report = apply_quality_gate_to_records(
            eval_records,
            split_name="eval",
            config=gate_config,
        )
    else:
        # No eval input: skip the gate for that split entirely.
        kept_eval, eval_report = [], None
    quality_report = build_dataset_quality_report(
        config=gate_config,
        train_report=train_report,
        eval_report=eval_report,
    ).to_dict()

    # --- preference-derived artifacts ------------------------------------
    preference_examples = [
        PreferenceExample(**row) for row in preference_dataset["preferences"]
    ]
    if preference_examples:
        preference_summary = summarize_preference_dataset(preference_examples)
        blind_review = build_blind_side_by_side_artifact(
            preference_examples,
            seed=HUMAN_TRAINING_BLIND_REVIEW_SEED,
        )
        human_eval_summary = build_human_eval_summary(preference_examples)
    else:
        preference_summary = None
        blind_review = None
        human_eval_summary = None

    # --- collect every artifact spec before anything is written ----------
    pending: dict[str, dict[str, Any]] = {}

    def add_spec(name: str, repo_path: str, payload: Any, record_count: Any) -> None:
        pending[name] = {
            "repo_path": repo_path,
            "payload": payload,
            "record_count": record_count,
        }

    add_spec(
        "train_dataset",
        f"data/{HUMAN_TRAINING_REPO_PREFIX}/{run_id}/train.json",
        kept_train,
        len(kept_train),
    )
    add_spec(
        "dataset_quality_report",
        artifact_repo_path("dataset-quality-report.json"),
        quality_report,
        quality_report["kept_records"],
    )
    if kept_eval:
        add_spec("eval_dataset", artifact_repo_path("eval.json"), kept_eval, len(kept_eval))
    if preference_dataset["preferences"]:
        add_spec(
            "preference_dataset",
            artifact_repo_path("preferences.json"),
            preference_dataset,
            len(preference_dataset["preferences"]),
        )
    if preference_summary is not None:
        add_spec(
            "preference_summary",
            artifact_repo_path("preference-summary.json"),
            preference_summary,
            preference_summary["total_examples"],
        )
    if blind_review is not None:
        add_spec(
            "blind_review",
            artifact_repo_path("blind-review.json"),
            blind_review,
            blind_review["total_pairs"],
        )
    if human_eval_summary is not None:
        add_spec(
            "human_eval_summary",
            artifact_repo_path("human-eval-summary.json"),
            human_eval_summary,
            human_eval_summary["total_examples"],
        )

    # --- write artifacts into the staging directory -----------------------
    artifacts: dict[str, dict[str, Any]] = {}
    for name, spec in pending.items():
        # Local files are flat: only the repo path's file name is reused.
        destination = stage_dir / Path(spec["repo_path"]).name
        _write_json(destination, spec["payload"])
        artifacts[name] = {
            "local_path": str(destination),
            "repo_path": spec["repo_path"],
            "record_count": spec["record_count"],
        }

    # An explicit model name takes precedence over the preset.
    effective_preset = "" if request.model_name else request.model_preset
    launch_spec = HumanTrainingLaunchSpec(
        dataset_repo=request.dataset_repo,
        model_repo=request.model_repo,
        hub_model_id=request.hub_model_id,
        model_preset=effective_preset,
        model_name=request.model_name,
        num_epochs=request.num_epochs,
        all_branches=request.all_branches,
        push_to_hub=request.push_to_hub,
        output_subdir=request.output_subdir,
        continue_from_latest_artifact=request.continue_from_latest_artifact,
        continue_model_path=request.continue_model_path,
    )

    counted_inputs = (
        "profile_facts",
        "profile_preferences",
        "response_instructions",
        "conversation_examples",
        "preference_pairs",
        "eval_examples",
    )
    manifest = {
        "artifact_type": "human-training-manifest",
        "run_id": run_id,
        "staged_at": _timestamp(),
        "stage_dir": str(stage_dir),
        "dataset_repo": request.dataset_repo,
        "model_repo": request.model_repo,
        "input_summary": {name: len(getattr(request, name)) for name in counted_inputs},
        "quality_report": quality_report,
        "artifacts": artifacts,
        "training_request": launch_spec.model_dump(),
        "ready_for_review": True,
        # Training needs at least one train record that survived the gate.
        "ready_for_training": bool(kept_train),
    }
    _write_json(stage_dir / "manifest.json", manifest)
    return manifest
|
|
|
|
def load_human_training_manifest(persistent_dir: str, run_id: str) -> dict[str, Any]:
    """Load the ``manifest.json`` of a staged run.

    Raises:
        FileNotFoundError: If the run's staging directory has no manifest file.
    """
    stage_dir = resolve_human_training_stage_dir(persistent_dir, run_id)
    manifest_file = stage_dir / "manifest.json"
    if manifest_file.is_file():
        return json.loads(manifest_file.read_text(encoding="utf-8"))
    raise FileNotFoundError(f"Human training staging ieraksts {run_id} nav atrasts.")
|
|
|
|
def publish_human_training_artifacts(
    manifest: dict[str, Any],
    *,
    save_file: Any,
) -> list[dict[str, Any]]:
    """Upload every staged artifact listed in ``manifest`` via ``save_file``.

    Args:
        manifest: Manifest dictionary containing ``dataset_repo``, ``run_id``
            and, optionally, an ``artifacts`` mapping.
        save_file: Callable invoked once per artifact with the keyword
            arguments ``repo_id``, ``repo_type``, ``path_in_repo``,
            ``content`` and ``commit_message``; it must return a mapping.

    Returns:
        One dictionary per uploaded artifact: the artifact name under
        ``"artifact"`` merged with the mapping returned by ``save_file``.
    """
    repo_id = str(manifest["dataset_repo"])
    run_id = str(manifest["run_id"])
    commit_message = f"Add human training artifacts for {run_id}"

    uploads: list[dict[str, Any]] = []
    for name, entry in manifest.get("artifacts", {}).items():
        repo_path, local_path = entry.get("repo_path"), entry.get("local_path")
        # Entries without string paths are skipped rather than treated as errors.
        if isinstance(repo_path, str) and isinstance(local_path, str):
            outcome = save_file(
                repo_id=repo_id,
                repo_type="dataset",
                path_in_repo=repo_path,
                content=Path(local_path).read_text(encoding="utf-8"),
                commit_message=commit_message,
            )
            uploads.append({"artifact": name, **outcome})
    return uploads
|
|
|
|
def build_human_training_launch_spec(manifest: dict[str, Any]) -> HumanTrainingLaunchSpec:
    """Validate the manifest's ``training_request`` entry into a launch spec."""
    stored_request = manifest["training_request"]
    return HumanTrainingLaunchSpec.model_validate(stored_request)
|
|
|
|
def _build_train_records(request: HumanTrainingRequest) -> list[dict[str, Any]]:
    """Assemble the train split: optional profile record, then conversations.

    Each conversation record carries ``user``, ``assistant`` and ``metadata``;
    ``context`` is added only when the example has a non-empty context.
    """
    profile_record = _build_profile_record(request)
    records: list[dict[str, Any]] = [] if profile_record is None else [profile_record]
    records.extend(
        {
            "user": example.user,
            "assistant": example.assistant,
            "metadata": {"source": "human_training_space"},
        }
        | ({"context": example.context} if example.context else {})
        for example in request.conversation_examples
    )
    return records
|
|
|
|
| def _build_profile_record(request: HumanTrainingRequest) -> dict[str, Any] | None: |
| sections: list[str] = [] |
| if request.profile_facts: |
| sections.append( |
| "Fakti par lietotāju:\n" + "\n".join(f"- {item}" for item in request.profile_facts) |
| ) |
| if request.profile_preferences: |
| sections.append( |
| "Lietotāja preferences:\n" |
| + "\n".join(f"- {item}" for item in request.profile_preferences) |
| ) |
| if request.response_instructions: |
| sections.append( |
| "Atbildēšanas instrukcijas:\n" |
| + "\n".join(f"- {item}" for item in request.response_instructions) |
| ) |
| if not sections: |
| return None |
| return { |
| "prompt": "Iegaumē šo lietotāja profilu un atbildē saskaņā ar to turpmākajās sarunās.", |
| "completion": "\n\n".join(sections), |
| "metadata": { |
| "source": "human_training_space", |
| "artifact": "profile_memory", |
| }, |
| } |
|
|
|
|
| def _build_eval_records(request: HumanTrainingRequest) -> list[dict[str, Any]]: |
| records: list[dict[str, Any]] = [] |
| for example in request.eval_examples: |
| record = { |
| "prompt": example.prompt, |
| "completion": example.completion, |
| "metadata": {"source": "human_training_space", "artifact": "eval"}, |
| } |
| if example.context: |
| record["context"] = example.context |
| records.append(record) |
| return records |
|
|
|
|
| def _build_preference_dataset(request: HumanTrainingRequest) -> dict[str, Any]: |
| preferences: list[dict[str, Any]] = [] |
| for index, pair in enumerate(request.preference_pairs, start=1): |
| item = { |
| "prompt": pair.prompt, |
| "chosen": pair.chosen, |
| "rejected": pair.rejected, |
| "context": pair.context, |
| "source": "human_training_space", |
| "source_type": "real_reviewer", |
| "reviewer_segment": "self-training", |
| "preference_outcome": "chosen", |
| "confidence": pair.confidence, |
| "pair_id": f"human-training-{index:04d}", |
| "blind": True, |
| "production_like": True, |
| "tags": ["human-training"], |
| } |
| preferences.append(item) |
| return { |
| "artifact_type": "human-training-preferences", |
| "preferences": preferences, |
| } |
|
|
|
|
| def _write_json(path: Path, payload: Any) -> None: |
| path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") |
|
|