add prefect API and flow for add task
Browse files- DB_utls.py +4 -4
- app.py +6 -1
- prefect_utils.py +9 -0
DB_utls.py
CHANGED
|
@@ -16,7 +16,7 @@ connection_string = blinded_connection_string.replace("<db_password>", MONGODB_P
|
|
| 16 |
def generate_empty_well():
|
| 17 |
dbclient = MongoClient(connection_string)
|
| 18 |
db = dbclient["LCM-OT-2-SLD"]
|
| 19 |
-
collection = db["wells"]
|
| 20 |
rows = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H']
|
| 21 |
columns = [str(i) for i in range(1, 13)]
|
| 22 |
for row in rows:
|
|
@@ -24,7 +24,7 @@ def generate_empty_well():
|
|
| 24 |
well = f"{row}{col}"
|
| 25 |
metadata = {
|
| 26 |
"well": well,
|
| 27 |
-
"status": "
|
| 28 |
"project": "OT2"
|
| 29 |
}
|
| 30 |
|
|
@@ -40,7 +40,7 @@ def generate_empty_well():
|
|
| 40 |
def update_used_wells(used_wells):
|
| 41 |
dbclient = MongoClient(connection_string)
|
| 42 |
db = dbclient["LCM-OT-2-SLD"]
|
| 43 |
-
collection = db["wells"]
|
| 44 |
|
| 45 |
for well in used_wells:
|
| 46 |
metadata = {
|
|
@@ -60,7 +60,7 @@ def update_used_wells(used_wells):
|
|
| 60 |
def find_unused_wells():
|
| 61 |
dbclient = MongoClient(connection_string)
|
| 62 |
db = dbclient["LCM-OT-2-SLD"]
|
| 63 |
-
collection = db["wells"]
|
| 64 |
query = {"status": "empty"}
|
| 65 |
response = list(collection.find(query))
|
| 66 |
df = pd.DataFrame(response)
|
|
|
|
| 16 |
def generate_empty_well():
|
| 17 |
dbclient = MongoClient(connection_string)
|
| 18 |
db = dbclient["LCM-OT-2-SLD"]
|
| 19 |
+
collection = db["wells-test"]
|
| 20 |
rows = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H']
|
| 21 |
columns = [str(i) for i in range(1, 13)]
|
| 22 |
for row in rows:
|
|
|
|
| 24 |
well = f"{row}{col}"
|
| 25 |
metadata = {
|
| 26 |
"well": well,
|
| 27 |
+
"status": "used",
|
| 28 |
"project": "OT2"
|
| 29 |
}
|
| 30 |
|
|
|
|
| 40 |
def update_used_wells(used_wells):
|
| 41 |
dbclient = MongoClient(connection_string)
|
| 42 |
db = dbclient["LCM-OT-2-SLD"]
|
| 43 |
+
collection = db["wells-test"]
|
| 44 |
|
| 45 |
for well in used_wells:
|
| 46 |
metadata = {
|
|
|
|
| 60 |
def find_unused_wells():
|
| 61 |
dbclient = MongoClient(connection_string)
|
| 62 |
db = dbclient["LCM-OT-2-SLD"]
|
| 63 |
+
collection = db["wells-test"]
|
| 64 |
query = {"status": "empty"}
|
| 65 |
response = list(collection.find(query))
|
| 66 |
df = pd.DataFrame(response)
|
app.py
CHANGED
|
@@ -8,6 +8,8 @@ import secrets
|
|
| 8 |
from DB_utls import find_unused_wells, update_used_wells, save_result, get_student_quota, decrement_student_quota
|
| 9 |
import os
|
| 10 |
from yt_utils import get_latest_video_id
|
|
|
|
|
|
|
| 11 |
|
| 12 |
# NOTE: New global dict to store tasks keyed by (student_id, experiment_id)
|
| 13 |
tasks_dict = {}
|
|
@@ -39,6 +41,9 @@ playlist_id = "PL8uZlc2CEpenUCAo9PeiOtbU04lFv48yD"
|
|
| 39 |
|
| 40 |
video_id = get_latest_video_id(channel_id, playlist_id = "PL8uZlc2CEpenUCAo9PeiOtbU04lFv48yD")
|
| 41 |
|
|
|
|
|
|
|
|
|
|
| 42 |
|
| 43 |
def check_student_quota(student_id):
|
| 44 |
"""Check student's remaining experiment quota"""
|
|
@@ -268,7 +273,7 @@ def update_queue_display():
|
|
| 268 |
except Exception as e:
|
| 269 |
return f"Error getting queue status: {str(e)}"
|
| 270 |
|
| 271 |
-
|
| 272 |
def add_to_queue(student_id, R, Y, B):
|
| 273 |
global queue_counter
|
| 274 |
|
|
|
|
| 8 |
from DB_utls import find_unused_wells, update_used_wells, save_result, get_student_quota, decrement_student_quota
|
| 9 |
import os
|
| 10 |
from yt_utils import get_latest_video_id
|
| 11 |
+
from prefect import flow
|
| 12 |
+
from prefect_utils import get_prefect_client
|
| 13 |
|
| 14 |
# NOTE: New global dict to store tasks keyed by (student_id, experiment_id)
|
| 15 |
tasks_dict = {}
|
|
|
|
| 41 |
|
| 42 |
video_id = get_latest_video_id(channel_id, playlist_id = "PL8uZlc2CEpenUCAo9PeiOtbU04lFv48yD")
|
| 43 |
|
| 44 |
+
Prefect_client = get_prefect_client()
|
| 45 |
+
print("[TEST]Prefect client initialized")
|
| 46 |
+
|
| 47 |
|
| 48 |
def check_student_quota(student_id):
|
| 49 |
"""Check student's remaining experiment quota"""
|
|
|
|
| 273 |
except Exception as e:
|
| 274 |
return f"Error getting queue status: {str(e)}"
|
| 275 |
|
| 276 |
+
@flow
|
| 277 |
def add_to_queue(student_id, R, Y, B):
|
| 278 |
global queue_counter
|
| 279 |
|
prefect_utils.py
ADDED
|
@@ -0,0 +1,9 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
from prefect import get_client
|
| 2 |
+
from prefect.client.orchestration import PrefectClient
|
| 3 |
+
import os
|
| 4 |
+
|
| 5 |
+
async def get_prefect_client() -> PrefectClient:
|
| 6 |
+
api_key = os.getenv("PREFECT_API_KEY")
|
| 7 |
+
if not api_key:
|
| 8 |
+
raise RuntimeError("PREFECT_API_KEY is missing")
|
| 9 |
+
return await get_client()
|