| |
| """Plot the distribution of time gaps between consecutive student attempts. |
| |
| This script reads FoundationalASSIST `Interactions.csv`, groups interactions by |
| student (`user_id`), computes the time difference between each pair of |
| consecutive attempts (`end_time`), discretizes these differences into bins, and |
| plots the resulting distribution. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import math |
| from pathlib import Path |
|
|
| import matplotlib.pyplot as plt |
| import pandas as pd |
| from matplotlib.ticker import FuncFormatter, MaxNLocator |
|
|
|
|
# Default locations are resolved relative to the directory two levels above
# this file, so they do not depend on the current working directory.
_PROJECT_ROOT = Path(__file__).resolve().parent.parent
_RESULTS_DIR = _PROJECT_ROOT / "Results"

DEFAULT_INTERACTIONS_PATH = _PROJECT_ROOT / "Data" / "Interactions.csv"
DEFAULT_OUTPUT_PLOT = _RESULTS_DIR / "time_gap_distribution.png"
DEFAULT_OUTPUT_COUNTS = _RESULTS_DIR / "time_gap_distribution_counts.csv"

# Gap length (minutes) at which the cumulative probability is reported.
CDF_MARKER_MINUTES = 60.0
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Define the command-line options and parse them from ``sys.argv``.

    Returns:
        Namespace with the input/output paths, the optional row cap, the gap
        filtering and axis switches, the optional fixed bin width, and the
        optional student index.
    """
    cli = argparse.ArgumentParser(
        description=(
            "Compute per-student consecutive-attempt time gaps from "
            "Interactions.csv and plot their binned distribution."
        )
    )

    # Options are registered in the same order they appear in --help.
    path_options = (
        (
            "--interactions-path",
            DEFAULT_INTERACTIONS_PATH,
            "Path to Interactions.csv.",
        ),
        ("--output-plot", DEFAULT_OUTPUT_PLOT, "Path to save the output figure."),
        (
            "--output-counts",
            DEFAULT_OUTPUT_COUNTS,
            "Path to save bin counts as CSV.",
        ),
    )
    for flag, default_path, help_text in path_options:
        cli.add_argument(flag, type=Path, default=default_path, help=help_text)

    cli.add_argument(
        "--max-rows",
        type=int,
        default=None,
        help="Optional cap on rows after sorting (for quick debugging).",
    )

    switches = (
        (
            "--keep-nonpositive-gaps",
            "Keep zero/negative gaps. By default, only strictly positive "
            "gaps are used.",
        ),
        ("--log-y", "Use log scale on y-axis."),
    )
    for flag, help_text in switches:
        cli.add_argument(flag, action="store_true", help=help_text)

    minute_options = (
        (
            "--plot-upper-limit-minutes",
            "Optional upper limit for x-axis in minutes. "
            "If omitted, uses the full range implied by bins.",
        ),
        (
            "--bin-time",
            "Optional fixed bin width in minutes. "
            "For example, --bin-time 10 creates bins [0,10), [10,20), ...",
        ),
    )
    for flag, help_text in minute_options:
        cli.add_argument(flag, type=float, default=None, help=help_text)

    cli.add_argument(
        "--student-idx",
        type=int,
        default=None,
        help=(
            "Optional 0-based index of student to plot. Index is based on "
            "sorted unique user_id values in the loaded interactions."
        ),
    )

    return cli.parse_args()
|
|
|
|
def load_interactions(path: Path, max_rows: int | None = None) -> pd.DataFrame:
    """Load and order the columns required for time-gap analysis.

    Only ``id``, ``user_id`` and ``end_time`` are read. ``id`` is coerced to
    an integer (unparseable values become ``-1``), ``user_id`` to pandas'
    string dtype, and ``end_time`` to a UTC timestamp (unparseable values
    become ``NaT``). Rows missing ``user_id`` or ``end_time`` are dropped and
    the rest are stably sorted by ``user_id``, ``end_time``, ``id``.

    Args:
        path: Location of the interactions CSV.
        max_rows: Optional cap applied to the sorted rows.

    Returns:
        The cleaned, sorted interactions.

    Raises:
        ValueError: If ``max_rows`` is given and is not positive.
    """
    # Validate before touching the file so a bad cap fails immediately rather
    # than after the whole CSV has been read and sorted.
    if max_rows is not None and max_rows <= 0:
        raise ValueError("--max-rows must be a positive integer.")

    df = pd.read_csv(path, usecols=["id", "user_id", "end_time"], low_memory=False)

    df["id"] = pd.to_numeric(df["id"], errors="coerce").fillna(-1).astype(int)
    df["user_id"] = df["user_id"].astype("string")
    df["end_time"] = pd.to_datetime(df["end_time"], errors="coerce", utc=True)

    df = df.dropna(subset=["user_id", "end_time"])
    # Stable sort; ``id`` breaks ties between identical timestamps.
    df = df.sort_values(["user_id", "end_time", "id"], kind="mergesort")

    if max_rows is not None:
        df = df.head(max_rows).copy()

    return df
|
|
|
|
def compute_time_gaps_minutes(df: pd.DataFrame) -> pd.Series:
    """Return the gap in minutes between each attempt and the one before it.

    Differences of ``end_time`` are taken within each ``user_id`` group,
    following the row order of *df*. The first row of every student has no
    predecessor, so its gap is ``NaN``.
    """
    end_times_by_student = df.groupby("user_id", sort=False)["end_time"]
    elapsed = end_times_by_student.diff()
    elapsed_seconds = elapsed.dt.total_seconds()
    return elapsed_seconds / 60.0
|
|
|
|
def default_bin_edges_minutes() -> list[float]:
    """Return the built-in, non-uniform bin edges in minutes.

    The edges run from 0 to one week (10080 minutes). A fresh list is
    returned on every call, so callers may extend it freely.
    """
    hour = 60.0
    day = 24.0 * hour

    within_the_hour = [0.0, 1.0, 5.0, 10.0, 30.0]
    within_the_day = [1.0 * hour, 3.0 * hour, 12.0 * hour]
    within_the_week = [1.0 * day, 3.0 * day, 7.0 * day]
    return within_the_hour + within_the_day + within_the_week
|
|
|
|
def build_bin_edges_minutes(valid: pd.Series) -> list[float]:
    """Build finite plotting edges so bar widths are proportional on x-axis.

    Starts from the default edges and, when the largest observed gap reaches
    or exceeds the last default edge, appends one finite tail edge so that
    gap still lands inside a bin.

    Args:
        valid: Non-empty series of gaps in minutes.

    Returns:
        The default edges, optionally followed by one tail edge.
    """
    edges = default_bin_edges_minutes()
    last_base_edge = edges[-1]
    largest_gap = float(valid.max())

    # Use ">=" rather than ">": when the edges are used as half-open
    # [left, right) bins, a gap exactly equal to the last base edge would
    # otherwise fall outside every bin and be dropped.
    if largest_gap >= last_base_edge:
        # At least an hour wide, and 5% beyond the maximum so the largest
        # gap sits strictly inside the tail bin.
        edges.append(max(last_base_edge + 60.0, largest_gap * 1.05))

    return edges
|
|
|
|
def build_fixed_width_bin_edges_minutes(
    valid: pd.Series, bin_time_minutes: float
) -> list[float]:
    """Return equally spaced edges covering the observed gap range.

    The first edge is the multiple of ``bin_time_minutes`` at or below the
    smallest gap and the last edge is strictly above the largest gap, so
    every value fits a half-open ``[left, right)`` bin.
    """
    step = bin_time_minutes
    smallest = float(valid.min())
    largest = float(valid.max())

    lower = step * math.floor(smallest / step)
    upper = step * math.ceil(largest / step)

    # Snap a start within 1e-12 of zero to exactly 0.0.
    if math.isclose(lower, 0.0, abs_tol=1e-12):
        lower = 0.0
    # A degenerate range still needs one full-width bin.
    if math.isclose(upper, lower, abs_tol=1e-12):
        upper = lower + step

    bin_count = int(round((upper - lower) / step))
    edges = [lower + k * step for k in range(bin_count + 1)]

    # The largest gap sitting exactly on the last edge needs one more bin.
    last_edge = edges[-1]
    if last_edge <= largest:
        edges.append(last_edge + step)

    return edges
|
|
|
|
def format_bin_bound(minutes: float) -> str:
    """Render a bin edge compactly for use in a label.

    Values within 1e-9 of an integer are shown without decimals; anything
    else is shown with two decimals, minus trailing zeros and a dangling dot.
    """
    nearest_whole = round(minutes)
    if math.isclose(minutes, nearest_whole, abs_tol=1e-9):
        return str(int(nearest_whole))

    two_decimals = f"{minutes:.2f}"
    return two_decimals.rstrip("0").rstrip(".")
|
|
|
|
def make_bin_labels(
    edges: list[float], open_tail_from: float | None = None
) -> list[str]:
    """Return one label per bin defined by consecutive ``edges``.

    Bins are labelled ``[left, right) min``. When ``open_tail_from`` is given
    and the final bin starts at or beyond it, that final bin is labelled
    ``>= left min`` instead.
    """
    final_bin = len(edges) - 2

    def label_for(position: int, lower: float, upper: float) -> str:
        is_open_tail = (
            open_tail_from is not None
            and position == final_bin
            and lower >= open_tail_from
        )
        if is_open_tail:
            return f">= {format_bin_bound(lower)} min"
        return f"[{format_bin_bound(lower)}, {format_bin_bound(upper)}) min"

    return [
        label_for(position, lower, upper)
        for position, (lower, upper) in enumerate(zip(edges, edges[1:]))
    ]
|
|
|
|
def format_minutes_tick(value: float, _pos: float) -> str:
    """Format an axis tick given in minutes as minutes, hours, or days.

    Values under 60 are rounded to whole minutes (``m``). Values under 1440
    are shown in hours (``h``) and everything else in days (``d``); these are
    whole when within 1e-9 of an integer, otherwise shown to one decimal. The
    second argument is the tick position and is ignored.
    """
    if value < 60:
        return f"{int(round(value))}m"

    if value < 1440:
        scaled, unit = value / 60.0, "h"
    else:
        scaled, unit = value / 1440.0, "d"

    nearest_whole = round(scaled)
    if math.isclose(scaled, nearest_whole, abs_tol=1e-9):
        return f"{int(nearest_whole)}{unit}"
    return f"{scaled:.1f}{unit}"
|
|
|
|
def summarize_binned_distribution(
    gaps_minutes: pd.Series,
    keep_nonpositive: bool,
    bin_time_minutes: float | None = None,
) -> pd.DataFrame:
    """Bin the valid gaps and tabulate counts, shares, and densities.

    Args:
        gaps_minutes: Gaps in minutes; missing values are ignored.
        keep_nonpositive: Whether zero/negative gaps are kept.
        bin_time_minutes: Fixed bin width in minutes. When ``None`` the
            default non-uniform edges are used, with a tail bin if needed.

    Returns:
        One row per bin with columns ``bin``, ``bin_left_min``,
        ``bin_right_min``, ``bin_width_min``, ``count``, ``probability``,
        ``percentage`` and ``density_per_min``.

    Raises:
        ValueError: If no gaps remain after filtering.
    """
    usable = filter_valid_gaps(gaps_minutes, keep_nonpositive)
    if usable.empty:
        raise ValueError("No valid time gaps found after filtering.")

    if bin_time_minutes is None:
        base_edges = default_bin_edges_minutes()
        edges = build_bin_edges_minutes(usable)
        # Only label the last bin as open-ended when a tail edge was added.
        tail_was_added = len(edges) > len(base_edges)
        open_tail_from = base_edges[-1] if tail_was_added else None
    else:
        edges = build_fixed_width_bin_edges_minutes(usable, bin_time_minutes)
        open_tail_from = None

    bin_labels = make_bin_labels(edges, open_tail_from=open_tail_from)
    assigned = pd.cut(
        usable,
        bins=edges,
        labels=bin_labels,
        right=False,
        include_lowest=True,
    )

    per_bin = assigned.value_counts(sort=False)
    share = (per_bin / per_bin.sum()).astype(float)
    share_values = share.to_numpy(dtype=float)

    left_edges = pd.Series(edges[:-1], dtype=float)
    right_edges = pd.Series(edges[1:], dtype=float)
    widths = right_edges - left_edges
    # Dividing by width makes unequal bins comparable as a density.
    density = share_values / widths.to_numpy(dtype=float)

    columns = {
        "bin": per_bin.index.astype(str),
        "bin_left_min": left_edges.to_numpy(),
        "bin_right_min": right_edges.to_numpy(),
        "bin_width_min": widths.to_numpy(),
        "count": per_bin.values,
        "probability": share_values,
        "percentage": share_values * 100.0,
        "density_per_min": density,
    }
    return pd.DataFrame(columns)
|
|
|
|
def filter_valid_gaps(gaps_minutes: pd.Series, keep_nonpositive: bool) -> pd.Series:
    """Return the gaps usable for analysis as a new series.

    Missing values are always removed. Zero and negative gaps are removed
    too unless ``keep_nonpositive`` is true. The original index labels of the
    surviving rows are kept.
    """
    present = gaps_minutes.dropna()
    if keep_nonpositive:
        return present.copy()
    return present[present > 0]
|
|
|
|
def cumulative_probability_at_minutes(
    gaps_minutes: pd.Series,
    threshold_minutes: float,
    keep_nonpositive: bool,
) -> float:
    """Return the fraction of valid gaps that are at most ``threshold_minutes``.

    Raises:
        ValueError: If no gaps remain after filtering.
    """
    usable = filter_valid_gaps(gaps_minutes, keep_nonpositive)
    if usable.empty:
        raise ValueError("No valid time gaps found after filtering.")

    within_threshold = usable <= threshold_minutes
    return float(within_threshold.mean())
|
|
|
|
def select_student_by_index(
    df: pd.DataFrame,
    student_idx: int,
) -> tuple[pd.DataFrame, str, int]:
    """Select one student's interactions by 0-based index over unique IDs.

    IDs are indexed in order of first appearance in ``df``; pass a frame
    already sorted by ``user_id`` if a sorted index is wanted.

    Args:
        df: Interactions containing a ``user_id`` column.
        student_idx: 0-based position within the unique ``user_id`` values.

    Returns:
        A copy of the selected student's rows, that student's id as a
        string, and the number of unique students in ``df``.

    Raises:
        ValueError: If ``df`` has no students or ``student_idx`` is out of
            range.
    """
    student_ids = df["user_id"].drop_duplicates().tolist()
    total_students = len(student_ids)

    if not student_ids:
        raise ValueError("No students found in loaded interactions.")
    if not 0 <= student_idx < total_students:
        raise ValueError(
            f"--student-idx must be in [0, {total_students - 1}], got {student_idx}."
        )

    # Compare with the raw value, not its string form: stringifying first
    # matches nothing when the column does not hold strings.
    raw_student_id = student_ids[student_idx]
    selected_df = df[df["user_id"] == raw_student_id].copy()
    return selected_df, str(raw_student_id), total_students
|
|
|
|
def append_student_id_to_output_path(path: Path, student_id: str) -> Path:
    """Return ``path`` with ``_<student id>`` inserted before the extension.

    Any character of the id that is not alphanumeric, ``-`` or ``_`` is
    replaced by ``_`` so the result is safe to use as a filename.
    """

    def sanitize(char: str) -> str:
        return char if char.isalnum() or char in ("-", "_") else "_"

    id_tag = "".join(map(sanitize, student_id))
    renamed = f"{path.stem}_{id_tag}{path.suffix}"
    return path.with_name(renamed)
|
|
|
|
def plot_distribution(
    summary_df: pd.DataFrame,
    output_path: Path,
    log_y: bool = False,
    plot_upper_limit_minutes: float | None = None,
    cdf_marker_minutes: float = CDF_MARKER_MINUTES,
    cdf_at_marker: float | None = None,
    student_idx: int | None = None,
) -> None:
    """Draw the binned gap distribution as a density histogram and save it.

    Args:
        summary_df: Per-bin table providing ``bin_left_min``,
            ``bin_width_min``, ``density_per_min`` and ``percentage``.
        output_path: Image file to write; parent directories are created.
        log_y: Use a logarithmic y-axis.
        plot_upper_limit_minutes: Optional cap on the right x-axis limit.
        cdf_marker_minutes: x position of the dashed marker line.
        cdf_at_marker: Cumulative probability shown in the marker's legend
            entry, when given.
        student_idx: When given, it is added to the title and a narrower
            figure is used.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)

    plt.style.use("seaborn-v0_8-whitegrid")

    single_student = student_idx is not None
    fig, ax = plt.subplots(figsize=(10, 5) if single_student else (20, 5))

    bin_starts = summary_df["bin_left_min"].to_numpy(dtype=float)
    bin_widths = summary_df["bin_width_min"].to_numpy(dtype=float)
    densities = summary_df["density_per_min"].to_numpy(dtype=float)

    # Bars start at each bin's left edge and span its true width.
    bars = ax.bar(
        bin_starts,
        densities,
        width=bin_widths,
        align="edge",
        color="#4C78A8",
    )

    title = "Distribution of Time Gaps Between Consecutive Attempts"
    if single_student:
        title = f"{title} (student_idx={student_idx})"
    ax.set_title(title)
    ax.set_xlabel("Time Gap")
    ax.set_ylabel("Probability Density (1/min)")

    x_low = float(bin_starts.min())
    x_high = float((bin_starts + bin_widths).max())
    if plot_upper_limit_minutes is not None:
        x_high = min(x_high, float(plot_upper_limit_minutes))
    ax.set_xlim(x_low, x_high)

    ax.xaxis.set_major_locator(MaxNLocator(nbins=9))
    ax.xaxis.set_major_formatter(FuncFormatter(format_minutes_tick))
    ax.grid(axis="y", alpha=0.25, linewidth=0.8)
    for side in ("top", "right"):
        ax.spines[side].set_visible(False)

    if log_y:
        ax.set_yscale("log")

    marker_label = f"CDF <= {int(cdf_marker_minutes)} min"
    if cdf_at_marker is not None:
        marker_label = f"{marker_label}: {cdf_at_marker * 100:.1f}%"
    ax.axvline(
        cdf_marker_minutes,
        color="#E45756",
        linestyle="--",
        linewidth=1.6,
        label=marker_label,
    )
    ax.legend(loc="upper right", frameon=False, fontsize=9)

    # Label only bars holding at least 1% of the gaps and having a
    # positive height.
    for bar, pct in zip(bars, summary_df["percentage"]):
        bar_top = bar.get_height()
        if pct < 1.0 or bar_top <= 0:
            continue
        bar_center = bar.get_x() + bar.get_width() / 2.0
        ax.annotate(
            f"{pct:.1f}%",
            xy=(bar_center, bar_top),
            xytext=(0, 3),
            textcoords="offset points",
            ha="center",
            va="bottom",
            fontsize=8,
        )

    plt.tight_layout()
    fig.savefig(output_path, dpi=400, bbox_inches="tight")
    plt.close(fig)
|
|
|
|
def main() -> None:
    """Run the full workflow: load, compute gaps, bin, save CSV and plot.

    Parses the command line, loads the interactions (optionally narrowed to
    one student), computes consecutive-attempt gaps, writes the per-bin
    summary CSV and the figure, and prints a short report.

    Raises:
        ValueError: If ``--plot-upper-limit-minutes`` is not positive or
            ``--bin-time`` is not a positive finite number.
        FileNotFoundError: If the interactions file does not exist.
    """
    args = parse_args()

    # Validate numeric options before any file I/O so a mistyped flag fails
    # immediately instead of after the data has been loaded and processed.
    if args.plot_upper_limit_minutes is not None and args.plot_upper_limit_minutes <= 0:
        raise ValueError("--plot-upper-limit-minutes must be a positive number.")
    # NaN/inf bin widths cannot produce usable edges, so reject them here.
    if args.bin_time is not None and not (
        math.isfinite(args.bin_time) and args.bin_time > 0
    ):
        raise ValueError("--bin-time must be a positive number.")

    if not args.interactions_path.exists():
        raise FileNotFoundError(
            f"Interactions file not found: {args.interactions_path}"
        )

    df = load_interactions(args.interactions_path, max_rows=args.max_rows)
    selected_student_id: str | None = None
    total_students = int(df["user_id"].nunique())
    if args.student_idx is not None:
        df, selected_student_id, total_students = select_student_by_index(
            df,
            args.student_idx,
        )

    # Per-student runs get the student id in both output filenames.
    output_plot_path = args.output_plot
    output_counts_path = args.output_counts
    if selected_student_id is not None:
        output_plot_path = append_student_id_to_output_path(
            output_plot_path,
            selected_student_id,
        )
        output_counts_path = append_student_id_to_output_path(
            output_counts_path,
            selected_student_id,
        )

    gaps_minutes = compute_time_gaps_minutes(df)

    summary = summarize_binned_distribution(
        gaps_minutes,
        keep_nonpositive=args.keep_nonpositive_gaps,
        bin_time_minutes=args.bin_time,
    )
    output_counts_path.parent.mkdir(parents=True, exist_ok=True)
    summary.to_csv(output_counts_path, index=False)

    cdf_at_marker = cumulative_probability_at_minutes(
        gaps_minutes=gaps_minutes,
        threshold_minutes=CDF_MARKER_MINUTES,
        keep_nonpositive=args.keep_nonpositive_gaps,
    )

    plot_distribution(
        summary,
        output_plot_path,
        log_y=args.log_y,
        plot_upper_limit_minutes=args.plot_upper_limit_minutes,
        cdf_marker_minutes=CDF_MARKER_MINUTES,
        cdf_at_marker=cdf_at_marker,
        student_idx=args.student_idx,
    )

    total_pairs = int(summary["count"].sum())
    print("Done.")
    print(f"Interactions loaded: {len(df):,}")
    print(f"Students in loaded data: {total_students:,}")
    if selected_student_id is not None:
        print(f"Selected student idx: {args.student_idx}")
        print(f"Selected student id: {selected_student_id}")
    print(f"Consecutive attempt pairs used: {total_pairs:,}")
    if args.bin_time is not None:
        print(f"Bin width (min): {args.bin_time}")
    print(
        f"Cumulative P(gap <= {int(CDF_MARKER_MINUTES)} min): "
        f"{cdf_at_marker * 100:.2f}%"
    )
    print(f"Saved plot: {output_plot_path}")
    print(f"Saved bin counts: {output_counts_path}")
|
|
|
|
# Run the command-line workflow only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|
|