webdesign-tools / main.py
zasprince8's picture
Upload main.py
b45ff8d verified
import os
import io
import asyncio
import time
import base64
import numpy as np
import tempfile
import subprocess
import zipfile
from typing import List, Optional
from pydantic import BaseModel
from PIL import Image
# FastAPI
from fastapi import FastAPI, UploadFile, File, HTTPException, Query, APIRouter, Form, Depends, Security, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response, StreamingResponse, FileResponse, JSONResponse
from fastapi.security.api_key import APIKeyHeader
# Rate Limiting
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from starlette.requests import Request
# Backend 1: PDF & OCR Libraries
import easyocr
import fitz # PyMuPDF
from DrissionPage import ChromiumPage, ChromiumOptions
from pdf2docx import Converter
import tabula
import pandas as pd
from pdf2image import convert_from_path
import pikepdf
import pytesseract
from weasyprint import HTML
from bs4 import BeautifulSoup
import ebooklib
from ebooklib import epub
# Backend 2: Image Libraries
import rembg
# --- App Initialization ---
app = FastAPI(
title="QuickPDF Studio Unified API",
description="Unified backend for PDF/OCR and Image processing services.",
version="1.0.0"
)
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# CORS Configuration
origins = ["*"]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# API Gatekeeper
API_TOKEN = "HUGGING_FACE_SECURE_TOKEN_2026"
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=True)
async def get_api_key(api_key: str = Security(api_key_header)):
if api_key != API_TOKEN:
raise HTTPException(status_code=401, detail="Invalid API Key")
return api_key
def cleanup_file(filepath: str):
try:
if os.path.exists(filepath):
os.remove(filepath)
except Exception as e:
print(f"Cleanup error: {e}")
# --- Global Shared Resources (Lazy-loaded) ---
_ocr_reader = None
_rembg_session = None
def get_ocr_reader():
global _ocr_reader
if _ocr_reader is None:
_ocr_reader = easyocr.Reader(['en', 'de'], gpu=False)
return _ocr_reader
def get_rembg_session():
global _rembg_session
if _rembg_session is None:
try:
_rembg_session = rembg.new_session("isnet-general-use")
except:
_rembg_session = rembg.new_session()
return _rembg_session
# --- PDF & OCR Router (/api/pdf) ---
pdf_router = APIRouter(prefix="/api/pdf", tags=["PDF & OCR"])
@pdf_router.get("/health")
async def pdf_health():
return {"status": "healthy", "service": "ocr-pdf-engine"}
# --- OCR Endpoints ---
@pdf_router.post("/ocr")
async def perform_ocr(file: UploadFile = File(...), languages: Optional[str] = Query(None)):
data = await file.read()
ext = os.path.splitext(file.filename)[1].lower()
results_pages = []
full_text = ""
try:
if ext == '.pdf':
doc = fitz.open(stream=data, filetype="pdf")
for page_num in range(len(doc)):
page = doc.load_page(page_num)
pix = page.get_pixmap(matrix=fitz.Matrix(1.5, 1.5))
img = Image.open(io.BytesIO(pix.tobytes("png"))).convert('RGB')
raw_results = get_ocr_reader().readtext(np.array(img), detail=1)
page_words = []
lines = []
sorted_results = sorted(raw_results, key=lambda x: min(p[1] for p in x[0]))
for bbox, text, conf in sorted_results:
xs, ys = [p[0] for p in bbox], [p[1] for p in bbox]
min_x, min_y, max_x, max_y = min(xs), min(ys), max(xs), max(ys)
mid_y = (min_y + max_y) / 2
word_obj = {"text": text, "confidence": float(conf), "x": (min_x/pix.width)*100, "y": (min_y/pix.height)*100, "width": ((max_x-min_x)/pix.width)*100, "height": ((max_y-min_y)/pix.height)*100, "bbox": {"x": min_x, "y": min_y, "width": max_x-min_x, "height": max_y-min_y}}
page_words.append(word_obj)
found = False
for line in lines:
if abs(mid_y - sum((w['bbox']['y']+w['bbox']['height']/2) for w in line)/len(line)) < (max_y-min_y)*0.5:
line.append(word_obj); found = True; break
if not found: lines.append([word_obj])
formatted = ""
for line in lines:
line.sort(key=lambda w: w['bbox']['x'])
formatted += " ".join(w['text'] for w in line) + "\n"
results_pages.append({"pageNum": page_num+1, "fullText": formatted.strip(), "words": page_words, "imageWidth": pix.width, "imageHeight": pix.height})
full_text += formatted + "\n"
doc.close()
else:
img = Image.open(io.BytesIO(data)).convert('RGB')
raw_results = get_ocr_reader().readtext(np.array(img), detail=1)
lines = []
sorted_results = sorted(raw_results, key=lambda x: min(p[1] for p in x[0]))
for bbox, text, conf in sorted_results:
xs, ys = [p[0] for p in bbox], [p[1] for p in bbox]
min_x, min_y, max_x, max_y = min(xs), min(ys), max(xs), max(ys)
mid_y = (min_y + max_y) / 2
word_obj = {"text": text, "confidence": float(conf), "bbox": {"x": min_x, "y": min_y, "width": max_x-min_x, "height": max_y-min_y}}
found = False
for line in lines:
if abs(mid_y - sum((w['bbox']['y']+w['bbox']['height']/2) for w in line)/len(line)) < (max_y-min_y)*0.5:
line.append(word_obj); found = True; break
if not found: lines.append([word_obj])
formatted = ""
for line in lines:
line.sort(key=lambda w: w['bbox']['x'])
formatted += " ".join(w['text'] for w in line) + "\n"
full_text = formatted
results_pages.append({"pageNum": 1, "fullText": full_text.strip()})
return {"success": True, "text": full_text.strip(), "pages": results_pages}
except Exception as e: raise HTTPException(status_code=500, detail=str(e))
# --- Conversion Endpoints ---
@pdf_router.post("/convert/url-to-pdf")
async def url_to_pdf(payload: dict):
try:
options = ChromiumOptions().headless(True).set_argument('--no-sandbox').set_argument('--disable-gpu').set_argument('--disable-dev-shm-usage')
ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
if payload.get("device") == "mobile":
ua = "Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1"
options.set_user_agent(ua)
page = ChromiumPage(options)
try:
page.get(payload["url"])
if payload.get("delay", 0) > 0: time.sleep(payload["delay"])
else: page.wait.load_start()
print_options = {'printBackground': True, 'marginTop': 0.4, 'marginBottom': 0.4, 'marginLeft': 0.4, 'marginRight': 0.4}
if payload.get("format") == "a4": print_options['paperWidth'], print_options['paperHeight'] = 8.27, 11.69
else:
body_height = page.run_js('return document.documentElement.scrollHeight')
print_options['paperHeight'] = (body_height / 96) + 1
print_options['marginTop'] = print_options['marginBottom'] = 0
result = page.run_cdp('Page.printToPDF', **print_options)
return Response(content=base64.b64decode(result['data']), media_type="application/pdf", headers={"Content-Disposition": "attachment; filename=web-capture.pdf"})
finally: page.quit()
except Exception as e: raise HTTPException(status_code=500, detail=str(e))
@pdf_router.post("/convert/pdf-to-word")
@limiter.limit("10/minute")
async def pdf_to_word(request: Request, background_tasks: BackgroundTasks, file: UploadFile = File(...), api_key: str = Depends(get_api_key)):
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf:
tmp_pdf.write(await file.read())
tmp_pdf_path = tmp_pdf.name
tmp_docx_path = tmp_pdf_path.replace(".pdf", ".docx")
try:
cv = Converter(tmp_pdf_path)
cv.convert(tmp_docx_path, start=0, end=None, multi_processing=False) # Free tier stable
cv.close()
background_tasks.add_task(cleanup_file, tmp_pdf_path)
background_tasks.add_task(cleanup_file, tmp_docx_path)
return FileResponse(tmp_docx_path, filename="converted.docx", media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document")
except Exception as e:
cleanup_file(tmp_pdf_path)
raise HTTPException(status_code=500, detail=str(e))
@pdf_router.post("/convert/word-to-pdf")
@limiter.limit("10/minute")
async def word_to_pdf(request: Request, background_tasks: BackgroundTasks, file: UploadFile = File(...), api_key: str = Depends(get_api_key)):
with tempfile.NamedTemporaryFile(delete=False, suffix=".docx") as tmp_docx:
tmp_docx.write(await file.read())
tmp_docx_path = tmp_docx.name
try:
subprocess.run(["libreoffice", "--headless", "--convert-to", "pdf", tmp_docx_path, "--outdir", os.path.dirname(tmp_docx_path)], check=True)
tmp_pdf_path = tmp_docx_path.replace(".docx", ".pdf")
background_tasks.add_task(cleanup_file, tmp_docx_path)
background_tasks.add_task(cleanup_file, tmp_pdf_path)
return FileResponse(tmp_pdf_path, filename="converted.pdf", media_type="application/pdf")
except Exception as e:
cleanup_file(tmp_docx_path)
raise HTTPException(status_code=500, detail=str(e))
@pdf_router.post("/convert/html-to-pdf")
@limiter.limit("10/minute")
async def html_to_pdf(request: Request, background_tasks: BackgroundTasks, file: UploadFile = File(None), url: str = Form(None), api_key: str = Depends(get_api_key)):
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf:
tmp_pdf_path = tmp_pdf.name
try:
if file: HTML(string=(await file.read()).decode('utf-8', errors='ignore')).write_pdf(tmp_pdf_path)
elif url: HTML(url=url).write_pdf(tmp_pdf_path)
background_tasks.add_task(cleanup_file, tmp_pdf_path)
return FileResponse(tmp_pdf_path, filename="converted.pdf", media_type="application/pdf")
except Exception as e:
cleanup_file(tmp_pdf_path)
raise HTTPException(status_code=500, detail=str(e))
@pdf_router.post("/convert/pdf-to-excel")
@limiter.limit("10/minute")
async def pdf_to_excel(request: Request, background_tasks: BackgroundTasks, file: UploadFile = File(...), api_key: str = Depends(get_api_key)):
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf:
tmp_pdf.write(await file.read())
tmp_pdf_path = tmp_pdf.name
tmp_xlsx_path = tmp_pdf_path.replace(".pdf", ".xlsx")
try:
dfs = tabula.read_pdf(tmp_pdf_path, pages='all', multiple_tables=True)
# Filter out empty dataframes and verify we have at least one valid sheet
valid_dfs = [df for df in dfs if not df.empty] if dfs else []
if not valid_dfs:
cleanup_file(tmp_pdf_path)
return JSONResponse(
status_code=400,
content={"error": "No clear tables were detected inside the provided PDF. This tool requires documents with defined rows and columns."}
)
with pd.ExcelWriter(tmp_xlsx_path, engine='openpyxl') as writer:
for i, df in enumerate(valid_dfs): df.to_excel(writer, sheet_name=f"Table_{i+1}", index=False)
background_tasks.add_task(cleanup_file, tmp_pdf_path)
background_tasks.add_task(cleanup_file, tmp_xlsx_path)
return FileResponse(tmp_xlsx_path, filename="tables.xlsx", media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
except Exception as e:
cleanup_file(tmp_pdf_path)
raise HTTPException(status_code=500, detail=f"Excel processing failed: {str(e)}")
@pdf_router.post("/convert/pdf-to-jpg")
@limiter.limit("10/minute")
async def pdf_to_jpg(request: Request, background_tasks: BackgroundTasks, file: UploadFile = File(...), api_key: str = Depends(get_api_key)):
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf:
tmp_pdf.write(await file.read()); tmp_pdf_path = tmp_pdf.name
try:
images = convert_from_path(tmp_pdf_path)
tmp_zip_path = tmp_pdf_path.replace(".pdf", ".zip")
with zipfile.ZipFile(tmp_zip_path, 'w') as zipf:
for i, image in enumerate(images):
p = f"{tmp_pdf_path}_{i}.jpg"; image.save(p, 'JPEG')
zipf.write(p, arcname=f"page_{i+1}.jpg"); background_tasks.add_task(cleanup_file, p)
background_tasks.add_task(cleanup_file, tmp_pdf_path); background_tasks.add_task(cleanup_file, tmp_zip_path)
return FileResponse(tmp_zip_path, filename="images.zip", media_type="application/zip")
except Exception as e: cleanup_file(tmp_pdf_path); raise HTTPException(status_code=500, detail=str(e))
@pdf_router.post("/compress/pdf")
@limiter.limit("10/minute")
async def compress_pdf(request: Request, background_tasks: BackgroundTasks, file: UploadFile = File(...), quality: str = Form("recommended"), api_key: str = Depends(get_api_key)):
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf:
tmp_pdf.write(await file.read()); tmp_pdf_path = tmp_pdf.name
tmp_out = tmp_pdf_path.replace(".pdf", "_comp.pdf")
q_map = {"extreme": "/screen", "recommended": "/ebook", "less": "/printer"}
try:
subprocess.run(["gs", "-sDEVICE=pdfwrite", "-dCompatibilityLevel=1.4", f"-dPDFSETTINGS={q_map.get(quality, '/ebook')}", "-dNOPAUSE", "-dQUIET", "-dBATCH", f"-sOutputFile={tmp_out}", tmp_pdf_path], check=True)
background_tasks.add_task(cleanup_file, tmp_pdf_path); background_tasks.add_task(cleanup_file, tmp_out)
return FileResponse(tmp_out, filename="compressed.pdf", media_type="application/pdf")
except Exception as e: cleanup_file(tmp_pdf_path); raise HTTPException(status_code=500, detail=str(e))
@pdf_router.post("/ocr/pdf")
@limiter.limit("10/minute")
async def ocr_pdf_legacy(request: Request, background_tasks: BackgroundTasks, file: UploadFile = File(...), api_key: str = Depends(get_api_key)):
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf:
tmp_pdf.write(await file.read()); tmp_pdf_path = tmp_pdf.name
try:
images = convert_from_path(tmp_pdf_path)
text = "\n---\n".join([pytesseract.image_to_string(img) for img in images])
background_tasks.add_task(cleanup_file, tmp_pdf_path)
return JSONResponse(content={"text": text.strip()})
except Exception as e: cleanup_file(tmp_pdf_path); raise HTTPException(status_code=500, detail=str(e))
# --- Image Router (/api/image) ---
image_router = APIRouter(prefix="/api/image", tags=["Image Tools"])
@image_router.post("/remove-bg")
async def remove_background(file: UploadFile = File(...)):
try:
input_image = Image.open(io.BytesIO(await file.read())).convert("RGBA")
output_data = rembg.remove(input_image, session=get_rembg_session())
buf = io.BytesIO(); output_data.save(buf, format="PNG"); buf.seek(0)
return StreamingResponse(buf, media_type="image/png")
except Exception as e: raise HTTPException(status_code=500, detail=str(e))
app.include_router(pdf_router)
app.include_router(image_router)
@app.get("/")
async def root(): return {"message": "QuickPDF Unified API is active."}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)