| """ |
| Docling Hugging Face Spaces API |
| Deploy this on Hugging Face Spaces to provide a Docling extraction API. |
| """ |
| import os |
| import tempfile |
| from pathlib import Path |
|
|
| from fastapi import FastAPI, File, UploadFile, HTTPException |
| from fastapi.responses import JSONResponse |
| from fastapi.middleware.cors import CORSMiddleware |
| from docling.document_converter import DocumentConverter |
| from docling.datamodel.base_models import InputFormat |
| import uvicorn |
|
|
# Application object. The title, description and version are the metadata
# FastAPI publishes in its generated API documentation.
app = FastAPI(
    title="Docling Document Converter API",
    description="Convert documents using Docling AI",
    version="1.0.0",
)

# CORS: the API is open to every origin, method and header.
# Credentials stay disabled on purpose: a wildcard origin must not be combined
# with credentialed requests (the FastAPI CORS docs require explicit origins
# when allow_credentials=True), and none of the endpoints defined in this
# module read cookies or authorization headers.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
| |
# Shared DocumentConverter instance; stays None until get_converter() is
# called for the first time.
converter = None


def get_converter():
    """Return the module-wide DocumentConverter, building it on first use."""
    global converter
    if converter is not None:
        # Already constructed by an earlier call: reuse it.
        return converter
    converter = DocumentConverter()
    return converter
|
|
|
|
@app.get("/")
def root():
    """Report that the service is up, along with its name and version."""
    service_info = dict(
        status="ok",
        service="Docling API",
        version="1.0.0",
    )
    return service_info
|
|
|
|
@app.get("/health")
def health():
    """Health check that also reports whether a CUDA GPU is usable.

    Returns:
        A dict with ``"status": "ok"`` and ``"gpu"`` set to ``"available"``
        or ``"unavailable"``.
    """
    # Probe the GPU instead of claiming it unconditionally. The import is
    # local and guarded so the endpoint keeps working when torch is absent.
    try:
        import torch
    except ImportError:
        gpu_ready = False
    else:
        gpu_ready = bool(torch.cuda.is_available())

    return {"status": "ok", "gpu": "available" if gpu_ready else "unavailable"}
|
|
|
|
@app.post("/convert")
async def convert_document(file: UploadFile = File(...)):
    """Convert an uploaded document to structured data.

    The upload is written to a temporary file, converted with the shared
    DocumentConverter, and the temporary file is always removed afterwards.

    Args:
        file: The uploaded document. Its filename extension must be one of
            the supported extensions listed below.

    Returns:
        JSON with ``success``, ``file_name``, a ``document`` object
        (``markdown``, ``text``, ``num_pages``, ``tables``, ``tables_count``)
        and a ``metadata`` object (``format``, ``engine``, ``model``).
        A table that cannot be exported appears in ``tables`` as
        ``{"table_index": ..., "error": ...}`` instead of its rows.

    Raises:
        HTTPException: 400 if no filename was sent or the extension is not
            supported; 500 if reading, converting or serializing fails.
    """
    if not file.filename:
        raise HTTPException(status_code=400, detail="No file provided")

    supported_extensions = ['.pdf', '.docx', '.xlsx', '.pptx', '.html', '.txt', '.md']
    ext = Path(file.filename).suffix.lower()
    if ext not in supported_extensions:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported format: {ext}. Supported: {supported_extensions}"
        )

    tmp_path = None
    try:
        # delete=False because the converter reopens the file by path after
        # this handle is closed; removal happens in the finally clause.
        with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as tmp:
            # Record the path first so a failed read/write cannot leak the file.
            tmp_path = tmp.name
            tmp.write(await file.read())

        # NOTE(review): convert() is called synchronously inside this
        # coroutine, so the event loop is busy for the whole conversion.
        doc = get_converter().convert(tmp_path).document

        markdown_text = doc.export_to_markdown()

        tables_data = []
        for table_idx, table in enumerate(doc.tables):
            try:
                df = table.export_to_dataframe()
                tables_data.append({
                    "table_index": table_idx,
                    "rows": df.to_dict('records'),
                    "row_count": len(df)
                })
            except Exception as e:
                # One bad table must not fail the whole document; report it
                # in place so table indices stay aligned.
                tables_data.append({
                    "table_index": table_idx,
                    "error": str(e)
                })

        response = {
            "success": True,
            "file_name": file.filename,
            "document": {
                "markdown": markdown_text,
                # Fall back to markdown when the document has no plain-text export.
                "text": doc.export_to_text() if hasattr(doc, 'export_to_text') else markdown_text,
                "num_pages": len(doc.pages) if hasattr(doc, 'pages') else 0,
                "tables": tables_data,
                "tables_count": len(tables_data)
            },
            "metadata": {
                "format": ext,
                "engine": "docling",
                "model": "docling-default"
            }
        }

        # Built inside the try so a serialization failure is reported as a
        # "Conversion failed" 500 like any other error.
        return JSONResponse(content=response)

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Conversion failed: {e}") from e

    finally:
        # Best-effort cleanup on every path; a failed delete must not mask
        # the real result or error.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
|
|
|
|
@app.post("/convert/markdown")
async def convert_to_markdown(file: UploadFile = File(...)):
    """Convert an uploaded document to markdown only (lightweight).

    Args:
        file: The uploaded document. Its filename extension is reused as the
            suffix of the temporary file handed to the converter.

    Returns:
        A dict with ``success``, ``markdown`` and ``file_name``.

    Raises:
        HTTPException: 400 if no filename was sent; 500 if reading or
            converting the document fails.
    """
    # Same guard as /convert: without it Path(None) raises TypeError and the
    # client error is misreported as a 500.
    if not file.filename:
        raise HTTPException(status_code=400, detail="No file provided")

    suffix = Path(file.filename).suffix.lower()
    tmp_path = None
    try:
        # delete=False because the converter reopens the file by path after
        # this handle is closed; removal happens in the finally clause.
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            # Record the path first so a failed read/write cannot leak the file.
            tmp_path = tmp.name
            tmp.write(await file.read())

        result = get_converter().convert(tmp_path)
        markdown = result.document.export_to_markdown()

        return {
            "success": True,
            "markdown": markdown,
            "file_name": file.filename
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    finally:
        # Best-effort cleanup on every path; a failed delete must not mask
        # the real result or error.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
|
|
|
|
@app.post("/convert/tables")
async def convert_tables(file: UploadFile = File(...)):
    """Extract only the tables from an uploaded document.

    Args:
        file: The uploaded document. Its filename extension is reused as the
            suffix of the temporary file handed to the converter.

    Returns:
        A dict with ``success``, ``tables`` (one entry per exported table
        with ``table_index``, ``headers``, ``rows``, ``row_count``),
        ``tables_count``, ``file_name`` and ``failed_tables`` (one
        ``{"table_index", "error"}`` entry per table that could not be
        exported; empty when all tables succeeded).

    Raises:
        HTTPException: 400 if no filename was sent; 500 if reading or
            converting the document fails.
    """
    # Same guard as /convert: without it Path(None) raises TypeError and the
    # client error is misreported as a 500.
    if not file.filename:
        raise HTTPException(status_code=400, detail="No file provided")

    suffix = Path(file.filename).suffix.lower()
    tmp_path = None
    try:
        # delete=False because the converter reopens the file by path after
        # this handle is closed; removal happens in the finally clause.
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            # Record the path first so a failed read/write cannot leak the file.
            tmp_path = tmp.name
            tmp.write(await file.read())

        result = get_converter().convert(tmp_path)

        tables_data = []
        failed_tables = []
        for table_idx, table in enumerate(result.document.tables):
            try:
                df = table.export_to_dataframe()
                tables_data.append({
                    "table_index": table_idx,
                    "headers": list(df.columns),
                    "rows": df.to_dict('records'),
                    "row_count": len(df)
                })
            except Exception as e:
                # Still best-effort: a bad table is left out of "tables", but
                # the failure is reported separately instead of vanishing.
                failed_tables.append({
                    "table_index": table_idx,
                    "error": str(e)
                })

        return {
            "success": True,
            "tables": tables_data,
            "tables_count": len(tables_data),
            "file_name": file.filename,
            "failed_tables": failed_tables
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    finally:
        # Best-effort cleanup on every path; a failed delete must not mask
        # the real result or error.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
|
|
|
|
if __name__ == "__main__":
    # The port can be overridden through the PORT environment variable so the
    # script also runs where the hosting platform dictates the port; the
    # default stays 8080.
    port = int(os.environ.get("PORT", "8080"))

    banner = "=" * 60
    print(banner)
    print("Docling Document Converter API")
    print(banner)
    print(f"URL: http://localhost:{port}")
    print(f"Docs: http://localhost:{port}/docs")
    print(banner)

    # The application is passed as the import string "app:app" (module "app",
    # attribute "app"), so this file is expected to be importable as `app`.
    uvicorn.run(
        "app:app",
        host="0.0.0.0",
        port=port,
        reload=True
    )
|
|