Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import uuid | |
| import time | |
| import threading | |
| import asyncio | |
| from typing import Dict, Optional | |
| # Note: job processing will call into async functions from main via asyncio.run | |
JOBS_DIR = os.path.join(os.path.dirname(__file__), "jobs")
os.makedirs(JOBS_DIR, exist_ok=True)


class JobManager:
    """Simple persistent job queue and worker.

    Jobs are stored as JSON files in `jobs/` so they survive restarts.
    A single worker thread polls for queued jobs and processes them
    sequentially, oldest first (by the job's `created_at` timestamp).
    """

    def __init__(self):
        # Serializes claim-and-mark in _pop_next_job so two callers can
        # never claim the same queued job.
        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def start(self):
        """Start the worker thread (idempotent if already running)."""
        if self._thread and self._thread.is_alive():
            return
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self):
        """Signal the worker to stop and wait briefly for it to exit."""
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=2)

    def _run(self):
        """Worker loop: poll for queued jobs and process them one at a time."""
        while not self._stop_event.is_set():
            job = self._pop_next_job()
            if job is None:
                # Nothing queued; back off briefly before polling again.
                time.sleep(0.5)
                continue
            try:
                self._process_job(job)
            except Exception as e:
                # _process_job records its own failures; this is a last-resort
                # guard so an unexpected error cannot kill the worker thread.
                job['status'] = 'failed'
                job['error'] = str(e)
                job['finished_at'] = time.time()
                self._save_job(job)

    def _job_path(self, job_id: str) -> str:
        """Return the on-disk JSON path for *job_id*."""
        return os.path.join(JOBS_DIR, f"{job_id}.json")

    def _save_job(self, job: Dict):
        """Persist *job* atomically (write a temp file, then rename over).

        The rename via os.replace is atomic, so a crash mid-write can never
        leave a truncated/corrupt job file for the poller to trip over.
        """
        path = self._job_path(job['id'])
        tmp_path = path + '.tmp'
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(job, f, indent=2)
        os.replace(tmp_path, path)

    def _load_job(self, job_id: str) -> Optional[Dict]:
        """Return the stored job dict, or None if it does not exist."""
        p = self._job_path(job_id)
        if not os.path.exists(p):
            return None
        with open(p, 'r', encoding='utf-8') as f:
            return json.load(f)

    def _pop_next_job(self) -> Optional[Dict]:
        """Claim and return the oldest queued job, or None if none exist.

        "Oldest" is decided by the job's `created_at` timestamp, not by
        filename: job ids are random UUID hexes, so sorting filenames
        would select an arbitrary job rather than the oldest one.
        """
        with self._lock:
            queued = []
            for fname in os.listdir(JOBS_DIR):
                if not fname.endswith('.json'):
                    continue
                try:
                    with open(os.path.join(JOBS_DIR, fname), 'r', encoding='utf-8') as f:
                        job = json.load(f)
                except (OSError, ValueError):
                    # Unreadable or malformed file; skip it.
                    continue
                if job.get('status') == 'queued':
                    queued.append(job)
            if not queued:
                return None
            job = min(queued, key=lambda j: j.get('created_at', 0))
            job['status'] = 'processing'
            job['started_at'] = time.time()
            self._save_job(job)
            return job

    def create_job(self, user_id: str, filename: str, file_path: str, username: Optional[str] = None) -> Dict:
        """Create, persist, and return a new job in the 'queued' state."""
        job = {
            'id': uuid.uuid4().hex,
            'user_id': user_id,
            'username': username,
            'filename': filename,
            'file_path': file_path,
            'status': 'queued',
            'created_at': time.time()
        }
        self._save_job(job)
        return job

    def get_job(self, job_id: str) -> Optional[Dict]:
        """Return the job with *job_id*, or None if unknown."""
        return self._load_job(job_id)

    def _process_job(self, job: Dict):
        """Run one upload job: index the document, archive the file, push to HF.

        Updates the persisted job record with success or failure status;
        never raises for expected processing errors.
        """
        # Import heavy modules lazily to avoid startup cost.
        from main import rag_manager, hf_storage
        user_id = job['user_id']
        filename = job['filename']
        file_path = job['file_path']
        try:
            with open(file_path, 'rb') as f:
                content = f.read()
            # get_engine is a coroutine; drive it to completion on a private loop.
            engine = asyncio.run(rag_manager.get_engine(user_id))
            # We already run on a dedicated worker thread with no event loop,
            # so blocking calls can be made directly -- wrapping them in
            # asyncio.run(asyncio.to_thread(...)) only added overhead.
            result = engine.upload_document(filename, content, 'auto')
            if result.get('status') != 'success':
                job['status'] = 'failed'
                job['error'] = result.get('message', 'processing error')
                job['finished_at'] = time.time()
                self._save_job(job)
                return
            # Keep a copy of the original file in the user's uploaded_pdfs folder.
            base_dir = os.path.dirname(os.path.abspath(__file__))
            user_pdfs_dir = os.path.join(base_dir, 'users', user_id, 'uploaded_pdfs')
            os.makedirs(user_pdfs_dir, exist_ok=True)
            with open(os.path.join(user_pdfs_dir, filename), 'wb') as f:
                f.write(content)
            # Push the user's storage to HF; blocking here is fine (worker thread).
            hf_storage.push_storage_to_hf(user_id, "Upload (unknown)")
            job['status'] = 'success'
            job['result'] = {
                'document_id': result.get('doc_id', ''),
                'filename': filename,
                'pages': result.get('pages'),
                'chunks': result.get('chunks')
            }
            job['finished_at'] = time.time()
            self._save_job(job)
        except Exception as e:
            job['status'] = 'failed'
            job['error'] = str(e)
            job['finished_at'] = time.time()
            self._save_job(job)
| job_manager = JobManager() | |