Hanrui / sglang /scripts /ci_monitor /post_ci_failures_to_slack.py
Lekr0's picture
Add files using upload-large-folder tool
61ba51e verified
#!/usr/bin/env python3
"""
Post CI failure analysis results to Slack.
This is a standalone script that doesn't depend on sglang package installation.
"""
import argparse
import json
import logging
import os
import sys
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Slack channel that receives the CI failure reports.
_CI_FAILURES_CHANNEL_ID = "C0A2DG0R7CJ"

# Slack users pinged in the parent message when critical failures exist.
_FAILURE_MENTIONS = "<@U09R55D8EAY> <@U09ABMCKQPM>"

# A job is reported once it has failed at least this many times in a row.
_MIN_FAILURE_STREAK = 2

# Report section key -> (test type, hardware, test-type sort order).
# Sort order 0 = PR Test, 1 = Nightly, so PR Test entries are listed first.
# Sections of the report that are not listed here are ignored.
_WORKFLOW_INFO = {
    # Nvidia
    "pr_test_nvidia_scheduled_data": ("PR Test", "Nvidia", 0),
    "nightly_nvidia_scheduled_data": ("Nightly", "Nvidia", 1),
    # AMD
    "pr_test_amd_scheduled_data": ("PR Test", "AMD", 0),
    "nightly_amd_scheduled_data": ("Nightly", "AMD", 1),
    # Intel/Xeon
    "pr_test_xeon_scheduled_data": ("PR Test", "Intel", 0),
    "nightly_intel_scheduled_data": ("Nightly", "Intel", 1),
    # XPU
    "pr_test_xpu_scheduled_data": ("PR Test", "XPU", 0),
    # NPU
    "pr_test_npu_scheduled_data": ("PR Test", "NPU", 0),
    "nightly_npu_scheduled_data": ("Nightly", "NPU", 1),
}

# Display order of the hardware sections (Nvidia first).
_HARDWARE_ORDER = ["Nvidia", "AMD", "Intel", "XPU", "NPU"]

# Display order of the test types inside one hardware section.
_TEST_TYPE_ORDER = ["PR Test", "Nightly"]

# Non-ASCII glyphs are written as escapes so that re-encoding the source file
# cannot corrupt the text that ends up in Slack.
_CHECK_MARK = "\u2705"  # white heavy check mark
_SIREN = "\U0001f6a8"  # police car light
_BULLET = "\u2022"  # bullet
_HEAVY_RULE = "\u2501" * 3  # three heavy horizontal box-drawing characters
_ARROW = "\u2192"  # rightwards arrow


def _failure_value(failure, key, default):
    """Return ``failure[key]`` when it is present and non-empty, else *default*."""
    if not isinstance(failure, dict):
        return default
    return failure.get(key) or default


def _slack_link(url, label):
    """Render a Slack ``<url|label>`` link, or just the label when there is no URL."""
    # "<|label>" is not a valid Slack link, so fall back to plain text.
    return f"<{url}|{label}>" if url else str(label)


def _collect_critical_failures(report_data):
    """Extract every job whose current failure streak reaches the threshold.

    Only sections named in ``_WORKFLOW_INFO`` are inspected; entries are
    returned in the order they appear in the report.
    """
    if not isinstance(report_data, dict):
        raise ValueError("CI failure report must be a JSON object")

    failures = []
    for workflow_key, workflow_data in report_data.items():
        info = _WORKFLOW_INFO.get(workflow_key)
        if info is None or not isinstance(workflow_data, dict):
            continue
        test_type, hardware, test_order = info

        for job_name, job_data in workflow_data.items():
            if not isinstance(job_data, dict):
                continue
            streak = job_data.get("current_streak", 0)
            # A malformed streak (e.g. null) must not abort the whole report.
            is_critical = (
                isinstance(streak, (int, float)) and streak >= _MIN_FAILURE_STREAK
            )
            if not is_critical:
                continue

            first_failure = job_data.get("first_failure_in_streak")
            last_failure = job_data.get("last_failure_in_streak")
            failures.append(
                {
                    "hardware": hardware,
                    "test_type": test_type,
                    "test_order": test_order,
                    "job_name": job_name,
                    "consecutive_failures": streak,
                    "first_failed_at": _failure_value(
                        first_failure, "created_at", "unknown"
                    ),
                    "first_failed_url": _failure_value(first_failure, "job_url", ""),
                    "last_failed_at": _failure_value(
                        last_failure, "created_at", "unknown"
                    ),
                    "last_failed_url": _failure_value(last_failure, "job_url", ""),
                }
            )
    return failures


def _build_summary(failures, workflow_url):
    """Return ``(text, attachment_color)`` for the parent Slack message."""
    if not failures:
        text = f"{_CHECK_MARK} No critical failures detected in scheduled runs"
        if workflow_url:
            text += f"\n<{workflow_url}|View CI Monitor Run>"
        return text, "good"

    # Group job names as {hardware: {test_type: [job_name, ...]}}.
    by_hardware = {}
    for failure in failures:
        by_test_type = by_hardware.setdefault(failure["hardware"], {})
        by_test_type.setdefault(failure["test_type"], []).append(failure["job_name"])

    lines = [f"{_FAILURE_MENTIONS} {_SIREN} *CI Critical Failures (Scheduled Runs)*"]
    for hardware in _HARDWARE_ORDER:
        by_test_type = by_hardware.get(hardware)
        if not by_test_type:
            continue
        lines.append(f"\n*{hardware}:*")
        for test_type in _TEST_TYPE_ORDER:
            jobs = by_test_type.get(test_type)
            if not jobs:
                continue
            job_list = ", ".join(jobs)
            lines.append(f" {_BULLET} {test_type}: {job_list}")

    if workflow_url:
        lines.append(f"\n<{workflow_url}|View Full CI Monitor Report>")
    return "\n".join(lines), "danger"


def _build_details(failures):
    """Return the threaded reply text listing every critical failure."""
    hardware_rank = {hw: rank for rank, hw in enumerate(_HARDWARE_ORDER)}
    ordered = sorted(
        failures,
        key=lambda failure: (
            hardware_rank.get(failure["hardware"], 99),
            failure["test_order"],
            failure["job_name"],
        ),
    )

    lines = ["*Detailed Failure Breakdown*\n"]
    current_hardware = None
    for failure in ordered:
        # Emit a section header each time the hardware changes.
        if failure["hardware"] != current_hardware:
            current_hardware = failure["hardware"]
            lines.append(f"\n*{_HEAVY_RULE} {current_hardware} {_HEAVY_RULE}*")

        first_link = _slack_link(
            failure["first_failed_url"], failure["first_failed_at"]
        )
        last_link = _slack_link(failure["last_failed_url"], failure["last_failed_at"])
        lines.append(
            f"{_BULLET} *{failure['test_type']}* {_ARROW} `{failure['job_name']}`\n"
            f" Consecutive failures: {failure['consecutive_failures']}\n"
            f" First failed: {first_link}\n"
            f" Last failed: {last_link}\n"
        )
    return "\n".join(lines)


def post_ci_failures_to_slack(report_file: str) -> bool:
    """
    Post CI failure report to Slack with threaded details.

    Creates a parent message with a per-hardware summary of the jobs that
    are currently failing at least twice in a row, and, when there are such
    jobs, a threaded reply with the streak length and first/last failure
    links for each of them.

    The Slack token is read from the SGLANG_DIFFUSION_SLACK_TOKEN environment
    variable; GITHUB_RUN_ID, when set, is used to link back to the workflow run.

    Args:
        report_file: Path to JSON file containing failure analysis from
            ci_failures_analysis.py

    Returns:
        bool: True if successful, False otherwise (missing token, unreadable
        or malformed report, missing slack_sdk, or a Slack API error). No
        exception escapes this function.
    """
    try:
        token = os.environ.get("SGLANG_DIFFUSION_SLACK_TOKEN")
        if not token:
            logger.warning("Slack post failed: no token")
            return False

        # Imported lazily so the module can be imported without slack_sdk.
        from slack_sdk import WebClient

        with open(report_file, "r", encoding="utf-8") as f:
            report_data = json.load(f)

        # Link to the GitHub Actions run that produced this report, if known.
        run_id = os.environ.get("GITHUB_RUN_ID", "")
        workflow_url = (
            f"https://github.com/sgl-project/sglang/actions/runs/{run_id}"
            if run_id
            else ""
        )

        failures = _collect_critical_failures(report_data)
        summary, color = _build_summary(failures, workflow_url)

        client = WebClient(token=token)

        # Post parent message
        response = client.chat_postMessage(
            channel=_CI_FAILURES_CHANNEL_ID,
            text=summary,
            attachments=[
                {
                    "color": color,
                    "footer": "SGLang CI Monitor",
                    "footer_icon": "https://github.githubassets.com/images/modules/logos_page/GitHub-Mark.png",
                    "ts": int(datetime.now().timestamp()),
                }
            ],
        )

        # If there are failures, post detailed breakdown in thread
        if failures:
            client.chat_postMessage(
                channel=_CI_FAILURES_CHANNEL_ID,
                thread_ts=response["ts"],
                text=_build_details(failures),
            )

        logger.info("CI failure report posted to Slack successfully")
        return True
    except Exception as e:
        # Top-level boundary: report the failure (with traceback) and let the
        # caller turn the False result into a non-zero exit code.
        logger.exception("Failed to post CI failures to Slack: %s", e)
        return False
def main(argv=None) -> None:
    """Parse command-line arguments, post the report, and exit.

    Args:
        argv: Optional list of argument strings (without the program name).
            When None, the arguments are taken from ``sys.argv``, which is
            what the command-line entry point uses.

    Raises:
        SystemExit: Always. Exit code 0 when the report was posted, 1 when
            posting failed, and 2 (raised by argparse) for invalid arguments.
    """
    parser = argparse.ArgumentParser(
        description="Post CI failure analysis results to Slack"
    )
    parser.add_argument(
        "--report-file",
        type=str,
        required=True,
        help="Path to CI failure analysis JSON report",
    )
    args = parser.parse_args(argv)

    success = post_ci_failures_to_slack(args.report_file)
    # Propagate the outcome to the calling CI step through the exit code.
    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()