Spaces:
Running
Running
| """KEGG module rule parser + per-genome completeness scorer. | |
| A KEGG module DEFINITION is a boolean rule over KO IDs: | |
| K00844 (K01810,K06859,K13810) K00850 (K01623,K11645) | |
| Grammar (we implement the common subset that covers ~all metabolic modules): | |
| - whitespace between tokens → AND | |
| - commas inside parens → OR | |
| - parens → grouping | |
| - "+", "-" appear in some modules → treated as AND/optional (we ignore "-") | |
| Completeness convention: we evaluate the rule with the per-genome KO set | |
| substituted as 1/0, where AND = product, OR = max. The result is a 0.0-1.0 | |
| fractional score: 1.0 = pathway complete, 0.0 = no genes present, intermediate | |
| values reflect partial coverage of the AND chain. | |
| Example: | |
| rule = "K00001 (K00002,K00003) K00004" | |
| ko_set = {"K00001", "K00003"} # has step 1, has alt for step 2, missing step 3 | |
| completeness = 1 * max(0,1) * 0 # = 0 (chain broken at step 3) | |
| For partial-credit grading, AND is replaced by a *fraction-present* aggregator | |
| (`fractional=True` mode): the score becomes the average of the step scores, | |
| weighted equally. This is what most KEGG completeness tools report. | |
| """ | |
| from __future__ import annotations | |
| import re | |
| from dataclasses import dataclass | |
| # Tokenizer: KO IDs, parens, commas, plus, minus, whitespace. | |
| TOKEN_RE = re.compile(r"K\d{5}|[()+,-]|\s+") | |
| class Node: | |
| """AST node. type: 'KO' | 'AND' | 'OR'.""" | |
| type: str | |
| ko: str | None = None | |
| children: list["Node"] | None = None | |
| def tokenize(rule: str) -> list[str]: | |
| out: list[str] = [] | |
| for m in TOKEN_RE.finditer(rule): | |
| tok = m.group(0) | |
| if tok.isspace(): | |
| out.append(" ") | |
| else: | |
| out.append(tok) | |
| # Collapse runs of spaces | |
| cleaned: list[str] = [] | |
| for tok in out: | |
| if tok == " ": | |
| if cleaned and cleaned[-1] != " ": | |
| cleaned.append(" ") | |
| else: | |
| cleaned.append(tok) | |
| if cleaned and cleaned[0] == " ": | |
| cleaned = cleaned[1:] | |
| if cleaned and cleaned[-1] == " ": | |
| cleaned = cleaned[:-1] | |
| return cleaned | |
| class _Parser: | |
| """Recursive-descent parser over the token stream.""" | |
| def __init__(self, tokens: list[str]) -> None: | |
| self.toks = tokens | |
| self.i = 0 | |
| def peek(self) -> str | None: | |
| return self.toks[self.i] if self.i < len(self.toks) else None | |
| def consume(self) -> str: | |
| tok = self.toks[self.i] | |
| self.i += 1 | |
| return tok | |
| def parse(self) -> Node: | |
| return self._and_seq(end_tokens={None, ")"}) | |
| def _and_seq(self, end_tokens: set[str | None]) -> Node: | |
| children: list[Node] = [] | |
| while self.peek() not in end_tokens: | |
| tok = self.peek() | |
| if tok == " ": | |
| self.consume() | |
| continue | |
| if tok == "+" or tok == "-": # treat as AND-like glue | |
| self.consume() | |
| continue | |
| children.append(self._or_seq()) | |
| if len(children) == 1: | |
| return children[0] | |
| return Node(type="AND", children=children) | |
| def _or_seq(self) -> Node: | |
| # An OR-sequence is a comma-separated list of factors *only inside parens*. | |
| # At the top level, commas don't appear (KEGG normalizes that). | |
| first = self._factor() | |
| children: list[Node] = [first] | |
| while self.peek() == ",": | |
| self.consume() | |
| children.append(self._factor()) | |
| if len(children) == 1: | |
| return first | |
| return Node(type="OR", children=children) | |
| def _factor(self) -> Node: | |
| tok = self.peek() | |
| if tok == "(": | |
| self.consume() | |
| inner = self._and_seq(end_tokens={")"}) | |
| if self.peek() == ")": | |
| self.consume() | |
| return inner | |
| if tok and tok.startswith("K") and len(tok) == 6 and tok[1:].isdigit(): | |
| self.consume() | |
| return Node(type="KO", ko=tok) | |
| # Skip stray tokens defensively | |
| if tok is not None: | |
| self.consume() | |
| return Node(type="AND", children=[]) | |
| def parse_definition(definition: str) -> Node: | |
| return _Parser(tokenize(definition)).parse() | |
| def evaluate(node: Node, ko_set: set[str], fractional: bool = True) -> float: | |
| """Return 0.0-1.0 completeness for this AST under the given ko_set. | |
| fractional=True → AND = mean of children (KEGG-style partial credit) | |
| fractional=False → AND = min of children (strict; pathway must be intact) | |
| """ | |
| if node.type == "KO": | |
| return 1.0 if node.ko in ko_set else 0.0 | |
| if not node.children: | |
| return 0.0 | |
| scores = [evaluate(c, ko_set, fractional) for c in node.children] | |
| if node.type == "AND": | |
| return float(sum(scores) / len(scores)) if fractional else float(min(scores)) | |
| if node.type == "OR": | |
| return float(max(scores)) | |
| return 0.0 | |
| def module_completeness(definition: str, ko_set: set[str], fractional: bool = True) -> float: | |
| return evaluate(parse_definition(definition), ko_set, fractional=fractional) | |