# -*- coding: utf-8 -*- """ importar_excel.py Importa planilhas Excel para a tabela Equipamento, com: - Geração de modelo Excel - Upload e pré-visualização - Verificação/edição de duplicidades - Conversões seguras de tipos (datas/números/strings) - Gravação transacional + auditoria resiliente Compatível com Linux (Spaces) e local/Windows. """ import streamlit as st import pandas as pd from io import BytesIO from datetime import datetime, date from banco import SessionLocal from models import Equipamento from utils_auditoria import registrar_log # ========================================= # Configuração – nomes de colunas esperadas # ========================================= COLUNAS_ESPERADAS = [ "fpso1", "fpso", "data_coleta", "especialista", "conferente", "osm", "modal", "quant_equip", "mrob", "linhas_osm", "linhas_mrob", "linhas_erros", "erro_storekeeper", "erro_operacao", "erro_especialista", "erro_outros", "inclusao_exclusao", "po", "part_number", "material", "solicitante", "motivo", "requisitante", "nota_fiscal", "impacto", "dimensao", "observacoes", "dia_inclusao", ] # Colunas que tratamos como números inteiros na gravação COLS_INTEIRAS = { "quant_equip", "linhas_osm", "linhas_mrob", "linhas_erros" } # ===================================================== # FUNÇÕES AUXILIARES # ===================================================== def _normalize_cols_case_insensitive(df: pd.DataFrame) -> pd.DataFrame: """ Mapeia colunas do arquivo (case/espaços) para os nomes esperados em COLUNAS_ESPERADAS. Ex.: " FPSo " -> "fpso"; "DATA_COLETA" -> "data_coleta". Colunas não reconhecidas permanecem como estão. """ if df is None or df.empty: return df # prioriza mapeamento por lower(strip) existing = list(df.columns) lower_map = {c.strip().lower(): c for c in existing} new_names = {} for wanted in COLUNAS_ESPERADAS: key = wanted.lower() if key in lower_map: new_names[lower_map[key]] = wanted # renomeia p/ o nome "oficial" esperado # aplica renomeação try: df = df.rename(columns=new_names) except Exception: pass return df def to_date(value): """ Converte para date: - pandas.Timestamp/datetime/date -> date - string em formatos 'YYYY-MM-DD' ou 'DD/MM/YYYY' - retorna None se não puder converter Necessário para compatibilidade com SQLite e outros. """ if value is None or (isinstance(value, float) and pd.isna(value)) or pd.isna(value): return None if isinstance(value, pd.Timestamp): return value.date() if isinstance(value, datetime): return value.date() if isinstance(value, date): return value if isinstance(value, str): s = value.strip() if not s: return None # tenta ISO try: return datetime.strptime(s, "%Y-%m-%d").date() except Exception: pass # tenta BR try: return datetime.strptime(s, "%d/%m/%Y").date() except Exception: pass return None def safe_value(value): """ Retorna 0 se o valor for vazio/NaN; senão o próprio valor. Usado para campos "livres" que não podem ir nulos. """ if value is None or (isinstance(value, float) and pd.isna(value)) or pd.isna(value): return 0 return value def safe_int(value) -> int: """ Converte para int com fallback (None/NaN/erro -> 0). """ try: if value is None or pd.isna(value): return 0 # strings vazias if isinstance(value, str) and value.strip() == "": return 0 return int(float(value)) except Exception: return 0 def safe_str(value) -> str: """ Converte para string segura (None/NaN -> ""). """ if value is None or (isinstance(value, float) and pd.isna(value)) or pd.isna(value): return "" return str(value) def _df_template() -> pd.DataFrame: """ Retorna um DataFrame vazio com as colunas esperadas (para gerar o modelo). """ return pd.DataFrame(columns=COLUNAS_ESPERADAS) def _download_modelo_excel() -> BytesIO: """ Gera um arquivo Excel de modelo (aba MODELO) e retorna buffer. """ df = _df_template() buf = BytesIO() with pd.ExcelWriter(buf, engine="openpyxl") as writer: df.to_excel(writer, index=False, sheet_name="MODELO") buf.seek(0) return buf def _validate_required_cols(df: pd.DataFrame) -> list: """ Retorna a lista de colunas faltantes em relação a COLUNAS_ESPERADAS. Apenas informa; a importação pode prosseguir mesmo faltando algumas (desde que sua tabela/ETL aceite nulos). """ missing = [c for c in COLUNAS_ESPERADAS if c not in df.columns] return missing # ===================================================== # MÓDULO PRINCIPAL # ===================================================== def main(): st.title("📥 Importação de Dados via Excel") st.markdown( """ Este módulo permite: - 📄 Baixar um **modelo Excel padrão** - ✍️ Preencher os dados offline - 🔍 Validar antes da gravação - 💾 Importar os registros para o banco """ ) # ===================================================== # 1️⃣ GERAR MODELO EXCEL # ===================================================== st.subheader("📄 Baixar modelo Excel") buffer = _download_modelo_excel() st.download_button( label="⬇️ Baixar modelo Excel", data=buffer, file_name="modelo_importacao_load.xlsx", mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", ) st.divider() # ===================================================== # 2️⃣ UPLOAD DO ARQUIVO # ===================================================== st.subheader("📤 Importar arquivo preenchido") arquivo = st.file_uploader( "Selecione o arquivo Excel (.xlsx)", type=["xlsx"] ) if not arquivo: st.info("📌 Faça o upload de um arquivo para continuar.") return # Leitura com engine explícito para maior compatibilidade try: df = pd.read_excel(arquivo, engine="openpyxl") except Exception as e: st.error(f"❌ Erro ao ler o arquivo: {e}") return if df is None or df.empty: st.error("❌ O arquivo não possui dados (planilha vazia).") return # Normaliza colunas (case/espaços) para nomes esperados df = _normalize_cols_case_insensitive(df) # Garante um identificador estável para cada linha durante as edições if "_row_id" not in df.columns: df["_row_id"] = range(len(df)) # Salva o DF bruto na sessão para persistência entre reruns st.session_state["df_raw"] = df.copy() # Diagnóstico de colunas faltantes = _validate_required_cols(df) if faltantes: st.warning( "⚠️ Algumas colunas esperadas **não** foram encontradas no arquivo (" + ", ".join(faltantes) + "). " "Se esses campos forem obrigatórios em seu banco, a importação pode falhar." ) st.success("✅ Arquivo carregado com sucesso!") # ===================================================== # 3️⃣ PRÉVIA DOS DADOS # ===================================================== st.subheader("🔍 Prévia dos dados (arquivo lido)") st.dataframe(df, use_container_width=True) # ===================================================== # 4️⃣ VERIFICAÇÃO DE DUPLICIDADE (com seleção de linhas a excluir) # ===================================================== st.subheader("🧪 Verificação de duplicidade") st.caption( "Escolha as colunas que definem a duplicidade. " "Em seguida, marque o checkbox **_excluir** nas linhas que **não** deseja importar." ) # Sugerimos um conjunto padrão de chaves, mas só usamos as que existem no arquivo sugestao_chaves = [c for c in ["fpso", "osm", "po", "part_number", "nota_fiscal", "data_coleta"] if c in df.columns] chaves = st.multiselect( "📌 Colunas para verificação de duplicidade:", options=list(df.columns), default=sugestao_chaves if len(sugestao_chaves) > 0 else [] ) # Prepara um DF de trabalho com colunas auxiliares work_df = df.copy() work_df["_duplicado"] = False work_df["_excluir"] = False # será editado pelo usuário if len(chaves) == 0: st.info("Selecione ao menos **uma** coluna para verificar duplicidade.") # Mesmo sem duplicidade definida, permitimos marcar exclusões manuais: with st.expander("🔧 (Opcional) Excluir linhas manualmente mesmo sem duplicidade"): manual_view = work_df.set_index("_row_id")[ [c for c in work_df.columns if c not in ["_duplicado"]] + ["_excluir"] ] edited_manual = st.data_editor( manual_view, use_container_width=True, num_rows="fixed", column_config={ "_excluir": st.column_config.CheckboxColumn("Excluir da importação") } ) # Aplica exclusões manuais work_df = work_df.set_index("_row_id") work_df["_excluir"] = edited_manual["_excluir"].reindex(work_df.index).fillna(False).astype(bool) work_df = work_df.reset_index() else: # Marca duplicadas com base nas chaves selecionadas mask_dup_any = work_df.duplicated(subset=chaves, keep=False) work_df["_duplicado"] = mask_dup_any # Sugerimos exclusão automática das ocorrências não-primárias (o usuário pode alterar) mask_dup_not_first = work_df.duplicated(subset=chaves, keep="first") work_df.loc[mask_dup_not_first, "_excluir"] = True if mask_dup_any.any(): st.warning("⚠️ Foram encontradas linhas duplicadas com base nas chaves selecionadas:") # Mostrar somente as duplicadas para facilitar a decisão cols_para_mostrar = chaves + [c for c in ["_duplicado", "_excluir"] if c not in chaves] # Evita colunas repetidas mantendo ordem seen = set() cols_para_mostrar = [c for c in cols_para_mostrar if not (c in seen or seen.add(c))] dup_view = work_df.loc[mask_dup_any].set_index("_row_id")[cols_para_mostrar] edited_dup = st.data_editor( dup_view, use_container_width=True, num_rows="fixed", column_config={ "_excluir": st.column_config.CheckboxColumn("Excluir da importação"), "_duplicado": st.column_config.CheckboxColumn("Duplicado", disabled=True) } ) # Mescla de volta as escolhas do usuário work_df = work_df.set_index("_row_id") if "_excluir" in edited_dup.columns: work_df.loc[edited_dup.index, "_excluir"] = ( edited_dup["_excluir"].reindex(work_df.index).fillna(work_df["_excluir"]).astype(bool) ) work_df = work_df.reset_index() st.info( f"📊 Totais — Linhas: {len(work_df)} | Duplicadas: {mask_dup_any.sum()} | " f"Marcadas para excluir: {int(work_df['_excluir'].sum())}" ) else: st.success("✅ Nenhuma duplicidade encontrada com as chaves selecionadas.") st.divider() # ===================================================== # 4.1️⃣ Prévia final do que será importado + download # ===================================================== df_para_importar = work_df[~work_df["_excluir"]].drop(columns=["_duplicado", "_excluir"], errors="ignore") st.session_state["df_para_importar"] = df_para_importar.copy() st.subheader("🧾 Prévia do que será importado") st.caption("A prévia abaixo desconsidera as linhas marcadas como **_excluir**.") st.dataframe(df_para_importar.drop(columns=["_row_id"], errors="ignore"), use_container_width=True) # Download da prévia para conferência buf_prev = BytesIO() with pd.ExcelWriter(buf_prev, engine="openpyxl") as writer: df_para_importar.drop(columns=["_row_id"], errors="ignore").to_excel(writer, index=False, sheet_name="A_IMPORTAR") buf_prev.seek(0) st.download_button( "⬇️ Baixar prévia (Excel)", data=buf_prev, file_name="previa_a_importar.xlsx", mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", help="Baixe a prévia do conjunto que será gravado." ) # ===================================================== # 5️⃣ GRAVAÇÃO NO BANCO (usa o DataFrame filtrado) # ===================================================== st.subheader("💾 Gravar dados no banco") col1, col2 = st.columns(2) if col1.button("💾 Salvar registros importados"): df_import = st.session_state.get("df_para_importar", df) if df_import.empty: st.error("Não há registros para importar. Revise as exclusões.") return with SessionLocal() as db: try: for _, row in df_import.iterrows(): registro = Equipamento( fpso1=safe_str(row.get("fpso1")), fpso=safe_str(row.get("fpso")), data_coleta=to_date(row.get("data_coleta")), especialista=safe_str(row.get("especialista")), conferente=safe_str(row.get("conferente")), osm=safe_str(row.get("osm")), modal=safe_str(row.get("modal")), quant_equip=safe_int(row.get("quant_equip")), mrob=safe_str(row.get("mrob")), linhas_osm=safe_int(row.get("linhas_osm")), linhas_mrob=safe_int(row.get("linhas_mrob")), linhas_erros=safe_int(row.get("linhas_erros")), erro_storekeeper=safe_str(row.get("erro_storekeeper")), erro_operacao=safe_str(row.get("erro_operacao")), erro_especialista=safe_str(row.get("erro_especialista")), erro_outros=safe_str(row.get("erro_outros")), inclusao_exclusao=safe_str(row.get("inclusao_exclusao")), po=safe_str(row.get("po")), part_number=safe_str(row.get("part_number")), material=safe_str(row.get("material")), solicitante=safe_str(row.get("solicitante")), motivo=safe_str(row.get("motivo")), requisitante=safe_str(row.get("requisitante")), nota_fiscal=safe_str(row.get("nota_fiscal")), impacto=safe_str(row.get("impacto")), dimensao=safe_str(row.get("dimensao")), observacoes=safe_str(row.get("observacoes")), dia_inclusao=safe_str(row.get("dia_inclusao")), ) db.add(registro) db.commit() try: registrar_log( usuario=st.session_state.get("usuario"), acao=f"IMPORTAÇÃO EXCEL ({len(df_import)} registros) - com filtro de duplicidade", tabela="equipamentos", registro_id=None ) except Exception: # Não quebra o fluxo se auditoria falhar pass st.success(f"🎉 Importação concluída com sucesso! {len(df_import)} registros gravados.") except Exception as e: db.rollback() st.error(f"❌ Erro ao gravar no banco: {e}") if col2.button("❌ Cancelar importação"): st.warning("Importação cancelada pelo usuário.") if __name__ == "__main__": main()