## Option 1: Run single flow with Prefect `ThreadPoolTaskRunner`.

By default, Prefect runs a flow's tasks with `ThreadPoolTaskRunner`, which executes them concurrently in threads. This flow runs only a single task, so no concurrency is visible here. See https://docs.prefect.io/v3/concepts/task-runners#available-task-runners for details.

In [None]:
from pathlib import Path

from mlip_arena.models import REGISTRY
from mlip_arena.tasks.combustion.flow import hydrogen_combustion

model = "MACE-MPA"
run_dir = Path(".").parent / REGISTRY[model]["family"]

hydrogen_combustion.with_options(persist_result=True)(
    model=model,  # can be MLIPEnum name or ASE calculator object
    run_dir=run_dir,
)
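
Because the flow in Option 1 runs a single task, the thread-based concurrency is not observable there. As a rough analogy only (this uses the standard library's `ThreadPoolExecutor`, not Prefect itself), the sketch below shows what thread-pool concurrency looks like when several tasks are submitted at once. The names `fake_task` and `run_concurrently` are hypothetical and exist only for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_task(name: str, seconds: float) -> str:
    """Hypothetical stand-in for a task that mostly waits (e.g. on I/O)."""
    time.sleep(seconds)
    return f"{name} done"


def run_concurrently(names, seconds=0.2, max_workers=4):
    """Submit one fake task per name to a thread pool and time the batch."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # submit() returns immediately, so all tasks start before any finishes
        futures = [pool.submit(fake_task, name, seconds) for name in names]
        # result() blocks until each task completes; order follows submission
        results = [future.result() for future in futures]
    elapsed = time.perf_counter() - start
    return results, elapsed


results, elapsed = run_concurrently(["a", "b", "c", "d"])
print(results)
print(f"elapsed: {elapsed:.2f} s (running them one by one would take about 0.8 s)")
```

Threads overlap waiting time well, but CPU-bound Python code still contends for the interpreter lock, which is one reason a cluster-backed runner can be preferable for heavy workloads.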

## Option 2: Run single flow with `DaskTaskRunner`.

To achieve true parallelism, we can use `DaskTaskRunner` or `RayTaskRunner` to request HPC resources and submit tasks to the cluster for queuing and execution. Tasks are automatically resubmitted if they are killed or run past their time limit. This may be the more desirable option because this task takes longer to run. See https://docs.prefect.io/integrations/prefect-dask/index#prefect-dask for details.

In [None]:
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
from prefect_dask import DaskTaskRunner

nodes_per_alloc = 1
gpus_per_alloc = 1

cluster_kwargs = dict(
    cores=1,
    memory="64 GB",
    processes=1,
    shebang="#!/bin/bash",
    account="matgen",
    walltime="04:00:00",
    job_mem="0",
    job_script_prologue=[
        "source ~/.bashrc",
        "module load python",
        "source activate /pscratch/sd/c/cyrusyc/.conda/mlip-arena",
    ],
    job_directives_skip=["-n", "--cpus-per-task", "-J"],
    job_extra_directives=[
        "-J arena-combustion",
        "-q preempt",
        "--time-min=00:30:00",
        "--comment=12:00:00",
        f"-N {nodes_per_alloc}",
        "-C gpu",
        f"-G {gpus_per_alloc}",
    ],
)
cluster = SLURMCluster(**cluster_kwargs)
print(cluster.job_script())
cluster.adapt(minimum_jobs=1, maximum_jobs=2)
client = Client(cluster)
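
The interplay of `job_directives_skip` and `job_extra_directives` in the cell above can be hard to picture. The sketch below is a simplified model, not dask-jobqueue's actual implementation: it assumes that default header lines containing any skip string are dropped and that each extra directive is appended as its own `#SBATCH` line. The `default_header` list and the `build_header` helper are hypothetical; the output of `print(cluster.job_script())` remains the authoritative source.

```python
def build_header(default_directives, skip, extra):
    """Simplified model of how a SLURM job header could be assembled."""
    # Drop any default line that contains one of the skip strings
    kept = [
        line for line in default_directives
        if not any(token in line for token in skip)
    ]
    # Append each extra directive as its own #SBATCH line
    return kept + [f"#SBATCH {directive}" for directive in extra]


# Hypothetical default lines, for illustration only
default_header = [
    "#SBATCH -J dask-worker",
    "#SBATCH -A matgen",
    "#SBATCH -n 1",
    "#SBATCH --cpus-per-task=1",
    "#SBATCH --mem=0",
    "#SBATCH -t 04:00:00",
]

header = build_header(
    default_header,
    skip=["-n", "--cpus-per-task", "-J"],
    extra=["-J arena-combustion", "-q preempt", "-N 1", "-C gpu", "-G 1"],
)
print("\n".join(header))
```

Under this model, the default job name is skipped so that the custom `-J arena-combustion` directive does not conflict with it, and the default task and CPU counts are dropped in favor of the node and GPU requests.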

In [None]:
hydrogen_combustion.with_options(
    persist_result=True,
    task_runner=DaskTaskRunner(address=client.scheduler.address),
    log_prints=True,
)(model=model, run_dir=run_dir)