import os import shutil import subprocess import requests import gradio as gr from urllib.parse import urlparse import time import re def sanear_nombre(nombre): nombre_base = os.path.splitext(nombre)[0] return re.sub(r"[^\w\-_.]", "_", nombre_base) def obtener_ruta_segura(base_dir, nombre_archivo): nombre_base = sanear_nombre(nombre_archivo) ruta_final = os.path.join(base_dir, nombre_base) contador = 1 while os.path.exists(ruta_final + ".pdf"): ruta_final = os.path.join(base_dir, f"{nombre_base}_{contador}") contador += 1 return ruta_final + ".pdf" def ejecutar_marker(cmd): result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=900) return result def procesar_pdf(pdf_file=None, url_pdf=None, formatos=[], verbose="auto"): output_base = "./marker_output" os.makedirs(output_base, exist_ok=True) estado = "" try: subprocess.run(["marker_single", "--help"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) except FileNotFoundError: yield "Error: el comando 'marker_single' no está disponible.\nVerifica que marker-pdf esté en requirements.txt", None, None return if not formatos: yield "Debes seleccionar al menos un formato de salida.", None, None return try: if url_pdf: parsed = urlparse(url_pdf) nombre_pdf = parsed.path.split("/")[-1] or "documento" ruta_pdf = obtener_ruta_segura(output_base, nombre_pdf) response = requests.get(url_pdf) if response.status_code != 200: yield "No se pudo descargar el PDF desde la URL proporcionada.", None, None return with open(ruta_pdf, "wb") as f: f.write(response.content) elif pdf_file: nombre_pdf = pdf_file.name or "documento" ruta_pdf = obtener_ruta_segura(output_base, nombre_pdf) shutil.copyfile(pdf_file.name, ruta_pdf) else: yield "No se proporcionó ni archivo ni URL.", None, None return with open(ruta_pdf, "rb") as f: if f.read(4) != b"%PDF": yield "El archivo proporcionado no es un PDF válido.", None, None return if os.path.getsize(ruta_pdf) > 3 * 1024 * 1024: estado += "Aviso: el PDF es grande y puede tardar más de lo normal.\n" nombre_sin_ext = os.path.splitext(os.path.basename(ruta_pdf))[0] carpeta_salida = os.path.join(output_base, nombre_sin_ext) if os.path.exists(carpeta_salida): shutil.rmtree(carpeta_salida) os.makedirs(carpeta_salida, exist_ok=True) for fmt in formatos: estado += f"\nProcesando formato '{fmt}'..." yield estado, None, None # Primer intento sin OCR (si aplica) if fmt == "md": cmd = ["marker_single", ruta_pdf, "--output_format", "markdown", "--disable_ocr", "--output_dir", carpeta_salida] elif fmt == "md + ocr": cmd = ["marker_single", ruta_pdf, "--output_format", "markdown", "--output_dir", carpeta_salida] else: cmd = ["marker_single", ruta_pdf, "--output_format", fmt, "--output_dir", carpeta_salida] result = ejecutar_marker(cmd) md_path = os.path.join(carpeta_salida, f"{nombre_sin_ext}.md") if fmt == "md" and not os.path.exists(md_path): estado += "\nNo se generó el archivo Markdown en el primer intento." estado += f"\n\n---\nSalida del comando marker_single:\n{result.stderr.strip()}\n" estado += "\nReintentando con OCR..." yield estado, None, None start_time = time.time() cmd = ["marker_single", ruta_pdf, "--output_format", "markdown", "--output_dir", carpeta_salida] result = ejecutar_marker(cmd) if not os.path.exists(md_path): estado += "\nEl reintento con OCR también falló." estado += f"\n\n---\nSalida del reintento:\n{result.stderr.strip()}" yield estado, None, None return elif result.returncode != 0: estado += f"\n\nError ejecutando marker_single:\n{result.stderr.strip()}" yield estado, None, None return if not os.path.exists(md_path): estado += "\nNo se generó el archivo Markdown. Puede que el PDF no contenga texto reconocible." yield estado, None, None return with open(md_path, "r", encoding="utf-8") as f: md_content = f.read() palabras = len(md_content.split()) caracteres = len(md_content) figuras = len([f for f in os.listdir(carpeta_salida) if f.lower().endswith(".jpeg")]) tablas = len(os.listdir(os.path.join(carpeta_salida, "tables"))) if os.path.exists(os.path.join(carpeta_salida, "tables")) else 0 tamaño_kb = os.path.getsize(md_path) / 1024 tiempo_total = round(time.time() - start_time, 1) resumen = f"\n\n---\nResumen del procesamiento:\n" resumen += f"- Palabras: {palabras}\n" resumen += f"- Caracteres: {caracteres}\n" resumen += f"- Tamaño Markdown: {tamaño_kb:.2f} KB\n" resumen += f"- Imágenes extraídas: {figuras}\n" resumen += f"- Tablas detectadas: {tablas}\n" resumen += f"- Tiempo total: {tiempo_total} s" zip_path = os.path.join(output_base, f"{nombre_sin_ext}.zip") shutil.make_archive(base_name=zip_path.replace(".zip", ""), format="zip", root_dir=carpeta_salida) estado += "\n\nProcesamiento finalizado correctamente." yield estado + resumen, gr.update(value=md_path, visible=True), gr.update(value=zip_path, visible=True) except Exception as e: yield f"Error general inesperado: {str(e)}", None, None demo = gr.Interface( fn=procesar_pdf, inputs=[ gr.File(label="Sube un PDF (opcional)", file_types=[".pdf"]), gr.Textbox(label="O introduce una URL directa", placeholder="https://arxiv.org/pdf/..."), gr.CheckboxGroup( choices=["md", "md + ocr", "html", "json"], value=["md"], label="Formatos de salida (elige al menos uno)" ) ], outputs=[ gr.Textbox(label="Estado del procesamiento", lines=12), gr.File(label="Descargar Markdown (.md)", visible=False), gr.File(label="Descargar ZIP completo", visible=False) ], title="Marker PDF", description="Convierte artículos científicos en Markdown, HTML o JSON. Reintenta automáticamente con OCR si es necesario.", flagging_mode="never" ) if __name__ == "__main__": demo.launch()