""" AsteroidNET Image Preprocessor (image_preprocessor.preprocessor) Two-pass background subtraction with source masking, cosmic-ray rejection, and multi-frame WCS alignment. Two-pass background is CRITICAL for real data: - First pass gives rough background → build source mask - Second pass with mask gives unbiased background (sources don't inflate estimate) - This matters especially in crowded fields near galactic plane Byte-order note: data must already be float32 native (done in ingestor). """ from __future__ import annotations import logging import warnings from typing import Optional import numpy as np from astropy.io import fits from astropy.stats import SigmaClip from astropy.wcs import FITSFixedWarning from asteroidnet.fits_ingestor.ingestor import FITSFrame logger = logging.getLogger(__name__) def preprocess_frame( frame: FITSFrame, config: Optional[dict] = None, ) -> tuple[np.ndarray, np.ndarray]: """ Full preprocessing pipeline for a single frame. Stages: 1. Cosmic-ray rejection (astroscrappy L.A.Cosmic) 2. Two-pass background subtraction with source masking 3. Returns (background-subtracted data, background RMS map) Parameters ---------- frame : FITSFrame Ingested frame with float32 native data. config : dict, optional Pipeline configuration. Returns ------- data_sub : ndarray Background-subtracted data (NaN where masked). bkg_rms : ndarray Per-pixel background RMS (for SNR threshold computation). """ cfg = (config or {}).get("preprocessing", {}) data = frame.data.copy() # ── Stage 1: Cosmic ray rejection ─────────────────────────────────────── data, cr_mask = _reject_cosmic_rays(data, cfg, frame.exptime_s) # ── Stage 2: Two-pass background subtraction ───────────────────────────── data_sub, bkg_rms = _subtract_background(data, cfg) logger.debug( "Preprocessed %s: CR_mask=%.3f%%, bkg_median=%.2f, bkg_rms_median=%.2f", frame.path.name, 100 * cr_mask.sum() / cr_mask.size, float(np.nanmedian(data_sub + bkg_rms)), # approx background level float(np.nanmedian(bkg_rms)), ) return data_sub, bkg_rms def align_frames( frames: list[FITSFrame], data_list: list[np.ndarray], config: Optional[dict] = None, ) -> list[np.ndarray]: """ Reproject all frames to the WCS of the first frame. Uses reproject_adaptive with conserve_flux=True — the recommended general-purpose algorithm that handles pixel scale differences. Returns aligned data arrays (same WCS as frames[0]). """ if len(frames) < 2: return data_list try: from reproject import reproject_adaptive except ImportError: logger.warning("reproject not installed — skipping alignment") return data_list ref_header = frames[0].header aligned = [data_list[0]] for i, (frame, data) in enumerate(zip(frames[1:], data_list[1:]), 1): with warnings.catch_warnings(): warnings.simplefilter("ignore", FITSFixedWarning) try: reprojected, footprint = reproject_adaptive( (data, frame.header), ref_header, conserve_flux=True, kernel="gaussian", ) # Mask pixels outside footprint reprojected[footprint < 0.5] = np.nan aligned.append(reprojected.astype(np.float32)) logger.debug("Aligned frame %d/%d to reference WCS", i, len(frames) - 1) except Exception as exc: logger.warning("Frame %d alignment failed: %s — using unaligned", i, exc) aligned.append(data) return aligned # ── Private helpers ────────────────────────────────────────────────────────── def _reject_cosmic_rays( data: np.ndarray, cfg: dict, exptime_s: float, ) -> tuple[np.ndarray, np.ndarray]: """Apply L.A.Cosmic cosmic-ray rejection via astroscrappy.""" try: import astroscrappy sigclip = float(cfg.get("cosmic_ray_sigclip", 4.5)) objlim = float(cfg.get("cosmic_ray_objlim", 5.0)) # Readnoise from config or typical survey default readnoise = float(cfg.get("readnoise_e", 10.0)) # astroscrappy requires no NaN — replace with median nan_mask = ~np.isfinite(data) fill_val = float(np.nanmedian(data)) data_fill = np.where(nan_mask, fill_val, data).astype(np.float32) cr_mask, cleaned = astroscrappy.detect_cosmics( data_fill, sigclip=sigclip, sigfrac=0.3, objlim=objlim, readnoise=readnoise, gain=1.0, verbose=False, ) # Restore original NaN positions cleaned[nan_mask] = np.nan cleaned = cleaned.astype(np.float32) cr_mask |= nan_mask n_cr = int(cr_mask.sum()) - int(nan_mask.sum()) if n_cr > 0: logger.debug("Rejected %d cosmic rays", n_cr) return cleaned, cr_mask except ImportError: logger.debug("astroscrappy not installed — skipping CR rejection") nan_mask = ~np.isfinite(data) return data, nan_mask def _subtract_background( data: np.ndarray, cfg: dict, ) -> tuple[np.ndarray, np.ndarray]: """ Two-pass sigma-clipped 2D background subtraction with source masking. Pass 1: rough background → detect sources → build mask Pass 2: re-estimate background with masked sources → final subtraction The two-pass approach is critical for crowded fields: sources bias the background estimate upward, causing under-subtraction and spurious detections. """ try: from photutils.background import Background2D, SExtractorBackground from photutils.segmentation import detect_sources except ImportError: logger.warning("photutils not installed — using sigma-clipped median background") from astropy.stats import sigma_clipped_stats _, med, std = sigma_clipped_stats(data[np.isfinite(data)]) return (data - med).astype(np.float32), np.full_like(data, std) box_size = int(cfg.get("background_box_size", 64)) filter_size = int(cfg.get("background_filter_size", 3)) sigma = float(cfg.get("sigma_clip_sigma", 3.0)) maxiters = int(cfg.get("sigma_clip_maxiters", 10)) mask_snr = float(cfg.get("source_mask_snr", 2.0)) sc = SigmaClip(sigma=sigma, maxiters=maxiters) nan_mask = ~np.isfinite(data) # Combine NaN mask with user mask edge_mask = nan_mask.copy() # ── Pass 1: rough background ────────────────────────────────────────── try: bkg1 = Background2D( data, box_size=box_size, filter_size=filter_size, sigma_clip=sc, bkg_estimator=SExtractorBackground(), mask=edge_mask, fill_value=0.0, ) rough_sub = data - bkg1.background # Build source mask from first-pass subtraction threshold1 = mask_snr * bkg1.background_rms source_mask = np.zeros_like(data, dtype=bool) try: segm = detect_sources(rough_sub, threshold1, npixels=5) if segm is not None: source_mask = segm.data > 0 except Exception: pass combined_mask = edge_mask | source_mask # ── Pass 2: refined background with source mask ────────────────── bkg2 = Background2D( data, box_size=box_size, filter_size=filter_size, sigma_clip=sc, bkg_estimator=SExtractorBackground(), mask=combined_mask, fill_value=0.0, ) data_sub = (data - bkg2.background).astype(np.float32) data_sub[nan_mask] = np.nan bkg_rms = bkg2.background_rms.astype(np.float32) logger.debug( "Two-pass background: src_mask=%.2f%%, bkg_rms_median=%.3f", 100 * source_mask.sum() / source_mask.size, float(np.nanmedian(bkg_rms)), ) return data_sub, bkg_rms except Exception as exc: logger.warning("Background2D failed (%s) — falling back to constant", exc) from astropy.stats import sigma_clipped_stats _, med, std = sigma_clipped_stats(data[np.isfinite(data)]) return (data - med).astype(np.float32), np.full_like(data, std, dtype=np.float32)