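# Spaces: Running / Running
# (The line above appears to be page-status text captured together with this
# listing; it is not part of the program.)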
import asyncio
import base64
import io
import os
import time
import traceback
from typing import List, Optional
from urllib.parse import urlsplit

import easyocr
import fitz  # PyMuPDF
import numpy as np
from DrissionPage import ChromiumOptions, ChromiumPage
from fastapi import FastAPI, File, HTTPException, Query, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response
from PIL import Image
from pydantic import BaseModel
# Application object for the service. The keyword arguments are the metadata
# handed to FastAPI when the app is constructed.
app = FastAPI(
    version="1.0.0",
    title="QuickPDF Studio OCR Service",
    description="Dedicated OCR backend for extracting text from images and scanned PDFs.",
)

# CORS configuration.
# Origins allowed to call this API: the production site plus the two local
# development servers.
origins = [
    "https://quickpdfstudio.vercel.app",
    "http://localhost:5173",
    "http://localhost:3000",
]

# Credentials are allowed, and every method and header is accepted, for the
# origins listed above.
_cors_settings = {
    "allow_origins": origins,
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_settings)
# Module-wide cache for the shared EasyOCR reader; filled in by get_reader().
_reader_instance = None


def get_reader():
    """Return the shared EasyOCR reader, creating it on the first call.

    Construction is deferred until first use to avoid startup timeouts.
    The reader is built for English and German with the GPU disabled, and
    the same object is handed back on every later call.
    """
    global _reader_instance
    if _reader_instance is not None:
        return _reader_instance
    # English and German are the core supported languages.
    _reader_instance = easyocr.Reader(['en', 'de'], gpu=False)
    return _reader_instance
# Upload size ceiling in bytes (10MB limit).
MAX_FILE_SIZE = 10 * (1 << 20)
async def health_check():
    """Report that the service is alive.

    Returns:
        A fixed dictionary naming the service and its status; nothing is
        probed or computed.
    """
    status_report = {"status": "healthy", "service": "ocr-engine"}
    return status_report
async def perform_ocr(
    file: UploadFile = File(...),
    languages: Optional[str] = Query(None, description="Comma-separated language codes: en,de")
):
    """Run OCR on an uploaded PDF or image and return words plus text.

    An upload whose filename ends in ``.pdf`` is rendered page by page at
    1.5x zoom and each rendered page is OCR'd; any other upload is decoded
    as a single image and reported as page 1.

    Args:
        file: The uploaded document.
        languages: Accepted by the endpoint but not read anywhere in this
            function; the shared reader from ``get_reader()`` is always used.

    Returns:
        ``{"success": True, "text": ..., "pages": [...]}`` where ``text`` is
        the text of all pages (pages separated by a blank line) and every
        page entry holds ``pageNum``, ``fullText``, ``words``,
        ``imageWidth``/``imageHeight`` (pixels of the OCR'd image) and
        ``pageWidth``/``pageHeight`` (PDF page rectangle, or the image size
        for image uploads).

    Raises:
        HTTPException: 413 when the upload is larger than ``MAX_FILE_SIZE``;
            500 with an ``OCR Error: ...`` detail when decoding or OCR fails.
    """
    # 1. Validation
    # The declared size can be missing, so it is only a fast pre-check; the
    # bytes actually read are checked as well.
    declared_size = getattr(file, "size", None)
    if declared_size is not None and declared_size > MAX_FILE_SIZE:
        raise HTTPException(status_code=413, detail="File too large. Maximum size is 10MB.")
    data = await file.read()
    if len(data) > MAX_FILE_SIZE:
        raise HTTPException(status_code=413, detail="File too large. Maximum size is 10MB.")

    # A missing filename is treated as "no extension", i.e. an image upload.
    ext = os.path.splitext(file.filename or "")[1].lower()

    # NOTE(review): rendering and readtext() run synchronously inside this
    # coroutine, so it does not yield while a document is being processed.
    results_pages = []
    page_texts = []
    try:
        if ext == '.pdf':
            doc = fitz.open(stream=data, filetype="pdf")
            try:
                for page_number, page in enumerate(doc, start=1):
                    pix = page.get_pixmap(matrix=fitz.Matrix(1.5, 1.5))
                    img = Image.open(io.BytesIO(pix.tobytes("png"))).convert('RGB')
                    page_result, page_text = _ocr_page_image(
                        img, page_number, page.rect.width, page.rect.height
                    )
                    results_pages.append(page_result)
                    page_texts.append(page_text)
            finally:
                # Release the document even when a page fails to render or OCR.
                doc.close()
        else:
            # PROCESS IMAGE
            img = Image.open(io.BytesIO(data)).convert('RGB')
            image_width, image_height = img.size
            page_result, page_text = _ocr_page_image(img, 1, image_width, image_height)
            results_pages.append(page_result)
            page_texts.append(page_text)

        return {
            "success": True,
            # One extra newline after each page keeps pages separated by a
            # blank line; surrounding whitespace is stripped at the end.
            "text": "".join(text + "\n" for text in page_texts).strip(),
            "pages": results_pages
        }
    except Exception as e:
        print(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"OCR Error: {str(e)}") from e


def _ocr_page_image(img, page_num, page_width, page_height):
    """OCR one RGB image and package the result as a ``pages`` entry.

    Returns ``(page_dict, page_text)``; ``page_text`` is the unstripped,
    newline-terminated text used to assemble the whole-document text.
    """
    image_width, image_height = img.size
    detections = get_reader().readtext(np.array(img), detail=1)
    words, page_text = _layout_detections(detections, image_width, image_height)
    page_result = {
        "pageNum": page_num,
        "fullText": page_text.strip(),
        "words": words,
        "imageWidth": image_width,
        "imageHeight": image_height,
        "pageWidth": page_width,
        "pageHeight": page_height
    }
    return page_result, page_text


def _layout_detections(detections, image_width, image_height):
    """Convert reader detections into word dicts and line-ordered text.

    Args:
        detections: ``(bbox, text, confidence)`` triples, where ``bbox`` is a
            sequence of ``(x, y)`` points.
        image_width: Pixel width used to express x/width as percentages.
        image_height: Pixel height used to express y/height as percentages.

    Returns:
        ``(words, text)``: ``words`` in top-to-bottom order of their upper
        edge, and ``text`` with one newline-terminated row per clustered
        line, each row's words ordered left to right.
    """
    words = []
    rows = []
    # Visit detections from the top of the image downwards.
    for bbox, text, conf in sorted(detections, key=lambda det: min(pt[1] for pt in det[0])):
        xs = [pt[0] for pt in bbox]
        ys = [pt[1] for pt in bbox]
        left, right = min(xs), max(xs)
        top, bottom = min(ys), max(ys)
        box_width = right - left
        box_height = bottom - top
        centre_y = (top + bottom) / 2

        word = {
            "text": text,
            "confidence": float(conf),
            # Percentage-based coordinates relative to the OCR'd image.
            "x": float((left / image_width) * 100),
            "y": float((top / image_height) * 100),
            "width": float((box_width / image_width) * 100),
            "height": float((box_height / image_height) * 100),
            "bbox": {  # Keep legacy pixel bbox for potential other uses
                "x": float(left),
                "y": float(top),
                "width": float(box_width),
                "height": float(box_height)
            }
        }
        words.append(word)

        # Join the first row whose mean vertical centre lies within half of
        # this word's height; otherwise this word starts a new row.
        for row in rows:
            row_centre = sum(w['bbox']['y'] + w['bbox']['height'] / 2 for w in row) / len(row)
            if abs(centre_y - row_centre) < box_height * 0.5:
                row.append(word)
                break
        else:
            rows.append([word])

    # Sort words within each row by X-coordinate before joining them.
    for row in rows:
        row.sort(key=lambda w: w['bbox']['x'])
    text = "".join(" ".join(w['text'] for w in row) + "\n" for row in rows)
    return words, text
class UrlToPdfRequest(BaseModel):
    """Request body for the URL-to-PDF conversion.

    Only ``url`` has no default; every other field is optional.
    """

    # Address of the page to convert.
    url: str

    # When true, the page is rewritten into a reader-style view before printing.
    cleanMode: bool = False

    # "desktop" | "mobile"
    device: str = "desktop"

    # "a4" | "fullPage"
    format: str = "a4"

    # Seconds to wait after navigation before the page is printed.
    delay: int = 0
async def url_to_pdf(payload: UrlToPdfRequest):
    """Open a URL in headless Chromium and return the page as a PDF download.

    Steps: configure the browser (user agent chosen by ``payload.device``),
    navigate, wait, optionally rewrite the page into a reader view
    (``payload.cleanMode``), then print through the DevTools
    ``Page.printToPDF`` command using either A4 paper or a single page as
    tall as the document (``payload.format``).

    Args:
        payload: Conversion request; see ``UrlToPdfRequest``.

    Returns:
        A ``Response`` with media type ``application/pdf`` and an
        attachment disposition named ``web-capture.pdf``.

    Raises:
        HTTPException: 400 when the URL is empty or explicitly names a scheme
            other than http/https; 500 with a ``Conversion failed: ...``
            detail when the browser or the conversion fails.
    """
    url = payload.url
    if not url:
        raise HTTPException(status_code=400, detail="URL is required")
    # The URL comes from the request and is opened by a browser running on
    # this host, so schemes that expose local or browser-internal resources
    # (file://, chrome://, ...) are refused before a browser is started.
    # NOTE(security): this is a partial mitigation only; http(s) URLs that
    # point at internal hosts are not filtered here.
    if not _is_web_url(url):
        raise HTTPException(status_code=400, detail="Only http and https URLs are supported")

    try:
        # 1. Configure Chromium Options
        options = ChromiumOptions()
        options.headless(True)
        for flag in ('--no-sandbox', '--disable-gpu', '--disable-dev-shm-usage'):
            options.set_argument(flag)
        # Set a human-like user agent based on device
        options.set_user_agent(_MOBILE_UA if payload.device == "mobile" else _DESKTOP_UA)

        # Initialize Page
        page = ChromiumPage(options)
        try:
            # 2. Navigate
            page.get(url)

            # 3. Wait for content
            # NOTE(review): time.sleep and the browser calls are synchronous,
            # so this coroutine does not yield while converting. Left as-is
            # because it is unclear whether concurrent ChromiumPage sessions
            # are safe here; confirm before moving this to a worker thread.
            if payload.delay > 0:
                time.sleep(payload.delay)
            else:
                # Default wait for readiness
                # NOTE(review): load_start() is called after get(); confirm
                # this is the intended readiness wait.
                page.wait.load_start()

            # 4. Clean Mode (Reader View) Injection
            if payload.cleanMode:
                page.run_js(_READER_VIEW_SCRIPT)

            # 5. Generate PDF via CDP
            print_options = _build_print_options(payload.format, page)
            result = page.run_cdp('Page.printToPDF', **print_options)
            pdf_bytes = base64.b64decode(result['data'])
            return Response(
                content=pdf_bytes,
                media_type="application/pdf",
                headers={
                    "Content-Disposition": "attachment; filename=web-capture.pdf"
                }
            )
        finally:
            # Always shut the browser down, including on failure.
            page.quit()
    except Exception as e:
        print(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Conversion failed: {str(e)}") from e


# The names below are only looked up when url_to_pdf runs, so they can be
# defined after it.

# User agents sent for the "desktop" (default) and "mobile" device settings.
_DESKTOP_UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
_MOBILE_UA = "Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1"

# Schemes refused even when written without "//" (for example "file:/etc/hosts").
_NON_WEB_SCHEMES = frozenset({
    "file", "view-source", "chrome", "devtools", "about",
    "javascript", "data", "blob", "filesystem",
})

# Loads Readability from a CDN and replaces the body with the parsed article.
# Scripts passed to run_js are written as a function body (the full-page
# branch uses a bare `return`), so the promise is returned instead of using a
# bare `await`, which is only valid inside an async function.
# NOTE(review): presumably run_js waits for the returned promise; confirm.
_READER_VIEW_SCRIPT = """
async function applyReaderView() {
    const { Readability } = await import('https://cdn.skypack.dev/@mozilla/readability');
    const article = new Readability(document).parse();
    if (article) {
        document.body.innerHTML = `
            <div style="max-width: 800px; margin: 40px auto; font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Helvetica, Arial, sans-serif; line-height: 1.6; color: #333; padding: 20px;">
                <h1 style="font-size: 2.5rem; margin-bottom: 0.5rem; line-height: 1.2;">${article.title}</h1>
                ${article.byline ? `<p style="color: #666; margin-bottom: 2rem;">By ${article.byline}</p>` : ''}
                <div style="font-size: 1.1rem;">${article.content}</div>
            </div>
        `;
    }
}
return applyReaderView();
"""


def _is_web_url(url):
    """Return False when *url* explicitly names a scheme other than http/https.

    URLs written without a scheme are accepted unchanged, so inputs such as
    ``example.com`` or ``localhost:8000`` are treated exactly as before.
    """
    try:
        parts = urlsplit(url.strip())
    except ValueError:
        # Malformed authority section (for example an unbalanced "[").
        return False
    if parts.scheme in ("http", "https"):
        return True
    if parts.scheme and parts.netloc:
        # "scheme://host" form with some other scheme (ftp://, chrome://, ...).
        return False
    return parts.scheme not in _NON_WEB_SCHEMES


def _build_print_options(paper_format, page):
    """Build the keyword arguments for the ``Page.printToPDF`` command.

    ``"a4"`` prints on 8.27 x 11.69 inch paper with 0.4 inch margins. Any
    other value prints one page as tall as the document, with the top and
    bottom margins removed.
    """
    print_options = {
        'printBackground': True,
        'marginTop': 0.4,
        'marginBottom': 0.4,
        'marginLeft': 0.4,
        'marginRight': 0.4
    }
    if paper_format == "a4":
        print_options['paperWidth'] = 8.27
        print_options['paperHeight'] = 11.69
        return print_options

    # Full Page - calculate content height
    body_height = page.run_js('return document.documentElement.scrollHeight')
    # Convert pixels to inches (approx 96 DPI), plus one inch of slack.
    print_options['paperHeight'] = (body_height / 96) + 1
    # Remove margins for continuous flow
    print_options['marginTop'] = 0
    print_options['marginBottom'] = 0
    return print_options
if __name__ == "__main__":
    # Run directly: serve the app with uvicorn on all interfaces, port 8000.
    import uvicorn

    server_settings = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **server_settings)