""" LegalOne – PowerBI-like Dashboard (Streamlit) --------------------------------------------- Visual com cards de KPI, filtros na sidebar, gráficos Plotly e tabela interativa. - Upload do CSV do Legal One - Mapeamento (opcional) de "escritorio_responsavel" -> "escritorio_cat" via JSON - Download do CSV filtrado Requisitos (requirements.txt): streamlit>=1.37 pandas>=2.2 plotly>=5.22 numpy>=1.26 streamlit-aggrid>=0.3.4.post3 """ from __future__ import annotations import io import json import re import unicodedata from datetime import datetime from typing import Optional import numpy as np import pandas as pd import plotly.express as px import streamlit as st # AgGrid é opcional try: from st_aggrid import AgGrid, GridOptionsBuilder, GridUpdateMode AG_AVAILABLE = True except Exception: AG_AVAILABLE = False # ---------- Configuração de página e tema ---------- st.set_page_config(page_title="LegalOne Dashboard", layout="wide") px.defaults.template = "plotly_dark" px.defaults.width = None px.defaults.height = 420 PRIMARY_BG = "#0f172a" # slate-900 CARD_BG = "#111827" # gray-900 TEXT = "#e5e7eb" # gray-200 SUBTLE = "#94a3b8" # slate-400 st.markdown(f""" """, unsafe_allow_html=True) # ---------- Utilidades ---------- def _norm_text(s: Optional[str]) -> str: if s is None: return "" s = str(s).strip().lower() s = "".join(c for c in unicodedata.normalize("NFD", s) if unicodedata.category(c) != "Mn") s = re.sub(r"\s+", " ", s) return s @st.cache_data(show_spinner=False) def load_csv(upload) -> pd.DataFrame: """ Lê CSV, normaliza cabeçalhos e garante colunas canônicas. Garante 'escritorio_cat' (A) e tipagem básica. """ if upload is None: return pd.DataFrame() # 1) lê CSV df = pd.read_csv(upload) # 2) normaliza nomes de colunas (lower, sem acento, sem espaços extras) original_cols = list(df.columns) norm_cols = [_norm_text(c) for c in original_cols] rename_map = dict(zip(original_cols, norm_cols)) df = df.rename(columns=rename_map) # 3) mapeia cabeçalhos comuns -> canônicos header_map = { # processo "numero do processo": "processo_numero", "número do processo": "processo_numero", "processo numero": "processo_numero", "processo": "processo_numero", "num do processo": "processo_numero", "nº do processo": "processo_numero", # cliente / parte contrária "cliente": "cliente", "parte contraria": "contrario", "parte contrária": "contrario", "contrario": "contrario", # valores "valor da causa": "valor_causa", "valor causa": "valor_causa", "valor_causa": "valor_causa", # ação / natureza / área "acao": "acao", "ação": "acao", "natureza": "natureza", "area": "area", "área": "area", # órgãos / localização "orgao": "orgao", "órgão": "orgao", "comarca": "comarca", "tribunal": "tribunal", "vara": "vara", # situação / datas / posição do cliente "situacao": "situacao", "situação": "situacao", "data de ajuizamento": "data_ajuizamento", "data ajuizamento": "data_ajuizamento", "posicao do cliente": "posicao_cliente", "posição do cliente": "posicao_cliente", # escritório "escritorio responsavel": "escritorio_responsavel", "escritório responsavel": "escritorio_responsavel", "escritório responsável": "escritorio_responsavel", "escritorio_responsavel": "escritorio_responsavel", "escritorio cat": "escritorio_cat", "categoria escritorio": "escritorio_cat", "categoria do escritorio": "escritorio_cat", # página "pagina": "pagina_pdf", "página": "pagina_pdf", "pagina pdf": "pagina_pdf", "página pdf": "pagina_pdf", } df = df.rename(columns={c: header_map[c] for c in list(df.columns) if c in header_map}) # 4) garante as colunas canônicas (cria se não existir) cols = [ "processo_numero","cliente","contrario","valor_causa","acao","natureza","area","orgao", "comarca","tribunal","vara","situacao","data_ajuizamento","posicao_cliente", "escritorio_responsavel","escritorio_cat","pagina_pdf" ] for c in cols: if c not in df.columns: df[c] = None # 5) tipagem df["valor_causa"] = pd.to_numeric(df["valor_causa"], errors="coerce") df["data_ajuizamento"] = pd.to_datetime(df["data_ajuizamento"], errors="coerce") # 6) A: garante categoria (se vazia, usa responsável) df["escritorio_cat"] = ( df["escritorio_cat"] .fillna(df["escritorio_responsavel"]) .astype(str) .replace({"None": None}) ) # devolve apenas canônicas return df[cols] @st.cache_data(show_spinner=False) def apply_mapping(df: pd.DataFrame, mapping_json: str) -> pd.DataFrame: """Aplica mapeamento JSON (responsável -> categoria).""" if df.empty or not mapping_json: return df try: raw = json.loads(mapping_json) mapping = {_norm_text(k): v for k, v in raw.items()} out = df.copy() out["escritorio_cat"] = out["escritorio_responsavel"].apply( lambda x: mapping.get(_norm_text(x), x) ) return out except Exception: return df @st.cache_data(show_spinner=False) def filter_df( df: pd.DataFrame, cliente_q: str, escritorio_cat: list[str], tribunal: list[str], natureza: list[str], acao: list[str], periodo: tuple[datetime, datetime] | None, ) -> pd.DataFrame: if df.empty: return df out = df.copy() if cliente_q: out = out[out["cliente"].astype(str).str.contains(cliente_q, case=False, na=False)] if escritorio_cat: out = out[out["escritorio_cat"].isin(escritorio_cat)] if tribunal: out = out[out["tribunal"].isin(tribunal)] if natureza: out = out[out["natureza"].isin(natureza)] if acao: out = out[out["acao"].isin(acao)] if periodo and not pd.isna(out["data_ajuizamento"]).all(): start, end = periodo out = out[ (out["data_ajuizamento"] >= pd.to_datetime(start)) & (out["data_ajuizamento"] <= pd.to_datetime(end)) ] return out # ---------- Sidebar (upload + filtros) ---------- st.sidebar.title("⚙️ Filtros") up = st.sidebar.file_uploader("CSV do Legal One", type=["csv"]) mapping_str = st.sidebar.text_area( "Mapeamento (JSON opcional) — escritório bruto → categoria", value='{"CÍVEL PARTIDO":"Cível – Partido","CÍVEL INDIVIDUAL":"Cível – Individual","CÍVEL RECUPERAÇÃO DE CRÉDITO":"Cível – Recuperação de Crédito"}', height=120, ) base = load_csv(up) base = apply_mapping(base, mapping_str) # B: reforço pós-mapeamento + extra para CSVs antigos if "escritorio_cat" not in base.columns: base["escritorio_cat"] = base.get("escritorio_responsavel", "") base["escritorio_cat"] = base["escritorio_cat"].fillna(base.get("escritorio_responsavel", "")).astype(str) for col in ["tribunal", "natureza", "acao", "data_ajuizamento", "valor_causa", "processo_numero", "cliente", "contrario"]: if col not in base.columns: base[col] = None # C: guard-clause antes de montar filtros if base.empty: st.info("Faça upload do CSV para começar.") st.stop() # valores únicos para filtros cats = sorted([c for c in base["escritorio_cat"].dropna().unique()]) tribs = sorted([t for t in base["tribunal"].dropna().unique()]) nats = sorted([n for n in base["natureza"].dropna().unique()]) actions = sorted([a for a in base["acao"].dropna().unique()]) st.sidebar.markdown("---") cliente_q = st.sidebar.text_input("Cliente contém") sel_cat = st.sidebar.multiselect("Categoria de Escritório", cats) sel_trib = st.sidebar.multiselect("Tribunal", tribs) sel_nat = st.sidebar.multiselect("Natureza", nats) sel_acao = st.sidebar.multiselect("Ação", actions) min_dt = pd.to_datetime(base["data_ajuizamento"]).min() max_dt = pd.to_datetime(base["data_ajuizamento"]).max() if pd.isna(min_dt) or pd.isna(max_dt): period = None else: period = st.sidebar.date_input( "Período (Data de ajuizamento)", value=(min_dt.date(), max_dt.date()) ) f = filter_df(base, cliente_q, sel_cat, sel_trib, sel_nat, sel_acao, period) # ---------- KPIs ---------- col1, col2, col3, col4 = st.columns(4) with col1: st.markdown( f'