File size: 4,850 Bytes
d787a09
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
"""Process engine: load + validate flows, walk steps and branches.

Loads the YAML flow(s), validates against the schema, and answers the questions
the UI needs: what's the next step, what branches are available here, what does
this step require, and is everything referenced actually defined.
"""
from __future__ import annotations

from pathlib import Path
from typing import Dict, List, Optional

import yaml

from app.process.schema import Flow, Step

# Flow definitions live in the "flows" directory that sits beside this module.
FLOWS_DIR = Path(__file__).resolve().parent.joinpath("flows")


class ProcessEngine:
    """Query layer over a loaded ``Flow``.

    Construction indexes the flow's steps by id (via ``flow.step_map()``) and
    immediately runs :meth:`validate`, so an instance only exists when the
    start step and every transition / help-trigger target are defined.
    """

    def __init__(self, flow: Flow) -> None:
        """Index ``flow`` by step id and validate its cross-references.

        Raises:
            ValueError: if the flow refers to a step id that is not defined.
        """
        self.flow = flow
        self._steps: Dict[str, Step] = flow.step_map()
        self.validate()

    # ---- loading -----------------------------------------------------------
    @classmethod
    def from_yaml(cls, path: Optional[Path] = None) -> "ProcessEngine":
        """Build an engine from a YAML flow file.

        Args:
            path: File to load. When omitted (or falsy) the bundled
                ``il_divorce.yaml`` under ``FLOWS_DIR`` is used.

        Raises:
            ValueError: if the file's top-level YAML value is not a mapping
                (e.g. the file is empty), or if the flow fails validation.
        """
        path = path or (FLOWS_DIR / "il_divorce.yaml")
        with open(path, "r", encoding="utf-8") as fh:
            data = yaml.safe_load(fh)
        # safe_load yields None for an empty document and a list/scalar for
        # non-mapping content; ``Flow(**data)`` would then die with an opaque
        # TypeError, so fail early with a message that names the file.
        if not isinstance(data, dict):
            raise ValueError(
                f"flow file '{path}' must contain a YAML mapping at the top "
                f"level, got {type(data).__name__}"
            )
        flow = Flow(**data)
        return cls(flow)

    # ---- validation --------------------------------------------------------
    def validate(self) -> None:
        """Check that every step id referenced by the flow is defined.

        Covers the flow's start step, every transition target and every
        help-trigger target. Stops at the first problem found.

        Raises:
            ValueError: describing the first undefined reference.
        """
        ids = set(self._steps)
        if self.flow.start not in ids:
            raise ValueError(f"start step '{self.flow.start}' is not defined")
        for step in self.flow.steps:
            for tr in step.transitions:
                if tr.to not in ids:
                    raise ValueError(
                        f"step '{step.id}' transitions to unknown step '{tr.to}'"
                    )
            for ht in step.help_triggers:
                if ht.to not in ids:
                    raise ValueError(
                        f"step '{step.id}' help_trigger points to unknown "
                        f"step '{ht.to}'"
                    )

    # ---- queries -----------------------------------------------------------
    def get(self, step_id: str) -> Optional[Step]:
        """Return the step with this id, or ``None`` if it is not defined."""
        return self._steps.get(step_id)

    @property
    def start(self) -> Step:
        """The step named by ``flow.start``."""
        return self._steps[self.flow.start]

    def main_line(self) -> List[Step]:
        """Delegate to ``flow.main_line()``."""
        return self.flow.main_line()

    def subflows(self) -> List[Step]:
        """Delegate to ``flow.subflows()``."""
        return self.flow.subflows()

    def _resolve_unique(self, target_ids: List[str]) -> List[Step]:
        """Map step ids to steps: first-seen order, no repeats, unknown ids dropped."""
        # dict.fromkeys de-duplicates while preserving insertion order.
        return [
            self._steps[target]
            for target in dict.fromkeys(target_ids)
            if target in self._steps
        ]

    def next_steps(self, step_id: str) -> List[Step]:
        """All steps directly reachable from this one (default + conditional).

        Returns an empty list when ``step_id`` is not defined.
        """
        step = self.get(step_id)
        if step is None:
            return []
        return self._resolve_unique([tr.to for tr in step.transitions])

    def branches(self, step_id: str) -> List[Step]:
        """Conditional next-steps + help-trigger sub-flows for this step.

        Conditional transition targets (those with a truthy ``when``) come
        first, followed by help-trigger targets; a step reachable both ways is
        listed once. Returns an empty list when ``step_id`` is not defined.
        """
        step = self.get(step_id)
        if step is None:
            return []
        conditional = [tr.to for tr in step.transitions if tr.when]
        helpers = [ht.to for ht in step.help_triggers]
        return self._resolve_unique(conditional + helpers)

    def verify_items(self) -> List[Dict[str, str]]:
        """Every place flagged ``verify: true`` — the human-review checklist.

        Each entry has the keys ``step``, ``what`` and ``source``. Per step,
        entries appear in the order: the step itself, its required forms, then
        its deadlines. A form is also listed when its ``form_id`` is the
        placeholder ``"verify-against-source"``.
        """
        items: List[Dict[str, str]] = []
        for step in self.flow.steps:
            if step.verify:
                items.append(
                    {
                        "step": step.id,
                        "what": "Step-level details need verification",
                        # Fall back to a pointer when the step has no citations.
                        "source": "; ".join(step.citations) or "(see step)",
                    }
                )
            for form in step.required_forms:
                if form.verify or form.form_id == "verify-against-source":
                    items.append(
                        {
                            "step": step.id,
                            "what": f"Form: {form.name} (ID/details)",
                            "source": form.source,
                        }
                    )
            for deadline in step.deadlines:
                if deadline.verify:
                    items.append(
                        {
                            "step": step.id,
                            "what": f"Deadline: {deadline.description}",
                            "source": deadline.citation or deadline.basis,
                        }
                    )
        return items


# Module-level cache for the shared engine: filled lazily by get_engine() and
# cleared by reset_engine(). None means "not loaded yet".
_ENGINE: Optional[ProcessEngine] = None


def get_engine() -> ProcessEngine:
    """Return the shared engine, loading the default YAML flow on first use.

    The instance is cached in the module-level ``_ENGINE``; later calls return
    that same object until ``reset_engine()`` clears it.
    """
    global _ENGINE
    if _ENGINE is not None:
        return _ENGINE
    _ENGINE = ProcessEngine.from_yaml()
    return _ENGINE


def reset_engine() -> None:
    """Clear the cached engine so the next ``get_engine()`` call reloads the flow."""
    global _ENGINE
    _ENGINE = None