Um34ER commited on
Commit
a97fa9d
Β·
verified Β·
1 Parent(s): 46b529d

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +106 -43
app.py CHANGED
@@ -887,6 +887,11 @@ ocr_engine = SmartOCR()
887
  semaphore = asyncio.Semaphore(CONCURRENCY_LIMIT)
888
  result_cache: Dict[str, dict] = {} # hash β†’ {result, timestamp}
889
 
 
 
 
 
 
890
  # ── FastAPI App ───────────────────────────────────────────────────────────────
891
  from contextlib import asynccontextmanager
892
 
@@ -940,7 +945,6 @@ def _cache_get(h: str) -> dict | None:
940
 
941
 
942
  def _cache_put(h: str, result: dict):
943
- # Evict oldest if over limit
944
  if len(result_cache) >= CACHE_SIZE:
945
  oldest_key = min(result_cache, key=lambda k: result_cache[k]["ts"])
946
  del result_cache[oldest_key]
@@ -950,19 +954,18 @@ def _cache_put(h: str, result: dict):
950
  # ── Image Loading ─────────────────────────────────────────────────────────────
951
 
952
  def load_image(raw_bytes: bytes) -> np.ndarray:
953
- """Load image bytes β†’ RGB numpy array, with size validation."""
954
  size_mb = len(raw_bytes) / (1024 * 1024)
955
  if size_mb > MAX_IMAGE_SIZE_MB:
956
  raise ValueError(f"Image too large: {size_mb:.1f} MB (max {MAX_IMAGE_SIZE_MB})")
957
-
958
  pil = Image.open(io.BytesIO(raw_bytes)).convert("RGB")
959
  return np.array(pil)
960
 
961
 
962
- # ── Core Processing ──────────────────────────────────────────────────────────
963
 
964
  def process_image(rgb: np.ndarray) -> Dict[str, Any]:
965
- """Full pipeline: preprocess β†’ VLM β†’ brain β†’ structured JSON."""
966
  t0 = time.time()
967
 
968
  # Step 1: Image quality analysis
@@ -978,95 +981,155 @@ def process_image(rgb: np.ndarray) -> Dict[str, Any]:
978
  logger.info("VLM raw output (%d chars)", len(raw_text))
979
 
980
  if not raw_text.strip():
981
- # Last resort: try with fully enhanced (binarized) image
982
  logger.info("Retrying with binarized image...")
983
  enhanced_rgb = enhance(rgb)
984
  pil_enhanced = Image.fromarray(enhanced_rgb)
985
  raw_text = ocr_engine.extract_text(pil_enhanced)
986
 
987
- # Step 4: Brain β€” try JSON parse first, then regex
988
  result = try_parse_json_response(raw_text)
989
  if not result:
990
  result = process_raw_text(raw_text)
991
 
992
  # Step 5: Enrich with metadata
993
  result["processing_time_ms"] = int((time.time() - t0) * 1000)
994
- result["raw_text"] = raw_text[:500] # truncated for debug
995
  result["image_quality"] = quality
996
  result["engine"] = ocr_engine.health_check()
997
 
998
  return result
999
 
1000
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1001
  # ── Endpoints ─────────────────────────────────────────────────────────────────
1002
 
1003
  @app.post("/process-parchi")
1004
  async def process_parchi(image: UploadFile = File(...)):
1005
- """Process a parchi image and return structured JSON."""
1006
- request_id = str(uuid.uuid4())[:8]
1007
- logger.info("[%s] Processing: %s (%s)", request_id, image.filename, image.content_type)
 
 
 
 
 
 
 
 
1008
 
1009
  try:
1010
  raw_bytes = await image.read()
1011
  except Exception as e:
1012
  raise HTTPException(400, f"Failed to read file: {e}")
1013
 
1014
- # Cache check
1015
  img_hash = _image_hash(raw_bytes)
1016
  cached = _cache_get(img_hash)
1017
  if cached:
1018
- logger.info("[%s] Cache hit: %s", request_id, img_hash)
1019
- cached["request_id"] = request_id
1020
  cached["cached"] = True
 
1021
  return JSONResponse(cached)
1022
 
1023
- # Concurrency gate
1024
- async with semaphore:
1025
- try:
1026
- rgb = load_image(raw_bytes)
1027
- except ValueError as e:
1028
- raise HTTPException(400, str(e))
1029
- except Exception as e:
1030
- raise HTTPException(400, f"Invalid image: {e}")
1031
 
1032
- # Run in thread pool (VLM inference is blocking)
1033
- loop = asyncio.get_event_loop()
1034
- try:
1035
- result = await loop.run_in_executor(None, process_image, rgb)
1036
- except Exception as e:
1037
- logger.exception("[%s] Processing failed", request_id)
1038
- raise HTTPException(500, f"Processing error: {e}")
1039
- finally:
1040
- gc.collect()
1041
 
1042
- result["request_id"] = request_id
1043
- result["success"] = bool(result.get("items"))
1044
- result["cached"] = False
1045
 
1046
- # Cache result
1047
- _cache_put(img_hash, result)
 
 
 
 
 
1048
 
1049
- return JSONResponse(result)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1050
 
1051
 
1052
  @app.get("/health")
1053
  async def health():
1054
- """Health check endpoint with engine status."""
 
1055
  return {
1056
  "status": "healthy",
1057
- "version": "7.0.0",
1058
- "architecture": "Local Hybrid (Qaari + GOT-OCR)",
1059
  "engine": ocr_engine.health_check(),
1060
  "cache_entries": len(result_cache),
 
 
1061
  }
1062
 
1063
 
1064
  @app.get("/")
1065
  async def root():
1066
- """Root endpoint β€” redirect info."""
1067
  return {
1068
- "service": "Smart Parchi OCR v7",
1069
  "docs": "/docs",
1070
  "health": "/health",
1071
- "endpoint": "POST /process-parchi",
 
1072
  }
 
 
887
  semaphore = asyncio.Semaphore(CONCURRENCY_LIMIT)
888
  result_cache: Dict[str, dict] = {} # hash β†’ {result, timestamp}
889
 
890
+ # ── Async Job Store (bypasses HF platform HTTP timeout) ──────────────────────────
891
+ # Jobs older than JOB_TTL seconds are pruned automatically
892
+ JOB_TTL = 3600 # 1 hour
893
+ job_store: Dict[str, dict] = {} # job_id β†’ {status, result, ts, error}
894
+
895
  # ── FastAPI App ───────────────────────────────────────────────────────────────
896
  from contextlib import asynccontextmanager
897
 
 
945
 
946
 
947
  def _cache_put(h: str, result: dict):
 
948
  if len(result_cache) >= CACHE_SIZE:
949
  oldest_key = min(result_cache, key=lambda k: result_cache[k]["ts"])
950
  del result_cache[oldest_key]
 
954
  # ── Image Loading ─────────────────────────────────────────────────────────────
955
 
956
  def load_image(raw_bytes: bytes) -> np.ndarray:
957
+ """Load image bytes -> RGB numpy array, with size validation."""
958
  size_mb = len(raw_bytes) / (1024 * 1024)
959
  if size_mb > MAX_IMAGE_SIZE_MB:
960
  raise ValueError(f"Image too large: {size_mb:.1f} MB (max {MAX_IMAGE_SIZE_MB})")
 
961
  pil = Image.open(io.BytesIO(raw_bytes)).convert("RGB")
962
  return np.array(pil)
963
 
964
 
965
+ # ── Core Processing ───────────────────────────────────────────────────────────
966
 
967
  def process_image(rgb: np.ndarray) -> Dict[str, Any]:
968
+ """Full pipeline: preprocess -> VLM -> brain -> structured JSON."""
969
  t0 = time.time()
970
 
971
  # Step 1: Image quality analysis
 
981
  logger.info("VLM raw output (%d chars)", len(raw_text))
982
 
983
  if not raw_text.strip():
 
984
  logger.info("Retrying with binarized image...")
985
  enhanced_rgb = enhance(rgb)
986
  pil_enhanced = Image.fromarray(enhanced_rgb)
987
  raw_text = ocr_engine.extract_text(pil_enhanced)
988
 
989
+ # Step 4: Brain -- try JSON parse first, then regex
990
  result = try_parse_json_response(raw_text)
991
  if not result:
992
  result = process_raw_text(raw_text)
993
 
994
  # Step 5: Enrich with metadata
995
  result["processing_time_ms"] = int((time.time() - t0) * 1000)
996
+ result["raw_text"] = raw_text[:500]
997
  result["image_quality"] = quality
998
  result["engine"] = ocr_engine.health_check()
999
 
1000
  return result
1001
 
1002
 
1003
+ # ── Background OCR Worker (Async Job Queue) ───────────────────────────────────
1004
+
1005
+ def _run_ocr_job(job_id: str, raw_bytes: bytes, img_hash: str):
1006
+ """Blocking OCR function executed in a thread-pool worker."""
1007
+ try:
1008
+ job_store[job_id]["status"] = "processing"
1009
+ rgb = load_image(raw_bytes)
1010
+ result = process_image(rgb)
1011
+ result["job_id"] = job_id
1012
+ result["success"] = bool(result.get("items"))
1013
+ result["cached"] = False
1014
+ _cache_put(img_hash, result)
1015
+ job_store[job_id].update({"status": "done", "result": result})
1016
+ elapsed = time.time() - job_store[job_id]["ts"]
1017
+ logger.info("[%s] Job completed in %.1fs", job_id, elapsed)
1018
+ except Exception as e:
1019
+ logger.exception("[%s] Job failed", job_id)
1020
+ job_store[job_id].update({"status": "error", "error": str(e)})
1021
+ finally:
1022
+ gc.collect()
1023
+
1024
+
1025
  # ── Endpoints ─────────────────────────────────────────────────────────────────
1026
 
1027
  @app.post("/process-parchi")
1028
  async def process_parchi(image: UploadFile = File(...)):
1029
+ """
1030
+ Submit a parchi image for OCR processing.
1031
+
1032
+ Returns immediately with a job_id (typically <1s).
1033
+ Poll GET /result/{job_id} every 10s until status == 'done'.
1034
+
1035
+ This async pattern is required because CPU inference takes 2-4 minutes,
1036
+ which exceeds the HF platform HTTP timeout (~60s).
1037
+ """
1038
+ job_id = str(uuid.uuid4())[:12]
1039
+ logger.info("[%s] Received: %s (%s)", job_id, image.filename, image.content_type)
1040
 
1041
  try:
1042
  raw_bytes = await image.read()
1043
  except Exception as e:
1044
  raise HTTPException(400, f"Failed to read file: {e}")
1045
 
1046
+ # Cache hit -- return result immediately without spawning a job
1047
  img_hash = _image_hash(raw_bytes)
1048
  cached = _cache_get(img_hash)
1049
  if cached:
1050
+ logger.info("[%s] Cache hit -- returning immediately", job_id)
1051
+ cached["job_id"] = job_id
1052
  cached["cached"] = True
1053
+ cached["status"] = "done"
1054
  return JSONResponse(cached)
1055
 
1056
+ # Validate image before queuing
1057
+ try:
1058
+ load_image(raw_bytes)
1059
+ except ValueError as e:
1060
+ raise HTTPException(400, str(e))
1061
+ except Exception as e:
1062
+ raise HTTPException(400, f"Invalid image: {e}")
 
1063
 
1064
+ # Register job and prune stale ones
1065
+ job_store[job_id] = {"status": "queued", "ts": time.time(), "result": None, "error": None}
1066
+ now = time.time()
1067
+ stale = [k for k, v in job_store.items() if now - v["ts"] > JOB_TTL]
1068
+ for k in stale:
1069
+ del job_store[k]
 
 
 
1070
 
1071
+ # Submit to thread pool (non-blocking -- returns immediately)
1072
+ loop = asyncio.get_event_loop()
1073
+ loop.run_in_executor(None, _run_ocr_job, job_id, raw_bytes, img_hash)
1074
 
1075
+ logger.info("[%s] Job queued -- returning job_id immediately", job_id)
1076
+ return JSONResponse({
1077
+ "job_id": job_id,
1078
+ "status": "queued",
1079
+ "poll_url": f"/result/{job_id}",
1080
+ "message": "Image accepted. Poll /result/{job_id} every 10s until status=done.",
1081
+ })
1082
 
1083
+
1084
+ @app.get("/result/{job_id}")
1085
+ async def get_result(job_id: str):
1086
+ """
1087
+ Poll for OCR job result.
1088
+
1089
+ Returns:
1090
+ status=queued|processing : not ready yet, poll again in 10s
1091
+ status=done : result field contains the structured parchi JSON
1092
+ status=error : error field contains the failure message
1093
+ """
1094
+ job = job_store.get(job_id)
1095
+ if not job:
1096
+ raise HTTPException(404, f"Job '{job_id}' not found. It may have expired (TTL=1h).")
1097
+
1098
+ response: Dict[str, Any] = {"job_id": job_id, "status": job["status"]}
1099
+ if job["status"] == "done":
1100
+ response.update(job["result"] or {})
1101
+ elif job["status"] == "error":
1102
+ response["error"] = job["error"]
1103
+ else:
1104
+ response["elapsed_seconds"] = int(time.time() - job["ts"])
1105
+ response["message"] = "Job is processing. Poll again in 10 seconds."
1106
+
1107
+ return JSONResponse(response)
1108
 
1109
 
1110
  @app.get("/health")
1111
  async def health():
1112
+ """Health check with engine and queue status."""
1113
+ active = sum(1 for j in job_store.values() if j["status"] in ("queued", "processing"))
1114
  return {
1115
  "status": "healthy",
1116
+ "version": "7.1.0",
1117
+ "architecture": "Local Hybrid (Qaari + GOT-OCR) -- Async Job Queue",
1118
  "engine": ocr_engine.health_check(),
1119
  "cache_entries": len(result_cache),
1120
+ "active_jobs": active,
1121
+ "total_jobs": len(job_store),
1122
  }
1123
 
1124
 
1125
  @app.get("/")
1126
  async def root():
1127
+ """Root endpoint."""
1128
  return {
1129
+ "service": "Smart Parchi OCR v7.1",
1130
  "docs": "/docs",
1131
  "health": "/health",
1132
+ "submit": "POST /process-parchi -> {job_id, status: queued}",
1133
+ "poll": "GET /result/{job_id} -> {status, result (when done)}",
1134
  }
1135
+