Spaces:
Sleeping
Sleeping
| """ | |
| MinerU OCR & Document Extraction Service | |
| FastAPI application for Hugging Face Docker Space (CPU / pipeline backend) | |
| Correct imports for magic-pdf >= 1.0.x (magic_pdf module): | |
| from magic_pdf.data.data_reader_writer import FileBasedDataReader, FileBasedDataWriter | |
| from magic_pdf.data.dataset import PymuDocDataset, ImageDataset | |
| from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze | |
| from magic_pdf.config.enums import SupportedPdfParseMethod | |
| OBSOLETE imports (removed in magic-pdf >= 1.0, do not use): | |
| from magic_pdf.pipe.UNIPipe import UNIPipe <- removed | |
| from magic_pdf.rw.DiskReaderWriter import ... <- removed | |
| from magic_pdf.data.read_api import read_local_images <- NOT used here: | |
| in 1.x the function expects a single path string and crashes with | |
| "stat: ... not list" when it is given a list. Use FileBasedDataReader instead. | |
| Removed features vs original: | |
| - DOCX / PPTX / XLSX (required LibreOffice; caused build OOM/timeout) | |
| - subprocess (was only used for LibreOffice conversion) | |
| - python-magic / libmagic (listed in requirements but never imported) | |
| Endpoints: | |
| GET /health -> liveness (always fast, no dependency check) | |
| GET /status -> full node status including memory (via cgroups), uptime, | |
| cache, active requests, lastModelLoadMs | |
| POST /extract -> single file (PDF or image) with SHA256 cache + memory guard | |
| POST /batch -> up to BATCH_MAX_FILES files; extras silently ignored | |
| Structured error format (non-2xx responses from /extract and /batch, and | |
| per-file failure entries inside the /batch "results" list): | |
| { | |
| "success": false, | |
| "stage": "upload" | "validation" | "model_load" | "decode" | "ocr" | | |
| "markdown" | "unknown", | |
| "errorCode": "UNSUPPORTED_TYPE" | "FILE_TOO_LARGE" | "EMPTY_FILE" | | |
| "LOW_MEMORY" | "IMAGE_DECODE_FAILED" | "OCR_PIPELINE_FAILED" | | |
| "MARKDOWN_FAILED" | "INTERNAL_ERROR" | ... (list is not exhaustive), | |
| "message": "<human-readable detail>" | |
| } | |
| """ | |
| import hashlib | |
| import io | |
| import os | |
| import re | |
| import shutil | |
| import sys | |
| import tempfile | |
| import threading | |
| import time | |
| import traceback | |
| import logging | |
| from importlib.metadata import version as pkg_version | |
| from typing import Any | |
| import fitz # PyMuPDF β bundled with magic-pdf[full-cpu] | |
| from PIL import Image | |
| from fastapi import FastAPI, File, UploadFile, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse | |
| # ββ Logging βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# --- Logging ---------------------------------------------------------------
logging.basicConfig(
    format="%(asctime)s %(levelname)-8s %(name)s %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("mineru-service")

# --- Start time (wall clock; used to report uptime) --------------------------
_START_TIME: float = time.time()

# --- Upload / batch limits ---------------------------------------------------
MAX_UPLOAD_BYTES = 30 * 1024 ** 2  # 30 MB
BATCH_MAX_FILES = 8

# --- Supported file types ----------------------------------------------------
PDF_EXTENSIONS = {"pdf"}
# Image formats handed to the pipeline without conversion.
NATIVE_IMAGE_EXTENSIONS = {"png", "jpeg", "jpg"}
# Image formats that are converted to PNG with Pillow first.
PILLOW_IMAGE_EXTENSIONS = {
    "webp", "bmp",
    "tiff", "tif",
    "gif",
    "heic", "heif",
    "avif",
}
IMAGE_EXTENSIONS = NATIVE_IMAGE_EXTENSIONS.union(PILLOW_IMAGE_EXTENSIONS)
ALLOWED_EXTENSIONS = PDF_EXTENSIONS.union(IMAGE_EXTENSIONS)

# --- Memory safety thresholds ------------------------------------------------
BYTES_PER_OCR_PAGE = 100 * 1024 ** 2  # ~100 MB per page (conservative)
IMAGE_MEMORY_FACTOR = 4               # decoded pixels x 4 for pipeline buffers
MEM_SAFETY_FLOOR_MB = 1024            # always keep 1 GB free

# --- SHA256 extraction cache (in-process) ------------------------------------
_cache: dict[str, dict[str, Any]] = {}
_cache_lock = threading.Lock()

# --- Active-request counter --------------------------------------------------
_active_requests: int = 0
_active_lock = threading.Lock()

# --- Model load timing (milliseconds) ----------------------------------------
_model_load_ms: int = 0

# --- Startup self-test results (filled in by the startup handler) ------------
_startup_issues: list[str] = []
_startup_done: bool = False
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Structured error exception | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class ExtractionError(Exception):
    """
    Pipeline failure that carries enough context to build a structured JSON
    error body (stage + errorCode + message) and the HTTP status to send.

    Attributes:
        stage: pipeline stage in which the failure happened.
        code: machine-readable error code.
        message: human-readable detail (also used as the exception text).
        http_status: HTTP status code for the response (defaults to 422).
    """

    def __init__(
        self,
        stage: str,
        code: str,
        message: str,
        http_status: int = 422,
    ) -> None:
        super().__init__(message)
        self.stage, self.code = stage, code
        self.message = message
        self.http_status = http_status

    def to_dict(self) -> dict[str, Any]:
        """Return the JSON-serialisable error body for this failure."""
        payload: dict[str, Any] = {"success": False}
        payload.update(
            stage=self.stage,
            errorCode=self.code,
            message=self.message,
        )
        return payload
def _err(stage: str, code: str, msg: str, status: int = 422) -> ExtractionError:
    """Build (but do not raise) an ExtractionError; callers write ``raise _err(...)``."""
    return ExtractionError(stage=stage, code=code, message=msg, http_status=status)
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Active request helpers | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _inc_active() -> None:
    """Add one to the module-level active-request counter while holding its lock."""
    global _active_requests
    with _active_lock:
        _active_requests = _active_requests + 1
def _dec_active() -> None:
    """Subtract one from the active-request counter, never letting it go below zero."""
    global _active_requests
    with _active_lock:
        remaining = _active_requests - 1
        _active_requests = remaining if remaining > 0 else 0
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Pipeline readiness (lazy, first-request check) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Set to True after the first successful readiness check; guarded by the lock.
_pipeline_ready: bool = False
_pipeline_lock = threading.Lock()


def _ensure_pipeline() -> None:
    """
    Confirm that the MinerU config file exists and that magic_pdf imports.

    The first successful call records how long the import check took in
    ``_model_load_ms`` and sets ``_pipeline_ready``; later calls return
    immediately. The check itself runs under ``_pipeline_lock`` and the
    flag is re-tested after the lock is acquired.

    Raises:
        ExtractionError: stage "model_load", HTTP 503, with code
            CONFIG_MISSING (no ~/magic-pdf.json) or IMPORT_FAILED
            (magic_pdf could not be imported).
    """
    global _pipeline_ready, _model_load_ms

    if _pipeline_ready:
        return

    with _pipeline_lock:
        # Another caller may have completed the check while we waited.
        if _pipeline_ready:
            return

        config_path = os.path.expanduser("~/magic-pdf.json")
        config_present = os.path.exists(config_path)
        if not config_present:
            raise _err(
                "model_load",
                "CONFIG_MISSING",
                f"magic-pdf.json not found at {config_path}. "
                "Check Docker build log for download_models.py output.",
                503,
            )

        # Importing the package is the (cheap) proof that it is usable.
        started = time.perf_counter()
        try:
            from magic_pdf.data.dataset import PymuDocDataset, ImageDataset  # noqa: F401
            from magic_pdf.data.data_reader_writer import (  # noqa: F401
                FileBasedDataReader,
                FileBasedDataWriter,
            )
        except ImportError as exc:
            raise _err(
                "model_load",
                "IMPORT_FAILED",
                f"magic_pdf not importable: {exc}. Check Dockerfile pip layers.",
                503,
            ) from exc
        elapsed_s = time.perf_counter() - started

        _model_load_ms = int(elapsed_s * 1000)
        _pipeline_ready = True
        logger.info("Pipeline ready (import check: %d ms).", _model_load_ms)
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # FastAPI app | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# The ASGI application object plus a permissive CORS policy (any origin,
# any header, GET and POST only).
_APP_DESCRIPTION = (
    "OCR and document extraction via MinerU pipeline backend (CPU). "
    "Supports PDF and image files up to 30 MB."
)
app = FastAPI(
    title="MinerU OCR Service",
    description=_APP_DESCRIPTION,
    version="1.1.0",
)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_headers=["*"],
    allow_methods=["GET", "POST"],
)
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Startup self-test | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# NOTE(review): no startup-hook registration is visible on this function in
# this view - confirm it is actually wired into the application lifecycle.
async def startup_self_test() -> None:
    """
    Check the service's critical dependencies and record what is missing.

    Five checks run in order: cv2 import, magic_pdf imports, presence of
    ~/magic-pdf.json, presence of the models directory, and whether a temp
    directory can be created and removed. Failures of these checks are
    logged at CRITICAL level and appended to ``_startup_issues`` instead of
    being raised. ``_startup_done`` is set to True at the end.
    """
    global _startup_done
    issues: list[str] = []

    def fail(msg: str) -> None:
        # Remember the problem and make it loud in the logs.
        issues.append(msg)
        logger.critical("startup FAIL %s", msg)

    # 1. cv2
    try:
        import cv2  # noqa: F401
        logger.info("startup β cv2 available (version %s)", cv2.__version__)
    except ImportError as exc:
        fail(
            f"cv2 not importable: {exc}. "
            "Add 'opencv-python-headless>=4.8.0' to pip layer 1 in Dockerfile."
        )

    # 2. magic_pdf core imports
    try:
        from magic_pdf.data.dataset import PymuDocDataset, ImageDataset  # noqa: F401
        from magic_pdf.data.data_reader_writer import (  # noqa: F401
            FileBasedDataReader,
            FileBasedDataWriter,
        )
        from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze  # noqa: F401
        from magic_pdf.config.enums import SupportedPdfParseMethod  # noqa: F401
        logger.info("startup β magic_pdf imports OK")
    except ImportError as exc:
        fail(f"magic_pdf not importable: {exc}")

    # 3. MinerU config file
    config_path = os.path.expanduser("~/magic-pdf.json")
    if not os.path.exists(config_path):
        fail(f"magic-pdf.json missing at {config_path} β run download_models.py")
    else:
        logger.info("startup β magic-pdf.json found at %s", config_path)

    # 4. Model files
    models_dir = "/app/models/PDF-Extract-Kit-1.0/models"
    if not os.path.isdir(models_dir):
        fail(f"Models directory missing at {models_dir} β run download_models.py")
    else:
        logger.info("startup β models directory found at %s", models_dir)

    # 5. Temp storage: create a scratch directory and delete it again.
    try:
        scratch = tempfile.mkdtemp(prefix="mineru_selftest_")
        shutil.rmtree(scratch)
        logger.info("startup β temp storage writable")
    except Exception as exc:
        fail(f"Temp storage not writable: {exc}")

    _startup_issues.extend(issues)
    _startup_done = True

    banner = "=" * 60
    if issues:
        logger.error(banner)
        logger.error("Startup self-test FAILED β %d issue(s). See above.", len(issues))
        logger.error("Service will start but /extract will fail until fixed.")
        logger.error(banner)
    else:
        logger.info(banner)
        logger.info("Startup self-test PASSED β service ready.")
        logger.info(banner)
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # GET /health | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# NOTE(review): no route decorator is visible on this handler in this view -
# confirm it is registered for GET /health.
def health() -> dict[str, Any]:
    """
    Liveness probe: unconditionally reports "healthy" without checking any
    dependency. Pipeline readiness is reported by the status endpoint.
    """
    return dict(status="healthy")
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # GET /status | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# NOTE(review): no route decorator is visible on this handler in this view -
# confirm it is registered for GET /status.
def status() -> dict[str, Any]:
    """
    Build the readiness report.

    "status" is "degraded" when the startup self-test recorded any issue and
    "healthy" otherwise. Memory figures come from ``_mem_mb()``; the other
    fields mirror module-level state (pipeline flag, uptime, active-request
    counter, cache size, last import-check duration).
    """
    mem_used, mem_total = _mem_mb()
    overall = "degraded" if _startup_issues else "healthy"
    report: dict[str, Any] = {
        "status": overall,
        "provider": "mineru",
        "version": _mineru_version(),
        "modelsLoaded": _pipeline_ready,
        "startupIssues": _startup_issues,
        "uptimeSeconds": int(time.time() - _START_TIME),
        "memoryUsedMB": mem_used,
        "memoryTotalMB": mem_total,
        "activeRequests": _active_requests,
        "cacheEntries": len(_cache),
        "lastModelLoadMs": _model_load_ms,
    }
    return report
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # POST /extract β single file | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# NOTE(review): no route decorator is visible on this handler in this view -
# confirm it is registered for POST /extract.
async def extract(file: UploadFile = File(...)) -> JSONResponse:
    """
    Extract Markdown and metadata from a single uploaded PDF or image.

    Returns the extraction result as JSON on success. Every failure is
    answered with the structured error body (success / stage / errorCode /
    message):

    * ExtractionError -> its own HTTP status and ``to_dict()`` body.
    * HTTPException   -> re-raised for the framework to handle.
    * anything else   -> HTTP 500 with stage "unknown" / INTERNAL_ERROR.

    The server-side traceback of an unexpected failure is written to the log
    only; it is deliberately not included in the response body, so internal
    paths and code structure are not disclosed to API clients.
    """
    try:
        _ensure_pipeline()
        raw, filename, ext = await _read_upload(file)
        result = _run_extraction(raw, filename, ext)
        return JSONResponse(content=result)
    except ExtractionError as exc:
        logger.warning(
            "/extract structured error [%s/%s]: %s",
            exc.stage, exc.code, exc.message,
        )
        return JSONResponse(status_code=exc.http_status, content=exc.to_dict())
    except HTTPException:
        raise
    except Exception as exc:
        # logger.exception records the full traceback for operators.
        logger.exception("/extract unhandled error")
        return JSONResponse(
            status_code=500,
            content={
                "success": False,
                "stage": "unknown",
                "errorCode": "INTERNAL_ERROR",
                "message": str(exc),
            },
        )
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # POST /batch β up to 8 files; extras silently ignored | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# NOTE(review): no route decorator is visible on this handler in this view -
# confirm it is registered for POST /batch.
async def batch(files: list[UploadFile] = File(...)) -> JSONResponse:
    """
    Extract several uploads in one request.

    Only the first BATCH_MAX_FILES uploads are processed; any further ones
    are dropped without an error. Files are handled one after another. A
    failing file contributes a structured error entry (with its sanitized
    filename) to "results" and does not stop the remaining files.

    If the pipeline readiness check fails, the whole request is answered
    with that structured error and its HTTP status.
    """
    try:
        _ensure_pipeline()
    except ExtractionError as exc:
        return JSONResponse(status_code=exc.http_status, content=exc.to_dict())

    results: list[dict[str, Any]] = []
    for upload in files[:BATCH_MAX_FILES]:

        def label(item: UploadFile = upload) -> str:
            # Sanitized name used to tag failure entries.
            return _sanitize_filename(item.filename or "upload")

        try:
            raw, filename, ext = await _read_upload(upload)
            outcome = _run_extraction(raw, filename, ext)
        except ExtractionError as exc:
            outcome = {**exc.to_dict(), "filename": label()}
        except Exception as exc:
            fname = label()
            logger.exception("Batch item failed: %s", fname)
            outcome = {
                "success": False,
                "filename": fname,
                "stage": "unknown",
                "errorCode": "INTERNAL_ERROR",
                "message": str(exc),
            }
        results.append(outcome)

    body = {
        "success": True,
        "processed": len(results),
        "results": results,
    }
    return JSONResponse(content=body)
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Upload reader (shared by /extract and /batch) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
async def _read_upload(upload: UploadFile) -> tuple[bytes, str, str]:
    """
    Validate one upload and read its content.

    Returns:
        (raw_bytes, sanitized_filename, lower-cased extension without dot).

    Raises:
        ExtractionError: UNSUPPORTED_TYPE (415) when the extension is not in
            ALLOWED_EXTENSIONS, FILE_TOO_LARGE (413) when more than
            MAX_UPLOAD_BYTES were sent, EMPTY_FILE (400) for zero bytes.
    """
    filename = _sanitize_filename(upload.filename or "upload")
    _, dot, suffix = filename.rpartition(".")
    ext = suffix.lower() if dot else ""

    if ext not in ALLOWED_EXTENSIONS:
        detail = (
            f"Unsupported file type '.{ext}'. "
            f"Supported: {sorted(ALLOWED_EXTENSIONS)}"
        )
        raise _err("validation", "UNSUPPORTED_TYPE", detail, 415)

    # Ask for one byte more than the limit: getting it proves the file is too big.
    raw = await upload.read(MAX_UPLOAD_BYTES + 1)
    size = len(raw)
    if size > MAX_UPLOAD_BYTES:
        limit_mb = MAX_UPLOAD_BYTES // 1024 // 1024
        raise _err(
            "upload",
            "FILE_TOO_LARGE",
            f"'{filename}' exceeds the {limit_mb} MB limit.",
            413,
        )
    if size == 0:
        raise _err("upload", "EMPTY_FILE", f"'{filename}' is empty.", 400)

    return raw, filename, ext
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Extraction dispatcher (shared by /extract and /batch) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Upper bound on the number of cached extraction results. When the cache is
# full the oldest entry (dicts keep insertion order) is evicted first.
_CACHE_MAX_ENTRIES = 128


def _run_extraction(raw: bytes, filename: str, ext: str) -> dict[str, Any]:
    """
    Run (or fetch from cache) the extraction for one validated upload.

    Steps:
      1. SHA256 cache lookup - a hit is returned at once with
         ``cached: True`` and ``processingTimeMs: 0``; its "filename" is set
         to the name of the current upload.
      2. Memory safety guard (may raise LOW_MEMORY).
      3. Dispatch to the PDF or image processor inside a fresh temp dir.
      4. Store the result in the cache without timing fields, evicting the
         oldest entries once _CACHE_MAX_ENTRIES is reached.
      5. Return the result with ``processingTimeMs`` and ``cached: False``.

    Args:
        raw: file content.
        filename: sanitized upload name.
        ext: lower-cased extension without the dot.

    Raises:
        ExtractionError: propagated from the guard/processors, or
            INTERNAL_ERROR (500) wrapping any other exception raised while
            processing.
    """
    # -- SHA256 cache ---------------------------------------------------------
    file_hash = hashlib.sha256(raw).hexdigest()
    with _cache_lock:
        cached = _cache.get(file_hash)
    if cached is not None:
        logger.info("Cache hit %s sha256=%.12sβ¦", filename, file_hash)
        # Same bytes may arrive under a different name: report the name of
        # THIS upload, not the one that populated the cache. (docType was
        # computed with the original name and is returned as cached.)
        result = {**cached, "filename": filename}
        result["metadata"] = {**cached["metadata"], "cached": True, "processingTimeMs": 0}
        return result

    # -- Memory safety guard --------------------------------------------------
    _assert_memory_safe(raw, ext)

    # -- Process --------------------------------------------------------------
    # Create the work dir BEFORE touching the active counter, so a failing
    # mkdtemp cannot leave the counter incremented forever.
    work_dir = tempfile.mkdtemp(prefix="mineru_")
    _inc_active()
    try:
        t0 = time.perf_counter()
        if ext in PDF_EXTENSIONS:
            result = _process_pdf(raw, filename, work_dir)
        else:
            result = _process_image(raw, filename, ext, work_dir)
        elapsed_ms = int((time.perf_counter() - t0) * 1000)

        result["metadata"] = {
            **result["metadata"],
            "processingTimeMs": elapsed_ms,
            "cached": False,
        }

        # Store without timing so cache entries stay lean.
        entry = {
            **result,
            "metadata": {
                k: v for k, v in result["metadata"].items()
                if k not in ("processingTimeMs", "cached")
            },
        }
        with _cache_lock:
            # Drop a stale copy first so the new entry counts as the newest.
            _cache.pop(file_hash, None)
            while len(_cache) >= _CACHE_MAX_ENTRIES:
                _cache.pop(next(iter(_cache)))
            _cache[file_hash] = entry
        return result
    except ExtractionError:
        raise
    except Exception as exc:
        logger.exception("Extraction failed for %s", filename)
        raise _err(
            "unknown", "INTERNAL_ERROR",
            f"Unexpected error during extraction: {exc}",
            500,
        ) from exc
    finally:
        _dec_active()
        shutil.rmtree(work_dir, ignore_errors=True)
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Memory safety guard | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _assert_memory_safe(raw: bytes, ext: str) -> None:
    """
    Refuse work that is estimated to push free memory under the safety floor.

    The estimate is page_count x BYTES_PER_OCR_PAGE for PDFs (at least one
    page) and ``_image_memory_estimate`` for images. When ``_mem_mb`` reports
    a total of 0 the guard is skipped.

    Raises:
        ExtractionError: LOW_MEMORY (507) when available minus estimated
            memory is below MEM_SAFETY_FLOOR_MB.
    """
    used_mb, total_mb = _mem_mb()
    if total_mb == 0:
        # No memory information at all - nothing to compare against.
        return

    mib = 1024 * 1024
    available_mb = total_mb - used_mb

    if ext in PDF_EXTENSIONS:
        pages = max(1, _pdf_page_count(raw))
        estimated_mb = (pages * BYTES_PER_OCR_PAGE) // mib
    else:
        estimated_mb = _image_memory_estimate(raw, ext) // mib

    free_after = available_mb - estimated_mb
    logger.info(
        "Memory check: avail=%d MB est=%d MB free_after=%d MB",
        available_mb, estimated_mb, free_after,
    )

    if free_after >= MEM_SAFETY_FLOOR_MB:
        return

    detail = (
        "Insufficient memory. "
        f"Available: {available_mb} MB, "
        f"Estimated needed: {estimated_mb} MB, "
        f"Safety floor: {MEM_SAFETY_FLOOR_MB} MB. "
        "Try a smaller file or wait for active requests to complete."
    )
    raise _err("validation", "LOW_MEMORY", detail, 507)
def _image_memory_estimate(raw: bytes, ext: str) -> int:
    """
    Estimate, in bytes, the memory needed to process an image.

    The estimate is width x height x channels x IMAGE_MEMORY_FACTOR, read
    from the image header. For HEIC/HEIF the pillow-heif opener is registered
    first when that package is available. If the image cannot be inspected
    for any reason, a conservative fallback of 20x the encoded size is used.

    Args:
        raw: encoded image bytes.
        ext: lower-cased file extension without the dot.
    """
    try:
        if ext in {"heic", "heif"}:
            try:
                from pillow_heif import register_heif_opener
                register_heif_opener()
            except ImportError:
                # Best effort: without the plugin, opening fails below and
                # the fallback estimate is returned.
                pass
        # The context manager closes the image even when reading its size or
        # bands raises, so no handle is left open on the error path.
        with Image.open(io.BytesIO(raw)) as img:
            w, h = img.size
            channels = len(img.getbands()) or 3
        return w * h * channels * IMAGE_MEMORY_FACTOR
    except Exception:
        return len(raw) * 20  # conservative fallback
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # PDF processor | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _process_pdf(raw: bytes, filename: str, work_dir: str) -> dict[str, Any]:
    """
    Run the MinerU pipeline on a PDF and return the success payload.

    The dataset's own ``classify()`` result decides between text mode
    (reported confidence 0.95) and OCR mode (reported confidence 0.85).
    Extracted images are written under ``<work_dir>/images``.

    Raises:
        ExtractionError: PDF_WRITER_FAILED / PDF_PARSE_FAILED (stage
            "decode"), OCR_PIPELINE_FAILED (stage "ocr") or MARKDOWN_FAILED
            (stage "markdown").
    """
    from magic_pdf.data.data_reader_writer import FileBasedDataWriter
    from magic_pdf.data.dataset import PymuDocDataset
    from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
    from magic_pdf.config.enums import SupportedPdfParseMethod

    images_dir = os.path.join(work_dir, "images")
    os.makedirs(images_dir, exist_ok=True)

    try:
        image_writer = FileBasedDataWriter(images_dir)
    except Exception as exc:
        raise _err(
            "decode", "PDF_WRITER_FAILED",
            f"Could not create image writer: {exc}",
        ) from exc

    page_count = _pdf_page_count(raw)

    try:
        ds = PymuDocDataset(raw)
        method = ds.classify()
    except Exception as exc:
        raise _err(
            "decode", "PDF_PARSE_FAILED",
            f"Could not parse PDF: {exc}",
        ) from exc

    try:
        use_ocr = not (method == SupportedPdfParseMethod.TXT)
        infer_result = ds.apply(doc_analyze, ocr=use_ocr)
        if use_ocr:
            pipe_result = infer_result.pipe_ocr_mode(image_writer)
            parse_method, confidence = "ocr", 0.85
        else:
            pipe_result = infer_result.pipe_txt_mode(image_writer)
            parse_method, confidence = "txt", 0.95
    except Exception as exc:
        raise _err(
            "ocr", "OCR_PIPELINE_FAILED",
            f"doc_analyze/pipe failed: {exc}",
        ) from exc

    try:
        markdown = pipe_result.get_markdown(images_dir)
    except Exception as exc:
        raise _err(
            "markdown", "MARKDOWN_FAILED",
            f"get_markdown failed: {exc}",
        ) from exc

    content_list = _safe_content_list(pipe_result, images_dir)
    doc_type = _classify_document(markdown, filename)

    metadata = {
        "parseMethod": parse_method,
        "backend": "pipeline",
        "docTypeClassification": doc_type,
        "imageCount": _count_images(content_list),
        "tableCount": _count_tables(content_list),
        "formulaCount": _count_formulas(content_list),
    }
    return {
        "success": True,
        "filename": filename,
        "docType": doc_type,
        "pageCount": page_count,
        "confidence": confidence,
        "markdown": markdown,
        "metadata": metadata,
    }
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Image processor | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _process_image(raw: bytes, filename: str, ext: str, work_dir: str) -> dict[str, Any]:
    """
    Run the MinerU OCR pipeline on a single image and return the success payload.

    Formats listed in PILLOW_IMAGE_EXTENSIONS are converted to PNG first. The
    (possibly converted) bytes are written to ``<work_dir>/input.<ext>``, read
    back through FileBasedDataReader and wrapped in an ImageDataset.

    read_local_images() is deliberately avoided: per the module notes, in
    magic-pdf 1.x it expects one path string and fails with
    "stat: path should be string, bytes, os.PathLike or integer, not list"
    when handed a list.

    Raises:
        ExtractionError: IMAGE_WRITER_FAILED / IMAGE_DECODE_FAILED /
            WRITE_FAILED / IMAGE_DATASET_FAILED (stage "decode"),
            OCR_PIPELINE_FAILED (stage "ocr"), MARKDOWN_FAILED (stage
            "markdown"), plus whatever ``_convert_image_to_png`` raises.
    """
    from magic_pdf.data.data_reader_writer import FileBasedDataReader, FileBasedDataWriter
    from magic_pdf.data.dataset import ImageDataset
    from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze

    images_dir = os.path.join(work_dir, "images")
    os.makedirs(images_dir, exist_ok=True)

    try:
        image_writer = FileBasedDataWriter(images_dir)
    except Exception as exc:
        raise _err(
            "decode", "IMAGE_WRITER_FAILED",
            f"Could not create image writer: {exc}",
        ) from exc

    # Normalise non-native formats to PNG before handing them to MinerU.
    try:
        needs_conversion = ext in PILLOW_IMAGE_EXTENSIONS
        payload = _convert_image_to_png(raw, ext) if needs_conversion else raw
        save_ext = "png" if needs_conversion else ext
    except ExtractionError:
        raise
    except Exception as exc:
        raise _err(
            "decode", "IMAGE_DECODE_FAILED",
            f"Could not decode image: {exc}",
        ) from exc

    img_filename = f"input.{save_ext}"
    img_path = os.path.join(work_dir, img_filename)
    try:
        with open(img_path, "wb") as sink:
            sink.write(payload)
    except OSError as exc:
        raise _err(
            "decode", "WRITE_FAILED",
            f"Could not write temp image: {exc}",
        ) from exc

    try:
        # The reader is rooted at work_dir and is given the relative name.
        image_bytes = FileBasedDataReader(work_dir).read(img_filename)
        ds = ImageDataset(image_bytes)
    except Exception as exc:
        raise _err(
            "decode", "IMAGE_DATASET_FAILED",
            f"Could not build ImageDataset: {exc}",
        ) from exc

    try:
        infer_result = ds.apply(doc_analyze, ocr=True)
        pipe_result = infer_result.pipe_ocr_mode(image_writer)
    except Exception as exc:
        raise _err(
            "ocr", "OCR_PIPELINE_FAILED",
            f"doc_analyze/pipe failed: {exc}",
        ) from exc

    try:
        markdown = pipe_result.get_markdown(images_dir)
    except Exception as exc:
        raise _err(
            "markdown", "MARKDOWN_FAILED",
            f"get_markdown failed: {exc}",
        ) from exc

    content_list = _safe_content_list(pipe_result, images_dir)
    doc_type = _classify_document(markdown, filename)

    metadata = {
        "parseMethod": "ocr",
        "backend": "pipeline",
        "docTypeClassification": doc_type,
        "imageCount": _count_images(content_list),
        "tableCount": _count_tables(content_list),
        "formulaCount": _count_formulas(content_list),
    }
    return {
        "success": True,
        "filename": filename,
        "docType": doc_type,
        "pageCount": 1,
        "confidence": 0.85,
        "markdown": markdown,
        "metadata": metadata,
    }
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Utility helpers | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _sanitize_filename(name: str) -> str:
    """
    Reduce a client-supplied file name to a safe, bounded base name.

    * Directory parts are dropped for both "/" and "\\" separated paths
      (some clients send a full Windows path).
    * Every character other than word characters, "." and "-" becomes "_".
    * The result is capped at 200 characters; when it has to be shortened,
      a short trailing extension is kept so the file type stays detectable.
    * An empty result is replaced by "upload".
    """
    max_len = 200
    # os.path.basename only understands the host separator, so normalise
    # backslashes first.
    name = os.path.basename(name.replace("\\", "/"))
    name = re.sub(r"[^\w.\-]", "_", name)

    if len(name) > max_len:
        stem, dot, suffix = name.rpartition(".")
        if dot and stem and len(suffix) <= 16:
            # Shorten the stem, not the extension.
            name = stem[: max_len - len(suffix) - 1] + "." + suffix
        else:
            name = name[:max_len]

    return name or "upload"
def _pdf_page_count(raw: bytes) -> int:
    """
    Return the number of pages in a PDF given as bytes, or 0 when the
    document cannot be opened or inspected.

    The document is closed in a ``finally`` clause so the handle is released
    even if reading the page count fails.
    """
    try:
        doc = fitz.open(stream=raw, filetype="pdf")
        try:
            return doc.page_count
        finally:
            doc.close()
    except Exception:
        # Best effort: callers treat 0 as "unknown".
        return 0
def _convert_image_to_png(raw: bytes, ext: str) -> bytes:
    """
    Decode an image with Pillow and re-encode it as an RGB PNG.

    For HEIC/HEIF the pillow-heif opener must be importable; it is registered
    before decoding.

    Raises:
        ExtractionError: HEIF_NOT_SUPPORTED (415) when pillow-heif is missing
            for a HEIC/HEIF input, IMAGE_DECODE_FAILED when Pillow cannot
            open, convert or save the image.
    """
    wants_heif = ext in {"heic", "heif"}
    if wants_heif:
        try:
            from pillow_heif import register_heif_opener
            register_heif_opener()
        except ImportError:
            raise _err(
                "decode",
                "HEIF_NOT_SUPPORTED",
                "HEIC/HEIF support requires pillow-heif (not installed).",
                415,
            )

    try:
        source = io.BytesIO(raw)
        rgb_image = Image.open(source).convert("RGB")
        png_buffer = io.BytesIO()
        rgb_image.save(png_buffer, format="PNG")
        png_bytes = png_buffer.getvalue()
    except Exception as exc:
        raise _err(
            "decode",
            "IMAGE_DECODE_FAILED",
            f"Pillow could not open image: {exc}",
        ) from exc
    return png_bytes
def _safe_content_list(pipe_result: Any, images_dir: str) -> list[dict]:
    """
    Fetch the pipeline's content list, never raising.

    Returns ``pipe_result.get_content_list(images_dir)``; a falsy result or
    any exception yields an empty list instead.
    """
    try:
        items = pipe_result.get_content_list(images_dir)
        return items if items else []
    except Exception:
        return []
def _count_images(content_list: list[dict]) -> int:
    """Count the content-list entries whose "type" is "image"."""
    matches = [entry for entry in content_list if entry.get("type") == "image"]
    return len(matches)
def _count_tables(content_list: list[dict]) -> int:
    """Count the content-list entries whose "type" is "table"."""
    matches = [entry for entry in content_list if entry.get("type") == "table"]
    return len(matches)
def _count_formulas(content_list: list[dict]) -> int:
    """
    Count the content-list entries whose "type" is one of
    "equation", "formula" or "interline_equation".
    """
    formula_kinds = {"equation", "formula", "interline_equation"}
    matches = [entry for entry in content_list if entry.get("type") in formula_kinds]
    return len(matches)
# Document types in priority order (earlier wins a tie) with their keywords.
_DOC_TYPE_RULES: tuple[tuple[str, tuple[str, ...]], ...] = (
    ("invoice", ("invoice", "bill to", "invoice number", "invoice #",
                 "due date", "amount due", "subtotal", "tax invoice")),
    ("receipt", ("receipt", "thank you for your purchase", "order total",
                 "payment received", "transaction id", "cash receipt")),
    ("marksheet", ("marksheet", "mark sheet", "grade sheet", "scorecard",
                   "score card", "cgpa", "sgpa", "semester result",
                   "result sheet", "marks obtained")),
    ("resume", ("curriculum vitae", "cv", "resume", "work experience",
                "education", "skills", "references", "objective",
                "professional summary")),
    ("research paper", ("abstract", "introduction", "methodology", "conclusion",
                        "references", "keywords", "doi:", "arxiv", "journal",
                        "proceedings")),
    ("form", ("please fill", "signature", "date of birth", "applicant",
              "application form", "form no", "checkbox", "tick", "field")),
    ("contract", ("agreement", "hereby", "whereas", "terms and conditions",
                  "party of the first", "signed by", "witnesseth",
                  "indemnify", "governing law")),
    ("screenshot", ("screenshot", "screen capture", "url:", "http://",
                    "https://", "browser", "toolbar", "desktop")),
)


def _keyword_pattern(keyword: str) -> "re.Pattern[str]":
    """Compile *keyword* so it only matches as a whole term, not inside a longer word."""
    # [^\W_] is "letter or digit". A boundary is required only on an edge
    # where the keyword itself starts/ends with a letter or digit, so that
    # "http://" still matches in front of a host name and "_" in sanitized
    # file names counts as a separator.
    lead = r"(?<![^\W_])" if keyword[:1].isalnum() else ""
    trail = r"(?![^\W_])" if keyword[-1:].isalnum() else ""
    return re.compile(lead + re.escape(keyword) + trail)


# Compiled once at import time; same order as _DOC_TYPE_RULES.
_DOC_TYPE_PATTERNS = tuple(
    (doc_type, tuple(_keyword_pattern(kw) for kw in keywords))
    for doc_type, keywords in _DOC_TYPE_RULES
)


def _classify_document(markdown: str, filename: str) -> str:
    """
    Guess the document type from the extracted Markdown plus the file name.

    Each type scores one point per keyword found (case-insensitive). Keywords
    match whole terms only, so short ones such as "cv" or "tick" no longer
    fire inside unrelated words ("recv", "cvv", "ticket"). The highest score
    wins; on a tie the type listed first in the rule table wins. With no
    match at all the result is "generic document".
    """
    text = (markdown + " " + filename).lower()

    best_type = "generic document"
    best_score = 0
    for doc_type, patterns in _DOC_TYPE_PATTERNS:
        score = sum(1 for pattern in patterns if pattern.search(text))
        # Strict ">" keeps the earliest type on equal scores.
        if score > best_score:
            best_type, best_score = doc_type, score
    return best_type
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Memory β cgroup-aware (fixes "105 GB / 123 GB" /proc/meminfo host bleed) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _mem_mb() -> tuple[int, int]:
    """
    Report memory as ``(used_mb, total_mb)``, preferring container limits.

    Sources are tried in this order and the first usable one wins:
      1. cgroups v2: /sys/fs/cgroup/memory.max and memory.current
         (skipped when the limit is the literal "max" or not positive).
      2. cgroups v1: /sys/fs/cgroup/memory/memory.limit_in_bytes and
         memory.usage_in_bytes (skipped when the limit is 128 GiB or more,
         which is treated as "unconstrained").
      3. /proc/meminfo: MemTotal and MemAvailable. Per the module notes this
         may reflect the host rather than the container.

    Returns (0, 0) when none of the sources can be read.
    """
    mib = 1024 * 1024

    # -- cgroups v2 -----------------------------------------------------------
    try:
        with open("/sys/fs/cgroup/memory.max") as handle:
            ceiling_text = handle.read().strip()
        if ceiling_text != "max":
            ceiling = int(ceiling_text)
            with open("/sys/fs/cgroup/memory.current") as handle:
                in_use = int(handle.read().strip())
            if ceiling > 0:
                return in_use // mib, ceiling // mib
    except (FileNotFoundError, ValueError, OSError):
        pass

    # -- cgroups v1 -----------------------------------------------------------
    try:
        with open("/sys/fs/cgroup/memory/memory.limit_in_bytes") as handle:
            ceiling = int(handle.read().strip())
        with open("/sys/fs/cgroup/memory/memory.usage_in_bytes") as handle:
            in_use = int(handle.read().strip())
        # An unconstrained cgroup reports a huge sentinel value; ignore it.
        unconstrained_from = 128 * 1024 ** 3
        if ceiling < unconstrained_from:
            return in_use // mib, ceiling // mib
    except (FileNotFoundError, ValueError, OSError):
        pass

    # -- /proc/meminfo fallback -----------------------------------------------
    try:
        table: dict[str, int] = {}
        with open("/proc/meminfo") as handle:
            for row in handle:
                fields = row.split()
                if len(fields) < 2:
                    continue
                table[fields[0].rstrip(":")] = int(fields[1])  # kB
        total_kb = table.get("MemTotal", 0)
        available_kb = table.get("MemAvailable", 0)
        return (total_kb - available_kb) // 1024, total_kb // 1024
    except Exception:
        return 0, 0
def _mineru_version() -> str:
    """
    Return the installed version of "magic-pdf", else of "mineru", else
    the string "unknown" when neither distribution can be looked up.
    """
    for distribution in ("magic-pdf", "mineru"):
        try:
            found = pkg_version(distribution)
        except Exception:
            continue
        return found
    return "unknown"