phantom-grid / scripts /build_processed_data.py
unity4ar's picture
Ship Phantom Grid Docker Space
d2e6f94 verified
Raw
History Blame Contribute Delete
6.03 kB
from __future__ import annotations
import csv
import json
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
from config import load_settings
from grid_map.atlas_builder import empty_atlas
from grid_map.storage import write_json
from grid_map.validator import validate_graph
LAYERS = {
"normal": "normal_cv_out",
"taxi": "taxi_cv_out",
"bus": "bus_cv_out",
"subway": "subway_cv_out",
}
TRANSPORT_LAYERS = ("taxi", "bus", "subway")
def main() -> None:
settings = load_settings()
layer_graphs = {layer: _read_graph(settings.raw_maps_dir / folder / "graph.json") for layer, folder in LAYERS.items()}
normal_junctions = _read_junctions(settings.raw_maps_dir / LAYERS["normal"] / "junctions.csv")
registry = build_junction_registry(normal_junctions, layer_graphs)
game_graph = build_game_graph(registry, layer_graphs)
errors = validate_graph(game_graph)
if errors:
raise SystemExit("\n".join(errors))
metadata = build_metadata(settings.raw_maps_dir, layer_graphs)
write_json(settings.junction_registry_path, registry)
write_json(settings.game_graph_path, game_graph)
write_json(settings.map_metadata_path, metadata)
if not settings.map_atlas_path.exists():
write_json(settings.map_atlas_path, empty_atlas())
print(f"Wrote {settings.junction_registry_path}")
print(f"Wrote {settings.game_graph_path}")
print(f"Wrote {settings.map_metadata_path}")
print(f"Atlas ready at {settings.map_atlas_path}")
def _read_graph(path: Path) -> dict[str, Any]:
with path.open("r", encoding="utf-8") as handle:
return json.load(handle)
def _read_junctions(path: Path) -> dict[int, dict[str, Any]]:
junctions: dict[int, dict[str, Any]] = {}
with path.open("r", encoding="utf-8", newline="") as handle:
for row in csv.DictReader(handle):
junction_id = int(row["id"])
junctions[junction_id] = {
"id": junction_id,
"x": int(float(row["x"])),
"y": int(float(row["y"])),
"radius": int(float(row["radius"])),
"neighbors": [int(value) for value in row.get("neighbors", "").split() if value],
}
return junctions
def build_junction_registry(normal_junctions: dict[int, dict[str, Any]], layer_graphs: dict[str, dict]) -> dict[str, Any]:
modes_by_junction: dict[int, set[str]] = {junction_id: set() for junction_id in normal_junctions}
for layer, graph in layer_graphs.items():
for node in graph.get("nodes", []):
modes_by_junction.setdefault(int(node["id"]), set()).add(layer)
junctions = []
for junction_id, data in sorted(normal_junctions.items()):
junctions.append(
{
"id": junction_id,
"x": data["x"],
"y": data["y"],
"radius": data["radius"],
"base_neighbors": data["neighbors"],
"layers_present": sorted(modes_by_junction.get(junction_id, [])),
"nearest_landmarks": [],
"district": None,
}
)
return {"schema_version": 1, "junctions": junctions}
def build_game_graph(registry: dict[str, Any], layer_graphs: dict[str, dict]) -> dict[str, Any]:
edge_modes: dict[tuple[int, int], set[str]] = {}
for layer in TRANSPORT_LAYERS:
for edge in layer_graphs[layer].get("edges", []):
source = int(edge["source"])
target = int(edge["target"])
key = tuple(sorted((source, target)))
edge_modes.setdefault(key, set()).add(layer)
edges = [
{"source": source, "target": target, "modes": sorted(modes)}
for (source, target), modes in sorted(edge_modes.items())
]
adjacency: dict[str, list[dict[str, Any]]] = {}
for edge in edges:
source = edge["source"]
target = edge["target"]
adjacency.setdefault(str(source), []).append({"destination": target, "modes": edge["modes"]})
adjacency.setdefault(str(target), []).append({"destination": source, "modes": edge["modes"]})
nodes = [
{
"id": junction["id"],
"x": junction["x"],
"y": junction["y"],
"radius": junction["radius"],
}
for junction in registry["junctions"]
]
return {"schema_version": 1, "nodes": nodes, "edges": edges, "adjacency": adjacency}
def build_metadata(raw_maps_dir: Path, layer_graphs: dict[str, dict]) -> dict[str, Any]:
def portable(path: Path) -> str:
return str(path.relative_to(PROJECT_ROOT))
return {
"schema_version": 1,
"generated_at": datetime.now(timezone.utc).isoformat(),
"images": {layer: portable(_display_image_for_layer(raw_maps_dir, layer)) for layer in LAYERS},
"source_images": {
"normal": portable(raw_maps_dir / "images" / "normal.png"),
"taxi": portable(raw_maps_dir / "images" / "taxi.png"),
"bus": portable(raw_maps_dir / "images" / "bus.png"),
"subway": portable(raw_maps_dir / "images" / "subway.png"),
},
"layers": {
layer: {
"folder": portable(raw_maps_dir / LAYERS[layer]),
"node_count": len(graph.get("nodes", [])),
"edge_count": len(graph.get("edges", [])),
}
for layer, graph in layer_graphs.items()
},
}
def _display_image_for_layer(raw_maps_dir: Path, layer: str) -> Path:
folder = raw_maps_dir / LAYERS[layer]
for candidate in (folder / "graph_on_map.png", folder / "junctions_labelled.png", raw_maps_dir / "images" / f"{layer}.png"):
if candidate.exists():
return candidate
return raw_maps_dir / "images" / f"{layer}.png"
if __name__ == "__main__":
main()