| | import logging |
| | import os |
| | from collections import deque |
| | from pathlib import Path |
| | from threading import Event |
| | from typing import Dict, Optional, Tuple |
| |
|
| | import gradio as gr |
| |
|
| | from gefs_wave import WaveDownloadManager, logger as wave_logger |
| |
|
# Root directory handed to the download manager; tarball paths are resolved
# against it. GEFS_WAVE_DATA_DIR overrides the default relative location.
DATA_ROOT = Path(os.getenv("GEFS_WAVE_DATA_DIR", "data/gefswave"))
manager = WaveDownloadManager(DATA_ROOT)

# Set once the first refresh has been requested (see ensure_refresh).
_refresh_started = Event()

# Bounded buffer: only the 200 most recent log lines are kept for the UI.
_log_buffer: deque[str] = deque(maxlen=200)
| |
|
| |
|
class UILogHandler(logging.Handler):
    """Logging handler that collects rendered records in ``_log_buffer``."""

    def emit(self, record) -> None:
        """Append one record to the buffer.

        The handler's formatter is tried first; if formatting raises, the
        record's plain message is stored instead so the entry is not lost.
        """
        try:
            rendered = self.format(record)
        except Exception:
            # Best-effort fallback: keep the raw message rather than drop it.
            rendered = record.getMessage()
        _log_buffer.append(rendered)
| |
|
| |
|
# The timestamp layout belongs in ``datefmt``; ``fmt`` may only contain
# %(name)s-style record fields. Putting strftime directives such as ``%Y``
# directly in ``fmt`` makes every format() call raise ValueError, which
# UILogHandler.emit then hides by falling back to the bare message.
log_formatter = logging.Formatter(
    "%(asctime)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
ui_log_handler = UILogHandler()
ui_log_handler.setFormatter(log_formatter)

# Attach the UI handler only if the wave logger does not already carry one.
if not any(isinstance(handler, UILogHandler) for handler in wave_logger.handlers):
    wave_logger.addHandler(ui_log_handler)
wave_logger.setLevel(logging.INFO)
| |
|
| |
|
def _log_text() -> str:
    """Return the buffered log lines as one newline-separated string.

    A placeholder sentence is returned while the buffer is still empty.
    """
    return "\n".join(_log_buffer) if _log_buffer else "No log messages yet."
| |
|
| |
|
def ensure_refresh() -> None:
    """Request the initial refresh from the manager, at most once per process.

    ``_refresh_started`` records that the request was already made, so later
    calls return without touching the manager.
    """
    if _refresh_started.is_set():
        return
    # Mark first, then trigger, mirroring the order callers rely on.
    _refresh_started.set()
    manager.trigger_refresh()
| |
|
| |
|
def format_status(status: Dict, notice: Optional[str] = None) -> Tuple[Dict, str]:
    """Build the UI payload for a downloader status snapshot.

    Args:
        status: Status mapping. The keys read here are ``status``,
            ``message`` and ``latest_state``; within ``latest_state`` the keys
            ``product_label``, ``cycle``, ``updated_at``, ``files`` and
            ``tarball`` are read.
        notice: Optional warning text appended as the last line of the summary.

    Returns:
        A pair of the ``status`` mapping itself (unchanged) and a markdown
        summary of it.
    """
    cycle_state = status.get("latest_state", {})
    summary = [f"**Downloader status:** `{status.get('status', 'unknown')}`"]

    if "message" in status:
        summary.append(f"- Message: {status['message']}")

    if not cycle_state:
        summary.append("- No cycle downloaded yet.")
    else:
        product = cycle_state.get("product_label")
        if product:
            summary.append(f"- Dataset: `{product}`")
        summary.append(f"- Latest cycle: `{cycle_state.get('cycle', 'N/A')}`")
        summary.append(f"- Updated at: `{cycle_state.get('updated_at', 'N/A')}`")
        summary.append(f"- Files cached: {len(cycle_state.get('files', []))}")
        archive = cycle_state.get("tarball")
        if archive:
            summary.append(f"- Tarball path: `{archive}`")

    # The leading newline leaves a blank line before the hint (and the notice).
    summary.append("\nUse **Trigger Refresh** to fetch a newer cycle when available.")
    if notice:
        summary.append(f"\n⚠️ {notice}")
    return status, "\n".join(summary)
| |
|
| |
|
def get_status() -> Tuple[Dict, str, Optional[str], str]:
    """Return the current status for the four UI outputs.

    Returns:
        ``(status dict, markdown summary, None, log text)``. The third slot
        feeds the download button and is always ``None`` here.
    """
    ensure_refresh()
    payload, summary = format_status(manager.status())
    return payload, summary, None, _log_text()
| |
|
| |
|
def trigger_refresh() -> Tuple[Dict, str, Optional[str], str]:
    """Ask the manager for a refresh, then report the resulting status.

    Returns:
        ``(status dict, markdown summary, None, log text)``, the same shape
        as ``get_status``.
    """
    ensure_refresh()
    manager.trigger_refresh()
    # Read the status only after the refresh request has been issued.
    snapshot = manager.status()
    payload, summary = format_status(snapshot)
    return payload, summary, None, _log_text()
| |
|
| |
|
def download_tarball() -> Tuple[Dict, str, Optional[str], str]:
    """Resolve the latest tarball for the download button.

    Returns:
        ``(status dict, markdown summary, tarball path or None, log text)``.
        The path is a string under ``DATA_ROOT`` when the status names a
        tarball that exists on disk; otherwise it is ``None`` and the summary
        carries a notice explaining why.
    """
    ensure_refresh()
    snapshot = manager.status()
    relative = snapshot.get("latest_state", {}).get("tarball")

    archive: Optional[str] = None
    warning: Optional[str] = None
    if not relative:
        warning = "Wave dataset not available yet. Trigger a refresh and try again."
    else:
        candidate = DATA_ROOT / relative
        if candidate.exists():
            archive = str(candidate)
        else:
            warning = "Tarball missing on disk. Trigger a refresh and try again."

    # A warning of None yields the plain summary without a notice line.
    payload, summary = format_status(snapshot, warning)
    return payload, summary, archive, _log_text()
| |
|
| |
|
# UI layout. The title and heading use "GEFS", matching the gefs_wave module,
# the GEFS_WAVE_DATA_DIR variable and the dataset description below.
with gr.Blocks(title="GEFS Wave 0 Downloader") as demo:
    gr.Markdown(
        """
        ## GEFS Wave 0 Downloader
        Fetch the latest NOAA GEFS Wave control member (`c00`) global 0.25° dataset.

        1. Check the status panel to confirm the current cycle.
        2. Press **Trigger Refresh** to request a newer cycle if one is available.
        3. Use **Download Latest Tarball** to grab a `.tar.gz` of every GRIB2 file for that cycle.
        """
    )
    status_json = gr.JSON(label="Downloader Status")
    status_md = gr.Markdown()
    logs_box = gr.Textbox(label="Recent Logs", lines=12, max_lines=12, interactive=False)

    with gr.Row():
        refresh_button = gr.Button("Trigger Refresh", variant="secondary")
        download_button = gr.DownloadButton("Download Latest Tarball", variant="primary")

    # Each callback returns (status dict, markdown, download value, log text),
    # in the same order as the outputs listed here.
    demo.load(get_status, outputs=[status_json, status_md, download_button, logs_box])
    refresh_button.click(trigger_refresh, outputs=[status_json, status_md, download_button, logs_box])
    download_button.click(download_tarball, outputs=[status_json, status_md, download_button, logs_box])

demo.queue()
| |
|
| |
|
if __name__ == "__main__":
    # Request the initial refresh before the server starts.
    ensure_refresh()
    demo.launch(
        server_name="0.0.0.0",
        # PORT overrides the default port of 7860.
        server_port=int(os.environ.get("PORT", 7860)),
        show_api=False,
    )
| |
|