# Licensed under the MIT License
# https://github.com/craigahobbs/unittest-parallel/blob/main/LICENSE
# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: LicenseRef-NvidiaProprietary
#
# NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
# property and proprietary rights in and to this material, related
# documentation and any modifications thereto. Any use, reproduction,
# disclosure or distribution of this material and related documentation
# without an express license agreement from NVIDIA CORPORATION or
# its affiliates is strictly prohibited.
"""
unittest-parallel command-line script main module
"""
import argparse
import concurrent.futures # NVIDIA Modification
import multiprocessing
import os
import sys
import tempfile
import time
import unittest
from contextlib import contextmanager
from io import StringIO
import warp.tests.unittest_suites # NVIDIA Modification
try:
import coverage
COVERAGE_AVAILABLE = True # NVIDIA Modification
except ImportError:
COVERAGE_AVAILABLE = False # NVIDIA Modification
# The following variables are NVIDIA Modifications
RUNNING_IN_TEAMCITY = os.environ.get("TEAMCITY_VERSION") is not None
TEST_SUITE_NAME = "WarpTests"
START_DIRECTORY = os.path.dirname(__file__)  # The directory in which to start test discovery
def main(argv=None):
"""
unittest-parallel command-line script main entry point
"""
# Command line arguments
parser = argparse.ArgumentParser(prog="unittest-parallel")
# parser.add_argument("-v", "--verbose", action="store_const", const=2, default=1, help="Verbose output")
parser.add_argument("-q", "--quiet", dest="verbose", action="store_const", const=0, default=2, help="Quiet output")
parser.add_argument("-f", "--failfast", action="store_true", default=False, help="Stop on first fail or error")
parser.add_argument(
"-b", "--buffer", action="store_true", default=False, help="Buffer stdout and stderr during tests"
)
parser.add_argument(
"-k",
dest="testNamePatterns",
action="append",
type=_convert_select_pattern,
help="Only run tests which match the given substring",
)
parser.add_argument(
"-p", "--pattern", metavar="PATTERN", default="test*.py", help="Pattern to match tests ('test*.py' default)"
)
parser.add_argument(
"-t",
"--top-level-directory",
metavar="TOP",
help="Top level directory of project (defaults to start directory)",
)
group_parallel = parser.add_argument_group("parallelization options")
group_parallel.add_argument(
"-j",
"--jobs",
metavar="COUNT",
type=int,
default=0,
help="The number of test processes (default is 0, all cores)",
)
group_parallel.add_argument(
"-m",
"--maxjobs",
metavar="MAXCOUNT",
type=int,
default=8,
help="The maximum number of test processes (default is 8)",
) # NVIDIA Modification
group_parallel.add_argument(
"--level",
choices=["module", "class", "test"],
default="class",
help="Set the test parallelism level (default is 'class')",
)
group_parallel.add_argument(
"--disable-process-pooling",
action="store_true",
default=False,
help="Do not reuse processes used to run test suites",
)
group_parallel.add_argument(
"--disable-concurrent-futures",
action="store_true",
default=False,
help="Use multiprocessing instead of concurrent.futures.",
) # NVIDIA Modification
group_parallel.add_argument(
"--serial-fallback",
action="store_true",
default=False,
help="Run in a single-process (no spawning) mode without multiprocessing or concurrent.futures.",
) # NVIDIA Modification
group_coverage = parser.add_argument_group("coverage options")
group_coverage.add_argument("--coverage", action="store_true", help="Run tests with coverage")
group_coverage.add_argument("--coverage-branch", action="store_true", help="Run tests with branch coverage")
group_coverage.add_argument(
"--coverage-html",
metavar="DIR",
help="Generate coverage HTML report",
default=os.path.join(START_DIRECTORY, "..", "..", "htmlcov"),
)
group_coverage.add_argument("--coverage-xml", metavar="FILE", help="Generate coverage XML report")
group_coverage.add_argument(
"--coverage-fail-under", metavar="MIN", type=float, help="Fail if coverage percentage under min"
)
args = parser.parse_args(args=argv)
if args.coverage_branch:
args.coverage = args.coverage_branch
if args.coverage and not COVERAGE_AVAILABLE:
parser.exit(
status=2, message="--coverage was used, but coverage was not found. Is it installed?\n"
) # NVIDIA Modification
process_count = max(0, args.jobs)
if process_count == 0:
process_count = multiprocessing.cpu_count()
process_count = min(process_count, args.maxjobs) # NVIDIA Modification
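    # With the defaults (-j 0 --maxjobs 8), a 64-core machine still launches at
    # most 8 workers; an explicit -j larger than --maxjobs is clamped the same way.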
# Create the temporary directory (for coverage files)
with tempfile.TemporaryDirectory() as temp_dir:
# Discover tests
with _coverage(args, temp_dir):
test_loader = unittest.TestLoader()
if args.testNamePatterns:
test_loader.testNamePatterns = args.testNamePatterns
discover_suite = warp.tests.unittest_suites.auto_discover_suite(
test_loader, args.pattern
) # NVIDIA Modification
# discover_suite = warp.tests.unittest_suites.explicit_suite()
# Get the parallelizable test suites
if args.level == "test":
test_suites = list(_iter_test_cases(discover_suite))
elif args.level == "class":
test_suites = list(_iter_class_suites(discover_suite))
else: # args.level == 'module'
test_suites = list(_iter_module_suites(discover_suite))
# Don't use more processes than test suites
process_count = max(1, min(len(test_suites), process_count))
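        # e.g. 3 discovered suites with 8 available workers -> 3 processes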
if RUNNING_IN_TEAMCITY:
print(f"##teamcity[testSuiteStarted name='{TEST_SUITE_NAME}']") # NVIDIA Modification for TC
if not args.serial_fallback:
# Report test suites and processes
print(
f"Running {len(test_suites)} test suites ({discover_suite.countTestCases()} total tests) across {process_count} processes",
file=sys.stderr,
)
if args.verbose > 1:
print(file=sys.stderr)
# Run the tests in parallel
start_time = time.perf_counter()
if args.disable_concurrent_futures:
multiprocessing_context = multiprocessing.get_context(method="spawn")
maxtasksperchild = 1 if args.disable_process_pooling else None
with multiprocessing_context.Pool(
process_count,
maxtasksperchild=maxtasksperchild,
initializer=set_worker_cache,
initargs=(args, temp_dir),
) as pool, multiprocessing.Manager() as manager:
test_manager = ParallelTestManager(manager, args, temp_dir)
results = pool.map(test_manager.run_tests, test_suites)
else:
# NVIDIA Modification added concurrent.futures
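                # Per concurrent.futures semantics, the timeout passed to
                # executor.map() below is measured from the original call, so
                # the entire parallel run must finish within 3600 s or iterating
                # the results raises concurrent.futures.TimeoutError.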
with concurrent.futures.ProcessPoolExecutor(
max_workers=process_count,
mp_context=multiprocessing.get_context(method="spawn"),
initializer=set_worker_cache,
initargs=(args, temp_dir),
) as executor, multiprocessing.Manager() as manager:
test_manager = ParallelTestManager(manager, args, temp_dir)
results = list(executor.map(test_manager.run_tests, test_suites, timeout=3600))
else:
# This entire path is an NVIDIA Modification
# Report test suites and processes
print(f"Running {discover_suite.countTestCases()} total tests (serial fallback)", file=sys.stderr)
if args.verbose > 1:
print(file=sys.stderr)
import warp as wp
# force rebuild of all kernels
wp.build.clear_kernel_cache()
print("Cleared Warp kernel cache")
# Run the tests in serial
start_time = time.perf_counter()
with multiprocessing.Manager() as manager:
test_manager = ParallelTestManager(manager, args, temp_dir)
results = [test_manager.run_tests(discover_suite)]
stop_time = time.perf_counter()
test_duration = stop_time - start_time
if RUNNING_IN_TEAMCITY:
print(f"##teamcity[testSuiteFinished name='{TEST_SUITE_NAME}']") # NVIDIA Modification for TC
# Aggregate parallel test run results
tests_run = 0
errors = []
failures = []
skipped = 0
expected_failures = 0
unexpected_successes = 0
for result in results:
tests_run += result[0]
errors.extend(result[1])
failures.extend(result[2])
skipped += result[3]
expected_failures += result[4]
unexpected_successes += result[5]
is_success = not (errors or failures or unexpected_successes)
# Compute test info
infos = []
if failures:
infos.append(f"failures={len(failures)}")
if errors:
infos.append(f"errors={len(errors)}")
if skipped:
infos.append(f"skipped={skipped}")
if expected_failures:
infos.append(f"expected failures={expected_failures}")
if unexpected_successes:
infos.append(f"unexpected successes={unexpected_successes}")
# Report test errors
if errors or failures:
print(file=sys.stderr)
for error in errors:
print(error, file=sys.stderr)
for failure in failures:
print(failure, file=sys.stderr)
elif args.verbose > 0:
print(file=sys.stderr)
# Test report
print(unittest.TextTestResult.separator2, file=sys.stderr)
        print(f'Ran {tests_run} {"tests" if tests_run != 1 else "test"} in {test_duration:.3f}s', file=sys.stderr)
print(file=sys.stderr)
print(f'{"OK" if is_success else "FAILED"}{" (" + ", ".join(infos) + ")" if infos else ""}', file=sys.stderr)
# Return an error status on failure
if not is_success:
if RUNNING_IN_TEAMCITY:
print("##teamcity[buildStatus status='FAILURE']") # NVIDIA Modification for TC
parser.exit(status=len(errors) + len(failures) + unexpected_successes)
# Coverage?
if args.coverage:
# Combine the coverage files
cov_options = {}
cov_options["config_file"] = True # Grab configuration from pyproject.toml (must install coverage[toml])
cov = coverage.Coverage(**cov_options)
cov.combine(data_paths=[os.path.join(temp_dir, x) for x in os.listdir(temp_dir)])
# Coverage report
print(file=sys.stderr)
percent_covered = cov.report(ignore_errors=True, file=sys.stderr)
print(f"Total coverage is {percent_covered:.2f}%", file=sys.stderr)
# HTML coverage report
if args.coverage_html:
cov.html_report(directory=args.coverage_html, ignore_errors=True)
# XML coverage report
if args.coverage_xml:
cov.xml_report(outfile=args.coverage_xml, ignore_errors=True)
# Fail under
if args.coverage_fail_under and percent_covered < args.coverage_fail_under:
parser.exit(status=2)
def _convert_select_pattern(pattern):
if "*" not in pattern:
return f"*{pattern}*"
return pattern
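# For example, _convert_select_pattern("mesh") returns "*mesh*", so `-k mesh`
# matches any test whose name contains "mesh"; patterns that already contain a
# "*" wildcard pass through unchanged.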
@contextmanager
def _coverage(args, temp_dir):
# Running tests with coverage?
if args.coverage:
# Generate a random coverage data file name - file is deleted along with containing directory
with tempfile.NamedTemporaryFile(dir=temp_dir, delete=False) as coverage_file:
pass
# Create the coverage object
cov_options = {
"branch": args.coverage_branch,
"data_file": coverage_file.name,
# NVIDIA Modification removed unneeded options
}
cov_options["config_file"] = True # Grab configuration from pyproject.toml (must install coverage[toml])
cov = coverage.Coverage(**cov_options)
try:
# Start measuring code coverage
cov.start()
# Yield for unit test running
yield cov
finally:
# Stop measuring code coverage
cov.stop()
# Save the collected coverage data to the data file
cov.save()
else:
# Not running tests with coverage - yield for unit test running
yield None
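# Each `with _coverage(...)` block (test discovery in main() and every
# ParallelTestManager.run_tests() call) writes its own data file under
# temp_dir; main() later merges them all with cov.combine().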
# Iterate module-level test suites - all top-level test suites returned from TestLoader.discover
def _iter_module_suites(test_suite):
for module_suite in test_suite:
if module_suite.countTestCases():
yield module_suite
# Iterate class-level test suites - test suites that contain test cases
def _iter_class_suites(test_suite):
has_cases = any(isinstance(suite, unittest.TestCase) for suite in test_suite)
if has_cases:
yield test_suite
else:
for suite in test_suite:
yield from _iter_class_suites(suite)
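# For example, for a TestLoader.discover() tree of the form
#   TestSuite([TestSuite([TestSuite([TestCase, ...]), ...]), ...])
# (outer suite -> module suites -> class suites -> test methods), this yields
# each class-level suite, i.e. the deepest suites whose direct children are
# TestCases.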
# Iterate test cases (methods)
def _iter_test_cases(test_suite):
if isinstance(test_suite, unittest.TestCase):
yield test_suite
else:
for suite in test_suite:
yield from _iter_test_cases(suite)
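# At --level test, every TestCase becomes its own unit of work, which maximizes
# parallelism but repeats any per-class setUpClass work in each worker that
# runs a method of that class.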
class ParallelTestManager:
def __init__(self, manager, args, temp_dir):
self.args = args
self.temp_dir = temp_dir
self.failfast = manager.Event()
def run_tests(self, test_suite):
# Fail fast?
if self.failfast.is_set():
            return (0, [], [], 0, 0, 0)
# Run unit tests
with _coverage(self.args, self.temp_dir):
runner = unittest.TextTestRunner(
stream=StringIO(),
resultclass=ParallelTeamCityTestResult
if RUNNING_IN_TEAMCITY
else ParallelTextTestResult, # NVIDIA Modification for TC
verbosity=self.args.verbose,
failfast=self.args.failfast,
buffer=self.args.buffer,
)
result = runner.run(test_suite)
# Set failfast, if necessary
if result.shouldStop:
self.failfast.set()
# Return (test_count, errors, failures, skipped_count, expected_failure_count, unexpected_success_count)
return (
result.testsRun,
[self._format_error(result, error) for error in result.errors],
[self._format_error(result, failure) for failure in result.failures],
len(result.skipped),
len(result.expectedFailures),
len(result.unexpectedSuccesses),
)
@staticmethod
def _format_error(result, error):
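        # Mirrors unittest's usual error listing, e.g. (test name illustrative):
        #   ======================================================================
        #   test_example (warp.tests.test_example.TestExample)
        #   ----------------------------------------------------------------------
        #   Traceback (most recent call last):
        #     ...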
return "\n".join(
[
unittest.TextTestResult.separator1,
result.getDescription(error[0]),
unittest.TextTestResult.separator2,
error[1],
]
)
class ParallelTextTestResult(unittest.TextTestResult):
def __init__(self, stream, descriptions, verbosity):
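        # `stream` arrives as unittest's _WritelnDecorator wrapping the runner's
        # throwaway StringIO; rebinding the same decorator type to sys.stderr
        # prints per-test output immediately while the runner's own summary
        # still goes to the discarded StringIO.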
stream = type(stream)(sys.stderr)
super().__init__(stream, descriptions, verbosity)
def startTest(self, test):
if self.showAll:
self.stream.writeln(f"{self.getDescription(test)} ...")
self.stream.flush()
super(unittest.TextTestResult, self).startTest(test)
def _add_helper(self, test, dots_message, show_all_message):
if self.showAll:
self.stream.writeln(f"{self.getDescription(test)} ... {show_all_message}")
elif self.dots:
self.stream.write(dots_message)
self.stream.flush()
def addSuccess(self, test):
super(unittest.TextTestResult, self).addSuccess(test)
self._add_helper(test, ".", "ok")
def addError(self, test, err):
super(unittest.TextTestResult, self).addError(test, err)
self._add_helper(test, "E", "ERROR")
def addFailure(self, test, err):
super(unittest.TextTestResult, self).addFailure(test, err)
self._add_helper(test, "F", "FAIL")
def addSkip(self, test, reason):
super(unittest.TextTestResult, self).addSkip(test, reason)
self._add_helper(test, "s", f"skipped {reason!r}")
def addExpectedFailure(self, test, err):
super(unittest.TextTestResult, self).addExpectedFailure(test, err)
self._add_helper(test, "x", "expected failure")
def addUnexpectedSuccess(self, test):
super(unittest.TextTestResult, self).addUnexpectedSuccess(test)
self._add_helper(test, "u", "unexpected success")
def printErrors(self):
pass
# NVIDIA Modifications from here until end of file
def set_worker_cache(args, temp_dir):
"""This function is run at the start of ever new process. It changes the Warp cache to avoid conflicts."""
with _coverage(args, temp_dir):
import warp as wp
from warp.thirdparty import appdirs
pid = os.getpid()
cache_root_dir = appdirs.user_cache_dir(
appname="warp", appauthor="NVIDIA Corporation", version=f"{wp.config.version}-{pid}"
)
wp.config.kernel_cache_dir = cache_root_dir
wp.build.clear_kernel_cache()
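# For example, on Linux a worker with PID 1234 running warp 1.0.0 would get a
# cache directory like ~/.cache/warp/1.0.0-1234 (version and exact root are
# illustrative; the location is platform-specific, per appdirs.user_cache_dir).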
def _tc_escape(s):
s = s.replace("|", "||")
s = s.replace("\n", "|n")
s = s.replace("\r", "|r")
s = s.replace("'", "|'")
s = s.replace("[", "|[")
s = s.replace("]", "|]")
return s
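# Illustrative examples of TeamCity service-message escaping:
#   _tc_escape("a|b")          -> "a||b"
#   _tc_escape("line1\nline2") -> "line1|nline2"
#   _tc_escape("[tag]")        -> "|[tag|]"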
class ParallelTeamCityTestResult(unittest.TextTestResult):
def __init__(self, stream, descriptions, verbosity):
stream = type(stream)(sys.stderr)
super().__init__(stream, descriptions, verbosity)
def startTest(self, test):
if self.showAll:
self.stream.writeln(f"{self.getDescription(test)} ...")
self.stream.flush()
self.start_time = time.perf_counter_ns()
super(unittest.TextTestResult, self).startTest(test)
def _add_helper(self, test, dots_message, show_all_message):
if self.showAll:
self.stream.writeln(f"{self.getDescription(test)} ... {show_all_message}")
elif self.dots:
self.stream.write(dots_message)
self.stream.flush()
def addSuccess(self, test):
super(unittest.TextTestResult, self).addSuccess(test)
self._add_helper(test, ".", "ok")
self.reportSuccess(test)
def addError(self, test, err):
super(unittest.TextTestResult, self).addError(test, err)
self._add_helper(test, "E", "ERROR")
self.reportFailure(test, err)
def addFailure(self, test, err):
super(unittest.TextTestResult, self).addFailure(test, err)
self._add_helper(test, "F", "FAIL")
self.reportFailure(test, err)
def addSkip(self, test, reason):
super(unittest.TextTestResult, self).addSkip(test, reason)
self._add_helper(test, "s", f"skipped {reason!r}")
self.reportIgnored(test, reason)
def addExpectedFailure(self, test, err):
super(unittest.TextTestResult, self).addExpectedFailure(test, err)
self._add_helper(test, "x", "expected failure")
self.reportSuccess(test)
def addUnexpectedSuccess(self, test):
super(unittest.TextTestResult, self).addUnexpectedSuccess(test)
self._add_helper(test, "u", "unexpected success")
        # reportFailure() expects an exc_info-style triple (it reads err[1] and
        # err[2]); passing a bare string would index into its characters.
        self.reportFailure(test, (None, "unexpected success", ""))
def addSubTest(self, test, subtest, err):
super(unittest.TextTestResult, self).addSubTest(test, subtest, err)
if err is not None:
self._add_helper(test, "E", "ERROR")
self.reportSubTestFailure(test, err)
def printErrors(self):
pass
def reportIgnored(self, test, reason):
test_id = test.id()
reason_str = str(reason)
        self.stream.writeln(f"##teamcity[testIgnored name='{test_id}' message='{_tc_escape(reason_str)}']")
self.stream.flush()
def reportSuccess(self, test):
duration = round((time.perf_counter_ns() - self.start_time) / 1e6) # [ms]
test_id = test.id()
self.stream.writeln(f"##teamcity[testStarted name='{test_id}']")
self.stream.writeln(f"##teamcity[testFinished name='{test_id}' duration='{duration}']")
self.stream.flush()
def reportFailure(self, test, err):
test_id = test.id()
self.stream.writeln(f"##teamcity[testStarted name='{test_id}']")
self.stream.writeln(
f"##teamcity[testFailed name='{test_id}' message='{_tc_escape(str(err[1]))}' details='{_tc_escape(str(err[2]))}']"
)
self.stream.writeln(f"##teamcity[testFinished name='{test_id}']")
self.stream.flush()
def reportSubTestFailure(self, test, err):
test_id = test.id()
self.stream.writeln(f"##teamcity[testStarted name='{test_id}']")
self.stream.writeln(
f"##teamcity[testFailed name='{test_id}' message='{_tc_escape(str(err[1]))}' details='{_tc_escape(self._exc_info_to_string(err, test))}']"
)
self.stream.writeln(f"##teamcity[testFinished name='{test_id}']")
self.stream.flush()
if __name__ == "__main__": # pragma: no cover
main()