| | |
| | """Access and control log capturing.""" |
| |
|
| | from __future__ import annotations |
| |
|
| | from contextlib import contextmanager |
| | from contextlib import nullcontext |
| | from datetime import datetime |
| | from datetime import timedelta |
| | from datetime import timezone |
| | import io |
| | from io import StringIO |
| | import logging |
| | from logging import LogRecord |
| | import os |
| | from pathlib import Path |
| | import re |
| | from types import TracebackType |
| | from typing import AbstractSet |
| | from typing import Dict |
| | from typing import final |
| | from typing import Generator |
| | from typing import Generic |
| | from typing import List |
| | from typing import Literal |
| | from typing import Mapping |
| | from typing import TYPE_CHECKING |
| | from typing import TypeVar |
| |
|
| | from _pytest import nodes |
| | from _pytest._io import TerminalWriter |
| | from _pytest.capture import CaptureManager |
| | from _pytest.config import _strtobool |
| | from _pytest.config import Config |
| | from _pytest.config import create_terminal_writer |
| | from _pytest.config import hookimpl |
| | from _pytest.config import UsageError |
| | from _pytest.config.argparsing import Parser |
| | from _pytest.deprecated import check_ispytest |
| | from _pytest.fixtures import fixture |
| | from _pytest.fixtures import FixtureRequest |
| | from _pytest.main import Session |
| | from _pytest.stash import StashKey |
| | from _pytest.terminal import TerminalReporter |
| |
|
| |
|
if TYPE_CHECKING:
    # ``logging.StreamHandler`` is subscripted only for the type checker;
    # at runtime the plain class is used (presumably because runtime
    # subscription is not available on all supported Python versions).
    logging_StreamHandler = logging.StreamHandler[StringIO]
else:
    logging_StreamHandler = logging.StreamHandler


# Defaults applied when the user configures no log/date format.
DEFAULT_LOG_FORMAT = "%(levelname)-8s %(name)s:%(filename)s:%(lineno)d %(message)s"
DEFAULT_LOG_DATE_FORMAT = "%H:%M:%S"
# Matches ANSI color escape sequences such as "\x1b[31m" so they can be
# stripped from captured log text.
_ANSI_ESCAPE_SEQ = re.compile(r"\x1b\[[\d;]+m")
# Stash keys that connect the plugin's capture handler and per-phase records
# to the caplog fixture.
caplog_handler_key = StashKey["LogCaptureHandler"]()
caplog_records_key = StashKey[Dict[str, List[logging.LogRecord]]]()
| |
|
| |
|
def _remove_ansi_escape_sequences(text: str) -> str:
    """Return *text* with all ANSI color escape sequences stripped."""
    cleaned = _ANSI_ESCAPE_SEQ.sub("", text)
    return cleaned
| |
|
| |
|
class DatetimeFormatter(logging.Formatter):
    """A logging formatter which formats record with
    :func:`datetime.datetime.strftime` formatter instead of
    :func:`time.strftime` in case of microseconds in format string.
    """

    def formatTime(self, record: LogRecord, datefmt: str | None = None) -> str:
        # time.strftime cannot render %f (microseconds), so only take the
        # datetime-based path when sub-second precision is requested.
        if not datefmt or "%f" not in datefmt:
            return super().formatTime(record, datefmt)

        struct = self.converter(record.created)
        tz = timezone(timedelta(seconds=struct.tm_gmtoff), struct.tm_zone)
        # Rebuild an aware datetime from the struct_time, re-attaching the
        # sub-second part from the record's milliseconds.
        stamp = datetime(*struct[0:6], microsecond=int(record.msecs * 1000), tzinfo=tz)
        return stamp.strftime(datefmt)
| |
|
| |
|
class ColoredLevelFormatter(DatetimeFormatter):
    """A logging formatter which colorizes the %(levelname)..s part of the
    log format passed to __init__."""

    LOGLEVEL_COLOROPTS: Mapping[int, AbstractSet[str]] = {
        logging.CRITICAL: {"red"},
        logging.ERROR: {"red", "bold"},
        logging.WARNING: {"yellow"},
        logging.WARN: {"yellow"},
        logging.INFO: {"green"},
        logging.DEBUG: {"purple"},
        logging.NOTSET: set(),
    }
    LEVELNAME_FMT_REGEX = re.compile(r"%\(levelname\)([+-.]?\d*(?:\.\d+)?s)")

    def __init__(self, terminalwriter: TerminalWriter, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self._terminalwriter = terminalwriter
        self._original_fmt = self._style._fmt
        self._level_to_fmt_mapping: dict[int, str] = {}
        # Precompute one colorized format string per known level.
        for level, opts in self.LOGLEVEL_COLOROPTS.items():
            self.add_color_level(level, *opts)

    def add_color_level(self, level: int, *color_opts: str) -> None:
        """Add or update color opts for a log level.

        :param level:
            Log level to apply a style to, e.g. ``logging.INFO``.
        :param color_opts:
            ANSI escape sequence color options. Capitalized colors indicates
            background color, i.e. ``'green', 'Yellow', 'bold'`` will give bold
            green text on yellow background.

        .. warning::
            This is an experimental API.
        """
        assert self._fmt is not None
        match = self.LEVELNAME_FMT_REGEX.search(self._fmt)
        if match is None:
            # The format contains no %(levelname)s placeholder: nothing to colorize.
            return
        levelname_fmt = match.group()

        # Render the level name exactly as the placeholder would, then wrap
        # the result in the requested terminal markup.
        plain_levelname = levelname_fmt % {"levelname": logging.getLevelName(level)}
        markup_kwargs = {name: True for name in color_opts}
        colorized_levelname = self._terminalwriter.markup(
            plain_levelname, **markup_kwargs
        )
        self._level_to_fmt_mapping[level] = self.LEVELNAME_FMT_REGEX.sub(
            colorized_levelname, self._fmt
        )

    def format(self, record: logging.LogRecord) -> str:
        # Swap in the precomputed colorized format for this record's level,
        # falling back to the uncolored original format.
        self._style._fmt = self._level_to_fmt_mapping.get(
            record.levelno, self._original_fmt
        )
        return super().format(record)
| |
|
| |
|
class PercentStyleMultiline(logging.PercentStyle):
    """A logging style with special support for multiline messages.

    If the message of a record consists of multiple lines, this style
    formats the message as if each line were logged separately.
    """

    def __init__(self, fmt: str, auto_indent: int | str | bool | None) -> None:
        super().__init__(fmt)
        self._auto_indent = self._get_auto_indent(auto_indent)

    @staticmethod
    def _get_auto_indent(auto_indent_option: int | str | bool | None) -> int:
        """Determine the current auto indentation setting.

        Specify auto indent behavior (on/off/fixed) by passing in
        extra={"auto_indent": [value]} to the call to logging.log() or
        using a --log-auto-indent [value] command line or the
        log_auto_indent [value] config option.

        Default behavior is auto-indent off.

        Using the string "True" or "on" or the boolean True as the value
        turns auto indent on, using the string "False" or "off" or the
        boolean False or the int 0 turns it off, and specifying a
        positive integer fixes the indentation position to the value
        specified.

        Any other values for the option are invalid, and will silently be
        converted to the default.

        :param None|bool|int|str auto_indent_option:
            User specified option for indentation from command line, config
            or extra kwarg. Accepts int, bool or str. str option accepts the
            same range of values as boolean config options, as well as
            positive integers represented in str form.

        :returns:
            Indentation value, which can be
            -1 (automatically determine indentation) or
            0 (auto-indent turned off) or
            >0 (explicitly set indentation position).
        """
        if auto_indent_option is None:
            return 0
        # bool must be tested before int, since bool is an int subclass.
        if isinstance(auto_indent_option, bool):
            return -1 if auto_indent_option else 0
        if isinstance(auto_indent_option, int):
            return int(auto_indent_option)
        if isinstance(auto_indent_option, str):
            # A numeric string is a fixed indentation position.
            try:
                return int(auto_indent_option)
            except ValueError:
                pass
            # Otherwise interpret it like a boolean config value.
            try:
                return -1 if _strtobool(auto_indent_option) else 0
            except ValueError:
                return 0

        return 0

    def format(self, record: logging.LogRecord) -> str:
        if "\n" not in record.message:
            return self._fmt % record.__dict__

        # A per-record "auto_indent" (passed via ``extra=``) overrides the
        # style-wide setting.
        if hasattr(record, "auto_indent"):
            auto_indent = self._get_auto_indent(record.auto_indent)
        else:
            auto_indent = self._auto_indent

        if not auto_indent:
            return self._fmt % record.__dict__

        lines = record.message.splitlines()
        # Format the first line normally to learn the prefix width.
        first_formatted = self._fmt % {**record.__dict__, "message": lines[0]}

        if auto_indent < 0:
            # Auto mode: align continuation lines with where the message
            # starts in the formatted first line (ignoring ANSI colors).
            indent_width = _remove_ansi_escape_sequences(first_formatted).find(lines[0])
        else:
            # Fixed mode: the user-specified column.
            indent_width = auto_indent

        lines[0] = first_formatted
        return ("\n" + " " * indent_width).join(lines)
| |
|
| |
|
def get_option_ini(config: Config, *names: str):
    """Return the first truthy value among *names*, checking each name's
    command line option first and its ini value second.

    Implicitly returns None when no name yields a truthy value.
    """
    for name in names:
        # A command line option takes precedence over its ini counterpart.
        value = config.getoption(name)
        if value is None:
            value = config.getini(name)
        if value:
            return value
    return None
| |
|
| |
|
def pytest_addoption(parser: Parser) -> None:
    """Add options to control log capturing."""
    group = parser.getgroup("logging")

    def add_option_ini(option, dest, default=None, type=None, **kwargs):
        # Register the same setting twice: as an ini option (providing the
        # default value) and as a command line option (the override).
        parser.addini(
            dest, default=default, type=type, help="Default value for " + option
        )
        group.addoption(option, dest=dest, **kwargs)

    # Capture level and formats.
    add_option_ini(
        "--log-level",
        dest="log_level",
        default=None,
        metavar="LEVEL",
        help=(
            "Level of messages to catch/display."
            " Not set by default, so it depends on the root/parent log handler's"
            ' effective level, where it is "WARNING" by default.'
        ),
    )
    add_option_ini(
        "--log-format",
        dest="log_format",
        default=DEFAULT_LOG_FORMAT,
        help="Log format used by the logging module",
    )
    add_option_ini(
        "--log-date-format",
        dest="log_date_format",
        default=DEFAULT_LOG_DATE_FORMAT,
        help="Log date format used by the logging module",
    )
    # Live ("CLI") logging. log_cli itself is ini-only, hence plain addini.
    parser.addini(
        "log_cli",
        default=False,
        type="bool",
        help='Enable log display during test run (also known as "live logging")',
    )
    add_option_ini(
        "--log-cli-level", dest="log_cli_level", default=None, help="CLI logging level"
    )
    add_option_ini(
        "--log-cli-format",
        dest="log_cli_format",
        default=None,
        help="Log format used by the logging module",
    )
    add_option_ini(
        "--log-cli-date-format",
        dest="log_cli_date_format",
        default=None,
        help="Log date format used by the logging module",
    )
    # Log file options.
    add_option_ini(
        "--log-file",
        dest="log_file",
        default=None,
        help="Path to a file when logging will be written to",
    )
    add_option_ini(
        "--log-file-mode",
        dest="log_file_mode",
        default="w",
        choices=["w", "a"],
        help="Log file open mode",
    )
    add_option_ini(
        "--log-file-level",
        dest="log_file_level",
        default=None,
        help="Log file logging level",
    )
    add_option_ini(
        "--log-file-format",
        dest="log_file_format",
        default=None,
        help="Log format used by the logging module",
    )
    add_option_ini(
        "--log-file-date-format",
        dest="log_file_date_format",
        default=None,
        help="Log date format used by the logging module",
    )
    # Miscellaneous.
    add_option_ini(
        "--log-auto-indent",
        dest="log_auto_indent",
        default=None,
        help="Auto-indent multiline messages passed to the logging module. Accepts true|on, false|off or an integer.",
    )
    # Command-line-only: may be given multiple times, accumulating names.
    group.addoption(
        "--log-disable",
        action="append",
        default=[],
        dest="logger_disable",
        help="Disable a logger by name. Can be passed multiple times.",
    )
| |
|
| |
|
_HandlerType = TypeVar("_HandlerType", bound=logging.Handler)


class catching_logs(Generic[_HandlerType]):
    """Context manager that prepares the whole logging machinery properly."""

    __slots__ = ("handler", "level", "orig_level")

    def __init__(self, handler: _HandlerType, level: int | None = None) -> None:
        self.handler = handler
        self.level = level

    def __enter__(self) -> _HandlerType:
        root = logging.getLogger()
        if self.level is not None:
            self.handler.setLevel(self.level)
        root.addHandler(self.handler)
        if self.level is not None:
            # Remember the previous root level for __exit__, then lower the
            # root level if needed so records of this level reach the handler.
            self.orig_level = root.level
            root.setLevel(min(self.orig_level, self.level))
        return self.handler

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        root = logging.getLogger()
        if self.level is not None:
            # Restore the root level saved in __enter__.
            root.setLevel(self.orig_level)
        root.removeHandler(self.handler)
| |
|
| |
|
class LogCaptureHandler(logging_StreamHandler):
    """A logging handler that stores log records and the log text."""

    def __init__(self) -> None:
        """Create a new log handler."""
        super().__init__(StringIO())
        # All records emitted so far, in emission order.
        self.records: list[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        """Keep the log records in a list in addition to the log text."""
        self.records.append(record)
        super().emit(record)

    def reset(self) -> None:
        # Rebind to fresh objects: references handed out before this call
        # (e.g. a stashed records list) keep pointing at the old contents.
        self.records = []
        self.stream = StringIO()

    def clear(self) -> None:
        # Empty the records list in place, preserving its identity, so that
        # references obtained earlier (e.g. via caplog.records) observe the
        # cleared state; the stream has no in-place clear, so it is rebound.
        self.records.clear()
        self.stream = StringIO()

    def handleError(self, record: logging.LogRecord) -> None:
        if logging.raiseExceptions:
            # handleError() is invoked from within the except block of
            # Handler.emit(), so a bare raise re-raises that active
            # exception: formatting/emission errors fail loudly here
            # instead of being swallowed as the stdlib default would.
            raise
| |
|
| |
|
@final
class LogCaptureFixture:
    """Provides access and control of log capturing."""

    def __init__(self, item: nodes.Node, *, _ispytest: bool = False) -> None:
        check_ispytest(_ispytest)
        self._item = item
        # Handler level before the first set_level() call; restored in _finalize().
        self._initial_handler_level: int | None = None
        # Dict of logger name -> original level, to restore during teardown.
        self._initial_logger_levels: dict[str | None, int] = {}
        # Global logging.disable() threshold before set_level() changed it, if any.
        self._initial_disabled_logging_level: int | None = None

    def _finalize(self) -> None:
        """Finalize the fixture.

        This restores the log levels and the disabled logging levels changed by :meth:`set_level`.
        """
        # Restore the handler and logger levels changed by set_level().
        if self._initial_handler_level is not None:
            self.handler.setLevel(self._initial_handler_level)
        for logger_name, level in self._initial_logger_levels.items():
            logger = logging.getLogger(logger_name)
            logger.setLevel(level)
        # Restore the global logging.disable() threshold.
        if self._initial_disabled_logging_level is not None:
            logging.disable(self._initial_disabled_logging_level)
            self._initial_disabled_logging_level = None

    @property
    def handler(self) -> LogCaptureHandler:
        """Get the logging handler used by the fixture."""
        return self._item.stash[caplog_handler_key]

    def get_records(
        self, when: Literal["setup", "call", "teardown"]
    ) -> list[logging.LogRecord]:
        """Get the logging records for one of the possible test phases.

        :param when:
            Which test phase to obtain the records from.
            Valid values are: "setup", "call" and "teardown".

        :returns: The list of captured records at the given stage.

        .. versionadded:: 3.4
        """
        return self._item.stash[caplog_records_key].get(when, [])

    @property
    def text(self) -> str:
        """The formatted log text."""
        return _remove_ansi_escape_sequences(self.handler.stream.getvalue())

    @property
    def records(self) -> list[logging.LogRecord]:
        """The list of log records."""
        return self.handler.records

    @property
    def record_tuples(self) -> list[tuple[str, int, str]]:
        """A list of a stripped down version of log records intended
        for use in assertion comparison.

        The format of the tuple is:

            (logger_name, log_level, message)
        """
        return [(r.name, r.levelno, r.getMessage()) for r in self.records]

    @property
    def messages(self) -> list[str]:
        """A list of format-interpolated log messages.

        Unlike 'records', which contains the format string and parameters for
        interpolation, log messages in this list are all interpolated.

        Unlike 'text', which contains the output from the handler, log
        messages in this list are unadorned with levels, timestamps, etc,
        making exact comparisons more reliable.

        Note that traceback or stack info (from :func:`logging.exception` or
        the `exc_info` or `stack_info` arguments to the logging functions) is
        not included, as this is added by the formatter in the handler.

        .. versionadded:: 3.7
        """
        return [r.getMessage() for r in self.records]

    def clear(self) -> None:
        """Reset the list of log records and the captured log text."""
        self.handler.clear()

    def _force_enable_logging(
        self, level: int | str, logger_obj: logging.Logger
    ) -> int:
        """Enable the desired logging level if the global level was disabled via ``logging.disabled``.

        Only enables logging levels greater than or equal to the requested ``level``.

        Does nothing if the desired ``level`` wasn't disabled.

        :param level:
            The logger level caplog should capture.
            All logging is enabled if a non-standard logging level string is supplied.
            Valid level strings are in :data:`logging._nameToLevel`.
        :param logger_obj: The logger object to check.

        :return: The original disabled logging level.
        """
        original_disable_level: int = logger_obj.manager.disable

        if isinstance(level, str):
            # Try to translate the level string to an int for `logging.disable()`.
            level = logging.getLevelName(level)

        if not isinstance(level, int):
            # The level name was not recognized (getLevelName returned a
            # string), so just un-disable all logging.
            logging.disable(logging.NOTSET)
        elif not logger_obj.isEnabledFor(level):
            # logging.disable(x) suppresses everything at x and below, so
            # disabling one standard step (10) below `level` leaves `level`
            # itself enabled.
            disable_level = max(level - 10, logging.NOTSET)
            logging.disable(disable_level)

        return original_disable_level

    def set_level(self, level: int | str, logger: str | None = None) -> None:
        """Set the threshold level of a logger for the duration of a test.

        Logging messages which are less severe than this level will not be captured.

        .. versionchanged:: 3.4
            The levels of the loggers changed by this function will be
            restored to their initial values at the end of the test.

        Will enable the requested logging level if it was disabled via :func:`logging.disable`.

        :param level: The level.
        :param logger: The logger to update. If not given, the root logger.
        """
        logger_obj = logging.getLogger(logger)
        # Save the original log-level to restore it during teardown
        # (only the first time; later calls must not clobber the original).
        self._initial_logger_levels.setdefault(logger, logger_obj.level)
        logger_obj.setLevel(level)
        if self._initial_handler_level is None:
            self._initial_handler_level = self.handler.level
        self.handler.setLevel(level)
        initial_disabled_logging_level = self._force_enable_logging(level, logger_obj)
        if self._initial_disabled_logging_level is None:
            self._initial_disabled_logging_level = initial_disabled_logging_level

    @contextmanager
    def at_level(self, level: int | str, logger: str | None = None) -> Generator[None]:
        """Context manager that sets the level for capturing of logs. After
        the end of the 'with' statement the level is restored to its original
        value.

        Will enable the requested logging level if it was disabled via :func:`logging.disable`.

        :param level: The level.
        :param logger: The logger to update. If not given, the root logger.
        """
        logger_obj = logging.getLogger(logger)
        orig_level = logger_obj.level
        logger_obj.setLevel(level)
        handler_orig_level = self.handler.level
        self.handler.setLevel(level)
        original_disable_level = self._force_enable_logging(level, logger_obj)
        try:
            yield
        finally:
            # Restore everything even if the body raised.
            logger_obj.setLevel(orig_level)
            self.handler.setLevel(handler_orig_level)
            logging.disable(original_disable_level)

    @contextmanager
    def filtering(self, filter_: logging.Filter) -> Generator[None]:
        """Context manager that temporarily adds the given filter to the caplog's
        :meth:`handler` for the 'with' statement block, and removes that filter at the
        end of the block.

        :param filter_: A custom :class:`logging.Filter` object.

        .. versionadded:: 7.5
        """
        self.handler.addFilter(filter_)
        try:
            yield
        finally:
            self.handler.removeFilter(filter_)
| |
|
| |
|
@fixture
def caplog(request: FixtureRequest) -> Generator[LogCaptureFixture]:
    """Access and control log capturing.

    Captured logs are available through the following properties/methods::

    * caplog.messages -> list of format-interpolated log messages
    * caplog.text -> string containing formatted log output
    * caplog.records -> list of logging.LogRecord instances
    * caplog.record_tuples -> list of (logger_name, level, message) tuples
    * caplog.clear() -> clear captured records and formatted log output string
    """
    capture_fixture = LogCaptureFixture(request.node, _ispytest=True)
    yield capture_fixture
    # Teardown: restore log levels changed through the fixture.
    capture_fixture._finalize()
| |
|
| |
|
def get_log_level_for_setting(config: Config, *setting_names: str) -> int | None:
    """Resolve the first configured value among *setting_names* to a numeric
    logging level.

    Each name is looked up first as a command line option, then as an ini
    value. Returns None when no name yields a truthy value.

    :raises UsageError: if the value is neither a known level name nor an int.
    """
    for setting_name in setting_names:
        value = config.getoption(setting_name)
        if value is None:
            value = config.getini(setting_name)
        if value:
            break
    else:
        # No setting produced a truthy value.
        return None

    if isinstance(value, str):
        value = value.upper()
    try:
        # A known name like "INFO" resolves via the logging module; anything
        # else must be parseable as a plain integer level.
        return int(getattr(logging, value, value))
    except ValueError as e:
        raise UsageError(
            f"'{value}' is not recognized as a logging level name for "
            f"'{setting_name}'. Please consider passing the "
            "logging level num instead."
        ) from e
| |
|
| |
|
| | |
@hookimpl(trylast=True)
def pytest_configure(config: Config) -> None:
    # Register the logging plugin; runs trylast, presumably so that other
    # plugins this one looks up (capture manager, terminal reporter) are
    # already configured -- TODO confirm.
    config.pluginmanager.register(LoggingPlugin(config), "logging-plugin")
| |
|
| |
|
class LoggingPlugin:
    """Attaches to the logging module and captures log messages for each test."""

    def __init__(self, config: Config) -> None:
        """Create a new plugin to capture log messages.

        The formatter can be safely shared across all handlers so
        create a single one for the entire test session here.
        """
        self._config = config

        # Report/caplog logging: records captured per test phase.
        self.formatter = self._create_formatter(
            get_option_ini(config, "log_format"),
            get_option_ini(config, "log_date_format"),
            get_option_ini(config, "log_auto_indent"),
        )
        self.log_level = get_log_level_for_setting(config, "log_level")
        # caplog_handler feeds the caplog fixture; report_handler feeds the
        # "log" report sections attached to each test phase.
        self.caplog_handler = LogCaptureHandler()
        self.caplog_handler.setFormatter(self.formatter)
        self.report_handler = LogCaptureHandler()
        self.report_handler.setFormatter(self.formatter)

        # File logging. When no log file is configured, write to os.devnull
        # so a handler always exists.
        self.log_file_level = get_log_level_for_setting(
            config, "log_file_level", "log_level"
        )
        log_file = get_option_ini(config, "log_file") or os.devnull
        if log_file != os.devnull:
            directory = os.path.dirname(os.path.abspath(log_file))
            if not os.path.isdir(directory):
                os.makedirs(directory)

        self.log_file_mode = get_option_ini(config, "log_file_mode") or "w"
        self.log_file_handler = _FileHandler(
            log_file, mode=self.log_file_mode, encoding="UTF-8"
        )
        log_file_format = get_option_ini(config, "log_file_format", "log_format")
        log_file_date_format = get_option_ini(
            config, "log_file_date_format", "log_date_format"
        )

        log_file_formatter = DatetimeFormatter(
            log_file_format, datefmt=log_file_date_format
        )
        self.log_file_handler.setFormatter(log_file_formatter)

        # CLI/live logging.
        self.log_cli_level = get_log_level_for_setting(
            config, "log_cli_level", "log_level"
        )
        if self._log_cli_enabled():
            terminal_reporter = config.pluginmanager.get_plugin("terminalreporter")
            # _log_cli_enabled() already returned False when the terminal
            # reporter is missing, so it must exist here.
            assert terminal_reporter is not None
            capture_manager = config.pluginmanager.get_plugin("capturemanager")
            # capture_manager may be None here; the live handler tolerates that.
            self.log_cli_handler: (
                _LiveLoggingStreamHandler | _LiveLoggingNullHandler
            ) = _LiveLoggingStreamHandler(terminal_reporter, capture_manager)
        else:
            # Live logging disabled: use a no-op stand-in with the same interface.
            self.log_cli_handler = _LiveLoggingNullHandler()
        log_cli_formatter = self._create_formatter(
            get_option_ini(config, "log_cli_format", "log_format"),
            get_option_ini(config, "log_cli_date_format", "log_date_format"),
            get_option_ini(config, "log_auto_indent"),
        )
        self.log_cli_handler.setFormatter(log_cli_formatter)
        self._disable_loggers(loggers_to_disable=config.option.logger_disable)

    def _disable_loggers(self, loggers_to_disable: list[str]) -> None:
        # Honor --log-disable by marking each named logger as disabled.
        if not loggers_to_disable:
            return

        for name in loggers_to_disable:
            logger = logging.getLogger(name)
            logger.disabled = True

    def _create_formatter(self, log_format, log_date_format, auto_indent):
        # The color option doesn't exist if the terminal plugin is disabled,
        # hence the getattr with a "no" default.
        color = getattr(self._config.option, "color", "no")
        if color != "no" and ColoredLevelFormatter.LEVELNAME_FMT_REGEX.search(
            log_format
        ):
            formatter: logging.Formatter = ColoredLevelFormatter(
                create_terminal_writer(self._config), log_format, log_date_format
            )
        else:
            formatter = DatetimeFormatter(log_format, log_date_format)

        # Swap in the multiline-aware style so continuation lines of a
        # multiline message are indented under the format prefix.
        formatter._style = PercentStyleMultiline(
            formatter._style._fmt, auto_indent=auto_indent
        )

        return formatter

    def set_log_path(self, fname: str) -> None:
        """Set the filename parameter for Logging.FileHandler().

        Creates parent directory if it does not exist.

        .. warning::
            This is an experimental API.
        """
        fpath = Path(fname)

        if not fpath.is_absolute():
            fpath = self._config.rootpath / fpath

        if not fpath.parent.exists():
            fpath.parent.mkdir(exist_ok=True, parents=True)

        # Explicit annotation: Path.open() in text mode yields a
        # TextIOWrapper, which is what setStream() expects.
        stream: io.TextIOWrapper = fpath.open(mode=self.log_file_mode, encoding="UTF-8")
        old_stream = self.log_file_handler.setStream(stream)
        if old_stream:
            old_stream.close()

    def _log_cli_enabled(self) -> bool:
        """Return whether live logging is enabled."""
        enabled = self._config.getoption(
            "--log-cli-level"
        ) is not None or self._config.getini("log_cli")
        if not enabled:
            return False

        # Live logging needs a terminal reporter; it may be absent, e.g. in
        # worker processes spawned by distribution plugins -- TODO confirm.
        terminal_reporter = self._config.pluginmanager.get_plugin("terminalreporter")
        if terminal_reporter is None:
            return False

        return True

    @hookimpl(wrapper=True, tryfirst=True)
    def pytest_sessionstart(self) -> Generator[None]:
        self.log_cli_handler.set_when("sessionstart")

        # Capture to the live and file handlers for the duration of the hook.
        with catching_logs(self.log_cli_handler, level=self.log_cli_level):
            with catching_logs(self.log_file_handler, level=self.log_file_level):
                return (yield)

    @hookimpl(wrapper=True, tryfirst=True)
    def pytest_collection(self) -> Generator[None]:
        self.log_cli_handler.set_when("collection")

        with catching_logs(self.log_cli_handler, level=self.log_cli_level):
            with catching_logs(self.log_file_handler, level=self.log_file_level):
                return (yield)

    @hookimpl(wrapper=True)
    def pytest_runtestloop(self, session: Session) -> Generator[None, object, object]:
        if session.config.option.collectonly:
            return (yield)

        if self._log_cli_enabled() and self._config.get_verbosity() < 1:
            # The verbose flag is needed to avoid messy test progress output.
            self._config.option.verbose = 1

        with catching_logs(self.log_cli_handler, level=self.log_cli_level):
            with catching_logs(self.log_file_handler, level=self.log_file_level):
                return (yield)  # Run all the tests.

    @hookimpl
    def pytest_runtest_logstart(self) -> None:
        self.log_cli_handler.reset()
        self.log_cli_handler.set_when("start")

    @hookimpl
    def pytest_runtest_logreport(self) -> None:
        self.log_cli_handler.set_when("logreport")

    def _runtest_for(self, item: nodes.Item, when: str) -> Generator[None]:
        """Implement the internals of the pytest_runtest_xxx() hooks."""
        with catching_logs(
            self.caplog_handler,
            level=self.log_level,
        ) as caplog_handler, catching_logs(
            self.report_handler,
            level=self.log_level,
        ) as report_handler:
            caplog_handler.reset()
            report_handler.reset()
            # Expose this phase's records and handler to the caplog fixture
            # via the item's stash.
            item.stash[caplog_records_key][when] = caplog_handler.records
            item.stash[caplog_handler_key] = caplog_handler

            try:
                yield
            finally:
                # Attach the formatted log text as a "log" report section
                # for this phase.
                log = report_handler.stream.getvalue().strip()
                item.add_report_section(when, "log", log)

    @hookimpl(wrapper=True)
    def pytest_runtest_setup(self, item: nodes.Item) -> Generator[None]:
        self.log_cli_handler.set_when("setup")

        # Fresh per-phase records mapping for this item; filled by _runtest_for.
        empty: dict[str, list[logging.LogRecord]] = {}
        item.stash[caplog_records_key] = empty
        yield from self._runtest_for(item, "setup")

    @hookimpl(wrapper=True)
    def pytest_runtest_call(self, item: nodes.Item) -> Generator[None]:
        self.log_cli_handler.set_when("call")

        yield from self._runtest_for(item, "call")

    @hookimpl(wrapper=True)
    def pytest_runtest_teardown(self, item: nodes.Item) -> Generator[None]:
        self.log_cli_handler.set_when("teardown")

        try:
            yield from self._runtest_for(item, "teardown")
        finally:
            # The test is done; drop the per-item capture state.
            del item.stash[caplog_records_key]
            del item.stash[caplog_handler_key]

    @hookimpl
    def pytest_runtest_logfinish(self) -> None:
        self.log_cli_handler.set_when("finish")

    @hookimpl(wrapper=True, tryfirst=True)
    def pytest_sessionfinish(self) -> Generator[None]:
        self.log_cli_handler.set_when("sessionfinish")

        with catching_logs(self.log_cli_handler, level=self.log_cli_level):
            with catching_logs(self.log_file_handler, level=self.log_file_level):
                return (yield)

    @hookimpl
    def pytest_unconfigure(self) -> None:
        # Close the log file handler explicitly instead of relying on
        # logging.shutdown() at interpreter exit.
        self.log_file_handler.close()
| |
|
| |
|
class _FileHandler(logging.FileHandler):
    """A logging FileHandler with pytest tweaks."""

    def handleError(self, record: logging.LogRecord) -> None:
        # Swallow logging errors for the log file: a failure to emit here
        # should not abort the test run.
        pass
| |
|
| |
|
class _LiveLoggingStreamHandler(logging_StreamHandler):
    """A logging StreamHandler used by the live logging feature: it will
    write a newline before the first log message in each test.

    During live logging we must also explicitly disable stdout/stderr
    capturing otherwise it will get captured and won't appear in the
    terminal.
    """

    # A StreamHandler's stream would officially be an IO[str], but the
    # TerminalReporter provides the write()/section() interface used here.
    stream: TerminalReporter = None

    def __init__(
        self,
        terminal_reporter: TerminalReporter,
        capture_manager: CaptureManager | None,
    ) -> None:
        super().__init__(stream=terminal_reporter)
        self.capture_manager = capture_manager
        self.reset()
        self.set_when(None)
        self._test_outcome_written = False

    def reset(self) -> None:
        """Reset the handler; should be called before the start of each test."""
        self._first_record_emitted = False

    def set_when(self, when: str | None) -> None:
        """Prepare for the given test phase (setup/call/teardown)."""
        self._when = when
        self._section_name_shown = False
        if when == "start":
            self._test_outcome_written = False

    def emit(self, record: logging.LogRecord) -> None:
        # Disable stdout/stderr capturing while writing, otherwise the
        # output would be captured instead of reaching the terminal.
        ctx_manager = (
            self.capture_manager.global_and_fixture_disabled()
            if self.capture_manager
            else nullcontext()
        )
        with ctx_manager:
            if not self._first_record_emitted:
                # Separate the first live log line from preceding output.
                self.stream.write("\n")
                self._first_record_emitted = True
            elif self._when in ("teardown", "finish"):
                # Write a single separating newline for the late phases
                # (once per test; the flag guards against repeats).
                if not self._test_outcome_written:
                    self._test_outcome_written = True
                    self.stream.write("\n")
            if not self._section_name_shown and self._when:
                # Print a "live log <phase>" section header once per phase.
                self.stream.section("live log " + self._when, sep="-", bold=True)
                self._section_name_shown = True
            super().emit(record)

    def handleError(self, record: logging.LogRecord) -> None:
        # Emission errors during live logging are deliberately ignored.
        pass
| |
|
| |
|
| | class _LiveLoggingNullHandler(logging.NullHandler): |
| | """A logging handler used when live logging is disabled.""" |
| |
|
| | def reset(self) -> None: |
| | pass |
| |
|
| | def set_when(self, when: str) -> None: |
| | pass |
| |
|
| | def handleError(self, record: logging.LogRecord) -> None: |
| | |
| | pass |
| |
|