Skier8402 committed on
Commit
4038942
·
verified ·
1 Parent(s): 41c6b37

Update app.py

Browse files

add status endpoint

Files changed (1) hide show
  1. app.py +140 -16
app.py CHANGED
@@ -118,6 +118,7 @@ import logging
118
  import sys
119
  from logging.handlers import RotatingFileHandler
120
  import asyncio
 
121
  from pydantic import BaseModel
122
 
123
  # Import core helpers from the existing codebase
@@ -200,6 +201,36 @@ app.add_middleware(
200
  SESSIONS = {}
201
 
202
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
203
  async def _background_process(session_id: str, tmpdir: str, paths: list):
204
  """Background worker to parse uploaded PDFs and build vectorstore/chain.
205
 
@@ -208,12 +239,32 @@ async def _background_process(session_id: str, tmpdir: str, paths: list):
208
  try:
209
  from generate_testset import get_documents_from_pdfs
210
 
 
 
 
 
 
 
 
211
  documents = await asyncio.to_thread(get_documents_from_pdfs, paths)
212
  if not documents:
213
- SESSIONS[session_id]["status"] = "failed"
214
- SESSIONS[session_id]["error"] = "No documents parsed from uploads"
 
 
 
 
 
215
  return
216
 
 
 
 
 
 
 
 
 
217
  texts, metadatas = get_text_chunks(
218
  [
219
  {
@@ -232,29 +283,56 @@ async def _background_process(session_id: str, tmpdir: str, paths: list):
232
  ]
233
  )
234
 
 
 
 
 
 
 
 
 
235
  vectorstore = await asyncio.to_thread(get_vectorstore, texts, metadatas)
236
 
 
 
 
 
 
 
 
237
  try:
238
  conversation = await asyncio.to_thread(get_conversation_chain, vectorstore)
239
  except Exception as e:
240
- SESSIONS[session_id]["status"] = "failed"
241
- SESSIONS[session_id]["error"] = f"Conversation creation failed: {e}"
 
 
 
 
 
242
  return
243
 
244
- SESSIONS[session_id].update(
245
- {
246
- "vectorstore": vectorstore,
247
- "conversation": conversation,
248
- "status": "ready",
249
- }
 
 
250
  )
251
  logger.info("Background processing complete for session %s", session_id)
252
  except Exception as e:
253
  logger.exception(
254
  "Background processing failed for session %s: %s", session_id, e
255
  )
256
- SESSIONS[session_id]["status"] = "failed"
257
- SESSIONS[session_id]["error"] = str(e)
 
 
 
 
 
258
  finally:
259
  try:
260
  shutil.rmtree(tmpdir)
@@ -267,6 +345,14 @@ def health():
267
  return {"ok": True}
268
 
269
 
 
 
 
 
 
 
 
 
270
  @app.post("/process")
271
  async def process(
272
  files: List[UploadFile] = File(...), api_key: str = Depends(verify_api_key)
@@ -285,8 +371,26 @@ async def process(
285
  paths = []
286
  # create session early and schedule background processing
287
  session_id = str(uuid.uuid4())
288
- SESSIONS[session_id] = {"status": "processing", "tmpdir": tmpdir}
 
 
 
 
 
 
 
 
 
 
 
 
289
  try:
 
 
 
 
 
 
290
  for f in files:
291
  dest = os.path.join(tmpdir, f.filename or f"upload-{uuid.uuid4()}.pdf")
292
  with open(dest, "wb") as out:
@@ -294,12 +398,29 @@ async def process(
294
  out.write(content)
295
  paths.append(dest)
296
 
 
 
 
 
 
 
 
 
297
  # schedule background processing to build index & chain
298
  asyncio.create_task(_background_process(session_id, tmpdir, paths))
299
  logger.info("Scheduled background processing for session %s", session_id)
300
- return {"session_id": session_id, "status": "processing"}
 
 
301
  except Exception:
302
  # on unexpected error, clean up tmpdir and re-raise
 
 
 
 
 
 
 
303
  try:
304
  shutil.rmtree(tmpdir)
305
  except Exception:
@@ -323,11 +444,14 @@ async def ask(
323
  if not sess:
324
  raise HTTPException(status_code=404, detail="session_id not found")
325
 
326
- # If processing not complete, return 202 Accepted with status
327
  status = sess.get("status")
 
 
 
328
  if status != "ready":
329
  return JSONResponse(
330
- status_code=202, content={"session_id": session_id, "status": status}
331
  )
332
 
333
  conv = sess.get("conversation")
 
118
  import sys
119
  from logging.handlers import RotatingFileHandler
120
  import asyncio
121
+ from datetime import datetime, timezone
122
  from pydantic import BaseModel
123
 
124
  # Import core helpers from the existing codebase
 
201
  SESSIONS = {}
202
 
203
 
204
+ def _utc_now_iso() -> str:
205
+ return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
206
+
207
+
208
+ def _build_status_payload(session_id: str, sess: dict) -> dict:
209
+ return {
210
+ "session_id": session_id,
211
+ "status": sess.get("status", "queued"),
212
+ "progress_pct": sess.get("progress_pct", 0),
213
+ "message": sess.get("message", ""),
214
+ "doc_count": sess.get("doc_count"),
215
+ "chunk_count": sess.get("chunk_count"),
216
+ "started_at": sess.get("started_at"),
217
+ "updated_at": sess.get("updated_at"),
218
+ "completed_at": sess.get("completed_at"),
219
+ "error": sess.get("error"),
220
+ }
221
+
222
+
223
def _update_session(session_id: str, **updates):
    """Merge *updates* into the session record, stamping timestamps.

    Unknown (or empty/falsy) session ids are silently ignored.
    ``updated_at`` is always refreshed; ``completed_at`` is filled in
    automatically the first time the session reaches a terminal status
    ("ready" or "failed") unless the caller supplied one.
    """
    record = SESSIONS.get(session_id)
    if not record:
        return
    stamp = _utc_now_iso()
    updates["updated_at"] = stamp
    reached_terminal = updates.get("status") in {"ready", "failed"}
    if reached_terminal and not updates.get("completed_at"):
        updates["completed_at"] = stamp
    record.update(updates)
232
+
233
+
234
  async def _background_process(session_id: str, tmpdir: str, paths: list):
235
  """Background worker to parse uploaded PDFs and build vectorstore/chain.
236
 
 
239
  try:
240
  from generate_testset import get_documents_from_pdfs
241
 
242
+ _update_session(
243
+ session_id,
244
+ status="parsing",
245
+ progress_pct=10,
246
+ message="Parsing uploaded PDFs",
247
+ error=None,
248
+ )
249
  documents = await asyncio.to_thread(get_documents_from_pdfs, paths)
250
  if not documents:
251
+ _update_session(
252
+ session_id,
253
+ status="failed",
254
+ progress_pct=100,
255
+ message="Parsing failed: no documents parsed",
256
+ error="No documents parsed from uploads",
257
+ )
258
  return
259
 
260
+ _update_session(
261
+ session_id,
262
+ status="chunking",
263
+ progress_pct=35,
264
+ message="Chunking parsed text",
265
+ doc_count=len(documents),
266
+ )
267
+
268
  texts, metadatas = get_text_chunks(
269
  [
270
  {
 
283
  ]
284
  )
285
 
286
+ _update_session(
287
+ session_id,
288
+ status="indexing",
289
+ progress_pct=60,
290
+ message="Building vector store",
291
+ chunk_count=len(texts),
292
+ )
293
+
294
  vectorstore = await asyncio.to_thread(get_vectorstore, texts, metadatas)
295
 
296
+ _update_session(
297
+ session_id,
298
+ status="building_chain",
299
+ progress_pct=85,
300
+ message="Building conversation chain",
301
+ )
302
+
303
  try:
304
  conversation = await asyncio.to_thread(get_conversation_chain, vectorstore)
305
  except Exception as e:
306
+ _update_session(
307
+ session_id,
308
+ status="failed",
309
+ progress_pct=100,
310
+ message="Conversation chain creation failed",
311
+ error=f"Conversation creation failed: {e}",
312
+ )
313
  return
314
 
315
+ _update_session(
316
+ session_id,
317
+ vectorstore=vectorstore,
318
+ conversation=conversation,
319
+ status="ready",
320
+ progress_pct=100,
321
+ message="Session is ready for questions",
322
+ error=None,
323
  )
324
  logger.info("Background processing complete for session %s", session_id)
325
  except Exception as e:
326
  logger.exception(
327
  "Background processing failed for session %s: %s", session_id, e
328
  )
329
+ _update_session(
330
+ session_id,
331
+ status="failed",
332
+ progress_pct=100,
333
+ message="Background processing failed",
334
+ error=str(e),
335
+ )
336
  finally:
337
  try:
338
  shutil.rmtree(tmpdir)
 
345
  return {"ok": True}
346
 
347
 
348
@app.get("/status")
def get_status(session_id: str, api_key: str = Depends(verify_api_key)):
    """Return the current processing status for a session.

    Responds 404 when the session id is unknown (or its record is empty).
    """
    record = SESSIONS.get(session_id)
    if record:
        return _build_status_payload(session_id, record)
    raise HTTPException(status_code=404, detail="session_id not found")
354
+
355
+
356
  @app.post("/process")
357
  async def process(
358
  files: List[UploadFile] = File(...), api_key: str = Depends(verify_api_key)
 
371
  paths = []
372
  # create session early and schedule background processing
373
  session_id = str(uuid.uuid4())
374
+ now = _utc_now_iso()
375
+ SESSIONS[session_id] = {
376
+ "status": "queued",
377
+ "progress_pct": 0,
378
+ "message": "Upload received and queued",
379
+ "started_at": now,
380
+ "updated_at": now,
381
+ "completed_at": None,
382
+ "error": None,
383
+ "doc_count": None,
384
+ "chunk_count": None,
385
+ "tmpdir": tmpdir,
386
+ }
387
  try:
388
+ _update_session(
389
+ session_id,
390
+ status="parsing",
391
+ progress_pct=5,
392
+ message="Saving uploaded files",
393
+ )
394
  for f in files:
395
  dest = os.path.join(tmpdir, f.filename or f"upload-{uuid.uuid4()}.pdf")
396
  with open(dest, "wb") as out:
 
398
  out.write(content)
399
  paths.append(dest)
400
 
401
+ _update_session(
402
+ session_id,
403
+ status="queued",
404
+ progress_pct=8,
405
+ message="Files saved; waiting for background processing",
406
+ doc_count=len(paths),
407
+ )
408
+
409
  # schedule background processing to build index & chain
410
  asyncio.create_task(_background_process(session_id, tmpdir, paths))
411
  logger.info("Scheduled background processing for session %s", session_id)
412
+ payload = _build_status_payload(session_id, SESSIONS[session_id])
413
+ payload["estimated_poll_interval_ms"] = 2500
414
+ return payload
415
  except Exception:
416
  # on unexpected error, clean up tmpdir and re-raise
417
+ _update_session(
418
+ session_id,
419
+ status="failed",
420
+ progress_pct=100,
421
+ message="Failed while saving uploads",
422
+ error="Unexpected error while handling uploaded files",
423
+ )
424
  try:
425
  shutil.rmtree(tmpdir)
426
  except Exception:
 
444
  if not sess:
445
  raise HTTPException(status_code=404, detail="session_id not found")
446
 
447
+ # If processing not complete, return 202 Accepted with full status payload
448
  status = sess.get("status")
449
+ if status == "failed":
450
+ payload = _build_status_payload(session_id, sess)
451
+ return JSONResponse(status_code=409, content=payload)
452
  if status != "ready":
453
  return JSONResponse(
454
+ status_code=202, content=_build_status_payload(session_id, sess)
455
  )
456
 
457
  conv = sess.get("conversation")