ispg-backend / models /progress_tracker.py
urestrange's picture
Upload 162 files
cc2c355 verified
Raw
History Blame Contribute Delete
6.71 kB
# ==========================================================
# ISPG PROGRESS TRACKER MODULE (FINAL CLEAN VERSION)
# FILE: models/progress_tracker.py
# ==========================================================
# PURPOSE:
# - Manage pipeline progress JSON for processing UI
# - Support stepper display (8 steps)
# - Prevent corrupted / half-written JSON via atomic file replace
#   (note: concurrent read-modify-write updates are NOT locked)
# - Attach outputs after pipeline done
#
# OUTPUT JSON FORMAT:
# {
# "task_id": "...",
# "paper_id": "...",
# "filename": "...",
# "status": "running/success/failed",
# "step": 1,
# "total_steps": 8,
# "percent": 25,
# "message": "...",
# "started": true,
# "outputs": {...},
# "error": "...",
# "traceback": "...",
# "updated_at": "..."
# }
# ==========================================================
import json
import os
import uuid
from datetime import datetime
from typing import Any, Dict
# Folder holding one "<task_id>.json" progress file per task. The path is
# relative, so it resolves against the process's current working directory.
PROGRESS_FOLDER: str = os.path.join("static", "progress")
# ==========================================================
# INTERNAL HELPERS
# ==========================================================
def _progress_file(task_id: str) -> str:
    """Return the path of the progress JSON file for ``task_id``.

    Side effect: PROGRESS_FOLDER is created on demand, so even read-only
    callers (e.g. read_progress) end up creating the directory.

    NOTE(review): ``task_id`` is joined into the path unchecked; if it can
    come from a request, validate it upstream to rule out path traversal.
    """
    folder = PROGRESS_FOLDER
    os.makedirs(folder, exist_ok=True)
    file_name = f"{task_id}.json"
    return os.path.join(folder, file_name)
def _safe_write_json(path: str, data: Dict[str, Any]):
    """Atomically write ``data`` as pretty-printed UTF-8 JSON to ``path``.

    The JSON is written to a uniquely named temporary file in the same
    directory and then moved over ``path`` with ``os.replace``. Readers
    therefore see either the previous file or the complete new one, never a
    half-written file.

    Args:
        path: Destination file path. Missing parent directories are created.
        data: JSON-serialisable mapping to write.

    Raises:
        TypeError / ValueError: if ``data`` cannot be serialised to JSON.
        OSError: on filesystem errors.
        In both cases the existing file at ``path`` is left untouched and
        the temporary file is removed before the exception propagates.
    """
    directory = os.path.dirname(path)
    if directory:
        # os.makedirs("") raises, so only create dirs when path has a dir part.
        os.makedirs(directory, exist_ok=True)

    # Use a unique temp name per call. A fixed "<path>.tmp" is shared by
    # concurrent writers of the same task: one writer's os.replace could
    # move (or another could clobber) a half-written temp file.
    temp_path = f"{path}.{uuid.uuid4().hex}.tmp"
    try:
        with open(temp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=4, ensure_ascii=False)
        os.replace(temp_path, path)
    except BaseException:
        # Best-effort cleanup so failed writes don't leave orphaned temp files.
        try:
            os.remove(temp_path)
        except OSError:
            pass
        raise
def _safe_read_json(path: str):
    """Load a progress JSON object from ``path``, tolerating bad files.

    Returns:
        The decoded dict, or ``None`` when the file is missing, unreadable,
        not valid UTF-8 JSON, or does not contain a JSON object. Callers in
        this module re-initialise the progress when they get ``None``.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # OSError covers a missing or unreadable file. Checking existence
        # first would race with concurrent deletes. ValueError covers
        # JSONDecodeError and UnicodeDecodeError. Unlike a bare ``except:``,
        # this lets KeyboardInterrupt / SystemExit propagate.
        return None
    # Every writer in this module stores a dict. Anything else is corrupt
    # and would crash callers that do ``data["step"] = ...``.
    return data if isinstance(data, dict) else None
def _now_str():
    """Return the current local time as ``YYYY-MM-DD HH:MM:SS`` (naive, no timezone)."""
    current = datetime.now()
    return f"{current:%Y-%m-%d %H:%M:%S}"
# ==========================================================
# INIT PROGRESS
# ==========================================================
def init_progress(task_id: str, paper_id: str = "", filename: str = "",
                  total_steps: int = 8):
    """Create (or overwrite) the progress file for a new task.

    The task starts with status ``"running"``, step 0, 0 % and ``started``
    False. Any previous progress file for ``task_id`` is replaced.

    Args:
        task_id: Task identifier; also the progress file's base name.
        paper_id: Optional paper identifier stored with the progress.
        filename: Optional file name stored with the progress.
        total_steps: Number of pipeline steps for the stepper display.
            Defaults to 8, the previously hard-coded value. mark_success
            jumps ``step`` to this value.

    Returns:
        The progress dict that was written to disk.
    """
    path = _progress_file(task_id)
    data = {
        "task_id": task_id,
        "paper_id": paper_id,
        "filename": filename,
        "status": "running",
        "step": 0,
        "total_steps": total_steps,
        "percent": 0,
        "message": "Waiting to start...",
        "started": False,
        "outputs": {},
        "error": "",
        "traceback": "",
        "updated_at": _now_str(),
    }
    _safe_write_json(path, data)
    return data
# ==========================================================
# RESET PROGRESS
# ==========================================================
def reset_progress(task_id: str):
    """Delete the task's progress file if it exists (best effort).

    A missing file is not an error. Other filesystem errors (e.g. permission
    denied) are deliberately ignored so a reset never breaks the caller.
    """
    path = _progress_file(task_id)
    try:
        os.remove(path)
    except OSError:
        # Includes FileNotFoundError, which also avoids the exists()/remove()
        # race. Narrower than a bare ``except:``, so KeyboardInterrupt /
        # SystemExit are no longer swallowed.
        pass
# ==========================================================
# READ PROGRESS
# ==========================================================
def read_progress(task_id: str):
    """Return the stored progress dict for ``task_id``, or None if missing/unreadable.

    Note: resolving the path creates PROGRESS_FOLDER if it does not exist.
    """
    return _safe_read_json(_progress_file(task_id))
# ==========================================================
# UPDATE PROGRESS
# ==========================================================
def _coerce_int(value, default: int = 0) -> int:
    """Convert ``value`` with ``int()``, returning ``default`` if that fails."""
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # TypeError: None/objects; ValueError: "abc", NaN; OverflowError: inf.
        return default


def update_progress(task_id: str, step: int, percent: int, message: str):
    """Record the current pipeline step, percentage and message for a task.

    If no readable progress file exists yet, a fresh one is created first
    (without paper_id/filename, which are unknown here).

    Args:
        task_id: Task identifier.
        step: Current step number. Values ``int()`` cannot convert become 0.
        percent: Completion percentage. Values ``int()`` cannot convert
            become 0, and the result is clamped to 0..100.
        message: Status text, converted with ``str()``.

    Returns:
        The updated progress dict (also written to disk).

    Note:
        A terminal status ("success"/"failed") is preserved, but step,
        percent and message are still overwritten by a late call. The
        read-modify-write is not locked: concurrent updates to the same task
        can overwrite each other, although each write is an atomic replace.
    """
    path = _progress_file(task_id)
    data = _safe_read_json(path)
    if not data:
        data = init_progress(task_id)

    data["step"] = _coerce_int(step)
    data["percent"] = min(max(_coerce_int(percent), 0), 100)
    data["message"] = str(message)

    # Keep status "running" unless the task already reached a terminal state.
    if data.get("status") not in ("success", "failed"):
        data["status"] = "running"

    data["updated_at"] = _now_str()
    _safe_write_json(path, data)
    return data
# ==========================================================
# MARK SUCCESS
# ==========================================================
def mark_success(task_id: str, message: str = "Processing complete."):
    """Mark a task as finished successfully.

    Sets status "success", moves ``step`` to the stored ``total_steps``
    (8 if that field is missing) and ``percent`` to 100. A fresh progress
    file is created first if none can be read.

    Returns:
        The updated progress dict (also written to disk).
    """
    path = _progress_file(task_id)
    data = _safe_read_json(path) or init_progress(task_id)
    data.update(
        status="success",
        step=data.get("total_steps", 8),
        percent=100,
        message=message,
        updated_at=_now_str(),
    )
    _safe_write_json(path, data)
    return data
# ==========================================================
# MARK FAILED
# ==========================================================
def mark_failed(task_id: str, error_message: str, traceback_text: str = ""):
    """Mark a task as failed and store the error details.

    ``error`` and ``traceback`` receive the stringified arguments, and
    ``message`` gets a "❌ Failed: ..." summary. ``step``/``percent`` are
    left as they were. A fresh progress file is created first if none can
    be read.

    Returns:
        The updated progress dict (also written to disk).
    """
    path = _progress_file(task_id)
    data = _safe_read_json(path) or init_progress(task_id)
    reason = str(error_message)
    data.update(
        status="failed",
        error=reason,
        traceback=str(traceback_text),
        message="❌ Failed: " + reason,
        updated_at=_now_str(),
    )
    _safe_write_json(path, data)
    return data
# ==========================================================
# ATTACH OUTPUTS
# ==========================================================
def attach_outputs(task_id: str, **kwargs):
    """Merge keyword arguments into the task's ``outputs`` mapping.

    Existing keys are overwritten and others are kept. A missing or
    non-dict ``outputs`` field is reset to an empty dict first.

    Example:
        attach_outputs(task_id,
            paper_id="paper123",
            poster_json_path="static/poster_json/paper123.json"
        )

    Returns:
        The updated progress dict (also written to disk).
    """
    path = _progress_file(task_id)
    data = _safe_read_json(path) or init_progress(task_id)
    outputs = data.get("outputs")
    if not isinstance(outputs, dict):
        outputs = {}
        data["outputs"] = outputs
    outputs.update(kwargs)
    data["updated_at"] = _now_str()
    _safe_write_json(path, data)
    return data
# ==========================================================
# SET STARTED FLAG
# ==========================================================
def set_started(task_id: str, started: bool = True):
    """Set the task's ``started`` flag (coerced to bool) and refresh ``updated_at``.

    A fresh progress file is created first if none can be read.

    Returns:
        The updated progress dict (also written to disk).
    """
    path = _progress_file(task_id)
    data = _safe_read_json(path) or init_progress(task_id)
    data.update(started=bool(started), updated_at=_now_str())
    _safe_write_json(path, data)
    return data