Tadiwa-M
Deploy: auto-derive dedup radius (drop hardcoded 3m bypass)
58aefd4
Raw
History Blame Contribute Delete
7.12 kB
"""Read flight telemetry from drone/survey imagery → a CameraModel.
This is the piece that makes georeferencing (and therefore the population
estimate) REAL instead of illustrative: instead of typing in an altitude and
angle, we read the actual values the aircraft recorded.
Drone imagery carries this metadata already:
* EXIF — GPS, focal length (35mm-equiv), image dimensions.
* XMP — DJI and most survey drones write rich flight tags here:
RelativeAltitude (AGL!), GimbalPitch/Yaw/RollDegree, GPS, etc.
Crucially, the **gimbal pitch decides nadir vs oblique automatically** — DJI
reports -90° looking straight down (nadir) and 0° at the horizon. We convert
that to our camera convention (0° = nadir) so the same image both georeferences
correctly AND tells the app which viewpoint (and thus which model) to use — no
manual toggle. That's the hard oblique case handled at the source.
Only Pillow is required. Missing fields are reported, not guessed.
"""
from __future__ import annotations
import math
import re
from dataclasses import dataclass
from pathlib import Path
from .camera import CameraModel
# Full-frame reference width for converting 35mm-equivalent focal length to FOV.
_FF_SENSOR_MM = 36.0
# Within this many degrees of straight-down we call it nadir.
_NADIR_TOL_DEG = 12.0
@dataclass
class Telemetry:
gps_lat: float | None = None
gps_lon: float | None = None
altitude_m: float | None = None # above ground (AGL) when available
gimbal_pitch_deg: float | None = None # DJI convention (-90 = nadir)
gimbal_yaw_deg: float | None = None
gimbal_roll_deg: float | None = None
focal_35mm: float | None = None
hfov_deg: float | None = None
image_width: int | None = None
image_height: int | None = None
source: str = "none" # "xmp", "exif", "xmp+exif", "none"
@property
def cam_pitch_deg(self) -> float | None:
"""Gimbal pitch in OUR convention (0 = nadir, 90 = horizontal)."""
if self.gimbal_pitch_deg is None:
return None
return 90.0 + self.gimbal_pitch_deg # DJI -90 -> 0 (nadir); -45 -> 45
@property
def viewpoint(self) -> str | None:
"""'nadir' or 'oblique' from the gimbal pitch — drives model routing."""
p = self.cam_pitch_deg
if p is None:
return None
return "nadir" if abs(p) <= _NADIR_TOL_DEG else "oblique"
def missing(self) -> list[str]:
need = {"altitude_m": self.altitude_m, "hfov_deg": self.hfov_deg,
"gimbal_pitch_deg": self.gimbal_pitch_deg}
return [k for k, v in need.items() if v is None]
def to_camera(self) -> CameraModel | None:
"""Build a CameraModel if the geometry essentials are present."""
if self.altitude_m is None or self.hfov_deg is None or not self.image_width:
return None
return CameraModel(
altitude=self.altitude_m,
image_width=self.image_width,
image_height=self.image_height or self.image_width,
hfov_deg=self.hfov_deg,
pitch_deg=self.cam_pitch_deg if self.cam_pitch_deg is not None else 0.0,
yaw_deg=self.gimbal_yaw_deg or 0.0,
roll_deg=self.gimbal_roll_deg or 0.0,
)
def hfov_from_focal35(f35mm: float) -> float:
"""Horizontal field of view (deg) from 35mm-equivalent focal length."""
return math.degrees(2.0 * math.atan(_FF_SENSOR_MM / (2.0 * f35mm)))
def _xmp_float(blob: str, *tags: str) -> float | None:
"""First matching DJI/XMP tag value, as attribute or element."""
for tag in tags:
m = re.search(rf'{tag}\s*=\s*"([+-]?[\d.]+)"', blob) or \
re.search(rf"<{tag}>\s*([+-]?[\d.]+)\s*</{tag}>", blob)
if m:
try:
return float(m.group(1))
except ValueError:
pass
return None
def _read_xmp(path: Path) -> str | None:
"""Extract the XMP packet from an image's raw bytes (no XML lib needed)."""
raw = path.read_bytes()
start = raw.find(b"<x:xmpmeta")
end = raw.find(b"</x:xmpmeta>")
if start == -1 or end == -1:
return None
return raw[start:end + len(b"</x:xmpmeta>")].decode("utf-8", "ignore")
def parse_telemetry(path: str | Path) -> Telemetry:
"""Parse flight telemetry from an image's EXIF + XMP."""
path = Path(path)
tel = Telemetry()
got = []
# --- XMP (DJI / survey-drone flight tags) ---
xmp = _read_xmp(path)
if xmp:
tel.altitude_m = _xmp_float(xmp, "drone-dji:RelativeAltitude", "RelativeAltitude")
tel.gimbal_pitch_deg = _xmp_float(xmp, "drone-dji:GimbalPitchDegree", "GimbalPitchDegree")
tel.gimbal_yaw_deg = _xmp_float(xmp, "drone-dji:GimbalYawDegree", "GimbalYawDegree")
tel.gimbal_roll_deg = _xmp_float(xmp, "drone-dji:GimbalRollDegree", "GimbalRollDegree")
lat = _xmp_float(xmp, "drone-dji:GpsLatitude", "GpsLatitude")
lon = _xmp_float(xmp, "drone-dji:GpsLongitude", "GpsLongitude", "GpsLongtitude")
if lat is not None:
tel.gps_lat, tel.gps_lon = lat, lon
if any(v is not None for v in (tel.altitude_m, tel.gimbal_pitch_deg)):
got.append("xmp")
# --- EXIF (focal length, dimensions, GPS fallback) ---
try:
from PIL import Image
with Image.open(path) as im:
tel.image_width, tel.image_height = im.size
exif = im.getexif()
# FocalLengthIn35mmFilm usually lives in the Exif sub-IFD (0x8769),
# not the top level — real DJI images store it there.
exif_ifd = exif.get_ifd(0x8769)
f35 = exif.get(0xA405) or exif_ifd.get(0xA405)
if f35:
tel.focal_35mm = float(f35)
tel.hfov_deg = hfov_from_focal35(float(f35))
if tel.gps_lat is None:
gps = exif.get_ifd(0x8825)
if gps:
tel.gps_lat = _gps_to_deg(gps.get(2), gps.get(1))
tel.gps_lon = _gps_to_deg(gps.get(4), gps.get(3))
if tel.altitude_m is None and gps.get(6) is not None:
tel.altitude_m = float(gps.get(6)) # GPS altitude (ASL, not AGL)
if f35:
got.append("exif")
except Exception: # noqa: BLE001 — no PIL / unreadable EXIF is a valid partial state
pass
tel.source = "+".join(dict.fromkeys(got)) or "none"
return tel
def _gps_to_deg(dms, ref) -> float | None:
"""EXIF GPS (degrees, minutes, seconds) + hemisphere ref → signed decimal."""
if not dms:
return None
try:
d, m, s = (float(x) for x in dms)
val = d + m / 60.0 + s / 3600.0
if ref in ("S", "W", b"S", b"W"):
val = -val
return val
except Exception: # noqa: BLE001
return None
def camera_from_image(path: str | Path) -> tuple[CameraModel | None, Telemetry]:
"""Convenience: telemetry + a ready CameraModel (or None if insufficient)."""
tel = parse_telemetry(path)
return tel.to_camera(), tel