# queue.py
import os
import time
import pickle
import asyncio
import nest_asyncio
from pathlib import Path
from enum import Enum
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional

# allow nested event loops in some environments
nest_asyncio.apply()

# --------------------------------------------------
# Logging setup
# --------------------------------------------------
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format="🧩 [%(asctime)s] [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger("queue_system")


class TaskStatus(Enum):
    QUEUED = "queued"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


class TaskQueue:
    """
    File-backed persistent queue that stores tasks as metadata dicts.
    Uses a ThreadPoolExecutor to run the CPU-bound pipeline in worker threads.
""" def __init__(self, base_dir: Path | str, max_workers: int = 2): self.base_dir = Path(base_dir) self.base_dir.mkdir(parents=True, exist_ok=True) self._queue_file = self.base_dir / "queue.pkl" self._state_file = self.base_dir / "state.pkl" self._dq = deque() self._tasks = {} self._statuses = {} self._lock = asyncio.Lock() self._executor = ThreadPoolExecutor(max_workers=max_workers) self._worker_task: Optional[asyncio.Task] = None self._shutdown = False self._processor: Optional[Callable] = None logger.info(f"🚀 TaskQueue initialized | base_dir={self.base_dir} | workers={max_workers}") self._load_state() # ------------------------ # persistence # ------------------------ def _save_state(self): try: tmp = self._state_file.with_suffix(".tmp") with tmp.open("wb") as f: pickle.dump({ "queue": list(self._dq), "tasks": self._tasks, "statuses": self._statuses, }, f) tmp.replace(self._state_file) logger.debug(f"💾 Queue state saved | queued={len(self._dq)} tasks") except Exception as e: logger.warning(f"âš ī¸ Failed to save state: {e}") def _load_state(self): if self._state_file.exists(): try: with self._state_file.open("rb") as f: data = pickle.load(f) for item in data.get("queue", []): self._dq.append(item) self._tasks.update(data.get("tasks", {})) self._statuses.update(data.get("statuses", {})) logger.info(f"📂 Loaded previous queue state | tasks={len(self._tasks)}") except Exception as e: logger.error(f"❌ Failed to load state file, starting fresh: {e}") self._dq = deque() self._tasks = {} self._statuses = {} # ------------------------ # public API # ------------------------ def enqueue(self, task_meta: dict): task_id = task_meta.get("task_id") if not task_id: raise ValueError("task_meta must contain 'task_id'") self._dq.append(task_meta) self._tasks[task_id] = task_meta self._statuses[task_id] = TaskStatus.QUEUED self._save_state() logger.info(f"🆕 Task enqueued | id={task_id} | total_queued={len(self._dq)}") def get_status(self, task_id: str): st = self._statuses.get(task_id) logger.debug(f"📊 get_status({task_id}) → {st}") return st def get_task_info(self, task_id: str): info = self._tasks.get(task_id) logger.debug(f"â„šī¸ get_task_info({task_id}) → {'found' if info else 'not found'}") return info def remove_task(self, task_id: str): self._tasks.pop(task_id, None) self._statuses.pop(task_id, None) self._save_state() logger.info(f"🗑 Task removed from system | id={task_id}") # ------------------------ # lifecycle # ------------------------ async def start(self, processor: Callable): if self._worker_task: logger.warning("âš ī¸ Queue worker already running, ignoring start request.") return self._processor = processor self._shutdown = False loop = asyncio.get_event_loop() self._worker_task = loop.create_task(self._worker_loop()) logger.info("🏁 Background worker started successfully.") async def stop(self): logger.info("🛑 Stopping background worker...") self._shutdown = True if self._worker_task: await self._worker_task self._worker_task = None self._executor.shutdown(wait=True) self._save_state() logger.info("✅ Worker stopped and executor shut down cleanly.") # ------------------------ # worker loop # ------------------------ async def _worker_loop(self): logger.info("🔄 Worker loop started") while not self._shutdown: if not self._dq: await asyncio.sleep(0.5) continue task_meta = self._dq.popleft() task_id = task_meta.get("task_id") logger.info(f"Processing task {task_id}") try: self._statuses[task_id] = TaskStatus.RUNNING self._save_state() loop = asyncio.get_event_loop() logger.debug(f"Running 
processor for task {task_id}") future = loop.run_in_executor( self._executor, self._run_processor_safe, task_meta ) result = await future logger.debug(f"Processor result: {result}") if isinstance(result, dict) and result.get("success"): self._statuses[task_id] = TaskStatus.COMPLETED self._tasks[task_id].update({ "output_path": result.get("output_path"), "output_bytes": result.get("output_bytes") }) logger.info(f"Task {task_id} completed successfully") else: self._statuses[task_id] = TaskStatus.FAILED logger.error(f"Task {task_id} failed: {result}") self._save_state() except Exception as e: logger.exception(f"Error processing task {task_id}") self._statuses[task_id] = TaskStatus.FAILED self._save_state() await asyncio.sleep(0.1) logger.info("🔚 Worker loop exiting cleanly.") def _run_processor_safe(self, task_meta: dict) -> dict: try: if not self._processor: logger.error("❌ No processor configured, cannot run task.") return {"success": False, "error": "No processor configured"} task_id = task_meta.get("task_id") logger.debug(f"🚧 Running processor for task {task_id}...") result = self._processor(task_meta) logger.debug(f"đŸŽ¯ Processor result for {task_id}: {result}") return result or {"success": False} except Exception as e: logger.exception(f"đŸ’Ĩ Processor crashed for task {task_meta.get('task_id')}: {e}") return {"success": False, "error": str(e)}