quanho114
Deploy VietQA API
ebb8326
"""Data loading utilities for test questions."""
import csv
import json
from pathlib import Path
from src.data_processing.models import QuestionInput
# Accepted header names for the per-choice CSV columns, each mapped to the
# position of that choice (A=0 ... D=3). Keys are generated prefix by prefix
# ("choice_*", then "option_*", then the bare letters), so iteration order is
# identical to spelling the twelve entries out by hand.
_CHOICE_COLUMN_MAPPINGS = {
    f"{prefix}{letter}": position
    for prefix in ("choice_", "option_", "")
    for position, letter in enumerate("abcd")
}
def load_test_data_from_json(file_path: Path) -> list[QuestionInput]:
    """Load test questions from a JSON file.

    Expected format: a list of objects, each with ``qid``, ``question``,
    ``choices`` (a list) and, optionally, ``answer``.

    Args:
        file_path: Path to the JSON file.

    Returns:
        List of QuestionInput objects, in file order.

    Raises:
        FileNotFoundError: If the file doesn't exist.
        ValueError: If the file does not have a ``.json`` suffix, is not
            valid JSON (``json.JSONDecodeError`` is a ``ValueError``), does
            not contain a list, or contains an entry that is not an object,
            lacks a ``choices`` list, or lacks ``qid``/``question``.
    """
    if not file_path.exists():
        raise FileNotFoundError(f"Test data file not found: {file_path}")
    if file_path.suffix.lower() != ".json":
        raise ValueError(f"Only JSON files are supported: {file_path}")

    # "utf-8-sig" reads plain UTF-8 and also skips a leading byte-order mark,
    # which json.load would otherwise reject.
    with open(file_path, encoding="utf-8-sig") as f:
        data = json.load(f)

    if not isinstance(data, list):
        raise ValueError(f"JSON file must contain a list of questions: {file_path}")

    questions = []
    for index, item in enumerate(data):
        # Reject scalars/lists up front so malformed entries surface as the
        # documented ValueError instead of TypeError/AttributeError.
        if not isinstance(item, dict):
            raise ValueError(
                f"Question at index {index} must be an object, "
                f"got {type(item).__name__}"
            )
        if "choices" not in item or not isinstance(item["choices"], list):
            raise ValueError(f"Question {item.get('qid', 'unknown')} must have 'choices' as a list")
        missing = [key for key in ("qid", "question") if key not in item]
        if missing:
            raise ValueError(
                f"Question {item.get('qid', 'unknown')} (index {index}) "
                f"is missing required field(s): {', '.join(missing)}"
            )
        questions.append(QuestionInput(
            qid=item["qid"],
            question=item["question"],
            choices=item["choices"],
            answer=item.get("answer"),
        ))
    return questions
def _normalize_row_keys(row: dict[str, str]) -> dict[str, str]:
    """Normalize row keys to lowercase and strip surrounding whitespace.

    ``csv.DictReader`` files the surplus fields of a row that is longer than
    the header under a ``None`` key. Such non-string keys are dropped here,
    because calling ``.lower()`` on them would raise ``AttributeError``.

    Args:
        row: Raw row mapping (header name -> cell value).

    Returns:
        New dict with lowercased, stripped keys and the original values.
    """
    return {k.lower().strip(): v for k, v in row.items() if isinstance(k, str)}
def _extract_choices_from_row(row: dict[str, str]) -> list[str]:
    """Extract choices from a normalized CSV row.

    Tries multiple strategies:
    1. Individual choice columns (choice_a/option_a/a, etc.)
    2. JSON array in 'choices' column
    3. Comma/semicolon separated string in 'choices' column

    Args:
        row: Normalized row dict with lowercase keys

    Returns:
        List of stripped choice strings. With individual columns the list is
        positional (index 0 = A ... 3 = D): a blank column between two filled
        ones is kept as an empty string and only trailing blanks are removed.
        With a 'choices' column, blank entries are dropped. Returns an empty
        list when no choices are found.
    """
    # Strategy 1: Individual columns (choice_a, option_a, a, etc.)
    # When several aliases for the same slot are present, the one that comes
    # later in _CHOICE_COLUMN_MAPPINGS wins.
    slots = ["", "", "", ""]
    found_individual = False
    for col_name, idx in _CHOICE_COLUMN_MAPPINGS.items():
        value = row.get(col_name)
        if value:
            slots[idx] = value.strip()
            found_individual = True
    if found_individual:
        # Trim only trailing blanks: removing an interior blank would shift
        # every later choice to the wrong letter position.
        while slots and not slots[-1]:
            slots.pop()
        return slots

    # Strategy 2 & 3: Parse 'choices' column
    choices_raw = row.get("choices", "")
    if not choices_raw:
        return []

    # Try JSON parse first
    try:
        parsed = json.loads(choices_raw)
        if isinstance(parsed, list):
            return [str(c).strip() for c in parsed if str(c).strip()]
    except (json.JSONDecodeError, TypeError):
        # Not JSON (or not a string at all): fall through to plain splitting.
        pass

    # Fallback: split by comma or semicolon
    return [c.strip() for c in choices_raw.replace(";", ",").split(",") if c.strip()]
def load_test_data_from_csv(file_path: Path) -> list[QuestionInput]:
    """Load test questions from a CSV file.

    Supports multiple CSV formats:
    - Columns: qid, question, choice_a, choice_b, choice_c, choice_d
    - Columns: qid, question, option_a, option_b, option_c, option_d
    - Columns: qid, question, A, B, C, D
    - Columns: qid, question, choices (JSON array or comma-separated)

    Header names are matched case-insensitively. An optional ``answer``
    column is read when present. Rows with an empty ``qid`` or ``question``
    are skipped. Rows without any choices get four empty-string choices.

    Args:
        file_path: Path to the CSV file.

    Returns:
        List of QuestionInput objects, in file order.

    Raises:
        FileNotFoundError: If the file doesn't exist.
    """
    if not file_path.exists():
        raise FileNotFoundError(f"Test data file not found: {file_path}")

    questions = []
    # "utf-8-sig" also accepts plain UTF-8 but strips a leading BOM; without
    # it the first header becomes "\ufeffqid" and every row is skipped.
    # newline="" is how the csv module expects its input file to be opened.
    with open(file_path, encoding="utf-8-sig", newline="") as f:
        # restval="" keeps cells of rows shorter than the header as strings
        # instead of None.
        reader = csv.DictReader(f, restval="")
        for row in reader:
            norm_row = _normalize_row_keys(row)
            qid = (norm_row.get("qid") or "").strip()
            question = (norm_row.get("question") or "").strip()
            if not qid or not question:
                continue

            choices = _extract_choices_from_row(norm_row)
            if not choices:
                choices = ["", "", "", ""]

            answer = (norm_row.get("answer") or "").strip()
            questions.append(QuestionInput(
                qid=qid,
                question=question,
                choices=choices,
                answer=answer or None,
            ))
    return questions