|
|
""" |
|
|
Modal deployment for Datalab Marker PDF conversion service. |
|
|
""" |
|
|
|
|
|
import modal |
|
|
import os |
|
|
from typing import Optional |
|
|
|
|
|
|
|
|
# Modal application object that groups the functions/classes deployed below.
app = modal.App("datalab-marker-modal-demo")

# GPU type requested for both the model-download helper and the service.
GPU_TYPE = "L40S"

# Mount point of the persistent model volume inside each container.
MODEL_PATH_PREFIX = "/root/.cache/datalab/models"

# Container image: Debian slim + marker-pdf with its web and torch deps.
# TORCH_DEVICE=cuda is exported into the container environment — presumably
# read by marker/torch to select the GPU device (TODO confirm against marker docs).
image = (
    modal.Image.debian_slim(python_version="3.10")
    .apt_install(["git", "wget"])
    .env({"TORCH_DEVICE": "cuda"})
    .pip_install([
        "marker-pdf[full]",
        "fastapi==0.104.1",
        "uvicorn==0.24.0",
        "python-multipart==0.0.6",
        "torch>=2.2.2,<3.0.0",
        "torchvision>=0.17.0",
        "torchaudio>=2.2.0",
    ])
)

# Shared volume that caches downloaded model weights across containers/deploys.
models_volume = modal.Volume.from_name("marker-models-modal-demo", create_if_missing=True)
|
|
|
|
|
def setup_models_with_cache_check(logger, commit_volume=False):
    """
    Shared function to create models and handle cache checking/logging.
    """
    import gc
    import os

    from marker.models import create_model_dict

    # Inspect the volume-backed cache directory before loading anything,
    # purely so the logs show whether a download is about to happen.
    cache_present = os.path.exists(MODEL_PATH_PREFIX)
    cached_entries = os.listdir(MODEL_PATH_PREFIX) if cache_present else []

    logger.info(f"Models cache directory exists: {cache_present}")
    logger.info(f"Models cache directory contents: {cached_entries}")

    if cache_present and cached_entries:
        logger.info("Found existing models in volume cache, loading from cache...")
    else:
        logger.warning("No models found in volume cache. Models will be downloaded now (this may take several minutes).")

    # create_model_dict() downloads weights on a cache miss and loads them.
    loaded_models = create_model_dict()
    logger.info(f"Successfully loaded {len(loaded_models)} models")

    # Log what ended up in the cache after loading.
    if os.path.exists(MODEL_PATH_PREFIX):
        logger.info(f"Models in cache: {os.listdir(MODEL_PATH_PREFIX)}")

    # Optionally persist newly written files back to the shared volume.
    if commit_volume:
        gc.collect()
        logger.info("Attempting to commit volume...")
        models_volume.commit()
        logger.info("Volume committed successfully")

    return loaded_models
|
|
|
|
|
@app.function(
    image=image,
    volumes={MODEL_PATH_PREFIX: models_volume},
    gpu=GPU_TYPE,
    timeout=600,
)
def download_models():
    """
    Helper function to download models used in marker into a Modal volume.
    """
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    logger.info("Downloading models to persistent volume...")
    logger.info(f"Volume mounted at: {MODEL_PATH_PREFIX}")

    # Delegate to the shared loader; commit so the weights persist in the volume.
    try:
        model_dict = setup_models_with_cache_check(logger, commit_volume=True)
    except Exception as exc:
        logger.error(f"Failed to download models: {exc}")
        raise
    return f"Models downloaded successfully: {list(model_dict.keys())}"
|
|
|
|
|
@app.cls(
    image=image,
    gpu=GPU_TYPE,
    memory=16384,
    timeout=600,
    volumes={MODEL_PATH_PREFIX: models_volume},
    scaledown_window=300,
)
class MarkerModalDemoService:
    """GPU-backed Modal service exposing Marker document conversion over HTTP.

    Models are loaded once per container in load_models() and shared by all
    requests handled by the FastAPI app returned from marker_api().
    """

    @modal.enter()
    def load_models(self):
        """Load models once per container using @modal.enter() for efficiency."""
        import logging
        import traceback

        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        logger = logging.getLogger(__name__)

        logger.info("Loading Marker models using @modal.enter()...")
        try:
            self.models = setup_models_with_cache_check(logger, commit_volume=True)
        except Exception as e:
            # Keep the container alive on failure: /health reports "loading"
            # and /convert answers 503 until models become available.
            logger.error(f"Error loading models: {e}")
            traceback.print_exc()
            self.models = None

    @modal.asgi_app()
    def marker_api(self):
        """Build and return the FastAPI application served by Modal."""
        import base64
        import io
        import logging
        import traceback
        from contextlib import asynccontextmanager
        from pathlib import Path
        from typing import Optional

        from fastapi import FastAPI, Form, File, UploadFile, HTTPException
        from fastapi.responses import JSONResponse

        from marker.converters.pdf import PdfConverter
        from marker.config.parser import ConfigParser
        from marker.settings import settings

        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        logger = logging.getLogger(__name__)

        @asynccontextmanager
        async def lifespan(app: FastAPI):
            # Startup/shutdown logging only; models are loaded by @modal.enter().
            logger.info("Datalab Marker / Modal demo app starting up...")
            yield
            logger.info("Datalab Marker / Modal demo app shutting down...")

        web_app = FastAPI(
            title="Datalab Marker PDF Conversion Service - Modal Demo",
            description="Convert PDFs and documents to markdown, JSON, or HTML using Marker, deployed on Modal",
            version="1.0.0",
            lifespan=lifespan
        )

        @web_app.get("/health")
        async def health_check():
            """Report model-load status plus a peek at the model cache volume."""
            models_loaded = getattr(self, 'models', None) is not None
            model_count = len(self.models) if models_loaded else 0

            cache_exists = os.path.exists(MODEL_PATH_PREFIX)
            cache_contents = os.listdir(MODEL_PATH_PREFIX) if cache_exists else []

            return {
                "status": "healthy" if models_loaded else "loading",
                "models_loaded": models_loaded,
                "model_count": model_count,
                "cache_dir": MODEL_PATH_PREFIX,
                "cache_exists": cache_exists,
                "cache_contents": cache_contents[:10]  # cap payload size
            }

        @web_app.post("/convert")
        async def convert_document(
            file: UploadFile = File(..., description="Document to convert"),
            page_range: Optional[str] = Form(None),
            force_ocr: bool = Form(False),
            paginate_output: bool = Form(False),
            output_format: str = Form("markdown"),
            use_llm: bool = Form(False),
        ):
            """Convert uploaded document to specified format."""
            if not hasattr(self, 'models') or self.models is None:
                logger.error("Models not available for conversion")
                raise HTTPException(status_code=503, detail="Models not loaded yet. Please wait for model initialization.")

            # Validate the upload's extension (a missing filename yields "" and
            # is rejected with 400 instead of crashing with a 500).
            allowed_extensions = {'.pdf', '.png', '.jpg', '.jpeg', '.tiff', '.bmp'}
            file_ext = Path(file.filename or "").suffix.lower()
            if file_ext not in allowed_extensions:
                raise HTTPException(
                    status_code=400,
                    detail=f"Unsupported file type: {file_ext}. Supported: {allowed_extensions}"
                )

            if output_format not in ["markdown", "json", "html", "chunks"]:
                raise HTTPException(
                    status_code=400,
                    detail="Output format must be one of: markdown, json, html, chunks"
                )

            # Initialize before the try so the finally-cleanup can never hit an
            # unbound name (previously, a failure in file.read() made the except
            # handler itself raise NameError on temp_path).
            temp_path = None
            try:
                file_content = await file.read()

                # .name strips any client-supplied directory components,
                # preventing path traversal out of /tmp.
                temp_path = f"/tmp/{Path(file.filename).name}"
                with open(temp_path, "wb") as temp_file:
                    temp_file.write(file_content)

                config = {
                    "filepath": temp_path,
                    "page_range": page_range,
                    "force_ocr": force_ocr,
                    "paginate_output": paginate_output,
                    "output_format": output_format,
                    "use_llm": use_llm,
                }

                config_parser = ConfigParser(config)
                config_dict = config_parser.generate_config_dict()
                config_dict["pdftext_workers"] = 1  # single pdftext worker per container

                converter = PdfConverter(
                    config=config_dict,
                    artifact_dict=self.models,
                    processor_list=config_parser.get_processors(),
                    renderer=config_parser.get_renderer(),
                    llm_service=config_parser.get_llm_service() if use_llm else None,
                )

                logger.info(f"Converting {file.filename} to {output_format}...")
                rendered_output = converter(temp_path)

                json_content = None
                html_content = None
                markdown_content = None
                encoded_images = {}

                if output_format == "json":
                    json_content = rendered_output.model_dump()
                else:
                    from marker.output import text_from_rendered
                    text, _, images = text_from_rendered(rendered_output)

                    if output_format == "html":
                        html_content = text
                    else:
                        markdown_content = text

                    # Ship extracted images as base64 so the response is pure JSON.
                    for img_name, img_obj in images.items():
                        byte_stream = io.BytesIO()
                        img_obj.save(byte_stream, format=settings.OUTPUT_IMAGE_FORMAT)
                        encoded_images[img_name] = base64.b64encode(byte_stream.getvalue()).decode('utf-8')

                metadata = rendered_output.metadata

                logger.info(f"Conversion completed for {file.filename}")

                return JSONResponse({
                    "success": True,
                    "filename": file.filename,
                    "output_format": output_format,
                    "json": json_content,
                    "html": html_content,
                    "markdown": markdown_content,
                    "images": encoded_images,
                    "metadata": metadata,
                    "page_count": len(metadata.get("page_stats", [])),
                })

            except Exception as e:
                logger.error(f"Conversion error for {file.filename}: {str(e)}")
                traceback.print_exc()
                raise HTTPException(
                    status_code=500,
                    detail=f"Conversion failed: {str(e)}"
                )
            finally:
                # Always remove the temp file, on success or failure.
                if temp_path and os.path.exists(temp_path):
                    os.unlink(temp_path)

        return web_app
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.local_entrypoint()
async def invoke_conversion(
    pdf_file: Optional[str] = None,
    output_format: str = "markdown",
    env: str = 'main'
):
    """
    Local entrypoint to test your deployed Marker endpoint in Modal.

    Usage:
        modal run marker_modal_deployment.py::invoke_conversion --pdf-file /path/to/file.pdf --output-format markdown
    """
    import json
    from pathlib import Path

    import requests

    if not pdf_file:
        print("No PDF file specified. Use --pdf-file /path/to/your.pdf")
        return

    pdf_path = Path(pdf_file)
    if not pdf_path.exists():
        print(f"File not found: {pdf_file}")
        return

    # Resolve the deployed service's public web URL.
    try:
        service = modal.Cls.from_name(
            "datalab-marker-modal-demo",
            "MarkerModalDemoService",
            environment_name=env
        )
        web_url = service().marker_api.get_web_url()
        print(f"Found deployed service at: {web_url}")
    except Exception as e:
        print(f"Error getting web URL: {e}")
        print("Make sure you've deployed the service first with: modal deploy marker_modal_deployment.py")
        return

    print(f"Testing conversion of: {pdf_path.name}")
    print(f"Output format: {output_format}")

    # Best-effort health probe; a failure here does not block the conversion.
    # Timeouts prevent the client from hanging forever on a dead endpoint.
    try:
        health_response = requests.get(f"{web_url}/health", timeout=30)
        health_data = health_response.json()
        print(f"Service health: {health_data['status']}")
        print(f"Models loaded: {health_data['models_loaded']} ({health_data['model_count']} models)")

        if not health_data['models_loaded']:
            print("Warning: Models not loaded yet. First request may be slow.")

    except Exception as e:
        print(f"Health check failed: {e}")

    try:
        with open(pdf_path, 'rb') as f:
            files = {'file': (pdf_path.name, f, 'application/pdf')}
            data = {'output_format': output_format}

            print(f"Sending request to {web_url}/convert...")
            # Long timeout: converting a large PDF can take minutes.
            response = requests.post(f"{web_url}/convert", files=files, data=data, timeout=600)

        if response.status_code == 200:
            result = response.json()
            print("✅ Conversion successful!")
            print(f"Filename: {result['filename']}")
            print(f"Format: {result['output_format']}")
            print(f"Pages: {result['page_count']}")

            # Persist the full JSON response next to the current directory.
            output_file = f"{pdf_path.stem}_response.json"
            with open(output_file, 'w', encoding='utf-8') as out:
                json.dump(result, out, indent=2, ensure_ascii=False)
            print(f"Full API response saved to: {output_file}")

            if result['images']:
                print(f"Images extracted: {len(result['images'])}")

        else:
            print(f"❌ Conversion failed: {response.status_code}")
            print(f"Error: {response.text}")

    except Exception as e:
        print(f"Request failed: {e}")
|
|
|