import os import tarfile import shutil import hashlib from pathlib import Path from typing import Optional, List, Tuple from datetime import datetime import logging import json logger = logging.getLogger(__name__) class FilesManager: """Manager para gerenciamento de arquivos e compressão TAR.GZ.""" def __init__(self, base_path: Optional[str] = None): """ Inicializa o FilesManager. Args: base_path: Caminho base para armazenamento. Se None, usa variável de ambiente. """ self.base_path = Path(base_path or os.getenv('FILES_BASE_PATH', './data/files')) self.base_path.mkdir(parents=True, exist_ok=True) self.archives_path = self.base_path / 'archives' self.archives_path.mkdir(parents=True, exist_ok=True) self.temp_path = self.base_path / 'temp' self.temp_path.mkdir(parents=True, exist_ok=True) logger.info(f"FilesManager inicializado com base_path: {self.base_path}") def create_archive(self, source_dir: str, archive_name: str, compress: bool = True) -> Tuple[str, str]: """ Cria arquivo TAR.GZ a partir de um diretório. Args: source_dir: Caminho do diretório de origem. archive_name: Nome do arquivo a ser criado (sem extensão). compress: Se True, comprime com gzip. Returns: Tupla (caminho_arquivo, hash_sha256) """ try: source_path = Path(source_dir) if not source_path.exists(): raise FileNotFoundError(f"Diretório não encontrado: {source_dir}") ext = '.tar.gz' if compress else '.tar' archive_path = self.archives_path / f"{archive_name}{ext}" mode = 'w:gz' if compress else 'w' with tarfile.open(archive_path, mode) as tar: tar.add(source_path, arcname=source_path.name) file_hash = self._calculate_file_hash(str(archive_path)) logger.info(f"Arquivo criado: {archive_path} (hash: {file_hash})") return str(archive_path), file_hash except Exception as e: logger.error(f"Erro ao criar arquivo: {e}") raise def extract_archive(self, archive_path: str, extract_to: Optional[str] = None) -> str: """ Extrai arquivo TAR.GZ. Args: archive_path: Caminho do arquivo a extrair. extract_to: Caminho de destino. Se None, usa temp_path. Returns: Caminho do diretório extraído. """ try: archive_file = Path(archive_path) if not archive_file.exists(): raise FileNotFoundError(f"Arquivo não encontrado: {archive_path}") extract_dir = Path(extract_to) if extract_to else self.temp_path / archive_file.stem extract_dir.mkdir(parents=True, exist_ok=True) with tarfile.open(archive_file, 'r:gz' if archive_file.suffix == '.gz' else 'r') as tar: tar.extractall(path=extract_dir) logger.info(f"Arquivo extraído: {archive_file} -> {extract_dir}") return str(extract_dir) except Exception as e: logger.error(f"Erro ao extrair arquivo: {e}") raise def delete_archive(self, archive_path: str) -> bool: """ Deleta um arquivo de arquivo. Args: archive_path: Caminho do arquivo a deletar. Returns: True se deletado com sucesso. """ try: file_path = Path(archive_path) if file_path.exists(): file_path.unlink() logger.info(f"Arquivo deletado: {archive_path}") return True return False except Exception as e: logger.error(f"Erro ao deletar arquivo: {e}") raise def list_archives(self) -> List[dict]: """ Lista todos os arquivos armazenados. Returns: Lista de dicts com informações dos arquivos. """ try: archives = [] for archive_file in self.archives_path.glob('*'): if archive_file.is_file(): stat = archive_file.stat() archives.append({ 'nome': archive_file.name, 'tamanho_bytes': stat.st_size, 'data_criacao': datetime.fromtimestamp(stat.st_ctime).isoformat(), 'caminho': str(archive_file) }) logger.info(f"Listados {len(archives)} arquivos") return archives except Exception as e: logger.error(f"Erro ao listar arquivos: {e}") raise def get_archive_info(self, archive_path: str) -> dict: """ Retorna informações sobre um arquivo. Args: archive_path: Caminho do arquivo. Returns: Dict com informações do arquivo. """ try: file_path = Path(archive_path) if not file_path.exists(): raise FileNotFoundError(f"Arquivo não encontrado: {archive_path}") stat = file_path.stat() file_hash = self._calculate_file_hash(archive_path) info = { 'nome': file_path.name, 'tamanho_bytes': stat.st_size, 'data_criacao': datetime.fromtimestamp(stat.st_ctime).isoformat(), 'data_modificacao': datetime.fromtimestamp(stat.st_mtime).isoformat(), 'hash_sha256': file_hash, 'tipo': 'tar.gz' if file_path.suffix == '.gz' else 'tar' } # Listar conteúdo do arquivo try: with tarfile.open(archive_path, 'r:gz' if file_path.suffix == '.gz' else 'r') as tar: info['arquivo_conteudo'] = [m.name for m in tar.getmembers()] info['total_arquivos'] = len(tar.getmembers()) except: pass return info except Exception as e: logger.error(f"Erro ao obter info do arquivo: {e}") raise def _calculate_file_hash(self, file_path: str) -> str: """ Calcula hash SHA256 de um arquivo. Args: file_path: Caminho do arquivo. Returns: Hash SHA256 em hex. """ sha256_hash = hashlib.sha256() with open(file_path, 'rb') as f: for byte_block in iter(lambda: f.read(4096), b''): sha256_hash.update(byte_block) return sha256_hash.hexdigest() def cleanup_temp(self) -> int: """ Remove arquivos temporários antigos. Returns: Número de arquivos removidos. """ try: removed_count = 0 if self.temp_path.exists(): shutil.rmtree(self.temp_path) self.temp_path.mkdir(parents=True, exist_ok=True) removed_count = 1 logger.info(f"Limpeza de temp concluída: {removed_count} item(ns) removido(s)") return removed_count except Exception as e: logger.error(f"Erro ao limpar temp: {e}") raise def backup_archive(self, archive_path: str, backup_path: Optional[str] = None) -> str: """ Cria backup de um arquivo. Args: archive_path: Caminho do arquivo a fazer backup. backup_path: Caminho de destino. Se None, cria em backup/. Returns: Caminho do arquivo de backup. """ try: source = Path(archive_path) if not source.exists(): raise FileNotFoundError(f"Arquivo não encontrado: {archive_path}") backup_dir = Path(backup_path) if backup_path else self.base_path / 'backups' backup_dir.mkdir(parents=True, exist_ok=True) timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') backup_file = backup_dir / f"{source.stem}_{timestamp}{source.suffix}" shutil.copy2(source, backup_file) logger.info(f"Backup criado: {backup_file}") return str(backup_file) except Exception as e: logger.error(f"Erro ao fazer backup: {e}") raise def verify_integrity(self, archive_path: str, expected_hash: Optional[str] = None) -> bool: """ Verifica integridade de um arquivo TAR.GZ. Args: archive_path: Caminho do arquivo. expected_hash: Hash esperado (opcional). Returns: True se íntegro, False caso contrário. """ try: file_path = Path(archive_path) if not file_path.exists(): logger.warning(f"Arquivo não encontrado: {archive_path}") return False # Verificar se é um TAR válido try: with tarfile.open(archive_path, 'r:gz' if file_path.suffix == '.gz' else 'r') as tar: tar.getmembers() except tarfile.TarError as e: logger.warning(f"TAR inválido: {e}") return False # Verificar hash se fornecido if expected_hash: actual_hash = self._calculate_file_hash(archive_path) if actual_hash != expected_hash: logger.warning(f"Hash mismatch: {actual_hash} != {expected_hash}") return False logger.info(f"Integridade verificada: {archive_path}") return True except Exception as e: logger.error(f"Erro ao verificar integridade: {e}") return False # Instância global _files_manager: Optional[FilesManager] = None def get_files_manager() -> FilesManager: """Retorna a instância global do FilesManager.""" global _files_manager if _files_manager is None: _files_manager = FilesManager() return _files_manager def init_files_manager(base_path: Optional[str] = None) -> FilesManager: """Inicializa o FilesManager global.""" global _files_manager _files_manager = FilesManager(base_path) return _files_manager