"""Bootstrap dataset validācija.""" from __future__ import annotations import json from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Any DATASET_TYPES = ("conversation", "code", "image", "music", "video", "autonomous") _PROMPT_TYPES = {"code", "image", "music", "video", "autonomous"} _COMMON_REQUIRED_STRING_FIELDS = ("timestamp", "type", "source") _CONVERSATION_REQUIRED_STRING_FIELDS = ("session_id", "user", "assistant", "language") _VALIDATION_PROFILES = {"auto", "bootstrap", "eval"} _EVAL_REQUIRED_STRING_FIELDS = ( "task_id", "benchmark_version", "suite", "difficulty", "evaluation_mode", "risk_level", ) _EVAL_REQUIRED_STRING_LIST_FIELDS = ("expected_behavior", "scoring_hints") _EVAL_REFERENCE_REQUIRED_CATEGORIES = {"conversation", "code"} class DatasetValidationError(ValueError): """Bootstrap dataset satura validācijas kļūda.""" def __init__(self, issues: list[str]) -> None: self.issues = issues preview = "\n".join(f"- {issue}" for issue in issues[:20]) remaining = len(issues) - 20 if remaining > 0: preview = f"{preview}\n- ... un vēl {remaining} problēmas" super().__init__(f"Bootstrap dataset validācija neizdevās:\n{preview}") @dataclass(frozen=True) class DatasetValidationSummary: """Bootstrap dataset validācijas kopsavilkums.""" dataset_dir: Path files_checked: int total_records: int counts_by_category: dict[str, int] duplicate_count: int def validate_dataset_dir( dataset_dir: str | Path, *, profile: str = "auto" ) -> DatasetValidationSummary: """Validē lokālo bootstrap dataset direktoriju.""" root = Path(dataset_dir).expanduser().resolve() issues: list[str] = [] resolved_profile = _resolve_profile(root, profile) if not root.exists(): raise DatasetValidationError([f"Dataset direktorija nav atrasta: {root}"]) if not root.is_dir(): raise DatasetValidationError([f"Dataset ceļš nav direktorija: {root}"]) counts_by_category = {category: 0 for category in DATASET_TYPES} duplicate_origins: dict[tuple[str, str], str] = {} duplicate_count = 0 files_checked = 0 for file_path in sorted(root.rglob("*.jsonl")): if any(part.startswith(".") for part in file_path.relative_to(root).parts): continue if "hf_cache" in file_path.parts: continue try: relative_path = file_path.relative_to(root) except ValueError: relative_path = file_path if len(relative_path.parts) < 2: issues.append(f"{relative_path}: JSONL fails nav zem data tipa mapes.") continue category = relative_path.parts[0] if category not in counts_by_category: issues.append( f"{relative_path}: neatbalstīta dataset kategorija '{category}'. " f"Atļautās: {', '.join(DATASET_TYPES)}." ) continue files_checked += 1 with file_path.open(encoding="utf-8") as handle: for line_number, raw_line in enumerate(handle, start=1): stripped = raw_line.strip() if not stripped: continue location = f"{relative_path}:{line_number}" try: record = json.loads(stripped) except json.JSONDecodeError as exc: issues.append(f"{location}: nederīgs JSON ({exc.msg}).") continue if not isinstance(record, dict): issues.append(f"{location}: ierakstam jābūt JSON objektam.") continue counts_by_category[category] += 1 issues.extend( _validate_record(record, category, location, profile=resolved_profile) ) signature = _record_signature(record, category) if signature is None: continue key = (category, signature) first_location = duplicate_origins.get(key) if first_location is None: duplicate_origins[key] = location continue duplicate_count += 1 issues.append( f"{location}: dublikāts salīdzinājumā ar {first_location} " f"kategorijā '{category}'." ) if files_checked == 0: issues.append(f"{root}: nav atrasts neviens .jsonl bootstrap datu fails.") if issues: raise DatasetValidationError(issues) return DatasetValidationSummary( dataset_dir=root, files_checked=files_checked, total_records=sum(counts_by_category.values()), counts_by_category=counts_by_category, duplicate_count=duplicate_count, ) def format_summary(summary: DatasetValidationSummary) -> str: """Atgriež īsu cilvēkam lasāmu validācijas kopsavilkumu.""" category_counts = ", ".join( f"{category}={count}" for category, count in summary.counts_by_category.items() ) return ( f"Dataset validācija veiksmīga: files={summary.files_checked}, " f"records={summary.total_records}, duplicates={summary.duplicate_count}; " f"{category_counts}" ) def _validate_record( record: dict[str, Any], category: str, location: str, *, profile: str ) -> list[str]: issues: list[str] = [] for field_name in _COMMON_REQUIRED_STRING_FIELDS: value = record.get(field_name) if not _is_non_empty_string(value): issues.append(f"{location}: trūkst ne-tukša lauka '{field_name}'.") timestamp = record.get("timestamp") if isinstance(timestamp, str) and timestamp.strip() and not _is_iso8601_timestamp(timestamp): issues.append(f"{location}: lauks 'timestamp' nav ISO-8601 datums ar laika zonu.") record_type = record.get("type") if isinstance(record_type, str) and record_type != category: issues.append( f"{location}: lauks 'type' ir '{record_type}', bet faila kategorijai jābūt '{category}'." ) if category == "conversation": for field_name in _CONVERSATION_REQUIRED_STRING_FIELDS: value = record.get(field_name) if not _is_non_empty_string(value): issues.append(f"{location}: conversation ierakstam trūkst '{field_name}'.") if profile == "eval": issues.extend(_validate_eval_record(record, category, location)) return issues if category in _PROMPT_TYPES: if not _is_non_empty_string(record.get("prompt")): issues.append(f"{location}: ierakstam trūkst ne-tukša lauka 'prompt'.") metadata = record.get("metadata") if not isinstance(metadata, dict) or not metadata: issues.append(f"{location}: ierakstam vajag ne-tukšu objektu laukā 'metadata'.") if profile == "eval": issues.extend(_validate_eval_record(record, category, location)) return issues def _is_non_empty_string(value: Any) -> bool: return isinstance(value, str) and bool(value.strip()) def _is_non_empty_string_list(value: Any) -> bool: return ( isinstance(value, list) and bool(value) and all(_is_non_empty_string(item) for item in value) ) def _is_iso8601_timestamp(value: str) -> bool: normalized = value.strip() if normalized.endswith("Z"): normalized = f"{normalized[:-1]}+00:00" try: parsed = datetime.fromisoformat(normalized) except ValueError: return False return parsed.tzinfo is not None def _record_signature(record: dict[str, Any], category: str) -> str | None: if category == "conversation": user = record.get("user") assistant = record.get("assistant") if not (_is_non_empty_string(user) and _is_non_empty_string(assistant)): return None return "|".join((_normalize_text(user), _normalize_text(assistant))) prompt = record.get("prompt") if not _is_non_empty_string(prompt): return None return _normalize_text(prompt) def _normalize_text(value: str) -> str: return " ".join(value.casefold().split()) def _resolve_profile(root: Path, profile: str) -> str: normalized = profile.strip().lower() if normalized not in _VALIDATION_PROFILES: allowed = ", ".join(sorted(_VALIDATION_PROFILES)) raise DatasetValidationError( [f"Neatbalstīts validācijas profils '{profile}'. Atļautie: {allowed}."] ) if normalized != "auto": return normalized return "eval" if root.name == "eval-data" else "bootstrap" def _validate_eval_record(record: dict[str, Any], category: str, location: str) -> list[str]: issues: list[str] = [] for field_name in _EVAL_REQUIRED_STRING_FIELDS: if not _is_non_empty_string(record.get(field_name)): issues.append(f"{location}: eval ierakstam trūkst ne-tukša lauka '{field_name}'.") for field_name in _EVAL_REQUIRED_STRING_LIST_FIELDS: if not _is_non_empty_string_list(record.get(field_name)): issues.append( f"{location}: eval ierakstam vajag ne-tukšu string sarakstu laukā '{field_name}'." ) if category in _EVAL_REFERENCE_REQUIRED_CATEGORIES: if not _is_non_empty_string(record.get("reference_answer")): issues.append(f"{location}: {category} eval ierakstam trūkst 'reference_answer'.") if not _is_non_empty_string_list(record.get("acceptance_criteria")): issues.append( f"{location}: {category} eval ierakstam vajag ne-tukšu string sarakstu laukā " "'acceptance_criteria'." ) return issues