# NOTE(review): the three lines "Spaces:" / "Sleeping" / "Sleeping" below this
# point in the original paste are Hugging Face Spaces UI status text captured
# by accident; they are not part of the Python module.
| # # queue.py | |
| # import os | |
| # import time | |
| # import pickle | |
| # import asyncio | |
| # import nest_asyncio | |
| # from pathlib import Path | |
| # from enum import Enum | |
| # from collections import deque | |
| # from concurrent.futures import ThreadPoolExecutor | |
| # from typing import Callable, Optional | |
| # # allow nested event loops in some environments | |
| # nest_asyncio.apply() | |
| # class TaskStatus(Enum): | |
| # QUEUED = "queued" | |
| # RUNNING = "running" | |
| # COMPLETED = "completed" | |
| # FAILED = "failed" | |
| # CANCELLED = "cancelled" | |
| # class TaskQueue: | |
| # """ | |
| # File-backed persistent queue that stores tasks as metadata dicts. | |
| # Uses ThreadPoolExecutor to run CPU-bound pipeline in worker threads. | |
| # """ | |
| # def __init__(self, base_dir: Path | str, max_workers: int = 2): | |
| # self.base_dir = Path(base_dir) | |
| # self.base_dir.mkdir(parents=True, exist_ok=True) | |
| # self._queue_file = self.base_dir / "queue.pkl" | |
| # self._state_file = self.base_dir / "state.pkl" | |
| # self._dq = deque() # stores metadata dicts | |
| # self._tasks = {} # task_id -> metadata | |
| # self._statuses = {} # task_id -> TaskStatus | |
| # self._lock = asyncio.Lock() | |
| # self._executor = ThreadPoolExecutor(max_workers=max_workers) | |
| # self._worker_task: Optional[asyncio.Task] = None | |
| # self._shutdown = False | |
| # self._processor: Optional[Callable] = None # sync function to run per task | |
| # self._load_state() | |
| # # ------------------------ | |
| # # persistence | |
| # # ------------------------ | |
| # def _save_state(self): | |
| # try: | |
| # tmp = self._state_file.with_suffix(".tmp") | |
| # with tmp.open("wb") as f: | |
| # pickle.dump({ | |
| # "queue": list(self._dq), | |
| # "tasks": self._tasks, | |
| # "statuses": self._statuses, | |
| # }, f) | |
| # tmp.replace(self._state_file) | |
| # except Exception: | |
| # # do not crash app on disk save error; log in real app | |
| # pass | |
| # def _load_state(self): | |
| # if self._state_file.exists(): | |
| # try: | |
| # with self._state_file.open("rb") as f: | |
| # data = pickle.load(f) | |
| # for item in data.get("queue", []): | |
| # self._dq.append(item) | |
| # self._tasks.update(data.get("tasks", {})) | |
| # self._statuses.update(data.get("statuses", {})) | |
| # except Exception: | |
| # # if corrupted, start fresh | |
| # self._dq = deque() | |
| # self._tasks = {} | |
| # self._statuses = {} | |
| # # ------------------------ | |
| # # public API | |
| # # ------------------------ | |
| # def enqueue(self, task_meta: dict): | |
| # task_id = task_meta.get("task_id") | |
| # if not task_id: | |
| # raise ValueError("task_meta must contain 'task_id'") | |
| # self._dq.append(task_meta) | |
| # self._tasks[task_id] = task_meta | |
| # self._statuses[task_id] = TaskStatus.QUEUED | |
| # self._save_state() | |
| # def get_status(self, task_id: str): | |
| # return self._statuses.get(task_id) | |
| # def get_task_info(self, task_id: str): | |
| # return self._tasks.get(task_id) | |
| # def remove_task(self, task_id: str): | |
| # # Remove from tasks and statuses; queue items will be filtered by worker | |
| # self._tasks.pop(task_id, None) | |
| # self._statuses.pop(task_id, None) | |
| # self._save_state() | |
| # # ------------------------ | |
| # # lifecycle | |
| # # ------------------------ | |
| # async def start(self, processor: Callable): | |
| # """Start the background worker loop. processor should be a sync function accept task_meta.""" | |
| # if self._worker_task: | |
| # return | |
| # self._processor = processor | |
| # self._shutdown = False | |
| # loop = asyncio.get_event_loop() | |
| # self._worker_task = loop.create_task(self._worker_loop()) | |
| # async def stop(self): | |
| # self._shutdown = True | |
| # if self._worker_task: | |
| # await self._worker_task | |
| # self._worker_task = None | |
| # self._executor.shutdown(wait=True) | |
| # self._save_state() | |
| # # ------------------------ | |
| # # worker loop | |
| # # ------------------------ | |
| # async def _worker_loop(self): | |
| # logger.info("π Worker loop started") | |
| # while not self._shutdown: | |
| # if not self._dq: | |
| # await asyncio.sleep(0.5) | |
| # continue | |
| # task_meta = self._dq.popleft() | |
| # task_id = task_meta.get("task_id") | |
| # logger.info(f"Processing task {task_id}") | |
| # try: | |
| # self._statuses[task_id] = TaskStatus.RUNNING | |
| # self._save_state() | |
| # loop = asyncio.get_event_loop() | |
| # logger.debug(f"Running processor for task {task_id}") | |
| # future = loop.run_in_executor( | |
| # self._executor, | |
| # self._run_processor_safe, | |
| # task_meta | |
| # ) | |
| # result = await future | |
| # logger.debug(f"Processor result: {result}") | |
| # if isinstance(result, dict) and result.get("success"): | |
| # self._statuses[task_id] = TaskStatus.COMPLETED | |
| # self._tasks[task_id].update({ | |
| # "output_path": result.get("output_path"), | |
| # "output_bytes": result.get("output_bytes") | |
| # }) | |
| # logger.info(f"Task {task_id} completed successfully") | |
| # else: | |
| # self._statuses[task_id] = TaskStatus.FAILED | |
| # logger.error(f"Task {task_id} failed: {result}") | |
| # self._save_state() | |
| # except Exception as e: | |
| # logger.exception(f"Error processing task {task_id}") | |
| # self._statuses[task_id] = TaskStatus.FAILED | |
| # self._save_state() | |
| # await asyncio.sleep(0.1) | |
| # logger.info("π Worker loop exiting cleanly.") | |
| # def _run_processor_safe(self, task_meta: dict) -> dict: | |
| # try: | |
| # if not self._processor: | |
| # logger.error("β No processor configured, cannot run task.") | |
| # return {"success": False, "error": "No processor configured"} | |
| # task_id = task_meta.get("task_id") | |
| # logger.debug(f"π§ Running processor for task {task_id}...") | |
| # result = self._processor(task_meta) | |
| # logger.debug(f"π― Processor result for {task_id}: {result}") | |
| # return result or {"success": False} | |
| # except Exception as e: | |
| # logger.exception(f"π₯ Processor crashed for task {task_meta.get('task_id')}: {e}") | |
| # return {"success": False, "error": str(e)} | |
# queue.py
#
# Module-level setup: imports, nested-event-loop shim, and logging
# configuration for the file-backed task queue defined below.
import os      # NOTE(review): appears unused in the visible code — confirm before removing
import time    # NOTE(review): appears unused in the visible code — confirm before removing
import pickle
import asyncio
import nest_asyncio
from pathlib import Path
from enum import Enum
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional

# allow nested event loops in some environments
# (e.g. notebooks / hosts that already run an asyncio loop)
nest_asyncio.apply()

# --------------------------------------------------
# Logging setup
# --------------------------------------------------
import logging

# NOTE(review): basicConfig() at import time configures the process-wide root
# logger at DEBUG — fine for a standalone app, intrusive if imported as a
# library. Confirm this is intentional.
logging.basicConfig(
    level=logging.DEBUG,
    format="π§© [%(asctime)s] [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
)
# Named logger used by all classes in this module.
logger = logging.getLogger("queue_system")
class TaskStatus(Enum):
    """Lifecycle states of a task managed by TaskQueue.

    The worker loop in this file drives the transitions
    QUEUED -> RUNNING -> COMPLETED or FAILED. CANCELLED is declared
    but never assigned anywhere in the visible code (reserved).
    """
    QUEUED = "queued"        # enqueued, waiting for the worker loop
    RUNNING = "running"      # handed to the thread-pool processor
    COMPLETED = "completed"  # processor returned {"success": True, ...}
    FAILED = "failed"        # processor error, crash, or non-success result
    CANCELLED = "cancelled"  # reserved; not set by any code in this file
class TaskQueue:
    """
    File-backed persistent queue that stores tasks as metadata dicts.
    Uses ThreadPoolExecutor to run CPU-bound pipeline in worker threads.

    Tasks are plain dicts that must contain a unique ``"task_id"`` key.
    All mutable state (pending queue, task metadata, per-task status) is
    pickled to ``state.pkl`` under ``base_dir`` after every mutation so the
    queue survives process restarts. A single asyncio background task drains
    the queue and runs the configured *synchronous* processor callable inside
    a ThreadPoolExecutor so heavy work does not block the event loop.
    """

    def __init__(self, base_dir: Path | str, max_workers: int = 2):
        """
        Args:
            base_dir: Directory for persisted state; created if missing.
            max_workers: Size of the thread pool used to run processors.
        """
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(parents=True, exist_ok=True)
        # NOTE(review): queue.pkl is never read or written in the visible
        # code — state.pkl holds everything. Kept for compatibility.
        self._queue_file = self.base_dir / "queue.pkl"
        self._state_file = self.base_dir / "state.pkl"
        self._dq: deque = deque()      # pending task metadata dicts (FIFO)
        self._tasks: dict = {}         # task_id -> metadata dict
        self._statuses: dict = {}      # task_id -> TaskStatus
        # NOTE(review): this lock is never acquired in the visible code;
        # kept so external/diagnostic code that touches it keeps working.
        self._lock = asyncio.Lock()
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._worker_task: Optional[asyncio.Task] = None
        self._shutdown = False
        self._processor: Optional[Callable] = None  # sync function run per task
        logger.info(f"π TaskQueue initialized | base_dir={self.base_dir} | workers={max_workers}")
        self._load_state()

    # ------------------------
    # persistence
    # ------------------------
    def _save_state(self):
        """Atomically pickle queue/tasks/statuses to the state file.

        Best-effort: disk errors are logged, never raised, so a failing
        disk cannot crash the worker loop.
        """
        try:
            # Write to a temp file then replace() so a crash mid-write
            # cannot leave a truncated state.pkl behind.
            tmp = self._state_file.with_suffix(".tmp")
            with tmp.open("wb") as f:
                pickle.dump({
                    "queue": list(self._dq),
                    "tasks": self._tasks,
                    "statuses": self._statuses,
                }, f)
            tmp.replace(self._state_file)
            logger.debug(f"πΎ Queue state saved | queued={len(self._dq)} tasks")
        except Exception as e:
            logger.warning(f"β οΈ Failed to save state: {e}")

    def _load_state(self):
        """Restore queue/tasks/statuses from disk; start fresh if corrupted."""
        if self._state_file.exists():
            try:
                with self._state_file.open("rb") as f:
                    data = pickle.load(f)
                self._dq.extend(data.get("queue", []))
                self._tasks.update(data.get("tasks", {}))
                self._statuses.update(data.get("statuses", {}))
                logger.info(f"π Loaded previous queue state | tasks={len(self._tasks)}")
            except Exception as e:
                # Corrupted/unreadable state: reset all containers together
                # so the three structures stay mutually consistent.
                logger.error(f"β Failed to load state file, starting fresh: {e}")
                self._dq = deque()
                self._tasks = {}
                self._statuses = {}

    # ------------------------
    # public API
    # ------------------------
    def enqueue(self, task_meta: dict):
        """Add a task. ``task_meta`` must contain a truthy 'task_id'.

        Raises:
            ValueError: if 'task_id' is missing or falsy.
        """
        task_id = task_meta.get("task_id")
        if not task_id:
            raise ValueError("task_meta must contain 'task_id'")
        self._dq.append(task_meta)
        self._tasks[task_id] = task_meta
        self._statuses[task_id] = TaskStatus.QUEUED
        self._save_state()
        logger.info(f"π Task enqueued | id={task_id} | total_queued={len(self._dq)}")

    def get_status(self, task_id: str):
        """Return the TaskStatus for ``task_id``, or None if unknown."""
        st = self._statuses.get(task_id)
        logger.debug(f"π get_status({task_id}) β {st}")
        return st

    def get_task_info(self, task_id: str):
        """Return the metadata dict for ``task_id``, or None if unknown."""
        info = self._tasks.get(task_id)
        logger.debug(f"βΉοΈ get_task_info({task_id}) β {'found' if info else 'not found'}")
        return info

    def remove_task(self, task_id: str):
        """Forget a task. Any still-queued deque entry is skipped by the
        worker loop, which checks membership in self._tasks before running."""
        self._tasks.pop(task_id, None)
        self._statuses.pop(task_id, None)
        self._save_state()
        logger.info(f"π Task removed from system | id={task_id}")

    # ------------------------
    # lifecycle
    # ------------------------
    async def start(self, processor: Callable):
        """Start the background worker loop (idempotent).

        Args:
            processor: synchronous callable(task_meta) -> result dict;
                a truthy ``result["success"]`` marks the task COMPLETED.
        """
        if self._worker_task:
            logger.warning("β οΈ Queue worker already running, ignoring start request.")
            return
        self._processor = processor
        self._shutdown = False
        # BUGFIX: get_event_loop() is deprecated inside coroutines (3.10+);
        # we are guaranteed to be running, so use get_running_loop().
        loop = asyncio.get_running_loop()
        self._worker_task = loop.create_task(self._worker_loop())
        logger.info("π Background worker started successfully.")

    async def stop(self):
        """Signal shutdown, wait for the worker, and drain the executor."""
        logger.info("π Stopping background worker...")
        self._shutdown = True
        if self._worker_task:
            await self._worker_task
            self._worker_task = None
        self._executor.shutdown(wait=True)
        self._save_state()
        logger.info("β Worker stopped and executor shut down cleanly.")

    # ------------------------
    # worker loop
    # ------------------------
    async def _worker_loop(self):
        """Drain the deque until shutdown, one task at a time."""
        logger.info("π Worker loop started")
        while not self._shutdown:
            if not self._dq:
                await asyncio.sleep(0.5)  # idle poll; keeps shutdown responsive
                continue
            task_meta = self._dq.popleft()
            task_id = task_meta.get("task_id")
            # BUGFIX: remove_task() only drops the task from _tasks/_statuses;
            # the deque entry survives. Previously such an entry would still
            # run and resurrect its status. Skip anything no longer tracked.
            if not task_id or task_id not in self._tasks:
                logger.info(f"Skipping removed or invalid queue entry {task_id}")
                continue
            logger.info(f"Processing task {task_id}")
            try:
                self._statuses[task_id] = TaskStatus.RUNNING
                self._save_state()
                loop = asyncio.get_running_loop()  # BUGFIX: was get_event_loop()
                logger.debug(f"Running processor for task {task_id}")
                # Run the sync processor off-loop so it can't block asyncio.
                result = await loop.run_in_executor(
                    self._executor,
                    self._run_processor_safe,
                    task_meta
                )
                logger.debug(f"Processor result: {result}")
                if isinstance(result, dict) and result.get("success"):
                    self._statuses[task_id] = TaskStatus.COMPLETED
                    # BUGFIX: the task may be removed while running; guard the
                    # metadata update instead of raising KeyError.
                    meta = self._tasks.get(task_id)
                    if meta is not None:
                        meta.update({
                            "output_path": result.get("output_path"),
                            "output_bytes": result.get("output_bytes")
                        })
                    logger.info(f"Task {task_id} completed successfully")
                else:
                    self._statuses[task_id] = TaskStatus.FAILED
                    logger.error(f"Task {task_id} failed: {result}")
                self._save_state()
            except Exception:
                # logger.exception records the traceback for us.
                logger.exception(f"Error processing task {task_id}")
                self._statuses[task_id] = TaskStatus.FAILED
                self._save_state()
            await asyncio.sleep(0.1)  # brief yield between tasks
        logger.info("π Worker loop exiting cleanly.")

    def _run_processor_safe(self, task_meta: dict) -> dict:
        """Run the processor in a worker thread; never raise.

        Returns the processor's dict, or {"success": False, ...} on any
        crash or missing configuration.
        """
        try:
            if not self._processor:
                logger.error("β No processor configured, cannot run task.")
                return {"success": False, "error": "No processor configured"}
            task_id = task_meta.get("task_id")
            logger.debug(f"π§ Running processor for task {task_id}...")
            result = self._processor(task_meta)
            logger.debug(f"π― Processor result for {task_id}: {result}")
            return result or {"success": False}
        except Exception as e:
            logger.exception(f"π₯ Processor crashed for task {task_meta.get('task_id')}: {e}")
            return {"success": False, "error": str(e)}