"""Dataseta scoring, source-aware weighting un benchmark feedback palīgi.""" from __future__ import annotations import json from dataclasses import asdict, dataclass, field from pathlib import Path from typing import Any from maris_core.data.preprocessing import clean_text, record_to_training_text DEFAULT_SOURCE_WEIGHT_MAP = { "production": 1.3, "synthetic": 1.0, "noisy": 0.65, "unknown": 1.0, } SOURCE_TIER_TOKEN_MAP = { "production": ("production", "prod", "live", "human", "curated", "real", "customer"), "synthetic": ("synthetic", "generated", "augmented", "distilled", "bootstrap", "seeded"), "noisy": ("noisy", "weak", "scraped", "raw", "unfiltered", "test"), } BENCHMARK_METRIC_ALIASES = { "reasoning": {"reasoning", "analysis", "logic", "planner"}, "coding": {"coding", "code", "programming", "developer"}, "long_context": {"long_context", "context", "memory", "retrieval"}, "helpfulness": {"helpfulness", "helpful", "assistant", "support"}, "factuality": {"factuality", "facts", "grounding", "grounded"}, "latvian_quality": {"latvian_quality", "latvian", "language_lv", "lv"}, "safety": {"safety", "safe", "guardrails", "policy"}, } @dataclass(slots=True, frozen=True) class DatasetScoringConfig: """Konfigurācija dataset scoring/weighting solim.""" enabled: bool = True weighted_repetition_enabled: bool = True max_text_chars: int = 8192 low_score_repeat_count: int = 1 medium_score_repeat_count: int = 2 high_score_repeat_count: int = 3 medium_score_threshold: float = 0.55 high_score_threshold: float = 0.8 source_weighting_enabled: bool = True source_weight_map: dict[str, float] = field( default_factory=lambda: DEFAULT_SOURCE_WEIGHT_MAP.copy() ) category_weight_map: dict[str, float] = field(default_factory=dict) max_effective_repeat_count: int = 6 benchmark_feedback_enabled: bool = True benchmark_feedback_path: str = "" benchmark_feedback_boost_scale: float = 2.0 benchmark_feedback_max_multiplier: float = 1.75 @dataclass(slots=True, frozen=True) class DatasetBenchmarkFeedback: """Iepriekšējā benchmark artefakta kopsavilkums reweighting vajadzībām.""" artifact_path: str deficient_metrics: dict[str, dict[str, float]] overall_multiplier: float = 1.0 discovery_mode: str = "explicit" @dataclass(slots=True) class DatasetScoringSplitReport: """Viena split scoring rezultātu kopsavilkums.""" split_name: str input_records: int = 0 expanded_records: int = 0 average_score: float = 0.0 max_score: float = 0.0 min_score: float = 0.0 repeated_records: int = 0 score_buckets: dict[str, int] = field(default_factory=dict) repeat_buckets: dict[str, int] = field(default_factory=dict) source_tiers: dict[str, int] = field(default_factory=dict) category_buckets: dict[str, int] = field(default_factory=dict) feedback_metric_hits: dict[str, int] = field(default_factory=dict) feedback_boosted_records: int = 0 average_repeat_multiplier: float = 1.0 source_dashboard: dict[str, dict[str, float]] = field(default_factory=dict) category_dashboard: dict[str, dict[str, float]] = field(default_factory=dict) sample_scores: list[dict[str, Any]] = field(default_factory=list) def to_dict(self) -> dict[str, Any]: return asdict(self) @dataclass(slots=True) class DatasetScoringReport: """Pilns dataset scoring artefakts.""" artifact_type: str config: dict[str, Any] splits: dict[str, DatasetScoringSplitReport] def to_dict(self) -> dict[str, Any]: return { "artifact_type": self.artifact_type, "config": self.config, "splits": {name: report.to_dict() for name, report in self.splits.items()}, "input_records": sum(report.input_records for report in self.splits.values()), "expanded_records": sum(report.expanded_records for report in self.splits.values()), "repeated_records": sum(report.repeated_records for report in self.splits.values()), } def apply_scoring_to_records( records: list[dict[str, Any]], *, split_name: str, config: DatasetScoringConfig, expand_weights: bool, benchmark_feedback: DatasetBenchmarkFeedback | None = None, ) -> tuple[list[dict[str, Any]], DatasetScoringSplitReport]: """Aprēķina score un materializē weighting kā atkārtojumus.""" report = DatasetScoringSplitReport(split_name=split_name, input_records=len(records)) if not records: return [], report scored_records: list[tuple[dict[str, Any], float, int]] = [] total_score = 0.0 total_repeat_multiplier = 0.0 source_dashboard: dict[str, dict[str, float]] = {} category_dashboard: dict[str, dict[str, float]] = {} for record in records: score = score_record(record, max_text_chars=config.max_text_chars) source_tier = detect_source_tier(record) category_label = detect_record_category(record) source_multiplier = _source_multiplier_for_record(record, config) category_multiplier = _category_multiplier_for_record(record, config) feedback_multiplier, matched_metrics = _feedback_multiplier_for_record( record, benchmark_feedback, ) repeat_multiplier = source_multiplier * category_multiplier * feedback_multiplier repeat_count = _effective_repeat_count( score, repeat_multiplier=repeat_multiplier, config=config, expand_weights=expand_weights, ) total_score += score total_repeat_multiplier += repeat_multiplier report.score_buckets[_score_bucket(score)] = ( report.score_buckets.get(_score_bucket(score), 0) + 1 ) report.repeat_buckets[str(repeat_count)] = ( report.repeat_buckets.get(str(repeat_count), 0) + 1 ) report.source_tiers[source_tier] = report.source_tiers.get(source_tier, 0) + 1 report.category_buckets[category_label] = report.category_buckets.get(category_label, 0) + 1 if repeat_count > 1: report.repeated_records += 1 if matched_metrics: report.feedback_boosted_records += 1 for metric in matched_metrics: report.feedback_metric_hits[metric] = report.feedback_metric_hits.get(metric, 0) + 1 _update_dashboard_bucket( source_dashboard, source_tier, score=score, repeat_count=repeat_count, repeat_multiplier=repeat_multiplier, boosted=bool(matched_metrics), ) _update_dashboard_bucket( category_dashboard, category_label, score=score, repeat_count=repeat_count, repeat_multiplier=repeat_multiplier, boosted=bool(matched_metrics), ) if len(report.sample_scores) < 5: report.sample_scores.append( { "score": round(score, 4), "source_tier": source_tier, "category": category_label, "source_multiplier": round(source_multiplier, 4), "category_multiplier": round(category_multiplier, 4), "feedback_multiplier": round(feedback_multiplier, 4), "matched_metrics": matched_metrics, "repeat_count": repeat_count, "preview": record_to_training_text(record, max_chars=config.max_text_chars)[ :160 ], } ) scored_records.append((record, score, repeat_count)) report.average_score = round(total_score / len(scored_records), 4) report.average_repeat_multiplier = round(total_repeat_multiplier / len(scored_records), 4) report.source_dashboard = _finalize_dashboard(source_dashboard) report.category_dashboard = _finalize_dashboard(category_dashboard) score_values = [score for _, score, _ in scored_records] report.min_score = round(min(score_values), 4) report.max_score = round(max(score_values), 4) if not config.enabled: expanded = list(records) else: expanded = [] for record, score, repeat_count in scored_records: repeat_total = repeat_count if expand_weights else 1 for copy_index in range(repeat_total): enriched = dict(record) enriched["maris_dataset_score"] = round(score, 4) enriched["maris_dataset_repeat_count"] = repeat_count enriched["maris_dataset_repeat_index"] = copy_index enriched["maris_dataset_source_tier"] = detect_source_tier(record) expanded.append(enriched) report.expanded_records = len(expanded) return expanded, report def build_dataset_scoring_report( *, config: DatasetScoringConfig, train_report: DatasetScoringSplitReport, eval_report: DatasetScoringSplitReport | None = None, ) -> DatasetScoringReport: """Izveido serializējamu scoring artefaktu.""" splits = {train_report.split_name: train_report} if eval_report is not None: splits[eval_report.split_name] = eval_report return DatasetScoringReport( artifact_type="dataset-scoring-report", config=asdict(config), splits=splits, ) def load_benchmark_feedback( path: str | Path, *, targets: dict[str, float], boost_scale: float, max_multiplier: float, ) -> DatasetBenchmarkFeedback: """Ielādē benchmark manifestu/feedback artefaktu un pārvērš reweighting noteikumos.""" payload = json.loads(Path(path).read_text(encoding="utf-8")) if "deficient_metrics" in payload: deficient_metrics = { str(metric): { "target": float(details.get("target", 0.0)), "actual": float(details.get("actual", 0.0)), "deficit": float(details.get("deficit", 0.0)), "multiplier": float(details.get("multiplier", 1.0)), } for metric, details in payload.get("deficient_metrics", {}).items() if isinstance(details, dict) } overall_multiplier = float(payload.get("overall_multiplier", 1.0) or 1.0) return DatasetBenchmarkFeedback( artifact_path=str(path), deficient_metrics=deficient_metrics, overall_multiplier=overall_multiplier, discovery_mode=str(payload.get("discovery_mode", "explicit") or "explicit"), ) score_manifest = payload.get("score_manifest") if not isinstance(score_manifest, dict): raise ValueError("Benchmark feedback failā jābūt `score_manifest` vai `deficient_metrics`.") deficient_metrics: dict[str, dict[str, float]] = {} overall_multiplier = 1.0 for metric, target in targets.items(): actual = float(score_manifest.get(metric, score_manifest.get("overall", 0.0)) or 0.0) deficit = max(float(target) - actual, 0.0) if deficit <= 0: continue multiplier = min(1.0 + deficit * boost_scale, max_multiplier) deficient_metrics[str(metric)] = { "target": float(target), "actual": actual, "deficit": round(deficit, 4), "multiplier": round(multiplier, 4), } if metric == "overall": overall_multiplier = round(multiplier, 4) return DatasetBenchmarkFeedback( artifact_path=str(path), deficient_metrics=deficient_metrics, overall_multiplier=overall_multiplier, ) def build_benchmark_feedback_artifact( feedback: DatasetBenchmarkFeedback, ) -> dict[str, Any]: """Izveido serializējamu benchmark-feedback artefaktu.""" return { "artifact_type": "benchmark-feedback-reweighting", "artifact_path": feedback.artifact_path, "overall_multiplier": feedback.overall_multiplier, "discovery_mode": feedback.discovery_mode, "deficient_metrics": feedback.deficient_metrics, } def score_record(record: dict[str, Any], *, max_text_chars: int) -> float: """Aprēķina heuristisku datu kvalitātes score [0,1] intervālā.""" text = clean_text(record_to_training_text(record, max_chars=max_text_chars)) if not text: return 0.0 text_length = len(text) tokens = [token for token in text.casefold().split() if token] unique_tokens = len(set(tokens)) length_score = min(text_length / 240.0, 1.0) diversity_score = min(unique_tokens / max(len(tokens), 1), 1.0) structure_score = _structure_score(record) metadata_score = _metadata_score(record) score = ( 0.35 * length_score + 0.30 * diversity_score + 0.20 * structure_score + 0.15 * metadata_score ) return max(0.0, min(round(score, 4), 1.0)) def detect_source_tier(record: dict[str, Any]) -> str: """Atrod source tier svarošnai.""" candidates = _record_terms(record) explicit = record.get("source_tier") or record.get("source_quality") if isinstance(explicit, str): normalized = clean_text(explicit).casefold().replace(" ", "_") if normalized in DEFAULT_SOURCE_WEIGHT_MAP: return normalized for tier, tokens in SOURCE_TIER_TOKEN_MAP.items(): if any(token in candidates for token in tokens): return tier return "unknown" def detect_record_category(record: dict[str, Any]) -> str: """Atrod stabilu kategorijas label dashboard grupēšanai.""" for candidate in ( record.get("category"), record.get("task_category"), record.get("branch_focus"), ): if isinstance(candidate, str) and clean_text(candidate): return _normalize_label(candidate) metadata = record.get("metadata") if isinstance(metadata, dict): for key in ("category", "focus", "type"): candidate = metadata.get(key) if isinstance(candidate, str) and clean_text(candidate): return _normalize_label(candidate) return "general" def _structure_score(record: dict[str, Any]) -> float: if isinstance(record.get("user"), str) and isinstance(record.get("assistant"), str): user = clean_text(str(record.get("user", ""))) assistant = clean_text(str(record.get("assistant", ""))) if user and assistant and user.casefold() != assistant.casefold(): return 1.0 return 0.3 if isinstance(record.get("prompt"), str): return 0.8 if clean_text(str(record.get("prompt", ""))) else 0.2 if isinstance(record.get("text"), str): return 0.6 if clean_text(str(record.get("text", ""))) else 0.2 return 0.4 def _metadata_score(record: dict[str, Any]) -> float: metadata = record.get("metadata") score = 0.0 if isinstance(metadata, dict): score += min(len(metadata) / 4.0, 1.0) * 0.7 if isinstance(record.get("language"), str) and clean_text(str(record["language"])): score += 0.15 if isinstance(record.get("source"), str) and clean_text(str(record["source"])): score += 0.15 return min(score, 1.0) def _source_multiplier_for_record(record: dict[str, Any], config: DatasetScoringConfig) -> float: if not config.source_weighting_enabled: return 1.0 source_tier = detect_source_tier(record) return max(0.1, float(config.source_weight_map.get(source_tier, 1.0))) def _category_multiplier_for_record(record: dict[str, Any], config: DatasetScoringConfig) -> float: if not config.category_weight_map: return 1.0 labels = _record_labels(record) matches = [ float(weight) for label, weight in config.category_weight_map.items() if clean_text(str(label)).casefold().replace("-", "_").replace(" ", "_") in labels ] if not matches: return 1.0 return max(0.1, max(matches)) def _feedback_multiplier_for_record( record: dict[str, Any], feedback: DatasetBenchmarkFeedback | None, ) -> tuple[float, list[str]]: if feedback is None or not feedback.deficient_metrics: return 1.0, [] labels = _record_labels(record) matched_metrics = sorted( metric for metric in feedback.deficient_metrics if metric == "overall" or labels.intersection(BENCHMARK_METRIC_ALIASES.get(metric, {metric})) ) if not matched_metrics: return feedback.overall_multiplier, ["overall"] if feedback.overall_multiplier > 1.0 else [] specific_multiplier = ( max( float(feedback.deficient_metrics[metric].get("multiplier", 1.0) or 1.0) for metric in matched_metrics if metric != "overall" ) if any(metric != "overall" for metric in matched_metrics) else 1.0 ) combined = min( max(1.0, specific_multiplier) * max(1.0, feedback.overall_multiplier), max( [feedback.overall_multiplier] + [ float(details.get("multiplier", 1.0) or 1.0) for details in feedback.deficient_metrics.values() ] ), ) return round(max(1.0, combined), 4), matched_metrics def _effective_repeat_count( score: float, *, repeat_multiplier: float, config: DatasetScoringConfig, expand_weights: bool, ) -> int: if not config.enabled or not expand_weights or not config.weighted_repetition_enabled: return 1 base_repeat_count = _repeat_count_for_score(score, config) weighted = int(round(base_repeat_count * max(repeat_multiplier, 0.1))) return max(1, min(weighted, config.max_effective_repeat_count)) def _repeat_count_for_score(score: float, config: DatasetScoringConfig) -> int: if score >= config.high_score_threshold: return max(1, config.high_score_repeat_count) if score >= config.medium_score_threshold: return max(1, config.medium_score_repeat_count) return max(1, config.low_score_repeat_count) def _score_bucket(score: float) -> str: if score >= 0.8: return "high" if score >= 0.55: return "medium" return "low" def _record_terms(record: dict[str, Any]) -> set[str]: values: list[str] = [] for key in ("source", "source_type", "source_quality", "source_tier", "category", "language"): value = record.get(key) if isinstance(value, str): values.extend(value.casefold().replace("-", " ").replace("_", " ").split()) metadata = record.get("metadata") if isinstance(metadata, dict): for key in ("source", "source_type", "source_quality", "source_tier", "origin", "category"): value = metadata.get(key) if isinstance(value, str): values.extend(value.casefold().replace("-", " ").replace("_", " ").split()) tags = metadata.get("tags") if isinstance(tags, list): for item in tags: if isinstance(item, str): values.extend(item.casefold().replace("-", " ").replace("_", " ").split()) tags = record.get("tags") if isinstance(tags, list): for item in tags: if isinstance(item, str): values.extend(item.casefold().replace("-", " ").replace("_", " ").split()) return set(values) def _record_labels(record: dict[str, Any]) -> set[str]: labels: set[str] = set() for key in ("category", "branch_focus", "source", "language"): value = record.get(key) if isinstance(value, str) and clean_text(value): normalized = clean_text(value).casefold().replace("-", "_").replace(" ", "_") labels.add(normalized) labels.update(normalized.split("_")) metadata = record.get("metadata") if isinstance(metadata, dict): for key in ("category", "focus", "type", "language"): value = metadata.get(key) if isinstance(value, str) and clean_text(value): normalized = clean_text(value).casefold().replace("-", "_").replace(" ", "_") labels.add(normalized) labels.update(normalized.split("_")) tags = metadata.get("tags") if isinstance(tags, list): for item in tags: if isinstance(item, str) and clean_text(item): normalized = clean_text(item).casefold().replace("-", "_").replace(" ", "_") labels.add(normalized) labels.update(normalized.split("_")) tags = record.get("tags") if isinstance(tags, list): for item in tags: if isinstance(item, str) and clean_text(item): normalized = clean_text(item).casefold().replace("-", "_").replace(" ", "_") labels.add(normalized) labels.update(normalized.split("_")) return labels def _update_dashboard_bucket( dashboard: dict[str, dict[str, float]], label: str, *, score: float, repeat_count: int, repeat_multiplier: float, boosted: bool, ) -> None: """Uzkrāj dashboard metriku bucketam; `boosted` nozīmē benchmark feedback match.""" bucket = dashboard.setdefault( label, { "records": 0.0, "score_total": 0.0, "repeat_total": 0.0, "repeat_multiplier_total": 0.0, "boosted_records": 0.0, }, ) bucket["records"] += 1.0 bucket["score_total"] += score bucket["repeat_total"] += float(repeat_count) bucket["repeat_multiplier_total"] += repeat_multiplier if boosted: bucket["boosted_records"] += 1.0 def _finalize_dashboard(dashboard: dict[str, dict[str, float]]) -> dict[str, dict[str, float]]: finalized: dict[str, dict[str, float]] = {} for label, bucket in sorted(dashboard.items()): records = max(bucket.get("records", 0.0), 1.0) finalized[label] = { "records": int(bucket.get("records", 0.0)), "average_score": round(bucket.get("score_total", 0.0) / records, 4), "average_repeat_count": round(bucket.get("repeat_total", 0.0) / records, 4), "average_repeat_multiplier": round( bucket.get("repeat_multiplier_total", 0.0) / records, 4, ), "boosted_records": int(bucket.get("boosted_records", 0.0)), } return finalized def _normalize_label(value: str) -> str: return clean_text(value).casefold().replace("-", "_").replace(" ", "_")