# -*- coding: utf-8 -*- import streamlit as st from datetime import datetime, date from banco import SessionLocal from models import Equipamento from sqlalchemy import distinct from log import registrar_log # ====================================================== # LISTAS FIXAS # ====================================================== MODAL_LISTA = ["", "AÉREO", "MARÍTIMO", "EXPRESSO"] INCLUSAO_EXCLUSAO_LISTA = ["", "INCLUSÃO", "EXCLUSÃO"] RESP_ERRO_LISTA = ["", "Sim", "Não"] D_LISTA = ["", "D1", "D2", "D3"] # ====================================================== # LAYOUT # ====================================================== def layout_padrao(titulo: str): st.title(titulo) st.divider() # ====================================================== # UTILS # ====================================================== def _safe_index(options, value, default=0): try: return options.index(value) except Exception: return default def _ss_get(key, default=None): return st.session_state.get(key, default) def _ss_set(key, value): st.session_state[key] = value # 🟢 Parser de datas robusto (aceita date, datetime e string) def _parse_date_any(v): if isinstance(v, date): return v if isinstance(v, datetime): return v.date() if isinstance(v, str): s = v.strip() # ISO: YYYY-MM-DD try: return date.fromisoformat(s) except Exception: pass # BR: DD/MM/YYYY try: return datetime.strptime(s, "%d/%m/%Y").date() except Exception: pass # YYYY/MM/DD (alguns DBs/ETLs geram assim) try: return datetime.strptime(s, "%Y/%m/%d").date() except Exception: pass return None def _build_prefill_from_record(r: Equipamento) -> dict: # 🟢 Usa parser que converte string->date quando possível dc = _parse_date_any(getattr(r, "data_coleta", None)) return { "fpso1": r.fpso1 or "", "fpso": r.fpso or "", "data_coleta": dc, # 🟢 "especialista": r.especialista or "", "conferente": r.conferente or "", "osm": r.osm or "", "modal": r.modal or "", "quant_equip": int(r.quant_equip or 0), "mrob": r.mrob or "", "linhas_osm": int(r.linhas_osm or 0), "linhas_mrob": int(r.linhas_mrob or 0), "linhas_erros": int(r.linhas_erros or 0), "erro_storekeeper": r.erro_storekeeper or "", "erro_operacao": r.erro_operacao or "", "erro_especialista": r.erro_especialista or "", "erro_outros": r.erro_outros or "", "inclusao_exclusao": r.inclusao_exclusao or "", "po": r.po or "", "part_number": r.part_number or "", "material": r.material or "", "solicitante": r.solicitante or "", "requisitante": r.requisitante or "", "nota_fiscal": r.nota_fiscal or "", "impacto": r.impacto or "", "dimensao": r.dimensao or "", "motivo": getattr(r, "motivo", "") or "", "observacoes": r.observacoes or "", "dia_inclusao": r.dia_inclusao or "", "_origem_id": r.id, } @st.cache_data def get_distinct(_campo): db = SessionLocal() try: rows = db.query(distinct(_campo)).filter(_campo.isnot(None)).filter(_campo != "").all() return sorted([r[0] for r in rows]) finally: db.close() def get_fpsos(): return [""] + get_distinct(Equipamento.fpso) def get_fpsos1(): return [""] + get_distinct(Equipamento.fpso1) def get_notas_fiscais(): return sorted([str(x) for x in get_distinct(Equipamento.nota_fiscal)]) def _apply_prefill_to_state(prefill: dict, debug=False): """ Aplica o prefill uma única vez por origem (ID), antes de criar widgets. Se o valor não existir nas listas, move para o campo '➕ Novo ...' + text_input. """ if not prefill: if debug: st.info("DEBUG: prefill vazio. Nada a aplicar.") return origem = prefill.get("_origem_id") if _ss_get("__prefill_applied__") == origem: if debug: st.info(f"DEBUG: prefill já aplicado para origem {origem}.") return # Carrega todos os campos no session_state como w_* for k, v in prefill.items(): _ss_set(f"w_{k}", v) # Backups para os inputs de texto dos "➕ Novo ..." _ss_set("w_fpso1_text", prefill.get("fpso1", "")) _ss_set("w_fpso_text", prefill.get("fpso", "")) # Normalização para selects fixos def _norm(key, allowed): v = _ss_get(key, "") if v not in allowed: _ss_set(key, allowed[0]) _norm("w_modal", MODAL_LISTA) _norm("w_erro_storekeeper", RESP_ERRO_LISTA) _norm("w_erro_operacao", RESP_ERRO_LISTA) _norm("w_erro_especialista", RESP_ERRO_LISTA) _norm("w_erro_outros", RESP_ERRO_LISTA) _norm("w_inclusao_exclusao", INCLUSAO_EXCLUSAO_LISTA) _norm("w_dia_inclusao", D_LISTA) # 🟢 Tipos/valores coerentes _ss_set("w_quant_equip", int(_ss_get("w_quant_equip", 0) or 0)) _ss_set("w_linhas_osm", int(_ss_get("w_linhas_osm", 0) or 0)) _ss_set("w_linhas_mrob", int(_ss_get("w_linhas_mrob", 0) or 0)) _ss_set("w_linhas_erros", int(_ss_get("w_linhas_erros", 0) or 0)) # 🟢 NÃO força “hoje” se vier None — mas o widget precisa de um valor (trata na UI) # _ss_set("w_data_coleta", _ss_get("w_data_coleta") or date.today()) _ss_set("__prefill_applied__", origem) if debug: st.success(f"DEBUG: prefill aplicado. origem={origem}") # ====================================================== # APP # ====================================================== def main(): layout_padrao( "📋 CONTROLE DE OSM – Inclusões / Exclusões • Visão por linha (D3 a partir das 16h)" ) debug = st.toggle("🔍 Modo debug", value=False) # ====================================================== # EXPANDER • Pré-carregar por Nota Fiscal # ====================================================== with st.expander("🧾 Pré-carregar via Nota Fiscal (clonar como nova entrada)", expanded=False): col1, col2, col3 = st.columns([2, 2, 1.2]) nf_digitado = col1.text_input("Digite a Nota Fiscal", key="nf_lookup") nf_select = col2.selectbox("Ou selecione", [""] + get_notas_fiscais(), key="nf_select") marcar_origem = col3.checkbox("Marcar origem", value=True, key="nf_marcar_origem") c1, c2 = st.columns(2) buscar = c1.button("🔎 Buscar NF", key="btn_buscar_nf") limpar = c2.button("🧹 Limpar pré-preenchimento", key="btn_limpar_prefill") if limpar: # Limpa apenas os w_* e marcadores de prefill/resultados for k in list(st.session_state.keys()): if k.startswith("w_"): del st.session_state[k] st.session_state.pop("form_prefill", None) st.session_state.pop("__prefill_applied__", None) st.session_state.pop("__nf_busca_result__", None) st.session_state.pop("__nf_busca_opts__", None) st.session_state.pop("sel_registro_base", None) st.success("Pré-preenchimento limpo.") st.rerun() if buscar: nf = (nf_digitado or nf_select or "").strip() if not nf: st.warning("Informe ou selecione uma Nota Fiscal.") else: db = SessionLocal() try: regs = ( db.query(Equipamento) .filter(Equipamento.nota_fiscal == nf) .order_by(Equipamento.id.desc()) .all() ) except Exception as e: regs = [] st.error(f"Erro ao buscar NF: {e}") finally: db.close() if not regs: st.info("Nenhum registro encontrado para a NF informada.") st.session_state.pop("__nf_busca_result__", None) st.session_state.pop("__nf_busca_opts__", None) else: # Persistimos APENAS IDs e um resumo textual (não guardamos objetos SQLAlchemy) result = [ {"id": r.id, "data_coleta": str(r.data_coleta), "fpso": r.fpso or "—", "osm": r.osm or "—"} for r in regs ] opts = [f"ID {x['id']} | {x['data_coleta']} | FPSO {x['fpso']} | OSM {x['osm']}" for x in result] st.session_state["__nf_busca_result__"] = result st.session_state["__nf_busca_opts__"] = opts # Reset da seleção anterior st.session_state["sel_registro_base"] = opts[0] if opts else None # ⬇️ Mostrar seleção + botão USAR sempre que houver resultado persistido if st.session_state.get("__nf_busca_opts__"): escolha = st.selectbox( "Selecione o registro base", st.session_state["__nf_busca_opts__"], key="sel_registro_base" ) if st.button("📥 Usar este registro como base", key="btn_usar_base"): try: # Extrai o ID do texto selecionado chosen_id = int(escolha.split()[1]) except Exception: st.error("Falha ao identificar o ID do registro selecionado.") chosen_id = None if chosen_id: # Busca o registro no banco por ID (seguro e stateless) db = SessionLocal() try: base = db.query(Equipamento).filter(Equipamento.id == chosen_id).first() except Exception as e: base = None st.error(f"Erro ao buscar o registro por ID: {e}") finally: db.close() if not base: st.error("Registro não encontrado. Tente buscar novamente.") else: pre = _build_prefill_from_record(base) if marcar_origem: pre["observacoes"] = (pre.get("observacoes") or "") + \ f" [Clonado do ID {base.id} em {datetime.now():%d/%m/%Y %H:%M}]" st.session_state["form_prefill"] = pre st.session_state.pop("__prefill_applied__", None) st.success("Pré-preenchimento carregado! Role até o formulário para revisar.") st.rerun() # ====================================================== # APLICAR PREFILL (antes de criar os widgets) # ====================================================== prefill = _ss_get("form_prefill", {}) _apply_prefill_to_state(prefill, debug=debug) if debug: with st.expander("🔎 DEBUG • session_state (parcial)"): keys = [k for k in st.session_state.keys() if k.startswith("w_") or k in ("form_prefill", "__prefill_applied__", "__nf_busca_result__", "__nf_busca_opts__", "sel_registro_base")] dump = {k: st.session_state[k] for k in sorted(keys)} st.write(dump) # ====================================================== # FORMULÁRIO (SEM st.form) # ====================================================== # -------- FPSO -------- st.subheader("🚢 Identificação FPSO") c1, c2 = st.columns(2) lista_fpso1 = get_fpsos1() + ["➕ Novo FPSO1"] lista_fpso = get_fpsos() + ["➕ Novo FPSO"] # Se valor não está na lista, muda para "➕ Novo …" e mantém texto if _ss_get("w_fpso1", "") not in lista_fpso1: _ss_set("w_fpso1", "➕ Novo FPSO1" if _ss_get("w_fpso1_text") else "") if _ss_get("w_fpso", "") not in lista_fpso: _ss_set("w_fpso", "➕ Novo FPSO" if _ss_get("w_fpso_text") else "") with c1: st.selectbox( "FPSO1 *", lista_fpso1, index=_safe_index(lista_fpso1, _ss_get("w_fpso1", "")), key="w_fpso1" ) if _ss_get("w_fpso1") == "➕ Novo FPSO1": st.text_input("Digite o novo FPSO1 *", key="w_fpso1_text") fpso1 = _ss_get("w_fpso1_text", "").strip() else: _ss_set("w_fpso1_text", _ss_get("w_fpso1")) fpso1 = _ss_get("w_fpso1") with c2: st.selectbox( "FPSO *", lista_fpso, index=_safe_index(lista_fpso, _ss_get("w_fpso", "")), key="w_fpso" ) if _ss_get("w_fpso") == "➕ Novo FPSO": st.text_input("Digite o novo FPSO *", key="w_fpso_text") fpso = _ss_get("w_fpso_text", "").strip() else: _ss_set("w_fpso_text", _ss_get("w_fpso")) fpso = _ss_get("w_fpso") st.divider() # -------- Dados Operacionais -------- st.subheader("📦 Dados Operacionais") c1, c2, c3 = st.columns(3) with c1: # 🟢 Usa valor do estado; se vier None, o widget ainda precisa de um date válido. st.date_input( "Data de Coleta na ARM *", value=_ss_get("w_data_coleta") or date.today(), key="w_data_coleta" ) st.text_input("Especialista Responsável *", key="w_especialista") st.text_input("Conferente Responsável *", key="w_conferente") st.text_input("OSM *", key="w_osm") with c2: st.selectbox("Modal *", MODAL_LISTA, index=_safe_index(MODAL_LISTA, _ss_get("w_modal", "")), key="w_modal") st.number_input("Quantidade de Equipamentos *", min_value=0, value=_ss_get("w_quant_equip", 0), key="w_quant_equip") st.text_input("MROB *", key="w_mrob") with c3: st.number_input("Total de Linhas OSM", min_value=0, value=_ss_get("w_linhas_osm", 0), key="w_linhas_osm") st.number_input("Total de Linhas MROB", min_value=0, value=_ss_get("w_linhas_mrob", 0), key="w_linhas_mrob") st.number_input("Total de Linhas com Erro", min_value=0, value=_ss_get("w_linhas_erros", 0), key="w_linhas_erros") st.divider() # -------- Análise de Erros -------- st.subheader("⚠️ Análise de Erros") c1, c2, c3, c4 = st.columns(4) c1.selectbox("Storekeeper", RESP_ERRO_LISTA, index=_safe_index(RESP_ERRO_LISTA, _ss_get("w_erro_storekeeper", "")), key="w_erro_storekeeper") c2.selectbox("Operação WH", RESP_ERRO_LISTA, index=_safe_index(RESP_ERRO_LISTA, _ss_get("w_erro_operacao", "")), key="w_erro_operacao") c3.selectbox("Especialista WH", RESP_ERRO_LISTA, index=_safe_index(RESP_ERRO_LISTA, _ss_get("w_erro_especialista", "")), key="w_erro_especialista") c4.selectbox("Outros", RESP_ERRO_LISTA, index=_safe_index(RESP_ERRO_LISTA, _ss_get("w_erro_outros", "")), key="w_erro_outros") st.selectbox( "Inclusão / Exclusão", INCLUSAO_EXCLUSAO_LISTA, index=_safe_index(INCLUSAO_EXCLUSAO_LISTA, _ss_get("w_inclusao_exclusao", "")), key="w_inclusao_exclusao" ) st.divider() # -------- Dados Administrativos -------- st.subheader("🧾 Dados Administrativos") ca, cb, cc = st.columns(3) with ca: st.text_input("PO", key="w_po") st.text_input("Part Number", key="w_part_number") with cb: st.text_input("Material", key="w_material") st.text_input("Nota Fiscal *", key="w_nota_fiscal") with cc: st.text_input("Solicitante *", key="w_solicitante") st.text_input("Requisitante *", key="w_requisitante") st.text_input("Impacto *", key="w_impacto") st.text_input("Dimensão *", key="w_dimensao") st.text_input("Motivo da Inclusão / Exclusão", key="w_motivo") st.text_area("Observações", key="w_observacoes", height=120) # -------- Dia D -------- st.subheader("🗓️ Dia de Inclusão (D)") st.selectbox( "Selecione o dia", D_LISTA, index=_safe_index(D_LISTA, _ss_get("w_dia_inclusao", "")), key="w_dia_inclusao" ) # ====================================================== # SALVAR (sempre nova entrada) # ====================================================== if st.button("💾 Salvar Registro", type="primary"): # Lê valores atuais do session_state data_coleta = _ss_get("w_data_coleta") especialista = _ss_get("w_especialista", "").strip() conferente = _ss_get("w_conferente", "").strip() osm = _ss_get("w_osm", "").strip() modal = _ss_get("w_modal", "") quant_equip = int(_ss_get("w_quant_equip", 0) or 0) mrob = _ss_get("w_mrob", "").strip() linhas_osm = int(_ss_get("w_linhas_osm", 0) or 0) linhas_mrob = int(_ss_get("w_linhas_mrob", 0) or 0) linhas_erros = int(_ss_get("w_linhas_erros", 0) or 0) erro_storekeeper = _ss_get("w_erro_storekeeper", "") erro_operacao = _ss_get("w_erro_operacao", "") erro_especialista = _ss_get("w_erro_especialista", "") erro_outros = _ss_get("w_erro_outros", "") inclusao_exclusao = _ss_get("w_inclusao_exclusao", "") po = _ss_get("w_po", "").strip() part_number = _ss_get("w_part_number", "").strip() material = _ss_get("w_material", "").strip() solicitante = _ss_get("w_solicitante", "").strip() requisitante = _ss_get("w_requisitante", "").strip() nota_fiscal = _ss_get("w_nota_fiscal", "").strip() impacto = _ss_get("w_impacto", "").strip() dimensao = _ss_get("w_dimensao", "").strip() motivo = _ss_get("w_motivo", "").strip() observacoes = _ss_get("w_observacoes", "").strip() dia_inclusao = _ss_get("w_dia_inclusao", "") # Campos FPSO/FPSO1 fpso1 = _ss_get("w_fpso1_text", "").strip() if _ss_get("w_fpso1") == "➕ Novo FPSO1" else _ss_get("w_fpso1", "").strip() fpso = _ss_get("w_fpso_text", "").strip() if _ss_get("w_fpso") == "➕ Novo FPSO" else _ss_get("w_fpso", "").strip() obrigatorios = { "FPSO1": fpso1, "FPSO": fpso, "Especialista": especialista, "Conferente": conferente, "OSM": osm, "Modal": modal, "MROB": mrob, "Solicitante": solicitante, "Requisitante": requisitante, "Nota Fiscal": nota_fiscal, "Impacto": impacto, "Dimensão": dimensao, "Dia de Inclusão (D)": dia_inclusao, } faltantes = [k for k, v in obrigatorios.items() if not v] if faltantes: st.error("❌ Campos obrigatórios não preenchidos: " + ", ".join(faltantes)) return db = SessionLocal() try: novo = Equipamento( fpso1=fpso1, fpso=fpso, data_coleta=data_coleta, especialista=especialista, conferente=conferente, osm=osm, modal=modal, quant_equip=quant_equip, mrob=mrob, linhas_osm=linhas_osm, linhas_mrob=linhas_mrob, linhas_erros=linhas_erros, erro_storekeeper=erro_storekeeper, erro_operacao=erro_operacao, erro_especialista=erro_especialista, erro_outros=erro_outros, inclusao_exclusao=inclusao_exclusao, po=po, part_number=part_number, material=material, solicitante=solicitante, motivo=motivo, requisitante=requisitante, nota_fiscal=nota_fiscal, impacto=impacto, dimensao=dimensao, observacoes=observacoes, dia_inclusao=dia_inclusao, data_hora_input=datetime.now(), ) db.add(novo) db.commit() registrar_log( usuario=_ss_get("usuario", "desconhecido"), acao="INSERIR", tabela="equipamentos", registro_id=novo.id ) st.success("✅ Registro salvo com sucesso!") st.rerun() except Exception as e: db.rollback() st.error(f"❌ Erro ao salvar: {e}") finally: db.close() if __name__ == "__main__": main()