| | """ |
| | AgentIC Human-in-the-Loop Approval Manager |
| | Manages per-design, per-stage asyncio Events for pause/resume orchestration. |
| | """ |
| | import asyncio |
| | import threading |
| | import time |
| | from dataclasses import dataclass, field |
| | from typing import Any, Dict, List, Optional |
| |
|
| |
|
| | @dataclass |
| | class StageApproval: |
| | """Tracks the approval state for a single stage in a single design build.""" |
| | stage: str |
| | design_name: str |
| | event: threading.Event = field(default_factory=threading.Event) |
| | approved: bool = False |
| | rejected: bool = False |
| | feedback: Optional[str] = None |
| | timestamp: float = field(default_factory=time.time) |
| |
|
| |
|
| | class ApprovalManager: |
| | """Per-design, per-stage approval gate manager. |
| | |
| | Uses threading.Events since the orchestrator runs in a background thread |
| | (not asyncio). The web API (FastAPI/async) signals the events from |
| | async handlers via thread-safe Event.set(). |
| | """ |
| |
|
| | def __init__(self): |
| | |
| | self._gates: Dict[tuple, StageApproval] = {} |
| | self._lock = threading.Lock() |
| | |
| | self._pending_feedback: Dict[str, str] = {} |
| | |
| | self._stage_data: Dict[tuple, dict] = {} |
| |
|
| | def create_gate(self, design_name: str, stage: str) -> StageApproval: |
| | """Create an approval gate for a specific design+stage.""" |
| | key = (design_name, stage) |
| | with self._lock: |
| | gate = StageApproval(stage=stage, design_name=design_name) |
| | self._gates[key] = gate |
| | return gate |
| |
|
| | def wait_for_approval(self, design_name: str, stage: str, timeout: float = 7200.0) -> StageApproval: |
| | """Block the calling thread until approval or rejection is signalled. |
| | |
| | This is called from the build thread (not async). It blocks until |
| | the FastAPI async handler calls approve() or reject(). |
| | |
| | Args: |
| | design_name: The design being built |
| | stage: The current stage name |
| | timeout: Max seconds to wait (default 2 hours) |
| | |
| | Returns: |
| | The StageApproval with approved/rejected/feedback set |
| | """ |
| | key = (design_name, stage) |
| | with self._lock: |
| | gate = self._gates.get(key) |
| | if gate is None: |
| | gate = self.create_gate(design_name, stage) |
| |
|
| | |
| | gate.event.wait(timeout=timeout) |
| | return gate |
| |
|
| | def approve(self, design_name: str, stage: str) -> bool: |
| | """Signal approval for a waiting stage. Called from async endpoint.""" |
| | key = (design_name, stage) |
| | with self._lock: |
| | gate = self._gates.get(key) |
| | if gate is None: |
| | return False |
| | gate.approved = True |
| | gate.rejected = False |
| | gate.event.set() |
| | return True |
| |
|
| | def reject(self, design_name: str, stage: str, feedback: Optional[str] = None) -> bool: |
| | """Signal rejection for a waiting stage. Called from async endpoint.""" |
| | key = (design_name, stage) |
| | with self._lock: |
| | gate = self._gates.get(key) |
| | if gate is None: |
| | return False |
| | gate.approved = False |
| | gate.rejected = True |
| | gate.feedback = feedback |
| | gate.event.set() |
| | |
| | if feedback: |
| | self._pending_feedback[design_name] = feedback |
| | return True |
| |
|
| | def get_pending_feedback(self, design_name: str) -> Optional[str]: |
| | """Retrieve and clear pending user feedback for a design.""" |
| | with self._lock: |
| | return self._pending_feedback.pop(design_name, None) |
| |
|
| | def store_stage_data(self, design_name: str, stage: str, data: dict): |
| | """Store stage completion data for retrieval by frontend.""" |
| | key = (design_name, stage) |
| | with self._lock: |
| | self._stage_data[key] = data |
| |
|
| | def get_stage_data(self, design_name: str, stage: str) -> Optional[dict]: |
| | """Retrieve stage completion data.""" |
| | key = (design_name, stage) |
| | with self._lock: |
| | return self._stage_data.get(key) |
| |
|
| | def cleanup(self, design_name: str): |
| | """Remove all gates for a completed/failed design.""" |
| | with self._lock: |
| | keys_to_remove = [k for k in self._gates if k[0] == design_name] |
| | for k in keys_to_remove: |
| | del self._gates[k] |
| | keys_to_remove = [k for k in self._stage_data if k[0] == design_name] |
| | for k in keys_to_remove: |
| | del self._stage_data[k] |
| | self._pending_feedback.pop(design_name, None) |
| |
|
| | def get_waiting_stages(self) -> List[dict]: |
| | """List all stages currently waiting for approval.""" |
| | with self._lock: |
| | waiting = [] |
| | for key, gate in self._gates.items(): |
| | if not gate.event.is_set(): |
| | waiting.append({ |
| | "design_name": gate.design_name, |
| | "stage": gate.stage, |
| | "waiting_since": gate.timestamp, |
| | }) |
| | return waiting |
| |
|
| |
|
| | |
| | approval_manager = ApprovalManager() |
| |
|