IOI-RUN / importar_excel.py
Roudrigus's picture
Update importar_excel.py
e199519 verified
# -*- coding: utf-8 -*-
"""
importar_excel.py
Importa planilhas Excel para a tabela Equipamento, com:
- Geração de modelo Excel
- Upload e pré-visualização
- Verificação/edição de duplicidades
- Conversões seguras de tipos (datas/números/strings)
- Gravação transacional + auditoria resiliente
Compatível com Linux (Spaces) e local/Windows.
"""
import streamlit as st
import pandas as pd
from io import BytesIO
from datetime import datetime, date
from banco import SessionLocal
from models import Equipamento
from utils_auditoria import registrar_log
# =========================================
# Configuração – nomes de colunas esperadas
# =========================================
COLUNAS_ESPERADAS = [
"fpso1", "fpso", "data_coleta", "especialista", "conferente", "osm", "modal",
"quant_equip", "mrob", "linhas_osm", "linhas_mrob", "linhas_erros",
"erro_storekeeper", "erro_operacao", "erro_especialista", "erro_outros",
"inclusao_exclusao", "po", "part_number", "material", "solicitante", "motivo",
"requisitante", "nota_fiscal", "impacto", "dimensao", "observacoes", "dia_inclusao",
]
# Colunas que tratamos como números inteiros na gravação
COLS_INTEIRAS = {
"quant_equip", "linhas_osm", "linhas_mrob", "linhas_erros"
}
# =====================================================
# FUNÇÕES AUXILIARES
# =====================================================
def _normalize_cols_case_insensitive(df: pd.DataFrame) -> pd.DataFrame:
"""
Mapeia colunas do arquivo (case/espaços) para os nomes esperados em COLUNAS_ESPERADAS.
Ex.: " FPSo " -> "fpso"; "DATA_COLETA" -> "data_coleta".
Colunas não reconhecidas permanecem como estão.
"""
if df is None or df.empty:
return df
# prioriza mapeamento por lower(strip)
existing = list(df.columns)
lower_map = {c.strip().lower(): c for c in existing}
new_names = {}
for wanted in COLUNAS_ESPERADAS:
key = wanted.lower()
if key in lower_map:
new_names[lower_map[key]] = wanted # renomeia p/ o nome "oficial" esperado
# aplica renomeação
try:
df = df.rename(columns=new_names)
except Exception:
pass
return df
def to_date(value):
"""
Converte para date:
- pandas.Timestamp/datetime/date -> date
- string em formatos 'YYYY-MM-DD' ou 'DD/MM/YYYY'
- retorna None se não puder converter
Necessário para compatibilidade com SQLite e outros.
"""
if value is None or (isinstance(value, float) and pd.isna(value)) or pd.isna(value):
return None
if isinstance(value, pd.Timestamp):
return value.date()
if isinstance(value, datetime):
return value.date()
if isinstance(value, date):
return value
if isinstance(value, str):
s = value.strip()
if not s:
return None
# tenta ISO
try:
return datetime.strptime(s, "%Y-%m-%d").date()
except Exception:
pass
# tenta BR
try:
return datetime.strptime(s, "%d/%m/%Y").date()
except Exception:
pass
return None
def safe_value(value):
"""
Retorna 0 se o valor for vazio/NaN; senão o próprio valor.
Usado para campos "livres" que não podem ir nulos.
"""
if value is None or (isinstance(value, float) and pd.isna(value)) or pd.isna(value):
return 0
return value
def safe_int(value) -> int:
"""
Converte para int com fallback (None/NaN/erro -> 0).
"""
try:
if value is None or pd.isna(value):
return 0
# strings vazias
if isinstance(value, str) and value.strip() == "":
return 0
return int(float(value))
except Exception:
return 0
def safe_str(value) -> str:
"""
Converte para string segura (None/NaN -> "").
"""
if value is None or (isinstance(value, float) and pd.isna(value)) or pd.isna(value):
return ""
return str(value)
def _df_template() -> pd.DataFrame:
"""
Retorna um DataFrame vazio com as colunas esperadas (para gerar o modelo).
"""
return pd.DataFrame(columns=COLUNAS_ESPERADAS)
def _download_modelo_excel() -> BytesIO:
"""
Gera um arquivo Excel de modelo (aba MODELO) e retorna buffer.
"""
df = _df_template()
buf = BytesIO()
with pd.ExcelWriter(buf, engine="openpyxl") as writer:
df.to_excel(writer, index=False, sheet_name="MODELO")
buf.seek(0)
return buf
def _validate_required_cols(df: pd.DataFrame) -> list:
"""
Retorna a lista de colunas faltantes em relação a COLUNAS_ESPERADAS.
Apenas informa; a importação pode prosseguir mesmo faltando algumas
(desde que sua tabela/ETL aceite nulos).
"""
missing = [c for c in COLUNAS_ESPERADAS if c not in df.columns]
return missing
# =====================================================
# MÓDULO PRINCIPAL
# =====================================================
def main():
st.title("📥 Importação de Dados via Excel")
st.markdown(
"""
Este módulo permite:
- 📄 Baixar um **modelo Excel padrão**
- ✍️ Preencher os dados offline
- 🔍 Validar antes da gravação
- 💾 Importar os registros para o banco
"""
)
# =====================================================
# 1️⃣ GERAR MODELO EXCEL
# =====================================================
st.subheader("📄 Baixar modelo Excel")
buffer = _download_modelo_excel()
st.download_button(
label="⬇️ Baixar modelo Excel",
data=buffer,
file_name="modelo_importacao_load.xlsx",
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
)
st.divider()
# =====================================================
# 2️⃣ UPLOAD DO ARQUIVO
# =====================================================
st.subheader("📤 Importar arquivo preenchido")
arquivo = st.file_uploader(
"Selecione o arquivo Excel (.xlsx)",
type=["xlsx"]
)
if not arquivo:
st.info("📌 Faça o upload de um arquivo para continuar.")
return
# Leitura com engine explícito para maior compatibilidade
try:
df = pd.read_excel(arquivo, engine="openpyxl")
except Exception as e:
st.error(f"❌ Erro ao ler o arquivo: {e}")
return
if df is None or df.empty:
st.error("❌ O arquivo não possui dados (planilha vazia).")
return
# Normaliza colunas (case/espaços) para nomes esperados
df = _normalize_cols_case_insensitive(df)
# Garante um identificador estável para cada linha durante as edições
if "_row_id" not in df.columns:
df["_row_id"] = range(len(df))
# Salva o DF bruto na sessão para persistência entre reruns
st.session_state["df_raw"] = df.copy()
# Diagnóstico de colunas
faltantes = _validate_required_cols(df)
if faltantes:
st.warning(
"⚠️ Algumas colunas esperadas **não** foram encontradas no arquivo ("
+ ", ".join(faltantes) + "). "
"Se esses campos forem obrigatórios em seu banco, a importação pode falhar."
)
st.success("✅ Arquivo carregado com sucesso!")
# =====================================================
# 3️⃣ PRÉVIA DOS DADOS
# =====================================================
st.subheader("🔍 Prévia dos dados (arquivo lido)")
st.dataframe(df, use_container_width=True)
# =====================================================
# 4️⃣ VERIFICAÇÃO DE DUPLICIDADE (com seleção de linhas a excluir)
# =====================================================
st.subheader("🧪 Verificação de duplicidade")
st.caption(
"Escolha as colunas que definem a duplicidade. "
"Em seguida, marque o checkbox **_excluir** nas linhas que **não** deseja importar."
)
# Sugerimos um conjunto padrão de chaves, mas só usamos as que existem no arquivo
sugestao_chaves = [c for c in ["fpso", "osm", "po", "part_number", "nota_fiscal", "data_coleta"] if c in df.columns]
chaves = st.multiselect(
"📌 Colunas para verificação de duplicidade:",
options=list(df.columns),
default=sugestao_chaves if len(sugestao_chaves) > 0 else []
)
# Prepara um DF de trabalho com colunas auxiliares
work_df = df.copy()
work_df["_duplicado"] = False
work_df["_excluir"] = False # será editado pelo usuário
if len(chaves) == 0:
st.info("Selecione ao menos **uma** coluna para verificar duplicidade.")
# Mesmo sem duplicidade definida, permitimos marcar exclusões manuais:
with st.expander("🔧 (Opcional) Excluir linhas manualmente mesmo sem duplicidade"):
manual_view = work_df.set_index("_row_id")[
[c for c in work_df.columns if c not in ["_duplicado"]] + ["_excluir"]
]
edited_manual = st.data_editor(
manual_view,
use_container_width=True,
num_rows="fixed",
column_config={
"_excluir": st.column_config.CheckboxColumn("Excluir da importação")
}
)
# Aplica exclusões manuais
work_df = work_df.set_index("_row_id")
work_df["_excluir"] = edited_manual["_excluir"].reindex(work_df.index).fillna(False).astype(bool)
work_df = work_df.reset_index()
else:
# Marca duplicadas com base nas chaves selecionadas
mask_dup_any = work_df.duplicated(subset=chaves, keep=False)
work_df["_duplicado"] = mask_dup_any
# Sugerimos exclusão automática das ocorrências não-primárias (o usuário pode alterar)
mask_dup_not_first = work_df.duplicated(subset=chaves, keep="first")
work_df.loc[mask_dup_not_first, "_excluir"] = True
if mask_dup_any.any():
st.warning("⚠️ Foram encontradas linhas duplicadas com base nas chaves selecionadas:")
# Mostrar somente as duplicadas para facilitar a decisão
cols_para_mostrar = chaves + [c for c in ["_duplicado", "_excluir"] if c not in chaves]
# Evita colunas repetidas mantendo ordem
seen = set()
cols_para_mostrar = [c for c in cols_para_mostrar if not (c in seen or seen.add(c))]
dup_view = work_df.loc[mask_dup_any].set_index("_row_id")[cols_para_mostrar]
edited_dup = st.data_editor(
dup_view,
use_container_width=True,
num_rows="fixed",
column_config={
"_excluir": st.column_config.CheckboxColumn("Excluir da importação"),
"_duplicado": st.column_config.CheckboxColumn("Duplicado", disabled=True)
}
)
# Mescla de volta as escolhas do usuário
work_df = work_df.set_index("_row_id")
if "_excluir" in edited_dup.columns:
work_df.loc[edited_dup.index, "_excluir"] = (
edited_dup["_excluir"].reindex(work_df.index).fillna(work_df["_excluir"]).astype(bool)
)
work_df = work_df.reset_index()
st.info(
f"📊 Totais — Linhas: {len(work_df)} | Duplicadas: {mask_dup_any.sum()} | "
f"Marcadas para excluir: {int(work_df['_excluir'].sum())}"
)
else:
st.success("✅ Nenhuma duplicidade encontrada com as chaves selecionadas.")
st.divider()
# =====================================================
# 4.1️⃣ Prévia final do que será importado + download
# =====================================================
df_para_importar = work_df[~work_df["_excluir"]].drop(columns=["_duplicado", "_excluir"], errors="ignore")
st.session_state["df_para_importar"] = df_para_importar.copy()
st.subheader("🧾 Prévia do que será importado")
st.caption("A prévia abaixo desconsidera as linhas marcadas como **_excluir**.")
st.dataframe(df_para_importar.drop(columns=["_row_id"], errors="ignore"), use_container_width=True)
# Download da prévia para conferência
buf_prev = BytesIO()
with pd.ExcelWriter(buf_prev, engine="openpyxl") as writer:
df_para_importar.drop(columns=["_row_id"], errors="ignore").to_excel(writer, index=False, sheet_name="A_IMPORTAR")
buf_prev.seek(0)
st.download_button(
"⬇️ Baixar prévia (Excel)",
data=buf_prev,
file_name="previa_a_importar.xlsx",
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
help="Baixe a prévia do conjunto que será gravado."
)
# =====================================================
# 5️⃣ GRAVAÇÃO NO BANCO (usa o DataFrame filtrado)
# =====================================================
st.subheader("💾 Gravar dados no banco")
col1, col2 = st.columns(2)
if col1.button("💾 Salvar registros importados"):
df_import = st.session_state.get("df_para_importar", df)
if df_import.empty:
st.error("Não há registros para importar. Revise as exclusões.")
return
with SessionLocal() as db:
try:
for _, row in df_import.iterrows():
registro = Equipamento(
fpso1=safe_str(row.get("fpso1")),
fpso=safe_str(row.get("fpso")),
data_coleta=to_date(row.get("data_coleta")),
especialista=safe_str(row.get("especialista")),
conferente=safe_str(row.get("conferente")),
osm=safe_str(row.get("osm")),
modal=safe_str(row.get("modal")),
quant_equip=safe_int(row.get("quant_equip")),
mrob=safe_str(row.get("mrob")),
linhas_osm=safe_int(row.get("linhas_osm")),
linhas_mrob=safe_int(row.get("linhas_mrob")),
linhas_erros=safe_int(row.get("linhas_erros")),
erro_storekeeper=safe_str(row.get("erro_storekeeper")),
erro_operacao=safe_str(row.get("erro_operacao")),
erro_especialista=safe_str(row.get("erro_especialista")),
erro_outros=safe_str(row.get("erro_outros")),
inclusao_exclusao=safe_str(row.get("inclusao_exclusao")),
po=safe_str(row.get("po")),
part_number=safe_str(row.get("part_number")),
material=safe_str(row.get("material")),
solicitante=safe_str(row.get("solicitante")),
motivo=safe_str(row.get("motivo")),
requisitante=safe_str(row.get("requisitante")),
nota_fiscal=safe_str(row.get("nota_fiscal")),
impacto=safe_str(row.get("impacto")),
dimensao=safe_str(row.get("dimensao")),
observacoes=safe_str(row.get("observacoes")),
dia_inclusao=safe_str(row.get("dia_inclusao")),
)
db.add(registro)
db.commit()
try:
registrar_log(
usuario=st.session_state.get("usuario"),
acao=f"IMPORTAÇÃO EXCEL ({len(df_import)} registros) - com filtro de duplicidade",
tabela="equipamentos",
registro_id=None
)
except Exception:
# Não quebra o fluxo se auditoria falhar
pass
st.success(f"🎉 Importação concluída com sucesso! {len(df_import)} registros gravados.")
except Exception as e:
db.rollback()
st.error(f"❌ Erro ao gravar no banco: {e}")
if col2.button("❌ Cancelar importação"):
st.warning("Importação cancelada pelo usuário.")
if __name__ == "__main__":
main()