IOI-RUN / formulario.py
Roudrigus's picture
Upload 82 files
0f0ef8d verified
# -*- coding: utf-8 -*-
import streamlit as st
from datetime import datetime, date
from banco import SessionLocal
from models import Equipamento
from sqlalchemy import distinct
from log import registrar_log
# ======================================================
# LISTAS FIXAS
# ======================================================
MODAL_LISTA = ["", "AÉREO", "MARÍTIMO", "EXPRESSO"]
INCLUSAO_EXCLUSAO_LISTA = ["", "INCLUSÃO", "EXCLUSÃO"]
RESP_ERRO_LISTA = ["", "Sim", "Não"]
D_LISTA = ["", "D1", "D2", "D3"]
# ======================================================
# LAYOUT
# ======================================================
def layout_padrao(titulo: str):
st.title(titulo)
st.divider()
# ======================================================
# UTILS
# ======================================================
def _safe_index(options, value, default=0):
try:
return options.index(value)
except Exception:
return default
def _ss_get(key, default=None):
return st.session_state.get(key, default)
def _ss_set(key, value):
st.session_state[key] = value
# 🟢 Parser de datas robusto (aceita date, datetime e string)
def _parse_date_any(v):
if isinstance(v, date):
return v
if isinstance(v, datetime):
return v.date()
if isinstance(v, str):
s = v.strip()
# ISO: YYYY-MM-DD
try:
return date.fromisoformat(s)
except Exception:
pass
# BR: DD/MM/YYYY
try:
return datetime.strptime(s, "%d/%m/%Y").date()
except Exception:
pass
# YYYY/MM/DD (alguns DBs/ETLs geram assim)
try:
return datetime.strptime(s, "%Y/%m/%d").date()
except Exception:
pass
return None
def _build_prefill_from_record(r: Equipamento) -> dict:
# 🟢 Usa parser que converte string->date quando possível
dc = _parse_date_any(getattr(r, "data_coleta", None))
return {
"fpso1": r.fpso1 or "",
"fpso": r.fpso or "",
"data_coleta": dc, # 🟢
"especialista": r.especialista or "",
"conferente": r.conferente or "",
"osm": r.osm or "",
"modal": r.modal or "",
"quant_equip": int(r.quant_equip or 0),
"mrob": r.mrob or "",
"linhas_osm": int(r.linhas_osm or 0),
"linhas_mrob": int(r.linhas_mrob or 0),
"linhas_erros": int(r.linhas_erros or 0),
"erro_storekeeper": r.erro_storekeeper or "",
"erro_operacao": r.erro_operacao or "",
"erro_especialista": r.erro_especialista or "",
"erro_outros": r.erro_outros or "",
"inclusao_exclusao": r.inclusao_exclusao or "",
"po": r.po or "",
"part_number": r.part_number or "",
"material": r.material or "",
"solicitante": r.solicitante or "",
"requisitante": r.requisitante or "",
"nota_fiscal": r.nota_fiscal or "",
"impacto": r.impacto or "",
"dimensao": r.dimensao or "",
"motivo": getattr(r, "motivo", "") or "",
"observacoes": r.observacoes or "",
"dia_inclusao": r.dia_inclusao or "",
"_origem_id": r.id,
}
@st.cache_data
def get_distinct(_campo):
db = SessionLocal()
try:
rows = db.query(distinct(_campo)).filter(_campo.isnot(None)).filter(_campo != "").all()
return sorted([r[0] for r in rows])
finally:
db.close()
def get_fpsos():
return [""] + get_distinct(Equipamento.fpso)
def get_fpsos1():
return [""] + get_distinct(Equipamento.fpso1)
def get_notas_fiscais():
return sorted([str(x) for x in get_distinct(Equipamento.nota_fiscal)])
def _apply_prefill_to_state(prefill: dict, debug=False):
"""
Aplica o prefill uma única vez por origem (ID), antes de criar widgets.
Se o valor não existir nas listas, move para o campo '➕ Novo ...' + text_input.
"""
if not prefill:
if debug: st.info("DEBUG: prefill vazio. Nada a aplicar.")
return
origem = prefill.get("_origem_id")
if _ss_get("__prefill_applied__") == origem:
if debug: st.info(f"DEBUG: prefill já aplicado para origem {origem}.")
return
# Carrega todos os campos no session_state como w_*
for k, v in prefill.items():
_ss_set(f"w_{k}", v)
# Backups para os inputs de texto dos "➕ Novo ..."
_ss_set("w_fpso1_text", prefill.get("fpso1", ""))
_ss_set("w_fpso_text", prefill.get("fpso", ""))
# Normalização para selects fixos
def _norm(key, allowed):
v = _ss_get(key, "")
if v not in allowed:
_ss_set(key, allowed[0])
_norm("w_modal", MODAL_LISTA)
_norm("w_erro_storekeeper", RESP_ERRO_LISTA)
_norm("w_erro_operacao", RESP_ERRO_LISTA)
_norm("w_erro_especialista", RESP_ERRO_LISTA)
_norm("w_erro_outros", RESP_ERRO_LISTA)
_norm("w_inclusao_exclusao", INCLUSAO_EXCLUSAO_LISTA)
_norm("w_dia_inclusao", D_LISTA)
# 🟢 Tipos/valores coerentes
_ss_set("w_quant_equip", int(_ss_get("w_quant_equip", 0) or 0))
_ss_set("w_linhas_osm", int(_ss_get("w_linhas_osm", 0) or 0))
_ss_set("w_linhas_mrob", int(_ss_get("w_linhas_mrob", 0) or 0))
_ss_set("w_linhas_erros", int(_ss_get("w_linhas_erros", 0) or 0))
# 🟢 NÃO força “hoje” se vier None — mas o widget precisa de um valor (trata na UI)
# _ss_set("w_data_coleta", _ss_get("w_data_coleta") or date.today())
_ss_set("__prefill_applied__", origem)
if debug: st.success(f"DEBUG: prefill aplicado. origem={origem}")
# ======================================================
# APP
# ======================================================
def main():
layout_padrao(
"📋 CONTROLE DE OSM – Inclusões / Exclusões • Visão por linha (D3 a partir das 16h)"
)
debug = st.toggle("🔍 Modo debug", value=False)
# ======================================================
# EXPANDER • Pré-carregar por Nota Fiscal
# ======================================================
with st.expander("🧾 Pré-carregar via Nota Fiscal (clonar como nova entrada)", expanded=False):
col1, col2, col3 = st.columns([2, 2, 1.2])
nf_digitado = col1.text_input("Digite a Nota Fiscal", key="nf_lookup")
nf_select = col2.selectbox("Ou selecione", [""] + get_notas_fiscais(), key="nf_select")
marcar_origem = col3.checkbox("Marcar origem", value=True, key="nf_marcar_origem")
c1, c2 = st.columns(2)
buscar = c1.button("🔎 Buscar NF", key="btn_buscar_nf")
limpar = c2.button("🧹 Limpar pré-preenchimento", key="btn_limpar_prefill")
if limpar:
# Limpa apenas os w_* e marcadores de prefill/resultados
for k in list(st.session_state.keys()):
if k.startswith("w_"):
del st.session_state[k]
st.session_state.pop("form_prefill", None)
st.session_state.pop("__prefill_applied__", None)
st.session_state.pop("__nf_busca_result__", None)
st.session_state.pop("__nf_busca_opts__", None)
st.session_state.pop("sel_registro_base", None)
st.success("Pré-preenchimento limpo.")
st.rerun()
if buscar:
nf = (nf_digitado or nf_select or "").strip()
if not nf:
st.warning("Informe ou selecione uma Nota Fiscal.")
else:
db = SessionLocal()
try:
regs = (
db.query(Equipamento)
.filter(Equipamento.nota_fiscal == nf)
.order_by(Equipamento.id.desc())
.all()
)
except Exception as e:
regs = []
st.error(f"Erro ao buscar NF: {e}")
finally:
db.close()
if not regs:
st.info("Nenhum registro encontrado para a NF informada.")
st.session_state.pop("__nf_busca_result__", None)
st.session_state.pop("__nf_busca_opts__", None)
else:
# Persistimos APENAS IDs e um resumo textual (não guardamos objetos SQLAlchemy)
result = [
{"id": r.id, "data_coleta": str(r.data_coleta), "fpso": r.fpso or "—", "osm": r.osm or "—"}
for r in regs
]
opts = [f"ID {x['id']} | {x['data_coleta']} | FPSO {x['fpso']} | OSM {x['osm']}" for x in result]
st.session_state["__nf_busca_result__"] = result
st.session_state["__nf_busca_opts__"] = opts
# Reset da seleção anterior
st.session_state["sel_registro_base"] = opts[0] if opts else None
# ⬇️ Mostrar seleção + botão USAR sempre que houver resultado persistido
if st.session_state.get("__nf_busca_opts__"):
escolha = st.selectbox(
"Selecione o registro base",
st.session_state["__nf_busca_opts__"],
key="sel_registro_base"
)
if st.button("📥 Usar este registro como base", key="btn_usar_base"):
try:
# Extrai o ID do texto selecionado
chosen_id = int(escolha.split()[1])
except Exception:
st.error("Falha ao identificar o ID do registro selecionado.")
chosen_id = None
if chosen_id:
# Busca o registro no banco por ID (seguro e stateless)
db = SessionLocal()
try:
base = db.query(Equipamento).filter(Equipamento.id == chosen_id).first()
except Exception as e:
base = None
st.error(f"Erro ao buscar o registro por ID: {e}")
finally:
db.close()
if not base:
st.error("Registro não encontrado. Tente buscar novamente.")
else:
pre = _build_prefill_from_record(base)
if marcar_origem:
pre["observacoes"] = (pre.get("observacoes") or "") + \
f" [Clonado do ID {base.id} em {datetime.now():%d/%m/%Y %H:%M}]"
st.session_state["form_prefill"] = pre
st.session_state.pop("__prefill_applied__", None)
st.success("Pré-preenchimento carregado! Role até o formulário para revisar.")
st.rerun()
# ======================================================
# APLICAR PREFILL (antes de criar os widgets)
# ======================================================
prefill = _ss_get("form_prefill", {})
_apply_prefill_to_state(prefill, debug=debug)
if debug:
with st.expander("🔎 DEBUG • session_state (parcial)"):
keys = [k for k in st.session_state.keys() if k.startswith("w_") or k in ("form_prefill", "__prefill_applied__", "__nf_busca_result__", "__nf_busca_opts__", "sel_registro_base")]
dump = {k: st.session_state[k] for k in sorted(keys)}
st.write(dump)
# ======================================================
# FORMULÁRIO (SEM st.form)
# ======================================================
# -------- FPSO --------
st.subheader("🚢 Identificação FPSO")
c1, c2 = st.columns(2)
lista_fpso1 = get_fpsos1() + ["➕ Novo FPSO1"]
lista_fpso = get_fpsos() + ["➕ Novo FPSO"]
# Se valor não está na lista, muda para "➕ Novo …" e mantém texto
if _ss_get("w_fpso1", "") not in lista_fpso1:
_ss_set("w_fpso1", "➕ Novo FPSO1" if _ss_get("w_fpso1_text") else "")
if _ss_get("w_fpso", "") not in lista_fpso:
_ss_set("w_fpso", "➕ Novo FPSO" if _ss_get("w_fpso_text") else "")
with c1:
st.selectbox(
"FPSO1 *",
lista_fpso1,
index=_safe_index(lista_fpso1, _ss_get("w_fpso1", "")),
key="w_fpso1"
)
if _ss_get("w_fpso1") == "➕ Novo FPSO1":
st.text_input("Digite o novo FPSO1 *", key="w_fpso1_text")
fpso1 = _ss_get("w_fpso1_text", "").strip()
else:
_ss_set("w_fpso1_text", _ss_get("w_fpso1"))
fpso1 = _ss_get("w_fpso1")
with c2:
st.selectbox(
"FPSO *",
lista_fpso,
index=_safe_index(lista_fpso, _ss_get("w_fpso", "")),
key="w_fpso"
)
if _ss_get("w_fpso") == "➕ Novo FPSO":
st.text_input("Digite o novo FPSO *", key="w_fpso_text")
fpso = _ss_get("w_fpso_text", "").strip()
else:
_ss_set("w_fpso_text", _ss_get("w_fpso"))
fpso = _ss_get("w_fpso")
st.divider()
# -------- Dados Operacionais --------
st.subheader("📦 Dados Operacionais")
c1, c2, c3 = st.columns(3)
with c1:
# 🟢 Usa valor do estado; se vier None, o widget ainda precisa de um date válido.
st.date_input(
"Data de Coleta na ARM *",
value=_ss_get("w_data_coleta") or date.today(),
key="w_data_coleta"
)
st.text_input("Especialista Responsável *", key="w_especialista")
st.text_input("Conferente Responsável *", key="w_conferente")
st.text_input("OSM *", key="w_osm")
with c2:
st.selectbox("Modal *", MODAL_LISTA,
index=_safe_index(MODAL_LISTA, _ss_get("w_modal", "")),
key="w_modal")
st.number_input("Quantidade de Equipamentos *", min_value=0, value=_ss_get("w_quant_equip", 0), key="w_quant_equip")
st.text_input("MROB *", key="w_mrob")
with c3:
st.number_input("Total de Linhas OSM", min_value=0, value=_ss_get("w_linhas_osm", 0), key="w_linhas_osm")
st.number_input("Total de Linhas MROB", min_value=0, value=_ss_get("w_linhas_mrob", 0), key="w_linhas_mrob")
st.number_input("Total de Linhas com Erro", min_value=0, value=_ss_get("w_linhas_erros", 0), key="w_linhas_erros")
st.divider()
# -------- Análise de Erros --------
st.subheader("⚠️ Análise de Erros")
c1, c2, c3, c4 = st.columns(4)
c1.selectbox("Storekeeper", RESP_ERRO_LISTA, index=_safe_index(RESP_ERRO_LISTA, _ss_get("w_erro_storekeeper", "")), key="w_erro_storekeeper")
c2.selectbox("Operação WH", RESP_ERRO_LISTA, index=_safe_index(RESP_ERRO_LISTA, _ss_get("w_erro_operacao", "")), key="w_erro_operacao")
c3.selectbox("Especialista WH", RESP_ERRO_LISTA, index=_safe_index(RESP_ERRO_LISTA, _ss_get("w_erro_especialista", "")), key="w_erro_especialista")
c4.selectbox("Outros", RESP_ERRO_LISTA, index=_safe_index(RESP_ERRO_LISTA, _ss_get("w_erro_outros", "")), key="w_erro_outros")
st.selectbox(
"Inclusão / Exclusão",
INCLUSAO_EXCLUSAO_LISTA,
index=_safe_index(INCLUSAO_EXCLUSAO_LISTA, _ss_get("w_inclusao_exclusao", "")),
key="w_inclusao_exclusao"
)
st.divider()
# -------- Dados Administrativos --------
st.subheader("🧾 Dados Administrativos")
ca, cb, cc = st.columns(3)
with ca:
st.text_input("PO", key="w_po")
st.text_input("Part Number", key="w_part_number")
with cb:
st.text_input("Material", key="w_material")
st.text_input("Nota Fiscal *", key="w_nota_fiscal")
with cc:
st.text_input("Solicitante *", key="w_solicitante")
st.text_input("Requisitante *", key="w_requisitante")
st.text_input("Impacto *", key="w_impacto")
st.text_input("Dimensão *", key="w_dimensao")
st.text_input("Motivo da Inclusão / Exclusão", key="w_motivo")
st.text_area("Observações", key="w_observacoes", height=120)
# -------- Dia D --------
st.subheader("🗓️ Dia de Inclusão (D)")
st.selectbox(
"Selecione o dia",
D_LISTA,
index=_safe_index(D_LISTA, _ss_get("w_dia_inclusao", "")),
key="w_dia_inclusao"
)
# ======================================================
# SALVAR (sempre nova entrada)
# ======================================================
if st.button("💾 Salvar Registro", type="primary"):
# Lê valores atuais do session_state
data_coleta = _ss_get("w_data_coleta")
especialista = _ss_get("w_especialista", "").strip()
conferente = _ss_get("w_conferente", "").strip()
osm = _ss_get("w_osm", "").strip()
modal = _ss_get("w_modal", "")
quant_equip = int(_ss_get("w_quant_equip", 0) or 0)
mrob = _ss_get("w_mrob", "").strip()
linhas_osm = int(_ss_get("w_linhas_osm", 0) or 0)
linhas_mrob = int(_ss_get("w_linhas_mrob", 0) or 0)
linhas_erros = int(_ss_get("w_linhas_erros", 0) or 0)
erro_storekeeper = _ss_get("w_erro_storekeeper", "")
erro_operacao = _ss_get("w_erro_operacao", "")
erro_especialista = _ss_get("w_erro_especialista", "")
erro_outros = _ss_get("w_erro_outros", "")
inclusao_exclusao = _ss_get("w_inclusao_exclusao", "")
po = _ss_get("w_po", "").strip()
part_number = _ss_get("w_part_number", "").strip()
material = _ss_get("w_material", "").strip()
solicitante = _ss_get("w_solicitante", "").strip()
requisitante = _ss_get("w_requisitante", "").strip()
nota_fiscal = _ss_get("w_nota_fiscal", "").strip()
impacto = _ss_get("w_impacto", "").strip()
dimensao = _ss_get("w_dimensao", "").strip()
motivo = _ss_get("w_motivo", "").strip()
observacoes = _ss_get("w_observacoes", "").strip()
dia_inclusao = _ss_get("w_dia_inclusao", "")
# Campos FPSO/FPSO1
fpso1 = _ss_get("w_fpso1_text", "").strip() if _ss_get("w_fpso1") == "➕ Novo FPSO1" else _ss_get("w_fpso1", "").strip()
fpso = _ss_get("w_fpso_text", "").strip() if _ss_get("w_fpso") == "➕ Novo FPSO" else _ss_get("w_fpso", "").strip()
obrigatorios = {
"FPSO1": fpso1,
"FPSO": fpso,
"Especialista": especialista,
"Conferente": conferente,
"OSM": osm,
"Modal": modal,
"MROB": mrob,
"Solicitante": solicitante,
"Requisitante": requisitante,
"Nota Fiscal": nota_fiscal,
"Impacto": impacto,
"Dimensão": dimensao,
"Dia de Inclusão (D)": dia_inclusao,
}
faltantes = [k for k, v in obrigatorios.items() if not v]
if faltantes:
st.error("❌ Campos obrigatórios não preenchidos: " + ", ".join(faltantes))
return
db = SessionLocal()
try:
novo = Equipamento(
fpso1=fpso1,
fpso=fpso,
data_coleta=data_coleta,
especialista=especialista,
conferente=conferente,
osm=osm,
modal=modal,
quant_equip=quant_equip,
mrob=mrob,
linhas_osm=linhas_osm,
linhas_mrob=linhas_mrob,
linhas_erros=linhas_erros,
erro_storekeeper=erro_storekeeper,
erro_operacao=erro_operacao,
erro_especialista=erro_especialista,
erro_outros=erro_outros,
inclusao_exclusao=inclusao_exclusao,
po=po,
part_number=part_number,
material=material,
solicitante=solicitante,
motivo=motivo,
requisitante=requisitante,
nota_fiscal=nota_fiscal,
impacto=impacto,
dimensao=dimensao,
observacoes=observacoes,
dia_inclusao=dia_inclusao,
data_hora_input=datetime.now(),
)
db.add(novo)
db.commit()
registrar_log(
usuario=_ss_get("usuario", "desconhecido"),
acao="INSERIR",
tabela="equipamentos",
registro_id=novo.id
)
st.success("✅ Registro salvo com sucesso!")
st.rerun()
except Exception as e:
db.rollback()
st.error(f"❌ Erro ao salvar: {e}")
finally:
db.close()
if __name__ == "__main__":
main()