"""Tests for the training quality gate integration."""
|
|
| from __future__ import annotations |
|
|
| import json |
| import sys |
| import types |
| from pathlib import Path |
|
|
| from maris_core.training.train import train |
|
|
|
|
def test_train_writes_dataset_quality_and_scoring_reports(
    tmp_path: Path,
    monkeypatch,
) -> None:
    """Check that ``train`` writes quality and scoring reports for the train split.

    The dataset loader and the ``transformers`` module are replaced with
    in-memory fakes, so only the data-preparation path of ``train`` runs.
    The fixture holds four rows: an exact duplicate pair, one row whose
    assistant text equals the user text, and one distinct pair. The
    assertions expect two kept rows, one duplicate removal and one
    ``invalid_conversation_pair`` rejection (presumably the echo row), and
    four rows handed to the trainer after score-based repetition.
    """

    class FakeSplit(list):
        """List-backed stand-in for a dataset split with a batched ``map``."""

        column_names = ["user", "assistant"]

        def map(self, function, **kwargs):
            # Mapping options passed by the caller are accepted and ignored.
            del kwargs
            columns = sorted({column for row in self for column in row})
            # Rows -> column-oriented batch; missing cells become None.
            batch = {column: [row.get(column) for row in self] for column in columns}
            mapped = function(batch)
            row_count = len(next(iter(mapped.values()))) if mapped else 0
            # Column-oriented result -> rows again.
            rows = [
                {column: values[position] for column, values in mapped.items()}
                for position in range(row_count)
            ]
            return FakeSplit(rows)

    class FakeTokenizer:
        """Tokenizer stub with no pad token; yields one token id per text."""

        pad_token = None
        eos_token = "<eos>"
        pad_token_id = None
        eos_token_id = 1

        @classmethod
        def from_pretrained(cls, model_name):
            del model_name
            return cls()

        def __call__(self, texts, **kwargs):
            del kwargs
            input_ids = [[position] for position, _ in enumerate(texts, start=1)]
            attention_mask = [[1] for _ in texts]
            return {"input_ids": input_ids, "attention_mask": attention_mask}

        def save_pretrained(self, output_dir):
            target = Path(output_dir)
            target.mkdir(parents=True, exist_ok=True)
            for file_name in ("tokenizer.json", "tokenizer_config.json"):
                (target / file_name).write_text("{}", encoding="utf-8")

    class FakeModel:
        """Model stub exposing only a ``config.pad_token_id`` attribute."""

        config = types.SimpleNamespace(pad_token_id=None)

        @classmethod
        def from_pretrained(cls, model_name):
            del model_name
            return cls()

    class FakeTrainingArguments:
        """Records the keyword arguments it was constructed with."""

        def __init__(self, **kwargs):
            self.kwargs = kwargs

    class FakeTrainer:
        """Trainer stub that remembers the last train dataset it received."""

        last_train_dataset = None

        def __init__(self, *, model, args, train_dataset, eval_dataset=None, data_collator=None):
            del model, args, eval_dataset, data_collator
            self.train_dataset = train_dataset
            FakeTrainer.last_train_dataset = train_dataset

        def train(self):
            return types.SimpleNamespace(metrics={"train_loss": 0.1})

        def save_model(self, output_dir):
            target = Path(output_dir)
            target.mkdir(parents=True, exist_ok=True)
            for file_name, content in (("config.json", "{}"), ("model.safetensors", "ok")):
                (target / file_name).write_text(content, encoding="utf-8")

    greeting_row = {"user": "Sveiki, Maris!", "assistant": "Sveiki! Kā varu palīdzēt šodien?"}
    train_rows = [
        greeting_row,
        dict(greeting_row),  # exact duplicate of the first row, as a separate object
        {"user": "Atkārto mani", "assistant": "Atkārto mani"},
        {
            "user": "Pastāsti man par realtime voice assistant arhitektūru.",
            "assistant": "Varu palīdzēt ar LiveKit, STT, LLM un TTS plūsmu.",
        },
    ]
    fake_dataset = {"train": FakeSplit(train_rows)}

    monkeypatch.setattr("maris_core.training.train.load_hf_dataset", lambda _: fake_dataset)

    fake_transformers = types.SimpleNamespace(
        AutoModelForCausalLM=FakeModel,
        AutoTokenizer=FakeTokenizer,
        DataCollatorForLanguageModeling=lambda **kwargs: kwargs,
        Trainer=FakeTrainer,
        TrainingArguments=FakeTrainingArguments,
    )
    monkeypatch.setitem(sys.modules, "transformers", fake_transformers)

    output_dir = tmp_path / "model"
    metrics = train(output_dir=str(output_dir), validation_split_ratio=0)

    quality_path = output_dir / "dataset-quality-report.json"
    scoring_path = output_dir / "dataset-scoring-report.json"
    quality_report = json.loads(quality_path.read_text(encoding="utf-8"))
    scoring_report = json.loads(scoring_path.read_text(encoding="utf-8"))

    assert metrics["quality_train_kept"] == 2.0
    assert metrics["quality_train_dropped"] == 2.0
    assert metrics["quality_train_duplicates_removed"] == 1.0
    assert metrics["scoring_train_average_score"] > 0.0
    assert metrics["scoring_train_expanded_records"] == 4.0
    assert metrics["scoring_train_repeated_records"] == 2.0

    train_quality = quality_report["splits"]["train"]
    assert train_quality["kept_records"] == 2
    assert train_quality["duplicates_removed"] == 1
    assert train_quality["reasons"]["invalid_conversation_pair"] == 1

    train_scoring = scoring_report["splits"]["train"]
    assert train_scoring["expanded_records"] == 4
    assert train_scoring["repeated_records"] == 2

    assert len(FakeTrainer.last_train_dataset) == 4
|
|
|
|
def test_train_skips_eval_when_quality_gate_filters_all_eval_rows(
    tmp_path: Path,
    monkeypatch,
) -> None:
    """Check that ``train`` disables evaluation when no validation row survives.

    The fake dataset has two train rows and a validation split with two
    rows: one where user and assistant are both ``"test"`` and one that only
    carries ``{"text": "todo"}``. The assertions expect both validation rows
    to be dropped (one ``invalid_conversation_pair``, one
    ``placeholder_text``), the trainer to receive ``eval_dataset=None`` and
    the training arguments to carry ``evaluation_strategy == "no"``.
    """

    class FakeSplit(list):
        """List-backed stand-in for a dataset split with a batched ``map``."""

        column_names = ["user", "assistant"]

        def map(self, function, **kwargs):
            # Mapping options passed by the caller are accepted and ignored.
            del kwargs
            columns = sorted({column for row in self for column in row})
            # Rows -> column-oriented batch; missing cells become None.
            batch = {column: [row.get(column) for row in self] for column in columns}
            mapped = function(batch)
            row_count = len(next(iter(mapped.values()))) if mapped else 0
            # Column-oriented result -> rows again.
            rows = [
                {column: values[position] for column, values in mapped.items()}
                for position in range(row_count)
            ]
            return FakeSplit(rows)

    class FakeTokenizer:
        """Tokenizer stub with no pad token; yields one token id per text."""

        pad_token = None
        eos_token = "<eos>"
        pad_token_id = None
        eos_token_id = 1

        @classmethod
        def from_pretrained(cls, model_name):
            del model_name
            return cls()

        def __call__(self, texts, **kwargs):
            del kwargs
            input_ids = [[position] for position, _ in enumerate(texts, start=1)]
            attention_mask = [[1] for _ in texts]
            return {"input_ids": input_ids, "attention_mask": attention_mask}

        def save_pretrained(self, output_dir):
            target = Path(output_dir)
            target.mkdir(parents=True, exist_ok=True)
            for file_name in ("tokenizer.json", "tokenizer_config.json"):
                (target / file_name).write_text("{}", encoding="utf-8")

    class FakeModel:
        """Model stub exposing only a ``config.pad_token_id`` attribute."""

        config = types.SimpleNamespace(pad_token_id=None)

        @classmethod
        def from_pretrained(cls, model_name):
            del model_name
            return cls()

    class FakeTrainingArguments:
        """Records the keyword arguments it was constructed with."""

        def __init__(self, **kwargs):
            self.kwargs = kwargs

    class FakeTrainer:
        """Trainer stub that remembers the last args and eval dataset it got."""

        # "unset" (not None) so the test can tell "never constructed" apart
        # from "constructed with eval_dataset=None".
        last_eval_dataset = "unset"
        last_args = None

        def __init__(self, *, model, args, train_dataset, eval_dataset=None, data_collator=None):
            del model, train_dataset, data_collator
            self.args = args
            FakeTrainer.last_args = args
            FakeTrainer.last_eval_dataset = eval_dataset

        def train(self):
            return types.SimpleNamespace(metrics={"train_loss": 0.2})

        def save_model(self, output_dir):
            target = Path(output_dir)
            target.mkdir(parents=True, exist_ok=True)
            for file_name, content in (("config.json", "{}"), ("model.safetensors", "ok")):
                (target / file_name).write_text(content, encoding="utf-8")

    train_rows = [
        {
            "user": "Pastāsti par latviešu valodas asistentu arhitektūru.",
            "assistant": "Varu palīdzēt ar STT, LLM, atmiņu un TTS plūsmu.",
        },
        {
            "user": "Izskaidro retrieval pipeline.",
            "assistant": "Retrieval pipeline indeksē, sameklē un pievieno kontekstu atbildei.",
        },
    ]
    validation_rows = [
        {"user": "test", "assistant": "test"},
        {"text": "todo"},
    ]
    fake_dataset = {
        "train": FakeSplit(train_rows),
        "validation": FakeSplit(validation_rows),
    }

    monkeypatch.setattr("maris_core.training.train.load_hf_dataset", lambda _: fake_dataset)

    fake_transformers = types.SimpleNamespace(
        AutoModelForCausalLM=FakeModel,
        AutoTokenizer=FakeTokenizer,
        DataCollatorForLanguageModeling=lambda **kwargs: kwargs,
        Trainer=FakeTrainer,
        TrainingArguments=FakeTrainingArguments,
    )
    monkeypatch.setitem(sys.modules, "transformers", fake_transformers)

    output_dir = tmp_path / "model"
    metrics = train(output_dir=str(output_dir), validation_split_ratio=0)

    quality_path = output_dir / "dataset-quality-report.json"
    quality_report = json.loads(quality_path.read_text(encoding="utf-8"))

    assert FakeTrainer.last_eval_dataset is None
    assert FakeTrainer.last_args.kwargs["evaluation_strategy"] == "no"
    assert metrics["train_loss"] == 0.2
    assert metrics["quality_eval_kept"] == 0.0
    assert metrics["quality_eval_dropped"] == 2.0
    assert metrics["quality_eval_skipped"] == 1.0

    eval_quality = quality_report["splits"]["eval"]
    assert eval_quality["kept_records"] == 0
    assert eval_quality["reasons"]["invalid_conversation_pair"] == 1
    assert eval_quality["reasons"]["placeholder_text"] == 1
|
|
|
|
def test_train_uses_source_aware_and_benchmark_feedback_weighting(
    tmp_path: Path,
    monkeypatch,
) -> None:
    """Check scoring output when an explicit benchmark feedback file is given.

    Two prompt rows are supplied, tagged ``production``/``reasoning`` and
    ``noisy``/``coding``. A previous benchmark result with a low
    ``reasoning`` score (0.4) is written to disk and passed through
    ``benchmark_feedback_path``. The assertions expect at least one
    feedback-boosted record, per-tier and per-category counts in the scoring
    report and dashboard, ``discovery_mode == "explicit"``, and more than
    two rows reaching the trainer.
    """

    class FakeSplit(list):
        """List-backed stand-in for a dataset split with a batched ``map``."""

        column_names = ["prompt", "metadata"]

        def map(self, function, **kwargs):
            # Mapping options passed by the caller are accepted and ignored.
            del kwargs
            columns = sorted({column for row in self for column in row})
            # Rows -> column-oriented batch; missing cells become None.
            batch = {column: [row.get(column) for row in self] for column in columns}
            mapped = function(batch)
            row_count = len(next(iter(mapped.values()))) if mapped else 0
            # Column-oriented result -> rows again.
            rows = [
                {column: values[position] for column, values in mapped.items()}
                for position in range(row_count)
            ]
            return FakeSplit(rows)

    class FakeTokenizer:
        """Tokenizer stub with no pad token; yields one token id per text."""

        pad_token = None
        eos_token = "<eos>"
        pad_token_id = None
        eos_token_id = 1

        @classmethod
        def from_pretrained(cls, model_name):
            del model_name
            return cls()

        def __call__(self, texts, **kwargs):
            del kwargs
            input_ids = [[position] for position, _ in enumerate(texts, start=1)]
            attention_mask = [[1] for _ in texts]
            return {"input_ids": input_ids, "attention_mask": attention_mask}

        def save_pretrained(self, output_dir):
            target = Path(output_dir)
            target.mkdir(parents=True, exist_ok=True)
            for file_name in ("tokenizer.json", "tokenizer_config.json"):
                (target / file_name).write_text("{}", encoding="utf-8")

    class FakeModel:
        """Model stub exposing only a ``config.pad_token_id`` attribute."""

        config = types.SimpleNamespace(pad_token_id=None)

        @classmethod
        def from_pretrained(cls, model_name):
            del model_name
            return cls()

    class FakeTrainingArguments:
        """Records the keyword arguments it was constructed with."""

        def __init__(self, **kwargs):
            self.kwargs = kwargs

    class FakeTrainer:
        """Trainer stub that remembers the last train dataset it received."""

        last_train_dataset = None

        def __init__(self, *, model, args, train_dataset, eval_dataset=None, data_collator=None):
            del model, args, eval_dataset, data_collator
            self.train_dataset = train_dataset
            FakeTrainer.last_train_dataset = train_dataset

        def train(self):
            return types.SimpleNamespace(metrics={"train_loss": 0.2})

        def save_model(self, output_dir):
            target = Path(output_dir)
            target.mkdir(parents=True, exist_ok=True)
            for file_name, content in (("config.json", "{}"), ("model.safetensors", "ok")):
                (target / file_name).write_text(content, encoding="utf-8")

    train_rows = [
        {
            "prompt": "Analizē sistēmas reasoning kompromisus.",
            "metadata": {"source_tier": "production", "category": "reasoning"},
        },
        {
            "prompt": "Uzraksti īsu kodu piemēru.",
            "metadata": {"source_tier": "noisy", "category": "coding"},
        },
    ]
    fake_dataset = {"train": FakeSplit(train_rows)}

    # Previous benchmark scores handed to train() explicitly.
    feedback_payload = {
        "score_manifest": {
            "overall": 0.63,
            "reasoning": 0.4,
            "coding": 0.78,
        }
    }
    benchmark_feedback_path = tmp_path / "previous-benchmark.json"
    benchmark_feedback_path.write_text(json.dumps(feedback_payload), encoding="utf-8")

    monkeypatch.setattr("maris_core.training.train.load_hf_dataset", lambda _: fake_dataset)

    fake_transformers = types.SimpleNamespace(
        AutoModelForCausalLM=FakeModel,
        AutoTokenizer=FakeTokenizer,
        DataCollatorForLanguageModeling=lambda **kwargs: kwargs,
        Trainer=FakeTrainer,
        TrainingArguments=FakeTrainingArguments,
    )
    monkeypatch.setitem(sys.modules, "transformers", fake_transformers)

    output_dir = tmp_path / "model"
    metrics = train(
        output_dir=str(output_dir),
        validation_split_ratio=0,
        benchmark_feedback_path=str(benchmark_feedback_path),
    )

    scoring_path = output_dir / "dataset-scoring-report.json"
    metrics_path = output_dir / "training-metrics.json"
    scoring_report = json.loads(scoring_path.read_text(encoding="utf-8"))
    training_metrics = json.loads(metrics_path.read_text(encoding="utf-8"))

    assert metrics["scoring_train_feedback_boosted_records"] >= 1.0
    assert metrics["scoring_train_average_repeat_multiplier"] > 1.0

    train_scoring = scoring_report["splits"]["train"]
    assert train_scoring["source_tiers"]["production"] == 1
    assert train_scoring["source_tiers"]["noisy"] == 1
    assert train_scoring["feedback_metric_hits"]["reasoning"] >= 1

    train_dashboard = training_metrics["scoring_dashboard"]["train"]
    assert train_dashboard["sources"]["production"]["records"] == 1
    assert train_dashboard["categories"]["reasoning"]["records"] == 1

    assert training_metrics["scoring_dashboard_train_sources_production_records"] == 1.0
    assert training_metrics["scoring_dashboard_train_categories_reasoning_boosted_records"] >= 1.0
    assert training_metrics["benchmark_feedback"]["discovery_mode"] == "explicit"
    assert len(FakeTrainer.last_train_dataset) > 2
|
|
|
|
def test_train_auto_discovers_previous_benchmark_feedback(
    tmp_path: Path,
    monkeypatch,
) -> None:
    """Check that ``train`` finds a sibling run's benchmark feedback by itself.

    A ``benchmark-feedback.json`` artifact is written to
    ``<tmp>/runs/previous`` and training is run with
    ``output_dir=<tmp>/runs/current`` and no ``benchmark_feedback_path``.
    The assertions expect at least one feedback-boosted record, a
    ``discovery_mode`` of ``"auto_discovered"``, and a reported
    ``artifact_path`` that ends in ``previous/benchmark-feedback.json``.
    """

    class FakeSplit(list):
        """List-backed stand-in for a dataset split with a batched ``map``."""

        column_names = ["prompt", "metadata"]

        def map(self, function, **kwargs):
            # Mapping options passed by the caller are accepted and ignored.
            del kwargs
            keys = sorted({key for item in self for key in item})
            # Rows -> column-oriented batch; missing cells become None.
            batch = {key: [item.get(key) for item in self] for key in keys}
            mapped = function(batch)
            size = len(next(iter(mapped.values()))) if mapped else 0
            # Column-oriented result -> rows again.
            return FakeSplit(
                [{key: value[index] for key, value in mapped.items()} for index in range(size)]
            )

    fake_dataset = {
        "train": FakeSplit(
            [
                {
                    "prompt": "Analizē sistēmas reasoning kompromisus.",
                    "metadata": {"source_tier": "production", "category": "reasoning"},
                }
            ]
        )
    }

    # Feedback artifact left behind by an earlier run, next to the new output dir.
    previous_run_dir = tmp_path / "runs" / "previous"
    previous_run_dir.mkdir(parents=True)
    (previous_run_dir / "benchmark-feedback.json").write_text(
        json.dumps(
            {
                "artifact_type": "benchmark-feedback-reweighting",
                "artifact_path": str(previous_run_dir / "benchmark-feedback.json"),
                "overall_multiplier": 1.2,
                "deficient_metrics": {
                    "reasoning": {
                        "target": 0.7,
                        "actual": 0.4,
                        "deficit": 0.3,
                        "multiplier": 1.6,
                    }
                },
            }
        ),
        encoding="utf-8",
    )

    monkeypatch.setattr("maris_core.training.train.load_hf_dataset", lambda _: fake_dataset)

    class FakeTokenizer:
        """Tokenizer stub with no pad token; yields one token id per text."""

        pad_token = None
        eos_token = "<eos>"
        pad_token_id = None
        eos_token_id = 1

        @classmethod
        def from_pretrained(cls, model_name):
            del model_name
            return cls()

        def __call__(self, texts, **kwargs):
            del kwargs
            return {
                "input_ids": [[index + 1] for index, _ in enumerate(texts)],
                "attention_mask": [[1] for _ in texts],
            }

        def save_pretrained(self, output_dir):
            Path(output_dir).mkdir(parents=True, exist_ok=True)
            Path(output_dir, "tokenizer.json").write_text("{}", encoding="utf-8")
            Path(output_dir, "tokenizer_config.json").write_text("{}", encoding="utf-8")

    class FakeModel:
        """Model stub exposing only a ``config.pad_token_id`` attribute."""

        config = types.SimpleNamespace(pad_token_id=None)

        @classmethod
        def from_pretrained(cls, model_name):
            del model_name
            return cls()

    class FakeTrainingArguments:
        """Records the keyword arguments it was constructed with."""

        def __init__(self, **kwargs):
            self.kwargs = kwargs

    class FakeTrainer:
        """Trainer stub that remembers the last train dataset it received."""

        last_train_dataset = None

        def __init__(self, *, model, args, train_dataset, eval_dataset=None, data_collator=None):
            del model, args, eval_dataset, data_collator
            FakeTrainer.last_train_dataset = train_dataset

        def train(self):
            return types.SimpleNamespace(metrics={"train_loss": 0.2})

        def save_model(self, output_dir):
            Path(output_dir).mkdir(parents=True, exist_ok=True)
            Path(output_dir, "config.json").write_text("{}", encoding="utf-8")
            Path(output_dir, "model.safetensors").write_text("ok", encoding="utf-8")

    monkeypatch.setitem(
        sys.modules,
        "transformers",
        types.SimpleNamespace(
            AutoModelForCausalLM=FakeModel,
            AutoTokenizer=FakeTokenizer,
            DataCollatorForLanguageModeling=lambda **kwargs: kwargs,
            Trainer=FakeTrainer,
            TrainingArguments=FakeTrainingArguments,
        ),
    )

    output_dir = tmp_path / "runs" / "current"
    metrics = train(output_dir=str(output_dir), validation_split_ratio=0)
    training_metrics = json.loads(
        (output_dir / "training-metrics.json").read_text(encoding="utf-8")
    )

    assert metrics["scoring_train_feedback_boosted_records"] >= 1.0
    assert training_metrics["benchmark_feedback"]["discovery_mode"] == "auto_discovered"
    assert training_metrics["scoring_dashboard_train_sources_production_records"] == 1.0

    # Normalise separators before the suffix check: a path rendered with
    # str(Path) uses backslashes on Windows, which a literal "/" suffix
    # would never match.
    reported_artifact_path = Path(training_metrics["benchmark_feedback"]["artifact_path"])
    assert reported_artifact_path.as_posix().endswith("previous/benchmark-feedback.json")
|
|