| """Dataseta scoring, source-aware weighting un benchmark feedback palīgi.""" |
|
|
| from __future__ import annotations |
|
|
| import json |
| from dataclasses import asdict, dataclass, field |
| from pathlib import Path |
| from typing import Any |
|
|
| from maris_core.data.preprocessing import clean_text, record_to_training_text |
|
|
# Default repeat multiplier per source tier; used as the default value of
# `DatasetScoringConfig.source_weight_map` and as the set of tier names that
# `detect_source_tier` accepts when a record declares its tier explicitly.
DEFAULT_SOURCE_WEIGHT_MAP: dict[str, float] = {
    "production": 1.3,
    "synthetic": 1.0,
    "noisy": 0.65,
    "unknown": 1.0,
}

# Tokens that select a tier when found among a record's source terms.
# Tiers are probed in insertion order, so "production" wins over "synthetic",
# which wins over "noisy", when tokens of several tiers are present.
SOURCE_TIER_TOKEN_MAP: dict[str, tuple[str, ...]] = {
    "production": (
        "production",
        "prod",
        "live",
        "human",
        "curated",
        "real",
        "customer",
    ),
    "synthetic": (
        "synthetic",
        "generated",
        "augmented",
        "distilled",
        "bootstrap",
        "seeded",
    ),
    "noisy": ("noisy", "weak", "scraped", "raw", "unfiltered", "test"),
}

# Record labels that count as a match for each benchmark metric name.
BENCHMARK_METRIC_ALIASES: dict[str, set[str]] = {
    "reasoning": {"reasoning", "analysis", "logic", "planner"},
    "coding": {"coding", "code", "programming", "developer"},
    "long_context": {"long_context", "context", "memory", "retrieval"},
    "helpfulness": {"helpfulness", "helpful", "assistant", "support"},
    "factuality": {"factuality", "facts", "grounding", "grounded"},
    "latvian_quality": {"latvian_quality", "latvian", "language_lv", "lv"},
    "safety": {"safety", "safe", "guardrails", "policy"},
}
|
|
|
|
| @dataclass(slots=True, frozen=True) |
| class DatasetScoringConfig: |
| """Konfigurācija dataset scoring/weighting solim.""" |
|
|
| enabled: bool = True |
| weighted_repetition_enabled: bool = True |
| max_text_chars: int = 8192 |
| low_score_repeat_count: int = 1 |
| medium_score_repeat_count: int = 2 |
| high_score_repeat_count: int = 3 |
| medium_score_threshold: float = 0.55 |
| high_score_threshold: float = 0.8 |
| source_weighting_enabled: bool = True |
| source_weight_map: dict[str, float] = field( |
| default_factory=lambda: DEFAULT_SOURCE_WEIGHT_MAP.copy() |
| ) |
| category_weight_map: dict[str, float] = field(default_factory=dict) |
| max_effective_repeat_count: int = 6 |
| benchmark_feedback_enabled: bool = True |
| benchmark_feedback_path: str = "" |
| benchmark_feedback_boost_scale: float = 2.0 |
| benchmark_feedback_max_multiplier: float = 1.75 |
|
|
|
|
| @dataclass(slots=True, frozen=True) |
| class DatasetBenchmarkFeedback: |
| """Iepriekšējā benchmark artefakta kopsavilkums reweighting vajadzībām.""" |
|
|
| artifact_path: str |
| deficient_metrics: dict[str, dict[str, float]] |
| overall_multiplier: float = 1.0 |
| discovery_mode: str = "explicit" |
|
|
|
|
| @dataclass(slots=True) |
| class DatasetScoringSplitReport: |
| """Viena split scoring rezultātu kopsavilkums.""" |
|
|
| split_name: str |
| input_records: int = 0 |
| expanded_records: int = 0 |
| average_score: float = 0.0 |
| max_score: float = 0.0 |
| min_score: float = 0.0 |
| repeated_records: int = 0 |
| score_buckets: dict[str, int] = field(default_factory=dict) |
| repeat_buckets: dict[str, int] = field(default_factory=dict) |
| source_tiers: dict[str, int] = field(default_factory=dict) |
| category_buckets: dict[str, int] = field(default_factory=dict) |
| feedback_metric_hits: dict[str, int] = field(default_factory=dict) |
| feedback_boosted_records: int = 0 |
| average_repeat_multiplier: float = 1.0 |
| source_dashboard: dict[str, dict[str, float]] = field(default_factory=dict) |
| category_dashboard: dict[str, dict[str, float]] = field(default_factory=dict) |
| sample_scores: list[dict[str, Any]] = field(default_factory=list) |
|
|
| def to_dict(self) -> dict[str, Any]: |
| return asdict(self) |
|
|
|
|
| @dataclass(slots=True) |
| class DatasetScoringReport: |
| """Pilns dataset scoring artefakts.""" |
|
|
| artifact_type: str |
| config: dict[str, Any] |
| splits: dict[str, DatasetScoringSplitReport] |
|
|
| def to_dict(self) -> dict[str, Any]: |
| return { |
| "artifact_type": self.artifact_type, |
| "config": self.config, |
| "splits": {name: report.to_dict() for name, report in self.splits.items()}, |
| "input_records": sum(report.input_records for report in self.splits.values()), |
| "expanded_records": sum(report.expanded_records for report in self.splits.values()), |
| "repeated_records": sum(report.repeated_records for report in self.splits.values()), |
| } |
|
|
|
|
def apply_scoring_to_records(
    records: list[dict[str, Any]],
    *,
    split_name: str,
    config: DatasetScoringConfig,
    expand_weights: bool,
    benchmark_feedback: DatasetBenchmarkFeedback | None = None,
) -> tuple[list[dict[str, Any]], DatasetScoringSplitReport]:
    """Score every record and materialize the weighting as repeated copies.

    Args:
        records: Input records of one split.
        split_name: Name stored on the returned report.
        config: Scoring configuration (thresholds, repeat counts, weights).
        expand_weights: When False every record is emitted exactly once.
        benchmark_feedback: Optional feedback used to boost matching records.

    Returns:
        A tuple ``(expanded, report)``. With ``config.enabled`` set,
        ``expanded`` holds shallow copies of the records enriched with the
        ``maris_dataset_*`` keys, one copy per repeat. With it unset,
        ``expanded`` is a plain list of the original record objects. The
        report statistics are computed in both cases.
    """
    report = DatasetScoringSplitReport(split_name=split_name, input_records=len(records))
    if not records:
        return [], report

    # Each entry: (record, score, repeat_count, source_tier). The tier is kept
    # so the expansion phase does not have to re-detect it for every copy.
    scored_records: list[tuple[dict[str, Any], float, int, str]] = []
    total_score = 0.0
    total_repeat_multiplier = 0.0
    source_dashboard: dict[str, dict[str, float]] = {}
    category_dashboard: dict[str, dict[str, float]] = {}

    for record in records:
        score = score_record(record, max_text_chars=config.max_text_chars)
        source_tier = detect_source_tier(record)
        category_label = detect_record_category(record)
        source_multiplier = _source_multiplier_for_record(record, config)
        category_multiplier = _category_multiplier_for_record(record, config)
        feedback_multiplier, matched_metrics = _feedback_multiplier_for_record(
            record,
            benchmark_feedback,
        )
        repeat_multiplier = source_multiplier * category_multiplier * feedback_multiplier
        repeat_count = _effective_repeat_count(
            score,
            repeat_multiplier=repeat_multiplier,
            config=config,
            expand_weights=expand_weights,
        )
        total_score += score
        total_repeat_multiplier += repeat_multiplier

        # Histogram keys are computed once and reused for read and write.
        score_bucket = _score_bucket(score)
        report.score_buckets[score_bucket] = report.score_buckets.get(score_bucket, 0) + 1
        repeat_key = str(repeat_count)
        report.repeat_buckets[repeat_key] = report.repeat_buckets.get(repeat_key, 0) + 1
        report.source_tiers[source_tier] = report.source_tiers.get(source_tier, 0) + 1
        report.category_buckets[category_label] = (
            report.category_buckets.get(category_label, 0) + 1
        )

        if repeat_count > 1:
            report.repeated_records += 1

        # A record counts as boosted when at least one feedback metric matched.
        boosted = bool(matched_metrics)
        if boosted:
            report.feedback_boosted_records += 1
        for metric in matched_metrics:
            report.feedback_metric_hits[metric] = report.feedback_metric_hits.get(metric, 0) + 1

        for dashboard, label in (
            (source_dashboard, source_tier),
            (category_dashboard, category_label),
        ):
            _update_dashboard_bucket(
                dashboard,
                label,
                score=score,
                repeat_count=repeat_count,
                repeat_multiplier=repeat_multiplier,
                boosted=boosted,
            )

        # Keep a breakdown of the first five records for inspection.
        if len(report.sample_scores) < 5:
            preview = record_to_training_text(record, max_chars=config.max_text_chars)[:160]
            report.sample_scores.append(
                {
                    "score": round(score, 4),
                    "source_tier": source_tier,
                    "category": category_label,
                    "source_multiplier": round(source_multiplier, 4),
                    "category_multiplier": round(category_multiplier, 4),
                    "feedback_multiplier": round(feedback_multiplier, 4),
                    "matched_metrics": matched_metrics,
                    "repeat_count": repeat_count,
                    "preview": preview,
                }
            )
        scored_records.append((record, score, repeat_count, source_tier))

    record_total = len(scored_records)
    report.average_score = round(total_score / record_total, 4)
    report.average_repeat_multiplier = round(total_repeat_multiplier / record_total, 4)
    report.source_dashboard = _finalize_dashboard(source_dashboard)
    report.category_dashboard = _finalize_dashboard(category_dashboard)
    score_values = [score for _, score, _, _ in scored_records]
    report.min_score = round(min(score_values), 4)
    report.max_score = round(max(score_values), 4)

    if not config.enabled:
        # Scoring disabled: pass the original records through untouched.
        expanded = list(records)
    else:
        expanded = []
        for record, score, repeat_count, source_tier in scored_records:
            copies = repeat_count if expand_weights else 1
            rounded_score = round(score, 4)
            for copy_index in range(copies):
                expanded.append(
                    {
                        **record,
                        "maris_dataset_score": rounded_score,
                        "maris_dataset_repeat_count": repeat_count,
                        "maris_dataset_repeat_index": copy_index,
                        "maris_dataset_source_tier": source_tier,
                    }
                )

    report.expanded_records = len(expanded)
    return expanded, report
|
|
|
|
def build_dataset_scoring_report(
    *,
    config: DatasetScoringConfig,
    train_report: DatasetScoringSplitReport,
    eval_report: DatasetScoringSplitReport | None = None,
) -> DatasetScoringReport:
    """Assemble the serializable scoring artifact from per-split reports.

    The train report is always included; the eval report is added when given.
    Reports are keyed by their ``split_name``, so an eval report with the same
    name as the train report replaces it.
    """
    included = [train_report] if eval_report is None else [train_report, eval_report]
    splits_by_name = {split_report.split_name: split_report for split_report in included}
    return DatasetScoringReport(
        artifact_type="dataset-scoring-report",
        config=asdict(config),
        splits=splits_by_name,
    )
|
|
|
|
def load_benchmark_feedback(
    path: str | Path,
    *,
    targets: dict[str, float],
    boost_scale: float,
    max_multiplier: float,
) -> DatasetBenchmarkFeedback:
    """Load a benchmark manifest/feedback artifact and derive reweighting rules.

    Two JSON layouts are accepted:

    * An object with ``deficient_metrics``: the per-metric details are taken
      as-is (coerced to floats), together with ``overall_multiplier`` and
      ``discovery_mode``. Entries whose details are not objects are skipped.
    * An object with ``score_manifest``: for every metric in ``targets`` the
      deficit ``target - actual`` is computed. A metric missing from the
      manifest falls back to the manifest's ``overall`` value. Each positive
      deficit yields ``min(1 + deficit * boost_scale, max_multiplier)``.

    Args:
        path: Location of the UTF-8 encoded JSON file.
        targets: Metric name -> target score (used by the manifest layout).
        boost_scale: Multiplier growth per unit of deficit.
        max_multiplier: Cap applied to each computed multiplier.

    Returns:
        The parsed feedback summary.

    Raises:
        ValueError: If the payload is not a JSON object or contains neither
            ``deficient_metrics`` nor a ``score_manifest`` object.
    """

    def _number(value: Any, default: float) -> float:
        # A JSON null would make float() raise; treat it like a missing field.
        return default if value is None else float(value)

    payload = json.loads(Path(path).read_text(encoding="utf-8"))
    if not isinstance(payload, dict):
        # A top-level list/scalar has no `.get`; report it as a format error
        # instead of failing with an AttributeError further down.
        raise ValueError("Benchmark feedback failā jābūt `score_manifest` vai `deficient_metrics`.")

    if "deficient_metrics" in payload:
        raw_metrics = payload["deficient_metrics"]
        if not isinstance(raw_metrics, dict):
            # Best effort, in line with skipping malformed entries below.
            raw_metrics = {}
        deficient_metrics = {
            str(metric): {
                "target": _number(details.get("target"), 0.0),
                "actual": _number(details.get("actual"), 0.0),
                "deficit": _number(details.get("deficit"), 0.0),
                "multiplier": _number(details.get("multiplier"), 1.0),
            }
            for metric, details in raw_metrics.items()
            if isinstance(details, dict)
        }
        overall_multiplier = float(payload.get("overall_multiplier", 1.0) or 1.0)
        return DatasetBenchmarkFeedback(
            artifact_path=str(path),
            deficient_metrics=deficient_metrics,
            overall_multiplier=overall_multiplier,
            discovery_mode=str(payload.get("discovery_mode", "explicit") or "explicit"),
        )

    score_manifest = payload.get("score_manifest")
    if not isinstance(score_manifest, dict):
        raise ValueError("Benchmark feedback failā jābūt `score_manifest` vai `deficient_metrics`.")

    deficient_metrics: dict[str, dict[str, float]] = {}
    overall_multiplier = 1.0
    for metric, target in targets.items():
        # Metrics absent from the manifest fall back to the overall score.
        actual = float(score_manifest.get(metric, score_manifest.get("overall", 0.0)) or 0.0)
        deficit = max(float(target) - actual, 0.0)
        if deficit <= 0:
            continue
        multiplier = min(1.0 + deficit * boost_scale, max_multiplier)
        deficient_metrics[str(metric)] = {
            "target": float(target),
            "actual": actual,
            "deficit": round(deficit, 4),
            "multiplier": round(multiplier, 4),
        }
        if metric == "overall":
            overall_multiplier = round(multiplier, 4)

    return DatasetBenchmarkFeedback(
        artifact_path=str(path),
        deficient_metrics=deficient_metrics,
        overall_multiplier=overall_multiplier,
    )
|
|
|
|
def build_benchmark_feedback_artifact(
    feedback: DatasetBenchmarkFeedback,
) -> dict[str, Any]:
    """Build the serializable benchmark-feedback artifact.

    The result carries a fixed ``artifact_type`` tag followed by the feedback's
    fields; ``deficient_metrics`` is referenced, not copied.
    """
    artifact: dict[str, Any] = {"artifact_type": "benchmark-feedback-reweighting"}
    copied_fields = (
        "artifact_path",
        "overall_multiplier",
        "discovery_mode",
        "deficient_metrics",
    )
    for field_name in copied_fields:
        artifact[field_name] = getattr(feedback, field_name)
    return artifact
|
|
|
|
def score_record(record: dict[str, Any], *, max_text_chars: int) -> float:
    """Compute a heuristic data-quality score in the [0, 1] interval.

    The score is a weighted sum of four components: text length (saturating at
    240 characters, weight 0.35), token diversity (unique/total whitespace
    tokens, weight 0.30), record structure (weight 0.20) and metadata
    completeness (weight 0.15). A record without text scores 0.0.
    """
    text = clean_text(record_to_training_text(record, max_chars=max_text_chars))
    if not text:
        return 0.0

    # str.split() without arguments never yields empty tokens.
    words = text.casefold().split()
    distinct_ratio = len(set(words)) / max(len(words), 1)

    length_part = 0.35 * min(len(text) / 240.0, 1.0)
    diversity_part = 0.30 * min(distinct_ratio, 1.0)
    structure_part = 0.20 * _structure_score(record)
    metadata_part = 0.15 * _metadata_score(record)

    combined = length_part + diversity_part + structure_part + metadata_part
    rounded = round(combined, 4)
    return max(0.0, min(rounded, 1.0))
|
|
|
|
def detect_source_tier(record: dict[str, Any]) -> str:
    """Determine the source tier used for weighting.

    An explicit ``source_tier`` (or, if that is falsy, ``source_quality``)
    string wins when it normalizes to a key of ``DEFAULT_SOURCE_WEIGHT_MAP``.
    Otherwise the first tier in ``SOURCE_TIER_TOKEN_MAP`` that shares a token
    with the record's terms is returned, and ``"unknown"`` when none does.
    """
    terms = _record_terms(record)

    declared = record.get("source_tier") or record.get("source_quality")
    if isinstance(declared, str):
        tier_key = clean_text(declared).casefold().replace(" ", "_")
        if tier_key in DEFAULT_SOURCE_WEIGHT_MAP:
            return tier_key

    token_matches = (
        tier
        for tier, tokens in SOURCE_TIER_TOKEN_MAP.items()
        if not terms.isdisjoint(tokens)
    )
    return next(token_matches, "unknown")
|
|
|
|
def detect_record_category(record: dict[str, Any]) -> str:
    """Find a stable category label for dashboard grouping.

    Candidates are probed in order: the record's ``category``,
    ``task_category`` and ``branch_focus``, then the metadata's ``category``,
    ``focus`` and ``type``. The first string that is non-empty after cleaning
    is returned in normalized form; ``"general"`` is the fallback.
    """
    candidates = [
        record.get("category"),
        record.get("task_category"),
        record.get("branch_focus"),
    ]
    metadata = record.get("metadata")
    if isinstance(metadata, dict):
        candidates.extend(metadata.get(key) for key in ("category", "focus", "type"))

    for candidate in candidates:
        if not isinstance(candidate, str):
            continue
        if clean_text(candidate):
            return _normalize_label(candidate)
    return "general"
|
|
|
|
def _structure_score(record: dict[str, Any]) -> float:
    """Rate how well-formed the record's shape is.

    A user/assistant pair scores 1.0 when both sides are non-empty and differ
    (case-insensitively), else 0.3. A ``prompt`` string scores 0.8 (0.2 when
    empty), a ``text`` string 0.6 (0.2 when empty); anything else scores 0.4.
    """
    user_raw = record.get("user")
    assistant_raw = record.get("assistant")
    if isinstance(user_raw, str) and isinstance(assistant_raw, str):
        user_text = clean_text(str(user_raw))
        assistant_text = clean_text(str(assistant_raw))
        is_real_exchange = (
            bool(user_text)
            and bool(assistant_text)
            and user_text.casefold() != assistant_text.casefold()
        )
        return 1.0 if is_real_exchange else 0.3

    # (key, score when the cleaned text is non-empty, score when it is empty)
    single_field_scores = (("prompt", 0.8, 0.2), ("text", 0.6, 0.2))
    for key, filled_score, blank_score in single_field_scores:
        value = record.get(key)
        if isinstance(value, str):
            return filled_score if clean_text(str(value)) else blank_score
    return 0.4
|
|
|
|
def _metadata_score(record: dict[str, Any]) -> float:
    """Rate metadata completeness in the [0, 1] interval.

    A metadata dict contributes up to 0.7 (reached at four or more keys); a
    non-empty ``language`` string and a non-empty ``source`` string contribute
    0.15 each.
    """
    total = 0.0
    metadata = record.get("metadata")
    if isinstance(metadata, dict):
        total += min(len(metadata) / 4.0, 1.0) * 0.7
    for key in ("language", "source"):
        value = record.get(key)
        if isinstance(value, str) and clean_text(str(value)):
            total += 0.15
    return min(total, 1.0)
|
|
|
|
| def _source_multiplier_for_record(record: dict[str, Any], config: DatasetScoringConfig) -> float: |
| if not config.source_weighting_enabled: |
| return 1.0 |
| source_tier = detect_source_tier(record) |
| return max(0.1, float(config.source_weight_map.get(source_tier, 1.0))) |
|
|
|
|
| def _category_multiplier_for_record(record: dict[str, Any], config: DatasetScoringConfig) -> float: |
| if not config.category_weight_map: |
| return 1.0 |
| labels = _record_labels(record) |
| matches = [ |
| float(weight) |
| for label, weight in config.category_weight_map.items() |
| if clean_text(str(label)).casefold().replace("-", "_").replace(" ", "_") in labels |
| ] |
| if not matches: |
| return 1.0 |
| return max(0.1, max(matches)) |
|
|
|
|
| def _feedback_multiplier_for_record( |
| record: dict[str, Any], |
| feedback: DatasetBenchmarkFeedback | None, |
| ) -> tuple[float, list[str]]: |
| if feedback is None or not feedback.deficient_metrics: |
| return 1.0, [] |
|
|
| labels = _record_labels(record) |
| matched_metrics = sorted( |
| metric |
| for metric in feedback.deficient_metrics |
| if metric == "overall" |
| or labels.intersection(BENCHMARK_METRIC_ALIASES.get(metric, {metric})) |
| ) |
| if not matched_metrics: |
| return feedback.overall_multiplier, ["overall"] if feedback.overall_multiplier > 1.0 else [] |
|
|
| specific_multiplier = ( |
| max( |
| float(feedback.deficient_metrics[metric].get("multiplier", 1.0) or 1.0) |
| for metric in matched_metrics |
| if metric != "overall" |
| ) |
| if any(metric != "overall" for metric in matched_metrics) |
| else 1.0 |
| ) |
| combined = min( |
| max(1.0, specific_multiplier) * max(1.0, feedback.overall_multiplier), |
| max( |
| [feedback.overall_multiplier] |
| + [ |
| float(details.get("multiplier", 1.0) or 1.0) |
| for details in feedback.deficient_metrics.values() |
| ] |
| ), |
| ) |
| return round(max(1.0, combined), 4), matched_metrics |
|
|
|
|
| def _effective_repeat_count( |
| score: float, |
| *, |
| repeat_multiplier: float, |
| config: DatasetScoringConfig, |
| expand_weights: bool, |
| ) -> int: |
| if not config.enabled or not expand_weights or not config.weighted_repetition_enabled: |
| return 1 |
| base_repeat_count = _repeat_count_for_score(score, config) |
| weighted = int(round(base_repeat_count * max(repeat_multiplier, 0.1))) |
| return max(1, min(weighted, config.max_effective_repeat_count)) |
|
|
|
|
| def _repeat_count_for_score(score: float, config: DatasetScoringConfig) -> int: |
| if score >= config.high_score_threshold: |
| return max(1, config.high_score_repeat_count) |
| if score >= config.medium_score_threshold: |
| return max(1, config.medium_score_repeat_count) |
| return max(1, config.low_score_repeat_count) |
|
|
|
|
| def _score_bucket(score: float) -> str: |
| if score >= 0.8: |
| return "high" |
| if score >= 0.55: |
| return "medium" |
| return "low" |
|
|
|
|
| def _record_terms(record: dict[str, Any]) -> set[str]: |
| values: list[str] = [] |
| for key in ("source", "source_type", "source_quality", "source_tier", "category", "language"): |
| value = record.get(key) |
| if isinstance(value, str): |
| values.extend(value.casefold().replace("-", " ").replace("_", " ").split()) |
| metadata = record.get("metadata") |
| if isinstance(metadata, dict): |
| for key in ("source", "source_type", "source_quality", "source_tier", "origin", "category"): |
| value = metadata.get(key) |
| if isinstance(value, str): |
| values.extend(value.casefold().replace("-", " ").replace("_", " ").split()) |
| tags = metadata.get("tags") |
| if isinstance(tags, list): |
| for item in tags: |
| if isinstance(item, str): |
| values.extend(item.casefold().replace("-", " ").replace("_", " ").split()) |
| tags = record.get("tags") |
| if isinstance(tags, list): |
| for item in tags: |
| if isinstance(item, str): |
| values.extend(item.casefold().replace("-", " ").replace("_", " ").split()) |
| return set(values) |
|
|
|
|
def _record_labels(record: dict[str, Any]) -> set[str]:
    """Collect normalized labels used for category and metric matching.

    For every non-empty string taken from the record's ``category``,
    ``branch_focus``, ``source`` and ``language``, the metadata's ``category``,
    ``focus``, ``type`` and ``language``, and both ``tags`` lists, the
    normalized label (case-folded, ``-``/space replaced by ``_``) and each of
    its underscore-separated parts are added to the result.
    """
    labels: set[str] = set()

    def _absorb(value: Any) -> None:
        # Ignore non-strings and strings that clean to nothing.
        if not isinstance(value, str):
            return
        cleaned = clean_text(value)
        if not cleaned:
            return
        normalized = cleaned.casefold().replace("-", "_").replace(" ", "_")
        labels.add(normalized)
        labels.update(normalized.split("_"))

    def _absorb_tags(tags: Any) -> None:
        # Only list-valued tags are considered.
        if isinstance(tags, list):
            for item in tags:
                _absorb(item)

    for key in ("category", "branch_focus", "source", "language"):
        _absorb(record.get(key))

    metadata = record.get("metadata")
    if isinstance(metadata, dict):
        for key in ("category", "focus", "type", "language"):
            _absorb(metadata.get(key))
        _absorb_tags(metadata.get("tags"))

    _absorb_tags(record.get("tags"))
    return labels
|
|
|
|
| def _update_dashboard_bucket( |
| dashboard: dict[str, dict[str, float]], |
| label: str, |
| *, |
| score: float, |
| repeat_count: int, |
| repeat_multiplier: float, |
| boosted: bool, |
| ) -> None: |
| """Uzkrāj dashboard metriku bucketam; `boosted` nozīmē benchmark feedback match.""" |
|
|
| bucket = dashboard.setdefault( |
| label, |
| { |
| "records": 0.0, |
| "score_total": 0.0, |
| "repeat_total": 0.0, |
| "repeat_multiplier_total": 0.0, |
| "boosted_records": 0.0, |
| }, |
| ) |
| bucket["records"] += 1.0 |
| bucket["score_total"] += score |
| bucket["repeat_total"] += float(repeat_count) |
| bucket["repeat_multiplier_total"] += repeat_multiplier |
| if boosted: |
| bucket["boosted_records"] += 1.0 |
|
|
|
|
| def _finalize_dashboard(dashboard: dict[str, dict[str, float]]) -> dict[str, dict[str, float]]: |
| finalized: dict[str, dict[str, float]] = {} |
| for label, bucket in sorted(dashboard.items()): |
| records = max(bucket.get("records", 0.0), 1.0) |
| finalized[label] = { |
| "records": int(bucket.get("records", 0.0)), |
| "average_score": round(bucket.get("score_total", 0.0) / records, 4), |
| "average_repeat_count": round(bucket.get("repeat_total", 0.0) / records, 4), |
| "average_repeat_multiplier": round( |
| bucket.get("repeat_multiplier_total", 0.0) / records, |
| 4, |
| ), |
| "boosted_records": int(bucket.get("boosted_records", 0.0)), |
| } |
| return finalized |
|
|
|
|
def _normalize_label(value: str) -> str:
    """Return the cleaned, case-folded label with ``-`` and spaces as ``_``."""
    cleaned = clean_text(value)
    folded = cleaned.casefold()
    # Single pass mapping both separator characters to an underscore.
    return folded.translate(str.maketrans({"-": "_", " ": "_"}))
|
|