Spaces:
Running
Running
| # -*- coding: utf-8 -*- | |
| import streamlit as st | |
| from datetime import datetime, date | |
| from banco import SessionLocal | |
| from models import Equipamento | |
| from sqlalchemy import distinct | |
| from log import registrar_log | |
| # ====================================================== | |
| # LISTAS FIXAS | |
| # ====================================================== | |
| MODAL_LISTA = ["", "AÉREO", "MARÍTIMO", "EXPRESSO"] | |
| INCLUSAO_EXCLUSAO_LISTA = ["", "INCLUSÃO", "EXCLUSÃO"] | |
| RESP_ERRO_LISTA = ["", "Sim", "Não"] | |
| D_LISTA = ["", "D1", "D2", "D3"] | |
| # ====================================================== | |
| # LAYOUT | |
| # ====================================================== | |
| def layout_padrao(titulo: str): | |
| st.title(titulo) | |
| st.divider() | |
| # ====================================================== | |
| # UTILS | |
| # ====================================================== | |
| def _safe_index(options, value, default=0): | |
| try: | |
| return options.index(value) | |
| except Exception: | |
| return default | |
| def _ss_get(key, default=None): | |
| return st.session_state.get(key, default) | |
| def _ss_set(key, value): | |
| st.session_state[key] = value | |
| # 🟢 Parser de datas robusto (aceita date, datetime e string) | |
| def _parse_date_any(v): | |
| if isinstance(v, date): | |
| return v | |
| if isinstance(v, datetime): | |
| return v.date() | |
| if isinstance(v, str): | |
| s = v.strip() | |
| # ISO: YYYY-MM-DD | |
| try: | |
| return date.fromisoformat(s) | |
| except Exception: | |
| pass | |
| # BR: DD/MM/YYYY | |
| try: | |
| return datetime.strptime(s, "%d/%m/%Y").date() | |
| except Exception: | |
| pass | |
| # YYYY/MM/DD (alguns DBs/ETLs geram assim) | |
| try: | |
| return datetime.strptime(s, "%Y/%m/%d").date() | |
| except Exception: | |
| pass | |
| return None | |
| def _build_prefill_from_record(r: Equipamento) -> dict: | |
| # 🟢 Usa parser que converte string->date quando possível | |
| dc = _parse_date_any(getattr(r, "data_coleta", None)) | |
| return { | |
| "fpso1": r.fpso1 or "", | |
| "fpso": r.fpso or "", | |
| "data_coleta": dc, # 🟢 | |
| "especialista": r.especialista or "", | |
| "conferente": r.conferente or "", | |
| "osm": r.osm or "", | |
| "modal": r.modal or "", | |
| "quant_equip": int(r.quant_equip or 0), | |
| "mrob": r.mrob or "", | |
| "linhas_osm": int(r.linhas_osm or 0), | |
| "linhas_mrob": int(r.linhas_mrob or 0), | |
| "linhas_erros": int(r.linhas_erros or 0), | |
| "erro_storekeeper": r.erro_storekeeper or "", | |
| "erro_operacao": r.erro_operacao or "", | |
| "erro_especialista": r.erro_especialista or "", | |
| "erro_outros": r.erro_outros or "", | |
| "inclusao_exclusao": r.inclusao_exclusao or "", | |
| "po": r.po or "", | |
| "part_number": r.part_number or "", | |
| "material": r.material or "", | |
| "solicitante": r.solicitante or "", | |
| "requisitante": r.requisitante or "", | |
| "nota_fiscal": r.nota_fiscal or "", | |
| "impacto": r.impacto or "", | |
| "dimensao": r.dimensao or "", | |
| "motivo": getattr(r, "motivo", "") or "", | |
| "observacoes": r.observacoes or "", | |
| "dia_inclusao": r.dia_inclusao or "", | |
| "_origem_id": r.id, | |
| } | |
| def get_distinct(_campo): | |
| db = SessionLocal() | |
| try: | |
| rows = db.query(distinct(_campo)).filter(_campo.isnot(None)).filter(_campo != "").all() | |
| return sorted([r[0] for r in rows]) | |
| finally: | |
| db.close() | |
| def get_fpsos(): | |
| return [""] + get_distinct(Equipamento.fpso) | |
| def get_fpsos1(): | |
| return [""] + get_distinct(Equipamento.fpso1) | |
| def get_notas_fiscais(): | |
| return sorted([str(x) for x in get_distinct(Equipamento.nota_fiscal)]) | |
| def _apply_prefill_to_state(prefill: dict, debug=False): | |
| """ | |
| Aplica o prefill uma única vez por origem (ID), antes de criar widgets. | |
| Se o valor não existir nas listas, move para o campo '➕ Novo ...' + text_input. | |
| """ | |
| if not prefill: | |
| if debug: st.info("DEBUG: prefill vazio. Nada a aplicar.") | |
| return | |
| origem = prefill.get("_origem_id") | |
| if _ss_get("__prefill_applied__") == origem: | |
| if debug: st.info(f"DEBUG: prefill já aplicado para origem {origem}.") | |
| return | |
| # Carrega todos os campos no session_state como w_* | |
| for k, v in prefill.items(): | |
| _ss_set(f"w_{k}", v) | |
| # Backups para os inputs de texto dos "➕ Novo ..." | |
| _ss_set("w_fpso1_text", prefill.get("fpso1", "")) | |
| _ss_set("w_fpso_text", prefill.get("fpso", "")) | |
| # Normalização para selects fixos | |
| def _norm(key, allowed): | |
| v = _ss_get(key, "") | |
| if v not in allowed: | |
| _ss_set(key, allowed[0]) | |
| _norm("w_modal", MODAL_LISTA) | |
| _norm("w_erro_storekeeper", RESP_ERRO_LISTA) | |
| _norm("w_erro_operacao", RESP_ERRO_LISTA) | |
| _norm("w_erro_especialista", RESP_ERRO_LISTA) | |
| _norm("w_erro_outros", RESP_ERRO_LISTA) | |
| _norm("w_inclusao_exclusao", INCLUSAO_EXCLUSAO_LISTA) | |
| _norm("w_dia_inclusao", D_LISTA) | |
| # 🟢 Tipos/valores coerentes | |
| _ss_set("w_quant_equip", int(_ss_get("w_quant_equip", 0) or 0)) | |
| _ss_set("w_linhas_osm", int(_ss_get("w_linhas_osm", 0) or 0)) | |
| _ss_set("w_linhas_mrob", int(_ss_get("w_linhas_mrob", 0) or 0)) | |
| _ss_set("w_linhas_erros", int(_ss_get("w_linhas_erros", 0) or 0)) | |
| # 🟢 NÃO força “hoje” se vier None — mas o widget precisa de um valor (trata na UI) | |
| # _ss_set("w_data_coleta", _ss_get("w_data_coleta") or date.today()) | |
| _ss_set("__prefill_applied__", origem) | |
| if debug: st.success(f"DEBUG: prefill aplicado. origem={origem}") | |
| # ====================================================== | |
| # APP | |
| # ====================================================== | |
| def main(): | |
| layout_padrao( | |
| "📋 CONTROLE DE OSM – Inclusões / Exclusões • Visão por linha (D3 a partir das 16h)" | |
| ) | |
| debug = st.toggle("🔍 Modo debug", value=False) | |
| # ====================================================== | |
| # EXPANDER • Pré-carregar por Nota Fiscal | |
| # ====================================================== | |
| with st.expander("🧾 Pré-carregar via Nota Fiscal (clonar como nova entrada)", expanded=False): | |
| col1, col2, col3 = st.columns([2, 2, 1.2]) | |
| nf_digitado = col1.text_input("Digite a Nota Fiscal", key="nf_lookup") | |
| nf_select = col2.selectbox("Ou selecione", [""] + get_notas_fiscais(), key="nf_select") | |
| marcar_origem = col3.checkbox("Marcar origem", value=True, key="nf_marcar_origem") | |
| c1, c2 = st.columns(2) | |
| buscar = c1.button("🔎 Buscar NF", key="btn_buscar_nf") | |
| limpar = c2.button("🧹 Limpar pré-preenchimento", key="btn_limpar_prefill") | |
| if limpar: | |
| # Limpa apenas os w_* e marcadores de prefill/resultados | |
| for k in list(st.session_state.keys()): | |
| if k.startswith("w_"): | |
| del st.session_state[k] | |
| st.session_state.pop("form_prefill", None) | |
| st.session_state.pop("__prefill_applied__", None) | |
| st.session_state.pop("__nf_busca_result__", None) | |
| st.session_state.pop("__nf_busca_opts__", None) | |
| st.session_state.pop("sel_registro_base", None) | |
| st.success("Pré-preenchimento limpo.") | |
| st.rerun() | |
| if buscar: | |
| nf = (nf_digitado or nf_select or "").strip() | |
| if not nf: | |
| st.warning("Informe ou selecione uma Nota Fiscal.") | |
| else: | |
| db = SessionLocal() | |
| try: | |
| regs = ( | |
| db.query(Equipamento) | |
| .filter(Equipamento.nota_fiscal == nf) | |
| .order_by(Equipamento.id.desc()) | |
| .all() | |
| ) | |
| except Exception as e: | |
| regs = [] | |
| st.error(f"Erro ao buscar NF: {e}") | |
| finally: | |
| db.close() | |
| if not regs: | |
| st.info("Nenhum registro encontrado para a NF informada.") | |
| st.session_state.pop("__nf_busca_result__", None) | |
| st.session_state.pop("__nf_busca_opts__", None) | |
| else: | |
| # Persistimos APENAS IDs e um resumo textual (não guardamos objetos SQLAlchemy) | |
| result = [ | |
| {"id": r.id, "data_coleta": str(r.data_coleta), "fpso": r.fpso or "—", "osm": r.osm or "—"} | |
| for r in regs | |
| ] | |
| opts = [f"ID {x['id']} | {x['data_coleta']} | FPSO {x['fpso']} | OSM {x['osm']}" for x in result] | |
| st.session_state["__nf_busca_result__"] = result | |
| st.session_state["__nf_busca_opts__"] = opts | |
| # Reset da seleção anterior | |
| st.session_state["sel_registro_base"] = opts[0] if opts else None | |
| # ⬇️ Mostrar seleção + botão USAR sempre que houver resultado persistido | |
| if st.session_state.get("__nf_busca_opts__"): | |
| escolha = st.selectbox( | |
| "Selecione o registro base", | |
| st.session_state["__nf_busca_opts__"], | |
| key="sel_registro_base" | |
| ) | |
| if st.button("📥 Usar este registro como base", key="btn_usar_base"): | |
| try: | |
| # Extrai o ID do texto selecionado | |
| chosen_id = int(escolha.split()[1]) | |
| except Exception: | |
| st.error("Falha ao identificar o ID do registro selecionado.") | |
| chosen_id = None | |
| if chosen_id: | |
| # Busca o registro no banco por ID (seguro e stateless) | |
| db = SessionLocal() | |
| try: | |
| base = db.query(Equipamento).filter(Equipamento.id == chosen_id).first() | |
| except Exception as e: | |
| base = None | |
| st.error(f"Erro ao buscar o registro por ID: {e}") | |
| finally: | |
| db.close() | |
| if not base: | |
| st.error("Registro não encontrado. Tente buscar novamente.") | |
| else: | |
| pre = _build_prefill_from_record(base) | |
| if marcar_origem: | |
| pre["observacoes"] = (pre.get("observacoes") or "") + \ | |
| f" [Clonado do ID {base.id} em {datetime.now():%d/%m/%Y %H:%M}]" | |
| st.session_state["form_prefill"] = pre | |
| st.session_state.pop("__prefill_applied__", None) | |
| st.success("Pré-preenchimento carregado! Role até o formulário para revisar.") | |
| st.rerun() | |
| # ====================================================== | |
| # APLICAR PREFILL (antes de criar os widgets) | |
| # ====================================================== | |
| prefill = _ss_get("form_prefill", {}) | |
| _apply_prefill_to_state(prefill, debug=debug) | |
| if debug: | |
| with st.expander("🔎 DEBUG • session_state (parcial)"): | |
| keys = [k for k in st.session_state.keys() if k.startswith("w_") or k in ("form_prefill", "__prefill_applied__", "__nf_busca_result__", "__nf_busca_opts__", "sel_registro_base")] | |
| dump = {k: st.session_state[k] for k in sorted(keys)} | |
| st.write(dump) | |
| # ====================================================== | |
| # FORMULÁRIO (SEM st.form) | |
| # ====================================================== | |
| # -------- FPSO -------- | |
| st.subheader("🚢 Identificação FPSO") | |
| c1, c2 = st.columns(2) | |
| lista_fpso1 = get_fpsos1() + ["➕ Novo FPSO1"] | |
| lista_fpso = get_fpsos() + ["➕ Novo FPSO"] | |
| # Se valor não está na lista, muda para "➕ Novo …" e mantém texto | |
| if _ss_get("w_fpso1", "") not in lista_fpso1: | |
| _ss_set("w_fpso1", "➕ Novo FPSO1" if _ss_get("w_fpso1_text") else "") | |
| if _ss_get("w_fpso", "") not in lista_fpso: | |
| _ss_set("w_fpso", "➕ Novo FPSO" if _ss_get("w_fpso_text") else "") | |
| with c1: | |
| st.selectbox( | |
| "FPSO1 *", | |
| lista_fpso1, | |
| index=_safe_index(lista_fpso1, _ss_get("w_fpso1", "")), | |
| key="w_fpso1" | |
| ) | |
| if _ss_get("w_fpso1") == "➕ Novo FPSO1": | |
| st.text_input("Digite o novo FPSO1 *", key="w_fpso1_text") | |
| fpso1 = _ss_get("w_fpso1_text", "").strip() | |
| else: | |
| _ss_set("w_fpso1_text", _ss_get("w_fpso1")) | |
| fpso1 = _ss_get("w_fpso1") | |
| with c2: | |
| st.selectbox( | |
| "FPSO *", | |
| lista_fpso, | |
| index=_safe_index(lista_fpso, _ss_get("w_fpso", "")), | |
| key="w_fpso" | |
| ) | |
| if _ss_get("w_fpso") == "➕ Novo FPSO": | |
| st.text_input("Digite o novo FPSO *", key="w_fpso_text") | |
| fpso = _ss_get("w_fpso_text", "").strip() | |
| else: | |
| _ss_set("w_fpso_text", _ss_get("w_fpso")) | |
| fpso = _ss_get("w_fpso") | |
| st.divider() | |
| # -------- Dados Operacionais -------- | |
| st.subheader("📦 Dados Operacionais") | |
| c1, c2, c3 = st.columns(3) | |
| with c1: | |
| # 🟢 Usa valor do estado; se vier None, o widget ainda precisa de um date válido. | |
| st.date_input( | |
| "Data de Coleta na ARM *", | |
| value=_ss_get("w_data_coleta") or date.today(), | |
| key="w_data_coleta" | |
| ) | |
| st.text_input("Especialista Responsável *", key="w_especialista") | |
| st.text_input("Conferente Responsável *", key="w_conferente") | |
| st.text_input("OSM *", key="w_osm") | |
| with c2: | |
| st.selectbox("Modal *", MODAL_LISTA, | |
| index=_safe_index(MODAL_LISTA, _ss_get("w_modal", "")), | |
| key="w_modal") | |
| st.number_input("Quantidade de Equipamentos *", min_value=0, value=_ss_get("w_quant_equip", 0), key="w_quant_equip") | |
| st.text_input("MROB *", key="w_mrob") | |
| with c3: | |
| st.number_input("Total de Linhas OSM", min_value=0, value=_ss_get("w_linhas_osm", 0), key="w_linhas_osm") | |
| st.number_input("Total de Linhas MROB", min_value=0, value=_ss_get("w_linhas_mrob", 0), key="w_linhas_mrob") | |
| st.number_input("Total de Linhas com Erro", min_value=0, value=_ss_get("w_linhas_erros", 0), key="w_linhas_erros") | |
| st.divider() | |
| # -------- Análise de Erros -------- | |
| st.subheader("⚠️ Análise de Erros") | |
| c1, c2, c3, c4 = st.columns(4) | |
| c1.selectbox("Storekeeper", RESP_ERRO_LISTA, index=_safe_index(RESP_ERRO_LISTA, _ss_get("w_erro_storekeeper", "")), key="w_erro_storekeeper") | |
| c2.selectbox("Operação WH", RESP_ERRO_LISTA, index=_safe_index(RESP_ERRO_LISTA, _ss_get("w_erro_operacao", "")), key="w_erro_operacao") | |
| c3.selectbox("Especialista WH", RESP_ERRO_LISTA, index=_safe_index(RESP_ERRO_LISTA, _ss_get("w_erro_especialista", "")), key="w_erro_especialista") | |
| c4.selectbox("Outros", RESP_ERRO_LISTA, index=_safe_index(RESP_ERRO_LISTA, _ss_get("w_erro_outros", "")), key="w_erro_outros") | |
| st.selectbox( | |
| "Inclusão / Exclusão", | |
| INCLUSAO_EXCLUSAO_LISTA, | |
| index=_safe_index(INCLUSAO_EXCLUSAO_LISTA, _ss_get("w_inclusao_exclusao", "")), | |
| key="w_inclusao_exclusao" | |
| ) | |
| st.divider() | |
| # -------- Dados Administrativos -------- | |
| st.subheader("🧾 Dados Administrativos") | |
| ca, cb, cc = st.columns(3) | |
| with ca: | |
| st.text_input("PO", key="w_po") | |
| st.text_input("Part Number", key="w_part_number") | |
| with cb: | |
| st.text_input("Material", key="w_material") | |
| st.text_input("Nota Fiscal *", key="w_nota_fiscal") | |
| with cc: | |
| st.text_input("Solicitante *", key="w_solicitante") | |
| st.text_input("Requisitante *", key="w_requisitante") | |
| st.text_input("Impacto *", key="w_impacto") | |
| st.text_input("Dimensão *", key="w_dimensao") | |
| st.text_input("Motivo da Inclusão / Exclusão", key="w_motivo") | |
| st.text_area("Observações", key="w_observacoes", height=120) | |
| # -------- Dia D -------- | |
| st.subheader("🗓️ Dia de Inclusão (D)") | |
| st.selectbox( | |
| "Selecione o dia", | |
| D_LISTA, | |
| index=_safe_index(D_LISTA, _ss_get("w_dia_inclusao", "")), | |
| key="w_dia_inclusao" | |
| ) | |
| # ====================================================== | |
| # SALVAR (sempre nova entrada) | |
| # ====================================================== | |
| if st.button("💾 Salvar Registro", type="primary"): | |
| # Lê valores atuais do session_state | |
| data_coleta = _ss_get("w_data_coleta") | |
| especialista = _ss_get("w_especialista", "").strip() | |
| conferente = _ss_get("w_conferente", "").strip() | |
| osm = _ss_get("w_osm", "").strip() | |
| modal = _ss_get("w_modal", "") | |
| quant_equip = int(_ss_get("w_quant_equip", 0) or 0) | |
| mrob = _ss_get("w_mrob", "").strip() | |
| linhas_osm = int(_ss_get("w_linhas_osm", 0) or 0) | |
| linhas_mrob = int(_ss_get("w_linhas_mrob", 0) or 0) | |
| linhas_erros = int(_ss_get("w_linhas_erros", 0) or 0) | |
| erro_storekeeper = _ss_get("w_erro_storekeeper", "") | |
| erro_operacao = _ss_get("w_erro_operacao", "") | |
| erro_especialista = _ss_get("w_erro_especialista", "") | |
| erro_outros = _ss_get("w_erro_outros", "") | |
| inclusao_exclusao = _ss_get("w_inclusao_exclusao", "") | |
| po = _ss_get("w_po", "").strip() | |
| part_number = _ss_get("w_part_number", "").strip() | |
| material = _ss_get("w_material", "").strip() | |
| solicitante = _ss_get("w_solicitante", "").strip() | |
| requisitante = _ss_get("w_requisitante", "").strip() | |
| nota_fiscal = _ss_get("w_nota_fiscal", "").strip() | |
| impacto = _ss_get("w_impacto", "").strip() | |
| dimensao = _ss_get("w_dimensao", "").strip() | |
| motivo = _ss_get("w_motivo", "").strip() | |
| observacoes = _ss_get("w_observacoes", "").strip() | |
| dia_inclusao = _ss_get("w_dia_inclusao", "") | |
| # Campos FPSO/FPSO1 | |
| fpso1 = _ss_get("w_fpso1_text", "").strip() if _ss_get("w_fpso1") == "➕ Novo FPSO1" else _ss_get("w_fpso1", "").strip() | |
| fpso = _ss_get("w_fpso_text", "").strip() if _ss_get("w_fpso") == "➕ Novo FPSO" else _ss_get("w_fpso", "").strip() | |
| obrigatorios = { | |
| "FPSO1": fpso1, | |
| "FPSO": fpso, | |
| "Especialista": especialista, | |
| "Conferente": conferente, | |
| "OSM": osm, | |
| "Modal": modal, | |
| "MROB": mrob, | |
| "Solicitante": solicitante, | |
| "Requisitante": requisitante, | |
| "Nota Fiscal": nota_fiscal, | |
| "Impacto": impacto, | |
| "Dimensão": dimensao, | |
| "Dia de Inclusão (D)": dia_inclusao, | |
| } | |
| faltantes = [k for k, v in obrigatorios.items() if not v] | |
| if faltantes: | |
| st.error("❌ Campos obrigatórios não preenchidos: " + ", ".join(faltantes)) | |
| return | |
| db = SessionLocal() | |
| try: | |
| novo = Equipamento( | |
| fpso1=fpso1, | |
| fpso=fpso, | |
| data_coleta=data_coleta, | |
| especialista=especialista, | |
| conferente=conferente, | |
| osm=osm, | |
| modal=modal, | |
| quant_equip=quant_equip, | |
| mrob=mrob, | |
| linhas_osm=linhas_osm, | |
| linhas_mrob=linhas_mrob, | |
| linhas_erros=linhas_erros, | |
| erro_storekeeper=erro_storekeeper, | |
| erro_operacao=erro_operacao, | |
| erro_especialista=erro_especialista, | |
| erro_outros=erro_outros, | |
| inclusao_exclusao=inclusao_exclusao, | |
| po=po, | |
| part_number=part_number, | |
| material=material, | |
| solicitante=solicitante, | |
| motivo=motivo, | |
| requisitante=requisitante, | |
| nota_fiscal=nota_fiscal, | |
| impacto=impacto, | |
| dimensao=dimensao, | |
| observacoes=observacoes, | |
| dia_inclusao=dia_inclusao, | |
| data_hora_input=datetime.now(), | |
| ) | |
| db.add(novo) | |
| db.commit() | |
| registrar_log( | |
| usuario=_ss_get("usuario", "desconhecido"), | |
| acao="INSERIR", | |
| tabela="equipamentos", | |
| registro_id=novo.id | |
| ) | |
| st.success("✅ Registro salvo com sucesso!") | |
| st.rerun() | |
| except Exception as e: | |
| db.rollback() | |
| st.error(f"❌ Erro ao salvar: {e}") | |
| finally: | |
| db.close() | |
| if __name__ == "__main__": | |
| main() | |