| """ |
| Backend-only Modal app for GUI-driven Protify workflows. |
| |
| This module intentionally avoids browser UI dependencies. It exposes remote |
| functions that the local Tk GUI can call to deploy, submit jobs, monitor status, |
| cancel jobs, and fetch artifacts. |
| """ |
|
|
| import base64 |
| import json |
| import os |
| import random |
| import shutil |
| import string |
| import subprocess |
| import threading |
| import time |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from typing import Any, Dict, Optional |
|
|
| import modal |
| import yaml |
|
|
|
|
# Filesystem anchors: the directory holding this module and the directory two
# levels above it, which is treated as the project root.
SCRIPT_DIR = Path(__file__).parent.resolve()
PROJECT_ROOT = SCRIPT_DIR.parents[1]

# Modal app name and the GPU types a job may request.
APP_NAME = "protify-backend"
PROTIFY_DEFAULT_GPU = "A10"
AVAILABLE_GPUS = ["H200", "H100", "A100-80GB", "A100", "L40S", "A10", "L4", "T4"]

# Resource bounds handed to the GPU job functions (cpu=, memory=,
# max_containers=). NOTE(review): memory values are presumably MiB - confirm
# against the Modal resource documentation.
GPU_CPU_MIN = 8.0
GPU_CPU_MAX = 16.0
GPU_MEMORY_MIN = 64 * 1024
GPU_MEMORY_MAX = 256 * 1024
MAX_CONTAINERS_GPU = 8

# Resource bounds handed to the lightweight CPU-only control functions.
CPU_MEMORY_MIN = 4 * 1024
CPU_MEMORY_MAX = 8 * 1024
CPU_COUNT_MIN = 2.0
CPU_COUNT_MAX = 4.0
MAX_CONTAINERS_CPU = 10

# Values passed as scaledown_window= to the GPU and CPU functions respectively.
SCALEDOWN_WINDOW_GPU = 10
SCALEDOWN_WINDOW_CPU = 300

# Upper bound on a job's runtime and the interval between status heartbeats,
# both in seconds.
TIMEOUT_SECONDS = 24 * 60 * 60
HEARTBEAT_SECONDS = 10

# Locations under the /data volume mount: the shared status file and the
# default output directories used when a config does not supply its own.
STATUS_FILE_PATH = "/data/job_status.json"
LOG_DIR_DEFAULT = "/data/logs"
RESULTS_DIR_DEFAULT = "/data/results"
PLOTS_DIR_DEFAULT = "/data/plots"
WEIGHTS_DIR_DEFAULT = "/data/weights"
EMBED_DIR_DEFAULT = "/data/embeddings"
DOWNLOAD_DIR_DEFAULT = "/data/downloads"
|
|
|
|
def _build_image():
    """Assemble the Modal container image shared by every function in this app.

    Starts from a Debian slim image with Python 3.10, installs a few system
    tools, then layers in the project's Python requirements, the ``src`` tree
    and the README when they exist under ``PROJECT_ROOT``. If no
    ``requirements.txt`` is found, a minimal set of packages is installed
    instead.

    Returns:
        The configured ``modal.Image``.
    """
    image = (
        modal.Image.debian_slim(python_version="3.10")
        .apt_install("git", "wget", "curl")
        .run_commands("pip install --upgrade pip setuptools")
    )

    # Local inputs are passed to Modal as paths anchored at PROJECT_ROOT, the
    # same location the existence checks use. Passing bare relative names made
    # the build depend on the current working directory.
    requirements_file = PROJECT_ROOT / "requirements.txt"
    if requirements_file.exists():
        image = image.add_local_file(str(requirements_file), "/tmp/requirements.txt", copy=True)
        image = image.run_commands("pip install -r /tmp/requirements.txt")
    else:
        image = image.run_commands("pip install torch transformers datasets")

    source_dir = PROJECT_ROOT / "src"
    if source_dir.exists():
        image = image.add_local_dir(str(source_dir), "/root/src", copy=True)

    readme_file = PROJECT_ROOT / "README.md"
    if readme_file.exists():
        image = image.add_local_file(str(readme_file), "/root/README.md", copy=True)

    # Environment variables baked into the image for every container.
    image = image.env(
        {
            "TF_CPP_MIN_LOG_LEVEL": "2",
            "TF_ENABLE_ONEDNN_OPTS": "0",
            "TOKENIZERS_PARALLELISM": "true",
            "CUBLAS_WORKSPACE_CONFIG": ":4096:8",
        }
    )
    return image
|
|
|
|
# Module-level Modal objects referenced by every @app.function below.
app = modal.App(APP_NAME)
image = _build_image()
# Named volume (created on first use) that each function mounts at /data.
volume = modal.Volume.from_name("protify-data", create_if_missing=True)

# Serializes read-modify-write cycles on the status file between threads of
# one process; it offers no protection across containers.
_status_lock = threading.Lock()
|
|
|
|
def _now_utc_iso() -> str:
    """Return the current time in UTC as a timezone-aware ISO 8601 string."""
    current_utc = datetime.now(tz=timezone.utc)
    return current_utc.isoformat()
|
|
|
|
def _generate_job_id() -> str:
    """Create a job identifier of the form ``YYYY-MM-DD-HH-MM_XXXX``.

    The timestamp part is taken in UTC, matching every other timestamp this
    module records, so identifiers sort chronologically regardless of the
    local timezone of the machine that generated them. ``XXXX`` is four
    random uppercase ASCII letters that separate jobs created within the same
    minute; it is not a security token.

    Returns:
        The new job identifier.
    """
    random_suffix = "".join(random.choices(string.ascii_uppercase, k=4))
    minute_stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d-%H-%M")
    return f"{minute_stamp}_{random_suffix}"
|
|
|
|
def _safe_read_json(json_path: str) -> Dict[str, Any]:
    """Load a JSON object from ``json_path``, falling back to an empty dict.

    An empty dict is returned when the file is missing or unreadable, when its
    content is not valid JSON, or when the top-level JSON value is not an
    object. Callers index the result by job id, so a list or scalar would
    break them.

    Args:
        json_path: Path of the JSON file to read.

    Returns:
        The decoded mapping, or ``{}`` on any of the failures above.
    """
    try:
        with open(json_path, "r", encoding="utf-8") as file:
            loaded = json.load(file)
    except (OSError, ValueError):
        # OSError covers missing/unreadable files; ValueError covers both
        # malformed JSON and undecodable bytes.
        return {}
    if not isinstance(loaded, dict):
        return {}
    return loaded
|
|
|
|
def _safe_write_json(json_path: str, payload: Dict[str, Any]) -> None:
    """Write ``payload`` to ``json_path`` as indented JSON.

    The payload is serialized before the target file is opened, so a payload
    that cannot be encoded raises without truncating an existing file. Parent
    directories are created as needed; a bare filename (no directory part) is
    written into the current working directory.

    Args:
        json_path: Destination file path.
        payload: JSON-serializable mapping to store.

    Raises:
        TypeError, ValueError: If ``payload`` is not JSON-serializable.
        OSError: If the directory or file cannot be created or written.
    """
    serialized = json.dumps(payload, indent=2)
    parent_dir = os.path.dirname(json_path)
    if parent_dir:
        # os.makedirs("") raises, so only create a directory when there is one.
        os.makedirs(parent_dir, exist_ok=True)
    with open(json_path, "w", encoding="utf-8") as file:
        file.write(serialized)
|
|
|
|
def _update_job_status(job_id: str, patch: Dict[str, Any]) -> Dict[str, Any]:
    """Merge ``patch`` into the stored status entry for ``job_id`` and persist it.

    The whole status file is read, the job's entry is created with PENDING
    defaults when absent, the patch is applied, ``updated_at_utc`` is
    refreshed, and the file is written back and committed to the volume.

    NOTE(review): the lock only serializes threads of this process, and the
    file is read from this container's current view of the volume without a
    reload - presumably concurrent containers can overwrite each other's
    entries; confirm against Modal's volume semantics.

    Args:
        job_id: Identifier of the job whose entry is updated.
        patch: Fields to set or overwrite on the entry.

    Returns:
        The job's entry after the patch has been applied.
    """
    with _status_lock:
        store = _safe_read_json(STATUS_FILE_PATH)
        if job_id not in store:
            # First sighting of this job: seed an entry before patching it.
            store[job_id] = {
                "job_id": job_id,
                "status": "PENDING",
                "phase": "created",
                "created_at_utc": _now_utc_iso(),
                "updated_at_utc": _now_utc_iso(),
            }
        entry = store[job_id]
        entry.update(patch)
        entry["updated_at_utc"] = _now_utc_iso()
        _safe_write_json(STATUS_FILE_PATH, store)
        volume.commit()
        return entry
|
|
|
|
def _infer_phase_from_line(line: str, current_phase: str) -> str:
    """Map one line of job output to a coarse progress phase.

    The line is compared case-insensitively against known marker substrings,
    phase by phase in the order listed below; the first phase with a matching
    marker wins. When nothing matches, the phase is left as it was.

    Args:
        line: A single line of subprocess output.
        current_phase: Phase to keep when the line carries no marker.

    Returns:
        The phase implied by the line, or ``current_phase``.
    """
    phase_markers = (
        ("data_loading", ("loading and preparing datasets", "getting data")),
        ("embedding", ("computing embeddings", "saving embeddings", "download embeddings")),
        ("training", ("starting training", "training probe", "run_wandb_hyperopt")),
        ("proteingym", ("proteingym",)),
        ("plotting", ("generating visualization plots",)),
        ("pushing_to_hub", ("successfully saved model to huggingface hub",)),
    )
    haystack = line.lower()
    for phase, markers in phase_markers:
        if any(marker in haystack for marker in markers):
            return phase
    return current_phase
|
|
|
|
def _tail_text(text: str, max_chars: int) -> str:
    """Return at most the last ``max_chars`` characters of ``text``.

    Args:
        text: Source text.
        max_chars: Maximum number of trailing characters to keep. Zero or a
            negative value yields an empty string.

    Returns:
        ``text`` unchanged when it already fits, otherwise its tail.
    """
    if max_chars <= 0:
        # text[-0:] is the whole string and a negative count slices from the
        # wrong end, so handle non-positive limits explicitly.
        return ""
    if len(text) <= max_chars:
        return text
    return text[-max_chars:]
|
|
|
|
def _fix_paths(config_obj: Any) -> Any:
    """Rewrite relative path strings in a config so they live under ``/data``.

    Dicts are updated in place and returned; lists are rebuilt as new lists;
    any other value is returned unchanged. Within a dict, a string value is
    rewritten when it

    * starts with ``data/`` or ``local_data/`` - the leading component is
      replaced by ``/data/``; or
    * belongs to a key ending in ``_dir`` and is a non-empty relative path -
      it is prefixed with ``/data/``.

    Empty strings are left untouched so callers can still recognise them as
    "not set" and apply their own defaults. Strings that are direct list
    items are not rewritten.

    Args:
        config_obj: A config mapping, a list of config items, or a leaf value.

    Returns:
        The same dict (mutated), a new list, or the original leaf value.
    """
    if isinstance(config_obj, list):
        return [_fix_paths(item) for item in config_obj]
    if not isinstance(config_obj, dict):
        return config_obj

    for key, value in list(config_obj.items()):
        if isinstance(value, str):
            if value.startswith(("data/", "local_data/")):
                config_obj[key] = f"/data/{value.split('/', 1)[1]}"
            elif (
                value
                and isinstance(key, str)
                and key.endswith("_dir")
                and not os.path.isabs(value)
            ):
                config_obj[key] = f"/data/{value}"
        elif isinstance(value, (list, dict)):
            config_obj[key] = _fix_paths(value)
    return config_obj
|
|
|
|
def _prepare_config(config: Dict[str, Any]) -> Dict[str, Any]:
    """Return a job-ready copy of ``config`` with paths normalised and defaults filled.

    The input is deep-copied first, so neither its top level nor any nested
    dict or list is modified. Missing or empty output-directory settings
    receive the module defaults, remaining relative paths are rewritten by
    ``_fix_paths``, a few optional keys are guaranteed to exist (as ``None``),
    and every output directory is created on disk.

    Args:
        config: Job configuration as supplied by the caller.

    Returns:
        A new configuration dict ready to be dumped for the job subprocess.

    Raises:
        OSError: If an output directory cannot be created.
    """
    import copy

    config_copy = copy.deepcopy(dict(config))

    directory_defaults = {
        "log_dir": LOG_DIR_DEFAULT,
        "results_dir": RESULTS_DIR_DEFAULT,
        "model_save_dir": WEIGHTS_DIR_DEFAULT,
        "embedding_save_dir": EMBED_DIR_DEFAULT,
        "plots_dir": PLOTS_DIR_DEFAULT,
        "download_dir": DOWNLOAD_DIR_DEFAULT,
    }
    # Defaults go in before path rewriting so an empty value is replaced by
    # its default rather than being turned into a bare "/data/" path. The
    # defaults are absolute, so the rewrite below leaves them alone.
    for directory_key, default_dir in directory_defaults.items():
        if not config_copy.get(directory_key):
            config_copy[directory_key] = default_dir

    config_copy = _fix_paths(config_copy)

    for optional_key in ("replay_path", "pretrained_probe_path", "hf_home"):
        config_copy.setdefault(optional_key, None)

    for directory_key in directory_defaults:
        os.makedirs(config_copy[directory_key], exist_ok=True)

    return config_copy
|
|
|
|
def _execute_protify_job(
    config: Dict[str, Any],
    hf_token: Optional[str] = None,
    wandb_api_key: Optional[str] = None,
    synthyra_api_key: Optional[str] = None,
    job_id: Optional[str] = None,
    gpu_type: Optional[str] = None,
    timeout_seconds: int = TIMEOUT_SECONDS,
) -> Dict[str, Any]:
    """Run ``main.py`` for one job as a subprocess and track it in the status file.

    The function prepares the config, writes it to a temporary YAML file
    (with credentials blanked), launches ``python -u main.py`` in
    ``/root/src/protify``, streams its stdout/stderr into the job's log file,
    records a heartbeat every ``HEARTBEAT_SECONDS`` and finally stores a
    terminal status (SUCCESS, FAILED or TIMEOUT).

    Args:
        config: Job configuration; normalised by ``_prepare_config``.
        hf_token: Hugging Face token; falls back to the ``HF_TOKEN``
            environment variable when omitted.
        wandb_api_key: Optional Weights & Biases key forwarded to the job.
        synthyra_api_key: Optional Synthyra key forwarded to the job.
        job_id: Identifier to use; generated when omitted.
        gpu_type: GPU label recorded in status and log. Values outside
            ``AVAILABLE_GPUS`` (including ``None``) are recorded as
            ``PROTIFY_DEFAULT_GPU``.
        timeout_seconds: Requested runtime limit; capped at
            ``TIMEOUT_SECONDS - 60``.

    Returns:
        A dict with ``success``, ``job_id``, ``status``, a tail of captured
        stdout and, on failure, ``error``. Errors raised while preparing or
        supervising the job are reported this way rather than propagated.
    """
    if job_id is None:
        job_id = _generate_job_id()

    # A reused container may hold an older view of the volume; refresh it
    # before the status file is rewritten. Best effort: if the reload is
    # refused, continue with the current view.
    try:
        volume.reload()
    except Exception as reload_error:
        print(f"[WARN] Could not reload volume before job {job_id}: {reload_error}", flush=True)

    selected_gpu = gpu_type if gpu_type in AVAILABLE_GPUS else PROTIFY_DEFAULT_GPU
    _update_job_status(
        job_id,
        {
            "status": "RUNNING",
            "phase": "startup",
            "gpu_type": selected_gpu,
            "last_heartbeat_utc": _now_utc_iso(),
            "started_at_utc": _now_utc_iso(),
            "error": None,
        },
    )

    stdout_lines = []
    stderr_lines = []
    timed_out = False
    process = None
    try:
        active_hf_token = hf_token
        if active_hf_token is None:
            active_hf_token = os.environ.get("HF_TOKEN")

        if active_hf_token is not None:
            try:
                from huggingface_hub import login

                os.environ["HF_TOKEN"] = active_hf_token
                login(active_hf_token)
            except Exception as login_error:
                # Login is best effort; the job may not need the Hub at all.
                # Only the exception type is printed to avoid echoing secrets.
                print(
                    f"[WARN] Hugging Face login failed ({type(login_error).__name__}); continuing.",
                    flush=True,
                )

        prepared_config = _prepare_config(config)
        log_file_path = os.path.join(prepared_config["log_dir"], f"{job_id}.txt")
        _update_job_status(
            job_id,
            {
                "log_file_path": log_file_path,
                "results_dir": prepared_config["results_dir"],
                "plots_dir": prepared_config["plots_dir"],
            },
        )

        run_dir = Path("/tmp/protify_run") / job_id
        run_dir.mkdir(parents=True, exist_ok=True)
        config_path = run_dir / "config.yaml"

        # Credentials are never written to the YAML file; they travel via
        # command-line flags and environment variables instead.
        config_to_dump = dict(prepared_config)
        config_to_dump["hf_token"] = None
        config_to_dump["wandb_api_key"] = None
        config_to_dump["synthyra_api_key"] = None
        with open(config_path, "w", encoding="utf-8") as config_file:
            yaml.dump(config_to_dump, config_file, default_flow_style=False, allow_unicode=True, sort_keys=False)

        command = ["python", "-u", "main.py", "--yaml_path", str(config_path)]
        if active_hf_token is not None:
            command.extend(["--hf_token", active_hf_token])
        if wandb_api_key is not None:
            command.extend(["--wandb_api_key", wandb_api_key])
        if synthyra_api_key is not None:
            command.extend(["--synthyra_api_key", synthyra_api_key])

        process_env = os.environ.copy()
        process_env["PYTHONPATH"] = "/root/src"
        process_env["WORKING_DIR"] = "/root"
        process_env["PYTHONUNBUFFERED"] = "1"
        process_env["PROTIFY_JOB_ID"] = job_id
        process_env["CUDA_VISIBLE_DEVICES"] = "0"
        if active_hf_token is not None:
            process_env["HF_TOKEN"] = active_hf_token
        if wandb_api_key is not None:
            process_env["WANDB_API_KEY"] = wandb_api_key

        # The log file is stored on the volume and returned to clients, so
        # credential values are masked in the recorded command line.
        secret_values = {
            secret for secret in (active_hf_token, wandb_api_key, synthyra_api_key) if secret
        }
        logged_command = " ".join("***" if part in secret_values else part for part in command)

        os.makedirs(os.path.dirname(log_file_path), exist_ok=True)
        with open(log_file_path, "w", encoding="utf-8") as log_file:
            log_file.write(f"[{_now_utc_iso()}] Starting job {job_id}\n")
            log_file.write(f"GPU={selected_gpu}\n")
            log_file.write(f"Command={logged_command}\n")
        volume.commit()

        log_lock = threading.Lock()
        phase_state = {"phase": "startup"}

        def append_log(log_line: str) -> None:
            # Both reader threads append to the same file; the lock keeps
            # their lines from interleaving.
            with log_lock:
                with open(log_file_path, "a", encoding="utf-8", errors="ignore") as log_file:
                    log_file.write(log_line + "\n")

        def stream_output(pipe, output_list, prefix: str = ""):
            # Drain one pipe line by line until EOF, mirroring every line to
            # the in-memory buffer, the log file and this process's stdout.
            try:
                for line in iter(pipe.readline, ""):
                    clean_line = line.rstrip("\n")
                    full_line = f"{prefix}{clean_line}"
                    output_list.append(full_line)
                    phase_state["phase"] = _infer_phase_from_line(clean_line, phase_state["phase"])
                    append_log(full_line)
                    print(full_line, flush=True)
            finally:
                pipe.close()

        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=1,
            cwd="/root/src/protify",
            env=process_env,
        )

        stdout_thread = threading.Thread(target=stream_output, args=(process.stdout, stdout_lines, ""), daemon=True)
        stderr_thread = threading.Thread(target=stream_output, args=(process.stderr, stderr_lines, "[STDERR] "), daemon=True)
        stdout_thread.start()
        stderr_thread.start()

        # Stay under the Modal function timeout so the final status can
        # still be written before the container is stopped.
        max_runtime_seconds = min(timeout_seconds, TIMEOUT_SECONDS - 60)

        start_time = time.time()
        last_heartbeat = 0.0
        while process.poll() is None:
            now = time.time()
            if now - start_time > max_runtime_seconds:
                timed_out = True
                process.kill()
                try:
                    # Reap the killed child so it does not linger as a zombie.
                    process.wait(timeout=30)
                except subprocess.TimeoutExpired:
                    pass
                break
            if now - last_heartbeat >= HEARTBEAT_SECONDS:
                _update_job_status(
                    job_id,
                    {
                        "status": "RUNNING",
                        "phase": phase_state["phase"],
                        "last_heartbeat_utc": _now_utc_iso(),
                    },
                )
                last_heartbeat = now
            time.sleep(1)

        # Give the readers a moment to flush the remaining output; they are
        # daemon threads, so a stuck pipe cannot block the function forever.
        stdout_thread.join(timeout=5)
        stderr_thread.join(timeout=5)

        return_code = process.returncode
        stdout_text = "\n".join(stdout_lines)
        stderr_text = "\n".join(stderr_lines)

        if timed_out:
            timeout_message = f"Process timed out after {max_runtime_seconds} seconds."
            _update_job_status(
                job_id,
                {
                    "status": "TIMEOUT",
                    "phase": "timeout",
                    "last_heartbeat_utc": _now_utc_iso(),
                    "error": timeout_message,
                    "exit_code": -1,
                    "finished_at_utc": _now_utc_iso(),
                },
            )
            return {
                "success": False,
                "job_id": job_id,
                "status": "TIMEOUT",
                "error": timeout_message,
                "stdout": _tail_text(stdout_text, 5000),
            }

        if return_code != 0:
            failure_message = _tail_text(stderr_text, 5000) if stderr_text else "Unknown subprocess error."
            _update_job_status(
                job_id,
                {
                    "status": "FAILED",
                    "phase": "failed",
                    "last_heartbeat_utc": _now_utc_iso(),
                    "error": failure_message,
                    "exit_code": return_code,
                    "finished_at_utc": _now_utc_iso(),
                },
            )
            return {
                "success": False,
                "job_id": job_id,
                "status": "FAILED",
                "error": failure_message,
                "stdout": _tail_text(stdout_text, 5000),
            }

        _update_job_status(
            job_id,
            {
                "status": "SUCCESS",
                "phase": "completed",
                "last_heartbeat_utc": _now_utc_iso(),
                "error": None,
                "exit_code": return_code,
                "finished_at_utc": _now_utc_iso(),
            },
        )
        return {
            "success": True,
            "job_id": job_id,
            "status": "SUCCESS",
            "stdout": _tail_text(stdout_text, 5000),
        }
    except Exception as error:
        # Covers failures during setup as well as supervision, so the job
        # never stays marked RUNNING after this function has given up on it.
        _update_job_status(
            job_id,
            {
                "status": "FAILED",
                "phase": "exception",
                "last_heartbeat_utc": _now_utc_iso(),
                "error": str(error),
                "exit_code": -1,
                "finished_at_utc": _now_utc_iso(),
            },
        )
        return {
            "success": False,
            "job_id": job_id,
            "status": "FAILED",
            "error": str(error),
            "stdout": "",
        }
    finally:
        # Never leave the job subprocess running once supervision has ended.
        if process is not None and process.poll() is None:
            process.kill()
|
|
|
|
@app.function(
    image=image,
    gpu="H200",
    volumes={"/data": volume},
    memory=(GPU_MEMORY_MIN, GPU_MEMORY_MAX),
    cpu=(GPU_CPU_MIN, GPU_CPU_MAX),
    max_containers=MAX_CONTAINERS_GPU,
    scaledown_window=SCALEDOWN_WINDOW_GPU,
    timeout=TIMEOUT_SECONDS,
)
def run_protify_job_h200(
    config: Dict[str, Any],
    hf_token: Optional[str] = None,
    wandb_api_key: Optional[str] = None,
    synthyra_api_key: Optional[str] = None,
    job_id: Optional[str] = None,
    gpu_type: Optional[str] = None,
    timeout_seconds: int = TIMEOUT_SECONDS,
) -> Dict[str, Any]:
    """Run a Protify job in a container requested with ``gpu="H200"``.

    Thin Modal entry point; all work is delegated to ``_execute_protify_job``
    and its result dict is returned unchanged.
    """
    return _execute_protify_job(
        config=config,
        hf_token=hf_token,
        wandb_api_key=wandb_api_key,
        synthyra_api_key=synthyra_api_key,
        job_id=job_id,
        # When the caller omits gpu_type, record the GPU this function is
        # bound to rather than letting the executor fall back to the default.
        gpu_type=gpu_type if gpu_type is not None else "H200",
        timeout_seconds=timeout_seconds,
    )
|
|
|
|
@app.function(
    image=image,
    gpu="H100",
    volumes={"/data": volume},
    memory=(GPU_MEMORY_MIN, GPU_MEMORY_MAX),
    cpu=(GPU_CPU_MIN, GPU_CPU_MAX),
    max_containers=MAX_CONTAINERS_GPU,
    scaledown_window=SCALEDOWN_WINDOW_GPU,
    timeout=TIMEOUT_SECONDS,
)
def run_protify_job_h100(
    config: Dict[str, Any],
    hf_token: Optional[str] = None,
    wandb_api_key: Optional[str] = None,
    synthyra_api_key: Optional[str] = None,
    job_id: Optional[str] = None,
    gpu_type: Optional[str] = None,
    timeout_seconds: int = TIMEOUT_SECONDS,
) -> Dict[str, Any]:
    """Run a Protify job in a container requested with ``gpu="H100"``.

    Thin Modal entry point; all work is delegated to ``_execute_protify_job``
    and its result dict is returned unchanged.
    """
    return _execute_protify_job(
        config=config,
        hf_token=hf_token,
        wandb_api_key=wandb_api_key,
        synthyra_api_key=synthyra_api_key,
        job_id=job_id,
        # When the caller omits gpu_type, record the GPU this function is
        # bound to rather than letting the executor fall back to the default.
        gpu_type=gpu_type if gpu_type is not None else "H100",
        timeout_seconds=timeout_seconds,
    )
|
|
|
|
@app.function(
    image=image,
    gpu="A100-80GB",
    volumes={"/data": volume},
    memory=(GPU_MEMORY_MIN, GPU_MEMORY_MAX),
    cpu=(GPU_CPU_MIN, GPU_CPU_MAX),
    max_containers=MAX_CONTAINERS_GPU,
    scaledown_window=SCALEDOWN_WINDOW_GPU,
    timeout=TIMEOUT_SECONDS,
)
def run_protify_job_a100_80gb(
    config: Dict[str, Any],
    hf_token: Optional[str] = None,
    wandb_api_key: Optional[str] = None,
    synthyra_api_key: Optional[str] = None,
    job_id: Optional[str] = None,
    gpu_type: Optional[str] = None,
    timeout_seconds: int = TIMEOUT_SECONDS,
) -> Dict[str, Any]:
    """Run a Protify job in a container requested with ``gpu="A100-80GB"``.

    Thin Modal entry point; all work is delegated to ``_execute_protify_job``
    and its result dict is returned unchanged.
    """
    return _execute_protify_job(
        config=config,
        hf_token=hf_token,
        wandb_api_key=wandb_api_key,
        synthyra_api_key=synthyra_api_key,
        job_id=job_id,
        # When the caller omits gpu_type, record the GPU this function is
        # bound to rather than letting the executor fall back to the default.
        gpu_type=gpu_type if gpu_type is not None else "A100-80GB",
        timeout_seconds=timeout_seconds,
    )
|
|
|
|
@app.function(
    image=image,
    gpu="A100",
    volumes={"/data": volume},
    memory=(GPU_MEMORY_MIN, GPU_MEMORY_MAX),
    cpu=(GPU_CPU_MIN, GPU_CPU_MAX),
    max_containers=MAX_CONTAINERS_GPU,
    scaledown_window=SCALEDOWN_WINDOW_GPU,
    timeout=TIMEOUT_SECONDS,
)
def run_protify_job_a100(
    config: Dict[str, Any],
    hf_token: Optional[str] = None,
    wandb_api_key: Optional[str] = None,
    synthyra_api_key: Optional[str] = None,
    job_id: Optional[str] = None,
    gpu_type: Optional[str] = None,
    timeout_seconds: int = TIMEOUT_SECONDS,
) -> Dict[str, Any]:
    """Run a Protify job in a container requested with ``gpu="A100"``.

    Thin Modal entry point; all work is delegated to ``_execute_protify_job``
    and its result dict is returned unchanged.
    """
    return _execute_protify_job(
        config=config,
        hf_token=hf_token,
        wandb_api_key=wandb_api_key,
        synthyra_api_key=synthyra_api_key,
        job_id=job_id,
        # When the caller omits gpu_type, record the GPU this function is
        # bound to rather than letting the executor fall back to the default.
        gpu_type=gpu_type if gpu_type is not None else "A100",
        timeout_seconds=timeout_seconds,
    )
|
|
|
|
@app.function(
    image=image,
    gpu="L40S",
    volumes={"/data": volume},
    memory=(GPU_MEMORY_MIN, GPU_MEMORY_MAX),
    cpu=(GPU_CPU_MIN, GPU_CPU_MAX),
    max_containers=MAX_CONTAINERS_GPU,
    scaledown_window=SCALEDOWN_WINDOW_GPU,
    timeout=TIMEOUT_SECONDS,
)
def run_protify_job_l40s(
    config: Dict[str, Any],
    hf_token: Optional[str] = None,
    wandb_api_key: Optional[str] = None,
    synthyra_api_key: Optional[str] = None,
    job_id: Optional[str] = None,
    gpu_type: Optional[str] = None,
    timeout_seconds: int = TIMEOUT_SECONDS,
) -> Dict[str, Any]:
    """Run a Protify job in a container requested with ``gpu="L40S"``.

    Thin Modal entry point; all work is delegated to ``_execute_protify_job``
    and its result dict is returned unchanged.
    """
    return _execute_protify_job(
        config=config,
        hf_token=hf_token,
        wandb_api_key=wandb_api_key,
        synthyra_api_key=synthyra_api_key,
        job_id=job_id,
        # When the caller omits gpu_type, record the GPU this function is
        # bound to rather than letting the executor fall back to the default.
        gpu_type=gpu_type if gpu_type is not None else "L40S",
        timeout_seconds=timeout_seconds,
    )
|
|
|
|
@app.function(
    image=image,
    gpu="A10",
    volumes={"/data": volume},
    memory=(GPU_MEMORY_MIN, GPU_MEMORY_MAX),
    cpu=(GPU_CPU_MIN, GPU_CPU_MAX),
    max_containers=MAX_CONTAINERS_GPU,
    scaledown_window=SCALEDOWN_WINDOW_GPU,
    timeout=TIMEOUT_SECONDS,
)
def run_protify_job_a10(
    config: Dict[str, Any],
    hf_token: Optional[str] = None,
    wandb_api_key: Optional[str] = None,
    synthyra_api_key: Optional[str] = None,
    job_id: Optional[str] = None,
    gpu_type: Optional[str] = None,
    timeout_seconds: int = TIMEOUT_SECONDS,
) -> Dict[str, Any]:
    """Run a Protify job in a container requested with ``gpu="A10"``.

    Thin Modal entry point; all work is delegated to ``_execute_protify_job``
    and its result dict is returned unchanged.
    """
    return _execute_protify_job(
        config=config,
        hf_token=hf_token,
        wandb_api_key=wandb_api_key,
        synthyra_api_key=synthyra_api_key,
        job_id=job_id,
        # When the caller omits gpu_type, record the GPU this function is
        # bound to, independent of what the module-wide default happens to be.
        gpu_type=gpu_type if gpu_type is not None else "A10",
        timeout_seconds=timeout_seconds,
    )
|
|
|
|
@app.function(
    image=image,
    gpu="L4",
    volumes={"/data": volume},
    memory=(GPU_MEMORY_MIN, GPU_MEMORY_MAX),
    cpu=(GPU_CPU_MIN, GPU_CPU_MAX),
    max_containers=MAX_CONTAINERS_GPU,
    scaledown_window=SCALEDOWN_WINDOW_GPU,
    timeout=TIMEOUT_SECONDS,
)
def run_protify_job_l4(
    config: Dict[str, Any],
    hf_token: Optional[str] = None,
    wandb_api_key: Optional[str] = None,
    synthyra_api_key: Optional[str] = None,
    job_id: Optional[str] = None,
    gpu_type: Optional[str] = None,
    timeout_seconds: int = TIMEOUT_SECONDS,
) -> Dict[str, Any]:
    """Run a Protify job in a container requested with ``gpu="L4"``.

    Thin Modal entry point; all work is delegated to ``_execute_protify_job``
    and its result dict is returned unchanged.
    """
    return _execute_protify_job(
        config=config,
        hf_token=hf_token,
        wandb_api_key=wandb_api_key,
        synthyra_api_key=synthyra_api_key,
        job_id=job_id,
        # When the caller omits gpu_type, record the GPU this function is
        # bound to rather than letting the executor fall back to the default.
        gpu_type=gpu_type if gpu_type is not None else "L4",
        timeout_seconds=timeout_seconds,
    )
|
|
|
|
@app.function(
    image=image,
    gpu="T4",
    volumes={"/data": volume},
    memory=(GPU_MEMORY_MIN, GPU_MEMORY_MAX),
    cpu=(GPU_CPU_MIN, GPU_CPU_MAX),
    max_containers=MAX_CONTAINERS_GPU,
    scaledown_window=SCALEDOWN_WINDOW_GPU,
    timeout=TIMEOUT_SECONDS,
)
def run_protify_job_t4(
    config: Dict[str, Any],
    hf_token: Optional[str] = None,
    wandb_api_key: Optional[str] = None,
    synthyra_api_key: Optional[str] = None,
    job_id: Optional[str] = None,
    gpu_type: Optional[str] = None,
    timeout_seconds: int = TIMEOUT_SECONDS,
) -> Dict[str, Any]:
    """Run a Protify job in a container requested with ``gpu="T4"``.

    Thin Modal entry point; all work is delegated to ``_execute_protify_job``
    and its result dict is returned unchanged.
    """
    return _execute_protify_job(
        config=config,
        hf_token=hf_token,
        wandb_api_key=wandb_api_key,
        synthyra_api_key=synthyra_api_key,
        job_id=job_id,
        # When the caller omits gpu_type, record the GPU this function is
        # bound to rather than letting the executor fall back to the default.
        gpu_type=gpu_type if gpu_type is not None else "T4",
        timeout_seconds=timeout_seconds,
    )
|
|
|
|
# Dispatch table from GPU label to the Modal function bound to that GPU.
# Its keys mirror AVAILABLE_GPUS; submit_protify_job indexes it after
# validating the requested label.
gpu_functions = {
    "H200": run_protify_job_h200,
    "H100": run_protify_job_h100,
    "A100-80GB": run_protify_job_a100_80gb,
    "A100": run_protify_job_a100,
    "L40S": run_protify_job_l40s,
    "A10": run_protify_job_a10,
    "L4": run_protify_job_l4,
    "T4": run_protify_job_t4,
}
|
|
|
|
@app.function(
    image=image,
    volumes={"/data": volume},
    memory=(CPU_MEMORY_MIN, CPU_MEMORY_MAX),
    cpu=(CPU_COUNT_MIN, CPU_COUNT_MAX),
    max_containers=MAX_CONTAINERS_CPU,
    scaledown_window=SCALEDOWN_WINDOW_CPU,
)
def submit_protify_job(
    config: Dict[str, Any],
    gpu_type: str = PROTIFY_DEFAULT_GPU,
    hf_token: Optional[str] = None,
    wandb_api_key: Optional[str] = None,
    synthyra_api_key: Optional[str] = None,
    timeout_seconds: int = TIMEOUT_SECONDS,
    job_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Register a job in the status file and spawn it on the requested GPU.

    Args:
        config: Job configuration forwarded to the GPU function.
        gpu_type: Requested GPU label; values outside ``AVAILABLE_GPUS`` fall
            back to ``PROTIFY_DEFAULT_GPU``.
        hf_token: Optional Hugging Face token forwarded to the job.
        wandb_api_key: Optional Weights & Biases key forwarded to the job.
        synthyra_api_key: Optional Synthyra key forwarded to the job.
        timeout_seconds: Requested runtime limit forwarded to the job.
        job_id: Identifier to use; generated when omitted.

    Returns:
        A dict with ``success``, ``job_id``, ``function_call_id``, ``status``
        and the GPU label actually used.

    Raises:
        Exception: Whatever ``spawn`` raises; the job is marked FAILED in the
            status file before the error is re-raised.
    """
    if job_id is None:
        job_id = _generate_job_id()

    # Like the read-only functions in this module, start from the latest
    # committed volume state so that rewriting the status file does not
    # discard entries written by other containers.
    volume.reload()

    selected_gpu = gpu_type if gpu_type in AVAILABLE_GPUS else PROTIFY_DEFAULT_GPU
    _update_job_status(
        job_id,
        {
            "status": "PENDING",
            "phase": "queued",
            "gpu_type": selected_gpu,
            "last_heartbeat_utc": _now_utc_iso(),
            "error": None,
        },
    )

    selected_gpu_function = gpu_functions[selected_gpu]
    try:
        handle = selected_gpu_function.spawn(
            config=config,
            hf_token=hf_token,
            wandb_api_key=wandb_api_key,
            synthyra_api_key=synthyra_api_key,
            job_id=job_id,
            gpu_type=selected_gpu,
            timeout_seconds=timeout_seconds,
        )
    except Exception as spawn_error:
        # Without this the entry would stay PENDING forever.
        _update_job_status(
            job_id,
            {
                "status": "FAILED",
                "phase": "exception",
                "last_heartbeat_utc": _now_utc_iso(),
                "error": f"Failed to spawn job: {spawn_error}",
                "finished_at_utc": _now_utc_iso(),
            },
        )
        raise
    function_call_id = handle.object_id

    # The worker may already have started and committed its own progress.
    # Pick that up, always attach the call id, and only move the entry to
    # RUNNING/queued if the worker has not touched it yet.
    volume.reload()
    current_entry = _safe_read_json(STATUS_FILE_PATH).get(job_id) or {}
    status_patch = {"function_call_id": function_call_id}
    if current_entry.get("status") in (None, "PENDING"):
        status_patch.update(
            {
                "status": "RUNNING",
                "phase": "queued",
                "last_heartbeat_utc": _now_utc_iso(),
            }
        )
    _update_job_status(job_id, status_patch)
    return {
        "success": True,
        "job_id": job_id,
        "function_call_id": function_call_id,
        "status": "RUNNING",
        "gpu_type": selected_gpu,
    }
|
|
|
|
@app.function(
    image=image,
    volumes={"/data": volume},
    memory=(CPU_MEMORY_MIN, CPU_MEMORY_MAX),
    cpu=(CPU_COUNT_MIN, CPU_COUNT_MAX),
    max_containers=MAX_CONTAINERS_CPU,
    scaledown_window=SCALEDOWN_WINDOW_CPU,
)
def get_job_status(job_id: str) -> Dict[str, Any]:
    """Return the stored status entry for ``job_id`` plus its heartbeat age.

    Args:
        job_id: Identifier of the job to look up.

    Returns:
        The job's status entry with ``success`` set to True and
        ``heartbeat_age_seconds`` added (``None`` when no heartbeat is
        recorded or it cannot be parsed). For an unknown job, a dict with
        ``success`` False and an ``error`` message.
    """
    volume.reload()
    all_statuses = _safe_read_json(STATUS_FILE_PATH)
    if job_id not in all_statuses:
        return {"success": False, "job_id": job_id, "error": "Job ID not found."}

    entry = all_statuses[job_id]
    age_seconds = None
    recorded_heartbeat = entry.get("last_heartbeat_utc")
    if recorded_heartbeat:
        try:
            parsed_heartbeat = datetime.fromisoformat(recorded_heartbeat)
            elapsed = datetime.now(timezone.utc) - parsed_heartbeat
            age_seconds = elapsed.total_seconds()
        except Exception:
            # Unparseable or timezone-naive timestamps simply yield no age.
            age_seconds = None
    entry["heartbeat_age_seconds"] = age_seconds
    entry["success"] = True
    return entry
|
|
|
|
@app.function(
    image=image,
    volumes={"/data": volume},
    memory=(CPU_MEMORY_MIN, CPU_MEMORY_MAX),
    cpu=(CPU_COUNT_MIN, CPU_COUNT_MAX),
    max_containers=MAX_CONTAINERS_CPU,
    scaledown_window=SCALEDOWN_WINDOW_CPU,
)
def get_job_log_tail(job_id: str, max_chars: int = 5000) -> Dict[str, Any]:
    """Return the last ``max_chars`` characters of a job's log file.

    The log path recorded in the job's status entry is used when present;
    otherwise ``<LOG_DIR_DEFAULT>/<job_id>.txt`` is tried.

    Args:
        job_id: Identifier of the job.
        max_chars: Maximum number of trailing characters to return. Values
            of zero or less are treated as 1, matching ``get_job_log_delta``.

    Returns:
        A dict with ``success``, ``job_id`` and ``log_tail``. When neither a
        log file nor a status entry exists, ``success`` is False and an
        ``error`` message is included.
    """
    volume.reload()
    if max_chars <= 0:
        max_chars = 1

    status_entry = _safe_read_json(STATUS_FILE_PATH).get(job_id)
    recorded_path = status_entry.get("log_file_path") if status_entry is not None else None
    log_file_path = recorded_path or os.path.join(LOG_DIR_DEFAULT, f"{job_id}.txt")

    if not os.path.exists(log_file_path):
        if status_entry is None:
            return {"success": False, "job_id": job_id, "error": "Job ID not found.", "log_tail": ""}
        # Known job that has not produced a log yet.
        return {"success": True, "job_id": job_id, "log_tail": ""}

    with open(log_file_path, "r", encoding="utf-8", errors="ignore") as log_file:
        text = log_file.read()
    return {"success": True, "job_id": job_id, "log_tail": _tail_text(text, max_chars)}
|
|
|
|
@app.function(
    image=image,
    volumes={"/data": volume},
    memory=(CPU_MEMORY_MIN, CPU_MEMORY_MAX),
    cpu=(CPU_COUNT_MIN, CPU_COUNT_MAX),
    max_containers=MAX_CONTAINERS_CPU,
    scaledown_window=SCALEDOWN_WINDOW_CPU,
)
def get_job_log_delta(job_id: str, offset: int = 0, max_chars: int = 5000) -> Dict[str, Any]:
    """Return the next slice of a job's log, starting at a character offset.

    Intended for incremental polling: pass the ``next_offset`` from the
    previous reply to continue where it ended. Offsets count characters of
    the decoded text, not bytes.

    Args:
        job_id: Identifier of the job.
        offset: Character position to start from; negative values become 0
            and values past the end are clamped to the end.
        max_chars: Maximum slice length; values of zero or less become 1.

    Returns:
        A dict with ``success``, ``job_id``, ``file_exists``, ``chunk``,
        ``next_offset`` and ``file_size`` (in characters).
    """
    volume.reload()
    offset = max(offset, 0)
    if max_chars <= 0:
        max_chars = 1

    entry = _safe_read_json(STATUS_FILE_PATH).get(job_id)
    recorded_path = entry.get("log_file_path") if entry is not None else None
    # Prefer the path stored with the job; fall back to the default layout.
    log_path = recorded_path or os.path.join(LOG_DIR_DEFAULT, f"{job_id}.txt")

    if not os.path.exists(log_path):
        return {
            "success": True,
            "job_id": job_id,
            "file_exists": False,
            "chunk": "",
            "next_offset": offset,
            "file_size": 0,
        }

    with open(log_path, "r", encoding="utf-8", errors="ignore") as handle:
        contents = handle.read()
    total_chars = len(contents)
    start = min(offset, total_chars)
    stop = min(start + max_chars, total_chars)
    return {
        "success": True,
        "job_id": job_id,
        "file_exists": True,
        "chunk": contents[start:stop],
        "next_offset": stop,
        "file_size": total_chars,
    }
|
|
|
|
@app.function(
    image=image,
    volumes={"/data": volume},
    memory=(CPU_MEMORY_MIN, CPU_MEMORY_MAX),
    cpu=(CPU_COUNT_MIN, CPU_COUNT_MAX),
    max_containers=MAX_CONTAINERS_CPU,
    scaledown_window=SCALEDOWN_WINDOW_CPU,
)
def delete_modal_embeddings() -> Dict[str, Any]:
    """Remove everything inside the default embedding directory on the volume.

    The directory itself is kept. Symbolic links are removed as links (their
    targets are not followed) and counted as files.

    Returns:
        A dict with ``success``, a human-readable ``message`` and the
        ``deleted_files`` / ``deleted_dirs`` counts.
    """
    volume.reload()
    embedding_dir = Path(EMBED_DIR_DEFAULT)
    if not embedding_dir.exists():
        return {
            "success": True,
            "message": f"Embedding directory does not exist: {EMBED_DIR_DEFAULT}",
            "deleted_files": 0,
            "deleted_dirs": 0,
        }

    deleted_files = 0
    deleted_dirs = 0
    for path in embedding_dir.glob("*"):
        # Check for links first: shutil.rmtree refuses a symlink to a
        # directory, and a dangling link is neither a file nor a directory.
        if path.is_symlink() or path.is_file():
            path.unlink()
            deleted_files += 1
        elif path.is_dir():
            shutil.rmtree(path)
            deleted_dirs += 1

    volume.commit()
    return {
        "success": True,
        "message": f"Deleted modal embedding cache contents ({deleted_files} files, {deleted_dirs} directories).",
        "deleted_files": deleted_files,
        "deleted_dirs": deleted_dirs,
    }
|
|
|
|
@app.function(
    image=image,
    volumes={"/data": volume},
    memory=(CPU_MEMORY_MIN, CPU_MEMORY_MAX),
    cpu=(CPU_COUNT_MIN, CPU_COUNT_MAX),
    max_containers=MAX_CONTAINERS_CPU,
    scaledown_window=SCALEDOWN_WINDOW_CPU,
)
def cancel_protify_job(function_call_id: str, job_id: Optional[str] = None) -> Dict[str, Any]:
    """Cancel a spawned job and, when its id is known, mark it TERMINATED.

    Args:
        function_call_id: Modal function-call id returned when the job was
            submitted.
        job_id: Optional job identifier; when given, the job's status entry
            is updated to TERMINATED / cancelled.

    Returns:
        A dict with ``success``, ``function_call_id`` and ``job_id``.
    """
    function_call = modal.FunctionCall.from_id(function_call_id)
    function_call.cancel()
    if job_id is not None:
        # Refresh this container's view first, as the read-only functions do,
        # so rewriting the status file keeps entries and progress committed
        # by other containers.
        volume.reload()
        _update_job_status(
            job_id,
            {
                "status": "TERMINATED",
                "phase": "cancelled",
                "last_heartbeat_utc": _now_utc_iso(),
                "finished_at_utc": _now_utc_iso(),
            },
        )
    return {"success": True, "function_call_id": function_call_id, "job_id": job_id}
|
|
|
|
@app.function(
    image=image,
    volumes={"/data": volume},
    memory=(CPU_MEMORY_MIN, CPU_MEMORY_MAX),
    cpu=(CPU_COUNT_MIN, CPU_COUNT_MAX),
    max_containers=MAX_CONTAINERS_CPU,
    scaledown_window=SCALEDOWN_WINDOW_CPU,
)
def get_results(job_id: str) -> Dict[str, Any]:
    """Collect a job's result table, log and plots from the volume.

    Looks for ``<job_id>.tsv`` in the default results directory,
    ``<job_id>.txt`` in the default log directory and every file below
    ``<job_id>/`` in the default plots directory.

    NOTE(review): only the default directories are searched; directories
    overridden in a job's config (and recorded in its status entry) are not
    consulted - confirm whether that is intended.

    Args:
        job_id: Identifier of the job.

    Returns:
        A dict with ``success``, ``files`` (path relative to ``/data`` mapped
        to text content) and ``images`` (relative path mapped to
        ``{"data": <base64>, "mime_type": ...}``). A file that cannot be read
        is reported in place with an error description.
    """
    volume.reload()
    image_extensions = {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".svg", ".webp"}
    # Extensions whose MIME subtype differs from the extension itself.
    mime_overrides = {".svg": "image/svg+xml", ".jpg": "image/jpeg"}
    results = {"success": True, "files": {}, "images": {}}

    results_dir = Path(RESULTS_DIR_DEFAULT)
    plots_dir = Path(PLOTS_DIR_DEFAULT)
    logs_dir = Path(LOG_DIR_DEFAULT)

    collected_files = set()
    result_file = results_dir / f"{job_id}.tsv"
    if result_file.exists():
        collected_files.add(result_file)
    log_file = logs_dir / f"{job_id}.txt"
    if log_file.exists():
        collected_files.add(log_file)
    plot_dir = plots_dir / job_id
    if plot_dir.exists() and plot_dir.is_dir():
        for file_path in plot_dir.rglob("*"):
            if file_path.is_file():
                collected_files.add(file_path)

    # Sorted so the reply lists files in a stable order between calls.
    for file_path in sorted(collected_files):
        relative_path = str(file_path.relative_to(Path("/data")))
        suffix = file_path.suffix.lower()
        is_image = suffix in image_extensions
        try:
            if is_image:
                with open(file_path, "rb") as image_file:
                    encoded = base64.b64encode(image_file.read()).decode("utf-8")
                mime_type = mime_overrides.get(suffix, f"image/{suffix[1:]}")
                results["images"][relative_path] = {"data": encoded, "mime_type": mime_type}
            else:
                with open(file_path, "r", encoding="utf-8", errors="ignore") as text_file:
                    results["files"][relative_path] = text_file.read()
        except Exception as error:
            # One unreadable file should not hide the rest of the results.
            if is_image:
                results["images"][relative_path] = {"error": str(error)}
            else:
                results["files"][relative_path] = f"Error reading file: {error}"

    return results
|
|
|
|
@app.function(
    image=image,
    volumes={"/data": volume},
    memory=(CPU_MEMORY_MIN, CPU_MEMORY_MAX),
    cpu=(CPU_COUNT_MIN, CPU_COUNT_MAX),
    max_containers=MAX_CONTAINERS_CPU,
    scaledown_window=SCALEDOWN_WINDOW_CPU,
)
def list_jobs() -> Dict[str, Any]:
    """Return every stored job status entry, ordered by job id descending.

    Returns:
        A dict with ``success`` and ``jobs``, the list of status entries.
    """
    volume.reload()
    all_statuses = _safe_read_json(STATUS_FILE_PATH)
    ordered_entries = sorted(
        all_statuses.values(),
        key=lambda entry: entry["job_id"],
        reverse=True,
    )
    return {"success": True, "jobs": ordered_entries}
|
|
|
|
if __name__ == "__main__":
    # Running the module directly enters the app's run context and leaves it
    # again without invoking any function.
    # NOTE(review): presumably a smoke test that the app definition is valid
    # - confirm the intended use.
    with app.run():
        pass
|
|