File size: 2,014 Bytes
3d74263
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# queue_manager.py
import os
import threading
import queue
import time
from typing import Dict, Any
from models_job import JobStatus, JobResult

UPLOAD_DIR = os.environ.get("UPLOAD_DIR", "/app/data/uploads")
RESULTS_DIR = os.environ.get("RESULTS_DIR", "/app/data/results")

class JobStore:
    """

    Almacena estados y resultados en memoria.

    Para producción: sustituir por Redis / DB persistente si lo necesitas.

    """
    def __init__(self):
        self.status: Dict[str, JobStatus] = {}
        self.result: Dict[str, JobResult] = {}
        self.lock = threading.Lock()

    def set_status(self, job_id: str, status: JobStatus):
        with self.lock:
            self.status[job_id] = status

    def get_status(self, job_id: str) -> JobStatus | None:
        with self.lock:
            return self.status.get(job_id)

    def set_result(self, job_id: str, result: JobResult):
        with self.lock:
            self.result[job_id] = result

    def get_result(self, job_id: str) -> JobResult | None:
        with self.lock:
            return self.result.get(job_id)

job_store = JobStore()
job_queue: "queue.Queue[Dict[str, Any]]" = queue.Queue()

def worker_loop(process_fn):
    while True:
        job = job_queue.get()
        if job is None:
            break
        try:
            process_fn(job)
        except Exception as e:
            # Marca como failed
            st = job_store.get_status(job["job_id"])
            if st:
                st.status = "failed"
                st.message = f"Error: {e}"
                st.progress = 0
                job_store.set_status(job["job_id"], st)
        finally:
            job_queue.task_done()

_worker_thread = None

def start_worker(process_fn):
    global _worker_thread
    if _worker_thread is None or not _worker_thread.is_alive():
        _worker_thread = threading.Thread(target=worker_loop, args=(process_fn,), daemon=True)
        _worker_thread.start()