File size: 7,995 Bytes
e6410cf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# Workflow Runs API

This document outlines the API endpoints for running and managing workflow executions in PySpur.

## Run Workflow (Blocking)

**Description**: Executes a workflow synchronously and returns the outputs. This is a blocking call that waits for the workflow to complete before returning a response. If the workflow contains a human intervention node, it may pause execution and return a pause exception.

**URL**: `/wf/{workflow_id}/run/`

**Method**: POST

**Parameters**:
```python
workflow_id: str  # ID of the workflow to run
```

**Request Payload**:
```python
class StartRunRequestSchema(BaseModel):
    initial_inputs: Optional[Dict[str, Dict[str, Any]]] = None  # Initial inputs for the workflow
    parent_run_id: Optional[str] = None  # ID of the parent run (for nested workflows)
    files: Optional[Dict[str, List[str]]] = None  # Files to use in the workflow
```

**Response Schema**:
```python
Dict[str, Any]  # Dictionary of node outputs
```

## Start Run (Non-Blocking)

**Description**: Starts a workflow execution asynchronously and returns immediately with the run details. The workflow continues execution in the background. This is useful for long-running workflows where you don't want to wait for completion.

**URL**: `/wf/{workflow_id}/start_run/`

**Method**: POST

**Parameters**:
```python
workflow_id: str  # ID of the workflow to run
```

**Request Payload**: Same as Run Workflow (Blocking)

**Response Schema**:
```python
class RunResponseSchema(BaseModel):
    id: str  # Run ID
    workflow_id: str  # ID of the workflow
    workflow_version_id: Optional[str]  # ID of the workflow version
    workflow_version: Optional[WorkflowVersionResponseSchema]  # Details of the workflow version
    status: RunStatus  # Current status of the run
    start_time: datetime  # When the run started
    end_time: Optional[datetime]  # When the run ended (if completed)
    initial_inputs: Optional[Dict[str, Dict[str, Any]]]  # Initial inputs to the workflow
    outputs: Optional[Dict[str, Dict[str, Any]]]  # Outputs from the workflow
    tasks: List[TaskResponseSchema]  # List of tasks in the run
    parent_run_id: Optional[str]  # ID of the parent run (if applicable)
    run_type: str  # Type of run (e.g., "interactive")
    output_file_id: Optional[str]  # ID of the output file
    input_dataset_id: Optional[str]  # ID of the input dataset
    message: Optional[str]  # Additional information about the run
    duration: Optional[float]  # Duration of the run in seconds
    percentage_complete: float  # Percentage of tasks completed
```

## Run Partial Workflow

**Description**: Executes a partial workflow starting from a specific node, using precomputed outputs for upstream nodes. This is useful for testing specific parts of a workflow without running the entire workflow.

**URL**: `/wf/{workflow_id}/run_partial/`

**Method**: POST

**Parameters**:
```python
workflow_id: str  # ID of the workflow to run
```

**Request Payload**:
```python
class PartialRunRequestSchema(BaseModel):
    node_id: str  # ID of the node to start execution from
    initial_inputs: Optional[Dict[str, Dict[str, Any]]] = None  # Initial inputs for the workflow
    partial_outputs: Optional[Dict[str, Dict[str, Any]]] = None  # Precomputed outputs for upstream nodes
```

**Response Schema**:
```python
Dict[str, Any]  # Dictionary of node outputs
```

## Start Batch Run

**Description**: Starts a batch execution of a workflow over a dataset. The workflow is run once for each row in the dataset, with dataset columns mapped to workflow inputs. Results are written to an output file.

**URL**: `/wf/{workflow_id}/start_batch_run/`

**Method**: POST

**Parameters**:
```python
workflow_id: str  # ID of the workflow to run
```

**Request Payload**:
```python
class BatchRunRequestSchema(BaseModel):
    dataset_id: str  # ID of the dataset to use
    mini_batch_size: int = 10  # Number of rows to process in each mini-batch
```

**Response Schema**: Same as Start Run (Non-Blocking)

## List Runs

**Description**: Lists all runs for a specific workflow with pagination support, ordered by start time descending. This endpoint also updates run status based on task status.

**URL**: `/wf/{workflow_id}/runs/`

**Method**: GET

**Parameters**:
```python
workflow_id: str  # ID of the workflow
page: int  # Page number (default: 1, min: 1)
page_size: int  # Number of items per page (default: 10, min: 1, max: 100)
```

**Response Schema**:
```python
List[RunResponseSchema]  # List of run details
```

## List Paused Workflows

**Description**: Lists all workflows that are currently in a paused state, with pagination support. This endpoint is useful for monitoring workflows that require human intervention.

**URL**: `/wf/paused_workflows/`

**Method**: GET

**Query Parameters**:
```python
page: int  # Page number (default: 1, min: 1)
page_size: int  # Number of items per page (default: 10, min: 1, max: 100)
```

**Response Schema**:
```python
List[PausedWorkflowResponseSchema]
```

Where `PausedWorkflowResponseSchema` contains:
```python
class PausedWorkflowResponseSchema(BaseModel):
    run: RunResponseSchema  # Information about the workflow run
    current_pause: PauseHistoryResponseSchema  # Details about the current pause state
    workflow: WorkflowDefinitionSchema  # The workflow definition
```

## Get Pause History

**Description**: Retrieves the pause history for a specific workflow run, showing when and why the workflow was paused, and any actions taken to resume it.

**URL**: `/wf/pause_history/{run_id}/`

**Method**: GET

**Parameters**:
```python
run_id: str  # ID of the workflow run
```

**Response Schema**:
```python
List[PauseHistoryResponseSchema]
```

Where `PauseHistoryResponseSchema` contains:
```python
class PauseHistoryResponseSchema(BaseModel):
    id: str  # Synthetic ID for API compatibility
    run_id: str  # ID of the run
    node_id: str  # ID of the node where the pause occurred
    pause_message: Optional[str]  # Message explaining the pause reason
    pause_time: datetime  # When the workflow was paused
    resume_time: Optional[datetime]  # When the workflow was resumed (if applicable)
    resume_user_id: Optional[str]  # ID of the user who resumed the workflow
    resume_action: Optional[PauseAction]  # Action taken (APPROVE/DECLINE/OVERRIDE)
    input_data: Optional[Dict[str, Any]]  # Input data at the time of pause
    comments: Optional[str]  # Additional comments about the pause/resume
```

## Process Pause Action

**Description**: Processes an action on a paused workflow, allowing for approval, decline, or override of a workflow that has been paused for human intervention. The workflow will resume execution based on the action taken.

**URL**: `/wf/process_pause_action/{run_id}/`

**Method**: POST

**Parameters**:
```python
run_id: str  # ID of the workflow run
```

**Request Payload**:
```python
class ResumeRunRequestSchema(BaseModel):
    inputs: Dict[str, Any]  # Human-provided inputs for the paused node
    user_id: str  # ID of the user resuming the workflow
    action: PauseAction  # Action taken (APPROVE/DECLINE/OVERRIDE)
    comments: Optional[str] = None  # Optional comments about the decision
```

**Response Schema**: Same as Start Run (Non-Blocking)

## Cancel Workflow

**Description**: Cancels a workflow that is currently paused or running. This will mark the run as CANCELED in the database and update all pending, running, and paused tasks to CANCELED as well.

**URL**: `/wf/cancel_workflow/{run_id}/`

**Method**: POST

**Parameters**:
```python
run_id: str  # ID of the run to cancel
```

**Response Schema**: Same as Start Run (Non-Blocking) with a message indicating the workflow has been canceled successfully.