Spaces:
Running
Running
File size: 4,593 Bytes
ebb8326 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
"""Data loading utilities for test questions."""
import csv
import json
from pathlib import Path
from src.data_processing.models import QuestionInput
# Standard column mappings for choice columns
_CHOICE_COLUMN_MAPPINGS = {
"choice_a": 0, "choice_b": 1, "choice_c": 2, "choice_d": 3,
"option_a": 0, "option_b": 1, "option_c": 2, "option_d": 3,
"a": 0, "b": 1, "c": 2, "d": 3,
}
def load_test_data_from_json(file_path: Path) -> list[QuestionInput]:
"""Load test questions from JSON file.
Expected format: List of dicts with qid, question, choices, answer (optional)
Args:
file_path: Path to JSON file
Returns:
List of QuestionInput objects
Raises:
FileNotFoundError: If file doesn't exist
ValueError: If file format is invalid
"""
if not file_path.exists():
raise FileNotFoundError(f"Test data file not found: {file_path}")
if file_path.suffix.lower() != ".json":
raise ValueError(f"Only JSON files are supported: {file_path}")
with open(file_path, encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, list):
raise ValueError(f"JSON file must contain a list of questions: {file_path}")
questions = []
for item in data:
if "choices" not in item or not isinstance(item["choices"], list):
raise ValueError(f"Question {item.get('qid', 'unknown')} must have 'choices' as a list")
questions.append(QuestionInput(
qid=item["qid"],
question=item["question"],
choices=item["choices"],
answer=item.get("answer"),
))
return questions
def _normalize_row_keys(row: dict[str, str]) -> dict[str, str]:
"""Normalize row keys to lowercase and strip whitespace."""
return {k.lower().strip(): v for k, v in row.items()}
def _extract_choices_from_row(row: dict[str, str]) -> list[str]:
"""Extract choices from a normalized CSV row.
Tries multiple strategies:
1. Individual choice columns (choice_a/option_a/a, etc.)
2. JSON array in 'choices' column
3. Comma/semicolon separated string in 'choices' column
Args:
row: Normalized row dict with lowercase keys
Returns:
List of choice strings (may contain empty strings)
"""
# Strategy 1: Individual columns (choice_a, option_a, a, etc.)
choices = ["", "", "", ""]
found_individual = False
for col_name, idx in _CHOICE_COLUMN_MAPPINGS.items():
if col_name in row and row[col_name]:
choices[idx] = row[col_name].strip()
found_individual = True
if found_individual:
return [c for c in choices if c]
# Strategy 2 & 3: Parse 'choices' column
choices_raw = row.get("choices", "")
if not choices_raw:
return []
# Try JSON parse first
try:
parsed = json.loads(choices_raw)
if isinstance(parsed, list):
return [str(c).strip() for c in parsed if str(c).strip()]
except (json.JSONDecodeError, TypeError):
pass
# Fallback: split by comma or semicolon
return [c.strip() for c in choices_raw.replace(";", ",").split(",") if c.strip()]
def load_test_data_from_csv(file_path: Path) -> list[QuestionInput]:
"""Load test questions from CSV file.
Supports multiple CSV formats:
- Columns: qid, question, choice_a, choice_b, choice_c, choice_d
- Columns: qid, question, option_a, option_b, option_c, option_d
- Columns: qid, question, A, B, C, D
- Columns: qid, question, choices (JSON array or comma-separated)
Args:
file_path: Path to CSV file
Returns:
List of QuestionInput objects
Raises:
FileNotFoundError: If file doesn't exist
"""
if not file_path.exists():
raise FileNotFoundError(f"Test data file not found: {file_path}")
questions = []
with open(file_path, encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
norm_row = _normalize_row_keys(row)
qid = norm_row.get("qid", "").strip()
question = norm_row.get("question", "").strip()
if not qid or not question:
continue
choices = _extract_choices_from_row(norm_row)
if not choices:
choices = ["", "", "", ""]
questions.append(QuestionInput(
qid=qid,
question=question,
choices=choices,
answer=norm_row.get("answer", "").strip() or None,
))
return questions
|