Spaces:
Running
Running
| from __future__ import annotations | |
| from branca.element import Element | |
| import json | |
| from html import escape | |
| import math | |
| import re | |
| import unicodedata | |
| from dataclasses import dataclass | |
| from datetime import date, datetime | |
| from pathlib import Path | |
| from threading import Event, Lock | |
| from typing import Any | |
| import folium | |
| from folium import plugins | |
| import numpy as np | |
| import pandas as pd | |
| from fastapi import HTTPException | |
| from joblib import load | |
| from app.core.elaboracao import geocodificacao | |
| from app.core.shapefile_runtime import load_attribute_records | |
| from app.core.elaboracao.core import _migrar_pacote_v1_para_v2, normalizar_observacao_modelo | |
| from app.core.map_layers import ( | |
| add_bairros_layer, | |
| add_marker_payloads, | |
| apply_marker_payload_jitter, | |
| build_trabalhos_tecnicos_marker_payloads, | |
| add_zoom_responsive_circle_markers, | |
| ) | |
| from app.core.visualizacao.map_payload import build_leaflet_payload | |
| from app.runtime_log import append_runtime_log | |
| from app.runtime_paths import resolve_core_path | |
| from app.services import model_repository, trabalhos_tecnicos_service | |
| from app.services.serializers import sanitize_value | |
| AREA_PRIVATIVA_ALIASES = ["APRIV", "APRIVEQ", "ATPRIV", "ACOPRIV", "AREAPRIV", "AREA_PRIVATIVA", "AREA PRIVATIVA"] | |
| AREA_TOTAL_ALIASES = ["ATTOTAL", "ATOTAL", "ATOT", "AREA_TOTAL", "AREA TOTAL", "AREA"] | |
| AREA_GERAL_ALIASES = AREA_PRIVATIVA_ALIASES + AREA_TOTAL_ALIASES + ["ACONST", "ALOC"] | |
| VALOR_UNITARIO_ALIASES = ["VU", "VUNIT", "VULOC", "VUAPRIV", "VUNIPRIV", "VUACONST", "VALOR_UNITARIO", "VALOR UNITARIO"] | |
| VALOR_TOTAL_ALIASES = ["VLOC", "VTOT", "VTOTAL", "VALOR_TOTAL", "VALOR TOTAL"] | |
| RH_ALIASES = ["RH", "FATOR_RH", "FATOR RH", "RENDA_HABITACIONAL"] | |
| DATA_ALIASES = ["DATA", "DT", "RDATA", "AVALDT", "DT ENC", "DT EST", "DT REG", "DATA_AVALIACAO", "DATA AVALIACAO", "COMPETENCIA"] | |
| BAIRRO_ALIASES = ["BAIRRO", "BAIRROS", "NOME_BAIRRO", "BAIRRO_NOME", "NME BAI", "NME_BAI"] | |
| FINALIDADE_ALIASES = ["NME IMO-FINAL", "NME_IMO_FINAL", "NME IMO FINAL", "FINALIDADE", "TIPO_IMOVEL", "TIPO IMOVEL"] | |
| LAT_ALIASES = ["LAT", "LATITUDE", "SIAT_LATITUDE"] | |
| LON_ALIASES = ["LON", "LONG", "LONGITUDE", "SIAT_LONGITUDE"] | |
| APP_ALIASES = ["% APP", "%APP"] | |
| TIPO_POR_TOKEN = { | |
| "RECOND": "Residencia em condominio", | |
| "RCOMD": "Residencia em condominio", | |
| "TCOND": "Terreno em condominio", | |
| "LCOM": "Loja", | |
| "LOJA": "Loja", | |
| "CCOM": "Casa comercial", | |
| "DEP": "Deposito", | |
| "DEPOS": "Deposito", | |
| "RES": "Residencias isoladas / casas", | |
| "SALA": "Salas comerciais", | |
| "APTO": "Apartamentos", | |
| "APART": "Apartamentos", | |
| "AP": "Apartamentos", | |
| "TERRENO": "Terrenos", | |
| "TER": "Terrenos", | |
| "EDIF": "Edificio", | |
| "CASA": "Residencias isoladas / casas", | |
| "GALPAO": "Galpao", | |
| } | |
| CAMPO_TEXTO_META_FONTES = { | |
| "finalidade": [], | |
| "bairros": [], | |
| "aval_finalidade": [], | |
| "aval_bairro": [], | |
| } | |
| CAMPO_TEXTO_ALIASES_COLUNA = { | |
| "finalidade": FINALIDADE_ALIASES, | |
| "bairros": BAIRRO_ALIASES, | |
| "aval_finalidade": FINALIDADE_ALIASES, | |
| "aval_bairro": BAIRRO_ALIASES, | |
| } | |
| CAMPO_FAIXA_META_FONTES = { | |
| "data": ["meta:faixa_data"], | |
| "area": [], | |
| "rh": [], | |
| "aval_data": [], | |
| "aval_area": [], | |
| "aval_area_privativa": [], | |
| "aval_area_total": [], | |
| "aval_rh": [], | |
| "aval_valor_unitario": [], | |
| "aval_valor_total": [], | |
| } | |
| CAMPO_FAIXA_ALIASES_COLUNA = {} | |
| CAMPO_FAIXA_ALIASES_VARIAVEL = { | |
| "area": AREA_GERAL_ALIASES, | |
| "rh": RH_ALIASES, | |
| "aval_area": AREA_GERAL_ALIASES, | |
| "aval_area_privativa": AREA_PRIVATIVA_ALIASES, | |
| "aval_area_total": AREA_TOTAL_ALIASES, | |
| "aval_rh": RH_ALIASES, | |
| "aval_valor_unitario": VALOR_UNITARIO_ALIASES, | |
| "aval_valor_total": VALOR_TOTAL_ALIASES, | |
| } | |
| FONTE_META_LABELS = { | |
| "meta:nome_modelo": "Metadado: Nome do modelo", | |
| "meta:arquivo": "Metadado: Arquivo", | |
| "meta:autor": "Metadado: Autor/Elaborador", | |
| "meta:finalidade": "Metadado: Finalidade", | |
| "meta:tipo_imovel": "Metadado: Tipo", | |
| "meta:finalidades": "Metadado: Finalidades", | |
| "meta:bairros": "Metadado: Bairros", | |
| "meta:endereco_referencia": "Metadado: Endereco de referencia", | |
| "meta:faixa_data": "Metadado: Faixa de data", | |
| "meta:faixa_area": "Metadado: Faixa de area", | |
| "meta:faixa_rh": "Metadado: Faixa de RH", | |
| "meta:faixa_area_privativa": "Metadado: Faixa de area privativa", | |
| "meta:faixa_area_total": "Metadado: Faixa de area total", | |
| "meta:faixa_valor_unitario": "Metadado: Faixa de valor unitario", | |
| "meta:faixa_valor_total": "Metadado: Faixa de valor total", | |
| } | |
| MAX_COLUNAS_INDEXADAS = 120 | |
| MAX_VALORES_INDEXADOS_POR_COLUNA = 140 | |
| MAX_LINHAS_INDEXACAO = 5000 | |
| COMPATIBILIDADE_MAP = { | |
| "area_privativa": AREA_PRIVATIVA_ALIASES, | |
| "area_total": AREA_TOTAL_ALIASES, | |
| "valor_unitario": VALOR_UNITARIO_ALIASES, | |
| "valor_total": VALOR_TOTAL_ALIASES, | |
| } | |
| RANGE_CAMPOS = { | |
| "area_privativa": AREA_PRIVATIVA_ALIASES, | |
| "area_total": AREA_TOTAL_ALIASES, | |
| "valor_unitario": VALOR_UNITARIO_ALIASES, | |
| "valor_total": VALOR_TOTAL_ALIASES, | |
| } | |
| MAP_COLORS = [ | |
| "#1f77b4", | |
| "#2ca02c", | |
| "#ff7f0e", | |
| "#9467bd", | |
| "#17becf", | |
| "#bcbd22", | |
| "#4c78a8", | |
| "#54a24b", | |
| "#72b7b2", | |
| "#f2a541", | |
| "#6c9bd2", | |
| ] | |
| DISTANCIA_METRIC_CRS = "EPSG:31982" | |
| GEOMETRIA_CACHE_VERSION = 2 | |
| TRABALHOS_TECNICOS_MODELOS_SELECIONADOS = "selecionados" | |
| TRABALHOS_TECNICOS_MODELOS_SELECIONADOS_E_OUTRAS_VERSOES = "selecionados_e_outras_versoes" | |
| TRABALHOS_TECNICOS_MODELOS_SELECIONADOS_E_ANTERIORES = "selecionados_e_anteriores" | |
| TRABALHOS_TECNICOS_PROXIMIDADE_DESATIVADA = "sem_proximidade" | |
| TRABALHOS_TECNICOS_PROXIMIDADE_ATIVADA = "proximos_ao_avaliando" | |
| TRABALHOS_TECNICOS_RAIO_PADRAO_M = 1000 | |
| TRABALHOS_TECNICOS_RAIO_MAX_M = 5000 | |
| CRITERIO_ESPACIAL_MENOR_DISTANCIA = "menor_distancia" | |
| CRITERIO_ESPACIAL_MEDIA_DISTANCIA = "media_distancia" | |
| CRITERIO_ESPACIAL_MAIOR_DISTANCIA = "maior_distancia" | |
| CRITERIO_ESPACIAL_PADRAO = CRITERIO_ESPACIAL_MAIOR_DISTANCIA | |
| VERSAO_MODELO_RE = re.compile(r"^(?P<base>.*?)(?P<sufixo>[A-Za-z])$") | |
| TIPO_LOGRADOURO_ABREV = { | |
| "AC": "AC", | |
| "AL": "AL", | |
| "AV": "AV", | |
| "BCO": "BCO", | |
| "ESC": "ESC", | |
| "EST": "EST", | |
| "LGO": "LGO", | |
| "PASS": "PASS", | |
| "PCA": "PCA", | |
| "PC": "PCA", | |
| "R": "R", | |
| "ROD": "ROD", | |
| "TRAV": "TRAV", | |
| "TV": "TV", | |
| "VL": "VL", | |
| } | |
| TIPO_LOGRADOURO_EXPANDIDO = { | |
| "AC": "ACESSO", | |
| "AL": "ALAMEDA", | |
| "AV": "AVENIDA", | |
| "BCO": "BECO", | |
| "ESC": "ESCADARIA", | |
| "EST": "ESTRADA", | |
| "LGO": "LARGO", | |
| "PASS": "PASSAGEM", | |
| "PCA": "PRACA", | |
| "PC": "PRACA", | |
| "R": "RUA", | |
| "ROD": "RODOVIA", | |
| "TRAV": "TRAVESSA", | |
| "TV": "TRAVESSA", | |
| "VL": "VILA", | |
| } | |
| class PesquisaFiltros: | |
| otica: str = "avaliando" | |
| nome: str | None = None | |
| autor: str | None = None | |
| contem_app: str | None = None | |
| tipo_modelo: str | None = None | |
| negociacao_modelo: str | None = None | |
| finalidade: str | None = None | |
| finalidade_colunas: list[str] | None = None | |
| bairro: str | None = None | |
| bairros: list[str] | None = None | |
| bairros_colunas: list[str] | None = None | |
| endereco: str | None = None | |
| data_min: str | None = None | |
| data_colunas: list[str] | None = None | |
| data_max: str | None = None | |
| area_min: float | None = None | |
| area_colunas: list[str] | None = None | |
| area_max: float | None = None | |
| rh_min: float | None = None | |
| rh_colunas: list[str] | None = None | |
| rh_max: float | None = None | |
| aval_finalidade: str | None = None | |
| aval_finalidade_colunas: list[str] | None = None | |
| aval_zona: str | None = None | |
| aval_bairro: str | None = None | |
| aval_bairro_colunas: list[str] | None = None | |
| aval_endereco: str | None = None | |
| aval_data: str | None = None | |
| aval_data_colunas: list[str] | None = None | |
| aval_area: float | None = None | |
| aval_area_colunas: list[str] | None = None | |
| aval_area_privativa: float | None = None | |
| aval_area_privativa_colunas: list[str] | None = None | |
| aval_area_total: float | None = None | |
| aval_area_total_colunas: list[str] | None = None | |
| aval_rh: float | None = None | |
| aval_rh_colunas: list[str] | None = None | |
| aval_valor_unitario: float | None = None | |
| aval_valor_unitario_colunas: list[str] | None = None | |
| aval_valor_total: float | None = None | |
| aval_valor_total_colunas: list[str] | None = None | |
| aval_lat: float | None = None | |
| aval_lon: float | None = None | |
| avaliandos_geo: list[dict[str, Any]] | None = None | |
| somente_versoes_atuais: bool = True | |
| _CACHE_LOCK = Lock() | |
| _CACHE: dict[str, dict[str, Any]] = {} | |
| _ADMIN_CONFIG_LOCK = Lock() | |
| _CACHE_SOURCE_SIGNATURE: str | None = None | |
| _ADMIN_FONTES_SESSION: dict[str, list[str]] = {} | |
| _CATALOGO_VIAS_CACHE: list[dict[str, Any]] | None = None | |
| _FAMILIAS_VERSOES_CACHE: dict[str, list[dict[str, Any]]] | None = None | |
| _FAMILIAS_VERSOES_SIGNATURE: tuple[str, tuple[tuple[str, int, int], ...]] | None = None | |
| _FAMILIAS_VERSOES_INFLIGHT: dict[tuple[str, tuple[tuple[str, int, int], ...]], Event] = {} | |
| def _resolver_repositorio_modelos() -> model_repository.ModelRepositoryResolution: | |
| global _CACHE_SOURCE_SIGNATURE, _FAMILIAS_VERSOES_CACHE, _FAMILIAS_VERSOES_SIGNATURE | |
| resolved = model_repository.resolve_model_repository() | |
| with _CACHE_LOCK: | |
| if _CACHE_SOURCE_SIGNATURE != resolved.signature: | |
| _CACHE.clear() | |
| _CACHE_SOURCE_SIGNATURE = resolved.signature | |
| _FAMILIAS_VERSOES_CACHE = None | |
| _FAMILIAS_VERSOES_SIGNATURE = None | |
| return resolved | |
| def ensure_modelos_dir() -> Path: | |
| return _resolver_repositorio_modelos().modelos_dir | |
| def obter_admin_config_pesquisa() -> dict[str, Any]: | |
| resolved = _resolver_repositorio_modelos() | |
| pasta = resolved.modelos_dir | |
| modelos = sorted(pasta.glob("*.dai"), key=lambda item: item.name.lower()) | |
| todos = [_carregar_resumo_com_cache(caminho) for caminho in modelos] | |
| colunas_filtro = _montar_config_colunas_filtro(todos) | |
| admin_fontes = _carregar_fontes_admin(colunas_filtro) | |
| return sanitize_value( | |
| { | |
| "colunas_filtro": colunas_filtro, | |
| "admin_fontes": admin_fontes, | |
| "total_modelos": len(todos), | |
| "fonte_modelos": resolved.as_payload(), | |
| } | |
| ) | |
| def salvar_admin_config_pesquisa(campos: dict[str, list[str]] | None) -> dict[str, Any]: | |
| resolved = _resolver_repositorio_modelos() | |
| pasta = resolved.modelos_dir | |
| modelos = sorted(pasta.glob("*.dai"), key=lambda item: item.name.lower()) | |
| todos = [_carregar_resumo_com_cache(caminho) for caminho in modelos] | |
| colunas_filtro = _montar_config_colunas_filtro(todos) | |
| admin_fontes = _normalizar_fontes_admin(campos or {}, colunas_filtro) | |
| _salvar_fontes_admin_sessao(admin_fontes) | |
| return sanitize_value( | |
| { | |
| "colunas_filtro": colunas_filtro, | |
| "admin_fontes": admin_fontes, | |
| "total_modelos": len(todos), | |
| "status": "Configuracao de busca aplicada para a sessao atual.", | |
| "fonte_modelos": resolved.as_payload(), | |
| } | |
| ) | |
| def listar_modelos(filtros: PesquisaFiltros, limite: int | None = None, somente_contexto: bool = False) -> dict[str, Any]: | |
| resolved = _resolver_repositorio_modelos() | |
| pasta = resolved.modelos_dir | |
| modelos = sorted(pasta.glob("*.dai"), key=lambda item: item.name.lower()) | |
| otica = _normalizar_otica(filtros.otica) | |
| filtros_exec = PesquisaFiltros(**{**filtros.__dict__, "otica": otica}) | |
| todos = [_carregar_resumo_com_cache(caminho) for caminho in modelos] | |
| colunas_filtro = _montar_config_colunas_filtro(todos) | |
| admin_fontes = _carregar_fontes_admin(colunas_filtro) | |
| sugestoes = _extrair_sugestoes(todos, admin_fontes, incluir_logradouros_eixos=False) | |
| aval_lat, aval_lon = _normalizar_coordenadas_avaliando(filtros_exec.aval_lat, filtros_exec.aval_lon) | |
| avaliandos_geo = _normalizar_avaliandos_geo(filtros_exec.avaliandos_geo) | |
| if (aval_lat is None or aval_lon is None) and len(avaliandos_geo) == 1: | |
| aval_lat, aval_lon = _normalizar_coordenadas_avaliando(avaliandos_geo[0].get("lat"), avaliandos_geo[0].get("lon")) | |
| ids_versoes_antigas = _ids_versoes_antigas(todos) if filtros_exec.somente_versoes_atuais else set() | |
| if somente_contexto: | |
| return sanitize_value( | |
| { | |
| "modelos": [], | |
| "sugestoes": sugestoes, | |
| "colunas_filtro": colunas_filtro, | |
| "total_filtrado": 0, | |
| "total_geral": len(todos), | |
| "modelos_dir": str(pasta), | |
| "fonte_modelos": resolved.as_payload(), | |
| "admin_fontes": admin_fontes, | |
| "filtros_aplicados": { | |
| "nome": filtros.nome, | |
| "autor": filtros.autor, | |
| "contem_app": filtros.contem_app, | |
| "tipo_modelo": filtros.tipo_modelo, | |
| "negociacao_modelo": filtros.negociacao_modelo, | |
| "finalidade": filtros.finalidade, | |
| "finalidade_colunas": filtros.finalidade_colunas or [], | |
| "bairro": filtros.bairro, | |
| "bairros": filtros.bairros or [], | |
| "bairros_colunas": filtros.bairros_colunas or [], | |
| "endereco": filtros.endereco, | |
| "data_min": filtros.data_min, | |
| "data_colunas": filtros.data_colunas or [], | |
| "data_max": filtros.data_max, | |
| "area_min": filtros.area_min, | |
| "area_colunas": filtros.area_colunas or [], | |
| "area_max": filtros.area_max, | |
| "rh_min": filtros.rh_min, | |
| "rh_colunas": filtros.rh_colunas or [], | |
| "rh_max": filtros.rh_max, | |
| "otica": otica, | |
| "aval_finalidade": filtros.aval_finalidade, | |
| "aval_finalidade_colunas": filtros.aval_finalidade_colunas or [], | |
| "aval_zona": filtros.aval_zona, | |
| "aval_bairro": filtros.aval_bairro, | |
| "aval_bairro_colunas": filtros.aval_bairro_colunas or [], | |
| "aval_endereco": filtros.aval_endereco, | |
| "aval_data": filtros.aval_data, | |
| "aval_data_colunas": filtros.aval_data_colunas or [], | |
| "aval_area": filtros.aval_area, | |
| "aval_area_colunas": filtros.aval_area_colunas or [], | |
| "aval_area_privativa": filtros.aval_area_privativa, | |
| "aval_area_privativa_colunas": filtros.aval_area_privativa_colunas or [], | |
| "aval_area_total": filtros.aval_area_total, | |
| "aval_area_total_colunas": filtros.aval_area_total_colunas or [], | |
| "aval_rh": filtros.aval_rh, | |
| "aval_rh_colunas": filtros.aval_rh_colunas or [], | |
| "aval_valor_unitario": filtros.aval_valor_unitario, | |
| "aval_valor_unitario_colunas": filtros.aval_valor_unitario_colunas or [], | |
| "aval_valor_total": filtros.aval_valor_total, | |
| "aval_valor_total_colunas": filtros.aval_valor_total_colunas or [], | |
| "aval_lat": aval_lat, | |
| "aval_lon": aval_lon, | |
| "avaliandos_geo": avaliandos_geo, | |
| "somente_versoes_atuais": bool(filtros_exec.somente_versoes_atuais), | |
| }, | |
| } | |
| ) | |
| filtrados = [item for item in todos if _aceita_filtros(item, filtros_exec, admin_fontes)] | |
| if otica == "avaliando": | |
| filtrados = [_anexar_avaliando_info(item, filtros_exec, admin_fontes) for item in filtrados] | |
| filtrados = [item for item in filtrados if item.get("avaliando", {}).get("aceito")] | |
| if ids_versoes_antigas: | |
| filtrados = [item for item in filtrados if str(item.get("id") or "") not in ids_versoes_antigas] | |
| if len(avaliandos_geo) > 1: | |
| filtrados = [_anexar_distancias_modelo_multiplos(item, avaliandos_geo) for item in filtrados] | |
| filtrados.sort( | |
| key=lambda item: ( | |
| item.get("distancia_resumo", {}).get("principal_distancia_km") is None, | |
| float(item.get("distancia_resumo", {}).get("principal_distancia_km") or 0.0), | |
| str(item.get("nome_modelo") or item.get("arquivo") or item.get("id") or "").lower(), | |
| ) | |
| ) | |
| elif aval_lat is not None and aval_lon is not None: | |
| filtrados = [_anexar_distancia_modelo(item, aval_lat, aval_lon) for item in filtrados] | |
| filtrados.sort( | |
| key=lambda item: ( | |
| item.get("distancia_km") is None, | |
| float(item.get("distancia_km") or 0.0), | |
| str(item.get("nome_modelo") or item.get("arquivo") or item.get("id") or "").lower(), | |
| ) | |
| ) | |
| if limite and limite > 0: | |
| filtrados = filtrados[:limite] | |
| modelos_publicos = [_modelo_publico(item) for item in filtrados] | |
| return sanitize_value( | |
| { | |
| "modelos": modelos_publicos, | |
| "sugestoes": sugestoes, | |
| "colunas_filtro": colunas_filtro, | |
| "total_filtrado": len(filtrados), | |
| "total_geral": len(todos), | |
| "modelos_dir": str(pasta), | |
| "fonte_modelos": resolved.as_payload(), | |
| "admin_fontes": admin_fontes, | |
| "filtros_aplicados": { | |
| "nome": filtros.nome, | |
| "autor": filtros.autor, | |
| "contem_app": filtros.contem_app, | |
| "tipo_modelo": filtros.tipo_modelo, | |
| "negociacao_modelo": filtros.negociacao_modelo, | |
| "finalidade": filtros.finalidade, | |
| "finalidade_colunas": filtros.finalidade_colunas or [], | |
| "bairro": filtros.bairro, | |
| "bairros": filtros.bairros or [], | |
| "bairros_colunas": filtros.bairros_colunas or [], | |
| "endereco": filtros.endereco, | |
| "data_min": filtros.data_min, | |
| "data_colunas": filtros.data_colunas or [], | |
| "data_max": filtros.data_max, | |
| "area_min": filtros.area_min, | |
| "area_colunas": filtros.area_colunas or [], | |
| "area_max": filtros.area_max, | |
| "rh_min": filtros.rh_min, | |
| "rh_colunas": filtros.rh_colunas or [], | |
| "rh_max": filtros.rh_max, | |
| "otica": otica, | |
| "aval_finalidade": filtros.aval_finalidade, | |
| "aval_finalidade_colunas": filtros.aval_finalidade_colunas or [], | |
| "aval_zona": filtros.aval_zona, | |
| "aval_bairro": filtros.aval_bairro, | |
| "aval_bairro_colunas": filtros.aval_bairro_colunas or [], | |
| "aval_endereco": filtros.aval_endereco, | |
| "aval_data": filtros.aval_data, | |
| "aval_data_colunas": filtros.aval_data_colunas or [], | |
| "aval_area": filtros.aval_area, | |
| "aval_area_colunas": filtros.aval_area_colunas or [], | |
| "aval_area_privativa": filtros.aval_area_privativa, | |
| "aval_area_privativa_colunas": filtros.aval_area_privativa_colunas or [], | |
| "aval_area_total": filtros.aval_area_total, | |
| "aval_area_total_colunas": filtros.aval_area_total_colunas or [], | |
| "aval_rh": filtros.aval_rh, | |
| "aval_rh_colunas": filtros.aval_rh_colunas or [], | |
| "aval_valor_unitario": filtros.aval_valor_unitario, | |
| "aval_valor_unitario_colunas": filtros.aval_valor_unitario_colunas or [], | |
| "aval_valor_total": filtros.aval_valor_total, | |
| "aval_valor_total_colunas": filtros.aval_valor_total_colunas or [], | |
| "aval_lat": aval_lat, | |
| "aval_lon": aval_lon, | |
| "avaliandos_geo": avaliandos_geo, | |
| "somente_versoes_atuais": bool(filtros_exec.somente_versoes_atuais), | |
| }, | |
| } | |
| ) | |
| def resolver_localizacao_avaliando( | |
| *, | |
| latitude: float | None = None, | |
| longitude: float | None = None, | |
| logradouro: str | None = None, | |
| numero: Any = None, | |
| cdlog: Any = None, | |
| ) -> dict[str, Any]: | |
| lat, lon = _normalizar_coordenadas_avaliando(latitude, longitude) | |
| if lat is not None and lon is not None: | |
| return sanitize_value( | |
| { | |
| "lat": lat, | |
| "lon": lon, | |
| "origem": "coordenadas", | |
| "status": f"Coordenadas do avaliando definidas em {lat:.6f}, {lon:.6f}.", | |
| } | |
| ) | |
| numero_int = _to_int_or_none(numero) | |
| if numero_int is None or numero_int <= 0: | |
| raise HTTPException(status_code=400, detail="Informe um numero valido para localizar o avaliando") | |
| cdlog_int = _to_int_or_none(cdlog) | |
| via_resolvida = None | |
| if cdlog_int is None: | |
| via_resolvida = _resolver_via_por_logradouro(logradouro) | |
| cdlog_int = via_resolvida["cdlog"] | |
| else: | |
| via_resolvida = _resolver_via_por_cdlog(cdlog_int) | |
| df_base = pd.DataFrame([{"CDLOG": cdlog_int, "NUMERO": numero_int}]) | |
| df_geo, df_falhas, ajustados = geocodificacao.geocodificar(df_base, "CDLOG", "NUMERO", auto_200=False) | |
| lat = _to_float_or_none(df_geo.iloc[0].get("lat")) if not df_geo.empty else None | |
| lon = _to_float_or_none(df_geo.iloc[0].get("lon")) if not df_geo.empty else None | |
| lat, lon = _normalizar_coordenadas_avaliando(lat, lon) | |
| if lat is None or lon is None: | |
| sugestoes = "" | |
| motivo = "Nao foi possivel localizar o endereco informado nos eixos." | |
| if not df_falhas.empty: | |
| linha_falha = df_falhas.iloc[0] | |
| sugestoes = str(linha_falha.get("sugestoes") or "").strip() | |
| motivo = str(linha_falha.get("motivo") or motivo).strip() or motivo | |
| detalhe = motivo | |
| if sugestoes: | |
| detalhe = f"{motivo} Sugestoes de numeracao: {sugestoes}." | |
| raise HTTPException(status_code=400, detail=detalhe) | |
| numero_usado = numero_int | |
| if ajustados: | |
| try: | |
| numero_usado = int(ajustados[0].get("numero_usado") or numero_int) | |
| except Exception: | |
| numero_usado = numero_int | |
| logradouro_resolvido = str(via_resolvida.get("logradouro") or "").strip() | |
| status = f"Endereco localizado em {logradouro_resolvido}, {numero_usado}." | |
| if numero_usado != numero_int: | |
| status += f" Numero interpolado mais proximo: {numero_usado}." | |
| return sanitize_value( | |
| { | |
| "lat": lat, | |
| "lon": lon, | |
| "origem": "eixos", | |
| "cdlog": cdlog_int, | |
| "logradouro": logradouro_resolvido, | |
| "numero_informado": numero_int, | |
| "numero_usado": numero_usado, | |
| "status": status, | |
| } | |
| ) | |
| def _ids_versoes_antigas(modelos: list[dict[str, Any]]) -> set[str]: | |
| grupos: dict[str, list[tuple[str, str]]] = {} | |
| for modelo in modelos: | |
| versao_info = _extrair_info_versao_modelo(modelo) | |
| if versao_info is None: | |
| continue | |
| base_norm, sufixo, modelo_id = versao_info | |
| grupos.setdefault(base_norm, []).append((sufixo, modelo_id)) | |
| ids_antigos: set[str] = set() | |
| for itens in grupos.values(): | |
| if len(itens) < 2: | |
| continue | |
| sufixos_distintos = {sufixo for sufixo, _modelo_id in itens} | |
| if len(sufixos_distintos) < 2: | |
| continue | |
| ultimo_sufixo = max(sufixos_distintos) | |
| for sufixo, modelo_id in itens: | |
| if sufixo != ultimo_sufixo: | |
| ids_antigos.add(modelo_id) | |
| return ids_antigos | |
| def _extrair_info_versao_modelo(modelo: dict[str, Any]) -> tuple[str, str, str] | None: | |
| modelo_id = _str_or_none(modelo.get("id")) | |
| if not modelo_id: | |
| return None | |
| candidatos = [ | |
| _texto_nome_modelo_sugestao(modelo.get("nome_modelo")), | |
| _texto_nome_modelo_sugestao(modelo.get("arquivo")), | |
| _texto_nome_modelo_sugestao(modelo.get("id")), | |
| ] | |
| for candidato in candidatos: | |
| info = _extrair_info_familia_modelo(candidato) | |
| if info is None: | |
| continue | |
| base_norm, _base_texto, sufixo = info | |
| if not sufixo: | |
| continue | |
| return (base_norm, sufixo, modelo_id) | |
| return None | |
| def _extrair_info_familia_modelo(value: Any) -> tuple[str, str, str] | None: | |
| texto = _texto_nome_modelo_sugestao(value) | |
| if not texto: | |
| return None | |
| match = VERSAO_MODELO_RE.match(texto) | |
| if match: | |
| base_texto = str(match.group("base") or "").strip() | |
| sufixo = str(match.group("sufixo") or "").upper() | |
| base_norm = _normalize(base_texto) | |
| if base_norm and base_texto: | |
| return (base_norm, base_texto, sufixo) | |
| base_norm = _normalize(texto) | |
| if not base_norm: | |
| return None | |
| return (base_norm, texto, "") | |
| def _montar_familias_versoes_modelos(caminhos_modelo: list[Path]) -> dict[str, list[dict[str, Any]]]: | |
| familias: dict[str, list[dict[str, Any]]] = {} | |
| for caminho in caminhos_modelo: | |
| info = _extrair_info_versao_modelo( | |
| { | |
| "id": caminho.stem, | |
| "arquivo": caminho.name, | |
| "nome_modelo": caminho.stem, | |
| } | |
| ) | |
| if info is None: | |
| continue | |
| base_norm, sufixo, modelo_id = info | |
| familias.setdefault(base_norm, []).append( | |
| { | |
| "sufixo": sufixo, | |
| "id": modelo_id, | |
| "arquivo": caminho.name, | |
| "nome_modelo": modelo_id, | |
| } | |
| ) | |
| for itens in familias.values(): | |
| itens.sort(key=lambda item: str(item.get("sufixo") or "")) | |
| return familias | |
| def _assinatura_modelos_familias(caminhos_modelo: list[Path]) -> tuple[tuple[str, int, int], ...]: | |
| assinatura: list[tuple[str, int, int]] = [] | |
| for caminho in sorted(caminhos_modelo, key=lambda item: item.name.lower()): | |
| try: | |
| stat = caminho.stat() | |
| except OSError: | |
| continue | |
| assinatura.append((caminho.name, int(stat.st_mtime_ns), int(stat.st_size))) | |
| return tuple(assinatura) | |
| def obter_familias_versoes_modelos_cache(caminhos_modelo: list[Path] | None = None) -> dict[str, list[dict[str, Any]]]: | |
| global _FAMILIAS_VERSOES_CACHE, _FAMILIAS_VERSOES_SIGNATURE | |
| resolved = _resolver_repositorio_modelos() | |
| caminhos = list(caminhos_modelo) if caminhos_modelo is not None else list(resolved.modelos_dir.glob("*.dai")) | |
| assinatura = (resolved.signature, _assinatura_modelos_familias(caminhos)) | |
| while True: | |
| with _CACHE_LOCK: | |
| if _FAMILIAS_VERSOES_SIGNATURE == assinatura and isinstance(_FAMILIAS_VERSOES_CACHE, dict): | |
| return { | |
| chave: [dict(item) for item in itens] | |
| for chave, itens in _FAMILIAS_VERSOES_CACHE.items() | |
| } | |
| inflight = _FAMILIAS_VERSOES_INFLIGHT.get(assinatura) | |
| if inflight is None: | |
| inflight = Event() | |
| _FAMILIAS_VERSOES_INFLIGHT[assinatura] = inflight | |
| is_builder = True | |
| else: | |
| is_builder = False | |
| if is_builder: | |
| try: | |
| familias = _montar_familias_versoes_modelos(caminhos) | |
| with _CACHE_LOCK: | |
| _FAMILIAS_VERSOES_SIGNATURE = assinatura | |
| _FAMILIAS_VERSOES_CACHE = { | |
| chave: [dict(item) for item in itens] | |
| for chave, itens in familias.items() | |
| } | |
| return { | |
| chave: [dict(item) for item in itens] | |
| for chave, itens in familias.items() | |
| } | |
| finally: | |
| with _CACHE_LOCK: | |
| waiter = _FAMILIAS_VERSOES_INFLIGHT.pop(assinatura, None) | |
| if waiter is not None: | |
| waiter.set() | |
| inflight.wait() | |
| def _agrupar_nomes_modelo_por_familia(values: list[Any]) -> dict[str, list[str]]: | |
| familias: dict[str, list[str]] = {} | |
| for value in values: | |
| texto = _texto_nome_modelo_sugestao(value) | |
| if not texto: | |
| continue | |
| info = _extrair_info_familia_modelo(texto) | |
| if info is None: | |
| continue | |
| base_norm, _base_texto, _sufixo = info | |
| _append_aliases_unicos(familias.setdefault(base_norm, []), texto) | |
| return familias | |
| def _append_aliases_unicos(destino: list[str], *values: Any) -> list[str]: | |
| vistos = {str(item or "").strip().casefold() for item in destino if str(item or "").strip()} | |
| for value in values: | |
| texto = str(value or "").strip() | |
| chave = texto.casefold() | |
| if not texto or chave in vistos: | |
| continue | |
| vistos.add(chave) | |
| destino.append(texto) | |
| return destino | |
| def _resolver_aliases_modelo_para_trabalhos_tecnicos( | |
| *, | |
| modelo_id: str, | |
| caminho: Path, | |
| nome_modelo: str, | |
| incluir_outras_versoes: bool, | |
| familias_versoes: dict[str, list[dict[str, Any]]] | None = None, | |
| familias_trabalhos_tecnicos: dict[str, list[str]] | None = None, | |
| ) -> list[str]: | |
| aliases: list[str] = [] | |
| referencias = [modelo_id, caminho.name, nome_modelo] | |
| _append_aliases_unicos(aliases, *referencias) | |
| if not incluir_outras_versoes: | |
| return aliases | |
| familias_selecionadas = _agrupar_nomes_modelo_por_familia(referencias) | |
| for nomes_familia in familias_selecionadas.values(): | |
| for nome_familia in nomes_familia: | |
| info_familia = _extrair_info_familia_modelo(nome_familia) | |
| if info_familia is None: | |
| continue | |
| _append_aliases_unicos(aliases, info_familia[1], f"{info_familia[1]}.dai") | |
| for base_norm in familias_selecionadas: | |
| for item in (familias_versoes or {}).get(base_norm, []): | |
| _append_aliases_unicos( | |
| aliases, | |
| item.get("id"), | |
| item.get("arquivo"), | |
| item.get("nome_modelo"), | |
| ) | |
| _append_aliases_unicos(aliases, *((familias_trabalhos_tecnicos or {}).get(base_norm) or [])) | |
| return aliases | |
| def _descricao_modo_trabalhos_tecnicos_modelos(value: str) -> str: | |
| if value == TRABALHOS_TECNICOS_MODELOS_SELECIONADOS_E_OUTRAS_VERSOES: | |
| return "dos modelos selecionados e de outras versões do mesmo modelo" | |
| return "dos modelos selecionados" | |
| def _modo_trabalhos_tecnicos_inclui_outras_versoes(value: str) -> bool: | |
| return value == TRABALHOS_TECNICOS_MODELOS_SELECIONADOS_E_OUTRAS_VERSOES | |
| def _normalizar_familias_trabalhos_tecnicos_modelos() -> dict[str, list[str]]: | |
| return _agrupar_nomes_modelo_por_familia(trabalhos_tecnicos_service.listar_nomes_modelos_relacionados()) | |
| def _chave_unica_trabalho_tecnico(item: dict[str, Any]) -> tuple[str, str, str, str, str, str]: | |
| return ( | |
| str(item.get("trabalho_id") or "").strip(), | |
| str(item.get("coord_lat") or "").strip(), | |
| str(item.get("coord_lon") or "").strip(), | |
| str(item.get("label") or "").strip(), | |
| str(item.get("endereco") or "").strip(), | |
| str(item.get("numero") or "").strip(), | |
| ) | |
| def _listar_avaliandos_tecnicos_proximos_por_avaliandos( | |
| avaliandos_geo: list[dict[str, Any]] | None, | |
| raio_m: int, | |
| chaves_modelos_selecionados: list[str] | None = None, | |
| ) -> list[dict[str, Any]]: | |
| agregados: dict[tuple[str, str, str, str, str, str], dict[str, Any]] = {} | |
| for idx, avaliando in enumerate(avaliandos_geo or []): | |
| lat, lon = _normalizar_coordenadas_avaliando(avaliando.get("lat"), avaliando.get("lon")) | |
| if lat is None or lon is None: | |
| continue | |
| label = str(avaliando.get("label") or f"A{idx + 1}").strip() or f"A{idx + 1}" | |
| avaliando_id = str(avaliando.get("id") or label or f"avaliando-{idx + 1}").strip() or f"avaliando-{idx + 1}" | |
| proximos = trabalhos_tecnicos_service.listar_avaliandos_proximos( | |
| lat, | |
| lon, | |
| raio_m, | |
| chaves_modelo_excluir=chaves_modelos_selecionados, | |
| ) | |
| for item in proximos or []: | |
| chave = _chave_unica_trabalho_tecnico(item) | |
| distancia_m = _to_float_or_none(item.get("distancia_m")) | |
| distancia_label = str(item.get("distancia_label") or "").strip() or None | |
| proximidade_avaliando = { | |
| "id": avaliando_id, | |
| "label": label, | |
| "distancia_m": distancia_m, | |
| "distancia_label": distancia_label, | |
| } | |
| atual = agregados.get(chave) | |
| if atual is None: | |
| novo_item = dict(item) | |
| novo_item["avaliandos_proximos"] = [proximidade_avaliando] | |
| novo_item["distancia_m_min"] = distancia_m | |
| novo_item["distancia_label_min"] = distancia_label | |
| agregados[chave] = novo_item | |
| continue | |
| proximidades_existentes = atual.setdefault("avaliandos_proximos", []) | |
| if not any(str(entry.get("id") or "").strip() == avaliando_id for entry in proximidades_existentes): | |
| proximidades_existentes.append(proximidade_avaliando) | |
| distancia_atual = _to_float_or_none(atual.get("distancia_m_min")) | |
| if distancia_m is not None and (distancia_atual is None or distancia_m < distancia_atual): | |
| atual["distancia_m_min"] = distancia_m | |
| atual["distancia_label_min"] = distancia_label | |
| consolidado = list(agregados.values()) | |
| for item in consolidado: | |
| proximidades = item.get("avaliandos_proximos") if isinstance(item.get("avaliandos_proximos"), list) else [] | |
| proximidades.sort( | |
| key=lambda entry: ( | |
| str(entry.get("label") or "").casefold(), | |
| float(entry.get("distancia_m") or 0.0), | |
| ) | |
| ) | |
| item["avaliandos_proximos"] = proximidades | |
| consolidado.sort( | |
| key=lambda item: ( | |
| float(item.get("distancia_m_min") or 0.0), | |
| str(item.get("trabalho_nome") or "").casefold(), | |
| str(item.get("label") or item.get("endereco") or "").casefold(), | |
| ) | |
| ) | |
| return sanitize_value(consolidado) | |
| def _consolidar_trabalhos_tecnicos_modelos(modelos_plotados: list[dict[str, Any]]) -> list[dict[str, Any]]: | |
| agregados: dict[tuple[str, str, str, str, str, str], dict[str, Any]] = {} | |
| for modelo in modelos_plotados or []: | |
| modelo_id = str(modelo.get("id") or "").strip() | |
| for item in (modelo.get("avaliandos_tecnicos") or []): | |
| if not isinstance(item, dict): | |
| continue | |
| chave = _chave_unica_trabalho_tecnico(item) | |
| atual = agregados.get(chave) | |
| if atual is None: | |
| novo_item = dict(item) | |
| novo_item["modelos_relacionados"] = [str(valor).strip() for valor in (item.get("modelos_relacionados") or []) if str(valor).strip()] | |
| novo_item["modelos_origem_ids"] = [modelo_id] if modelo_id else [] | |
| agregados[chave] = novo_item | |
| continue | |
| _append_aliases_unicos( | |
| atual.setdefault("modelos_relacionados", []), | |
| *[str(valor).strip() for valor in (item.get("modelos_relacionados") or []) if str(valor).strip()], | |
| ) | |
| if modelo_id: | |
| _append_aliases_unicos(atual.setdefault("modelos_origem_ids", []), modelo_id) | |
| for campo in ("trabalho_nome", "tipo_label", "label", "endereco", "numero"): | |
| if not str(atual.get(campo) or "").strip() and str(item.get(campo) or "").strip(): | |
| atual[campo] = item.get(campo) | |
| consolidado = list(agregados.values()) | |
| consolidado.sort( | |
| key=lambda item: ( | |
| str(item.get("trabalho_nome") or "").casefold(), | |
| str(item.get("label") or item.get("endereco") or "").casefold(), | |
| str(item.get("numero") or "").casefold(), | |
| ) | |
| ) | |
| return sanitize_value(consolidado) | |
| def gerar_mapa_modelos( | |
| modelos_ids: list[str], | |
| limite_pontos_por_modelo: int = 0, | |
| avaliando_lat: float | None = None, | |
| avaliando_lon: float | None = None, | |
| avaliandos: list[dict[str, Any]] | None = None, | |
| modo_exibicao: str | None = "pontos", | |
| criterio_espacial: str | None = CRITERIO_ESPACIAL_PADRAO, | |
| trabalhos_tecnicos_modelos_modo: str | None = TRABALHOS_TECNICOS_MODELOS_SELECIONADOS, | |
| trabalhos_tecnicos_proximidade_modo: str | None = TRABALHOS_TECNICOS_PROXIMIDADE_DESATIVADA, | |
| trabalhos_tecnicos_raio_m: int | float | None = TRABALHOS_TECNICOS_RAIO_PADRAO_M, | |
| ) -> dict[str, Any]: | |
| ids = [str(item).strip() for item in (modelos_ids or []) if str(item).strip()] | |
| if not ids: | |
| raise HTTPException(status_code=400, detail="Selecione ao menos um modelo para gerar o mapa") | |
| pasta = ensure_modelos_dir() | |
| caminhos_por_id = {caminho.stem: caminho for caminho in pasta.glob("*.dai")} | |
| selecionados: list[tuple[str, Path]] = [] | |
| vistos = set() | |
| for modelo_id in ids: | |
| if modelo_id in vistos: | |
| continue | |
| caminho = caminhos_por_id.get(modelo_id) | |
| if caminho is None: | |
| continue | |
| vistos.add(modelo_id) | |
| selecionados.append((modelo_id, caminho)) | |
| if not selecionados: | |
| raise HTTPException(status_code=404, detail="Nenhum modelo selecionado foi encontrado na pasta de pesquisa") | |
| modo_exibicao_norm = _normalizar_modo_exibicao_mapa(modo_exibicao) | |
| trabalhos_tecnicos_modelos_modo_norm = _normalizar_modo_trabalhos_tecnicos_modelos_mapa(trabalhos_tecnicos_modelos_modo) | |
| trabalhos_tecnicos_proximidade_modo_norm = _normalizar_modo_trabalhos_tecnicos_proximidade_mapa( | |
| trabalhos_tecnicos_proximidade_modo | |
| ) | |
| trabalhos_tecnicos_raio_m_norm = _normalizar_raio_trabalhos_tecnicos_mapa(trabalhos_tecnicos_raio_m) | |
| criterio_espacial_norm = _normalizar_criterio_espacial(criterio_espacial) | |
| modelos_plotados: list[dict[str, Any]] = [] | |
| bounds: list[list[float]] = [] | |
| aval_lat, aval_lon = _normalizar_coordenadas_avaliando(avaliando_lat, avaliando_lon) | |
| avaliandos_geo = _normalizar_avaliandos_geo(avaliandos) | |
| if not avaliandos_geo and aval_lat is not None and aval_lon is not None: | |
| avaliandos_geo = [{"id": "avaliando", "label": "Avaliando", "lat": aval_lat, "lon": aval_lon}] | |
| avaliando_unico = avaliandos_geo[0] if len(avaliandos_geo) == 1 else None | |
| if avaliando_unico is not None: | |
| aval_lat, aval_lon = _normalizar_coordenadas_avaliando(avaliando_unico.get("lat"), avaliando_unico.get("lon")) | |
| else: | |
| aval_lat, aval_lon = None, None | |
| chaves_modelos_selecionados: list[str] = [] | |
| familias_versoes = ( | |
| _montar_familias_versoes_modelos(list(caminhos_por_id.values())) | |
| if _modo_trabalhos_tecnicos_inclui_outras_versoes(trabalhos_tecnicos_modelos_modo_norm) | |
| else {} | |
| ) | |
| familias_trabalhos_tecnicos = ( | |
| _normalizar_familias_trabalhos_tecnicos_modelos() | |
| if _modo_trabalhos_tecnicos_inclui_outras_versoes(trabalhos_tecnicos_modelos_modo_norm) | |
| else {} | |
| ) | |
| for avaliando in avaliandos_geo: | |
| lat_item, lon_item = _normalizar_coordenadas_avaliando(avaliando.get("lat"), avaliando.get("lon")) | |
| if lat_item is None or lon_item is None: | |
| continue | |
| bounds.append([lat_item, lon_item]) | |
| for idx, (modelo_id, caminho) in enumerate(selecionados): | |
| resumo = _carregar_resumo_com_cache(caminho) | |
| df = _carregar_dataframe_modelo(caminho) | |
| if df is None or df.empty: | |
| continue | |
| pontos = _coletar_pontos_modelo(df, limite_pontos_por_modelo) | |
| if not pontos: | |
| continue | |
| geometria = _carregar_geometria_modelo_com_cache(caminho) | |
| cor = MAP_COLORS[idx % len(MAP_COLORS)] | |
| nome = str(resumo.get("nome_modelo") or modelo_id) | |
| distancias_avaliandos, distancia_resumo = _calcular_distancias_avaliandos( | |
| geometria, | |
| avaliandos_geo, | |
| criterio_espacial_norm, | |
| ) | |
| distancia_info = ( | |
| { | |
| "distancia_km": distancia_resumo.get("principal_distancia_km"), | |
| "distancia_label": distancia_resumo.get("principal_distancia_label"), | |
| } | |
| if len(avaliandos_geo) > 1 | |
| else _calcular_distancia_geometria_cache(geometria, aval_lat, aval_lon) | |
| ) | |
| aliases_modelo = _resolver_aliases_modelo_para_trabalhos_tecnicos( | |
| modelo_id=modelo_id, | |
| caminho=caminho, | |
| nome_modelo=nome, | |
| incluir_outras_versoes=_modo_trabalhos_tecnicos_inclui_outras_versoes(trabalhos_tecnicos_modelos_modo_norm), | |
| familias_versoes=familias_versoes, | |
| familias_trabalhos_tecnicos=familias_trabalhos_tecnicos, | |
| ) | |
| _append_aliases_unicos(chaves_modelos_selecionados, *aliases_modelo) | |
| avaliandos_tecnicos = trabalhos_tecnicos_service.listar_avaliandos_por_modelo(aliases_modelo) | |
| modelos_plotados.append( | |
| { | |
| "id": modelo_id, | |
| "nome": nome, | |
| "cor": cor, | |
| "total_pontos": len(pontos), | |
| "pontos": pontos, | |
| "geometria": geometria, | |
| "distancia_km": distancia_info.get("distancia_km"), | |
| "distancia_label": distancia_info.get("distancia_label"), | |
| "distancias_avaliandos": distancias_avaliandos, | |
| "distancia_resumo": distancia_resumo, | |
| "avaliandos_tecnicos": avaliandos_tecnicos, | |
| } | |
| ) | |
| bounds.extend([[ponto["lat"], ponto["lon"]] for ponto in pontos]) | |
| bounds.extend(_extrair_bounds_geometria_wgs84(geometria)) | |
| avaliandos_tecnicos_proximos: list[dict[str, Any]] | None = None | |
| if ( | |
| avaliandos_geo | |
| and trabalhos_tecnicos_proximidade_modo_norm == TRABALHOS_TECNICOS_PROXIMIDADE_ATIVADA | |
| ): | |
| avaliandos_tecnicos_proximos = _listar_avaliandos_tecnicos_proximos_por_avaliandos( | |
| avaliandos_geo, | |
| trabalhos_tecnicos_raio_m_norm, | |
| chaves_modelos_selecionados, | |
| ) | |
| for item in avaliandos_tecnicos_proximos or []: | |
| prox_lat, prox_lon = _normalizar_coordenadas_avaliando(item.get("coord_lat"), item.get("coord_lon")) | |
| if prox_lat is None or prox_lon is None: | |
| continue | |
| bounds.append([prox_lat, prox_lon]) | |
| if not modelos_plotados: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Nao foi possivel gerar o mapa: os modelos selecionados nao possuem coordenadas validas", | |
| ) | |
| total_pontos = 0 | |
| for modelo in modelos_plotados: | |
| total_pontos += int(modelo["total_pontos"]) | |
| mapa_payload = _build_mapa_modelos_payload( | |
| modelos_plotados, | |
| bounds, | |
| avaliandos_geo, | |
| modo_exibicao_norm, | |
| avaliandos_tecnicos_proximos=avaliandos_tecnicos_proximos, | |
| trabalhos_tecnicos_raio_m=trabalhos_tecnicos_raio_m_norm, | |
| ) | |
| mapa_html = "" | |
| if mapa_payload is None: | |
| mapa_html = _renderizar_mapa_modelos( | |
| modelos_plotados, | |
| bounds, | |
| avaliandos_geo, | |
| modo_exibicao_norm, | |
| avaliandos_tecnicos_proximos=avaliandos_tecnicos_proximos, | |
| trabalhos_tecnicos_raio_m=trabalhos_tecnicos_raio_m_norm, | |
| ) | |
| total_trabalhos_modelos = len( | |
| { | |
| _chave_unica_trabalho_tecnico(item) | |
| for modelo in modelos_plotados | |
| for item in (modelo.get("avaliandos_tecnicos") or []) | |
| } | |
| ) | |
| total_trabalhos_proximos = len(avaliandos_tecnicos_proximos or []) | |
| descricao_modelos_status = _descricao_modo_trabalhos_tecnicos_modelos(trabalhos_tecnicos_modelos_modo_norm) | |
| detalhe_trabalhos = ( | |
| f" Trabalhos técnicos {descricao_modelos_status}: {total_trabalhos_modelos}." | |
| + ( | |
| f" {'Próximos aos avaliandos' if len(avaliandos_geo) > 1 else 'Próximos ao avaliando'} (até {trabalhos_tecnicos_raio_m_norm} m): {total_trabalhos_proximos}." | |
| if ( | |
| avaliandos_geo | |
| and trabalhos_tecnicos_proximidade_modo_norm == TRABALHOS_TECNICOS_PROXIMIDADE_ATIVADA | |
| ) | |
| else "" | |
| ) | |
| ) | |
| return sanitize_value( | |
| { | |
| "mapa_html": mapa_html, | |
| "mapa_html_pontos": mapa_html if modo_exibicao_norm == "pontos" else "", | |
| "mapa_html_cobertura": mapa_html if modo_exibicao_norm == "cobertura" else "", | |
| "mapa_payload": mapa_payload, | |
| "mapa_payload_pontos": mapa_payload if modo_exibicao_norm == "pontos" else None, | |
| "mapa_payload_cobertura": mapa_payload if modo_exibicao_norm == "cobertura" else None, | |
| "total_modelos_plotados": len(modelos_plotados), | |
| "total_pontos": total_pontos, | |
| "modelos_plotados": [ | |
| { | |
| "id": modelo["id"], | |
| "nome": modelo["nome"], | |
| "cor": modelo["cor"], | |
| "total_pontos": modelo["total_pontos"], | |
| "distancia_km": modelo.get("distancia_km"), | |
| "distancia_label": modelo.get("distancia_label"), | |
| "distancia_resumo": modelo.get("distancia_resumo"), | |
| "distancias_avaliandos": modelo.get("distancias_avaliandos"), | |
| } | |
| for modelo in modelos_plotados | |
| ], | |
| "avaliando": { | |
| "lat": aval_lat, | |
| "lon": aval_lon, | |
| "ativo": bool(aval_lat is not None and aval_lon is not None), | |
| }, | |
| "avaliandos": avaliandos_geo, | |
| "criterio_espacial": criterio_espacial_norm, | |
| "modo_exibicao": modo_exibicao_norm, | |
| "trabalhos_tecnicos_modelos_modo": trabalhos_tecnicos_modelos_modo_norm, | |
| "trabalhos_tecnicos_proximidade_modo": trabalhos_tecnicos_proximidade_modo_norm, | |
| "trabalhos_tecnicos_raio_m": trabalhos_tecnicos_raio_m_norm, | |
| "total_trabalhos_tecnicos_modelos": total_trabalhos_modelos, | |
| "total_trabalhos_tecnicos_proximos": total_trabalhos_proximos, | |
| "status": ( | |
| f"Mapas de pontos e cobertura gerados com {len(modelos_plotados)} modelo(s) e {total_pontos} ponto(s)" | |
| + ( | |
| " com avaliando destacado." | |
| if avaliando_unico is not None and aval_lat is not None and aval_lon is not None | |
| else (f" com {len(avaliandos_geo)} avaliandos destacados." if len(avaliandos_geo) > 1 else ".") | |
| ) | |
| + detalhe_trabalhos | |
| ), | |
| } | |
| ) | |
| def _normalizar_modo_exibicao_mapa(value: Any) -> str: | |
| modo = str(value or "").strip().lower() | |
| if modo == "cobertura": | |
| return "cobertura" | |
| return "pontos" | |
| def _normalizar_modo_trabalhos_tecnicos_modelos_mapa(value: Any) -> str: | |
| modo = str(value or "").strip().lower() | |
| if modo in { | |
| TRABALHOS_TECNICOS_MODELOS_SELECIONADOS_E_OUTRAS_VERSOES, | |
| TRABALHOS_TECNICOS_MODELOS_SELECIONADOS_E_ANTERIORES, | |
| }: | |
| return TRABALHOS_TECNICOS_MODELOS_SELECIONADOS_E_OUTRAS_VERSOES | |
| return TRABALHOS_TECNICOS_MODELOS_SELECIONADOS | |
| def _normalizar_modo_trabalhos_tecnicos_proximidade_mapa(value: Any) -> str: | |
| modo = str(value or "").strip().lower() | |
| if modo == TRABALHOS_TECNICOS_PROXIMIDADE_ATIVADA: | |
| return TRABALHOS_TECNICOS_PROXIMIDADE_ATIVADA | |
| return TRABALHOS_TECNICOS_PROXIMIDADE_DESATIVADA | |
| def _normalizar_raio_trabalhos_tecnicos_mapa(value: Any) -> int: | |
| numero = _to_float_or_none(value) | |
| if numero is None: | |
| return int(TRABALHOS_TECNICOS_RAIO_PADRAO_M) | |
| if not np.isfinite(numero): | |
| return int(TRABALHOS_TECNICOS_RAIO_PADRAO_M) | |
| return max(0, min(int(TRABALHOS_TECNICOS_RAIO_MAX_M), int(round(numero)))) | |
| def _montar_tooltip_distancia_modelo(modelo: dict[str, Any]) -> str: | |
| resumo = modelo.get("distancia_resumo") if isinstance(modelo.get("distancia_resumo"), dict) else {} | |
| total_avaliandos = int(resumo.get("total_avaliandos") or 0) | |
| if total_avaliandos > 1: | |
| return ( | |
| "Resumo espacial" | |
| f' • Maior: {resumo.get("maior_distancia_label") or "-"}' | |
| f' • Media: {resumo.get("media_distancia_label") or "-"}' | |
| f' • Menor: {resumo.get("menor_distancia_label") or "-"}' | |
| ) | |
| if modelo.get("distancia_label"): | |
| return f'Distancia: {modelo["distancia_label"]}' | |
| return "" | |
| def _tooltip_mapa_modelo_html(modelo: dict[str, Any]) -> str: | |
| nome = str(modelo.get("nome") or "Modelo").strip() or "Modelo" | |
| tooltip_distancia = _montar_tooltip_distancia_modelo(modelo) | |
| if tooltip_distancia: | |
| return ( | |
| "<div style='font-family:\"Segoe UI\",Arial,sans-serif; font-size:13px; line-height:1.5;'>" | |
| f"<b>{escape(nome)}</b><br>{escape(tooltip_distancia)}" | |
| "</div>" | |
| ) | |
| return ( | |
| "<div style='font-family:\"Segoe UI\",Arial,sans-serif; font-size:13px; line-height:1.5;'>" | |
| f"<b>{escape(nome)}</b>" | |
| "</div>" | |
| ) | |
| def _marker_payloads_avaliandos(avaliandos_geo: list[dict[str, Any]]) -> list[dict[str, Any]]: | |
| payloads: list[dict[str, Any]] = [] | |
| for idx, avaliando in enumerate(avaliandos_geo): | |
| lat_item, lon_item = _normalizar_coordenadas_avaliando(avaliando.get("lat"), avaliando.get("lon")) | |
| if lat_item is None or lon_item is None: | |
| continue | |
| label = str(avaliando.get("label") or f"A{idx + 1}").strip() or f"A{idx + 1}" | |
| endereco = str(avaliando.get("logradouro") or "").strip() | |
| numero_usado = str(avaliando.get("numero_usado") or "").strip() | |
| tooltip = label | |
| if endereco: | |
| tooltip = f"{tooltip} • {endereco}{f', {numero_usado}' if numero_usado else ''}" | |
| marker_html = ( | |
| "<div style='display:flex;align-items:center;justify-content:center;" | |
| "width:30px;height:30px;filter:drop-shadow(0 2px 6px rgba(0,0,0,0.22));'>" | |
| "<div style='display:flex;align-items:center;justify-content:center;" | |
| "width:24px;height:24px;border-radius:999px;background:rgba(255,255,255,0.97);" | |
| "box-shadow:0 0 0 1.4px rgba(255,255,255,0.98),0 0 0 2.5px rgba(0,0,0,0.74);'>" | |
| "<svg viewBox='0 0 24 24' width='20' height='20' aria-hidden='true'>" | |
| "<path d='M12 3.2L3.9 10v10.1h5.4v-5.3h5.4v5.3h5.4V10L12 3.2Z' " | |
| "fill='#c62828' stroke='rgba(255,255,255,0.96)' stroke-width='1.2' stroke-linejoin='round'/>" | |
| "<rect x='10.1' y='15.2' width='3.8' height='4.9' rx='0.5' fill='rgba(255,255,255,0.96)'/>" | |
| "</svg>" | |
| "</div>" | |
| "</div>" | |
| ) | |
| payloads.append( | |
| { | |
| "lat": float(lat_item), | |
| "lon": float(lon_item), | |
| "tooltip_html": escape(tooltip), | |
| "marker_html": marker_html, | |
| "icon_size": [30, 30], | |
| "icon_anchor": [15, 15], | |
| "class_name": "mesa-avaliando-marker", | |
| } | |
| ) | |
| return payloads | |
| def _shapes_modelo_payload( | |
| modelo: dict[str, Any], | |
| aval_lat: float | None, | |
| aval_lon: float | None, | |
| ) -> list[dict[str, Any]]: | |
| geometria = modelo.get("geometria") or {} | |
| geom_wgs84 = geometria.get("geom_wgs84") | |
| if geom_wgs84 is None: | |
| return [] | |
| tooltip = _tooltip_mapa_modelo_html(modelo) | |
| cor = str(modelo.get("cor") or "#1f77b4") | |
| geom_type = str(geometria.get("geom_type") or "") | |
| shapes: list[dict[str, Any]] = [] | |
| try: | |
| if geom_type == "Polygon": | |
| coords = [[float(lat), float(lon)] for lon, lat in list(geom_wgs84.exterior.coords)] | |
| shapes.append({"type": "polygon", "coords": coords, "color": cor, "fill": True, "fill_opacity": 0.12, "weight": 2, "tooltip_html": tooltip}) | |
| elif geom_type == "MultiPolygon": | |
| for poligono in list(getattr(geom_wgs84, "geoms", [])): | |
| coords = [[float(lat), float(lon)] for lon, lat in list(poligono.exterior.coords)] | |
| shapes.append({"type": "polygon", "coords": coords, "color": cor, "fill": True, "fill_opacity": 0.12, "weight": 2, "tooltip_html": tooltip}) | |
| elif geom_type == "LineString": | |
| coords = [[float(lat), float(lon)] for lon, lat in list(geom_wgs84.coords)] | |
| shapes.append({"type": "polyline", "coords": coords, "color": cor, "weight": 3, "opacity": 0.8, "tooltip_html": tooltip}) | |
| elif geom_type == "MultiLineString": | |
| for linha in list(getattr(geom_wgs84, "geoms", [])): | |
| coords = [[float(lat), float(lon)] for lon, lat in list(linha.coords)] | |
| shapes.append({"type": "polyline", "coords": coords, "color": cor, "weight": 3, "opacity": 0.8, "tooltip_html": tooltip}) | |
| elif geom_type == "Point": | |
| shapes.append({"type": "circlemarker", "center": [float(geom_wgs84.y), float(geom_wgs84.x)], "radius": 6, "color": cor, "fill": True, "fill_opacity": 0.7, "tooltip_html": tooltip}) | |
| elif geom_type == "MultiPoint": | |
| for ponto in list(getattr(geom_wgs84, "geoms", [])): | |
| shapes.append({"type": "circlemarker", "center": [float(ponto.y), float(ponto.x)], "radius": 6, "color": cor, "fill": True, "fill_opacity": 0.7, "tooltip_html": tooltip}) | |
| except Exception: | |
| return [] | |
| if aval_lat is None or aval_lon is None: | |
| return shapes | |
| try: | |
| from shapely.geometry import Point | |
| from shapely.ops import nearest_points | |
| except ImportError: | |
| return shapes | |
| try: | |
| aval_point = Point(float(aval_lon), float(aval_lat)) | |
| _, nearest_geom = nearest_points(aval_point, geom_wgs84) | |
| distancia_km = _to_float_or_none(modelo.get("distancia_km")) | |
| if nearest_geom is None or distancia_km is None or distancia_km <= 0: | |
| return shapes | |
| shapes.append( | |
| { | |
| "type": "polyline", | |
| "coords": [ | |
| [float(aval_point.y), float(aval_point.x)], | |
| [float(nearest_geom.y), float(nearest_geom.x)], | |
| ], | |
| "color": cor, | |
| "weight": 2, | |
| "opacity": 0.65, | |
| "dash_array": "6,6", | |
| "tooltip_html": f"Ligação de distância • {escape(str(modelo.get('nome') or 'Modelo'))}", | |
| } | |
| ) | |
| except Exception: | |
| return shapes | |
| return shapes | |
| def _build_mapa_modelos_payload( | |
| modelos_plotados: list[dict[str, Any]], | |
| bounds: list[list[float]], | |
| avaliandos_geo: list[dict[str, Any]], | |
| modo_exibicao: str, | |
| avaliandos_tecnicos_proximos: list[dict[str, Any]] | None = None, | |
| trabalhos_tecnicos_raio_m: int | None = None, | |
| ) -> dict[str, Any] | None: | |
| overlay_layers: list[dict[str, Any]] = [] | |
| avaliando_unico = avaliandos_geo[0] if len(avaliandos_geo) == 1 else None | |
| aval_lat = avaliando_unico.get("lat") if avaliando_unico is not None else None | |
| aval_lon = avaliando_unico.get("lon") if avaliando_unico is not None else None | |
| trabalhos_tecnicos_modelos = _consolidar_trabalhos_tecnicos_modelos(modelos_plotados) | |
| trabalhos_tecnicos_modelos_markers = ( | |
| build_trabalhos_tecnicos_marker_payloads( | |
| trabalhos_tecnicos_modelos, | |
| apply_jitter=False, | |
| ) | |
| if trabalhos_tecnicos_modelos | |
| else [] | |
| ) | |
| trabalhos_tecnicos_proximos_markers = ( | |
| build_trabalhos_tecnicos_marker_payloads( | |
| avaliandos_tecnicos_proximos or [], | |
| origem="pesquisa_mapa", | |
| marker_style="estrela", | |
| ignore_bounds=False, | |
| apply_jitter=False, | |
| ) | |
| if avaliandos_tecnicos_proximos is not None | |
| else [] | |
| ) | |
| if trabalhos_tecnicos_modelos_markers or trabalhos_tecnicos_proximos_markers: | |
| apply_marker_payload_jitter([*trabalhos_tecnicos_modelos_markers, *trabalhos_tecnicos_proximos_markers]) | |
| for modelo in modelos_plotados: | |
| layer: dict[str, Any] = { | |
| "id": str(modelo.get("id") or ""), | |
| "label": str(modelo.get("nome") or "Modelo"), | |
| "show": True, | |
| "hover_highlight_group": "pesquisa-modelos", | |
| } | |
| if modo_exibicao == "pontos": | |
| tooltip_html = _tooltip_mapa_modelo_html(modelo) | |
| layer["points"] = [ | |
| { | |
| "lat": float(ponto["lat"]), | |
| "lon": float(ponto["lon"]), | |
| "color": str(modelo.get("cor") or "#1f77b4"), | |
| "base_radius": 3.0, | |
| "stroke_color": str(modelo.get("cor") or "#1f77b4"), | |
| "stroke_weight": 1.0, | |
| "fill_opacity": 0.72, | |
| "tooltip_html": tooltip_html, | |
| } | |
| for ponto in (modelo.get("pontos") or []) | |
| ] | |
| else: | |
| layer["shapes"] = _shapes_modelo_payload(modelo, aval_lat, aval_lon) | |
| overlay_layers.append(layer) | |
| if trabalhos_tecnicos_modelos: | |
| overlay_layers.append( | |
| { | |
| "id": "trabalhos-tecnicos-modelos", | |
| "label": "Trabalhos tecnicos dos modelos", | |
| "show": True, | |
| "markers": trabalhos_tecnicos_modelos_markers, | |
| } | |
| ) | |
| if avaliandos_tecnicos_proximos is not None: | |
| label_raio = f" (até {int(trabalhos_tecnicos_raio_m or 0)} m)" if trabalhos_tecnicos_raio_m is not None else "" | |
| proximos_layer: dict[str, Any] = { | |
| "id": "trabalhos-tecnicos-proximos", | |
| "label": f"Trabalhos técnicos próximos{label_raio}", | |
| "show": True, | |
| "required_overlay_ids": ["trabalhos-tecnicos-modelos"], | |
| "markers": trabalhos_tecnicos_proximos_markers, | |
| "shapes": [], | |
| } | |
| if int(trabalhos_tecnicos_raio_m or 0) > 0: | |
| for idx, avaliando in enumerate(avaliandos_geo): | |
| lat_item, lon_item = _normalizar_coordenadas_avaliando(avaliando.get("lat"), avaliando.get("lon")) | |
| if lat_item is None or lon_item is None: | |
| continue | |
| label = str(avaliando.get("label") or f"A{idx + 1}").strip() or f"A{idx + 1}" | |
| tooltip_raio = ( | |
| f"Raio de busca: até {int(trabalhos_tecnicos_raio_m)} m" | |
| if len(avaliandos_geo) == 1 | |
| else f"Raio de busca • {label}: até {int(trabalhos_tecnicos_raio_m)} m" | |
| ) | |
| proximos_layer["shapes"].append( | |
| { | |
| "type": "circle", | |
| "center": [float(lat_item), float(lon_item)], | |
| "radius_m": float(trabalhos_tecnicos_raio_m), | |
| "color": "#c62828", | |
| "weight": 2, | |
| "opacity": 0.75, | |
| "fill": True, | |
| "fill_color": "#c62828", | |
| "fill_opacity": 0.06, | |
| "dash_array": "6,6", | |
| "tooltip_html": tooltip_raio, | |
| } | |
| ) | |
| overlay_layers.append(proximos_layer) | |
| avaliandos_markers = _marker_payloads_avaliandos(avaliandos_geo) | |
| if avaliandos_markers: | |
| overlay_layers.append( | |
| { | |
| "id": "avaliandos", | |
| "label": "Avaliando" if len(avaliandos_geo) == 1 else "Avaliandos", | |
| "show": True, | |
| "markers": avaliandos_markers, | |
| } | |
| ) | |
| payload = build_leaflet_payload(bounds=bounds, overlay_layers=overlay_layers, show_bairros=True) | |
| if payload and isinstance(payload.get("controls"), dict): | |
| payload["controls"]["layer_control_collapsed"] = False | |
| return payload | |
| def _renderizar_mapa_modelos( | |
| modelos_plotados: list[dict[str, Any]], | |
| bounds: list[list[float]], | |
| avaliandos_geo: list[dict[str, Any]], | |
| modo_exibicao: str, | |
| avaliandos_tecnicos_proximos: list[dict[str, Any]] | None = None, | |
| trabalhos_tecnicos_raio_m: int | None = None, | |
| ) -> str: | |
| renderizar_pontos = modo_exibicao == "pontos" | |
| renderizar_cobertura = modo_exibicao == "cobertura" | |
| avaliando_unico = avaliandos_geo[0] if len(avaliandos_geo) == 1 else None | |
| aval_lat = avaliando_unico.get("lat") if avaliando_unico is not None else None | |
| aval_lon = avaliando_unico.get("lon") if avaliando_unico is not None else None | |
| centro_lat = sum(coord[0] for coord in bounds) / len(bounds) | |
| centro_lon = sum(coord[1] for coord in bounds) / len(bounds) | |
| mapa = folium.Map( | |
| location=[centro_lat, centro_lon], | |
| zoom_start=12, | |
| control_scale=True, | |
| tiles=None, | |
| ) | |
| folium.TileLayer(tiles="OpenStreetMap", name="OpenStreetMap", control=True, show=True).add_to(mapa) | |
| folium.TileLayer(tiles="CartoDB positron", name="Positron", control=True, show=False).add_to(mapa) | |
| add_bairros_layer(mapa, show=True) | |
| nome_camada_avaliando = "Avaliando" if len(avaliandos_geo) == 1 else "Avaliandos" | |
| camada_avaliando = folium.FeatureGroup(name=nome_camada_avaliando, show=True) | |
| trabalhos_tecnicos_modelos = _consolidar_trabalhos_tecnicos_modelos(modelos_plotados) | |
| trabalhos_tecnicos_modelos_markers = ( | |
| build_trabalhos_tecnicos_marker_payloads( | |
| trabalhos_tecnicos_modelos, | |
| apply_jitter=False, | |
| ) | |
| if trabalhos_tecnicos_modelos | |
| else [] | |
| ) | |
| trabalhos_tecnicos_proximos_markers = ( | |
| build_trabalhos_tecnicos_marker_payloads( | |
| avaliandos_tecnicos_proximos or [], | |
| origem="pesquisa_mapa", | |
| marker_style="estrela", | |
| ignore_bounds=False, | |
| apply_jitter=False, | |
| ) | |
| if avaliandos_tecnicos_proximos is not None | |
| else [] | |
| ) | |
| if trabalhos_tecnicos_modelos_markers or trabalhos_tecnicos_proximos_markers: | |
| apply_marker_payload_jitter([*trabalhos_tecnicos_modelos_markers, *trabalhos_tecnicos_proximos_markers]) | |
| camada_trabalhos_modelos = ( | |
| folium.FeatureGroup(name="Trabalhos tecnicos dos modelos", show=True) | |
| if trabalhos_tecnicos_modelos | |
| else None | |
| ) | |
| camada_trabalhos_proximos = None | |
| if avaliandos_tecnicos_proximos is not None: | |
| label_raio = f" (ate {int(trabalhos_tecnicos_raio_m or 0)} m)" if trabalhos_tecnicos_raio_m is not None else "" | |
| camada_trabalhos_proximos = folium.FeatureGroup( | |
| name=f"Trabalhos tecnicos proximos{label_raio}", | |
| show=True, | |
| ) | |
| for modelo in modelos_plotados: | |
| nome_layer = str(modelo.get("nome") or modelo.get("id") or "Modelo").strip() or "Modelo" | |
| nome_layer_html = ( | |
| "<span style='display:inline-flex;align-items:center;gap:6px;'>" | |
| f"<span style='display:inline-block;width:10px;height:10px;border-radius:999px;" | |
| f"background:{escape(str(modelo.get('cor') or '#1f77b4'))};" | |
| "border:1px solid rgba(0,0,0,0.35);flex:0 0 auto;'></span>" | |
| f"<span>{escape(nome_layer)}</span>" | |
| "</span>" | |
| ) | |
| camada_modelo = folium.FeatureGroup(name=nome_layer_html, show=True) | |
| modelo["layer_var"] = camada_modelo.get_name() | |
| if renderizar_pontos: | |
| tooltip_modelo = nome_layer | |
| tooltip_distancia = _montar_tooltip_distancia_modelo(modelo) | |
| if tooltip_distancia: | |
| tooltip_modelo = f"{tooltip_modelo} • {tooltip_distancia}" | |
| for ponto in modelo["pontos"]: | |
| marcador = folium.CircleMarker( | |
| location=[ponto["lat"], ponto["lon"]], | |
| radius=3, | |
| color=modelo["cor"], | |
| fill=True, | |
| fill_color=modelo["cor"], | |
| fill_opacity=0.72, | |
| opacity=0.9, | |
| weight=1, | |
| tooltip=tooltip_modelo, | |
| ).add_to(camada_modelo) | |
| marcador.options["mesaBaseRadius"] = 3.0 | |
| if renderizar_cobertura: | |
| _adicionar_geometria_modelo_no_mapa(camada_modelo, camada_modelo, modelo, aval_lat, aval_lon) | |
| camada_modelo.add_to(mapa) | |
| if camada_trabalhos_modelos is not None: | |
| add_marker_payloads(camada_trabalhos_modelos, trabalhos_tecnicos_modelos_markers) | |
| camada_trabalhos_modelos.add_to(mapa) | |
| if camada_trabalhos_proximos is not None: | |
| if int(trabalhos_tecnicos_raio_m or 0) > 0: | |
| for idx, avaliando in enumerate(avaliandos_geo): | |
| lat_item, lon_item = _normalizar_coordenadas_avaliando(avaliando.get("lat"), avaliando.get("lon")) | |
| if lat_item is None or lon_item is None: | |
| continue | |
| label = str(avaliando.get("label") or f"A{idx + 1}").strip() or f"A{idx + 1}" | |
| tooltip_raio = ( | |
| f"Raio de busca: ate {int(trabalhos_tecnicos_raio_m)} m" | |
| if len(avaliandos_geo) == 1 | |
| else f"Raio de busca • {label}: ate {int(trabalhos_tecnicos_raio_m)} m" | |
| ) | |
| folium.Circle( | |
| location=[lat_item, lon_item], | |
| radius=float(trabalhos_tecnicos_raio_m), | |
| color="#c62828", | |
| weight=2, | |
| opacity=0.75, | |
| fill=True, | |
| fill_color="#c62828", | |
| fill_opacity=0.06, | |
| dash_array="6,6", | |
| tooltip=tooltip_raio, | |
| ).add_to(camada_trabalhos_proximos) | |
| add_marker_payloads(camada_trabalhos_proximos, trabalhos_tecnicos_proximos_markers) | |
| camada_trabalhos_proximos.add_to(mapa) | |
| for idx, avaliando in enumerate(avaliandos_geo): | |
| lat_item, lon_item = _normalizar_coordenadas_avaliando(avaliando.get("lat"), avaliando.get("lon")) | |
| if lat_item is None or lon_item is None: | |
| continue | |
| label = str(avaliando.get("label") or f"A{idx + 1}").strip() or f"A{idx + 1}" | |
| endereco = str(avaliando.get("logradouro") or "").strip() | |
| numero_usado = str(avaliando.get("numero_usado") or "").strip() | |
| tooltip = label | |
| if endereco: | |
| tooltip = f"{tooltip} • {endereco}{f', {numero_usado}' if numero_usado else ''}" | |
| marker_icon = folium.DivIcon( | |
| html=( | |
| "<div style='display:flex;align-items:center;justify-content:center;" | |
| "width:30px;height:30px;filter:drop-shadow(0 2px 6px rgba(0,0,0,0.22));'>" | |
| "<div style='display:flex;align-items:center;justify-content:center;" | |
| "width:24px;height:24px;border-radius:999px;background:rgba(255,255,255,0.97);" | |
| "box-shadow:0 0 0 1.4px rgba(255,255,255,0.98),0 0 0 2.5px rgba(0,0,0,0.74);'>" | |
| "<svg viewBox='0 0 24 24' width='20' height='20' aria-hidden='true'>" | |
| "<path d='M12 3.2L3.9 10v10.1h5.4v-5.3h5.4v5.3h5.4V10L12 3.2Z' " | |
| "fill='#c62828' stroke='rgba(255,255,255,0.96)' stroke-width='1.2' stroke-linejoin='round'/>" | |
| "<rect x='10.1' y='15.2' width='3.8' height='4.9' rx='0.5' fill='rgba(255,255,255,0.96)'/>" | |
| "</svg>" | |
| "</div>" | |
| "</div>" | |
| ), | |
| icon_size=(30, 30), | |
| icon_anchor=(15, 15), | |
| class_name="mesa-avaliando-marker", | |
| ) | |
| folium.Marker( | |
| location=[lat_item, lon_item], | |
| tooltip=tooltip, | |
| icon=marker_icon, | |
| ).add_to(camada_avaliando) | |
| if avaliandos_geo: | |
| camada_avaliando.add_to(mapa) | |
| plugins.Fullscreen().add_to(mapa) | |
| add_zoom_responsive_circle_markers(mapa) | |
| layer_control = folium.LayerControl(collapsed=False) | |
| layer_control.add_to(mapa) | |
| _adicionar_interacao_hover_legenda_modelos(mapa, layer_control, modelos_plotados) | |
| if bounds: | |
| lat_values = [float(coord[0]) for coord in bounds] | |
| lon_values = [float(coord[1]) for coord in bounds] | |
| lat_min, lat_max = min(lat_values), max(lat_values) | |
| lon_min, lon_max = min(lon_values), max(lon_values) | |
| if math.isclose(lat_min, lat_max): | |
| lat_delta = 0.0008 | |
| lat_min -= lat_delta | |
| lat_max += lat_delta | |
| if math.isclose(lon_min, lon_max): | |
| lon_delta = 0.0008 | |
| lon_min -= lon_delta | |
| lon_max += lon_delta | |
| mapa.fit_bounds([[lat_min, lon_min], [lat_max, lon_max]], padding=(48, 48), max_zoom=18) | |
| return mapa.get_root().render() | |
| def _adicionar_interacao_hover_legenda_modelos( | |
| mapa: folium.Map, | |
| layer_control: folium.map.LayerControl, | |
| modelos_plotados: list[dict[str, Any]], | |
| ) -> None: | |
| modelos_layers = [ | |
| { | |
| "id": str(modelo.get("id") or ""), | |
| "layer_var": str(modelo.get("layer_var") or ""), | |
| } | |
| for modelo in modelos_plotados | |
| if str(modelo.get("id") or "").strip() and str(modelo.get("layer_var") or "").strip() | |
| ] | |
| if not modelos_layers: | |
| return | |
| style = """ | |
| <style> | |
| .leaflet-container .leaflet-interactive, | |
| .leaflet-container .leaflet-interactive:focus, | |
| .leaflet-container svg path.leaflet-interactive, | |
| .leaflet-container svg path.leaflet-interactive:focus { | |
| outline: none !important; | |
| box-shadow: none !important; | |
| } | |
| .leaflet-control-layers-overlays label.mesa-model-layer-toggle { | |
| transition: opacity 0.16s ease, color 0.16s ease, font-weight 0.16s ease; | |
| cursor: pointer; | |
| } | |
| .leaflet-control-layers-overlays label.mesa-model-layer-toggle.mesa-model-layer-dimmed { | |
| opacity: 0.42; | |
| } | |
| .leaflet-control-layers-overlays label.mesa-model-layer-toggle.mesa-model-layer-active { | |
| opacity: 1; | |
| color: #123b63; | |
| font-weight: 700; | |
| } | |
| </style> | |
| """ | |
| mapa.get_root().html.add_child(Element(style)) | |
| map_name = mapa.get_name() | |
| modelos_payload = json.dumps(modelos_layers, ensure_ascii=True) | |
| script = f""" | |
| <script> | |
| (function() {{ | |
| const MAP_NAME = "{map_name}"; | |
| const MODEL_LAYERS = {modelos_payload}; | |
| function getLayersControlContainer(map) {{ | |
| if (!map || typeof map.getContainer !== 'function') return null; | |
| const mapContainer = map.getContainer(); | |
| if (!mapContainer || !mapContainer.querySelector) return null; | |
| return mapContainer.querySelector('.leaflet-control-layers'); | |
| }} | |
| function visitLayerTree(layer, visited, fn) {{ | |
| if (!layer || !layer._leaflet_id || visited.has(layer._leaflet_id)) return; | |
| visited.add(layer._leaflet_id); | |
| fn(layer); | |
| if (layer.eachLayer) {{ | |
| layer.eachLayer(function(child) {{ | |
| visitLayerTree(child, visited, fn); | |
| }}); | |
| }} | |
| }} | |
| function rememberPathBaseStyle(layer) {{ | |
| if (!layer || !layer.options) return; | |
| if (!Number.isFinite(layer.options.mesaBaseOpacity)) {{ | |
| layer.options.mesaBaseOpacity = Number.isFinite(layer.options.opacity) ? layer.options.opacity : 1; | |
| }} | |
| if (!Number.isFinite(layer.options.mesaBaseFillOpacity)) {{ | |
| layer.options.mesaBaseFillOpacity = Number.isFinite(layer.options.fillOpacity) ? layer.options.fillOpacity : 0; | |
| }} | |
| if (!Number.isFinite(layer.options.mesaBaseWeight)) {{ | |
| layer.options.mesaBaseWeight = Number.isFinite(layer.options.weight) ? layer.options.weight : 1; | |
| }} | |
| if (!layer.options.mesaBaseColor && layer.options.color) {{ | |
| layer.options.mesaBaseColor = layer.options.color; | |
| }} | |
| if (!layer.options.mesaBaseFillColor && layer.options.fillColor) {{ | |
| layer.options.mesaBaseFillColor = layer.options.fillColor; | |
| }} | |
| }} | |
| function rememberMarkerBaseStyle(layer) {{ | |
| if (!layer || !layer.options) return; | |
| if (!Number.isFinite(layer.options.mesaBaseMarkerOpacity)) {{ | |
| layer.options.mesaBaseMarkerOpacity = Number.isFinite(layer.options.opacity) ? layer.options.opacity : 1; | |
| }} | |
| if (!Number.isFinite(layer.options.mesaBaseRadius) && typeof layer.getRadius === 'function') {{ | |
| const radius = layer.getRadius(); | |
| if (Number.isFinite(radius) && radius > 0) {{ | |
| layer.options.mesaBaseRadius = radius; | |
| }} | |
| }} | |
| }} | |
| function applyVisualState(layer, state) {{ | |
| if (!layer) return; | |
| const isPath = typeof layer.setStyle === 'function'; | |
| if (isPath) {{ | |
| rememberPathBaseStyle(layer); | |
| const baseOpacity = Number.isFinite(layer.options.mesaBaseOpacity) ? layer.options.mesaBaseOpacity : 1; | |
| const baseFillOpacity = Number.isFinite(layer.options.mesaBaseFillOpacity) ? layer.options.mesaBaseFillOpacity : 0; | |
| const baseWeight = Number.isFinite(layer.options.mesaBaseWeight) ? layer.options.mesaBaseWeight : 1; | |
| let nextOpacity = baseOpacity; | |
| let nextFillOpacity = baseFillOpacity; | |
| let nextWeight = baseWeight; | |
| if (state === 'dim') {{ | |
| nextOpacity = Math.max(0.06, baseOpacity * 0.14); | |
| nextFillOpacity = baseFillOpacity > 0 ? Math.max(0.02, baseFillOpacity * 0.18) : baseFillOpacity; | |
| nextWeight = Math.max(1, baseWeight * 0.82); | |
| }} else if (state === 'highlight') {{ | |
| nextOpacity = Math.max(baseOpacity, 0.98); | |
| nextFillOpacity = baseFillOpacity > 0 ? Math.max(baseFillOpacity, Math.min(0.30, baseFillOpacity * 2.1)) : baseFillOpacity; | |
| nextWeight = Math.max(baseWeight + 1, baseWeight * 1.2); | |
| }} | |
| layer.setStyle({{ | |
| opacity: nextOpacity, | |
| fillOpacity: nextFillOpacity, | |
| weight: nextWeight, | |
| color: layer.options.mesaBaseColor || layer.options.color, | |
| fillColor: layer.options.mesaBaseFillColor || layer.options.fillColor || layer.options.mesaBaseColor || layer.options.color, | |
| }}); | |
| }} | |
| if (typeof layer.setRadius === 'function') {{ | |
| rememberMarkerBaseStyle(layer); | |
| const baseRadius = Number.isFinite(layer.options.mesaBaseRadius) ? layer.options.mesaBaseRadius : null; | |
| if (baseRadius !== null) {{ | |
| let nextRadius = baseRadius; | |
| if (state === 'dim') {{ | |
| nextRadius = Math.max(1.6, baseRadius * 0.88); | |
| }} else if (state === 'highlight') {{ | |
| nextRadius = Math.min(baseRadius * 1.28, baseRadius + 3.0); | |
| }} | |
| layer.setRadius(nextRadius); | |
| }} | |
| }} | |
| if (!isPath && typeof layer.setOpacity === 'function') {{ | |
| rememberMarkerBaseStyle(layer); | |
| const baseOpacity = Number.isFinite(layer.options.mesaBaseMarkerOpacity) ? layer.options.mesaBaseMarkerOpacity : 1; | |
| let nextOpacity = baseOpacity; | |
| if (state === 'dim') {{ | |
| nextOpacity = Math.max(0.18, baseOpacity * 0.28); | |
| }} else if (state === 'highlight') {{ | |
| nextOpacity = Math.max(baseOpacity, 1); | |
| }} | |
| layer.setOpacity(nextOpacity); | |
| }} | |
| }} | |
| function applyGroupState(meta, state) {{ | |
| if (!meta || !meta.layer) return; | |
| const visited = new Set(); | |
| visitLayerTree(meta.layer, visited, function(current) {{ | |
| if (current === meta.layer) return; | |
| applyVisualState(current, state); | |
| }}); | |
| }} | |
| function updateLegendState(activeId) {{ | |
| const map = window[MAP_NAME]; | |
| const container = getLayersControlContainer(map); | |
| if (!container) return; | |
| const labels = container.querySelectorAll('label.mesa-model-layer-toggle'); | |
| labels.forEach(function(label) {{ | |
| const modelId = String(label.dataset.mesaModelId || ''); | |
| const isActive = Boolean(activeId) && modelId === activeId; | |
| label.classList.toggle('mesa-model-layer-active', isActive); | |
| label.classList.toggle('mesa-model-layer-dimmed', Boolean(activeId) && !isActive); | |
| }}); | |
| }} | |
| function clearHoverState() {{ | |
| MODEL_LAYERS.forEach(function(meta) {{ | |
| applyGroupState(meta, 'normal'); | |
| }}); | |
| updateLegendState(null); | |
| }} | |
| function setHoveredModel(activeId) {{ | |
| const map = window[MAP_NAME]; | |
| if (!map) return; | |
| if (!activeId) {{ | |
| clearHoverState(); | |
| return; | |
| }} | |
| const activeMeta = MODEL_LAYERS.find(function(meta) {{ return meta.id === activeId; }}) || null; | |
| if (!activeMeta || !activeMeta.layer || !map.hasLayer(activeMeta.layer)) {{ | |
| clearHoverState(); | |
| return; | |
| }} | |
| MODEL_LAYERS.forEach(function(meta) {{ | |
| applyGroupState(meta, meta.id === activeId ? 'highlight' : 'dim'); | |
| }}); | |
| updateLegendState(activeId); | |
| }} | |
| function resolveModelLayers() {{ | |
| MODEL_LAYERS.forEach(function(meta) {{ | |
| meta.layer = window[meta.layer_var] || null; | |
| meta.layerId = meta.layer && window.L && window.L.Util ? window.L.Util.stamp(meta.layer) : null; | |
| }}); | |
| }} | |
| function bindLegendHover() {{ | |
| const map = window[MAP_NAME]; | |
| const container = getLayersControlContainer(map); | |
| if (!container) return false; | |
| const labels = Array.from(container.querySelectorAll('.leaflet-control-layers-overlays label')); | |
| if (!labels.length) return false; | |
| MODEL_LAYERS.forEach(function(meta, index) {{ | |
| const label = labels[index]; | |
| if (!label || label.dataset.mesaModelHoverBound === '1') return; | |
| label.dataset.mesaModelHoverBound = '1'; | |
| label.dataset.mesaModelId = meta.id; | |
| label.classList.add('mesa-model-layer-toggle'); | |
| const onEnter = function() {{ | |
| setHoveredModel(meta.id); | |
| }}; | |
| const onLeave = function() {{ | |
| clearHoverState(); | |
| }}; | |
| label.addEventListener('mouseenter', function() {{ | |
| onEnter(); | |
| }}); | |
| label.addEventListener('mouseleave', function() {{ | |
| onLeave(); | |
| }}); | |
| label.addEventListener('mouseover', function() {{ | |
| onEnter(); | |
| }}); | |
| label.addEventListener('mouseout', function(event) {{ | |
| const nextTarget = event && event.relatedTarget ? event.relatedTarget : null; | |
| if (nextTarget && label.contains(nextTarget)) return; | |
| onLeave(); | |
| }}); | |
| label.addEventListener('focusin', function() {{ | |
| onEnter(); | |
| }}); | |
| label.addEventListener('focusout', function() {{ | |
| onLeave(); | |
| }}); | |
| }}); | |
| return true; | |
| }} | |
| function bindWhenReady() {{ | |
| const map = window[MAP_NAME]; | |
| if (!map || !window.L || !window.L.Util) {{ | |
| window.setTimeout(bindWhenReady, 50); | |
| return; | |
| }} | |
| resolveModelLayers(); | |
| if (!bindLegendHover()) {{ | |
| window.setTimeout(bindWhenReady, 50); | |
| return; | |
| }} | |
| if (map.__mesaLegendHoverBound) return; | |
| map.__mesaLegendHoverBound = true; | |
| map.on('overlayadd', function() {{ | |
| window.setTimeout(function() {{ | |
| resolveModelLayers(); | |
| bindLegendHover(); | |
| clearHoverState(); | |
| }}, 30); | |
| }}); | |
| map.on('overlayremove', function() {{ | |
| window.setTimeout(function() {{ | |
| resolveModelLayers(); | |
| bindLegendHover(); | |
| clearHoverState(); | |
| }}, 30); | |
| }}); | |
| map.whenReady(function() {{ | |
| window.setTimeout(function() {{ | |
| resolveModelLayers(); | |
| bindLegendHover(); | |
| clearHoverState(); | |
| }}, 40); | |
| }}); | |
| }} | |
| bindWhenReady(); | |
| }})(); | |
| </script> | |
| """ | |
| mapa.get_root().html.add_child(Element(script)) | |
| def _carregar_resumo_com_cache(caminho_modelo: Path) -> dict[str, Any]: | |
| assinatura = _assinatura_arquivos(caminho_modelo) | |
| cache_key = str(caminho_modelo) | |
| with _CACHE_LOCK: | |
| cached = _CACHE.get(cache_key) | |
| if cached and cached.get("assinatura") == assinatura: | |
| return dict(cached["resumo"]) | |
| resumo = _construir_resumo_modelo(caminho_modelo) | |
| with _CACHE_LOCK: | |
| _CACHE[cache_key] = { | |
| "assinatura": assinatura, | |
| "resumo": resumo, | |
| } | |
| return dict(resumo) | |
| def _carregar_geometria_modelo_com_cache(caminho_modelo: Path) -> dict[str, Any] | None: | |
| assinatura = _assinatura_arquivos(caminho_modelo) | |
| cache_key = str(caminho_modelo) | |
| with _CACHE_LOCK: | |
| cached = _CACHE.get(cache_key) | |
| if ( | |
| cached | |
| and cached.get("assinatura") == assinatura | |
| and cached.get("geometria_cache_version") == GEOMETRIA_CACHE_VERSION | |
| and "geometria" in cached | |
| ): | |
| return cached.get("geometria") | |
| geometria = _construir_geometria_modelo(caminho_modelo) | |
| with _CACHE_LOCK: | |
| entry = _CACHE.setdefault(cache_key, {}) | |
| entry["assinatura"] = assinatura | |
| entry["geometria_cache_version"] = GEOMETRIA_CACHE_VERSION | |
| entry["geometria"] = geometria | |
| return geometria | |
| def _assinatura_arquivos(caminho_modelo: Path) -> tuple[int, int]: | |
| stat_modelo = caminho_modelo.stat() | |
| return (stat_modelo.st_mtime_ns, stat_modelo.st_size) | |
| def _construir_resumo_modelo(caminho_modelo: Path) -> dict[str, Any]: | |
| resumo = { | |
| "id": caminho_modelo.stem, | |
| "arquivo": caminho_modelo.name, | |
| "nome_modelo": caminho_modelo.stem, | |
| "autor": None, | |
| "finalidade": _inferir_finalidade_por_nome(caminho_modelo.stem), | |
| "finalidades": [], | |
| "tipo_imovel": _inferir_tipo_por_nome(caminho_modelo.stem), | |
| "bairros": [], | |
| "faixa_area": None, | |
| "faixa_rh": None, | |
| "faixa_data": None, | |
| "total_dados": None, | |
| "total_trabalhos": None, | |
| "endereco_referencia": None, | |
| "equacao": None, | |
| "observacao_modelo": None, | |
| "r2": None, | |
| "tem_app": False, | |
| "mapa_disponivel": False, | |
| "compatibilidade_campos": {chave: [] for chave in COMPATIBILIDADE_MAP}, | |
| "faixas_por_campo": {chave: None for chave in RANGE_CAMPOS}, | |
| "variaveis_resumo": [], | |
| "status": "ok", | |
| "erro_leitura": None, | |
| "fonte": { | |
| "modelo": "dai", | |
| }, | |
| "_variaveis_modelo": [], | |
| "_texto_colunas_index": {}, | |
| "_faixa_colunas_index": {}, | |
| "_faixa_variaveis_index": {}, | |
| "_caminho_modelo": str(caminho_modelo), | |
| } | |
| try: | |
| pacote = load(caminho_modelo) | |
| except Exception as exc: | |
| resumo["status"] = "erro" | |
| resumo["erro_leitura"] = f"Falha ao ler .dai: {exc}" | |
| return resumo | |
| if not isinstance(pacote, dict): | |
| resumo["status"] = "erro" | |
| resumo["erro_leitura"] = "Arquivo .dai invalido (conteudo nao e dict)." | |
| return resumo | |
| if "versao" not in pacote: | |
| try: | |
| pacote = _migrar_pacote_v1_para_v2(pacote) | |
| except Exception: | |
| pass | |
| dados_pacote = pacote.get("dados") if isinstance(pacote.get("dados"), dict) else {} | |
| estat_df = _to_dataframe(dados_pacote.get("estatisticas")) | |
| df_modelo = _extrair_df_modelo_ajustado(dados_pacote) | |
| if resumo["autor"] is None: | |
| resumo["autor"] = _autor_do_pacote(pacote) | |
| if resumo["equacao"] is None: | |
| resumo["equacao"] = _equacao_do_pacote(pacote) | |
| if resumo["observacao_modelo"] is None: | |
| resumo["observacao_modelo"] = normalizar_observacao_modelo(pacote.get("observacao_modelo")) | |
| if resumo["r2"] is None: | |
| resumo["r2"] = _r2_do_pacote(pacote) | |
| resumo["_variaveis_modelo"] = _variaveis_do_modelo(pacote) | |
| resumo["tem_app"] = _modelo_contem_variavel(resumo, APP_ALIASES) | |
| if resumo["total_dados"] is None and df_modelo is not None: | |
| resumo["total_dados"] = int(len(df_modelo)) | |
| if resumo["total_trabalhos"] is None: | |
| resumo["total_trabalhos"] = trabalhos_tecnicos_service.contar_avaliandos_por_modelo( | |
| [resumo.get("id"), resumo.get("arquivo"), resumo.get("nome_modelo")] | |
| ) | |
| if not resumo["finalidades"] and df_modelo is not None: | |
| resumo["finalidades"] = _extrair_finalidades(df_modelo) | |
| if resumo["finalidade"] is None: | |
| resumo["finalidade"] = resumo["finalidades"][0] if resumo["finalidades"] else None | |
| if not resumo["bairros"] and df_modelo is not None: | |
| resumo["bairros"] = _extrair_bairros(df_modelo) | |
| faixas_por_campo = { | |
| chave: _extrair_faixa_por_alias(estat_df, aliases) | |
| for chave, aliases in RANGE_CAMPOS.items() | |
| } | |
| resumo["faixas_por_campo"] = faixas_por_campo | |
| resumo["faixa_area"] = _merge_ranges( | |
| resumo["faixa_area"], | |
| _extrair_faixa_por_alias(estat_df, AREA_GERAL_ALIASES), | |
| ) | |
| resumo["faixa_rh"] = _merge_ranges(resumo["faixa_rh"], _extrair_faixa_por_alias(estat_df, RH_ALIASES)) | |
| resumo["faixa_data"] = _merge_ranges(resumo["faixa_data"], _faixa_data_do_pacote(pacote)) | |
| colunas_catalogo = _coletar_colunas_para_catalogo(estat_df, df_modelo) | |
| resumo["compatibilidade_campos"] = _mapear_compatibilidade(colunas_catalogo) | |
| resumo["variaveis_resumo"] = _resumo_variaveis(estat_df) | |
| resumo["mapa_disponivel"] = _tem_colunas_mapa(df_modelo) | |
| resumo["_texto_colunas_index"] = _indexar_texto_colunas(df_modelo) | |
| resumo["_faixa_colunas_index"] = {} | |
| resumo["_faixa_variaveis_index"] = _indexar_faixas_variaveis(estat_df, resumo["_variaveis_modelo"]) | |
| return resumo | |
| def _autor_do_pacote(pacote: dict[str, Any]) -> str | None: | |
| elaborador = pacote.get("elaborador") | |
| if isinstance(elaborador, dict): | |
| return _str_or_none(elaborador.get("nome_completo")) or _str_or_none(elaborador.get("nome")) | |
| return None | |
| def _equacao_do_pacote(pacote: dict[str, Any]) -> str | None: | |
| modelo = pacote.get("modelo") if isinstance(pacote.get("modelo"), dict) else {} | |
| diagnosticos = modelo.get("diagnosticos") if isinstance(modelo.get("diagnosticos"), dict) else {} | |
| return _str_or_none(diagnosticos.get("equacao")) | |
| def _r2_do_pacote(pacote: dict[str, Any]) -> float | None: | |
| modelo = pacote.get("modelo") if isinstance(pacote.get("modelo"), dict) else {} | |
| diagnosticos = modelo.get("diagnosticos") if isinstance(modelo.get("diagnosticos"), dict) else {} | |
| gerais = diagnosticos.get("gerais") if isinstance(diagnosticos.get("gerais"), dict) else {} | |
| return _to_float_or_none(gerais.get("r2")) | |
| def _faixa_data_do_pacote(pacote: dict[str, Any]) -> dict[str, Any] | None: | |
| periodo = pacote.get("periodo_dados_mercado") if isinstance(pacote.get("periodo_dados_mercado"), dict) else {} | |
| data_inicial = _data_iso_or_none(periodo.get("data_inicial")) | |
| data_final = _data_iso_or_none(periodo.get("data_final")) | |
| if data_inicial is None and data_final is None: | |
| return None | |
| if data_inicial and data_final and data_inicial > data_final: | |
| data_inicial, data_final = data_final, data_inicial | |
| return {"min": data_inicial, "max": data_final} | |
| def _data_iso_or_none(value: Any) -> str | None: | |
| if _is_empty(value): | |
| return None | |
| text = str(value).strip() | |
| if not text: | |
| return None | |
| iso_match = re.match(r"^(\d{4})-(\d{2})-(\d{2})(?:[T\s].*)?$", text) | |
| if iso_match: | |
| try: | |
| return datetime( | |
| int(iso_match.group(1)), | |
| int(iso_match.group(2)), | |
| int(iso_match.group(3)), | |
| ).date().isoformat() | |
| except Exception: | |
| return None | |
| iso_slash_match = re.match(r"^(\d{4})/(\d{2})/(\d{2})(?:[T\s].*)?$", text) | |
| if iso_slash_match: | |
| try: | |
| return datetime( | |
| int(iso_slash_match.group(1)), | |
| int(iso_slash_match.group(2)), | |
| int(iso_slash_match.group(3)), | |
| ).date().isoformat() | |
| except Exception: | |
| return None | |
| br_match = re.match(r"^(\d{2})/(\d{2})/(\d{4})(?:[T\s].*)?$", text) | |
| if br_match: | |
| try: | |
| return datetime( | |
| int(br_match.group(3)), | |
| int(br_match.group(2)), | |
| int(br_match.group(1)), | |
| ).date().isoformat() | |
| except Exception: | |
| return None | |
| parsed = pd.to_datetime(text, errors="coerce", dayfirst=True) | |
| if pd.isna(parsed): | |
| return None | |
| return parsed.date().isoformat() | |
| def _variaveis_do_modelo(pacote: dict[str, Any]) -> list[str]: | |
| variaveis: list[str] = [] | |
| modelo = pacote.get("modelo") if isinstance(pacote.get("modelo"), dict) else {} | |
| coeficientes = modelo.get("coeficientes") if isinstance(modelo, dict) else None | |
| if isinstance(coeficientes, pd.DataFrame): | |
| variaveis.extend([str(item) for item in coeficientes.index.tolist()]) | |
| elif isinstance(coeficientes, pd.Series): | |
| variaveis.extend([str(item) for item in coeficientes.index.tolist()]) | |
| elif isinstance(coeficientes, dict): | |
| variaveis.extend([str(item) for item in coeficientes.keys()]) | |
| transformacoes = pacote.get("transformacoes") if isinstance(pacote.get("transformacoes"), dict) else {} | |
| x_transformado = transformacoes.get("X") | |
| y_transformado = transformacoes.get("y") | |
| if isinstance(x_transformado, pd.DataFrame): | |
| variaveis.extend([str(item) for item in x_transformado.columns.tolist()]) | |
| if isinstance(y_transformado, pd.Series): | |
| nome_y = _str_or_none(y_transformado.name) | |
| if nome_y: | |
| variaveis.append(nome_y) | |
| equacao = _equacao_do_pacote(pacote) | |
| if isinstance(equacao, str) and "=" in equacao: | |
| variaveis.append(equacao.split("=", 1)[0].strip()) | |
| limpas = [item for item in variaveis if _str_or_none(item) and _normalize(item) != "const"] | |
| return _dedupe_strings(limpas) | |
| def _modelo_contem_variavel(modelo: dict[str, Any], aliases: list[str]) -> bool: | |
| variaveis = list(modelo.get("_variaveis_modelo") or []) | |
| indice_faixa = modelo.get("_faixa_variaveis_index") or {} | |
| if isinstance(indice_faixa, dict): | |
| variaveis.extend([str(chave) for chave in indice_faixa.keys()]) | |
| return any(_has_alias_exato(str(variavel), aliases) for variavel in variaveis) | |
| def _to_dataframe(value: Any) -> pd.DataFrame | None: | |
| if value is None: | |
| return None | |
| if isinstance(value, pd.DataFrame): | |
| return value.copy() | |
| if isinstance(value, list): | |
| try: | |
| return pd.DataFrame(value) | |
| except Exception: | |
| return None | |
| if isinstance(value, dict): | |
| try: | |
| return pd.DataFrame(value) | |
| except Exception: | |
| return None | |
| return None | |
| def _listar_outliers_excluidos(dados_pacote: dict[str, Any]) -> list[int]: | |
| valores = dados_pacote.get("outliers_excluidos") | |
| if not isinstance(valores, (list, tuple, set)): | |
| return [] | |
| out: list[int] = [] | |
| for valor in valores: | |
| try: | |
| out.append(int(valor)) | |
| except Exception: | |
| continue | |
| return sorted(set(out)) | |
| def _extrair_df_modelo_ajustado(dados_pacote: dict[str, Any]) -> pd.DataFrame | None: | |
| # Prioriza sempre a base ajustada efetivamente usada no modelo. | |
| df_modelo = _to_dataframe(dados_pacote.get("df")) | |
| if df_modelo is not None: | |
| return df_modelo | |
| df_completo = _to_dataframe(dados_pacote.get("df_completo")) | |
| if df_completo is None: | |
| return None | |
| outliers_excluidos = _listar_outliers_excluidos(dados_pacote) | |
| if not outliers_excluidos: | |
| return df_completo | |
| return df_completo.drop(index=outliers_excluidos, errors="ignore") | |
| def _carregar_dataframe_modelo(caminho_modelo: Path) -> pd.DataFrame | None: | |
| try: | |
| pacote = load(caminho_modelo) | |
| except Exception: | |
| return None | |
| if not isinstance(pacote, dict): | |
| return None | |
| if "versao" not in pacote: | |
| try: | |
| pacote = _migrar_pacote_v1_para_v2(pacote) | |
| except Exception: | |
| return None | |
| dados = pacote.get("dados") if isinstance(pacote.get("dados"), dict) else {} | |
| return _extrair_df_modelo_ajustado(dados) | |
| def _coletar_pontos_modelo(df_modelo: pd.DataFrame, limite_pontos: int) -> list[dict[str, Any]]: | |
| if df_modelo is None or df_modelo.empty: | |
| return [] | |
| col_lat = _identificar_coluna_por_alias(df_modelo.columns, LAT_ALIASES) | |
| col_lon = _identificar_coluna_por_alias(df_modelo.columns, LON_ALIASES) | |
| if col_lat is None or col_lon is None: | |
| return [] | |
| base = df_modelo[[col_lat, col_lon]].copy() | |
| base[col_lat] = pd.to_numeric(base[col_lat], errors="coerce") | |
| base[col_lon] = pd.to_numeric(base[col_lon], errors="coerce") | |
| base = base.dropna(subset=[col_lat, col_lon]) | |
| if base.empty: | |
| return [] | |
| base = base[(base[col_lat].between(-90, 90)) & (base[col_lon].between(-180, 180))] | |
| if base.empty: | |
| return [] | |
| if limite_pontos > 0 and len(base) > limite_pontos: | |
| passo = max(1, math.ceil(len(base) / limite_pontos)) | |
| base = base.iloc[::passo].head(limite_pontos) | |
| lat_vals = base[col_lat].to_numpy(dtype=float, copy=False) | |
| lon_vals = base[col_lon].to_numpy(dtype=float, copy=False) | |
| indices = base.index.to_list() | |
| return [ | |
| {"lat": float(lat), "lon": float(lon), "indice": _formatar_indice_mapa(indice)} | |
| for lat, lon, indice in zip(lat_vals, lon_vals, indices) | |
| ] | |
| def _construir_geometria_modelo(caminho_modelo: Path) -> dict[str, Any] | None: | |
| df_modelo = _carregar_dataframe_modelo(caminho_modelo) | |
| pontos = _coletar_pontos_modelo(df_modelo, 0) | |
| if not pontos: | |
| return None | |
| try: | |
| from shapely.geometry import MultiPoint | |
| except ImportError: | |
| return None | |
| coordenadas = np.asarray([(float(item["lon"]), float(item["lat"])) for item in pontos], dtype=float) | |
| if coordenadas.size == 0: | |
| return None | |
| try: | |
| pontos_unicos = np.unique(coordenadas, axis=0) | |
| hull_convexo = MultiPoint([tuple(item) for item in pontos_unicos]).convex_hull | |
| except Exception: | |
| return None | |
| if hull_convexo is None or getattr(hull_convexo, "is_empty", False): | |
| return None | |
| hull = _construir_cobertura_concava_wgs84(pontos_unicos) | |
| if hull is None: | |
| hull = hull_convexo | |
| geom_metric = _transformar_geometria_para_metrico(hull) | |
| return { | |
| "geom_wgs84": hull, | |
| "geom_metric": geom_metric, | |
| "geom_type": str(getattr(hull, "geom_type", "") or ""), | |
| "total_pontos": len(pontos), | |
| } | |
| def _formatar_indice_mapa(indice: Any) -> str: | |
| if isinstance(indice, (int, np.integer)): | |
| return str(int(indice)) | |
| if isinstance(indice, (float, np.floating)) and np.isfinite(indice) and float(indice).is_integer(): | |
| return str(int(indice)) | |
| return str(indice) | |
| def _identificar_coluna_por_alias(colunas: Any, aliases: list[str]) -> str | None: | |
| for coluna in colunas: | |
| nome = str(coluna) | |
| if _has_alias(nome, aliases): | |
| return nome | |
| return None | |
| def _normalizar_coordenadas_avaliando(lat_raw: Any, lon_raw: Any) -> tuple[float | None, float | None]: | |
| lat = _to_float_or_none(lat_raw) | |
| lon = _to_float_or_none(lon_raw) | |
| if lat is None or lon is None: | |
| return None, None | |
| if not (-90.0 <= lat <= 90.0) or not (-180.0 <= lon <= 180.0): | |
| return None, None | |
| return float(lat), float(lon) | |
| def _normalizar_avaliandos_geo(raw: Any) -> list[dict[str, Any]]: | |
| saida: list[dict[str, Any]] = [] | |
| for item in raw or []: | |
| if not isinstance(item, dict): | |
| continue | |
| lat, lon = _normalizar_coordenadas_avaliando(item.get("lat"), item.get("lon")) | |
| if lat is None or lon is None: | |
| continue | |
| indice = len(saida) + 1 | |
| label = str(item.get("label") or f"A{indice}").strip() or f"A{indice}" | |
| avaliando_id = str(item.get("id") or label or f"avaliando-{indice}").strip() or f"avaliando-{indice}" | |
| logradouro = str(item.get("logradouro") or item.get("endereco") or "").strip() or None | |
| numero_usado_raw = item.get("numero_usado") | |
| if numero_usado_raw is None: | |
| numero_usado_raw = item.get("numero") | |
| numero_usado = None if _is_empty(numero_usado_raw) else str(numero_usado_raw).strip() or None | |
| origem = str(item.get("origem") or "").strip() or None | |
| cdlog = item.get("cdlog") | |
| saida.append( | |
| { | |
| "id": avaliando_id, | |
| "label": label, | |
| "lat": lat, | |
| "lon": lon, | |
| "logradouro": logradouro, | |
| "numero_usado": numero_usado, | |
| "cdlog": cdlog, | |
| "origem": origem, | |
| } | |
| ) | |
| return saida | |
| def _normalizar_criterio_espacial(value: Any) -> str: | |
| criterio = str(value or "").strip().lower() | |
| if criterio == CRITERIO_ESPACIAL_MENOR_DISTANCIA: | |
| return CRITERIO_ESPACIAL_MENOR_DISTANCIA | |
| if criterio == CRITERIO_ESPACIAL_MEDIA_DISTANCIA: | |
| return CRITERIO_ESPACIAL_MEDIA_DISTANCIA | |
| return CRITERIO_ESPACIAL_PADRAO | |
| def _calcular_distancias_avaliandos( | |
| geometria: dict[str, Any] | None, | |
| avaliandos_geo: list[dict[str, Any]], | |
| criterio_espacial: str = CRITERIO_ESPACIAL_PADRAO, | |
| ) -> tuple[list[dict[str, Any]], dict[str, Any]]: | |
| criterio_norm = _normalizar_criterio_espacial(criterio_espacial) | |
| distancias: list[dict[str, Any]] = [] | |
| for item in avaliandos_geo or []: | |
| distancia = _calcular_distancia_geometria_cache(geometria, item.get("lat"), item.get("lon")) | |
| distancias.append( | |
| { | |
| "id": item.get("id"), | |
| "label": item.get("label"), | |
| "lat": item.get("lat"), | |
| "lon": item.get("lon"), | |
| "logradouro": item.get("logradouro"), | |
| "numero_usado": item.get("numero_usado"), | |
| "cdlog": item.get("cdlog"), | |
| "origem": item.get("origem"), | |
| "distancia_km": distancia.get("distancia_km"), | |
| "distancia_label": distancia.get("distancia_label"), | |
| "avaliando_dentro_cobertura": distancia.get("avaliando_dentro_cobertura"), | |
| } | |
| ) | |
| return distancias, _resumir_distancias_avaliandos(distancias, criterio_norm) | |
| def _resumir_distancias_avaliandos( | |
| distancias: list[dict[str, Any]], | |
| criterio_espacial: str = CRITERIO_ESPACIAL_PADRAO, | |
| ) -> dict[str, Any]: | |
| criterio_norm = _normalizar_criterio_espacial(criterio_espacial) | |
| valores_validos = [ | |
| float(item["distancia_km"]) | |
| for item in (distancias or []) | |
| if _to_float_or_none(item.get("distancia_km")) is not None | |
| ] | |
| menor = min(valores_validos) if valores_validos else None | |
| maior = max(valores_validos) if valores_validos else None | |
| media = (sum(valores_validos) / len(valores_validos)) if valores_validos else None | |
| total_dentro = sum(1 for item in (distancias or []) if item.get("avaliando_dentro_cobertura") is True) | |
| principal_km = { | |
| CRITERIO_ESPACIAL_MENOR_DISTANCIA: menor, | |
| CRITERIO_ESPACIAL_MEDIA_DISTANCIA: media, | |
| CRITERIO_ESPACIAL_MAIOR_DISTANCIA: maior, | |
| }.get(criterio_norm, maior) | |
| return { | |
| "criterio_principal": criterio_norm, | |
| "principal_distancia_km": principal_km, | |
| "principal_distancia_label": _formatar_distancia_km(principal_km), | |
| "menor_distancia_km": menor, | |
| "menor_distancia_label": _formatar_distancia_km(menor), | |
| "media_distancia_km": media, | |
| "media_distancia_label": _formatar_distancia_km(media), | |
| "maior_distancia_km": maior, | |
| "maior_distancia_label": _formatar_distancia_km(maior), | |
| "total_avaliandos": len(distancias or []), | |
| "total_com_distancia": len(valores_validos), | |
| "total_sem_distancia": max(0, len(distancias or []) - len(valores_validos)), | |
| "total_dentro_cobertura": total_dentro, | |
| } | |
| def _anexar_distancias_modelo_multiplos( | |
| modelo: dict[str, Any], | |
| avaliandos_geo: list[dict[str, Any]], | |
| criterio_espacial: str = CRITERIO_ESPACIAL_PADRAO, | |
| ) -> dict[str, Any]: | |
| item = dict(modelo) | |
| caminho_texto = str(item.get("_caminho_modelo") or "").strip() | |
| if not caminho_texto: | |
| item["distancias_avaliandos"] = [] | |
| item["distancia_resumo"] = _resumir_distancias_avaliandos([], criterio_espacial) | |
| item["distancia_km"] = None | |
| item["distancia_label"] = "-" | |
| item["avaliando_dentro_cobertura"] = None | |
| return item | |
| geometria = _carregar_geometria_modelo_com_cache(Path(caminho_texto)) | |
| distancias, resumo = _calcular_distancias_avaliandos(geometria, avaliandos_geo, criterio_espacial) | |
| item["distancias_avaliandos"] = distancias | |
| item["distancia_resumo"] = resumo | |
| item["distancia_km"] = resumo.get("principal_distancia_km") | |
| item["distancia_label"] = resumo.get("principal_distancia_label") | |
| item["avaliando_dentro_cobertura"] = None | |
| return item | |
| def _anexar_distancia_modelo(modelo: dict[str, Any], aval_lat: float, aval_lon: float) -> dict[str, Any]: | |
| item = dict(modelo) | |
| caminho_texto = str(item.get("_caminho_modelo") or "").strip() | |
| if not caminho_texto: | |
| item["distancia_km"] = None | |
| item["distancia_label"] = "-" | |
| item["avaliando_dentro_cobertura"] = None | |
| return item | |
| geometria = _carregar_geometria_modelo_com_cache(Path(caminho_texto)) | |
| distancia = _calcular_distancia_geometria_cache(geometria, aval_lat, aval_lon) | |
| item["distancia_km"] = distancia.get("distancia_km") | |
| item["distancia_label"] = distancia.get("distancia_label") | |
| item["avaliando_dentro_cobertura"] = distancia.get("avaliando_dentro_cobertura") | |
| return item | |
| def _calcular_distancia_geometria_cache( | |
| geometria: dict[str, Any] | None, | |
| aval_lat: float | None, | |
| aval_lon: float | None, | |
| ) -> dict[str, Any]: | |
| if geometria is None or aval_lat is None or aval_lon is None: | |
| return {"distancia_km": None, "distancia_label": "-", "avaliando_dentro_cobertura": None} | |
| geom_metric = geometria.get("geom_metric") | |
| if geom_metric is None: | |
| return {"distancia_km": None, "distancia_label": "-", "avaliando_dentro_cobertura": None} | |
| ponto_metric = _criar_ponto_metrico(aval_lat, aval_lon) | |
| if ponto_metric is None: | |
| return {"distancia_km": None, "distancia_label": "-", "avaliando_dentro_cobertura": None} | |
| try: | |
| distancia_m = float(ponto_metric.distance(geom_metric)) | |
| except Exception: | |
| return {"distancia_km": None, "distancia_label": "-", "avaliando_dentro_cobertura": None} | |
| if not np.isfinite(distancia_m): | |
| return {"distancia_km": None, "distancia_label": "-", "avaliando_dentro_cobertura": None} | |
| distancia_km = max(0.0, distancia_m / 1000.0) | |
| return { | |
| "distancia_km": float(distancia_km), | |
| "distancia_label": _formatar_distancia_km(distancia_km), | |
| "avaliando_dentro_cobertura": bool(distancia_m <= 1e-6), | |
| } | |
| def _formatar_distancia_km(distancia_km: float | None) -> str: | |
| if distancia_km is None or not np.isfinite(distancia_km): | |
| return "-" | |
| return f"{float(distancia_km):,.2f} km".replace(",", "X").replace(".", ",").replace("X", ".") | |
| def _transformar_geometria_para_metrico(geom: Any) -> Any: | |
| if geom is None: | |
| return None | |
| try: | |
| from pyproj import Transformer | |
| from shapely.ops import transform | |
| except ImportError: | |
| return None | |
| try: | |
| transformer = Transformer.from_crs("EPSG:4326", DISTANCIA_METRIC_CRS, always_xy=True) | |
| return transform(transformer.transform, geom) | |
| except Exception: | |
| return None | |
| def _transformar_geometria_para_wgs84(geom: Any) -> Any: | |
| if geom is None: | |
| return None | |
| try: | |
| from pyproj import Transformer | |
| from shapely.ops import transform | |
| except ImportError: | |
| return None | |
| try: | |
| transformer = Transformer.from_crs(DISTANCIA_METRIC_CRS, "EPSG:4326", always_xy=True) | |
| return transform(transformer.transform, geom) | |
| except Exception: | |
| return None | |
| def _coordenadas_wgs84_para_metricas(coordenadas: np.ndarray) -> np.ndarray | None: | |
| arr = np.asarray(coordenadas, dtype=float) | |
| if arr.ndim != 2 or arr.shape[1] != 2 or arr.size == 0: | |
| return None | |
| try: | |
| from pyproj import Transformer | |
| except ImportError: | |
| return None | |
| try: | |
| transformer = Transformer.from_crs("EPSG:4326", DISTANCIA_METRIC_CRS, always_xy=True) | |
| xs, ys = transformer.transform(arr[:, 0], arr[:, 1]) | |
| except Exception: | |
| return None | |
| metricas = np.column_stack([xs, ys]).astype(float, copy=False) | |
| metricas = metricas[np.isfinite(metricas).all(axis=1)] | |
| if metricas.size == 0: | |
| return None | |
| return metricas | |
| def _distancia_tipica_pontos_metricos(coordenadas_metricas: np.ndarray) -> float: | |
| pontos = np.asarray(coordenadas_metricas, dtype=float) | |
| if pontos.ndim != 2 or pontos.shape[0] < 2: | |
| return 1.0 | |
| try: | |
| from scipy.spatial import cKDTree | |
| tree = cKDTree(pontos) | |
| distancias, _ = tree.query(pontos, k=min(2, len(pontos))) | |
| if getattr(distancias, "ndim", 0) == 1: | |
| vizinhos = distancias[distancias > 1e-9] | |
| else: | |
| vizinhos = distancias[:, 1] | |
| except Exception: | |
| delta = pontos[:, None, :] - pontos[None, :, :] | |
| matriz = np.sqrt(np.sum(delta * delta, axis=2)) | |
| matriz[matriz <= 1e-9] = np.inf | |
| vizinhos = np.min(matriz, axis=1) | |
| vizinhos = np.asarray(vizinhos, dtype=float) | |
| vizinhos = vizinhos[np.isfinite(vizinhos) & (vizinhos > 1e-9)] | |
| if vizinhos.size == 0: | |
| return 1.0 | |
| return max(float(np.median(vizinhos)), 1.0) | |
| def _normalizar_geometria_cobertura(geom: Any) -> Any: | |
| if geom is None or getattr(geom, "is_empty", False): | |
| return None | |
| try: | |
| from shapely.geometry import MultiPolygon, Polygon | |
| from shapely.ops import unary_union | |
| except ImportError: | |
| return None | |
| atual = geom | |
| try: | |
| if getattr(atual, "geom_type", "") == "GeometryCollection": | |
| partes = [ | |
| parte | |
| for parte in getattr(atual, "geoms", []) | |
| if getattr(parte, "geom_type", "") in {"Polygon", "MultiPolygon"} and not getattr(parte, "is_empty", False) | |
| ] | |
| if not partes: | |
| return None | |
| atual = unary_union(partes) | |
| if getattr(atual, "geom_type", "") not in {"Polygon", "MultiPolygon"}: | |
| return None | |
| if not bool(getattr(atual, "is_valid", True)): | |
| atual = atual.buffer(0) | |
| if atual is None or getattr(atual, "is_empty", False): | |
| return None | |
| if getattr(atual, "geom_type", "") == "Polygon": | |
| return Polygon(atual.exterior) | |
| if getattr(atual, "geom_type", "") == "MultiPolygon": | |
| partes = [Polygon(parte.exterior) for parte in atual.geoms if not getattr(parte, "is_empty", False)] | |
| if not partes: | |
| return None | |
| unido = unary_union(partes) | |
| if getattr(unido, "geom_type", "") in {"Polygon", "MultiPolygon"} and not getattr(unido, "is_empty", False): | |
| return unido | |
| except Exception: | |
| return None | |
| return None | |
| def _geometria_cobre_todos_os_pontos(geom_metric: Any, pontos_metricos: Any, tolerancia_m: float) -> bool: | |
| if geom_metric is None or pontos_metricos is None: | |
| return False | |
| if getattr(geom_metric, "geom_type", "") not in {"Polygon", "MultiPolygon"}: | |
| return False | |
| try: | |
| tolerancia = max(float(tolerancia_m), 0.5) | |
| return bool(geom_metric.buffer(tolerancia).covers(pontos_metricos)) | |
| except Exception: | |
| return False | |
| def _alpha_shape_metrico(coordenadas_metricas: np.ndarray, max_circumradius_m: float) -> Any: | |
| pontos = np.asarray(coordenadas_metricas, dtype=float) | |
| if pontos.ndim != 2 or pontos.shape[0] < 4: | |
| return None | |
| try: | |
| from scipy.spatial import Delaunay, QhullError | |
| from shapely.geometry import MultiLineString | |
| from shapely.ops import polygonize, unary_union | |
| except ImportError: | |
| return None | |
| try: | |
| triangulacao = Delaunay(pontos) | |
| except QhullError: | |
| return None | |
| except Exception: | |
| return None | |
| max_radius = float(max_circumradius_m) | |
| if not np.isfinite(max_radius) or max_radius <= 0: | |
| return None | |
| arestas: set[tuple[int, int]] = set() | |
| segmentos: list[tuple[tuple[float, float], tuple[float, float]]] = [] | |
| def registrar_aresta(idx_a: int, idx_b: int) -> None: | |
| chave = tuple(sorted((int(idx_a), int(idx_b)))) | |
| if chave in arestas: | |
| return | |
| arestas.add(chave) | |
| pa = pontos[int(idx_a)] | |
| pb = pontos[int(idx_b)] | |
| segmentos.append(((float(pa[0]), float(pa[1])), (float(pb[0]), float(pb[1])))) | |
| for simplex in triangulacao.simplices: | |
| ia, ib, ic = [int(valor) for valor in simplex] | |
| pa, pb, pc = pontos[[ia, ib, ic]] | |
| a = float(np.linalg.norm(pb - pc)) | |
| b = float(np.linalg.norm(pa - pc)) | |
| c = float(np.linalg.norm(pa - pb)) | |
| s = (a + b + c) / 2.0 | |
| area_sq = s * (s - a) * (s - b) * (s - c) | |
| if area_sq <= 1e-12: | |
| continue | |
| area = math.sqrt(area_sq) | |
| circumradius = (a * b * c) / max(4.0 * area, 1e-12) | |
| if not np.isfinite(circumradius) or circumradius > max_radius: | |
| continue | |
| registrar_aresta(ia, ib) | |
| registrar_aresta(ib, ic) | |
| registrar_aresta(ic, ia) | |
| if not segmentos: | |
| return None | |
| try: | |
| linhas = MultiLineString(segmentos) | |
| poligonos = list(polygonize(linhas)) | |
| if not poligonos: | |
| return None | |
| return unary_union(poligonos) | |
| except Exception: | |
| return None | |
| def _construir_cobertura_concava_wgs84(coordenadas_wgs84: np.ndarray) -> Any: | |
| try: | |
| from shapely.geometry import MultiPoint | |
| except ImportError: | |
| return None | |
| pontos_wgs84 = np.asarray(coordenadas_wgs84, dtype=float) | |
| if pontos_wgs84.ndim != 2 or pontos_wgs84.shape[0] < 4: | |
| return None | |
| pontos_metricos_arr = _coordenadas_wgs84_para_metricas(pontos_wgs84) | |
| if pontos_metricos_arr is None or len(pontos_metricos_arr) != len(pontos_wgs84): | |
| return None | |
| pontos_metricos = MultiPoint([tuple(item) for item in pontos_metricos_arr]) | |
| passo_tipico = _distancia_tipica_pontos_metricos(pontos_metricos_arr) | |
| tolerancia = max(1.0, passo_tipico * 0.08) | |
| try: | |
| from shapely import concave_hull as shapely_concave_hull | |
| except Exception: | |
| shapely_concave_hull = None | |
| if shapely_concave_hull is not None: | |
| for ratio in (0.12, 0.18, 0.26, 0.36, 0.5, 0.72): | |
| try: | |
| candidato_metric = shapely_concave_hull(pontos_metricos, ratio=float(ratio), allow_holes=False) | |
| except Exception: | |
| continue | |
| candidato_metric = _normalizar_geometria_cobertura(candidato_metric) | |
| if not _geometria_cobre_todos_os_pontos(candidato_metric, pontos_metricos, tolerancia): | |
| continue | |
| candidato_wgs84 = _transformar_geometria_para_wgs84(candidato_metric) | |
| candidato_wgs84 = _normalizar_geometria_cobertura(candidato_wgs84) | |
| if candidato_wgs84 is not None: | |
| return candidato_wgs84 | |
| for multiplicador in (2.2, 3.0, 4.0, 5.5, 7.5, 10.0, 13.5): | |
| candidato_metric = _alpha_shape_metrico(pontos_metricos_arr, passo_tipico * float(multiplicador)) | |
| candidato_metric = _normalizar_geometria_cobertura(candidato_metric) | |
| if not _geometria_cobre_todos_os_pontos(candidato_metric, pontos_metricos, tolerancia): | |
| continue | |
| candidato_wgs84 = _transformar_geometria_para_wgs84(candidato_metric) | |
| candidato_wgs84 = _normalizar_geometria_cobertura(candidato_wgs84) | |
| if candidato_wgs84 is not None: | |
| return candidato_wgs84 | |
| return None | |
| def _criar_ponto_metrico(lat: float, lon: float) -> Any: | |
| try: | |
| from shapely.geometry import Point | |
| except ImportError: | |
| return None | |
| ponto = Point(float(lon), float(lat)) | |
| return _transformar_geometria_para_metrico(ponto) | |
| def _adicionar_geometria_modelo_no_mapa( | |
| camada_poligonos: folium.FeatureGroup, | |
| camada_distancias: folium.FeatureGroup, | |
| modelo: dict[str, Any], | |
| aval_lat: float | None, | |
| aval_lon: float | None, | |
| ) -> None: | |
| geometria = modelo.get("geometria") or {} | |
| geom_wgs84 = geometria.get("geom_wgs84") | |
| if geom_wgs84 is None: | |
| return | |
| tooltip = str(modelo.get("nome") or "").strip() or "Modelo" | |
| tooltip_distancia = _montar_tooltip_distancia_modelo(modelo) | |
| if tooltip_distancia: | |
| tooltip = f"{tooltip} • {tooltip_distancia}" | |
| geom_type = str(geometria.get("geom_type") or "") | |
| cor = str(modelo.get("cor") or "#1f77b4") | |
| try: | |
| if geom_type == "Polygon": | |
| coords = [[float(lat), float(lon)] for lon, lat in list(geom_wgs84.exterior.coords)] | |
| folium.Polygon( | |
| locations=coords, | |
| color=cor, | |
| fill=True, | |
| fill_color=cor, | |
| fill_opacity=0.12, | |
| weight=2, | |
| tooltip=tooltip, | |
| ).add_to(camada_poligonos) | |
| elif geom_type == "MultiPolygon": | |
| for poligono in list(getattr(geom_wgs84, "geoms", [])): | |
| coords = [[float(lat), float(lon)] for lon, lat in list(poligono.exterior.coords)] | |
| folium.Polygon( | |
| locations=coords, | |
| color=cor, | |
| fill=True, | |
| fill_color=cor, | |
| fill_opacity=0.12, | |
| weight=2, | |
| tooltip=tooltip, | |
| ).add_to(camada_poligonos) | |
| elif geom_type == "LineString": | |
| coords = [[float(lat), float(lon)] for lon, lat in list(geom_wgs84.coords)] | |
| folium.PolyLine(locations=coords, color=cor, weight=3, opacity=0.8, tooltip=tooltip).add_to(camada_poligonos) | |
| elif geom_type == "MultiLineString": | |
| for linha in list(getattr(geom_wgs84, "geoms", [])): | |
| coords = [[float(lat), float(lon)] for lon, lat in list(linha.coords)] | |
| folium.PolyLine(locations=coords, color=cor, weight=3, opacity=0.8, tooltip=tooltip).add_to(camada_poligonos) | |
| elif geom_type == "Point": | |
| folium.CircleMarker( | |
| location=[float(geom_wgs84.y), float(geom_wgs84.x)], | |
| radius=6, | |
| color=cor, | |
| fill=True, | |
| fill_color=cor, | |
| fill_opacity=0.7, | |
| tooltip=tooltip, | |
| ).add_to(camada_poligonos) | |
| elif geom_type == "MultiPoint": | |
| for ponto in list(getattr(geom_wgs84, "geoms", [])): | |
| folium.CircleMarker( | |
| location=[float(ponto.y), float(ponto.x)], | |
| radius=6, | |
| color=cor, | |
| fill=True, | |
| fill_color=cor, | |
| fill_opacity=0.7, | |
| tooltip=tooltip, | |
| ).add_to(camada_poligonos) | |
| except Exception: | |
| return | |
| if aval_lat is None or aval_lon is None: | |
| return | |
| try: | |
| from shapely.geometry import Point | |
| from shapely.ops import nearest_points | |
| except ImportError: | |
| return | |
| try: | |
| aval_point = Point(float(aval_lon), float(aval_lat)) | |
| _, nearest_geom = nearest_points(aval_point, geom_wgs84) | |
| distancia_km = _to_float_or_none(modelo.get("distancia_km")) | |
| if nearest_geom is None or distancia_km is None or distancia_km <= 0: | |
| return | |
| folium.PolyLine( | |
| locations=[ | |
| [float(aval_point.y), float(aval_point.x)], | |
| [float(nearest_geom.y), float(nearest_geom.x)], | |
| ], | |
| color=cor, | |
| weight=2, | |
| opacity=0.65, | |
| dash_array="6,6", | |
| tooltip=f'Ligacao de distancia • {tooltip}', | |
| ).add_to(camada_distancias) | |
| except Exception: | |
| return | |
| def _extrair_bounds_geometria_wgs84(geometria: dict[str, Any] | None) -> list[list[float]]: | |
| if geometria is None or geometria.get("geom_wgs84") is None: | |
| return [] | |
| try: | |
| min_x, min_y, max_x, max_y = geometria["geom_wgs84"].bounds | |
| except Exception: | |
| return [] | |
| return [[float(min_y), float(min_x)], [float(max_y), float(max_x)]] | |
| def _resolver_via_por_cdlog(cdlog: int) -> dict[str, Any]: | |
| catalogo = _carregar_catalogo_vias() | |
| for item in catalogo: | |
| if int(item["cdlog"]) == int(cdlog): | |
| return item | |
| raise HTTPException(status_code=404, detail="CDLOG informado nao foi encontrado nos eixos") | |
| def _resolver_via_por_logradouro(logradouro: str | None) -> dict[str, Any]: | |
| texto = str(logradouro or "").strip() | |
| if not texto: | |
| raise HTTPException(status_code=400, detail="Informe o logradouro para localizar o avaliando") | |
| consulta = _normalize(texto) | |
| catalogo = _carregar_catalogo_vias() | |
| melhores: list[tuple[int, dict[str, Any]]] = [] | |
| for item in catalogo: | |
| score = _score_logradouro(consulta, item) | |
| if score <= 0: | |
| continue | |
| melhores.append((score, item)) | |
| if not melhores: | |
| raise HTTPException(status_code=404, detail="Nenhum logradouro compativel foi encontrado nos eixos") | |
| melhores.sort(key=lambda entry: (-entry[0], str(entry[1].get("logradouro") or "").lower(), int(entry[1].get("cdlog") or 0))) | |
| melhor_score, melhor_item = melhores[0] | |
| if melhor_score < 60: | |
| sugestoes = ", ".join(str(item.get("logradouro") or "") for _, item in melhores[:5]) | |
| detalhe = "Nao foi possivel identificar com seguranca o logradouro informado." | |
| if sugestoes: | |
| detalhe += f" Sugestoes: {sugestoes}." | |
| raise HTTPException(status_code=400, detail=detalhe) | |
| return melhor_item | |
| def _carregar_catalogo_vias() -> list[dict[str, Any]]: | |
| global _CATALOGO_VIAS_CACHE | |
| if _CATALOGO_VIAS_CACHE is not None: | |
| return list(_CATALOGO_VIAS_CACHE) | |
| registros: list[dict[str, Any]] = [] | |
| try: | |
| registros = load_attribute_records( | |
| resolve_core_path("dados", "EixosLogradouros.shp"), | |
| property_fields=("CDLOG", "NMIDELOG", "NMIDEPRE", "NMIDEABR", "CDIDECAT"), | |
| ) | |
| append_runtime_log(f"[mesa] pesquisa: catalogo leve de logradouros carregado com {len(registros)} registros") | |
| except Exception as exc: | |
| append_runtime_log(f"[mesa] pesquisa: falha no catalogo leve de logradouros: {exc}") | |
| def _catalogo_from_rows(rows: list[dict[str, Any]]) -> list[dict[str, Any]]: | |
| vistos: set[int] = set() | |
| catalogo_local: list[dict[str, Any]] = [] | |
| for row in rows: | |
| cdlog = _to_int_or_none(row.get("CDLOG")) | |
| if cdlog is None or cdlog in vistos: | |
| continue | |
| vistos.add(cdlog) | |
| nome = str(row.get("NMIDELOG") or "").strip() | |
| prefixo = str(row.get("NMIDEPRE") or "").strip() | |
| abreviado = str(row.get("NMIDEABR") or "").strip() | |
| categoria = str(row.get("CDIDECAT") or "").strip() | |
| logradouro, aliases, display_label = _montar_logradouro_catalogo(nome, prefixo, abreviado, categoria) | |
| catalogo_local.append( | |
| { | |
| "cdlog": cdlog, | |
| "logradouro": logradouro, | |
| "display_label": display_label, | |
| "_aliases_norm": [_normalize(item) for item in aliases if _normalize(item)], | |
| } | |
| ) | |
| return catalogo_local | |
| if registros: | |
| catalogo = _catalogo_from_rows(registros) | |
| _CATALOGO_VIAS_CACHE = list(catalogo) | |
| append_runtime_log(f"[mesa] pesquisa: catalogo de logradouros carregado com {len(catalogo)} vias") | |
| return list(catalogo) | |
| gdf = geocodificacao.carregar_eixos() | |
| if gdf is None or gdf.empty: | |
| append_runtime_log("[mesa] pesquisa: base de eixos indisponivel para catalogo de logradouros") | |
| raise HTTPException(status_code=500, detail="Base de eixos indisponivel para localizar o avaliando") | |
| cols = {str(col).upper(): col for col in gdf.columns} | |
| cdlog_col = cols.get("CDLOG") | |
| nome_col = cols.get("NMIDELOG") | |
| prefixo_col = cols.get("NMIDEPRE") | |
| abreviado_col = cols.get("NMIDEABR") | |
| categoria_col = cols.get("CDIDECAT") | |
| if cdlog_col is None or nome_col is None: | |
| append_runtime_log("[mesa] pesquisa: colunas obrigatorias dos eixos nao foram encontradas") | |
| raise HTTPException(status_code=500, detail="Base de eixos sem colunas necessarias para localizar o avaliando") | |
| vistos: set[int] = set() | |
| catalogo: list[dict[str, Any]] = [] | |
| for _, row in gdf.iterrows(): | |
| cdlog = _to_int_or_none(row.get(cdlog_col)) | |
| if cdlog is None or cdlog in vistos: | |
| continue | |
| vistos.add(cdlog) | |
| nome = str(row.get(nome_col) or "").strip() | |
| prefixo = str(row.get(prefixo_col) or "").strip() if prefixo_col else "" | |
| abreviado = str(row.get(abreviado_col) or "").strip() if abreviado_col else "" | |
| categoria = str(row.get(categoria_col) or "").strip() if categoria_col else "" | |
| logradouro, aliases, display_label = _montar_logradouro_catalogo(nome, prefixo, abreviado, categoria) | |
| catalogo.append( | |
| { | |
| "cdlog": cdlog, | |
| "logradouro": logradouro, | |
| "display_label": display_label, | |
| "_aliases_norm": [_normalize(item) for item in aliases if _normalize(item)], | |
| } | |
| ) | |
| _CATALOGO_VIAS_CACHE = list(catalogo) | |
| append_runtime_log(f"[mesa] pesquisa: catalogo de logradouros carregado com {len(catalogo)} vias") | |
| return list(catalogo) | |
| def _score_logradouro(consulta: str, item: dict[str, Any]) -> int: | |
| if not consulta: | |
| return 0 | |
| aliases = [str(alias or "").strip() for alias in item.get("_aliases_norm") or [] if str(alias or "").strip()] | |
| if not aliases: | |
| return 0 | |
| melhor = 0 | |
| consulta_tokens = [tok for tok in consulta.split() if tok] | |
| for alias in aliases: | |
| if alias == consulta: | |
| melhor = max(melhor, 120) | |
| continue | |
| if alias.startswith(consulta): | |
| melhor = max(melhor, 100) | |
| continue | |
| if consulta in alias: | |
| melhor = max(melhor, 82) | |
| if consulta_tokens and all(tok in alias for tok in consulta_tokens): | |
| melhor = max(melhor, 72) | |
| return melhor | |
| def _montar_logradouro_catalogo( | |
| nome: str, | |
| prefixo: str, | |
| abreviado: str, | |
| categoria: str, | |
| ) -> tuple[str, list[str], str]: | |
| nome_limpo = str(nome or "").strip() | |
| prefixo_limpo = str(prefixo or "").strip() | |
| abreviado_limpo = re.sub(r"\s+", " ", str(abreviado or "").strip()) | |
| categoria_limpa = str(categoria or "").strip().upper() | |
| tipo_abrev = TIPO_LOGRADOURO_ABREV.get(categoria_limpa, categoria_limpa) | |
| tipo_expandido = TIPO_LOGRADOURO_EXPANDIDO.get(categoria_limpa, categoria_limpa) | |
| partes_base = [item for item in [tipo_abrev, prefixo_limpo, nome_limpo] if item] | |
| partes_extenso = [item for item in [tipo_expandido, prefixo_limpo, nome_limpo] if item] | |
| logradouro = abreviado_limpo or " ".join(partes_base).strip() or nome_limpo | |
| display_label = abreviado_limpo or logradouro | |
| if nome_limpo and _normalize(nome_limpo) not in {_normalize(display_label), _normalize(logradouro)}: | |
| display_label = f"{display_label} ({nome_limpo})" | |
| aliases = _dedupe_strings( | |
| [ | |
| logradouro, | |
| display_label, | |
| abreviado_limpo, | |
| " ".join(partes_base).strip(), | |
| " ".join(partes_extenso).strip(), | |
| nome_limpo, | |
| ] | |
| ) | |
| return logradouro, aliases, display_label | |
| def _to_int_or_none(value: Any) -> int | None: | |
| if _is_empty(value): | |
| return None | |
| try: | |
| return int(float(str(value).strip())) | |
| except Exception: | |
| return None | |
| def _extrair_bairros(df: pd.DataFrame) -> list[str]: | |
| candidatos = [col for col in df.columns if _has_alias(str(col), BAIRRO_ALIASES)] | |
| bairros: list[str] = [] | |
| vistos = set() | |
| for col in candidatos: | |
| serie = df[col] | |
| for valor in serie.dropna(): | |
| texto = str(valor).strip() | |
| chave = _normalize(texto) | |
| if not texto or not chave or chave in vistos: | |
| continue | |
| vistos.add(chave) | |
| bairros.append(texto) | |
| return sorted(bairros, key=lambda item: item.lower()) | |
| def _extrair_finalidades(df: pd.DataFrame) -> list[str]: | |
| candidatos = [col for col in df.columns if _has_alias(str(col), FINALIDADE_ALIASES)] | |
| finalidades: list[str] = [] | |
| vistos = set() | |
| for col in candidatos: | |
| serie = df[col] | |
| for valor in serie.dropna().head(5000): | |
| texto = str(valor).strip() | |
| chave = _normalize(texto) | |
| if not texto or not chave or chave in vistos: | |
| continue | |
| vistos.add(chave) | |
| finalidades.append(texto) | |
| if len(finalidades) >= 20: | |
| break | |
| if len(finalidades) >= 20: | |
| break | |
| return finalidades | |
| def _extrair_faixa_data_dataframe(df: pd.DataFrame) -> dict[str, Any] | None: | |
| for col in _colunas_data_reais(df): | |
| faixa = _extrair_faixa_data_real_serie(df[col]) | |
| if faixa is not None: | |
| return faixa | |
| return None | |
| def _colunas_data_reais(df: pd.DataFrame | None) -> list[str]: | |
| if df is None or df.empty: | |
| return [] | |
| candidatos = [str(col) for col in df.columns if _has_alias(str(col), DATA_ALIASES)] | |
| return [col for col in candidatos if not _parece_coluna_apenas_ano(col)] | |
| def _parece_coluna_apenas_ano(nome_coluna: str) -> bool: | |
| nome_norm = _normalize(nome_coluna) | |
| if not nome_norm: | |
| return False | |
| tokens_ano = [ | |
| "ano", | |
| "anodado", | |
| "anoconstr", | |
| "anobconstrucao", | |
| "anocconstrucao", | |
| "imoanomaior", | |
| "pmianomaior", | |
| ] | |
| return any(token in nome_norm for token in tokens_ano) | |
| def _extrair_faixa_data_real_serie(serie: pd.Series) -> dict[str, Any] | None: | |
| serie_limpa = serie.dropna() | |
| if serie_limpa.empty: | |
| return None | |
| bruto = serie_limpa.astype(str).str.strip() | |
| bruto = bruto[bruto != ""] | |
| if bruto.empty: | |
| return None | |
| # Rejeita colunas que sejam basicamente ano puro (YYYY), pois nao suportam | |
| # intervalo de datas reais (dia/mes/ano). | |
| proporcao_ano_puro = float(bruto.str.fullmatch(r"\d{4}").mean()) | |
| if proporcao_ano_puro >= 0.8: | |
| return None | |
| if bruto.str.match(r"^\d{4}-\d{2}-\d{2}(?:[T\s].*)?$").all(): | |
| serie_data = pd.to_datetime(bruto.str.slice(0, 10), errors="coerce", format="%Y-%m-%d").dropna() | |
| elif bruto.str.match(r"^\d{4}/\d{2}/\d{2}(?:[T\s].*)?$").all(): | |
| serie_data = pd.to_datetime(bruto.str.slice(0, 10), errors="coerce", format="%Y/%m/%d").dropna() | |
| elif bruto.str.match(r"^\d{2}/\d{2}/\d{4}(?:[T\s].*)?$").all(): | |
| serie_data = pd.to_datetime(bruto.str.slice(0, 10), errors="coerce", format="%d/%m/%Y").dropna() | |
| else: | |
| try: | |
| serie_data = pd.to_datetime(bruto, errors="coerce", dayfirst=True, format="mixed").dropna() | |
| except TypeError: | |
| serie_data = pd.to_datetime(bruto, errors="coerce", dayfirst=True).dropna() | |
| total = len(bruto) | |
| minimo_amostras = min(3, total) | |
| if len(serie_data) < minimo_amostras: | |
| return None | |
| if total > 0 and (len(serie_data) / total) < 0.6: | |
| return None | |
| return { | |
| "min": serie_data.min().date().isoformat(), | |
| "max": serie_data.max().date().isoformat(), | |
| } | |
| def _extrair_faixa_por_alias(estat_df: pd.DataFrame | None, aliases: list[str]) -> dict[str, Any] | None: | |
| if estat_df is None or estat_df.empty: | |
| return None | |
| estat_indexado = estat_df.copy() | |
| if "Variável" in estat_indexado.columns: | |
| estat_indexado = estat_indexado.set_index("Variável") | |
| if estat_indexado.empty: | |
| return None | |
| min_col = _buscar_coluna(estat_indexado.columns, ["minimo", "mínimo", "min"]) | |
| max_col = _buscar_coluna(estat_indexado.columns, ["maximo", "máximo", "max"]) | |
| if min_col is None or max_col is None: | |
| return None | |
| nomes = estat_indexado.index.astype(str).tolist() | |
| if not nomes: | |
| return None | |
| mask_alias = np.fromiter((_has_alias(nome, aliases) for nome in nomes), dtype=bool, count=len(nomes)) | |
| if not mask_alias.any(): | |
| return None | |
| trabalho = estat_indexado.loc[mask_alias, [min_col, max_col]] | |
| mins = [valor for valor in trabalho[min_col].tolist() if not _is_empty(valor)] | |
| maxs = [valor for valor in trabalho[max_col].tolist() if not _is_empty(valor)] | |
| if not mins and not maxs: | |
| return None | |
| return { | |
| "min": _min_value(mins), | |
| "max": _max_value(maxs), | |
| } | |
| def _resumo_variaveis(estat_df: pd.DataFrame | None, limite: int = 12) -> list[dict[str, Any]]: | |
| if estat_df is None or estat_df.empty: | |
| return [] | |
| trabalho = estat_df.copy() | |
| if "Variável" in trabalho.columns: | |
| trabalho = trabalho.set_index("Variável") | |
| if trabalho.empty: | |
| return [] | |
| min_col = _buscar_coluna(trabalho.columns, ["minimo", "mínimo", "min"]) | |
| max_col = _buscar_coluna(trabalho.columns, ["maximo", "máximo", "max"]) | |
| if min_col is None or max_col is None: | |
| return [] | |
| linhas: list[dict[str, Any]] = [] | |
| nomes = trabalho.index.astype(str).tolist() | |
| mins = trabalho[min_col].tolist() | |
| maxs = trabalho[max_col].tolist() | |
| for var, min_val, max_val in zip(nomes, mins, maxs): | |
| if _is_empty(min_val) and _is_empty(max_val): | |
| continue | |
| linhas.append( | |
| { | |
| "variavel": var, | |
| "min": sanitize_value(min_val), | |
| "max": sanitize_value(max_val), | |
| } | |
| ) | |
| if len(linhas) >= limite: | |
| break | |
| return linhas | |
| def _coletar_colunas_para_catalogo(estat_df: pd.DataFrame | None, df: pd.DataFrame | None) -> list[str]: | |
| nomes: list[str] = [] | |
| if estat_df is not None and not estat_df.empty: | |
| if "Variável" in estat_df.columns: | |
| nomes.extend([str(v) for v in estat_df["Variável"].dropna().tolist()]) | |
| else: | |
| nomes.extend([str(v) for v in estat_df.index.tolist()]) | |
| if df is not None and not df.empty: | |
| nomes.extend([str(v) for v in df.columns.tolist()]) | |
| unicos = [] | |
| vistos = set() | |
| for nome in nomes: | |
| chave = _normalize(nome) | |
| if not chave or chave in vistos: | |
| continue | |
| vistos.add(chave) | |
| unicos.append(nome) | |
| return unicos | |
| def _mapear_compatibilidade(colunas: list[str]) -> dict[str, list[str]]: | |
| out: dict[str, list[str]] = {} | |
| for chave, aliases in COMPATIBILIDADE_MAP.items(): | |
| encontrados = [col for col in colunas if _has_alias(col, aliases)] | |
| out[chave] = encontrados | |
| return out | |
| def _tem_colunas_mapa(df: pd.DataFrame | None) -> bool: | |
| if df is None or df.empty: | |
| return False | |
| nomes = [str(col) for col in df.columns] | |
| tem_lat = any(_has_alias(col, LAT_ALIASES) for col in nomes) | |
| tem_lon = any(_has_alias(col, LON_ALIASES) for col in nomes) | |
| return tem_lat and tem_lon | |
| def _aceita_filtros(modelo: dict[str, Any], filtros: PesquisaFiltros, fontes_admin: dict[str, list[str]] | None = None) -> bool: | |
| fontes_admin = fontes_admin or {} | |
| if filtros.nome and not _contains_any([modelo.get("nome_modelo"), modelo.get("arquivo")], filtros.nome): | |
| return False | |
| if filtros.autor and not _contains_any([modelo.get("autor")], filtros.autor): | |
| return False | |
| app_flag = _normalizar_contem_app(filtros.contem_app) | |
| if app_flag is not None and _modelo_contem_variavel(modelo, APP_ALIASES) != app_flag: | |
| return False | |
| if filtros.tipo_modelo and not _contains_any([_tipo_modelo_modelo(modelo)], filtros.tipo_modelo): | |
| return False | |
| negociacao_modelo = _normalizar_negociacao_modelo(filtros.negociacao_modelo) | |
| if negociacao_modelo and _negociacao_modelo_modelo(modelo) != negociacao_modelo: | |
| return False | |
| if filtros.finalidade and not _aceita_texto_com_colunas(modelo, filtros.finalidade, "finalidade", fontes_admin.get("finalidade")): | |
| return False | |
| if filtros.endereco and not _contains_any([modelo.get("endereco_referencia"), ", ".join(modelo.get("bairros") or [])], filtros.endereco): | |
| return False | |
| termos_bairro = _extrair_termos_bairro(filtros) | |
| if termos_bairro: | |
| for termo in termos_bairro: | |
| if not _aceita_texto_com_colunas(modelo, termo, "bairros", fontes_admin.get("bairros")): | |
| return False | |
| if not _aceita_range_com_colunas(modelo, "area", fontes_admin.get("area"), filtros.area_min, filtros.area_max): | |
| return False | |
| if not _aceita_range_com_colunas(modelo, "rh", fontes_admin.get("rh"), filtros.rh_min, filtros.rh_max): | |
| return False | |
| data_min = filtros.data_min | |
| data_max = filtros.data_max | |
| if _is_provided(filtros.aval_data) and not _is_provided(data_min) and not _is_provided(data_max): | |
| data_min = filtros.aval_data | |
| data_max = filtros.aval_data | |
| if not _aceita_range_contido_com_colunas(modelo, "data", fontes_admin.get("data"), data_min, data_max): | |
| return False | |
| return True | |
| def _normalizar_contem_app(value: str | None) -> bool | None: | |
| chave = _normalize(value or "") | |
| if not chave: | |
| return None | |
| if chave in {"sim", "true", "1", "com", "contem"}: | |
| return True | |
| if chave in {"nao", "false", "0", "sem"}: | |
| return False | |
| return None | |
| def _normalizar_negociacao_modelo(value: str | None) -> str | None: | |
| chave = _normalize(value or "") | |
| if not chave: | |
| return None | |
| if chave in {"aluguel", "locacao", "a"}: | |
| return "aluguel" | |
| if chave in {"venda", "v"}: | |
| return "venda" | |
| return None | |
| def _normalizar_otica(value: str | None) -> str: | |
| return "avaliando" | |
| def _anexar_avaliando_info( | |
| modelo: dict[str, Any], | |
| filtros: PesquisaFiltros, | |
| fontes_admin: dict[str, list[str]] | None = None, | |
| ) -> dict[str, Any]: | |
| fontes_admin = fontes_admin or {} | |
| item = dict(modelo) | |
| checks: list[dict[str, Any]] = [] | |
| rejeicoes: list[str] = [] | |
| def registrar(campo: str, informado: Any, aceito: bool, detalhe: str) -> None: | |
| check = {"campo": campo, "informado": sanitize_value(informado), "aceito": bool(aceito), "detalhe": detalhe} | |
| checks.append(check) | |
| if _is_provided(informado) and not aceito: | |
| rejeicoes.append(f"{campo}: {detalhe}") | |
| finalidade_info = filtros.aval_finalidade | |
| if _is_provided(finalidade_info): | |
| termos_finalidade = _split_terms(str(finalidade_info)) | |
| aceito = any( | |
| _aceita_texto_com_colunas(item, termo, "aval_finalidade", fontes_admin.get("aval_finalidade")) | |
| for termo in termos_finalidade | |
| ) if termos_finalidade else False | |
| detalhe = "nenhuma das finalidades informadas foi encontrada no modelo" if len(termos_finalidade) > 1 else "nao encontrada no modelo" | |
| registrar("finalidade", finalidade_info, aceito, detalhe) | |
| bairro_info = filtros.aval_bairro | |
| if _is_provided(bairro_info): | |
| termos_bairro = _split_terms(str(bairro_info)) | |
| aceito = any( | |
| _aceita_texto_com_colunas(item, termo, "aval_bairro", fontes_admin.get("aval_bairro")) | |
| for termo in termos_bairro | |
| ) if termos_bairro else False | |
| detalhe = "nenhum dos bairros informados esta na cobertura do modelo" if len(termos_bairro) > 1 else "bairro fora da cobertura do modelo" | |
| registrar("bairro", bairro_info, aceito, detalhe) | |
| zona_info = filtros.aval_zona | |
| if _is_provided(zona_info): | |
| zonas_informadas = _extrair_termos_zona(str(zona_info)) | |
| zonas_modelo = _zonas_avaliacao_modelo(item) | |
| zonas_modelo_norm = {_normalize(zona) for zona in zonas_modelo} | |
| aceito = any(_normalize(zona) in zonas_modelo_norm for zona in zonas_informadas) if zonas_informadas else False | |
| detalhe = ( | |
| "nenhuma das zonas informadas foi encontrada no modelo" | |
| if len(zonas_informadas) > 1 | |
| else "zona fora da cobertura do modelo" | |
| ) | |
| registrar("zona_avaliacao", zona_info, aceito, detalhe) | |
| endereco_info = filtros.aval_endereco | |
| if _is_provided(endereco_info): | |
| candidatos = [item.get("endereco_referencia"), ", ".join(item.get("bairros") or [])] | |
| aceito = _contains_any(candidatos, str(endereco_info)) | |
| registrar("endereco", endereco_info, aceito, "sem correspondencia textual no modelo") | |
| data_min = filtros.data_min | |
| data_max = filtros.data_max | |
| if _is_provided(filtros.aval_data) and not _is_provided(data_min) and not _is_provided(data_max): | |
| data_min = filtros.aval_data | |
| data_max = filtros.aval_data | |
| faixa_data_ref = _faixa_resumo_com_colunas(item, "data", fontes_admin.get("data")) | |
| registrar( | |
| "data", | |
| _periodo_texto_informado(data_min, data_max), | |
| _aceita_range_contido_com_colunas(item, "data", fontes_admin.get("data"), data_min, data_max), | |
| f"fora da faixa {formatar_faixa(faixa_data_ref)}", | |
| ) | |
| faixa_rh_ref = _faixa_resumo_com_colunas(item, "aval_rh", fontes_admin.get("aval_rh")) | |
| registrar( | |
| "rh", | |
| filtros.aval_rh, | |
| _aceita_valor_com_colunas(item, "aval_rh", fontes_admin.get("aval_rh"), filtros.aval_rh), | |
| f"fora da faixa {formatar_faixa(faixa_rh_ref)}", | |
| ) | |
| faixa_area_ref = _faixa_resumo_com_colunas(item, "aval_area", fontes_admin.get("aval_area")) | |
| registrar( | |
| "area", | |
| filtros.aval_area, | |
| _aceita_valor_com_colunas(item, "aval_area", fontes_admin.get("aval_area"), filtros.aval_area), | |
| f"fora da faixa {formatar_faixa(faixa_area_ref)}", | |
| ) | |
| faixa_area_priv = _faixa_resumo_com_colunas(item, "aval_area_privativa", fontes_admin.get("aval_area_privativa")) | |
| faixa_area_total = _faixa_resumo_com_colunas(item, "aval_area_total", fontes_admin.get("aval_area_total")) | |
| faixa_valor_unit = _faixa_resumo_com_colunas(item, "aval_valor_unitario", fontes_admin.get("aval_valor_unitario")) | |
| faixa_valor_tot = _faixa_resumo_com_colunas(item, "aval_valor_total", fontes_admin.get("aval_valor_total")) | |
| registrar( | |
| "area_privativa", | |
| filtros.aval_area_privativa, | |
| _aceita_valor_com_colunas(item, "aval_area_privativa", fontes_admin.get("aval_area_privativa"), filtros.aval_area_privativa), | |
| f"fora da faixa {formatar_faixa(faixa_area_priv)}", | |
| ) | |
| registrar( | |
| "area_total", | |
| filtros.aval_area_total, | |
| _aceita_valor_com_colunas(item, "aval_area_total", fontes_admin.get("aval_area_total"), filtros.aval_area_total), | |
| f"fora da faixa {formatar_faixa(faixa_area_total)}", | |
| ) | |
| registrar( | |
| "valor_unitario", | |
| filtros.aval_valor_unitario, | |
| _aceita_valor_com_colunas(item, "aval_valor_unitario", fontes_admin.get("aval_valor_unitario"), filtros.aval_valor_unitario), | |
| f"fora da faixa {formatar_faixa(faixa_valor_unit)}", | |
| ) | |
| registrar( | |
| "valor_total", | |
| filtros.aval_valor_total, | |
| _aceita_valor_com_colunas(item, "aval_valor_total", fontes_admin.get("aval_valor_total"), filtros.aval_valor_total), | |
| f"fora da faixa {formatar_faixa(faixa_valor_tot)}", | |
| ) | |
| checks_informados = [check for check in checks if _is_provided(check.get("informado"))] | |
| aceito = all(check.get("aceito") for check in checks_informados) if checks_informados else True | |
| item["avaliando"] = { | |
| "aceito": bool(aceito), | |
| "checks": checks, | |
| "motivos_rejeicao": rejeicoes, | |
| "campos_informados": len(checks_informados), | |
| } | |
| return item | |
| def _extrair_sugestoes( | |
| modelos: list[dict[str, Any]], | |
| fontes_admin: dict[str, list[str]] | None = None, | |
| limite: int = 200, | |
| incluir_logradouros_eixos: bool = True, | |
| ) -> dict[str, list[str]]: | |
| fontes_admin = fontes_admin or {} | |
| nomes: list[str] = [] | |
| autores: list[str] = [] | |
| finalidades: list[str] = [] | |
| bairros: list[str] = [] | |
| enderecos: list[str] = [] | |
| logradouros_eixos: list[str] = [] | |
| tipos_modelo: list[str] = [] | |
| zonas_avaliacao: list[str] = [] | |
| fontes_finalidade = _dedupe_strings((fontes_admin.get("finalidade") or []) + (fontes_admin.get("aval_finalidade") or [])) | |
| fontes_bairro = _dedupe_strings((fontes_admin.get("bairros") or []) + (fontes_admin.get("aval_bairro") or [])) | |
| for modelo in modelos: | |
| nomes.extend(_nomes_modelo_sugestao(modelo)) | |
| autores.append(str(modelo.get("autor") or "")) | |
| if fontes_finalidade: | |
| finalidades.extend(_valores_para_fontes(modelo, fontes_finalidade)) | |
| else: | |
| finalidades.append(str(modelo.get("finalidade") or "")) | |
| finalidades.extend([str(item) for item in (modelo.get("finalidades") or [])]) | |
| if fontes_bairro: | |
| bairros.extend(_valores_para_fontes(modelo, fontes_bairro)) | |
| else: | |
| bairros.extend([str(item) for item in (modelo.get("bairros") or [])]) | |
| enderecos.append(str(modelo.get("endereco_referencia") or "")) | |
| tipos_modelo.append(str(_tipo_modelo_modelo(modelo) or "")) | |
| zonas_avaliacao.extend(_zonas_avaliacao_modelo(modelo)) | |
| if incluir_logradouros_eixos: | |
| try: | |
| logradouros_eixos = [ | |
| str(item.get("logradouro") or "").strip() | |
| for item in _carregar_catalogo_vias() | |
| ] | |
| except HTTPException: | |
| logradouros_eixos = [] | |
| return { | |
| "nomes_modelo": _lista_textos_unicos(nomes, limite), | |
| "autores": _lista_textos_unicos(autores, limite), | |
| "finalidades": _lista_textos_unicos(finalidades, limite), | |
| "bairros": _lista_textos_unicos(bairros, limite), | |
| "enderecos": _lista_textos_unicos(enderecos, limite), | |
| "logradouros_eixos": _lista_textos_unicos(logradouros_eixos, None), | |
| "tipos_modelo": _lista_textos_unicos(tipos_modelo, limite), | |
| "zonas_avaliacao": _lista_zonas_unicas(zonas_avaliacao, limite), | |
| } | |
| def listar_logradouros_eixos(limite: int | None = None) -> dict[str, Any]: | |
| try: | |
| opcoes: list[dict[str, str]] = [] | |
| vistos: set[str] = set() | |
| for item in _carregar_catalogo_vias(): | |
| value = str(item.get("logradouro") or "").strip() | |
| if not value or value in vistos: | |
| continue | |
| vistos.add(value) | |
| label = str(item.get("display_label") or value).strip() or value | |
| opcoes.append({"value": value, "label": label}) | |
| except HTTPException: | |
| opcoes = [] | |
| opcoes.sort(key=lambda item: (str(item.get("label") or "").casefold(), str(item.get("value") or "").casefold())) | |
| if limite is not None and limite > 0: | |
| opcoes = opcoes[:limite] | |
| return sanitize_value( | |
| { | |
| "logradouros_eixos": opcoes, | |
| "total_logradouros": len(opcoes), | |
| } | |
| ) | |
| def _nomes_modelo_sugestao(modelo: dict[str, Any]) -> list[str]: | |
| nomes: list[str] = [] | |
| vistos = set() | |
| for candidato in [modelo.get("nome_modelo"), modelo.get("arquivo")]: | |
| texto = _texto_nome_modelo_sugestao(candidato) | |
| if not texto: | |
| continue | |
| chave = _normalize(texto) | |
| if not chave or chave in vistos: | |
| continue | |
| vistos.add(chave) | |
| nomes.append(texto) | |
| return nomes | |
| def _texto_nome_modelo_sugestao(value: Any) -> str | None: | |
| texto = _str_or_none(value) | |
| if not texto: | |
| return None | |
| if texto.lower().endswith(".dai"): | |
| texto = texto[:-4].strip() | |
| return texto or None | |
| def _modelo_publico(modelo: dict[str, Any]) -> dict[str, Any]: | |
| return {chave: valor for chave, valor in modelo.items() if not str(chave).startswith("_")} | |
| def _montar_config_colunas_filtro(modelos: list[dict[str, Any]]) -> dict[str, Any]: | |
| config: dict[str, Any] = {} | |
| campos = list(CAMPO_TEXTO_META_FONTES.keys()) + [campo for campo in CAMPO_FAIXA_META_FONTES.keys() if campo not in CAMPO_TEXTO_META_FONTES] | |
| for campo in campos: | |
| fontes_campo = set() | |
| fontes_padrao = set() | |
| for modelo in modelos: | |
| if campo in CAMPO_TEXTO_META_FONTES: | |
| fontes_campo.update(_fontes_texto_disponiveis(modelo, campo)) | |
| fontes_padrao.update(_fontes_texto_padrao(modelo, campo)) | |
| else: | |
| fontes_campo.update(_fontes_faixa_disponiveis(modelo, campo)) | |
| fontes_padrao.update(_fontes_faixa_padrao(modelo, campo)) | |
| fontes = sorted(fontes_campo, key=lambda item: _rotulo_fonte(item).lower()) | |
| padrao = [item for item in sorted(fontes_padrao, key=lambda item: _rotulo_fonte(item).lower()) if item in set(fontes)] | |
| if not padrao: | |
| padrao = list(fontes) | |
| config[campo] = { | |
| "disponiveis": [{"id": fonte, "label": _rotulo_fonte(fonte)} for fonte in fontes], | |
| "padrao": padrao, | |
| } | |
| return config | |
| def _carregar_fontes_admin(colunas_filtro: dict[str, Any]) -> dict[str, list[str]]: | |
| with _ADMIN_CONFIG_LOCK: | |
| raw = dict(_ADMIN_FONTES_SESSION or {}) | |
| return _normalizar_fontes_admin(raw, colunas_filtro) | |
| def _salvar_fontes_admin_sessao(fontes_admin: dict[str, list[str]]) -> None: | |
| payload = {campo: _dedupe_strings(valores) for campo, valores in (fontes_admin or {}).items()} | |
| global _ADMIN_FONTES_SESSION | |
| with _ADMIN_CONFIG_LOCK: | |
| _ADMIN_FONTES_SESSION = payload | |
| def _normalizar_fontes_admin(raw: dict[str, Any], colunas_filtro: dict[str, Any]) -> dict[str, list[str]]: | |
| saida: dict[str, list[str]] = {} | |
| for campo, config in (colunas_filtro or {}).items(): | |
| disponiveis = [str(item.get("id")) for item in (config.get("disponiveis") or []) if isinstance(item, dict) and item.get("id")] | |
| padrao = [str(item) for item in (config.get("padrao") or []) if str(item).strip()] | |
| escolhidas_raw = raw.get(campo) if isinstance(raw, dict) else None | |
| escolhidas = [str(item) for item in (escolhidas_raw or []) if str(item).strip()] | |
| set_disponiveis = set(disponiveis) | |
| escolhidas_validas = [item for item in _dedupe_strings(escolhidas) if item in set_disponiveis] | |
| if not escolhidas_validas: | |
| escolhidas_validas = [item for item in _dedupe_strings(padrao) if item in set_disponiveis] | |
| if not escolhidas_validas and disponiveis: | |
| escolhidas_validas = [disponiveis[0]] | |
| saida[campo] = escolhidas_validas | |
| return saida | |
| def _rotulo_fonte(fonte: str) -> str: | |
| if fonte.startswith("col:"): | |
| return fonte[4:] | |
| if fonte.startswith("var:"): | |
| return f"Variavel: {fonte[4:]}" | |
| return FONTE_META_LABELS.get(fonte, fonte) | |
| def _fontes_texto_disponiveis(modelo: dict[str, Any], campo: str) -> list[str]: | |
| aliases = CAMPO_TEXTO_ALIASES_COLUNA.get(campo, []) | |
| indice = modelo.get("_texto_colunas_index") or {} | |
| if not isinstance(indice, dict): | |
| return [] | |
| fontes: list[str] = [] | |
| for col in indice.keys(): | |
| col_text = str(col) | |
| if _has_alias(col_text, aliases): | |
| fontes.append(f"col:{col_text}") | |
| return _dedupe_strings(fontes) | |
| def _fontes_texto_padrao(modelo: dict[str, Any], campo: str) -> list[str]: | |
| return _fontes_texto_disponiveis(modelo, campo) | |
| def _fontes_faixa_disponiveis(modelo: dict[str, Any], campo: str) -> list[str]: | |
| fontes: list[str] = [] | |
| for fonte_meta in CAMPO_FAIXA_META_FONTES.get(campo, []): | |
| faixa_meta = _faixa_meta(modelo, fonte_meta) | |
| if isinstance(faixa_meta, dict) and not (_is_empty(faixa_meta.get("min")) and _is_empty(faixa_meta.get("max"))): | |
| fontes.append(fonte_meta) | |
| aliases_coluna = CAMPO_FAIXA_ALIASES_COLUNA.get(campo, []) | |
| indice_colunas = modelo.get("_faixa_colunas_index") or {} | |
| if isinstance(indice_colunas, dict): | |
| for col in indice_colunas.keys(): | |
| col_text = str(col) | |
| if _has_alias(col_text, aliases_coluna): | |
| fontes.append(f"col:{col_text}") | |
| permite_variaveis = campo in CAMPO_FAIXA_ALIASES_VARIAVEL | |
| indice_variaveis = modelo.get("_faixa_variaveis_index") or {} | |
| if permite_variaveis and isinstance(indice_variaveis, dict): | |
| for var in indice_variaveis.keys(): | |
| fontes.append(f"var:{str(var)}") | |
| return _dedupe_strings(fontes) | |
| def _fontes_faixa_padrao(modelo: dict[str, Any], campo: str) -> list[str]: | |
| fontes: list[str] = [] | |
| for fonte_meta in CAMPO_FAIXA_META_FONTES.get(campo, []): | |
| faixa_meta = _faixa_meta(modelo, fonte_meta) | |
| if isinstance(faixa_meta, dict) and not (_is_empty(faixa_meta.get("min")) and _is_empty(faixa_meta.get("max"))): | |
| fontes.append(fonte_meta) | |
| aliases_coluna = CAMPO_FAIXA_ALIASES_COLUNA.get(campo, []) | |
| indice_colunas = modelo.get("_faixa_colunas_index") or {} | |
| if isinstance(indice_colunas, dict): | |
| for col in indice_colunas.keys(): | |
| col_text = str(col) | |
| if _has_alias(col_text, aliases_coluna): | |
| fontes.append(f"col:{col_text}") | |
| aliases_variavel = CAMPO_FAIXA_ALIASES_VARIAVEL.get(campo, []) | |
| indice_variaveis = modelo.get("_faixa_variaveis_index") or {} | |
| if isinstance(indice_variaveis, dict): | |
| for var in indice_variaveis.keys(): | |
| var_text = str(var) | |
| if _has_alias_exato(var_text, aliases_variavel): | |
| fontes.append(f"var:{var_text}") | |
| return _dedupe_strings(fontes) | |
| def _dedupe_strings(values: list[str]) -> list[str]: | |
| out: list[str] = [] | |
| seen = set() | |
| for value in values: | |
| text = str(value).strip() | |
| if not text or text in seen: | |
| continue | |
| seen.add(text) | |
| out.append(text) | |
| return out | |
| def _aceita_texto_com_colunas(modelo: dict[str, Any], consulta: str, campo: str, fontes_selecionadas: list[str] | None) -> bool: | |
| fontes = _resolver_fontes_campo(modelo, campo, fontes_selecionadas) | |
| candidatos = _valores_para_fontes(modelo, fontes) | |
| return _contains_any(candidatos, consulta) | |
| def _aceita_range_com_colunas( | |
| modelo: dict[str, Any], | |
| campo: str, | |
| fontes_selecionadas: list[str] | None, | |
| filtro_min: Any, | |
| filtro_max: Any, | |
| ) -> bool: | |
| if filtro_min is None and filtro_max is None: | |
| return True | |
| faixas = _faixas_para_campo(modelo, campo, fontes_selecionadas) | |
| if not faixas: | |
| return False | |
| return any(_range_overlaps(faixa, filtro_min, filtro_max) for faixa in faixas) | |
| def _aceita_range_contido_com_colunas( | |
| modelo: dict[str, Any], | |
| campo: str, | |
| fontes_selecionadas: list[str] | None, | |
| filtro_min: Any, | |
| filtro_max: Any, | |
| ) -> bool: | |
| if filtro_min is None and filtro_max is None: | |
| return True | |
| faixas = _faixas_para_campo(modelo, campo, fontes_selecionadas) | |
| if not faixas: | |
| return False | |
| return any(_range_contains(faixa, filtro_min, filtro_max) for faixa in faixas) | |
| def _aceita_valor_com_colunas( | |
| modelo: dict[str, Any], | |
| campo: str, | |
| fontes_selecionadas: list[str] | None, | |
| valor: Any, | |
| ) -> bool: | |
| if not _is_provided(valor): | |
| return True | |
| return _aceita_range_com_colunas(modelo, campo, fontes_selecionadas, valor, valor) | |
| def _faixa_resumo_com_colunas(modelo: dict[str, Any], campo: str, fontes_selecionadas: list[str] | None) -> dict[str, Any] | None: | |
| faixas = _faixas_para_campo(modelo, campo, fontes_selecionadas) | |
| return _combinar_faixas(faixas) | |
| def _faixas_para_campo(modelo: dict[str, Any], campo: str, fontes_selecionadas: list[str] | None) -> list[dict[str, Any]]: | |
| fontes = _resolver_fontes_faixa(modelo, campo, fontes_selecionadas) | |
| return _faixas_para_fontes(modelo, fontes) | |
| def _resolver_fontes_campo(modelo: dict[str, Any], campo: str, fontes_selecionadas: list[str] | None) -> list[str]: | |
| disponiveis = _fontes_texto_disponiveis(modelo, campo) | |
| selecionadas = _dedupe_strings([str(item) for item in (fontes_selecionadas or [])]) | |
| if selecionadas: | |
| set_disponiveis = set(disponiveis) | |
| return [item for item in selecionadas if item in set_disponiveis] | |
| return disponiveis | |
| def _resolver_fontes_faixa(modelo: dict[str, Any], campo: str, fontes_selecionadas: list[str] | None) -> list[str]: | |
| disponiveis = _fontes_faixa_disponiveis(modelo, campo) | |
| selecionadas = _dedupe_strings([str(item) for item in (fontes_selecionadas or [])]) | |
| if selecionadas: | |
| set_disponiveis = set(disponiveis) | |
| return [item for item in selecionadas if item in set_disponiveis] | |
| return disponiveis | |
| def _valores_para_fontes(modelo: dict[str, Any], fontes: list[str]) -> list[str]: | |
| candidatos: list[str] = [] | |
| indice_colunas = modelo.get("_texto_colunas_index") or {} | |
| if not isinstance(indice_colunas, dict): | |
| indice_colunas = {} | |
| for fonte in fontes: | |
| if fonte.startswith("meta:"): | |
| candidatos.extend(_valores_meta(modelo, fonte)) | |
| continue | |
| if fonte.startswith("col:"): | |
| col = fonte[4:] | |
| valores = indice_colunas.get(col) or [] | |
| candidatos.extend([str(item) for item in valores if _str_or_none(item)]) | |
| return candidatos | |
| def _valores_meta(modelo: dict[str, Any], fonte: str) -> list[str]: | |
| if fonte == "meta:nome_modelo": | |
| return [str(modelo.get("nome_modelo") or "")] | |
| if fonte == "meta:arquivo": | |
| return [str(modelo.get("arquivo") or "")] | |
| if fonte == "meta:autor": | |
| return [str(modelo.get("autor") or "")] | |
| if fonte == "meta:finalidade": | |
| return [str(modelo.get("finalidade") or "")] | |
| if fonte == "meta:tipo_imovel": | |
| return [str(modelo.get("tipo_imovel") or "")] | |
| if fonte == "meta:finalidades": | |
| return [str(item) for item in (modelo.get("finalidades") or [])] | |
| if fonte == "meta:bairros": | |
| return [str(item) for item in (modelo.get("bairros") or [])] | |
| if fonte == "meta:endereco_referencia": | |
| return [str(modelo.get("endereco_referencia") or "")] | |
| return [] | |
| def _faixas_para_fontes(modelo: dict[str, Any], fontes: list[str]) -> list[dict[str, Any]]: | |
| candidatos: list[dict[str, Any]] = [] | |
| indice_colunas = modelo.get("_faixa_colunas_index") or {} | |
| if not isinstance(indice_colunas, dict): | |
| indice_colunas = {} | |
| indice_variaveis = modelo.get("_faixa_variaveis_index") or {} | |
| if not isinstance(indice_variaveis, dict): | |
| indice_variaveis = {} | |
| for fonte in fontes: | |
| faixa: dict[str, Any] | None = None | |
| if fonte.startswith("meta:"): | |
| faixa = _faixa_meta(modelo, fonte) | |
| elif fonte.startswith("col:"): | |
| faixa = indice_colunas.get(fonte[4:]) | |
| elif fonte.startswith("var:"): | |
| faixa = indice_variaveis.get(fonte[4:]) | |
| if not isinstance(faixa, dict): | |
| continue | |
| if _is_empty(faixa.get("min")) and _is_empty(faixa.get("max")): | |
| continue | |
| candidatos.append(faixa) | |
| return candidatos | |
| def _faixa_meta(modelo: dict[str, Any], fonte: str) -> dict[str, Any] | None: | |
| if fonte == "meta:faixa_data": | |
| return modelo.get("faixa_data") | |
| if fonte == "meta:faixa_area": | |
| return modelo.get("faixa_area") | |
| if fonte == "meta:faixa_rh": | |
| return modelo.get("faixa_rh") | |
| faixas_por_campo = modelo.get("faixas_por_campo") or {} | |
| if not isinstance(faixas_por_campo, dict): | |
| faixas_por_campo = {} | |
| if fonte == "meta:faixa_area_privativa": | |
| return faixas_por_campo.get("area_privativa") | |
| if fonte == "meta:faixa_area_total": | |
| return faixas_por_campo.get("area_total") | |
| if fonte == "meta:faixa_valor_unitario": | |
| return faixas_por_campo.get("valor_unitario") | |
| if fonte == "meta:faixa_valor_total": | |
| return faixas_por_campo.get("valor_total") | |
| return None | |
| def _combinar_faixas(faixas: list[dict[str, Any]]) -> dict[str, Any] | None: | |
| kind: str | None = None | |
| min_vals: list[Any] = [] | |
| max_vals: list[Any] = [] | |
| for faixa in faixas: | |
| cmp_min = _to_comparable(faixa.get("min")) | |
| cmp_max = _to_comparable(faixa.get("max")) | |
| faixa_kind = cmp_min[0] if cmp_min is not None else (cmp_max[0] if cmp_max is not None else None) | |
| if faixa_kind is None: | |
| continue | |
| if kind is None: | |
| kind = faixa_kind | |
| if faixa_kind != kind: | |
| continue | |
| if cmp_min is not None: | |
| min_vals.append(cmp_min[1]) | |
| if cmp_max is not None: | |
| max_vals.append(cmp_max[1]) | |
| if not min_vals and not max_vals: | |
| return None | |
| minimo = min(min_vals) if min_vals else None | |
| maximo = max(max_vals) if max_vals else None | |
| return { | |
| "min": _formatar_limite_faixa(minimo), | |
| "max": _formatar_limite_faixa(maximo), | |
| } | |
| def _formatar_limite_faixa(valor: Any) -> Any: | |
| if valor is None: | |
| return None | |
| if isinstance(valor, datetime): | |
| return valor.date().isoformat() | |
| return sanitize_value(valor) | |
| def _indexar_texto_colunas(df_modelo: pd.DataFrame | None) -> dict[str, list[str]]: | |
| if df_modelo is None or df_modelo.empty: | |
| return {} | |
| indice: dict[str, list[str]] = {} | |
| colunas = [str(col) for col in df_modelo.columns[:MAX_COLUNAS_INDEXADAS]] | |
| base = df_modelo[colunas].head(MAX_LINHAS_INDEXACAO) | |
| for coluna in colunas: | |
| serie = base[coluna] | |
| valores: list[str] = [] | |
| vistos = set() | |
| for valor in serie.dropna().tolist(): | |
| texto = _str_or_none(valor) | |
| if texto is None and isinstance(valor, (int, float)): | |
| texto = str(valor) | |
| if texto is None: | |
| continue | |
| chave = _normalize(texto) | |
| if not chave or chave in vistos: | |
| continue | |
| vistos.add(chave) | |
| valores.append(texto) | |
| if len(valores) >= MAX_VALORES_INDEXADOS_POR_COLUNA: | |
| break | |
| if valores: | |
| indice[coluna] = valores | |
| return indice | |
| def _indexar_faixas_colunas(df_modelo: pd.DataFrame | None) -> dict[str, dict[str, Any]]: | |
| if df_modelo is None or df_modelo.empty: | |
| return {} | |
| indice: dict[str, dict[str, Any]] = {} | |
| colunas = _colunas_data_reais(df_modelo)[:MAX_COLUNAS_INDEXADAS] | |
| if not colunas: | |
| return indice | |
| base = df_modelo[colunas].head(MAX_LINHAS_INDEXACAO) | |
| for coluna in colunas: | |
| faixa = _extrair_faixa_data_real_serie(base[coluna]) | |
| if faixa is not None: | |
| indice[coluna] = faixa | |
| return indice | |
| def _indexar_faixas_variaveis(estat_df: pd.DataFrame | None, variaveis_modelo: list[str] | None) -> dict[str, dict[str, Any]]: | |
| if estat_df is None or estat_df.empty: | |
| return {} | |
| trabalho = estat_df.copy() | |
| if "Variável" in trabalho.columns: | |
| trabalho = trabalho.set_index("Variável") | |
| if trabalho.empty: | |
| return {} | |
| min_col = _buscar_coluna(trabalho.columns, ["minimo", "mínimo", "min"]) | |
| max_col = _buscar_coluna(trabalho.columns, ["maximo", "máximo", "max"]) | |
| if min_col is None or max_col is None: | |
| return {} | |
| variaveis_norm = {_normalize(item) for item in (variaveis_modelo or []) if _str_or_none(item)} | |
| indice: dict[str, dict[str, Any]] = {} | |
| nomes = trabalho.index.astype(str).tolist() | |
| mins = trabalho[min_col].tolist() | |
| maxs = trabalho[max_col].tolist() | |
| for nome, min_bruto, max_bruto in zip(nomes, mins, maxs): | |
| if variaveis_norm and _normalize(nome) not in variaveis_norm: | |
| continue | |
| min_val = sanitize_value(min_bruto) | |
| max_val = sanitize_value(max_bruto) | |
| if _is_empty(min_val) and _is_empty(max_val): | |
| continue | |
| indice[nome] = { | |
| "min": None if _is_empty(min_val) else min_val, | |
| "max": None if _is_empty(max_val) else max_val, | |
| } | |
| return indice | |
| def _lista_textos_unicos(valores: list[str], limite: int | None) -> list[str]: | |
| unicos: list[str] = [] | |
| vistos = set() | |
| for valor in valores: | |
| texto = _str_or_none(valor) | |
| if not texto: | |
| continue | |
| chave = _normalize(texto) | |
| if not chave or chave in vistos: | |
| continue | |
| vistos.add(chave) | |
| unicos.append(texto) | |
| ordenados = sorted(unicos, key=lambda item: item.lower()) | |
| if limite is None or limite <= 0: | |
| return ordenados | |
| return ordenados[:limite] | |
| def _aceita_valor_na_faixa(faixa: dict[str, Any] | None, valor: Any) -> bool: | |
| if not _is_provided(valor): | |
| return True | |
| return _range_overlaps(faixa, valor, valor) | |
| def formatar_faixa(faixa: dict[str, Any] | None) -> str: | |
| if not faixa: | |
| return "nao disponivel" | |
| minimo = faixa.get("min") | |
| maximo = faixa.get("max") | |
| if _is_empty(minimo) and _is_empty(maximo): | |
| return "nao disponivel" | |
| if not _is_empty(minimo) and not _is_empty(maximo): | |
| return f"{minimo} a {maximo}" | |
| if not _is_empty(minimo): | |
| return f"a partir de {minimo}" | |
| return f"ate {maximo}" | |
| def _periodo_texto_informado(data_min: Any, data_max: Any) -> str | None: | |
| inicio = _str_or_none(data_min) | |
| fim = _str_or_none(data_max) | |
| if not inicio and not fim: | |
| return None | |
| if inicio and fim: | |
| return f"{inicio} a {fim}" | |
| if inicio: | |
| return f"a partir de {inicio}" | |
| return f"ate {fim}" | |
| def _tipo_modelo_modelo(modelo: dict[str, Any]) -> str | None: | |
| tipo = _str_or_none(modelo.get("tipo_imovel")) | |
| if tipo: | |
| return tipo | |
| nome_referencia = _str_or_none(modelo.get("nome_modelo")) or _str_or_none(modelo.get("arquivo")) or "" | |
| return _inferir_tipo_por_nome(nome_referencia) | |
| def _negociacao_modelo_modelo(modelo: dict[str, Any]) -> str | None: | |
| nome_referencia = _str_or_none(modelo.get("arquivo")) or _str_or_none(modelo.get("nome_modelo")) or "" | |
| nome_upper = nome_referencia.upper() | |
| if re.search(r"(^|_)A(_|$)", nome_upper): | |
| return "aluguel" | |
| if re.search(r"(^|_)V(_|$)", nome_upper): | |
| return "venda" | |
| return None | |
| def _normalizar_token_zona(value: Any) -> str | None: | |
| texto = _str_or_none(value) | |
| if not texto: | |
| return None | |
| match = re.search(r"Z?\s*0*(\d+)", texto.upper()) | |
| if not match: | |
| return None | |
| numero = int(match.group(1)) | |
| if numero <= 0: | |
| return None | |
| return f"Z{numero}" | |
| def _extrair_termos_zona(texto: str) -> list[str]: | |
| zonas = [] | |
| vistos = set() | |
| for termo in _split_terms(texto): | |
| zona = _normalizar_token_zona(termo) | |
| if not zona: | |
| continue | |
| chave = _normalize(zona) | |
| if chave in vistos: | |
| continue | |
| vistos.add(chave) | |
| zonas.append(zona) | |
| return zonas | |
| def _zonas_avaliacao_modelo(modelo: dict[str, Any]) -> list[str]: | |
| nomes_ref = [ | |
| _str_or_none(modelo.get("arquivo")) or "", | |
| _str_or_none(modelo.get("nome_modelo")) or "", | |
| ] | |
| zonas = [] | |
| vistos = set() | |
| for nome in nomes_ref: | |
| nome_upper = nome.upper() | |
| for match in re.finditer(r"(?:^|_)Z(\d+)(?=_|$)", nome_upper): | |
| zona = _normalizar_token_zona(f"Z{match.group(1)}") | |
| if not zona: | |
| continue | |
| chave = _normalize(zona) | |
| if chave in vistos: | |
| continue | |
| vistos.add(chave) | |
| zonas.append(zona) | |
| return sorted(zonas, key=lambda item: int(re.search(r"\d+", item).group(0))) | |
| def _lista_zonas_unicas(valores: list[str], limite: int = 200) -> list[str]: | |
| unicas = [] | |
| vistos = set() | |
| for valor in valores: | |
| zona = _normalizar_token_zona(valor) | |
| if not zona: | |
| continue | |
| chave = _normalize(zona) | |
| if not chave or chave in vistos: | |
| continue | |
| vistos.add(chave) | |
| unicas.append(zona) | |
| if len(unicas) >= limite: | |
| break | |
| return sorted(unicas, key=lambda item: int(re.search(r"\d+", item).group(0))) | |
| def _extrair_termos_bairro(filtros: PesquisaFiltros) -> list[str]: | |
| termos: list[str] = [] | |
| if filtros.bairro: | |
| termos.extend(_split_terms(filtros.bairro)) | |
| if filtros.bairros: | |
| for entrada in filtros.bairros: | |
| termos.extend(_split_terms(entrada)) | |
| limpos = [] | |
| vistos = set() | |
| for termo in termos: | |
| chave = _normalize(termo) | |
| if not chave or chave in vistos: | |
| continue | |
| vistos.add(chave) | |
| limpos.append(termo) | |
| return limpos | |
| def _split_terms(texto: str) -> list[str]: | |
| if not texto: | |
| return [] | |
| partes = re.split(r"[,;|]", texto) | |
| return [parte.strip() for parte in partes if parte.strip()] | |
| def _range_overlaps(model_range: dict[str, Any] | None, filtro_min: Any, filtro_max: Any) -> bool: | |
| if filtro_min is None and filtro_max is None: | |
| return True | |
| if not model_range: | |
| return False | |
| model_min_cmp = _to_comparable(model_range.get("min")) | |
| model_max_cmp = _to_comparable(model_range.get("max")) | |
| filtro_min_cmp = _to_comparable(filtro_min) if filtro_min is not None else None | |
| filtro_max_cmp = _to_comparable(filtro_max) if filtro_max is not None else None | |
| if filtro_min_cmp is None and filtro_max_cmp is None: | |
| return True | |
| kinds = { | |
| item[0] | |
| for item in [model_min_cmp, model_max_cmp, filtro_min_cmp, filtro_max_cmp] | |
| if item is not None | |
| } | |
| if len(kinds) != 1: | |
| return False | |
| model_min_val = model_min_cmp[1] if model_min_cmp is not None else None | |
| model_max_val = model_max_cmp[1] if model_max_cmp is not None else None | |
| filtro_min_val = filtro_min_cmp[1] if filtro_min_cmp is not None else None | |
| filtro_max_val = filtro_max_cmp[1] if filtro_max_cmp is not None else None | |
| if model_min_val is None and model_max_val is None: | |
| return False | |
| if model_min_val is None: | |
| model_min_val = model_max_val | |
| if model_max_val is None: | |
| model_max_val = model_min_val | |
| if filtro_min_val is not None and model_max_val < filtro_min_val: | |
| return False | |
| if filtro_max_val is not None and model_min_val > filtro_max_val: | |
| return False | |
| return True | |
| def _range_contains(model_range: dict[str, Any] | None, filtro_min: Any, filtro_max: Any) -> bool: | |
| if filtro_min is None and filtro_max is None: | |
| return True | |
| if not model_range: | |
| return False | |
| model_min_cmp = _to_comparable(model_range.get("min")) | |
| model_max_cmp = _to_comparable(model_range.get("max")) | |
| filtro_min_cmp = _to_comparable(filtro_min) if filtro_min is not None else None | |
| filtro_max_cmp = _to_comparable(filtro_max) if filtro_max is not None else None | |
| if filtro_min_cmp is None and filtro_max_cmp is None: | |
| return True | |
| kinds = { | |
| item[0] | |
| for item in [model_min_cmp, model_max_cmp, filtro_min_cmp, filtro_max_cmp] | |
| if item is not None | |
| } | |
| if len(kinds) != 1: | |
| return False | |
| model_min_val = model_min_cmp[1] if model_min_cmp is not None else None | |
| model_max_val = model_max_cmp[1] if model_max_cmp is not None else None | |
| filtro_min_val = filtro_min_cmp[1] if filtro_min_cmp is not None else None | |
| filtro_max_val = filtro_max_cmp[1] if filtro_max_cmp is not None else None | |
| if model_min_val is None or model_max_val is None: | |
| return False | |
| if filtro_min_val is not None and filtro_max_val is not None and filtro_min_val > filtro_max_val: | |
| filtro_min_val, filtro_max_val = filtro_max_val, filtro_min_val | |
| if filtro_min_val is not None: | |
| if filtro_min_val < model_min_val or filtro_min_val > model_max_val: | |
| return False | |
| if filtro_max_val is not None: | |
| if filtro_max_val < model_min_val or filtro_max_val > model_max_val: | |
| return False | |
| return True | |
| def _to_comparable(value: Any) -> tuple[str, Any] | None: | |
| if value is None: | |
| return None | |
| if isinstance(value, (int, float)): | |
| if isinstance(value, float) and (math.isnan(value) or math.isinf(value)): | |
| return None | |
| return ("num", float(value)) | |
| if isinstance(value, pd.Timestamp): | |
| return ("dt", value.to_pydatetime()) | |
| if isinstance(value, datetime): | |
| return ("dt", value) | |
| if isinstance(value, date): | |
| return ("dt", datetime(value.year, value.month, value.day)) | |
| texto = str(value).strip() | |
| if not texto: | |
| return None | |
| numero = _to_float_or_none(texto) | |
| if numero is not None: | |
| return ("num", float(numero)) | |
| data_val = _parse_datetime(texto) | |
| if data_val is not None: | |
| return ("dt", data_val) | |
| return None | |
| def _parse_datetime(texto: str) -> datetime | None: | |
| for fmt in ("%Y-%m-%d", "%d/%m/%Y", "%Y/%m/%d", "%Y%m%d", "%Y"): | |
| try: | |
| return datetime.strptime(texto, fmt) | |
| except Exception: | |
| continue | |
| return None | |
| def _contains_any(candidatos: list[Any], consulta: str) -> bool: | |
| alvo = _normalize(consulta) | |
| if not alvo: | |
| return True | |
| for item in candidatos: | |
| if item is None: | |
| continue | |
| if alvo in _normalize(str(item)): | |
| return True | |
| return False | |
| def _merge_ranges(preferencial: dict[str, Any] | None, fallback: dict[str, Any] | None) -> dict[str, Any] | None: | |
| if not preferencial and not fallback: | |
| return None | |
| if not preferencial: | |
| return fallback | |
| if not fallback: | |
| return preferencial | |
| min_final = preferencial.get("min") if not _is_empty(preferencial.get("min")) else fallback.get("min") | |
| max_final = preferencial.get("max") if not _is_empty(preferencial.get("max")) else fallback.get("max") | |
| if _is_empty(min_final) and _is_empty(max_final): | |
| return None | |
| return {"min": sanitize_value(min_final), "max": sanitize_value(max_final)} | |
| def _buscar_coluna(colunas: Any, aliases: list[str]) -> Any: | |
| for coluna in colunas: | |
| if _has_alias(str(coluna), aliases): | |
| return coluna | |
| return None | |
| def _has_alias(nome: str, aliases: list[str]) -> bool: | |
| nome_norm = _normalize(nome) | |
| if not nome_norm: | |
| return False | |
| for alias in aliases: | |
| alias_norm = _normalize(alias) | |
| if alias_norm == nome_norm or alias_norm in nome_norm: | |
| return True | |
| return False | |
| def _has_alias_exato(nome: str, aliases: list[str]) -> bool: | |
| nome_norm = _normalize(nome) | |
| if not nome_norm: | |
| return False | |
| return any(_normalize(alias) == nome_norm for alias in aliases) | |
| def _normalize(value: str) -> str: | |
| ascii_text = unicodedata.normalize("NFKD", str(value)).encode("ascii", "ignore").decode("ascii") | |
| ascii_text = ascii_text.lower().strip() | |
| return re.sub(r"[^a-z0-9]+", "", ascii_text) | |
| def _min_value(values: list[Any]) -> Any: | |
| comparaveis = [_to_comparable(v) for v in values] | |
| comparaveis = [c for c in comparaveis if c is not None] | |
| if not comparaveis: | |
| return sanitize_value(values[0]) if values else None | |
| kind = comparaveis[0][0] | |
| valores = [c[1] for c in comparaveis if c[0] == kind] | |
| menor = min(valores) | |
| if isinstance(menor, datetime): | |
| return menor.date().isoformat() | |
| return sanitize_value(menor) | |
| def _max_value(values: list[Any]) -> Any: | |
| comparaveis = [_to_comparable(v) for v in values] | |
| comparaveis = [c for c in comparaveis if c is not None] | |
| if not comparaveis: | |
| return sanitize_value(values[0]) if values else None | |
| kind = comparaveis[0][0] | |
| valores = [c[1] for c in comparaveis if c[0] == kind] | |
| maior = max(valores) | |
| if isinstance(maior, datetime): | |
| return maior.date().isoformat() | |
| return sanitize_value(maior) | |
| def _inferir_finalidade_por_nome(nome_arquivo: str) -> str | None: | |
| nome_upper = nome_arquivo.upper() | |
| if re.search(r"(^|_)A(_|$)", nome_upper) or "ALUG" in nome_upper: | |
| return "Aluguel" | |
| if re.search(r"(^|_)V(_|$)", nome_upper) or "VENDA" in nome_upper: | |
| return "Venda" | |
| return None | |
| def _inferir_tipo_por_nome(nome_arquivo: str) -> str | None: | |
| nome_upper = nome_arquivo.upper() | |
| tokens_ordenados = sorted(TIPO_POR_TOKEN.items(), key=lambda item: len(item[0]), reverse=True) | |
| for token, tipo in tokens_ordenados: | |
| if _contains_tipo_token(nome_upper, token): | |
| return tipo | |
| return None | |
| def _contains_tipo_token(nome_upper: str, token: str) -> bool: | |
| padrao = rf"(^|[^A-Z]){re.escape(token)}([^A-Z]|$)" | |
| return re.search(padrao, nome_upper) is not None | |
| def _to_float_or_none(value: Any) -> float | None: | |
| if value is None: | |
| return None | |
| if isinstance(value, bool): | |
| return None | |
| if isinstance(value, (int, float)): | |
| if isinstance(value, float) and (math.isnan(value) or math.isinf(value)): | |
| return None | |
| return float(value) | |
| texto = str(value).strip() | |
| if not texto: | |
| return None | |
| if "," in texto and "." in texto: | |
| if texto.rfind(",") > texto.rfind("."): | |
| texto = texto.replace(".", "").replace(",", ".") | |
| else: | |
| texto = texto.replace(",", "") | |
| else: | |
| texto = texto.replace(",", ".") | |
| try: | |
| return float(texto) | |
| except Exception: | |
| return None | |
| def _str_or_none(value: Any) -> str | None: | |
| if value is None: | |
| return None | |
| texto = str(value).strip() | |
| return texto or None | |
| def _is_empty(value: Any) -> bool: | |
| if value is None: | |
| return True | |
| if isinstance(value, str) and not value.strip(): | |
| return True | |
| try: | |
| if pd.isna(value): | |
| return True | |
| except Exception: | |
| pass | |
| return False | |
| def _is_provided(value: Any) -> bool: | |
| if value is None: | |
| return False | |
| if isinstance(value, str): | |
| return bool(value.strip()) | |
| return True | |