import os
import json
import uuid
import time
import threading
import asyncio
from typing import Dict, Optional

# Note: job processing will call into async functions from main via asyncio.run

# Directory where job records are persisted as one JSON file per job,
# located next to this module so jobs survive process restarts.
JOBS_DIR = os.path.join(os.path.dirname(__file__), "jobs")
os.makedirs(JOBS_DIR, exist_ok=True)
class JobManager:
    """Simple persistent job queue and worker.

    Jobs are stored as JSON files in ``jobs/`` so they survive restarts.
    A single daemon thread processes queued jobs sequentially, oldest
    first (by ``created_at``).
    """

    def __init__(self):
        # Guards the queued -> processing transition so two claimers can
        # never pick up the same job.
        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def start(self):
        """Start the worker thread. Idempotent: no-op if already running."""
        if self._thread and self._thread.is_alive():
            return
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self):
        """Signal the worker to stop and wait briefly for it to exit."""
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=2)

    def _run(self):
        # Poll for queued jobs; sleep briefly when the queue is empty.
        while not self._stop_event.is_set():
            job = self._pop_next_job()
            if job is None:
                time.sleep(0.5)
                continue
            try:
                self._process_job(job)
            except Exception as e:
                # Safety net only: _process_job records its own failures,
                # so this fires only if that recording itself blew up.
                self._finish(job, 'failed', error=str(e))

    def _job_path(self, job_id: str) -> str:
        """Return the on-disk path of the JSON record for *job_id*."""
        return os.path.join(JOBS_DIR, f"{job_id}.json")

    def _save_job(self, job: Dict):
        """Persist *job* atomically (write temp file, then rename).

        Prevents readers from ever observing a half-written JSON file.
        """
        path = self._job_path(job['id'])
        tmp_path = path + '.tmp'
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(job, f, indent=2)
        os.replace(tmp_path, path)

    def _load_job(self, job_id: str) -> Optional[Dict]:
        """Load a job record from disk, or None if it does not exist."""
        p = self._job_path(job_id)
        if not os.path.exists(p):
            return None
        with open(p, 'r', encoding='utf-8') as f:
            return json.load(f)

    def _finish(self, job: Dict, status: str, error: Optional[str] = None):
        """Stamp a terminal *status* on *job* and persist it (best effort)."""
        job['status'] = status
        if error is not None:
            job['error'] = error
        job['finished_at'] = time.time()
        try:
            self._save_job(job)
        except Exception:
            pass  # nothing further we can do for this record

    def _pop_next_job(self) -> Optional[Dict]:
        """Claim and return the oldest queued job, or None if none exist.

        BUGFIX: job ids are random UUID hex, so sorting the *filenames*
        did not yield FIFO order; order by each job's ``created_at``.
        The lock makes the claim (queued -> processing) race-free.
        """
        with self._lock:
            queued = []
            for fname in os.listdir(JOBS_DIR):
                if not fname.endswith('.json'):
                    continue
                try:
                    with open(os.path.join(JOBS_DIR, fname), 'r', encoding='utf-8') as f:
                        job = json.load(f)
                except Exception:
                    continue  # skip unreadable/corrupt job files
                if job.get('status') == 'queued':
                    queued.append(job)
            if not queued:
                return None
            job = min(queued, key=lambda j: j.get('created_at', 0))
            job['status'] = 'processing'
            job['started_at'] = time.time()
            self._save_job(job)
            return job

    def create_job(self, user_id: str, filename: str, file_path: str, username: Optional[str] = None) -> Dict:
        """Enqueue a new upload job and persist it; returns the job dict."""
        job_id = uuid.uuid4().hex
        job = {
            'id': job_id,
            'user_id': user_id,
            'username': username,
            'filename': filename,
            'file_path': file_path,
            'status': 'queued',
            'created_at': time.time()
        }
        self._save_job(job)
        return job

    def get_job(self, job_id: str) -> Optional[Dict]:
        """Return the persisted record for *job_id*, or None if unknown."""
        return self._load_job(job_id)

    def _process_job(self, job: Dict):
        """Run one upload job end-to-end, recording success or failure."""
        # Import heavy modules lazily to avoid startup cost.
        from main import rag_manager, hf_storage
        user_id = job['user_id']
        filename = job['filename']
        file_path = job['file_path']
        try:
            with open(file_path, 'rb') as f:
                content = f.read()
            # get_engine is a coroutine and this thread has no running
            # event loop, so asyncio.run is the correct bridge.
            engine = asyncio.run(rag_manager.get_engine(user_id))
            # upload_document is synchronous and we are already on a
            # dedicated worker thread, so call it directly. The old
            # asyncio.run(asyncio.to_thread(...)) built a throwaway event
            # loop plus an extra thread just to run a sync function.
            result = engine.upload_document(filename, content, 'auto')
            if result.get('status') != 'success':
                self._finish(job, 'failed',
                             error=result.get('message', 'processing error'))
                return
            # Save the original file to the user's uploaded_pdfs folder.
            base_dir = os.path.dirname(os.path.abspath(__file__))
            user_pdfs_dir = os.path.join(base_dir, 'users', user_id, 'uploaded_pdfs')
            os.makedirs(user_pdfs_dir, exist_ok=True)
            # basename() guards against path traversal via a crafted filename.
            pdf_path = os.path.join(user_pdfs_dir, os.path.basename(filename))
            with open(pdf_path, 'wb') as f:
                f.write(content)
            # Push to HF storage; synchronous call, same reasoning as above.
            hf_storage.push_storage_to_hf(user_id, "Upload (unknown)")
            job['result'] = {
                'document_id': result.get('doc_id', ''),
                'filename': filename,
                'pages': result.get('pages'),
                'chunks': result.get('chunks')
            }
            self._finish(job, 'success')
        except Exception as e:
            self._finish(job, 'failed', error=str(e))
# Module-level singleton; importers share this instance and call start()
# once at application startup.
job_manager = JobManager()