#!/usr/bin/env python3
"""
Fetch and process SGLang nightly test metrics from GitHub Actions artifacts.
This script fetches consolidated metrics from GitHub Actions workflow runs
and outputs them as JSON for the performance dashboard.
Usage:
python fetch_metrics.py --output metrics_data.json
python fetch_metrics.py --output metrics_data.json --days 30
python fetch_metrics.py --output metrics_data.json --run-id 21338741812
"""
import argparse
import io
import json
import os
import sys
import zipfile
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
import requests
# Repository ("owner/name") interpolated into every GitHub REST API URL in this file.
GITHUB_REPO = "sgl-project/sglang"
# Workflow file name whose runs are listed by fetch_workflow_runs().
WORKFLOW_NAME = "nightly-test-nvidia.yml"
# An artifact whose name starts with this prefix is treated as the run's
# consolidated metrics artifact (see fetch_metrics_for_run()).
ARTIFACT_PREFIX = "consolidated-metrics-"
def get_github_token() -> Optional[str]:
    """Get GitHub token from environment or gh CLI.

    Lookup order:
      1. The ``GITHUB_TOKEN`` environment variable (used when non-empty).
      2. The standard output of ``gh auth token``.

    Returns:
        The token string, or ``None`` when neither source yields a
        non-empty token. Failures of the ``gh`` CLI (missing binary,
        non-executable binary, non-zero exit, timeout) are treated as
        "no token" rather than raised.
    """
    # Imported locally because only this fallback path needs it.
    import subprocess

    # Check environment variable first
    token = os.environ.get("GITHUB_TOKEN")
    if token:
        return token

    # Try gh CLI
    try:
        result = subprocess.run(
            ["gh", "auth", "token"],
            capture_output=True,
            text=True,
            check=True,
            timeout=10,  # never let a wedged gh process hang the script
        )
    except (subprocess.SubprocessError, OSError):
        # SubprocessError covers a non-zero exit (CalledProcessError) and a
        # timeout (TimeoutExpired); OSError covers a missing or
        # non-executable gh binary (FileNotFoundError, PermissionError).
        return None

    # An empty/whitespace-only output is not a usable token.
    return result.stdout.strip() or None
def get_headers(token: Optional[str]) -> dict:
    """Build the HTTP headers for a GitHub REST API request.

    The ``Accept`` header is always present. An ``Authorization: Bearer``
    header is added only when ``token`` is truthy.
    """
    auth = {"Authorization": f"Bearer {token}"} if token else {}
    return {"Accept": "application/vnd.github.v3+json", **auth}
def fetch_workflow_runs(
    token: Optional[str],
    days: int = 30,
    event: Optional[str] = None,
) -> list:
    """Fetch completed workflow runs from GitHub Actions.

    Lists runs of ``WORKFLOW_NAME`` in ``GITHUB_REPO`` and keeps those whose
    ``created_at`` is newer than ``days`` days ago. All result pages are
    followed, so runs beyond the first 100 are no longer silently dropped.

    Args:
        token: GitHub token, or ``None`` for unauthenticated requests.
        days: Size of the look-back window, in days.
        event: Optional trigger event to filter on (passed to the API as
            the ``event`` query parameter).

    Returns:
        A list of workflow-run dicts as returned by the GitHub API.

    Raises:
        requests.HTTPError: If any page request returns an error status.
    """
    url = f"https://api.github.com/repos/{GITHUB_REPO}/actions/workflows/{WORKFLOW_NAME}/runs"
    per_page = 100
    # The API documents a cap of 1,000 results when filter parameters such
    # as ``status`` are used, so there is nothing to gain past 10 pages.
    max_pages = 10
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)

    params = {
        "status": "completed",
        "per_page": per_page,
        # Coarse server-side filter (whole-day granularity) so we do not page
        # through the workflow's entire history; the exact cutoff is still
        # applied client-side below.
        "created": f">={cutoff.date().isoformat()}",
    }
    if event:
        params["event"] = event
    headers = get_headers(token)

    runs = []
    for page in range(1, max_pages + 1):
        response = requests.get(
            url,
            headers=headers,
            params={**params, "page": page},
            timeout=30,
        )
        response.raise_for_status()
        batch = response.json().get("workflow_runs", [])

        # Filter by date
        runs.extend(
            run
            for run in batch
            if datetime.fromisoformat(run["created_at"].replace("Z", "+00:00")) > cutoff
        )

        # A short page means there are no further pages to request.
        if len(batch) < per_page:
            break
    return runs
def fetch_run_artifacts(token: Optional[str], run_id: int) -> list:
    """Fetch artifacts for a specific workflow run.

    Follows every result page. Without an explicit ``per_page`` the API
    returns only its default page size, so on a run with many artifacts the
    one we are looking for could be missing from the response.

    Args:
        token: GitHub token, or ``None`` for unauthenticated requests.
        run_id: ID of the workflow run whose artifacts are listed.

    Returns:
        A list of artifact dicts as returned by the GitHub API.

    Raises:
        requests.HTTPError: If any page request returns an error status.
    """
    url = f"https://api.github.com/repos/{GITHUB_REPO}/actions/runs/{run_id}/artifacts"
    headers = get_headers(token)
    per_page = 100

    artifacts = []
    page = 1
    while True:
        response = requests.get(
            url,
            headers=headers,
            params={"per_page": per_page, "page": page},
            timeout=30,
        )
        response.raise_for_status()
        batch = response.json().get("artifacts", [])
        artifacts.extend(batch)

        # A short (or empty) page means this was the last one.
        if len(batch) < per_page:
            break
        page += 1
    return artifacts
def download_artifact(token: Optional[str], artifact_id: int) -> Optional[bytes]:
    """Download an artifact by ID.

    This is a best-effort operation: every failure is reported on stderr
    and signalled by returning ``None`` instead of raising, so one bad
    artifact does not abort a multi-run fetch.

    Args:
        token: GitHub token. Required; without it nothing is requested.
        artifact_id: ID of the artifact to download.

    Returns:
        The raw zip bytes on HTTP 200, otherwise ``None`` (missing token,
        transport error, or any non-200 status).
    """
    if not token:
        print("Warning: GitHub token required to download artifacts", file=sys.stderr)
        return None

    url = f"https://api.github.com/repos/{GITHUB_REPO}/actions/artifacts/{artifact_id}/zip"
    headers = get_headers(token)
    try:
        # The zip endpoint answers with a redirect to the actual download
        # location, hence allow_redirects.
        response = requests.get(url, headers=headers, allow_redirects=True, timeout=60)
    except requests.RequestException as e:
        # Timeouts / connection errors are reported the same way as a bad
        # status code rather than propagating to the caller.
        print(f"Failed to download artifact {artifact_id}: {e}", file=sys.stderr)
        return None

    if response.status_code == 200:
        return response.content

    print(
        f"Failed to download artifact {artifact_id}: {response.status_code}",
        file=sys.stderr,
    )
    return None
def extract_metrics_from_zip(zip_content: bytes) -> Optional[dict]:
    """Extract metrics JSON from a zip file.

    The first archive member (in archive order) whose name ends with
    ``.json`` is parsed.

    Args:
        zip_content: Raw bytes of a zip archive.

    Returns:
        The parsed JSON object as a dict, or ``None`` when the bytes are not
        a valid zip, the archive has no ``.json`` member, the member cannot
        be decoded/parsed, or the parsed value is not a JSON object.
    """
    try:
        with zipfile.ZipFile(io.BytesIO(zip_content)) as zf:
            # Find the JSON file in the archive
            json_files = [f for f in zf.namelist() if f.endswith(".json")]
            if not json_files:
                return None
            with zf.open(json_files[0]) as f:
                metrics = json.load(f)
    except (zipfile.BadZipFile, ValueError) as e:
        # ValueError covers both json.JSONDecodeError and the
        # UnicodeDecodeError raised for a member that is not valid UTF-8/16/32.
        print(f"Failed to extract metrics: {e}", file=sys.stderr)
        return None

    # Callers index the result by string keys; reject lists, scalars, etc.
    if not isinstance(metrics, dict):
        print(
            f"Failed to extract metrics: expected a JSON object, got {type(metrics).__name__}",
            file=sys.stderr,
        )
        return None
    return metrics
def fetch_metrics_for_run(token: Optional[str], run: dict) -> Optional[dict]:
    """Fetch metrics for a single workflow run.

    Locates the run's first artifact whose name starts with
    ``ARTIFACT_PREFIX``, downloads it, parses the JSON inside, and back-fills
    ``run_id``, ``run_date``, ``commit_sha`` and ``branch`` from the run
    record for any of those keys the metrics do not already contain.

    Args:
        token: GitHub token, or ``None``.
        run: Workflow-run dict; ``id`` is always read, while ``created_at``,
            ``head_sha`` and ``head_branch`` are read only when the matching
            metrics key is missing.

    Returns:
        The metrics dict, or ``None`` when no matching artifact exists or
        the download/extraction yields nothing.
    """
    run_id = run["id"]
    print(f"Fetching metrics for run {run_id}...", file=sys.stderr)

    # Find consolidated metrics artifact (first match wins)
    candidates = fetch_run_artifacts(token, run_id)
    metrics_artifact = next(
        (item for item in candidates if item["name"].startswith(ARTIFACT_PREFIX)),
        None,
    )
    if not metrics_artifact:
        print(f"No consolidated metrics found for run {run_id}", file=sys.stderr)
        return None

    # Download and extract; extraction is skipped when the download is empty
    payload = download_artifact(token, metrics_artifact["id"])
    metrics = extract_metrics_from_zip(payload) if payload else None
    if not metrics:
        return None

    # Ensure required fields are present. The run record is only indexed
    # when the corresponding metrics key is actually missing.
    if "run_id" not in metrics:
        metrics["run_id"] = str(run_id)
    fallbacks = (
        ("run_date", "created_at"),
        ("commit_sha", "head_sha"),
        ("branch", "head_branch"),
    )
    for field, run_key in fallbacks:
        if field not in metrics:
            metrics[field] = run[run_key]
    return metrics
def fetch_single_run(token: Optional[str], run_id: int) -> Optional[dict]:
    """Fetch metrics for a single run by ID.

    Retrieves the run record from the GitHub API and hands it to
    ``fetch_metrics_for_run``. An HTTP error status on the run lookup is
    raised via ``raise_for_status``.
    """
    run_url = f"https://api.github.com/repos/{GITHUB_REPO}/actions/runs/{run_id}"
    reply = requests.get(run_url, headers=get_headers(token), timeout=30)
    reply.raise_for_status()
    return fetch_metrics_for_run(token, reply.json())
def _build_arg_parser():
    """Create the command-line parser used by main()."""
    parser = argparse.ArgumentParser(
        description="Fetch SGLang nightly test metrics from GitHub Actions"
    )
    parser.add_argument(
        "--output",
        "-o",
        type=str,
        default="metrics_data.json",
        help="Output JSON file path",
    )
    parser.add_argument(
        "--days",
        type=int,
        default=30,
        help="Number of days to fetch (default: 30)",
    )
    parser.add_argument(
        "--run-id",
        type=int,
        help="Fetch a specific run by ID",
    )
    parser.add_argument(
        "--event",
        type=str,
        choices=["schedule", "workflow_dispatch", "push"],
        help="Filter by trigger event type",
    )
    parser.add_argument(
        "--scheduled-only",
        action="store_true",
        help="Only fetch scheduled (nightly) runs",
    )
    return parser


def main():
    """Command-line entry point: fetch metrics and write them as JSON.

    With ``--run-id`` a single run is fetched; otherwise all runs within the
    ``--days`` window are fetched (``--scheduled-only`` takes precedence
    over ``--event``). Records are written to ``--output`` sorted by
    ``run_date``, newest first. Progress and warnings go to stderr.
    """
    args = _build_arg_parser().parse_args()

    token = get_github_token()
    if not token:
        for notice in (
            "Warning: No GitHub token found. Some features may be limited.",
            "Set GITHUB_TOKEN env var or login with 'gh auth login'",
        ):
            print(notice, file=sys.stderr)

    if args.run_id:
        # Fetch single run
        single = fetch_single_run(token, args.run_id)
        collected = [single] if single else []
    else:
        # Fetch multiple runs
        event = "schedule" if args.scheduled_only else args.event
        runs = fetch_workflow_runs(token, days=args.days, event=event)
        print(f"Found {len(runs)} workflow runs", file=sys.stderr)
        fetched = (fetch_metrics_for_run(token, run) for run in runs)
        collected = [record for record in fetched if record]

    # Sort by date descending
    all_metrics = sorted(
        collected,
        key=lambda record: record.get("run_date", ""),
        reverse=True,
    )

    # Write output
    output_path = Path(args.output)
    with output_path.open("w") as out_file:
        json.dump(all_metrics, out_file, indent=2)
    print(f"Wrote {len(all_metrics)} metrics records to {output_path}", file=sys.stderr)
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
|