mesa-react / backend /app /services /pesquisa_service.py
Guilherme Silberfarb Costa
Refine map markers and elaboracao diagnostics
1aafde3
from __future__ import annotations
from branca.element import Element
import json
from html import escape
import math
import re
import unicodedata
from dataclasses import dataclass
from datetime import date, datetime
from pathlib import Path
from threading import Event, Lock
from typing import Any
import folium
from folium import plugins
import numpy as np
import pandas as pd
from fastapi import HTTPException
from joblib import load
from app.core.elaboracao import geocodificacao
from app.core.shapefile_runtime import load_attribute_records
from app.core.elaboracao.core import _migrar_pacote_v1_para_v2, normalizar_observacao_modelo
from app.core.map_layers import (
add_bairros_layer,
add_marker_payloads,
apply_marker_payload_jitter,
build_trabalhos_tecnicos_marker_payloads,
add_zoom_responsive_circle_markers,
)
from app.core.visualizacao.map_payload import build_leaflet_payload
from app.runtime_log import append_runtime_log
from app.runtime_paths import resolve_core_path
from app.services import model_repository, trabalhos_tecnicos_service
from app.services.serializers import sanitize_value
AREA_PRIVATIVA_ALIASES = ["APRIV", "APRIVEQ", "ATPRIV", "ACOPRIV", "AREAPRIV", "AREA_PRIVATIVA", "AREA PRIVATIVA"]
AREA_TOTAL_ALIASES = ["ATTOTAL", "ATOTAL", "ATOT", "AREA_TOTAL", "AREA TOTAL", "AREA"]
AREA_GERAL_ALIASES = AREA_PRIVATIVA_ALIASES + AREA_TOTAL_ALIASES + ["ACONST", "ALOC"]
VALOR_UNITARIO_ALIASES = ["VU", "VUNIT", "VULOC", "VUAPRIV", "VUNIPRIV", "VUACONST", "VALOR_UNITARIO", "VALOR UNITARIO"]
VALOR_TOTAL_ALIASES = ["VLOC", "VTOT", "VTOTAL", "VALOR_TOTAL", "VALOR TOTAL"]
RH_ALIASES = ["RH", "FATOR_RH", "FATOR RH", "RENDA_HABITACIONAL"]
DATA_ALIASES = ["DATA", "DT", "RDATA", "AVALDT", "DT ENC", "DT EST", "DT REG", "DATA_AVALIACAO", "DATA AVALIACAO", "COMPETENCIA"]
BAIRRO_ALIASES = ["BAIRRO", "BAIRROS", "NOME_BAIRRO", "BAIRRO_NOME", "NME BAI", "NME_BAI"]
FINALIDADE_ALIASES = ["NME IMO-FINAL", "NME_IMO_FINAL", "NME IMO FINAL", "FINALIDADE", "TIPO_IMOVEL", "TIPO IMOVEL"]
LAT_ALIASES = ["LAT", "LATITUDE", "SIAT_LATITUDE"]
LON_ALIASES = ["LON", "LONG", "LONGITUDE", "SIAT_LONGITUDE"]
APP_ALIASES = ["% APP", "%APP"]
TIPO_POR_TOKEN = {
"RECOND": "Residencia em condominio",
"RCOMD": "Residencia em condominio",
"TCOND": "Terreno em condominio",
"LCOM": "Loja",
"LOJA": "Loja",
"CCOM": "Casa comercial",
"DEP": "Deposito",
"DEPOS": "Deposito",
"RES": "Residencias isoladas / casas",
"SALA": "Salas comerciais",
"APTO": "Apartamentos",
"APART": "Apartamentos",
"AP": "Apartamentos",
"TERRENO": "Terrenos",
"TER": "Terrenos",
"EDIF": "Edificio",
"CASA": "Residencias isoladas / casas",
"GALPAO": "Galpao",
}
CAMPO_TEXTO_META_FONTES = {
"finalidade": [],
"bairros": [],
"aval_finalidade": [],
"aval_bairro": [],
}
CAMPO_TEXTO_ALIASES_COLUNA = {
"finalidade": FINALIDADE_ALIASES,
"bairros": BAIRRO_ALIASES,
"aval_finalidade": FINALIDADE_ALIASES,
"aval_bairro": BAIRRO_ALIASES,
}
CAMPO_FAIXA_META_FONTES = {
"data": ["meta:faixa_data"],
"area": [],
"rh": [],
"aval_data": [],
"aval_area": [],
"aval_area_privativa": [],
"aval_area_total": [],
"aval_rh": [],
"aval_valor_unitario": [],
"aval_valor_total": [],
}
CAMPO_FAIXA_ALIASES_COLUNA = {}
CAMPO_FAIXA_ALIASES_VARIAVEL = {
"area": AREA_GERAL_ALIASES,
"rh": RH_ALIASES,
"aval_area": AREA_GERAL_ALIASES,
"aval_area_privativa": AREA_PRIVATIVA_ALIASES,
"aval_area_total": AREA_TOTAL_ALIASES,
"aval_rh": RH_ALIASES,
"aval_valor_unitario": VALOR_UNITARIO_ALIASES,
"aval_valor_total": VALOR_TOTAL_ALIASES,
}
FONTE_META_LABELS = {
"meta:nome_modelo": "Metadado: Nome do modelo",
"meta:arquivo": "Metadado: Arquivo",
"meta:autor": "Metadado: Autor/Elaborador",
"meta:finalidade": "Metadado: Finalidade",
"meta:tipo_imovel": "Metadado: Tipo",
"meta:finalidades": "Metadado: Finalidades",
"meta:bairros": "Metadado: Bairros",
"meta:endereco_referencia": "Metadado: Endereco de referencia",
"meta:faixa_data": "Metadado: Faixa de data",
"meta:faixa_area": "Metadado: Faixa de area",
"meta:faixa_rh": "Metadado: Faixa de RH",
"meta:faixa_area_privativa": "Metadado: Faixa de area privativa",
"meta:faixa_area_total": "Metadado: Faixa de area total",
"meta:faixa_valor_unitario": "Metadado: Faixa de valor unitario",
"meta:faixa_valor_total": "Metadado: Faixa de valor total",
}
MAX_COLUNAS_INDEXADAS = 120
MAX_VALORES_INDEXADOS_POR_COLUNA = 140
MAX_LINHAS_INDEXACAO = 5000
COMPATIBILIDADE_MAP = {
"area_privativa": AREA_PRIVATIVA_ALIASES,
"area_total": AREA_TOTAL_ALIASES,
"valor_unitario": VALOR_UNITARIO_ALIASES,
"valor_total": VALOR_TOTAL_ALIASES,
}
RANGE_CAMPOS = {
"area_privativa": AREA_PRIVATIVA_ALIASES,
"area_total": AREA_TOTAL_ALIASES,
"valor_unitario": VALOR_UNITARIO_ALIASES,
"valor_total": VALOR_TOTAL_ALIASES,
}
MAP_COLORS = [
"#1f77b4",
"#2ca02c",
"#ff7f0e",
"#9467bd",
"#17becf",
"#bcbd22",
"#4c78a8",
"#54a24b",
"#72b7b2",
"#f2a541",
"#6c9bd2",
]
DISTANCIA_METRIC_CRS = "EPSG:31982"
GEOMETRIA_CACHE_VERSION = 2
TRABALHOS_TECNICOS_MODELOS_SELECIONADOS = "selecionados"
TRABALHOS_TECNICOS_MODELOS_SELECIONADOS_E_OUTRAS_VERSOES = "selecionados_e_outras_versoes"
TRABALHOS_TECNICOS_MODELOS_SELECIONADOS_E_ANTERIORES = "selecionados_e_anteriores"
TRABALHOS_TECNICOS_PROXIMIDADE_DESATIVADA = "sem_proximidade"
TRABALHOS_TECNICOS_PROXIMIDADE_ATIVADA = "proximos_ao_avaliando"
TRABALHOS_TECNICOS_RAIO_PADRAO_M = 1000
TRABALHOS_TECNICOS_RAIO_MAX_M = 5000
CRITERIO_ESPACIAL_MENOR_DISTANCIA = "menor_distancia"
CRITERIO_ESPACIAL_MEDIA_DISTANCIA = "media_distancia"
CRITERIO_ESPACIAL_MAIOR_DISTANCIA = "maior_distancia"
CRITERIO_ESPACIAL_PADRAO = CRITERIO_ESPACIAL_MAIOR_DISTANCIA
VERSAO_MODELO_RE = re.compile(r"^(?P<base>.*?)(?P<sufixo>[A-Za-z])$")
TIPO_LOGRADOURO_ABREV = {
"AC": "AC",
"AL": "AL",
"AV": "AV",
"BCO": "BCO",
"ESC": "ESC",
"EST": "EST",
"LGO": "LGO",
"PASS": "PASS",
"PCA": "PCA",
"PC": "PCA",
"R": "R",
"ROD": "ROD",
"TRAV": "TRAV",
"TV": "TV",
"VL": "VL",
}
TIPO_LOGRADOURO_EXPANDIDO = {
"AC": "ACESSO",
"AL": "ALAMEDA",
"AV": "AVENIDA",
"BCO": "BECO",
"ESC": "ESCADARIA",
"EST": "ESTRADA",
"LGO": "LARGO",
"PASS": "PASSAGEM",
"PCA": "PRACA",
"PC": "PRACA",
"R": "RUA",
"ROD": "RODOVIA",
"TRAV": "TRAVESSA",
"TV": "TRAVESSA",
"VL": "VILA",
}
@dataclass(frozen=True)
class PesquisaFiltros:
otica: str = "avaliando"
nome: str | None = None
autor: str | None = None
contem_app: str | None = None
tipo_modelo: str | None = None
negociacao_modelo: str | None = None
finalidade: str | None = None
finalidade_colunas: list[str] | None = None
bairro: str | None = None
bairros: list[str] | None = None
bairros_colunas: list[str] | None = None
endereco: str | None = None
data_min: str | None = None
data_colunas: list[str] | None = None
data_max: str | None = None
area_min: float | None = None
area_colunas: list[str] | None = None
area_max: float | None = None
rh_min: float | None = None
rh_colunas: list[str] | None = None
rh_max: float | None = None
aval_finalidade: str | None = None
aval_finalidade_colunas: list[str] | None = None
aval_zona: str | None = None
aval_bairro: str | None = None
aval_bairro_colunas: list[str] | None = None
aval_endereco: str | None = None
aval_data: str | None = None
aval_data_colunas: list[str] | None = None
aval_area: float | None = None
aval_area_colunas: list[str] | None = None
aval_area_privativa: float | None = None
aval_area_privativa_colunas: list[str] | None = None
aval_area_total: float | None = None
aval_area_total_colunas: list[str] | None = None
aval_rh: float | None = None
aval_rh_colunas: list[str] | None = None
aval_valor_unitario: float | None = None
aval_valor_unitario_colunas: list[str] | None = None
aval_valor_total: float | None = None
aval_valor_total_colunas: list[str] | None = None
aval_lat: float | None = None
aval_lon: float | None = None
avaliandos_geo: list[dict[str, Any]] | None = None
somente_versoes_atuais: bool = True
_CACHE_LOCK = Lock()
_CACHE: dict[str, dict[str, Any]] = {}
_ADMIN_CONFIG_LOCK = Lock()
_CACHE_SOURCE_SIGNATURE: str | None = None
_ADMIN_FONTES_SESSION: dict[str, list[str]] = {}
_CATALOGO_VIAS_CACHE: list[dict[str, Any]] | None = None
_FAMILIAS_VERSOES_CACHE: dict[str, list[dict[str, Any]]] | None = None
_FAMILIAS_VERSOES_SIGNATURE: tuple[str, tuple[tuple[str, int, int], ...]] | None = None
_FAMILIAS_VERSOES_INFLIGHT: dict[tuple[str, tuple[tuple[str, int, int], ...]], Event] = {}
def _resolver_repositorio_modelos() -> model_repository.ModelRepositoryResolution:
global _CACHE_SOURCE_SIGNATURE, _FAMILIAS_VERSOES_CACHE, _FAMILIAS_VERSOES_SIGNATURE
resolved = model_repository.resolve_model_repository()
with _CACHE_LOCK:
if _CACHE_SOURCE_SIGNATURE != resolved.signature:
_CACHE.clear()
_CACHE_SOURCE_SIGNATURE = resolved.signature
_FAMILIAS_VERSOES_CACHE = None
_FAMILIAS_VERSOES_SIGNATURE = None
return resolved
def ensure_modelos_dir() -> Path:
return _resolver_repositorio_modelos().modelos_dir
def obter_admin_config_pesquisa() -> dict[str, Any]:
resolved = _resolver_repositorio_modelos()
pasta = resolved.modelos_dir
modelos = sorted(pasta.glob("*.dai"), key=lambda item: item.name.lower())
todos = [_carregar_resumo_com_cache(caminho) for caminho in modelos]
colunas_filtro = _montar_config_colunas_filtro(todos)
admin_fontes = _carregar_fontes_admin(colunas_filtro)
return sanitize_value(
{
"colunas_filtro": colunas_filtro,
"admin_fontes": admin_fontes,
"total_modelos": len(todos),
"fonte_modelos": resolved.as_payload(),
}
)
def salvar_admin_config_pesquisa(campos: dict[str, list[str]] | None) -> dict[str, Any]:
resolved = _resolver_repositorio_modelos()
pasta = resolved.modelos_dir
modelos = sorted(pasta.glob("*.dai"), key=lambda item: item.name.lower())
todos = [_carregar_resumo_com_cache(caminho) for caminho in modelos]
colunas_filtro = _montar_config_colunas_filtro(todos)
admin_fontes = _normalizar_fontes_admin(campos or {}, colunas_filtro)
_salvar_fontes_admin_sessao(admin_fontes)
return sanitize_value(
{
"colunas_filtro": colunas_filtro,
"admin_fontes": admin_fontes,
"total_modelos": len(todos),
"status": "Configuracao de busca aplicada para a sessao atual.",
"fonte_modelos": resolved.as_payload(),
}
)
def listar_modelos(filtros: PesquisaFiltros, limite: int | None = None, somente_contexto: bool = False) -> dict[str, Any]:
resolved = _resolver_repositorio_modelos()
pasta = resolved.modelos_dir
modelos = sorted(pasta.glob("*.dai"), key=lambda item: item.name.lower())
otica = _normalizar_otica(filtros.otica)
filtros_exec = PesquisaFiltros(**{**filtros.__dict__, "otica": otica})
todos = [_carregar_resumo_com_cache(caminho) for caminho in modelos]
colunas_filtro = _montar_config_colunas_filtro(todos)
admin_fontes = _carregar_fontes_admin(colunas_filtro)
sugestoes = _extrair_sugestoes(todos, admin_fontes, incluir_logradouros_eixos=False)
aval_lat, aval_lon = _normalizar_coordenadas_avaliando(filtros_exec.aval_lat, filtros_exec.aval_lon)
avaliandos_geo = _normalizar_avaliandos_geo(filtros_exec.avaliandos_geo)
if (aval_lat is None or aval_lon is None) and len(avaliandos_geo) == 1:
aval_lat, aval_lon = _normalizar_coordenadas_avaliando(avaliandos_geo[0].get("lat"), avaliandos_geo[0].get("lon"))
ids_versoes_antigas = _ids_versoes_antigas(todos) if filtros_exec.somente_versoes_atuais else set()
if somente_contexto:
return sanitize_value(
{
"modelos": [],
"sugestoes": sugestoes,
"colunas_filtro": colunas_filtro,
"total_filtrado": 0,
"total_geral": len(todos),
"modelos_dir": str(pasta),
"fonte_modelos": resolved.as_payload(),
"admin_fontes": admin_fontes,
"filtros_aplicados": {
"nome": filtros.nome,
"autor": filtros.autor,
"contem_app": filtros.contem_app,
"tipo_modelo": filtros.tipo_modelo,
"negociacao_modelo": filtros.negociacao_modelo,
"finalidade": filtros.finalidade,
"finalidade_colunas": filtros.finalidade_colunas or [],
"bairro": filtros.bairro,
"bairros": filtros.bairros or [],
"bairros_colunas": filtros.bairros_colunas or [],
"endereco": filtros.endereco,
"data_min": filtros.data_min,
"data_colunas": filtros.data_colunas or [],
"data_max": filtros.data_max,
"area_min": filtros.area_min,
"area_colunas": filtros.area_colunas or [],
"area_max": filtros.area_max,
"rh_min": filtros.rh_min,
"rh_colunas": filtros.rh_colunas or [],
"rh_max": filtros.rh_max,
"otica": otica,
"aval_finalidade": filtros.aval_finalidade,
"aval_finalidade_colunas": filtros.aval_finalidade_colunas or [],
"aval_zona": filtros.aval_zona,
"aval_bairro": filtros.aval_bairro,
"aval_bairro_colunas": filtros.aval_bairro_colunas or [],
"aval_endereco": filtros.aval_endereco,
"aval_data": filtros.aval_data,
"aval_data_colunas": filtros.aval_data_colunas or [],
"aval_area": filtros.aval_area,
"aval_area_colunas": filtros.aval_area_colunas or [],
"aval_area_privativa": filtros.aval_area_privativa,
"aval_area_privativa_colunas": filtros.aval_area_privativa_colunas or [],
"aval_area_total": filtros.aval_area_total,
"aval_area_total_colunas": filtros.aval_area_total_colunas or [],
"aval_rh": filtros.aval_rh,
"aval_rh_colunas": filtros.aval_rh_colunas or [],
"aval_valor_unitario": filtros.aval_valor_unitario,
"aval_valor_unitario_colunas": filtros.aval_valor_unitario_colunas or [],
"aval_valor_total": filtros.aval_valor_total,
"aval_valor_total_colunas": filtros.aval_valor_total_colunas or [],
"aval_lat": aval_lat,
"aval_lon": aval_lon,
"avaliandos_geo": avaliandos_geo,
"somente_versoes_atuais": bool(filtros_exec.somente_versoes_atuais),
},
}
)
filtrados = [item for item in todos if _aceita_filtros(item, filtros_exec, admin_fontes)]
if otica == "avaliando":
filtrados = [_anexar_avaliando_info(item, filtros_exec, admin_fontes) for item in filtrados]
filtrados = [item for item in filtrados if item.get("avaliando", {}).get("aceito")]
if ids_versoes_antigas:
filtrados = [item for item in filtrados if str(item.get("id") or "") not in ids_versoes_antigas]
if len(avaliandos_geo) > 1:
filtrados = [_anexar_distancias_modelo_multiplos(item, avaliandos_geo) for item in filtrados]
filtrados.sort(
key=lambda item: (
item.get("distancia_resumo", {}).get("principal_distancia_km") is None,
float(item.get("distancia_resumo", {}).get("principal_distancia_km") or 0.0),
str(item.get("nome_modelo") or item.get("arquivo") or item.get("id") or "").lower(),
)
)
elif aval_lat is not None and aval_lon is not None:
filtrados = [_anexar_distancia_modelo(item, aval_lat, aval_lon) for item in filtrados]
filtrados.sort(
key=lambda item: (
item.get("distancia_km") is None,
float(item.get("distancia_km") or 0.0),
str(item.get("nome_modelo") or item.get("arquivo") or item.get("id") or "").lower(),
)
)
if limite and limite > 0:
filtrados = filtrados[:limite]
modelos_publicos = [_modelo_publico(item) for item in filtrados]
return sanitize_value(
{
"modelos": modelos_publicos,
"sugestoes": sugestoes,
"colunas_filtro": colunas_filtro,
"total_filtrado": len(filtrados),
"total_geral": len(todos),
"modelos_dir": str(pasta),
"fonte_modelos": resolved.as_payload(),
"admin_fontes": admin_fontes,
"filtros_aplicados": {
"nome": filtros.nome,
"autor": filtros.autor,
"contem_app": filtros.contem_app,
"tipo_modelo": filtros.tipo_modelo,
"negociacao_modelo": filtros.negociacao_modelo,
"finalidade": filtros.finalidade,
"finalidade_colunas": filtros.finalidade_colunas or [],
"bairro": filtros.bairro,
"bairros": filtros.bairros or [],
"bairros_colunas": filtros.bairros_colunas or [],
"endereco": filtros.endereco,
"data_min": filtros.data_min,
"data_colunas": filtros.data_colunas or [],
"data_max": filtros.data_max,
"area_min": filtros.area_min,
"area_colunas": filtros.area_colunas or [],
"area_max": filtros.area_max,
"rh_min": filtros.rh_min,
"rh_colunas": filtros.rh_colunas or [],
"rh_max": filtros.rh_max,
"otica": otica,
"aval_finalidade": filtros.aval_finalidade,
"aval_finalidade_colunas": filtros.aval_finalidade_colunas or [],
"aval_zona": filtros.aval_zona,
"aval_bairro": filtros.aval_bairro,
"aval_bairro_colunas": filtros.aval_bairro_colunas or [],
"aval_endereco": filtros.aval_endereco,
"aval_data": filtros.aval_data,
"aval_data_colunas": filtros.aval_data_colunas or [],
"aval_area": filtros.aval_area,
"aval_area_colunas": filtros.aval_area_colunas or [],
"aval_area_privativa": filtros.aval_area_privativa,
"aval_area_privativa_colunas": filtros.aval_area_privativa_colunas or [],
"aval_area_total": filtros.aval_area_total,
"aval_area_total_colunas": filtros.aval_area_total_colunas or [],
"aval_rh": filtros.aval_rh,
"aval_rh_colunas": filtros.aval_rh_colunas or [],
"aval_valor_unitario": filtros.aval_valor_unitario,
"aval_valor_unitario_colunas": filtros.aval_valor_unitario_colunas or [],
"aval_valor_total": filtros.aval_valor_total,
"aval_valor_total_colunas": filtros.aval_valor_total_colunas or [],
"aval_lat": aval_lat,
"aval_lon": aval_lon,
"avaliandos_geo": avaliandos_geo,
"somente_versoes_atuais": bool(filtros_exec.somente_versoes_atuais),
},
}
)
def resolver_localizacao_avaliando(
*,
latitude: float | None = None,
longitude: float | None = None,
logradouro: str | None = None,
numero: Any = None,
cdlog: Any = None,
) -> dict[str, Any]:
lat, lon = _normalizar_coordenadas_avaliando(latitude, longitude)
if lat is not None and lon is not None:
return sanitize_value(
{
"lat": lat,
"lon": lon,
"origem": "coordenadas",
"status": f"Coordenadas do avaliando definidas em {lat:.6f}, {lon:.6f}.",
}
)
numero_int = _to_int_or_none(numero)
if numero_int is None or numero_int <= 0:
raise HTTPException(status_code=400, detail="Informe um numero valido para localizar o avaliando")
cdlog_int = _to_int_or_none(cdlog)
via_resolvida = None
if cdlog_int is None:
via_resolvida = _resolver_via_por_logradouro(logradouro)
cdlog_int = via_resolvida["cdlog"]
else:
via_resolvida = _resolver_via_por_cdlog(cdlog_int)
df_base = pd.DataFrame([{"CDLOG": cdlog_int, "NUMERO": numero_int}])
df_geo, df_falhas, ajustados = geocodificacao.geocodificar(df_base, "CDLOG", "NUMERO", auto_200=False)
lat = _to_float_or_none(df_geo.iloc[0].get("lat")) if not df_geo.empty else None
lon = _to_float_or_none(df_geo.iloc[0].get("lon")) if not df_geo.empty else None
lat, lon = _normalizar_coordenadas_avaliando(lat, lon)
if lat is None or lon is None:
sugestoes = ""
motivo = "Nao foi possivel localizar o endereco informado nos eixos."
if not df_falhas.empty:
linha_falha = df_falhas.iloc[0]
sugestoes = str(linha_falha.get("sugestoes") or "").strip()
motivo = str(linha_falha.get("motivo") or motivo).strip() or motivo
detalhe = motivo
if sugestoes:
detalhe = f"{motivo} Sugestoes de numeracao: {sugestoes}."
raise HTTPException(status_code=400, detail=detalhe)
numero_usado = numero_int
if ajustados:
try:
numero_usado = int(ajustados[0].get("numero_usado") or numero_int)
except Exception:
numero_usado = numero_int
logradouro_resolvido = str(via_resolvida.get("logradouro") or "").strip()
status = f"Endereco localizado em {logradouro_resolvido}, {numero_usado}."
if numero_usado != numero_int:
status += f" Numero interpolado mais proximo: {numero_usado}."
return sanitize_value(
{
"lat": lat,
"lon": lon,
"origem": "eixos",
"cdlog": cdlog_int,
"logradouro": logradouro_resolvido,
"numero_informado": numero_int,
"numero_usado": numero_usado,
"status": status,
}
)
def _ids_versoes_antigas(modelos: list[dict[str, Any]]) -> set[str]:
grupos: dict[str, list[tuple[str, str]]] = {}
for modelo in modelos:
versao_info = _extrair_info_versao_modelo(modelo)
if versao_info is None:
continue
base_norm, sufixo, modelo_id = versao_info
grupos.setdefault(base_norm, []).append((sufixo, modelo_id))
ids_antigos: set[str] = set()
for itens in grupos.values():
if len(itens) < 2:
continue
sufixos_distintos = {sufixo for sufixo, _modelo_id in itens}
if len(sufixos_distintos) < 2:
continue
ultimo_sufixo = max(sufixos_distintos)
for sufixo, modelo_id in itens:
if sufixo != ultimo_sufixo:
ids_antigos.add(modelo_id)
return ids_antigos
def _extrair_info_versao_modelo(modelo: dict[str, Any]) -> tuple[str, str, str] | None:
modelo_id = _str_or_none(modelo.get("id"))
if not modelo_id:
return None
candidatos = [
_texto_nome_modelo_sugestao(modelo.get("nome_modelo")),
_texto_nome_modelo_sugestao(modelo.get("arquivo")),
_texto_nome_modelo_sugestao(modelo.get("id")),
]
for candidato in candidatos:
info = _extrair_info_familia_modelo(candidato)
if info is None:
continue
base_norm, _base_texto, sufixo = info
if not sufixo:
continue
return (base_norm, sufixo, modelo_id)
return None
def _extrair_info_familia_modelo(value: Any) -> tuple[str, str, str] | None:
texto = _texto_nome_modelo_sugestao(value)
if not texto:
return None
match = VERSAO_MODELO_RE.match(texto)
if match:
base_texto = str(match.group("base") or "").strip()
sufixo = str(match.group("sufixo") or "").upper()
base_norm = _normalize(base_texto)
if base_norm and base_texto:
return (base_norm, base_texto, sufixo)
base_norm = _normalize(texto)
if not base_norm:
return None
return (base_norm, texto, "")
def _montar_familias_versoes_modelos(caminhos_modelo: list[Path]) -> dict[str, list[dict[str, Any]]]:
familias: dict[str, list[dict[str, Any]]] = {}
for caminho in caminhos_modelo:
info = _extrair_info_versao_modelo(
{
"id": caminho.stem,
"arquivo": caminho.name,
"nome_modelo": caminho.stem,
}
)
if info is None:
continue
base_norm, sufixo, modelo_id = info
familias.setdefault(base_norm, []).append(
{
"sufixo": sufixo,
"id": modelo_id,
"arquivo": caminho.name,
"nome_modelo": modelo_id,
}
)
for itens in familias.values():
itens.sort(key=lambda item: str(item.get("sufixo") or ""))
return familias
def _assinatura_modelos_familias(caminhos_modelo: list[Path]) -> tuple[tuple[str, int, int], ...]:
assinatura: list[tuple[str, int, int]] = []
for caminho in sorted(caminhos_modelo, key=lambda item: item.name.lower()):
try:
stat = caminho.stat()
except OSError:
continue
assinatura.append((caminho.name, int(stat.st_mtime_ns), int(stat.st_size)))
return tuple(assinatura)
def obter_familias_versoes_modelos_cache(caminhos_modelo: list[Path] | None = None) -> dict[str, list[dict[str, Any]]]:
global _FAMILIAS_VERSOES_CACHE, _FAMILIAS_VERSOES_SIGNATURE
resolved = _resolver_repositorio_modelos()
caminhos = list(caminhos_modelo) if caminhos_modelo is not None else list(resolved.modelos_dir.glob("*.dai"))
assinatura = (resolved.signature, _assinatura_modelos_familias(caminhos))
while True:
with _CACHE_LOCK:
if _FAMILIAS_VERSOES_SIGNATURE == assinatura and isinstance(_FAMILIAS_VERSOES_CACHE, dict):
return {
chave: [dict(item) for item in itens]
for chave, itens in _FAMILIAS_VERSOES_CACHE.items()
}
inflight = _FAMILIAS_VERSOES_INFLIGHT.get(assinatura)
if inflight is None:
inflight = Event()
_FAMILIAS_VERSOES_INFLIGHT[assinatura] = inflight
is_builder = True
else:
is_builder = False
if is_builder:
try:
familias = _montar_familias_versoes_modelos(caminhos)
with _CACHE_LOCK:
_FAMILIAS_VERSOES_SIGNATURE = assinatura
_FAMILIAS_VERSOES_CACHE = {
chave: [dict(item) for item in itens]
for chave, itens in familias.items()
}
return {
chave: [dict(item) for item in itens]
for chave, itens in familias.items()
}
finally:
with _CACHE_LOCK:
waiter = _FAMILIAS_VERSOES_INFLIGHT.pop(assinatura, None)
if waiter is not None:
waiter.set()
inflight.wait()
def _agrupar_nomes_modelo_por_familia(values: list[Any]) -> dict[str, list[str]]:
familias: dict[str, list[str]] = {}
for value in values:
texto = _texto_nome_modelo_sugestao(value)
if not texto:
continue
info = _extrair_info_familia_modelo(texto)
if info is None:
continue
base_norm, _base_texto, _sufixo = info
_append_aliases_unicos(familias.setdefault(base_norm, []), texto)
return familias
def _append_aliases_unicos(destino: list[str], *values: Any) -> list[str]:
vistos = {str(item or "").strip().casefold() for item in destino if str(item or "").strip()}
for value in values:
texto = str(value or "").strip()
chave = texto.casefold()
if not texto or chave in vistos:
continue
vistos.add(chave)
destino.append(texto)
return destino
def _resolver_aliases_modelo_para_trabalhos_tecnicos(
*,
modelo_id: str,
caminho: Path,
nome_modelo: str,
incluir_outras_versoes: bool,
familias_versoes: dict[str, list[dict[str, Any]]] | None = None,
familias_trabalhos_tecnicos: dict[str, list[str]] | None = None,
) -> list[str]:
aliases: list[str] = []
referencias = [modelo_id, caminho.name, nome_modelo]
_append_aliases_unicos(aliases, *referencias)
if not incluir_outras_versoes:
return aliases
familias_selecionadas = _agrupar_nomes_modelo_por_familia(referencias)
for nomes_familia in familias_selecionadas.values():
for nome_familia in nomes_familia:
info_familia = _extrair_info_familia_modelo(nome_familia)
if info_familia is None:
continue
_append_aliases_unicos(aliases, info_familia[1], f"{info_familia[1]}.dai")
for base_norm in familias_selecionadas:
for item in (familias_versoes or {}).get(base_norm, []):
_append_aliases_unicos(
aliases,
item.get("id"),
item.get("arquivo"),
item.get("nome_modelo"),
)
_append_aliases_unicos(aliases, *((familias_trabalhos_tecnicos or {}).get(base_norm) or []))
return aliases
def _descricao_modo_trabalhos_tecnicos_modelos(value: str) -> str:
if value == TRABALHOS_TECNICOS_MODELOS_SELECIONADOS_E_OUTRAS_VERSOES:
return "dos modelos selecionados e de outras versões do mesmo modelo"
return "dos modelos selecionados"
def _modo_trabalhos_tecnicos_inclui_outras_versoes(value: str) -> bool:
return value == TRABALHOS_TECNICOS_MODELOS_SELECIONADOS_E_OUTRAS_VERSOES
def _normalizar_familias_trabalhos_tecnicos_modelos() -> dict[str, list[str]]:
return _agrupar_nomes_modelo_por_familia(trabalhos_tecnicos_service.listar_nomes_modelos_relacionados())
def _chave_unica_trabalho_tecnico(item: dict[str, Any]) -> tuple[str, str, str, str, str, str]:
return (
str(item.get("trabalho_id") or "").strip(),
str(item.get("coord_lat") or "").strip(),
str(item.get("coord_lon") or "").strip(),
str(item.get("label") or "").strip(),
str(item.get("endereco") or "").strip(),
str(item.get("numero") or "").strip(),
)
def _listar_avaliandos_tecnicos_proximos_por_avaliandos(
avaliandos_geo: list[dict[str, Any]] | None,
raio_m: int,
chaves_modelos_selecionados: list[str] | None = None,
) -> list[dict[str, Any]]:
agregados: dict[tuple[str, str, str, str, str, str], dict[str, Any]] = {}
for idx, avaliando in enumerate(avaliandos_geo or []):
lat, lon = _normalizar_coordenadas_avaliando(avaliando.get("lat"), avaliando.get("lon"))
if lat is None or lon is None:
continue
label = str(avaliando.get("label") or f"A{idx + 1}").strip() or f"A{idx + 1}"
avaliando_id = str(avaliando.get("id") or label or f"avaliando-{idx + 1}").strip() or f"avaliando-{idx + 1}"
proximos = trabalhos_tecnicos_service.listar_avaliandos_proximos(
lat,
lon,
raio_m,
chaves_modelo_excluir=chaves_modelos_selecionados,
)
for item in proximos or []:
chave = _chave_unica_trabalho_tecnico(item)
distancia_m = _to_float_or_none(item.get("distancia_m"))
distancia_label = str(item.get("distancia_label") or "").strip() or None
proximidade_avaliando = {
"id": avaliando_id,
"label": label,
"distancia_m": distancia_m,
"distancia_label": distancia_label,
}
atual = agregados.get(chave)
if atual is None:
novo_item = dict(item)
novo_item["avaliandos_proximos"] = [proximidade_avaliando]
novo_item["distancia_m_min"] = distancia_m
novo_item["distancia_label_min"] = distancia_label
agregados[chave] = novo_item
continue
proximidades_existentes = atual.setdefault("avaliandos_proximos", [])
if not any(str(entry.get("id") or "").strip() == avaliando_id for entry in proximidades_existentes):
proximidades_existentes.append(proximidade_avaliando)
distancia_atual = _to_float_or_none(atual.get("distancia_m_min"))
if distancia_m is not None and (distancia_atual is None or distancia_m < distancia_atual):
atual["distancia_m_min"] = distancia_m
atual["distancia_label_min"] = distancia_label
consolidado = list(agregados.values())
for item in consolidado:
proximidades = item.get("avaliandos_proximos") if isinstance(item.get("avaliandos_proximos"), list) else []
proximidades.sort(
key=lambda entry: (
str(entry.get("label") or "").casefold(),
float(entry.get("distancia_m") or 0.0),
)
)
item["avaliandos_proximos"] = proximidades
consolidado.sort(
key=lambda item: (
float(item.get("distancia_m_min") or 0.0),
str(item.get("trabalho_nome") or "").casefold(),
str(item.get("label") or item.get("endereco") or "").casefold(),
)
)
return sanitize_value(consolidado)
def _consolidar_trabalhos_tecnicos_modelos(modelos_plotados: list[dict[str, Any]]) -> list[dict[str, Any]]:
agregados: dict[tuple[str, str, str, str, str, str], dict[str, Any]] = {}
for modelo in modelos_plotados or []:
modelo_id = str(modelo.get("id") or "").strip()
for item in (modelo.get("avaliandos_tecnicos") or []):
if not isinstance(item, dict):
continue
chave = _chave_unica_trabalho_tecnico(item)
atual = agregados.get(chave)
if atual is None:
novo_item = dict(item)
novo_item["modelos_relacionados"] = [str(valor).strip() for valor in (item.get("modelos_relacionados") or []) if str(valor).strip()]
novo_item["modelos_origem_ids"] = [modelo_id] if modelo_id else []
agregados[chave] = novo_item
continue
_append_aliases_unicos(
atual.setdefault("modelos_relacionados", []),
*[str(valor).strip() for valor in (item.get("modelos_relacionados") or []) if str(valor).strip()],
)
if modelo_id:
_append_aliases_unicos(atual.setdefault("modelos_origem_ids", []), modelo_id)
for campo in ("trabalho_nome", "tipo_label", "label", "endereco", "numero"):
if not str(atual.get(campo) or "").strip() and str(item.get(campo) or "").strip():
atual[campo] = item.get(campo)
consolidado = list(agregados.values())
consolidado.sort(
key=lambda item: (
str(item.get("trabalho_nome") or "").casefold(),
str(item.get("label") or item.get("endereco") or "").casefold(),
str(item.get("numero") or "").casefold(),
)
)
return sanitize_value(consolidado)
def gerar_mapa_modelos(
modelos_ids: list[str],
limite_pontos_por_modelo: int = 0,
avaliando_lat: float | None = None,
avaliando_lon: float | None = None,
avaliandos: list[dict[str, Any]] | None = None,
modo_exibicao: str | None = "pontos",
criterio_espacial: str | None = CRITERIO_ESPACIAL_PADRAO,
trabalhos_tecnicos_modelos_modo: str | None = TRABALHOS_TECNICOS_MODELOS_SELECIONADOS,
trabalhos_tecnicos_proximidade_modo: str | None = TRABALHOS_TECNICOS_PROXIMIDADE_DESATIVADA,
trabalhos_tecnicos_raio_m: int | float | None = TRABALHOS_TECNICOS_RAIO_PADRAO_M,
) -> dict[str, Any]:
ids = [str(item).strip() for item in (modelos_ids or []) if str(item).strip()]
if not ids:
raise HTTPException(status_code=400, detail="Selecione ao menos um modelo para gerar o mapa")
pasta = ensure_modelos_dir()
caminhos_por_id = {caminho.stem: caminho for caminho in pasta.glob("*.dai")}
selecionados: list[tuple[str, Path]] = []
vistos = set()
for modelo_id in ids:
if modelo_id in vistos:
continue
caminho = caminhos_por_id.get(modelo_id)
if caminho is None:
continue
vistos.add(modelo_id)
selecionados.append((modelo_id, caminho))
if not selecionados:
raise HTTPException(status_code=404, detail="Nenhum modelo selecionado foi encontrado na pasta de pesquisa")
modo_exibicao_norm = _normalizar_modo_exibicao_mapa(modo_exibicao)
trabalhos_tecnicos_modelos_modo_norm = _normalizar_modo_trabalhos_tecnicos_modelos_mapa(trabalhos_tecnicos_modelos_modo)
trabalhos_tecnicos_proximidade_modo_norm = _normalizar_modo_trabalhos_tecnicos_proximidade_mapa(
trabalhos_tecnicos_proximidade_modo
)
trabalhos_tecnicos_raio_m_norm = _normalizar_raio_trabalhos_tecnicos_mapa(trabalhos_tecnicos_raio_m)
criterio_espacial_norm = _normalizar_criterio_espacial(criterio_espacial)
modelos_plotados: list[dict[str, Any]] = []
bounds: list[list[float]] = []
aval_lat, aval_lon = _normalizar_coordenadas_avaliando(avaliando_lat, avaliando_lon)
avaliandos_geo = _normalizar_avaliandos_geo(avaliandos)
if not avaliandos_geo and aval_lat is not None and aval_lon is not None:
avaliandos_geo = [{"id": "avaliando", "label": "Avaliando", "lat": aval_lat, "lon": aval_lon}]
avaliando_unico = avaliandos_geo[0] if len(avaliandos_geo) == 1 else None
if avaliando_unico is not None:
aval_lat, aval_lon = _normalizar_coordenadas_avaliando(avaliando_unico.get("lat"), avaliando_unico.get("lon"))
else:
aval_lat, aval_lon = None, None
chaves_modelos_selecionados: list[str] = []
familias_versoes = (
_montar_familias_versoes_modelos(list(caminhos_por_id.values()))
if _modo_trabalhos_tecnicos_inclui_outras_versoes(trabalhos_tecnicos_modelos_modo_norm)
else {}
)
familias_trabalhos_tecnicos = (
_normalizar_familias_trabalhos_tecnicos_modelos()
if _modo_trabalhos_tecnicos_inclui_outras_versoes(trabalhos_tecnicos_modelos_modo_norm)
else {}
)
for avaliando in avaliandos_geo:
lat_item, lon_item = _normalizar_coordenadas_avaliando(avaliando.get("lat"), avaliando.get("lon"))
if lat_item is None or lon_item is None:
continue
bounds.append([lat_item, lon_item])
for idx, (modelo_id, caminho) in enumerate(selecionados):
resumo = _carregar_resumo_com_cache(caminho)
df = _carregar_dataframe_modelo(caminho)
if df is None or df.empty:
continue
pontos = _coletar_pontos_modelo(df, limite_pontos_por_modelo)
if not pontos:
continue
geometria = _carregar_geometria_modelo_com_cache(caminho)
cor = MAP_COLORS[idx % len(MAP_COLORS)]
nome = str(resumo.get("nome_modelo") or modelo_id)
distancias_avaliandos, distancia_resumo = _calcular_distancias_avaliandos(
geometria,
avaliandos_geo,
criterio_espacial_norm,
)
distancia_info = (
{
"distancia_km": distancia_resumo.get("principal_distancia_km"),
"distancia_label": distancia_resumo.get("principal_distancia_label"),
}
if len(avaliandos_geo) > 1
else _calcular_distancia_geometria_cache(geometria, aval_lat, aval_lon)
)
aliases_modelo = _resolver_aliases_modelo_para_trabalhos_tecnicos(
modelo_id=modelo_id,
caminho=caminho,
nome_modelo=nome,
incluir_outras_versoes=_modo_trabalhos_tecnicos_inclui_outras_versoes(trabalhos_tecnicos_modelos_modo_norm),
familias_versoes=familias_versoes,
familias_trabalhos_tecnicos=familias_trabalhos_tecnicos,
)
_append_aliases_unicos(chaves_modelos_selecionados, *aliases_modelo)
avaliandos_tecnicos = trabalhos_tecnicos_service.listar_avaliandos_por_modelo(aliases_modelo)
modelos_plotados.append(
{
"id": modelo_id,
"nome": nome,
"cor": cor,
"total_pontos": len(pontos),
"pontos": pontos,
"geometria": geometria,
"distancia_km": distancia_info.get("distancia_km"),
"distancia_label": distancia_info.get("distancia_label"),
"distancias_avaliandos": distancias_avaliandos,
"distancia_resumo": distancia_resumo,
"avaliandos_tecnicos": avaliandos_tecnicos,
}
)
bounds.extend([[ponto["lat"], ponto["lon"]] for ponto in pontos])
bounds.extend(_extrair_bounds_geometria_wgs84(geometria))
avaliandos_tecnicos_proximos: list[dict[str, Any]] | None = None
if (
avaliandos_geo
and trabalhos_tecnicos_proximidade_modo_norm == TRABALHOS_TECNICOS_PROXIMIDADE_ATIVADA
):
avaliandos_tecnicos_proximos = _listar_avaliandos_tecnicos_proximos_por_avaliandos(
avaliandos_geo,
trabalhos_tecnicos_raio_m_norm,
chaves_modelos_selecionados,
)
for item in avaliandos_tecnicos_proximos or []:
prox_lat, prox_lon = _normalizar_coordenadas_avaliando(item.get("coord_lat"), item.get("coord_lon"))
if prox_lat is None or prox_lon is None:
continue
bounds.append([prox_lat, prox_lon])
if not modelos_plotados:
raise HTTPException(
status_code=400,
detail="Nao foi possivel gerar o mapa: os modelos selecionados nao possuem coordenadas validas",
)
total_pontos = 0
for modelo in modelos_plotados:
total_pontos += int(modelo["total_pontos"])
mapa_payload = _build_mapa_modelos_payload(
modelos_plotados,
bounds,
avaliandos_geo,
modo_exibicao_norm,
avaliandos_tecnicos_proximos=avaliandos_tecnicos_proximos,
trabalhos_tecnicos_raio_m=trabalhos_tecnicos_raio_m_norm,
)
mapa_html = ""
if mapa_payload is None:
mapa_html = _renderizar_mapa_modelos(
modelos_plotados,
bounds,
avaliandos_geo,
modo_exibicao_norm,
avaliandos_tecnicos_proximos=avaliandos_tecnicos_proximos,
trabalhos_tecnicos_raio_m=trabalhos_tecnicos_raio_m_norm,
)
total_trabalhos_modelos = len(
{
_chave_unica_trabalho_tecnico(item)
for modelo in modelos_plotados
for item in (modelo.get("avaliandos_tecnicos") or [])
}
)
total_trabalhos_proximos = len(avaliandos_tecnicos_proximos or [])
descricao_modelos_status = _descricao_modo_trabalhos_tecnicos_modelos(trabalhos_tecnicos_modelos_modo_norm)
detalhe_trabalhos = (
f" Trabalhos técnicos {descricao_modelos_status}: {total_trabalhos_modelos}."
+ (
f" {'Próximos aos avaliandos' if len(avaliandos_geo) > 1 else 'Próximos ao avaliando'} (até {trabalhos_tecnicos_raio_m_norm} m): {total_trabalhos_proximos}."
if (
avaliandos_geo
and trabalhos_tecnicos_proximidade_modo_norm == TRABALHOS_TECNICOS_PROXIMIDADE_ATIVADA
)
else ""
)
)
return sanitize_value(
{
"mapa_html": mapa_html,
"mapa_html_pontos": mapa_html if modo_exibicao_norm == "pontos" else "",
"mapa_html_cobertura": mapa_html if modo_exibicao_norm == "cobertura" else "",
"mapa_payload": mapa_payload,
"mapa_payload_pontos": mapa_payload if modo_exibicao_norm == "pontos" else None,
"mapa_payload_cobertura": mapa_payload if modo_exibicao_norm == "cobertura" else None,
"total_modelos_plotados": len(modelos_plotados),
"total_pontos": total_pontos,
"modelos_plotados": [
{
"id": modelo["id"],
"nome": modelo["nome"],
"cor": modelo["cor"],
"total_pontos": modelo["total_pontos"],
"distancia_km": modelo.get("distancia_km"),
"distancia_label": modelo.get("distancia_label"),
"distancia_resumo": modelo.get("distancia_resumo"),
"distancias_avaliandos": modelo.get("distancias_avaliandos"),
}
for modelo in modelos_plotados
],
"avaliando": {
"lat": aval_lat,
"lon": aval_lon,
"ativo": bool(aval_lat is not None and aval_lon is not None),
},
"avaliandos": avaliandos_geo,
"criterio_espacial": criterio_espacial_norm,
"modo_exibicao": modo_exibicao_norm,
"trabalhos_tecnicos_modelos_modo": trabalhos_tecnicos_modelos_modo_norm,
"trabalhos_tecnicos_proximidade_modo": trabalhos_tecnicos_proximidade_modo_norm,
"trabalhos_tecnicos_raio_m": trabalhos_tecnicos_raio_m_norm,
"total_trabalhos_tecnicos_modelos": total_trabalhos_modelos,
"total_trabalhos_tecnicos_proximos": total_trabalhos_proximos,
"status": (
f"Mapas de pontos e cobertura gerados com {len(modelos_plotados)} modelo(s) e {total_pontos} ponto(s)"
+ (
" com avaliando destacado."
if avaliando_unico is not None and aval_lat is not None and aval_lon is not None
else (f" com {len(avaliandos_geo)} avaliandos destacados." if len(avaliandos_geo) > 1 else ".")
)
+ detalhe_trabalhos
),
}
)
def _normalizar_modo_exibicao_mapa(value: Any) -> str:
modo = str(value or "").strip().lower()
if modo == "cobertura":
return "cobertura"
return "pontos"
def _normalizar_modo_trabalhos_tecnicos_modelos_mapa(value: Any) -> str:
modo = str(value or "").strip().lower()
if modo in {
TRABALHOS_TECNICOS_MODELOS_SELECIONADOS_E_OUTRAS_VERSOES,
TRABALHOS_TECNICOS_MODELOS_SELECIONADOS_E_ANTERIORES,
}:
return TRABALHOS_TECNICOS_MODELOS_SELECIONADOS_E_OUTRAS_VERSOES
return TRABALHOS_TECNICOS_MODELOS_SELECIONADOS
def _normalizar_modo_trabalhos_tecnicos_proximidade_mapa(value: Any) -> str:
modo = str(value or "").strip().lower()
if modo == TRABALHOS_TECNICOS_PROXIMIDADE_ATIVADA:
return TRABALHOS_TECNICOS_PROXIMIDADE_ATIVADA
return TRABALHOS_TECNICOS_PROXIMIDADE_DESATIVADA
def _normalizar_raio_trabalhos_tecnicos_mapa(value: Any) -> int:
numero = _to_float_or_none(value)
if numero is None:
return int(TRABALHOS_TECNICOS_RAIO_PADRAO_M)
if not np.isfinite(numero):
return int(TRABALHOS_TECNICOS_RAIO_PADRAO_M)
return max(0, min(int(TRABALHOS_TECNICOS_RAIO_MAX_M), int(round(numero))))
def _montar_tooltip_distancia_modelo(modelo: dict[str, Any]) -> str:
resumo = modelo.get("distancia_resumo") if isinstance(modelo.get("distancia_resumo"), dict) else {}
total_avaliandos = int(resumo.get("total_avaliandos") or 0)
if total_avaliandos > 1:
return (
"Resumo espacial"
f' • Maior: {resumo.get("maior_distancia_label") or "-"}'
f' • Media: {resumo.get("media_distancia_label") or "-"}'
f' • Menor: {resumo.get("menor_distancia_label") or "-"}'
)
if modelo.get("distancia_label"):
return f'Distancia: {modelo["distancia_label"]}'
return ""
def _tooltip_mapa_modelo_html(modelo: dict[str, Any]) -> str:
nome = str(modelo.get("nome") or "Modelo").strip() or "Modelo"
tooltip_distancia = _montar_tooltip_distancia_modelo(modelo)
if tooltip_distancia:
return (
"<div style='font-family:\"Segoe UI\",Arial,sans-serif; font-size:13px; line-height:1.5;'>"
f"<b>{escape(nome)}</b><br>{escape(tooltip_distancia)}"
"</div>"
)
return (
"<div style='font-family:\"Segoe UI\",Arial,sans-serif; font-size:13px; line-height:1.5;'>"
f"<b>{escape(nome)}</b>"
"</div>"
)
def _marker_payloads_avaliandos(avaliandos_geo: list[dict[str, Any]]) -> list[dict[str, Any]]:
payloads: list[dict[str, Any]] = []
for idx, avaliando in enumerate(avaliandos_geo):
lat_item, lon_item = _normalizar_coordenadas_avaliando(avaliando.get("lat"), avaliando.get("lon"))
if lat_item is None or lon_item is None:
continue
label = str(avaliando.get("label") or f"A{idx + 1}").strip() or f"A{idx + 1}"
endereco = str(avaliando.get("logradouro") or "").strip()
numero_usado = str(avaliando.get("numero_usado") or "").strip()
tooltip = label
if endereco:
tooltip = f"{tooltip}{endereco}{f', {numero_usado}' if numero_usado else ''}"
marker_html = (
"<div style='display:flex;align-items:center;justify-content:center;"
"width:30px;height:30px;filter:drop-shadow(0 2px 6px rgba(0,0,0,0.22));'>"
"<div style='display:flex;align-items:center;justify-content:center;"
"width:24px;height:24px;border-radius:999px;background:rgba(255,255,255,0.97);"
"box-shadow:0 0 0 1.4px rgba(255,255,255,0.98),0 0 0 2.5px rgba(0,0,0,0.74);'>"
"<svg viewBox='0 0 24 24' width='20' height='20' aria-hidden='true'>"
"<path d='M12 3.2L3.9 10v10.1h5.4v-5.3h5.4v5.3h5.4V10L12 3.2Z' "
"fill='#c62828' stroke='rgba(255,255,255,0.96)' stroke-width='1.2' stroke-linejoin='round'/>"
"<rect x='10.1' y='15.2' width='3.8' height='4.9' rx='0.5' fill='rgba(255,255,255,0.96)'/>"
"</svg>"
"</div>"
"</div>"
)
payloads.append(
{
"lat": float(lat_item),
"lon": float(lon_item),
"tooltip_html": escape(tooltip),
"marker_html": marker_html,
"icon_size": [30, 30],
"icon_anchor": [15, 15],
"class_name": "mesa-avaliando-marker",
}
)
return payloads
def _shapes_modelo_payload(
modelo: dict[str, Any],
aval_lat: float | None,
aval_lon: float | None,
) -> list[dict[str, Any]]:
geometria = modelo.get("geometria") or {}
geom_wgs84 = geometria.get("geom_wgs84")
if geom_wgs84 is None:
return []
tooltip = _tooltip_mapa_modelo_html(modelo)
cor = str(modelo.get("cor") or "#1f77b4")
geom_type = str(geometria.get("geom_type") or "")
shapes: list[dict[str, Any]] = []
try:
if geom_type == "Polygon":
coords = [[float(lat), float(lon)] for lon, lat in list(geom_wgs84.exterior.coords)]
shapes.append({"type": "polygon", "coords": coords, "color": cor, "fill": True, "fill_opacity": 0.12, "weight": 2, "tooltip_html": tooltip})
elif geom_type == "MultiPolygon":
for poligono in list(getattr(geom_wgs84, "geoms", [])):
coords = [[float(lat), float(lon)] for lon, lat in list(poligono.exterior.coords)]
shapes.append({"type": "polygon", "coords": coords, "color": cor, "fill": True, "fill_opacity": 0.12, "weight": 2, "tooltip_html": tooltip})
elif geom_type == "LineString":
coords = [[float(lat), float(lon)] for lon, lat in list(geom_wgs84.coords)]
shapes.append({"type": "polyline", "coords": coords, "color": cor, "weight": 3, "opacity": 0.8, "tooltip_html": tooltip})
elif geom_type == "MultiLineString":
for linha in list(getattr(geom_wgs84, "geoms", [])):
coords = [[float(lat), float(lon)] for lon, lat in list(linha.coords)]
shapes.append({"type": "polyline", "coords": coords, "color": cor, "weight": 3, "opacity": 0.8, "tooltip_html": tooltip})
elif geom_type == "Point":
shapes.append({"type": "circlemarker", "center": [float(geom_wgs84.y), float(geom_wgs84.x)], "radius": 6, "color": cor, "fill": True, "fill_opacity": 0.7, "tooltip_html": tooltip})
elif geom_type == "MultiPoint":
for ponto in list(getattr(geom_wgs84, "geoms", [])):
shapes.append({"type": "circlemarker", "center": [float(ponto.y), float(ponto.x)], "radius": 6, "color": cor, "fill": True, "fill_opacity": 0.7, "tooltip_html": tooltip})
except Exception:
return []
if aval_lat is None or aval_lon is None:
return shapes
try:
from shapely.geometry import Point
from shapely.ops import nearest_points
except ImportError:
return shapes
try:
aval_point = Point(float(aval_lon), float(aval_lat))
_, nearest_geom = nearest_points(aval_point, geom_wgs84)
distancia_km = _to_float_or_none(modelo.get("distancia_km"))
if nearest_geom is None or distancia_km is None or distancia_km <= 0:
return shapes
shapes.append(
{
"type": "polyline",
"coords": [
[float(aval_point.y), float(aval_point.x)],
[float(nearest_geom.y), float(nearest_geom.x)],
],
"color": cor,
"weight": 2,
"opacity": 0.65,
"dash_array": "6,6",
"tooltip_html": f"Ligação de distância • {escape(str(modelo.get('nome') or 'Modelo'))}",
}
)
except Exception:
return shapes
return shapes
def _build_mapa_modelos_payload(
modelos_plotados: list[dict[str, Any]],
bounds: list[list[float]],
avaliandos_geo: list[dict[str, Any]],
modo_exibicao: str,
avaliandos_tecnicos_proximos: list[dict[str, Any]] | None = None,
trabalhos_tecnicos_raio_m: int | None = None,
) -> dict[str, Any] | None:
overlay_layers: list[dict[str, Any]] = []
avaliando_unico = avaliandos_geo[0] if len(avaliandos_geo) == 1 else None
aval_lat = avaliando_unico.get("lat") if avaliando_unico is not None else None
aval_lon = avaliando_unico.get("lon") if avaliando_unico is not None else None
trabalhos_tecnicos_modelos = _consolidar_trabalhos_tecnicos_modelos(modelos_plotados)
trabalhos_tecnicos_modelos_markers = (
build_trabalhos_tecnicos_marker_payloads(
trabalhos_tecnicos_modelos,
apply_jitter=False,
)
if trabalhos_tecnicos_modelos
else []
)
trabalhos_tecnicos_proximos_markers = (
build_trabalhos_tecnicos_marker_payloads(
avaliandos_tecnicos_proximos or [],
origem="pesquisa_mapa",
marker_style="estrela",
ignore_bounds=False,
apply_jitter=False,
)
if avaliandos_tecnicos_proximos is not None
else []
)
if trabalhos_tecnicos_modelos_markers or trabalhos_tecnicos_proximos_markers:
apply_marker_payload_jitter([*trabalhos_tecnicos_modelos_markers, *trabalhos_tecnicos_proximos_markers])
for modelo in modelos_plotados:
layer: dict[str, Any] = {
"id": str(modelo.get("id") or ""),
"label": str(modelo.get("nome") or "Modelo"),
"show": True,
"hover_highlight_group": "pesquisa-modelos",
}
if modo_exibicao == "pontos":
tooltip_html = _tooltip_mapa_modelo_html(modelo)
layer["points"] = [
{
"lat": float(ponto["lat"]),
"lon": float(ponto["lon"]),
"color": str(modelo.get("cor") or "#1f77b4"),
"base_radius": 3.0,
"stroke_color": str(modelo.get("cor") or "#1f77b4"),
"stroke_weight": 1.0,
"fill_opacity": 0.72,
"tooltip_html": tooltip_html,
}
for ponto in (modelo.get("pontos") or [])
]
else:
layer["shapes"] = _shapes_modelo_payload(modelo, aval_lat, aval_lon)
overlay_layers.append(layer)
if trabalhos_tecnicos_modelos:
overlay_layers.append(
{
"id": "trabalhos-tecnicos-modelos",
"label": "Trabalhos tecnicos dos modelos",
"show": True,
"markers": trabalhos_tecnicos_modelos_markers,
}
)
if avaliandos_tecnicos_proximos is not None:
label_raio = f" (até {int(trabalhos_tecnicos_raio_m or 0)} m)" if trabalhos_tecnicos_raio_m is not None else ""
proximos_layer: dict[str, Any] = {
"id": "trabalhos-tecnicos-proximos",
"label": f"Trabalhos técnicos próximos{label_raio}",
"show": True,
"required_overlay_ids": ["trabalhos-tecnicos-modelos"],
"markers": trabalhos_tecnicos_proximos_markers,
"shapes": [],
}
if int(trabalhos_tecnicos_raio_m or 0) > 0:
for idx, avaliando in enumerate(avaliandos_geo):
lat_item, lon_item = _normalizar_coordenadas_avaliando(avaliando.get("lat"), avaliando.get("lon"))
if lat_item is None or lon_item is None:
continue
label = str(avaliando.get("label") or f"A{idx + 1}").strip() or f"A{idx + 1}"
tooltip_raio = (
f"Raio de busca: até {int(trabalhos_tecnicos_raio_m)} m"
if len(avaliandos_geo) == 1
else f"Raio de busca • {label}: até {int(trabalhos_tecnicos_raio_m)} m"
)
proximos_layer["shapes"].append(
{
"type": "circle",
"center": [float(lat_item), float(lon_item)],
"radius_m": float(trabalhos_tecnicos_raio_m),
"color": "#c62828",
"weight": 2,
"opacity": 0.75,
"fill": True,
"fill_color": "#c62828",
"fill_opacity": 0.06,
"dash_array": "6,6",
"tooltip_html": tooltip_raio,
}
)
overlay_layers.append(proximos_layer)
avaliandos_markers = _marker_payloads_avaliandos(avaliandos_geo)
if avaliandos_markers:
overlay_layers.append(
{
"id": "avaliandos",
"label": "Avaliando" if len(avaliandos_geo) == 1 else "Avaliandos",
"show": True,
"markers": avaliandos_markers,
}
)
payload = build_leaflet_payload(bounds=bounds, overlay_layers=overlay_layers, show_bairros=True)
if payload and isinstance(payload.get("controls"), dict):
payload["controls"]["layer_control_collapsed"] = False
return payload
def _renderizar_mapa_modelos(
modelos_plotados: list[dict[str, Any]],
bounds: list[list[float]],
avaliandos_geo: list[dict[str, Any]],
modo_exibicao: str,
avaliandos_tecnicos_proximos: list[dict[str, Any]] | None = None,
trabalhos_tecnicos_raio_m: int | None = None,
) -> str:
renderizar_pontos = modo_exibicao == "pontos"
renderizar_cobertura = modo_exibicao == "cobertura"
avaliando_unico = avaliandos_geo[0] if len(avaliandos_geo) == 1 else None
aval_lat = avaliando_unico.get("lat") if avaliando_unico is not None else None
aval_lon = avaliando_unico.get("lon") if avaliando_unico is not None else None
centro_lat = sum(coord[0] for coord in bounds) / len(bounds)
centro_lon = sum(coord[1] for coord in bounds) / len(bounds)
mapa = folium.Map(
location=[centro_lat, centro_lon],
zoom_start=12,
control_scale=True,
tiles=None,
)
folium.TileLayer(tiles="OpenStreetMap", name="OpenStreetMap", control=True, show=True).add_to(mapa)
folium.TileLayer(tiles="CartoDB positron", name="Positron", control=True, show=False).add_to(mapa)
add_bairros_layer(mapa, show=True)
nome_camada_avaliando = "Avaliando" if len(avaliandos_geo) == 1 else "Avaliandos"
camada_avaliando = folium.FeatureGroup(name=nome_camada_avaliando, show=True)
trabalhos_tecnicos_modelos = _consolidar_trabalhos_tecnicos_modelos(modelos_plotados)
trabalhos_tecnicos_modelos_markers = (
build_trabalhos_tecnicos_marker_payloads(
trabalhos_tecnicos_modelos,
apply_jitter=False,
)
if trabalhos_tecnicos_modelos
else []
)
trabalhos_tecnicos_proximos_markers = (
build_trabalhos_tecnicos_marker_payloads(
avaliandos_tecnicos_proximos or [],
origem="pesquisa_mapa",
marker_style="estrela",
ignore_bounds=False,
apply_jitter=False,
)
if avaliandos_tecnicos_proximos is not None
else []
)
if trabalhos_tecnicos_modelos_markers or trabalhos_tecnicos_proximos_markers:
apply_marker_payload_jitter([*trabalhos_tecnicos_modelos_markers, *trabalhos_tecnicos_proximos_markers])
camada_trabalhos_modelos = (
folium.FeatureGroup(name="Trabalhos tecnicos dos modelos", show=True)
if trabalhos_tecnicos_modelos
else None
)
camada_trabalhos_proximos = None
if avaliandos_tecnicos_proximos is not None:
label_raio = f" (ate {int(trabalhos_tecnicos_raio_m or 0)} m)" if trabalhos_tecnicos_raio_m is not None else ""
camada_trabalhos_proximos = folium.FeatureGroup(
name=f"Trabalhos tecnicos proximos{label_raio}",
show=True,
)
for modelo in modelos_plotados:
nome_layer = str(modelo.get("nome") or modelo.get("id") or "Modelo").strip() or "Modelo"
nome_layer_html = (
"<span style='display:inline-flex;align-items:center;gap:6px;'>"
f"<span style='display:inline-block;width:10px;height:10px;border-radius:999px;"
f"background:{escape(str(modelo.get('cor') or '#1f77b4'))};"
"border:1px solid rgba(0,0,0,0.35);flex:0 0 auto;'></span>"
f"<span>{escape(nome_layer)}</span>"
"</span>"
)
camada_modelo = folium.FeatureGroup(name=nome_layer_html, show=True)
modelo["layer_var"] = camada_modelo.get_name()
if renderizar_pontos:
tooltip_modelo = nome_layer
tooltip_distancia = _montar_tooltip_distancia_modelo(modelo)
if tooltip_distancia:
tooltip_modelo = f"{tooltip_modelo}{tooltip_distancia}"
for ponto in modelo["pontos"]:
marcador = folium.CircleMarker(
location=[ponto["lat"], ponto["lon"]],
radius=3,
color=modelo["cor"],
fill=True,
fill_color=modelo["cor"],
fill_opacity=0.72,
opacity=0.9,
weight=1,
tooltip=tooltip_modelo,
).add_to(camada_modelo)
marcador.options["mesaBaseRadius"] = 3.0
if renderizar_cobertura:
_adicionar_geometria_modelo_no_mapa(camada_modelo, camada_modelo, modelo, aval_lat, aval_lon)
camada_modelo.add_to(mapa)
if camada_trabalhos_modelos is not None:
add_marker_payloads(camada_trabalhos_modelos, trabalhos_tecnicos_modelos_markers)
camada_trabalhos_modelos.add_to(mapa)
if camada_trabalhos_proximos is not None:
if int(trabalhos_tecnicos_raio_m or 0) > 0:
for idx, avaliando in enumerate(avaliandos_geo):
lat_item, lon_item = _normalizar_coordenadas_avaliando(avaliando.get("lat"), avaliando.get("lon"))
if lat_item is None or lon_item is None:
continue
label = str(avaliando.get("label") or f"A{idx + 1}").strip() or f"A{idx + 1}"
tooltip_raio = (
f"Raio de busca: ate {int(trabalhos_tecnicos_raio_m)} m"
if len(avaliandos_geo) == 1
else f"Raio de busca • {label}: ate {int(trabalhos_tecnicos_raio_m)} m"
)
folium.Circle(
location=[lat_item, lon_item],
radius=float(trabalhos_tecnicos_raio_m),
color="#c62828",
weight=2,
opacity=0.75,
fill=True,
fill_color="#c62828",
fill_opacity=0.06,
dash_array="6,6",
tooltip=tooltip_raio,
).add_to(camada_trabalhos_proximos)
add_marker_payloads(camada_trabalhos_proximos, trabalhos_tecnicos_proximos_markers)
camada_trabalhos_proximos.add_to(mapa)
for idx, avaliando in enumerate(avaliandos_geo):
lat_item, lon_item = _normalizar_coordenadas_avaliando(avaliando.get("lat"), avaliando.get("lon"))
if lat_item is None or lon_item is None:
continue
label = str(avaliando.get("label") or f"A{idx + 1}").strip() or f"A{idx + 1}"
endereco = str(avaliando.get("logradouro") or "").strip()
numero_usado = str(avaliando.get("numero_usado") or "").strip()
tooltip = label
if endereco:
tooltip = f"{tooltip}{endereco}{f', {numero_usado}' if numero_usado else ''}"
marker_icon = folium.DivIcon(
html=(
"<div style='display:flex;align-items:center;justify-content:center;"
"width:30px;height:30px;filter:drop-shadow(0 2px 6px rgba(0,0,0,0.22));'>"
"<div style='display:flex;align-items:center;justify-content:center;"
"width:24px;height:24px;border-radius:999px;background:rgba(255,255,255,0.97);"
"box-shadow:0 0 0 1.4px rgba(255,255,255,0.98),0 0 0 2.5px rgba(0,0,0,0.74);'>"
"<svg viewBox='0 0 24 24' width='20' height='20' aria-hidden='true'>"
"<path d='M12 3.2L3.9 10v10.1h5.4v-5.3h5.4v5.3h5.4V10L12 3.2Z' "
"fill='#c62828' stroke='rgba(255,255,255,0.96)' stroke-width='1.2' stroke-linejoin='round'/>"
"<rect x='10.1' y='15.2' width='3.8' height='4.9' rx='0.5' fill='rgba(255,255,255,0.96)'/>"
"</svg>"
"</div>"
"</div>"
),
icon_size=(30, 30),
icon_anchor=(15, 15),
class_name="mesa-avaliando-marker",
)
folium.Marker(
location=[lat_item, lon_item],
tooltip=tooltip,
icon=marker_icon,
).add_to(camada_avaliando)
if avaliandos_geo:
camada_avaliando.add_to(mapa)
plugins.Fullscreen().add_to(mapa)
add_zoom_responsive_circle_markers(mapa)
layer_control = folium.LayerControl(collapsed=False)
layer_control.add_to(mapa)
_adicionar_interacao_hover_legenda_modelos(mapa, layer_control, modelos_plotados)
if bounds:
lat_values = [float(coord[0]) for coord in bounds]
lon_values = [float(coord[1]) for coord in bounds]
lat_min, lat_max = min(lat_values), max(lat_values)
lon_min, lon_max = min(lon_values), max(lon_values)
if math.isclose(lat_min, lat_max):
lat_delta = 0.0008
lat_min -= lat_delta
lat_max += lat_delta
if math.isclose(lon_min, lon_max):
lon_delta = 0.0008
lon_min -= lon_delta
lon_max += lon_delta
mapa.fit_bounds([[lat_min, lon_min], [lat_max, lon_max]], padding=(48, 48), max_zoom=18)
return mapa.get_root().render()
def _adicionar_interacao_hover_legenda_modelos(
mapa: folium.Map,
layer_control: folium.map.LayerControl,
modelos_plotados: list[dict[str, Any]],
) -> None:
modelos_layers = [
{
"id": str(modelo.get("id") or ""),
"layer_var": str(modelo.get("layer_var") or ""),
}
for modelo in modelos_plotados
if str(modelo.get("id") or "").strip() and str(modelo.get("layer_var") or "").strip()
]
if not modelos_layers:
return
style = """
<style>
.leaflet-container .leaflet-interactive,
.leaflet-container .leaflet-interactive:focus,
.leaflet-container svg path.leaflet-interactive,
.leaflet-container svg path.leaflet-interactive:focus {
outline: none !important;
box-shadow: none !important;
}
.leaflet-control-layers-overlays label.mesa-model-layer-toggle {
transition: opacity 0.16s ease, color 0.16s ease, font-weight 0.16s ease;
cursor: pointer;
}
.leaflet-control-layers-overlays label.mesa-model-layer-toggle.mesa-model-layer-dimmed {
opacity: 0.42;
}
.leaflet-control-layers-overlays label.mesa-model-layer-toggle.mesa-model-layer-active {
opacity: 1;
color: #123b63;
font-weight: 700;
}
</style>
"""
mapa.get_root().html.add_child(Element(style))
map_name = mapa.get_name()
modelos_payload = json.dumps(modelos_layers, ensure_ascii=True)
script = f"""
<script>
(function() {{
const MAP_NAME = "{map_name}";
const MODEL_LAYERS = {modelos_payload};
function getLayersControlContainer(map) {{
if (!map || typeof map.getContainer !== 'function') return null;
const mapContainer = map.getContainer();
if (!mapContainer || !mapContainer.querySelector) return null;
return mapContainer.querySelector('.leaflet-control-layers');
}}
function visitLayerTree(layer, visited, fn) {{
if (!layer || !layer._leaflet_id || visited.has(layer._leaflet_id)) return;
visited.add(layer._leaflet_id);
fn(layer);
if (layer.eachLayer) {{
layer.eachLayer(function(child) {{
visitLayerTree(child, visited, fn);
}});
}}
}}
function rememberPathBaseStyle(layer) {{
if (!layer || !layer.options) return;
if (!Number.isFinite(layer.options.mesaBaseOpacity)) {{
layer.options.mesaBaseOpacity = Number.isFinite(layer.options.opacity) ? layer.options.opacity : 1;
}}
if (!Number.isFinite(layer.options.mesaBaseFillOpacity)) {{
layer.options.mesaBaseFillOpacity = Number.isFinite(layer.options.fillOpacity) ? layer.options.fillOpacity : 0;
}}
if (!Number.isFinite(layer.options.mesaBaseWeight)) {{
layer.options.mesaBaseWeight = Number.isFinite(layer.options.weight) ? layer.options.weight : 1;
}}
if (!layer.options.mesaBaseColor && layer.options.color) {{
layer.options.mesaBaseColor = layer.options.color;
}}
if (!layer.options.mesaBaseFillColor && layer.options.fillColor) {{
layer.options.mesaBaseFillColor = layer.options.fillColor;
}}
}}
function rememberMarkerBaseStyle(layer) {{
if (!layer || !layer.options) return;
if (!Number.isFinite(layer.options.mesaBaseMarkerOpacity)) {{
layer.options.mesaBaseMarkerOpacity = Number.isFinite(layer.options.opacity) ? layer.options.opacity : 1;
}}
if (!Number.isFinite(layer.options.mesaBaseRadius) && typeof layer.getRadius === 'function') {{
const radius = layer.getRadius();
if (Number.isFinite(radius) && radius > 0) {{
layer.options.mesaBaseRadius = radius;
}}
}}
}}
function applyVisualState(layer, state) {{
if (!layer) return;
const isPath = typeof layer.setStyle === 'function';
if (isPath) {{
rememberPathBaseStyle(layer);
const baseOpacity = Number.isFinite(layer.options.mesaBaseOpacity) ? layer.options.mesaBaseOpacity : 1;
const baseFillOpacity = Number.isFinite(layer.options.mesaBaseFillOpacity) ? layer.options.mesaBaseFillOpacity : 0;
const baseWeight = Number.isFinite(layer.options.mesaBaseWeight) ? layer.options.mesaBaseWeight : 1;
let nextOpacity = baseOpacity;
let nextFillOpacity = baseFillOpacity;
let nextWeight = baseWeight;
if (state === 'dim') {{
nextOpacity = Math.max(0.06, baseOpacity * 0.14);
nextFillOpacity = baseFillOpacity > 0 ? Math.max(0.02, baseFillOpacity * 0.18) : baseFillOpacity;
nextWeight = Math.max(1, baseWeight * 0.82);
}} else if (state === 'highlight') {{
nextOpacity = Math.max(baseOpacity, 0.98);
nextFillOpacity = baseFillOpacity > 0 ? Math.max(baseFillOpacity, Math.min(0.30, baseFillOpacity * 2.1)) : baseFillOpacity;
nextWeight = Math.max(baseWeight + 1, baseWeight * 1.2);
}}
layer.setStyle({{
opacity: nextOpacity,
fillOpacity: nextFillOpacity,
weight: nextWeight,
color: layer.options.mesaBaseColor || layer.options.color,
fillColor: layer.options.mesaBaseFillColor || layer.options.fillColor || layer.options.mesaBaseColor || layer.options.color,
}});
}}
if (typeof layer.setRadius === 'function') {{
rememberMarkerBaseStyle(layer);
const baseRadius = Number.isFinite(layer.options.mesaBaseRadius) ? layer.options.mesaBaseRadius : null;
if (baseRadius !== null) {{
let nextRadius = baseRadius;
if (state === 'dim') {{
nextRadius = Math.max(1.6, baseRadius * 0.88);
}} else if (state === 'highlight') {{
nextRadius = Math.min(baseRadius * 1.28, baseRadius + 3.0);
}}
layer.setRadius(nextRadius);
}}
}}
if (!isPath && typeof layer.setOpacity === 'function') {{
rememberMarkerBaseStyle(layer);
const baseOpacity = Number.isFinite(layer.options.mesaBaseMarkerOpacity) ? layer.options.mesaBaseMarkerOpacity : 1;
let nextOpacity = baseOpacity;
if (state === 'dim') {{
nextOpacity = Math.max(0.18, baseOpacity * 0.28);
}} else if (state === 'highlight') {{
nextOpacity = Math.max(baseOpacity, 1);
}}
layer.setOpacity(nextOpacity);
}}
}}
function applyGroupState(meta, state) {{
if (!meta || !meta.layer) return;
const visited = new Set();
visitLayerTree(meta.layer, visited, function(current) {{
if (current === meta.layer) return;
applyVisualState(current, state);
}});
}}
function updateLegendState(activeId) {{
const map = window[MAP_NAME];
const container = getLayersControlContainer(map);
if (!container) return;
const labels = container.querySelectorAll('label.mesa-model-layer-toggle');
labels.forEach(function(label) {{
const modelId = String(label.dataset.mesaModelId || '');
const isActive = Boolean(activeId) && modelId === activeId;
label.classList.toggle('mesa-model-layer-active', isActive);
label.classList.toggle('mesa-model-layer-dimmed', Boolean(activeId) && !isActive);
}});
}}
function clearHoverState() {{
MODEL_LAYERS.forEach(function(meta) {{
applyGroupState(meta, 'normal');
}});
updateLegendState(null);
}}
function setHoveredModel(activeId) {{
const map = window[MAP_NAME];
if (!map) return;
if (!activeId) {{
clearHoverState();
return;
}}
const activeMeta = MODEL_LAYERS.find(function(meta) {{ return meta.id === activeId; }}) || null;
if (!activeMeta || !activeMeta.layer || !map.hasLayer(activeMeta.layer)) {{
clearHoverState();
return;
}}
MODEL_LAYERS.forEach(function(meta) {{
applyGroupState(meta, meta.id === activeId ? 'highlight' : 'dim');
}});
updateLegendState(activeId);
}}
function resolveModelLayers() {{
MODEL_LAYERS.forEach(function(meta) {{
meta.layer = window[meta.layer_var] || null;
meta.layerId = meta.layer && window.L && window.L.Util ? window.L.Util.stamp(meta.layer) : null;
}});
}}
function bindLegendHover() {{
const map = window[MAP_NAME];
const container = getLayersControlContainer(map);
if (!container) return false;
const labels = Array.from(container.querySelectorAll('.leaflet-control-layers-overlays label'));
if (!labels.length) return false;
MODEL_LAYERS.forEach(function(meta, index) {{
const label = labels[index];
if (!label || label.dataset.mesaModelHoverBound === '1') return;
label.dataset.mesaModelHoverBound = '1';
label.dataset.mesaModelId = meta.id;
label.classList.add('mesa-model-layer-toggle');
const onEnter = function() {{
setHoveredModel(meta.id);
}};
const onLeave = function() {{
clearHoverState();
}};
label.addEventListener('mouseenter', function() {{
onEnter();
}});
label.addEventListener('mouseleave', function() {{
onLeave();
}});
label.addEventListener('mouseover', function() {{
onEnter();
}});
label.addEventListener('mouseout', function(event) {{
const nextTarget = event && event.relatedTarget ? event.relatedTarget : null;
if (nextTarget && label.contains(nextTarget)) return;
onLeave();
}});
label.addEventListener('focusin', function() {{
onEnter();
}});
label.addEventListener('focusout', function() {{
onLeave();
}});
}});
return true;
}}
function bindWhenReady() {{
const map = window[MAP_NAME];
if (!map || !window.L || !window.L.Util) {{
window.setTimeout(bindWhenReady, 50);
return;
}}
resolveModelLayers();
if (!bindLegendHover()) {{
window.setTimeout(bindWhenReady, 50);
return;
}}
if (map.__mesaLegendHoverBound) return;
map.__mesaLegendHoverBound = true;
map.on('overlayadd', function() {{
window.setTimeout(function() {{
resolveModelLayers();
bindLegendHover();
clearHoverState();
}}, 30);
}});
map.on('overlayremove', function() {{
window.setTimeout(function() {{
resolveModelLayers();
bindLegendHover();
clearHoverState();
}}, 30);
}});
map.whenReady(function() {{
window.setTimeout(function() {{
resolveModelLayers();
bindLegendHover();
clearHoverState();
}}, 40);
}});
}}
bindWhenReady();
}})();
</script>
"""
mapa.get_root().html.add_child(Element(script))
def _carregar_resumo_com_cache(caminho_modelo: Path) -> dict[str, Any]:
assinatura = _assinatura_arquivos(caminho_modelo)
cache_key = str(caminho_modelo)
with _CACHE_LOCK:
cached = _CACHE.get(cache_key)
if cached and cached.get("assinatura") == assinatura:
return dict(cached["resumo"])
resumo = _construir_resumo_modelo(caminho_modelo)
with _CACHE_LOCK:
_CACHE[cache_key] = {
"assinatura": assinatura,
"resumo": resumo,
}
return dict(resumo)
def _carregar_geometria_modelo_com_cache(caminho_modelo: Path) -> dict[str, Any] | None:
assinatura = _assinatura_arquivos(caminho_modelo)
cache_key = str(caminho_modelo)
with _CACHE_LOCK:
cached = _CACHE.get(cache_key)
if (
cached
and cached.get("assinatura") == assinatura
and cached.get("geometria_cache_version") == GEOMETRIA_CACHE_VERSION
and "geometria" in cached
):
return cached.get("geometria")
geometria = _construir_geometria_modelo(caminho_modelo)
with _CACHE_LOCK:
entry = _CACHE.setdefault(cache_key, {})
entry["assinatura"] = assinatura
entry["geometria_cache_version"] = GEOMETRIA_CACHE_VERSION
entry["geometria"] = geometria
return geometria
def _assinatura_arquivos(caminho_modelo: Path) -> tuple[int, int]:
stat_modelo = caminho_modelo.stat()
return (stat_modelo.st_mtime_ns, stat_modelo.st_size)
def _construir_resumo_modelo(caminho_modelo: Path) -> dict[str, Any]:
resumo = {
"id": caminho_modelo.stem,
"arquivo": caminho_modelo.name,
"nome_modelo": caminho_modelo.stem,
"autor": None,
"finalidade": _inferir_finalidade_por_nome(caminho_modelo.stem),
"finalidades": [],
"tipo_imovel": _inferir_tipo_por_nome(caminho_modelo.stem),
"bairros": [],
"faixa_area": None,
"faixa_rh": None,
"faixa_data": None,
"total_dados": None,
"total_trabalhos": None,
"endereco_referencia": None,
"equacao": None,
"observacao_modelo": None,
"r2": None,
"tem_app": False,
"mapa_disponivel": False,
"compatibilidade_campos": {chave: [] for chave in COMPATIBILIDADE_MAP},
"faixas_por_campo": {chave: None for chave in RANGE_CAMPOS},
"variaveis_resumo": [],
"status": "ok",
"erro_leitura": None,
"fonte": {
"modelo": "dai",
},
"_variaveis_modelo": [],
"_texto_colunas_index": {},
"_faixa_colunas_index": {},
"_faixa_variaveis_index": {},
"_caminho_modelo": str(caminho_modelo),
}
try:
pacote = load(caminho_modelo)
except Exception as exc:
resumo["status"] = "erro"
resumo["erro_leitura"] = f"Falha ao ler .dai: {exc}"
return resumo
if not isinstance(pacote, dict):
resumo["status"] = "erro"
resumo["erro_leitura"] = "Arquivo .dai invalido (conteudo nao e dict)."
return resumo
if "versao" not in pacote:
try:
pacote = _migrar_pacote_v1_para_v2(pacote)
except Exception:
pass
dados_pacote = pacote.get("dados") if isinstance(pacote.get("dados"), dict) else {}
estat_df = _to_dataframe(dados_pacote.get("estatisticas"))
df_modelo = _extrair_df_modelo_ajustado(dados_pacote)
if resumo["autor"] is None:
resumo["autor"] = _autor_do_pacote(pacote)
if resumo["equacao"] is None:
resumo["equacao"] = _equacao_do_pacote(pacote)
if resumo["observacao_modelo"] is None:
resumo["observacao_modelo"] = normalizar_observacao_modelo(pacote.get("observacao_modelo"))
if resumo["r2"] is None:
resumo["r2"] = _r2_do_pacote(pacote)
resumo["_variaveis_modelo"] = _variaveis_do_modelo(pacote)
resumo["tem_app"] = _modelo_contem_variavel(resumo, APP_ALIASES)
if resumo["total_dados"] is None and df_modelo is not None:
resumo["total_dados"] = int(len(df_modelo))
if resumo["total_trabalhos"] is None:
resumo["total_trabalhos"] = trabalhos_tecnicos_service.contar_avaliandos_por_modelo(
[resumo.get("id"), resumo.get("arquivo"), resumo.get("nome_modelo")]
)
if not resumo["finalidades"] and df_modelo is not None:
resumo["finalidades"] = _extrair_finalidades(df_modelo)
if resumo["finalidade"] is None:
resumo["finalidade"] = resumo["finalidades"][0] if resumo["finalidades"] else None
if not resumo["bairros"] and df_modelo is not None:
resumo["bairros"] = _extrair_bairros(df_modelo)
faixas_por_campo = {
chave: _extrair_faixa_por_alias(estat_df, aliases)
for chave, aliases in RANGE_CAMPOS.items()
}
resumo["faixas_por_campo"] = faixas_por_campo
resumo["faixa_area"] = _merge_ranges(
resumo["faixa_area"],
_extrair_faixa_por_alias(estat_df, AREA_GERAL_ALIASES),
)
resumo["faixa_rh"] = _merge_ranges(resumo["faixa_rh"], _extrair_faixa_por_alias(estat_df, RH_ALIASES))
resumo["faixa_data"] = _merge_ranges(resumo["faixa_data"], _faixa_data_do_pacote(pacote))
colunas_catalogo = _coletar_colunas_para_catalogo(estat_df, df_modelo)
resumo["compatibilidade_campos"] = _mapear_compatibilidade(colunas_catalogo)
resumo["variaveis_resumo"] = _resumo_variaveis(estat_df)
resumo["mapa_disponivel"] = _tem_colunas_mapa(df_modelo)
resumo["_texto_colunas_index"] = _indexar_texto_colunas(df_modelo)
resumo["_faixa_colunas_index"] = {}
resumo["_faixa_variaveis_index"] = _indexar_faixas_variaveis(estat_df, resumo["_variaveis_modelo"])
return resumo
def _autor_do_pacote(pacote: dict[str, Any]) -> str | None:
elaborador = pacote.get("elaborador")
if isinstance(elaborador, dict):
return _str_or_none(elaborador.get("nome_completo")) or _str_or_none(elaborador.get("nome"))
return None
def _equacao_do_pacote(pacote: dict[str, Any]) -> str | None:
modelo = pacote.get("modelo") if isinstance(pacote.get("modelo"), dict) else {}
diagnosticos = modelo.get("diagnosticos") if isinstance(modelo.get("diagnosticos"), dict) else {}
return _str_or_none(diagnosticos.get("equacao"))
def _r2_do_pacote(pacote: dict[str, Any]) -> float | None:
modelo = pacote.get("modelo") if isinstance(pacote.get("modelo"), dict) else {}
diagnosticos = modelo.get("diagnosticos") if isinstance(modelo.get("diagnosticos"), dict) else {}
gerais = diagnosticos.get("gerais") if isinstance(diagnosticos.get("gerais"), dict) else {}
return _to_float_or_none(gerais.get("r2"))
def _faixa_data_do_pacote(pacote: dict[str, Any]) -> dict[str, Any] | None:
periodo = pacote.get("periodo_dados_mercado") if isinstance(pacote.get("periodo_dados_mercado"), dict) else {}
data_inicial = _data_iso_or_none(periodo.get("data_inicial"))
data_final = _data_iso_or_none(periodo.get("data_final"))
if data_inicial is None and data_final is None:
return None
if data_inicial and data_final and data_inicial > data_final:
data_inicial, data_final = data_final, data_inicial
return {"min": data_inicial, "max": data_final}
def _data_iso_or_none(value: Any) -> str | None:
if _is_empty(value):
return None
text = str(value).strip()
if not text:
return None
iso_match = re.match(r"^(\d{4})-(\d{2})-(\d{2})(?:[T\s].*)?$", text)
if iso_match:
try:
return datetime(
int(iso_match.group(1)),
int(iso_match.group(2)),
int(iso_match.group(3)),
).date().isoformat()
except Exception:
return None
iso_slash_match = re.match(r"^(\d{4})/(\d{2})/(\d{2})(?:[T\s].*)?$", text)
if iso_slash_match:
try:
return datetime(
int(iso_slash_match.group(1)),
int(iso_slash_match.group(2)),
int(iso_slash_match.group(3)),
).date().isoformat()
except Exception:
return None
br_match = re.match(r"^(\d{2})/(\d{2})/(\d{4})(?:[T\s].*)?$", text)
if br_match:
try:
return datetime(
int(br_match.group(3)),
int(br_match.group(2)),
int(br_match.group(1)),
).date().isoformat()
except Exception:
return None
parsed = pd.to_datetime(text, errors="coerce", dayfirst=True)
if pd.isna(parsed):
return None
return parsed.date().isoformat()
def _variaveis_do_modelo(pacote: dict[str, Any]) -> list[str]:
variaveis: list[str] = []
modelo = pacote.get("modelo") if isinstance(pacote.get("modelo"), dict) else {}
coeficientes = modelo.get("coeficientes") if isinstance(modelo, dict) else None
if isinstance(coeficientes, pd.DataFrame):
variaveis.extend([str(item) for item in coeficientes.index.tolist()])
elif isinstance(coeficientes, pd.Series):
variaveis.extend([str(item) for item in coeficientes.index.tolist()])
elif isinstance(coeficientes, dict):
variaveis.extend([str(item) for item in coeficientes.keys()])
transformacoes = pacote.get("transformacoes") if isinstance(pacote.get("transformacoes"), dict) else {}
x_transformado = transformacoes.get("X")
y_transformado = transformacoes.get("y")
if isinstance(x_transformado, pd.DataFrame):
variaveis.extend([str(item) for item in x_transformado.columns.tolist()])
if isinstance(y_transformado, pd.Series):
nome_y = _str_or_none(y_transformado.name)
if nome_y:
variaveis.append(nome_y)
equacao = _equacao_do_pacote(pacote)
if isinstance(equacao, str) and "=" in equacao:
variaveis.append(equacao.split("=", 1)[0].strip())
limpas = [item for item in variaveis if _str_or_none(item) and _normalize(item) != "const"]
return _dedupe_strings(limpas)
def _modelo_contem_variavel(modelo: dict[str, Any], aliases: list[str]) -> bool:
variaveis = list(modelo.get("_variaveis_modelo") or [])
indice_faixa = modelo.get("_faixa_variaveis_index") or {}
if isinstance(indice_faixa, dict):
variaveis.extend([str(chave) for chave in indice_faixa.keys()])
return any(_has_alias_exato(str(variavel), aliases) for variavel in variaveis)
def _to_dataframe(value: Any) -> pd.DataFrame | None:
if value is None:
return None
if isinstance(value, pd.DataFrame):
return value.copy()
if isinstance(value, list):
try:
return pd.DataFrame(value)
except Exception:
return None
if isinstance(value, dict):
try:
return pd.DataFrame(value)
except Exception:
return None
return None
def _listar_outliers_excluidos(dados_pacote: dict[str, Any]) -> list[int]:
valores = dados_pacote.get("outliers_excluidos")
if not isinstance(valores, (list, tuple, set)):
return []
out: list[int] = []
for valor in valores:
try:
out.append(int(valor))
except Exception:
continue
return sorted(set(out))
def _extrair_df_modelo_ajustado(dados_pacote: dict[str, Any]) -> pd.DataFrame | None:
# Prioriza sempre a base ajustada efetivamente usada no modelo.
df_modelo = _to_dataframe(dados_pacote.get("df"))
if df_modelo is not None:
return df_modelo
df_completo = _to_dataframe(dados_pacote.get("df_completo"))
if df_completo is None:
return None
outliers_excluidos = _listar_outliers_excluidos(dados_pacote)
if not outliers_excluidos:
return df_completo
return df_completo.drop(index=outliers_excluidos, errors="ignore")
def _carregar_dataframe_modelo(caminho_modelo: Path) -> pd.DataFrame | None:
try:
pacote = load(caminho_modelo)
except Exception:
return None
if not isinstance(pacote, dict):
return None
if "versao" not in pacote:
try:
pacote = _migrar_pacote_v1_para_v2(pacote)
except Exception:
return None
dados = pacote.get("dados") if isinstance(pacote.get("dados"), dict) else {}
return _extrair_df_modelo_ajustado(dados)
def _coletar_pontos_modelo(df_modelo: pd.DataFrame, limite_pontos: int) -> list[dict[str, Any]]:
if df_modelo is None or df_modelo.empty:
return []
col_lat = _identificar_coluna_por_alias(df_modelo.columns, LAT_ALIASES)
col_lon = _identificar_coluna_por_alias(df_modelo.columns, LON_ALIASES)
if col_lat is None or col_lon is None:
return []
base = df_modelo[[col_lat, col_lon]].copy()
base[col_lat] = pd.to_numeric(base[col_lat], errors="coerce")
base[col_lon] = pd.to_numeric(base[col_lon], errors="coerce")
base = base.dropna(subset=[col_lat, col_lon])
if base.empty:
return []
base = base[(base[col_lat].between(-90, 90)) & (base[col_lon].between(-180, 180))]
if base.empty:
return []
if limite_pontos > 0 and len(base) > limite_pontos:
passo = max(1, math.ceil(len(base) / limite_pontos))
base = base.iloc[::passo].head(limite_pontos)
lat_vals = base[col_lat].to_numpy(dtype=float, copy=False)
lon_vals = base[col_lon].to_numpy(dtype=float, copy=False)
indices = base.index.to_list()
return [
{"lat": float(lat), "lon": float(lon), "indice": _formatar_indice_mapa(indice)}
for lat, lon, indice in zip(lat_vals, lon_vals, indices)
]
def _construir_geometria_modelo(caminho_modelo: Path) -> dict[str, Any] | None:
df_modelo = _carregar_dataframe_modelo(caminho_modelo)
pontos = _coletar_pontos_modelo(df_modelo, 0)
if not pontos:
return None
try:
from shapely.geometry import MultiPoint
except ImportError:
return None
coordenadas = np.asarray([(float(item["lon"]), float(item["lat"])) for item in pontos], dtype=float)
if coordenadas.size == 0:
return None
try:
pontos_unicos = np.unique(coordenadas, axis=0)
hull_convexo = MultiPoint([tuple(item) for item in pontos_unicos]).convex_hull
except Exception:
return None
if hull_convexo is None or getattr(hull_convexo, "is_empty", False):
return None
hull = _construir_cobertura_concava_wgs84(pontos_unicos)
if hull is None:
hull = hull_convexo
geom_metric = _transformar_geometria_para_metrico(hull)
return {
"geom_wgs84": hull,
"geom_metric": geom_metric,
"geom_type": str(getattr(hull, "geom_type", "") or ""),
"total_pontos": len(pontos),
}
def _formatar_indice_mapa(indice: Any) -> str:
if isinstance(indice, (int, np.integer)):
return str(int(indice))
if isinstance(indice, (float, np.floating)) and np.isfinite(indice) and float(indice).is_integer():
return str(int(indice))
return str(indice)
def _identificar_coluna_por_alias(colunas: Any, aliases: list[str]) -> str | None:
for coluna in colunas:
nome = str(coluna)
if _has_alias(nome, aliases):
return nome
return None
def _normalizar_coordenadas_avaliando(lat_raw: Any, lon_raw: Any) -> tuple[float | None, float | None]:
lat = _to_float_or_none(lat_raw)
lon = _to_float_or_none(lon_raw)
if lat is None or lon is None:
return None, None
if not (-90.0 <= lat <= 90.0) or not (-180.0 <= lon <= 180.0):
return None, None
return float(lat), float(lon)
def _normalizar_avaliandos_geo(raw: Any) -> list[dict[str, Any]]:
saida: list[dict[str, Any]] = []
for item in raw or []:
if not isinstance(item, dict):
continue
lat, lon = _normalizar_coordenadas_avaliando(item.get("lat"), item.get("lon"))
if lat is None or lon is None:
continue
indice = len(saida) + 1
label = str(item.get("label") or f"A{indice}").strip() or f"A{indice}"
avaliando_id = str(item.get("id") or label or f"avaliando-{indice}").strip() or f"avaliando-{indice}"
logradouro = str(item.get("logradouro") or item.get("endereco") or "").strip() or None
numero_usado_raw = item.get("numero_usado")
if numero_usado_raw is None:
numero_usado_raw = item.get("numero")
numero_usado = None if _is_empty(numero_usado_raw) else str(numero_usado_raw).strip() or None
origem = str(item.get("origem") or "").strip() or None
cdlog = item.get("cdlog")
saida.append(
{
"id": avaliando_id,
"label": label,
"lat": lat,
"lon": lon,
"logradouro": logradouro,
"numero_usado": numero_usado,
"cdlog": cdlog,
"origem": origem,
}
)
return saida
def _normalizar_criterio_espacial(value: Any) -> str:
criterio = str(value or "").strip().lower()
if criterio == CRITERIO_ESPACIAL_MENOR_DISTANCIA:
return CRITERIO_ESPACIAL_MENOR_DISTANCIA
if criterio == CRITERIO_ESPACIAL_MEDIA_DISTANCIA:
return CRITERIO_ESPACIAL_MEDIA_DISTANCIA
return CRITERIO_ESPACIAL_PADRAO
def _calcular_distancias_avaliandos(
geometria: dict[str, Any] | None,
avaliandos_geo: list[dict[str, Any]],
criterio_espacial: str = CRITERIO_ESPACIAL_PADRAO,
) -> tuple[list[dict[str, Any]], dict[str, Any]]:
criterio_norm = _normalizar_criterio_espacial(criterio_espacial)
distancias: list[dict[str, Any]] = []
for item in avaliandos_geo or []:
distancia = _calcular_distancia_geometria_cache(geometria, item.get("lat"), item.get("lon"))
distancias.append(
{
"id": item.get("id"),
"label": item.get("label"),
"lat": item.get("lat"),
"lon": item.get("lon"),
"logradouro": item.get("logradouro"),
"numero_usado": item.get("numero_usado"),
"cdlog": item.get("cdlog"),
"origem": item.get("origem"),
"distancia_km": distancia.get("distancia_km"),
"distancia_label": distancia.get("distancia_label"),
"avaliando_dentro_cobertura": distancia.get("avaliando_dentro_cobertura"),
}
)
return distancias, _resumir_distancias_avaliandos(distancias, criterio_norm)
def _resumir_distancias_avaliandos(
distancias: list[dict[str, Any]],
criterio_espacial: str = CRITERIO_ESPACIAL_PADRAO,
) -> dict[str, Any]:
criterio_norm = _normalizar_criterio_espacial(criterio_espacial)
valores_validos = [
float(item["distancia_km"])
for item in (distancias or [])
if _to_float_or_none(item.get("distancia_km")) is not None
]
menor = min(valores_validos) if valores_validos else None
maior = max(valores_validos) if valores_validos else None
media = (sum(valores_validos) / len(valores_validos)) if valores_validos else None
total_dentro = sum(1 for item in (distancias or []) if item.get("avaliando_dentro_cobertura") is True)
principal_km = {
CRITERIO_ESPACIAL_MENOR_DISTANCIA: menor,
CRITERIO_ESPACIAL_MEDIA_DISTANCIA: media,
CRITERIO_ESPACIAL_MAIOR_DISTANCIA: maior,
}.get(criterio_norm, maior)
return {
"criterio_principal": criterio_norm,
"principal_distancia_km": principal_km,
"principal_distancia_label": _formatar_distancia_km(principal_km),
"menor_distancia_km": menor,
"menor_distancia_label": _formatar_distancia_km(menor),
"media_distancia_km": media,
"media_distancia_label": _formatar_distancia_km(media),
"maior_distancia_km": maior,
"maior_distancia_label": _formatar_distancia_km(maior),
"total_avaliandos": len(distancias or []),
"total_com_distancia": len(valores_validos),
"total_sem_distancia": max(0, len(distancias or []) - len(valores_validos)),
"total_dentro_cobertura": total_dentro,
}
def _anexar_distancias_modelo_multiplos(
modelo: dict[str, Any],
avaliandos_geo: list[dict[str, Any]],
criterio_espacial: str = CRITERIO_ESPACIAL_PADRAO,
) -> dict[str, Any]:
item = dict(modelo)
caminho_texto = str(item.get("_caminho_modelo") or "").strip()
if not caminho_texto:
item["distancias_avaliandos"] = []
item["distancia_resumo"] = _resumir_distancias_avaliandos([], criterio_espacial)
item["distancia_km"] = None
item["distancia_label"] = "-"
item["avaliando_dentro_cobertura"] = None
return item
geometria = _carregar_geometria_modelo_com_cache(Path(caminho_texto))
distancias, resumo = _calcular_distancias_avaliandos(geometria, avaliandos_geo, criterio_espacial)
item["distancias_avaliandos"] = distancias
item["distancia_resumo"] = resumo
item["distancia_km"] = resumo.get("principal_distancia_km")
item["distancia_label"] = resumo.get("principal_distancia_label")
item["avaliando_dentro_cobertura"] = None
return item
def _anexar_distancia_modelo(modelo: dict[str, Any], aval_lat: float, aval_lon: float) -> dict[str, Any]:
item = dict(modelo)
caminho_texto = str(item.get("_caminho_modelo") or "").strip()
if not caminho_texto:
item["distancia_km"] = None
item["distancia_label"] = "-"
item["avaliando_dentro_cobertura"] = None
return item
geometria = _carregar_geometria_modelo_com_cache(Path(caminho_texto))
distancia = _calcular_distancia_geometria_cache(geometria, aval_lat, aval_lon)
item["distancia_km"] = distancia.get("distancia_km")
item["distancia_label"] = distancia.get("distancia_label")
item["avaliando_dentro_cobertura"] = distancia.get("avaliando_dentro_cobertura")
return item
def _calcular_distancia_geometria_cache(
geometria: dict[str, Any] | None,
aval_lat: float | None,
aval_lon: float | None,
) -> dict[str, Any]:
if geometria is None or aval_lat is None or aval_lon is None:
return {"distancia_km": None, "distancia_label": "-", "avaliando_dentro_cobertura": None}
geom_metric = geometria.get("geom_metric")
if geom_metric is None:
return {"distancia_km": None, "distancia_label": "-", "avaliando_dentro_cobertura": None}
ponto_metric = _criar_ponto_metrico(aval_lat, aval_lon)
if ponto_metric is None:
return {"distancia_km": None, "distancia_label": "-", "avaliando_dentro_cobertura": None}
try:
distancia_m = float(ponto_metric.distance(geom_metric))
except Exception:
return {"distancia_km": None, "distancia_label": "-", "avaliando_dentro_cobertura": None}
if not np.isfinite(distancia_m):
return {"distancia_km": None, "distancia_label": "-", "avaliando_dentro_cobertura": None}
distancia_km = max(0.0, distancia_m / 1000.0)
return {
"distancia_km": float(distancia_km),
"distancia_label": _formatar_distancia_km(distancia_km),
"avaliando_dentro_cobertura": bool(distancia_m <= 1e-6),
}
def _formatar_distancia_km(distancia_km: float | None) -> str:
if distancia_km is None or not np.isfinite(distancia_km):
return "-"
return f"{float(distancia_km):,.2f} km".replace(",", "X").replace(".", ",").replace("X", ".")
def _transformar_geometria_para_metrico(geom: Any) -> Any:
if geom is None:
return None
try:
from pyproj import Transformer
from shapely.ops import transform
except ImportError:
return None
try:
transformer = Transformer.from_crs("EPSG:4326", DISTANCIA_METRIC_CRS, always_xy=True)
return transform(transformer.transform, geom)
except Exception:
return None
def _transformar_geometria_para_wgs84(geom: Any) -> Any:
if geom is None:
return None
try:
from pyproj import Transformer
from shapely.ops import transform
except ImportError:
return None
try:
transformer = Transformer.from_crs(DISTANCIA_METRIC_CRS, "EPSG:4326", always_xy=True)
return transform(transformer.transform, geom)
except Exception:
return None
def _coordenadas_wgs84_para_metricas(coordenadas: np.ndarray) -> np.ndarray | None:
arr = np.asarray(coordenadas, dtype=float)
if arr.ndim != 2 or arr.shape[1] != 2 or arr.size == 0:
return None
try:
from pyproj import Transformer
except ImportError:
return None
try:
transformer = Transformer.from_crs("EPSG:4326", DISTANCIA_METRIC_CRS, always_xy=True)
xs, ys = transformer.transform(arr[:, 0], arr[:, 1])
except Exception:
return None
metricas = np.column_stack([xs, ys]).astype(float, copy=False)
metricas = metricas[np.isfinite(metricas).all(axis=1)]
if metricas.size == 0:
return None
return metricas
def _distancia_tipica_pontos_metricos(coordenadas_metricas: np.ndarray) -> float:
pontos = np.asarray(coordenadas_metricas, dtype=float)
if pontos.ndim != 2 or pontos.shape[0] < 2:
return 1.0
try:
from scipy.spatial import cKDTree
tree = cKDTree(pontos)
distancias, _ = tree.query(pontos, k=min(2, len(pontos)))
if getattr(distancias, "ndim", 0) == 1:
vizinhos = distancias[distancias > 1e-9]
else:
vizinhos = distancias[:, 1]
except Exception:
delta = pontos[:, None, :] - pontos[None, :, :]
matriz = np.sqrt(np.sum(delta * delta, axis=2))
matriz[matriz <= 1e-9] = np.inf
vizinhos = np.min(matriz, axis=1)
vizinhos = np.asarray(vizinhos, dtype=float)
vizinhos = vizinhos[np.isfinite(vizinhos) & (vizinhos > 1e-9)]
if vizinhos.size == 0:
return 1.0
return max(float(np.median(vizinhos)), 1.0)
def _normalizar_geometria_cobertura(geom: Any) -> Any:
if geom is None or getattr(geom, "is_empty", False):
return None
try:
from shapely.geometry import MultiPolygon, Polygon
from shapely.ops import unary_union
except ImportError:
return None
atual = geom
try:
if getattr(atual, "geom_type", "") == "GeometryCollection":
partes = [
parte
for parte in getattr(atual, "geoms", [])
if getattr(parte, "geom_type", "") in {"Polygon", "MultiPolygon"} and not getattr(parte, "is_empty", False)
]
if not partes:
return None
atual = unary_union(partes)
if getattr(atual, "geom_type", "") not in {"Polygon", "MultiPolygon"}:
return None
if not bool(getattr(atual, "is_valid", True)):
atual = atual.buffer(0)
if atual is None or getattr(atual, "is_empty", False):
return None
if getattr(atual, "geom_type", "") == "Polygon":
return Polygon(atual.exterior)
if getattr(atual, "geom_type", "") == "MultiPolygon":
partes = [Polygon(parte.exterior) for parte in atual.geoms if not getattr(parte, "is_empty", False)]
if not partes:
return None
unido = unary_union(partes)
if getattr(unido, "geom_type", "") in {"Polygon", "MultiPolygon"} and not getattr(unido, "is_empty", False):
return unido
except Exception:
return None
return None
def _geometria_cobre_todos_os_pontos(geom_metric: Any, pontos_metricos: Any, tolerancia_m: float) -> bool:
if geom_metric is None or pontos_metricos is None:
return False
if getattr(geom_metric, "geom_type", "") not in {"Polygon", "MultiPolygon"}:
return False
try:
tolerancia = max(float(tolerancia_m), 0.5)
return bool(geom_metric.buffer(tolerancia).covers(pontos_metricos))
except Exception:
return False
def _alpha_shape_metrico(coordenadas_metricas: np.ndarray, max_circumradius_m: float) -> Any:
pontos = np.asarray(coordenadas_metricas, dtype=float)
if pontos.ndim != 2 or pontos.shape[0] < 4:
return None
try:
from scipy.spatial import Delaunay, QhullError
from shapely.geometry import MultiLineString
from shapely.ops import polygonize, unary_union
except ImportError:
return None
try:
triangulacao = Delaunay(pontos)
except QhullError:
return None
except Exception:
return None
max_radius = float(max_circumradius_m)
if not np.isfinite(max_radius) or max_radius <= 0:
return None
arestas: set[tuple[int, int]] = set()
segmentos: list[tuple[tuple[float, float], tuple[float, float]]] = []
def registrar_aresta(idx_a: int, idx_b: int) -> None:
chave = tuple(sorted((int(idx_a), int(idx_b))))
if chave in arestas:
return
arestas.add(chave)
pa = pontos[int(idx_a)]
pb = pontos[int(idx_b)]
segmentos.append(((float(pa[0]), float(pa[1])), (float(pb[0]), float(pb[1]))))
for simplex in triangulacao.simplices:
ia, ib, ic = [int(valor) for valor in simplex]
pa, pb, pc = pontos[[ia, ib, ic]]
a = float(np.linalg.norm(pb - pc))
b = float(np.linalg.norm(pa - pc))
c = float(np.linalg.norm(pa - pb))
s = (a + b + c) / 2.0
area_sq = s * (s - a) * (s - b) * (s - c)
if area_sq <= 1e-12:
continue
area = math.sqrt(area_sq)
circumradius = (a * b * c) / max(4.0 * area, 1e-12)
if not np.isfinite(circumradius) or circumradius > max_radius:
continue
registrar_aresta(ia, ib)
registrar_aresta(ib, ic)
registrar_aresta(ic, ia)
if not segmentos:
return None
try:
linhas = MultiLineString(segmentos)
poligonos = list(polygonize(linhas))
if not poligonos:
return None
return unary_union(poligonos)
except Exception:
return None
def _construir_cobertura_concava_wgs84(coordenadas_wgs84: np.ndarray) -> Any:
try:
from shapely.geometry import MultiPoint
except ImportError:
return None
pontos_wgs84 = np.asarray(coordenadas_wgs84, dtype=float)
if pontos_wgs84.ndim != 2 or pontos_wgs84.shape[0] < 4:
return None
pontos_metricos_arr = _coordenadas_wgs84_para_metricas(pontos_wgs84)
if pontos_metricos_arr is None or len(pontos_metricos_arr) != len(pontos_wgs84):
return None
pontos_metricos = MultiPoint([tuple(item) for item in pontos_metricos_arr])
passo_tipico = _distancia_tipica_pontos_metricos(pontos_metricos_arr)
tolerancia = max(1.0, passo_tipico * 0.08)
try:
from shapely import concave_hull as shapely_concave_hull
except Exception:
shapely_concave_hull = None
if shapely_concave_hull is not None:
for ratio in (0.12, 0.18, 0.26, 0.36, 0.5, 0.72):
try:
candidato_metric = shapely_concave_hull(pontos_metricos, ratio=float(ratio), allow_holes=False)
except Exception:
continue
candidato_metric = _normalizar_geometria_cobertura(candidato_metric)
if not _geometria_cobre_todos_os_pontos(candidato_metric, pontos_metricos, tolerancia):
continue
candidato_wgs84 = _transformar_geometria_para_wgs84(candidato_metric)
candidato_wgs84 = _normalizar_geometria_cobertura(candidato_wgs84)
if candidato_wgs84 is not None:
return candidato_wgs84
for multiplicador in (2.2, 3.0, 4.0, 5.5, 7.5, 10.0, 13.5):
candidato_metric = _alpha_shape_metrico(pontos_metricos_arr, passo_tipico * float(multiplicador))
candidato_metric = _normalizar_geometria_cobertura(candidato_metric)
if not _geometria_cobre_todos_os_pontos(candidato_metric, pontos_metricos, tolerancia):
continue
candidato_wgs84 = _transformar_geometria_para_wgs84(candidato_metric)
candidato_wgs84 = _normalizar_geometria_cobertura(candidato_wgs84)
if candidato_wgs84 is not None:
return candidato_wgs84
return None
def _criar_ponto_metrico(lat: float, lon: float) -> Any:
try:
from shapely.geometry import Point
except ImportError:
return None
ponto = Point(float(lon), float(lat))
return _transformar_geometria_para_metrico(ponto)
def _adicionar_geometria_modelo_no_mapa(
camada_poligonos: folium.FeatureGroup,
camada_distancias: folium.FeatureGroup,
modelo: dict[str, Any],
aval_lat: float | None,
aval_lon: float | None,
) -> None:
geometria = modelo.get("geometria") or {}
geom_wgs84 = geometria.get("geom_wgs84")
if geom_wgs84 is None:
return
tooltip = str(modelo.get("nome") or "").strip() or "Modelo"
tooltip_distancia = _montar_tooltip_distancia_modelo(modelo)
if tooltip_distancia:
tooltip = f"{tooltip}{tooltip_distancia}"
geom_type = str(geometria.get("geom_type") or "")
cor = str(modelo.get("cor") or "#1f77b4")
try:
if geom_type == "Polygon":
coords = [[float(lat), float(lon)] for lon, lat in list(geom_wgs84.exterior.coords)]
folium.Polygon(
locations=coords,
color=cor,
fill=True,
fill_color=cor,
fill_opacity=0.12,
weight=2,
tooltip=tooltip,
).add_to(camada_poligonos)
elif geom_type == "MultiPolygon":
for poligono in list(getattr(geom_wgs84, "geoms", [])):
coords = [[float(lat), float(lon)] for lon, lat in list(poligono.exterior.coords)]
folium.Polygon(
locations=coords,
color=cor,
fill=True,
fill_color=cor,
fill_opacity=0.12,
weight=2,
tooltip=tooltip,
).add_to(camada_poligonos)
elif geom_type == "LineString":
coords = [[float(lat), float(lon)] for lon, lat in list(geom_wgs84.coords)]
folium.PolyLine(locations=coords, color=cor, weight=3, opacity=0.8, tooltip=tooltip).add_to(camada_poligonos)
elif geom_type == "MultiLineString":
for linha in list(getattr(geom_wgs84, "geoms", [])):
coords = [[float(lat), float(lon)] for lon, lat in list(linha.coords)]
folium.PolyLine(locations=coords, color=cor, weight=3, opacity=0.8, tooltip=tooltip).add_to(camada_poligonos)
elif geom_type == "Point":
folium.CircleMarker(
location=[float(geom_wgs84.y), float(geom_wgs84.x)],
radius=6,
color=cor,
fill=True,
fill_color=cor,
fill_opacity=0.7,
tooltip=tooltip,
).add_to(camada_poligonos)
elif geom_type == "MultiPoint":
for ponto in list(getattr(geom_wgs84, "geoms", [])):
folium.CircleMarker(
location=[float(ponto.y), float(ponto.x)],
radius=6,
color=cor,
fill=True,
fill_color=cor,
fill_opacity=0.7,
tooltip=tooltip,
).add_to(camada_poligonos)
except Exception:
return
if aval_lat is None or aval_lon is None:
return
try:
from shapely.geometry import Point
from shapely.ops import nearest_points
except ImportError:
return
try:
aval_point = Point(float(aval_lon), float(aval_lat))
_, nearest_geom = nearest_points(aval_point, geom_wgs84)
distancia_km = _to_float_or_none(modelo.get("distancia_km"))
if nearest_geom is None or distancia_km is None or distancia_km <= 0:
return
folium.PolyLine(
locations=[
[float(aval_point.y), float(aval_point.x)],
[float(nearest_geom.y), float(nearest_geom.x)],
],
color=cor,
weight=2,
opacity=0.65,
dash_array="6,6",
tooltip=f'Ligacao de distancia • {tooltip}',
).add_to(camada_distancias)
except Exception:
return
def _extrair_bounds_geometria_wgs84(geometria: dict[str, Any] | None) -> list[list[float]]:
if geometria is None or geometria.get("geom_wgs84") is None:
return []
try:
min_x, min_y, max_x, max_y = geometria["geom_wgs84"].bounds
except Exception:
return []
return [[float(min_y), float(min_x)], [float(max_y), float(max_x)]]
def _resolver_via_por_cdlog(cdlog: int) -> dict[str, Any]:
catalogo = _carregar_catalogo_vias()
for item in catalogo:
if int(item["cdlog"]) == int(cdlog):
return item
raise HTTPException(status_code=404, detail="CDLOG informado nao foi encontrado nos eixos")
def _resolver_via_por_logradouro(logradouro: str | None) -> dict[str, Any]:
texto = str(logradouro or "").strip()
if not texto:
raise HTTPException(status_code=400, detail="Informe o logradouro para localizar o avaliando")
consulta = _normalize(texto)
catalogo = _carregar_catalogo_vias()
melhores: list[tuple[int, dict[str, Any]]] = []
for item in catalogo:
score = _score_logradouro(consulta, item)
if score <= 0:
continue
melhores.append((score, item))
if not melhores:
raise HTTPException(status_code=404, detail="Nenhum logradouro compativel foi encontrado nos eixos")
melhores.sort(key=lambda entry: (-entry[0], str(entry[1].get("logradouro") or "").lower(), int(entry[1].get("cdlog") or 0)))
melhor_score, melhor_item = melhores[0]
if melhor_score < 60:
sugestoes = ", ".join(str(item.get("logradouro") or "") for _, item in melhores[:5])
detalhe = "Nao foi possivel identificar com seguranca o logradouro informado."
if sugestoes:
detalhe += f" Sugestoes: {sugestoes}."
raise HTTPException(status_code=400, detail=detalhe)
return melhor_item
def _carregar_catalogo_vias() -> list[dict[str, Any]]:
global _CATALOGO_VIAS_CACHE
if _CATALOGO_VIAS_CACHE is not None:
return list(_CATALOGO_VIAS_CACHE)
registros: list[dict[str, Any]] = []
try:
registros = load_attribute_records(
resolve_core_path("dados", "EixosLogradouros.shp"),
property_fields=("CDLOG", "NMIDELOG", "NMIDEPRE", "NMIDEABR", "CDIDECAT"),
)
append_runtime_log(f"[mesa] pesquisa: catalogo leve de logradouros carregado com {len(registros)} registros")
except Exception as exc:
append_runtime_log(f"[mesa] pesquisa: falha no catalogo leve de logradouros: {exc}")
def _catalogo_from_rows(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
vistos: set[int] = set()
catalogo_local: list[dict[str, Any]] = []
for row in rows:
cdlog = _to_int_or_none(row.get("CDLOG"))
if cdlog is None or cdlog in vistos:
continue
vistos.add(cdlog)
nome = str(row.get("NMIDELOG") or "").strip()
prefixo = str(row.get("NMIDEPRE") or "").strip()
abreviado = str(row.get("NMIDEABR") or "").strip()
categoria = str(row.get("CDIDECAT") or "").strip()
logradouro, aliases, display_label = _montar_logradouro_catalogo(nome, prefixo, abreviado, categoria)
catalogo_local.append(
{
"cdlog": cdlog,
"logradouro": logradouro,
"display_label": display_label,
"_aliases_norm": [_normalize(item) for item in aliases if _normalize(item)],
}
)
return catalogo_local
if registros:
catalogo = _catalogo_from_rows(registros)
_CATALOGO_VIAS_CACHE = list(catalogo)
append_runtime_log(f"[mesa] pesquisa: catalogo de logradouros carregado com {len(catalogo)} vias")
return list(catalogo)
gdf = geocodificacao.carregar_eixos()
if gdf is None or gdf.empty:
append_runtime_log("[mesa] pesquisa: base de eixos indisponivel para catalogo de logradouros")
raise HTTPException(status_code=500, detail="Base de eixos indisponivel para localizar o avaliando")
cols = {str(col).upper(): col for col in gdf.columns}
cdlog_col = cols.get("CDLOG")
nome_col = cols.get("NMIDELOG")
prefixo_col = cols.get("NMIDEPRE")
abreviado_col = cols.get("NMIDEABR")
categoria_col = cols.get("CDIDECAT")
if cdlog_col is None or nome_col is None:
append_runtime_log("[mesa] pesquisa: colunas obrigatorias dos eixos nao foram encontradas")
raise HTTPException(status_code=500, detail="Base de eixos sem colunas necessarias para localizar o avaliando")
vistos: set[int] = set()
catalogo: list[dict[str, Any]] = []
for _, row in gdf.iterrows():
cdlog = _to_int_or_none(row.get(cdlog_col))
if cdlog is None or cdlog in vistos:
continue
vistos.add(cdlog)
nome = str(row.get(nome_col) or "").strip()
prefixo = str(row.get(prefixo_col) or "").strip() if prefixo_col else ""
abreviado = str(row.get(abreviado_col) or "").strip() if abreviado_col else ""
categoria = str(row.get(categoria_col) or "").strip() if categoria_col else ""
logradouro, aliases, display_label = _montar_logradouro_catalogo(nome, prefixo, abreviado, categoria)
catalogo.append(
{
"cdlog": cdlog,
"logradouro": logradouro,
"display_label": display_label,
"_aliases_norm": [_normalize(item) for item in aliases if _normalize(item)],
}
)
_CATALOGO_VIAS_CACHE = list(catalogo)
append_runtime_log(f"[mesa] pesquisa: catalogo de logradouros carregado com {len(catalogo)} vias")
return list(catalogo)
def _score_logradouro(consulta: str, item: dict[str, Any]) -> int:
if not consulta:
return 0
aliases = [str(alias or "").strip() for alias in item.get("_aliases_norm") or [] if str(alias or "").strip()]
if not aliases:
return 0
melhor = 0
consulta_tokens = [tok for tok in consulta.split() if tok]
for alias in aliases:
if alias == consulta:
melhor = max(melhor, 120)
continue
if alias.startswith(consulta):
melhor = max(melhor, 100)
continue
if consulta in alias:
melhor = max(melhor, 82)
if consulta_tokens and all(tok in alias for tok in consulta_tokens):
melhor = max(melhor, 72)
return melhor
def _montar_logradouro_catalogo(
nome: str,
prefixo: str,
abreviado: str,
categoria: str,
) -> tuple[str, list[str], str]:
nome_limpo = str(nome or "").strip()
prefixo_limpo = str(prefixo or "").strip()
abreviado_limpo = re.sub(r"\s+", " ", str(abreviado or "").strip())
categoria_limpa = str(categoria or "").strip().upper()
tipo_abrev = TIPO_LOGRADOURO_ABREV.get(categoria_limpa, categoria_limpa)
tipo_expandido = TIPO_LOGRADOURO_EXPANDIDO.get(categoria_limpa, categoria_limpa)
partes_base = [item for item in [tipo_abrev, prefixo_limpo, nome_limpo] if item]
partes_extenso = [item for item in [tipo_expandido, prefixo_limpo, nome_limpo] if item]
logradouro = abreviado_limpo or " ".join(partes_base).strip() or nome_limpo
display_label = abreviado_limpo or logradouro
if nome_limpo and _normalize(nome_limpo) not in {_normalize(display_label), _normalize(logradouro)}:
display_label = f"{display_label} ({nome_limpo})"
aliases = _dedupe_strings(
[
logradouro,
display_label,
abreviado_limpo,
" ".join(partes_base).strip(),
" ".join(partes_extenso).strip(),
nome_limpo,
]
)
return logradouro, aliases, display_label
def _to_int_or_none(value: Any) -> int | None:
if _is_empty(value):
return None
try:
return int(float(str(value).strip()))
except Exception:
return None
def _extrair_bairros(df: pd.DataFrame) -> list[str]:
candidatos = [col for col in df.columns if _has_alias(str(col), BAIRRO_ALIASES)]
bairros: list[str] = []
vistos = set()
for col in candidatos:
serie = df[col]
for valor in serie.dropna():
texto = str(valor).strip()
chave = _normalize(texto)
if not texto or not chave or chave in vistos:
continue
vistos.add(chave)
bairros.append(texto)
return sorted(bairros, key=lambda item: item.lower())
def _extrair_finalidades(df: pd.DataFrame) -> list[str]:
candidatos = [col for col in df.columns if _has_alias(str(col), FINALIDADE_ALIASES)]
finalidades: list[str] = []
vistos = set()
for col in candidatos:
serie = df[col]
for valor in serie.dropna().head(5000):
texto = str(valor).strip()
chave = _normalize(texto)
if not texto or not chave or chave in vistos:
continue
vistos.add(chave)
finalidades.append(texto)
if len(finalidades) >= 20:
break
if len(finalidades) >= 20:
break
return finalidades
def _extrair_faixa_data_dataframe(df: pd.DataFrame) -> dict[str, Any] | None:
for col in _colunas_data_reais(df):
faixa = _extrair_faixa_data_real_serie(df[col])
if faixa is not None:
return faixa
return None
def _colunas_data_reais(df: pd.DataFrame | None) -> list[str]:
if df is None or df.empty:
return []
candidatos = [str(col) for col in df.columns if _has_alias(str(col), DATA_ALIASES)]
return [col for col in candidatos if not _parece_coluna_apenas_ano(col)]
def _parece_coluna_apenas_ano(nome_coluna: str) -> bool:
nome_norm = _normalize(nome_coluna)
if not nome_norm:
return False
tokens_ano = [
"ano",
"anodado",
"anoconstr",
"anobconstrucao",
"anocconstrucao",
"imoanomaior",
"pmianomaior",
]
return any(token in nome_norm for token in tokens_ano)
def _extrair_faixa_data_real_serie(serie: pd.Series) -> dict[str, Any] | None:
serie_limpa = serie.dropna()
if serie_limpa.empty:
return None
bruto = serie_limpa.astype(str).str.strip()
bruto = bruto[bruto != ""]
if bruto.empty:
return None
# Rejeita colunas que sejam basicamente ano puro (YYYY), pois nao suportam
# intervalo de datas reais (dia/mes/ano).
proporcao_ano_puro = float(bruto.str.fullmatch(r"\d{4}").mean())
if proporcao_ano_puro >= 0.8:
return None
if bruto.str.match(r"^\d{4}-\d{2}-\d{2}(?:[T\s].*)?$").all():
serie_data = pd.to_datetime(bruto.str.slice(0, 10), errors="coerce", format="%Y-%m-%d").dropna()
elif bruto.str.match(r"^\d{4}/\d{2}/\d{2}(?:[T\s].*)?$").all():
serie_data = pd.to_datetime(bruto.str.slice(0, 10), errors="coerce", format="%Y/%m/%d").dropna()
elif bruto.str.match(r"^\d{2}/\d{2}/\d{4}(?:[T\s].*)?$").all():
serie_data = pd.to_datetime(bruto.str.slice(0, 10), errors="coerce", format="%d/%m/%Y").dropna()
else:
try:
serie_data = pd.to_datetime(bruto, errors="coerce", dayfirst=True, format="mixed").dropna()
except TypeError:
serie_data = pd.to_datetime(bruto, errors="coerce", dayfirst=True).dropna()
total = len(bruto)
minimo_amostras = min(3, total)
if len(serie_data) < minimo_amostras:
return None
if total > 0 and (len(serie_data) / total) < 0.6:
return None
return {
"min": serie_data.min().date().isoformat(),
"max": serie_data.max().date().isoformat(),
}
def _extrair_faixa_por_alias(estat_df: pd.DataFrame | None, aliases: list[str]) -> dict[str, Any] | None:
if estat_df is None or estat_df.empty:
return None
estat_indexado = estat_df.copy()
if "Variável" in estat_indexado.columns:
estat_indexado = estat_indexado.set_index("Variável")
if estat_indexado.empty:
return None
min_col = _buscar_coluna(estat_indexado.columns, ["minimo", "mínimo", "min"])
max_col = _buscar_coluna(estat_indexado.columns, ["maximo", "máximo", "max"])
if min_col is None or max_col is None:
return None
nomes = estat_indexado.index.astype(str).tolist()
if not nomes:
return None
mask_alias = np.fromiter((_has_alias(nome, aliases) for nome in nomes), dtype=bool, count=len(nomes))
if not mask_alias.any():
return None
trabalho = estat_indexado.loc[mask_alias, [min_col, max_col]]
mins = [valor for valor in trabalho[min_col].tolist() if not _is_empty(valor)]
maxs = [valor for valor in trabalho[max_col].tolist() if not _is_empty(valor)]
if not mins and not maxs:
return None
return {
"min": _min_value(mins),
"max": _max_value(maxs),
}
def _resumo_variaveis(estat_df: pd.DataFrame | None, limite: int = 12) -> list[dict[str, Any]]:
if estat_df is None or estat_df.empty:
return []
trabalho = estat_df.copy()
if "Variável" in trabalho.columns:
trabalho = trabalho.set_index("Variável")
if trabalho.empty:
return []
min_col = _buscar_coluna(trabalho.columns, ["minimo", "mínimo", "min"])
max_col = _buscar_coluna(trabalho.columns, ["maximo", "máximo", "max"])
if min_col is None or max_col is None:
return []
linhas: list[dict[str, Any]] = []
nomes = trabalho.index.astype(str).tolist()
mins = trabalho[min_col].tolist()
maxs = trabalho[max_col].tolist()
for var, min_val, max_val in zip(nomes, mins, maxs):
if _is_empty(min_val) and _is_empty(max_val):
continue
linhas.append(
{
"variavel": var,
"min": sanitize_value(min_val),
"max": sanitize_value(max_val),
}
)
if len(linhas) >= limite:
break
return linhas
def _coletar_colunas_para_catalogo(estat_df: pd.DataFrame | None, df: pd.DataFrame | None) -> list[str]:
nomes: list[str] = []
if estat_df is not None and not estat_df.empty:
if "Variável" in estat_df.columns:
nomes.extend([str(v) for v in estat_df["Variável"].dropna().tolist()])
else:
nomes.extend([str(v) for v in estat_df.index.tolist()])
if df is not None and not df.empty:
nomes.extend([str(v) for v in df.columns.tolist()])
unicos = []
vistos = set()
for nome in nomes:
chave = _normalize(nome)
if not chave or chave in vistos:
continue
vistos.add(chave)
unicos.append(nome)
return unicos
def _mapear_compatibilidade(colunas: list[str]) -> dict[str, list[str]]:
out: dict[str, list[str]] = {}
for chave, aliases in COMPATIBILIDADE_MAP.items():
encontrados = [col for col in colunas if _has_alias(col, aliases)]
out[chave] = encontrados
return out
def _tem_colunas_mapa(df: pd.DataFrame | None) -> bool:
if df is None or df.empty:
return False
nomes = [str(col) for col in df.columns]
tem_lat = any(_has_alias(col, LAT_ALIASES) for col in nomes)
tem_lon = any(_has_alias(col, LON_ALIASES) for col in nomes)
return tem_lat and tem_lon
def _aceita_filtros(modelo: dict[str, Any], filtros: PesquisaFiltros, fontes_admin: dict[str, list[str]] | None = None) -> bool:
fontes_admin = fontes_admin or {}
if filtros.nome and not _contains_any([modelo.get("nome_modelo"), modelo.get("arquivo")], filtros.nome):
return False
if filtros.autor and not _contains_any([modelo.get("autor")], filtros.autor):
return False
app_flag = _normalizar_contem_app(filtros.contem_app)
if app_flag is not None and _modelo_contem_variavel(modelo, APP_ALIASES) != app_flag:
return False
if filtros.tipo_modelo and not _contains_any([_tipo_modelo_modelo(modelo)], filtros.tipo_modelo):
return False
negociacao_modelo = _normalizar_negociacao_modelo(filtros.negociacao_modelo)
if negociacao_modelo and _negociacao_modelo_modelo(modelo) != negociacao_modelo:
return False
if filtros.finalidade and not _aceita_texto_com_colunas(modelo, filtros.finalidade, "finalidade", fontes_admin.get("finalidade")):
return False
if filtros.endereco and not _contains_any([modelo.get("endereco_referencia"), ", ".join(modelo.get("bairros") or [])], filtros.endereco):
return False
termos_bairro = _extrair_termos_bairro(filtros)
if termos_bairro:
for termo in termos_bairro:
if not _aceita_texto_com_colunas(modelo, termo, "bairros", fontes_admin.get("bairros")):
return False
if not _aceita_range_com_colunas(modelo, "area", fontes_admin.get("area"), filtros.area_min, filtros.area_max):
return False
if not _aceita_range_com_colunas(modelo, "rh", fontes_admin.get("rh"), filtros.rh_min, filtros.rh_max):
return False
data_min = filtros.data_min
data_max = filtros.data_max
if _is_provided(filtros.aval_data) and not _is_provided(data_min) and not _is_provided(data_max):
data_min = filtros.aval_data
data_max = filtros.aval_data
if not _aceita_range_contido_com_colunas(modelo, "data", fontes_admin.get("data"), data_min, data_max):
return False
return True
def _normalizar_contem_app(value: str | None) -> bool | None:
chave = _normalize(value or "")
if not chave:
return None
if chave in {"sim", "true", "1", "com", "contem"}:
return True
if chave in {"nao", "false", "0", "sem"}:
return False
return None
def _normalizar_negociacao_modelo(value: str | None) -> str | None:
chave = _normalize(value or "")
if not chave:
return None
if chave in {"aluguel", "locacao", "a"}:
return "aluguel"
if chave in {"venda", "v"}:
return "venda"
return None
def _normalizar_otica(value: str | None) -> str:
return "avaliando"
def _anexar_avaliando_info(
modelo: dict[str, Any],
filtros: PesquisaFiltros,
fontes_admin: dict[str, list[str]] | None = None,
) -> dict[str, Any]:
fontes_admin = fontes_admin or {}
item = dict(modelo)
checks: list[dict[str, Any]] = []
rejeicoes: list[str] = []
def registrar(campo: str, informado: Any, aceito: bool, detalhe: str) -> None:
check = {"campo": campo, "informado": sanitize_value(informado), "aceito": bool(aceito), "detalhe": detalhe}
checks.append(check)
if _is_provided(informado) and not aceito:
rejeicoes.append(f"{campo}: {detalhe}")
finalidade_info = filtros.aval_finalidade
if _is_provided(finalidade_info):
termos_finalidade = _split_terms(str(finalidade_info))
aceito = any(
_aceita_texto_com_colunas(item, termo, "aval_finalidade", fontes_admin.get("aval_finalidade"))
for termo in termos_finalidade
) if termos_finalidade else False
detalhe = "nenhuma das finalidades informadas foi encontrada no modelo" if len(termos_finalidade) > 1 else "nao encontrada no modelo"
registrar("finalidade", finalidade_info, aceito, detalhe)
bairro_info = filtros.aval_bairro
if _is_provided(bairro_info):
termos_bairro = _split_terms(str(bairro_info))
aceito = any(
_aceita_texto_com_colunas(item, termo, "aval_bairro", fontes_admin.get("aval_bairro"))
for termo in termos_bairro
) if termos_bairro else False
detalhe = "nenhum dos bairros informados esta na cobertura do modelo" if len(termos_bairro) > 1 else "bairro fora da cobertura do modelo"
registrar("bairro", bairro_info, aceito, detalhe)
zona_info = filtros.aval_zona
if _is_provided(zona_info):
zonas_informadas = _extrair_termos_zona(str(zona_info))
zonas_modelo = _zonas_avaliacao_modelo(item)
zonas_modelo_norm = {_normalize(zona) for zona in zonas_modelo}
aceito = any(_normalize(zona) in zonas_modelo_norm for zona in zonas_informadas) if zonas_informadas else False
detalhe = (
"nenhuma das zonas informadas foi encontrada no modelo"
if len(zonas_informadas) > 1
else "zona fora da cobertura do modelo"
)
registrar("zona_avaliacao", zona_info, aceito, detalhe)
endereco_info = filtros.aval_endereco
if _is_provided(endereco_info):
candidatos = [item.get("endereco_referencia"), ", ".join(item.get("bairros") or [])]
aceito = _contains_any(candidatos, str(endereco_info))
registrar("endereco", endereco_info, aceito, "sem correspondencia textual no modelo")
data_min = filtros.data_min
data_max = filtros.data_max
if _is_provided(filtros.aval_data) and not _is_provided(data_min) and not _is_provided(data_max):
data_min = filtros.aval_data
data_max = filtros.aval_data
faixa_data_ref = _faixa_resumo_com_colunas(item, "data", fontes_admin.get("data"))
registrar(
"data",
_periodo_texto_informado(data_min, data_max),
_aceita_range_contido_com_colunas(item, "data", fontes_admin.get("data"), data_min, data_max),
f"fora da faixa {formatar_faixa(faixa_data_ref)}",
)
faixa_rh_ref = _faixa_resumo_com_colunas(item, "aval_rh", fontes_admin.get("aval_rh"))
registrar(
"rh",
filtros.aval_rh,
_aceita_valor_com_colunas(item, "aval_rh", fontes_admin.get("aval_rh"), filtros.aval_rh),
f"fora da faixa {formatar_faixa(faixa_rh_ref)}",
)
faixa_area_ref = _faixa_resumo_com_colunas(item, "aval_area", fontes_admin.get("aval_area"))
registrar(
"area",
filtros.aval_area,
_aceita_valor_com_colunas(item, "aval_area", fontes_admin.get("aval_area"), filtros.aval_area),
f"fora da faixa {formatar_faixa(faixa_area_ref)}",
)
faixa_area_priv = _faixa_resumo_com_colunas(item, "aval_area_privativa", fontes_admin.get("aval_area_privativa"))
faixa_area_total = _faixa_resumo_com_colunas(item, "aval_area_total", fontes_admin.get("aval_area_total"))
faixa_valor_unit = _faixa_resumo_com_colunas(item, "aval_valor_unitario", fontes_admin.get("aval_valor_unitario"))
faixa_valor_tot = _faixa_resumo_com_colunas(item, "aval_valor_total", fontes_admin.get("aval_valor_total"))
registrar(
"area_privativa",
filtros.aval_area_privativa,
_aceita_valor_com_colunas(item, "aval_area_privativa", fontes_admin.get("aval_area_privativa"), filtros.aval_area_privativa),
f"fora da faixa {formatar_faixa(faixa_area_priv)}",
)
registrar(
"area_total",
filtros.aval_area_total,
_aceita_valor_com_colunas(item, "aval_area_total", fontes_admin.get("aval_area_total"), filtros.aval_area_total),
f"fora da faixa {formatar_faixa(faixa_area_total)}",
)
registrar(
"valor_unitario",
filtros.aval_valor_unitario,
_aceita_valor_com_colunas(item, "aval_valor_unitario", fontes_admin.get("aval_valor_unitario"), filtros.aval_valor_unitario),
f"fora da faixa {formatar_faixa(faixa_valor_unit)}",
)
registrar(
"valor_total",
filtros.aval_valor_total,
_aceita_valor_com_colunas(item, "aval_valor_total", fontes_admin.get("aval_valor_total"), filtros.aval_valor_total),
f"fora da faixa {formatar_faixa(faixa_valor_tot)}",
)
checks_informados = [check for check in checks if _is_provided(check.get("informado"))]
aceito = all(check.get("aceito") for check in checks_informados) if checks_informados else True
item["avaliando"] = {
"aceito": bool(aceito),
"checks": checks,
"motivos_rejeicao": rejeicoes,
"campos_informados": len(checks_informados),
}
return item
def _extrair_sugestoes(
modelos: list[dict[str, Any]],
fontes_admin: dict[str, list[str]] | None = None,
limite: int = 200,
incluir_logradouros_eixos: bool = True,
) -> dict[str, list[str]]:
fontes_admin = fontes_admin or {}
nomes: list[str] = []
autores: list[str] = []
finalidades: list[str] = []
bairros: list[str] = []
enderecos: list[str] = []
logradouros_eixos: list[str] = []
tipos_modelo: list[str] = []
zonas_avaliacao: list[str] = []
fontes_finalidade = _dedupe_strings((fontes_admin.get("finalidade") or []) + (fontes_admin.get("aval_finalidade") or []))
fontes_bairro = _dedupe_strings((fontes_admin.get("bairros") or []) + (fontes_admin.get("aval_bairro") or []))
for modelo in modelos:
nomes.extend(_nomes_modelo_sugestao(modelo))
autores.append(str(modelo.get("autor") or ""))
if fontes_finalidade:
finalidades.extend(_valores_para_fontes(modelo, fontes_finalidade))
else:
finalidades.append(str(modelo.get("finalidade") or ""))
finalidades.extend([str(item) for item in (modelo.get("finalidades") or [])])
if fontes_bairro:
bairros.extend(_valores_para_fontes(modelo, fontes_bairro))
else:
bairros.extend([str(item) for item in (modelo.get("bairros") or [])])
enderecos.append(str(modelo.get("endereco_referencia") or ""))
tipos_modelo.append(str(_tipo_modelo_modelo(modelo) or ""))
zonas_avaliacao.extend(_zonas_avaliacao_modelo(modelo))
if incluir_logradouros_eixos:
try:
logradouros_eixos = [
str(item.get("logradouro") or "").strip()
for item in _carregar_catalogo_vias()
]
except HTTPException:
logradouros_eixos = []
return {
"nomes_modelo": _lista_textos_unicos(nomes, limite),
"autores": _lista_textos_unicos(autores, limite),
"finalidades": _lista_textos_unicos(finalidades, limite),
"bairros": _lista_textos_unicos(bairros, limite),
"enderecos": _lista_textos_unicos(enderecos, limite),
"logradouros_eixos": _lista_textos_unicos(logradouros_eixos, None),
"tipos_modelo": _lista_textos_unicos(tipos_modelo, limite),
"zonas_avaliacao": _lista_zonas_unicas(zonas_avaliacao, limite),
}
def listar_logradouros_eixos(limite: int | None = None) -> dict[str, Any]:
try:
opcoes: list[dict[str, str]] = []
vistos: set[str] = set()
for item in _carregar_catalogo_vias():
value = str(item.get("logradouro") or "").strip()
if not value or value in vistos:
continue
vistos.add(value)
label = str(item.get("display_label") or value).strip() or value
opcoes.append({"value": value, "label": label})
except HTTPException:
opcoes = []
opcoes.sort(key=lambda item: (str(item.get("label") or "").casefold(), str(item.get("value") or "").casefold()))
if limite is not None and limite > 0:
opcoes = opcoes[:limite]
return sanitize_value(
{
"logradouros_eixos": opcoes,
"total_logradouros": len(opcoes),
}
)
def _nomes_modelo_sugestao(modelo: dict[str, Any]) -> list[str]:
nomes: list[str] = []
vistos = set()
for candidato in [modelo.get("nome_modelo"), modelo.get("arquivo")]:
texto = _texto_nome_modelo_sugestao(candidato)
if not texto:
continue
chave = _normalize(texto)
if not chave or chave in vistos:
continue
vistos.add(chave)
nomes.append(texto)
return nomes
def _texto_nome_modelo_sugestao(value: Any) -> str | None:
texto = _str_or_none(value)
if not texto:
return None
if texto.lower().endswith(".dai"):
texto = texto[:-4].strip()
return texto or None
def _modelo_publico(modelo: dict[str, Any]) -> dict[str, Any]:
return {chave: valor for chave, valor in modelo.items() if not str(chave).startswith("_")}
def _montar_config_colunas_filtro(modelos: list[dict[str, Any]]) -> dict[str, Any]:
config: dict[str, Any] = {}
campos = list(CAMPO_TEXTO_META_FONTES.keys()) + [campo for campo in CAMPO_FAIXA_META_FONTES.keys() if campo not in CAMPO_TEXTO_META_FONTES]
for campo in campos:
fontes_campo = set()
fontes_padrao = set()
for modelo in modelos:
if campo in CAMPO_TEXTO_META_FONTES:
fontes_campo.update(_fontes_texto_disponiveis(modelo, campo))
fontes_padrao.update(_fontes_texto_padrao(modelo, campo))
else:
fontes_campo.update(_fontes_faixa_disponiveis(modelo, campo))
fontes_padrao.update(_fontes_faixa_padrao(modelo, campo))
fontes = sorted(fontes_campo, key=lambda item: _rotulo_fonte(item).lower())
padrao = [item for item in sorted(fontes_padrao, key=lambda item: _rotulo_fonte(item).lower()) if item in set(fontes)]
if not padrao:
padrao = list(fontes)
config[campo] = {
"disponiveis": [{"id": fonte, "label": _rotulo_fonte(fonte)} for fonte in fontes],
"padrao": padrao,
}
return config
def _carregar_fontes_admin(colunas_filtro: dict[str, Any]) -> dict[str, list[str]]:
with _ADMIN_CONFIG_LOCK:
raw = dict(_ADMIN_FONTES_SESSION or {})
return _normalizar_fontes_admin(raw, colunas_filtro)
def _salvar_fontes_admin_sessao(fontes_admin: dict[str, list[str]]) -> None:
payload = {campo: _dedupe_strings(valores) for campo, valores in (fontes_admin or {}).items()}
global _ADMIN_FONTES_SESSION
with _ADMIN_CONFIG_LOCK:
_ADMIN_FONTES_SESSION = payload
def _normalizar_fontes_admin(raw: dict[str, Any], colunas_filtro: dict[str, Any]) -> dict[str, list[str]]:
saida: dict[str, list[str]] = {}
for campo, config in (colunas_filtro or {}).items():
disponiveis = [str(item.get("id")) for item in (config.get("disponiveis") or []) if isinstance(item, dict) and item.get("id")]
padrao = [str(item) for item in (config.get("padrao") or []) if str(item).strip()]
escolhidas_raw = raw.get(campo) if isinstance(raw, dict) else None
escolhidas = [str(item) for item in (escolhidas_raw or []) if str(item).strip()]
set_disponiveis = set(disponiveis)
escolhidas_validas = [item for item in _dedupe_strings(escolhidas) if item in set_disponiveis]
if not escolhidas_validas:
escolhidas_validas = [item for item in _dedupe_strings(padrao) if item in set_disponiveis]
if not escolhidas_validas and disponiveis:
escolhidas_validas = [disponiveis[0]]
saida[campo] = escolhidas_validas
return saida
def _rotulo_fonte(fonte: str) -> str:
if fonte.startswith("col:"):
return fonte[4:]
if fonte.startswith("var:"):
return f"Variavel: {fonte[4:]}"
return FONTE_META_LABELS.get(fonte, fonte)
def _fontes_texto_disponiveis(modelo: dict[str, Any], campo: str) -> list[str]:
aliases = CAMPO_TEXTO_ALIASES_COLUNA.get(campo, [])
indice = modelo.get("_texto_colunas_index") or {}
if not isinstance(indice, dict):
return []
fontes: list[str] = []
for col in indice.keys():
col_text = str(col)
if _has_alias(col_text, aliases):
fontes.append(f"col:{col_text}")
return _dedupe_strings(fontes)
def _fontes_texto_padrao(modelo: dict[str, Any], campo: str) -> list[str]:
return _fontes_texto_disponiveis(modelo, campo)
def _fontes_faixa_disponiveis(modelo: dict[str, Any], campo: str) -> list[str]:
fontes: list[str] = []
for fonte_meta in CAMPO_FAIXA_META_FONTES.get(campo, []):
faixa_meta = _faixa_meta(modelo, fonte_meta)
if isinstance(faixa_meta, dict) and not (_is_empty(faixa_meta.get("min")) and _is_empty(faixa_meta.get("max"))):
fontes.append(fonte_meta)
aliases_coluna = CAMPO_FAIXA_ALIASES_COLUNA.get(campo, [])
indice_colunas = modelo.get("_faixa_colunas_index") or {}
if isinstance(indice_colunas, dict):
for col in indice_colunas.keys():
col_text = str(col)
if _has_alias(col_text, aliases_coluna):
fontes.append(f"col:{col_text}")
permite_variaveis = campo in CAMPO_FAIXA_ALIASES_VARIAVEL
indice_variaveis = modelo.get("_faixa_variaveis_index") or {}
if permite_variaveis and isinstance(indice_variaveis, dict):
for var in indice_variaveis.keys():
fontes.append(f"var:{str(var)}")
return _dedupe_strings(fontes)
def _fontes_faixa_padrao(modelo: dict[str, Any], campo: str) -> list[str]:
fontes: list[str] = []
for fonte_meta in CAMPO_FAIXA_META_FONTES.get(campo, []):
faixa_meta = _faixa_meta(modelo, fonte_meta)
if isinstance(faixa_meta, dict) and not (_is_empty(faixa_meta.get("min")) and _is_empty(faixa_meta.get("max"))):
fontes.append(fonte_meta)
aliases_coluna = CAMPO_FAIXA_ALIASES_COLUNA.get(campo, [])
indice_colunas = modelo.get("_faixa_colunas_index") or {}
if isinstance(indice_colunas, dict):
for col in indice_colunas.keys():
col_text = str(col)
if _has_alias(col_text, aliases_coluna):
fontes.append(f"col:{col_text}")
aliases_variavel = CAMPO_FAIXA_ALIASES_VARIAVEL.get(campo, [])
indice_variaveis = modelo.get("_faixa_variaveis_index") or {}
if isinstance(indice_variaveis, dict):
for var in indice_variaveis.keys():
var_text = str(var)
if _has_alias_exato(var_text, aliases_variavel):
fontes.append(f"var:{var_text}")
return _dedupe_strings(fontes)
def _dedupe_strings(values: list[str]) -> list[str]:
out: list[str] = []
seen = set()
for value in values:
text = str(value).strip()
if not text or text in seen:
continue
seen.add(text)
out.append(text)
return out
def _aceita_texto_com_colunas(modelo: dict[str, Any], consulta: str, campo: str, fontes_selecionadas: list[str] | None) -> bool:
fontes = _resolver_fontes_campo(modelo, campo, fontes_selecionadas)
candidatos = _valores_para_fontes(modelo, fontes)
return _contains_any(candidatos, consulta)
def _aceita_range_com_colunas(
modelo: dict[str, Any],
campo: str,
fontes_selecionadas: list[str] | None,
filtro_min: Any,
filtro_max: Any,
) -> bool:
if filtro_min is None and filtro_max is None:
return True
faixas = _faixas_para_campo(modelo, campo, fontes_selecionadas)
if not faixas:
return False
return any(_range_overlaps(faixa, filtro_min, filtro_max) for faixa in faixas)
def _aceita_range_contido_com_colunas(
modelo: dict[str, Any],
campo: str,
fontes_selecionadas: list[str] | None,
filtro_min: Any,
filtro_max: Any,
) -> bool:
if filtro_min is None and filtro_max is None:
return True
faixas = _faixas_para_campo(modelo, campo, fontes_selecionadas)
if not faixas:
return False
return any(_range_contains(faixa, filtro_min, filtro_max) for faixa in faixas)
def _aceita_valor_com_colunas(
modelo: dict[str, Any],
campo: str,
fontes_selecionadas: list[str] | None,
valor: Any,
) -> bool:
if not _is_provided(valor):
return True
return _aceita_range_com_colunas(modelo, campo, fontes_selecionadas, valor, valor)
def _faixa_resumo_com_colunas(modelo: dict[str, Any], campo: str, fontes_selecionadas: list[str] | None) -> dict[str, Any] | None:
faixas = _faixas_para_campo(modelo, campo, fontes_selecionadas)
return _combinar_faixas(faixas)
def _faixas_para_campo(modelo: dict[str, Any], campo: str, fontes_selecionadas: list[str] | None) -> list[dict[str, Any]]:
fontes = _resolver_fontes_faixa(modelo, campo, fontes_selecionadas)
return _faixas_para_fontes(modelo, fontes)
def _resolver_fontes_campo(modelo: dict[str, Any], campo: str, fontes_selecionadas: list[str] | None) -> list[str]:
disponiveis = _fontes_texto_disponiveis(modelo, campo)
selecionadas = _dedupe_strings([str(item) for item in (fontes_selecionadas or [])])
if selecionadas:
set_disponiveis = set(disponiveis)
return [item for item in selecionadas if item in set_disponiveis]
return disponiveis
def _resolver_fontes_faixa(modelo: dict[str, Any], campo: str, fontes_selecionadas: list[str] | None) -> list[str]:
disponiveis = _fontes_faixa_disponiveis(modelo, campo)
selecionadas = _dedupe_strings([str(item) for item in (fontes_selecionadas or [])])
if selecionadas:
set_disponiveis = set(disponiveis)
return [item for item in selecionadas if item in set_disponiveis]
return disponiveis
def _valores_para_fontes(modelo: dict[str, Any], fontes: list[str]) -> list[str]:
candidatos: list[str] = []
indice_colunas = modelo.get("_texto_colunas_index") or {}
if not isinstance(indice_colunas, dict):
indice_colunas = {}
for fonte in fontes:
if fonte.startswith("meta:"):
candidatos.extend(_valores_meta(modelo, fonte))
continue
if fonte.startswith("col:"):
col = fonte[4:]
valores = indice_colunas.get(col) or []
candidatos.extend([str(item) for item in valores if _str_or_none(item)])
return candidatos
def _valores_meta(modelo: dict[str, Any], fonte: str) -> list[str]:
if fonte == "meta:nome_modelo":
return [str(modelo.get("nome_modelo") or "")]
if fonte == "meta:arquivo":
return [str(modelo.get("arquivo") or "")]
if fonte == "meta:autor":
return [str(modelo.get("autor") or "")]
if fonte == "meta:finalidade":
return [str(modelo.get("finalidade") or "")]
if fonte == "meta:tipo_imovel":
return [str(modelo.get("tipo_imovel") or "")]
if fonte == "meta:finalidades":
return [str(item) for item in (modelo.get("finalidades") or [])]
if fonte == "meta:bairros":
return [str(item) for item in (modelo.get("bairros") or [])]
if fonte == "meta:endereco_referencia":
return [str(modelo.get("endereco_referencia") or "")]
return []
def _faixas_para_fontes(modelo: dict[str, Any], fontes: list[str]) -> list[dict[str, Any]]:
candidatos: list[dict[str, Any]] = []
indice_colunas = modelo.get("_faixa_colunas_index") or {}
if not isinstance(indice_colunas, dict):
indice_colunas = {}
indice_variaveis = modelo.get("_faixa_variaveis_index") or {}
if not isinstance(indice_variaveis, dict):
indice_variaveis = {}
for fonte in fontes:
faixa: dict[str, Any] | None = None
if fonte.startswith("meta:"):
faixa = _faixa_meta(modelo, fonte)
elif fonte.startswith("col:"):
faixa = indice_colunas.get(fonte[4:])
elif fonte.startswith("var:"):
faixa = indice_variaveis.get(fonte[4:])
if not isinstance(faixa, dict):
continue
if _is_empty(faixa.get("min")) and _is_empty(faixa.get("max")):
continue
candidatos.append(faixa)
return candidatos
def _faixa_meta(modelo: dict[str, Any], fonte: str) -> dict[str, Any] | None:
if fonte == "meta:faixa_data":
return modelo.get("faixa_data")
if fonte == "meta:faixa_area":
return modelo.get("faixa_area")
if fonte == "meta:faixa_rh":
return modelo.get("faixa_rh")
faixas_por_campo = modelo.get("faixas_por_campo") or {}
if not isinstance(faixas_por_campo, dict):
faixas_por_campo = {}
if fonte == "meta:faixa_area_privativa":
return faixas_por_campo.get("area_privativa")
if fonte == "meta:faixa_area_total":
return faixas_por_campo.get("area_total")
if fonte == "meta:faixa_valor_unitario":
return faixas_por_campo.get("valor_unitario")
if fonte == "meta:faixa_valor_total":
return faixas_por_campo.get("valor_total")
return None
def _combinar_faixas(faixas: list[dict[str, Any]]) -> dict[str, Any] | None:
kind: str | None = None
min_vals: list[Any] = []
max_vals: list[Any] = []
for faixa in faixas:
cmp_min = _to_comparable(faixa.get("min"))
cmp_max = _to_comparable(faixa.get("max"))
faixa_kind = cmp_min[0] if cmp_min is not None else (cmp_max[0] if cmp_max is not None else None)
if faixa_kind is None:
continue
if kind is None:
kind = faixa_kind
if faixa_kind != kind:
continue
if cmp_min is not None:
min_vals.append(cmp_min[1])
if cmp_max is not None:
max_vals.append(cmp_max[1])
if not min_vals and not max_vals:
return None
minimo = min(min_vals) if min_vals else None
maximo = max(max_vals) if max_vals else None
return {
"min": _formatar_limite_faixa(minimo),
"max": _formatar_limite_faixa(maximo),
}
def _formatar_limite_faixa(valor: Any) -> Any:
if valor is None:
return None
if isinstance(valor, datetime):
return valor.date().isoformat()
return sanitize_value(valor)
def _indexar_texto_colunas(df_modelo: pd.DataFrame | None) -> dict[str, list[str]]:
if df_modelo is None or df_modelo.empty:
return {}
indice: dict[str, list[str]] = {}
colunas = [str(col) for col in df_modelo.columns[:MAX_COLUNAS_INDEXADAS]]
base = df_modelo[colunas].head(MAX_LINHAS_INDEXACAO)
for coluna in colunas:
serie = base[coluna]
valores: list[str] = []
vistos = set()
for valor in serie.dropna().tolist():
texto = _str_or_none(valor)
if texto is None and isinstance(valor, (int, float)):
texto = str(valor)
if texto is None:
continue
chave = _normalize(texto)
if not chave or chave in vistos:
continue
vistos.add(chave)
valores.append(texto)
if len(valores) >= MAX_VALORES_INDEXADOS_POR_COLUNA:
break
if valores:
indice[coluna] = valores
return indice
def _indexar_faixas_colunas(df_modelo: pd.DataFrame | None) -> dict[str, dict[str, Any]]:
if df_modelo is None or df_modelo.empty:
return {}
indice: dict[str, dict[str, Any]] = {}
colunas = _colunas_data_reais(df_modelo)[:MAX_COLUNAS_INDEXADAS]
if not colunas:
return indice
base = df_modelo[colunas].head(MAX_LINHAS_INDEXACAO)
for coluna in colunas:
faixa = _extrair_faixa_data_real_serie(base[coluna])
if faixa is not None:
indice[coluna] = faixa
return indice
def _indexar_faixas_variaveis(estat_df: pd.DataFrame | None, variaveis_modelo: list[str] | None) -> dict[str, dict[str, Any]]:
if estat_df is None or estat_df.empty:
return {}
trabalho = estat_df.copy()
if "Variável" in trabalho.columns:
trabalho = trabalho.set_index("Variável")
if trabalho.empty:
return {}
min_col = _buscar_coluna(trabalho.columns, ["minimo", "mínimo", "min"])
max_col = _buscar_coluna(trabalho.columns, ["maximo", "máximo", "max"])
if min_col is None or max_col is None:
return {}
variaveis_norm = {_normalize(item) for item in (variaveis_modelo or []) if _str_or_none(item)}
indice: dict[str, dict[str, Any]] = {}
nomes = trabalho.index.astype(str).tolist()
mins = trabalho[min_col].tolist()
maxs = trabalho[max_col].tolist()
for nome, min_bruto, max_bruto in zip(nomes, mins, maxs):
if variaveis_norm and _normalize(nome) not in variaveis_norm:
continue
min_val = sanitize_value(min_bruto)
max_val = sanitize_value(max_bruto)
if _is_empty(min_val) and _is_empty(max_val):
continue
indice[nome] = {
"min": None if _is_empty(min_val) else min_val,
"max": None if _is_empty(max_val) else max_val,
}
return indice
def _lista_textos_unicos(valores: list[str], limite: int | None) -> list[str]:
unicos: list[str] = []
vistos = set()
for valor in valores:
texto = _str_or_none(valor)
if not texto:
continue
chave = _normalize(texto)
if not chave or chave in vistos:
continue
vistos.add(chave)
unicos.append(texto)
ordenados = sorted(unicos, key=lambda item: item.lower())
if limite is None or limite <= 0:
return ordenados
return ordenados[:limite]
def _aceita_valor_na_faixa(faixa: dict[str, Any] | None, valor: Any) -> bool:
if not _is_provided(valor):
return True
return _range_overlaps(faixa, valor, valor)
def formatar_faixa(faixa: dict[str, Any] | None) -> str:
if not faixa:
return "nao disponivel"
minimo = faixa.get("min")
maximo = faixa.get("max")
if _is_empty(minimo) and _is_empty(maximo):
return "nao disponivel"
if not _is_empty(minimo) and not _is_empty(maximo):
return f"{minimo} a {maximo}"
if not _is_empty(minimo):
return f"a partir de {minimo}"
return f"ate {maximo}"
def _periodo_texto_informado(data_min: Any, data_max: Any) -> str | None:
inicio = _str_or_none(data_min)
fim = _str_or_none(data_max)
if not inicio and not fim:
return None
if inicio and fim:
return f"{inicio} a {fim}"
if inicio:
return f"a partir de {inicio}"
return f"ate {fim}"
def _tipo_modelo_modelo(modelo: dict[str, Any]) -> str | None:
tipo = _str_or_none(modelo.get("tipo_imovel"))
if tipo:
return tipo
nome_referencia = _str_or_none(modelo.get("nome_modelo")) or _str_or_none(modelo.get("arquivo")) or ""
return _inferir_tipo_por_nome(nome_referencia)
def _negociacao_modelo_modelo(modelo: dict[str, Any]) -> str | None:
nome_referencia = _str_or_none(modelo.get("arquivo")) or _str_or_none(modelo.get("nome_modelo")) or ""
nome_upper = nome_referencia.upper()
if re.search(r"(^|_)A(_|$)", nome_upper):
return "aluguel"
if re.search(r"(^|_)V(_|$)", nome_upper):
return "venda"
return None
def _normalizar_token_zona(value: Any) -> str | None:
texto = _str_or_none(value)
if not texto:
return None
match = re.search(r"Z?\s*0*(\d+)", texto.upper())
if not match:
return None
numero = int(match.group(1))
if numero <= 0:
return None
return f"Z{numero}"
def _extrair_termos_zona(texto: str) -> list[str]:
zonas = []
vistos = set()
for termo in _split_terms(texto):
zona = _normalizar_token_zona(termo)
if not zona:
continue
chave = _normalize(zona)
if chave in vistos:
continue
vistos.add(chave)
zonas.append(zona)
return zonas
def _zonas_avaliacao_modelo(modelo: dict[str, Any]) -> list[str]:
nomes_ref = [
_str_or_none(modelo.get("arquivo")) or "",
_str_or_none(modelo.get("nome_modelo")) or "",
]
zonas = []
vistos = set()
for nome in nomes_ref:
nome_upper = nome.upper()
for match in re.finditer(r"(?:^|_)Z(\d+)(?=_|$)", nome_upper):
zona = _normalizar_token_zona(f"Z{match.group(1)}")
if not zona:
continue
chave = _normalize(zona)
if chave in vistos:
continue
vistos.add(chave)
zonas.append(zona)
return sorted(zonas, key=lambda item: int(re.search(r"\d+", item).group(0)))
def _lista_zonas_unicas(valores: list[str], limite: int = 200) -> list[str]:
unicas = []
vistos = set()
for valor in valores:
zona = _normalizar_token_zona(valor)
if not zona:
continue
chave = _normalize(zona)
if not chave or chave in vistos:
continue
vistos.add(chave)
unicas.append(zona)
if len(unicas) >= limite:
break
return sorted(unicas, key=lambda item: int(re.search(r"\d+", item).group(0)))
def _extrair_termos_bairro(filtros: PesquisaFiltros) -> list[str]:
termos: list[str] = []
if filtros.bairro:
termos.extend(_split_terms(filtros.bairro))
if filtros.bairros:
for entrada in filtros.bairros:
termos.extend(_split_terms(entrada))
limpos = []
vistos = set()
for termo in termos:
chave = _normalize(termo)
if not chave or chave in vistos:
continue
vistos.add(chave)
limpos.append(termo)
return limpos
def _split_terms(texto: str) -> list[str]:
if not texto:
return []
partes = re.split(r"[,;|]", texto)
return [parte.strip() for parte in partes if parte.strip()]
def _range_overlaps(model_range: dict[str, Any] | None, filtro_min: Any, filtro_max: Any) -> bool:
if filtro_min is None and filtro_max is None:
return True
if not model_range:
return False
model_min_cmp = _to_comparable(model_range.get("min"))
model_max_cmp = _to_comparable(model_range.get("max"))
filtro_min_cmp = _to_comparable(filtro_min) if filtro_min is not None else None
filtro_max_cmp = _to_comparable(filtro_max) if filtro_max is not None else None
if filtro_min_cmp is None and filtro_max_cmp is None:
return True
kinds = {
item[0]
for item in [model_min_cmp, model_max_cmp, filtro_min_cmp, filtro_max_cmp]
if item is not None
}
if len(kinds) != 1:
return False
model_min_val = model_min_cmp[1] if model_min_cmp is not None else None
model_max_val = model_max_cmp[1] if model_max_cmp is not None else None
filtro_min_val = filtro_min_cmp[1] if filtro_min_cmp is not None else None
filtro_max_val = filtro_max_cmp[1] if filtro_max_cmp is not None else None
if model_min_val is None and model_max_val is None:
return False
if model_min_val is None:
model_min_val = model_max_val
if model_max_val is None:
model_max_val = model_min_val
if filtro_min_val is not None and model_max_val < filtro_min_val:
return False
if filtro_max_val is not None and model_min_val > filtro_max_val:
return False
return True
def _range_contains(model_range: dict[str, Any] | None, filtro_min: Any, filtro_max: Any) -> bool:
if filtro_min is None and filtro_max is None:
return True
if not model_range:
return False
model_min_cmp = _to_comparable(model_range.get("min"))
model_max_cmp = _to_comparable(model_range.get("max"))
filtro_min_cmp = _to_comparable(filtro_min) if filtro_min is not None else None
filtro_max_cmp = _to_comparable(filtro_max) if filtro_max is not None else None
if filtro_min_cmp is None and filtro_max_cmp is None:
return True
kinds = {
item[0]
for item in [model_min_cmp, model_max_cmp, filtro_min_cmp, filtro_max_cmp]
if item is not None
}
if len(kinds) != 1:
return False
model_min_val = model_min_cmp[1] if model_min_cmp is not None else None
model_max_val = model_max_cmp[1] if model_max_cmp is not None else None
filtro_min_val = filtro_min_cmp[1] if filtro_min_cmp is not None else None
filtro_max_val = filtro_max_cmp[1] if filtro_max_cmp is not None else None
if model_min_val is None or model_max_val is None:
return False
if filtro_min_val is not None and filtro_max_val is not None and filtro_min_val > filtro_max_val:
filtro_min_val, filtro_max_val = filtro_max_val, filtro_min_val
if filtro_min_val is not None:
if filtro_min_val < model_min_val or filtro_min_val > model_max_val:
return False
if filtro_max_val is not None:
if filtro_max_val < model_min_val or filtro_max_val > model_max_val:
return False
return True
def _to_comparable(value: Any) -> tuple[str, Any] | None:
if value is None:
return None
if isinstance(value, (int, float)):
if isinstance(value, float) and (math.isnan(value) or math.isinf(value)):
return None
return ("num", float(value))
if isinstance(value, pd.Timestamp):
return ("dt", value.to_pydatetime())
if isinstance(value, datetime):
return ("dt", value)
if isinstance(value, date):
return ("dt", datetime(value.year, value.month, value.day))
texto = str(value).strip()
if not texto:
return None
numero = _to_float_or_none(texto)
if numero is not None:
return ("num", float(numero))
data_val = _parse_datetime(texto)
if data_val is not None:
return ("dt", data_val)
return None
def _parse_datetime(texto: str) -> datetime | None:
for fmt in ("%Y-%m-%d", "%d/%m/%Y", "%Y/%m/%d", "%Y%m%d", "%Y"):
try:
return datetime.strptime(texto, fmt)
except Exception:
continue
return None
def _contains_any(candidatos: list[Any], consulta: str) -> bool:
alvo = _normalize(consulta)
if not alvo:
return True
for item in candidatos:
if item is None:
continue
if alvo in _normalize(str(item)):
return True
return False
def _merge_ranges(preferencial: dict[str, Any] | None, fallback: dict[str, Any] | None) -> dict[str, Any] | None:
if not preferencial and not fallback:
return None
if not preferencial:
return fallback
if not fallback:
return preferencial
min_final = preferencial.get("min") if not _is_empty(preferencial.get("min")) else fallback.get("min")
max_final = preferencial.get("max") if not _is_empty(preferencial.get("max")) else fallback.get("max")
if _is_empty(min_final) and _is_empty(max_final):
return None
return {"min": sanitize_value(min_final), "max": sanitize_value(max_final)}
def _buscar_coluna(colunas: Any, aliases: list[str]) -> Any:
for coluna in colunas:
if _has_alias(str(coluna), aliases):
return coluna
return None
def _has_alias(nome: str, aliases: list[str]) -> bool:
nome_norm = _normalize(nome)
if not nome_norm:
return False
for alias in aliases:
alias_norm = _normalize(alias)
if alias_norm == nome_norm or alias_norm in nome_norm:
return True
return False
def _has_alias_exato(nome: str, aliases: list[str]) -> bool:
nome_norm = _normalize(nome)
if not nome_norm:
return False
return any(_normalize(alias) == nome_norm for alias in aliases)
def _normalize(value: str) -> str:
ascii_text = unicodedata.normalize("NFKD", str(value)).encode("ascii", "ignore").decode("ascii")
ascii_text = ascii_text.lower().strip()
return re.sub(r"[^a-z0-9]+", "", ascii_text)
def _min_value(values: list[Any]) -> Any:
comparaveis = [_to_comparable(v) for v in values]
comparaveis = [c for c in comparaveis if c is not None]
if not comparaveis:
return sanitize_value(values[0]) if values else None
kind = comparaveis[0][0]
valores = [c[1] for c in comparaveis if c[0] == kind]
menor = min(valores)
if isinstance(menor, datetime):
return menor.date().isoformat()
return sanitize_value(menor)
def _max_value(values: list[Any]) -> Any:
comparaveis = [_to_comparable(v) for v in values]
comparaveis = [c for c in comparaveis if c is not None]
if not comparaveis:
return sanitize_value(values[0]) if values else None
kind = comparaveis[0][0]
valores = [c[1] for c in comparaveis if c[0] == kind]
maior = max(valores)
if isinstance(maior, datetime):
return maior.date().isoformat()
return sanitize_value(maior)
def _inferir_finalidade_por_nome(nome_arquivo: str) -> str | None:
nome_upper = nome_arquivo.upper()
if re.search(r"(^|_)A(_|$)", nome_upper) or "ALUG" in nome_upper:
return "Aluguel"
if re.search(r"(^|_)V(_|$)", nome_upper) or "VENDA" in nome_upper:
return "Venda"
return None
def _inferir_tipo_por_nome(nome_arquivo: str) -> str | None:
nome_upper = nome_arquivo.upper()
tokens_ordenados = sorted(TIPO_POR_TOKEN.items(), key=lambda item: len(item[0]), reverse=True)
for token, tipo in tokens_ordenados:
if _contains_tipo_token(nome_upper, token):
return tipo
return None
def _contains_tipo_token(nome_upper: str, token: str) -> bool:
padrao = rf"(^|[^A-Z]){re.escape(token)}([^A-Z]|$)"
return re.search(padrao, nome_upper) is not None
def _to_float_or_none(value: Any) -> float | None:
if value is None:
return None
if isinstance(value, bool):
return None
if isinstance(value, (int, float)):
if isinstance(value, float) and (math.isnan(value) or math.isinf(value)):
return None
return float(value)
texto = str(value).strip()
if not texto:
return None
if "," in texto and "." in texto:
if texto.rfind(",") > texto.rfind("."):
texto = texto.replace(".", "").replace(",", ".")
else:
texto = texto.replace(",", "")
else:
texto = texto.replace(",", ".")
try:
return float(texto)
except Exception:
return None
def _str_or_none(value: Any) -> str | None:
if value is None:
return None
texto = str(value).strip()
return texto or None
def _is_empty(value: Any) -> bool:
if value is None:
return True
if isinstance(value, str) and not value.strip():
return True
try:
if pd.isna(value):
return True
except Exception:
pass
return False
def _is_provided(value: Any) -> bool:
if value is None:
return False
if isinstance(value, str):
return bool(value.strip())
return True