| """Bootstrap dataset validācija.""" |
|
|
| from __future__ import annotations |
|
|
| import json |
| from dataclasses import dataclass |
| from datetime import datetime |
| from pathlib import Path |
| from typing import Any |
|
|
| DATASET_TYPES = ("conversation", "code", "image", "music", "video", "autonomous") |
| _PROMPT_TYPES = {"code", "image", "music", "video", "autonomous"} |
| _COMMON_REQUIRED_STRING_FIELDS = ("timestamp", "type", "source") |
| _CONVERSATION_REQUIRED_STRING_FIELDS = ("session_id", "user", "assistant", "language") |
| _VALIDATION_PROFILES = {"auto", "bootstrap", "eval"} |
| _EVAL_REQUIRED_STRING_FIELDS = ( |
| "task_id", |
| "benchmark_version", |
| "suite", |
| "difficulty", |
| "evaluation_mode", |
| "risk_level", |
| ) |
| _EVAL_REQUIRED_STRING_LIST_FIELDS = ("expected_behavior", "scoring_hints") |
| _EVAL_REFERENCE_REQUIRED_CATEGORIES = {"conversation", "code"} |
|
|
|
|
class DatasetValidationError(ValueError):
    """Raised when bootstrap dataset content fails validation.

    The complete list of problems is available on ``issues``; the exception
    message itself previews at most the first 20 of them.
    """

    def __init__(self, issues: list[str]) -> None:
        """Store ``issues`` and build the bulleted preview message."""
        self.issues = issues
        bullet_lines = [f"- {issue}" for issue in issues[:20]]
        hidden = len(issues) - len(bullet_lines)
        if hidden > 0:
            # Tell the reader how many issues were left out of the preview.
            bullet_lines.append(f"- ... un vēl {hidden} problēmas")
        body = "\n".join(bullet_lines)
        super().__init__(f"Bootstrap dataset validācija neizdevās:\n{body}")
|
|
|
|
@dataclass(frozen=True)
class DatasetValidationSummary:
    """Immutable summary of a bootstrap dataset validation run."""

    # Dataset root directory that was validated.
    dataset_dir: Path
    # Number of .jsonl files that were checked.
    files_checked: int
    # Total number of records counted across all categories.
    total_records: int
    # Record count keyed by dataset category name.
    counts_by_category: dict[str, int]
    # Number of records flagged as duplicates.
    duplicate_count: int
|
|
|
|
def validate_dataset_dir(
    dataset_dir: str | Path, *, profile: str = "auto"
) -> DatasetValidationSummary:
    """Validate a local bootstrap dataset directory.

    Every ``*.jsonl`` file below the directory is read line by line. The first
    path component below the root is taken as the dataset category, and each
    non-blank line must be a JSON object that passes ``_validate_record``.
    Records whose signature was already seen in the same category are reported
    as duplicates. Files located under a dot-prefixed path component or under
    an ``hf_cache`` directory below the root are skipped.

    Args:
        dataset_dir: Dataset root; ``~`` is expanded and the path is resolved.
        profile: Validation profile name, resolved by ``_resolve_profile``.

    Returns:
        A summary with the number of checked files and per-category record
        counts.

    Raises:
        DatasetValidationError: If the profile is not supported, the root is
            missing or is not a directory, no data file was checked, or any
            file or record has a problem.
    """
    root = Path(dataset_dir).expanduser().resolve()
    issues: list[str] = []

    # Resolved before the directory checks so an unsupported profile name is
    # reported even when the directory is missing.
    resolved_profile = _resolve_profile(root, profile)

    if not root.exists():
        raise DatasetValidationError([f"Dataset direktorija nav atrasta: {root}"])
    if not root.is_dir():
        raise DatasetValidationError([f"Dataset ceļš nav direktorija: {root}"])

    counts_by_category = {category: 0 for category in DATASET_TYPES}
    # Maps (category, signature) to the location where it was first seen.
    duplicate_origins: dict[tuple[str, str], str] = {}
    duplicate_count = 0
    files_checked = 0

    for file_path in sorted(root.rglob("*.jsonl")):
        # rglob only yields paths below root, so relative_to cannot fail here.
        relative_path = file_path.relative_to(root)
        parts = relative_path.parts

        if any(part.startswith(".") for part in parts):
            continue
        # Only components below the root count: an ancestor of the root that
        # happens to be called "hf_cache" must not hide the whole dataset.
        if "hf_cache" in parts:
            continue

        if len(parts) < 2:
            issues.append(f"{relative_path}: JSONL fails nav zem data tipa mapes.")
            continue

        category = parts[0]
        if category not in counts_by_category:
            issues.append(
                f"{relative_path}: neatbalstīta dataset kategorija '{category}'. "
                f"Atļautās: {', '.join(DATASET_TYPES)}."
            )
            continue

        files_checked += 1
        try:
            with file_path.open(encoding="utf-8") as handle:
                for line_number, raw_line in enumerate(handle, start=1):
                    stripped = raw_line.strip()
                    if not stripped:
                        continue

                    location = f"{relative_path}:{line_number}"
                    try:
                        record = json.loads(stripped)
                    except json.JSONDecodeError as exc:
                        issues.append(f"{location}: nederīgs JSON ({exc.msg}).")
                        continue

                    if not isinstance(record, dict):
                        issues.append(f"{location}: ierakstam jābūt JSON objektam.")
                        continue

                    counts_by_category[category] += 1
                    issues.extend(
                        _validate_record(
                            record, category, location, profile=resolved_profile
                        )
                    )

                    signature = _record_signature(record, category)
                    if signature is None:
                        continue
                    key = (category, signature)
                    first_location = duplicate_origins.get(key)
                    if first_location is None:
                        duplicate_origins[key] = location
                        continue
                    duplicate_count += 1
                    issues.append(
                        f"{location}: dublikāts salīdzinājumā ar {first_location} "
                        f"kategorijā '{category}'."
                    )
        except UnicodeDecodeError as exc:
            # Decoding happens lazily while iterating the handle; report the
            # broken file as a validation issue instead of crashing the run.
            issues.append(
                f"{relative_path}: failu nevar nolasīt kā UTF-8 ({exc.reason})."
            )

    if files_checked == 0:
        issues.append(f"{root}: nav atrasts neviens .jsonl bootstrap datu fails.")

    if issues:
        raise DatasetValidationError(issues)

    return DatasetValidationSummary(
        dataset_dir=root,
        files_checked=files_checked,
        total_records=sum(counts_by_category.values()),
        counts_by_category=counts_by_category,
        duplicate_count=duplicate_count,
    )
|
|
|
|
def format_summary(summary: DatasetValidationSummary) -> str:
    """Return a short, human-readable one-line validation summary.

    The line lists the file, record and duplicate totals followed by a
    comma-separated ``category=count`` entry for each category in
    ``summary.counts_by_category``.
    """
    per_category = [
        f"{name}={total}" for name, total in summary.counts_by_category.items()
    ]
    totals = (
        f"Dataset validācija veiksmīga: files={summary.files_checked}, "
        f"records={summary.total_records}, duplicates={summary.duplicate_count}; "
    )
    return totals + ", ".join(per_category)
|
|
|
|
def _validate_record(
    record: dict[str, Any], category: str, location: str, *, profile: str
) -> list[str]:
    """Collect validation issues for a single record.

    Checks run in this order: common required string fields, timestamp
    format, agreement between the 'type' field and the file category, the
    category-specific fields, and finally the eval-only fields when
    ``profile`` is "eval".

    Args:
        record: Parsed JSON object.
        category: Dataset category of the file the record came from.
        location: Prefix used in every issue message.
        profile: Resolved validation profile name.

    Returns:
        Issue messages in check order; empty when the record is valid.
    """
    problems = [
        f"{location}: trūkst ne-tukša lauka '{field_name}'."
        for field_name in _COMMON_REQUIRED_STRING_FIELDS
        if not _is_non_empty_string(record.get(field_name))
    ]

    # A missing or blank timestamp is already reported above; only a present
    # value is checked for its format.
    timestamp = record.get("timestamp")
    if _is_non_empty_string(timestamp) and not _is_iso8601_timestamp(timestamp):
        problems.append(
            f"{location}: lauks 'timestamp' nav ISO-8601 datums ar laika zonu."
        )

    declared_type = record.get("type")
    if isinstance(declared_type, str) and declared_type != category:
        problems.append(
            f"{location}: lauks 'type' ir '{declared_type}', "
            f"bet faila kategorijai jābūt '{category}'."
        )

    if category == "conversation":
        problems.extend(
            f"{location}: conversation ierakstam trūkst '{field_name}'."
            for field_name in _CONVERSATION_REQUIRED_STRING_FIELDS
            if not _is_non_empty_string(record.get(field_name))
        )
    elif category in _PROMPT_TYPES:
        if not _is_non_empty_string(record.get("prompt")):
            problems.append(f"{location}: ierakstam trūkst ne-tukša lauka 'prompt'.")
        metadata = record.get("metadata")
        if not (isinstance(metadata, dict) and metadata):
            problems.append(
                f"{location}: ierakstam vajag ne-tukšu objektu laukā 'metadata'."
            )

    if profile == "eval":
        problems.extend(_validate_eval_record(record, category, location))

    return problems
|
|
|
|
| def _is_non_empty_string(value: Any) -> bool: |
| return isinstance(value, str) and bool(value.strip()) |
|
|
|
|
def _is_non_empty_string_list(value: Any) -> bool:
    """Return True when ``value`` is a non-empty list of non-empty strings."""
    if not isinstance(value, list) or not value:
        return False
    for item in value:
        if not _is_non_empty_string(item):
            return False
    return True
|
|
|
|
| def _is_iso8601_timestamp(value: str) -> bool: |
| normalized = value.strip() |
| if normalized.endswith("Z"): |
| normalized = f"{normalized[:-1]}+00:00" |
| try: |
| parsed = datetime.fromisoformat(normalized) |
| except ValueError: |
| return False |
| return parsed.tzinfo is not None |
|
|
|
|
def _record_signature(record: dict[str, Any], category: str) -> str | None:
    """Build the duplicate-detection signature for a record.

    Conversation records are identified by their normalized 'user' and
    'assistant' texts; records of every other category by their normalized
    'prompt'.

    Args:
        record: Parsed JSON object.
        category: Dataset category of the record.

    Returns:
        The signature string, or None when the fields it is built from are
        missing or blank, in which case the record cannot be compared.
    """
    if category == "conversation":
        user = record.get("user")
        assistant = record.get("assistant")
        if not (_is_non_empty_string(user) and _is_non_empty_string(assistant)):
            return None
        # Normalized text never contains a newline (all whitespace runs are
        # collapsed to single spaces), so "\n" is an unambiguous separator.
        # A printable separator such as "|" would make ("a|b", "c") collide
        # with ("a", "b|c") and report a false duplicate.
        return "\n".join((_normalize_text(user), _normalize_text(assistant)))

    prompt = record.get("prompt")
    if not _is_non_empty_string(prompt):
        return None
    return _normalize_text(prompt)
|
|
|
|
| def _normalize_text(value: str) -> str: |
| return " ".join(value.casefold().split()) |
|
|
|
|
def _resolve_profile(root: Path, profile: str) -> str:
    """Resolve the requested validation profile to a concrete profile name.

    The name is compared case-insensitively with surrounding whitespace
    ignored. "auto" resolves to "eval" when the root directory is named
    "eval-data" and to "bootstrap" otherwise.

    Raises:
        DatasetValidationError: If ``profile`` is not a supported name.
    """
    requested = profile.strip().lower()
    if requested not in _VALIDATION_PROFILES:
        supported = ", ".join(sorted(_VALIDATION_PROFILES))
        raise DatasetValidationError(
            [f"Neatbalstīts validācijas profils '{profile}'. Atļautie: {supported}."]
        )
    if requested == "auto":
        return "eval" if root.name == "eval-data" else "bootstrap"
    return requested
|
|
|
|
def _validate_eval_record(record: dict[str, Any], category: str, location: str) -> list[str]:
    """Collect issues for the fields that only the eval profile requires.

    Args:
        record: Parsed JSON object.
        category: Dataset category of the record.
        location: Prefix used in every issue message.

    Returns:
        Issue messages in check order; empty when all eval fields are valid.
    """
    problems = [
        f"{location}: eval ierakstam trūkst ne-tukša lauka '{field_name}'."
        for field_name in _EVAL_REQUIRED_STRING_FIELDS
        if not _is_non_empty_string(record.get(field_name))
    ]
    problems += [
        f"{location}: eval ierakstam vajag ne-tukšu string sarakstu laukā '{field_name}'."
        for field_name in _EVAL_REQUIRED_STRING_LIST_FIELDS
        if not _is_non_empty_string_list(record.get(field_name))
    ]

    # Reference material is only required for some categories.
    if category not in _EVAL_REFERENCE_REQUIRED_CATEGORIES:
        return problems

    if not _is_non_empty_string(record.get("reference_answer")):
        problems.append(f"{location}: {category} eval ierakstam trūkst 'reference_answer'.")
    if not _is_non_empty_string_list(record.get("acceptance_criteria")):
        problems.append(
            f"{location}: {category} eval ierakstam vajag ne-tukšu string sarakstu laukā "
            "'acceptance_criteria'."
        )
    return problems
|
|