|
|
|
|
|
""" |
|
|
banco.py |
|
|
Compatível com: |
|
|
- Roteamento por ambiente (db_router.py): produção/teste/treinamento |
|
|
- Fallback: um único DATABASE_URL vindo de env/Secrets |
|
|
- Postgres / MySQL / SQLite (c/ alias de case para load.db no Linux) |
|
|
- Garantia de criação do diretório pai do SQLite (evita 'unable to open database file') |
|
|
|
|
|
Principais melhorias: |
|
|
- 'engine' agora é um PROXY dinâmico (quando houver db_router), refletindo a escolha atual. |
|
|
- Mantém API compatível: engine, Base, SessionLocal, db_info(), get_engine(), etc. |
|
|
""" |
|
|
|
|
|
import os |
|
|
import shutil |
|
|
import importlib |
|
|
from typing import Optional |
|
|
|
|
|
from sqlalchemy import create_engine |
|
|
from sqlalchemy.orm import sessionmaker, declarative_base |
|
|
from dotenv import load_dotenv |
|
|
|
|
|
|
|
|
BASE_DIR = os.path.dirname(os.path.abspath(__file__)) |
|
|
|
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _ensure_sqlite_case_alias() -> str: |
|
|
""" |
|
|
Garante que exista 'load.db' no diretório do app. |
|
|
Se encontrar 'Load.db' (ou outra variação de caixa), copia para 'load.db'. |
|
|
Retorna o caminho absoluto de 'load.db'. |
|
|
""" |
|
|
lower = os.path.join(BASE_DIR, "load.db") |
|
|
if os.path.exists(lower): |
|
|
return lower |
|
|
|
|
|
|
|
|
for cand in ("Load.db", "LOAD.DB", "Load.DB"): |
|
|
up = os.path.join(BASE_DIR, cand) |
|
|
if os.path.exists(up): |
|
|
try: |
|
|
shutil.copy(up, lower) |
|
|
except Exception: |
|
|
|
|
|
pass |
|
|
break |
|
|
|
|
|
return lower |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _ensure_parent_dir_sqlite(sqlite_url: str) -> str: |
|
|
""" |
|
|
Garante que o diretório pai do arquivo SQLite exista. |
|
|
Caso não consiga criar (permissão), cai para ~/.ioirun/<arquivo>.db |
|
|
Retorna a URL (que pode ser ajustada para fallback). |
|
|
""" |
|
|
if not sqlite_url or not sqlite_url.startswith("sqlite"): |
|
|
return sqlite_url |
|
|
|
|
|
|
|
|
path = sqlite_url.replace("sqlite:///", "", 1) |
|
|
if path.startswith("//"): |
|
|
path = path[1:] |
|
|
file_path = os.path.abspath(path) |
|
|
parent = os.path.dirname(file_path) |
|
|
|
|
|
try: |
|
|
os.makedirs(parent, exist_ok=True) |
|
|
return sqlite_url |
|
|
except Exception: |
|
|
|
|
|
home_dir = os.path.join(os.path.expanduser("~"), ".ioirun") |
|
|
os.makedirs(home_dir, exist_ok=True) |
|
|
alt = os.path.join(home_dir, os.path.basename(file_path)) |
|
|
return f"sqlite:///{alt}" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
from db_router import ( |
|
|
get_engine as _router_get_engine, |
|
|
get_session_factory as _router_get_session_factory, |
|
|
SessionLocal as _router_SessionLocal, |
|
|
current_db_choice as _router_current_choice, |
|
|
bank_label as _router_bank_label, |
|
|
) |
|
|
_HAS_ROUTER = True |
|
|
except Exception: |
|
|
_HAS_ROUTER = False |
|
|
_router_get_engine = None |
|
|
_router_get_session_factory = None |
|
|
_router_SessionLocal = None |
|
|
_router_current_choice = None |
|
|
_router_bank_label = None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _build_fallback_uri() -> str: |
|
|
""" |
|
|
Monta a URI do banco quando não existe db_router. |
|
|
Ordem de preferência: |
|
|
1. DATABASE_URL (completo) |
|
|
2. Variáveis separadas: DB_DRIVER, DB_HOST, DB_PORT, DB_USER, DB_PASS, DB_NAME |
|
|
3. SQLite local em 'load.db' |
|
|
""" |
|
|
|
|
|
url = os.getenv("DATABASE_URL") |
|
|
if url: |
|
|
|
|
|
return _ensure_parent_dir_sqlite(url) |
|
|
|
|
|
|
|
|
driver = (os.getenv("DB_DRIVER") or "").strip().lower() |
|
|
host = os.getenv("DB_HOST") |
|
|
port = os.getenv("DB_PORT") |
|
|
user = os.getenv("DB_USER") |
|
|
pwd = os.getenv("DB_PASS") |
|
|
name = os.getenv("DB_NAME") |
|
|
|
|
|
if driver and host and user and pwd and name: |
|
|
|
|
|
if not port: |
|
|
port = "5432" if driver.startswith("post") else "3306" |
|
|
|
|
|
if driver.startswith("post"): |
|
|
return f"postgresql+psycopg2://{user}:{pwd}@{host}:{port}/{name}" |
|
|
elif driver.startswith("mysql"): |
|
|
return f"mysql+pymysql://{user}:{pwd}@{host}:{port}/{name}" |
|
|
|
|
|
|
|
|
sqlite_path = _ensure_sqlite_case_alias() |
|
|
return _ensure_parent_dir_sqlite(f"sqlite:///{sqlite_path}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if _HAS_ROUTER: |
|
|
|
|
|
def get_engine(): |
|
|
"""Retorna a engine do banco ATUAL (prod/test/treinamento).""" |
|
|
return _router_get_engine() |
|
|
|
|
|
def get_session_factory(): |
|
|
"""Retorna um sessionmaker para o banco ATUAL (do roteador).""" |
|
|
return _router_get_session_factory() |
|
|
|
|
|
|
|
|
SessionLocal = _router_SessionLocal |
|
|
|
|
|
else: |
|
|
|
|
|
DATABASE_URL = _build_fallback_uri() |
|
|
|
|
|
engine_args = { |
|
|
"echo": False, |
|
|
"pool_pre_ping": True, |
|
|
} |
|
|
|
|
|
if DATABASE_URL.startswith("sqlite"): |
|
|
|
|
|
engine_args["connect_args"] = {"check_same_thread": False} |
|
|
|
|
|
_engine = create_engine(DATABASE_URL, **engine_args) |
|
|
|
|
|
def get_engine(): |
|
|
""" |
|
|
Engine fixa do fallback. Para trocar de banco em runtime sem roteador, |
|
|
é necessário recriar a engine manualmente. |
|
|
""" |
|
|
return _engine |
|
|
|
|
|
def get_session_factory(): |
|
|
"""Fábrica de sessão fixa (fallback).""" |
|
|
return sessionmaker(autocommit=False, autoflush=False, bind=_engine) |
|
|
|
|
|
|
|
|
SessionLocal = get_session_factory() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Base = declarative_base() |
|
|
|
|
|
class _EngineProxy: |
|
|
""" |
|
|
Proxy leve que encaminha atributos/operções para a engine ATUAL. |
|
|
Isso permite manter 'engine' importável sem cristalizar a escolha de banco. |
|
|
""" |
|
|
def __getattr__(self, name): |
|
|
return getattr(get_engine(), name) |
|
|
|
|
|
def __repr__(self): |
|
|
eng = get_engine() |
|
|
try: |
|
|
url = str(eng.url) |
|
|
except Exception: |
|
|
url = "(url indisponível)" |
|
|
if _HAS_ROUTER and _router_current_choice: |
|
|
ch = _router_current_choice() |
|
|
lbl = _router_bank_label(ch) if _router_bank_label else ch |
|
|
return f"<EngineProxy choice={ch} label={lbl} url={url}>" |
|
|
return f"<EngineProxy url={url}>" |
|
|
|
|
|
|
|
|
engine = _EngineProxy() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def init_schema(): |
|
|
""" |
|
|
Cria/atualiza as tabelas no banco ATUAL. |
|
|
• Com roteador: aplica no banco escolhido (Produção/Teste/Treinamento). |
|
|
• Sem roteador: aplica no DATABASE_URL padrão. |
|
|
|
|
|
Use em DEV/TESTE; em produção, prefira migrações (ex.: Alembic). |
|
|
""" |
|
|
|
|
|
try: |
|
|
importlib.import_module("models") |
|
|
except ModuleNotFoundError: |
|
|
|
|
|
|
|
|
raise |
|
|
|
|
|
Base.metadata.create_all(bind=get_engine()) |
|
|
|
|
|
|
|
|
def db_info() -> dict: |
|
|
""" |
|
|
Retorna informações básicas do banco ativo (para debug/UX). |
|
|
""" |
|
|
eng = get_engine() |
|
|
try: |
|
|
url = str(eng.url) |
|
|
except Exception: |
|
|
try: |
|
|
url = DATABASE_URL |
|
|
except Exception: |
|
|
url = "(não disponível)" |
|
|
|
|
|
info = {"url": url, "using_router": _HAS_ROUTER} |
|
|
|
|
|
if _HAS_ROUTER and _router_current_choice: |
|
|
ch = _router_current_choice() |
|
|
info["choice"] = ch |
|
|
try: |
|
|
info["label"] = _router_bank_label(ch) |
|
|
except Exception: |
|
|
info["label"] = ch |
|
|
|
|
|
return info |