node-2 / main.py
sfdghsdvxfbgn's picture
Upload 7 files
d42d358 verified
Raw
History Blame Contribute Delete
39.7 kB
"""
MinerU OCR & Document Extraction Service
FastAPI application for Hugging Face Docker Space (CPU / pipeline backend)
Correct imports for magic-pdf >= 1.0.x (magic_pdf module):
from magic_pdf.data.data_reader_writer import FileBasedDataReader, FileBasedDataWriter
from magic_pdf.data.dataset import PymuDocDataset, ImageDataset
from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
from magic_pdf.config.enums import SupportedPdfParseMethod
OBSOLETE imports (removed in magic-pdf >= 1.0, do not use):
from magic_pdf.pipe.UNIPipe import UNIPipe ← removed
from magic_pdf.rw.DiskReaderWriter import ... ← removed
from magic_pdf.data.read_api import read_local_images ← NOT used here;
function expects a single string path in 1.x but crashes with
"stat: ... not list" if given a list. Use FileBasedDataReader instead.
Removed features vs original:
- DOCX / PPTX / XLSX (required LibreOffice; caused build OOM/timeout)
- subprocess (was only used for LibreOffice conversion)
- python-magic / libmagic (listed in requirements but never imported)
Endpoints:
GET /health — liveness (always fast, no dependency check)
GET /status — full node status including memory (via cgroups), uptime,
cache, active requests, lastModelLoadMs
POST /extract — single file (PDF or image) with SHA256 cache + memory guard
POST /batch — up to BATCH_MAX_FILES files; extras silently ignored
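Structured error format (all non-2xx responses from /extract and /batch;
/batch per-file errors also include "filename", and unexpected-error 500s
from /extract also carry a truncated "traceback"):
{
"success": false,
"stage": "upload" | "validation" | "model_load" | "decode" | "ocr" |
"markdown" | "unknown",
"errorCode": "UNSUPPORTED_TYPE" | "FILE_TOO_LARGE" | "EMPTY_FILE" |
"LOW_MEMORY" | "CONFIG_MISSING" | "IMPORT_FAILED" |
"PDF_WRITER_FAILED" | "PDF_PARSE_FAILED" | "IMAGE_WRITER_FAILED" |
"IMAGE_DECODE_FAILED" | "HEIF_NOT_SUPPORTED" | "WRITE_FAILED" |
"IMAGE_DATASET_FAILED" | "OCR_PIPELINE_FAILED" | "MARKDOWN_FAILED" |
"INTERNAL_ERROR",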
"message": "<human-readable detail>"
}
"""
import asyncio
import hashlib
import io
import logging
import os
import re
import shutil
import sys
import tempfile
import threading
import time
import traceback
from importlib.metadata import version as pkg_version
from typing import Any

import fitz # PyMuPDF β€” bundled with magic-pdf[full-cpu]
from PIL import Image
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
# ── Logging ───────────────────────────────────────────────────────────────────
logging.basicConfig(
    format="%(asctime)s %(levelname)-8s %(name)s %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("mineru-service")

# ── Process start (wall clock; /status derives uptimeSeconds from it) ────────
_START_TIME: float = time.time()

# ── Upload / batch limits ─────────────────────────────────────────────────────
MAX_UPLOAD_BYTES = 30 * 1024 ** 2  # 30 MB per uploaded file
BATCH_MAX_FILES = 8                # /batch handles at most this many files

# ── Supported file types (lower-case extensions, no leading dot) ─────────────
PDF_EXTENSIONS = {"pdf"}
# Handed to MinerU's ImageDataset unchanged.
NATIVE_IMAGE_EXTENSIONS = {"jpg", "jpeg", "png"}
# Re-encoded to PNG with Pillow before MinerU sees them.
PILLOW_IMAGE_EXTENSIONS = {"webp", "bmp", "tiff", "tif", "gif", "heic", "heif", "avif"}
IMAGE_EXTENSIONS = NATIVE_IMAGE_EXTENSIONS | PILLOW_IMAGE_EXTENSIONS
ALLOWED_EXTENSIONS = PDF_EXTENSIONS | IMAGE_EXTENSIONS

# ── Memory-guard tuning ───────────────────────────────────────────────────────
BYTES_PER_OCR_PAGE = 100 * 1024 ** 2  # assumed peak cost per PDF page (conservative)
IMAGE_MEMORY_FACTOR = 4               # decoded pixel bytes x 4 for pipeline buffers
MEM_SAFETY_FLOOR_MB = 1024            # never let free memory drop below 1 GB

# ── SHA256 -> extraction-result cache shared by /extract and /batch ──────────
_cache: dict[str, dict[str, Any]] = {}
_cache_lock = threading.Lock()

# ── Number of extractions currently running (reported by /status) ───────────
_active_requests: int = 0
_active_lock = threading.Lock()

# ── Duration of the pipeline import check in ms (reported by /status) ───────
_model_load_ms: int = 0

# ── Startup self-test results, filled in by the startup handler ─────────────
_startup_issues: list[str] = []
_startup_done: bool = False
# ─────────────────────────────────────────────────────────────────────────────
# Structured error exception
# ─────────────────────────────────────────────────────────────────────────────
class ExtractionError(Exception):
    """
    A pipeline failure that maps directly onto the structured error body.

    Carries the pipeline ``stage`` that failed, a machine-readable ``code``,
    a human-readable ``message`` (also the exception's single argument) and
    the HTTP status the endpoint should answer with (422 by default).
    """

    def __init__(
        self,
        stage: str,
        code: str,
        message: str,
        http_status: int = 422,
    ) -> None:
        super().__init__(message)
        self.stage, self.code = stage, code
        self.message, self.http_status = message, http_status

    def to_dict(self) -> dict[str, Any]:
        """Return the JSON error payload; ``success`` is always False."""
        payload: dict[str, Any] = {"success": False}
        payload["stage"] = self.stage
        payload["errorCode"] = self.code
        payload["message"] = self.message
        return payload
def _err(stage: str, code: str, msg: str, status: int = 422) -> ExtractionError:
    """Build (without raising) an ExtractionError; call sites use ``raise _err(...)``."""
    return ExtractionError(stage=stage, code=code, message=msg, http_status=status)
# ─────────────────────────────────────────────────────────────────────────────
# Active request helpers
# ─────────────────────────────────────────────────────────────────────────────
def _inc_active() -> None:
    """Count one more in-flight extraction (serialised by ``_active_lock``)."""
    global _active_requests
    with _active_lock:
        _active_requests = _active_requests + 1
def _dec_active() -> None:
    """Count one finished extraction; the counter is clamped at zero."""
    global _active_requests
    with _active_lock:
        _active_requests = _active_requests - 1 if _active_requests > 0 else 0
# ─────────────────────────────────────────────────────────────────────────────
# Pipeline readiness (lazy, first-request check)
# ─────────────────────────────────────────────────────────────────────────────
# Set once _ensure_pipeline's config/import check has passed. The lock makes
# concurrent first callers wait for a single check instead of each running it.
_pipeline_ready: bool = False
_pipeline_lock = threading.Lock()


def _probe_mineru() -> int:
    """
    Run the config-file and import checks once; return the import time in ms.

    Raises:
        ExtractionError: stage ``model_load``, HTTP 503, code
            ``CONFIG_MISSING`` or ``IMPORT_FAILED``.
    """
    cfg_file = os.path.expanduser("~/magic-pdf.json")
    if not os.path.exists(cfg_file):
        raise _err(
            "model_load",
            "CONFIG_MISSING",
            f"magic-pdf.json not found at {cfg_file}. "
            "Check Docker build log for download_models.py output.",
            503,
        )

    # A lightweight import is enough to prove the package is installed.
    started = time.perf_counter()
    try:
        from magic_pdf.data.dataset import PymuDocDataset, ImageDataset  # noqa: F401
        from magic_pdf.data.data_reader_writer import (  # noqa: F401
            FileBasedDataReader,
            FileBasedDataWriter,
        )
    except ImportError as exc:
        raise _err(
            "model_load",
            "IMPORT_FAILED",
            f"magic_pdf not importable: {exc}. Check Dockerfile pip layers.",
            503,
        ) from exc
    return int((time.perf_counter() - started) * 1000)


def _ensure_pipeline() -> None:
    """
    Make sure MinerU is usable before the first extraction.

    The first successful call records the import time in ``_model_load_ms``
    and sets ``_pipeline_ready``; every later call returns at once. The flag
    is checked again after acquiring ``_pipeline_lock`` so that only one of
    several racing first callers performs the probe.

    Raises:
        ExtractionError: propagated from the probe (stage ``model_load``).
    """
    global _pipeline_ready, _model_load_ms
    if _pipeline_ready:
        return
    with _pipeline_lock:
        if not _pipeline_ready:
            _model_load_ms = _probe_mineru()
            _pipeline_ready = True
            logger.info("Pipeline ready (import check: %d ms).", _model_load_ms)
# ─────────────────────────────────────────────────────────────────────────────
# FastAPI app
# ─────────────────────────────────────────────────────────────────────────────
# The OpenAPI description quotes the limit from MAX_UPLOAD_BYTES, so the
# published docs cannot drift from the limit _read_upload actually enforces.
app = FastAPI(
    title="MinerU OCR Service",
    description=(
        "OCR and document extraction via MinerU pipeline backend (CPU). "
        f"Supports PDF and image files up to {MAX_UPLOAD_BYTES // (1024 * 1024)} MB."
    ),
    version="1.1.0",
)
# Any origin may call the API cross-origin, with GET and POST only.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)
# ─────────────────────────────────────────────────────────────────────────────
# Startup self-test
# ─────────────────────────────────────────────────────────────────────────────
@app.on_event("startup")
async def startup_self_test() -> None:
    """
    Run at container startup and verify that critical dependencies are present.

    Never crashes the server: issues are stored in _startup_issues and
    surfaced via GET /status, so operators can diagnose without SSH access.
    To honour that, the import checks catch any Exception rather than only
    ImportError. A broken native dependency can surface as OSError or
    RuntimeError while importing, and an exception escaping this handler
    aborts application startup.

    Checks, in order: cv2 import, magic_pdf core imports, ~/magic-pdf.json,
    the model directory, and writable temp storage.
    """
    global _startup_done
    issues: list[str] = []

    def record_failure(msg: str) -> None:
        """Remember one failed check and log it at CRITICAL."""
        issues.append(msg)
        logger.critical("startup FAIL %s", msg)

    # 1. cv2 — the most common missing dependency
    try:
        import cv2  # noqa: F401
        logger.info("startup βœ“ cv2 available (version %s)", cv2.__version__)
    except Exception as exc:
        record_failure(
            f"cv2 not importable: {exc}. "
            "Add 'opencv-python-headless>=4.8.0' to pip layer 1 in Dockerfile."
        )

    # 2. magic_pdf core imports (the same names the processors import lazily)
    try:
        from magic_pdf.data.dataset import PymuDocDataset, ImageDataset  # noqa: F401
        from magic_pdf.data.data_reader_writer import (  # noqa: F401
            FileBasedDataReader, FileBasedDataWriter,
        )
        from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze  # noqa: F401
        from magic_pdf.config.enums import SupportedPdfParseMethod  # noqa: F401
        logger.info("startup βœ“ magic_pdf imports OK")
    except Exception as exc:
        record_failure(f"magic_pdf not importable: {exc}")

    # 3. MinerU config
    config_path = os.path.expanduser("~/magic-pdf.json")
    if os.path.exists(config_path):
        logger.info("startup βœ“ magic-pdf.json found at %s", config_path)
    else:
        record_failure(f"magic-pdf.json missing at {config_path} β€” run download_models.py")

    # 4. Model files
    models_dir = "/app/models/PDF-Extract-Kit-1.0/models"
    if os.path.isdir(models_dir):
        logger.info("startup βœ“ models directory found at %s", models_dir)
    else:
        record_failure(f"Models directory missing at {models_dir} β€” run download_models.py")

    # 5. Temp storage writable (every extraction uses a mkdtemp work dir)
    try:
        td = tempfile.mkdtemp(prefix="mineru_selftest_")
        shutil.rmtree(td)
        logger.info("startup βœ“ temp storage writable")
    except Exception as exc:
        record_failure(f"Temp storage not writable: {exc}")

    _startup_issues.extend(issues)
    _startup_done = True

    if not issues:
        logger.info("=" * 60)
        logger.info("Startup self-test PASSED β€” service ready.")
        logger.info("=" * 60)
    else:
        logger.error("=" * 60)
        logger.error("Startup self-test FAILED β€” %d issue(s). See above.", len(issues))
        logger.error("Service will start but /extract will fail until fixed.")
        logger.error("=" * 60)
# ─────────────────────────────────────────────────────────────────────────────
# GET /health
# ─────────────────────────────────────────────────────────────────────────────
@app.get("/health")
def health() -> dict[str, Any]:
    """
    Liveness probe: answer 200 unconditionally so the HF Space shows the
    container as running. Pipeline readiness is reported by GET /status.
    """
    body: dict[str, Any] = {"status": "healthy"}
    return body
# ─────────────────────────────────────────────────────────────────────────────
# GET /status
# ─────────────────────────────────────────────────────────────────────────────
@app.get("/status")
def status() -> dict[str, Any]:
    """
    Readiness report for operators.

    Memory comes from _mem_mb(), which prefers the container's cgroup figures
    (v2, then v1) and falls back to /proc/meminfo only as a last resort.
    Inside a Docker container on HF, /proc/meminfo shows the host machine's
    RAM (e.g. 123 GB) rather than the container's allocation. ``status`` is
    "degraded" whenever the startup self-test recorded at least one issue.
    """
    mem_used, mem_total = _mem_mb()
    overall = "degraded" if _startup_issues else "healthy"
    report: dict[str, Any] = {
        "status": overall,
        "provider": "mineru",
        "version": _mineru_version(),
        "modelsLoaded": _pipeline_ready,
        "startupIssues": _startup_issues,
        "uptimeSeconds": int(time.time() - _START_TIME),
        "memoryUsedMB": mem_used,
        "memoryTotalMB": mem_total,
        "activeRequests": _active_requests,
        "cacheEntries": len(_cache),
        "lastModelLoadMs": _model_load_ms,
    }
    return report
# ─────────────────────────────────────────────────────────────────────────────
# POST /extract — single file
# ─────────────────────────────────────────────────────────────────────────────
@app.post("/extract")
async def extract(file: UploadFile = File(...)) -> JSONResponse:
    """
    Extract Markdown and metadata from one uploaded PDF or image.

    On success, returns the extraction payload with HTTP 200. An
    ExtractionError becomes the structured error body with that error's HTTP
    status. HTTPException is re-raised for FastAPI to handle. Anything else
    becomes a 500 ``INTERNAL_ERROR`` whose body also carries the last 2000
    characters of the traceback.

    The pipeline check and the extraction are synchronous and CPU-bound, so
    both run in a worker thread via ``asyncio.to_thread``. Calling them
    directly from this coroutine would stall the event loop, and with it
    /health and /status, for the whole OCR run. Concurrent requests can
    therefore overlap; each still passes _run_extraction's memory guard
    before any processing starts.
    """
    try:
        await asyncio.to_thread(_ensure_pipeline)
        raw, filename, ext = await _read_upload(file)
        result = await asyncio.to_thread(_run_extraction, raw, filename, ext)
        return JSONResponse(content=result)
    except ExtractionError as exc:
        logger.warning(
            "/extract structured error [%s/%s]: %s",
            exc.stage, exc.code, exc.message,
        )
        return JSONResponse(status_code=exc.http_status, content=exc.to_dict())
    except HTTPException:
        raise
    except Exception as exc:
        logger.exception("/extract unhandled error")
        return JSONResponse(
            status_code=500,
            content={
                "success": False,
                "stage": "unknown",
                "errorCode": "INTERNAL_ERROR",
                "message": str(exc),
                "traceback": traceback.format_exc()[-2000:],
            },
        )
# ─────────────────────────────────────────────────────────────────────────────
# POST /batch — up to 8 files; extras silently ignored
# ─────────────────────────────────────────────────────────────────────────────
@app.post("/batch")
async def batch(files: list[UploadFile] = File(...)) -> JSONResponse:
    """
    Extract up to BATCH_MAX_FILES uploads in one request.

    Policy:
      - 1–8 files → process all.
      - > 8 files → process only files[0:8]; the client is not told, but a
        log line records the truncation.

    Files are processed one after another (each is awaited before the next
    starts) to stay within CPU Basic memory limits. The synchronous pipeline
    work runs in a worker thread, so the event loop keeps serving /health
    and /status meanwhile.

    Per-file failures use the structured error format plus ``filename``; one
    failure never aborts the rest of the batch. A failing pipeline check
    fails the whole request with the structured format. That includes
    unexpected exceptions (HTTP 500), matching /extract.
    """
    try:
        await asyncio.to_thread(_ensure_pipeline)
    except ExtractionError as exc:
        return JSONResponse(status_code=exc.http_status, content=exc.to_dict())
    except Exception as exc:
        # Keep the structured-error contract instead of letting FastAPI
        # answer with its default unstructured 500.
        logger.exception("/batch pipeline check failed")
        return JSONResponse(
            status_code=500,
            content={
                "success": False,
                "stage": "unknown",
                "errorCode": "INTERNAL_ERROR",
                "message": str(exc),
            },
        )

    if len(files) > BATCH_MAX_FILES:
        logger.info(
            "/batch received %d files; processing only the first %d",
            len(files), BATCH_MAX_FILES,
        )
    candidates = files[:BATCH_MAX_FILES]
    results: list[dict[str, Any]] = []
    for upload in candidates:
        try:
            raw, filename, ext = await _read_upload(upload)
            result = await asyncio.to_thread(_run_extraction, raw, filename, ext)
        except ExtractionError as exc:
            result = exc.to_dict()
            result["filename"] = _sanitize_filename(upload.filename or "upload")
        except Exception as exc:
            fname = _sanitize_filename(upload.filename or "upload")
            logger.exception("Batch item failed: %s", fname)
            result = {
                "success": False,
                "filename": fname,
                "stage": "unknown",
                "errorCode": "INTERNAL_ERROR",
                "message": str(exc),
            }
        results.append(result)
    return JSONResponse(content={
        "success": True,
        "processed": len(results),
        "results": results,
    })
# ─────────────────────────────────────────────────────────────────────────────
# Upload reader (shared by /extract and /batch)
# ─────────────────────────────────────────────────────────────────────────────
async def _read_upload(upload: UploadFile) -> tuple[bytes, str, str]:
    """
    Validate one upload and return ``(raw_bytes, filename, ext)``.

    The extension (text after the last dot, lower-cased) is checked against
    ALLOWED_EXTENSIONS before any bytes are read. At most one byte more than
    MAX_UPLOAD_BYTES is then read, which is enough to tell that a file is
    too large.

    Raises:
        ExtractionError: ``UNSUPPORTED_TYPE`` (415), ``FILE_TOO_LARGE`` (413)
            or ``EMPTY_FILE`` (400).
    """
    filename = _sanitize_filename(upload.filename or "upload")
    _, dot, suffix = filename.rpartition(".")
    ext = suffix.lower() if dot else ""

    if ext not in ALLOWED_EXTENSIONS:
        raise _err(
            "validation", "UNSUPPORTED_TYPE",
            f"Unsupported file type '.{ext}'. "
            f"Supported: {sorted(ALLOWED_EXTENSIONS)}",
            415,
        )

    data = await upload.read(MAX_UPLOAD_BYTES + 1)
    size = len(data)
    if size > MAX_UPLOAD_BYTES:
        limit_mb = MAX_UPLOAD_BYTES // 1024 // 1024
        raise _err(
            "upload", "FILE_TOO_LARGE",
            f"'{filename}' exceeds the {limit_mb} MB limit.",
            413,
        )
    if not size:
        raise _err("upload", "EMPTY_FILE", f"'{filename}' is empty.", 400)
    return data, filename, ext
# ─────────────────────────────────────────────────────────────────────────────
# Extraction dispatcher (shared by /extract and /batch)
# ─────────────────────────────────────────────────────────────────────────────
# Upper bound on cached extraction results. Each entry holds the complete
# Markdown output, so without a cap the cache grows for the lifetime of the
# container and eats into the memory the guard below protects. The least
# recently used entry is evicted first.
CACHE_MAX_ENTRIES = 64


def _run_extraction(raw: bytes, filename: str, ext: str) -> dict[str, Any]:
    """
    Extract one validated upload, consulting the SHA256 cache first.

    1. SHA256 cache lookup → on a hit, return the cached result immediately
       (``cached: true``), relabelled with *this* upload's filename.
    2. Memory safety guard → raise ExtractionError(LOW_MEMORY) if OOM is likely.
    3. Dispatch to the PDF or image processor inside a private temp dir.
    4. Cache the successful result without timing fields (LRU, capped at
       CACHE_MAX_ENTRIES).
    5. Return the result with timing metadata.

    Raises:
        ExtractionError: from the guard or the processors. Any other exception
            is logged and wrapped as stage ``unknown`` / ``INTERNAL_ERROR``
            (HTTP 500).
    """
    file_hash = hashlib.sha256(raw).hexdigest()

    # ── SHA256 cache ──────────────────────────────────────────────────────────
    with _cache_lock:
        cached = _cache.pop(file_hash, None)
        if cached is not None:
            _cache[file_hash] = cached  # re-insert: now most recently used
    if cached is not None:
        logger.info("Cache hit %s sha256=%.12s…", filename, file_hash)
        # Entries are keyed on content only, so the stored filename may
        # belong to an earlier upload. Relabel the result and re-run the
        # cheap classifier, which also looks at the filename.
        doc_type = _classify_document(cached.get("markdown", ""), filename)
        return {
            **cached,
            "filename": filename,
            "docType": doc_type,
            "metadata": {
                **cached["metadata"],
                "docTypeClassification": doc_type,
                "cached": True,
                "processingTimeMs": 0,
            },
        }

    # ── Memory safety guard ───────────────────────────────────────────────────
    _assert_memory_safe(raw, ext)

    # ── Process ───────────────────────────────────────────────────────────────
    _inc_active()
    work_dir = None
    try:
        # Created inside the try so a failing mkdtemp still releases the
        # active-request slot taken above.
        work_dir = tempfile.mkdtemp(prefix="mineru_")
        t0 = time.perf_counter()
        if ext in PDF_EXTENSIONS:
            result = _process_pdf(raw, filename, work_dir)
        else:
            result = _process_image(raw, filename, ext, work_dir)
        elapsed_ms = int((time.perf_counter() - t0) * 1000)
        result["metadata"] = {
            **result["metadata"],
            "processingTimeMs": elapsed_ms,
            "cached": False,
        }
        # Store without timing so cache entries stay lean
        entry = {**result, "metadata": {k: v for k, v in result["metadata"].items()
                                        if k not in ("processingTimeMs", "cached")}}
        with _cache_lock:
            _cache.pop(file_hash, None)
            _cache[file_hash] = entry
            while len(_cache) > CACHE_MAX_ENTRIES:
                del _cache[next(iter(_cache))]  # oldest = least recently used
        return result
    except ExtractionError:
        raise
    except Exception as exc:
        logger.exception("Extraction failed for %s", filename)
        raise _err(
            "unknown", "INTERNAL_ERROR",
            f"Unexpected error during extraction: {exc}",
            500,
        ) from exc
    finally:
        _dec_active()
        if work_dir is not None:
            shutil.rmtree(work_dir, ignore_errors=True)
# ─────────────────────────────────────────────────────────────────────────────
# Memory safety guard
# ─────────────────────────────────────────────────────────────────────────────
def _assert_memory_safe(raw: bytes, ext: str) -> None:
    """
    Refuse an extraction that would leave less than MEM_SAFETY_FLOOR_MB free.

    PDFs are costed at BYTES_PER_OCR_PAGE per page, counting at least one
    page even when the page count cannot be read. Images use
    _image_memory_estimate. The guard is skipped when _mem_mb() reports a
    total of 0, meaning no memory source was readable.

    Raises:
        ExtractionError: stage ``validation``, code ``LOW_MEMORY``, HTTP 507.
    """
    mib = 1024 * 1024
    used_mb, total_mb = _mem_mb()
    if not total_mb:
        return  # cgroups and /proc both unavailable — nothing to compare against

    available_mb = total_mb - used_mb
    if ext in PDF_EXTENSIONS:
        pages = max(1, _pdf_page_count(raw))
        estimated_mb = pages * BYTES_PER_OCR_PAGE // mib
    else:
        estimated_mb = _image_memory_estimate(raw, ext) // mib

    free_after = available_mb - estimated_mb
    logger.info(
        "Memory check: avail=%d MB est=%d MB free_after=%d MB",
        available_mb, estimated_mb, free_after,
    )
    if free_after >= MEM_SAFETY_FLOOR_MB:
        return
    raise _err(
        "validation", "LOW_MEMORY",
        f"Insufficient memory. "
        f"Available: {available_mb} MB, "
        f"Estimated needed: {estimated_mb} MB, "
        f"Safety floor: {MEM_SAFETY_FLOOR_MB} MB. "
        "Try a smaller file or wait for active requests to complete.",
        507,
    )
def _image_memory_estimate(raw: bytes, ext: str) -> int:
    """
    Estimate, in bytes, the memory needed to push *raw* through the pipeline.

    Only the image header is read (size and band count). The decoded pixel
    size is then scaled by IMAGE_MEMORY_FACTOR. HEIC/HEIF headers can be read
    only when pillow-heif is installed.

    Pillow raises DecompressionBombError for images above
    2 × Image.MAX_IMAGE_PIXELS without exposing their size. That threshold is
    used as a lower bound, because the byte-length fallback would badly
    underestimate a small, highly compressible file that decodes to an
    enormous bitmap. Any other header failure falls back to ``20 × len(raw)``.
    """
    try:
        if ext in {"heic", "heif"}:
            try:
                from pillow_heif import register_heif_opener
                register_heif_opener()
            except ImportError:
                pass  # header read fails below and the fallback applies
        with Image.open(io.BytesIO(raw)) as img:
            w, h = img.size
            channels = len(img.getbands()) or 3
        return w * h * channels * IMAGE_MEMORY_FACTOR
    except Image.DecompressionBombError:
        return 2 * int(Image.MAX_IMAGE_PIXELS or 0) * 3 * IMAGE_MEMORY_FACTOR
    except Exception:
        return len(raw) * 20  # conservative fallback
# ─────────────────────────────────────────────────────────────────────────────
# PDF processor
# ─────────────────────────────────────────────────────────────────────────────
def _process_pdf(raw: bytes, filename: str, work_dir: str) -> dict[str, Any]:
    """
    Run a PDF through MinerU's pipeline backend and build the success payload.

    ``PymuDocDataset.classify()`` selects the route. When it reports TXT,
    the document is analysed without OCR and piped with ``pipe_txt_mode``
    (confidence 0.95). Otherwise OCR is enabled and ``pipe_ocr_mode`` is used
    (confidence 0.85). Extracted images are written to ``<work_dir>/images``.

    Raises:
        ExtractionError (HTTP 422): ``PDF_WRITER_FAILED`` / ``PDF_PARSE_FAILED``
            (stage ``decode``), ``OCR_PIPELINE_FAILED`` (``ocr``) or
            ``MARKDOWN_FAILED`` (``markdown``).
    """
    from magic_pdf.config.enums import SupportedPdfParseMethod
    from magic_pdf.data.data_reader_writer import FileBasedDataWriter
    from magic_pdf.data.dataset import PymuDocDataset
    from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze

    images_dir = os.path.join(work_dir, "images")
    os.makedirs(images_dir, exist_ok=True)

    try:
        writer = FileBasedDataWriter(images_dir)
    except Exception as exc:
        raise _err("decode", "PDF_WRITER_FAILED", f"Could not create image writer: {exc}") from exc

    pages = _pdf_page_count(raw)

    try:
        dataset = PymuDocDataset(raw)
        method = dataset.classify()
    except Exception as exc:
        raise _err("decode", "PDF_PARSE_FAILED", f"Could not parse PDF: {exc}") from exc

    text_layer = method == SupportedPdfParseMethod.TXT
    parse_method, confidence = ("txt", 0.95) if text_layer else ("ocr", 0.85)

    try:
        inference = dataset.apply(doc_analyze, ocr=not text_layer)
        if text_layer:
            pipe_result = inference.pipe_txt_mode(writer)
        else:
            pipe_result = inference.pipe_ocr_mode(writer)
    except Exception as exc:
        raise _err("ocr", "OCR_PIPELINE_FAILED", f"doc_analyze/pipe failed: {exc}") from exc

    try:
        markdown = pipe_result.get_markdown(images_dir)
    except Exception as exc:
        raise _err("markdown", "MARKDOWN_FAILED", f"get_markdown failed: {exc}") from exc

    content_list = _safe_content_list(pipe_result, images_dir)
    doc_type = _classify_document(markdown, filename)
    metadata = {
        "parseMethod": parse_method,
        "backend": "pipeline",
        "docTypeClassification": doc_type,
        "imageCount": _count_images(content_list),
        "tableCount": _count_tables(content_list),
        "formulaCount": _count_formulas(content_list),
    }
    return {
        "success": True,
        "filename": filename,
        "docType": doc_type,
        "pageCount": pages,
        "confidence": confidence,
        "markdown": markdown,
        "metadata": metadata,
    }
# ─────────────────────────────────────────────────────────────────────────────
# Image processor
# ─────────────────────────────────────────────────────────────────────────────
def _process_image(raw: bytes, filename: str, ext: str, work_dir: str) -> dict[str, Any]:
    """
    OCR a single image with MinerU and build the success payload.

    Formats in PILLOW_IMAGE_EXTENSIONS are first re-encoded as PNG. The bytes
    are written to ``<work_dir>/input.<ext>`` and read back through
    FileBasedDataReader to build an ImageDataset. Images are always processed
    in OCR mode and reported as one page with confidence 0.85.

    NOTE: read_local_images() is intentionally NOT used here.
    In magic-pdf 1.x it expects a single path string; passing a list causes:
    "stat: path should be string, bytes, os.PathLike or integer, not list"
    FileBasedDataReader + ImageDataset are used directly instead.

    Raises:
        ExtractionError: stage ``decode`` (IMAGE_WRITER_FAILED,
            IMAGE_DECODE_FAILED, HEIF_NOT_SUPPORTED, WRITE_FAILED,
            IMAGE_DATASET_FAILED), ``ocr`` (OCR_PIPELINE_FAILED) or
            ``markdown`` (MARKDOWN_FAILED).
    """
    from magic_pdf.data.data_reader_writer import FileBasedDataReader, FileBasedDataWriter
    from magic_pdf.data.dataset import ImageDataset
    from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze

    images_dir = os.path.join(work_dir, "images")
    os.makedirs(images_dir, exist_ok=True)

    try:
        writer = FileBasedDataWriter(images_dir)
    except Exception as exc:
        raise _err("decode", "IMAGE_WRITER_FAILED", f"Could not create image writer: {exc}") from exc

    needs_conversion = ext in PILLOW_IMAGE_EXTENSIONS
    try:
        payload = _convert_image_to_png(raw, ext) if needs_conversion else raw
    except ExtractionError:
        raise
    except Exception as exc:
        raise _err("decode", "IMAGE_DECODE_FAILED", f"Could not decode image: {exc}") from exc

    saved_name = "input." + ("png" if needs_conversion else ext)
    saved_path = os.path.join(work_dir, saved_name)
    try:
        with open(saved_path, "wb") as out:
            out.write(payload)
    except OSError as exc:
        raise _err("decode", "WRITE_FAILED", f"Could not write temp image: {exc}") from exc

    try:
        # FileBasedDataReader(base_dir).read(relative_name) -> bytes
        image_bytes = FileBasedDataReader(work_dir).read(saved_name)
        dataset = ImageDataset(image_bytes)
    except Exception as exc:
        raise _err("decode", "IMAGE_DATASET_FAILED",
                   f"Could not build ImageDataset: {exc}") from exc

    try:
        pipe_result = dataset.apply(doc_analyze, ocr=True).pipe_ocr_mode(writer)
    except Exception as exc:
        raise _err("ocr", "OCR_PIPELINE_FAILED", f"doc_analyze/pipe failed: {exc}") from exc

    try:
        markdown = pipe_result.get_markdown(images_dir)
    except Exception as exc:
        raise _err("markdown", "MARKDOWN_FAILED", f"get_markdown failed: {exc}") from exc

    content_list = _safe_content_list(pipe_result, images_dir)
    doc_type = _classify_document(markdown, filename)
    metadata = {
        "parseMethod": "ocr",
        "backend": "pipeline",
        "docTypeClassification": doc_type,
        "imageCount": _count_images(content_list),
        "tableCount": _count_tables(content_list),
        "formulaCount": _count_formulas(content_list),
    }
    return {
        "success": True,
        "filename": filename,
        "docType": doc_type,
        "pageCount": 1,
        "confidence": 0.85,
        "markdown": markdown,
        "metadata": metadata,
    }
# ─────────────────────────────────────────────────────────────────────────────
# Utility helpers
# ─────────────────────────────────────────────────────────────────────────────
def _sanitize_filename(name: str) -> str:
name = os.path.basename(name)
name = re.sub(r"[^\w.\-]", "_", name)
return name[:200] or "upload"
def _pdf_page_count(raw: bytes) -> int:
    """Return the number of pages PyMuPDF sees in *raw*, or 0 if it cannot open it."""
    try:
        with fitz.open(stream=raw, filetype="pdf") as document:
            return document.page_count
    except Exception:
        return 0
def _convert_image_to_png(raw: bytes, ext: str) -> bytes:
    """
    Re-encode an image that Pillow can read (WebP, BMP, TIFF, GIF, HEIC, …) as PNG.

    The output is always 3-channel RGB. Transparent pixels are first
    composited onto a white background. A bare ``convert("RGB")`` discards
    alpha and exposes whatever colour is stored under it, frequently black,
    which can make dark text invisible to OCR.

    Raises:
        ExtractionError: ``HEIF_NOT_SUPPORTED`` (415) when pillow-heif is
            missing for HEIC/HEIF input; ``IMAGE_DECODE_FAILED`` (422) when
            Pillow cannot decode or re-encode the data.
    """
    if ext in {"heic", "heif"}:
        try:
            from pillow_heif import register_heif_opener
            register_heif_opener()
        except ImportError:
            raise _err(
                "decode", "HEIF_NOT_SUPPORTED",
                "HEIC/HEIF support requires pillow-heif (not installed).",
                415,
            )
    try:
        with Image.open(io.BytesIO(raw)) as src:
            has_alpha = src.mode in ("RGBA", "LA", "PA") or "transparency" in src.info
            if has_alpha:
                rgba = src.convert("RGBA")
                white = Image.new("RGBA", rgba.size, (255, 255, 255, 255))
                img = Image.alpha_composite(white, rgba).convert("RGB")
            else:
                img = src.convert("RGB")
        buf = io.BytesIO()
        img.save(buf, format="PNG")
        return buf.getvalue()
    except Exception as exc:
        raise _err("decode", "IMAGE_DECODE_FAILED", f"Pillow could not open image: {exc}") from exc
def _safe_content_list(pipe_result: Any, images_dir: str) -> list[dict]:
    """
    Return MinerU's content list as a list of dicts, or ``[]`` on failure.

    The list only feeds the image/table/formula counters, so a failure must
    not sink an otherwise successful extraction. The failure is logged rather
    than silently discarded. Non-dict items are dropped because the counters
    call ``.get`` on every element.
    """
    import json

    log = logging.getLogger("mineru-service")
    try:
        items = pipe_result.get_content_list(images_dir)
    except Exception:
        log.warning("get_content_list failed; reporting zero counts", exc_info=True)
        return []
    if items is None:
        return []
    if isinstance(items, (str, bytes)):
        # NOTE(review): defensive — accept a JSON-serialised list in case a
        # magic-pdf version returns the encoded form; confirm against 1.x.
        try:
            items = json.loads(items)
        except ValueError:
            log.warning("get_content_list returned a non-JSON string; ignoring it")
            return []
    try:
        candidates = list(items)
    except TypeError:
        return []
    return [item for item in candidates if isinstance(item, dict)]
def _count_images(content_list: list[dict]) -> int:
    """Count content-list entries whose ``type`` is ``"image"``."""
    return len([entry for entry in content_list if entry.get("type") == "image"])
def _count_tables(content_list: list[dict]) -> int:
    """Count content-list entries whose ``type`` is ``"table"``."""
    return len([entry for entry in content_list if entry.get("type") == "table"])
def _count_formulas(content_list: list[dict]) -> int:
    """Count entries labelled as a formula under any of the accepted type names."""
    formula_types = {"equation", "formula", "interline_equation"}
    return sum(entry.get("type") in formula_types for entry in content_list)
# Keyword rules for _classify_document, in priority order: on a score tie the
# type listed first wins.
_DOC_TYPE_RULES: list[tuple[str, list[str]]] = [
    ("invoice", ["invoice", "bill to", "invoice number", "invoice #",
                 "due date", "amount due", "subtotal", "tax invoice"]),
    ("receipt", ["receipt", "thank you for your purchase", "order total",
                 "payment received", "transaction id", "cash receipt"]),
    ("marksheet", ["marksheet", "mark sheet", "grade sheet", "scorecard",
                   "score card", "cgpa", "sgpa", "semester result",
                   "result sheet", "marks obtained"]),
    ("resume", ["curriculum vitae", "cv", "resume", "work experience",
                "education", "skills", "references", "objective",
                "professional summary"]),
    ("research paper", ["abstract", "introduction", "methodology", "conclusion",
                        "references", "keywords", "doi:", "arxiv", "journal",
                        "proceedings"]),
    ("form", ["please fill", "signature", "date of birth", "applicant",
              "application form", "form no", "checkbox", "tick", "field"]),
    ("contract", ["agreement", "hereby", "whereas", "terms and conditions",
                  "party of the first", "signed by", "witnesseth",
                  "indemnify", "governing law"]),
    ("screenshot", ["screenshot", "screen capture", "url:", "http://",
                    "https://", "browser", "toolbar", "desktop"]),
]


def _keyword_pattern(keyword: str) -> re.Pattern:
    """
    Compile *keyword* so that it matches only as a whole word or phrase.

    A guard is added on each side that starts or ends with a letter or digit,
    so "tick" no longer fires inside "ticket" and "cv" not inside "cvv".
    Keywords ending in punctuation, such as "doi:" or "invoice #", still
    match. Underscores count as separators, so filenames like
    "my_invoice.pdf" still match.
    """
    head = r"(?<![^\W_])" if keyword[:1].isalnum() else ""
    tail = r"(?![^\W_])" if keyword[-1:].isalnum() else ""
    return re.compile(head + re.escape(keyword) + tail)


# Precompiled once at import: (doc_type, [(keyword, pattern), ...]).
_DOC_TYPE_PATTERNS = [
    (doc_type, [(kw, _keyword_pattern(kw)) for kw in keywords])
    for doc_type, keywords in _DOC_TYPE_RULES
]


def _classify_document(markdown: str, filename: str) -> str:
    """
    Guess the document type from keyword hits in the Markdown plus filename.

    Each type scores one point per distinct keyword found as a whole word
    (case-insensitive). The highest score wins, ties go to the earlier rule,
    and no hits at all yields ``"generic document"``.
    """
    text = (markdown + " " + filename).lower()
    scores: dict[str, int] = {}
    for doc_type, patterns in _DOC_TYPE_PATTERNS:
        # Cheap substring pre-check; the regex only confirms word boundaries.
        score = sum(1 for kw, pattern in patterns if kw in text and pattern.search(text))
        if score:
            scores[doc_type] = score
    return max(scores, key=lambda k: scores[k]) if scores else "generic document"
# ─────────────────────────────────────────────────────────────────────────────
# Memory — cgroup-aware (fixes "105 GB / 123 GB" /proc/meminfo host bleed)
# ─────────────────────────────────────────────────────────────────────────────
def _cgroup_int(path: str) -> int:
    """Read a cgroup file that holds a single integer."""
    with open(path) as f:
        return int(f.read().strip())


def _cgroup_stat(path: str, key: str) -> int:
    """Return *key*'s value from a cgroup ``memory.stat`` file, or 0 if unavailable."""
    try:
        with open(path) as f:
            for line in f:
                name, _, value = line.partition(" ")
                if name == key:
                    return int(value)
    except (OSError, ValueError):
        pass
    return 0


def _mem_mb() -> tuple[int, int]:
    """
    Return (used_mb, total_mb) for the CONTAINER, not the host.

    Priority:
      1. cgroups v2: /sys/fs/cgroup/memory.max + memory.current
      2. cgroups v1: /sys/fs/cgroup/memory/memory.limit_in_bytes + usage_in_bytes
      3. /proc/meminfo fallback (may show host memory in Docker — known inaccuracy)

    For the cgroup sources, "used" is the working set: raw usage minus the
    inactive file-backed page cache (``inactive_file`` in v2,
    ``total_inactive_file`` in v1), which the kernel reclaims under pressure.
    Raw usage also counts cache, for example from the temp files each
    extraction writes, and would make the memory guard reject work
    spuriously.

    /proc/meminfo is the last resort: inside a Docker container it usually
    reports the physical host's RAM (e.g. 123 GB on a 128 GB bare-metal
    host) instead of the container's limit, which misleads the guard.
    Returns (0, 0) if no source is readable.
    """
    mib = 1024 * 1024

    # ── cgroups v2 (preferred — modern Docker / HF Spaces) ───────────────────
    try:
        with open("/sys/fs/cgroup/memory.max") as f:
            raw_max = f.read().strip()
        if raw_max != "max":  # "max" means no limit — try other sources
            limit_bytes = int(raw_max)
            used_bytes = _cgroup_int("/sys/fs/cgroup/memory.current")
            if limit_bytes > 0:
                used_bytes -= _cgroup_stat("/sys/fs/cgroup/memory.stat", "inactive_file")
                return max(0, used_bytes) // mib, limit_bytes // mib
    except (OSError, ValueError):
        pass

    # ── cgroups v1 ────────────────────────────────────────────────────────────
    try:
        limit_bytes = _cgroup_int("/sys/fs/cgroup/memory/memory.limit_in_bytes")
        used_bytes = _cgroup_int("/sys/fs/cgroup/memory/memory.usage_in_bytes")
        # An unconstrained v1 cgroup reports a huge sentinel; any limit of
        # 128 GiB or more is treated as "no limit".
        if limit_bytes < 128 * 1024 ** 3:
            used_bytes -= _cgroup_stat("/sys/fs/cgroup/memory/memory.stat", "total_inactive_file")
            return max(0, used_bytes) // mib, limit_bytes // mib
    except (OSError, ValueError):
        pass

    # ── /proc/meminfo fallback ────────────────────────────────────────────────
    try:
        info: dict[str, int] = {}
        with open("/proc/meminfo") as f:
            for line in f:
                parts = line.split()
                if len(parts) >= 2:
                    info[parts[0].rstrip(":")] = int(parts[1])  # values are in kB
        total_kb = info.get("MemTotal", 0)
        avail_kb = info.get("MemAvailable", 0)
        used_kb = total_kb - avail_kb
        return used_kb // 1024, total_kb // 1024
    except Exception:
        return 0, 0
def _mineru_version() -> str:
    """Return the installed MinerU version: ``magic-pdf`` first, then ``mineru``, else "unknown"."""
    candidates = ("magic-pdf", "mineru")
    for dist in candidates:
        try:
            found = pkg_version(dist)
        except Exception:
            continue
        return found
    return "unknown"