"""Process engine: load + validate flows, walk steps and branches.

Loads the YAML flow(s), validates against the schema, and answers the questions
the UI needs: what's the next step, what branches are available here, what does
this step require, and is everything referenced actually defined.
"""
from __future__ import annotations

from pathlib import Path
from typing import Dict, Iterable, List, Optional

import yaml

from app.process.schema import Flow, Step
|
|
# Directory of bundled YAML flow files, anchored to this module's own location
# so the default flow is found regardless of the current working directory.
FLOWS_DIR = Path(__file__).resolve().parent.joinpath("flows")
|
|
|
|
class ProcessEngine:
    """Query layer over a single flow definition.

    Wraps a ``Flow`` together with the id -> step mapping returned by
    ``flow.step_map()`` and answers navigation questions about it: the start
    step, directly reachable steps, branches, and the human-review checklist.
    Cross-references are checked once, at construction time.
    """

    def __init__(self, flow: Flow) -> None:
        """Store *flow*, index its steps, and validate its cross-references.

        Raises:
            ValueError: if the start step or any transition / help-trigger
                target is not a defined step (see :meth:`validate`).
        """
        self.flow = flow
        self._steps: Dict[str, Step] = flow.step_map()
        self.validate()

    @classmethod
    def from_yaml(cls, path: Optional[Path] = None) -> "ProcessEngine":
        """Build an engine from a YAML flow file.

        Args:
            path: YAML file to load. Any falsy value (``None`` included)
                selects ``il_divorce.yaml`` under ``FLOWS_DIR``.

        Returns:
            An engine for the loaded flow, already validated.

        Raises:
            OSError: if the file cannot be opened.
            ValueError: if the document is empty or its top level is not a
                mapping, or if cross-reference validation fails.

        ``yaml.safe_load`` and the ``Flow`` constructor may raise their own
        errors for malformed content; those propagate unchanged.
        """
        path = path or (FLOWS_DIR / "il_divorce.yaml")
        with open(path, "r", encoding="utf-8") as fh:
            data = yaml.safe_load(fh)
        # An empty file loads as None and a list/scalar document cannot be
        # splatted into Flow(**...); reject both with an explicit message
        # instead of an opaque TypeError.
        flow = Flow(**cls._require_mapping(data, path))
        return cls(flow)

    @staticmethod
    def _require_mapping(data: object, path: object) -> Dict[str, object]:
        """Return *data* unchanged if it is a dict, else raise ``ValueError``."""
        if not isinstance(data, dict):
            raise ValueError(
                f"flow file '{path}' must contain a YAML mapping at the top "
                f"level, got {type(data).__name__}"
            )
        return data

    def validate(self) -> None:
        """Check that every referenced step id is defined.

        Steps are visited in ``flow.steps`` order; within a step, transitions
        are checked before help triggers.

        Raises:
            ValueError: on the first dangling reference found - the flow's
                start step, a transition target, or a help-trigger target.
        """
        defined = self._steps
        if self.flow.start not in defined:
            raise ValueError(f"start step '{self.flow.start}' is not defined")
        for step in self.flow.steps:
            for tr in step.transitions:
                if tr.to not in defined:
                    raise ValueError(
                        f"step '{step.id}' transitions to unknown step "
                        f"'{tr.to}'"
                    )
            for ht in step.help_triggers:
                if ht.to not in defined:
                    raise ValueError(
                        f"step '{step.id}' help_trigger points to unknown "
                        f"step '{ht.to}'"
                    )

    def get(self, step_id: str) -> Optional[Step]:
        """Return the step registered under *step_id*, or ``None`` if absent."""
        return self._steps.get(step_id)

    @property
    def start(self) -> Step:
        """The step named by ``flow.start``."""
        return self._steps[self.flow.start]

    def main_line(self) -> List[Step]:
        """Return whatever ``flow.main_line()`` reports (pure delegation)."""
        return self.flow.main_line()

    def subflows(self) -> List[Step]:
        """Return whatever ``flow.subflows()`` reports (pure delegation)."""
        return self.flow.subflows()

    def _unique_steps(self, step_ids: Iterable[str]) -> List[Step]:
        """Resolve *step_ids* to steps, keeping first-seen order.

        Duplicate ids and ids that are not defined in this engine are skipped.
        """
        found: List[Step] = []
        seen = set()
        for step_id in step_ids:
            if step_id in seen or step_id not in self._steps:
                continue
            seen.add(step_id)
            found.append(self._steps[step_id])
        return found

    def next_steps(self, step_id: str) -> List[Step]:
        """All steps directly reachable from this one (default + conditional).

        Returns an empty list when *step_id* is unknown. Each target appears
        once, in transition order.
        """
        step = self.get(step_id)
        if step is None:
            return []
        return self._unique_steps([tr.to for tr in step.transitions])

    def branches(self, step_id: str) -> List[Step]:
        """Conditional next-steps + help-trigger sub-flows for this step.

        Transitions with a truthy ``when`` come first, followed by help-trigger
        targets; a step reachable both ways is listed only once. Returns an
        empty list when *step_id* is unknown.
        """
        step = self.get(step_id)
        if step is None:
            return []
        conditional = [tr.to for tr in step.transitions if tr.when]
        helps = [ht.to for ht in step.help_triggers]
        return self._unique_steps(conditional + helps)

    def verify_items(self) -> List[Dict[str, str]]:
        """Every place flagged ``verify: true`` - the human-review checklist.

        Walks the steps in flow order and emits one ``{"step", "what",
        "source"}`` record per flagged item: the step itself, each required
        form that is flagged or still carries the placeholder id
        ``"verify-against-source"``, and each flagged deadline.
        """
        items: List[Dict[str, str]] = []
        for step in self.flow.steps:
            if step.verify:
                items.append(
                    {
                        "step": step.id,
                        "what": "Step-level details need verification",
                        # Fall back to a pointer when no citations are listed.
                        "source": "; ".join(step.citations) or "(see step)",
                    }
                )
            for form in step.required_forms:
                if form.verify or form.form_id == "verify-against-source":
                    items.append(
                        {
                            "step": step.id,
                            "what": f"Form: {form.name} (ID/details)",
                            "source": form.source,
                        }
                    )
            for deadline in step.deadlines:
                if deadline.verify:
                    items.append(
                        {
                            "step": step.id,
                            "what": f"Deadline: {deadline.description}",
                            # Prefer the citation; use the basis when empty.
                            "source": deadline.citation or deadline.basis,
                        }
                    )
        return items
|
|
|
|
# Shared engine instance; stays ``None`` until ``get_engine()`` first builds it
# and is set back to ``None`` by ``reset_engine()``.
_ENGINE: Optional[ProcessEngine] = None
|
|
|
|
def get_engine() -> ProcessEngine:
    """Return the shared engine, building it on first use.

    The first call constructs the engine with ``ProcessEngine.from_yaml()``
    (default flow path) and stores it in the module-level ``_ENGINE``; later
    calls hand back that same object until ``reset_engine()`` clears it.
    """
    global _ENGINE
    if _ENGINE is not None:
        return _ENGINE
    _ENGINE = ProcessEngine.from_yaml()
    return _ENGINE
|
|
|
|
def reset_engine() -> None:
    """Forget the cached engine so the next ``get_engine()`` call rebuilds it."""
    global _ENGINE
    _ENGINE = None
|
|