"""Transformers CI — most common test failures.

A tiny Gradio dashboard over the public `transformers-ci-telemetry` bucket
(daily-partitioned Parquet produced by the CI telemetry publisher). It ranks
the tests and exception types that fail most often, with a few headline stats.
Data location: set ``TELEMETRY_DIR`` to the bucket mount. We otherwise probe a
short list of common paths (the Space's bucket mount, the local checkout) and
use the first one that actually contains a ``daily/`` tree.
"""
from __future__ import annotations
import glob
import os
from typing import Any
import gradio as gr
import pandas as pd
# Places the bucket contents may live, highest priority first: the explicit
# ``TELEMETRY_DIR`` override, then the paths a Space may mount the bucket at,
# then local checkouts. Empty entries (unset env var) are skipped by the probe.
_CANDIDATE_DIRS = [
    os.getenv("TELEMETRY_DIR", ""),  # "" when the variable is unset
    "/data/transformers-ci-telemetry",
    "/data",
    "/bucket",
    os.path.join(os.path.dirname(__file__), "data"),  # ``data/`` next to this file
    "/Users/tarek/Dev/transformers-ci-telemetry",
]
def _telemetry_dir() -> str | None:
"""First candidate dir that contains a non-empty ``daily/`` tree."""
for candidate in _CANDIDATE_DIRS:
if candidate and glob.glob(os.path.join(candidate, "daily", "*", "test_rows.parquet")):
return candidate
return None
def load_test_rows() -> pd.DataFrame:
    """Concatenate every ``daily/*/test_rows.parquet`` into one frame.

    Partitions are read in sorted path order. A partition that cannot be read
    is skipped (best effort), but the skip is logged with its traceback so a
    partially-populated dashboard can be diagnosed.

    Returns:
        One frame with a fresh ``RangeIndex`` holding all readable partitions,
        or an empty ``DataFrame`` when no telemetry directory is found or no
        partition could be read.
    """
    import logging  # local import keeps this function self-contained

    base = _telemetry_dir()
    if base is None:
        return pd.DataFrame()

    log = logging.getLogger(__name__)
    pattern = os.path.join(base, "daily", "*", "test_rows.parquet")
    frames = []
    for path in sorted(glob.glob(pattern)):
        try:
            frames.append(pd.read_parquet(path))
        except Exception:  # noqa: BLE001 - skip a corrupt/partial partition
            # Deliberately broad: any read failure on one partition must not
            # take the whole dashboard down, but it should leave a trace.
            log.warning("Skipping unreadable partition %s", path, exc_info=True)

    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)
def _summary_md(df: pd.DataFrame) -> str:
    """Build the Markdown headline shown at the top of the dashboard.

    Args:
        df: Test rows; the ``status_code``, ``run_id`` and ``date`` columns
            are read when the frame is non-empty.

    Returns:
        A "no data" notice for an empty frame; otherwise one line with the
        execution count, distinct runs and days, the number of rows whose
        ``status_code`` is ``"ERROR"``, and the failure percentage.
    """
    if df.empty:
        return (
            "### No data found\n\n"
            "No `daily/*/test_rows.parquet` under any known bucket path. "
            "Set `TELEMETRY_DIR` to the mounted bucket."
        )

    executions = len(df)
    failed = int(df["status_code"].eq("ERROR").sum())
    pct = (failed / executions * 100) if executions else 0.0
    run_count = df["run_id"].nunique()
    day_count = df["date"].nunique()

    headline = [
        f"**{executions}** test executions across **{run_count}** run(s) / **{day_count}** day(s)",
        f"**{failed}** failures",
        f"**{pct:.1f}%** failure rate",
    ]
    return " · ".join(headline)
def _top_failing_tests(df: pd.DataFrame, limit: int = 20) -> pd.DataFrame:
    """Rank tests by how many ``"ERROR"`` rows they have.

    Args:
        df: Test rows with ``status_code`` and ``test_nodeid`` columns.
        limit: Maximum number of tests to return.

    Returns:
        A frame with columns ``test_nodeid`` and ``failures``, sorted by
        ``failures`` descending and truncated to ``limit`` rows. Empty (with
        the same columns) when there are no rows or no failures.
    """
    no_failures = pd.DataFrame(columns=["test_nodeid", "failures"])
    if df.empty:
        return no_failures

    failed = df.loc[df["status_code"] == "ERROR"]
    if failed.empty:
        return no_failures

    counts = failed.groupby("test_nodeid").size().reset_index(name="failures")
    ranked = counts.sort_values("failures", ascending=False)
    return ranked.head(limit).reset_index(drop=True)
def _failures_by(df: pd.DataFrame, column: str, label: str) -> pd.DataFrame:
    """Count ``"ERROR"`` rows grouped by ``column``.

    Missing values and empty strings in ``column`` are bucketed as
    ``"(none)"``. A frame without a ``status_code`` column yields an empty
    result, and a frame without ``column`` counts every failure under
    ``"(none)"``, so an optional column (such as ``model``) that is absent
    from the data no longer raises ``KeyError``.

    Args:
        df: Test rows.
        column: Column to group the failing rows by.
        label: Name given to the grouping column in the result.

    Returns:
        A frame with columns ``[label, "failures"]`` sorted by ``failures``
        descending; empty (same columns) when there is nothing to count.
    """
    cols = [label, "failures"]
    if df.empty or "status_code" not in df:
        return pd.DataFrame(columns=cols)

    # Copy so that normalising the grouping column never touches the caller's frame.
    errors = df[df["status_code"] == "ERROR"].copy()
    if errors.empty:
        return pd.DataFrame(columns=cols)

    if column in errors:
        # groupby drops NaN keys by default, so fold missing/blank into one bucket.
        errors[column] = errors[column].fillna("").replace("", "(none)")
    else:
        errors[column] = "(none)"

    out = (
        errors.groupby(column)
        .size()
        .reset_index(name="failures")
        .sort_values("failures", ascending=False)
        .reset_index(drop=True)
    )
    return out.rename(columns={column: label})
def _error_rows(df: pd.DataFrame, limit: int = 100) -> tuple[pd.DataFrame, list[dict[str, Any]]]:
    """Select failing rows for the errors table plus a detail record per row.

    Rows whose ``status_code`` is ``"ERROR"`` are sorted by ``date``
    descending (when that column exists) and truncated to ``limit``. Any
    table or detail column missing from the data is filled with ``""``.

    Args:
        df: Test rows.
        limit: Maximum number of error rows to return.

    Returns:
        ``(table, details)`` where ``table`` has the four display columns and
        ``details`` is a list of dicts (same row order) that also carries the
        stacktrace, run, job, model and gpu fields. Both are empty when the
        frame is empty, has no ``status_code`` column, or has no failures.
    """
    table_cols = ["date", "test_nodeid", "exception_type", "exception_message"]
    extra_cols = ["exception_stacktrace", "run_id", "test_job", "model", "gpu"]

    def _nothing() -> tuple[pd.DataFrame, list[dict[str, Any]]]:
        return pd.DataFrame(columns=table_cols), []

    if df.empty or "status_code" not in df:
        return _nothing()

    failed = df[df["status_code"] == "ERROR"].copy()
    if failed.empty:
        return _nothing()

    if "date" in failed:
        failed = failed.sort_values("date", ascending=False)
    failed = failed.head(limit).reset_index(drop=True)

    wanted = table_cols + extra_cols
    for absent in [name for name in wanted if name not in failed]:
        failed[absent] = ""

    records = failed[wanted].fillna("").to_dict("records")
    table = failed[table_cols].fillna("")
    return table, records
def _error_stacktrace(details: list[dict[str, Any]] | None, evt: gr.SelectData) -> str:
    """Format the detail text for the error row selected in the table.

    Args:
        details: Per-row detail dicts, in the same order as the table rows.
        evt: Selection event; ``evt.index`` is either a row index or a
            list/tuple whose first element is the row index.

    Returns:
        A header (test id, exception line, run metadata) followed by a blank
        line and the stacktrace. When no stacktrace is recorded the exception
        message is used, or ``"(no stacktrace recorded)"``. A placeholder
        prompt is returned when there are no details or the index is unusable.
    """
    placeholder = "Select an error row to see its full stacktrace."
    if not details:
        return placeholder

    selected = evt.index
    if isinstance(selected, (list, tuple)):
        selected = selected[0]
    try:
        record = details[int(selected)]
    except (TypeError, ValueError, IndexError):
        return placeholder

    message = str(record.get("exception_message") or "").strip()
    stacktrace = str(record.get("exception_stacktrace") or "").strip()
    if not stacktrace:
        stacktrace = message or "(no stacktrace recorded)"

    header_lines = [
        str(record.get("test_nodeid") or "").strip(),
        # Trims stray ": " when the type and/or message is blank.
        f"{record.get('exception_type')}: {message}".strip(": "),
        f"date={record.get('date')} run_id={record.get('run_id')} job={record.get('test_job')}",
        f"model={record.get('model') or '(none)'} gpu={record.get('gpu') or '(none)'}",
    ]
    header = "\n".join(line for line in header_lines if line)
    if header:
        return f"{header}\n\n{stacktrace}"
    return stacktrace
def refresh():
    """Reload the telemetry and rebuild every value the dashboard displays.

    Returns:
        An 8-tuple: summary Markdown, bar-plot frame, top-failing-tests table,
        failures-by-exception-type table, failures-by-model table, recent
        errors table, the per-row error details, and the placeholder text for
        the stacktrace box.
    """
    rows = load_test_rows()
    ranked_tests = _top_failing_tests(rows)
    type_counts = _failures_by(rows, "exception_type", "exception_type")
    model_counts = _failures_by(rows, "model", "model")
    recent_table, recent_details = _error_rows(rows)

    # The bar plot takes a tidy frame, so derive it from the ranking table and
    # keep only the last ``::`` segment of the node id as the axis label.
    chart = ranked_tests.head(10).copy()
    if chart.empty:
        chart = pd.DataFrame({"test": [], "failures": []})
    else:
        chart["test"] = chart["test_nodeid"].str.split("::").str[-1]

    return (
        _summary_md(rows),
        chart,
        ranked_tests,
        type_counts,
        model_counts,
        recent_table,
        recent_details,
        "Select an error row to see its full stacktrace.",
    )
# UI definition. Components render in the order they are created inside the
# ``Blocks`` context, so statement order here is the page layout.
with gr.Blocks(title="Transformers CI — common failures") as demo:
    gr.Markdown("# ⚡ Transformers CI — most common test failures")
    gr.Markdown(
        "Built on the public "
        "[`transformers-ci-telemetry`](https://huggingface.co/buckets/huggingface/transformers-ci-telemetry) "
        "bucket — CI test telemetry, refreshed hourly."
    )
    # Headline statistics; filled in by ``refresh``.
    summary = gr.Markdown()
    refresh_btn = gr.Button("↻ Refresh", variant="secondary")
    gr.Markdown("## Top failing tests")
    # ``x``/``y`` name the columns of the plot frame that ``refresh`` returns.
    fail_plot = gr.BarPlot(
        x="test", y="failures", title="Failures by test (top 10)", height=320
    )
    top_tests_tbl = gr.Dataframe(label="Top failing tests", interactive=False)
    # The two breakdown tables share one row.
    with gr.Row():
        by_type_tbl = gr.Dataframe(label="Failures by exception type", interactive=False)
        by_model_tbl = gr.Dataframe(label="Failures by model", interactive=False)
    gr.Markdown("## Recent errors")
    # Holds the per-row detail dicts so the select handler can look up the
    # stacktrace for the clicked row.
    error_details_state = gr.State([])
    error_rows_tbl = gr.Dataframe(label="Errors", interactive=False, wrap=True)
    stacktrace_box = gr.Code(
        label="Full stacktrace",
        language="python",
        interactive=False,
        lines=24,
    )
    # Must stay in the same order as the tuple returned by ``refresh``.
    outputs = [
        summary,
        fail_plot,
        top_tests_tbl,
        by_type_tbl,
        by_model_tbl,
        error_rows_tbl,
        error_details_state,
        stacktrace_box,
    ]
    # Rebuild everything on demand (button) and via the Blocks ``load`` event.
    refresh_btn.click(refresh, outputs=outputs)
    demo.load(refresh, outputs=outputs)
    # Selecting a row in the errors table shows that row's details; the event
    # data is passed to the handler through its ``gr.SelectData`` parameter.
    error_rows_tbl.select(
        _error_stacktrace,
        inputs=error_details_state,
        outputs=stacktrace_box,
    )

# Start the app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()
|