"""Tests training quality gate integrācijai.""" from __future__ import annotations import json import sys import types from pathlib import Path from maris_core.training.train import train def test_train_writes_dataset_quality_and_scoring_reports( tmp_path: Path, monkeypatch, ) -> None: class FakeSplit(list): column_names = ["user", "assistant"] def map(self, function, **kwargs): del kwargs keys = sorted({key for item in self for key in item}) batch = {key: [item.get(key) for item in self] for key in keys} mapped = function(batch) size = len(next(iter(mapped.values()))) if mapped else 0 return FakeSplit( [{key: value[index] for key, value in mapped.items()} for index in range(size)] ) fake_dataset = { "train": FakeSplit( [ {"user": "Sveiki, Maris!", "assistant": "Sveiki! Kā varu palīdzēt šodien?"}, {"user": "Sveiki, Maris!", "assistant": "Sveiki! Kā varu palīdzēt šodien?"}, {"user": "Atkārto mani", "assistant": "Atkārto mani"}, { "user": "Pastāsti man par realtime voice assistant arhitektūru.", "assistant": "Varu palīdzēt ar LiveKit, STT, LLM un TTS plūsmu.", }, ] ) } monkeypatch.setattr("maris_core.training.train.load_hf_dataset", lambda _: fake_dataset) class FakeTokenizer: pad_token = None eos_token = "" pad_token_id = None eos_token_id = 1 @classmethod def from_pretrained(cls, model_name): del model_name return cls() def __call__(self, texts, **kwargs): del kwargs return { "input_ids": [[index + 1] for index, _ in enumerate(texts)], "attention_mask": [[1] for _ in texts], } def save_pretrained(self, output_dir): Path(output_dir).mkdir(parents=True, exist_ok=True) Path(output_dir, "tokenizer.json").write_text("{}", encoding="utf-8") Path(output_dir, "tokenizer_config.json").write_text("{}", encoding="utf-8") class FakeModel: config = types.SimpleNamespace(pad_token_id=None) @classmethod def from_pretrained(cls, model_name): del model_name return cls() class FakeTrainingArguments: def __init__(self, **kwargs): self.kwargs = kwargs class FakeTrainer: last_train_dataset = None def __init__(self, *, model, args, train_dataset, eval_dataset=None, data_collator=None): del model, args, eval_dataset, data_collator self.train_dataset = train_dataset FakeTrainer.last_train_dataset = train_dataset def train(self): return types.SimpleNamespace(metrics={"train_loss": 0.1}) def save_model(self, output_dir): Path(output_dir).mkdir(parents=True, exist_ok=True) Path(output_dir, "config.json").write_text("{}", encoding="utf-8") Path(output_dir, "model.safetensors").write_text("ok", encoding="utf-8") monkeypatch.setitem( sys.modules, "transformers", types.SimpleNamespace( AutoModelForCausalLM=FakeModel, AutoTokenizer=FakeTokenizer, DataCollatorForLanguageModeling=lambda **kwargs: kwargs, Trainer=FakeTrainer, TrainingArguments=FakeTrainingArguments, ), ) output_dir = tmp_path / "model" metrics = train(output_dir=str(output_dir), validation_split_ratio=0) quality_report = json.loads( (output_dir / "dataset-quality-report.json").read_text(encoding="utf-8") ) scoring_report = json.loads( (output_dir / "dataset-scoring-report.json").read_text(encoding="utf-8") ) assert metrics["quality_train_kept"] == 2.0 assert metrics["quality_train_dropped"] == 2.0 assert metrics["quality_train_duplicates_removed"] == 1.0 assert metrics["scoring_train_average_score"] > 0.0 assert metrics["scoring_train_expanded_records"] == 4.0 assert metrics["scoring_train_repeated_records"] == 2.0 assert quality_report["splits"]["train"]["kept_records"] == 2 assert quality_report["splits"]["train"]["duplicates_removed"] == 1 assert quality_report["splits"]["train"]["reasons"]["invalid_conversation_pair"] == 1 assert scoring_report["splits"]["train"]["expanded_records"] == 4 assert scoring_report["splits"]["train"]["repeated_records"] == 2 assert len(FakeTrainer.last_train_dataset) == 4 def test_train_skips_eval_when_quality_gate_filters_all_eval_rows( tmp_path: Path, monkeypatch, ) -> None: class FakeSplit(list): column_names = ["user", "assistant"] def map(self, function, **kwargs): del kwargs keys = sorted({key for item in self for key in item}) batch = {key: [item.get(key) for item in self] for key in keys} mapped = function(batch) size = len(next(iter(mapped.values()))) if mapped else 0 return FakeSplit( [{key: value[index] for key, value in mapped.items()} for index in range(size)] ) fake_dataset = { "train": FakeSplit( [ { "user": "Pastāsti par latviešu valodas asistentu arhitektūru.", "assistant": "Varu palīdzēt ar STT, LLM, atmiņu un TTS plūsmu.", }, { "user": "Izskaidro retrieval pipeline.", "assistant": "Retrieval pipeline indeksē, sameklē un pievieno kontekstu atbildei.", }, ] ), "validation": FakeSplit( [ {"user": "test", "assistant": "test"}, {"text": "todo"}, ] ), } monkeypatch.setattr("maris_core.training.train.load_hf_dataset", lambda _: fake_dataset) class FakeTokenizer: pad_token = None eos_token = "" pad_token_id = None eos_token_id = 1 @classmethod def from_pretrained(cls, model_name): del model_name return cls() def __call__(self, texts, **kwargs): del kwargs return { "input_ids": [[index + 1] for index, _ in enumerate(texts)], "attention_mask": [[1] for _ in texts], } def save_pretrained(self, output_dir): Path(output_dir).mkdir(parents=True, exist_ok=True) Path(output_dir, "tokenizer.json").write_text("{}", encoding="utf-8") Path(output_dir, "tokenizer_config.json").write_text("{}", encoding="utf-8") class FakeModel: config = types.SimpleNamespace(pad_token_id=None) @classmethod def from_pretrained(cls, model_name): del model_name return cls() class FakeTrainingArguments: def __init__(self, **kwargs): self.kwargs = kwargs class FakeTrainer: last_eval_dataset = "unset" last_args = None def __init__(self, *, model, args, train_dataset, eval_dataset=None, data_collator=None): del model, train_dataset, data_collator self.args = args FakeTrainer.last_args = args FakeTrainer.last_eval_dataset = eval_dataset def train(self): return types.SimpleNamespace(metrics={"train_loss": 0.2}) def save_model(self, output_dir): Path(output_dir).mkdir(parents=True, exist_ok=True) Path(output_dir, "config.json").write_text("{}", encoding="utf-8") Path(output_dir, "model.safetensors").write_text("ok", encoding="utf-8") monkeypatch.setitem( sys.modules, "transformers", types.SimpleNamespace( AutoModelForCausalLM=FakeModel, AutoTokenizer=FakeTokenizer, DataCollatorForLanguageModeling=lambda **kwargs: kwargs, Trainer=FakeTrainer, TrainingArguments=FakeTrainingArguments, ), ) output_dir = tmp_path / "model" metrics = train(output_dir=str(output_dir), validation_split_ratio=0) quality_report = json.loads( (output_dir / "dataset-quality-report.json").read_text(encoding="utf-8") ) assert FakeTrainer.last_eval_dataset is None assert FakeTrainer.last_args.kwargs["evaluation_strategy"] == "no" assert metrics["train_loss"] == 0.2 assert metrics["quality_eval_kept"] == 0.0 assert metrics["quality_eval_dropped"] == 2.0 assert metrics["quality_eval_skipped"] == 1.0 assert quality_report["splits"]["eval"]["kept_records"] == 0 assert quality_report["splits"]["eval"]["reasons"]["invalid_conversation_pair"] == 1 assert quality_report["splits"]["eval"]["reasons"]["placeholder_text"] == 1 def test_train_uses_source_aware_and_benchmark_feedback_weighting( tmp_path: Path, monkeypatch, ) -> None: class FakeSplit(list): column_names = ["prompt", "metadata"] def map(self, function, **kwargs): del kwargs keys = sorted({key for item in self for key in item}) batch = {key: [item.get(key) for item in self] for key in keys} mapped = function(batch) size = len(next(iter(mapped.values()))) if mapped else 0 return FakeSplit( [{key: value[index] for key, value in mapped.items()} for index in range(size)] ) fake_dataset = { "train": FakeSplit( [ { "prompt": "Analizē sistēmas reasoning kompromisus.", "metadata": {"source_tier": "production", "category": "reasoning"}, }, { "prompt": "Uzraksti īsu kodu piemēru.", "metadata": {"source_tier": "noisy", "category": "coding"}, }, ] ) } benchmark_feedback_path = tmp_path / "previous-benchmark.json" benchmark_feedback_path.write_text( json.dumps( { "score_manifest": { "overall": 0.63, "reasoning": 0.4, "coding": 0.78, } } ), encoding="utf-8", ) monkeypatch.setattr("maris_core.training.train.load_hf_dataset", lambda _: fake_dataset) class FakeTokenizer: pad_token = None eos_token = "" pad_token_id = None eos_token_id = 1 @classmethod def from_pretrained(cls, model_name): del model_name return cls() def __call__(self, texts, **kwargs): del kwargs return { "input_ids": [[index + 1] for index, _ in enumerate(texts)], "attention_mask": [[1] for _ in texts], } def save_pretrained(self, output_dir): Path(output_dir).mkdir(parents=True, exist_ok=True) Path(output_dir, "tokenizer.json").write_text("{}", encoding="utf-8") Path(output_dir, "tokenizer_config.json").write_text("{}", encoding="utf-8") class FakeModel: config = types.SimpleNamespace(pad_token_id=None) @classmethod def from_pretrained(cls, model_name): del model_name return cls() class FakeTrainingArguments: def __init__(self, **kwargs): self.kwargs = kwargs class FakeTrainer: last_train_dataset = None def __init__(self, *, model, args, train_dataset, eval_dataset=None, data_collator=None): del model, args, eval_dataset, data_collator self.train_dataset = train_dataset FakeTrainer.last_train_dataset = train_dataset def train(self): return types.SimpleNamespace(metrics={"train_loss": 0.2}) def save_model(self, output_dir): Path(output_dir).mkdir(parents=True, exist_ok=True) Path(output_dir, "config.json").write_text("{}", encoding="utf-8") Path(output_dir, "model.safetensors").write_text("ok", encoding="utf-8") monkeypatch.setitem( sys.modules, "transformers", types.SimpleNamespace( AutoModelForCausalLM=FakeModel, AutoTokenizer=FakeTokenizer, DataCollatorForLanguageModeling=lambda **kwargs: kwargs, Trainer=FakeTrainer, TrainingArguments=FakeTrainingArguments, ), ) output_dir = tmp_path / "model" metrics = train( output_dir=str(output_dir), validation_split_ratio=0, benchmark_feedback_path=str(benchmark_feedback_path), ) scoring_report = json.loads( (output_dir / "dataset-scoring-report.json").read_text(encoding="utf-8") ) training_metrics = json.loads( (output_dir / "training-metrics.json").read_text(encoding="utf-8") ) assert metrics["scoring_train_feedback_boosted_records"] >= 1.0 assert metrics["scoring_train_average_repeat_multiplier"] > 1.0 assert scoring_report["splits"]["train"]["source_tiers"]["production"] == 1 assert scoring_report["splits"]["train"]["source_tiers"]["noisy"] == 1 assert scoring_report["splits"]["train"]["feedback_metric_hits"]["reasoning"] >= 1 assert training_metrics["scoring_dashboard"]["train"]["sources"]["production"]["records"] == 1 assert training_metrics["scoring_dashboard"]["train"]["categories"]["reasoning"]["records"] == 1 assert training_metrics["scoring_dashboard_train_sources_production_records"] == 1.0 assert training_metrics["scoring_dashboard_train_categories_reasoning_boosted_records"] >= 1.0 assert training_metrics["benchmark_feedback"]["discovery_mode"] == "explicit" assert len(FakeTrainer.last_train_dataset) > 2 def test_train_auto_discovers_previous_benchmark_feedback( tmp_path: Path, monkeypatch, ) -> None: class FakeSplit(list): column_names = ["prompt", "metadata"] def map(self, function, **kwargs): del kwargs keys = sorted({key for item in self for key in item}) batch = {key: [item.get(key) for item in self] for key in keys} mapped = function(batch) size = len(next(iter(mapped.values()))) if mapped else 0 return FakeSplit( [{key: value[index] for key, value in mapped.items()} for index in range(size)] ) fake_dataset = { "train": FakeSplit( [ { "prompt": "Analizē sistēmas reasoning kompromisus.", "metadata": {"source_tier": "production", "category": "reasoning"}, } ] ) } previous_run_dir = tmp_path / "runs" / "previous" previous_run_dir.mkdir(parents=True) (previous_run_dir / "benchmark-feedback.json").write_text( json.dumps( { "artifact_type": "benchmark-feedback-reweighting", "artifact_path": str(previous_run_dir / "benchmark-feedback.json"), "overall_multiplier": 1.2, "deficient_metrics": { "reasoning": { "target": 0.7, "actual": 0.4, "deficit": 0.3, "multiplier": 1.6, } }, } ), encoding="utf-8", ) monkeypatch.setattr("maris_core.training.train.load_hf_dataset", lambda _: fake_dataset) class FakeTokenizer: pad_token = None eos_token = "" pad_token_id = None eos_token_id = 1 @classmethod def from_pretrained(cls, model_name): del model_name return cls() def __call__(self, texts, **kwargs): del kwargs return { "input_ids": [[index + 1] for index, _ in enumerate(texts)], "attention_mask": [[1] for _ in texts], } def save_pretrained(self, output_dir): Path(output_dir).mkdir(parents=True, exist_ok=True) Path(output_dir, "tokenizer.json").write_text("{}", encoding="utf-8") Path(output_dir, "tokenizer_config.json").write_text("{}", encoding="utf-8") class FakeModel: config = types.SimpleNamespace(pad_token_id=None) @classmethod def from_pretrained(cls, model_name): del model_name return cls() class FakeTrainingArguments: def __init__(self, **kwargs): self.kwargs = kwargs class FakeTrainer: last_train_dataset = None def __init__(self, *, model, args, train_dataset, eval_dataset=None, data_collator=None): del model, args, eval_dataset, data_collator FakeTrainer.last_train_dataset = train_dataset def train(self): return types.SimpleNamespace(metrics={"train_loss": 0.2}) def save_model(self, output_dir): Path(output_dir).mkdir(parents=True, exist_ok=True) Path(output_dir, "config.json").write_text("{}", encoding="utf-8") Path(output_dir, "model.safetensors").write_text("ok", encoding="utf-8") monkeypatch.setitem( sys.modules, "transformers", types.SimpleNamespace( AutoModelForCausalLM=FakeModel, AutoTokenizer=FakeTokenizer, DataCollatorForLanguageModeling=lambda **kwargs: kwargs, Trainer=FakeTrainer, TrainingArguments=FakeTrainingArguments, ), ) output_dir = tmp_path / "runs" / "current" metrics = train(output_dir=str(output_dir), validation_split_ratio=0) training_metrics = json.loads( (output_dir / "training-metrics.json").read_text(encoding="utf-8") ) assert metrics["scoring_train_feedback_boosted_records"] >= 1.0 assert training_metrics["benchmark_feedback"]["discovery_mode"] == "auto_discovered" assert training_metrics["scoring_dashboard_train_sources_production_records"] == 1.0 assert training_metrics["benchmark_feedback"]["artifact_path"].endswith( "previous/benchmark-feedback.json" )