| """ |
| delivery.decisions.queue — DecisionQueue data model + builder. |
| |
| Same pattern as evidence views: pure data here, rendering elsewhere. The |
| builder takes the raw rows ``DecisionQueueRepository.list_for_tenant`` |
| emits plus the ``status_summary`` counts, so the queue object can be built |
| in tests without touching the DB. |
| """ |
| from __future__ import annotations |
|
|
| from dataclasses import dataclass, field |
| from typing import Any, Dict, List, Mapping, Optional |
|
|
| STATUS_ORDER = ("open", "in_progress", "resolved", "dismissed") |
|
|
|
|
| @dataclass |
| class DecisionQueueItem: |
| decision_id: str |
| issue_id: str |
| run_id: str |
| entity_id: str |
| entity_type: str |
| urgency: float |
| recommendation: str |
| confidence: float |
| status: str |
| owner: Optional[str] |
| triage_notes: Optional[str] |
| run_finished_at: str |
| updated_at: Optional[str] |
|
|
|
|
| @dataclass |
| class DecisionQueue: |
| tenant_id: str |
| status_filter: Optional[str] |
| items: List[DecisionQueueItem] = field(default_factory=list) |
| status_counts: Dict[str, int] = field(default_factory=dict) |
|
|
|
|
| def _item_from_row(row: Mapping[str, Any]) -> DecisionQueueItem: |
| return DecisionQueueItem( |
| decision_id=str(row.get("decision_id", "")), |
| issue_id=str(row.get("issue_id", "")), |
| run_id=str(row.get("run_id", "")), |
| entity_id=str(row.get("entity_id", "")), |
| entity_type=str(row.get("entity_type", "")), |
| urgency=float(row.get("urgency", 0.0)), |
| recommendation=str(row.get("recommendation", "")), |
| confidence=float(row.get("confidence", 0.0)), |
| status=str(row.get("status", "open")), |
| owner=row.get("owner") or None, |
| triage_notes=row.get("triage_notes") or None, |
| run_finished_at=str(row.get("run_finished_at", "")), |
| updated_at=row.get("updated_at") or None, |
| ) |
|
|
|
|
| def build_queue( |
| tenant_id: str, |
| rows: List[Mapping[str, Any]], |
| status_counts: Mapping[str, int], |
| status_filter: Optional[str] = None, |
| ) -> DecisionQueue: |
| """Build a DecisionQueue from raw repository rows + a counts dict. |
| |
| Rows are expected in the order the repository returned them (urgency |
| desc); we do not re-sort here so the queue mirrors what the DB ranks. |
| Counts are normalised so every known status is present, with 0 for any |
| status the tenant has no decisions in. |
| """ |
| counts: Dict[str, int] = {s: 0 for s in STATUS_ORDER} |
| counts.update(status_counts) |
| return DecisionQueue( |
| tenant_id=tenant_id, |
| status_filter=status_filter, |
| items=[_item_from_row(r) for r in rows], |
| status_counts=counts, |
| ) |
|
|
|
|
| def build_queue_from_service( |
| service, |
| tenant_id: str, |
| status: Optional[str] = "open", |
| owner: Optional[str] = None, |
| entity_id: Optional[str] = None, |
| limit: int = 50, |
| offset: int = 0, |
| ) -> DecisionQueue: |
| """Convenience wrapper: fetch rows + counts via an OrgStateService. |
| |
| ``status='open'`` by default — the typical triage workflow surfaces |
| only the work that still needs attention. Pass ``status=None`` for an |
| unfiltered queue. |
| """ |
| rows = service.list_decisions( |
| tenant_id, status=status, owner=owner, entity_id=entity_id, |
| limit=limit, offset=offset, |
| ) |
| counts = service.decision_summary(tenant_id) |
| return build_queue(tenant_id, rows, counts, status_filter=status) |
|
|