""" Backend-only Modal app for GUI-driven Protify workflows. This module intentionally avoids browser UI dependencies. It exposes remote functions that the local Tk GUI can call to deploy, submit jobs, monitor status, cancel jobs, and fetch artifacts. """ import base64 import json import os import random import shutil import string import subprocess import threading import time from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, Optional import modal import yaml SCRIPT_DIR = Path(__file__).parent.resolve() PROJECT_ROOT = SCRIPT_DIR.parents[1] APP_NAME = "protify-backend" PROTIFY_DEFAULT_GPU = "A10" AVAILABLE_GPUS = ["H200", "H100", "A100-80GB", "A100", "L40S", "A10", "L4", "T4"] GPU_CPU_MIN, GPU_CPU_MAX = 8.0, 16.0 GPU_MEMORY_MIN, GPU_MEMORY_MAX = 65536, 262144 MAX_CONTAINERS_GPU = 8 CPU_MEMORY_MIN, CPU_MEMORY_MAX = 4096, 8192 CPU_COUNT_MIN, CPU_COUNT_MAX = 2.0, 4.0 MAX_CONTAINERS_CPU = 10 SCALEDOWN_WINDOW_GPU = 10 SCALEDOWN_WINDOW_CPU = 300 TIMEOUT_SECONDS = 86400 HEARTBEAT_SECONDS = 10 STATUS_FILE_PATH = "/data/job_status.json" LOG_DIR_DEFAULT = "/data/logs" RESULTS_DIR_DEFAULT = "/data/results" PLOTS_DIR_DEFAULT = "/data/plots" WEIGHTS_DIR_DEFAULT = "/data/weights" EMBED_DIR_DEFAULT = "/data/embeddings" DOWNLOAD_DIR_DEFAULT = "/data/downloads" def _build_image(): image = ( modal.Image.debian_slim(python_version="3.10") .apt_install("git", "wget", "curl") .run_commands("pip install --upgrade pip setuptools") ) req_file_path = "requirements.txt" if (PROJECT_ROOT / req_file_path).exists(): image = image.add_local_file(req_file_path, "/tmp/requirements.txt", copy=True) image = image.run_commands("pip install -r /tmp/requirements.txt") else: image = image.run_commands("pip install torch transformers datasets") src_dir_path = "src" if (PROJECT_ROOT / src_dir_path).exists(): image = image.add_local_dir(src_dir_path, "/root/src", copy=True) readme_file_path = "README.md" if (PROJECT_ROOT / readme_file_path).exists(): image = image.add_local_file(readme_file_path, "/root/README.md", copy=True) image = image.env( { "TF_CPP_MIN_LOG_LEVEL": "2", "TF_ENABLE_ONEDNN_OPTS": "0", "TOKENIZERS_PARALLELISM": "true", "CUBLAS_WORKSPACE_CONFIG": ":4096:8", } ) return image app = modal.App(APP_NAME) image = _build_image() volume = modal.Volume.from_name("protify-data", create_if_missing=True) _status_lock = threading.Lock() def _now_utc_iso() -> str: return datetime.now(timezone.utc).isoformat() def _generate_job_id() -> str: random_letters = "".join(random.choices(string.ascii_uppercase, k=4)) date_str = datetime.now().strftime("%Y-%m-%d-%H-%M") return f"{date_str}_{random_letters}" def _safe_read_json(json_path: str) -> Dict[str, Any]: if not os.path.exists(json_path): return {} try: with open(json_path, "r", encoding="utf-8") as file: return json.load(file) except Exception: return {} def _safe_write_json(json_path: str, payload: Dict[str, Any]) -> None: os.makedirs(os.path.dirname(json_path), exist_ok=True) with open(json_path, "w", encoding="utf-8") as file: json.dump(payload, file, indent=2) def _update_job_status(job_id: str, patch: Dict[str, Any]) -> Dict[str, Any]: with _status_lock: status_store = _safe_read_json(STATUS_FILE_PATH) if job_id not in status_store: status_store[job_id] = { "job_id": job_id, "status": "PENDING", "phase": "created", "created_at_utc": _now_utc_iso(), "updated_at_utc": _now_utc_iso(), } status_store[job_id].update(patch) status_store[job_id]["updated_at_utc"] = _now_utc_iso() _safe_write_json(STATUS_FILE_PATH, status_store) volume.commit() return status_store[job_id] def _infer_phase_from_line(line: str, current_phase: str) -> str: lowered = line.lower() if "loading and preparing datasets" in lowered or "getting data" in lowered: return "data_loading" if "computing embeddings" in lowered or "saving embeddings" in lowered or "download embeddings" in lowered: return "embedding" if "starting training" in lowered or "training probe" in lowered or "run_wandb_hyperopt" in lowered: return "training" if "proteingym" in lowered: return "proteingym" if "generating visualization plots" in lowered: return "plotting" if "successfully saved model to huggingface hub" in lowered: return "pushing_to_hub" return current_phase def _tail_text(text: str, max_chars: int) -> str: if len(text) <= max_chars: return text return text[-max_chars:] def _fix_paths(config_obj: Any) -> Any: if isinstance(config_obj, dict): for key in list(config_obj.keys()): value = config_obj[key] if isinstance(value, str): if value.startswith("data/") or value.startswith("local_data/"): config_obj[key] = f"/data/{value.split('/', 1)[1]}" elif key.endswith("_dir") and (not os.path.isabs(value)): config_obj[key] = f"/data/{value}" elif isinstance(value, list): config_obj[key] = [_fix_paths(item) for item in value] elif isinstance(value, dict): config_obj[key] = _fix_paths(value) elif isinstance(config_obj, list): return [_fix_paths(item) for item in config_obj] return config_obj def _prepare_config(config: Dict[str, Any]) -> Dict[str, Any]: config_copy = dict(config) config_copy = _fix_paths(config_copy) if ("log_dir" not in config_copy) or (not config_copy["log_dir"]): config_copy["log_dir"] = LOG_DIR_DEFAULT if ("results_dir" not in config_copy) or (not config_copy["results_dir"]): config_copy["results_dir"] = RESULTS_DIR_DEFAULT if ("model_save_dir" not in config_copy) or (not config_copy["model_save_dir"]): config_copy["model_save_dir"] = WEIGHTS_DIR_DEFAULT if ("embedding_save_dir" not in config_copy) or (not config_copy["embedding_save_dir"]): config_copy["embedding_save_dir"] = EMBED_DIR_DEFAULT if ("plots_dir" not in config_copy) or (not config_copy["plots_dir"]): config_copy["plots_dir"] = PLOTS_DIR_DEFAULT if ("download_dir" not in config_copy) or (not config_copy["download_dir"]): config_copy["download_dir"] = DOWNLOAD_DIR_DEFAULT if "replay_path" not in config_copy: config_copy["replay_path"] = None if "pretrained_probe_path" not in config_copy: config_copy["pretrained_probe_path"] = None if "hf_home" not in config_copy: config_copy["hf_home"] = None path_keys = ["log_dir", "results_dir", "model_save_dir", "embedding_save_dir", "plots_dir", "download_dir"] for path_key in path_keys: os.makedirs(config_copy[path_key], exist_ok=True) return config_copy def _execute_protify_job( config: Dict[str, Any], hf_token: Optional[str] = None, wandb_api_key: Optional[str] = None, synthyra_api_key: Optional[str] = None, job_id: Optional[str] = None, gpu_type: Optional[str] = None, timeout_seconds: int = TIMEOUT_SECONDS, ) -> Dict[str, Any]: if job_id is None: job_id = _generate_job_id() selected_gpu = gpu_type if gpu_type in AVAILABLE_GPUS else PROTIFY_DEFAULT_GPU _update_job_status( job_id, { "status": "RUNNING", "phase": "startup", "gpu_type": selected_gpu, "last_heartbeat_utc": _now_utc_iso(), "started_at_utc": _now_utc_iso(), "error": None, }, ) active_hf_token = hf_token if active_hf_token is None: active_hf_token = os.environ.get("HF_TOKEN") if active_hf_token is not None: try: from huggingface_hub import login os.environ["HF_TOKEN"] = active_hf_token login(active_hf_token) except Exception: pass prepared_config = _prepare_config(config) log_file_path = os.path.join(prepared_config["log_dir"], f"{job_id}.txt") _update_job_status( job_id, { "log_file_path": log_file_path, "results_dir": prepared_config["results_dir"], "plots_dir": prepared_config["plots_dir"], }, ) run_dir = Path("/tmp/protify_run") / job_id run_dir.mkdir(parents=True, exist_ok=True) config_path = run_dir / "config.yaml" config_to_dump = dict(prepared_config) config_to_dump["hf_token"] = None config_to_dump["wandb_api_key"] = None config_to_dump["synthyra_api_key"] = None with open(config_path, "w", encoding="utf-8") as config_file: yaml.dump(config_to_dump, config_file, default_flow_style=False, allow_unicode=True, sort_keys=False) command = ["python", "-u", "main.py", "--yaml_path", str(config_path)] if active_hf_token is not None: command.extend(["--hf_token", active_hf_token]) if wandb_api_key is not None: command.extend(["--wandb_api_key", wandb_api_key]) if synthyra_api_key is not None: command.extend(["--synthyra_api_key", synthyra_api_key]) process_env = os.environ.copy() process_env["PYTHONPATH"] = "/root/src" process_env["WORKING_DIR"] = "/root" process_env["PYTHONUNBUFFERED"] = "1" process_env["PROTIFY_JOB_ID"] = job_id process_env["CUDA_VISIBLE_DEVICES"] = "0" if active_hf_token is not None: process_env["HF_TOKEN"] = active_hf_token if wandb_api_key is not None: process_env["WANDB_API_KEY"] = wandb_api_key os.makedirs(os.path.dirname(log_file_path), exist_ok=True) with open(log_file_path, "w", encoding="utf-8") as log_file: log_file.write(f"[{_now_utc_iso()}] Starting job {job_id}\n") log_file.write(f"GPU={selected_gpu}\n") log_file.write(f"Command={' '.join(command)}\n") volume.commit() stdout_lines = [] stderr_lines = [] log_lock = threading.Lock() phase_state = {"phase": "startup"} def append_log(log_line: str) -> None: with log_lock: with open(log_file_path, "a", encoding="utf-8", errors="ignore") as log_file: log_file.write(log_line + "\n") def stream_output(pipe, output_list, prefix: str = ""): try: for line in iter(pipe.readline, ""): if not line: continue clean_line = line.rstrip("\n") full_line = f"{prefix}{clean_line}" output_list.append(full_line) phase_state["phase"] = _infer_phase_from_line(clean_line, phase_state["phase"]) append_log(full_line) print(full_line, flush=True) finally: pipe.close() timed_out = False process = None try: process = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, cwd="/root/src/protify", env=process_env, ) stdout_thread = threading.Thread(target=stream_output, args=(process.stdout, stdout_lines, ""), daemon=True) stderr_thread = threading.Thread(target=stream_output, args=(process.stderr, stderr_lines, "[STDERR] "), daemon=True) stdout_thread.start() stderr_thread.start() max_runtime_seconds = timeout_seconds if max_runtime_seconds > TIMEOUT_SECONDS - 60: max_runtime_seconds = TIMEOUT_SECONDS - 60 start_time = time.time() last_heartbeat = 0.0 while process.poll() is None: now = time.time() if now - start_time > max_runtime_seconds: timed_out = True process.kill() break if now - last_heartbeat >= HEARTBEAT_SECONDS: _update_job_status( job_id, { "status": "RUNNING", "phase": phase_state["phase"], "last_heartbeat_utc": _now_utc_iso(), }, ) last_heartbeat = now time.sleep(1) stdout_thread.join(timeout=5) stderr_thread.join(timeout=5) return_code = process.returncode if process is not None else -1 stdout_text = "\n".join(stdout_lines) stderr_text = "\n".join(stderr_lines) if timed_out: _update_job_status( job_id, { "status": "TIMEOUT", "phase": "timeout", "last_heartbeat_utc": _now_utc_iso(), "error": f"Process timed out after {max_runtime_seconds} seconds.", "exit_code": -1, "finished_at_utc": _now_utc_iso(), }, ) return { "success": False, "job_id": job_id, "status": "TIMEOUT", "error": f"Process timed out after {max_runtime_seconds} seconds.", "stdout": _tail_text(stdout_text, 5000), } if return_code != 0: _update_job_status( job_id, { "status": "FAILED", "phase": "failed", "last_heartbeat_utc": _now_utc_iso(), "error": _tail_text(stderr_text, 5000) if stderr_text else "Unknown subprocess error.", "exit_code": return_code, "finished_at_utc": _now_utc_iso(), }, ) return { "success": False, "job_id": job_id, "status": "FAILED", "error": _tail_text(stderr_text, 5000) if stderr_text else "Unknown subprocess error.", "stdout": _tail_text(stdout_text, 5000), } _update_job_status( job_id, { "status": "SUCCESS", "phase": "completed", "last_heartbeat_utc": _now_utc_iso(), "error": None, "exit_code": return_code, "finished_at_utc": _now_utc_iso(), }, ) return { "success": True, "job_id": job_id, "status": "SUCCESS", "stdout": _tail_text(stdout_text, 5000), } except Exception as error: _update_job_status( job_id, { "status": "FAILED", "phase": "exception", "last_heartbeat_utc": _now_utc_iso(), "error": str(error), "exit_code": -1, "finished_at_utc": _now_utc_iso(), }, ) return { "success": False, "job_id": job_id, "status": "FAILED", "error": str(error), "stdout": "", } @app.function( image=image, gpu="H200", volumes={"/data": volume}, memory=(GPU_MEMORY_MIN, GPU_MEMORY_MAX), cpu=(GPU_CPU_MIN, GPU_CPU_MAX), max_containers=MAX_CONTAINERS_GPU, scaledown_window=SCALEDOWN_WINDOW_GPU, timeout=TIMEOUT_SECONDS, ) def run_protify_job_h200( config: Dict[str, Any], hf_token: Optional[str] = None, wandb_api_key: Optional[str] = None, synthyra_api_key: Optional[str] = None, job_id: Optional[str] = None, gpu_type: Optional[str] = None, timeout_seconds: int = TIMEOUT_SECONDS, ) -> Dict[str, Any]: return _execute_protify_job(config, hf_token, wandb_api_key, synthyra_api_key, job_id, gpu_type, timeout_seconds) @app.function( image=image, gpu="H100", volumes={"/data": volume}, memory=(GPU_MEMORY_MIN, GPU_MEMORY_MAX), cpu=(GPU_CPU_MIN, GPU_CPU_MAX), max_containers=MAX_CONTAINERS_GPU, scaledown_window=SCALEDOWN_WINDOW_GPU, timeout=TIMEOUT_SECONDS, ) def run_protify_job_h100( config: Dict[str, Any], hf_token: Optional[str] = None, wandb_api_key: Optional[str] = None, synthyra_api_key: Optional[str] = None, job_id: Optional[str] = None, gpu_type: Optional[str] = None, timeout_seconds: int = TIMEOUT_SECONDS, ) -> Dict[str, Any]: return _execute_protify_job(config, hf_token, wandb_api_key, synthyra_api_key, job_id, gpu_type, timeout_seconds) @app.function( image=image, gpu="A100-80GB", volumes={"/data": volume}, memory=(GPU_MEMORY_MIN, GPU_MEMORY_MAX), cpu=(GPU_CPU_MIN, GPU_CPU_MAX), max_containers=MAX_CONTAINERS_GPU, scaledown_window=SCALEDOWN_WINDOW_GPU, timeout=TIMEOUT_SECONDS, ) def run_protify_job_a100_80gb( config: Dict[str, Any], hf_token: Optional[str] = None, wandb_api_key: Optional[str] = None, synthyra_api_key: Optional[str] = None, job_id: Optional[str] = None, gpu_type: Optional[str] = None, timeout_seconds: int = TIMEOUT_SECONDS, ) -> Dict[str, Any]: return _execute_protify_job(config, hf_token, wandb_api_key, synthyra_api_key, job_id, gpu_type, timeout_seconds) @app.function( image=image, gpu="A100", volumes={"/data": volume}, memory=(GPU_MEMORY_MIN, GPU_MEMORY_MAX), cpu=(GPU_CPU_MIN, GPU_CPU_MAX), max_containers=MAX_CONTAINERS_GPU, scaledown_window=SCALEDOWN_WINDOW_GPU, timeout=TIMEOUT_SECONDS, ) def run_protify_job_a100( config: Dict[str, Any], hf_token: Optional[str] = None, wandb_api_key: Optional[str] = None, synthyra_api_key: Optional[str] = None, job_id: Optional[str] = None, gpu_type: Optional[str] = None, timeout_seconds: int = TIMEOUT_SECONDS, ) -> Dict[str, Any]: return _execute_protify_job(config, hf_token, wandb_api_key, synthyra_api_key, job_id, gpu_type, timeout_seconds) @app.function( image=image, gpu="L40S", volumes={"/data": volume}, memory=(GPU_MEMORY_MIN, GPU_MEMORY_MAX), cpu=(GPU_CPU_MIN, GPU_CPU_MAX), max_containers=MAX_CONTAINERS_GPU, scaledown_window=SCALEDOWN_WINDOW_GPU, timeout=TIMEOUT_SECONDS, ) def run_protify_job_l40s( config: Dict[str, Any], hf_token: Optional[str] = None, wandb_api_key: Optional[str] = None, synthyra_api_key: Optional[str] = None, job_id: Optional[str] = None, gpu_type: Optional[str] = None, timeout_seconds: int = TIMEOUT_SECONDS, ) -> Dict[str, Any]: return _execute_protify_job(config, hf_token, wandb_api_key, synthyra_api_key, job_id, gpu_type, timeout_seconds) @app.function( image=image, gpu="A10", volumes={"/data": volume}, memory=(GPU_MEMORY_MIN, GPU_MEMORY_MAX), cpu=(GPU_CPU_MIN, GPU_CPU_MAX), max_containers=MAX_CONTAINERS_GPU, scaledown_window=SCALEDOWN_WINDOW_GPU, timeout=TIMEOUT_SECONDS, ) def run_protify_job_a10( config: Dict[str, Any], hf_token: Optional[str] = None, wandb_api_key: Optional[str] = None, synthyra_api_key: Optional[str] = None, job_id: Optional[str] = None, gpu_type: Optional[str] = None, timeout_seconds: int = TIMEOUT_SECONDS, ) -> Dict[str, Any]: return _execute_protify_job(config, hf_token, wandb_api_key, synthyra_api_key, job_id, gpu_type, timeout_seconds) @app.function( image=image, gpu="L4", volumes={"/data": volume}, memory=(GPU_MEMORY_MIN, GPU_MEMORY_MAX), cpu=(GPU_CPU_MIN, GPU_CPU_MAX), max_containers=MAX_CONTAINERS_GPU, scaledown_window=SCALEDOWN_WINDOW_GPU, timeout=TIMEOUT_SECONDS, ) def run_protify_job_l4( config: Dict[str, Any], hf_token: Optional[str] = None, wandb_api_key: Optional[str] = None, synthyra_api_key: Optional[str] = None, job_id: Optional[str] = None, gpu_type: Optional[str] = None, timeout_seconds: int = TIMEOUT_SECONDS, ) -> Dict[str, Any]: return _execute_protify_job(config, hf_token, wandb_api_key, synthyra_api_key, job_id, gpu_type, timeout_seconds) @app.function( image=image, gpu="T4", volumes={"/data": volume}, memory=(GPU_MEMORY_MIN, GPU_MEMORY_MAX), cpu=(GPU_CPU_MIN, GPU_CPU_MAX), max_containers=MAX_CONTAINERS_GPU, scaledown_window=SCALEDOWN_WINDOW_GPU, timeout=TIMEOUT_SECONDS, ) def run_protify_job_t4( config: Dict[str, Any], hf_token: Optional[str] = None, wandb_api_key: Optional[str] = None, synthyra_api_key: Optional[str] = None, job_id: Optional[str] = None, gpu_type: Optional[str] = None, timeout_seconds: int = TIMEOUT_SECONDS, ) -> Dict[str, Any]: return _execute_protify_job(config, hf_token, wandb_api_key, synthyra_api_key, job_id, gpu_type, timeout_seconds) gpu_functions = { "H200": run_protify_job_h200, "H100": run_protify_job_h100, "A100-80GB": run_protify_job_a100_80gb, "A100": run_protify_job_a100, "L40S": run_protify_job_l40s, "A10": run_protify_job_a10, "L4": run_protify_job_l4, "T4": run_protify_job_t4, } @app.function( image=image, volumes={"/data": volume}, memory=(CPU_MEMORY_MIN, CPU_MEMORY_MAX), cpu=(CPU_COUNT_MIN, CPU_COUNT_MAX), max_containers=MAX_CONTAINERS_CPU, scaledown_window=SCALEDOWN_WINDOW_CPU, ) def submit_protify_job( config: Dict[str, Any], gpu_type: str = PROTIFY_DEFAULT_GPU, hf_token: Optional[str] = None, wandb_api_key: Optional[str] = None, synthyra_api_key: Optional[str] = None, timeout_seconds: int = TIMEOUT_SECONDS, job_id: Optional[str] = None, ) -> Dict[str, Any]: if job_id is None: job_id = _generate_job_id() selected_gpu = gpu_type if gpu_type in AVAILABLE_GPUS else PROTIFY_DEFAULT_GPU _update_job_status( job_id, { "status": "PENDING", "phase": "queued", "gpu_type": selected_gpu, "last_heartbeat_utc": _now_utc_iso(), "error": None, }, ) selected_gpu_function = gpu_functions[selected_gpu] handle = selected_gpu_function.spawn( config=config, hf_token=hf_token, wandb_api_key=wandb_api_key, synthyra_api_key=synthyra_api_key, job_id=job_id, gpu_type=selected_gpu, timeout_seconds=timeout_seconds, ) function_call_id = handle.object_id _update_job_status( job_id, { "status": "RUNNING", "phase": "queued", "function_call_id": function_call_id, "last_heartbeat_utc": _now_utc_iso(), }, ) return { "success": True, "job_id": job_id, "function_call_id": function_call_id, "status": "RUNNING", "gpu_type": selected_gpu, } @app.function( image=image, volumes={"/data": volume}, memory=(CPU_MEMORY_MIN, CPU_MEMORY_MAX), cpu=(CPU_COUNT_MIN, CPU_COUNT_MAX), max_containers=MAX_CONTAINERS_CPU, scaledown_window=SCALEDOWN_WINDOW_CPU, ) def get_job_status(job_id: str) -> Dict[str, Any]: volume.reload() status_store = _safe_read_json(STATUS_FILE_PATH) if job_id not in status_store: return {"success": False, "job_id": job_id, "error": "Job ID not found."} job_status = status_store[job_id] heartbeat_age_seconds = None if "last_heartbeat_utc" in job_status and job_status["last_heartbeat_utc"]: try: heartbeat_time = datetime.fromisoformat(job_status["last_heartbeat_utc"]) heartbeat_age_seconds = (datetime.now(timezone.utc) - heartbeat_time).total_seconds() except Exception: heartbeat_age_seconds = None job_status["heartbeat_age_seconds"] = heartbeat_age_seconds job_status["success"] = True return job_status @app.function( image=image, volumes={"/data": volume}, memory=(CPU_MEMORY_MIN, CPU_MEMORY_MAX), cpu=(CPU_COUNT_MIN, CPU_COUNT_MAX), max_containers=MAX_CONTAINERS_CPU, scaledown_window=SCALEDOWN_WINDOW_CPU, ) def get_job_log_tail(job_id: str, max_chars: int = 5000) -> Dict[str, Any]: volume.reload() status_store = _safe_read_json(STATUS_FILE_PATH) status_entry = status_store[job_id] if job_id in status_store else None if status_entry is not None and "log_file_path" in status_entry and status_entry["log_file_path"]: log_file_path = status_entry["log_file_path"] else: log_file_path = os.path.join(LOG_DIR_DEFAULT, f"{job_id}.txt") if not os.path.exists(log_file_path): if status_entry is None: return {"success": False, "job_id": job_id, "error": "Job ID not found.", "log_tail": ""} return {"success": True, "job_id": job_id, "log_tail": ""} with open(log_file_path, "r", encoding="utf-8", errors="ignore") as log_file: text = log_file.read() return {"success": True, "job_id": job_id, "log_tail": _tail_text(text, max_chars)} @app.function( image=image, volumes={"/data": volume}, memory=(CPU_MEMORY_MIN, CPU_MEMORY_MAX), cpu=(CPU_COUNT_MIN, CPU_COUNT_MAX), max_containers=MAX_CONTAINERS_CPU, scaledown_window=SCALEDOWN_WINDOW_CPU, ) def get_job_log_delta(job_id: str, offset: int = 0, max_chars: int = 5000) -> Dict[str, Any]: volume.reload() if offset < 0: offset = 0 if max_chars <= 0: max_chars = 1 status_store = _safe_read_json(STATUS_FILE_PATH) status_entry = status_store[job_id] if job_id in status_store else None if status_entry is not None and "log_file_path" in status_entry and status_entry["log_file_path"]: log_file_path = status_entry["log_file_path"] else: log_file_path = os.path.join(LOG_DIR_DEFAULT, f"{job_id}.txt") if not os.path.exists(log_file_path): return { "success": True, "job_id": job_id, "file_exists": False, "chunk": "", "next_offset": offset, "file_size": 0, } with open(log_file_path, "r", encoding="utf-8", errors="ignore") as log_file: text = log_file.read() file_size = len(text) if offset > file_size: offset = file_size end_offset = offset + max_chars if end_offset > file_size: end_offset = file_size chunk = text[offset:end_offset] return { "success": True, "job_id": job_id, "file_exists": True, "chunk": chunk, "next_offset": end_offset, "file_size": file_size, } @app.function( image=image, volumes={"/data": volume}, memory=(CPU_MEMORY_MIN, CPU_MEMORY_MAX), cpu=(CPU_COUNT_MIN, CPU_COUNT_MAX), max_containers=MAX_CONTAINERS_CPU, scaledown_window=SCALEDOWN_WINDOW_CPU, ) def delete_modal_embeddings() -> Dict[str, Any]: volume.reload() embedding_dir = Path(EMBED_DIR_DEFAULT) if not embedding_dir.exists(): return { "success": True, "message": f"Embedding directory does not exist: {EMBED_DIR_DEFAULT}", "deleted_files": 0, "deleted_dirs": 0, } deleted_files = 0 deleted_dirs = 0 for path in embedding_dir.glob("*"): if path.is_file(): path.unlink() deleted_files += 1 elif path.is_dir(): shutil.rmtree(path) deleted_dirs += 1 volume.commit() return { "success": True, "message": f"Deleted modal embedding cache contents ({deleted_files} files, {deleted_dirs} directories).", "deleted_files": deleted_files, "deleted_dirs": deleted_dirs, } @app.function( image=image, volumes={"/data": volume}, memory=(CPU_MEMORY_MIN, CPU_MEMORY_MAX), cpu=(CPU_COUNT_MIN, CPU_COUNT_MAX), max_containers=MAX_CONTAINERS_CPU, scaledown_window=SCALEDOWN_WINDOW_CPU, ) def cancel_protify_job(function_call_id: str, job_id: Optional[str] = None) -> Dict[str, Any]: function_call = modal.FunctionCall.from_id(function_call_id) function_call.cancel() if job_id is not None: _update_job_status( job_id, { "status": "TERMINATED", "phase": "cancelled", "last_heartbeat_utc": _now_utc_iso(), "finished_at_utc": _now_utc_iso(), }, ) return {"success": True, "function_call_id": function_call_id, "job_id": job_id} @app.function( image=image, volumes={"/data": volume}, memory=(CPU_MEMORY_MIN, CPU_MEMORY_MAX), cpu=(CPU_COUNT_MIN, CPU_COUNT_MAX), max_containers=MAX_CONTAINERS_CPU, scaledown_window=SCALEDOWN_WINDOW_CPU, ) def get_results(job_id: str) -> Dict[str, Any]: volume.reload() image_extensions = {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".svg", ".webp"} results = {"success": True, "files": {}, "images": {}} results_dir = Path(RESULTS_DIR_DEFAULT) plots_dir = Path(PLOTS_DIR_DEFAULT) logs_dir = Path(LOG_DIR_DEFAULT) collected_files = set() result_file = results_dir / f"{job_id}.tsv" if result_file.exists(): collected_files.add(result_file) log_file = logs_dir / f"{job_id}.txt" if log_file.exists(): collected_files.add(log_file) plot_dir = plots_dir / job_id if plot_dir.exists() and plot_dir.is_dir(): for file_path in plot_dir.rglob("*"): if file_path.is_file(): collected_files.add(file_path) for file_path in collected_files: relative_path = str(file_path.relative_to(Path("/data"))) suffix = file_path.suffix.lower() try: if suffix in image_extensions: with open(file_path, "rb") as image_file: encoded = base64.b64encode(image_file.read()).decode("utf-8") mime_type = f"image/{suffix[1:]}" if suffix != ".svg" else "image/svg+xml" results["images"][relative_path] = {"data": encoded, "mime_type": mime_type} else: with open(file_path, "r", encoding="utf-8", errors="ignore") as text_file: results["files"][relative_path] = text_file.read() except Exception as error: if suffix in image_extensions: results["images"][relative_path] = {"error": str(error)} else: results["files"][relative_path] = f"Error reading file: {error}" return results @app.function( image=image, volumes={"/data": volume}, memory=(CPU_MEMORY_MIN, CPU_MEMORY_MAX), cpu=(CPU_COUNT_MIN, CPU_COUNT_MAX), max_containers=MAX_CONTAINERS_CPU, scaledown_window=SCALEDOWN_WINDOW_CPU, ) def list_jobs() -> Dict[str, Any]: volume.reload() status_store = _safe_read_json(STATUS_FILE_PATH) jobs = [] for job_id in status_store: jobs.append(status_store[job_id]) jobs.sort(key=lambda item: item["job_id"], reverse=True) return {"success": True, "jobs": jobs} if __name__ == "__main__": with app.run(): pass