|
|
|
|
|
"""Access and control log capturing.""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
from contextlib import contextmanager |
|
|
from contextlib import nullcontext |
|
|
from datetime import datetime |
|
|
from datetime import timedelta |
|
|
from datetime import timezone |
|
|
import io |
|
|
from io import StringIO |
|
|
import logging |
|
|
from logging import LogRecord |
|
|
import os |
|
|
from pathlib import Path |
|
|
import re |
|
|
from types import TracebackType |
|
|
from typing import AbstractSet |
|
|
from typing import Dict |
|
|
from typing import final |
|
|
from typing import Generator |
|
|
from typing import Generic |
|
|
from typing import List |
|
|
from typing import Literal |
|
|
from typing import Mapping |
|
|
from typing import TYPE_CHECKING |
|
|
from typing import TypeVar |
|
|
|
|
|
from _pytest import nodes |
|
|
from _pytest._io import TerminalWriter |
|
|
from _pytest.capture import CaptureManager |
|
|
from _pytest.config import _strtobool |
|
|
from _pytest.config import Config |
|
|
from _pytest.config import create_terminal_writer |
|
|
from _pytest.config import hookimpl |
|
|
from _pytest.config import UsageError |
|
|
from _pytest.config.argparsing import Parser |
|
|
from _pytest.deprecated import check_ispytest |
|
|
from _pytest.fixtures import fixture |
|
|
from _pytest.fixtures import FixtureRequest |
|
|
from _pytest.main import Session |
|
|
from _pytest.stash import StashKey |
|
|
from _pytest.terminal import TerminalReporter |
|
|
|
|
|
|
|
|
# Base class alias for the stream handlers defined in this module.
# Static type checkers see ``StreamHandler`` parameterized with ``StringIO``;
# at runtime the plain, unsubscripted class is used instead.
if TYPE_CHECKING:
    logging_StreamHandler = logging.StreamHandler[StringIO]
else:
    logging_StreamHandler = logging.StreamHandler
|
|
|
|
|
# Default for the ``log_format`` option: left-aligned level name padded to 8
# characters, then logger name, file name, line number and the message.
DEFAULT_LOG_FORMAT = "%(levelname)-8s %(name)s:%(filename)s:%(lineno)d %(message)s"
# Default for the ``log_date_format`` option (hours:minutes:seconds).
DEFAULT_LOG_DATE_FORMAT = "%H:%M:%S"
# Matches ``ESC [ <digits/semicolons> m`` sequences so they can be stripped
# from captured text.
_ANSI_ESCAPE_SEQ = re.compile(r"\x1b\[[\d;]+m")
# Stash key under which the active LogCaptureHandler is stored on an item.
caplog_handler_key = StashKey["LogCaptureHandler"]()
# Stash key for the captured records of an item, keyed by test phase name.
caplog_records_key = StashKey[Dict[str, List[logging.LogRecord]]]()
|
|
|
|
|
|
|
|
def _remove_ansi_escape_sequences(text: str) -> str:
    """Return *text* with every match of ``_ANSI_ESCAPE_SEQ`` deleted."""
    stripped, _replacements = _ANSI_ESCAPE_SEQ.subn("", text)
    return stripped
|
|
|
|
|
|
|
|
class DatetimeFormatter(logging.Formatter):
    """Formatter that can render microseconds in record timestamps.

    When the date format contains ``%f`` the timestamp is produced with
    :meth:`datetime.datetime.strftime`; every other date format is handed
    to :class:`logging.Formatter` unchanged.
    """

    def formatTime(self, record: LogRecord, datefmt: str | None = None) -> str:
        """Return the creation time of *record* formatted with *datefmt*.

        :param record: The record whose ``created``/``msecs`` are formatted.
        :param datefmt: A strftime-style format, or ``None`` for the default.
        """
        # No format, or one without microseconds: the base class handles it.
        if not datefmt or "%f" not in datefmt:
            return super().formatTime(record, datefmt)

        broken_down = self.converter(record.created)
        utc_offset = timedelta(seconds=broken_down.tm_gmtoff)
        zone = timezone(utc_offset, broken_down.tm_zone)
        year, month, day, hour, minute, second = broken_down[0:6]
        # int() truncates instead of rounding, so a fractional part just
        # below a whole second can never be rounded up past the valid range.
        microseconds = int(record.msecs * 1000)
        moment = datetime(
            year,
            month,
            day,
            hour,
            minute,
            second,
            microsecond=microseconds,
            tzinfo=zone,
        )
        return moment.strftime(datefmt)
|
|
|
|
|
|
|
|
class ColoredLevelFormatter(DatetimeFormatter):
    """A logging formatter which colorizes the %(levelname)..s part of the
    log format passed to __init__."""

    # Markup options applied to the level name, per log level.
    LOGLEVEL_COLOROPTS: Mapping[int, AbstractSet[str]] = {
        logging.CRITICAL: {"red"},
        logging.ERROR: {"red", "bold"},
        logging.WARNING: {"yellow"},
        logging.WARN: {"yellow"},
        logging.INFO: {"green"},
        logging.DEBUG: {"purple"},
        logging.NOTSET: set(),
    }
    # Finds the ``%(levelname)...s`` placeholder, including width/precision.
    LEVELNAME_FMT_REGEX = re.compile(r"%\(levelname\)([+-.]?\d*(?:\.\d+)?s)")

    def __init__(self, terminalwriter: TerminalWriter, *args, **kwargs) -> None:
        """Build one pre-colorized format string per known log level.

        :param terminalwriter:
            Object whose ``markup()`` method produces the colorized text.
        :param args: Forwarded to the parent formatter.
        :param kwargs: Forwarded to the parent formatter.
        """
        super().__init__(*args, **kwargs)
        self._terminalwriter = terminalwriter
        self._original_fmt = self._style._fmt
        self._level_to_fmt_mapping: dict[int, str] = {}

        for level, color_opts in self.LOGLEVEL_COLOROPTS.items():
            self.add_color_level(level, *color_opts)

    def add_color_level(self, level: int, *color_opts: str) -> None:
        """Add or update color opts for a log level.

        :param level:
            Log level to apply a style to, e.g. ``logging.INFO``.
        :param color_opts:
            ANSI escape sequence color options. Capitalized colors indicates
            background color, i.e. ``'green', 'Yellow', 'bold'`` will give bold
            green text on yellow background.

        .. warning::
            This is an experimental API.
        """
        assert self._fmt is not None
        levelname_fmt_match = self.LEVELNAME_FMT_REGEX.search(self._fmt)
        if not levelname_fmt_match:
            # The format has no level name placeholder: nothing to colorize.
            return
        levelname_fmt = levelname_fmt_match.group()

        # Render the level name with the placeholder's own width/precision so
        # that the colorized text keeps the same visible alignment.
        formatted_levelname = levelname_fmt % {"levelname": logging.getLevelName(level)}

        color_kwargs = dict.fromkeys(color_opts, True)
        colorized_formatted_levelname = self._terminalwriter.markup(
            formatted_levelname, **color_kwargs
        )
        # Use a callable replacement so the colorized text is inserted
        # literally. A plain string would be parsed as a regex replacement
        # template, and a level name containing a backslash would then raise
        # ``re.error`` or be expanded as a group reference.
        self._level_to_fmt_mapping[level] = self.LEVELNAME_FMT_REGEX.sub(
            lambda _match: colorized_formatted_levelname, self._fmt
        )

    def format(self, record: logging.LogRecord) -> str:
        """Format *record* using the format string registered for its level.

        Levels without a registered colorized format fall back to the
        original, uncolored format string.
        """
        fmt = self._level_to_fmt_mapping.get(record.levelno, self._original_fmt)
        self._style._fmt = fmt
        return super().format(record)
|
|
|
|
|
|
|
|
class PercentStyleMultiline(logging.PercentStyle):
    """Percent-style formatting that can indent continuation lines.

    A record whose message spans several lines is rendered so that the
    lines after the first are indented, as if each had been logged on its
    own.
    """

    def __init__(self, fmt: str, auto_indent: int | str | bool | None) -> None:
        """Store *fmt* and normalize the *auto_indent* setting to an int."""
        super().__init__(fmt)
        self._auto_indent = self._get_auto_indent(auto_indent)

    @staticmethod
    def _get_auto_indent(auto_indent_option: int | str | bool | None) -> int:
        """Normalize an auto-indent setting to an integer.

        The setting may come from ``extra={"auto_indent": value}`` on a
        logging call, from ``--log-auto-indent`` or from the
        ``log_auto_indent`` config option.

        * ``None``, ``False``, ``0`` and false-like strings turn indentation
          off.
        * ``True`` and true-like strings ask for automatic indentation.
        * An integer, or a string holding one, fixes the indentation column.
        * Anything else is silently treated as "off".

        :param None|bool|int|str auto_indent_option:
            The user supplied value.

        :returns:
            ``-1`` (determine the indentation automatically),
            ``0`` (auto-indent off) or
            ``>0`` (explicit indentation position).
        """
        option = auto_indent_option
        # bool has to be tested first: it is a subclass of int.
        if isinstance(option, bool):
            return -1 if option else 0
        if isinstance(option, int):
            return int(option)
        if isinstance(option, str):
            try:
                return int(option)
            except ValueError:
                pass
            try:
                enabled = _strtobool(option)
            except ValueError:
                enabled = False
            return -1 if enabled else 0
        # None and every unsupported type: indentation stays off.
        return 0

    def format(self, record: logging.LogRecord) -> str:
        """Render *record*, indenting continuation lines when enabled."""
        message = record.message
        if "\n" not in message:
            return self._fmt % record.__dict__

        # A per-record ``auto_indent`` attribute wins over the configured one.
        if hasattr(record, "auto_indent"):
            width = self._get_auto_indent(record.auto_indent)
        else:
            width = self._auto_indent
        if not width:
            return self._fmt % record.__dict__

        first_line, *continuation = message.splitlines()
        head = self._fmt % dict(record.__dict__, message=first_line)
        if width < 0:
            # Automatic mode: align with the column where the message starts
            # in the formatted first line, ignoring ANSI escape sequences.
            width = _remove_ansi_escape_sequences(head).find(first_line)
        separator = "\n" + " " * width
        return separator.join([head, *continuation])
|
|
|
|
|
|
|
|
def get_option_ini(config: Config, *names: str):
    """Return the first truthy setting found under any of *names*.

    For each name, in order, the command-line option is consulted first;
    only when it is ``None`` is the ini value of the same name used.
    Falsy results are skipped. ``None`` is returned when no name yields a
    truthy value.
    """
    for option_name in names:
        cli_value = config.getoption(option_name)
        resolved = config.getini(option_name) if cli_value is None else cli_value
        if resolved:
            return resolved
    return None
|
|
|
|
|
|
|
|
def pytest_addoption(parser: Parser) -> None:
    """Add options to control log capturing."""
    group = parser.getgroup("logging")

    def add_option_ini(option, dest, default=None, type=None, **kwargs):
        # Register the setting twice under the same ``dest``: as an ini value
        # (which carries the default) and as a command-line flag in the
        # "logging" group (registered without a default).
        parser.addini(
            dest, default=default, type=type, help="Default value for " + option
        )
        group.addoption(option, dest=dest, **kwargs)

    add_option_ini(
        "--log-level",
        dest="log_level",
        default=None,
        metavar="LEVEL",
        help=(
            "Level of messages to catch/display."
            " Not set by default, so it depends on the root/parent log handler's"
            ' effective level, where it is "WARNING" by default.'
        ),
    )
    add_option_ini(
        "--log-format",
        dest="log_format",
        default=DEFAULT_LOG_FORMAT,
        help="Log format used by the logging module",
    )
    add_option_ini(
        "--log-date-format",
        dest="log_date_format",
        default=DEFAULT_LOG_DATE_FORMAT,
        help="Log date format used by the logging module",
    )
    # ``log_cli`` exists only as an ini setting; there is no matching flag.
    parser.addini(
        "log_cli",
        default=False,
        type="bool",
        help='Enable log display during test run (also known as "live logging")',
    )
    add_option_ini(
        "--log-cli-level", dest="log_cli_level", default=None, help="CLI logging level"
    )
    add_option_ini(
        "--log-cli-format",
        dest="log_cli_format",
        default=None,
        help="Log format used by the logging module",
    )
    add_option_ini(
        "--log-cli-date-format",
        dest="log_cli_date_format",
        default=None,
        help="Log date format used by the logging module",
    )
    add_option_ini(
        "--log-file",
        dest="log_file",
        default=None,
        # Help text corrected: "when" -> "where".
        help="Path to a file where logging will be written to",
    )
    add_option_ini(
        "--log-file-mode",
        dest="log_file_mode",
        default="w",
        choices=["w", "a"],
        help="Log file open mode",
    )
    add_option_ini(
        "--log-file-level",
        dest="log_file_level",
        default=None,
        help="Log file logging level",
    )
    add_option_ini(
        "--log-file-format",
        dest="log_file_format",
        default=None,
        help="Log format used by the logging module",
    )
    add_option_ini(
        "--log-file-date-format",
        dest="log_file_date_format",
        default=None,
        help="Log date format used by the logging module",
    )
    add_option_ini(
        "--log-auto-indent",
        dest="log_auto_indent",
        default=None,
        help="Auto-indent multiline messages passed to the logging module. Accepts true|on, false|off or an integer.",
    )
    # ``--log-disable`` exists only as a flag; repeated uses accumulate.
    group.addoption(
        "--log-disable",
        action="append",
        default=[],
        dest="logger_disable",
        help="Disable a logger by name. Can be passed multiple times.",
    )
|
|
|
|
|
|
|
|
# Type of the handler managed by ``catching_logs``.
_HandlerType = TypeVar("_HandlerType", bound=logging.Handler)


class catching_logs(Generic[_HandlerType]):
    """Context manager that attaches a handler to the root logger.

    On entry the handler is added to the root logger. When a level was
    given, the handler is set to it and the root logger is lowered to it
    if needed. On exit the root level is restored and the handler removed.
    """

    __slots__ = ("handler", "level", "orig_level")

    def __init__(self, handler: _HandlerType, level: int | None = None) -> None:
        """Remember the handler and the optional level to apply on entry."""
        self.handler = handler
        self.level = level

    def __enter__(self) -> _HandlerType:
        """Install the handler on the root logger and return it."""
        root = logging.getLogger()
        requested = self.level
        if requested is None:
            root.addHandler(self.handler)
        else:
            self.handler.setLevel(requested)
            root.addHandler(self.handler)
            # Remember the root level; it is never raised, only lowered.
            self.orig_level = root.level
            root.setLevel(min(self.orig_level, requested))
        return self.handler

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Undo ``__enter__``: restore the root level, detach the handler."""
        root = logging.getLogger()
        level_was_changed = self.level is not None
        if level_was_changed:
            root.setLevel(self.orig_level)
        root.removeHandler(self.handler)
|
|
|
|
|
|
|
|
class LogCaptureHandler(logging_StreamHandler):
    """Stream handler that keeps both the records and their formatted text."""

    def __init__(self) -> None:
        """Start with an empty in-memory stream and an empty record list."""
        super().__init__(StringIO())
        self.records: list[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        """Store *record*, then let the stream handler write its text."""
        self.records.append(record)
        super().emit(record)

    def reset(self) -> None:
        """Start over with a brand-new record list and a fresh stream.

        The previous list object is replaced, not emptied, so references
        to it held elsewhere keep their contents.
        """
        self.records = list()
        self.stream = StringIO()

    def clear(self) -> None:
        """Empty the current record list in place and use a fresh stream.

        Unlike :meth:`reset`, the list object itself is kept, so every
        holder of a reference to it sees the list become empty.
        """
        del self.records[:]
        self.stream = StringIO()

    def handleError(self, record: logging.LogRecord) -> None:
        """Propagate the error being handled when logging wants errors raised.

        When ``logging.raiseExceptions`` is false nothing happens.
        """
        if not logging.raiseExceptions:
            return
        # Bare ``raise``: re-raise the exception currently being handled.
        raise
|
|
|
|
|
|
|
|
@final
class LogCaptureFixture:
    """Provides access and control of log capturing."""

    def __init__(self, item: nodes.Node, *, _ispytest: bool = False) -> None:
        """Bind the fixture to *item*; no level has been changed yet."""
        check_ispytest(_ispytest)
        self._item = item
        # Handler level before the first ``set_level`` call, if any.
        self._initial_handler_level: int | None = None
        # Logger name -> level it had before the first ``set_level`` call.
        self._initial_logger_levels: dict[str | None, int] = {}
        # ``logging.disable`` level before the first ``set_level`` call.
        self._initial_disabled_logging_level: int | None = None

    def _finalize(self) -> None:
        """Finalize the fixture.

        Restores the handler level, the logger levels and the disabled
        logging level that were changed through :meth:`set_level`.
        """
        saved_handler_level = self._initial_handler_level
        if saved_handler_level is not None:
            self.handler.setLevel(saved_handler_level)
        for name, saved_level in self._initial_logger_levels.items():
            logging.getLogger(name).setLevel(saved_level)

        saved_disable_level = self._initial_disabled_logging_level
        if saved_disable_level is not None:
            logging.disable(saved_disable_level)
            self._initial_disabled_logging_level = None

    @property
    def handler(self) -> LogCaptureHandler:
        """Get the logging handler used by the fixture."""
        return self._item.stash[caplog_handler_key]

    def get_records(
        self, when: Literal["setup", "call", "teardown"]
    ) -> list[logging.LogRecord]:
        """Get the logging records for one of the possible test phases.

        :param when:
            Which test phase to obtain the records from.
            Valid values are: "setup", "call" and "teardown".

        :returns: The list of captured records at the given stage.

        .. versionadded:: 3.4
        """
        records_by_phase = self._item.stash[caplog_records_key]
        return records_by_phase.get(when, [])

    @property
    def text(self) -> str:
        """The formatted log text, with ANSI escape sequences removed."""
        raw_text = self.handler.stream.getvalue()
        return _remove_ansi_escape_sequences(raw_text)

    @property
    def records(self) -> list[logging.LogRecord]:
        """The list of log records held by the handler."""
        return self.handler.records

    @property
    def record_tuples(self) -> list[tuple[str, int, str]]:
        """A list of a stripped down version of log records intended
        for use in assertion comparison.

        The format of the tuple is:

            (logger_name, log_level, message)
        """
        return [
            (record.name, record.levelno, record.getMessage())
            for record in self.records
        ]

    @property
    def messages(self) -> list[str]:
        """A list of format-interpolated log messages.

        Unlike 'records', which contains the format string and parameters for
        interpolation, log messages in this list are all interpolated.

        Unlike 'text', which contains the output from the handler, log
        messages in this list are unadorned with levels, timestamps, etc,
        making exact comparisons more reliable.

        Note that traceback or stack info (from :func:`logging.exception` or
        the `exc_info` or `stack_info` arguments to the logging functions) is
        not included, as this is added by the formatter in the handler.

        .. versionadded:: 3.7
        """
        return [record.getMessage() for record in self.records]

    def clear(self) -> None:
        """Reset the list of log records and the captured log text."""
        self.handler.clear()

    def _force_enable_logging(
        self, level: int | str, logger_obj: logging.Logger
    ) -> int:
        """Enable the desired logging level if the global level was disabled via ``logging.disable``.

        Only enables logging levels greater than or equal to the requested ``level``.

        Does nothing if the desired ``level`` wasn't disabled.

        :param level:
            The logger level caplog should capture.
            All logging is enabled if a non-standard logging level string is supplied.
            Valid level strings are in :data:`logging._nameToLevel`.
        :param logger_obj: The logger object to check.

        :return: The original disabled logging level.
        """
        previously_disabled: int = logger_obj.manager.disable

        # A level name is translated to its number; for an unknown name
        # ``getLevelName`` hands back a string again.
        resolved = logging.getLevelName(level) if isinstance(level, str) else level

        if not isinstance(resolved, int):
            # Unknown level name: lift the global disable completely.
            logging.disable(logging.NOTSET)
        elif not logger_obj.isEnabledFor(resolved):
            # Move the disable threshold to one step (10) below the
            # requested level, but never below NOTSET.
            logging.disable(max(resolved - 10, logging.NOTSET))

        return previously_disabled

    def set_level(self, level: int | str, logger: str | None = None) -> None:
        """Set the threshold level of a logger for the duration of a test.

        Logging messages which are less severe than this level will not be captured.

        .. versionchanged:: 3.4
            The levels of the loggers changed by this function will be
            restored to their initial values at the end of the test.

        Will enable the requested logging level if it was disabled via :func:`logging.disable`.

        :param level: The level.
        :param logger: The logger to update. If not given, the root logger.
        """
        target = logging.getLogger(logger)

        # Only the first change per logger records the level to restore.
        if logger not in self._initial_logger_levels:
            self._initial_logger_levels[logger] = target.level
        target.setLevel(level)

        if self._initial_handler_level is None:
            self._initial_handler_level = self.handler.level
        self.handler.setLevel(level)

        disabled_before = self._force_enable_logging(level, target)
        if self._initial_disabled_logging_level is None:
            self._initial_disabled_logging_level = disabled_before

    @contextmanager
    def at_level(self, level: int | str, logger: str | None = None) -> Generator[None]:
        """Context manager that sets the level for capturing of logs. After
        the end of the 'with' statement the level is restored to its original
        value.

        Will enable the requested logging level if it was disabled via :func:`logging.disable`.

        :param level: The level.
        :param logger: The logger to update. If not given, the root logger.
        """
        target = logging.getLogger(logger)
        saved_logger_level = target.level
        target.setLevel(level)
        saved_handler_level = self.handler.level
        self.handler.setLevel(level)
        saved_disable_level = self._force_enable_logging(level, target)
        try:
            yield
        finally:
            target.setLevel(saved_logger_level)
            self.handler.setLevel(saved_handler_level)
            logging.disable(saved_disable_level)

    @contextmanager
    def filtering(self, filter_: logging.Filter) -> Generator[None]:
        """Context manager that temporarily adds the given filter to the caplog's
        :meth:`handler` for the 'with' statement block, and removes that filter at the
        end of the block.

        :param filter_: A custom :class:`logging.Filter` object.

        .. versionadded:: 7.5
        """
        self.handler.addFilter(filter_)
        try:
            yield
        finally:
            self.handler.removeFilter(filter_)
|
|
|
|
|
|
|
|
@fixture
def caplog(request: FixtureRequest) -> Generator[LogCaptureFixture]:
    """Access and control log capturing.

    Captured logs are available through the following properties/methods::

    * caplog.messages -> list of format-interpolated log messages
    * caplog.text -> string containing formatted log output
    * caplog.records -> list of logging.LogRecord instances
    * caplog.record_tuples -> list of (logger_name, level, message) tuples
    * caplog.clear() -> clear captured records and formatted log output string
    """
    capture_fixture = LogCaptureFixture(request.node, _ispytest=True)
    yield capture_fixture
    # After the test: restore every level changed through the fixture.
    capture_fixture._finalize()
|
|
|
|
|
|
|
|
def get_log_level_for_setting(config: Config, *setting_names: str) -> int | None:
    """Return the numeric log level configured under the first set name.

    For each name, in order, the command-line option is consulted first and
    the ini value second; the first truthy value is used.

    :param config: Object providing ``getoption`` and ``getini``.
    :param setting_names: Setting names to try, in priority order.
    :returns:
        The level as an int, or ``None`` when none of the settings is set.
    :raises UsageError:
        If the value is neither a level name known to :mod:`logging` nor
        something convertible to an integer.
    """
    for setting_name in setting_names:
        log_level = config.getoption(setting_name)
        if log_level is None:
            log_level = config.getini(setting_name)
        if log_level:
            break
    else:
        return None

    if isinstance(log_level, str):
        log_level = log_level.upper()
        # Level names are looked up as attributes of the logging module;
        # anything else (e.g. "15") is passed through to int() below.
        candidate = getattr(logging, log_level, log_level)
    else:
        # A non-string value (e.g. an int assigned programmatically) cannot
        # be used as an attribute name; convert it directly.
        candidate = log_level
    try:
        return int(candidate)
    except (TypeError, ValueError) as e:
        # TypeError covers logging attributes that are not numbers at all.
        raise UsageError(
            f"'{log_level}' is not recognized as a logging level name for "
            f"'{setting_name}'. Please consider passing the "
            "logging level num instead."
        ) from e
|
|
|
|
|
|
|
|
|
|
|
@hookimpl(trylast=True)
def pytest_configure(config: Config) -> None:
    """Create the logging plugin and register it as ``logging-plugin``."""
    plugin = LoggingPlugin(config)
    config.pluginmanager.register(plugin, "logging-plugin")
|
|
|
|
|
|
|
|
class LoggingPlugin:
    """Attaches to the logging module and captures log messages for each test."""

    def __init__(self, config: Config) -> None:
        """Create a new plugin to capture log messages.

        The formatter can be safely shared across all handlers so
        create a single one for the entire test session here.
        """
        self._config = config

        # Handlers feeding the ``caplog`` fixture and the test report.
        self.formatter = self._create_formatter(
            get_option_ini(config, "log_format"),
            get_option_ini(config, "log_date_format"),
            get_option_ini(config, "log_auto_indent"),
        )
        self.log_level = get_log_level_for_setting(config, "log_level")
        self.caplog_handler = LogCaptureHandler()
        self.caplog_handler.setFormatter(self.formatter)
        self.report_handler = LogCaptureHandler()
        self.report_handler.setFormatter(self.formatter)

        # File handler; writes to ``os.devnull`` when no log file is set.
        self.log_file_level = get_log_level_for_setting(
            config, "log_file_level", "log_level"
        )
        log_file = get_option_ini(config, "log_file") or os.devnull
        if log_file != os.devnull:
            directory = os.path.dirname(os.path.abspath(log_file))
            # ``exist_ok`` avoids the check-then-create race when several
            # processes set up the same log directory at the same time.
            os.makedirs(directory, exist_ok=True)

        self.log_file_mode = get_option_ini(config, "log_file_mode") or "w"
        self.log_file_handler = _FileHandler(
            log_file, mode=self.log_file_mode, encoding="UTF-8"
        )
        log_file_format = get_option_ini(config, "log_file_format", "log_format")
        log_file_date_format = get_option_ini(
            config, "log_file_date_format", "log_date_format"
        )

        log_file_formatter = DatetimeFormatter(
            log_file_format, datefmt=log_file_date_format
        )
        self.log_file_handler.setFormatter(log_file_formatter)

        # Live ("CLI") logging handler.
        self.log_cli_level = get_log_level_for_setting(
            config, "log_cli_level", "log_level"
        )
        if self._log_cli_enabled():
            terminal_reporter = config.pluginmanager.get_plugin("terminalreporter")
            # ``_log_cli_enabled()`` returns False when this plugin is missing.
            assert terminal_reporter is not None
            # May be None; the stream handler copes with that.
            capture_manager = config.pluginmanager.get_plugin("capturemanager")
            self.log_cli_handler: (
                _LiveLoggingStreamHandler | _LiveLoggingNullHandler
            ) = _LiveLoggingStreamHandler(terminal_reporter, capture_manager)
        else:
            self.log_cli_handler = _LiveLoggingNullHandler()
        log_cli_formatter = self._create_formatter(
            get_option_ini(config, "log_cli_format", "log_format"),
            get_option_ini(config, "log_cli_date_format", "log_date_format"),
            get_option_ini(config, "log_auto_indent"),
        )
        self.log_cli_handler.setFormatter(log_cli_formatter)
        self._disable_loggers(loggers_to_disable=config.option.logger_disable)

    def _disable_loggers(self, loggers_to_disable: list[str]) -> None:
        """Set ``disabled = True`` on every logger named in the list."""
        if not loggers_to_disable:
            return

        for name in loggers_to_disable:
            logger = logging.getLogger(name)
            logger.disabled = True

    def _create_formatter(self, log_format, log_date_format, auto_indent):
        """Build a formatter for *log_format* with multiline support.

        A colorizing formatter is used when the ``color`` option is not
        ``"no"`` and the format contains a level name placeholder;
        otherwise a plain :class:`DatetimeFormatter` is used.
        """
        # The option may be absent; treat that like ``color=no``.
        color = getattr(self._config.option, "color", "no")
        if color != "no" and ColoredLevelFormatter.LEVELNAME_FMT_REGEX.search(
            log_format
        ):
            formatter: logging.Formatter = ColoredLevelFormatter(
                create_terminal_writer(self._config), log_format, log_date_format
            )
        else:
            formatter = DatetimeFormatter(log_format, log_date_format)

        # Swap in the style that knows how to indent multiline messages.
        formatter._style = PercentStyleMultiline(
            formatter._style._fmt, auto_indent=auto_indent
        )

        return formatter

    def set_log_path(self, fname: str) -> None:
        """Set the filename parameter for Logging.FileHandler().

        Creates parent directory if it does not exist.

        .. warning::
            This is an experimental API.
        """
        fpath = Path(fname)

        # Relative paths are resolved against the configured root path.
        if not fpath.is_absolute():
            fpath = self._config.rootpath / fpath

        if not fpath.parent.exists():
            fpath.parent.mkdir(exist_ok=True, parents=True)

        stream: io.TextIOWrapper = fpath.open(mode=self.log_file_mode, encoding="UTF-8")
        # Swap the stream and close the one that was in use before, if any.
        old_stream = self.log_file_handler.setStream(stream)
        if old_stream:
            old_stream.close()

    def _log_cli_enabled(self) -> bool:
        """Return whether live logging is enabled."""
        enabled = self._config.getoption(
            "--log-cli-level"
        ) is not None or self._config.getini("log_cli")
        if not enabled:
            return False

        terminal_reporter = self._config.pluginmanager.get_plugin("terminalreporter")
        if terminal_reporter is None:
            # Without a terminal reporter there is nothing to write to.
            return False

        return True

    @hookimpl(wrapper=True, tryfirst=True)
    def pytest_sessionstart(self) -> Generator[None]:
        """Keep the CLI and file handlers installed during session start."""
        self.log_cli_handler.set_when("sessionstart")

        with catching_logs(self.log_cli_handler, level=self.log_cli_level):
            with catching_logs(self.log_file_handler, level=self.log_file_level):
                return (yield)

    @hookimpl(wrapper=True, tryfirst=True)
    def pytest_collection(self) -> Generator[None]:
        """Keep the CLI and file handlers installed during collection."""
        self.log_cli_handler.set_when("collection")

        with catching_logs(self.log_cli_handler, level=self.log_cli_level):
            with catching_logs(self.log_file_handler, level=self.log_file_level):
                return (yield)

    @hookimpl(wrapper=True)
    def pytest_runtestloop(self, session: Session) -> Generator[None, object, object]:
        """Keep the CLI and file handlers installed while tests run.

        Nothing is installed for collect-only runs.
        """
        if session.config.option.collectonly:
            return (yield)

        if self._log_cli_enabled() and self._config.get_verbosity() < 1:
            # Live logging forces verbosity up to at least 1.
            self._config.option.verbose = 1

        with catching_logs(self.log_cli_handler, level=self.log_cli_level):
            with catching_logs(self.log_file_handler, level=self.log_file_level):
                return (yield)

    @hookimpl
    def pytest_runtest_logstart(self) -> None:
        """Reset the CLI handler and mark the "start" phase."""
        self.log_cli_handler.reset()
        self.log_cli_handler.set_when("start")

    @hookimpl
    def pytest_runtest_logreport(self) -> None:
        """Mark the "logreport" phase on the CLI handler."""
        self.log_cli_handler.set_when("logreport")

    def _runtest_for(self, item: nodes.Item, when: str) -> Generator[None]:
        """Implement the internals of the pytest_runtest_xxx() hooks."""
        with catching_logs(
            self.caplog_handler,
            level=self.log_level,
        ) as caplog_handler, catching_logs(
            self.report_handler,
            level=self.log_level,
        ) as report_handler:
            # ``reset()`` gives each phase its own record list, so the list
            # stored in the stash for an earlier phase stays intact.
            caplog_handler.reset()
            report_handler.reset()
            item.stash[caplog_records_key][when] = caplog_handler.records
            item.stash[caplog_handler_key] = caplog_handler

            try:
                yield
            finally:
                # Attach the captured text to the report even on failure.
                log = report_handler.stream.getvalue().strip()
                item.add_report_section(when, "log", log)

    @hookimpl(wrapper=True)
    def pytest_runtest_setup(self, item: nodes.Item) -> Generator[None]:
        """Capture logs of the setup phase; starts a fresh per-phase dict."""
        self.log_cli_handler.set_when("setup")

        empty: dict[str, list[logging.LogRecord]] = {}
        item.stash[caplog_records_key] = empty
        yield from self._runtest_for(item, "setup")

    @hookimpl(wrapper=True)
    def pytest_runtest_call(self, item: nodes.Item) -> Generator[None]:
        """Capture logs of the call phase."""
        self.log_cli_handler.set_when("call")

        yield from self._runtest_for(item, "call")

    @hookimpl(wrapper=True)
    def pytest_runtest_teardown(self, item: nodes.Item) -> Generator[None]:
        """Capture logs of the teardown phase, then drop the stash entries."""
        self.log_cli_handler.set_when("teardown")

        try:
            yield from self._runtest_for(item, "teardown")
        finally:
            del item.stash[caplog_records_key]
            del item.stash[caplog_handler_key]

    @hookimpl
    def pytest_runtest_logfinish(self) -> None:
        """Mark the "finish" phase on the CLI handler."""
        self.log_cli_handler.set_when("finish")

    @hookimpl(wrapper=True, tryfirst=True)
    def pytest_sessionfinish(self) -> Generator[None]:
        """Keep the CLI and file handlers installed during session finish."""
        self.log_cli_handler.set_when("sessionfinish")

        with catching_logs(self.log_cli_handler, level=self.log_cli_level):
            with catching_logs(self.log_file_handler, level=self.log_file_level):
                return (yield)

    @hookimpl
    def pytest_unconfigure(self) -> None:
        """Close the log file handler explicitly."""
        self.log_file_handler.close()
|
|
|
|
|
|
|
|
class _FileHandler(logging.FileHandler): |
|
|
"""A logging FileHandler with pytest tweaks.""" |
|
|
|
|
|
def handleError(self, record: logging.LogRecord) -> None: |
|
|
|
|
|
pass |
|
|
|
|
|
|
|
|
class _LiveLoggingStreamHandler(logging_StreamHandler):
    """A logging StreamHandler used by the live logging feature: it will
    write a newline before the first log message in each test.

    During live logging we must also explicitly disable stdout/stderr
    capturing otherwise it will get captured and won't appear in the
    terminal.
    """

    # The stream is the terminal reporter rather than a regular text stream.
    stream: TerminalReporter = None

    def __init__(
        self,
        terminal_reporter: TerminalReporter,
        capture_manager: CaptureManager | None,
    ) -> None:
        """Write to *terminal_reporter*; *capture_manager* may be ``None``."""
        super().__init__(stream=terminal_reporter)
        self.capture_manager = capture_manager
        self.reset()
        self.set_when(None)
        self._test_outcome_written = False

    def reset(self) -> None:
        """Reset the handler; should be called before the start of each test."""
        self._first_record_emitted = False

    def set_when(self, when: str | None) -> None:
        """Prepare for the given test phase (setup/call/teardown)."""
        self._when = when
        # Each phase gets its own section header.
        self._section_name_shown = False
        if when == "start":
            self._test_outcome_written = False

    def emit(self, record: logging.LogRecord) -> None:
        """Write *record* to the terminal with capturing disabled."""
        if self.capture_manager:
            capture_disabled = self.capture_manager.global_and_fixture_disabled()
        else:
            capture_disabled = nullcontext()

        with capture_disabled:
            phase = self._when
            if not self._first_record_emitted:
                # Very first record since the last reset(): start a new line.
                self.stream.write("\n")
                self._first_record_emitted = True
            elif phase in ("teardown", "finish") and not self._test_outcome_written:
                # One extra newline, once, for records in these late phases.
                self._test_outcome_written = True
                self.stream.write("\n")

            if phase and not self._section_name_shown:
                self.stream.section("live log " + phase, sep="-", bold=True)
                self._section_name_shown = True

            super().emit(record)

    def handleError(self, record: logging.LogRecord) -> None:
        """Ignore errors raised while handling *record* (no-op override)."""
        return None
|
|
|
|
|
|
|
|
class _LiveLoggingNullHandler(logging.NullHandler): |
|
|
"""A logging handler used when live logging is disabled.""" |
|
|
|
|
|
def reset(self) -> None: |
|
|
pass |
|
|
|
|
|
def set_when(self, when: str) -> None: |
|
|
pass |
|
|
|
|
|
def handleError(self, record: logging.LogRecord) -> None: |
|
|
|
|
|
pass |
|
|
|