Spaces:
Running
Running
| from __future__ import annotations | |
| from html import escape | |
| from typing import Any | |
| import branca.colormap as cm | |
| import numpy as np | |
| import pandas as pd | |
| from scipy.interpolate import griddata | |
| from app.core.elaboracao.charts import ( | |
| _contorno_convexo_lng_lat, | |
| _mascara_dentro_poligono, | |
| _normalizar_stops_cor, | |
| ) | |
| from app.core.map_layers import build_trabalhos_tecnicos_marker_payloads | |
| from app.core.visualizacao.app import COR_PRINCIPAL, formatar_monetario | |
| _LAT_ALIASES = {"lat", "latitude", "siat_latitude"} | |
| _LON_ALIASES = {"lon", "longitude", "long", "siat_longitude"} | |
| _TILE_LAYERS = [ | |
| { | |
| "id": "positron", | |
| "label": "Positron", | |
| "url": "https://{s}.basemaps.cartocdn.com/light_all/{z}/{x}/{y}{r}.png", | |
| "attribution": "© OpenStreetMap contributors © CARTO", | |
| }, | |
| { | |
| "id": "osm", | |
| "label": "OpenStreetMap", | |
| "url": "https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png", | |
| "attribution": "© OpenStreetMap contributors", | |
| }, | |
| ] | |
| def _primeira_serie_por_nome(dataframe: pd.DataFrame, nome_coluna: str) -> pd.Series | None: | |
| matches = [i for i, c in enumerate(dataframe.columns) if str(c) == str(nome_coluna)] | |
| if not matches: | |
| return None | |
| return dataframe.iloc[:, matches[0]] | |
| def _detectar_coluna(df: pd.DataFrame, aliases: set[str]) -> str | None: | |
| for col in df.columns: | |
| if str(col).lower() in aliases: | |
| return str(col) | |
| return None | |
| def _formatar_tooltip_valor(coluna: str | None, valor: Any) -> str: | |
| if valor is None: | |
| return "—" | |
| try: | |
| if pd.isna(valor): | |
| return "—" | |
| except Exception: | |
| pass | |
| col_norm = str(coluna or "").lower() | |
| if isinstance(valor, (int, float, np.integer, np.floating)): | |
| numero = float(valor) | |
| if not np.isfinite(numero): | |
| return "—" | |
| if any(k in col_norm for k in ["valor", "preco", "vu", "vunit"]): | |
| return formatar_monetario(numero) | |
| return f"{numero:,.2f}".replace(",", "X").replace(".", ",").replace("X", ".") | |
| return str(valor) | |
| def _formatar_valor_resumo_mapa(coluna: str | None, valor: Any) -> str: | |
| return _formatar_tooltip_valor(coluna, valor) | |
| def _formatar_indices_tooltip(indices: list[Any]) -> str: | |
| return ", ".join(str(item) for item in indices) | |
| def _formatar_indices_badge(indices: list[Any], limite: int = 28) -> str: | |
| texto = ", ".join(str(item) for item in indices) | |
| if len(texto) <= limite: | |
| return texto | |
| return texto[: max(0, limite - 1)].rstrip(" ,") + "…" | |
| def _tooltip_html_grupo_mercado(indices: list[Any], label: str | None = None, valores: list[str] | None = None) -> str: | |
| total = len(indices) | |
| indices_txt = escape(_formatar_indices_tooltip(indices)) | |
| titulo = f"{total} dados neste local" if total != 1 else "1 dado neste local" | |
| html = ( | |
| "<div style='font-family:\"Segoe UI\",Arial,sans-serif; font-size:14px; line-height:1.7; padding:2px 4px;'>" | |
| f"<b>{escape(titulo)}</b>" | |
| f"<br><span style='color:#555;'>Índices:</span> <b>{indices_txt}</b>" | |
| ) | |
| valores_limpos = [str(item) for item in valores or [] if str(item).strip()] | |
| if label and valores_limpos: | |
| unicos = list(dict.fromkeys(valores_limpos)) | |
| if len(unicos) == 1: | |
| resumo = unicos[0] | |
| else: | |
| resumo = "valores diferentes" | |
| html += f"<br><span style='color:#555;'>{escape(str(label))}:</span> <b>{escape(str(resumo))}</b>" | |
| html += "</div>" | |
| return html | |
| def _resolver_bounds(df_mapa: pd.DataFrame, lat_key: str, lon_key: str) -> list[list[float]]: | |
| df_bounds = df_mapa | |
| if len(df_mapa) >= 8: | |
| lat_vals = df_mapa[lat_key] | |
| lon_vals = df_mapa[lon_key] | |
| lat_med = float(lat_vals.median()) | |
| lon_med = float(lon_vals.median()) | |
| lat_mad = float((lat_vals - lat_med).abs().median()) | |
| lon_mad = float((lon_vals - lon_med).abs().median()) | |
| lat_span = float(lat_vals.max() - lat_vals.min()) | |
| lon_span = float(lon_vals.max() - lon_vals.min()) | |
| lat_scale = max(lat_mad, lat_span / 30.0, 1e-6) | |
| lon_scale = max(lon_mad, lon_span / 30.0, 1e-6) | |
| score = ((lat_vals - lat_med) / lat_scale) ** 2 + ((lon_vals - lon_med) / lon_scale) ** 2 | |
| lim = float(score.quantile(0.75)) | |
| df_core = df_mapa[score <= lim] | |
| if len(df_core) >= max(5, int(len(df_mapa) * 0.45)): | |
| df_bounds = df_core | |
| if len(df_bounds) >= 50: | |
| lat_min, lat_max = df_bounds[lat_key].quantile([0.01, 0.99]).tolist() | |
| lon_min, lon_max = df_bounds[lon_key].quantile([0.01, 0.99]).tolist() | |
| else: | |
| lat_min, lat_max = float(df_bounds[lat_key].min()), float(df_bounds[lat_key].max()) | |
| lon_min, lon_max = float(df_bounds[lon_key].min()), float(df_bounds[lon_key].max()) | |
| if not np.isfinite(lat_min) or not np.isfinite(lat_max): | |
| lat_min, lat_max = float(df_mapa[lat_key].min()), float(df_mapa[lat_key].max()) | |
| if not np.isfinite(lon_min) or not np.isfinite(lon_max): | |
| lon_min, lon_max = float(df_mapa[lon_key].min()), float(df_mapa[lon_key].max()) | |
| if np.isclose(lat_min, lat_max): | |
| lat_min = float(lat_min) - 0.0008 | |
| lat_max = float(lat_max) + 0.0008 | |
| if np.isclose(lon_min, lon_max): | |
| lon_min = float(lon_min) - 0.0008 | |
| lon_max = float(lon_max) + 0.0008 | |
| return [[float(lat_min), float(lon_min)], [float(lat_max), float(lon_max)]] | |
| def _normalizar_bounds_lista(bounds: list[list[float]] | None) -> list[list[float]] | None: | |
| coords = [] | |
| for item in bounds or []: | |
| if not isinstance(item, (list, tuple)) or len(item) < 2: | |
| continue | |
| try: | |
| lat = float(item[0]) | |
| lon = float(item[1]) | |
| except Exception: | |
| continue | |
| if not np.isfinite(lat) or not np.isfinite(lon): | |
| continue | |
| coords.append((lat, lon)) | |
| if not coords: | |
| return None | |
| lat_values = [lat for lat, _ in coords] | |
| lon_values = [lon for _, lon in coords] | |
| lat_min = min(lat_values) | |
| lat_max = max(lat_values) | |
| lon_min = min(lon_values) | |
| lon_max = max(lon_values) | |
| if np.isclose(lat_min, lat_max): | |
| lat_min -= 0.0008 | |
| lat_max += 0.0008 | |
| if np.isclose(lon_min, lon_max): | |
| lon_min -= 0.0008 | |
| lon_max += 0.0008 | |
| return [[float(lat_min), float(lon_min)], [float(lat_max), float(lon_max)]] | |
| def build_leaflet_payload( | |
| *, | |
| bounds: list[list[float]] | None, | |
| center: list[float] | None = None, | |
| legend: dict[str, Any] | None = None, | |
| overlay_layers: list[dict[str, Any]] | None = None, | |
| notice: dict[str, Any] | None = None, | |
| show_bairros: bool = True, | |
| bairros_geojson_url: str = "/api/visualizacao/map/bairros.geojson", | |
| ) -> dict[str, Any] | None: | |
| normalized_bounds = _normalizar_bounds_lista(bounds) | |
| if normalized_bounds is None: | |
| return None | |
| if center and len(center) >= 2: | |
| center_lat = float(center[0]) | |
| center_lon = float(center[1]) | |
| else: | |
| center_lat = float((normalized_bounds[0][0] + normalized_bounds[1][0]) / 2.0) | |
| center_lon = float((normalized_bounds[0][1] + normalized_bounds[1][1]) / 2.0) | |
| final_overlay_layers = list(overlay_layers or []) | |
| if show_bairros: | |
| final_overlay_layers.insert( | |
| 0, | |
| { | |
| "id": "bairros", | |
| "label": "Bairros", | |
| "show": True, | |
| "geojson_url": bairros_geojson_url, | |
| "geojson_pane": "mesa-bairros-pane", | |
| "geojson_style": { | |
| "color": "#4c6882", | |
| "weight": 1.15, | |
| "opacity": 0.88, | |
| "fillColor": "#f39c12", | |
| "fillOpacity": 0.0, | |
| }, | |
| "geojson_tooltip_properties": ["NOME", "BAIRRO", "NME_BAI", "NOME_BAIRRO"], | |
| "geojson_tooltip_label": "Bairro", | |
| }, | |
| ) | |
| return { | |
| "type": "mesa_leaflet_payload", | |
| "version": 1, | |
| "center": [center_lat, center_lon], | |
| "bounds": normalized_bounds, | |
| "tile_layers": _TILE_LAYERS, | |
| "controls": { | |
| "fullscreen": True, | |
| "measure": True, | |
| "layer_control": True, | |
| }, | |
| "radius_behavior": { | |
| "min_radius": 1.6, | |
| "max_radius": 52.0, | |
| "reference_zoom": 12.0, | |
| "growth_factor": 0.20, | |
| }, | |
| "legend": legend, | |
| "notice": notice, | |
| "overlay_layers": final_overlay_layers, | |
| } | |
| def build_elaboracao_map_payload( | |
| df: pd.DataFrame, | |
| *, | |
| lat_col: str = "lat", | |
| lon_col: str = "lon", | |
| cor_col: str | None = None, | |
| indice_destacado: Any = None, | |
| tamanho_col: str | None = None, | |
| modo: str | None = "pontos", | |
| cor_vmin: float | None = None, | |
| cor_vmax: float | None = None, | |
| cor_caption: str | None = None, | |
| cor_colors: list[str] | None = None, | |
| cor_stops: list[float] | None = None, | |
| cor_tick_values: list[float] | None = None, | |
| cor_tick_labels: list[str] | None = None, | |
| bairros_geojson_url: str = "/api/visualizacao/map/bairros.geojson", | |
| popup_source: str | None = "mercado", | |
| ) -> dict[str, Any] | None: | |
| modo_normalizado = str(modo or "pontos").strip().lower() | |
| cols_lower = {str(c).lower(): c for c in df.columns} | |
| lat_real = cols_lower.get(str(lat_col).lower()) if str(lat_col).lower() in cols_lower else None | |
| lon_real = cols_lower.get(str(lon_col).lower()) if str(lon_col).lower() in cols_lower else None | |
| if lat_real is None: | |
| for nome in ["lat", "latitude", "siat_latitude"]: | |
| if nome in cols_lower: | |
| lat_real = cols_lower[nome] | |
| break | |
| if lon_real is None: | |
| for nome in ["lon", "longitude", "long", "siat_longitude"]: | |
| if nome in cols_lower: | |
| lon_real = cols_lower[nome] | |
| break | |
| if lat_real is None or lon_real is None: | |
| return None | |
| row_id_col = "__mesa_row_id__" | |
| df_mapa = df.copy() | |
| if popup_source and row_id_col not in df_mapa.columns: | |
| df_mapa[row_id_col] = np.arange(len(df_mapa), dtype=int) | |
| df_mapa[lat_real] = pd.to_numeric(df_mapa[lat_real], errors="coerce") | |
| df_mapa[lon_real] = pd.to_numeric(df_mapa[lon_real], errors="coerce") | |
| df_mapa = df_mapa.dropna(subset=[lat_real, lon_real]) | |
| df_mapa = df_mapa[~((df_mapa[lat_real] == 0.0) & (df_mapa[lon_real] == 0.0))] | |
| df_mapa = df_mapa[ | |
| (df_mapa[lat_real] >= -90.0) | |
| & (df_mapa[lat_real] <= 90.0) | |
| & (df_mapa[lon_real] >= -180.0) | |
| & (df_mapa[lon_real] <= 180.0) | |
| ].copy() | |
| if df_mapa.empty: | |
| return None | |
| limite_pontos = 2500 | |
| total_pontos = len(df_mapa) | |
| houve_amostragem = total_pontos > limite_pontos | |
| if houve_amostragem: | |
| df_mapa = df_mapa.sample(n=limite_pontos, random_state=42).copy() | |
| centro_lat = float(df_mapa[lat_real].median()) | |
| centro_lon = float(df_mapa[lon_real].median()) | |
| colors = ( | |
| [str(item) for item in cor_colors if str(item).strip()] | |
| if isinstance(cor_colors, list) and len(cor_colors) >= 2 | |
| else ["#2ecc71", "#a8e06c", "#f1c40f", "#e67e22", "#e74c3c"] | |
| ) | |
| modo_calor = modo_normalizado == "calor" and tamanho_col is not None and tamanho_col in df_mapa.columns | |
| modo_superficie = modo_normalizado == "superficie" and tamanho_col is not None and tamanho_col in df_mapa.columns | |
| if modo_normalizado not in {"pontos", "calor", "superficie"}: | |
| modo_normalizado = "pontos" | |
| cor_col_resolvida = cor_col or tamanho_col | |
| colormap = None | |
| legend = None | |
| if cor_col_resolvida and cor_col_resolvida in df_mapa.columns: | |
| serie_cor = pd.to_numeric(df_mapa[cor_col_resolvida], errors="coerce") | |
| vmin = float(cor_vmin) if cor_vmin is not None and np.isfinite(cor_vmin) else float(serie_cor.min()) | |
| vmax = float(cor_vmax) if cor_vmax is not None and np.isfinite(cor_vmax) else float(serie_cor.max()) | |
| if np.isfinite(vmin) and np.isfinite(vmax): | |
| if np.isclose(vmin, vmax): | |
| vmax = float(vmin) + 1.0 | |
| color_index = _normalizar_stops_cor(cor_stops, colors, float(vmin), float(vmax)) | |
| colormap_kwargs: dict[str, Any] = { | |
| "colors": colors, | |
| "vmin": float(vmin), | |
| "vmax": float(vmax), | |
| "caption": str(cor_caption or cor_col_resolvida), | |
| } | |
| if color_index is not None: | |
| colormap_kwargs["index"] = color_index | |
| colormap = cm.LinearColormap(**colormap_kwargs) | |
| legend = { | |
| "title": str(cor_caption or cor_col_resolvida), | |
| "vmin": float(vmin), | |
| "vmax": float(vmax), | |
| "colors": colors, | |
| } | |
| if ( | |
| isinstance(cor_tick_values, list) | |
| and isinstance(cor_tick_labels, list) | |
| and len(cor_tick_values) == len(cor_tick_labels) | |
| and len(cor_tick_values) > 1 | |
| ): | |
| try: | |
| legend["tick_values"] = [float(item) for item in cor_tick_values] | |
| legend["tick_labels"] = [str(item) for item in cor_tick_labels] | |
| except (TypeError, ValueError): | |
| pass | |
| raio_min, raio_max = 3.0, 18.0 | |
| tamanho_func = None | |
| if tamanho_col and tamanho_col in df_mapa.columns: | |
| serie_tamanho = pd.to_numeric(df_mapa[tamanho_col], errors="coerce") | |
| t_min = float(serie_tamanho.min()) | |
| t_max = float(serie_tamanho.max()) | |
| if np.isfinite(t_min) and np.isfinite(t_max): | |
| if t_max > t_min: | |
| tamanho_func = ( | |
| lambda v, _min=t_min, _max=t_max: raio_min | |
| + (v - _min) / (_max - _min) * (raio_max - raio_min) | |
| ) | |
| else: | |
| tamanho_func = lambda _v: (raio_min + raio_max) / 2.0 | |
| mostrar_indices = not modo_calor and not modo_superficie and len(df_mapa) <= 800 | |
| lat_plot_col = "__mesa_lat_plot__" | |
| lon_plot_col = "__mesa_lon_plot__" | |
| if not modo_calor and not modo_superficie: | |
| df_plot_pontos = df_mapa.copy() | |
| df_plot_pontos[lat_plot_col] = df_plot_pontos[lat_real] | |
| df_plot_pontos[lon_plot_col] = df_plot_pontos[lon_real] | |
| else: | |
| df_plot_pontos = df_mapa.copy() | |
| df_plot_pontos[lat_plot_col] = df_plot_pontos[lat_real] | |
| df_plot_pontos[lon_plot_col] = df_plot_pontos[lon_real] | |
| overlay_layers: list[dict[str, Any]] = [] | |
| if modo_calor and tamanho_col and tamanho_col in df_mapa.columns: | |
| pesos = pd.to_numeric(df_mapa[tamanho_col], errors="coerce") | |
| mask_pesos = np.isfinite(pesos.to_numpy()) | |
| df_calor = df_mapa.loc[mask_pesos, [lat_real, lon_real]].copy() | |
| if not df_calor.empty: | |
| pesos_validos = pesos.loc[df_calor.index].to_numpy(dtype=float) | |
| fixed_scale = ( | |
| cor_vmin is not None | |
| and cor_vmax is not None | |
| and np.isfinite(cor_vmin) | |
| and np.isfinite(cor_vmax) | |
| and float(cor_vmax) > float(cor_vmin) | |
| ) | |
| if fixed_scale: | |
| peso_min = float(cor_vmin) | |
| peso_max = float(cor_vmax) | |
| pesos_clip = np.clip(pesos_validos, peso_min, peso_max) | |
| pesos_norm = (pesos_clip - peso_min) / (peso_max - peso_min) | |
| else: | |
| peso_min = float(np.min(pesos_validos)) | |
| peso_max = float(np.max(pesos_validos)) | |
| if np.isfinite(peso_min) and np.isfinite(peso_max) and peso_max > peso_min: | |
| pesos_norm = 0.1 + 0.9 * (pesos_validos - peso_min) / (peso_max - peso_min) | |
| else: | |
| pesos_norm = np.ones_like(pesos_validos) | |
| gradient = None | |
| if len(colors) >= 2: | |
| if ( | |
| cor_vmin is not None | |
| and cor_vmax is not None | |
| and np.isfinite(cor_vmin) | |
| and np.isfinite(cor_vmax) | |
| and float(cor_vmax) > float(cor_vmin) | |
| ): | |
| color_index = _normalizar_stops_cor(cor_stops, colors, float(cor_vmin), float(cor_vmax)) | |
| if color_index is not None: | |
| gradient = {} | |
| for stop, color in zip(color_index, colors): | |
| ratio = (float(stop) - float(cor_vmin)) / (float(cor_vmax) - float(cor_vmin)) | |
| gradient[float(np.clip(ratio, 0.0, 1.0))] = color | |
| elif len(colors) == 2: | |
| gradient = {0.0: colors[0], 1.0: colors[1]} | |
| else: | |
| gradient = {i / (len(colors) - 1): colors[i] for i in range(len(colors))} | |
| elif len(colors) == 2: | |
| gradient = {0.0: colors[0], 1.0: colors[1]} | |
| else: | |
| gradient = {i / (len(colors) - 1): colors[i] for i in range(len(colors))} | |
| overlay_layers.append( | |
| { | |
| "id": "mapa_calor", | |
| "label": "Mapa de calor", | |
| "show": True, | |
| "heatmap": { | |
| "points": [ | |
| { | |
| "lat": float(df_calor.iloc[idx][lat_real]), | |
| "lon": float(df_calor.iloc[idx][lon_real]), | |
| "weight": float(pesos_norm[idx]), | |
| } | |
| for idx in range(len(df_calor)) | |
| ], | |
| "radius": 20, | |
| "blur": 18, | |
| "min_opacity": 0.28, | |
| "max_zoom": 17, | |
| "gradient": gradient, | |
| }, | |
| } | |
| ) | |
| elif modo_superficie and tamanho_col and tamanho_col in df_mapa.columns: | |
| lats = pd.to_numeric(df_mapa[lat_real], errors="coerce").to_numpy(dtype=float) | |
| lons = pd.to_numeric(df_mapa[lon_real], errors="coerce").to_numpy(dtype=float) | |
| valores = pd.to_numeric(df_mapa[tamanho_col], errors="coerce").to_numpy(dtype=float) | |
| mask_valid = np.isfinite(lats) & np.isfinite(lons) & np.isfinite(valores) | |
| if mask_valid.sum() >= 6: | |
| lats = lats[mask_valid] | |
| lons = lons[mask_valid] | |
| valores = valores[mask_valid] | |
| contorno = _contorno_convexo_lng_lat(lons, lats) | |
| if contorno is not None: | |
| lon_min = float(np.min(contorno[:, 0])) | |
| lon_max = float(np.max(contorno[:, 0])) | |
| lat_min = float(np.min(contorno[:, 1])) | |
| lat_max = float(np.max(contorno[:, 1])) | |
| if not np.isclose(lon_min, lon_max) and not np.isclose(lat_min, lat_max): | |
| n_obs = len(valores) | |
| n_grid = 46 if n_obs <= 400 else (40 if n_obs <= 1200 else 34) | |
| grid_lon = np.linspace(lon_min, lon_max, n_grid) | |
| grid_lat = np.linspace(lat_min, lat_max, n_grid) | |
| mesh_lon, mesh_lat = np.meshgrid(grid_lon, grid_lat) | |
| pontos = np.column_stack([lons, lats]) | |
| try: | |
| superficie = griddata(pontos, valores, (mesh_lon, mesh_lat), method="linear") | |
| except Exception: | |
| superficie = None | |
| if superficie is not None: | |
| superficie = np.asarray(superficie, dtype=float) | |
| if np.isnan(superficie).all(): | |
| try: | |
| superficie = griddata(pontos, valores, (mesh_lon, mesh_lat), method="nearest") | |
| except Exception: | |
| superficie = None | |
| elif np.isnan(superficie).any(): | |
| try: | |
| nearest = griddata(pontos, valores, (mesh_lon, mesh_lat), method="nearest") | |
| except Exception: | |
| nearest = None | |
| if nearest is not None: | |
| superficie = np.where(np.isfinite(superficie), superficie, np.asarray(nearest, dtype=float)) | |
| if superficie is not None: | |
| superficie = np.asarray(superficie, dtype=float) | |
| mascara = _mascara_dentro_poligono(mesh_lon, mesh_lat, contorno) | |
| superficie = np.where(mascara, superficie, np.nan) | |
| if np.isfinite(superficie).any(): | |
| vmin_sup = float(cor_vmin) if cor_vmin is not None and np.isfinite(cor_vmin) else float(np.nanmin(superficie)) | |
| vmax_sup = float(cor_vmax) if cor_vmax is not None and np.isfinite(cor_vmax) else float(np.nanmax(superficie)) | |
| if np.isfinite(vmin_sup) and np.isfinite(vmax_sup): | |
| if np.isclose(vmin_sup, vmax_sup): | |
| vmax_sup = vmin_sup + 1.0 | |
| color_index = _normalizar_stops_cor(cor_stops, colors, vmin_sup, vmax_sup) | |
| colormap_kwargs = { | |
| "colors": colors, | |
| "vmin": float(vmin_sup), | |
| "vmax": float(vmax_sup), | |
| "caption": str(cor_caption or f"{tamanho_col} (superfície)"), | |
| } | |
| if color_index is not None: | |
| colormap_kwargs["index"] = color_index | |
| colormap = cm.LinearColormap(**colormap_kwargs) | |
| legend = { | |
| "title": str(cor_caption or f"{tamanho_col} (superfície)"), | |
| "vmin": float(vmin_sup), | |
| "vmax": float(vmax_sup), | |
| "colors": colors, | |
| } | |
| if ( | |
| isinstance(cor_tick_values, list) | |
| and isinstance(cor_tick_labels, list) | |
| and len(cor_tick_values) == len(cor_tick_labels) | |
| and len(cor_tick_values) > 1 | |
| ): | |
| try: | |
| legend["tick_values"] = [float(item) for item in cor_tick_values] | |
| legend["tick_labels"] = [str(item) for item in cor_tick_labels] | |
| except (TypeError, ValueError): | |
| pass | |
| centros_lon = (grid_lon[:-1] + grid_lon[1:]) / 2.0 | |
| centros_lat = (grid_lat[:-1] + grid_lat[1:]) / 2.0 | |
| centro_mesh_lon, centro_mesh_lat = np.meshgrid(centros_lon, centros_lat) | |
| mascara_centros = _mascara_dentro_poligono(centro_mesh_lon, centro_mesh_lat, contorno) | |
| valores_celula = ( | |
| superficie[:-1, :-1] | |
| + superficie[1:, :-1] | |
| + superficie[:-1, 1:] | |
| + superficie[1:, 1:] | |
| ) / 4.0 | |
| shapes = [] | |
| for i in range(valores_celula.shape[0]): | |
| for j in range(valores_celula.shape[1]): | |
| if not mascara_centros[i, j]: | |
| continue | |
| valor = valores_celula[i, j] | |
| if not np.isfinite(valor): | |
| continue | |
| cor = str(colormap(float(valor))) | |
| valor_fmt = f"{float(valor):.3f}".replace(".", ",") | |
| shapes.append( | |
| { | |
| "type": "polygon", | |
| "coords": [ | |
| [float(grid_lat[i]), float(grid_lon[j])], | |
| [float(grid_lat[i]), float(grid_lon[j + 1])], | |
| [float(grid_lat[i + 1]), float(grid_lon[j + 1])], | |
| [float(grid_lat[i + 1]), float(grid_lon[j])], | |
| ], | |
| "color": cor, | |
| "weight": 0, | |
| "fill": True, | |
| "fill_color": cor, | |
| "fill_opacity": 0.6, | |
| "tooltip_html": ( | |
| f"<div style='font-family:\"Segoe UI\",Arial,sans-serif; font-size:13px;'>" | |
| f"{str(tamanho_col)} interpolado: <b>{valor_fmt}</b>" | |
| "</div>" | |
| ), | |
| } | |
| ) | |
| if shapes: | |
| overlay_layers.append( | |
| { | |
| "id": "superficie_continua", | |
| "label": "Superfície contínua", | |
| "show": True, | |
| "shapes": shapes, | |
| } | |
| ) | |
| else: | |
| market_points: list[dict[str, Any]] = [] | |
| indices_markers: list[dict[str, Any]] = [] | |
| grupos_coord = df_plot_pontos.groupby( | |
| [df_plot_pontos[lat_real].round(7), df_plot_pontos[lon_real].round(7)], | |
| sort=False, | |
| ).indices | |
| for marker_ordem, posicoes_raw in enumerate(grupos_coord.values()): | |
| posicoes = list(posicoes_raw) | |
| rows_grupo = [df_plot_pontos.iloc[int(pos)] for pos in posicoes] | |
| row = rows_grupo[0] | |
| idx = row.name | |
| registros_grupo: list[dict[str, Any]] = [] | |
| indices_display: list[Any] = [] | |
| valores_tooltip: list[str] = [] | |
| cores_grupo: list[str] = [] | |
| raios_grupo: list[float] = [] | |
| destaque_grupo = False | |
| for pos in posicoes: | |
| row_item = df_plot_pontos.iloc[int(pos)] | |
| idx_item = row_item.name | |
| idx_display_item = int(idx_item) if isinstance(idx_item, (int, np.integer)) else int(pos) + 1 | |
| indices_display.append(idx_display_item) | |
| valor_texto = "" | |
| if tamanho_col and tamanho_col in df_plot_pontos.columns: | |
| valor_texto = _formatar_valor_resumo_mapa(str(tamanho_col), row_item[tamanho_col]) | |
| valores_tooltip.append(valor_texto) | |
| cor_item = COR_PRINCIPAL | |
| if colormap and cor_col_resolvida and cor_col_resolvida in df_mapa.columns: | |
| valor_cor = pd.to_numeric(pd.Series([row_item.get(cor_col_resolvida)]), errors="coerce").iloc[0] | |
| if pd.notna(valor_cor): | |
| cor_item = str(colormap(float(valor_cor))) | |
| cores_grupo.append(cor_item) | |
| if idx_item == indice_destacado: | |
| raio_item = raio_max + 4.0 | |
| destaque_grupo = True | |
| elif tamanho_func and tamanho_col and tamanho_col in row_item.index and pd.notna(row_item[tamanho_col]): | |
| raio_item = float(tamanho_func(float(row_item[tamanho_col]))) | |
| else: | |
| raio_item = 4.0 | |
| raios_grupo.append(float(max(1.0, raio_item))) | |
| row_id_raw_item = row_item[row_id_col] if popup_source and row_id_col in row_item.index else None | |
| popup_request_item = None | |
| if row_id_raw_item is not None and pd.notna(row_id_raw_item): | |
| popup_request_item = { | |
| "kind": "elaboracao_row", | |
| "row_id": int(row_id_raw_item), | |
| "source": str(popup_source), | |
| } | |
| registros_grupo.append( | |
| { | |
| "indice": idx_display_item, | |
| "label": f"Índice {idx_display_item}", | |
| "value_label": str(tamanho_col or ""), | |
| "value": valor_texto, | |
| "popup_request": popup_request_item, | |
| } | |
| ) | |
| cor = cores_grupo[0] if cores_grupo else COR_PRINCIPAL | |
| raio = max(raios_grupo) if raios_grupo else 4.0 | |
| stroke_weight = 3.0 if destaque_grupo else 1.0 | |
| fill_opacity = 0.8 if destaque_grupo else 0.6 | |
| is_grouped = len(rows_grupo) > 1 | |
| idx_display = indices_display[0] if indices_display else marker_ordem + 1 | |
| tooltip_html = _tooltip_html_grupo_mercado( | |
| indices_display, | |
| label=str(tamanho_col or "") if tamanho_col else None, | |
| valores=valores_tooltip, | |
| ) if is_grouped else ( | |
| "<div style='font-family:\"Segoe UI\",Arial,sans-serif; font-size:14px; line-height:1.7; padding:2px 4px;'>" | |
| f"<b>Índice {idx_display}</b>" | |
| + ( | |
| f"<br><span style='color:#555;'>{escape(str(tamanho_col))}:</span> <b>{escape(valores_tooltip[0])}</b>" | |
| if tamanho_col and valores_tooltip | |
| else "" | |
| ) | |
| + "</div>" | |
| ) | |
| point_payload = { | |
| "lat": float(row[lat_plot_col]), | |
| "lon": float(row[lon_plot_col]), | |
| "indice": _formatar_indices_badge(indices_display) if is_grouped else idx_display, | |
| "color": cor, | |
| "base_radius": float(max(1.0, raio)), | |
| "stroke_color": "#243746" if is_grouped else "#000000", | |
| "stroke_weight": float(max(stroke_weight, 2.0) if is_grouped else stroke_weight), | |
| "fill_opacity": float(0.74 if is_grouped else fill_opacity), | |
| "tooltip_html": tooltip_html, | |
| } | |
| if is_grouped: | |
| point_payload.update( | |
| { | |
| "grouped": True, | |
| "count": len(rows_grupo), | |
| "group_title": f"{len(rows_grupo)} dados neste local", | |
| "group_items": registros_grupo, | |
| } | |
| ) | |
| else: | |
| point_payload["popup_request"] = registros_grupo[0].get("popup_request") if registros_grupo else None | |
| market_points.append(point_payload) | |
| if mostrar_indices: | |
| indices_markers.append( | |
| { | |
| "lat": float(row[lat_plot_col]), | |
| "lon": float(row[lon_plot_col]), | |
| "marker_html": ( | |
| '<div style="transform: translate(10px, -14px);display:inline-block;background: rgba(255, 255, 255, 0.9);' | |
| + 'border: 1px solid rgba(28, 45, 66, 0.45);border-radius: 10px;padding: 1px 6px;font-size: 11px;' | |
| + 'font-weight: 700;line-height: 1.2;color: #1f2f44;white-space: nowrap;box-shadow: 0 1px 2px rgba(0, 0, 0, 0.18);' | |
| + 'pointer-events: none;">' | |
| + escape(_formatar_indices_badge(indices_display)) | |
| + "</div>" | |
| ), | |
| "icon_size": [96 if is_grouped else 72, 24], | |
| "icon_anchor": [0, 0], | |
| "class_name": "mesa-indice-label", | |
| "interactive": False, | |
| "keyboard": False, | |
| } | |
| ) | |
| overlay_layers.append( | |
| { | |
| "id": "mercado", | |
| "label": "Mercado", | |
| "show": True, | |
| "points": market_points, | |
| } | |
| ) | |
| if mostrar_indices and indices_markers: | |
| overlay_layers.append( | |
| { | |
| "id": "indices", | |
| "label": "Índices", | |
| "show": False, | |
| "markers": indices_markers, | |
| } | |
| ) | |
| notice = None | |
| if houve_amostragem: | |
| notice = { | |
| "message": f"Exibindo {len(df_mapa)} de {total_pontos} pontos para melhor desempenho.", | |
| "position": "topright", | |
| } | |
| return build_leaflet_payload( | |
| bounds=_resolver_bounds(df_mapa, str(lat_real), str(lon_real)), | |
| center=[centro_lat, centro_lon], | |
| legend=legend, | |
| notice=notice, | |
| overlay_layers=overlay_layers, | |
| show_bairros=True, | |
| bairros_geojson_url=bairros_geojson_url, | |
| ) | |
| def build_visualizacao_map_payload( | |
| df: pd.DataFrame, | |
| *, | |
| cor_col: str | None = None, | |
| tamanho_col: str | None = None, | |
| col_y: str | None = None, | |
| avaliandos_tecnicos: list[dict[str, Any]] | None = None, | |
| bairros_geojson_url: str = "/api/visualizacao/map/bairros.geojson", | |
| ) -> dict[str, Any] | None: | |
| lat_real = _detectar_coluna(df, _LAT_ALIASES) | |
| lon_real = _detectar_coluna(df, _LON_ALIASES) | |
| if lat_real is None or lon_real is None: | |
| return None | |
| df_mapa = df.copy() | |
| lat_key = "__mesa_lat__" | |
| lon_key = "__mesa_lon__" | |
| lat_serie = _primeira_serie_por_nome(df_mapa, lat_real) | |
| lon_serie = _primeira_serie_por_nome(df_mapa, lon_real) | |
| if lat_serie is None or lon_serie is None: | |
| return None | |
| df_mapa[lat_key] = pd.to_numeric(lat_serie, errors="coerce") | |
| df_mapa[lon_key] = pd.to_numeric(lon_serie, errors="coerce") | |
| df_mapa = df_mapa.dropna(subset=[lat_key, lon_key]) | |
| df_mapa = df_mapa[ | |
| (df_mapa[lat_key] >= -90.0) | |
| & (df_mapa[lat_key] <= 90.0) | |
| & (df_mapa[lon_key] >= -180.0) | |
| & (df_mapa[lon_key] <= 180.0) | |
| ].copy() | |
| if df_mapa.empty: | |
| return None | |
| centro_lat = float(df_mapa[lat_key].median()) | |
| centro_lon = float(df_mapa[lon_key].median()) | |
| cor_col_resolvida = cor_col | |
| if tamanho_col and tamanho_col != "Visualização Padrão" and not cor_col_resolvida: | |
| cor_col_resolvida = tamanho_col | |
| colormap = None | |
| cor_key = None | |
| legend = None | |
| if cor_col_resolvida and cor_col_resolvida in df_mapa.columns: | |
| serie_cor = _primeira_serie_por_nome(df_mapa, cor_col_resolvida) | |
| if serie_cor is not None: | |
| cor_key = "__mesa_cor__" | |
| df_mapa[cor_key] = pd.to_numeric(serie_cor, errors="coerce") | |
| vmin = df_mapa[cor_key].min() | |
| vmax = df_mapa[cor_key].max() | |
| if pd.notna(vmin) and pd.notna(vmax): | |
| colormap = cm.LinearColormap( | |
| colors=["#2ecc71", "#a8e06c", "#f1c40f", "#e67e22", "#e74c3c"], | |
| vmin=vmin, | |
| vmax=vmax, | |
| caption=cor_col_resolvida, | |
| ) | |
| legend = { | |
| "title": str(cor_col_resolvida), | |
| "vmin": float(vmin), | |
| "vmax": float(vmax), | |
| "colors": ["#2ecc71", "#a8e06c", "#f1c40f", "#e67e22", "#e74c3c"], | |
| } | |
| raio_min, raio_max = 3.0, 18.0 | |
| tamanho_func = None | |
| tamanho_key = None | |
| if tamanho_col and tamanho_col != "Visualização Padrão" and tamanho_col in df_mapa.columns: | |
| serie_tamanho = _primeira_serie_por_nome(df_mapa, tamanho_col) | |
| if serie_tamanho is not None: | |
| tamanho_key = "__mesa_tamanho__" | |
| df_mapa[tamanho_key] = pd.to_numeric(serie_tamanho, errors="coerce") | |
| t_min = df_mapa[tamanho_key].min() | |
| t_max = df_mapa[tamanho_key].max() | |
| if pd.notna(t_min) and pd.notna(t_max): | |
| if t_max > t_min: | |
| tamanho_func = ( | |
| lambda v, _min=t_min, _max=t_max: raio_min | |
| + (v - _min) / (_max - _min) * (raio_max - raio_min) | |
| ) | |
| else: | |
| tamanho_func = lambda v: (raio_min + raio_max) / 2.0 | |
| show_indices = False | |
| lat_plot_key = "__mesa_lat_plot__" | |
| lon_plot_key = "__mesa_lon_plot__" | |
| df_plot_pontos = df_mapa.copy() | |
| df_plot_pontos[lat_plot_key] = df_plot_pontos[lat_key] | |
| df_plot_pontos[lon_plot_key] = df_plot_pontos[lon_key] | |
| tooltip_col = None | |
| tooltip_key = None | |
| if tamanho_key: | |
| tooltip_col = tamanho_col | |
| tooltip_key = tamanho_key | |
| elif col_y and col_y in df_mapa.columns: | |
| serie_tooltip = _primeira_serie_por_nome(df_mapa, col_y) | |
| if serie_tooltip is not None: | |
| tooltip_col = col_y | |
| tooltip_key = "__mesa_tooltip__" | |
| df_mapa[tooltip_key] = serie_tooltip | |
| df_plot_pontos[tooltip_key] = df_mapa.loc[df_plot_pontos.index, tooltip_key] | |
| total_pontos_plot = len(df_plot_pontos) | |
| raio_padrao = 4.0 if total_pontos_plot <= 2500 else 3.0 | |
| market_points: list[dict[str, Any]] = [] | |
| grupos_coord = df_plot_pontos.groupby( | |
| [df_plot_pontos[lat_key].round(7), df_plot_pontos[lon_key].round(7)], | |
| sort=False, | |
| ).indices | |
| for marker_ordem, posicoes_raw in enumerate(grupos_coord.values()): | |
| posicoes = list(posicoes_raw) | |
| rows_grupo = [df_plot_pontos.iloc[int(pos)] for pos in posicoes] | |
| row = rows_grupo[0] | |
| indices_display: list[Any] = [] | |
| registros_grupo: list[dict[str, Any]] = [] | |
| valores_tooltip: list[str] = [] | |
| cores_grupo: list[str] = [] | |
| raios_grupo: list[float] = [] | |
| for pos in posicoes: | |
| row_item = df_plot_pontos.iloc[int(pos)] | |
| idx_item = row_item.name | |
| idx_display_item = int(row_item["index"]) if "index" in row_item.index else int(idx_item) | |
| indices_display.append(idx_display_item) | |
| valor_texto = ( | |
| _formatar_tooltip_valor(str(tooltip_col or ""), row_item[tooltip_key]) | |
| if tooltip_col and tooltip_key and tooltip_key in row_item.index | |
| else "" | |
| ) | |
| if valor_texto: | |
| valores_tooltip.append(valor_texto) | |
| cor_item = colormap(row_item[cor_key]) if colormap and cor_key and pd.notna(row_item[cor_key]) else COR_PRINCIPAL | |
| cores_grupo.append(str(cor_item)) | |
| if tamanho_func and tamanho_key and pd.notna(row_item[tamanho_key]): | |
| raio_item = float(tamanho_func(row_item[tamanho_key])) | |
| else: | |
| raio_item = raio_padrao | |
| raios_grupo.append(float(max(1.0, raio_item))) | |
| row_id_raw_item = row_item["__mesa_row_id__"] if "__mesa_row_id__" in row_item.index else None | |
| row_id_item = int(row_id_raw_item) if row_id_raw_item is not None and pd.notna(row_id_raw_item) else None | |
| registros_grupo.append( | |
| { | |
| "indice": idx_display_item, | |
| "label": f"Índice {idx_display_item}", | |
| "value_label": str(tooltip_col or ""), | |
| "value": valor_texto, | |
| "popup_request": ( | |
| {"kind": "visualizacao_row", "row_id": row_id_item} | |
| if row_id_item is not None | |
| else None | |
| ), | |
| } | |
| ) | |
| is_grouped = len(rows_grupo) > 1 | |
| idx_display = indices_display[0] if indices_display else marker_ordem | |
| cor = cores_grupo[0] if cores_grupo else COR_PRINCIPAL | |
| raio = max(raios_grupo) if raios_grupo else raio_padrao | |
| tooltip_payload = { | |
| "title": f"Índice {idx_display}", | |
| "label": str(tooltip_col or ""), | |
| "value": valores_tooltip[0] if valores_tooltip else "", | |
| } | |
| row_id_raw = row["__mesa_row_id__"] if "__mesa_row_id__" in row.index else None | |
| point_payload = { | |
| "lat": float(row[lat_plot_key]), | |
| "lon": float(row[lon_plot_key]), | |
| "indice": _formatar_indices_badge(indices_display) if is_grouped else idx_display, | |
| "row_id": int(row_id_raw) if (not is_grouped and row_id_raw is not None and pd.notna(row_id_raw)) else None, | |
| "color": str(cor), | |
| "base_radius": float(max(1.0, raio)), | |
| } | |
| if is_grouped: | |
| point_payload.update( | |
| { | |
| "grouped": True, | |
| "count": len(rows_grupo), | |
| "stroke_color": "#243746", | |
| "stroke_weight": 2.0, | |
| "fill_opacity": 0.74, | |
| "tooltip_html": _tooltip_html_grupo_mercado( | |
| indices_display, | |
| label=str(tooltip_col or "") if tooltip_col else None, | |
| valores=valores_tooltip, | |
| ), | |
| "group_title": f"{len(rows_grupo)} dados neste local", | |
| "group_items": registros_grupo, | |
| } | |
| ) | |
| else: | |
| point_payload["tooltip"] = tooltip_payload | |
| market_points.append(point_payload) | |
| return { | |
| "type": "mesa_leaflet_payload", | |
| "version": 1, | |
| "center": [centro_lat, centro_lon], | |
| "bounds": _resolver_bounds(df_mapa, lat_key, lon_key), | |
| "tile_layers": _TILE_LAYERS, | |
| "controls": { | |
| "fullscreen": True, | |
| "measure": True, | |
| "layer_control": True, | |
| }, | |
| "radius_behavior": { | |
| "min_radius": 1.6, | |
| "max_radius": 52.0, | |
| "reference_zoom": 12.0, | |
| "growth_factor": 0.20, | |
| }, | |
| "show_indices": show_indices, | |
| "show_bairros": True, | |
| "bairros_geojson_url": bairros_geojson_url, | |
| "legend": legend, | |
| "market_points": market_points, | |
| "trabalhos_tecnicos_points": build_trabalhos_tecnicos_marker_payloads(avaliandos_tecnicos), | |
| } | |