Spaces:
Runtime error
Runtime error
| import pandas as pd | |
| import numpy as np | |
| import gradio as gr | |
| import os | |
| import io | |
| import tempfile | |
| import pytz | |
| from datetime import datetime | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| # Path to the CSV file | |
| CSV_FILE_PATH = "./dados/input_app_20250731.csv" | |
| MAX_YEARS = 15 # Máximo de anos para a regra de transição | |
| MAX_BANDS = 8 # Máximo de faixas de alíquota permitidas | |
| # --- NEW HELPER FUNCTION --- | |
| def calculate_column_widths( | |
| df: pd.DataFrame, min_width=75, char_multiplier=9, padding=2 | |
| ): | |
| """ | |
| Calculates optimal column widths for a Gradio DataFrame to fit content. | |
| """ | |
| if df is None or df.empty: | |
| return None | |
| widths = [] | |
| for col in df.columns: | |
| header_len = len(str(col)) | |
| if not df[col].empty: | |
| max_len_data = df[col].astype(str).str.len().max() | |
| if pd.notna(max_len_data): | |
| max_len = max(header_len, int(max_len_data)) | |
| else: | |
| max_len = header_len | |
| else: | |
| max_len = header_len | |
| calculated_width = (max_len + padding) * char_multiplier | |
| widths.append(max(min_width, calculated_width)) | |
| return widths | |
| # --- Otimização 1: Cache de Dados --- | |
| def load_and_prepare_data(): | |
| """ | |
| Carrega o CSV e pré-calcula todos os filtros booleanos que não dependem | |
| dos inputs do usuário. Esta função deve ser chamada apenas uma vez. | |
| """ | |
| if not os.path.exists(CSV_FILE_PATH): | |
| raise FileNotFoundError(f"Arquivo de dados não encontrado em: {CSV_FILE_PATH}") | |
| df = pd.read_csv(CSV_FILE_PATH, low_memory=False) | |
| df.columns = df.columns.str.strip() | |
| required_cols = [ | |
| "TIPO_LANCAMENTO", | |
| "DES_TIPO_ISENCAO_MULTI", | |
| "VLR_IMPOSTO_iptu", | |
| "VLR_VENAL_IMOVEL_TRIB", | |
| "DES_FINALIDADE_unidade", | |
| "DES_USO_unidade", | |
| "IDF_TIPO_BENEFICIO", | |
| "NUM_DIVISAO_FISCAL_iptu", | |
| "VALOR_VENAL", | |
| "VLR_IMPOSTO_CALCULADO_iptu", | |
| "SITUACAO_ALAGAMENTO", | |
| "PERC_ATING", | |
| "ANO_FIM", | |
| "ANO_INICIO", | |
| ] | |
| missing_cols = [col for col in required_cols if col not in df.columns] | |
| if missing_cols: | |
| raise ValueError(f"Colunas ausentes no CSV: {', '.join(missing_cols)}") | |
| for col_str in [ | |
| "TIPO_LANCAMENTO", | |
| "DES_TIPO_ISENCAO_MULTI", | |
| "DES_FINALIDADE_unidade", | |
| "DES_USO_unidade", | |
| "SITUACAO_ALAGAMENTO", | |
| ]: | |
| if col_str in df.columns: | |
| df[col_str] = df[col_str].astype(str).fillna("") | |
| for col_num in [ | |
| "PERC_ATING", | |
| "VALOR_VENAL", | |
| "VLR_VENAL_IMOVEL_TRIB", | |
| "VLR_IMPOSTO_iptu", | |
| "VLR_IMPOSTO_CALCULADO_iptu", | |
| "ANO_FIM", | |
| "ANO_INICIO", | |
| "IDF_TIPO_BENEFICIO", | |
| "NUM_DIVISAO_FISCAL_iptu", | |
| ]: | |
| df[col_num] = pd.to_numeric(df[col_num], errors="coerce").fillna(0) | |
| df.rename(columns={"VALOR_VENAL": "valor_venal_calculo"}, inplace=True) | |
| filters = {} | |
| finalidades_estacionamentos = [ | |
| "ESPACO DE ESTACIONAMENTO RESIDENCIAL", | |
| "ESPACO DE ESTACIONAMENTO NAO RESIDENCIAL", | |
| "ESPACO ESTACIONAMENTO VINCULADO NAO RESID DESCOBERTO", | |
| "ESPACO ESTACIONAMENTO VINCULADO RESID COBERTO", | |
| "ESPACO ESTACIONAMENTO VINCULADO NAO RESID COBERTO", | |
| "ESPACO DE ESTACIONAMENTO RESIDENCIAL DESCOBERTO", | |
| "ESPACO DE ESTACIONAMENTO NAO RESIDENC DESCOBERTO", | |
| "ESPACO ESTACIONAMENTO VINCULADO RESID DESCOBERTO", | |
| ] | |
| finalidades_terrenos = [ | |
| "TERRENO", | |
| "GLEBA", | |
| "TERRENOS CONDOMINIO HORIZ ABERTO SEM AREA USO COMUM", | |
| "TERRENO EM CONDOMINIO HORIZONTAL FECHADO", | |
| "TERRENO EM CONDOMINIO HORIZONTAL ABERTO", | |
| "SOBRA DE AREA", | |
| "AREA A VISTORIAR", | |
| ] | |
| sem_lancamento = ( | |
| df["TIPO_LANCAMENTO"].isin( | |
| [ | |
| "Isenção Total por Benefício Fiscal", | |
| "Imóvel com bloqueio de lançamento de IPTU/TCL", | |
| "Não lançado - Valor total calculado menor que o mínimo", | |
| "Isenção TCL Box e Não Lançado - valor menor que Mínimo", | |
| "Isenção Técnica", | |
| ] | |
| ) | |
| ) | ( | |
| (df["TIPO_LANCAMENTO"] == "Isenção TCL - Box") | |
| & (df["VLR_IMPOSTO_iptu"] == 0) | |
| & (df["ANO_FIM"] >= 2026) | |
| ) | |
| isentos = df["DES_TIPO_ISENCAO_MULTI"].str.contains("isen", case=False, na=False) | |
| imunes = ( | |
| df["DES_TIPO_ISENCAO_MULTI"].str.contains("imun", case=False, na=False) | |
| ) | (df["DES_TIPO_ISENCAO_MULTI"].str.contains("IMUNIDADES", na=False)) | |
| nao_incidencia = df["DES_TIPO_ISENCAO_MULTI"].str.contains( | |
| "NÃO INCIDÊNCIA DE IPTU", case=False, na=False | |
| ) | |
| filters["nao_tributados"] = ( | |
| (isentos) | (imunes) | (sem_lancamento) | (nao_incidencia) | |
| ) | |
| filters["isen_prop_loc"] = ( | |
| ( | |
| df["DES_TIPO_ISENCAO_MULTI"] | |
| == "ISENÇÃO - PROPRIETARIO/USUFRUTUARIO APOSENTADO, INATIVO, PENSIONISTA" | |
| ) | |
| | ( | |
| df["DES_TIPO_ISENCAO_MULTI"] | |
| == "ISENÇÃO - LOCATÁRIO/COMODATARIO/ARRENDATÁRIO APOSENTADO, INATIVO, PENSIONISTA" | |
| ) | |
| | ( | |
| df["DES_TIPO_ISENCAO_MULTI"] | |
| == "ISENÇÃO - PROPRIETARIO/USUFRUTUARIO DEFICIENTE" | |
| ) | |
| ) | |
| filters["isen_hab_pop"] = ( | |
| df["DES_TIPO_ISENCAO_MULTI"] | |
| == "ISENÇÃO - HABITAÇÕES POPULARES DE EMPREENDIMENTOS HABITACIONAIS DESTINADOS PARA HABITAÇÃO DE INTERESSE SOCIAL" | |
| ) | ( | |
| df["DES_TIPO_ISENCAO_MULTI"] | |
| == "ISENÇÃO - HABITAÇÕES POPULARES ORIUNDAS DE REGULARIZAÇÕES FUNDIÁRIAS PROMOVIDAS POR ÓRGÃOS PÚBLICOS" | |
| ) | |
| estacionamentos_residenciais = ( | |
| df["DES_FINALIDADE_unidade"].isin(finalidades_estacionamentos) | |
| ) & (df["DES_USO_unidade"] == "Exclusivamente Residencial") | |
| filters["estacionamentos_residenciais"] = estacionamentos_residenciais | |
| filters["estacionamentos_residenciais_tributados"] = ( | |
| estacionamentos_residenciais & (~filters["nao_tributados"]) | |
| ) | |
| filters["estacionamentos_residenciais_nao_tributados"] = ( | |
| estacionamentos_residenciais & (filters["nao_tributados"]) | |
| ) | |
| estacionamentos_nao_residenciais = ( | |
| df["DES_FINALIDADE_unidade"].isin(finalidades_estacionamentos) | |
| ) & (df["DES_USO_unidade"] != "Exclusivamente Residencial") | |
| filters["estacionamentos_nao_residenciais"] = estacionamentos_nao_residenciais | |
| filters["estacionamentos_nao_residenciais_tributados"] = ( | |
| estacionamentos_nao_residenciais & (~filters["nao_tributados"]) | |
| ) | |
| filters["estacionamentos_nao_residenciais_nao_tributados"] = ( | |
| estacionamentos_nao_residenciais & (filters["nao_tributados"]) | |
| ) | |
| estacionamentos_geral = df["DES_FINALIDADE_unidade"].isin( | |
| finalidades_estacionamentos | |
| ) | |
| filters["estacionamentos_geral"] = estacionamentos_geral | |
| filters["estacionamentos_geral_tributados"] = estacionamentos_geral & ( | |
| ~filters["nao_tributados"] | |
| ) | |
| filters["estacionamentos_geral_nao_tributados"] = ( | |
| estacionamentos_geral & (filters["nao_tributados"]) | |
| ) | |
| regra_tae_base = ( | |
| (df["DES_FINALIDADE_unidade"].isin(finalidades_terrenos)) | |
| & (df["IDF_TIPO_BENEFICIO"] == 301) | |
| & ((df["ANO_INICIO"] >= 2023) | (df["ANO_FIM"] >= 2026) | (df["ANO_FIM"] == 0)) | |
| ) | |
| filters["terrenos_aliqesp"] = regra_tae_base & (~filters["nao_tributados"]) | |
| regra_taf_base = (df["DES_FINALIDADE_unidade"].isin(finalidades_terrenos)) & ( | |
| df["IDF_TIPO_BENEFICIO"] == 95 | |
| ) | |
| filters["terrenos_aliqfix"] = regra_taf_base & (~filters["nao_tributados"]) | |
| terrenos_1df_base = ( | |
| (df["DES_FINALIDADE_unidade"].isin(finalidades_terrenos)) | |
| & (df["NUM_DIVISAO_FISCAL_iptu"] == 1) | |
| & (~regra_tae_base) | |
| & (~regra_taf_base) | |
| ) | |
| filters["terrenos_1df_base"] = terrenos_1df_base | |
| filters["terrenos_1df_tributados"] = terrenos_1df_base & ( | |
| ~filters["nao_tributados"] | |
| ) | |
| filters["terrenos_1df_nao_tributados"] = ( | |
| terrenos_1df_base & (filters["nao_tributados"]) | |
| ) | |
| terrenos_2df_base = ( | |
| (df["DES_FINALIDADE_unidade"].isin(finalidades_terrenos)) | |
| & (df["NUM_DIVISAO_FISCAL_iptu"] == 2) | |
| & (~regra_tae_base) | |
| & (~regra_taf_base) | |
| ) | |
| filters["terrenos_2df_base"] = terrenos_2df_base | |
| filters["terrenos_2df_tributados"] = terrenos_2df_base & ( | |
| ~filters["nao_tributados"] | |
| ) | |
| filters["terrenos_2df_nao_tributados"] = ( | |
| terrenos_2df_base & (filters["nao_tributados"]) | |
| ) | |
| terrenos_3df_base = ( | |
| (df["DES_FINALIDADE_unidade"].isin(finalidades_terrenos)) | |
| & (df["NUM_DIVISAO_FISCAL_iptu"] == 3) | |
| & (~regra_tae_base) | |
| & (~regra_taf_base) | |
| ) | |
| filters["terrenos_3df_base"] = terrenos_3df_base | |
| filters["terrenos_3df_tributados"] = terrenos_3df_base & ( | |
| ~filters["nao_tributados"] | |
| ) | |
| filters["terrenos_3df_nao_tributados"] = ( | |
| terrenos_3df_base & (filters["nao_tributados"]) | |
| ) | |
| prediais_base = ~df["DES_FINALIDADE_unidade"].isin( | |
| finalidades_estacionamentos + finalidades_terrenos | |
| ) | |
| filters["prediais_base"] = prediais_base | |
| prediais_residenciais = prediais_base & ( | |
| df["DES_USO_unidade"] == "Exclusivamente Residencial" | |
| ) | |
| filters["prediais_residenciais"] = prediais_residenciais | |
| filters["prediais_residenciais_tributados"] = ( | |
| prediais_residenciais | |
| & (~filters["nao_tributados"]) | |
| & (~filters["isen_prop_loc"]) | |
| & (~filters["isen_hab_pop"]) | |
| ) | |
| filters["prediais_residenciais_nao_tributados"] = ( | |
| prediais_residenciais | |
| & (filters["nao_tributados"]) | |
| & (~filters["isen_prop_loc"]) | |
| & (~filters["isen_hab_pop"]) | |
| ) | |
| prediais_nao_residenciais = ( | |
| prediais_base | |
| & (df["DES_USO_unidade"] != "Exclusivamente Residencial") | |
| & (~filters["isen_prop_loc"]) | |
| & (~filters["isen_hab_pop"]) | |
| ) | |
| filters["prediais_nao_residenciais"] = prediais_nao_residenciais | |
| filters["prediais_nao_residenciais_tributados"] = prediais_nao_residenciais & ( | |
| ~filters["nao_tributados"] | |
| ) | |
| filters["prediais_nao_residenciais_nao_tributados"] = ( | |
| prediais_nao_residenciais & (filters["nao_tributados"]) | |
| ) | |
| prediais_geral = ( | |
| prediais_base & (~filters["isen_prop_loc"]) & (~filters["isen_hab_pop"]) | |
| ) | |
| filters["prediais_geral"] = prediais_geral | |
| filters["prediais_geral_tributados"] = prediais_geral & (~filters["nao_tributados"]) | |
| filters["prediais_geral_nao_tributados"] = ( | |
| prediais_geral & (filters["nao_tributados"]) | |
| ) | |
| regra_tae_base = ( | |
| (df["DES_FINALIDADE_unidade"].isin(finalidades_terrenos)) | |
| & (df["IDF_TIPO_BENEFICIO"] == 301) | |
| & ((df["ANO_INICIO"] >= 2023) | (df["ANO_FIM"] >= 2026) | (df["ANO_FIM"] == 0)) | |
| ) | |
| filters["regra_tae_base"] = regra_tae_base | |
| filters["terrenos_aliqesp"] = regra_tae_base & (~filters["nao_tributados"]) | |
| regra_taf_base = (df["DES_FINALIDADE_unidade"].isin(finalidades_terrenos)) & ( | |
| df["IDF_TIPO_BENEFICIO"] == 95 | |
| ) | |
| filters["regra_taf_base"] = regra_taf_base | |
| filters["terrenos_aliqfix"] = regra_taf_base & (~filters["nao_tributados"]) | |
| return df, filters | |
| # --- NOVA FUNÇÃO: Carregar Cenário do Excel --- | |
| def load_scenario_from_excel(file_obj): | |
| if file_obj is None: | |
| num_outputs = 1 + 5 + 2 + 4 + (1 + (MAX_BANDS - 1) * 2 + 1) * 9 + 2 | |
| return [gr.update()] * num_outputs | |
| try: | |
| params_df = pd.read_excel(file_obj.name, sheet_name="Parâmetros Utilizados") | |
| params_df = params_df.set_index(params_df.columns[0]).astype(str) | |
| def get_param(key, default, type_func): | |
| try: | |
| value = params_df.loc[key, "Valor"] | |
| if value == "nan" or pd.isna(value): | |
| return default | |
| return type_func(value) | |
| except (KeyError, ValueError): | |
| return default | |
| def get_percent_param(key, default): | |
| try: | |
| value = params_df.loc[key, "Valor"] | |
| if value == "nan" or pd.isna(value): | |
| return default | |
| # Remove o símbolo % se presente e converte para float | |
| if isinstance(value, str) and value.endswith('%'): | |
| return float(value[:-1]) | |
| return float(value) | |
| except (KeyError, ValueError): | |
| return default | |
| ufm_val = get_param("Valor da UFM", 5.771, float) | |
| diff_est = get_param("Diferenciar Estacionamentos por Uso", "Não", str) == "Sim" | |
| diff_pred = get_param("Diferenciar Prédios por Uso", "Não", str) == "Sim" | |
| is_segregated = ( | |
| "REGRAS DE TRANSIÇÃO (CONFIGURAÇÃO POR CATEGORIA)" in params_df.index | |
| ) | |
| segregar_trava = is_segregated | |
| trava_perc_str = get_param( | |
| "Ativar trava SE aumento for >=", "0.0%", str | |
| ).replace("%", "") | |
| trava_perc = float(trava_perc_str) if trava_perc_str else 0.0 | |
| desconto_26 = get_percent_param( | |
| "Desconto na Alíquota Efetiva (Enchente - Predial) 2026", 0.0 | |
| ) | |
| desconto_27 = get_percent_param( | |
| "Desconto na Alíquota Efetiva (Enchente - Predial) 2027", 0.0 | |
| ) | |
| desconto_28 = get_percent_param( | |
| "Desconto na Alíquota Efetiva (Enchente - Predial) 2028", 0.0 | |
| ) | |
| desconto_29 = get_percent_param( | |
| "Desconto na Alíquota Efetiva (Enchente - Predial) 2029", 0.0 | |
| ) | |
| universal_rules = [] | |
| default_categories = [ | |
| "Predial Geral", | |
| "Predial Residencial", | |
| "Predial Não Residencial", | |
| "Estacionamentos Geral", | |
| "Estacionamentos Residenciais", | |
| "Estacionamentos Não Residenciais", | |
| "Terrenos 1ª DF", | |
| "Terrenos 2ª DF", | |
| "Terrenos 3ª DF", | |
| "Terrenos Alíquota Especial", | |
| "Terrenos Alíquota Fixa", | |
| ] | |
| category_rules = {cat: [] for cat in default_categories} | |
| if is_segregated: | |
| current_category = None | |
| for index, row in params_df.iterrows(): | |
| if str(index).startswith("Categoria:"): | |
| current_category = index.replace("Categoria: ", "").strip() | |
| elif str(index).startswith("Ano ") and current_category: | |
| year = int(index.replace("Ano ", "")) | |
| factor_str = row["Valor"].replace("%", "") | |
| factor = float(factor_str) / 100.0 if factor_str else 1.0 | |
| if current_category in category_rules: | |
| category_rules[current_category].append( | |
| {"year": year, "factor": factor} | |
| ) | |
| elif pd.isna(index): | |
| current_category = None | |
| else: | |
| if "Ano" in params_df.index: | |
| start_index_series = params_df.index.get_loc("Ano") | |
| start_index = ( | |
| start_index_series | |
| if isinstance(start_index_series, int) | |
| else start_index_series[0] | |
| ) | |
| start_index += 1 | |
| for i in range(start_index, len(params_df)): | |
| idx_label = params_df.index[i] | |
| if ( | |
| pd.isna(idx_label) | |
| or not str(idx_label).replace(".0", "").isdigit() | |
| ): | |
| break | |
| year = int(float(idx_label)) | |
| factor_str = params_df.iloc[i]["Valor"].replace("%", "") | |
| factor = float(factor_str) / 100.0 if factor_str else 1.0 | |
| universal_rules.append({"year": year, "factor": factor}) | |
| rate_inputs = [] | |
| prefixes = [ | |
| ("PG", "PREDIAL GERAL (PG)"), | |
| ("PR", "PREDIAL RESIDENCIAL (PR)"), | |
| ("PNR", "PREDIAL NÃO RESIDENCIAL (PNR)"), | |
| ("EG", "ESTACIONAMENTOS GERAL (EG)"), | |
| ("ER", "ESTACIONAMENTOS RESIDENCIAIS (ER)"), | |
| ("ENR", "ESTACIONAMENTOS NÃO RESIDENCIAIS (ENR)"), | |
| ("T1DF", "TERRENOS 1ª DF (T1DF)"), | |
| ("T2DF", "TERRENOS 2ª DF (T2DF)"), | |
| ("T3DF", "TERRENOS 3ª DF (T3DF)"), | |
| ] | |
| for p_code, p_excel in prefixes: | |
| rate_inputs.append( | |
| get_param( | |
| f"{p_code} Número de Faixas", | |
| MAX_BANDS if "T" not in p_code else 2, | |
| int, | |
| ) | |
| ) | |
| for i in range(1, MAX_BANDS): | |
| rate_inputs.append(get_param(f"{p_code} Limite {i} (UFM)", 0, float)) | |
| rate_inputs.append(get_param(f"{p_code} Alíquota {i}", 0, float)) | |
| rate_inputs.append(get_param(f"{p_code} Alíquota {MAX_BANDS}", 0, float)) | |
| rate_inputs.append(get_param("TAE Alíquota 1", 0, float)) | |
| rate_inputs.append(get_param("TAF Alíquota 1", 0, float)) | |
| status_msg = f"Cenário carregado com sucesso do arquivo: {os.path.basename(file_obj.name)}" | |
| return ( | |
| status_msg, | |
| ufm_val, | |
| diff_est, | |
| diff_pred, | |
| segregar_trava, | |
| trava_perc, | |
| universal_rules, | |
| category_rules, | |
| desconto_26, | |
| desconto_27, | |
| desconto_28, | |
| desconto_29, | |
| ) + tuple(rate_inputs) | |
| except Exception as e: | |
| error_msg = f"Erro ao processar o arquivo: {e}. Verifique se o arquivo é válido e foi gerado por este aplicativo." | |
| num_outputs = 1 + 5 + 2 + 4 + (1 + (MAX_BANDS - 1) * 2 + 1) * 9 + 2 | |
| return tuple([error_msg] + [gr.update()] * (num_outputs - 1)) | |
| # --- Chame a função UMA VEZ no escopo global --- | |
| try: | |
| THE_MERGE_GLOBAL, FILTERS_GLOBAL = load_and_prepare_data() | |
| print("Dados carregados e pré-processados com sucesso.") | |
| except (FileNotFoundError, ValueError) as e: | |
| print(f"ERRO CRÍTICO AO INICIAR O APLICATIVO: {e}") | |
| THE_MERGE_GLOBAL, FILTERS_GLOBAL = None, None | |
| # --- Funções auxiliares de UI (MODIFICADAS) --- | |
| def create_configurable_band_ui(prefix, default_num_bands, defaults_lim, defaults_aliq): | |
| components_to_return = [] | |
| dropdown = gr.Dropdown( | |
| choices=list(range(2, MAX_BANDS + 1)), | |
| value=default_num_bands, | |
| label="Número de Faixas", | |
| ) | |
| components_to_return.append(dropdown) | |
| rows = [] | |
| end_inputs_num = [] | |
| end_inputs_inf = [] | |
| end_inputs_interactive = [] | |
| for i in range(MAX_BANDS): | |
| lim_val = defaults_lim[i] if i < len(defaults_lim) else 0 | |
| aliq_val = defaults_aliq[i] if i < len(defaults_aliq) else 0 | |
| prev_lim_val = ( | |
| defaults_lim[i - 1] if i > 0 and (i - 1) < len(defaults_lim) else 0 | |
| ) | |
| with gr.Row( | |
| equal_height=True, variant="compact", visible=(i < default_num_bands) | |
| ) as r: | |
| start = gr.Number( | |
| label="Início do Intervalo", | |
| value=0 if i == 0 else prev_lim_val, | |
| interactive=False, | |
| scale=1, | |
| ) | |
| with gr.Column(scale=1, min_width=50): | |
| is_last_visible_at_start = i == default_num_bands - 1 | |
| end_num = gr.Number( | |
| label="Fim do Intervalo (UFM)", | |
| value=lim_val, | |
| visible=not is_last_visible_at_start, | |
| scale=1, | |
| ) | |
| end_inf = gr.Textbox( | |
| value="Infinito", | |
| label="Fim do Intervalo", | |
| interactive=False, | |
| visible=is_last_visible_at_start, | |
| scale=1, | |
| ) | |
| rate = gr.Number(label=f"Alíquota {i + 1}", value=aliq_val, scale=1) | |
| rows.append(r) | |
| end_inputs_num.append(end_num) | |
| end_inputs_inf.append(end_inf) | |
| end_inputs_interactive.append(end_num) | |
| if i < MAX_BANDS - 1: | |
| components_to_return.extend([end_num, rate]) | |
| else: | |
| components_to_return.append(rate) | |
| if i > 0: | |
| end_inputs_interactive[i - 1].change( | |
| lambda x: x, | |
| inputs=end_inputs_interactive[i - 1], | |
| outputs=start, | |
| queue=False, | |
| ) | |
| def update_visibility_and_swap(num_bands_selected): | |
| row_updates = [] | |
| end_num_updates = [] | |
| end_inf_updates = [] | |
| for i in range(MAX_BANDS): | |
| is_visible = i < num_bands_selected | |
| is_last_visible = i == num_bands_selected - 1 | |
| row_updates.append(gr.update(visible=is_visible)) | |
| end_num_updates.append( | |
| gr.update(visible=is_visible and not is_last_visible) | |
| ) | |
| end_inf_updates.append(gr.update(visible=is_visible and is_last_visible)) | |
| return row_updates + end_num_updates + end_inf_updates | |
| dropdown.change( | |
| fn=update_visibility_and_swap, | |
| inputs=dropdown, | |
| outputs=rows + end_inputs_num + end_inputs_inf, | |
| queue=False, | |
| ) | |
| return components_to_return | |
| # --- Função para criar a aba de parâmetros para o Excel (CORRIGIDA) --- | |
| def create_parameters_sheet( | |
| ufm_value, | |
| diferencia_estacionamentos_flag, | |
| diferencia_predios_flag, | |
| trava_aumento_perc, | |
| segregar_trava_flag, | |
| universal_rules, | |
| category_rules, | |
| descontos_anuais, | |
| all_parameters, | |
| ): | |
| param_data = [] | |
| param_data.append(["CONFIGURAÇÕES GERAIS", "", ""]) | |
| param_data.append( | |
| [ | |
| "Valor Venal Utilizado", | |
| 'Coluna "VALOR_VENAL"', | |
| "Valor fixo do arquivo de entrada", | |
| ] | |
| ) | |
| param_data.append(["Valor da UFM", ufm_value, ""]) | |
| param_data.append( | |
| [ | |
| "Diferenciar Estacionamentos por Uso", | |
| "Sim" if diferencia_estacionamentos_flag else "Não", | |
| "", | |
| ] | |
| ) | |
| param_data.append( | |
| ["Diferenciar Prédios por Uso", "Sim" if diferencia_predios_flag else "Não", ""] | |
| ) | |
| param_data.append(["", "", ""]) | |
| param_data.append(["REGRAS DE TRANSIÇÃO (TRAVA DE AUMENTO)", "", ""]) | |
| param_data.append( | |
| ["Ativar trava SE aumento for >=", f"{trava_aumento_perc:.2f}%", ""] | |
| ) | |
| if segregar_trava_flag: | |
| param_data.append(["Configuração da Trava", "Por Categoria", ""]) | |
| param_data.append(["", "", ""]) | |
| sorted_categories = sorted(category_rules.keys()) | |
| for category in sorted_categories: | |
| rules = category_rules.get(category, []) | |
| if rules: | |
| param_data.append( | |
| [ | |
| f"Categoria: {category}", | |
| "Fator Aplicado sobre o Aumento", | |
| "Fórmula: IPTU_Atual + Fator * (IPTU_Projetado - IPTU_Atual)", | |
| ] | |
| ) | |
| for rule in rules: | |
| param_data.append( | |
| [f"Ano {rule['year']}", f"{rule['factor'] * 100:.1f}%", ""] | |
| ) | |
| param_data.append(["", "", ""]) | |
| else: | |
| param_data.append(["Configuração da Trava", "Universal", ""]) | |
| param_data.append( | |
| [ | |
| "Ano", | |
| "Fator Aplicado sobre o Aumento", | |
| "Fórmula: IPTU_Atual + Fator * (IPTU_Projetado - IPTU_Atual)", | |
| ] | |
| ) | |
| for rule in universal_rules: | |
| param_data.append([rule["year"], f"{rule['factor'] * 100:.1f}%", ""]) | |
| param_data.append(["", "", ""]) | |
| param_data.append( | |
| ["DESCONTO PARA ATINGIDOS PELA ENCHENTE (APENAS PREDIAL)", "", ""] | |
| ) | |
| param_data.append( | |
| [ | |
| "Descrição", | |
| "Desconto Percentual na Alíquota Efetiva", | |
| "O desconto é aplicado sobre o IPTU calculado e então a trava é aplicada.", | |
| ] | |
| ) | |
| for year, discount in descontos_anuais.items(): | |
| param_data.append( | |
| [ | |
| f"Desconto na Alíquota Efetiva (Enchente - Predial) {year}", | |
| f"{discount:.2f}%", | |
| "", | |
| ] | |
| ) | |
| param_data.append(["", "", ""]) | |
| param_map = [ | |
| ("PG", "PREDIAL GERAL (PG)"), | |
| ("PR", "PREDIAL RESIDENCIAL (PR)"), | |
| ("PNR", "PREDIAL NÃO RESIDENCIAL (PNR)"), | |
| ("EG", "ESTACIONAMENTOS GERAL (EG)"), | |
| ("ER", "ESTACIONAMENTOS RESIDENCIAIS (ER)"), | |
| ("ENR", "ESTACIONAMENTOS NÃO RESIDENCIAIS (ENR)"), | |
| ("T1DF", "TERRENOS 1ª DF (T1DF)"), | |
| ("T2DF", "TERRENOS 2ª DF (T2DF)"), | |
| ("T3DF", "TERRENOS 3ª DF (T3DF)"), | |
| ] | |
| num_inputs_per_cat = 1 + (MAX_BANDS - 1) * 2 + 1 | |
| input_ptr = 0 | |
| for p_code, p_excel in param_map: | |
| param_data.append([p_excel, "", ""]) | |
| cat_inputs = all_parameters[input_ptr : input_ptr + num_inputs_per_cat] | |
| if len(cat_inputs) < num_inputs_per_cat: | |
| while len(cat_inputs) < num_inputs_per_cat: | |
| cat_inputs.append(0) | |
| num_bands = cat_inputs[0] if cat_inputs[0] is not None else 2 | |
| rate_inputs = cat_inputs[1:] | |
| param_data.append([f"{p_code} Número de Faixas", num_bands, ""]) | |
| # Processar limites e alíquotas das faixas intermediárias (1 a num_bands-1) | |
| for i in range(num_bands - 1): | |
| limite_idx = i * 2 | |
| aliquota_idx = i * 2 + 1 | |
| if limite_idx < len(rate_inputs) and aliquota_idx < len(rate_inputs): | |
| limite = rate_inputs[limite_idx] if rate_inputs[limite_idx] is not None else 0 | |
| aliquota = rate_inputs[aliquota_idx] if rate_inputs[aliquota_idx] is not None else 0 | |
| else: | |
| limite, aliquota = 0, 0 | |
| param_data.append([f"{p_code} Limite {i + 1} (UFM)", limite, ""]) | |
| param_data.append( | |
| [ | |
| f"{p_code} Alíquota {i + 1}", | |
| aliquota, | |
| f"{aliquota * 100:.3f}%" if aliquota is not None and aliquota != 0 else "0.000%", | |
| ] | |
| ) | |
| # --- SEÇÃO CORRIGIDA --- | |
| # Processar a alíquota final (da última faixa). | |
| # A lógica agora calcula o índice correto da alíquota final com base no número de faixas, | |
| # em vez de sempre pegar o último elemento da lista de inputs. | |
| # O índice da alíquota da última faixa (N) é (N-1)*2 + 1 nos rate_inputs. | |
| last_aliquot_index = (num_bands - 1) * 2 + 1 | |
| # Tratamento especial se for o número máximo de faixas, pois a UI a estrutura de forma diferente. | |
| if num_bands == MAX_BANDS: | |
| last_aliquot_index = -1 # Pega o último elemento da lista | |
| # Garante que o índice é válido e obtém o valor da alíquota final. | |
| if last_aliquot_index < len(rate_inputs): | |
| aliquota_final = rate_inputs[last_aliquot_index] if rate_inputs[last_aliquot_index] is not None else 0 | |
| else: | |
| # Fallback caso a lista de inputs seja inesperadamente curta. | |
| aliquota_final = rate_inputs[-1] if rate_inputs else 0 | |
| param_data.append( | |
| [ | |
| f"{p_code} Alíquota {num_bands}", | |
| aliquota_final, | |
| f"{aliquota_final * 100:.3f}%" if aliquota_final is not None and aliquota_final != 0 else "0.000%", | |
| ] | |
| ) | |
| # --- FIM DA SEÇÃO CORRIGIDA --- | |
| param_data.append(["", "", ""]) | |
| input_ptr += num_inputs_per_cat | |
| # Processar TAE e TAF | |
| if input_ptr < len(all_parameters): | |
| tae_aliq = all_parameters[input_ptr] if all_parameters[input_ptr] is not None else 0 | |
| else: | |
| tae_aliq = 0 | |
| if input_ptr + 1 < len(all_parameters): | |
| taf_aliq = all_parameters[input_ptr + 1] if all_parameters[input_ptr + 1] is not None else 0 | |
| else: | |
| taf_aliq = 0 | |
| param_data.extend( | |
| [ | |
| ["TERRENOS ALÍQUOTA ESPECIAL (TAE)", "", ""], | |
| [ | |
| "TAE Alíquota 1", | |
| tae_aliq, | |
| f"{tae_aliq * 100:.3f}%" if tae_aliq is not None and tae_aliq != 0 else "0.000%", | |
| ], | |
| ["", "", ""], | |
| ["TERRENOS ALÍQUOTA FIXA (TAF)", "", ""], | |
| [ | |
| "TAF Alíquota 1", | |
| taf_aliq, | |
| f"{taf_aliq * 100:.3f}%" if taf_aliq is not None and taf_aliq != 0 else "0.000%", | |
| ], | |
| ] | |
| ) | |
| df = pd.DataFrame(param_data, columns=["Parâmetro", "Valor", "Observação"]) | |
| return df | |
| # --- Funções de formatação e criação de Excel (REVISADA E AMPLIADA) --- | |
| def format_worksheet(worksheet, dataframe, currency_columns=None, percent_columns=None): | |
| from openpyxl.styles import NamedStyle, Font, PatternFill, Alignment | |
| from openpyxl.utils import get_column_letter | |
| if currency_columns is None: | |
| currency_columns = [] | |
| if percent_columns is None: | |
| percent_columns = [] | |
| header_style = NamedStyle( | |
| name="header", | |
| font=Font(bold=True, color="FFFFFF"), | |
| fill=PatternFill(start_color="366092", end_color="366092", fill_type="solid"), | |
| alignment=Alignment(horizontal="center", vertical="center"), | |
| ) | |
| # Apply header style and adjust column width | |
| for col_num, column_title in enumerate(dataframe.columns, 1): | |
| cell = worksheet.cell(row=1, column=col_num) | |
| cell.style = header_style | |
| # Auto-adjust column width | |
| max_length = len(str(column_title)) | |
| for cell_in_col in worksheet[get_column_letter(col_num)]: | |
| if cell_in_col.value: | |
| max_length = max(max_length, len(str(cell_in_col.value))) | |
| adjusted_width = max_length + 2 | |
| worksheet.column_dimensions[get_column_letter(col_num)].width = adjusted_width | |
| # Apply number formatting | |
| for col_name in currency_columns: | |
| if col_name in dataframe.columns: | |
| col_idx = dataframe.columns.get_loc(col_name) + 1 | |
| for row_num in range(2, len(dataframe) + 2): | |
| cell = worksheet.cell(row=row_num, column=col_idx) | |
| cell.number_format = 'R$ #,##0.00' | |
| for col_name in percent_columns: | |
| if col_name in dataframe.columns: | |
| col_idx_letter = get_column_letter(dataframe.columns.get_loc(col_name) + 1) | |
| for cell in worksheet[col_idx_letter]: | |
| if cell.row > 1: | |
| cell.number_format = '0.00%' | |
| return worksheet | |
| # --- Funções de formatação e criação de Excel (CORRIGIDA) --- | |
| def create_excel_download( | |
| df_projection_raw, | |
| total_novo_str, | |
| total_atual_trib_str, | |
| total_atual_calc_str, | |
| ufm_value, | |
| diferencia_estacionamentos_flag, | |
| diferencia_predios_flag, | |
| trava_aumento_perc, | |
| segregar_trava_flag, | |
| universal_rules, | |
| category_rules, | |
| descontos_anuais, | |
| all_parameters, | |
| yearly_aggregates_raw, | |
| detailed_by_category_raw, | |
| detailed_by_value_ranges_raw, | |
| df_enchente_analysis_raw, | |
| variation_analysis_raw, | |
| ): | |
| from openpyxl.utils import get_column_letter | |
| output = io.BytesIO() | |
| with pd.ExcelWriter(output, engine="openpyxl") as writer: | |
| def sanitize_sheet_name(name): | |
| return name.replace("/", "-").replace("\\", "-")[:31] | |
| # 1. Parâmetros Utilizados (sem alteração) | |
| params_data = create_parameters_sheet( | |
| ufm_value, diferencia_estacionamentos_flag, diferencia_predios_flag, | |
| trava_aumento_perc, segregar_trava_flag, universal_rules, | |
| category_rules, descontos_anuais, all_parameters, | |
| ) | |
| params_data.to_excel(writer, sheet_name="Parâmetros Utilizados", index=False) | |
| ws_params = writer.sheets["Parâmetros Utilizados"] | |
| format_worksheet(ws_params, params_data) | |
| # 2. Totais Gerais (sem alteração) | |
| if total_novo_str: | |
| summary_data = { | |
| "Descrição": ["Total IPTU Novo (Simulado Puro)", "Total IPTU Atual Tributado", "Total IPTU Atual Calculado"], | |
| "Valor": [ | |
| float(total_novo_str.split(": ")[1].replace(",", "")), | |
| float(total_atual_trib_str.split(": ")[1].replace(",", "")), | |
| float(total_atual_calc_str.split(": ")[1].replace(",", "")), | |
| ], | |
| } | |
| summary_df = pd.DataFrame(summary_data) | |
| summary_df.to_excel(writer, sheet_name="Totais Gerais", index=False) | |
| ws_total = writer.sheets["Totais Gerais"] | |
| format_worksheet(ws_total, summary_df, currency_columns=["Valor"]) | |
| # 3. Projeção com Trava (CORRIGIDO) | |
| if df_projection_raw is not None and not df_projection_raw.empty: | |
| df_export = df_projection_raw.copy() | |
| currency_cols = ["Valor Total Arrecadado", "Crescimento Absoluto"] | |
| percent_cols = ["Crescimento Relativo (%)"] | |
| for col in percent_cols: | |
| # CORREÇÃO: Converte para numérico antes de dividir | |
| df_export[col] = pd.to_numeric(df_export[col], errors='coerce') / 100.0 | |
| df_export.to_excel(writer, sheet_name="Projeção com Trava", index=False) | |
| ws = writer.sheets["Projeção com Trava"] | |
| format_worksheet(ws, df_export, currency_columns=currency_cols, percent_columns=percent_cols) | |
| # 4. Análise Enchente (CORRIGIDO) | |
| if df_enchente_analysis_raw is not None and not df_enchente_analysis_raw.empty: | |
| df_export = df_enchente_analysis_raw.copy() | |
| currency_cols = ["IPTU Atual Atingidos (2025)", "Simulado Puro (Y_base)", "Simulado com Desconto (sem trava)", "IPTU Final Projetado", "Diferença Puro vs Desconto (R$)", "Redução Total (R$)"] | |
| percent_cols = ["Diferença Puro vs Desconto (%)", "Redução Total (%)"] | |
| for col in percent_cols: | |
| # CORREÇÃO: Converte para numérico antes de dividir | |
| df_export[col] = pd.to_numeric(df_export[col], errors='coerce') / 100.0 | |
| df_export.to_excel(writer, sheet_name="Análise Enchente", index=False) | |
| ws = writer.sheets["Análise Enchente"] | |
| format_worksheet(ws, df_export, currency_columns=currency_cols, percent_columns=percent_cols) | |
| # 5. Resumos Agregados por Ano (CORRIGIDO) | |
| if yearly_aggregates_raw: | |
| for year, df_agg in yearly_aggregates_raw.items(): | |
| if df_agg is not None and not df_agg.empty: | |
| df_export = df_agg.copy() | |
| year_str = str(year).replace(" ", "_") | |
| currency_cols = [col for col in df_export.columns if "Total" in col or "Diferença Absoluta" in col] | |
| percent_cols = [col for col in df_export.columns if "%" in col] | |
| for col in percent_cols: | |
| # CORREÇÃO: Converte para numérico antes de dividir | |
| df_export[col] = pd.to_numeric(df_export[col], errors='coerce') / 100.0 | |
| sheet_name = sanitize_sheet_name(f"Resumo Agregado {year_str}") | |
| df_export.to_excel(writer, sheet_name=sheet_name, index=False) | |
| ws = writer.sheets[sheet_name] | |
| format_worksheet(ws, df_export, currency_columns=currency_cols, percent_columns=percent_cols) | |
| # 6. Detalhado por Faixa de Alíquota (já estava correto) | |
| if detailed_by_category_raw: | |
| for category_name, df_category in detailed_by_category_raw.items(): | |
| if df_category is not None and not df_category.empty: | |
| df_export = df_category.copy() | |
| currency_cols = [col for col in df_export.columns if "Total " in col] | |
| percent_cols = [col for col in df_export.columns if "%" in col] | |
| for col in percent_cols: | |
| df_export[col] = pd.to_numeric(df_export[col].astype(str).str.replace("%", ""), errors="coerce") / 100 | |
| sheet_name = sanitize_sheet_name(f"Faixas Alíq - {category_name}") | |
| df_export.to_excel(writer, sheet_name=sheet_name, index=False) | |
| ws = writer.sheets[sheet_name] | |
| format_worksheet(ws, df_export, currency_columns=currency_cols, percent_columns=percent_cols) | |
| # 7. Detalhado por Valor Venal (já estava correto) | |
| if detailed_by_value_ranges_raw: | |
| for category_name, df_category in detailed_by_value_ranges_raw.items(): | |
| if df_category is not None and not df_category.empty: | |
| df_export = df_category.copy() | |
| currency_cols = [col for col in df_export.columns if "Total " in col] | |
| percent_cols = [col for col in df_export.columns if "%" in col] | |
| for col in percent_cols: | |
| df_export[col] = pd.to_numeric(df_export[col].astype(str).str.replace("%", ""), errors="coerce") / 100 | |
| sheet_name = sanitize_sheet_name(f"Faixas VV - {category_name}") | |
| df_export.to_excel(writer, sheet_name=sheet_name, index=False) | |
| ws = writer.sheets[sheet_name] | |
| format_worksheet(ws, df_export, currency_columns=currency_cols, percent_columns=percent_cols) | |
| # 8. Análise de Variação (já estava correto) | |
| if variation_analysis_raw: | |
| for category_name, df_category in variation_analysis_raw.items(): | |
| if df_category is not None and not df_category.empty: | |
| df_export = df_category.copy() | |
| currency_cols = [col for col in df_export.columns if "Total" in col or "Diferença" in col] | |
| percent_cols = [col for col in df_export.columns if "%" in col] | |
| for col in percent_cols: | |
| df_export[col] = pd.to_numeric(df_export[col].astype(str).str.replace("%", ""), errors="coerce") / 100 | |
| sheet_name = sanitize_sheet_name(f"Variação - {category_name}") | |
| df_export.to_excel(writer, sheet_name=sheet_name, index=False) | |
| ws = writer.sheets[sheet_name] | |
| format_worksheet(ws, df_export, currency_columns=currency_cols, percent_columns=percent_cols) | |
| output.seek(0) | |
| return output | |
| # --- Otimização 2: Cálculo de Imposto Vetorizado --- | |
| def calculate_progressive_tax_vectorized(df_input, vv_column_name, iptu_rules): | |
| df = df_input.copy() | |
| total_tax_col = pd.Series(0, index=df.index, dtype=np.float64) | |
| for regra_nome, regra in iptu_rules.items(): | |
| filtro = regra["rule"] | |
| if not filtro.any(): | |
| continue | |
| faixas = regra["ranges"] | |
| if not faixas: | |
| continue | |
| current_vv = df.loc[filtro, vv_column_name] | |
| limites = [lim[0] for lim in faixas] | |
| aliquotas = [lim[1] for lim in faixas] | |
| parcelas_a_deduzir = [0.0] | |
| for i in range(1, len(limites)): | |
| deducao = parcelas_a_deduzir[i - 1] + ( | |
| limites[i - 1] * (aliquotas[i] - aliquotas[i - 1]) | |
| ) | |
| parcelas_a_deduzir.append(deducao) | |
| conditions = [] | |
| choices = [] | |
| for i in range(len(limites)): | |
| limite_inferior = limites[i - 1] if i > 0 else 0 | |
| limite_superior = limites[i] | |
| conditions.append( | |
| (current_vv > limite_inferior) & (current_vv <= limite_superior) | |
| ) | |
| choices.append(current_vv * aliquotas[i] - parcelas_a_deduzir[i]) | |
| imposto_calculado = np.select(conditions, choices, default=0) | |
| total_tax_col.loc[filtro] = imposto_calculado | |
| df["IPTU_NOVO_TOTAL"] = total_tax_col | |
| return df | |
| def create_flood_analysis_graphs_plotly(df_analysis, years_to_plot): | |
| figs = [] | |
| if not years_to_plot: | |
| return [] | |
| data_base = df_analysis[ | |
| ( | |
| df_analysis["SITUACAO_ALAGAMENTO"].isin( | |
| ["Direto", "Indireto", "Indeterminado"] | |
| ) | |
| ) | |
| & (df_analysis["VLR_IMPOSTO_iptu"] > 1) | |
| ].copy() | |
| if data_base.empty: | |
| return [] | |
| colors = px.colors.qualitative.Vivid | |
| for i, year in enumerate(years_to_plot): | |
| col_name = f"IPTU_PROJETADO_{year}" | |
| if col_name not in data_base.columns: | |
| continue | |
| data_year = data_base[["VLR_IMPOSTO_iptu", col_name]].copy().dropna() | |
| if data_year.empty: | |
| continue | |
| color_for_year = colors[i % len(colors)] | |
| data_year["ratio"] = data_year[col_name] / data_year["VLR_IMPOSTO_iptu"] | |
| fig_hist = go.Figure() | |
| fig_hist.add_trace( | |
| go.Histogram( | |
| x=data_year["ratio"], | |
| name="Inscrições", | |
| marker_color=color_for_year, | |
| xbins=dict(start=0.0, end=2.0, size=0.05), | |
| ) | |
| ) | |
| fig_hist.update_layout( | |
| title_text=f"Ano {year}: Relação IPTU Projetado / Atual (Atingidos)", | |
| xaxis_title_text="Ratio (Novo IPTU / IPTU Atual)", | |
| yaxis_title_text="Nº de Inscrições", | |
| title_font_size=16, | |
| font_size=12, | |
| bargap=0.05, | |
| plot_bgcolor="white", | |
| ) | |
| fig_hist.add_vline( | |
| x=1, line_dash="dash", line_color="red", annotation_text="Sem Mudança" | |
| ) | |
| figs.append(fig_hist) | |
| diff = data_year[col_name] - data_year["VLR_IMPOSTO_iptu"] | |
| aumentou = int((diff > 0.01).sum()) | |
| diminuiu = int((diff < -0.01).sum()) | |
| manteve = int(len(diff) - aumentou - diminuiu) | |
| diminuiu_ou_manteve = diminuiu + manteve | |
| total_inscricoes = aumentou + diminuiu_ou_manteve | |
| if total_inscricoes > 0: | |
| percent_aumentou = (aumentou / total_inscricoes) * 100 | |
| percent_diminuiu_manteve = (diminuiu_ou_manteve / total_inscricoes) * 100 | |
| fig_bar = go.Figure() | |
| fig_bar.add_trace( | |
| go.Bar( | |
| y=[""], | |
| x=[percent_diminuiu_manteve], | |
| name=f"Diminuiu ou Manteve ({diminuiu_ou_manteve:,})", | |
| orientation="h", | |
| marker=dict(color="royalblue", line=dict(color="black", width=1.5)), | |
| text=f"<b>{diminuiu_ou_manteve:,}</b><br>({percent_diminuiu_manteve:.1f}%)", | |
| textposition="inside", | |
| insidetextanchor="middle", | |
| textfont=dict(color="black", size=16, family="Arial"), | |
| ) | |
| ) | |
| fig_bar.add_trace( | |
| go.Bar( | |
| y=[""], | |
| x=[percent_aumentou], | |
| name=f"Aumentou ({aumentou:,})", | |
| orientation="h", | |
| marker=dict( | |
| color="lightcoral", line=dict(color="black", width=1.5) | |
| ), | |
| text=f"<b>{aumentou:,}</b><br>({percent_aumentou:.1f}%)", | |
| textposition="inside", | |
| insidetextanchor="middle", | |
| textfont=dict(color="black", size=16, family="Arial"), | |
| ) | |
| ) | |
| fig_bar.update_layout( | |
| title={ | |
| "text": f"<b>Atingidos - Ano {year}</b><br>Proporção de Inscrições por Variação do IPTU", | |
| "y": 0.95, | |
| "x": 0.5, | |
| "xanchor": "center", | |
| "yanchor": "top", | |
| "font": {"size": 18, "family": "Arial"}, | |
| }, | |
| barmode="stack", | |
| xaxis=dict( | |
| range=[0, 100], | |
| ticksuffix="%", | |
| title=dict(text="Proporção do Total de Inscrições", standoff=35), | |
| ), | |
| yaxis=dict(showticklabels=False), | |
| legend=dict( | |
| orientation="h", | |
| yanchor="bottom", | |
| y=-0.25, | |
| xanchor="center", | |
| x=0.5, | |
| traceorder="normal", | |
| ), | |
| plot_bgcolor="rgba(0,0,0,0)", | |
| paper_bgcolor="rgba(0,0,0,0)", | |
| margin=dict(t=80, b=140), | |
| height=400, | |
| font=dict(family="Arial"), | |
| ) | |
| figs.append(fig_bar) | |
| return figs | |
| # --- Função create_active_ranges (CORRIGIDA) --- | |
| def create_active_ranges(rate_inputs, num_bands, ufm_value): | |
| """ | |
| Cria a lista de faixas de alíquota ativas ([limite_em_reais, aliquota]) | |
| com base nos inputs do usuário, de forma robusta. | |
| """ | |
| # Retorna uma lista vazia se não houver bandas ou inputs a processar. | |
| if not num_bands or not rate_inputs: | |
| return [] | |
| active_ranges = [] | |
| # Itera sobre o número de faixas que o usuário configurou, exceto a última. | |
| # A última faixa é tratada separadamente, pois seu limite é infinito. | |
| for i in range(num_bands - 1): | |
| # Calcula o índice do limite e da alíquota na lista de inputs. | |
| # Ex: para a primeira faixa (i=0), usa o limite no índice 0 e a alíquota no índice 1. | |
| limite_idx = i * 2 | |
| aliquota_idx = i * 2 + 1 | |
| # Obtém o valor do limite, usando 0 como padrão se o campo estiver vazio. | |
| limite_ufm = ( | |
| rate_inputs[limite_idx] if rate_inputs[limite_idx] is not None else 0 | |
| ) | |
| # Obtém o valor da alíquota, usando 0 como padrão se o campo estiver vazio. | |
| aliquota = ( | |
| rate_inputs[aliquota_idx] if rate_inputs[aliquota_idx] is not None else 0 | |
| ) | |
| active_ranges.append([limite_ufm * ufm_value, aliquota]) | |
| # A última faixa sempre tem um limite infinito. | |
| # Precisamos encontrar a alíquota correta para esta última faixa. | |
| # O índice da alíquota da última faixa (N) é (N-1)*2 + 1. | |
| last_aliquot_index = (num_bands - 1) * 2 + 1 | |
| # O layout dos inputs no código original tem uma exceção: a alíquota da 8ª faixa | |
| # é o último item da lista. A lógica abaixo lida com isso. | |
| if num_bands == MAX_BANDS: | |
| last_aliquot_index = -1 # O último item da lista de inputs | |
| # Obtém a alíquota final, usando 0 como padrão se o campo estiver vazio. | |
| final_aliquot = ( | |
| rate_inputs[last_aliquot_index] | |
| if rate_inputs[last_aliquot_index] is not None | |
| else 0 | |
| ) | |
| active_ranges.append([np.inf, final_aliquot]) | |
| return active_ranges | |
| # --- NOVA FUNÇÃO DE VALIDAÇÃO --- | |
| def validate_band_limits(diferencia_est, diferencia_pred, *all_rates): | |
| """ | |
| Valida os limites das faixas de alíquota em tempo real para garantir a consistência. | |
| Desativa o botão de processamento se encontrar um erro. | |
| """ | |
| try: | |
| num_inputs_per_cat = 1 + (MAX_BANDS - 1) * 2 + 1 | |
| # Mapeamento de prefixos para informações de categoria para mensagens de erro amigáveis | |
| prefix_map = { | |
| "PG": {"idx": 0, "name": "Predial Geral"}, | |
| "PR": {"idx": 1, "name": "Predial Residencial"}, | |
| "PNR": {"idx": 2, "name": "Predial Não Residencial"}, | |
| "EG": {"idx": 3, "name": "Estacionamentos Geral"}, | |
| "ER": {"idx": 4, "name": "Estacionamentos Residenciais"}, | |
| "ENR": {"idx": 5, "name": "Estacionamentos Não Residenciais"}, | |
| "T1DF": {"idx": 6, "name": "Terrenos 1a DF"}, | |
| "T2DF": {"idx": 7, "name": "Terrenos 2a DF"}, | |
| "T3DF": {"idx": 8, "name": "Terrenos 3a DF"}, | |
| } | |
| # Determina quais categorias estão ativas com base nos checkboxes | |
| active_prefixes = [] | |
| if diferencia_pred: | |
| active_prefixes.extend(["PR", "PNR"]) | |
| else: | |
| active_prefixes.append("PG") | |
| if diferencia_est: | |
| active_prefixes.extend(["ER", "ENR"]) | |
| else: | |
| active_prefixes.append("EG") | |
| active_prefixes.extend(["T1DF", "T2DF", "T3DF"]) | |
| for prefix in active_prefixes: | |
| cat_info = prefix_map[prefix] | |
| cat_index = cat_info["idx"] | |
| cat_name = cat_info["name"] | |
| cat_inputs = all_rates[ | |
| cat_index * num_inputs_per_cat : (cat_index + 1) * num_inputs_per_cat | |
| ] | |
| # Pula a validação se o número de faixas não estiver definido (pode ser None no início) | |
| if cat_inputs[0] is None or str(cat_inputs[0]) == "": | |
| continue | |
| num_bands = int(cat_inputs[0]) | |
| last_limit = 0.0 | |
| for i in range(num_bands - 1): | |
| limit_input = cat_inputs[1 + i * 2] | |
| # Pula a validação se o campo do limite estiver vazio | |
| if limit_input is None or str(limit_input) == "": | |
| continue | |
| current_limit = float(limit_input) | |
| # A validação principal: o limite final de uma faixa deve ser maior que o seu início (que é o limite da faixa anterior) | |
| if current_limit <= last_limit: | |
| error_msg = f"Erro em '{cat_name}', Faixa {i + 1}: O Fim do Intervalo ({current_limit}) deve ser maior que o seu Início ({last_limit})." | |
| return gr.update(value=error_msg, visible=True), gr.update( | |
| interactive=False | |
| ) | |
| last_limit = current_limit | |
| # Se o loop terminar sem erros, oculta a mensagem de erro e ativa o botão | |
| return gr.update(value="", visible=False), gr.update(interactive=True) | |
| except (ValueError, TypeError): | |
| # Captura erros se o usuário digitar um texto não numérico | |
| return gr.update( | |
| value="Erro: valor inválido inserido em uma das faixas.", visible=True | |
| ), gr.update(interactive=False) | |
| except Exception as e: | |
| # Captura outros erros inesperados durante a validação | |
| return gr.update( | |
| value=f"Erro inesperado na validação: {e}", visible=True | |
| ), gr.update(interactive=False) | |
| # --- NOVA FUNÇÃO AUXILIAR PARA ANÁLISE DA ENCHENTE --- | |
| def generate_flood_analysis( | |
| the_merge, df_analysis, base_filter, anos_projecao, descontos_anuais | |
| ): | |
| """ | |
| Gera um DataFrame de análise de enchente para um determinado filtro. | |
| Retorna um DataFrame formatado para exibição e um DataFrame com dados brutos. | |
| """ | |
| # Retorna DFs vazios se o filtro não encontrar nenhum imóvel | |
| if not base_filter.any(): | |
| return pd.DataFrame(), pd.DataFrame() | |
| # Cálculos de totais para o subconjunto de imóveis definido pelo 'base_filter' | |
| iptu_2025_enchente_sum = the_merge.loc[base_filter, "VLR_IMPOSTO_iptu"].sum() | |
| y_base_puro_enchente_sum = df_analysis.loc[base_filter, "IPTU_CENARIO_PURO"].sum() | |
| dados_por_ano = [] | |
| # Garante que a análise sempre inclua os anos com desconto definido | |
| anos_analise = sorted(list(set(anos_projecao + list(descontos_anuais.keys())))) | |
| for year in anos_analise: | |
| desconto_percentual = descontos_anuais.get(year, 0) | |
| fator_desconto = 1 - (desconto_percentual / 100.0) | |
| # Calcula o valor com desconto, antes da trava | |
| cenario_com_desconto = y_base_puro_enchente_sum * fator_desconto | |
| # Determina o valor final projetado (com travas) | |
| # Se o ano não tiver projeção (ex: 2028, 2029 sem regra de transição), o valor final é o próprio valor com desconto | |
| if f"IPTU_PROJETADO_{year}" in df_analysis.columns: | |
| cenario_final = df_analysis.loc[base_filter, f"IPTU_PROJETADO_{year}"].sum() | |
| else: | |
| cenario_final = cenario_com_desconto | |
| dados_por_ano.append( | |
| { | |
| "Exercício": year, | |
| "Simulado Puro (Y_base)": y_base_puro_enchente_sum, | |
| "Simulado com Desconto (sem trava)": cenario_com_desconto, | |
| "IPTU Final Projetado": cenario_final, | |
| } | |
| ) | |
| df_raw = pd.DataFrame(dados_por_ano) | |
| if df_raw.empty: | |
| return pd.DataFrame(), pd.DataFrame() | |
| # Adiciona colunas calculadas | |
| df_raw["IPTU Atual Atingidos (2025)"] = iptu_2025_enchente_sum | |
| df_raw["Diferença Puro vs Desconto (R$)"] = ( | |
| df_raw["Simulado Puro (Y_base)"] - df_raw["Simulado com Desconto (sem trava)"] | |
| ) | |
| df_raw["Diferença Puro vs Desconto (%)"] = ( | |
| df_raw["Diferença Puro vs Desconto (R$)"] | |
| / df_raw["Simulado Puro (Y_base)"] | |
| * 100 | |
| ).fillna(0) | |
| df_raw["Redução Total (R$)"] = ( | |
| df_raw["Simulado Puro (Y_base)"] - df_raw["IPTU Final Projetado"] | |
| ) | |
| df_raw["Redução Total (%)"] = ( | |
| df_raw["Redução Total (R$)"] / df_raw["Simulado Puro (Y_base)"] * 100 | |
| ).fillna(0) | |
| # Ordena e formata as colunas | |
| cols_order = [ | |
| "Exercício", | |
| "IPTU Atual Atingidos (2025)", | |
| "Simulado Puro (Y_base)", | |
| "Simulado com Desconto (sem trava)", | |
| "IPTU Final Projetado", | |
| "Diferença Puro vs Desconto (R$)", | |
| "Diferença Puro vs Desconto (%)", | |
| "Redução Total (R$)", | |
| "Redução Total (%)", | |
| ] | |
| df_raw = df_raw[cols_order] | |
| df_display = df_raw.copy() | |
| currency_cols = [ | |
| "IPTU Atual Atingidos (2025)", | |
| "Simulado Puro (Y_base)", | |
| "Simulado com Desconto (sem trava)", | |
| "IPTU Final Projetado", | |
| "Diferença Puro vs Desconto (R$)", | |
| "Redução Total (R$)", | |
| ] | |
| percent_cols = ["Diferença Puro vs Desconto (%)", "Redução Total (%)"] | |
| for col in currency_cols: | |
| df_display[col] = df_display[col].apply(lambda x: f"R$ {x:,.2f}") | |
| for col in percent_cols: | |
| df_display[col] = df_display[col].apply(lambda x: f"{x:.2f}%") | |
| return df_display, df_raw | |
| # --- Função Principal de Processamento (MODIFICADA) --- | |
| def process_iptu_data( | |
| ufm_value_input, | |
| diferencia_estacionamentos_flag, | |
| diferencia_predios_flag, | |
| trava_aumento_perc, | |
| segregar_piso_flag, | |
| segregar_trava_flag, | |
| value_range_interval, | |
| aggregate_from_vv_value, | |
| show_projection, | |
| show_aggregate, | |
| show_detailed_bands, | |
| show_detailed_vv, | |
| show_totals, | |
| show_flood_analysis, | |
| show_flood_graphs, | |
| show_excel, | |
| universal_transition_rules, | |
| transition_rules_by_category, | |
| desconto_2026, | |
| desconto_2027, | |
| desconto_2028, | |
| desconto_2029, | |
| *all_rate_inputs, | |
| ): | |
| num_inputs_per_cat = 1 + (MAX_BANDS - 1) * 2 + 1 | |
| pg_num_bands, *pg_rates = all_rate_inputs[0:num_inputs_per_cat] | |
| pr_num_bands, *pr_rates = all_rate_inputs[ | |
| num_inputs_per_cat * 1 : num_inputs_per_cat * 2 | |
| ] | |
| pnr_num_bands, *pnr_rates = all_rate_inputs[ | |
| num_inputs_per_cat * 2 : num_inputs_per_cat * 3 | |
| ] | |
| eg_num_bands, *eg_rates = all_rate_inputs[ | |
| num_inputs_per_cat * 3 : num_inputs_per_cat * 4 | |
| ] | |
| er_num_bands, *er_rates = all_rate_inputs[ | |
| num_inputs_per_cat * 4 : num_inputs_per_cat * 5 | |
| ] | |
| enr_num_bands, *enr_rates = all_rate_inputs[ | |
| num_inputs_per_cat * 5 : num_inputs_per_cat * 6 | |
| ] | |
| t1df_num_bands, *t1df_rates = all_rate_inputs[ | |
| num_inputs_per_cat * 6 : num_inputs_per_cat * 7 | |
| ] | |
| t2df_num_bands, *t2df_rates = all_rate_inputs[ | |
| num_inputs_per_cat * 7 : num_inputs_per_cat * 8 | |
| ] | |
| t3df_num_bands, *t3df_rates = all_rate_inputs[ | |
| num_inputs_per_cat * 8 : num_inputs_per_cat * 9 | |
| ] | |
| tae_alq1, taf_alq1 = all_rate_inputs[num_inputs_per_cat * 9 :] | |
| descontos_anuais = { | |
| 2026: desconto_2026, | |
| 2027: desconto_2027, | |
| 2028: desconto_2028, | |
| 2029: desconto_2029, | |
| } | |
| views_to_process = [] | |
| if show_projection: | |
| views_to_process.append("Projeção de Arrecadação") | |
| if show_aggregate: | |
| views_to_process.append("Resumo Agregado") | |
| if show_detailed_bands: | |
| views_to_process.append("Resumo Detalhado por Faixa") | |
| if show_detailed_vv: | |
| views_to_process.append("Resumo Detalhado por Valor Venal") | |
| if show_totals: | |
| views_to_process.append("Totais Gerais") | |
| if show_flood_analysis: | |
| views_to_process.append("Análise da Enchente") | |
| if show_flood_graphs: | |
| views_to_process.append("Gráficos da Enchente") | |
| if show_excel: | |
| views_to_process.append("Gerar Arquivo Excel") | |
| empty_df = pd.DataFrame() | |
| empty_dict = {} | |
| visibility_updates = [ | |
| gr.update(visible=v) | |
| for v in [ | |
| show_projection, | |
| show_aggregate, | |
| show_detailed_bands, | |
| show_detailed_vv, | |
| show_totals, | |
| show_flood_analysis, | |
| show_excel, | |
| show_flood_graphs, | |
| ] | |
| ] | |
| MAX_PLOTS = 30 | |
| empty_plots = [gr.update(value=None, visible=False)] * MAX_PLOTS | |
| num_cat_outputs = 13 # Number of category-based outputs | |
| error_return = ( | |
| "Erro no processamento.", | |
| empty_df, "", "", "", # Projection, Totals | |
| empty_df, empty_df, empty_df, empty_df, empty_df, empty_df, empty_df, # Flood analysis | |
| empty_dict, gr.update(choices=[], value=None), empty_df, # Aggregate | |
| gr.Textbox(visible=False), gr.DownloadButton(visible=False), # Excel | |
| ) + (gr.update(value=empty_df),) * num_cat_outputs \ | |
| + (gr.update(value=empty_df),) * num_cat_outputs \ | |
| + tuple(empty_plots) \ | |
| + (None, None, None, None, None) \ | |
| + tuple(visibility_updates) \ | |
| + (gr.update(choices=[], value=None), ) # Variation analysis year dropdown | |
| if not views_to_process: | |
| empty_return = ( | |
| "Nenhuma visão selecionada para processamento.", | |
| empty_df, "", "", "", # Projection, Totals | |
| empty_df, empty_df, empty_df, empty_df, empty_df, empty_df, empty_df, # Flood analysis | |
| empty_dict, gr.update(choices=[], value=None), empty_df, # Aggregate | |
| gr.Textbox(visible=False), gr.DownloadButton(visible=False), # Excel | |
| ) + (gr.update(value=empty_df),) * num_cat_outputs \ | |
| + (gr.update(value=empty_df),) * num_cat_outputs \ | |
| + tuple(empty_plots) \ | |
| + (None, None, None, None, None) \ | |
| + tuple([gr.update(visible=False)] * 8) \ | |
| + (gr.update(choices=[], value=None), ) | |
| return empty_return | |
| if not os.path.exists(CSV_FILE_PATH): | |
| return (f"Erro: Arquivo '{CSV_FILE_PATH}' não encontrado.",) + error_return[1:] | |
| if THE_MERGE_GLOBAL is None: | |
| return ( | |
| "Erro crítico: Os dados iniciais não puderam ser carregados. Verifique o console.", | |
| ) + error_return[1:] | |
| the_merge = THE_MERGE_GLOBAL.copy() | |
| UFM_VALUE = ufm_value_input | |
| nao_tributados = FILTERS_GLOBAL["nao_tributados"] | |
| isen_prop_loc = FILTERS_GLOBAL["isen_prop_loc"] | |
| isen_hab_pop = FILTERS_GLOBAL["isen_hab_pop"] | |
| prediais_base = FILTERS_GLOBAL["prediais_base"] | |
| prediais_residenciais = FILTERS_GLOBAL["prediais_residenciais"] | |
| prediais_nao_residenciais = FILTERS_GLOBAL["prediais_nao_residenciais"] | |
| prediais_residenciais_tributados = FILTERS_GLOBAL[ | |
| "prediais_residenciais_tributados" | |
| ] | |
| prediais_nao_residenciais_tributados = FILTERS_GLOBAL[ | |
| "prediais_nao_residenciais_tributados" | |
| ] | |
| prediais_geral_tributados = FILTERS_GLOBAL["prediais_geral_tributados"] | |
| prediais_residenciais_nao_tributados = FILTERS_GLOBAL[ | |
| "prediais_residenciais_nao_tributados" | |
| ] | |
| prediais_nao_residenciais_nao_tributados = FILTERS_GLOBAL[ | |
| "prediais_nao_residenciais_nao_tributados" | |
| ] | |
| prediais_geral_nao_tributados = FILTERS_GLOBAL["prediais_geral_nao_tributados"] | |
| estacionamentos_geral = FILTERS_GLOBAL["estacionamentos_geral"] | |
| estacionamentos_residenciais = FILTERS_GLOBAL["estacionamentos_residenciais"] | |
| estacionamentos_nao_residenciais = FILTERS_GLOBAL[ | |
| "estacionamentos_nao_residenciais" | |
| ] | |
| estacionamentos_residenciais_tributados = FILTERS_GLOBAL[ | |
| "estacionamentos_residenciais_tributados" | |
| ] | |
| estacionamentos_nao_residenciais_tributados = FILTERS_GLOBAL[ | |
| "estacionamentos_nao_residenciais_tributados" | |
| ] | |
| estacionamentos_geral_tributados = FILTERS_GLOBAL[ | |
| "estacionamentos_geral_tributados" | |
| ] | |
| estacionamentos_residenciais_nao_tributados = FILTERS_GLOBAL[ | |
| "estacionamentos_residenciais_nao_tributados" | |
| ] | |
| estacionamentos_nao_residenciais_nao_tributados = FILTERS_GLOBAL[ | |
| "estacionamentos_nao_residenciais_nao_tributados" | |
| ] | |
| estacionamentos_geral_nao_tributados = FILTERS_GLOBAL[ | |
| "estacionamentos_geral_nao_tributados" | |
| ] | |
| terrenos_1df_base = FILTERS_GLOBAL["terrenos_1df_base"] | |
| terrenos_2df_base = FILTERS_GLOBAL["terrenos_2df_base"] | |
| terrenos_3df_base = FILTERS_GLOBAL["terrenos_3df_base"] | |
| terrenos_1df_tributados = FILTERS_GLOBAL["terrenos_1df_tributados"] | |
| terrenos_2df_tributados = FILTERS_GLOBAL["terrenos_2df_tributados"] | |
| terrenos_3df_tributados = FILTERS_GLOBAL["terrenos_3df_tributados"] | |
| terrenos_1df_nao_tributados = FILTERS_GLOBAL["terrenos_1df_nao_tributados"] | |
| terrenos_2df_nao_tributados = FILTERS_GLOBAL["terrenos_2df_nao_tributados"] | |
| terrenos_3df_nao_tributados = FILTERS_GLOBAL["terrenos_3df_nao_tributados"] | |
| regra_tae_base = FILTERS_GLOBAL["regra_tae_base"] | |
| regra_taf_base = FILTERS_GLOBAL["regra_taf_base"] | |
| terrenos_aliqesp = FILTERS_GLOBAL["terrenos_aliqesp"] | |
| terrenos_aliqfix = FILTERS_GLOBAL["terrenos_aliqfix"] | |
| category_mapping = {} | |
| if diferencia_predios_flag: | |
| category_mapping.update( | |
| { | |
| "Predial Residencial": FILTERS_GLOBAL[ | |
| "prediais_residenciais_tributados" | |
| ] | |
| | isen_prop_loc | |
| | isen_hab_pop, | |
| "Predial Não Residencial": prediais_nao_residenciais_tributados, | |
| } | |
| ) | |
| else: | |
| category_mapping["Predial Geral"] = ( | |
| prediais_geral_tributados | |
| | prediais_residenciais_tributados | |
| | prediais_nao_residenciais_tributados | |
| | isen_prop_loc | |
| | isen_hab_pop | |
| ) | |
| if diferencia_estacionamentos_flag: | |
| category_mapping.update( | |
| { | |
| "Estacionamentos Residenciais": estacionamentos_residenciais_tributados, | |
| "Estacionamentos Não Residenciais": estacionamentos_nao_residenciais_tributados, | |
| } | |
| ) | |
| else: | |
| category_mapping["Estacionamentos Geral"] = ( | |
| estacionamentos_geral_tributados | |
| | estacionamentos_residenciais_tributados | |
| | estacionamentos_nao_residenciais_tributados | |
| ) | |
| category_mapping.update( | |
| { | |
| "Terrenos 1ª DF": terrenos_1df_tributados, | |
| "Terrenos 2ª DF": terrenos_2df_tributados, | |
| "Terrenos 3ª DF": terrenos_3df_tributados, | |
| "Terrenos Alíquota Especial": terrenos_aliqesp, | |
| "Terrenos Alíquota Fixa": terrenos_aliqfix, | |
| } | |
| ) | |
| # --- ETAPA 1: Cálculo do IPTU Puro (sem descontos) --- | |
| IPTU_RULES = { | |
| "PG": { | |
| "name": "Predial Sem Dif. por Uso", | |
| "rule": prediais_geral_tributados, | |
| "ranges": create_active_ranges(pg_rates, pg_num_bands, UFM_VALUE), | |
| }, | |
| "PR": { | |
| "name": "Predial Residencial", | |
| "rule": prediais_residenciais_tributados, | |
| "ranges": create_active_ranges(pr_rates, pr_num_bands, UFM_VALUE), | |
| }, | |
| "PNR": { | |
| "name": "Predial Não Residencial", | |
| "rule": prediais_nao_residenciais_tributados, | |
| "ranges": create_active_ranges(pnr_rates, pnr_num_bands, UFM_VALUE), | |
| }, | |
| "EG": { | |
| "name": "Estacionamentos Sem Dif. por Uso", | |
| "rule": estacionamentos_geral_tributados, | |
| "ranges": create_active_ranges(eg_rates, eg_num_bands, UFM_VALUE), | |
| }, | |
| "ER": { | |
| "name": "Estacionamentos Residenciais", | |
| "rule": estacionamentos_residenciais_tributados, | |
| "ranges": create_active_ranges(er_rates, er_num_bands, UFM_VALUE), | |
| }, | |
| "ENR": { | |
| "name": "Estacionamentos Não Residenciais", | |
| "rule": estacionamentos_nao_residenciais_tributados, | |
| "ranges": create_active_ranges(enr_rates, enr_num_bands, UFM_VALUE), | |
| }, | |
| "T1DF": { | |
| "name": "Terrenos na 1a DF", | |
| "rule": terrenos_1df_tributados, | |
| "ranges": create_active_ranges(t1df_rates, t1df_num_bands, UFM_VALUE), | |
| }, | |
| "T2DF": { | |
| "name": "Terrenos na 2a DF", | |
| "rule": terrenos_2df_tributados, | |
| "ranges": create_active_ranges(t2df_rates, t2df_num_bands, UFM_VALUE), | |
| }, | |
| "T3DF": { | |
| "name": "Terrenos na 3a DF", | |
| "rule": terrenos_3df_tributados, | |
| "ranges": create_active_ranges(t3df_rates, t3df_num_bands, UFM_VALUE), | |
| }, | |
| "PGI": { | |
| "name": "Predial Sem Dif. por Uso Isentos", | |
| "rule": prediais_geral_nao_tributados, | |
| "ranges": [[np.inf, 0.0]], | |
| }, | |
| "PRI": { | |
| "name": "Predial Residencial Isentos", | |
| "rule": prediais_residenciais_nao_tributados, | |
| "ranges": [[np.inf, 0.0]], | |
| }, | |
| "PNRI": { | |
| "name": "Predial Não Residencial Isentos", | |
| "rule": prediais_nao_residenciais_nao_tributados, | |
| "ranges": [[np.inf, 0.0]], | |
| }, | |
| "EGI": { | |
| "name": "Estacionamentos Sem Dif. por Uso Isentos", | |
| "rule": estacionamentos_geral_nao_tributados, | |
| "ranges": [[np.inf, 0.0]], | |
| }, | |
| "ERI": { | |
| "name": "Estacionamentos Residenciais Isentos", | |
| "rule": estacionamentos_residenciais_nao_tributados, | |
| "ranges": [[np.inf, 0.0]], | |
| }, | |
| "ENRI": { | |
| "name": "Estacionamentos Não Residenciais Isentos", | |
| "rule": estacionamentos_nao_residenciais_nao_tributados, | |
| "ranges": [[np.inf, 0.0]], | |
| }, | |
| "T1DFI": { | |
| "name": "Terrenos na 1a DF Isentos", | |
| "rule": terrenos_1df_nao_tributados, | |
| "ranges": [[np.inf, 0.0]], | |
| }, | |
| "T2DFI": { | |
| "name": "Terrenos na 2a DF Isentos", | |
| "rule": terrenos_2df_nao_tributados, | |
| "ranges": [[np.inf, 0.0]], | |
| }, | |
| "T3DFI": { | |
| "name": "Terrenos na 3a DF Isentos", | |
| "rule": terrenos_3df_nao_tributados, | |
| "ranges": [[np.inf, 0.0]], | |
| }, | |
| "TAE": { | |
| "name": "Terrenos Aliq. Esp.", | |
| "rule": terrenos_aliqesp, | |
| "ranges": [[np.inf, tae_alq1]], | |
| }, | |
| "TAF": { | |
| "name": "Terrenos Aliq. Fix.", | |
| "rule": terrenos_aliqfix, | |
| "ranges": [[np.inf, taf_alq1]], | |
| }, | |
| } | |
| if diferencia_estacionamentos_flag: | |
| del IPTU_RULES["EG"], IPTU_RULES["EGI"] | |
| else: | |
| del IPTU_RULES["ER"], IPTU_RULES["ERI"], IPTU_RULES["ENR"], IPTU_RULES["ENRI"] | |
| if diferencia_predios_flag: | |
| del IPTU_RULES["PG"], IPTU_RULES["PGI"] | |
| else: | |
| del IPTU_RULES["PR"], IPTU_RULES["PRI"], IPTU_RULES["PNR"], IPTU_RULES["PNRI"] | |
| reference_rule_key = "PR" if diferencia_predios_flag else "PG" | |
| if reference_rule_key in IPTU_RULES: | |
| reference_rule = IPTU_RULES[reference_rule_key] | |
| prpl_ranges = ( | |
| [[100000 * UFM_VALUE, 0.0]] | |
| + [r for r in reference_rule["ranges"] if r[0] > 100000 * UFM_VALUE] | |
| if reference_rule["ranges"] | |
| and reference_rule["ranges"][0][0] < 100000 * UFM_VALUE | |
| else reference_rule["ranges"].copy() | |
| ) | |
| IPTU_RULES["PRPL"] = { | |
| "name": "Predial Residencial - Aposentado, Inativo, Pensionista, Deficiente", | |
| "rule": isen_prop_loc, | |
| "ranges": prpl_ranges, | |
| } | |
| prph_ranges = ( | |
| [[55000 * UFM_VALUE, 0.0]] | |
| + [r for r in reference_rule["ranges"] if r[0] > 55000 * UFM_VALUE] | |
| if reference_rule["ranges"] | |
| and reference_rule["ranges"][0][0] < 55000 * UFM_VALUE | |
| else reference_rule["ranges"].copy() | |
| ) | |
| IPTU_RULES["PRHP"] = { | |
| "name": "Predial Residencial - Habitações Populares", | |
| "rule": isen_hab_pop, | |
| "ranges": prph_ranges, | |
| } | |
| df_resumo_final = calculate_progressive_tax_vectorized( | |
| the_merge, "valor_venal_calculo", IPTU_RULES | |
| ) | |
| IPTU_RULES_FINAL = IPTU_RULES.copy() | |
| df_analysis = the_merge[ | |
| [ | |
| "VLR_IMPOSTO_iptu", | |
| "VLR_IMPOSTO_CALCULADO_iptu", | |
| "SITUACAO_ALAGAMENTO", | |
| "PERC_ATING", | |
| "valor_venal_calculo", | |
| ] | |
| ].copy() | |
| df_analysis["IPTU_CENARIO_PURO"] = df_resumo_final["IPTU_NOVO_TOTAL"] | |
| # --- ETAPA 2: Aplicação de Descontos e Travas de Aumento --- | |
| X = df_analysis["VLR_IMPOSTO_iptu"].copy().fillna(0) | |
| Y_puro = df_analysis["IPTU_CENARIO_PURO"].copy().fillna(0) | |
| filtro_enchente = the_merge["SITUACAO_ALAGAMENTO"].isin( | |
| ["Direto", "Indireto", "Indeterminado"] | |
| ) | |
| filtro_enchente_predial = filtro_enchente & prediais_base | |
| # NOVA COLUNA: IPTU Projetado Puro com Desconto (para "Total Projeto" e "Sem Travas") | |
| Y_puro_com_desconto = Y_puro.copy() | |
| primeiro_ano_desconto = descontos_anuais.get(2026, 0) # Assumindo 2026 como base | |
| if primeiro_ano_desconto > 0: | |
| fator_desc_primeiro_ano = 1 - (primeiro_ano_desconto / 100.0) | |
| Y_puro_com_desconto.loc[filtro_enchente_predial] *= fator_desc_primeiro_ano | |
| df_analysis["IPTU_PURO_C_DESC"] = Y_puro_com_desconto | |
| anos_projecao = ( | |
| sorted( | |
| list( | |
| set( | |
| rule["year"] | |
| for rules in transition_rules_by_category.values() | |
| for rule in rules | |
| ) | |
| ) | |
| ) | |
| if segregar_trava_flag | |
| else sorted(list(set(rule["year"] for rule in universal_transition_rules))) | |
| ) | |
| for year in anos_projecao: | |
| # Começa com o valor puro do cenário | |
| Y_ano_corrente = Y_puro.copy() | |
| # Aplica o desconto de enchente, se houver para o ano | |
| desconto_percentual = descontos_anuais.get(year, 0) | |
| if desconto_percentual > 0: | |
| fator_desconto = 1 - (desconto_percentual / 100.0) | |
| Y_ano_corrente.loc[filtro_enchente_predial] *= fator_desconto | |
| # Aplica a trava de aumento sobre o valor já com desconto | |
| with np.errstate(divide="ignore", invalid="ignore"): | |
| aumento_relativo = np.nan_to_num((Y_ano_corrente - X) / X, posinf=np.inf) | |
| trava_ativada = aumento_relativo >= (trava_aumento_perc / 100.0) | |
| resultado_final = Y_ano_corrente.copy() | |
| if trava_ativada.any(): | |
| if segregar_trava_flag: | |
| conditions, factors = [], [] | |
| for cat_name, cat_filter in category_mapping.items(): | |
| year_rule = next( | |
| ( | |
| r | |
| for r in transition_rules_by_category.get(cat_name, []) | |
| if r["year"] == year | |
| ), | |
| None, | |
| ) | |
| if year_rule: | |
| conditions.append(cat_filter) | |
| factors.append(year_rule["factor"]) | |
| fator_serie = pd.Series( | |
| np.select(conditions, factors, default=1.0), index=the_merge.index | |
| ) | |
| else: | |
| year_rule = next( | |
| (r for r in universal_transition_rules if r["year"] == year), None | |
| ) | |
| fator_universal = year_rule["factor"] if year_rule else 1.0 | |
| fator_serie = pd.Series(fator_universal, index=the_merge.index) | |
| indices_com_trava = trava_ativada | |
| if indices_com_trava.any(): | |
| X_trava = X[indices_com_trava].copy() | |
| Y_trava = Y_ano_corrente[indices_com_trava].copy() | |
| fator_trava = fator_serie[indices_com_trava].copy() | |
| valor_com_trava_calculado = X_trava + fator_trava * (Y_trava - X_trava) | |
| resultado_final.loc[indices_com_trava] = valor_com_trava_calculado | |
| df_analysis[f"IPTU_PROJETADO_{year}"] = resultado_final.copy() | |
| the_merge = the_merge.join(df_analysis.filter(like="IPTU_")) | |
| # --- ETAPA 3: Geração de Visões e Resultados --- | |
| ( | |
| df_projection_display, | |
| total_novo_str, | |
| total_atual_trib_str, | |
| total_atual_calc_str, | |
| ) = empty_df, "", "", "" | |
| ( | |
| df_enchente_analysis_display, | |
| df_enchente_res_display, | |
| df_enchente_non_res_display, | |
| df_enchente_est_res_display, | |
| df_enchente_est_non_res_display, | |
| df_enchente_predial_geral_display, | |
| df_enchente_est_geral_display, | |
| ) = empty_df, empty_df, empty_df, empty_df, empty_df, empty_df, empty_df | |
| flood_plot_figs = [] | |
| yearly_aggregates, yearly_aggregates_raw, agg_dropdown_update, first_year_agg_df = ( | |
| empty_dict, | |
| empty_dict, | |
| gr.update(choices=[], value=None), | |
| empty_df, | |
| ) | |
| ( | |
| detailed_by_category, | |
| detailed_by_category_raw, | |
| detailed_by_value_ranges, | |
| detailed_by_value_ranges_raw | |
| ) = ({}, {}, {}, {}) | |
| excel_file_buffer, df_projection_raw, df_enchente_analysis_raw_data = ( | |
| None, | |
| empty_df, | |
| empty_df, | |
| ) | |
| if "Projeção de Arrecadação" in views_to_process: | |
| atual_total = X.sum() | |
| valores_projecao = [atual_total] | |
| # O valor "Sem Trava" agora é o IPTU_PURO_C_DESC | |
| Y_puro_com_desconto_sum = Y_puro_com_desconto.sum() | |
| valores_projecao.append(Y_puro_com_desconto_sum) | |
| for year in anos_projecao: | |
| valores_projecao.append(df_analysis[f"IPTU_PROJETADO_{year}"].sum()) | |
| anos_label = [ | |
| "Arrecadação Atual (Tributado)", | |
| "Projetado (Puro, c/ Desc.)", | |
| ] + [f"Projeção {year}" for year in anos_projecao] | |
| crescimento_absoluto = [v - atual_total for v in valores_projecao] | |
| crescimento_relativo = [ | |
| (v / atual_total - 1) * 100 if atual_total > 0 else 0 | |
| for v in valores_projecao | |
| ] | |
| df_projection_raw = pd.DataFrame( | |
| { | |
| "Ano": anos_label, | |
| "Valor Total Arrecadado": valores_projecao, | |
| "Crescimento Absoluto": crescimento_absoluto, | |
| "Crescimento Relativo (%)": crescimento_relativo, | |
| } | |
| ) | |
| df_projection_display = df_projection_raw.copy() | |
| df_projection_display["Valor Total Arrecadado"] = df_projection_display[ | |
| "Valor Total Arrecadado" | |
| ].apply(lambda x: f"R$ {x:,.2f}") | |
| df_projection_display["Crescimento Absoluto"] = df_projection_display[ | |
| "Crescimento Absoluto" | |
| ].apply(lambda x: f"R$ {x:,.2f}") | |
| df_projection_display["Crescimento Relativo (%)"] = df_projection_display[ | |
| "Crescimento Relativo (%)" | |
| ].apply(lambda x: f"{x:.2f}%") | |
| if "Totais Gerais" in views_to_process: | |
| total_novo_sum, total_atual_trib_sum, total_atual_calc_sum = ( | |
| Y_puro.sum(), | |
| X.sum(), | |
| the_merge["VLR_IMPOSTO_CALCULADO_iptu"].sum(), | |
| ) | |
| total_novo_str, total_atual_trib_str, total_atual_calc_str = ( | |
| f"Total IPTU Novo (Simulado Puro): {total_novo_sum:,.2f}", | |
| f"Total IPTU Atual Tributado: {total_atual_trib_sum:,.2f}", | |
| f"Total IPTU Atual Calculado: {total_atual_calc_sum:,.2f}", | |
| ) | |
| if "Análise da Enchente" in views_to_process: | |
| filtro_prediais_estacionamentos_tributados = ( | |
| prediais_base | estacionamentos_geral | |
| ) & (~nao_tributados) | |
| filtro_enchente_analysis = ( | |
| the_merge["SITUACAO_ALAGAMENTO"].isin( | |
| ["Direto", "Indireto", "Indeterminado"] | |
| ) | |
| & filtro_prediais_estacionamentos_tributados | |
| ) | |
| df_enchente_analysis_display, df_enchente_analysis_raw_data = generate_flood_analysis( | |
| the_merge, | |
| df_analysis, | |
| filtro_enchente_analysis, | |
| anos_projecao, | |
| descontos_anuais, | |
| ) | |
| if diferencia_predios_flag: | |
| filtro_res = FILTERS_GLOBAL["prediais_residenciais"] | |
| filtro_non_res = FILTERS_GLOBAL["prediais_nao_residenciais"] | |
| df_enchente_res_display, _ = generate_flood_analysis( | |
| the_merge, df_analysis, filtro_enchente_analysis & filtro_res, anos_projecao, descontos_anuais, | |
| ) | |
| df_enchente_non_res_display, _ = generate_flood_analysis( | |
| the_merge, df_analysis, filtro_enchente_analysis & filtro_non_res, anos_projecao, descontos_anuais, | |
| ) | |
| else: | |
| filtro_pred_geral = FILTERS_GLOBAL["prediais_base"] | |
| df_enchente_predial_geral_display, _ = generate_flood_analysis( | |
| the_merge, df_analysis, filtro_enchente_analysis & filtro_pred_geral, anos_projecao, descontos_anuais, | |
| ) | |
| if diferencia_estacionamentos_flag: | |
| filtro_est_res = FILTERS_GLOBAL["estacionamentos_residenciais"] | |
| filtro_est_non_res = FILTERS_GLOBAL["estacionamentos_nao_residenciais"] | |
| df_enchente_est_res_display, _ = generate_flood_analysis( | |
| the_merge, df_analysis, filtro_enchente_analysis & filtro_est_res, anos_projecao, descontos_anuais, | |
| ) | |
| df_enchente_est_non_res_display, _ = generate_flood_analysis( | |
| the_merge, df_analysis, filtro_enchente_analysis & filtro_est_non_res, anos_projecao, descontos_anuais, | |
| ) | |
| else: | |
| filtro_est_geral = FILTERS_GLOBAL["estacionamentos_geral"] | |
| df_enchente_est_geral_display, _ = generate_flood_analysis( | |
| the_merge, df_analysis, filtro_enchente_analysis & filtro_est_geral, anos_projecao, descontos_anuais, | |
| ) | |
| if "Gráficos da Enchente" in views_to_process and not df_enchente_analysis_raw_data.empty: | |
| anos_para_graficos = df_enchente_analysis_raw_data["Exercício"].tolist() | |
| flood_plot_figs = create_flood_analysis_graphs_plotly( | |
| the_merge, anos_para_graficos | |
| ) | |
| if "Resumo Detalhado por Faixa" in views_to_process: | |
| active_categories_faixa = [regra["name"] for regra in IPTU_RULES_FINAL.values()] | |
| primeiro_ano_proj = anos_projecao[0] if anos_projecao else None | |
| anos_colunas = [f"Total {year}" for year in anos_projecao] | |
| df_colunas = [ | |
| "Faixa (VV em UFM)", "Em R$", "Inscrições na Faixa", | |
| "Proporção Inscrições (%)", "Proporção Arrecadação (%)", | |
| "Total 2025", "Total Projeto (Puro, c/ Desc.)" | |
| ] + anos_colunas | |
| for regra_nome, regra in IPTU_RULES_FINAL.items(): | |
| nome_legivel, filtro_rule, faixas = regra["name"], regra["rule"], regra["ranges"] | |
| if nome_legivel not in active_categories_faixa or not filtro_rule.any(): | |
| continue | |
| df_filtrado = df_resumo_final[filtro_rule].copy() | |
| vv_perc, category_data = df_filtrado["valor_venal_calculo"], [] | |
| limites_com_zero = [0] + [lim[0] for lim in faixas] | |
| total_inscricoes_categoria = len(df_filtrado) | |
| total_categoria_final = the_merge.loc[filtro_rule, f"IPTU_PROJETADO_{anos_projecao[-1]}"].sum() if anos_projecao else 0 | |
| for i, (limite_superior_raw, _) in enumerate(faixas): | |
| faixa_ufm = f"{(limites_com_zero[i] / UFM_VALUE if UFM_VALUE != 0 else 0):,.0f} a {(limite_superior_raw / UFM_VALUE if UFM_VALUE != 0 else 'inf'):,.0f} UFM" | |
| faixa_reais = f"R$ {limites_com_zero[i]:,.0f} a R$ {limite_superior_raw:,.0f}" if np.isfinite(limite_superior_raw) else f"R$ {limites_com_zero[i]:,.0f} a inf" | |
| cond_exclusiva = (vv_perc > limites_com_zero[i]) & (vv_perc <= limite_superior_raw) | |
| indices_na_faixa = df_filtrado[cond_exclusiva].index | |
| if indices_na_faixa.empty: continue | |
| total_2025 = the_merge.loc[indices_na_faixa, "VLR_IMPOSTO_iptu"].sum() | |
| total_projeto_puro = the_merge.loc[indices_na_faixa, "IPTU_PURO_C_DESC"].sum() | |
| ano_totals = [the_merge.loc[indices_na_faixa, f"IPTU_PROJETADO_{year}"].sum() for year in anos_projecao] | |
| prop_inscricoes_str = f"{(len(indices_na_faixa) / total_inscricoes_categoria * 100):.1f}%" | |
| prop_arrecadacao_str = f"{(ano_totals[-1] / total_categoria_final * 100 if total_categoria_final > 0 else 0):.1f}%" | |
| category_data.append( | |
| [faixa_ufm, faixa_reais, len(indices_na_faixa), prop_inscricoes_str, prop_arrecadacao_str, total_2025, total_projeto_puro] + ano_totals | |
| ) | |
| if category_data: | |
| df_category_raw = pd.DataFrame(category_data, columns=df_colunas) | |
| detailed_by_category_raw[nome_legivel] = df_category_raw.copy() | |
| df_category_display = df_category_raw.copy() | |
| currency_cols_display = ["Total 2025", "Total Projeto (Puro, c/ Desc.)"] + anos_colunas | |
| for col in currency_cols_display: | |
| df_category_display[col] = df_category_display[col].apply(lambda x: f"R$ {x:,.2f}") | |
| detailed_by_category[nome_legivel] = df_category_display | |
| if "Resumo Detalhado por Valor Venal" in views_to_process: | |
| value_range_outputs_data = update_valor_venal_view( | |
| value_range_interval, aggregate_from_vv_value, df_resumo_final, the_merge, | |
| IPTU_RULES_FINAL, anos_projecao, diferencia_predios_flag, | |
| diferencia_estacionamentos_flag, return_updates=False, | |
| ) | |
| category_names = [rule['name'] for rule in IPTU_RULES_FINAL.values()] | |
| for i, category in enumerate(category_names): | |
| df_data = value_range_outputs_data[i * 2] | |
| df_raw_data = value_range_outputs_data[i * 2 + 1] | |
| if df_data is not None and not df_data.empty: | |
| detailed_by_value_ranges[category] = df_data | |
| detailed_by_value_ranges_raw[category] = df_raw_data | |
| if "Resumo Agregado" in views_to_process: | |
| conditions, choices = [], [] | |
| # Define as categorias base | |
| if diferencia_predios_flag: | |
| conditions.extend([prediais_residenciais, prediais_nao_residenciais]) | |
| choices.extend(["Predial Residencial", "Predial Não Residencial"]) | |
| else: | |
| conditions.append(prediais_base); choices.append("Predial Sem Dif. por Uso") | |
| if diferencia_estacionamentos_flag: | |
| conditions.extend([estacionamentos_residenciais, estacionamentos_nao_residenciais]) | |
| choices.extend(["Estacionamentos Residenciais", "Estacionamentos Não Residenciais"]) | |
| else: | |
| conditions.append(estacionamentos_geral); choices.append("Estacionamentos Sem Dif. por Uso") | |
| conditions.extend([terrenos_1df_base, terrenos_2df_base, terrenos_3df_base, regra_tae_base, regra_taf_base]) | |
| choices.extend(["Terrenos na 1a DF", "Terrenos na 2a DF", "Terrenos na 3a DF", "Terrenos Aliq. Esp.", "Terrenos Aliq. Fix."]) | |
| the_merge["CategoriaAgregada"] = np.select(conditions, choices, default="Outros/Isentos") | |
| # Anos a processar, incluindo o "sem trava" | |
| anos_para_agregar = ["Projetado (Puro, c/ Desc.)"] + anos_projecao | |
| colunas_iptu = {"Projetado (Puro, c/ Desc.)": "IPTU_PURO_C_DESC"} | |
| colunas_iptu.update({year: f"IPTU_PROJETADO_{year}" for year in anos_projecao}) | |
| for ano_label in anos_para_agregar: | |
| year_col = colunas_iptu[ano_label] | |
| if year_col not in the_merge.columns: continue | |
| agg_data = the_merge.groupby("CategoriaAgregada").agg( | |
| Inscrições=("VLR_IMPOSTO_iptu", "size"), | |
| Inscricoes_com_IPTU_Atual=("VLR_IMPOSTO_iptu", lambda x: (x > 0).sum()), | |
| Total_IPTU_Projetado=(year_col, "sum"), | |
| Total_IPTU_Atual_Tributado=("VLR_IMPOSTO_iptu", "sum"), | |
| ).reset_index() | |
| agg_data = agg_data[agg_data["CategoriaAgregada"] != "Outros/Isentos"] | |
| if agg_data.empty: continue | |
| total_inscricoes_com_iptu = (the_merge["VLR_IMPOSTO_iptu"] > 0).sum() | |
| total_atual_sum = agg_data["Total_IPTU_Atual_Tributado"].sum() | |
| total_projetado_sum = agg_data["Total_IPTU_Projetado"].sum() | |
| agg_data["Participação Inscrições IPTU>0 (%)"] = (agg_data["Inscricoes_com_IPTU_Atual"] / total_inscricoes_com_iptu * 100) if total_inscricoes_com_iptu > 0 else 0 | |
| agg_data["Participação no Total Atual (%)"] = (agg_data["Total_IPTU_Atual_Tributado"] / total_atual_sum * 100) if total_atual_sum > 0 else 0 | |
| agg_data["Participação no Total Projetado (%)"] = (agg_data["Total_IPTU_Projetado"] / total_projetado_sum * 100) if total_projetado_sum > 0 else 0 | |
| agg_data["Diferença Absoluta"] = agg_data["Total_IPTU_Projetado"] - agg_data["Total_IPTU_Atual_Tributado"] | |
| agg_data["Diferença Percentual (%)"] = (agg_data["Diferença Absoluta"] / agg_data["Total_IPTU_Atual_Tributado"] * 100).where(agg_data["Total_IPTU_Atual_Tributado"] > 0, 0) | |
| agg_data.rename(columns={ | |
| "CategoriaAgregada": "Categoria", "Inscricoes_com_IPTU_Atual": "Inscrições com IPTU > 0", | |
| "Total_IPTU_Projetado": f"Total IPTU Projetado {ano_label}", "Total_IPTU_Atual_Tributado": "Total IPTU Atual Tributado"}, | |
| inplace=True, | |
| ) | |
| final_cols_order = [ | |
| "Categoria", "Inscrições", "Inscrições com IPTU > 0", "Participação Inscrições IPTU>0 (%)", | |
| "Total IPTU Atual Tributado", "Participação no Total Atual (%)", f"Total IPTU Projetado {ano_label}", | |
| "Participação no Total Projetado (%)", "Diferença Absoluta", "Diferença Percentual (%)", | |
| ] | |
| agg_data = agg_data[final_cols_order] | |
| yearly_aggregates_raw[ano_label] = agg_data.copy() | |
| agg_data_display = agg_data.copy() | |
| currency_cols = ["Total IPTU Atual Tributado", f"Total IPTU Projetado {ano_label}", "Diferença Absoluta"] | |
| for col in currency_cols: | |
| agg_data_display[col] = agg_data_display[col].apply(lambda x: f"R$ {x:,.2f}" if pd.notna(x) else "R$ 0,00") | |
| percent_cols = ["Participação Inscrições IPTU>0 (%)", "Participação no Total Atual (%)", "Participação no Total Projetado (%)", "Diferença Percentual (%)"] | |
| for col in percent_cols: | |
| agg_data_display[col] = agg_data_display[col].apply(lambda x: f"{x:,.2f}%" if pd.notna(x) else "0,00%") | |
| yearly_aggregates[ano_label] = agg_data_display | |
| if anos_para_agregar: | |
| agg_dropdown_update = gr.update(choices=anos_para_agregar, value=anos_para_agregar[0]) | |
| first_year_agg_df = yearly_aggregates.get(anos_para_agregar[0], pd.DataFrame()) | |
| else: | |
| agg_dropdown_update = gr.update(choices=[], value=None) | |
| first_year_agg_df = pd.DataFrame() | |
| if "Gerar Arquivo Excel" in views_to_process: | |
| # A nova view de variação não é processada aqui, então seus dados não estarão prontos para o Excel | |
| # Para incluir, seria preciso processá-la aqui ou assumir que o usuário a gerou antes. | |
| # Por simplicidade, passaremos um dicionário vazio. Se a view tiver sido gerada, | |
| # seus resultados estarão no state, mas esta função não tem acesso a ele. | |
| excel_file_buffer = create_excel_download( | |
| df_projection_raw=df_projection_raw, | |
| total_novo_str=total_novo_str, | |
| total_atual_trib_str=total_atual_trib_str, | |
| total_atual_calc_str=total_atual_calc_str, | |
| ufm_value=ufm_value_input, | |
| diferencia_estacionamentos_flag=diferencia_estacionamentos_flag, | |
| diferencia_predios_flag=diferencia_predios_flag, | |
| trava_aumento_perc=trava_aumento_perc, | |
| segregar_trava_flag=segregar_trava_flag, | |
| universal_rules=universal_transition_rules, | |
| category_rules=transition_rules_by_category, | |
| descontos_anuais=descontos_anuais, | |
| all_parameters=all_rate_inputs, | |
| yearly_aggregates_raw=yearly_aggregates_raw, | |
| detailed_by_category_raw=detailed_by_category_raw, | |
| detailed_by_value_ranges_raw=detailed_by_value_ranges_raw, | |
| df_enchente_analysis_raw=df_enchente_analysis_raw_data, | |
| variation_analysis_raw={}, # Passando vazio pois é gerado por outro botão | |
| ) | |
| filename_text, download_button_result = "", gr.DownloadButton(visible=False) | |
| if excel_file_buffer: | |
| try: | |
| timestamp = datetime.now(pytz.timezone("America/Sao_Paulo")).strftime("%Y%m%d_%H%M") | |
| filename = f"resultados_iptu_{timestamp}.xlsx" | |
| temp_file_path = os.path.join(tempfile.gettempdir(), filename) | |
| with open(temp_file_path, "wb") as f: f.write(excel_file_buffer.getvalue()) | |
| size_str = f"{len(excel_file_buffer.getvalue()) / 1024:.1f} KB" | |
| download_button_result = gr.DownloadButton(label=f"Download ({size_str})", value=temp_file_path, visible=True) | |
| filename_text = filename | |
| except Exception as e: | |
| print(f"Erro ao criar arquivo Excel: {e}") | |
| projection_update = gr.update(value=df_projection_display, column_widths=calculate_column_widths(df_projection_display)) | |
| enchente_analysis_update = gr.update(value=df_enchente_analysis_display, column_widths=calculate_column_widths(df_enchente_analysis_display)) | |
| enchente_res_update = gr.update(value=df_enchente_res_display, column_widths=calculate_column_widths(df_enchente_res_display)) | |
| enchente_non_res_update = gr.update(value=df_enchente_non_res_display, column_widths=calculate_column_widths(df_enchente_non_res_display)) | |
| enchente_est_res_update = gr.update(value=df_enchente_est_res_display, column_widths=calculate_column_widths(df_enchente_est_res_display)) | |
| enchente_est_non_res_update = gr.update(value=df_enchente_est_non_res_display, column_widths=calculate_column_widths(df_enchente_est_non_res_display)) | |
| enchente_pred_geral_update = gr.update(value=df_enchente_predial_geral_display, column_widths=calculate_column_widths(df_enchente_predial_geral_display)) | |
| enchente_est_geral_update = gr.update(value=df_enchente_est_geral_display, column_widths=calculate_column_widths(df_enchente_est_geral_display)) | |
| first_year_agg_update = gr.update(value=first_year_agg_df, column_widths=calculate_column_widths(first_year_agg_df)) | |
| detailed_outputs_list = [ | |
| gr.update(value=df, column_widths=calculate_column_widths(df)) | |
| for df in [ | |
| detailed_by_category.get("Predial Sem Dif. por Uso", pd.DataFrame()), | |
| detailed_by_category.get("Predial Residencial", pd.DataFrame()), | |
| detailed_by_category.get("Predial Não Residencial", pd.DataFrame()), | |
| detailed_by_category.get("Estacionamentos Sem Dif. por Uso", pd.DataFrame()), | |
| detailed_by_category.get("Estacionamentos Residenciais", pd.DataFrame()), | |
| detailed_by_category.get("Estacionamentos Não Residenciais", pd.DataFrame()), | |
| detailed_by_category.get("Terrenos na 1a DF", pd.DataFrame()), | |
| detailed_by_category.get("Terrenos na 2a DF", pd.DataFrame()), | |
| detailed_by_category.get("Terrenos na 3a DF", pd.DataFrame()), | |
| detailed_by_category.get("Terrenos Aliq. Esp.", pd.DataFrame()), | |
| detailed_by_category.get("Terrenos Aliq. Fix.", pd.DataFrame()), | |
| detailed_by_category.get("Predial Residencial - Aposentado, Inativo, Pensionista, Deficiente", pd.DataFrame()), | |
| detailed_by_category.get("Predial Residencial - Habitações Populares", pd.DataFrame()), | |
| ] | |
| ] | |
| value_range_outputs_list = [ | |
| gr.update(value=df, column_widths=calculate_column_widths(df)) | |
| for df in [ | |
| detailed_by_value_ranges.get("Predial Sem Dif. por Uso", pd.DataFrame()), | |
| detailed_by_value_ranges.get("Predial Residencial", pd.DataFrame()), | |
| detailed_by_value_ranges.get("Predial Não Residencial", pd.DataFrame()), | |
| detailed_by_value_ranges.get("Estacionamentos Sem Dif. por Uso", pd.DataFrame()), | |
| detailed_by_value_ranges.get("Estacionamentos Residenciais", pd.DataFrame()), | |
| detailed_by_value_ranges.get("Estacionamentos Não Residenciais", pd.DataFrame()), | |
| detailed_by_value_ranges.get("Terrenos na 1a DF", pd.DataFrame()), | |
| detailed_by_value_ranges.get("Terrenos na 2a DF", pd.DataFrame()), | |
| detailed_by_value_ranges.get("Terrenos na 3a DF", pd.DataFrame()), | |
| detailed_by_value_ranges.get("Terrenos Aliq. Esp.", pd.DataFrame()), | |
| detailed_by_value_ranges.get("Terrenos Aliq. Fix.", pd.DataFrame()), | |
| detailed_by_value_ranges.get("Predial Residencial - Aposentado, Inativo, Pensionista, Deficiente", pd.DataFrame()), | |
| detailed_by_value_ranges.get("Predial Residencial - Habitações Populares", pd.DataFrame()), | |
| ] | |
| ] | |
| final_plot_outputs = [ | |
| gr.update(value=flood_plot_figs[i], visible=True) | |
| if i < len(flood_plot_figs) | |
| else gr.update(value=None, visible=False) | |
| for i in range(MAX_PLOTS) | |
| ] | |
| variation_analysis_years = ["Projetado (Puro, c/ Desc.)"] + anos_projecao | |
| variation_analysis_year_update = gr.update(choices=variation_analysis_years, value=variation_analysis_years[0]) | |
| return ( | |
| "Processamento concluído.", | |
| projection_update, | |
| total_novo_str, total_atual_trib_str, total_atual_calc_str, | |
| enchente_analysis_update, enchente_pred_geral_update, enchente_res_update, | |
| enchente_non_res_update, enchente_est_geral_update, enchente_est_res_update, | |
| enchente_est_non_res_update, | |
| yearly_aggregates, agg_dropdown_update, first_year_agg_update, | |
| gr.Textbox(value=filename_text, visible=bool(filename_text)), download_button_result, | |
| ) + tuple(detailed_outputs_list) \ | |
| + tuple(value_range_outputs_list) \ | |
| + tuple(final_plot_outputs) \ | |
| + (df_resumo_final, the_merge, IPTU_RULES_FINAL, anos_projecao, detailed_by_value_ranges_raw) \ | |
| + tuple(visibility_updates) \ | |
| + (variation_analysis_year_update,) | |
| # --- Função dedicada para atualizar a visão de valor venal (MODIFICADA) --- | |
| def update_valor_venal_view( | |
| interval_value, | |
| aggregate_from_value, | |
| df_resumo, | |
| the_merge_df, | |
| iptu_rules, | |
| anos_projecao, | |
| diferencia_predios, | |
| diferencia_estacionamentos, | |
| return_updates=True, | |
| ): | |
| if df_resumo is None or the_merge_df is None or iptu_rules is None or anos_projecao is None: | |
| empty_df, num_cats = pd.DataFrame(), 13 | |
| if return_updates: return (gr.update(value=empty_df, column_widths=None),) * num_cats | |
| else: return (empty_df, empty_df) * num_cats | |
| category_names = [rule['name'] for rule in iptu_rules.values()] | |
| all_outputs = [] | |
| interval = interval_value if interval_value and interval_value > 0 else 300000.0 | |
| aggregate_from = aggregate_from_value if aggregate_from_value and aggregate_from_value > 0 else float("inf") | |
| anos_colunas = [f"Total {year}" for year in anos_projecao] | |
| df_colunas = ["Faixa de Valor Venal", "Inscrições na Faixa", "Proporção Inscrições (%)", | |
| "Proporção Arrecadação (%)", "Total 2025", "Total Projeto (Puro, c/ Desc.)"] + anos_colunas | |
| for regra_nome, regra in iptu_rules.items(): | |
| nome_legivel, filtro_rule = regra["name"], regra["rule"] | |
| df_filtrado = df_resumo[filtro_rule].copy() if filtro_rule.any() else pd.DataFrame() | |
| if df_filtrado.empty: | |
| all_outputs.extend([pd.DataFrame(), pd.DataFrame()]) | |
| continue | |
| vv_values = df_filtrado["valor_venal_calculo"] | |
| total_inscricoes_categoria = len(df_filtrado) | |
| total_categoria_final = the_merge_df.loc[filtro_rule, f"IPTU_PROJETADO_{anos_projecao[-1]}"].sum() if anos_projecao else 0 | |
| max_value = vv_values.max() | |
| ranges, category_data = [], [] | |
| if max_value > 0: | |
| current_min = 0 | |
| while current_min < max_value and current_min < aggregate_from and interval > 0: | |
| ranges.append((current_min, current_min + interval)) | |
| current_min += interval | |
| if current_min < max_value: ranges.append((current_min, max_value)) | |
| if not ranges: ranges.append((0, max_value)) | |
| for range_min, range_max in ranges: | |
| range_filter = (vv_values >= range_min) & (vv_values < range_max) | |
| if range_max >= max_value: range_filter |= (vv_values == max_value) | |
| indices_na_faixa = df_filtrado[range_filter].index | |
| if indices_na_faixa.empty: continue | |
| faixa_nome = f"R$ {range_min:,.0f} a R$ {range_max:,.0f}" | |
| qnt, total_2025 = len(indices_na_faixa), the_merge_df.loc[indices_na_faixa, "VLR_IMPOSTO_iptu"].sum() | |
| total_projeto_puro = the_merge_df.loc[indices_na_faixa, "IPTU_PURO_C_DESC"].sum() | |
| ano_totals = [the_merge_df.loc[indices_na_faixa, f"IPTU_PROJETADO_{year}"].sum() for year in anos_projecao] | |
| prop_insc = f"{(qnt / total_inscricoes_categoria * 100):.1f}%" | |
| prop_arrec = f"{(ano_totals[-1] / total_categoria_final * 100 if total_categoria_final > 0 else 0):.1f}%" | |
| category_data.append([faixa_nome, qnt, prop_insc, prop_arrec, total_2025, total_projeto_puro] + ano_totals) | |
| if category_data: | |
| df_category_raw = pd.DataFrame(category_data, columns=df_colunas) | |
| df_category_display = df_category_raw.copy() | |
| currency_cols_display = ["Total 2025", "Total Projeto (Puro, c/ Desc.)"] + anos_colunas | |
| for col in currency_cols_display: | |
| df_category_display[col] = df_category_display[col].apply(lambda x: f"R$ {x:,.2f}") | |
| all_outputs.extend([df_category_display, df_category_raw]) | |
| else: | |
| all_outputs.extend([pd.DataFrame(), pd.DataFrame()]) | |
| if return_updates: | |
| return tuple(gr.update(value=df, column_widths=calculate_column_widths(df)) for df in all_outputs[::2]) | |
| else: | |
| return tuple(all_outputs) | |
| # --- Função de visibilidade e controle da UI (MODIFICADO) --- | |
| def update_parking_visibility(diferencia_estacionamentos): | |
| return ( | |
| gr.Accordion(visible=not diferencia_estacionamentos), | |
| gr.Accordion(visible=diferencia_estacionamentos), | |
| gr.Accordion(visible=diferencia_estacionamentos), | |
| ) | |
| def update_building_visibility(diferencia_predios): | |
| return ( | |
| gr.Accordion(visible=not diferencia_predios), | |
| gr.Accordion(visible=diferencia_predios), | |
| gr.Accordion(visible=diferencia_predios), | |
| ) | |
| def update_detailed_tabs_visibility(diferencia_predios, diferencia_estacionamentos): | |
| return ( | |
| gr.update(visible=not diferencia_predios), | |
| gr.update(visible=diferencia_predios), | |
| gr.update(visible=diferencia_predios), | |
| gr.update(visible=not diferencia_estacionamentos), | |
| gr.update(visible=diferencia_estacionamentos), | |
| gr.update(visible=diferencia_estacionamentos), | |
| gr.update(visible=True), gr.update(visible=True), gr.update(visible=True), | |
| gr.update(visible=True), gr.update(visible=True), gr.update(visible=True), | |
| gr.update(visible=True), | |
| ) | |
| def update_detailed_vv_tabs_visibility(diferencia_predios, diferencia_estacionamentos): | |
| return ( | |
| gr.update(visible=not diferencia_predios), | |
| gr.update(visible=diferencia_predios), | |
| gr.update(visible=diferencia_predios), | |
| gr.update(visible=not diferencia_estacionamentos), | |
| gr.update(visible=diferencia_estacionamentos), | |
| gr.update(visible=diferencia_estacionamentos), | |
| gr.update(visible=True), gr.update(visible=True), gr.update(visible=True), | |
| gr.update(visible=True), gr.update(visible=True), gr.update(visible=True), | |
| gr.update(visible=True), | |
| ) | |
| def update_selected_tabs(is_differentiated): | |
| """Muda a aba selecionada com base no checkbox de diferenciação.""" | |
| if is_differentiated: | |
| # Quando diferenciar, torna a aba "Residencial" ativa. | |
| return ( | |
| gr.update(selected="pr_faixa"), | |
| gr.update(selected="pr_vv"), | |
| gr.update(selected="pr_var") | |
| ) | |
| else: | |
| # Quando não diferenciar, torna a aba "Geral" ativa. | |
| return ( | |
| gr.update(selected="pg_faixa"), | |
| gr.update(selected="pg_vv"), | |
| gr.update(selected="pg_var") | |
| ) | |
| def add_year_rule(rules): | |
| if len(rules) >= MAX_YEARS: | |
| return rules | |
| new_year = rules[-1]["year"] + 1 if rules else 2026 | |
| rules.append({"year": new_year, "factor": 1.0}) | |
| return rules | |
| def remove_year_rule(rules): | |
| if len(rules) > 1: | |
| rules.pop() | |
| if rules: | |
| rules[-1]["factor"] = 1.0 | |
| return rules | |
| def update_transition_ui(rules): | |
| updates = [] | |
| num_rules = len(rules) | |
| for i in range(MAX_YEARS): | |
| if i < num_rules: | |
| updates.append(gr.update(visible=True)) | |
| updates.append(gr.update(value=rules[i]["year"])) | |
| updates.append(gr.update(value=rules[i]["factor"] * 100)) | |
| else: | |
| updates.append(gr.update(visible=False)) | |
| updates.append(gr.update()) | |
| updates.append(gr.update()) | |
| return updates | |
| def update_rule_factor(index, new_factor_perc, rules): | |
| if 0 <= index < len(rules): | |
| rules[index]["factor"] = (new_factor_perc or 0) / 100.0 | |
| return rules | |
| def get_agg_df_for_year(selected_year, all_aggregates): | |
| """ | |
| Função melhorada para obter o DataFrame agregado para o ano selecionado | |
| """ | |
| # Handle the case where all_aggregates might be a State object or empty | |
| if hasattr(all_aggregates, "value"): | |
| # If it's a State object, extract the value | |
| aggregates_dict = all_aggregates.value | |
| else: | |
| # It should be the actual dictionary | |
| aggregates_dict = all_aggregates | |
| # Check if we have valid data | |
| if ( | |
| not selected_year | |
| or not aggregates_dict | |
| or not isinstance(aggregates_dict, dict) | |
| ): | |
| return gr.update(value=pd.DataFrame(), column_widths=None) | |
| # Try to get the dataframe for the selected year | |
| try: | |
| df = aggregates_dict.get(selected_year, pd.DataFrame()) | |
| return gr.update(value=df, column_widths=calculate_column_widths(df)) | |
| except (ValueError, TypeError) as e: | |
| return gr.update(value=pd.DataFrame(), column_widths=None) | |
| def update_category_rules_ui(selected_category, rules_by_category): | |
| rules = rules_by_category.get(selected_category, []) | |
| return update_transition_ui(rules) | |
| def add_year_to_category(selected_category, rules_by_category): | |
| if selected_category not in rules_by_category: | |
| rules_by_category[selected_category] = [] | |
| rules = rules_by_category[selected_category] | |
| add_year_rule(rules) | |
| return rules_by_category | |
| def remove_year_from_category(selected_category, rules_by_category): | |
| if selected_category in rules_by_category: | |
| rules = rules_by_category[selected_category] | |
| remove_year_rule(rules) | |
| return rules_by_category | |
| def update_category_rule_factor( | |
| index, new_factor_perc, selected_category, rules_by_category | |
| ): | |
| if selected_category in rules_by_category: | |
| rules = rules_by_category[selected_category] | |
| update_rule_factor(index, new_factor_perc, rules) | |
| return rules_by_category | |
| def copy_rules_to_category(source_category, target_category, rules_by_category): | |
| if source_category in rules_by_category and target_category is not None: | |
| rules_by_category[target_category] = [ | |
| rule.copy() for rule in rules_by_category[source_category] | |
| ] | |
| return rules_by_category | |
| # --- FUNÇÃO CORRIGIDA: Análise de Variação - Lógica Passo a Passo --- | |
| def generate_variation_analysis_view( | |
| limit_perc, interval_perc, selected_year, abrangencia, the_merge_df, iptu_rules, | |
| diferencia_predios, diferencia_estacionamentos, | |
| ): | |
| # Validações | |
| if the_merge_df is None or iptu_rules is None: | |
| return [gr.update(value="Erro: Dados principais não processados. Clique em 'Processar Dados' primeiro.", visible=True)] \ | |
| + [gr.update(value=pd.DataFrame())] * 13 + [gr.State({})] | |
| # Verificar se é modo simples (limite e intervalo são zero) | |
| modo_simples = (limit_perc == 0 and interval_perc == 0) | |
| if not modo_simples and (not limit_perc or not interval_perc or limit_perc % interval_perc != 0): | |
| return [gr.update(value="Erro: O intervalo deve ser um divisor exato do limite.", visible=True)] \ | |
| + [gr.update(value=pd.DataFrame())] * 13 + [gr.State({})] | |
| col_iptu_atual = "VLR_IMPOSTO_iptu" | |
| col_iptu_projetado = "IPTU_PURO_C_DESC" if selected_year == "Projetado (Puro, c/ Desc.)" else f"IPTU_PROJETADO_{selected_year}" | |
| if col_iptu_projetado not in the_merge_df.columns: | |
| return [gr.update(value=f"Erro: A coluna de projeção para '{selected_year}' não foi encontrada.", visible=True)] \ | |
| + [gr.update(value=pd.DataFrame())] * 13 + [gr.State({})] | |
| df = the_merge_df.copy() | |
| # Aplicar filtro de abrangência | |
| if abrangencia == "Apenas imóveis da mancha": | |
| filtro_mancha = df["SITUACAO_ALAGAMENTO"].isin(["Direto", "Indireto", "Indeterminado"]) | |
| df = df[filtro_mancha].copy() | |
| if df.empty: | |
| return [gr.update(value="Erro: Nenhum imóvel encontrado na mancha da enchente.", visible=True)] \ | |
| + [gr.update(value=pd.DataFrame())] * 13 + [gr.State({})] | |
| # Calcular variação | |
| iptu_atual = df[col_iptu_atual] | |
| iptu_projetado = df[col_iptu_projetado] | |
| with np.errstate(divide='ignore', invalid='ignore'): | |
| variacao_perc = pd.Series( | |
| np.where(iptu_atual > 0, (iptu_projetado / iptu_atual - 1) * 100, np.nan), | |
| index=df.index | |
| ) | |
| # Preparar faixas ordenadas | |
| if modo_simples: | |
| faixas_ordenadas = ["Não Lançado", "Permanece Zero", "Não Mais Tributados", "Novos Tributados", "Diminuiu/Manteve", "Aumentou"] | |
| else: | |
| # Gerar faixas de variação detalhadas | |
| limit_abs = abs(limit_perc) | |
| faixas_variacao = [] | |
| faixas_variacao.append(f"Abaixo de {-limit_abs}%") | |
| limite_inferior = -limit_abs | |
| while limite_inferior < limit_abs: | |
| limite_superior = limite_inferior + interval_perc | |
| faixas_variacao.append(f"De {limite_inferior}% a {limite_superior}%") | |
| limite_inferior = limite_superior | |
| faixas_variacao.append(f"Acima de {limit_abs}%") | |
| faixas_ordenadas = ["Não Lançado", "Permanece Zero", "Não Mais Tributados", "Novos Tributados"] + faixas_variacao | |
| # Debug inicial | |
| total_registros = len(df) | |
| print(f"DEBUG - Total de registros: {total_registros}") | |
| # Criar mapeamento de filtros base - ORDEM CORRIGIDA PARA BATER COM OS OUTPUTS | |
| category_order = [ | |
| "Predial Residencial", "Predial Não Residencial", "Predial Sem Dif. por Uso", # Ordem corrigida | |
| "Estacionamentos Sem Dif. por Uso", "Estacionamentos Residenciais", "Estacionamentos Não Residenciais", | |
| "Terrenos na 1a DF", "Terrenos na 2a DF", "Terrenos na 3a DF", | |
| "Terrenos Aliq. Esp.", "Terrenos Aliq. Fix.", | |
| "Predial Residencial - Aposentado, Inativo, Pensionista, Deficiente", | |
| "Predial Residencial - Habitações Populares" | |
| ] | |
| filters_by_name = {} | |
| if diferencia_predios: | |
| filters_by_name["Predial Residencial"] = FILTERS_GLOBAL["prediais_residenciais"] | |
| filters_by_name["Predial Não Residencial"] = FILTERS_GLOBAL["prediais_nao_residenciais"] | |
| else: | |
| filters_by_name["Predial Sem Dif. por Uso"] = FILTERS_GLOBAL["prediais_base"] | |
| if diferencia_estacionamentos: | |
| filters_by_name["Estacionamentos Residenciais"] = FILTERS_GLOBAL["estacionamentos_residenciais"] | |
| filters_by_name["Estacionamentos Não Residenciais"] = FILTERS_GLOBAL["estacionamentos_nao_residenciais"] | |
| else: | |
| filters_by_name["Estacionamentos Sem Dif. por Uso"] = FILTERS_GLOBAL["estacionamentos_geral"] | |
| filters_by_name.update({ | |
| "Terrenos na 1a DF": FILTERS_GLOBAL["terrenos_1df_base"], | |
| "Terrenos na 2a DF": FILTERS_GLOBAL["terrenos_2df_base"], | |
| "Terrenos na 3a DF": FILTERS_GLOBAL["terrenos_3df_base"], | |
| "Terrenos Aliq. Esp.": FILTERS_GLOBAL["regra_tae_base"], | |
| "Terrenos Aliq. Fix.": FILTERS_GLOBAL["regra_taf_base"], | |
| "Predial Residencial - Aposentado, Inativo, Pensionista, Deficiente": FILTERS_GLOBAL["isen_prop_loc"], | |
| "Predial Residencial - Habitações Populares": FILTERS_GLOBAL["isen_hab_pop"] | |
| }) | |
| all_dfs_display = [] | |
| all_dfs_raw = {} | |
| total_arrec_atual = df.loc[df[col_iptu_atual] > 0, col_iptu_atual].sum() | |
| total_arrec_projetado = df.loc[df[col_iptu_projetado] > 0, col_iptu_projetado].sum() | |
| # PRIMEIRO: PROCESSAR TOTAIS GERAIS (PRIMEIRA ABA) | |
| print(f"\nDEBUG TOTAIS GERAIS - Total geral: {len(df)}") | |
| df_geral = df.copy() | |
| df_geral['Faixa de Variação'] = "Não Aplicável" | |
| indices_restantes_geral = df_geral.index.copy() | |
| # LÓGICA PASSO A PASSO PARA TOTAIS GERAIS | |
| # PASSO 1: Não Lançado (filtro de não tributados) | |
| try: | |
| nao_tributados_global = FILTERS_GLOBAL["nao_tributados"] | |
| nao_tributados_gerais = nao_tributados_global.loc[df_geral.index] | |
| indices_nao_tributados_geral = df_geral.index[nao_tributados_gerais] | |
| df_geral.loc[indices_nao_tributados_geral, 'Faixa de Variação'] = "Não Lançado" | |
| indices_restantes_geral = indices_restantes_geral.difference(indices_nao_tributados_geral) | |
| print(f"DEBUG GERAL - Passo 1 (Não Lançado): {len(indices_nao_tributados_geral)}") | |
| except Exception as e: | |
| print(f"DEBUG GERAL - Erro no Passo 1: {e}") | |
| indices_nao_tributados_geral = pd.Index([]) | |
| # PASSO 2: Permanece Zero | |
| if len(indices_restantes_geral) > 0: | |
| mask_permanece_zero_geral = (df_geral.loc[indices_restantes_geral, col_iptu_atual] == 0) & (df_geral.loc[indices_restantes_geral, col_iptu_projetado] == 0) | |
| indices_permanece_zero_geral = indices_restantes_geral[mask_permanece_zero_geral] | |
| df_geral.loc[indices_permanece_zero_geral, 'Faixa de Variação'] = "Permanece Zero" | |
| indices_restantes_geral = indices_restantes_geral.difference(indices_permanece_zero_geral) | |
| print(f"DEBUG GERAL - Passo 2 (Permanece Zero): {len(indices_permanece_zero_geral)}") | |
| # PASSO 3: Não Mais Tributados | |
| if len(indices_restantes_geral) > 0: | |
| mask_nao_mais_trib_geral = (df_geral.loc[indices_restantes_geral, col_iptu_atual] > 0) & (df_geral.loc[indices_restantes_geral, col_iptu_projetado] == 0) | |
| indices_nao_mais_trib_geral = indices_restantes_geral[mask_nao_mais_trib_geral] | |
| df_geral.loc[indices_nao_mais_trib_geral, 'Faixa de Variação'] = "Não Mais Tributados" | |
| indices_restantes_geral = indices_restantes_geral.difference(indices_nao_mais_trib_geral) | |
| print(f"DEBUG GERAL - Passo 3 (Não Mais Tributados): {len(indices_nao_mais_trib_geral)}") | |
| # PASSO 4: Novos Tributados | |
| if len(indices_restantes_geral) > 0: | |
| mask_novos_trib_geral = (df_geral.loc[indices_restantes_geral, col_iptu_atual] == 0) & (df_geral.loc[indices_restantes_geral, col_iptu_projetado] > 0) | |
| indices_novos_trib_geral = indices_restantes_geral[mask_novos_trib_geral] | |
| df_geral.loc[indices_novos_trib_geral, 'Faixa de Variação'] = "Novos Tributados" | |
| indices_restantes_geral = indices_restantes_geral.difference(indices_novos_trib_geral) | |
| print(f"DEBUG GERAL - Passo 4 (Novos Tributados): {len(indices_novos_trib_geral)}") | |
| # PASSO 5: Aumentos e Reduções | |
| if len(indices_restantes_geral) > 0: | |
| if modo_simples: | |
| # Modo simples | |
| variacao_restantes_geral = variacao_perc.loc[indices_restantes_geral] | |
| mask_diminuiu_geral = variacao_restantes_geral <= 0 | |
| mask_aumentou_geral = variacao_restantes_geral > 0 | |
| indices_diminuiu_geral = indices_restantes_geral[mask_diminuiu_geral] | |
| indices_aumentou_geral = indices_restantes_geral[mask_aumentou_geral] | |
| df_geral.loc[indices_diminuiu_geral, 'Faixa de Variação'] = "Diminuiu/Manteve" | |
| df_geral.loc[indices_aumentou_geral, 'Faixa de Variação'] = "Aumentou" | |
| print(f"DEBUG GERAL - Passo 5a (Diminuiu/Manteve): {len(indices_diminuiu_geral)}") | |
| print(f"DEBUG GERAL - Passo 5b (Aumentou): {len(indices_aumentou_geral)}") | |
| else: | |
| # Modo detalhado | |
| variacao_restantes_geral = variacao_perc.loc[indices_restantes_geral] | |
| limit_abs = abs(limit_perc) | |
| for i, faixa_nome in enumerate(faixas_variacao): | |
| if i == 0: | |
| mask_faixa_geral = variacao_restantes_geral < -limit_abs | |
| elif i == len(faixas_variacao) - 1: | |
| mask_faixa_geral = variacao_restantes_geral >= limit_abs | |
| else: | |
| limite_inf = -limit_abs + (i - 1) * interval_perc | |
| limite_sup = limite_inf + interval_perc | |
| mask_faixa_geral = (variacao_restantes_geral >= limite_inf) & (variacao_restantes_geral < limite_sup) | |
| indices_faixa_geral = indices_restantes_geral[mask_faixa_geral] | |
| df_geral.loc[indices_faixa_geral, 'Faixa de Variação'] = faixa_nome | |
| if len(indices_faixa_geral) > 0: | |
| print(f"DEBUG GERAL - Passo 5 ({faixa_nome}): {len(indices_faixa_geral)}") | |
| # Processar resultado geral | |
| def processar_resultado(df_proc, nome_categoria): | |
| if modo_simples: | |
| # MODO SIMPLES: Agregar em apenas 2 categorias | |
| df_proc['Faixa de Variação Simples'] = "Não Aplicável" | |
| mask_diminuiu_geral = df_proc['Faixa de Variação'].isin([ | |
| "Não Lançado", "Permanece Zero", "Não Mais Tributados", "Diminuiu/Manteve" | |
| ]) | |
| df_proc.loc[mask_diminuiu_geral, 'Faixa de Variação Simples'] = "Diminuiu/Manteve" | |
| mask_aumentou_geral = df_proc['Faixa de Variação'].isin([ | |
| "Novos Tributados", "Aumentou" | |
| ]) | |
| df_proc.loc[mask_aumentou_geral, 'Faixa de Variação Simples'] = "Aumentou" | |
| diminuiu_count = mask_diminuiu_geral.sum() | |
| aumentou_count = mask_aumentou_geral.sum() | |
| print(f"DEBUG {nome_categoria} - SIMPLES: Diminuiu/Manteve: {diminuiu_count}, Aumentou: {aumentou_count}") | |
| categorias_simples = ["Diminuiu/Manteve", "Aumentou"] | |
| df_proc['Faixa de Variação Simples'] = pd.Categorical(df_proc['Faixa de Variação Simples'], categories=categorias_simples, ordered=True) | |
| agg = df_proc.groupby('Faixa de Variação Simples', observed=False).agg( | |
| N_Inscricoes=('Faixa de Variação Simples', 'count'), | |
| Total_IPTU_Atual=(col_iptu_atual, 'sum'), | |
| Total_IPTU_Projetado=(col_iptu_projetado, 'sum') | |
| ).reset_index() | |
| agg = agg.rename(columns={'Faixa de Variação Simples': 'Faixa de Variação'}) | |
| else: | |
| # MODO DETALHADO | |
| df_proc['Faixa de Variação'] = pd.Categorical(df_proc['Faixa de Variação'], categories=faixas_ordenadas, ordered=True) | |
| agg = df_proc.groupby('Faixa de Variação', observed=False).agg( | |
| N_Inscricoes=('Faixa de Variação', 'count'), | |
| Total_IPTU_Atual=(col_iptu_atual, 'sum'), | |
| Total_IPTU_Projetado=(col_iptu_projetado, 'sum') | |
| ).reset_index() | |
| total_inscricoes = agg['N_Inscricoes'].sum() | |
| agg['%_do_Total_de_Inscrições'] = (agg['N_Inscricoes'] / total_inscricoes * 100) if total_inscricoes > 0 else 0 | |
| agg['Diferença_Absoluta'] = agg['Total_IPTU_Projetado'] - agg['Total_IPTU_Atual'] | |
| agg['Participação_na_Arrecadação_Atual_%'] = (agg['Total_IPTU_Atual'] / total_arrec_atual * 100) if total_arrec_atual > 0 else 0 | |
| agg['Participação_na_Arrecadação_Projetada_%'] = (agg['Total_IPTU_Projetado'] / total_arrec_projetado * 100) if total_arrec_projetado > 0 else 0 | |
| agg = agg.rename(columns={ | |
| 'N_Inscricoes': 'Nº de Inscrições', | |
| '%_do_Total_de_Inscrições': '% do Total de Inscrições', | |
| 'Total_IPTU_Atual': 'Total IPTU Atual (2025)', | |
| 'Total_IPTU_Projetado': f'Total IPTU Projetado ({selected_year})', | |
| 'Diferença_Absoluta': 'Diferença Absoluta (R$)', | |
| 'Participação_na_Arrecadação_Atual_%': 'Participação na Arrecadação Atual (%)', | |
| 'Participação_na_Arrecadação_Projetada_%': 'Participação na Arrecadação Projetada (%)', | |
| }) | |
| final_cols = ['Faixa de Variação', 'Nº de Inscrições', '% do Total de Inscrições', 'Total IPTU Atual (2025)', | |
| f'Total IPTU Projetado ({selected_year})', 'Diferença Absoluta (R$)', | |
| 'Participação na Arrecadação Atual (%)', 'Participação na Arrecadação Projetada (%)'] | |
| agg = agg[final_cols] | |
| return agg | |
| # Processar totais gerais | |
| agg_geral = processar_resultado(df_geral, "TOTAIS GERAIS") | |
| all_dfs_raw["Totais Gerais"] = agg_geral.copy() | |
| # Formatação para exibição dos totais gerais | |
| df_display_geral = agg_geral.copy() | |
| currency_cols = ['Total IPTU Atual (2025)', f'Total IPTU Projetado ({selected_year})', 'Diferença Absoluta (R$)'] | |
| percent_cols = ['% do Total de Inscrições', 'Participação na Arrecadação Atual (%)', 'Participação na Arrecadação Projetada (%)'] | |
| for col in currency_cols: | |
| df_display_geral[col] = df_display_geral[col].apply(lambda x: f"R$ {x:,.2f}") | |
| for col in percent_cols: | |
| df_display_geral[col] = df_display_geral[col].apply(lambda x: f"{x:.2f}%") | |
| # Adicionar totais gerais como primeira aba | |
| all_dfs_display.append(gr.update(value=df_display_geral, column_widths=calculate_column_widths(df_display_geral))) | |
| # DEPOIS: PROCESSAR POR CATEGORIA (DEMAIS ABAS) | |
| # Iterar por categoria | |
| for nome_legivel in category_order: | |
| if nome_legivel not in filters_by_name: | |
| all_dfs_display.append(gr.update(value=pd.DataFrame())) | |
| continue | |
| filtro_rule = filters_by_name[nome_legivel] | |
| if not filtro_rule.any(): | |
| all_dfs_display.append(gr.update(value=pd.DataFrame())) | |
| continue | |
| # Aplicar filtro da categoria | |
| df_cat = df[filtro_rule[df.index]].copy() | |
| if df_cat.empty: | |
| all_dfs_display.append(gr.update(value=pd.DataFrame())) | |
| continue | |
| print(f"\nDEBUG {nome_legivel} - Total na categoria: {len(df_cat)}") | |
| # LÓGICA PASSO A PASSO - SEQUENCIAL E EXCLUSIVA | |
| df_cat['Faixa de Variação'] = "Não Aplicável" | |
| indices_restantes = df_cat.index.copy() | |
| # PASSO 1: Não Lançado (filtro de não tributados) | |
| try: | |
| nao_tributados_global = FILTERS_GLOBAL["nao_tributados"] | |
| nao_tributados_nesta_cat = nao_tributados_global.loc[df_cat.index] | |
| indices_nao_tributados = df_cat.index[nao_tributados_nesta_cat] | |
| df_cat.loc[indices_nao_tributados, 'Faixa de Variação'] = "Não Lançado" | |
| indices_restantes = indices_restantes.difference(indices_nao_tributados) | |
| print(f"DEBUG {nome_legivel} - Passo 1 (Não Lançado): {len(indices_nao_tributados)}") | |
| except Exception as e: | |
| print(f"DEBUG {nome_legivel} - Erro no Passo 1: {e}") | |
| indices_nao_tributados = pd.Index([]) | |
| # PASSO 2: Permanece Zero (eram zero, continuam zero, MAS não são não tributados) | |
| if len(indices_restantes) > 0: | |
| mask_permanece_zero = (df_cat.loc[indices_restantes, col_iptu_atual] == 0) & (df_cat.loc[indices_restantes, col_iptu_projetado] == 0) | |
| indices_permanece_zero = indices_restantes[mask_permanece_zero] | |
| df_cat.loc[indices_permanece_zero, 'Faixa de Variação'] = "Permanece Zero" | |
| indices_restantes = indices_restantes.difference(indices_permanece_zero) | |
| print(f"DEBUG {nome_legivel} - Passo 2 (Permanece Zero): {len(indices_permanece_zero)}") | |
| # PASSO 3: Não Mais Tributados (eram positivos, agora zero, não são não tributados) | |
| if len(indices_restantes) > 0: | |
| mask_nao_mais_trib = (df_cat.loc[indices_restantes, col_iptu_atual] > 0) & (df_cat.loc[indices_restantes, col_iptu_projetado] == 0) | |
| indices_nao_mais_trib = indices_restantes[mask_nao_mais_trib] | |
| df_cat.loc[indices_nao_mais_trib, 'Faixa de Variação'] = "Não Mais Tributados" | |
| indices_restantes = indices_restantes.difference(indices_nao_mais_trib) | |
| print(f"DEBUG {nome_legivel} - Passo 3 (Não Mais Tributados): {len(indices_nao_mais_trib)}") | |
| # PASSO 4: Novos Tributados (eram zero, agora positivos, não são não tributados) | |
| if len(indices_restantes) > 0: | |
| mask_novos_trib = (df_cat.loc[indices_restantes, col_iptu_atual] == 0) & (df_cat.loc[indices_restantes, col_iptu_projetado] > 0) | |
| indices_novos_trib = indices_restantes[mask_novos_trib] | |
| df_cat.loc[indices_novos_trib, 'Faixa de Variação'] = "Novos Tributados" | |
| indices_restantes = indices_restantes.difference(indices_novos_trib) | |
| print(f"DEBUG {nome_legivel} - Passo 4 (Novos Tributados): {len(indices_novos_trib)}") | |
| # PASSO 5: Aumentos e Reduções (eram positivos, continuam positivos) | |
| if len(indices_restantes) > 0: | |
| if modo_simples: | |
| # Modo simples: apenas aumentou vs diminuiu/manteve | |
| variacao_restantes = variacao_perc.loc[indices_restantes] | |
| mask_diminuiu = variacao_restantes <= 0 | |
| mask_aumentou = variacao_restantes > 0 | |
| indices_diminuiu = indices_restantes[mask_diminuiu] | |
| indices_aumentou = indices_restantes[mask_aumentou] | |
| df_cat.loc[indices_diminuiu, 'Faixa de Variação'] = "Diminuiu/Manteve" | |
| df_cat.loc[indices_aumentou, 'Faixa de Variação'] = "Aumentou" | |
| print(f"DEBUG {nome_legivel} - Passo 5a (Diminuiu/Manteve): {len(indices_diminuiu)}") | |
| print(f"DEBUG {nome_legivel} - Passo 5b (Aumentou): {len(indices_aumentou)}") | |
| else: | |
| # Modo detalhado: faixas de variação | |
| variacao_restantes = variacao_perc.loc[indices_restantes] | |
| limit_abs = abs(limit_perc) | |
| # Aplicar faixas | |
| for i, faixa_nome in enumerate(faixas_variacao): | |
| if i == 0: # Primeira faixa: abaixo do limite negativo | |
| mask_faixa = variacao_restantes < -limit_abs | |
| elif i == len(faixas_variacao) - 1: # Última faixa: acima do limite positivo | |
| mask_faixa = variacao_restantes >= limit_abs | |
| else: # Faixas intermediárias | |
| limite_inf = -limit_abs + (i - 1) * interval_perc | |
| limite_sup = limite_inf + interval_perc | |
| mask_faixa = (variacao_restantes >= limite_inf) & (variacao_restantes < limite_sup) | |
| indices_faixa = indices_restantes[mask_faixa] | |
| df_cat.loc[indices_faixa, 'Faixa de Variação'] = faixa_nome | |
| if len(indices_faixa) > 0: | |
| print(f"DEBUG {nome_legivel} - Passo 5 ({faixa_nome}): {len(indices_faixa)}") | |
| # Processar resultado da categoria usando a função auxiliar | |
| agg = processar_resultado(df_cat, nome_legivel) | |
| all_dfs_raw[nome_legivel] = agg.copy() | |
| # Formatação para exibição | |
| df_display = agg.copy() | |
| currency_cols = ['Total IPTU Atual (2025)', f'Total IPTU Projetado ({selected_year})', 'Diferença Absoluta (R$)'] | |
| percent_cols = ['% do Total de Inscrições', 'Participação na Arrecadação Atual (%)', 'Participação na Arrecadação Projetada (%)'] | |
| for col in currency_cols: | |
| df_display[col] = df_display[col].apply(lambda x: f"R$ {x:,.2f}") | |
| for col in percent_cols: | |
| df_display[col] = df_display[col].apply(lambda x: f"{x:.2f}%") | |
| all_dfs_display.append(gr.update(value=df_display, column_widths=calculate_column_widths(df_display))) | |
| # Garantir 14 resultados (1 Totais Gerais + 13 categorias) | |
| num_total_tabs = 14 # 1 totais gerais + 13 categorias = 14 abas | |
| while len(all_dfs_display) < num_total_tabs: | |
| all_dfs_display.append(gr.update(value=pd.DataFrame())) | |
| return [gr.update(value="", visible=False)] + all_dfs_display[:num_total_tabs] + [all_dfs_raw] | |
| # --- Gradio Interface (MODIFICADA) --- | |
| with gr.Blocks(theme=gr.themes.Default()) as demo: | |
| # --- CONFIGURAÇÃO DE ESTADO INICIAL --- | |
| INITIAL_DIFFERENTIATE_BUILDINGS = True | |
| INITIAL_DIFFERENTIATE_PARKING = True | |
| # --- FIM DA CONFIGURAÇÃO --- | |
| gr.Markdown("# Simulador de Cálculo de IPTU") | |
| gr.Markdown(f"Ajuste os parâmetros abaixo.") | |
| default_categories = [ | |
| "Predial Geral", "Predial Residencial", "Predial Não Residencial", | |
| "Estacionamentos Geral", "Estacionamentos Residenciais", "Estacionamentos Não Residenciais", | |
| "Terrenos 1ª DF", "Terrenos 2ª DF", "Terrenos 3ª DF", | |
| "Terrenos Alíquota Especial", "Terrenos Alíquota Fixa", | |
| ] | |
| default_rules_global = [{"year": 2026, "factor": 0.3}, {"year": 2027, "factor": 0.6}, {"year": 2028, "factor": 0.8}, {"year": 2029, "factor": 1.0}] | |
| # --- STATES --- | |
| universal_transition_rules_state = gr.State(default_rules_global) | |
| initial_rules_by_category = {cat: [{"year": 2026, "factor": 1.0}] for cat in default_categories} | |
| transition_rules_by_category_state = gr.State(initial_rules_by_category) | |
| all_yearly_aggregates_state = gr.State({}) | |
| df_resumo_final_state = gr.State() | |
| the_merge_state = gr.State() | |
| iptu_rules_state = gr.State() | |
| anos_projecao_state = gr.State() | |
| detailed_by_value_ranges_raw_state = gr.State({}) | |
| variation_analysis_raw_state = gr.State({}) # State para a nova view | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| with gr.Accordion("📁 Carregar Cenário de Arquivo Excel", open=False): | |
| gr.Markdown("Faça o upload de um arquivo `resultados_iptu_...xlsx` para carregar os parâmetros usados.") | |
| file_upload_input = gr.File(label="Carregar arquivo .xlsx", file_types=[".xlsx"]) | |
| upload_status_output = gr.Textbox(label="Status do Carregamento", interactive=False) | |
| gr.Markdown("### Parâmetros Gerais") | |
| ufm_input = gr.Number(value=5.771, label="Valor da UFM (2025)") | |
| diferencia_estacionamentos_check = gr.Checkbox(value=INITIAL_DIFFERENTIATE_PARKING, label="Diferenciar Estacionamentos por Uso?") | |
| diferencia_predios_check = gr.Checkbox(value=INITIAL_DIFFERENTIATE_BUILDINGS, label="Diferenciar Prédios por Uso?") | |
| segregar_piso_check = gr.Checkbox(value=False, label="Segregar Piso por Categoria?", visible=False) | |
| segregar_trava_check = gr.Checkbox(value=False, label="Segregar Regras de Transição por Categoria?") | |
| with gr.Accordion("Parâmetros da Trava de Aumento", open=True): | |
| trava_aumento_perc_input = gr.Number(value=0.0, label="Piso Aumento Relativo para Ativar Trava (%)") | |
| with gr.Accordion("Parâmetros do Desconto nas Alíquotas da Mancha", open=True): | |
| gr.Markdown("Defina o percentual de desconto a ser aplicado sobre cada uma das faixas de alíquota dos imóveis prediais e estacionamentos, para os imóveis constantes da mancha da enchente.") | |
| desconto_2026_input = gr.Number(value=5.0, label="Desconto em 2026 (%)", minimum=0, maximum=100) | |
| desconto_2027_input = gr.Number(value=0.0, label="Desconto em 2027 (%)", minimum=0, maximum=100) | |
| desconto_2028_input = gr.Number(value=0.0, label="Desconto em 2028 (%)", minimum=0, maximum=100) | |
| desconto_2029_input = gr.Number(value=0.0, label="Desconto em 2029 (%)", minimum=0, maximum=100) | |
| with gr.Accordion("Regras de Transição Universais (Fator da Trava)", open=True, visible=True) as universal_rules_accordion: | |
| universal_ui_outputs = [] | |
| for i in range(MAX_YEARS): | |
| is_visible = i < len(default_rules_global) | |
| year_val, factor_val = (default_rules_global[i]["year"], default_rules_global[i]["factor"] * 100) if is_visible else (2026 + i, 100) | |
| with gr.Row(visible=is_visible) as row: | |
| year_input = gr.Number(value=year_val, label="Ano", interactive=False) | |
| factor_input = gr.Number(value=factor_val, label="Fator do Aumento (%)", interactive=True) | |
| universal_ui_outputs.extend([row, year_input, factor_input]) | |
| with gr.Row(): | |
| universal_add_year_btn = gr.Button("➕ Adicionar Ano", scale=1) | |
| universal_remove_year_btn = gr.Button("➖ Remover Último Ano", scale=1) | |
| with gr.Accordion("Regras de Transição por Categoria (Fator da Trava)", open=False, visible=False) as category_rules_accordion: | |
| category_selector = gr.Dropdown(choices=default_categories, value="Predial Geral", label="Selecione a Categoria para Configurar") | |
| category_ui_outputs = [] | |
| for i in range(MAX_YEARS): | |
| is_visible = i < 1 | |
| year_val, factor_val = (2026, 100) | |
| with gr.Row(visible=is_visible) as row: | |
| year_input = gr.Number(value=year_val, label="Ano", interactive=False) | |
| factor_input = gr.Number(value=factor_val, label="Fator do Aumento (%)", interactive=True) | |
| category_ui_outputs.extend([row, year_input, factor_input]) | |
| with gr.Row(): | |
| category_add_year_btn = gr.Button("➕ Adicionar Ano à Categoria", scale=1) | |
| category_remove_year_btn = gr.Button("➖ Remover Último Ano da Categoria", scale=1) | |
| with gr.Row(variant="panel"): | |
| copy_source = gr.Dropdown(choices=default_categories, label="Copiar de:", scale=1) | |
| copy_target = gr.Dropdown(choices=default_categories, label="Para:", scale=1) | |
| copy_btn = gr.Button("Copiar Regras", scale=1) | |
| gr.Markdown("### Faixas e Alíquotas Configuráveis") | |
| all_rate_inputs = [] | |
| pg_defaults_lim = [17330, 24954, 74859, 124589, 190609, 259921, 762433] | |
| pg_defaults_aliq = [0.0, 0.004, 0.0047, 0.0055, 0.0062, 0.007, 0.0077, 0.0080] | |
| pr_defaults_lim = [17330, 24954, 74859, 124589, 190609, 259921, 762433] | |
| pr_defaults_aliq = [0.0, 0.004, 0.0047, 0.0055, 0.0062, 0.007, 0.0077, 0.0085] | |
| pnr_defaults_lim = [17330, 43320] | |
| pnr_defaults_aliq = [0.0, 0.006, 0.008] | |
| eg_defaults_lim = [2600, 24954, 74859, 124589, 190609, 260021, 762433] | |
| eg_defaults_aliq = [0.0, 0.004, 0.0047, 0.0055, 0.0062, 0.007, 0.0077, 0.0080] | |
| er_defaults_lim = [2600, 24954, 74859, 124589, 190609, 259921, 762433] | |
| er_defaults_aliq = [0.0, 0.004, 0.0047, 0.0055, 0.0062, 0.007, 0.0077, 0.0085] | |
| enr_defaults_lim = [2600, 7798] | |
| enr_defaults_aliq = [0.0, 0.006, 0.008] | |
| t_df_defaults_lim = [17330] + [0] * (MAX_BANDS - 2) | |
| with gr.Accordion("Predial Geral (PG)", open=False, visible=not INITIAL_DIFFERENTIATE_BUILDINGS) as pg_accordion: | |
| pg_inputs = create_configurable_band_ui("PG", 8, pg_defaults_lim, pg_defaults_aliq) | |
| all_rate_inputs.extend(pg_inputs) | |
| with gr.Accordion("Predial Residencial (PR)", open=False, visible=INITIAL_DIFFERENTIATE_BUILDINGS) as pr_accordion: | |
| pr_inputs = create_configurable_band_ui("PR", 8, pr_defaults_lim, pr_defaults_aliq) | |
| all_rate_inputs.extend(pr_inputs) | |
| with gr.Accordion("Predial Não Residencial (PNR)", open=False, visible=INITIAL_DIFFERENTIATE_BUILDINGS) as pnr_accordion: | |
| pnr_inputs = create_configurable_band_ui("PNR", 3, pnr_defaults_lim, pnr_defaults_aliq) | |
| all_rate_inputs.extend(pnr_inputs) | |
| with gr.Accordion("Estacionamentos Geral (EG)", open=False, visible=not INITIAL_DIFFERENTIATE_PARKING) as eg_accordion: | |
| eg_inputs = create_configurable_band_ui("EG", 8, eg_defaults_lim, eg_defaults_aliq) | |
| all_rate_inputs.extend(eg_inputs) | |
| with gr.Accordion("Estacionamentos Residenciais (ER)", open=False, visible=INITIAL_DIFFERENTIATE_PARKING) as er_accordion: | |
| er_inputs = create_configurable_band_ui("ER", 8, er_defaults_lim, er_defaults_aliq) | |
| all_rate_inputs.extend(er_inputs) | |
| with gr.Accordion("Estacionamentos Não Residenciais (ENR)", open=False, visible=INITIAL_DIFFERENTIATE_PARKING) as enr_accordion: | |
| enr_inputs = create_configurable_band_ui("ENR", 3, enr_defaults_lim, enr_defaults_aliq) | |
| all_rate_inputs.extend(enr_inputs) | |
| with gr.Accordion("Terrenos 1a DF (T1DF)", open=False) as t1df_accordion: | |
| t1df_inputs = create_configurable_band_ui("T1DF", 2, t_df_defaults_lim, [0.0, 0.03] + [0] * (MAX_BANDS - 2)) | |
| all_rate_inputs.extend(t1df_inputs) | |
| with gr.Accordion("Terrenos 2a DF (T2DF)", open=False) as t2df_accordion: | |
| t2df_inputs = create_configurable_band_ui("T2DF", 2, t_df_defaults_lim, [0.0, 0.02] + [0] * (MAX_BANDS - 2)) | |
| all_rate_inputs.extend(t2df_inputs) | |
| with gr.Accordion("Terrenos 3a DF (T3DF)", open=False) as t3df_accordion: | |
| t3df_inputs = create_configurable_band_ui("T3DF", 2, t_df_defaults_lim, [0.0, 0.01] + [0] * (MAX_BANDS - 2)) | |
| all_rate_inputs.extend(t3df_inputs) | |
| with gr.Accordion("Terrenos Alíquota Especial (TAE)", open=False) as tae_accordion: | |
| tae_alq1_input = gr.Number(label="TAE Alíquota 1 (ex: 0.009)", value=0.009) | |
| all_rate_inputs.append(tae_alq1_input) | |
| with gr.Accordion("Terrenos Alíquota Fixa (TAF)", open=False) as taf_accordion: | |
| taf_alq1_input = gr.Number(label="TAF Alíquota 1 (ex: 0.0095)", value=0.0095) | |
| all_rate_inputs.append(taf_alq1_input) | |
| with gr.Accordion("Visões a Processar/Exibir", open=True): | |
| show_projection_check = gr.Checkbox(label="Projeção de Arrecadação", value=True) | |
| show_aggregate_check = gr.Checkbox(label="Resumo Agregado por Categoria", value=True) | |
| show_detailed_bands_check = gr.Checkbox(label="Resumo Detalhado por Faixa", value=True) | |
| show_detailed_vv_check = gr.Checkbox(label="Resumo Detalhado por Valor Venal", value=True) | |
| show_totals_check = gr.Checkbox(label="Totais Gerais Calculados", value=True) | |
| show_flood_analysis_check = gr.Checkbox(label="Análise da Enchente (Tabela)", value=True) | |
| show_flood_graphs_check = gr.Checkbox(label="Análise da Enchente (Gráficos)", value=True) | |
| show_excel_check = gr.Checkbox(label="Download Resultados (Excel)", value=True) | |
| validation_error_box = gr.Textbox(label="⚠️ Erros de Validação", visible=False, interactive=False, lines=2) | |
| process_button = gr.Button("Processar Dados", variant="primary", size="lg") | |
| with gr.Column(scale=2): | |
| status_message = gr.Textbox(label="Status", interactive=False) | |
| with gr.Group(visible=True) as projection_box: | |
| gr.Markdown("### 📈 Projeção de Arrecadação com Trava de Aumento") | |
| output_df_projection = gr.DataFrame(label="Projeção Anual") | |
| with gr.Group(visible=True) as aggregate_summary_box: | |
| gr.Markdown("### 📊 Resumo Agregado por Categoria e Ano") | |
| with gr.Row(): | |
| output_agg_year_selector = gr.Dropdown(label="Selecione o Ano para Visualizar o Resumo", scale=1) | |
| output_agg_df = gr.DataFrame(label="Resumo Agregado Anual", scale=2) | |
| with gr.Group(visible=True) as detailed_bands_box: | |
| gr.Markdown("### 📋 Resumo Detalhado por Faixa e Categoria") | |
| selected_detailed_tab_id = "pr_faixa" if INITIAL_DIFFERENTIATE_BUILDINGS else "pg_faixa" | |
| with gr.Tabs(elem_id="detailed_tabs_container", selected=selected_detailed_tab_id) as detailed_tabs: | |
| with gr.TabItem("Predial Geral", visible=not INITIAL_DIFFERENTIATE_BUILDINGS, id="pg_faixa") as tab_predial_geral: | |
| output_predial_geral = gr.DataFrame(label="Predial Sem Diferenciação") | |
| with gr.TabItem("Predial Residencial", visible=INITIAL_DIFFERENTIATE_BUILDINGS, id="pr_faixa") as tab_predial_residencial: | |
| output_predial_residencial = gr.DataFrame(label="Predial Residencial") | |
| with gr.TabItem("Predial Não Residencial", visible=INITIAL_DIFFERENTIATE_BUILDINGS, id="pnr_faixa") as tab_predial_nao_residencial: | |
| output_predial_nao_residencial = gr.DataFrame(label="Predial Não Residencial") | |
| with gr.TabItem("Estacionamentos Geral", visible=not INITIAL_DIFFERENTIATE_PARKING, id="eg_faixa") as tab_estacionamentos_geral: | |
| output_estacionamentos_geral = gr.DataFrame(label="Estacionamentos Sem Diferenciação") | |
| with gr.TabItem("Estacionamentos Residenciais", visible=INITIAL_DIFFERENTIATE_PARKING, id="er_faixa") as tab_estacionamentos_residenciais: | |
| output_estacionamentos_residenciais = gr.DataFrame(label="Estacionamentos Residenciais") | |
| with gr.TabItem("Estacionamentos Não Residenciais", visible=INITIAL_DIFFERENTIATE_PARKING, id="enr_faixa") as tab_estacionamentos_nao_residenciais: | |
| output_estacionamentos_nao_residenciais = gr.DataFrame(label="Estacionamentos Não Residenciais") | |
| with gr.TabItem("Terrenos 1ª DF", visible=True, id="t1df_faixa") as tab_terrenos_1df: | |
| output_terrenos_1df = gr.DataFrame(label="Terrenos 1ª Divisão Fiscal") | |
| with gr.TabItem("Terrenos 2ª DF", visible=True, id="t2df_faixa") as tab_terrenos_2df: | |
| output_terrenos_2df = gr.DataFrame(label="Terrenos 2ª Divisão Fiscal") | |
| with gr.TabItem("Terrenos 3ª DF", visible=True, id="t3df_faixa") as tab_terrenos_3df: | |
| output_terrenos_3df = gr.DataFrame(label="Terrenos 3ª Divisão Fiscal") | |
| with gr.TabItem("Terrenos Aliq. Especial", visible=True, id="tae_faixa") as tab_terrenos_esp: | |
| output_terrenos_esp = gr.DataFrame(label="Terrenos Alíquota Especial") | |
| with gr.TabItem("Terrenos Aliq. Fixa", visible=True, id="taf_faixa") as tab_terrenos_fix: | |
| output_terrenos_fix = gr.DataFrame(label="Terrenos Alíquota Fixa") | |
| with gr.TabItem("Aposentados/Deficientes", visible=True, id="apo_faixa") as tab_aposentados: | |
| output_aposentados = gr.DataFrame(label="Predial - Aposentados/Deficientes") | |
| with gr.TabItem("Habitações Populares", visible=True, id="hab_faixa") as tab_habitacoes: | |
| output_habitacoes = gr.DataFrame(label="Predial - Habitações Populares") | |
| with gr.Group(visible=True) as detailed_vv_box: | |
| gr.Markdown("### 💲 Resumo Detalhado por Valor Venal") | |
| with gr.Row(variant="panel"): | |
| value_range_interval_input = gr.Number(label="Intervalo da Faixa de Valor Venal (R$)", value=300000, scale=2) | |
| aggregate_from_value_input = gr.Number(label="Agregar a Partir de Valor (R$)", value=5000000, scale=2) | |
| update_vv_button = gr.Button("Atualizar Visão", scale=1, min_width=100) | |
| selected_vv_tab_id = "pr_vv" if INITIAL_DIFFERENTIATE_BUILDINGS else "pg_vv" | |
| with gr.Tabs(elem_id="vv_tabs_container", selected=selected_vv_tab_id) as value_range_tabs: | |
| with gr.TabItem("Predial Geral VV", visible=not INITIAL_DIFFERENTIATE_BUILDINGS, id="pg_vv") as tab_predial_geral_vv: | |
| output_predial_geral_vv = gr.DataFrame(label="Predial Sem Diferenciação - Por Valor") | |
| with gr.TabItem("Predial Residencial VV", visible=INITIAL_DIFFERENTIATE_BUILDINGS, id="pr_vv") as tab_predial_residencial_vv: | |
| output_predial_residencial_vv = gr.DataFrame(label="Predial Residencial - Por Valor") | |
| with gr.TabItem("Predial Não Residencial VV", visible=INITIAL_DIFFERENTIATE_BUILDINGS, id="pnr_vv") as tab_predial_nao_residencial_vv: | |
| output_predial_nao_residencial_vv = gr.DataFrame(label="Predial Não Residencial - Por Valor") | |
| with gr.TabItem("Estacionamentos Geral VV", visible=not INITIAL_DIFFERENTIATE_PARKING, id="eg_vv") as tab_estacionamentos_geral_vv: | |
| output_estacionamentos_geral_vv = gr.DataFrame(label="Estacionamentos Sem Diferenciação - Por Valor") | |
| with gr.TabItem("Estacionamentos Residenciais VV", visible=INITIAL_DIFFERENTIATE_PARKING, id="er_vv") as tab_estacionamentos_residenciais_vv: | |
| output_estacionamentos_residenciais_vv = gr.DataFrame(label="Estacionamentos Residenciais - Por Valor") | |
| with gr.TabItem("Estacionamentos Não Residenciais VV", visible=INITIAL_DIFFERENTIATE_PARKING, id="enr_vv") as tab_estacionamentos_nao_residenciais_vv: | |
| output_estacionamentos_nao_residenciais_vv = gr.DataFrame(label="Estacionamentos Não Residenciais - Por Valor") | |
| with gr.TabItem("Terrenos 1ª DF VV", visible=True, id="t1df_vv") as tab_terrenos_1df_vv: | |
| output_terrenos_1df_vv = gr.DataFrame(label="Terrenos 1ª DF - Por Valor") | |
| with gr.TabItem("Terrenos 2ª DF VV", visible=True, id="t2df_vv") as tab_terrenos_2df_vv: | |
| output_terrenos_2df_vv = gr.DataFrame(label="Terrenos 2ª DF - Por Valor") | |
| with gr.TabItem("Terrenos 3ª DF VV", visible=True, id="t3df_vv") as tab_terrenos_3df_vv: | |
| output_terrenos_3df_vv = gr.DataFrame(label="Terrenos 3ª DF - Por Valor") | |
| with gr.TabItem("Terrenos Aliq. Especial VV", visible=True, id="tae_vv") as tab_terrenos_esp_vv: | |
| output_terrenos_esp_vv = gr.DataFrame(label="Terrenos Alíquota Especial - Por Valor") | |
| with gr.TabItem("Terrenos Aliq. Fixa VV", visible=True, id="taf_vv") as tab_terrenos_fix_vv: | |
| output_terrenos_fix_vv = gr.DataFrame(label="Terrenos Alíquota Fixa - Por Valor") | |
| with gr.TabItem("Aposentados/Deficientes VV", visible=True, id="apo_vv") as tab_aposentados_vv: | |
| output_aposentados_vv = gr.DataFrame(label="Predial - Aposentados/Deficientes - Por Valor") | |
| with gr.TabItem("Habitações Populares VV", visible=True, id="hab_vv") as tab_habitacoes_vv: | |
| output_habitacoes_vv = gr.DataFrame(label="Predial - Habitações Populares - Por Valor") | |
| # --- NOVA VIEW: ANÁLISE DE VARIAÇÃO --- | |
| with gr.Group(visible=True) as variation_analysis_box: | |
| gr.Markdown("### ⚖️ Análise de Variação do IPTU") | |
| gr.Markdown("**💡 Dica:** Para ver apenas quantos % subiram vs. desceram, deixe ambos os campos 'Limite' e 'Intervalo' em zero.") | |
| with gr.Row(variant="panel"): | |
| variation_limit_input = gr.Number(label="Limite Percentual (+/-)", value=50, scale=1) | |
| variation_interval_input = gr.Number(label="Intervalo de Agrupamento (%)", value=25, scale=1) | |
| variation_abrangencia_selector = gr.Dropdown( | |
| label="Abrangência", | |
| choices=["Todos os imóveis", "Apenas imóveis da mancha"], | |
| value="Todos os imóveis", | |
| scale=1 | |
| ) | |
| variation_year_selector = gr.Dropdown(label="Analisar Ano/Projeção", scale=2) | |
| variation_generate_btn = gr.Button("Gerar Análise de Variação", scale=1) | |
| variation_validation_error = gr.Textbox(label="⚠️ Erro", visible=False, interactive=False) | |
| selected_var_tab_id = "totais_var" | |
| with gr.Tabs(elem_id="var_tabs_container", selected=selected_var_tab_id) as variation_tabs: | |
| with gr.TabItem("Totais Gerais", visible=True, id="totais_var") as tab_totais_gerais_var: | |
| output_totais_gerais_var = gr.DataFrame(label="Variação - Totais Gerais") | |
| with gr.TabItem("Predial Geral", visible=not INITIAL_DIFFERENTIATE_BUILDINGS, id="pg_var") as tab_predial_geral_var: | |
| output_predial_geral_var = gr.DataFrame(label="Variação - Predial Sem Diferenciação") | |
| with gr.TabItem("Predial Residencial", visible=INITIAL_DIFFERENTIATE_BUILDINGS, id="pr_var") as tab_predial_residencial_var: | |
| output_predial_residencial_var = gr.DataFrame(label="Variação - Predial Residencial") | |
| with gr.TabItem("Predial Não Residencial", visible=INITIAL_DIFFERENTIATE_BUILDINGS, id="pnr_var") as tab_predial_nao_residencial_var: | |
| output_predial_nao_residencial_var = gr.DataFrame(label="Variação - Predial Não Residencial") | |
| with gr.TabItem("Estacionamentos Geral", visible=not INITIAL_DIFFERENTIATE_PARKING, id="eg_var") as tab_estacionamentos_geral_var: | |
| output_estacionamentos_geral_var = gr.DataFrame(label="Variação - Estacionamentos Sem Diferenciação") | |
| with gr.TabItem("Estacionamentos Residenciais", visible=INITIAL_DIFFERENTIATE_PARKING, id="er_var") as tab_estacionamentos_residenciais_var: | |
| output_estacionamentos_residenciais_var = gr.DataFrame(label="Variação - Estacionamentos Residenciais") | |
| with gr.TabItem("Estacionamentos Não Residenciais", visible=INITIAL_DIFFERENTIATE_PARKING, id="enr_var") as tab_estacionamentos_nao_residenciais_var: | |
| output_estacionamentos_nao_residenciais_var = gr.DataFrame(label="Variação - Estacionamentos Não Residenciais") | |
| with gr.TabItem("Terrenos 1ª DF", visible=True, id="t1df_var") as tab_terrenos_1df_var: | |
| output_terrenos_1df_var = gr.DataFrame(label="Variação - Terrenos 1ª Divisão Fiscal") | |
| with gr.TabItem("Terrenos 2ª DF", visible=True, id="t2df_var") as tab_terrenos_2df_var: | |
| output_terrenos_2df_var = gr.DataFrame(label="Variação - Terrenos 2ª Divisão Fiscal") | |
| with gr.TabItem("Terrenos 3ª DF", visible=True, id="t3df_var") as tab_terrenos_3df_var: | |
| output_terrenos_3df_var = gr.DataFrame(label="Variação - Terrenos 3ª Divisão Fiscal") | |
| with gr.TabItem("Terrenos Aliq. Especial", visible=True, id="tae_var") as tab_terrenos_esp_var: | |
| output_terrenos_esp_var = gr.DataFrame(label="Variação - Terrenos Alíquota Especial") | |
| with gr.TabItem("Terrenos Aliq. Fixa", visible=True, id="taf_var") as tab_terrenos_fix_var: | |
| output_terrenos_fix_var = gr.DataFrame(label="Variação - Terrenos Alíquota Fixa") | |
| with gr.TabItem("Aposentados/Deficientes", visible=True, id="apo_var") as tab_aposentados_var: | |
| output_aposentados_var = gr.DataFrame(label="Variação - Predial - Aposentados/Deficientes") | |
| with gr.TabItem("Habitações Populares", visible=True, id="hab_var") as tab_habitacoes_var: | |
| output_habitacoes_var = gr.DataFrame(label="Variação - Predial - Habitações Populares") | |
| with gr.Group(visible=True) as totals_box: | |
| gr.Markdown("### 🔢 Totais Gerais Calculados") | |
| total_novo_text, total_atual_trib_text, total_atual_calc_text = gr.Textbox(interactive=False), gr.Textbox(interactive=False), gr.Textbox(interactive=False) | |
| with gr.Group(visible=True) as flood_analysis_box: | |
| gr.Markdown("### 🌊 Análise dos Imóveis Atingidos pela Enchente") | |
| with gr.Tabs(elem_id="flood_analysis_tabs_container") as flood_analysis_tabs: | |
| with gr.TabItem("Visão Agregada", visible=True, id="flood_general") as tab_flood_general: | |
| output_enchente_analysis = gr.DataFrame(label="Análise Comparativa Total para Imóveis Atingidos") | |
| with gr.TabItem("Predial Geral", visible=not INITIAL_DIFFERENTIATE_BUILDINGS, id="flood_pred_geral") as tab_flood_pred_geral: | |
| output_enchente_analysis_pred_geral = gr.DataFrame(label="Análise Comparativa para Imóveis Prediais (Geral) Atingidos") | |
| with gr.TabItem("Predial Residencial", visible=INITIAL_DIFFERENTIATE_BUILDINGS, id="flood_pred_res") as tab_flood_pred_res: | |
| output_enchente_analysis_res = gr.DataFrame(label="Análise Comparativa para Imóveis Prediais Residenciais Atingidos") | |
| with gr.TabItem("Predial Não Residencial", visible=INITIAL_DIFFERENTIATE_BUILDINGS, id="flood_pred_non_res") as tab_flood_pred_non_res: | |
| output_enchente_analysis_non_res = gr.DataFrame(label="Análise Comparativa para Imóveis Prediais Não Residenciais Atingidos") | |
| with gr.TabItem("Estacionamentos Geral", visible=not INITIAL_DIFFERENTIATE_PARKING, id="flood_est_geral") as tab_flood_est_geral: | |
| output_enchente_analysis_est_geral = gr.DataFrame(label="Análise Comparativa para Estacionamentos (Geral) Atingidos") | |
| with gr.TabItem("Estacionamento Residencial", visible=INITIAL_DIFFERENTIATE_PARKING, id="flood_est_res") as tab_flood_est_res: | |
| output_enchente_analysis_est_res = gr.DataFrame(label="Análise Comparativa para Estacionamentos Residenciais Atingidos") | |
| with gr.TabItem("Estacionamento Não Residencial", visible=INITIAL_DIFFERENTIATE_PARKING, id="flood_est_non_res") as tab_flood_est_non_res: | |
| output_enchente_analysis_est_non_res = gr.DataFrame(label="Análise Comparativa para Estacionamentos Não Residenciais Atingidos") | |
| MAX_PLOTS = 30 | |
| with gr.Group(visible=True) as excel_download_box: | |
| gr.Markdown("### 📥 Download Resultados") | |
| filename_display, download_btn = gr.Textbox(label="Arquivo gerado", interactive=False, visible=False), gr.DownloadButton(label="Download", visible=False) | |
| with gr.Group(visible=True) as flood_graphs_box: | |
| gr.Markdown("### 📈🌊 Análise Gráfica - Imóveis atingidos pela enchente") | |
| flood_plots_outputs = [gr.Plot(visible=False) for _ in range(MAX_PLOTS)] | |
| # --- Lógica de Eventos da UI --- | |
| segregar_trava_check.change(fn=lambda is_checked: [gr.update(visible=not is_checked), gr.update(visible=is_checked)], inputs=[segregar_trava_check], outputs=[universal_rules_accordion, category_rules_accordion]) | |
| for i in range(MAX_YEARS): | |
| factor_input = universal_ui_outputs[i * 3 + 2] | |
| factor_input.change(fn=update_rule_factor, inputs=[gr.Number(value=i, visible=False), factor_input, universal_transition_rules_state], outputs=[universal_transition_rules_state], queue=False) | |
| universal_add_year_btn.click(fn=add_year_rule, inputs=[universal_transition_rules_state], outputs=[universal_transition_rules_state]).then(fn=update_transition_ui, inputs=[universal_transition_rules_state], outputs=universal_ui_outputs) | |
| universal_remove_year_btn.click(fn=remove_year_rule, inputs=[universal_transition_rules_state], outputs=[universal_transition_rules_state]).then(fn=update_transition_ui, inputs=[universal_transition_rules_state], outputs=universal_ui_outputs) | |
| category_selector.change(fn=update_category_rules_ui, inputs=[category_selector, transition_rules_by_category_state], outputs=category_ui_outputs) | |
| for i in range(MAX_YEARS): | |
| factor_input = category_ui_outputs[i * 3 + 2] | |
| factor_input.change(fn=update_category_rule_factor, inputs=[gr.Number(value=i, visible=False), factor_input, category_selector, transition_rules_by_category_state], outputs=[transition_rules_by_category_state], queue=False) | |
| category_add_year_btn.click(fn=add_year_to_category, inputs=[category_selector, transition_rules_by_category_state], outputs=[transition_rules_by_category_state]).then(fn=lambda cat, state: update_transition_ui(state.get(cat, [])), inputs=[category_selector, transition_rules_by_category_state], outputs=category_ui_outputs) | |
| category_remove_year_btn.click(fn=remove_year_from_category, inputs=[category_selector, transition_rules_by_category_state], outputs=[transition_rules_by_category_state]).then(fn=lambda cat, state: update_transition_ui(state.get(cat, [])), inputs=[category_selector, transition_rules_by_category_state], outputs=category_ui_outputs) | |
| copy_btn.click(fn=copy_rules_to_category, inputs=[copy_source, copy_target, transition_rules_by_category_state], outputs=[transition_rules_by_category_state]).then(fn=lambda cat, state: update_transition_ui(state.get(cat, [])), inputs=[copy_target, transition_rules_by_category_state], outputs=category_ui_outputs) | |
| # Lógica de visibilidade dos Acordions e Abas | |
| diferencia_estacionamentos_check.change(fn=update_parking_visibility, inputs=diferencia_estacionamentos_check, outputs=[eg_accordion, er_accordion, enr_accordion]) | |
| diferencia_predios_check.change(fn=update_building_visibility, inputs=diferencia_predios_check, outputs=[pg_accordion, pr_accordion, pnr_accordion]) | |
| detailed_tab_refs = [tab_predial_geral, tab_predial_residencial, tab_predial_nao_residencial, tab_estacionamentos_geral, tab_estacionamentos_residenciais, tab_estacionamentos_nao_residenciais, tab_terrenos_1df, tab_terrenos_2df, tab_terrenos_3df, tab_terrenos_esp, tab_terrenos_fix, tab_aposentados, tab_habitacoes] | |
| diferencia_predios_check.change(fn=update_detailed_tabs_visibility, inputs=[diferencia_predios_check, diferencia_estacionamentos_check], outputs=detailed_tab_refs) | |
| diferencia_estacionamentos_check.change(fn=update_detailed_tabs_visibility, inputs=[diferencia_predios_check, diferencia_estacionamentos_check], outputs=detailed_tab_refs) | |
| value_range_tab_refs = [tab_predial_geral_vv, tab_predial_residencial_vv, tab_predial_nao_residencial_vv, tab_estacionamentos_geral_vv, tab_estacionamentos_residenciais_vv, tab_estacionamentos_nao_residenciais_vv, tab_terrenos_1df_vv, tab_terrenos_2df_vv, tab_terrenos_3df_vv, tab_terrenos_esp_vv, tab_terrenos_fix_vv, tab_aposentados_vv, tab_habitacoes_vv] | |
| diferencia_predios_check.change(fn=update_detailed_vv_tabs_visibility, inputs=[diferencia_predios_check, diferencia_estacionamentos_check], outputs=value_range_tab_refs) | |
| diferencia_estacionamentos_check.change(fn=update_detailed_vv_tabs_visibility, inputs=[diferencia_predios_check, diferencia_estacionamentos_check], outputs=value_range_tab_refs) | |
| # Nova Lógica de Visibilidade para Análise de Variação | |
| variation_tab_refs = [tab_predial_geral_var, tab_predial_residencial_var, tab_predial_nao_residencial_var, tab_estacionamentos_geral_var, tab_estacionamentos_residenciais_var, tab_estacionamentos_nao_residenciais_var, tab_terrenos_1df_var, tab_terrenos_2df_var, tab_terrenos_3df_var, tab_terrenos_esp_var, tab_terrenos_fix_var, tab_aposentados_var, tab_habitacoes_var] | |
| diferencia_predios_check.change(fn=update_detailed_tabs_visibility, inputs=[diferencia_predios_check, diferencia_estacionamentos_check], outputs=variation_tab_refs) | |
| diferencia_estacionamentos_check.change(fn=update_detailed_tabs_visibility, inputs=[diferencia_predios_check, diferencia_estacionamentos_check], outputs=variation_tab_refs) | |
| flood_analysis_tab_refs = [tab_flood_general, tab_flood_pred_geral, tab_flood_pred_res, tab_flood_pred_non_res, tab_flood_est_geral, tab_flood_est_res, tab_flood_est_non_res] | |
| def update_flood_analysis_visibility(diferencia_predios, diferencia_estacionamentos): | |
| return (gr.update(visible=True), gr.update(visible=not diferencia_predios), gr.update(visible=diferencia_predios), gr.update(visible=diferencia_predios), gr.update(visible=not diferencia_estacionamentos), gr.update(visible=diferencia_estacionamentos), gr.update(visible=diferencia_estacionamentos)) | |
| diferencia_predios_check.change(fn=update_flood_analysis_visibility, inputs=[diferencia_predios_check, diferencia_estacionamentos_check], outputs=flood_analysis_tab_refs) | |
| diferencia_estacionamentos_check.change(fn=update_flood_analysis_visibility, inputs=[diferencia_predios_check, diferencia_estacionamentos_check], outputs=flood_analysis_tab_refs) | |
| # LÓGICA PARA SELEÇÃO DE ABAS | |
| diferencia_predios_check.change(fn=update_selected_tabs, inputs=[diferencia_predios_check], outputs=[detailed_tabs, value_range_tabs, variation_tabs], queue=False) | |
| output_agg_year_selector.change(fn=get_agg_df_for_year, inputs=[output_agg_year_selector, all_yearly_aggregates_state], outputs=[output_agg_df]) | |
| # --- LÓGICA DE VALIDAÇÃO --- | |
| limit_inputs, num_bands_dropdowns = [], [] | |
| num_inputs_per_cat = 1 + (MAX_BANDS - 1) * 2 + 1 | |
| for i in range(9): | |
| cat_inputs = all_rate_inputs[i * num_inputs_per_cat : (i + 1) * num_inputs_per_cat] | |
| num_bands_dropdowns.append(cat_inputs[0]) | |
| for j in range(MAX_BANDS - 1): limit_inputs.append(cat_inputs[1 + j * 2]) | |
| validation_triggers = limit_inputs + num_bands_dropdowns + [diferencia_estacionamentos_check, diferencia_predios_check] | |
| validation_inputs = [diferencia_estacionamentos_check, diferencia_predios_check] + all_rate_inputs | |
| for trigger_component in validation_triggers: | |
| trigger_component.change(fn=validate_band_limits, inputs=validation_inputs, outputs=[validation_error_box, process_button], queue=False) | |
| click_inputs = [ufm_input, diferencia_estacionamentos_check, diferencia_predios_check, trava_aumento_perc_input, segregar_piso_check, segregar_trava_check, value_range_interval_input, aggregate_from_value_input, show_projection_check, show_aggregate_check, show_detailed_bands_check, show_detailed_vv_check, show_totals_check, show_flood_analysis_check, show_flood_graphs_check, show_excel_check, universal_transition_rules_state, transition_rules_by_category_state, desconto_2026_input, desconto_2027_input, desconto_2028_input, desconto_2029_input] + all_rate_inputs | |
| detailed_tab_outputs = [output_predial_geral, output_predial_residencial, output_predial_nao_residencial, output_estacionamentos_geral, output_estacionamentos_residenciais, output_estacionamentos_nao_residenciais, output_terrenos_1df, output_terrenos_2df, output_terrenos_3df, output_terrenos_esp, output_terrenos_fix, output_aposentados, output_habitacoes] | |
| value_range_outputs = [output_predial_geral_vv, output_predial_residencial_vv, output_predial_nao_residencial_vv, output_estacionamentos_geral_vv, output_estacionamentos_residenciais_vv, output_estacionamentos_nao_residenciais_vv, output_terrenos_1df_vv, output_terrenos_2df_vv, output_terrenos_3df_vv, output_terrenos_esp_vv, output_terrenos_fix_vv, output_aposentados_vv, output_habitacoes_vv] | |
| visibility_outputs = [projection_box, aggregate_summary_box, detailed_bands_box, detailed_vv_box, totals_box, flood_analysis_box, excel_download_box, flood_graphs_box] | |
| outputs_for_upload = [upload_status_output, ufm_input, diferencia_estacionamentos_check, diferencia_predios_check, segregar_trava_check, trava_aumento_perc_input, universal_transition_rules_state, transition_rules_by_category_state, desconto_2026_input, desconto_2027_input, desconto_2028_input, desconto_2029_input] + all_rate_inputs | |
| file_upload_input.upload(fn=load_scenario_from_excel, inputs=[file_upload_input], outputs=outputs_for_upload).then(fn=update_transition_ui, inputs=[universal_transition_rules_state], outputs=universal_ui_outputs).then(fn=lambda cat, state: update_transition_ui(state.get(cat, [])), inputs=[category_selector, transition_rules_by_category_state], outputs=category_ui_outputs) | |
| process_button.click( | |
| fn=process_iptu_data, inputs=click_inputs, | |
| outputs=[ | |
| status_message, output_df_projection, total_novo_text, total_atual_trib_text, total_atual_calc_text, | |
| output_enchente_analysis, output_enchente_analysis_pred_geral, output_enchente_analysis_res, output_enchente_analysis_non_res, | |
| output_enchente_analysis_est_geral, output_enchente_analysis_est_res, output_enchente_analysis_est_non_res, | |
| all_yearly_aggregates_state, output_agg_year_selector, output_agg_df, | |
| filename_display, download_btn | |
| ] + detailed_tab_outputs + value_range_outputs + flood_plots_outputs + | |
| [df_resumo_final_state, the_merge_state, iptu_rules_state, anos_projecao_state, detailed_by_value_ranges_raw_state] + | |
| visibility_outputs + [variation_year_selector] | |
| ) | |
| update_vv_button.click( | |
| fn=update_valor_venal_view, | |
| inputs=[value_range_interval_input, aggregate_from_value_input, df_resumo_final_state, the_merge_state, iptu_rules_state, anos_projecao_state, diferencia_predios_check, diferencia_estacionamentos_check], | |
| outputs=value_range_outputs | |
| ) | |
| # Lista de outputs atualizada (14 abas) | |
| variation_df_outputs = [ | |
| output_totais_gerais_var, # NOVA: Primeira aba - Totais Gerais | |
| output_predial_residencial_var, # Aba 2 - Predial Residencial | |
| output_predial_nao_residencial_var, # Aba 3 - Predial Não Residencial | |
| output_predial_geral_var, # Aba 4 - Predial Geral | |
| output_estacionamentos_geral_var, # Aba 5 - Estacionamentos Geral | |
| output_estacionamentos_residenciais_var, # Aba 6 - Estacionamentos Residenciais | |
| output_estacionamentos_nao_residenciais_var, # Aba 7 - Estacionamentos Não Residenciais | |
| output_terrenos_1df_var, # Aba 8 - Terrenos 1ª DF | |
| output_terrenos_2df_var, # Aba 9 - Terrenos 2ª DF | |
| output_terrenos_3df_var, # Aba 10 - Terrenos 3ª DF | |
| output_terrenos_esp_var, # Aba 11 - Terrenos Alíquota Especial | |
| output_terrenos_fix_var, # Aba 12 - Terrenos Alíquota Fixa | |
| output_aposentados_var, # Aba 13 - Aposentados/Deficientes | |
| output_habitacoes_var # Aba 14 - Habitações Populares | |
| ] | |
| # Total de outputs: 1 (erro) + 14 (abas) + 1 (state) = 16 outputs | |
| variation_outputs = [variation_validation_error] + variation_df_outputs + [variation_analysis_raw_state] | |
| # Chamada atualizada do botão | |
| variation_generate_btn.click( | |
| fn=generate_variation_analysis_view, | |
| inputs=[variation_limit_input, variation_interval_input, variation_year_selector, variation_abrangencia_selector, the_merge_state, iptu_rules_state, diferencia_predios_check, diferencia_estacionamentos_check], | |
| outputs=variation_outputs | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch() |