Spaces:
Sleeping
Sleeping
| """Run controller for VentureForge UI. | |
| Provides a thin abstraction over the LangGraph pipeline so that the | |
| Gradio UI can: | |
| * start a new run (returns run_id) | |
| * poll the latest state for a run_id | |
| * request cancellation (honoured between LangGraph steps, not mid-step) | |
| NOTE: The actual persistence is handled by the LangGraph SQLite | |
| checkpointer configured in ``src.graph``. ``GRAPH.get_state`` is used to | |
| recover the latest VentureForgeState snapshot for a given run_id. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import threading | |
| from typing import Optional | |
| from pydantic import ValidationError | |
| from src.graph import GRAPH | |
| from src.main import make_initial_state | |
| from src.state.schema import VentureForgeState | |
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Bookkeeping for the one run that may be active at a time
# (hackathon scope: no concurrent runs).
_active_thread: threading.Thread | None = None
_active_run_id: str | None = None

# run_ids for which the user has asked for cancellation.
_cancel_requested: set[str] = set()
class RunInProgressError(RuntimeError):
    """Signal that a new run was refused because another run is still active."""
def start_run(
    domain: str,
    *,
    max_pain_points: int | None = None,
    ideas_per_run: int | None = None,
) -> str:
    """Start a new pipeline run in a background daemon thread.

    The initial state is built with ``make_initial_state`` and the pipeline
    is driven through ``GRAPH.stream`` so that cancellation requests can be
    honoured after each streamed step.

    Args:
        domain: Forwarded to ``make_initial_state``.
        max_pain_points: Optional override forwarded to ``make_initial_state``.
        ideas_per_run: Optional override forwarded to ``make_initial_state``.

    Returns:
        The ``run_id`` of the freshly created initial state.

    Raises:
        RunInProgressError: If the previously started worker thread is
            still alive.
    """
    global _active_thread, _active_run_id

    # Read the global exactly once: the worker's ``finally`` may reset it to
    # None from another thread between a None-check and ``is_alive()``.
    previous = _active_thread
    if previous is not None and previous.is_alive():
        raise RunInProgressError("A run is already in progress")

    state = make_initial_state(
        domain,
        max_pain_points=max_pain_points,
        ideas_per_run=ideas_per_run,
    )
    run_id = state.run_id

    def _worker() -> None:
        global _active_thread, _active_run_id
        import time

        # Not an enforced timeout: steps slower than this are only logged
        # as warnings.
        timeout_seconds = 300  # 5 minutes per step
        # Monotonic clock so wall-clock adjustments cannot skew durations.
        last_step_time = time.monotonic()
        step_count = 0
        try:
            logger.info("[run_controller] Starting pipeline run %s", run_id)
            # Drive the pipeline via LangGraph streaming so we can respond
            # to cancellation requests between steps. The checkpointer
            # persists intermediate states; ``poll_state`` reads them for
            # the UI.
            for chunk in GRAPH.stream(
                state,
                config={
                    "recursion_limit": 150,  # Increased from 80 to handle retry loops
                    "configurable": {"thread_id": run_id},
                },
            ):
                step_count += 1
                current_time = time.monotonic()
                elapsed = current_time - last_step_time

                # Log each step for debugging.
                if chunk:
                    node_name = next(iter(chunk), "unknown")
                    logger.info(
                        "[run_controller] Step %d: %s (took %.1fs)",
                        step_count,
                        node_name,
                        elapsed,
                    )
                    # Warn if the step took too long.
                    if elapsed > timeout_seconds:
                        logger.warning(
                            "[run_controller] Step %d (%s) took %.1fs "
                            "(timeout threshold: %ss)",
                            step_count,
                            node_name,
                            elapsed,
                            timeout_seconds,
                        )
                last_step_time = current_time

                # Check for cancellation between steps.
                if is_cancel_requested(run_id):
                    logger.info(
                        "[run_controller] Cancellation requested for %s, stopping",
                        run_id,
                    )
                    # Persist the cancellation in the checkpointed state.
                    current_state = poll_state(run_id)
                    if current_state is not None:
                        GRAPH.update_state(
                            config={"configurable": {"thread_id": run_id}},
                            values=current_state.mark_cancelled("User requested stop"),
                        )
                    break
            else:
                # Only reached when the stream was exhausted, i.e. the run
                # was not cut short by a cancellation ``break``.
                logger.info(
                    "[run_controller] Pipeline run %s completed after %d steps",
                    run_id,
                    step_count,
                )
        except Exception as exc:  # pragma: no cover - surfaced via UI logs
            # Surface worker errors to stdout so they are visible when
            # running the Gradio UI. Without this, exceptions inside the
            # background thread can make the UI hang without any clear
            # error message.
            logger.exception("[run_controller] Pipeline worker crashed: %r", exc)
            print(f"[run_controller] Pipeline worker crashed: {exc!r}", flush=True)
        finally:
            # Mark thread as finished; the last checkpoint remains
            # available for inspection.
            logger.info("[run_controller] Worker thread for %s finished", run_id)
            # Only release the slot if this worker still owns it. The run id
            # is cleared first: ``start_run`` cannot register a new run until
            # ``_active_thread`` is released, so this cannot wipe a newer id.
            if _active_thread is threading.current_thread():
                _active_run_id = None
                _active_thread = None

    t = threading.Thread(target=_worker, daemon=True)
    # Register before starting so the worker's cleanup can never run ahead
    # of the registration.
    _active_thread = t
    _active_run_id = run_id
    t.start()
    return run_id
def poll_state(run_id: str) -> VentureForgeState | None:
    """Return the latest VentureForgeState for ``run_id`` from checkpoints.

    Args:
        run_id: Used as the LangGraph ``thread_id`` when looking up the
            checkpointed state.

    Returns:
        The validated state, or ``None`` when nothing meaningful has been
        persisted yet or the stored values do not validate against
        ``VentureForgeState``.
    """
    # ``get_state`` yields a LangGraph StateSnapshot; its ``values`` holds
    # the latest state dict that was checkpointed.
    snapshot = GRAPH.get_state(config={"configurable": {"thread_id": run_id}})
    if snapshot is None or not snapshot.values:
        # No meaningful state yet (LangGraph may persist an empty dict
        # before the first node runs). Treat this the same as "no
        # checkpoints" so the UI keeps waiting instead of raising a
        # validation error about missing required fields like ``domain``.
        return None
    try:
        return VentureForgeState.model_validate(snapshot.values)
    except ValidationError as exc:
        # Incomplete values or an older schema: fail gracefully so the UI
        # keeps polling, but leave a trace. Debug level, because callers
        # poll repeatedly and a persistent mismatch would flood the log.
        logger.debug(
            "[run_controller] Stored state for %s failed validation: %s",
            run_id,
            exc,
        )
        return None
def request_cancel(run_id: str) -> None:
    """Flag ``run_id`` as cancelled at the user's request.

    The flag is only acted upon between LangGraph steps. An agent that is
    mid-execution (e.g., waiting for an LLM response) finishes first, and
    the cancellation is processed afterwards.
    """
    message = f"[run_controller] Cancellation requested for run {run_id}"
    logger.info(message)
    _cancel_requested.add(run_id)
def is_cancel_requested(run_id: str) -> bool:
    """Report whether a cancellation request has been recorded for ``run_id``."""
    requested = run_id in _cancel_requested
    return requested
def is_run_active() -> bool:
    """Return True if a pipeline run is currently active.

    A run counts as active while the registered worker thread is alive.
    """
    # Snapshot the global once: the worker thread resets ``_active_thread``
    # to None when it finishes, so reading it twice could call
    # ``is_alive()`` on None.
    worker = _active_thread
    if worker is None:
        return False
    return worker.is_alive()