File size: 4,553 Bytes
a4b5ecb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
"""Classify detected satellite maneuvers into avoidance vs routine.



Enriches each maneuver with:
  - magnitude_class: micro/small/medium/large based on delta-v
  - constellation: starlink/oneweb/iridium/other
  - is_stationkeeping: regularity-based detection from maneuver history
  - likely_avoidance: heuristic combining all signals
  - enrichment_version plus placeholder CDM/counterfactual fields
    (defaults, meant to be overwritten later if data is available)

These enrichments improve training label quality for PI-TFT fine-tuning
without changing the model's feature space.

"""

import re
import numpy as np
from datetime import datetime


# Delta-v magnitude classes in m/s, assembled column-wise into
# (label, lower bound, upper bound) tuples. The lookup treats each bin as
# half-open: lower bound inclusive, upper bound exclusive.
MAGNITUDE_BINS = list(zip(
    ("micro", "small", "medium", "large"),
    (0.0, 0.5, 2.0, 10.0),
    (0.5, 2.0, 10.0, float("inf")),
))

# (constellation label, case-insensitive satellite-name regex) pairs.
CONSTELLATION_PATTERNS = [
    (label, re.compile(regex, re.IGNORECASE))
    for label, regex in (
        ("starlink", r"STARLINK"),
        ("oneweb", r"ONEWEB"),
        ("iridium", r"IRIDIUM"),
    )
]

# Maneuver intervals whose coefficient of variation (std / mean) falls below
# this value are treated as regular enough to be stationkeeping.
STATIONKEEPING_CV_THRESHOLD = 0.3
# Minimum number of past maneuvers needed before a pattern can be detected.
MIN_HISTORY_FOR_SK = 3


def classify_magnitude(delta_v_m_s: float) -> str:
    """Return the magnitude class whose bin contains ``|delta_v_m_s|``.

    Bins are taken from ``MAGNITUDE_BINS`` and are half-open: a value sitting
    exactly on a boundary belongs to the bin that starts there. The sign of
    the delta-v is ignored.

    Args:
        delta_v_m_s: Delta-v of the maneuver in m/s.

    Returns:
        The label of the first matching bin, or "large" when no bin matches.
    """
    magnitude = abs(delta_v_m_s)
    matching = [
        name
        for name, lower, upper in MAGNITUDE_BINS
        if lower <= magnitude < upper
    ]
    return matching[0] if matching else "large"


def detect_constellation(name: str) -> str:
    """Map a satellite name to a constellation label.

    The name is searched against each regex in ``CONSTELLATION_PATTERNS`` in
    list order; the first pattern found anywhere in the name wins.

    Args:
        name: Satellite name to inspect.

    Returns:
        The matching constellation label, or "other" when nothing matches.
    """
    hits = (label for label, regex in CONSTELLATION_PATTERNS if regex.search(name))
    return next(hits, "other")


def detect_stationkeeping(history: list[dict]) -> bool:
    """Detect stationkeeping from regularity of past maneuver intervals.

    If the coefficient of variation (std/mean) of the time intervals between
    consecutive maneuvers is below ``STATIONKEEPING_CV_THRESHOLD``, the
    pattern is considered stationkeeping.

    Args:
        history: Past maneuver records for this NORAD ID, each with a
            'detected_at' ISO timestamp string. Records whose timestamp is
            missing, empty, not a string, or unparseable are ignored.

    Returns:
        True if the maneuver pattern suggests stationkeeping. Always a plain
        Python ``bool`` so the value can be JSON-serialized.
    """
    # Function-scope import keeps this fix independent of the file's imports.
    from datetime import timezone

    if not history or len(history) < MIN_HISTORY_FOR_SK:
        return False

    epochs = []
    for record in history:
        raw = record.get("detected_at", "")
        # Skip missing values and non-string values; calling str methods on
        # e.g. an int would otherwise raise an uncaught AttributeError.
        if not raw or not isinstance(raw, str):
            continue
        try:
            # A trailing "Z" is only understood by fromisoformat() on newer
            # Python versions, so rewrite it as an explicit UTC offset.
            parsed = datetime.fromisoformat(raw.replace("Z", "+00:00"))
        except ValueError:
            continue
        if parsed.tzinfo is None:
            # datetime.timestamp() interprets naive values in the host's local
            # timezone, which makes intervals machine- and DST-dependent.
            # NOTE(review): assumes naive timestamps are UTC — confirm with
            # the producer of 'detected_at'.
            parsed = parsed.replace(tzinfo=timezone.utc)
        epochs.append(parsed.timestamp())

    # Re-check after dropping unusable records.
    if len(epochs) < MIN_HISTORY_FOR_SK:
        return False

    epochs.sort()
    intervals = np.diff(epochs)

    # A spread cannot be judged from a single interval.
    if len(intervals) < 2:
        return False

    mean_interval = np.mean(intervals)
    # All timestamps identical: avoid dividing by zero.
    if mean_interval <= 0:
        return False

    cv = np.std(intervals) / mean_interval
    # The comparison yields numpy.bool_; convert to a builtin bool.
    return bool(cv < STATIONKEEPING_CV_THRESHOLD)


def classify_maneuver(maneuver: dict, history: list[dict] = None) -> dict:
    """Classify a detected maneuver with enrichment flags.

    Args:
        maneuver: Maneuver dict from detect_maneuvers() with keys:
            norad_id, name, delta_v_m_s, delta_a_m, etc. A missing or
            ``None`` 'delta_v_m_s' is treated as 0.0 and a missing or
            ``None`` 'name' as the empty string.
        history: Past maneuver records for same NORAD ID (optional). When
            omitted or empty, the maneuver is never flagged as
            stationkeeping.

    Returns:
        A shallow copy of ``maneuver`` with the enrichment fields added. The
        input dict is not modified. The CDM/counterfactual placeholder fields
        are always reset to their defaults, even if ``maneuver`` already
        carried them.
    """
    # dict.get() only applies its default when the key is absent; a key that
    # is present with a None value must be normalised explicitly.
    delta_v = maneuver.get("delta_v_m_s")
    if delta_v is None:
        delta_v = 0.0
    name = maneuver.get("name") or ""

    magnitude_class = classify_magnitude(delta_v)
    constellation = detect_constellation(name)
    # bool() guards against a numpy.bool_ leaking into the result, which
    # would not be JSON-serializable.
    is_sk = bool(detect_stationkeeping(history)) if history else False

    # Likely avoidance heuristic
    likely_avoidance = False

    # Small, irregular burns are taken as avoidance candidates. The
    # micro/small classes already imply |delta-v| < 2 m/s, so no further
    # magnitude check is needed here.
    if not is_sk and magnitude_class in ("micro", "small"):
        likely_avoidance = True

    # Starlink CAMs are typically very small (< 1 m/s). Compare the magnitude,
    # consistent with classify_magnitude, so that a large negative delta-v is
    # not mistaken for a tiny burn. This rule applies regardless of the
    # stationkeeping flag.
    if constellation == "starlink" and abs(delta_v) < 1.0:
        likely_avoidance = True

    enriched = dict(maneuver)
    enriched.update({
        "magnitude_class": magnitude_class,
        "constellation": constellation,
        "is_stationkeeping": is_sk,
        "likely_avoidance": likely_avoidance,
        "enrichment_version": 1,
        # Phase B/C defaults — overwritten later if data is available
        "has_cdm": False,
        "cdm_pc": None,
        "cdm_miss_distance_km": None,
        "counterfactual_min_distance_km": None,
        "would_have_collided": False,
        "counterfactual_closest_norad": None,
    })
    return enriched