File size: 7,995 Bytes
e6410cf | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 | # Workflow Runs API
This document outlines the API endpoints for running and managing workflow executions in PySpur.
## Run Workflow (Blocking)
**Description**: Executes a workflow synchronously and returns the outputs. This is a blocking call that waits for the workflow to complete before returning a response. If the workflow contains a human intervention node, it may pause execution and return a pause exception.
**URL**: `/wf/{workflow_id}/run/`
**Method**: POST
**Parameters**:
```python
workflow_id: str # ID of the workflow to run
```
**Request Payload**:
```python
class StartRunRequestSchema(BaseModel):
initial_inputs: Optional[Dict[str, Dict[str, Any]]] = None # Initial inputs for the workflow
parent_run_id: Optional[str] = None # ID of the parent run (for nested workflows)
files: Optional[Dict[str, List[str]]] = None # Files to use in the workflow
```
**Response Schema**:
```python
Dict[str, Any] # Dictionary of node outputs
```
## Start Run (Non-Blocking)
**Description**: Starts a workflow execution asynchronously and returns immediately with the run details. The workflow continues execution in the background. This is useful for long-running workflows where you don't want to wait for completion.
**URL**: `/wf/{workflow_id}/start_run/`
**Method**: POST
**Parameters**:
```python
workflow_id: str # ID of the workflow to run
```
**Request Payload**: Same as Run Workflow (Blocking)
**Response Schema**:
```python
class RunResponseSchema(BaseModel):
id: str # Run ID
workflow_id: str # ID of the workflow
workflow_version_id: Optional[str] # ID of the workflow version
workflow_version: Optional[WorkflowVersionResponseSchema] # Details of the workflow version
status: RunStatus # Current status of the run
start_time: datetime # When the run started
end_time: Optional[datetime] # When the run ended (if completed)
initial_inputs: Optional[Dict[str, Dict[str, Any]]] # Initial inputs to the workflow
outputs: Optional[Dict[str, Dict[str, Any]]] # Outputs from the workflow
tasks: List[TaskResponseSchema] # List of tasks in the run
parent_run_id: Optional[str] # ID of the parent run (if applicable)
run_type: str # Type of run (e.g., "interactive")
output_file_id: Optional[str] # ID of the output file
input_dataset_id: Optional[str] # ID of the input dataset
message: Optional[str] # Additional information about the run
duration: Optional[float] # Duration of the run in seconds
percentage_complete: float # Percentage of tasks completed
```
## Run Partial Workflow
**Description**: Executes a partial workflow starting from a specific node, using precomputed outputs for upstream nodes. This is useful for testing specific parts of a workflow without running the entire workflow.
**URL**: `/wf/{workflow_id}/run_partial/`
**Method**: POST
**Parameters**:
```python
workflow_id: str # ID of the workflow to run
```
**Request Payload**:
```python
class PartialRunRequestSchema(BaseModel):
node_id: str # ID of the node to start execution from
initial_inputs: Optional[Dict[str, Dict[str, Any]]] = None # Initial inputs for the workflow
partial_outputs: Optional[Dict[str, Dict[str, Any]]] = None # Precomputed outputs for upstream nodes
```
**Response Schema**:
```python
Dict[str, Any] # Dictionary of node outputs
```
## Start Batch Run
**Description**: Starts a batch execution of a workflow over a dataset. The workflow is run once for each row in the dataset, with dataset columns mapped to workflow inputs. Results are written to an output file.
**URL**: `/wf/{workflow_id}/start_batch_run/`
**Method**: POST
**Parameters**:
```python
workflow_id: str # ID of the workflow to run
```
**Request Payload**:
```python
class BatchRunRequestSchema(BaseModel):
dataset_id: str # ID of the dataset to use
mini_batch_size: int = 10 # Number of rows to process in each mini-batch
```
**Response Schema**: Same as Start Run (Non-Blocking)
## List Runs
**Description**: Lists all runs for a specific workflow with pagination support, ordered by start time descending. This endpoint also updates run status based on task status.
**URL**: `/wf/{workflow_id}/runs/`
**Method**: GET
**Parameters**:
```python
workflow_id: str # ID of the workflow
page: int # Page number (default: 1, min: 1)
page_size: int # Number of items per page (default: 10, min: 1, max: 100)
```
**Response Schema**:
```python
List[RunResponseSchema] # List of run details
```
## List Paused Workflows
**Description**: Lists all workflows that are currently in a paused state, with pagination support. This endpoint is useful for monitoring workflows that require human intervention.
**URL**: `/wf/paused_workflows/`
**Method**: GET
**Query Parameters**:
```python
page: int # Page number (default: 1, min: 1)
page_size: int # Number of items per page (default: 10, min: 1, max: 100)
```
**Response Schema**:
```python
List[PausedWorkflowResponseSchema]
```
Where `PausedWorkflowResponseSchema` contains:
```python
class PausedWorkflowResponseSchema(BaseModel):
run: RunResponseSchema # Information about the workflow run
current_pause: PauseHistoryResponseSchema # Details about the current pause state
workflow: WorkflowDefinitionSchema # The workflow definition
```
## Get Pause History
**Description**: Retrieves the pause history for a specific workflow run, showing when and why the workflow was paused, and any actions taken to resume it.
**URL**: `/wf/pause_history/{run_id}/`
**Method**: GET
**Parameters**:
```python
run_id: str # ID of the workflow run
```
**Response Schema**:
```python
List[PauseHistoryResponseSchema]
```
Where `PauseHistoryResponseSchema` contains:
```python
class PauseHistoryResponseSchema(BaseModel):
id: str # Synthetic ID for API compatibility
run_id: str # ID of the run
node_id: str # ID of the node where the pause occurred
pause_message: Optional[str] # Message explaining the pause reason
pause_time: datetime # When the workflow was paused
resume_time: Optional[datetime] # When the workflow was resumed (if applicable)
resume_user_id: Optional[str] # ID of the user who resumed the workflow
resume_action: Optional[PauseAction] # Action taken (APPROVE/DECLINE/OVERRIDE)
input_data: Optional[Dict[str, Any]] # Input data at the time of pause
comments: Optional[str] # Additional comments about the pause/resume
```
## Process Pause Action
**Description**: Processes an action on a paused workflow, allowing for approval, decline, or override of a workflow that has been paused for human intervention. The workflow will resume execution based on the action taken.
**URL**: `/wf/process_pause_action/{run_id}/`
**Method**: POST
**Parameters**:
```python
run_id: str # ID of the workflow run
```
**Request Payload**:
```python
class ResumeRunRequestSchema(BaseModel):
inputs: Dict[str, Any] # Human-provided inputs for the paused node
user_id: str # ID of the user resuming the workflow
action: PauseAction # Action taken (APPROVE/DECLINE/OVERRIDE)
comments: Optional[str] = None # Optional comments about the decision
```
**Response Schema**: Same as Start Run (Non-Blocking)
## Cancel Workflow
**Description**: Cancels a workflow that is currently paused or running. This will mark the run as CANCELED in the database and update all pending, running, and paused tasks to CANCELED as well.
**URL**: `/wf/cancel_workflow/{run_id}/`
**Method**: POST
**Parameters**:
```python
run_id: str # ID of the run to cancel
```
**Response Schema**: Same as Start Run (Non-Blocking) with a message indicating the workflow has been canceled successfully.
|