File size: 8,496 Bytes
a050b5d 9aabbbe 4019e91 a050b5d 9aabbbe a050b5d 9aabbbe a050b5d 4019e91 a050b5d 9aabbbe a050b5d 9aabbbe a050b5d 4019e91 a050b5d 4019e91 9aabbbe 4019e91 9aabbbe 4019e91 a050b5d 9aabbbe a050b5d 9aabbbe a050b5d 9aabbbe a050b5d 9aabbbe a050b5d 9aabbbe a050b5d 9aabbbe a050b5d 9aabbbe a050b5d 9aabbbe a050b5d 9aabbbe a050b5d 9aabbbe a050b5d 9aabbbe 4019e91 a050b5d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 |
# -*- coding: utf-8 -*-
"""
db_router.py — Roteia Engine/SessionLocal para:
- 'prod' → Load.db
- 'test' → Load_teste.db
- 'treinamento' → Load_treinamento.db
Mantém a escolha do usuário em st.session_state (quando disponível) e em variável
de ambiente (DB_CHOICE) para persistir entre reruns/contexts.
APIs expostas (compatíveis com o app):
• get_available_choices() -> list[str]
• set_current_db_choice(choice: str) -> None
• set_db_choice(choice: str) -> None
• current_db_choice() -> str
• bank_label(choice: str) -> str
• get_engine() -> sqlalchemy.Engine
• get_session_factory() -> sqlalchemy.orm.sessionmaker
• SessionLocal() -> sqlalchemy.orm.Session
Inclui garantia de criação do diretório pai do SQLite com fallback para ~/.ioirun.
Compatível com execução fora do Streamlit (fallback em estado global).
"""
from __future__ import annotations
import os
from typing import Dict, Optional, Any
# Streamlit é preferível; mas se não houver (execução fora do app), caímos em fallback
try:
import streamlit as st
_HAS_ST = True
except Exception:
_HAS_ST = False
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# ============================
# Configuração e caminhos base
# ============================
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# Nomes de arquivos de banco conforme sua especificação
PROD_DB_NAME = "Load.db"
TEST_DB_NAME = "Load_teste.db"
TREINAMENTO_DB_NAME = "Load_treinamento.db"
# (Opcional) uso de .env/Secrets para apontar outras URIs (Postgres/MySQL/SQLite abs.)
DB1_PROD_URL = os.getenv("DB1_PROD_URL", f"sqlite:///{os.path.join(BASE_DIR, PROD_DB_NAME)}")
DB2_TEST_URL = os.getenv("DB2_TEST_URL", f"sqlite:///{os.path.join(BASE_DIR, TEST_DB_NAME)}")
DB3_TREINAMENTO_URL = os.getenv("DB3_TREINAMENTO_URL", f"sqlite:///{os.path.join(BASE_DIR, TREINAMENTO_DB_NAME)}")
DB_URLS: Dict[str, str] = {
"prod": DB1_PROD_URL,
"test": DB2_TEST_URL,
"treinamento": DB3_TREINAMENTO_URL,
}
# Aliases aceitos (ex.: "train" → "treinamento")
CHOICE_ALIASES: Dict[str, str] = {
"train": "treinamento",
}
DB_LABELS: Dict[str, str] = {
"prod": "🟢 Produção",
"test": "🔴 Teste",
"treinamento": "🟡 Treinamento",
}
# ============================
# Garantir diretório do SQLite
# ============================
def _ensure_parent_dir_sqlite(url: str) -> str:
"""
Garante que a pasta do arquivo SQLite exista. Se não conseguir,
direciona para ~/.ioirun/<arquivo>.db (gravável no Spaces).
"""
if not url or not url.startswith("sqlite"):
return url
# Extrai caminho local do SQLite (sqlite:////abs/path ou sqlite:///rel/path)
prefix = "sqlite:///"
path = url[len(prefix):] if url.startswith(prefix) else url
# Normaliza e cria parent
file_path = os.path.abspath(path)
parent = os.path.dirname(file_path)
try:
os.makedirs(parent, exist_ok=True)
return url
except Exception:
# Fallback para HOME gravável
home_dir = os.path.join(os.path.expanduser("~"), ".ioirun")
os.makedirs(home_dir, exist_ok=True)
alt = os.path.join(home_dir, os.path.basename(file_path))
return f"sqlite:///{alt}"
# ============================
# Helpers de UI
# ============================
def get_available_choices() -> list[str]:
"""Lista as chaves de bancos disponíveis (para UI)."""
return list(DB_URLS.keys())
def list_banks() -> list[str]:
"""Compat anterior — alias de get_available_choices()."""
return get_available_choices()
def bank_label(choice: str) -> str:
"""Rótulo amigável para a UI."""
return DB_LABELS.get(choice, choice)
# ============================
# Chaves de sessão (ou fallback global)
# ============================
SESSION_DB_CHOICE_KEY = "__db_choice__" # "prod" | "test" | "treinamento"
SESSION_DB_ENGINE_KEY = "__db_engine__" # cache de engine
SESSION_DB_FACTORY_KEY = "__db_session_factory__" # cache de sessionmaker
# Fallback global quando não houver Streamlit
_GLOBAL_STATE: Dict[str, Any] = {
SESSION_DB_CHOICE_KEY: os.getenv("DB_CHOICE", "prod"),
SESSION_DB_ENGINE_KEY: None,
SESSION_DB_FACTORY_KEY: None,
}
def _session_get(key: str, default=None):
if _HAS_ST:
return st.session_state.get(key, default)
return _GLOBAL_STATE.get(key, default)
def _session_set(key: str, value):
if _HAS_ST:
st.session_state[key] = value
else:
_GLOBAL_STATE[key] = value
def _session_pop(key: str):
if _HAS_ST:
st.session_state.pop(key, None)
else:
_GLOBAL_STATE.pop(key, None)
# ============================
# Normalização da escolha
# ============================
def _normalize_choice(raw: Optional[str]) -> str:
val = (raw or "").strip().lower()
if val in CHOICE_ALIASES:
val = CHOICE_ALIASES[val]
if val not in DB_URLS:
val = "prod"
return val
# ============================
# Escolha do banco
# ============================
def set_db_choice(choice: str):
"""
Define o banco ativo para a sessão atual.
choice ∈ {"prod", "test", "treinamento"} (aceita alias "train" → "treinamento").
Invalida caches de engine/session e atualiza ENV (DB_CHOICE).
"""
choice = _normalize_choice(choice)
# Se houver engine cacheado, faça dispose antes de trocar
cached_engine = _session_get(SESSION_DB_ENGINE_KEY)
if cached_engine is not None:
try:
cached_engine.dispose()
except Exception:
pass
_session_set(SESSION_DB_CHOICE_KEY, choice)
os.environ["DB_CHOICE"] = choice # permite que outros módulos leiam
# Ao trocar, invalida caches
_session_pop(SESSION_DB_ENGINE_KEY)
_session_pop(SESSION_DB_FACTORY_KEY)
def set_current_db_choice(choice: str) -> None:
"""Alias compatível com app: set_current_db_choice(choice)."""
set_db_choice(choice)
def current_db_choice() -> str:
"""
Retorna 'prod' | 'test' | 'treinamento' (default: 'prod').
Prioriza session_state; se ausente, lê DB_CHOICE do ambiente.
"""
# 1) Session
val = _session_get(SESSION_DB_CHOICE_KEY)
# 2) ENV (permite seleção por URL/externo)
if val is None:
env_val = os.getenv("DB_CHOICE")
if env_val:
val = _normalize_choice(env_val)
_session_set(SESSION_DB_CHOICE_KEY, val)
# 3) Default
if val is None:
val = "prod"
_session_set(SESSION_DB_CHOICE_KEY, val)
# Sanitize
val = _normalize_choice(val)
if val != _session_get(SESSION_DB_CHOICE_KEY):
_session_set(SESSION_DB_CHOICE_KEY, val)
return val
# ============================
# Engine / Session por ambiente
# ============================
def _url_for_choice(choice: str) -> str:
return DB_URLS[choice]
def _engine_args_for_url(url: str) -> dict:
args = {
"echo": False,
"pool_pre_ping": True,
}
if url.startswith("sqlite:///"):
# evita erro em threads do Streamlit
args["connect_args"] = {"check_same_thread": False}
return args
def get_engine():
"""
Entrega o engine do banco ATIVO (por sessão). Cria e cacheia se necessário.
"""
choice = current_db_choice()
cached = _session_get(SESSION_DB_ENGINE_KEY)
if cached is not None and getattr(cached, "__db_choice__", None) == choice:
return cached
url = _url_for_choice(choice)
url = _ensure_parent_dir_sqlite(url) # ⬅️ garante diretório pai se for SQLite
eng = create_engine(url, **_engine_args_for_url(url))
setattr(eng, "__db_choice__", choice)
_session_set(SESSION_DB_ENGINE_KEY, eng)
return eng
def get_session_factory():
"""
Entrega um sessionmaker vinculado ao engine do banco ATIVO (em cache).
"""
choice = current_db_choice()
fac = _session_get(SESSION_DB_FACTORY_KEY)
if fac is not None and getattr(fac, "__db_choice__", None) == choice:
return fac
fac = sessionmaker(bind=get_engine(), autocommit=False, autoflush=False)
setattr(fac, "__db_choice__", choice)
_session_set(SESSION_DB_FACTORY_KEY, fac)
return fac
def SessionLocal():
"""
Cria uma sessão no banco ATIVO.
Uso:
db = SessionLocal()
try:
...
finally:
db.close()
"""
return get_session_factory()() |