"""Load data from HF Dataset with Streamlit caching.""" from __future__ import annotations import json import pandas as pd import streamlit as st from huggingface_hub import hf_hub_download DATASET_REPO = "buckeyeguy/osc-usage-data" @st.cache_data(ttl=300) def load_data() -> tuple[pd.DataFrame, pd.DataFrame, dict]: """Download Parquet + metadata from HF Dataset. Cached for 5 min.""" jobs_path = hf_hub_download(repo_id=DATASET_REPO, filename="jobs.parquet", repo_type="dataset") snapshots_path = hf_hub_download( repo_id=DATASET_REPO, filename="snapshots.parquet", repo_type="dataset" ) metadata_path = hf_hub_download( repo_id=DATASET_REPO, filename="metadata.json", repo_type="dataset" ) jobs = pd.read_parquet(jobs_path) snapshots = pd.read_parquet(snapshots_path) with open(metadata_path) as f: metadata = json.load(f) # Ensure datetime columns for col in ["submit_time", "start_time", "end_time"]: if col in jobs.columns: jobs[col] = pd.to_datetime(jobs[col]) # Add derived columns if "end_time" in jobs.columns: jobs["end_date"] = jobs["end_time"].dt.date jobs["end_month"] = jobs["end_time"].dt.to_period("M").astype(str) jobs["end_dow"] = jobs["end_time"].dt.dayofweek # 0=Mon jobs["end_hour"] = jobs["end_time"].dt.hour if "walltime_used" in jobs.columns: jobs["walltime_hours"] = jobs["walltime_used"] / 3600.0 # Behavioral outcome classification if "launch_method" in jobs.columns and "last_state" in jobs.columns: import numpy as np from config import INTERACTIVE_METHODS, QUICK_EXIT_SECONDS is_interactive = jobs["launch_method"].isin(INTERACTIVE_METHODS) wt = jobs.get("walltime_used", pd.Series(dtype="float64")) state = jobs["last_state"] # Start with batch classification (maps exit state directly) outcome = state.map( { "COMPLETED": "Completed", "FAILED": "Failed", "TIMEOUT": "Timed Out", "OUT_OF_MEMORY": "Out of Memory", } ).fillna("Cancelled") # All CANCELLED variants + NODE_FAIL → "Cancelled" # Override for interactive jobs is_quick = is_interactive & (wt < QUICK_EXIT_SECONDS) is_failed_interactive = is_interactive & state.isin({"FAILED", "OUT_OF_MEMORY"}) is_user_ended = ( is_interactive & ~is_quick & ~is_failed_interactive & state.str.startswith("CANCELLED") ) is_session_expired = ( is_interactive & ~is_quick & ~is_failed_interactive & (state == "TIMEOUT") ) outcome = np.where(is_quick, "Quick Exit", outcome) outcome = np.where(is_failed_interactive, "Failed", outcome) outcome = np.where(is_user_ended, "User Ended", outcome) outcome = np.where(is_session_expired, "Session Expired", outcome) jobs["outcome_category"] = outcome # Queue wait time if "submit_time" in jobs.columns and "start_time" in jobs.columns: jobs["wait_hours"] = (jobs["start_time"] - jobs["submit_time"]).dt.total_seconds() / 3600.0 return jobs, snapshots, metadata def filter_jobs( jobs: pd.DataFrame, date_range: tuple | None = None, projects: list[str] | None = None, users: list[str] | None = None, systems: list[str] | None = None, ) -> pd.DataFrame: """Apply sidebar filters to jobs DataFrame.""" df = jobs.copy() if date_range and "end_date" in df.columns: df = df[df["end_date"].between(date_range[0], date_range[1])] if projects: df = df[df["project_code"].isin(projects)] if users: df = df[df["username"].isin(users)] if systems: df = df[df["system_code"].isin(systems)] return df