# Upstream commit 41d98e2 (author: mmrech, verified):
# feat: v0.2 — real FITS support, TAI/UTC fix, SkyBoT, two-pass bg
"""
AsteroidNET Image Preprocessor (image_preprocessor.preprocessor)
Two-pass background subtraction with source masking, cosmic-ray rejection,
and multi-frame WCS alignment.
Two-pass background is CRITICAL for real data:
- First pass gives rough background → build source mask
- Second pass with mask gives unbiased background (sources don't inflate estimate)
- This matters especially in crowded fields near galactic plane
Byte-order note: data must already be float32 native (done in ingestor).
"""
from __future__ import annotations
import logging
import warnings
from typing import Optional
import numpy as np
from astropy.io import fits
from astropy.stats import SigmaClip
from astropy.wcs import FITSFixedWarning
from asteroidnet.fits_ingestor.ingestor import FITSFrame
logger = logging.getLogger(__name__)
def preprocess_frame(
    frame: FITSFrame,
    config: Optional[dict] = None,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Run the full preprocessing pipeline on a single frame.

    Stages:
      1. Cosmic-ray rejection (``_reject_cosmic_rays``).
      2. Two-pass background subtraction (``_subtract_background``).

    Parameters
    ----------
    frame : FITSFrame
        Ingested frame with float32 native data. ``frame.data`` is copied
        before processing; ``frame.exptime_s`` and ``frame.path`` are read.
    config : dict, optional
        Pipeline configuration. Only the ``"preprocessing"`` section is used;
        ``None``, a missing section, or an empty/``None`` section all mean
        "use defaults".

    Returns
    -------
    data_sub : ndarray
        Background-subtracted data (NaN where masked).
    bkg_rms : ndarray
        Per-pixel background RMS (for SNR threshold computation).
    """
    # `or {}` also covers a section that is present but set to None.
    cfg = (config or {}).get("preprocessing") or {}
    data = frame.data.copy()

    # ── Stage 1: Cosmic ray rejection ───────────────────────────────────────
    data, cr_mask = _reject_cosmic_rays(data, cfg, frame.exptime_s)

    # ── Stage 2: Two-pass background subtraction ─────────────────────────────
    data_sub, bkg_rms = _subtract_background(data, cfg)

    # The statistics below are full-frame reductions; skip them entirely
    # unless the debug message will actually be emitted.
    if logger.isEnabledFor(logging.DEBUG):
        # `data` is the CR-cleaned image and `data_sub` is that image minus
        # the background, so their difference is the background that was
        # removed. Non-finite pixels yield NaN and are ignored by nanmedian.
        with np.errstate(invalid="ignore"):
            bkg_level = float(np.nanmedian(data - data_sub))
        logger.debug(
            "Preprocessed %s: CR_mask=%.3f%%, bkg_median=%.2f, bkg_rms_median=%.2f",
            frame.path.name,
            100 * cr_mask.sum() / cr_mask.size,
            bkg_level,
            float(np.nanmedian(bkg_rms)),
        )
    return data_sub, bkg_rms
def align_frames(
    frames: list[FITSFrame],
    data_list: list[np.ndarray],
    config: Optional[dict] = None,
) -> list[np.ndarray]:
    """
    Resample every frame onto the pixel grid of the first frame.

    Each frame after the first is passed through ``reproject_adaptive``
    (``conserve_flux=True``, Gaussian kernel) using the first frame's header
    as the target. Output pixels whose footprint is below 0.5 are set to NaN
    and the result is stored as float32.

    Parameters
    ----------
    frames : list of FITSFrame
        Frames whose ``header`` attributes describe each image's WCS.
    data_list : list of ndarray
        Image arrays, paired positionally with ``frames``.
    config : dict, optional
        Accepted for interface symmetry; not read by this function.

    Returns
    -------
    list of ndarray
        ``data_list`` itself when there are fewer than two frames or when
        ``reproject`` is not installed; otherwise a new list whose first
        entry is ``data_list[0]`` untouched. A frame whose reprojection
        raises is kept unaligned and a warning is logged.
    """
    # With fewer than two frames there is nothing to align against.
    if len(frames) < 2:
        return data_list

    try:
        from reproject import reproject_adaptive
    except ImportError:
        logger.warning("reproject not installed — skipping alignment")
        return data_list

    reference_header = frames[0].header
    n_to_align = len(frames) - 1
    result = [data_list[0]]

    pairs = zip(frames[1:], data_list[1:])
    for idx, (src_frame, src_data) in enumerate(pairs, start=1):
        with warnings.catch_warnings():
            # Header clean-up notices from astropy.wcs are noise here.
            warnings.simplefilter("ignore", FITSFixedWarning)
            try:
                resampled, coverage = reproject_adaptive(
                    (src_data, src_frame.header),
                    reference_header,
                    conserve_flux=True,
                    kernel="gaussian",
                )
                # Blank out pixels the source frame barely (or never) covers.
                resampled[coverage < 0.5] = np.nan
                result.append(resampled.astype(np.float32))
                logger.debug("Aligned frame %d/%d to reference WCS", idx, n_to_align)
            except Exception as exc:
                # Best effort: keep the frame in the stack, just unaligned.
                logger.warning("Frame %d alignment failed: %s — using unaligned", idx, exc)
                result.append(src_data)
    return result
# ── Private helpers ──────────────────────────────────────────────────────────
def _reject_cosmic_rays(
    data: np.ndarray,
    cfg: dict,
    exptime_s: float,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Apply L.A.Cosmic cosmic-ray rejection via astroscrappy.

    Parameters
    ----------
    data : ndarray
        Image to clean. Non-finite pixels (NaN, ±inf) are treated as bad.
    cfg : dict
        Preprocessing options. Keys read: ``cosmic_ray_sigclip``
        (default 4.5), ``cosmic_ray_objlim`` (default 5.0) and
        ``readnoise_e`` (default 10.0).
    exptime_s : float
        Exposure time. Part of the signature but not used by this function.

    Returns
    -------
    cleaned : ndarray
        float32 image with cosmic rays replaced by astroscrappy and NaN at
        every originally non-finite pixel. If astroscrappy is not installed
        the input array is returned unchanged.
    mask : ndarray of bool
        True where a cosmic ray was flagged or the input was non-finite.
    """
    nan_mask = ~np.isfinite(data)

    # Keep the try body to the import alone so an unrelated ImportError
    # raised deeper in the pipeline is not mistaken for "not installed".
    try:
        import astroscrappy
    except ImportError:
        logger.debug("astroscrappy not installed — skipping CR rejection")
        return data, nan_mask

    # Empty image or no finite pixel at all: there is no valid fill value and
    # nothing to detect, so return an all-NaN frame instead of feeding NaNs
    # to L.A.Cosmic.
    if nan_mask.all():
        logger.debug("No finite pixels — skipping CR rejection")
        return np.full_like(data, np.nan, dtype=np.float32), nan_mask

    sigclip = float(cfg.get("cosmic_ray_sigclip", 4.5))
    objlim = float(cfg.get("cosmic_ray_objlim", 5.0))
    # Readnoise from config or typical survey default
    readnoise = float(cfg.get("readnoise_e", 10.0))

    # astroscrappy requires finite input. Use the median of the *finite*
    # pixels as filler: nanmedian would ignore NaN but not ±inf, so an image
    # with many inf pixels could otherwise get an infinite fill value.
    fill_val = float(np.median(data[~nan_mask]))
    data_fill = np.where(nan_mask, fill_val, data).astype(np.float32)

    cr_mask, cleaned = astroscrappy.detect_cosmics(
        data_fill,
        sigclip=sigclip,
        sigfrac=0.3,
        objlim=objlim,
        readnoise=readnoise,
        gain=1.0,
        verbose=False,
    )

    # Restore the original bad-pixel positions.
    cleaned = cleaned.astype(np.float32)
    cleaned[nan_mask] = np.nan

    # Count only genuine detections, not pixels that were bad on input.
    n_cr = int(np.count_nonzero(cr_mask & ~nan_mask))
    if n_cr > 0:
        logger.debug("Rejected %d cosmic rays", n_cr)
    return cleaned, cr_mask | nan_mask
def _constant_background_fallback(data: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Subtract one sigma-clipped median; return (float32 data, flat float32 RMS map)."""
    from astropy.stats import sigma_clipped_stats

    finite = np.isfinite(data)
    _, med, std = sigma_clipped_stats(data[finite])
    data_sub = (data - med).astype(np.float32)
    # Match the two-pass path: every non-finite input pixel comes back as NaN.
    data_sub[~finite] = np.nan
    return data_sub, np.full_like(data, std, dtype=np.float32)


def _subtract_background(
    data: np.ndarray,
    cfg: dict,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Two-pass sigma-clipped 2D background subtraction with source masking.

    Pass 1: rough background → detect sources → build mask
    Pass 2: re-estimate background with masked sources → final subtraction

    The two-pass approach is critical for crowded fields: sources bias the
    background estimate upward, causing under-subtraction and spurious
    detections.

    Parameters
    ----------
    data : ndarray
        Image to process. Non-finite pixels are masked during estimation and
        returned as NaN.
    cfg : dict
        Preprocessing options. Keys read (with defaults):
        ``background_box_size`` (64), ``background_filter_size`` (3),
        ``sigma_clip_sigma`` (3.0), ``sigma_clip_maxiters`` (10) and
        ``source_mask_snr`` (2.0).

    Returns
    -------
    data_sub : ndarray
        float32 background-subtracted image.
    bkg_rms : ndarray
        float32 background RMS map. This is a constant map when photutils is
        not installed or when ``Background2D`` raises, in which case a single
        sigma-clipped median is subtracted instead.
    """
    try:
        from photutils.background import Background2D, SExtractorBackground
        from photutils.segmentation import detect_sources
    except ImportError:
        logger.warning("photutils not installed — using sigma-clipped median background")
        return _constant_background_fallback(data)

    box_size = int(cfg.get("background_box_size", 64))
    filter_size = int(cfg.get("background_filter_size", 3))
    sigma = float(cfg.get("sigma_clip_sigma", 3.0))
    maxiters = int(cfg.get("sigma_clip_maxiters", 10))
    mask_snr = float(cfg.get("source_mask_snr", 2.0))

    sc = SigmaClip(sigma=sigma, maxiters=maxiters)
    # Only non-finite pixels are excluded up front; there is no user mask.
    nan_mask = ~np.isfinite(data)

    def _estimate(mask: np.ndarray):
        # Both passes share every setting except the mask.
        return Background2D(
            data,
            box_size=box_size,
            filter_size=filter_size,
            sigma_clip=sc,
            bkg_estimator=SExtractorBackground(),
            mask=mask,
            fill_value=0.0,
        )

    try:
        # ── Pass 1: rough background ──────────────────────────────────────
        bkg1 = _estimate(nan_mask)
        rough_sub = data - bkg1.background

        # Build source mask from first-pass subtraction
        threshold1 = mask_snr * bkg1.background_rms
        source_mask = np.zeros_like(data, dtype=bool)
        try:
            segm = detect_sources(rough_sub, threshold1, npixels=5)
            if segm is not None:
                source_mask = segm.data > 0
        except Exception as exc:
            # Best effort: carry on with an empty source mask, but say so,
            # because pass 2 then adds nothing over pass 1.
            logger.warning(
                "Source detection for background mask failed (%s) — "
                "continuing without source mask",
                exc,
            )

        # ── Pass 2: refined background with source mask ──────────────────
        bkg2 = _estimate(nan_mask | source_mask)
        data_sub = (data - bkg2.background).astype(np.float32)
        data_sub[nan_mask] = np.nan
        bkg_rms = bkg2.background_rms.astype(np.float32)

        logger.debug(
            "Two-pass background: src_mask=%.2f%%, bkg_rms_median=%.3f",
            100 * source_mask.sum() / source_mask.size,
            float(np.nanmedian(bkg_rms)),
        )
        return data_sub, bkg_rms
    except Exception as exc:
        logger.warning("Background2D failed (%s) — falling back to constant", exc)
        return _constant_background_fallback(data)