Spaces:
Running
Running
| from __future__ import annotations | |
| import csv | |
| import random | |
| import time | |
| from dataclasses import dataclass | |
| from functools import lru_cache | |
| from pathlib import Path | |
| from pathlib import PurePosixPath | |
| import gradio as gr | |
| from huggingface_hub import hf_hub_download | |
| from visualization.building import visualize_graph, visualize_graph_overlap | |
| from visualization.energy import visualize_energy | |
| from visualization.geometry import visualize_geometry | |
| from visualization.weather import visualize_weather | |
| DATASET_REPO_ID = "ArchEGraph/ArchEGraph-demo" | |
| OUTPUT_ROOT = Path("/tmp/archegraph_outputs") | |
| DEFAULT_ENERGY_ZONE_INDEX = 0 | |
| DEFAULT_WEATHER_WINDOW_START_HOUR = 1 | |
| DEFAULT_WEATHER_WINDOW_HOURS = 24 | |
| class SampleRecord: | |
| sample_id: str | |
| weather_id: str | |
| building_id: str | |
| energy_file: str | |
| n_steps: int | |
| n_spaces: int | |
| def _manifest_index() -> dict[str, SampleRecord]: | |
| manifest_path = hf_hub_download( | |
| repo_id=DATASET_REPO_ID, | |
| repo_type="dataset", | |
| filename="manifest.csv", | |
| ) | |
| out: dict[str, SampleRecord] = {} | |
| with Path(manifest_path).open("r", encoding="utf-8", newline="") as f: | |
| reader = csv.DictReader(f) | |
| for row in reader: | |
| sample_id = (row.get("sample_id") or "").strip() | |
| weather_id = (row.get("weather_id") or "").strip() | |
| building_id = (row.get("building_id") or "").strip() | |
| energy_file = (row.get("energy_file") or "").strip() | |
| if not sample_id or not weather_id or not energy_file: | |
| continue | |
| if not building_id: | |
| building_id = sample_id.split("__", 1)[0].lstrip("0") or "0" | |
| n_steps = int(row.get("n_steps") or 0) | |
| n_spaces = int(row.get("n_spaces") or 0) | |
| out[sample_id] = SampleRecord( | |
| sample_id=sample_id, | |
| weather_id=weather_id, | |
| building_id=building_id, | |
| energy_file=energy_file, | |
| n_steps=n_steps, | |
| n_spaces=n_spaces, | |
| ) | |
| if not out: | |
| raise RuntimeError("manifest.csv is empty or invalid.") | |
| return out | |
| def _numeric_sort_key(value: str) -> tuple[int, str]: | |
| text = (value or "").strip() | |
| if text.isdigit(): | |
| return (0, f"{int(text):09d}") | |
| return (1, text.lower()) | |
| def _weather_to_buildings() -> dict[str, list[str]]: | |
| mapping: dict[str, set[str]] = {} | |
| for rec in _manifest_index().values(): | |
| mapping.setdefault(rec.weather_id, set()).add(rec.building_id) | |
| return {k: sorted(v, key=_numeric_sort_key) for k, v in mapping.items()} | |
| def _pair_to_sample() -> dict[tuple[str, str], SampleRecord]: | |
| out: dict[tuple[str, str], SampleRecord] = {} | |
| for rec in _manifest_index().values(): | |
| out[(rec.weather_id, rec.building_id)] = rec | |
| return out | |
| def _weather_choices() -> list[str]: | |
| return sorted(_weather_to_buildings().keys(), key=lambda x: x.lower()) | |
| def _building_choices(weather_id: str) -> list[str]: | |
| return _weather_to_buildings().get((weather_id or "").strip(), []) | |
| def _default_selection() -> tuple[str, str]: | |
| weather_choices = _weather_choices() | |
| if not weather_choices: | |
| raise RuntimeError("No weather options found in manifest") | |
| weather = weather_choices[0] | |
| buildings = _building_choices(weather) | |
| if not buildings: | |
| raise RuntimeError(f"No building options for weather '{weather}'") | |
| return weather, buildings[0] | |
| def _safe_dropdown_defaults() -> tuple[list[str], str | None, list[str], str | None]: | |
| try: | |
| weather_choices = _weather_choices() | |
| if not weather_choices: | |
| return [], None, [], None | |
| weather, building = _default_selection() | |
| building_choices = _building_choices(weather) | |
| return weather_choices, weather, building_choices, building | |
| except Exception: | |
| return [], None, [], None | |
| def _resolve_record(weather_id: str, building_id: str) -> SampleRecord: | |
| weather = (weather_id or "").strip() | |
| building = (building_id or "").strip() | |
| pair_map = _pair_to_sample() | |
| rec = pair_map.get((weather, building)) | |
| if rec is not None: | |
| return rec | |
| weather_opts = _weather_choices() | |
| if not weather_opts: | |
| raise ValueError("No weather options available") | |
| if weather not in weather_opts: | |
| raise ValueError(f"Unknown weather city '{weather}'. Example values: {', '.join(weather_opts[:8])}") | |
| building_opts = _building_choices(weather) | |
| raise ValueError( | |
| f"Unknown building id '{building}' for city '{weather}'. " | |
| f"Available building ids: {', '.join(building_opts[:12])}" | |
| ) | |
| def _repo_dataset_path(*parts: str) -> str: | |
| clean_parts: list[str] = [] | |
| for part in parts: | |
| text = (part or "").replace("\\", "/").strip("/") | |
| if text: | |
| clean_parts.extend(segment for segment in text.split("/") if segment and segment != ".") | |
| return str(PurePosixPath(*clean_parts)) | |
| def _download_modalities(record: SampleRecord) -> tuple[Path, Path, Path, Path]: | |
| building_key = record.sample_id.split("__", 1)[0] | |
| energy_relpath = record.energy_file.replace("\\", "/").lstrip("/") | |
| if energy_relpath.startswith("energy/"): | |
| energy_relpath = energy_relpath[len("energy/") :] | |
| geometry_npz = hf_hub_download( | |
| repo_id=DATASET_REPO_ID, | |
| repo_type="dataset", | |
| filename=_repo_dataset_path("geometry", f"{building_key}.npz"), | |
| ) | |
| graph_npz = hf_hub_download( | |
| repo_id=DATASET_REPO_ID, | |
| repo_type="dataset", | |
| filename=_repo_dataset_path("building", f"{building_key}.npz"), | |
| ) | |
| weather_npz = hf_hub_download( | |
| repo_id=DATASET_REPO_ID, | |
| repo_type="dataset", | |
| filename=_repo_dataset_path("weather", f"{record.weather_id}.npz"), | |
| ) | |
| energy_npz = hf_hub_download( | |
| repo_id=DATASET_REPO_ID, | |
| repo_type="dataset", | |
| filename=_repo_dataset_path("energy", energy_relpath), | |
| ) | |
| return Path(geometry_npz), Path(graph_npz), Path(weather_npz), Path(energy_npz) | |
| def _output_paths(sample_id: str) -> tuple[Path, Path, Path, Path, Path]: | |
| OUTPUT_ROOT.mkdir(parents=True, exist_ok=True) | |
| safe = sample_id.replace("/", "_").replace("\\", "_") | |
| run_dir = OUTPUT_ROOT / f"{safe}_{int(time.time() * 1000)}" | |
| run_dir.mkdir(parents=True, exist_ok=True) | |
| return ( | |
| run_dir / "geometry.png", | |
| run_dir / "graph.png", | |
| run_dir / "overlap.png", | |
| run_dir / "weather.png", | |
| run_dir / "energy.png", | |
| ) | |
| def update_building_dropdown(weather_id: str) -> gr.update: | |
| buildings = _building_choices(weather_id) | |
| value = buildings[0] if buildings else None | |
| return gr.update(choices=buildings, value=value) | |
| def render_sample( | |
| weather_id: str, | |
| building_id: str, | |
| energy_zone_index: int, | |
| use_custom_window: bool, | |
| window_start_hour: int, | |
| window_hours: int, | |
| ) -> tuple[str, str, str, str, str, str]: | |
| record = _resolve_record(weather_id=weather_id, building_id=building_id) | |
| if use_custom_window: | |
| start_hour = max(1, int(window_start_hour)) | |
| hours = max(1, int(window_hours)) | |
| else: | |
| start_hour = DEFAULT_WEATHER_WINDOW_START_HOUR | |
| hours = DEFAULT_WEATHER_WINDOW_HOURS | |
| zone_idx = max(0, int(energy_zone_index)) if energy_zone_index is not None else DEFAULT_ENERGY_ZONE_INDEX | |
| try: | |
| geometry_npz, graph_npz, weather_npz, energy_npz = _download_modalities(record) | |
| out_geometry, out_graph, out_overlap, out_weather, out_energy = _output_paths(record.sample_id) | |
| visualize_geometry(geometry_npz=geometry_npz, output_png=out_geometry) | |
| visualize_graph(graph_npz=graph_npz, geometry_npz=geometry_npz, output_png=out_graph) | |
| visualize_graph_overlap(graph_npz=graph_npz, geometry_npz=geometry_npz, energy_npz=energy_npz, output_png=out_overlap) | |
| visualize_weather( | |
| weather_npz=weather_npz, | |
| output_png=out_weather, | |
| start_hour=start_hour, | |
| window_hours=hours, | |
| ) | |
| visualize_energy( | |
| energy_npz=energy_npz, | |
| output_png=out_energy, | |
| zone_index=zone_idx, | |
| start_hour=start_hour, | |
| window_hours=hours, | |
| ) | |
| except Exception as exc: | |
| raise gr.Error(f"Failed to render sample {record.sample_id}: {exc}") from exc | |
| if use_custom_window: | |
| window_text = f"custom window: start={start_hour}, hours={hours}" | |
| else: | |
| window_text = "default window: Jan-1 first 24 hours" | |
| summary = ( | |
| f"Rendered **{record.sample_id}** from `{DATASET_REPO_ID}` \n" | |
| f"weather_id: `{record.weather_id}` \n" | |
| f"building_id: `{record.building_id}` \n" | |
| f"n_steps: `{record.n_steps}` \n" | |
| f"n_spaces: `{record.n_spaces}` \n" | |
| f"zone_index: `{zone_idx}` \n" | |
| f"{window_text}" | |
| ) | |
| return str(out_geometry), str(out_graph), str(out_overlap), str(out_weather), str(out_energy), summary | |
| def pick_random_sample() -> tuple[str, gr.update]: | |
| rec = random.choice(list(_manifest_index().values())) | |
| choices = _building_choices(rec.weather_id) | |
| return rec.weather_id, gr.update(choices=choices, value=rec.building_id) | |
| def _startup_note() -> str: | |
| try: | |
| total = len(_manifest_index()) | |
| weather_count = len(_weather_choices()) | |
| return f"Manifest loaded: {total} samples, {weather_count} weather cities from {DATASET_REPO_ID}." | |
| except Exception as exc: | |
| return f"Manifest will be loaded lazily on first run. Reason: {exc}" | |
| default_weathers, default_weather, default_buildings, default_building = _safe_dropdown_defaults() | |
| APP_CSS = """ | |
| .gradio-container { | |
| max-width: 1920px !important; | |
| } | |
| #ctrl-row-1, #ctrl-row-2, #ctrl-row-3 { | |
| gap: 8px !important; | |
| } | |
| #ctrl-row-1 .gr-block, #ctrl-row-2 .gr-block, #ctrl-row-3 .gr-block { | |
| padding-top: 4px !important; | |
| padding-bottom: 4px !important; | |
| } | |
| #status-box { | |
| margin-top: 2px !important; | |
| margin-bottom: 4px !important; | |
| } | |
| #viz-row-all { | |
| gap: 8px !important; | |
| } | |
| #viz-row-all img { | |
| object-fit: contain !important; | |
| } | |
| """ | |
| with gr.Blocks(title="ArchEGraph Visualizer", css=APP_CSS) as demo: | |
| gr.Markdown( | |
| "# ArchEGraph Visualizer\n" | |
| "Visualize geometry, graph, weather and energy files from " | |
| "[ArchEGraph/ArchEGraph-demo](https://huggingface.co/datasets/ArchEGraph/ArchEGraph-demo)." | |
| ) | |
| gr.Markdown(_startup_note()) | |
| with gr.Row(elem_id="ctrl-row-1"): | |
| weather_dropdown = gr.Dropdown( | |
| label="Weather City", | |
| choices=default_weathers, | |
| value=default_weather, | |
| allow_custom_value=False, | |
| scale=2, | |
| ) | |
| building_dropdown = gr.Dropdown( | |
| label="Building ID", | |
| choices=default_buildings, | |
| value=default_building, | |
| allow_custom_value=True, | |
| scale=2, | |
| ) | |
| energy_zone_index_input = gr.Slider( | |
| label="Energy Zone Index", | |
| minimum=0, | |
| maximum=24, | |
| step=1, | |
| value=DEFAULT_ENERGY_ZONE_INDEX, | |
| scale=1, | |
| ) | |
| with gr.Row(elem_id="ctrl-row-2"): | |
| use_custom_window_input = gr.Checkbox( | |
| label="Custom Window", | |
| value=False, | |
| scale=1, | |
| ) | |
| window_start_hour_input = gr.Slider( | |
| label="Start Hour", | |
| minimum=1, | |
| maximum=8760, | |
| step=1, | |
| value=DEFAULT_WEATHER_WINDOW_START_HOUR, | |
| scale=2, | |
| ) | |
| window_hours_input = gr.Slider( | |
| label="Window Hours", | |
| minimum=1, | |
| maximum=8760, | |
| step=1, | |
| value=DEFAULT_WEATHER_WINDOW_HOURS, | |
| scale=2, | |
| ) | |
| with gr.Row(elem_id="ctrl-row-3"): | |
| run_btn = gr.Button("Visualize", variant="primary") | |
| random_btn = gr.Button("Pick Random Sample") | |
| status_md = gr.Markdown(elem_id="status-box") | |
| with gr.Row(elem_id="viz-row-all"): | |
| geometry_img = gr.Image(label="Geometry", type="filepath", height=215) | |
| graph_img = gr.Image(label="Building", type="filepath", height=215) | |
| overlap_img = gr.Image(label="Overlap", type="filepath", height=215) | |
| weather_img = gr.Image(label="Weather (line)", type="filepath", height=215) | |
| energy_img = gr.Image(label="Energy (selected zone)", type="filepath", height=215) | |
| weather_dropdown.change( | |
| fn=update_building_dropdown, | |
| inputs=[weather_dropdown], | |
| outputs=[building_dropdown], | |
| ) | |
| run_btn.click( | |
| fn=render_sample, | |
| inputs=[ | |
| weather_dropdown, | |
| building_dropdown, | |
| energy_zone_index_input, | |
| use_custom_window_input, | |
| window_start_hour_input, | |
| window_hours_input, | |
| ], | |
| outputs=[geometry_img, graph_img, overlap_img, weather_img, energy_img, status_md], | |
| api_name="render_sample", | |
| ) | |
| random_btn.click( | |
| fn=pick_random_sample, | |
| inputs=None, | |
| outputs=[weather_dropdown, building_dropdown], | |
| api_name="pick_random_sample", | |
| ) | |
| demo.queue() | |
| if __name__ == "__main__": | |
| demo.launch() | |