| |
| """ |
| Convert FoundationalASSIST CSV files to the CSEDM/OEKT JSON format. |
| |
| Inputs (under Data/ by default): |
| - Interactions.csv |
| - Problems.csv |
| - Skill_Set.csv |
| - Skills.csv |
| |
| Outputs (under src/data/FoundationalASSIST/ by default): |
| - dataset.json |
| - qmatrix.json |
| - trainset.json |
| - validset.json |
| - testset.json |
| |
| The produced dataset JSON follows the same schema used by src/data/CSEDM. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import random |
| import re |
| from pathlib import Path |
| from typing import Literal, cast |
|
|
| import pandas as pd |
| from tqdm import tqdm |
| from clean_utils import clean_problem_body |
|
|
| PROJECT_ROOT = Path(__file__).resolve().parents[3] |
| DEFAULT_DATA_DIR = Path(__file__).resolve().parent.parent / "Data" |
| DEFAULT_OUTPUT_DIR = PROJECT_ROOT / "src" / "data" / "FoundationalASSIST" |
| GroupingMode = Literal["none", "1h", "halfday", "day", "week", "month", "year"] |
|
|
|
|
def parse_grouping_mode(value: str) -> GroupingMode:
    """Translate a ``--grouping-time`` argument into a canonical mode.

    Accepts a few aliases ("hour" for "1h", "0"/"off"/"no" for "none") and
    raises ``argparse.ArgumentTypeError`` for anything unrecognized.
    """
    key = value.strip().lower()
    canonical: dict[str, GroupingMode] = {
        "0": "none",
        "0.0": "none",
        "none": "none",
        "off": "none",
        "no": "none",
        "1h": "1h",
        "hour": "1h",
        "halfday": "halfday",
        "half-day": "halfday",
        "day": "day",
        "week": "week",
        "month": "month",
        "year": "year",
    }
    mode = canonical.get(key)
    if mode is not None:
        return mode
    valid_values = "1h, halfday, day, week, month, year, none"
    raise argparse.ArgumentTypeError(
        f"Invalid grouping mode '{value}'. Valid values: {valid_values}."
    )
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse command-line options for the FoundationalASSIST converter."""
    parser = argparse.ArgumentParser(
        description="Convert FoundationalASSIST to CSEDM/OEKT JSON format."
    )
    add = parser.add_argument
    add(
        "--data-dir",
        type=Path,
        default=DEFAULT_DATA_DIR,
        help="Directory containing Interactions.csv, Problems.csv, Skills.csv.",
    )
    add(
        "--output-dir",
        type=Path,
        default=DEFAULT_OUTPUT_DIR,
        help="Directory to write dataset.json/qmatrix.json/split files.",
    )
    add(
        "--seed",
        type=int,
        default=42,
        help="Random seed used for train/valid/test student split.",
    )
    add(
        "--train-ratio",
        type=float,
        default=0.8,
        help="Fraction of students in train split.",
    )
    add(
        "--valid-ratio",
        type=float,
        default=0.1,
        help="Fraction of students in valid split.",
    )
    add(
        "--test-ratio",
        type=float,
        default=0.1,
        help="Fraction of students in test split.",
    )
    add(
        "--max-interactions",
        type=int,
        default=None,
        help=(
            "Optional cap on number of interaction rows after sorting. "
            "Useful for quick smoke tests."
        ),
    )
    add(
        "--grouping-time",
        type=parse_grouping_mode,
        default="none",
        help=(
            "Calendar grouping mode per student: 1h, halfday, day, week, "
            "month, year, or none."
        ),
    )
    return parser.parse_args()
|
|
|
|
| def _text(v: object) -> str: |
| if v is None: |
| return "" |
| if v is pd.NA: |
| return "" |
| if isinstance(v, float) and pd.isna(v): |
| return "" |
| return str(v) |
|
|
|
|
def _as_int(v: object) -> int:
    """Parse *v* as an int, accepting float-formatted values like "3.0"."""
    numeric = float(_text(v))
    return int(numeric)
|
|
|
|
def _as_float(v: object) -> float:
    """Parse *v* as a float via its text representation."""
    text_value = _text(v)
    return float(text_value)
|
|
|
|
def label_answer_options(answer_string: object) -> dict[str, str] | None:
    """Convert "||"-delimited answer options to a letter -> option mapping.

    Options are labeled "A", "B", ... in order.  Previously only the first
    ten options could be labeled; this now supports up to 26 (anything beyond
    "Z" is still dropped, matching the original silent-truncation behavior).

    Args:
        answer_string: Raw options cell; None/NaN/pd.NA are treated as empty.

    Returns:
        Mapping of letter to option text, or None when the input is empty.
    """
    # Inline missing-value handling (None / pd.NA / float NaN -> "").
    if answer_string is None or answer_string is pd.NA:
        answer_text = ""
    elif isinstance(answer_string, float) and pd.isna(answer_string):
        answer_text = ""
    else:
        answer_text = str(answer_string).strip()
    if not answer_text:
        return None

    options = [opt.strip() for opt in answer_text.split("||")]
    letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    # zip() stops at the shorter sequence, capping the labels at 26 options.
    return dict(zip(letters, options))
|
|
|
|
def clean_html_and_normalize(text: object) -> str:
    """Strip HTML tags and collapse whitespace so answers compare reliably."""
    # Missing values (None / pd.NA / float NaN) normalize to "".
    if text is None or text is pd.NA or (isinstance(text, float) and pd.isna(text)):
        return ""
    normalized = str(text)
    if not normalized:
        return ""

    without_tags = re.sub(r"<[^>]+>", "", normalized)
    collapsed = " ".join(without_tags.split())
    tightened = re.sub(r"\s*:\s*", ":", collapsed)
    return tightened.strip()
|
|
|
|
def match_student_answer_to_letters(
    student_answer_text: object,
    answer_options_dict: dict[str, str] | None,
) -> str:
    """Map student multiple-choice answer text(s) to letter labels.

    Each " , "-separated answer is matched exactly against the normalized
    option texts first, then by substring containment in either direction.
    If nothing matches, the original answer text is returned unchanged.
    """
    answer_text = _text(student_answer_text)
    if not answer_text or not answer_options_dict:
        return answer_text

    normalized_options = {
        letter: clean_html_and_normalize(text)
        for letter, text in answer_options_dict.items()
    }

    matched_letters: list[str] = []
    for raw_answer in answer_text.split(" , "):
        target = clean_html_and_normalize(raw_answer.strip())

        exact = next(
            (
                letter
                for letter, option in normalized_options.items()
                if option == target
            ),
            None,
        )
        if exact is not None:
            matched_letters.append(exact)
            continue

        fuzzy = next(
            (
                letter
                for letter, option in normalized_options.items()
                if target in option or option in target
            ),
            None,
        )
        if fuzzy is not None:
            matched_letters.append(fuzzy)

    if matched_letters:
        return ", ".join(sorted(set(matched_letters)))
    return answer_text
|
|
|
|
def get_correct_option_letters(
    answer_options: dict[str, str] | None,
    correct_answers: object,
) -> str:
    """Resolve "||"-delimited correct answer text(s) to option letters.

    Falls back to the raw text when there are no options or no letter match.
    """
    # Normalize missing values (None / pd.NA / float NaN) to "".
    if correct_answers is None or correct_answers is pd.NA:
        raw = ""
    elif isinstance(correct_answers, float) and pd.isna(correct_answers):
        raw = ""
    else:
        raw = str(correct_answers)
    raw = raw.strip()
    if not answer_options or not raw:
        return raw

    wanted = {piece.strip() for piece in raw.split("||")}
    letters = sorted(
        letter for letter, text in answer_options.items() if text in wanted
    )
    return ", ".join(letters) if letters else raw
|
|
|
|
def format_answer_options_for_prompt(answer_options: dict[str, str] | None) -> str:
    """Render the options as one "X) text" line per option for prompts."""
    if not answer_options:
        return ""
    lines = (f"{letter}) {text}" for letter, text in answer_options.items())
    return "\n".join(lines)
|
|
|
|
def load_and_preprocess_problems(problems_path: Path) -> pd.DataFrame:
    """Load and preprocess problems with the same answer handling as KT inference.

    Args:
        problems_path: Path to Problems.csv.

    Returns:
        DataFrame with one row per unique ``problem_id`` and derived columns:
        ``cleaned body``, ``answer_options``, ``correct_answers``, and
        ``answer_options_formatted``.
    """
    problems_df = pd.read_csv(problems_path, low_memory=False)
    # Coerce ids to numeric; rows whose id cannot be parsed are dropped.
    problems_df["problem_id"] = pd.to_numeric(
        problems_df["problem_id"], errors="coerce"
    )
    problems_df = problems_df.dropna(subset=["problem_id"]).copy()
    problems_df["problem_id"] = problems_df["problem_id"].astype(int)

    # Deduplicate on problem_id, keeping the first row after sorting by id.
    problems_df = problems_df.sort_values(["problem_id"]).drop_duplicates(
        subset=["problem_id"], keep="first"
    )

    problems_df["cleaned body"] = problems_df["Problem Body"].apply(clean_problem_body)
    problems_df["answer_options"] = problems_df["Multiple Choice Options"].apply(
        label_answer_options
    )

    # MC problems resolve their correct answers to option letters; all other
    # problem types fall back to the free-form "Fill-in Answers" column.
    mc_types = {"Multiple Choice (select 1)", "Multiple Choice (select all)"}
    problems_df["correct_answers"] = problems_df.apply(
        lambda row: (
            get_correct_option_letters(
                row["answer_options"],
                row["Multiple Choice Answers"],
            )
            if _text(row["Problem Type"]).strip() in mc_types
            else _text(row.get("Fill-in Answers", ""))
        ),
        axis=1,
    )
    problems_df["answer_options_formatted"] = problems_df["answer_options"].apply(
        format_answer_options_for_prompt
    )
    return problems_df
|
|
|
|
def load_skill_tables(
    skills_path: Path,
    skill_set_path: Path,
) -> tuple[list[dict], dict[int, list[int]], int]:
    """Load skills and build a problem_id -> skill_ids mapping.

    Args:
        skills_path: Path to Skills.csv (problem_id, node_code, node_name).
        skill_set_path: Path to Skill_Set.csv (index, skill_code,
            full_description); ``index`` is 1-based and defines skill IDs.

    Returns:
        skills: OEKT skill list, sorted by skill code, plus a trailing
            "UnmappedSkill" fallback entry.
        problem_to_skills: Mapping from original problem_id to contiguous
            skill IDs (``index - 1``).
        fallback_skill_id: Skill ID for untagged problems.

    Raises:
        ValueError: If Skills.csv references a node_code that is absent from
            Skill_Set.csv.
    """

    def _clean(v: object) -> str:
        # Missing CSV cells arrive as None/NaN/pd.NA; normalize to "".
        if v is None or v is pd.NA or (isinstance(v, float) and pd.isna(v)):
            return ""
        return str(v).strip()

    usecols = ["problem_id", "node_code", "node_name"]
    skills_df = pd.read_csv(skills_path, usecols=usecols, low_memory=False)

    skills_df["problem_id"] = pd.to_numeric(skills_df["problem_id"], errors="coerce")
    skills_df = skills_df.dropna(subset=["problem_id"]).copy()
    skills_df["problem_id"] = skills_df["problem_id"].astype(int)
    skills_df["node_code"] = skills_df["node_code"].apply(_clean)
    skills_df["node_name"] = skills_df["node_name"].apply(_clean)
    skills_df = skills_df[skills_df["node_code"] != ""].copy()

    skill_set_df = pd.read_csv(
        skill_set_path,
        usecols=["index", "skill_code", "full_description"],
        low_memory=False,
    )
    skill_set_df["index"] = pd.to_numeric(skill_set_df["index"], errors="coerce")
    skill_set_df = skill_set_df.dropna(subset=["index"]).copy()
    skill_set_df["index"] = skill_set_df["index"].astype(int)
    skill_set_df["skill_code"] = skill_set_df["skill_code"].apply(_clean)
    skill_set_df["full_description"] = skill_set_df["full_description"].apply(_clean)
    skill_set_df = skill_set_df[skill_set_df["skill_code"] != ""].copy()
    # Keep exactly one row per skill_code, preferring the smallest index.
    skill_set_df = (
        skill_set_df.sort_values(["index", "skill_code"])
        .drop_duplicates(subset=["skill_code"], keep="first")
        .copy()
    )

    # Pick a deterministic display name per node_code from Skills.csv.
    node_name_by_code = (
        skills_df.sort_values(["node_code", "node_name"])
        .drop_duplicates(subset=["node_code"], keep="first")
        .set_index("node_code")["node_name"]
        .to_dict()
    )

    skill_rows: list[tuple[str, int, str, str]] = []
    skill_id_map: dict[str, int] = {}
    for row in skill_set_df.itertuples(index=False):
        node_code = _clean(row.skill_code)
        # Skill_Set indices are 1-based; OEKT skill IDs are 0-based.
        skill_id = int(row.index) - 1
        skill_id_map[node_code] = skill_id

        node_name = _clean(node_name_by_code.get(node_code, ""))
        name = node_name if node_name else node_code
        description = _clean(row.full_description)
        if not description:
            print(
                f"Warning: Missing description for skill '{node_code}' in Skill_Set.csv. "
                f"Using default description."
            )
            # Fixed typo in the default description ("StandardS" -> "Standards").
            description = (
                f"Common Core State Standards for Mathematics: Skill {node_code}"
            )

        skill_rows.append((node_code, skill_id, name, description))

    # Every tagged node_code must have a defined skill; fail loudly otherwise.
    missing_node_codes = sorted(
        set(skills_df["node_code"].tolist()) - set(skill_id_map)
    )
    if missing_node_codes:
        raise ValueError(
            f"Error: Found {len(missing_node_codes)} node_code(s) in Skills.csv that are missing from Skill_Set.csv. "
            f"Please ensure all node_code values in Skills.csv have a corresponding skill_code in Skill_Set.csv. "
            f"Missing node_codes: {missing_node_codes}"
        )

    skills: list[dict] = []
    for _, skill_id, name, description in sorted(skill_rows, key=lambda r: r[0]):
        skills.append(
            {
                "id": skill_id,
                "name": name,
                "description": description,
                "prerequisites": [],
            }
        )

    # Reserve one extra skill ID for problems without any skill tag.
    fallback_skill_id = max([s["id"] for s in skills], default=-1) + 1
    skills.append(
        {
            "id": fallback_skill_id,
            "name": "UnmappedSkill",
            "description": "Fallback skill for questions without explicit skill tags.",
            "prerequisites": [],
        }
    )

    problem_to_skills: dict[int, list[int]] = {}
    pairs = skills_df[["problem_id", "node_code"]].drop_duplicates()
    for row in pairs.itertuples(index=False):
        pid = int(row.problem_id)
        sid = skill_id_map[_clean(row.node_code)]
        problem_to_skills.setdefault(pid, []).append(sid)

    for pid, sids in problem_to_skills.items():
        if len(sids) == 0:
            # Defensive: entries above are only created with >= 1 skill ID.
            print(f"Warning: Problem {pid} has no valid skill mappings.")
        problem_to_skills[pid] = sorted(set(sids))

    return skills, problem_to_skills, fallback_skill_id
|
|
|
|
def build_question_content(problem_row: pd.Series) -> tuple[str, str]:
    """Assemble prompt text and the canonical correct answer for one problem.

    Joins the cleaned body, problem type, and formatted answer options
    (skipping whichever are empty) with blank lines; falls back to
    "Problem <id>" when all three are missing.
    """

    def _field(key: str) -> str:
        # Missing keys / None / NaN / pd.NA all become "".
        value = problem_row.get(key, "")
        if value is None or value is pd.NA:
            return ""
        if isinstance(value, float) and pd.isna(value):
            return ""
        return str(value).strip()

    body = _field("cleaned body")
    problem_type = _field("Problem Type")
    options_text = _field("answer_options_formatted")
    correct_answer = _field("correct_answers")

    sections: list[str] = []
    if body:
        sections.append(body)
    if problem_type:
        sections.append(f"Problem Type: {problem_type}")
    if options_text:
        sections.append(f"Answer Options:\n{options_text}")

    if not sections:
        problem_id = problem_row.get("problem_id", "unknown")
        return f"Problem {problem_id}", correct_answer

    return "\n\n".join(sections), correct_answer
|
|
|
|
def load_questions(
    problems_df: pd.DataFrame,
    problem_to_skills: dict[int, list[int]],
    fallback_skill_id: int,
) -> tuple[list[dict], dict[int, str], int]:
    """Build OEKT question objects from preprocessed Problems data.

    Returns the question list, a problem_id -> question_id mapping, and the
    count of questions that had to use the fallback skill.
    """
    questions: list[dict] = []
    problem_to_qid: dict[int, str] = {}
    unmapped_questions = 0

    for record in problems_df.to_dict(orient="records"):
        pid = _as_int(record["problem_id"])
        qid = f"q_{pid}"

        skill_ids = problem_to_skills.get(pid, [])
        if not skill_ids:
            # No skill tag for this problem: fall back to the reserved skill.
            skill_ids = [fallback_skill_id]
            unmapped_questions += 1

        content, correct_answer = build_question_content(pd.Series(record))
        questions.append(
            {
                "id": qid,
                "content": content,
                "skill_ids": skill_ids,
                "rubrics": [
                    {
                        "id": f"r_{pid}_0",
                        "description": (f"Match the correct answer: {correct_answer}"),
                        "skill_ids": skill_ids,
                    }
                ],
            }
        )
        problem_to_qid[pid] = qid

    return questions, problem_to_qid, unmapped_questions
|
|
|
|
def load_interactions(
    interactions_path: Path,
    problem_meta_df: pd.DataFrame,
    max_interactions: int | None = None,
) -> pd.DataFrame:
    """Load and normalize interaction logs used to build student trajectories.

    Args:
        interactions_path: Path to Interactions.csv.
        problem_meta_df: Preprocessed problems frame; its "Problem Type" and
            "answer_options" columns drive multiple-choice answer relabeling.
        max_interactions: Optional positive cap on rows kept after sorting.

    Returns:
        DataFrame sorted by (user_id, end_time, id) with normalized dtypes;
        MC answer_text is rewritten to option letters where possible.

    Raises:
        ValueError: If ``max_interactions`` is given and not positive.
    """
    usecols = [
        "id",
        "problem_id",
        "answer_text",
        "discrete_score",
        "end_time",
        "user_id",
    ]
    df = pd.read_csv(interactions_path, usecols=usecols, low_memory=False)

    # Coerce dtypes; unparsable values become NaN/NaT and are handled below.
    df["problem_id"] = pd.to_numeric(df["problem_id"], errors="coerce")
    df["discrete_score"] = pd.to_numeric(df["discrete_score"], errors="coerce")
    df["id"] = pd.to_numeric(df["id"], errors="coerce")
    df["end_time"] = pd.to_datetime(df["end_time"], errors="coerce", utc=True)

    # Rows missing a student, problem, or score cannot form a trajectory step.
    df = df.dropna(subset=["user_id", "problem_id", "discrete_score"]).copy()
    df["user_id"] = df["user_id"].astype(str)
    df["problem_id"] = df["problem_id"].astype(int)
    # Missing log ids become -1 so they sort ahead of real ids on timestamp ties.
    df["id"] = df["id"].fillna(-1).astype(int)

    # Attach per-problem metadata needed to letter-map MC answers.
    answer_meta = problem_meta_df[
        ["problem_id", "Problem Type", "answer_options"]
    ].copy()
    df = df.merge(answer_meta, on="problem_id", how="left")

    mc_types = {"Multiple Choice (select 1)", "Multiple Choice (select all)"}
    df["answer_text"] = df.apply(
        lambda row: (
            match_student_answer_to_letters(row["answer_text"], row["answer_options"])
            if _text(row.get("Problem Type", "")).strip() in mc_types
            and isinstance(row.get("answer_options"), dict)
            else _text(row["answer_text"])
        ),
        axis=1,
    )

    # The metadata columns were only needed for the relabeling above.
    df = df.drop(columns=["Problem Type", "answer_options"])

    # Stable (mergesort) ordering keeps tie-breaking deterministic across runs.
    df = df.sort_values(["user_id", "end_time", "id"], kind="mergesort")
    if max_interactions is not None:
        if max_interactions <= 0:
            raise ValueError("--max-interactions must be a positive integer.")
        df = df.head(max_interactions).copy()
    return df
|
|
|
|
def build_qmatrix(questions: list[dict], num_skills: int) -> list[list[float]]:
    """Build a rubric x skill indicator matrix in question/rubric order."""

    def _indicator_row(skill_ids: list[int]) -> list[float]:
        tagged = {int(sid) for sid in skill_ids}
        return [1.0 if col in tagged else 0.0 for col in range(num_skills)]

    return [
        _indicator_row(rubric["skill_ids"])
        for question in questions
        for rubric in question["rubrics"]
    ]
|
|
|
|
def split_student_ids(
    student_ids: list[str],
    train_ratio: float,
    valid_ratio: float,
    test_ratio: float,
    seed: int,
) -> tuple[list[str], list[str], list[str]]:
    """Deterministically partition students into train/valid/test lists.

    IDs are sorted before shuffling so the result depends only on the set of
    IDs and the seed, not on input order.  Ratios are normalized by their
    sum; the test split absorbs any rounding remainder.
    """
    if train_ratio < 0 or valid_ratio < 0 or test_ratio < 0:
        raise ValueError("Split ratios must be non-negative.")

    total = train_ratio + valid_ratio + test_ratio
    if total <= 0:
        raise ValueError("At least one split ratio must be > 0.")

    shuffled = sorted(student_ids)
    random.Random(seed).shuffle(shuffled)

    n = len(shuffled)
    n_train = int(n * (train_ratio / total))
    n_valid = int(n * (valid_ratio / total))
    valid_end = n_train + n_valid

    return shuffled[:n_train], shuffled[n_train:valid_end], shuffled[valid_end:]
|
|
|
|
def get_calendar_group_key(
    end_time: pd.Timestamp | None,
    grouping_mode: GroupingMode,
    missing_idx: int,
) -> tuple[object, ...]:
    """Return a stable calendar bucket key for an interaction timestamp.

    Missing timestamps each get a unique ("missing", idx) bucket so they are
    never merged with one another.
    """
    if end_time is None:
        return ("missing", missing_idx)

    # Normalize to UTC so bucket boundaries are timezone-independent.
    ts = (
        end_time.tz_localize("UTC")
        if end_time.tzinfo is None
        else end_time.tz_convert("UTC")
    )

    if grouping_mode == "halfday":
        return ("halfday", ts.year, ts.month, ts.day, 0 if ts.hour < 12 else 1)
    if grouping_mode == "week":
        iso = ts.isocalendar()
        return ("week", int(iso.year), int(iso.week))

    simple_parts = {
        "1h": (ts.year, ts.month, ts.day, ts.hour),
        "day": (ts.year, ts.month, ts.day),
        "month": (ts.year, ts.month),
        "year": (ts.year,),
    }
    if grouping_mode in simple_parts:
        return (grouping_mode,) + simple_parts[grouping_mode]

    raise ValueError(f"Unsupported grouping mode: {grouping_mode}")
|
|
|
|
def write_dataset_json(
    dataset_path: Path,
    skills: list[dict],
    questions: list[dict],
    interactions_df: pd.DataFrame,
    problem_to_qid: dict[int, str],
    grouping_mode: GroupingMode = "none",
    save_unmapped_skills: bool = False,
) -> tuple[list[str], int, int, int, int]:
    """Stream-write dataset.json while optionally grouping by calendar buckets.

    Students are serialized one JSON object at a time so the full dataset
    never has to be materialized in memory.

    Args:
        dataset_path: Output file path; parent directories are created.
        skills: OEKT skill list; a trailing "UnmappedSkill" entry is dropped
            from the output unless ``save_unmapped_skills`` is True.
        questions: OEKT question list.
        interactions_df: Interactions, expected pre-sorted per student.
        problem_to_qid: problem_id -> question id; interactions whose problem
            is absent from this map are counted as skipped.
        grouping_mode: "none" gives one response per time step; otherwise
            consecutive interactions in the same calendar bucket share a step.
        save_unmapped_skills: Keep the fallback skill in the serialized list.

    Returns:
        Tuple of (student_ids, num_students, num_time_steps, num_questions,
        skipped_interactions).
    """
    dataset_path.parent.mkdir(parents=True, exist_ok=True)

    student_ids: list[str] = []
    num_students = 0
    num_time_steps = 0
    num_questions = 0
    skipped_interactions = 0

    with open(dataset_path, "w", encoding="utf-8") as f:
        # Hand-write the JSON envelope so the students array can be streamed.
        f.write("{")
        f.write('"skills":')
        if not save_unmapped_skills:
            # The fallback skill is always the last entry when present.
            saving_skills = (
                skills[:-1]
                if skills and skills[-1]["name"] == "UnmappedSkill"
                else skills
            )
        else:
            saving_skills = skills
        json.dump(saving_skills, f, ensure_ascii=False)
        f.write(',"questions":')
        json.dump(questions, f, ensure_ascii=False)
        f.write(',"students":[')

        first_student = True
        for user_id, student_df in tqdm(interactions_df.groupby("user_id", sort=False)):
            time_steps: list[dict] = []
            current_group_key: tuple[object, ...] | None = None

            for row_idx, row in enumerate(student_df.itertuples(index=False)):
                pid = _as_int(row.problem_id)
                qid = problem_to_qid.get(pid)
                if qid is None:
                    # Interaction references a problem we have no question for.
                    skipped_interactions += 1
                    continue

                # A discrete_score >= 1.0 counts as correct (binary rubric).
                score = 1 if _as_float(row.discrete_score) >= 1.0 else 0
                answer_text = _text(row.answer_text)
                response = {
                    "question_id": qid,
                    "answer_text": answer_text,
                    "rubric_scores": [score],
                }
                num_questions += 1

                if grouping_mode == "none":
                    # Ungrouped: each response is its own time step.
                    time_steps.append(
                        {
                            "t": len(time_steps),
                            "responses": [response],
                        }
                    )
                    continue

                # NaT timestamps become None so each gets its own bucket.
                row_end_time_raw = row.end_time
                row_end_time: pd.Timestamp | None = (
                    None
                    if pd.isna(row_end_time_raw)
                    else cast(pd.Timestamp, row_end_time_raw)
                )

                group_key = get_calendar_group_key(
                    end_time=row_end_time,
                    grouping_mode=grouping_mode,
                    missing_idx=row_idx,
                )
                # Only consecutive rows in the same bucket share a time step.
                if time_steps and current_group_key == group_key:
                    time_steps[-1]["responses"].append(response)
                    continue

                time_steps.append(
                    {
                        "t": len(time_steps),
                        "responses": [response],
                    }
                )
                current_group_key = group_key

            if not time_steps:
                # Every interaction for this student was skipped.
                continue

            student_obj = {
                "student_id": user_id,
                "time_steps": time_steps,
            }

            # Comma-separate student objects inside the streamed JSON array.
            if not first_student:
                f.write(",")
            json.dump(student_obj, f, ensure_ascii=False)
            first_student = False

            student_ids.append(str(user_id))
            num_students += 1
            num_time_steps += len(time_steps)

        f.write("]}")

    return (
        student_ids,
        num_students,
        num_time_steps,
        num_questions,
        skipped_interactions,
    )
|
|
|
|
def save_json(path: Path, obj: object) -> None:
    """Write *obj* as pretty-printed UTF-8 JSON, creating parent directories."""
    path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(obj, indent=2, ensure_ascii=False)
    with open(path, "w", encoding="utf-8") as f:
        f.write(serialized)
|
|
|
|
def main() -> None:
    """End-to-end conversion: read the CSVs, write dataset/qmatrix/split JSON."""
    args = parse_args()

    data_dir = args.data_dir
    output_dir = args.output_dir

    interactions_path = data_dir / "Interactions.csv"
    problems_path = data_dir / "Problems.csv"
    skill_set_path = data_dir / "Skill_Set.csv"
    skills_path = data_dir / "Skills.csv"

    # Fail fast if any required input file is absent.
    for p in [interactions_path, problems_path, skill_set_path, skills_path]:
        if not p.exists():
            raise FileNotFoundError(f"Required input file not found: {p}")

    print("Loading skills...")
    skills, problem_to_skills, fallback_skill_id = load_skill_tables(
        skills_path=skills_path,
        skill_set_path=skill_set_path,
    )

    print("Loading and preprocessing problems...")
    problems_df = load_and_preprocess_problems(problems_path)

    print("Loading questions...")
    questions, problem_to_qid, unmapped_questions = load_questions(
        problems_df=problems_df,
        problem_to_skills=problem_to_skills,
        fallback_skill_id=fallback_skill_id,
    )

    print("Loading interactions...")
    interactions_df = load_interactions(
        interactions_path,
        problem_meta_df=problems_df,
        max_interactions=args.max_interactions,
    )

    print("Writing dataset.json...")
    dataset_path = output_dir / "dataset.json"
    (
        student_ids,
        num_students,
        num_time_steps,
        num_questions,
        skipped_interactions,
    ) = write_dataset_json(
        dataset_path=dataset_path,
        skills=skills,
        questions=questions,
        interactions_df=interactions_df,
        problem_to_qid=problem_to_qid,
        grouping_mode=args.grouping_time,
        # Keep the fallback skill only if some question actually uses it.
        save_unmapped_skills=(unmapped_questions > 0),
    )

    print("Building qmatrix.json...")
    # Exclude the unused fallback skill column when no question maps to it,
    # keeping the matrix width consistent with the serialized skill list.
    num_skills = len(skills) - int(unmapped_questions == 0)
    qmatrix = build_qmatrix(questions, num_skills=num_skills)
    save_json(output_dir / "qmatrix.json", qmatrix)

    print("Building train/valid/test split files...")
    train_ids, valid_ids, test_ids = split_student_ids(
        student_ids=student_ids,
        train_ratio=args.train_ratio,
        valid_ratio=args.valid_ratio,
        test_ratio=args.test_ratio,
        seed=args.seed,
    )
    save_json(output_dir / "trainset.json", train_ids)
    save_json(output_dir / "validset.json", valid_ids)
    save_json(output_dir / "testset.json", test_ids)

    # Summary statistics for a quick sanity check of the conversion.
    total_rubrics = sum(len(q["rubrics"]) for q in questions)
    question_skill_counts = [len(q.get("skill_ids", [])) for q in questions]
    rubric_skill_counts = [
        len(r.get("skill_ids", [])) for q in questions for r in q.get("rubrics", [])
    ]
    avg_skills_per_question = (
        sum(question_skill_counts) / len(question_skill_counts)
        if question_skill_counts
        else 0.0
    )
    avg_skills_per_rubric = (
        sum(rubric_skill_counts) / len(rubric_skill_counts)
        if rubric_skill_counts
        else 0.0
    )
    avg_time_steps_per_student = (
        num_time_steps / num_students if num_students > 0 else 0.0
    )
    avg_questions_per_timestep = (
        num_questions / num_time_steps if num_time_steps > 0 else 0.0
    )

    print("\n=== Conversion Summary ===")
    print(f"Skills: {num_skills}")
    print(f"Questions: {len(questions)}")
    print(f"Rubrics: {total_rubrics}")
    print(f"Avg skills/question: {avg_skills_per_question:.3f}")
    print(f"Avg skills/rubric: {avg_skills_per_rubric:.3f}")
    print(f"Students: {num_students}")
    print(f"Time steps: {num_time_steps}")
    print(f"Avg timesteps/student: {avg_time_steps_per_student:.3f}")
    print(f"Avg questions/timestep: {avg_questions_per_timestep:.3f}")
    print(f"Grouping mode: {args.grouping_time}")
    print(f"Unmapped questions: {unmapped_questions}")
    print(f"Skipped interactions:{skipped_interactions}")
    print(f"Output directory: {output_dir}")
|
|
|
|
# Run the conversion when this module is executed as a script.
if __name__ == "__main__":
    main()
|
|