| |
| """ |
| Post CI failure analysis results to Slack. |
| |
| This is a standalone script that doesn't depend on sglang package installation. |
| """ |
|
|
| import argparse |
| import json |
| import logging |
| import os |
| import sys |
| from datetime import datetime |
|
|
# Module-level logger used by every function in this script.
logger = logging.getLogger(__name__)

# Configure the root logger so INFO-and-above records are emitted when the
# script is run directly.
logging.basicConfig(level=logging.INFO)
|
|
|
|
# Slack channel ID the report is posted to.
_SLACK_CHANNEL_ID = "C0A2DG0R7CJ"

# Slack user mentions prepended to the parent message when failures exist.
_SLACK_MENTIONS = "<@U09R55D8EAY> <@U09ABMCKQPM>"

# A job is reported once its current failure streak reaches this length.
_CRITICAL_STREAK = 2

# Report key -> (test type label, hardware label, sort rank of the test type).
# Report keys that are not listed here are ignored.
_WORKFLOW_INFO = {
    "pr_test_nvidia_scheduled_data": ("PR Test", "Nvidia", 0),
    "nightly_nvidia_scheduled_data": ("Nightly", "Nvidia", 1),
    "pr_test_amd_scheduled_data": ("PR Test", "AMD", 0),
    "nightly_amd_scheduled_data": ("Nightly", "AMD", 1),
    "pr_test_xeon_scheduled_data": ("PR Test", "Intel", 0),
    "nightly_intel_scheduled_data": ("Nightly", "Intel", 1),
    "pr_test_xpu_scheduled_data": ("PR Test", "XPU", 0),
    "pr_test_npu_scheduled_data": ("PR Test", "NPU", 0),
    "nightly_npu_scheduled_data": ("Nightly", "NPU", 1),
}

# Display order of hardware sections and of test types within a section.
_HARDWARE_ORDER = ("Nvidia", "AMD", "Intel", "XPU", "NPU")
_TEST_TYPE_ORDER = ("PR Test", "Nightly")


def _failure_field(failure, key, default):
    """Return ``failure[key]``, or *default* when the record or value is missing."""
    if not isinstance(failure, dict):
        return default
    value = failure.get(key)
    return default if value is None else value


def _collect_critical_failures(report_data):
    """Return one dict per job whose current failure streak is critical."""
    failures = []
    for workflow_key, workflow_data in report_data.items():
        if workflow_key not in _WORKFLOW_INFO or not isinstance(workflow_data, dict):
            continue
        test_type, hardware, test_order = _WORKFLOW_INFO[workflow_key]

        for job_name, job_data in workflow_data.items():
            if not isinstance(job_data, dict):
                continue
            streak = job_data.get("current_streak", 0)
            # A null / non-numeric streak must not abort the whole report.
            if not isinstance(streak, (int, float)) or streak < _CRITICAL_STREAK:
                continue

            first = job_data.get("first_failure_in_streak")
            last = job_data.get("last_failure_in_streak")
            failures.append(
                {
                    "hardware": hardware,
                    "test_type": test_type,
                    "test_order": test_order,
                    "job_name": job_name,
                    "consecutive_failures": streak,
                    "first_failed_at": _failure_field(first, "created_at", "unknown"),
                    "first_failed_url": _failure_field(first, "job_url", ""),
                    "last_failed_at": _failure_field(last, "created_at", "unknown"),
                    "last_failed_url": _failure_field(last, "job_url", ""),
                }
            )
    return failures


def _slack_link(url, label):
    """Format a Slack ``<url|label>`` link, or plain *label* when there is no URL."""
    return f"<{url}|{label}>" if url else str(label)


def _build_summary(failures, workflow_url):
    """Return ``(text, attachment_color)`` for the parent Slack message."""
    if not failures:
        text = "✅ No critical failures detected in scheduled runs"
        if workflow_url:
            text += f"\n<{workflow_url}|View CI Monitor Run>"
        return text, "good"

    # hardware -> test type -> [job names], in discovery order.
    jobs_by_hardware = {}
    for failure in failures:
        by_type = jobs_by_hardware.setdefault(failure["hardware"], {})
        by_type.setdefault(failure["test_type"], []).append(failure["job_name"])

    lines = [f"{_SLACK_MENTIONS} 🚨 *CI Critical Failures (Scheduled Runs)*"]
    for hardware in _HARDWARE_ORDER:
        by_type = jobs_by_hardware.get(hardware)
        if not by_type:
            continue
        lines.append(f"\n*{hardware}:*")
        for test_type in _TEST_TYPE_ORDER:
            if test_type in by_type:
                job_list = ", ".join(by_type[test_type])
                lines.append(f" • {test_type}: {job_list}")

    if workflow_url:
        lines.append(f"\n<{workflow_url}|View Full CI Monitor Report>")
    return "\n".join(lines), "danger"


def _build_details(failures):
    """Return the threaded-reply text listing every critical failure."""
    hardware_rank = {hw: i for i, hw in enumerate(_HARDWARE_ORDER)}
    ordered = sorted(
        failures,
        key=lambda item: (
            hardware_rank.get(item["hardware"], 99),
            item["test_order"],
            item["job_name"],
        ),
    )

    # NOTE(review): the heavy rule ("━━━") and the "→" separator below were
    # reconstructed from mis-encoded text; confirm the intended glyphs.
    lines = ["*Detailed Failure Breakdown*\n"]
    current_hardware = None
    for failure in ordered:
        hardware = failure["hardware"]
        if hardware != current_hardware:
            # Start a new section each time the (sorted) hardware changes.
            lines.append(f"\n*━━━ {hardware} ━━━*")
            current_hardware = hardware

        test_type = failure["test_type"]
        job_name = failure["job_name"]
        consecutive = failure["consecutive_failures"]
        first_link = _slack_link(failure["first_failed_url"], failure["first_failed_at"])
        last_link = _slack_link(failure["last_failed_url"], failure["last_failed_at"])
        lines.append(
            f"• *{test_type}* → `{job_name}`\n"
            f" Consecutive failures: {consecutive}\n"
            f" First failed: {first_link}\n"
            f" Last failed: {last_link}\n"
        )
    return "\n".join(lines)


def post_ci_failures_to_slack(report_file: str) -> bool:
    """
    Post a CI failure report to Slack, with details in a threaded reply.

    The parent message summarises, per hardware and test type, the jobs whose
    current failure streak is at least two runs long. When such jobs exist, a
    reply in the same thread lists each job's streak length and links to the
    first and last failing runs. When none exist, a single "no critical
    failures" message is posted instead.

    Environment:
        SGLANG_DIFFUSION_SLACK_TOKEN: Slack token; required.
        GITHUB_RUN_ID: optional; when set, a link to the GitHub Actions run
            is appended to the parent message.

    Args:
        report_file: Path to the JSON failure-analysis report. Only the
            top-level keys listed in ``_WORKFLOW_INFO`` are read.

    Returns:
        bool: True if the message(s) were posted, False otherwise. Every
        error (missing ``slack_sdk``, missing token, unreadable report,
        Slack API failure) is logged and reported as False, never raised.
    """
    try:
        # Imported lazily so a missing slack_sdk is reported as a failed post
        # rather than an import error at module load.
        from slack_sdk import WebClient

        token = os.environ.get("SGLANG_DIFFUSION_SLACK_TOKEN")
        if not token:
            logger.info("Slack post failed: no token")
            return False

        with open(report_file, "r", encoding="utf-8") as f:
            report_data = json.load(f)

        failures = _collect_critical_failures(report_data)

        run_id = os.environ.get("GITHUB_RUN_ID", "")
        workflow_url = (
            f"https://github.com/sgl-project/sglang/actions/runs/{run_id}"
            if run_id
            else ""
        )
        summary, color = _build_summary(failures, workflow_url)

        client = WebClient(token=token)
        response = client.chat_postMessage(
            channel=_SLACK_CHANNEL_ID,
            text=summary,
            attachments=[
                {
                    "color": color,
                    "footer": "SGLang CI Monitor",
                    "footer_icon": "https://github.githubassets.com/images/modules/logos_page/GitHub-Mark.png",
                    "ts": int(datetime.now().timestamp()),
                }
            ],
        )

        # The parent message's timestamp identifies the thread to reply in.
        thread_ts = response["ts"]

        if failures:
            client.chat_postMessage(
                channel=_SLACK_CHANNEL_ID,
                thread_ts=thread_ts,
                text=_build_details(failures),
            )

        logger.info("CI failure report posted to Slack successfully")
        return True

    except Exception as e:
        # Top-level boundary: log with traceback and signal failure to main().
        logger.exception("Failed to post CI failures to Slack: %s", e)
        return False
|
|
|
|
def main():
    """Command-line entry point.

    Reads the required ``--report-file`` argument, posts the report to Slack,
    and terminates the process with status 0 on success or 1 on failure.
    """
    cli = argparse.ArgumentParser(
        description="Post CI failure analysis results to Slack"
    )
    cli.add_argument(
        "--report-file",
        type=str,
        required=True,
        help="Path to CI failure analysis JSON report",
    )
    options = cli.parse_args()

    posted = post_ci_failures_to_slack(options.report_file)
    if posted:
        exit_code = 0
    else:
        exit_code = 1
    sys.exit(exit_code)
|
|
|
|
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
|
|