# ==========================================================
# ISPG PROGRESS TRACKER MODULE (FINAL CLEAN VERSION)
# FILE: models/progress_tracker.py
# ==========================================================
# PURPOSE:
# - Manage pipeline progress JSON for processing UI
# - Support stepper display (8 steps)
# - Avoid torn / corrupted JSON: every write goes to a uniquely named
#   temp file that is atomically moved into place with os.replace.
#   NOTE: read-modify-write updates to the SAME task from several
#   threads/processes are not serialized; the last writer wins.
# - Attach outputs after pipeline done
#
# OUTPUT JSON FORMAT:
# {
#   "task_id": "...",
#   "paper_id": "...",
#   "filename": "...",
#   "status": "running/success/failed",
#   "step": 1,
#   "total_steps": 8,
#   "percent": 25,
#   "message": "...",
#   "started": true,
#   "outputs": {...},
#   "error": "...",
#   "traceback": "...",
#   "updated_at": "..."
# }
# ==========================================================

import os
import json
import uuid
from datetime import datetime
from typing import Any, Dict, Optional, Tuple

# DEFAULT PROGRESS FOLDER
PROGRESS_FOLDER = os.path.join("static", "progress")

# Number of pipeline steps shown by the stepper UI.
DEFAULT_TOTAL_STEPS = 8


# ==========================================================
# INTERNAL HELPERS
# ==========================================================

def _progress_file(task_id: str) -> str:
    """Return the JSON path for *task_id*, creating PROGRESS_FOLDER if needed."""
    # NOTE(review): task_id is joined into a filesystem path unchecked. If it can
    # come from a request (e.g. a URL parameter), validate it first to rule out
    # path traversal such as "../x".
    os.makedirs(PROGRESS_FOLDER, exist_ok=True)
    return os.path.join(PROGRESS_FOLDER, f"{task_id}.json")


def _safe_write_json(path: str, data: Dict[str, Any]) -> None:
    """Atomically write *data* as pretty-printed UTF-8 JSON to *path*.

    The JSON is first written to a uniquely named temp file in the same
    directory and then moved over *path* with ``os.replace``, so a reader sees
    either the previous file or the new one, never a half-written file.

    Raises:
        Whatever ``open``/``json.dump``/``os.replace`` raise; the temp file is
        removed before the exception propagates.
    """
    # dirname() is "" for a bare filename; os.makedirs("") would raise.
    directory = os.path.dirname(path) or "."
    os.makedirs(directory, exist_ok=True)

    # Unique temp name per write: a fixed "<path>.tmp" is shared by concurrent
    # writers of the same task, which can then overwrite or move each other's
    # temp file mid-write.
    temp_path = f"{path}.{os.getpid()}.{uuid.uuid4().hex}.tmp"
    try:
        with open(temp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=4, ensure_ascii=False)
        os.replace(temp_path, path)
    except BaseException:
        # Don't leave stray temp files behind (e.g. non-serializable outputs).
        try:
            os.remove(temp_path)
        except OSError:
            pass
        raise


def _safe_read_json(path: str) -> Optional[Dict[str, Any]]:
    """Return the JSON object stored at *path*, or None.

    None is returned when the file is missing or unreadable, is not valid
    UTF-8 JSON, or holds a JSON value that is not an object (callers index
    the result like a dict, so e.g. a list must not leak through).
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError, RecursionError):
        # OSError: missing / unreadable file.
        # ValueError: covers json.JSONDecodeError and UnicodeDecodeError.
        # RecursionError: pathologically deep nesting.
        return None
    return data if isinstance(data, dict) else None


def _now_str() -> str:
    """Return the current local time as ``YYYY-MM-DD HH:MM:SS``."""
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


def _to_int(value: Any, default: int = 0) -> int:
    """Convert *value* with ``int()``, returning *default* if that fails."""
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return default


def _load_or_init(task_id: str) -> Tuple[str, Dict[str, Any]]:
    """Return ``(path, record)`` for *task_id*.

    If no usable record exists, a fresh one is created via init_progress
    (without paper_id/filename, which are unknown here).
    """
    path = _progress_file(task_id)
    data = _safe_read_json(path)
    if not data:
        data = init_progress(task_id)
    return path, data


def _stamp_and_save(path: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Refresh ``updated_at``, persist *data* to *path*, and return it."""
    data["updated_at"] = _now_str()
    _safe_write_json(path, data)
    return data


# ==========================================================
# INIT PROGRESS
# ==========================================================

def init_progress(task_id: str, paper_id: str = "", filename: str = "") -> Dict[str, Any]:
    """
    Create progress file for new task.

    Any existing file for *task_id* is overwritten. The new record has
    status "running", step 0, percent 0 and started False.

    Returns:
        The record that was written.
    """
    path = _progress_file(task_id)

    data = {
        "task_id": task_id,
        "paper_id": paper_id,
        "filename": filename,
        "status": "running",
        "step": 0,
        "total_steps": DEFAULT_TOTAL_STEPS,
        "percent": 0,
        "message": "Waiting to start...",
        "started": False,
        "outputs": {},
        "error": "",
        "traceback": "",
        "updated_at": _now_str(),
    }

    _safe_write_json(path, data)
    return data


# ==========================================================
# RESET PROGRESS
# ==========================================================

def reset_progress(task_id: str) -> None:
    """
    Delete existing progress file if exists.

    Best-effort: a missing file is already "reset", and other OS errors
    (e.g. permissions) are ignored.
    """
    path = _progress_file(task_id)
    try:
        os.remove(path)
    except OSError:
        pass


# ==========================================================
# READ PROGRESS
# ==========================================================

def read_progress(task_id: str) -> Optional[Dict[str, Any]]:
    """Return the stored record for *task_id*, or None if missing/unreadable."""
    return _safe_read_json(_progress_file(task_id))


# ==========================================================
# UPDATE PROGRESS
# ==========================================================

def update_progress(task_id: str, step: int, percent: int, message: str) -> Dict[str, Any]:
    """
    Update pipeline progress.

    Args:
        step: Current step; values that int() rejects become 0.
        percent: Completion percentage; non-integers become 0 and the
            result is clamped to [0, 100].
        message: Stored as str(message).

    A record that is already "success" or "failed" keeps that status;
    otherwise the status is set to "running". A fresh record is created
    if none is readable.

    Returns:
        The updated record (also written to disk).
    """
    path, data = _load_or_init(task_id)

    data["step"] = _to_int(step)
    data["percent"] = min(max(_to_int(percent), 0), 100)
    data["message"] = str(message)

    # keep status running unless already success/failed
    if data.get("status") not in ("success", "failed"):
        data["status"] = "running"

    return _stamp_and_save(path, data)


# ==========================================================
# MARK SUCCESS
# ==========================================================

def mark_success(task_id: str, message: str = "Processing complete.") -> Dict[str, Any]:
    """Mark the task successful: step = total_steps, percent = 100.

    Returns:
        The updated record (also written to disk).
    """
    path, data = _load_or_init(task_id)

    data["status"] = "success"
    data["step"] = data.get("total_steps", DEFAULT_TOTAL_STEPS)
    data["percent"] = 100
    data["message"] = message

    return _stamp_and_save(path, data)


# ==========================================================
# MARK FAILED
# ==========================================================

def mark_failed(task_id: str, error_message: str, traceback_text: str = "") -> Dict[str, Any]:
    """Mark the task failed and record the error text and traceback.

    Step and percent are left as they were, so the UI can show where the
    pipeline stopped.

    Returns:
        The updated record (also written to disk).
    """
    path, data = _load_or_init(task_id)

    data["status"] = "failed"
    data["error"] = str(error_message)
    data["traceback"] = str(traceback_text)
    data["message"] = "❌ Failed: " + str(error_message)

    return _stamp_and_save(path, data)


# ==========================================================
# ATTACH OUTPUTS
# ==========================================================

def attach_outputs(task_id: str, **kwargs: Any) -> Dict[str, Any]:
    """
    Merge keyword arguments into the record's "outputs" dict.

    Existing keys are overwritten. A missing or non-dict "outputs" value is
    replaced by an empty dict first. Values must be JSON-serializable.

    Example:
        attach_outputs(task_id,
            paper_id="paper123",
            poster_json_path="static/poster_json/paper123.json"
        )

    Returns:
        The updated record (also written to disk).
    """
    path, data = _load_or_init(task_id)

    outputs = data.get("outputs")
    if not isinstance(outputs, dict):
        outputs = data["outputs"] = {}
    outputs.update(kwargs)

    return _stamp_and_save(path, data)


# ==========================================================
# SET STARTED FLAG
# ==========================================================

def set_started(task_id: str, started: bool = True) -> Dict[str, Any]:
    """Set the record's "started" flag to bool(started).

    Returns:
        The updated record (also written to disk).
    """
    path, data = _load_or_init(task_id)
    data["started"] = bool(started)
    return _stamp_and_save(path, data)