FoundationalASSIST / Code /process_to_single_file.py
martinakaduc's picture
Upload folder using huggingface_hub
6256eb9 verified
#!/usr/bin/env python3
"""
Convert FoundationalASSIST CSV files to the CSEDM/OEKT JSON format.
Inputs (under Data/ by default):
- Interactions.csv
- Problems.csv
- Skill_Set.csv
- Skills.csv
Outputs (under src/data/FoundationalASSIST/ by default):
- dataset.json
- qmatrix.json
- trainset.json
- validset.json
- testset.json
The produced dataset JSON follows the same schema used by src/data/CSEDM.
"""
from __future__ import annotations
import argparse
import json
import random
import re
from pathlib import Path
from typing import Literal, cast
import pandas as pd
from tqdm import tqdm
from clean_utils import clean_problem_body
# Repository root, three directory levels above this script.
PROJECT_ROOT = Path(__file__).resolve().parents[3]
# Default input location: the "Data" folder next to this script's parent dir.
DEFAULT_DATA_DIR = Path(__file__).resolve().parent.parent / "Data"
# Default output location inside the repository's src/data tree.
DEFAULT_OUTPUT_DIR = PROJECT_ROOT / "src" / "data" / "FoundationalASSIST"
# Calendar bucket granularities accepted by --grouping-time.
GroupingMode = Literal["none", "1h", "halfday", "day", "week", "month", "year"]
def parse_grouping_mode(value: str) -> GroupingMode:
    """Translate a --grouping-time alias into its canonical GroupingMode."""
    key = value.strip().lower()
    canonical: dict[str, GroupingMode] = {
        "0": "none",
        "0.0": "none",
        "none": "none",
        "off": "none",
        "no": "none",
        "1h": "1h",
        "hour": "1h",
        "halfday": "halfday",
        "half-day": "halfday",
        "day": "day",
        "week": "week",
        "month": "month",
        "year": "year",
    }
    try:
        return canonical[key]
    except KeyError:
        # argparse renders ArgumentTypeError as a usage error for this flag.
        raise argparse.ArgumentTypeError(
            f"Invalid grouping mode '{value}'. Valid values: "
            "1h, halfday, day, week, month, year, none."
        ) from None
def parse_args() -> argparse.Namespace:
    """Build and evaluate the command-line interface for this script."""
    parser = argparse.ArgumentParser(
        description="Convert FoundationalASSIST to CSEDM/OEKT JSON format."
    )
    # Each entry is (flag, keyword arguments for add_argument).
    option_specs: list[tuple[str, dict]] = [
        (
            "--data-dir",
            {
                "type": Path,
                "default": DEFAULT_DATA_DIR,
                "help": "Directory containing Interactions.csv, Problems.csv, Skills.csv.",
            },
        ),
        (
            "--output-dir",
            {
                "type": Path,
                "default": DEFAULT_OUTPUT_DIR,
                "help": "Directory to write dataset.json/qmatrix.json/split files.",
            },
        ),
        (
            "--seed",
            {
                "type": int,
                "default": 42,
                "help": "Random seed used for train/valid/test student split.",
            },
        ),
        (
            "--train-ratio",
            {
                "type": float,
                "default": 0.8,
                "help": "Fraction of students in train split.",
            },
        ),
        (
            "--valid-ratio",
            {
                "type": float,
                "default": 0.1,
                "help": "Fraction of students in valid split.",
            },
        ),
        (
            "--test-ratio",
            {
                "type": float,
                "default": 0.1,
                "help": "Fraction of students in test split.",
            },
        ),
        (
            "--max-interactions",
            {
                "type": int,
                "default": None,
                "help": (
                    "Optional cap on number of interaction rows after sorting. "
                    "Useful for quick smoke tests."
                ),
            },
        ),
        (
            "--grouping-time",
            {
                "type": parse_grouping_mode,
                "default": "none",
                "help": (
                    "Calendar grouping mode per student: 1h, halfday, day, week, "
                    "month, year, or none."
                ),
            },
        ),
    ]
    for flag, spec in option_specs:
        parser.add_argument(flag, **spec)
    return parser.parse_args()
def _text(v: object) -> str:
if v is None:
return ""
if v is pd.NA:
return ""
if isinstance(v, float) and pd.isna(v):
return ""
return str(v)
def _as_int(v: object) -> int:
    """Parse the textual form of *v* as a number, then truncate to int."""
    numeric = float(_text(v))
    return int(numeric)
def _as_float(v: object) -> float:
    """Parse the textual form of *v* as a float."""
    text = _text(v)
    return float(text)
def label_answer_options(answer_string: object) -> dict[str, str] | None:
    """Convert a ||-delimited option string into an A/B/C-labelled dict."""
    raw = _text(answer_string).strip()
    if not raw:
        return None
    letters = "ABCDEFGHIJ"
    # zip truncates to at most ten labelled options, matching the letter pool.
    return {
        letter: option.strip()
        for letter, option in zip(letters, raw.split("||"))
    }
def clean_html_and_normalize(text: object) -> str:
    """Strip HTML tags and collapse whitespace so answers compare reliably."""
    raw = _text(text)
    if not raw:
        return ""
    without_tags = re.sub(r"<[^>]+>", "", raw)
    collapsed = " ".join(without_tags.split())
    # Tighten spacing around colons (e.g. "a : b" -> "a:b") before comparing.
    tightened = re.sub(r"\s*:\s*", ":", collapsed)
    return tightened.strip()
def match_student_answer_to_letters(
    student_answer_text: object,
    answer_options_dict: dict[str, str] | None,
) -> str:
    """Map student multiple-choice answer text(s) to letter labels.

    Returns the raw answer text unchanged when there are no options or when
    no answer piece could be matched to an option.
    """
    answer_text = _text(student_answer_text)
    if not answer_text or not answer_options_dict:
        return answer_text
    normalized_options = {
        letter: clean_html_and_normalize(option_text)
        for letter, option_text in answer_options_dict.items()
    }

    def find_letter(candidate: str) -> str | None:
        # Prefer an exact normalized match; fall back to substring overlap.
        for letter, option in normalized_options.items():
            if candidate == option:
                return letter
        for letter, option in normalized_options.items():
            if candidate in option or option in candidate:
                return letter
        return None

    matched: list[str] = []
    # Multi-select answers arrive joined with " , ".
    for piece in answer_text.split(" , "):
        hit = find_letter(clean_html_and_normalize(piece.strip()))
        if hit is not None:
            matched.append(hit)
    return ", ".join(sorted(set(matched))) if matched else answer_text
def get_correct_option_letters(
    answer_options: dict[str, str] | None,
    correct_answers: object,
) -> str:
    """Resolve the correct answer text(s) to option letters for MC items.

    Falls back to the stripped raw answer text when there are no options or
    no option text matches.
    """
    correct_text = _text(correct_answers).strip()
    if not correct_text or not answer_options:
        return correct_text
    # Correct answers arrive ||-delimited, like the option list itself.
    targets = {answer.strip() for answer in correct_text.split("||")}
    letters = sorted(
        letter for letter, option in answer_options.items() if option in targets
    )
    if not letters:
        return correct_text
    return ", ".join(letters)
def format_answer_options_for_prompt(answer_options: dict[str, str] | None) -> str:
    """Render the lettered options as one "X) text" line per option."""
    if not answer_options:
        return ""
    option_lines = (f"{label}) {body}" for label, body in answer_options.items())
    return "\n".join(option_lines)
def load_and_preprocess_problems(problems_path: Path) -> pd.DataFrame:
    """Load and preprocess problems with the same answer handling as KT inference.

    Reads Problems.csv, keeps one row per numeric problem_id, and derives:
    - "cleaned body": problem text cleaned via clean_problem_body.
    - "answer_options": lettered options parsed from "Multiple Choice Options".
    - "correct_answers": option letters for MC items, else "Fill-in Answers".
    - "answer_options_formatted": prompt-ready option text.
    """
    problems_df = pd.read_csv(problems_path, low_memory=False)
    # Coerce problem_id to numeric; unparseable rows become NaN and are dropped.
    problems_df["problem_id"] = pd.to_numeric(
        problems_df["problem_id"], errors="coerce"
    )
    problems_df = problems_df.dropna(subset=["problem_id"]).copy()
    problems_df["problem_id"] = problems_df["problem_id"].astype(int)
    # Deterministic de-duplication: keep the first row per problem_id.
    problems_df = problems_df.sort_values(["problem_id"]).drop_duplicates(
        subset=["problem_id"], keep="first"
    )
    problems_df["cleaned body"] = problems_df["Problem Body"].apply(clean_problem_body)
    problems_df["answer_options"] = problems_df["Multiple Choice Options"].apply(
        label_answer_options
    )
    mc_types = {"Multiple Choice (select 1)", "Multiple Choice (select all)"}
    # MC problems resolve the correct answer to option letters; all other
    # problem types keep the raw fill-in answer text.
    problems_df["correct_answers"] = problems_df.apply(
        lambda row: (
            get_correct_option_letters(
                row["answer_options"],
                row["Multiple Choice Answers"],
            )
            if _text(row["Problem Type"]).strip() in mc_types
            else _text(row.get("Fill-in Answers", ""))
        ),
        axis=1,
    )
    problems_df["answer_options_formatted"] = problems_df["answer_options"].apply(
        format_answer_options_for_prompt
    )
    return problems_df
def load_skill_tables(
    skills_path: Path,
    skill_set_path: Path,
) -> tuple[list[dict], dict[int, list[int]], int]:
    """Load skills and build a problem_id -> skill_ids mapping.

    Args:
        skills_path: Path to Skills.csv (problem_id, node_code, node_name).
        skill_set_path: Path to Skill_Set.csv (index, skill_code,
            full_description); "index" is 1-based and defines skill IDs.

    Returns:
        skills: OEKT skill list (including a trailing fallback skill).
        problem_to_skills: Mapping from original problem_id to contiguous
            0-based skill IDs (Skill_Set.csv "index" - 1).
        fallback_skill_id: Skill ID for untagged problems.

    Raises:
        ValueError: If Skills.csv references node_codes that are absent from
            Skill_Set.csv.
    """
    usecols = ["problem_id", "node_code", "node_name"]
    skills_df = pd.read_csv(skills_path, usecols=usecols, low_memory=False)
    skills_df["problem_id"] = pd.to_numeric(skills_df["problem_id"], errors="coerce")
    skills_df = skills_df.dropna(subset=["problem_id"]).copy()
    skills_df["problem_id"] = skills_df["problem_id"].astype(int)
    skills_df["node_code"] = skills_df["node_code"].apply(lambda v: _text(v).strip())
    skills_df["node_name"] = skills_df["node_name"].apply(lambda v: _text(v).strip())
    skills_df = skills_df[skills_df["node_code"] != ""].copy()
    skill_set_df = pd.read_csv(
        skill_set_path,
        usecols=["index", "skill_code", "full_description"],
        low_memory=False,
    )
    skill_set_df["index"] = pd.to_numeric(skill_set_df["index"], errors="coerce")
    skill_set_df = skill_set_df.dropna(subset=["index"]).copy()
    skill_set_df["index"] = skill_set_df["index"].astype(int)
    skill_set_df["skill_code"] = skill_set_df["skill_code"].apply(
        lambda v: _text(v).strip()
    )
    skill_set_df["full_description"] = skill_set_df["full_description"].apply(
        lambda v: _text(v).strip()
    )
    skill_set_df = skill_set_df[skill_set_df["skill_code"] != ""].copy()
    # Deterministic de-duplication: one row per skill_code, lowest index wins.
    skill_set_df = (
        skill_set_df.sort_values(["index", "skill_code"])
        .drop_duplicates(subset=["skill_code"], keep="first")
        .copy()
    )
    # Human-readable names come from Skills.csv; pick one name per node_code.
    node_name_by_code = (
        skills_df.sort_values(["node_code", "node_name"])
        .drop_duplicates(subset=["node_code"], keep="first")
        .set_index("node_code")["node_name"]
        .to_dict()
    )
    skill_rows: list[tuple[str, int, str, str]] = []
    skill_id_map: dict[str, int] = {}
    for row in skill_set_df.itertuples(index=False):
        node_code = _text(row.skill_code).strip()
        # Skill IDs are 0-based while Skill_Set.csv "index" is 1-based.
        skill_id = _as_int(row.index) - 1
        skill_id_map[node_code] = skill_id
        node_name = _text(node_name_by_code.get(node_code, "")).strip()
        name = node_name if node_name else node_code
        description = _text(row.full_description).strip()
        if not description:
            print(
                f"Warning: Missing description for skill '{node_code}' in Skill_Set.csv. "
                f"Using default description."
            )
            description = (
                f"Common Core State StandardS for Mathematics: Skill {node_code}"
            )
        skill_rows.append((node_code, skill_id, name, description))
    # Every node_code used to tag a problem must have an authoritative entry
    # in Skill_Set.csv; otherwise skill IDs would be ambiguous.
    missing_node_codes = sorted(
        set(skills_df["node_code"].tolist()) - set(skill_id_map)
    )
    if missing_node_codes:
        raise ValueError(
            f"Error: Found {len(missing_node_codes)} node_code(s) in Skills.csv that are missing from Skill_Set.csv. "
            f"Please ensure all node_code values in Skills.csv have a corresponding skill_code in Skill_Set.csv. "
            f"Missing node_codes: {missing_node_codes}"
        )
    skills: list[dict] = []
    for _, skill_id, name, description in sorted(skill_rows, key=lambda r: r[0]):
        skills.append(
            {
                "id": skill_id,
                "name": name,
                "description": description,
                "prerequisites": [],
            }
        )
    # Reserve one extra skill for problems that carry no skill tags at all.
    fallback_skill_id = max([s["id"] for s in skills], default=-1) + 1
    skills.append(
        {
            "id": fallback_skill_id,
            "name": "UnmappedSkill",
            "description": "Fallback skill for questions without explicit skill tags.",
            "prerequisites": [],
        }
    )
    problem_to_skills: dict[int, list[int]] = {}
    pairs = skills_df[["problem_id", "node_code"]].drop_duplicates()
    for row in pairs.itertuples(index=False):
        pid = _as_int(row.problem_id)
        sid = skill_id_map[_text(row.node_code).strip()]
        problem_to_skills.setdefault(pid, []).append(sid)
    for pid, sids in problem_to_skills.items():
        if len(sids) == 0:
            print(f"Warning: Problem {pid} has no valid skill mappings.")
        problem_to_skills[pid] = sorted(set(sids))
    return skills, problem_to_skills, fallback_skill_id
def build_question_content(problem_row: pd.Series) -> tuple[str, str]:
    """Create question content and canonical correct answer from preprocessed fields."""
    correct_answer = _text(problem_row.get("correct_answers", "")).strip()
    sections: list[str] = []
    body = _text(problem_row.get("cleaned body", "")).strip()
    if body:
        sections.append(body)
    problem_type = _text(problem_row.get("Problem Type", "")).strip()
    if problem_type:
        sections.append(f"Problem Type: {problem_type}")
    options_text = _text(problem_row.get("answer_options_formatted", "")).strip()
    if options_text:
        sections.append(f"Answer Options:\n{options_text}")
    if sections:
        return "\n\n".join(sections), correct_answer
    # Nothing usable in the row: fall back to a placeholder title.
    problem_id = problem_row.get("problem_id", "unknown")
    return f"Problem {problem_id}", correct_answer
def load_questions(
    problems_df: pd.DataFrame,
    problem_to_skills: dict[int, list[int]],
    fallback_skill_id: int,
) -> tuple[list[dict], dict[int, str], int]:
    """Build OEKT question objects from preprocessed Problems data."""
    questions: list[dict] = []
    problem_to_qid: dict[int, str] = {}
    unmapped_questions = 0
    for record in problems_df.to_dict(orient="records"):
        pid = _as_int(record["problem_id"])
        qid = f"q_{pid}"
        skill_ids = problem_to_skills.get(pid, [])
        if not skill_ids:
            # No tags: attach the fallback skill so the question stays usable.
            skill_ids = [fallback_skill_id]
            unmapped_questions += 1
        content, correct_answer = build_question_content(pd.Series(record))
        rubric = {
            "id": f"r_{pid}_0",
            "description": (f"Match the correct answer: {correct_answer}"),
            "skill_ids": skill_ids,
        }
        questions.append(
            {
                "id": qid,
                "content": content,
                "skill_ids": skill_ids,
                "rubrics": [rubric],
            }
        )
        problem_to_qid[pid] = qid
    return questions, problem_to_qid, unmapped_questions
def load_interactions(
    interactions_path: Path,
    problem_meta_df: pd.DataFrame,
    max_interactions: int | None = None,
) -> pd.DataFrame:
    """Load and normalize interaction logs used to build student trajectories.

    Args:
        interactions_path: Path to Interactions.csv.
        problem_meta_df: Preprocessed problems frame providing "Problem Type"
            and "answer_options" per problem_id.
        max_interactions: Optional global row cap applied after sorting;
            must be positive when given.

    Returns:
        Interactions sorted by (user_id, end_time, id), with multiple-choice
        answer_text rewritten to option letters where possible.

    Raises:
        ValueError: If max_interactions is given but not positive.
    """
    usecols = [
        "id",
        "problem_id",
        "answer_text",
        "discrete_score",
        "end_time",
        "user_id",
    ]
    df = pd.read_csv(interactions_path, usecols=usecols, low_memory=False)
    # Coerce key columns; unparseable values become NaN/NaT and are handled below.
    df["problem_id"] = pd.to_numeric(df["problem_id"], errors="coerce")
    df["discrete_score"] = pd.to_numeric(df["discrete_score"], errors="coerce")
    df["id"] = pd.to_numeric(df["id"], errors="coerce")
    df["end_time"] = pd.to_datetime(df["end_time"], errors="coerce", utc=True)
    # Rows must identify a student, a problem, and a score to be usable.
    df = df.dropna(subset=["user_id", "problem_id", "discrete_score"]).copy()
    df["user_id"] = df["user_id"].astype(str)
    df["problem_id"] = df["problem_id"].astype(int)
    # Missing log ids become -1 so the integer sort tie-breaker still works.
    df["id"] = df["id"].fillna(-1).astype(int)
    answer_meta = problem_meta_df[
        ["problem_id", "Problem Type", "answer_options"]
    ].copy()
    df = df.merge(answer_meta, on="problem_id", how="left")
    mc_types = {"Multiple Choice (select 1)", "Multiple Choice (select all)"}
    # For MC problems with known options, map the raw answer text to letters;
    # everything else keeps its (NA-normalized) raw text.
    df["answer_text"] = df.apply(
        lambda row: (
            match_student_answer_to_letters(row["answer_text"], row["answer_options"])
            if _text(row.get("Problem Type", "")).strip() in mc_types
            and isinstance(row.get("answer_options"), dict)
            else _text(row["answer_text"])
        ),
        axis=1,
    )
    df = df.drop(columns=["Problem Type", "answer_options"])
    # Stable (mergesort) ordering keeps equal-timestamp rows in input order.
    df = df.sort_values(["user_id", "end_time", "id"], kind="mergesort")
    if max_interactions is not None:
        if max_interactions <= 0:
            raise ValueError("--max-interactions must be a positive integer.")
        df = df.head(max_interactions).copy()
    return df
def build_qmatrix(questions: list[dict], num_skills: int) -> list[list[float]]:
    """Build a rubric x skill matrix consistent with question/rubric ordering."""

    def rubric_row(rubric: dict) -> list[float]:
        # One-hot indicator row over all skills tagged on this rubric.
        indicator = [0.0] * num_skills
        for skill_id in rubric["skill_ids"]:
            indicator[int(skill_id)] = 1.0
        return indicator

    return [
        rubric_row(rubric)
        for question in questions
        for rubric in question["rubrics"]
    ]
def split_student_ids(
    student_ids: list[str],
    train_ratio: float,
    valid_ratio: float,
    test_ratio: float,
    seed: int,
) -> tuple[list[str], list[str], list[str]]:
    """Create deterministic train/valid/test split lists at the student level."""
    if min(train_ratio, valid_ratio, test_ratio) < 0:
        raise ValueError("Split ratios must be non-negative.")
    total = train_ratio + valid_ratio + test_ratio
    if total <= 0:
        raise ValueError("At least one split ratio must be > 0.")
    # Sort before shuffling so the split depends only on the ids and the seed.
    shuffled = sorted(student_ids)
    random.Random(seed).shuffle(shuffled)
    n_train = int(len(shuffled) * (train_ratio / total))
    n_valid = int(len(shuffled) * (valid_ratio / total))
    return (
        shuffled[:n_train],
        shuffled[n_train : n_train + n_valid],
        shuffled[n_train + n_valid :],
    )
def get_calendar_group_key(
    end_time: pd.Timestamp | None,
    grouping_mode: GroupingMode,
    missing_idx: int,
) -> tuple[object, ...]:
    """Return a stable calendar bucket key for an interaction timestamp."""
    if end_time is None:
        # Missing timestamps never merge: each row gets its own bucket.
        return ("missing", missing_idx)
    # Normalize to UTC so buckets are comparable across rows.
    if end_time.tzinfo is None:
        ts = end_time.tz_localize("UTC")
    else:
        ts = end_time.tz_convert("UTC")
    builders = {
        "1h": lambda t: ("1h", t.year, t.month, t.day, t.hour),
        "halfday": lambda t: (
            "halfday", t.year, t.month, t.day, 0 if t.hour < 12 else 1
        ),
        "day": lambda t: ("day", t.year, t.month, t.day),
        "week": lambda t: (
            "week", int(t.isocalendar().year), int(t.isocalendar().week)
        ),
        "month": lambda t: ("month", t.year, t.month),
        "year": lambda t: ("year", t.year),
    }
    builder = builders.get(grouping_mode)
    if builder is None:
        raise ValueError(f"Unsupported grouping mode: {grouping_mode}")
    return builder(ts)
def write_dataset_json(
    dataset_path: Path,
    skills: list[dict],
    questions: list[dict],
    interactions_df: pd.DataFrame,
    problem_to_qid: dict[int, str],
    grouping_mode: GroupingMode = "none",
    save_unmapped_skills: bool = False,
) -> tuple[list[str], int, int, int, int]:
    """Stream-write dataset.json while optionally grouping by calendar buckets.

    Args:
        dataset_path: Output path for dataset.json (parent dirs are created).
        skills: Full skill list; the trailing "UnmappedSkill" entry is omitted
            from the output unless save_unmapped_skills is True.
        questions: OEKT question objects, serialized verbatim.
        interactions_df: Normalized interactions (see load_interactions),
            already sorted by (user_id, end_time, id).
        problem_to_qid: Maps problem_id to question id; interactions whose
            problem has no mapping are counted and skipped.
        grouping_mode: "none" puts each response in its own time step;
            otherwise consecutive responses sharing a calendar bucket share
            one time step.
        save_unmapped_skills: Keep the fallback skill in the output.

    Returns:
        (student_ids, num_students, num_time_steps, num_questions,
        skipped_interactions).
    """
    dataset_path.parent.mkdir(parents=True, exist_ok=True)
    student_ids: list[str] = []
    num_students = 0
    num_time_steps = 0
    num_questions = 0
    skipped_interactions = 0
    # The JSON is written incrementally so the full student list never has to
    # be materialized in memory at once.
    with open(dataset_path, "w", encoding="utf-8") as f:
        f.write("{")
        f.write('"skills":')
        if not save_unmapped_skills:
            # Drop the trailing fallback skill when no question references it.
            saving_skills = (
                skills[:-1]
                if skills and skills[-1]["name"] == "UnmappedSkill"
                else skills
            )
        else:
            saving_skills = skills
        json.dump(saving_skills, f, ensure_ascii=False)
        f.write(',"questions":')
        json.dump(questions, f, ensure_ascii=False)
        f.write(',"students":[')
        first_student = True
        for user_id, student_df in tqdm(interactions_df.groupby("user_id", sort=False)):
            time_steps: list[dict] = []
            current_group_key: tuple[object, ...] | None = None
            for row_idx, row in enumerate(student_df.itertuples(index=False)):
                pid = _as_int(row.problem_id)
                qid = problem_to_qid.get(pid)
                if qid is None:
                    # Interaction references a problem absent from Problems.csv.
                    skipped_interactions += 1
                    continue
                # Binarize the score: full credit (>= 1.0) counts as correct.
                score = 1 if _as_float(row.discrete_score) >= 1.0 else 0
                answer_text = _text(row.answer_text)
                response = {
                    "question_id": qid,
                    "answer_text": answer_text,
                    "rubric_scores": [score],
                }
                num_questions += 1
                if grouping_mode == "none":
                    # No grouping: every response becomes its own time step.
                    time_steps.append(
                        {
                            "t": len(time_steps),
                            "responses": [response],
                        }
                    )
                    continue
                row_end_time_raw = row.end_time
                row_end_time: pd.Timestamp | None = (
                    None
                    if pd.isna(row_end_time_raw)
                    else cast(pd.Timestamp, row_end_time_raw)
                )
                group_key = get_calendar_group_key(
                    end_time=row_end_time,
                    grouping_mode=grouping_mode,
                    missing_idx=row_idx,
                )
                # Only consecutive rows sharing a bucket merge into one step.
                if time_steps and current_group_key == group_key:
                    time_steps[-1]["responses"].append(response)
                    continue
                time_steps.append(
                    {
                        "t": len(time_steps),
                        "responses": [response],
                    }
                )
                current_group_key = group_key
            if not time_steps:
                # Student had only skipped interactions; omit them entirely.
                continue
            student_obj = {
                "student_id": user_id,
                "time_steps": time_steps,
            }
            if not first_student:
                f.write(",")
            json.dump(student_obj, f, ensure_ascii=False)
            first_student = False
            student_ids.append(str(user_id))
            num_students += 1
            num_time_steps += len(time_steps)
        f.write("]}")
    return (
        student_ids,
        num_students,
        num_time_steps,
        num_questions,
        skipped_interactions,
    )
def save_json(path: Path, obj: object) -> None:
    """Write *obj* as pretty-printed UTF-8 JSON, creating parent directories."""
    payload = json.dumps(obj, indent=2, ensure_ascii=False)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(payload, encoding="utf-8")
def main() -> None:
    """Run the end-to-end conversion from the raw CSVs to the JSON outputs."""
    args = parse_args()
    data_dir = args.data_dir
    output_dir = args.output_dir
    interactions_path = data_dir / "Interactions.csv"
    problems_path = data_dir / "Problems.csv"
    skill_set_path = data_dir / "Skill_Set.csv"
    skills_path = data_dir / "Skills.csv"
    # Fail fast before doing any expensive loading work.
    for p in [interactions_path, problems_path, skill_set_path, skills_path]:
        if not p.exists():
            raise FileNotFoundError(f"Required input file not found: {p}")
    print("Loading skills...")
    skills, problem_to_skills, fallback_skill_id = load_skill_tables(
        skills_path=skills_path,
        skill_set_path=skill_set_path,
    )
    print("Loading and preprocessing problems...")
    problems_df = load_and_preprocess_problems(problems_path)
    print("Loading questions...")
    questions, problem_to_qid, unmapped_questions = load_questions(
        problems_df=problems_df,
        problem_to_skills=problem_to_skills,
        fallback_skill_id=fallback_skill_id,
    )
    print("Loading interactions...")
    interactions_df = load_interactions(
        interactions_path,
        problem_meta_df=problems_df,
        max_interactions=args.max_interactions,
    )
    print("Writing dataset.json...")
    dataset_path = output_dir / "dataset.json"
    (
        student_ids,
        num_students,
        num_time_steps,
        num_questions,
        skipped_interactions,
    ) = write_dataset_json(
        dataset_path=dataset_path,
        skills=skills,
        questions=questions,
        interactions_df=interactions_df,
        problem_to_qid=problem_to_qid,
        grouping_mode=args.grouping_time,
        # Keep the fallback skill only if some question actually uses it.
        save_unmapped_skills=(unmapped_questions > 0),
    )
    print("Building qmatrix.json...")
    # Exclude the unused fallback skill from the matrix width when every
    # question was mapped to real skills.
    num_skills = len(skills) - int(unmapped_questions == 0)
    qmatrix = build_qmatrix(questions, num_skills=num_skills)
    save_json(output_dir / "qmatrix.json", qmatrix)
    print("Building train/valid/test split files...")
    train_ids, valid_ids, test_ids = split_student_ids(
        student_ids=student_ids,
        train_ratio=args.train_ratio,
        valid_ratio=args.valid_ratio,
        test_ratio=args.test_ratio,
        seed=args.seed,
    )
    save_json(output_dir / "trainset.json", train_ids)
    save_json(output_dir / "validset.json", valid_ids)
    save_json(output_dir / "testset.json", test_ids)
    # Summary statistics for a quick sanity check of the conversion.
    total_rubrics = sum(len(q["rubrics"]) for q in questions)
    question_skill_counts = [len(q.get("skill_ids", [])) for q in questions]
    rubric_skill_counts = [
        len(r.get("skill_ids", [])) for q in questions for r in q.get("rubrics", [])
    ]
    avg_skills_per_question = (
        sum(question_skill_counts) / len(question_skill_counts)
        if question_skill_counts
        else 0.0
    )
    avg_skills_per_rubric = (
        sum(rubric_skill_counts) / len(rubric_skill_counts)
        if rubric_skill_counts
        else 0.0
    )
    avg_time_steps_per_student = (
        num_time_steps / num_students if num_students > 0 else 0.0
    )
    avg_questions_per_timestep = (
        num_questions / num_time_steps if num_time_steps > 0 else 0.0
    )
    print("\n=== Conversion Summary ===")
    print(f"Skills: {num_skills}")
    print(f"Questions: {len(questions)}")
    print(f"Rubrics: {total_rubrics}")
    print(f"Avg skills/question: {avg_skills_per_question:.3f}")
    print(f"Avg skills/rubric: {avg_skills_per_rubric:.3f}")
    print(f"Students: {num_students}")
    print(f"Time steps: {num_time_steps}")
    print(f"Avg timesteps/student: {avg_time_steps_per_student:.3f}")
    print(f"Avg questions/timestep: {avg_questions_per_timestep:.3f}")
    print(f"Grouping mode: {args.grouping_time}")
    print(f"Unmapped questions: {unmapped_questions}")
    print(f"Skipped interactions:{skipped_interactions}")
    print(f"Output directory: {output_dir}")
# Script entry point: run the full CSV -> JSON conversion pipeline.
if __name__ == "__main__":
    main()