File size: 29,202 Bytes
1662122
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3e6528d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8cfb3a4
 
 
 
 
 
1662122
 
 
 
 
 
08dc5ac
1662122
 
 
 
 
76079b2
 
1662122
 
76079b2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
08dc5ac
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1662122
 
 
 
 
 
 
 
 
 
8cfb3a4
 
 
 
1662122
 
 
8cfb3a4
 
 
1662122
8cfb3a4
 
 
 
 
 
 
1662122
 
08dc5ac
 
 
 
 
3e6528d
 
8cfb3a4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3e6528d
 
 
 
 
8cfb3a4
 
 
 
 
 
 
 
 
1bcd8ae
1662122
 
8cfb3a4
 
 
 
 
1662122
8cfb3a4
 
 
1662122
8cfb3a4
1662122
 
 
3e6528d
 
 
1662122
8cfb3a4
1662122
 
8cfb3a4
3e6528d
1662122
 
c53f4d8
d3d51f7
 
3e6528d
1957ae8
d3d51f7
08dc5ac
c935b3b
c53f4d8
d3d51f7
 
 
 
 
 
 
 
 
 
c53f4d8
1662122
 
3fc14e6
1662122
 
d3d51f7
 
 
 
 
 
 
 
 
1662122
 
 
 
 
 
d3d51f7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1662122
 
 
3fc14e6
1957ae8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3fc14e6
08dc5ac
3fc14e6
 
 
 
 
 
 
 
 
 
c53f4d8
1bcd8ae
 
c935b3b
 
 
 
1bcd8ae
 
 
 
 
 
c935b3b
 
1bcd8ae
3e6528d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1bcd8ae
c53f4d8
d3d51f7
 
 
 
 
 
c53f4d8
 
8cfb3a4
c53f4d8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1662122
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d3d51f7
 
 
 
 
 
 
 
 
1662122
 
1a9cb0e
 
d3d51f7
 
1662122
 
 
08dc5ac
1662122
c53f4d8
 
08dc5ac
 
 
 
 
1957ae8
08dc5ac
 
1957ae8
 
 
08dc5ac
 
 
 
1662122
3e6528d
1662122
08dc5ac
 
 
 
 
 
1662122
 
 
235874d
1662122
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
08dc5ac
1662122
 
 
 
 
 
 
 
 
 
 
 
 
f0450cf
 
 
 
 
 
 
1662122
 
08dc5ac
1662122
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c53f4d8
d3d51f7
 
3e6528d
d3d51f7
 
08dc5ac
c935b3b
c53f4d8
d3d51f7
 
 
 
 
 
c53f4d8
1662122
 
 
 
 
3e6528d
 
 
 
1662122
08dc5ac
1662122
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3e6528d
1662122
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
#' Generate DWD Missing Value Strings
#'
#' Builds the set of literal strings that should be read as missing values:
#' every code in `base`, each code with ".0" and ".00" appended, each code
#' left-padded with 1 to `nspace` spaces, and the empty string.
#'
#' @param base Basic codes like -999, 8000
#' @param nspace Maximum number of leading spaces to include (DWD files often
#'   have 7 characters wide columns). `0` adds no padded variants.
#' @return Character vector of unique strings
generate_dwd_na_strings <- function(base = c("-999", "-777", "-888", "8000", "9999", "-9999", "999.9", "8000.0", "800.0", "-999.0"), nspace = 10) {
    # Decimal variations of every base code
    decimals <- c(paste0(base, ".0"), paste0(base, ".00"))

    # Leading-space variations of the base codes only.
    # seq_len() (instead of 1:nspace) makes nspace = 0 produce no padded
    # variants; 1:0 would iterate over c(1, 0) and still add a one-space copy.
    padded <- unlist(lapply(seq_len(nspace), function(i) {
        paste0(strrep(" ", i), base)
    }))

    # The empty string always counts as missing
    unique(c(base, decimals, padded, ""))
}

#' Filter DWD Rows by Date Window
#'
#' Uses interval overlap when a `datetime_end` column exists: a row is kept when
#' its end date is on or after `start_date` and its start date is on or before
#' `end_date`. Rows without a usable `datetime_end` are treated as point
#' observations (end = start). Comparison happens at whole-day granularity.
#' Rows whose `datetime` is missing are kept.
#'
#' @param df Data frame with `datetime` and optional `datetime_end`
#' @param start_date Optional window start
#' @param end_date Optional window end
#' @return Filtered data frame (`df` itself when it is NULL or has no rows)
filter_dwd_rows_by_date_window <- function(df, start_date = NULL, end_date = NULL) {
    if (is.null(df) || nrow(df) == 0) {
        return(df)
    }

    row_start <- as.Date(df$datetime)

    # Default every row to a zero-length interval, then widen it where an
    # interval end is actually present. Previously a missing end made the row
    # pass the start filter unconditionally, which let out-of-window rows
    # through after merging interval and non-interval files.
    row_end <- row_start
    if ("datetime_end" %in% names(df)) {
        interval_end <- as.Date(df$datetime_end)
        has_end <- !is.na(interval_end)
        row_end[has_end] <- interval_end[has_end]
    }

    keep <- rep(TRUE, nrow(df))

    if (!is.null(start_date)) {
        keep <- keep & (is.na(row_end) | row_end >= as.Date(start_date))
    }
    if (!is.null(end_date)) {
        keep <- keep & (is.na(row_start) | row_start <= as.Date(end_date))
    }

    df[keep, , drop = FALSE]
}

#' Parse content of a DWD data zip file
#'
#' Extracts the `produkt*.txt` data file from the archive, reads it as
#' semicolon-separated text, maps DWD column codes to internal names, converts
#' the measurement columns to numeric with sentinel/range cleaning, and returns
#' the rows inside the requested date window.
#'
#' @param zip_path Local path to the .zip file
#' @param start_date Optional filter start (POSIXct)
#' @param end_date Optional filter end (POSIXct)
#' @param extract_metadata If `TRUE`, also read the first
#'   `Metadaten_Parameter*.txt` file in the archive and attach it to the result
#'   as the `granular_meta` attribute (columns `param`, `start_date`, `end_date`).
#' @return Tibble with parsed data or NULL
read_dwd_data <- function(zip_path, start_date = NULL, end_date = NULL, extract_metadata = FALSE) {
    # Create temp dir for extraction
    exdir <- tempfile()
    dir.create(exdir)
    on.exit(unlink(exdir, recursive = TRUE), add = TRUE)

    # Map the width of the first non-missing DWD timestamp to a strptime format.
    # Returns NA when there is no usable timestamp at all.
    detect_date_format <- function(dates) {
        usable <- dates[!is.na(dates)]
        if (length(usable) == 0) {
            return(NA_character_)
        }
        nch <- nchar(usable[1])
        if (nch == 8) "%Y%m%d" else if (nch == 10) "%Y%m%d%H" else if (nch == 12) "%Y%m%d%H%M" else "%Y%m%d%H"
    }

    # Unzip - OPTIMIZED: List first, then extract only the data file
    # This avoids extracting 10+ metadata files which saves IO
    granular_meta_df <- NULL
    unzip_ok <- tryCatch(
        {
            # List files in the zip
            file_list <- unzip(zip_path, list = TRUE)

            # Find the data file (produkt*.txt)
            # We look for the largest file matching the pattern, just in case
            # Typical DWD name: produkt_param_date_station.txt
            data_files <- file_list$Name[grepl("^produkt.*\\.txt$", file_list$Name)]

            if (length(data_files) == 0) {
                return(NULL)
            }

            # Determine which file to extract (largest if multiple, though usually one)
            if (length(data_files) > 1) {
                # Get sizes
                sizes <- file_list$Length[match(data_files, file_list$Name)]
                target_file <- data_files[which.max(sizes)]
            } else {
                target_file <- data_files[1]
            }

            # Extract ONLY the target file
            unzip(zip_path, files = target_file, exdir = exdir)
            data_file <- file.path(exdir, target_file)

            # Also extract metadata file for granular parameter validity periods
            if (extract_metadata) {
                meta_files <- file_list$Name[grepl("^Metadaten_Parameter.*\\.txt$", file_list$Name, ignore.case = TRUE)]
                if (length(meta_files) > 0) {
                    meta_file <- meta_files[1]
                    tryCatch(
                        {
                            unzip(zip_path, files = meta_file, exdir = exdir)
                            meta_path <- file.path(exdir, meta_file)
                            if (file.exists(meta_path)) {
                                # Read semicolon-separated metadata file
                                meta_dt <- data.table::fread(meta_path, sep = ";", header = TRUE, fill = TRUE, encoding = "Latin-1")
                                # Expected columns usually include: Parameter, Von_Datum, Bis_Datum
                                # Normalize column names
                                names(meta_dt) <- tolower(trimws(names(meta_dt)))
                                if ("parameter" %in% names(meta_dt)) {
                                    granular_meta_df <- data.frame(
                                        param = trimws(meta_dt$parameter),
                                        start_date = if ("von_datum" %in% names(meta_dt)) as.character(meta_dt$von_datum) else NA_character_,
                                        end_date = if ("bis_datum" %in% names(meta_dt)) as.character(meta_dt$bis_datum) else NA_character_,
                                        stringsAsFactors = FALSE
                                    )
                                }
                            }
                        },
                        error = function(e) {
                            # Deliberately silent: metadata is optional and must
                            # never prevent the measurement data from loading.
                        }
                    )
                }
            }
            TRUE
        },
        error = function(e) {
            warning("Unzip failed: ", e$message)
            FALSE
        }
    )

    # A return() inside the handler above would only leave the handler, not this
    # function, so the failure is signalled through the flag and handled here.
    # Otherwise parsing would continue with `data_file` undefined and raise a
    # second, misleading warning.
    if (!isTRUE(unzip_ok)) {
        return(NULL)
    }

    # Read Data
    # DWD standard: semicolon sep
    na_vec <- generate_dwd_na_strings()
    # fread strips whitespace by default, so we should pass trimmed NA strings to avoid warnings/errors
    na_vec_clean <- unique(trimws(na_vec))
    # Ensure empty string is included if not already
    if (!"" %in% na_vec_clean) na_vec_clean <- c(na_vec_clean, "")

    tryCatch(
        {
            # Use data.table::fread for faster parsing
            # Read everything as character first to separate schema from data loading safety
            dt <- data.table::fread(
                data_file,
                sep = ";",
                header = TRUE,
                na.strings = na_vec_clean,
                colClasses = "character", # Force character to avoid type inference issues with weird DWD codes
                encoding = "Latin-1",
                showProgress = FALSE,
                data.table = TRUE
            )

            # Sanitize column names: remove whitespace
            names(dt) <- trimws(names(dt))

            has_interval_end <- "MESS_DATUM_ENDE" %in% names(dt)

            # Standardize Date Column Name
            if ("MESS_DATUM_BEGINN" %in% names(dt)) {
                data.table::setnames(dt, "MESS_DATUM_BEGINN", "MESS_DATUM")
            }

            if (!"MESS_DATUM" %in% names(dt)) {
                return(NULL)
            }

            # --- Optimization: Pre-filter rows by date string BEFORE heavy processing ---
            # DWD dates are usually monotonic strings/numbers: YYYYMMDD, YYYYMMDDHH, etc.
            # We can filter lexically or numerically without parsing to POSIXct first.
            if (!is.null(start_date) || !is.null(end_date)) {
                # Determine format from the first usable timestamp. The NA check
                # must come before any comparison on the width, otherwise an
                # empty table or a missing first date raises an error.
                fmt_str <- detect_date_format(dt$MESS_DATUM)

                if (!is.na(fmt_str)) {
                    if (!is.null(start_date)) {
                        # Format request date to match DWD string format
                        # Explicitly use UTC to avoid timezone shifts during string formatting if inputs are UTC
                        s_val <- format(as.POSIXct(start_date), format = fmt_str, tz = "UTC")
                        if (has_interval_end) {
                            dt <- dt[MESS_DATUM_ENDE >= s_val]
                        } else {
                            dt <- dt[MESS_DATUM >= s_val]
                        }
                    }

                    if (!is.null(end_date)) {
                        # Add buffer (end of day) if needed, but usually exact match on string works
                        # If end_date is 2025-01-01, we want up to 2025-01-01 23:59
                        e_limit <- as.POSIXct(end_date) + days(1)
                        e_val <- format(e_limit, format = fmt_str, tz = "UTC")
                        dt <- dt[MESS_DATUM <= e_val]
                    }
                }
            }

            # Convert to tibble/data.frame for existing dplyr pipeline compatibility
            # (Keeping existing logic for column mapping to minimize regression risk)
            df <- as_tibble(dt)

            # Parse Date logic (Original)
            raw_date <- as.character(df$MESS_DATUM)
            if (length(raw_date) == 0) {
                return(NULL)
            } # If filtered to empty

            fmt <- detect_date_format(raw_date)
            if (is.na(fmt)) {
                # No row carries a timestamp, so nothing can be placed in time
                return(NULL)
            }

            df$datetime <- as.POSIXct(raw_date, format = fmt, tz = "UTC")
            if ("MESS_DATUM_ENDE" %in% names(df)) {
                df$datetime_end <- as.POSIXct(as.character(df$MESS_DATUM_ENDE), format = fmt, tz = "UTC")
            }

            # Filter valid dates (Safety check)
            df <- df[!is.na(df$datetime), ]

            # Additional safety window filter (dates might strictly parse differently than strings)
            df <- filter_dwd_rows_by_date_window(df, start_date, end_date)

            # Column Mapping
            weather_cols <- c(
                "temp", "temp_min", "temp_max", "temp_min_avg", "temp_max_avg", "rh", "dew_point",
                "abs_humidity", "vapor_pressure", "wet_bulb_temp",
                "precip", "precip_max_day", "wind_speed", "wind_dir", "pressure", "station_pressure", "cloud_cover", "wind_gust_max", "solar_global", "sunshine_duration",
                "temp_5cm",
                "soil_temp_2cm", "soil_temp_5cm", "soil_temp_10cm", "soil_temp_20cm", "soil_temp_50cm", "soil_temp_100cm",
                "soil_temp_min_5cm",
                "snow_depth", "snow_water_equiv", "snow_fresh_sum", "snow_depth_sum",
                "thunderstorm", "glaze", "graupel", "hail", "fog", "frost", "storm_6", "storm_8", "dew",
                "precip_net_thunderstorm", "precip_net_graupel", "precip_net_hail", "precip_net_fog",
                "visibility", "weather_code",
                "cloud_layer1_code", "cloud_layer1_height", "cloud_layer1_amount",
                "cloud_layer2_code", "cloud_layer2_height", "cloud_layer2_amount",
                "cloud_layer3_code", "cloud_layer3_height", "cloud_layer3_amount",
                "cloud_layer4_code", "cloud_layer4_height", "cloud_layer4_amount"
            )
            weather_text_cols <- c(
                "cloud_cover_indicator", "cloud_layer1_abbrev", "cloud_layer2_abbrev", "cloud_layer3_abbrev", "cloud_layer4_abbrev",
                "visibility_indicator", "weather_text"
            )
            df <- df %>%
                rename_with(~ case_when(
                    # Hourly Mappings
                    . == "TT_TU" ~ "temp",
                    . == "RF_TU" ~ "rh",
                    . == "TT" ~ "temp",
                    . == "TD" ~ "dew_point",
                    . == "ABSF_STD" ~ "abs_humidity",
                    . == "VP_STD" ~ "vapor_pressure",
                    . == "TF_STD" ~ "wet_bulb_temp",
                    . == "P_STD" ~ "pressure",
                    . == "TT_STD" ~ "temp",
                    . == "RF_STD" ~ "rh",
                    . == "TD_STD" ~ "dew_point",
                    . == "R1" ~ "precip",
                    . == "F" ~ "wind_speed",
                    . == "D" ~ "wind_dir",
                    . == "P" ~ "pressure",
                    . == "P0" ~ "station_pressure",
                    . %in% c("N_8", "V_N") ~ "cloud_cover",
                    . == "V_N_I" ~ "cloud_cover_indicator",
                    . == "V_S1_CS" ~ "cloud_layer1_code",
                    . == "V_S1_CSA" ~ "cloud_layer1_abbrev",
                    . == "V_S1_HHS" ~ "cloud_layer1_height",
                    . == "V_S1_NS" ~ "cloud_layer1_amount",
                    . == "V_S2_CS" ~ "cloud_layer2_code",
                    . == "V_S2_CSA" ~ "cloud_layer2_abbrev",
                    . == "V_S2_HHS" ~ "cloud_layer2_height",
                    . == "V_S2_NS" ~ "cloud_layer2_amount",
                    . == "V_S3_CS" ~ "cloud_layer3_code",
                    . == "V_S3_CSA" ~ "cloud_layer3_abbrev",
                    . == "V_S3_HHS" ~ "cloud_layer3_height",
                    . == "V_S3_NS" ~ "cloud_layer3_amount",
                    . == "V_S4_CS" ~ "cloud_layer4_code",
                    . == "V_S4_CSA" ~ "cloud_layer4_abbrev",
                    . == "V_S4_HHS" ~ "cloud_layer4_height",
                    . == "V_S4_NS" ~ "cloud_layer4_amount",
                    . == "V_VV" ~ "visibility",
                    . == "V_VV_I" ~ "visibility_indicator",
                    . == "WW" ~ "weather_code",
                    . == "WW_Text" ~ "weather_text",
                    . %in% c("FX_10", "FX_911") ~ "wind_gust_max",
                    . %in% c("FG_LBERG", "FG_STRAHL") ~ "solar_global",
                    . %in% c("SD_LBERG", "SD_STRAHL", "SD_SO") ~ "sunshine_duration",

                    # 10-Minute Mappings
                    . == "TT_10" ~ "temp",
                    . == "RF_10" ~ "rh",
                    . == "TD_10" ~ "dew_point",
                    . == "TM5_10" ~ "temp_5cm", # Soil temp logic? or 5cm air? DWD usually 5cm above ground
                    . == "RWS_10" ~ "precip",
                    . == "RWS_DAU_10" ~ "precip_duration", # Custom?
                    . == "RWS_IND_10" ~ "precip_ind",
                    . == "FF_10" ~ "wind_speed",
                    . == "DD_10" ~ "wind_dir",
                    . == "FX_10" ~ "wind_gust_max",
                    . == "FNX_10" ~ "wind_gust_min",
                    . == "DS_10" ~ "diffuse_radiation",
                    . == "GS_10" ~ "solar_global",
                    . == "SD_10" ~ "sunshine_duration",
                    . == "LS_10" ~ "longwave_radiation", # Atmospheric radiation?
                    . == "TX_10" ~ "temp_max",
                    . == "TN_10" ~ "temp_min",
                    . == "PP_10" ~ "station_pressure",

                    # Daily Mappings
                    . == "TGK" ~ "soil_temp_min_5cm",
                    . %in% c("TMK", "TM_K") ~ "temp",
                    . == "TNK" ~ "temp_min",
                    . == "TXK" ~ "temp_max",
                    . %in% c("RSK", "RS_K", "RS") ~ "precip",
                    . == "FM" ~ "wind_speed",
                    . == "FX" ~ "wind_gust_max",
                    . %in% c("SDK", "SD_SO") ~ "sunshine_duration", # SD_SO is daily sunshine
                    . == "UPM" ~ "rh",
                    . == "PM" ~ "pressure",
                    . == "NM" ~ "cloud_cover",

                    # Monthly Mappings
                    . == "MO_TT" ~ "temp",
                    . == "MX_TX" ~ "temp_max", # Absolute Max
                    . == "MX_TN" ~ "temp_min", # Absolute Min
                    . == "MO_TX" ~ "temp_max_avg", # Average Daily Max
                    . == "MO_TN" ~ "temp_min_avg", # Average Daily Min
                    . == "MO_RR" ~ "precip",
                    . == "MX_RS" ~ "precip_max_day",
                    . == "MO_SD_S" ~ "sunshine_duration",
                    . == "MO_N" ~ "cloud_cover",
                    . == "MO_FK" ~ "wind_speed",
                    . == "MX_FX" ~ "wind_gust_max",
                    . == "MO_NSH" ~ "snow_fresh_sum", # Monthly Fresh Snow Sum
                    . == "MO_SH_S" ~ "snow_depth_sum", # Monthly Snow Depth Sum

                    # Annual Mappings
                    . == "JA_TT" ~ "temp",
                    . == "JA_TX" ~ "temp_max_avg",
                    . == "JA_TN" ~ "temp_min_avg",
                    . == "JA_RR" ~ "precip",
                    . == "JA_FK" ~ "wind_speed",
                    . == "JA_N" ~ "cloud_cover",
                    . == "JA_SD_S" ~ "sunshine_duration",
                    . == "JA_MX_TX" ~ "temp_max",
                    . == "JA_MX_TN" ~ "temp_min",
                    . == "JA_MX_RS" ~ "precip_max_day",
                    . == "JA_MX_FX" ~ "wind_gust_max",
                    . == "JA_NSH" ~ "snow_fresh_sum",
                    . == "JA_SH_S" ~ "snow_depth_sum",
                    . == "JA_GEWITTER" ~ "thunderstorm",
                    . == "JA_GLATTEIS" ~ "glaze",
                    . == "JA_GRAUPEL" ~ "graupel",
                    . == "JA_HAGEL" ~ "hail",
                    . == "JA_NEBEL" ~ "fog",
                    . == "JA_REIF" ~ "frost",
                    . == "JA_STURM_6" ~ "storm_6",
                    . == "JA_STURM_8" ~ "storm_8",
                    . == "JA_TAU" ~ "dew",

                    # Soil Temperature
                    . == "V_TE002" ~ "soil_temp_2cm",
                    . == "V_TE005" ~ "soil_temp_5cm",
                    . == "V_TE010" ~ "soil_temp_10cm",
                    . == "V_TE020" ~ "soil_temp_20cm",
                    . == "V_TE050" ~ "soil_temp_50cm",
                    . == "V_TE100" ~ "soil_temp_100cm",
                    . == "V_TE002M" ~ "soil_temp_2cm",
                    . == "V_TE005M" ~ "soil_temp_5cm",
                    # 010 is the 10 cm depth (was mistakenly mapped to the 100 cm column)
                    . == "V_TE010M" ~ "soil_temp_10cm",
                    . == "V_TE020M" ~ "soil_temp_20cm",
                    . == "V_TE050M" ~ "soil_temp_50cm",

                    # Water Equivalent / Snow
                    . == "ASH_6" ~ "snow_depth_set",
                    . == "SH_TAG" ~ "snow_depth",
                    . == "WASH_6" ~ "snow_water_equiv",
                    . == "WAAS_6" ~ "snow_water_equiv_excavated",

                    # Weather Phenomena
                    . == "GEWITTER" ~ "thunderstorm",
                    . == "GLATTEIS" ~ "glaze",
                    . == "GRAUPEL" ~ "graupel",
                    . == "HAGEL" ~ "hail",
                    . == "NEBEL" ~ "fog",
                    . == "REIF" ~ "frost",
                    . == "STURM_6" ~ "storm_6",
                    . == "STURM_8" ~ "storm_8",
                    . == "TAU" ~ "dew",

                    # Precipitation Network Weather Phenomena
                    . == "RR_GEWITTER" ~ "precip_net_thunderstorm",
                    . == "RR_GRAUPEL" ~ "precip_net_graupel",
                    . == "RR_HAGEL" ~ "precip_net_hail",
                    . == "RR_NEBEL" ~ "precip_net_fog",
                    TRUE ~ .
                ))

            # Ensure output columns are numeric and all missing value variants (like 8000, -999) are NA
            # We apply this to specific weather columns for precision
            # NOTE(review): the >= 7999 bound is applied to every column in
            # weather_cols, so it would also blank genuine values of that size
            # (e.g. visibility or cloud-layer heights, if those are reported in
            # metres) - confirm units against the DWD product descriptions.
            df <- df %>%
                mutate(across(any_of(weather_cols), ~ {
                    # Suppress coercion warnings here because we handle the NA conversion explicitly
                    val <- suppressWarnings(as.numeric(as.character(.)))
                    # Catch 8000, 9999, -999 and anything outside reasonable physical ranges
                    # Broad filter for DWD flags: >= 7999 or <= -998
                    res <- ifelse(is.na(val) | val >= 7999 | val <= -998, NA_real_, val)
                    res
                }))

            if (length(weather_text_cols) > 0) {
                df <- df %>%
                    mutate(across(any_of(weather_text_cols), ~ {
                        val <- trimws(as.character(.))
                        val[val == ""] <- NA_character_
                        val
                    }))
            }

            # Parameter-specific safety bounds (Physical sanity check)
            if ("temp" %in% names(df)) df$temp[is.na(df$temp) | df$temp < -80 | df$temp > 60] <- NA_real_
            if ("temp_min" %in% names(df)) df$temp_min[is.na(df$temp_min) | df$temp_min < -80 | df$temp_min > 60] <- NA_real_
            if ("temp_max" %in% names(df)) df$temp_max[is.na(df$temp_max) | df$temp_max < -80 | df$temp_max > 60] <- NA_real_
            if ("dew_point" %in% names(df)) df$dew_point[is.na(df$dew_point) | df$dew_point < -90 | df$dew_point > 60] <- NA_real_
            if ("wet_bulb_temp" %in% names(df)) df$wet_bulb_temp[is.na(df$wet_bulb_temp) | df$wet_bulb_temp < -80 | df$wet_bulb_temp > 60] <- NA_real_
            if ("rh" %in% names(df)) df$rh[is.na(df$rh) | df$rh < 0 | df$rh > 100] <- NA_real_
            if ("pressure" %in% names(df)) df$pressure[is.na(df$pressure) | df$pressure < 800 | df$pressure > 1100] <- NA_real_
            if ("station_pressure" %in% names(df)) df$station_pressure[is.na(df$station_pressure) | df$station_pressure < 500 | df$station_pressure > 1100] <- NA_real_
            if ("soil_temp_min_5cm" %in% names(df)) df$soil_temp_min_5cm[is.na(df$soil_temp_min_5cm) | df$soil_temp_min_5cm < -80 | df$soil_temp_min_5cm > 60] <- NA_real_
            if ("solar_global" %in% names(df)) df$solar_global[is.na(df$solar_global) | df$solar_global < 0 | df$solar_global > 3500] <- NA_real_
            if ("snow_depth" %in% names(df)) df$snow_depth[is.na(df$snow_depth) | df$snow_depth < 0] <- NA_real_

            # --- Fix: Ghost Pressure Data ---
            # DWD Moisture (TF), Temperature (TU), and DewPoint (TD) files often contain a "P_STD" (Pressure) column.
            # In many cases (e.g. Station 00917), this is a constant dummy value (e.g. 993.9 hPa derived from elevation) and NOT measured data.
            # We must ignore it unless the file is explicitly a Pressure file (P0) or Climate file (KL).
            # Note: We check the internal data file name (target_file), not the zip path, because the app uses temp file names.
            # EXCEPTION: For 10-minute resolution (10minutenwerte), data is valid.
            fname_lower <- tolower(target_file)
            is_tf_tu_td <- any(vapply(
                c("_tf_", "_tu_", "_td_"),
                function(p) grepl(p, fname_lower, fixed = TRUE),
                logical(1)
            ))
            is_10min <- grepl("10minutenwerte", fname_lower, fixed = TRUE) || grepl("zehn_min", fname_lower, fixed = TRUE)

            if (is_tf_tu_td && !is_10min) {
                if ("pressure" %in% names(df)) df$pressure <- NULL
                if ("station_pressure" %in% names(df)) df$station_pressure <- NULL
            }

            # Keep only the timestamp columns and the known output columns;
            # renamed columns outside these lists are dropped here.
            df <- df %>% select(any_of(c("datetime", "datetime_end", weather_cols, weather_text_cols)))

            result_df <- as_tibble(df)
            # Attach granular metadata if available
            if (!is.null(granular_meta_df) && nrow(granular_meta_df) > 0) {
                attr(result_df, "granular_meta") <- granular_meta_df
            }
            result_df
        },
        error = function(e) {
            warning("Failed to parse ", basename(zip_path), ": ", e$message)
            return(NULL)
        }
    )
}

#' Fetch and Parse All Data for a Station
#'
#' Downloads every file listed for the station in `index_df`, parses each one
#' with `read_dwd_data()`, full-joins the results on `datetime`, and coalesces
#' duplicated variables into one column per variable.
#'
#' @param station_id GHCN/DWD ID (string 5 digits)
#' @param index_df The global file index (uses its `id` and `url` columns)
#' @param start_date Filter start
#' @param end_date Filter end
#' @param status_cb Optional progress callback, called as
#'   `status_cb(message, detail, value)`; ignored unless it is a function.
#' @return Merged tibble of data for the window, or NULL when the station has
#'   no index entries or no file could be parsed
fetch_and_parse_station_data <- function(station_id, index_df, start_date = NULL, end_date = NULL, status_cb = NULL) {
    # Helper to call callback if it exists
    # status_cb(message, detail = NULL, value = NULL)
    notify <- function(msg, detail = NULL, value = NULL) {
        if (is.function(status_cb)) status_cb(msg, detail, value)
    }

    targets <- index_df %>% filter(id == station_id)

    if (nrow(targets) == 0) {
        notify("No data found in index for this station.", value = 1)
        return(NULL)
    }

    all_data_list <- list()
    n_targets <- nrow(targets)

    for (i in seq_len(n_targets)) {
        target_row <- targets[i, ]
        url <- target_row$url
        tmp_zip <- tempfile(fileext = ".zip")

        # Restart percentage tracking for each file so that its first progress
        # update is never suppressed by the previous file's final value.
        last_pct <- -1

        dl_status <- tryCatch(
            {
                # Use curl with a handle to get progress
                h <- curl::new_handle()
                curl::handle_setopt(h, noprogress = FALSE, progressfunction = function(down, up) {
                    if (down[1] > 0) {
                        pct <- round(down[2] / down[1] * 100)
                        if (pct != last_pct) {
                            msg <- sprintf("Downloading file %d/%d...", i, n_targets)
                            detail <- sprintf("%s (%d%%)", format_bytes(down[1]), pct)
                            # Approximate progress: 0 to 0.8 range for downloading
                            val <- ((i - 1) / n_targets) + (pct / 100 * (0.8 / n_targets))
                            notify(msg, detail = detail, value = val)
                            last_pct <<- pct
                        }
                    }
                    TRUE
                })
                resp <- curl::curl_fetch_disk(url, tmp_zip, handle = h)
                if (resp$status_code >= 400) {
                    warning("Download failed for ", url, ": HTTP ", resp$status_code)
                    FALSE
                } else {
                    TRUE
                }
            },
            error = function(e) {
                warning("Download failed for ", url, ": ", e$message)
                FALSE
            }
        )

        if (isTRUE(dl_status)) {
            notify(sprintf("Parsing file %d/%d...", i, n_targets), value = (i - 0.2) / n_targets)
            parsed <- read_dwd_data(tmp_zip, start_date, end_date)
            if (!is.null(parsed) && nrow(parsed) > 0) {
                all_data_list[[length(all_data_list) + 1]] <- parsed
            }
        }

        # Always remove the download, including after a failed request, which
        # can still leave a partial file or an HTTP error body on disk.
        unlink(tmp_zip)
    }

    if (length(all_data_list) == 0) {
        notify("Failed to parse any data files.", value = 1)
        return(NULL)
    }

    notify("Merging multiple files...", value = 0.9)
    final_df <- purrr::reduce(all_data_list, full_join, by = "datetime")

    # Identify unique weather variables across all joined columns.
    # Must list every column read_dwd_data() can emit, otherwise that variable
    # is dropped during the merge ("temp_5cm" was previously missing).
    weather_vars <- c(
        "temp", "temp_min", "temp_max", "temp_min_avg", "temp_max_avg", "rh", "dew_point",
        "abs_humidity", "vapor_pressure", "wet_bulb_temp",
        "precip", "precip_max_day", "wind_speed", "wind_dir", "pressure", "station_pressure", "cloud_cover", "cloud_cover_indicator",
        "wind_gust_max", "solar_global", "sunshine_duration",
        "temp_5cm",
        "soil_temp_2cm", "soil_temp_5cm", "soil_temp_10cm", "soil_temp_20cm", "soil_temp_50cm", "soil_temp_100cm",
        "soil_temp_min_5cm",
        "snow_depth", "snow_water_equiv", "snow_fresh_sum", "snow_depth_sum",
        "thunderstorm", "glaze", "graupel", "hail", "fog", "frost", "storm_6", "storm_8", "dew",
        "precip_net_thunderstorm", "precip_net_graupel", "precip_net_hail", "precip_net_fog",
        "visibility", "visibility_indicator", "weather_code", "weather_text",
        "cloud_layer1_code", "cloud_layer1_abbrev", "cloud_layer1_height", "cloud_layer1_amount",
        "cloud_layer2_code", "cloud_layer2_abbrev", "cloud_layer2_height", "cloud_layer2_amount",
        "cloud_layer3_code", "cloud_layer3_abbrev", "cloud_layer3_height", "cloud_layer3_amount",
        "cloud_layer4_code", "cloud_layer4_abbrev", "cloud_layer4_height", "cloud_layer4_amount"
    )

    available_cols <- names(final_df)

    # Pre-allocate clean_df with datetime
    clean_df <- final_df %>% select(datetime)
    interval_cols <- available_cols[grepl("^datetime_end(\\.|$)", available_cols)]
    if (length(interval_cols) > 0) {
        clean_df$datetime_end <- do.call(coalesce, final_df[interval_cols])
    }

    for (v in weather_vars) {
        # Find all columns that are exactly 'v' or 'v.x', 'v.y', 'v.x.x' etc.
        v_cols <- available_cols[grepl(paste0("^", v, "(\\.|$)"), available_cols)]

        if (length(v_cols) > 0) {
            # Coalesce all versions of this variable
            # We use do.call(coalesce, ...) to handle any number of duplicates
            clean_df[[v]] <- do.call(coalesce, final_df[v_cols])
        }
    }

    # Final cleanup: arrange and filter by date window
    clean_df <- clean_df %>%
        distinct(datetime, .keep_all = TRUE) %>%
        arrange(datetime)

    clean_df <- filter_dwd_rows_by_date_window(clean_df, start_date, end_date)

    n_rows <- nrow(clean_df)
    notify(sprintf("Success: %d rows processed.", n_rows))

    return(clean_df)
}

# Render a byte count as a short human-readable label.
# Missing input gives "Unknown size"; values below 1024 are shown in B,
# values below 1024^2 in KB, everything else in MB (one decimal for KB/MB).
format_bytes <- function(x) {
    kib <- 1024
    mib <- 1024^2

    if (is.na(x)) {
        "Unknown size"
    } else if (x >= mib) {
        paste(round(x / mib, 1), "MB")
    } else if (x >= kib) {
        paste(round(x / kib, 1), "KB")
    } else {
        paste(x, "B")
    }
}