#!/usr/bin/env python3 """ Script para executar migrações de banco de dados """ import sys from pathlib import Path import psycopg # Adiciona src ao path sys.path.insert(0, str(Path(__file__).parent.parent)) from src.config import DATABASE_URL from src.logging_config import db_logger class MigrationRunner: """Executor de migrações SQL""" def __init__(self, database_url: str): self.database_url = database_url self.migrations_dir = Path(__file__).parent / "migrations" def get_pending_migrations(self, conn) -> list: """ Retorna lista de migrações pendentes Args: conn: Conexão com banco Returns: Lista de arquivos SQL pendentes """ # Cria tabela de controle se não existir with conn.cursor() as cur: cur.execute(""" CREATE TABLE IF NOT EXISTS schema_migrations ( version VARCHAR(255) PRIMARY KEY, applied_at TIMESTAMP DEFAULT NOW() ) """) conn.commit() # Busca migrações já aplicadas cur.execute("SELECT version FROM schema_migrations ORDER BY version") applied = {row[0] for row in cur.fetchall()} # Lista todos arquivos SQL na pasta migrations all_migrations = sorted(self.migrations_dir.glob("*.sql")) # Filtra apenas pendentes pending = [ f for f in all_migrations if f.stem not in applied ] return pending def run_migration(self, conn, migration_file: Path) -> bool: """ Executa uma migração Args: conn: Conexão com banco migration_file: Arquivo SQL da migração Returns: True se sucesso, False se falha """ version = migration_file.stem try: db_logger.info(f"Executando migração: {version}") # Lê arquivo SQL sql = migration_file.read_text(encoding="utf-8") # Executa em transaction with conn.cursor() as cur: cur.execute(sql) # Registra migração como aplicada cur.execute( "INSERT INTO schema_migrations (version) VALUES (%s)", (version,) ) conn.commit() db_logger.info(f"Migração {version} aplicada com sucesso") return True except Exception as e: conn.rollback() db_logger.error(f"Erro ao executar migração {version}: {str(e)}") return False def run_all(self) -> tuple: """ Executa todas as migrações pendentes Returns: Tupla (total_aplicadas, total_falhadas) """ try: conn = psycopg.connect(self.database_url, autocommit=False) except Exception as e: db_logger.error(f"Erro ao conectar ao banco: {str(e)}") return 0, 0 try: pending = self.get_pending_migrations(conn) if not pending: db_logger.info("Nenhuma migração pendente") return 0, 0 db_logger.info(f"Encontradas {len(pending)} migrações pendentes") applied = 0 failed = 0 for migration in pending: if self.run_migration(conn, migration): applied += 1 else: failed += 1 break # Para na primeira falha return applied, failed finally: conn.close() def show_status(self) -> None: """Mostra status das migrações""" try: conn = psycopg.connect(self.database_url) except Exception as e: print(f"Erro ao conectar ao banco: {str(e)}") return try: with conn.cursor() as cur: # Verifica se tabela existe cur.execute(""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_name = 'schema_migrations' ) """) if not cur.fetchone()[0]: print("Nenhuma migração aplicada ainda") return # Lista migrações aplicadas cur.execute(""" SELECT version, applied_at FROM schema_migrations ORDER BY version """) rows = cur.fetchall() if not rows: print("Nenhuma migração aplicada ainda") return print(f"\nMigrações aplicadas ({len(rows)}):\n") print(f"{'Versão':<40} {'Data de Aplicação':<25}") print("-" * 65) for version, applied_at in rows: print(f"{version:<40} {str(applied_at):<25}") finally: conn.close() def main(): """Função principal""" if len(sys.argv) < 2: print("Uso: python migrate.py [run|status]") print(" run - Executa migrações pendentes") print(" status - Mostra status das migrações") sys.exit(1) command = sys.argv[1] runner = MigrationRunner(DATABASE_URL) if command == "run": applied, failed = runner.run_all() print(f"\nResultado:") print(f" Aplicadas: {applied}") print(f" Falhadas: {failed}") if failed > 0: sys.exit(1) elif command == "status": runner.show_status() else: print(f"Comando desconhecido: {command}") sys.exit(1) if __name__ == "__main__": main()