# File size: 5,331 Bytes
# 496eca5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import os
import json
import uuid
import time
import threading
import asyncio
from typing import Dict, Optional

# Note: job processing will call into async functions from main via asyncio.run

# Directory holding one JSON file per job (the persistent queue state).
# Created eagerly at import time so the worker thread can assume it exists.
JOBS_DIR = os.path.join(os.path.dirname(__file__), "jobs")
os.makedirs(JOBS_DIR, exist_ok=True)


class JobManager:
    """Simple persistent job queue and worker.

    Jobs are stored as JSON files in `jobs/` so they survive restarts.
    Worker runs in a dedicated daemon thread and processes jobs
    sequentially, oldest first by `created_at`.
    """

    def __init__(self):
        # Guards the claim-and-mark step in _pop_next_job so a job can
        # never be handed out twice if multiple threads ever poll.
        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def start(self):
        """Start the worker thread. Idempotent: no-op if already running."""
        if self._thread and self._thread.is_alive():
            return
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self):
        """Signal the worker to stop and wait briefly for it to exit."""
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=2)

    def _run(self):
        """Worker loop: claim and process queued jobs until stopped."""
        while not self._stop_event.is_set():
            job = self._pop_next_job()
            if job is None:
                # Nothing queued; poll again shortly.
                time.sleep(0.5)
                continue
            try:
                self._process_job(job)
            except Exception as e:
                # _process_job records its own failures; this catches
                # anything unexpected (e.g. the lazy import failing) so
                # the job is never left stuck in 'processing'.
                job['status'] = 'failed'
                job['error'] = str(e)
                job['finished_at'] = time.time()
                self._save_job(job)

    def _job_path(self, job_id: str) -> str:
        """Return the on-disk JSON path for a job id."""
        return os.path.join(JOBS_DIR, f"{job_id}.json")

    def _save_job(self, job: Dict):
        """Persist a job atomically (write to a temp file, then rename).

        os.replace is atomic on POSIX and Windows, so the polling worker
        can never observe a half-written JSON file.
        """
        path = self._job_path(job['id'])
        tmp_path = path + '.tmp'
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(job, f, indent=2)
        os.replace(tmp_path, path)

    def _load_job(self, job_id: str) -> Optional[Dict]:
        """Load a job dict from disk, or None if it does not exist."""
        p = self._job_path(job_id)
        if not os.path.exists(p):
            return None
        with open(p, 'r', encoding='utf-8') as f:
            return json.load(f)

    def _pop_next_job(self) -> Optional[Dict]:
        """Claim the oldest queued job and mark it 'processing'.

        Jobs must be ordered by their 'created_at' timestamp: file names
        are random uuid4 hex strings, so sorting by name (as a naive
        implementation might) would pick jobs in arbitrary order.

        Returns the claimed job dict, or None when nothing is queued.
        """
        with self._lock:
            queued = []
            for fname in os.listdir(JOBS_DIR):
                if not fname.endswith('.json'):
                    continue
                path = os.path.join(JOBS_DIR, fname)
                try:
                    with open(path, 'r', encoding='utf-8') as f:
                        job = json.load(f)
                except Exception:
                    # Skip unreadable or corrupt job files.
                    continue
                if job.get('status') == 'queued':
                    queued.append(job)
            if not queued:
                return None
            job = min(queued, key=lambda j: j.get('created_at', 0))
            job['status'] = 'processing'
            job['started_at'] = time.time()
            self._save_job(job)
            return job

    def create_job(self, user_id: str, filename: str, file_path: str, username: Optional[str] = None) -> Dict:
        """Create, persist, and return a new job in the 'queued' state.

        Args:
            user_id: owner of the upload; used to locate per-user storage.
            filename: original name of the uploaded file.
            file_path: path to the staged file contents on disk.
            username: optional display name recorded alongside the job.
        """
        job_id = uuid.uuid4().hex
        job = {
            'id': job_id,
            'user_id': user_id,
            'username': username,
            'filename': filename,
            'file_path': file_path,
            'status': 'queued',
            'created_at': time.time()
        }
        self._save_job(job)
        return job

    def get_job(self, job_id: str) -> Optional[Dict]:
        """Public lookup of a job's current persisted state."""
        return self._load_job(job_id)

    def _process_job(self, job: Dict):
        """Run one upload job to completion, recording success or failure."""
        # Import heavy modules lazily to avoid startup cost (and a
        # circular import with main at module load time).
        from main import rag_manager, hf_storage

        user_id = job['user_id']
        filename = job['filename']
        file_path = job['file_path']

        try:
            # Read staged upload content.
            with open(file_path, 'rb') as f:
                content = f.read()

            # get_engine is a coroutine and this thread has no running
            # event loop, so asyncio.run is the right bridge here.
            engine = asyncio.run(rag_manager.get_engine(user_id))

            # We are already on a dedicated worker thread, so blocking
            # calls are fine here. (Wrapping them in
            # asyncio.run(asyncio.to_thread(...)) would spin up an event
            # loop only to hop to yet another thread.)
            result = engine.upload_document(filename, content, 'auto')

            if result.get('status') != 'success':
                job['status'] = 'failed'
                job['error'] = result.get('message', 'processing error')
                job['finished_at'] = time.time()
                self._save_job(job)
                return

            # Save pdf to user's uploaded_pdfs folder.
            base_dir = os.path.dirname(os.path.abspath(__file__))
            user_pdfs_dir = os.path.join(base_dir, 'users', user_id, 'uploaded_pdfs')
            os.makedirs(user_pdfs_dir, exist_ok=True)
            pdf_path = os.path.join(user_pdfs_dir, filename)
            with open(pdf_path, 'wb') as f:
                f.write(content)

            # Push the user's storage to HF (blocking; ok on this thread).
            hf_storage.push_storage_to_hf(user_id, "Upload (unknown)")

            job['status'] = 'success'
            job['result'] = {
                'document_id': result.get('doc_id', ''),
                'filename': filename,
                'pages': result.get('pages'),
                'chunks': result.get('chunks')
            }
            job['finished_at'] = time.time()
            self._save_job(job)

        except Exception as e:
            job['status'] = 'failed'
            job['error'] = str(e)
            job['finished_at'] = time.time()
            self._save_job(job)


job_manager = JobManager()