# Lilith-Weather — data/processing/quality_control.py
# Provenance: uploaded by consigcody94 ("Upload source code and documentation",
# revision 8bcb60f, verified). File-hosting page header converted to comments
# so the module parses as valid Python.
"""
Quality Control for GHCN Data
Implements quality checks and cleaning procedures for weather observations.
Based on GHCN quality control flags and additional statistical checks.
"""
from dataclasses import dataclass
from enum import Enum
from typing import Optional
import numpy as np
import pandas as pd
from loguru import logger
class QCFlag(Enum):
"""Quality control flag values."""
PASSED = "P" # Passed all checks
DUPLICATE = "D" # Duplicate value
GAP_FILLED = "G" # Value was interpolated
SUSPECT_RANGE = "R" # Outside valid range
SUSPECT_SPATIAL = "S" # Spatial consistency check failed
SUSPECT_TEMPORAL = "T" # Temporal consistency check failed
SUSPECT_CLIMATE = "C" # Exceeds climatological bounds
FAILED = "F" # Failed quality check, value removed
@dataclass
class QCConfig:
"""Configuration for quality control checks."""
# Temperature bounds (°C)
temp_min: float = -90.0
temp_max: float = 60.0
temp_daily_change_max: float = 30.0 # Max change between consecutive days
# Precipitation bounds (mm)
precip_min: float = 0.0
precip_max: float = 1000.0 # Single day max
# Wind bounds (m/s)
wind_min: float = 0.0
wind_max: float = 120.0
# Pressure bounds (hPa)
pressure_min: float = 870.0
pressure_max: float = 1085.0
# Spike detection
spike_threshold: float = 4.0 # Standard deviations
# Climatology bounds (number of standard deviations from monthly mean)
climate_std_threshold: float = 5.0
# Gap filling
max_gap_hours: int = 6 # Maximum gap to interpolate for hourly data
max_gap_days: int = 3 # Maximum gap to interpolate for daily data
class QualityController:
"""
Applies quality control checks to weather observation data.
Checks include:
1. Range checks (physical bounds)
2. Temporal consistency (spike detection)
3. Spatial consistency (comparison with neighbors)
4. Climatological bounds
5. Duplicate detection
Example usage:
qc = QualityController()
df_clean, flags = qc.process(df)
"""
def __init__(self, config: Optional[QCConfig] = None):
self.config = config or QCConfig()
self._climatology: Optional[pd.DataFrame] = None
def process(
self,
df: pd.DataFrame,
station_id: Optional[str] = None,
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""
Apply all quality control checks to a DataFrame.
Args:
df: DataFrame with datetime index and weather variable columns
station_id: Optional station identifier for logging
Returns:
Tuple of (cleaned_df, flags_df) where flags_df contains QC flags
"""
logger.info(f"Running QC on {len(df)} records" + (f" for {station_id}" if station_id else ""))
# Initialize flags DataFrame
flags = pd.DataFrame(index=df.index)
for col in df.columns:
flags[f"{col}_flag"] = QCFlag.PASSED.value
# Create working copy
df_clean = df.copy()
# 1. Range checks
df_clean, flags = self._range_check(df_clean, flags)
# 2. Temporal consistency (spike detection)
df_clean, flags = self._temporal_check(df_clean, flags)
# 3. Duplicate detection
df_clean, flags = self._duplicate_check(df_clean, flags)
# 4. Climatological bounds (if climatology is loaded)
if self._climatology is not None:
df_clean, flags = self._climate_check(df_clean, flags, station_id)
# Count flags
for col in df.columns:
flag_col = f"{col}_flag"
if flag_col in flags.columns:
flag_counts = flags[flag_col].value_counts()
for flag, count in flag_counts.items():
if flag != QCFlag.PASSED.value:
logger.debug(f"{col}: {count} records flagged as {flag}")
# Calculate overall pass rate
total_checks = len(df) * len(df.columns)
passed = sum(
(flags[f"{col}_flag"] == QCFlag.PASSED.value).sum()
for col in df.columns
if f"{col}_flag" in flags.columns
)
pass_rate = passed / total_checks if total_checks > 0 else 0
logger.info(f"QC pass rate: {pass_rate:.1%}")
return df_clean, flags
def _range_check(
self,
df: pd.DataFrame,
flags: pd.DataFrame,
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""Apply physical range checks."""
cfg = self.config
# Temperature columns
for col in ["TMAX", "TMIN", "TAVG", "temperature", "temp_mean", "temp_max", "temp_min"]:
if col in df.columns:
mask = (df[col] < cfg.temp_min) | (df[col] > cfg.temp_max)
flags.loc[mask, f"{col}_flag"] = QCFlag.SUSPECT_RANGE.value
df.loc[mask, col] = np.nan
# TMAX should be >= TMIN
if "TMAX" in df.columns and "TMIN" in df.columns:
mask = df["TMAX"] < df["TMIN"]
flags.loc[mask, "TMAX_flag"] = QCFlag.SUSPECT_RANGE.value
flags.loc[mask, "TMIN_flag"] = QCFlag.SUSPECT_RANGE.value
# Precipitation
for col in ["PRCP", "precipitation", "precip", "precipitation_1h", "precipitation_6h"]:
if col in df.columns:
mask = (df[col] < cfg.precip_min) | (df[col] > cfg.precip_max)
flags.loc[mask, f"{col}_flag"] = QCFlag.SUSPECT_RANGE.value
df.loc[mask, col] = np.nan
# Wind speed
for col in ["wind_speed", "AWND", "wind_gust"]:
if col in df.columns:
mask = (df[col] < cfg.wind_min) | (df[col] > cfg.wind_max)
flags.loc[mask, f"{col}_flag"] = QCFlag.SUSPECT_RANGE.value
df.loc[mask, col] = np.nan
# Pressure
for col in ["sea_level_pressure", "station_pressure", "pressure"]:
if col in df.columns:
mask = (df[col] < cfg.pressure_min) | (df[col] > cfg.pressure_max)
flags.loc[mask, f"{col}_flag"] = QCFlag.SUSPECT_RANGE.value
df.loc[mask, col] = np.nan
return df, flags
def _temporal_check(
self,
df: pd.DataFrame,
flags: pd.DataFrame,
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""
Check for temporal consistency (spike detection).
Uses a rolling window to detect values that deviate significantly
from their temporal neighbors.
"""
cfg = self.config
for col in df.columns:
if df[col].dtype not in [np.float64, np.float32, np.int64, np.int32]:
continue
# Calculate rolling statistics
window = 7 if "temp" in col.lower() or col in ["TMAX", "TMIN", "TAVG"] else 3
rolling_mean = df[col].rolling(window, center=True, min_periods=1).mean()
rolling_std = df[col].rolling(window, center=True, min_periods=1).std()
# Flag values that deviate too much from rolling mean
deviation = np.abs(df[col] - rolling_mean)
threshold = cfg.spike_threshold * rolling_std.clip(lower=0.1) # Minimum std
mask = deviation > threshold
mask = mask & ~df[col].isna() # Don't flag already-missing values
if mask.any():
# Update flags (don't overwrite worse flags)
current_flags = flags[f"{col}_flag"]
new_flags = current_flags.where(
current_flags != QCFlag.PASSED.value,
QCFlag.SUSPECT_TEMPORAL.value,
)
flags.loc[mask, f"{col}_flag"] = new_flags[mask]
# Optionally remove values (or just flag them)
# df.loc[mask, col] = np.nan
return df, flags
def _duplicate_check(
self,
df: pd.DataFrame,
flags: pd.DataFrame,
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""
Check for duplicate records.
Flags rows with identical timestamps or suspiciously repeated values.
"""
# Check for duplicate indices
if df.index.duplicated().any():
dup_mask = df.index.duplicated(keep="first")
for col in df.columns:
flag_col = f"{col}_flag"
if flag_col in flags.columns:
flags.loc[dup_mask, flag_col] = QCFlag.DUPLICATE.value
# Remove duplicates (keep first)
df = df[~df.index.duplicated(keep="first")]
flags = flags[~flags.index.duplicated(keep="first")]
# Check for stuck sensors (many repeated values)
for col in df.columns:
if df[col].dtype not in [np.float64, np.float32, np.int64, np.int32]:
continue
# Count consecutive identical values
shifted = df[col].shift(1)
same_as_prev = df[col] == shifted
consecutive_same = same_as_prev.groupby((~same_as_prev).cumsum()).cumsum()
# Flag if more than 5 consecutive identical values (possible stuck sensor)
stuck_mask = consecutive_same > 5
if stuck_mask.any():
logger.debug(f"Possible stuck sensor detected in {col}")
# Just log, don't automatically flag (could be valid calm conditions)
return df, flags
def _climate_check(
self,
df: pd.DataFrame,
flags: pd.DataFrame,
station_id: Optional[str] = None,
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""
Check values against climatological bounds.
Requires climatology data to be loaded first.
"""
if self._climatology is None:
return df, flags
cfg = self.config
# Get month for each record
months = df.index.month
for col in df.columns:
if col not in self._climatology.columns:
continue
# Get climatology for each month
clim_mean = months.map(
lambda m: self._climatology.loc[m, f"{col}_mean"]
if m in self._climatology.index
else np.nan
)
clim_std = months.map(
lambda m: self._climatology.loc[m, f"{col}_std"]
if m in self._climatology.index
else np.nan
)
# Flag values outside climatological bounds
deviation = np.abs(df[col] - clim_mean)
threshold = cfg.climate_std_threshold * clim_std
mask = deviation > threshold
mask = mask & ~df[col].isna()
if mask.any():
flags.loc[mask, f"{col}_flag"] = QCFlag.SUSPECT_CLIMATE.value
return df, flags
def load_climatology(self, path: str) -> None:
"""
Load climatology data for climate checks.
Expects a CSV with columns: month, {variable}_mean, {variable}_std
"""
self._climatology = pd.read_csv(path, index_col="month")
logger.info(f"Loaded climatology with {len(self._climatology)} months")
def fill_gaps(
self,
df: pd.DataFrame,
method: str = "linear",
max_gap: Optional[int] = None,
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""
Fill small gaps in the data using interpolation.
Args:
df: DataFrame with datetime index
method: Interpolation method ('linear', 'time', 'spline')
max_gap: Maximum gap size to fill (uses config default if None)
Returns:
Tuple of (filled_df, flags_df) indicating which values were interpolated
"""
if max_gap is None:
# Determine if hourly or daily based on index frequency
if len(df) > 1:
freq = pd.infer_freq(df.index)
if freq and "H" in freq:
max_gap = self.config.max_gap_hours
else:
max_gap = self.config.max_gap_days
else:
max_gap = self.config.max_gap_days
# Track which values were interpolated
was_null = df.isna()
# Interpolate
df_filled = df.interpolate(method=method, limit=max_gap)
# Create flags for interpolated values
flags = pd.DataFrame(index=df.index)
for col in df.columns:
flags[f"{col}_flag"] = np.where(
was_null[col] & ~df_filled[col].isna(),
QCFlag.GAP_FILLED.value,
QCFlag.PASSED.value,
)
return df_filled, flags
def main():
"""CLI entry point for running quality control."""
import argparse
parser = argparse.ArgumentParser(description="Run quality control on weather data")
parser.add_argument("input", help="Input CSV or Parquet file")
parser.add_argument("output", help="Output file path")
parser.add_argument("--climatology", help="Optional climatology file for climate checks")
args = parser.parse_args()
# Load data
if args.input.endswith(".parquet"):
df = pd.read_parquet(args.input)
else:
df = pd.read_csv(args.input, index_col=0, parse_dates=True)
# Run QC
qc = QualityController()
if args.climatology:
qc.load_climatology(args.climatology)
df_clean, flags = qc.process(df)
# Save
if args.output.endswith(".parquet"):
df_clean.to_parquet(args.output)
flags.to_parquet(args.output.replace(".parquet", "_flags.parquet"))
else:
df_clean.to_csv(args.output)
flags.to_csv(args.output.replace(".csv", "_flags.csv"))
if __name__ == "__main__":
main()