# NOTE: the original capture began with Hugging Face Spaces page chrome
# ("Spaces: Sleeping"); it is not part of the application source.
# Standard library
import asyncio
import base64
import html
import io
import logging
import os
import secrets
import subprocess
import tempfile
import time
import uuid
from pathlib import Path

# Third-party
import aiohttp
import docx
import pdfplumber
from bs4 import BeautifulSoup
from docx.shared import Inches
from fastapi import FastAPI, HTTPException, BackgroundTasks, UploadFile, File, Form
from fastapi.responses import JSONResponse
from PIL import Image
| # Setup | |
| API_KEY = os.getenv("PDF_API_KEY") | |
| ZAI_SECRET_API_KEY = os.getenv("ZAI_SECRET_API_KEY") | |
| app = FastAPI() | |
| api_tokens = {"client-1": API_KEY} | |
| MAX_PDF_SIZE_MB = 40 | |
| # Logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| def health(): | |
| return {"status": "ok"} | |
| async def convert_pdf_endpoint( | |
| background_tasks: BackgroundTasks, | |
| file: UploadFile = File(...), | |
| client_id: str = Form(...), | |
| token: str = Form(...), | |
| callback_url: str = Form(...), | |
| book_id : int = Form(...) | |
| ): | |
| # Auth | |
| if client_id not in api_tokens or api_tokens[client_id] != token: | |
| raise HTTPException(status_code=401, detail="Invalid API credentials") | |
| # Check file type | |
| if not file.filename.lower().endswith(".pdf"): | |
| raise HTTPException(status_code=400, detail="Only PDF files are supported") | |
| # Read file | |
| try: | |
| pdf_bytes = await file.read() | |
| except Exception as e: | |
| logger.error(f"Failed to read uploaded file: {e}") | |
| raise HTTPException(status_code=400, detail="Unable to read file") | |
| # Enforce size limit | |
| if len(pdf_bytes) > MAX_PDF_SIZE_MB * 1024 * 1024: | |
| raise HTTPException(status_code=413, detail=f"PDF too large (> {MAX_PDF_SIZE_MB} MB)") | |
| # Write to temp file | |
| try: | |
| tmp_dir = Path(tempfile.mkdtemp()) | |
| tmp_pdf_path = tmp_dir / f"{uuid.uuid4()}.pdf" | |
| tmp_pdf_path.write_bytes(pdf_bytes) | |
| except Exception as e: | |
| logger.error(f"Failed to write PDF file: {e}") | |
| raise HTTPException(status_code=500, detail="Internal error writing PDF") | |
| # Background processing | |
| task_id = str(uuid.uuid4()) | |
| background_tasks.add_task(handle_pdf_processing, str(tmp_pdf_path), callback_url, task_id, book_id) | |
| logger.info(f"Started task {task_id} for client {client_id}") | |
| return JSONResponse({"status": "processing", "task_id": task_id}) | |
| async def handle_pdf_processing(pdf_path: str, callback_url: str, task_id: str, book_id:int): | |
| try: | |
| with open(pdf_path, "rb") as f: | |
| html, _ = extract_pdf_to_html(f) | |
| except Exception as e: | |
| logger.error(f"PDF processing failed: {e}") | |
| html = f"<p>Error extracting PDF: {e}</p>" | |
| finally: | |
| try: | |
| Path(pdf_path).unlink(missing_ok=True) | |
| except Exception as e: | |
| logger.warning(f"Temp file cleanup failed: {e}") | |
| try: | |
| headers = { "x-api-key": ZAI_SECRET_API_KEY } | |
| async with aiohttp.ClientSession() as session: | |
| await session.post(callback_url, json={ | |
| "task_id": task_id, | |
| "content": html, | |
| "book_id": book_id | |
| }, headers=headers ) | |
| logger.info(f"Content Generated \n {html} ") | |
| logger.info(f"Callback sent for task {task_id}") | |
| except Exception as e: | |
| logger.error(f"Callback failed for task {task_id}: {e}") | |
| def extract_text_from_image(image: Image.Image) -> str: | |
| temp_img_path = Path(tempfile.mktemp(suffix=".png")) | |
| image.save(temp_img_path) | |
| try: | |
| result = subprocess.run( | |
| ["latexocr", str(temp_img_path)], | |
| capture_output=True, | |
| text=True | |
| ) | |
| if result.returncode == 0: | |
| return f"<pre>\\[{result.stdout.strip()}\\]</pre>" | |
| else: | |
| return "" | |
| except Exception as e: | |
| return f"<i>LaTeX-OCR error: {str(e)}</i>" | |
| finally: | |
| try: | |
| temp_img_path.unlink(missing_ok=True) | |
| except Exception: | |
| pass | |
| def extract_pdf_to_html(file) -> tuple[str, docx.Document]: | |
| html_output = "" | |
| docx_output = docx.Document() | |
| toc = [] | |
| with pdfplumber.open(file) as pdf: | |
| for page_num, page in enumerate(pdf.pages): | |
| start = time.time() | |
| page_title = f"Page {page_num + 1}" | |
| toc.append(f"<li><a href='#page{page_num+1}'>{page_title}</a></li>") | |
| html_output += f"<h2 id='page{page_num+1}'>{page_title}</h2>\n" | |
| docx_output.add_heading(page_title, level=2) | |
| text = page.extract_text() | |
| if text: | |
| for line in text.split("\n"): | |
| html_output += f"<p>{line}</p>\n" | |
| docx_output.add_paragraph(line) | |
| else: | |
| html_output += "<p><i>No text detected on this page.</i></p>" | |
| for img_obj in page.images: | |
| try: | |
| x0, top, x1, bottom = img_obj["x0"], img_obj["top"], img_obj["x1"], img_obj["bottom"] | |
| cropped = page.crop((x0, top, x1, bottom)).to_image(resolution=300).original | |
| except Exception: | |
| continue | |
| math_html = extract_text_from_image(cropped) | |
| if math_html.strip(): | |
| html_output += f"<div>{math_html}</div>\n" | |
| docx_output.add_paragraph(BeautifulSoup(math_html, "html.parser").text) | |
| buffer = io.BytesIO() | |
| cropped.save(buffer, format="PNG") | |
| buffer.seek(0) | |
| b64_img = base64.b64encode(buffer.read()).decode("utf-8") | |
| html_output += f'<img src="data:image/png;base64,{b64_img}" style="width:100%; margin: 1rem 0;" />\n' | |
| buffer.seek(0) | |
| try: | |
| docx_output.add_picture(buffer, width=Inches(5)) | |
| except Exception: | |
| pass | |
| logger.info(f"Processed page {page_num + 1} in {time.time() - start:.2f}s") | |
| full_html = f"<ul>{''.join(toc)}</ul>\n" + html_output | |
| return full_html, docx_output | |