| | |
| | import argparse |
| | import glob |
| | import os |
| | import re |
| | from datetime import date, datetime |
| |
|
| | from slack_sdk import WebClient |
| | from tabulate import tabulate |
| |
|
| |
|
# Maximum rendered length of the Slack payload; the per-suite table is
# truncated in create_slack_payload() once str(payload) exceeds this.
MAX_LEN_MESSAGE = 3001

# CLI options for the report consolidation script.
parser = argparse.ArgumentParser()
parser.add_argument("--slack_channel_name", default="diffusers-ci-nightly")
parser.add_argument(
    "--reports_dir",
    default="reports",
    help="Directory containing test reports (will search recursively in all subdirectories)",
)
parser.add_argument("--output_file", default=None, help="Path to save the consolidated report (markdown format)")
| |
|
| |
|
def parse_stats_file(file_path):
    """Parse a pytest stats file and extract aggregate test statistics.

    Args:
        file_path: Path to a ``*_stats.txt`` report produced by a pytest run.

    Returns:
        dict with keys ``tests``, ``passed``, ``failed``, ``skipped``,
        ``xpassed`` and ``slowest_tests`` (a list of
        ``{"test": str, "duration": float}`` entries). All counters are 0
        and the list is empty when the file cannot be read or parsed.
    """
    try:
        with open(file_path, "r") as f:
            content = f.read()

        # Counter patterns emitted by pytest's collection/summary lines.
        tests_pattern = r"collected (\d+) items"
        passed_pattern = r"(\d+) passed"
        failed_pattern = r"(\d+) failed"
        skipped_pattern = r"(\d+) skipped"
        xpassed_pattern = r"(\d+) xpassed"

        tests_match = re.search(tests_pattern, content)
        passed_match = re.search(passed_pattern, content)
        failed_match = re.search(failed_pattern, content)
        skipped_match = re.search(skipped_pattern, content)
        xpassed_match = re.search(xpassed_pattern, content)

        passed = int(passed_match.group(1)) if passed_match else 0
        failed = int(failed_match.group(1)) if failed_match else 0
        skipped = int(skipped_match.group(1)) if skipped_match else 0
        xpassed = int(xpassed_match.group(1)) if xpassed_match else 0

        # Prefer the explicit "collected N items" count; otherwise sum outcomes.
        if tests_match:
            tests = int(tests_match.group(1))
        else:
            tests = passed + failed + skipped + xpassed

        # Extract the "slowest N test durations" section, if present.
        timing_pattern = r"slowest \d+ test durations[\s\S]*?\n([\s\S]*?)={70}"
        timing_match = re.search(timing_pattern, content, re.MULTILINE)
        slowest_tests = []

        if timing_match:
            timing_text = timing_match.group(1).strip()
            for line in timing_text.split("\n"):
                if not line.strip():
                    continue
                # Expected shape: "<duration>s <phase> <test id>".
                parts = line.strip().split()
                if len(parts) >= 3:
                    time_str = parts[0]
                    test_path = " ".join(parts[2:])

                    # Skip pytest's "(N durations < X secs were omitted)" note.
                    if "secs were omitted" in test_path:
                        continue

                    try:
                        time_seconds = float(time_str.rstrip("s"))
                        slowest_tests.append({"test": test_path, "duration": time_seconds})
                    except ValueError:
                        # First column is not a duration; ignore the line.
                        pass

        return {
            "tests": tests,
            "passed": passed,
            "failed": failed,
            "skipped": skipped,
            # Fix: xpassed was parsed (and counted in the fallback total) but
            # previously dropped from the result dict.
            "xpassed": xpassed,
            "slowest_tests": slowest_tests,
        }
    except Exception as e:
        print(f"Error parsing {file_path}: {e}")
        return {"tests": 0, "passed": 0, "failed": 0, "skipped": 0, "xpassed": 0, "slowest_tests": []}
| |
|
| |
|
def parse_durations_file(file_path):
    """Parse the ``*_durations.txt`` companion of a stats file.

    Args:
        file_path: Path to a ``*_stats.txt`` report; the corresponding
            ``*_durations.txt`` file is looked up next to it.

    Returns:
        list of ``{"test": str, "duration": float}`` entries; empty when the
        durations file is missing or unparsable.
    """
    slowest_tests = []
    durations_file = file_path.replace("_stats.txt", "_durations.txt")
    try:
        if os.path.exists(durations_file):
            with open(durations_file, "r") as f:
                content = f.read()

            # Skip the header line; each remaining line is
            # "<duration>s <phase> <test id>".
            for line in content.split("\n")[1:]:
                if not line.strip():
                    continue
                parts = line.strip().split()
                if len(parts) >= 3:
                    time_str = parts[0]
                    test_path = " ".join(parts[2:])

                    # Skip pytest's "(N durations < X secs were omitted)" note.
                    if "secs were omitted" in test_path:
                        continue

                    try:
                        time_seconds = float(time_str.rstrip("s"))
                        slowest_tests.append({"test": test_path, "duration": time_seconds})
                    except ValueError:
                        # First column is not a duration; ignore the line.
                        # (Fix: removed the old fallback that re-parsed
                        # "secs were omitted" lines here — it was unreachable
                        # because those lines are skipped above.)
                        pass
    except Exception as e:
        print(f"Error parsing durations file {durations_file}: {e}")

    return slowest_tests
| |
|
| |
|
def parse_failures_file(file_path):
    """Parse a failures file to extract failed test details.

    Two formats are handled: the "FAILURES SHORT STACK" layout (test names in
    underscore-framed headers) and a generic layout split on `=` separators.
    Each failure is returned as a dict with ``test``, ``error`` (always the
    placeholder "Error occurred" here) and ``original_test_name`` keys.
    """
    failures = []
    try:
        with open(file_path, "r") as f:
            content = f.read()

        if "============================= FAILURES SHORT STACK =============================" in content:
            # Test names appear framed by runs of underscores, e.g.
            # "_____ SomeClass.test_name _____".
            test_headers = re.findall(r"_{5,}\s+([^_\n]+?)\s+_{5,}", content)

            for test_name in test_headers:
                test_name = test_name.strip()
                # Keep only dotted identifiers that are not pure numbers
                # (filters out decorative separator matches).
                if "." in test_name and not test_name.replace(".", "").isdigit():
                    # Bare "Class.method" names: try to resolve a full pytest
                    # path from the companion "_failures_line.txt" file.
                    if not test_name.endswith(".py") and "::" not in test_name and "/" not in test_name:
                        line_file = file_path.replace("_failures_short.txt", "_failures_line.txt")
                        if os.path.exists(line_file):
                            try:
                                with open(line_file, "r") as lf:
                                    line_content = lf.read()
                                # Match on the method name (last dotted part).
                                path_match = re.search(
                                    r"(tests/[\w/]+\.py::[^:]+::" + test_name.split(".")[-1] + ")",
                                    line_content,
                                )
                                if path_match:
                                    test_name = path_match.group(1)
                            except Exception:
                                # Best effort only; keep the bare name.
                                pass

                    failures.append(
                        {
                            "test": test_name,
                            "error": "Error occurred",
                            "original_test_name": test_name,
                        }
                    )

            # Fallback when no headers matched: scan the first lines for a
            # pytest path or a "Class.method"-looking token.
            if not failures:
                first_lines = content.split("\n")[:20]
                for line in first_lines:
                    # Full pytest node id, e.g. tests/x.py::Class::test_y.
                    path_match = re.search(r"(tests/[\w/]+\.py::[\w\.]+::\w+)", line)
                    # Generic dotted identifier, e.g. SomeClass.test_y.
                    class_match = re.search(r"([A-Za-z][A-Za-z0-9_]+\.[A-Za-z][A-Za-z0-9_]+)", line)

                    if path_match:
                        test_name = path_match.group(1)
                        failures.append(
                            {"test": test_name, "error": "Error occurred", "original_test_name": test_name}
                        )
                        break
                    elif class_match and "test" in line.lower():
                        test_name = class_match.group(1)
                        # Only accept tokens that actually look test-related.
                        if "test" in test_name.lower():
                            failures.append(
                                {"test": test_name, "error": "Error occurred", "original_test_name": test_name}
                            )
        else:
            # Generic layout: split on 70-char `=` separators and scan each block.
            failure_blocks = re.split(r"={70}", content)

            for block in failure_blocks:
                if not block.strip():
                    continue

                # Prefer full pytest node ids when present in the block.
                path_matches = re.findall(r"([\w/]+\.py::[\w\.]+::\w+)", block)
                if path_matches:
                    for test_name in path_matches:
                        failures.append(
                            {"test": test_name, "error": "Error occurred", "original_test_name": test_name}
                        )
                else:
                    # Otherwise fall back to dotted identifiers, filtering out
                    # abbreviations, numbers and non-test-looking tokens.
                    class_matches = re.findall(r"([A-Za-z][A-Za-z0-9_]+\.[A-Za-z][A-Za-z0-9_]+)", block)
                    for test_name in class_matches:
                        if (
                            not test_name.startswith(("e.g", "i.e", "etc."))
                            and not test_name.isdigit()
                            and "test" in test_name.lower()
                        ):
                            failures.append(
                                {"test": test_name, "error": "Error occurred", "original_test_name": test_name}
                            )

    except Exception as e:
        print(f"Error parsing failures in {file_path}: {e}")

    return failures
| |
|
| |
|
def consolidate_reports(reports_dir):
    """Consolidate test reports from multiple test runs, including subdirectories.

    Args:
        reports_dir: Root directory searched recursively for ``*_stats.txt``
            report files.

    Returns:
        dict with keys:
            ``total_stats``: aggregated tests/passed/failed/skipped counters.
            ``test_suites``: per-suite ``{"stats": ..., "failures": ...}``.
            ``slowest_tests``: top-N slowest tests across all suites
                (N from the SHOW_SLOWEST_TESTS env var, default 10).
            ``duration_stats``: total and per-suite summed durations.
    """
    stats_files = glob.glob(f"{reports_dir}/**/*_stats.txt", recursive=True)

    results = {}
    total_stats = {"tests": 0, "passed": 0, "failed": 0, "skipped": 0}

    # All slow-test entries across every suite, tagged with their suite name.
    all_slow_tests = []

    for stats_file in stats_files:
        base_name = os.path.basename(stats_file).replace("_stats.txt", "")

        # Prefix the suite name with its (cleaned) subdirectory so suites from
        # different report folders do not collide.
        rel_path = os.path.relpath(os.path.dirname(stats_file), reports_dir)
        if rel_path and rel_path != ".":
            dir_name = os.path.basename(rel_path)
            suffix = "_test_reports"
            if dir_name.endswith(suffix):
                # Strip by suffix length instead of the old magic "-13" slice.
                dir_name = dir_name[: -len(suffix)]
            base_name = f"{dir_name}/{base_name}"

        stats = parse_stats_file(stats_file)

        # Fall back to the dedicated durations file when the stats file
        # contained no timing section.
        if not stats.get("slowest_tests"):
            stats["slowest_tests"] = parse_durations_file(stats_file)

        for key in ["tests", "passed", "failed", "skipped"]:
            total_stats[key] += stats[key]

        for slow_test in stats.get("slowest_tests", []):
            all_slow_tests.append({"test": slow_test["test"], "duration": slow_test["duration"], "suite": base_name})

        # Collect failure details only for suites that reported failures.
        failures = []
        if stats["failed"] > 0:
            # The short summary file has the most reliable
            # "FAILED <path> - <message>" lines; try it first.
            summary_file = stats_file.replace("_stats.txt", "_summary_short.txt")
            if os.path.exists(summary_file):
                try:
                    with open(summary_file, "r") as f:
                        content = f.read()
                    failed_test_lines = re.findall(
                        r"FAILED\s+(tests/[\w/]+\.py::[A-Za-z0-9_\.]+::[A-Za-z0-9_]+)(?:\s+-\s+(.+))?", content
                    )

                    for match in failed_test_lines:
                        test_path = match[0]
                        error_msg = match[1] if len(match) > 1 and match[1] else "No error message"
                        failures.append({"test": test_path, "error": error_msg})
                except Exception as e:
                    print(f"Error parsing summary file: {e}")

            # Fall back to the various failure report formats.
            if not failures:
                failure_patterns = ["_failures_short.txt", "_failures.txt", "_failures_line.txt", "_failures_long.txt"]

                for pattern in failure_patterns:
                    failures_file = stats_file.replace("_stats.txt", pattern)
                    if os.path.exists(failures_file):
                        failures = parse_failures_file(failures_file)
                        if failures:
                            break

        results[base_name] = {"stats": stats, "failures": failures}

    # Drop pytest's "secs were omitted" placeholder entries before ranking.
    filtered_slow_tests = [test for test in all_slow_tests if "secs were omitted" not in test["test"]]
    filtered_slow_tests.sort(key=lambda x: x["duration"], reverse=True)

    # Robustness fix: a malformed SHOW_SLOWEST_TESTS value falls back to the
    # default instead of crashing the whole consolidation.
    try:
        num_slowest_tests = int(os.environ.get("SHOW_SLOWEST_TESTS", "10"))
    except ValueError:
        num_slowest_tests = 10
    top_slowest_tests = filtered_slow_tests[:num_slowest_tests]

    total_duration = sum(test["duration"] for test in all_slow_tests)

    # Sum recorded durations per suite.
    suite_durations = {}
    for test in all_slow_tests:
        suite_durations[test["suite"]] = suite_durations.get(test["suite"], 0) + test["duration"]

    return {
        "total_stats": total_stats,
        "test_suites": results,
        "slowest_tests": top_slowest_tests,
        "duration_stats": {"total_duration": total_duration, "suite_durations": suite_durations},
    }
| |
|
| |
|
def generate_report(consolidated_data):
    """Generate a comprehensive markdown report from consolidated data.

    Sections: header, overall summary, per-suite table (worst success rate
    first), slowest tests, and failures grouped by module. Returns the report
    as a single markdown string.
    """
    report = []

    # Header with a generation timestamp.
    report.append("# Diffusers Nightly Test Report")
    report.append(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")

    # --- Overall summary --------------------------------------------------
    total = consolidated_data["total_stats"]
    report.append("## Summary")

    duration_stats = consolidated_data.get("duration_stats", {})
    total_duration = duration_stats.get("total_duration", 0)

    summary_table = [
        ["Total Tests", total["tests"]],
        ["Passed", total["passed"]],
        ["Failed", total["failed"]],
        ["Skipped", total["skipped"]],
        ["Success Rate", f"{(total['passed'] / total['tests'] * 100):.2f}%" if total["tests"] > 0 else "N/A"],
        ["Total Duration", f"{total_duration:.2f}s" if total_duration else "N/A"],
    ]

    report.append(tabulate(summary_table, tablefmt="pipe"))
    report.append("")

    # --- Per-suite table --------------------------------------------------
    report.append("## Test Suites")

    suite_durations = consolidated_data.get("duration_stats", {}).get("suite_durations", {})

    # Only include the Duration column when timing data is available.
    if suite_durations:
        suites_table = [["Test Suite", "Tests", "Passed", "Failed", "Skipped", "Success Rate", "Duration (s)"]]
    else:
        suites_table = [["Test Suite", "Tests", "Passed", "Failed", "Skipped", "Success Rate"]]

    # Sort suites by success rate, worst first (suites with 0 tests sort first).
    sorted_suites = sorted(
        consolidated_data["test_suites"].items(),
        key=lambda x: (x[1]["stats"]["passed"] / x[1]["stats"]["tests"] * 100) if x[1]["stats"]["tests"] > 0 else 0,
        reverse=False,
    )

    for suite_name, suite_data in sorted_suites:
        stats = suite_data["stats"]
        success_rate = f"{(stats['passed'] / stats['tests'] * 100):.2f}%" if stats["tests"] > 0 else "N/A"

        if suite_durations:
            duration = suite_durations.get(suite_name, 0)
            suites_table.append(
                [
                    suite_name,
                    stats["tests"],
                    stats["passed"],
                    stats["failed"],
                    stats["skipped"],
                    success_rate,
                    f"{duration:.2f}",
                ]
            )
        else:
            suites_table.append(
                [suite_name, stats["tests"], stats["passed"], stats["failed"], stats["skipped"], success_rate]
            )

    report.append(tabulate(suites_table, headers="firstrow", tablefmt="pipe"))
    report.append("")

    # --- Slowest tests ----------------------------------------------------
    slowest_tests = consolidated_data.get("slowest_tests", [])
    if slowest_tests:
        report.append("## Slowest Tests")

        slowest_table = [["Rank", "Test", "Duration (s)", "Test Suite"]]
        for i, test in enumerate(slowest_tests, 1):
            # Defensive: skip pytest's placeholder entry if it slipped through
            # the upstream filtering. NOTE(review): when this fires, rank
            # numbers will have gaps because `i` still advances.
            if "< 0.05 secs were omitted" in test["test"]:
                continue
            slowest_table.append([i, test["test"], f"{test['duration']:.2f}", test["suite"]])

        report.append(tabulate(slowest_table, headers="firstrow", tablefmt="pipe"))
        report.append("")

    # --- Failures, grouped by module --------------------------------------
    failed_suites = [s for s in sorted_suites if s[1]["stats"]["failed"] > 0]

    if failed_suites:
        report.append("## Failures")

        failures_by_module = {}

        for suite_name, suite_data in failed_suites:
            for failure in suite_data.get("failures", []):
                test_name = failure["test"]

                # Heuristic: turn bare "Class.method" names into full pytest
                # paths when the suite name encodes the component
                # (e.g. "tests_models_cuda" -> tests/models/test_models.py).
                if not ("/" in test_name or "::" in test_name) and "." in test_name:
                    if suite_name.startswith("tests_") and "_cuda" in suite_name:
                        component = suite_name.replace("tests_", "").replace("_cuda", "")
                        if "." in test_name:
                            class_name, method_name = test_name.split(".", 1)
                            possible_path = f"tests/{component}/test_{component}.py::{class_name}::{method_name}"
                            # Only trust the guess for names that look like tests.
                            if "test_" in method_name:
                                test_name = possible_path

                # Derive the grouping key (module/class) from the identifier.
                if "::" in test_name:
                    parts = test_name.split("::")
                    module_name = parts[-2] if len(parts) >= 2 else "Other"
                elif "." in test_name:
                    parts = test_name.split(".")
                    module_name = parts[0]
                else:
                    module_name = "Other"

                # Discard obviously bogus module names picked up by the regexes.
                if (
                    module_name.startswith(("e.g", "i.e", "etc"))
                    or module_name.replace(".", "").isdigit()
                    or len(module_name) < 3
                ):
                    module_name = "Other"

                if module_name not in failures_by_module:
                    failures_by_module[module_name] = []

                # Prefix with the suite name when the identifier lacks a path.
                if "/" not in test_name and suite_name not in test_name:
                    full_test_name = f"{suite_name}::{test_name}"
                else:
                    full_test_name = test_name

                failures_by_module[module_name].append(
                    {"test": full_test_name, "original_test": test_name, "error": failure["error"]}
                )

        if failures_by_module:
            for module_name, failures in sorted(failures_by_module.items()):
                report.append(f"### {module_name}")

                # Code fence keeps long node ids readable in markdown.
                report.append("```")
                for failure in failures:
                    # Include the error message only when it is informative.
                    if failure.get("error") and failure["error"] != "No error message":
                        report.append(f"{failure['test']} - {failure['error']}")
                    else:
                        report.append(failure["test"])
                report.append("```")

                report.append("")
        else:
            report.append("*No detailed failure information available*")
            report.append("")

    return "\n".join(report)
| |
|
| |
|
def create_test_groups_table(test_groups, total_tests, total_success_rate):
    """Render test-group statistics as a fixed-width text table.

    Args:
        test_groups: Mapping of category name -> {"total": int, "failed": int}.
        total_tests: Overall number of tests across all groups.
        total_success_rate: Pre-formatted overall success-rate string.

    Returns:
        A Slack-markdown string with a summary header and one row per
        category (largest groups first), or None when there are no groups.
    """
    if not test_groups:
        return None

    # Largest categories first.
    ordered = sorted(test_groups.items(), key=lambda item: item[1]["total"], reverse=True)

    lines = [
        "```",
        "Test Results Summary",
        "-------------------",
        f"Total Tests: {total_tests:,}",
        f"Success Rate: {total_success_rate}",
        "",
        "Category | Total Tests | Failed | Success Rate",
        "------------------- | ----------- | ------ | ------------",
    ]

    for name, counts in ordered:
        group_total = counts["total"]
        group_failed = counts["failed"]
        if group_total > 0:
            group_rate = f"{((group_total - group_failed) / group_total * 100):.1f}%"
        else:
            group_rate = "N/A"
        # Column widths match the header/separator rows above.
        lines.append(f"{name[:19]:<19} | {group_total:>11} | {group_failed:>6} | {group_rate:>12}")

    lines.append("```")

    failure_count = sum(counts["failed"] for counts in test_groups.values())
    noun = "failure" if failure_count == 1 else "failures"
    return f"*Test Groups Summary ({failure_count} {noun}):*\n" + "\n".join(lines)
| |
|
| |
|
def create_slack_payload(consolidated_data):
    """Create a concise Slack message payload from consolidated data.

    Builds a summary section, a per-suite results table (worst suites first)
    and context blocks. When the rendered payload exceeds MAX_LEN_MESSAGE the
    table is truncated row by row from the bottom until it fits, and a note
    records how many suites were dropped.

    Args:
        consolidated_data: Output of consolidate_reports().

    Returns:
        list of Slack Block Kit block dicts.
    """
    total = consolidated_data["total_stats"]
    success_rate = f"{(total['passed'] / total['tests'] * 100):.2f}%" if total["tests"] > 0 else "N/A"

    # Status emoji: green when clean, warning while failures stay under 10%.
    if total["failed"] == 0:
        emoji = "✅"
    elif total["failed"] / total["tests"] < 0.1:
        emoji = "⚠️"
    else:
        emoji = "❌"

    summary = f"{emoji} *Diffusers Nightly Tests:* {success_rate} success ({total['passed']}/{total['tests']} tests"
    if total["skipped"] > 0:
        summary += f", {total['skipped']} skipped"
    summary += ")"

    # Render the per-suite table inside a code fence.
    table_lines = []
    table_lines.append("```")

    # Sort suites by success rate, worst first.
    sorted_suites = sorted(
        consolidated_data["test_suites"].items(),
        key=lambda x: (x[1]["stats"]["passed"] / x[1]["stats"]["tests"] * 100) if x[1]["stats"]["tests"] > 0 else 0,
        reverse=False,
    )

    # Pad the name column to the longest suite name (at least the header width).
    max_suite_name_len = max(len(suite_name) for suite_name, _ in sorted_suites) if sorted_suites else 10
    max_suite_name_len = max(max_suite_name_len, len("Test Suite"))

    header = f"| {'Test Suite'.ljust(max_suite_name_len)} | {'Tests'.rjust(6)} | {'Failed'.rjust(6)} | {'Success Rate'.ljust(12)} |"
    separator = f"|:{'-' * max_suite_name_len}|{'-' * 7}:|{'-' * 7}:|:{'-' * 11}|"

    table_lines.append(header)
    table_lines.append(separator)

    for suite_name, suite_data in sorted_suites:
        stats = suite_data["stats"]
        suite_success_rate = f"{(stats['passed'] / stats['tests'] * 100):.2f}%" if stats["tests"] > 0 else "N/A"

        row = f"| {suite_name.ljust(max_suite_name_len)} | {str(stats['tests']).rjust(6)} | {str(stats['failed']).rjust(6)} | {suite_success_rate.ljust(12)} |"
        table_lines.append(row)

    table_lines.append("```")

    payload = [
        {"type": "section", "text": {"type": "mrkdwn", "text": summary}},
        {"type": "section", "text": {"type": "mrkdwn", "text": "\n".join(table_lines)}},
    ]

    # Link to the GitHub Actions run when running in CI.
    if os.environ.get("GITHUB_RUN_ID"):
        run_id = os.environ["GITHUB_RUN_ID"]
        payload.append(
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*<https://github.com/huggingface/diffusers/actions/runs/{run_id}|View full report on GitHub>*",
                },
            }
        )

    payload.append(
        {
            "type": "context",
            "elements": [
                {
                    "type": "plain_text",
                    "text": f"Results for {date.today()}",
                },
            ],
        }
    )

    # Truncate the table if the rendered payload exceeds the size limit.
    if len(str(payload)) > MAX_LEN_MESSAGE:
        original_table_lines = table_lines[:]
        # Drop rows from the bottom (best success rates) until it fits.
        # Bug fix: payload[1] must be re-rendered INSIDE the loop — the old
        # code never updated it, so the measured length never shrank and the
        # loop always emptied the table down to 3 lines.
        while len(str(payload)) > MAX_LEN_MESSAGE and len(table_lines) > 3:
            table_lines.pop(-2)  # remove the last row, keep the closing fence
            payload[1] = {"type": "section", "text": {"type": "mrkdwn", "text": "\n".join(table_lines)}}

        # Tell readers how many suites were dropped.
        if len(table_lines) < len(original_table_lines):
            truncated_count = len(original_table_lines) - len(table_lines)
            table_lines.insert(-1, f"... {truncated_count} more test suites (truncated due to message limit)")
            payload[1] = {"type": "section", "text": {"type": "mrkdwn", "text": "\n".join(table_lines)}}

    return payload
| |
|
| |
|
def create_failed_tests_by_suite_ordered(consolidated_data):
    """Group failed tests by test suite, ordered by success rate (ascending).

    Args:
        consolidated_data: Output of consolidate_reports().

    Returns:
        list of dicts, one per suite with failures, each holding the suite
        name, its sorted unique failed-test identifiers, and its success rate.
    """

    def success_rate(entry):
        # Ordering key; suites with zero collected tests sort first.
        stats = entry[1]["stats"]
        return (stats["passed"] / stats["tests"] * 100) if stats["tests"] > 0 else 0

    failed_suite_tests = []

    # Worst suites first.
    for suite_name, suite_data in sorted(consolidated_data["test_suites"].items(), key=success_rate):
        if suite_data["stats"]["failed"] <= 0:
            continue

        qualified = set()
        for failure in suite_data.get("failures", []):
            name = failure["test"]

            # Qualify bare identifiers with the suite name; names that already
            # carry a file path or the suite name are kept as-is.
            if "::" in name and "/" in name:
                qualified.add(name)
            elif ("::" in name or "." in name) and ("/" in name or suite_name in name):
                qualified.add(name)
            else:
                qualified.add(f"{suite_name}::{name}")

        if qualified:
            failed_suite_tests.append(
                {
                    "suite_name": suite_name,
                    "tests": sorted(qualified),
                    "success_rate": success_rate((suite_name, suite_data)),
                }
            )

    return failed_suite_tests
| |
|
| |
|
def main(args):
    """Entry point: consolidate reports, print/save them, and notify Slack."""
    # Bail out early when the reports directory is missing.
    if not os.path.isdir(args.reports_dir):
        print(f"Error: Reports directory '{args.reports_dir}' does not exist.")
        return

    # Gather statistics from every *_stats.txt report under reports_dir.
    consolidated_data = consolidate_reports(args.reports_dir)

    # Warn (but still produce a report) when nothing was found.
    if consolidated_data["total_stats"]["tests"] == 0:
        print(f"Warning: No test results found in '{args.reports_dir}' or its subdirectories.")

    # Build the markdown report.
    report = generate_report(consolidated_data)

    # Optionally persist it, creating parent directories as needed.
    if args.output_file:
        output_dir = os.path.dirname(args.output_file)
        if output_dir and not os.path.exists(output_dir):
            os.makedirs(output_dir)

        with open(args.output_file, "w") as f:
            f.write(report)

    # Always echo the report to stdout.
    print(report)

    # Post to Slack only when credentials and a channel are configured.
    slack_token = os.environ.get("SLACK_API_TOKEN")
    if slack_token and args.slack_channel_name:
        payload = create_slack_payload(consolidated_data)

        try:
            client = WebClient(token=slack_token)
            # Main summary message; its timestamp anchors the reply thread.
            response = client.chat_postMessage(channel=f"#{args.slack_channel_name}", blocks=payload)
            print(f"Report sent to Slack channel: {args.slack_channel_name}")

            # Post failing-test details as threaded replies, one per suite.
            total = consolidated_data["total_stats"]
            if total["failed"] > 0:
                failed_suites = create_failed_tests_by_suite_ordered(consolidated_data)
                for suite_info in failed_suites:
                    suite_name = suite_info["suite_name"]
                    suite_tests = suite_info["tests"]
                    success_rate = suite_info["success_rate"]
                    message_text = (
                        f"**{suite_name}** (Success Rate: {success_rate:.2f}%)\n```\n"
                        + "\n".join(suite_tests)
                        + "\n```"
                    )
                    client.chat_postMessage(
                        channel=f"#{args.slack_channel_name}",
                        thread_ts=response["ts"],
                        text=message_text,
                    )
                print(f"Failed tests details sent as {len(failed_suites)} thread replies")
        except Exception as e:
            # Slack delivery is best-effort; the report was already printed.
            print(f"Error sending report to Slack: {e}")
| |
|
| |
|
| | if __name__ == "__main__": |
| | args = parser.parse_args() |
| | main(args) |
| |
|