rag_template / db /migrate.py
Guilherme Favaron
Major update: Add hybrid search, reranking, multiple LLMs, and UI improvements
1b447de
#!/usr/bin/env python3
"""
Script para executar migrações de banco de dados
"""
import sys
from pathlib import Path
import psycopg
# Adiciona src ao path
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.config import DATABASE_URL
from src.logging_config import db_logger
class MigrationRunner:
"""Executor de migrações SQL"""
def __init__(self, database_url: str):
self.database_url = database_url
self.migrations_dir = Path(__file__).parent / "migrations"
def get_pending_migrations(self, conn) -> list:
"""
Retorna lista de migrações pendentes
Args:
conn: Conexão com banco
Returns:
Lista de arquivos SQL pendentes
"""
# Cria tabela de controle se não existir
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS schema_migrations (
version VARCHAR(255) PRIMARY KEY,
applied_at TIMESTAMP DEFAULT NOW()
)
""")
conn.commit()
# Busca migrações já aplicadas
cur.execute("SELECT version FROM schema_migrations ORDER BY version")
applied = {row[0] for row in cur.fetchall()}
# Lista todos arquivos SQL na pasta migrations
all_migrations = sorted(self.migrations_dir.glob("*.sql"))
# Filtra apenas pendentes
pending = [
f for f in all_migrations
if f.stem not in applied
]
return pending
def run_migration(self, conn, migration_file: Path) -> bool:
"""
Executa uma migração
Args:
conn: Conexão com banco
migration_file: Arquivo SQL da migração
Returns:
True se sucesso, False se falha
"""
version = migration_file.stem
try:
db_logger.info(f"Executando migração: {version}")
# Lê arquivo SQL
sql = migration_file.read_text(encoding="utf-8")
# Executa em transaction
with conn.cursor() as cur:
cur.execute(sql)
# Registra migração como aplicada
cur.execute(
"INSERT INTO schema_migrations (version) VALUES (%s)",
(version,)
)
conn.commit()
db_logger.info(f"Migração {version} aplicada com sucesso")
return True
except Exception as e:
conn.rollback()
db_logger.error(f"Erro ao executar migração {version}: {str(e)}")
return False
def run_all(self) -> tuple:
"""
Executa todas as migrações pendentes
Returns:
Tupla (total_aplicadas, total_falhadas)
"""
try:
conn = psycopg.connect(self.database_url, autocommit=False)
except Exception as e:
db_logger.error(f"Erro ao conectar ao banco: {str(e)}")
return 0, 0
try:
pending = self.get_pending_migrations(conn)
if not pending:
db_logger.info("Nenhuma migração pendente")
return 0, 0
db_logger.info(f"Encontradas {len(pending)} migrações pendentes")
applied = 0
failed = 0
for migration in pending:
if self.run_migration(conn, migration):
applied += 1
else:
failed += 1
break # Para na primeira falha
return applied, failed
finally:
conn.close()
def show_status(self) -> None:
"""Mostra status das migrações"""
try:
conn = psycopg.connect(self.database_url)
except Exception as e:
print(f"Erro ao conectar ao banco: {str(e)}")
return
try:
with conn.cursor() as cur:
# Verifica se tabela existe
cur.execute("""
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_name = 'schema_migrations'
)
""")
if not cur.fetchone()[0]:
print("Nenhuma migração aplicada ainda")
return
# Lista migrações aplicadas
cur.execute("""
SELECT version, applied_at
FROM schema_migrations
ORDER BY version
""")
rows = cur.fetchall()
if not rows:
print("Nenhuma migração aplicada ainda")
return
print(f"\nMigrações aplicadas ({len(rows)}):\n")
print(f"{'Versão':<40} {'Data de Aplicação':<25}")
print("-" * 65)
for version, applied_at in rows:
print(f"{version:<40} {str(applied_at):<25}")
finally:
conn.close()
def main():
"""Função principal"""
if len(sys.argv) < 2:
print("Uso: python migrate.py [run|status]")
print(" run - Executa migrações pendentes")
print(" status - Mostra status das migrações")
sys.exit(1)
command = sys.argv[1]
runner = MigrationRunner(DATABASE_URL)
if command == "run":
applied, failed = runner.run_all()
print(f"\nResultado:")
print(f" Aplicadas: {applied}")
print(f" Falhadas: {failed}")
if failed > 0:
sys.exit(1)
elif command == "status":
runner.show_status()
else:
print(f"Comando desconhecido: {command}")
sys.exit(1)
if __name__ == "__main__":
main()