FoundationalASSIST / Code /plot_timegap_distribution.py
martinakaduc's picture
Upload folder using huggingface_hub
6256eb9 verified
#!/usr/bin/env python3
"""Plot the distribution of time gaps between consecutive student attempts.
This script reads FoundationalASSIST `Interactions.csv`, groups interactions by
student (`user_id`), computes the time difference between each pair of
consecutive attempts (`end_time`), discretizes these differences into bins, and
plots the resulting distribution.
"""
from __future__ import annotations
import argparse
import math
from pathlib import Path
import matplotlib.pyplot as plt
import pandas as pd
from matplotlib.ticker import FuncFormatter, MaxNLocator
# Directory two levels above this script; the Data/ and Results/ defaults are
# resolved against it.
_REPO_ROOT = Path(__file__).resolve().parent.parent
_RESULTS_DIR = _REPO_ROOT / "Results"

# Default input CSV and default output locations (overridable on the CLI).
DEFAULT_INTERACTIONS_PATH = _REPO_ROOT / "Data" / "Interactions.csv"
DEFAULT_OUTPUT_PLOT = _RESULTS_DIR / "time_gap_distribution.png"
DEFAULT_OUTPUT_COUNTS = _RESULTS_DIR / "time_gap_distribution_counts.csv"

# Gap threshold (minutes) at which the cumulative probability is reported and
# marked on the plot with a vertical line.
CDF_MARKER_MINUTES = 60.0
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the time-gap distribution script.

    Args:
        argv: Argument strings to parse. ``None`` (the default) lets argparse
            read ``sys.argv[1:]``, so existing ``parse_args()`` calls behave
            exactly as before; passing a list allows programmatic use.

    Returns:
        Namespace with ``interactions_path``, ``output_plot``,
        ``output_counts``, ``max_rows``, ``keep_nonpositive_gaps``, ``log_y``,
        ``plot_upper_limit_minutes``, ``bin_time`` and ``student_idx``.

    No range checks are applied here; numeric options are returned as parsed.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Compute per-student consecutive-attempt time gaps from "
            "Interactions.csv and plot their binned distribution."
        )
    )

    # Input / output locations.
    parser.add_argument(
        "--interactions-path",
        type=Path,
        default=DEFAULT_INTERACTIONS_PATH,
        help="Path to Interactions.csv.",
    )
    parser.add_argument(
        "--output-plot",
        type=Path,
        default=DEFAULT_OUTPUT_PLOT,
        help="Path to save the output figure.",
    )
    parser.add_argument(
        "--output-counts",
        type=Path,
        default=DEFAULT_OUTPUT_COUNTS,
        help="Path to save bin counts as CSV.",
    )

    # Data selection.
    parser.add_argument(
        "--max-rows",
        type=int,
        default=None,
        help="Optional cap on rows after sorting (for quick debugging).",
    )
    parser.add_argument(
        "--keep-nonpositive-gaps",
        action="store_true",
        help=(
            "Keep zero/negative gaps. By default, only strictly positive "
            "gaps are used."
        ),
    )

    # Plot appearance and binning.
    parser.add_argument(
        "--log-y",
        action="store_true",
        help="Use log scale on y-axis.",
    )
    parser.add_argument(
        "--plot-upper-limit-minutes",
        type=float,
        default=None,
        help=(
            "Optional upper limit for x-axis in minutes. "
            "If omitted, uses the full range implied by bins."
        ),
    )
    parser.add_argument(
        "--bin-time",
        type=float,
        default=None,
        help=(
            "Optional fixed bin width in minutes. "
            "For example, --bin-time 10 creates bins [0,10), [10,20), ..."
        ),
    )
    parser.add_argument(
        "--student-idx",
        type=int,
        default=None,
        help=(
            "Optional 0-based index of student to plot. Index is based on "
            "sorted unique user_id values in the loaded interactions."
        ),
    )
    return parser.parse_args(argv)
def load_interactions(path: Path, max_rows: int | None = None) -> pd.DataFrame:
    """Load the minimum columns required for time-gap analysis.

    Only ``id``, ``user_id`` and ``end_time`` are read. ``id`` is coerced to
    int (unparseable values become -1), ``user_id`` is cast to pandas string
    dtype, and ``end_time`` is parsed as a UTC datetime. Rows with a missing
    ``user_id`` or an unparseable ``end_time`` are dropped, and the result is
    stably sorted by ``user_id``, ``end_time``, ``id``.

    Args:
        path: Location of the interactions CSV.
        max_rows: Optional cap applied *after* sorting; keeps the first
            ``max_rows`` rows of the sorted frame.

    Returns:
        The cleaned, sorted frame with the three columns above.

    Raises:
        ValueError: If ``max_rows`` is given and is not strictly positive.
    """
    # Validate before touching the file so an invalid cap fails fast instead
    # of after reading, parsing and sorting the whole CSV.
    if max_rows is not None and max_rows <= 0:
        raise ValueError("--max-rows must be a positive integer.")

    usecols = ["id", "user_id", "end_time"]
    df = pd.read_csv(path, usecols=usecols, low_memory=False)

    df["id"] = pd.to_numeric(df["id"], errors="coerce").fillna(-1).astype(int)
    df["user_id"] = df["user_id"].astype("string")
    df["end_time"] = pd.to_datetime(df["end_time"], errors="coerce", utc=True)

    df = df.dropna(subset=["user_id", "end_time"]).copy()
    # Stable sort; ``id`` breaks ties between attempts with equal end_time.
    df = df.sort_values(["user_id", "end_time", "id"], kind="mergesort")

    if max_rows is not None:
        df = df.head(max_rows).copy()
    return df
def compute_time_gaps_minutes(df: pd.DataFrame) -> pd.Series:
    """Return, for every row, the minutes since the same student's previous row.

    Differences are taken over ``end_time`` within each ``user_id`` group, in
    the frame's existing row order. The first row of each group has no
    predecessor and therefore yields NaN.
    """
    end_times_by_student = df.groupby("user_id", sort=False)["end_time"]
    elapsed = end_times_by_student.diff()
    elapsed_seconds = elapsed.dt.total_seconds()
    return elapsed_seconds / 60.0
def default_bin_edges_minutes() -> list[float]:
    """Return the base, non-uniform bin edges in minutes.

    The edges start at 0 and end at 10080 minutes (7 days). A final tail edge
    beyond that may be appended by ``build_bin_edges_minutes`` when the data
    reaches or exceeds the last edge.
    """
    return [
        0.0,
        1.0,
        5.0,
        10.0,
        30.0,
        60.0,  # 1 hour
        180.0,  # 3 hours
        720.0,  # 12 hours
        1440.0,  # 1 day
        4320.0,  # 3 days
        10080.0,  # 7 days
    ]


def build_bin_edges_minutes(valid: pd.Series) -> list[float]:
    """Build finite plotting edges so bar widths are proportional on the x-axis.

    Starts from ``default_bin_edges_minutes()`` and, when the largest gap
    reaches or exceeds the last base edge, appends one finite terminal edge
    that lies strictly above the maximum.

    Args:
        valid: Gap durations in minutes.

    Returns:
        Ascending list of bin edges in minutes.
    """
    edges = default_bin_edges_minutes()
    base_tail_start = edges[-1]
    max_gap = float(valid.max())
    # This file bins with half-open intervals [left, right) (pd.cut with
    # right=False), so a gap exactly equal to the last base edge would fall in
    # no bin. Use >= so the tail bin is created for it as well.
    if max_gap >= base_tail_start:
        # Finite terminal edge that fully contains the data tail: at least one
        # hour wide and at least 5% above the largest gap.
        tail_edge = max(base_tail_start + 60.0, max_gap * 1.05)
        edges.append(tail_edge)
    return edges
def build_fixed_width_bin_edges_minutes(
    valid: pd.Series, bin_time_minutes: float
) -> list[float]:
    """Build fixed-width edges from the min/max observed gaps.

    The first edge is the largest multiple of ``bin_time_minutes`` not above
    the minimum gap, and edges continue in steps of ``bin_time_minutes`` until
    the last edge is strictly greater than the maximum gap, so that half-open
    bins ``[left, right)`` contain every value.

    Args:
        valid: Gap durations in minutes.
        bin_time_minutes: Width of every bin, in minutes.

    Returns:
        Ascending list of equally spaced edges (at least two).

    Raises:
        ValueError: If ``bin_time_minutes`` is not a positive finite number,
            or if ``valid`` has no usable minimum/maximum (empty or all-NaN).
    """
    # Guard against a zero width (division by zero) and negative / non-finite
    # widths (which would yield descending or NaN edges).
    if not (bin_time_minutes > 0 and math.isfinite(bin_time_minutes)):
        raise ValueError("bin_time_minutes must be a positive, finite number.")

    min_gap = float(valid.min())
    max_gap = float(valid.max())
    if math.isnan(min_gap) or math.isnan(max_gap):
        raise ValueError("Cannot build bin edges from an empty or all-NaN series.")

    start = bin_time_minutes * math.floor(min_gap / bin_time_minutes)
    end = bin_time_minutes * math.ceil(max_gap / bin_time_minutes)
    # Normalise a tiny or negative zero to exactly 0.0.
    if math.isclose(start, 0.0, abs_tol=1e-12):
        start = 0.0
    # Degenerate range (all gaps on one edge): still emit one full bin.
    if math.isclose(end, start, abs_tol=1e-12):
        end = start + bin_time_minutes

    n_bins = int(round((end - start) / bin_time_minutes))
    edges = [start + i * bin_time_minutes for i in range(n_bins + 1)]
    # Bins are right-open, so the maximum must lie strictly below the last edge.
    if edges[-1] <= max_gap:
        edges.append(edges[-1] + bin_time_minutes)
    return edges
def format_bin_bound(minutes: float) -> str:
    """Render a bin boundary compactly.

    Values within 1e-9 of a whole number are printed as that integer; anything
    else is printed with two decimals and trailing zeros (and a dangling
    decimal point) removed.
    """
    nearest_whole = round(minutes)
    if math.isclose(minutes, nearest_whole, abs_tol=1e-9):
        return str(int(nearest_whole))
    two_decimals = f"{minutes:.2f}"
    trimmed = two_decimals.rstrip("0")
    return trimmed.rstrip(".")
def make_bin_labels(
    edges: list[float], open_tail_from: float | None = None
) -> list[str]:
    """Create one human-readable label per bin defined by consecutive edges.

    Every bin is labelled ``"[left, right) min"``. The exception is the final
    bin when ``open_tail_from`` is given and that bin's left edge is at or
    above it: it is then labelled ``">= left min"`` to read as an open tail.
    """
    final_bin_idx = len(edges) - 2

    def is_open_tail(position: int, lower: float) -> bool:
        # Only the last bin may be shown as an open-ended tail.
        return (
            open_tail_from is not None
            and position == final_bin_idx
            and lower >= open_tail_from
        )

    return [
        f">= {format_bin_bound(lower)} min"
        if is_open_tail(position, lower)
        else f"[{format_bin_bound(lower)}, {format_bin_bound(upper)}) min"
        for position, (lower, upper) in enumerate(zip(edges, edges[1:]))
    ]
def format_minutes_tick(value: float, _pos: float) -> str:
    """Format an axis tick given in minutes as minutes, hours or days.

    Below 60 the value is rounded to whole minutes (``"30m"``). From 60 up to
    (but excluding) 1440 it is shown in hours, otherwise in days; whole
    amounts print without decimals (``"2h"``, ``"7d"``), others with one
    decimal (``"1.5h"``). ``_pos`` is accepted for the matplotlib
    ``FuncFormatter`` signature and is unused.
    """
    if value < 60:
        return f"{int(round(value))}m"

    # Pick the coarser unit and share the whole-vs-fractional formatting.
    minutes_per_unit, suffix = (60.0, "h") if value < 1440 else (1440.0, "d")
    amount = value / minutes_per_unit
    nearest_whole = round(amount)
    if math.isclose(amount, nearest_whole, abs_tol=1e-9):
        return f"{int(nearest_whole)}{suffix}"
    return f"{amount:.1f}{suffix}"
def summarize_binned_distribution(
    gaps_minutes: pd.Series,
    keep_nonpositive: bool,
    bin_time_minutes: float | None = None,
) -> pd.DataFrame:
    """Bin the time gaps and tabulate counts, probabilities and densities.

    Gaps are first filtered with ``filter_valid_gaps``. With
    ``bin_time_minutes`` set, fixed-width edges are used; otherwise the
    default non-uniform edges are used, and a tail bin (if one was appended)
    is labelled as open-ended. Bins are half-open ``[left, right)``.

    Returns:
        One row per bin with columns ``bin``, ``bin_left_min``,
        ``bin_right_min``, ``bin_width_min``, ``count``, ``probability``
        (count divided by the total binned count), ``percentage`` and
        ``density_per_min`` (probability divided by bin width).

    Raises:
        ValueError: If no gaps remain after filtering.
    """
    usable = filter_valid_gaps(gaps_minutes, keep_nonpositive)
    if usable.empty:
        raise ValueError("No valid time gaps found after filtering.")

    if bin_time_minutes is None:
        edges = build_bin_edges_minutes(usable)
        base_edges = default_bin_edges_minutes()
        # An extra edge beyond the base set means a tail bin was appended.
        tail_start = base_edges[-1] if len(edges) > len(base_edges) else None
    else:
        edges = build_fixed_width_bin_edges_minutes(usable, bin_time_minutes)
        tail_start = None

    bin_names = make_bin_labels(edges, open_tail_from=tail_start)
    assigned = pd.cut(
        usable, bins=edges, labels=bin_names, right=False, include_lowest=True
    )
    per_bin = assigned.value_counts(sort=False)
    share = (per_bin / per_bin.sum()).astype(float).to_numpy(dtype=float)

    lower = pd.Series(edges[:-1], dtype=float)
    upper = pd.Series(edges[1:], dtype=float)
    span = upper - lower

    return pd.DataFrame(
        {
            "bin": per_bin.index.astype(str),
            "bin_left_min": lower.to_numpy(),
            "bin_right_min": upper.to_numpy(),
            "bin_width_min": span.to_numpy(),
            "count": per_bin.values,
            "probability": share,
            "percentage": share * 100.0,
            "density_per_min": share / span.to_numpy(dtype=float),
        }
    )
def filter_valid_gaps(gaps_minutes: pd.Series, keep_nonpositive: bool) -> pd.Series:
    """Drop missing gaps and, unless told otherwise, zero/negative ones.

    Returns a new Series (the input is not modified) that keeps the original
    index labels of the surviving entries.
    """
    present = gaps_minutes.dropna().copy()
    if keep_nonpositive:
        return present
    strictly_positive = present > 0
    return present[strictly_positive]
def cumulative_probability_at_minutes(
    gaps_minutes: pd.Series,
    threshold_minutes: float,
    keep_nonpositive: bool,
) -> float:
    """Return the fraction of valid gaps that are ``<= threshold_minutes``.

    Gaps are filtered with ``filter_valid_gaps`` first, so the same
    ``keep_nonpositive`` choice applies as for the binned summary.

    Raises:
        ValueError: If no gaps remain after filtering.
    """
    usable = filter_valid_gaps(gaps_minutes, keep_nonpositive)
    if usable.empty:
        raise ValueError("No valid time gaps found after filtering.")
    # Mean of the boolean mask is the empirical CDF value at the threshold.
    at_or_below = usable <= threshold_minutes
    return float(at_or_below.mean())
def select_student_by_index(
    df: pd.DataFrame,
    student_idx: int,
) -> tuple[pd.DataFrame, str, int]:
    """Select one student's interactions by 0-based index over unique IDs.

    Unique ``user_id`` values are enumerated in order of first appearance in
    ``df``; for a frame already sorted by ``user_id`` (as produced by
    ``load_interactions``) this is the sorted order.

    Args:
        df: Interactions containing a ``user_id`` column.
        student_idx: 0-based position of the student among the unique IDs.

    Returns:
        ``(selected_df, selected_student_id, total_students)`` where
        ``selected_df`` is a copy of that student's rows, the id is returned
        as ``str``, and ``total_students`` is the number of unique IDs.

    Raises:
        ValueError: If ``df`` has no students or ``student_idx`` is out of range.
    """
    student_ids = df["user_id"].drop_duplicates().tolist()
    total_students = len(student_ids)
    if total_students == 0:
        raise ValueError("No students found in loaded interactions.")
    if student_idx < 0 or student_idx >= total_students:
        raise ValueError(
            f"--student-idx must be in [0, {total_students - 1}], got {student_idx}."
        )

    raw_student_id = student_ids[student_idx]
    # Filter on the raw value: comparing against str(raw_student_id) matches
    # nothing when the column holds non-string ids (e.g. integers).
    selected_df = df[df["user_id"] == raw_student_id].copy()
    return selected_df, str(raw_student_id), total_students
def append_student_id_to_output_path(path: Path, student_id: str) -> Path:
    """Return ``path`` with ``_<student id>`` inserted before the file suffix.

    Any character of ``student_id`` that is not alphanumeric, ``-`` or ``_``
    is replaced by ``_`` so the result is safe to use as a filename.
    """
    sanitized_chars = []
    for char in student_id:
        keep = char.isalnum() or char in ("-", "_")
        sanitized_chars.append(char if keep else "_")
    suffix_id = "".join(sanitized_chars)
    new_name = f"{path.stem}_{suffix_id}{path.suffix}"
    return path.with_name(new_name)
def plot_distribution(
    summary_df: pd.DataFrame,
    output_path: Path,
    log_y: bool = False,
    plot_upper_limit_minutes: float | None = None,
    cdf_marker_minutes: float = CDF_MARKER_MINUTES,
    cdf_at_marker: float | None = None,
    student_idx: int | None = None,
) -> None:
    """Create and save a publication-ready distribution histogram.

    Bars are drawn from ``bin_left_min`` with width ``bin_width_min`` and
    height ``density_per_min``; bins holding at least 1% of the gaps are
    annotated with their ``percentage``. A dashed vertical line marks
    ``cdf_marker_minutes``.

    Args:
        summary_df: Per-bin table with the columns named above.
        output_path: Destination image file; parent directories are created.
        log_y: Use a logarithmic y-axis.
        plot_upper_limit_minutes: Optional cap on the x-axis upper limit.
        cdf_marker_minutes: X position (minutes) of the marker line.
        cdf_at_marker: Cumulative probability at the marker; when given it is
            appended to the legend entry as a percentage.
        student_idx: When given, a narrower figure is used and the index is
            added to the title.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    plt.style.use("seaborn-v0_8-whitegrid")

    figsize = (10, 5) if student_idx is not None else (20, 5)
    fig, ax = plt.subplots(figsize=figsize)
    # Close the figure even if drawing or saving fails, so repeated calls do
    # not accumulate open figures.
    try:
        left = summary_df["bin_left_min"].to_numpy(dtype=float)
        width = summary_df["bin_width_min"].to_numpy(dtype=float)
        density = summary_df["density_per_min"].to_numpy(dtype=float)

        bars = ax.bar(
            left,
            density,
            width=width,
            align="edge",
            color="#4C78A8",
        )

        title = "Distribution of Time Gaps Between Consecutive Attempts"
        if student_idx is not None:
            title = f"{title} (student_idx={student_idx})"
        ax.set_title(title)
        ax.set_xlabel("Time Gap")
        ax.set_ylabel("Probability Density (1/min)")

        x_min = float(left.min())
        x_max = float((left + width).max())
        if plot_upper_limit_minutes is not None:
            x_max = min(x_max, float(plot_upper_limit_minutes))
        ax.set_xlim(x_min, x_max)

        ax.xaxis.set_major_locator(MaxNLocator(nbins=9))
        ax.xaxis.set_major_formatter(FuncFormatter(format_minutes_tick))
        ax.grid(axis="y", alpha=0.25, linewidth=0.8)
        ax.spines["top"].set_visible(False)
        ax.spines["right"].set_visible(False)
        if log_y:
            ax.set_yscale("log")

        # Whole-minute markers print as integers (e.g. "60"); fractional ones
        # keep their fraction instead of being silently truncated.
        marker_value = float(cdf_marker_minutes)
        if marker_value.is_integer():
            marker_text = str(int(marker_value))
        else:
            marker_text = f"{marker_value:g}"
        marker_label = f"CDF <= {marker_text} min"
        if cdf_at_marker is not None:
            marker_label = f"{marker_label}: {cdf_at_marker * 100:.1f}%"
        ax.axvline(
            cdf_marker_minutes,
            color="#E45756",
            linestyle="--",
            linewidth=1.6,
            label=marker_label,
        )
        ax.legend(loc="upper right", frameon=False, fontsize=9)

        # Label non-trivial bins for readability in papers.
        for bar, pct in zip(bars, summary_df["percentage"]):
            if pct < 1.0:
                continue
            bar_height = bar.get_height()
            if bar_height <= 0:
                continue
            ax.annotate(
                f"{pct:.1f}%",
                xy=(bar.get_x() + bar.get_width() / 2.0, bar_height),
                xytext=(0, 3),
                textcoords="offset points",
                ha="center",
                va="bottom",
                fontsize=8,
            )

        # Lay out this figure explicitly rather than pyplot's "current" one.
        fig.tight_layout()
        fig.savefig(output_path, dpi=400, bbox_inches="tight")
    finally:
        plt.close(fig)
def main() -> None:
    """Run the full pipeline: load, compute gaps, bin, save CSV and plot.

    Steps: parse options, validate them, load the interactions, optionally
    restrict to one student (output filenames then get the student id
    appended), compute per-student consecutive gaps, write the binned summary
    CSV, render the histogram, and print a short report.

    Raises:
        ValueError: If a numeric option is not strictly positive, or no valid
            gaps remain after filtering.
        FileNotFoundError: If the interactions file does not exist.
    """
    args = parse_args()

    # Validate cheap option constraints first so bad arguments fail before the
    # CSV is loaded and processed.
    if args.max_rows is not None and args.max_rows <= 0:
        raise ValueError("--max-rows must be a positive integer.")
    if args.plot_upper_limit_minutes is not None and args.plot_upper_limit_minutes <= 0:
        raise ValueError("--plot-upper-limit-minutes must be a positive number.")
    if args.bin_time is not None and args.bin_time <= 0:
        raise ValueError("--bin-time must be a positive number.")
    if not args.interactions_path.exists():
        raise FileNotFoundError(
            f"Interactions file not found: {args.interactions_path}"
        )

    df = load_interactions(args.interactions_path, max_rows=args.max_rows)

    selected_student_id: str | None = None
    total_students = int(df["user_id"].nunique())
    if args.student_idx is not None:
        df, selected_student_id, total_students = select_student_by_index(
            df,
            args.student_idx,
        )

    output_plot_path = args.output_plot
    output_counts_path = args.output_counts
    if selected_student_id is not None:
        # Keep per-student outputs from overwriting the all-student files.
        output_plot_path = append_student_id_to_output_path(
            output_plot_path,
            selected_student_id,
        )
        output_counts_path = append_student_id_to_output_path(
            output_counts_path,
            selected_student_id,
        )

    gaps_minutes = compute_time_gaps_minutes(df)
    summary = summarize_binned_distribution(
        gaps_minutes,
        keep_nonpositive=args.keep_nonpositive_gaps,
        bin_time_minutes=args.bin_time,
    )

    output_counts_path.parent.mkdir(parents=True, exist_ok=True)
    summary.to_csv(output_counts_path, index=False)

    cdf_at_marker = cumulative_probability_at_minutes(
        gaps_minutes=gaps_minutes,
        threshold_minutes=CDF_MARKER_MINUTES,
        keep_nonpositive=args.keep_nonpositive_gaps,
    )
    plot_distribution(
        summary,
        output_plot_path,
        log_y=args.log_y,
        plot_upper_limit_minutes=args.plot_upper_limit_minutes,
        cdf_marker_minutes=CDF_MARKER_MINUTES,
        cdf_at_marker=cdf_at_marker,
        student_idx=args.student_idx,
    )

    total_pairs = int(summary["count"].sum())
    print("Done.")
    print(f"Interactions loaded: {len(df):,}")
    print(f"Students in loaded data: {total_students:,}")
    if selected_student_id is not None:
        print(f"Selected student idx: {args.student_idx}")
        print(f"Selected student id: {selected_student_id}")
    print(f"Consecutive attempt pairs used: {total_pairs:,}")
    if args.bin_time is not None:
        print(f"Bin width (min): {args.bin_time}")
    print(
        f"Cumulative P(gap <= {int(CDF_MARKER_MINUTES)} min): "
        f"{cdf_at_marker * 100:.2f}%"
    )
    print(f"Saved plot: {output_plot_path}")
    print(f"Saved bin counts: {output_counts_path}")


if __name__ == "__main__":
    main()