sarveshpatel committed on
Commit
a4a5d95
·
verified ·
1 Parent(s): bcb99a3

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +161 -104
app.py CHANGED
@@ -1,134 +1,191 @@
1
  import os
2
- import asyncio
3
  import time
4
- import io
5
  import gc
 
6
  from concurrent.futures import ProcessPoolExecutor
7
- from typing import List
8
- import numpy as np
9
- from PIL import Image
10
- from fastapi import FastAPI, UploadFile, File, HTTPException
11
  from fastapi.responses import JSONResponse
12
- from paddleocr import PaddleOCR
13
 
14
# --- CPU-ONLY CONFIGURATION ---
# Force CPU-only execution and point Paddle's model cache at a writable dir.
_PADDLE_HOME = "/app/paddle_home"
os.environ["CUDA_VISIBLE_DEVICES"] = ""
os.environ["PADDLE_HOME"] = _PADDLE_HOME
os.makedirs(_PADDLE_HOME, exist_ok=True)
 
 
18
 
19
# --- WORKER INITIALIZATION ---
def init_worker():
    """Build one PaddleOCR engine per worker process and stash it as a global.

    Invoked once per process via the ProcessPoolExecutor `initializer` hook,
    so each worker pays the model-load cost exactly once.
    """
    global worker_ocr
    # Let PaddleOCR auto-select Marathi models - cleanest approach.
    engine_opts = dict(
        lang="mr",
        use_gpu=False,
        enable_mkldnn=True,  # 2-3x speedup on modern CPUs
        cpu_threads=2,       # Threads per worker (tune 1-3)
    )
    worker_ocr = PaddleOCR(**engine_opts)
30
 
31
# --- RESOURCE MANAGEMENT ---
# BUG FIX: os.cpu_count() may return None on some platforms, which made the
# original `os.cpu_count() - 1` raise TypeError at import time. Fall back to 2
# so the arithmetic is always defined; still leave one core for the event loop
# and cap the pool at 4 workers.
MAX_WORKERS = min(4, max(1, (os.cpu_count() or 2) - 1))
CPU_EXECUTOR = ProcessPoolExecutor(max_workers=MAX_WORKERS, initializer=init_worker)

# --- FASTAPI APP ---
app = FastAPI(title="CPU-Optimized OCR API", version="1.0.0")
 
 
 
 
 
37
 
38
@app.middleware("http")
async def add_metrics(request, call_next):
    """Attach wall-clock handling time to every response as X-Process-Time."""
    started = time.perf_counter()
    response = await call_next(request)
    elapsed = time.perf_counter() - started
    response.headers["X-Process-Time"] = f"{elapsed:.3f}s"
    return response
45
 
46
# --- CORE PROCESSING ---
def process_single_image(image_data: bytes) -> dict:
    """OCR one image inside an isolated worker process.

    Never raises: returns {"success": True, "results": [...]} on success or
    {"success": False, "error": "..."} on any failure.
    """
    try:
        # Decode straight from the in-memory bytes; force RGB for the engine.
        pil_img = Image.open(io.BytesIO(image_data)).convert("RGB")

        # Run OCR on the numpy view of the image.
        ocr_result = worker_ocr.predict(input=np.array(pil_img))

        # Flatten recognized text/score pairs from every result block.
        parsed = []
        for block in ocr_result:
            texts = block.get("rec_texts", [])
            scores = block.get("rec_scores", [])
            for text, score in zip(texts, scores):
                parsed.append({"text": text, "confidence": float(score)})

        return {"success": True, "results": parsed}
    except Exception as e:
        return {"success": False, "error": str(e)}
    finally:
        # Lightweight cleanup to keep worker RSS flat between images.
        gc.collect()
 
 
 
 
 
 
 
 
 
 
 
 
 
70
 
 
 
 
71
@app.post("/ocr")
async def ocr_endpoint(files: List[UploadFile] = File(...)):
    """OCR up to 10 uploaded images in parallel across the worker pool.

    Per file: validates extension (jpg/jpeg/png) and size (<=10MB), then fans
    the raw bytes out to CPU_EXECUTOR. Returns a structured per-file payload;
    raises HTTP 400 on any validation failure.
    """
    if len(files) > 10:
        raise HTTPException(status_code=400, detail="Maximum 10 files allowed.")

    # --- VALIDATION & LOAD ---
    file_data = []
    for file in files:
        # BUG FIX: UploadFile.filename may be None; treat that as unsupported
        # instead of crashing with AttributeError (HTTP 500).
        ext = (file.filename or "").split(".")[-1].lower()
        if ext not in ("jpg", "jpeg", "png"):
            raise HTTPException(status_code=400, detail=f"Unsupported: {file.filename}")

        # Read file fully into memory (max 10MB per file).
        content = await file.read()
        if len(content) > 10 * 1024 * 1024:
            raise HTTPException(status_code=400, detail=f"File too large: {file.filename}")

        file_data.append(content)

    # --- PARALLEL PROCESSING ---
    # get_running_loop() is the supported call inside a coroutine;
    # get_event_loop() here has been deprecated since Python 3.10.
    loop = asyncio.get_running_loop()
    tasks = [
        loop.run_in_executor(CPU_EXECUTOR, process_single_image, data)
        for data in file_data
    ]
    results = await asyncio.gather(*tasks)

    # --- BUILD RESPONSE ---
    structured_output = {"files": []}
    for idx, result in enumerate(results):
        file_info = {
            "file_id": f"file_{idx + 1}",
            "filename": files[idx].filename,
        }
        if result["success"]:
            file_info["pages"] = [{"page_index": 0, "results": result["results"]}]
        else:
            file_info["error"] = result["error"]
        structured_output["files"].append(file_info)

    # Final cleanup: drop the in-memory images before serializing the response.
    del file_data
    gc.collect()

    return JSONResponse(structured_output)
119
 
120
@app.get("/health")
async def health_check():
    """Report static service configuration for liveness probes."""
    info = dict(
        status="healthy",
        cpu_workers=MAX_WORKERS,
        cpu_threads_per_worker=2,
        supported_formats=["jpg", "jpeg", "png"],
        max_file_size="10MB",
    )
    return info
130
 
 
 
 
 
 
 
 
 
 
 
 
 
131
@app.on_event("shutdown")
async def shutdown_workers():
    """Drain in-flight jobs, then tear down the process pool."""
    CPU_EXECUTOR.shutdown(wait=True)
 
1
  import os
2
+ import uuid
3
  import time
 
4
  import gc
5
+ import asyncio
6
  from concurrent.futures import ProcessPoolExecutor
7
+ from typing import List, Dict, Any
8
+ from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks
 
 
9
  from fastapi.responses import JSONResponse
10
+ import threading
11
 
12
# ───────────────────────────────────────────────────────────────────
# CONFIGURATION (2 vCPU + 16GB RAM OPTIMIZED)
# ───────────────────────────────────────────────────────────────────
MAX_WORKERS = 3      # 2 cores + 1 overlap
MAX_FILES = 15       # per-request upload cap
CACHE_TTL = 3600     # seconds an uploaded file may linger on disk

os.environ.update({
    "PADDLE_HOME": "/app/paddle_home",
    "XDG_CACHE_HOME": "/app/xdg_cache"
})

for path in ["/app/paddle_home", "/app/xdg_cache", "/app/uploads"]:
    os.makedirs(path, exist_ok=True)

# ───────────────────────────────────────────────────────────────────
# GLOBAL STATE
# ───────────────────────────────────────────────────────────────────
executor = ProcessPoolExecutor(max_workers=MAX_WORKERS)
semaphore = asyncio.Semaphore(MAX_WORKERS)   # bounds concurrent OCR jobs
active_count = 0                             # requests currently in a worker
state_lock = threading.Lock()                # guards active_count

app = FastAPI(title="Ultra-Fast Image OCR API", version="3.0")
 
 
 
 
 
 
36
 
37
# ───────────────────────────────────────────────────────────────────
# WORKER FUNCTION
# ───────────────────────────────────────────────────────────────────
def process_image(file_tuple: tuple) -> Dict[str, Any]:
    """Process single image in isolated worker: (file_id, filepath).

    Lazily builds one PaddleOCR engine per worker process (cached as a
    function attribute) and always returns a result dict — success or
    error — so callers never have to handle worker exceptions. The staged
    upload file is deleted afterwards in all cases.
    """
    # Lazy-load model once per worker process.
    if not hasattr(process_image, 'engine'):
        from paddleocr import PaddleOCR
        process_image.engine = PaddleOCR(
            lang="mr",
            text_recognition_model_name="devanagari_PP-OCRv5_mobile_rec",
            use_doc_orientation_classify=False,
            use_doc_unwarping=False,
            use_textline_orientation=False
        )

    file_id, filepath = file_tuple

    try:
        start = time.perf_counter()
        result = process_image.engine.predict(filepath)

        return {
            "file_id": file_id,
            "status": "success",
            "processing_time_seconds": round(time.perf_counter() - start, 3),
            "results": [
                {"text": txt, "confidence": float(conf)}
                for block in result
                for txt, conf in zip(block["rec_texts"], block["rec_scores"])
            ]
        }
    except Exception as e:
        return {
            "file_id": file_id,
            "status": "error",
            "processing_time_seconds": 0,
            "error": str(e),
            "results": []
        }
    finally:
        # BUG FIX: the TTL sweeper (cleanup_cache) may delete this file first;
        # an unguarded os.remove in `finally` then raises FileNotFoundError and
        # clobbers the structured return value built above.
        try:
            os.remove(filepath)
        except OSError:
            pass
79
+
80
# ───────────────────────────────────────────────────────────────────
# BACKGROUND MAINTENANCE
# ───────────────────────────────────────────────────────────────────
def cleanup_cache():
    """Best-effort removal of uploaded files older than CACHE_TTL seconds.

    Never raises: the upload dir may be missing and individual files may
    vanish concurrently (workers delete their own inputs when done).
    """
    cutoff = time.time() - CACHE_TTL
    # BUG FIX: the original os.listdir was unguarded, so a missing upload dir
    # would crash the caller even though the per-file loop was "best-effort".
    try:
        entries = os.listdir("/app/uploads")
    except OSError:
        return
    for name in entries:
        path = os.path.join("/app/uploads", name)
        try:
            if os.path.getmtime(path) < cutoff:
                os.remove(path)
        except OSError:
            # BUG FIX: was a bare `except:` — that also swallows
            # KeyboardInterrupt/SystemExit. Only filesystem races are expected.
            pass
93
 
94
# ───────────────────────────────────────────────────────────────────
# API ENDPOINTS
# ───────────────────────────────────────────────────────────────────
@app.post("/ocr")
async def ocr_endpoint(
    background_tasks: BackgroundTasks,
    files: List[UploadFile] = File(...)
):
    """Process up to MAX_FILES images with parallelization and wait tracking.

    Uploads are staged to /app/uploads under UUID names, fanned out to the
    process pool (bounded by `semaphore`), and summarized with per-request
    timing plus whether the request had to queue behind other work.
    Raises HTTP 400 on too many files or a non-JPG/PNG extension.
    """
    if len(files) > MAX_FILES:
        raise HTTPException(400, f"Maximum {MAX_FILES} images allowed")

    # Validate image types only (content is validated by the OCR engine).
    # BUG FIX: filename may be None — guard before .lower() to return a clean
    # 400 instead of an AttributeError-driven 500.
    for upload in files:
        if not (upload.filename or "").lower().endswith(('.jpg', '.jpeg', '.png')):
            raise HTTPException(400, f"Unsupported: {upload.filename}. Use JPG/PNG only.")

    background_tasks.add_task(cleanup_cache)
    request_start = time.perf_counter()

    # Save files under collision-free UUID names.
    # BUG FIX: the original reused `f` for the output handle, shadowing the
    # validation loop variable above; renamed to `out`.
    file_tuples = []
    for idx, file in enumerate(files, 1):
        ext = file.filename.split('.')[-1].lower()
        path = f"/app/uploads/{uuid.uuid4()}.{ext}"
        with open(path, "wb") as out:
            out.write(await file.read())
        file_tuples.append((f"file_{idx}", path))

    # Snapshot capacity before processing (informational; racy by design).
    with state_lock:
        had_to_wait = active_count >= MAX_WORKERS

    # Process with concurrency control.
    async def track_and_process(file_info):
        global active_count

        with state_lock:
            active_count += 1
        try:
            async with semaphore:
                # BUG FIX: get_running_loop() is the supported call inside a
                # coroutine; get_event_loop() here is deprecated since 3.10.
                loop = asyncio.get_running_loop()
                return await loop.run_in_executor(executor, process_image, file_info)
        finally:
            with state_lock:
                active_count -= 1

    results = await asyncio.gather(*[track_and_process(ft) for ft in file_tuples])

    response = {
        "files": results,
        "performance_summary": {
            "total_request_time_seconds": round(time.perf_counter() - request_start, 3),
            "successful_files": sum(1 for r in results if r["status"] == "success"),
            "failed_files": sum(1 for r in results if r["status"] == "error"),
            "had_to_wait_for_worker": had_to_wait,
            "request_status": "processed_immediately" if not had_to_wait else "processed_after_waiting"
        }
    }

    gc.collect()
    return response
 
158
 
159
@app.get("/status")
async def get_status():
    """Live capacity monitor - check this before sending requests"""
    with state_lock:
        busy = active_count

    free_slots = MAX_WORKERS - busy
    ready = free_slots > 0

    return {
        "system_capacity": {
            "currently_processing": busy,
            "max_capacity": MAX_WORKERS,
            "available_slots": free_slots
        },
        "can_accept_immediately": ready,
        "recommended_action": "send_request" if ready else "wait_or_retry_later"
    }
176
 
177
@app.delete("/cache")
async def clear_cache():
    """Manually sweep expired uploads and compact memory."""
    cleanup_cache()
    gc.collect()
    payload = {"status": "cache_cleared"}
    return payload
183
+
184
@app.get("/health")
async def health():
    """Health check.

    CONSISTENCY FIX: read active_count under state_lock, matching how
    /status reads the same counter, instead of an unsynchronized read.
    """
    with state_lock:
        busy = active_count
    return {"status": "healthy", "active_processes": busy}
188
+
189
@app.on_event("shutdown")
def shutdown():
    """Wait for in-flight OCR jobs, then release the worker pool."""
    executor.shutdown(wait=True)