GT-PGV / app.py
fschwartzer's picture
Update app.py
6fd6740 verified
import pandas as pd
import numpy as np
import gradio as gr
import os
import io
import tempfile
import pytz
from datetime import datetime
import plotly.express as px
import plotly.graph_objects as go
# Path to the CSV file
CSV_FILE_PATH = "./dados/input_app_20250731.csv"
MAX_YEARS = 15 # Máximo de anos para a regra de transição
MAX_BANDS = 8 # Máximo de faixas de alíquota permitidas
# --- NEW HELPER FUNCTION ---
def calculate_column_widths(
df: pd.DataFrame, min_width=75, char_multiplier=9, padding=2
):
"""
Calculates optimal column widths for a Gradio DataFrame to fit content.
"""
if df is None or df.empty:
return None
widths = []
for col in df.columns:
header_len = len(str(col))
if not df[col].empty:
max_len_data = df[col].astype(str).str.len().max()
if pd.notna(max_len_data):
max_len = max(header_len, int(max_len_data))
else:
max_len = header_len
else:
max_len = header_len
calculated_width = (max_len + padding) * char_multiplier
widths.append(max(min_width, calculated_width))
return widths
# --- Otimização 1: Cache de Dados ---
def load_and_prepare_data():
"""
Carrega o CSV e pré-calcula todos os filtros booleanos que não dependem
dos inputs do usuário. Esta função deve ser chamada apenas uma vez.
"""
if not os.path.exists(CSV_FILE_PATH):
raise FileNotFoundError(f"Arquivo de dados não encontrado em: {CSV_FILE_PATH}")
df = pd.read_csv(CSV_FILE_PATH, low_memory=False)
df.columns = df.columns.str.strip()
required_cols = [
"TIPO_LANCAMENTO",
"DES_TIPO_ISENCAO_MULTI",
"VLR_IMPOSTO_iptu",
"VLR_VENAL_IMOVEL_TRIB",
"DES_FINALIDADE_unidade",
"DES_USO_unidade",
"IDF_TIPO_BENEFICIO",
"NUM_DIVISAO_FISCAL_iptu",
"VALOR_VENAL",
"VLR_IMPOSTO_CALCULADO_iptu",
"SITUACAO_ALAGAMENTO",
"PERC_ATING",
"ANO_FIM",
"ANO_INICIO",
]
missing_cols = [col for col in required_cols if col not in df.columns]
if missing_cols:
raise ValueError(f"Colunas ausentes no CSV: {', '.join(missing_cols)}")
for col_str in [
"TIPO_LANCAMENTO",
"DES_TIPO_ISENCAO_MULTI",
"DES_FINALIDADE_unidade",
"DES_USO_unidade",
"SITUACAO_ALAGAMENTO",
]:
if col_str in df.columns:
df[col_str] = df[col_str].astype(str).fillna("")
for col_num in [
"PERC_ATING",
"VALOR_VENAL",
"VLR_VENAL_IMOVEL_TRIB",
"VLR_IMPOSTO_iptu",
"VLR_IMPOSTO_CALCULADO_iptu",
"ANO_FIM",
"ANO_INICIO",
"IDF_TIPO_BENEFICIO",
"NUM_DIVISAO_FISCAL_iptu",
]:
df[col_num] = pd.to_numeric(df[col_num], errors="coerce").fillna(0)
df.rename(columns={"VALOR_VENAL": "valor_venal_calculo"}, inplace=True)
filters = {}
finalidades_estacionamentos = [
"ESPACO DE ESTACIONAMENTO RESIDENCIAL",
"ESPACO DE ESTACIONAMENTO NAO RESIDENCIAL",
"ESPACO ESTACIONAMENTO VINCULADO NAO RESID DESCOBERTO",
"ESPACO ESTACIONAMENTO VINCULADO RESID COBERTO",
"ESPACO ESTACIONAMENTO VINCULADO NAO RESID COBERTO",
"ESPACO DE ESTACIONAMENTO RESIDENCIAL DESCOBERTO",
"ESPACO DE ESTACIONAMENTO NAO RESIDENC DESCOBERTO",
"ESPACO ESTACIONAMENTO VINCULADO RESID DESCOBERTO",
]
finalidades_terrenos = [
"TERRENO",
"GLEBA",
"TERRENOS CONDOMINIO HORIZ ABERTO SEM AREA USO COMUM",
"TERRENO EM CONDOMINIO HORIZONTAL FECHADO",
"TERRENO EM CONDOMINIO HORIZONTAL ABERTO",
"SOBRA DE AREA",
"AREA A VISTORIAR",
]
sem_lancamento = (
df["TIPO_LANCAMENTO"].isin(
[
"Isenção Total por Benefício Fiscal",
"Imóvel com bloqueio de lançamento de IPTU/TCL",
"Não lançado - Valor total calculado menor que o mínimo",
"Isenção TCL Box e Não Lançado - valor menor que Mínimo",
"Isenção Técnica",
]
)
) | (
(df["TIPO_LANCAMENTO"] == "Isenção TCL - Box")
& (df["VLR_IMPOSTO_iptu"] == 0)
& (df["ANO_FIM"] >= 2026)
)
isentos = df["DES_TIPO_ISENCAO_MULTI"].str.contains("isen", case=False, na=False)
imunes = (
df["DES_TIPO_ISENCAO_MULTI"].str.contains("imun", case=False, na=False)
) | (df["DES_TIPO_ISENCAO_MULTI"].str.contains("IMUNIDADES", na=False))
nao_incidencia = df["DES_TIPO_ISENCAO_MULTI"].str.contains(
"NÃO INCIDÊNCIA DE IPTU", case=False, na=False
)
filters["nao_tributados"] = (
(isentos) | (imunes) | (sem_lancamento) | (nao_incidencia)
)
filters["isen_prop_loc"] = (
(
df["DES_TIPO_ISENCAO_MULTI"]
== "ISENÇÃO - PROPRIETARIO/USUFRUTUARIO APOSENTADO, INATIVO, PENSIONISTA"
)
| (
df["DES_TIPO_ISENCAO_MULTI"]
== "ISENÇÃO - LOCATÁRIO/COMODATARIO/ARRENDATÁRIO APOSENTADO, INATIVO, PENSIONISTA"
)
| (
df["DES_TIPO_ISENCAO_MULTI"]
== "ISENÇÃO - PROPRIETARIO/USUFRUTUARIO DEFICIENTE"
)
)
filters["isen_hab_pop"] = (
df["DES_TIPO_ISENCAO_MULTI"]
== "ISENÇÃO - HABITAÇÕES POPULARES DE EMPREENDIMENTOS HABITACIONAIS DESTINADOS PARA HABITAÇÃO DE INTERESSE SOCIAL"
) | (
df["DES_TIPO_ISENCAO_MULTI"]
== "ISENÇÃO - HABITAÇÕES POPULARES ORIUNDAS DE REGULARIZAÇÕES FUNDIÁRIAS PROMOVIDAS POR ÓRGÃOS PÚBLICOS"
)
estacionamentos_residenciais = (
df["DES_FINALIDADE_unidade"].isin(finalidades_estacionamentos)
) & (df["DES_USO_unidade"] == "Exclusivamente Residencial")
filters["estacionamentos_residenciais"] = estacionamentos_residenciais
filters["estacionamentos_residenciais_tributados"] = (
estacionamentos_residenciais & (~filters["nao_tributados"])
)
filters["estacionamentos_residenciais_nao_tributados"] = (
estacionamentos_residenciais & (filters["nao_tributados"])
)
estacionamentos_nao_residenciais = (
df["DES_FINALIDADE_unidade"].isin(finalidades_estacionamentos)
) & (df["DES_USO_unidade"] != "Exclusivamente Residencial")
filters["estacionamentos_nao_residenciais"] = estacionamentos_nao_residenciais
filters["estacionamentos_nao_residenciais_tributados"] = (
estacionamentos_nao_residenciais & (~filters["nao_tributados"])
)
filters["estacionamentos_nao_residenciais_nao_tributados"] = (
estacionamentos_nao_residenciais & (filters["nao_tributados"])
)
estacionamentos_geral = df["DES_FINALIDADE_unidade"].isin(
finalidades_estacionamentos
)
filters["estacionamentos_geral"] = estacionamentos_geral
filters["estacionamentos_geral_tributados"] = estacionamentos_geral & (
~filters["nao_tributados"]
)
filters["estacionamentos_geral_nao_tributados"] = (
estacionamentos_geral & (filters["nao_tributados"])
)
regra_tae_base = (
(df["DES_FINALIDADE_unidade"].isin(finalidades_terrenos))
& (df["IDF_TIPO_BENEFICIO"] == 301)
& ((df["ANO_INICIO"] >= 2023) | (df["ANO_FIM"] >= 2026) | (df["ANO_FIM"] == 0))
)
filters["terrenos_aliqesp"] = regra_tae_base & (~filters["nao_tributados"])
regra_taf_base = (df["DES_FINALIDADE_unidade"].isin(finalidades_terrenos)) & (
df["IDF_TIPO_BENEFICIO"] == 95
)
filters["terrenos_aliqfix"] = regra_taf_base & (~filters["nao_tributados"])
terrenos_1df_base = (
(df["DES_FINALIDADE_unidade"].isin(finalidades_terrenos))
& (df["NUM_DIVISAO_FISCAL_iptu"] == 1)
& (~regra_tae_base)
& (~regra_taf_base)
)
filters["terrenos_1df_base"] = terrenos_1df_base
filters["terrenos_1df_tributados"] = terrenos_1df_base & (
~filters["nao_tributados"]
)
filters["terrenos_1df_nao_tributados"] = (
terrenos_1df_base & (filters["nao_tributados"])
)
terrenos_2df_base = (
(df["DES_FINALIDADE_unidade"].isin(finalidades_terrenos))
& (df["NUM_DIVISAO_FISCAL_iptu"] == 2)
& (~regra_tae_base)
& (~regra_taf_base)
)
filters["terrenos_2df_base"] = terrenos_2df_base
filters["terrenos_2df_tributados"] = terrenos_2df_base & (
~filters["nao_tributados"]
)
filters["terrenos_2df_nao_tributados"] = (
terrenos_2df_base & (filters["nao_tributados"])
)
terrenos_3df_base = (
(df["DES_FINALIDADE_unidade"].isin(finalidades_terrenos))
& (df["NUM_DIVISAO_FISCAL_iptu"] == 3)
& (~regra_tae_base)
& (~regra_taf_base)
)
filters["terrenos_3df_base"] = terrenos_3df_base
filters["terrenos_3df_tributados"] = terrenos_3df_base & (
~filters["nao_tributados"]
)
filters["terrenos_3df_nao_tributados"] = (
terrenos_3df_base & (filters["nao_tributados"])
)
prediais_base = ~df["DES_FINALIDADE_unidade"].isin(
finalidades_estacionamentos + finalidades_terrenos
)
filters["prediais_base"] = prediais_base
prediais_residenciais = prediais_base & (
df["DES_USO_unidade"] == "Exclusivamente Residencial"
)
filters["prediais_residenciais"] = prediais_residenciais
filters["prediais_residenciais_tributados"] = (
prediais_residenciais
& (~filters["nao_tributados"])
& (~filters["isen_prop_loc"])
& (~filters["isen_hab_pop"])
)
filters["prediais_residenciais_nao_tributados"] = (
prediais_residenciais
& (filters["nao_tributados"])
& (~filters["isen_prop_loc"])
& (~filters["isen_hab_pop"])
)
prediais_nao_residenciais = (
prediais_base
& (df["DES_USO_unidade"] != "Exclusivamente Residencial")
& (~filters["isen_prop_loc"])
& (~filters["isen_hab_pop"])
)
filters["prediais_nao_residenciais"] = prediais_nao_residenciais
filters["prediais_nao_residenciais_tributados"] = prediais_nao_residenciais & (
~filters["nao_tributados"]
)
filters["prediais_nao_residenciais_nao_tributados"] = (
prediais_nao_residenciais & (filters["nao_tributados"])
)
prediais_geral = (
prediais_base & (~filters["isen_prop_loc"]) & (~filters["isen_hab_pop"])
)
filters["prediais_geral"] = prediais_geral
filters["prediais_geral_tributados"] = prediais_geral & (~filters["nao_tributados"])
filters["prediais_geral_nao_tributados"] = (
prediais_geral & (filters["nao_tributados"])
)
regra_tae_base = (
(df["DES_FINALIDADE_unidade"].isin(finalidades_terrenos))
& (df["IDF_TIPO_BENEFICIO"] == 301)
& ((df["ANO_INICIO"] >= 2023) | (df["ANO_FIM"] >= 2026) | (df["ANO_FIM"] == 0))
)
filters["regra_tae_base"] = regra_tae_base
filters["terrenos_aliqesp"] = regra_tae_base & (~filters["nao_tributados"])
regra_taf_base = (df["DES_FINALIDADE_unidade"].isin(finalidades_terrenos)) & (
df["IDF_TIPO_BENEFICIO"] == 95
)
filters["regra_taf_base"] = regra_taf_base
filters["terrenos_aliqfix"] = regra_taf_base & (~filters["nao_tributados"])
return df, filters
# --- NOVA FUNÇÃO: Carregar Cenário do Excel ---
def load_scenario_from_excel(file_obj):
if file_obj is None:
num_outputs = 1 + 5 + 2 + 4 + (1 + (MAX_BANDS - 1) * 2 + 1) * 9 + 2
return [gr.update()] * num_outputs
try:
params_df = pd.read_excel(file_obj.name, sheet_name="Parâmetros Utilizados")
params_df = params_df.set_index(params_df.columns[0]).astype(str)
def get_param(key, default, type_func):
try:
value = params_df.loc[key, "Valor"]
if value == "nan" or pd.isna(value):
return default
return type_func(value)
except (KeyError, ValueError):
return default
def get_percent_param(key, default):
try:
value = params_df.loc[key, "Valor"]
if value == "nan" or pd.isna(value):
return default
# Remove o símbolo % se presente e converte para float
if isinstance(value, str) and value.endswith('%'):
return float(value[:-1])
return float(value)
except (KeyError, ValueError):
return default
ufm_val = get_param("Valor da UFM", 5.771, float)
diff_est = get_param("Diferenciar Estacionamentos por Uso", "Não", str) == "Sim"
diff_pred = get_param("Diferenciar Prédios por Uso", "Não", str) == "Sim"
is_segregated = (
"REGRAS DE TRANSIÇÃO (CONFIGURAÇÃO POR CATEGORIA)" in params_df.index
)
segregar_trava = is_segregated
trava_perc_str = get_param(
"Ativar trava SE aumento for >=", "0.0%", str
).replace("%", "")
trava_perc = float(trava_perc_str) if trava_perc_str else 0.0
desconto_26 = get_percent_param(
"Desconto na Alíquota Efetiva (Enchente - Predial) 2026", 0.0
)
desconto_27 = get_percent_param(
"Desconto na Alíquota Efetiva (Enchente - Predial) 2027", 0.0
)
desconto_28 = get_percent_param(
"Desconto na Alíquota Efetiva (Enchente - Predial) 2028", 0.0
)
desconto_29 = get_percent_param(
"Desconto na Alíquota Efetiva (Enchente - Predial) 2029", 0.0
)
universal_rules = []
default_categories = [
"Predial Geral",
"Predial Residencial",
"Predial Não Residencial",
"Estacionamentos Geral",
"Estacionamentos Residenciais",
"Estacionamentos Não Residenciais",
"Terrenos 1ª DF",
"Terrenos 2ª DF",
"Terrenos 3ª DF",
"Terrenos Alíquota Especial",
"Terrenos Alíquota Fixa",
]
category_rules = {cat: [] for cat in default_categories}
if is_segregated:
current_category = None
for index, row in params_df.iterrows():
if str(index).startswith("Categoria:"):
current_category = index.replace("Categoria: ", "").strip()
elif str(index).startswith("Ano ") and current_category:
year = int(index.replace("Ano ", ""))
factor_str = row["Valor"].replace("%", "")
factor = float(factor_str) / 100.0 if factor_str else 1.0
if current_category in category_rules:
category_rules[current_category].append(
{"year": year, "factor": factor}
)
elif pd.isna(index):
current_category = None
else:
if "Ano" in params_df.index:
start_index_series = params_df.index.get_loc("Ano")
start_index = (
start_index_series
if isinstance(start_index_series, int)
else start_index_series[0]
)
start_index += 1
for i in range(start_index, len(params_df)):
idx_label = params_df.index[i]
if (
pd.isna(idx_label)
or not str(idx_label).replace(".0", "").isdigit()
):
break
year = int(float(idx_label))
factor_str = params_df.iloc[i]["Valor"].replace("%", "")
factor = float(factor_str) / 100.0 if factor_str else 1.0
universal_rules.append({"year": year, "factor": factor})
rate_inputs = []
prefixes = [
("PG", "PREDIAL GERAL (PG)"),
("PR", "PREDIAL RESIDENCIAL (PR)"),
("PNR", "PREDIAL NÃO RESIDENCIAL (PNR)"),
("EG", "ESTACIONAMENTOS GERAL (EG)"),
("ER", "ESTACIONAMENTOS RESIDENCIAIS (ER)"),
("ENR", "ESTACIONAMENTOS NÃO RESIDENCIAIS (ENR)"),
("T1DF", "TERRENOS 1ª DF (T1DF)"),
("T2DF", "TERRENOS 2ª DF (T2DF)"),
("T3DF", "TERRENOS 3ª DF (T3DF)"),
]
for p_code, p_excel in prefixes:
rate_inputs.append(
get_param(
f"{p_code} Número de Faixas",
MAX_BANDS if "T" not in p_code else 2,
int,
)
)
for i in range(1, MAX_BANDS):
rate_inputs.append(get_param(f"{p_code} Limite {i} (UFM)", 0, float))
rate_inputs.append(get_param(f"{p_code} Alíquota {i}", 0, float))
rate_inputs.append(get_param(f"{p_code} Alíquota {MAX_BANDS}", 0, float))
rate_inputs.append(get_param("TAE Alíquota 1", 0, float))
rate_inputs.append(get_param("TAF Alíquota 1", 0, float))
status_msg = f"Cenário carregado com sucesso do arquivo: {os.path.basename(file_obj.name)}"
return (
status_msg,
ufm_val,
diff_est,
diff_pred,
segregar_trava,
trava_perc,
universal_rules,
category_rules,
desconto_26,
desconto_27,
desconto_28,
desconto_29,
) + tuple(rate_inputs)
except Exception as e:
error_msg = f"Erro ao processar o arquivo: {e}. Verifique se o arquivo é válido e foi gerado por este aplicativo."
num_outputs = 1 + 5 + 2 + 4 + (1 + (MAX_BANDS - 1) * 2 + 1) * 9 + 2
return tuple([error_msg] + [gr.update()] * (num_outputs - 1))
# --- Chame a função UMA VEZ no escopo global ---
try:
THE_MERGE_GLOBAL, FILTERS_GLOBAL = load_and_prepare_data()
print("Dados carregados e pré-processados com sucesso.")
except (FileNotFoundError, ValueError) as e:
print(f"ERRO CRÍTICO AO INICIAR O APLICATIVO: {e}")
THE_MERGE_GLOBAL, FILTERS_GLOBAL = None, None
# --- Funções auxiliares de UI (MODIFICADAS) ---
def create_configurable_band_ui(prefix, default_num_bands, defaults_lim, defaults_aliq):
components_to_return = []
dropdown = gr.Dropdown(
choices=list(range(2, MAX_BANDS + 1)),
value=default_num_bands,
label="Número de Faixas",
)
components_to_return.append(dropdown)
rows = []
end_inputs_num = []
end_inputs_inf = []
end_inputs_interactive = []
for i in range(MAX_BANDS):
lim_val = defaults_lim[i] if i < len(defaults_lim) else 0
aliq_val = defaults_aliq[i] if i < len(defaults_aliq) else 0
prev_lim_val = (
defaults_lim[i - 1] if i > 0 and (i - 1) < len(defaults_lim) else 0
)
with gr.Row(
equal_height=True, variant="compact", visible=(i < default_num_bands)
) as r:
start = gr.Number(
label="Início do Intervalo",
value=0 if i == 0 else prev_lim_val,
interactive=False,
scale=1,
)
with gr.Column(scale=1, min_width=50):
is_last_visible_at_start = i == default_num_bands - 1
end_num = gr.Number(
label="Fim do Intervalo (UFM)",
value=lim_val,
visible=not is_last_visible_at_start,
scale=1,
)
end_inf = gr.Textbox(
value="Infinito",
label="Fim do Intervalo",
interactive=False,
visible=is_last_visible_at_start,
scale=1,
)
rate = gr.Number(label=f"Alíquota {i + 1}", value=aliq_val, scale=1)
rows.append(r)
end_inputs_num.append(end_num)
end_inputs_inf.append(end_inf)
end_inputs_interactive.append(end_num)
if i < MAX_BANDS - 1:
components_to_return.extend([end_num, rate])
else:
components_to_return.append(rate)
if i > 0:
end_inputs_interactive[i - 1].change(
lambda x: x,
inputs=end_inputs_interactive[i - 1],
outputs=start,
queue=False,
)
def update_visibility_and_swap(num_bands_selected):
row_updates = []
end_num_updates = []
end_inf_updates = []
for i in range(MAX_BANDS):
is_visible = i < num_bands_selected
is_last_visible = i == num_bands_selected - 1
row_updates.append(gr.update(visible=is_visible))
end_num_updates.append(
gr.update(visible=is_visible and not is_last_visible)
)
end_inf_updates.append(gr.update(visible=is_visible and is_last_visible))
return row_updates + end_num_updates + end_inf_updates
dropdown.change(
fn=update_visibility_and_swap,
inputs=dropdown,
outputs=rows + end_inputs_num + end_inputs_inf,
queue=False,
)
return components_to_return
# --- Função para criar a aba de parâmetros para o Excel (CORRIGIDA) ---
def create_parameters_sheet(
ufm_value,
diferencia_estacionamentos_flag,
diferencia_predios_flag,
trava_aumento_perc,
segregar_trava_flag,
universal_rules,
category_rules,
descontos_anuais,
all_parameters,
):
param_data = []
param_data.append(["CONFIGURAÇÕES GERAIS", "", ""])
param_data.append(
[
"Valor Venal Utilizado",
'Coluna "VALOR_VENAL"',
"Valor fixo do arquivo de entrada",
]
)
param_data.append(["Valor da UFM", ufm_value, ""])
param_data.append(
[
"Diferenciar Estacionamentos por Uso",
"Sim" if diferencia_estacionamentos_flag else "Não",
"",
]
)
param_data.append(
["Diferenciar Prédios por Uso", "Sim" if diferencia_predios_flag else "Não", ""]
)
param_data.append(["", "", ""])
param_data.append(["REGRAS DE TRANSIÇÃO (TRAVA DE AUMENTO)", "", ""])
param_data.append(
["Ativar trava SE aumento for >=", f"{trava_aumento_perc:.2f}%", ""]
)
if segregar_trava_flag:
param_data.append(["Configuração da Trava", "Por Categoria", ""])
param_data.append(["", "", ""])
sorted_categories = sorted(category_rules.keys())
for category in sorted_categories:
rules = category_rules.get(category, [])
if rules:
param_data.append(
[
f"Categoria: {category}",
"Fator Aplicado sobre o Aumento",
"Fórmula: IPTU_Atual + Fator * (IPTU_Projetado - IPTU_Atual)",
]
)
for rule in rules:
param_data.append(
[f"Ano {rule['year']}", f"{rule['factor'] * 100:.1f}%", ""]
)
param_data.append(["", "", ""])
else:
param_data.append(["Configuração da Trava", "Universal", ""])
param_data.append(
[
"Ano",
"Fator Aplicado sobre o Aumento",
"Fórmula: IPTU_Atual + Fator * (IPTU_Projetado - IPTU_Atual)",
]
)
for rule in universal_rules:
param_data.append([rule["year"], f"{rule['factor'] * 100:.1f}%", ""])
param_data.append(["", "", ""])
param_data.append(
["DESCONTO PARA ATINGIDOS PELA ENCHENTE (APENAS PREDIAL)", "", ""]
)
param_data.append(
[
"Descrição",
"Desconto Percentual na Alíquota Efetiva",
"O desconto é aplicado sobre o IPTU calculado e então a trava é aplicada.",
]
)
for year, discount in descontos_anuais.items():
param_data.append(
[
f"Desconto na Alíquota Efetiva (Enchente - Predial) {year}",
f"{discount:.2f}%",
"",
]
)
param_data.append(["", "", ""])
param_map = [
("PG", "PREDIAL GERAL (PG)"),
("PR", "PREDIAL RESIDENCIAL (PR)"),
("PNR", "PREDIAL NÃO RESIDENCIAL (PNR)"),
("EG", "ESTACIONAMENTOS GERAL (EG)"),
("ER", "ESTACIONAMENTOS RESIDENCIAIS (ER)"),
("ENR", "ESTACIONAMENTOS NÃO RESIDENCIAIS (ENR)"),
("T1DF", "TERRENOS 1ª DF (T1DF)"),
("T2DF", "TERRENOS 2ª DF (T2DF)"),
("T3DF", "TERRENOS 3ª DF (T3DF)"),
]
num_inputs_per_cat = 1 + (MAX_BANDS - 1) * 2 + 1
input_ptr = 0
for p_code, p_excel in param_map:
param_data.append([p_excel, "", ""])
cat_inputs = all_parameters[input_ptr : input_ptr + num_inputs_per_cat]
if len(cat_inputs) < num_inputs_per_cat:
while len(cat_inputs) < num_inputs_per_cat:
cat_inputs.append(0)
num_bands = cat_inputs[0] if cat_inputs[0] is not None else 2
rate_inputs = cat_inputs[1:]
param_data.append([f"{p_code} Número de Faixas", num_bands, ""])
# Processar limites e alíquotas das faixas intermediárias (1 a num_bands-1)
for i in range(num_bands - 1):
limite_idx = i * 2
aliquota_idx = i * 2 + 1
if limite_idx < len(rate_inputs) and aliquota_idx < len(rate_inputs):
limite = rate_inputs[limite_idx] if rate_inputs[limite_idx] is not None else 0
aliquota = rate_inputs[aliquota_idx] if rate_inputs[aliquota_idx] is not None else 0
else:
limite, aliquota = 0, 0
param_data.append([f"{p_code} Limite {i + 1} (UFM)", limite, ""])
param_data.append(
[
f"{p_code} Alíquota {i + 1}",
aliquota,
f"{aliquota * 100:.3f}%" if aliquota is not None and aliquota != 0 else "0.000%",
]
)
# --- SEÇÃO CORRIGIDA ---
# Processar a alíquota final (da última faixa).
# A lógica agora calcula o índice correto da alíquota final com base no número de faixas,
# em vez de sempre pegar o último elemento da lista de inputs.
# O índice da alíquota da última faixa (N) é (N-1)*2 + 1 nos rate_inputs.
last_aliquot_index = (num_bands - 1) * 2 + 1
# Tratamento especial se for o número máximo de faixas, pois a UI a estrutura de forma diferente.
if num_bands == MAX_BANDS:
last_aliquot_index = -1 # Pega o último elemento da lista
# Garante que o índice é válido e obtém o valor da alíquota final.
if last_aliquot_index < len(rate_inputs):
aliquota_final = rate_inputs[last_aliquot_index] if rate_inputs[last_aliquot_index] is not None else 0
else:
# Fallback caso a lista de inputs seja inesperadamente curta.
aliquota_final = rate_inputs[-1] if rate_inputs else 0
param_data.append(
[
f"{p_code} Alíquota {num_bands}",
aliquota_final,
f"{aliquota_final * 100:.3f}%" if aliquota_final is not None and aliquota_final != 0 else "0.000%",
]
)
# --- FIM DA SEÇÃO CORRIGIDA ---
param_data.append(["", "", ""])
input_ptr += num_inputs_per_cat
# Processar TAE e TAF
if input_ptr < len(all_parameters):
tae_aliq = all_parameters[input_ptr] if all_parameters[input_ptr] is not None else 0
else:
tae_aliq = 0
if input_ptr + 1 < len(all_parameters):
taf_aliq = all_parameters[input_ptr + 1] if all_parameters[input_ptr + 1] is not None else 0
else:
taf_aliq = 0
param_data.extend(
[
["TERRENOS ALÍQUOTA ESPECIAL (TAE)", "", ""],
[
"TAE Alíquota 1",
tae_aliq,
f"{tae_aliq * 100:.3f}%" if tae_aliq is not None and tae_aliq != 0 else "0.000%",
],
["", "", ""],
["TERRENOS ALÍQUOTA FIXA (TAF)", "", ""],
[
"TAF Alíquota 1",
taf_aliq,
f"{taf_aliq * 100:.3f}%" if taf_aliq is not None and taf_aliq != 0 else "0.000%",
],
]
)
df = pd.DataFrame(param_data, columns=["Parâmetro", "Valor", "Observação"])
return df
# --- Funções de formatação e criação de Excel (REVISADA E AMPLIADA) ---
def format_worksheet(worksheet, dataframe, currency_columns=None, percent_columns=None):
from openpyxl.styles import NamedStyle, Font, PatternFill, Alignment
from openpyxl.utils import get_column_letter
if currency_columns is None:
currency_columns = []
if percent_columns is None:
percent_columns = []
header_style = NamedStyle(
name="header",
font=Font(bold=True, color="FFFFFF"),
fill=PatternFill(start_color="366092", end_color="366092", fill_type="solid"),
alignment=Alignment(horizontal="center", vertical="center"),
)
# Apply header style and adjust column width
for col_num, column_title in enumerate(dataframe.columns, 1):
cell = worksheet.cell(row=1, column=col_num)
cell.style = header_style
# Auto-adjust column width
max_length = len(str(column_title))
for cell_in_col in worksheet[get_column_letter(col_num)]:
if cell_in_col.value:
max_length = max(max_length, len(str(cell_in_col.value)))
adjusted_width = max_length + 2
worksheet.column_dimensions[get_column_letter(col_num)].width = adjusted_width
# Apply number formatting
for col_name in currency_columns:
if col_name in dataframe.columns:
col_idx = dataframe.columns.get_loc(col_name) + 1
for row_num in range(2, len(dataframe) + 2):
cell = worksheet.cell(row=row_num, column=col_idx)
cell.number_format = 'R$ #,##0.00'
for col_name in percent_columns:
if col_name in dataframe.columns:
col_idx_letter = get_column_letter(dataframe.columns.get_loc(col_name) + 1)
for cell in worksheet[col_idx_letter]:
if cell.row > 1:
cell.number_format = '0.00%'
return worksheet
# --- Funções de formatação e criação de Excel (CORRIGIDA) ---
def create_excel_download(
df_projection_raw,
total_novo_str,
total_atual_trib_str,
total_atual_calc_str,
ufm_value,
diferencia_estacionamentos_flag,
diferencia_predios_flag,
trava_aumento_perc,
segregar_trava_flag,
universal_rules,
category_rules,
descontos_anuais,
all_parameters,
yearly_aggregates_raw,
detailed_by_category_raw,
detailed_by_value_ranges_raw,
df_enchente_analysis_raw,
variation_analysis_raw,
):
from openpyxl.utils import get_column_letter
output = io.BytesIO()
with pd.ExcelWriter(output, engine="openpyxl") as writer:
def sanitize_sheet_name(name):
return name.replace("/", "-").replace("\\", "-")[:31]
# 1. Parâmetros Utilizados (sem alteração)
params_data = create_parameters_sheet(
ufm_value, diferencia_estacionamentos_flag, diferencia_predios_flag,
trava_aumento_perc, segregar_trava_flag, universal_rules,
category_rules, descontos_anuais, all_parameters,
)
params_data.to_excel(writer, sheet_name="Parâmetros Utilizados", index=False)
ws_params = writer.sheets["Parâmetros Utilizados"]
format_worksheet(ws_params, params_data)
# 2. Totais Gerais (sem alteração)
if total_novo_str:
summary_data = {
"Descrição": ["Total IPTU Novo (Simulado Puro)", "Total IPTU Atual Tributado", "Total IPTU Atual Calculado"],
"Valor": [
float(total_novo_str.split(": ")[1].replace(",", "")),
float(total_atual_trib_str.split(": ")[1].replace(",", "")),
float(total_atual_calc_str.split(": ")[1].replace(",", "")),
],
}
summary_df = pd.DataFrame(summary_data)
summary_df.to_excel(writer, sheet_name="Totais Gerais", index=False)
ws_total = writer.sheets["Totais Gerais"]
format_worksheet(ws_total, summary_df, currency_columns=["Valor"])
# 3. Projeção com Trava (CORRIGIDO)
if df_projection_raw is not None and not df_projection_raw.empty:
df_export = df_projection_raw.copy()
currency_cols = ["Valor Total Arrecadado", "Crescimento Absoluto"]
percent_cols = ["Crescimento Relativo (%)"]
for col in percent_cols:
# CORREÇÃO: Converte para numérico antes de dividir
df_export[col] = pd.to_numeric(df_export[col], errors='coerce') / 100.0
df_export.to_excel(writer, sheet_name="Projeção com Trava", index=False)
ws = writer.sheets["Projeção com Trava"]
format_worksheet(ws, df_export, currency_columns=currency_cols, percent_columns=percent_cols)
# 4. Análise Enchente (CORRIGIDO)
if df_enchente_analysis_raw is not None and not df_enchente_analysis_raw.empty:
df_export = df_enchente_analysis_raw.copy()
currency_cols = ["IPTU Atual Atingidos (2025)", "Simulado Puro (Y_base)", "Simulado com Desconto (sem trava)", "IPTU Final Projetado", "Diferença Puro vs Desconto (R$)", "Redução Total (R$)"]
percent_cols = ["Diferença Puro vs Desconto (%)", "Redução Total (%)"]
for col in percent_cols:
# CORREÇÃO: Converte para numérico antes de dividir
df_export[col] = pd.to_numeric(df_export[col], errors='coerce') / 100.0
df_export.to_excel(writer, sheet_name="Análise Enchente", index=False)
ws = writer.sheets["Análise Enchente"]
format_worksheet(ws, df_export, currency_columns=currency_cols, percent_columns=percent_cols)
# 5. Resumos Agregados por Ano (CORRIGIDO)
if yearly_aggregates_raw:
for year, df_agg in yearly_aggregates_raw.items():
if df_agg is not None and not df_agg.empty:
df_export = df_agg.copy()
year_str = str(year).replace(" ", "_")
currency_cols = [col for col in df_export.columns if "Total" in col or "Diferença Absoluta" in col]
percent_cols = [col for col in df_export.columns if "%" in col]
for col in percent_cols:
# CORREÇÃO: Converte para numérico antes de dividir
df_export[col] = pd.to_numeric(df_export[col], errors='coerce') / 100.0
sheet_name = sanitize_sheet_name(f"Resumo Agregado {year_str}")
df_export.to_excel(writer, sheet_name=sheet_name, index=False)
ws = writer.sheets[sheet_name]
format_worksheet(ws, df_export, currency_columns=currency_cols, percent_columns=percent_cols)
# 6. Detalhado por Faixa de Alíquota (já estava correto)
if detailed_by_category_raw:
for category_name, df_category in detailed_by_category_raw.items():
if df_category is not None and not df_category.empty:
df_export = df_category.copy()
currency_cols = [col for col in df_export.columns if "Total " in col]
percent_cols = [col for col in df_export.columns if "%" in col]
for col in percent_cols:
df_export[col] = pd.to_numeric(df_export[col].astype(str).str.replace("%", ""), errors="coerce") / 100
sheet_name = sanitize_sheet_name(f"Faixas Alíq - {category_name}")
df_export.to_excel(writer, sheet_name=sheet_name, index=False)
ws = writer.sheets[sheet_name]
format_worksheet(ws, df_export, currency_columns=currency_cols, percent_columns=percent_cols)
# 7. Detalhado por Valor Venal (já estava correto)
if detailed_by_value_ranges_raw:
for category_name, df_category in detailed_by_value_ranges_raw.items():
if df_category is not None and not df_category.empty:
df_export = df_category.copy()
currency_cols = [col for col in df_export.columns if "Total " in col]
percent_cols = [col for col in df_export.columns if "%" in col]
for col in percent_cols:
df_export[col] = pd.to_numeric(df_export[col].astype(str).str.replace("%", ""), errors="coerce") / 100
sheet_name = sanitize_sheet_name(f"Faixas VV - {category_name}")
df_export.to_excel(writer, sheet_name=sheet_name, index=False)
ws = writer.sheets[sheet_name]
format_worksheet(ws, df_export, currency_columns=currency_cols, percent_columns=percent_cols)
# 8. Análise de Variação (já estava correto)
if variation_analysis_raw:
for category_name, df_category in variation_analysis_raw.items():
if df_category is not None and not df_category.empty:
df_export = df_category.copy()
currency_cols = [col for col in df_export.columns if "Total" in col or "Diferença" in col]
percent_cols = [col for col in df_export.columns if "%" in col]
for col in percent_cols:
df_export[col] = pd.to_numeric(df_export[col].astype(str).str.replace("%", ""), errors="coerce") / 100
sheet_name = sanitize_sheet_name(f"Variação - {category_name}")
df_export.to_excel(writer, sheet_name=sheet_name, index=False)
ws = writer.sheets[sheet_name]
format_worksheet(ws, df_export, currency_columns=currency_cols, percent_columns=percent_cols)
output.seek(0)
return output
# --- Otimização 2: Cálculo de Imposto Vetorizado ---
def calculate_progressive_tax_vectorized(df_input, vv_column_name, iptu_rules):
df = df_input.copy()
total_tax_col = pd.Series(0, index=df.index, dtype=np.float64)
for regra_nome, regra in iptu_rules.items():
filtro = regra["rule"]
if not filtro.any():
continue
faixas = regra["ranges"]
if not faixas:
continue
current_vv = df.loc[filtro, vv_column_name]
limites = [lim[0] for lim in faixas]
aliquotas = [lim[1] for lim in faixas]
parcelas_a_deduzir = [0.0]
for i in range(1, len(limites)):
deducao = parcelas_a_deduzir[i - 1] + (
limites[i - 1] * (aliquotas[i] - aliquotas[i - 1])
)
parcelas_a_deduzir.append(deducao)
conditions = []
choices = []
for i in range(len(limites)):
limite_inferior = limites[i - 1] if i > 0 else 0
limite_superior = limites[i]
conditions.append(
(current_vv > limite_inferior) & (current_vv <= limite_superior)
)
choices.append(current_vv * aliquotas[i] - parcelas_a_deduzir[i])
imposto_calculado = np.select(conditions, choices, default=0)
total_tax_col.loc[filtro] = imposto_calculado
df["IPTU_NOVO_TOTAL"] = total_tax_col
return df
def create_flood_analysis_graphs_plotly(df_analysis, years_to_plot):
figs = []
if not years_to_plot:
return []
data_base = df_analysis[
(
df_analysis["SITUACAO_ALAGAMENTO"].isin(
["Direto", "Indireto", "Indeterminado"]
)
)
& (df_analysis["VLR_IMPOSTO_iptu"] > 1)
].copy()
if data_base.empty:
return []
colors = px.colors.qualitative.Vivid
for i, year in enumerate(years_to_plot):
col_name = f"IPTU_PROJETADO_{year}"
if col_name not in data_base.columns:
continue
data_year = data_base[["VLR_IMPOSTO_iptu", col_name]].copy().dropna()
if data_year.empty:
continue
color_for_year = colors[i % len(colors)]
data_year["ratio"] = data_year[col_name] / data_year["VLR_IMPOSTO_iptu"]
fig_hist = go.Figure()
fig_hist.add_trace(
go.Histogram(
x=data_year["ratio"],
name="Inscrições",
marker_color=color_for_year,
xbins=dict(start=0.0, end=2.0, size=0.05),
)
)
fig_hist.update_layout(
title_text=f"Ano {year}: Relação IPTU Projetado / Atual (Atingidos)",
xaxis_title_text="Ratio (Novo IPTU / IPTU Atual)",
yaxis_title_text="Nº de Inscrições",
title_font_size=16,
font_size=12,
bargap=0.05,
plot_bgcolor="white",
)
fig_hist.add_vline(
x=1, line_dash="dash", line_color="red", annotation_text="Sem Mudança"
)
figs.append(fig_hist)
diff = data_year[col_name] - data_year["VLR_IMPOSTO_iptu"]
aumentou = int((diff > 0.01).sum())
diminuiu = int((diff < -0.01).sum())
manteve = int(len(diff) - aumentou - diminuiu)
diminuiu_ou_manteve = diminuiu + manteve
total_inscricoes = aumentou + diminuiu_ou_manteve
if total_inscricoes > 0:
percent_aumentou = (aumentou / total_inscricoes) * 100
percent_diminuiu_manteve = (diminuiu_ou_manteve / total_inscricoes) * 100
fig_bar = go.Figure()
fig_bar.add_trace(
go.Bar(
y=[""],
x=[percent_diminuiu_manteve],
name=f"Diminuiu ou Manteve ({diminuiu_ou_manteve:,})",
orientation="h",
marker=dict(color="royalblue", line=dict(color="black", width=1.5)),
text=f"<b>{diminuiu_ou_manteve:,}</b><br>({percent_diminuiu_manteve:.1f}%)",
textposition="inside",
insidetextanchor="middle",
textfont=dict(color="black", size=16, family="Arial"),
)
)
fig_bar.add_trace(
go.Bar(
y=[""],
x=[percent_aumentou],
name=f"Aumentou ({aumentou:,})",
orientation="h",
marker=dict(
color="lightcoral", line=dict(color="black", width=1.5)
),
text=f"<b>{aumentou:,}</b><br>({percent_aumentou:.1f}%)",
textposition="inside",
insidetextanchor="middle",
textfont=dict(color="black", size=16, family="Arial"),
)
)
fig_bar.update_layout(
title={
"text": f"<b>Atingidos - Ano {year}</b><br>Proporção de Inscrições por Variação do IPTU",
"y": 0.95,
"x": 0.5,
"xanchor": "center",
"yanchor": "top",
"font": {"size": 18, "family": "Arial"},
},
barmode="stack",
xaxis=dict(
range=[0, 100],
ticksuffix="%",
title=dict(text="Proporção do Total de Inscrições", standoff=35),
),
yaxis=dict(showticklabels=False),
legend=dict(
orientation="h",
yanchor="bottom",
y=-0.25,
xanchor="center",
x=0.5,
traceorder="normal",
),
plot_bgcolor="rgba(0,0,0,0)",
paper_bgcolor="rgba(0,0,0,0)",
margin=dict(t=80, b=140),
height=400,
font=dict(family="Arial"),
)
figs.append(fig_bar)
return figs
# --- Função create_active_ranges (CORRIGIDA) ---
def create_active_ranges(rate_inputs, num_bands, ufm_value):
"""
Cria a lista de faixas de alíquota ativas ([limite_em_reais, aliquota])
com base nos inputs do usuário, de forma robusta.
"""
# Retorna uma lista vazia se não houver bandas ou inputs a processar.
if not num_bands or not rate_inputs:
return []
active_ranges = []
# Itera sobre o número de faixas que o usuário configurou, exceto a última.
# A última faixa é tratada separadamente, pois seu limite é infinito.
for i in range(num_bands - 1):
# Calcula o índice do limite e da alíquota na lista de inputs.
# Ex: para a primeira faixa (i=0), usa o limite no índice 0 e a alíquota no índice 1.
limite_idx = i * 2
aliquota_idx = i * 2 + 1
# Obtém o valor do limite, usando 0 como padrão se o campo estiver vazio.
limite_ufm = (
rate_inputs[limite_idx] if rate_inputs[limite_idx] is not None else 0
)
# Obtém o valor da alíquota, usando 0 como padrão se o campo estiver vazio.
aliquota = (
rate_inputs[aliquota_idx] if rate_inputs[aliquota_idx] is not None else 0
)
active_ranges.append([limite_ufm * ufm_value, aliquota])
# A última faixa sempre tem um limite infinito.
# Precisamos encontrar a alíquota correta para esta última faixa.
# O índice da alíquota da última faixa (N) é (N-1)*2 + 1.
last_aliquot_index = (num_bands - 1) * 2 + 1
# O layout dos inputs no código original tem uma exceção: a alíquota da 8ª faixa
# é o último item da lista. A lógica abaixo lida com isso.
if num_bands == MAX_BANDS:
last_aliquot_index = -1 # O último item da lista de inputs
# Obtém a alíquota final, usando 0 como padrão se o campo estiver vazio.
final_aliquot = (
rate_inputs[last_aliquot_index]
if rate_inputs[last_aliquot_index] is not None
else 0
)
active_ranges.append([np.inf, final_aliquot])
return active_ranges
# --- NOVA FUNÇÃO DE VALIDAÇÃO ---
def validate_band_limits(diferencia_est, diferencia_pred, *all_rates):
"""
Valida os limites das faixas de alíquota em tempo real para garantir a consistência.
Desativa o botão de processamento se encontrar um erro.
"""
try:
num_inputs_per_cat = 1 + (MAX_BANDS - 1) * 2 + 1
# Mapeamento de prefixos para informações de categoria para mensagens de erro amigáveis
prefix_map = {
"PG": {"idx": 0, "name": "Predial Geral"},
"PR": {"idx": 1, "name": "Predial Residencial"},
"PNR": {"idx": 2, "name": "Predial Não Residencial"},
"EG": {"idx": 3, "name": "Estacionamentos Geral"},
"ER": {"idx": 4, "name": "Estacionamentos Residenciais"},
"ENR": {"idx": 5, "name": "Estacionamentos Não Residenciais"},
"T1DF": {"idx": 6, "name": "Terrenos 1a DF"},
"T2DF": {"idx": 7, "name": "Terrenos 2a DF"},
"T3DF": {"idx": 8, "name": "Terrenos 3a DF"},
}
# Determina quais categorias estão ativas com base nos checkboxes
active_prefixes = []
if diferencia_pred:
active_prefixes.extend(["PR", "PNR"])
else:
active_prefixes.append("PG")
if diferencia_est:
active_prefixes.extend(["ER", "ENR"])
else:
active_prefixes.append("EG")
active_prefixes.extend(["T1DF", "T2DF", "T3DF"])
for prefix in active_prefixes:
cat_info = prefix_map[prefix]
cat_index = cat_info["idx"]
cat_name = cat_info["name"]
cat_inputs = all_rates[
cat_index * num_inputs_per_cat : (cat_index + 1) * num_inputs_per_cat
]
# Pula a validação se o número de faixas não estiver definido (pode ser None no início)
if cat_inputs[0] is None or str(cat_inputs[0]) == "":
continue
num_bands = int(cat_inputs[0])
last_limit = 0.0
for i in range(num_bands - 1):
limit_input = cat_inputs[1 + i * 2]
# Pula a validação se o campo do limite estiver vazio
if limit_input is None or str(limit_input) == "":
continue
current_limit = float(limit_input)
# A validação principal: o limite final de uma faixa deve ser maior que o seu início (que é o limite da faixa anterior)
if current_limit <= last_limit:
error_msg = f"Erro em '{cat_name}', Faixa {i + 1}: O Fim do Intervalo ({current_limit}) deve ser maior que o seu Início ({last_limit})."
return gr.update(value=error_msg, visible=True), gr.update(
interactive=False
)
last_limit = current_limit
# Se o loop terminar sem erros, oculta a mensagem de erro e ativa o botão
return gr.update(value="", visible=False), gr.update(interactive=True)
except (ValueError, TypeError):
# Captura erros se o usuário digitar um texto não numérico
return gr.update(
value="Erro: valor inválido inserido em uma das faixas.", visible=True
), gr.update(interactive=False)
except Exception as e:
# Captura outros erros inesperados durante a validação
return gr.update(
value=f"Erro inesperado na validação: {e}", visible=True
), gr.update(interactive=False)
# --- NOVA FUNÇÃO AUXILIAR PARA ANÁLISE DA ENCHENTE ---
def generate_flood_analysis(
the_merge, df_analysis, base_filter, anos_projecao, descontos_anuais
):
"""
Gera um DataFrame de análise de enchente para um determinado filtro.
Retorna um DataFrame formatado para exibição e um DataFrame com dados brutos.
"""
# Retorna DFs vazios se o filtro não encontrar nenhum imóvel
if not base_filter.any():
return pd.DataFrame(), pd.DataFrame()
# Cálculos de totais para o subconjunto de imóveis definido pelo 'base_filter'
iptu_2025_enchente_sum = the_merge.loc[base_filter, "VLR_IMPOSTO_iptu"].sum()
y_base_puro_enchente_sum = df_analysis.loc[base_filter, "IPTU_CENARIO_PURO"].sum()
dados_por_ano = []
# Garante que a análise sempre inclua os anos com desconto definido
anos_analise = sorted(list(set(anos_projecao + list(descontos_anuais.keys()))))
for year in anos_analise:
desconto_percentual = descontos_anuais.get(year, 0)
fator_desconto = 1 - (desconto_percentual / 100.0)
# Calcula o valor com desconto, antes da trava
cenario_com_desconto = y_base_puro_enchente_sum * fator_desconto
# Determina o valor final projetado (com travas)
# Se o ano não tiver projeção (ex: 2028, 2029 sem regra de transição), o valor final é o próprio valor com desconto
if f"IPTU_PROJETADO_{year}" in df_analysis.columns:
cenario_final = df_analysis.loc[base_filter, f"IPTU_PROJETADO_{year}"].sum()
else:
cenario_final = cenario_com_desconto
dados_por_ano.append(
{
"Exercício": year,
"Simulado Puro (Y_base)": y_base_puro_enchente_sum,
"Simulado com Desconto (sem trava)": cenario_com_desconto,
"IPTU Final Projetado": cenario_final,
}
)
df_raw = pd.DataFrame(dados_por_ano)
if df_raw.empty:
return pd.DataFrame(), pd.DataFrame()
# Adiciona colunas calculadas
df_raw["IPTU Atual Atingidos (2025)"] = iptu_2025_enchente_sum
df_raw["Diferença Puro vs Desconto (R$)"] = (
df_raw["Simulado Puro (Y_base)"] - df_raw["Simulado com Desconto (sem trava)"]
)
df_raw["Diferença Puro vs Desconto (%)"] = (
df_raw["Diferença Puro vs Desconto (R$)"]
/ df_raw["Simulado Puro (Y_base)"]
* 100
).fillna(0)
df_raw["Redução Total (R$)"] = (
df_raw["Simulado Puro (Y_base)"] - df_raw["IPTU Final Projetado"]
)
df_raw["Redução Total (%)"] = (
df_raw["Redução Total (R$)"] / df_raw["Simulado Puro (Y_base)"] * 100
).fillna(0)
# Ordena e formata as colunas
cols_order = [
"Exercício",
"IPTU Atual Atingidos (2025)",
"Simulado Puro (Y_base)",
"Simulado com Desconto (sem trava)",
"IPTU Final Projetado",
"Diferença Puro vs Desconto (R$)",
"Diferença Puro vs Desconto (%)",
"Redução Total (R$)",
"Redução Total (%)",
]
df_raw = df_raw[cols_order]
df_display = df_raw.copy()
currency_cols = [
"IPTU Atual Atingidos (2025)",
"Simulado Puro (Y_base)",
"Simulado com Desconto (sem trava)",
"IPTU Final Projetado",
"Diferença Puro vs Desconto (R$)",
"Redução Total (R$)",
]
percent_cols = ["Diferença Puro vs Desconto (%)", "Redução Total (%)"]
for col in currency_cols:
df_display[col] = df_display[col].apply(lambda x: f"R$ {x:,.2f}")
for col in percent_cols:
df_display[col] = df_display[col].apply(lambda x: f"{x:.2f}%")
return df_display, df_raw
# --- Função Principal de Processamento (MODIFICADA) ---
def process_iptu_data(
ufm_value_input,
diferencia_estacionamentos_flag,
diferencia_predios_flag,
trava_aumento_perc,
segregar_piso_flag,
segregar_trava_flag,
value_range_interval,
aggregate_from_vv_value,
show_projection,
show_aggregate,
show_detailed_bands,
show_detailed_vv,
show_totals,
show_flood_analysis,
show_flood_graphs,
show_excel,
universal_transition_rules,
transition_rules_by_category,
desconto_2026,
desconto_2027,
desconto_2028,
desconto_2029,
*all_rate_inputs,
):
num_inputs_per_cat = 1 + (MAX_BANDS - 1) * 2 + 1
pg_num_bands, *pg_rates = all_rate_inputs[0:num_inputs_per_cat]
pr_num_bands, *pr_rates = all_rate_inputs[
num_inputs_per_cat * 1 : num_inputs_per_cat * 2
]
pnr_num_bands, *pnr_rates = all_rate_inputs[
num_inputs_per_cat * 2 : num_inputs_per_cat * 3
]
eg_num_bands, *eg_rates = all_rate_inputs[
num_inputs_per_cat * 3 : num_inputs_per_cat * 4
]
er_num_bands, *er_rates = all_rate_inputs[
num_inputs_per_cat * 4 : num_inputs_per_cat * 5
]
enr_num_bands, *enr_rates = all_rate_inputs[
num_inputs_per_cat * 5 : num_inputs_per_cat * 6
]
t1df_num_bands, *t1df_rates = all_rate_inputs[
num_inputs_per_cat * 6 : num_inputs_per_cat * 7
]
t2df_num_bands, *t2df_rates = all_rate_inputs[
num_inputs_per_cat * 7 : num_inputs_per_cat * 8
]
t3df_num_bands, *t3df_rates = all_rate_inputs[
num_inputs_per_cat * 8 : num_inputs_per_cat * 9
]
tae_alq1, taf_alq1 = all_rate_inputs[num_inputs_per_cat * 9 :]
descontos_anuais = {
2026: desconto_2026,
2027: desconto_2027,
2028: desconto_2028,
2029: desconto_2029,
}
views_to_process = []
if show_projection:
views_to_process.append("Projeção de Arrecadação")
if show_aggregate:
views_to_process.append("Resumo Agregado")
if show_detailed_bands:
views_to_process.append("Resumo Detalhado por Faixa")
if show_detailed_vv:
views_to_process.append("Resumo Detalhado por Valor Venal")
if show_totals:
views_to_process.append("Totais Gerais")
if show_flood_analysis:
views_to_process.append("Análise da Enchente")
if show_flood_graphs:
views_to_process.append("Gráficos da Enchente")
if show_excel:
views_to_process.append("Gerar Arquivo Excel")
empty_df = pd.DataFrame()
empty_dict = {}
visibility_updates = [
gr.update(visible=v)
for v in [
show_projection,
show_aggregate,
show_detailed_bands,
show_detailed_vv,
show_totals,
show_flood_analysis,
show_excel,
show_flood_graphs,
]
]
MAX_PLOTS = 30
empty_plots = [gr.update(value=None, visible=False)] * MAX_PLOTS
num_cat_outputs = 13 # Number of category-based outputs
error_return = (
"Erro no processamento.",
empty_df, "", "", "", # Projection, Totals
empty_df, empty_df, empty_df, empty_df, empty_df, empty_df, empty_df, # Flood analysis
empty_dict, gr.update(choices=[], value=None), empty_df, # Aggregate
gr.Textbox(visible=False), gr.DownloadButton(visible=False), # Excel
) + (gr.update(value=empty_df),) * num_cat_outputs \
+ (gr.update(value=empty_df),) * num_cat_outputs \
+ tuple(empty_plots) \
+ (None, None, None, None, None) \
+ tuple(visibility_updates) \
+ (gr.update(choices=[], value=None), ) # Variation analysis year dropdown
if not views_to_process:
empty_return = (
"Nenhuma visão selecionada para processamento.",
empty_df, "", "", "", # Projection, Totals
empty_df, empty_df, empty_df, empty_df, empty_df, empty_df, empty_df, # Flood analysis
empty_dict, gr.update(choices=[], value=None), empty_df, # Aggregate
gr.Textbox(visible=False), gr.DownloadButton(visible=False), # Excel
) + (gr.update(value=empty_df),) * num_cat_outputs \
+ (gr.update(value=empty_df),) * num_cat_outputs \
+ tuple(empty_plots) \
+ (None, None, None, None, None) \
+ tuple([gr.update(visible=False)] * 8) \
+ (gr.update(choices=[], value=None), )
return empty_return
if not os.path.exists(CSV_FILE_PATH):
return (f"Erro: Arquivo '{CSV_FILE_PATH}' não encontrado.",) + error_return[1:]
if THE_MERGE_GLOBAL is None:
return (
"Erro crítico: Os dados iniciais não puderam ser carregados. Verifique o console.",
) + error_return[1:]
the_merge = THE_MERGE_GLOBAL.copy()
UFM_VALUE = ufm_value_input
nao_tributados = FILTERS_GLOBAL["nao_tributados"]
isen_prop_loc = FILTERS_GLOBAL["isen_prop_loc"]
isen_hab_pop = FILTERS_GLOBAL["isen_hab_pop"]
prediais_base = FILTERS_GLOBAL["prediais_base"]
prediais_residenciais = FILTERS_GLOBAL["prediais_residenciais"]
prediais_nao_residenciais = FILTERS_GLOBAL["prediais_nao_residenciais"]
prediais_residenciais_tributados = FILTERS_GLOBAL[
"prediais_residenciais_tributados"
]
prediais_nao_residenciais_tributados = FILTERS_GLOBAL[
"prediais_nao_residenciais_tributados"
]
prediais_geral_tributados = FILTERS_GLOBAL["prediais_geral_tributados"]
prediais_residenciais_nao_tributados = FILTERS_GLOBAL[
"prediais_residenciais_nao_tributados"
]
prediais_nao_residenciais_nao_tributados = FILTERS_GLOBAL[
"prediais_nao_residenciais_nao_tributados"
]
prediais_geral_nao_tributados = FILTERS_GLOBAL["prediais_geral_nao_tributados"]
estacionamentos_geral = FILTERS_GLOBAL["estacionamentos_geral"]
estacionamentos_residenciais = FILTERS_GLOBAL["estacionamentos_residenciais"]
estacionamentos_nao_residenciais = FILTERS_GLOBAL[
"estacionamentos_nao_residenciais"
]
estacionamentos_residenciais_tributados = FILTERS_GLOBAL[
"estacionamentos_residenciais_tributados"
]
estacionamentos_nao_residenciais_tributados = FILTERS_GLOBAL[
"estacionamentos_nao_residenciais_tributados"
]
estacionamentos_geral_tributados = FILTERS_GLOBAL[
"estacionamentos_geral_tributados"
]
estacionamentos_residenciais_nao_tributados = FILTERS_GLOBAL[
"estacionamentos_residenciais_nao_tributados"
]
estacionamentos_nao_residenciais_nao_tributados = FILTERS_GLOBAL[
"estacionamentos_nao_residenciais_nao_tributados"
]
estacionamentos_geral_nao_tributados = FILTERS_GLOBAL[
"estacionamentos_geral_nao_tributados"
]
terrenos_1df_base = FILTERS_GLOBAL["terrenos_1df_base"]
terrenos_2df_base = FILTERS_GLOBAL["terrenos_2df_base"]
terrenos_3df_base = FILTERS_GLOBAL["terrenos_3df_base"]
terrenos_1df_tributados = FILTERS_GLOBAL["terrenos_1df_tributados"]
terrenos_2df_tributados = FILTERS_GLOBAL["terrenos_2df_tributados"]
terrenos_3df_tributados = FILTERS_GLOBAL["terrenos_3df_tributados"]
terrenos_1df_nao_tributados = FILTERS_GLOBAL["terrenos_1df_nao_tributados"]
terrenos_2df_nao_tributados = FILTERS_GLOBAL["terrenos_2df_nao_tributados"]
terrenos_3df_nao_tributados = FILTERS_GLOBAL["terrenos_3df_nao_tributados"]
regra_tae_base = FILTERS_GLOBAL["regra_tae_base"]
regra_taf_base = FILTERS_GLOBAL["regra_taf_base"]
terrenos_aliqesp = FILTERS_GLOBAL["terrenos_aliqesp"]
terrenos_aliqfix = FILTERS_GLOBAL["terrenos_aliqfix"]
category_mapping = {}
if diferencia_predios_flag:
category_mapping.update(
{
"Predial Residencial": FILTERS_GLOBAL[
"prediais_residenciais_tributados"
]
| isen_prop_loc
| isen_hab_pop,
"Predial Não Residencial": prediais_nao_residenciais_tributados,
}
)
else:
category_mapping["Predial Geral"] = (
prediais_geral_tributados
| prediais_residenciais_tributados
| prediais_nao_residenciais_tributados
| isen_prop_loc
| isen_hab_pop
)
if diferencia_estacionamentos_flag:
category_mapping.update(
{
"Estacionamentos Residenciais": estacionamentos_residenciais_tributados,
"Estacionamentos Não Residenciais": estacionamentos_nao_residenciais_tributados,
}
)
else:
category_mapping["Estacionamentos Geral"] = (
estacionamentos_geral_tributados
| estacionamentos_residenciais_tributados
| estacionamentos_nao_residenciais_tributados
)
category_mapping.update(
{
"Terrenos 1ª DF": terrenos_1df_tributados,
"Terrenos 2ª DF": terrenos_2df_tributados,
"Terrenos 3ª DF": terrenos_3df_tributados,
"Terrenos Alíquota Especial": terrenos_aliqesp,
"Terrenos Alíquota Fixa": terrenos_aliqfix,
}
)
# --- ETAPA 1: Cálculo do IPTU Puro (sem descontos) ---
IPTU_RULES = {
"PG": {
"name": "Predial Sem Dif. por Uso",
"rule": prediais_geral_tributados,
"ranges": create_active_ranges(pg_rates, pg_num_bands, UFM_VALUE),
},
"PR": {
"name": "Predial Residencial",
"rule": prediais_residenciais_tributados,
"ranges": create_active_ranges(pr_rates, pr_num_bands, UFM_VALUE),
},
"PNR": {
"name": "Predial Não Residencial",
"rule": prediais_nao_residenciais_tributados,
"ranges": create_active_ranges(pnr_rates, pnr_num_bands, UFM_VALUE),
},
"EG": {
"name": "Estacionamentos Sem Dif. por Uso",
"rule": estacionamentos_geral_tributados,
"ranges": create_active_ranges(eg_rates, eg_num_bands, UFM_VALUE),
},
"ER": {
"name": "Estacionamentos Residenciais",
"rule": estacionamentos_residenciais_tributados,
"ranges": create_active_ranges(er_rates, er_num_bands, UFM_VALUE),
},
"ENR": {
"name": "Estacionamentos Não Residenciais",
"rule": estacionamentos_nao_residenciais_tributados,
"ranges": create_active_ranges(enr_rates, enr_num_bands, UFM_VALUE),
},
"T1DF": {
"name": "Terrenos na 1a DF",
"rule": terrenos_1df_tributados,
"ranges": create_active_ranges(t1df_rates, t1df_num_bands, UFM_VALUE),
},
"T2DF": {
"name": "Terrenos na 2a DF",
"rule": terrenos_2df_tributados,
"ranges": create_active_ranges(t2df_rates, t2df_num_bands, UFM_VALUE),
},
"T3DF": {
"name": "Terrenos na 3a DF",
"rule": terrenos_3df_tributados,
"ranges": create_active_ranges(t3df_rates, t3df_num_bands, UFM_VALUE),
},
"PGI": {
"name": "Predial Sem Dif. por Uso Isentos",
"rule": prediais_geral_nao_tributados,
"ranges": [[np.inf, 0.0]],
},
"PRI": {
"name": "Predial Residencial Isentos",
"rule": prediais_residenciais_nao_tributados,
"ranges": [[np.inf, 0.0]],
},
"PNRI": {
"name": "Predial Não Residencial Isentos",
"rule": prediais_nao_residenciais_nao_tributados,
"ranges": [[np.inf, 0.0]],
},
"EGI": {
"name": "Estacionamentos Sem Dif. por Uso Isentos",
"rule": estacionamentos_geral_nao_tributados,
"ranges": [[np.inf, 0.0]],
},
"ERI": {
"name": "Estacionamentos Residenciais Isentos",
"rule": estacionamentos_residenciais_nao_tributados,
"ranges": [[np.inf, 0.0]],
},
"ENRI": {
"name": "Estacionamentos Não Residenciais Isentos",
"rule": estacionamentos_nao_residenciais_nao_tributados,
"ranges": [[np.inf, 0.0]],
},
"T1DFI": {
"name": "Terrenos na 1a DF Isentos",
"rule": terrenos_1df_nao_tributados,
"ranges": [[np.inf, 0.0]],
},
"T2DFI": {
"name": "Terrenos na 2a DF Isentos",
"rule": terrenos_2df_nao_tributados,
"ranges": [[np.inf, 0.0]],
},
"T3DFI": {
"name": "Terrenos na 3a DF Isentos",
"rule": terrenos_3df_nao_tributados,
"ranges": [[np.inf, 0.0]],
},
"TAE": {
"name": "Terrenos Aliq. Esp.",
"rule": terrenos_aliqesp,
"ranges": [[np.inf, tae_alq1]],
},
"TAF": {
"name": "Terrenos Aliq. Fix.",
"rule": terrenos_aliqfix,
"ranges": [[np.inf, taf_alq1]],
},
}
if diferencia_estacionamentos_flag:
del IPTU_RULES["EG"], IPTU_RULES["EGI"]
else:
del IPTU_RULES["ER"], IPTU_RULES["ERI"], IPTU_RULES["ENR"], IPTU_RULES["ENRI"]
if diferencia_predios_flag:
del IPTU_RULES["PG"], IPTU_RULES["PGI"]
else:
del IPTU_RULES["PR"], IPTU_RULES["PRI"], IPTU_RULES["PNR"], IPTU_RULES["PNRI"]
reference_rule_key = "PR" if diferencia_predios_flag else "PG"
if reference_rule_key in IPTU_RULES:
reference_rule = IPTU_RULES[reference_rule_key]
prpl_ranges = (
[[100000 * UFM_VALUE, 0.0]]
+ [r for r in reference_rule["ranges"] if r[0] > 100000 * UFM_VALUE]
if reference_rule["ranges"]
and reference_rule["ranges"][0][0] < 100000 * UFM_VALUE
else reference_rule["ranges"].copy()
)
IPTU_RULES["PRPL"] = {
"name": "Predial Residencial - Aposentado, Inativo, Pensionista, Deficiente",
"rule": isen_prop_loc,
"ranges": prpl_ranges,
}
prph_ranges = (
[[55000 * UFM_VALUE, 0.0]]
+ [r for r in reference_rule["ranges"] if r[0] > 55000 * UFM_VALUE]
if reference_rule["ranges"]
and reference_rule["ranges"][0][0] < 55000 * UFM_VALUE
else reference_rule["ranges"].copy()
)
IPTU_RULES["PRHP"] = {
"name": "Predial Residencial - Habitações Populares",
"rule": isen_hab_pop,
"ranges": prph_ranges,
}
df_resumo_final = calculate_progressive_tax_vectorized(
the_merge, "valor_venal_calculo", IPTU_RULES
)
IPTU_RULES_FINAL = IPTU_RULES.copy()
df_analysis = the_merge[
[
"VLR_IMPOSTO_iptu",
"VLR_IMPOSTO_CALCULADO_iptu",
"SITUACAO_ALAGAMENTO",
"PERC_ATING",
"valor_venal_calculo",
]
].copy()
df_analysis["IPTU_CENARIO_PURO"] = df_resumo_final["IPTU_NOVO_TOTAL"]
# --- ETAPA 2: Aplicação de Descontos e Travas de Aumento ---
X = df_analysis["VLR_IMPOSTO_iptu"].copy().fillna(0)
Y_puro = df_analysis["IPTU_CENARIO_PURO"].copy().fillna(0)
filtro_enchente = the_merge["SITUACAO_ALAGAMENTO"].isin(
["Direto", "Indireto", "Indeterminado"]
)
filtro_enchente_predial = filtro_enchente & prediais_base
# NOVA COLUNA: IPTU Projetado Puro com Desconto (para "Total Projeto" e "Sem Travas")
Y_puro_com_desconto = Y_puro.copy()
primeiro_ano_desconto = descontos_anuais.get(2026, 0) # Assumindo 2026 como base
if primeiro_ano_desconto > 0:
fator_desc_primeiro_ano = 1 - (primeiro_ano_desconto / 100.0)
Y_puro_com_desconto.loc[filtro_enchente_predial] *= fator_desc_primeiro_ano
df_analysis["IPTU_PURO_C_DESC"] = Y_puro_com_desconto
anos_projecao = (
sorted(
list(
set(
rule["year"]
for rules in transition_rules_by_category.values()
for rule in rules
)
)
)
if segregar_trava_flag
else sorted(list(set(rule["year"] for rule in universal_transition_rules)))
)
for year in anos_projecao:
# Começa com o valor puro do cenário
Y_ano_corrente = Y_puro.copy()
# Aplica o desconto de enchente, se houver para o ano
desconto_percentual = descontos_anuais.get(year, 0)
if desconto_percentual > 0:
fator_desconto = 1 - (desconto_percentual / 100.0)
Y_ano_corrente.loc[filtro_enchente_predial] *= fator_desconto
# Aplica a trava de aumento sobre o valor já com desconto
with np.errstate(divide="ignore", invalid="ignore"):
aumento_relativo = np.nan_to_num((Y_ano_corrente - X) / X, posinf=np.inf)
trava_ativada = aumento_relativo >= (trava_aumento_perc / 100.0)
resultado_final = Y_ano_corrente.copy()
if trava_ativada.any():
if segregar_trava_flag:
conditions, factors = [], []
for cat_name, cat_filter in category_mapping.items():
year_rule = next(
(
r
for r in transition_rules_by_category.get(cat_name, [])
if r["year"] == year
),
None,
)
if year_rule:
conditions.append(cat_filter)
factors.append(year_rule["factor"])
fator_serie = pd.Series(
np.select(conditions, factors, default=1.0), index=the_merge.index
)
else:
year_rule = next(
(r for r in universal_transition_rules if r["year"] == year), None
)
fator_universal = year_rule["factor"] if year_rule else 1.0
fator_serie = pd.Series(fator_universal, index=the_merge.index)
indices_com_trava = trava_ativada
if indices_com_trava.any():
X_trava = X[indices_com_trava].copy()
Y_trava = Y_ano_corrente[indices_com_trava].copy()
fator_trava = fator_serie[indices_com_trava].copy()
valor_com_trava_calculado = X_trava + fator_trava * (Y_trava - X_trava)
resultado_final.loc[indices_com_trava] = valor_com_trava_calculado
df_analysis[f"IPTU_PROJETADO_{year}"] = resultado_final.copy()
the_merge = the_merge.join(df_analysis.filter(like="IPTU_"))
# --- ETAPA 3: Geração de Visões e Resultados ---
(
df_projection_display,
total_novo_str,
total_atual_trib_str,
total_atual_calc_str,
) = empty_df, "", "", ""
(
df_enchente_analysis_display,
df_enchente_res_display,
df_enchente_non_res_display,
df_enchente_est_res_display,
df_enchente_est_non_res_display,
df_enchente_predial_geral_display,
df_enchente_est_geral_display,
) = empty_df, empty_df, empty_df, empty_df, empty_df, empty_df, empty_df
flood_plot_figs = []
yearly_aggregates, yearly_aggregates_raw, agg_dropdown_update, first_year_agg_df = (
empty_dict,
empty_dict,
gr.update(choices=[], value=None),
empty_df,
)
(
detailed_by_category,
detailed_by_category_raw,
detailed_by_value_ranges,
detailed_by_value_ranges_raw
) = ({}, {}, {}, {})
excel_file_buffer, df_projection_raw, df_enchente_analysis_raw_data = (
None,
empty_df,
empty_df,
)
if "Projeção de Arrecadação" in views_to_process:
atual_total = X.sum()
valores_projecao = [atual_total]
# O valor "Sem Trava" agora é o IPTU_PURO_C_DESC
Y_puro_com_desconto_sum = Y_puro_com_desconto.sum()
valores_projecao.append(Y_puro_com_desconto_sum)
for year in anos_projecao:
valores_projecao.append(df_analysis[f"IPTU_PROJETADO_{year}"].sum())
anos_label = [
"Arrecadação Atual (Tributado)",
"Projetado (Puro, c/ Desc.)",
] + [f"Projeção {year}" for year in anos_projecao]
crescimento_absoluto = [v - atual_total for v in valores_projecao]
crescimento_relativo = [
(v / atual_total - 1) * 100 if atual_total > 0 else 0
for v in valores_projecao
]
df_projection_raw = pd.DataFrame(
{
"Ano": anos_label,
"Valor Total Arrecadado": valores_projecao,
"Crescimento Absoluto": crescimento_absoluto,
"Crescimento Relativo (%)": crescimento_relativo,
}
)
df_projection_display = df_projection_raw.copy()
df_projection_display["Valor Total Arrecadado"] = df_projection_display[
"Valor Total Arrecadado"
].apply(lambda x: f"R$ {x:,.2f}")
df_projection_display["Crescimento Absoluto"] = df_projection_display[
"Crescimento Absoluto"
].apply(lambda x: f"R$ {x:,.2f}")
df_projection_display["Crescimento Relativo (%)"] = df_projection_display[
"Crescimento Relativo (%)"
].apply(lambda x: f"{x:.2f}%")
if "Totais Gerais" in views_to_process:
total_novo_sum, total_atual_trib_sum, total_atual_calc_sum = (
Y_puro.sum(),
X.sum(),
the_merge["VLR_IMPOSTO_CALCULADO_iptu"].sum(),
)
total_novo_str, total_atual_trib_str, total_atual_calc_str = (
f"Total IPTU Novo (Simulado Puro): {total_novo_sum:,.2f}",
f"Total IPTU Atual Tributado: {total_atual_trib_sum:,.2f}",
f"Total IPTU Atual Calculado: {total_atual_calc_sum:,.2f}",
)
if "Análise da Enchente" in views_to_process:
filtro_prediais_estacionamentos_tributados = (
prediais_base | estacionamentos_geral
) & (~nao_tributados)
filtro_enchente_analysis = (
the_merge["SITUACAO_ALAGAMENTO"].isin(
["Direto", "Indireto", "Indeterminado"]
)
& filtro_prediais_estacionamentos_tributados
)
df_enchente_analysis_display, df_enchente_analysis_raw_data = generate_flood_analysis(
the_merge,
df_analysis,
filtro_enchente_analysis,
anos_projecao,
descontos_anuais,
)
if diferencia_predios_flag:
filtro_res = FILTERS_GLOBAL["prediais_residenciais"]
filtro_non_res = FILTERS_GLOBAL["prediais_nao_residenciais"]
df_enchente_res_display, _ = generate_flood_analysis(
the_merge, df_analysis, filtro_enchente_analysis & filtro_res, anos_projecao, descontos_anuais,
)
df_enchente_non_res_display, _ = generate_flood_analysis(
the_merge, df_analysis, filtro_enchente_analysis & filtro_non_res, anos_projecao, descontos_anuais,
)
else:
filtro_pred_geral = FILTERS_GLOBAL["prediais_base"]
df_enchente_predial_geral_display, _ = generate_flood_analysis(
the_merge, df_analysis, filtro_enchente_analysis & filtro_pred_geral, anos_projecao, descontos_anuais,
)
if diferencia_estacionamentos_flag:
filtro_est_res = FILTERS_GLOBAL["estacionamentos_residenciais"]
filtro_est_non_res = FILTERS_GLOBAL["estacionamentos_nao_residenciais"]
df_enchente_est_res_display, _ = generate_flood_analysis(
the_merge, df_analysis, filtro_enchente_analysis & filtro_est_res, anos_projecao, descontos_anuais,
)
df_enchente_est_non_res_display, _ = generate_flood_analysis(
the_merge, df_analysis, filtro_enchente_analysis & filtro_est_non_res, anos_projecao, descontos_anuais,
)
else:
filtro_est_geral = FILTERS_GLOBAL["estacionamentos_geral"]
df_enchente_est_geral_display, _ = generate_flood_analysis(
the_merge, df_analysis, filtro_enchente_analysis & filtro_est_geral, anos_projecao, descontos_anuais,
)
if "Gráficos da Enchente" in views_to_process and not df_enchente_analysis_raw_data.empty:
anos_para_graficos = df_enchente_analysis_raw_data["Exercício"].tolist()
flood_plot_figs = create_flood_analysis_graphs_plotly(
the_merge, anos_para_graficos
)
if "Resumo Detalhado por Faixa" in views_to_process:
active_categories_faixa = [regra["name"] for regra in IPTU_RULES_FINAL.values()]
primeiro_ano_proj = anos_projecao[0] if anos_projecao else None
anos_colunas = [f"Total {year}" for year in anos_projecao]
df_colunas = [
"Faixa (VV em UFM)", "Em R$", "Inscrições na Faixa",
"Proporção Inscrições (%)", "Proporção Arrecadação (%)",
"Total 2025", "Total Projeto (Puro, c/ Desc.)"
] + anos_colunas
for regra_nome, regra in IPTU_RULES_FINAL.items():
nome_legivel, filtro_rule, faixas = regra["name"], regra["rule"], regra["ranges"]
if nome_legivel not in active_categories_faixa or not filtro_rule.any():
continue
df_filtrado = df_resumo_final[filtro_rule].copy()
vv_perc, category_data = df_filtrado["valor_venal_calculo"], []
limites_com_zero = [0] + [lim[0] for lim in faixas]
total_inscricoes_categoria = len(df_filtrado)
total_categoria_final = the_merge.loc[filtro_rule, f"IPTU_PROJETADO_{anos_projecao[-1]}"].sum() if anos_projecao else 0
for i, (limite_superior_raw, _) in enumerate(faixas):
faixa_ufm = f"{(limites_com_zero[i] / UFM_VALUE if UFM_VALUE != 0 else 0):,.0f} a {(limite_superior_raw / UFM_VALUE if UFM_VALUE != 0 else 'inf'):,.0f} UFM"
faixa_reais = f"R$ {limites_com_zero[i]:,.0f} a R$ {limite_superior_raw:,.0f}" if np.isfinite(limite_superior_raw) else f"R$ {limites_com_zero[i]:,.0f} a inf"
cond_exclusiva = (vv_perc > limites_com_zero[i]) & (vv_perc <= limite_superior_raw)
indices_na_faixa = df_filtrado[cond_exclusiva].index
if indices_na_faixa.empty: continue
total_2025 = the_merge.loc[indices_na_faixa, "VLR_IMPOSTO_iptu"].sum()
total_projeto_puro = the_merge.loc[indices_na_faixa, "IPTU_PURO_C_DESC"].sum()
ano_totals = [the_merge.loc[indices_na_faixa, f"IPTU_PROJETADO_{year}"].sum() for year in anos_projecao]
prop_inscricoes_str = f"{(len(indices_na_faixa) / total_inscricoes_categoria * 100):.1f}%"
prop_arrecadacao_str = f"{(ano_totals[-1] / total_categoria_final * 100 if total_categoria_final > 0 else 0):.1f}%"
category_data.append(
[faixa_ufm, faixa_reais, len(indices_na_faixa), prop_inscricoes_str, prop_arrecadacao_str, total_2025, total_projeto_puro] + ano_totals
)
if category_data:
df_category_raw = pd.DataFrame(category_data, columns=df_colunas)
detailed_by_category_raw[nome_legivel] = df_category_raw.copy()
df_category_display = df_category_raw.copy()
currency_cols_display = ["Total 2025", "Total Projeto (Puro, c/ Desc.)"] + anos_colunas
for col in currency_cols_display:
df_category_display[col] = df_category_display[col].apply(lambda x: f"R$ {x:,.2f}")
detailed_by_category[nome_legivel] = df_category_display
if "Resumo Detalhado por Valor Venal" in views_to_process:
value_range_outputs_data = update_valor_venal_view(
value_range_interval, aggregate_from_vv_value, df_resumo_final, the_merge,
IPTU_RULES_FINAL, anos_projecao, diferencia_predios_flag,
diferencia_estacionamentos_flag, return_updates=False,
)
category_names = [rule['name'] for rule in IPTU_RULES_FINAL.values()]
for i, category in enumerate(category_names):
df_data = value_range_outputs_data[i * 2]
df_raw_data = value_range_outputs_data[i * 2 + 1]
if df_data is not None and not df_data.empty:
detailed_by_value_ranges[category] = df_data
detailed_by_value_ranges_raw[category] = df_raw_data
if "Resumo Agregado" in views_to_process:
conditions, choices = [], []
# Define as categorias base
if diferencia_predios_flag:
conditions.extend([prediais_residenciais, prediais_nao_residenciais])
choices.extend(["Predial Residencial", "Predial Não Residencial"])
else:
conditions.append(prediais_base); choices.append("Predial Sem Dif. por Uso")
if diferencia_estacionamentos_flag:
conditions.extend([estacionamentos_residenciais, estacionamentos_nao_residenciais])
choices.extend(["Estacionamentos Residenciais", "Estacionamentos Não Residenciais"])
else:
conditions.append(estacionamentos_geral); choices.append("Estacionamentos Sem Dif. por Uso")
conditions.extend([terrenos_1df_base, terrenos_2df_base, terrenos_3df_base, regra_tae_base, regra_taf_base])
choices.extend(["Terrenos na 1a DF", "Terrenos na 2a DF", "Terrenos na 3a DF", "Terrenos Aliq. Esp.", "Terrenos Aliq. Fix."])
the_merge["CategoriaAgregada"] = np.select(conditions, choices, default="Outros/Isentos")
# Anos a processar, incluindo o "sem trava"
anos_para_agregar = ["Projetado (Puro, c/ Desc.)"] + anos_projecao
colunas_iptu = {"Projetado (Puro, c/ Desc.)": "IPTU_PURO_C_DESC"}
colunas_iptu.update({year: f"IPTU_PROJETADO_{year}" for year in anos_projecao})
for ano_label in anos_para_agregar:
year_col = colunas_iptu[ano_label]
if year_col not in the_merge.columns: continue
agg_data = the_merge.groupby("CategoriaAgregada").agg(
Inscrições=("VLR_IMPOSTO_iptu", "size"),
Inscricoes_com_IPTU_Atual=("VLR_IMPOSTO_iptu", lambda x: (x > 0).sum()),
Total_IPTU_Projetado=(year_col, "sum"),
Total_IPTU_Atual_Tributado=("VLR_IMPOSTO_iptu", "sum"),
).reset_index()
agg_data = agg_data[agg_data["CategoriaAgregada"] != "Outros/Isentos"]
if agg_data.empty: continue
total_inscricoes_com_iptu = (the_merge["VLR_IMPOSTO_iptu"] > 0).sum()
total_atual_sum = agg_data["Total_IPTU_Atual_Tributado"].sum()
total_projetado_sum = agg_data["Total_IPTU_Projetado"].sum()
agg_data["Participação Inscrições IPTU>0 (%)"] = (agg_data["Inscricoes_com_IPTU_Atual"] / total_inscricoes_com_iptu * 100) if total_inscricoes_com_iptu > 0 else 0
agg_data["Participação no Total Atual (%)"] = (agg_data["Total_IPTU_Atual_Tributado"] / total_atual_sum * 100) if total_atual_sum > 0 else 0
agg_data["Participação no Total Projetado (%)"] = (agg_data["Total_IPTU_Projetado"] / total_projetado_sum * 100) if total_projetado_sum > 0 else 0
agg_data["Diferença Absoluta"] = agg_data["Total_IPTU_Projetado"] - agg_data["Total_IPTU_Atual_Tributado"]
agg_data["Diferença Percentual (%)"] = (agg_data["Diferença Absoluta"] / agg_data["Total_IPTU_Atual_Tributado"] * 100).where(agg_data["Total_IPTU_Atual_Tributado"] > 0, 0)
agg_data.rename(columns={
"CategoriaAgregada": "Categoria", "Inscricoes_com_IPTU_Atual": "Inscrições com IPTU > 0",
"Total_IPTU_Projetado": f"Total IPTU Projetado {ano_label}", "Total_IPTU_Atual_Tributado": "Total IPTU Atual Tributado"},
inplace=True,
)
final_cols_order = [
"Categoria", "Inscrições", "Inscrições com IPTU > 0", "Participação Inscrições IPTU>0 (%)",
"Total IPTU Atual Tributado", "Participação no Total Atual (%)", f"Total IPTU Projetado {ano_label}",
"Participação no Total Projetado (%)", "Diferença Absoluta", "Diferença Percentual (%)",
]
agg_data = agg_data[final_cols_order]
yearly_aggregates_raw[ano_label] = agg_data.copy()
agg_data_display = agg_data.copy()
currency_cols = ["Total IPTU Atual Tributado", f"Total IPTU Projetado {ano_label}", "Diferença Absoluta"]
for col in currency_cols:
agg_data_display[col] = agg_data_display[col].apply(lambda x: f"R$ {x:,.2f}" if pd.notna(x) else "R$ 0,00")
percent_cols = ["Participação Inscrições IPTU>0 (%)", "Participação no Total Atual (%)", "Participação no Total Projetado (%)", "Diferença Percentual (%)"]
for col in percent_cols:
agg_data_display[col] = agg_data_display[col].apply(lambda x: f"{x:,.2f}%" if pd.notna(x) else "0,00%")
yearly_aggregates[ano_label] = agg_data_display
if anos_para_agregar:
agg_dropdown_update = gr.update(choices=anos_para_agregar, value=anos_para_agregar[0])
first_year_agg_df = yearly_aggregates.get(anos_para_agregar[0], pd.DataFrame())
else:
agg_dropdown_update = gr.update(choices=[], value=None)
first_year_agg_df = pd.DataFrame()
if "Gerar Arquivo Excel" in views_to_process:
# A nova view de variação não é processada aqui, então seus dados não estarão prontos para o Excel
# Para incluir, seria preciso processá-la aqui ou assumir que o usuário a gerou antes.
# Por simplicidade, passaremos um dicionário vazio. Se a view tiver sido gerada,
# seus resultados estarão no state, mas esta função não tem acesso a ele.
excel_file_buffer = create_excel_download(
df_projection_raw=df_projection_raw,
total_novo_str=total_novo_str,
total_atual_trib_str=total_atual_trib_str,
total_atual_calc_str=total_atual_calc_str,
ufm_value=ufm_value_input,
diferencia_estacionamentos_flag=diferencia_estacionamentos_flag,
diferencia_predios_flag=diferencia_predios_flag,
trava_aumento_perc=trava_aumento_perc,
segregar_trava_flag=segregar_trava_flag,
universal_rules=universal_transition_rules,
category_rules=transition_rules_by_category,
descontos_anuais=descontos_anuais,
all_parameters=all_rate_inputs,
yearly_aggregates_raw=yearly_aggregates_raw,
detailed_by_category_raw=detailed_by_category_raw,
detailed_by_value_ranges_raw=detailed_by_value_ranges_raw,
df_enchente_analysis_raw=df_enchente_analysis_raw_data,
variation_analysis_raw={}, # Passando vazio pois é gerado por outro botão
)
filename_text, download_button_result = "", gr.DownloadButton(visible=False)
if excel_file_buffer:
try:
timestamp = datetime.now(pytz.timezone("America/Sao_Paulo")).strftime("%Y%m%d_%H%M")
filename = f"resultados_iptu_{timestamp}.xlsx"
temp_file_path = os.path.join(tempfile.gettempdir(), filename)
with open(temp_file_path, "wb") as f: f.write(excel_file_buffer.getvalue())
size_str = f"{len(excel_file_buffer.getvalue()) / 1024:.1f} KB"
download_button_result = gr.DownloadButton(label=f"Download ({size_str})", value=temp_file_path, visible=True)
filename_text = filename
except Exception as e:
print(f"Erro ao criar arquivo Excel: {e}")
projection_update = gr.update(value=df_projection_display, column_widths=calculate_column_widths(df_projection_display))
enchente_analysis_update = gr.update(value=df_enchente_analysis_display, column_widths=calculate_column_widths(df_enchente_analysis_display))
enchente_res_update = gr.update(value=df_enchente_res_display, column_widths=calculate_column_widths(df_enchente_res_display))
enchente_non_res_update = gr.update(value=df_enchente_non_res_display, column_widths=calculate_column_widths(df_enchente_non_res_display))
enchente_est_res_update = gr.update(value=df_enchente_est_res_display, column_widths=calculate_column_widths(df_enchente_est_res_display))
enchente_est_non_res_update = gr.update(value=df_enchente_est_non_res_display, column_widths=calculate_column_widths(df_enchente_est_non_res_display))
enchente_pred_geral_update = gr.update(value=df_enchente_predial_geral_display, column_widths=calculate_column_widths(df_enchente_predial_geral_display))
enchente_est_geral_update = gr.update(value=df_enchente_est_geral_display, column_widths=calculate_column_widths(df_enchente_est_geral_display))
first_year_agg_update = gr.update(value=first_year_agg_df, column_widths=calculate_column_widths(first_year_agg_df))
detailed_outputs_list = [
gr.update(value=df, column_widths=calculate_column_widths(df))
for df in [
detailed_by_category.get("Predial Sem Dif. por Uso", pd.DataFrame()),
detailed_by_category.get("Predial Residencial", pd.DataFrame()),
detailed_by_category.get("Predial Não Residencial", pd.DataFrame()),
detailed_by_category.get("Estacionamentos Sem Dif. por Uso", pd.DataFrame()),
detailed_by_category.get("Estacionamentos Residenciais", pd.DataFrame()),
detailed_by_category.get("Estacionamentos Não Residenciais", pd.DataFrame()),
detailed_by_category.get("Terrenos na 1a DF", pd.DataFrame()),
detailed_by_category.get("Terrenos na 2a DF", pd.DataFrame()),
detailed_by_category.get("Terrenos na 3a DF", pd.DataFrame()),
detailed_by_category.get("Terrenos Aliq. Esp.", pd.DataFrame()),
detailed_by_category.get("Terrenos Aliq. Fix.", pd.DataFrame()),
detailed_by_category.get("Predial Residencial - Aposentado, Inativo, Pensionista, Deficiente", pd.DataFrame()),
detailed_by_category.get("Predial Residencial - Habitações Populares", pd.DataFrame()),
]
]
value_range_outputs_list = [
gr.update(value=df, column_widths=calculate_column_widths(df))
for df in [
detailed_by_value_ranges.get("Predial Sem Dif. por Uso", pd.DataFrame()),
detailed_by_value_ranges.get("Predial Residencial", pd.DataFrame()),
detailed_by_value_ranges.get("Predial Não Residencial", pd.DataFrame()),
detailed_by_value_ranges.get("Estacionamentos Sem Dif. por Uso", pd.DataFrame()),
detailed_by_value_ranges.get("Estacionamentos Residenciais", pd.DataFrame()),
detailed_by_value_ranges.get("Estacionamentos Não Residenciais", pd.DataFrame()),
detailed_by_value_ranges.get("Terrenos na 1a DF", pd.DataFrame()),
detailed_by_value_ranges.get("Terrenos na 2a DF", pd.DataFrame()),
detailed_by_value_ranges.get("Terrenos na 3a DF", pd.DataFrame()),
detailed_by_value_ranges.get("Terrenos Aliq. Esp.", pd.DataFrame()),
detailed_by_value_ranges.get("Terrenos Aliq. Fix.", pd.DataFrame()),
detailed_by_value_ranges.get("Predial Residencial - Aposentado, Inativo, Pensionista, Deficiente", pd.DataFrame()),
detailed_by_value_ranges.get("Predial Residencial - Habitações Populares", pd.DataFrame()),
]
]
final_plot_outputs = [
gr.update(value=flood_plot_figs[i], visible=True)
if i < len(flood_plot_figs)
else gr.update(value=None, visible=False)
for i in range(MAX_PLOTS)
]
variation_analysis_years = ["Projetado (Puro, c/ Desc.)"] + anos_projecao
variation_analysis_year_update = gr.update(choices=variation_analysis_years, value=variation_analysis_years[0])
return (
"Processamento concluído.",
projection_update,
total_novo_str, total_atual_trib_str, total_atual_calc_str,
enchente_analysis_update, enchente_pred_geral_update, enchente_res_update,
enchente_non_res_update, enchente_est_geral_update, enchente_est_res_update,
enchente_est_non_res_update,
yearly_aggregates, agg_dropdown_update, first_year_agg_update,
gr.Textbox(value=filename_text, visible=bool(filename_text)), download_button_result,
) + tuple(detailed_outputs_list) \
+ tuple(value_range_outputs_list) \
+ tuple(final_plot_outputs) \
+ (df_resumo_final, the_merge, IPTU_RULES_FINAL, anos_projecao, detailed_by_value_ranges_raw) \
+ tuple(visibility_updates) \
+ (variation_analysis_year_update,)
# --- Função dedicada para atualizar a visão de valor venal (MODIFICADA) ---
def update_valor_venal_view(
interval_value,
aggregate_from_value,
df_resumo,
the_merge_df,
iptu_rules,
anos_projecao,
diferencia_predios,
diferencia_estacionamentos,
return_updates=True,
):
if df_resumo is None or the_merge_df is None or iptu_rules is None or anos_projecao is None:
empty_df, num_cats = pd.DataFrame(), 13
if return_updates: return (gr.update(value=empty_df, column_widths=None),) * num_cats
else: return (empty_df, empty_df) * num_cats
category_names = [rule['name'] for rule in iptu_rules.values()]
all_outputs = []
interval = interval_value if interval_value and interval_value > 0 else 300000.0
aggregate_from = aggregate_from_value if aggregate_from_value and aggregate_from_value > 0 else float("inf")
anos_colunas = [f"Total {year}" for year in anos_projecao]
df_colunas = ["Faixa de Valor Venal", "Inscrições na Faixa", "Proporção Inscrições (%)",
"Proporção Arrecadação (%)", "Total 2025", "Total Projeto (Puro, c/ Desc.)"] + anos_colunas
for regra_nome, regra in iptu_rules.items():
nome_legivel, filtro_rule = regra["name"], regra["rule"]
df_filtrado = df_resumo[filtro_rule].copy() if filtro_rule.any() else pd.DataFrame()
if df_filtrado.empty:
all_outputs.extend([pd.DataFrame(), pd.DataFrame()])
continue
vv_values = df_filtrado["valor_venal_calculo"]
total_inscricoes_categoria = len(df_filtrado)
total_categoria_final = the_merge_df.loc[filtro_rule, f"IPTU_PROJETADO_{anos_projecao[-1]}"].sum() if anos_projecao else 0
max_value = vv_values.max()
ranges, category_data = [], []
if max_value > 0:
current_min = 0
while current_min < max_value and current_min < aggregate_from and interval > 0:
ranges.append((current_min, current_min + interval))
current_min += interval
if current_min < max_value: ranges.append((current_min, max_value))
if not ranges: ranges.append((0, max_value))
for range_min, range_max in ranges:
range_filter = (vv_values >= range_min) & (vv_values < range_max)
if range_max >= max_value: range_filter |= (vv_values == max_value)
indices_na_faixa = df_filtrado[range_filter].index
if indices_na_faixa.empty: continue
faixa_nome = f"R$ {range_min:,.0f} a R$ {range_max:,.0f}"
qnt, total_2025 = len(indices_na_faixa), the_merge_df.loc[indices_na_faixa, "VLR_IMPOSTO_iptu"].sum()
total_projeto_puro = the_merge_df.loc[indices_na_faixa, "IPTU_PURO_C_DESC"].sum()
ano_totals = [the_merge_df.loc[indices_na_faixa, f"IPTU_PROJETADO_{year}"].sum() for year in anos_projecao]
prop_insc = f"{(qnt / total_inscricoes_categoria * 100):.1f}%"
prop_arrec = f"{(ano_totals[-1] / total_categoria_final * 100 if total_categoria_final > 0 else 0):.1f}%"
category_data.append([faixa_nome, qnt, prop_insc, prop_arrec, total_2025, total_projeto_puro] + ano_totals)
if category_data:
df_category_raw = pd.DataFrame(category_data, columns=df_colunas)
df_category_display = df_category_raw.copy()
currency_cols_display = ["Total 2025", "Total Projeto (Puro, c/ Desc.)"] + anos_colunas
for col in currency_cols_display:
df_category_display[col] = df_category_display[col].apply(lambda x: f"R$ {x:,.2f}")
all_outputs.extend([df_category_display, df_category_raw])
else:
all_outputs.extend([pd.DataFrame(), pd.DataFrame()])
if return_updates:
return tuple(gr.update(value=df, column_widths=calculate_column_widths(df)) for df in all_outputs[::2])
else:
return tuple(all_outputs)
# --- Função de visibilidade e controle da UI (MODIFICADO) ---
def update_parking_visibility(diferencia_estacionamentos):
return (
gr.Accordion(visible=not diferencia_estacionamentos),
gr.Accordion(visible=diferencia_estacionamentos),
gr.Accordion(visible=diferencia_estacionamentos),
)
def update_building_visibility(diferencia_predios):
return (
gr.Accordion(visible=not diferencia_predios),
gr.Accordion(visible=diferencia_predios),
gr.Accordion(visible=diferencia_predios),
)
def update_detailed_tabs_visibility(diferencia_predios, diferencia_estacionamentos):
return (
gr.update(visible=not diferencia_predios),
gr.update(visible=diferencia_predios),
gr.update(visible=diferencia_predios),
gr.update(visible=not diferencia_estacionamentos),
gr.update(visible=diferencia_estacionamentos),
gr.update(visible=diferencia_estacionamentos),
gr.update(visible=True), gr.update(visible=True), gr.update(visible=True),
gr.update(visible=True), gr.update(visible=True), gr.update(visible=True),
gr.update(visible=True),
)
def update_detailed_vv_tabs_visibility(diferencia_predios, diferencia_estacionamentos):
return (
gr.update(visible=not diferencia_predios),
gr.update(visible=diferencia_predios),
gr.update(visible=diferencia_predios),
gr.update(visible=not diferencia_estacionamentos),
gr.update(visible=diferencia_estacionamentos),
gr.update(visible=diferencia_estacionamentos),
gr.update(visible=True), gr.update(visible=True), gr.update(visible=True),
gr.update(visible=True), gr.update(visible=True), gr.update(visible=True),
gr.update(visible=True),
)
def update_selected_tabs(is_differentiated):
"""Muda a aba selecionada com base no checkbox de diferenciação."""
if is_differentiated:
# Quando diferenciar, torna a aba "Residencial" ativa.
return (
gr.update(selected="pr_faixa"),
gr.update(selected="pr_vv"),
gr.update(selected="pr_var")
)
else:
# Quando não diferenciar, torna a aba "Geral" ativa.
return (
gr.update(selected="pg_faixa"),
gr.update(selected="pg_vv"),
gr.update(selected="pg_var")
)
def add_year_rule(rules):
if len(rules) >= MAX_YEARS:
return rules
new_year = rules[-1]["year"] + 1 if rules else 2026
rules.append({"year": new_year, "factor": 1.0})
return rules
def remove_year_rule(rules):
if len(rules) > 1:
rules.pop()
if rules:
rules[-1]["factor"] = 1.0
return rules
def update_transition_ui(rules):
updates = []
num_rules = len(rules)
for i in range(MAX_YEARS):
if i < num_rules:
updates.append(gr.update(visible=True))
updates.append(gr.update(value=rules[i]["year"]))
updates.append(gr.update(value=rules[i]["factor"] * 100))
else:
updates.append(gr.update(visible=False))
updates.append(gr.update())
updates.append(gr.update())
return updates
def update_rule_factor(index, new_factor_perc, rules):
if 0 <= index < len(rules):
rules[index]["factor"] = (new_factor_perc or 0) / 100.0
return rules
def get_agg_df_for_year(selected_year, all_aggregates):
"""
Função melhorada para obter o DataFrame agregado para o ano selecionado
"""
# Handle the case where all_aggregates might be a State object or empty
if hasattr(all_aggregates, "value"):
# If it's a State object, extract the value
aggregates_dict = all_aggregates.value
else:
# It should be the actual dictionary
aggregates_dict = all_aggregates
# Check if we have valid data
if (
not selected_year
or not aggregates_dict
or not isinstance(aggregates_dict, dict)
):
return gr.update(value=pd.DataFrame(), column_widths=None)
# Try to get the dataframe for the selected year
try:
df = aggregates_dict.get(selected_year, pd.DataFrame())
return gr.update(value=df, column_widths=calculate_column_widths(df))
except (ValueError, TypeError) as e:
return gr.update(value=pd.DataFrame(), column_widths=None)
def update_category_rules_ui(selected_category, rules_by_category):
rules = rules_by_category.get(selected_category, [])
return update_transition_ui(rules)
def add_year_to_category(selected_category, rules_by_category):
if selected_category not in rules_by_category:
rules_by_category[selected_category] = []
rules = rules_by_category[selected_category]
add_year_rule(rules)
return rules_by_category
def remove_year_from_category(selected_category, rules_by_category):
if selected_category in rules_by_category:
rules = rules_by_category[selected_category]
remove_year_rule(rules)
return rules_by_category
def update_category_rule_factor(
index, new_factor_perc, selected_category, rules_by_category
):
if selected_category in rules_by_category:
rules = rules_by_category[selected_category]
update_rule_factor(index, new_factor_perc, rules)
return rules_by_category
def copy_rules_to_category(source_category, target_category, rules_by_category):
if source_category in rules_by_category and target_category is not None:
rules_by_category[target_category] = [
rule.copy() for rule in rules_by_category[source_category]
]
return rules_by_category
# --- FUNÇÃO CORRIGIDA: Análise de Variação - Lógica Passo a Passo ---
def generate_variation_analysis_view(
limit_perc, interval_perc, selected_year, abrangencia, the_merge_df, iptu_rules,
diferencia_predios, diferencia_estacionamentos,
):
# Validações
if the_merge_df is None or iptu_rules is None:
return [gr.update(value="Erro: Dados principais não processados. Clique em 'Processar Dados' primeiro.", visible=True)] \
+ [gr.update(value=pd.DataFrame())] * 13 + [gr.State({})]
# Verificar se é modo simples (limite e intervalo são zero)
modo_simples = (limit_perc == 0 and interval_perc == 0)
if not modo_simples and (not limit_perc or not interval_perc or limit_perc % interval_perc != 0):
return [gr.update(value="Erro: O intervalo deve ser um divisor exato do limite.", visible=True)] \
+ [gr.update(value=pd.DataFrame())] * 13 + [gr.State({})]
col_iptu_atual = "VLR_IMPOSTO_iptu"
col_iptu_projetado = "IPTU_PURO_C_DESC" if selected_year == "Projetado (Puro, c/ Desc.)" else f"IPTU_PROJETADO_{selected_year}"
if col_iptu_projetado not in the_merge_df.columns:
return [gr.update(value=f"Erro: A coluna de projeção para '{selected_year}' não foi encontrada.", visible=True)] \
+ [gr.update(value=pd.DataFrame())] * 13 + [gr.State({})]
df = the_merge_df.copy()
# Aplicar filtro de abrangência
if abrangencia == "Apenas imóveis da mancha":
filtro_mancha = df["SITUACAO_ALAGAMENTO"].isin(["Direto", "Indireto", "Indeterminado"])
df = df[filtro_mancha].copy()
if df.empty:
return [gr.update(value="Erro: Nenhum imóvel encontrado na mancha da enchente.", visible=True)] \
+ [gr.update(value=pd.DataFrame())] * 13 + [gr.State({})]
# Calcular variação
iptu_atual = df[col_iptu_atual]
iptu_projetado = df[col_iptu_projetado]
with np.errstate(divide='ignore', invalid='ignore'):
variacao_perc = pd.Series(
np.where(iptu_atual > 0, (iptu_projetado / iptu_atual - 1) * 100, np.nan),
index=df.index
)
# Preparar faixas ordenadas
if modo_simples:
faixas_ordenadas = ["Não Lançado", "Permanece Zero", "Não Mais Tributados", "Novos Tributados", "Diminuiu/Manteve", "Aumentou"]
else:
# Gerar faixas de variação detalhadas
limit_abs = abs(limit_perc)
faixas_variacao = []
faixas_variacao.append(f"Abaixo de {-limit_abs}%")
limite_inferior = -limit_abs
while limite_inferior < limit_abs:
limite_superior = limite_inferior + interval_perc
faixas_variacao.append(f"De {limite_inferior}% a {limite_superior}%")
limite_inferior = limite_superior
faixas_variacao.append(f"Acima de {limit_abs}%")
faixas_ordenadas = ["Não Lançado", "Permanece Zero", "Não Mais Tributados", "Novos Tributados"] + faixas_variacao
# Debug inicial
total_registros = len(df)
print(f"DEBUG - Total de registros: {total_registros}")
# Criar mapeamento de filtros base - ORDEM CORRIGIDA PARA BATER COM OS OUTPUTS
category_order = [
"Predial Residencial", "Predial Não Residencial", "Predial Sem Dif. por Uso", # Ordem corrigida
"Estacionamentos Sem Dif. por Uso", "Estacionamentos Residenciais", "Estacionamentos Não Residenciais",
"Terrenos na 1a DF", "Terrenos na 2a DF", "Terrenos na 3a DF",
"Terrenos Aliq. Esp.", "Terrenos Aliq. Fix.",
"Predial Residencial - Aposentado, Inativo, Pensionista, Deficiente",
"Predial Residencial - Habitações Populares"
]
filters_by_name = {}
if diferencia_predios:
filters_by_name["Predial Residencial"] = FILTERS_GLOBAL["prediais_residenciais"]
filters_by_name["Predial Não Residencial"] = FILTERS_GLOBAL["prediais_nao_residenciais"]
else:
filters_by_name["Predial Sem Dif. por Uso"] = FILTERS_GLOBAL["prediais_base"]
if diferencia_estacionamentos:
filters_by_name["Estacionamentos Residenciais"] = FILTERS_GLOBAL["estacionamentos_residenciais"]
filters_by_name["Estacionamentos Não Residenciais"] = FILTERS_GLOBAL["estacionamentos_nao_residenciais"]
else:
filters_by_name["Estacionamentos Sem Dif. por Uso"] = FILTERS_GLOBAL["estacionamentos_geral"]
filters_by_name.update({
"Terrenos na 1a DF": FILTERS_GLOBAL["terrenos_1df_base"],
"Terrenos na 2a DF": FILTERS_GLOBAL["terrenos_2df_base"],
"Terrenos na 3a DF": FILTERS_GLOBAL["terrenos_3df_base"],
"Terrenos Aliq. Esp.": FILTERS_GLOBAL["regra_tae_base"],
"Terrenos Aliq. Fix.": FILTERS_GLOBAL["regra_taf_base"],
"Predial Residencial - Aposentado, Inativo, Pensionista, Deficiente": FILTERS_GLOBAL["isen_prop_loc"],
"Predial Residencial - Habitações Populares": FILTERS_GLOBAL["isen_hab_pop"]
})
all_dfs_display = []
all_dfs_raw = {}
total_arrec_atual = df.loc[df[col_iptu_atual] > 0, col_iptu_atual].sum()
total_arrec_projetado = df.loc[df[col_iptu_projetado] > 0, col_iptu_projetado].sum()
# PRIMEIRO: PROCESSAR TOTAIS GERAIS (PRIMEIRA ABA)
print(f"\nDEBUG TOTAIS GERAIS - Total geral: {len(df)}")
df_geral = df.copy()
df_geral['Faixa de Variação'] = "Não Aplicável"
indices_restantes_geral = df_geral.index.copy()
# LÓGICA PASSO A PASSO PARA TOTAIS GERAIS
# PASSO 1: Não Lançado (filtro de não tributados)
try:
nao_tributados_global = FILTERS_GLOBAL["nao_tributados"]
nao_tributados_gerais = nao_tributados_global.loc[df_geral.index]
indices_nao_tributados_geral = df_geral.index[nao_tributados_gerais]
df_geral.loc[indices_nao_tributados_geral, 'Faixa de Variação'] = "Não Lançado"
indices_restantes_geral = indices_restantes_geral.difference(indices_nao_tributados_geral)
print(f"DEBUG GERAL - Passo 1 (Não Lançado): {len(indices_nao_tributados_geral)}")
except Exception as e:
print(f"DEBUG GERAL - Erro no Passo 1: {e}")
indices_nao_tributados_geral = pd.Index([])
# PASSO 2: Permanece Zero
if len(indices_restantes_geral) > 0:
mask_permanece_zero_geral = (df_geral.loc[indices_restantes_geral, col_iptu_atual] == 0) & (df_geral.loc[indices_restantes_geral, col_iptu_projetado] == 0)
indices_permanece_zero_geral = indices_restantes_geral[mask_permanece_zero_geral]
df_geral.loc[indices_permanece_zero_geral, 'Faixa de Variação'] = "Permanece Zero"
indices_restantes_geral = indices_restantes_geral.difference(indices_permanece_zero_geral)
print(f"DEBUG GERAL - Passo 2 (Permanece Zero): {len(indices_permanece_zero_geral)}")
# PASSO 3: Não Mais Tributados
if len(indices_restantes_geral) > 0:
mask_nao_mais_trib_geral = (df_geral.loc[indices_restantes_geral, col_iptu_atual] > 0) & (df_geral.loc[indices_restantes_geral, col_iptu_projetado] == 0)
indices_nao_mais_trib_geral = indices_restantes_geral[mask_nao_mais_trib_geral]
df_geral.loc[indices_nao_mais_trib_geral, 'Faixa de Variação'] = "Não Mais Tributados"
indices_restantes_geral = indices_restantes_geral.difference(indices_nao_mais_trib_geral)
print(f"DEBUG GERAL - Passo 3 (Não Mais Tributados): {len(indices_nao_mais_trib_geral)}")
# PASSO 4: Novos Tributados
if len(indices_restantes_geral) > 0:
mask_novos_trib_geral = (df_geral.loc[indices_restantes_geral, col_iptu_atual] == 0) & (df_geral.loc[indices_restantes_geral, col_iptu_projetado] > 0)
indices_novos_trib_geral = indices_restantes_geral[mask_novos_trib_geral]
df_geral.loc[indices_novos_trib_geral, 'Faixa de Variação'] = "Novos Tributados"
indices_restantes_geral = indices_restantes_geral.difference(indices_novos_trib_geral)
print(f"DEBUG GERAL - Passo 4 (Novos Tributados): {len(indices_novos_trib_geral)}")
# PASSO 5: Aumentos e Reduções
if len(indices_restantes_geral) > 0:
if modo_simples:
# Modo simples
variacao_restantes_geral = variacao_perc.loc[indices_restantes_geral]
mask_diminuiu_geral = variacao_restantes_geral <= 0
mask_aumentou_geral = variacao_restantes_geral > 0
indices_diminuiu_geral = indices_restantes_geral[mask_diminuiu_geral]
indices_aumentou_geral = indices_restantes_geral[mask_aumentou_geral]
df_geral.loc[indices_diminuiu_geral, 'Faixa de Variação'] = "Diminuiu/Manteve"
df_geral.loc[indices_aumentou_geral, 'Faixa de Variação'] = "Aumentou"
print(f"DEBUG GERAL - Passo 5a (Diminuiu/Manteve): {len(indices_diminuiu_geral)}")
print(f"DEBUG GERAL - Passo 5b (Aumentou): {len(indices_aumentou_geral)}")
else:
# Modo detalhado
variacao_restantes_geral = variacao_perc.loc[indices_restantes_geral]
limit_abs = abs(limit_perc)
for i, faixa_nome in enumerate(faixas_variacao):
if i == 0:
mask_faixa_geral = variacao_restantes_geral < -limit_abs
elif i == len(faixas_variacao) - 1:
mask_faixa_geral = variacao_restantes_geral >= limit_abs
else:
limite_inf = -limit_abs + (i - 1) * interval_perc
limite_sup = limite_inf + interval_perc
mask_faixa_geral = (variacao_restantes_geral >= limite_inf) & (variacao_restantes_geral < limite_sup)
indices_faixa_geral = indices_restantes_geral[mask_faixa_geral]
df_geral.loc[indices_faixa_geral, 'Faixa de Variação'] = faixa_nome
if len(indices_faixa_geral) > 0:
print(f"DEBUG GERAL - Passo 5 ({faixa_nome}): {len(indices_faixa_geral)}")
# Processar resultado geral
def processar_resultado(df_proc, nome_categoria):
if modo_simples:
# MODO SIMPLES: Agregar em apenas 2 categorias
df_proc['Faixa de Variação Simples'] = "Não Aplicável"
mask_diminuiu_geral = df_proc['Faixa de Variação'].isin([
"Não Lançado", "Permanece Zero", "Não Mais Tributados", "Diminuiu/Manteve"
])
df_proc.loc[mask_diminuiu_geral, 'Faixa de Variação Simples'] = "Diminuiu/Manteve"
mask_aumentou_geral = df_proc['Faixa de Variação'].isin([
"Novos Tributados", "Aumentou"
])
df_proc.loc[mask_aumentou_geral, 'Faixa de Variação Simples'] = "Aumentou"
diminuiu_count = mask_diminuiu_geral.sum()
aumentou_count = mask_aumentou_geral.sum()
print(f"DEBUG {nome_categoria} - SIMPLES: Diminuiu/Manteve: {diminuiu_count}, Aumentou: {aumentou_count}")
categorias_simples = ["Diminuiu/Manteve", "Aumentou"]
df_proc['Faixa de Variação Simples'] = pd.Categorical(df_proc['Faixa de Variação Simples'], categories=categorias_simples, ordered=True)
agg = df_proc.groupby('Faixa de Variação Simples', observed=False).agg(
N_Inscricoes=('Faixa de Variação Simples', 'count'),
Total_IPTU_Atual=(col_iptu_atual, 'sum'),
Total_IPTU_Projetado=(col_iptu_projetado, 'sum')
).reset_index()
agg = agg.rename(columns={'Faixa de Variação Simples': 'Faixa de Variação'})
else:
# MODO DETALHADO
df_proc['Faixa de Variação'] = pd.Categorical(df_proc['Faixa de Variação'], categories=faixas_ordenadas, ordered=True)
agg = df_proc.groupby('Faixa de Variação', observed=False).agg(
N_Inscricoes=('Faixa de Variação', 'count'),
Total_IPTU_Atual=(col_iptu_atual, 'sum'),
Total_IPTU_Projetado=(col_iptu_projetado, 'sum')
).reset_index()
total_inscricoes = agg['N_Inscricoes'].sum()
agg['%_do_Total_de_Inscrições'] = (agg['N_Inscricoes'] / total_inscricoes * 100) if total_inscricoes > 0 else 0
agg['Diferença_Absoluta'] = agg['Total_IPTU_Projetado'] - agg['Total_IPTU_Atual']
agg['Participação_na_Arrecadação_Atual_%'] = (agg['Total_IPTU_Atual'] / total_arrec_atual * 100) if total_arrec_atual > 0 else 0
agg['Participação_na_Arrecadação_Projetada_%'] = (agg['Total_IPTU_Projetado'] / total_arrec_projetado * 100) if total_arrec_projetado > 0 else 0
agg = agg.rename(columns={
'N_Inscricoes': 'Nº de Inscrições',
'%_do_Total_de_Inscrições': '% do Total de Inscrições',
'Total_IPTU_Atual': 'Total IPTU Atual (2025)',
'Total_IPTU_Projetado': f'Total IPTU Projetado ({selected_year})',
'Diferença_Absoluta': 'Diferença Absoluta (R$)',
'Participação_na_Arrecadação_Atual_%': 'Participação na Arrecadação Atual (%)',
'Participação_na_Arrecadação_Projetada_%': 'Participação na Arrecadação Projetada (%)',
})
final_cols = ['Faixa de Variação', 'Nº de Inscrições', '% do Total de Inscrições', 'Total IPTU Atual (2025)',
f'Total IPTU Projetado ({selected_year})', 'Diferença Absoluta (R$)',
'Participação na Arrecadação Atual (%)', 'Participação na Arrecadação Projetada (%)']
agg = agg[final_cols]
return agg
# Processar totais gerais
agg_geral = processar_resultado(df_geral, "TOTAIS GERAIS")
all_dfs_raw["Totais Gerais"] = agg_geral.copy()
# Formatação para exibição dos totais gerais
df_display_geral = agg_geral.copy()
currency_cols = ['Total IPTU Atual (2025)', f'Total IPTU Projetado ({selected_year})', 'Diferença Absoluta (R$)']
percent_cols = ['% do Total de Inscrições', 'Participação na Arrecadação Atual (%)', 'Participação na Arrecadação Projetada (%)']
for col in currency_cols:
df_display_geral[col] = df_display_geral[col].apply(lambda x: f"R$ {x:,.2f}")
for col in percent_cols:
df_display_geral[col] = df_display_geral[col].apply(lambda x: f"{x:.2f}%")
# Adicionar totais gerais como primeira aba
all_dfs_display.append(gr.update(value=df_display_geral, column_widths=calculate_column_widths(df_display_geral)))
# DEPOIS: PROCESSAR POR CATEGORIA (DEMAIS ABAS)
# Iterar por categoria
for nome_legivel in category_order:
if nome_legivel not in filters_by_name:
all_dfs_display.append(gr.update(value=pd.DataFrame()))
continue
filtro_rule = filters_by_name[nome_legivel]
if not filtro_rule.any():
all_dfs_display.append(gr.update(value=pd.DataFrame()))
continue
# Aplicar filtro da categoria
df_cat = df[filtro_rule[df.index]].copy()
if df_cat.empty:
all_dfs_display.append(gr.update(value=pd.DataFrame()))
continue
print(f"\nDEBUG {nome_legivel} - Total na categoria: {len(df_cat)}")
# LÓGICA PASSO A PASSO - SEQUENCIAL E EXCLUSIVA
df_cat['Faixa de Variação'] = "Não Aplicável"
indices_restantes = df_cat.index.copy()
# PASSO 1: Não Lançado (filtro de não tributados)
try:
nao_tributados_global = FILTERS_GLOBAL["nao_tributados"]
nao_tributados_nesta_cat = nao_tributados_global.loc[df_cat.index]
indices_nao_tributados = df_cat.index[nao_tributados_nesta_cat]
df_cat.loc[indices_nao_tributados, 'Faixa de Variação'] = "Não Lançado"
indices_restantes = indices_restantes.difference(indices_nao_tributados)
print(f"DEBUG {nome_legivel} - Passo 1 (Não Lançado): {len(indices_nao_tributados)}")
except Exception as e:
print(f"DEBUG {nome_legivel} - Erro no Passo 1: {e}")
indices_nao_tributados = pd.Index([])
# PASSO 2: Permanece Zero (eram zero, continuam zero, MAS não são não tributados)
if len(indices_restantes) > 0:
mask_permanece_zero = (df_cat.loc[indices_restantes, col_iptu_atual] == 0) & (df_cat.loc[indices_restantes, col_iptu_projetado] == 0)
indices_permanece_zero = indices_restantes[mask_permanece_zero]
df_cat.loc[indices_permanece_zero, 'Faixa de Variação'] = "Permanece Zero"
indices_restantes = indices_restantes.difference(indices_permanece_zero)
print(f"DEBUG {nome_legivel} - Passo 2 (Permanece Zero): {len(indices_permanece_zero)}")
# PASSO 3: Não Mais Tributados (eram positivos, agora zero, não são não tributados)
if len(indices_restantes) > 0:
mask_nao_mais_trib = (df_cat.loc[indices_restantes, col_iptu_atual] > 0) & (df_cat.loc[indices_restantes, col_iptu_projetado] == 0)
indices_nao_mais_trib = indices_restantes[mask_nao_mais_trib]
df_cat.loc[indices_nao_mais_trib, 'Faixa de Variação'] = "Não Mais Tributados"
indices_restantes = indices_restantes.difference(indices_nao_mais_trib)
print(f"DEBUG {nome_legivel} - Passo 3 (Não Mais Tributados): {len(indices_nao_mais_trib)}")
# PASSO 4: Novos Tributados (eram zero, agora positivos, não são não tributados)
if len(indices_restantes) > 0:
mask_novos_trib = (df_cat.loc[indices_restantes, col_iptu_atual] == 0) & (df_cat.loc[indices_restantes, col_iptu_projetado] > 0)
indices_novos_trib = indices_restantes[mask_novos_trib]
df_cat.loc[indices_novos_trib, 'Faixa de Variação'] = "Novos Tributados"
indices_restantes = indices_restantes.difference(indices_novos_trib)
print(f"DEBUG {nome_legivel} - Passo 4 (Novos Tributados): {len(indices_novos_trib)}")
# PASSO 5: Aumentos e Reduções (eram positivos, continuam positivos)
if len(indices_restantes) > 0:
if modo_simples:
# Modo simples: apenas aumentou vs diminuiu/manteve
variacao_restantes = variacao_perc.loc[indices_restantes]
mask_diminuiu = variacao_restantes <= 0
mask_aumentou = variacao_restantes > 0
indices_diminuiu = indices_restantes[mask_diminuiu]
indices_aumentou = indices_restantes[mask_aumentou]
df_cat.loc[indices_diminuiu, 'Faixa de Variação'] = "Diminuiu/Manteve"
df_cat.loc[indices_aumentou, 'Faixa de Variação'] = "Aumentou"
print(f"DEBUG {nome_legivel} - Passo 5a (Diminuiu/Manteve): {len(indices_diminuiu)}")
print(f"DEBUG {nome_legivel} - Passo 5b (Aumentou): {len(indices_aumentou)}")
else:
# Modo detalhado: faixas de variação
variacao_restantes = variacao_perc.loc[indices_restantes]
limit_abs = abs(limit_perc)
# Aplicar faixas
for i, faixa_nome in enumerate(faixas_variacao):
if i == 0: # Primeira faixa: abaixo do limite negativo
mask_faixa = variacao_restantes < -limit_abs
elif i == len(faixas_variacao) - 1: # Última faixa: acima do limite positivo
mask_faixa = variacao_restantes >= limit_abs
else: # Faixas intermediárias
limite_inf = -limit_abs + (i - 1) * interval_perc
limite_sup = limite_inf + interval_perc
mask_faixa = (variacao_restantes >= limite_inf) & (variacao_restantes < limite_sup)
indices_faixa = indices_restantes[mask_faixa]
df_cat.loc[indices_faixa, 'Faixa de Variação'] = faixa_nome
if len(indices_faixa) > 0:
print(f"DEBUG {nome_legivel} - Passo 5 ({faixa_nome}): {len(indices_faixa)}")
# Processar resultado da categoria usando a função auxiliar
agg = processar_resultado(df_cat, nome_legivel)
all_dfs_raw[nome_legivel] = agg.copy()
# Formatação para exibição
df_display = agg.copy()
currency_cols = ['Total IPTU Atual (2025)', f'Total IPTU Projetado ({selected_year})', 'Diferença Absoluta (R$)']
percent_cols = ['% do Total de Inscrições', 'Participação na Arrecadação Atual (%)', 'Participação na Arrecadação Projetada (%)']
for col in currency_cols:
df_display[col] = df_display[col].apply(lambda x: f"R$ {x:,.2f}")
for col in percent_cols:
df_display[col] = df_display[col].apply(lambda x: f"{x:.2f}%")
all_dfs_display.append(gr.update(value=df_display, column_widths=calculate_column_widths(df_display)))
# Garantir 14 resultados (1 Totais Gerais + 13 categorias)
num_total_tabs = 14 # 1 totais gerais + 13 categorias = 14 abas
while len(all_dfs_display) < num_total_tabs:
all_dfs_display.append(gr.update(value=pd.DataFrame()))
return [gr.update(value="", visible=False)] + all_dfs_display[:num_total_tabs] + [all_dfs_raw]
# --- Gradio Interface (MODIFICADA) ---
with gr.Blocks(theme=gr.themes.Default()) as demo:
# --- CONFIGURAÇÃO DE ESTADO INICIAL ---
INITIAL_DIFFERENTIATE_BUILDINGS = True
INITIAL_DIFFERENTIATE_PARKING = True
# --- FIM DA CONFIGURAÇÃO ---
gr.Markdown("# Simulador de Cálculo de IPTU")
gr.Markdown(f"Ajuste os parâmetros abaixo.")
default_categories = [
"Predial Geral", "Predial Residencial", "Predial Não Residencial",
"Estacionamentos Geral", "Estacionamentos Residenciais", "Estacionamentos Não Residenciais",
"Terrenos 1ª DF", "Terrenos 2ª DF", "Terrenos 3ª DF",
"Terrenos Alíquota Especial", "Terrenos Alíquota Fixa",
]
default_rules_global = [{"year": 2026, "factor": 0.3}, {"year": 2027, "factor": 0.6}, {"year": 2028, "factor": 0.8}, {"year": 2029, "factor": 1.0}]
# --- STATES ---
universal_transition_rules_state = gr.State(default_rules_global)
initial_rules_by_category = {cat: [{"year": 2026, "factor": 1.0}] for cat in default_categories}
transition_rules_by_category_state = gr.State(initial_rules_by_category)
all_yearly_aggregates_state = gr.State({})
df_resumo_final_state = gr.State()
the_merge_state = gr.State()
iptu_rules_state = gr.State()
anos_projecao_state = gr.State()
detailed_by_value_ranges_raw_state = gr.State({})
variation_analysis_raw_state = gr.State({}) # State para a nova view
with gr.Row():
with gr.Column(scale=1):
with gr.Accordion("📁 Carregar Cenário de Arquivo Excel", open=False):
gr.Markdown("Faça o upload de um arquivo `resultados_iptu_...xlsx` para carregar os parâmetros usados.")
file_upload_input = gr.File(label="Carregar arquivo .xlsx", file_types=[".xlsx"])
upload_status_output = gr.Textbox(label="Status do Carregamento", interactive=False)
gr.Markdown("### Parâmetros Gerais")
ufm_input = gr.Number(value=5.771, label="Valor da UFM (2025)")
diferencia_estacionamentos_check = gr.Checkbox(value=INITIAL_DIFFERENTIATE_PARKING, label="Diferenciar Estacionamentos por Uso?")
diferencia_predios_check = gr.Checkbox(value=INITIAL_DIFFERENTIATE_BUILDINGS, label="Diferenciar Prédios por Uso?")
segregar_piso_check = gr.Checkbox(value=False, label="Segregar Piso por Categoria?", visible=False)
segregar_trava_check = gr.Checkbox(value=False, label="Segregar Regras de Transição por Categoria?")
with gr.Accordion("Parâmetros da Trava de Aumento", open=True):
trava_aumento_perc_input = gr.Number(value=0.0, label="Piso Aumento Relativo para Ativar Trava (%)")
with gr.Accordion("Parâmetros do Desconto nas Alíquotas da Mancha", open=True):
gr.Markdown("Defina o percentual de desconto a ser aplicado sobre cada uma das faixas de alíquota dos imóveis prediais e estacionamentos, para os imóveis constantes da mancha da enchente.")
desconto_2026_input = gr.Number(value=5.0, label="Desconto em 2026 (%)", minimum=0, maximum=100)
desconto_2027_input = gr.Number(value=0.0, label="Desconto em 2027 (%)", minimum=0, maximum=100)
desconto_2028_input = gr.Number(value=0.0, label="Desconto em 2028 (%)", minimum=0, maximum=100)
desconto_2029_input = gr.Number(value=0.0, label="Desconto em 2029 (%)", minimum=0, maximum=100)
with gr.Accordion("Regras de Transição Universais (Fator da Trava)", open=True, visible=True) as universal_rules_accordion:
universal_ui_outputs = []
for i in range(MAX_YEARS):
is_visible = i < len(default_rules_global)
year_val, factor_val = (default_rules_global[i]["year"], default_rules_global[i]["factor"] * 100) if is_visible else (2026 + i, 100)
with gr.Row(visible=is_visible) as row:
year_input = gr.Number(value=year_val, label="Ano", interactive=False)
factor_input = gr.Number(value=factor_val, label="Fator do Aumento (%)", interactive=True)
universal_ui_outputs.extend([row, year_input, factor_input])
with gr.Row():
universal_add_year_btn = gr.Button("➕ Adicionar Ano", scale=1)
universal_remove_year_btn = gr.Button("➖ Remover Último Ano", scale=1)
with gr.Accordion("Regras de Transição por Categoria (Fator da Trava)", open=False, visible=False) as category_rules_accordion:
category_selector = gr.Dropdown(choices=default_categories, value="Predial Geral", label="Selecione a Categoria para Configurar")
category_ui_outputs = []
for i in range(MAX_YEARS):
is_visible = i < 1
year_val, factor_val = (2026, 100)
with gr.Row(visible=is_visible) as row:
year_input = gr.Number(value=year_val, label="Ano", interactive=False)
factor_input = gr.Number(value=factor_val, label="Fator do Aumento (%)", interactive=True)
category_ui_outputs.extend([row, year_input, factor_input])
with gr.Row():
category_add_year_btn = gr.Button("➕ Adicionar Ano à Categoria", scale=1)
category_remove_year_btn = gr.Button("➖ Remover Último Ano da Categoria", scale=1)
with gr.Row(variant="panel"):
copy_source = gr.Dropdown(choices=default_categories, label="Copiar de:", scale=1)
copy_target = gr.Dropdown(choices=default_categories, label="Para:", scale=1)
copy_btn = gr.Button("Copiar Regras", scale=1)
gr.Markdown("### Faixas e Alíquotas Configuráveis")
all_rate_inputs = []
pg_defaults_lim = [17330, 24954, 74859, 124589, 190609, 259921, 762433]
pg_defaults_aliq = [0.0, 0.004, 0.0047, 0.0055, 0.0062, 0.007, 0.0077, 0.0080]
pr_defaults_lim = [17330, 24954, 74859, 124589, 190609, 259921, 762433]
pr_defaults_aliq = [0.0, 0.004, 0.0047, 0.0055, 0.0062, 0.007, 0.0077, 0.0085]
pnr_defaults_lim = [17330, 43320]
pnr_defaults_aliq = [0.0, 0.006, 0.008]
eg_defaults_lim = [2600, 24954, 74859, 124589, 190609, 260021, 762433]
eg_defaults_aliq = [0.0, 0.004, 0.0047, 0.0055, 0.0062, 0.007, 0.0077, 0.0080]
er_defaults_lim = [2600, 24954, 74859, 124589, 190609, 259921, 762433]
er_defaults_aliq = [0.0, 0.004, 0.0047, 0.0055, 0.0062, 0.007, 0.0077, 0.0085]
enr_defaults_lim = [2600, 7798]
enr_defaults_aliq = [0.0, 0.006, 0.008]
t_df_defaults_lim = [17330] + [0] * (MAX_BANDS - 2)
with gr.Accordion("Predial Geral (PG)", open=False, visible=not INITIAL_DIFFERENTIATE_BUILDINGS) as pg_accordion:
pg_inputs = create_configurable_band_ui("PG", 8, pg_defaults_lim, pg_defaults_aliq)
all_rate_inputs.extend(pg_inputs)
with gr.Accordion("Predial Residencial (PR)", open=False, visible=INITIAL_DIFFERENTIATE_BUILDINGS) as pr_accordion:
pr_inputs = create_configurable_band_ui("PR", 8, pr_defaults_lim, pr_defaults_aliq)
all_rate_inputs.extend(pr_inputs)
with gr.Accordion("Predial Não Residencial (PNR)", open=False, visible=INITIAL_DIFFERENTIATE_BUILDINGS) as pnr_accordion:
pnr_inputs = create_configurable_band_ui("PNR", 3, pnr_defaults_lim, pnr_defaults_aliq)
all_rate_inputs.extend(pnr_inputs)
with gr.Accordion("Estacionamentos Geral (EG)", open=False, visible=not INITIAL_DIFFERENTIATE_PARKING) as eg_accordion:
eg_inputs = create_configurable_band_ui("EG", 8, eg_defaults_lim, eg_defaults_aliq)
all_rate_inputs.extend(eg_inputs)
with gr.Accordion("Estacionamentos Residenciais (ER)", open=False, visible=INITIAL_DIFFERENTIATE_PARKING) as er_accordion:
er_inputs = create_configurable_band_ui("ER", 8, er_defaults_lim, er_defaults_aliq)
all_rate_inputs.extend(er_inputs)
with gr.Accordion("Estacionamentos Não Residenciais (ENR)", open=False, visible=INITIAL_DIFFERENTIATE_PARKING) as enr_accordion:
enr_inputs = create_configurable_band_ui("ENR", 3, enr_defaults_lim, enr_defaults_aliq)
all_rate_inputs.extend(enr_inputs)
with gr.Accordion("Terrenos 1a DF (T1DF)", open=False) as t1df_accordion:
t1df_inputs = create_configurable_band_ui("T1DF", 2, t_df_defaults_lim, [0.0, 0.03] + [0] * (MAX_BANDS - 2))
all_rate_inputs.extend(t1df_inputs)
with gr.Accordion("Terrenos 2a DF (T2DF)", open=False) as t2df_accordion:
t2df_inputs = create_configurable_band_ui("T2DF", 2, t_df_defaults_lim, [0.0, 0.02] + [0] * (MAX_BANDS - 2))
all_rate_inputs.extend(t2df_inputs)
with gr.Accordion("Terrenos 3a DF (T3DF)", open=False) as t3df_accordion:
t3df_inputs = create_configurable_band_ui("T3DF", 2, t_df_defaults_lim, [0.0, 0.01] + [0] * (MAX_BANDS - 2))
all_rate_inputs.extend(t3df_inputs)
with gr.Accordion("Terrenos Alíquota Especial (TAE)", open=False) as tae_accordion:
tae_alq1_input = gr.Number(label="TAE Alíquota 1 (ex: 0.009)", value=0.009)
all_rate_inputs.append(tae_alq1_input)
with gr.Accordion("Terrenos Alíquota Fixa (TAF)", open=False) as taf_accordion:
taf_alq1_input = gr.Number(label="TAF Alíquota 1 (ex: 0.0095)", value=0.0095)
all_rate_inputs.append(taf_alq1_input)
with gr.Accordion("Visões a Processar/Exibir", open=True):
show_projection_check = gr.Checkbox(label="Projeção de Arrecadação", value=True)
show_aggregate_check = gr.Checkbox(label="Resumo Agregado por Categoria", value=True)
show_detailed_bands_check = gr.Checkbox(label="Resumo Detalhado por Faixa", value=True)
show_detailed_vv_check = gr.Checkbox(label="Resumo Detalhado por Valor Venal", value=True)
show_totals_check = gr.Checkbox(label="Totais Gerais Calculados", value=True)
show_flood_analysis_check = gr.Checkbox(label="Análise da Enchente (Tabela)", value=True)
show_flood_graphs_check = gr.Checkbox(label="Análise da Enchente (Gráficos)", value=True)
show_excel_check = gr.Checkbox(label="Download Resultados (Excel)", value=True)
validation_error_box = gr.Textbox(label="⚠️ Erros de Validação", visible=False, interactive=False, lines=2)
process_button = gr.Button("Processar Dados", variant="primary", size="lg")
with gr.Column(scale=2):
status_message = gr.Textbox(label="Status", interactive=False)
with gr.Group(visible=True) as projection_box:
gr.Markdown("### 📈 Projeção de Arrecadação com Trava de Aumento")
output_df_projection = gr.DataFrame(label="Projeção Anual")
with gr.Group(visible=True) as aggregate_summary_box:
gr.Markdown("### 📊 Resumo Agregado por Categoria e Ano")
with gr.Row():
output_agg_year_selector = gr.Dropdown(label="Selecione o Ano para Visualizar o Resumo", scale=1)
output_agg_df = gr.DataFrame(label="Resumo Agregado Anual", scale=2)
with gr.Group(visible=True) as detailed_bands_box:
gr.Markdown("### 📋 Resumo Detalhado por Faixa e Categoria")
selected_detailed_tab_id = "pr_faixa" if INITIAL_DIFFERENTIATE_BUILDINGS else "pg_faixa"
with gr.Tabs(elem_id="detailed_tabs_container", selected=selected_detailed_tab_id) as detailed_tabs:
with gr.TabItem("Predial Geral", visible=not INITIAL_DIFFERENTIATE_BUILDINGS, id="pg_faixa") as tab_predial_geral:
output_predial_geral = gr.DataFrame(label="Predial Sem Diferenciação")
with gr.TabItem("Predial Residencial", visible=INITIAL_DIFFERENTIATE_BUILDINGS, id="pr_faixa") as tab_predial_residencial:
output_predial_residencial = gr.DataFrame(label="Predial Residencial")
with gr.TabItem("Predial Não Residencial", visible=INITIAL_DIFFERENTIATE_BUILDINGS, id="pnr_faixa") as tab_predial_nao_residencial:
output_predial_nao_residencial = gr.DataFrame(label="Predial Não Residencial")
with gr.TabItem("Estacionamentos Geral", visible=not INITIAL_DIFFERENTIATE_PARKING, id="eg_faixa") as tab_estacionamentos_geral:
output_estacionamentos_geral = gr.DataFrame(label="Estacionamentos Sem Diferenciação")
with gr.TabItem("Estacionamentos Residenciais", visible=INITIAL_DIFFERENTIATE_PARKING, id="er_faixa") as tab_estacionamentos_residenciais:
output_estacionamentos_residenciais = gr.DataFrame(label="Estacionamentos Residenciais")
with gr.TabItem("Estacionamentos Não Residenciais", visible=INITIAL_DIFFERENTIATE_PARKING, id="enr_faixa") as tab_estacionamentos_nao_residenciais:
output_estacionamentos_nao_residenciais = gr.DataFrame(label="Estacionamentos Não Residenciais")
with gr.TabItem("Terrenos 1ª DF", visible=True, id="t1df_faixa") as tab_terrenos_1df:
output_terrenos_1df = gr.DataFrame(label="Terrenos 1ª Divisão Fiscal")
with gr.TabItem("Terrenos 2ª DF", visible=True, id="t2df_faixa") as tab_terrenos_2df:
output_terrenos_2df = gr.DataFrame(label="Terrenos 2ª Divisão Fiscal")
with gr.TabItem("Terrenos 3ª DF", visible=True, id="t3df_faixa") as tab_terrenos_3df:
output_terrenos_3df = gr.DataFrame(label="Terrenos 3ª Divisão Fiscal")
with gr.TabItem("Terrenos Aliq. Especial", visible=True, id="tae_faixa") as tab_terrenos_esp:
output_terrenos_esp = gr.DataFrame(label="Terrenos Alíquota Especial")
with gr.TabItem("Terrenos Aliq. Fixa", visible=True, id="taf_faixa") as tab_terrenos_fix:
output_terrenos_fix = gr.DataFrame(label="Terrenos Alíquota Fixa")
with gr.TabItem("Aposentados/Deficientes", visible=True, id="apo_faixa") as tab_aposentados:
output_aposentados = gr.DataFrame(label="Predial - Aposentados/Deficientes")
with gr.TabItem("Habitações Populares", visible=True, id="hab_faixa") as tab_habitacoes:
output_habitacoes = gr.DataFrame(label="Predial - Habitações Populares")
with gr.Group(visible=True) as detailed_vv_box:
gr.Markdown("### 💲 Resumo Detalhado por Valor Venal")
with gr.Row(variant="panel"):
value_range_interval_input = gr.Number(label="Intervalo da Faixa de Valor Venal (R$)", value=300000, scale=2)
aggregate_from_value_input = gr.Number(label="Agregar a Partir de Valor (R$)", value=5000000, scale=2)
update_vv_button = gr.Button("Atualizar Visão", scale=1, min_width=100)
selected_vv_tab_id = "pr_vv" if INITIAL_DIFFERENTIATE_BUILDINGS else "pg_vv"
with gr.Tabs(elem_id="vv_tabs_container", selected=selected_vv_tab_id) as value_range_tabs:
with gr.TabItem("Predial Geral VV", visible=not INITIAL_DIFFERENTIATE_BUILDINGS, id="pg_vv") as tab_predial_geral_vv:
output_predial_geral_vv = gr.DataFrame(label="Predial Sem Diferenciação - Por Valor")
with gr.TabItem("Predial Residencial VV", visible=INITIAL_DIFFERENTIATE_BUILDINGS, id="pr_vv") as tab_predial_residencial_vv:
output_predial_residencial_vv = gr.DataFrame(label="Predial Residencial - Por Valor")
with gr.TabItem("Predial Não Residencial VV", visible=INITIAL_DIFFERENTIATE_BUILDINGS, id="pnr_vv") as tab_predial_nao_residencial_vv:
output_predial_nao_residencial_vv = gr.DataFrame(label="Predial Não Residencial - Por Valor")
with gr.TabItem("Estacionamentos Geral VV", visible=not INITIAL_DIFFERENTIATE_PARKING, id="eg_vv") as tab_estacionamentos_geral_vv:
output_estacionamentos_geral_vv = gr.DataFrame(label="Estacionamentos Sem Diferenciação - Por Valor")
with gr.TabItem("Estacionamentos Residenciais VV", visible=INITIAL_DIFFERENTIATE_PARKING, id="er_vv") as tab_estacionamentos_residenciais_vv:
output_estacionamentos_residenciais_vv = gr.DataFrame(label="Estacionamentos Residenciais - Por Valor")
with gr.TabItem("Estacionamentos Não Residenciais VV", visible=INITIAL_DIFFERENTIATE_PARKING, id="enr_vv") as tab_estacionamentos_nao_residenciais_vv:
output_estacionamentos_nao_residenciais_vv = gr.DataFrame(label="Estacionamentos Não Residenciais - Por Valor")
with gr.TabItem("Terrenos 1ª DF VV", visible=True, id="t1df_vv") as tab_terrenos_1df_vv:
output_terrenos_1df_vv = gr.DataFrame(label="Terrenos 1ª DF - Por Valor")
with gr.TabItem("Terrenos 2ª DF VV", visible=True, id="t2df_vv") as tab_terrenos_2df_vv:
output_terrenos_2df_vv = gr.DataFrame(label="Terrenos 2ª DF - Por Valor")
with gr.TabItem("Terrenos 3ª DF VV", visible=True, id="t3df_vv") as tab_terrenos_3df_vv:
output_terrenos_3df_vv = gr.DataFrame(label="Terrenos 3ª DF - Por Valor")
with gr.TabItem("Terrenos Aliq. Especial VV", visible=True, id="tae_vv") as tab_terrenos_esp_vv:
output_terrenos_esp_vv = gr.DataFrame(label="Terrenos Alíquota Especial - Por Valor")
with gr.TabItem("Terrenos Aliq. Fixa VV", visible=True, id="taf_vv") as tab_terrenos_fix_vv:
output_terrenos_fix_vv = gr.DataFrame(label="Terrenos Alíquota Fixa - Por Valor")
with gr.TabItem("Aposentados/Deficientes VV", visible=True, id="apo_vv") as tab_aposentados_vv:
output_aposentados_vv = gr.DataFrame(label="Predial - Aposentados/Deficientes - Por Valor")
with gr.TabItem("Habitações Populares VV", visible=True, id="hab_vv") as tab_habitacoes_vv:
output_habitacoes_vv = gr.DataFrame(label="Predial - Habitações Populares - Por Valor")
# --- NOVA VIEW: ANÁLISE DE VARIAÇÃO ---
with gr.Group(visible=True) as variation_analysis_box:
gr.Markdown("### ⚖️ Análise de Variação do IPTU")
gr.Markdown("**💡 Dica:** Para ver apenas quantos % subiram vs. desceram, deixe ambos os campos 'Limite' e 'Intervalo' em zero.")
with gr.Row(variant="panel"):
variation_limit_input = gr.Number(label="Limite Percentual (+/-)", value=50, scale=1)
variation_interval_input = gr.Number(label="Intervalo de Agrupamento (%)", value=25, scale=1)
variation_abrangencia_selector = gr.Dropdown(
label="Abrangência",
choices=["Todos os imóveis", "Apenas imóveis da mancha"],
value="Todos os imóveis",
scale=1
)
variation_year_selector = gr.Dropdown(label="Analisar Ano/Projeção", scale=2)
variation_generate_btn = gr.Button("Gerar Análise de Variação", scale=1)
variation_validation_error = gr.Textbox(label="⚠️ Erro", visible=False, interactive=False)
selected_var_tab_id = "totais_var"
with gr.Tabs(elem_id="var_tabs_container", selected=selected_var_tab_id) as variation_tabs:
with gr.TabItem("Totais Gerais", visible=True, id="totais_var") as tab_totais_gerais_var:
output_totais_gerais_var = gr.DataFrame(label="Variação - Totais Gerais")
with gr.TabItem("Predial Geral", visible=not INITIAL_DIFFERENTIATE_BUILDINGS, id="pg_var") as tab_predial_geral_var:
output_predial_geral_var = gr.DataFrame(label="Variação - Predial Sem Diferenciação")
with gr.TabItem("Predial Residencial", visible=INITIAL_DIFFERENTIATE_BUILDINGS, id="pr_var") as tab_predial_residencial_var:
output_predial_residencial_var = gr.DataFrame(label="Variação - Predial Residencial")
with gr.TabItem("Predial Não Residencial", visible=INITIAL_DIFFERENTIATE_BUILDINGS, id="pnr_var") as tab_predial_nao_residencial_var:
output_predial_nao_residencial_var = gr.DataFrame(label="Variação - Predial Não Residencial")
with gr.TabItem("Estacionamentos Geral", visible=not INITIAL_DIFFERENTIATE_PARKING, id="eg_var") as tab_estacionamentos_geral_var:
output_estacionamentos_geral_var = gr.DataFrame(label="Variação - Estacionamentos Sem Diferenciação")
with gr.TabItem("Estacionamentos Residenciais", visible=INITIAL_DIFFERENTIATE_PARKING, id="er_var") as tab_estacionamentos_residenciais_var:
output_estacionamentos_residenciais_var = gr.DataFrame(label="Variação - Estacionamentos Residenciais")
with gr.TabItem("Estacionamentos Não Residenciais", visible=INITIAL_DIFFERENTIATE_PARKING, id="enr_var") as tab_estacionamentos_nao_residenciais_var:
output_estacionamentos_nao_residenciais_var = gr.DataFrame(label="Variação - Estacionamentos Não Residenciais")
with gr.TabItem("Terrenos 1ª DF", visible=True, id="t1df_var") as tab_terrenos_1df_var:
output_terrenos_1df_var = gr.DataFrame(label="Variação - Terrenos 1ª Divisão Fiscal")
with gr.TabItem("Terrenos 2ª DF", visible=True, id="t2df_var") as tab_terrenos_2df_var:
output_terrenos_2df_var = gr.DataFrame(label="Variação - Terrenos 2ª Divisão Fiscal")
with gr.TabItem("Terrenos 3ª DF", visible=True, id="t3df_var") as tab_terrenos_3df_var:
output_terrenos_3df_var = gr.DataFrame(label="Variação - Terrenos 3ª Divisão Fiscal")
with gr.TabItem("Terrenos Aliq. Especial", visible=True, id="tae_var") as tab_terrenos_esp_var:
output_terrenos_esp_var = gr.DataFrame(label="Variação - Terrenos Alíquota Especial")
with gr.TabItem("Terrenos Aliq. Fixa", visible=True, id="taf_var") as tab_terrenos_fix_var:
output_terrenos_fix_var = gr.DataFrame(label="Variação - Terrenos Alíquota Fixa")
with gr.TabItem("Aposentados/Deficientes", visible=True, id="apo_var") as tab_aposentados_var:
output_aposentados_var = gr.DataFrame(label="Variação - Predial - Aposentados/Deficientes")
with gr.TabItem("Habitações Populares", visible=True, id="hab_var") as tab_habitacoes_var:
output_habitacoes_var = gr.DataFrame(label="Variação - Predial - Habitações Populares")
with gr.Group(visible=True) as totals_box:
gr.Markdown("### 🔢 Totais Gerais Calculados")
total_novo_text, total_atual_trib_text, total_atual_calc_text = gr.Textbox(interactive=False), gr.Textbox(interactive=False), gr.Textbox(interactive=False)
with gr.Group(visible=True) as flood_analysis_box:
gr.Markdown("### 🌊 Análise dos Imóveis Atingidos pela Enchente")
with gr.Tabs(elem_id="flood_analysis_tabs_container") as flood_analysis_tabs:
with gr.TabItem("Visão Agregada", visible=True, id="flood_general") as tab_flood_general:
output_enchente_analysis = gr.DataFrame(label="Análise Comparativa Total para Imóveis Atingidos")
with gr.TabItem("Predial Geral", visible=not INITIAL_DIFFERENTIATE_BUILDINGS, id="flood_pred_geral") as tab_flood_pred_geral:
output_enchente_analysis_pred_geral = gr.DataFrame(label="Análise Comparativa para Imóveis Prediais (Geral) Atingidos")
with gr.TabItem("Predial Residencial", visible=INITIAL_DIFFERENTIATE_BUILDINGS, id="flood_pred_res") as tab_flood_pred_res:
output_enchente_analysis_res = gr.DataFrame(label="Análise Comparativa para Imóveis Prediais Residenciais Atingidos")
with gr.TabItem("Predial Não Residencial", visible=INITIAL_DIFFERENTIATE_BUILDINGS, id="flood_pred_non_res") as tab_flood_pred_non_res:
output_enchente_analysis_non_res = gr.DataFrame(label="Análise Comparativa para Imóveis Prediais Não Residenciais Atingidos")
with gr.TabItem("Estacionamentos Geral", visible=not INITIAL_DIFFERENTIATE_PARKING, id="flood_est_geral") as tab_flood_est_geral:
output_enchente_analysis_est_geral = gr.DataFrame(label="Análise Comparativa para Estacionamentos (Geral) Atingidos")
with gr.TabItem("Estacionamento Residencial", visible=INITIAL_DIFFERENTIATE_PARKING, id="flood_est_res") as tab_flood_est_res:
output_enchente_analysis_est_res = gr.DataFrame(label="Análise Comparativa para Estacionamentos Residenciais Atingidos")
with gr.TabItem("Estacionamento Não Residencial", visible=INITIAL_DIFFERENTIATE_PARKING, id="flood_est_non_res") as tab_flood_est_non_res:
output_enchente_analysis_est_non_res = gr.DataFrame(label="Análise Comparativa para Estacionamentos Não Residenciais Atingidos")
MAX_PLOTS = 30
with gr.Group(visible=True) as excel_download_box:
gr.Markdown("### 📥 Download Resultados")
filename_display, download_btn = gr.Textbox(label="Arquivo gerado", interactive=False, visible=False), gr.DownloadButton(label="Download", visible=False)
with gr.Group(visible=True) as flood_graphs_box:
gr.Markdown("### 📈🌊 Análise Gráfica - Imóveis atingidos pela enchente")
flood_plots_outputs = [gr.Plot(visible=False) for _ in range(MAX_PLOTS)]
# --- Lógica de Eventos da UI ---
segregar_trava_check.change(fn=lambda is_checked: [gr.update(visible=not is_checked), gr.update(visible=is_checked)], inputs=[segregar_trava_check], outputs=[universal_rules_accordion, category_rules_accordion])
for i in range(MAX_YEARS):
factor_input = universal_ui_outputs[i * 3 + 2]
factor_input.change(fn=update_rule_factor, inputs=[gr.Number(value=i, visible=False), factor_input, universal_transition_rules_state], outputs=[universal_transition_rules_state], queue=False)
universal_add_year_btn.click(fn=add_year_rule, inputs=[universal_transition_rules_state], outputs=[universal_transition_rules_state]).then(fn=update_transition_ui, inputs=[universal_transition_rules_state], outputs=universal_ui_outputs)
universal_remove_year_btn.click(fn=remove_year_rule, inputs=[universal_transition_rules_state], outputs=[universal_transition_rules_state]).then(fn=update_transition_ui, inputs=[universal_transition_rules_state], outputs=universal_ui_outputs)
category_selector.change(fn=update_category_rules_ui, inputs=[category_selector, transition_rules_by_category_state], outputs=category_ui_outputs)
for i in range(MAX_YEARS):
factor_input = category_ui_outputs[i * 3 + 2]
factor_input.change(fn=update_category_rule_factor, inputs=[gr.Number(value=i, visible=False), factor_input, category_selector, transition_rules_by_category_state], outputs=[transition_rules_by_category_state], queue=False)
category_add_year_btn.click(fn=add_year_to_category, inputs=[category_selector, transition_rules_by_category_state], outputs=[transition_rules_by_category_state]).then(fn=lambda cat, state: update_transition_ui(state.get(cat, [])), inputs=[category_selector, transition_rules_by_category_state], outputs=category_ui_outputs)
category_remove_year_btn.click(fn=remove_year_from_category, inputs=[category_selector, transition_rules_by_category_state], outputs=[transition_rules_by_category_state]).then(fn=lambda cat, state: update_transition_ui(state.get(cat, [])), inputs=[category_selector, transition_rules_by_category_state], outputs=category_ui_outputs)
copy_btn.click(fn=copy_rules_to_category, inputs=[copy_source, copy_target, transition_rules_by_category_state], outputs=[transition_rules_by_category_state]).then(fn=lambda cat, state: update_transition_ui(state.get(cat, [])), inputs=[copy_target, transition_rules_by_category_state], outputs=category_ui_outputs)
# Lógica de visibilidade dos Acordions e Abas
diferencia_estacionamentos_check.change(fn=update_parking_visibility, inputs=diferencia_estacionamentos_check, outputs=[eg_accordion, er_accordion, enr_accordion])
diferencia_predios_check.change(fn=update_building_visibility, inputs=diferencia_predios_check, outputs=[pg_accordion, pr_accordion, pnr_accordion])
detailed_tab_refs = [tab_predial_geral, tab_predial_residencial, tab_predial_nao_residencial, tab_estacionamentos_geral, tab_estacionamentos_residenciais, tab_estacionamentos_nao_residenciais, tab_terrenos_1df, tab_terrenos_2df, tab_terrenos_3df, tab_terrenos_esp, tab_terrenos_fix, tab_aposentados, tab_habitacoes]
diferencia_predios_check.change(fn=update_detailed_tabs_visibility, inputs=[diferencia_predios_check, diferencia_estacionamentos_check], outputs=detailed_tab_refs)
diferencia_estacionamentos_check.change(fn=update_detailed_tabs_visibility, inputs=[diferencia_predios_check, diferencia_estacionamentos_check], outputs=detailed_tab_refs)
value_range_tab_refs = [tab_predial_geral_vv, tab_predial_residencial_vv, tab_predial_nao_residencial_vv, tab_estacionamentos_geral_vv, tab_estacionamentos_residenciais_vv, tab_estacionamentos_nao_residenciais_vv, tab_terrenos_1df_vv, tab_terrenos_2df_vv, tab_terrenos_3df_vv, tab_terrenos_esp_vv, tab_terrenos_fix_vv, tab_aposentados_vv, tab_habitacoes_vv]
diferencia_predios_check.change(fn=update_detailed_vv_tabs_visibility, inputs=[diferencia_predios_check, diferencia_estacionamentos_check], outputs=value_range_tab_refs)
diferencia_estacionamentos_check.change(fn=update_detailed_vv_tabs_visibility, inputs=[diferencia_predios_check, diferencia_estacionamentos_check], outputs=value_range_tab_refs)
# Nova Lógica de Visibilidade para Análise de Variação
variation_tab_refs = [tab_predial_geral_var, tab_predial_residencial_var, tab_predial_nao_residencial_var, tab_estacionamentos_geral_var, tab_estacionamentos_residenciais_var, tab_estacionamentos_nao_residenciais_var, tab_terrenos_1df_var, tab_terrenos_2df_var, tab_terrenos_3df_var, tab_terrenos_esp_var, tab_terrenos_fix_var, tab_aposentados_var, tab_habitacoes_var]
diferencia_predios_check.change(fn=update_detailed_tabs_visibility, inputs=[diferencia_predios_check, diferencia_estacionamentos_check], outputs=variation_tab_refs)
diferencia_estacionamentos_check.change(fn=update_detailed_tabs_visibility, inputs=[diferencia_predios_check, diferencia_estacionamentos_check], outputs=variation_tab_refs)
flood_analysis_tab_refs = [tab_flood_general, tab_flood_pred_geral, tab_flood_pred_res, tab_flood_pred_non_res, tab_flood_est_geral, tab_flood_est_res, tab_flood_est_non_res]
def update_flood_analysis_visibility(diferencia_predios, diferencia_estacionamentos):
return (gr.update(visible=True), gr.update(visible=not diferencia_predios), gr.update(visible=diferencia_predios), gr.update(visible=diferencia_predios), gr.update(visible=not diferencia_estacionamentos), gr.update(visible=diferencia_estacionamentos), gr.update(visible=diferencia_estacionamentos))
diferencia_predios_check.change(fn=update_flood_analysis_visibility, inputs=[diferencia_predios_check, diferencia_estacionamentos_check], outputs=flood_analysis_tab_refs)
diferencia_estacionamentos_check.change(fn=update_flood_analysis_visibility, inputs=[diferencia_predios_check, diferencia_estacionamentos_check], outputs=flood_analysis_tab_refs)
# LÓGICA PARA SELEÇÃO DE ABAS
diferencia_predios_check.change(fn=update_selected_tabs, inputs=[diferencia_predios_check], outputs=[detailed_tabs, value_range_tabs, variation_tabs], queue=False)
output_agg_year_selector.change(fn=get_agg_df_for_year, inputs=[output_agg_year_selector, all_yearly_aggregates_state], outputs=[output_agg_df])
# --- LÓGICA DE VALIDAÇÃO ---
limit_inputs, num_bands_dropdowns = [], []
num_inputs_per_cat = 1 + (MAX_BANDS - 1) * 2 + 1
for i in range(9):
cat_inputs = all_rate_inputs[i * num_inputs_per_cat : (i + 1) * num_inputs_per_cat]
num_bands_dropdowns.append(cat_inputs[0])
for j in range(MAX_BANDS - 1): limit_inputs.append(cat_inputs[1 + j * 2])
validation_triggers = limit_inputs + num_bands_dropdowns + [diferencia_estacionamentos_check, diferencia_predios_check]
validation_inputs = [diferencia_estacionamentos_check, diferencia_predios_check] + all_rate_inputs
for trigger_component in validation_triggers:
trigger_component.change(fn=validate_band_limits, inputs=validation_inputs, outputs=[validation_error_box, process_button], queue=False)
click_inputs = [ufm_input, diferencia_estacionamentos_check, diferencia_predios_check, trava_aumento_perc_input, segregar_piso_check, segregar_trava_check, value_range_interval_input, aggregate_from_value_input, show_projection_check, show_aggregate_check, show_detailed_bands_check, show_detailed_vv_check, show_totals_check, show_flood_analysis_check, show_flood_graphs_check, show_excel_check, universal_transition_rules_state, transition_rules_by_category_state, desconto_2026_input, desconto_2027_input, desconto_2028_input, desconto_2029_input] + all_rate_inputs
detailed_tab_outputs = [output_predial_geral, output_predial_residencial, output_predial_nao_residencial, output_estacionamentos_geral, output_estacionamentos_residenciais, output_estacionamentos_nao_residenciais, output_terrenos_1df, output_terrenos_2df, output_terrenos_3df, output_terrenos_esp, output_terrenos_fix, output_aposentados, output_habitacoes]
value_range_outputs = [output_predial_geral_vv, output_predial_residencial_vv, output_predial_nao_residencial_vv, output_estacionamentos_geral_vv, output_estacionamentos_residenciais_vv, output_estacionamentos_nao_residenciais_vv, output_terrenos_1df_vv, output_terrenos_2df_vv, output_terrenos_3df_vv, output_terrenos_esp_vv, output_terrenos_fix_vv, output_aposentados_vv, output_habitacoes_vv]
visibility_outputs = [projection_box, aggregate_summary_box, detailed_bands_box, detailed_vv_box, totals_box, flood_analysis_box, excel_download_box, flood_graphs_box]
outputs_for_upload = [upload_status_output, ufm_input, diferencia_estacionamentos_check, diferencia_predios_check, segregar_trava_check, trava_aumento_perc_input, universal_transition_rules_state, transition_rules_by_category_state, desconto_2026_input, desconto_2027_input, desconto_2028_input, desconto_2029_input] + all_rate_inputs
file_upload_input.upload(fn=load_scenario_from_excel, inputs=[file_upload_input], outputs=outputs_for_upload).then(fn=update_transition_ui, inputs=[universal_transition_rules_state], outputs=universal_ui_outputs).then(fn=lambda cat, state: update_transition_ui(state.get(cat, [])), inputs=[category_selector, transition_rules_by_category_state], outputs=category_ui_outputs)
process_button.click(
fn=process_iptu_data, inputs=click_inputs,
outputs=[
status_message, output_df_projection, total_novo_text, total_atual_trib_text, total_atual_calc_text,
output_enchente_analysis, output_enchente_analysis_pred_geral, output_enchente_analysis_res, output_enchente_analysis_non_res,
output_enchente_analysis_est_geral, output_enchente_analysis_est_res, output_enchente_analysis_est_non_res,
all_yearly_aggregates_state, output_agg_year_selector, output_agg_df,
filename_display, download_btn
] + detailed_tab_outputs + value_range_outputs + flood_plots_outputs +
[df_resumo_final_state, the_merge_state, iptu_rules_state, anos_projecao_state, detailed_by_value_ranges_raw_state] +
visibility_outputs + [variation_year_selector]
)
update_vv_button.click(
fn=update_valor_venal_view,
inputs=[value_range_interval_input, aggregate_from_value_input, df_resumo_final_state, the_merge_state, iptu_rules_state, anos_projecao_state, diferencia_predios_check, diferencia_estacionamentos_check],
outputs=value_range_outputs
)
# Lista de outputs atualizada (14 abas)
variation_df_outputs = [
output_totais_gerais_var, # NOVA: Primeira aba - Totais Gerais
output_predial_residencial_var, # Aba 2 - Predial Residencial
output_predial_nao_residencial_var, # Aba 3 - Predial Não Residencial
output_predial_geral_var, # Aba 4 - Predial Geral
output_estacionamentos_geral_var, # Aba 5 - Estacionamentos Geral
output_estacionamentos_residenciais_var, # Aba 6 - Estacionamentos Residenciais
output_estacionamentos_nao_residenciais_var, # Aba 7 - Estacionamentos Não Residenciais
output_terrenos_1df_var, # Aba 8 - Terrenos 1ª DF
output_terrenos_2df_var, # Aba 9 - Terrenos 2ª DF
output_terrenos_3df_var, # Aba 10 - Terrenos 3ª DF
output_terrenos_esp_var, # Aba 11 - Terrenos Alíquota Especial
output_terrenos_fix_var, # Aba 12 - Terrenos Alíquota Fixa
output_aposentados_var, # Aba 13 - Aposentados/Deficientes
output_habitacoes_var # Aba 14 - Habitações Populares
]
# Total de outputs: 1 (erro) + 14 (abas) + 1 (state) = 16 outputs
variation_outputs = [variation_validation_error] + variation_df_outputs + [variation_analysis_raw_state]
# Chamada atualizada do botão
variation_generate_btn.click(
fn=generate_variation_analysis_view,
inputs=[variation_limit_input, variation_interval_input, variation_year_selector, variation_abrangencia_selector, the_merge_state, iptu_rules_state, diferencia_predios_check, diferencia_estacionamentos_check],
outputs=variation_outputs
)
if __name__ == "__main__":
demo.launch()