# Generated by Claude Code -- 2026-02-08
"""Merge CDM data from multiple sources into unified training format.

Combines:
1. ESA Kelvins dataset (103 features, labeled)
2. Space-Track cdm_public (16 features, unlabeled — derive risk from PC)

Strategy:
- Space-Track CDMs are grouped into "conjunction events" by
  (SAT_1_ID, SAT_2_ID, TCA_date)
- Each event gets a time series of CDMs ordered by CREATED date
- Risk label derived from final PC: high risk if PC > 1e-5 (same threshold
  as Kelvins)
- Features that exist in both sources get unified column names
- Missing features (e.g., covariance in Space-Track) are filled with 0

This gives us far more positive examples for training the risk classifier,
even though the Space-Track data has fewer features per CDM.
"""

import numpy as np
import pandas as pd
from pathlib import Path
from datetime import timedelta  # noqa: F401 — kept; other parts of the project may rely on it

# Mapping from Space-Track CDM_PUBLIC fields → unified column names
SPACETRACK_COLUMN_MAP = {
    "CDM_ID": "cdm_id",
    "CREATED": "created",
    "TCA": "tca",
    "MIN_RNG": "miss_distance",  # km in Space-Track
    "PC": "collision_probability",
    "SAT_1_ID": "sat_1_id",
    "SAT_1_NAME": "sat_1_name",
    "SAT1_OBJECT_TYPE": "t_object_type",
    "SAT1_RCS": "t_rcs",
    "SAT_1_EXCL_VOL": "t_excl_vol",
    "SAT_2_ID": "sat_2_id",
    "SAT_2_NAME": "sat_2_name",
    "SAT2_OBJECT_TYPE": "c_object_type",
    "SAT2_RCS": "c_rcs",
    "SAT_2_EXCL_VOL": "c_excl_vol",
    "EMERGENCY_REPORTABLE": "emergency_reportable",
}

# Risk threshold: PC > 1e-5 = high risk (matches ESA Kelvins: risk > -5)
RISK_THRESHOLD = 1e-5

# Same threshold in log10 space (= -5.0). The `risk` columns store log10(PC),
# so high-risk comparisons use this value; deriving it from RISK_THRESHOLD
# keeps the two representations from drifting apart.
_LOG_RISK_THRESHOLD = float(np.log10(RISK_THRESHOLD))

# CDMs for the same object pair whose TCAs fall within this window are
# considered part of the same conjunction event.
_EVENT_TCA_WINDOW_SECONDS = 86400.0  # 1 day


def load_spacetrack_cdms(csv_path: Path) -> pd.DataFrame:
    """Load a Space-Track CDM CSV and do initial cleaning.

    Args:
        csv_path: path to a Space-Track ``cdm_public`` CSV export.

    Returns:
        DataFrame with unified column names, parsed ``created``/``tca``
        timestamps, ``miss_distance`` converted from km to meters, and a
        derived ``risk`` column (log10 of PC, floored at -30 for PC <= 0).
    """
    df = pd.read_csv(csv_path)

    # Rename columns to unified format
    df = df.rename(columns=SPACETRACK_COLUMN_MAP)

    # Parse dates; unparseable values become NaT rather than raising.
    for col in ("created", "tca"):
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors="coerce")

    if "miss_distance" in df.columns:
        df["miss_distance"] = pd.to_numeric(df["miss_distance"], errors="coerce")
        # Space-Track MIN_RNG is in km; ESA Kelvins miss_distance is in
        # meters. Convert to meters for consistency.
        df["miss_distance"] = df["miss_distance"] * 1000.0

    if "collision_probability" in df.columns:
        df["collision_probability"] = pd.to_numeric(
            df["collision_probability"], errors="coerce"
        )
        # Derive risk column (log10 of PC, matching ESA format). np.where
        # evaluates both branches, so the clip guards log10 against PC <= 0
        # even though those rows take the -30.0 branch.
        df["risk"] = np.where(
            df["collision_probability"] > 0,
            np.log10(df["collision_probability"].clip(lower=1e-30)),
            -30.0,
        )

    print(f"Loaded {len(df)} Space-Track CDMs from {csv_path.name}")
    return df


def group_into_events(df: pd.DataFrame) -> pd.DataFrame:
    """Group Space-Track CDMs into conjunction events.

    An 'event' is a sequence of CDMs for the same object pair with TCA
    values within 1 day of each other. Each event gets a unique, 1-based
    ``event_id``. A ``time_to_tca`` column (days from CDM creation to TCA,
    clipped at 0) is added when ``created`` and ``tca`` are present.

    Args:
        df: Space-Track CDM frame; must contain ``sat_1_id``, ``sat_2_id``,
            ``tca`` and ``created`` columns for sorting/grouping.

    Returns:
        The sorted frame with ``event_id`` (and possibly ``time_to_tca``)
        columns added. An empty frame is returned unchanged.
    """
    if df.empty:
        return df

    # Sort by object pair and TCA so CDMs belonging to one event are
    # adjacent; event boundaries can then be found by comparing neighbors.
    df = df.sort_values(
        ["sat_1_id", "sat_2_id", "tca", "created"]
    ).reset_index(drop=True)

    # A row starts a new event unless it shares the object pair with the
    # previous row AND its TCA falls within the 1-day window. NaN ids and
    # NaT gaps never compare equal/close, so such rows always open a new
    # event — same semantics as a row-by-row scan, but vectorized.
    prev_sat1 = df["sat_1_id"].shift()
    prev_sat2 = df["sat_2_id"].shift()
    same_pair = df["sat_1_id"].eq(prev_sat1) & df["sat_2_id"].eq(prev_sat2)

    tca_gap = (df["tca"] - df["tca"].shift()).abs().dt.total_seconds()
    close_tca = tca_gap.notna() & (tca_gap < _EVENT_TCA_WINDOW_SECONDS)

    new_event = ~(same_pair & close_tca)
    df["event_id"] = new_event.cumsum()  # 1-based running event counter

    # Compute time_to_tca: days from CDM creation to TCA (for each CDM)
    if "created" in df.columns and "tca" in df.columns:
        df["time_to_tca"] = (df["tca"] - df["created"]).dt.total_seconds() / 86400.0
        df["time_to_tca"] = df["time_to_tca"].clip(lower=0.0)

    n_events = df["event_id"].nunique()
    n_high_risk = 0
    if "risk" in df.columns:
        # The last CDM of each event decides whether it counts as high risk.
        event_risks = df.groupby("event_id")["risk"].last()
        n_high_risk = int((event_risks > _LOG_RISK_THRESHOLD).sum())

    print(f"Grouped into {n_events} events ({n_high_risk} high-risk)")
    return df


def compute_relative_speed_from_excl_vol(df: pd.DataFrame) -> pd.DataFrame:
    """Ensure a ``relative_speed`` column exists (filled with 0.0).

    NOTE(review): exclusion volume alone cannot determine relative speed,
    so this only guarantees schema compatibility with the Kelvins data.
    """
    if "relative_speed" not in df.columns:
        df["relative_speed"] = 0.0
    return df


def align_with_kelvins_schema(
    spacetrack_df: pd.DataFrame,
    kelvins_df: pd.DataFrame,
) -> pd.DataFrame:
    """Align Space-Track data columns with the Kelvins schema.

    Columns present in Kelvins but missing from the Space-Track frame are
    filled with 0.0; only Kelvins columns plus a whitelist of extra
    Space-Track metadata columns are kept. The input frames are NOT
    modified.

    Args:
        spacetrack_df: Space-Track CDM frame (already unified/grouped).
        kelvins_df: Kelvins frame whose columns define the target schema.

    Returns:
        A new DataFrame with a deterministic (sorted) column order.
    """
    kelvins_cols = set(kelvins_df.columns)

    # Add all missing Kelvins columns in a single assign: avoids mutating
    # the caller's frame and the fragmentation of repeated single-column
    # inserts.
    missing = kelvins_cols - set(spacetrack_df.columns)
    if missing:
        spacetrack_df = spacetrack_df.assign(**{col: 0.0 for col in missing})

    # Keep only columns that exist in Kelvins + our extra metadata
    extra_cols = {"sat_1_id", "sat_2_id", "sat_1_name", "sat_2_name",
                  "t_object_type", "collision_probability", "created", "tca",
                  "cdm_id", "emergency_reportable", "t_rcs", "c_rcs",
                  "t_excl_vol", "c_excl_vol", "source"}

    # Sorted so the output column order is reproducible across runs
    # (iterating a raw set would be nondeterministic).
    keep_cols = sorted(kelvins_cols | extra_cols)
    available = [c for c in keep_cols if c in spacetrack_df.columns]
    return spacetrack_df[available]


def merge_datasets(
    kelvins_train_df: pd.DataFrame,
    spacetrack_df: pd.DataFrame,
    offset_event_ids: bool = True,
) -> pd.DataFrame:
    """Merge Kelvins training data with Space-Track CDMs.

    Args:
        kelvins_train_df: ESA Kelvins training DataFrame
        spacetrack_df: Space-Track CDMs (already grouped into events)
        offset_event_ids: shift Space-Track event_ids to avoid collisions

    Returns:
        Combined DataFrame ready for model training
    """
    # Tag sources (on copies — callers' frames stay untouched)
    kelvins_train_df = kelvins_train_df.copy()
    kelvins_train_df["source"] = "kelvins"
    spacetrack_df = spacetrack_df.copy()
    spacetrack_df["source"] = "spacetrack"

    # Offset Space-Track event IDs to avoid collision with Kelvins IDs
    if offset_event_ids and "event_id" in kelvins_train_df.columns:
        max_kelvins_id = kelvins_train_df["event_id"].max()
        spacetrack_df["event_id"] = spacetrack_df["event_id"] + max_kelvins_id + 1

    # Align columns with the Kelvins schema
    spacetrack_df = align_with_kelvins_schema(spacetrack_df, kelvins_train_df)

    # Concatenate
    combined = pd.concat([kelvins_train_df, spacetrack_df], ignore_index=True)

    # Space-Track rows lack most Kelvins features; zero-fill remaining NaNs
    numeric_cols = combined.select_dtypes(include=[np.number]).columns
    combined[numeric_cols] = combined[numeric_cols].fillna(0)

    n_kelvins = kelvins_train_df["event_id"].nunique()
    n_st = spacetrack_df["event_id"].nunique()
    n_total = combined["event_id"].nunique()

    # Count high-risk events per source, judged by each event's final CDM
    event_risk = combined.groupby(["event_id", "source"])["risk"].last().reset_index()
    high = event_risk["risk"] > _LOG_RISK_THRESHOLD
    n_hr_kelvins = ((event_risk["source"] == "kelvins") & high).sum()
    n_hr_st = ((event_risk["source"] == "spacetrack") & high).sum()

    print("\nMerged dataset:")
    print(f"  Kelvins:     {n_kelvins} events ({n_hr_kelvins} high-risk)")
    print(f"  Space-Track: {n_st} events ({n_hr_st} high-risk)")
    print(f"  Total:       {n_total} events ({n_hr_kelvins + n_hr_st} high-risk)")
    print(f"  Columns:     {len(combined.columns)}")

    return combined


def load_and_merge_all(data_dir: Path) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Load all available data sources and merge into train/test DataFrames.

    Args:
        data_dir: root data directory containing ``cdm/`` (Kelvins) and,
            optionally, ``cdm_spacetrack/cdm_*.csv`` exports.

    Returns:
        (train_df, test_df) — test is Kelvins-only (for fair comparison).
    """
    # Imported lazily so this module can be used without the project loader.
    from src.data.cdm_loader import load_dataset

    # Load ESA Kelvins
    kelvins_dir = data_dir / "cdm"
    kelvins_train, kelvins_test = load_dataset(kelvins_dir)

    # Load Space-Track data if available
    spacetrack_dir = data_dir / "cdm_spacetrack"
    spacetrack_files = (
        list(spacetrack_dir.glob("cdm_*.csv")) if spacetrack_dir.exists() else []
    )

    if not spacetrack_files:
        print("\nNo Space-Track data found. Using Kelvins only.")
        return kelvins_train, kelvins_test

    # Load and merge all Space-Track CSVs
    st_dfs = []
    for f in spacetrack_files:
        if f.name.startswith("checkpoint"):
            continue
        df = load_spacetrack_cdms(f)
        df = group_into_events(df)
        df = compute_relative_speed_from_excl_vol(df)
        st_dfs.append(df)

    if st_dfs:
        all_st = pd.concat(st_dfs, ignore_index=True)
        # Re-assign event IDs after concatenation so they stay unique
        all_st = group_into_events(all_st)
        merged_train = merge_datasets(kelvins_train, all_st)
    else:
        merged_train = kelvins_train

    # Test set stays Kelvins-only for fair benchmarking
    return merged_train, kelvins_test