# Generated by Claude Code -- 2026-02-08
"""Data augmentation for the conjunction prediction dataset.

The fundamental problem: only 67 high-risk events out of 13,154 in training
(0.5%). This module provides two augmentation strategies:

1. SPACE-TRACK INTEGRATION: Merge real high-risk CDMs from Space-Track's
   cdm_public feed. These have fewer features (16 vs 103) but provide real
   positive examples.

2. TIME-SERIES AUGMENTATION: Create synthetic variants of existing high-risk
   events by applying realistic perturbations:
   - Gaussian noise on covariance/position/velocity features
   - Temporal jittering (shift CDM creation times slightly)
   - Feature dropout (randomly zero out some features, simulating missing data)
   - Sequence truncation (remove early CDMs, simulating late detection)

Both strategies are physics-aware: they don't generate impossible
configurations (e.g., negative miss distances or covariance values).

NOTE(review): of the perturbations listed above, only Gaussian noise and
sequence truncation are implemented by the functions in this module;
``time_to_tca`` is explicitly excluded from perturbation and no feature
dropout is applied.
"""

from pathlib import Path
from typing import Optional

import numpy as np
import pandas as pd

# Events whose final "risk" value is strictly above this threshold count as
# positive (high-risk).  Presumably a log10 collision probability -- confirm
# against the dataset documentation.
_HIGH_RISK_THRESHOLD = -5

# Identifier / target / bookkeeping columns that must never receive noise.
_NON_PERTURBED_COLUMNS = frozenset(
    {"event_id", "time_to_tca", "risk", "mission_id", "source"}
)

# Columns clipped at zero after noise, in addition to every perturbed column
# whose name contains "sigma".
_NON_NEGATIVE_COLUMNS = ("miss_distance", "relative_speed")


def _event_level_risk(df: pd.DataFrame) -> pd.Series:
    """Return one risk value per event: the last non-null "risk" per event_id."""
    return df.groupby("event_id")["risk"].last()


def _safe_fraction(numerator: float, denominator: float) -> float:
    """Return numerator / denominator, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


def augment_event_noise(
    event_df: pd.DataFrame,
    noise_scale: float = 0.05,
    n_augments: int = 5,
    rng: Optional[np.random.Generator] = None,
) -> list[pd.DataFrame]:
    """Create ``n_augments`` noisy variants of a single conjunction event.

    Zero-mean Gaussian noise is added to every numeric column except the
    identifier/target columns (event_id, time_to_tca, risk, mission_id,
    source).  The noise standard deviation is ``noise_scale`` times the
    column's standard deviation within the event; for (near-)constant columns
    it falls back to 1% of the absolute column mean.

    Missing (NaN) or infinite cells are ignored when computing the column
    statistics and are left untouched in the output, so one missing value no
    longer turns the whole column into NaN.

    Args:
        event_df: CDM rows belonging to one event. Not modified.
        noise_scale: noise std dev as a fraction of each column's std dev.
        n_augments: number of noisy copies to generate.
        rng: random generator; a generator seeded with 42 is created if None.

    Returns:
        List of ``n_augments`` perturbed copies of ``event_df``.  In each copy
        "miss_distance", "relative_speed" and every perturbed column whose
        name contains "sigma" are clipped to be non-negative.
    """
    if rng is None:
        rng = np.random.default_rng(42)

    numeric_cols = event_df.select_dtypes(include=[np.number]).columns
    perturb_cols = [c for c in numeric_cols if c not in _NON_PERTURBED_COLUMNS]
    sigma_cols = [c for c in perturb_cols if "sigma" in str(c).lower()]

    # Column statistics depend only on the source event, so compute them once
    # instead of once per augmented copy.
    base_values = {}
    noise_std = {}
    for col in perturb_cols:
        values = event_df[col].to_numpy(dtype=float)
        observed = values[np.isfinite(values)]
        if observed.size == 0:
            # Nothing observed in this column: there is nothing to perturb.
            continue
        col_std = float(np.std(observed))
        if col_std < 1e-10:
            # Constant column: scale the noise by the magnitude of the value.
            col_std = abs(float(np.mean(observed))) * 0.01 + 1e-10
        base_values[col] = values
        noise_std[col] = noise_scale * col_std

    augmented = []
    for _ in range(n_augments):
        aug = event_df.copy()
        for col, values in base_values.items():
            noise = rng.normal(0.0, noise_std[col], size=values.shape[0])
            # NaN + noise stays NaN, so missing cells remain missing.
            aug[col] = values + noise

        # Physical constraints: distances, speeds and sigmas cannot go negative.
        for col in _NON_NEGATIVE_COLUMNS:
            if col in aug.columns:
                aug[col] = aug[col].clip(lower=0)
        for col in sigma_cols:
            aug[col] = aug[col].clip(lower=0)

        augmented.append(aug)

    return augmented


def augment_event_truncate(
    event_df: pd.DataFrame,
    min_keep: int = 3,
    n_augments: int = 3,
    rng: Optional[np.random.Generator] = None,
) -> list[pd.DataFrame]:
    """Create truncated variants of an event by removing its earliest CDMs.

    Simulates late-detection scenarios where only the most recent CDMs
    (those closest to TCA) are available.

    Args:
        event_df: CDM rows of one event; must contain "time_to_tca".
        min_keep: minimum number of CDMs to keep. Values below 1 are treated
            as 1 so that a variant is never empty and never the full event.
        n_augments: number of truncated variants to generate.
        rng: random generator; a generator seeded with 42 is created if None.

    Returns:
        List of ``n_augments`` copies, each holding the last ``k`` CDMs with
        ``min_keep <= k < len(event_df)``; an empty list when the event has
        ``min_keep`` CDMs or fewer.
    """
    if rng is None:
        rng = np.random.default_rng(42)

    # A zero-length keep would make ``iloc[-0:]`` return the whole event.
    min_keep = max(1, min_keep)

    # Sort by time_to_tca descending (first CDM = furthest from TCA).
    ordered = event_df.sort_values("time_to_tca", ascending=False)
    n_cdms = len(ordered)
    if n_cdms <= min_keep:
        return []

    augmented = []
    for _ in range(n_augments):
        # Upper bound is exclusive: keep between min_keep and n_cdms - 1 CDMs.
        n_keep = int(rng.integers(min_keep, n_cdms))
        augmented.append(ordered.iloc[n_cdms - n_keep:].copy())

    return augmented


def augment_positive_events(
    df: pd.DataFrame,
    target_ratio: float = 0.05,
    noise_scale: float = 0.05,
    seed: int = 42,
) -> pd.DataFrame:
    """Augment the positive (high-risk) class to reach ``target_ratio``.

    An event is positive when its last "risk" value is above -5.  Synthetic
    positives are noisy copies of randomly chosen positive events; about 30%
    of the time a truncated copy of the same source event is added as well.
    Synthetic events get fresh event_ids above the current maximum and
    ``source == "augmented"``.

    Args:
        df: full training DataFrame with event_id, risk columns
        target_ratio: desired fraction of high-risk events in the result
            (default 5%); must satisfy ``0 <= target_ratio < 1``.
        noise_scale: std dev of Gaussian noise as fraction of feature std
        seed: random seed

    Returns:
        Augmented DataFrame with new synthetic positive events appended, or
        ``df`` itself when nothing has to (or can) be generated.

    Raises:
        ValueError: if ``target_ratio`` is outside ``[0, 1)``.
    """
    if not 0 <= target_ratio < 1:
        raise ValueError(
            f"target_ratio must be in [0, 1), got {target_ratio!r}"
        )

    rng = np.random.default_rng(seed)

    # Classify events by their final risk value.
    event_risks = _event_level_risk(df)
    is_positive = event_risks > _HIGH_RISK_THRESHOLD
    pos_event_ids = event_risks[is_positive].index.tolist()
    n_pos = len(pos_event_ids)
    n_neg = int((event_risks <= _HIGH_RISK_THRESHOLD).sum())
    n_total = n_pos + n_neg

    if n_total == 0:
        print("No events to augment")
        return df

    # Only positives are added, so the negatives stay fixed:
    #   p / (p + n_neg) = r   =>   p = r * n_neg / (1 - r)
    target_pos = int(target_ratio * n_neg / (1 - target_ratio))
    n_needed = max(0, target_pos - n_pos)

    if n_needed == 0:
        print(f"Already at target ratio ({n_pos}/{n_total} = {n_pos/n_total:.1%})")
        return df

    if n_pos == 0:
        # Without a real positive event there is nothing to derive variants from.
        print(f"No positive events to augment (0/{n_total})")
        return df

    print(f"Augmenting: {n_pos} positive events → {n_pos + n_needed} "
          f"(target {target_ratio:.0%} of {n_total + n_needed})")

    # Restrict per-iteration lookups to the (small) set of positive rows
    # rather than scanning the full training frame every time.
    pos_rows = df[df["event_id"].isin(pos_event_ids)]

    max_event_id = df["event_id"].max()
    augmented_dfs = []
    generated = 0

    while generated < n_needed:
        # Pick a random positive event to augment.
        src_event_id = rng.choice(pos_event_ids)
        src_event = pos_rows[pos_rows["event_id"] == src_event_id]

        # Always produce one noisy variant ...
        aug_variants = augment_event_noise(
            src_event, noise_scale=noise_scale, n_augments=1, rng=rng
        )

        # ... and sometimes a truncated one as well.
        if rng.random() < 0.3 and len(src_event) > 3:
            aug_variants.extend(
                augment_event_truncate(src_event, n_augments=1, rng=rng)
            )

        for aug_df in aug_variants:
            if generated >= n_needed:
                break
            max_event_id += 1
            # Variants are already independent copies of the source rows.
            aug_df["event_id"] = max_event_id
            aug_df["source"] = "augmented"
            augmented_dfs.append(aug_df)
            generated += 1

    if not augmented_dfs:
        return df

    augmented = pd.concat(augmented_dfs, ignore_index=True)
    result = pd.concat([df, augmented], ignore_index=True)

    # Verify the resulting class balance.
    event_risks = _event_level_risk(result)
    new_pos = (event_risks > _HIGH_RISK_THRESHOLD).sum()
    new_total = len(event_risks)
    print(f"Result: {new_pos} positive / {new_total} total "
          f"({new_pos/new_total:.1%})")
    return result


def integrate_spacetrack_positives(
    kelvins_df: pd.DataFrame,
    spacetrack_path: Path,
) -> pd.DataFrame:
    """Add Space-Track emergency CDMs as additional positive training examples.

    Since Space-Track cdm_public has only 16 features vs Kelvins' 103, missing
    features are filled with 0. The model will learn to use whatever features
    are available.
    NOTE(review): the zero-filling is expected to happen inside
    ``merge_datasets``; it is not visible in this module -- confirm.

    Args:
        kelvins_df: Kelvins training DataFrame.
        spacetrack_path: path to the Space-Track CDM file.

    Returns:
        The merged DataFrame, or ``kelvins_df`` itself when the file is missing.
    """
    if not spacetrack_path.exists():
        print(f"No Space-Track data at {spacetrack_path}")
        return kelvins_df

    # Imported lazily so this module can be used without the project loaders
    # when no Space-Track data is present.
    from src.data.merge_sources import (
        load_spacetrack_cdms, group_into_events, merge_datasets
    )

    st_df = load_spacetrack_cdms(spacetrack_path)
    st_df = group_into_events(st_df)
    return merge_datasets(kelvins_df, st_df)


def build_augmented_training_set(
    data_dir: Path,
    target_positive_ratio: float = 0.05,
    noise_scale: float = 0.05,
    seed: int = 42,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Build the full augmented training set from all available sources.

    Steps:
    1. Load ESA Kelvins train/test
    2. Merge Space-Track emergency CDMs into training set
    3. Apply time-series augmentation to positive events
    4. Return (augmented_train, original_test)

    Test set is NEVER augmented — it stays as Kelvins-only for fair evaluation.

    Args:
        data_dir: directory containing "cdm" and, optionally,
            "cdm_spacetrack/cdm_spacetrack_emergency.csv".
        target_positive_ratio: desired fraction of positive events.
        noise_scale: noise std dev as a fraction of each feature's std dev.
        seed: random seed for the augmentation.

    Returns:
        Tuple ``(train_df, test_df)``; both carry a "source" column.
    """
    from src.data.cdm_loader import load_dataset

    print("=" * 60)
    print(" Building Augmented Training Set")
    print("=" * 60)

    # Step 1: Load Kelvins
    print("\n1. Loading ESA Kelvins dataset ...")
    train_df, test_df = load_dataset(data_dir / "cdm")

    # Defragment and tag source
    train_df = train_df.copy()
    test_df = test_df.copy()
    train_df["source"] = "kelvins"
    test_df["source"] = "kelvins"

    # Count initial positives
    event_risks = _event_level_risk(train_df)
    n_pos_initial = (event_risks > _HIGH_RISK_THRESHOLD).sum()
    n_total_initial = len(event_risks)
    print(f" Initial: {n_pos_initial} positive / {n_total_initial} total "
          f"({_safe_fraction(n_pos_initial, n_total_initial):.2%})")

    # Step 2: Space-Track integration
    st_path = data_dir / "cdm_spacetrack" / "cdm_spacetrack_emergency.csv"
    if st_path.exists():
        print("\n2. Integrating Space-Track emergency CDMs ...")
        train_df = integrate_spacetrack_positives(train_df, st_path)
    else:
        print("\n2. No Space-Track data found (skipping)")

    # Step 3: Time-series augmentation
    print(f"\n3. Augmenting positive events (target ratio: {target_positive_ratio:.0%}) ...")
    train_df = augment_positive_events(
        train_df,
        target_ratio=target_positive_ratio,
        noise_scale=noise_scale,
        seed=seed,
    )

    # Final stats
    event_risks = _event_level_risk(train_df)
    event_sources = train_df.groupby("event_id")["source"].first()
    n_kelvins = (event_sources == "kelvins").sum()
    n_spacetrack = (event_sources == "spacetrack").sum()
    n_augmented = (event_sources == "augmented").sum()
    n_pos_final = (event_risks > _HIGH_RISK_THRESHOLD).sum()
    n_total_final = len(event_risks)

    print(f"\n{'=' * 60}")
    print(" Final Training Set:")
    print(f" Kelvins events: {n_kelvins}")
    print(f" Space-Track events: {n_spacetrack}")
    print(f" Augmented events: {n_augmented}")
    print(f" Total events: {n_total_final}")
    print(f" Positive events: {n_pos_final} "
          f"({_safe_fraction(n_pos_final, n_total_final):.1%})")
    print(f" Total CDM rows: {len(train_df)}")
    print(f"{'=' * 60}")

    return train_df, test_df