File size: 5,110 Bytes
0ed74db
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
"""KEGG module rule parser + per-genome completeness scorer.

A KEGG module DEFINITION is a boolean rule over KO IDs:

    K00844 (K01810,K06859,K13810) K00850 (K01623,K11645)

Grammar (we implement the common subset that covers ~all metabolic modules):
  - whitespace between tokens   → AND
  - commas inside parens        → OR
  - parens                      → grouping
  - "+", "-" appear in some modules → treated as AND/optional (we ignore "-")

Completeness convention: we evaluate the rule with the per-genome KO set
substituted as 1/0, where AND = product, OR = max. The result is a 0.0-1.0
fractional score: 1.0 = pathway complete, 0.0 = no genes present, intermediate
values reflect partial coverage of the AND chain.

Example:
    rule = "K00001 (K00002,K00003) K00004"
    ko_set = {"K00001", "K00003"}      # has step 1, has alt for step 2, missing step 3
    completeness = 1 * max(0,1) * 0     # = 0  (chain broken at step 3)

For partial-credit grading, AND is replaced by a *fraction-present* aggregator
(`fractional=True` mode): the score becomes the average of the step scores,
weighted equally. This is what most KEGG completeness tools report.
"""
from __future__ import annotations

import re
from dataclasses import dataclass

# Tokenizer: KO IDs, parens, commas, plus, minus, whitespace.
TOKEN_RE = re.compile(r"K\d{5}|[()+,-]|\s+")


@dataclass
class Node:
    """AST node. type: 'KO' | 'AND' | 'OR'."""
    type: str
    ko: str | None = None
    children: list["Node"] | None = None


def tokenize(rule: str) -> list[str]:
    out: list[str] = []
    for m in TOKEN_RE.finditer(rule):
        tok = m.group(0)
        if tok.isspace():
            out.append(" ")
        else:
            out.append(tok)
    # Collapse runs of spaces
    cleaned: list[str] = []
    for tok in out:
        if tok == " ":
            if cleaned and cleaned[-1] != " ":
                cleaned.append(" ")
        else:
            cleaned.append(tok)
    if cleaned and cleaned[0] == " ":
        cleaned = cleaned[1:]
    if cleaned and cleaned[-1] == " ":
        cleaned = cleaned[:-1]
    return cleaned


class _Parser:
    """Recursive-descent parser over the token stream."""

    def __init__(self, tokens: list[str]) -> None:
        self.toks = tokens
        self.i = 0

    def peek(self) -> str | None:
        return self.toks[self.i] if self.i < len(self.toks) else None

    def consume(self) -> str:
        tok = self.toks[self.i]
        self.i += 1
        return tok

    def parse(self) -> Node:
        return self._and_seq(end_tokens={None, ")"})

    def _and_seq(self, end_tokens: set[str | None]) -> Node:
        children: list[Node] = []
        while self.peek() not in end_tokens:
            tok = self.peek()
            if tok == " ":
                self.consume()
                continue
            if tok == "+" or tok == "-":  # treat as AND-like glue
                self.consume()
                continue
            children.append(self._or_seq())
        if len(children) == 1:
            return children[0]
        return Node(type="AND", children=children)

    def _or_seq(self) -> Node:
        # An OR-sequence is a comma-separated list of factors *only inside parens*.
        # At the top level, commas don't appear (KEGG normalizes that).
        first = self._factor()
        children: list[Node] = [first]
        while self.peek() == ",":
            self.consume()
            children.append(self._factor())
        if len(children) == 1:
            return first
        return Node(type="OR", children=children)

    def _factor(self) -> Node:
        tok = self.peek()
        if tok == "(":
            self.consume()
            inner = self._and_seq(end_tokens={")"})
            if self.peek() == ")":
                self.consume()
            return inner
        if tok and tok.startswith("K") and len(tok) == 6 and tok[1:].isdigit():
            self.consume()
            return Node(type="KO", ko=tok)
        # Skip stray tokens defensively
        if tok is not None:
            self.consume()
        return Node(type="AND", children=[])


def parse_definition(definition: str) -> Node:
    return _Parser(tokenize(definition)).parse()


def evaluate(node: Node, ko_set: set[str], fractional: bool = True) -> float:
    """Return 0.0-1.0 completeness for this AST under the given ko_set.

    fractional=True  → AND = mean of children   (KEGG-style partial credit)
    fractional=False → AND = min of children    (strict; pathway must be intact)
    """
    if node.type == "KO":
        return 1.0 if node.ko in ko_set else 0.0
    if not node.children:
        return 0.0
    scores = [evaluate(c, ko_set, fractional) for c in node.children]
    if node.type == "AND":
        return float(sum(scores) / len(scores)) if fractional else float(min(scores))
    if node.type == "OR":
        return float(max(scores))
    return 0.0


def module_completeness(definition: str, ko_set: set[str], fractional: bool = True) -> float:
    return evaluate(parse_definition(definition), ko_set, fractional=fractional)