|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#' Build the list of strings to treat as missing values in DWD files
#'
#' Starting from `base`, the result contains, in this order: the `base`
#' entries, each entry with ".0" appended, each entry with ".00" appended,
#' the original `base` entries left-padded with 1 to `nspace` spaces, and
#' finally the empty string. Duplicates are dropped, keeping the first
#' occurrence.
#'
#' @param base Character vector of sentinel codes.
#' @param nspace Largest number of leading spaces to generate. `0` yields no
#'   padded variants.
#' @return Character vector of unique missing-value strings.
generate_dwd_na_strings <- function(base = c(
                                      "-999", "-777", "-888", "8000", "9999",
                                      "-9999", "999.9", "8000.0", "800.0",
                                      "-999.0"
                                    ),
                                    nspace = 10) {
  with_decimals <- c(base, paste0(base, ".0"), paste0(base, ".00"))

  # seq_len() rather than 1:nspace: 1:0 counts down (1, 0) and would emit a
  # one-space variant even though no padding was requested.
  padded <- unlist(lapply(
    seq_len(nspace),
    function(width) paste0(strrep(" ", width), base)
  ))

  unique(c(with_decimals, padded, ""))
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Choose the strptime() format for MESS_DATUM stamps from their width.
# Uses the first non-missing stamp: 8 characters -> "%Y%m%d", 12 characters ->
# "%Y%m%d%H%M", any other width -> "%Y%m%d%H". Returns NA_character_ when
# there is no non-missing stamp to inspect.
.dwd_stamp_format <- function(stamps) {
  usable <- stamps[!is.na(stamps)]
  if (length(usable) == 0) {
    return(NA_character_)
  }
  width <- nchar(usable[1])
  if (width == 8) {
    "%Y%m%d"
  } else if (width == 12) {
    "%Y%m%d%H%M"
  } else {
    "%Y%m%d%H"
  }
}

# Lookup table: raw column name in the product file -> harmonised name.
.dwd_column_map <- function() {
  c(
    TT_TU = "temp", RF_TU = "rh", TT = "temp", TD = "dew_point",
    ABSF_STD = "abs_humidity", VP_STD = "vapor_pressure",
    TF_STD = "wet_bulb_temp", P_STD = "pressure", TT_STD = "temp",
    RF_STD = "rh", TD_STD = "dew_point",
    R1 = "precip", F = "wind_speed", D = "wind_dir",
    P = "pressure", P0 = "station_pressure",
    N_8 = "cloud_cover", V_N = "cloud_cover",
    V_N_I = "cloud_cover_indicator",
    V_S1_CS = "cloud_layer1_code", V_S1_CSA = "cloud_layer1_abbrev",
    V_S1_HHS = "cloud_layer1_height", V_S1_NS = "cloud_layer1_amount",
    V_S2_CS = "cloud_layer2_code", V_S2_CSA = "cloud_layer2_abbrev",
    V_S2_HHS = "cloud_layer2_height", V_S2_NS = "cloud_layer2_amount",
    V_S3_CS = "cloud_layer3_code", V_S3_CSA = "cloud_layer3_abbrev",
    V_S3_HHS = "cloud_layer3_height", V_S3_NS = "cloud_layer3_amount",
    V_S4_CS = "cloud_layer4_code", V_S4_CSA = "cloud_layer4_abbrev",
    V_S4_HHS = "cloud_layer4_height", V_S4_NS = "cloud_layer4_amount",
    V_VV = "visibility", V_VV_I = "visibility_indicator",
    WW = "weather_code", WW_Text = "weather_text",
    FX_10 = "wind_gust_max", FX_911 = "wind_gust_max",
    FG_LBERG = "solar_global", FG_STRAHL = "solar_global",
    SD_LBERG = "sunshine_duration", SD_STRAHL = "sunshine_duration",
    SD_SO = "sunshine_duration",
    # Columns carrying the _10 suffix
    TT_10 = "temp", RF_10 = "rh", TD_10 = "dew_point", TM5_10 = "temp_5cm",
    RWS_10 = "precip", RWS_DAU_10 = "precip_duration",
    RWS_IND_10 = "precip_ind",
    FF_10 = "wind_speed", DD_10 = "wind_dir", FNX_10 = "wind_gust_min",
    DS_10 = "diffuse_radiation", GS_10 = "solar_global",
    SD_10 = "sunshine_duration", LS_10 = "longwave_radiation",
    TX_10 = "temp_max", TN_10 = "temp_min", PP_10 = "station_pressure",
    # Columns with K/M style suffixes
    TGK = "soil_temp_min_5cm", TMK = "temp", TM_K = "temp",
    TNK = "temp_min", TXK = "temp_max",
    RSK = "precip", RS_K = "precip", RS = "precip",
    FM = "wind_speed", FX = "wind_gust_max", SDK = "sunshine_duration",
    UPM = "rh", PM = "pressure", NM = "cloud_cover",
    # Columns with MO_/MX_ prefixes
    MO_TT = "temp", MX_TX = "temp_max", MX_TN = "temp_min",
    MO_TX = "temp_max_avg", MO_TN = "temp_min_avg",
    MO_RR = "precip", MX_RS = "precip_max_day",
    MO_SD_S = "sunshine_duration", MO_N = "cloud_cover",
    MO_FK = "wind_speed", MX_FX = "wind_gust_max",
    MO_NSH = "snow_fresh_sum", MO_SH_S = "snow_depth_sum",
    # Soil temperature columns; the trailing digits give the depth in cm
    V_TE002 = "soil_temp_2cm", V_TE005 = "soil_temp_5cm",
    V_TE010 = "soil_temp_10cm", V_TE020 = "soil_temp_20cm",
    V_TE050 = "soil_temp_50cm", V_TE100 = "soil_temp_100cm",
    V_TE002M = "soil_temp_2cm", V_TE005M = "soil_temp_5cm",
    # Was mapped to "soil_temp_100cm" (copy-paste slip): 010 is the 10 cm
    # depth, matching V_TE010 above.
    V_TE010M = "soil_temp_10cm",
    V_TE020M = "soil_temp_20cm", V_TE050M = "soil_temp_50cm",
    # Snow columns
    ASH_6 = "snow_depth_set", SH_TAG = "snow_depth",
    WASH_6 = "snow_water_equiv", WAAS_6 = "snow_water_equiv_excavated",
    # Weather phenomena flags
    GEWITTER = "thunderstorm", GLATTEIS = "glaze", GRAUPEL = "graupel",
    HAGEL = "hail", NEBEL = "fog", REIF = "frost",
    STURM_6 = "storm_6", STURM_8 = "storm_8", TAU = "dew",
    RR_GEWITTER = "precip_net_thunderstorm",
    RR_GRAUPEL = "precip_net_graupel",
    RR_HAGEL = "precip_net_hail", RR_NEBEL = "precip_net_fog"
  )
}

# Best-effort read of the first "Metadaten_Parameter*.txt" member of the
# archive. Returns a data.frame (param, start_date, end_date) or NULL when the
# member is absent, has no "parameter" column, or cannot be read. Metadata is
# optional, so failures are deliberately swallowed here.
.dwd_read_param_metadata <- function(zip_path, listing, exdir) {
  meta_files <- listing$Name[
    grepl("^Metadaten_Parameter.*\\.txt$", listing$Name, ignore.case = TRUE)
  ]
  if (length(meta_files) == 0) {
    return(NULL)
  }
  tryCatch(
    {
      meta_file <- meta_files[1]
      unzip(zip_path, files = meta_file, exdir = exdir)
      meta_path <- file.path(exdir, meta_file)
      if (!file.exists(meta_path)) {
        return(NULL)
      }
      meta_dt <- data.table::fread(
        meta_path,
        sep = ";", header = TRUE, fill = TRUE, encoding = "Latin-1"
      )
      names(meta_dt) <- tolower(trimws(names(meta_dt)))
      if (!"parameter" %in% names(meta_dt)) {
        return(NULL)
      }
      has_col <- function(nm) nm %in% names(meta_dt)
      data.frame(
        param = trimws(meta_dt$parameter),
        start_date = if (has_col("von_datum")) {
          as.character(meta_dt$von_datum)
        } else {
          NA_character_
        },
        end_date = if (has_col("bis_datum")) {
          as.character(meta_dt$bis_datum)
        } else {
          NA_character_
        },
        stringsAsFactors = FALSE
      )
    },
    error = function(e) NULL
  )
}

# List the archive, extract the product file ("produkt*.txt"; the largest one
# when several match) into exdir and optionally read the parameter metadata.
# Returns NULL when the archive has no product file, otherwise a list with
# target_file, data_file and granular_meta. Errors from unzip() propagate.
.dwd_extract_archive <- function(zip_path, exdir, extract_metadata) {
  listing <- unzip(zip_path, list = TRUE)
  data_files <- listing$Name[grepl("^produkt.*\\.txt$", listing$Name)]
  if (length(data_files) == 0) {
    return(NULL)
  }

  if (length(data_files) > 1) {
    sizes <- listing$Length[match(data_files, listing$Name)]
    target_file <- data_files[which.max(sizes)]
  } else {
    target_file <- data_files[1]
  }
  unzip(zip_path, files = target_file, exdir = exdir)

  granular_meta <- NULL
  if (extract_metadata) {
    granular_meta <- .dwd_read_param_metadata(zip_path, listing, exdir)
  }

  list(
    target_file = target_file,
    data_file = file.path(exdir, target_file),
    granular_meta = granular_meta
  )
}

#' Read one DWD product zip archive into a tidy tibble
#'
#' Extracts the "produkt*.txt" member of `zip_path` into a temporary directory
#' (removed on exit), reads it as semicolon-separated text, parses
#' `MESS_DATUM` (or `MESS_DATUM_BEGINN`) into a UTC `datetime` column, renames
#' known raw columns to harmonised names, converts measurement columns to
#' numeric, blanks sentinel and out-of-range values, and keeps only `datetime`
#' plus the recognised weather columns.
#'
#' @param zip_path Path to the zip archive.
#' @param start_date,end_date Optional bounds, passed to `as.POSIXct()`. Rows
#'   before `start_date` or after `end_date` plus one day are dropped.
#'   NOTE(review): `as.POSIXct()` is called without `tz`, so character bounds
#'   are interpreted in the session time zone while stamps are parsed as UTC;
#'   confirm this is intended.
#' @param extract_metadata If `TRUE`, also try to read the
#'   "Metadaten_Parameter*.txt" member and attach it as attribute
#'   `"granular_meta"`.
#' @return A tibble, or `NULL` when the archive cannot be unzipped (with a
#'   warning), contains no product file, has no date column or usable date
#'   stamps, or parsing fails (with a warning).
read_dwd_data <- function(zip_path, start_date = NULL, end_date = NULL, extract_metadata = FALSE) {
  exdir <- tempfile()
  dir.create(exdir)
  on.exit(unlink(exdir, recursive = TRUE), add = TRUE)

  archive <- tryCatch(
    .dwd_extract_archive(zip_path, exdir, extract_metadata),
    error = function(e) {
      warning("Unzip failed: ", conditionMessage(e))
      NULL
    }
  )
  # Stop here on unzip failure or missing product file. Previously an unzip
  # error fell through to the parser and raised a second, misleading warning.
  if (is.null(archive)) {
    return(NULL)
  }
  target_file <- archive$target_file
  data_file <- archive$data_file
  granular_meta_df <- archive$granular_meta

  # fread() gets the trimmed sentinel strings; "" is always included.
  na_vec_clean <- unique(trimws(generate_dwd_na_strings()))
  if (!"" %in% na_vec_clean) na_vec_clean <- c(na_vec_clean, "")

  weather_cols <- c(
    "temp", "temp_min", "temp_max", "temp_min_avg", "temp_max_avg", "rh", "dew_point",
    "abs_humidity", "vapor_pressure", "wet_bulb_temp",
    "precip", "wind_speed", "wind_dir", "pressure", "station_pressure", "cloud_cover", "wind_gust_max", "solar_global", "sunshine_duration",
    "temp_5cm",
    "soil_temp_2cm", "soil_temp_5cm", "soil_temp_10cm", "soil_temp_20cm", "soil_temp_50cm", "soil_temp_100cm",
    "soil_temp_min_5cm",
    "snow_depth", "snow_water_equiv", "snow_fresh_sum", "snow_depth_sum",
    "thunderstorm", "glaze", "graupel", "hail", "fog", "frost", "storm_6", "storm_8", "dew",
    "precip_net_thunderstorm", "precip_net_graupel", "precip_net_hail", "precip_net_fog",
    "visibility", "weather_code",
    "cloud_layer1_code", "cloud_layer1_height", "cloud_layer1_amount",
    "cloud_layer2_code", "cloud_layer2_height", "cloud_layer2_amount",
    "cloud_layer3_code", "cloud_layer3_height", "cloud_layer3_amount",
    "cloud_layer4_code", "cloud_layer4_height", "cloud_layer4_amount"
  )
  weather_text_cols <- c(
    "cloud_cover_indicator", "cloud_layer1_abbrev", "cloud_layer2_abbrev", "cloud_layer3_abbrev", "cloud_layer4_abbrev",
    "visibility_indicator", "weather_text"
  )

  # Plausibility limits (lower, upper); values outside become NA.
  bounds <- list(
    temp = c(-80, 60),
    temp_min = c(-80, 60),
    temp_max = c(-80, 60),
    dew_point = c(-90, 60),
    wet_bulb_temp = c(-80, 60),
    rh = c(0, 100),
    pressure = c(800, 1100),
    station_pressure = c(500, 1100),
    soil_temp_min_5cm = c(-80, 60),
    solar_global = c(0, 3500),
    snow_depth = c(0, Inf)
  )

  column_map <- .dwd_column_map()
  harmonise_names <- function(nm) {
    idx <- match(nm, names(column_map))
    known <- !is.na(idx)
    nm[known] <- column_map[idx[known]]
    unname(nm)
  }

  # Numeric conversion; anything >= 7999 or <= -998 is a missing-value code.
  to_numeric <- function(col) {
    val <- suppressWarnings(as.numeric(as.character(col)))
    val[!is.na(val) & (val >= 7999 | val <= -998)] <- NA_real_
    val
  }
  to_text <- function(col) {
    val <- trimws(as.character(col))
    val[!is.na(val) & val == ""] <- NA_character_
    val
  }

  tryCatch(
    {
      dt <- data.table::fread(
        data_file,
        sep = ";",
        header = TRUE,
        na.strings = na_vec_clean,
        colClasses = "character",
        encoding = "Latin-1",
        showProgress = FALSE,
        data.table = TRUE
      )
      names(dt) <- trimws(names(dt))

      if ("MESS_DATUM_BEGINN" %in% names(dt)) {
        data.table::setnames(dt, "MESS_DATUM_BEGINN", "MESS_DATUM")
      }
      if (!"MESS_DATUM" %in% names(dt)) {
        return(NULL)
      }

      # Cheap pre-filter on the raw stamp strings before any date parsing.
      # The format is only derived when a non-missing stamp exists; the old
      # code compared an NA width first and errored on empty files.
      if (!is.null(start_date) || !is.null(end_date)) {
        fmt_str <- .dwd_stamp_format(dt$MESS_DATUM)
        if (!is.na(fmt_str)) {
          if (!is.null(start_date)) {
            s_val <- format(as.POSIXct(start_date), format = fmt_str, tz = "UTC")
            dt <- dt[MESS_DATUM >= s_val]
          }
          if (!is.null(end_date)) {
            e_val <- format(as.POSIXct(end_date) + days(1), format = fmt_str, tz = "UTC")
            dt <- dt[MESS_DATUM <= e_val]
          }
        }
      }

      df <- as_tibble(dt)
      raw_date <- as.character(df$MESS_DATUM)
      fmt <- .dwd_stamp_format(raw_date)
      if (is.na(fmt)) {
        # No rows left, or no usable stamp at all.
        return(NULL)
      }

      df$datetime <- as.POSIXct(raw_date, format = fmt, tz = "UTC")
      df <- df[!is.na(df$datetime), ]

      # Exact filter on the parsed timestamps.
      if (!is.null(start_date)) {
        df <- df[df$datetime >= as.POSIXct(start_date), ]
      }
      if (!is.null(end_date)) {
        df <- df[df$datetime <= as.POSIXct(end_date) + days(1), ]
      }

      df <- df %>%
        rename_with(harmonise_names) %>%
        mutate(across(any_of(weather_cols), to_numeric)) %>%
        mutate(across(any_of(weather_text_cols), to_text))

      for (col in intersect(names(bounds), names(df))) {
        values <- df[[col]]
        lim <- bounds[[col]]
        values[!is.na(values) & (values < lim[1] | values > lim[2])] <- NA_real_
        df[[col]] <- values
      }

      # Files whose name contains _tf_, _tu_ or _td_ (and is not a 10-minute
      # product) have their pressure columns discarded.
      fname_lower <- tolower(target_file)
      is_tf_tu_td <- grepl("_(tf|tu|td)_", fname_lower)
      is_10min <- grepl("10minutenwerte", fname_lower, fixed = TRUE) ||
        grepl("zehn_min", fname_lower, fixed = TRUE)
      if (is_tf_tu_td && !is_10min) {
        df <- df[setdiff(names(df), c("pressure", "station_pressure"))]
      }

      result_df <- df %>%
        select(datetime, any_of(c(weather_cols, weather_text_cols)))

      if (!is.null(granular_meta_df) && nrow(granular_meta_df) > 0) {
        attr(result_df, "granular_meta") <- granular_meta_df
      }
      result_df
    },
    error = function(e) {
      warning("Failed to parse ", basename(zip_path), ": ", conditionMessage(e))
      NULL
    }
  )
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#' Download, parse and merge all indexed files of one station
#'
#' Looks up every row of `index_df` whose `id` equals `station_id`, downloads
#' each `url` to a temporary zip, parses it with `read_dwd_data()`, full-joins
#' the results on `datetime`, coalesces columns that several files provide,
#' and returns one row per timestamp sorted by `datetime`.
#'
#' @param station_id Station identifier matched against `index_df$id`.
#' @param index_df Data frame with at least the columns `id` and `url`.
#' @param start_date,end_date Optional bounds forwarded to `read_dwd_data()`
#'   and re-applied to the merged result (end bound is `end_date` plus one
#'   day).
#' @param status_cb Optional function `(msg, detail, value)` receiving
#'   progress updates; ignored unless it is a function.
#' @return A tibble with `datetime` and the available weather variables, or
#'   `NULL` when the station is not in the index or no file could be parsed.
fetch_and_parse_station_data <- function(station_id, index_df, start_date = NULL, end_date = NULL, status_cb = NULL) {
  notify <- function(msg, detail = NULL, value = NULL) {
    if (is.function(status_cb)) status_cb(msg, detail, value)
  }

  targets <- index_df %>% filter(id == station_id)
  n_targets <- nrow(targets)
  if (n_targets == 0) {
    notify("No data found in index for this station.", value = 1)
    return(NULL)
  }

  all_data_list <- list()

  for (i in seq_len(n_targets)) {
    url <- targets$url[i]
    tmp_zip <- tempfile(fileext = ".zip")
    # Reset per file so the first progress tick of every download is reported.
    last_pct <- -1

    dl_status <- tryCatch(
      {
        h <- curl::new_handle()
        curl::handle_setopt(
          h,
          noprogress = FALSE,
          progressfunction = function(down, up) {
            # down[1] is the expected total, down[2] the bytes received.
            if (down[1] > 0) {
              pct <- round(down[2] / down[1] * 100)
              if (pct != last_pct) {
                msg <- sprintf("Downloading file %d/%d...", i, n_targets)
                detail <- sprintf("%s (%d%%)", format_bytes(down[1]), pct)
                # A file's download covers the first 80% of its progress share.
                val <- ((i - 1) / n_targets) + (pct / 100 * (0.8 / n_targets))
                notify(msg, detail = detail, value = val)
                last_pct <<- pct
              }
            }
            TRUE
          }
        )
        resp <- curl::curl_fetch_disk(url, tmp_zip, handle = h)
        # curl_fetch_disk() does not raise on HTTP errors; without this check
        # an error page would be saved and handed to the parser as a zip.
        if (isTRUE(resp$status_code >= 400)) {
          stop("HTTP status ", resp$status_code)
        }
        TRUE
      },
      error = function(e) {
        warning("Download failed for ", url, ": ", conditionMessage(e))
        FALSE
      }
    )

    if (dl_status) {
      notify(sprintf("Parsing file %d/%d...", i, n_targets), value = (i - 0.2) / n_targets)
      parsed <- read_dwd_data(tmp_zip, start_date, end_date)
      if (!is.null(parsed) && nrow(parsed) > 0) {
        all_data_list[[length(all_data_list) + 1]] <- parsed
      }
    }
    # Always remove the temp file, including partial files from failed
    # downloads (these used to be left behind).
    unlink(tmp_zip)
  }

  if (length(all_data_list) == 0) {
    notify("Failed to parse any data files.", value = 1)
    return(NULL)
  }

  notify("Merging multiple files...", value = 0.9)
  final_df <- purrr::reduce(all_data_list, full_join, by = "datetime")

  # Output columns, in output order. "temp_5cm" is listed because
  # read_dwd_data() produces it; it was previously dropped at this step.
  weather_vars <- c(
    "temp", "temp_min", "temp_max", "temp_min_avg", "temp_max_avg", "rh", "dew_point",
    "abs_humidity", "vapor_pressure", "wet_bulb_temp",
    "precip", "wind_speed", "wind_dir", "pressure", "station_pressure", "cloud_cover", "cloud_cover_indicator",
    "wind_gust_max", "solar_global", "sunshine_duration",
    "temp_5cm",
    "soil_temp_2cm", "soil_temp_5cm", "soil_temp_10cm", "soil_temp_20cm", "soil_temp_50cm", "soil_temp_100cm",
    "soil_temp_min_5cm",
    "snow_depth", "snow_water_equiv", "snow_fresh_sum", "snow_depth_sum",
    "thunderstorm", "glaze", "graupel", "hail", "fog", "frost", "storm_6", "storm_8", "dew",
    "precip_net_thunderstorm", "precip_net_graupel", "precip_net_hail", "precip_net_fog",
    "visibility", "visibility_indicator", "weather_code", "weather_text",
    "cloud_layer1_code", "cloud_layer1_abbrev", "cloud_layer1_height", "cloud_layer1_amount",
    "cloud_layer2_code", "cloud_layer2_abbrev", "cloud_layer2_height", "cloud_layer2_amount",
    "cloud_layer3_code", "cloud_layer3_abbrev", "cloud_layer3_height", "cloud_layer3_amount",
    "cloud_layer4_code", "cloud_layer4_abbrev", "cloud_layer4_height", "cloud_layer4_amount"
  )

  available_cols <- names(final_df)
  clean_df <- final_df %>% select(datetime)

  for (v in weather_vars) {
    # The join suffixes clashing names (.x, .y, ...); gather every variant of
    # this variable and keep the first non-missing value per row.
    v_cols <- available_cols[grepl(paste0("^", v, "(\\.|$)"), available_cols)]
    if (length(v_cols) > 0) {
      clean_df[[v]] <- do.call(coalesce, unname(as.list(final_df[v_cols])))
    }
  }

  clean_df <- clean_df %>%
    distinct(datetime, .keep_all = TRUE) %>%
    arrange(datetime)

  # Apply each bound on its own, as read_dwd_data() does; previously this
  # final filter only ran when both bounds were supplied.
  if (!is.null(start_date)) {
    clean_df <- clean_df %>% filter(datetime >= as.POSIXct(start_date))
  }
  if (!is.null(end_date)) {
    clean_df <- clean_df %>% filter(datetime <= as.POSIXct(end_date) + days(1))
  }

  notify(sprintf("Success: %d rows processed.", nrow(clean_df)))

  clean_df
}
|
|
|
|
|
|
|
|
#' Format a byte count as a short human-readable label
#'
#' @param x A single number of bytes, or `NA`.
#' @return `"Unknown size"` for `NA`; otherwise the value in B (below 1024),
#'   KB (below 1024^2) or MB, with KB/MB rounded to one decimal place.
format_bytes <- function(x) {
  kib <- 1024
  mib <- 1024^2

  label <- if (is.na(x)) {
    "Unknown size"
  } else if (x < kib) {
    paste(x, "B")
  } else if (x < mib) {
    paste(round(x / kib, 1), "KB")
  } else {
    paste(round(x / mib, 1), "MB")
  }

  label
}
|
|
|