Spaces:
Runtime error
Runtime error
| import os | |
| import tarfile | |
| import shutil | |
| import hashlib | |
| from pathlib import Path | |
| from typing import Optional, List, Tuple | |
| from datetime import datetime | |
| import logging | |
| import json | |
| logger = logging.getLogger(__name__) | |
| class FilesManager: | |
| """Manager para gerenciamento de arquivos e compressão TAR.GZ.""" | |
| def __init__(self, base_path: Optional[str] = None): | |
| """ | |
| Inicializa o FilesManager. | |
| Args: | |
| base_path: Caminho base para armazenamento. Se None, usa variável de ambiente. | |
| """ | |
| self.base_path = Path(base_path or os.getenv('FILES_BASE_PATH', './data/files')) | |
| self.base_path.mkdir(parents=True, exist_ok=True) | |
| self.archives_path = self.base_path / 'archives' | |
| self.archives_path.mkdir(parents=True, exist_ok=True) | |
| self.temp_path = self.base_path / 'temp' | |
| self.temp_path.mkdir(parents=True, exist_ok=True) | |
| logger.info(f"FilesManager inicializado com base_path: {self.base_path}") | |
| def create_archive(self, source_dir: str, archive_name: str, compress: bool = True) -> Tuple[str, str]: | |
| """ | |
| Cria arquivo TAR.GZ a partir de um diretório. | |
| Args: | |
| source_dir: Caminho do diretório de origem. | |
| archive_name: Nome do arquivo a ser criado (sem extensão). | |
| compress: Se True, comprime com gzip. | |
| Returns: | |
| Tupla (caminho_arquivo, hash_sha256) | |
| """ | |
| try: | |
| source_path = Path(source_dir) | |
| if not source_path.exists(): | |
| raise FileNotFoundError(f"Diretório não encontrado: {source_dir}") | |
| ext = '.tar.gz' if compress else '.tar' | |
| archive_path = self.archives_path / f"{archive_name}{ext}" | |
| mode = 'w:gz' if compress else 'w' | |
| with tarfile.open(archive_path, mode) as tar: | |
| tar.add(source_path, arcname=source_path.name) | |
| file_hash = self._calculate_file_hash(str(archive_path)) | |
| logger.info(f"Arquivo criado: {archive_path} (hash: {file_hash})") | |
| return str(archive_path), file_hash | |
| except Exception as e: | |
| logger.error(f"Erro ao criar arquivo: {e}") | |
| raise | |
| def extract_archive(self, archive_path: str, extract_to: Optional[str] = None) -> str: | |
| """ | |
| Extrai arquivo TAR.GZ. | |
| Args: | |
| archive_path: Caminho do arquivo a extrair. | |
| extract_to: Caminho de destino. Se None, usa temp_path. | |
| Returns: | |
| Caminho do diretório extraÃdo. | |
| """ | |
| try: | |
| archive_file = Path(archive_path) | |
| if not archive_file.exists(): | |
| raise FileNotFoundError(f"Arquivo não encontrado: {archive_path}") | |
| extract_dir = Path(extract_to) if extract_to else self.temp_path / archive_file.stem | |
| extract_dir.mkdir(parents=True, exist_ok=True) | |
| with tarfile.open(archive_file, 'r:gz' if archive_file.suffix == '.gz' else 'r') as tar: | |
| tar.extractall(path=extract_dir) | |
| logger.info(f"Arquivo extraÃdo: {archive_file} -> {extract_dir}") | |
| return str(extract_dir) | |
| except Exception as e: | |
| logger.error(f"Erro ao extrair arquivo: {e}") | |
| raise | |
| def delete_archive(self, archive_path: str) -> bool: | |
| """ | |
| Deleta um arquivo de arquivo. | |
| Args: | |
| archive_path: Caminho do arquivo a deletar. | |
| Returns: | |
| True se deletado com sucesso. | |
| """ | |
| try: | |
| file_path = Path(archive_path) | |
| if file_path.exists(): | |
| file_path.unlink() | |
| logger.info(f"Arquivo deletado: {archive_path}") | |
| return True | |
| return False | |
| except Exception as e: | |
| logger.error(f"Erro ao deletar arquivo: {e}") | |
| raise | |
| def list_archives(self) -> List[dict]: | |
| """ | |
| Lista todos os arquivos armazenados. | |
| Returns: | |
| Lista de dicts com informações dos arquivos. | |
| """ | |
| try: | |
| archives = [] | |
| for archive_file in self.archives_path.glob('*'): | |
| if archive_file.is_file(): | |
| stat = archive_file.stat() | |
| archives.append({ | |
| 'nome': archive_file.name, | |
| 'tamanho_bytes': stat.st_size, | |
| 'data_criacao': datetime.fromtimestamp(stat.st_ctime).isoformat(), | |
| 'caminho': str(archive_file) | |
| }) | |
| logger.info(f"Listados {len(archives)} arquivos") | |
| return archives | |
| except Exception as e: | |
| logger.error(f"Erro ao listar arquivos: {e}") | |
| raise | |
| def get_archive_info(self, archive_path: str) -> dict: | |
| """ | |
| Retorna informações sobre um arquivo. | |
| Args: | |
| archive_path: Caminho do arquivo. | |
| Returns: | |
| Dict com informações do arquivo. | |
| """ | |
| try: | |
| file_path = Path(archive_path) | |
| if not file_path.exists(): | |
| raise FileNotFoundError(f"Arquivo não encontrado: {archive_path}") | |
| stat = file_path.stat() | |
| file_hash = self._calculate_file_hash(archive_path) | |
| info = { | |
| 'nome': file_path.name, | |
| 'tamanho_bytes': stat.st_size, | |
| 'data_criacao': datetime.fromtimestamp(stat.st_ctime).isoformat(), | |
| 'data_modificacao': datetime.fromtimestamp(stat.st_mtime).isoformat(), | |
| 'hash_sha256': file_hash, | |
| 'tipo': 'tar.gz' if file_path.suffix == '.gz' else 'tar' | |
| } | |
| # Listar conteúdo do arquivo | |
| try: | |
| with tarfile.open(archive_path, 'r:gz' if file_path.suffix == '.gz' else 'r') as tar: | |
| info['arquivo_conteudo'] = [m.name for m in tar.getmembers()] | |
| info['total_arquivos'] = len(tar.getmembers()) | |
| except: | |
| pass | |
| return info | |
| except Exception as e: | |
| logger.error(f"Erro ao obter info do arquivo: {e}") | |
| raise | |
| def _calculate_file_hash(self, file_path: str) -> str: | |
| """ | |
| Calcula hash SHA256 de um arquivo. | |
| Args: | |
| file_path: Caminho do arquivo. | |
| Returns: | |
| Hash SHA256 em hex. | |
| """ | |
| sha256_hash = hashlib.sha256() | |
| with open(file_path, 'rb') as f: | |
| for byte_block in iter(lambda: f.read(4096), b''): | |
| sha256_hash.update(byte_block) | |
| return sha256_hash.hexdigest() | |
| def cleanup_temp(self) -> int: | |
| """ | |
| Remove arquivos temporários antigos. | |
| Returns: | |
| Número de arquivos removidos. | |
| """ | |
| try: | |
| removed_count = 0 | |
| if self.temp_path.exists(): | |
| shutil.rmtree(self.temp_path) | |
| self.temp_path.mkdir(parents=True, exist_ok=True) | |
| removed_count = 1 | |
| logger.info(f"Limpeza de temp concluÃda: {removed_count} item(ns) removido(s)") | |
| return removed_count | |
| except Exception as e: | |
| logger.error(f"Erro ao limpar temp: {e}") | |
| raise | |
| def backup_archive(self, archive_path: str, backup_path: Optional[str] = None) -> str: | |
| """ | |
| Cria backup de um arquivo. | |
| Args: | |
| archive_path: Caminho do arquivo a fazer backup. | |
| backup_path: Caminho de destino. Se None, cria em backup/. | |
| Returns: | |
| Caminho do arquivo de backup. | |
| """ | |
| try: | |
| source = Path(archive_path) | |
| if not source.exists(): | |
| raise FileNotFoundError(f"Arquivo não encontrado: {archive_path}") | |
| backup_dir = Path(backup_path) if backup_path else self.base_path / 'backups' | |
| backup_dir.mkdir(parents=True, exist_ok=True) | |
| timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') | |
| backup_file = backup_dir / f"{source.stem}_{timestamp}{source.suffix}" | |
| shutil.copy2(source, backup_file) | |
| logger.info(f"Backup criado: {backup_file}") | |
| return str(backup_file) | |
| except Exception as e: | |
| logger.error(f"Erro ao fazer backup: {e}") | |
| raise | |
| def verify_integrity(self, archive_path: str, expected_hash: Optional[str] = None) -> bool: | |
| """ | |
| Verifica integridade de um arquivo TAR.GZ. | |
| Args: | |
| archive_path: Caminho do arquivo. | |
| expected_hash: Hash esperado (opcional). | |
| Returns: | |
| True se Ãntegro, False caso contrário. | |
| """ | |
| try: | |
| file_path = Path(archive_path) | |
| if not file_path.exists(): | |
| logger.warning(f"Arquivo não encontrado: {archive_path}") | |
| return False | |
| # Verificar se é um TAR válido | |
| try: | |
| with tarfile.open(archive_path, 'r:gz' if file_path.suffix == '.gz' else 'r') as tar: | |
| tar.getmembers() | |
| except tarfile.TarError as e: | |
| logger.warning(f"TAR inválido: {e}") | |
| return False | |
| # Verificar hash se fornecido | |
| if expected_hash: | |
| actual_hash = self._calculate_file_hash(archive_path) | |
| if actual_hash != expected_hash: | |
| logger.warning(f"Hash mismatch: {actual_hash} != {expected_hash}") | |
| return False | |
| logger.info(f"Integridade verificada: {archive_path}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Erro ao verificar integridade: {e}") | |
| return False | |
| # Instância global | |
| _files_manager: Optional[FilesManager] = None | |
| def get_files_manager() -> FilesManager: | |
| """Retorna a instância global do FilesManager.""" | |
| global _files_manager | |
| if _files_manager is None: | |
| _files_manager = FilesManager() | |
| return _files_manager | |
| def init_files_manager(base_path: Optional[str] = None) -> FilesManager: | |
| """Inicializa o FilesManager global.""" | |
| global _files_manager | |
| _files_manager = FilesManager(base_path) | |
| return _files_manager |