Gabriel Ramos
feat: Docling Document Processor - Gradio + ZeroGPU
780413d
raw
history blame
14.5 kB
"""
Docling Document Processor - Aplicação Principal.
Este é o ponto de entrada da aplicação Gradio que permite
o upload e processamento de documentos usando Docling.
Recursos:
- Upload múltiplo (1-5 arquivos)
- Formatos: PDF, DOC, DOCX
- Saída: JSON, Markdown ou ambos (ZIP)
- Aceleração GPU via ZeroGPU
"""
import os
import sys
import time
import traceback
from collections import defaultdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
import gradio as gr
# Importação condicional do spaces para ZeroGPU
try:
import spaces
HAS_SPACES = True
except ImportError:
HAS_SPACES = False
# Adiciona o diretório atual ao path para imports locais
sys.path.insert(0, str(Path(__file__).parent))
import config
from utils.validators import validate_files, ValidationError
from utils.file_handler import (
create_temp_directory,
cleanup_old_files,
create_zip_output,
save_output_file,
)
from utils.logger import setup_logger, get_logger
from processors.docling_processor import DoclingProcessor
from processors.json_formatter import format_to_json, JSONFormatter
from processors.markdown_formatter import format_to_markdown, MarkdownFormatter
# Configura logger
logger = setup_logger("docling_space")
# =============================================================================
# RATE LIMITING (in-memory)
# =============================================================================
# Armazena requisições por IP: {ip: [timestamps]}
_rate_limit_store: dict[str, list[datetime]] = defaultdict(list)
def check_rate_limit(request: gr.Request) -> bool:
"""
Verifica se o IP excedeu o limite de requisições.
Args:
request: Objeto de request do Gradio.
Returns:
True se está dentro do limite, False se excedeu.
"""
if request is None:
return True
# Obtém IP do cliente
client_ip = getattr(request, "client", {})
if isinstance(client_ip, dict):
ip = client_ip.get("host", "unknown")
else:
ip = str(client_ip)
now = datetime.now()
window_start = now - timedelta(hours=config.RATE_LIMIT_WINDOW_HOURS)
# Limpa requisições antigas
_rate_limit_store[ip] = [
ts for ts in _rate_limit_store[ip]
if ts > window_start
]
# Verifica limite
if len(_rate_limit_store[ip]) >= config.RATE_LIMIT_REQUESTS:
logger.warning(f"Rate limit excedido para IP: {ip}")
return False
# Registra nova requisição
_rate_limit_store[ip].append(now)
return True
# =============================================================================
# FUNÇÃO DE PROCESSAMENTO PRINCIPAL
# =============================================================================
def _process_documents_internal(
files: list,
output_format: str,
progress: Optional[gr.Progress] = None
) -> tuple[str | list[str], str]:
"""
Função interna de processamento (sem decorator GPU).
Args:
files: Lista de arquivos enviados.
output_format: Formato de saída ("JSON", "Markdown", "Ambos").
progress: Objeto de progresso do Gradio.
Returns:
Tupla (caminho(s) do arquivo de saída, mensagem de status).
"""
start_time = time.time()
# Limpa arquivos temporários antigos
cleanup_old_files()
# Valida arquivos
if progress:
progress(0.1, desc="Validando arquivos...")
try:
validated_files = validate_files(files)
except ValidationError as e:
logger.warning(f"Erro de validação: {e.message}")
raise gr.Error(e.message)
# Prepara processador
if progress:
progress(0.2, desc="Inicializando Docling...")
processor = DoclingProcessor(
enable_ocr=True,
enable_table_detection=True,
use_gpu=HAS_SPACES
)
# Cria diretório de saída
output_dir = create_temp_directory(prefix="output_")
output_files = []
processed_count = 0
total_files = len(validated_files)
# Processa cada arquivo
for i, (file_path, sanitized_name) in enumerate(validated_files):
progress_pct = 0.2 + (0.6 * (i / total_files))
if progress:
progress(progress_pct, desc=f"Processando {sanitized_name}...")
try:
# Processa documento
processed_data = processor.process_document(file_path)
# Gera nome base sem extensão
base_name = Path(sanitized_name).stem
# Formata saída
if output_format == "JSON":
json_content = format_to_json(processed_data, sanitized_name)
json_path = save_output_file(
json_content,
f"{base_name}.json",
output_dir
)
output_files.append((json_path, f"{base_name}.json"))
elif output_format == "Markdown":
md_content = format_to_markdown(processed_data)
md_path = save_output_file(
md_content,
f"{base_name}.md",
output_dir
)
output_files.append((md_path, f"{base_name}.md"))
else: # Ambos
json_content = format_to_json(processed_data, sanitized_name)
md_content = format_to_markdown(processed_data)
json_path = save_output_file(
json_content,
f"{base_name}.json",
output_dir
)
md_path = save_output_file(
md_content,
f"{base_name}.md",
output_dir
)
output_files.append((json_path, f"{base_name}.json"))
output_files.append((md_path, f"{base_name}.md"))
processed_count += 1
logger.info(f"Processado: {sanitized_name}")
except Exception as e:
logger.error(f"Erro ao processar {sanitized_name}: {e}")
logger.debug(traceback.format_exc())
# Continua com próximos arquivos
if total_files == 1:
raise gr.Error(
f"❌ Erro ao processar {sanitized_name}: {str(e)}"
)
# Prepara saída final
if progress:
progress(0.9, desc="Preparando download...")
if not output_files:
raise gr.Error("❌ Nenhum arquivo foi processado com sucesso.")
# Se há múltiplos arquivos ou formato "Ambos", cria ZIP
if len(output_files) > 1 or output_format == "Ambos":
zip_path = create_zip_output(
output_files,
output_name="documentos_processados"
)
final_output = str(zip_path)
else:
final_output = str(output_files[0][0])
# Calcula tempo total
elapsed_time = time.time() - start_time
if progress:
progress(1.0, desc="Concluído!")
# Mensagem de status
status_msg = (
f"✅ Processamento concluído!\n\n"
f"📄 **Arquivos processados:** {processed_count}/{total_files}\n"
f"📦 **Formato:** {output_format}\n"
f"⏱️ **Tempo:** {elapsed_time:.1f} segundos"
)
logger.info(
f"Batch concluído: {processed_count}/{total_files} arquivos, "
f"{elapsed_time:.1f}s, formato={output_format}"
)
return final_output, status_msg
# Versão com GPU (se disponível)
if HAS_SPACES:
@spaces.GPU(duration=config.GPU_TIMEOUT_SECONDS)
def process_documents_gpu(
files: list,
output_format: str,
progress: gr.Progress = gr.Progress()
) -> tuple[str | list[str], str]:
"""Processamento com aceleração GPU via ZeroGPU."""
return _process_documents_internal(files, output_format, progress)
else:
process_documents_gpu = None
def process_documents(
files: list,
output_format: str,
request: gr.Request,
progress: gr.Progress = gr.Progress()
) -> tuple[str | list[str], str]:
"""
Função principal de processamento.
Usa GPU se disponível, senão fallback para CPU.
Args:
files: Lista de arquivos enviados.
output_format: Formato de saída.
request: Request do Gradio (para rate limiting).
progress: Objeto de progresso.
Returns:
Tupla (caminho do arquivo de saída, mensagem de status).
"""
# Verifica rate limit
if not check_rate_limit(request):
raise gr.Error(
f"⚠️ Limite de requisições excedido. "
f"Máximo: {config.RATE_LIMIT_REQUESTS} por hora. "
f"Tente novamente mais tarde."
)
try:
# Tenta usar GPU
if HAS_SPACES and process_documents_gpu is not None:
logger.info("Usando processamento GPU (ZeroGPU)")
return process_documents_gpu(files, output_format, progress)
else:
logger.info("Usando processamento CPU (fallback)")
return _process_documents_internal(files, output_format, progress)
except gr.Error:
# Re-raise erros do Gradio
raise
except TimeoutError:
logger.error("Timeout no processamento")
raise gr.Error(
"⏱️ Tempo limite excedido. Tente com arquivos menores ou menos arquivos."
)
except MemoryError:
logger.error("Memória insuficiente")
raise gr.Error(
"💾 Memória insuficiente. Tente com arquivos menores."
)
except Exception as e:
logger.error(f"Erro inesperado: {e}")
logger.debug(traceback.format_exc())
raise gr.Error(f"❌ Erro inesperado: {str(e)}")
# =============================================================================
# INTERFACE GRADIO
# =============================================================================
# CSS customizado
CUSTOM_CSS = """
.main-container {
max-width: 900px;
margin: 0 auto;
}
.upload-box {
border: 2px dashed #4a90a4;
border-radius: 12px;
padding: 20px;
background: linear-gradient(135deg, #f8f9fa 0%, #e9ecef 100%);
}
.status-box {
background: #f0f7f4;
border-radius: 8px;
padding: 15px;
margin-top: 10px;
}
.info-text {
font-size: 0.9em;
color: #666;
}
"""
# Texto de descrição
DESCRIPTION = """
# 📄 Docling Document Processor
Converta documentos PDF, DOC e DOCX em formatos estruturados usando IA.
## Recursos
- 🔍 **Extração inteligente** de texto, tabelas e metadados
- **Detecção automática** de idioma
- 🚀 **Aceleração GPU** para processamento rápido
- 📊 **Preserva estrutura** hierárquica do documento
"""
INSTRUCTIONS = """
### Como usar
1. **Upload**: Arraste ou selecione seus arquivos (máx. 5 arquivos, 50MB cada)
2. **Formato**: Escolha o formato de saída desejado
3. **Processar**: Clique no botão e aguarde
4. **Download**: Baixe o resultado quando concluído
### Formatos suportados
- **Entrada**: PDF, DOC, DOCX
- **Saída**: JSON, Markdown ou ambos (ZIP)
"""
def create_interface() -> gr.Blocks:
"""Cria e retorna a interface Gradio."""
with gr.Blocks(
title="Docling Document Processor",
theme=gr.themes.Soft(
primary_hue="teal",
secondary_hue="blue",
),
css=CUSTOM_CSS,
) as demo:
# Header
gr.Markdown(DESCRIPTION)
with gr.Row():
# Coluna principal
with gr.Column(scale=2):
# Upload de arquivos
file_input = gr.File(
file_count="multiple",
file_types=[".pdf", ".doc", ".docx"],
label="📁 Upload de Documentos",
elem_classes=["upload-box"],
)
# Seletor de formato
format_selector = gr.Radio(
choices=config.OUTPUT_FORMATS,
value="Markdown",
label="📤 Formato de Saída",
info="Escolha como deseja receber o documento processado",
)
# Botão de processar
process_btn = gr.Button(
"🚀 Processar Documentos",
variant="primary",
size="lg",
)
# Coluna de informações
with gr.Column(scale=1):
gr.Markdown(INSTRUCTIONS)
# Área de resultados
with gr.Row():
with gr.Column():
# Status
status_output = gr.Markdown(
label="Status",
elem_classes=["status-box"],
)
# Arquivo de saída
file_output = gr.File(
label="📥 Download do Resultado",
interactive=False,
)
# Informações de limites
gr.Markdown(
f"""
---
**Limites:** {config.MAX_FILES_PER_SESSION} arquivos por vez |
{config.MAX_FILE_SIZE_MB}MB por arquivo |
{config.RATE_LIMIT_REQUESTS} requisições/hora
""",
elem_classes=["info-text"],
)
# Evento de processamento
process_btn.click(
fn=process_documents,
inputs=[file_input, format_selector],
outputs=[file_output, status_output],
show_progress="full",
)
# Limpa status quando novos arquivos são selecionados
file_input.change(
fn=lambda: ("", None),
outputs=[status_output, file_output],
)
return demo
# =============================================================================
# PONTO DE ENTRADA
# =============================================================================
if __name__ == "__main__":
# Cria diretórios necessários
config.TEMP_DIR.mkdir(parents=True, exist_ok=True)
config.LOGS_DIR.mkdir(parents=True, exist_ok=True)
# Limpa arquivos temporários antigos
cleanup_old_files()
logger.info("Iniciando Docling Document Processor...")
logger.info(f"ZeroGPU disponível: {HAS_SPACES}")
# Cria e lança a interface
demo = create_interface()
demo.queue().launch(
server_name="0.0.0.0",
server_port=7860,
max_file_size=f"{config.MAX_FILE_SIZE_MB}mb",
show_error=True,
)