Spaces:
Running
Running
| """ | |
| queue_manager.py — Thread-pool job queue with status tracking. | |
| """ | |
| from __future__ import annotations | |
| import threading | |
| import time | |
| from concurrent.futures import ThreadPoolExecutor | |
| from dataclasses import dataclass, field | |
| from enum import Enum | |
| from typing import Any, Callable | |
class State(str, Enum):
    """Lifecycle states recorded for a job.

    The ``str`` mix-in makes each member compare equal to its plain
    string value (``State.DONE == "done"``), so members can be used
    wherever the raw string is expected.
    """

    # Default state of a freshly created JobStatus.
    QUEUED = "queued"
    # Together with QUEUED, counted as "active" by JobQueue.is_full.
    RUNNING = "running"
    # DONE and FAILED are not counted as active by JobQueue.is_full.
    DONE = "done"
    FAILED = "failed"
@dataclass
class JobStatus:
    """Mutable status record for a single job.

    The ``@dataclass`` decorator is required: the fields below rely on
    ``field(default_factory=...)`` and instances are built with keyword
    arguments (``JobStatus(job_id=...)``). Without the decorator the
    class has no generated ``__init__`` and the timestamp attributes
    would be raw ``Field`` objects instead of floats.

    Attributes:
        job_id: Identifier of the job this record describes.
        state: Current lifecycle state; starts as ``State.QUEUED``.
        message: Free-form text attached to the current state.
        created_at: ``time.time()`` value taken when the record is created.
        updated_at: ``time.time()`` value taken at creation and refreshed
            by every call to :meth:`update`.
    """

    job_id: str
    state: State = State.QUEUED
    message: str = ""
    # Each factory calls time.time() on its own, so the two timestamps of
    # a new record can differ by a tiny amount.
    created_at: float = field(default_factory=time.time)
    updated_at: float = field(default_factory=time.time)

    def update(self, state: State, message: str = "") -> None:
        """Move the record to *state* and refresh ``updated_at``.

        Args:
            state: The new lifecycle state.
            message: Text to store with the new state. The previous
                message is always replaced, so omitting this argument
                resets it to the empty string.
        """
        self.state = state
        self.message = message
        self.updated_at = time.time()
| class JobQueue: | |
def __init__(self, max_workers: int = 8, max_jobs: int = 100) -> None:
    """Set up the worker pool and an empty job table.

    Args:
        max_workers: Passed to ``ThreadPoolExecutor`` as its
            ``max_workers`` argument.
        max_jobs: Capacity limit stored on the instance as ``_max_jobs``.
    """
    self._max_jobs = max_jobs
    self._executor = ThreadPoolExecutor(max_workers=max_workers)
    # Lock taken by the methods that read or modify the job table.
    self._lock = threading.Lock()
    # Maps job_id -> status record; starts out empty.
    self._jobs: dict[str, JobStatus] = dict()
def is_full(self) -> bool:
    """Report whether the number of active jobs has reached ``_max_jobs``.

    A job counts as active while its state is ``State.QUEUED`` or
    ``State.RUNNING``. The job table is scanned while holding the lock.

    Returns:
        ``True`` if the active-job count is greater than or equal to
        ``_max_jobs``, otherwise ``False``.
    """
    with self._lock:
        in_flight = 0
        for record in self._jobs.values():
            if record.state == State.QUEUED or record.state == State.RUNNING:
                in_flight += 1
        return in_flight >= self._max_jobs
def get_status(self, job_id: str) -> JobStatus | None:
    """Look up the status record stored for *job_id*.

    Args:
        job_id: Identifier used as the key in the job table.

    Returns:
        The stored record itself (not a copy), or ``None`` when no
        record exists under that id.
    """
    with self._lock:
        record = self._jobs.get(job_id)
    return record
def _set_state(self, job_id: str, state: State, message: str = "") -> None:
    """Apply a state change to the record stored for *job_id*.

    The lookup and the call to the record's ``update`` method both happen
    while holding the lock. An unknown *job_id* is ignored silently.

    Args:
        job_id: Identifier used as the key in the job table.
        state: New state forwarded to the record's ``update`` method.
        message: Text forwarded alongside the state (default ``""``).
    """
    with self._lock:
        try:
            record = self._jobs[job_id]
        except KeyError:
            # No such job: nothing to update.
            return
        record.update(state, message)
def register(self, job_id: str) -> JobStatus:
    """Create a status record for *job_id* and store it in the job table.

    The record is built with only ``job_id`` supplied, so every other
    field takes its default. Any record already stored under the same id
    is replaced.

    Args:
        job_id: Identifier used as the key in the job table.

    Returns:
        The newly created record (the same object that was stored).
    """
    fresh = JobStatus(job_id=job_id)
    self._lock.acquire()
    try:
        self._jobs[job_id] = fresh
    finally:
        self._lock.release()
    return fresh