Spaces:
Sleeping
Sleeping
"""
AsteroidNET Image Preprocessor (image_preprocessor.preprocessor)

Two-pass background subtraction with source masking, cosmic-ray rejection,
and multi-frame WCS alignment.

Two-pass background is CRITICAL for real data:
- First pass gives a rough background → build source mask
- Second pass with the mask gives an unbiased background (sources don't inflate the estimate)
- This matters especially in crowded fields near the galactic plane

Byte-order note: data must already be float32 native (done in ingestor).
"""
| from __future__ import annotations | |
| import logging | |
| import warnings | |
| from typing import Optional | |
| import numpy as np | |
| from astropy.io import fits | |
| from astropy.stats import SigmaClip | |
| from astropy.wcs import FITSFixedWarning | |
| from asteroidnet.fits_ingestor.ingestor import FITSFrame | |
| logger = logging.getLogger(__name__) | |
def preprocess_frame(
    frame: FITSFrame,
    config: Optional[dict] = None,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Full preprocessing pipeline for a single frame.

    Stages:
      1. Cosmic-ray rejection (astroscrappy L.A.Cosmic)
      2. Two-pass background subtraction with source masking
      3. Returns (background-subtracted data, background RMS map)

    Parameters
    ----------
    frame : FITSFrame
        Ingested frame with float32 native data. This function reads
        ``frame.data``, ``frame.exptime_s`` and ``frame.path``; the frame's
        data array is copied, never modified in place.
    config : dict, optional
        Pipeline configuration. Only the ``"preprocessing"`` sub-dict is
        consulted; a missing or ``None`` entry means "use defaults".

    Returns
    -------
    data_sub : ndarray
        Background-subtracted data (NaN where masked).
    bkg_rms : ndarray
        Per-pixel background RMS (for SNR threshold computation).
    """
    # `or {}` on both levels so that config=None and
    # config={"preprocessing": None} both fall back to defaults.
    cfg = (config or {}).get("preprocessing") or {}
    data = frame.data.copy()

    # ── Stage 1: Cosmic ray rejection ───────────────────────────────────────
    data, cr_mask = _reject_cosmic_rays(data, cfg, frame.exptime_s)

    # ── Stage 2: Two-pass background subtraction ─────────────────────────────
    data_sub, bkg_rms = _subtract_background(data, cfg)

    # The statistics below need full-image medians, so only compute them
    # when the debug record will actually be emitted.
    if logger.isEnabledFor(logging.DEBUG):
        masked_pct = 100 * cr_mask.sum() / cr_mask.size if cr_mask.size else 0.0
        logger.debug(
            "Preprocessed %s: CR_mask=%.3f%%, bkg_median=%.2f, bkg_rms_median=%.2f",
            frame.path.name,
            masked_pct,
            # The background that was removed is (input - output), not
            # (output + rms) as previously logged.
            float(np.nanmedian(data - data_sub)),
            float(np.nanmedian(bkg_rms)),
        )
    return data_sub, bkg_rms
def align_frames(
    frames: list[FITSFrame],
    data_list: list[np.ndarray],
    config: Optional[dict] = None,
) -> list[np.ndarray]:
    """
    Reproject all frames to the WCS of the first frame.

    Uses ``reproject_adaptive`` with ``conserve_flux=True`` and a Gaussian
    kernel. Pixels whose output footprint is below 0.5 are set to NaN.

    Parameters
    ----------
    frames : list of FITSFrame
        Frames supplying the headers; ``frames[0].header`` defines the
        reference projection.
    data_list : list of ndarray
        One image per frame, in the same order as ``frames``.
    config : dict, optional
        Accepted for interface symmetry; not used by this function.

    Returns
    -------
    list of ndarray
        ``data_list[0]`` unchanged, followed by each remaining image
        reprojected onto the reference WCS as float32. If reprojection of a
        frame raises, that frame's original (unaligned) array is used
        instead. When fewer than two frames are given, or ``reproject`` is
        not installed, ``data_list`` itself is returned.

    Raises
    ------
    ValueError
        If ``frames`` and ``data_list`` have different lengths. Previously
        the extra entries were dropped silently, leaving the output out of
        step with ``frames``.
    """
    if len(frames) != len(data_list):
        raise ValueError(
            f"frames and data_list must have the same length "
            f"(got {len(frames)} and {len(data_list)})"
        )
    if len(frames) < 2:
        return data_list

    try:
        from reproject import reproject_adaptive
    except ImportError:
        logger.warning("reproject not installed — skipping alignment")
        return data_list

    ref_header = frames[0].header
    n_to_align = len(frames) - 1
    aligned = [data_list[0]]

    # One warning filter for the whole loop: FITSFixedWarning is raised while
    # the headers are parsed into WCS objects and is only noise here.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", FITSFixedWarning)
        for i, (frame, data) in enumerate(zip(frames[1:], data_list[1:]), 1):
            try:
                reprojected, footprint = reproject_adaptive(
                    (data, frame.header),
                    ref_header,
                    conserve_flux=True,
                    kernel="gaussian",
                )
                # Mask pixels outside footprint
                reprojected[footprint < 0.5] = np.nan
                aligned.append(reprojected.astype(np.float32))
                logger.debug("Aligned frame %d/%d to reference WCS", i, n_to_align)
            except Exception as exc:
                # Deliberate best-effort: one bad WCS must not abort the
                # whole stack, so keep the frame as-is and carry on.
                logger.warning("Frame %d alignment failed: %s — using unaligned", i, exc)
                aligned.append(data)
    return aligned
| # ── Private helpers ────────────────────────────────────────────────────────── | |
def _reject_cosmic_rays(
    data: np.ndarray,
    cfg: dict,
    exptime_s: float,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Apply L.A.Cosmic cosmic-ray rejection via astroscrappy.

    Parameters
    ----------
    data : ndarray
        Image to clean. Non-finite pixels (NaN, ±inf) are treated as already
        masked.
    cfg : dict
        Preprocessing options. Keys read: ``cosmic_ray_sigclip`` (default
        4.5), ``cosmic_ray_objlim`` (default 5.0), ``readnoise_e``
        (default 10.0).
    exptime_s : float
        Exposure time; accepted for interface stability, currently unused.

    Returns
    -------
    cleaned : ndarray
        float32 image from astroscrappy with the originally non-finite pixels
        set to NaN. If astroscrappy is unavailable, or the frame has no
        finite pixel, ``data`` is returned unchanged.
    cr_mask : ndarray of bool
        True where a cosmic ray was flagged or the input was non-finite.
    """
    nan_mask = ~np.isfinite(data)

    # Only the import itself is guarded, so an ImportError raised deeper in
    # the processing below is not mistaken for "astroscrappy missing".
    try:
        import astroscrappy
    except ImportError:
        logger.debug("astroscrappy not installed — skipping CR rejection")
        return data, nan_mask

    if nan_mask.all():
        # No finite pixel means no usable fill value and nothing to clean
        # (this also covers an empty array).
        logger.debug("No finite pixels in frame — skipping CR rejection")
        return data, nan_mask

    sigclip = float(cfg.get("cosmic_ray_sigclip", 4.5))
    objlim = float(cfg.get("cosmic_ray_objlim", 5.0))
    # Readnoise from config or typical survey default
    readnoise = float(cfg.get("readnoise_e", 10.0))

    # astroscrappy requires finite input. Take the median over finite pixels
    # only: np.nanmedian skips NaN but would let ±inf skew the fill value.
    fill_val = float(np.median(data[~nan_mask]))
    data_fill = np.where(nan_mask, fill_val, data).astype(np.float32)

    cr_mask, cleaned = astroscrappy.detect_cosmics(
        data_fill,
        sigclip=sigclip,
        sigfrac=0.3,
        objlim=objlim,
        readnoise=readnoise,
        gain=1.0,
        verbose=False,
    )

    # Count genuine detections before folding the non-finite pixels in;
    # the filled pixels themselves are excluded from the count.
    n_cr = int((cr_mask & ~nan_mask).sum())

    # Restore original non-finite positions as NaN
    cleaned[nan_mask] = np.nan
    cleaned = cleaned.astype(np.float32)
    cr_mask |= nan_mask

    if n_cr > 0:
        logger.debug("Rejected %d cosmic rays", n_cr)
    return cleaned, cr_mask
def _constant_background(data: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Fallback: subtract one sigma-clipped median; RMS map is the clipped std."""
    from astropy.stats import sigma_clipped_stats

    finite = data[np.isfinite(data)]
    if finite.size == 0:
        # Nothing to measure: propagate NaN rather than asking
        # sigma_clipped_stats to summarise an empty array.
        med = std = np.nan
    else:
        _, med, std = sigma_clipped_stats(finite)
    data_sub = (data - med).astype(np.float32)
    bkg_rms = np.full_like(data, std, dtype=np.float32)
    return data_sub, bkg_rms


def _subtract_background(
    data: np.ndarray,
    cfg: dict,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Two-pass sigma-clipped 2D background subtraction with source masking.

    Pass 1: rough background → detect sources → build mask
    Pass 2: re-estimate background with masked sources → final subtraction

    The two-pass approach is critical for crowded fields: sources bias the
    background estimate upward, causing under-subtraction and spurious detections.

    Parameters
    ----------
    data : ndarray
        Image to process; non-finite pixels are excluded from both passes.
    cfg : dict
        Preprocessing options. Keys read (defaults in parentheses):
        ``background_box_size`` (64), ``background_filter_size`` (3),
        ``sigma_clip_sigma`` (3.0), ``sigma_clip_maxiters`` (10),
        ``source_mask_snr`` (2.0).

    Returns
    -------
    data_sub : ndarray
        float32 background-subtracted image. On the two-pass path, pixels
        that were non-finite in ``data`` are NaN.
    bkg_rms : ndarray
        float32 background RMS map (a constant map on the fallback paths).

    Notes
    -----
    If photutils is not installed, or ``Background2D`` raises, a single
    sigma-clipped median is subtracted instead and a warning is logged.
    """
    try:
        from photutils.background import Background2D, SExtractorBackground
        from photutils.segmentation import detect_sources
    except ImportError:
        logger.warning("photutils not installed — using sigma-clipped median background")
        return _constant_background(data)

    box_size = int(cfg.get("background_box_size", 64))
    filter_size = int(cfg.get("background_filter_size", 3))
    sigma = float(cfg.get("sigma_clip_sigma", 3.0))
    maxiters = int(cfg.get("sigma_clip_maxiters", 10))
    mask_snr = float(cfg.get("source_mask_snr", 2.0))

    sc = SigmaClip(sigma=sigma, maxiters=maxiters)
    nan_mask = ~np.isfinite(data)
    # Pixels excluded from every background estimate (currently just the
    # non-finite ones). Kept as a copy so nan_mask stays pristine for the
    # final NaN restoration.
    edge_mask = nan_mask.copy()

    def _estimate(mask: np.ndarray):
        # Both passes use identical settings and differ only in the mask.
        return Background2D(
            data,
            box_size=box_size,
            filter_size=filter_size,
            sigma_clip=sc,
            bkg_estimator=SExtractorBackground(),
            mask=mask,
            fill_value=0.0,
        )

    try:
        # ── Pass 1: rough background ──────────────────────────────────────
        bkg1 = _estimate(edge_mask)
        rough_sub = data - bkg1.background

        # Build source mask from first-pass subtraction
        threshold1 = mask_snr * bkg1.background_rms
        source_mask = np.zeros_like(data, dtype=bool)
        try:
            segm = detect_sources(rough_sub, threshold1, npixels=5)
            if segm is not None:
                source_mask = segm.data > 0
        except Exception as exc:
            # Best-effort: without a source mask pass 2 simply repeats
            # pass 1, but the failure should be visible in the log.
            logger.debug(
                "Source detection for background mask failed (%s) — "
                "continuing without source mask",
                exc,
            )

        combined_mask = edge_mask | source_mask

        # ── Pass 2: refined background with source mask ──────────────────
        bkg2 = _estimate(combined_mask)
        data_sub = (data - bkg2.background).astype(np.float32)
        data_sub[nan_mask] = np.nan
        bkg_rms = bkg2.background_rms.astype(np.float32)

        logger.debug(
            "Two-pass background: src_mask=%.2f%%, bkg_rms_median=%.3f",
            100 * source_mask.sum() / source_mask.size,
            float(np.nanmedian(bkg_rms)),
        )
        return data_sub, bkg_rms
    except Exception as exc:
        logger.warning("Background2D failed (%s) — falling back to constant", exc)
        return _constant_background(data)