server <- function(input, output, session) {
# --- State Variables ---
current_station_id <- reactiveVal(NULL) # ghcn_id of the currently selected station
active_export_id <- reactiveVal(NULL) # station id used by the download handlers
# Stages: 0=Idle, 3=Init, 4=Head, 5=Download(Chunked), 50=Download(Blocking), 6=Parse
fetch_stage <- reactiveVal(0)
current_fetch_token <- reactiveVal(NULL) # Track unique fetch session IDs
full_station_data <- reactiveVal(NULL) # parsed hourly data for the selected station
station_info <- reactiveVal(NULL) # list(name, id) used for status display
loading_station <- reactiveVal(FALSE) # TRUE while a station fetch is in flight
fetch_message <- reactiveVal("Fetching high-resolution hourly data...")
fetch_tmp_path <- reactiveVal(NULL) # path of the temp .psv download file
current_station_label <- reactiveVal("")
previous_station_choice <- reactiveVal(NULL)
previous_station_choices_list <- reactiveVal(NULL)
# Download Progress Tracking
fetch_total_size <- reactiveVal(0)
fetch_current_pos <- reactiveVal(0)
# Map helper reactives
map_initialized <- reactiveVal(FALSE)
stations_loaded <- reactiveVal(FALSE)
current_raster_layers <- reactiveVal(character(0))
style_change_trigger <- reactiveVal(0)
stations_before_id <- reactiveVal(NULL)
basemap_debounced <- shiny::debounce(reactive(input$basemap), 200)
# Default map view (global, matching GHCNM)
initial_lat <- 10
initial_lng <- 5
initial_zoom <- 2
# --- Initialization & Filters ---
# Show loading spinner on startup until stations are drawn
session$sendCustomMessage("freezeUI", list(
  text = "Loading stations...",
  station = ""
))
# Update country choices on startup
observe({
  # Populate the country picker once station metadata is available.
  if (is.null(stations)) {
    return()
  }
  choices <- stations$country_name %>%
    na.omit() %>%
    unique() %>%
    sort()
  updateSelectInput(session, "country", choices = choices)
})
# Ensure date range is set to default on startup
observe({
  updateDateRangeInput(
    session,
    "date_range",
    start = default_start_date,
    end = default_end_date
  )
})
# Reactive station filtering
filtered_stations <- reactive({
  # Stations narrowed by the country picker and the sidebar date range,
  # with coordinate-less rows dropped so they can be mapped.
  if (is.null(stations)) {
    return(NULL)
  }
  yr_lo <- lubridate::year(input$date_range[1])
  yr_hi <- lubridate::year(input$date_range[2])
  df <- stations
  if (length(input$country) > 0) {
    df <- df %>% filter(.data$country_name %in% input$country)
  }
  df %>%
    filter(
      .data$start_year <= yr_hi,
      (.data$end_year >= yr_lo | .data$end_year >= max_year_data)
    ) %>%
    filter(!is.na(.data$latitude), !is.na(.data$longitude))
})
# SF version for maplibre
filtered_stations_sf <- reactive({
  # Point-geometry version of the filtered stations, for map layers.
  stations_df <- filtered_stations()
  req(stations_df)
  sf::st_as_sf(
    stations_df,
    coords = c("longitude", "latitude"),
    crs = 4326,
    remove = FALSE
  )
})
# Reactive for country-filtered data (zooming logic)
country_stations <- reactive({
  # Same filtering as filtered_stations() but keeps rows without coordinates;
  # used only to compute the zoom extent for the chosen countries.
  if (is.null(stations)) {
    return(NULL)
  }
  yr_lo <- lubridate::year(input$date_range[1])
  yr_hi <- lubridate::year(input$date_range[2])
  df <- stations %>%
    filter(
      .data$start_year <= yr_hi,
      (.data$end_year >= yr_lo | .data$end_year >= max_year_data)
    )
  if (length(input$country) > 0) {
    df <- df %>% filter(.data$country_name %in% input$country)
  }
  df
})
# Observer to auto-zoom
observeEvent(input$country,
  {
    # Fit the map to the selected countries' stations; otherwise (no
    # selection, or nothing matched) return to the default world view.
    df <- country_stations()
    has_stations <- !is.null(df) && nrow(df) > 0
    has_selection <- length(input$country) > 0
    if (has_stations && has_selection) {
      lat_rng <- range(df$latitude, na.rm = TRUE)
      lng_rng <- range(df$longitude, na.rm = TRUE)
      maplibre_proxy("map") %>%
        fit_bounds(c(lng_rng[1], lat_rng[1], lng_rng[2], lat_rng[2]), animate = TRUE)
    } else {
      maplibre_proxy("map") %>%
        fly_to(center = c(initial_lng, initial_lat), zoom = initial_zoom)
    }
  },
  ignoreNULL = FALSE
)
output$station_count_filtered <- renderText({
  # Visible-station counter shown in the sidebar.
  df <- visible_stations()
  if (is.null(df)) {
    "Loading data..."
  } else {
    paste("Stations showing:", scales::comma(nrow(df)))
  }
})
output$map <- renderMaplibre({
  # Base map widget: positron style, default world view, zoom controls only.
  widget <- maplibre(
    style = ofm_positron_style,
    center = c(initial_lng, initial_lat),
    zoom = initial_zoom
  )
  add_navigation_control(widget,
    show_compass = FALSE,
    visualize_pitch = FALSE,
    position = "top-left"
  )
})
# Handle initial map load - use map_zoom as readiness indicator
# Delay slightly to ensure the MapLibre style is fully loaded before adding layers
observe({
  req(!map_initialized(), input$map_zoom)
  later::later(function() map_initialized(TRUE), delay = 0.5)
})
observe({
  # Disable the main controls while a station fetch is in progress,
  # re-enable them once loading finishes.
  toggle_ids <- c(
    "country",
    "date_range",
    "zoom_home",
    "main_nav",
    "download_hourly",
    "download_daily"
  )
  toggle_fn <- if (loading_station()) shinyjs::disable else shinyjs::enable
  lapply(toggle_ids, toggle_fn)
  invisible(NULL)
})
# --- Station Selector Logic ---
# Update choices based on filtered stations
observe({
  df <- filtered_stations()
  req(df)
  # Create choices: "Station Name (ID)" = "ID"
  ids <- as.character(df$ghcn_id)
  labels <- paste0(as.character(df$name), " (", ids, ")")
  new_choices <- if (length(ids) > 0) setNames(ids, labels) else character(0)
  # Only update if choices have actually changed (compare IDs)
  # This prevents redrawing the input unnecessarily and resetting state
  prev_choices <- previous_station_choices_list()
  # Sort for comparison content-wise
  new_ids_sorted <- sort(unname(new_choices))
  prev_ids_sorted <- if (!is.null(prev_choices)) sort(unname(prev_choices)) else NULL
  if (is.null(prev_ids_sorted) || !identical(new_ids_sorted, prev_ids_sorted)) {
    # Preserve selection if still in filtered list
    current_sel <- input$station_selector
    # Guard length explicitly: a zero-length selection would make
    # `current_sel == ""` evaluate to logical(0) and crash `||` (R >= 4.3).
    no_selection <- is.null(current_sel) ||
      length(current_sel) == 0 ||
      !nzchar(current_sel[1])
    sel_arg <- if (no_selection) character(0) else current_sel
    updateSelectizeInput(session, "station_selector",
      choices = new_choices,
      selected = sel_arg,
      server = TRUE
    )
    previous_station_choices_list(new_choices)
  }
})
# Handle selection from dropdown
observeEvent(input$station_selector, {
  req(input$station_selector)
  selected_id <- input$station_selector
  # Skip selections that were already applied (e.g. synced from a map click)
  # to avoid update loops.
  if (identical(previous_station_choice(), selected_id)) {
    return()
  }
  if (identical(current_station_id(), selected_id)) {
    return()
  }
  # Verify the station exists before acting on it.
  meta <- stations %>% dplyr::filter(.data$ghcn_id == selected_id)
  if (nrow(meta) == 0) {
    return()
  }
  select_station(selected_id)
  maplibre_proxy("map") %>%
    fly_to(center = c(meta$longitude[1], meta$latitude[1]), zoom = 9)
  previous_station_choice(selected_id)
})
# Zoom to extent of all filtered stations
observeEvent(input$zoom_home, {
  df <- filtered_stations()
  req(df)
  n_stations <- nrow(df)
  proxy <- maplibre_proxy("map")
  if (n_stations == 1) {
    # Single station: zoom straight to it.
    proxy %>% fly_to(center = c(df$longitude[1], df$latitude[1]), zoom = 12)
  } else if (n_stations > 1) {
    # Multiple stations: fit the bounding box of all of them.
    lons <- range(df$longitude, na.rm = TRUE)
    lats <- range(df$latitude, na.rm = TRUE)
    proxy %>% fit_bounds(c(lons[1], lats[1], lons[2], lats[2]), animate = TRUE)
  } else {
    # Nothing matched: fall back to the default world view.
    proxy %>% fly_to(center = c(initial_lng, initial_lat), zoom = initial_zoom)
  }
})
# --- Helper: Broadcast current state to parent page ---
# Sends the active station, current view name and date range to the parent
# page via the "updateParentURL" custom message. `view_override`, when given,
# replaces the view derived from the navbar state.
broadcast_state <- function(view_override = NULL) {
  # Resolve metadata for the active station (if any)
  sid <- current_station_id()
  st_meta <- NULL
  if (!is.null(sid)) {
    st_meta <- stations %>%
      dplyr::filter(.data$ghcn_id == sid) %>%
      head(1)
  }
  has_meta <- !is.null(st_meta) && nrow(st_meta) > 0
  station_id <- if (has_meta) as.character(st_meta$ghcn_id) else NULL
  station_name <- if (has_meta) as.character(st_meta$name) else NULL
  country <- if (has_meta) as.character(st_meta$country_name) else NULL
  # Resolve the view name; "map" is the default for any unknown state
  view <- view_override
  if (is.null(view)) {
    view <- "map"
    main_tab <- input$main_nav
    if (!is.null(main_tab)) {
      if (main_tab == "Stations Info") {
        view <- "station-info"
      } else if (main_tab == "Dashboard") {
        subtab <- input$dashboard_subtabs
        view <- if (!is.null(subtab) && subtab == "Data") {
          "dashboard-data"
        } else {
          "dashboard-plots"
        }
      }
    }
  }
  dates <- input$date_range
  start_date <- if (!is.null(dates)) as.character(dates[1]) else NULL
  end_date <- if (!is.null(dates)) as.character(dates[2]) else NULL
  session$sendCustomMessage("updateParentURL", list(
    station = station_id,
    stationName = station_name,
    country = country,
    view = view,
    start = start_date,
    end = end_date
  ))
}
# Fix Map Rendering on Tab Switch & Broadcast State
# The map canvas can render at zero size while its tab is hidden; on switching
# back we ask MapLibre to resize after the tab becomes visible.
observeEvent(input$main_nav, {
  if (input$main_nav == "Map View") {
    # Slight delay to ensure tab is visible
    shinyjs::runjs("
      setTimeout(function() {
        var map = HTMLWidgets.find('#map');
        if (map) {
          // mapgl uses resize() which is roughly equivalent to invalidateSize()
          map.getMap().resize();
        }
      }, 200);
    ")
  }
  # Broadcast state on tab change
  broadcast_state()
})
# Broadcast state on dashboard subtab change
observeEvent(input$dashboard_subtabs, broadcast_state(), ignoreInit = TRUE)
# --- Basemap Switching Logic ---
# All label layer ids across the supported basemap styles. These are toggled
# as a group by apply_label_visibility(); ids not present in the active style
# are silently ignored there.
label_layer_ids <- c(
  # OpenFreeMap Positron & Bright common labels
  "waterway_line_label", "water_name_point_label", "water_name_line_label",
  "highway-name-path", "highway-name-minor", "highway-name-major",
  "highway-shield-non-us", "highway-shield-us-interstate", "road_shield_us",
  "airport", "label_other", "label_village", "label_town", "label_state",
  "label_city", "label_city_capital", "label_country_3", "label_country_2", "label_country_1",
  # Bright specific labels (POIs & Directions)
  "road_oneway", "road_oneway_opposite", "poi_r20", "poi_r7", "poi_r1", "poi_transit",
  # Dash variants
  "waterway-line-label", "water-name-point-label", "water-name-line-label",
  "highway-shield-non-us", "highway-shield-us-interstate", "road-shield-us",
  "label-other", "label-village", "label-town", "label-state",
  "label-city", "label-city-capital", "label-country-3", "label-country-2", "label-country-1",
  # Legacy/Carto/OSM
  "place_villages", "place_town", "place_country_2", "place_country_1",
  "place_state", "place_continent", "place_city_r6", "place_city_r5",
  "place_city_dot_r7", "place_city_dot_r4", "place_city_dot_r2", "place_city_dot_z7",
  "place_capital_dot_z7", "place_capital", "roadname_minor", "roadname_sec",
  "roadname_pri", "roadname_major", "motorway_name", "watername_ocean",
  "watername_sea", "watername_lake", "watername_lake_line", "poi_stadium",
  "poi_park", "poi_zoo", "airport_label", "country-label", "state-label",
  "settlement-major-label", "settlement-minor-label", "settlement-subdivision-label",
  "road-label", "waterway-label", "natural-point-label", "poi-label", "airport-label"
)
# Vector geometry (non-label) layer ids. These are hidden when the sentinel
# imagery basemap is active so only the raster plus labels remain visible.
non_label_layer_ids <- c(
  "background", "park", "water", "landcover_ice_shelf", "landcover_glacier",
  "landuse_residential", "landcover_wood", "waterway", "building",
  "tunnel_motorway_casing", "tunnel_motorway_inner", "aeroway-taxiway",
  "aeroway-runway-casing", "aeroway-area", "aeroway-runway",
  "road_area_pier", "road_pier", "highway_path", "highway_minor",
  "highway_major_casing", "highway_major_inner", "highway_major_subtle",
  "highway_motorway_casing", "highway_motorway_inner", "highway_motorway_subtle",
  "railway_transit", "railway_transit_dashline", "railway_service",
  "railway_service_dashline", "railway", "railway_dashline",
  "highway_motorway_bridge_casing", "highway_motorway_bridge_inner",
  "boundary_3", "boundary_2", "boundary_disputed"
)
# Show or hide every known label layer on the given map proxy. Layer ids
# missing from the active style are ignored: errors from
# set_layout_property are swallowed on purpose.
apply_label_visibility <- function(proxy, show_labels) {
  visibility <- if (isTRUE(show_labels)) "visible" else "none"
  lapply(label_layer_ids, function(layer_id) {
    try(proxy %>% set_layout_property(layer_id, "visibility", visibility),
      silent = TRUE
    )
  })
  invisible(NULL)
}
# Swap the basemap style. set_style() wipes custom layers, so after a short
# delay (letting the new style load) we re-apply label visibility and bump
# style_change_trigger so the stations layer gets re-added by its observer.
observeEvent(basemap_debounced(), {
  basemap <- basemap_debounced()
  proxy <- maplibre_proxy("map")
  if (basemap %in% c("ofm_positron", "ofm_bright")) {
    style_url <- if (basemap == "ofm_positron") ofm_positron_style else ofm_bright_style
    proxy %>% set_style(style_url, preserve_layers = FALSE)
    stations_before_id("waterway_line_label")
    # Capture the session for use inside the later() callback
    current_session <- shiny::getDefaultReactiveDomain()
    selected_basemap <- basemap
    later::later(function() {
      shiny::withReactiveDomain(current_session, {
        # Abort if the user switched basemaps again while we waited
        current_basemap <- isolate(input$basemap)
        if (current_basemap != selected_basemap) {
          return()
        }
        apply_label_visibility(maplibre_proxy("map"), isolate(input$show_labels))
        style_change_trigger(isolate(style_change_trigger()) + 1)
      })
    }, delay = 0.35)
  } else if (basemap == "sentinel") {
    # Sentinel imagery: load positron as the base, then overlay a raster
    # source and hide the non-label vector layers.
    proxy %>% set_style(ofm_positron_style, preserve_layers = FALSE)
    current_session <- shiny::getDefaultReactiveDomain()
    selected_basemap <- basemap
    later::later(function() {
      shiny::withReactiveDomain(current_session, {
        current_basemap <- isolate(input$basemap)
        if (current_basemap != selected_basemap) {
          return()
        }
        # Unique ids per switch avoid clashes with layers from earlier switches
        unique_suffix <- as.numeric(Sys.time()) * 1000
        source_id <- paste0("sentinel_source_", unique_suffix)
        layer_id <- paste0("sentinel_layer_", unique_suffix)
        maplibre_proxy("map") %>%
          add_raster_source(id = source_id, tiles = c(sentinel_url), tileSize = 256, attribution = sentinel_attribution) %>%
          add_layer(id = layer_id, type = "raster", source = source_id, paint = list("raster-opacity" = 1), before_id = "background")
        # Hide vector geometry so only the imagery (plus labels) shows
        for (layer_id_kill in non_label_layer_ids) {
          tryCatch(
            {
              maplibre_proxy("map") %>% set_layout_property(layer_id_kill, "visibility", "none")
            },
            error = function(e) {}
          )
        }
        apply_label_visibility(maplibre_proxy("map"), isolate(input$show_labels))
        stations_before_id("waterway_line_label")
        style_change_trigger(isolate(style_change_trigger()) + 1)
      })
    }, delay = 0.5)
  }
})
# Toggle map labels whenever the checkbox changes (skipping the initial value).
observeEvent(input$show_labels,
  {
    proxy <- maplibre_proxy("map")
    apply_label_visibility(proxy, input$show_labels)
  },
  ignoreInit = TRUE
)
# Update markers
# Redraws the stations circle layer whenever the filtered set or the basemap
# style changes, re-applies the selection highlight, and dismisses the
# startup spinner after the first successful render.
observe({
  df <- filtered_stations_sf()
  req(df, map_initialized())
  # Take a dependency on style changes so markers are re-added after set_style()
  style_change_trigger()
  # popup_content is pre-computed in global.R (vectorized, no per-render overhead)
  proxy <- maplibre_proxy("map")
  before <- stations_before_id()
  proxy %>%
    clear_layer("stations_layer") %>%
    add_circle_layer(
      id = "stations_layer",
      source = df,
      circle_color = "navy",
      circle_radius = 6,
      circle_opacity = 0.7,
      circle_stroke_width = 0,
      before_id = before,
      tooltip = "popup_content"
    )
  # Re-apply highlight if a station is selected (isolate: no reactive dependency)
  sel_id <- isolate(current_station_id())
  if (!is.null(sel_id)) {
    sel_meta <- stations %>% dplyr::filter(.data$ghcn_id == sel_id)
    if (nrow(sel_meta) > 0) {
      highlight_selected_station(proxy, sel_meta[1, ])
    }
  }
  # Dismiss the startup loading spinner after first render
  if (!isolate(stations_loaded())) {
    stations_loaded(TRUE)
    session$sendCustomMessage("unfreezeUI", list())
  }
})
# --- Date Range Enforcement & Freeze Feedback ---
# Track previous date values to detect which date changed
# (written back by the input$date_range observer below)
prev_start_date <- reactiveVal(default_start_date)
prev_end_date <- reactiveVal(default_end_date)
# Enforce the 366-day range limit bidirectionally: whichever endpoint the
# user moved stays fixed and the opposite endpoint is pulled in. Shows a
# freeze overlay while downstream reactives/plots update, then unfreezes
# after a short delay.
observeEvent(input$date_range,
  {
    req(input$date_range)
    start <- input$date_range[1]
    end <- input$date_range[2]
    # Check validity (non-null)
    if (is.na(start) || is.na(end)) {
      return()
    }
    diff_days <- as.numeric(difftime(end, start, units = "days"))
    if (diff_days > 366) {
      # Determine which date was changed by comparing with previous values
      start_changed <- !identical(start, prev_start_date())
      end_changed <- !identical(end, prev_end_date())
      if (end_changed && !start_changed) {
        # End date was changed - adjust start date
        new_start <- end - 366
        updateDateRangeInput(session, "date_range", start = new_start, end = end)
        prev_start_date(new_start)
        prev_end_date(end)
        showNotification("Date range cannot exceed 366 days. Adjusting start date.", type = "warning", duration = 4)
      } else {
        # Start date changed, or both/unclear - adjust end date
        new_end <- start + 366
        updateDateRangeInput(session, "date_range", start = start, end = new_end)
        prev_start_date(start)
        prev_end_date(new_end)
        showNotification("Date range cannot exceed 366 days. Adjusting end date.", type = "warning", duration = 4)
      }
      # Show feedback (Blocking)
      session$sendCustomMessage("freezeUI", list(
        text = "Date range limited to 366 days. Updating...",
        station = current_station_label()
      ))
    } else {
      # Valid change - update previous values
      prev_start_date(start)
      prev_end_date(end)
      # Show loading feedback if we have a station context; this gives
      # visual feedback while the reactive graph updates
      if (!is.null(current_station_id())) {
        session$sendCustomMessage("freezeUI", list(
          text = "Updating data selection...",
          station = current_station_label()
        ))
      }
    }
    # Schedule Unfreeze: a small delay lets station_data and the downstream
    # plots update; the overlay hides the render 'flash'
    later::later(function() {
      session$sendCustomMessage("unfreezeUI", list())
    }, 1.0) # 1 second delay
  },
  ignoreInit = TRUE
)
# --- Data Logic ---
# (full_station_data is already declared with the other state variables at
# the top of the server; the redundant re-declaration was removed.)
# Station data restricted to the sidebar date range.
station_data <- reactive({
  df <- full_station_data()
  req(df, input$date_range)
  # Validate the 366-day limit here too: if the range is momentarily too
  # large, stop and wait for the enforcement observer to correct it, which
  # prevents double-rendering.
  diff_days <- as.numeric(difftime(input$date_range[2], input$date_range[1], units = "days"))
  req(diff_days <= 366)
  # Use sidebar date range for filtering; include the whole final day
  start_dt <- lubridate::as_datetime(input$date_range[1])
  end_dt <- lubridate::as_datetime(input$date_range[2]) + lubridate::hours(23) + lubridate::minutes(59)
  if (length(start_dt) == 0 || length(end_dt) == 0) {
    return(df)
  }
  df %>%
    dplyr::filter(.data$datetime >= start_dt, .data$datetime <= end_dt) %>%
    dplyr::arrange(.data$datetime)
})
# --- Fetching Logic (The State Machine) ---
# Abort any in-flight fetch: rotate the token so pending later() callbacks
# become no-ops, reset progress state, delete the partial temp file and
# unfreeze the UI. `msg`, when given, is logged and shown as station_info.
reset_fetch <- function(msg = NULL) {
  has_msg <- !is.null(msg)
  if (has_msg) message("reset_fetch called with message: ", msg)
  # Invalidate token to kill pending async tasks
  current_fetch_token(as.numeric(Sys.time()))
  loading_station(FALSE)
  fetch_stage(0)
  fetch_total_size(0)
  fetch_current_pos(0)
  if (has_msg) station_info(list(name = msg, id = current_station_id()))
  tmp <- fetch_tmp_path()
  if (!is.null(tmp) && file.exists(tmp)) unlink(tmp)
  fetch_tmp_path(NULL)
  # Unfreeze UI when fetch is reset/cancelled
  session$sendCustomMessage("unfreezeUI", list())
}
# Handle Cancel from Freeze Window
observeEvent(input$cancel_loading, {
  reset_fetch(msg = "Cancelled by user")
  showNotification("Loading cancelled by user.", type = "warning")
})
# Helper for station selection (shared by map and table)
# Resets data state, freezes the UI, records a fresh fetch token and starts
# the download state machine at Stage 3 for the given ghcn_id.
select_station <- function(id) {
  req(id)
  # Pre-fetch meta to get name/country for the loading screen
  meta <- stations %>% dplyr::filter(.data$ghcn_id == id)
  if (nrow(meta) > 0) {
    s_name <- meta$name
    s_country <- ifelse(is.na(meta$country_name), "Unknown", meta$country_name)
    current_station_label(paste0(s_name, ", ", s_country))
  } else {
    current_station_label(paste("Station", id))
  }
  full_station_data(NULL)
  loading_station(TRUE)
  fetch_message("Fetching high-resolution hourly data...")
  # Freeze UI during download and parsing
  session$sendCustomMessage("freezeUI", list(
    text = "Downloading station data...",
    station = current_station_label()
  ))
  fetch_total_size(0)
  fetch_current_pos(0)
  # Session identification: a new token invalidates any pending async steps
  # from a previous fetch
  new_token <- as.numeric(Sys.time())
  current_fetch_token(new_token)
  current_station_id(id)
  message("Station selected: station=", id, " token=", new_token)
  active_export_id(id)
  # Unknown station id: undo the loading state and bail out
  if (nrow(meta) == 0) {
    loading_station(FALSE)
    session$sendCustomMessage("unfreezeUI", list())
    return()
  }
  station_info(list(name = meta$name, id = meta$ghcn_id))
  # Start fetch
  fetch_stage(3)
  # Highlight selected marker using helper function
  highlight_selected_station(maplibre_proxy("map"), meta)
  # Broadcast state change to parent page
  broadcast_state()
}
# Stage 1: Initial Click on Map
observeEvent(input$map_feature_click, {
  clicked <- input$map_feature_click
  if (is.null(clicked)) {
    return()
  }
  # The event may report the layer under either field name.
  layer_id <- clicked$layer_id %||% clicked$layer
  # Clicking the highlight ring jumps straight to the dashboard when data
  # for the selected station is already loaded.
  if (isTRUE(layer_id == "selected_highlight")) {
    if (!is.null(full_station_data())) {
      updateNavbarPage(session, "main_nav", selected = "Dashboard")
    }
    return()
  }
  # Ignore clicks on anything other than the stations layer.
  if (!isTRUE(layer_id == "stations_layer")) {
    return()
  }
  id <- clicked$properties$ghcn_id
  if (is.null(id)) {
    return()
  }
  select_station(id)
  # Zoom and center on the clicked station.
  meta <- stations %>%
    dplyr::filter(.data$ghcn_id == id) %>%
    head(1)
  if (nrow(meta) > 0) {
    maplibre_proxy("map") %>%
      fly_to(center = c(meta$longitude, meta$latitude), zoom = 9)
  }
  # Keep the dropdown in sync with the map selection.
  updateSelectizeInput(session, "station_selector", selected = id)
  previous_station_choice(id)
})
# Stage 1b: Selection from Table - Double Click
# Renders the stations table; a JS callback turns a row double-click into the
# 'table_station_dblclick' input (carrying the row's first cell, the ghcn_id).
output$table <- DT::renderDataTable({
  df <- visible_stations()
  if (is.null(df)) {
    return(NULL)
  }
  df %>%
    dplyr::select(.data$ghcn_id, .data$name, .data$country_name, .data$start_year, .data$end_year, .data$elevation, .data$state) %>%
    DT::datatable(
      options = list(pageLength = 20),
      rownames = FALSE,
      selection = "none",
      colnames = c("ID", "Station Name", "Country", "Start Year", "End Year", "Elevation (m)", "State/Prov"),
      callback = JS("
        table.on('dblclick', 'tr', function() {
          var rowData = table.row(this).data();
          if (rowData !== undefined && rowData !== null) {
            var stationId = rowData[0];
            Shiny.setInputValue('table_station_dblclick', stationId, {priority: 'event'});
          }
        });
      ")
    )
})
# Handle a double-clicked table row: select the station, center the map on
# it, and sync the dropdown.
observeEvent(input$table_station_dblclick, {
  clicked_id <- input$table_station_dblclick
  req(clicked_id)
  # Verify the station exists before acting on it.
  meta <- stations %>% dplyr::filter(.data$ghcn_id == clicked_id)
  if (nrow(meta) == 0) {
    return()
  }
  select_station(clicked_id)
  # Highlight on map and go to Map View
  maplibre_proxy("map") %>%
    fly_to(center = c(meta$longitude[1], meta$latitude[1]), zoom = 9)
  # Sync dropdown
  updateSelectizeInput(session, "station_selector", selected = clicked_id)
  previous_station_choice(clicked_id)
})
# Render Panel Header
# "Name, Country (ID)" header for the selected station.
output$panel_station_name <- renderText({
  id <- current_station_id()
  if (is.null(id)) {
    return("")
  }
  # head(1) keeps a one-row data frame. dplyr::first() here was wrong: on a
  # data frame it returns the first column (or errors, depending on the
  # dplyr version), breaking the meta$name access below. head(1) matches
  # how station metadata is looked up elsewhere in this file.
  meta <- stations %>%
    filter(.data$ghcn_id == id) %>%
    head(1)
  if (nrow(meta) == 0) {
    return("")
  }
  country <- ifelse(is.na(meta$country_name) || meta$country_name == "", "Unknown", meta$country_name)
  paste0(meta$name, ", ", country, " (", id, ")")
})
# Period and elevation line under the panel header.
output$panel_station_meta <- renderText({
  id <- current_station_id()
  if (is.null(id)) {
    return("")
  }
  # head(1) keeps a one-row data frame; dplyr::first() on a data frame
  # returned the first column (or errored, depending on dplyr version)
  # instead of the first row.
  meta <- stations %>%
    filter(.data$ghcn_id == id) %>%
    head(1)
  if (nrow(meta) == 0) {
    return("")
  }
  period <- ifelse(is.na(meta$start_year), "Unknown", paste0(meta$start_year, " - ", meta$end_year))
  paste0("Period: ", period, " | Elev: ", meta$elevation, "m")
})
# Stage 3: Init
# Clears stale station info, then schedules the HEAD check (Stage 4) on the
# event loop. The token comparison makes the transition a no-op if the fetch
# was cancelled or superseded in the meantime.
observe({
  req(fetch_stage() == 3)
  station_id <- current_station_id()
  if (is.null(station_id)) {
    reset_fetch()
    return()
  }
  station_info(NULL)
  fetch_message("Checking availability...")
  message("Stage 3: scheduling Stage 4 for ", station_id, " (token ", current_fetch_token(), ")")
  this_token <- current_fetch_token()
  later::later(function() {
    if (identical(isolate(current_fetch_token()), this_token)) {
      message("Transitioning to Stage 4 for ", station_id)
      fetch_stage(4)
    } else {
      message("Stage 4 transition aborted: token mismatch or session ended")
    }
  }, 0.1)
  # Park the stage so this observer does not refire until re-scheduled
  fetch_stage(-1)
})
# Stage 4: Head Check & Setup (Robust Version)
# Determines the remote file size (HEAD request, falling back to a 1-byte
# Range request), prepares the temp file, then routes to Stage 5 (chunked
# download) or Stage 50 (blocking fallback when curl is unavailable).
observe({
  req(fetch_stage() == 4)
  station_id <- current_station_id()
  if (is.null(station_id)) {
    reset_fetch()
    return()
  }
  url <- paste0("https://www.ncei.noaa.gov/oa/global-historical-climatology-network/hourly/access/by-station/GHCNh_", station_id, "_por.psv")
  tryCatch(
    {
      if (requireNamespace("curl", quietly = TRUE)) {
        # --- Attempt 1: Standard HEAD request (with redirect following) ---
        h <- curl::new_handle(nobody = TRUE)
        curl::handle_setopt(h, followlocation = TRUE, timeout = 10, url = url)
        head_result <- try(curl::curl_fetch_memory(url, handle = h), silent = TRUE)
        message("Stage 4 HEAD attempt 1 status: ", if (inherits(head_result, "try-error")) "Error" else head_result$status_code)
        # Helper to validate size: returns byte count, NA (unknown) or
        # -404 (file missing on the server)
        get_valid_size <- function(res) {
          if (inherits(res, "try-error")) {
            return(NA)
          }
          if (res$status_code == 404) {
            return(-404)
          } # Mark as Not Found
          if (length(res$content_length) > 0 && !is.na(res$content_length) && res$content_length > 0) {
            return(res$content_length)
          }
          return(NA)
        }
        size_bytes <- get_valid_size(head_result)
        message("Stage 4 size_bytes calculated: ", size_bytes)
        # --- Attempt 2: Range Request Fallback ---
        # If HEAD failed to give size, ask for the first byte (0-0).
        if (is.na(size_bytes)) {
          h_range <- curl::new_handle(followlocation = TRUE)
          curl::handle_setopt(h_range, range = "0-0", timeout = 10, url = url)
          range_result <- try(curl::curl_fetch_memory(url, handle = h_range), silent = TRUE)
          if (!inherits(range_result, "try-error") && range_result$status_code == 206) {
            # Parse headers to find Content-Range
            headers <- curl::parse_headers(range_result$headers)
            cr_line <- grep("Content-Range", headers, ignore.case = TRUE, value = TRUE)
            if (length(cr_line) > 0) {
              # Extract the total size (the number after the slash)
              parts <- strsplit(cr_line, "/")[[1]]
              if (length(parts) > 1) {
                val <- as.numeric(trimws(parts[2]))
                if (!is.na(val)) size_bytes <- val
              }
            }
          }
        }
        # --- Decision ---
        if (!is.na(size_bytes) && size_bytes == -404) {
          reset_fetch(paste("Data not available (404) for", station_id))
          return()
        }
        if (!is.na(size_bytes) && size_bytes > 0) {
          # Success! We have a size.
          fetch_total_size(size_bytes)
          fetch_current_pos(0)
          # Prepare temp file
          tmp_file <- tempfile(fileext = ".psv")
          fetch_tmp_path(tmp_file)
          file.create(tmp_file)
          msg <- paste0("Found file: ", round(size_bytes / (1024 * 1024), 2), " MB
Starting download...")
          fetch_message(msg)
          session$sendCustomMessage("freezeUI", list(
            text = msg,
            station = current_station_label()
          ))
          this_token <- current_fetch_token()
          later::later(function() {
            if (identical(isolate(current_fetch_token()), this_token)) fetch_stage(5)
          }, 0.1) # Go to Chunked Download
        } else {
          # Fallback: Still couldn't get size, use chunked download with unknown size
          fetch_total_size(0)
          fetch_current_pos(0)
          # Prepare temp file
          tmp_file <- tempfile(fileext = ".psv")
          fetch_tmp_path(tmp_file)
          file.create(tmp_file)
          fetch_message("Downloading source file (size unknown)...")
          session$sendCustomMessage("freezeUI", list(
            text = "Downloading source file (size unknown)...",
            station = current_station_label()
          ))
          this_token <- current_fetch_token()
          later::later(function() {
            if (identical(isolate(current_fetch_token()), this_token)) fetch_stage(5)
          }, 0.1)
        }
      } else {
        # No curl package, fallback to the blocking download (Stage 50)
        this_token <- current_fetch_token()
        later::later(function() {
          if (identical(isolate(current_fetch_token()), this_token)) fetch_stage(50)
        }, 0.1)
      }
    },
    error = function(e) {
      reset_fetch(e$message)
    }
  )
  # Park the stage until a scheduled transition fires
  fetch_stage(-1)
})
# Stage 5: Chunked Download Loop (High Speed Optimization)
# Downloads one HTTP Range chunk per pass, appends it to the temp file and
# updates the progress overlay, then reschedules itself (stage 5 again)
# until the file is complete, at which point it hands off to Stage 6 (parse).
observe({
  req(fetch_stage() == 5)
  station_id <- current_station_id()
  tmp_file <- fetch_tmp_path()
  if (is.null(station_id) || is.null(tmp_file)) {
    reset_fetch()
    return()
  }
  url <- paste0("https://www.ncei.noaa.gov/oa/global-historical-climatology-network/hourly/access/by-station/GHCNh_", station_id, "_por.psv")
  # --- NEW: High-Speed Chunk Logic ---
  total_bytes <- fetch_total_size()
  # Constants: chunk size bounds in bytes (5 MB .. 50 MB)
  min_chunk <- 5 * 1024 * 1024
  max_chunk <- 50 * 1024 * 1024
  if (total_bytes > 0) {
    # Target only 5 updates to minimize HTTP overhead
    target_chunks <- 5
    calculated_chunk <- ceiling(total_bytes / target_chunks)
    chunk_size <- max(min_chunk, min(calculated_chunk, max_chunk))
  } else {
    # Default for unknown size
    chunk_size <- 5 * 1024 * 1024
  }
  # -------------------------------------
  start_byte <- fetch_current_pos()
  # Calculate end byte for this chunk
  if (total_bytes > 0) {
    end_byte <- min(start_byte + chunk_size - 1, total_bytes - 1)
  } else {
    end_byte <- start_byte + chunk_size - 1
  }
  tryCatch(
    {
      h <- curl::new_handle()
      # Set Range Header
      curl::handle_setopt(h, range = paste0(start_byte, "-", end_byte))
      # Perform request (blocking, but only for this one chunk)
      resp <- curl::curl_fetch_memory(url, handle = h)
      # Check if still active (user may have closed modal during chunk)
      if (is.null(isolate(current_station_id()))) {
        return()
      }
      if (resp$status_code == 206) {
        # Partial Content (Success): append chunk to the temp file
        con <- file(tmp_file, open = "ab")
        writeBin(resp$content, con)
        close(con)
        # Check actual received bytes
        bytes_received <- length(resp$content)
        new_pos <- start_byte + bytes_received
        fetch_current_pos(new_pos)
        # Update UI Message
        current_mb <- round(new_pos / (1024 * 1024), 1)
        if (total_bytes > 0) {
          total_mb <- round(total_bytes / (1024 * 1024), 1)
          pct <- round((new_pos / total_bytes) * 100)
          msg <- paste0("Downloading: ", current_mb, " MB / ", total_mb, " MB (", pct, "%)")
        } else {
          msg <- paste0("Downloading: ", current_mb, " MB (Unknown Total)")
        }
        fetch_message(msg)
        session$sendCustomMessage("freezeUI", list(
          text = msg,
          station = current_station_label()
        ))
        # Check if done
        done <- FALSE
        if (total_bytes > 0) {
          if (new_pos >= total_bytes) done <- TRUE
        } else {
          # If we received fewer bytes than requested, we hit EOF
          if (bytes_received < chunk_size) done <- TRUE
        }
        if (done) {
          final_msg <- paste0("Download Complete (", current_mb, " MB). Parsing...")
          fetch_message(final_msg)
          session$sendCustomMessage("freezeUI", list(
            text = final_msg,
            station = current_station_label()
          ))
          # Store final size if it was unknown
          if (total_bytes == 0) fetch_total_size(new_pos)
          this_token <- current_fetch_token()
          later::later(function() {
            if (identical(isolate(current_fetch_token()), this_token)) fetch_stage(6)
          }, 0.1)
        } else {
          # Schedule next chunk
          this_token <- current_fetch_token()
          later::later(function() {
            if (identical(isolate(current_fetch_token()), this_token)) fetch_stage(5)
          }, 0.01)
        }
      } else if (resp$status_code == 200) {
        # Server ignored range and sent whole file: write it in one pass
        con <- file(tmp_file, open = "wb")
        writeBin(resp$content, con)
        close(con)
        # Update size
        new_pos <- length(resp$content)
        fetch_total_size(new_pos)
        fetch_message("Download Complete. Parsing...")
        session$sendCustomMessage("freezeUI", list(
          text = "Download Complete. Parsing...",
          station = current_station_label()
        ))
        this_token <- current_fetch_token()
        later::later(function() {
          if (identical(isolate(current_fetch_token()), this_token)) fetch_stage(6)
        }, 0.1)
      } else if (resp$status_code == 416) {
        # Range Not Satisfiable
        if (total_bytes == 0 && start_byte > 0) {
          # We likely finished reading an unknown-length file exactly at the boundary previously
          # Move to parse
          fetch_message("Download Complete. Parsing...")
          session$sendCustomMessage("freezeUI", list(
            text = "Download Complete. Parsing...",
            station = current_station_label()
          ))
          this_token <- current_fetch_token()
          later::later(function() {
            if (identical(isolate(current_fetch_token()), this_token)) fetch_stage(6)
          }, 0.1)
        } else {
          reset_fetch(paste("Download error: Status", resp$status_code))
        }
      } else {
        reset_fetch(paste("Download error: Status", resp$status_code))
      }
    },
    error = function(e) {
      reset_fetch(paste("Download error:", e$message))
    }
  )
  # Park the stage until a scheduled transition fires
  fetch_stage(-1)
})
# Stage 50: Fallback Blocking Download (Legacy)
# Used when the curl package is unavailable: downloads the whole file in one
# blocking download.file() call, then hands off to Stage 6 (parse).
observe({
  req(fetch_stage() == 50)
  station_id <- current_station_id()
  if (is.null(station_id)) {
    reset_fetch()
    return()
  }
  url <- paste0("https://www.ncei.noaa.gov/oa/global-historical-climatology-network/hourly/access/by-station/GHCNh_", station_id, "_por.psv")
  tmp_file <- tempfile(fileext = ".psv")
  fetch_tmp_path(tmp_file)
  tryCatch(
    {
      utils::download.file(url, tmp_file, quiet = TRUE, mode = "wb")
      # Check if still active
      if (is.null(isolate(current_station_id()))) {
        return()
      }
      if (!file.exists(tmp_file) || file.info(tmp_file)$size == 0) {
        reset_fetch("Download failed (empty file)")
        return()
      }
      sz_mb <- round(file.info(tmp_file)$size / (1024 * 1024), 2)
      fetch_total_size(file.info(tmp_file)$size) # Store for final summary
      msg <- paste0("Parsing records (", sz_mb, " MB)...")
      fetch_message(msg)
      session$sendCustomMessage("freezeUI", list(
        text = msg,
        station = current_station_label()
      ))
      this_token <- current_fetch_token()
      later::later(function() {
        if (identical(isolate(current_fetch_token()), this_token)) fetch_stage(6)
      }, 0.1)
    },
    error = function(e) {
      reset_fetch(e$message)
    }
  )
  # Park the stage until a scheduled transition fires
  fetch_stage(-1)
})
# Stage 6: Parse
# Parses the downloaded PSV temp file into a data frame, stores it in
# full_station_data(), shows a completion summary, navigates to the Dashboard
# tab, and unfreezes the UI after a short grace period for plot rendering.
observe({
req(fetch_stage() == 6)
station_id <- current_station_id()
tmp_file <- fetch_tmp_path()
if (is.null(station_id) || is.null(tmp_file) || !file.exists(tmp_file)) {
reset_fetch("Missing download file")
return()
}
# Update overlay message for parsing stage
sz_mb <- round(fetch_total_size() / (1024 * 1024), 2)
msg <- paste0("Parsing data (Source: ", sz_mb, " MB)...")
fetch_message(msg)
session$sendCustomMessage("freezeUI", list(
text = msg,
station = current_station_label()
))
tryCatch(
{
# Use the refactored parsing function
df <- parse_ghcnh_data(tmp_file)
if (!is.null(df) && nrow(df) > 0) {
full_station_data(df)
# Final summary
mem_size <- format(object.size(df), units = "Mb")
dl_size_mb <- round(fetch_total_size() / (1024 * 1024), 2)
# NOTE(review): the literal embedded newlines / empty "" entries below look
# like remnants of stripped markup (e.g. <br>) — confirm intended output.
final_msg <- paste0(
"Processing Complete!
",
"",
"Source File: ", dl_size_mb, " MB
",
"In-Memory Data: ", mem_size, "",
""
)
fetch_message(final_msg)
loading_station(FALSE)
fetch_stage(0)
# The file is now fully in memory; delete it and clear the path reactive.
unlink(tmp_file)
fetch_tmp_path(NULL)
# Transition to rendering stage and navigate to Dashboard
this_token <- current_fetch_token()
session$sendCustomMessage("freezeUI", list(
text = "Rendering plots...",
station = current_station_label()
))
# Navigate to Dashboard tab
updateNavbarPage(session, "main_nav", selected = "Dashboard")
# Only unfreeze if this fetch session is still the active one (token match).
later::later(function() {
if (identical(isolate(current_fetch_token()), this_token)) {
session$sendCustomMessage("unfreezeUI", list())
}
}, 2.5) # Give 2.5 seconds for plots to render
} else {
reset_fetch("No valid records found")
}
},
error = function(e) {
reset_fetch(paste("Parse error:", e$message))
}
)
# Park the state machine (-1) so this observer does not immediately re-fire.
fetch_stage(-1)
})
# Expose readiness/loading flags to the client (for conditional panels).
# suspendWhenHidden = FALSE keeps them evaluated even while not displayed.
output$station_ready <- reactive(!is.null(full_station_data()))
outputOptions(output, "station_ready", suspendWhenHidden = FALSE)
output$is_loading <- reactive(loading_station())
outputOptions(output, "is_loading", suspendWhenHidden = FALSE)
# --- Plotting & Tables ---
# Each dashboard panel delegates to its create_*_plot() helper, fed with the
# currently selected station data.
output$temp_plot <- renderPlotly(create_temperature_plot(station_data()))
output$humidity_plot <- renderPlotly(create_humidity_plot(station_data()))
output$wind_overview_plot <- renderPlotly(create_wind_overview_plot(station_data()))
output$pressure_plot <- renderPlotly(create_pressure_plot(station_data()))
output$visibility_plot <- renderPlotly(create_visibility_plot(station_data()))
output$precip_plot <- renderPlotly(create_precipitation_plot(station_data()))
output$wind_rose <- renderPlotly(create_wind_rose_plot(station_data()))
# Diurnal cycle plot. Local solar time is approximated from the station's
# longitude (15 degrees of longitude ~ 1 hour offset).
output$diurnal_plot <- renderPlotly({
  df <- station_data()
  id <- current_station_id()
  offset <- 0
  if (!is.null(id)) {
    # slice(1) + nrow() guard is robust when the id has no match in the
    # metadata (dplyr::first() row-extraction on a zero-row frame is
    # version-dependent behavior).
    meta <- stations %>%
      dplyr::filter(.data$ghcn_id == id) %>%
      dplyr::slice(1)
    if (nrow(meta) == 1 && !is.na(meta$longitude)) {
      # Approximate timezone offset: 15 degrees longitude = 1 hour
      offset <- round(meta$longitude / 15)
    }
  }
  create_diurnal_plot(df, offset_hours = offset)
})
# Weathergami plot for the selected station's data.
output$weathergami_plot <- renderPlotly({
  create_weathergami_plot(station_data())
})
# Station Info Header - Info Callout Card (like DWD)
# Summary card for the selected station: name/id, country, coordinates,
# elevation, and the date range of the currently loaded data.
output$station_info_header <- renderUI({
  id <- current_station_id()
  if (is.null(id)) {
    return(NULL)
  }
  # Metadata: take the first matching row; slice(1) + nrow() guard is robust
  # when the id has no match (dplyr::first() on a zero-row frame is
  # version-dependent), so the header simply disappears instead of erroring.
  meta <- stations %>%
    dplyr::filter(.data$ghcn_id == id) %>%
    dplyr::slice(1)
  if (nrow(meta) == 0) {
    return(NULL)
  }
  s_name <- meta$name
  # Scalar condition: plain if/else instead of ifelse() (scalar anti-pattern).
  s_country <- if (is.na(meta$country_name) || meta$country_name == "") "Unknown" else meta$country_name
  s_elev <- meta$elevation
  # Data Range
  df <- station_data()
  if (is.null(df) || nrow(df) == 0) {
    dates_text <- "No data loaded"
  } else {
    date_range <- range(as.Date(df$datetime), na.rm = TRUE)
    dates_text <- paste(date_range[1], "to", date_range[2])
  }
  # Unified Info Card
  card(
    style = "margin-bottom: 20px; border-left: 5px solid #007bff;",
    card_body(
      padding = 15,
      layout_columns(
        fill = FALSE,
        # Col 1: Station
        div(
          strong("Station"), br(),
          span(s_name, style = "font-size: 1.1rem;"), br(),
          tags$small(class = "text-muted", paste("ID:", id))
        ),
        # Col 2: Location
        # NOTE(review): negative coordinates render as e.g. "-10°N" — consider
        # S/W suffixes if that matters for display.
        div(
          strong("Location"), br(),
          span(s_country), br(),
          tags$small(class = "text-muted", paste0(meta$latitude, "°N, ", meta$longitude, "°E"))
        ),
        # Col 3: Elevation & Period
        div(
          strong("Technical"), br(),
          span(paste0(s_elev, " m")), br(),
          span(class = "badge bg-primary", "Hourly")
        ),
        # Col 4: Period
        div(
          strong("Data Selection"), br(),
          span(dates_text)
        )
      )
    )
  )
})
# Dynamic Details Tabs - Grid Layout (DWD Style)
# Builds a responsive two-column grid containing only those plots for which
# the loaded data has at least one usable (non-NA) value.
output$details_tabs <- renderUI({
  df <- station_data()
  req(df)
  # Does the frame contain column `col` with at least one non-NA value?
  has_col <- function(col) col %in% names(df) && any(!is.na(df[[col]]))
  has_temp <- has_col("temp")
  has_humidity <- has_col("rh") || has_col("dew_point")
  has_wind <- has_col("wind_speed") || has_col("wind_gust")
  has_wind_rose <- has_wind && has_col("wind_dir")
  has_pressure <- has_col("pressure") || has_col("station_pressure")
  has_vis <- has_col("vis")
  # Any column starting with "precip" (precip, precip_1h, precip_3h, ...)
  precip_cols <- grep("^precip", names(df), value = TRUE)
  has_precip <- length(precip_cols) > 0 &&
    any(vapply(df[precip_cols], function(x) any(!is.na(x)), logical(1)))
  # Diurnal and weathergami panels both require temperature data.
  # Ordered (available?, outputId) pairs — order fixes the grid layout.
  panels <- list(
    list(has_temp, "temp_plot"),
    list(has_humidity, "humidity_plot"),
    list(has_wind, "wind_overview_plot"),
    list(has_pressure, "pressure_plot"),
    list(has_vis, "visibility_plot"),
    list(has_precip, "precip_plot"),
    list(has_wind_rose, "wind_rose"),
    list(has_temp, "diurnal_plot"),
    list(has_temp, "weathergami_plot")
  )
  shown <- Filter(function(p) isTRUE(p[[1]]), panels)
  plot_list <- lapply(shown, function(p) {
    div(class = "col-12 col-lg-6", plotlyOutput(p[[2]], height = "320px"))
  })
  div(
    class = "row g-3",
    style = "padding: 10px;",
    tagList(plot_list)
  )
})
# Hourly observations table. Selects a fixed set of display columns, maps
# them to human-readable headers via the global col_names_map, and formats
# numeric columns to one decimal place.
output$hourly_data_table <- DT::renderDataTable({
  df <- station_data()
  if (is.null(df) || nrow(df) == 0) {
    return(NULL)
  }
  # Columns to display, in order. any_of() keeps the table rendering even if
  # a parsed file lacks one of these (a strict select() would error out).
  wanted <- c(
    "datetime", "temp", "dew_point", "rh", "pressure",
    "wind_speed", "wind_dir", "wind_gust", "precip", "vis"
  )
  display_df <- df %>% dplyr::select(dplyr::any_of(wanted))
  # Use global mapping for column names, falling back to the raw name
  col_names <- names(display_df)
  display_names <- col_names_map[col_names]
  display_names[is.na(display_names)] <- col_names[is.na(display_names)]
  # Round only the numeric columns actually present in the selection.
  round_cols <- intersect(setdiff(wanted, "datetime"), col_names)
  tbl <- DT::datatable(
    display_df,
    colnames = unname(display_names),
    options = list(
      pageLength = 12,
      dom = "lrtip",
      scrollX = TRUE,
      columnDefs = list(list(className = "dt-center", targets = "_all")),
      order = list(list(0, "asc"))
    ),
    rownames = FALSE,
    selection = "none"
  ) %>%
    DT::formatRound(columns = round_cols, digits = 1)
  if ("datetime" %in% col_names) {
    tbl <- tbl %>% DT::formatDate(columns = "datetime", method = "toLocaleString")
  }
  tbl
})
# Daily summary table: aggregates come from create_daily_summary(); columns
# are renamed/ordered here so headers line up with the global display-name map.
output$daily_summary_main <- DT::renderDataTable({
  summary_df <- create_daily_summary(station_data())
  if (is.null(summary_df) || nrow(summary_df) == 0) {
    return(NULL)
  }
  # display-name key -> source column in the summary frame (order matters)
  display_cols <- c(
    date = "date",
    avg_temp = "Tavg",
    max_temp = "Tmax",
    min_temp = "Tmin",
    max_wind = "Wind_Max",
    avg_rh = "RH_Avg",
    avg_vis = "Vis_Avg",
    avg_pressure = "Pavg",
    precip_sum = "Precip_Sum"
  )
  display_summary_df <- summary_df %>%
    dplyr::select(dplyr::all_of(display_cols))
  DT::datatable(
    display_summary_df,
    colnames = unname(col_names_map[names(display_cols)]),
    options = list(
      pageLength = 12,
      dom = "lrtip",
      columnDefs = list(list(className = "dt-center", targets = "_all")),
      order = list(list(0, "asc"))
    ),
    rownames = FALSE,
    selection = "none"
  )
})
# Track valid map bounds (persist when map is hidden)
last_valid_bounds <- reactiveVal(NULL)
observe({
  # Capture bounds only while the map tab is visible; a hidden/collapsed map
  # reports degenerate (zero-area) bounds which would empty the station table.
  req(input$map_bounds)
  nav <- input$main_nav
  if (!is.null(nav) && nav != "Map View") {
    return()
  }
  b <- input$map_bounds
  # Skip degenerate bounds (missing edges or zero north-south extent).
  if (is.null(b$north) || is.null(b$south) || b$north == b$south) {
    return()
  }
  last_valid_bounds(b)
})
# Reactive for filtering by map bounds
# Restricts the filtered station list to the stations inside the last valid
# map viewport; falls back to live bounds, then to the full filtered set.
visible_stations <- reactive({
  data <- filtered_stations()
  if (is.null(data)) {
    return(NULL)
  }
  bounds <- last_valid_bounds()
  if (is.null(bounds)) {
    # Fallback to live bounds if we haven't captured any yet (edge case)
    bounds <- input$map_bounds
  }
  if (is.null(bounds)) {
    return(data)
  }
  # NOTE(review): a viewport crossing the antimeridian (west > east) would
  # yield an empty result here — confirm the map never reports such bounds.
  dplyr::filter(
    data,
    latitude >= bounds$south,
    latitude <= bounds$north,
    longitude >= bounds$west,
    longitude <= bounds$east
  )
})
# Small caption shown with the plots: station name and id from station_info().
output$plot_info <- renderUI({
  info <- station_info()
  if (is.null(info)) {
    return(NULL)
  }
  tagList(
    h5(info$name),
    p(tags$small("Station ID: ", info$id))
  )
})
# Export the currently visible (map-filtered) station list as XLSX.
output$download_stations <- downloadHandler(
  filename = function() {
    paste0("ghcnh_stations_", Sys.Date(), ".xlsx")
  },
  content = function(file) {
    df <- visible_stations()
    if (is.null(df)) {
      # Always write *something*: returning NULL leaves `file` missing and
      # makes the browser download fail. Matches the other export handlers.
      write_xlsx(data.frame(Message = "No data found"), path = file)
    } else {
      write_xlsx(df, path = file)
    }
  }
)
# Export Hourly Data Button
# XLSX export of the hourly data; filename embeds station id and date range
# when a station is active.
output$download_hourly <- downloadHandler(
  filename = function() {
    st_id <- active_export_id()
    if (!isTruthy(st_id)) {
      return(paste0("ghcnh_hourly_", Sys.Date(), ".xlsx"))
    }
    paste0(
      "GHCNh_hourly_", st_id, "_",
      input$date_range[1], "_to_", input$date_range[2], ".xlsx"
    )
  },
  content = function(file) {
    df <- station_data()
    if (is.null(df) || nrow(df) == 0) {
      # Placeholder workbook so the download always succeeds.
      df <- data.frame(Message = "No data found")
    }
    write_xlsx(df, path = file)
  }
)
# Export Daily Data Button
# XLSX export of the daily summary; filename embeds station id and date range
# when a station is active.
output$download_daily <- downloadHandler(
  filename = function() {
    st_id <- active_export_id()
    if (!isTruthy(st_id)) {
      return(paste0("ghcnh_daily_", Sys.Date(), ".xlsx"))
    }
    paste0(
      "GHCNh_daily_", st_id, "_",
      input$date_range[1], "_to_", input$date_range[2], ".xlsx"
    )
  },
  content = function(file) {
    summary_df <- create_daily_summary(station_data())
    if (is.null(summary_df) || nrow(summary_df) == 0) {
      # Placeholder workbook so the download always succeeds.
      summary_df <- data.frame(Message = "No data found")
    }
    write_xlsx(summary_df, path = file)
  }
)
}