|
|
| |
| import streamlit as st |
| import pandas as pd |
| from datetime import datetime, timedelta, date |
| import io |
| import re |
| import unicodedata |
| import pythoncom |
|
|
| |
| try: |
| from utils_auditoria import registrar_log |
| _HAS_AUDIT = True |
| except Exception: |
| _HAS_AUDIT = False |
|
|
| |
| |
| |
| _STYLES = """ |
| <style> |
| /* --- Cards/KPIs --- */ |
| .kpi-wrap {display:grid;grid-template-columns:repeat(3,1fr);gap:14px;margin:6px 0 8px 0;} |
| .kpi-card {border:1px solid rgba(0,0,0,.06);border-radius:12px;padding:14px;background:#ffffff; |
| box-shadow: 0 1px 3px rgba(0,0,0,.04);} |
| .kpi-top {display:flex;align-items:center;gap:10px;margin-bottom:6px;} |
| .kpi-ico {font-size:22px;line-height:1;} |
| .kpi-title {font-size:12px;font-weight:700;color:#6c757d;letter-spacing:.2px;text-transform:uppercase;} |
| .kpi-val {font-size:26px;font-weight:800;color:#0b1320;margin-top:2px;} |
| .kpi-note {font-size:12px;color:#6c757d;margin-top:4px;} |
| |
| /* --- Barra de status --- */ |
| .status-bar {border:1px solid rgba(0,0,0,.06);border-radius:12px;padding:10px 12px;margin:4px 0 10px 0; |
| background:linear-gradient(180deg, rgba(248,249,250,.9), rgba(255,255,255,.9));} |
| .status-line {display:flex;flex-wrap:wrap;gap:10px;font-size:13px;color:#495057;} |
| .badge {display:inline-flex;align-items:center;gap:6px;padding:4px 8px;border-radius:999px;background:#eef2ff;color:#1d4ed8; |
| border:1px solid #e0e7ff;font-weight:600;} |
| .badge.gray {background:#f8f9fa;color:#495057;border:1px solid #e9ecef;} |
| .badge.green {background:#e7f6ec;color:#1b5e20;border:1px solid #cdebd7;} |
| .badge.amber {background:#fff4e5;color:#8a4b08;border:1px solid #ffe2bf;} |
| |
| /* --- Toolbar Downloads (fixa no rodapé) --- */ |
| .dl-bar {position:sticky;bottom:8px;z-index:99;display:flex;gap:10px;padding:8px;background:rgba(255,255,255,.85); |
| border:1px solid rgba(0,0,0,.06);border-radius:12px;backdrop-filter:saturate(180%) blur(8px);} |
| |
| @media (max-width: 900px){ .kpi-wrap{grid-template-columns:1fr 1fr;} } |
| @media (max-width: 600px){ .kpi-wrap{grid-template-columns:1fr;} .dl-bar{flex-wrap:wrap;} } |
| </style> |
| """ |
|
|
| |
| |
| |
| def _build_downloads(df: pd.DataFrame, base_name: str): |
| """Cria botões de download (CSV, Excel e PDF) para o DataFrame.""" |
| if df.empty: |
| st.warning("Nenhum dado para exportar.") |
| return |
|
|
| st.markdown('<div class="dl-bar">', unsafe_allow_html=True) |
|
|
| |
| csv_buf = io.StringIO() |
| df.to_csv(csv_buf, index=False, encoding="utf-8-sig") |
| st.download_button( |
| "⬇️ Baixar CSV", |
| data=csv_buf.getvalue(), |
| file_name=f"{base_name}.csv", |
| mime="text/csv", |
| key=f"dl_csv_{base_name}" |
| ) |
|
|
| |
| xlsx_buf = io.BytesIO() |
| with pd.ExcelWriter(xlsx_buf, engine="openpyxl") as writer: |
| df.to_excel(writer, index=False, sheet_name="Relatorio") |
| ws = writer.sheets["Relatorio"] |
|
|
| |
| from openpyxl.utils import get_column_letter |
| for col_idx, col_cells in enumerate(ws.columns, start=1): |
| max_len = 0 |
| for cell in col_cells: |
| try: |
| v = "" if cell.value is None else str(cell.value) |
| max_len = max(max_len, len(v)) |
| except Exception: |
| pass |
| ws.column_dimensions[get_column_letter(col_idx)].width = min(max_len + 2, 60) |
|
|
| xlsx_buf.seek(0) |
| st.download_button( |
| "⬇️ Baixar Excel", |
| data=xlsx_buf, |
| file_name=f"{base_name}.xlsx", |
| mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", |
| key=f"dl_xlsx_{base_name}" |
| ) |
|
|
| |
| try: |
| from reportlab.lib.pagesizes import A4, landscape |
| from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer |
| from reportlab.lib import colors |
| from reportlab.lib.styles import getSampleStyleSheet |
|
|
| pdf_buf = io.BytesIO() |
| doc = SimpleDocTemplate( |
| pdf_buf, pagesize=landscape(A4), |
| rightMargin=20, leftMargin=20, topMargin=20, bottomMargin=20 |
| ) |
| styles = getSampleStyleSheet() |
| story = [Paragraph(f"Relatório de E-mails — {base_name}", styles["Title"]), Spacer(1, 12)] |
| df_show = df.copy().head(100) |
| data_table = [list(df_show.columns)] + df_show.astype(str).values.tolist() |
| table = Table(data_table, repeatRows=1) |
| table.setStyle(TableStyle([ |
| ("BACKGROUND", (0,0), (-1,0), colors.HexColor("#E9ECEF")), |
| ("TEXTCOLOR", (0,0), (-1,0), colors.HexColor("#212529")), |
| ("GRID", (0,0), (-1,-1), 0.25, colors.HexColor("#ADB5BD")), |
| ("FONTNAME", (0,0), (-1,0), "Helvetica-Bold"), |
| ("FONTNAME", (0,1), (-1,-1), "Helvetica"), |
| ("FONTSIZE", (0,0), (-1,-1), 9), |
| ("ALIGN", (0,0), (-1,-1), "LEFT"), |
| ("VALIGN", (0,0), (-1,-1), "MIDDLE"), |
| ])) |
| story.append(table) |
| doc.build(story) |
| pdf_buf.seek(0) |
| st.download_button( |
| "⬇️ Baixar PDF", |
| data=pdf_buf, |
| file_name=f"{base_name}.pdf", |
| mime="application/pdf", |
| key=f"dl_pdf_{base_name}" |
| ) |
| except Exception as e: |
| st.info(f"PDF: não foi possível gerar o arquivo (ReportLab). Detalhe: {e}") |
|
|
| st.markdown("</div>", unsafe_allow_html=True) |
|
|
|
|
| |
| |
| |
| def _ensure_client_exploded(df: pd.DataFrame) -> pd.DataFrame: |
| """ |
| Garante 'cliente_lista' a partir de 'cliente' separando por ';' quando necessário |
| e retorna um DF 'explodido' (uma linha por cliente), sem perder as colunas originais. |
| """ |
| if df.empty: |
| return df.copy() |
|
|
| df2 = df.copy() |
|
|
| if "cliente_lista" not in df2.columns and "cliente" in df2.columns: |
| def _split_by_semicolon(s): |
| if pd.isna(s): |
| return None |
| parts = [p.strip() for p in str(s).split(";")] |
| parts = [p for p in parts if p] |
| return parts or None |
| df2["cliente_lista"] = df2["cliente"].apply(_split_by_semicolon) |
|
|
| if "cliente_lista" in df2.columns: |
| try: |
| exploded = df2.explode("cliente_lista", ignore_index=True) |
| exploded["cliente_lista"] = exploded["cliente_lista"].astype(str).str.strip() |
| exploded = exploded[exploded["cliente_lista"].notna() & (exploded["cliente_lista"].str.len() > 0)] |
| return exploded |
| except Exception: |
| return df2 |
| return df2 |
|
|
| |
| |
| |
| def _build_table_view_unique_client(df: pd.DataFrame) -> pd.DataFrame: |
| """ |
| Prepara a Tabela com: |
| - Cliente único por linha (explode por ';') |
| - RecebidoEm separado em Data e Hora |
| - Colunas visuais (📎, 🔔, 👁️) |
| """ |
| if df.empty: |
| return df.copy() |
|
|
| base = df.copy() |
|
|
| |
| base = _ensure_client_exploded(base) |
| if "cliente_lista" not in base.columns and "cliente" in base.columns: |
| base["cliente_lista"] = base["cliente"].apply( |
| lambda s: [p.strip() for p in str(s).split(";") if p.strip()] if pd.notna(s) else None |
| ) |
| try: |
| base = base.explode("cliente_lista", ignore_index=True) |
| except Exception: |
| pass |
|
|
| |
| base["Cliente"] = base["cliente_lista"].where(base["cliente_lista"].notna(), base.get("cliente")) |
|
|
| |
| dt = pd.to_datetime(base.get("RecebidoEm"), errors="coerce") |
| base["Data"] = dt.dt.strftime("%d/%m/%Y").fillna("") |
| base["Hora"] = dt.dt.strftime("%H:%M").fillna("") |
|
|
| |
| anexos = base.get("Anexos") |
| if anexos is not None: |
| base["📎"] = anexos.fillna(0).astype(int).apply(lambda n: "📎" if n > 0 else "") |
| else: |
| base["📎"] = "" |
|
|
| imp = base.get("Importancia") |
| if imp is not None: |
| mapa_imp = {"2": "🔴 Alta", "1": "🟡 Normal", "0": "⚪ Baixa", 2: "🔴 Alta", 1: "🟡 Normal", 0: "⚪ Baixa"} |
| base["🔔"] = base["Importancia"].map(mapa_imp).fillna("") |
| else: |
| base["🔔"] = "" |
|
|
| if "Lido" in base.columns: |
| base["👁️"] = base["Lido"].apply(lambda v: "✅" if bool(v) else "⏳") |
| else: |
| base["👁️"] = "" |
|
|
| |
| if "placa" in base.columns and "Placa" not in base.columns: |
| base["Placa"] = base["placa"] |
|
|
| |
| prefer = [ |
| "Data", "Hora", "Cliente", "tipo", "Placa", "Assunto", |
| "Status_join", "portaria", |
| "📎", "🔔", "👁️", |
| "Anexos", "TamanhoKB", "Remetente", |
| "PastaPath", "Pasta", |
| ] |
| prefer = [c for c in prefer if c in base.columns] |
|
|
| drop_aux = {"cliente_lista"} |
| others = [c for c in base.columns if c not in (set(prefer) | drop_aux | {"RecebidoEm"})] |
| cols = prefer + others |
|
|
| df_show = base[cols].copy() |
| return df_show |
|
|
| |
| |
| |
| import plotly.express as px |
|
|
| def _render_indicators_custom(df: pd.DataFrame, dt_col_name: str, cols_for_topn: list[str], topn_default: int = 10): |
| """ |
| Indicadores com: |
| - Série temporal (gráfico) |
| - Top Clientes (tratando clientes múltiplos separados por ';') |
| - Outros Top N (Remetente, Tipo, Categoria, Status etc.) |
| Em layout de duas colunas para não ocupar a página inteira. |
| Inclui controles: Top N e Mostrar tabelas. |
| """ |
| if df.empty: |
| st.info("Nenhum dado após filtros. Ajuste os filtros para ver indicadores.") |
| return |
|
|
| st.subheader("📊 Indicadores") |
|
|
| |
| ctl1, ctl2, ctl3 = st.columns([1,1,2]) |
| topn = ctl1.selectbox("Top N", [5, 10, 15, 20, 30], index=[5,10,15,20,30].index(topn_default)) |
| show_tables = ctl2.checkbox("Mostrar tabelas abaixo dos gráficos", value=True) |
| palette = ctl3.selectbox("Tema de cor", ["plotly_white", "plotly", "ggplot2", "seaborn"], index=0) |
|
|
| |
| if dt_col_name in df.columns: |
| try: |
| _dt = pd.to_datetime(df[dt_col_name], errors="coerce") |
| por_dia = _dt.dt.date.value_counts().sort_index() |
| if not por_dia.empty: |
| fig_ts = px.bar( |
| por_dia, x=por_dia.index, y=por_dia.values, |
| labels={"x": "Data", "y": "Qtd"}, |
| title="Mensagens por dia", |
| template=palette, |
| height=300, |
| ) |
| fig_ts.update_layout(margin=dict(l=10, r=10, t=50, b=10)) |
| st.plotly_chart(fig_ts, use_container_width=True) |
| except Exception: |
| pass |
|
|
| |
| col_left, col_right = st.columns(2) |
|
|
| with col_left: |
| st.markdown("### 👥 Top Clientes") |
| df_clients = _ensure_client_exploded(df) |
|
|
| if "cliente_lista" in df_clients.columns: |
| vc_clientes = ( |
| df_clients["cliente_lista"] |
| .dropna() |
| .astype(str) |
| .str.strip() |
| .value_counts() |
| .head(topn) |
| ) |
| if not vc_clientes.empty: |
| ordered = vc_clientes.sort_values(ascending=True) |
| fig_cli = px.bar( |
| ordered, |
| x=ordered.values, y=ordered.index, orientation="h", |
| labels={"x": "Qtd", "y": "Cliente"}, |
| title=f"Top {topn} Clientes", |
| template=palette, height=420, |
| ) |
| fig_cli.update_layout(margin=dict(l=10, r=10, t=50, b=10)) |
| st.plotly_chart(fig_cli, use_container_width=True) |
|
|
| if show_tables: |
| st.dataframe( |
| vc_clientes.rename("Qtd").to_frame(), |
| use_container_width=True, |
| height=300 |
| ) |
| else: |
| st.info("Não há clientes para exibir.") |
| else: |
| st.info("Coluna de clientes não disponível para indicador.") |
|
|
| |
| with col_right: |
| st.markdown("### 🏆 Outros Top N") |
| ignore_cols = {"cliente", "cliente_lista"} |
| cols_others = [c for c in (cols_for_topn or []) if c in df.columns and c not in ignore_cols] |
|
|
| if not cols_others: |
| fallback_cols = [c for c in ["Remetente", "tipo", "Categoria", "Status_join", "Pasta", "PastaPath"] if c in df.columns] |
| cols_others = fallback_cols[:3] |
|
|
| for col in cols_others[:4]: |
| try: |
| st.markdown(f"**Top {topn} por `{col}`**") |
| if df[col].apply(lambda x: isinstance(x, list)).any(): |
| exploded = df.explode(col) |
| serie = exploded[col].dropna().astype(str).str.strip().value_counts().head(topn) |
| else: |
| serie = df[col].dropna().astype(str).str.strip().value_counts().head(topn) |
|
|
| if serie is None or serie.empty: |
| st.caption("Sem dados nesta coluna.") |
| continue |
|
|
| ordered = serie.sort_values(ascending=True) |
| fig_any = px.bar( |
| ordered, |
| x=ordered.values, y=ordered.index, orientation="h", |
| labels={"x": "Qtd", "y": col}, |
| template=palette, height=260, |
| ) |
| fig_any.update_layout(margin=dict(l=10, r=10, t=30, b=10)) |
| st.plotly_chart(fig_any, use_container_width=True) |
|
|
| if show_tables: |
| st.dataframe( |
| serie.rename("Qtd").to_frame(), |
| use_container_width=True, |
| height=220, |
| ) |
| except Exception as e: |
| st.warning(f"Não foi possível calcular TopN para `{col}`: {e}") |
|
|
|
|
| |
| |
| |
| def _count_notes(row) -> int: |
| """Conta notas em uma linha (lista em 'nota_fiscal' ou string).""" |
| v = row.get("nota_fiscal") |
| if isinstance(v, list): |
| return len([x for x in v if str(x).strip()]) |
| if isinstance(v, str) and v.strip(): |
| parts = [p for p in re.split(r"[^\d]+", v) if p.strip()] |
| return len(parts) |
| return 0 |
|
|
| def _render_kpis(df: pd.DataFrame): |
| """Renderiza cards de KPIs e tabelas de resumo por cliente, incluindo tipos de operação.""" |
| if df.empty: |
| return |
|
|
| |
| cliente_col = "cliente_lista" if "cliente_lista" in df.columns else ("cliente" if "cliente" in df.columns else None) |
| tipo_col = "tipo" if "tipo" in df.columns else None |
| placa_col = "placa" if "placa" in df.columns else ("Placa" if "Placa" in df.columns else None) |
|
|
| _df = df.copy() |
| _df["__notes_count__"] = _df.apply(_count_notes, axis=1) |
|
|
| |
| _df_exploded = None |
| if cliente_col == "cliente_lista": |
| try: |
| _df_exploded = _df.explode("cliente_lista") |
| _df_exploded["cliente_lista"] = _df_exploded["cliente_lista"].astype(str).str.strip() |
| except Exception: |
| _df_exploded = None |
|
|
| |
| if cliente_col == "cliente_lista" and _df_exploded is not None: |
| clientes_unicos = _df_exploded["cliente_lista"].dropna().astype(str).str.strip().nunique() |
| elif cliente_col: |
| clientes_unicos = _df[cliente_col].dropna().astype(str).str.strip().nunique() |
| else: |
| clientes_unicos = 0 |
|
|
| |
| placas_unicas = _df[placa_col].dropna().nunique() if placa_col else 0 |
|
|
| |
| notas_total = int(_df["__notes_count__"].sum()) |
|
|
| st.markdown('<div class="kpi-wrap">', unsafe_allow_html=True) |
| st.markdown( |
| f""" |
| <div class="kpi-card"> |
| <div class="kpi-top"><div class="kpi-ico">👥</div><div class="kpi-title">Clientes (únicos)</div></div> |
| <div class="kpi-val">{clientes_unicos}</div> |
| <div class="kpi-note">Número total de clientes distintos no período.</div> |
| </div> |
| """, |
| unsafe_allow_html=True, |
| ) |
| st.markdown( |
| f""" |
| <div class="kpi-card"> |
| <div class="kpi-top"><div class="kpi-ico">🚚</div><div class="kpi-title">Placas (veículos únicos)</div></div> |
| <div class="kpi-val">{placas_unicas}</div> |
| <div class="kpi-note">Contagem de veículos distintos identificados.</div> |
| </div> |
| """, |
| unsafe_allow_html=True, |
| ) |
| st.markdown( |
| f""" |
| <div class="kpi-card"> |
| <div class="kpi-top"><div class="kpi-ico">🧾</div><div class="kpi-title">Notas (total)</div></div> |
| <div class="kpi-val">{notas_total}</div> |
| <div class="kpi-note">Soma de notas fiscais informadas nos e-mails.</div> |
| </div> |
| """, |
| unsafe_allow_html=True, |
| ) |
| st.markdown("</div>", unsafe_allow_html=True) |
|
|
| try: |
| media_placas_por_cliente = (placas_unicas / clientes_unicos) if clientes_unicos else 0 |
| media_notas_por_cliente = (notas_total / clientes_unicos) if clientes_unicos else 0 |
| st.caption( |
| f"📌 Média de **placas por cliente**: {media_placas_por_cliente:.2f} • " |
| f"Média de **notas por cliente**: {media_notas_por_cliente:.2f}" |
| ) |
| except Exception: |
| pass |
|
|
| with st.expander("📒 Detalhes por cliente (resumo)", expanded=False): |
| |
| if cliente_col and tipo_col: |
| st.write("**Operações por cliente (contagem por tipo)**") |
| if cliente_col == "cliente_lista" and _df_exploded is not None: |
| operacoes_por_cliente = ( |
| _df_exploded.dropna(subset=["cliente_lista", tipo_col]) |
| .groupby(["cliente_lista", tipo_col]) |
| .size() |
| .unstack(fill_value=0) |
| .sort_index() |
| ) |
| else: |
| operacoes_por_cliente = ( |
| _df.dropna(subset=[cliente_col, tipo_col]) |
| .groupby([cliente_col, tipo_col]) |
| .size() |
| .unstack(fill_value=0) |
| .sort_index() |
| ) |
| st.dataframe(operacoes_por_cliente, use_container_width=True) |
|
|
| |
| st.write("📊 **Gráfico: Operações por cliente e tipo**") |
| try: |
| df_plot = operacoes_por_cliente.reset_index().melt( |
| id_vars=(["cliente_lista"] if cliente_col == "cliente_lista" else [cliente_col]), |
| var_name="Tipo", value_name="Quantidade" |
| ) |
| x_col = "cliente_lista" if cliente_col == "cliente_lista" else cliente_col |
| fig = px.bar( |
| df_plot, x=x_col, y="Quantidade", color="Tipo", barmode="group", |
| title="Operações por Cliente e Tipo", text="Quantidade", template="plotly_white" |
| ) |
| fig.update_layout(xaxis_title="Cliente", yaxis_title="Quantidade", legend_title="Tipo", height=460) |
| st.plotly_chart(fig, use_container_width=True) |
| except Exception as e: |
| st.warning(f"Não foi possível gerar o gráfico: {e}") |
| else: |
| st.info("Sem colunas suficientes para resumo de operações por tipo.") |
|
|
| |
| if cliente_col and placa_col: |
| if cliente_col == "cliente_lista" and _df_exploded is not None: |
| veic_por_cliente = ( |
| _df_exploded.dropna(subset=["cliente_lista", placa_col]) |
| .groupby("cliente_lista")[placa_col].nunique() |
| .sort_values(ascending=False) |
| .rename("Placas_Únicas") |
| .to_frame() |
| ) |
| else: |
| veic_por_cliente = ( |
| _df.dropna(subset=[cliente_col, placa_col]) |
| .groupby(cliente_col)[placa_col].nunique() |
| .sort_values(ascending=False) |
| .rename("Placas_Únicas") |
| .to_frame() |
| ) |
| st.write("**Veículos (placas únicas) por cliente — Top 20**") |
| st.dataframe(veic_por_cliente.head(20), use_container_width=True, height=460) |
| else: |
| st.info("Sem colunas de cliente/placa suficientes para o resumo de veículos.") |
|
|
| |
| if cliente_col: |
| if cliente_col == "cliente_lista" and _df_exploded is not None: |
| notas_por_cliente = ( |
| _df_exploded.groupby("cliente_lista")["__notes_count__"] |
| .sum() |
| .sort_values(ascending=False) |
| .rename("Notas_Total") |
| .to_frame() |
| ) |
| else: |
| notas_por_cliente = ( |
| _df.groupby(cliente_col)["__notes_count__"] |
| .sum() |
| .sort_values(ascending=False) |
| .rename("Notas_Total") |
| .to_frame() |
| ) |
| st.write("**Notas por cliente — Top 20**") |
| st.dataframe(notas_por_cliente.head(20), use_container_width=True, height=460) |
| else: |
| st.info("Sem coluna de cliente para o resumo de notas.") |
|
|
| |
| |
| |
| def _strip_accents(s: str) -> str: |
| if not isinstance(s, str): |
| return s |
| return "".join(c for c in unicodedata.normalize("NFD", s) if unicodedata.category(c) != "Mn") |
|
|
| def _html_to_text(html: str) -> str: |
| if not html: |
| return "" |
| text = re.sub(r"(?is)<style.*?>.*?</style>", " ", html) |
| text = re.sub(r"(?is)<script.*?>.*?</script>", " ", text) |
| text = re.sub(r"(?is)<br\s*/?>", "\n", text) |
| text = re.sub(r"(?is)</p>", "\n", text) |
| text = re.sub(r"(?is)<.*?>", " ", text) |
| text = re.sub(r"[ \t]+", " ", text) |
| text = re.sub(r"\s*\n\s*", "\n", text).strip() |
| return text |
|
|
| def _normalize_spaces(s: str) -> str: |
| if not s: |
| return "" |
| s = s.replace("\r", "\n") |
| s = re.sub(r"[ \t]+", " ", s) |
| s = re.sub(r"\n{2,}", "\n", s).strip() |
| return s |
|
|
| def _normalize_person_name(s: str) -> str: |
| """ |
| Converte 'Sobrenome, Nome' -> 'Nome Sobrenome'. |
| Mantém como está se não houver vírgula. |
| """ |
| if not s: |
| return s |
| s = s.strip() |
| if "," in s: |
| partes = [p.strip() for p in s.split(",")] |
| if len(partes) >= 2: |
| return f"{partes[1]} {partes[0]}".strip() |
| return s |
|
|
| |
| def _clean_plate(p: str) -> str: |
| """Normaliza a placa: remove espaços/hífens e deixa maiúscula.""" |
| return re.sub(r"[^A-Za-z0-9]", "", (p or "")).upper() |
|
|
| def _is_valid_plate(p: str) -> bool: |
| """ |
| Valida placa nos formatos: |
| - antigo: AAA1234 |
| - Mercosul: AAA1B23 |
| """ |
| if not p: |
| return False |
| p = _clean_plate(p) |
| if len(p) != 7: |
| return False |
| if re.fullmatch(r"[A-Z]{3}[0-9]{4}", p): |
| return True |
| if re.fullmatch(r"[A-Z]{3}[0-9][A-Z][0-9]{2}", p): |
| return True |
| return False |
|
|
| def _format_plate(p: str) -> str: |
| """Formata visualmente: antigo → AAA-1234; Mercosul permanece sem hífen.""" |
| p0 = _clean_plate(p) |
| if re.fullmatch(r"[A-Z]{3}[0-9]{4}", p0): |
| return f"{p0[:3]}-{p0[3:]}" |
| return p0 |
|
|
| def _extract_plate_from_text(text: str) -> str | None: |
| """Extrai placa do texto com robustez.""" |
| if not text: |
| return None |
|
|
| t = _strip_accents(text).upper() |
| t = re.sub(r"PLACA\s+DE\s+AUTORIZA\w+", " ", t) |
|
|
| |
| for ln in [x.strip() for x in t.splitlines() if x.strip()]: |
| if "PLACA" in ln and "AUTORIZA" not in ln: |
| for m in re.finditer(r"\b[A-Z0-9]{7}\b", ln): |
| cand = m.group(0) |
| if _is_valid_plate(cand): |
| return _format_plate(cand) |
| m_old = re.search(r"\b([A-Z]{3})[ -]?([0-9]{4})\b", ln) |
| if m_old: |
| cand = (m_old.group(1) + m_old.group(2)).upper() |
| if _is_valid_plate(cand): |
| return _format_plate(cand) |
| m_new = re.search(r"\b([A-Z]{3})[ -]?([0-9])[ -]?([A-Z])[ -]?([0-9]{2})\b", ln) |
| if m_new: |
| cand = (m_new.group(1) + m_new.group(2) + m_new.group(3) + m_new.group(4)).upper() |
| if _is_valid_plate(cand): |
| return _format_plate(cand) |
|
|
| |
| for m in re.finditer(r"\b([A-Z]{3})[ -]?([0-9]{4})\b", t): |
| cand = (m.group(1) + m.group(2)).upper() |
| if _is_valid_plate(cand): |
| return _format_plate(cand) |
| for m in re.finditer(r"\b([A-Z]{3})[ -]?([0-9])[ -]?([A-Z])[ -]?([0-9]{2})\b", t): |
| cand = (m.group(1) + m.group(2) + m.group(3) + m.group(4)).upper() |
| if _is_valid_plate(cand): |
| return _format_plate(cand) |
|
|
| return None |
|
|
| |
| def _sanitize_cliente(s: str | None) -> str | None: |
| if not s: |
| return s |
| s2 = re.sub(r"\s*:\s*", " ", s) |
| s2 = re.sub(r"\s+", " ", s2).strip(" -:|") |
| return s2.strip() |
|
|
| def _split_semicolon(s: str | None) -> list[str] | None: |
| if not s: |
| return None |
| parts = [p.strip(" -:|").strip() for p in s.split(";")] |
| parts = [p for p in parts if p] |
| return parts or None |
|
|
| def _cliente_to_list(cliente_raw: str | None) -> list[str] | None: |
| s = _sanitize_cliente(cliente_raw) |
| return _split_semicolon(s) |
|
|
| |
| |
| |
| def _parse_email_body(raw_text: str) -> dict: |
| """Extrai campos estruturados do e‑mail padrão.""" |
| text = _normalize_spaces(raw_text or "") |
| text_acc = _strip_accents(text) |
|
|
| def get(pat, acc=False, flags=re.IGNORECASE | re.MULTILINE): |
| return (re.search(pat, text_acc if acc else text, flags) or re.search(pat, text, flags)) |
|
|
| data = {} |
| m = get(r"\bItem\s+ID\s*([0-9]+)") |
| data["item_id"] = m.group(1) if m else None |
|
|
| m = get(r"Portaria\s*-\s*([^\n]+)") |
| data["portaria"] = (m.group(1).strip() if m else None) |
|
|
| m = get(r"\b([0-9]{2}/[0-9]{2}/[0-9]{4})\s+([0-9]{2}:[0-9]{2})\b") |
| data["timestamp_corpo_data"] = m.group(1) if m else None |
| data["timestamp_corpo_hora"] = m.group(2) if m else None |
|
|
| m = get(r"NOTA\s+FISCAL\s*([0-9/\- ]+)") |
| nf = None |
| if m: |
| nf = re.sub(r"\s+", "", m.group(1)) |
| nf = [p for p in nf.split("/") if p] |
| data["nota_fiscal"] = nf |
|
|
| m = get(r"TIPO\s*([A-ZÁÂÃÉÍÓÚÇ ]+)") |
| data["tipo"] = (m.group(1).strip() if m else None) |
|
|
| m = get(r"N[º°]?\s*DA\s*PLACA\s*DE\s*AUTORIZA[ÇC]AO\s*([0-9]+)", acc=True) |
| data["num_placa_autorizacao"] = m.group(1) if m else None |
|
|
| |
| m = get(r"CLIENTE\s*:?\s*([^\n]+)") |
| cliente_raw = (m.group(1).strip() if m else None) |
| data["cliente"] = _sanitize_cliente(cliente_raw) |
| data["cliente_lista"] = _cliente_to_list(cliente_raw) |
|
|
| |
| m = get(r"STATUS\s*:?\s*([^\n]+)") |
| status_raw = m.group(1).strip() if m else None |
| data["status_lista"] = [p.strip() for p in re.split(r"\s{2,}|\t", status_raw) if p.strip()] if status_raw else None |
|
|
| |
| m = get(r"LIBERA[ÇC][AÃ]O\s*DE\s*ENTRADA\s*([^\n]*)", acc=True) |
| liber_entr = (m.group(1).strip() or None) if m else None |
| data["liberacao_entrada"] = liber_entr |
| data["liberacao_entrada_responsavel"] = _normalize_person_name(liber_entr) if liber_entr else None |
| data["liberacao_entrada_flag"] = bool(liber_entr) |
|
|
| m = get(r"LIBERA[ÇC][AÃ]O\s*DE\s*SA[IÍ]DA\s*([^\n]*)", acc=True) |
| liber_saida = (m.group(1).strip() or None) if m else None |
| data["liberacao_saida"] = liber_saida |
| data["liberacao_saida_responsavel"] = _normalize_person_name(liber_saida) if liber_saida else None |
| data["liberacao_saida_flag"] = bool(liber_saida) |
|
|
| m = get(r"RG\s*ou\s*CPF\s*do\s*MOTORISTA\s*([0-9.\-]+)") |
| data["rg_cpf_motorista"] = m.group(1) if m else None |
|
|
| |
| data["placa"] = _extract_plate_from_text(text) |
|
|
| |
| m = get(r"HR\s*ENTRADA\s*:?\s*([0-9]{2}:[0-9]{2})") |
| data["hr_entrada"] = m.group(1) if m else None |
|
|
| m = get(r"HR\s*SA[IÍ]DA\s*:?\s*([0-9]{2}:[0-9]{2})", acc=True) |
| data["hr_saida"] = m.group(1) if m else None |
| edit_saida = get(r"HR\s*SA[IÍ]DA.*?\bEditado\b", acc=True, flags=re.IGNORECASE | re.DOTALL) |
| data["hr_saida_editado"] = bool(edit_saida) |
|
|
| m = get(r"DATA\s*DA\s*OPERA[ÇC][AÃ]O\s*([0-9]{2}/[0-9]{2}/[0-9]{4})", acc=True) |
| data["data_operacao"] = m.group(1) if m else None |
|
|
| m = get(r"Palavras[- ]chave\s*Corporativas\s*([^\n]*)", acc=True) |
| data["palavras_chave"] = (m.group(1).strip() or None) if m else None |
|
|
| m = get(r"AGENDAMENTO\s*(SIM|N[ÃA]O)", acc=True) |
| agend = m.group(1).upper() if m else None |
| data["agendamento"] = {"SIM": True, "NAO": False, "NÃO": False}.get(agend, None) |
|
|
| data["status_editado"] = any("editado" in p.lower() for p in (data["status_lista"] or [])) |
| data["Status_join"] = " | ".join(data["status_lista"]) if isinstance(data.get("status_lista"), list) else (data.get("status_lista") or "") |
| return data |
|
|
| |
| |
| |
| def _list_inbox_paths(ns) -> list[str]: |
| """Inbox da caixa padrão (para retrocompatibilidade).""" |
| paths = ["Inbox"] |
| try: |
| olFolderInbox = 6 |
| inbox = ns.GetDefaultFolder(olFolderInbox) |
| except Exception: |
| return [] |
|
|
| def _walk(folder, prefix="Inbox"): |
| try: |
| for i in range(1, folder.Folders.Count + 1): |
| f = folder.Folders.Item(i) |
| full_path = prefix + "\\" + f.Name |
| paths.append(full_path) |
| _walk(f, full_path) |
| except Exception: |
| pass |
|
|
| _walk(inbox, "Inbox") |
| return paths |
|
|
| def _list_all_inbox_paths(ns) -> list[str]: |
| """Lista caminhos de Inbox para TODAS as stores (caixas), ex.: 'Minha Caixa\\Inbox\\Sub'.""" |
| paths: list[str] = [] |
| olFolderInbox = 6 |
| try: |
| for i in range(1, ns.Folders.Count + 1): |
| store = ns.Folders.Item(i) |
| root_name = str(store.Name).strip() |
| try: |
| inbox = store.GetDefaultFolder(olFolderInbox) |
| except Exception: |
| continue |
| inbox_display = str(inbox.Name).strip() |
| root_prefix = f"{root_name}\\{inbox_display}" |
| paths.append(root_prefix) |
|
|
| def _walk(folder, prefix): |
| try: |
| for j in range(1, folder.Folders.Count + 1): |
| f2 = folder.Folders.Item(j) |
| full_path = prefix + "\\" + f2.Name |
| paths.append(full_path) |
| _walk(f2, full_path) |
| except Exception: |
| pass |
|
|
| _walk(inbox, root_prefix) |
| except Exception: |
| pass |
| return sorted(set(paths)) |
|
|
| def _get_folder_by_path_any_store(ns, path: str): |
| """ |
| Resolve caminho de pasta começando por: |
| - 'Inbox\\...' (ou nome local, ex.: 'Caixa de Entrada\\...') |
| - 'Sent Items\\...' (ou nome local, ex.: 'Itens Enviados\\...') |
| - 'NomeDaStore\\Inbox\\...' OU 'NomeDaStore\\<NomeLocalInbox>\\...' |
| - 'NomeDaStore\\Sent Items\\...' OU 'NomeDaStore\\<NomeLocalSent>\\...' |
| Faz match case-insensitive nas subpastas. |
| """ |
| OL_INBOX = 6 |
| OL_SENT = 5 |
|
|
| p = (path or "").strip().replace("/", "\\") |
| parts = [x for x in p.split("\\") if x] |
| if not parts: |
| raise RuntimeError("Caminho da pasta vazio. Ex.: Inbox\\Financeiro, Sent Items\\Faturamento ou Minha Caixa\\Inbox\\Operacional") |
|
|
| |
| try: |
| inbox_local = ns.GetDefaultFolder(OL_INBOX).Name |
| except Exception: |
| inbox_local = "Inbox" |
| try: |
| sent_local = ns.GetDefaultFolder(OL_SENT).Name |
| except Exception: |
| sent_local = "Sent Items" |
|
|
| inbox_names = {"inbox", str(inbox_local).strip().lower()} |
| sent_names = {"sent items", str(sent_local).strip().lower()} |
|
|
| def _descend_from(root_folder, segs): |
| folder = root_folder |
| for seg in segs: |
| target = seg.strip().lower() |
| found = None |
| for i in range(1, folder.Folders.Count + 1): |
| cand = folder.Folders.Item(i) |
| if str(cand.Name).strip().lower() == target: |
| found = cand |
| break |
| if not found: |
| raise RuntimeError(f"Subpasta não encontrada: '{seg}' em '{folder.Name}'") |
| folder = found |
| return folder |
|
|
| |
| if len(parts) >= 2: |
| store_name_candidate = parts[0].strip().lower() |
| for i in range(1, ns.Folders.Count + 1): |
| store = ns.Folders.Item(i) |
| if str(store.Name).strip().lower() == store_name_candidate: |
| first_seg = parts[1].strip().lower() |
| if first_seg in inbox_names: |
| root = store.GetDefaultFolder(OL_INBOX) |
| return _descend_from(root, parts[2:]) |
| if first_seg in sent_names: |
| root = store.GetDefaultFolder(OL_SENT) |
| return _descend_from(root, parts[2:]) |
| raise RuntimeError( |
| f"Segundo segmento deve ser 'Inbox'/'{inbox_local}' ou 'Sent Items'/'{sent_local}' para a store '{store.Name}'." |
| ) |
|
|
| |
| first = parts[0].strip().lower() |
| if first in inbox_names: |
| root = ns.GetDefaultFolder(OL_INBOX) |
| return _descend_from(root, parts[1:]) |
| if first in sent_names: |
| root = ns.GetDefaultFolder(OL_SENT) |
| return _descend_from(root, parts[1:]) |
|
|
| raise RuntimeError( |
| f"O caminho deve começar por 'Inbox' (ou '{inbox_local}') ou 'Sent Items' (ou '{sent_local}'), " |
| f"ou por 'NomeDaStore\\Inbox' / 'NomeDaStore\\Sent Items'." |
| ) |
|
|
| def _read_folder_dataframe(folder, path: str, dias: int, filtro_remetente: str, extrair_campos: bool) -> pd.DataFrame: |
| """ |
| Lê e-mails de uma pasta específica e retorna DataFrame. |
| ✅ Fallback: se Restrict retornar 0 itens, volta para iteração completa + filtro por data. |
| """ |
| items = folder.Items |
| items.Sort("[ReceivedTime]", True) |
| cutoff = datetime.now() - timedelta(days=dias) |
|
|
| iter_items = items |
| restricted_ok = False |
| restricted_count = None |
| try: |
| |
| dt_from = cutoff.strftime("%m/%d/%Y %I:%M %p") |
| iter_items = items.Restrict(f"[ReceivedTime] >= '{dt_from}'") |
| restricted_ok = True |
| try: |
| restricted_count = iter_items.Count |
| except Exception: |
| restricted_count = None |
| except Exception: |
| restricted_ok = False |
|
|
| |
| if restricted_ok and (restricted_count is None or restricted_count == 0): |
| iter_items = items |
| restricted_ok = False |
|
|
| rows = [] |
| total_lidos = 0 |
| for mail in iter_items: |
| total_lidos += 1 |
| try: |
| if getattr(mail, "Class", None) != 43: |
| continue |
|
|
| |
| if not restricted_ok: |
| try: |
| if getattr(mail, "ReceivedTime", None) and mail.ReceivedTime < cutoff: |
| continue |
| except Exception: |
| pass |
|
|
| try: |
| sender = mail.SenderEmailAddress or mail.Sender.Name |
| except Exception: |
| sender = getattr(mail, "SenderName", None) |
|
|
| if filtro_remetente and sender: |
| if filtro_remetente.lower() not in str(sender).lower(): |
| continue |
|
|
| base = { |
| "Pasta": folder.Name, |
| "PastaPath": path, |
| "Assunto": mail.Subject, |
| "Remetente": sender, |
| "RecebidoEm": mail.ReceivedTime.strftime("%Y-%m-%d %H:%M"), |
| "Anexos": mail.Attachments.Count if hasattr(mail, "Attachments") else 0, |
| "TamanhoKB": round(mail.Size / 1024, 1) if hasattr(mail, "Size") else None, |
| "Importancia": str(getattr(mail, "Importance", "")), |
| "Categoria": getattr(mail, "Categories", "") or "", |
| "Lido": bool(getattr(mail, "UnRead", False) == False), |
| } |
|
|
| if extrair_campos: |
| raw_body = "" |
| try: |
| raw_body = mail.Body or "" |
| except Exception: |
| raw_body = "" |
| if not raw_body: |
| try: |
| raw_body = _html_to_text(mail.HTMLBody) |
| except Exception: |
| raw_body = "" |
|
|
| parsed = _parse_email_body(raw_body) |
| base.update(parsed) |
|
|
| try: |
| base["__corpo__"] = _strip_accents(raw_body)[:800] |
| except Exception: |
| base["__corpo__"] = "" |
|
|
| rows.append(base) |
| except Exception as e: |
| rows.append({"Pasta": folder.Name, "PastaPath": path, "Assunto": f"[ERRO] {e}"}) |
|
|
| df = pd.DataFrame(rows) |
| with st.expander(f"🔎 Diagnóstico • {path}", expanded=False): |
| st.caption( |
| f"Itens enumerados: **{total_lidos}** " |
| f"| Restrict aplicado: **{restricted_ok}** " |
| f"{'| Restrict.Count=' + str(restricted_count) if restricted_count is not None else ''} " |
| f"| Registros DF: **{df.shape[0]}**" |
| ) |
| return df |
|
|
| def gerar_relatorio_outlook_desktop_multi( |
| pastas: list[str], |
| dias: int, |
| filtro_remetente: str = "", |
| extrair_campos: bool = True |
| ) -> pd.DataFrame: |
| """Inicializa COM, conecta ao Outlook, lê múltiplas pastas (todas as stores) e finaliza COM.""" |
| try: |
| import win32com.client |
| pythoncom.CoInitialize() |
| ns = win32com.client.Dispatch("Outlook.Application").GetNamespace("MAPI") |
| except Exception as e: |
| st.error(f"Falha ao conectar ao Outlook/pywin32: {e}") |
| return pd.DataFrame() |
|
|
| frames = [] |
| try: |
| for path in pastas: |
| try: |
| folder = _get_folder_by_path_any_store(ns, path) |
| df = _read_folder_dataframe(folder, path, dias, filtro_remetente, extrair_campos) |
| frames.append(df) |
| except Exception as e: |
| st.warning(f"Não foi possível ler a pasta '{path}': {e}") |
|
|
| return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame() |
| finally: |
| try: |
| pythoncom.CoUninitialize() |
| except Exception: |
| pass |
|
|
| |
| |
| |
| @st.cache_data(show_spinner=False, ttl=60) |
| def _cache_outlook_df(pastas: tuple, dias: int, filtro_remetente: str, extrair_campos: bool, _v: int = 1) -> pd.DataFrame: |
| """Cache com TTL para reduzir leituras do Outlook.""" |
| return gerar_relatorio_outlook_desktop_multi(list(pastas), dias, filtro_remetente=filtro_remetente, extrair_campos=extrair_campos) |
|
|
| def _read_outlook_fresh(pastas: list[str], dias: int, filtro_remetente: str, extrair_campos: bool) -> pd.DataFrame: |
| """Leitura direta, sem cache (para 'Atualizar agora').""" |
| return gerar_relatorio_outlook_desktop_multi(pastas, dias, filtro_remetente=filtro_remetente, extrair_campos=extrair_campos) |
|
|
| |
| |
| |
| def _sanitize_default(options: list[str], default_list: list[str]) -> list[str]: |
| if not options or not default_list: |
| return [] |
| opts_set = set(options) |
| return [v for v in default_list if v in opts_set] |
|
|
| def _build_dynamic_filters(df: pd.DataFrame): |
| """ |
| Constrói UI de filtros e retorna (df_filtrado, cols_topn). |
| """ |
| if df.empty: |
| return df, [] |
|
|
| df_f = df.copy() |
| try: |
| df_f["RecebidoEm_dt"] = pd.to_datetime(df_f.get("RecebidoEm"), errors="coerce") |
| except Exception: |
| df_f["RecebidoEm_dt"] = pd.NaT |
|
|
| total_inicial = len(df_f) |
|
|
| state_keys = { |
| "f_dt_ini": "flt_dt_ini", |
| "f_dt_fim": "flt_dt_fim", |
| "f_lido": "flt_lido", |
| "f_status": "flt_status", |
| "f_saida_flag": "flt_saida_flag", |
| "f_entrada_flag": "flt_entrada_flag", |
| "f_anexos_equal": "flt_anexos_equal", |
| "f_anexos_apply_equal": "flt_anexos_apply_equal", |
| "f_tamanho_equal": "flt_tamanho_equal", |
| "f_tamanho_apply_equal": "flt_tamanho_apply_equal", |
| "f_assunto": "flt_assunto", |
| "f_cols_topn": "flt_cols_topn", |
| "f_pasta": "flt_pasta", |
| "f_pasta_path": "flt_pasta_path", |
| "f_portaria": "flt_portaria", |
| "f_cliente": "flt_cliente", |
| "f_tipo": "flt_tipo", |
| "f_placa": "flt_placa", |
| "f_importancia": "flt_importancia", |
| "f_categoria": "flt_categoria", |
| "f_resp_saida": "flt_resp_saida", |
| "f_resp_entrada": "flt_resp_entrada", |
| } |
| for k in state_keys.values(): |
| st.session_state.setdefault(k, None) |
|
|
| with st.expander("🔎 Filtros do relatório (por colunas retornadas)", expanded=True): |
| with st.form("form_filtros_outlook"): |
| |
| min_dt = df_f["RecebidoEm_dt"].min() |
| max_dt = df_f["RecebidoEm_dt"].max() |
| if pd.notna(min_dt) and pd.notna(max_dt): |
| col_d1, col_d2 = st.columns(2) |
| dt_ini = col_d1.date_input("Data inicial (RecebidoEm)", value=st.session_state[state_keys["f_dt_ini"]] or min_dt.date(), key="f_dt_ini_widget") |
| dt_fim = col_d2.date_input("Data final (RecebidoEm)", value=st.session_state[state_keys["f_dt_fim"]] or max_dt.date(), key="f_dt_fim_widget") |
| else: |
| dt_ini, dt_fim = None, None |
|
|
| def _multi_select(col_name, label, state_key): |
| if col_name in df_f.columns: |
| options = sorted([v for v in df_f[col_name].dropna().astype(str).unique() if v != ""]) |
| default_raw = st.session_state[state_keys[state_key]] or [] |
| default = _sanitize_default(options, default_raw) |
| sel = st.multiselect(label, options=options, default=default, key=f"{state_key}_widget") |
| st.session_state[state_keys[state_key]] = sel |
|
|
| |
| _multi_select("PastaPath", "Pasta (caminho completo)", "f_pasta_path") |
| |
| _multi_select("Pasta", "Pasta (apenas nome)", "f_pasta") |
|
|
| _multi_select("portaria", "Portaria", "f_portaria") |
|
|
| |
| if "cliente_lista" in df_f.columns: |
| clientes_opts = sorted({ |
| c.strip() for lst in df_f["cliente_lista"] |
| if isinstance(lst, list) for c in lst if c and str(c).strip() |
| }) |
| default_raw = st.session_state[state_keys["f_cliente"]] or [] |
| default = _sanitize_default(clientes_opts, default_raw) |
| cliente_sel = st.multiselect("Cliente", options=clientes_opts, default=default, key="f_cliente_widget") |
| st.session_state[state_keys["f_cliente"]] = cliente_sel |
| else: |
| _multi_select("cliente", "Cliente", "f_cliente") |
|
|
| _multi_select("tipo", "Tipo", "f_tipo") |
| _multi_select("placa", "Placa (veículo)", "f_placa") |
| _multi_select("Importancia", "Importância (0=baixa,1=normal,2=alta)", "f_importancia") |
| _multi_select("Categoria", "Categoria", "f_categoria") |
| _multi_select("liberacao_saida_responsavel", "Responsável (Liberação de Saída)", "f_resp_saida") |
| _multi_select("liberacao_entrada_responsavel", "Responsável (Liberação de Entrada)", "f_resp_entrada") |
|
|
| |
| if "Lido" in df_f.columns: |
| options_lido = ["True", "False"] |
| default_raw = st.session_state[state_keys["f_lido"]] or [] |
| default_lido = _sanitize_default(options_lido, default_raw) |
| lido_sel = st.multiselect("Lido?", options=options_lido, default=default_lido, key="f_lido_widget") |
| st.session_state[state_keys["f_lido"]] = lido_sel |
|
|
| |
| if "status_lista" in df_f.columns: |
| all_status = sorted(set(s for v in df_f["status_lista"] if isinstance(v, list) for s in v)) |
| default_raw = st.session_state[state_keys["f_status"]] or [] |
| default_status = _sanitize_default(all_status, default_raw) |
| status_sel = st.multiselect("Status (qualquer dos selecionados)", options=all_status, default=default_status, key="f_status_widget") |
| st.session_state[state_keys["f_status"]] = status_sel |
|
|
| |
| if "liberacao_saida_flag" in df_f.columns: |
| opt_saida = st.selectbox("Teve Liberação de Saída?", ["(todas)", "Sim", "Não"], |
| index=(0 if st.session_state[state_keys["f_saida_flag"]] is None else ["(todas)", "Sim", "Não"].index(st.session_state[state_keys["f_saida_flag"]])), |
| key="f_saida_flag_widget") |
| st.session_state[state_keys["f_saida_flag"]] = opt_saida |
| if "liberacao_entrada_flag" in df_f.columns: |
| opt_entr = st.selectbox("Teve Liberação de Entrada?", ["(todas)", "Sim", "Não"], |
| index=(0 if st.session_state[state_keys["f_entrada_flag"]] is None else ["(todas)", "Sim", "Não"].index(st.session_state[state_keys["f_entrada_flag"]])), |
| key="f_entrada_flag_widget") |
| st.session_state[state_keys["f_entrada_flag"]] = opt_entr |
|
|
| |
| col_n1, col_n2 = st.columns(2) |
| if "Anexos" in df_f.columns: |
| col_vals = df_f["Anexos"].dropna() |
| if col_vals.empty: |
| col_n1.info("Sem valores para **Anexos** nesta seleção.") |
| else: |
| min_ax = int(col_vals.min()); max_ax = int(col_vals.max()) |
| if min_ax == max_ax: |
| col_n1.info(f"Todos os registros têm **{min_ax}** anexos.") |
| eq_val = col_n1.number_input("Filtrar por Anexos = (opcional)", min_value=min_ax, max_value=max_ax, |
| value=st.session_state[state_keys["f_anexos_equal"]] or min_ax, step=1, key="f_anexos_equal_widget") |
| apply_eq = col_n1.checkbox("Aplicar igualdade de Anexos", value=bool(st.session_state[state_keys["f_anexos_apply_equal"]]), key="f_anexos_apply_equal_widget") |
| st.session_state[state_keys["f_anexos_equal"]] = eq_val |
| st.session_state[state_keys["f_anexos_apply_equal"]] = apply_eq |
| else: |
| rng_ax = col_n1.slider("Anexos (intervalo)", min_value=min_ax, max_value=max_ax, |
| value=st.session_state.get("__rng_anexos__", (min_ax, max_ax)), key="__rng_anexos__widget") |
| st.session_state["__rng_anexos__"] = rng_ax |
|
|
| if "TamanhoKB" in df_f.columns: |
| col_vals = df_f["TamanhoKB"].dropna() |
| if col_vals.empty: |
| col_n2.info("Sem valores para **TamanhoKB** nesta seleção.") |
| else: |
| min_sz = float(col_vals.min()); max_sz = float(col_vals.max()) |
| if abs(min_sz - max_sz) < 1e-9: |
| col_n2.info(f"Todos os registros têm **{min_sz:.1f} KB**.") |
| eq_sz = col_n2.number_input("Filtrar por TamanhoKB = (opcional)", min_value=float(min_sz), max_value=float(max_sz), |
| value=st.session_state[state_keys["f_tamanho_equal"]] or float(min_sz), step=0.1, format="%.1f", key="f_tamanho_equal_widget") |
| apply_sz = col_n2.checkbox("Aplicar igualdade de TamanhoKB", value=bool(st.session_state[state_keys["f_tamanho_apply_equal"]]), key="f_tamanho_apply_equal_widget") |
| st.session_state[state_keys["f_tamanho_equal"]] = eq_sz |
| st.session_state[state_keys["f_tamanho_apply_equal"]] = apply_sz |
| else: |
| rng_sz = col_n2.slider("Tamanho (KB)", min_value=float(min_sz), max_value=float(max_sz), |
| value=st.session_state.get("__rng_tamanho__", (float(min_sz), float(max_sz))), key="__rng_tamanho__widget") |
| st.session_state["__rng_tamanho__"] = rng_sz |
|
|
| |
| txt_default = st.session_state[state_keys["f_assunto"]] or "" |
| txt = st.text_input("Assunto contém (opcional)", value=txt_default, key="f_assunto_widget") |
| st.session_state[state_keys["f_assunto"]] = txt |
|
|
| |
| possiveis_cols = [c for c in ["Pasta", "PastaPath", "portaria", "cliente", "cliente_lista", "tipo", "placa", |
| "Importancia", "Categoria", "Remetente", "Status_join", |
| "liberacao_saida_responsavel", "liberacao_entrada_responsavel"] if c in df_f.columns] |
| desired_default = st.session_state[state_keys["f_cols_topn"]] or ["cliente", "Status_join"] |
| default_cols = _sanitize_default(possiveis_cols, desired_default) |
| cols_for_topn = st.multiselect("Colunas para indicadores (Top N)", options=possiveis_cols, default=default_cols, key="f_cols_topn_widget") |
| st.session_state[state_keys["f_cols_topn"]] = cols_for_topn |
|
|
| |
| col_b1, col_b2 = st.columns(2) |
| aplicar = col_b1.form_submit_button("✅ Aplicar filtros") |
| limpar = col_b2.form_submit_button("🧹 Limpar filtros") |
|
|
| if limpar: |
| for k in state_keys.values(): |
| st.session_state[k] = None |
| st.session_state.pop("__rng_anexos__", None) |
| st.session_state.pop("__rng_tamanho__", None) |
| st.info("Filtros limpos.") |
| return df, [] |
|
|
| if aplicar: |
| |
| if pd.notna(df_f["RecebidoEm_dt"]).any() and dt_ini and dt_fim: |
| mask_dt = (df_f["RecebidoEm_dt"].dt.date >= dt_ini) & (df_f["RecebidoEm_dt"].dt.date <= dt_fim) |
| df_f = df_f[mask_dt] |
|
|
| def _apply_multi(col_name, session_key): |
| sel = st.session_state.get(session_key) |
| nonlocal df_f |
| if sel and col_name in df_f.columns: |
| df_f = df_f[df_f[col_name].astype(str).isin(sel)] |
|
|
| |
| _apply_multi("PastaPath", state_keys["f_pasta_path"]) |
| |
| _apply_multi("Pasta", state_keys["f_pasta"]) |
| _apply_multi("portaria", state_keys["f_portaria"]) |
| _apply_multi("tipo", state_keys["f_tipo"]) |
| _apply_multi("placa", state_keys["f_placa"]) |
| _apply_multi("Importancia", state_keys["f_importancia"]) |
| _apply_multi("Categoria", state_keys["f_categoria"]) |
| _apply_multi("liberacao_saida_responsavel", state_keys["f_resp_saida"]) |
| _apply_multi("liberacao_entrada_responsavel", state_keys["f_resp_entrada"]) |
|
|
| |
| cliente_sel = st.session_state[state_keys["f_cliente"]] |
| if cliente_sel: |
| if "cliente_lista" in df_f.columns: |
| df_f = df_f[df_f["cliente_lista"].apply(lambda lst: isinstance(lst, list) and any(c in lst for c in cliente_sel))] |
| elif "cliente" in df_f.columns: |
| df_f = df_f[df_f["cliente"].astype(str).isin(cliente_sel)] |
|
|
| |
| lido_sel = st.session_state[state_keys["f_lido"]] |
| if lido_sel and "Lido" in df_f.columns: |
| df_f = df_f[df_f["Lido"].astype(str).isin(lido_sel)] |
|
|
| |
| status_sel = st.session_state[state_keys["f_status"]] |
| if status_sel and "status_lista" in df_f.columns: |
| df_f = df_f[df_f["status_lista"].apply(lambda lst: isinstance(lst, list) and any(s in lst for s in status_sel))] |
|
|
| |
| opt_saida = st.session_state[state_keys["f_saida_flag"]] |
| if opt_saida in ("Sim", "Não") and "liberacao_saida_flag" in df_f.columns: |
| df_f = df_f[df_f["liberacao_saida_flag"] == (opt_saida == "Sim")] |
| opt_entr = st.session_state[state_keys["f_entrada_flag"]] |
| if opt_entr in ("Sim", "Não") and "liberacao_entrada_flag" in df_f.columns: |
| df_f = df_f[df_f["liberacao_entrada_flag"] == (opt_entr == "Sim")] |
|
|
| |
| if st.session_state.get("__rng_anexos__") and "Anexos" in df_f.columns: |
| a0, a1 = st.session_state["__rng_anexos__"] |
| df_f = df_f[(df_f["Anexos"] >= a0) & (df_f["Anexos"] <= a1)] |
| if st.session_state.get("__rng_tamanho__") and "TamanhoKB" in df_f.columns: |
| s0, s1 = st.session_state["__rng_tamanho__"] |
| df_f = df_f[(df_f["TamanhoKB"] >= s0) & (df_f["TamanhoKB"] <= s1)] |
|
|
| |
| if st.session_state.get(state_keys["f_anexos_apply_equal"]) and "Anexos" in df_f.columns: |
| eq_val = st.session_state.get(state_keys["f_anexos_equal"]) |
| if eq_val is not None: |
| df_f = df_f[df_f["Anexos"] == int(eq_val)] |
| if st.session_state.get(state_keys["f_tamanho_apply_equal"]) and "TamanhoKB" in df_f.columns: |
| eq_sz = st.session_state.get(state_keys["f_tamanho_equal"]) |
| if eq_sz is not None: |
| df_f = df_f[df_f["TamanhoKB"].round(1) == round(float(eq_sz), 1)] |
|
|
| |
| txt = st.session_state[state_keys["f_assunto"]] |
| if txt and "Assunto" in df_f.columns: |
| df_f = df_f[df_f["Assunto"].astype(str).str.contains(txt.strip(), case=False, na=False)] |
|
|
| cols_for_topn = st.session_state[state_keys["f_cols_topn"]] or [] |
|
|
| total_final = len(df_f) |
| st.caption(f"🧮 **Diagnóstico**: antes dos filtros = {total_inicial} | depois dos filtros = {total_final}") |
|
|
| return df_f, cols_for_topn |
|
|
| possiveis_cols_init = [c for c in ["Pasta", "PastaPath", "portaria", "cliente", "cliente_lista", "tipo", "placa", |
| "Importancia", "Categoria", "Remetente", "Status_join", |
| "liberacao_saida_responsavel", "liberacao_entrada_responsavel"] if c in df_f.columns] |
| desired_default_init = st.session_state[state_keys["f_cols_topn"]] or ["cliente", "Status_join"] |
| default_cols_init = _sanitize_default(possiveis_cols_init, desired_default_init) |
|
|
| st.caption(f"🧮 **Diagnóstico**: registros disponíveis (sem aplicar filtros) = {total_inicial}") |
| return df, default_cols_init |
|
|
| |
| |
| |
| def main(): |
| st.title("📧 Relatório • Outlook Desktop (Estruturado)") |
| st.caption("Escolha a Caixa (Entrada ou Itens Enviados), defina o período e extraia campos estruturados do e‑mail.") |
| st.markdown(_STYLES, unsafe_allow_html=True) |
|
|
| |
| st.session_state.setdefault("__outlook_df__", None) |
| st.session_state.setdefault("__outlook_meta__", None) |
|
|
| with st.expander("⚙️ Configurações", expanded=True): |
| with st.form("form_execucao_outlook"): |
| col_a, col_b, col_c = st.columns([1, 1, 1]) |
| dias = col_a.slider("Período (últimos N dias)", min_value=1, max_value=365, value=30) |
| filtro_remetente = col_b.text_input("Filtrar por remetente (opcional)", value="", placeholder='Ex.: "@fornecedor.com" ou "Fulano"') |
| extrair_campos = col_c.checkbox("Extrair campos do e‑mail (modelo padrão)", value=True) |
|
|
| |
| tipo_pasta = st.selectbox( |
| "Tipo de Caixa:", |
| ["📥 Caixa de Entrada", "📤 Itens Enviados"], |
| help="Escolha se deseja ler a Caixa de Entrada ou os Itens Enviados." |
| ) |
|
|
| mapa_caminho = { |
| "📥 Caixa de Entrada": "Inbox", |
| "📤 Itens Enviados": "Sent Items" |
| } |
| pasta_base = mapa_caminho[tipo_pasta] |
|
|
| subpasta_manual = st.text_input( |
| "Subpasta (opcional)", |
| value="", |
| placeholder="Ex.: Financeiro\\Operacional" |
| ) |
|
|
| if subpasta_manual.strip(): |
| pasta_final = f"{pasta_base}\\{subpasta_manual.strip()}" |
| else: |
| pasta_final = pasta_base |
|
|
| pastas_escolhidas = [pasta_final] |
|
|
| |
| col_btn1, col_btn2, col_btn3, col_btn4 = st.columns([1, 1, 1, 1]) |
| submit_cache = col_btn1.form_submit_button("🔍 Gerar (com cache)") |
| submit_fresh = col_btn2.form_submit_button("⚡ Atualizar agora (sem cache)") |
| submit_clear = col_btn3.form_submit_button("🧹 Limpar cache") |
| submit_test = col_btn4.form_submit_button("🧪 Teste de conexão") |
|
|
| |
| if submit_clear: |
| try: |
| _cache_outlook_df.clear() |
| except Exception: |
| try: |
| st.cache_data.clear() |
| except Exception: |
| pass |
| st.session_state["__outlook_df__"] = None |
| st.session_state["__outlook_meta__"] = None |
| st.success("Cache limpo. Gere novamente os dados.") |
|
|
| if submit_test: |
| try: |
| import win32com.client |
| pythoncom.CoInitialize() |
| ns = win32com.client.Dispatch("Outlook.Application").GetNamespace("MAPI") |
| stores = ns.Folders.Count |
| inbox_def = ns.GetDefaultFolder(6) |
| sent_def = ns.GetDefaultFolder(5) |
| st.info(f"Stores detectadas: **{stores}** | Inbox padrão: **{inbox_def.Name}** | Sent: **{sent_def.Name}**") |
| if pastas_escolhidas: |
| for p in pastas_escolhidas: |
| try: |
| f = _get_folder_by_path_any_store(ns, p) |
| st.write(f"📁 {p} → '{f.Name}' itens: {f.Items.Count}") |
| except Exception as e: |
| st.warning(f"[Teste] Falha ao acessar '{p}': {e}") |
| except Exception as e: |
| st.error(f"[Teste] Erro ao conectar: {e}") |
| finally: |
| try: |
| pythoncom.CoUninitialize() |
| except Exception: |
| pass |
|
|
| if submit_fresh: |
| if not pastas_escolhidas: |
| st.error("Selecione ao menos uma pasta.") |
| else: |
| with st.spinner("Lendo e-mails do Outlook (sem cache)..."): |
| df_fresh = _read_outlook_fresh(pastas_escolhidas, dias, filtro_remetente, extrair_campos) |
| st.session_state["__outlook_df__"] = df_fresh |
| st.session_state["__outlook_meta__"] = { |
| "pastas": list(pastas_escolhidas), |
| "dias": dias, |
| "filtro_remetente": filtro_remetente, |
| "extrair_campos": extrair_campos, |
| "loaded_at": datetime.now().strftime("%d/%m/%Y %H:%M"), |
| "source": "fresh" |
| } |
|
|
| if submit_cache: |
| if not pastas_escolhidas: |
| st.error("Selecione ao menos uma pasta.") |
| else: |
| with st.spinner("Lendo e-mails do Outlook (com cache)..."): |
| df = _cache_outlook_df(tuple(pastas_escolhidas), dias, filtro_remetente, extrair_campos, _v=1) |
| st.session_state["__outlook_df__"] = df |
| st.session_state["__outlook_meta__"] = { |
| "pastas": list(pastas_escolhidas), |
| "dias": dias, |
| "filtro_remetente": filtro_remetente, |
| "extrair_campos": extrair_campos, |
| "loaded_at": datetime.now().strftime("%d/%m/%Y %H:%M"), |
| "source": "cache" |
| } |
|
|
| |
| df_src = st.session_state.get("__outlook_df__") |
| meta = st.session_state.get("__outlook_meta__") |
|
|
| if isinstance(df_src, pd.DataFrame): |
| origem = (meta or {}).get("source", "cache/buffer") |
| if df_src.empty: |
| st.info("Nenhum e-mail encontrado para os parâmetros informados. Use o botão 🧪 Teste de conexão para diagnosticar.") |
| return |
|
|
| |
| pastas_lbl = ", ".join(meta.get("pastas") or []) |
| extra_flag = "SIM" if meta.get("extrair_campos") else "NÃO" |
| st.markdown( |
| f""" |
| <div class="status-bar"> |
| <div class="status-line"> |
| <div class="badge">📂 Pastas: <strong>{pastas_lbl}</strong></div> |
| <div class="badge gray">🗓️ Dias: <strong>{meta.get('dias')}</strong></div> |
| <div class="badge green">🔍 Origem: <strong>{origem}</strong></div> |
| <div class="badge amber">🧩 Extração: <strong>{extra_flag}</strong></div> |
| <div class="badge gray">⏱️ Em: <strong>{meta.get('loaded_at')}</strong></div> |
| </div> |
| </div> |
| """, unsafe_allow_html=True |
| ) |
|
|
| |
| df_filtrado, cols_for_topn = _build_dynamic_filters(df_src) |
|
|
| |
| if "placa" in df_filtrado.columns and "Placa" not in df_filtrado.columns: |
| df_filtrado = df_filtrado.copy() |
| df_filtrado["Placa"] = df_filtrado["placa"] |
| if "cliente_lista" in df_filtrado.columns and "ClienteLista" not in df_filtrado.columns: |
| df_filtrado = df_filtrado.copy() |
| df_filtrado["ClienteLista"] = df_filtrado["cliente_lista"].apply(lambda lst: "; ".join(lst) if isinstance(lst, list) else "") |
|
|
| |
| tab_geral, tab_tabela, tab_kpis, tab_inds, tab_diag = st.tabs( |
| ["🧭 Visão Geral", "📄 Tabela", "📈 KPIs & Gráficos", "🏆 Indicadores (Top N)", "🛠️ Diagnóstico"] |
| ) |
|
|
| with tab_geral: |
| |
| qtd = len(df_filtrado) |
| unicos_pastas = df_filtrado["PastaPath"].nunique() if "PastaPath" in df_filtrado.columns else df_filtrado["Pasta"].nunique() if "Pasta" in df_filtrado.columns else None |
| dt_min = pd.to_datetime(df_filtrado["RecebidoEm"], errors="coerce").min() |
| dt_max = pd.to_datetime(df_filtrado["RecebidoEm"], errors="coerce").max() |
| st.write(f"**Registros após filtros:** {qtd} " |
| + (f"• **Pastas (únicas)**: {unicos_pastas}" if unicos_pastas is not None else "") |
| + (f" • **Intervalo**: {dt_min.strftime('%d/%m/%Y %H:%M')} → {dt_max.strftime('%d/%m/%Y %H:%M')}" if pd.notna(dt_min) and pd.notna(dt_max) else "") |
| ) |
|
|
| |
| if "RecebidoEm" in df_filtrado.columns: |
| try: |
| dts = pd.to_datetime(df_filtrado["RecebidoEm"], errors="coerce") |
| series = dts.dt.date.value_counts().sort_index() |
| fig = px.bar(series, x=series.index, y=series.values, labels={"x":"Data", "y":"Qtd"}, |
| title="Mensagens por dia", template="plotly_white") |
| fig.update_layout(height=360) |
| st.plotly_chart(fig, use_container_width=True) |
| except Exception: |
| pass |
|
|
| |
| |
| |
| with tab_tabela: |
| df_show = _build_table_view_unique_client(df_filtrado) |
|
|
| |
| colcfg = {} |
|
|
| |
| if "Data" in df_show.columns: |
| try: |
| colcfg["Data"] = st.column_config.DateColumn("Data", format="DD/MM/YYYY") |
| except Exception: |
| colcfg["Data"] = st.column_config.TextColumn("Data") |
|
|
| if "Hora" in df_show.columns: |
| |
| try: |
| colcfg["Hora"] = st.column_config.TimeColumn("Hora", format="HH:mm") |
| except Exception: |
| colcfg["Hora"] = st.column_config.TextColumn("Hora") |
|
|
| if "Cliente" in df_show.columns: |
| colcfg["Cliente"] = st.column_config.TextColumn("Cliente") |
|
|
| if "Placa" in df_show.columns: |
| colcfg["Placa"] = st.column_config.TextColumn("Placa") |
|
|
| if "tipo" in df_show.columns: |
| colcfg["tipo"] = st.column_config.TextColumn("Tipo") |
|
|
| if "Status_join" in df_show.columns: |
| colcfg["Status_join"] = st.column_config.TextColumn("Status") |
|
|
| if "Anexos" in df_show.columns: |
| colcfg["Anexos"] = st.column_config.NumberColumn("Anexos", format="%d") |
|
|
| if "TamanhoKB" in df_show.columns: |
| colcfg["TamanhoKB"] = st.column_config.NumberColumn("Tamanho (KB)", format="%.1f") |
|
|
| if "Remetente" in df_show.columns: |
| colcfg["Remetente"] = st.column_config.TextColumn("Remetente") |
|
|
| if "PastaPath" in df_show.columns: |
| colcfg["PastaPath"] = st.column_config.TextColumn("Caminho da Pasta") |
|
|
| if "Pasta" in df_show.columns: |
| colcfg["Pasta"] = st.column_config.TextColumn("Pasta") |
|
|
| if "portaria" in df_show.columns: |
| colcfg["portaria"] = st.column_config.TextColumn("Portaria") |
|
|
| |
| if "📎" in df_show.columns: |
| colcfg["📎"] = st.column_config.TextColumn("Anexo") |
| if "🔔" in df_show.columns: |
| colcfg["🔔"] = st.column_config.TextColumn("Importância") |
| if "👁️" in df_show.columns: |
| colcfg["👁️"] = st.column_config.TextColumn("Lido") |
|
|
| st.dataframe( |
| df_show, |
| use_container_width=True, |
| hide_index=True, |
| column_config=colcfg, |
| height=560 |
| ) |
|
|
| st.caption("🔎 A Tabela está em visão **por cliente** (uma linha por cliente) e com **Data** e **Hora** separadas.") |
|
|
| with tab_kpis: |
| _render_kpis(df_filtrado) |
|
|
| with tab_inds: |
| _render_indicators_custom(df_filtrado, dt_col_name="RecebidoEm", cols_for_topn=cols_for_topn, topn_default=10) |
|
|
| with tab_diag: |
| st.write("**Colunas disponíveis**:", list(df_src.columns)) |
| st.write("**Linhas (antes filtros)**:", len(df_src)) |
| st.write("**Linhas (após filtros)**:", len(df_filtrado)) |
| if "PastaPath" in df_src.columns: |
| st.write("**Pastas distintas**:", df_src["PastaPath"].nunique()) |
| if "__corpo__" in df_src.columns: |
| st.text_area("Amostra de corpo (debug, 1ª linha):", value=str(df_src["__corpo__"].dropna().head(1).values[0]) if df_src["__corpo__"].dropna().any() else "", height=120) |
|
|
| |
| |
| |
| base_name = f"relatorio_outlook_desktop_{date.today()}" |
| df_to_export = _build_table_view_unique_client(df_filtrado) |
| _build_downloads(df_to_export, base_name=base_name) |
|
|
| |
| if _HAS_AUDIT and meta: |
| try: |
| registrar_log( |
| usuario=st.session_state.get("usuario"), |
| acao=f"Relatório Outlook (pastas={len(meta['pastas'])}, dias={meta['dias']}, extrair={origem}) — filtros aplicados", |
| tabela="outlook_relatorio", |
| registro_id=None |
| ) |
| except Exception: |
| pass |
|
|
| else: |
| st.info("👉 Selecione a caixa (Entrada/Saída), opcionalmente uma subpasta, e clique em **🔍 Gerar (com cache)** ou **⚡ Atualizar agora (sem cache)**. Use **🧪 Teste de conexão** se vier vazio.") |
|
|
| |
| |
|
|