# Licensed under the MIT License
# https://github.com/craigahobbs/unittest-parallel/blob/main/LICENSE

# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: LicenseRef-NvidiaProprietary
#
# NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
# property and proprietary rights in and to this material, related
# documentation and any modifications thereto. Any use, reproduction,
# disclosure or distribution of this material and related documentation
# without an express license agreement from NVIDIA CORPORATION or
# its affiliates is strictly prohibited.

"""
unittest-parallel command-line script main module
"""

import argparse
import concurrent.futures  # NVIDIA Modification
import multiprocessing
import os
import sys
import tempfile
import time
import unittest
from contextlib import contextmanager
from io import StringIO

import warp.tests.unittest_suites  # NVIDIA Modification

try:
    import coverage

    COVERAGE_AVAILABLE = True  # NVIDIA Modification
except ImportError:
    COVERAGE_AVAILABLE = False  # NVIDIA Modification

# The following variables are NVIDIA Modifications
RUNNING_IN_TEAMCITY = os.environ.get("TEAMCITY_VERSION") is not None
TEST_SUITE_NAME = "WarpTests"
START_DIRECTORY = os.path.dirname(__file__)  # The directory to start test discovery


def main(argv=None):
    """
    unittest-parallel command-line script main entry point
    """

    # Command line arguments
    parser = argparse.ArgumentParser(prog="unittest-parallel")
    # parser.add_argument("-v", "--verbose", action="store_const", const=2, default=1, help="Verbose output")
    parser.add_argument("-q", "--quiet", dest="verbose", action="store_const", const=0, default=2, help="Quiet output")
    parser.add_argument("-f", "--failfast", action="store_true", default=False, help="Stop on first fail or error")
    parser.add_argument(
        "-b", "--buffer", action="store_true", default=False, help="Buffer stdout and stderr during tests"
    )
    parser.add_argument(
        "-k",
        dest="testNamePatterns",
        action="append",
        type=_convert_select_pattern,
        help="Only run tests which match the given substring",
    )
    parser.add_argument(
        "-p", "--pattern", metavar="PATTERN", default="test*.py", help="Pattern to match tests ('test*.py' default)"
    )
    parser.add_argument(
        "-t",
        "--top-level-directory",
        metavar="TOP",
        help="Top level directory of project (defaults to start directory)",
    )
    group_parallel = parser.add_argument_group("parallelization options")
    group_parallel.add_argument(
        "-j",
        "--jobs",
        metavar="COUNT",
        type=int,
        default=0,
        help="The number of test processes (default is 0, all cores)",
    )
    group_parallel.add_argument(
        "-m",
        "--maxjobs",
        metavar="MAXCOUNT",
        type=int,
        default=8,
        help="The maximum number of test processes (default is 8)",
    )  # NVIDIA Modification
    group_parallel.add_argument(
        "--level",
        choices=["module", "class", "test"],
        default="class",
        help="Set the test parallelism level (default is 'class')",
    )
    group_parallel.add_argument(
        "--disable-process-pooling",
        action="store_true",
        default=False,
        help="Do not reuse processes used to run test suites",
    )
    group_parallel.add_argument(
        "--disable-concurrent-futures",
        action="store_true",
        default=False,
        help="Use multiprocessing instead of concurrent.futures.",
    )  # NVIDIA Modification
    group_parallel.add_argument(
        "--serial-fallback",
        action="store_true",
        default=False,
        help="Run in a single-process (no spawning) mode without multiprocessing or concurrent.futures.",
    )  # NVIDIA Modification
    group_coverage = parser.add_argument_group("coverage options")
    group_coverage.add_argument("--coverage", action="store_true", help="Run tests with coverage")
    group_coverage.add_argument("--coverage-branch", action="store_true", help="Run tests with branch coverage")
    group_coverage.add_argument(
        "--coverage-html",
        metavar="DIR",
        help="Generate coverage HTML report",
        default=os.path.join(START_DIRECTORY, "..", "..", "htmlcov"),
    )
    group_coverage.add_argument("--coverage-xml", metavar="FILE", help="Generate coverage XML report")
    group_coverage.add_argument(
        "--coverage-fail-under", metavar="MIN", type=float, help="Fail if coverage percentage under min"
    )
    args = parser.parse_args(args=argv)
    if args.coverage_branch:
        args.coverage = args.coverage_branch
    if args.coverage and not COVERAGE_AVAILABLE:
        parser.exit(
            status=2, message="--coverage was used, but coverage was not found. Is it installed?\n"
        )  # NVIDIA Modification

    process_count = max(0, args.jobs)
    if process_count == 0:
        process_count = multiprocessing.cpu_count()
    process_count = min(process_count, args.maxjobs)  # NVIDIA Modification
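    # For example, with the defaults (-j 0, -m 8) on a 32-core machine this
    # resolves to min(32, 8) = 8 worker processes, while -j 4 resolves to 4.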

    # Create the temporary directory (for coverage files)
    with tempfile.TemporaryDirectory() as temp_dir:
        # Discover tests
        with _coverage(args, temp_dir):
            test_loader = unittest.TestLoader()
            if args.testNamePatterns:
                test_loader.testNamePatterns = args.testNamePatterns
            discover_suite = warp.tests.unittest_suites.auto_discover_suite(
                test_loader, args.pattern
            )  # NVIDIA Modification
            # discover_suite = warp.tests.unittest_suites.explicit_suite()

        # Get the parallelizable test suites
        if args.level == "test":
            test_suites = list(_iter_test_cases(discover_suite))
        elif args.level == "class":
            test_suites = list(_iter_class_suites(discover_suite))
        else:  # args.level == 'module'
            test_suites = list(_iter_module_suites(discover_suite))

        # Don't use more processes than test suites
        process_count = max(1, min(len(test_suites), process_count))

        if RUNNING_IN_TEAMCITY:
            print(f"##teamcity[testSuiteStarted name='{TEST_SUITE_NAME}']")  # NVIDIA Modification for TC

        if not args.serial_fallback:
            # Report test suites and processes
            print(
                f"Running {len(test_suites)} test suites ({discover_suite.countTestCases()} total tests) across {process_count} processes",
                file=sys.stderr,
            )
            if args.verbose > 1:
                print(file=sys.stderr)

            # Run the tests in parallel
            start_time = time.perf_counter()
            if args.disable_concurrent_futures:
                multiprocessing_context = multiprocessing.get_context(method="spawn")
                maxtasksperchild = 1 if args.disable_process_pooling else None
                with multiprocessing_context.Pool(
                    process_count,
                    maxtasksperchild=maxtasksperchild,
                    initializer=set_worker_cache,
                    initargs=(args, temp_dir),
                ) as pool, multiprocessing.Manager() as manager:
                    test_manager = ParallelTestManager(manager, args, temp_dir)
                    results = pool.map(test_manager.run_tests, test_suites)
            else:
                # NVIDIA Modification added concurrent.futures
                with concurrent.futures.ProcessPoolExecutor(
                    max_workers=process_count,
                    mp_context=multiprocessing.get_context(method="spawn"),
                    initializer=set_worker_cache,
                    initargs=(args, temp_dir),
                ) as executor, multiprocessing.Manager() as manager:
                    test_manager = ParallelTestManager(manager, args, temp_dir)
                    results = list(executor.map(test_manager.run_tests, test_suites, timeout=3600))
        else:
            # This entire path is an NVIDIA Modification
            # Report test suites and processes
            print(f"Running {discover_suite.countTestCases()} total tests (serial fallback)", file=sys.stderr)
            if args.verbose > 1:
                print(file=sys.stderr)

            import warp as wp

            # force rebuild of all kernels
            wp.build.clear_kernel_cache()
            print("Cleared Warp kernel cache")

            # Run the tests in serial
            start_time = time.perf_counter()
            with multiprocessing.Manager() as manager:
                test_manager = ParallelTestManager(manager, args, temp_dir)
                results = [test_manager.run_tests(discover_suite)]

        stop_time = time.perf_counter()
        test_duration = stop_time - start_time

        if RUNNING_IN_TEAMCITY:
            print(f"##teamcity[testSuiteFinished name='{TEST_SUITE_NAME}']")  # NVIDIA Modification for TC

        # Aggregate parallel test run results
        tests_run = 0
        errors = []
        failures = []
        skipped = 0
        expected_failures = 0
        unexpected_successes = 0
        for result in results:
            tests_run += result[0]
            errors.extend(result[1])
            failures.extend(result[2])
            skipped += result[3]
            expected_failures += result[4]
            unexpected_successes += result[5]
        is_success = not (errors or failures or unexpected_successes)

        # Compute test info
        infos = []
        if failures:
            infos.append(f"failures={len(failures)}")
        if errors:
            infos.append(f"errors={len(errors)}")
        if skipped:
            infos.append(f"skipped={skipped}")
        if expected_failures:
            infos.append(f"expected failures={expected_failures}")
        if unexpected_successes:
            infos.append(f"unexpected successes={unexpected_successes}")

        # Report test errors
        if errors or failures:
            print(file=sys.stderr)
            for error in errors:
                print(error, file=sys.stderr)
            for failure in failures:
                print(failure, file=sys.stderr)
        elif args.verbose > 0:
            print(file=sys.stderr)

        # Test report
        print(unittest.TextTestResult.separator2, file=sys.stderr)
        print(f'Ran {tests_run} {"tests" if tests_run != 1 else "test"} in {test_duration:.3f}s', file=sys.stderr)
        print(file=sys.stderr)
        print(f'{"OK" if is_success else "FAILED"}{" (" + ", ".join(infos) + ")" if infos else ""}', file=sys.stderr)

        # Return an error status on failure
        if not is_success:
            if RUNNING_IN_TEAMCITY:
                print("##teamcity[buildStatus status='FAILURE']")  # NVIDIA Modification for TC
            parser.exit(status=len(errors) + len(failures) + unexpected_successes)
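            # Note: parser.exit() forwards this status to sys.exit(); on POSIX
            # systems the shell observes the value modulo 256, so very large
            # failure counts can wrap.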

        # Coverage?
        if args.coverage:
            # Combine the coverage files
            cov_options = {}
            cov_options["config_file"] = True  # Grab configuration from pyproject.toml (must install coverage[toml])
            cov = coverage.Coverage(**cov_options)
            cov.combine(data_paths=[os.path.join(temp_dir, x) for x in os.listdir(temp_dir)])

            # Coverage report
            print(file=sys.stderr)
            percent_covered = cov.report(ignore_errors=True, file=sys.stderr)
            print(f"Total coverage is {percent_covered:.2f}%", file=sys.stderr)

            # HTML coverage report
            if args.coverage_html:
                cov.html_report(directory=args.coverage_html, ignore_errors=True)

            # XML coverage report
            if args.coverage_xml:
                cov.xml_report(outfile=args.coverage_xml, ignore_errors=True)

            # Fail under
            if args.coverage_fail_under and percent_covered < args.coverage_fail_under:
                parser.exit(status=2)
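

# Example invocations (illustrative; flags as defined above):
#   unittest-parallel -j 4 --level class        # up to 4 workers, one suite per TestCase class
#   unittest-parallel -k mesh -q                # only tests matching *mesh*, quiet output
#   unittest-parallel --coverage --coverage-xml coverage.xml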


def _convert_select_pattern(pattern):
    if "*" not in pattern:
        return f"*{pattern}*"
    return pattern
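

# Example (illustrative): _convert_select_pattern("mesh") returns "*mesh*" so a
# bare -k substring matches anywhere in a test name, while a pattern that
# already contains a wildcard, e.g. "test_mesh*", is passed through unchanged.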


@contextmanager  # required so callers can use "with _coverage(...)" around test runs
def _coverage(args, temp_dir):
    # Running tests with coverage?
    if args.coverage:
        # Generate a random coverage data file name - file is deleted along with containing directory
        with tempfile.NamedTemporaryFile(dir=temp_dir, delete=False) as coverage_file:
            pass

        # Create the coverage object
        cov_options = {
            "branch": args.coverage_branch,
            "data_file": coverage_file.name,
            # NVIDIA Modification removed unneeded options
        }
        cov_options["config_file"] = True  # Grab configuration from pyproject.toml (must install coverage[toml])
        cov = coverage.Coverage(**cov_options)
        try:
            # Start measuring code coverage
            cov.start()

            # Yield for unit test running
            yield cov
        finally:
            # Stop measuring code coverage
            cov.stop()

            # Save the collected coverage data to the data file
            cov.save()
    else:
        # Not running tests with coverage - yield for unit test running
        yield None
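

# Usage sketch: because _coverage is a @contextmanager generator, the code before
# the yield runs on entering the "with" block and the finally clause runs on exit,
# so coverage data is stopped and saved even if a test suite raises.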


# Iterate module-level test suites - all top-level test suites returned from TestLoader.discover
def _iter_module_suites(test_suite):
    for module_suite in test_suite:
        if module_suite.countTestCases():
            yield module_suite


# Iterate class-level test suites - test suites that contain test cases
def _iter_class_suites(test_suite):
    has_cases = any(isinstance(suite, unittest.TestCase) for suite in test_suite)
    if has_cases:
        yield test_suite
    else:
        for suite in test_suite:
            yield from _iter_class_suites(suite)


# Iterate test cases (methods)
def _iter_test_cases(test_suite):
    if isinstance(test_suite, unittest.TestCase):
        yield test_suite
    else:
        for suite in test_suite:
            yield from _iter_test_cases(suite)
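

# Illustrative example (hypothetical suite): for a discovered hierarchy of
# modules -> TestCase classes -> test methods, --level=module yields one suite
# per module, --level=class one suite per TestCase class, and --level=test one
# item per test method; list(_iter_test_cases(suite)) therefore fully flattens
# the tree into individual TestCase instances.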


class ParallelTestManager:
    def __init__(self, manager, args, temp_dir):
        self.args = args
        self.temp_dir = temp_dir
        self.failfast = manager.Event()

    def run_tests(self, test_suite):
        # Fail fast?
        if self.failfast.is_set():
            return [0, [], [], 0, 0, 0]

        # Run unit tests
        with _coverage(self.args, self.temp_dir):
            runner = unittest.TextTestRunner(
                stream=StringIO(),
                resultclass=ParallelTeamCityTestResult
                if RUNNING_IN_TEAMCITY
                else ParallelTextTestResult,  # NVIDIA Modification for TC
                verbosity=self.args.verbose,
                failfast=self.args.failfast,
                buffer=self.args.buffer,
            )
            result = runner.run(test_suite)

            # Set failfast, if necessary
            if result.shouldStop:
                self.failfast.set()

            # Return (test_count, errors, failures, skipped_count, expected_failure_count, unexpected_success_count)
            return (
                result.testsRun,
                [self._format_error(result, error) for error in result.errors],
                [self._format_error(result, failure) for failure in result.failures],
                len(result.skipped),
                len(result.expectedFailures),
                len(result.unexpectedSuccesses),
            )

    @staticmethod
    def _format_error(result, error):
        return "\n".join(
            [
                unittest.TextTestResult.separator1,
                result.getDescription(error[0]),
                unittest.TextTestResult.separator2,
                error[1],
            ]
        )


class ParallelTextTestResult(unittest.TextTestResult):
    def __init__(self, stream, descriptions, verbosity):
        stream = type(stream)(sys.stderr)
        super().__init__(stream, descriptions, verbosity)

    def startTest(self, test):
        if self.showAll:
            self.stream.writeln(f"{self.getDescription(test)} ...")
            self.stream.flush()
        super(unittest.TextTestResult, self).startTest(test)

    def _add_helper(self, test, dots_message, show_all_message):
        if self.showAll:
            self.stream.writeln(f"{self.getDescription(test)} ... {show_all_message}")
        elif self.dots:
            self.stream.write(dots_message)
        self.stream.flush()

    def addSuccess(self, test):
        super(unittest.TextTestResult, self).addSuccess(test)
        self._add_helper(test, ".", "ok")

    def addError(self, test, err):
        super(unittest.TextTestResult, self).addError(test, err)
        self._add_helper(test, "E", "ERROR")

    def addFailure(self, test, err):
        super(unittest.TextTestResult, self).addFailure(test, err)
        self._add_helper(test, "F", "FAIL")

    def addSkip(self, test, reason):
        super(unittest.TextTestResult, self).addSkip(test, reason)
        self._add_helper(test, "s", f"skipped {reason!r}")

    def addExpectedFailure(self, test, err):
        super(unittest.TextTestResult, self).addExpectedFailure(test, err)
        self._add_helper(test, "x", "expected failure")

    def addUnexpectedSuccess(self, test):
        super(unittest.TextTestResult, self).addUnexpectedSuccess(test)
        self._add_helper(test, "u", "unexpected success")

    def printErrors(self):
        pass
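

# Note on the super(unittest.TextTestResult, self) calls above: they skip
# TextTestResult's own reporting and invoke the base unittest.TestResult
# bookkeeping directly, so the subclass can write its own output via
# _add_helper without a duplicate line; printErrors() is a no-op because
# errors are aggregated and printed once by main().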


# NVIDIA Modifications from here until end of file


def set_worker_cache(args, temp_dir):
    """This function is run at the start of every new process. It changes the Warp cache to avoid conflicts."""
    with _coverage(args, temp_dir):
        import warp as wp
        from warp.thirdparty import appdirs

        pid = os.getpid()
        cache_root_dir = appdirs.user_cache_dir(
            appname="warp", appauthor="NVIDIA Corporation", version=f"{wp.config.version}-{pid}"
        )

        wp.config.kernel_cache_dir = cache_root_dir

        wp.build.clear_kernel_cache()
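

# Illustrative result (exact path varies by platform and appdirs version): on
# Linux, a worker with PID 1234 running Warp 1.0.0 would get a kernel cache
# directory like ~/.cache/warp/1.0.0-1234, so concurrently spawned workers
# never compile kernels into the same directory.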


def _tc_escape(s):
    s = s.replace("|", "||")
    s = s.replace("\n", "|n")
    s = s.replace("\r", "|r")
    s = s.replace("'", "|'")
    s = s.replace("[", "|[")
    s = s.replace("]", "|]")
    return s
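

# Example of the TeamCity service-message escaping above:
# _tc_escape("a|b\n[x]") == "a||b|n|[x|]"
# (the "|" is escaped first so later replacements don't get double-escaped).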


class ParallelTeamCityTestResult(unittest.TextTestResult):
    def __init__(self, stream, descriptions, verbosity):
        stream = type(stream)(sys.stderr)
        super().__init__(stream, descriptions, verbosity)

    def startTest(self, test):
        if self.showAll:
            self.stream.writeln(f"{self.getDescription(test)} ...")
            self.stream.flush()
        self.start_time = time.perf_counter_ns()
        super(unittest.TextTestResult, self).startTest(test)

    def _add_helper(self, test, dots_message, show_all_message):
        if self.showAll:
            self.stream.writeln(f"{self.getDescription(test)} ... {show_all_message}")
        elif self.dots:
            self.stream.write(dots_message)
        self.stream.flush()

    def addSuccess(self, test):
        super(unittest.TextTestResult, self).addSuccess(test)
        self._add_helper(test, ".", "ok")
        self.reportSuccess(test)

    def addError(self, test, err):
        super(unittest.TextTestResult, self).addError(test, err)
        self._add_helper(test, "E", "ERROR")
        self.reportFailure(test, err)

    def addFailure(self, test, err):
        super(unittest.TextTestResult, self).addFailure(test, err)
        self._add_helper(test, "F", "FAIL")
        self.reportFailure(test, err)

    def addSkip(self, test, reason):
        super(unittest.TextTestResult, self).addSkip(test, reason)
        self._add_helper(test, "s", f"skipped {reason!r}")
        self.reportIgnored(test, reason)

    def addExpectedFailure(self, test, err):
        super(unittest.TextTestResult, self).addExpectedFailure(test, err)
        self._add_helper(test, "x", "expected failure")
        self.reportSuccess(test)

    def addUnexpectedSuccess(self, test):
        super(unittest.TextTestResult, self).addUnexpectedSuccess(test)
        self._add_helper(test, "u", "unexpected success")
        # reportFailure expects an (exc_type, message, details)-style triple
        self.reportFailure(test, (None, "unexpected success", ""))

    def addSubTest(self, test, subtest, err):
        super(unittest.TextTestResult, self).addSubTest(test, subtest, err)
        if err is not None:
            self._add_helper(test, "E", "ERROR")
            self.reportSubTestFailure(test, err)

    def printErrors(self):
        pass

    def reportIgnored(self, test, reason):
        test_id = test.id()
        self.stream.writeln(f"##teamcity[testIgnored name='{test_id}' message='{_tc_escape(str(reason))}']")
        self.stream.flush()

    def reportSuccess(self, test):
        duration = round((time.perf_counter_ns() - self.start_time) / 1e6)  # [ms]
        test_id = test.id()
        self.stream.writeln(f"##teamcity[testStarted name='{test_id}']")
        self.stream.writeln(f"##teamcity[testFinished name='{test_id}' duration='{duration}']")
        self.stream.flush()

    def reportFailure(self, test, err):
        test_id = test.id()
        self.stream.writeln(f"##teamcity[testStarted name='{test_id}']")
        self.stream.writeln(
            f"##teamcity[testFailed name='{test_id}' message='{_tc_escape(str(err[1]))}' details='{_tc_escape(str(err[2]))}']"
        )
        self.stream.writeln(f"##teamcity[testFinished name='{test_id}']")
        self.stream.flush()

    def reportSubTestFailure(self, test, err):
        test_id = test.id()
        self.stream.writeln(f"##teamcity[testStarted name='{test_id}']")
        self.stream.writeln(
            f"##teamcity[testFailed name='{test_id}' message='{_tc_escape(str(err[1]))}' details='{_tc_escape(self._exc_info_to_string(err, test))}']"
        )
        self.stream.writeln(f"##teamcity[testFinished name='{test_id}']")
        self.stream.flush()


if __name__ == "__main__":  # pragma: no cover
    main()