Spaces:
Running
Running
| import os | |
| import io | |
| import asyncio | |
| import time | |
| import base64 | |
| import numpy as np | |
| import tempfile | |
| import subprocess | |
| import zipfile | |
| from typing import List, Optional | |
| from pydantic import BaseModel | |
| from PIL import Image | |
| # FastAPI | |
| from fastapi import FastAPI, UploadFile, File, HTTPException, Query, APIRouter, Form, Depends, Security, BackgroundTasks | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import Response, StreamingResponse, FileResponse, JSONResponse | |
| from fastapi.security.api_key import APIKeyHeader | |
| # Rate Limiting | |
| from slowapi import Limiter, _rate_limit_exceeded_handler | |
| from slowapi.util import get_remote_address | |
| from slowapi.errors import RateLimitExceeded | |
| from starlette.requests import Request | |
| # Backend 1: PDF & OCR Libraries | |
| import easyocr | |
| import fitz # PyMuPDF | |
| from DrissionPage import ChromiumPage, ChromiumOptions | |
| from pdf2docx import Converter | |
| import tabula | |
| import pandas as pd | |
| from pdf2image import convert_from_path | |
| import pikepdf | |
| import pytesseract | |
| from weasyprint import HTML | |
| from bs4 import BeautifulSoup | |
| import ebooklib | |
| from ebooklib import epub | |
| # Backend 2: Image Libraries | |
| import rembg | |
# --- App Initialization ---
# One limiter instance, keyed on the caller's remote address, is shared by the
# whole application through ``app.state``.
limiter = Limiter(key_func=get_remote_address)

app = FastAPI(
    title="QuickPDF Studio Unified API",
    description="Unified backend for PDF/OCR and Image processing services.",
    version="1.0.0",
)
app.state.limiter = limiter
# slowapi's stock handler converts RateLimitExceeded into an HTTP response.
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# CORS Configuration
# Every origin, method and header is accepted.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive - confirm this is intended before exposing cookie-based auth.
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
    allow_origins=origins,
)
# API Gatekeeper
# The expected key is taken from the QUICKPDF_API_TOKEN environment variable
# so the secret does not have to live in source control. The previous literal
# is kept only as a backward-compatible fallback for deployments that have not
# configured the variable yet (an empty variable also falls back to it).
# TODO: drop the fallback once every deployment sets QUICKPDF_API_TOKEN.
API_TOKEN = os.environ.get("QUICKPDF_API_TOKEN") or "HUGGING_FACE_SECURE_TOKEN_2026"
# Clients send the key in the X-API-Key request header.
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=True)
async def get_api_key(api_key: str = Security(api_key_header)):
    """Validate the API key supplied by the client.

    Args:
        api_key: Value extracted by the ``api_key_header`` security scheme.

    Returns:
        The supplied key, unchanged, when it matches ``API_TOKEN``.

    Raises:
        HTTPException: 401 when the key does not match.
    """
    import hmac  # local import keeps this block self-contained

    # Compare as bytes with a constant-time primitive so response timing does
    # not reveal how many leading characters of the key were correct. Encoding
    # first also avoids the TypeError compare_digest raises on non-ASCII str.
    supplied = api_key.encode("utf-8")
    expected = API_TOKEN.encode("utf-8")
    if not hmac.compare_digest(supplied, expected):
        raise HTTPException(status_code=401, detail="Invalid API Key")
    return api_key
def cleanup_file(filepath: str) -> None:
    """Delete ``filepath`` on a best-effort basis.

    A file that is already gone counts as success. Any other failure is
    printed and swallowed, so this never raises into the caller.

    Args:
        filepath: Path of the file to remove.
    """
    try:
        # Attempt the removal directly instead of checking for existence
        # first: the check-then-remove sequence is racy when two cleanups
        # target the same path.
        os.remove(filepath)
    except FileNotFoundError:
        pass
    except Exception as e:
        print(f"Cleanup error: {e}")
# --- Global Shared Resources (Lazy-loaded) ---
# Both handles start unset; get_ocr_reader() and get_rembg_session() fill them
# in on first use.
_ocr_reader, _rembg_session = None, None
def get_ocr_reader():
    """Return the shared EasyOCR reader, creating it on first call.

    The reader is configured for English and German with the GPU disabled and
    is cached in the module-level ``_ocr_reader``.
    """
    global _ocr_reader
    if _ocr_reader is not None:
        return _ocr_reader
    _ocr_reader = easyocr.Reader(['en', 'de'], gpu=False)
    return _ocr_reader
def get_rembg_session():
    """Return the shared rembg session, creating it on first call.

    The "isnet-general-use" model is tried first. If that session cannot be
    created, rembg's default session is used instead. The result is cached in
    the module-level ``_rembg_session``.
    """
    global _rembg_session
    if _rembg_session is None:
        try:
            _rembg_session = rembg.new_session("isnet-general-use")
        except Exception as e:
            # Catch Exception rather than using a bare except so that
            # KeyboardInterrupt/SystemExit still propagate, and report why the
            # fallback was taken.
            print(f"rembg 'isnet-general-use' session unavailable, using default: {e}")
            _rembg_session = rembg.new_session()
    return _rembg_session
# --- PDF & OCR Router (/api/pdf) ---
# Router object for the PDF and OCR endpoints; mounted on the app later.
pdf_router = APIRouter(
    prefix="/api/pdf",
    tags=["PDF & OCR"],
)
async def pdf_health():
    """Return a static health payload for the PDF/OCR service."""
    payload = {"status": "healthy", "service": "ocr-pdf-engine"}
    return payload
# --- OCR Endpoints ---
def _ocr_words(raw_results, image_width=None, image_height=None):
    """Turn EasyOCR ``(bbox, text, conf)`` tuples into word dicts, top to bottom.

    When both image dimensions are given, each word also carries its position
    and size as percentages of the image (``x``, ``y``, ``width``, ``height``).
    """
    words = []
    ordered = sorted(raw_results, key=lambda r: min(p[1] for p in r[0]))
    for bbox, text, conf in ordered:
        # Coordinates may arrive as NumPy scalars; coerce them to plain floats
        # so the response payload stays JSON-serializable.
        xs = [float(p[0]) for p in bbox]
        ys = [float(p[1]) for p in bbox]
        left, top = min(xs), min(ys)
        width, height = max(xs) - left, max(ys) - top
        word = {"text": text, "confidence": float(conf)}
        if image_width and image_height:
            word["x"] = (left / image_width) * 100
            word["y"] = (top / image_height) * 100
            word["width"] = (width / image_width) * 100
            word["height"] = (height / image_height) * 100
        word["bbox"] = {"x": left, "y": top, "width": width, "height": height}
        words.append(word)
    return words


def _group_into_lines(words):
    """Cluster words into visual lines by vertical midpoint proximity."""
    lines = []
    for word in words:
        box = word["bbox"]
        mid_y = box["y"] + box["height"] / 2
        for line in lines:
            line_mid = sum(w["bbox"]["y"] + w["bbox"]["height"] / 2 for w in line) / len(line)
            # A word joins the first line whose average midpoint lies within
            # half of the word's own height.
            if abs(mid_y - line_mid) < box["height"] * 0.5:
                line.append(word)
                break
        else:
            lines.append([word])
    return lines


def _lines_to_text(lines):
    """Render grouped lines as text: words left to right, one line per row."""
    return "".join(
        " ".join(w["text"] for w in sorted(line, key=lambda w: w["bbox"]["x"])) + "\n"
        for line in lines
    )


async def perform_ocr(file: UploadFile = File(...), languages: Optional[str] = Query(None)):
    """Run OCR on an uploaded PDF or image.

    A ``.pdf`` upload is rendered page by page (scaled by 1.5 in both
    directions) and every page is recognised separately. Any other upload is
    opened as a single image.

    Args:
        file: The uploaded document or image.
        languages: Accepted for interface compatibility; the visible code does
            not use it (the shared reader has a fixed language set).

    Returns:
        ``{"success": True, "text": <all text>, "pages": [...]}``. PDF pages
        carry ``pageNum``, ``fullText``, ``words``, ``imageWidth`` and
        ``imageHeight``; an image yields one page with ``pageNum`` and
        ``fullText``.

    Raises:
        HTTPException: 500 with the underlying error message on any failure.
    """
    data = await file.read()
    # The client may omit the filename; treat that as "not a PDF" instead of
    # crashing in splitext before the error handling below is reached.
    ext = os.path.splitext(file.filename or "")[1].lower()
    results_pages = []
    try:
        reader = get_ocr_reader()
        if ext == '.pdf':
            page_texts = []
            doc = fitz.open(stream=data, filetype="pdf")
            try:
                for page_num in range(len(doc)):
                    page = doc.load_page(page_num)
                    pix = page.get_pixmap(matrix=fitz.Matrix(1.5, 1.5))
                    img = Image.open(io.BytesIO(pix.tobytes("png"))).convert('RGB')
                    raw_results = reader.readtext(np.array(img), detail=1)
                    page_words = _ocr_words(raw_results, pix.width, pix.height)
                    formatted = _lines_to_text(_group_into_lines(page_words))
                    results_pages.append({
                        "pageNum": page_num + 1,
                        "fullText": formatted.strip(),
                        "words": page_words,
                        "imageWidth": pix.width,
                        "imageHeight": pix.height,
                    })
                    page_texts.append(formatted + "\n")
            finally:
                # Release the document even when a page fails to process.
                doc.close()
            full_text = "".join(page_texts)
        else:
            img = Image.open(io.BytesIO(data)).convert('RGB')
            raw_results = reader.readtext(np.array(img), detail=1)
            full_text = _lines_to_text(_group_into_lines(_ocr_words(raw_results)))
            results_pages.append({"pageNum": 1, "fullText": full_text.strip()})
        return {"success": True, "text": full_text.strip(), "pages": results_pages}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
# --- Conversion Endpoints ---
async def url_to_pdf(payload: dict):
    """Render a web page to PDF with a headless Chromium instance.

    Args:
        payload: Request body. Keys read here:
            ``url`` (required, http/https), ``device`` ("mobile" selects an
            iPhone user agent), ``delay`` (seconds to wait after navigation),
            ``format`` ("a4" for A4 paper; otherwise the paper height is
            derived from the page's scroll height).

    Returns:
        A ``Response`` containing the PDF bytes as an attachment named
        ``web-capture.pdf``.

    Raises:
        HTTPException: 400 when ``url`` is missing or not http(s); 500 with
            the underlying error message when rendering fails.
    """
    from urllib.parse import urlparse  # local import keeps this block self-contained

    url = payload.get("url")
    # Only http(s) is accepted: other schemes such as file:// would let a
    # caller print files from the server's own filesystem.
    if not isinstance(url, str) or urlparse(url).scheme.lower() not in ("http", "https"):
        raise HTTPException(status_code=400, detail="A valid http(s) 'url' is required")

    try:
        options = ChromiumOptions().headless(True).set_argument('--no-sandbox').set_argument('--disable-gpu').set_argument('--disable-dev-shm-usage')
        ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
        if payload.get("device") == "mobile":
            ua = "Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1"
        options.set_user_agent(ua)
        page = ChromiumPage(options)
        try:
            page.get(url)
            # An explicit null delay is treated the same as no delay.
            delay = payload.get("delay") or 0
            if delay > 0:
                # Yield to the event loop while waiting instead of blocking it.
                await asyncio.sleep(delay)
            else:
                page.wait.load_start()
            print_options = {
                'printBackground': True,
                'marginTop': 0.4,
                'marginBottom': 0.4,
                'marginLeft': 0.4,
                'marginRight': 0.4,
            }
            if payload.get("format") == "a4":
                print_options['paperWidth'], print_options['paperHeight'] = 8.27, 11.69
            else:
                # Single tall page: scroll height in CSS pixels converted at
                # 96 px per inch, plus one inch of slack, no vertical margins.
                body_height = page.run_js('return document.documentElement.scrollHeight')
                print_options['paperHeight'] = (body_height / 96) + 1
                print_options['marginTop'] = print_options['marginBottom'] = 0
            result = page.run_cdp('Page.printToPDF', **print_options)
            return Response(
                content=base64.b64decode(result['data']),
                media_type="application/pdf",
                headers={"Content-Disposition": "attachment; filename=web-capture.pdf"},
            )
        finally:
            page.quit()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
async def pdf_to_word(request: Request, background_tasks: BackgroundTasks, file: UploadFile = File(...), api_key: str = Depends(get_api_key)):
    """Convert an uploaded PDF to a DOCX file.

    Args:
        request: Incoming request (not used in the body).
        background_tasks: Receives the temp-file cleanup jobs that run after
            the response has been sent.
        file: The uploaded PDF.
        api_key: Validated API key from ``get_api_key``.

    Returns:
        A ``FileResponse`` serving the result as ``converted.docx``.

    Raises:
        HTTPException: 500 with the underlying error message on failure.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf:
        tmp_pdf.write(await file.read())
        tmp_pdf_path = tmp_pdf.name
    # Swap only the extension; str.replace would also rewrite a ".pdf" that
    # happens to appear elsewhere in the temp directory path.
    tmp_docx_path = os.path.splitext(tmp_pdf_path)[0] + ".docx"
    try:
        cv = Converter(tmp_pdf_path)
        try:
            # Single-process conversion (kept from the original for stability
            # on the free tier).
            cv.convert(tmp_docx_path, start=0, end=None, multi_processing=False)
        finally:
            # Close the converter even when the conversion raises.
            cv.close()
    except Exception as e:
        # Remove both the input and any partially written output.
        cleanup_file(tmp_pdf_path)
        cleanup_file(tmp_docx_path)
        raise HTTPException(status_code=500, detail=str(e)) from e
    background_tasks.add_task(cleanup_file, tmp_pdf_path)
    background_tasks.add_task(cleanup_file, tmp_docx_path)
    return FileResponse(tmp_docx_path, filename="converted.docx", media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document")
async def word_to_pdf(request: Request, background_tasks: BackgroundTasks, file: UploadFile = File(...), api_key: str = Depends(get_api_key)):
    """Convert an uploaded Word document to PDF using headless LibreOffice.

    Args:
        request: Incoming request (not used in the body).
        background_tasks: Receives the temp-file cleanup jobs that run after
            the response has been sent.
        file: The uploaded document; it is stored with a ``.docx`` suffix.
        api_key: Validated API key from ``get_api_key``.

    Returns:
        A ``FileResponse`` serving the result as ``converted.pdf``.

    Raises:
        HTTPException: 500 with the underlying error message on failure,
            including the case where LibreOffice exits cleanly but writes no
            output file.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=".docx") as tmp_docx:
        tmp_docx.write(await file.read())
        tmp_docx_path = tmp_docx.name
    # Swap only the extension; str.replace would also rewrite a ".docx" that
    # happens to appear elsewhere in the temp directory path.
    tmp_pdf_path = os.path.splitext(tmp_docx_path)[0] + ".pdf"
    try:
        subprocess.run(["libreoffice", "--headless", "--convert-to", "pdf", tmp_docx_path, "--outdir", os.path.dirname(tmp_docx_path)], check=True)
        if not os.path.exists(tmp_pdf_path):
            # Fail here with a clear message rather than later, while the
            # response is being streamed.
            raise RuntimeError("LibreOffice did not produce a PDF output file")
    except Exception as e:
        # Remove both the input and any partially written output.
        cleanup_file(tmp_docx_path)
        cleanup_file(tmp_pdf_path)
        raise HTTPException(status_code=500, detail=str(e)) from e
    background_tasks.add_task(cleanup_file, tmp_docx_path)
    background_tasks.add_task(cleanup_file, tmp_pdf_path)
    return FileResponse(tmp_pdf_path, filename="converted.pdf", media_type="application/pdf")
async def html_to_pdf(request: Request, background_tasks: BackgroundTasks, file: UploadFile = File(None), url: str = Form(None), api_key: str = Depends(get_api_key)):
    """Render HTML to PDF with WeasyPrint, from an upload or from a URL.

    Args:
        request: Incoming request (not used in the body).
        background_tasks: Receives the temp-file cleanup job that runs after
            the response has been sent.
        file: Optional uploaded HTML file; takes precedence over ``url``.
        url: Optional http(s) address to render when no file is uploaded.
        api_key: Validated API key from ``get_api_key``.

    Returns:
        A ``FileResponse`` serving the result as ``converted.pdf``.

    Raises:
        HTTPException: 400 when neither input is provided or ``url`` is not
            http(s); 500 with the underlying error message when rendering
            fails.
    """
    from urllib.parse import urlparse  # local import keeps this block self-contained

    # Validate before creating any temp file. Previously a request with
    # neither input produced an empty file that was served as a "PDF".
    if not file and not url:
        raise HTTPException(status_code=400, detail="Provide either an HTML 'file' or a 'url'")
    if not file and urlparse(url).scheme.lower() not in ("http", "https"):
        # Other schemes such as file:// would expose the server's filesystem.
        raise HTTPException(status_code=400, detail="Only http(s) URLs are supported")

    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf:
        tmp_pdf_path = tmp_pdf.name
    try:
        if file:
            # NOTE(review): uploaded HTML can still reference external or
            # local resources; consider a restrictive WeasyPrint url_fetcher.
            markup = (await file.read()).decode('utf-8', errors='ignore')
            HTML(string=markup).write_pdf(tmp_pdf_path)
        else:
            HTML(url=url).write_pdf(tmp_pdf_path)
    except Exception as e:
        cleanup_file(tmp_pdf_path)
        raise HTTPException(status_code=500, detail=str(e)) from e
    background_tasks.add_task(cleanup_file, tmp_pdf_path)
    return FileResponse(tmp_pdf_path, filename="converted.pdf", media_type="application/pdf")
async def pdf_to_excel(request: Request, background_tasks: BackgroundTasks, file: UploadFile = File(...), api_key: str = Depends(get_api_key)):
    """Extract tables from an uploaded PDF into an Excel workbook.

    Each non-empty table found by tabula becomes one sheet named
    ``Table_<n>``.

    Args:
        request: Incoming request (not used in the body).
        background_tasks: Receives the temp-file cleanup jobs that run after
            the response has been sent.
        file: The uploaded PDF.
        api_key: Validated API key from ``get_api_key``.

    Returns:
        A ``FileResponse`` serving ``tables.xlsx``, or a 400 ``JSONResponse``
        with an ``error`` message when no non-empty table is detected.

    Raises:
        HTTPException: 500 ("Excel processing failed: ...") on any failure.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf:
        tmp_pdf.write(await file.read())
        tmp_pdf_path = tmp_pdf.name
    # Swap only the extension; str.replace would also rewrite a ".pdf" that
    # happens to appear elsewhere in the temp directory path.
    tmp_xlsx_path = os.path.splitext(tmp_pdf_path)[0] + ".xlsx"
    try:
        dfs = tabula.read_pdf(tmp_pdf_path, pages='all', multiple_tables=True)
        # Filter out empty dataframes and verify we have at least one valid sheet
        valid_dfs = [df for df in dfs if not df.empty] if dfs else []
        if valid_dfs:
            with pd.ExcelWriter(tmp_xlsx_path, engine='openpyxl') as writer:
                for i, df in enumerate(valid_dfs, start=1):
                    df.to_excel(writer, sheet_name=f"Table_{i}", index=False)
    except Exception as e:
        # Remove both the input and any partially written workbook.
        cleanup_file(tmp_pdf_path)
        cleanup_file(tmp_xlsx_path)
        raise HTTPException(status_code=500, detail=f"Excel processing failed: {str(e)}") from e
    if not valid_dfs:
        cleanup_file(tmp_pdf_path)
        return JSONResponse(
            status_code=400,
            content={"error": "No clear tables were detected inside the provided PDF. This tool requires documents with defined rows and columns."}
        )
    background_tasks.add_task(cleanup_file, tmp_pdf_path)
    background_tasks.add_task(cleanup_file, tmp_xlsx_path)
    return FileResponse(tmp_xlsx_path, filename="tables.xlsx", media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
async def pdf_to_jpg(request: Request, background_tasks: BackgroundTasks, file: UploadFile = File(...), api_key: str = Depends(get_api_key)):
    """Render every page of an uploaded PDF to JPEG and return them zipped.

    Archive members are named ``page_1.jpg``, ``page_2.jpg``, and so on.

    Args:
        request: Incoming request (not used in the body).
        background_tasks: Receives the temp-file cleanup jobs that run after
            the response has been sent.
        file: The uploaded PDF.
        api_key: Validated API key from ``get_api_key``.

    Returns:
        A ``FileResponse`` serving the archive as ``images.zip``.

    Raises:
        HTTPException: 500 with the underlying error message on failure.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf:
        tmp_pdf.write(await file.read())
        tmp_pdf_path = tmp_pdf.name
    # Swap only the extension; str.replace would also rewrite a ".pdf" that
    # happens to appear elsewhere in the temp directory path.
    tmp_zip_path = os.path.splitext(tmp_pdf_path)[0] + ".zip"
    try:
        images = convert_from_path(tmp_pdf_path)
        with zipfile.ZipFile(tmp_zip_path, 'w') as zipf:
            for i, image in enumerate(images, start=1):
                # Encode each page in memory and write it straight into the
                # archive, so no per-page temp files exist to leak on failure.
                buf = io.BytesIO()
                image.save(buf, 'JPEG')
                zipf.writestr(f"page_{i}.jpg", buf.getvalue())
    except Exception as e:
        # Remove both the input and any partially written archive.
        cleanup_file(tmp_pdf_path)
        cleanup_file(tmp_zip_path)
        raise HTTPException(status_code=500, detail=str(e)) from e
    background_tasks.add_task(cleanup_file, tmp_pdf_path)
    background_tasks.add_task(cleanup_file, tmp_zip_path)
    return FileResponse(tmp_zip_path, filename="images.zip", media_type="application/zip")
async def compress_pdf(request: Request, background_tasks: BackgroundTasks, file: UploadFile = File(...), quality: str = Form("recommended"), api_key: str = Depends(get_api_key)):
    """Compress an uploaded PDF with Ghostscript.

    Args:
        request: Incoming request (not used in the body).
        background_tasks: Receives the temp-file cleanup jobs that run after
            the response has been sent.
        file: The uploaded PDF.
        quality: "extreme", "recommended" or "less", mapped to the
            ``-dPDFSETTINGS`` values ``/screen``, ``/ebook`` and ``/printer``.
            Any other value falls back to ``/ebook``.
        api_key: Validated API key from ``get_api_key``.

    Returns:
        A ``FileResponse`` serving the result as ``compressed.pdf``.

    Raises:
        HTTPException: 500 with the underlying error message on failure.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf:
        tmp_pdf.write(await file.read())
        tmp_pdf_path = tmp_pdf.name
    # Derive the output name from the stem; str.replace would also rewrite a
    # ".pdf" that happens to appear elsewhere in the temp directory path.
    tmp_out = os.path.splitext(tmp_pdf_path)[0] + "_comp.pdf"
    q_map = {"extreme": "/screen", "recommended": "/ebook", "less": "/printer"}
    try:
        subprocess.run(
            [
                "gs",
                "-sDEVICE=pdfwrite",
                "-dCompatibilityLevel=1.4",
                f"-dPDFSETTINGS={q_map.get(quality, '/ebook')}",
                "-dNOPAUSE",
                "-dQUIET",
                "-dBATCH",
                f"-sOutputFile={tmp_out}",
                tmp_pdf_path,
            ],
            check=True,
        )
    except Exception as e:
        # Remove both the input and any partially written output.
        cleanup_file(tmp_pdf_path)
        cleanup_file(tmp_out)
        raise HTTPException(status_code=500, detail=str(e)) from e
    background_tasks.add_task(cleanup_file, tmp_pdf_path)
    background_tasks.add_task(cleanup_file, tmp_out)
    return FileResponse(tmp_out, filename="compressed.pdf", media_type="application/pdf")
async def ocr_pdf_legacy(request: Request, background_tasks: BackgroundTasks, file: UploadFile = File(...), api_key: str = Depends(get_api_key)):
    """OCR an uploaded PDF with Tesseract and return the text as JSON.

    Every page is rasterised and recognised; the page texts are joined with a
    ``---`` separator line. The temp PDF is removed in a background task on
    success and immediately on failure.

    Returns:
        ``JSONResponse`` with ``{"text": <recognised text>}``.

    Raises:
        HTTPException: 500 with the underlying error message on failure.
    """
    upload = await file.read()
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf:
        tmp_pdf.write(upload)
        tmp_pdf_path = tmp_pdf.name
    try:
        page_texts = []
        for page_image in convert_from_path(tmp_pdf_path):
            page_texts.append(pytesseract.image_to_string(page_image))
        text = "\n---\n".join(page_texts)
        background_tasks.add_task(cleanup_file, tmp_pdf_path)
        return JSONResponse(content={"text": text.strip()})
    except Exception as e:
        cleanup_file(tmp_pdf_path)
        raise HTTPException(status_code=500, detail=str(e))
# --- Image Router (/api/image) ---
# Router object for the image endpoints; mounted on the app later.
image_router = APIRouter(
    prefix="/api/image",
    tags=["Image Tools"],
)
async def remove_background(file: UploadFile = File(...)):
    """Remove the background from an uploaded image and return it as PNG.

    The upload is converted to RGBA, processed by rembg using the shared
    session, and streamed back with the ``image/png`` media type.

    Raises:
        HTTPException: 500 with the underlying error message on failure.
    """
    try:
        raw = await file.read()
        source = Image.open(io.BytesIO(raw)).convert("RGBA")
        cutout = rembg.remove(source, session=get_rembg_session())
        png_stream = io.BytesIO()
        cutout.save(png_stream, format="PNG")
        # Rewind so the response streams from the first byte.
        png_stream.seek(0)
        return StreamingResponse(png_stream, media_type="image/png")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# Mount the routers on the application: PDF/OCR first, then image tools.
for _router in (pdf_router, image_router):
    app.include_router(_router)
async def root():
    """Return a short message confirming the API is up."""
    banner = {"message": "QuickPDF Unified API is active."}
    return banner
if __name__ == "__main__":
    # Direct execution: serve the app on all interfaces, port 7860.
    import uvicorn

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
    )