| | |
| | """ |
| | importar_excel.py |
| | Importa planilhas Excel para a tabela Equipamento, com: |
| | - Geração de modelo Excel |
| | - Upload e pré-visualização |
| | - Verificação/edição de duplicidades |
| | - Conversões seguras de tipos (datas/números/strings) |
| | - Gravação transacional + auditoria resiliente |
| | Compatível com Linux (Spaces) e local/Windows. |
| | """ |
| |
|
| | import streamlit as st |
| | import pandas as pd |
| | from io import BytesIO |
| | from datetime import datetime, date |
| |
|
| | from banco import SessionLocal |
| | from models import Equipamento |
| | from utils_auditoria import registrar_log |
| |
|
| | |
| | |
| | |
| | COLUNAS_ESPERADAS = [ |
| | "fpso1", "fpso", "data_coleta", "especialista", "conferente", "osm", "modal", |
| | "quant_equip", "mrob", "linhas_osm", "linhas_mrob", "linhas_erros", |
| | "erro_storekeeper", "erro_operacao", "erro_especialista", "erro_outros", |
| | "inclusao_exclusao", "po", "part_number", "material", "solicitante", "motivo", |
| | "requisitante", "nota_fiscal", "impacto", "dimensao", "observacoes", "dia_inclusao", |
| | ] |
| |
|
| | |
| | COLS_INTEIRAS = { |
| | "quant_equip", "linhas_osm", "linhas_mrob", "linhas_erros" |
| | } |
| |
|
| | |
| | |
| | |
| | def _normalize_cols_case_insensitive(df: pd.DataFrame) -> pd.DataFrame: |
| | """ |
| | Mapeia colunas do arquivo (case/espaços) para os nomes esperados em COLUNAS_ESPERADAS. |
| | Ex.: " FPSo " -> "fpso"; "DATA_COLETA" -> "data_coleta". |
| | Colunas não reconhecidas permanecem como estão. |
| | """ |
| | if df is None or df.empty: |
| | return df |
| |
|
| | |
| | existing = list(df.columns) |
| | lower_map = {c.strip().lower(): c for c in existing} |
| | new_names = {} |
| | for wanted in COLUNAS_ESPERADAS: |
| | key = wanted.lower() |
| | if key in lower_map: |
| | new_names[lower_map[key]] = wanted |
| |
|
| | |
| | try: |
| | df = df.rename(columns=new_names) |
| | except Exception: |
| | pass |
| | return df |
| |
|
| |
|
| | def to_date(value): |
| | """ |
| | Converte para date: |
| | - pandas.Timestamp/datetime/date -> date |
| | - string em formatos 'YYYY-MM-DD' ou 'DD/MM/YYYY' |
| | - retorna None se não puder converter |
| | Necessário para compatibilidade com SQLite e outros. |
| | """ |
| | if value is None or (isinstance(value, float) and pd.isna(value)) or pd.isna(value): |
| | return None |
| |
|
| | if isinstance(value, pd.Timestamp): |
| | return value.date() |
| | if isinstance(value, datetime): |
| | return value.date() |
| | if isinstance(value, date): |
| | return value |
| |
|
| | if isinstance(value, str): |
| | s = value.strip() |
| | if not s: |
| | return None |
| | |
| | try: |
| | return datetime.strptime(s, "%Y-%m-%d").date() |
| | except Exception: |
| | pass |
| | |
| | try: |
| | return datetime.strptime(s, "%d/%m/%Y").date() |
| | except Exception: |
| | pass |
| | return None |
| |
|
| |
|
| | def safe_value(value): |
| | """ |
| | Retorna 0 se o valor for vazio/NaN; senão o próprio valor. |
| | Usado para campos "livres" que não podem ir nulos. |
| | """ |
| | if value is None or (isinstance(value, float) and pd.isna(value)) or pd.isna(value): |
| | return 0 |
| | return value |
| |
|
| |
|
| | def safe_int(value) -> int: |
| | """ |
| | Converte para int com fallback (None/NaN/erro -> 0). |
| | """ |
| | try: |
| | if value is None or pd.isna(value): |
| | return 0 |
| | |
| | if isinstance(value, str) and value.strip() == "": |
| | return 0 |
| | return int(float(value)) |
| | except Exception: |
| | return 0 |
| |
|
| |
|
| | def safe_str(value) -> str: |
| | """ |
| | Converte para string segura (None/NaN -> ""). |
| | """ |
| | if value is None or (isinstance(value, float) and pd.isna(value)) or pd.isna(value): |
| | return "" |
| | return str(value) |
| |
|
| |
|
| | def _df_template() -> pd.DataFrame: |
| | """ |
| | Retorna um DataFrame vazio com as colunas esperadas (para gerar o modelo). |
| | """ |
| | return pd.DataFrame(columns=COLUNAS_ESPERADAS) |
| |
|
| |
|
| | def _download_modelo_excel() -> BytesIO: |
| | """ |
| | Gera um arquivo Excel de modelo (aba MODELO) e retorna buffer. |
| | """ |
| | df = _df_template() |
| | buf = BytesIO() |
| | with pd.ExcelWriter(buf, engine="openpyxl") as writer: |
| | df.to_excel(writer, index=False, sheet_name="MODELO") |
| | buf.seek(0) |
| | return buf |
| |
|
| |
|
| | def _validate_required_cols(df: pd.DataFrame) -> list: |
| | """ |
| | Retorna a lista de colunas faltantes em relação a COLUNAS_ESPERADAS. |
| | Apenas informa; a importação pode prosseguir mesmo faltando algumas |
| | (desde que sua tabela/ETL aceite nulos). |
| | """ |
| | missing = [c for c in COLUNAS_ESPERADAS if c not in df.columns] |
| | return missing |
| |
|
| |
|
| | |
| | |
| | |
| | def main(): |
| | st.title("📥 Importação de Dados via Excel") |
| |
|
| | st.markdown( |
| | """ |
| | Este módulo permite: |
| | - 📄 Baixar um **modelo Excel padrão** |
| | - ✍️ Preencher os dados offline |
| | - 🔍 Validar antes da gravação |
| | - 💾 Importar os registros para o banco |
| | """ |
| | ) |
| |
|
| | |
| | |
| | |
| | st.subheader("📄 Baixar modelo Excel") |
| |
|
| | buffer = _download_modelo_excel() |
| | st.download_button( |
| | label="⬇️ Baixar modelo Excel", |
| | data=buffer, |
| | file_name="modelo_importacao_load.xlsx", |
| | mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", |
| | ) |
| |
|
| | st.divider() |
| |
|
| | |
| | |
| | |
| | st.subheader("📤 Importar arquivo preenchido") |
| |
|
| | arquivo = st.file_uploader( |
| | "Selecione o arquivo Excel (.xlsx)", |
| | type=["xlsx"] |
| | ) |
| |
|
| | if not arquivo: |
| | st.info("📌 Faça o upload de um arquivo para continuar.") |
| | return |
| |
|
| | |
| | try: |
| | df = pd.read_excel(arquivo, engine="openpyxl") |
| | except Exception as e: |
| | st.error(f"❌ Erro ao ler o arquivo: {e}") |
| | return |
| |
|
| | if df is None or df.empty: |
| | st.error("❌ O arquivo não possui dados (planilha vazia).") |
| | return |
| |
|
| | |
| | df = _normalize_cols_case_insensitive(df) |
| |
|
| | |
| | if "_row_id" not in df.columns: |
| | df["_row_id"] = range(len(df)) |
| |
|
| | |
| | st.session_state["df_raw"] = df.copy() |
| |
|
| | |
| | faltantes = _validate_required_cols(df) |
| | if faltantes: |
| | st.warning( |
| | "⚠️ Algumas colunas esperadas **não** foram encontradas no arquivo (" |
| | + ", ".join(faltantes) + "). " |
| | "Se esses campos forem obrigatórios em seu banco, a importação pode falhar." |
| | ) |
| |
|
| | st.success("✅ Arquivo carregado com sucesso!") |
| |
|
| | |
| | |
| | |
| | st.subheader("🔍 Prévia dos dados (arquivo lido)") |
| | st.dataframe(df, use_container_width=True) |
| |
|
| | |
| | |
| | |
| | st.subheader("🧪 Verificação de duplicidade") |
| |
|
| | st.caption( |
| | "Escolha as colunas que definem a duplicidade. " |
| | "Em seguida, marque o checkbox **_excluir** nas linhas que **não** deseja importar." |
| | ) |
| |
|
| | |
| | sugestao_chaves = [c for c in ["fpso", "osm", "po", "part_number", "nota_fiscal", "data_coleta"] if c in df.columns] |
| |
|
| | chaves = st.multiselect( |
| | "📌 Colunas para verificação de duplicidade:", |
| | options=list(df.columns), |
| | default=sugestao_chaves if len(sugestao_chaves) > 0 else [] |
| | ) |
| |
|
| | |
| | work_df = df.copy() |
| | work_df["_duplicado"] = False |
| | work_df["_excluir"] = False |
| |
|
| | if len(chaves) == 0: |
| | st.info("Selecione ao menos **uma** coluna para verificar duplicidade.") |
| | |
| | with st.expander("🔧 (Opcional) Excluir linhas manualmente mesmo sem duplicidade"): |
| | manual_view = work_df.set_index("_row_id")[ |
| | [c for c in work_df.columns if c not in ["_duplicado"]] + ["_excluir"] |
| | ] |
| | edited_manual = st.data_editor( |
| | manual_view, |
| | use_container_width=True, |
| | num_rows="fixed", |
| | column_config={ |
| | "_excluir": st.column_config.CheckboxColumn("Excluir da importação") |
| | } |
| | ) |
| | |
| | work_df = work_df.set_index("_row_id") |
| | work_df["_excluir"] = edited_manual["_excluir"].reindex(work_df.index).fillna(False).astype(bool) |
| | work_df = work_df.reset_index() |
| |
|
| | else: |
| | |
| | mask_dup_any = work_df.duplicated(subset=chaves, keep=False) |
| | work_df["_duplicado"] = mask_dup_any |
| |
|
| | |
| | mask_dup_not_first = work_df.duplicated(subset=chaves, keep="first") |
| | work_df.loc[mask_dup_not_first, "_excluir"] = True |
| |
|
| | if mask_dup_any.any(): |
| | st.warning("⚠️ Foram encontradas linhas duplicadas com base nas chaves selecionadas:") |
| | |
| | cols_para_mostrar = chaves + [c for c in ["_duplicado", "_excluir"] if c not in chaves] |
| | |
| | seen = set() |
| | cols_para_mostrar = [c for c in cols_para_mostrar if not (c in seen or seen.add(c))] |
| |
|
| | dup_view = work_df.loc[mask_dup_any].set_index("_row_id")[cols_para_mostrar] |
| | edited_dup = st.data_editor( |
| | dup_view, |
| | use_container_width=True, |
| | num_rows="fixed", |
| | column_config={ |
| | "_excluir": st.column_config.CheckboxColumn("Excluir da importação"), |
| | "_duplicado": st.column_config.CheckboxColumn("Duplicado", disabled=True) |
| | } |
| | ) |
| |
|
| | |
| | work_df = work_df.set_index("_row_id") |
| | if "_excluir" in edited_dup.columns: |
| | work_df.loc[edited_dup.index, "_excluir"] = ( |
| | edited_dup["_excluir"].reindex(work_df.index).fillna(work_df["_excluir"]).astype(bool) |
| | ) |
| | work_df = work_df.reset_index() |
| |
|
| | st.info( |
| | f"📊 Totais — Linhas: {len(work_df)} | Duplicadas: {mask_dup_any.sum()} | " |
| | f"Marcadas para excluir: {int(work_df['_excluir'].sum())}" |
| | ) |
| | else: |
| | st.success("✅ Nenhuma duplicidade encontrada com as chaves selecionadas.") |
| |
|
| | st.divider() |
| |
|
| | |
| | |
| | |
| | df_para_importar = work_df[~work_df["_excluir"]].drop(columns=["_duplicado", "_excluir"], errors="ignore") |
| | st.session_state["df_para_importar"] = df_para_importar.copy() |
| |
|
| | st.subheader("🧾 Prévia do que será importado") |
| | st.caption("A prévia abaixo desconsidera as linhas marcadas como **_excluir**.") |
| | st.dataframe(df_para_importar.drop(columns=["_row_id"], errors="ignore"), use_container_width=True) |
| |
|
| | |
| | buf_prev = BytesIO() |
| | with pd.ExcelWriter(buf_prev, engine="openpyxl") as writer: |
| | df_para_importar.drop(columns=["_row_id"], errors="ignore").to_excel(writer, index=False, sheet_name="A_IMPORTAR") |
| | buf_prev.seek(0) |
| | st.download_button( |
| | "⬇️ Baixar prévia (Excel)", |
| | data=buf_prev, |
| | file_name="previa_a_importar.xlsx", |
| | mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", |
| | help="Baixe a prévia do conjunto que será gravado." |
| | ) |
| |
|
| | |
| | |
| | |
| | st.subheader("💾 Gravar dados no banco") |
| |
|
| | col1, col2 = st.columns(2) |
| |
|
| | if col1.button("💾 Salvar registros importados"): |
| | df_import = st.session_state.get("df_para_importar", df) |
| |
|
| | if df_import.empty: |
| | st.error("Não há registros para importar. Revise as exclusões.") |
| | return |
| |
|
| | with SessionLocal() as db: |
| | try: |
| | for _, row in df_import.iterrows(): |
| | registro = Equipamento( |
| | fpso1=safe_str(row.get("fpso1")), |
| | fpso=safe_str(row.get("fpso")), |
| | data_coleta=to_date(row.get("data_coleta")), |
| | especialista=safe_str(row.get("especialista")), |
| | conferente=safe_str(row.get("conferente")), |
| | osm=safe_str(row.get("osm")), |
| | modal=safe_str(row.get("modal")), |
| | quant_equip=safe_int(row.get("quant_equip")), |
| | mrob=safe_str(row.get("mrob")), |
| | linhas_osm=safe_int(row.get("linhas_osm")), |
| | linhas_mrob=safe_int(row.get("linhas_mrob")), |
| | linhas_erros=safe_int(row.get("linhas_erros")), |
| | erro_storekeeper=safe_str(row.get("erro_storekeeper")), |
| | erro_operacao=safe_str(row.get("erro_operacao")), |
| | erro_especialista=safe_str(row.get("erro_especialista")), |
| | erro_outros=safe_str(row.get("erro_outros")), |
| | inclusao_exclusao=safe_str(row.get("inclusao_exclusao")), |
| | po=safe_str(row.get("po")), |
| | part_number=safe_str(row.get("part_number")), |
| | material=safe_str(row.get("material")), |
| | solicitante=safe_str(row.get("solicitante")), |
| | motivo=safe_str(row.get("motivo")), |
| | requisitante=safe_str(row.get("requisitante")), |
| | nota_fiscal=safe_str(row.get("nota_fiscal")), |
| | impacto=safe_str(row.get("impacto")), |
| | dimensao=safe_str(row.get("dimensao")), |
| | observacoes=safe_str(row.get("observacoes")), |
| | dia_inclusao=safe_str(row.get("dia_inclusao")), |
| | ) |
| | db.add(registro) |
| |
|
| | db.commit() |
| |
|
| | try: |
| | registrar_log( |
| | usuario=st.session_state.get("usuario"), |
| | acao=f"IMPORTAÇÃO EXCEL ({len(df_import)} registros) - com filtro de duplicidade", |
| | tabela="equipamentos", |
| | registro_id=None |
| | ) |
| | except Exception: |
| | |
| | pass |
| |
|
| | st.success(f"🎉 Importação concluída com sucesso! {len(df_import)} registros gravados.") |
| |
|
| | except Exception as e: |
| | db.rollback() |
| | st.error(f"❌ Erro ao gravar no banco: {e}") |
| |
|
| | if col2.button("❌ Cancelar importação"): |
| | st.warning("Importação cancelada pelo usuário.") |
| |
|
| |
|
| | if __name__ == "__main__": |
| | main() |