| """ |
| class Ruler |
| |
| Helper class, used by [[MarkdownIt#core]], [[MarkdownIt#block]] and |
| [[MarkdownIt#inline]] to manage sequences of functions (rules): |
| |
| - keep rules in defined order |
| - assign the name to each rule |
| - enable/disable rules |
| - add/replace rules |
| - allow assigning rules to additional named chains (within the same ruler) |
| - cache lists of active rules |
| |
| You will not need to use this class directly unless you write plugins. For |
| simple rule control use [[MarkdownIt.disable]], [[MarkdownIt.enable]] and |
| [[MarkdownIt.use]]. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from collections.abc import Iterable |
| from dataclasses import dataclass, field |
| from typing import TYPE_CHECKING, Generic, TypedDict, TypeVar |
| import warnings |
|
|
| from .utils import EnvType |
|
|
| if TYPE_CHECKING: |
| from markdown_it import MarkdownIt |
|
|
|
|
class StateBase:
    """Base state object: stores the source text, the parser and the environment.

    :param src: source text; assigned through the ``src`` property.
    :param md: the ``MarkdownIt`` instance, stored as ``self.md``.
    :param env: environment object, stored as ``self.env``.
    """

    def __init__(self, src: str, md: MarkdownIt, env: EnvType):
        # Assigning ``src`` goes through the property setter below, which
        # also initialises the character-code cache.
        self.src = src
        self.env = env
        self.md = md

    @property
    def src(self) -> str:
        """The source text."""
        return self._src

    @src.setter
    def src(self, value: str) -> None:
        self._src = value
        # Any previously computed code points belong to the old text;
        # reset so ``srcCharCode`` recomputes them on next access.
        self._srcCharCode: tuple[int, ...] | None = None

    @property
    def srcCharCode(self) -> tuple[int, ...]:
        """Code points of ``src`` as a tuple (deprecated).

        Emits a ``DeprecationWarning`` on every access. The tuple is computed
        on first access and cached until ``src`` is assigned again.
        """
        warnings.warn(
            "StateBase.srcCharCode is deprecated. Use StateBase.src instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        if self._srcCharCode is None:
            self._srcCharCode = tuple(ord(c) for c in self._src)
        return self._srcCharCode
|
|
|
|
class RuleOptionsType(TypedDict, total=False):
    """Optional settings accepted when a rule is added to or replaced in a ruler."""

    # Names of additional chains the rule should also belong to.
    alt: list[str]
|
|
|
|
| RuleFuncTv = TypeVar("RuleFuncTv") |
| """A rule function, whose signature is dependent on the state type.""" |
|
|
|
|
| @dataclass(slots=True) |
| class Rule(Generic[RuleFuncTv]): |
| name: str |
| enabled: bool |
| fn: RuleFuncTv = field(repr=False) |
| alt: list[str] |
|
|
|
|
| class Ruler(Generic[RuleFuncTv]): |
| def __init__(self) -> None: |
| |
| self.__rules__: list[Rule[RuleFuncTv]] = [] |
| |
| |
| |
| self.__cache__: dict[str, list[RuleFuncTv]] | None = None |
|
|
| def __find__(self, name: str) -> int: |
| """Find rule index by name""" |
| for i, rule in enumerate(self.__rules__): |
| if rule.name == name: |
| return i |
| return -1 |
|
|
| def __compile__(self) -> None: |
| """Build rules lookup cache""" |
| chains = {""} |
| |
| for rule in self.__rules__: |
| if not rule.enabled: |
| continue |
| for name in rule.alt: |
| chains.add(name) |
| self.__cache__ = {} |
| for chain in chains: |
| self.__cache__[chain] = [] |
| for rule in self.__rules__: |
| if not rule.enabled: |
| continue |
| if chain and (chain not in rule.alt): |
| continue |
| self.__cache__[chain].append(rule.fn) |
|
|
| def at( |
| self, ruleName: str, fn: RuleFuncTv, options: RuleOptionsType | None = None |
| ) -> None: |
| """Replace rule by name with new function & options. |
| |
| :param ruleName: rule name to replace. |
| :param fn: new rule function. |
| :param options: new rule options (not mandatory). |
| :raises: KeyError if name not found |
| """ |
| index = self.__find__(ruleName) |
| options = options or {} |
| if index == -1: |
| raise KeyError(f"Parser rule not found: {ruleName}") |
| self.__rules__[index].fn = fn |
| self.__rules__[index].alt = options.get("alt", []) |
| self.__cache__ = None |
|
|
| def before( |
| self, |
| beforeName: str, |
| ruleName: str, |
| fn: RuleFuncTv, |
| options: RuleOptionsType | None = None, |
| ) -> None: |
| """Add new rule to chain before one with given name. |
| |
| :param beforeName: new rule will be added before this one. |
| :param ruleName: new rule will be added before this one. |
| :param fn: new rule function. |
| :param options: new rule options (not mandatory). |
| :raises: KeyError if name not found |
| """ |
| index = self.__find__(beforeName) |
| options = options or {} |
| if index == -1: |
| raise KeyError(f"Parser rule not found: {beforeName}") |
| self.__rules__.insert( |
| index, Rule[RuleFuncTv](ruleName, True, fn, options.get("alt", [])) |
| ) |
| self.__cache__ = None |
|
|
| def after( |
| self, |
| afterName: str, |
| ruleName: str, |
| fn: RuleFuncTv, |
| options: RuleOptionsType | None = None, |
| ) -> None: |
| """Add new rule to chain after one with given name. |
| |
| :param afterName: new rule will be added after this one. |
| :param ruleName: new rule will be added after this one. |
| :param fn: new rule function. |
| :param options: new rule options (not mandatory). |
| :raises: KeyError if name not found |
| """ |
| index = self.__find__(afterName) |
| options = options or {} |
| if index == -1: |
| raise KeyError(f"Parser rule not found: {afterName}") |
| self.__rules__.insert( |
| index + 1, Rule[RuleFuncTv](ruleName, True, fn, options.get("alt", [])) |
| ) |
| self.__cache__ = None |
|
|
| def push( |
| self, ruleName: str, fn: RuleFuncTv, options: RuleOptionsType | None = None |
| ) -> None: |
| """Push new rule to the end of chain. |
| |
| :param ruleName: new rule will be added to the end of chain. |
| :param fn: new rule function. |
| :param options: new rule options (not mandatory). |
| |
| """ |
| self.__rules__.append( |
| Rule[RuleFuncTv](ruleName, True, fn, (options or {}).get("alt", [])) |
| ) |
| self.__cache__ = None |
|
|
| def enable( |
| self, names: str | Iterable[str], ignoreInvalid: bool = False |
| ) -> list[str]: |
| """Enable rules with given names. |
| |
| :param names: name or list of rule names to enable. |
| :param ignoreInvalid: ignore errors when rule not found |
| :raises: KeyError if name not found and not ignoreInvalid |
| :return: list of found rule names |
| """ |
| if isinstance(names, str): |
| names = [names] |
| result: list[str] = [] |
| for name in names: |
| idx = self.__find__(name) |
| if (idx < 0) and ignoreInvalid: |
| continue |
| if (idx < 0) and not ignoreInvalid: |
| raise KeyError(f"Rules manager: invalid rule name {name}") |
| self.__rules__[idx].enabled = True |
| result.append(name) |
| self.__cache__ = None |
| return result |
|
|
| def enableOnly( |
| self, names: str | Iterable[str], ignoreInvalid: bool = False |
| ) -> list[str]: |
| """Enable rules with given names, and disable everything else. |
| |
| :param names: name or list of rule names to enable. |
| :param ignoreInvalid: ignore errors when rule not found |
| :raises: KeyError if name not found and not ignoreInvalid |
| :return: list of found rule names |
| """ |
| if isinstance(names, str): |
| names = [names] |
| for rule in self.__rules__: |
| rule.enabled = False |
| return self.enable(names, ignoreInvalid) |
|
|
| def disable( |
| self, names: str | Iterable[str], ignoreInvalid: bool = False |
| ) -> list[str]: |
| """Disable rules with given names. |
| |
| :param names: name or list of rule names to enable. |
| :param ignoreInvalid: ignore errors when rule not found |
| :raises: KeyError if name not found and not ignoreInvalid |
| :return: list of found rule names |
| """ |
| if isinstance(names, str): |
| names = [names] |
| result = [] |
| for name in names: |
| idx = self.__find__(name) |
| if (idx < 0) and ignoreInvalid: |
| continue |
| if (idx < 0) and not ignoreInvalid: |
| raise KeyError(f"Rules manager: invalid rule name {name}") |
| self.__rules__[idx].enabled = False |
| result.append(name) |
| self.__cache__ = None |
| return result |
|
|
| def getRules(self, chainName: str = "") -> list[RuleFuncTv]: |
| """Return array of active functions (rules) for given chain name. |
| It analyzes rules configuration, compiles caches if not exists and returns result. |
| |
| Default chain name is `''` (empty string). It can't be skipped. |
| That's done intentionally, to keep signature monomorphic for high speed. |
| |
| """ |
| if self.__cache__ is None: |
| self.__compile__() |
| assert self.__cache__ is not None |
| |
| return self.__cache__.get(chainName, []) or [] |
|
|
| def get_all_rules(self) -> list[str]: |
| """Return all available rule names.""" |
| return [r.name for r in self.__rules__] |
|
|
| def get_active_rules(self) -> list[str]: |
| """Return the active rule names.""" |
| return [r.name for r in self.__rules__ if r.enabled] |
|
|