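"""SGLang CI analyzer for the sgl-project/sglang repository.

Fetches GitHub Actions workflow runs, aggregates per-job failure statistics,
extracts performance metrics from nightly job logs, and emits console reports
plus GitHub Actions step summaries.

Example invocations (script filename and token are placeholders):

    python ci_analyzer.py --token <GITHUB_TOKEN> --mode ci --limit 100
    python ci_analyzer.py --token <GITHUB_TOKEN> --mode nightly --days 2
"""
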
import argparse
import base64
import json
import os
import re
import sys
import time
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional

import requests


class SGLangCIAnalyzer:
    """Analyzes CI and nightly-test runs for the sgl-project/sglang repository."""

    def __init__(self, token: str):
        self.token = token
        self.base_url = "https://api.github.com"
        self.repo = "sgl-project/sglang"
        self.headers = {
            "Authorization": f"token {token}",
            "Accept": "application/vnd.github.v3+json",
            "User-Agent": "SGLang-CI-Analyzer/1.0",
        }
        self.session = requests.Session()
        self.session.headers.update(self.headers)
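        # Note: authenticating with a token raises the GitHub REST API rate
        # limit to 5,000 requests/hour; the time.sleep(0.1) calls between
        # paginated requests in this class add a light throttle on top.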

        # Nightly test workflow files to monitor
        self.nightly_workflows = [
            "nightly-test-nvidia.yml",
            "nightly-test-amd.yml",
            "nightly-test-intel.yml",
        ]

        # Regex patterns for pulling performance metrics out of job logs
        self.perf_patterns = {
            "output_throughput": re.compile(
                r"Output token throughput \(tok/s\):\s*([\d.]+)"
            ),
            "input_throughput": re.compile(
                r"Input token throughput \(tok/s\):\s*([\d.]+)"
            ),
            "latency": re.compile(r"Median E2E Latency \(ms\):\s*([\d.]+)"),
            "ttft": re.compile(r"Median TTFT \(ms\):\s*([\d.]+)"),
            "accept_length": re.compile(r"Accept length:\s*([\d.]+)"),
            "accuracy": re.compile(r"Accuracy:\s*([\d.]+)"),
            "gsm8k_score": re.compile(r"GSM8K Score:\s*([\d.]+)"),
        }
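        # Illustrative lines the patterns above match (shapes inferred from
        # the regexes themselves, not copied from real CI logs):
        #   Output token throughput (tok/s): 1234.56
        #   Median TTFT (ms): 45.3
        #   GSM8K Score: 0.79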

        # Data repository and branch used for storing CI history
        self.data_repo = "sglang-bot/sglang-ci-data"
        self.data_branch = "main"

    def get_recent_runs(
        self, limit: int = 100, branch: Optional[str] = None
    ) -> List[Dict]:
        """Fetch up to `limit` recent workflow runs, optionally filtered by branch."""
        branch_info = f" from branch '{branch}'" if branch else ""
        print(f"Fetching {limit} recent CI runs{branch_info}...")

        all_runs = []
        page = 1
        per_page = 100

        while len(all_runs) < limit:
            url = f"{self.base_url}/repos/{self.repo}/actions/runs"
            params = {"per_page": min(per_page, limit - len(all_runs)), "page": page}
            if branch:
                params["branch"] = branch

            try:
                response = self.session.get(url, params=params)
                response.raise_for_status()
                data = response.json()

                if not data.get("workflow_runs"):
                    break

                all_runs.extend(data["workflow_runs"])
                print(f"Fetched {len(all_runs)} runs so far...")

                if len(data["workflow_runs"]) < per_page:
                    break

                page += 1
                time.sleep(0.1)

            except requests.exceptions.RequestException as e:
                print(f"Error fetching CI data: {e}")
                break

        return all_runs[:limit]

    def analyze_ci_failures(self, runs: List[Dict]) -> Dict:
        """Aggregate failure statistics and metrics for the monitored CI jobs."""
        print(
            "Analyzing CI failure data (pr-test.yml, quantization-test.yml, "
            "nightly-test.yml jobs only)..."
        )

        job_categories = {
            "build": [
                "build-test",
                "sgl-kernel-build-wheels",
            ],
            "unit-test": [
                "stage-a-test-1",
                "unit-test-backend-1-gpu",
                "unit-test-backend-2-gpu",
                "stage-b-test-4-gpu-b200",
                "unit-test-backend-4-gpu",
                "unit-test-backend-8-gpu",
            ],
            "performance": [
                "performance-test-1-gpu-part-1",
                "performance-test-1-gpu-part-2",
                "performance-test-1-gpu-part-3",
                "performance-test-2-gpu",
            ],
            "accuracy": [
                "accuracy-test-1-gpu",
                "accuracy-test-2-gpu",
            ],
            "mla-test": [
                "sgl-kernel-mla-test",
            ],
            "deepep": [
                "unit-test-deepep-4-gpu",
                "unit-test-deepep-8-gpu",
            ],
            "per-commit": [
                "per-commit-8-gpu-h20",
            ],
            "nightly": [
                "nightly-test-general-1-gpu-runner",
                "nightly-test-general-4-gpu-h100",
                "nightly-test-general-8-gpu-h200",
                "nightly-test-general-8-gpu-h20",
                "nightly-test-general-8-gpu-b200",
                "nightly-test-text-accuracy-2-gpu-runner",
                "nightly-test-text-perf-2-gpu-runner",
                "nightly-test-vlm-accuracy-2-gpu-runner",
                "nightly-test-vlm-perf-2-gpu-runner",
                "nightly-test-perf-4-gpu-b200",
                "nightly-test-perf-8-gpu-b200",
                "nightly-test",
            ],
            "integration": [
                "run-all-notebooks",
                "quantization-test",
                "test-disaggregation",
            ],
            "b200": [
                "unit-test-backend-4-gpu-b200",
            ],
            "gb200": [
                "unit-test-backend-4-gpu-gb200",
            ],
        }

        stats = {
            "total_runs": len(runs),
            "failed_runs": 0,
            "successful_runs": 0,
            "cancelled_runs": 0,
            "skipped_runs": 0,
            "category_failures": defaultdict(int),
            "job_failures": defaultdict(int),
            "failure_patterns": defaultdict(int),
            # Up to three example failing runs per job
            "job_failure_links": defaultdict(list),
            "job_last_success": {},
            # job -> metric -> list of {value, timestamp, run_id, run_url}
            "performance_metrics": defaultdict(lambda: defaultdict(list)),
        }

        # Jobs tracked for per-job bookkeeping; built once here instead of
        # being rebuilt for every job of every run
        target_jobs = [
            "check-changes",
            "sgl-kernel-build-wheels",
            "sgl-kernel-unit-test",
            "sgl-kernel-mla-test",
            "sgl-kernel-benchmark-test",
            "stage-a-test-1",
            "unit-test-backend-1-gpu",
            "unit-test-backend-2-gpu",
            "stage-b-test-4-gpu-b200",
            "unit-test-backend-4-gpu",
            "unit-test-backend-8-gpu-h200",
            "unit-test-backend-8-gpu-h20",
            "performance-test-1-gpu-part-1",
            "performance-test-1-gpu-part-2",
            "performance-test-1-gpu-part-3",
            "performance-test-2-gpu",
            "accuracy-test-1-gpu",
            "accuracy-test-2-gpu",
            "unit-test-deepep-4-gpu",
            "unit-test-deepep-8-gpu",
            "unit-test-backend-8-gpu-deepseek-v32",
            "unit-test-backend-4-gpu-b200",
            "unit-test-backend-4-gpu-gb200",
            "quantization-test",
            # nightly-test.yml jobs
            "nightly-test-general-1-gpu-runner",
            "nightly-test-general-4-gpu-h100",
            "nightly-test-general-8-gpu-h200",
            "nightly-test-general-8-gpu-h20",
            "nightly-test-general-8-gpu-b200",
            "nightly-test-text-accuracy-2-gpu-runner",
            "nightly-test-text-perf-2-gpu-runner",
            "nightly-test-vlm-accuracy-2-gpu-runner",
            "nightly-test-vlm-perf-2-gpu-runner",
            "nightly-test-perf-4-gpu-b200",
            "nightly-test-perf-8-gpu-b200",
            "nightly-test",
        ]

        total_runs = len(runs)
        for i, run in enumerate(runs, 1):
            if i % max(1, min(50, total_runs // 10)) == 0 or i == total_runs:
                progress = (i / total_runs) * 100
                print(f"Progress: {i}/{total_runs} ({progress:.1f}%)")

            run_status = run.get("conclusion", "unknown")
            workflow_name = run.get("name", "Unknown")
            run_id = run.get("id")
            run_number = run.get("run_number")
            created_at = run.get("created_at")

            if run_status == "failure":
                stats["failed_runs"] += 1
            elif run_status == "success":
                stats["successful_runs"] += 1
            elif run_status == "cancelled":
                stats["cancelled_runs"] += 1
            elif run_status == "skipped":
                stats["skipped_runs"] += 1

            jobs = self._get_job_details(run_id)
            run_url = f"https://github.com/{self.repo}/actions/runs/{run_id}"
            pr_info = self._get_pr_info(run)

            for job in jobs:
                job_name = job.get("name", "Unknown")
                job_conclusion = job.get("conclusion", "unknown")

                if job_name in target_jobs:
                    if job_conclusion == "success":
                        stats["job_last_success"][job_name] = {
                            "url": run_url,
                            "run_number": run_number,
                            "created_at": created_at,
                            "pr_info": pr_info,
                        }

                        # Collect performance metrics from successful nightly
                        # perf/accuracy/eval jobs
                        if job_name in job_categories["nightly"] and (
                            "perf" in job_name.lower()
                            or "accuracy" in job_name.lower()
                            or "eval" in job_name.lower()
                        ):
                            job_id = job.get("id")
                            logs = self.get_job_logs(job_id)
                            if logs:
                                metrics = self.parse_metrics_from_logs(logs, job_name)
                                for metric_name, values in metrics.items():
                                    for value in values:
                                        stats["performance_metrics"][job_name][
                                            metric_name
                                        ].append(
                                            {
                                                "value": value,
                                                "timestamp": created_at,
                                                "run_id": run_id,
                                                "run_url": run_url,
                                            }
                                        )

                    elif job_conclusion == "failure":
                        stats["job_failures"][job_name] += 1

                        # Keep links to at most three failing runs per job
                        if len(stats["job_failure_links"][job_name]) < 3:
                            stats["job_failure_links"][job_name].append(
                                {
                                    "url": run_url,
                                    "run_number": run_number,
                                    "created_at": created_at,
                                    "pr_info": pr_info,
                                }
                            )

                        for category, jobs_list in job_categories.items():
                            if any(
                                job_pattern in job_name for job_pattern in jobs_list
                            ):
                                stats["category_failures"][category] += 1
                                break

                        self._analyze_failure_pattern(job, stats)

            time.sleep(0.1)

        return stats

    def _get_job_details(self, run_id: int) -> List[Dict]:
        """Fetch the job list for a workflow run (first page only)."""
        url = f"{self.base_url}/repos/{self.repo}/actions/runs/{run_id}/jobs"
        try:
            response = self.session.get(url)
            response.raise_for_status()
            return response.json().get("jobs", [])
        except requests.exceptions.RequestException:
            return []

    def _get_pr_info(self, run: Dict) -> Dict:
        """Extract PR number, author, and commit info from a workflow run."""
        # head_commit (and its author) can be null in the API response
        head_commit = run.get("head_commit") or {}
        author = (head_commit.get("author") or {}).get("name", "Unknown")
        pr_info = {
            "pr_number": None,
            "author": author,
            "head_sha": run.get("head_sha", ""),
            "head_branch": run.get("head_branch", ""),
        }

        pull_requests = run.get("pull_requests", [])
        if pull_requests:
            pr_info["pr_number"] = pull_requests[0].get("number")

        return pr_info

    def _analyze_failure_pattern(self, job: Dict, stats: Dict):
        """Classify each failed step into a coarse failure-pattern bucket."""
        job_name = job.get("name", "").lower()
        steps = job.get("steps", [])

        for step in steps:
            if step.get("conclusion") != "failure":
                continue
            step_name = step.get("name", "").lower()

            if "timeout" in step_name:
                stats["failure_patterns"]["Timeout"] += 1
            elif "build" in step_name or "build" in job_name:
                stats["failure_patterns"]["Build Failure"] += 1
            elif "install" in step_name or "dependency" in job_name:
                stats["failure_patterns"]["Dependency Installation Failure"] += 1
            elif "unit" in job_name:
                stats["failure_patterns"]["Unit Test Failure"] += 1
            elif "perf" in job_name:
                stats["failure_patterns"]["Performance Test Failure"] += 1
            elif "accuracy" in job_name:
                stats["failure_patterns"]["Accuracy Test Failure"] += 1
            elif "mla" in job_name:
                stats["failure_patterns"]["MLA Test Failure"] += 1
            elif "deepep" in job_name:
                stats["failure_patterns"]["DeepEP Test Failure"] += 1
            elif "nightly" in job_name:
                stats["failure_patterns"]["Nightly Test Failure"] += 1
            elif "notebook" in job_name:
                stats["failure_patterns"]["Notebook Test Failure"] += 1
            elif "disaggregation" in job_name:
                stats["failure_patterns"]["Disaggregation Test Failure"] += 1
            elif "h20" in job_name or "h200" in job_name:
                stats["failure_patterns"]["H20/H200 GPU Failure"] += 1
            elif "b200" in job_name:
                stats["failure_patterns"]["B200 GPU Failure"] += 1
            elif "gpu" in job_name:
                stats["failure_patterns"]["GPU Related Failure"] += 1
            else:
                stats["failure_patterns"]["Other"] += 1

    def generate_report(self, stats: Dict):
        """Print a human-readable CI analysis report to stdout."""
        print("\n" + "=" * 60)
        print("SGLang CI Analysis Report (Target Workflows Only)")
        print("=" * 60)

        total = stats["total_runs"]
        failed = stats["failed_runs"]
        success = stats["successful_runs"]
        cancelled = stats["cancelled_runs"]
        skipped = stats["skipped_runs"]
        success_rate = (success / total * 100) if total > 0 else 0

        print("\nOverall Statistics:")
        print(f"  Total runs: {total}")
        print(f"  Successful: {success}")
        print(f"  Failed: {failed}")
        print(f"  Cancelled: {cancelled}")
        print(f"  Skipped: {skipped}")
        print(f"  Success rate: {success_rate:.1f}%")

        if stats["category_failures"]:
            print("\nCategory Failure Statistics:")
            for category, count in sorted(
                stats["category_failures"].items(), key=lambda x: x[1], reverse=True
            ):
                print(f"  {category}: {count} failures")

        if stats["job_failures"]:
            print("\nMost Frequently Failed Jobs (Top 50):")
            for i, (job, count) in enumerate(
                sorted(stats["job_failures"].items(), key=lambda x: x[1], reverse=True)[
                    :50
                ],
                1,
            ):
                print(f"  {i:2d}. {job}: {count} times")

                if job in stats["job_last_success"]:
                    last_success = stats["job_last_success"][job]
                    success_date = datetime.fromisoformat(
                        last_success["created_at"].replace("Z", "+00:00")
                    )
                    pr_info = last_success["pr_info"]

                    if pr_info["pr_number"]:
                        pr_text = f" (PR #{pr_info['pr_number']} by {pr_info['author']})"
                    else:
                        pr_text = f" by {pr_info['author']}"

                    print(
                        f"      Last Success: Run #{last_success['run_number']} "
                        f"({success_date.strftime('%Y-%m-%d %H:%M')}){pr_text}: "
                        f"{last_success['url']}"
                    )

                if (
                    job in stats["job_failure_links"]
                    and stats["job_failure_links"][job]
                ):
                    print("      Recent Failures:")
                    for link_info in stats["job_failure_links"][job]:
                        created_at = datetime.fromisoformat(
                            link_info["created_at"].replace("Z", "+00:00")
                        )

                        pr_info = link_info.get("pr_info", {})
                        if pr_info.get("pr_number"):
                            pr_text = (
                                f" (PR #{pr_info['pr_number']} "
                                f"by {pr_info.get('author', 'Unknown')})"
                            )
                        else:
                            pr_text = f" by {pr_info.get('author', 'Unknown')}"

                        print(
                            f"        - Run #{link_info['run_number']} "
                            f"({created_at.strftime('%Y-%m-%d %H:%M')}){pr_text}: "
                            f"{link_info['url']}"
                        )

        if stats["failure_patterns"]:
            print("\nFailure Pattern Analysis:")
            for pattern, count in sorted(
                stats["failure_patterns"].items(), key=lambda x: x[1], reverse=True
            ):
                print(f"  {pattern}: {count} times")

        print("\n" + "=" * 60)

    def save_detailed_report(self, stats: Dict, output_file: str = "ci_analysis.json"):
        """Write the raw stats dictionary to a JSON file."""
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(stats, f, ensure_ascii=False, indent=2)
        print(f"\nDetailed report saved to: {output_file}")

    def generate_github_summary(self, stats: Dict):
        """Write the CI analysis as a GitHub Actions step summary, if available."""
        try:
            github_step_summary = os.environ.get("GITHUB_STEP_SUMMARY")
            if not github_step_summary:
                print("Not running in GitHub Actions, skipping summary generation")
                return

            print("Generating GitHub Actions summary for CI Analysis...")

            summary_lines = []
            summary_lines.append("# SGLang CI Analysis Report (Target Workflows Only)")
            summary_lines.append("")

            total = stats["total_runs"]
            failed = stats["failed_runs"]
            success = stats["successful_runs"]
            cancelled = stats["cancelled_runs"]
            skipped = stats["skipped_runs"]
            success_rate = (success / total * 100) if total > 0 else 0

            summary_lines.append("## Overall Statistics")
            summary_lines.append("")
            summary_lines.append("| Metric | Count | Percentage |")
            summary_lines.append("|--------|-------|------------|")
            summary_lines.append(f"| Total Runs | {total} | 100% |")
            summary_lines.append(
                f"| Successful | {success} | {success / max(1, total) * 100:.1f}% |"
            )
            summary_lines.append(
                f"| Failed | {failed} | {failed / max(1, total) * 100:.1f}% |"
            )
            summary_lines.append(
                f"| Cancelled | {cancelled} | {cancelled / max(1, total) * 100:.1f}% |"
            )
            summary_lines.append(
                f"| Skipped | {skipped} | {skipped / max(1, total) * 100:.1f}% |"
            )
            summary_lines.append(f"| **Success Rate** | **{success_rate:.1f}%** | - |")
            summary_lines.append("")

            if stats["category_failures"]:
                summary_lines.append("## Category Failure Statistics")
                summary_lines.append("")
                summary_lines.append("| Category | Failures |")
                summary_lines.append("|----------|----------|")
                for category, count in sorted(
                    stats["category_failures"].items(), key=lambda x: x[1], reverse=True
                ):
                    summary_lines.append(f"| {category} | {count} |")
                summary_lines.append("")

            if stats["job_failures"]:
                summary_lines.append("## Most Frequently Failed Jobs (Top 20)")
                summary_lines.append("")

                top_failures = sorted(
                    stats["job_failures"].items(), key=lambda x: x[1], reverse=True
                )[:20]

                for i, (job, count) in enumerate(top_failures, 1):
                    summary_lines.append(f"### {i}. `{job}` ({count} failures)")
                    summary_lines.append("")

                    if job in stats["job_last_success"]:
                        last_success = stats["job_last_success"][job]
                        success_date = datetime.fromisoformat(
                            last_success["created_at"].replace("Z", "+00:00")
                        )
                        pr_info = last_success["pr_info"]

                        if pr_info["pr_number"]:
                            pr_text = (
                                f" (PR #{pr_info['pr_number']} by {pr_info['author']})"
                            )
                        else:
                            pr_text = f" by {pr_info['author']}"

                        summary_lines.append(
                            f"**Last Success:** "
                            f"[Run #{last_success['run_number']}]({last_success['url']}) "
                            f"({success_date.strftime('%Y-%m-%d %H:%M')}){pr_text}"
                        )
                        summary_lines.append("")

                    if (
                        job in stats["job_failure_links"]
                        and stats["job_failure_links"][job]
                    ):
                        summary_lines.append("**Recent Failures:**")
                        for link_info in stats["job_failure_links"][job]:
                            created_at = datetime.fromisoformat(
                                link_info["created_at"].replace("Z", "+00:00")
                            )

                            pr_info = link_info.get("pr_info", {})
                            if pr_info.get("pr_number"):
                                pr_text = (
                                    f" (PR #{pr_info['pr_number']} "
                                    f"by {pr_info.get('author', 'Unknown')})"
                                )
                            else:
                                pr_text = f" by {pr_info.get('author', 'Unknown')}"

                            summary_lines.append(
                                f"- [Run #{link_info['run_number']}]({link_info['url']}) "
                                f"({created_at.strftime('%Y-%m-%d %H:%M')}){pr_text}"
                            )
                        summary_lines.append("")

            if stats["failure_patterns"]:
                summary_lines.append("## Failure Pattern Analysis")
                summary_lines.append("")
                summary_lines.append("| Pattern | Count |")
                summary_lines.append("|---------|-------|")
                for pattern, count in sorted(
                    stats["failure_patterns"].items(), key=lambda x: x[1], reverse=True
                ):
                    summary_lines.append(f"| {pattern} | {count} |")
                summary_lines.append("")

            # Performance metrics extracted from nightly job logs
            if stats.get("performance_metrics"):
                summary_lines.append("## Nightly Test Performance Metrics")
                summary_lines.append("")
                summary_lines.append("| Job | Metric | Avg Value | Count | Trend |")
                summary_lines.append("|-----|--------|-----------|-------|-------|")

                for job_name in sorted(stats["performance_metrics"].keys()):
                    job_metrics = stats["performance_metrics"][job_name]
                    for metric_name in sorted(job_metrics.keys()):
                        metric_data = job_metrics[metric_name]
                        if not metric_data:
                            continue

                        values = [m["value"] for m in metric_data]
                        avg_value = sum(values) / len(values)
                        count = len(values)

                        # Rough trend: compare the average of the first half of
                        # the samples against the second half
                        trend_indicator = "➡️"
                        if len(values) >= 4:
                            first_half = values[: len(values) // 2]
                            second_half = values[len(values) // 2 :]
                            first_avg = sum(first_half) / len(first_half)
                            second_avg = sum(second_half) / len(second_half)

                            if first_avg > 0:
                                change_pct = (
                                    (second_avg - first_avg) / first_avg
                                ) * 100

                                # Direction matters: for throughput higher is
                                # better; for latency and TTFT lower is better
                                if "throughput" in metric_name.lower():
                                    if change_pct > 10:
                                        trend_indicator = f"📈 +{change_pct:.1f}%"
                                    elif change_pct < -10:
                                        trend_indicator = f"⚠️ 📉 {change_pct:.1f}%"
                                    else:
                                        trend_indicator = f"➡️ {change_pct:+.1f}%"
                                elif (
                                    "latency" in metric_name.lower()
                                    or "ttft" in metric_name.lower()
                                ):
                                    if change_pct < -10:
                                        trend_indicator = f"📈 {change_pct:.1f}%"
                                    elif change_pct > 10:
                                        trend_indicator = f"⚠️ 📉 +{change_pct:.1f}%"
                                    else:
                                        trend_indicator = f"➡️ {change_pct:+.1f}%"
                                else:
                                    trend_indicator = f"➡️ {change_pct:+.1f}%"

                        summary_lines.append(
                            f"| {job_name} | {metric_name} | {avg_value:.2f} | "
                            f"{count} | {trend_indicator} |"
                        )

                summary_lines.append("")

            with open(github_step_summary, "w", encoding="utf-8") as f:
                f.write("\n".join(summary_lines))
                f.write("\n\n---\n\n")

            print("GitHub Actions summary generated successfully")

        except Exception as e:
            print(f"Failed to generate GitHub Actions summary: {e}")

    def get_nightly_runs(self, days: int = 2) -> List[Dict]:
        """Get nightly test workflow runs from the last N days."""
        print(f"Fetching nightly test runs from the last {days} days...")

        since_date = (datetime.now() - timedelta(days=days)).isoformat()
        all_runs = []

        for workflow_file in self.nightly_workflows:
            print(f"  Fetching from {workflow_file}...")
            page = 1
            per_page = 10
            workflow_runs = []
            # Assume at most roughly five runs per day per workflow
            max_runs_per_workflow = days * 5

            while len(workflow_runs) < max_runs_per_workflow:
                # Use the per-workflow runs endpoint; the generic /actions/runs
                # endpoint does not accept a workflow_id filter
                url = (
                    f"{self.base_url}/repos/{self.repo}/actions/workflows/"
                    f"{workflow_file}/runs"
                )
                params = {
                    "per_page": per_page,
                    "page": page,
                    "created": f">={since_date}",
                }

                try:
                    response = self.session.get(url, params=params)
                    response.raise_for_status()
                    data = response.json()

                    if not data.get("workflow_runs"):
                        break

                    runs = data["workflow_runs"]
                    workflow_runs.extend(runs)

                    if len(runs) < per_page:
                        break

                    page += 1
                    time.sleep(0.1)

                except requests.exceptions.RequestException as e:
                    print(f"  Warning: Error fetching from {workflow_file}: {e}")
                    break

            print(f"  Fetched {len(workflow_runs)} runs from {workflow_file}")
            all_runs.extend(workflow_runs)

        print(f"Total nightly runs fetched: {len(all_runs)}")
        return all_runs

    def get_job_logs(self, job_id: int) -> Optional[str]:
        """Get logs for a specific job."""
        url = f"{self.base_url}/repos/{self.repo}/actions/jobs/{job_id}/logs"
        try:
            response = self.session.get(url)
            response.raise_for_status()
            return response.text
        except requests.exceptions.RequestException as e:
            print(f"  Warning: Could not fetch logs for job {job_id}: {e}")
            return None

    def parse_metrics_from_logs(
        self, logs: str, job_name: str
    ) -> Dict[str, List[float]]:
        """Parse performance metrics from job logs."""
        metrics = defaultdict(list)

        if not logs:
            return dict(metrics)

        for line in logs.split("\n"):
            for metric_name, pattern in self.perf_patterns.items():
                match = pattern.search(line)
                if match:
                    try:
                        metrics[metric_name].append(float(match.group(1)))
                    except (ValueError, IndexError):
                        continue

        return dict(metrics)

    def analyze_nightly_with_metrics(self, runs: List[Dict]) -> Dict:
        """Analyze nightly test runs including performance metrics."""
        print("Analyzing nightly test data with performance metrics...")

        # Nightly job names to track
        nightly_jobs = [
            "nightly-test-general-1-gpu-runner",
            "nightly-test-general-4-gpu-h100",
            "nightly-test-general-8-gpu-h200",
            "nightly-test-general-8-gpu-h20",
            "nightly-test-general-8-gpu-b200",
            "nightly-test-text-accuracy-2-gpu-runner",
            "nightly-test-text-perf-2-gpu-runner",
            "nightly-test-vlm-accuracy-2-gpu-runner",
            "nightly-test-vlm-perf-2-gpu-runner",
            "nightly-test-perf-4-gpu-b200",
            "nightly-test-perf-8-gpu-b200",
            "nightly-test",
            "placeholder",
        ]

        stats = {
            "total_runs": len(runs),
            "successful_runs": 0,
            "failed_runs": 0,
            "cancelled_runs": 0,
            "job_stats": defaultdict(
                lambda: {
                    "total": 0,
                    "success": 0,
                    "failure": 0,
                    "recent_failures": [],
                    "avg_duration_minutes": 0,
                    "durations": [],
                    "performance_metrics": defaultdict(list),
                }
            ),
            "daily_stats": defaultdict(
                lambda: {
                    "total": 0,
                    "success": 0,
                    "failure": 0,
                }
            ),
        }

        for i, run in enumerate(runs, 1):
            if i % 10 == 0:
                print(f"Processed {i}/{len(runs)} runs...")

            run_status = run.get("conclusion", "unknown")
            run_id = run.get("id")
            run_number = run.get("run_number")
            created_at = run.get("created_at")
            run_url = f"https://github.com/{self.repo}/actions/runs/{run_id}"

            # Bucket runs by calendar day for the daily trend report
            date_str = created_at.split("T")[0] if created_at else "unknown"
            stats["daily_stats"][date_str]["total"] += 1

            if run_status == "success":
                stats["successful_runs"] += 1
                stats["daily_stats"][date_str]["success"] += 1
            elif run_status == "failure":
                stats["failed_runs"] += 1
                stats["daily_stats"][date_str]["failure"] += 1
            elif run_status == "cancelled":
                stats["cancelled_runs"] += 1

            # Inspect individual jobs within the run
            jobs = self._get_job_details(run_id)
            for job in jobs:
                job_name = job.get("name", "Unknown")
                job_conclusion = job.get("conclusion", "unknown")
                job_id = job.get("id")
                started_at = job.get("started_at")
                completed_at = job.get("completed_at")

                # Skip jobs that are not in the tracked nightly job list
                if job_name not in nightly_jobs:
                    continue

                job_stat = stats["job_stats"][job_name]
                job_stat["total"] += 1

                if job_conclusion == "success":
                    job_stat["success"] += 1

                    # Parse performance metrics from perf/accuracy/eval jobs
                    if (
                        "perf" in job_name.lower()
                        or "accuracy" in job_name.lower()
                        or "eval" in job_name.lower()
                    ):
                        logs = self.get_job_logs(job_id)
                        if logs:
                            metrics = self.parse_metrics_from_logs(logs, job_name)
                            for metric_name, values in metrics.items():
                                job_stat["performance_metrics"][metric_name].extend(
                                    [
                                        {
                                            "value": v,
                                            "timestamp": created_at,
                                            "run_id": run_id,
                                            "job_name": job_name,
                                        }
                                        for v in values
                                    ]
                                )

                elif job_conclusion == "failure":
                    job_stat["failure"] += 1

                    if len(job_stat["recent_failures"]) < 5:
                        job_stat["recent_failures"].append(
                            {
                                "run_url": run_url,
                                "run_number": run_number,
                                "created_at": created_at,
                                "job_url": job.get("html_url"),
                            }
                        )

                # Track job duration
                if started_at and completed_at:
                    try:
                        start = datetime.fromisoformat(
                            started_at.replace("Z", "+00:00")
                        )
                        end = datetime.fromisoformat(
                            completed_at.replace("Z", "+00:00")
                        )
                        duration_minutes = (end - start).total_seconds() / 60
                        job_stat["durations"].append(duration_minutes)
                    except ValueError:
                        pass

            time.sleep(0.1)

        # Compute average durations and drop the raw duration samples
        for job_name, job_stat in stats["job_stats"].items():
            if job_stat["durations"]:
                job_stat["avg_duration_minutes"] = sum(job_stat["durations"]) / len(
                    job_stat["durations"]
                )
            del job_stat["durations"]

        return stats

    def generate_nightly_report(self, stats: Dict, output_file: Optional[str] = None):
        """Generate a report for nightly test analysis."""
        print("\n" + "=" * 80)
        print("NIGHTLY TEST MONITOR REPORT")
        print("=" * 80)
        print(f"Report Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"Total Runs Analyzed: {stats['total_runs']}")
        print(
            f"Successful: {stats['successful_runs']} "
            f"({stats['successful_runs'] / max(1, stats['total_runs']) * 100:.1f}%)"
        )
        print(
            f"Failed: {stats['failed_runs']} "
            f"({stats['failed_runs'] / max(1, stats['total_runs']) * 100:.1f}%)"
        )
        print(f"Cancelled: {stats['cancelled_runs']}")
        print("=" * 80)

        # Daily trends for the most recent seven days
        print("\nDAILY TRENDS:")
        print("-" * 80)
        daily_stats = sorted(stats["daily_stats"].items(), reverse=True)[:7]
        for date, day_stats in daily_stats:
            success_rate = (day_stats["success"] / max(1, day_stats["total"])) * 100
            print(
                f"{date}: {day_stats['total']} runs, {day_stats['success']} success "
                f"({success_rate:.1f}%), {day_stats['failure']} failed"
            )

        # Per-job statistics, sorted by failure count
        print("\nJOB STATISTICS:")
        print("-" * 80)
        print(
            f"{'Job Name':<50} {'Total':<8} {'Success':<8} {'Failed':<8} "
            f"{'Rate':<8} {'Avg Duration'}"
        )
        print("-" * 80)

        job_stats_sorted = sorted(
            stats["job_stats"].items(), key=lambda x: x[1]["failure"], reverse=True
        )

        for job_name, job_stat in job_stats_sorted:
            total = job_stat["total"]
            success = job_stat["success"]
            failure = job_stat["failure"]
            success_rate = (success / max(1, total)) * 100
            avg_duration = job_stat["avg_duration_minutes"]

            print(
                f"{job_name:<50} {total:<8} {success:<8} {failure:<8} "
                f"{success_rate:>6.1f}% {avg_duration:>7.1f}m"
            )

            # Average of each collected performance metric
            if job_stat.get("performance_metrics"):
                perf_metrics = job_stat["performance_metrics"]
                print("  Performance metrics:")
                for metric_name, metric_data in perf_metrics.items():
                    if metric_data:
                        values = [m["value"] for m in metric_data]
                        avg_value = sum(values) / len(values)
                        print(f"    - {metric_name}: {avg_value:.2f} (n={len(values)})")

            # Up to three recent failing runs
            if job_stat["recent_failures"]:
                print("  Recent failures:")
                for failure_info in job_stat["recent_failures"][:3]:
                    print(
                        f"    - Run #{failure_info['run_number']}: "
                        f"{failure_info['run_url']}"
                    )

        print("=" * 80)

        # Optionally persist the raw stats as JSON
        if output_file:
            with open(output_file, "w") as f:
                json.dump(stats, f, indent=2, default=str)
            print(f"\nDetailed stats saved to: {output_file}")

    def generate_nightly_github_summary(self, stats: Dict):
        """Generate GitHub Actions summary for nightly test analysis."""
        try:
            github_step_summary = os.environ.get("GITHUB_STEP_SUMMARY")
            if not github_step_summary:
                print(
                    "Not running in GitHub Actions, skipping nightly summary generation"
                )
                return

            print("Generating GitHub Actions summary for Nightly Analysis...")

            summary_lines = []
            summary_lines.append("# Nightly Test Monitor Report")
            summary_lines.append("")
            summary_lines.append(
                f"**Report Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
            )
            summary_lines.append("")

            # Overall statistics
            total = stats["total_runs"]
            success = stats["successful_runs"]
            failed = stats["failed_runs"]
            cancelled = stats["cancelled_runs"]

            summary_lines.append("## Overall Statistics")
            summary_lines.append("")
            summary_lines.append("| Metric | Count | Percentage |")
            summary_lines.append("|--------|-------|------------|")
            summary_lines.append(f"| Total Runs | {total} | 100% |")
            summary_lines.append(
                f"| Successful | {success} | {success / max(1, total) * 100:.1f}% |"
            )
            summary_lines.append(
                f"| Failed | {failed} | {failed / max(1, total) * 100:.1f}% |"
            )
            summary_lines.append(
                f"| Cancelled | {cancelled} | {cancelled / max(1, total) * 100:.1f}% |"
            )
            summary_lines.append("")

            # Daily trends for the most recent seven days
            summary_lines.append("## Daily Trends")
            summary_lines.append("")
            summary_lines.append(
                "| Date | Total Runs | Success | Failed | Success Rate |"
            )
            summary_lines.append(
                "|------|------------|---------|--------|--------------|"
            )

            daily_stats = sorted(stats["daily_stats"].items(), reverse=True)[:7]
            for date, day_stats in daily_stats:
                success_rate = (day_stats["success"] / max(1, day_stats["total"])) * 100
                summary_lines.append(
                    f"| {date} | {day_stats['total']} | {day_stats['success']} | "
                    f"{day_stats['failure']} | {success_rate:.1f}% |"
                )
            summary_lines.append("")

            # Per-job statistics, sorted by failure count
            if stats["job_stats"]:
                summary_lines.append("## Job Statistics")
                summary_lines.append("")

                job_stats_sorted = sorted(
                    stats["job_stats"].items(),
                    key=lambda x: x[1]["failure"],
                    reverse=True,
                )

                for job_name, job_stat in job_stats_sorted:
                    total_job = job_stat["total"]
                    success_job = job_stat["success"]
                    failure_job = job_stat["failure"]
                    success_rate_job = (success_job / max(1, total_job)) * 100
                    avg_duration = job_stat["avg_duration_minutes"]

                    summary_lines.append(f"### {job_name}")
                    summary_lines.append("")
                    summary_lines.append(
                        f"**Stats:** {total_job} runs | {success_job} success "
                        f"({success_rate_job:.1f}%) | {failure_job} failed | "
                        f"Avg duration: {avg_duration:.1f}m"
                    )
                    summary_lines.append("")

                    # Average of each collected performance metric
                    if job_stat.get("performance_metrics"):
                        summary_lines.append("**Performance Metrics:**")
                        summary_lines.append("")
                        summary_lines.append("| Metric | Avg Value | Samples |")
                        summary_lines.append("|--------|-----------|---------|")

                        for metric_name, metric_data in job_stat[
                            "performance_metrics"
                        ].items():
                            if metric_data:
                                values = [m["value"] for m in metric_data]
                                avg_value = sum(values) / len(values)
                                summary_lines.append(
                                    f"| {metric_name} | {avg_value:.2f} | {len(values)} |"
                                )
                        summary_lines.append("")

                    # Up to three recent failing runs
                    if job_stat["recent_failures"]:
                        summary_lines.append("**Recent Failures:**")
                        for failure in job_stat["recent_failures"][:3]:
                            summary_lines.append(
                                f"- [Run #{failure['run_number']}]({failure['run_url']})"
                            )
                        summary_lines.append("")

            # Append rather than overwrite, so any summary content written
            # earlier in the same step is preserved
            with open(github_step_summary, "a", encoding="utf-8") as f:
                f.write("\n".join(summary_lines))
                f.write("\n\n---\n\n")

            print("GitHub Actions nightly summary generated successfully")

        except Exception as e:
            print(f"Failed to generate nightly GitHub Actions summary: {e}")

    def detect_nightly_regressions(self, stats: Dict) -> List[Dict]:
        """Detect regressions in nightly tests."""
        regressions = []

        for job_name, job_stat in stats["job_stats"].items():
            total = job_stat["total"]
            failure = job_stat["failure"]

            if total > 0:
                failure_rate = (failure / total) * 100

                # Flag jobs whose failure rate exceeds 30%
                if failure_rate > 30:
                    regressions.append(
                        {
                            "job_name": job_name,
                            "type": "high_failure_rate",
                            "failure_rate": failure_rate,
                            "total_runs": total,
                            "failures": failure,
                        }
                    )

            # Flag jobs with three or more recorded recent failures
            recent_failures = len(job_stat["recent_failures"])
            if recent_failures >= 3:
                regressions.append(
                    {
                        "job_name": job_name,
                        "type": "consecutive_failures",
                        "recent_failure_count": recent_failures,
                    }
                )

        if regressions:
            print("\n" + "=" * 80)
            print("REGRESSIONS DETECTED:")
            print("=" * 80)
            for regression in regressions:
                print(f"\nJob: {regression['job_name']}")
                if regression["type"] == "high_failure_rate":
                    print(
                        f"  High failure rate: {regression['failure_rate']:.1f}% "
                        f"({regression['failures']}/{regression['total_runs']})"
                    )
                elif regression["type"] == "consecutive_failures":
                    print(
                        f"  {regression['recent_failure_count']} recent consecutive failures"
                    )
            print("=" * 80)

        return regressions


def main():
    """CLI entry point."""
    parser = argparse.ArgumentParser(description="SGLang CI Analyzer")
    parser.add_argument("--token", required=True, help="GitHub Personal Access Token")
    parser.add_argument(
        "--mode",
        choices=["ci", "nightly"],
        default="ci",
        help=(
            "Analysis mode: 'ci' for general CI analysis, 'nightly' for nightly "
            "test monitoring (default: ci)"
        ),
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=100,
        help="Number of runs to analyze (for ci mode, default: 100)",
    )
    parser.add_argument(
        "--days",
        type=int,
        default=2,
        help="Number of days to analyze (for nightly mode, default: 2)",
    )
    parser.add_argument(
        "--output",
        help="Output file for detailed stats (JSON)",
    )
    parser.add_argument(
        "--branch",
        default=None,
        help="Filter runs by branch name (default: all branches)",
    )

    args = parser.parse_args()

    analyzer = SGLangCIAnalyzer(args.token)

    try:
        if args.mode == "nightly":
            # Nightly test monitoring mode
            runs = analyzer.get_nightly_runs(days=args.days)

            if not runs:
                print("No nightly test runs found in the specified time period.")
                sys.exit(1)

            stats = analyzer.analyze_nightly_with_metrics(runs)
            analyzer.generate_nightly_report(stats, args.output)
            analyzer.generate_nightly_github_summary(stats)
            regressions = analyzer.detect_nightly_regressions(stats)

            # Report regression status; regressions do not fail the job
            if regressions:
                print("\n⚠️ Regressions detected - see report above")
            else:
                print("\n✓ No significant regressions detected")
            sys.exit(0)

        else:
            # General CI analysis mode
            runs = analyzer.get_recent_runs(args.limit, args.branch)

            if not runs:
                print("No CI run data found")
                return

            stats = analyzer.analyze_ci_failures(runs)
            analyzer.generate_report(stats)

            output_file = args.output or "ci_analysis.json"
            analyzer.save_detailed_report(stats, output_file)
            analyzer.generate_github_summary(stats)

    except Exception as e:
        print(f"Error during analysis: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()