File size: 5,876 Bytes
41d98e2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
"""
AsteroidNET Time Utilities (utils.time_utils)

Handles the TAI vs UTC mismatch between Pan-STARRS and ZTF — the most
critical gotcha when processing real survey data.

Key facts:
  - Pan-STARRS MJD-OBS is in TAI (full skycell images are MISSING the 'TIMESYS' keyword)
  - ZTF OBSJD / OBSMJD / DATE-OBS are in UTC
  - TAI is ahead of UTC by 37 seconds (as of 2017, PS1 DR2 era)
  - 37 seconds = ~0.5–2 arcseconds of asteroid apparent motion
    (enough to put a predicted position outside the detection aperture)
  - For asteroid ephemeris queries, JPL Horizons expects UTC
  - Mid-exposure time = start_time + 0.5 * EXPTIME
"""

from __future__ import annotations

import logging
from typing import Optional

from astropy.time import Time

import astropy.units as u

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# TAI − UTC in seconds for the PS1 observation period: 37 s, the value in
# force since 2017-01-01 per IERS bulletins.
_PS1_TAI_UTC_OFFSET_S = 37.0

# Time scale in which each survey records its observation timestamps.
_SURVEY_TIMESCALE = dict(
    ps1="tai",        # Pan-STARRS — TAI (header often silent about this)
    ztf="utc",        # ZTF — UTC
    catalina="utc",
    atlas="utc",
    generic="utc",
)


def parse_fits_time(
    header,
    survey: str = "generic",
    exptime_key: str = "EXPTIME",
    obs_time_key: Optional[str] = None,
    return_midexp: bool = True,
) -> Time:
    """
    Parse observation time from a FITS header, correctly handling survey-specific
    time scale conventions (most importantly Pan-STARRS TAI vs ZTF UTC).

    Parameters
    ----------
    header : fits.Header or dict
        FITS header containing time keywords.
    survey : str
        Survey identifier: 'ps1', 'ztf', 'catalina', 'atlas', 'generic'
        (case-insensitive). Controls which time scale is assumed. An
        unrecognised identifier is treated as UTC and a warning is logged.
    exptime_key : str
        Header keyword for exposure time in seconds.
    obs_time_key : str, optional
        Override the time keyword to read. If None, auto-detected from survey.
    return_midexp : bool
        If True, add half the exposure time to get mid-exposure time. If the
        exposure keyword is absent, the start-of-exposure time is returned
        instead and a warning is logged.

    Returns
    -------
    Time
        Observation time in UTC scale, at mid-exposure if return_midexp=True
        and the exposure time is available.

    Raises
    ------
    KeyError
        If no recognised time keyword is present (generic surveys), or the
        selected keyword is missing from the header.
    """
    survey = survey.lower()
    scale = _SURVEY_TIMESCALE.get(survey)
    if scale is None:
        # A misspelt survey name would otherwise silently select UTC, which
        # is wrong for TAI-based data; make the fallback visible.
        logger.warning(
            "Unknown survey %r; assuming UTC time scale (known: %s)",
            survey, ", ".join(sorted(_SURVEY_TIMESCALE)),
        )
        scale = "utc"

    # Determine which header keyword holds the observation time
    if obs_time_key is not None:
        time_key = obs_time_key
    elif survey == "ps1":
        # Pan-STARRS: MJD-OBS is TAI (even if TIMESYS is absent/wrong)
        time_key = "MJD-OBS"
    elif survey == "ztf":
        # ZTF: prefer OBSMJD (MJD), fall back to DATE-OBS (ISO)
        time_key = "OBSMJD" if "OBSMJD" in header else "DATE-OBS"
    else:
        # Generic: try common keywords in order
        for k in ("DATE-OBS", "MJD-OBS", "OBSMJD", "OBSJD"):
            if k in header:
                time_key = k
                break
        else:
            raise KeyError(f"No recognized time keyword in header. Keys: {list(header.keys())[:20]}")

    raw_time = header[time_key]

    # Parse according to format
    if isinstance(raw_time, (int, float)):
        # Floating-point MJD or JD; values above 2400000 can only be full JDs
        fmt = "jd" if raw_time > 2400000 else "mjd"
        t = Time(raw_time, format=fmt, scale=scale)
    else:
        # ISO string
        t = Time(str(raw_time), format="isot", scale=scale)

    # Convert to UTC (all downstream code expects UTC)
    t_utc = t.utc

    # Add half exposure time to get mid-exposure
    if return_midexp and exptime_key in header:
        exptime_s = float(header[exptime_key])
        t_utc = t_utc + (exptime_s / 2.0) * u.s
        logger.debug(
            "Mid-exposure time (%s, scale=%s→utc): %s (exptime=%.1fs)",
            survey, scale, t_utc.isot, exptime_s
        )
    elif return_midexp:
        # Mid-exposure was requested but cannot be computed. The returned
        # value is the start time, so tell the caller rather than hide it.
        logger.warning(
            "Mid-exposure time requested but %r is missing from header; "
            "returning start-of-exposure time (%s→utc): %s",
            exptime_key, survey, t_utc.isot,
        )
    else:
        logger.debug("Start-of-exposure time (%s→utc): %s", survey, t_utc.isot)

    return t_utc


def detect_survey_from_header(header) -> str:
    """
    Attempt to identify the survey from FITS header keywords.

    Matching is by case-insensitive substring on TELESCOP first, then
    INSTRUME (Catalina only), then FILENAME as a last resort.

    Parameters
    ----------
    header : fits.Header or dict
        Any mapping providing ``.get(key, default)``.

    Returns
    -------
    str
        One of: 'ps1', 'ztf', 'catalina', 'atlas', 'generic'.
    """
    telescop = str(header.get("TELESCOP", "")).lower()
    instrume = str(header.get("INSTRUME", "")).lower()
    filename = str(header.get("FILENAME", "")).lower()

    # ATLAS is tested before Pan-STARRS: the PS1 list matches the site name
    # "haleakala", and ATLAS also operates a unit there.
    if "atlas" in telescop:
        return "atlas"
    # "p60" (the Palomar 60-inch) is deliberately not in this list: it is not
    # a Pan-STARRS telescope and its timestamps must not be treated as TAI.
    if any(tag in telescop for tag in ("ps1", "panstarrs", "pan-starrs", "haleakala")):
        return "ps1"
    if any(tag in telescop for tag in ("p48", "palomar48", "ztf")):
        return "ztf"
    if "catalina" in telescop or "css" in instrume:
        return "catalina"

    # Fall back to the archive file name when the telescope keywords are
    # uninformative.
    if "ps1" in filename:
        return "ps1"
    if "ztf_" in filename:
        return "ztf"

    return "generic"


def fix_ps1_header(header) -> None:
    """
    Patch a Pan-STARRS full skycell FITS header in place.

    Known PS1 DR2 issues (from STScI documentation):
      1. TIMESYS keyword absent — should be 'TAI'
      2. RADESYS keyword absent — should be 'FK5'
      3. WCS uses obsolete PC001001 naming instead of PC1_1

    The fitscut.cgi service corrects these, but full skycell downloads don't.

    Existing TIMESYS / RADESYS / PCi_j values are never overwritten, and the
    obsolete PC00i00j keywords are left in the header alongside their copies.

    Parameters
    ----------
    header : fits.Header or dict
        Header to modify; nothing is returned.
    """
    # (keyword, (value, comment) card, debug message) for each keyword that
    # is inserted only when it is missing.
    defaults = (
        (
            "TIMESYS",
            ("TAI", "Time system [added by AsteroidNET]"),
            "Added missing TIMESYS=TAI to PS1 header",
        ),
        (
            "RADESYS",
            ("FK5", "Celestial coordinate system [added by AsteroidNET]"),
            "Added missing RADESYS=FK5 to PS1 header",
        ),
    )
    for keyword, card, message in defaults:
        if keyword in header:
            continue
        header[keyword] = card
        logger.debug(message)

    # Obsolete PC matrix keyword → modern equivalent (PC001001 → PC1_1 etc.)
    pc_aliases = {
        "PC001001": "PC1_1",
        "PC001002": "PC1_2",
        "PC002001": "PC2_1",
        "PC002002": "PC2_2",
    }
    for legacy_key, modern_key in pc_aliases.items():
        # Copy only when the legacy keyword exists and the modern one doesn't.
        if legacy_key not in header or modern_key in header:
            continue
        header[modern_key] = header[legacy_key]
        logger.debug("Renamed WCS keyword %s → %s", legacy_key, modern_key)