Spaces:
Runtime error
Runtime error
| """ | |
| Manipulação de arquivos temporários e outputs. | |
| Este módulo contém funções para gerenciar arquivos temporários, | |
| criar outputs em diferentes formatos e limpar arquivos antigos. | |
| """ | |
| import os | |
| import shutil | |
| import tempfile | |
| import time | |
| import zipfile | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
| from typing import Optional | |
| import config | |
| from utils.logger import get_logger | |
| # Logger para este módulo | |
| logger = get_logger(__name__) | |
| def create_temp_directory(prefix: str = "docling_") -> Path: | |
| """ | |
| Cria um diretório temporário isolado para processamento. | |
| Args: | |
| prefix: Prefixo para o nome do diretório. | |
| Returns: | |
| Path para o diretório temporário criado. | |
| """ | |
| # Garante que o diretório base existe | |
| config.TEMP_DIR.mkdir(parents=True, exist_ok=True) | |
| # Cria subdiretório com timestamp | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") | |
| temp_path = config.TEMP_DIR / f"{prefix}{timestamp}" | |
| temp_path.mkdir(parents=True, exist_ok=True) | |
| logger.debug(f"Diretório temporário criado: {temp_path}") | |
| return temp_path | |
| def get_temp_file_path( | |
| filename: str, | |
| temp_dir: Optional[Path] = None | |
| ) -> Path: | |
| """ | |
| Retorna um caminho de arquivo temporário sanitizado. | |
| Args: | |
| filename: Nome do arquivo (será sanitizado se necessário). | |
| temp_dir: Diretório temporário opcional. Se não fornecido, cria um novo. | |
| Returns: | |
| Path completo para o arquivo temporário. | |
| """ | |
| from utils.validators import sanitize_filename | |
| if temp_dir is None: | |
| temp_dir = create_temp_directory() | |
| safe_filename = sanitize_filename(filename) | |
| return temp_dir / safe_filename | |
| def cleanup_old_files( | |
| max_age_hours: Optional[int] = None, | |
| target_dir: Optional[Path] = None | |
| ) -> int: | |
| """ | |
| Remove arquivos temporários mais antigos que o limite especificado. | |
| Args: | |
| max_age_hours: Idade máxima em horas. Se não especificado, usa config. | |
| target_dir: Diretório a limpar. Se não especificado, usa TEMP_DIR. | |
| Returns: | |
| Número de arquivos/diretórios removidos. | |
| """ | |
| if max_age_hours is None: | |
| max_age_hours = config.TEMP_DIR_CLEANUP_HOURS | |
| if target_dir is None: | |
| target_dir = config.TEMP_DIR | |
| if not target_dir.exists(): | |
| return 0 | |
| cutoff_time = time.time() - (max_age_hours * 3600) | |
| removed_count = 0 | |
| try: | |
| for item in target_dir.iterdir(): | |
| try: | |
| item_stat = item.stat() | |
| # Usa tempo de modificação | |
| if item_stat.st_mtime < cutoff_time: | |
| if item.is_dir(): | |
| shutil.rmtree(item) | |
| logger.info(f"Diretório removido: {item}") | |
| else: | |
| item.unlink() | |
| logger.info(f"Arquivo removido: {item}") | |
| removed_count += 1 | |
| except PermissionError: | |
| logger.warning(f"Sem permissão para remover: {item}") | |
| except FileNotFoundError: | |
| # Já foi removido | |
| pass | |
| except Exception as e: | |
| logger.error(f"Erro ao remover {item}: {e}") | |
| except Exception as e: | |
| logger.error(f"Erro ao limpar diretório {target_dir}: {e}") | |
| if removed_count > 0: | |
| logger.info(f"Limpeza concluída: {removed_count} itens removidos") | |
| return removed_count | |
| def create_zip_output( | |
| files: list[tuple[Path, str]], | |
| output_name: str = "resultado" | |
| ) -> Path: | |
| """ | |
| Cria um arquivo ZIP contendo múltiplos arquivos de saída. | |
| Args: | |
| files: Lista de tuplas (caminho_arquivo, nome_no_zip). | |
| output_name: Nome base para o arquivo ZIP (sem extensão). | |
| Returns: | |
| Path para o arquivo ZIP criado. | |
| """ | |
| # Cria diretório temporário para o ZIP | |
| temp_dir = create_temp_directory(prefix="zip_") | |
| zip_path = temp_dir / f"{output_name}.zip" | |
| try: | |
| with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf: | |
| for file_path, archive_name in files: | |
| if file_path.exists(): | |
| zf.write(file_path, archive_name) | |
| logger.debug(f"Adicionado ao ZIP: {archive_name}") | |
| else: | |
| logger.warning(f"Arquivo não encontrado para ZIP: {file_path}") | |
| logger.info(f"ZIP criado: {zip_path} ({len(files)} arquivos)") | |
| return zip_path | |
| except Exception as e: | |
| logger.error(f"Erro ao criar ZIP: {e}") | |
| raise | |
| def copy_file_to_temp( | |
| source: Path | str, | |
| temp_dir: Optional[Path] = None | |
| ) -> Path: | |
| """ | |
| Copia um arquivo para o diretório temporário. | |
| Args: | |
| source: Caminho do arquivo fonte. | |
| temp_dir: Diretório de destino opcional. | |
| Returns: | |
| Path para o arquivo copiado. | |
| """ | |
| source = Path(source) | |
| if temp_dir is None: | |
| temp_dir = create_temp_directory() | |
| dest_path = temp_dir / source.name | |
| shutil.copy2(source, dest_path) | |
| logger.debug(f"Arquivo copiado: {source} -> {dest_path}") | |
| return dest_path | |
| def save_output_file( | |
| content: str | bytes, | |
| filename: str, | |
| temp_dir: Optional[Path] = None, | |
| encoding: str = "utf-8" | |
| ) -> Path: | |
| """ | |
| Salva conteúdo em um arquivo temporário. | |
| Args: | |
| content: Conteúdo a ser salvo (string ou bytes). | |
| filename: Nome do arquivo de saída. | |
| temp_dir: Diretório de destino opcional. | |
| encoding: Encoding para strings (padrão: utf-8). | |
| Returns: | |
| Path para o arquivo salvo. | |
| """ | |
| if temp_dir is None: | |
| temp_dir = create_temp_directory(prefix="output_") | |
| output_path = get_temp_file_path(filename, temp_dir) | |
| if isinstance(content, str): | |
| output_path.write_text(content, encoding=encoding) | |
| else: | |
| output_path.write_bytes(content) | |
| logger.debug(f"Arquivo salvo: {output_path}") | |
| return output_path | |
| def get_temp_dir_size() -> int: | |
| """ | |
| Calcula o tamanho total do diretório temporário. | |
| Returns: | |
| Tamanho em bytes. | |
| """ | |
| if not config.TEMP_DIR.exists(): | |
| return 0 | |
| total_size = 0 | |
| for item in config.TEMP_DIR.rglob("*"): | |
| if item.is_file(): | |
| try: | |
| total_size += item.stat().st_size | |
| except Exception: | |
| pass | |
| return total_size | |
| def format_size(size_bytes: int) -> str: | |
| """ | |
| Formata tamanho em bytes para string legível. | |
| Args: | |
| size_bytes: Tamanho em bytes. | |
| Returns: | |
| String formatada (ex: "1.5 MB"). | |
| """ | |
| for unit in ["B", "KB", "MB", "GB"]: | |
| if size_bytes < 1024: | |
| return f"{size_bytes:.1f} {unit}" | |
| size_bytes /= 1024 | |
| return f"{size_bytes:.1f} TB" | |