# task_queue.py
import pickle
import asyncio
import nest_asyncio
from pathlib import Path
from enum import Enum
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional
# allow nested event loops in some environments
nest_asyncio.apply()
# --------------------------------------------------
# Logging setup
# --------------------------------------------------
import logging
logging.basicConfig(
level=logging.DEBUG,
format="🧩 [%(asctime)s] [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
logger = logging.getLogger("queue_system")
class TaskStatus(Enum):
QUEUED = "queued"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
class TaskQueue:
"""
File-backed persistent queue that stores tasks as metadata dicts.
Uses ThreadPoolExecutor to run CPU-bound pipeline in worker threads.
"""
def __init__(self, base_dir: Path | str, max_workers: int = 2):
self.base_dir = Path(base_dir)
self.base_dir.mkdir(parents=True, exist_ok=True)
        self._queue_file = self.base_dir / "queue.pkl"  # reserved; all state currently lives in state.pkl
        self._state_file = self.base_dir / "state.pkl"
        self._dq = deque()  # pending task metadata dicts (FIFO)
        self._tasks = {}  # task_id -> metadata
        self._statuses = {}  # task_id -> TaskStatus
        self._lock = asyncio.Lock()
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._worker_task: Optional[asyncio.Task] = None
        self._shutdown = False
        self._processor: Optional[Callable] = None  # sync function run per task
logger.info(f"πŸš€ TaskQueue initialized | base_dir={self.base_dir} | workers={max_workers}")
self._load_state()
# ------------------------
# persistence
# ------------------------
def _save_state(self):
try:
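            # write to a temp file, then atomically replace, so a crash
            # can never leave a half-written state file behind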
tmp = self._state_file.with_suffix(".tmp")
with tmp.open("wb") as f:
pickle.dump({
"queue": list(self._dq),
"tasks": self._tasks,
"statuses": self._statuses,
}, f)
tmp.replace(self._state_file)
logger.debug(f"πŸ’Ύ Queue state saved | queued={len(self._dq)} tasks")
except Exception as e:
logger.warning(f"⚠️ Failed to save state: {e}")
def _load_state(self):
if self._state_file.exists():
try:
with self._state_file.open("rb") as f:
data = pickle.load(f)
                for item in data.get("queue", []):
                    self._dq.append(item)
                self._tasks.update(data.get("tasks", {}))
                self._statuses.update(data.get("statuses", {}))
                # tasks that were mid-run when the process died were already
                # popped from the queue, so mark them FAILED rather than
                # leaving them permanently RUNNING
                for tid, st in list(self._statuses.items()):
                    if st == TaskStatus.RUNNING:
                        self._statuses[tid] = TaskStatus.FAILED
                logger.info(f"πŸ“‚ Loaded previous queue state | tasks={len(self._tasks)}")
except Exception as e:
logger.error(f"❌ Failed to load state file, starting fresh: {e}")
self._dq = deque()
self._tasks = {}
self._statuses = {}
# ------------------------
# public API
# ------------------------
def enqueue(self, task_meta: dict):
task_id = task_meta.get("task_id")
if not task_id:
raise ValueError("task_meta must contain 'task_id'")
self._dq.append(task_meta)
self._tasks[task_id] = task_meta
self._statuses[task_id] = TaskStatus.QUEUED
self._save_state()
logger.info(f"πŸ†• Task enqueued | id={task_id} | total_queued={len(self._dq)}")
def get_status(self, task_id: str):
st = self._statuses.get(task_id)
logger.debug(f"πŸ“Š get_status({task_id}) β†’ {st}")
return st
def get_task_info(self, task_id: str):
info = self._tasks.get(task_id)
logger.debug(f"ℹ️ get_task_info({task_id}) β†’ {'found' if info else 'not found'}")
return info
    def remove_task(self, task_id: str):
        # drop the task from the pending queue too, otherwise the worker
        # would still pop its metadata and process a task that no longer exists
        self._dq = deque(t for t in self._dq if t.get("task_id") != task_id)
        self._tasks.pop(task_id, None)
        self._statuses.pop(task_id, None)
        self._save_state()
logger.info(f"πŸ—‘ Task removed from system | id={task_id}")
# ------------------------
# lifecycle
# ------------------------
    async def start(self, processor: Callable):
        """Start the background worker loop. processor must be a synchronous
        callable that accepts a task_meta dict and returns a result dict."""
        if self._worker_task:
            logger.warning("⚠️ Queue worker already running, ignoring start request.")
            return
        self._processor = processor
        self._shutdown = False
        self._worker_task = asyncio.create_task(self._worker_loop())
logger.info("🏁 Background worker started successfully.")
async def stop(self):
logger.info("πŸ›‘ Stopping background worker...")
self._shutdown = True
if self._worker_task:
await self._worker_task
self._worker_task = None
self._executor.shutdown(wait=True)
self._save_state()
logger.info("βœ… Worker stopped and executor shut down cleanly.")
# ------------------------
# worker loop
# ------------------------
async def _worker_loop(self):
logger.info("πŸ”„ Worker loop started")
while not self._shutdown:
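            # idle poll: sleep briefly when the queue is empty so a shutdown
            # request is noticed within ~0.5 s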
if not self._dq:
await asyncio.sleep(0.5)
continue
task_meta = self._dq.popleft()
task_id = task_meta.get("task_id")
logger.info(f"Processing task {task_id}")
try:
self._statuses[task_id] = TaskStatus.RUNNING
self._save_state()
                loop = asyncio.get_running_loop()
logger.debug(f"Running processor for task {task_id}")
future = loop.run_in_executor(
self._executor,
self._run_processor_safe,
task_meta
)
result = await future
logger.debug(f"Processor result: {result}")
                if isinstance(result, dict) and result.get("success"):
                    self._statuses[task_id] = TaskStatus.COMPLETED
                    # guard: the task may have been removed while it was running
                    if task_id in self._tasks:
                        self._tasks[task_id].update({
                            "output_path": result.get("output_path"),
                            "output_bytes": result.get("output_bytes"),
                        })
                    logger.info(f"Task {task_id} completed successfully")
                else:
                    self._statuses[task_id] = TaskStatus.FAILED
                    logger.error(f"Task {task_id} failed: {result}")
                self._save_state()
            except Exception:
logger.exception(f"Error processing task {task_id}")
self._statuses[task_id] = TaskStatus.FAILED
self._save_state()
await asyncio.sleep(0.1)
logger.info("πŸ”š Worker loop exiting cleanly.")
def _run_processor_safe(self, task_meta: dict) -> dict:
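        # runs on an executor thread; never let an exception escape, always
        # hand a plain result dict back to the event loop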
try:
if not self._processor:
logger.error("❌ No processor configured, cannot run task.")
return {"success": False, "error": "No processor configured"}
task_id = task_meta.get("task_id")
logger.debug(f"🚧 Running processor for task {task_id}...")
result = self._processor(task_meta)
logger.debug(f"🎯 Processor result for {task_id}: {result}")
return result or {"success": False}
except Exception as e:
logger.exception(f"πŸ’₯ Processor crashed for task {task_meta.get('task_id')}: {e}")
return {"success": False, "error": str(e)}