File size: 5,242 Bytes
d037fc6
 
 
 
 
 
115023b
d037fc6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
115023b
d037fc6
 
 
115023b
d037fc6
 
 
115023b
d037fc6
115023b
d037fc6
 
 
 
115023b
 
 
 
 
 
 
d037fc6
 
 
 
115023b
 
 
 
 
 
 
 
 
 
d037fc6
 
115023b
 
 
 
 
 
 
 
d037fc6
 
115023b
 
 
 
 
 
 
d037fc6
 
115023b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d037fc6
 
115023b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d037fc6
 
115023b
 
d037fc6
 
 
 
 
 
115023b
d037fc6
115023b
 
 
 
 
 
 
 
 
 
 
d037fc6
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import uuid
import time
from typing import Dict, Optional, Any
from dataclasses import dataclass, field
from enum import Enum
import threading
from pathlib import Path

class JobStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class Job:
    job_id: str
    status: JobStatus
    progress: float = 0.0
    message: str = ""
    result: Optional[Any] = None
    error: Optional[str] = None
    created_at: float = field(default_factory=time.time)
    updated_at: float = field(default_factory=time.time)
    total_steps: int = 100
    current_step: int = 0

class ProgressTracker:
    _instance = None
    _init_lock = threading.Lock()
    
    def __new__(cls):
        if cls._instance is None:
            with cls._init_lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._jobs: Dict[str, Job] = {}
                    cls._instance._lock = threading.RLock()
                    cls._instance._cleanup_interval = 300
                    cls._instance._file_cleanup_interval = 600
        return cls._instance
    
    def create_job(self, message: str = "Starting...") -> str:
        job_id = str(uuid.uuid4())
        with self._lock:
            self._jobs[job_id] = Job(
                job_id=job_id,
                status=JobStatus.PENDING,
                message=message
            )
            self._cleanup_old_jobs_locked()
        return job_id
    
    def update_progress(self, job_id: str, progress: float, message: str = "", 
                       current_step: int = 0, total_steps: int = 100):
        with self._lock:
            if job_id in self._jobs:
                job = self._jobs[job_id]
                job.progress = min(progress, 100.0)
                job.current_step = current_step
                job.total_steps = total_steps
                if message:
                    job.message = message
                job.status = JobStatus.PROCESSING
                job.updated_at = time.time()
    
    def complete_job(self, job_id: str, result: Any = None, message: str = "Completed"):
        with self._lock:
            if job_id in self._jobs:
                job = self._jobs[job_id]
                job.status = JobStatus.COMPLETED
                job.progress = 100.0
                job.message = message
                job.result = result
                job.updated_at = time.time()
    
    def fail_job(self, job_id: str, error: str):
        with self._lock:
            if job_id in self._jobs:
                job = self._jobs[job_id]
                job.status = JobStatus.FAILED
                job.error = error
                job.message = f"Failed: {error}"
                job.updated_at = time.time()
    
    def get_job(self, job_id: str) -> Optional[Job]:
        with self._lock:
            job = self._jobs.get(job_id)
            if job:
                return Job(
                    job_id=job.job_id,
                    status=job.status,
                    progress=job.progress,
                    message=job.message,
                    result=job.result,
                    error=job.error,
                    created_at=job.created_at,
                    updated_at=job.updated_at,
                    total_steps=job.total_steps,
                    current_step=job.current_step
                )
            return None
    
    def get_progress(self, job_id: str) -> Optional[dict]:
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return None
            return {
                "job_id": job.job_id,
                "status": job.status.value,
                "progress": round(job.progress, 1),
                "message": job.message,
                "current_step": job.current_step,
                "total_steps": job.total_steps,
                "has_result": job.result is not None,
                "error": job.error
            }
    
    def remove_job_and_cleanup(self, job_id: str) -> Optional[str]:
        """Remove a job and return the result file path for deletion."""
        with self._lock:
            job = self._jobs.pop(job_id, None)
            if job and job.result:
                return str(job.result)
            return None
    
    def _cleanup_old_jobs_locked(self):
        """Must be called with self._lock held."""
        current_time = time.time()
        expired_jobs = [
            job_id for job_id, job in self._jobs.items()
            if current_time - job.updated_at > self._cleanup_interval
            and job.status in (JobStatus.COMPLETED, JobStatus.FAILED)
        ]
        files_to_delete = []
        for job_id in expired_jobs:
            job = self._jobs.pop(job_id, None)
            if job and job.result:
                files_to_delete.append(str(job.result))
        
        for file_path in files_to_delete:
            try:
                path = Path(file_path)
                if path.exists():
                    path.unlink()
            except Exception:
                pass

def get_tracker() -> ProgressTracker:
    return ProgressTracker()