Integration / app.py
Thslc99's picture
Update app.py
2201c00 verified
"""
LegalOne – PowerBI-like Dashboard (Streamlit)
---------------------------------------------
Visual com cards de KPI, filtros na sidebar, gráficos Plotly e tabela interativa.
- Upload do CSV do Legal One
- Mapeamento (opcional) de "escritorio_responsavel" -> "escritorio_cat" via JSON
- Download do CSV filtrado
Requisitos (requirements.txt):
streamlit>=1.37
pandas>=2.2
plotly>=5.22
numpy>=1.26
streamlit-aggrid>=0.3.4.post3
"""
from __future__ import annotations
import io
import json
import re
import unicodedata
from datetime import datetime
from typing import Optional
import numpy as np
import pandas as pd
import plotly.express as px
import streamlit as st
# AgGrid é opcional
try:
from st_aggrid import AgGrid, GridOptionsBuilder, GridUpdateMode
AG_AVAILABLE = True
except Exception:
AG_AVAILABLE = False
# ---------- Configuração de página e tema ----------
st.set_page_config(page_title="LegalOne Dashboard", layout="wide")
px.defaults.template = "plotly_dark"
px.defaults.width = None
px.defaults.height = 420
PRIMARY_BG = "#0f172a" # slate-900
CARD_BG = "#111827" # gray-900
TEXT = "#e5e7eb" # gray-200
SUBTLE = "#94a3b8" # slate-400
st.markdown(f"""
<style>
html, body, [class^="css"], .stApp {{ background-color: {PRIMARY_BG} !important; }}
.block-container {{ padding-top: 1rem; padding-bottom: 1rem; }}
.kpi-card {{ background: {CARD_BG}; border-radius: 16px; padding: 16px 18px; border: 1px solid #1f2937; }}
.kpi-label {{ color: {SUBTLE}; font-size: 0.9rem; margin-bottom: 4px; }}
.kpi-value {{ color: {TEXT}; font-size: 1.8rem; font-weight: 700; }}
.section-title {{ color: {TEXT}; font-weight: 700; font-size: 1.2rem; margin: 12px 0 8px 0; }}
hr {{ border: none; border-top: 1px solid #1f2937; margin: 8px 0 16px 0; }}
</style>
""", unsafe_allow_html=True)
# ---------- Utilidades ----------
def _norm_text(s: Optional[str]) -> str:
if s is None:
return ""
s = str(s).strip().lower()
s = "".join(c for c in unicodedata.normalize("NFD", s) if unicodedata.category(c) != "Mn")
s = re.sub(r"\s+", " ", s)
return s
@st.cache_data(show_spinner=False)
def load_csv(upload) -> pd.DataFrame:
"""
Lê CSV, normaliza cabeçalhos e garante colunas canônicas.
Garante 'escritorio_cat' (A) e tipagem básica.
"""
if upload is None:
return pd.DataFrame()
# 1) lê CSV
df = pd.read_csv(upload)
# 2) normaliza nomes de colunas (lower, sem acento, sem espaços extras)
original_cols = list(df.columns)
norm_cols = [_norm_text(c) for c in original_cols]
rename_map = dict(zip(original_cols, norm_cols))
df = df.rename(columns=rename_map)
# 3) mapeia cabeçalhos comuns -> canônicos
header_map = {
# processo
"numero do processo": "processo_numero",
"número do processo": "processo_numero",
"processo numero": "processo_numero",
"processo": "processo_numero",
"num do processo": "processo_numero",
"nº do processo": "processo_numero",
# cliente / parte contrária
"cliente": "cliente",
"parte contraria": "contrario",
"parte contrária": "contrario",
"contrario": "contrario",
# valores
"valor da causa": "valor_causa",
"valor causa": "valor_causa",
"valor_causa": "valor_causa",
# ação / natureza / área
"acao": "acao",
"ação": "acao",
"natureza": "natureza",
"area": "area",
"área": "area",
# órgãos / localização
"orgao": "orgao",
"órgão": "orgao",
"comarca": "comarca",
"tribunal": "tribunal",
"vara": "vara",
# situação / datas / posição do cliente
"situacao": "situacao",
"situação": "situacao",
"data de ajuizamento": "data_ajuizamento",
"data ajuizamento": "data_ajuizamento",
"posicao do cliente": "posicao_cliente",
"posição do cliente": "posicao_cliente",
# escritório
"escritorio responsavel": "escritorio_responsavel",
"escritório responsavel": "escritorio_responsavel",
"escritório responsável": "escritorio_responsavel",
"escritorio_responsavel": "escritorio_responsavel",
"escritorio cat": "escritorio_cat",
"categoria escritorio": "escritorio_cat",
"categoria do escritorio": "escritorio_cat",
# página
"pagina": "pagina_pdf",
"página": "pagina_pdf",
"pagina pdf": "pagina_pdf",
"página pdf": "pagina_pdf",
}
df = df.rename(columns={c: header_map[c] for c in list(df.columns) if c in header_map})
# 4) garante as colunas canônicas (cria se não existir)
cols = [
"processo_numero","cliente","contrario","valor_causa","acao","natureza","area","orgao",
"comarca","tribunal","vara","situacao","data_ajuizamento","posicao_cliente",
"escritorio_responsavel","escritorio_cat","pagina_pdf"
]
for c in cols:
if c not in df.columns:
df[c] = None
# 5) tipagem
df["valor_causa"] = pd.to_numeric(df["valor_causa"], errors="coerce")
df["data_ajuizamento"] = pd.to_datetime(df["data_ajuizamento"], errors="coerce")
# 6) A: garante categoria (se vazia, usa responsável)
df["escritorio_cat"] = (
df["escritorio_cat"]
.fillna(df["escritorio_responsavel"])
.astype(str)
.replace({"None": None})
)
# devolve apenas canônicas
return df[cols]
@st.cache_data(show_spinner=False)
def apply_mapping(df: pd.DataFrame, mapping_json: str) -> pd.DataFrame:
"""Aplica mapeamento JSON (responsável -> categoria)."""
if df.empty or not mapping_json:
return df
try:
raw = json.loads(mapping_json)
mapping = {_norm_text(k): v for k, v in raw.items()}
out = df.copy()
out["escritorio_cat"] = out["escritorio_responsavel"].apply(
lambda x: mapping.get(_norm_text(x), x)
)
return out
except Exception:
return df
@st.cache_data(show_spinner=False)
def filter_df(
df: pd.DataFrame,
cliente_q: str,
escritorio_cat: list[str],
tribunal: list[str],
natureza: list[str],
acao: list[str],
periodo: tuple[datetime, datetime] | None,
) -> pd.DataFrame:
if df.empty:
return df
out = df.copy()
if cliente_q:
out = out[out["cliente"].astype(str).str.contains(cliente_q, case=False, na=False)]
if escritorio_cat:
out = out[out["escritorio_cat"].isin(escritorio_cat)]
if tribunal:
out = out[out["tribunal"].isin(tribunal)]
if natureza:
out = out[out["natureza"].isin(natureza)]
if acao:
out = out[out["acao"].isin(acao)]
if periodo and not pd.isna(out["data_ajuizamento"]).all():
start, end = periodo
out = out[
(out["data_ajuizamento"] >= pd.to_datetime(start)) &
(out["data_ajuizamento"] <= pd.to_datetime(end))
]
return out
# ---------- Sidebar (upload + filtros) ----------
st.sidebar.title("⚙️ Filtros")
up = st.sidebar.file_uploader("CSV do Legal One", type=["csv"])
mapping_str = st.sidebar.text_area(
"Mapeamento (JSON opcional) — escritório bruto → categoria",
value='{"CÍVEL PARTIDO":"Cível – Partido","CÍVEL INDIVIDUAL":"Cível – Individual","CÍVEL RECUPERAÇÃO DE CRÉDITO":"Cível – Recuperação de Crédito"}',
height=120,
)
base = load_csv(up)
base = apply_mapping(base, mapping_str)
# B: reforço pós-mapeamento + extra para CSVs antigos
if "escritorio_cat" not in base.columns:
base["escritorio_cat"] = base.get("escritorio_responsavel", "")
base["escritorio_cat"] = base["escritorio_cat"].fillna(base.get("escritorio_responsavel", "")).astype(str)
for col in ["tribunal", "natureza", "acao", "data_ajuizamento", "valor_causa", "processo_numero", "cliente", "contrario"]:
if col not in base.columns:
base[col] = None
# C: guard-clause antes de montar filtros
if base.empty:
st.info("Faça upload do CSV para começar.")
st.stop()
# valores únicos para filtros
cats = sorted([c for c in base["escritorio_cat"].dropna().unique()])
tribs = sorted([t for t in base["tribunal"].dropna().unique()])
nats = sorted([n for n in base["natureza"].dropna().unique()])
actions = sorted([a for a in base["acao"].dropna().unique()])
st.sidebar.markdown("---")
cliente_q = st.sidebar.text_input("Cliente contém")
sel_cat = st.sidebar.multiselect("Categoria de Escritório", cats)
sel_trib = st.sidebar.multiselect("Tribunal", tribs)
sel_nat = st.sidebar.multiselect("Natureza", nats)
sel_acao = st.sidebar.multiselect("Ação", actions)
min_dt = pd.to_datetime(base["data_ajuizamento"]).min()
max_dt = pd.to_datetime(base["data_ajuizamento"]).max()
if pd.isna(min_dt) or pd.isna(max_dt):
period = None
else:
period = st.sidebar.date_input(
"Período (Data de ajuizamento)",
value=(min_dt.date(), max_dt.date())
)
f = filter_df(base, cliente_q, sel_cat, sel_trib, sel_nat, sel_acao, period)
# ---------- KPIs ----------
col1, col2, col3, col4 = st.columns(4)
with col1:
st.markdown(
f'<div class="kpi-card"><div class="kpi-label">Processos</div>'
f'<div class="kpi-value">{int(f["processo_numero"].nunique())}</div></div>',
unsafe_allow_html=True,
)
with col2:
st.markdown(
f'<div class="kpi-card"><div class="kpi-label">Clientes</div>'
f'<div class="kpi-value">{int(f["cliente"].nunique())}</div></div>',
unsafe_allow_html=True,
)
with col3:
st.markdown(
f'<div class="kpi-card"><div class="kpi-label">Categorias de Escritório</div>'
f'<div class="kpi-value">{int(f["escritorio_cat"].nunique())}</div></div>',
unsafe_allow_html=True,
)
with col4:
total_valor = float(f["valor_causa"].fillna(0).sum())
st.markdown(
f'<div class="kpi-card"><div class="kpi-label">Soma Valor da Causa</div>'
f'<div class="kpi-value">R$ {total_valor:,.2f}</div></div>',
unsafe_allow_html=True,
)
st.markdown("<div class='section-title'>Visão Geral</div>", unsafe_allow_html=True)
# ---------- Gráficos ----------
gcol1, gcol2 = st.columns(2)
with gcol1:
top_cat = (
f.groupby("escritorio_cat")["processo_numero"].nunique()
.sort_values(ascending=False).head(15).reset_index(name="qtd")
)
fig1 = px.bar(top_cat, x="escritorio_cat", y="qtd",
title="Processos por Categoria de Escritório (Top 15)")
fig1.update_layout(margin=dict(l=10, r=10, b=10, t=50))
st.plotly_chart(fig1, use_container_width=True)
with gcol2:
by_tri = (
f.groupby("tribunal")["processo_numero"].nunique()
.sort_values(ascending=False).reset_index(name="qtd")
)
fig2 = px.bar(by_tri, x="tribunal", y="qtd", title="Processos por Tribunal")
fig2.update_layout(margin=dict(l=10, r=10, b=10, t=50))
st.plotly_chart(fig2, use_container_width=True)
if not f["data_ajuizamento"].isna().all():
ts = f.dropna(subset=["data_ajuizamento"]).copy()
ts["mes"] = ts["data_ajuizamento"].dt.to_period("M").dt.to_timestamp()
serie = ts.groupby("mes")["processo_numero"].nunique().reset_index(name="qtd")
fig3 = px.line(serie, x="mes", y="qtd", markers=True,
title="Processos por mês (Data de ajuizamento)")
fig3.update_layout(margin=dict(l=10, r=10, b=10, t=50))
st.plotly_chart(fig3, use_container_width=True)
vc = f["valor_causa"].dropna()
if len(vc) > 0:
fig4 = px.histogram(f, x="valor_causa", nbins=30,
title="Distribuição do Valor da Causa")
fig4.update_layout(margin=dict(l=10, r=10, b=10, t=50))
st.plotly_chart(fig4, use_container_width=True)
st.markdown("<div class='section-title'>Tabela</div>", unsafe_allow_html=True)
# ---------- Tabela ----------
if AG_AVAILABLE:
gob = GridOptionsBuilder.from_dataframe(f)
gob.configure_pagination(paginationAutoPageSize=False, paginationPageSize=20)
gob.configure_side_bar()
gob.configure_default_column(filter=True, sortable=True, resizable=True)
gob.configure_selection("single")
grid_options = gob.build()
AgGrid(
f,
gridOptions=grid_options,
update_mode=GridUpdateMode.MODEL_CHANGED,
theme="alpine",
height=420,
fit_columns_on_grid_load=True,
)
else:
st.dataframe(f, use_container_width=True)
# ---------- Export ----------
st.markdown("---")
buff = io.StringIO()
f.to_csv(buff, index=False)
st.download_button(
label="⬇️ Baixar CSV filtrado",
data=buff.getvalue(),
file_name="processos_filtrado.csv",
mime="text/csv",
)
st.caption("Feito com Streamlit + Plotly. Visual inspirado no Power BI.")