#' Generate DWD Missing Value Strings
#'
#' Builds the set of strings that DWD data files use to encode missing
#' values, including decimal variants and leading-space variants (DWD files
#' use fixed-width, space-padded columns).
#'
#' @param base Basic codes like -999, 8000
#' @param nspace Number of leading spaces to include (DWD files often have
#'   7 characters wide columns)
#' @return Character vector of NA strings (always includes the empty string)
generate_dwd_na_strings <- function(base = c(
                                      "-999", "-777", "-888", "8000", "9999",
                                      "-9999", "999.9", "8000.0", "800.0", "-999.0"
                                    ),
                                    nspace = 10) {
  res <- base
  # Add decimal variations (e.g. "-999.0", "-999.00")
  res <- unique(c(res, paste0(base, ".0"), paste0(base, ".00")))
  # Add leading-space variants; seq_len() is safe for nspace = 0
  for (i in seq_len(nspace)) {
    res <- unique(c(res, paste0(strrep(" ", i), base)))
  }
  res <- unique(c(res, ""))
  return(res)
}

#' Parse content of a DWD data zip file
#'
#' Extracts only the `produkt*.txt` data file from a DWD zip archive, parses
#' it as character data, standardizes the timestamp and parameter column
#' names, converts DWD missing-value codes to NA and applies physical sanity
#' bounds to selected variables.
#'
#' @param zip_path Local path to the .zip file
#' @param start_date Optional filter start (POSIXct)
#' @param end_date Optional filter end (POSIXct)
#' @param extract_metadata If TRUE, also parse `Metadaten_Parameter*.txt`
#'   and attach the result as attribute "granular_meta" on the return value
#' @return Tibble with parsed data or NULL
read_dwd_data <- function(zip_path, start_date = NULL, end_date = NULL,
                          extract_metadata = FALSE) {
  # Create temp dir for extraction; always cleaned up on exit
  exdir <- tempfile()
  dir.create(exdir)
  on.exit(unlink(exdir, recursive = TRUE))

  # Unzip - OPTIMIZED: List first, then extract only the data file.
  # This avoids extracting 10+ metadata files which saves IO.
  granular_meta_df <- NULL
  unzip_ok <- tryCatch(
    {
      # List files in the zip
      file_list <- unzip(zip_path, list = TRUE)

      # Find the data file (produkt*.txt)
      # We look for the largest file matching the pattern, just in case
      # Typical DWD name: produkt_param_date_station.txt
      data_files <- file_list$Name[grepl("^produkt.*\\.txt$", file_list$Name)]

      if (length(data_files) == 0) {
        # return() inside the tryCatch EXPRESSION evaluates in this
        # function's frame, so this exits read_dwd_data directly.
        return(NULL)
      }

      # Determine which file to extract (largest if multiple, though usually one)
      if (length(data_files) > 1) {
        sizes <- file_list$Length[match(data_files, file_list$Name)]
        target_file <- data_files[which.max(sizes)]
      } else {
        target_file <- data_files[1]
      }

      # Extract ONLY the target file
      unzip(zip_path, files = target_file, exdir = exdir)
      data_file <- file.path(exdir, target_file)

      # Also extract metadata file for granular parameter validity periods
      if (extract_metadata) {
        meta_files <- file_list$Name[grepl("^Metadaten_Parameter.*\\.txt$",
          file_list$Name,
          ignore.case = TRUE
        )]
        if (length(meta_files) > 0) {
          meta_file <- meta_files[1]
          tryCatch(
            {
              unzip(zip_path, files = meta_file, exdir = exdir)
              meta_path <- file.path(exdir, meta_file)
              if (file.exists(meta_path)) {
                # Read semicolon-separated metadata file
                meta_dt <- data.table::fread(meta_path,
                  sep = ";", header = TRUE,
                  fill = TRUE, encoding = "Latin-1"
                )
                # Expected columns usually include: Parameter, Von_Datum, Bis_Datum
                # Normalize column names
                names(meta_dt) <- tolower(trimws(names(meta_dt)))
                if ("parameter" %in% names(meta_dt)) {
                  granular_meta_df <- data.frame(
                    param = trimws(meta_dt$parameter),
                    start_date = if ("von_datum" %in% names(meta_dt)) {
                      as.character(meta_dt$von_datum)
                    } else {
                      NA_character_
                    },
                    end_date = if ("bis_datum" %in% names(meta_dt)) {
                      as.character(meta_dt$bis_datum)
                    } else {
                      NA_character_
                    },
                    stringsAsFactors = FALSE
                  )
                }
              }
            },
            error = function(e) {
              # Silent fail for metadata - not critical
            }
          )
        }
      }
      TRUE
    },
    error = function(e) {
      warning("Unzip failed: ", e$message)
      # NOTE: return(NULL) inside an error HANDLER only exits the handler,
      # not read_dwd_data, so failure is signalled via the tryCatch value.
      FALSE
    }
  )
  # BUGFIX: previously the handler's return(NULL) did not exit this
  # function, so execution continued with `data_file` undefined and a
  # second, misleading warning was raised downstream.
  if (!isTRUE(unzip_ok)) {
    return(NULL)
  }

  # Read Data
  # DWD standard: semicolon sep
  na_vec <- generate_dwd_na_strings()
  # fread strips whitespace by default, so we should pass trimmed NA strings
  # to avoid warnings/errors
  na_vec_clean <- unique(trimws(na_vec))
  # Ensure empty string is included if not already
  if (!"" %in% na_vec_clean) na_vec_clean <- c(na_vec_clean, "")

  tryCatch(
    {
      # Use data.table::fread for faster parsing.
      # Read everything as character first to separate schema handling from
      # data loading safety.
      dt <- data.table::fread(
        data_file,
        sep = ";",
        header = TRUE,
        na.strings = na_vec_clean,
        colClasses = "character", # Force character to avoid type inference issues with weird DWD codes
        encoding = "Latin-1",
        showProgress = FALSE,
        data.table = TRUE
      )

      # Sanitize column names: remove whitespace
      names(dt) <- trimws(names(dt))

      # Standardize Date Column Name
      if ("MESS_DATUM_BEGINN" %in% names(dt)) {
        data.table::setnames(dt, "MESS_DATUM_BEGINN", "MESS_DATUM")
      }

      if (!"MESS_DATUM" %in% names(dt)) {
        return(NULL)
      }

      # --- Optimization: Pre-filter rows by date string BEFORE heavy processing ---
      # DWD dates are usually monotonic strings/numbers: YYYYMMDD, YYYYMMDDHH, etc.
      # We can filter lexically or numerically without parsing to POSIXct first.
      if (!is.null(start_date) || !is.null(end_date)) {
        # Determine format from first row
        sample_date <- dt$MESS_DATUM[1]
        nch <- nchar(sample_date)

        # BUGFIX: guard BEFORE computing fmt_str; `if (NA == 8)` errors on
        # an empty/NA first row, which previously aborted parsing here.
        if (!is.na(nch)) {
          # Format mappings correspond to DWD standards
          fmt_str <- if (nch == 8) {
            "%Y%m%d"
          } else if (nch == 10) {
            "%Y%m%d%H"
          } else if (nch == 12) {
            "%Y%m%d%H%M"
          } else {
            "%Y%m%d%H"
          }

          if (!is.null(start_date)) {
            # Format request date to match DWD string format.
            # Explicitly use UTC to avoid timezone shifts during string
            # formatting if inputs are UTC.
            s_val <- format(as.POSIXct(start_date), format = fmt_str, tz = "UTC")
            dt <- dt[MESS_DATUM >= s_val]
          }
          if (!is.null(end_date)) {
            # Add buffer (end of day) if needed, but usually exact match on
            # string works. If end_date is 2025-01-01, we want data up to
            # 2025-01-01 23:59.
            e_limit <- as.POSIXct(end_date) + days(1)
            e_val <- format(e_limit, format = fmt_str, tz = "UTC")
            dt <- dt[MESS_DATUM <= e_val]
          }
        }
      }

      # Convert to tibble/data.frame for existing dplyr pipeline compatibility
      # (Keeping existing logic for column mapping to minimize regression risk)
      df <- as_tibble(dt)

      # Parse Date logic (Original)
      raw_date <- as.character(df$MESS_DATUM)
      if (length(raw_date) == 0) {
        return(NULL) # If filtered to empty
      }

      nch <- nchar(raw_date[1])
      fmt <- if (nch == 8) {
        "%Y%m%d"
      } else if (nch == 10) {
        "%Y%m%d%H"
      } else if (nch == 12) {
        "%Y%m%d%H%M"
      } else {
        "%Y%m%d%H"
      }
      df$datetime <- as.POSIXct(raw_date, format = fmt, tz = "UTC")

      # Filter valid dates (Safety check)
      df <- df[!is.na(df$datetime), ]

      # Additional safety window filter (dates might strictly parse
      # differently than strings)
      if (!is.null(start_date)) {
        s_limit <- as.POSIXct(start_date)
        df <- df[df$datetime >= s_limit, ]
      }
      if (!is.null(end_date)) {
        e_limit <- as.POSIXct(end_date) + days(1)
        df <- df[df$datetime <= e_limit, ]
      }

      # Column Mapping: canonical numeric output columns
      weather_cols <- c(
        "temp", "temp_min", "temp_max", "temp_min_avg", "temp_max_avg",
        "rh", "dew_point", "abs_humidity", "vapor_pressure", "wet_bulb_temp",
        "precip", "wind_speed", "wind_dir", "pressure", "station_pressure",
        "cloud_cover", "wind_gust_max", "solar_global", "sunshine_duration",
        "temp_5cm", "soil_temp_2cm", "soil_temp_5cm", "soil_temp_10cm",
        "soil_temp_20cm", "soil_temp_50cm", "soil_temp_100cm",
        "soil_temp_min_5cm", "snow_depth", "snow_water_equiv",
        "snow_fresh_sum", "snow_depth_sum", "thunderstorm", "glaze",
        "graupel", "hail", "fog", "frost", "storm_6", "storm_8", "dew",
        "precip_net_thunderstorm", "precip_net_graupel", "precip_net_hail",
        "precip_net_fog", "visibility", "weather_code",
        "cloud_layer1_code", "cloud_layer1_height", "cloud_layer1_amount",
        "cloud_layer2_code", "cloud_layer2_height", "cloud_layer2_amount",
        "cloud_layer3_code", "cloud_layer3_height", "cloud_layer3_amount",
        "cloud_layer4_code", "cloud_layer4_height", "cloud_layer4_amount"
      )
      # Canonical text output columns (kept as character, not forced numeric)
      weather_text_cols <- c(
        "cloud_cover_indicator", "cloud_layer1_abbrev", "cloud_layer2_abbrev",
        "cloud_layer3_abbrev", "cloud_layer4_abbrev", "visibility_indicator",
        "weather_text"
      )

      df <- df %>% rename_with(~ case_when(
        # Hourly Mappings
        . == "TT_TU" ~ "temp",
        . == "RF_TU" ~ "rh",
        . == "TT" ~ "temp",
        . == "TD" ~ "dew_point",
        . == "ABSF_STD" ~ "abs_humidity",
        . == "VP_STD" ~ "vapor_pressure",
        . == "TF_STD" ~ "wet_bulb_temp",
        . == "P_STD" ~ "pressure",
        . == "TT_STD" ~ "temp",
        . == "RF_STD" ~ "rh",
        . == "TD_STD" ~ "dew_point",
        . == "R1" ~ "precip",
        . == "F" ~ "wind_speed",
        . == "D" ~ "wind_dir",
        . == "P" ~ "pressure",
        . == "P0" ~ "station_pressure",
        . %in% c("N_8", "V_N") ~ "cloud_cover",
        . == "V_N_I" ~ "cloud_cover_indicator",
        . == "V_S1_CS" ~ "cloud_layer1_code",
        . == "V_S1_CSA" ~ "cloud_layer1_abbrev",
        . == "V_S1_HHS" ~ "cloud_layer1_height",
        . == "V_S1_NS" ~ "cloud_layer1_amount",
        . == "V_S2_CS" ~ "cloud_layer2_code",
        . == "V_S2_CSA" ~ "cloud_layer2_abbrev",
        . == "V_S2_HHS" ~ "cloud_layer2_height",
        . == "V_S2_NS" ~ "cloud_layer2_amount",
        . == "V_S3_CS" ~ "cloud_layer3_code",
        . == "V_S3_CSA" ~ "cloud_layer3_abbrev",
        . == "V_S3_HHS" ~ "cloud_layer3_height",
        . == "V_S3_NS" ~ "cloud_layer3_amount",
        . == "V_S4_CS" ~ "cloud_layer4_code",
        . == "V_S4_CSA" ~ "cloud_layer4_abbrev",
        . == "V_S4_HHS" ~ "cloud_layer4_height",
        . == "V_S4_NS" ~ "cloud_layer4_amount",
        . == "V_VV" ~ "visibility",
        . == "V_VV_I" ~ "visibility_indicator",
        . == "WW" ~ "weather_code",
        . == "WW_Text" ~ "weather_text",
        . %in% c("FX_10", "FX_911") ~ "wind_gust_max",
        . %in% c("FG_LBERG", "FG_STRAHL") ~ "solar_global",
        . %in% c("SD_LBERG", "SD_STRAHL", "SD_SO") ~ "sunshine_duration",
        # 10-Minute Mappings
        . == "TT_10" ~ "temp",
        . == "RF_10" ~ "rh",
        . == "TD_10" ~ "dew_point",
        . == "TM5_10" ~ "temp_5cm", # Soil temp logic? or 5cm air? DWD usually 5cm above ground
        . == "RWS_10" ~ "precip",
        . == "RWS_DAU_10" ~ "precip_duration", # Custom?
        . == "RWS_IND_10" ~ "precip_ind",
        . == "FF_10" ~ "wind_speed",
        . == "DD_10" ~ "wind_dir",
        . == "FX_10" ~ "wind_gust_max",
        . == "FNX_10" ~ "wind_gust_min",
        . == "DS_10" ~ "diffuse_radiation",
        . == "GS_10" ~ "solar_global",
        . == "SD_10" ~ "sunshine_duration",
        . == "LS_10" ~ "longwave_radiation", # Atmospheric radiation?
        . == "TX_10" ~ "temp_max",
        . == "TN_10" ~ "temp_min",
        . == "PP_10" ~ "station_pressure",
        # Daily Mappings
        . == "TGK" ~ "soil_temp_min_5cm",
        . %in% c("TMK", "TM_K") ~ "temp",
        . == "TNK" ~ "temp_min",
        . == "TXK" ~ "temp_max",
        . %in% c("RSK", "RS_K", "RS") ~ "precip",
        . == "FM" ~ "wind_speed",
        . == "FX" ~ "wind_gust_max",
        . %in% c("SDK", "SD_SO") ~ "sunshine_duration", # SD_SO is daily sunshine
        . == "UPM" ~ "rh",
        . == "PM" ~ "pressure",
        . == "NM" ~ "cloud_cover",
        # Monthly Mappings
        . == "MO_TT" ~ "temp",
        . == "MX_TX" ~ "temp_max", # Absolute Max
        . == "MX_TN" ~ "temp_min", # Absolute Min
        . == "MO_TX" ~ "temp_max_avg", # Average Daily Max
        . == "MO_TN" ~ "temp_min_avg", # Average Daily Min
        . == "MO_RR" ~ "precip",
        . == "MX_RS" ~ "precip_max_day", # Not in standard weather_cols but useful
        . == "MO_SD_S" ~ "sunshine_duration",
        . == "MO_N" ~ "cloud_cover",
        . == "MO_FK" ~ "wind_speed",
        . == "MX_FX" ~ "wind_gust_max",
        . == "MO_NSH" ~ "snow_fresh_sum", # Monthly Fresh Snow Sum
        . == "MO_SH_S" ~ "snow_depth_sum", # Monthly Snow Depth Sum
        # Soil Temperature
        . == "V_TE002" ~ "soil_temp_2cm",
        . == "V_TE005" ~ "soil_temp_5cm",
        . == "V_TE010" ~ "soil_temp_10cm",
        . == "V_TE020" ~ "soil_temp_20cm",
        . == "V_TE050" ~ "soil_temp_50cm",
        . == "V_TE100" ~ "soil_temp_100cm",
        . == "V_TE002M" ~ "soil_temp_2cm",
        . == "V_TE005M" ~ "soil_temp_5cm",
        # BUGFIX: was mapped to "soil_temp_100cm" (copy-paste typo);
        # V_TE010M is the 10 cm depth, matching V_TE010 above.
        . == "V_TE010M" ~ "soil_temp_10cm",
        . == "V_TE020M" ~ "soil_temp_20cm",
        . == "V_TE050M" ~ "soil_temp_50cm",
        # Water Equivalent / Snow
        . == "ASH_6" ~ "snow_depth_set",
        . == "SH_TAG" ~ "snow_depth",
        . == "WASH_6" ~ "snow_water_equiv",
        . == "WAAS_6" ~ "snow_water_equiv_excavated",
        # Weather Phenomena
        . == "GEWITTER" ~ "thunderstorm",
        . == "GLATTEIS" ~ "glaze",
        . == "GRAUPEL" ~ "graupel",
        . == "HAGEL" ~ "hail",
        . == "NEBEL" ~ "fog",
        . == "REIF" ~ "frost",
        . == "STURM_6" ~ "storm_6",
        . == "STURM_8" ~ "storm_8",
        . == "TAU" ~ "dew",
        # Precipitation Network Weather Phenomena
        . == "RR_GEWITTER" ~ "precip_net_thunderstorm",
        . == "RR_GRAUPEL" ~ "precip_net_graupel",
        . == "RR_HAGEL" ~ "precip_net_hail",
        . == "RR_NEBEL" ~ "precip_net_fog",
        TRUE ~ .
      ))

      # Ensure output columns are numeric and all missing value variants
      # (like 8000, -999) are NA. Applied only to known weather columns.
      df <- df %>% mutate(across(any_of(weather_cols), ~ {
        # Suppress coercion warnings here because we handle the NA
        # conversion explicitly below
        val <- suppressWarnings(as.numeric(as.character(.)))
        # Catch 8000, 9999, -999 and anything outside reasonable physical
        # ranges. Broad filter for DWD flags: >= 7999 or <= -998.
        ifelse(is.na(val) | val >= 7999 | val <= -998, NA_real_, val)
      }))

      # Text columns: trim whitespace and convert empty strings to NA
      if (length(weather_text_cols) > 0) {
        df <- df %>% mutate(across(any_of(weather_text_cols), ~ {
          val <- trimws(as.character(.))
          val[val == ""] <- NA_character_
          val
        }))
      }

      # Parameter-specific safety bounds (Physical sanity check)
      if ("temp" %in% names(df)) df$temp[is.na(df$temp) | df$temp < -80 | df$temp > 60] <- NA_real_
      if ("temp_min" %in% names(df)) df$temp_min[is.na(df$temp_min) | df$temp_min < -80 | df$temp_min > 60] <- NA_real_
      if ("temp_max" %in% names(df)) df$temp_max[is.na(df$temp_max) | df$temp_max < -80 | df$temp_max > 60] <- NA_real_
      if ("dew_point" %in% names(df)) df$dew_point[is.na(df$dew_point) | df$dew_point < -90 | df$dew_point > 60] <- NA_real_
      if ("wet_bulb_temp" %in% names(df)) df$wet_bulb_temp[is.na(df$wet_bulb_temp) | df$wet_bulb_temp < -80 | df$wet_bulb_temp > 60] <- NA_real_
      if ("rh" %in% names(df)) df$rh[is.na(df$rh) | df$rh < 0 | df$rh > 100] <- NA_real_
      if ("pressure" %in% names(df)) df$pressure[is.na(df$pressure) | df$pressure < 800 | df$pressure > 1100] <- NA_real_
      if ("station_pressure" %in% names(df)) df$station_pressure[is.na(df$station_pressure) | df$station_pressure < 500 | df$station_pressure > 1100] <- NA_real_
      if ("soil_temp_min_5cm" %in% names(df)) df$soil_temp_min_5cm[is.na(df$soil_temp_min_5cm) | df$soil_temp_min_5cm < -80 | df$soil_temp_min_5cm > 60] <- NA_real_
      if ("solar_global" %in% names(df)) df$solar_global[is.na(df$solar_global) | df$solar_global < 0 | df$solar_global > 3500] <- NA_real_
      if ("snow_depth" %in% names(df)) df$snow_depth[is.na(df$snow_depth) | df$snow_depth < 0] <- NA_real_

      # --- Fix: Ghost Pressure Data ---
      # DWD Moisture (TF), Temperature (TU), and DewPoint (TD) files often
      # contain a "P_STD" (Pressure) column. In many cases (e.g. Station
      # 00917), this is a constant dummy value (e.g. 993.9 hPa derived from
      # elevation) and NOT measured data. We must ignore it unless the file
      # is explicitly a Pressure file (P0) or Climate file (KL).
      # Note: We check the internal data file name (target_file), not the
      # zip path, because the app uses temp file names.
      # EXCEPTION: For 10-minute resolution (10minutenwerte), data is valid.
      fname_lower <- tolower(target_file)
      is_tf_tu_td <- any(sapply(
        c("_tf_", "_tu_", "_td_"),
        function(p) grepl(p, fname_lower, fixed = TRUE)
      ))
      is_10min <- grepl("10minutenwerte", fname_lower, fixed = TRUE) |
        grepl("zehn_min", fname_lower, fixed = TRUE)
      if (is_tf_tu_td && !is_10min) {
        if ("pressure" %in% names(df)) df$pressure <- NULL
        if ("station_pressure" %in% names(df)) df$station_pressure <- NULL
      }

      # Keep only datetime plus recognized output columns
      df <- df %>% select(datetime, any_of(c(weather_cols, weather_text_cols)))

      result_df <- as_tibble(df)
      # Attach granular metadata if available
      if (!is.null(granular_meta_df) && nrow(granular_meta_df) > 0) {
        attr(result_df, "granular_meta") <- granular_meta_df
      }
      result_df
    },
    error = function(e) {
      warning("Failed to parse ", basename(zip_path), ": ", e$message)
      return(NULL)
    }
  )
}

#' Fetch and Parse All Data for a Station
#'
#' Downloads every zip file listed for the station in the index, parses each
#' with read_dwd_data(), then full-joins the pieces on datetime and coalesces
#' duplicated variable columns (var, var.x, var.y, ...).
#'
#' @param station_id GHCN/DWD ID (string 5 digits)
#' @param index_df The global file index
#' @param start_date Filter start
#' @param end_date Filter end
#' @param status_cb Optional progress callback `function(message, detail, value)`
#' @return Merged tibble of data for the window, or NULL
fetch_and_parse_station_data <- function(station_id, index_df,
                                         start_date = NULL, end_date = NULL,
                                         status_cb = NULL) {
  # Helper to call callback if it exists
  # status_cb(message, detail = NULL, value = NULL)
  notify <- function(msg, detail = NULL, value = NULL) {
    if (is.function(status_cb)) status_cb(msg, detail, value)
  }

  targets <- index_df %>% filter(id == station_id)

  if (nrow(targets) == 0) {
    notify("No data found in index for this station.", value = 1)
    return(NULL)
  }

  all_data_list <- list()
  n_targets <- nrow(targets)
  last_pct <- -1

  for (i in seq_len(n_targets)) {
    target_row <- targets[i, ]
    url <- target_row$url
    tmp_zip <- tempfile(fileext = ".zip")

    dl_status <- tryCatch(
      {
        # Use curl with a handle to get progress
        h <- curl::new_handle()
        curl::handle_setopt(h,
          noprogress = FALSE,
          progressfunction = function(down, up) {
            if (down[1] > 0) {
              pct <- round(down[2] / down[1] * 100)
              if (pct != last_pct) {
                msg <- sprintf("Downloading file %d/%d...", i, n_targets)
                detail <- sprintf("%s (%d%%)", format_bytes(down[1]), pct)
                # Approximate progress: 0 to 0.8 range for downloading
                val <- ((i - 1) / n_targets) + (pct / 100 * (0.8 / n_targets))
                notify(msg, detail = detail, value = val)
                # <<- updates the loop-level tracker across callback calls
                last_pct <<- pct
              }
            }
            TRUE
          }
        )
        curl::curl_fetch_disk(url, tmp_zip, handle = h)
        TRUE
      },
      error = function(e) {
        warning("Download failed for ", url, ": ", e$message)
        FALSE
      }
    )

    if (dl_status) {
      notify(sprintf("Parsing file %d/%d...", i, n_targets),
        value = (i - 0.2) / n_targets
      )
      parsed <- read_dwd_data(tmp_zip, start_date, end_date)
      if (!is.null(parsed) && nrow(parsed) > 0) {
        all_data_list[[length(all_data_list) + 1]] <- parsed
      }
    }
    # BUGFIX: always remove the temp file; it was previously leaked when
    # the download failed (unlink ran only on the success path).
    unlink(tmp_zip)
  }

  if (length(all_data_list) == 0) {
    notify("Failed to parse any data files.", value = 1)
    return(NULL)
  }

  notify("Merging multiple files...", value = 0.9)
  final_df <- purrr::reduce(all_data_list, full_join, by = "datetime")

  # Identify unique weather variables across all joined columns
  weather_vars <- c(
    "temp", "temp_min", "temp_max", "temp_min_avg", "temp_max_avg",
    "rh", "dew_point", "abs_humidity", "vapor_pressure", "wet_bulb_temp",
    "precip", "wind_speed", "wind_dir", "pressure", "station_pressure",
    "cloud_cover", "cloud_cover_indicator", "wind_gust_max", "solar_global",
    "sunshine_duration", "soil_temp_2cm", "soil_temp_5cm", "soil_temp_10cm",
    "soil_temp_20cm", "soil_temp_50cm", "soil_temp_100cm",
    "soil_temp_min_5cm", "snow_depth", "snow_water_equiv", "snow_fresh_sum",
    "snow_depth_sum", "thunderstorm", "glaze", "graupel", "hail", "fog",
    "frost", "storm_6", "storm_8", "dew", "precip_net_thunderstorm",
    "precip_net_graupel", "precip_net_hail", "precip_net_fog", "visibility",
    "visibility_indicator", "weather_code", "weather_text",
    "cloud_layer1_code", "cloud_layer1_abbrev", "cloud_layer1_height",
    "cloud_layer1_amount", "cloud_layer2_code", "cloud_layer2_abbrev",
    "cloud_layer2_height", "cloud_layer2_amount", "cloud_layer3_code",
    "cloud_layer3_abbrev", "cloud_layer3_height", "cloud_layer3_amount",
    "cloud_layer4_code", "cloud_layer4_abbrev", "cloud_layer4_height",
    "cloud_layer4_amount"
  )

  available_cols <- names(final_df)
  # Pre-allocate clean_df with datetime
  clean_df <- final_df %>% select(datetime)

  for (v in weather_vars) {
    # Find all columns that are exactly 'v' or 'v.x', 'v.y', 'v.x.x' etc.
    # (the "(\\.|$)" anchor avoids matching longer names like temp_min for v = temp)
    v_cols <- available_cols[grepl(paste0("^", v, "(\\.|$)"), available_cols)]
    if (length(v_cols) > 0) {
      # Coalesce all versions of this variable.
      # We use do.call(coalesce, ...) to handle any number of duplicates.
      clean_df[[v]] <- do.call(coalesce, final_df[v_cols])
    }
  }

  # Final cleanup: arrange and filter by date window
  clean_df <- clean_df %>%
    distinct(datetime, .keep_all = TRUE) %>%
    arrange(datetime)

  if (!is.null(start_date) && !is.null(end_date)) {
    clean_df <- clean_df %>%
      filter(
        datetime >= as.POSIXct(start_date),
        datetime <= as.POSIXct(end_date) + days(1)
      )
  }

  n_rows <- nrow(clean_df)
  notify(sprintf("Success: %d rows processed.", n_rows))
  return(clean_df)
}

#' Format a byte count as a human-readable string
#'
#' @param x Number of bytes (may be NA)
#' @return Character scalar like "512 B", "1.5 KB" or "3.2 MB"
format_bytes <- function(x) {
  if (is.na(x)) {
    return("Unknown size")
  }
  if (x < 1024) {
    return(paste(x, "B"))
  }
  if (x < 1024^2) {
    return(paste(round(x / 1024, 1), "KB"))
  }
  return(paste(round(x / 1024^2, 1), "MB"))
}