# panacea-api/src/data/merge_sources.py
# Generated by Claude Code -- 2026-02-08
"""Merge CDM data from multiple sources into unified training format.
Combines:
1. ESA Kelvins dataset (103 features, labeled)
2. Space-Track cdm_public (16 features, unlabeled — derive risk from PC)
Strategy:
- Space-Track CDMs are grouped into "conjunction events" by (SAT_1_ID, SAT_2_ID, TCA_date)
- Each event gets a time series of CDMs ordered by CREATED date
- Risk label derived from final PC: high risk if PC > 1e-5 (same threshold as Kelvins)
- Features that exist in both sources get unified column names
- Missing features (e.g., covariance in Space-Track) are filled with 0
This gives us far more positive examples for training the risk classifier,
even though the Space-Track data has fewer features per CDM.
"""
import numpy as np
import pandas as pd
from pathlib import Path

# Mapping from Space-Track CDM_PUBLIC fields → unified column names
SPACETRACK_COLUMN_MAP = {
    "CDM_ID": "cdm_id",
    "CREATED": "created",
    "TCA": "tca",
    "MIN_RNG": "miss_distance",  # km in Space-Track
    "PC": "collision_probability",
    "SAT_1_ID": "sat_1_id",
    "SAT_1_NAME": "sat_1_name",
    "SAT1_OBJECT_TYPE": "t_object_type",
    "SAT1_RCS": "t_rcs",
    "SAT_1_EXCL_VOL": "t_excl_vol",
    "SAT_2_ID": "sat_2_id",
    "SAT_2_NAME": "sat_2_name",
    "SAT2_OBJECT_TYPE": "c_object_type",
    "SAT2_RCS": "c_rcs",
    "SAT_2_EXCL_VOL": "c_excl_vol",
    "EMERGENCY_REPORTABLE": "emergency_reportable",
}

# Risk threshold: PC > 1e-5 = high risk (matches ESA Kelvins: risk > -5)
RISK_THRESHOLD = 1e-5
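
# Worked example of the threshold equivalence: PC = 3e-5 gives
# risk = log10(3e-5) ≈ -4.52 > -5, so the event is labeled high-risk;
# PC = 1e-6 gives risk = -6.0, which falls below the threshold.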


def load_spacetrack_cdms(csv_path: Path) -> pd.DataFrame:
    """Load a Space-Track CDM CSV and do initial cleaning."""
    df = pd.read_csv(csv_path)
    # Rename columns to the unified format
    df = df.rename(columns=SPACETRACK_COLUMN_MAP)
    # Parse dates
    for col in ["created", "tca"]:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors="coerce")
    # Convert miss_distance to float. Space-Track MIN_RNG is in km while
    # ESA Kelvins miss_distance is in meters, so convert to meters.
    if "miss_distance" in df.columns:
        df["miss_distance"] = pd.to_numeric(df["miss_distance"], errors="coerce") * 1000.0
    # Convert collision_probability to float and derive the risk column
    # (log10 of PC, matching the ESA format)
    if "collision_probability" in df.columns:
        df["collision_probability"] = pd.to_numeric(df["collision_probability"], errors="coerce")
        df["risk"] = np.where(
            df["collision_probability"] > 0,
            np.log10(df["collision_probability"].clip(lower=1e-30)),
            -30.0,
        )
    print(f"Loaded {len(df)} Space-Track CDMs from {csv_path.name}")
    return df
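
# Usage sketch for the loader (the filename below is hypothetical):
#
#     df = load_spacetrack_cdms(Path("data/cdm_spacetrack/cdm_batch_001.csv"))
#     df[["cdm_id", "tca", "miss_distance", "risk"]].head()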


def group_into_events(df: pd.DataFrame) -> pd.DataFrame:
    """
    Group Space-Track CDMs into conjunction events.

    An 'event' is a sequence of CDMs for the same object pair with TCA
    values within 1 day of each other. Each event gets a unique event_id.
    """
    if df.empty:
        return df
    # Sort by object pair, then TCA, then creation date
    df = df.sort_values(["sat_1_id", "sat_2_id", "tca", "created"]).reset_index(drop=True)
    # Assign event IDs: same pair + TCA within 1 day of the previous CDM = same event
    event_ids = []
    current_event = 0
    prev_sat1 = None
    prev_sat2 = None
    prev_tca = None
    for _, row in df.iterrows():
        sat1 = row.get("sat_1_id")
        sat2 = row.get("sat_2_id")
        tca = row.get("tca")
        same_pair = (sat1 == prev_sat1 and sat2 == prev_sat2)
        close_tca = False
        if same_pair and prev_tca is not None and pd.notna(tca) and pd.notna(prev_tca):
            close_tca = abs((tca - prev_tca).total_seconds()) < 86400  # 1 day
        if not (same_pair and close_tca):
            current_event += 1
        event_ids.append(current_event)
        prev_sat1 = sat1
        prev_sat2 = sat2
        prev_tca = tca
    df["event_id"] = event_ids
    # Compute time_to_tca: days from CDM creation to TCA (for each CDM in the event)
    if "created" in df.columns and "tca" in df.columns:
        df["time_to_tca"] = (df["tca"] - df["created"]).dt.total_seconds() / 86400.0
        df["time_to_tca"] = df["time_to_tca"].clip(lower=0.0)
    n_events = df["event_id"].nunique()
    n_high_risk = 0
    if "risk" in df.columns:
        # log10(RISK_THRESHOLD) == -5, the same cut documented at the top of the module
        event_risks = df.groupby("event_id")["risk"].last()
        n_high_risk = (event_risks > np.log10(RISK_THRESHOLD)).sum()
    print(f"Grouped into {n_events} events ({n_high_risk} high-risk)")
    return df
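
# Illustration of the grouping rule (all values made up). Note that TCAs are
# compared pairwise after sorting, so a chain of sub-1-day gaps can span more
# than a day in total:
#
#     demo = pd.DataFrame({
#         "sat_1_id": [111, 111, 111],
#         "sat_2_id": [222, 222, 222],
#         "tca": pd.to_datetime(["2024-01-01 12:00", "2024-01-01 18:00", "2024-01-02 20:00"]),
#         "created": pd.to_datetime(["2023-12-30", "2023-12-31", "2024-01-01"]),
#     })
#     group_into_events(demo)["event_id"].tolist()  # -> [1, 1, 2]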


def compute_relative_speed_from_excl_vol(df: pd.DataFrame) -> pd.DataFrame:
    """Ensure a relative_speed column exists for schema compatibility.

    The exclusion volume (km) alone is not enough to derive a relative
    speed, so the column is a 0.0 placeholder when it is missing.
    """
    if "relative_speed" not in df.columns:
        df["relative_speed"] = 0.0
    return df


def align_with_kelvins_schema(
    spacetrack_df: pd.DataFrame,
    kelvins_df: pd.DataFrame,
) -> pd.DataFrame:
    """
    Align Space-Track data columns with the Kelvins schema.

    Missing columns are filled with 0.
    """
    # Work on a copy so the caller's frame is not mutated
    spacetrack_df = spacetrack_df.copy()
    kelvins_cols = set(kelvins_df.columns)
    st_cols = set(spacetrack_df.columns)
    # Add missing numeric columns as 0
    for col in kelvins_cols:
        if col not in st_cols:
            spacetrack_df[col] = 0.0
    # Keep only columns that exist in Kelvins, plus our extra metadata.
    # Kelvins column order comes first so the output order is deterministic.
    extra_cols = {"sat_1_id", "sat_2_id", "sat_1_name", "sat_2_name",
                  "t_object_type", "collision_probability", "created", "tca",
                  "cdm_id", "emergency_reportable", "t_rcs", "c_rcs",
                  "t_excl_vol", "c_excl_vol", "source"}
    keep_cols = list(kelvins_df.columns) + sorted(extra_cols - kelvins_cols)
    available = [c for c in keep_cols if c in spacetrack_df.columns]
    return spacetrack_df[available]
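
# Toy illustration of the alignment contract (frames and values made up):
#
#     kelvins = pd.DataFrame({"risk": [-6.0], "event_id": [1], "time_to_tca": [2.0]})
#     st = pd.DataFrame({"risk": [-4.0], "event_id": [7], "sat_1_id": [111]})
#     align_with_kelvins_schema(st, kelvins).columns.tolist()
#     # -> ["risk", "event_id", "time_to_tca", "sat_1_id"]; time_to_tca is 0.0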


def merge_datasets(
    kelvins_train_df: pd.DataFrame,
    spacetrack_df: pd.DataFrame,
    offset_event_ids: bool = True,
) -> pd.DataFrame:
    """
    Merge Kelvins training data with Space-Track CDMs.

    Args:
        kelvins_train_df: ESA Kelvins training DataFrame
        spacetrack_df: Space-Track CDMs (already grouped into events)
        offset_event_ids: shift Space-Track event_ids to avoid collisions

    Returns:
        Combined DataFrame ready for model training
    """
    # Tag sources
    kelvins_train_df = kelvins_train_df.copy()
    kelvins_train_df["source"] = "kelvins"
    spacetrack_df = spacetrack_df.copy()
    spacetrack_df["source"] = "spacetrack"
    # Offset Space-Track event IDs to avoid collisions with Kelvins IDs
    if offset_event_ids and "event_id" in kelvins_train_df.columns:
        max_kelvins_id = kelvins_train_df["event_id"].max()
        spacetrack_df["event_id"] = spacetrack_df["event_id"] + max_kelvins_id + 1
    # Align columns
    spacetrack_df = align_with_kelvins_schema(spacetrack_df, kelvins_train_df)
    # Concatenate
    combined = pd.concat([kelvins_train_df, spacetrack_df], ignore_index=True)
    # Fill any remaining NaNs in numeric columns
    numeric_cols = combined.select_dtypes(include=[np.number]).columns
    combined[numeric_cols] = combined[numeric_cols].fillna(0)
    n_kelvins = kelvins_train_df["event_id"].nunique()
    n_st = spacetrack_df["event_id"].nunique()
    n_total = combined["event_id"].nunique()
    # Count high-risk events per source; the last CDM of an event decides its label
    log_threshold = np.log10(RISK_THRESHOLD)  # -5, matching the Kelvins labels
    event_risk = combined.groupby(["event_id", "source"])["risk"].last().reset_index()
    n_hr_kelvins = ((event_risk["source"] == "kelvins") & (event_risk["risk"] > log_threshold)).sum()
    n_hr_st = ((event_risk["source"] == "spacetrack") & (event_risk["risk"] > log_threshold)).sum()
    print("\nMerged dataset:")
    print(f"  Kelvins: {n_kelvins} events ({n_hr_kelvins} high-risk)")
    print(f"  Space-Track: {n_st} events ({n_hr_st} high-risk)")
    print(f"  Total: {n_total} events ({n_hr_kelvins + n_hr_st} high-risk)")
    print(f"  Columns: {len(combined.columns)}")
    return combined
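
# Usage sketch (names are hypothetical; both frames must already carry
# event_id and risk columns):
#
#     combined = merge_datasets(kelvins_train, spacetrack_events)
#     combined.groupby("source")["event_id"].nunique()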


def load_and_merge_all(data_dir: Path) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Load all available data sources and merge into train/test DataFrames.

    Returns (train_df, test_df) — test is Kelvins-only (for fair comparison).
    """
    from src.data.cdm_loader import load_dataset

    # Load ESA Kelvins
    kelvins_dir = data_dir / "cdm"
    kelvins_train, kelvins_test = load_dataset(kelvins_dir)
    # Load Space-Track data if available
    spacetrack_dir = data_dir / "cdm_spacetrack"
    spacetrack_files = list(spacetrack_dir.glob("cdm_*.csv")) if spacetrack_dir.exists() else []
    if not spacetrack_files:
        print("\nNo Space-Track data found. Using Kelvins only.")
        return kelvins_train, kelvins_test
    # Load and clean all Space-Track CSVs
    st_dfs = []
    for f in spacetrack_files:
        if f.name.startswith("checkpoint"):
            continue
        df = load_spacetrack_cdms(f)
        df = compute_relative_speed_from_excl_vol(df)
        st_dfs.append(df)
    if st_dfs:
        all_st = pd.concat(st_dfs, ignore_index=True)
        # Assign event IDs once over the concatenated set, so events that span
        # files are grouped correctly (grouping per file and again here would
        # do the same work twice)
        all_st = group_into_events(all_st)
        merged_train = merge_datasets(kelvins_train, all_st)
    else:
        merged_train = kelvins_train
    # Test set stays Kelvins-only for fair benchmarking
    return merged_train, kelvins_test
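

if __name__ == "__main__":
    # Minimal smoke-test sketch. The "data" directory is an assumption based
    # on the layout load_and_merge_all expects (data/cdm for Kelvins,
    # data/cdm_spacetrack for Space-Track exports); adjust to your checkout.
    train_df, test_df = load_and_merge_all(Path("data"))
    print(f"train: {len(train_df)} rows, test: {len(test_df)} rows")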