File size: 4,850 Bytes
d787a09 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 | """Process engine: load + validate flows, walk steps and branches.
Loads the YAML flow(s), validates against the schema, and answers the questions
the UI needs: what's the next step, what branches are available here, what does
this step require, and is everything referenced actually defined.
"""
from __future__ import annotations
from pathlib import Path
from typing import Dict, List, Optional
import yaml
from app.process.schema import Flow, Step
FLOWS_DIR = Path(__file__).resolve().parent / "flows"
class ProcessEngine:
def __init__(self, flow: Flow) -> None:
self.flow = flow
self._steps: Dict[str, Step] = flow.step_map()
self.validate()
# ---- loading -----------------------------------------------------------
@classmethod
def from_yaml(cls, path: Optional[Path] = None) -> "ProcessEngine":
path = path or (FLOWS_DIR / "il_divorce.yaml")
with open(path, "r", encoding="utf-8") as fh:
data = yaml.safe_load(fh)
flow = Flow(**data)
return cls(flow)
# ---- validation --------------------------------------------------------
def validate(self) -> None:
ids = set(self._steps)
if self.flow.start not in ids:
raise ValueError("start step '%s' is not defined" % self.flow.start)
for step in self.flow.steps:
for tr in step.transitions:
if tr.to not in ids:
raise ValueError(
"step '%s' transitions to unknown step '%s'"
% (step.id, tr.to)
)
for ht in step.help_triggers:
if ht.to not in ids:
raise ValueError(
"step '%s' help_trigger points to unknown step '%s'"
% (step.id, ht.to)
)
# ---- queries -----------------------------------------------------------
def get(self, step_id: str) -> Optional[Step]:
return self._steps.get(step_id)
@property
def start(self) -> Step:
return self._steps[self.flow.start]
def main_line(self) -> List[Step]:
return self.flow.main_line()
def subflows(self) -> List[Step]:
return self.flow.subflows()
def next_steps(self, step_id: str) -> List[Step]:
"""All steps directly reachable from this one (default + conditional)."""
step = self.get(step_id)
if not step:
return []
out, seen = [], set()
for tr in step.transitions:
if tr.to not in seen and tr.to in self._steps:
seen.add(tr.to)
out.append(self._steps[tr.to])
return out
def branches(self, step_id: str) -> List[Step]:
"""Conditional next-steps + help-trigger sub-flows for this step."""
step = self.get(step_id)
if not step:
return []
out, seen = [], set()
for tr in step.transitions:
if tr.when and tr.to in self._steps and tr.to not in seen:
seen.add(tr.to)
out.append(self._steps[tr.to])
for ht in step.help_triggers:
if ht.to in self._steps and ht.to not in seen:
seen.add(ht.to)
out.append(self._steps[ht.to])
return out
def verify_items(self) -> List[Dict[str, str]]:
"""Every place flagged ``verify: true`` — the human-review checklist."""
items: List[Dict[str, str]] = []
for step in self.flow.steps:
if step.verify:
items.append(
{
"step": step.id,
"what": "Step-level details need verification",
"source": "; ".join(step.citations) or "(see step)",
}
)
for f in step.required_forms:
if f.verify or f.form_id == "verify-against-source":
items.append(
{
"step": step.id,
"what": "Form: %s (ID/details)" % f.name,
"source": f.source,
}
)
for d in step.deadlines:
if d.verify:
items.append(
{
"step": step.id,
"what": "Deadline: %s" % d.description,
"source": d.citation or d.basis,
}
)
return items
_ENGINE: Optional[ProcessEngine] = None
def get_engine() -> ProcessEngine:
global _ENGINE
if _ENGINE is None:
_ENGINE = ProcessEngine.from_yaml()
return _ENGINE
def reset_engine() -> None:
global _ENGINE
_ENGINE = None
|