#!/usr/bin/env python3
"""Plot distribution of per-student total time from first to last attempt.

This script reads FoundationalASSIST `Interactions.csv`, groups interactions by
student (`user_id`), computes each student's total time span from first to last
recorded attempt (`end_time`), discretizes these totals into bins, and plots the
resulting distribution.
"""

from __future__ import annotations

import argparse
import math
from pathlib import Path

import matplotlib.pyplot as plt
import pandas as pd
from matplotlib.ticker import FuncFormatter, MaxNLocator

DEFAULT_INTERACTIONS_PATH = (
    Path(__file__).resolve().parent.parent / "Data" / "Interactions.csv"
)
DEFAULT_OUTPUT_PLOT = (
    Path(__file__).resolve().parent.parent
    / "Results"
    / "total_time_distribution.png"
)
DEFAULT_OUTPUT_COUNTS = (
    Path(__file__).resolve().parent.parent
    / "Results"
    / "total_time_distribution_counts.csv"
)
# 1,051,200 minutes = 730 days * 1440 minutes/day.
DEFAULT_CDF_MARKER_MINUTES = 1051200.0


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the script.

    Returns:
        The populated namespace. Range validation of the numeric options is
        done by ``main`` (and ``load_interactions`` for ``--max-rows``).
    """
    parser = argparse.ArgumentParser(
        description=(
            "Compute per-student total time (first to last attempt) from "
            "Interactions.csv and plot the binned distribution."
        )
    )
    parser.add_argument(
        "--interactions-path",
        type=Path,
        default=DEFAULT_INTERACTIONS_PATH,
        help="Path to Interactions.csv.",
    )
    parser.add_argument(
        "--output-plot",
        type=Path,
        default=DEFAULT_OUTPUT_PLOT,
        help="Path to save the output figure.",
    )
    parser.add_argument(
        "--output-counts",
        type=Path,
        default=DEFAULT_OUTPUT_COUNTS,
        help="Path to save bin counts as CSV.",
    )
    parser.add_argument(
        "--max-rows",
        type=int,
        default=None,
        help="Optional cap on rows after sorting (for quick debugging).",
    )
    parser.add_argument(
        "--keep-nonpositive-total-times",
        action="store_true",
        help=(
            "Keep zero/negative total times. By default, only strictly "
            "positive total times are used."
        ),
    )
    parser.add_argument(
        "--log-y",
        action="store_true",
        help="Use log scale on y-axis.",
    )
    parser.add_argument(
        "--plot-upper-limit-minutes",
        type=float,
        default=None,
        help=(
            "Optional upper limit for x-axis in minutes. "
            "If omitted, uses the full range implied by bins."
        ),
    )
    parser.add_argument(
        "--bin-time",
        type=float,
        default=None,
        help=(
            "Optional fixed bin width in minutes. "
            "For example, --bin-time 60 creates hourly bins."
        ),
    )
    parser.add_argument(
        "--cdf-marker-minutes",
        type=float,
        default=DEFAULT_CDF_MARKER_MINUTES,
        help="Threshold (in minutes) for plotting cumulative probability marker.",
    )
    return parser.parse_args()


def load_interactions(path: Path, max_rows: int | None = None) -> pd.DataFrame:
    """Load minimum interaction fields required for timing analysis.

    Only the ``id``, ``user_id`` and ``end_time`` columns are read. Rows whose
    ``user_id`` is missing or whose ``end_time`` cannot be parsed are dropped,
    and the result is sorted by ``(user_id, end_time, id)``.

    Args:
        path: Location of the interactions CSV.
        max_rows: Optional cap applied *after* sorting, so the kept rows are
            the first ``max_rows`` rows in ``(user_id, end_time, id)`` order.

    Returns:
        The cleaned, sorted interactions frame.

    Raises:
        ValueError: If ``max_rows`` is given and is not strictly positive.
    """
    # Reject a bad cap before paying for the CSV read, parse and sort.
    if max_rows is not None and max_rows <= 0:
        raise ValueError("--max-rows must be a positive integer.")

    usecols = ["id", "user_id", "end_time"]
    df = pd.read_csv(path, usecols=usecols, low_memory=False)

    # Unparseable ids become -1; the id is only used as a sort tie-breaker.
    df["id"] = pd.to_numeric(df["id"], errors="coerce")
    df["id"] = df["id"].fillna(-1).astype(int)
    df["user_id"] = df["user_id"].astype("string")
    # Unparseable timestamps become NaT and are dropped just below.
    df["end_time"] = pd.to_datetime(df["end_time"], errors="coerce", utc=True)

    df = df.dropna(subset=["user_id", "end_time"]).copy()
    # mergesort is stable, so fully tied rows keep their file order.
    df = df.sort_values(["user_id", "end_time", "id"], kind="mergesort")

    if max_rows is not None:
        df = df.head(max_rows).copy()
    return df


def compute_student_total_times_minutes(df: pd.DataFrame) -> pd.Series:
    """Compute per-student total time span from first to last attempt.

    Args:
        df: Frame with ``user_id`` and datetime ``end_time`` columns.

    Returns:
        A Series named ``total_time_minutes``, indexed by ``user_id``, holding
        ``max(end_time) - min(end_time)`` in minutes for each student.
    """
    grouped = df.groupby("user_id", sort=False)["end_time"]
    first_times = grouped.min()
    last_times = grouped.max()
    total_minutes = (last_times - first_times).dt.total_seconds() / 60.0
    total_minutes.name = "total_time_minutes"
    return total_minutes


def default_bin_edges_minutes() -> list[float]:
    """Base edges for total-time distribution in minutes."""
    return [
        0.0,
        10.0,
        30.0,
        60.0,  # 1 hour
        180.0,
        360.0,
        720.0,
        1440.0,  # 1 day
        2880.0,
        4320.0,
        10080.0,  # 7 days
        20160.0,
        43200.0,  # 30 days
        100800.0,  # 70 days
    ]


def build_bin_edges_minutes(valid: pd.Series) -> list[float]:
    """Build finite plotting edges for proportional-width histogram bars.

    Starts from ``default_bin_edges_minutes()`` and, when the data reaches the
    last base edge, appends one finite tail edge so that every value falls
    into a bin.

    Args:
        valid: Non-empty Series of total times in minutes.

    Returns:
        The base edges, optionally followed by one tail edge.
    """
    edges = default_bin_edges_minutes()
    base_tail_start = edges[-1]
    max_total = float(valid.max())
    # Bins are right-open, so a value exactly equal to the last base edge is
    # not covered by the base bins either; hence ``>=`` rather than ``>``.
    if max_total >= base_tail_start:
        # 5% head-room keeps the maximum strictly inside the tail bin; the
        # 60-minute floor guarantees a non-degenerate width.
        tail_edge = max(base_tail_start + 60.0, max_total * 1.05)
        edges.append(tail_edge)
    return edges


def build_fixed_width_bin_edges_minutes(
    valid: pd.Series, bin_time_minutes: float
) -> list[float]:
    """Build fixed-width edges from min/max observed total times.

    Args:
        valid: Non-empty Series of total times in minutes.
        bin_time_minutes: Bin width in minutes; expected to be positive
            (``main`` validates this).

    Returns:
        Equally spaced edges, aligned to multiples of ``bin_time_minutes``,
        that cover every value of ``valid`` with right-open bins.
    """
    min_total = float(valid.min())
    max_total = float(valid.max())

    start = bin_time_minutes * math.floor(min_total / bin_time_minutes)
    end = bin_time_minutes * math.ceil(max_total / bin_time_minutes)
    # Normalise a floating-point near-zero (or -0.0) to exactly 0.0.
    if math.isclose(start, 0.0, abs_tol=1e-12):
        start = 0.0
    # All values sit on one multiple of the width: still emit one bin.
    if math.isclose(end, start, abs_tol=1e-12):
        end = start + bin_time_minutes

    n_bins = int(round((end - start) / bin_time_minutes))
    edges = [start + i * bin_time_minutes for i in range(n_bins + 1)]
    # Right-open bins exclude a maximum that lands on the last edge, so add
    # one more bin to hold it.
    if edges[-1] <= max_total:
        edges.append(edges[-1] + bin_time_minutes)
    return edges


def format_bin_bound(minutes: float) -> str:
    """Format a bin bound: integers without decimals, else up to 2 decimals."""
    if math.isclose(minutes, round(minutes), abs_tol=1e-9):
        return str(int(round(minutes)))
    return f"{minutes:.2f}".rstrip("0").rstrip(".")


def make_bin_labels(
    edges: list[float], open_tail_from: float | None = None
) -> list[str]:
    """Create one human-readable label per bin.

    Args:
        edges: Bin edges; ``len(edges) - 1`` labels are produced.
        open_tail_from: If given, the last bin is labelled as an open tail
            (``">= X min"``) when its left edge is at or beyond this value.

    Returns:
        Labels of the form ``"[left, right) min"`` (or the open-tail form).
    """
    labels: list[str] = []
    last_idx = len(edges) - 2
    for idx, (left, right) in enumerate(zip(edges[:-1], edges[1:])):
        is_open_tail = (
            open_tail_from is not None
            and idx == last_idx
            and left >= open_tail_from
        )
        if is_open_tail:
            labels.append(f">= {format_bin_bound(left)} min")
        else:
            labels.append(
                f"[{format_bin_bound(left)}, {format_bin_bound(right)}) min"
            )
    return labels


def format_minutes_tick(value: float, _pos: float) -> str:
    """Tick formatter: minutes below 1 hour, hours below 1 day, else days."""
    if value < 60:
        return f"{value:.0f}m"
    if value < 1440:
        return f"{value / 60:.0f}h"
    return f"{value / 1440:.0f}d"


def filter_valid_total_times(
    total_times_minutes: pd.Series, keep_nonpositive: bool
) -> pd.Series:
    """Drop missing totals and, unless requested otherwise, non-positive ones.

    Args:
        total_times_minutes: Per-student total times in minutes.
        keep_nonpositive: When False, only strictly positive values are kept.

    Returns:
        A filtered copy of the input Series.
    """
    valid = total_times_minutes.dropna().copy()
    if not keep_nonpositive:
        valid = valid[valid > 0]
    return valid


def summarize_binned_distribution(
    total_times_minutes: pd.Series,
    keep_nonpositive: bool,
    bin_time_minutes: float | None = None,
) -> pd.DataFrame:
    """Bin the per-student totals and summarise each bin.

    Args:
        total_times_minutes: Per-student total times in minutes.
        keep_nonpositive: Forwarded to ``filter_valid_total_times``.
        bin_time_minutes: If given, use fixed-width bins of this width;
            otherwise use the default variable-width edges.

    Returns:
        One row per bin with columns ``bin``, ``bin_left_min``,
        ``bin_right_min``, ``bin_width_min``, ``count``, ``probability``,
        ``percentage`` and ``density_per_min`` (probability / bin width).

    Raises:
        ValueError: If no values remain after filtering.
    """
    valid = filter_valid_total_times(total_times_minutes, keep_nonpositive)
    if valid.empty:
        raise ValueError("No valid total times found after filtering.")

    if bin_time_minutes is not None:
        edges = build_fixed_width_bin_edges_minutes(valid, bin_time_minutes)
        open_tail_from = None
    else:
        base_edges = default_bin_edges_minutes()
        edges = build_bin_edges_minutes(valid)
        # A tail edge was appended iff the edge list grew; only then is the
        # last bin labelled as an open tail.
        open_tail_from = base_edges[-1] if len(edges) > len(base_edges) else None

    labels = make_bin_labels(edges, open_tail_from=open_tail_from)
    binned = pd.cut(
        valid, bins=edges, labels=labels, right=False, include_lowest=True
    )
    # sort=False keeps the bins in edge order rather than by frequency.
    counts = binned.value_counts(sort=False)
    probabilities = (counts / counts.sum()).astype(float)

    bin_left = pd.Series(edges[:-1], dtype=float)
    bin_right = pd.Series(edges[1:], dtype=float)
    bin_width = bin_right - bin_left
    probabilities_np = probabilities.to_numpy(dtype=float)
    density_per_min = probabilities_np / bin_width.to_numpy(dtype=float)

    summary = pd.DataFrame(
        {
            "bin": counts.index.astype(str),
            "bin_left_min": bin_left.to_numpy(),
            "bin_right_min": bin_right.to_numpy(),
            "bin_width_min": bin_width.to_numpy(),
            "count": counts.values,
            "probability": probabilities_np,
            "percentage": probabilities_np * 100.0,
            "density_per_min": density_per_min,
        }
    )
    return summary


def cumulative_probability_at_minutes(
    total_times_minutes: pd.Series,
    threshold_minutes: float,
    keep_nonpositive: bool,
) -> float:
    """Return the fraction of valid totals that are ``<= threshold_minutes``.

    Args:
        total_times_minutes: Per-student total times in minutes.
        threshold_minutes: Inclusive threshold in minutes.
        keep_nonpositive: Forwarded to ``filter_valid_total_times``.

    Raises:
        ValueError: If no values remain after filtering.
    """
    valid = filter_valid_total_times(total_times_minutes, keep_nonpositive)
    if valid.empty:
        raise ValueError("No valid total times found after filtering.")
    return float((valid <= threshold_minutes).mean())


def plot_distribution(
    summary_df: pd.DataFrame,
    output_path: Path,
    log_y: bool = False,
    plot_upper_limit_minutes: float | None = None,
    cdf_marker_minutes: float = DEFAULT_CDF_MARKER_MINUTES,
    cdf_at_marker: float | None = None,
) -> None:
    """Create and save a publication-ready total-time distribution histogram.

    Bars span each bin's true width and their height is the probability
    density, so bar area is the bin probability.

    Args:
        summary_df: Output of ``summarize_binned_distribution``.
        output_path: Destination image file; its parent directory is created
            if missing.
        log_y: Use a logarithmic y-axis.
        plot_upper_limit_minutes: Optional cap on the right x-limit.
        cdf_marker_minutes: x-position of the dashed vertical marker.
        cdf_at_marker: Cumulative probability at the marker; when given it is
            appended to the marker's legend label as a percentage.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    plt.style.use("seaborn-v0_8-whitegrid")

    fig, ax = plt.subplots(figsize=(20, 5))
    left = summary_df["bin_left_min"].to_numpy(dtype=float)
    width = summary_df["bin_width_min"].to_numpy(dtype=float)
    height = summary_df["density_per_min"].to_numpy(dtype=float)

    bars = ax.bar(
        left,
        height,
        width=width,
        align="edge",
        color="#4C78A8",
    )

    ax.set_title("Distribution of Student Total Time (First to Last Attempt)")
    ax.set_xlabel("Total Time Per Student")
    ax.set_ylabel("Probability Density (1/min)")

    x_min = float(left.min())
    x_max = float((left + width).max())
    if plot_upper_limit_minutes is not None:
        # The limit can only shrink the view, never extend it past the bins.
        x_max = min(x_max, float(plot_upper_limit_minutes))
    ax.set_xlim(x_min, x_max)

    ax.xaxis.set_major_locator(MaxNLocator(nbins=9))
    ax.xaxis.set_major_formatter(FuncFormatter(format_minutes_tick))
    ax.grid(axis="y", alpha=0.25, linewidth=0.8)
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)

    if log_y:
        ax.set_yscale("log")

    marker_label = f"CDF <= {format_bin_bound(cdf_marker_minutes)} min"
    if cdf_at_marker is not None:
        marker_label = f"{marker_label}: {cdf_at_marker * 100:.1f}%"
    ax.axvline(
        cdf_marker_minutes,
        color="#E45756",
        linestyle="--",
        linewidth=1.6,
        label=marker_label,
    )
    ax.legend(loc="upper right", frameon=False, fontsize=9)

    # Skip dense labeling when there are many bins to keep figure readable.
    annotate_bars = len(summary_df) <= 40
    if annotate_bars:
        for bar, pct in zip(bars, summary_df["percentage"]):
            # Bins below 1% are left unlabelled to reduce clutter.
            if pct < 1.0:
                continue
            h = bar.get_height()
            if h <= 0:
                continue
            ax.annotate(
                f"{pct:.1f}%",
                xy=(bar.get_x() + bar.get_width() / 2.0, h),
                xytext=(0, 3),
                textcoords="offset points",
                ha="center",
                va="bottom",
                fontsize=8,
            )

    plt.tight_layout()
    fig.savefig(output_path, dpi=400, bbox_inches="tight")
    plt.close(fig)


def main() -> None:
    """Run the full pipeline: load, summarise, save the CSV, plot, report.

    Raises:
        FileNotFoundError: If the interactions file does not exist.
        ValueError: If a numeric option is out of range or no valid total
            times remain after filtering.
    """
    args = parse_args()

    if not args.interactions_path.exists():
        raise FileNotFoundError(
            f"Interactions file not found: {args.interactions_path}"
        )
    if (
        args.plot_upper_limit_minutes is not None
        and args.plot_upper_limit_minutes <= 0
    ):
        raise ValueError("--plot-upper-limit-minutes must be a positive number.")
    if args.bin_time is not None and args.bin_time <= 0:
        raise ValueError("--bin-time must be a positive number.")
    if args.cdf_marker_minutes <= 0:
        raise ValueError("--cdf-marker-minutes must be a positive number.")

    df = load_interactions(args.interactions_path, max_rows=args.max_rows)
    total_times_minutes = compute_student_total_times_minutes(df)

    summary = summarize_binned_distribution(
        total_times_minutes,
        keep_nonpositive=args.keep_nonpositive_total_times,
        bin_time_minutes=args.bin_time,
    )
    # to_csv does not create missing directories, and this write happens
    # before plot_distribution creates the plot's parent directory (which may
    # be a different directory anyway).
    args.output_counts.parent.mkdir(parents=True, exist_ok=True)
    summary.to_csv(args.output_counts, index=False)

    cdf_at_marker = cumulative_probability_at_minutes(
        total_times_minutes=total_times_minutes,
        threshold_minutes=args.cdf_marker_minutes,
        keep_nonpositive=args.keep_nonpositive_total_times,
    )
    plot_distribution(
        summary,
        args.output_plot,
        log_y=args.log_y,
        plot_upper_limit_minutes=args.plot_upper_limit_minutes,
        cdf_marker_minutes=args.cdf_marker_minutes,
        cdf_at_marker=cdf_at_marker,
    )

    total_students = int(df["user_id"].nunique())
    students_used = int(
        len(
            filter_valid_total_times(
                total_times_minutes,
                keep_nonpositive=args.keep_nonpositive_total_times,
            )
        )
    )

    print("Done.")
    print(f"Interactions loaded: {len(df):,}")
    print(f"Students observed: {total_students:,}")
    print(f"Students used in distribution: {students_used:,}")
    if args.bin_time is not None:
        print(f"Bin width (min): {args.bin_time}")
    print(
        f"Cumulative P(total_time <= {format_bin_bound(args.cdf_marker_minutes)} min): "
        f"{cdf_at_marker * 100:.2f}%"
    )
    print(f"Saved plot: {args.output_plot}")
    print(f"Saved bin counts: {args.output_counts}")


if __name__ == "__main__":
    main()