""" Docling Document Processor - Aplicação Principal. Este é o ponto de entrada da aplicação Gradio que permite o upload e processamento de documentos usando Docling. Recursos: - Upload múltiplo (1-5 arquivos) - Formatos: PDF, DOC, DOCX - Saída: JSON, Markdown ou ambos (ZIP) - Aceleração GPU via ZeroGPU """ import os import sys import time import traceback from collections import defaultdict from datetime import datetime, timedelta from pathlib import Path from typing import Optional import gradio as gr # Importação condicional do spaces para ZeroGPU try: import spaces HAS_SPACES = True except ImportError: HAS_SPACES = False # Adiciona o diretório atual ao path para imports locais sys.path.insert(0, str(Path(__file__).parent)) import config from utils.validators import validate_files, ValidationError from utils.file_handler import ( create_temp_directory, cleanup_old_files, create_zip_output, save_output_file, ) from utils.logger import setup_logger, get_logger from processors.docling_processor import DoclingProcessor from processors.json_formatter import format_to_json, JSONFormatter from processors.markdown_formatter import format_to_markdown, MarkdownFormatter # Configura logger logger = setup_logger("docling_space") # ============================================================================= # RATE LIMITING (in-memory) # ============================================================================= # Armazena requisições por IP: {ip: [timestamps]} _rate_limit_store: dict[str, list[datetime]] = defaultdict(list) def check_rate_limit(request: gr.Request) -> bool: """ Verifica se o IP excedeu o limite de requisições. Args: request: Objeto de request do Gradio. Returns: True se está dentro do limite, False se excedeu. """ if request is None: return True # Obtém IP do cliente client_ip = getattr(request, "client", {}) if isinstance(client_ip, dict): ip = client_ip.get("host", "unknown") else: ip = str(client_ip) now = datetime.now() window_start = now - timedelta(hours=config.RATE_LIMIT_WINDOW_HOURS) # Limpa requisições antigas _rate_limit_store[ip] = [ ts for ts in _rate_limit_store[ip] if ts > window_start ] # Verifica limite if len(_rate_limit_store[ip]) >= config.RATE_LIMIT_REQUESTS: logger.warning(f"Rate limit excedido para IP: {ip}") return False # Registra nova requisição _rate_limit_store[ip].append(now) return True # ============================================================================= # FUNÇÃO DE PROCESSAMENTO PRINCIPAL # ============================================================================= def _process_documents_internal( files: list, output_format: str, progress: Optional[gr.Progress] = None ) -> tuple[str | list[str], str]: """ Função interna de processamento (sem decorator GPU). Args: files: Lista de arquivos enviados. output_format: Formato de saída ("JSON", "Markdown", "Ambos"). progress: Objeto de progresso do Gradio. Returns: Tupla (caminho(s) do arquivo de saída, mensagem de status). """ start_time = time.time() # Limpa arquivos temporários antigos cleanup_old_files() # Valida arquivos if progress: progress(0.1, desc="Validando arquivos...") try: validated_files = validate_files(files) except ValidationError as e: logger.warning(f"Erro de validação: {e.message}") raise gr.Error(e.message) # Prepara processador if progress: progress(0.2, desc="Inicializando Docling...") processor = DoclingProcessor( enable_ocr=True, enable_table_detection=True, use_gpu=HAS_SPACES ) # Cria diretório de saída output_dir = create_temp_directory(prefix="output_") output_files = [] processed_count = 0 total_files = len(validated_files) # Processa cada arquivo for i, (file_path, sanitized_name) in enumerate(validated_files): progress_pct = 0.2 + (0.6 * (i / total_files)) if progress: progress(progress_pct, desc=f"Processando {sanitized_name}...") try: # Processa documento processed_data = processor.process_document(file_path) # Gera nome base sem extensão base_name = Path(sanitized_name).stem # Formata saída if output_format == "JSON": json_content = format_to_json(processed_data, sanitized_name) json_path = save_output_file( json_content, f"{base_name}.json", output_dir ) output_files.append((json_path, f"{base_name}.json")) elif output_format == "Markdown": md_content = format_to_markdown(processed_data) md_path = save_output_file( md_content, f"{base_name}.md", output_dir ) output_files.append((md_path, f"{base_name}.md")) else: # Ambos json_content = format_to_json(processed_data, sanitized_name) md_content = format_to_markdown(processed_data) json_path = save_output_file( json_content, f"{base_name}.json", output_dir ) md_path = save_output_file( md_content, f"{base_name}.md", output_dir ) output_files.append((json_path, f"{base_name}.json")) output_files.append((md_path, f"{base_name}.md")) processed_count += 1 logger.info(f"Processado: {sanitized_name}") except Exception as e: logger.error(f"Erro ao processar {sanitized_name}: {e}") logger.debug(traceback.format_exc()) # Continua com próximos arquivos if total_files == 1: raise gr.Error( f"❌ Erro ao processar {sanitized_name}: {str(e)}" ) # Prepara saída final if progress: progress(0.9, desc="Preparando download...") if not output_files: raise gr.Error("❌ Nenhum arquivo foi processado com sucesso.") # Se há múltiplos arquivos ou formato "Ambos", cria ZIP if len(output_files) > 1 or output_format == "Ambos": zip_path = create_zip_output( output_files, output_name="documentos_processados" ) final_output = str(zip_path) else: final_output = str(output_files[0][0]) # Calcula tempo total elapsed_time = time.time() - start_time if progress: progress(1.0, desc="Concluído!") # Mensagem de status status_msg = ( f"✅ Processamento concluído!\n\n" f"📄 **Arquivos processados:** {processed_count}/{total_files}\n" f"📦 **Formato:** {output_format}\n" f"⏱️ **Tempo:** {elapsed_time:.1f} segundos" ) logger.info( f"Batch concluído: {processed_count}/{total_files} arquivos, " f"{elapsed_time:.1f}s, formato={output_format}" ) return final_output, status_msg # Versão com GPU (se disponível) if HAS_SPACES: @spaces.GPU(duration=config.GPU_TIMEOUT_SECONDS) def process_documents_gpu( files: list, output_format: str, progress: gr.Progress = gr.Progress() ) -> tuple[str | list[str], str]: """Processamento com aceleração GPU via ZeroGPU.""" return _process_documents_internal(files, output_format, progress) else: process_documents_gpu = None def process_documents( files: list, output_format: str, request: gr.Request, progress: gr.Progress = gr.Progress() ) -> tuple[str | list[str], str]: """ Função principal de processamento. Usa GPU se disponível, senão fallback para CPU. Args: files: Lista de arquivos enviados. output_format: Formato de saída. request: Request do Gradio (para rate limiting). progress: Objeto de progresso. Returns: Tupla (caminho do arquivo de saída, mensagem de status). """ # Verifica rate limit if not check_rate_limit(request): raise gr.Error( f"⚠️ Limite de requisições excedido. " f"Máximo: {config.RATE_LIMIT_REQUESTS} por hora. " f"Tente novamente mais tarde." ) try: # Tenta usar GPU if HAS_SPACES and process_documents_gpu is not None: logger.info("Usando processamento GPU (ZeroGPU)") return process_documents_gpu(files, output_format, progress) else: logger.info("Usando processamento CPU (fallback)") return _process_documents_internal(files, output_format, progress) except gr.Error: # Re-raise erros do Gradio raise except TimeoutError: logger.error("Timeout no processamento") raise gr.Error( "⏱️ Tempo limite excedido. Tente com arquivos menores ou menos arquivos." ) except MemoryError: logger.error("Memória insuficiente") raise gr.Error( "💾 Memória insuficiente. Tente com arquivos menores." ) except Exception as e: logger.error(f"Erro inesperado: {e}") logger.debug(traceback.format_exc()) raise gr.Error(f"❌ Erro inesperado: {str(e)}") # ============================================================================= # INTERFACE GRADIO # ============================================================================= # CSS customizado CUSTOM_CSS = """ .main-container { max-width: 900px; margin: 0 auto; } .upload-box { border: 2px dashed #4a90a4; border-radius: 12px; padding: 20px; background: linear-gradient(135deg, #f8f9fa 0%, #e9ecef 100%); } .status-box { background: #f0f7f4; border-radius: 8px; padding: 15px; margin-top: 10px; } .info-text { font-size: 0.9em; color: #666; } """ # Texto de descrição DESCRIPTION = """ # 📄 Docling Document Processor Converta documentos PDF, DOC e DOCX em formatos estruturados usando IA. ## Recursos - 🔍 **Extração inteligente** de texto, tabelas e metadados - **Detecção automática** de idioma - 🚀 **Aceleração GPU** para processamento rápido - 📊 **Preserva estrutura** hierárquica do documento """ INSTRUCTIONS = """ ### Como usar 1. **Upload**: Arraste ou selecione seus arquivos (máx. 5 arquivos, 50MB cada) 2. **Formato**: Escolha o formato de saída desejado 3. **Processar**: Clique no botão e aguarde 4. **Download**: Baixe o resultado quando concluído ### Formatos suportados - **Entrada**: PDF, DOC, DOCX - **Saída**: JSON, Markdown ou ambos (ZIP) """ def create_interface() -> gr.Blocks: """Cria e retorna a interface Gradio.""" with gr.Blocks( title="Docling Document Processor", theme=gr.themes.Soft( primary_hue="teal", secondary_hue="blue", ), css=CUSTOM_CSS, ) as demo: # Header gr.Markdown(DESCRIPTION) with gr.Row(): # Coluna principal with gr.Column(scale=2): # Upload de arquivos file_input = gr.File( file_count="multiple", file_types=[".pdf", ".doc", ".docx"], label="📁 Upload de Documentos", elem_classes=["upload-box"], ) # Seletor de formato format_selector = gr.Radio( choices=config.OUTPUT_FORMATS, value="Markdown", label="📤 Formato de Saída", info="Escolha como deseja receber o documento processado", ) # Botão de processar process_btn = gr.Button( "🚀 Processar Documentos", variant="primary", size="lg", ) # Coluna de informações with gr.Column(scale=1): gr.Markdown(INSTRUCTIONS) # Área de resultados with gr.Row(): with gr.Column(): # Status status_output = gr.Markdown( label="Status", elem_classes=["status-box"], ) # Arquivo de saída file_output = gr.File( label="📥 Download do Resultado", interactive=False, ) # Informações de limites gr.Markdown( f""" --- **Limites:** {config.MAX_FILES_PER_SESSION} arquivos por vez | {config.MAX_FILE_SIZE_MB}MB por arquivo | {config.RATE_LIMIT_REQUESTS} requisições/hora """, elem_classes=["info-text"], ) # Evento de processamento process_btn.click( fn=process_documents, inputs=[file_input, format_selector], outputs=[file_output, status_output], show_progress="full", ) # Limpa status quando novos arquivos são selecionados file_input.change( fn=lambda: ("", None), outputs=[status_output, file_output], ) return demo # ============================================================================= # PONTO DE ENTRADA # ============================================================================= if __name__ == "__main__": # Cria diretórios necessários config.TEMP_DIR.mkdir(parents=True, exist_ok=True) config.LOGS_DIR.mkdir(parents=True, exist_ok=True) # Limpa arquivos temporários antigos cleanup_old_files() logger.info("Iniciando Docling Document Processor...") logger.info(f"ZeroGPU disponível: {HAS_SPACES}") # Cria e lança a interface demo = create_interface() demo.queue().launch( server_name="0.0.0.0", server_port=7860, max_file_size=f"{config.MAX_FILE_SIZE_MB}mb", show_error=True, )