# Workflow Management API
This document outlines the API endpoints for managing workflows in PySpur.
## Create Workflow
**Description**: Creates a new workflow. If no definition is provided, a default workflow containing a single input node is created. For chatbots, the default workflow includes the input and output fields required to handle chat interactions. If a workflow with the same name already exists, the new workflow's name is made unique.
**URL**: `/wf/`
**Method**: POST
**Request Payload**:
```python
class WorkflowCreateRequestSchema(BaseModel):
    name: str  # Name of the workflow
    description: str = ""  # Description of the workflow
    definition: Optional[WorkflowDefinitionSchema] = None  # Definition of the workflow
```
Where `WorkflowDefinitionSchema` contains:
```python
class WorkflowDefinitionSchema(BaseModel):
    nodes: List[WorkflowNodeSchema]  # List of nodes in the workflow
    links: List[WorkflowLinkSchema]  # List of links between nodes
    test_inputs: List[Dict[str, Any]] = []  # Test inputs for the workflow
    spur_type: SpurType = SpurType.WORKFLOW  # Type of workflow (WORKFLOW, CHATBOT, AGENT)
```
**Response Schema**:
```python
class WorkflowResponseSchema(BaseModel):
    id: str  # Workflow ID
    name: str  # Name of the workflow
    description: Optional[str]  # Description of the workflow
    definition: WorkflowDefinitionSchema  # Definition of the workflow
    created_at: datetime  # When the workflow was created
    updated_at: datetime  # When the workflow was last updated
```
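As a minimal sketch of calling this endpoint, the snippet below builds the POST request with Python's standard library without sending it. The base URL `http://localhost:6080/api` and the helper name `build_create_request` are assumptions made for illustration and are not part of the documented API; adjust them to match your deployment.

```python
import json
import urllib.request
from typing import Any, Dict, Optional

# Assumed base URL for a local PySpur instance; change it for your deployment.
BASE_URL = "http://localhost:6080/api"


def build_create_request(
    name: str,
    description: str = "",
    definition: Optional[Dict[str, Any]] = None,
) -> urllib.request.Request:
    """Build (but do not send) a POST request for the Create Workflow endpoint."""
    payload: Dict[str, Any] = {"name": name, "description": description}
    if definition is not None:
        # Leaving the definition out lets the server create its default workflow.
        payload["definition"] = definition
    return urllib.request.Request(
        url=f"{BASE_URL}/wf/",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_create_request("My workflow", description="Demo")
# To send it, pass `request` to urllib.request.urlopen() and decode the JSON body.
```

Keeping request construction separate from sending makes the payload easy to inspect or log before it reaches the server.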
## Update Workflow
**Description**: Updates an existing workflow's definition, name, and description. The workflow definition is required for updates. This endpoint allows for modifying the structure and behavior of a workflow.
**URL**: `/wf/{workflow_id}/`
**Method**: PUT
**Parameters**:
```python
workflow_id: str # ID of the workflow to update
```
**Request Payload**: Same as Create Workflow
**Response Schema**: Same as Create Workflow
## List Workflows
**Description**: Lists workflows with pagination support, ordered by creation date descending. Only workflows whose definitions pass validation are included in the response.
**URL**: `/wf/`
**Method**: GET
**Query Parameters**:
```python
page: int # Page number (default: 1, min: 1)
page_size: int # Number of items per page (default: 10, min: 1, max: 100)
```
**Response Schema**:
```python
List[WorkflowResponseSchema]
```
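The pagination limits above can be checked on the client before a request is made. The following is a sketch only: the base URL and the helper name `list_workflows_url` are assumptions, and the server remains the authority on validation.

```python
from urllib.parse import urlencode

# Assumed base URL for a local PySpur instance; change it for your deployment.
BASE_URL = "http://localhost:6080/api"


def list_workflows_url(page: int = 1, page_size: int = 10) -> str:
    """Return the List Workflows URL after applying the documented bounds."""
    if page < 1:
        raise ValueError("page must be >= 1")
    if not 1 <= page_size <= 100:
        raise ValueError("page_size must be between 1 and 100")
    query = urlencode({"page": page, "page_size": page_size})
    return f"{BASE_URL}/wf/?{query}"


second_page_url = list_workflows_url(page=2, page_size=50)
```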
## Get Workflow
**Description**: Retrieves a specific workflow by its ID, including its complete definition, metadata, and timestamps.
**URL**: `/wf/{workflow_id}/`
**Method**: GET
**Parameters**:
```python
workflow_id: str # ID of the workflow to retrieve
```
**Response Schema**: Same as Create Workflow
## Reset Workflow
**Description**: Resets a workflow to its initial state with just an input node. This is useful when you want to start over with a workflow design without deleting and recreating it.
**URL**: `/wf/{workflow_id}/reset/`
**Method**: PUT
**Parameters**:
```python
workflow_id: str # ID of the workflow to reset
```
**Response Schema**: Same as Create Workflow
## Delete Workflow
**Description**: Deletes a workflow and its associated test files. This operation is permanent and will remove all data related to the workflow, including test files stored in the file system.
**URL**: `/wf/{workflow_id}/`
**Method**: DELETE
**Parameters**:
```python
workflow_id: str # ID of the workflow to delete
```
**Response**: 204 No Content
## Duplicate Workflow
**Description**: Creates a copy of an existing workflow with "(Copy)" appended to its name. This is useful for creating variations of a workflow without modifying the original.
**URL**: `/wf/{workflow_id}/duplicate/`
**Method**: POST
**Parameters**:
```python
workflow_id: str # ID of the workflow to duplicate
```
**Response Schema**: Same as Create Workflow
## Get Workflow Output Variables
**Description**: Retrieves the output variables (leaf nodes) of a workflow, including their node IDs and variable names. This is useful for understanding what outputs are available from a workflow.
**URL**: `/wf/{workflow_id}/output_variables/`
**Method**: GET
**Parameters**:
```python
workflow_id: str # ID of the workflow
```
**Response Schema**:
```python
List[Dict[str, str]] # List of output variables with node IDs and variable names
```
Each dictionary in the list contains:
```python
{
    "node_id": str,  # ID of the node
    "variable_name": str,  # Name of the output variable
    "prefixed_variable": str  # Variable name prefixed with node ID (node_id-variable_name)
}
```
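One way to work with this response is to group the variable names by node. The sketch below uses hypothetical sample data (the node IDs and variable names are invented for illustration) and a helper name, `group_outputs_by_node`, that is not part of PySpur.

```python
from collections import defaultdict
from typing import Dict, List


def group_outputs_by_node(output_variables: List[Dict[str, str]]) -> Dict[str, List[str]]:
    """Map each node ID to the list of output variable names it exposes."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for variable in output_variables:
        grouped[variable["node_id"]].append(variable["variable_name"])
    return dict(grouped)


# Hypothetical response body; real node IDs and variable names will differ.
sample_response = [
    {"node_id": "summarizer", "variable_name": "summary", "prefixed_variable": "summarizer-summary"},
    {"node_id": "summarizer", "variable_name": "title", "prefixed_variable": "summarizer-title"},
    {"node_id": "classifier", "variable_name": "label", "prefixed_variable": "classifier-label"},
]

grouped = group_outputs_by_node(sample_response)
```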
## Upload Test Files
**Description**: Uploads test files for a specific node in a workflow and returns their paths. The files are stored in a workflow-specific directory and can be used as inputs for testing the workflow.
**URL**: `/wf/upload_test_files/`
**Method**: POST
**Form Data**:
```python
workflow_id: str # ID of the workflow
files: List[UploadFile] # List of files to upload
node_id: str # ID of the node to associate files with
```
**Response Schema**:
```python
Dict[str, List[str]] # Dictionary mapping node ID to list of file paths
```
Example:
```python
{
    "node_id": ["test_files/workflow_id/timestamp_filename.ext", ...]
}
```
## Get Workflow Versions
**Description**: Retrieves all versions of a workflow, ordered by version number descending, with pagination support. This allows tracking the evolution of a workflow over time and reverting to previous versions if needed.
**URL**: `/wf/{workflow_id}/versions/`
**Method**: GET
**Parameters**:
```python
workflow_id: str # ID of the workflow
page: int # Page number (default: 1, min: 1)
page_size: int # Number of items per page (default: 10, min: 1, max: 100)
```
**Response Schema**:
```python
List[WorkflowVersionResponseSchema]
```
Where `WorkflowVersionResponseSchema` contains:
```python
class WorkflowVersionResponseSchema(BaseModel):
    version: int  # Version number
    name: str  # Name of the workflow version
    description: Optional[str]  # Description of the workflow version
    definition: Any  # Definition of the workflow version
    definition_hash: str  # Hash of the definition for tracking changes
    created_at: datetime  # When the version was created
    updated_at: datetime  # When the version was last updated
```
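Because each version carries a `definition_hash`, a client can tell which versions actually changed the definition by comparing hashes alone. The sketch below assumes nothing about how the hash is computed; the helper name `versions_with_changes` and the sample data are hypothetical.

```python
from typing import Any, Dict, List


def versions_with_changes(versions: List[Dict[str, Any]]) -> List[int]:
    """Return version numbers whose definition_hash differs from the next-older version.

    The oldest version is always reported, since it has nothing to compare against.
    """
    ordered = sorted(versions, key=lambda v: v["version"])  # oldest first
    changed: List[int] = []
    previous_hash = None
    for version in ordered:
        if version["definition_hash"] != previous_hash:
            changed.append(version["version"])
        previous_hash = version["definition_hash"]
    return changed


# Hypothetical response, newest first as the endpoint returns it.
sample_versions = [
    {"version": 3, "definition_hash": "b7e1"},
    {"version": 2, "definition_hash": "a41c"},
    {"version": 1, "definition_hash": "a41c"},
]

changed = versions_with_changes(sample_versions)
```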
## List Paused Workflows
**Description**: Lists all workflows that are currently in a paused state, with pagination support. This endpoint is useful for monitoring workflows that require human intervention.
**URL**: `/wf/paused_workflows/`
**Method**: GET
**Query Parameters**:
```python
page: int # Page number (default: 1, min: 1)
page_size: int # Number of items per page (default: 10, min: 1, max: 100)
```
**Response Schema**:
```python
List[PausedWorkflowResponseSchema]
```
Where `PausedWorkflowResponseSchema` contains:
```python
class PausedWorkflowResponseSchema(BaseModel):
    run: RunResponseSchema  # Information about the workflow run
    current_pause: PauseHistoryResponseSchema  # Details about the current pause state
    workflow: WorkflowDefinitionSchema  # The workflow definition
```
## Get Pause History
**Description**: Retrieves the pause history for a specific workflow run, showing when and why the workflow was paused, and any actions taken to resume it.
**URL**: `/wf/pause_history/{run_id}/`
**Method**: GET
**Parameters**:
```python
run_id: str # ID of the workflow run
```
**Response Schema**:
```python
List[PauseHistoryResponseSchema]
```
Where `PauseHistoryResponseSchema` contains:
```python
class PauseHistoryResponseSchema(BaseModel):
    id: str  # Synthetic ID for API compatibility
    run_id: str  # ID of the run
    node_id: str  # ID of the node where the pause occurred
    pause_message: Optional[str]  # Message explaining the pause reason
    pause_time: datetime  # When the workflow was paused
    resume_time: Optional[datetime]  # When the workflow was resumed (if applicable)
    resume_user_id: Optional[str]  # ID of the user who resumed the workflow
    resume_action: Optional[PauseAction]  # Action taken (APPROVE/DECLINE/OVERRIDE)
    input_data: Optional[Dict[str, Any]]  # Input data at the time of pause
    comments: Optional[str]  # Additional comments about the pause/resume
```
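A common use of the pause history is measuring how long a run waited for a human. The sketch below assumes the `datetime` fields arrive as ISO 8601 strings without a trailing `Z`, which is an assumption about the JSON serialization rather than something this page states. The helper name `pause_duration` and the sample entries are hypothetical.

```python
from datetime import datetime, timedelta
from typing import Any, Dict, Optional


def pause_duration(entry: Dict[str, Any]) -> Optional[timedelta]:
    """Return how long a pause lasted, or None if the run has not been resumed."""
    resume_time = entry.get("resume_time")
    if resume_time is None:
        return None
    paused_at = datetime.fromisoformat(entry["pause_time"])
    resumed_at = datetime.fromisoformat(resume_time)
    return resumed_at - paused_at


# Hypothetical history entries, reduced to the fields used here.
resumed_entry = {"pause_time": "2024-01-01T10:00:00", "resume_time": "2024-01-01T10:05:30"}
pending_entry = {"pause_time": "2024-01-01T11:00:00", "resume_time": None}

resumed_for = pause_duration(resumed_entry)
still_paused = pause_duration(pending_entry)
```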
## Process Pause Action
**Description**: Processes an action on a paused workflow, allowing for approval, decline, or override of a workflow that has been paused for human intervention. The workflow will resume execution based on the action taken.
**URL**: `/wf/process_pause_action/{run_id}/`
**Method**: POST
**Parameters**:
```python
run_id: str # ID of the workflow run
```
**Request Payload**:
```python
class ResumeRunRequestSchema(BaseModel):
    inputs: Dict[str, Any]  # Human-provided inputs for the paused node
    user_id: str  # ID of the user resuming the workflow
    action: PauseAction  # Action taken (APPROVE/DECLINE/OVERRIDE)
    comments: Optional[str] = None  # Optional comments about the decision
```
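The request body for this endpoint could be assembled as in the sketch below. The local `PauseAction` enum mirrors the three documented actions, but its string values are an assumption about how the server serializes the enum; check them against your PySpur version. The helper name `build_resume_payload` and the sample inputs are hypothetical.

```python
import json
from enum import Enum
from typing import Any, Dict, Optional


class PauseAction(str, Enum):
    # Assumed wire values; the documentation names the actions but not their encoding.
    APPROVE = "APPROVE"
    DECLINE = "DECLINE"
    OVERRIDE = "OVERRIDE"


def build_resume_payload(
    inputs: Dict[str, Any],
    user_id: str,
    action: PauseAction,
    comments: Optional[str] = None,
) -> str:
    """Serialize a request body shaped like ResumeRunRequestSchema."""
    payload = {
        "inputs": inputs,
        "user_id": user_id,
        "action": action.value,
        "comments": comments,
    }
    return json.dumps(payload)


# Hypothetical approval of a paused review step.
body = build_resume_payload(
    inputs={"approved_text": "Looks good"},
    user_id="reviewer-1",
    action=PauseAction.APPROVE,
    comments="Checked against the style guide",
)
```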
**Response Schema**:
```python
class RunResponseSchema(BaseModel):
    id: str  # Run ID
    workflow_id: str  # ID of the workflow
    workflow_version_id: Optional[str]  # ID of the workflow version
    workflow_version: Optional[WorkflowVersionResponseSchema]  # Details of the workflow version
    status: RunStatus  # Current status of the run
    start_time: datetime  # When the run started
    end_time: Optional[datetime]  # When the run ended (if completed)
    initial_inputs: Optional[Dict[str, Dict[str, Any]]]  # Initial inputs to the workflow
    outputs: Optional[Dict[str, Dict[str, Any]]]  # Outputs from the workflow
    tasks: List[TaskResponseSchema]  # List of tasks in the run
    parent_run_id: Optional[str]  # ID of the parent run (if applicable)
    run_type: str  # Type of run (e.g., "interactive")
    output_file_id: Optional[str]  # ID of the output file
    input_dataset_id: Optional[str]  # ID of the input dataset
    message: Optional[str]  # Additional information about the run
    duration: Optional[float]  # Duration of the run in seconds
    percentage_complete: float  # Percentage of tasks completed
```