"""QuickPDF Studio unified API.

A single FastAPI application exposing PDF/OCR tools under ``/api/pdf`` and
image tools under ``/api/image``.
"""
# Standard library
import os
import io
import asyncio
import time
import base64
import logging
import secrets
import tempfile
import subprocess
import zipfile
from typing import List, Optional
from urllib.parse import urlparse

# Third-party basics
import numpy as np
from pydantic import BaseModel
from PIL import Image

# FastAPI
from fastapi import FastAPI, UploadFile, File, HTTPException, Query, APIRouter, Form, Depends, Security, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response, StreamingResponse, FileResponse, JSONResponse
from fastapi.security.api_key import APIKeyHeader

# Rate Limiting
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from starlette.requests import Request

# Backend 1: PDF & OCR Libraries
import easyocr
import fitz  # PyMuPDF
from DrissionPage import ChromiumPage, ChromiumOptions
from pdf2docx import Converter
import tabula
import pandas as pd
from pdf2image import convert_from_path
import pikepdf
import pytesseract
from weasyprint import HTML
from bs4 import BeautifulSoup
import ebooklib
from ebooklib import epub

# Backend 2: Image Libraries
import rembg

logger = logging.getLogger(__name__)

# --- App Initialization ---
app = FastAPI(
    title="QuickPDF Studio Unified API",
    description="Unified backend for PDF/OCR and Image processing services.",
    version="1.0.0",
)
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# CORS Configuration
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; confirm this is intended before exposing the service publicly.
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# API Gatekeeper
# The token can be supplied through the QUICKPDF_API_TOKEN environment
# variable so the secret does not have to live in source control; the previous
# literal is kept as the fallback so existing deployments keep working.
API_TOKEN = os.environ.get("QUICKPDF_API_TOKEN", "HUGGING_FACE_SECURE_TOKEN_2026")
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=True)

# Upper bound (seconds) for the caller-controlled render delay in url_to_pdf.
MAX_CAPTURE_DELAY_SECONDS = 60


async def get_api_key(api_key: str = Security(api_key_header)):
    """Validate the ``X-API-Key`` header and return it.

    Raises:
        HTTPException: 401 when the supplied key does not match ``API_TOKEN``.
    """
    # Constant-time comparison; bytes are used so non-ASCII input cannot make
    # compare_digest raise.
    if not secrets.compare_digest(api_key.encode("utf-8"), API_TOKEN.encode("utf-8")):
        raise HTTPException(status_code=401, detail="Invalid API Key")
    return api_key


def cleanup_file(filepath: str):
    """Delete ``filepath`` if it exists; never raises.

    Failures other than "file not found" are logged and swallowed because this
    runs as best-effort cleanup (often as a background task).
    """
    try:
        os.remove(filepath)
    except FileNotFoundError:
        pass
    except Exception as e:
        logger.warning("Cleanup error: %s", e)


def _swap_suffix(path: str, new_suffix: str) -> str:
    """Return ``path`` with its extension replaced by ``new_suffix``.

    Only the final extension is touched, unlike ``str.replace`` which would
    also rewrite a matching substring earlier in the path.
    """
    return os.path.splitext(path)[0] + new_suffix


def _require_http_url(url) -> str:
    """Return ``url`` stripped if it is an absolute http(s) URL, else raise 400.

    Restricting the scheme keeps callers from pointing the renderers at local
    files or other non-web schemes.
    NOTE(review): this does not block requests to internal/private hosts.
    """
    if isinstance(url, str):
        candidate = url.strip()
        try:
            parsed = urlparse(candidate)
        except ValueError:
            parsed = None
        if parsed is not None and parsed.scheme.lower() in ("http", "https") and parsed.netloc:
            return candidate
    raise HTTPException(status_code=400, detail="A valid http(s) URL is required.")


# --- Global Shared Resources (Lazy-loaded) ---
_ocr_reader = None
_rembg_session = None


def get_ocr_reader():
    """Return the shared EasyOCR reader (English + German, CPU), creating it on first use."""
    global _ocr_reader
    if _ocr_reader is None:
        _ocr_reader = easyocr.Reader(['en', 'de'], gpu=False)
    return _ocr_reader


def get_rembg_session():
    """Return the shared rembg session, creating it on first use.

    Tries the ``isnet-general-use`` model first and falls back to rembg's
    default session if that fails.
    """
    global _rembg_session
    if _rembg_session is None:
        try:
            _rembg_session = rembg.new_session("isnet-general-use")
        except Exception:
            logger.warning("Could not create 'isnet-general-use' session; using rembg default.")
            _rembg_session = rembg.new_session()
    return _rembg_session


# --- OCR helpers ---
def _place_word_on_line(lines, word, mid_y, tolerance):
    """Append ``word`` to the first line whose mean vertical centre is within ``tolerance`` of ``mid_y``.

    A new line is started when no existing line matches.
    """
    for line in lines:
        centre = sum(w['bbox']['y'] + w['bbox']['height'] / 2 for w in line) / len(line)
        if abs(mid_y - centre) < tolerance:
            line.append(word)
            return
    lines.append([word])


def _lines_to_text(lines):
    """Join grouped words into text: words ordered left-to-right, one line per group."""
    return "".join(
        " ".join(w['text'] for w in sorted(line, key=lambda w: w['bbox']['x'])) + "\n"
        for line in lines
    )


def _ocr_image(img, page_size=None):
    """Run OCR on a PIL image and group the detections into text lines.

    Args:
        img: RGB PIL image to recognise.
        page_size: optional ``(width, height)`` in pixels; when given, each
            word also carries ``x``/``y``/``width``/``height`` as percentages
            of the page.

    Returns:
        ``(formatted_text, words)`` where ``words`` is the list of word dicts
        in top-to-bottom detection order.
    """
    detections = get_ocr_reader().readtext(np.array(img), detail=1)
    # Visit detections top-to-bottom so lines are created in reading order.
    detections = sorted(detections, key=lambda det: min(p[1] for p in det[0]))

    words = []
    lines = []
    for bbox, text, conf in detections:
        # Cast to float: the OCR backend may hand back NumPy scalars, which
        # the JSON encoder cannot serialise.
        xs = [float(p[0]) for p in bbox]
        ys = [float(p[1]) for p in bbox]
        min_x, max_x = min(xs), max(xs)
        min_y, max_y = min(ys), max(ys)
        box_w = max_x - min_x
        box_h = max_y - min_y

        word = {"text": text, "confidence": float(conf)}
        if page_size is not None:
            page_w, page_h = page_size
            word["x"] = min_x / page_w * 100
            word["y"] = min_y / page_h * 100
            word["width"] = box_w / page_w * 100
            word["height"] = box_h / page_h * 100
        word["bbox"] = {"x": min_x, "y": min_y, "width": box_w, "height": box_h}

        words.append(word)
        # A word joins a line when its centre is within half its own height
        # of that line's mean centre.
        _place_word_on_line(lines, word, (min_y + max_y) / 2, box_h * 0.5)

    return _lines_to_text(lines), words


# --- PDF & OCR Router (/api/pdf) ---
pdf_router = APIRouter(prefix="/api/pdf", tags=["PDF & OCR"])


@pdf_router.get("/health")
async def pdf_health():
    """Liveness probe for the PDF/OCR service."""
    return {"status": "healthy", "service": "ocr-pdf-engine"}


# --- OCR Endpoints ---
# NOTE(review): unlike the conversion endpoints below, this route has no
# API-key or rate-limit guard — confirm that is intended.
@pdf_router.post("/ocr")
async def perform_ocr(file: UploadFile = File(...), languages: Optional[str] = Query(None)):
    """OCR an uploaded PDF (page by page) or a single image.

    ``languages`` is accepted for compatibility but currently unused; the
    shared reader is fixed to English and German.
    """
    data = await file.read()
    # filename may be missing on some uploads; treat that as "not a PDF".
    ext = os.path.splitext(file.filename or "")[1].lower()
    results_pages = []
    try:
        if ext == '.pdf':
            page_texts = []
            doc = fitz.open(stream=data, filetype="pdf")
            try:
                for page_num in range(len(doc)):
                    page = doc.load_page(page_num)
                    # Render at 1.5x zoom before recognition.
                    pix = page.get_pixmap(matrix=fitz.Matrix(1.5, 1.5))
                    img = Image.open(io.BytesIO(pix.tobytes("png"))).convert('RGB')
                    formatted, page_words = _ocr_image(img, page_size=(pix.width, pix.height))
                    results_pages.append({
                        "pageNum": page_num + 1,
                        "fullText": formatted.strip(),
                        "words": page_words,
                        "imageWidth": pix.width,
                        "imageHeight": pix.height,
                    })
                    page_texts.append(formatted)
            finally:
                # Release the document even when a page fails to render/OCR.
                doc.close()
            full_text = "".join(text + "\n" for text in page_texts)
        else:
            img = Image.open(io.BytesIO(data)).convert('RGB')
            full_text, _ = _ocr_image(img)
            results_pages.append({"pageNum": 1, "fullText": full_text.strip()})
        return {"success": True, "text": full_text.strip(), "pages": results_pages}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# --- Conversion Endpoints ---
# NOTE(review): this route has no API-key or rate-limit guard — confirm intended.
@pdf_router.post("/convert/url-to-pdf")
async def url_to_pdf(payload: dict):
    """Render a web page to PDF with headless Chromium.

    Payload keys: ``url`` (required, http/https), ``device`` (``"mobile"`` for
    a phone user agent), ``delay`` (seconds to wait after navigation, capped),
    ``format`` (``"a4"`` or anything else for one tall page).
    """
    url = _require_http_url(payload.get("url"))
    try:
        delay = float(payload.get("delay") or 0)
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="'delay' must be a number of seconds.")
    # Cap the caller-controlled wait so a request cannot hold a browser open indefinitely.
    delay = min(delay, MAX_CAPTURE_DELAY_SECONDS)

    try:
        options = (
            ChromiumOptions()
            .headless(True)
            .set_argument('--no-sandbox')
            .set_argument('--disable-gpu')
            .set_argument('--disable-dev-shm-usage')
        )
        ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
        if payload.get("device") == "mobile":
            ua = "Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1"
        options.set_user_agent(ua)
        page = ChromiumPage(options)
        try:
            page.get(url)
            if delay > 0:
                # Yield to the event loop while waiting instead of blocking it.
                await asyncio.sleep(delay)
            else:
                page.wait.load_start()
            print_options = {
                'printBackground': True,
                'marginTop': 0.4,
                'marginBottom': 0.4,
                'marginLeft': 0.4,
                'marginRight': 0.4,
            }
            if payload.get("format") == "a4":
                print_options['paperWidth'], print_options['paperHeight'] = 8.27, 11.69
            else:
                # Single tall page: document height in CSS px -> inches (96 px/in) plus slack.
                body_height = page.run_js('return document.documentElement.scrollHeight')
                print_options['paperHeight'] = (body_height / 96) + 1
                print_options['marginTop'] = print_options['marginBottom'] = 0
            result = page.run_cdp('Page.printToPDF', **print_options)
            return Response(
                content=base64.b64decode(result['data']),
                media_type="application/pdf",
                headers={"Content-Disposition": "attachment; filename=web-capture.pdf"},
            )
        finally:
            page.quit()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@pdf_router.post("/convert/pdf-to-word")
@limiter.limit("10/minute")
async def pdf_to_word(request: Request, background_tasks: BackgroundTasks, file: UploadFile = File(...), api_key: str = Depends(get_api_key)):
    """Convert an uploaded PDF to DOCX and return it as a download."""
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf:
        tmp_pdf.write(await file.read())
        tmp_pdf_path = tmp_pdf.name
    tmp_docx_path = _swap_suffix(tmp_pdf_path, ".docx")
    try:
        cv = Converter(tmp_pdf_path)
        try:
            cv.convert(tmp_docx_path, start=0, end=None, multi_processing=False)  # Free tier stable
        finally:
            cv.close()
    except Exception as e:
        # Remove the input and any partially written output.
        cleanup_file(tmp_pdf_path)
        cleanup_file(tmp_docx_path)
        raise HTTPException(status_code=500, detail=str(e))
    # Files are removed after the response has been sent.
    background_tasks.add_task(cleanup_file, tmp_pdf_path)
    background_tasks.add_task(cleanup_file, tmp_docx_path)
    return FileResponse(
        tmp_docx_path,
        filename="converted.docx",
        media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    )


@pdf_router.post("/convert/word-to-pdf")
@limiter.limit("10/minute")
async def word_to_pdf(request: Request, background_tasks: BackgroundTasks, file: UploadFile = File(...), api_key: str = Depends(get_api_key)):
    """Convert an uploaded DOCX to PDF via headless LibreOffice."""
    with tempfile.NamedTemporaryFile(delete=False, suffix=".docx") as tmp_docx:
        tmp_docx.write(await file.read())
        tmp_docx_path = tmp_docx.name
    # The converter writes <input stem>.pdf into --outdir.
    tmp_pdf_path = _swap_suffix(tmp_docx_path, ".pdf")
    try:
        subprocess.run(
            ["libreoffice", "--headless", "--convert-to", "pdf", tmp_docx_path, "--outdir", os.path.dirname(tmp_docx_path)],
            check=True,
        )
        if not os.path.exists(tmp_pdf_path):
            raise RuntimeError("LibreOffice did not produce an output file.")
    except Exception as e:
        cleanup_file(tmp_docx_path)
        cleanup_file(tmp_pdf_path)
        raise HTTPException(status_code=500, detail=str(e))
    background_tasks.add_task(cleanup_file, tmp_docx_path)
    background_tasks.add_task(cleanup_file, tmp_pdf_path)
    return FileResponse(tmp_pdf_path, filename="converted.pdf", media_type="application/pdf")


@pdf_router.post("/convert/html-to-pdf")
@limiter.limit("10/minute")
async def html_to_pdf(request: Request, background_tasks: BackgroundTasks, file: UploadFile = File(None), url: str = Form(None), api_key: str = Depends(get_api_key)):
    """Render an uploaded HTML file, or the page at ``url``, to PDF.

    ``file`` takes precedence when both are supplied.
    """
    if not file and not url:
        # Previously this produced an empty 0-byte "PDF".
        raise HTTPException(status_code=400, detail="Provide either an HTML file or a URL.")
    if not file:
        url = _require_http_url(url)

    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf:
        tmp_pdf_path = tmp_pdf.name
    try:
        if file:
            # NOTE(review): uploaded HTML may itself reference external or
            # local resources; consider a restricted url_fetcher.
            markup = (await file.read()).decode('utf-8', errors='ignore')
            HTML(string=markup).write_pdf(tmp_pdf_path)
        else:
            HTML(url=url).write_pdf(tmp_pdf_path)
    except Exception as e:
        cleanup_file(tmp_pdf_path)
        raise HTTPException(status_code=500, detail=str(e))
    background_tasks.add_task(cleanup_file, tmp_pdf_path)
    return FileResponse(tmp_pdf_path, filename="converted.pdf", media_type="application/pdf")


@pdf_router.post("/convert/pdf-to-excel")
@limiter.limit("10/minute")
async def pdf_to_excel(request: Request, background_tasks: BackgroundTasks, file: UploadFile = File(...), api_key: str = Depends(get_api_key)):
    """Extract tables from an uploaded PDF into an XLSX workbook (one sheet per table)."""
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf:
        tmp_pdf.write(await file.read())
        tmp_pdf_path = tmp_pdf.name
    tmp_xlsx_path = _swap_suffix(tmp_pdf_path, ".xlsx")
    try:
        dfs = tabula.read_pdf(tmp_pdf_path, pages='all', multiple_tables=True)
        # Filter out empty dataframes and verify we have at least one valid sheet
        valid_dfs = [df for df in dfs if not df.empty] if dfs else []
        if not valid_dfs:
            cleanup_file(tmp_pdf_path)
            return JSONResponse(
                status_code=400,
                content={"error": "No clear tables were detected inside the provided PDF. This tool requires documents with defined rows and columns."},
            )
        with pd.ExcelWriter(tmp_xlsx_path, engine='openpyxl') as writer:
            for i, df in enumerate(valid_dfs):
                df.to_excel(writer, sheet_name=f"Table_{i+1}", index=False)
    except Exception as e:
        cleanup_file(tmp_pdf_path)
        cleanup_file(tmp_xlsx_path)
        raise HTTPException(status_code=500, detail=f"Excel processing failed: {str(e)}")
    background_tasks.add_task(cleanup_file, tmp_pdf_path)
    background_tasks.add_task(cleanup_file, tmp_xlsx_path)
    return FileResponse(
        tmp_xlsx_path,
        filename="tables.xlsx",
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    )


@pdf_router.post("/convert/pdf-to-jpg")
@limiter.limit("10/minute")
async def pdf_to_jpg(request: Request, background_tasks: BackgroundTasks, file: UploadFile = File(...), api_key: str = Depends(get_api_key)):
    """Rasterise each page of an uploaded PDF to JPEG and return them as a ZIP."""
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf:
        tmp_pdf.write(await file.read())
        tmp_pdf_path = tmp_pdf.name
    tmp_zip_path = _swap_suffix(tmp_pdf_path, ".zip")
    try:
        images = convert_from_path(tmp_pdf_path)
        with zipfile.ZipFile(tmp_zip_path, 'w') as zipf:
            for page_no, image in enumerate(images, start=1):
                # Encode in memory and write straight into the archive so no
                # per-page temp files are left behind on failure.
                jpeg_buf = io.BytesIO()
                image.save(jpeg_buf, 'JPEG')
                zipf.writestr(f"page_{page_no}.jpg", jpeg_buf.getvalue())
    except Exception as e:
        cleanup_file(tmp_pdf_path)
        cleanup_file(tmp_zip_path)
        raise HTTPException(status_code=500, detail=str(e))
    background_tasks.add_task(cleanup_file, tmp_pdf_path)
    background_tasks.add_task(cleanup_file, tmp_zip_path)
    return FileResponse(tmp_zip_path, filename="images.zip", media_type="application/zip")


@pdf_router.post("/compress/pdf")
@limiter.limit("10/minute")
async def compress_pdf(request: Request, background_tasks: BackgroundTasks, file: UploadFile = File(...), quality: str = Form("recommended"), api_key: str = Depends(get_api_key)):
    """Compress an uploaded PDF with Ghostscript.

    ``quality`` is one of ``extreme``, ``recommended`` or ``less``; unknown
    values fall back to the ``recommended`` preset.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf:
        tmp_pdf.write(await file.read())
        tmp_pdf_path = tmp_pdf.name
    tmp_out = _swap_suffix(tmp_pdf_path, "_comp.pdf")
    # Map the public quality names onto Ghostscript -dPDFSETTINGS presets.
    q_map = {"extreme": "/screen", "recommended": "/ebook", "less": "/printer"}
    try:
        subprocess.run(
            [
                "gs",
                "-sDEVICE=pdfwrite",
                "-dCompatibilityLevel=1.4",
                f"-dPDFSETTINGS={q_map.get(quality, '/ebook')}",
                "-dNOPAUSE",
                "-dQUIET",
                "-dBATCH",
                f"-sOutputFile={tmp_out}",
                tmp_pdf_path,
            ],
            check=True,
        )
        if not os.path.exists(tmp_out):
            raise RuntimeError("Ghostscript did not produce an output file.")
    except Exception as e:
        cleanup_file(tmp_pdf_path)
        cleanup_file(tmp_out)
        raise HTTPException(status_code=500, detail=str(e))
    background_tasks.add_task(cleanup_file, tmp_pdf_path)
    background_tasks.add_task(cleanup_file, tmp_out)
    return FileResponse(tmp_out, filename="compressed.pdf", media_type="application/pdf")


@pdf_router.post("/ocr/pdf")
@limiter.limit("10/minute")
async def ocr_pdf_legacy(request: Request, background_tasks: BackgroundTasks, file: UploadFile = File(...), api_key: str = Depends(get_api_key)):
    """Legacy OCR: run Tesseract over every page of a PDF and return the joined text."""
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf:
        tmp_pdf.write(await file.read())
        tmp_pdf_path = tmp_pdf.name
    try:
        images = convert_from_path(tmp_pdf_path)
        text = "\n---\n".join(pytesseract.image_to_string(img) for img in images)
        return JSONResponse(content={"text": text.strip()})
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # The response body is already in memory, so the input can be removed
        # right away on both the success and the failure path.
        cleanup_file(tmp_pdf_path)


# --- Image Router (/api/image) ---
image_router = APIRouter(prefix="/api/image", tags=["Image Tools"])


# NOTE(review): this route has no API-key or rate-limit guard — confirm intended.
@image_router.post("/remove-bg")
async def remove_background(file: UploadFile = File(...)):
    """Remove the background from an uploaded image and return a PNG."""
    try:
        input_image = Image.open(io.BytesIO(await file.read())).convert("RGBA")
        output_data = rembg.remove(input_image, session=get_rembg_session())
        buf = io.BytesIO()
        output_data.save(buf, format="PNG")
        buf.seek(0)
        return StreamingResponse(buf, media_type="image/png")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


app.include_router(pdf_router)
app.include_router(image_router)


@app.get("/")
async def root():
    """Root endpoint confirming the API is up."""
    return {"message": "QuickPDF Unified API is active."}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)