# Generated by Claude Code -- 2026-02-13
"""Detect satellite maneuvers from TLE data changes.
Compares successive TLEs for the same satellite. An abrupt change in
semi-major axis (> threshold) indicates a maneuver — either collision
avoidance, orbit maintenance, or orbit raising.
Based on Kelecy (2007) and Patera & Peterson (2021).
"""
import json
import math
import numpy as np
from pathlib import Path
from datetime import datetime, timedelta, timezone
# Earth parameters (WGS84)
MU_EARTH = 398600.4418  # gravitational parameter GM, km^3/s^2
EARTH_RADIUS_KM = 6378.137  # equatorial radius, km; subtracted from the semi-major axis to get altitude
# Maneuver detection thresholds: a semi-major-axis change must be strictly
# greater than the threshold (in meters) to be reported as a maneuver.
DEFAULT_DELTA_A_THRESHOLD_M = 200  # meters — below this is noise
STARLINK_DELTA_A_THRESHOLD_M = 100  # Starlink maneuvers can be smaller
def mean_motion_to_sma(n_rev_per_day: float) -> float:
    """Return the semi-major axis (km) for a mean motion given in rev/day.

    Applies Kepler's third law, a = (mu / n^2)^(1/3), after converting the
    mean motion to rad/s. A zero or negative mean motion yields 0.0 rather
    than raising, so callers can filter such records on ``sma <= 0``.
    """
    if n_rev_per_day <= 0:
        return 0.0

    seconds_per_day = 86400.0
    angular_rate = n_rev_per_day * 2 * math.pi / seconds_per_day  # rad/s
    mu_over_rate_sq = MU_EARTH / (angular_rate ** 2)
    return mu_over_rate_sq ** (1.0 / 3.0)
def sma_to_altitude(sma_km: float) -> float:
    """Return the approximate altitude (km) for a semi-major axis (km).

    The altitude is simply the semi-major axis minus ``EARTH_RADIUS_KM``;
    no input validation is performed.
    """
    altitude_km = sma_km - EARTH_RADIUS_KM
    return altitude_km
def parse_tle_epoch(epoch_str: str) -> datetime:
    """Parse a CelesTrak JSON epoch string (ISO 8601 format).

    Accepted layouts, tried in order:

    * ``YYYY-MM-DDTHH:MM:SS.ffffff`` (e.g. "2026-02-13T12:00:00.000000")
    * ``YYYY-MM-DDTHH:MM:SS``
    * ``YYYY-MM-DD``

    Surrounding whitespace and a trailing ``Z`` designator are tolerated.

    Args:
        epoch_str: Epoch text taken from a TLE JSON record.

    Returns:
        A naive ``datetime`` (no ``tzinfo`` attached), whichever layout
        matched.

    Raises:
        ValueError: If the text matches none of the accepted layouts.
    """
    text = epoch_str.strip()
    # Drop the "Z" designator so every accepted layout returns the same
    # naive datetime type instead of failing on the unparsed suffix.
    if text.endswith(("Z", "z")):
        text = text[:-1]

    for fmt in ("%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d"):
        try:
            return datetime.strptime(text, fmt)
        except ValueError:
            continue
    raise ValueError(f"Cannot parse epoch: {epoch_str}")
def extract_orbital_elements(tle_json: dict) -> dict:
    """Pull the key orbital elements out of one CelesTrak JSON TLE entry.

    Missing numeric fields default to 0 and a missing name to "UNKNOWN".
    The semi-major axis and altitude are derived from the mean motion.

    Args:
        tle_json: One TLE record keyed by the CelesTrak JSON field names
            (NORAD_CAT_ID, OBJECT_NAME, MEAN_MOTION, ECCENTRICITY,
            INCLINATION, RA_OF_ASC_NODE, EPOCH).

    Returns:
        A dict with keys ``norad_id``, ``name``, ``mean_motion``,
        ``eccentricity``, ``inclination``, ``raan``, ``sma_km``,
        ``altitude_km``, ``epoch`` and ``epoch_str``. ``epoch`` is the
        parsed datetime, or None when the epoch text is empty or cannot be
        parsed; ``epoch_str`` always carries the raw text.
    """
    catalog_number = int(tle_json.get("NORAD_CAT_ID", 0))
    object_name = tle_json.get("OBJECT_NAME", "UNKNOWN")
    revs_per_day = float(tle_json.get("MEAN_MOTION", 0))
    ecc = float(tle_json.get("ECCENTRICITY", 0))
    incl = float(tle_json.get("INCLINATION", 0))
    node = float(tle_json.get("RA_OF_ASC_NODE", 0))
    raw_epoch = tle_json.get("EPOCH", "")

    semi_major_axis = mean_motion_to_sma(revs_per_day)

    # Epoch parsing is best-effort: an unparseable value leaves `epoch`
    # as None while the raw string is still reported.
    parsed_epoch = None
    if raw_epoch:
        try:
            parsed_epoch = parse_tle_epoch(raw_epoch)
        except ValueError:
            parsed_epoch = None

    return dict(
        norad_id=catalog_number,
        name=object_name,
        mean_motion=revs_per_day,
        eccentricity=ecc,
        inclination=incl,
        raan=node,
        sma_km=semi_major_axis,
        altitude_km=sma_to_altitude(semi_major_axis),
        epoch=parsed_epoch,
        epoch_str=raw_epoch,
    )
def detect_maneuvers(
    prev_tles: list[dict],
    curr_tles: list[dict],
    threshold_m: float = DEFAULT_DELTA_A_THRESHOLD_M,
) -> list[dict]:
    """Find satellites whose semi-major axis jumped between two snapshots.

    Args:
        prev_tles: Earlier TLE snapshot (CelesTrak JSON format).
        curr_tles: Later TLE snapshot (CelesTrak JSON format).
        threshold_m: A change in semi-major axis must be strictly greater
            than this many meters to be reported.

    Returns:
        One dict per detected maneuver, largest change first. Each dict
        holds the NORAD ID and name, the previous and current semi-major
        axis, the signed (km) and absolute (m) change, an estimated
        delta-v in m/s, the maneuver type ("orbit_raise" or
        "orbit_lower"), the current altitude, both epoch strings and a
        UTC detection timestamp.
    """
    # Lookup of the earlier snapshot by NORAD ID; entries without a valid
    # ID or semi-major axis are left out. A later duplicate ID wins.
    baseline = {
        parsed["norad_id"]: parsed
        for parsed in map(extract_orbital_elements, prev_tles)
        if parsed["norad_id"] > 0 and parsed["sma_km"] > 0
    }

    found = []
    for current in map(extract_orbital_elements, curr_tles):
        sat_id = current["norad_id"]
        earlier = baseline.get(sat_id)
        if earlier is None or current["sma_km"] <= 0:
            continue

        shift_km = current["sma_km"] - earlier["sma_km"]
        shift_m = abs(shift_km) * 1000
        # Negated ">" (rather than "<=") so a NaN shift is skipped too.
        if not (shift_m > threshold_m):
            continue

        kind = "orbit_raise" if shift_km > 0 else "orbit_lower"

        # First-order delta-v estimate for a small change in semi-major
        # axis on a near-circular orbit: dv ~= v_circ * |da| / (2 * a).
        circular_speed = math.sqrt(MU_EARTH / earlier["sma_km"])  # km/s
        dv_m_s = abs(shift_km) / (2 * earlier["sma_km"]) * circular_speed * 1000

        found.append({
            "norad_id": sat_id,
            "name": current["name"],
            "prev_sma_km": earlier["sma_km"],
            "curr_sma_km": current["sma_km"],
            "delta_a_m": shift_m,
            "delta_a_km": shift_km,
            "delta_v_m_s": round(dv_m_s, 3),
            "maneuver_type": kind,
            "altitude_km": current["altitude_km"],
            "prev_epoch": earlier["epoch_str"],
            "curr_epoch": current["epoch_str"],
            "detected_at": datetime.now(timezone.utc).isoformat(),
        })

    # Largest semi-major-axis changes first.
    return sorted(found, key=lambda item: item["delta_a_m"], reverse=True)
def detect_maneuvers_dual_threshold(
    prev_tles: list[dict],
    curr_tles: list[dict],
) -> list[dict]:
    """Detect maneuvers using constellation-aware thresholds.

    Uses ``STARLINK_DELTA_A_THRESHOLD_M`` for satellites whose current
    OBJECT_NAME contains "STARLINK" (case-insensitive) and
    ``DEFAULT_DELTA_A_THRESHOLD_M`` for everything else.

    Only the current snapshot is split by name. Each bucket is compared
    against the complete previous snapshot, so a satellite is still
    matched by NORAD ID when its OBJECT_NAME differs between the two
    snapshots. Every current entry lands in exactly one bucket, so merging
    the two result lists cannot report the same entry twice.

    Args:
        prev_tles: Earlier TLE snapshot (CelesTrak JSON format).
        curr_tles: Later TLE snapshot (CelesTrak JSON format).

    Returns:
        Detected maneuvers from both buckets, sorted by absolute
        semi-major-axis change, largest first.
    """
    starlink_curr = []
    other_curr = []
    for tle in curr_tles:
        # `or ""` covers a record whose OBJECT_NAME is present but null.
        name = str(tle.get("OBJECT_NAME") or "")
        bucket = starlink_curr if "STARLINK" in name.upper() else other_curr
        bucket.append(tle)

    # detect_maneuvers indexes the previous snapshot by NORAD ID and only
    # reports satellites present in the current list it is given, so
    # passing the whole previous snapshot to both calls is safe.
    starlink_maneuvers = detect_maneuvers(
        prev_tles, starlink_curr,
        threshold_m=STARLINK_DELTA_A_THRESHOLD_M,
    )
    other_maneuvers = detect_maneuvers(
        prev_tles, other_curr,
        threshold_m=DEFAULT_DELTA_A_THRESHOLD_M,
    )

    # Merge and sort by delta_a descending
    all_maneuvers = starlink_maneuvers + other_maneuvers
    all_maneuvers.sort(key=lambda m: m["delta_a_m"], reverse=True)
    return all_maneuvers
def load_tle_snapshot(path: Path) -> list[dict]:
    """Load a TLE snapshot from a JSON file.

    Args:
        path: Location of the JSON file to read.

    Returns:
        The decoded JSON content of the file.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # Read explicitly as UTF-8 so the result does not depend on the
    # platform's default text encoding.
    with open(path, encoding="utf-8") as f:
        return json.load(f)
def save_tle_snapshot(tles: list[dict], path: Path) -> None:
    """Save a TLE snapshot to a JSON file.

    Missing parent directories are created first. An existing file at
    ``path`` is overwritten.

    Args:
        tles: TLE records to serialize as JSON.
        path: Destination file; a plain string path is accepted as well.
    """
    # Coerce so a str path works with `.parent` below.
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    # Write explicitly as UTF-8 so the file does not depend on the
    # platform's default text encoding.
    with open(path, "w", encoding="utf-8") as f:
        json.dump(tles, f)
|