# queue_manager.py
"""In-memory job store and single-threaded background job queue."""

import logging
import os
import queue
import threading
from typing import Any, Dict

from models_job import JobResult, JobStatus

logger = logging.getLogger(__name__)

# Filesystem locations shared with the rest of the app (overridable via env).
UPLOAD_DIR = os.environ.get("UPLOAD_DIR", "/app/data/uploads")
RESULTS_DIR = os.environ.get("RESULTS_DIR", "/app/data/results")


class JobStore:
    """Thread-safe in-memory store for job statuses and results.

    For production: replace with Redis / a persistent DB if needed.
    """

    def __init__(self) -> None:
        self.status: Dict[str, JobStatus] = {}
        self.result: Dict[str, JobResult] = {}
        self.lock = threading.Lock()

    def set_status(self, job_id: str, status: JobStatus) -> None:
        """Store/overwrite the status for *job_id*."""
        with self.lock:
            self.status[job_id] = status

    def get_status(self, job_id: str) -> JobStatus | None:
        """Return the status for *job_id*, or None if unknown."""
        with self.lock:
            return self.status.get(job_id)

    def set_result(self, job_id: str, result: JobResult) -> None:
        """Store/overwrite the result for *job_id*."""
        with self.lock:
            self.result[job_id] = result

    def get_result(self, job_id: str) -> JobResult | None:
        """Return the result for *job_id*, or None if unknown."""
        with self.lock:
            return self.result.get(job_id)


job_store = JobStore()

# Jobs are plain dicts; they must at least contain a "job_id" key.
job_queue: "queue.Queue[Dict[str, Any]]" = queue.Queue()


def worker_loop(process_fn) -> None:
    """Consume jobs from ``job_queue`` until a ``None`` sentinel arrives.

    Each job dict is handed to ``process_fn``; on any exception the job's
    stored status (if one exists) is marked as failed and the worker keeps
    running.
    """
    while True:
        job = job_queue.get()
        if job is None:
            # BUG FIX: the sentinel comes from get() too, so it must also be
            # acknowledged — otherwise job_queue.join() would block forever
            # after shutdown.
            job_queue.task_done()
            break
        try:
            process_fn(job)
        except Exception as e:  # boundary handler: keep the worker alive
            # Log first so the failure is never silently lost, even when no
            # status record was registered for this job.
            logger.exception("Job %s failed", job.get("job_id"))
            st = job_store.get_status(job["job_id"])
            if st:
                st.status = "failed"
                st.message = f"Error: {e}"
                st.progress = 0
                job_store.set_status(job["job_id"], st)
            # NOTE(review): if no initial status was set before enqueueing,
            # the failure is only logged — confirm callers always register
            # a status first.
        finally:
            job_queue.task_done()


_worker_thread = None


def start_worker(process_fn) -> None:
    """Start the single daemon worker thread if it is not already running.

    Idempotent: calling it again while the thread is alive is a no-op;
    if the thread died, a fresh one is started.
    """
    global _worker_thread
    if _worker_thread is None or not _worker_thread.is_alive():
        _worker_thread = threading.Thread(
            target=worker_loop, args=(process_fn,), daemon=True
        )
        _worker_thread.start()