marker-pdf / app.py
haryde's picture
Update app.py
9e8b5e4 verified
import os
import shutil
import subprocess
import requests
import gradio as gr
from urllib.parse import urlparse
import time
import re
def sanear_nombre(nombre):
nombre_base = os.path.splitext(nombre)[0]
return re.sub(r"[^\w\-_.]", "_", nombre_base)
def obtener_ruta_segura(base_dir, nombre_archivo):
nombre_base = sanear_nombre(nombre_archivo)
ruta_final = os.path.join(base_dir, nombre_base)
contador = 1
while os.path.exists(ruta_final + ".pdf"):
ruta_final = os.path.join(base_dir, f"{nombre_base}_{contador}")
contador += 1
return ruta_final + ".pdf"
def ejecutar_marker(cmd):
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=900)
return result
def procesar_pdf(pdf_file=None, url_pdf=None, formatos=[], verbose="auto"):
output_base = "./marker_output"
os.makedirs(output_base, exist_ok=True)
estado = ""
try:
subprocess.run(["marker_single", "--help"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except FileNotFoundError:
yield "Error: el comando 'marker_single' no está disponible.\nVerifica que marker-pdf esté en requirements.txt", None, None
return
if not formatos:
yield "Debes seleccionar al menos un formato de salida.", None, None
return
try:
if url_pdf:
parsed = urlparse(url_pdf)
nombre_pdf = parsed.path.split("/")[-1] or "documento"
ruta_pdf = obtener_ruta_segura(output_base, nombre_pdf)
response = requests.get(url_pdf)
if response.status_code != 200:
yield "No se pudo descargar el PDF desde la URL proporcionada.", None, None
return
with open(ruta_pdf, "wb") as f:
f.write(response.content)
elif pdf_file:
nombre_pdf = pdf_file.name or "documento"
ruta_pdf = obtener_ruta_segura(output_base, nombre_pdf)
shutil.copyfile(pdf_file.name, ruta_pdf)
else:
yield "No se proporcionó ni archivo ni URL.", None, None
return
with open(ruta_pdf, "rb") as f:
if f.read(4) != b"%PDF":
yield "El archivo proporcionado no es un PDF válido.", None, None
return
if os.path.getsize(ruta_pdf) > 3 * 1024 * 1024:
estado += "Aviso: el PDF es grande y puede tardar más de lo normal.\n"
nombre_sin_ext = os.path.splitext(os.path.basename(ruta_pdf))[0]
carpeta_salida = os.path.join(output_base, nombre_sin_ext)
if os.path.exists(carpeta_salida):
shutil.rmtree(carpeta_salida)
os.makedirs(carpeta_salida, exist_ok=True)
for fmt in formatos:
estado += f"\nProcesando formato '{fmt}'..."
yield estado, None, None
# Primer intento sin OCR (si aplica)
if fmt == "md":
cmd = ["marker_single", ruta_pdf, "--output_format", "markdown", "--disable_ocr", "--output_dir", carpeta_salida]
elif fmt == "md + ocr":
cmd = ["marker_single", ruta_pdf, "--output_format", "markdown", "--output_dir", carpeta_salida]
else:
cmd = ["marker_single", ruta_pdf, "--output_format", fmt, "--output_dir", carpeta_salida]
result = ejecutar_marker(cmd)
md_path = os.path.join(carpeta_salida, f"{nombre_sin_ext}.md")
if fmt == "md" and not os.path.exists(md_path):
estado += "\nNo se generó el archivo Markdown en el primer intento."
estado += f"\n\n---\nSalida del comando marker_single:\n{result.stderr.strip()}\n"
estado += "\nReintentando con OCR..."
yield estado, None, None
start_time = time.time()
cmd = ["marker_single", ruta_pdf, "--output_format", "markdown", "--output_dir", carpeta_salida]
result = ejecutar_marker(cmd)
if not os.path.exists(md_path):
estado += "\nEl reintento con OCR también falló."
estado += f"\n\n---\nSalida del reintento:\n{result.stderr.strip()}"
yield estado, None, None
return
elif result.returncode != 0:
estado += f"\n\nError ejecutando marker_single:\n{result.stderr.strip()}"
yield estado, None, None
return
if not os.path.exists(md_path):
estado += "\nNo se generó el archivo Markdown. Puede que el PDF no contenga texto reconocible."
yield estado, None, None
return
with open(md_path, "r", encoding="utf-8") as f:
md_content = f.read()
palabras = len(md_content.split())
caracteres = len(md_content)
figuras = len([f for f in os.listdir(carpeta_salida) if f.lower().endswith(".jpeg")])
tablas = len(os.listdir(os.path.join(carpeta_salida, "tables"))) if os.path.exists(os.path.join(carpeta_salida, "tables")) else 0
tamaño_kb = os.path.getsize(md_path) / 1024
tiempo_total = round(time.time() - start_time, 1)
resumen = f"\n\n---\nResumen del procesamiento:\n"
resumen += f"- Palabras: {palabras}\n"
resumen += f"- Caracteres: {caracteres}\n"
resumen += f"- Tamaño Markdown: {tamaño_kb:.2f} KB\n"
resumen += f"- Imágenes extraídas: {figuras}\n"
resumen += f"- Tablas detectadas: {tablas}\n"
resumen += f"- Tiempo total: {tiempo_total} s"
zip_path = os.path.join(output_base, f"{nombre_sin_ext}.zip")
shutil.make_archive(base_name=zip_path.replace(".zip", ""), format="zip", root_dir=carpeta_salida)
estado += "\n\nProcesamiento finalizado correctamente."
yield estado + resumen, gr.update(value=md_path, visible=True), gr.update(value=zip_path, visible=True)
except Exception as e:
yield f"Error general inesperado: {str(e)}", None, None
demo = gr.Interface(
fn=procesar_pdf,
inputs=[
gr.File(label="Sube un PDF (opcional)", file_types=[".pdf"]),
gr.Textbox(label="O introduce una URL directa", placeholder="https://arxiv.org/pdf/..."),
gr.CheckboxGroup(
choices=["md", "md + ocr", "html", "json"],
value=["md"],
label="Formatos de salida (elige al menos uno)"
)
],
outputs=[
gr.Textbox(label="Estado del procesamiento", lines=12),
gr.File(label="Descargar Markdown (.md)", visible=False),
gr.File(label="Descargar ZIP completo", visible=False)
],
title="Marker PDF",
description="Convierte artículos científicos en Markdown, HTML o JSON. Reintenta automáticamente con OCR si es necesario.",
flagging_mode="never"
)
if __name__ == "__main__":
demo.launch()