panacea-api / src /data /maneuver_detector.py
DTanzillo's picture
Upload folder using huggingface_hub
a4b5ecb verified
# Generated by Claude Code -- 2026-02-13
"""Detect satellite maneuvers from TLE data changes.
Compares successive TLEs for the same satellite. An abrupt change in
semi-major axis (> threshold) indicates a maneuver — either collision
avoidance, orbit maintenance, or orbit raising.
Based on Kelecy (2007) and Patera & Peterson (2021).
"""
import json
import math
import numpy as np
from pathlib import Path
from datetime import datetime, timedelta, timezone
# Earth parameters (WGS84)
MU_EARTH = 398600.4418 # km^3/s^2
EARTH_RADIUS_KM = 6378.137
# Maneuver detection thresholds
DEFAULT_DELTA_A_THRESHOLD_M = 200 # meters — below this is noise
STARLINK_DELTA_A_THRESHOLD_M = 100 # Starlink maneuvers can be smaller
def mean_motion_to_sma(n_rev_per_day: float) -> float:
"""Convert mean motion (rev/day) to semi-major axis (km)."""
if n_rev_per_day <= 0:
return 0.0
n_rad_per_sec = n_rev_per_day * 2 * math.pi / 86400.0
return (MU_EARTH / (n_rad_per_sec ** 2)) ** (1.0 / 3.0)
def sma_to_altitude(sma_km: float) -> float:
"""Convert semi-major axis to approximate altitude (km)."""
return sma_km - EARTH_RADIUS_KM
def parse_tle_epoch(epoch_str: str) -> datetime:
"""Parse a CelesTrak JSON epoch string (ISO 8601 format)."""
# CelesTrak uses: "2026-02-13T12:00:00.000000"
for fmt in ("%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d"):
try:
return datetime.strptime(epoch_str, fmt)
except ValueError:
continue
raise ValueError(f"Cannot parse epoch: {epoch_str}")
def extract_orbital_elements(tle_json: dict) -> dict:
"""Extract key orbital elements from a CelesTrak JSON TLE entry."""
norad_id = int(tle_json.get("NORAD_CAT_ID", 0))
name = tle_json.get("OBJECT_NAME", "UNKNOWN")
mean_motion = float(tle_json.get("MEAN_MOTION", 0))
eccentricity = float(tle_json.get("ECCENTRICITY", 0))
inclination = float(tle_json.get("INCLINATION", 0))
raan = float(tle_json.get("RA_OF_ASC_NODE", 0))
epoch_str = tle_json.get("EPOCH", "")
sma = mean_motion_to_sma(mean_motion)
altitude = sma_to_altitude(sma)
epoch = None
if epoch_str:
try:
epoch = parse_tle_epoch(epoch_str)
except ValueError:
pass
return {
"norad_id": norad_id,
"name": name,
"mean_motion": mean_motion,
"eccentricity": eccentricity,
"inclination": inclination,
"raan": raan,
"sma_km": sma,
"altitude_km": altitude,
"epoch": epoch,
"epoch_str": epoch_str,
}
def detect_maneuvers(
prev_tles: list[dict],
curr_tles: list[dict],
threshold_m: float = DEFAULT_DELTA_A_THRESHOLD_M,
) -> list[dict]:
"""Compare two TLE snapshots and detect maneuvers.
Args:
prev_tles: Previous TLE snapshot (CelesTrak JSON format)
curr_tles: Current TLE snapshot (CelesTrak JSON format)
threshold_m: Semi-major axis change threshold in meters
Returns:
List of detected maneuvers with details
"""
# Index previous TLEs by NORAD ID
prev_by_id = {}
for tle in prev_tles:
elem = extract_orbital_elements(tle)
if elem["norad_id"] > 0 and elem["sma_km"] > 0:
prev_by_id[elem["norad_id"]] = elem
maneuvers = []
for tle in curr_tles:
elem = extract_orbital_elements(tle)
norad_id = elem["norad_id"]
if norad_id not in prev_by_id or elem["sma_km"] <= 0:
continue
prev = prev_by_id[norad_id]
delta_a_km = elem["sma_km"] - prev["sma_km"]
delta_a_m = abs(delta_a_km) * 1000
if delta_a_m > threshold_m:
# Classify maneuver type
if delta_a_km > 0:
maneuver_type = "orbit_raise"
else:
maneuver_type = "orbit_lower"
# Estimate delta-v (Hohmann approximation)
v_circular = math.sqrt(MU_EARTH / prev["sma_km"]) # km/s
delta_v = abs(delta_a_km) / (2 * prev["sma_km"]) * v_circular * 1000 # m/s
maneuvers.append({
"norad_id": norad_id,
"name": elem["name"],
"prev_sma_km": prev["sma_km"],
"curr_sma_km": elem["sma_km"],
"delta_a_m": delta_a_m,
"delta_a_km": delta_a_km,
"delta_v_m_s": round(delta_v, 3),
"maneuver_type": maneuver_type,
"altitude_km": elem["altitude_km"],
"prev_epoch": prev["epoch_str"],
"curr_epoch": elem["epoch_str"],
"detected_at": datetime.now(timezone.utc).isoformat(),
})
# Sort by delta_a descending (largest maneuvers first)
maneuvers.sort(key=lambda m: m["delta_a_m"], reverse=True)
return maneuvers
def detect_maneuvers_dual_threshold(
prev_tles: list[dict],
curr_tles: list[dict],
) -> list[dict]:
"""Detect maneuvers using constellation-aware thresholds.
Uses 100m threshold for Starlink (smaller maneuvers) and
200m for everything else. Merges results, deduplicating by NORAD ID.
"""
# Split current TLEs by constellation
starlink_curr = []
other_curr = []
for tle in curr_tles:
name = tle.get("OBJECT_NAME", "")
if "STARLINK" in name.upper():
starlink_curr.append(tle)
else:
other_curr.append(tle)
# Split previous TLEs the same way
starlink_prev = []
other_prev = []
for tle in prev_tles:
name = tle.get("OBJECT_NAME", "")
if "STARLINK" in name.upper():
starlink_prev.append(tle)
else:
other_prev.append(tle)
# Detect with appropriate thresholds
starlink_maneuvers = detect_maneuvers(
starlink_prev, starlink_curr,
threshold_m=STARLINK_DELTA_A_THRESHOLD_M,
)
other_maneuvers = detect_maneuvers(
other_prev, other_curr,
threshold_m=DEFAULT_DELTA_A_THRESHOLD_M,
)
# Merge and sort by delta_a descending
all_maneuvers = starlink_maneuvers + other_maneuvers
all_maneuvers.sort(key=lambda m: m["delta_a_m"], reverse=True)
return all_maneuvers
def load_tle_snapshot(path: Path) -> list[dict]:
"""Load a TLE snapshot from a JSON file."""
with open(path) as f:
return json.load(f)
def save_tle_snapshot(tles: list[dict], path: Path):
"""Save a TLE snapshot to a JSON file."""
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w") as f:
json.dump(tles, f)