|
|
|
|
|
""" |
|
|
importar_excel.py |
|
|
Importa planilhas Excel para a tabela Equipamento, com: |
|
|
- Geração de modelo Excel |
|
|
- Upload e pré-visualização |
|
|
- Verificação/edição de duplicidades |
|
|
- Conversões seguras de tipos (datas/números/strings) |
|
|
- Gravação transacional + auditoria resiliente |
|
|
Compatível com Linux (Spaces) e local/Windows. |
|
|
""" |
|
|
|
|
|
import streamlit as st |
|
|
import pandas as pd |
|
|
from io import BytesIO |
|
|
from datetime import datetime, date |
|
|
|
|
|
from banco import SessionLocal |
|
|
from models import Equipamento |
|
|
from utils_auditoria import registrar_log |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
COLUNAS_ESPERADAS = [ |
|
|
"fpso1", "fpso", "data_coleta", "especialista", "conferente", "osm", "modal", |
|
|
"quant_equip", "mrob", "linhas_osm", "linhas_mrob", "linhas_erros", |
|
|
"erro_storekeeper", "erro_operacao", "erro_especialista", "erro_outros", |
|
|
"inclusao_exclusao", "po", "part_number", "material", "solicitante", "motivo", |
|
|
"requisitante", "nota_fiscal", "impacto", "dimensao", "observacoes", "dia_inclusao", |
|
|
] |
|
|
|
|
|
|
|
|
COLS_INTEIRAS = { |
|
|
"quant_equip", "linhas_osm", "linhas_mrob", "linhas_erros" |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _normalize_cols_case_insensitive(df: pd.DataFrame) -> pd.DataFrame: |
|
|
""" |
|
|
Mapeia colunas do arquivo (case/espaços) para os nomes esperados em COLUNAS_ESPERADAS. |
|
|
Ex.: " FPSo " -> "fpso"; "DATA_COLETA" -> "data_coleta". |
|
|
Colunas não reconhecidas permanecem como estão. |
|
|
""" |
|
|
if df is None or df.empty: |
|
|
return df |
|
|
|
|
|
|
|
|
existing = list(df.columns) |
|
|
lower_map = {c.strip().lower(): c for c in existing} |
|
|
new_names = {} |
|
|
for wanted in COLUNAS_ESPERADAS: |
|
|
key = wanted.lower() |
|
|
if key in lower_map: |
|
|
new_names[lower_map[key]] = wanted |
|
|
|
|
|
|
|
|
try: |
|
|
df = df.rename(columns=new_names) |
|
|
except Exception: |
|
|
pass |
|
|
return df |
|
|
|
|
|
|
|
|
def to_date(value): |
|
|
""" |
|
|
Converte para date: |
|
|
- pandas.Timestamp/datetime/date -> date |
|
|
- string em formatos 'YYYY-MM-DD' ou 'DD/MM/YYYY' |
|
|
- retorna None se não puder converter |
|
|
Necessário para compatibilidade com SQLite e outros. |
|
|
""" |
|
|
if value is None or (isinstance(value, float) and pd.isna(value)) or pd.isna(value): |
|
|
return None |
|
|
|
|
|
if isinstance(value, pd.Timestamp): |
|
|
return value.date() |
|
|
if isinstance(value, datetime): |
|
|
return value.date() |
|
|
if isinstance(value, date): |
|
|
return value |
|
|
|
|
|
if isinstance(value, str): |
|
|
s = value.strip() |
|
|
if not s: |
|
|
return None |
|
|
|
|
|
try: |
|
|
return datetime.strptime(s, "%Y-%m-%d").date() |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
try: |
|
|
return datetime.strptime(s, "%d/%m/%Y").date() |
|
|
except Exception: |
|
|
pass |
|
|
return None |
|
|
|
|
|
|
|
|
def safe_value(value): |
|
|
""" |
|
|
Retorna 0 se o valor for vazio/NaN; senão o próprio valor. |
|
|
Usado para campos "livres" que não podem ir nulos. |
|
|
""" |
|
|
if value is None or (isinstance(value, float) and pd.isna(value)) or pd.isna(value): |
|
|
return 0 |
|
|
return value |
|
|
|
|
|
|
|
|
def safe_int(value) -> int: |
|
|
""" |
|
|
Converte para int com fallback (None/NaN/erro -> 0). |
|
|
""" |
|
|
try: |
|
|
if value is None or pd.isna(value): |
|
|
return 0 |
|
|
|
|
|
if isinstance(value, str) and value.strip() == "": |
|
|
return 0 |
|
|
return int(float(value)) |
|
|
except Exception: |
|
|
return 0 |
|
|
|
|
|
|
|
|
def safe_str(value) -> str: |
|
|
""" |
|
|
Converte para string segura (None/NaN -> ""). |
|
|
""" |
|
|
if value is None or (isinstance(value, float) and pd.isna(value)) or pd.isna(value): |
|
|
return "" |
|
|
return str(value) |
|
|
|
|
|
|
|
|
def _df_template() -> pd.DataFrame: |
|
|
""" |
|
|
Retorna um DataFrame vazio com as colunas esperadas (para gerar o modelo). |
|
|
""" |
|
|
return pd.DataFrame(columns=COLUNAS_ESPERADAS) |
|
|
|
|
|
|
|
|
def _download_modelo_excel() -> BytesIO: |
|
|
""" |
|
|
Gera um arquivo Excel de modelo (aba MODELO) e retorna buffer. |
|
|
""" |
|
|
df = _df_template() |
|
|
buf = BytesIO() |
|
|
with pd.ExcelWriter(buf, engine="openpyxl") as writer: |
|
|
df.to_excel(writer, index=False, sheet_name="MODELO") |
|
|
buf.seek(0) |
|
|
return buf |
|
|
|
|
|
|
|
|
def _validate_required_cols(df: pd.DataFrame) -> list: |
|
|
""" |
|
|
Retorna a lista de colunas faltantes em relação a COLUNAS_ESPERADAS. |
|
|
Apenas informa; a importação pode prosseguir mesmo faltando algumas |
|
|
(desde que sua tabela/ETL aceite nulos). |
|
|
""" |
|
|
missing = [c for c in COLUNAS_ESPERADAS if c not in df.columns] |
|
|
return missing |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main(): |
|
|
st.title("📥 Importação de Dados via Excel") |
|
|
|
|
|
st.markdown( |
|
|
""" |
|
|
Este módulo permite: |
|
|
- 📄 Baixar um **modelo Excel padrão** |
|
|
- ✍️ Preencher os dados offline |
|
|
- 🔍 Validar antes da gravação |
|
|
- 💾 Importar os registros para o banco |
|
|
""" |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
st.subheader("📄 Baixar modelo Excel") |
|
|
|
|
|
buffer = _download_modelo_excel() |
|
|
st.download_button( |
|
|
label="⬇️ Baixar modelo Excel", |
|
|
data=buffer, |
|
|
file_name="modelo_importacao_load.xlsx", |
|
|
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", |
|
|
) |
|
|
|
|
|
st.divider() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
st.subheader("📤 Importar arquivo preenchido") |
|
|
|
|
|
arquivo = st.file_uploader( |
|
|
"Selecione o arquivo Excel (.xlsx)", |
|
|
type=["xlsx"] |
|
|
) |
|
|
|
|
|
if not arquivo: |
|
|
st.info("📌 Faça o upload de um arquivo para continuar.") |
|
|
return |
|
|
|
|
|
|
|
|
try: |
|
|
df = pd.read_excel(arquivo, engine="openpyxl") |
|
|
except Exception as e: |
|
|
st.error(f"❌ Erro ao ler o arquivo: {e}") |
|
|
return |
|
|
|
|
|
if df is None or df.empty: |
|
|
st.error("❌ O arquivo não possui dados (planilha vazia).") |
|
|
return |
|
|
|
|
|
|
|
|
df = _normalize_cols_case_insensitive(df) |
|
|
|
|
|
|
|
|
if "_row_id" not in df.columns: |
|
|
df["_row_id"] = range(len(df)) |
|
|
|
|
|
|
|
|
st.session_state["df_raw"] = df.copy() |
|
|
|
|
|
|
|
|
faltantes = _validate_required_cols(df) |
|
|
if faltantes: |
|
|
st.warning( |
|
|
"⚠️ Algumas colunas esperadas **não** foram encontradas no arquivo (" |
|
|
+ ", ".join(faltantes) + "). " |
|
|
"Se esses campos forem obrigatórios em seu banco, a importação pode falhar." |
|
|
) |
|
|
|
|
|
st.success("✅ Arquivo carregado com sucesso!") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
st.subheader("🔍 Prévia dos dados (arquivo lido)") |
|
|
st.dataframe(df, use_container_width=True) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
st.subheader("🧪 Verificação de duplicidade") |
|
|
|
|
|
st.caption( |
|
|
"Escolha as colunas que definem a duplicidade. " |
|
|
"Em seguida, marque o checkbox **_excluir** nas linhas que **não** deseja importar." |
|
|
) |
|
|
|
|
|
|
|
|
sugestao_chaves = [c for c in ["fpso", "osm", "po", "part_number", "nota_fiscal", "data_coleta"] if c in df.columns] |
|
|
|
|
|
chaves = st.multiselect( |
|
|
"📌 Colunas para verificação de duplicidade:", |
|
|
options=list(df.columns), |
|
|
default=sugestao_chaves if len(sugestao_chaves) > 0 else [] |
|
|
) |
|
|
|
|
|
|
|
|
work_df = df.copy() |
|
|
work_df["_duplicado"] = False |
|
|
work_df["_excluir"] = False |
|
|
|
|
|
if len(chaves) == 0: |
|
|
st.info("Selecione ao menos **uma** coluna para verificar duplicidade.") |
|
|
|
|
|
with st.expander("🔧 (Opcional) Excluir linhas manualmente mesmo sem duplicidade"): |
|
|
manual_view = work_df.set_index("_row_id")[ |
|
|
[c for c in work_df.columns if c not in ["_duplicado"]] + ["_excluir"] |
|
|
] |
|
|
edited_manual = st.data_editor( |
|
|
manual_view, |
|
|
use_container_width=True, |
|
|
num_rows="fixed", |
|
|
column_config={ |
|
|
"_excluir": st.column_config.CheckboxColumn("Excluir da importação") |
|
|
} |
|
|
) |
|
|
|
|
|
work_df = work_df.set_index("_row_id") |
|
|
work_df["_excluir"] = edited_manual["_excluir"].reindex(work_df.index).fillna(False).astype(bool) |
|
|
work_df = work_df.reset_index() |
|
|
|
|
|
else: |
|
|
|
|
|
mask_dup_any = work_df.duplicated(subset=chaves, keep=False) |
|
|
work_df["_duplicado"] = mask_dup_any |
|
|
|
|
|
|
|
|
mask_dup_not_first = work_df.duplicated(subset=chaves, keep="first") |
|
|
work_df.loc[mask_dup_not_first, "_excluir"] = True |
|
|
|
|
|
if mask_dup_any.any(): |
|
|
st.warning("⚠️ Foram encontradas linhas duplicadas com base nas chaves selecionadas:") |
|
|
|
|
|
cols_para_mostrar = chaves + [c for c in ["_duplicado", "_excluir"] if c not in chaves] |
|
|
|
|
|
seen = set() |
|
|
cols_para_mostrar = [c for c in cols_para_mostrar if not (c in seen or seen.add(c))] |
|
|
|
|
|
dup_view = work_df.loc[mask_dup_any].set_index("_row_id")[cols_para_mostrar] |
|
|
edited_dup = st.data_editor( |
|
|
dup_view, |
|
|
use_container_width=True, |
|
|
num_rows="fixed", |
|
|
column_config={ |
|
|
"_excluir": st.column_config.CheckboxColumn("Excluir da importação"), |
|
|
"_duplicado": st.column_config.CheckboxColumn("Duplicado", disabled=True) |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
work_df = work_df.set_index("_row_id") |
|
|
if "_excluir" in edited_dup.columns: |
|
|
work_df.loc[edited_dup.index, "_excluir"] = ( |
|
|
edited_dup["_excluir"].reindex(work_df.index).fillna(work_df["_excluir"]).astype(bool) |
|
|
) |
|
|
work_df = work_df.reset_index() |
|
|
|
|
|
st.info( |
|
|
f"📊 Totais — Linhas: {len(work_df)} | Duplicadas: {mask_dup_any.sum()} | " |
|
|
f"Marcadas para excluir: {int(work_df['_excluir'].sum())}" |
|
|
) |
|
|
else: |
|
|
st.success("✅ Nenhuma duplicidade encontrada com as chaves selecionadas.") |
|
|
|
|
|
st.divider() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
df_para_importar = work_df[~work_df["_excluir"]].drop(columns=["_duplicado", "_excluir"], errors="ignore") |
|
|
st.session_state["df_para_importar"] = df_para_importar.copy() |
|
|
|
|
|
st.subheader("🧾 Prévia do que será importado") |
|
|
st.caption("A prévia abaixo desconsidera as linhas marcadas como **_excluir**.") |
|
|
st.dataframe(df_para_importar.drop(columns=["_row_id"], errors="ignore"), use_container_width=True) |
|
|
|
|
|
|
|
|
buf_prev = BytesIO() |
|
|
with pd.ExcelWriter(buf_prev, engine="openpyxl") as writer: |
|
|
df_para_importar.drop(columns=["_row_id"], errors="ignore").to_excel(writer, index=False, sheet_name="A_IMPORTAR") |
|
|
buf_prev.seek(0) |
|
|
st.download_button( |
|
|
"⬇️ Baixar prévia (Excel)", |
|
|
data=buf_prev, |
|
|
file_name="previa_a_importar.xlsx", |
|
|
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", |
|
|
help="Baixe a prévia do conjunto que será gravado." |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
st.subheader("💾 Gravar dados no banco") |
|
|
|
|
|
col1, col2 = st.columns(2) |
|
|
|
|
|
if col1.button("💾 Salvar registros importados"): |
|
|
df_import = st.session_state.get("df_para_importar", df) |
|
|
|
|
|
if df_import.empty: |
|
|
st.error("Não há registros para importar. Revise as exclusões.") |
|
|
return |
|
|
|
|
|
with SessionLocal() as db: |
|
|
try: |
|
|
for _, row in df_import.iterrows(): |
|
|
registro = Equipamento( |
|
|
fpso1=safe_str(row.get("fpso1")), |
|
|
fpso=safe_str(row.get("fpso")), |
|
|
data_coleta=to_date(row.get("data_coleta")), |
|
|
especialista=safe_str(row.get("especialista")), |
|
|
conferente=safe_str(row.get("conferente")), |
|
|
osm=safe_str(row.get("osm")), |
|
|
modal=safe_str(row.get("modal")), |
|
|
quant_equip=safe_int(row.get("quant_equip")), |
|
|
mrob=safe_str(row.get("mrob")), |
|
|
linhas_osm=safe_int(row.get("linhas_osm")), |
|
|
linhas_mrob=safe_int(row.get("linhas_mrob")), |
|
|
linhas_erros=safe_int(row.get("linhas_erros")), |
|
|
erro_storekeeper=safe_str(row.get("erro_storekeeper")), |
|
|
erro_operacao=safe_str(row.get("erro_operacao")), |
|
|
erro_especialista=safe_str(row.get("erro_especialista")), |
|
|
erro_outros=safe_str(row.get("erro_outros")), |
|
|
inclusao_exclusao=safe_str(row.get("inclusao_exclusao")), |
|
|
po=safe_str(row.get("po")), |
|
|
part_number=safe_str(row.get("part_number")), |
|
|
material=safe_str(row.get("material")), |
|
|
solicitante=safe_str(row.get("solicitante")), |
|
|
motivo=safe_str(row.get("motivo")), |
|
|
requisitante=safe_str(row.get("requisitante")), |
|
|
nota_fiscal=safe_str(row.get("nota_fiscal")), |
|
|
impacto=safe_str(row.get("impacto")), |
|
|
dimensao=safe_str(row.get("dimensao")), |
|
|
observacoes=safe_str(row.get("observacoes")), |
|
|
dia_inclusao=safe_str(row.get("dia_inclusao")), |
|
|
) |
|
|
db.add(registro) |
|
|
|
|
|
db.commit() |
|
|
|
|
|
try: |
|
|
registrar_log( |
|
|
usuario=st.session_state.get("usuario"), |
|
|
acao=f"IMPORTAÇÃO EXCEL ({len(df_import)} registros) - com filtro de duplicidade", |
|
|
tabela="equipamentos", |
|
|
registro_id=None |
|
|
) |
|
|
except Exception: |
|
|
|
|
|
pass |
|
|
|
|
|
st.success(f"🎉 Importação concluída com sucesso! {len(df_import)} registros gravados.") |
|
|
|
|
|
except Exception as e: |
|
|
db.rollback() |
|
|
st.error(f"❌ Erro ao gravar no banco: {e}") |
|
|
|
|
|
if col2.button("❌ Cancelar importação"): |
|
|
st.warning("Importação cancelada pelo usuário.") |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |