#!/usr/bin/env python3
"""Plot the distribution of time gaps between consecutive student attempts.

This script reads FoundationalASSIST `Interactions.csv`, groups interactions by
student (`user_id`), computes the time difference between each pair of
consecutive attempts (`end_time`), discretizes these differences into bins,
and plots the resulting distribution.
"""

from __future__ import annotations

import argparse
import math
from pathlib import Path

import pandas as pd

# NOTE: matplotlib is imported inside plot_distribution() so that the data
# helpers in this module stay importable without a plotting stack.

DEFAULT_INTERACTIONS_PATH = (
    Path(__file__).resolve().parent.parent / "Data" / "Interactions.csv"
)
DEFAULT_OUTPUT_PLOT = (
    Path(__file__).resolve().parent.parent
    / "Results"
    / "time_gap_distribution.png"
)
DEFAULT_OUTPUT_COUNTS = (
    Path(__file__).resolve().parent.parent
    / "Results"
    / "time_gap_distribution_counts.csv"
)
# Position (in minutes) of the vertical CDF marker drawn on the plot.
CDF_MARKER_MINUTES = 60.0


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the script.

    Returns:
        The parsed namespace. Numeric options are range-checked in `main`,
        not here.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Compute per-student consecutive-attempt time gaps from "
            "Interactions.csv and plot their binned distribution."
        )
    )
    parser.add_argument(
        "--interactions-path",
        type=Path,
        default=DEFAULT_INTERACTIONS_PATH,
        help="Path to Interactions.csv.",
    )
    parser.add_argument(
        "--output-plot",
        type=Path,
        default=DEFAULT_OUTPUT_PLOT,
        help="Path to save the output figure.",
    )
    parser.add_argument(
        "--output-counts",
        type=Path,
        default=DEFAULT_OUTPUT_COUNTS,
        help="Path to save bin counts as CSV.",
    )
    parser.add_argument(
        "--max-rows",
        type=int,
        default=None,
        help="Optional cap on rows after sorting (for quick debugging).",
    )
    parser.add_argument(
        "--keep-nonpositive-gaps",
        action="store_true",
        help=(
            "Keep zero/negative gaps. By default, only strictly positive "
            "gaps are used."
        ),
    )
    parser.add_argument(
        "--log-y",
        action="store_true",
        help="Use log scale on y-axis.",
    )
    parser.add_argument(
        "--plot-upper-limit-minutes",
        type=float,
        default=None,
        help=(
            "Optional upper limit for x-axis in minutes. "
            "If omitted, uses the full range implied by bins."
        ),
    )
    parser.add_argument(
        "--bin-time",
        type=float,
        default=None,
        help=(
            "Optional fixed bin width in minutes. "
            "For example, --bin-time 10 creates bins [0,10), [10,20), ..."
        ),
    )
    parser.add_argument(
        "--student-idx",
        type=int,
        default=None,
        help=(
            "Optional 0-based index of student to plot. Index is based on "
            "sorted unique user_id values in the loaded interactions."
        ),
    )
    return parser.parse_args()


def load_interactions(path: Path, max_rows: int | None = None) -> pd.DataFrame:
    """Load the minimum columns required for time-gap analysis.

    Only `id`, `user_id` and `end_time` are read. Rows whose `user_id` or
    `end_time` is missing or unparseable are dropped, and the result is
    sorted by (`user_id`, `end_time`, `id`) with a stable sort.

    Args:
        path: Location of the interactions CSV.
        max_rows: Optional cap applied after sorting; must be positive.

    Returns:
        The cleaned, sorted interactions.

    Raises:
        ValueError: If `max_rows` is given but not positive.
    """
    # Validate before touching the file so a bad flag fails immediately
    # instead of after reading and sorting the whole CSV.
    if max_rows is not None and max_rows <= 0:
        raise ValueError("--max-rows must be a positive integer.")

    usecols = ["id", "user_id", "end_time"]
    df = pd.read_csv(path, usecols=usecols, low_memory=False)

    # Unparseable ids become -1 so they remain usable as a sort tie-breaker.
    df["id"] = pd.to_numeric(df["id"], errors="coerce").fillna(-1).astype(int)
    df["user_id"] = df["user_id"].astype("string")
    df["end_time"] = pd.to_datetime(df["end_time"], errors="coerce", utc=True)

    df = df.dropna(subset=["user_id", "end_time"]).copy()
    df = df.sort_values(["user_id", "end_time", "id"], kind="mergesort")

    if max_rows is not None:
        df = df.head(max_rows).copy()
    return df


def compute_time_gaps_minutes(df: pd.DataFrame) -> pd.Series:
    """Compute consecutive attempt gaps per student in minutes.

    Args:
        df: Interactions with `user_id` and datetime `end_time` columns,
            already ordered the way consecutive attempts should be paired.

    Returns:
        A series aligned with `df`; the first row of every student is NaN
        because it has no predecessor.
    """
    gaps_seconds = (
        df.groupby("user_id", sort=False)["end_time"].diff().dt.total_seconds()
    )
    return gaps_seconds / 60.0


def default_bin_edges_minutes() -> list[float]:
    """Base minute-scale edges (final open tail may be added from data max)."""
    return [
        0.0,
        1.0,
        5.0,
        10.0,
        30.0,
        60.0,  # 1 hour
        180.0,  # 3 hours
        720.0,  # 12 hours
        1440.0,  # 1 day
        4320.0,  # 3 days
        10080.0,  # 7 days
    ]


def build_bin_edges_minutes(valid: pd.Series) -> list[float]:
    """Build finite plotting edges so bar widths are proportional on x-axis.

    Args:
        valid: Non-empty series of gaps in minutes.

    Returns:
        The default edges, plus one extra terminal edge when the data
        reaches or exceeds the last default edge.
    """
    edges = default_bin_edges_minutes()
    base_tail_start = edges[-1]
    max_gap = float(valid.max())

    # `>=` rather than `>`: bins are half-open [left, right), so a gap that
    # sits exactly on the last default edge would otherwise fall outside
    # every bin and be silently dropped from the counts.
    if max_gap >= base_tail_start:
        # Add a finite terminal edge that fully contains the data tail.
        tail_edge = max(base_tail_start + 60.0, max_gap * 1.05)
        edges.append(tail_edge)
    return edges


def build_fixed_width_bin_edges_minutes(
    valid: pd.Series, bin_time_minutes: float
) -> list[float]:
    """Build fixed-width edges from min/max observed gaps.

    Args:
        valid: Non-empty series of gaps in minutes.
        bin_time_minutes: Positive bin width in minutes.

    Returns:
        Edges on multiples of `bin_time_minutes` whose half-open bins
        cover every value in `valid`.
    """
    min_gap = float(valid.min())
    max_gap = float(valid.max())

    start = bin_time_minutes * math.floor(min_gap / bin_time_minutes)
    end = bin_time_minutes * math.ceil(max_gap / bin_time_minutes)

    # Normalise -0.0 / tiny float noise to a clean zero.
    if math.isclose(start, 0.0, abs_tol=1e-12):
        start = 0.0
    # Guarantee at least one bin when all gaps share a single grid point.
    if math.isclose(end, start, abs_tol=1e-12):
        end = start + bin_time_minutes

    n_bins = int(round((end - start) / bin_time_minutes))
    edges = [start + i * bin_time_minutes for i in range(n_bins + 1)]

    # Bins are right-open, so a maximum lying on the last edge needs one
    # more bin to be counted.
    if edges[-1] <= max_gap:
        edges.append(edges[-1] + bin_time_minutes)
    return edges


def format_bin_bound(minutes: float) -> str:
    """Format a bin bound: integers without decimals, else up to 2 places."""
    if math.isclose(minutes, round(minutes), abs_tol=1e-9):
        return str(int(round(minutes)))
    return f"{minutes:.2f}".rstrip("0").rstrip(".")


def make_bin_labels(
    edges: list[float], open_tail_from: float | None = None
) -> list[str]:
    """Create one human-readable label per bin.

    Args:
        edges: Monotonic bin edges; `len(edges) - 1` labels are produced.
        open_tail_from: If given, the last bin is labelled as an open tail
            (">= X min") when its left edge is at or beyond this value.

    Returns:
        Labels in bin order.
    """
    labels: list[str] = []
    last_idx = len(edges) - 2
    for idx, (left, right) in enumerate(zip(edges[:-1], edges[1:])):
        is_open_tail = (
            open_tail_from is not None
            and idx == last_idx
            and left >= open_tail_from
        )
        if is_open_tail:
            labels.append(f">= {format_bin_bound(left)} min")
        else:
            labels.append(
                f"[{format_bin_bound(left)}, {format_bin_bound(right)}) min"
            )
    return labels


def format_minutes_tick(value: float, _pos: float) -> str:
    """Format an x-axis tick given in minutes as minutes, hours or days.

    Whole values print without decimals ("30m", "2h", "3d"); anything else
    prints with one decimal ("0.5m", "1.5h"). The second argument is the
    tick position supplied by the formatter and is unused.
    """
    if value < 60:
        unit_value, suffix = value, "m"
    elif value < 1440:
        unit_value, suffix = value / 60.0, "h"
    else:
        unit_value, suffix = value / 1440.0, "d"

    if math.isclose(unit_value, round(unit_value), abs_tol=1e-9):
        return f"{int(round(unit_value))}{suffix}"
    # Fractional ticks keep one decimal so that neighbouring ticks on a
    # zoomed-in axis do not collapse onto the same rounded label.
    return f"{unit_value:.1f}{suffix}"


def summarize_binned_distribution(
    gaps_minutes: pd.Series,
    keep_nonpositive: bool,
    bin_time_minutes: float | None = None,
) -> pd.DataFrame:
    """Bin the gaps and tabulate counts, probabilities and densities.

    Args:
        gaps_minutes: Gaps in minutes (NaN entries are ignored).
        keep_nonpositive: Whether zero/negative gaps are kept.
        bin_time_minutes: Fixed bin width; when None the default
            minute-scale edges are used.

    Returns:
        One row per bin with columns `bin`, `bin_left_min`,
        `bin_right_min`, `bin_width_min`, `count`, `probability`,
        `percentage` and `density_per_min`.

    Raises:
        ValueError: If no gap survives filtering.
    """
    valid = filter_valid_gaps(gaps_minutes, keep_nonpositive)
    if valid.empty:
        raise ValueError("No valid time gaps found after filtering.")

    if bin_time_minutes is not None:
        edges = build_fixed_width_bin_edges_minutes(valid, bin_time_minutes)
        open_tail_from = None
    else:
        base_edges = default_bin_edges_minutes()
        edges = build_bin_edges_minutes(valid)
        # Only label the last bin as an open tail if one was appended.
        open_tail_from = (
            base_edges[-1] if len(edges) > len(base_edges) else None
        )

    labels = make_bin_labels(edges, open_tail_from=open_tail_from)
    # right=False makes every bin left-closed / right-open: [left, right).
    binned = pd.cut(
        valid, bins=edges, labels=labels, right=False, include_lowest=True
    )
    counts = binned.value_counts(sort=False)
    probabilities = (counts / counts.sum()).astype(float)

    bin_left = pd.Series(edges[:-1], dtype=float)
    bin_right = pd.Series(edges[1:], dtype=float)
    bin_width = bin_right - bin_left
    probabilities_np = probabilities.to_numpy(dtype=float)
    # Dividing by the width turns bin mass into a density, which keeps
    # unequal-width bars comparable on a linear x-axis.
    density_per_min = probabilities_np / bin_width.to_numpy(dtype=float)

    summary = pd.DataFrame(
        {
            "bin": counts.index.astype(str),
            "bin_left_min": bin_left.to_numpy(),
            "bin_right_min": bin_right.to_numpy(),
            "bin_width_min": bin_width.to_numpy(),
            "count": counts.values,
            "probability": probabilities_np,
            "percentage": probabilities_np * 100.0,
            "density_per_min": density_per_min,
        }
    )
    return summary


def filter_valid_gaps(gaps_minutes: pd.Series, keep_nonpositive: bool) -> pd.Series:
    """Drop NaN gaps and, unless requested otherwise, non-positive ones."""
    valid = gaps_minutes.dropna().copy()
    if not keep_nonpositive:
        valid = valid[valid > 0]
    return valid


def cumulative_probability_at_minutes(
    gaps_minutes: pd.Series,
    threshold_minutes: float,
    keep_nonpositive: bool,
) -> float:
    """Return the empirical P(gap <= threshold) over the valid gaps.

    Raises:
        ValueError: If no gap survives filtering.
    """
    valid = filter_valid_gaps(gaps_minutes, keep_nonpositive)
    if valid.empty:
        raise ValueError("No valid time gaps found after filtering.")
    return float((valid <= threshold_minutes).mean())


def select_student_by_index(
    df: pd.DataFrame,
    student_idx: int,
) -> tuple[pd.DataFrame, str, int]:
    """Select one student's interactions by 0-based index over unique IDs.

    The index refers to the order in which distinct `user_id` values first
    appear in `df`.

    Returns:
        A tuple of (that student's rows, the student id as a string, the
        total number of distinct students in `df`).

    Raises:
        ValueError: If `df` has no students or the index is out of range.
    """
    student_ids = df["user_id"].drop_duplicates().tolist()
    total_students = len(student_ids)
    if total_students == 0:
        raise ValueError("No students found in loaded interactions.")
    if student_idx < 0 or student_idx >= total_students:
        raise ValueError(
            f"--student-idx must be in [0, {total_students - 1}], got {student_idx}."
        )

    selected_student_id = str(student_ids[student_idx])
    selected_df = df[df["user_id"] == selected_student_id].copy()
    return selected_df, selected_student_id, total_students


def append_student_id_to_output_path(path: Path, student_id: str) -> Path:
    """Append a safe student-id suffix to output filename.

    Every character of `student_id` other than alphanumerics, "-" and "_"
    is replaced by "_" before it is inserted ahead of the file suffix.
    """
    safe_id = "".join(
        ch if ch.isalnum() or ch in ("-", "_") else "_" for ch in student_id
    )
    return path.with_name(f"{path.stem}_{safe_id}{path.suffix}")


def plot_distribution(
    summary_df: pd.DataFrame,
    output_path: Path,
    log_y: bool = False,
    plot_upper_limit_minutes: float | None = None,
    cdf_marker_minutes: float = CDF_MARKER_MINUTES,
    cdf_at_marker: float | None = None,
    student_idx: int | None = None,
) -> None:
    """Create and save a publication-ready distribution histogram.

    Args:
        summary_df: Output of `summarize_binned_distribution`.
        output_path: Where the figure is written; parent directories are
            created as needed.
        log_y: Use a logarithmic y-axis.
        plot_upper_limit_minutes: Optional cap on the right x-axis limit.
        cdf_marker_minutes: x-position of the dashed vertical marker.
        cdf_at_marker: Cumulative probability at the marker; appended to
            the legend label when provided.
        student_idx: When set, a narrower figure is used and the index is
            appended to the title.
    """
    # Imported lazily: see the note next to the module imports.
    import matplotlib.pyplot as plt
    from matplotlib.ticker import FuncFormatter, MaxNLocator

    output_path.parent.mkdir(parents=True, exist_ok=True)

    left = summary_df["bin_left_min"].to_numpy(dtype=float)
    width = summary_df["bin_width_min"].to_numpy(dtype=float)
    density = summary_df["density_per_min"].to_numpy(dtype=float)

    figsize = (10, 5) if student_idx is not None else (20, 5)

    # style.context() scopes the style to this figure instead of changing
    # the process-wide matplotlib defaults.
    with plt.style.context("seaborn-v0_8-whitegrid"):
        fig, ax = plt.subplots(figsize=figsize)
        try:
            bars = ax.bar(
                left,
                density,
                width=width,
                align="edge",
                color="#4C78A8",
            )

            title = "Distribution of Time Gaps Between Consecutive Attempts"
            if student_idx is not None:
                title = f"{title} (student_idx={student_idx})"
            ax.set_title(title)
            ax.set_xlabel("Time Gap")
            ax.set_ylabel("Probability Density (1/min)")

            x_min = float(left.min())
            x_max = float((left + width).max())
            if plot_upper_limit_minutes is not None:
                x_max = min(x_max, float(plot_upper_limit_minutes))
            ax.set_xlim(x_min, x_max)
            ax.xaxis.set_major_locator(MaxNLocator(nbins=9))
            ax.xaxis.set_major_formatter(FuncFormatter(format_minutes_tick))
            ax.grid(axis="y", alpha=0.25, linewidth=0.8)
            ax.spines["top"].set_visible(False)
            ax.spines["right"].set_visible(False)

            if log_y:
                ax.set_yscale("log")

            marker_label = f"CDF <= {int(cdf_marker_minutes)} min"
            if cdf_at_marker is not None:
                marker_label = f"{marker_label}: {cdf_at_marker * 100:.1f}%"
            ax.axvline(
                cdf_marker_minutes,
                color="#E45756",
                linestyle="--",
                linewidth=1.6,
                label=marker_label,
            )
            ax.legend(loc="upper right", frameon=False, fontsize=9)

            # Label non-trivial bins for readability in papers.
            for bar, pct in zip(bars, summary_df["percentage"]):
                if pct < 1.0:
                    continue
                bar_height = bar.get_height()
                if bar_height <= 0:
                    continue
                ax.annotate(
                    f"{pct:.1f}%",
                    xy=(bar.get_x() + bar.get_width() / 2.0, bar_height),
                    xytext=(0, 3),
                    textcoords="offset points",
                    ha="center",
                    va="bottom",
                    fontsize=8,
                )

            fig.tight_layout()
            fig.savefig(output_path, dpi=400, bbox_inches="tight")
        finally:
            # Always release the figure, even if drawing or saving failed.
            plt.close(fig)


def main() -> None:
    """Run the full pipeline: load, compute gaps, bin, save CSV and plot.

    Raises:
        FileNotFoundError: If the interactions file does not exist.
        ValueError: If a numeric option is out of range, the student index
            is invalid, or no valid gaps remain after filtering.
    """
    args = parse_args()

    if not args.interactions_path.exists():
        raise FileNotFoundError(
            f"Interactions file not found: {args.interactions_path}"
        )

    # Check every numeric option before the (potentially slow) CSV load so
    # a typo on the command line is reported immediately.
    if args.max_rows is not None and args.max_rows <= 0:
        raise ValueError("--max-rows must be a positive integer.")
    if (
        args.plot_upper_limit_minutes is not None
        and args.plot_upper_limit_minutes <= 0
    ):
        raise ValueError("--plot-upper-limit-minutes must be a positive number.")
    if args.bin_time is not None and args.bin_time <= 0:
        raise ValueError("--bin-time must be a positive number.")

    df = load_interactions(args.interactions_path, max_rows=args.max_rows)
    selected_student_id: str | None = None
    total_students = int(df["user_id"].nunique())
    if args.student_idx is not None:
        df, selected_student_id, total_students = select_student_by_index(
            df,
            args.student_idx,
        )

    output_plot_path = args.output_plot
    output_counts_path = args.output_counts
    if selected_student_id is not None:
        # Per-student runs get their own files so they never overwrite the
        # all-students outputs.
        output_plot_path = append_student_id_to_output_path(
            output_plot_path,
            selected_student_id,
        )
        output_counts_path = append_student_id_to_output_path(
            output_counts_path,
            selected_student_id,
        )

    gaps_minutes = compute_time_gaps_minutes(df)

    summary = summarize_binned_distribution(
        gaps_minutes,
        keep_nonpositive=args.keep_nonpositive_gaps,
        bin_time_minutes=args.bin_time,
    )

    output_counts_path.parent.mkdir(parents=True, exist_ok=True)
    summary.to_csv(output_counts_path, index=False)
    cdf_at_marker = cumulative_probability_at_minutes(
        gaps_minutes=gaps_minutes,
        threshold_minutes=CDF_MARKER_MINUTES,
        keep_nonpositive=args.keep_nonpositive_gaps,
    )
    plot_distribution(
        summary,
        output_plot_path,
        log_y=args.log_y,
        plot_upper_limit_minutes=args.plot_upper_limit_minutes,
        cdf_marker_minutes=CDF_MARKER_MINUTES,
        cdf_at_marker=cdf_at_marker,
        student_idx=args.student_idx,
    )

    total_pairs = int(summary["count"].sum())
    print("Done.")
    print(f"Interactions loaded: {len(df):,}")
    print(f"Students in loaded data: {total_students:,}")
    if selected_student_id is not None:
        print(f"Selected student idx: {args.student_idx}")
        print(f"Selected student id: {selected_student_id}")
    print(f"Consecutive attempt pairs used: {total_pairs:,}")
    if args.bin_time is not None:
        print(f"Bin width (min): {args.bin_time}")
    print(
        f"Cumulative P(gap <= {int(CDF_MARKER_MINUTES)} min): "
        f"{cdf_at_marker * 100:.2f}%"
    )
    print(f"Saved plot: {output_plot_path}")
    print(f"Saved bin counts: {output_counts_path}")


if __name__ == "__main__":
    main()