File size: 5,066 Bytes
f9306c2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3e67073
f9306c2
 
 
 
 
 
 
 
45ade5d
 
 
 
 
 
 
 
 
 
f9306c2
45ade5d
 
 
 
 
 
 
 
 
f9306c2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3e67073
 
 
 
 
 
f9306c2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
"""Dataset loading helpers for MathVision-like JSONL exports."""

from __future__ import annotations

import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any


@dataclass(frozen=True, slots=True)
class MathVisionRecord:
    """A single visual math problem with optional image and solution metadata."""

    problem_id: str
    question: str
    answer: str
    subject: str | None = None
    level: int | None = None
    problem_type: str | None = None
    image_path: Path | None = None
    options: tuple[str, ...] = ()
    solution: str | None = None


def load_jsonl_records(path: Path) -> list[MathVisionRecord]:
    """Read the JSONL file at *path* as UTF-8 and parse it into records.

    The file's parent directory is handed to the text loader as
    ``source_dir``.
    """

    text = path.read_text(encoding="utf-8")
    base_dir = path.parent
    return load_jsonl_records_from_text(text, source_dir=base_dir)


def load_jsonl_records_from_text(
    content: str,
    *,
    source_dir: Path | None = None,
) -> list[MathVisionRecord]:
    """Load MathVision-like records from UTF-8 JSONL text.

    Every non-blank line must hold exactly one JSON object, which is converted
    with ``record_from_mapping``. Blank and whitespace-only lines are skipped
    but still counted, so reported line numbers match the input text.

    Args:
        content: The JSONL document as already-decoded text. A leading
            byte-order mark is tolerated.
        source_dir: Forwarded unchanged to ``record_from_mapping`` for every
            record.

    Returns:
        The parsed records, in input order.

    Raises:
        json.JSONDecodeError: If a line is not valid JSON. The message is
            prefixed with the 1-based line number; the trailing position
            information is relative to that line.
        ValueError: If a line decodes to something other than a JSON object,
            or if ``record_from_mapping`` rejects the object. Validation
            errors are prefixed with the 1-based line number.
    """

    # A BOM survives ``read_text(encoding="utf-8")`` and is not whitespace,
    # so it would otherwise make the first line unparsable.
    text = content.removeprefix("\ufeff")

    # Split on real line terminators only. ``str.splitlines`` also breaks on
    # U+0085, U+2028 and U+2029, which may appear unescaped inside JSON
    # strings and would cut a valid record in half. A raw CR or LF can never
    # occur inside a JSON string, so splitting on those is always safe.
    lines = text.replace("\r\n", "\n").replace("\r", "\n").split("\n")

    records: list[MathVisionRecord] = []
    for line_number, line in enumerate(lines, start=1):
        stripped = line.strip()
        if not stripped:
            continue

        try:
            payload = json.loads(stripped)
        except json.JSONDecodeError as err:
            # Keep the exception type callers may already catch, but say
            # which input line was at fault.
            raise json.JSONDecodeError(
                f"Line {line_number} is not valid JSON: {err.msg}",
                err.doc,
                err.pos,
            ) from err

        if not isinstance(payload, dict):
            msg = f"Line {line_number} must contain a JSON object."
            raise ValueError(msg)

        try:
            record = record_from_mapping(payload, source_dir=source_dir)
        except ValueError as err:
            msg = f"Line {line_number}: {err}"
            raise ValueError(msg) from err
        records.append(record)
    return records


def record_from_mapping(
    payload: dict[str, Any], *, source_dir: Path | None = None
) -> MathVisionRecord:
    """Create a typed record from a raw dictionary.

    Keys read from *payload*:

    * ``id`` (or ``problem_id`` when ``id`` is absent/``None``), ``question``
      and ``answer`` are required non-blank strings.
    * ``subject``, ``solution`` are optional strings; ``level`` is an optional
      integer; ``image`` is an optional path string; ``options`` is an
      optional list of strings.
    * The problem type is taken from the first of ``problem_type``,
      ``problemKind``, ``type``, ``task`` that holds a value other than
      ``None`` or the empty string.

    Args:
        payload: The decoded JSON object for one record.
        source_dir: Directory against which a relative ``image`` path is
            resolved; ``None`` leaves the path as given.

    Returns:
        The validated, immutable record.

    Raises:
        ValueError: If a required field is missing or blank, or if any
            present field has the wrong type.
    """

    problem_id = _required_string(payload, "id", fallback_key="problem_id")
    question = _required_string(payload, "question")
    answer = _required_string(payload, "answer")
    image_path = _optional_path(payload.get("image"), source_dir=source_dir)
    options = _options_from_value(payload.get("options"))

    # Pick the first alias that actually carries a value. Only ``None`` and
    # the empty string count as "absent": chaining the lookups with ``or``
    # would also skip falsy values of the wrong type (0, False, []), letting
    # them bypass the string validation below.
    raw_problem_type: object = None
    for alias in ("problem_type", "problemKind", "type", "task"):
        candidate = payload.get(alias)
        if candidate is not None and candidate != "":
            raw_problem_type = candidate
            break

    return MathVisionRecord(
        problem_id=problem_id,
        question=question,
        answer=answer,
        subject=_optional_string(payload.get("subject")),
        level=_optional_int(payload.get("level")),
        problem_type=_optional_string(raw_problem_type),
        image_path=image_path,
        options=options,
        solution=_optional_string(payload.get("solution")),
    )


def filter_records(
    records: list[MathVisionRecord],
    *,
    subject: str | None = None,
    level: int | None = None,
) -> list[MathVisionRecord]:
    """Select the records that satisfy every filter that was supplied.

    A filter left at ``None`` is not applied. The result is always a new
    list that preserves the input order.
    """

    matched: list[MathVisionRecord] = []
    for candidate in records:
        # Guard clauses: drop the record as soon as one active filter fails.
        if subject is not None and not candidate.subject == subject:
            continue
        if level is not None and not candidate.level == level:
            continue
        matched.append(candidate)
    return matched


def summarize_records(records: list[MathVisionRecord]) -> dict[str, object]:
    """Condense *records* into counts and sorted distinct values.

    The returned mapping has the keys ``records`` (total count), ``images``
    (records whose ``image_path`` is set), ``subjects`` and ``levels`` (sorted
    distinct non-``None`` values).
    """

    seen_subjects: set[str] = set()
    seen_levels: set[int] = set()
    with_image = 0
    for item in records:
        if item.subject is not None:
            seen_subjects.add(item.subject)
        if item.level is not None:
            seen_levels.add(item.level)
        if item.image_path is not None:
            with_image += 1

    summary: dict[str, object] = {}
    summary["records"] = len(records)
    summary["images"] = with_image
    summary["subjects"] = sorted(seen_subjects)
    summary["levels"] = sorted(seen_levels)
    return summary


def _required_string(
    payload: dict[str, Any], key: str, *, fallback_key: str | None = None
) -> str:
    value = payload.get(key)
    if value is None and fallback_key is not None:
        value = payload.get(fallback_key)
    if not isinstance(value, str) or not value.strip():
        msg = f"Missing required string field: {key}"
        raise ValueError(msg)
    return value


def _optional_string(value: object) -> str | None:
    if value is None:
        return None
    if not isinstance(value, str):
        msg = "Optional text fields must be strings when present."
        raise ValueError(msg)
    return value


def _optional_int(value: object) -> int | None:
    if value is None:
        return None
    if isinstance(value, bool) or not isinstance(value, int):
        msg = "Level must be an integer when present."
        raise ValueError(msg)
    return value


def _optional_path(value: object, *, source_dir: Path | None) -> Path | None:
    if value is None:
        return None
    if not isinstance(value, str) or not value:
        msg = "Image path must be a non-empty string when present."
        raise ValueError(msg)
    image_path = Path(value)
    if source_dir is not None and not image_path.is_absolute():
        return source_dir / image_path
    return image_path


def _options_from_value(value: object) -> tuple[str, ...]:
    if value is None:
        return ()
    if not isinstance(value, list) or not all(isinstance(option, str) for option in value):
        msg = "Options must be a list of strings when present."
        raise ValueError(msg)
    return tuple(value)