microbe-model / src /microbe_model /features /kegg_modules.py
Miyu Horiuchi
Deploy app from main@a3254bf (no paper/ binaries)
0ed74db
"""KEGG module rule parser + per-genome completeness scorer.
A KEGG module DEFINITION is a boolean rule over KO IDs:
K00844 (K01810,K06859,K13810) K00850 (K01623,K11645)
Grammar (we implement the common subset that covers ~all metabolic modules):
- whitespace between tokens → AND
- commas inside parens → OR
- parens → grouping
- "+", "-" appear in some modules → treated as AND/optional (we ignore "-")
Completeness convention: we evaluate the rule with the per-genome KO set
substituted as 1/0, where AND = product, OR = max. The result is a 0.0-1.0
fractional score: 1.0 = pathway complete, 0.0 = no genes present, intermediate
values reflect partial coverage of the AND chain.
Example:
rule = "K00001 (K00002,K00003) K00004"
ko_set = {"K00001", "K00003"} # has step 1, has alt for step 2, missing step 3
completeness = 1 * max(0,1) * 0 # = 0 (chain broken at step 3)
For partial-credit grading, AND is replaced by a *fraction-present* aggregator
(`fractional=True` mode): the score becomes the average of the step scores,
weighted equally. This is what most KEGG completeness tools report.
"""
from __future__ import annotations
import re
from dataclasses import dataclass
# Tokenizer: KO IDs, parens, commas, plus, minus, whitespace.
TOKEN_RE = re.compile(r"K\d{5}|[()+,-]|\s+")
@dataclass
class Node:
"""AST node. type: 'KO' | 'AND' | 'OR'."""
type: str
ko: str | None = None
children: list["Node"] | None = None
def tokenize(rule: str) -> list[str]:
out: list[str] = []
for m in TOKEN_RE.finditer(rule):
tok = m.group(0)
if tok.isspace():
out.append(" ")
else:
out.append(tok)
# Collapse runs of spaces
cleaned: list[str] = []
for tok in out:
if tok == " ":
if cleaned and cleaned[-1] != " ":
cleaned.append(" ")
else:
cleaned.append(tok)
if cleaned and cleaned[0] == " ":
cleaned = cleaned[1:]
if cleaned and cleaned[-1] == " ":
cleaned = cleaned[:-1]
return cleaned
class _Parser:
"""Recursive-descent parser over the token stream."""
def __init__(self, tokens: list[str]) -> None:
self.toks = tokens
self.i = 0
def peek(self) -> str | None:
return self.toks[self.i] if self.i < len(self.toks) else None
def consume(self) -> str:
tok = self.toks[self.i]
self.i += 1
return tok
def parse(self) -> Node:
return self._and_seq(end_tokens={None, ")"})
def _and_seq(self, end_tokens: set[str | None]) -> Node:
children: list[Node] = []
while self.peek() not in end_tokens:
tok = self.peek()
if tok == " ":
self.consume()
continue
if tok == "+" or tok == "-": # treat as AND-like glue
self.consume()
continue
children.append(self._or_seq())
if len(children) == 1:
return children[0]
return Node(type="AND", children=children)
def _or_seq(self) -> Node:
# An OR-sequence is a comma-separated list of factors *only inside parens*.
# At the top level, commas don't appear (KEGG normalizes that).
first = self._factor()
children: list[Node] = [first]
while self.peek() == ",":
self.consume()
children.append(self._factor())
if len(children) == 1:
return first
return Node(type="OR", children=children)
def _factor(self) -> Node:
tok = self.peek()
if tok == "(":
self.consume()
inner = self._and_seq(end_tokens={")"})
if self.peek() == ")":
self.consume()
return inner
if tok and tok.startswith("K") and len(tok) == 6 and tok[1:].isdigit():
self.consume()
return Node(type="KO", ko=tok)
# Skip stray tokens defensively
if tok is not None:
self.consume()
return Node(type="AND", children=[])
def parse_definition(definition: str) -> Node:
return _Parser(tokenize(definition)).parse()
def evaluate(node: Node, ko_set: set[str], fractional: bool = True) -> float:
"""Return 0.0-1.0 completeness for this AST under the given ko_set.
fractional=True → AND = mean of children (KEGG-style partial credit)
fractional=False → AND = min of children (strict; pathway must be intact)
"""
if node.type == "KO":
return 1.0 if node.ko in ko_set else 0.0
if not node.children:
return 0.0
scores = [evaluate(c, ko_set, fractional) for c in node.children]
if node.type == "AND":
return float(sum(scores) / len(scores)) if fractional else float(min(scores))
if node.type == "OR":
return float(max(scores))
return 0.0
def module_completeness(definition: str, ko_set: set[str], fractional: bool = True) -> float:
return evaluate(parse_definition(definition), ko_set, fractional=fractional)