# FoundationalASSIST / Code / plot_totaltime_distribution.py
# Uploaded via huggingface_hub by martinakaduc (commit 6256eb9, verified).
#!/usr/bin/env python3
"""Plot distribution of per-student total time from first to last attempt.
This script reads FoundationalASSIST `Interactions.csv`, groups interactions by
student (`user_id`), computes each student's total time span from first to last
recorded attempt (`end_time`), discretizes these totals into bins, and plots
the resulting distribution.
"""
from __future__ import annotations
import argparse
import math
from pathlib import Path
import matplotlib.pyplot as plt
import pandas as pd
from matplotlib.ticker import FuncFormatter, MaxNLocator
# Default input: ../Data/Interactions.csv, resolved relative to this script.
DEFAULT_INTERACTIONS_PATH = (
    Path(__file__).resolve().parent.parent / "Data" / "Interactions.csv"
)
# Default figure output under ../Results/.
DEFAULT_OUTPUT_PLOT = (
    Path(__file__).resolve().parent.parent / "Results" / "total_time_distribution.png"
)
# Default per-bin counts CSV under ../Results/.
DEFAULT_OUTPUT_COUNTS = (
    Path(__file__).resolve().parent.parent
    / "Results"
    / "total_time_distribution_counts.csv"
)
# 1,051,200 minutes = 730 days (~2 years): default CDF marker threshold.
DEFAULT_CDF_MARKER_MINUTES = 1051200.0
def parse_args() -> argparse.Namespace:
    """Build the command-line interface and return the parsed arguments."""
    parser = argparse.ArgumentParser(
        description=(
            "Compute per-student total time (first to last attempt) from "
            "Interactions.csv and plot the binned distribution."
        )
    )
    # Input / output locations.
    parser.add_argument(
        "--interactions-path", type=Path, default=DEFAULT_INTERACTIONS_PATH,
        help="Path to Interactions.csv.",
    )
    parser.add_argument(
        "--output-plot", type=Path, default=DEFAULT_OUTPUT_PLOT,
        help="Path to save the output figure.",
    )
    parser.add_argument(
        "--output-counts", type=Path, default=DEFAULT_OUTPUT_COUNTS,
        help="Path to save bin counts as CSV.",
    )
    # Data-handling knobs.
    parser.add_argument(
        "--max-rows", type=int, default=None,
        help="Optional cap on rows after sorting (for quick debugging).",
    )
    parser.add_argument(
        "--keep-nonpositive-total-times", action="store_true",
        help=(
            "Keep zero/negative total times. By default, only strictly "
            "positive total times are used."
        ),
    )
    # Presentation knobs.
    parser.add_argument(
        "--log-y", action="store_true",
        help="Use log scale on y-axis.",
    )
    parser.add_argument(
        "--plot-upper-limit-minutes", type=float, default=None,
        help=(
            "Optional upper limit for x-axis in minutes. "
            "If omitted, uses the full range implied by bins."
        ),
    )
    parser.add_argument(
        "--bin-time", type=float, default=None,
        help=(
            "Optional fixed bin width in minutes. "
            "For example, --bin-time 60 creates hourly bins."
        ),
    )
    parser.add_argument(
        "--cdf-marker-minutes", type=float, default=DEFAULT_CDF_MARKER_MINUTES,
        help="Threshold (in minutes) for plotting cumulative probability marker.",
    )
    return parser.parse_args()
def load_interactions(path: Path, max_rows: int | None = None) -> pd.DataFrame:
    """Load minimum interaction fields required for timing analysis.

    Args:
        path: Location of Interactions.csv (must have id, user_id, end_time).
        max_rows: Optional positive cap applied after cleaning and sorting
            (debugging aid).

    Returns:
        DataFrame sorted by (user_id, end_time, id) with NaN/NaT rows removed.

    Raises:
        ValueError: If max_rows is given but not strictly positive.
    """
    # Validate the debugging cap *before* the potentially expensive CSV read,
    # so a bad --max-rows fails fast instead of after loading the whole file.
    if max_rows is not None and max_rows <= 0:
        raise ValueError("--max-rows must be a positive integer.")
    usecols = ["id", "user_id", "end_time"]
    df = pd.read_csv(path, usecols=usecols, low_memory=False)
    # Coerce ids to integers; unparseable ids become the -1 sentinel.
    df["id"] = pd.to_numeric(df["id"], errors="coerce")
    df["id"] = df["id"].fillna(-1).astype(int)
    df["user_id"] = df["user_id"].astype("string")
    # Parse as UTC-aware timestamps; bad values become NaT and are dropped.
    df["end_time"] = pd.to_datetime(df["end_time"], errors="coerce", utc=True)
    df = df.dropna(subset=["user_id", "end_time"]).copy()
    # Stable mergesort keeps deterministic order for ties on end_time.
    df = df.sort_values(["user_id", "end_time", "id"], kind="mergesort")
    if max_rows is not None:
        df = df.head(max_rows).copy()
    return df
def compute_student_total_times_minutes(df: pd.DataFrame) -> pd.Series:
    """Return each student's span, in minutes, between first and last attempt."""
    per_user = df.groupby("user_id", sort=False)["end_time"]
    # Span = latest end_time minus earliest end_time, per student.
    span = per_user.max() - per_user.min()
    minutes = span.dt.total_seconds() / 60.0
    return minutes.rename("total_time_minutes")
def default_bin_edges_minutes() -> list[float]:
    """Base edges for total-time distribution in minutes."""
    # 10m, 30m, 1h, 3h, 6h, 12h, 1d, 2d, 3d, 1w, 2w, 30d, 70d.
    minute_edges = (
        0.0, 10.0, 30.0, 60.0, 180.0, 360.0, 720.0,
        1440.0, 2880.0, 4320.0, 10080.0, 20160.0, 43200.0, 100800.0,
    )
    return list(minute_edges)
def build_bin_edges_minutes(valid: pd.Series) -> list[float]:
    """Build finite plotting edges for proportional-width histogram bars."""
    edges = default_bin_edges_minutes()
    last_base_edge = edges[-1]
    observed_max = float(valid.max())
    # Append one extra tail edge only when data exceeds the base range, so
    # the final bin covers the maximum with a little headroom.
    if observed_max > last_base_edge:
        edges.append(max(last_base_edge + 60.0, observed_max * 1.05))
    return edges
def build_fixed_width_bin_edges_minutes(
    valid: pd.Series, bin_time_minutes: float
) -> list[float]:
    """Build fixed-width edges from min/max observed total times."""
    lo = float(valid.min())
    hi = float(valid.max())
    # Snap the observed range outward to multiples of the bin width.
    start = math.floor(lo / bin_time_minutes) * bin_time_minutes
    end = math.ceil(hi / bin_time_minutes) * bin_time_minutes
    if math.isclose(start, 0.0, abs_tol=1e-12):
        start = 0.0  # normalize -0.0 / tiny negatives to exact zero
    if math.isclose(end, start, abs_tol=1e-12):
        # Degenerate range (all values share one bin): force one full bin.
        end = start + bin_time_minutes
    count = int(round((end - start) / bin_time_minutes))
    edges = [start + k * bin_time_minutes for k in range(count + 1)]
    # Half-open bins [left, right) require the last edge strictly above max.
    if edges[-1] <= hi:
        edges.append(edges[-1] + bin_time_minutes)
    return edges
def format_bin_bound(minutes: float) -> str:
    """Render a bin bound compactly: whole numbers bare, others to <=2 dp."""
    nearest = round(minutes)
    if math.isclose(minutes, nearest, abs_tol=1e-9):
        return str(int(nearest))
    # Two decimals, then strip trailing zeros and a dangling point.
    return f"{minutes:.2f}".rstrip("0").rstrip(".")
def make_bin_labels(
    edges: list[float], open_tail_from: float | None = None
) -> list[str]:
    """Produce one human-readable interval label per bin."""
    labels: list[str] = []
    final = len(edges) - 2  # index of the last bin
    for i in range(len(edges) - 1):
        lo, hi = edges[i], edges[i + 1]
        # The last bin is rendered open-ended when it starts at/after cutoff.
        if open_tail_from is not None and i == final and lo >= open_tail_from:
            labels.append(f">= {format_bin_bound(lo)} min")
        else:
            labels.append(f"[{format_bin_bound(lo)}, {format_bin_bound(hi)}) min")
    return labels
def format_minutes_tick(value: float, _pos: float) -> str:
    """Tick formatter: minutes under 1h, hours under 1d, otherwise days."""
    if value >= 1440:
        return f"{value / 1440:.0f}d"
    if value >= 60:
        return f"{value / 60:.0f}h"
    return f"{value:.0f}m"
def filter_valid_total_times(
    total_times_minutes: pd.Series, keep_nonpositive: bool
) -> pd.Series:
    """Drop NaNs; also drop non-positive totals unless explicitly kept."""
    cleaned = total_times_minutes.dropna().copy()
    if keep_nonpositive:
        return cleaned
    return cleaned[cleaned > 0]
def summarize_binned_distribution(
    total_times_minutes: pd.Series,
    keep_nonpositive: bool,
    bin_time_minutes: float | None = None,
) -> pd.DataFrame:
    """Bin per-student totals and return counts/probabilities/densities per bin.

    Raises ValueError when no valid totals remain after filtering.
    """
    valid = filter_valid_total_times(total_times_minutes, keep_nonpositive)
    if valid.empty:
        raise ValueError("No valid total times found after filtering.")
    # Choose edges: fixed width when requested, otherwise the default scheme
    # whose final bin becomes an open tail when data exceeds the base range.
    if bin_time_minutes is None:
        base_edges = default_bin_edges_minutes()
        edges = build_bin_edges_minutes(valid)
        open_tail_from = base_edges[-1] if len(edges) > len(base_edges) else None
    else:
        edges = build_fixed_width_bin_edges_minutes(valid, bin_time_minutes)
        open_tail_from = None
    labels = make_bin_labels(edges, open_tail_from=open_tail_from)
    # Half-open bins [left, right); include_lowest keeps the exact minimum.
    binned = pd.cut(valid, bins=edges, labels=labels, right=False, include_lowest=True)
    counts = binned.value_counts(sort=False)
    probs = (counts / counts.sum()).astype(float).to_numpy(dtype=float)
    lefts = pd.Series(edges[:-1], dtype=float)
    rights = pd.Series(edges[1:], dtype=float)
    widths = rights - lefts
    # Density per minute makes bar *areas* comparable across unequal widths.
    return pd.DataFrame(
        {
            "bin": counts.index.astype(str),
            "bin_left_min": lefts.to_numpy(),
            "bin_right_min": rights.to_numpy(),
            "bin_width_min": widths.to_numpy(),
            "count": counts.values,
            "probability": probs,
            "percentage": probs * 100.0,
            "density_per_min": probs / widths.to_numpy(dtype=float),
        }
    )
def cumulative_probability_at_minutes(
    total_times_minutes: pd.Series,
    threshold_minutes: float,
    keep_nonpositive: bool,
) -> float:
    """Empirical CDF: fraction of students with total time <= threshold."""
    valid = filter_valid_total_times(total_times_minutes, keep_nonpositive)
    if valid.empty:
        raise ValueError("No valid total times found after filtering.")
    below_threshold = valid <= threshold_minutes
    return float(below_threshold.mean())
def plot_distribution(
    summary_df: pd.DataFrame,
    output_path: Path,
    log_y: bool = False,
    plot_upper_limit_minutes: float | None = None,
    cdf_marker_minutes: float = DEFAULT_CDF_MARKER_MINUTES,
    cdf_at_marker: float | None = None,
) -> None:
    """Create and save a publication-ready total-time distribution histogram.

    Args:
        summary_df: Per-bin summary with bin_left_min, bin_width_min,
            density_per_min, and percentage columns.
        output_path: Destination image path; parent directories are created.
        log_y: If True, use a logarithmic y-axis.
        plot_upper_limit_minutes: Optional cap on the x-axis upper limit.
        cdf_marker_minutes: X position (minutes) for the dashed marker line.
        cdf_at_marker: Optional cumulative probability shown in the legend.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    plt.style.use("seaborn-v0_8-whitegrid")
    fig, ax = plt.subplots(figsize=(20, 5))
    # Bars are drawn with true minute widths and density heights, so bar
    # *areas* (not heights) are proportional to probability per bin.
    left = summary_df["bin_left_min"].to_numpy(dtype=float)
    width = summary_df["bin_width_min"].to_numpy(dtype=float)
    height = summary_df["density_per_min"].to_numpy(dtype=float)
    bars = ax.bar(
        left,
        height,
        width=width,
        align="edge",  # bar's left edge sits exactly on the bin's left edge
        color="#4C78A8",
        # edgecolor="white",
        # linewidth=1.0,
    )
    ax.set_title("Distribution of Student Total Time (First to Last Attempt)")
    ax.set_xlabel("Total Time Per Student")
    ax.set_ylabel("Probability Density (1/min)")
    # X range spans all bins unless an explicit upper cap is requested.
    x_min = float(left.min())
    x_max = float((left + width).max())
    if plot_upper_limit_minutes is not None:
        x_max = min(x_max, float(plot_upper_limit_minutes))
    ax.set_xlim(x_min, x_max)
    ax.xaxis.set_major_locator(MaxNLocator(nbins=9))
    # Ticks render as minutes / hours / days depending on magnitude.
    ax.xaxis.set_major_formatter(FuncFormatter(format_minutes_tick))
    ax.grid(axis="y", alpha=0.25, linewidth=0.8)
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)
    if log_y:
        ax.set_yscale("log")
    # Dashed vertical CDF marker; the legend carries the probability value
    # when the caller supplies one.
    marker_label = f"CDF <= {format_bin_bound(cdf_marker_minutes)} min"
    if cdf_at_marker is not None:
        marker_label = f"{marker_label}: {cdf_at_marker * 100:.1f}%"
    ax.axvline(
        cdf_marker_minutes,
        color="#E45756",
        linestyle="--",
        linewidth=1.6,
        label=marker_label,
    )
    ax.legend(loc="upper right", frameon=False, fontsize=9)
    # Skip dense labeling when there are many bins to keep figure readable.
    annotate_bars = len(summary_df) <= 40
    if annotate_bars:
        for bar, pct in zip(bars, summary_df["percentage"]):
            # Only annotate non-trivial (>= 1%), visible bars.
            if pct < 1.0:
                continue
            h = bar.get_height()
            if h <= 0:
                continue
            ax.annotate(
                f"{pct:.1f}%",
                xy=(bar.get_x() + bar.get_width() / 2.0, h),
                xytext=(0, 3),
                textcoords="offset points",
                ha="center",
                va="bottom",
                fontsize=8,
            )
    plt.tight_layout()
    fig.savefig(output_path, dpi=400, bbox_inches="tight")
    plt.close(fig)
def main() -> None:
    """CLI entry point: load data, summarize, plot, and print a short report.

    Raises:
        FileNotFoundError: If the interactions CSV does not exist.
        ValueError: If any numeric CLI option is out of range.
    """
    args = parse_args()
    # Validate CLI inputs before doing any heavy work.
    if not args.interactions_path.exists():
        raise FileNotFoundError(
            f"Interactions file not found: {args.interactions_path}"
        )
    if args.plot_upper_limit_minutes is not None and args.plot_upper_limit_minutes <= 0:
        raise ValueError("--plot-upper-limit-minutes must be a positive number.")
    if args.bin_time is not None and args.bin_time <= 0:
        raise ValueError("--bin-time must be a positive number.")
    if args.cdf_marker_minutes <= 0:
        raise ValueError("--cdf-marker-minutes must be a positive number.")
    df = load_interactions(args.interactions_path, max_rows=args.max_rows)
    total_times_minutes = compute_student_total_times_minutes(df)
    summary = summarize_binned_distribution(
        total_times_minutes,
        keep_nonpositive=args.keep_nonpositive_total_times,
        bin_time_minutes=args.bin_time,
    )
    # Bug fix: ensure the counts directory exists before writing. Previously
    # only plot_distribution() created the Results/ folder, but this to_csv
    # ran first and failed on a fresh checkout without that directory.
    args.output_counts.parent.mkdir(parents=True, exist_ok=True)
    summary.to_csv(args.output_counts, index=False)
    cdf_at_marker = cumulative_probability_at_minutes(
        total_times_minutes=total_times_minutes,
        threshold_minutes=args.cdf_marker_minutes,
        keep_nonpositive=args.keep_nonpositive_total_times,
    )
    plot_distribution(
        summary,
        args.output_plot,
        log_y=args.log_y,
        plot_upper_limit_minutes=args.plot_upper_limit_minutes,
        cdf_marker_minutes=args.cdf_marker_minutes,
        cdf_at_marker=cdf_at_marker,
    )
    # Console summary: volume of data and how many students survived filtering.
    total_students = int(df["user_id"].nunique())
    students_used = int(
        len(
            filter_valid_total_times(
                total_times_minutes,
                keep_nonpositive=args.keep_nonpositive_total_times,
            )
        )
    )
    print("Done.")
    print(f"Interactions loaded: {len(df):,}")
    print(f"Students observed: {total_students:,}")
    print(f"Students used in distribution: {students_used:,}")
    if args.bin_time is not None:
        print(f"Bin width (min): {args.bin_time}")
    print(
        f"Cumulative P(total_time <= {format_bin_bound(args.cdf_marker_minutes)} min): "
        f"{cdf_at_marker * 100:.2f}%"
    )
    print(f"Saved plot: {args.output_plot}")
    print(f"Saved bin counts: {args.output_counts}")
# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()