|
|
""" |
|
|
Nó para operações de banco de dados |
|
|
""" |
|
|
import os |
|
|
import logging |
|
|
import pandas as pd |
|
|
from typing import Dict, Any, TypedDict, Optional |
|
|
from sqlalchemy import create_engine |
|
|
|
|
|
from utils.config import SQL_DB_PATH |
|
|
from utils.database import create_sql_database, validate_database |
|
|
from utils.object_manager import get_object_manager |
|
|
|
|
|
class DatabaseState(TypedDict): |
|
|
"""Estado para operações de banco de dados""" |
|
|
success: bool |
|
|
message: str |
|
|
database_info: dict |
|
|
engine_id: str |
|
|
db_id: str |
|
|
|
|
|
async def create_database_from_dataframe_node(state: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Nó para criar banco de dados a partir de DataFrame processado |
|
|
|
|
|
Args: |
|
|
state: Estado contendo informações do DataFrame processado |
|
|
|
|
|
Returns: |
|
|
Estado atualizado com informações do banco |
|
|
""" |
|
|
try: |
|
|
obj_manager = get_object_manager() |
|
|
|
|
|
|
|
|
df_id = state.get("dataframe_id") |
|
|
if not df_id: |
|
|
raise ValueError("ID do DataFrame não encontrado no estado") |
|
|
|
|
|
processed_df = obj_manager.get_object(df_id) |
|
|
if processed_df is None: |
|
|
raise ValueError("DataFrame processado não encontrado") |
|
|
|
|
|
|
|
|
column_info = state.get("column_info", {}) |
|
|
sql_types = column_info.get("sql_types", {}) |
|
|
|
|
|
|
|
|
engine = create_engine(f"sqlite:///{SQL_DB_PATH}") |
|
|
|
|
|
|
|
|
processed_df.to_sql( |
|
|
"tabela", |
|
|
engine, |
|
|
index=False, |
|
|
if_exists="replace", |
|
|
dtype=sql_types |
|
|
) |
|
|
|
|
|
logging.info(f"[DATABASE] Banco criado com {len(processed_df)} registros") |
|
|
|
|
|
|
|
|
db = create_sql_database(engine) |
|
|
|
|
|
|
|
|
is_valid = validate_database(engine) |
|
|
|
|
|
|
|
|
engine_id = obj_manager.store_engine(engine) |
|
|
db_id = obj_manager.store_database(db) |
|
|
|
|
|
|
|
|
database_info = { |
|
|
"path": SQL_DB_PATH, |
|
|
"table_name": "tabela", |
|
|
"total_records": len(processed_df), |
|
|
"columns": list(processed_df.columns), |
|
|
"column_types": {col: str(dtype) for col, dtype in processed_df.dtypes.items()}, |
|
|
"is_valid": is_valid, |
|
|
"sql_types_used": {col: str(sql_type) for col, sql_type in sql_types.items()} |
|
|
} |
|
|
|
|
|
|
|
|
state.update({ |
|
|
"success": True, |
|
|
"message": f"✅ Banco de dados criado com sucesso! {len(processed_df)} registros salvos", |
|
|
"database_info": database_info, |
|
|
"engine_id": engine_id, |
|
|
"db_id": db_id |
|
|
}) |
|
|
|
|
|
logging.info(f"[DATABASE] Banco criado e validado: {database_info}") |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"❌ Erro ao criar banco de dados: {e}" |
|
|
logging.error(f"[DATABASE] {error_msg}") |
|
|
state.update({ |
|
|
"success": False, |
|
|
"message": error_msg, |
|
|
"database_info": {}, |
|
|
"engine_id": "", |
|
|
"db_id": "" |
|
|
}) |
|
|
|
|
|
return state |
|
|
|
|
|
async def load_existing_database_node(state: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Nó para carregar banco de dados existente |
|
|
|
|
|
Args: |
|
|
state: Estado atual |
|
|
|
|
|
Returns: |
|
|
Estado atualizado com informações do banco existente |
|
|
""" |
|
|
try: |
|
|
if not os.path.exists(SQL_DB_PATH): |
|
|
raise ValueError("Banco de dados não encontrado") |
|
|
|
|
|
|
|
|
engine = create_engine(f"sqlite:///{SQL_DB_PATH}") |
|
|
|
|
|
|
|
|
db = create_sql_database(engine) |
|
|
|
|
|
|
|
|
is_valid = validate_database(engine) |
|
|
|
|
|
|
|
|
try: |
|
|
sample_df = pd.read_sql_query("SELECT * FROM tabela LIMIT 5", engine) |
|
|
total_records_df = pd.read_sql_query("SELECT COUNT(*) as count FROM tabela", engine) |
|
|
total_records = total_records_df.iloc[0]['count'] |
|
|
|
|
|
database_info = { |
|
|
"path": SQL_DB_PATH, |
|
|
"table_name": "tabela", |
|
|
"total_records": total_records, |
|
|
"columns": list(sample_df.columns), |
|
|
"column_types": {col: str(dtype) for col, dtype in sample_df.dtypes.items()}, |
|
|
"is_valid": is_valid, |
|
|
"sample_data": sample_df.head(3).to_dict() |
|
|
} |
|
|
except Exception as e: |
|
|
logging.warning(f"Erro ao obter informações detalhadas do banco: {e}") |
|
|
database_info = { |
|
|
"path": SQL_DB_PATH, |
|
|
"table_name": "tabela", |
|
|
"is_valid": is_valid, |
|
|
"error": str(e) |
|
|
} |
|
|
|
|
|
|
|
|
obj_manager = get_object_manager() |
|
|
engine_id = obj_manager.store_engine(engine) |
|
|
db_id = obj_manager.store_database(db) |
|
|
|
|
|
|
|
|
state.update({ |
|
|
"success": True, |
|
|
"message": "✅ Banco de dados existente carregado com sucesso", |
|
|
"database_info": database_info, |
|
|
"engine_id": engine_id, |
|
|
"db_id": db_id |
|
|
}) |
|
|
|
|
|
logging.info(f"[DATABASE] Banco existente carregado: {database_info}") |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"❌ Erro ao carregar banco existente: {e}" |
|
|
logging.error(f"[DATABASE] {error_msg}") |
|
|
state.update({ |
|
|
"success": False, |
|
|
"message": error_msg, |
|
|
"database_info": {}, |
|
|
"engine_id": "", |
|
|
"db_id": "" |
|
|
}) |
|
|
|
|
|
return state |
|
|
|
|
|
async def get_database_sample_node(state: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Nó para obter amostra dos dados do banco |
|
|
|
|
|
Args: |
|
|
state: Estado contendo ID da engine |
|
|
|
|
|
Returns: |
|
|
Estado atualizado com amostra dos dados |
|
|
""" |
|
|
try: |
|
|
obj_manager = get_object_manager() |
|
|
|
|
|
|
|
|
engine_id = state.get("engine_id") |
|
|
if not engine_id: |
|
|
raise ValueError("ID da engine não encontrado") |
|
|
|
|
|
engine = obj_manager.get_engine(engine_id) |
|
|
if not engine: |
|
|
raise ValueError("Engine não encontrada") |
|
|
|
|
|
|
|
|
connection_type = state.get("connection_type", "csv") |
|
|
|
|
|
if connection_type == "postgresql": |
|
|
|
|
|
import sqlalchemy as sa |
|
|
|
|
|
try: |
|
|
with engine.connect() as conn: |
|
|
|
|
|
tables_result = conn.execute(sa.text(""" |
|
|
SELECT table_name |
|
|
FROM information_schema.tables |
|
|
WHERE table_schema = 'public' |
|
|
ORDER BY table_name |
|
|
""")) |
|
|
available_tables = [row[0] for row in tables_result.fetchall()] |
|
|
|
|
|
if not available_tables: |
|
|
raise ValueError("Nenhuma tabela encontrada no banco PostgreSQL") |
|
|
|
|
|
|
|
|
table_name = None |
|
|
for table in available_tables: |
|
|
try: |
|
|
|
|
|
count_result = conn.execute(sa.text(f"SELECT COUNT(*) FROM {table} LIMIT 1")) |
|
|
count = count_result.scalar() |
|
|
if count > 0: |
|
|
table_name = table |
|
|
logging.info(f"[DATABASE] PostgreSQL - usando tabela '{table_name}' para amostra ({count} registros)") |
|
|
break |
|
|
except Exception as e: |
|
|
logging.warning(f"[DATABASE] Erro ao verificar tabela {table}: {e}") |
|
|
continue |
|
|
|
|
|
|
|
|
if not table_name: |
|
|
table_name = available_tables[0] |
|
|
logging.info(f"[DATABASE] PostgreSQL - usando primeira tabela '{table_name}' (sem dados detectados)") |
|
|
|
|
|
except Exception as e: |
|
|
logging.error(f"[DATABASE] Erro ao detectar tabelas PostgreSQL: {e}") |
|
|
raise ValueError(f"Erro ao acessar tabelas PostgreSQL: {e}") |
|
|
|
|
|
else: |
|
|
table_name = "tabela" |
|
|
logging.info(f"[DATABASE] CSV - usando tabela padrão: {table_name}") |
|
|
|
|
|
|
|
|
try: |
|
|
sample_df = pd.read_sql_query(f"SELECT * FROM {table_name} LIMIT 10", engine) |
|
|
logging.info(f"[DATABASE] Amostra obtida da tabela '{table_name}': {sample_df.shape[0]} registros") |
|
|
except Exception as e: |
|
|
logging.error(f"[DATABASE] Erro ao obter amostra da tabela '{table_name}': {e}") |
|
|
|
|
|
sample_df = pd.DataFrame() |
|
|
|
|
|
|
|
|
db_sample_dict = { |
|
|
"data": sample_df.to_dict('records'), |
|
|
"columns": list(sample_df.columns), |
|
|
"dtypes": sample_df.dtypes.astype(str).to_dict(), |
|
|
"shape": sample_df.shape |
|
|
} |
|
|
|
|
|
state["db_sample_dict"] = db_sample_dict |
|
|
|
|
|
logging.info(f"[DATABASE] Amostra obtida: {sample_df.shape[0]} registros") |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Erro ao obter amostra do banco: {e}" |
|
|
logging.error(f"[DATABASE] {error_msg}") |
|
|
state["db_sample_dict"] = {} |
|
|
state["error"] = error_msg |
|
|
|
|
|
return state |
|
|
|