Spaces:
Sleeping
Sleeping
| # Generated by Claude Code -- 2026-02-13 | |
| """Detect satellite maneuvers from TLE data changes. | |
| Compares successive TLEs for the same satellite. An abrupt change in | |
| semi-major axis (> threshold) indicates a maneuver — either collision | |
| avoidance, orbit maintenance, or orbit raising. | |
| Based on Kelecy (2007) and Patera & Peterson (2021). | |
| """ | |
| import json | |
| import math | |
| import numpy as np | |
| from pathlib import Path | |
| from datetime import datetime, timedelta, timezone | |
| # Earth parameters (WGS84) | |
| MU_EARTH = 398600.4418 # km^3/s^2 | |
| EARTH_RADIUS_KM = 6378.137 | |
| # Maneuver detection thresholds | |
| DEFAULT_DELTA_A_THRESHOLD_M = 200 # meters — below this is noise | |
| STARLINK_DELTA_A_THRESHOLD_M = 100 # Starlink maneuvers can be smaller | |
| def mean_motion_to_sma(n_rev_per_day: float) -> float: | |
| """Convert mean motion (rev/day) to semi-major axis (km).""" | |
| if n_rev_per_day <= 0: | |
| return 0.0 | |
| n_rad_per_sec = n_rev_per_day * 2 * math.pi / 86400.0 | |
| return (MU_EARTH / (n_rad_per_sec ** 2)) ** (1.0 / 3.0) | |
| def sma_to_altitude(sma_km: float) -> float: | |
| """Convert semi-major axis to approximate altitude (km).""" | |
| return sma_km - EARTH_RADIUS_KM | |
| def parse_tle_epoch(epoch_str: str) -> datetime: | |
| """Parse a CelesTrak JSON epoch string (ISO 8601 format).""" | |
| # CelesTrak uses: "2026-02-13T12:00:00.000000" | |
| for fmt in ("%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d"): | |
| try: | |
| return datetime.strptime(epoch_str, fmt) | |
| except ValueError: | |
| continue | |
| raise ValueError(f"Cannot parse epoch: {epoch_str}") | |
| def extract_orbital_elements(tle_json: dict) -> dict: | |
| """Extract key orbital elements from a CelesTrak JSON TLE entry.""" | |
| norad_id = int(tle_json.get("NORAD_CAT_ID", 0)) | |
| name = tle_json.get("OBJECT_NAME", "UNKNOWN") | |
| mean_motion = float(tle_json.get("MEAN_MOTION", 0)) | |
| eccentricity = float(tle_json.get("ECCENTRICITY", 0)) | |
| inclination = float(tle_json.get("INCLINATION", 0)) | |
| raan = float(tle_json.get("RA_OF_ASC_NODE", 0)) | |
| epoch_str = tle_json.get("EPOCH", "") | |
| sma = mean_motion_to_sma(mean_motion) | |
| altitude = sma_to_altitude(sma) | |
| epoch = None | |
| if epoch_str: | |
| try: | |
| epoch = parse_tle_epoch(epoch_str) | |
| except ValueError: | |
| pass | |
| return { | |
| "norad_id": norad_id, | |
| "name": name, | |
| "mean_motion": mean_motion, | |
| "eccentricity": eccentricity, | |
| "inclination": inclination, | |
| "raan": raan, | |
| "sma_km": sma, | |
| "altitude_km": altitude, | |
| "epoch": epoch, | |
| "epoch_str": epoch_str, | |
| } | |
| def detect_maneuvers( | |
| prev_tles: list[dict], | |
| curr_tles: list[dict], | |
| threshold_m: float = DEFAULT_DELTA_A_THRESHOLD_M, | |
| ) -> list[dict]: | |
| """Compare two TLE snapshots and detect maneuvers. | |
| Args: | |
| prev_tles: Previous TLE snapshot (CelesTrak JSON format) | |
| curr_tles: Current TLE snapshot (CelesTrak JSON format) | |
| threshold_m: Semi-major axis change threshold in meters | |
| Returns: | |
| List of detected maneuvers with details | |
| """ | |
| # Index previous TLEs by NORAD ID | |
| prev_by_id = {} | |
| for tle in prev_tles: | |
| elem = extract_orbital_elements(tle) | |
| if elem["norad_id"] > 0 and elem["sma_km"] > 0: | |
| prev_by_id[elem["norad_id"]] = elem | |
| maneuvers = [] | |
| for tle in curr_tles: | |
| elem = extract_orbital_elements(tle) | |
| norad_id = elem["norad_id"] | |
| if norad_id not in prev_by_id or elem["sma_km"] <= 0: | |
| continue | |
| prev = prev_by_id[norad_id] | |
| delta_a_km = elem["sma_km"] - prev["sma_km"] | |
| delta_a_m = abs(delta_a_km) * 1000 | |
| if delta_a_m > threshold_m: | |
| # Classify maneuver type | |
| if delta_a_km > 0: | |
| maneuver_type = "orbit_raise" | |
| else: | |
| maneuver_type = "orbit_lower" | |
| # Estimate delta-v (Hohmann approximation) | |
| v_circular = math.sqrt(MU_EARTH / prev["sma_km"]) # km/s | |
| delta_v = abs(delta_a_km) / (2 * prev["sma_km"]) * v_circular * 1000 # m/s | |
| maneuvers.append({ | |
| "norad_id": norad_id, | |
| "name": elem["name"], | |
| "prev_sma_km": prev["sma_km"], | |
| "curr_sma_km": elem["sma_km"], | |
| "delta_a_m": delta_a_m, | |
| "delta_a_km": delta_a_km, | |
| "delta_v_m_s": round(delta_v, 3), | |
| "maneuver_type": maneuver_type, | |
| "altitude_km": elem["altitude_km"], | |
| "prev_epoch": prev["epoch_str"], | |
| "curr_epoch": elem["epoch_str"], | |
| "detected_at": datetime.now(timezone.utc).isoformat(), | |
| }) | |
| # Sort by delta_a descending (largest maneuvers first) | |
| maneuvers.sort(key=lambda m: m["delta_a_m"], reverse=True) | |
| return maneuvers | |
| def detect_maneuvers_dual_threshold( | |
| prev_tles: list[dict], | |
| curr_tles: list[dict], | |
| ) -> list[dict]: | |
| """Detect maneuvers using constellation-aware thresholds. | |
| Uses 100m threshold for Starlink (smaller maneuvers) and | |
| 200m for everything else. Merges results, deduplicating by NORAD ID. | |
| """ | |
| # Split current TLEs by constellation | |
| starlink_curr = [] | |
| other_curr = [] | |
| for tle in curr_tles: | |
| name = tle.get("OBJECT_NAME", "") | |
| if "STARLINK" in name.upper(): | |
| starlink_curr.append(tle) | |
| else: | |
| other_curr.append(tle) | |
| # Split previous TLEs the same way | |
| starlink_prev = [] | |
| other_prev = [] | |
| for tle in prev_tles: | |
| name = tle.get("OBJECT_NAME", "") | |
| if "STARLINK" in name.upper(): | |
| starlink_prev.append(tle) | |
| else: | |
| other_prev.append(tle) | |
| # Detect with appropriate thresholds | |
| starlink_maneuvers = detect_maneuvers( | |
| starlink_prev, starlink_curr, | |
| threshold_m=STARLINK_DELTA_A_THRESHOLD_M, | |
| ) | |
| other_maneuvers = detect_maneuvers( | |
| other_prev, other_curr, | |
| threshold_m=DEFAULT_DELTA_A_THRESHOLD_M, | |
| ) | |
| # Merge and sort by delta_a descending | |
| all_maneuvers = starlink_maneuvers + other_maneuvers | |
| all_maneuvers.sort(key=lambda m: m["delta_a_m"], reverse=True) | |
| return all_maneuvers | |
| def load_tle_snapshot(path: Path) -> list[dict]: | |
| """Load a TLE snapshot from a JSON file.""" | |
| with open(path) as f: | |
| return json.load(f) | |
| def save_tle_snapshot(tles: list[dict], path: Path): | |
| """Save a TLE snapshot to a JSON file.""" | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| with open(path, "w") as f: | |
| json.dump(tles, f) | |