"""Read flight telemetry from drone/survey imagery → a CameraModel. This is the piece that makes georeferencing (and therefore the population estimate) REAL instead of illustrative: instead of typing in an altitude and angle, we read the actual values the aircraft recorded. Drone imagery carries this metadata already: * EXIF — GPS, focal length (35mm-equiv), image dimensions. * XMP — DJI and most survey drones write rich flight tags here: RelativeAltitude (AGL!), GimbalPitch/Yaw/RollDegree, GPS, etc. Crucially, the **gimbal pitch decides nadir vs oblique automatically** — DJI reports -90° looking straight down (nadir) and 0° at the horizon. We convert that to our camera convention (0° = nadir) so the same image both georeferences correctly AND tells the app which viewpoint (and thus which model) to use — no manual toggle. That's the hard oblique case handled at the source. Only Pillow is required. Missing fields are reported, not guessed. """ from __future__ import annotations import math import re from dataclasses import dataclass from pathlib import Path from .camera import CameraModel # Full-frame reference width for converting 35mm-equivalent focal length to FOV. _FF_SENSOR_MM = 36.0 # Within this many degrees of straight-down we call it nadir. _NADIR_TOL_DEG = 12.0 @dataclass class Telemetry: gps_lat: float | None = None gps_lon: float | None = None altitude_m: float | None = None # above ground (AGL) when available gimbal_pitch_deg: float | None = None # DJI convention (-90 = nadir) gimbal_yaw_deg: float | None = None gimbal_roll_deg: float | None = None focal_35mm: float | None = None hfov_deg: float | None = None image_width: int | None = None image_height: int | None = None source: str = "none" # "xmp", "exif", "xmp+exif", "none" @property def cam_pitch_deg(self) -> float | None: """Gimbal pitch in OUR convention (0 = nadir, 90 = horizontal).""" if self.gimbal_pitch_deg is None: return None return 90.0 + self.gimbal_pitch_deg # DJI -90 -> 0 (nadir); -45 -> 45 @property def viewpoint(self) -> str | None: """'nadir' or 'oblique' from the gimbal pitch — drives model routing.""" p = self.cam_pitch_deg if p is None: return None return "nadir" if abs(p) <= _NADIR_TOL_DEG else "oblique" def missing(self) -> list[str]: need = {"altitude_m": self.altitude_m, "hfov_deg": self.hfov_deg, "gimbal_pitch_deg": self.gimbal_pitch_deg} return [k for k, v in need.items() if v is None] def to_camera(self) -> CameraModel | None: """Build a CameraModel if the geometry essentials are present.""" if self.altitude_m is None or self.hfov_deg is None or not self.image_width: return None return CameraModel( altitude=self.altitude_m, image_width=self.image_width, image_height=self.image_height or self.image_width, hfov_deg=self.hfov_deg, pitch_deg=self.cam_pitch_deg if self.cam_pitch_deg is not None else 0.0, yaw_deg=self.gimbal_yaw_deg or 0.0, roll_deg=self.gimbal_roll_deg or 0.0, ) def hfov_from_focal35(f35mm: float) -> float: """Horizontal field of view (deg) from 35mm-equivalent focal length.""" return math.degrees(2.0 * math.atan(_FF_SENSOR_MM / (2.0 * f35mm))) def _xmp_float(blob: str, *tags: str) -> float | None: """First matching DJI/XMP tag value, as attribute or element.""" for tag in tags: m = re.search(rf'{tag}\s*=\s*"([+-]?[\d.]+)"', blob) or \ re.search(rf"<{tag}>\s*([+-]?[\d.]+)\s*", blob) if m: try: return float(m.group(1)) except ValueError: pass return None def _read_xmp(path: Path) -> str | None: """Extract the XMP packet from an image's raw bytes (no XML lib needed).""" raw = path.read_bytes() start = raw.find(b"") if start == -1 or end == -1: return None return raw[start:end + len(b"")].decode("utf-8", "ignore") def parse_telemetry(path: str | Path) -> Telemetry: """Parse flight telemetry from an image's EXIF + XMP.""" path = Path(path) tel = Telemetry() got = [] # --- XMP (DJI / survey-drone flight tags) --- xmp = _read_xmp(path) if xmp: tel.altitude_m = _xmp_float(xmp, "drone-dji:RelativeAltitude", "RelativeAltitude") tel.gimbal_pitch_deg = _xmp_float(xmp, "drone-dji:GimbalPitchDegree", "GimbalPitchDegree") tel.gimbal_yaw_deg = _xmp_float(xmp, "drone-dji:GimbalYawDegree", "GimbalYawDegree") tel.gimbal_roll_deg = _xmp_float(xmp, "drone-dji:GimbalRollDegree", "GimbalRollDegree") lat = _xmp_float(xmp, "drone-dji:GpsLatitude", "GpsLatitude") lon = _xmp_float(xmp, "drone-dji:GpsLongitude", "GpsLongitude", "GpsLongtitude") if lat is not None: tel.gps_lat, tel.gps_lon = lat, lon if any(v is not None for v in (tel.altitude_m, tel.gimbal_pitch_deg)): got.append("xmp") # --- EXIF (focal length, dimensions, GPS fallback) --- try: from PIL import Image with Image.open(path) as im: tel.image_width, tel.image_height = im.size exif = im.getexif() # FocalLengthIn35mmFilm usually lives in the Exif sub-IFD (0x8769), # not the top level — real DJI images store it there. exif_ifd = exif.get_ifd(0x8769) f35 = exif.get(0xA405) or exif_ifd.get(0xA405) if f35: tel.focal_35mm = float(f35) tel.hfov_deg = hfov_from_focal35(float(f35)) if tel.gps_lat is None: gps = exif.get_ifd(0x8825) if gps: tel.gps_lat = _gps_to_deg(gps.get(2), gps.get(1)) tel.gps_lon = _gps_to_deg(gps.get(4), gps.get(3)) if tel.altitude_m is None and gps.get(6) is not None: tel.altitude_m = float(gps.get(6)) # GPS altitude (ASL, not AGL) if f35: got.append("exif") except Exception: # noqa: BLE001 — no PIL / unreadable EXIF is a valid partial state pass tel.source = "+".join(dict.fromkeys(got)) or "none" return tel def _gps_to_deg(dms, ref) -> float | None: """EXIF GPS (degrees, minutes, seconds) + hemisphere ref → signed decimal.""" if not dms: return None try: d, m, s = (float(x) for x in dms) val = d + m / 60.0 + s / 3600.0 if ref in ("S", "W", b"S", b"W"): val = -val return val except Exception: # noqa: BLE001 return None def camera_from_image(path: str | Path) -> tuple[CameraModel | None, Telemetry]: """Convenience: telemetry + a ready CameraModel (or None if insufficient).""" tel = parse_telemetry(path) return tel.to_camera(), tel