IOI-RUN / outlook_relatorio.py
Roudrigus's picture
Update outlook_relatorio.py
fbbbeb0 verified
raw
history blame
71.4 kB
# -*- coding: utf-8 -*-
import streamlit as st
import pandas as pd
from datetime import datetime, timedelta, date
import io
import re
import unicodedata
import pythoncom # ✅ COM init/finalize para evitar 'CoInitialize não foi chamado'
# (opcional) auditoria, se existir no seu projeto
try:
from utils_auditoria import registrar_log
_HAS_AUDIT = True
except Exception:
_HAS_AUDIT = False
# ==============================
# 🎨 Estilos (UX)
# ==============================
_STYLES = """
<style>
/* --- Cards/KPIs --- */
.kpi-wrap {display:grid;grid-template-columns:repeat(3,1fr);gap:14px;margin:6px 0 8px 0;}
.kpi-card {border:1px solid rgba(0,0,0,.06);border-radius:12px;padding:14px;background:#ffffff;
box-shadow: 0 1px 3px rgba(0,0,0,.04);}
.kpi-top {display:flex;align-items:center;gap:10px;margin-bottom:6px;}
.kpi-ico {font-size:22px;line-height:1;}
.kpi-title {font-size:12px;font-weight:700;color:#6c757d;letter-spacing:.2px;text-transform:uppercase;}
.kpi-val {font-size:26px;font-weight:800;color:#0b1320;margin-top:2px;}
.kpi-note {font-size:12px;color:#6c757d;margin-top:4px;}
/* --- Barra de status --- */
.status-bar {border:1px solid rgba(0,0,0,.06);border-radius:12px;padding:10px 12px;margin:4px 0 10px 0;
background:linear-gradient(180deg, rgba(248,249,250,.9), rgba(255,255,255,.9));}
.status-line {display:flex;flex-wrap:wrap;gap:10px;font-size:13px;color:#495057;}
.badge {display:inline-flex;align-items:center;gap:6px;padding:4px 8px;border-radius:999px;background:#eef2ff;color:#1d4ed8;
border:1px solid #e0e7ff;font-weight:600;}
.badge.gray {background:#f8f9fa;color:#495057;border:1px solid #e9ecef;}
.badge.green {background:#e7f6ec;color:#1b5e20;border:1px solid #cdebd7;}
.badge.amber {background:#fff4e5;color:#8a4b08;border:1px solid #ffe2bf;}
/* --- Toolbar Downloads (fixa no rodapé) --- */
.dl-bar {position:sticky;bottom:8px;z-index:99;display:flex;gap:10px;padding:8px;background:rgba(255,255,255,.85);
border:1px solid rgba(0,0,0,.06);border-radius:12px;backdrop-filter:saturate(180%) blur(8px);}
@media (max-width: 900px){ .kpi-wrap{grid-template-columns:1fr 1fr;} }
@media (max-width: 600px){ .kpi-wrap{grid-template-columns:1fr;} .dl-bar{flex-wrap:wrap;} }
</style>
"""
# ==============================
# Utils — exportação / indicadores
# ==============================
def _build_downloads(df: pd.DataFrame, base_name: str):
"""Cria botões de download (CSV, Excel e PDF) para o DataFrame."""
if df.empty:
st.warning("Nenhum dado para exportar.")
return
st.markdown('<div class="dl-bar">', unsafe_allow_html=True)
# CSV
csv_buf = io.StringIO()
df.to_csv(csv_buf, index=False, encoding="utf-8-sig")
st.download_button(
"⬇️ Baixar CSV",
data=csv_buf.getvalue(),
file_name=f"{base_name}.csv",
mime="text/csv",
key=f"dl_csv_{base_name}"
)
# Excel (com autoajuste de larguras)
xlsx_buf = io.BytesIO()
with pd.ExcelWriter(xlsx_buf, engine="openpyxl") as writer:
df.to_excel(writer, index=False, sheet_name="Relatorio")
ws = writer.sheets["Relatorio"]
# 🔎 Ajuste automático de largura das colunas
from openpyxl.utils import get_column_letter
for col_idx, col_cells in enumerate(ws.columns, start=1):
max_len = 0
for cell in col_cells:
try:
v = "" if cell.value is None else str(cell.value)
max_len = max(max_len, len(v))
except Exception:
pass
ws.column_dimensions[get_column_letter(col_idx)].width = min(max_len + 2, 60)
xlsx_buf.seek(0)
st.download_button(
"⬇️ Baixar Excel",
data=xlsx_buf,
file_name=f"{base_name}.xlsx",
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
key=f"dl_xlsx_{base_name}"
)
# PDF (resumo até 100 linhas)
try:
from reportlab.lib.pagesizes import A4, landscape
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer
from reportlab.lib import colors
from reportlab.lib.styles import getSampleStyleSheet
pdf_buf = io.BytesIO()
doc = SimpleDocTemplate(
pdf_buf, pagesize=landscape(A4),
rightMargin=20, leftMargin=20, topMargin=20, bottomMargin=20
)
styles = getSampleStyleSheet()
story = [Paragraph(f"Relatório de E-mails — {base_name}", styles["Title"]), Spacer(1, 12)]
df_show = df.copy().head(100)
data_table = [list(df_show.columns)] + df_show.astype(str).values.tolist()
table = Table(data_table, repeatRows=1)
table.setStyle(TableStyle([
("BACKGROUND", (0,0), (-1,0), colors.HexColor("#E9ECEF")),
("TEXTCOLOR", (0,0), (-1,0), colors.HexColor("#212529")),
("GRID", (0,0), (-1,-1), 0.25, colors.HexColor("#ADB5BD")),
("FONTNAME", (0,0), (-1,0), "Helvetica-Bold"),
("FONTNAME", (0,1), (-1,-1), "Helvetica"),
("FONTSIZE", (0,0), (-1,-1), 9),
("ALIGN", (0,0), (-1,-1), "LEFT"),
("VALIGN", (0,0), (-1,-1), "MIDDLE"),
]))
story.append(table)
doc.build(story)
pdf_buf.seek(0)
st.download_button(
"⬇️ Baixar PDF",
data=pdf_buf,
file_name=f"{base_name}.pdf",
mime="application/pdf",
key=f"dl_pdf_{base_name}"
)
except Exception as e:
st.info(f"PDF: não foi possível gerar o arquivo (ReportLab). Detalhe: {e}")
st.markdown("</div>", unsafe_allow_html=True) # fecha dl-bar
# ======================================================
# Helpers para clientes múltiplos e Indicadores visuais
# ======================================================
def _ensure_client_exploded(df: pd.DataFrame) -> pd.DataFrame:
"""
Garante 'cliente_lista' a partir de 'cliente' separando por ';' quando necessário
e retorna um DF 'explodido' (uma linha por cliente), sem perder as colunas originais.
"""
if df.empty:
return df.copy()
df2 = df.copy()
if "cliente_lista" not in df2.columns and "cliente" in df2.columns:
def _split_by_semicolon(s):
if pd.isna(s):
return None
parts = [p.strip() for p in str(s).split(";")]
parts = [p for p in parts if p]
return parts or None
df2["cliente_lista"] = df2["cliente"].apply(_split_by_semicolon)
if "cliente_lista" in df2.columns:
try:
exploded = df2.explode("cliente_lista", ignore_index=True)
exploded["cliente_lista"] = exploded["cliente_lista"].astype(str).str.strip()
exploded = exploded[exploded["cliente_lista"].notna() & (exploded["cliente_lista"].str.len() > 0)]
return exploded
except Exception:
return df2
return df2
# ==============================
# ✅ NOVO: visão da Tabela (cliente único + Data/Hora + ícones)
# ==============================
def _build_table_view_unique_client(df: pd.DataFrame) -> pd.DataFrame:
"""
Prepara a Tabela com:
- Cliente único por linha (explode por ';')
- RecebidoEm separado em Data e Hora
- Colunas visuais (📎, 🔔, 👁️)
"""
if df.empty:
return df.copy()
base = df.copy()
# 1) Garante lista de clientes e "explode"
base = _ensure_client_exploded(base)
if "cliente_lista" not in base.columns and "cliente" in base.columns:
base["cliente_lista"] = base["cliente"].apply(
lambda s: [p.strip() for p in str(s).split(";") if p.strip()] if pd.notna(s) else None
)
try:
base = base.explode("cliente_lista", ignore_index=True)
except Exception:
pass
# Nome final do cliente para a Tabela
base["Cliente"] = base["cliente_lista"].where(base["cliente_lista"].notna(), base.get("cliente"))
# 2) Separa RecebidoEm em Data e Hora
dt = pd.to_datetime(base.get("RecebidoEm"), errors="coerce")
base["Data"] = dt.dt.strftime("%d/%m/%Y").fillna("")
base["Hora"] = dt.dt.strftime("%H:%M").fillna("")
# 3) Colunas visuais
anexos = base.get("Anexos")
if anexos is not None:
base["📎"] = anexos.fillna(0).astype(int).apply(lambda n: "📎" if n > 0 else "")
else:
base["📎"] = ""
imp = base.get("Importancia")
if imp is not None:
mapa_imp = {"2": "🔴 Alta", "1": "🟡 Normal", "0": "⚪ Baixa", 2: "🔴 Alta", 1: "🟡 Normal", 0: "⚪ Baixa"}
base["🔔"] = base["Importancia"].map(mapa_imp).fillna("")
else:
base["🔔"] = ""
if "Lido" in base.columns:
base["👁️"] = base["Lido"].apply(lambda v: "✅" if bool(v) else "⏳")
else:
base["👁️"] = ""
# 4) Alias para Placa
if "placa" in base.columns and "Placa" not in base.columns:
base["Placa"] = base["placa"]
# 5) Ordenação das colunas para leitura profissional
prefer = [
"Data", "Hora", "Cliente", "tipo", "Placa", "Assunto",
"Status_join", "portaria",
"📎", "🔔", "👁️",
"Anexos", "TamanhoKB", "Remetente",
"PastaPath", "Pasta",
]
prefer = [c for c in prefer if c in base.columns]
drop_aux = {"cliente_lista"} # oculta auxiliar interna
others = [c for c in base.columns if c not in (set(prefer) | drop_aux | {"RecebidoEm"})]
cols = prefer + others
df_show = base[cols].copy()
return df_show
# ==============================
# Indicadores (visual + profissional)
# ==============================
import plotly.express as px
def _render_indicators_custom(df: pd.DataFrame, dt_col_name: str, cols_for_topn: list[str], topn_default: int = 10):
"""
Indicadores com:
- Série temporal (gráfico)
- Top Clientes (tratando clientes múltiplos separados por ';')
- Outros Top N (Remetente, Tipo, Categoria, Status etc.)
Em layout de duas colunas para não ocupar a página inteira.
Inclui controles: Top N e Mostrar tabelas.
"""
if df.empty:
st.info("Nenhum dado após filtros. Ajuste os filtros para ver indicadores.")
return
st.subheader("📊 Indicadores")
# Controles globais dos Indicadores
ctl1, ctl2, ctl3 = st.columns([1,1,2])
topn = ctl1.selectbox("Top N", [5, 10, 15, 20, 30], index=[5,10,15,20,30].index(topn_default))
show_tables = ctl2.checkbox("Mostrar tabelas abaixo dos gráficos", value=True)
palette = ctl3.selectbox("Tema de cor", ["plotly_white", "plotly", "ggplot2", "seaborn"], index=0)
# ===== 1) Série temporal =====
if dt_col_name in df.columns:
try:
_dt = pd.to_datetime(df[dt_col_name], errors="coerce")
por_dia = _dt.dt.date.value_counts().sort_index()
if not por_dia.empty:
fig_ts = px.bar(
por_dia, x=por_dia.index, y=por_dia.values,
labels={"x": "Data", "y": "Qtd"},
title="Mensagens por dia",
template=palette,
height=300,
)
fig_ts.update_layout(margin=dict(l=10, r=10, t=50, b=10))
st.plotly_chart(fig_ts, use_container_width=True)
except Exception:
pass
# ===== 2) Top Clientes =====
col_left, col_right = st.columns(2)
with col_left:
st.markdown("### 👥 Top Clientes")
df_clients = _ensure_client_exploded(df)
if "cliente_lista" in df_clients.columns:
vc_clientes = (
df_clients["cliente_lista"]
.dropna()
.astype(str)
.str.strip()
.value_counts()
.head(topn)
)
if not vc_clientes.empty:
ordered = vc_clientes.sort_values(ascending=True)
fig_cli = px.bar(
ordered,
x=ordered.values, y=ordered.index, orientation="h",
labels={"x": "Qtd", "y": "Cliente"},
title=f"Top {topn} Clientes",
template=palette, height=420,
)
fig_cli.update_layout(margin=dict(l=10, r=10, t=50, b=10))
st.plotly_chart(fig_cli, use_container_width=True)
if show_tables:
st.dataframe(
vc_clientes.rename("Qtd").to_frame(),
use_container_width=True,
height=300
)
else:
st.info("Não há clientes para exibir.")
else:
st.info("Coluna de clientes não disponível para indicador.")
# ===== 3) Outros Top N =====
with col_right:
st.markdown("### 🏆 Outros Top N")
ignore_cols = {"cliente", "cliente_lista"}
cols_others = [c for c in (cols_for_topn or []) if c in df.columns and c not in ignore_cols]
if not cols_others:
fallback_cols = [c for c in ["Remetente", "tipo", "Categoria", "Status_join", "Pasta", "PastaPath"] if c in df.columns]
cols_others = fallback_cols[:3] # até 3 por padrão
for col in cols_others[:4]:
try:
st.markdown(f"**Top {topn} por `{col}`**")
if df[col].apply(lambda x: isinstance(x, list)).any():
exploded = df.explode(col)
serie = exploded[col].dropna().astype(str).str.strip().value_counts().head(topn)
else:
serie = df[col].dropna().astype(str).str.strip().value_counts().head(topn)
if serie is None or serie.empty:
st.caption("Sem dados nesta coluna.")
continue
ordered = serie.sort_values(ascending=True)
fig_any = px.bar(
ordered,
x=ordered.values, y=ordered.index, orientation="h",
labels={"x": "Qtd", "y": col},
template=palette, height=260,
)
fig_any.update_layout(margin=dict(l=10, r=10, t=30, b=10))
st.plotly_chart(fig_any, use_container_width=True)
if show_tables:
st.dataframe(
serie.rename("Qtd").to_frame(),
use_container_width=True,
height=220,
)
except Exception as e:
st.warning(f"Não foi possível calcular TopN para `{col}`: {e}")
# ==============================
# Cards/KPIs — clientes, veículos e notas
# ==============================
def _count_notes(row) -> int:
"""Conta notas em uma linha (lista em 'nota_fiscal' ou string)."""
v = row.get("nota_fiscal")
if isinstance(v, list):
return len([x for x in v if str(x).strip()])
if isinstance(v, str) and v.strip():
parts = [p for p in re.split(r"[^\d]+", v) if p.strip()]
return len(parts)
return 0
def _render_kpis(df: pd.DataFrame):
"""Renderiza cards de KPIs e tabelas de resumo por cliente, incluindo tipos de operação."""
if df.empty:
return
# Preferimos 'cliente_lista' para múltiplos; caso contrário, 'cliente'
cliente_col = "cliente_lista" if "cliente_lista" in df.columns else ("cliente" if "cliente" in df.columns else None)
tipo_col = "tipo" if "tipo" in df.columns else None
placa_col = "placa" if "placa" in df.columns else ("Placa" if "Placa" in df.columns else None)
_df = df.copy()
_df["__notes_count__"] = _df.apply(_count_notes, axis=1)
# Se for lista, vamos explodir para análises por cliente
_df_exploded = None
if cliente_col == "cliente_lista":
try:
_df_exploded = _df.explode("cliente_lista")
_df_exploded["cliente_lista"] = _df_exploded["cliente_lista"].astype(str).str.strip()
except Exception:
_df_exploded = None
# Clientes únicos
if cliente_col == "cliente_lista" and _df_exploded is not None:
clientes_unicos = _df_exploded["cliente_lista"].dropna().astype(str).str.strip().nunique()
elif cliente_col:
clientes_unicos = _df[cliente_col].dropna().astype(str).str.strip().nunique()
else:
clientes_unicos = 0
# Placas únicas
placas_unicas = _df[placa_col].dropna().nunique() if placa_col else 0
# Total de notas
notas_total = int(_df["__notes_count__"].sum())
st.markdown('<div class="kpi-wrap">', unsafe_allow_html=True)
st.markdown(
f"""
<div class="kpi-card">
<div class="kpi-top"><div class="kpi-ico">👥</div><div class="kpi-title">Clientes (únicos)</div></div>
<div class="kpi-val">{clientes_unicos}</div>
<div class="kpi-note">Número total de clientes distintos no período.</div>
</div>
""",
unsafe_allow_html=True,
)
st.markdown(
f"""
<div class="kpi-card">
<div class="kpi-top"><div class="kpi-ico">🚚</div><div class="kpi-title">Placas (veículos únicos)</div></div>
<div class="kpi-val">{placas_unicas}</div>
<div class="kpi-note">Contagem de veículos distintos identificados.</div>
</div>
""",
unsafe_allow_html=True,
)
st.markdown(
f"""
<div class="kpi-card">
<div class="kpi-top"><div class="kpi-ico">🧾</div><div class="kpi-title">Notas (total)</div></div>
<div class="kpi-val">{notas_total}</div>
<div class="kpi-note">Soma de notas fiscais informadas nos e-mails.</div>
</div>
""",
unsafe_allow_html=True,
)
st.markdown("</div>", unsafe_allow_html=True)
try:
media_placas_por_cliente = (placas_unicas / clientes_unicos) if clientes_unicos else 0
media_notas_por_cliente = (notas_total / clientes_unicos) if clientes_unicos else 0
st.caption(
f"📌 Média de **placas por cliente**: {media_placas_por_cliente:.2f} • "
f"Média de **notas por cliente**: {media_notas_por_cliente:.2f}"
)
except Exception:
pass
with st.expander("📒 Detalhes por cliente (resumo)", expanded=False):
# ✅ Operações por cliente e tipo
if cliente_col and tipo_col:
st.write("**Operações por cliente (contagem por tipo)**")
if cliente_col == "cliente_lista" and _df_exploded is not None:
operacoes_por_cliente = (
_df_exploded.dropna(subset=["cliente_lista", tipo_col])
.groupby(["cliente_lista", tipo_col])
.size()
.unstack(fill_value=0)
.sort_index()
)
else:
operacoes_por_cliente = (
_df.dropna(subset=[cliente_col, tipo_col])
.groupby([cliente_col, tipo_col])
.size()
.unstack(fill_value=0)
.sort_index()
)
st.dataframe(operacoes_por_cliente, use_container_width=True)
# ✅ Gráfico de barras agrupadas
st.write("📊 **Gráfico: Operações por cliente e tipo**")
try:
df_plot = operacoes_por_cliente.reset_index().melt(
id_vars=(["cliente_lista"] if cliente_col == "cliente_lista" else [cliente_col]),
var_name="Tipo", value_name="Quantidade"
)
x_col = "cliente_lista" if cliente_col == "cliente_lista" else cliente_col
fig = px.bar(
df_plot, x=x_col, y="Quantidade", color="Tipo", barmode="group",
title="Operações por Cliente e Tipo", text="Quantidade", template="plotly_white"
)
fig.update_layout(xaxis_title="Cliente", yaxis_title="Quantidade", legend_title="Tipo", height=460)
st.plotly_chart(fig, use_container_width=True)
except Exception as e:
st.warning(f"Não foi possível gerar o gráfico: {e}")
else:
st.info("Sem colunas suficientes para resumo de operações por tipo.")
# ✅ Veículos por cliente
if cliente_col and placa_col:
if cliente_col == "cliente_lista" and _df_exploded is not None:
veic_por_cliente = (
_df_exploded.dropna(subset=["cliente_lista", placa_col])
.groupby("cliente_lista")[placa_col].nunique()
.sort_values(ascending=False)
.rename("Placas_Únicas")
.to_frame()
)
else:
veic_por_cliente = (
_df.dropna(subset=[cliente_col, placa_col])
.groupby(cliente_col)[placa_col].nunique()
.sort_values(ascending=False)
.rename("Placas_Únicas")
.to_frame()
)
st.write("**Veículos (placas únicas) por cliente — Top 20**")
st.dataframe(veic_por_cliente.head(20), use_container_width=True, height=460)
else:
st.info("Sem colunas de cliente/placa suficientes para o resumo de veículos.")
# ✅ Notas por cliente
if cliente_col:
if cliente_col == "cliente_lista" and _df_exploded is not None:
notas_por_cliente = (
_df_exploded.groupby("cliente_lista")["__notes_count__"]
.sum()
.sort_values(ascending=False)
.rename("Notas_Total")
.to_frame()
)
else:
notas_por_cliente = (
_df.groupby(cliente_col)["__notes_count__"]
.sum()
.sort_values(ascending=False)
.rename("Notas_Total")
.to_frame()
)
st.write("**Notas por cliente — Top 20**")
st.dataframe(notas_por_cliente.head(20), use_container_width=True, height=460)
else:
st.info("Sem coluna de cliente para o resumo de notas.")
# ==============================
# Funções auxiliares — normalização / validação
# ==============================
def _strip_accents(s: str) -> str:
if not isinstance(s, str):
return s
return "".join(c for c in unicodedata.normalize("NFD", s) if unicodedata.category(c) != "Mn")
def _html_to_text(html: str) -> str:
if not html:
return ""
text = re.sub(r"(?is)<style.*?>.*?</style>", " ", html)
text = re.sub(r"(?is)<script.*?>.*?</script>", " ", text)
text = re.sub(r"(?is)<br\s*/?>", "\n", text)
text = re.sub(r"(?is)</p>", "\n", text)
text = re.sub(r"(?is)<.*?>", " ", text)
text = re.sub(r"[ \t]+", " ", text)
text = re.sub(r"\s*\n\s*", "\n", text).strip()
return text
def _normalize_spaces(s: str) -> str:
if not s:
return ""
s = s.replace("\r", "\n")
s = re.sub(r"[ \t]+", " ", s)
s = re.sub(r"\n{2,}", "\n", s).strip()
return s
def _normalize_person_name(s: str) -> str:
"""
Converte 'Sobrenome, Nome' -> 'Nome Sobrenome'.
Mantém como está se não houver vírgula.
"""
if not s:
return s
s = s.strip()
if "," in s:
partes = [p.strip() for p in s.split(",")]
if len(partes) >= 2:
return f"{partes[1]} {partes[0]}".strip()
return s
# ====== Utilitário robusto para extrair e validar PLACA ======
def _clean_plate(p: str) -> str:
"""Normaliza a placa: remove espaços/hífens e deixa maiúscula."""
return re.sub(r"[^A-Za-z0-9]", "", (p or "")).upper()
def _is_valid_plate(p: str) -> bool:
"""
Valida placa nos formatos:
- antigo: AAA1234
- Mercosul: AAA1B23
"""
if not p:
return False
p = _clean_plate(p)
if len(p) != 7:
return False
if re.fullmatch(r"[A-Z]{3}[0-9]{4}", p): # antigo
return True
if re.fullmatch(r"[A-Z]{3}[0-9][A-Z][0-9]{2}", p): # Mercosul
return True
return False
def _format_plate(p: str) -> str:
"""Formata visualmente: antigo → AAA-1234; Mercosul permanece sem hífen."""
p0 = _clean_plate(p)
if re.fullmatch(r"[A-Z]{3}[0-9]{4}", p0):
return f"{p0[:3]}-{p0[3:]}"
return p0
def _extract_plate_from_text(text: str) -> str | None:
"""Extrai placa do texto com robustez."""
if not text:
return None
t = _strip_accents(text).upper()
t = re.sub(r"PLACA\s+DE\s+AUTORIZA\w+", " ", t)
# 1) Por linhas contendo 'PLACA' e NÃO 'AUTORIZA'
for ln in [x.strip() for x in t.splitlines() if x.strip()]:
if "PLACA" in ln and "AUTORIZA" not in ln:
for m in re.finditer(r"\b[A-Z0-9]{7}\b", ln):
cand = m.group(0)
if _is_valid_plate(cand):
return _format_plate(cand)
m_old = re.search(r"\b([A-Z]{3})[ -]?([0-9]{4})\b", ln)
if m_old:
cand = (m_old.group(1) + m_old.group(2)).upper()
if _is_valid_plate(cand):
return _format_plate(cand)
m_new = re.search(r"\b([A-Z]{3})[ -]?([0-9])[ -]?([A-Z])[ -]?([0-9]{2})\b", ln)
if m_new:
cand = (m_new.group(1) + m_new.group(2) + m_new.group(3) + m_new.group(4)).upper()
if _is_valid_plate(cand):
return _format_plate(cand)
# 2) Fallbacks
for m in re.finditer(r"\b([A-Z]{3})[ -]?([0-9]{4})\b", t):
cand = (m.group(1) + m.group(2)).upper()
if _is_valid_plate(cand):
return _format_plate(cand)
for m in re.finditer(r"\b([A-Z]{3})[ -]?([0-9])[ -]?([A-Z])[ -]?([0-9]{2})\b", t):
cand = (m.group(1) + m.group(2) + m.group(3) + m.group(4)).upper()
if _is_valid_plate(cand):
return _format_plate(cand)
return None
# ====== Sanitização de CLIENTE ======
def _sanitize_cliente(s: str | None) -> str | None:
if not s:
return s
s2 = re.sub(r"\s*:\s*", " ", s)
s2 = re.sub(r"\s+", " ", s2).strip(" -:|")
return s2.strip()
def _split_semicolon(s: str | None) -> list[str] | None:
if not s:
return None
parts = [p.strip(" -:|").strip() for p in s.split(";")]
parts = [p for p in parts if p]
return parts or None
def _cliente_to_list(cliente_raw: str | None) -> list[str] | None:
s = _sanitize_cliente(cliente_raw)
return _split_semicolon(s)
# ==============================
# Parser do corpo do e‑mail (modelo padrão)
# ==============================
def _parse_email_body(raw_text: str) -> dict:
"""Extrai campos estruturados do e‑mail padrão."""
text = _normalize_spaces(raw_text or "")
text_acc = _strip_accents(text)
def get(pat, acc=False, flags=re.IGNORECASE | re.MULTILINE):
return (re.search(pat, text_acc if acc else text, flags) or re.search(pat, text, flags))
data = {}
m = get(r"\bItem\s+ID\s*([0-9]+)")
data["item_id"] = m.group(1) if m else None
m = get(r"Portaria\s*-\s*([^\n]+)")
data["portaria"] = (m.group(1).strip() if m else None)
m = get(r"\b([0-9]{2}/[0-9]{2}/[0-9]{4})\s+([0-9]{2}:[0-9]{2})\b")
data["timestamp_corpo_data"] = m.group(1) if m else None
data["timestamp_corpo_hora"] = m.group(2) if m else None
m = get(r"NOTA\s+FISCAL\s*([0-9/\- ]+)")
nf = None
if m:
nf = re.sub(r"\s+", "", m.group(1))
nf = [p for p in nf.split("/") if p]
data["nota_fiscal"] = nf
m = get(r"TIPO\s*([A-ZÁÂÃÉÍÓÚÇ ]+)")
data["tipo"] = (m.group(1).strip() if m else None)
m = get(r"N[º°]?\s*DA\s*PLACA\s*DE\s*AUTORIZA[ÇC]AO\s*([0-9]+)", acc=True)
data["num_placa_autorizacao"] = m.group(1) if m else None
# CLIENTE — ":" opcional
m = get(r"CLIENTE\s*:?\s*([^\n]+)")
cliente_raw = (m.group(1).strip() if m else None)
data["cliente"] = _sanitize_cliente(cliente_raw)
data["cliente_lista"] = _cliente_to_list(cliente_raw)
# STATUS — ":" opcional; separa por múltiplos espaços/tab
m = get(r"STATUS\s*:?\s*([^\n]+)")
status_raw = m.group(1).strip() if m else None
data["status_lista"] = [p.strip() for p in re.split(r"\s{2,}|\t", status_raw) if p.strip()] if status_raw else None
# Liberações (ENTRADA/SAÍDA)
m = get(r"LIBERA[ÇC][AÃ]O\s*DE\s*ENTRADA\s*([^\n]*)", acc=True)
liber_entr = (m.group(1).strip() or None) if m else None
data["liberacao_entrada"] = liber_entr
data["liberacao_entrada_responsavel"] = _normalize_person_name(liber_entr) if liber_entr else None
data["liberacao_entrada_flag"] = bool(liber_entr)
m = get(r"LIBERA[ÇC][AÃ]O\s*DE\s*SA[IÍ]DA\s*([^\n]*)", acc=True)
liber_saida = (m.group(1).strip() or None) if m else None
data["liberacao_saida"] = liber_saida
data["liberacao_saida_responsavel"] = _normalize_person_name(liber_saida) if liber_saida else None
data["liberacao_saida_flag"] = bool(liber_saida)
m = get(r"RG\s*ou\s*CPF\s*do\s*MOTORISTA\s*([0-9.\-]+)")
data["rg_cpf_motorista"] = m.group(1) if m else None
# Placa do veículo
data["placa"] = _extract_plate_from_text(text)
# Horários
m = get(r"HR\s*ENTRADA\s*:?\s*([0-9]{2}:[0-9]{2})")
data["hr_entrada"] = m.group(1) if m else None
m = get(r"HR\s*SA[IÍ]DA\s*:?\s*([0-9]{2}:[0-9]{2})", acc=True)
data["hr_saida"] = m.group(1) if m else None
edit_saida = get(r"HR\s*SA[IÍ]DA.*?\bEditado\b", acc=True, flags=re.IGNORECASE | re.DOTALL)
data["hr_saida_editado"] = bool(edit_saida)
m = get(r"DATA\s*DA\s*OPERA[ÇC][AÃ]O\s*([0-9]{2}/[0-9]{2}/[0-9]{4})", acc=True)
data["data_operacao"] = m.group(1) if m else None
m = get(r"Palavras[- ]chave\s*Corporativas\s*([^\n]*)", acc=True)
data["palavras_chave"] = (m.group(1).strip() or None) if m else None
m = get(r"AGENDAMENTO\s*(SIM|N[ÃA]O)", acc=True)
agend = m.group(1).upper() if m else None
data["agendamento"] = {"SIM": True, "NAO": False, "NÃO": False}.get(agend, None)
data["status_editado"] = any("editado" in p.lower() for p in (data["status_lista"] or []))
data["Status_join"] = " | ".join(data["status_lista"]) if isinstance(data.get("status_lista"), list) else (data.get("status_lista") or "")
return data
# ==============================
# Outlook Desktop — listagem / leitura
# ==============================
def _list_inbox_paths(ns) -> list[str]:
"""Inbox da caixa padrão (para retrocompatibilidade)."""
paths = ["Inbox"]
try:
olFolderInbox = 6
inbox = ns.GetDefaultFolder(olFolderInbox)
except Exception:
return []
def _walk(folder, prefix="Inbox"):
try:
for i in range(1, folder.Folders.Count + 1):
f = folder.Folders.Item(i)
full_path = prefix + "\\" + f.Name
paths.append(full_path)
_walk(f, full_path)
except Exception:
pass
_walk(inbox, "Inbox")
return paths
def _list_all_inbox_paths(ns) -> list[str]:
"""Lista caminhos de Inbox para TODAS as stores (caixas), ex.: 'Minha Caixa\\Inbox\\Sub'."""
paths: list[str] = []
olFolderInbox = 6
try:
for i in range(1, ns.Folders.Count + 1):
store = ns.Folders.Item(i)
root_name = str(store.Name).strip()
try:
inbox = store.GetDefaultFolder(olFolderInbox)
except Exception:
continue
inbox_display = str(inbox.Name).strip()
root_prefix = f"{root_name}\\{inbox_display}"
paths.append(root_prefix)
def _walk(folder, prefix):
try:
for j in range(1, folder.Folders.Count + 1):
f2 = folder.Folders.Item(j)
full_path = prefix + "\\" + f2.Name
paths.append(full_path)
_walk(f2, full_path)
except Exception:
pass
_walk(inbox, root_prefix)
except Exception:
pass
return sorted(set(paths))
def _get_folder_by_path_any_store(ns, path: str):
"""
Resolve caminho de pasta começando por:
- 'Inbox\\...' (ou nome local, ex.: 'Caixa de Entrada\\...')
- 'Sent Items\\...' (ou nome local, ex.: 'Itens Enviados\\...')
- 'NomeDaStore\\Inbox\\...' OU 'NomeDaStore\\<NomeLocalInbox>\\...'
- 'NomeDaStore\\Sent Items\\...' OU 'NomeDaStore\\<NomeLocalSent>\\...'
Faz match case-insensitive nas subpastas.
"""
OL_INBOX = 6
OL_SENT = 5
p = (path or "").strip().replace("/", "\\")
parts = [x for x in p.split("\\") if x]
if not parts:
raise RuntimeError("Caminho da pasta vazio. Ex.: Inbox\\Financeiro, Sent Items\\Faturamento ou Minha Caixa\\Inbox\\Operacional")
# Nomes locais (ex.: PT-BR)
try:
inbox_local = ns.GetDefaultFolder(OL_INBOX).Name
except Exception:
inbox_local = "Inbox"
try:
sent_local = ns.GetDefaultFolder(OL_SENT).Name
except Exception:
sent_local = "Sent Items"
inbox_names = {"inbox", str(inbox_local).strip().lower()}
sent_names = {"sent items", str(sent_local).strip().lower()}
def _descend_from(root_folder, segs):
folder = root_folder
for seg in segs:
target = seg.strip().lower()
found = None
for i in range(1, folder.Folders.Count + 1):
cand = folder.Folders.Item(i)
if str(cand.Name).strip().lower() == target:
found = cand
break
if not found:
raise RuntimeError(f"Subpasta não encontrada: '{seg}' em '{folder.Name}'")
folder = found
return folder
# 1) Tenta como 'Store\\(Inbox|Sent Items)\\...'
if len(parts) >= 2:
store_name_candidate = parts[0].strip().lower()
for i in range(1, ns.Folders.Count + 1):
store = ns.Folders.Item(i)
if str(store.Name).strip().lower() == store_name_candidate:
first_seg = parts[1].strip().lower()
if first_seg in inbox_names:
root = store.GetDefaultFolder(OL_INBOX)
return _descend_from(root, parts[2:])
if first_seg in sent_names:
root = store.GetDefaultFolder(OL_SENT)
return _descend_from(root, parts[2:])
raise RuntimeError(
f"Segundo segmento deve ser 'Inbox'/'{inbox_local}' ou 'Sent Items'/'{sent_local}' para a store '{store.Name}'."
)
# 2) Caso contrário, usa a store padrão (Inbox ou Sent Items conforme o primeiro segmento)
first = parts[0].strip().lower()
if first in inbox_names:
root = ns.GetDefaultFolder(OL_INBOX)
return _descend_from(root, parts[1:])
if first in sent_names:
root = ns.GetDefaultFolder(OL_SENT)
return _descend_from(root, parts[1:])
raise RuntimeError(
f"O caminho deve começar por 'Inbox' (ou '{inbox_local}') ou 'Sent Items' (ou '{sent_local}'), "
f"ou por 'NomeDaStore\\Inbox' / 'NomeDaStore\\Sent Items'."
)
def _read_folder_dataframe(folder, path: str, dias: int, filtro_remetente: str, extrair_campos: bool) -> pd.DataFrame:
"""
Lê e-mails de uma pasta específica e retorna DataFrame.
✅ Fallback: se Restrict retornar 0 itens, volta para iteração completa + filtro por data.
"""
items = folder.Items
items.Sort("[ReceivedTime]", True) # decrescente
cutoff = datetime.now() - timedelta(days=dias)
iter_items = items
restricted_ok = False
restricted_count = None
try:
# Formato recomendado pelo Outlook MAPI: US locale "mm/dd/yyyy hh:mm AM/PM"
dt_from = cutoff.strftime("%m/%d/%Y %I:%M %p")
iter_items = items.Restrict(f"[ReceivedTime] >= '{dt_from}'")
restricted_ok = True
try:
restricted_count = iter_items.Count
except Exception:
restricted_count = None
except Exception:
restricted_ok = False
# ✅ Se Restrict "funcionou" mas não retornou nada, aplica fallback
if restricted_ok and (restricted_count is None or restricted_count == 0):
iter_items = items
restricted_ok = False
rows = []
total_lidos = 0
for mail in iter_items:
total_lidos += 1
try:
if getattr(mail, "Class", None) != 43: # 43 = MailItem
continue
# Filtro manual de data quando Restrict não foi usado
if not restricted_ok:
try:
if getattr(mail, "ReceivedTime", None) and mail.ReceivedTime < cutoff:
continue
except Exception:
pass
try:
sender = mail.SenderEmailAddress or mail.Sender.Name
except Exception:
sender = getattr(mail, "SenderName", None)
if filtro_remetente and sender:
if filtro_remetente.lower() not in str(sender).lower():
continue
base = {
"Pasta": folder.Name,
"PastaPath": path,
"Assunto": mail.Subject,
"Remetente": sender,
"RecebidoEm": mail.ReceivedTime.strftime("%Y-%m-%d %H:%M"),
"Anexos": mail.Attachments.Count if hasattr(mail, "Attachments") else 0,
"TamanhoKB": round(mail.Size / 1024, 1) if hasattr(mail, "Size") else None,
"Importancia": str(getattr(mail, "Importance", "")),
"Categoria": getattr(mail, "Categories", "") or "",
"Lido": bool(getattr(mail, "UnRead", False) == False),
}
if extrair_campos:
raw_body = ""
try:
raw_body = mail.Body or ""
except Exception:
raw_body = ""
if not raw_body:
try:
raw_body = _html_to_text(mail.HTMLBody)
except Exception:
raw_body = ""
parsed = _parse_email_body(raw_body)
base.update(parsed)
try:
base["__corpo__"] = _strip_accents(raw_body)[:800]
except Exception:
base["__corpo__"] = ""
rows.append(base)
except Exception as e:
rows.append({"Pasta": folder.Name, "PastaPath": path, "Assunto": f"[ERRO] {e}"})
df = pd.DataFrame(rows)
with st.expander(f"🔎 Diagnóstico • {path}", expanded=False):
st.caption(
f"Itens enumerados: **{total_lidos}** "
f"| Restrict aplicado: **{restricted_ok}** "
f"{'| Restrict.Count=' + str(restricted_count) if restricted_count is not None else ''} "
f"| Registros DF: **{df.shape[0]}**"
)
return df
def gerar_relatorio_outlook_desktop_multi(
pastas: list[str],
dias: int,
filtro_remetente: str = "",
extrair_campos: bool = True
) -> pd.DataFrame:
"""Inicializa COM, conecta ao Outlook, lê múltiplas pastas (todas as stores) e finaliza COM."""
try:
import win32com.client
pythoncom.CoInitialize()
ns = win32com.client.Dispatch("Outlook.Application").GetNamespace("MAPI")
except Exception as e:
st.error(f"Falha ao conectar ao Outlook/pywin32: {e}")
return pd.DataFrame()
frames = []
try:
for path in pastas:
try:
folder = _get_folder_by_path_any_store(ns, path)
df = _read_folder_dataframe(folder, path, dias, filtro_remetente, extrair_campos)
frames.append(df)
except Exception as e:
st.warning(f"Não foi possível ler a pasta '{path}': {e}")
return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
finally:
try:
pythoncom.CoUninitialize()
except Exception:
pass
# ==============================
# Cache / refresh control
# ==============================
@st.cache_data(show_spinner=False, ttl=60)
def _cache_outlook_df(pastas: tuple, dias: int, filtro_remetente: str, extrair_campos: bool, _v: int = 1) -> pd.DataFrame:
"""Cache com TTL para reduzir leituras do Outlook."""
return gerar_relatorio_outlook_desktop_multi(list(pastas), dias, filtro_remetente=filtro_remetente, extrair_campos=extrair_campos)
def _read_outlook_fresh(pastas: list[str], dias: int, filtro_remetente: str, extrair_campos: bool) -> pd.DataFrame:
"""Leitura direta, sem cache (para 'Atualizar agora')."""
return gerar_relatorio_outlook_desktop_multi(pastas, dias, filtro_remetente=filtro_remetente, extrair_campos=extrair_campos)
# ==============================
# Filtros dinâmicos (persistentes) + aplicação manual
# ==============================
def _sanitize_default(options: list[str], default_list: list[str]) -> list[str]:
if not options or not default_list:
return []
opts_set = set(options)
return [v for v in default_list if v in opts_set]
def _build_dynamic_filters(df: pd.DataFrame):
"""
Constrói UI de filtros e retorna (df_filtrado, cols_topn).
"""
if df.empty:
return df, []
df_f = df.copy()
try:
df_f["RecebidoEm_dt"] = pd.to_datetime(df_f.get("RecebidoEm"), errors="coerce")
except Exception:
df_f["RecebidoEm_dt"] = pd.NaT
total_inicial = len(df_f)
state_keys = {
"f_dt_ini": "flt_dt_ini",
"f_dt_fim": "flt_dt_fim",
"f_lido": "flt_lido",
"f_status": "flt_status",
"f_saida_flag": "flt_saida_flag",
"f_entrada_flag": "flt_entrada_flag",
"f_anexos_equal": "flt_anexos_equal",
"f_anexos_apply_equal": "flt_anexos_apply_equal",
"f_tamanho_equal": "flt_tamanho_equal",
"f_tamanho_apply_equal": "flt_tamanho_apply_equal",
"f_assunto": "flt_assunto",
"f_cols_topn": "flt_cols_topn",
"f_pasta": "flt_pasta",
"f_pasta_path": "flt_pasta_path",
"f_portaria": "flt_portaria",
"f_cliente": "flt_cliente",
"f_tipo": "flt_tipo",
"f_placa": "flt_placa",
"f_importancia": "flt_importancia",
"f_categoria": "flt_categoria",
"f_resp_saida": "flt_resp_saida",
"f_resp_entrada": "flt_resp_entrada",
}
for k in state_keys.values():
st.session_state.setdefault(k, None)
with st.expander("🔎 Filtros do relatório (por colunas retornadas)", expanded=True):
with st.form("form_filtros_outlook"):
# Período
min_dt = df_f["RecebidoEm_dt"].min()
max_dt = df_f["RecebidoEm_dt"].max()
if pd.notna(min_dt) and pd.notna(max_dt):
col_d1, col_d2 = st.columns(2)
dt_ini = col_d1.date_input("Data inicial (RecebidoEm)", value=st.session_state[state_keys["f_dt_ini"]] or min_dt.date(), key="f_dt_ini_widget")
dt_fim = col_d2.date_input("Data final (RecebidoEm)", value=st.session_state[state_keys["f_dt_fim"]] or max_dt.date(), key="f_dt_fim_widget")
else:
dt_ini, dt_fim = None, None
def _multi_select(col_name, label, state_key):
if col_name in df_f.columns:
options = sorted([v for v in df_f[col_name].dropna().astype(str).unique() if v != ""])
default_raw = st.session_state[state_keys[state_key]] or []
default = _sanitize_default(options, default_raw)
sel = st.multiselect(label, options=options, default=default, key=f"{state_key}_widget")
st.session_state[state_keys[state_key]] = sel
# ✅ Pasta (caminho completo) — preferido
_multi_select("PastaPath", "Pasta (caminho completo)", "f_pasta_path")
# Opcional — nome curto
_multi_select("Pasta", "Pasta (apenas nome)", "f_pasta")
_multi_select("portaria", "Portaria", "f_portaria")
# Clientes (múltiplos)
if "cliente_lista" in df_f.columns:
clientes_opts = sorted({
c.strip() for lst in df_f["cliente_lista"]
if isinstance(lst, list) for c in lst if c and str(c).strip()
})
default_raw = st.session_state[state_keys["f_cliente"]] or []
default = _sanitize_default(clientes_opts, default_raw)
cliente_sel = st.multiselect("Cliente", options=clientes_opts, default=default, key="f_cliente_widget")
st.session_state[state_keys["f_cliente"]] = cliente_sel
else:
_multi_select("cliente", "Cliente", "f_cliente")
_multi_select("tipo", "Tipo", "f_tipo")
_multi_select("placa", "Placa (veículo)", "f_placa")
_multi_select("Importancia", "Importância (0=baixa,1=normal,2=alta)", "f_importancia")
_multi_select("Categoria", "Categoria", "f_categoria")
_multi_select("liberacao_saida_responsavel", "Responsável (Liberação de Saída)", "f_resp_saida")
_multi_select("liberacao_entrada_responsavel", "Responsável (Liberação de Entrada)", "f_resp_entrada")
# Lido?
if "Lido" in df_f.columns:
options_lido = ["True", "False"]
default_raw = st.session_state[state_keys["f_lido"]] or []
default_lido = _sanitize_default(options_lido, default_raw)
lido_sel = st.multiselect("Lido?", options=options_lido, default=default_lido, key="f_lido_widget")
st.session_state[state_keys["f_lido"]] = lido_sel
# Status
if "status_lista" in df_f.columns:
all_status = sorted(set(s for v in df_f["status_lista"] if isinstance(v, list) for s in v))
default_raw = st.session_state[state_keys["f_status"]] or []
default_status = _sanitize_default(all_status, default_raw)
status_sel = st.multiselect("Status (qualquer dos selecionados)", options=all_status, default=default_status, key="f_status_widget")
st.session_state[state_keys["f_status"]] = status_sel
# Flags
if "liberacao_saida_flag" in df_f.columns:
opt_saida = st.selectbox("Teve Liberação de Saída?", ["(todas)", "Sim", "Não"],
index=(0 if st.session_state[state_keys["f_saida_flag"]] is None else ["(todas)", "Sim", "Não"].index(st.session_state[state_keys["f_saida_flag"]])),
key="f_saida_flag_widget")
st.session_state[state_keys["f_saida_flag"]] = opt_saida
if "liberacao_entrada_flag" in df_f.columns:
opt_entr = st.selectbox("Teve Liberação de Entrada?", ["(todas)", "Sim", "Não"],
index=(0 if st.session_state[state_keys["f_entrada_flag"]] is None else ["(todas)", "Sim", "Não"].index(st.session_state[state_keys["f_entrada_flag"]])),
key="f_entrada_flag_widget")
st.session_state[state_keys["f_entrada_flag"]] = opt_entr
# Numéricos
col_n1, col_n2 = st.columns(2)
if "Anexos" in df_f.columns:
col_vals = df_f["Anexos"].dropna()
if col_vals.empty:
col_n1.info("Sem valores para **Anexos** nesta seleção.")
else:
min_ax = int(col_vals.min()); max_ax = int(col_vals.max())
if min_ax == max_ax:
col_n1.info(f"Todos os registros têm **{min_ax}** anexos.")
eq_val = col_n1.number_input("Filtrar por Anexos = (opcional)", min_value=min_ax, max_value=max_ax,
value=st.session_state[state_keys["f_anexos_equal"]] or min_ax, step=1, key="f_anexos_equal_widget")
apply_eq = col_n1.checkbox("Aplicar igualdade de Anexos", value=bool(st.session_state[state_keys["f_anexos_apply_equal"]]), key="f_anexos_apply_equal_widget")
st.session_state[state_keys["f_anexos_equal"]] = eq_val
st.session_state[state_keys["f_anexos_apply_equal"]] = apply_eq
else:
rng_ax = col_n1.slider("Anexos (intervalo)", min_value=min_ax, max_value=max_ax,
value=st.session_state.get("__rng_anexos__", (min_ax, max_ax)), key="__rng_anexos__widget")
st.session_state["__rng_anexos__"] = rng_ax
if "TamanhoKB" in df_f.columns:
col_vals = df_f["TamanhoKB"].dropna()
if col_vals.empty:
col_n2.info("Sem valores para **TamanhoKB** nesta seleção.")
else:
min_sz = float(col_vals.min()); max_sz = float(col_vals.max())
if abs(min_sz - max_sz) < 1e-9:
col_n2.info(f"Todos os registros têm **{min_sz:.1f} KB**.")
eq_sz = col_n2.number_input("Filtrar por TamanhoKB = (opcional)", min_value=float(min_sz), max_value=float(max_sz),
value=st.session_state[state_keys["f_tamanho_equal"]] or float(min_sz), step=0.1, format="%.1f", key="f_tamanho_equal_widget")
apply_sz = col_n2.checkbox("Aplicar igualdade de TamanhoKB", value=bool(st.session_state[state_keys["f_tamanho_apply_equal"]]), key="f_tamanho_apply_equal_widget")
st.session_state[state_keys["f_tamanho_equal"]] = eq_sz
st.session_state[state_keys["f_tamanho_apply_equal"]] = apply_sz
else:
rng_sz = col_n2.slider("Tamanho (KB)", min_value=float(min_sz), max_value=float(max_sz),
value=st.session_state.get("__rng_tamanho__", (float(min_sz), float(max_sz))), key="__rng_tamanho__widget")
st.session_state["__rng_tamanho__"] = rng_sz
# Texto — Assunto contém
txt_default = st.session_state[state_keys["f_assunto"]] or ""
txt = st.text_input("Assunto contém (opcional)", value=txt_default, key="f_assunto_widget")
st.session_state[state_keys["f_assunto"]] = txt
# Indicadores — Top N
possiveis_cols = [c for c in ["Pasta", "PastaPath", "portaria", "cliente", "cliente_lista", "tipo", "placa",
"Importancia", "Categoria", "Remetente", "Status_join",
"liberacao_saida_responsavel", "liberacao_entrada_responsavel"] if c in df_f.columns]
desired_default = st.session_state[state_keys["f_cols_topn"]] or ["cliente", "Status_join"]
default_cols = _sanitize_default(possiveis_cols, desired_default)
cols_for_topn = st.multiselect("Colunas para indicadores (Top N)", options=possiveis_cols, default=default_cols, key="f_cols_topn_widget")
st.session_state[state_keys["f_cols_topn"]] = cols_for_topn
# Botões
col_b1, col_b2 = st.columns(2)
aplicar = col_b1.form_submit_button("✅ Aplicar filtros")
limpar = col_b2.form_submit_button("🧹 Limpar filtros")
if limpar:
for k in state_keys.values():
st.session_state[k] = None
st.session_state.pop("__rng_anexos__", None)
st.session_state.pop("__rng_tamanho__", None)
st.info("Filtros limpos.")
return df, []
if aplicar:
# período
if pd.notna(df_f["RecebidoEm_dt"]).any() and dt_ini and dt_fim:
mask_dt = (df_f["RecebidoEm_dt"].dt.date >= dt_ini) & (df_f["RecebidoEm_dt"].dt.date <= dt_fim)
df_f = df_f[mask_dt]
def _apply_multi(col_name, session_key):
sel = st.session_state.get(session_key)
nonlocal df_f
if sel and col_name in df_f.columns:
df_f = df_f[df_f[col_name].astype(str).isin(sel)]
# ✅ aplica pelo caminho completo (preciso)
_apply_multi("PastaPath", state_keys["f_pasta_path"])
# (opcional) nome curto
_apply_multi("Pasta", state_keys["f_pasta"])
_apply_multi("portaria", state_keys["f_portaria"])
_apply_multi("tipo", state_keys["f_tipo"])
_apply_multi("placa", state_keys["f_placa"])
_apply_multi("Importancia", state_keys["f_importancia"])
_apply_multi("Categoria", state_keys["f_categoria"])
_apply_multi("liberacao_saida_responsavel", state_keys["f_resp_saida"])
_apply_multi("liberacao_entrada_responsavel", state_keys["f_resp_entrada"])
# cliente (lista ou string)
cliente_sel = st.session_state[state_keys["f_cliente"]]
if cliente_sel:
if "cliente_lista" in df_f.columns:
df_f = df_f[df_f["cliente_lista"].apply(lambda lst: isinstance(lst, list) and any(c in lst for c in cliente_sel))]
elif "cliente" in df_f.columns:
df_f = df_f[df_f["cliente"].astype(str).isin(cliente_sel)]
# lido
lido_sel = st.session_state[state_keys["f_lido"]]
if lido_sel and "Lido" in df_f.columns:
df_f = df_f[df_f["Lido"].astype(str).isin(lido_sel)]
# status
status_sel = st.session_state[state_keys["f_status"]]
if status_sel and "status_lista" in df_f.columns:
df_f = df_f[df_f["status_lista"].apply(lambda lst: isinstance(lst, list) and any(s in lst for s in status_sel))]
# flags
opt_saida = st.session_state[state_keys["f_saida_flag"]]
if opt_saida in ("Sim", "Não") and "liberacao_saida_flag" in df_f.columns:
df_f = df_f[df_f["liberacao_saida_flag"] == (opt_saida == "Sim")]
opt_entr = st.session_state[state_keys["f_entrada_flag"]]
if opt_entr in ("Sim", "Não") and "liberacao_entrada_flag" in df_f.columns:
df_f = df_f[df_f["liberacao_entrada_flag"] == (opt_entr == "Sim")]
# numéricos
if st.session_state.get("__rng_anexos__") and "Anexos" in df_f.columns:
a0, a1 = st.session_state["__rng_anexos__"]
df_f = df_f[(df_f["Anexos"] >= a0) & (df_f["Anexos"] <= a1)]
if st.session_state.get("__rng_tamanho__") and "TamanhoKB" in df_f.columns:
s0, s1 = st.session_state["__rng_tamanho__"]
df_f = df_f[(df_f["TamanhoKB"] >= s0) & (df_f["TamanhoKB"] <= s1)]
# igualdade
if st.session_state.get(state_keys["f_anexos_apply_equal"]) and "Anexos" in df_f.columns:
eq_val = st.session_state.get(state_keys["f_anexos_equal"])
if eq_val is not None:
df_f = df_f[df_f["Anexos"] == int(eq_val)]
if st.session_state.get(state_keys["f_tamanho_apply_equal"]) and "TamanhoKB" in df_f.columns:
eq_sz = st.session_state.get(state_keys["f_tamanho_equal"])
if eq_sz is not None:
df_f = df_f[df_f["TamanhoKB"].round(1) == round(float(eq_sz), 1)]
# texto
txt = st.session_state[state_keys["f_assunto"]]
if txt and "Assunto" in df_f.columns:
df_f = df_f[df_f["Assunto"].astype(str).str.contains(txt.strip(), case=False, na=False)]
cols_for_topn = st.session_state[state_keys["f_cols_topn"]] or []
total_final = len(df_f)
st.caption(f"🧮 **Diagnóstico**: antes dos filtros = {total_inicial} | depois dos filtros = {total_final}")
return df_f, cols_for_topn
possiveis_cols_init = [c for c in ["Pasta", "PastaPath", "portaria", "cliente", "cliente_lista", "tipo", "placa",
"Importancia", "Categoria", "Remetente", "Status_join",
"liberacao_saida_responsavel", "liberacao_entrada_responsavel"] if c in df_f.columns]
desired_default_init = st.session_state[state_keys["f_cols_topn"]] or ["cliente", "Status_join"]
default_cols_init = _sanitize_default(possiveis_cols_init, desired_default_init)
st.caption(f"🧮 **Diagnóstico**: registros disponíveis (sem aplicar filtros) = {total_inicial}")
return df, default_cols_init
# ==============================
# UI — módulo (com buffer/caching + UX simplificada)
# ==============================
def main():
st.title("📧 Relatório • Outlook Desktop (Estruturado)")
st.caption("Escolha a Caixa (Entrada ou Itens Enviados), defina o período e extraia campos estruturados do e‑mail.")
st.markdown(_STYLES, unsafe_allow_html=True)
# Buffer de dados
st.session_state.setdefault("__outlook_df__", None)
st.session_state.setdefault("__outlook_meta__", None)
with st.expander("⚙️ Configurações", expanded=True):
with st.form("form_execucao_outlook"):
col_a, col_b, col_c = st.columns([1, 1, 1])
dias = col_a.slider("Período (últimos N dias)", min_value=1, max_value=365, value=30)
filtro_remetente = col_b.text_input("Filtrar por remetente (opcional)", value="", placeholder='Ex.: "@fornecedor.com" ou "Fulano"')
extrair_campos = col_c.checkbox("Extrair campos do e‑mail (modelo padrão)", value=True)
# Seletor simplificado
tipo_pasta = st.selectbox(
"Tipo de Caixa:",
["📥 Caixa de Entrada", "📤 Itens Enviados"],
help="Escolha se deseja ler a Caixa de Entrada ou os Itens Enviados."
)
mapa_caminho = {
"📥 Caixa de Entrada": "Inbox",
"📤 Itens Enviados": "Sent Items"
}
pasta_base = mapa_caminho[tipo_pasta]
subpasta_manual = st.text_input(
"Subpasta (opcional)",
value="",
placeholder="Ex.: Financeiro\\Operacional"
)
if subpasta_manual.strip():
pasta_final = f"{pasta_base}\\{subpasta_manual.strip()}"
else:
pasta_final = pasta_base
pastas_escolhidas = [pasta_final]
# Botões
col_btn1, col_btn2, col_btn3, col_btn4 = st.columns([1, 1, 1, 1])
submit_cache = col_btn1.form_submit_button("🔍 Gerar (com cache)")
submit_fresh = col_btn2.form_submit_button("⚡ Atualizar agora (sem cache)")
submit_clear = col_btn3.form_submit_button("🧹 Limpar cache")
submit_test = col_btn4.form_submit_button("🧪 Teste de conexão")
# Ações pós-submit
if submit_clear:
try:
_cache_outlook_df.clear()
except Exception:
try:
st.cache_data.clear()
except Exception:
pass
st.session_state["__outlook_df__"] = None
st.session_state["__outlook_meta__"] = None
st.success("Cache limpo. Gere novamente os dados.")
if submit_test:
try:
import win32com.client
pythoncom.CoInitialize()
ns = win32com.client.Dispatch("Outlook.Application").GetNamespace("MAPI")
stores = ns.Folders.Count
inbox_def = ns.GetDefaultFolder(6)
sent_def = ns.GetDefaultFolder(5)
st.info(f"Stores detectadas: **{stores}** | Inbox padrão: **{inbox_def.Name}** | Sent: **{sent_def.Name}**")
if pastas_escolhidas:
for p in pastas_escolhidas:
try:
f = _get_folder_by_path_any_store(ns, p)
st.write(f"📁 {p} → '{f.Name}' itens: {f.Items.Count}")
except Exception as e:
st.warning(f"[Teste] Falha ao acessar '{p}': {e}")
except Exception as e:
st.error(f"[Teste] Erro ao conectar: {e}")
finally:
try:
pythoncom.CoUninitialize()
except Exception:
pass
if submit_fresh:
if not pastas_escolhidas:
st.error("Selecione ao menos uma pasta.")
else:
with st.spinner("Lendo e-mails do Outlook (sem cache)..."):
df_fresh = _read_outlook_fresh(pastas_escolhidas, dias, filtro_remetente, extrair_campos)
st.session_state["__outlook_df__"] = df_fresh
st.session_state["__outlook_meta__"] = {
"pastas": list(pastas_escolhidas),
"dias": dias,
"filtro_remetente": filtro_remetente,
"extrair_campos": extrair_campos,
"loaded_at": datetime.now().strftime("%d/%m/%Y %H:%M"),
"source": "fresh"
}
if submit_cache:
if not pastas_escolhidas:
st.error("Selecione ao menos uma pasta.")
else:
with st.spinner("Lendo e-mails do Outlook (com cache)..."):
df = _cache_outlook_df(tuple(pastas_escolhidas), dias, filtro_remetente, extrair_campos, _v=1)
st.session_state["__outlook_df__"] = df
st.session_state["__outlook_meta__"] = {
"pastas": list(pastas_escolhidas),
"dias": dias,
"filtro_remetente": filtro_remetente,
"extrair_campos": extrair_campos,
"loaded_at": datetime.now().strftime("%d/%m/%Y %H:%M"),
"source": "cache"
}
# Usa o buffer se disponível
df_src = st.session_state.get("__outlook_df__")
meta = st.session_state.get("__outlook_meta__")
if isinstance(df_src, pd.DataFrame):
origem = (meta or {}).get("source", "cache/buffer")
if df_src.empty:
st.info("Nenhum e-mail encontrado para os parâmetros informados. Use o botão 🧪 Teste de conexão para diagnosticar.")
return
# 🎛️ Barra de Status
pastas_lbl = ", ".join(meta.get("pastas") or [])
extra_flag = "SIM" if meta.get("extrair_campos") else "NÃO"
st.markdown(
f"""
<div class="status-bar">
<div class="status-line">
<div class="badge">📂 Pastas: <strong>{pastas_lbl}</strong></div>
<div class="badge gray">🗓️ Dias: <strong>{meta.get('dias')}</strong></div>
<div class="badge green">🔍 Origem: <strong>{origem}</strong></div>
<div class="badge amber">🧩 Extração: <strong>{extra_flag}</strong></div>
<div class="badge gray">⏱️ Em: <strong>{meta.get('loaded_at')}</strong></div>
</div>
</div>
""", unsafe_allow_html=True
)
# Filtros dinâmicos
df_filtrado, cols_for_topn = _build_dynamic_filters(df_src)
# Alias visual para colunas úteis
if "placa" in df_filtrado.columns and "Placa" not in df_filtrado.columns:
df_filtrado = df_filtrado.copy()
df_filtrado["Placa"] = df_filtrado["placa"]
if "cliente_lista" in df_filtrado.columns and "ClienteLista" not in df_filtrado.columns:
df_filtrado = df_filtrado.copy()
df_filtrado["ClienteLista"] = df_filtrado["cliente_lista"].apply(lambda lst: "; ".join(lst) if isinstance(lst, list) else "")
# 🔖 Abas de visualização
tab_geral, tab_tabela, tab_kpis, tab_inds, tab_diag = st.tabs(
["🧭 Visão Geral", "📄 Tabela", "📈 KPIs & Gráficos", "🏆 Indicadores (Top N)", "🛠️ Diagnóstico"]
)
with tab_geral:
# Mini-resumo
qtd = len(df_filtrado)
unicos_pastas = df_filtrado["PastaPath"].nunique() if "PastaPath" in df_filtrado.columns else df_filtrado["Pasta"].nunique() if "Pasta" in df_filtrado.columns else None
dt_min = pd.to_datetime(df_filtrado["RecebidoEm"], errors="coerce").min()
dt_max = pd.to_datetime(df_filtrado["RecebidoEm"], errors="coerce").max()
st.write(f"**Registros após filtros:** {qtd} "
+ (f"• **Pastas (únicas)**: {unicos_pastas}" if unicos_pastas is not None else "")
+ (f" • **Intervalo**: {dt_min.strftime('%d/%m/%Y %H:%M')}{dt_max.strftime('%d/%m/%Y %H:%M')}" if pd.notna(dt_min) and pd.notna(dt_max) else "")
)
# Série temporal simples
if "RecebidoEm" in df_filtrado.columns:
try:
dts = pd.to_datetime(df_filtrado["RecebidoEm"], errors="coerce")
series = dts.dt.date.value_counts().sort_index()
fig = px.bar(series, x=series.index, y=series.values, labels={"x":"Data", "y":"Qtd"},
title="Mensagens por dia", template="plotly_white")
fig.update_layout(height=360)
st.plotly_chart(fig, use_container_width=True)
except Exception:
pass
# ==============================
# ✅ ABA TABELA — Cliente único + Data/Hora + ícones
# ==============================
with tab_tabela:
df_show = _build_table_view_unique_client(df_filtrado)
# 🎛️ Configurações de colunas (tipos e rótulos)
colcfg = {}
# Data e Hora: tenta usar tipos nativos; fallback para texto
if "Data" in df_show.columns:
try:
colcfg["Data"] = st.column_config.DateColumn("Data", format="DD/MM/YYYY")
except Exception:
colcfg["Data"] = st.column_config.TextColumn("Data")
if "Hora" in df_show.columns:
# Alguns builds do Streamlit não possuem TimeColumn — fallback automático
try:
colcfg["Hora"] = st.column_config.TimeColumn("Hora", format="HH:mm")
except Exception:
colcfg["Hora"] = st.column_config.TextColumn("Hora")
if "Cliente" in df_show.columns:
colcfg["Cliente"] = st.column_config.TextColumn("Cliente")
if "Placa" in df_show.columns:
colcfg["Placa"] = st.column_config.TextColumn("Placa")
if "tipo" in df_show.columns:
colcfg["tipo"] = st.column_config.TextColumn("Tipo")
if "Status_join" in df_show.columns:
colcfg["Status_join"] = st.column_config.TextColumn("Status")
if "Anexos" in df_show.columns:
colcfg["Anexos"] = st.column_config.NumberColumn("Anexos", format="%d")
if "TamanhoKB" in df_show.columns:
colcfg["TamanhoKB"] = st.column_config.NumberColumn("Tamanho (KB)", format="%.1f")
if "Remetente" in df_show.columns:
colcfg["Remetente"] = st.column_config.TextColumn("Remetente")
if "PastaPath" in df_show.columns:
colcfg["PastaPath"] = st.column_config.TextColumn("Caminho da Pasta")
if "Pasta" in df_show.columns:
colcfg["Pasta"] = st.column_config.TextColumn("Pasta")
if "portaria" in df_show.columns:
colcfg["portaria"] = st.column_config.TextColumn("Portaria")
# Colunas visuais
if "📎" in df_show.columns:
colcfg["📎"] = st.column_config.TextColumn("Anexo")
if "🔔" in df_show.columns:
colcfg["🔔"] = st.column_config.TextColumn("Importância")
if "👁️" in df_show.columns:
colcfg["👁️"] = st.column_config.TextColumn("Lido")
st.dataframe(
df_show,
use_container_width=True,
hide_index=True,
column_config=colcfg,
height=560
)
st.caption("🔎 A Tabela está em visão **por cliente** (uma linha por cliente) e com **Data** e **Hora** separadas.")
with tab_kpis:
_render_kpis(df_filtrado)
with tab_inds:
_render_indicators_custom(df_filtrado, dt_col_name="RecebidoEm", cols_for_topn=cols_for_topn, topn_default=10)
with tab_diag:
st.write("**Colunas disponíveis**:", list(df_src.columns))
st.write("**Linhas (antes filtros)**:", len(df_src))
st.write("**Linhas (após filtros)**:", len(df_filtrado))
if "PastaPath" in df_src.columns:
st.write("**Pastas distintas**:", df_src["PastaPath"].nunique())
if "__corpo__" in df_src.columns:
st.text_area("Amostra de corpo (debug, 1ª linha):", value=str(df_src["__corpo__"].dropna().head(1).values[0]) if df_src["__corpo__"].dropna().any() else "", height=120)
# ==============================
# ✅ Downloads — exporta a mesma visão da Tabela (explodida + Data/Hora)
# ==============================
base_name = f"relatorio_outlook_desktop_{date.today()}"
df_to_export = _build_table_view_unique_client(df_filtrado)
_build_downloads(df_to_export, base_name=base_name)
# Auditoria
if _HAS_AUDIT and meta:
try:
registrar_log(
usuario=st.session_state.get("usuario"),
acao=f"Relatório Outlook (pastas={len(meta['pastas'])}, dias={meta['dias']}, extrair={origem}) — filtros aplicados",
tabela="outlook_relatorio",
registro_id=None
)
except Exception:
pass
else:
st.info("👉 Selecione a caixa (Entrada/Saída), opcionalmente uma subpasta, e clique em **🔍 Gerar (com cache)** ou **⚡ Atualizar agora (sem cache)**. Use **🧪 Teste de conexão** se vier vazio.")
# if __name__ == "__main__":
# main()