| | """ |
| | class Ruler |
| | |
| | Helper class, used by [[MarkdownIt#core]], [[MarkdownIt#block]] and |
| | [[MarkdownIt#inline]] to manage sequences of functions (rules): |
| | |
| | - keep rules in defined order |
| | - assign the name to each rule |
| | - enable/disable rules |
| | - add/replace rules |
| | - allow assigning rules to additional named chains (in the same ruler) |
| | - caching lists of active rules |
| | |
| | You will not need to use this class directly unless you write plugins. For simple |
| | rule control, use [[MarkdownIt.disable]], [[MarkdownIt.enable]] and |
| | [[MarkdownIt.use]]. |
| | """ |
| | from __future__ import annotations |
| |
|
| | from collections.abc import Iterable |
| | from dataclasses import dataclass, field |
| | from typing import TYPE_CHECKING, Generic, TypedDict, TypeVar |
| | import warnings |
| |
|
| | from markdown_it._compat import DATACLASS_KWARGS |
| |
|
| | from .utils import EnvType |
| |
|
| | if TYPE_CHECKING: |
| | from markdown_it import MarkdownIt |
| |
|
| |
|
class StateBase:
    """Base state object holding the source text, the parser and the environment.

    ``src`` is exposed as a property so that re-assigning it discards the
    lazily computed tuple of character codes.
    """

    def __init__(self, src: str, md: MarkdownIt, env: EnvType):
        # Goes through the ``src`` setter, which also initialises the
        # character-code cache to "not computed yet".
        self.src = src
        self.env = env
        self.md = md

    @property
    def src(self) -> str:
        """The source text currently held by this state."""
        return self._src

    @src.setter
    def src(self, value: str) -> None:
        self._src = value
        # Any previously computed character codes no longer match the text.
        self._srcCharCode: tuple[int, ...] | None = None

    @property
    def srcCharCode(self) -> tuple[int, ...]:
        """Deprecated: code points of ``src`` as a tuple, computed on first use.

        Emits a ``DeprecationWarning`` on every access; use ``src`` instead.
        """
        warnings.warn(
            "StateBase.srcCharCode is deprecated. Use StateBase.src instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        codes = self._srcCharCode
        if codes is None:
            # First access since ``src`` was last set: build and remember it.
            codes = tuple(map(ord, self._src))
            self._srcCharCode = codes
        return codes
| |
|
| |
|
class RuleOptionsType(TypedDict, total=False):
    """Optional settings accepted when a rule is added or replaced.

    All keys may be omitted (``total=False``).
    """

    # Names of the additional chains the rule should also belong to.
    alt: list[str]
| |
|
| |
|
# Type variable standing for the rule-function type that the generic
# ``Rule`` and ``Ruler`` classes are parameterised with.
RuleFuncTv = TypeVar("RuleFuncTv")
"""A rule function, whose signature is dependent on the state type."""
| |
|
| |
|
@dataclass(**DATACLASS_KWARGS)
class Rule(Generic[RuleFuncTv]):
    """A single named rule entry, as stored in ``Ruler``'s rule list.

    Field order matters: ``Ruler`` constructs instances positionally as
    ``Rule(name, enabled, fn, alt)``.
    """

    # Name that ``Ruler`` uses to look the rule up.
    name: str
    # Only enabled rules are included when ``Ruler`` builds its chain cache.
    enabled: bool
    # The rule function itself; left out of the generated ``repr``.
    fn: RuleFuncTv = field(repr=False)
    # Names of the additional chains this rule belongs to.
    alt: list[str]
| |
|
| |
|
class Ruler(Generic[RuleFuncTv]):
    """Ordered, named collection of rule functions with enable/disable state.

    Rules are kept in insertion order, can be replaced or inserted relative
    to other rules by name, and can be switched on and off.  The lists of
    active rule functions per chain are compiled lazily and cached; every
    mutation of the rule list or of a rule's state drops that cache.
    """

    def __init__(self) -> None:
        # Every registered rule, in the order the rules are applied.
        self.__rules__: list[Rule[RuleFuncTv]] = []
        # Chain name -> functions of the enabled rules in that chain; the
        # default chain is "".  ``None`` means "must be rebuilt".
        self.__cache__: dict[str, list[RuleFuncTv]] | None = None

    def __find__(self, name: str) -> int:
        """Return the index of the first rule called ``name``, or -1 if absent."""
        return next(
            (i for i, rule in enumerate(self.__rules__) if rule.name == name), -1
        )

    def __compile__(self) -> None:
        """Rebuild the chain -> active rule functions lookup cache."""
        active = [rule for rule in self.__rules__ if rule.enabled]
        # The default chain "" always exists; the other chains are whatever
        # the enabled rules list in ``alt``.
        chains = {""}.union(*(rule.alt for rule in active))
        self.__cache__ = {
            # The default chain takes every enabled rule, a named chain only
            # the rules that declare it.
            chain: [rule.fn for rule in active if not chain or chain in rule.alt]
            for chain in chains
        }

    def _set_enabled(
        self, names: str | Iterable[str], enabled: bool, ignoreInvalid: bool
    ) -> list[str]:
        """Set the ``enabled`` flag of the named rules; shared by enable/disable.

        :param names: name or iterable of rule names to update.
        :param enabled: the state to assign to each found rule.
        :param ignoreInvalid: skip unknown names instead of raising.
        :raises: KeyError if name not found and not ignoreInvalid
        :return: list of found rule names
        """
        if isinstance(names, str):
            names = [names]
        found: list[str] = []
        try:
            for name in names:
                idx = self.__find__(name)
                if idx < 0:
                    if ignoreInvalid:
                        continue
                    raise KeyError(f"Rules manager: invalid rule name {name}")
                self.__rules__[idx].enabled = enabled
                found.append(name)
        finally:
            # Drop the cache even when a KeyError aborts the loop: rules
            # processed before the bad name have already changed state, so
            # the previously compiled chains would be stale.
            self.__cache__ = None
        return found

    def at(
        self, ruleName: str, fn: RuleFuncTv, options: RuleOptionsType | None = None
    ) -> None:
        """Replace rule by name with new function & options.

        The rule keeps its position and its enabled state; its ``alt`` chains
        are reset to ``options["alt"]`` (or to none when not given).

        :param ruleName: rule name to replace.
        :param fn: new rule function.
        :param options: new rule options (not mandatory).
        :raises: KeyError if name not found
        """
        index = self.__find__(ruleName)
        if index == -1:
            raise KeyError(f"Parser rule not found: {ruleName}")
        rule = self.__rules__[index]
        rule.fn = fn
        rule.alt = (options or {}).get("alt", [])
        self.__cache__ = None

    def before(
        self,
        beforeName: str,
        ruleName: str,
        fn: RuleFuncTv,
        options: RuleOptionsType | None = None,
    ) -> None:
        """Add new rule to chain before one with given name.

        The new rule is created enabled.

        :param beforeName: new rule will be added before this one.
        :param ruleName: name of the added rule.
        :param fn: new rule function.
        :param options: new rule options (not mandatory).
        :raises: KeyError if name not found
        """
        index = self.__find__(beforeName)
        if index == -1:
            raise KeyError(f"Parser rule not found: {beforeName}")
        alt = (options or {}).get("alt", [])
        self.__rules__.insert(index, Rule[RuleFuncTv](ruleName, True, fn, alt))
        self.__cache__ = None

    def after(
        self,
        afterName: str,
        ruleName: str,
        fn: RuleFuncTv,
        options: RuleOptionsType | None = None,
    ) -> None:
        """Add new rule to chain after one with given name.

        The new rule is created enabled.

        :param afterName: new rule will be added after this one.
        :param ruleName: name of the added rule.
        :param fn: new rule function.
        :param options: new rule options (not mandatory).
        :raises: KeyError if name not found
        """
        index = self.__find__(afterName)
        if index == -1:
            raise KeyError(f"Parser rule not found: {afterName}")
        alt = (options or {}).get("alt", [])
        self.__rules__.insert(index + 1, Rule[RuleFuncTv](ruleName, True, fn, alt))
        self.__cache__ = None

    def push(
        self, ruleName: str, fn: RuleFuncTv, options: RuleOptionsType | None = None
    ) -> None:
        """Push new rule to the end of chain.

        The new rule is created enabled.

        :param ruleName: name of the added rule.
        :param fn: new rule function.
        :param options: new rule options (not mandatory).
        """
        alt = (options or {}).get("alt", [])
        self.__rules__.append(Rule[RuleFuncTv](ruleName, True, fn, alt))
        self.__cache__ = None

    def enable(
        self, names: str | Iterable[str], ignoreInvalid: bool = False
    ) -> list[str]:
        """Enable rules with given names.

        :param names: name or list of rule names to enable.
        :param ignoreInvalid: ignore errors when rule not found
        :raises: KeyError if name not found and not ignoreInvalid
        :return: list of found rule names
        """
        return self._set_enabled(names, True, ignoreInvalid)

    def enableOnly(
        self, names: str | Iterable[str], ignoreInvalid: bool = False
    ) -> list[str]:
        """Enable rules with given names, and disable everything else.

        Note that all rules are disabled before the names are validated, so
        a KeyError leaves every rule disabled except those enabled before the
        invalid name was reached.

        :param names: name or list of rule names to enable.
        :param ignoreInvalid: ignore errors when rule not found
        :raises: KeyError if name not found and not ignoreInvalid
        :return: list of found rule names
        """
        if isinstance(names, str):
            names = [names]
        for rule in self.__rules__:
            rule.enabled = False
        # The flags just changed; never let a later failure expose old chains.
        self.__cache__ = None
        return self.enable(names, ignoreInvalid)

    def disable(
        self, names: str | Iterable[str], ignoreInvalid: bool = False
    ) -> list[str]:
        """Disable rules with given names.

        :param names: name or list of rule names to disable.
        :param ignoreInvalid: ignore errors when rule not found
        :raises: KeyError if name not found and not ignoreInvalid
        :return: list of found rule names
        """
        return self._set_enabled(names, False, ignoreInvalid)

    def getRules(self, chainName: str = "") -> list[RuleFuncTv]:
        """Return array of active functions (rules) for given chain name.

        It analyzes rules configuration, compiles caches if not exists and
        returns result.

        :param chainName: chain to look up; defaults to `''` (empty string),
            the default chain containing every enabled rule.
        :return: functions of the enabled rules in that chain, in rule order;
            an empty list when the chain is unknown or has no enabled rules.
        """
        if self.__cache__ is None:
            self.__compile__()
            # Narrowing for type checkers: __compile__ always sets the cache.
            assert self.__cache__ is not None

        # ``or []`` hands out a fresh list rather than a cached empty one.
        return self.__cache__.get(chainName, []) or []

    def get_all_rules(self) -> list[str]:
        """Return all available rule names."""
        return [r.name for r in self.__rules__]

    def get_active_rules(self) -> list[str]:
        """Return the active rule names."""
        return [r.name for r in self.__rules__ if r.enabled]
| |
|