import os
import json
import uuid
import time
import threading
import asyncio
from typing import Dict, Optional

# Note: job processing will call into async functions from main via asyncio.run

# Directory where job records are persisted as JSON files (survives restarts).
JOBS_DIR = os.path.join(os.path.dirname(__file__), "jobs")
os.makedirs(JOBS_DIR, exist_ok=True)


class JobManager:
    """Simple persistent job queue and worker.

    Jobs are stored as JSON files in `jobs/` so they survive restarts.
    Worker runs in a dedicated thread and processes jobs sequentially.
    """

    def __init__(self):
        # RLock because _pop_next_job holds the lock while calling _save_job.
        self._lock = threading.RLock()
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def start(self):
        """Start the background worker thread (idempotent)."""
        if self._thread and self._thread.is_alive():
            return
        # BUG FIX: clear the stop flag so the manager can be restarted
        # after stop(); previously a restarted worker exited immediately.
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self):
        """Signal the worker to stop and wait briefly for it to exit."""
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=2)

    def _run(self):
        """Worker loop: poll for queued jobs and process them sequentially."""
        while not self._stop_event.is_set():
            job = self._pop_next_job()
            if job is None:
                time.sleep(0.5)  # idle poll interval
                continue
            try:
                self._process_job(job)
            except Exception as e:
                # Never let one bad job kill the worker; record the failure.
                job['status'] = 'failed'
                job['error'] = str(e)
                job['finished_at'] = time.time()
                self._save_job(job)

    def _job_path(self, job_id: str) -> str:
        """Return the JSON file path for a job id."""
        return os.path.join(JOBS_DIR, f"{job_id}.json")

    def _save_job(self, job: Dict):
        """Persist a job record atomically (write temp file, then rename).

        The rename makes a crash mid-write leave the previous file intact
        instead of a truncated/corrupt JSON document.
        """
        path = self._job_path(job['id'])
        tmp = path + '.tmp'
        with self._lock:
            with open(tmp, 'w', encoding='utf-8') as f:
                json.dump(tmp_job := job, f, indent=2) if False else json.dump(job, f, indent=2)
            os.replace(tmp, path)  # atomic on both POSIX and Windows

    def _load_job(self, job_id: str) -> Optional[Dict]:
        """Load a job record, or None if no such job exists."""
        p = self._job_path(job_id)
        if not os.path.exists(p):
            return None
        with open(p, 'r', encoding='utf-8') as f:
            return json.load(f)

    def _pop_next_job(self) -> Optional[Dict]:
        """Claim and return the oldest queued job, or None if queue is empty.

        BUG FIX: the original sorted by filename, but filenames are random
        uuid4 hex strings, so "oldest first" was not actually honored.
        We now select by the job's `created_at` timestamp.  The whole
        scan-and-claim is done under the lock so two callers cannot claim
        the same job.
        """
        with self._lock:
            queued = []
            for fname in os.listdir(JOBS_DIR):
                if not fname.endswith('.json'):
                    continue
                try:
                    with open(os.path.join(JOBS_DIR, fname), 'r', encoding='utf-8') as f:
                        job = json.load(f)
                except Exception:
                    continue  # skip unreadable/corrupt job files (best effort)
                if job.get('status') == 'queued':
                    queued.append(job)
            if not queued:
                return None
            job = min(queued, key=lambda j: j.get('created_at', 0))
            job['status'] = 'processing'
            job['started_at'] = time.time()
            self._save_job(job)
            return job

    def create_job(self, user_id: str, filename: str, file_path: str,
                   username: Optional[str] = None) -> Dict:
        """Create, persist and return a new queued job record."""
        job = {
            'id': uuid.uuid4().hex,
            'user_id': user_id,
            'username': username,
            'filename': filename,
            'file_path': file_path,
            'status': 'queued',
            'created_at': time.time(),
        }
        self._save_job(job)
        return job

    def get_job(self, job_id: str) -> Optional[Dict]:
        """Return the persisted record for *job_id*, or None if unknown."""
        return self._load_job(job_id)

    def _process_job(self, job: Dict):
        """Process one job: index the document, save the PDF, push to HF.

        Runs in the worker thread; any exception here is also caught by
        _run's catch-all, but we handle it locally to record the error.
        """
        # Import heavy modules lazily to avoid startup cost.
        from main import rag_manager, hf_storage

        user_id = job['user_id']
        filename = job['filename']
        try:
            with open(job['file_path'], 'rb') as f:
                content = f.read()

            # get_engine is async; we are in a plain worker thread, so drive
            # it to completion with a private event loop.
            engine = asyncio.run(rag_manager.get_engine(user_id))

            # FIX: call the blocking upload directly.  The original wrapped
            # it in asyncio.run(asyncio.to_thread(...)), which creates an
            # event loop plus an extra thread for no benefit — this worker
            # thread exists precisely to run blocking work.
            result = engine.upload_document(filename, content, 'auto')

            if result.get('status') != 'success':
                job['status'] = 'failed'
                job['error'] = result.get('message', 'processing error')
                job['finished_at'] = time.time()
                self._save_job(job)
                return

            # Keep a copy of the PDF in the user's storage folder.
            base_dir = os.path.dirname(os.path.abspath(__file__))
            user_pdfs_dir = os.path.join(base_dir, 'users', user_id, 'uploaded_pdfs')
            os.makedirs(user_pdfs_dir, exist_ok=True)
            with open(os.path.join(user_pdfs_dir, filename), 'wb') as f:
                f.write(content)

            # BUG FIX: the commit message was the constant "Upload (unknown)"
            # (an f-string with no placeholder); include the real filename.
            hf_storage.push_storage_to_hf(user_id, f"Upload {filename}")

            job['status'] = 'success'
            job['result'] = {
                'document_id': result.get('doc_id', ''),
                'filename': filename,
                'pages': result.get('pages'),
                'chunks': result.get('chunks'),
            }
            job['finished_at'] = time.time()
            self._save_job(job)
        except Exception as e:
            job['status'] = 'failed'
            job['error'] = str(e)
            job['finished_at'] = time.time()
            self._save_job(job)


job_manager = JobManager()