#!/usr/bin/env python3
# Listing header (from the file viewer): File size: 10,221 Bytes | 61ba51e
"""
Post CI failure analysis results to Slack.
This is a standalone script that doesn't depend on sglang package installation.
"""
import argparse
import json
import logging
import os
import sys
from datetime import datetime
# Configure the root logger at INFO level as soon as this module is loaded,
# then create the module-level logger that the functions in this file log to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Report section key -> (display name, hardware, sort order).
# Sort order 0 = PR Test, 1 = Nightly, so PR Test is listed first.
_WORKFLOW_INFO_MAP = {
    # Nvidia
    "pr_test_nvidia_scheduled_data": ("PR Test", "Nvidia", 0),
    "nightly_nvidia_scheduled_data": ("Nightly", "Nvidia", 1),
    # AMD
    "pr_test_amd_scheduled_data": ("PR Test", "AMD", 0),
    "nightly_amd_scheduled_data": ("Nightly", "AMD", 1),
    # Intel/Xeon
    "pr_test_xeon_scheduled_data": ("PR Test", "Intel", 0),
    "nightly_intel_scheduled_data": ("Nightly", "Intel", 1),
    # XPU
    "pr_test_xpu_scheduled_data": ("PR Test", "XPU", 0),
    # NPU
    "pr_test_npu_scheduled_data": ("PR Test", "NPU", 0),
    "nightly_npu_scheduled_data": ("Nightly", "NPU", 1),
}

# Hardware sections are rendered in this order (Nvidia first).
_HARDWARE_ORDER = ["Nvidia", "AMD", "Intel", "XPU", "NPU"]

# Within one hardware section, test types are rendered in this order.
_TEST_TYPE_ORDER = ["PR Test", "Nightly"]

# A job is reported once it has failed at least this many times in a row.
_MIN_FAILURE_STREAK = 2

# Non-ASCII glyphs are written as escapes so the source stays pure ASCII and
# cannot be corrupted by a wrong file encoding.
_CHECK_MARK = "\u2705"  # white heavy check mark
_SIREN = "\U0001F6A8"  # police car light
_BULLET = "\u2022"  # bullet
# NOTE(review): the section-rule and arrow glyphs were unreadable in the
# reviewed copy of this file; confirm these two against the intended ones.
_RULE = "\u2501" * 3  # heavy horizontal rule, three times
_ARROW = "\u2192"  # rightwards arrow


def _collect_critical_failures(report_data: dict) -> list:
    """Return one record per known scheduled job with a long failure streak.

    Only sections whose key is in ``_WORKFLOW_INFO_MAP`` are inspected; other
    top-level keys of the report are ignored. A job is included when its
    ``current_streak`` is at least ``_MIN_FAILURE_STREAK``.
    """
    failures = []
    for workflow_key, workflow_data in report_data.items():
        info = _WORKFLOW_INFO_MAP.get(workflow_key)
        if info is None or not isinstance(workflow_data, dict):
            continue
        test_type, hardware, test_order = info

        for job_name, job_data in workflow_data.items():
            if not isinstance(job_data, dict):
                continue
            # "or 0" / "or {}" also cover keys that are present but null.
            streak = job_data.get("current_streak") or 0
            if streak < _MIN_FAILURE_STREAK:
                continue
            first_failure = job_data.get("first_failure_in_streak") or {}
            last_failure = job_data.get("last_failure_in_streak") or {}
            failures.append(
                {
                    "hardware": hardware,
                    "test_type": test_type,
                    "test_order": test_order,
                    "job_name": job_name,
                    "consecutive_failures": streak,
                    "first_failed_at": first_failure.get("created_at", "unknown"),
                    "first_failed_url": first_failure.get("job_url", ""),
                    "last_failed_at": last_failure.get("created_at", "unknown"),
                    "last_failed_url": last_failure.get("job_url", ""),
                }
            )
    return failures


def _build_summary(critical_failures: list, workflow_url: str) -> tuple:
    """Return ``(text, attachment_color)`` for the parent Slack message."""
    if not critical_failures:
        summary = f"{_CHECK_MARK} No critical failures detected in scheduled runs"
        if workflow_url:
            summary += f"\n<{workflow_url}|View CI Monitor Run>"
        return summary, "good"

    # Group job names as {(hardware, test_type): [job_name, ...]}.
    jobs_by_group = {}
    for failure in critical_failures:
        group = (failure["hardware"], failure["test_type"])
        jobs_by_group.setdefault(group, []).append(failure["job_name"])

    # Ping relevant people when there are failures.
    mentions = "<@U09R55D8EAY> <@U09ABMCKQPM>"
    lines = [f"{mentions} {_SIREN} *CI Critical Failures (Scheduled Runs)*"]
    for hardware in _HARDWARE_ORDER:
        if not any(hw == hardware for hw, _ in jobs_by_group):
            continue
        lines.append(f"\n*{hardware}:*")
        for test_type in _TEST_TYPE_ORDER:
            jobs = jobs_by_group.get((hardware, test_type))
            if jobs:
                lines.append(f" {_BULLET} {test_type}: {', '.join(jobs)}")
    if workflow_url:
        lines.append(f"\n<{workflow_url}|View Full CI Monitor Report>")
    return "\n".join(lines), "danger"


def _slack_link(url: str, label: str) -> str:
    """Return ``<url|label>``, or just the label when no URL is known."""
    # An empty URL would otherwise render as the malformed link "<|label>".
    return f"<{url}|{label}>" if url else label


def _build_details(critical_failures: list) -> str:
    """Return the per-job breakdown that is posted as a threaded reply."""
    hardware_rank = {hw: rank for rank, hw in enumerate(_HARDWARE_ORDER)}
    ordered = sorted(
        critical_failures,
        key=lambda failure: (
            hardware_rank.get(failure["hardware"], 99),
            failure["test_order"],
            failure["job_name"],
        ),
    )

    lines = ["*Detailed Failure Breakdown*\n"]
    current_hardware = None
    for failure in ordered:
        # Emit a section header whenever the hardware changes.
        if failure["hardware"] != current_hardware:
            current_hardware = failure["hardware"]
            lines.append(f"\n*{_RULE} {current_hardware} {_RULE}*")

        first_link = _slack_link(
            failure["first_failed_url"], failure["first_failed_at"]
        )
        last_link = _slack_link(
            failure["last_failed_url"], failure["last_failed_at"]
        )
        lines.append(
            f"{_BULLET} *{failure['test_type']}* {_ARROW} `{failure['job_name']}`\n"
            f" Consecutive failures: {failure['consecutive_failures']}\n"
            f" First failed: {first_link}\n"
            f" Last failed: {last_link}\n"
        )
    return "\n".join(lines)


def post_ci_failures_to_slack(report_file: str) -> bool:
    """
    Post a CI failure report to Slack, with details in a thread.

    The parent message lists, per hardware and test type, the scheduled jobs
    whose current failure streak is at least two (or states that there are
    none). When there are such jobs, a threaded reply adds the streak length
    and the first/last failure of each job.

    Reads the Slack token from ``SGLANG_DIFFUSION_SLACK_TOKEN`` and, when set,
    ``GITHUB_RUN_ID`` to link back to the workflow run.

    Args:
        report_file: Path to JSON file containing failure analysis from
            ci_failures_analysis.py

    Returns:
        bool: True if successful, False otherwise. No exception escapes this
        function; any error (missing slack_sdk, unreadable report, Slack API
        failure) is logged and reported as False.
    """
    try:
        # Imported inside the try so a missing slack_sdk is logged and turned
        # into a False return instead of crashing at import time.
        from slack_sdk import WebClient

        token = os.environ.get("SGLANG_DIFFUSION_SLACK_TOKEN")
        if not token:
            logger.info("Slack post failed: no token")
            return False

        # CI failures channel
        channel_id = "C0A2DG0R7CJ"

        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(report_file, "r", encoding="utf-8") as f:
            report_data = json.load(f)

        client = WebClient(token=token)

        critical_failures = _collect_critical_failures(report_data)

        # Link to the workflow run when running under GitHub Actions.
        run_id = os.environ.get("GITHUB_RUN_ID", "")
        workflow_url = ""
        if run_id:
            workflow_url = (
                f"https://github.com/sgl-project/sglang/actions/runs/{run_id}"
            )

        summary, color = _build_summary(critical_failures, workflow_url)

        # Post parent message
        response = client.chat_postMessage(
            channel=channel_id,
            text=summary,
            attachments=[
                {
                    "color": color,
                    "footer": "SGLang CI Monitor",
                    "footer_icon": "https://github.githubassets.com/images/modules/logos_page/GitHub-Mark.png",
                    "ts": int(datetime.now().timestamp()),
                }
            ],
        )

        # If there are failures, post detailed breakdown in thread
        if critical_failures:
            client.chat_postMessage(
                channel=channel_id,
                thread_ts=response["ts"],
                text=_build_details(critical_failures),
            )

        logger.info("CI failure report posted to Slack successfully")
        return True
    except Exception as e:
        # Top-level boundary: keep the traceback in the log, report failure.
        logger.exception("Failed to post CI failures to Slack: %s", e)
        return False
def main():
    """Command-line entry point.

    Reads the required ``--report-file`` option, posts that report to Slack,
    and terminates the process with status 0 when posting succeeded or 1 when
    it did not.
    """
    cli = argparse.ArgumentParser(
        description="Post CI failure analysis results to Slack"
    )
    cli.add_argument(
        "--report-file",
        help="Path to CI failure analysis JSON report",
        required=True,
        type=str,
    )
    options = cli.parse_args()

    exit_code = 1
    if post_ci_failures_to_slack(options.report_file):
        exit_code = 0
    sys.exit(exit_code)


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|