# multi-pdf-rag-api / job_worker.py
# Source: Hugging Face Space by Hamza4100 — commit 496eca5 ("Update job_worker.py")
import os
import json
import uuid
import time
import threading
import asyncio
from typing import Dict, Optional
# Note: job processing will call into async functions from main via asyncio.run
# Directory (next to this file) where job JSON records are persisted;
# created eagerly at import time so the worker can assume it exists.
_JOBS_DIRNAME = "jobs"
JOBS_DIR = os.path.join(os.path.dirname(__file__), _JOBS_DIRNAME)
os.makedirs(JOBS_DIR, exist_ok=True)
class JobManager:
    """Simple persistent job queue and worker.

    Jobs are stored as JSON files in ``jobs/`` so they survive restarts.
    A single daemon worker thread claims and processes jobs sequentially,
    FIFO by each job's ``created_at`` timestamp. Job creation and claiming
    are serialized with a lock so request handlers calling ``create_job``
    cannot race the worker's claim step.
    """

    def __init__(self):
        self._lock = threading.Lock()          # guards job creation/claiming
        self._stop_event = threading.Event()   # signals the worker loop to exit
        self._thread: Optional[threading.Thread] = None

    def start(self) -> None:
        """Start the worker thread. Idempotent: no-op if already running."""
        if self._thread and self._thread.is_alive():
            return
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        """Signal the worker to stop and wait briefly for it to finish."""
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=2)

    def _run(self) -> None:
        """Worker loop: claim queued jobs and process them one at a time."""
        while not self._stop_event.is_set():
            job = self._pop_next_job()
            if job is None:
                # No queued work; poll again shortly.
                time.sleep(0.5)
                continue
            try:
                self._process_job(job)
            except Exception as e:
                # Defensive backstop: _process_job records its own failures,
                # but an exception must never kill the worker thread.
                job['status'] = 'failed'
                job['error'] = str(e)
                job['finished_at'] = time.time()
                self._save_job(job)

    def _job_path(self, job_id: str) -> str:
        """Return the on-disk JSON path for *job_id*."""
        return os.path.join(JOBS_DIR, f"{job_id}.json")

    def _save_job(self, job: Dict) -> None:
        """Persist *job* atomically.

        Writes to a temp file and renames into place so a crash mid-write
        cannot leave a truncated job file for readers to choke on.
        """
        path = self._job_path(job['id'])
        tmp = f"{path}.tmp"
        with open(tmp, 'w', encoding='utf-8') as f:
            json.dump(job, f, indent=2)
        os.replace(tmp, path)  # atomic rename on both POSIX and Windows

    def _load_job(self, job_id: str) -> Optional[Dict]:
        """Load a job dict from disk; None if no such job exists."""
        p = self._job_path(job_id)
        if not os.path.exists(p):
            return None
        with open(p, 'r', encoding='utf-8') as f:
            return json.load(f)

    def _pop_next_job(self) -> Optional[Dict]:
        """Claim and return the oldest queued job, or None if none exist.

        Fix: the original sorted by filename, i.e. by random uuid4 hex,
        which is NOT submission order. We order by ``created_at`` so the
        queue is actually FIFO. The queued -> processing transition happens
        under the lock so a concurrent ``create_job`` (or a second claimer)
        cannot interleave with the claim.
        """
        with self._lock:
            queued = []
            for fname in os.listdir(JOBS_DIR):
                if not fname.endswith('.json'):
                    continue
                path = os.path.join(JOBS_DIR, fname)
                try:
                    with open(path, 'r', encoding='utf-8') as f:
                        job = json.load(f)
                except Exception:
                    # Unreadable/corrupt file: skip it rather than crash.
                    continue
                if job.get('status') == 'queued':
                    queued.append(job)
            if not queued:
                return None
            job = min(queued, key=lambda j: j.get('created_at', 0))
            job['status'] = 'processing'
            job['started_at'] = time.time()
            self._save_job(job)
            return job

    def create_job(self, user_id: str, filename: str, file_path: str, username: Optional[str] = None) -> Dict:
        """Enqueue a new job, persist it, and return the job dict."""
        job = {
            'id': uuid.uuid4().hex,
            'user_id': user_id,
            'username': username,
            'filename': filename,
            'file_path': file_path,
            'status': 'queued',
            'created_at': time.time(),
        }
        with self._lock:
            self._save_job(job)
        return job

    def get_job(self, job_id: str) -> Optional[Dict]:
        """Return the persisted state of *job_id*, or None if unknown."""
        return self._load_job(job_id)

    def _process_job(self, job: Dict) -> None:
        """Run one upload job: index the PDF, keep a copy, push storage to HF.

        On failure the job is marked 'failed' with the error message; on
        success 'success' with a small result summary. Never raises in the
        normal path — errors are recorded on the job file.
        """
        # Lazy import: main is heavy, and importing it at module load time
        # would also risk a circular import (main imports this module).
        from main import rag_manager, hf_storage

        user_id = job['user_id']
        filename = job['filename']
        file_path = job['file_path']
        try:
            with open(file_path, 'rb') as f:
                content = f.read()

            # get_engine is a coroutine; we are on a plain worker thread with
            # no running event loop, so asyncio.run is the right bridge here.
            engine = asyncio.run(rag_manager.get_engine(user_id))

            # We already run on a dedicated worker thread, so call the
            # blocking functions directly. (The original wrapped them in
            # asyncio.run(asyncio.to_thread(...)), which creates an event
            # loop plus an extra thread per call for no benefit.)
            result = engine.upload_document(filename, content, 'auto')
            if result.get('status') != 'success':
                job['status'] = 'failed'
                job['error'] = result.get('message', 'processing error')
                job['finished_at'] = time.time()
                self._save_job(job)
                return

            # Keep a copy of the PDF in the user's uploaded_pdfs folder.
            base_dir = os.path.dirname(os.path.abspath(__file__))
            user_pdfs_dir = os.path.join(base_dir, 'users', user_id, 'uploaded_pdfs')
            os.makedirs(user_pdfs_dir, exist_ok=True)
            with open(os.path.join(user_pdfs_dir, filename), 'wb') as f:
                f.write(content)

            # Push the user's storage to the Hugging Face repo. Plain string:
            # the original used an f-string with no placeholders (lint F541).
            hf_storage.push_storage_to_hf(user_id, "Upload (unknown)")

            job['status'] = 'success'
            job['result'] = {
                'document_id': result.get('doc_id', ''),
                'filename': filename,
                'pages': result.get('pages'),
                'chunks': result.get('chunks'),
            }
            job['finished_at'] = time.time()
            self._save_job(job)
        except Exception as e:
            job['status'] = 'failed'
            job['error'] = str(e)
            job['finished_at'] = time.time()
            self._save_job(job)
# Module-level singleton; importers share this one JobManager instance
# (presumably main.py starts it and enqueues jobs on it — verify at caller).
job_manager = JobManager()