| """Transformers CI — most common test failures. |
| |
| A tiny Gradio dashboard over the public `transformers-ci-telemetry` bucket |
| (daily-partitioned Parquet produced by the CI telemetry publisher). It ranks |
| the tests and exception types that fail most often, with a few headline stats. |
| |
| Data location: set ``TELEMETRY_DIR`` to the bucket mount. We otherwise probe a |
| short list of common paths (the Space's bucket mount, the local checkout) and |
| use the first one that actually contains a ``daily/`` tree. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import glob |
| import os |
| from typing import Any |
|
|
| import gradio as gr |
| import pandas as pd |
|
|
| |
| |
| _CANDIDATE_DIRS = [ |
| os.environ.get("TELEMETRY_DIR", ""), |
| "/data/transformers-ci-telemetry", |
| "/data", |
| "/bucket", |
| os.path.join(os.path.dirname(__file__), "data"), |
| "/Users/tarek/Dev/transformers-ci-telemetry", |
| ] |
|
|
|
|
def _telemetry_dir() -> str | None:
    """Pick the telemetry root to read from.

    Walks ``_CANDIDATE_DIRS`` in order and returns the first non-empty entry
    under which ``daily/*/test_rows.parquet`` matches at least one file.

    Returns:
        The matching directory path, or ``None`` when no candidate qualifies.
    """
    partition_glob = ("daily", "*", "test_rows.parquet")
    # The generator is lazy, so probing stops at the first directory that matches.
    return next(
        (
            root
            for root in _CANDIDATE_DIRS
            if root and glob.glob(os.path.join(root, *partition_glob))
        ),
        None,
    )
|
|
|
|
def load_test_rows() -> pd.DataFrame:
    """Concatenate every ``daily/*/test_rows.parquet`` into one frame.

    Partitions are read in sorted path order. A partition that cannot be read
    is skipped (best effort) and reported through the module logger instead of
    being dropped silently.

    Returns:
        One frame with a fresh integer index holding the rows of every readable
        partition, or an empty ``DataFrame`` when no telemetry directory is
        found or no partition could be read.
    """
    import logging  # function-scope import keeps this block self-contained

    base = _telemetry_dir()
    if base is None:
        return pd.DataFrame()

    log = logging.getLogger(__name__)
    frames = []
    for path in sorted(glob.glob(os.path.join(base, "daily", "*", "test_rows.parquet"))):
        try:
            frames.append(pd.read_parquet(path))
        except Exception:
            # Deliberately broad: one corrupt or half-written partition must not
            # take the dashboard down, but the skip should leave a trace.
            log.warning("Skipping unreadable telemetry partition %s", path, exc_info=True)
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)
|
|
|
|
| def _summary_md(df: pd.DataFrame) -> str: |
| if df.empty: |
| return ( |
| "### No data found\n\n" |
| "No `daily/*/test_rows.parquet` under any known bucket path. " |
| "Set `TELEMETRY_DIR` to the mounted bucket." |
| ) |
| total = len(df) |
| failures = int((df["status_code"] == "ERROR").sum()) |
| rate = (failures / total * 100) if total else 0.0 |
| runs = df["run_id"].nunique() |
| days = df["date"].nunique() |
| return ( |
| f"**{total}** test executions across **{runs}** run(s) / **{days}** day(s) · " |
| f"**{failures}** failures · **{rate:.1f}%** failure rate" |
| ) |
|
|
|
|
| def _top_failing_tests(df: pd.DataFrame, limit: int = 20) -> pd.DataFrame: |
| if df.empty: |
| return pd.DataFrame(columns=["test_nodeid", "failures"]) |
| errors = df[df["status_code"] == "ERROR"] |
| if errors.empty: |
| return pd.DataFrame(columns=["test_nodeid", "failures"]) |
| out = ( |
| errors.groupby("test_nodeid") |
| .size() |
| .reset_index(name="failures") |
| .sort_values("failures", ascending=False) |
| .head(limit) |
| .reset_index(drop=True) |
| ) |
| return out |
|
|
|
|
| def _failures_by(df: pd.DataFrame, column: str, label: str) -> pd.DataFrame: |
| cols = [label, "failures"] |
| if df.empty: |
| return pd.DataFrame(columns=cols) |
| errors = df[df["status_code"] == "ERROR"].copy() |
| if errors.empty: |
| return pd.DataFrame(columns=cols) |
| errors[column] = errors[column].fillna("").replace("", "(none)") |
| out = ( |
| errors.groupby(column) |
| .size() |
| .reset_index(name="failures") |
| .sort_values("failures", ascending=False) |
| .reset_index(drop=True) |
| ) |
| return out.rename(columns={column: label}) |
|
|
|
|
| def _error_rows(df: pd.DataFrame, limit: int = 100) -> tuple[pd.DataFrame, list[dict[str, Any]]]: |
| columns = ["date", "test_nodeid", "exception_type", "exception_message"] |
| if df.empty or "status_code" not in df: |
| return pd.DataFrame(columns=columns), [] |
|
|
| errors = df[df["status_code"] == "ERROR"].copy() |
| if errors.empty: |
| return pd.DataFrame(columns=columns), [] |
|
|
| if "date" in errors: |
| errors = errors.sort_values("date", ascending=False) |
| errors = errors.head(limit).reset_index(drop=True) |
|
|
| detail_columns = columns + ["exception_stacktrace", "run_id", "test_job", "model", "gpu"] |
| for column in detail_columns: |
| if column not in errors: |
| errors[column] = "" |
|
|
| details = errors[detail_columns].fillna("").to_dict("records") |
| return errors[columns].fillna(""), details |
|
|
|
|
| def _error_stacktrace(details: list[dict[str, Any]] | None, evt: gr.SelectData) -> str: |
| if not details: |
| return "Select an error row to see its full stacktrace." |
|
|
| row_index = evt.index[0] if isinstance(evt.index, (list, tuple)) else evt.index |
| try: |
| row = details[int(row_index)] |
| except (TypeError, ValueError, IndexError): |
| return "Select an error row to see its full stacktrace." |
|
|
| stacktrace = str(row.get("exception_stacktrace") or "").strip() |
| message = str(row.get("exception_message") or "").strip() |
| if not stacktrace: |
| stacktrace = message or "(no stacktrace recorded)" |
|
|
| header = "\n".join( |
| value |
| for value in [ |
| str(row.get("test_nodeid") or "").strip(), |
| f"{row.get('exception_type')}: {message}".strip(": "), |
| f"date={row.get('date')} run_id={row.get('run_id')} job={row.get('test_job')}", |
| f"model={row.get('model') or '(none)'} gpu={row.get('gpu') or '(none)'}", |
| ] |
| if value |
| ) |
| return f"{header}\n\n{stacktrace}" if header else stacktrace |
|
|
|
|
def refresh():
    """Reload the telemetry and recompute every value the dashboard displays.

    Returns:
        An 8-tuple: summary Markdown, bar-plot frame (top 10 tests, with the
        part of the node id after the last ``::`` as the ``test`` label),
        top-failing-tests table, failures by exception type, failures by model,
        error table, error detail records, and the stacktrace-box prompt.
    """
    rows = load_test_rows()
    ranked_tests = _top_failing_tests(rows)
    per_exception = _failures_by(rows, "exception_type", "exception_type")
    per_model = _failures_by(rows, "model", "model")
    error_table, error_records = _error_rows(rows)

    chart = ranked_tests.head(10).copy()
    if chart.empty:
        # Give the plot its x/y columns even when there is nothing to draw.
        chart = pd.DataFrame({"test": [], "failures": []})
    else:
        chart["test"] = chart["test_nodeid"].str.split("::").str[-1]

    return (
        _summary_md(rows),
        chart,
        ranked_tests,
        per_exception,
        per_model,
        error_table,
        error_records,
        "Select an error row to see its full stacktrace.",
    )
|
|
|
|
# Dashboard layout and event wiring. Components are created in display order,
# so the statement order below is also the top-to-bottom page order.
with gr.Blocks(title="Transformers CI — common failures") as demo:
    gr.Markdown("# ⚡ Transformers CI — most common test failures")
    gr.Markdown(
        "Built on the public "
        "[`transformers-ci-telemetry`](https://huggingface.co/buckets/huggingface/transformers-ci-telemetry) "
        "bucket — CI test telemetry, refreshed hourly."
    )
    # Headline stats; left empty here and filled by ``refresh``.
    summary = gr.Markdown()
    refresh_btn = gr.Button("↻ Refresh", variant="secondary")

    gr.Markdown("## Top failing tests")
    # x/y name the ``test`` and ``failures`` columns of the frame ``refresh`` builds.
    fail_plot = gr.BarPlot(
        x="test", y="failures", title="Failures by test (top 10)", height=320
    )
    top_tests_tbl = gr.Dataframe(label="Top failing tests", interactive=False)

    with gr.Row():
        by_type_tbl = gr.Dataframe(label="Failures by exception type", interactive=False)
        by_model_tbl = gr.Dataframe(label="Failures by model", interactive=False)

    gr.Markdown("## Recent errors")
    # Holds the detail records behind the error table so the select handler
    # can look up the clicked row.
    error_details_state = gr.State([])
    error_rows_tbl = gr.Dataframe(label="Errors", interactive=False, wrap=True)
    stacktrace_box = gr.Code(
        label="Full stacktrace",
        language="python",
        interactive=False,
        lines=24,
    )

    # Order must match the tuple returned by ``refresh`` element for element.
    outputs = [
        summary,
        fail_plot,
        top_tests_tbl,
        by_type_tbl,
        by_model_tbl,
        error_rows_tbl,
        error_details_state,
        stacktrace_box,
    ]
    # The same callback serves the button and the initial load.
    refresh_btn.click(refresh, outputs=outputs)
    demo.load(refresh, outputs=outputs)
    # Selecting a cell in the error table shows that row's stacktrace.
    error_rows_tbl.select(
        _error_stacktrace,
        inputs=error_details_state,
        outputs=stacktrace_box,
    )
|
|
|
|
# Start the app only when this file is executed directly, not when imported.
if __name__ == "__main__":
    demo.launch()
|
|