Neil-YL commited on
Commit
0cbf4c9
·
verified ·
1 Parent(s): a3dd477
Files changed (4) hide show
  1. maintenance_flow.py +53 -0
  2. prefect.yaml +34 -0
  3. prefect_utils.py +28 -0
  4. prefectignore +41 -0
maintenance_flow.py ADDED
@@ -0,0 +1,53 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from prefect import flow,get_run_logger, pause_flow_run, settings
2
+ from prefect.blocks.notifications import SlackWebhook
3
+ from prefect.context import get_run_context
4
+ from DB_utils import set_maintenance_status,generate_empty_well,insert_maintenance_log
5
+ from prefect.input import RunInput
6
+
7
+ class UserInput(RunInput):
8
+ Maintenance_Completed : bool
9
+ name: str
10
+
11
+
12
+ @flow
13
+ def request_wells_maintenance(maintenance_type):
14
+ MESSAGE = "HITL request"
15
+ logger = get_run_logger()
16
+ slack_block = SlackWebhook.load("prefect-test")
17
+ message = str(MESSAGE)
18
+ flow_run = get_run_context().flow_run
19
+
20
+ if flow_run and settings.PREFECT_UI_URL:
21
+ flow_run_url = (
22
+ f"{settings.PREFECT_UI_URL.value()}/flow-runs/flow-run/{flow_run.id}"
23
+ )
24
+ message += f"\n\nOT2-LCM requests a {maintenance_type}, please open the <{flow_run_url}|paused flow run>, complete with your user name and then click 'Resume'"
25
+ slack_block.notify(message)
26
+
27
+ description = f"""
28
+ OT2-LCM requests a {maintenance_type}!
29
+ Please finish the maintenance and enter your name:
30
+ """
31
+
32
+ user_input = pause_flow_run(
33
+ wait_for_input=UserInput.with_initial_data(
34
+ description=description
35
+ ),
36
+ timeout = 3000 #Should we have a timeout, or just let it waits until maintenance complete?
37
+ )
38
+ #Same as above, and even without this timeout handler, following database update codes will not run
39
+ if not user_input:
40
+ logger.warning("[DEBUG]Flow resumed automatically due to timeout.")
41
+ return "Maintenance flow timed out with no user interaction. " \
42
+ "The requested maintenance was not carried out."
43
+
44
+ set_maintenance_status(maintenance_type,0)
45
+
46
+ if maintenance_type == "wellplate_maintenance":
47
+ generate_empty_well()
48
+ msg_out = f"Updating wells status on DB by {user_input.name}"
49
+ logger.info(msg_out)
50
+
51
+ insert_maintenance_log(maintenance_type, user_input.name)
52
+
53
+ return msg_out
prefect.yaml ADDED
@@ -0,0 +1,34 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Welcome to your prefect.yaml file! You can use this file for storing and managing
2
+ # configuration for deploying your flows. We recommend committing this file to source
3
+ # control along with your flow code.
4
+
5
+ # Generic metadata about this project
6
+ name: OT-2-LCM-test
7
+ prefect-version: 3.1.5
8
+
9
+ # build section allows you to manage and build docker images
10
+ build:
11
+
12
+ # push section allows you to manage if and how this project is uploaded to remote locations
13
+ push:
14
+
15
+ # pull section allows you to provide instructions for cloning this project in remote locations
16
+ pull:
17
+ - prefect.deployments.steps.set_working_directory:
18
+ directory: /home/user/app
19
+
20
+ # the deployments section allows you to provide configuration for deploying flows
21
+ deployments:
22
+ - name: wells-maintenance
23
+ version:
24
+ tags: []
25
+ concurrency_limit:
26
+ description:
27
+ entrypoint: maintenance_flow.py:request_wells_maintenance
28
+ parameters: {}
29
+ work_pool:
30
+ name: ot2-pool
31
+ work_queue_name:
32
+ job_variables: {}
33
+ enforce_parameter_schema: true
34
+ schedules: []
prefect_utils.py ADDED
@@ -0,0 +1,28 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import subprocess
2
+ import threading
3
+
4
+ def start_prefect_worker(work_pool_name: str = "ot2-pool"):
5
+ def worker_runner():
6
+ print(f"Starting Prefect Worker for work pool: {work_pool_name}")
7
+ subprocess.run([
8
+ "prefect", "worker", "start",
9
+ "--pool", work_pool_name
10
+ ])
11
+ worker_thread = threading.Thread(target=worker_runner, daemon=True)
12
+ worker_thread.start()
13
+ print("Prefect Worker started in background thread.")
14
+
15
+
16
+ from prefect.client import get_client
17
+
18
+ async def trigger_maintenance_request(maintenance_type: str):
19
+ async with get_client() as client:
20
+ # find deployment id
21
+ deployment = await client.read_deployment_by_name(
22
+ name="request-wells-maintenance/wells-maintenance"
23
+ )
24
+
25
+ await client.create_flow_run_from_deployment(
26
+ deployment_id=deployment.id,
27
+ parameters={"maintenance_type": maintenance_type}
28
+ )
prefectignore ADDED
@@ -0,0 +1,41 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # prefect artifacts
2
+ .prefectignore
3
+
4
+ # python artifacts
5
+ __pycache__/
6
+ *.py[cod]
7
+ *$py.class
8
+ *.egg-info/
9
+ *.egg
10
+
11
+ # Type checking artifacts
12
+ .mypy_cache/
13
+ .dmypy.json
14
+ dmypy.json
15
+ .pyre/
16
+
17
+ # IPython
18
+ profile_default/
19
+ ipython_config.py
20
+ *.ipynb_checkpoints/*
21
+
22
+ # Environments
23
+ .python-version
24
+ .env
25
+ .venv
26
+ env/
27
+ venv/
28
+
29
+ # MacOS
30
+ .DS_Store
31
+
32
+ # Dask
33
+ dask-worker-space/
34
+
35
+ # Editors
36
+ .idea/
37
+ .vscode/
38
+
39
+ # VCS
40
+ .git/
41
+ .hg/