# Generated by Claude Code -- 2026-02-13 """Detect satellite maneuvers from TLE data changes. Compares successive TLEs for the same satellite. An abrupt change in semi-major axis (> threshold) indicates a maneuver — either collision avoidance, orbit maintenance, or orbit raising. Based on Kelecy (2007) and Patera & Peterson (2021). """ import json import math import numpy as np from pathlib import Path from datetime import datetime, timedelta, timezone # Earth parameters (WGS84) MU_EARTH = 398600.4418 # km^3/s^2 EARTH_RADIUS_KM = 6378.137 # Maneuver detection thresholds DEFAULT_DELTA_A_THRESHOLD_M = 200 # meters — below this is noise STARLINK_DELTA_A_THRESHOLD_M = 100 # Starlink maneuvers can be smaller def mean_motion_to_sma(n_rev_per_day: float) -> float: """Convert mean motion (rev/day) to semi-major axis (km).""" if n_rev_per_day <= 0: return 0.0 n_rad_per_sec = n_rev_per_day * 2 * math.pi / 86400.0 return (MU_EARTH / (n_rad_per_sec ** 2)) ** (1.0 / 3.0) def sma_to_altitude(sma_km: float) -> float: """Convert semi-major axis to approximate altitude (km).""" return sma_km - EARTH_RADIUS_KM def parse_tle_epoch(epoch_str: str) -> datetime: """Parse a CelesTrak JSON epoch string (ISO 8601 format).""" # CelesTrak uses: "2026-02-13T12:00:00.000000" for fmt in ("%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d"): try: return datetime.strptime(epoch_str, fmt) except ValueError: continue raise ValueError(f"Cannot parse epoch: {epoch_str}") def extract_orbital_elements(tle_json: dict) -> dict: """Extract key orbital elements from a CelesTrak JSON TLE entry.""" norad_id = int(tle_json.get("NORAD_CAT_ID", 0)) name = tle_json.get("OBJECT_NAME", "UNKNOWN") mean_motion = float(tle_json.get("MEAN_MOTION", 0)) eccentricity = float(tle_json.get("ECCENTRICITY", 0)) inclination = float(tle_json.get("INCLINATION", 0)) raan = float(tle_json.get("RA_OF_ASC_NODE", 0)) epoch_str = tle_json.get("EPOCH", "") sma = mean_motion_to_sma(mean_motion) altitude = sma_to_altitude(sma) epoch = None if epoch_str: try: epoch = parse_tle_epoch(epoch_str) except ValueError: pass return { "norad_id": norad_id, "name": name, "mean_motion": mean_motion, "eccentricity": eccentricity, "inclination": inclination, "raan": raan, "sma_km": sma, "altitude_km": altitude, "epoch": epoch, "epoch_str": epoch_str, } def detect_maneuvers( prev_tles: list[dict], curr_tles: list[dict], threshold_m: float = DEFAULT_DELTA_A_THRESHOLD_M, ) -> list[dict]: """Compare two TLE snapshots and detect maneuvers. Args: prev_tles: Previous TLE snapshot (CelesTrak JSON format) curr_tles: Current TLE snapshot (CelesTrak JSON format) threshold_m: Semi-major axis change threshold in meters Returns: List of detected maneuvers with details """ # Index previous TLEs by NORAD ID prev_by_id = {} for tle in prev_tles: elem = extract_orbital_elements(tle) if elem["norad_id"] > 0 and elem["sma_km"] > 0: prev_by_id[elem["norad_id"]] = elem maneuvers = [] for tle in curr_tles: elem = extract_orbital_elements(tle) norad_id = elem["norad_id"] if norad_id not in prev_by_id or elem["sma_km"] <= 0: continue prev = prev_by_id[norad_id] delta_a_km = elem["sma_km"] - prev["sma_km"] delta_a_m = abs(delta_a_km) * 1000 if delta_a_m > threshold_m: # Classify maneuver type if delta_a_km > 0: maneuver_type = "orbit_raise" else: maneuver_type = "orbit_lower" # Estimate delta-v (Hohmann approximation) v_circular = math.sqrt(MU_EARTH / prev["sma_km"]) # km/s delta_v = abs(delta_a_km) / (2 * prev["sma_km"]) * v_circular * 1000 # m/s maneuvers.append({ "norad_id": norad_id, "name": elem["name"], "prev_sma_km": prev["sma_km"], "curr_sma_km": elem["sma_km"], "delta_a_m": delta_a_m, "delta_a_km": delta_a_km, "delta_v_m_s": round(delta_v, 3), "maneuver_type": maneuver_type, "altitude_km": elem["altitude_km"], "prev_epoch": prev["epoch_str"], "curr_epoch": elem["epoch_str"], "detected_at": datetime.now(timezone.utc).isoformat(), }) # Sort by delta_a descending (largest maneuvers first) maneuvers.sort(key=lambda m: m["delta_a_m"], reverse=True) return maneuvers def detect_maneuvers_dual_threshold( prev_tles: list[dict], curr_tles: list[dict], ) -> list[dict]: """Detect maneuvers using constellation-aware thresholds. Uses 100m threshold for Starlink (smaller maneuvers) and 200m for everything else. Merges results, deduplicating by NORAD ID. """ # Split current TLEs by constellation starlink_curr = [] other_curr = [] for tle in curr_tles: name = tle.get("OBJECT_NAME", "") if "STARLINK" in name.upper(): starlink_curr.append(tle) else: other_curr.append(tle) # Split previous TLEs the same way starlink_prev = [] other_prev = [] for tle in prev_tles: name = tle.get("OBJECT_NAME", "") if "STARLINK" in name.upper(): starlink_prev.append(tle) else: other_prev.append(tle) # Detect with appropriate thresholds starlink_maneuvers = detect_maneuvers( starlink_prev, starlink_curr, threshold_m=STARLINK_DELTA_A_THRESHOLD_M, ) other_maneuvers = detect_maneuvers( other_prev, other_curr, threshold_m=DEFAULT_DELTA_A_THRESHOLD_M, ) # Merge and sort by delta_a descending all_maneuvers = starlink_maneuvers + other_maneuvers all_maneuvers.sort(key=lambda m: m["delta_a_m"], reverse=True) return all_maneuvers def load_tle_snapshot(path: Path) -> list[dict]: """Load a TLE snapshot from a JSON file.""" with open(path) as f: return json.load(f) def save_tle_snapshot(tles: list[dict], path: Path): """Save a TLE snapshot to a JSON file.""" path.parent.mkdir(parents=True, exist_ok=True) with open(path, "w") as f: json.dump(tles, f)