"""SGP4 counterfactual propagation — "what if no maneuver?" simulation. For each likely-avoidance maneuver, propagates the pre-maneuver TLE forward to estimate whether a close approach would have occurred. This generates counterfactual "would-have-collided" labels for training enrichment. Uses the sgp4 library for efficient satellite propagation. """ import math import numpy as np from datetime import datetime, timedelta, timezone try: from sgp4.api import Satrec, WGS72 from sgp4 import exporter SGP4_AVAILABLE = True except ImportError: SGP4_AVAILABLE = False # Earth parameters EARTH_RADIUS_KM = 6378.137 # Counterfactual thresholds COLLISION_THRESHOLD_KM = 1.0 # "Would have collided" if closer than this NEARBY_ALT_BAND_KM = 50.0 # Altitude proximity for neighbor selection NEARBY_RAAN_BAND_DEG = 30.0 # RAAN proximity for neighbor selection def celestrak_json_to_satrec(tle_json: dict) -> "Satrec": """Convert a CelesTrak GP JSON record to an sgp4 Satrec object. CelesTrak JSON includes TLE_LINE1/TLE_LINE2 when available. Falls back to constructing from orbital elements via sgp4init(). Args: tle_json: CelesTrak GP JSON dict with orbital elements. Returns: sgp4 Satrec object ready for propagation. Raises: ImportError: If sgp4 is not installed. ValueError: If TLE data is insufficient. """ if not SGP4_AVAILABLE: raise ImportError("sgp4 library is required: pip install sgp4") # Prefer TLE lines if available (most reliable) line1 = tle_json.get("TLE_LINE1", "") line2 = tle_json.get("TLE_LINE2", "") if line1 and line2: return Satrec.twoline2rv(line1, line2) # Construct from JSON orbital elements using sgp4init satrec = Satrec() # Parse epoch epoch_str = tle_json.get("EPOCH", "") if not epoch_str: raise ValueError("No EPOCH in TLE JSON") epoch_dt = datetime.fromisoformat(epoch_str.replace("Z", "+00:00")) if epoch_dt.tzinfo is None: epoch_dt = epoch_dt.replace(tzinfo=timezone.utc) # Convert to Julian date pair for sgp4 year = epoch_dt.year mon = epoch_dt.month day = epoch_dt.day hr = epoch_dt.hour minute = epoch_dt.minute sec = epoch_dt.second + epoch_dt.microsecond / 1e6 # sgp4init expects elements in specific units no_kozai = float(tle_json.get("MEAN_MOTION", 0)) * (2.0 * math.pi / 1440.0) # rev/day -> rad/min ecco = float(tle_json.get("ECCENTRICITY", 0)) inclo = math.radians(float(tle_json.get("INCLINATION", 0))) nodeo = math.radians(float(tle_json.get("RA_OF_ASC_NODE", 0))) argpo = math.radians(float(tle_json.get("ARG_OF_PERICENTER", 0))) mo = math.radians(float(tle_json.get("MEAN_ANOMALY", 0))) bstar = float(tle_json.get("BSTAR", 0)) norad_id = int(tle_json.get("NORAD_CAT_ID", 0)) # Epoch in Julian date jd_base = _datetime_to_jd(epoch_dt) epoch_jd = jd_base # sgp4init epoch is minutes since 1949-12-31 00:00 UTC # But the Python API uses (jdsatepoch, jdsatepochF) pair jd_whole = int(epoch_jd) jd_frac = epoch_jd - jd_whole satrec.sgp4init( WGS72, # gravity model 'i', # 'a' = old AFSPC mode, 'i' = improved norad_id, # NORAD catalog number (epoch_jd - 2433281.5), # epoch in days since 1949 Dec 31 00:00 UT bstar, # BSTAR drag term 0.0, # ndot (not used in sgp4init 'i' mode) 0.0, # nddot (not used) ecco, # eccentricity argpo, # argument of perigee (radians) inclo, # inclination (radians) mo, # mean anomaly (radians) no_kozai, # mean motion (radians/minute) nodeo, # RAAN (radians) ) return satrec def _datetime_to_jd(dt: datetime) -> float: """Convert datetime to Julian Date.""" if dt.tzinfo is not None: dt = dt.astimezone(timezone.utc).replace(tzinfo=None) a = (14 - dt.month) // 12 y = dt.year + 4800 - a m = dt.month + 12 * a - 3 jdn = dt.day + (153 * m + 2) // 5 + 365 * y + y // 4 - y // 100 + y // 400 - 32045 jd = jdn + (dt.hour - 12) / 24.0 + dt.minute / 1440.0 + dt.second / 86400.0 return jd def _propagate_positions(satrec: "Satrec", start_jd: float, hours: float, step_min: float) -> np.ndarray: """Propagate a satellite and return position array (N x 3) in km. Returns empty array if propagation fails. """ n_steps = int(hours * 60 / step_min) + 1 positions = [] for i in range(n_steps): minutes_since_epoch = (start_jd - satrec.jdsatepoch - satrec.jdsatepochF) * 1440.0 + i * step_min e, r, v = satrec.sgp4(satrec.jdsatepoch, satrec.jdsatepochF + minutes_since_epoch / 1440.0) if e != 0: continue positions.append(r) if not positions: return np.array([]).reshape(0, 3) return np.array(positions) def find_nearby_satellites( maneuvered_tle: dict, all_tles: list[dict], alt_band_km: float = NEARBY_ALT_BAND_KM, raan_band_deg: float = NEARBY_RAAN_BAND_DEG, ) -> list[dict]: """Find satellites in similar orbital shell to the maneuvered object.""" from src.data.maneuver_detector import mean_motion_to_sma, sma_to_altitude norad_id = int(maneuvered_tle.get("NORAD_CAT_ID", 0)) mm = float(maneuvered_tle.get("MEAN_MOTION", 0)) target_alt = sma_to_altitude(mean_motion_to_sma(mm)) target_raan = float(maneuvered_tle.get("RA_OF_ASC_NODE", 0)) nearby = [] for tle in all_tles: tid = int(tle.get("NORAD_CAT_ID", 0)) if tid == norad_id or tid <= 0: continue t_mm = float(tle.get("MEAN_MOTION", 0)) t_alt = sma_to_altitude(mean_motion_to_sma(t_mm)) t_raan = float(tle.get("RA_OF_ASC_NODE", 0)) alt_diff = abs(t_alt - target_alt) raan_diff = abs(t_raan - target_raan) raan_diff = min(raan_diff, 360.0 - raan_diff) if alt_diff < alt_band_km and raan_diff < raan_band_deg: nearby.append(tle) return nearby def propagate_counterfactual( pre_maneuver_tle: dict, nearby_tles: list[dict], hours_forward: float = 24.0, step_minutes: float = 10.0, ) -> dict: """Simulate "what if no maneuver?" using SGP4 propagation. Propagates the pre-maneuver TLE (before orbit change) forward and checks for close approaches with nearby satellites. Args: pre_maneuver_tle: Yesterday's TLE for the maneuvered satellite. nearby_tles: Current TLEs for nearby satellites. hours_forward: How far to propagate (hours). step_minutes: Time step for propagation (minutes). Returns: Dict with: min_distance_km, time_of_closest_approach, would_have_collided, closest_norad_id, n_neighbors_checked. """ if not SGP4_AVAILABLE: return { "min_distance_km": None, "would_have_collided": False, "error": "sgp4 not installed", } try: target_sat = celestrak_json_to_satrec(pre_maneuver_tle) except (ValueError, Exception) as e: return { "min_distance_km": None, "would_have_collided": False, "error": f"target TLE parse failed: {e}", } # Use current time as propagation start now = datetime.now(timezone.utc) start_jd = _datetime_to_jd(now) # Propagate maneuvered satellite (pre-maneuver orbit) target_positions = _propagate_positions(target_sat, start_jd, hours_forward, step_minutes) if len(target_positions) == 0: return { "min_distance_km": None, "would_have_collided": False, "error": "target propagation failed", } global_min_dist = float("inf") closest_norad = 0 closest_time_offset_min = 0.0 n_checked = 0 for neighbor_tle in nearby_tles: try: neighbor_sat = celestrak_json_to_satrec(neighbor_tle) except (ValueError, Exception): continue neighbor_positions = _propagate_positions(neighbor_sat, start_jd, hours_forward, step_minutes) if len(neighbor_positions) == 0: continue n_checked += 1 # Compute distances at each timestep (use min of overlapping steps) n_common = min(len(target_positions), len(neighbor_positions)) diffs = target_positions[:n_common] - neighbor_positions[:n_common] distances = np.linalg.norm(diffs, axis=1) min_idx = np.argmin(distances) min_dist = distances[min_idx] if min_dist < global_min_dist: global_min_dist = min_dist closest_norad = int(neighbor_tle.get("NORAD_CAT_ID", 0)) closest_time_offset_min = min_idx * step_minutes if global_min_dist == float("inf"): return { "min_distance_km": None, "would_have_collided": False, "n_neighbors_checked": n_checked, "error": "no valid neighbors propagated", } tca_dt = now + timedelta(minutes=closest_time_offset_min) return { "min_distance_km": round(global_min_dist, 3), "time_of_closest_approach": tca_dt.isoformat(), "would_have_collided": global_min_dist < COLLISION_THRESHOLD_KM, "closest_norad_id": closest_norad, "n_neighbors_checked": n_checked, } def compute_forward_trajectory( tle_1: dict, tle_2: dict, hours_forward: float = 120.0, step_minutes: float = 20.0, ) -> list[dict] | None: """Compute full trajectory time series for two satellites. Returns list of trajectory points with ECI positions and separation distance, suitable for baking into the webapp alerts JSON so the frontend doesn't need to do SGP4 propagation or load TLE data. Args: tle_1: CelesTrak GP JSON for satellite 1. tle_2: CelesTrak GP JSON for satellite 2. hours_forward: How far to propagate (default 120h = 5 days). step_minutes: Time step for propagation (minutes). Returns: List of dicts with: h (hours from start), d (distance km), s1 [x,y,z] ECI km, s2 [x,y,z] ECI km. None if propagation fails. """ if not SGP4_AVAILABLE: return None try: sat1 = celestrak_json_to_satrec(tle_1) sat2 = celestrak_json_to_satrec(tle_2) except (ValueError, Exception): return None now = datetime.now(timezone.utc) start_jd = _datetime_to_jd(now) n_steps = int(hours_forward * 60 / step_minutes) + 1 points = [] for i in range(n_steps): mins = i * step_minutes target_jd = start_jd + mins / 1440.0 jd_whole = int(target_jd) jd_frac = target_jd - jd_whole e1, r1, _ = sat1.sgp4(jd_whole, jd_frac) e2, r2, _ = sat2.sgp4(jd_whole, jd_frac) if e1 != 0 or e2 != 0: continue if not all(math.isfinite(v) for v in r1 + r2): continue dx = r1[0] - r2[0] dy = r1[1] - r2[1] dz = r1[2] - r2[2] dist = math.sqrt(dx * dx + dy * dy + dz * dz) points.append({ "h": round(mins / 60.0, 2), "d": round(dist, 1), "s1": [round(r1[0], 1), round(r1[1], 1), round(r1[2], 1)], "s2": [round(r2[0], 1), round(r2[1], 1), round(r2[2], 1)], }) return points if points else None def compute_tca_trail( tle_1: dict, tle_2: dict, tca_hours: float, half_window_min: float = 30.0, step_minutes: float = 0.25, ) -> list[dict] | None: """Compute dense trail around TCA for globe orbital path visualization. Returns 15-sec resolution positions for ±30 min around TCA. Args: tle_1: CelesTrak GP JSON for satellite 1. tle_2: CelesTrak GP JSON for satellite 2. tca_hours: Hours from now to TCA (from compute_forward_tca). half_window_min: Half window in minutes around TCA. step_minutes: Time step in minutes. Returns: List of dicts with s1 [x,y,z] and s2 [x,y,z] ECI km. None if fails. """ if not SGP4_AVAILABLE: return None try: sat1 = celestrak_json_to_satrec(tle_1) sat2 = celestrak_json_to_satrec(tle_2) except (ValueError, Exception): return None now = datetime.now(timezone.utc) start_jd = _datetime_to_jd(now) tca_min = tca_hours * 60.0 t_start = tca_min - half_window_min t_end = tca_min + half_window_min n_steps = int((t_end - t_start) / step_minutes) + 1 trail = [] for i in range(n_steps): mins = t_start + i * step_minutes target_jd = start_jd + mins / 1440.0 jd_whole = int(target_jd) jd_frac = target_jd - jd_whole e1, r1, _ = sat1.sgp4(jd_whole, jd_frac) e2, r2, _ = sat2.sgp4(jd_whole, jd_frac) if e1 != 0 or e2 != 0: continue if not all(math.isfinite(v) for v in r1 + r2): continue dx = r1[0] - r2[0] dy = r1[1] - r2[1] dz = r1[2] - r2[2] dist = math.sqrt(dx * dx + dy * dy + dz * dz) trail.append({ "h": round(mins / 60.0, 3), "d": round(dist, 1), "s1": [round(r1[0], 1), round(r1[1], 1), round(r1[2], 1)], "s2": [round(r2[0], 1), round(r2[1], 1), round(r2[2], 1)], }) return trail if trail else None def compute_forward_tca( tle_1: dict, tle_2: dict, hours_forward: float = 120.0, step_minutes: float = 10.0, ) -> dict: """Compute forward Time of Closest Approach between two satellites. Propagates both satellites forward using SGP4 and finds the minimum separation distance and when it occurs. Args: tle_1: CelesTrak GP JSON for satellite 1. tle_2: CelesTrak GP JSON for satellite 2. hours_forward: How far to propagate (default 120h = 5 days). step_minutes: Time step for propagation (minutes). Returns: Dict with: tca_hours, tca_min_distance_km, or error. """ if not SGP4_AVAILABLE: return {"tca_hours": None, "tca_min_distance_km": None} try: sat1 = celestrak_json_to_satrec(tle_1) sat2 = celestrak_json_to_satrec(tle_2) except (ValueError, Exception) as e: return {"tca_hours": None, "tca_min_distance_km": None} now = datetime.now(timezone.utc) start_jd = _datetime_to_jd(now) pos1 = _propagate_positions(sat1, start_jd, hours_forward, step_minutes) pos2 = _propagate_positions(sat2, start_jd, hours_forward, step_minutes) if len(pos1) == 0 or len(pos2) == 0: return {"tca_hours": None, "tca_min_distance_km": None} n_common = min(len(pos1), len(pos2)) diffs = pos1[:n_common] - pos2[:n_common] distances = np.linalg.norm(diffs, axis=1) min_idx = int(np.argmin(distances)) min_dist = float(distances[min_idx]) tca_hours = min_idx * step_minutes / 60.0 return { "tca_hours": round(tca_hours, 1), "tca_min_distance_km": round(min_dist, 1), }