Spaces:
Running
Running
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| import logging | |
| from dataclasses import dataclass, field | |
| from typing import Dict, List, Optional, Set, Tuple | |
| import numpy as np | |
| from .parser import ( | |
| Patterns, | |
| filter_area, | |
| filter_node, | |
| filter_way, | |
| match_to_group, | |
| parse_area, | |
| parse_node, | |
| parse_way, | |
| ) | |
| from .reader import OSMData, OSMNode, OSMRelation, OSMWay | |
| logger = logging.getLogger(__name__) | |
| def glue(ways: List[OSMWay]) -> List[List[OSMNode]]: | |
| result: List[List[OSMNode]] = [] | |
| to_process: Set[Tuple[OSMNode]] = set() | |
| for way in ways: | |
| if way.is_cycle(): | |
| result.append(way.nodes) | |
| else: | |
| to_process.add(tuple(way.nodes)) | |
| while to_process: | |
| nodes: List[OSMNode] = list(to_process.pop()) | |
| glued: Optional[List[OSMNode]] = None | |
| other_nodes: Optional[Tuple[OSMNode]] = None | |
| for other_nodes in to_process: | |
| glued = try_to_glue(nodes, list(other_nodes)) | |
| if glued is not None: | |
| break | |
| if glued is not None: | |
| to_process.remove(other_nodes) | |
| if is_cycle(glued): | |
| result.append(glued) | |
| else: | |
| to_process.add(tuple(glued)) | |
| else: | |
| result.append(nodes) | |
| return result | |
| def is_cycle(nodes: List[OSMNode]) -> bool: | |
| """Is way a cycle way or an area boundary.""" | |
| return nodes[0] == nodes[-1] | |
| def try_to_glue(nodes: List[OSMNode], other: List[OSMNode]) -> Optional[List[OSMNode]]: | |
| """Create new combined way if ways share endpoints.""" | |
| if nodes[0] == other[0]: | |
| return list(reversed(other[1:])) + nodes | |
| if nodes[0] == other[-1]: | |
| return other[:-1] + nodes | |
| if nodes[-1] == other[-1]: | |
| return nodes + list(reversed(other[:-1])) | |
| if nodes[-1] == other[0]: | |
| return nodes + other[1:] | |
| return None | |
| def multipolygon_from_relation(rel: OSMRelation, osm: OSMData): | |
| inner_ways = [] | |
| outer_ways = [] | |
| for member in rel.members: | |
| if member.type_ == "way": | |
| if member.role == "inner": | |
| if member.ref in osm.ways: | |
| inner_ways.append(osm.ways[member.ref]) | |
| elif member.role == "outer": | |
| if member.ref in osm.ways: | |
| outer_ways.append(osm.ways[member.ref]) | |
| else: | |
| logger.warning(f'Unknown member role "{member.role}".') | |
| if outer_ways: | |
| inners_path = glue(inner_ways) | |
| outers_path = glue(outer_ways) | |
| return inners_path, outers_path | |
| class MapElement: | |
| id_: int | |
| label: str | |
| group: str | |
| tags: Optional[Dict[str, str]] | |
| class MapNode(MapElement): | |
| xy: np.ndarray | |
| def from_osm(cls, node: OSMNode, label: str, group: str): | |
| return cls( | |
| node.id_, | |
| label, | |
| group, | |
| node.tags, | |
| xy=node.xy, | |
| ) | |
| class MapLine(MapElement): | |
| xy: np.ndarray | |
| def from_osm(cls, way: OSMWay, label: str, group: str): | |
| xy = np.stack([n.xy for n in way.nodes]) | |
| return cls( | |
| way.id_, | |
| label, | |
| group, | |
| way.tags, | |
| xy=xy, | |
| ) | |
| class MapArea(MapElement): | |
| outers: List[np.ndarray] | |
| inners: List[np.ndarray] = field(default_factory=list) | |
| def from_relation(cls, rel: OSMRelation, label: str, group: str, osm: OSMData): | |
| outers_inners = multipolygon_from_relation(rel, osm) | |
| if outers_inners is None: | |
| return None | |
| outers, inners = outers_inners | |
| outers = [np.stack([n.xy for n in way]) for way in outers] | |
| inners = [np.stack([n.xy for n in way]) for way in inners] | |
| return cls( | |
| rel.id_, | |
| label, | |
| group, | |
| rel.tags, | |
| outers=outers, | |
| inners=inners, | |
| ) | |
| def from_way(cls, way: OSMWay, label: str, group: str): | |
| xy = np.stack([n.xy for n in way.nodes]) | |
| return cls( | |
| way.id_, | |
| label, | |
| group, | |
| way.tags, | |
| outers=[xy], | |
| ) | |
| class MapData: | |
| def __init__(self): | |
| self.nodes: Dict[int, MapNode] = {} | |
| self.lines: Dict[int, MapLine] = {} | |
| self.areas: Dict[int, MapArea] = {} | |
| def from_osm(cls, osm: OSMData): | |
| self = cls() | |
| for node in filter(filter_node, osm.nodes.values()): | |
| label = parse_node(node.tags) | |
| if label is None: | |
| continue | |
| group = match_to_group(label, Patterns.nodes) | |
| if group is None: | |
| group = match_to_group(label, Patterns.ways) | |
| if group is None: | |
| continue # missing | |
| self.nodes[node.id_] = MapNode.from_osm(node, label, group) | |
| for way in filter(filter_way, osm.ways.values()): | |
| label = parse_way(way.tags) | |
| if label is None: | |
| continue | |
| group = match_to_group(label, Patterns.ways) | |
| if group is None: | |
| group = match_to_group(label, Patterns.nodes) | |
| if group is None: | |
| continue # missing | |
| self.lines[way.id_] = MapLine.from_osm(way, label, group) | |
| for area in filter(filter_area, osm.ways.values()): | |
| label = parse_area(area.tags) | |
| if label is None: | |
| continue | |
| group = match_to_group(label, Patterns.areas) | |
| if group is None: | |
| group = match_to_group(label, Patterns.ways) | |
| if group is None: | |
| group = match_to_group(label, Patterns.nodes) | |
| if group is None: | |
| continue # missing | |
| self.areas[area.id_] = MapArea.from_way(area, label, group) | |
| for rel in osm.relations.values(): | |
| if rel.tags.get("type") != "multipolygon": | |
| continue | |
| label = parse_area(rel.tags) | |
| if label is None: | |
| continue | |
| group = match_to_group(label, Patterns.areas) | |
| if group is None: | |
| group = match_to_group(label, Patterns.ways) | |
| if group is None: | |
| group = match_to_group(label, Patterns.nodes) | |
| if group is None: | |
| continue # missing | |
| area = MapArea.from_relation(rel, label, group, osm) | |
| assert rel.id_ not in self.areas # not sure if there can be collision | |
| if area is not None: | |
| self.areas[rel.id_] = area | |
| return self | |