MarisUK's picture
Maris AI model sync
f440f03 verified
"""Dataseta scoring, source-aware weighting un benchmark feedback palīgi."""
from __future__ import annotations
import json
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any
from maris_core.data.preprocessing import clean_text, record_to_training_text
# Default repeat multiplier per source tier. These keys are also the only
# values detect_source_tier accepts from an explicit `source_tier` /
# `source_quality` field; "unknown" is what it returns when nothing matches.
DEFAULT_SOURCE_WEIGHT_MAP = {
    "production": 1.3,
    "synthetic": 1.0,
    "noisy": 0.65,
    "unknown": 1.0,
}
# Tokens that assign a record to a source tier. detect_source_tier walks the
# tiers in this declaration order and returns the first tier with a token hit.
SOURCE_TIER_TOKEN_MAP = {
    "production": ("production", "prod", "live", "human", "curated", "real", "customer"),
    "synthetic": ("synthetic", "generated", "augmented", "distilled", "bootstrap", "seeded"),
    "noisy": ("noisy", "weak", "scraped", "raw", "unfiltered", "test"),
}
# Record labels that count as a hit for each benchmark metric when
# _feedback_multiplier_for_record matches records against deficient metrics.
# A metric missing from this table only matches a label equal to its own name.
BENCHMARK_METRIC_ALIASES = {
    "reasoning": {"reasoning", "analysis", "logic", "planner"},
    "coding": {"coding", "code", "programming", "developer"},
    "long_context": {"long_context", "context", "memory", "retrieval"},
    "helpfulness": {"helpfulness", "helpful", "assistant", "support"},
    "factuality": {"factuality", "facts", "grounding", "grounded"},
    "latvian_quality": {"latvian_quality", "latvian", "language_lv", "lv"},
    "safety": {"safety", "safe", "guardrails", "policy"},
}
@dataclass(slots=True, frozen=True)
class DatasetScoringConfig:
    """Configuration for the dataset scoring/weighting step."""

    # When False, every repeat count is 1 and apply_scoring_to_records returns
    # the input records without the `maris_dataset_*` enrichment keys.
    enabled: bool = True
    # When False, every repeat count is 1 regardless of score or multipliers.
    weighted_repetition_enabled: bool = True
    # Forwarded as `max_chars` to record_to_training_text for scoring/previews.
    max_text_chars: int = 8192
    # Base repeat counts for the three score bands (each floored at 1 on use).
    low_score_repeat_count: int = 1
    medium_score_repeat_count: int = 2
    high_score_repeat_count: int = 3
    # Inclusive lower bounds of the medium and high score bands.
    medium_score_threshold: float = 0.55
    high_score_threshold: float = 0.8
    # When False, the source multiplier is always 1.0.
    source_weighting_enabled: bool = True
    # Source tier -> multiplier; missing tiers fall back to 1.0 and every
    # multiplier is floored at 0.1 when applied.
    source_weight_map: dict[str, float] = field(
        default_factory=lambda: DEFAULT_SOURCE_WEIGHT_MAP.copy()
    )
    # Category label -> multiplier; empty means no category weighting. When
    # several labels match a record, the largest weight is used.
    category_weight_map: dict[str, float] = field(default_factory=dict)
    # Upper bound for the repeat count after all multipliers are applied.
    max_effective_repeat_count: int = 6
    # NOTE(review): the four benchmark_feedback_* fields are not read anywhere
    # in this module; presumably the caller uses them to decide whether and how
    # to call load_benchmark_feedback (path, boost_scale, max_multiplier) —
    # confirm against the caller.
    benchmark_feedback_enabled: bool = True
    benchmark_feedback_path: str = ""
    benchmark_feedback_boost_scale: float = 2.0
    benchmark_feedback_max_multiplier: float = 1.75
@dataclass(slots=True, frozen=True)
class DatasetBenchmarkFeedback:
    """Summary of a previous benchmark artifact, used for reweighting."""

    # Path of the file the feedback was loaded from (stored as a string).
    artifact_path: str
    # Metric name -> {"target", "actual", "deficit", "multiplier"} for every
    # metric considered deficient.
    deficient_metrics: dict[str, dict[str, float]]
    # Multiplier tied to the "overall" metric; 1.0 means no global boost.
    overall_multiplier: float = 1.0
    # Free-form label describing how the artifact was found; defaults to
    # "explicit". NOTE(review): other values are not defined in this module.
    discovery_mode: str = "explicit"
@dataclass(slots=True)
class DatasetScoringSplitReport:
    """Summary of the scoring results for a single split."""

    # Name of the split this report describes.
    split_name: str
    # Number of records passed in / number of records returned after expansion.
    input_records: int = 0
    expanded_records: int = 0
    # Score statistics over the input records, rounded to 4 decimals.
    average_score: float = 0.0
    max_score: float = 0.0
    min_score: float = 0.0
    # Number of input records whose repeat count is greater than 1.
    repeated_records: int = 0
    # Histogram keyed by score bucket label ("high" / "medium" / "low").
    score_buckets: dict[str, int] = field(default_factory=dict)
    # Histogram keyed by the repeat count rendered as a string.
    repeat_buckets: dict[str, int] = field(default_factory=dict)
    # Record counts per detected source tier and per detected category.
    source_tiers: dict[str, int] = field(default_factory=dict)
    category_buckets: dict[str, int] = field(default_factory=dict)
    # How many records matched each deficient benchmark metric.
    feedback_metric_hits: dict[str, int] = field(default_factory=dict)
    # Number of records that matched at least one deficient metric.
    feedback_boosted_records: int = 0
    # Mean of source * category * feedback multipliers, rounded to 4 decimals.
    average_repeat_multiplier: float = 1.0
    # Per-source-tier and per-category aggregates built by _finalize_dashboard.
    source_dashboard: dict[str, dict[str, float]] = field(default_factory=dict)
    category_dashboard: dict[str, dict[str, float]] = field(default_factory=dict)
    # Scoring details for at most the first five records of the split.
    sample_scores: list[dict[str, Any]] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Return the report as a plain dict via `dataclasses.asdict`."""
        return asdict(self)
@dataclass(slots=True)
class DatasetScoringReport:
"""Pilns dataset scoring artefakts."""
artifact_type: str
config: dict[str, Any]
splits: dict[str, DatasetScoringSplitReport]
def to_dict(self) -> dict[str, Any]:
return {
"artifact_type": self.artifact_type,
"config": self.config,
"splits": {name: report.to_dict() for name, report in self.splits.items()},
"input_records": sum(report.input_records for report in self.splits.values()),
"expanded_records": sum(report.expanded_records for report in self.splits.values()),
"repeated_records": sum(report.repeated_records for report in self.splits.values()),
}
def apply_scoring_to_records(
    records: list[dict[str, Any]],
    *,
    split_name: str,
    config: DatasetScoringConfig,
    expand_weights: bool,
    benchmark_feedback: DatasetBenchmarkFeedback | None = None,
) -> tuple[list[dict[str, Any]], DatasetScoringSplitReport]:
    """Score every record and materialize its weight as repeated copies.

    Args:
        records: Input records of one split.
        split_name: Name stored on the returned report.
        config: Scoring/weighting configuration.
        expand_weights: When False, every record is emitted exactly once.
        benchmark_feedback: Optional feedback used to boost matching records.

    Returns:
        A tuple of (output records, split report). With `config.enabled` set,
        the output holds shallow copies of the records enriched with the
        `maris_dataset_*` keys, repeated `repeat_count` times each; otherwise
        it is a plain list copy of `records`. Statistics are collected in
        both cases.
    """
    report = DatasetScoringSplitReport(split_name=split_name, input_records=len(records))
    if not records:
        return [], report

    def _bump(counter: dict[str, int], key: str) -> None:
        # Histogram increment that keeps first-seen key order.
        counter[key] = counter.get(key, 0) + 1

    per_record: list[tuple[dict[str, Any], float, int]] = []
    score_sum = 0.0
    multiplier_sum = 0.0
    by_source: dict[str, dict[str, float]] = {}
    by_category: dict[str, dict[str, float]] = {}

    for record in records:
        score = score_record(record, max_text_chars=config.max_text_chars)
        source_tier = detect_source_tier(record)
        category_label = detect_record_category(record)
        source_multiplier = _source_multiplier_for_record(record, config)
        category_multiplier = _category_multiplier_for_record(record, config)
        feedback_multiplier, matched_metrics = _feedback_multiplier_for_record(
            record,
            benchmark_feedback,
        )
        repeat_multiplier = source_multiplier * category_multiplier * feedback_multiplier
        repeat_count = _effective_repeat_count(
            score,
            repeat_multiplier=repeat_multiplier,
            config=config,
            expand_weights=expand_weights,
        )
        was_boosted = bool(matched_metrics)

        score_sum += score
        multiplier_sum += repeat_multiplier

        # NOTE(review): _score_bucket is called without the config thresholds,
        # so bucket labels follow its built-in cut-offs.
        _bump(report.score_buckets, _score_bucket(score))
        _bump(report.repeat_buckets, str(repeat_count))
        _bump(report.source_tiers, source_tier)
        _bump(report.category_buckets, category_label)
        if repeat_count > 1:
            report.repeated_records += 1
        if was_boosted:
            report.feedback_boosted_records += 1
            for metric in matched_metrics:
                _bump(report.feedback_metric_hits, metric)

        # Same aggregate update for the per-source and per-category dashboards.
        for dashboard, label in ((by_source, source_tier), (by_category, category_label)):
            _update_dashboard_bucket(
                dashboard,
                label,
                score=score,
                repeat_count=repeat_count,
                repeat_multiplier=repeat_multiplier,
                boosted=was_boosted,
            )

        # Keep a small sample (first five records) for human inspection.
        if len(report.sample_scores) < 5:
            preview_text = record_to_training_text(record, max_chars=config.max_text_chars)
            report.sample_scores.append(
                {
                    "score": round(score, 4),
                    "source_tier": source_tier,
                    "category": category_label,
                    "source_multiplier": round(source_multiplier, 4),
                    "category_multiplier": round(category_multiplier, 4),
                    "feedback_multiplier": round(feedback_multiplier, 4),
                    "matched_metrics": matched_metrics,
                    "repeat_count": repeat_count,
                    "preview": preview_text[:160],
                }
            )
        per_record.append((record, score, repeat_count))

    record_total = len(per_record)
    report.average_score = round(score_sum / record_total, 4)
    report.average_repeat_multiplier = round(multiplier_sum / record_total, 4)
    report.source_dashboard = _finalize_dashboard(by_source)
    report.category_dashboard = _finalize_dashboard(by_category)
    all_scores = [score for _, score, _ in per_record]
    report.min_score = round(min(all_scores), 4)
    report.max_score = round(max(all_scores), 4)

    if config.enabled:
        expanded: list[dict[str, Any]] = []
        for record, score, repeat_count in per_record:
            copies = repeat_count if expand_weights else 1
            for copy_index in range(copies):
                enriched = dict(record)
                enriched.update(
                    maris_dataset_score=round(score, 4),
                    maris_dataset_repeat_count=repeat_count,
                    maris_dataset_repeat_index=copy_index,
                    maris_dataset_source_tier=detect_source_tier(record),
                )
                expanded.append(enriched)
    else:
        # Scoring disabled: pass the records through untouched.
        expanded = list(records)

    report.expanded_records = len(expanded)
    return expanded, report
def build_dataset_scoring_report(
    *,
    config: DatasetScoringConfig,
    train_report: DatasetScoringSplitReport,
    eval_report: DatasetScoringSplitReport | None = None,
) -> DatasetScoringReport:
    """Build the serializable scoring artifact from the per-split reports.

    Args:
        config: Scoring configuration; stored as a dict via `asdict`.
        train_report: Report of the training split (always included).
        eval_report: Optional report of the evaluation split.

    Returns:
        A `DatasetScoringReport` whose splits are keyed by split name. If both
        reports share a name, the eval report replaces the train report.
    """
    reports = [train_report] if eval_report is None else [train_report, eval_report]
    return DatasetScoringReport(
        artifact_type="dataset-scoring-report",
        config=asdict(config),
        splits={report.split_name: report for report in reports},
    )
def load_benchmark_feedback(
    path: str | Path,
    *,
    targets: dict[str, float],
    boost_scale: float,
    max_multiplier: float,
) -> DatasetBenchmarkFeedback:
    """Load a benchmark manifest/feedback artifact and turn it into reweighting rules.

    Two JSON layouts are accepted:

    * an object with a `deficient_metrics` mapping (a previously written
      feedback artifact), which is read back as-is;
    * an object with a `score_manifest` mapping of metric scores, from which
      deficits against `targets` are derived.

    Args:
        path: Location of the UTF-8 encoded JSON file.
        targets: Metric name -> target score (used for `score_manifest` only).
        boost_scale: Multiplier growth per unit of deficit.
        max_multiplier: Upper bound for a derived multiplier.

    Returns:
        The parsed `DatasetBenchmarkFeedback`.

    Raises:
        ValueError: If the JSON root is not an object, or it contains neither
            a `deficient_metrics` mapping nor a `score_manifest` mapping.
            Errors from reading the file or from `json.loads` propagate.
    """
    format_error = "Benchmark feedback failā jābūt `score_manifest` vai `deficient_metrics`."
    payload = json.loads(Path(path).read_text(encoding="utf-8"))
    if not isinstance(payload, dict):
        # A list/scalar root has no `.get`; report it as a format problem.
        raise ValueError(format_error)

    def _as_float(details: dict[str, Any], key: str, default: float) -> float:
        # JSON `null` is treated like a missing key instead of crashing float().
        raw = details.get(key)
        return default if raw is None else float(raw)

    explicit_metrics = payload.get("deficient_metrics")
    if isinstance(explicit_metrics, dict):
        restored_metrics = {
            str(metric): {
                "target": _as_float(details, "target", 0.0),
                "actual": _as_float(details, "actual", 0.0),
                "deficit": _as_float(details, "deficit", 0.0),
                "multiplier": _as_float(details, "multiplier", 1.0),
            }
            for metric, details in explicit_metrics.items()
            if isinstance(details, dict)
        }
        return DatasetBenchmarkFeedback(
            artifact_path=str(path),
            deficient_metrics=restored_metrics,
            overall_multiplier=float(payload.get("overall_multiplier", 1.0) or 1.0),
            discovery_mode=str(payload.get("discovery_mode", "explicit") or "explicit"),
        )

    score_manifest = payload.get("score_manifest")
    if not isinstance(score_manifest, dict):
        raise ValueError(format_error)

    deficient_metrics: dict[str, dict[str, float]] = {}
    overall_multiplier = 1.0
    for metric, target in targets.items():
        # A metric missing from the manifest falls back to the "overall" score.
        actual = float(score_manifest.get(metric, score_manifest.get("overall", 0.0)) or 0.0)
        deficit = max(float(target) - actual, 0.0)
        if deficit <= 0:
            continue
        multiplier = min(1.0 + deficit * boost_scale, max_multiplier)
        deficient_metrics[str(metric)] = {
            "target": float(target),
            "actual": actual,
            "deficit": round(deficit, 4),
            "multiplier": round(multiplier, 4),
        }
        if metric == "overall":
            overall_multiplier = round(multiplier, 4)
    return DatasetBenchmarkFeedback(
        artifact_path=str(path),
        deficient_metrics=deficient_metrics,
        overall_multiplier=overall_multiplier,
    )
def build_benchmark_feedback_artifact(
    feedback: DatasetBenchmarkFeedback,
) -> dict[str, Any]:
    """Build the serializable benchmark-feedback artifact.

    Args:
        feedback: Feedback whose fields are copied into the artifact.

    Returns:
        A dict tagged `benchmark-feedback-reweighting` followed by the
        feedback's path, overall multiplier, discovery mode and deficient
        metrics (the metrics mapping is referenced, not copied).
    """
    artifact: dict[str, Any] = {"artifact_type": "benchmark-feedback-reweighting"}
    for attribute in (
        "artifact_path",
        "overall_multiplier",
        "discovery_mode",
        "deficient_metrics",
    ):
        artifact[attribute] = getattr(feedback, attribute)
    return artifact
def score_record(record: dict[str, Any], *, max_text_chars: int) -> float:
    """Compute a heuristic data-quality score in the [0, 1] range.

    The score is a weighted blend of four components: text length (35%),
    token diversity (30%), record structure (20%) and metadata richness (15%).

    Args:
        record: The record to score.
        max_text_chars: Forwarded as `max_chars` to `record_to_training_text`.

    Returns:
        The score rounded to 4 decimals, or 0.0 when the cleaned text is empty.
    """
    text = clean_text(record_to_training_text(record, max_chars=max_text_chars))
    if not text:
        return 0.0

    # Length saturates at 240 characters.
    length_component = min(len(text) / 240.0, 1.0)

    # Share of distinct whitespace-separated tokens (case-insensitive).
    words = text.casefold().split()
    diversity_component = min(len(set(words)) / max(len(words), 1), 1.0)

    structure_component = _structure_score(record)
    metadata_component = _metadata_score(record)

    blended = (
        0.35 * length_component
        + 0.30 * diversity_component
        + 0.20 * structure_component
        + 0.15 * metadata_component
    )
    rounded = round(blended, 4)
    return max(0.0, min(rounded, 1.0))
def detect_source_tier(record: dict[str, Any]) -> str:
    """Determine the record's source tier for weighting.

    An explicit `source_tier` / `source_quality` string wins when it
    normalizes to a known tier name. Otherwise the record's source-related
    terms are matched against `SOURCE_TIER_TOKEN_MAP`, tier by tier.

    Returns:
        The tier name, or "unknown" when nothing matches.
    """
    declared = record.get("source_tier") or record.get("source_quality")
    if isinstance(declared, str):
        tier_name = clean_text(declared).casefold().replace(" ", "_")
        if tier_name in DEFAULT_SOURCE_WEIGHT_MAP:
            return tier_name

    terms = _record_terms(record)
    # First tier (in declaration order) sharing at least one token wins.
    return next(
        (tier for tier, tokens in SOURCE_TIER_TOKEN_MAP.items() if not terms.isdisjoint(tokens)),
        "unknown",
    )
def detect_record_category(record: dict[str, Any]) -> str:
    """Find a stable category label for dashboard grouping.

    Candidates are checked in priority order: the record's `category`,
    `task_category` and `branch_focus`, then the metadata's `category`,
    `focus` and `type`. The first string that is non-empty after cleaning is
    normalized and returned.

    Returns:
        The normalized label, or "general" when no candidate qualifies.
    """
    top_level = tuple(record.get(key) for key in ("category", "task_category", "branch_focus"))
    metadata = record.get("metadata")
    nested = (
        tuple(metadata.get(key) for key in ("category", "focus", "type"))
        if isinstance(metadata, dict)
        else ()
    )
    for candidate in (*top_level, *nested):
        if isinstance(candidate, str) and clean_text(candidate):
            return _normalize_label(candidate)
    return "general"
def _structure_score(record: dict[str, Any]) -> float:
    """Rate how well-structured the record is, based on which fields it has.

    Returns:
        1.0 for a user/assistant pair with two distinct non-empty texts, 0.3
        for a degenerate pair; 0.8 / 0.6 for a non-empty `prompt` / `text`
        (0.2 when that field is empty after cleaning); 0.4 otherwise.
    """
    user_turn = record.get("user")
    assistant_turn = record.get("assistant")
    if isinstance(user_turn, str) and isinstance(assistant_turn, str):
        question = clean_text(user_turn)
        answer = clean_text(assistant_turn)
        # An assistant turn that merely echoes the user turn is degenerate.
        if question and answer and question.casefold() != answer.casefold():
            return 1.0
        return 0.3

    # Single-field records: `prompt` takes precedence over `text`.
    for key, filled_score in (("prompt", 0.8), ("text", 0.6)):
        value = record.get(key)
        if isinstance(value, str):
            return filled_score if clean_text(value) else 0.2
    return 0.4
def _metadata_score(record: dict[str, Any]) -> float:
    """Rate how much descriptive metadata the record carries.

    Up to 0.7 comes from the number of metadata keys (saturating at four);
    a non-empty `language` and a non-empty `source` string add 0.15 each.

    Returns:
        The metadata score, capped at 1.0.
    """
    metadata = record.get("metadata")
    total = min(len(metadata) / 4.0, 1.0) * 0.7 if isinstance(metadata, dict) else 0.0
    for key in ("language", "source"):
        value = record.get(key)
        if isinstance(value, str) and clean_text(value):
            total += 0.15
    return min(total, 1.0)
def _source_multiplier_for_record(record: dict[str, Any], config: DatasetScoringConfig) -> float:
    """Return the repeat multiplier for the record's source tier.

    Returns:
        1.0 when source weighting is disabled; otherwise the configured weight
        for the detected tier (1.0 for unlisted tiers), floored at 0.1.
    """
    if config.source_weighting_enabled:
        tier_weight = config.source_weight_map.get(detect_source_tier(record), 1.0)
        return max(0.1, float(tier_weight))
    return 1.0
def _category_multiplier_for_record(record: dict[str, Any], config: DatasetScoringConfig) -> float:
    """Return the repeat multiplier for the record's category labels.

    Returns:
        1.0 when no category weights are configured or none of them matches a
        record label; otherwise the largest matching weight, floored at 0.1.
    """
    weight_map = config.category_weight_map
    if not weight_map:
        return 1.0

    record_labels = _record_labels(record)
    matching_weights = [
        float(weight)
        for label, weight in weight_map.items()
        if _normalize_label(str(label)) in record_labels
    ]
    if not matching_weights:
        return 1.0
    strongest = max(matching_weights)
    return max(0.1, strongest)
def _feedback_multiplier_for_record(
    record: dict[str, Any],
    feedback: DatasetBenchmarkFeedback | None,
) -> tuple[float, list[str]]:
    """Compute the benchmark-feedback multiplier for one record.

    A deficient metric matches when it is "overall" or when one of the
    record's labels is an alias of the metric.

    Returns:
        A tuple of (multiplier, sorted matched metric names). Without
        feedback or deficient metrics this is `(1.0, [])`. With matches, the
        multiplier is the strongest matched metric multiplier times the
        overall multiplier, capped at the largest single multiplier in the
        feedback, floored at 1.0 and rounded to 4 decimals.
    """
    if feedback is None or not feedback.deficient_metrics:
        return 1.0, []

    deficits = feedback.deficient_metrics
    overall = feedback.overall_multiplier
    record_labels = _record_labels(record)

    def _multiplier_of(details: dict[str, float]) -> float:
        # Missing or zero multipliers count as neutral.
        return float(details.get("multiplier", 1.0) or 1.0)

    matched_metrics = sorted(
        metric
        for metric in deficits
        if metric == "overall"
        or record_labels.intersection(BENCHMARK_METRIC_ALIASES.get(metric, {metric}))
    )
    if not matched_metrics:
        # NOTE(review): this path returns the overall multiplier as-is, without
        # the >= 1.0 floor and rounding applied on the matched path below.
        return overall, (["overall"] if overall > 1.0 else [])

    specific_values = [
        _multiplier_of(deficits[metric]) for metric in matched_metrics if metric != "overall"
    ]
    specific = max(specific_values) if specific_values else 1.0

    # The combined boost may not exceed the largest single multiplier on record.
    ceiling = max([overall] + [_multiplier_of(details) for details in deficits.values()])
    combined = min(max(1.0, specific) * max(1.0, overall), ceiling)
    return round(max(1.0, combined), 4), matched_metrics
def _effective_repeat_count(
    score: float,
    *,
    repeat_multiplier: float,
    config: DatasetScoringConfig,
    expand_weights: bool,
) -> int:
    """Turn a score and a combined multiplier into a final repeat count.

    Returns:
        1 when scoring, expansion or weighted repetition is switched off;
        otherwise the score's base repeat count scaled by the multiplier
        (floored at 0.1), rounded with `round`, and clamped to
        `[1, config.max_effective_repeat_count]`.
    """
    weighting_active = (
        config.enabled and expand_weights and config.weighted_repetition_enabled
    )
    if not weighting_active:
        return 1

    scaled = _repeat_count_for_score(score, config) * max(repeat_multiplier, 0.1)
    rounded = int(round(scaled))
    return max(1, min(rounded, config.max_effective_repeat_count))
def _repeat_count_for_score(score: float, config: DatasetScoringConfig) -> int:
    """Pick the base repeat count for the score band the score falls into.

    Returns:
        The configured high / medium / low repeat count, never less than 1.
    """
    if score >= config.high_score_threshold:
        configured = config.high_score_repeat_count
    elif score >= config.medium_score_threshold:
        configured = config.medium_score_repeat_count
    else:
        configured = config.low_score_repeat_count
    # Guard against zero or negative counts in the configuration.
    return max(1, configured)
def _score_bucket(
    score: float,
    *,
    medium_threshold: float = 0.55,
    high_threshold: float = 0.8,
) -> str:
    """Map a score to its bucket label.

    The cut-offs default to 0.55 and 0.8, so calling with just the score
    behaves as before; callers may pass their own thresholds so bucket labels
    follow the same bands they use elsewhere.

    Args:
        score: The score to classify.
        medium_threshold: Inclusive lower bound of the "medium" bucket.
        high_threshold: Inclusive lower bound of the "high" bucket.

    Returns:
        "high", "medium" or "low".
    """
    if score >= high_threshold:
        return "high"
    if score >= medium_threshold:
        return "medium"
    return "low"
def _record_terms(record: dict[str, Any]) -> set[str]:
    """Collect the lower-cased words describing where a record came from.

    Words are taken from source-related string fields of the record and of
    its `metadata` dict, plus string items of both `tags` lists. Hyphens and
    underscores are treated as word separators.

    Returns:
        The set of distinct words (empty when nothing applies).
    """

    def _words(value: str) -> list[str]:
        # Case-fold, then split on whitespace, "-" and "_".
        return value.casefold().replace("-", " ").replace("_", " ").split()

    def _tag_words(tags: Any) -> list[str]:
        # Only list-typed tag containers and string items are considered.
        if not isinstance(tags, list):
            return []
        return [word for item in tags if isinstance(item, str) for word in _words(item)]

    def _field_words(mapping: dict[str, Any], keys: tuple[str, ...]) -> list[str]:
        collected: list[str] = []
        for key in keys:
            value = mapping.get(key)
            if isinstance(value, str):
                collected.extend(_words(value))
        return collected

    terms: set[str] = set(
        _field_words(
            record,
            ("source", "source_type", "source_quality", "source_tier", "category", "language"),
        )
    )
    metadata = record.get("metadata")
    if isinstance(metadata, dict):
        terms.update(
            _field_words(
                metadata,
                ("source", "source_type", "source_quality", "source_tier", "origin", "category"),
            )
        )
        terms.update(_tag_words(metadata.get("tags")))
    terms.update(_tag_words(record.get("tags")))
    return terms
def _record_labels(record: dict[str, Any]) -> set[str]:
    """Collect normalized labels used to match categories and metrics.

    Each qualifying string contributes its normalized form (cleaned,
    case-folded, "-" and spaces turned into "_") and every "_"-separated
    part of it. Sources are the record's `category`, `branch_focus`,
    `source` and `language`, the metadata's `category`, `focus`, `type` and
    `language`, and string items of both `tags` lists.

    Returns:
        The set of labels (empty when nothing applies).
    """
    labels: set[str] = set()

    def _absorb(value: Any) -> None:
        # Ignore non-strings and strings that are empty after cleaning.
        if not isinstance(value, str):
            return
        cleaned = clean_text(value)
        if not cleaned:
            return
        normalized = cleaned.casefold().replace("-", "_").replace(" ", "_")
        labels.add(normalized)
        labels.update(normalized.split("_"))

    def _absorb_tags(tags: Any) -> None:
        if isinstance(tags, list):
            for item in tags:
                _absorb(item)

    for key in ("category", "branch_focus", "source", "language"):
        _absorb(record.get(key))

    metadata = record.get("metadata")
    if isinstance(metadata, dict):
        for key in ("category", "focus", "type", "language"):
            _absorb(metadata.get(key))
        _absorb_tags(metadata.get("tags"))

    _absorb_tags(record.get("tags"))
    return labels
def _update_dashboard_bucket(
    dashboard: dict[str, dict[str, float]],
    label: str,
    *,
    score: float,
    repeat_count: int,
    repeat_multiplier: float,
    boosted: bool,
) -> None:
    """Accumulate one record into a dashboard bucket, creating it if needed.

    Args:
        dashboard: Label -> running totals; mutated in place.
        label: Bucket the record belongs to.
        score: The record's score.
        repeat_count: The record's final repeat count.
        repeat_multiplier: The record's combined multiplier.
        boosted: True when the record matched benchmark feedback.
    """
    if label not in dashboard:
        dashboard[label] = dict.fromkeys(
            (
                "records",
                "score_total",
                "repeat_total",
                "repeat_multiplier_total",
                "boosted_records",
            ),
            0.0,
        )
    totals = dashboard[label]
    totals["records"] += 1.0
    totals["score_total"] += score
    totals["repeat_total"] += float(repeat_count)
    totals["repeat_multiplier_total"] += repeat_multiplier
    if boosted:
        totals["boosted_records"] += 1.0
def _finalize_dashboard(dashboard: dict[str, dict[str, float]]) -> dict[str, dict[str, float]]:
    """Convert running dashboard totals into per-label averages.

    Args:
        dashboard: Label -> running totals, as built by
            `_update_dashboard_bucket`.

    Returns:
        Label -> summary with integer `records` and `boosted_records` counts
        and 4-decimal averages, with labels in sorted order.
    """

    def _summarize(totals: dict[str, float]) -> dict[str, float]:
        seen = totals.get("records", 0.0)
        # Divide by at least 1 so an empty bucket yields zero averages.
        divisor = max(seen, 1.0)
        return {
            "records": int(seen),
            "average_score": round(totals.get("score_total", 0.0) / divisor, 4),
            "average_repeat_count": round(totals.get("repeat_total", 0.0) / divisor, 4),
            "average_repeat_multiplier": round(
                totals.get("repeat_multiplier_total", 0.0) / divisor,
                4,
            ),
            "boosted_records": int(totals.get("boosted_records", 0.0)),
        }

    return {label: _summarize(dashboard[label]) for label in sorted(dashboard)}
def _normalize_label(value: str) -> str:
    """Normalize a label: clean it, case-fold it, and turn "-" and spaces into "_"."""
    folded = clean_text(value).casefold()
    return folded.translate(str.maketrans({"-": "_", " ": "_"}))