File size: 6,784 Bytes
a4b5ecb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# Generated by Claude Code -- 2026-02-13
"""Detect satellite maneuvers from TLE data changes.



Compares successive TLEs for the same satellite. An abrupt change in

semi-major axis (> threshold) indicates a maneuver — either collision

avoidance, orbit maintenance, or orbit raising.



Based on Kelecy (2007) and Patera & Peterson (2021).

"""

import json
import math
import numpy as np
from pathlib import Path
from datetime import datetime, timedelta, timezone


# Earth parameters (WGS84)
MU_EARTH = 398600.4418  # gravitational parameter, km^3/s^2
EARTH_RADIUS_KM = 6378.137  # radius subtracted from the semi-major axis to get altitude, km

# Maneuver detection thresholds: minimum |change in semi-major axis|, in
# meters, between two snapshots for the change to be reported.
DEFAULT_DELTA_A_THRESHOLD_M = 200  # meters — below this is noise
STARLINK_DELTA_A_THRESHOLD_M = 100  # Starlink maneuvers can be smaller


def mean_motion_to_sma(n_rev_per_day: float) -> float:
    """Return the semi-major axis (km) for a mean motion given in rev/day.

    Uses Kepler's third law, a = (mu / n^2)^(1/3), with n expressed in
    rad/s. A zero or negative mean motion yields 0.0 instead of raising.
    """
    if n_rev_per_day <= 0:
        return 0.0

    seconds_per_day = 86400.0
    # rev/day -> rad/s
    omega = n_rev_per_day * 2 * math.pi / seconds_per_day
    mu_over_omega_sq = MU_EARTH / (omega ** 2)
    return mu_over_omega_sq ** (1.0 / 3.0)


def sma_to_altitude(sma_km: float) -> float:
    """Return the approximate altitude (km) for a semi-major axis in km.

    The altitude is the semi-major axis minus EARTH_RADIUS_KM.
    """
    height_above_surface_km = sma_km - EARTH_RADIUS_KM
    return height_above_surface_km


def parse_tle_epoch(epoch_str: str) -> datetime:
    """Parse a CelesTrak JSON epoch string (ISO 8601 format).

    Accepted layouts, tried in this order:
      * "2026-02-13T12:00:00.000000" (fractional seconds)
      * "2026-02-13T12:00:00"
      * "2026-02-13"

    Returns:
        A naive datetime (no tzinfo attached).

    Raises:
        ValueError: If none of the layouts match.
    """
    accepted_layouts = (
        "%Y-%m-%dT%H:%M:%S.%f",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%d",
    )
    for layout in accepted_layouts:
        try:
            parsed = datetime.strptime(epoch_str, layout)
        except ValueError:
            # Not this layout; fall through to the next candidate.
            pass
        else:
            return parsed
    raise ValueError(f"Cannot parse epoch: {epoch_str}")


def extract_orbital_elements(tle_json: dict) -> dict:
    """Extract key orbital elements from a CelesTrak JSON TLE entry.

    Missing numeric fields default to 0 and a missing OBJECT_NAME to
    "UNKNOWN". Semi-major axis and altitude are derived from the mean
    motion.

    Returns:
        Dict with keys norad_id, name, mean_motion, eccentricity,
        inclination, raan, sma_km, altitude_km, epoch and epoch_str.
        "epoch" is a datetime, or None when EPOCH is absent/empty or
        cannot be parsed; "epoch_str" is always the raw EPOCH value.
    """
    elements = {
        "norad_id": int(tle_json.get("NORAD_CAT_ID", 0)),
        "name": tle_json.get("OBJECT_NAME", "UNKNOWN"),
        "mean_motion": float(tle_json.get("MEAN_MOTION", 0)),
        "eccentricity": float(tle_json.get("ECCENTRICITY", 0)),
        "inclination": float(tle_json.get("INCLINATION", 0)),
        "raan": float(tle_json.get("RA_OF_ASC_NODE", 0)),
    }

    # Derived quantities follow from the mean motion alone.
    semi_major_axis = mean_motion_to_sma(elements["mean_motion"])
    elements["sma_km"] = semi_major_axis
    elements["altitude_km"] = sma_to_altitude(semi_major_axis)

    raw_epoch = tle_json.get("EPOCH", "")
    parsed_epoch = None
    if raw_epoch:
        try:
            parsed_epoch = parse_tle_epoch(raw_epoch)
        except ValueError:
            # Best effort: an unparseable epoch is reported as None.
            parsed_epoch = None
    elements["epoch"] = parsed_epoch
    elements["epoch_str"] = raw_epoch

    return elements


def detect_maneuvers(
    prev_tles: list[dict],
    curr_tles: list[dict],
    threshold_m: float = DEFAULT_DELTA_A_THRESHOLD_M,
) -> list[dict]:
    """Compare two TLE snapshots and report semi-major-axis jumps.

    Each current entry is matched to the previous entry with the same
    NORAD ID. When the absolute change in semi-major axis exceeds
    ``threshold_m`` a maneuver record is produced.

    Args:
        prev_tles: Previous TLE snapshot (CelesTrak JSON format)
        curr_tles: Current TLE snapshot (CelesTrak JSON format)
        threshold_m: Semi-major axis change threshold in meters

    Returns:
        List of maneuver dicts, largest ``delta_a_m`` first. Each holds
        the NORAD ID and name, previous/current semi-major axis,
        ``delta_a_m`` (absolute, meters), ``delta_a_km`` (signed, km),
        an estimated ``delta_v_m_s``, ``maneuver_type`` ("orbit_raise"
        or "orbit_lower"), current altitude, both epoch strings and a
        UTC ``detected_at`` timestamp.
    """
    # Baseline lookup: NORAD ID -> elements from the previous snapshot.
    # Entries without a positive ID or positive semi-major axis are unusable.
    baseline = {}
    for record in prev_tles:
        parsed = extract_orbital_elements(record)
        if not (parsed["norad_id"] > 0 and parsed["sma_km"] > 0):
            continue
        baseline[parsed["norad_id"]] = parsed

    found = []
    for record in curr_tles:
        current = extract_orbital_elements(record)
        sat_id = current["norad_id"]

        previous = baseline.get(sat_id)
        if previous is None or current["sma_km"] <= 0:
            continue

        shift_km = current["sma_km"] - previous["sma_km"]
        shift_m = abs(shift_km) * 1000
        if not shift_m > threshold_m:
            continue

        # Sign of the change decides the classification.
        kind = "orbit_raise" if shift_km > 0 else "orbit_lower"

        # First-order delta-v estimate from the circular speed at the
        # previous semi-major axis: dv ~= v * |da| / (2 a).
        circular_speed = math.sqrt(MU_EARTH / previous["sma_km"])  # km/s
        dv_m_s = abs(shift_km) / (2 * previous["sma_km"]) * circular_speed * 1000

        found.append({
            "norad_id": sat_id,
            "name": current["name"],
            "prev_sma_km": previous["sma_km"],
            "curr_sma_km": current["sma_km"],
            "delta_a_m": shift_m,
            "delta_a_km": shift_km,
            "delta_v_m_s": round(dv_m_s, 3),
            "maneuver_type": kind,
            "altitude_km": current["altitude_km"],
            "prev_epoch": previous["epoch_str"],
            "curr_epoch": current["epoch_str"],
            "detected_at": datetime.now(timezone.utc).isoformat(),
        })

    # Largest maneuvers first.
    return sorted(found, key=lambda entry: entry["delta_a_m"], reverse=True)


def _is_starlink(tle: dict) -> bool:
    """Return True if the entry's OBJECT_NAME contains "STARLINK" (any case)."""
    # A missing or null OBJECT_NAME is treated as an empty name.
    return "STARLINK" in str(tle.get("OBJECT_NAME") or "").upper()


def detect_maneuvers_dual_threshold(
    prev_tles: list[dict],
    curr_tles: list[dict],
) -> list[dict]:
    """Detect maneuvers using constellation-aware thresholds.

    Current entries whose OBJECT_NAME contains "STARLINK" are checked
    against STARLINK_DELTA_A_THRESHOLD_M (smaller maneuvers); all other
    entries use DEFAULT_DELTA_A_THRESHOLD_M. The constellation is decided
    by the *current* name only, and each bucket is matched against the
    full previous snapshot by NORAD ID. An object that was renamed
    between snapshots is therefore still compared instead of being
    silently dropped.

    Args:
        prev_tles: Previous TLE snapshot (CelesTrak JSON format)
        curr_tles: Current TLE snapshot (CelesTrak JSON format)

    Returns:
        Combined list of detected maneuvers from both buckets, sorted by
        ``delta_a_m`` descending. Every current entry belongs to exactly
        one bucket, so the two result lists never overlap.
    """
    # The previous snapshot is consumed twice below; materialize it so a
    # one-shot iterable still works.
    prev_all = list(prev_tles)

    # Partition only the current snapshot, in a single pass.
    starlink_curr = []
    other_curr = []
    for tle in curr_tles:
        (starlink_curr if _is_starlink(tle) else other_curr).append(tle)

    # detect_maneuvers only looks up the NORAD IDs present in the current
    # bucket, so passing the whole previous snapshot is safe.
    starlink_maneuvers = detect_maneuvers(
        prev_all, starlink_curr,
        threshold_m=STARLINK_DELTA_A_THRESHOLD_M,
    )
    other_maneuvers = detect_maneuvers(
        prev_all, other_curr,
        threshold_m=DEFAULT_DELTA_A_THRESHOLD_M,
    )

    # Merge and sort by delta_a descending
    all_maneuvers = starlink_maneuvers + other_maneuvers
    all_maneuvers.sort(key=lambda m: m["delta_a_m"], reverse=True)
    return all_maneuvers


def load_tle_snapshot(path: Path) -> list[dict]:
    """Load a TLE snapshot from a JSON file.

    Args:
        path: Location of the JSON snapshot file.

    Returns:
        The decoded JSON content of the file.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # Read as UTF-8 explicitly; the locale default encoding can mis-decode
    # non-ASCII object names on some platforms.
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def save_tle_snapshot(tles: list[dict], path: Path) -> None:
    """Save a TLE snapshot to a JSON file.

    Missing parent directories are created. An existing file at ``path``
    is overwritten.

    Args:
        tles: Snapshot entries to serialize as JSON.
        path: Destination file; a plain string path is accepted as well
            as a ``Path``.
    """
    # Coerce so that string paths work here too, as they already do in
    # load_tle_snapshot.
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    with open(target, "w", encoding="utf-8") as f:
        json.dump(tles, f)