from __future__ import annotations from html import escape from typing import Any import branca.colormap as cm import numpy as np import pandas as pd from scipy.interpolate import griddata from app.core.elaboracao.charts import ( _contorno_convexo_lng_lat, _mascara_dentro_poligono, _normalizar_stops_cor, ) from app.core.map_layers import build_trabalhos_tecnicos_marker_payloads from app.core.visualizacao.app import COR_PRINCIPAL, formatar_monetario _LAT_ALIASES = {"lat", "latitude", "siat_latitude"} _LON_ALIASES = {"lon", "longitude", "long", "siat_longitude"} _TILE_LAYERS = [ { "id": "positron", "label": "Positron", "url": "https://{s}.basemaps.cartocdn.com/light_all/{z}/{x}/{y}{r}.png", "attribution": "© OpenStreetMap contributors © CARTO", }, { "id": "osm", "label": "OpenStreetMap", "url": "https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png", "attribution": "© OpenStreetMap contributors", }, ] def _primeira_serie_por_nome(dataframe: pd.DataFrame, nome_coluna: str) -> pd.Series | None: matches = [i for i, c in enumerate(dataframe.columns) if str(c) == str(nome_coluna)] if not matches: return None return dataframe.iloc[:, matches[0]] def _detectar_coluna(df: pd.DataFrame, aliases: set[str]) -> str | None: for col in df.columns: if str(col).lower() in aliases: return str(col) return None def _formatar_tooltip_valor(coluna: str | None, valor: Any) -> str: if valor is None: return "—" try: if pd.isna(valor): return "—" except Exception: pass col_norm = str(coluna or "").lower() if isinstance(valor, (int, float, np.integer, np.floating)): numero = float(valor) if not np.isfinite(numero): return "—" if any(k in col_norm for k in ["valor", "preco", "vu", "vunit"]): return formatar_monetario(numero) return f"{numero:,.2f}".replace(",", "X").replace(".", ",").replace("X", ".") return str(valor) def _formatar_valor_resumo_mapa(coluna: str | None, valor: Any) -> str: return _formatar_tooltip_valor(coluna, valor) def _formatar_indices_tooltip(indices: list[Any]) -> str: return ", ".join(str(item) for item in indices) def _formatar_indices_badge(indices: list[Any], limite: int = 28) -> str: texto = ", ".join(str(item) for item in indices) if len(texto) <= limite: return texto return texto[: max(0, limite - 1)].rstrip(" ,") + "…" def _tooltip_html_grupo_mercado(indices: list[Any], label: str | None = None, valores: list[str] | None = None) -> str: total = len(indices) indices_txt = escape(_formatar_indices_tooltip(indices)) titulo = f"{total} dados neste local" if total != 1 else "1 dado neste local" html = ( "
" f"{escape(titulo)}" f"
Índices: {indices_txt}" ) valores_limpos = [str(item) for item in valores or [] if str(item).strip()] if label and valores_limpos: unicos = list(dict.fromkeys(valores_limpos)) if len(unicos) == 1: resumo = unicos[0] else: resumo = "valores diferentes" html += f"
{escape(str(label))}: {escape(str(resumo))}" html += "
" return html def _resolver_bounds(df_mapa: pd.DataFrame, lat_key: str, lon_key: str) -> list[list[float]]: df_bounds = df_mapa if len(df_mapa) >= 8: lat_vals = df_mapa[lat_key] lon_vals = df_mapa[lon_key] lat_med = float(lat_vals.median()) lon_med = float(lon_vals.median()) lat_mad = float((lat_vals - lat_med).abs().median()) lon_mad = float((lon_vals - lon_med).abs().median()) lat_span = float(lat_vals.max() - lat_vals.min()) lon_span = float(lon_vals.max() - lon_vals.min()) lat_scale = max(lat_mad, lat_span / 30.0, 1e-6) lon_scale = max(lon_mad, lon_span / 30.0, 1e-6) score = ((lat_vals - lat_med) / lat_scale) ** 2 + ((lon_vals - lon_med) / lon_scale) ** 2 lim = float(score.quantile(0.75)) df_core = df_mapa[score <= lim] if len(df_core) >= max(5, int(len(df_mapa) * 0.45)): df_bounds = df_core if len(df_bounds) >= 50: lat_min, lat_max = df_bounds[lat_key].quantile([0.01, 0.99]).tolist() lon_min, lon_max = df_bounds[lon_key].quantile([0.01, 0.99]).tolist() else: lat_min, lat_max = float(df_bounds[lat_key].min()), float(df_bounds[lat_key].max()) lon_min, lon_max = float(df_bounds[lon_key].min()), float(df_bounds[lon_key].max()) if not np.isfinite(lat_min) or not np.isfinite(lat_max): lat_min, lat_max = float(df_mapa[lat_key].min()), float(df_mapa[lat_key].max()) if not np.isfinite(lon_min) or not np.isfinite(lon_max): lon_min, lon_max = float(df_mapa[lon_key].min()), float(df_mapa[lon_key].max()) if np.isclose(lat_min, lat_max): lat_min = float(lat_min) - 0.0008 lat_max = float(lat_max) + 0.0008 if np.isclose(lon_min, lon_max): lon_min = float(lon_min) - 0.0008 lon_max = float(lon_max) + 0.0008 return [[float(lat_min), float(lon_min)], [float(lat_max), float(lon_max)]] def _normalizar_bounds_lista(bounds: list[list[float]] | None) -> list[list[float]] | None: coords = [] for item in bounds or []: if not isinstance(item, (list, tuple)) or len(item) < 2: continue try: lat = float(item[0]) lon = float(item[1]) except Exception: continue if not np.isfinite(lat) or not np.isfinite(lon): continue coords.append((lat, lon)) if not coords: return None lat_values = [lat for lat, _ in coords] lon_values = [lon for _, lon in coords] lat_min = min(lat_values) lat_max = max(lat_values) lon_min = min(lon_values) lon_max = max(lon_values) if np.isclose(lat_min, lat_max): lat_min -= 0.0008 lat_max += 0.0008 if np.isclose(lon_min, lon_max): lon_min -= 0.0008 lon_max += 0.0008 return [[float(lat_min), float(lon_min)], [float(lat_max), float(lon_max)]] def build_leaflet_payload( *, bounds: list[list[float]] | None, center: list[float] | None = None, legend: dict[str, Any] | None = None, overlay_layers: list[dict[str, Any]] | None = None, notice: dict[str, Any] | None = None, show_bairros: bool = True, bairros_geojson_url: str = "/api/visualizacao/map/bairros.geojson", ) -> dict[str, Any] | None: normalized_bounds = _normalizar_bounds_lista(bounds) if normalized_bounds is None: return None if center and len(center) >= 2: center_lat = float(center[0]) center_lon = float(center[1]) else: center_lat = float((normalized_bounds[0][0] + normalized_bounds[1][0]) / 2.0) center_lon = float((normalized_bounds[0][1] + normalized_bounds[1][1]) / 2.0) final_overlay_layers = list(overlay_layers or []) if show_bairros: final_overlay_layers.insert( 0, { "id": "bairros", "label": "Bairros", "show": True, "geojson_url": bairros_geojson_url, "geojson_pane": "mesa-bairros-pane", "geojson_style": { "color": "#4c6882", "weight": 1.15, "opacity": 0.88, "fillColor": "#f39c12", "fillOpacity": 0.0, }, "geojson_tooltip_properties": ["NOME", "BAIRRO", "NME_BAI", "NOME_BAIRRO"], "geojson_tooltip_label": "Bairro", }, ) return { "type": "mesa_leaflet_payload", "version": 1, "center": [center_lat, center_lon], "bounds": normalized_bounds, "tile_layers": _TILE_LAYERS, "controls": { "fullscreen": True, "measure": True, "layer_control": True, }, "radius_behavior": { "min_radius": 1.6, "max_radius": 52.0, "reference_zoom": 12.0, "growth_factor": 0.20, }, "legend": legend, "notice": notice, "overlay_layers": final_overlay_layers, } def build_elaboracao_map_payload( df: pd.DataFrame, *, lat_col: str = "lat", lon_col: str = "lon", cor_col: str | None = None, indice_destacado: Any = None, tamanho_col: str | None = None, modo: str | None = "pontos", cor_vmin: float | None = None, cor_vmax: float | None = None, cor_caption: str | None = None, cor_colors: list[str] | None = None, cor_stops: list[float] | None = None, cor_tick_values: list[float] | None = None, cor_tick_labels: list[str] | None = None, bairros_geojson_url: str = "/api/visualizacao/map/bairros.geojson", popup_source: str | None = "mercado", ) -> dict[str, Any] | None: modo_normalizado = str(modo or "pontos").strip().lower() cols_lower = {str(c).lower(): c for c in df.columns} lat_real = cols_lower.get(str(lat_col).lower()) if str(lat_col).lower() in cols_lower else None lon_real = cols_lower.get(str(lon_col).lower()) if str(lon_col).lower() in cols_lower else None if lat_real is None: for nome in ["lat", "latitude", "siat_latitude"]: if nome in cols_lower: lat_real = cols_lower[nome] break if lon_real is None: for nome in ["lon", "longitude", "long", "siat_longitude"]: if nome in cols_lower: lon_real = cols_lower[nome] break if lat_real is None or lon_real is None: return None row_id_col = "__mesa_row_id__" df_mapa = df.copy() if popup_source and row_id_col not in df_mapa.columns: df_mapa[row_id_col] = np.arange(len(df_mapa), dtype=int) df_mapa[lat_real] = pd.to_numeric(df_mapa[lat_real], errors="coerce") df_mapa[lon_real] = pd.to_numeric(df_mapa[lon_real], errors="coerce") df_mapa = df_mapa.dropna(subset=[lat_real, lon_real]) df_mapa = df_mapa[~((df_mapa[lat_real] == 0.0) & (df_mapa[lon_real] == 0.0))] df_mapa = df_mapa[ (df_mapa[lat_real] >= -90.0) & (df_mapa[lat_real] <= 90.0) & (df_mapa[lon_real] >= -180.0) & (df_mapa[lon_real] <= 180.0) ].copy() if df_mapa.empty: return None limite_pontos = 2500 total_pontos = len(df_mapa) houve_amostragem = total_pontos > limite_pontos if houve_amostragem: df_mapa = df_mapa.sample(n=limite_pontos, random_state=42).copy() centro_lat = float(df_mapa[lat_real].median()) centro_lon = float(df_mapa[lon_real].median()) colors = ( [str(item) for item in cor_colors if str(item).strip()] if isinstance(cor_colors, list) and len(cor_colors) >= 2 else ["#2ecc71", "#a8e06c", "#f1c40f", "#e67e22", "#e74c3c"] ) modo_calor = modo_normalizado == "calor" and tamanho_col is not None and tamanho_col in df_mapa.columns modo_superficie = modo_normalizado == "superficie" and tamanho_col is not None and tamanho_col in df_mapa.columns if modo_normalizado not in {"pontos", "calor", "superficie"}: modo_normalizado = "pontos" cor_col_resolvida = cor_col or tamanho_col colormap = None legend = None if cor_col_resolvida and cor_col_resolvida in df_mapa.columns: serie_cor = pd.to_numeric(df_mapa[cor_col_resolvida], errors="coerce") vmin = float(cor_vmin) if cor_vmin is not None and np.isfinite(cor_vmin) else float(serie_cor.min()) vmax = float(cor_vmax) if cor_vmax is not None and np.isfinite(cor_vmax) else float(serie_cor.max()) if np.isfinite(vmin) and np.isfinite(vmax): if np.isclose(vmin, vmax): vmax = float(vmin) + 1.0 color_index = _normalizar_stops_cor(cor_stops, colors, float(vmin), float(vmax)) colormap_kwargs: dict[str, Any] = { "colors": colors, "vmin": float(vmin), "vmax": float(vmax), "caption": str(cor_caption or cor_col_resolvida), } if color_index is not None: colormap_kwargs["index"] = color_index colormap = cm.LinearColormap(**colormap_kwargs) legend = { "title": str(cor_caption or cor_col_resolvida), "vmin": float(vmin), "vmax": float(vmax), "colors": colors, } if ( isinstance(cor_tick_values, list) and isinstance(cor_tick_labels, list) and len(cor_tick_values) == len(cor_tick_labels) and len(cor_tick_values) > 1 ): try: legend["tick_values"] = [float(item) for item in cor_tick_values] legend["tick_labels"] = [str(item) for item in cor_tick_labels] except (TypeError, ValueError): pass raio_min, raio_max = 3.0, 18.0 tamanho_func = None if tamanho_col and tamanho_col in df_mapa.columns: serie_tamanho = pd.to_numeric(df_mapa[tamanho_col], errors="coerce") t_min = float(serie_tamanho.min()) t_max = float(serie_tamanho.max()) if np.isfinite(t_min) and np.isfinite(t_max): if t_max > t_min: tamanho_func = ( lambda v, _min=t_min, _max=t_max: raio_min + (v - _min) / (_max - _min) * (raio_max - raio_min) ) else: tamanho_func = lambda _v: (raio_min + raio_max) / 2.0 mostrar_indices = not modo_calor and not modo_superficie and len(df_mapa) <= 800 lat_plot_col = "__mesa_lat_plot__" lon_plot_col = "__mesa_lon_plot__" if not modo_calor and not modo_superficie: df_plot_pontos = df_mapa.copy() df_plot_pontos[lat_plot_col] = df_plot_pontos[lat_real] df_plot_pontos[lon_plot_col] = df_plot_pontos[lon_real] else: df_plot_pontos = df_mapa.copy() df_plot_pontos[lat_plot_col] = df_plot_pontos[lat_real] df_plot_pontos[lon_plot_col] = df_plot_pontos[lon_real] overlay_layers: list[dict[str, Any]] = [] if modo_calor and tamanho_col and tamanho_col in df_mapa.columns: pesos = pd.to_numeric(df_mapa[tamanho_col], errors="coerce") mask_pesos = np.isfinite(pesos.to_numpy()) df_calor = df_mapa.loc[mask_pesos, [lat_real, lon_real]].copy() if not df_calor.empty: pesos_validos = pesos.loc[df_calor.index].to_numpy(dtype=float) fixed_scale = ( cor_vmin is not None and cor_vmax is not None and np.isfinite(cor_vmin) and np.isfinite(cor_vmax) and float(cor_vmax) > float(cor_vmin) ) if fixed_scale: peso_min = float(cor_vmin) peso_max = float(cor_vmax) pesos_clip = np.clip(pesos_validos, peso_min, peso_max) pesos_norm = (pesos_clip - peso_min) / (peso_max - peso_min) else: peso_min = float(np.min(pesos_validos)) peso_max = float(np.max(pesos_validos)) if np.isfinite(peso_min) and np.isfinite(peso_max) and peso_max > peso_min: pesos_norm = 0.1 + 0.9 * (pesos_validos - peso_min) / (peso_max - peso_min) else: pesos_norm = np.ones_like(pesos_validos) gradient = None if len(colors) >= 2: if ( cor_vmin is not None and cor_vmax is not None and np.isfinite(cor_vmin) and np.isfinite(cor_vmax) and float(cor_vmax) > float(cor_vmin) ): color_index = _normalizar_stops_cor(cor_stops, colors, float(cor_vmin), float(cor_vmax)) if color_index is not None: gradient = {} for stop, color in zip(color_index, colors): ratio = (float(stop) - float(cor_vmin)) / (float(cor_vmax) - float(cor_vmin)) gradient[float(np.clip(ratio, 0.0, 1.0))] = color elif len(colors) == 2: gradient = {0.0: colors[0], 1.0: colors[1]} else: gradient = {i / (len(colors) - 1): colors[i] for i in range(len(colors))} elif len(colors) == 2: gradient = {0.0: colors[0], 1.0: colors[1]} else: gradient = {i / (len(colors) - 1): colors[i] for i in range(len(colors))} overlay_layers.append( { "id": "mapa_calor", "label": "Mapa de calor", "show": True, "heatmap": { "points": [ { "lat": float(df_calor.iloc[idx][lat_real]), "lon": float(df_calor.iloc[idx][lon_real]), "weight": float(pesos_norm[idx]), } for idx in range(len(df_calor)) ], "radius": 20, "blur": 18, "min_opacity": 0.28, "max_zoom": 17, "gradient": gradient, }, } ) elif modo_superficie and tamanho_col and tamanho_col in df_mapa.columns: lats = pd.to_numeric(df_mapa[lat_real], errors="coerce").to_numpy(dtype=float) lons = pd.to_numeric(df_mapa[lon_real], errors="coerce").to_numpy(dtype=float) valores = pd.to_numeric(df_mapa[tamanho_col], errors="coerce").to_numpy(dtype=float) mask_valid = np.isfinite(lats) & np.isfinite(lons) & np.isfinite(valores) if mask_valid.sum() >= 6: lats = lats[mask_valid] lons = lons[mask_valid] valores = valores[mask_valid] contorno = _contorno_convexo_lng_lat(lons, lats) if contorno is not None: lon_min = float(np.min(contorno[:, 0])) lon_max = float(np.max(contorno[:, 0])) lat_min = float(np.min(contorno[:, 1])) lat_max = float(np.max(contorno[:, 1])) if not np.isclose(lon_min, lon_max) and not np.isclose(lat_min, lat_max): n_obs = len(valores) n_grid = 46 if n_obs <= 400 else (40 if n_obs <= 1200 else 34) grid_lon = np.linspace(lon_min, lon_max, n_grid) grid_lat = np.linspace(lat_min, lat_max, n_grid) mesh_lon, mesh_lat = np.meshgrid(grid_lon, grid_lat) pontos = np.column_stack([lons, lats]) try: superficie = griddata(pontos, valores, (mesh_lon, mesh_lat), method="linear") except Exception: superficie = None if superficie is not None: superficie = np.asarray(superficie, dtype=float) if np.isnan(superficie).all(): try: superficie = griddata(pontos, valores, (mesh_lon, mesh_lat), method="nearest") except Exception: superficie = None elif np.isnan(superficie).any(): try: nearest = griddata(pontos, valores, (mesh_lon, mesh_lat), method="nearest") except Exception: nearest = None if nearest is not None: superficie = np.where(np.isfinite(superficie), superficie, np.asarray(nearest, dtype=float)) if superficie is not None: superficie = np.asarray(superficie, dtype=float) mascara = _mascara_dentro_poligono(mesh_lon, mesh_lat, contorno) superficie = np.where(mascara, superficie, np.nan) if np.isfinite(superficie).any(): vmin_sup = float(cor_vmin) if cor_vmin is not None and np.isfinite(cor_vmin) else float(np.nanmin(superficie)) vmax_sup = float(cor_vmax) if cor_vmax is not None and np.isfinite(cor_vmax) else float(np.nanmax(superficie)) if np.isfinite(vmin_sup) and np.isfinite(vmax_sup): if np.isclose(vmin_sup, vmax_sup): vmax_sup = vmin_sup + 1.0 color_index = _normalizar_stops_cor(cor_stops, colors, vmin_sup, vmax_sup) colormap_kwargs = { "colors": colors, "vmin": float(vmin_sup), "vmax": float(vmax_sup), "caption": str(cor_caption or f"{tamanho_col} (superfície)"), } if color_index is not None: colormap_kwargs["index"] = color_index colormap = cm.LinearColormap(**colormap_kwargs) legend = { "title": str(cor_caption or f"{tamanho_col} (superfície)"), "vmin": float(vmin_sup), "vmax": float(vmax_sup), "colors": colors, } if ( isinstance(cor_tick_values, list) and isinstance(cor_tick_labels, list) and len(cor_tick_values) == len(cor_tick_labels) and len(cor_tick_values) > 1 ): try: legend["tick_values"] = [float(item) for item in cor_tick_values] legend["tick_labels"] = [str(item) for item in cor_tick_labels] except (TypeError, ValueError): pass centros_lon = (grid_lon[:-1] + grid_lon[1:]) / 2.0 centros_lat = (grid_lat[:-1] + grid_lat[1:]) / 2.0 centro_mesh_lon, centro_mesh_lat = np.meshgrid(centros_lon, centros_lat) mascara_centros = _mascara_dentro_poligono(centro_mesh_lon, centro_mesh_lat, contorno) valores_celula = ( superficie[:-1, :-1] + superficie[1:, :-1] + superficie[:-1, 1:] + superficie[1:, 1:] ) / 4.0 shapes = [] for i in range(valores_celula.shape[0]): for j in range(valores_celula.shape[1]): if not mascara_centros[i, j]: continue valor = valores_celula[i, j] if not np.isfinite(valor): continue cor = str(colormap(float(valor))) valor_fmt = f"{float(valor):.3f}".replace(".", ",") shapes.append( { "type": "polygon", "coords": [ [float(grid_lat[i]), float(grid_lon[j])], [float(grid_lat[i]), float(grid_lon[j + 1])], [float(grid_lat[i + 1]), float(grid_lon[j + 1])], [float(grid_lat[i + 1]), float(grid_lon[j])], ], "color": cor, "weight": 0, "fill": True, "fill_color": cor, "fill_opacity": 0.6, "tooltip_html": ( f"
" f"{str(tamanho_col)} interpolado: {valor_fmt}" "
" ), } ) if shapes: overlay_layers.append( { "id": "superficie_continua", "label": "Superfície contínua", "show": True, "shapes": shapes, } ) else: market_points: list[dict[str, Any]] = [] indices_markers: list[dict[str, Any]] = [] grupos_coord = df_plot_pontos.groupby( [df_plot_pontos[lat_real].round(7), df_plot_pontos[lon_real].round(7)], sort=False, ).indices for marker_ordem, posicoes_raw in enumerate(grupos_coord.values()): posicoes = list(posicoes_raw) rows_grupo = [df_plot_pontos.iloc[int(pos)] for pos in posicoes] row = rows_grupo[0] idx = row.name registros_grupo: list[dict[str, Any]] = [] indices_display: list[Any] = [] valores_tooltip: list[str] = [] cores_grupo: list[str] = [] raios_grupo: list[float] = [] destaque_grupo = False for pos in posicoes: row_item = df_plot_pontos.iloc[int(pos)] idx_item = row_item.name idx_display_item = int(idx_item) if isinstance(idx_item, (int, np.integer)) else int(pos) + 1 indices_display.append(idx_display_item) valor_texto = "" if tamanho_col and tamanho_col in df_plot_pontos.columns: valor_texto = _formatar_valor_resumo_mapa(str(tamanho_col), row_item[tamanho_col]) valores_tooltip.append(valor_texto) cor_item = COR_PRINCIPAL if colormap and cor_col_resolvida and cor_col_resolvida in df_mapa.columns: valor_cor = pd.to_numeric(pd.Series([row_item.get(cor_col_resolvida)]), errors="coerce").iloc[0] if pd.notna(valor_cor): cor_item = str(colormap(float(valor_cor))) cores_grupo.append(cor_item) if idx_item == indice_destacado: raio_item = raio_max + 4.0 destaque_grupo = True elif tamanho_func and tamanho_col and tamanho_col in row_item.index and pd.notna(row_item[tamanho_col]): raio_item = float(tamanho_func(float(row_item[tamanho_col]))) else: raio_item = 4.0 raios_grupo.append(float(max(1.0, raio_item))) row_id_raw_item = row_item[row_id_col] if popup_source and row_id_col in row_item.index else None popup_request_item = None if row_id_raw_item is not None and pd.notna(row_id_raw_item): popup_request_item = { "kind": "elaboracao_row", "row_id": int(row_id_raw_item), "source": str(popup_source), } registros_grupo.append( { "indice": idx_display_item, "label": f"Índice {idx_display_item}", "value_label": str(tamanho_col or ""), "value": valor_texto, "popup_request": popup_request_item, } ) cor = cores_grupo[0] if cores_grupo else COR_PRINCIPAL raio = max(raios_grupo) if raios_grupo else 4.0 stroke_weight = 3.0 if destaque_grupo else 1.0 fill_opacity = 0.8 if destaque_grupo else 0.6 is_grouped = len(rows_grupo) > 1 idx_display = indices_display[0] if indices_display else marker_ordem + 1 tooltip_html = _tooltip_html_grupo_mercado( indices_display, label=str(tamanho_col or "") if tamanho_col else None, valores=valores_tooltip, ) if is_grouped else ( "
" f"Índice {idx_display}" + ( f"
{escape(str(tamanho_col))}: {escape(valores_tooltip[0])}" if tamanho_col and valores_tooltip else "" ) + "
" ) point_payload = { "lat": float(row[lat_plot_col]), "lon": float(row[lon_plot_col]), "indice": _formatar_indices_badge(indices_display) if is_grouped else idx_display, "color": cor, "base_radius": float(max(1.0, raio)), "stroke_color": "#243746" if is_grouped else "#000000", "stroke_weight": float(max(stroke_weight, 2.0) if is_grouped else stroke_weight), "fill_opacity": float(0.74 if is_grouped else fill_opacity), "tooltip_html": tooltip_html, } if is_grouped: point_payload.update( { "grouped": True, "count": len(rows_grupo), "group_title": f"{len(rows_grupo)} dados neste local", "group_items": registros_grupo, } ) else: point_payload["popup_request"] = registros_grupo[0].get("popup_request") if registros_grupo else None market_points.append(point_payload) if mostrar_indices: indices_markers.append( { "lat": float(row[lat_plot_col]), "lon": float(row[lon_plot_col]), "marker_html": ( '
' + escape(_formatar_indices_badge(indices_display)) + "
" ), "icon_size": [96 if is_grouped else 72, 24], "icon_anchor": [0, 0], "class_name": "mesa-indice-label", "interactive": False, "keyboard": False, } ) overlay_layers.append( { "id": "mercado", "label": "Mercado", "show": True, "points": market_points, } ) if mostrar_indices and indices_markers: overlay_layers.append( { "id": "indices", "label": "Índices", "show": False, "markers": indices_markers, } ) notice = None if houve_amostragem: notice = { "message": f"Exibindo {len(df_mapa)} de {total_pontos} pontos para melhor desempenho.", "position": "topright", } return build_leaflet_payload( bounds=_resolver_bounds(df_mapa, str(lat_real), str(lon_real)), center=[centro_lat, centro_lon], legend=legend, notice=notice, overlay_layers=overlay_layers, show_bairros=True, bairros_geojson_url=bairros_geojson_url, ) def build_visualizacao_map_payload( df: pd.DataFrame, *, cor_col: str | None = None, tamanho_col: str | None = None, col_y: str | None = None, avaliandos_tecnicos: list[dict[str, Any]] | None = None, bairros_geojson_url: str = "/api/visualizacao/map/bairros.geojson", ) -> dict[str, Any] | None: lat_real = _detectar_coluna(df, _LAT_ALIASES) lon_real = _detectar_coluna(df, _LON_ALIASES) if lat_real is None or lon_real is None: return None df_mapa = df.copy() lat_key = "__mesa_lat__" lon_key = "__mesa_lon__" lat_serie = _primeira_serie_por_nome(df_mapa, lat_real) lon_serie = _primeira_serie_por_nome(df_mapa, lon_real) if lat_serie is None or lon_serie is None: return None df_mapa[lat_key] = pd.to_numeric(lat_serie, errors="coerce") df_mapa[lon_key] = pd.to_numeric(lon_serie, errors="coerce") df_mapa = df_mapa.dropna(subset=[lat_key, lon_key]) df_mapa = df_mapa[ (df_mapa[lat_key] >= -90.0) & (df_mapa[lat_key] <= 90.0) & (df_mapa[lon_key] >= -180.0) & (df_mapa[lon_key] <= 180.0) ].copy() if df_mapa.empty: return None centro_lat = float(df_mapa[lat_key].median()) centro_lon = float(df_mapa[lon_key].median()) cor_col_resolvida = cor_col if tamanho_col and tamanho_col != "Visualização Padrão" and not cor_col_resolvida: cor_col_resolvida = tamanho_col colormap = None cor_key = None legend = None if cor_col_resolvida and cor_col_resolvida in df_mapa.columns: serie_cor = _primeira_serie_por_nome(df_mapa, cor_col_resolvida) if serie_cor is not None: cor_key = "__mesa_cor__" df_mapa[cor_key] = pd.to_numeric(serie_cor, errors="coerce") vmin = df_mapa[cor_key].min() vmax = df_mapa[cor_key].max() if pd.notna(vmin) and pd.notna(vmax): colormap = cm.LinearColormap( colors=["#2ecc71", "#a8e06c", "#f1c40f", "#e67e22", "#e74c3c"], vmin=vmin, vmax=vmax, caption=cor_col_resolvida, ) legend = { "title": str(cor_col_resolvida), "vmin": float(vmin), "vmax": float(vmax), "colors": ["#2ecc71", "#a8e06c", "#f1c40f", "#e67e22", "#e74c3c"], } raio_min, raio_max = 3.0, 18.0 tamanho_func = None tamanho_key = None if tamanho_col and tamanho_col != "Visualização Padrão" and tamanho_col in df_mapa.columns: serie_tamanho = _primeira_serie_por_nome(df_mapa, tamanho_col) if serie_tamanho is not None: tamanho_key = "__mesa_tamanho__" df_mapa[tamanho_key] = pd.to_numeric(serie_tamanho, errors="coerce") t_min = df_mapa[tamanho_key].min() t_max = df_mapa[tamanho_key].max() if pd.notna(t_min) and pd.notna(t_max): if t_max > t_min: tamanho_func = ( lambda v, _min=t_min, _max=t_max: raio_min + (v - _min) / (_max - _min) * (raio_max - raio_min) ) else: tamanho_func = lambda v: (raio_min + raio_max) / 2.0 show_indices = False lat_plot_key = "__mesa_lat_plot__" lon_plot_key = "__mesa_lon_plot__" df_plot_pontos = df_mapa.copy() df_plot_pontos[lat_plot_key] = df_plot_pontos[lat_key] df_plot_pontos[lon_plot_key] = df_plot_pontos[lon_key] tooltip_col = None tooltip_key = None if tamanho_key: tooltip_col = tamanho_col tooltip_key = tamanho_key elif col_y and col_y in df_mapa.columns: serie_tooltip = _primeira_serie_por_nome(df_mapa, col_y) if serie_tooltip is not None: tooltip_col = col_y tooltip_key = "__mesa_tooltip__" df_mapa[tooltip_key] = serie_tooltip df_plot_pontos[tooltip_key] = df_mapa.loc[df_plot_pontos.index, tooltip_key] total_pontos_plot = len(df_plot_pontos) raio_padrao = 4.0 if total_pontos_plot <= 2500 else 3.0 market_points: list[dict[str, Any]] = [] grupos_coord = df_plot_pontos.groupby( [df_plot_pontos[lat_key].round(7), df_plot_pontos[lon_key].round(7)], sort=False, ).indices for marker_ordem, posicoes_raw in enumerate(grupos_coord.values()): posicoes = list(posicoes_raw) rows_grupo = [df_plot_pontos.iloc[int(pos)] for pos in posicoes] row = rows_grupo[0] indices_display: list[Any] = [] registros_grupo: list[dict[str, Any]] = [] valores_tooltip: list[str] = [] cores_grupo: list[str] = [] raios_grupo: list[float] = [] for pos in posicoes: row_item = df_plot_pontos.iloc[int(pos)] idx_item = row_item.name idx_display_item = int(row_item["index"]) if "index" in row_item.index else int(idx_item) indices_display.append(idx_display_item) valor_texto = ( _formatar_tooltip_valor(str(tooltip_col or ""), row_item[tooltip_key]) if tooltip_col and tooltip_key and tooltip_key in row_item.index else "" ) if valor_texto: valores_tooltip.append(valor_texto) cor_item = colormap(row_item[cor_key]) if colormap and cor_key and pd.notna(row_item[cor_key]) else COR_PRINCIPAL cores_grupo.append(str(cor_item)) if tamanho_func and tamanho_key and pd.notna(row_item[tamanho_key]): raio_item = float(tamanho_func(row_item[tamanho_key])) else: raio_item = raio_padrao raios_grupo.append(float(max(1.0, raio_item))) row_id_raw_item = row_item["__mesa_row_id__"] if "__mesa_row_id__" in row_item.index else None row_id_item = int(row_id_raw_item) if row_id_raw_item is not None and pd.notna(row_id_raw_item) else None registros_grupo.append( { "indice": idx_display_item, "label": f"Índice {idx_display_item}", "value_label": str(tooltip_col or ""), "value": valor_texto, "popup_request": ( {"kind": "visualizacao_row", "row_id": row_id_item} if row_id_item is not None else None ), } ) is_grouped = len(rows_grupo) > 1 idx_display = indices_display[0] if indices_display else marker_ordem cor = cores_grupo[0] if cores_grupo else COR_PRINCIPAL raio = max(raios_grupo) if raios_grupo else raio_padrao tooltip_payload = { "title": f"Índice {idx_display}", "label": str(tooltip_col or ""), "value": valores_tooltip[0] if valores_tooltip else "", } row_id_raw = row["__mesa_row_id__"] if "__mesa_row_id__" in row.index else None point_payload = { "lat": float(row[lat_plot_key]), "lon": float(row[lon_plot_key]), "indice": _formatar_indices_badge(indices_display) if is_grouped else idx_display, "row_id": int(row_id_raw) if (not is_grouped and row_id_raw is not None and pd.notna(row_id_raw)) else None, "color": str(cor), "base_radius": float(max(1.0, raio)), } if is_grouped: point_payload.update( { "grouped": True, "count": len(rows_grupo), "stroke_color": "#243746", "stroke_weight": 2.0, "fill_opacity": 0.74, "tooltip_html": _tooltip_html_grupo_mercado( indices_display, label=str(tooltip_col or "") if tooltip_col else None, valores=valores_tooltip, ), "group_title": f"{len(rows_grupo)} dados neste local", "group_items": registros_grupo, } ) else: point_payload["tooltip"] = tooltip_payload market_points.append(point_payload) return { "type": "mesa_leaflet_payload", "version": 1, "center": [centro_lat, centro_lon], "bounds": _resolver_bounds(df_mapa, lat_key, lon_key), "tile_layers": _TILE_LAYERS, "controls": { "fullscreen": True, "measure": True, "layer_control": True, }, "radius_behavior": { "min_radius": 1.6, "max_radius": 52.0, "reference_zoom": 12.0, "growth_factor": 0.20, }, "show_indices": show_indices, "show_bairros": True, "bairros_geojson_url": bairros_geojson_url, "legend": legend, "market_points": market_points, "trabalhos_tecnicos_points": build_trabalhos_tecnicos_marker_payloads(avaliandos_tecnicos), }