#!/usr/bin/env python3 """ Convert FoundationalASSIST CSV files to the CSEDM/OEKT JSON format. Inputs (under Data/ by default): - Interactions.csv - Problems.csv - Skill_Set.csv - Skills.csv Outputs (under src/data/FoundationalASSIST/ by default): - dataset.json - qmatrix.json - trainset.json - validset.json - testset.json The produced dataset JSON follows the same schema used by src/data/CSEDM. """ from __future__ import annotations import argparse import json import random import re from pathlib import Path from typing import Literal, cast import pandas as pd from tqdm import tqdm from clean_utils import clean_problem_body PROJECT_ROOT = Path(__file__).resolve().parents[3] DEFAULT_DATA_DIR = Path(__file__).resolve().parent.parent / "Data" DEFAULT_OUTPUT_DIR = PROJECT_ROOT / "src" / "data" / "FoundationalASSIST" GroupingMode = Literal["none", "1h", "halfday", "day", "week", "month", "year"] def parse_grouping_mode(value: str) -> GroupingMode: """Normalize grouping mode aliases used by --grouping-time.""" normalized = value.strip().lower() aliases: dict[str, GroupingMode] = { "0": "none", "0.0": "none", "none": "none", "off": "none", "no": "none", "1h": "1h", "hour": "1h", "halfday": "halfday", "half-day": "halfday", "day": "day", "week": "week", "month": "month", "year": "year", } mode = aliases.get(normalized) if mode is None: valid_values = "1h, halfday, day, week, month, year, none" raise argparse.ArgumentTypeError( f"Invalid grouping mode '{value}'. Valid values: {valid_values}." ) return mode def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Convert FoundationalASSIST to CSEDM/OEKT JSON format." ) parser.add_argument( "--data-dir", type=Path, default=DEFAULT_DATA_DIR, help="Directory containing Interactions.csv, Problems.csv, Skills.csv.", ) parser.add_argument( "--output-dir", type=Path, default=DEFAULT_OUTPUT_DIR, help="Directory to write dataset.json/qmatrix.json/split files.", ) parser.add_argument( "--seed", type=int, default=42, help="Random seed used for train/valid/test student split.", ) parser.add_argument( "--train-ratio", type=float, default=0.8, help="Fraction of students in train split.", ) parser.add_argument( "--valid-ratio", type=float, default=0.1, help="Fraction of students in valid split.", ) parser.add_argument( "--test-ratio", type=float, default=0.1, help="Fraction of students in test split.", ) parser.add_argument( "--max-interactions", type=int, default=None, help=( "Optional cap on number of interaction rows after sorting. " "Useful for quick smoke tests." ), ) parser.add_argument( "--grouping-time", type=parse_grouping_mode, default="none", help=( "Calendar grouping mode per student: 1h, halfday, day, week, " "month, year, or none." ), ) return parser.parse_args() def _text(v: object) -> str: if v is None: return "" if v is pd.NA: return "" if isinstance(v, float) and pd.isna(v): return "" return str(v) def _as_int(v: object) -> int: return int(float(_text(v))) def _as_float(v: object) -> float: return float(_text(v)) def label_answer_options(answer_string: object) -> dict[str, str] | None: """Convert pipe-delimited answers to lettered format.""" answer_text = _text(answer_string).strip() if not answer_text: return None options = [opt.strip() for opt in answer_text.split("||")] letters = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J"] return {letters[i]: opt for i, opt in enumerate(options) if i < len(letters)} def clean_html_and_normalize(text: object) -> str: """Remove HTML tags and normalize text for reliable comparisons.""" normalized = _text(text) if not normalized: return "" normalized = re.sub(r"<[^>]+>", "", normalized) normalized = " ".join(normalized.split()) normalized = re.sub(r"\s*:\s*", ":", normalized) return normalized.strip() def match_student_answer_to_letters( student_answer_text: object, answer_options_dict: dict[str, str] | None, ) -> str: """Map student multiple-choice answer text(s) to letter labels.""" answer_text = _text(student_answer_text) if not answer_text or not answer_options_dict: return answer_text student_answers = [ans.strip() for ans in answer_text.split(" , ")] normalized_options = { letter: clean_html_and_normalize(text) for letter, text in answer_options_dict.items() } matched_letters: list[str] = [] for student_ans in student_answers: normalized_student = clean_html_and_normalize(student_ans) for letter, normalized_option in normalized_options.items(): if normalized_student == normalized_option: matched_letters.append(letter) break else: for letter, normalized_option in normalized_options.items(): if ( normalized_student in normalized_option or normalized_option in normalized_student ): matched_letters.append(letter) break if matched_letters: return ", ".join(sorted(set(matched_letters))) return answer_text def get_correct_option_letters( answer_options: dict[str, str] | None, correct_answers: object, ) -> str: """Resolve the correct answer text(s) to option letters for MC items.""" correct_answer_text = _text(correct_answers).strip() if not answer_options or not correct_answer_text: return correct_answer_text correct_list = [ans.strip() for ans in correct_answer_text.split("||")] correct_letters = [ letter for letter, text in answer_options.items() if text in correct_list ] return ( ", ".join(sorted(correct_letters)) if correct_letters else correct_answer_text ) def format_answer_options_for_prompt(answer_options: dict[str, str] | None) -> str: """Format answer options dictionary for human-readable prompt text.""" if not answer_options: return "" return "\n".join([f"{letter}) {text}" for letter, text in answer_options.items()]) def load_and_preprocess_problems(problems_path: Path) -> pd.DataFrame: """Load and preprocess problems with the same answer handling as KT inference.""" problems_df = pd.read_csv(problems_path, low_memory=False) problems_df["problem_id"] = pd.to_numeric( problems_df["problem_id"], errors="coerce" ) problems_df = problems_df.dropna(subset=["problem_id"]).copy() problems_df["problem_id"] = problems_df["problem_id"].astype(int) problems_df = problems_df.sort_values(["problem_id"]).drop_duplicates( subset=["problem_id"], keep="first" ) problems_df["cleaned body"] = problems_df["Problem Body"].apply(clean_problem_body) problems_df["answer_options"] = problems_df["Multiple Choice Options"].apply( label_answer_options ) mc_types = {"Multiple Choice (select 1)", "Multiple Choice (select all)"} problems_df["correct_answers"] = problems_df.apply( lambda row: ( get_correct_option_letters( row["answer_options"], row["Multiple Choice Answers"], ) if _text(row["Problem Type"]).strip() in mc_types else _text(row.get("Fill-in Answers", "")) ), axis=1, ) problems_df["answer_options_formatted"] = problems_df["answer_options"].apply( format_answer_options_for_prompt ) return problems_df def load_skill_tables( skills_path: Path, skill_set_path: Path, ) -> tuple[list[dict], dict[int, list[int]], int]: """Load skills and build a problem_id -> skill_ids mapping. Returns: skills: OEKT skill list. problem_to_skills: Mapping from original problem_id to contiguous skill IDs. fallback_skill_id: Skill ID for untagged problems. """ usecols = ["problem_id", "node_code", "node_name"] skills_df = pd.read_csv(skills_path, usecols=usecols, low_memory=False) skills_df["problem_id"] = pd.to_numeric(skills_df["problem_id"], errors="coerce") skills_df = skills_df.dropna(subset=["problem_id"]).copy() skills_df["problem_id"] = skills_df["problem_id"].astype(int) skills_df["node_code"] = skills_df["node_code"].apply(lambda v: _text(v).strip()) skills_df["node_name"] = skills_df["node_name"].apply(lambda v: _text(v).strip()) skills_df = skills_df[skills_df["node_code"] != ""].copy() skill_set_df = pd.read_csv( skill_set_path, usecols=["index", "skill_code", "full_description"], low_memory=False, ) skill_set_df["index"] = pd.to_numeric(skill_set_df["index"], errors="coerce") skill_set_df = skill_set_df.dropna(subset=["index"]).copy() skill_set_df["index"] = skill_set_df["index"].astype(int) skill_set_df["skill_code"] = skill_set_df["skill_code"].apply( lambda v: _text(v).strip() ) skill_set_df["full_description"] = skill_set_df["full_description"].apply( lambda v: _text(v).strip() ) skill_set_df = skill_set_df[skill_set_df["skill_code"] != ""].copy() skill_set_df = ( skill_set_df.sort_values(["index", "skill_code"]) .drop_duplicates(subset=["skill_code"], keep="first") .copy() ) node_name_by_code = ( skills_df.sort_values(["node_code", "node_name"]) .drop_duplicates(subset=["node_code"], keep="first") .set_index("node_code")["node_name"] .to_dict() ) skill_rows: list[tuple[str, int, str, str]] = [] skill_id_map: dict[str, int] = {} for row in skill_set_df.itertuples(index=False): node_code = _text(row.skill_code).strip() skill_id = _as_int(row.index) - 1 skill_id_map[node_code] = skill_id node_name = _text(node_name_by_code.get(node_code, "")).strip() name = node_name if node_name else node_code description = _text(row.full_description).strip() if not description: print( f"Warning: Missing description for skill '{node_code}' in Skill_Set.csv. " f"Using default description." ) description = ( f"Common Core State StandardS for Mathematics: Skill {node_code}" ) skill_rows.append((node_code, skill_id, name, description)) # max_skill_id = max(skill_id_map.values(), default=-1) missing_node_codes = sorted( set(skills_df["node_code"].tolist()) - set(skill_id_map) ) # for node_code in missing_node_codes: # max_skill_id += 1 # skill_id_map[node_code] = max_skill_id # node_name = _text(node_name_by_code.get(node_code, "")).strip() # name = node_name if node_name else node_code # description = ( # node_name # if node_name # else f"Common Core State StandardS for Mathematics: Skill {node_code}" # ) # skill_rows.append((node_code, max_skill_id, name, description)) if missing_node_codes: raise ValueError( f"Error: Found {len(missing_node_codes)} node_code(s) in Skills.csv that are missing from Skill_Set.csv. " f"Please ensure all node_code values in Skills.csv have a corresponding skill_code in Skill_Set.csv. " f"Missing node_codes: {missing_node_codes}" ) skills: list[dict] = [] for _, skill_id, name, description in sorted(skill_rows, key=lambda r: r[0]): skills.append( { "id": skill_id, "name": name, "description": description, "prerequisites": [], } ) fallback_skill_id = max([s["id"] for s in skills], default=-1) + 1 skills.append( { "id": fallback_skill_id, "name": "UnmappedSkill", "description": "Fallback skill for questions without explicit skill tags.", "prerequisites": [], } ) problem_to_skills: dict[int, list[int]] = {} pairs = skills_df[["problem_id", "node_code"]].drop_duplicates() for row in pairs.itertuples(index=False): pid = _as_int(row.problem_id) sid = skill_id_map[_text(row.node_code).strip()] problem_to_skills.setdefault(pid, []).append(sid) for pid, sids in problem_to_skills.items(): if len(sids) == 0: print(f"Warning: Problem {pid} has no valid skill mappings.") problem_to_skills[pid] = sorted(set(sids)) return skills, problem_to_skills, fallback_skill_id def build_question_content(problem_row: pd.Series) -> tuple[str, str]: """Create question content and canonical correct answer from preprocessed fields.""" body = _text(problem_row.get("cleaned body", "")).strip() problem_type = _text(problem_row.get("Problem Type", "")).strip() answer_options_formatted = _text( problem_row.get("answer_options_formatted", "") ).strip() correct_answer = _text(problem_row.get("correct_answers", "")).strip() body_parts: list[str] = [] if body: body_parts.append(body) if problem_type: body_parts.append(f"Problem Type: {problem_type}") if answer_options_formatted: body_parts.append(f"Answer Options:\n{answer_options_formatted}") if not body_parts: problem_id = problem_row.get("problem_id", "unknown") return f"Problem {problem_id}", correct_answer return "\n\n".join(body_parts), correct_answer def load_questions( problems_df: pd.DataFrame, problem_to_skills: dict[int, list[int]], fallback_skill_id: int, ) -> tuple[list[dict], dict[int, str], int]: """Build OEKT question objects from preprocessed Problems data.""" questions: list[dict] = [] problem_to_qid: dict[int, str] = {} unmapped_questions = 0 for row in problems_df.to_dict(orient="records"): pid = _as_int(row["problem_id"]) qid = f"q_{pid}" skill_ids = problem_to_skills.get(pid, []) if not skill_ids: skill_ids = [fallback_skill_id] unmapped_questions += 1 content, correct_answer = build_question_content(pd.Series(row)) question = { "id": qid, "content": content, "skill_ids": skill_ids, "rubrics": [ { "id": f"r_{pid}_0", "description": (f"Match the correct answer: {correct_answer}"), "skill_ids": skill_ids, } ], } questions.append(question) problem_to_qid[pid] = qid return questions, problem_to_qid, unmapped_questions def load_interactions( interactions_path: Path, problem_meta_df: pd.DataFrame, max_interactions: int | None = None, ) -> pd.DataFrame: """Load and normalize interaction logs used to build student trajectories.""" usecols = [ "id", "problem_id", "answer_text", "discrete_score", "end_time", "user_id", ] df = pd.read_csv(interactions_path, usecols=usecols, low_memory=False) df["problem_id"] = pd.to_numeric(df["problem_id"], errors="coerce") df["discrete_score"] = pd.to_numeric(df["discrete_score"], errors="coerce") df["id"] = pd.to_numeric(df["id"], errors="coerce") df["end_time"] = pd.to_datetime(df["end_time"], errors="coerce", utc=True) df = df.dropna(subset=["user_id", "problem_id", "discrete_score"]).copy() df["user_id"] = df["user_id"].astype(str) df["problem_id"] = df["problem_id"].astype(int) df["id"] = df["id"].fillna(-1).astype(int) answer_meta = problem_meta_df[ ["problem_id", "Problem Type", "answer_options"] ].copy() df = df.merge(answer_meta, on="problem_id", how="left") mc_types = {"Multiple Choice (select 1)", "Multiple Choice (select all)"} df["answer_text"] = df.apply( lambda row: ( match_student_answer_to_letters(row["answer_text"], row["answer_options"]) if _text(row.get("Problem Type", "")).strip() in mc_types and isinstance(row.get("answer_options"), dict) else _text(row["answer_text"]) ), axis=1, ) df = df.drop(columns=["Problem Type", "answer_options"]) df = df.sort_values(["user_id", "end_time", "id"], kind="mergesort") if max_interactions is not None: if max_interactions <= 0: raise ValueError("--max-interactions must be a positive integer.") df = df.head(max_interactions).copy() return df def build_qmatrix(questions: list[dict], num_skills: int) -> list[list[float]]: """Build a rubric x skill matrix consistent with question/rubric ordering.""" qmatrix: list[list[float]] = [] for question in questions: for rubric in question["rubrics"]: row = [0.0] * num_skills for sid in rubric["skill_ids"]: row[int(sid)] = 1.0 qmatrix.append(row) return qmatrix def split_student_ids( student_ids: list[str], train_ratio: float, valid_ratio: float, test_ratio: float, seed: int, ) -> tuple[list[str], list[str], list[str]]: """Create deterministic train/valid/test split lists at the student level.""" if train_ratio < 0 or valid_ratio < 0 or test_ratio < 0: raise ValueError("Split ratios must be non-negative.") total = train_ratio + valid_ratio + test_ratio if total <= 0: raise ValueError("At least one split ratio must be > 0.") ids = list(student_ids) ids.sort() rng = random.Random(seed) rng.shuffle(ids) train_count = int(len(ids) * (train_ratio / total)) valid_count = int(len(ids) * (valid_ratio / total)) train_ids = ids[:train_count] valid_ids = ids[train_count : train_count + valid_count] test_ids = ids[train_count + valid_count :] return train_ids, valid_ids, test_ids def get_calendar_group_key( end_time: pd.Timestamp | None, grouping_mode: GroupingMode, missing_idx: int, ) -> tuple[object, ...]: """Return a stable calendar bucket key for an interaction timestamp.""" if end_time is None: return ("missing", missing_idx) ts = end_time if ts.tzinfo is None: ts = ts.tz_localize("UTC") else: ts = ts.tz_convert("UTC") if grouping_mode == "1h": return ("1h", ts.year, ts.month, ts.day, ts.hour) if grouping_mode == "halfday": return ("halfday", ts.year, ts.month, ts.day, 0 if ts.hour < 12 else 1) if grouping_mode == "day": return ("day", ts.year, ts.month, ts.day) if grouping_mode == "week": iso = ts.isocalendar() return ("week", int(iso.year), int(iso.week)) if grouping_mode == "month": return ("month", ts.year, ts.month) if grouping_mode == "year": return ("year", ts.year) raise ValueError(f"Unsupported grouping mode: {grouping_mode}") def write_dataset_json( dataset_path: Path, skills: list[dict], questions: list[dict], interactions_df: pd.DataFrame, problem_to_qid: dict[int, str], grouping_mode: GroupingMode = "none", save_unmapped_skills: bool = False, ) -> tuple[list[str], int, int, int, int]: """Stream-write dataset.json while optionally grouping by calendar buckets.""" dataset_path.parent.mkdir(parents=True, exist_ok=True) student_ids: list[str] = [] num_students = 0 num_time_steps = 0 num_questions = 0 skipped_interactions = 0 with open(dataset_path, "w", encoding="utf-8") as f: f.write("{") f.write('"skills":') if not save_unmapped_skills: saving_skills = ( skills[:-1] if skills and skills[-1]["name"] == "UnmappedSkill" else skills ) else: saving_skills = skills json.dump(saving_skills, f, ensure_ascii=False) f.write(',"questions":') json.dump(questions, f, ensure_ascii=False) f.write(',"students":[') first_student = True for user_id, student_df in tqdm(interactions_df.groupby("user_id", sort=False)): time_steps: list[dict] = [] current_group_key: tuple[object, ...] | None = None for row_idx, row in enumerate(student_df.itertuples(index=False)): pid = _as_int(row.problem_id) qid = problem_to_qid.get(pid) if qid is None: skipped_interactions += 1 continue score = 1 if _as_float(row.discrete_score) >= 1.0 else 0 answer_text = _text(row.answer_text) response = { "question_id": qid, "answer_text": answer_text, "rubric_scores": [score], } num_questions += 1 if grouping_mode == "none": time_steps.append( { "t": len(time_steps), "responses": [response], } ) continue row_end_time_raw = row.end_time row_end_time: pd.Timestamp | None = ( None if pd.isna(row_end_time_raw) else cast(pd.Timestamp, row_end_time_raw) ) group_key = get_calendar_group_key( end_time=row_end_time, grouping_mode=grouping_mode, missing_idx=row_idx, ) if time_steps and current_group_key == group_key: time_steps[-1]["responses"].append(response) continue time_steps.append( { "t": len(time_steps), "responses": [response], } ) current_group_key = group_key if not time_steps: continue student_obj = { "student_id": user_id, "time_steps": time_steps, } if not first_student: f.write(",") json.dump(student_obj, f, ensure_ascii=False) first_student = False student_ids.append(str(user_id)) num_students += 1 num_time_steps += len(time_steps) f.write("]}") return ( student_ids, num_students, num_time_steps, num_questions, skipped_interactions, ) def save_json(path: Path, obj: object) -> None: path.parent.mkdir(parents=True, exist_ok=True) with open(path, "w", encoding="utf-8") as f: json.dump(obj, f, indent=2, ensure_ascii=False) def main() -> None: args = parse_args() data_dir = args.data_dir output_dir = args.output_dir interactions_path = data_dir / "Interactions.csv" problems_path = data_dir / "Problems.csv" skill_set_path = data_dir / "Skill_Set.csv" skills_path = data_dir / "Skills.csv" for p in [interactions_path, problems_path, skill_set_path, skills_path]: if not p.exists(): raise FileNotFoundError(f"Required input file not found: {p}") print("Loading skills...") skills, problem_to_skills, fallback_skill_id = load_skill_tables( skills_path=skills_path, skill_set_path=skill_set_path, ) print("Loading and preprocessing problems...") problems_df = load_and_preprocess_problems(problems_path) print("Loading questions...") questions, problem_to_qid, unmapped_questions = load_questions( problems_df=problems_df, problem_to_skills=problem_to_skills, fallback_skill_id=fallback_skill_id, ) print("Loading interactions...") interactions_df = load_interactions( interactions_path, problem_meta_df=problems_df, max_interactions=args.max_interactions, ) print("Writing dataset.json...") dataset_path = output_dir / "dataset.json" ( student_ids, num_students, num_time_steps, num_questions, skipped_interactions, ) = write_dataset_json( dataset_path=dataset_path, skills=skills, questions=questions, interactions_df=interactions_df, problem_to_qid=problem_to_qid, grouping_mode=args.grouping_time, save_unmapped_skills=(unmapped_questions > 0), ) print("Building qmatrix.json...") num_skills = len(skills) - int(unmapped_questions == 0) qmatrix = build_qmatrix(questions, num_skills=num_skills) save_json(output_dir / "qmatrix.json", qmatrix) print("Building train/valid/test split files...") train_ids, valid_ids, test_ids = split_student_ids( student_ids=student_ids, train_ratio=args.train_ratio, valid_ratio=args.valid_ratio, test_ratio=args.test_ratio, seed=args.seed, ) save_json(output_dir / "trainset.json", train_ids) save_json(output_dir / "validset.json", valid_ids) save_json(output_dir / "testset.json", test_ids) total_rubrics = sum(len(q["rubrics"]) for q in questions) question_skill_counts = [len(q.get("skill_ids", [])) for q in questions] rubric_skill_counts = [ len(r.get("skill_ids", [])) for q in questions for r in q.get("rubrics", []) ] avg_skills_per_question = ( sum(question_skill_counts) / len(question_skill_counts) if question_skill_counts else 0.0 ) avg_skills_per_rubric = ( sum(rubric_skill_counts) / len(rubric_skill_counts) if rubric_skill_counts else 0.0 ) avg_time_steps_per_student = ( num_time_steps / num_students if num_students > 0 else 0.0 ) avg_questions_per_timestep = ( num_questions / num_time_steps if num_time_steps > 0 else 0.0 ) print("\n=== Conversion Summary ===") print(f"Skills: {num_skills}") print(f"Questions: {len(questions)}") print(f"Rubrics: {total_rubrics}") print(f"Avg skills/question: {avg_skills_per_question:.3f}") print(f"Avg skills/rubric: {avg_skills_per_rubric:.3f}") print(f"Students: {num_students}") print(f"Time steps: {num_time_steps}") print(f"Avg timesteps/student: {avg_time_steps_per_student:.3f}") print(f"Avg questions/timestep: {avg_questions_per_timestep:.3f}") print(f"Grouping mode: {args.grouping_time}") print(f"Unmapped questions: {unmapped_questions}") print(f"Skipped interactions:{skipped_interactions}") print(f"Output directory: {output_dir}") if __name__ == "__main__": main()