Spaces:
Sleeping
Sleeping
| from pathlib import Path | |
| from typing import Any, Dict, List | |
| import json | |
| import os | |
| import shutil | |
| import torch | |
| from fastapi import FastAPI, File, Form, Request, Response, UploadFile | |
| from fastapi.responses import FileResponse, HTMLResponse, JSONResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.templating import Jinja2Templates | |
| from loguru import logger | |
| from werkzeug.utils import secure_filename | |
| import main as extractor | |
# ASGI application object; everything below attaches to this instance.
app = FastAPI()
# Static files and templates -------------------------------------------------
# Files in the local "static" directory are served under the /static prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
# Jinja2 templates are loaded from the local "templates" directory.
templates = Jinja2Templates(directory="templates")
def flask_like_url_for(endpoint: str, **kwargs: Any) -> str:
    """Resolve template ``url_for`` calls written in Flask style.

    Templates call ``url_for('static', filename='css/styles.css')``. This
    helper turns that into a ``/static/...`` path so the templates can be
    used unchanged.

    Args:
        endpoint: Endpoint name passed by the template.
        **kwargs: Only ``filename`` is read, and only for the ``static``
            endpoint.

    Returns:
        ``/static/<filename>`` for the static endpoint (leading slashes of
        the filename removed), otherwise ``/<endpoint>``.
    """
    if endpoint != "static":
        # Fallback for anything else: the templates only use "static".
        return "/" + endpoint.lstrip("/")
    asset = str(kwargs.get("filename", "")).lstrip("/")
    return f"/static/{asset}"
# Make the helper available inside every template under the name ``url_for``.
templates.env.globals["url_for"] = flask_like_url_for
# Configuration -------------------------------------------------------------
UPLOAD_FOLDER = Path("./uploads")
OUTPUT_FOLDER = Path("./output")
MAX_CONTENT_LENGTH = 500 * 1024 * 1024  # 500MB

# Both working directories are created at import time if they are missing.
for _folder in (UPLOAD_FOLDER, OUTPUT_FOLDER):
    _folder.mkdir(parents=True, exist_ok=True)
del _folder

# Global model cache --------------------------------------------------------
# Filled lazily by load_model_once(); None means "not loaded yet".
_model: Any = None
def get_device_info() -> Dict[str, Any]:
    """Describe the compute device torch will use.

    Returns:
        A dict with the keys ``device`` ("cuda" or "cpu"),
        ``cuda_available``, ``device_name`` and ``device_count``. The last
        two stay ``None`` / ``0`` unless CUDA is available, in which case
        they describe GPU 0 and the number of visible GPUs.
    """
    has_cuda = torch.cuda.is_available()

    # CPU defaults; only queried from torch when a GPU is actually present.
    gpu_name = None
    gpu_count = 0
    if has_cuda:
        gpu_name = torch.cuda.get_device_name(0)
        gpu_count = torch.cuda.device_count()

    return {
        "device": "cuda" if has_cuda else "cpu",
        "cuda_available": has_cuda,
        "device_name": gpu_name,
        "device_count": gpu_count,
    }
def load_model_once() -> Any:
    """Return the cached DocLayout-YOLO model, loading it on first use.

    The model object is stored in the module-level ``_model`` so later calls
    in the same process reuse it instead of calling ``extractor.get_model()``
    again.
    """
    global _model
    if _model is not None:
        return _model

    logger.info("Loading DocLayout-YOLO model...")
    _model = extractor.get_model()
    logger.info("Model loaded successfully")
    return _model
| # Routes -------------------------------------------------------------------- | |
async def index(request: Request) -> HTMLResponse:
    """Render ``index.html`` with the current device information."""
    context = {"request": request, "device_info": get_device_info()}
    return templates.TemplateResponse("index.html", context)
async def device_info() -> Dict[str, Any]:
    """Return the device report built by ``get_device_info`` as the response body."""
    report = get_device_info()
    return report
async def upload_files(
    request: Request,
    files: List[UploadFile] = File(..., alias="files[]"),
    extraction_mode: str = Form("images"),
) -> JSONResponse:
    """Handle multiple PDF file uploads (FastAPI version of Flask route).

    Each uploaded PDF is staged in ``UPLOAD_FOLDER``, moved into its own
    directory under ``OUTPUT_FOLDER`` (named after the sanitized file stem),
    run through ``extractor.process_pdf_with_pool`` and summarized.

    Args:
        request: Incoming request (not read here; kept for the route signature).
        files: Uploaded files, sent under the form field ``files[]``.
        extraction_mode: ``"images"`` extracts images only, ``"markdown"``
            extracts markdown only; any other value enables both.

    Returns:
        ``{"results": [...]}`` with one entry per PDF. A failed file yields
        ``{"filename": ..., "error": ...}`` instead of the summary. Files
        whose name does not end in ``.pdf`` are skipped without an entry.
        A 400 response is returned when no file with a name was sent.
    """
    if not files or all((f.filename or "") == "" for f in files):
        return JSONResponse({"error": "No files selected"}, status_code=400)

    include_images = extraction_mode != "markdown"
    include_markdown = extraction_mode != "images"

    results: List[Dict[str, Any]] = []
    for upload in files:
        filename = upload.filename or ""
        # Compare case-insensitively so "REPORT.PDF" is not silently dropped.
        if not filename.lower().endswith(".pdf"):
            continue

        upload_path: Path | None = None
        try:
            safe_name = secure_filename(filename)
            stem = Path(safe_name).stem
            upload_path = UPLOAD_FOLDER / safe_name

            # Save uploaded file to disk in 1 MiB chunks, enforcing the
            # configured size limit (applied per file here).
            written = 0
            with upload_path.open("wb") as out_f:
                while chunk := await upload.read(1024 * 1024):
                    written += len(chunk)
                    if written > MAX_CONTENT_LENGTH:
                        raise ValueError(
                            f"File exceeds maximum upload size of {MAX_CONTENT_LENGTH} bytes"
                        )
                    out_f.write(chunk)

            # Prepare output directory
            output_dir = OUTPUT_FOLDER / stem
            output_dir.mkdir(parents=True, exist_ok=True)

            # Move PDF into output directory
            pdf_path = output_dir / safe_name
            upload_path.replace(pdf_path)

            # Process PDF in this process (no worker pool is passed).
            extractor.USE_MULTIPROCESSING = False
            logger.info(
                f"Processing {safe_name} (images={include_images}, markdown={include_markdown})"
            )
            if include_images:
                # Make sure the layout model is loaded before image extraction.
                load_model_once()
            extractor.process_pdf_with_pool(
                pdf_path,
                output_dir,
                pool=None,
                extract_images=include_images,
                extract_markdown=include_markdown,
            )

            # Collect results
            json_path = output_dir / f"{stem}_content_list.json"
            elements: List[Dict[str, Any]] = []
            if include_images and json_path.exists():
                elements = json.loads(json_path.read_text(encoding="utf-8"))

            annotated_pdf: str | None = None
            if include_images:
                candidate_pdf = output_dir / f"{stem}_layout.pdf"
                if candidate_pdf.exists():
                    annotated_pdf = str(candidate_pdf.relative_to(OUTPUT_FOLDER))

            markdown_path: str | None = None
            if include_markdown:
                candidate_md = output_dir / f"{stem}.md"
                if candidate_md.exists():
                    markdown_path = str(candidate_md.relative_to(OUTPUT_FOLDER))

            figures = [e for e in elements if e.get("type") == "figure"]
            tables = [e for e in elements if e.get("type") == "table"]
            results.append(
                {
                    "filename": safe_name,
                    "stem": stem,
                    "output_dir": str(output_dir.relative_to(OUTPUT_FOLDER)),
                    "figures_count": len(figures),
                    "tables_count": len(tables),
                    "elements_count": len(elements),
                    "annotated_pdf": annotated_pdf,
                    "markdown_path": markdown_path,
                    "include_images": include_images,
                    "include_markdown": include_markdown,
                }
            )
        except Exception as e:  # pragma: no cover - runtime error path
            logger.error(f"Error processing {filename}: {e}")
            results.append({"filename": filename, "error": str(e)})
        finally:
            # Remove a staged file that never got moved (failed or oversized
            # upload). After a successful move the path no longer exists.
            if upload_path is not None:
                try:
                    upload_path.unlink(missing_ok=True)
                except OSError as cleanup_error:
                    logger.warning(
                        f"Could not remove staged upload {upload_path}: {cleanup_error}"
                    )

    return JSONResponse({"results": results})
async def pdf_list() -> Dict[str, Any]:
    """List the processed PDFs found under ``OUTPUT_FOLDER``.

    A sub-directory counts as a processed PDF when it directly contains at
    least one ``*_content_list.json``, ``*.md`` or ``*.pdf`` file.

    Returns:
        ``{"pdfs": [{"stem": ..., "output_dir": ...}, ...]}``; the list is
        empty when the output folder does not exist.
    """
    root = OUTPUT_FOLDER
    found: List[Dict[str, Any]] = []
    if not root.exists():
        return {"pdfs": found}

    markers = ("*_content_list.json", "*.md", "*.pdf")
    for entry in root.iterdir():
        if not entry.is_dir():
            continue
        # Any single match of any marker pattern is enough.
        if any(any(entry.glob(pattern)) for pattern in markers):
            found.append(
                {
                    "stem": entry.name,
                    "output_dir": str(entry.relative_to(root)),
                }
            )
    return {"pdfs": found}
async def pdf_details(pdf_stem: str) -> JSONResponse:
    """Get detailed information about a processed PDF.

    Args:
        pdf_stem: Name of the PDF's directory under ``OUTPUT_FOLDER``.

    Returns:
        A JSON summary with the figure/table elements read from the
        ``*_content_list.json`` file, their counts, and paths (relative to
        the output folder) of the annotated PDF, the markdown file and the
        PNG images under ``figures/`` and ``tables/``. Responds with 400 when
        ``pdf_stem`` points outside the output folder and with 404 when no
        such directory exists.
    """
    output_root = OUTPUT_FOLDER.resolve()
    output_dir = (output_root / pdf_stem).resolve()
    # Same containment rule as the file-serving and delete handlers: the
    # target must be strictly inside the output folder ("..", "." and
    # absolute paths are rejected).
    if output_root not in output_dir.parents:
        return JSONResponse({"error": "Invalid path"}, status_code=400)
    if not output_dir.is_dir():
        return JSONResponse({"error": "PDF not found"}, status_code=404)

    # Sorted so the chosen file is deterministic if several match.
    json_files = sorted(output_dir.glob("*_content_list.json"))
    elements: List[Dict[str, Any]] = []
    if json_files:
        elements = json.loads(json_files[0].read_text(encoding="utf-8"))
    figures = [e for e in elements if e.get("type") == "figure"]
    tables = [e for e in elements if e.get("type") == "table"]

    annotated_pdf: str | None = None
    pdf_files = sorted(output_dir.glob("*_layout.pdf"))
    if pdf_files:
        annotated_pdf = str(pdf_files[0].relative_to(output_root))

    markdown_path: str | None = None
    md_files = sorted(output_dir.glob("*.md"))
    if md_files:
        markdown_path = str(md_files[0].relative_to(output_root))

    figure_dir = output_dir / "figures"
    table_dir = output_dir / "tables"
    figure_images: List[str] = []
    if figure_dir.exists():
        figure_images = [
            str(f.relative_to(output_root)) for f in sorted(figure_dir.glob("*.png"))
        ]
    table_images: List[str] = []
    if table_dir.exists():
        table_images = [
            str(t.relative_to(output_root)) for t in sorted(table_dir.glob("*.png"))
        ]

    return JSONResponse(
        {
            "stem": pdf_stem,
            "figures": figures,
            "tables": tables,
            "figures_count": len(figures),
            "tables_count": len(tables),
            "elements_count": len(elements),
            "annotated_pdf": annotated_pdf,
            "markdown_path": markdown_path,
            "figure_images": figure_images,
            "table_images": table_images,
        }
    )
async def output_file(filename: str):
    """Serve output files (PDFs, images, markdown).

    ``filename`` is resolved against the output folder. Paths that resolve
    outside of it get a 400 response, paths that are missing or not a
    regular file get a 404, anything else is returned as a file response.
    """
    root = OUTPUT_FOLDER.resolve()
    requested = (root / filename).resolve()

    # True for the root itself and for anything below it.
    if not requested.is_relative_to(root):
        return JSONResponse({"error": "Invalid path"}, status_code=400)
    if not (requested.exists() and requested.is_file()):
        return JSONResponse({"error": "Not found"}, status_code=404)
    return FileResponse(requested)
def _delete_by_stem(stem_raw: str) -> JSONResponse:
    """Delete the processed-output directory named by ``stem_raw``.

    Args:
        stem_raw: Directory name under ``OUTPUT_FOLDER``; surrounding
            whitespace is ignored.

    Returns:
        ``{"ok": True, "deleted": stem}`` on success; 400 when the stem is
        empty or does not resolve to a location strictly inside the output
        folder; 404 when the target is not an existing directory.

    Raises:
        OSError: Propagated from ``shutil.rmtree`` when removal fails.
    """
    stem = (stem_raw or "").strip()
    if not stem:
        return JSONResponse({"error": "Missing stem"}, status_code=400)

    output_root = OUTPUT_FOLDER.resolve()
    target_dir = (output_root / stem).resolve()
    # The target must be strictly below the output folder. A stem such as
    # "." resolves to the folder itself and must be rejected, otherwise the
    # whole output tree would be removed.
    if output_root not in target_dir.parents:
        return JSONResponse({"error": "Invalid stem path"}, status_code=400)
    if not target_dir.is_dir():
        return JSONResponse({"error": "Not found"}, status_code=404)

    shutil.rmtree(target_dir, ignore_errors=False)
    logger.info(f"Deleted processed output: {target_dir}")
    return JSONResponse({"ok": True, "deleted": stem})
async def delete_pdf(request: Request, stem_form: str | None = Form(default=None)) -> JSONResponse:
    """Delete a processed PDF directory by stem (JSON or form body).

    The stem is taken from the form field when present; otherwise the
    request body is read as JSON and its ``"stem"`` key is used.

    Args:
        request: Incoming request, read as JSON when no form value was sent.
        stem_form: Stem supplied as form data, if any.

    Returns:
        The response of ``_delete_by_stem``, or a 500 response carrying the
        error text when deletion raises.
    """
    try:
        stem = (stem_form or "").strip()
        if not stem:
            try:
                data = await request.json()
            except Exception:
                # Missing or malformed JSON body: deliberately treated as
                # "no stem given" so the caller gets the 400 below.
                data = {}
            # Valid JSON that is not an object (list, string, number, ...)
            # has no "stem" key; treat it the same way instead of failing
            # on .get() and reporting a server error.
            if not isinstance(data, dict):
                data = {}
            stem = (str(data.get("stem") or "")).strip()
        return _delete_by_stem(stem)
    except Exception as e:  # pragma: no cover - runtime error path
        logger.error(f"Delete failed: {e}")
        return JSONResponse({"error": str(e)}, status_code=500)
async def delete_pdf_by_path(stem: str) -> JSONResponse:
    """Delete a processed PDF directory using a stem taken from the URL path.

    Offered for clients that prefer not to send a request body. Any
    exception raised during deletion is logged and reported as a 500.
    """
    try:
        outcome = _delete_by_stem(stem)
    except Exception as e:  # pragma: no cover - runtime error path
        logger.error(f"Delete failed: {e}")
        return JSONResponse({"error": str(e)}, status_code=500)
    return outcome