File size: 13,746 Bytes
8bcb60f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
"""
Quality Control for GHCN Data

Implements quality checks and cleaning procedures for weather observations.
Based on GHCN quality control flags and additional statistical checks.
"""

from dataclasses import dataclass
from enum import Enum
from typing import Optional

import numpy as np
import pandas as pd
from loguru import logger


class QCFlag(Enum):
    """Single-character quality control flag attached to each observation.

    Flag meanings:
        P - passed every check
        D - duplicate record
        G - gap-filled (interpolated) value
        R - outside the physically valid range
        S - failed the spatial consistency check
        T - failed the temporal consistency (spike) check
        C - outside climatological bounds
        F - failed quality control outright; value removed
    """

    PASSED = "P"
    DUPLICATE = "D"
    GAP_FILLED = "G"
    SUSPECT_RANGE = "R"
    SUSPECT_SPATIAL = "S"
    SUSPECT_TEMPORAL = "T"
    SUSPECT_CLIMATE = "C"
    FAILED = "F"


@dataclass
class QCConfig:
    """Tunable thresholds for the quality control checks.

    Units: temperatures in degrees Celsius, precipitation in millimetres,
    wind speed in metres per second, pressure in hectopascals.
    """

    # Physical temperature limits, plus the largest plausible jump between
    # consecutive days.
    temp_min: float = -90.0
    temp_max: float = 60.0
    temp_daily_change_max: float = 30.0

    # Physical limits for a single day's precipitation total.
    precip_min: float = 0.0
    precip_max: float = 1000.0

    # Physical wind-speed limits.
    wind_min: float = 0.0
    wind_max: float = 120.0

    # Physical sea-level/station pressure limits.
    pressure_min: float = 870.0
    pressure_max: float = 1085.0

    # How many rolling standard deviations from the local mean count as a
    # spike in the temporal-consistency check.
    spike_threshold: float = 4.0

    # How many standard deviations from the monthly climatological mean
    # count as climatologically suspect.
    climate_std_threshold: float = 5.0

    # Largest gap the interpolation step will fill, for hourly and for
    # daily series respectively.
    max_gap_hours: int = 6
    max_gap_days: int = 3


class QualityController:
    """
    Applies quality control checks to weather observation data.

    Checks include:
    1. Range checks (physical bounds)
    2. Temporal consistency (spike detection)
    3. Spatial consistency (comparison with neighbors)
    4. Climatological bounds
    5. Duplicate detection

    Example usage:
        qc = QualityController()
        df_clean, flags = qc.process(df)
    """

    def __init__(self, config: Optional[QCConfig] = None):
        """Create a controller using the given thresholds (defaults if None)."""
        self.config = config or QCConfig()
        # Monthly climatology table; populated by load_climatology().
        self._climatology: Optional[pd.DataFrame] = None

    def process(
        self,
        df: pd.DataFrame,
        station_id: Optional[str] = None,
    ) -> tuple[pd.DataFrame, pd.DataFrame]:
        """
        Apply all quality control checks to a DataFrame.

        Args:
            df: DataFrame with datetime index and weather variable columns
            station_id: Optional station identifier for logging

        Returns:
            Tuple of (cleaned_df, flags_df) where flags_df contains QC flags.
            Duplicate-index rows are dropped from both frames (first kept).
        """
        logger.info(f"Running QC on {len(df)} records" + (f" for {station_id}" if station_id else ""))

        # Initialize one flag column per data column, all "passed".
        flags = pd.DataFrame(index=df.index)
        for col in df.columns:
            flags[f"{col}_flag"] = QCFlag.PASSED.value

        # Work on a copy so the caller's frame is never mutated.
        df_clean = df.copy()

        # 1. Range checks (physical bounds)
        df_clean, flags = self._range_check(df_clean, flags)

        # 2. Temporal consistency (spike detection)
        df_clean, flags = self._temporal_check(df_clean, flags)

        # 3. Duplicate detection (may shorten df_clean and flags)
        df_clean, flags = self._duplicate_check(df_clean, flags)

        # 4. Climatological bounds (only if climatology is loaded)
        if self._climatology is not None:
            df_clean, flags = self._climate_check(df_clean, flags, station_id)

        # Log per-column counts of non-passing flags.
        for col in df.columns:
            flag_col = f"{col}_flag"
            if flag_col in flags.columns:
                flag_counts = flags[flag_col].value_counts()
                for flag, count in flag_counts.items():
                    if flag != QCFlag.PASSED.value:
                        logger.debug(f"{col}: {count} records flagged as {flag}")

        # Overall pass rate.  Use the (possibly deduplicated) flags frame for
        # the denominator so it matches the rows actually counted in `passed`;
        # the original used len(df) and over-counted when duplicates were
        # removed.
        total_checks = len(flags) * len(df.columns)
        passed = sum(
            (flags[f"{col}_flag"] == QCFlag.PASSED.value).sum()
            for col in df.columns
            if f"{col}_flag" in flags.columns
        )
        pass_rate = passed / total_checks if total_checks > 0 else 0
        logger.info(f"QC pass rate: {pass_rate:.1%}")

        return df_clean, flags

    def _range_check(
        self,
        df: pd.DataFrame,
        flags: pd.DataFrame,
    ) -> tuple[pd.DataFrame, pd.DataFrame]:
        """Apply physical range checks.

        Out-of-range values are flagged SUSPECT_RANGE and set to NaN in place.
        """
        cfg = self.config

        # Group the recognised column aliases with their bounds so each
        # variable family is handled by the same code path.
        bounded_groups = [
            (
                ["TMAX", "TMIN", "TAVG", "temperature", "temp_mean", "temp_max", "temp_min"],
                cfg.temp_min,
                cfg.temp_max,
            ),
            (
                ["PRCP", "precipitation", "precip", "precipitation_1h", "precipitation_6h"],
                cfg.precip_min,
                cfg.precip_max,
            ),
            (["wind_speed", "AWND", "wind_gust"], cfg.wind_min, cfg.wind_max),
            (
                ["sea_level_pressure", "station_pressure", "pressure"],
                cfg.pressure_min,
                cfg.pressure_max,
            ),
        ]
        for columns, lo, hi in bounded_groups:
            for col in columns:
                if col in df.columns:
                    mask = (df[col] < lo) | (df[col] > hi)
                    flags.loc[mask, f"{col}_flag"] = QCFlag.SUSPECT_RANGE.value
                    df.loc[mask, col] = np.nan

        # Internal consistency: TMAX must not be below TMIN.  Both values are
        # flagged but kept (we cannot tell which of the two is wrong).
        if "TMAX" in df.columns and "TMIN" in df.columns:
            mask = df["TMAX"] < df["TMIN"]
            flags.loc[mask, "TMAX_flag"] = QCFlag.SUSPECT_RANGE.value
            flags.loc[mask, "TMIN_flag"] = QCFlag.SUSPECT_RANGE.value

        return df, flags

    def _temporal_check(
        self,
        df: pd.DataFrame,
        flags: pd.DataFrame,
    ) -> tuple[pd.DataFrame, pd.DataFrame]:
        """
        Check for temporal consistency (spike detection).

        Uses a centred rolling window to detect values that deviate more than
        ``spike_threshold`` standard deviations from their temporal
        neighbours.  Suspect values are flagged but NOT removed.
        """
        cfg = self.config

        for col in df.columns:
            # Only numeric columns can be checked statistically.
            if df[col].dtype not in [np.float64, np.float32, np.int64, np.int32]:
                continue

            # Temperature varies smoothly, so a wider window is appropriate.
            window = 7 if "temp" in col.lower() or col in ["TMAX", "TMIN", "TAVG"] else 3
            rolling_mean = df[col].rolling(window, center=True, min_periods=1).mean()
            rolling_std = df[col].rolling(window, center=True, min_periods=1).std()

            # Clip std from below so near-constant stretches don't produce a
            # zero threshold that flags tiny fluctuations.
            deviation = np.abs(df[col] - rolling_mean)
            threshold = cfg.spike_threshold * rolling_std.clip(lower=0.1)

            mask = deviation > threshold
            mask = mask & ~df[col].isna()  # Don't flag already-missing values

            if mask.any():
                # Only upgrade PASSED flags; never overwrite an existing
                # (more severe) flag from an earlier check.
                current_flags = flags[f"{col}_flag"]
                new_flags = current_flags.where(
                    current_flags != QCFlag.PASSED.value,
                    QCFlag.SUSPECT_TEMPORAL.value,
                )
                flags.loc[mask, f"{col}_flag"] = new_flags[mask]

                # Deliberately flag-only: the value itself is kept.

        return df, flags

    def _duplicate_check(
        self,
        df: pd.DataFrame,
        flags: pd.DataFrame,
    ) -> tuple[pd.DataFrame, pd.DataFrame]:
        """
        Check for duplicate records.

        Rows with duplicated timestamps are dropped (first occurrence kept);
        runs of repeated values (possible stuck sensor) are only logged.
        """
        # Duplicate-index rows: flag the later occurrences, then drop them
        # from both frames so indexes stay aligned.  (The flags set here are
        # dropped along with the rows — kept for parity with upstream GHCN
        # behaviour, where removed duplicates are not reported.)
        if df.index.duplicated().any():
            dup_mask = df.index.duplicated(keep="first")
            for col in df.columns:
                flag_col = f"{col}_flag"
                if flag_col in flags.columns:
                    flags.loc[dup_mask, flag_col] = QCFlag.DUPLICATE.value

            df = df[~df.index.duplicated(keep="first")]
            flags = flags[~flags.index.duplicated(keep="first")]

        # Stuck-sensor heuristic: count runs of consecutive identical values.
        for col in df.columns:
            if df[col].dtype not in [np.float64, np.float32, np.int64, np.int32]:
                continue

            shifted = df[col].shift(1)
            same_as_prev = df[col] == shifted
            # Each False resets the group, so cumsum within a group is the
            # current run length.
            consecutive_same = same_as_prev.groupby((~same_as_prev).cumsum()).cumsum()

            # More than 5 identical values in a row is suspicious, but could
            # also be genuinely calm conditions — log only, do not flag.
            stuck_mask = consecutive_same > 5
            if stuck_mask.any():
                logger.debug(f"Possible stuck sensor detected in {col}")

        return df, flags

    def _climate_check(
        self,
        df: pd.DataFrame,
        flags: pd.DataFrame,
        station_id: Optional[str] = None,
    ) -> tuple[pd.DataFrame, pd.DataFrame]:
        """
        Check values against climatological bounds.

        Requires climatology data to be loaded first (see load_climatology)
        and a DatetimeIndex on ``df`` (``df.index.month`` is used).
        """
        if self._climatology is None:
            return df, flags

        cfg = self.config

        # Calendar month of each record, used to look up monthly climatology.
        months = df.index.month

        for col in df.columns:
            if col not in self._climatology.columns:
                continue

            # Build index-aligned Series of monthly mean/std for each record
            # (months without climatology fall back to NaN, which can never
            # trigger the flag below).
            clim_mean = pd.Series(
                [
                    self._climatology.loc[m, f"{col}_mean"]
                    if m in self._climatology.index
                    else np.nan
                    for m in months
                ],
                index=df.index,
            )
            clim_std = pd.Series(
                [
                    self._climatology.loc[m, f"{col}_std"]
                    if m in self._climatology.index
                    else np.nan
                    for m in months
                ],
                index=df.index,
            )

            # Flag values further than climate_std_threshold standard
            # deviations from the monthly mean.
            deviation = np.abs(df[col] - clim_mean)
            threshold = cfg.climate_std_threshold * clim_std

            mask = deviation > threshold
            mask = mask & ~df[col].isna()

            if mask.any():
                flags.loc[mask, f"{col}_flag"] = QCFlag.SUSPECT_CLIMATE.value

        return df, flags

    def load_climatology(self, path: str) -> None:
        """
        Load climatology data for climate checks.

        Expects a CSV with columns: month, {variable}_mean, {variable}_std
        """
        self._climatology = pd.read_csv(path, index_col="month")
        logger.info(f"Loaded climatology with {len(self._climatology)} months")

    def fill_gaps(
        self,
        df: pd.DataFrame,
        method: str = "linear",
        max_gap: Optional[int] = None,
    ) -> tuple[pd.DataFrame, pd.DataFrame]:
        """
        Fill small gaps in the data using interpolation.

        Args:
            df: DataFrame with datetime index
            method: Interpolation method ('linear', 'time', 'spline')
            max_gap: Maximum gap size to fill (uses config default if None)

        Returns:
            Tuple of (filled_df, flags_df) indicating which values were interpolated
        """
        if max_gap is None:
            # Choose the hourly vs. daily limit from the inferred index
            # frequency.  Default to the daily limit: infer_freq raises
            # ValueError for fewer than 3 points (the original crashed on a
            # 2-row frame) and TypeError for non-datetime indexes, and may
            # return None for irregular series.
            max_gap = self.config.max_gap_days
            if len(df) >= 3:
                try:
                    freq = pd.infer_freq(df.index)
                except (ValueError, TypeError):
                    freq = None
                # Strip a leading multiplier ("2H" -> "H") and compare the
                # base code; the original substring test ("H" in freq)
                # wrongly matched anchored frequencies such as "W-THU".
                if freq and freq.lstrip("0123456789").upper().startswith("H"):
                    max_gap = self.config.max_gap_hours

        # Remember where values were missing so filled cells can be flagged.
        was_null = df.isna()

        # Interpolate, filling at most `max_gap` consecutive NaNs.
        df_filled = df.interpolate(method=method, limit=max_gap)

        # Flag exactly the cells that went from missing to filled.
        flags = pd.DataFrame(index=df.index)
        for col in df.columns:
            flags[f"{col}_flag"] = np.where(
                was_null[col] & ~df_filled[col].isna(),
                QCFlag.GAP_FILLED.value,
                QCFlag.PASSED.value,
            )

        return df_filled, flags


def main():
    """CLI entry point for running quality control.

    Reads a CSV or Parquet file, runs QC, and writes the cleaned data plus a
    sibling "<base>_flags<ext>" file with the QC flags.
    """
    import argparse
    import os

    parser = argparse.ArgumentParser(description="Run quality control on weather data")
    parser.add_argument("input", help="Input CSV or Parquet file")
    parser.add_argument("output", help="Output file path")
    parser.add_argument("--climatology", help="Optional climatology file for climate checks")

    args = parser.parse_args()

    # Load data; CSV assumed to have a parseable datetime first column.
    if args.input.endswith(".parquet"):
        df = pd.read_parquet(args.input)
    else:
        df = pd.read_csv(args.input, index_col=0, parse_dates=True)

    # Run QC (climate checks only happen if a climatology file is given).
    qc = QualityController()
    if args.climatology:
        qc.load_climatology(args.climatology)

    df_clean, flags = qc.process(df)

    # Build the flags path from the output's stem so the flags file can
    # never overwrite the cleaned output.  The original used
    # str.replace(".csv", ...), which was a no-op for any other extension
    # and made flags.to_csv clobber the cleaned data.
    base, ext = os.path.splitext(args.output)
    flags_path = f"{base}_flags{ext or '.csv'}"

    # Save cleaned data and flags side by side.
    if args.output.endswith(".parquet"):
        df_clean.to_parquet(args.output)
        flags.to_parquet(flags_path)
    else:
        df_clean.to_csv(args.output)
        flags.to_csv(flags_path)


if __name__ == "__main__":
    main()