# panacea-api / src/data/augment.py
# Uploaded by DTanzillo via huggingface_hub (commit a4b5ecb, verified)
# Generated by Claude Code -- 2026-02-08
"""Data augmentation for the conjunction prediction dataset.
The fundamental problem: only 67 high-risk events out of 13,154 in training (0.5%).
This module provides two augmentation strategies:
1. SPACE-TRACK INTEGRATION: Merge real high-risk CDMs from Space-Track's cdm_public
feed. These have fewer features (16 vs 103) but provide real positive examples.
2. TIME-SERIES AUGMENTATION: Create synthetic variants of existing high-risk events
by applying realistic perturbations:
- Gaussian noise on covariance/position/velocity features
- Temporal jittering (shift CDM creation times slightly)
- Feature dropout (randomly zero out some features, simulating missing data)
- Sequence truncation (remove early CDMs, simulating late detection)
Both strategies are physics-aware: they don't generate impossible configurations
(e.g., negative miss distances or covariance values).
"""
import numpy as np
import pandas as pd
from pathlib import Path
def augment_event_noise(
event_df: pd.DataFrame,
noise_scale: float = 0.05,
n_augments: int = 5,
rng: np.random.Generator = None,
) -> list[pd.DataFrame]:
"""
Create n_augments noisy variants of a single conjunction event.
Applies Gaussian noise to numeric features, scaled by each column's
standard deviation within the event. Preserves event_id structure and
ensures physical constraints (non-negative distances, etc.).
"""
if rng is None:
rng = np.random.default_rng(42)
# Identify numeric columns to perturb (exclude IDs and targets)
exclude = {"event_id", "time_to_tca", "risk", "mission_id", "source"}
numeric_cols = event_df.select_dtypes(include=[np.number]).columns
perturb_cols = [c for c in numeric_cols if c not in exclude]
augmented = []
for i in range(n_augments):
aug = event_df.copy()
for col in perturb_cols:
values = aug[col].values.astype(float)
col_std = np.std(values)
if col_std < 1e-10:
col_std = np.abs(np.mean(values)) * 0.01 + 1e-10
noise = rng.normal(0, noise_scale * col_std, size=len(values))
aug[col] = values + noise
# Physical constraints
if "miss_distance" in aug.columns:
aug["miss_distance"] = aug["miss_distance"].clip(lower=0)
if "relative_speed" in aug.columns:
aug["relative_speed"] = aug["relative_speed"].clip(lower=0)
# Ensure covariance sigma columns stay positive
sigma_cols = [c for c in perturb_cols if "sigma" in c.lower()]
for col in sigma_cols:
aug[col] = aug[col].clip(lower=0)
augmented.append(aug)
return augmented
def augment_event_truncate(
    event_df: pd.DataFrame,
    min_keep: int = 3,
    n_augments: int = 3,
    rng: np.random.Generator = None,
) -> list[pd.DataFrame]:
    """
    Build truncated variants of an event by dropping its earliest CDMs.

    Mimics late-detection scenarios: only the CDMs closest to TCA survive,
    as if the conjunction had been spotted late.  Returns an empty list when
    the event has too few CDMs to truncate.
    """
    rng = np.random.default_rng(42) if rng is None else rng

    # Order CDMs far-from-TCA first, so the tail of the frame is always
    # the most recent (closest-to-TCA) portion of the event.
    ordered = event_df.sort_values("time_to_tca", ascending=False)
    total = len(ordered)
    if total <= min_keep:
        return []

    # Each variant keeps a random suffix of min_keep..total-1 CDMs
    # (rng.integers has an exclusive upper bound).
    return [
        ordered.iloc[-int(rng.integers(min_keep, total)):].copy()
        for _ in range(n_augments)
    ]
def augment_positive_events(
    df: pd.DataFrame,
    target_ratio: float = 0.05,
    noise_scale: float = 0.05,
    seed: int = 42,
) -> pd.DataFrame:
    """
    Augment the positive (high-risk) class to reach ``target_ratio``.

    Positive events (final risk > -5) are repeatedly sampled and perturbed
    via noise augmentation — and occasionally truncation — until enough
    synthetic events exist.  Each synthetic event gets a fresh event_id and
    ``source == "augmented"``.

    Args:
        df: full training DataFrame with event_id, risk columns
        target_ratio: desired fraction of high-risk events (default 5%)
        noise_scale: std dev of Gaussian noise as fraction of feature std
        seed: random seed

    Returns:
        Augmented DataFrame with new synthetic positive events appended,
        or ``df`` itself when the target ratio is already met.
    """
    rng = np.random.default_rng(seed)

    # An event is positive when its final (most recent) risk exceeds -5.
    event_risks = df.groupby("event_id")["risk"].last()
    pos_event_ids = event_risks[event_risks > -5].index.tolist()
    n_pos = len(pos_event_ids)
    n_total = len(event_risks)

    # Solve (n_pos + k) / (n_total + k) = target_ratio for k, rounding up
    # so the achieved ratio is at least the target.  The previous formula
    # int(target_ratio * n_total / (1 - target_ratio)) - n_pos only
    # approximates this and, with int() truncation, could land below the
    # requested target ratio.
    n_needed = max(
        0,
        int(np.ceil((target_ratio * n_total - n_pos) / (1 - target_ratio))),
    )
    if n_needed == 0:
        print(f"Already at target ratio ({n_pos}/{n_total} = {n_pos/n_total:.1%})")
        return df

    print(f"Augmenting: {n_pos} positive events → {n_pos + n_needed} "
          f"(target {target_ratio:.0%} of {n_total + n_needed})")

    # Generate synthetic events, assigning fresh ids above the current max.
    max_event_id = df["event_id"].max()
    augmented_dfs = []
    generated = 0
    while generated < n_needed:
        # Pick a random positive event as the augmentation source.
        src_event_id = rng.choice(pos_event_ids)
        src_event = df[df["event_id"] == src_event_id]

        aug_variants = augment_event_noise(
            src_event, noise_scale=noise_scale, n_augments=1, rng=rng
        )
        # ~30% of the time, also add a late-detection (truncated) variant.
        if rng.random() < 0.3 and len(src_event) > 3:
            aug_variants.extend(
                augment_event_truncate(src_event, n_augments=1, rng=rng)
            )

        for aug_df in aug_variants:
            if generated >= n_needed:
                break
            max_event_id += 1
            aug_df = aug_df.copy()
            aug_df["event_id"] = max_event_id
            aug_df["source"] = "augmented"
            augmented_dfs.append(aug_df)
            generated += 1

    if augmented_dfs:
        augmented = pd.concat(augmented_dfs, ignore_index=True)
        result = pd.concat([df, augmented], ignore_index=True)
        # Report the achieved class balance.
        event_risks = result.groupby("event_id")["risk"].last()
        new_pos = (event_risks > -5).sum()
        new_total = len(event_risks)
        print(f"Result: {new_pos} positive / {new_total} total "
              f"({new_pos/new_total:.1%})")
        return result
    return df
def integrate_spacetrack_positives(
    kelvins_df: pd.DataFrame,
    spacetrack_path: Path,
) -> pd.DataFrame:
    """
    Fold Space-Track emergency CDMs into the training set as extra positives.

    Space-Track's cdm_public feed exposes only 16 of Kelvins' 103 features;
    the missing features are zero-filled so the model learns to use whatever
    features are available.

    Returns ``kelvins_df`` unchanged when no Space-Track file exists.
    """
    if not spacetrack_path.exists():
        print(f"No Space-Track data at {spacetrack_path}")
        return kelvins_df

    # Imported lazily so this module still loads when merge_sources
    # (or its dependencies) are unavailable and no Space-Track data exists.
    from src.data.merge_sources import (
        load_spacetrack_cdms, group_into_events, merge_datasets
    )

    spacetrack_events = group_into_events(load_spacetrack_cdms(spacetrack_path))
    return merge_datasets(kelvins_df, spacetrack_events)
def build_augmented_training_set(
    data_dir: Path,
    target_positive_ratio: float = 0.05,
    noise_scale: float = 0.05,
    seed: int = 42,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Assemble the full augmented training set from every available source.

    Pipeline:
      1. Load the ESA Kelvins train/test splits.
      2. Merge Space-Track emergency CDMs into the training split.
      3. Apply time-series augmentation to the positive events.
      4. Return (augmented_train, original_test).

    The test split is NEVER augmented — it stays Kelvins-only so that
    evaluation remains fair.
    """
    from src.data.cdm_loader import load_dataset

    banner = "=" * 60
    print(banner)
    print(" Building Augmented Training Set")
    print(banner)

    # --- Step 1: Kelvins base dataset ---------------------------------
    print("\n1. Loading ESA Kelvins dataset ...")
    train, test = load_dataset(data_dir / "cdm")
    # Copies defragment the frames and make provenance tagging safe.
    train = train.copy()
    test = test.copy()
    train["source"] = "kelvins"
    test["source"] = "kelvins"

    risks_by_event = train.groupby("event_id")["risk"].last()
    pos_initial = (risks_by_event > -5).sum()
    total_initial = len(risks_by_event)
    print(f" Initial: {pos_initial} positive / {total_initial} total "
          f"({pos_initial/total_initial:.2%})")

    # --- Step 2: Space-Track integration ------------------------------
    st_path = data_dir / "cdm_spacetrack" / "cdm_spacetrack_emergency.csv"
    if st_path.exists():
        print("\n2. Integrating Space-Track emergency CDMs ...")
        train = integrate_spacetrack_positives(train, st_path)
    else:
        print("\n2. No Space-Track data found (skipping)")

    # --- Step 3: time-series augmentation -----------------------------
    print(f"\n3. Augmenting positive events (target ratio: {target_positive_ratio:.0%}) ...")
    train = augment_positive_events(
        train,
        target_ratio=target_positive_ratio,
        noise_scale=noise_scale,
        seed=seed,
    )

    # --- Final summary -------------------------------------------------
    risks_by_event = train.groupby("event_id")["risk"].last()
    sources_by_event = train.groupby("event_id")["source"].first()
    pos_final = (risks_by_event > -5).sum()
    total_final = len(risks_by_event)
    print(f"\n{banner}")
    print(" Final Training Set:")
    print(f" Kelvins events: {(sources_by_event == 'kelvins').sum()}")
    print(f" Space-Track events: {(sources_by_event == 'spacetrack').sum()}")
    print(f" Augmented events: {(sources_by_event == 'augmented').sum()}")
    print(f" Total events: {total_final}")
    print(f" Positive events: {pos_final} ({pos_final/total_final:.1%})")
    print(f" Total CDM rows: {len(train)}")
    print(banner)
    return train, test