mesa-react / backend /app /core /visualizacao /map_payload.py
Guilherme Silberfarb Costa
alteracoes generalizadas
3e507b3
from __future__ import annotations
from html import escape
from typing import Any
import branca.colormap as cm
import numpy as np
import pandas as pd
from scipy.interpolate import griddata
from app.core.elaboracao.charts import (
_contorno_convexo_lng_lat,
_mascara_dentro_poligono,
_normalizar_stops_cor,
)
from app.core.map_layers import build_trabalhos_tecnicos_marker_payloads
from app.core.visualizacao.app import COR_PRINCIPAL, formatar_monetario
_LAT_ALIASES = {"lat", "latitude", "siat_latitude"}
_LON_ALIASES = {"lon", "longitude", "long", "siat_longitude"}
_TILE_LAYERS = [
{
"id": "positron",
"label": "Positron",
"url": "https://{s}.basemaps.cartocdn.com/light_all/{z}/{x}/{y}{r}.png",
"attribution": "© OpenStreetMap contributors © CARTO",
},
{
"id": "osm",
"label": "OpenStreetMap",
"url": "https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png",
"attribution": "© OpenStreetMap contributors",
},
]
def _primeira_serie_por_nome(dataframe: pd.DataFrame, nome_coluna: str) -> pd.Series | None:
matches = [i for i, c in enumerate(dataframe.columns) if str(c) == str(nome_coluna)]
if not matches:
return None
return dataframe.iloc[:, matches[0]]
def _detectar_coluna(df: pd.DataFrame, aliases: set[str]) -> str | None:
for col in df.columns:
if str(col).lower() in aliases:
return str(col)
return None
def _formatar_tooltip_valor(coluna: str | None, valor: Any) -> str:
if valor is None:
return "—"
try:
if pd.isna(valor):
return "—"
except Exception:
pass
col_norm = str(coluna or "").lower()
if isinstance(valor, (int, float, np.integer, np.floating)):
numero = float(valor)
if not np.isfinite(numero):
return "—"
if any(k in col_norm for k in ["valor", "preco", "vu", "vunit"]):
return formatar_monetario(numero)
return f"{numero:,.2f}".replace(",", "X").replace(".", ",").replace("X", ".")
return str(valor)
def _formatar_valor_resumo_mapa(coluna: str | None, valor: Any) -> str:
return _formatar_tooltip_valor(coluna, valor)
def _formatar_indices_tooltip(indices: list[Any]) -> str:
return ", ".join(str(item) for item in indices)
def _formatar_indices_badge(indices: list[Any], limite: int = 28) -> str:
texto = ", ".join(str(item) for item in indices)
if len(texto) <= limite:
return texto
return texto[: max(0, limite - 1)].rstrip(" ,") + "…"
def _tooltip_html_grupo_mercado(indices: list[Any], label: str | None = None, valores: list[str] | None = None) -> str:
total = len(indices)
indices_txt = escape(_formatar_indices_tooltip(indices))
titulo = f"{total} dados neste local" if total != 1 else "1 dado neste local"
html = (
"<div style='font-family:\"Segoe UI\",Arial,sans-serif; font-size:14px; line-height:1.7; padding:2px 4px;'>"
f"<b>{escape(titulo)}</b>"
f"<br><span style='color:#555;'>Índices:</span> <b>{indices_txt}</b>"
)
valores_limpos = [str(item) for item in valores or [] if str(item).strip()]
if label and valores_limpos:
unicos = list(dict.fromkeys(valores_limpos))
if len(unicos) == 1:
resumo = unicos[0]
else:
resumo = "valores diferentes"
html += f"<br><span style='color:#555;'>{escape(str(label))}:</span> <b>{escape(str(resumo))}</b>"
html += "</div>"
return html
def _resolver_bounds(df_mapa: pd.DataFrame, lat_key: str, lon_key: str) -> list[list[float]]:
df_bounds = df_mapa
if len(df_mapa) >= 8:
lat_vals = df_mapa[lat_key]
lon_vals = df_mapa[lon_key]
lat_med = float(lat_vals.median())
lon_med = float(lon_vals.median())
lat_mad = float((lat_vals - lat_med).abs().median())
lon_mad = float((lon_vals - lon_med).abs().median())
lat_span = float(lat_vals.max() - lat_vals.min())
lon_span = float(lon_vals.max() - lon_vals.min())
lat_scale = max(lat_mad, lat_span / 30.0, 1e-6)
lon_scale = max(lon_mad, lon_span / 30.0, 1e-6)
score = ((lat_vals - lat_med) / lat_scale) ** 2 + ((lon_vals - lon_med) / lon_scale) ** 2
lim = float(score.quantile(0.75))
df_core = df_mapa[score <= lim]
if len(df_core) >= max(5, int(len(df_mapa) * 0.45)):
df_bounds = df_core
if len(df_bounds) >= 50:
lat_min, lat_max = df_bounds[lat_key].quantile([0.01, 0.99]).tolist()
lon_min, lon_max = df_bounds[lon_key].quantile([0.01, 0.99]).tolist()
else:
lat_min, lat_max = float(df_bounds[lat_key].min()), float(df_bounds[lat_key].max())
lon_min, lon_max = float(df_bounds[lon_key].min()), float(df_bounds[lon_key].max())
if not np.isfinite(lat_min) or not np.isfinite(lat_max):
lat_min, lat_max = float(df_mapa[lat_key].min()), float(df_mapa[lat_key].max())
if not np.isfinite(lon_min) or not np.isfinite(lon_max):
lon_min, lon_max = float(df_mapa[lon_key].min()), float(df_mapa[lon_key].max())
if np.isclose(lat_min, lat_max):
lat_min = float(lat_min) - 0.0008
lat_max = float(lat_max) + 0.0008
if np.isclose(lon_min, lon_max):
lon_min = float(lon_min) - 0.0008
lon_max = float(lon_max) + 0.0008
return [[float(lat_min), float(lon_min)], [float(lat_max), float(lon_max)]]
def _normalizar_bounds_lista(bounds: list[list[float]] | None) -> list[list[float]] | None:
coords = []
for item in bounds or []:
if not isinstance(item, (list, tuple)) or len(item) < 2:
continue
try:
lat = float(item[0])
lon = float(item[1])
except Exception:
continue
if not np.isfinite(lat) or not np.isfinite(lon):
continue
coords.append((lat, lon))
if not coords:
return None
lat_values = [lat for lat, _ in coords]
lon_values = [lon for _, lon in coords]
lat_min = min(lat_values)
lat_max = max(lat_values)
lon_min = min(lon_values)
lon_max = max(lon_values)
if np.isclose(lat_min, lat_max):
lat_min -= 0.0008
lat_max += 0.0008
if np.isclose(lon_min, lon_max):
lon_min -= 0.0008
lon_max += 0.0008
return [[float(lat_min), float(lon_min)], [float(lat_max), float(lon_max)]]
def build_leaflet_payload(
*,
bounds: list[list[float]] | None,
center: list[float] | None = None,
legend: dict[str, Any] | None = None,
overlay_layers: list[dict[str, Any]] | None = None,
notice: dict[str, Any] | None = None,
show_bairros: bool = True,
bairros_geojson_url: str = "/api/visualizacao/map/bairros.geojson",
) -> dict[str, Any] | None:
normalized_bounds = _normalizar_bounds_lista(bounds)
if normalized_bounds is None:
return None
if center and len(center) >= 2:
center_lat = float(center[0])
center_lon = float(center[1])
else:
center_lat = float((normalized_bounds[0][0] + normalized_bounds[1][0]) / 2.0)
center_lon = float((normalized_bounds[0][1] + normalized_bounds[1][1]) / 2.0)
final_overlay_layers = list(overlay_layers or [])
if show_bairros:
final_overlay_layers.insert(
0,
{
"id": "bairros",
"label": "Bairros",
"show": True,
"geojson_url": bairros_geojson_url,
"geojson_pane": "mesa-bairros-pane",
"geojson_style": {
"color": "#4c6882",
"weight": 1.15,
"opacity": 0.88,
"fillColor": "#f39c12",
"fillOpacity": 0.0,
},
"geojson_tooltip_properties": ["NOME", "BAIRRO", "NME_BAI", "NOME_BAIRRO"],
"geojson_tooltip_label": "Bairro",
},
)
return {
"type": "mesa_leaflet_payload",
"version": 1,
"center": [center_lat, center_lon],
"bounds": normalized_bounds,
"tile_layers": _TILE_LAYERS,
"controls": {
"fullscreen": True,
"measure": True,
"layer_control": True,
},
"radius_behavior": {
"min_radius": 1.6,
"max_radius": 52.0,
"reference_zoom": 12.0,
"growth_factor": 0.20,
},
"legend": legend,
"notice": notice,
"overlay_layers": final_overlay_layers,
}
def build_elaboracao_map_payload(
df: pd.DataFrame,
*,
lat_col: str = "lat",
lon_col: str = "lon",
cor_col: str | None = None,
indice_destacado: Any = None,
tamanho_col: str | None = None,
modo: str | None = "pontos",
cor_vmin: float | None = None,
cor_vmax: float | None = None,
cor_caption: str | None = None,
cor_colors: list[str] | None = None,
cor_stops: list[float] | None = None,
cor_tick_values: list[float] | None = None,
cor_tick_labels: list[str] | None = None,
bairros_geojson_url: str = "/api/visualizacao/map/bairros.geojson",
popup_source: str | None = "mercado",
) -> dict[str, Any] | None:
modo_normalizado = str(modo or "pontos").strip().lower()
cols_lower = {str(c).lower(): c for c in df.columns}
lat_real = cols_lower.get(str(lat_col).lower()) if str(lat_col).lower() in cols_lower else None
lon_real = cols_lower.get(str(lon_col).lower()) if str(lon_col).lower() in cols_lower else None
if lat_real is None:
for nome in ["lat", "latitude", "siat_latitude"]:
if nome in cols_lower:
lat_real = cols_lower[nome]
break
if lon_real is None:
for nome in ["lon", "longitude", "long", "siat_longitude"]:
if nome in cols_lower:
lon_real = cols_lower[nome]
break
if lat_real is None or lon_real is None:
return None
row_id_col = "__mesa_row_id__"
df_mapa = df.copy()
if popup_source and row_id_col not in df_mapa.columns:
df_mapa[row_id_col] = np.arange(len(df_mapa), dtype=int)
df_mapa[lat_real] = pd.to_numeric(df_mapa[lat_real], errors="coerce")
df_mapa[lon_real] = pd.to_numeric(df_mapa[lon_real], errors="coerce")
df_mapa = df_mapa.dropna(subset=[lat_real, lon_real])
df_mapa = df_mapa[~((df_mapa[lat_real] == 0.0) & (df_mapa[lon_real] == 0.0))]
df_mapa = df_mapa[
(df_mapa[lat_real] >= -90.0)
& (df_mapa[lat_real] <= 90.0)
& (df_mapa[lon_real] >= -180.0)
& (df_mapa[lon_real] <= 180.0)
].copy()
if df_mapa.empty:
return None
limite_pontos = 2500
total_pontos = len(df_mapa)
houve_amostragem = total_pontos > limite_pontos
if houve_amostragem:
df_mapa = df_mapa.sample(n=limite_pontos, random_state=42).copy()
centro_lat = float(df_mapa[lat_real].median())
centro_lon = float(df_mapa[lon_real].median())
colors = (
[str(item) for item in cor_colors if str(item).strip()]
if isinstance(cor_colors, list) and len(cor_colors) >= 2
else ["#2ecc71", "#a8e06c", "#f1c40f", "#e67e22", "#e74c3c"]
)
modo_calor = modo_normalizado == "calor" and tamanho_col is not None and tamanho_col in df_mapa.columns
modo_superficie = modo_normalizado == "superficie" and tamanho_col is not None and tamanho_col in df_mapa.columns
if modo_normalizado not in {"pontos", "calor", "superficie"}:
modo_normalizado = "pontos"
cor_col_resolvida = cor_col or tamanho_col
colormap = None
legend = None
if cor_col_resolvida and cor_col_resolvida in df_mapa.columns:
serie_cor = pd.to_numeric(df_mapa[cor_col_resolvida], errors="coerce")
vmin = float(cor_vmin) if cor_vmin is not None and np.isfinite(cor_vmin) else float(serie_cor.min())
vmax = float(cor_vmax) if cor_vmax is not None and np.isfinite(cor_vmax) else float(serie_cor.max())
if np.isfinite(vmin) and np.isfinite(vmax):
if np.isclose(vmin, vmax):
vmax = float(vmin) + 1.0
color_index = _normalizar_stops_cor(cor_stops, colors, float(vmin), float(vmax))
colormap_kwargs: dict[str, Any] = {
"colors": colors,
"vmin": float(vmin),
"vmax": float(vmax),
"caption": str(cor_caption or cor_col_resolvida),
}
if color_index is not None:
colormap_kwargs["index"] = color_index
colormap = cm.LinearColormap(**colormap_kwargs)
legend = {
"title": str(cor_caption or cor_col_resolvida),
"vmin": float(vmin),
"vmax": float(vmax),
"colors": colors,
}
if (
isinstance(cor_tick_values, list)
and isinstance(cor_tick_labels, list)
and len(cor_tick_values) == len(cor_tick_labels)
and len(cor_tick_values) > 1
):
try:
legend["tick_values"] = [float(item) for item in cor_tick_values]
legend["tick_labels"] = [str(item) for item in cor_tick_labels]
except (TypeError, ValueError):
pass
raio_min, raio_max = 3.0, 18.0
tamanho_func = None
if tamanho_col and tamanho_col in df_mapa.columns:
serie_tamanho = pd.to_numeric(df_mapa[tamanho_col], errors="coerce")
t_min = float(serie_tamanho.min())
t_max = float(serie_tamanho.max())
if np.isfinite(t_min) and np.isfinite(t_max):
if t_max > t_min:
tamanho_func = (
lambda v, _min=t_min, _max=t_max: raio_min
+ (v - _min) / (_max - _min) * (raio_max - raio_min)
)
else:
tamanho_func = lambda _v: (raio_min + raio_max) / 2.0
mostrar_indices = not modo_calor and not modo_superficie and len(df_mapa) <= 800
lat_plot_col = "__mesa_lat_plot__"
lon_plot_col = "__mesa_lon_plot__"
if not modo_calor and not modo_superficie:
df_plot_pontos = df_mapa.copy()
df_plot_pontos[lat_plot_col] = df_plot_pontos[lat_real]
df_plot_pontos[lon_plot_col] = df_plot_pontos[lon_real]
else:
df_plot_pontos = df_mapa.copy()
df_plot_pontos[lat_plot_col] = df_plot_pontos[lat_real]
df_plot_pontos[lon_plot_col] = df_plot_pontos[lon_real]
overlay_layers: list[dict[str, Any]] = []
if modo_calor and tamanho_col and tamanho_col in df_mapa.columns:
pesos = pd.to_numeric(df_mapa[tamanho_col], errors="coerce")
mask_pesos = np.isfinite(pesos.to_numpy())
df_calor = df_mapa.loc[mask_pesos, [lat_real, lon_real]].copy()
if not df_calor.empty:
pesos_validos = pesos.loc[df_calor.index].to_numpy(dtype=float)
fixed_scale = (
cor_vmin is not None
and cor_vmax is not None
and np.isfinite(cor_vmin)
and np.isfinite(cor_vmax)
and float(cor_vmax) > float(cor_vmin)
)
if fixed_scale:
peso_min = float(cor_vmin)
peso_max = float(cor_vmax)
pesos_clip = np.clip(pesos_validos, peso_min, peso_max)
pesos_norm = (pesos_clip - peso_min) / (peso_max - peso_min)
else:
peso_min = float(np.min(pesos_validos))
peso_max = float(np.max(pesos_validos))
if np.isfinite(peso_min) and np.isfinite(peso_max) and peso_max > peso_min:
pesos_norm = 0.1 + 0.9 * (pesos_validos - peso_min) / (peso_max - peso_min)
else:
pesos_norm = np.ones_like(pesos_validos)
gradient = None
if len(colors) >= 2:
if (
cor_vmin is not None
and cor_vmax is not None
and np.isfinite(cor_vmin)
and np.isfinite(cor_vmax)
and float(cor_vmax) > float(cor_vmin)
):
color_index = _normalizar_stops_cor(cor_stops, colors, float(cor_vmin), float(cor_vmax))
if color_index is not None:
gradient = {}
for stop, color in zip(color_index, colors):
ratio = (float(stop) - float(cor_vmin)) / (float(cor_vmax) - float(cor_vmin))
gradient[float(np.clip(ratio, 0.0, 1.0))] = color
elif len(colors) == 2:
gradient = {0.0: colors[0], 1.0: colors[1]}
else:
gradient = {i / (len(colors) - 1): colors[i] for i in range(len(colors))}
elif len(colors) == 2:
gradient = {0.0: colors[0], 1.0: colors[1]}
else:
gradient = {i / (len(colors) - 1): colors[i] for i in range(len(colors))}
overlay_layers.append(
{
"id": "mapa_calor",
"label": "Mapa de calor",
"show": True,
"heatmap": {
"points": [
{
"lat": float(df_calor.iloc[idx][lat_real]),
"lon": float(df_calor.iloc[idx][lon_real]),
"weight": float(pesos_norm[idx]),
}
for idx in range(len(df_calor))
],
"radius": 20,
"blur": 18,
"min_opacity": 0.28,
"max_zoom": 17,
"gradient": gradient,
},
}
)
elif modo_superficie and tamanho_col and tamanho_col in df_mapa.columns:
lats = pd.to_numeric(df_mapa[lat_real], errors="coerce").to_numpy(dtype=float)
lons = pd.to_numeric(df_mapa[lon_real], errors="coerce").to_numpy(dtype=float)
valores = pd.to_numeric(df_mapa[tamanho_col], errors="coerce").to_numpy(dtype=float)
mask_valid = np.isfinite(lats) & np.isfinite(lons) & np.isfinite(valores)
if mask_valid.sum() >= 6:
lats = lats[mask_valid]
lons = lons[mask_valid]
valores = valores[mask_valid]
contorno = _contorno_convexo_lng_lat(lons, lats)
if contorno is not None:
lon_min = float(np.min(contorno[:, 0]))
lon_max = float(np.max(contorno[:, 0]))
lat_min = float(np.min(contorno[:, 1]))
lat_max = float(np.max(contorno[:, 1]))
if not np.isclose(lon_min, lon_max) and not np.isclose(lat_min, lat_max):
n_obs = len(valores)
n_grid = 46 if n_obs <= 400 else (40 if n_obs <= 1200 else 34)
grid_lon = np.linspace(lon_min, lon_max, n_grid)
grid_lat = np.linspace(lat_min, lat_max, n_grid)
mesh_lon, mesh_lat = np.meshgrid(grid_lon, grid_lat)
pontos = np.column_stack([lons, lats])
try:
superficie = griddata(pontos, valores, (mesh_lon, mesh_lat), method="linear")
except Exception:
superficie = None
if superficie is not None:
superficie = np.asarray(superficie, dtype=float)
if np.isnan(superficie).all():
try:
superficie = griddata(pontos, valores, (mesh_lon, mesh_lat), method="nearest")
except Exception:
superficie = None
elif np.isnan(superficie).any():
try:
nearest = griddata(pontos, valores, (mesh_lon, mesh_lat), method="nearest")
except Exception:
nearest = None
if nearest is not None:
superficie = np.where(np.isfinite(superficie), superficie, np.asarray(nearest, dtype=float))
if superficie is not None:
superficie = np.asarray(superficie, dtype=float)
mascara = _mascara_dentro_poligono(mesh_lon, mesh_lat, contorno)
superficie = np.where(mascara, superficie, np.nan)
if np.isfinite(superficie).any():
vmin_sup = float(cor_vmin) if cor_vmin is not None and np.isfinite(cor_vmin) else float(np.nanmin(superficie))
vmax_sup = float(cor_vmax) if cor_vmax is not None and np.isfinite(cor_vmax) else float(np.nanmax(superficie))
if np.isfinite(vmin_sup) and np.isfinite(vmax_sup):
if np.isclose(vmin_sup, vmax_sup):
vmax_sup = vmin_sup + 1.0
color_index = _normalizar_stops_cor(cor_stops, colors, vmin_sup, vmax_sup)
colormap_kwargs = {
"colors": colors,
"vmin": float(vmin_sup),
"vmax": float(vmax_sup),
"caption": str(cor_caption or f"{tamanho_col} (superfície)"),
}
if color_index is not None:
colormap_kwargs["index"] = color_index
colormap = cm.LinearColormap(**colormap_kwargs)
legend = {
"title": str(cor_caption or f"{tamanho_col} (superfície)"),
"vmin": float(vmin_sup),
"vmax": float(vmax_sup),
"colors": colors,
}
if (
isinstance(cor_tick_values, list)
and isinstance(cor_tick_labels, list)
and len(cor_tick_values) == len(cor_tick_labels)
and len(cor_tick_values) > 1
):
try:
legend["tick_values"] = [float(item) for item in cor_tick_values]
legend["tick_labels"] = [str(item) for item in cor_tick_labels]
except (TypeError, ValueError):
pass
centros_lon = (grid_lon[:-1] + grid_lon[1:]) / 2.0
centros_lat = (grid_lat[:-1] + grid_lat[1:]) / 2.0
centro_mesh_lon, centro_mesh_lat = np.meshgrid(centros_lon, centros_lat)
mascara_centros = _mascara_dentro_poligono(centro_mesh_lon, centro_mesh_lat, contorno)
valores_celula = (
superficie[:-1, :-1]
+ superficie[1:, :-1]
+ superficie[:-1, 1:]
+ superficie[1:, 1:]
) / 4.0
shapes = []
for i in range(valores_celula.shape[0]):
for j in range(valores_celula.shape[1]):
if not mascara_centros[i, j]:
continue
valor = valores_celula[i, j]
if not np.isfinite(valor):
continue
cor = str(colormap(float(valor)))
valor_fmt = f"{float(valor):.3f}".replace(".", ",")
shapes.append(
{
"type": "polygon",
"coords": [
[float(grid_lat[i]), float(grid_lon[j])],
[float(grid_lat[i]), float(grid_lon[j + 1])],
[float(grid_lat[i + 1]), float(grid_lon[j + 1])],
[float(grid_lat[i + 1]), float(grid_lon[j])],
],
"color": cor,
"weight": 0,
"fill": True,
"fill_color": cor,
"fill_opacity": 0.6,
"tooltip_html": (
f"<div style='font-family:\"Segoe UI\",Arial,sans-serif; font-size:13px;'>"
f"{str(tamanho_col)} interpolado: <b>{valor_fmt}</b>"
"</div>"
),
}
)
if shapes:
overlay_layers.append(
{
"id": "superficie_continua",
"label": "Superfície contínua",
"show": True,
"shapes": shapes,
}
)
else:
market_points: list[dict[str, Any]] = []
indices_markers: list[dict[str, Any]] = []
grupos_coord = df_plot_pontos.groupby(
[df_plot_pontos[lat_real].round(7), df_plot_pontos[lon_real].round(7)],
sort=False,
).indices
for marker_ordem, posicoes_raw in enumerate(grupos_coord.values()):
posicoes = list(posicoes_raw)
rows_grupo = [df_plot_pontos.iloc[int(pos)] for pos in posicoes]
row = rows_grupo[0]
idx = row.name
registros_grupo: list[dict[str, Any]] = []
indices_display: list[Any] = []
valores_tooltip: list[str] = []
cores_grupo: list[str] = []
raios_grupo: list[float] = []
destaque_grupo = False
for pos in posicoes:
row_item = df_plot_pontos.iloc[int(pos)]
idx_item = row_item.name
idx_display_item = int(idx_item) if isinstance(idx_item, (int, np.integer)) else int(pos) + 1
indices_display.append(idx_display_item)
valor_texto = ""
if tamanho_col and tamanho_col in df_plot_pontos.columns:
valor_texto = _formatar_valor_resumo_mapa(str(tamanho_col), row_item[tamanho_col])
valores_tooltip.append(valor_texto)
cor_item = COR_PRINCIPAL
if colormap and cor_col_resolvida and cor_col_resolvida in df_mapa.columns:
valor_cor = pd.to_numeric(pd.Series([row_item.get(cor_col_resolvida)]), errors="coerce").iloc[0]
if pd.notna(valor_cor):
cor_item = str(colormap(float(valor_cor)))
cores_grupo.append(cor_item)
if idx_item == indice_destacado:
raio_item = raio_max + 4.0
destaque_grupo = True
elif tamanho_func and tamanho_col and tamanho_col in row_item.index and pd.notna(row_item[tamanho_col]):
raio_item = float(tamanho_func(float(row_item[tamanho_col])))
else:
raio_item = 4.0
raios_grupo.append(float(max(1.0, raio_item)))
row_id_raw_item = row_item[row_id_col] if popup_source and row_id_col in row_item.index else None
popup_request_item = None
if row_id_raw_item is not None and pd.notna(row_id_raw_item):
popup_request_item = {
"kind": "elaboracao_row",
"row_id": int(row_id_raw_item),
"source": str(popup_source),
}
registros_grupo.append(
{
"indice": idx_display_item,
"label": f"Índice {idx_display_item}",
"value_label": str(tamanho_col or ""),
"value": valor_texto,
"popup_request": popup_request_item,
}
)
cor = cores_grupo[0] if cores_grupo else COR_PRINCIPAL
raio = max(raios_grupo) if raios_grupo else 4.0
stroke_weight = 3.0 if destaque_grupo else 1.0
fill_opacity = 0.8 if destaque_grupo else 0.6
is_grouped = len(rows_grupo) > 1
idx_display = indices_display[0] if indices_display else marker_ordem + 1
tooltip_html = _tooltip_html_grupo_mercado(
indices_display,
label=str(tamanho_col or "") if tamanho_col else None,
valores=valores_tooltip,
) if is_grouped else (
"<div style='font-family:\"Segoe UI\",Arial,sans-serif; font-size:14px; line-height:1.7; padding:2px 4px;'>"
f"<b>Índice {idx_display}</b>"
+ (
f"<br><span style='color:#555;'>{escape(str(tamanho_col))}:</span> <b>{escape(valores_tooltip[0])}</b>"
if tamanho_col and valores_tooltip
else ""
)
+ "</div>"
)
point_payload = {
"lat": float(row[lat_plot_col]),
"lon": float(row[lon_plot_col]),
"indice": _formatar_indices_badge(indices_display) if is_grouped else idx_display,
"color": cor,
"base_radius": float(max(1.0, raio)),
"stroke_color": "#243746" if is_grouped else "#000000",
"stroke_weight": float(max(stroke_weight, 2.0) if is_grouped else stroke_weight),
"fill_opacity": float(0.74 if is_grouped else fill_opacity),
"tooltip_html": tooltip_html,
}
if is_grouped:
point_payload.update(
{
"grouped": True,
"count": len(rows_grupo),
"group_title": f"{len(rows_grupo)} dados neste local",
"group_items": registros_grupo,
}
)
else:
point_payload["popup_request"] = registros_grupo[0].get("popup_request") if registros_grupo else None
market_points.append(point_payload)
if mostrar_indices:
indices_markers.append(
{
"lat": float(row[lat_plot_col]),
"lon": float(row[lon_plot_col]),
"marker_html": (
'<div style="transform: translate(10px, -14px);display:inline-block;background: rgba(255, 255, 255, 0.9);'
+ 'border: 1px solid rgba(28, 45, 66, 0.45);border-radius: 10px;padding: 1px 6px;font-size: 11px;'
+ 'font-weight: 700;line-height: 1.2;color: #1f2f44;white-space: nowrap;box-shadow: 0 1px 2px rgba(0, 0, 0, 0.18);'
+ 'pointer-events: none;">'
+ escape(_formatar_indices_badge(indices_display))
+ "</div>"
),
"icon_size": [96 if is_grouped else 72, 24],
"icon_anchor": [0, 0],
"class_name": "mesa-indice-label",
"interactive": False,
"keyboard": False,
}
)
overlay_layers.append(
{
"id": "mercado",
"label": "Mercado",
"show": True,
"points": market_points,
}
)
if mostrar_indices and indices_markers:
overlay_layers.append(
{
"id": "indices",
"label": "Índices",
"show": False,
"markers": indices_markers,
}
)
notice = None
if houve_amostragem:
notice = {
"message": f"Exibindo {len(df_mapa)} de {total_pontos} pontos para melhor desempenho.",
"position": "topright",
}
return build_leaflet_payload(
bounds=_resolver_bounds(df_mapa, str(lat_real), str(lon_real)),
center=[centro_lat, centro_lon],
legend=legend,
notice=notice,
overlay_layers=overlay_layers,
show_bairros=True,
bairros_geojson_url=bairros_geojson_url,
)
def build_visualizacao_map_payload(
df: pd.DataFrame,
*,
cor_col: str | None = None,
tamanho_col: str | None = None,
col_y: str | None = None,
avaliandos_tecnicos: list[dict[str, Any]] | None = None,
bairros_geojson_url: str = "/api/visualizacao/map/bairros.geojson",
) -> dict[str, Any] | None:
lat_real = _detectar_coluna(df, _LAT_ALIASES)
lon_real = _detectar_coluna(df, _LON_ALIASES)
if lat_real is None or lon_real is None:
return None
df_mapa = df.copy()
lat_key = "__mesa_lat__"
lon_key = "__mesa_lon__"
lat_serie = _primeira_serie_por_nome(df_mapa, lat_real)
lon_serie = _primeira_serie_por_nome(df_mapa, lon_real)
if lat_serie is None or lon_serie is None:
return None
df_mapa[lat_key] = pd.to_numeric(lat_serie, errors="coerce")
df_mapa[lon_key] = pd.to_numeric(lon_serie, errors="coerce")
df_mapa = df_mapa.dropna(subset=[lat_key, lon_key])
df_mapa = df_mapa[
(df_mapa[lat_key] >= -90.0)
& (df_mapa[lat_key] <= 90.0)
& (df_mapa[lon_key] >= -180.0)
& (df_mapa[lon_key] <= 180.0)
].copy()
if df_mapa.empty:
return None
centro_lat = float(df_mapa[lat_key].median())
centro_lon = float(df_mapa[lon_key].median())
cor_col_resolvida = cor_col
if tamanho_col and tamanho_col != "Visualização Padrão" and not cor_col_resolvida:
cor_col_resolvida = tamanho_col
colormap = None
cor_key = None
legend = None
if cor_col_resolvida and cor_col_resolvida in df_mapa.columns:
serie_cor = _primeira_serie_por_nome(df_mapa, cor_col_resolvida)
if serie_cor is not None:
cor_key = "__mesa_cor__"
df_mapa[cor_key] = pd.to_numeric(serie_cor, errors="coerce")
vmin = df_mapa[cor_key].min()
vmax = df_mapa[cor_key].max()
if pd.notna(vmin) and pd.notna(vmax):
colormap = cm.LinearColormap(
colors=["#2ecc71", "#a8e06c", "#f1c40f", "#e67e22", "#e74c3c"],
vmin=vmin,
vmax=vmax,
caption=cor_col_resolvida,
)
legend = {
"title": str(cor_col_resolvida),
"vmin": float(vmin),
"vmax": float(vmax),
"colors": ["#2ecc71", "#a8e06c", "#f1c40f", "#e67e22", "#e74c3c"],
}
raio_min, raio_max = 3.0, 18.0
tamanho_func = None
tamanho_key = None
if tamanho_col and tamanho_col != "Visualização Padrão" and tamanho_col in df_mapa.columns:
serie_tamanho = _primeira_serie_por_nome(df_mapa, tamanho_col)
if serie_tamanho is not None:
tamanho_key = "__mesa_tamanho__"
df_mapa[tamanho_key] = pd.to_numeric(serie_tamanho, errors="coerce")
t_min = df_mapa[tamanho_key].min()
t_max = df_mapa[tamanho_key].max()
if pd.notna(t_min) and pd.notna(t_max):
if t_max > t_min:
tamanho_func = (
lambda v, _min=t_min, _max=t_max: raio_min
+ (v - _min) / (_max - _min) * (raio_max - raio_min)
)
else:
tamanho_func = lambda v: (raio_min + raio_max) / 2.0
show_indices = False
lat_plot_key = "__mesa_lat_plot__"
lon_plot_key = "__mesa_lon_plot__"
df_plot_pontos = df_mapa.copy()
df_plot_pontos[lat_plot_key] = df_plot_pontos[lat_key]
df_plot_pontos[lon_plot_key] = df_plot_pontos[lon_key]
tooltip_col = None
tooltip_key = None
if tamanho_key:
tooltip_col = tamanho_col
tooltip_key = tamanho_key
elif col_y and col_y in df_mapa.columns:
serie_tooltip = _primeira_serie_por_nome(df_mapa, col_y)
if serie_tooltip is not None:
tooltip_col = col_y
tooltip_key = "__mesa_tooltip__"
df_mapa[tooltip_key] = serie_tooltip
df_plot_pontos[tooltip_key] = df_mapa.loc[df_plot_pontos.index, tooltip_key]
total_pontos_plot = len(df_plot_pontos)
raio_padrao = 4.0 if total_pontos_plot <= 2500 else 3.0
market_points: list[dict[str, Any]] = []
grupos_coord = df_plot_pontos.groupby(
[df_plot_pontos[lat_key].round(7), df_plot_pontos[lon_key].round(7)],
sort=False,
).indices
for marker_ordem, posicoes_raw in enumerate(grupos_coord.values()):
posicoes = list(posicoes_raw)
rows_grupo = [df_plot_pontos.iloc[int(pos)] for pos in posicoes]
row = rows_grupo[0]
indices_display: list[Any] = []
registros_grupo: list[dict[str, Any]] = []
valores_tooltip: list[str] = []
cores_grupo: list[str] = []
raios_grupo: list[float] = []
for pos in posicoes:
row_item = df_plot_pontos.iloc[int(pos)]
idx_item = row_item.name
idx_display_item = int(row_item["index"]) if "index" in row_item.index else int(idx_item)
indices_display.append(idx_display_item)
valor_texto = (
_formatar_tooltip_valor(str(tooltip_col or ""), row_item[tooltip_key])
if tooltip_col and tooltip_key and tooltip_key in row_item.index
else ""
)
if valor_texto:
valores_tooltip.append(valor_texto)
cor_item = colormap(row_item[cor_key]) if colormap and cor_key and pd.notna(row_item[cor_key]) else COR_PRINCIPAL
cores_grupo.append(str(cor_item))
if tamanho_func and tamanho_key and pd.notna(row_item[tamanho_key]):
raio_item = float(tamanho_func(row_item[tamanho_key]))
else:
raio_item = raio_padrao
raios_grupo.append(float(max(1.0, raio_item)))
row_id_raw_item = row_item["__mesa_row_id__"] if "__mesa_row_id__" in row_item.index else None
row_id_item = int(row_id_raw_item) if row_id_raw_item is not None and pd.notna(row_id_raw_item) else None
registros_grupo.append(
{
"indice": idx_display_item,
"label": f"Índice {idx_display_item}",
"value_label": str(tooltip_col or ""),
"value": valor_texto,
"popup_request": (
{"kind": "visualizacao_row", "row_id": row_id_item}
if row_id_item is not None
else None
),
}
)
is_grouped = len(rows_grupo) > 1
idx_display = indices_display[0] if indices_display else marker_ordem
cor = cores_grupo[0] if cores_grupo else COR_PRINCIPAL
raio = max(raios_grupo) if raios_grupo else raio_padrao
tooltip_payload = {
"title": f"Índice {idx_display}",
"label": str(tooltip_col or ""),
"value": valores_tooltip[0] if valores_tooltip else "",
}
row_id_raw = row["__mesa_row_id__"] if "__mesa_row_id__" in row.index else None
point_payload = {
"lat": float(row[lat_plot_key]),
"lon": float(row[lon_plot_key]),
"indice": _formatar_indices_badge(indices_display) if is_grouped else idx_display,
"row_id": int(row_id_raw) if (not is_grouped and row_id_raw is not None and pd.notna(row_id_raw)) else None,
"color": str(cor),
"base_radius": float(max(1.0, raio)),
}
if is_grouped:
point_payload.update(
{
"grouped": True,
"count": len(rows_grupo),
"stroke_color": "#243746",
"stroke_weight": 2.0,
"fill_opacity": 0.74,
"tooltip_html": _tooltip_html_grupo_mercado(
indices_display,
label=str(tooltip_col or "") if tooltip_col else None,
valores=valores_tooltip,
),
"group_title": f"{len(rows_grupo)} dados neste local",
"group_items": registros_grupo,
}
)
else:
point_payload["tooltip"] = tooltip_payload
market_points.append(point_payload)
return {
"type": "mesa_leaflet_payload",
"version": 1,
"center": [centro_lat, centro_lon],
"bounds": _resolver_bounds(df_mapa, lat_key, lon_key),
"tile_layers": _TILE_LAYERS,
"controls": {
"fullscreen": True,
"measure": True,
"layer_control": True,
},
"radius_behavior": {
"min_radius": 1.6,
"max_radius": 52.0,
"reference_zoom": 12.0,
"growth_factor": 0.20,
},
"show_indices": show_indices,
"show_bairros": True,
"bairros_geojson_url": bairros_geojson_url,
"legend": legend,
"market_points": market_points,
"trabalhos_tecnicos_points": build_trabalhos_tecnicos_marker_payloads(avaliandos_tecnicos),
}