site-intelligence-studio / src /topography.py
Eishaan's picture
Add KML terrain and soil analysis layers
2f91d7e
Raw
History Blame Contribute Delete
6.51 kB
from __future__ import annotations
import math
from statistics import mean
from typing import Any
from .evidence import make_evidence
from .geometry import EARTH_RADIUS_M
from .http_client import get_json
from .models import EvidenceItem, SiteSelection
# OpenTopoData endpoint that fetch_topography queries for elevations; the
# final path segment selects the "srtm90m" dataset.
OPENTOPO_URL = "https://api.opentopodata.org/v1/srtm90m"
def fetch_topography(selection: SiteSelection) -> tuple[dict[str, Any], list[EvidenceItem]]:
    """Sample public elevation data for a site and summarise its relief.

    The points from ``_sample_points(selection)`` are sent to ``OPENTOPO_URL``
    in one request; the non-null ``elevation`` values found under the
    ``results`` key of the response are reduced to min/max/mean elevation,
    relief, and an approximate slope percentage.

    Args:
        selection: The site selection; read here for ``radius_m`` and passed
            to the sampling and diagonal helpers.

    Returns:
        A ``(summary, evidence)`` tuple. ``summary`` is a dict of elevation
        statistics on success and ``{}`` when no coordinate is available or
        the lookup fails. ``evidence`` always contains exactly one item
        describing what was found, or why nothing was.

    Exceptions raised while requesting or processing elevations are not
    propagated; they are reported through a low-confidence evidence item.
    """
    points = _sample_points(selection)
    if not points:
        # Nothing to query: the selection carries neither an anchor nor a bbox.
        return {}, [
            make_evidence(
                category="Topography",
                finding="Topography sampling was skipped because no real-world site coordinate is available.",
                source_name="OpenTopoData",
                source_url="https://www.opentopodata.org/api/",
                source_type="public elevation API",
                resolution_or_scope="not available",
                confidence="low",
                limitation="Elevation needs a WGS84 coordinate.",
                design_implication="Use user/CAD contour notes until the site is georeferenced.",
                verification_needed="Provide a map/KML/GeoJSON boundary or anchor coordinate.",
                output_label="site_visit_required",
            )
        ]
    try:
        # All points go into a single request as "lat,lon|lat,lon|...".
        locations = "|".join(f"{lat:.6f},{lon:.6f}" for lat, lon in points)
        data = get_json(OPENTOPO_URL, {"locations": locations}, timeout=25)
        # Entries whose elevation is missing or null are skipped.
        elevations = [
            float(item["elevation"])
            for item in data.get("results", [])
            if item.get("elevation") is not None
        ]
        if not elevations:
            raise ValueError("No elevation values returned.")
        lowest = min(elevations)
        highest = max(elevations)
        relief = highest - lowest
        if len(elevations) < 2:
            # A single sample always yields zero relief, which says nothing
            # about the terrain; report the slope as unknown instead of 0 %.
            slope_pct = None
        else:
            # Relief is spread over the bbox diagonal; without a usable
            # diagonal, fall back to the selection radius (default 250,
            # floored at 1 so the division is always defined).
            diagonal_m = _selection_diagonal_m(selection) or max(selection.radius_m or 250, 1)
            slope_pct = (relief / diagonal_m) * 100
        topo = {
            "source": "OpenTopoData SRTM 90m",
            "sample_count": len(elevations),
            "min_elevation_m": round(lowest, 1),
            "max_elevation_m": round(highest, 1),
            "mean_elevation_m": round(mean(elevations), 1),
            "relief_m": round(relief, 1),
            "approx_slope_pct": round(slope_pct, 2) if slope_pct is not None else None,
            "interpretation": _topography_interpretation(relief, slope_pct),
        }
        return topo, [
            make_evidence(
                category="Topography",
                finding=(
                    f"Elevation sampling found mean elevation {topo['mean_elevation_m']} m "
                    f"and approximate relief {topo['relief_m']} m across sampled points."
                ),
                source_name="OpenTopoData SRTM 90m",
                source_url="https://www.opentopodata.org/api/",
                source_type="public elevation API",
                resolution_or_scope="sampled centroid and boundary/bbox points; SRTM-scale terrain",
                confidence="medium",
                limitation="SRTM/OpenTopoData is not a site survey and may miss small plot-level slopes, steps, retaining walls, and drains.",
                design_implication="Use for early slope/drainage awareness and decide what to verify on site.",
                verification_needed="Check actual slope, contours, waterlogging, retaining edges, and drainage outlets during site visit.",
                output_label="public_data",
            )
        ]
    except Exception as exc:  # noqa: BLE001
        # Deliberate best-effort: any failure (request, response shape, or an
        # empty result) degrades to a "layer missing" evidence item.
        return {}, [
            make_evidence(
                category="Topography",
                finding="Topography/elevation data could not be retrieved.",
                source_name="OpenTopoData",
                source_url="https://www.opentopodata.org/api/",
                source_type="public elevation API",
                resolution_or_scope="not available",
                confidence="low",
                limitation=f"API request failed: {type(exc).__name__}.",
                design_implication="Do not infer slope or drainage direction from this missing layer.",
                verification_needed="Use CAD contours, survey levels, site observation, or another DEM source.",
                output_label="site_visit_required",
            )
        ]
def _sample_points(selection: SiteSelection) -> list[tuple[float, float]]:
    """Build the (lat, lon) points whose elevation should be queried.

    The anchor coordinate comes first when both of its components are set.
    A bbox, unpacked as (min_lon, min_lat, max_lon, max_lat), contributes its
    four corners and the two points at mid-latitude on its min_lon and
    max_lon edges. Duplicates are removed and at most eight points are kept.
    """
    candidates: list[tuple[float, float]] = []

    has_anchor = selection.anchor_lat is not None and selection.anchor_lon is not None
    if has_anchor:
        candidates.append((selection.anchor_lat, selection.anchor_lon))

    if selection.bbox:
        lon_lo, lat_lo, lon_hi, lat_hi = selection.bbox
        lat_mid = (lat_lo + lat_hi) / 2
        # Corners first, then the mid-latitude point on each lon edge.
        candidates += [
            (lat_lo, lon_lo),
            (lat_lo, lon_hi),
            (lat_hi, lon_lo),
            (lat_hi, lon_hi),
            (lat_mid, lon_lo),
            (lat_mid, lon_hi),
        ]

    unique_points = _dedupe(candidates)
    return unique_points[:8]
def _dedupe(points: list[tuple[float, float]]) -> list[tuple[float, float]]:
seen = set()
out = []
for lat, lon in points:
key = (round(lat, 6), round(lon, 6))
if key not in seen:
seen.add(key)
out.append((lat, lon))
return out
def _selection_diagonal_m(selection: SiteSelection) -> float | None:
if not selection.bbox:
return None
min_lon, min_lat, max_lon, max_lat = selection.bbox
return _distance_m(min_lat, min_lon, max_lat, max_lon)
def _distance_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Return the haversine great-circle distance between two coordinates.

    Args:
        lat1, lon1: First point, in degrees.
        lat2, lon2: Second point, in degrees.

    Returns:
        The distance scaled by ``EARTH_RADIUS_M`` (metres, going by that
        constant's name).
    """
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlambda = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2
    # Floating-point rounding can leave `a` marginally outside [0, 1] for
    # near-antipodal (or out-of-range) input, which would make the square
    # roots below raise ValueError; clamp it to the valid domain.
    a = min(1.0, max(0.0, a))
    return 2 * EARTH_RADIUS_M * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def _topography_interpretation(relief_m: float, slope_pct: float | None) -> str:
if slope_pct is None:
return "Elevation was sampled, but slope could not be estimated."
if relief_m < 1.0 and slope_pct < 1.0:
return "Public elevation samples suggest a broadly flat site, but micro-drainage still needs site verification."
if slope_pct < 3.0:
return "Public elevation samples suggest a gentle level change; verify drainage and accessible-route gradients."
return "Public elevation samples suggest meaningful level change; verify contours, cut-fill, drainage, and retaining edges."