| | """Test result object""" |
| |
|
| | import io |
| | import sys |
| | import traceback |
| |
|
| | from . import util |
| | from functools import wraps |
| |
|
# Marker global: TestResult._is_relevant_tb_level() treats any traceback
# frame whose module globals contain the name '__unittest' as a frame to
# filter, so defining it here flags this module's own frames.
__unittest = True
| |
|
def failfast(method):
    """Decorate a result method so it honours the ``failfast`` flag.

    The returned wrapper first checks the ``failfast`` attribute of the
    object it is called on (a missing attribute counts as False); when the
    flag is truthy it calls ``self.stop()``.  The wrapped method is then
    invoked with the original arguments and its return value is passed
    back unchanged.
    """
    def stop_then_call(self, *args, **kw):
        stop_requested = getattr(self, 'failfast', False)
        if stop_requested:
            self.stop()
        return method(self, *args, **kw)

    # Copy the name, docstring, etc. of the wrapped method onto the wrapper.
    return wraps(method)(stop_then_call)
| |
|
# %-format templates used by TestResult when it reports captured output:
# the single ``%s`` receives the text captured from stdout / stderr.
STDOUT_LINE = "\nStdout:\n%s"
STDERR_LINE = "\nStderr:\n%s"
| |
|
| |
|
class TestResult(object):
    """Holder for test result information.

    Test results are automatically managed by the TestCase and TestSuite
    classes, and do not need to be explicitly manipulated by writers of tests.

    Each instance holds the total number of tests run, and collections of
    failures and errors that occurred among those test runs. The collections
    contain tuples of (testcase, exceptioninfo), where exceptioninfo is the
    formatted traceback of the error that occurred.
    """
    # Class-level defaults; nothing inside this class reads them.
    # NOTE(review): presumably bookkeeping used by code outside this class
    # (suite/fixture handling) -- confirm against the callers.
    _previousTestClass = None
    _testRunEntered = False
    _moduleSetUpFailed = False

    def __init__(self, stream=None, descriptions=None, verbosity=None):
        """Create an empty result.

        The stream, descriptions and verbosity arguments are accepted but
        are not used by this class.
        """
        self.failfast = False
        self.failures = []
        self.errors = []
        self.testsRun = 0
        self.skipped = []
        self.expectedFailures = []
        self.unexpectedSuccesses = []
        self.shouldStop = False
        # When True, stdout/stderr are redirected into in-memory buffers
        # while a test runs (see _setupStdout / _restoreStdout).
        self.buffer = False
        # Passed as capture_locals when formatting tracebacks.
        self.tb_locals = False
        # The buffers are created lazily by _setupStdout().
        self._stdout_buffer = None
        self._stderr_buffer = None
        # Streams to restore (and to echo captured output to).
        self._original_stdout = sys.stdout
        self._original_stderr = sys.stderr
        # Set when an error/failure is recorded so that the captured output
        # is echoed to the original streams by _restoreStdout().
        self._mirrorOutput = False

    def printErrors(self):
        "Called by TestRunner after test run"

    def startTest(self, test):
        "Called when the given test is about to be run"
        self.testsRun += 1
        self._mirrorOutput = False
        self._setupStdout()

    def _setupStdout(self):
        """Redirect sys.stdout/sys.stderr to the buffers if buffering is on."""
        if self.buffer:
            if self._stderr_buffer is None:
                # First use: create both buffers together.
                self._stderr_buffer = io.StringIO()
                self._stdout_buffer = io.StringIO()
            sys.stdout = self._stdout_buffer
            sys.stderr = self._stderr_buffer

    def startTestRun(self):
        """Called once before any tests are executed.

        See startTest for a method called before each test.
        """

    def stopTest(self, test):
        """Called when the given test has been run"""
        self._restoreStdout()
        self._mirrorOutput = False

    def _restoreStdout(self):
        """Undo _setupStdout(): echo captured output if requested, restore
        the original streams and empty the buffers for the next test."""
        if self.buffer:
            if self._mirrorOutput:
                output = sys.stdout.getvalue()
                error = sys.stderr.getvalue()
                if output:
                    if not output.endswith('\n'):
                        output += '\n'
                    self._original_stdout.write(STDOUT_LINE % output)
                if error:
                    if not error.endswith('\n'):
                        error += '\n'
                    self._original_stderr.write(STDERR_LINE % error)

            sys.stdout = self._original_stdout
            sys.stderr = self._original_stderr
            # Reset the buffers in place so they can be reused.
            self._stdout_buffer.seek(0)
            self._stdout_buffer.truncate()
            self._stderr_buffer.seek(0)
            self._stderr_buffer.truncate()

    def stopTestRun(self):
        """Called once after all tests are executed.

        See stopTest for a method called after each test.
        """

    @failfast
    def addError(self, test, err):
        """Called when an error has occurred. 'err' is a tuple of values as
        returned by sys.exc_info().
        """
        self.errors.append((test, self._exc_info_to_string(err, test)))
        self._mirrorOutput = True

    @failfast
    def addFailure(self, test, err):
        """Called when a failure has occurred. 'err' is a tuple of values as
        returned by sys.exc_info()."""
        self.failures.append((test, self._exc_info_to_string(err, test)))
        self._mirrorOutput = True

    def addSubTest(self, test, subtest, err):
        """Called at the end of a subtest.
        'err' is None if the subtest ended successfully, otherwise it's a
        tuple of values as returned by sys.exc_info().
        """
        # Successful subtests (err is None) are not recorded here.
        if err is not None:
            # failfast is handled inline rather than with the decorator so
            # that it only triggers for unsuccessful subtests.
            if getattr(self, 'failfast', False):
                self.stop()
            if issubclass(err[0], test.failureException):
                errors = self.failures
            else:
                errors = self.errors
            errors.append((subtest, self._exc_info_to_string(err, test)))
            self._mirrorOutput = True

    def addSuccess(self, test):
        "Called when a test has completed successfully"
        pass

    def addSkip(self, test, reason):
        """Called when a test is skipped."""
        self.skipped.append((test, reason))

    def addExpectedFailure(self, test, err):
        """Called when an expected failure/error occurred."""
        self.expectedFailures.append(
            (test, self._exc_info_to_string(err, test)))

    @failfast
    def addUnexpectedSuccess(self, test):
        """Called when a test was expected to fail, but succeeded."""
        self.unexpectedSuccesses.append(test)

    def wasSuccessful(self):
        """Tells whether or not this result was a success."""
        # The hasattr() check lets this method work on result objects that
        # lack the 'unexpectedSuccesses' attribute.
        return ((len(self.failures) == len(self.errors) == 0) and
                (not hasattr(self, 'unexpectedSuccesses') or
                 len(self.unexpectedSuccesses) == 0))

    def stop(self):
        """Indicates that the tests should be aborted."""
        self.shouldStop = True

    def _exc_info_to_string(self, err, test):
        """Converts a sys.exc_info()-style tuple of values into a string.

        When buffering is enabled, any captured stdout/stderr text is
        appended to the formatted traceback.
        """
        exctype, value, tb = err
        tb = self._clean_tracebacks(exctype, value, tb, test)
        tb_e = traceback.TracebackException(
            exctype, value, tb, capture_locals=self.tb_locals)
        msgLines = list(tb_e.format())

        if self.buffer:
            output = sys.stdout.getvalue()
            error = sys.stderr.getvalue()
            if output:
                if not output.endswith('\n'):
                    output += '\n'
                msgLines.append(STDOUT_LINE % output)
            if error:
                if not error.endswith('\n'):
                    error += '\n'
                msgLines.append(STDERR_LINE % error)
        return ''.join(msgLines)

    def _clean_tracebacks(self, exctype, value, tb, test):
        """Strip filtered frames from an exception and its chained exceptions.

        Returns the cleaned traceback for the outermost exception.  The
        cleaned tracebacks of chained exceptions (reached through
        __cause__ / __context__) are written back to their __traceback__
        attribute.
        """
        ret = None
        first = True
        excs = [(exctype, value, tb)]
        # Remember which exceptions were already queued: a chain may contain
        # a cycle, or reach the same exception through both __cause__ and
        # __context__, and without this the loop would never terminate.
        seen = {id(value)}
        while excs:
            (exctype, value, tb) = excs.pop()
            # Skip the leading frames flagged by _is_relevant_tb_level().
            while tb and self._is_relevant_tb_level(tb):
                tb = tb.tb_next

            # For assertion failures also cut the trailing flagged frames.
            if exctype is test.failureException:
                self._remove_unittest_tb_frames(tb)

            if first:
                ret = tb
                first = False
            else:
                value.__traceback__ = tb

            if value is not None:
                for c in (value.__cause__, value.__context__):
                    if c is not None and id(c) not in seen:
                        excs.append((type(c), c, c.__traceback__))
                        seen.add(id(c))
        return ret

    def _is_relevant_tb_level(self, tb):
        """Return True if the frame's globals define the name '__unittest'."""
        return '__unittest' in tb.tb_frame.f_globals

    def _remove_unittest_tb_frames(self, tb):
        '''Truncates usercode tb at the first unittest frame.

        If the first frame of the traceback is in user code, the traceback
        is cut in place just before the first frame whose globals contain
        '__unittest'.  If the first frame is already such a frame (or tb
        is empty), the traceback is not modified.  Nothing is returned.
        '''
        prev = None
        while tb and not self._is_relevant_tb_level(tb):
            prev = tb
            tb = tb.tb_next
        if prev is not None:
            prev.tb_next = None

    def __repr__(self):
        return ("<%s run=%i errors=%i failures=%i>" %
               (util.strclass(self.__class__), self.testsRun, len(self.errors),
                len(self.failures)))
| |
|