| """Read flight telemetry from drone/survey imagery → a CameraModel. |
| |
| This is the piece that makes georeferencing (and therefore the population |
| estimate) REAL instead of illustrative: instead of typing in an altitude and |
| angle, we read the actual values the aircraft recorded. |
| |
| Drone imagery carries this metadata already: |
| * EXIF — GPS, focal length (35mm-equiv), image dimensions. |
| * XMP — DJI and most survey drones write rich flight tags here: |
| RelativeAltitude (AGL!), GimbalPitch/Yaw/RollDegree, GPS, etc. |
| |
| Crucially, the **gimbal pitch decides nadir vs oblique automatically** — DJI |
| reports -90° looking straight down (nadir) and 0° at the horizon. We convert |
| that to our camera convention (0° = nadir) so the same image both georeferences |
| correctly AND tells the app which viewpoint (and thus which model) to use — no |
| manual toggle. That's the hard oblique case handled at the source. |
| |
| Only Pillow is required. Missing fields are reported, not guessed. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import math |
| import re |
| from dataclasses import dataclass |
| from pathlib import Path |
|
|
| from .camera import CameraModel |
|
|
| |
| _FF_SENSOR_MM = 36.0 |
| |
| _NADIR_TOL_DEG = 12.0 |
|
|
|
|
| @dataclass |
| class Telemetry: |
| gps_lat: float | None = None |
| gps_lon: float | None = None |
| altitude_m: float | None = None |
| gimbal_pitch_deg: float | None = None |
| gimbal_yaw_deg: float | None = None |
| gimbal_roll_deg: float | None = None |
| focal_35mm: float | None = None |
| hfov_deg: float | None = None |
| image_width: int | None = None |
| image_height: int | None = None |
| source: str = "none" |
|
|
| @property |
| def cam_pitch_deg(self) -> float | None: |
| """Gimbal pitch in OUR convention (0 = nadir, 90 = horizontal).""" |
| if self.gimbal_pitch_deg is None: |
| return None |
| return 90.0 + self.gimbal_pitch_deg |
|
|
| @property |
| def viewpoint(self) -> str | None: |
| """'nadir' or 'oblique' from the gimbal pitch — drives model routing.""" |
| p = self.cam_pitch_deg |
| if p is None: |
| return None |
| return "nadir" if abs(p) <= _NADIR_TOL_DEG else "oblique" |
|
|
| def missing(self) -> list[str]: |
| need = {"altitude_m": self.altitude_m, "hfov_deg": self.hfov_deg, |
| "gimbal_pitch_deg": self.gimbal_pitch_deg} |
| return [k for k, v in need.items() if v is None] |
|
|
| def to_camera(self) -> CameraModel | None: |
| """Build a CameraModel if the geometry essentials are present.""" |
| if self.altitude_m is None or self.hfov_deg is None or not self.image_width: |
| return None |
| return CameraModel( |
| altitude=self.altitude_m, |
| image_width=self.image_width, |
| image_height=self.image_height or self.image_width, |
| hfov_deg=self.hfov_deg, |
| pitch_deg=self.cam_pitch_deg if self.cam_pitch_deg is not None else 0.0, |
| yaw_deg=self.gimbal_yaw_deg or 0.0, |
| roll_deg=self.gimbal_roll_deg or 0.0, |
| ) |
|
|
|
|
| def hfov_from_focal35(f35mm: float) -> float: |
| """Horizontal field of view (deg) from 35mm-equivalent focal length.""" |
| return math.degrees(2.0 * math.atan(_FF_SENSOR_MM / (2.0 * f35mm))) |
|
|
|
|
| def _xmp_float(blob: str, *tags: str) -> float | None: |
| """First matching DJI/XMP tag value, as attribute or element.""" |
| for tag in tags: |
| m = re.search(rf'{tag}\s*=\s*"([+-]?[\d.]+)"', blob) or \ |
| re.search(rf"<{tag}>\s*([+-]?[\d.]+)\s*</{tag}>", blob) |
| if m: |
| try: |
| return float(m.group(1)) |
| except ValueError: |
| pass |
| return None |
|
|
|
|
| def _read_xmp(path: Path) -> str | None: |
| """Extract the XMP packet from an image's raw bytes (no XML lib needed).""" |
| raw = path.read_bytes() |
| start = raw.find(b"<x:xmpmeta") |
| end = raw.find(b"</x:xmpmeta>") |
| if start == -1 or end == -1: |
| return None |
| return raw[start:end + len(b"</x:xmpmeta>")].decode("utf-8", "ignore") |
|
|
|
|
| def parse_telemetry(path: str | Path) -> Telemetry: |
| """Parse flight telemetry from an image's EXIF + XMP.""" |
| path = Path(path) |
| tel = Telemetry() |
| got = [] |
|
|
| |
| xmp = _read_xmp(path) |
| if xmp: |
| tel.altitude_m = _xmp_float(xmp, "drone-dji:RelativeAltitude", "RelativeAltitude") |
| tel.gimbal_pitch_deg = _xmp_float(xmp, "drone-dji:GimbalPitchDegree", "GimbalPitchDegree") |
| tel.gimbal_yaw_deg = _xmp_float(xmp, "drone-dji:GimbalYawDegree", "GimbalYawDegree") |
| tel.gimbal_roll_deg = _xmp_float(xmp, "drone-dji:GimbalRollDegree", "GimbalRollDegree") |
| lat = _xmp_float(xmp, "drone-dji:GpsLatitude", "GpsLatitude") |
| lon = _xmp_float(xmp, "drone-dji:GpsLongitude", "GpsLongitude", "GpsLongtitude") |
| if lat is not None: |
| tel.gps_lat, tel.gps_lon = lat, lon |
| if any(v is not None for v in (tel.altitude_m, tel.gimbal_pitch_deg)): |
| got.append("xmp") |
|
|
| |
| try: |
| from PIL import Image |
| with Image.open(path) as im: |
| tel.image_width, tel.image_height = im.size |
| exif = im.getexif() |
| |
| |
| exif_ifd = exif.get_ifd(0x8769) |
| f35 = exif.get(0xA405) or exif_ifd.get(0xA405) |
| if f35: |
| tel.focal_35mm = float(f35) |
| tel.hfov_deg = hfov_from_focal35(float(f35)) |
| if tel.gps_lat is None: |
| gps = exif.get_ifd(0x8825) |
| if gps: |
| tel.gps_lat = _gps_to_deg(gps.get(2), gps.get(1)) |
| tel.gps_lon = _gps_to_deg(gps.get(4), gps.get(3)) |
| if tel.altitude_m is None and gps.get(6) is not None: |
| tel.altitude_m = float(gps.get(6)) |
| if f35: |
| got.append("exif") |
| except Exception: |
| pass |
|
|
| tel.source = "+".join(dict.fromkeys(got)) or "none" |
| return tel |
|
|
|
|
| def _gps_to_deg(dms, ref) -> float | None: |
| """EXIF GPS (degrees, minutes, seconds) + hemisphere ref → signed decimal.""" |
| if not dms: |
| return None |
| try: |
| d, m, s = (float(x) for x in dms) |
| val = d + m / 60.0 + s / 3600.0 |
| if ref in ("S", "W", b"S", b"W"): |
| val = -val |
| return val |
| except Exception: |
| return None |
|
|
|
|
| def camera_from_image(path: str | Path) -> tuple[CameraModel | None, Telemetry]: |
| """Convenience: telemetry + a ready CameraModel (or None if insufficient).""" |
| tel = parse_telemetry(path) |
| return tel.to_camera(), tel |
|
|