import asyncio
import logging
import time
from typing import Set, List, Tuple, Optional, TYPE_CHECKING
import uuid

import ray
from ray.experimental.workflow import workflow_context
from ray.experimental.workflow import workflow_storage
from ray.experimental.workflow.common import (Workflow, WorkflowStatus,
                                              WorkflowMetaData, StepType)
from ray.experimental.workflow.step_executor import commit_step
from ray.experimental.workflow.storage import get_global_storage
from ray.experimental.workflow.workflow_access import (
    flatten_workflow_output, get_or_create_management_actor,
    get_management_actor)

if TYPE_CHECKING:
    from ray.experimental.workflow.step_executor import WorkflowExecutionResult

logger = logging.getLogger(__name__)


def run(entry_workflow: Workflow,
        workflow_id: Optional[str] = None,
        overwrite: bool = True) -> ray.ObjectRef:
    """Run a workflow asynchronously.

    Args:
        entry_workflow: The workflow to checkpoint and execute.
        workflow_id: ID of the workflow job. When None, a fresh ID of the
            form "{uuid4}.{unix time with 9 decimals}" is generated.
        overwrite: Currently unused by this function (see the TODO below).

    Returns:
        The flattened workflow output: the persisted output when the entry
        step is a ``StepType.FUNCTION`` step, the volatile output otherwise.
    """
    # TODO(suquark): The current "run" always overwrite existing workflow.
    # We need to fix this later.
    store = get_global_storage()
    assert ray.is_initialized()
    if workflow_id is None:
        # Workflow ID format: {Entry workflow UUID}.{Unix time to nanoseconds}
        workflow_id = f"{uuid.uuid4()}.{time.time():.9f}"
    logger.info(f"Workflow job created. [id=\"{workflow_id}\", storage_url="
                f"\"{store.storage_url}\"].")

    with workflow_context.workflow_step_context(workflow_id,
                                                store.storage_url):
        # checkpoint the workflow
        ws = workflow_storage.get_workflow_storage(workflow_id)
        commit_step(ws, "", entry_workflow)
        workflow_manager = get_or_create_management_actor()
        # The step type decides both the 'ignore_existing' flag and which
        # output of the execution result is handed back to the caller.
        is_function_step = (
            entry_workflow.data.step_type == StepType.FUNCTION)
        ignore_existing = not is_function_step
        # NOTE: It is important to 'ray.get' the returned output. This
        # ensures caller of 'run()' holds the reference to the workflow
        # result. Otherwise if the actor removes the reference of the
        # workflow output, the caller may fail to resolve the result.
        result: "WorkflowExecutionResult" = ray.get(
            workflow_manager.run_or_resume.remote(workflow_id,
                                                  ignore_existing))
        if is_function_step:
            return flatten_workflow_output(workflow_id,
                                           result.persisted_output)
        return flatten_workflow_output(workflow_id, result.volatile_output)


# TODO(suquark): support recovery with ObjectRef inputs.
def resume(workflow_id: str) -> ray.ObjectRef:
    """Resume a workflow asynchronously.

    See "api.resume()" for details.

    Args:
        workflow_id: ID of the workflow job to resume.

    Returns:
        The flattened persisted output of the resumed workflow.
    """
    storage = get_global_storage()
    logger.info(f"Resuming workflow [id=\"{workflow_id}\", storage_url="
                f"\"{storage.storage_url}\"].")
    workflow_manager = get_or_create_management_actor()
    # NOTE: It is important to 'ray.get' the returned output. This
    # ensures caller of 'resume()' holds the reference to the workflow
    # result. Otherwise if the actor removes the reference of the
    # workflow output, the caller may fail to resolve the result.
    result: "WorkflowExecutionResult" = ray.get(
        workflow_manager.run_or_resume.remote(
            workflow_id, ignore_existing=False))
    logger.info(f"Workflow job {workflow_id} resumed.")
    return flatten_workflow_output(workflow_id, result.persisted_output)


def get_output(workflow_id: str, name: Optional[str]) -> ray.ObjectRef:
    """Get the output of a running workflow.

    See "api.get_output()" for details.

    Args:
        workflow_id: ID of the workflow job.
        name: Forwarded unchanged to the management actor's ``get_output``.

    Returns:
        The flattened output obtained from the management actor.

    Raises:
        ValueError: If the workflow management actor cannot be obtained.
    """
    assert ray.is_initialized()
    try:
        workflow_manager = get_management_actor()
    except ValueError as e:
        raise ValueError(
            "Failed to connect to the workflow management "
            "actor. The workflow could have already failed. You can use "
            "workflow.resume() to resume the workflow.") from e
    output = ray.get(workflow_manager.get_output.remote(workflow_id, name))
    return flatten_workflow_output(workflow_id, output)


def cancel(workflow_id: str) -> None:
    """Cancel a workflow.

    The request is sent to the management actor. If obtaining the actor or
    the cancel call raises ``ValueError``, the CANCELED status is written
    directly to the workflow's storage instead.

    Args:
        workflow_id: ID of the workflow job to cancel.
    """
    try:
        workflow_manager = get_management_actor()
        ray.get(workflow_manager.cancel_workflow.remote(workflow_id))
    except ValueError:
        wf_store = workflow_storage.get_workflow_storage(workflow_id)
        wf_store.save_workflow_meta(WorkflowMetaData(WorkflowStatus.CANCELED))


def get_status(workflow_id: str) -> Optional[WorkflowStatus]:
    """Get the status of a workflow.

    Args:
        workflow_id: ID of the workflow job.

    Returns:
        ``WorkflowStatus.RUNNING`` if the management actor reports the
        workflow as running, otherwise the status stored in the workflow
        metadata.

    Raises:
        ValueError: If no metadata is stored for ``workflow_id``.
    """
    try:
        workflow_manager = get_management_actor()
        running = ray.get(
            workflow_manager.is_workflow_running.remote(workflow_id))
    except Exception:
        # Best effort: any failure to query the actor is treated as "not
        # running" and the status is read from storage below.
        running = False
    if running:
        return WorkflowStatus.RUNNING
    store = workflow_storage.get_workflow_storage(workflow_id)
    meta = store.load_workflow_meta()
    if meta is None:
        raise ValueError(f"No such workflow_id {workflow_id}")
    return meta.status


def list_all(status_filter: Set[WorkflowStatus]
             ) -> List[Tuple[str, WorkflowStatus]]:
    """List workflows whose status is in ``status_filter``.

    A workflow recorded as RUNNING in storage that the management actor
    does not list as running is reported as RESUMABLE.

    Args:
        status_filter: The set of statuses to include in the result.

    Returns:
        A list of ``(workflow_id, status)`` tuples.
    """
    try:
        workflow_manager = get_management_actor()
    except ValueError:
        workflow_manager = None

    if workflow_manager is None:
        runnings = []
    else:
        runnings = ray.get(workflow_manager.list_running_workflow.remote())
    # Only running workflows were requested: storage is not consulted.
    if WorkflowStatus.RUNNING in status_filter and len(status_filter) == 1:
        return [(r, WorkflowStatus.RUNNING) for r in runnings]

    runnings = set(runnings)
    # Here we don't have workflow id, so use empty one instead
    store = workflow_storage.get_workflow_storage("")
    ret = []
    for (k, s) in store.list_workflow():
        if s == WorkflowStatus.RUNNING and k not in runnings:
            s = WorkflowStatus.RESUMABLE
        if s in status_filter:
            ret.append((k, s))
    return ret


def resume_all(with_failed: bool) -> List[Tuple[str, ray.ObjectRef]]:
    """Resume all resumable workflows.

    Args:
        with_failed: When True, workflows with FAILED status are resumed
            in addition to the RESUMABLE ones.

    Returns:
        A list of ``(workflow_id, output)`` tuples for the workflows that
        were resumed successfully. Workflows whose resume raised an
        exception are logged and left out of the result.

    Raises:
        RuntimeError: If the management actor cannot be obtained.
    """
    filter_set = {WorkflowStatus.RESUMABLE}
    if with_failed:
        filter_set.add(WorkflowStatus.FAILED)
    all_failed = list_all(filter_set)
    try:
        workflow_manager = get_management_actor()
    except Exception as e:
        raise RuntimeError("Failed to get management actor") from e

    async def _resume_one(wid: str) -> Tuple[str, Optional[ray.ObjectRef]]:
        # Resume a single workflow; failures yield (wid, None) so that one
        # bad workflow does not abort the whole batch.
        try:
            result: "WorkflowExecutionResult" = (
                await workflow_manager.run_or_resume.remote(wid))
            obj = flatten_workflow_output(wid, result.persisted_output)
            return wid, obj
        except Exception:
            # 'logger.exception' keeps the traceback so the cause of the
            # failed resume is not lost.
            logger.exception(f"Failed to resume workflow {wid}")
            return (wid, None)

    ret = workflow_storage.asyncio_run(
        asyncio.gather(*[_resume_one(wid) for (wid, _) in all_failed]))
    return [(wid, obj) for (wid, obj) in ret if obj is not None]