Spaces:
Running
Running
| # ========================================================== | |
| # ISPG PROGRESS TRACKER MODULE (FINAL CLEAN VERSION) | |
| # FILE: models/progress_tracker.py | |
| # ========================================================== | |
| # PURPOSE: | |
| # - Manage pipeline progress JSON for processing UI | |
| # - Support stepper display (8 steps) | |
| # - Prevent race conditions / corrupted JSON | |
| # - Attach outputs after pipeline done | |
| # | |
| # OUTPUT JSON FORMAT: | |
| # { | |
| # "task_id": "...", | |
| # "paper_id": "...", | |
| # "filename": "...", | |
| # "status": "running/success/failed", | |
| # "step": 1, | |
| # "total_steps": 8, | |
| # "percent": 25, | |
| # "message": "...", | |
| # "started": true, | |
| # "outputs": {...}, | |
| # "error": "...", | |
| # "traceback": "...", | |
| # "updated_at": "..." | |
| # } | |
| # ========================================================== | |
| import os | |
| import json | |
| from datetime import datetime | |
| from typing import Dict, Any | |
# Default folder that holds one "<task_id>.json" progress file per task.
# The path is relative, so it resolves against the process's current
# working directory at the time each file operation runs.
PROGRESS_FOLDER = os.path.join("static", "progress")
| # ========================================================== | |
| # INTERNAL HELPERS | |
| # ========================================================== | |
def _progress_file(task_id: str) -> str:
    """Return the path of the progress JSON file for *task_id*.

    The progress folder is created on demand, so callers can write to the
    returned path without preparing the directory themselves.
    """
    # NOTE(review): task_id is interpolated into the file name as-is; confirm
    # callers never pass values containing path separators.
    folder = PROGRESS_FOLDER
    os.makedirs(folder, exist_ok=True)
    json_name = f"{task_id}.json"
    return os.path.join(folder, json_name)
def _safe_write_json(path: str, data: Dict[str, Any]):
    """Atomically write *data* as JSON to *path*.

    The payload is first written to a uniquely named temporary file next to
    *path* and then moved into place with ``os.replace``, so readers never
    observe a half-written file.

    Args:
        path: Destination file. Its parent directory is created if missing;
            a bare file name (no directory part) is written to the current
            working directory.
        data: JSON-serializable mapping to store.

    Raises:
        TypeError, ValueError: If *data* cannot be serialized to JSON.
        OSError: If the file cannot be written or moved into place.
    """
    import uuid  # local import keeps this helper self-contained

    # os.makedirs("") raises, so only create a directory when there is one.
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)

    # A per-call unique name stops concurrent writers from sharing (and
    # truncating) the same temporary file.
    temp_path = f"{path}.{uuid.uuid4().hex}.tmp"
    committed = False
    try:
        with open(temp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=4, ensure_ascii=False)
        os.replace(temp_path, path)
        committed = True
    finally:
        if not committed:
            # Do not leave a stray temp file behind when the write fails;
            # the original exception keeps propagating.
            try:
                os.remove(temp_path)
            except OSError:
                pass
def _safe_read_json(path: str):
    """Load the JSON object stored at *path*, or return ``None``.

    ``None`` is returned when the file is missing or unreadable, when its
    content is not valid JSON, or when the decoded value is not a dict
    (every caller in this module indexes the result as a mapping).

    Args:
        path: File to read.

    Returns:
        The decoded dict, or ``None`` if no usable progress record exists.
    """
    try:
        # Open directly instead of checking existence first: the file may be
        # removed between the check and the open.
        with open(path, "r", encoding="utf-8") as f:
            payload = json.load(f)
    except (OSError, ValueError, RecursionError):
        # OSError: missing/unreadable file. ValueError: malformed JSON or
        # undecodable bytes. RecursionError: pathologically nested input.
        # KeyboardInterrupt/SystemExit are deliberately not caught.
        return None
    return payload if isinstance(payload, dict) else None
def _now_str():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    moment = datetime.now()
    return f"{moment:%Y-%m-%d %H:%M:%S}"
| # ========================================================== | |
| # INIT PROGRESS | |
| # ========================================================== | |
def init_progress(task_id: str, paper_id: str = "", filename: str = ""):
    """Write a fresh progress record for *task_id* and return it.

    The record starts at step 0 of 8 and 0 percent, with status "running",
    the started flag off, and empty outputs/error/traceback fields. Any
    existing progress file for the task is overwritten.
    """
    target = _progress_file(task_id)
    record = dict(
        task_id=task_id,
        paper_id=paper_id,
        filename=filename,
        status="running",
        step=0,
        total_steps=8,
        percent=0,
        message="Waiting to start...",
        started=False,
        outputs={},
        error="",
        traceback="",
        updated_at=_now_str(),
    )
    _safe_write_json(target, record)
    return record
| # ========================================================== | |
| # RESET PROGRESS | |
| # ========================================================== | |
def reset_progress(task_id: str):
    """Delete the progress file for *task_id*, if there is one.

    Removal is best-effort: a missing file or any other filesystem error is
    ignored, so this never raises for I/O reasons. Nothing is returned.
    """
    path = _progress_file(task_id)
    try:
        # Remove directly instead of checking existence first; the file can
        # vanish between the check and the removal.
        os.remove(path)
    except OSError:
        # Covers FileNotFoundError and permission problems alike. Only OS
        # errors are suppressed, so KeyboardInterrupt/SystemExit propagate.
        pass
| # ========================================================== | |
| # READ PROGRESS | |
| # ========================================================== | |
def read_progress(task_id: str):
    """Return the stored progress record for *task_id*.

    The result is whatever ``_safe_read_json`` yields for the task's
    progress file, i.e. ``None`` when the file is absent or unreadable.
    """
    return _safe_read_json(_progress_file(task_id))
| # ========================================================== | |
| # UPDATE PROGRESS | |
| # ========================================================== | |
def update_progress(task_id: str, step: int, percent: int, message: str):
    """Record the current pipeline position for *task_id*.

    Args:
        task_id: Task whose progress file is updated. A new record is
            created via ``init_progress`` when none can be read.
        step: Current step number; values that cannot be converted to
            ``int`` are stored as 0.
        percent: Completion percentage; unconvertible values become 0 and
            the result is clamped to the range 0..100.
        message: Status text; stored as ``str(message)``.

    Returns:
        The updated progress record (dict) that was written to disk.
    """
    path = _progress_file(task_id)
    data = _safe_read_json(path)
    if not data:
        data = init_progress(task_id)

    # Normalize numeric inputs. Only conversion failures are tolerated
    # (None, non-numeric strings, inf/nan); anything else propagates.
    try:
        step = int(step)
    except (TypeError, ValueError, OverflowError):
        step = 0
    try:
        percent = int(percent)
    except (TypeError, ValueError, OverflowError):
        percent = 0
    percent = max(0, min(100, percent))

    data["step"] = step
    data["percent"] = percent
    data["message"] = str(message)

    # A finished task keeps its terminal status; anything else is running.
    if data.get("status") not in ("success", "failed"):
        data["status"] = "running"

    data["updated_at"] = _now_str()
    _safe_write_json(path, data)
    return data
| # ========================================================== | |
| # MARK SUCCESS | |
| # ========================================================== | |
def mark_success(task_id: str, message: str = "Processing complete."):
    """Mark the task as successfully finished.

    Sets status to "success", percent to 100 and step to the record's
    ``total_steps`` (8 when the record has none). A new record is created
    via ``init_progress`` when none can be read.

    Args:
        task_id: Task whose progress file is updated.
        message: Final status text; stored as ``str(message)``, matching how
            ``update_progress`` and ``mark_failed`` store their text.

    Returns:
        The updated progress record (dict) that was written to disk.
    """
    path = _progress_file(task_id)
    data = _safe_read_json(path)
    if not data:
        data = init_progress(task_id)

    data["status"] = "success"
    data["step"] = data.get("total_steps", 8)
    data["percent"] = 100
    # Coerce to text so a non-string message cannot break JSON serialization.
    data["message"] = str(message)
    data["updated_at"] = _now_str()

    _safe_write_json(path, data)
    return data
| # ========================================================== | |
| # MARK FAILED | |
| # ========================================================== | |
def mark_failed(task_id: str, error_message: str, traceback_text: str = ""):
    """Flag the task as failed and store the error details.

    The record's status becomes "failed"; the error text, the traceback text
    and a user-facing failure message are stored as strings. Step and percent
    are left as they were. Returns the record that was written.
    """
    target = _progress_file(task_id)
    record = _safe_read_json(target) or init_progress(task_id)

    changes = (
        ("status", "failed"),
        ("error", str(error_message)),
        ("traceback", str(traceback_text)),
        ("message", "❌ Failed: " + str(error_message)),
        ("updated_at", _now_str()),
    )
    for field, value in changes:
        record[field] = value

    _safe_write_json(target, record)
    return record
| # ========================================================== | |
| # ATTACH OUTPUTS | |
| # ========================================================== | |
def attach_outputs(task_id: str, **kwargs):
    """Merge keyword arguments into the task's "outputs" mapping.

    Example:
        attach_outputs(task_id,
                       paper_id="paper123",
                       poster_json_path="static/poster_json/paper123.json")

    Existing output keys are overwritten by same-named keyword arguments;
    a missing or non-dict "outputs" entry is replaced by a fresh dict first.
    Returns the record that was written.
    """
    target = _progress_file(task_id)
    record = _safe_read_json(target) or init_progress(task_id)

    outputs = record["outputs"] if "outputs" in record else None
    if not isinstance(outputs, dict):
        outputs = record["outputs"] = {}
    outputs.update(kwargs)

    record["updated_at"] = _now_str()
    _safe_write_json(target, record)
    return record
| # ========================================================== | |
| # SET STARTED FLAG | |
| # ========================================================== | |
def set_started(task_id: str, started: bool = True):
    """Store the truthiness of *started* as the task's "started" flag.

    A new record is created via ``init_progress`` when none can be read.
    Returns the record that was written.
    """
    target = _progress_file(task_id)
    record = _safe_read_json(target) or init_progress(task_id)

    record["started"] = True if started else False
    record["updated_at"] = _now_str()

    _safe_write_json(target, record)
    return record