File size: 7,006 Bytes
f49f471
 
 
 
ab42110
f49f471
 
 
ab42110
b443816
f49f471
 
 
 
 
 
 
 
 
 
 
ab42110
 
 
 
 
f49f471
 
 
eb1db21
f49f471
ab42110
 
 
 
 
f49f471
 
 
 
eb1db21
f49f471
 
 
 
 
 
 
 
ab42110
ab91b06
 
 
 
 
ab42110
f49f471
 
 
 
 
ab42110
7d50fd3
 
 
 
 
ab42110
7d50fd3
 
 
 
 
 
 
ab42110
f49f471
 
 
 
 
 
 
 
 
 
3f75335
 
 
f49f471
 
b443816
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ab42110
 
 
 
 
3f75335
ab42110
 
 
 
f49f471
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
"""Load and provide access to project data files."""

import json
from dataclasses import dataclass
from functools import lru_cache
from pathlib import Path

import numpy as np
from shapely import contains_xy
from shapely.geometry import LineString, MultiLineString, Point, box, shape
from shapely.ops import unary_union

DATA_DIR = Path(__file__).resolve().parent.parent / "data"


@dataclass
class Station:
    name: str
    lat: float
    lon: float
    speed_kmh: float
    id: str = ""

    def __post_init__(self):
        if not self.id:
            self.id = self.name


def load_stations() -> list[Station]:
    with open(DATA_DIR / "stations.json", encoding="utf-8") as f:
        raw = json.load(f)
    stations = [Station(**s) for s in raw]
    ids = [s.id for s in stations]
    if len(set(ids)) != len(ids):
        raise ValueError(f"duplicate station ids in stations.json: {ids}")
    return stations


def load_stations_raw() -> list[dict]:
    """Raw dicts for pydeck layers."""
    with open(DATA_DIR / "stations.json", encoding="utf-8") as f:
        return json.load(f)


def load_passages() -> dict:
    with open(DATA_DIR / "passages.json") as f:
        return json.load(f)


@lru_cache(maxsize=1)
def load_risk_scenarios() -> dict:
    with open(DATA_DIR / "risk_scenarios.json", encoding="utf-8") as f:
        return json.load(f)


@lru_cache(maxsize=1)
def load_zones_geojson() -> dict:
    with open(DATA_DIR / "neva_zone.geojson") as f:
        return json.load(f)


@lru_cache(maxsize=1)
def load_shoreline_geojson() -> dict:
    with open(DATA_DIR / "shoreline.geojson", encoding="utf-8") as f:
        return json.load(f)


@lru_cache(maxsize=1)
def load_shoreline():
    """Load coastline used for shore-distance risk components."""
    geojson = load_shoreline_geojson()
    lines = [shape(f["geometry"]) for f in geojson["features"]]
    return unary_union(lines)


@lru_cache(maxsize=1)
def load_water_polygon():
    """Union of all water zone polygons as a single Shapely geometry."""
    geojson = load_zones_geojson()
    polys = [shape(f["geometry"]) for f in geojson["features"]]
    return unary_union(polys)


def load_zone_polygons() -> list[tuple[str, any]]:
    """List of (zone_name, shapely_polygon) pairs."""
    geojson = load_zones_geojson()
    return [
        (f["properties"]["zone"], shape(f["geometry"])) for f in geojson["features"]
    ]


# Bounding box around Kronshtadt island (incl. КЗС causeway segments alongside it).
# Used to extract the island outline from zone outer rings.
KRONSHTADT_BBOX = (29.62, 59.97, 29.83, 60.04)  # (lon_min, lat_min, lon_max, lat_max)

# Minimum length (in degrees) for a Kronshtadt outline segment to be kept.
# Drops tiny artifact runs (~21 pts near С-2 entry).
_KRONSHTADT_MIN_PART_LEN_DEG = 0.005

# Tolerance for detecting whether a fragment touches an "ear tip" (endpoint of
# the original outer-ring run inside the bbox). ~10m at this latitude.
_KRONSHTADT_EAR_TOL_DEG = 1e-4


@lru_cache(maxsize=1)
def load_kronshtadt_cuts():
    """User-provided line segments that trim КЗС "ears" from the Kronshtadt outline."""
    with open(DATA_DIR / "kronshtadt_cuts.geojson", encoding="utf-8") as f:
        return json.load(f)


def _extract_runs_in_bbox(bbox) -> list[LineString]:
    runs: list[LineString] = []
    for _zone_name, polygon in load_zone_polygons():
        coords = list(polygon.exterior.coords)
        cur: list[tuple[float, float]] = []
        for x, y in coords:
            if bbox.covers(Point(x, y)):
                cur.append((x, y))
            else:
                if len(cur) >= 2:
                    runs.append(LineString(cur))
                cur = []
        if len(cur) >= 2:
            runs.append(LineString(cur))
    return runs


@lru_cache(maxsize=1)
def load_kronshtadt_outline():
    """Outline of Kronshtadt as a MultiLineString.

    Pipeline:
    1. Take north + south zone outer rings restricted to KRONSHTADT_BBOX.
    2. Split each run at intersections with cuts from `kronshtadt_cuts.geojson`.
    3. Drop fragments that touch an original run endpoint — these are the КЗС
       causeway "ears" approaching С-1 / С-2.
    4. Drop fragments whose both endpoints lie on the SAME cut — these are
       inlets (ports) where the contour wraps in and out across one cut line.
    5. Drop fragments shorter than `_KRONSHTADT_MIN_PART_LEN_DEG`.
    """
    from shapely.ops import split, unary_union

    bbox = box(*KRONSHTADT_BBOX)
    raw_runs = _extract_runs_in_bbox(bbox)
    if not raw_runs:
        raise RuntimeError("Kronshtadt outline extraction returned no segments")

    ear_tips = []
    for run in raw_runs:
        ear_tips.append(Point(run.coords[0]))
        ear_tips.append(Point(run.coords[-1]))

    cuts_geojson = load_kronshtadt_cuts()
    cut_lines = [shape(f["geometry"]) for f in cuts_geojson["features"]]
    cuts_union = unary_union(cut_lines) if cut_lines else None

    def closest_cut_idx(pt: Point) -> int | None:
        if not cut_lines:
            return None
        dists = [pt.distance(c) for c in cut_lines]
        return int(min(range(len(cut_lines)), key=lambda i: dists[i]))

    kept: list[LineString] = []
    for run in raw_runs:
        pieces = [run]
        if cuts_union is not None and run.intersects(cuts_union):
            split_result = split(run, cuts_union)
            pieces = [g for g in split_result.geoms if isinstance(g, LineString)]
        for piece in pieces:
            if piece.length < _KRONSHTADT_MIN_PART_LEN_DEG:
                continue
            if any(piece.distance(t) < _KRONSHTADT_EAR_TOL_DEG for t in ear_tips):
                continue
            # Detect inlet: both ends lie on the same cut LineString (within tol).
            if cut_lines:
                a = Point(piece.coords[0])
                b = Point(piece.coords[-1])
                ia, ib = closest_cut_idx(a), closest_cut_idx(b)
                if ia is not None and ia == ib:
                    da = a.distance(cut_lines[ia])
                    db = b.distance(cut_lines[ib])
                    if da < _KRONSHTADT_EAR_TOL_DEG and db < _KRONSHTADT_EAR_TOL_DEG:
                        continue
            kept.append(piece)

    if not kept:
        raise RuntimeError("Kronshtadt outline: nothing left after cuts")
    return MultiLineString(kept)


@lru_cache(maxsize=1)
def _north_zone_union():
    return unary_union([p for name, p in load_zone_polygons() if name == "north"])


def classify_cells_by_zone(lats: np.ndarray, lons: np.ndarray) -> np.ndarray:
    """Assign each grid cell to 'N' (north) or 'S' (south) zone."""
    north = _north_zone_union()
    in_north = contains_xy(north, np.asarray(lons), np.asarray(lats))
    return np.where(in_north, "N", "S").astype("U1")


def get_passage_coords() -> list[tuple[float, float]]:
    """Return passage coordinates as [(lat, lon), ...]."""
    passages = load_passages()
    return [(v["lat"], v["lon"]) for v in passages.values()]