| """Run controller for VentureForge UI. |
| |
| Provides a thin abstraction over the LangGraph pipeline so that the |
| Gradio UI can: |
| |
| * start a new run (returns run_id) |
| * poll the latest state for a run_id |
| * request cancellation (takes effect between LangGraph steps) |
| |
| NOTE: The actual persistence is handled by the LangGraph SQLite |
| checkpointer configured in ``src.graph``. ``GRAPH.get_state`` is used to |
| recover the latest VentureForgeState snapshot for a given run_id. |
| """ |
| from __future__ import annotations |
|
|
| import logging |
| import threading |
| from typing import Optional |
|
|
| from pydantic import ValidationError |
|
|
| from src.graph import GRAPH |
| from src.main import make_initial_state |
| from src.state.schema import VentureForgeState |
|
|
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Module-level run bookkeeping: start_run refuses to start a second run while
# the previous worker thread is still alive.
# ---------------------------------------------------------------------------

# run_ids the user asked to stop; the worker consults this between graph steps.
_cancel_requested: set[str] = set()
# run_id served by the current/most recent worker thread (None once it ends).
_active_run_id: Optional[str] = None
# Worker thread of the current/most recent run (None once it ends).
_active_thread: Optional[threading.Thread] = None
|
|
|
|
class RunInProgressError(RuntimeError):
    """Signal that ``start_run`` was called while an earlier run is still active.

    Derives from ``RuntimeError`` so generic ``except RuntimeError`` handlers
    also catch it.
    """
|
|
|
|
| def start_run( |
| domain: str, |
| *, |
| max_pain_points: int | None = None, |
| ideas_per_run: int | None = None, |
| ) -> str: |
| """Start a new pipeline run in a background thread. |
| |
| Returns the newly created ``run_id``. Raises ``RunInProgressError`` |
| if a previous run is still active. |
| """ |
| global _active_thread, _active_run_id |
|
|
| if _active_thread is not None and _active_thread.is_alive(): |
| raise RunInProgressError("A run is already in progress") |
|
|
| state = make_initial_state( |
| domain, |
| max_pain_points=max_pain_points, |
| ideas_per_run=ideas_per_run, |
| ) |
| run_id = state.run_id |
|
|
| def _worker() -> None: |
| global _active_thread, _active_run_id |
| import time |
| last_step_time = time.time() |
| timeout_seconds = 300 |
| |
| try: |
| logger.info(f"[run_controller] Starting pipeline run {run_id}") |
| |
| |
| |
| |
| step_count = 0 |
| for chunk in GRAPH.stream( |
| state, |
| config={ |
| "recursion_limit": 150, |
| "configurable": {"thread_id": run_id}, |
| }, |
| ): |
| step_count += 1 |
| current_time = time.time() |
| elapsed = current_time - last_step_time |
| |
| |
| if chunk: |
| node_name = list(chunk.keys())[0] if chunk else "unknown" |
| logger.info(f"[run_controller] Step {step_count}: {node_name} (took {elapsed:.1f}s)") |
| |
| |
| if elapsed > timeout_seconds: |
| logger.warning( |
| f"[run_controller] Step {step_count} ({node_name}) took {elapsed:.1f}s " |
| f"(timeout threshold: {timeout_seconds}s)" |
| ) |
| |
| last_step_time = current_time |
| |
| |
| if is_cancel_requested(run_id): |
| logger.info(f"[run_controller] Cancellation requested for {run_id}, stopping") |
| |
| current_state = poll_state(run_id) |
| if current_state: |
| GRAPH.update_state( |
| config={"configurable": {"thread_id": run_id}}, |
| values=current_state.mark_cancelled("User requested stop"), |
| ) |
| break |
| |
| logger.info(f"[run_controller] Pipeline run {run_id} completed after {step_count} steps") |
| except Exception as exc: |
| |
| |
| |
| |
| logger.error(f"[run_controller] Pipeline worker crashed: {exc!r}", exc_info=True) |
| print(f"[run_controller] Pipeline worker crashed: {exc!r}", flush=True) |
| finally: |
| |
| |
| logger.info(f"[run_controller] Worker thread for {run_id} finished") |
| _active_thread = None |
| _active_run_id = None |
|
|
| t = threading.Thread(target=_worker, daemon=True) |
| _active_thread = t |
| _active_run_id = run_id |
| t.start() |
|
|
| return run_id |
|
|
|
|
def poll_state(run_id: str) -> VentureForgeState | None:
    """Return the latest VentureForgeState for ``run_id`` from checkpoints.

    The snapshot is read with ``GRAPH.get_state`` using ``run_id`` as the
    LangGraph ``thread_id`` and validated into a ``VentureForgeState``.

    Args:
        run_id: Identifier returned by ``start_run``.

    Returns:
        The validated state, or ``None`` if no state has been persisted yet or
        the persisted values fail validation.
    """
    config = {"configurable": {"thread_id": run_id}}
    stored = GRAPH.get_state(config=config)

    # A missing snapshot, or one with empty values, is treated as "nothing
    # checkpointed yet".
    if stored is None or not stored.values:
        return None

    try:
        return VentureForgeState.model_validate(stored.values)
    except ValidationError:
        # Keep the best-effort ``None`` contract, but record why. Swallowing
        # the error silently makes a schema mismatch indistinguishable from
        # "no state yet". Debug level because the UI may poll this often.
        logger.debug(
            "[run_controller] Stored state for run %s failed validation",
            run_id,
            exc_info=True,
        )
        return None
|
|
|
|
def request_cancel(run_id: str) -> None:
    """Flag ``run_id`` for cooperative cancellation.

    Only the request is recorded here. The worker started by ``start_run``
    checks the flag after each streamed graph step, so a step (for example, an
    agent waiting on an LLM response) that is already running finishes before
    the run stops.
    """
    message = f"[run_controller] Cancellation requested for run {run_id}"
    logger.info(message)
    pending = _cancel_requested
    pending.add(run_id)
|
|
|
|
def is_cancel_requested(run_id: str) -> bool:
    """Tell whether ``request_cancel`` has recorded a stop for ``run_id``."""
    requested_ids = _cancel_requested
    return run_id in requested_ids
|
|
|
|
def is_run_active() -> bool:
    """Return True if a pipeline run is currently active.

    "Active" means a worker thread has been recorded by ``start_run`` and is
    still alive.
    """
    # Read the global once: the worker thread resets it to None in its
    # ``finally`` block, so reading it twice (None check, then is_alive())
    # could race and call ``is_alive()`` on None.
    thread = _active_thread
    return thread is not None and thread.is_alive()
|
|