from __future__ import annotations from pathlib import Path import matplotlib import numpy as np from matplotlib import colors matplotlib.use("Agg") import matplotlib.pyplot as plt DEFAULT_PANEL_SIZE_MULTIPLIER = 1.5 DEFAULT_BILLBOARD_ALPHA = 1.0 DEFAULT_PANEL_SCALE = 0.12 DEFAULT_DAY_STEP = 1 DEFAULT_DIVERGING_CMAP = colors.LinearSegmentedColormap.from_list( "zone_load_diverging", ["#82B0D2", "#FFFFFF", "#FA7F6F"], N=256, ) # Keep color mapping consistent with cuger/__analyse/visualise.py. TYPE_COLORS = { "window": "#FFBE7A", "shading": "#999999", "floor": "#82B0D2", "wall": "#8ECFC9", "airwall": "#E7DAD2", "space": "#FA7F6F", "void": "#FFFFFF", None: "#FFFFFF", } def _axis_limits_from_points(points: np.ndarray) -> tuple[tuple[float, float], tuple[float, float], tuple[float, float]]: x_min, x_max = float(np.min(points[:, 0])), float(np.max(points[:, 0])) y_min, y_max = float(np.min(points[:, 1])), float(np.max(points[:, 1])) z_min, z_max = float(np.min(points[:, 2])), float(np.max(points[:, 2])) max_range = max(x_max - x_min, y_max - y_min, z_max - z_min) / 2.0 max_range = max(max_range, 1e-6) * 1.08 x_mid = (x_max + x_min) / 2.0 y_mid = (y_max + y_min) / 2.0 z_mid = (z_max + z_min) / 2.0 return ( (x_mid - max_range, x_mid + max_range), (y_mid - max_range, y_mid + max_range), (z_mid - max_range, z_mid + max_range), ) def _first_existing(data: dict[str, np.ndarray], keys: list[str]) -> np.ndarray | None: for k in keys: if k in data: return np.asarray(data[k]) return None def _as_2d_points(arr: np.ndarray | None) -> np.ndarray: if arr is None: return np.zeros((0, 3), dtype=float) pts = np.asarray(arr, dtype=float) if pts.ndim != 2 or pts.shape[1] < 3: return np.zeros((0, 3), dtype=float) return pts[:, :3] def _camera_basis(elev: float, azim: float) -> tuple[np.ndarray, np.ndarray]: az = np.deg2rad(float(azim)) el = np.deg2rad(float(elev)) forward = np.array( [ np.cos(el) * np.cos(az), np.cos(el) * np.sin(az), np.sin(el), ], dtype=float, ) forward_norm = np.linalg.norm(forward) if forward_norm < 1e-9: forward = np.array([1.0, 0.0, 0.0], dtype=float) else: forward = forward / forward_norm world_up = np.array([0.0, 0.0, 1.0], dtype=float) right = np.cross(forward, world_up) right_norm = np.linalg.norm(right) if right_norm < 1e-9: right = np.array([1.0, 0.0, 0.0], dtype=float) else: right = right / right_norm up = np.cross(right, forward) up_norm = np.linalg.norm(up) if up_norm < 1e-9: up = np.array([0.0, 1.0, 0.0], dtype=float) else: up = up / up_norm return right, up def _isometric_panel_basis(elev: float, azim: float) -> tuple[np.ndarray, np.ndarray]: cam_right, cam_up = _camera_basis(elev=elev, azim=azim) c30 = np.cos(np.deg2rad(30.0)) s30 = np.sin(np.deg2rad(30.0)) u = c30 * cam_right + s30 * cam_up v = -c30 * cam_right + s30 * cam_up u = u / max(np.linalg.norm(u), 1e-9) v = v / max(np.linalg.norm(v), 1e-9) return u, v def _to_hourly_zone(values: np.ndarray) -> np.ndarray: arr = np.asarray(values) if arr.ndim != 2: raise ValueError(f"`values` must be 2D, got shape={arr.shape}") if arr.shape[0] == 8760: return arr.astype(np.float32) if arr.shape[1] == 8760: return arr.T.astype(np.float32) raise ValueError(f"Neither axis equals 8760, shape={arr.shape}") def _try_parse_space_indices(raw_values: np.ndarray | None, zone_count: int, space_count: int) -> np.ndarray | None: if raw_values is None: return None vals = np.asarray(raw_values).reshape(-1) if vals.size < zone_count: return None out: list[int] = [] for i in range(zone_count): v = vals[i] idx: int | None = None if isinstance(v, (int, np.integer)): idx = int(v) elif isinstance(v, (float, np.floating)): vf = float(v) if np.isfinite(vf) and float(vf).is_integer(): idx = int(vf) else: text = v.decode("utf-8", errors="ignore") if isinstance(v, (bytes, np.bytes_)) else str(v) text = text.strip() try: idx = int(text) except ValueError: return None if idx is None or not (0 <= idx < space_count): return None out.append(idx) return np.asarray(out, dtype=np.int64) def _zone_day_hour_matrix(zone_hourly: np.ndarray, day_step: int = 1) -> np.ndarray: series = np.asarray(zone_hourly, dtype=np.float32).reshape(-1) if series.size < 8760: raise ValueError(f"Zone series length must be >= 8760, got {series.size}") day_hour = series[:8760].reshape(365, 24) step = max(1, int(day_step)) if step == 1: return day_hour rows: list[np.ndarray] = [] for s in range(0, 365, step): e = min(365, s + step) rows.append(np.mean(day_hour[s:e, :], axis=0)) return np.asarray(rows, dtype=np.float32) def _decode_types(type_arr: np.ndarray | None, expected_count: int, fallback: str) -> list[str]: if type_arr is None: return [fallback] * expected_count raw = np.asarray(type_arr).reshape(-1) out: list[str] = [] for val in raw[:expected_count]: if isinstance(val, (bytes, np.bytes_)): text = val.decode("utf-8", errors="ignore").strip().lower() else: text = str(val).strip().lower() if text in TYPE_COLORS: out.append(text) continue # Numeric fallback for compact encodings. try: num = float(text) if np.isfinite(num): if num <= 0: out.append("wall") else: out.append("window") continue except ValueError: pass out.append(fallback) if len(out) < expected_count: out.extend([fallback] * (expected_count - len(out))) return out def _face_colors_from_binary_t(face_feats: np.ndarray | None, face_count: int) -> list[str] | None: """Use binary t from face_feats last column when available. Rule requested by user: t == 0 -> yellow(window color). """ if face_feats is None: return None feats = np.asarray(face_feats, dtype=float) if feats.ndim != 2 or feats.shape[1] < 1 or feats.shape[0] < face_count: return None t_col = np.rint(feats[:face_count, -1]).astype(np.int32) if not np.all(np.isin(t_col, [0, 1])): return None out: list[str] = [] for t_val in t_col: if t_val == 0: out.append(TYPE_COLORS["window"]) else: out.append(TYPE_COLORS["wall"]) return out def _plot_edges(ax, starts: np.ndarray, ends: np.ndarray, color: str, linewidth: float, linestyle: str, alpha: float) -> None: for p0, p1 in zip(starts, ends): ax.plot( [p0[0], p1[0]], [p0[1], p1[1]], [p0[2], p1[2]], color=color, linewidth=linewidth, linestyle=linestyle, alpha=alpha, ) def _infer_space_count(graph_data: dict[str, np.ndarray], sf_edges: np.ndarray | None) -> int: valid_spaces = graph_data.get("valid_energy_spaces") if valid_spaces is not None: size = int(np.asarray(valid_spaces).reshape(-1).size) if size > 0: return size space_feats = _first_existing(graph_data, ["space_feats", "space_c", "space_centers"]) if space_feats is not None: feats = np.asarray(space_feats) if feats.ndim >= 1 and feats.shape[0] > 0: return int(feats.shape[0]) if sf_edges is not None: edges = np.asarray(sf_edges, dtype=np.int64) if edges.ndim == 2 and edges.shape[1] >= 2 and edges.shape[0] > 0: c0_max = int(np.max(edges[:, 0])) c1_max = int(np.max(edges[:, 1])) return max(c0_max, c1_max) + 1 return 0 def _extract_face_space_pairs(sf_edges: np.ndarray | None, n_faces: int, n_spaces: int) -> list[tuple[int, int]]: if sf_edges is None or n_faces <= 0 or n_spaces <= 0: return [] edges = np.asarray(sf_edges, dtype=np.int64) if edges.ndim != 2 or edges.shape[1] < 2: return [] # PACK building npz uses [face_idx, space_idx] in sf_edges. c0_max = int(np.max(edges[:, 0])) if edges.shape[0] > 0 else -1 c1_max = int(np.max(edges[:, 1])) if edges.shape[0] > 0 else -1 face_space_ok = c0_max < n_faces and c1_max < n_spaces space_face_ok = c1_max < n_faces and c0_max < n_spaces use_face_space = True if face_space_ok and not space_face_ok: use_face_space = True elif space_face_ok and not face_space_ok: use_face_space = False pairs: list[tuple[int, int]] = [] for e in edges: a, b = int(e[0]), int(e[1]) if use_face_space: f_idx, s_idx = a, b else: f_idx, s_idx = b, a if 0 <= f_idx < n_faces and 0 <= s_idx < n_spaces: pairs.append((f_idx, s_idx)) return pairs def _infer_space_centers_from_edges_indexed(face_centers: np.ndarray, sf_edges: np.ndarray | None, n_spaces: int) -> np.ndarray: if len(face_centers) == 0 or n_spaces <= 0: return np.zeros((0, 3), dtype=float) pairs = _extract_face_space_pairs(sf_edges, n_faces=len(face_centers), n_spaces=n_spaces) if not pairs: return np.full((n_spaces, 3), np.nan, dtype=float) buckets: list[list[np.ndarray]] = [[] for _ in range(n_spaces)] for f_idx, s_idx in pairs: buckets[s_idx].append(face_centers[f_idx]) centers = np.full((n_spaces, 3), np.nan, dtype=float) for s_idx, pts in enumerate(buckets): if pts: centers[s_idx] = np.mean(np.asarray(pts, dtype=float), axis=0) return centers def _resolve_space_layout( face_centers: np.ndarray, graph_data: dict[str, np.ndarray], ) -> tuple[np.ndarray, list[tuple[int, int]], int, dict[int, int]]: sf_edges_raw = _first_existing(graph_data, ["sf_edges", "face_space_edges"]) sf_edges = np.asarray(sf_edges_raw, dtype=np.int64) if sf_edges_raw is not None else np.zeros((0, 2), dtype=np.int64) explicit_space_centers = _as_2d_points(_first_existing(graph_data, ["space_c", "space_centers"])) if len(explicit_space_centers) > 0: raw_space_count = int(explicit_space_centers.shape[0]) raw_to_compact = {int(i): int(i) for i in range(raw_space_count)} space_centers = explicit_space_centers else: raw_space_count = _infer_space_count(graph_data, sf_edges) raw_space_centers = _infer_space_centers_from_edges_indexed(face_centers, sf_edges, n_spaces=raw_space_count) if len(raw_space_centers) == 0: return np.zeros((0, 3), dtype=float), [], raw_space_count, {} valid_mask = np.isfinite(raw_space_centers).all(axis=1) valid_raw_idx = np.where(valid_mask)[0].astype(np.int64) raw_to_compact = {int(raw_idx): int(compact_idx) for compact_idx, raw_idx in enumerate(valid_raw_idx.tolist())} space_centers = raw_space_centers[valid_mask] pairs_raw = _extract_face_space_pairs(sf_edges, n_faces=len(face_centers), n_spaces=raw_space_count) pairs_compact = [(f_idx, raw_to_compact[s_idx]) for f_idx, s_idx in pairs_raw if s_idx in raw_to_compact] return space_centers, pairs_compact, raw_space_count, raw_to_compact def _plot_generic_graph(ax, graph_data: dict[str, np.ndarray]) -> bool: centers = _first_existing(graph_data, ["c", "center", "centers", "node_c", "node_centers", "face_c"]) points = _as_2d_points(centers) if len(points) == 0: return False type_arr = _first_existing(graph_data, ["t", "type", "node_t", "node_type", "types"]) node_types = _decode_types(type_arr, expected_count=len(points), fallback="wall") node_colors = [TYPE_COLORS.get(t, "#8ECFC9") for t in node_types] ax.scatter(points[:, 0], points[:, 1], points[:, 2], c=node_colors, s=30, edgecolors="k", alpha=0.95) edge_arr = _first_existing(graph_data, ["edges", "edge_index", "ff_edges"]) if edge_arr is not None: edges = np.asarray(edge_arr, dtype=np.int64) if edges.ndim == 2 and edges.shape[1] >= 2: valid = (edges[:, 0] >= 0) & (edges[:, 1] >= 0) & (edges[:, 0] < len(points)) & (edges[:, 1] < len(points)) valid_edges = edges[valid] if len(valid_edges) > 0: starts = points[valid_edges[:, 0]] ends = points[valid_edges[:, 1]] _plot_edges(ax, starts, ends, color="#666666", linewidth=1.0, linestyle="-", alpha=0.45) x_lim, y_lim, z_lim = _axis_limits_from_points(points) ax.set_xlim(*x_lim) ax.set_ylim(*y_lim) ax.set_zlim(*z_lim) return True def _plot_pack_graph(ax, graph_data: dict[str, np.ndarray], geometry_npz: Path | None) -> bool: face_centers = _as_2d_points(_first_existing(graph_data, ["face_c"])) if len(face_centers) == 0 and geometry_npz is not None: with np.load(geometry_npz, allow_pickle=True) as g_data: if "face_c" in g_data: face_centers = _as_2d_points(np.asarray(g_data["face_c"])) space_centers, pairs_compact, _, _ = _resolve_space_layout(face_centers, graph_data) if len(face_centers) == 0 and len(space_centers) == 0: return False face_types = _decode_types(_first_existing(graph_data, ["face_t", "face_type", "t", "type"]), len(face_centers), "wall") face_colors = [TYPE_COLORS.get(t, "#8ECFC9") for t in face_types] binary_colors = _face_colors_from_binary_t(_first_existing(graph_data, ["face_feats"]), len(face_centers)) if binary_colors is not None: face_colors = binary_colors if len(face_centers) > 0: ax.scatter(face_centers[:, 0], face_centers[:, 1], face_centers[:, 2], c=face_colors, s=28, marker="D", edgecolors="k", alpha=0.9) if len(space_centers) > 0: ax.scatter(space_centers[:, 0], space_centers[:, 1], space_centers[:, 2], c=TYPE_COLORS["space"], s=70, edgecolors="k", alpha=0.95) ff_edges = _first_existing(graph_data, ["ff_edges", "face_edges"]) if ff_edges is not None and len(face_centers) > 0: edges = np.asarray(ff_edges, dtype=np.int64) if edges.ndim == 2 and edges.shape[1] >= 2: valid = (edges[:, 0] >= 0) & (edges[:, 1] >= 0) & (edges[:, 0] < len(face_centers)) & (edges[:, 1] < len(face_centers)) edge_ok = edges[valid] if len(edge_ok) > 0: _plot_edges( ax, face_centers[edge_ok[:, 0]], face_centers[edge_ok[:, 1]], color="#999999", linewidth=1.2, linestyle="-", alpha=0.35, ) if len(face_centers) > 0 and len(space_centers) > 0 and pairs_compact: starts = np.asarray([face_centers[f_idx] for f_idx, _ in pairs_compact], dtype=float) ends = np.asarray([space_centers[s_idx] for _, s_idx in pairs_compact], dtype=float) _plot_edges( ax, starts, ends, color="#555555", linewidth=1.8, linestyle="--", alpha=0.9, ) all_points = face_centers if len(space_centers) == 0 else np.vstack([face_centers, space_centers]) x_lim, y_lim, z_lim = _axis_limits_from_points(all_points) ax.set_xlim(*x_lim) ax.set_ylim(*y_lim) ax.set_zlim(*z_lim) return True def _plot_pack_graph_overlay( ax, graph_data: dict[str, np.ndarray], energy_data: dict[str, np.ndarray], geometry_npz: Path | None, *, elev: float, azim: float, panel_scale: float, day_step: int, ) -> bool: face_centers = _as_2d_points(_first_existing(graph_data, ["face_c"])) if len(face_centers) == 0 and geometry_npz is not None: with np.load(geometry_npz, allow_pickle=True) as g_data: if "face_c" in g_data: face_centers = _as_2d_points(np.asarray(g_data["face_c"])) if len(face_centers) == 0: return False if "values" not in energy_data: return False space_centers, pairs_compact, raw_space_count, raw_to_compact = _resolve_space_layout(face_centers, graph_data) if len(space_centers) == 0: return False hourly_zone = _to_hourly_zone(np.asarray(energy_data["values"], dtype=np.float32)) zone_count = int(hourly_zone.shape[1]) valid_energy_spaces_raw = graph_data.get("valid_energy_spaces") valid_energy_spaces = _try_parse_space_indices(valid_energy_spaces_raw, zone_count=zone_count, space_count=raw_space_count) zone_to_space: list[tuple[int, int]] = [] if valid_energy_spaces is not None and valid_energy_spaces.size >= zone_count: for z_idx in range(zone_count): raw_s_idx = int(valid_energy_spaces[z_idx]) compact_s_idx = raw_to_compact.get(raw_s_idx) if compact_s_idx is not None: zone_to_space.append((z_idx, compact_s_idx)) else: for z_idx in range(min(zone_count, raw_space_count)): compact_s_idx = raw_to_compact.get(z_idx) if compact_s_idx is not None: zone_to_space.append((z_idx, compact_s_idx)) if not zone_to_space: fallback_n = min(zone_count, len(space_centers)) zone_to_space = [(z_idx, z_idx) for z_idx in range(fallback_n)] if not zone_to_space: return False if hasattr(ax, "computed_zorder"): ax.computed_zorder = True ax.scatter(face_centers[:, 0], face_centers[:, 1], face_centers[:, 2], c="#A8A8A8", s=8, alpha=0.24, zorder=2) ax.scatter(space_centers[:, 0], space_centers[:, 1], space_centers[:, 2], c="#6E6E6E", s=14, alpha=0.5, zorder=3) if pairs_compact: starts = np.asarray([space_centers[s_idx] for _, s_idx in pairs_compact], dtype=float) ends = np.asarray([face_centers[f_idx] for f_idx, _ in pairs_compact], dtype=float) _plot_edges( ax, starts, ends, color="#777777", linewidth=0.8, linestyle="--", alpha=0.3, ) all_points = np.vstack([face_centers, space_centers]) x_lim, y_lim, z_lim = _axis_limits_from_points(all_points) span = max(x_lim[1] - x_lim[0], y_lim[1] - y_lim[0], z_lim[1] - z_lim[0]) panel_w = max(span * float(panel_scale) * DEFAULT_PANEL_SIZE_MULTIPLIER, 1e-4) panel_h = panel_w * 0.75 right, up = _isometric_panel_basis(elev=elev, azim=azim) for z_idx, s_idx in zone_to_space: center = space_centers[s_idx] mat = _zone_day_hour_matrix(hourly_zone[:, z_idx], day_step=day_step) zmin = float(np.min(mat)) zmax = float(np.max(mat)) if zmin < 0.0 < zmax: zone_norm = colors.TwoSlopeNorm(vmin=zmin, vcenter=0.0, vmax=zmax) elif abs(zmax - zmin) < 1e-12: zone_norm = colors.Normalize(vmin=zmin - 1.0, vmax=zmax + 1.0) else: zone_norm = colors.Normalize(vmin=zmin, vmax=zmax) facecolors_rgba = DEFAULT_DIVERGING_CMAP(zone_norm(mat)) n_rows, n_cols = mat.shape u = np.linspace(-0.5, 0.5, n_cols, dtype=float) * panel_w v = np.linspace(-0.5, 0.5, n_rows, dtype=float) * panel_h uu, vv = np.meshgrid(u, v) x = center[0] + right[0] * uu + up[0] * vv y = center[1] + right[1] * uu + up[1] * vv z = center[2] + right[2] * uu + up[2] * vv surf = ax.plot_surface( x, y, z, facecolors=facecolors_rgba, shade=False, linewidth=0.0, antialiased=False, alpha=DEFAULT_BILLBOARD_ALPHA, ) if hasattr(surf, "set_zsort"): surf.set_zsort("average") c1 = center - 0.5 * panel_w * right - 0.5 * panel_h * up c2 = center + 0.5 * panel_w * right - 0.5 * panel_h * up c3 = center + 0.5 * panel_w * right + 0.5 * panel_h * up c4 = center - 0.5 * panel_w * right + 0.5 * panel_h * up border = np.asarray([c1, c2, c3, c4, c1], dtype=float) ax.plot( border[:, 0], border[:, 1], border[:, 2], color="#000000", linewidth=0.85, alpha=1.0, ) ax.set_xlim(*x_lim) ax.set_ylim(*y_lim) ax.set_zlim(*z_lim) return True def _looks_like_pack_graph(graph_data: dict[str, np.ndarray]) -> bool: pack_keys = {"face_c", "sf_edges", "face_space_edges", "space_c", "space_centers", "face_feats", "ff_edges"} return any(k in graph_data for k in pack_keys) def visualize_graph( graph_npz: str | Path, output_png: str | Path, *, geometry_npz: str | Path | None = None, elev: float = 35.0, azim: float = 15.0, dpi: int = 300, ) -> Path: """Render graph nodes/edges from graph npz using c/t/type conventions when available.""" graph_npz = Path(graph_npz) output_png = Path(output_png) geometry_path = Path(geometry_npz) if geometry_npz is not None else None with np.load(graph_npz, allow_pickle=True) as data: graph_data = {k: np.asarray(data[k]) for k in data.files} fig = plt.figure(figsize=(4, 4)) ax = fig.add_subplot(111, projection="3d") ax.view_init(elev=elev, azim=azim) fig.subplots_adjust(left=0, right=1, top=1, bottom=0) if _looks_like_pack_graph(graph_data): ok = _plot_pack_graph(ax, graph_data, geometry_npz=geometry_path) if not ok: ok = _plot_generic_graph(ax, graph_data) else: ok = _plot_generic_graph(ax, graph_data) if not ok: ok = _plot_pack_graph(ax, graph_data, geometry_npz=geometry_path) if not ok: keys = ", ".join(sorted(graph_data.keys())) plt.close(fig) raise ValueError(f"Cannot parse graph centers/edges from {graph_npz}; keys=[{keys}]") ax.set_box_aspect([1, 1, 1]) ax.set_axis_off() output_png.parent.mkdir(parents=True, exist_ok=True) fig.savefig(output_png, dpi=dpi, bbox_inches="tight", pad_inches=0.04, transparent=True) plt.close(fig) return output_png def visualize_graph_overlap( graph_npz: str | Path, energy_npz: str | Path, output_png: str | Path, *, geometry_npz: str | Path | None = None, elev: float = 35.0, azim: float = 15.0, panel_scale: float = DEFAULT_PANEL_SCALE, day_step: int = DEFAULT_DAY_STEP, dpi: int = 300, ) -> Path: """Render PACK graph with per-space energy billboard overlays.""" graph_npz = Path(graph_npz) energy_npz = Path(energy_npz) output_png = Path(output_png) geometry_path = Path(geometry_npz) if geometry_npz is not None else None with np.load(graph_npz, allow_pickle=True) as data: graph_data = {k: np.asarray(data[k]) for k in data.files} with np.load(energy_npz, allow_pickle=True) as data: energy_data = {k: np.asarray(data[k]) for k in data.files} fig = plt.figure(figsize=(4, 4)) ax = fig.add_subplot(111, projection="3d") ax.view_init(elev=elev, azim=azim) fig.subplots_adjust(left=0, right=1, top=1, bottom=0) ok = _plot_pack_graph_overlay( ax, graph_data, energy_data, geometry_npz=geometry_path, elev=elev, azim=azim, panel_scale=panel_scale, day_step=day_step, ) if not ok: ok = _plot_pack_graph(ax, graph_data, geometry_npz=geometry_path) if not ok: ok = _plot_generic_graph(ax, graph_data) if not ok: keys = ", ".join(sorted(graph_data.keys())) plt.close(fig) raise ValueError(f"Cannot parse graph centers/edges from {graph_npz}; keys=[{keys}]") ax.set_box_aspect([1, 1, 1]) ax.set_axis_off() output_png.parent.mkdir(parents=True, exist_ok=True) fig.savefig(output_png, dpi=dpi, bbox_inches="tight", pad_inches=0.04, transparent=True) plt.close(fig) return output_png