# maris-ai-master / core-python /tests /test_training_quality.py
# MarisUK's picture
# Maris AI model sync
# f440f03 verified
"""Tests training quality gate integrācijai."""
from __future__ import annotations
import json
import sys
import types
from pathlib import Path
from maris_core.training.train import train
def test_train_writes_dataset_quality_and_scoring_reports(
    tmp_path: Path,
    monkeypatch,
) -> None:
    """Run ``train`` on a tiny faked split and check its quality/scoring output.

    ``load_hf_dataset`` and the ``transformers`` module are swapped for
    in-memory fakes. The test then checks the returned metrics, the two JSON
    reports written into the output directory, and the size of the dataset
    that was handed to the trainer.
    """

    class FakeSplit(list):
        """List-backed stand-in for a dataset split with a batched ``map``."""

        column_names = ["user", "assistant"]

        def map(self, function, **_ignored):
            # Collect every column that appears in any row, in sorted order.
            seen_columns = set()
            for row in self:
                seen_columns.update(row)
            columnar = {
                name: [row.get(name) for row in self] for name in sorted(seen_columns)
            }
            result = function(columnar)
            if not result:
                return FakeSplit()
            # The first returned column decides how many rows are rebuilt.
            row_count = len(next(iter(result.values())))
            return FakeSplit(
                {name: values[position] for name, values in result.items()}
                for position in range(row_count)
            )

    # Rows one and two are identical and row three repeats the prompt as the
    # answer; the assertions below expect one duplicate removal and one
    # ``invalid_conversation_pair`` drop.
    greeting_pair = {"user": "Sveiki, Maris!", "assistant": "Sveiki! Kā varu palīdzēt šodien?"}
    train_rows = [
        dict(greeting_pair),
        dict(greeting_pair),
        {"user": "Atkārto mani", "assistant": "Atkārto mani"},
        {
            "user": "Pastāsti man par realtime voice assistant arhitektūru.",
            "assistant": "Varu palīdzēt ar LiveKit, STT, LLM un TTS plūsmu.",
        },
    ]
    fake_dataset = {"train": FakeSplit(train_rows)}
    monkeypatch.setattr("maris_core.training.train.load_hf_dataset", lambda _: fake_dataset)

    class FakeTokenizer:
        """Tokenizer double that starts without a pad token."""

        pad_token = None
        eos_token = "<eos>"
        pad_token_id = None
        eos_token_id = 1

        @classmethod
        def from_pretrained(cls, model_name):
            # The requested checkpoint name is irrelevant for the fake.
            return cls()

        def __call__(self, texts, **_ignored):
            # One single-token sequence per text, numbered from 1.
            token_rows = [[position] for position, _ in enumerate(texts, start=1)]
            mask_rows = [[1] for _ in texts]
            return {"input_ids": token_rows, "attention_mask": mask_rows}

        def save_pretrained(self, output_dir):
            target = Path(output_dir)
            target.mkdir(parents=True, exist_ok=True)
            for filename in ("tokenizer.json", "tokenizer_config.json"):
                (target / filename).write_text("{}", encoding="utf-8")

    class FakeModel:
        """Model double exposing only a ``config`` namespace."""

        config = types.SimpleNamespace(pad_token_id=None)

        @classmethod
        def from_pretrained(cls, model_name):
            # The requested checkpoint name is irrelevant for the fake.
            return cls()

    class FakeTrainingArguments:
        """Records the keyword arguments it was constructed with."""

        def __init__(self, **kwargs):
            self.kwargs = kwargs

    class FakeTrainer:
        """Trainer double that remembers the train dataset it received."""

        last_train_dataset = None

        def __init__(self, *, model, args, train_dataset, eval_dataset=None, data_collator=None):
            # Only the training dataset is of interest to this test.
            self.train_dataset = train_dataset
            FakeTrainer.last_train_dataset = train_dataset

        def train(self):
            return types.SimpleNamespace(metrics={"train_loss": 0.1})

        def save_model(self, output_dir):
            target = Path(output_dir)
            target.mkdir(parents=True, exist_ok=True)
            (target / "config.json").write_text("{}", encoding="utf-8")
            (target / "model.safetensors").write_text("ok", encoding="utf-8")

    fake_transformers = types.SimpleNamespace(
        AutoModelForCausalLM=FakeModel,
        AutoTokenizer=FakeTokenizer,
        DataCollatorForLanguageModeling=lambda **kwargs: kwargs,
        Trainer=FakeTrainer,
        TrainingArguments=FakeTrainingArguments,
    )
    monkeypatch.setitem(sys.modules, "transformers", fake_transformers)

    output_dir = tmp_path / "model"
    metrics = train(output_dir=str(output_dir), validation_split_ratio=0)

    quality_path = output_dir / "dataset-quality-report.json"
    quality_report = json.loads(quality_path.read_text(encoding="utf-8"))
    scoring_path = output_dir / "dataset-scoring-report.json"
    scoring_report = json.loads(scoring_path.read_text(encoding="utf-8"))

    # Metrics returned by ``train``.
    assert metrics["quality_train_kept"] == 2.0
    assert metrics["quality_train_dropped"] == 2.0
    assert metrics["quality_train_duplicates_removed"] == 1.0
    assert metrics["scoring_train_average_score"] > 0.0
    assert metrics["scoring_train_expanded_records"] == 4.0
    assert metrics["scoring_train_repeated_records"] == 2.0

    # Quality report written to disk.
    train_quality = quality_report["splits"]["train"]
    assert train_quality["kept_records"] == 2
    assert train_quality["duplicates_removed"] == 1
    assert train_quality["reasons"]["invalid_conversation_pair"] == 1

    # Scoring report written to disk.
    train_scoring = scoring_report["splits"]["train"]
    assert train_scoring["expanded_records"] == 4
    assert train_scoring["repeated_records"] == 2

    # The trainer must have received the expanded dataset.
    assert len(FakeTrainer.last_train_dataset) == 4
def test_train_skips_eval_when_quality_gate_filters_all_eval_rows(
    tmp_path: Path,
    monkeypatch,
) -> None:
    """Check that ``train`` runs without evaluation when no eval row is kept.

    The faked dataset has two train rows and a ``validation`` split of two
    rows. The assertions expect both validation rows to be dropped, the
    trainer to receive ``eval_dataset=None`` and the training arguments to
    carry ``evaluation_strategy == "no"``.
    """

    class FakeSplit(list):
        """List-backed stand-in for a dataset split with a batched ``map``."""

        column_names = ["user", "assistant"]

        def map(self, function, **_ignored):
            # Collect every column that appears in any row, in sorted order;
            # rows missing a column contribute ``None`` via ``dict.get``.
            seen_columns = set()
            for row in self:
                seen_columns.update(row)
            columnar = {
                name: [row.get(name) for row in self] for name in sorted(seen_columns)
            }
            result = function(columnar)
            if not result:
                return FakeSplit()
            # The first returned column decides how many rows are rebuilt.
            row_count = len(next(iter(result.values())))
            return FakeSplit(
                {name: values[position] for name, values in result.items()}
                for position in range(row_count)
            )

    train_rows = [
        {
            "user": "Pastāsti par latviešu valodas asistentu arhitektūru.",
            "assistant": "Varu palīdzēt ar STT, LLM, atmiņu un TTS plūsmu.",
        },
        {
            "user": "Izskaidro retrieval pipeline.",
            "assistant": "Retrieval pipeline indeksē, sameklē un pievieno kontekstu atbildei.",
        },
    ]
    # The assertions below expect these two rows to be rejected, one counted
    # as ``invalid_conversation_pair`` and one as ``placeholder_text``.
    validation_rows = [
        {"user": "test", "assistant": "test"},
        {"text": "todo"},
    ]
    fake_dataset = {
        "train": FakeSplit(train_rows),
        "validation": FakeSplit(validation_rows),
    }
    monkeypatch.setattr("maris_core.training.train.load_hf_dataset", lambda _: fake_dataset)

    class FakeTokenizer:
        """Tokenizer double that starts without a pad token."""

        pad_token = None
        eos_token = "<eos>"
        pad_token_id = None
        eos_token_id = 1

        @classmethod
        def from_pretrained(cls, model_name):
            # The requested checkpoint name is irrelevant for the fake.
            return cls()

        def __call__(self, texts, **_ignored):
            # One single-token sequence per text, numbered from 1.
            token_rows = [[position] for position, _ in enumerate(texts, start=1)]
            mask_rows = [[1] for _ in texts]
            return {"input_ids": token_rows, "attention_mask": mask_rows}

        def save_pretrained(self, output_dir):
            target = Path(output_dir)
            target.mkdir(parents=True, exist_ok=True)
            for filename in ("tokenizer.json", "tokenizer_config.json"):
                (target / filename).write_text("{}", encoding="utf-8")

    class FakeModel:
        """Model double exposing only a ``config`` namespace."""

        config = types.SimpleNamespace(pad_token_id=None)

        @classmethod
        def from_pretrained(cls, model_name):
            # The requested checkpoint name is irrelevant for the fake.
            return cls()

    class FakeTrainingArguments:
        """Records the keyword arguments it was constructed with."""

        def __init__(self, **kwargs):
            self.kwargs = kwargs

    class FakeTrainer:
        """Trainer double that remembers its arguments and eval dataset."""

        # A sentinel string distinguishes "never constructed" from ``None``.
        last_eval_dataset = "unset"
        last_args = None

        def __init__(self, *, model, args, train_dataset, eval_dataset=None, data_collator=None):
            self.args = args
            FakeTrainer.last_args = args
            FakeTrainer.last_eval_dataset = eval_dataset

        def train(self):
            return types.SimpleNamespace(metrics={"train_loss": 0.2})

        def save_model(self, output_dir):
            target = Path(output_dir)
            target.mkdir(parents=True, exist_ok=True)
            (target / "config.json").write_text("{}", encoding="utf-8")
            (target / "model.safetensors").write_text("ok", encoding="utf-8")

    fake_transformers = types.SimpleNamespace(
        AutoModelForCausalLM=FakeModel,
        AutoTokenizer=FakeTokenizer,
        DataCollatorForLanguageModeling=lambda **kwargs: kwargs,
        Trainer=FakeTrainer,
        TrainingArguments=FakeTrainingArguments,
    )
    monkeypatch.setitem(sys.modules, "transformers", fake_transformers)

    output_dir = tmp_path / "model"
    metrics = train(output_dir=str(output_dir), validation_split_ratio=0)

    quality_path = output_dir / "dataset-quality-report.json"
    quality_report = json.loads(quality_path.read_text(encoding="utf-8"))

    # The trainer was built without an eval dataset and with evaluation off.
    assert FakeTrainer.last_eval_dataset is None
    assert FakeTrainer.last_args.kwargs["evaluation_strategy"] == "no"

    # Metrics returned by ``train``.
    assert metrics["train_loss"] == 0.2
    assert metrics["quality_eval_kept"] == 0.0
    assert metrics["quality_eval_dropped"] == 2.0
    assert metrics["quality_eval_skipped"] == 1.0

    # Quality report written to disk.
    eval_quality = quality_report["splits"]["eval"]
    assert eval_quality["kept_records"] == 0
    assert eval_quality["reasons"]["invalid_conversation_pair"] == 1
    assert eval_quality["reasons"]["placeholder_text"] == 1
def test_train_uses_source_aware_and_benchmark_feedback_weighting(
    tmp_path: Path,
    monkeypatch,
) -> None:
    """Check scoring output when an explicit benchmark feedback file is passed.

    Two prompt rows with ``source_tier``/``category`` metadata are fed to
    ``train`` together with a benchmark JSON written into ``tmp_path``. The
    assertions cover the returned metrics, the scoring report, the
    ``training-metrics.json`` dashboard and the size of the dataset handed to
    the trainer.
    """

    class FakeSplit(list):
        """List-backed stand-in for a dataset split with a batched ``map``."""

        column_names = ["prompt", "metadata"]

        def map(self, function, **_ignored):
            # Collect every column that appears in any row, in sorted order.
            seen_columns = set()
            for row in self:
                seen_columns.update(row)
            columnar = {
                name: [row.get(name) for row in self] for name in sorted(seen_columns)
            }
            result = function(columnar)
            if not result:
                return FakeSplit()
            # The first returned column decides how many rows are rebuilt.
            row_count = len(next(iter(result.values())))
            return FakeSplit(
                {name: values[position] for name, values in result.items()}
                for position in range(row_count)
            )

    train_rows = [
        {
            "prompt": "Analizē sistēmas reasoning kompromisus.",
            "metadata": {"source_tier": "production", "category": "reasoning"},
        },
        {
            "prompt": "Uzraksti īsu kodu piemēru.",
            "metadata": {"source_tier": "noisy", "category": "coding"},
        },
    ]
    fake_dataset = {"train": FakeSplit(train_rows)}

    # Benchmark scores from a previous run, handed to ``train`` explicitly.
    benchmark_payload = {
        "score_manifest": {
            "overall": 0.63,
            "reasoning": 0.4,
            "coding": 0.78,
        }
    }
    benchmark_feedback_path = tmp_path / "previous-benchmark.json"
    benchmark_feedback_path.write_text(json.dumps(benchmark_payload), encoding="utf-8")

    monkeypatch.setattr("maris_core.training.train.load_hf_dataset", lambda _: fake_dataset)

    class FakeTokenizer:
        """Tokenizer double that starts without a pad token."""

        pad_token = None
        eos_token = "<eos>"
        pad_token_id = None
        eos_token_id = 1

        @classmethod
        def from_pretrained(cls, model_name):
            # The requested checkpoint name is irrelevant for the fake.
            return cls()

        def __call__(self, texts, **_ignored):
            # One single-token sequence per text, numbered from 1.
            token_rows = [[position] for position, _ in enumerate(texts, start=1)]
            mask_rows = [[1] for _ in texts]
            return {"input_ids": token_rows, "attention_mask": mask_rows}

        def save_pretrained(self, output_dir):
            target = Path(output_dir)
            target.mkdir(parents=True, exist_ok=True)
            for filename in ("tokenizer.json", "tokenizer_config.json"):
                (target / filename).write_text("{}", encoding="utf-8")

    class FakeModel:
        """Model double exposing only a ``config`` namespace."""

        config = types.SimpleNamespace(pad_token_id=None)

        @classmethod
        def from_pretrained(cls, model_name):
            # The requested checkpoint name is irrelevant for the fake.
            return cls()

    class FakeTrainingArguments:
        """Records the keyword arguments it was constructed with."""

        def __init__(self, **kwargs):
            self.kwargs = kwargs

    class FakeTrainer:
        """Trainer double that remembers the train dataset it received."""

        last_train_dataset = None

        def __init__(self, *, model, args, train_dataset, eval_dataset=None, data_collator=None):
            # Only the training dataset is of interest to this test.
            self.train_dataset = train_dataset
            FakeTrainer.last_train_dataset = train_dataset

        def train(self):
            return types.SimpleNamespace(metrics={"train_loss": 0.2})

        def save_model(self, output_dir):
            target = Path(output_dir)
            target.mkdir(parents=True, exist_ok=True)
            (target / "config.json").write_text("{}", encoding="utf-8")
            (target / "model.safetensors").write_text("ok", encoding="utf-8")

    fake_transformers = types.SimpleNamespace(
        AutoModelForCausalLM=FakeModel,
        AutoTokenizer=FakeTokenizer,
        DataCollatorForLanguageModeling=lambda **kwargs: kwargs,
        Trainer=FakeTrainer,
        TrainingArguments=FakeTrainingArguments,
    )
    monkeypatch.setitem(sys.modules, "transformers", fake_transformers)

    output_dir = tmp_path / "model"
    metrics = train(
        output_dir=str(output_dir),
        validation_split_ratio=0,
        benchmark_feedback_path=str(benchmark_feedback_path),
    )

    scoring_path = output_dir / "dataset-scoring-report.json"
    scoring_report = json.loads(scoring_path.read_text(encoding="utf-8"))
    metrics_path = output_dir / "training-metrics.json"
    training_metrics = json.loads(metrics_path.read_text(encoding="utf-8"))

    # Metrics returned by ``train``.
    assert metrics["scoring_train_feedback_boosted_records"] >= 1.0
    assert metrics["scoring_train_average_repeat_multiplier"] > 1.0

    # Scoring report written to disk.
    train_scoring = scoring_report["splits"]["train"]
    assert train_scoring["source_tiers"]["production"] == 1
    assert train_scoring["source_tiers"]["noisy"] == 1
    assert train_scoring["feedback_metric_hits"]["reasoning"] >= 1

    # Nested and flattened dashboard entries in ``training-metrics.json``.
    train_dashboard = training_metrics["scoring_dashboard"]["train"]
    assert train_dashboard["sources"]["production"]["records"] == 1
    assert train_dashboard["categories"]["reasoning"]["records"] == 1
    assert training_metrics["scoring_dashboard_train_sources_production_records"] == 1.0
    assert training_metrics["scoring_dashboard_train_categories_reasoning_boosted_records"] >= 1.0
    assert training_metrics["benchmark_feedback"]["discovery_mode"] == "explicit"

    # The trainer must have received more rows than the two input rows.
    assert len(FakeTrainer.last_train_dataset) > 2
def test_train_auto_discovers_previous_benchmark_feedback(
    tmp_path: Path,
    monkeypatch,
) -> None:
    """Check that ``train`` reports an auto-discovered benchmark feedback file.

    A ``benchmark-feedback.json`` artifact is written to ``runs/previous``
    and ``train`` is pointed at the sibling ``runs/current`` directory without
    an explicit feedback path. The assertions expect the written
    ``training-metrics.json`` to report ``discovery_mode == "auto_discovered"``
    and an artifact path ending in ``previous/benchmark-feedback.json``.
    """

    class FakeSplit(list):
        """List-backed stand-in for a dataset split with a batched ``map``."""

        column_names = ["prompt", "metadata"]

        def map(self, function, **kwargs):
            del kwargs
            # Turn the rows into a column-oriented batch, keys in sorted order.
            keys = sorted({key for item in self for key in item})
            batch = {key: [item.get(key) for item in self] for key in keys}
            mapped = function(batch)
            # The first returned column decides how many rows are rebuilt.
            size = len(next(iter(mapped.values()))) if mapped else 0
            return FakeSplit(
                [{key: value[index] for key, value in mapped.items()} for index in range(size)]
            )

    fake_dataset = {
        "train": FakeSplit(
            [
                {
                    "prompt": "Analizē sistēmas reasoning kompromisus.",
                    "metadata": {"source_tier": "production", "category": "reasoning"},
                }
            ]
        )
    }

    # Feedback artifact placed next to the output directory of this run.
    previous_run_dir = tmp_path / "runs" / "previous"
    previous_run_dir.mkdir(parents=True)
    (previous_run_dir / "benchmark-feedback.json").write_text(
        json.dumps(
            {
                "artifact_type": "benchmark-feedback-reweighting",
                "artifact_path": str(previous_run_dir / "benchmark-feedback.json"),
                "overall_multiplier": 1.2,
                "deficient_metrics": {
                    "reasoning": {
                        "target": 0.7,
                        "actual": 0.4,
                        "deficit": 0.3,
                        "multiplier": 1.6,
                    }
                },
            }
        ),
        encoding="utf-8",
    )
    monkeypatch.setattr("maris_core.training.train.load_hf_dataset", lambda _: fake_dataset)

    class FakeTokenizer:
        """Tokenizer double that starts without a pad token."""

        pad_token = None
        eos_token = "<eos>"
        pad_token_id = None
        eos_token_id = 1

        @classmethod
        def from_pretrained(cls, model_name):
            del model_name
            return cls()

        def __call__(self, texts, **kwargs):
            del kwargs
            # One single-token sequence per text, numbered from 1.
            return {
                "input_ids": [[index + 1] for index, _ in enumerate(texts)],
                "attention_mask": [[1] for _ in texts],
            }

        def save_pretrained(self, output_dir):
            Path(output_dir).mkdir(parents=True, exist_ok=True)
            Path(output_dir, "tokenizer.json").write_text("{}", encoding="utf-8")
            Path(output_dir, "tokenizer_config.json").write_text("{}", encoding="utf-8")

    class FakeModel:
        """Model double exposing only a ``config`` namespace."""

        config = types.SimpleNamespace(pad_token_id=None)

        @classmethod
        def from_pretrained(cls, model_name):
            del model_name
            return cls()

    class FakeTrainingArguments:
        """Records the keyword arguments it was constructed with."""

        def __init__(self, **kwargs):
            self.kwargs = kwargs

    class FakeTrainer:
        """Trainer double that remembers the train dataset it received."""

        last_train_dataset = None

        def __init__(self, *, model, args, train_dataset, eval_dataset=None, data_collator=None):
            del model, args, eval_dataset, data_collator
            FakeTrainer.last_train_dataset = train_dataset

        def train(self):
            return types.SimpleNamespace(metrics={"train_loss": 0.2})

        def save_model(self, output_dir):
            Path(output_dir).mkdir(parents=True, exist_ok=True)
            Path(output_dir, "config.json").write_text("{}", encoding="utf-8")
            Path(output_dir, "model.safetensors").write_text("ok", encoding="utf-8")

    monkeypatch.setitem(
        sys.modules,
        "transformers",
        types.SimpleNamespace(
            AutoModelForCausalLM=FakeModel,
            AutoTokenizer=FakeTokenizer,
            DataCollatorForLanguageModeling=lambda **kwargs: kwargs,
            Trainer=FakeTrainer,
            TrainingArguments=FakeTrainingArguments,
        ),
    )

    output_dir = tmp_path / "runs" / "current"
    metrics = train(output_dir=str(output_dir), validation_split_ratio=0)
    training_metrics = json.loads(
        (output_dir / "training-metrics.json").read_text(encoding="utf-8")
    )

    assert metrics["scoring_train_feedback_boosted_records"] >= 1.0
    assert training_metrics["benchmark_feedback"]["discovery_mode"] == "auto_discovered"
    assert training_metrics["scoring_dashboard_train_sources_production_records"] == 1.0
    # Normalise separators first: a stringified path uses backslashes on
    # Windows, so a raw suffix check against "/" would fail there.
    discovered_path = Path(training_metrics["benchmark_feedback"]["artifact_path"])
    assert discovered_path.as_posix().endswith("previous/benchmark-feedback.json")