Spaces:
Sleeping
Sleeping
| import os | |
| import uuid | |
| from fastapi import FastAPI, UploadFile, File, HTTPException | |
| from fastapi.responses import JSONResponse | |
| from typing import List | |
| import fitz | |
| from PIL import Image | |
# -------------------------------------------------------------------
# PADDLEX / PADDLEOCR CACHE DIRECTORIES
# -------------------------------------------------------------------
# Environment variables that are redirected to directories under /app,
# together with the directory each one points at.
_CACHE_DIRS = {
    "PADDLE_HOME": "/app/paddle_home",
    "XDG_CACHE_HOME": "/app/xdg_cache",
}

os.environ.update(_CACHE_DIRS)
for _cache_dir in _CACHE_DIRS.values():
    os.makedirs(_cache_dir, exist_ok=True)

# The paddleocr import must stay below this point so that it sees the
# variables set above.
| from paddleocr import PaddleOCR | |
# -------------------------------------------------------------------
# CONFIGURATION
# -------------------------------------------------------------------
# Upper bound, in pixels, for the width and the height of any image
# handed to OCR.
MAX_DIMENSION = 1024

# Resolution used when rendering PDF pages; lowered from 220 because a
# lower DPI renders faster.
PDF_DPI = 150
# -------------------------------------------------------------------
# IMAGE OPTIMIZATION
# -------------------------------------------------------------------
def optimize_image_for_ocr(input_path: str, output_path: str) -> str:
    """Write an RGB JPEG copy of an image, downscaled if it is too large.

    When the width or the height exceeds ``MAX_DIMENSION`` the image is
    shrunk so that its longer side equals ``MAX_DIMENSION`` and the
    aspect ratio is kept. Smaller images keep their size.

    Args:
        input_path: Path of the image to read.
        output_path: Path the JPEG (quality 85) is written to.

    Returns:
        ``output_path``, unchanged.
    """
    with Image.open(input_path) as img:
        # Every non-RGB mode is converted the same way, so a single check
        # covers all of them.
        if img.mode != "RGB":
            img = img.convert("RGB")

        width, height = img.size

        # Only resize if larger than MAX_DIMENSION
        if width > MAX_DIMENSION or height > MAX_DIMENSION:
            if width > height:
                new_width = MAX_DIMENSION
                new_height = int(height * (MAX_DIMENSION / width))
            else:
                new_height = MAX_DIMENSION
                new_width = int(width * (MAX_DIMENSION / height))

            # int() truncates, so a very elongated image could end up with
            # a side of 0 pixels; keep at least one pixel on each side.
            new_size = (max(1, new_width), max(1, new_height))
            img = img.resize(new_size, Image.LANCZOS)

        img.save(output_path, "JPEG", quality=85)

    return output_path
| # ------------------------------------------------------------------- | |
| # PDF → IMAGE (optimized) | |
| # ------------------------------------------------------------------- | |
| def pdf_to_images(pdf_path: str, max_pages: int | None = 3) -> List[str]: | |
| if not os.path.exists(pdf_path): | |
| raise FileNotFoundError(pdf_path) | |
| doc = fitz.open(pdf_path) | |
| page_count = len(doc) | |
| limit = page_count if max_pages is None else min(max_pages, page_count) | |
| output_paths: List[str] = [] | |
| out_dir = "/app/pdf_images" | |
| os.makedirs(out_dir, exist_ok=True) | |
| for i in range(limit): | |
| page = doc.load_page(i) | |
| pix = page.get_pixmap(dpi=PDF_DPI) # Lower DPI for speed | |
| img_name = f"{uuid.uuid4()}.jpg" | |
| img_path = os.path.join(out_dir, img_name) | |
| # Save initial | |
| temp_path = img_path + ".tmp.jpg" | |
| pix.save(temp_path) | |
| # Optimize (resize if needed) | |
| optimize_image_for_ocr(temp_path, img_path) | |
| # Cleanup temp | |
| if os.path.exists(temp_path): | |
| os.remove(temp_path) | |
| output_paths.append(img_path) | |
| doc.close() | |
| return output_paths | |
# -------------------------------------------------------------------
# OCR ENGINE
# -------------------------------------------------------------------
# Keyword arguments for the single PaddleOCR instance created at import
# time. The three ``use_*`` switches are all turned off.
_OCR_ENGINE_OPTIONS = {
    "lang": "mr",
    "text_recognition_model_name": "devanagari_PP-OCRv5_mobile_rec",
    "use_doc_orientation_classify": False,
    "use_doc_unwarping": False,
    "use_textline_orientation": False,
}

ocr_engine = PaddleOCR(**_OCR_ENGINE_OPTIONS)
def extract_text(image_path: str):
    """Run the shared OCR engine on one image and flatten its output.

    Args:
        image_path: Path of the image passed to ``ocr_engine.predict``.

    Returns:
        A list of ``{"text": ..., "confidence": ...}`` dicts, one for each
        pair of entries in the ``rec_texts`` / ``rec_scores`` fields of
        every prediction, with the score converted to ``float``.
    """
    predictions = ocr_engine.predict(input=image_path)
    return [
        {"text": text, "confidence": float(score)}
        for prediction in predictions
        for text, score in zip(prediction["rec_texts"], prediction["rec_scores"])
    ]
# -------------------------------------------------------------------
# FASTAPI
# -------------------------------------------------------------------
# Directory that receives each upload while it is being processed;
# created at import time if it does not exist yet.
UPLOAD_DIR = "/app/uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)

# Application object for this service.
app = FastAPI()
def _remove_if_exists(path: str) -> None:
    """Delete ``path`` when it is present; do nothing otherwise."""
    if os.path.exists(path):
        os.remove(path)


# NOTE(review): no route decorator (e.g. ``@app.post(...)``) is visible on
# this function in this view, so nothing here registers it with ``app`` —
# confirm that it is registered elsewhere.
async def ocr_endpoint(files: List[UploadFile] = File(...), max_pages: int | None = 3):
    """OCR a batch of uploaded PDFs and images.

    Files are handled one after another in upload order. PDFs are
    rendered page by page through ``pdf_to_images``; images are passed
    through ``optimize_image_for_ocr``. Every resulting image goes to
    ``extract_text``. The upload and all intermediate images are removed
    again, including when processing fails.

    Args:
        files: Uploaded files; at most 15 are accepted.
        max_pages: Page limit forwarded to ``pdf_to_images`` for PDFs.

    Returns:
        A ``JSONResponse`` shaped as
        ``{"files": [{"file_id", "filename", "pages": [{"page_index",
        "results"}]}]}``, where ``filename`` is the lower-cased upload
        name and ``file_id`` is ``file_<n>`` counted from 1.

    Raises:
        HTTPException: 400 when more than 15 files are sent, or when a
            file name does not end in ``.pdf``, ``.jpg``, ``.jpeg`` or
            ``.png``.
    """
    if len(files) > 15:
        raise HTTPException(status_code=400, detail="Maximum 15 files allowed.")

    structured_output = {"files": []}

    for index, file in enumerate(files, start=1):
        # An upload may arrive without a name; an empty name then fails
        # the type check below instead of crashing on ``.lower()``.
        filename = (file.filename or "").lower()

        # Check the type before anything is written to disk, so that a
        # rejected upload leaves no file behind and only a known
        # extension ever becomes part of the on-disk path.
        is_pdf = filename.endswith(".pdf")
        is_image = filename.endswith((".jpg", ".jpeg", ".png"))
        if not (is_pdf or is_image):
            raise HTTPException(status_code=400, detail=f"Unsupported type: {filename}")

        ext = filename.rsplit(".", 1)[-1]
        temp_path = os.path.join(UPLOAD_DIR, f"{uuid.uuid4()}.{ext}")

        file_record = {
            "file_id": f"file_{index}",
            "filename": filename,
            "pages": []
        }

        try:
            with open(temp_path, "wb") as f:
                f.write(await file.read())

            # NOTE(review): rendering and OCR below are plain synchronous
            # calls made from inside this coroutine.

            # -------------------------------
            # PDF
            # -------------------------------
            if is_pdf:
                img_paths = pdf_to_images(temp_path, max_pages=max_pages)
                try:
                    for page_idx, img_path in enumerate(img_paths):
                        ocr_results = extract_text(img_path)
                        file_record["pages"].append({
                            "page_index": page_idx,
                            "results": ocr_results
                        })
                        # Drop each page image as soon as it has been read.
                        _remove_if_exists(img_path)
                finally:
                    # After a failure some page images are still on disk.
                    for img_path in img_paths:
                        _remove_if_exists(img_path)

            # -------------------------------
            # IMAGE
            # -------------------------------
            else:
                optimized_path = os.path.join(UPLOAD_DIR, f"opt_{uuid.uuid4()}.jpg")
                try:
                    optimize_image_for_ocr(temp_path, optimized_path)
                    ocr_results = extract_text(optimized_path)
                    file_record["pages"].append({
                        "page_index": 0,
                        "results": ocr_results
                    })
                finally:
                    _remove_if_exists(optimized_path)
        finally:
            # Cleanup uploaded file
            _remove_if_exists(temp_path)

        structured_output["files"].append(file_record)

    return JSONResponse(structured_output)