Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import requests | |
| from datetime import datetime, timedelta | |
| from pytz import timezone | |
| from io import BytesIO | |
| import time | |
| import folium | |
| import base64 | |
| import pandas as pd | |
| import numpy as np | |
| from PIL import Image, ImageFilter, ImageEnhance | |
| st.set_page_config(layout="wide", page_title="Rainfall Data Dashboard") | |
| HONG_KONG_TZ = timezone('Asia/Hong_Kong') | |
| RADAR_BASE_URL = "https://www.hko.gov.hk/wxinfo/radars/rad_064_png/2d064nradar_{}.jpg" | |
| API_URL = "https://data.weather.gov.hk/weatherAPI/opendata/weather.php?dataType=rhrread&lang=en" | |
| COLORS_TO_EXTRACT = [ | |
| "#ed00f0", "#c3006a", "#dc0201", "#f00000", "#ed8202", | |
| "#eeb000", "#fada04", "#e1cf00", "#8fff00", "#01f908", | |
| "#01f808", "#00d002", "#01a835", "#008448", "#3b96ff", | |
| "#008ff5", "#00c8fb" | |
| ] | |
| COLORS_TO_EXTRACT_RGB = [tuple(int(color[i:i+2], 16) for i in (1, 3, 5)) for color in COLORS_TO_EXTRACT] | |
| def get_nearest_6_minute_interval(time): | |
| return time.replace(minute=(time.minute // 6) * 6, second=0, microsecond=0) | |
| def get_backward_6_minute_intervals(current_time, hours=3): | |
| intervals = [] | |
| interval_time = get_nearest_6_minute_interval(current_time) | |
| end_time = current_time - timedelta(hours=hours) | |
| while interval_time >= end_time: | |
| intervals.append(interval_time) | |
| interval_time -= timedelta(minutes=6) | |
| return intervals | |
| def fetch_radar_image(timestamp): | |
| url = RADAR_BASE_URL.format(timestamp.strftime('%Y%m%d%H%M')) | |
| response = requests.get(url) | |
| return Image.open(BytesIO(response.content)) if response.status_code == 200 else None | |
| def fetch_radar_image_with_rollback(timestamp): | |
| for i in range(31): # 30 steps of 6 minutes = 3 hours | |
| image = fetch_radar_image(timestamp - timedelta(minutes=6 * i)) | |
| if image: | |
| return image, timestamp - timedelta(minutes=6 * i) | |
| return None, None | |
| def image_to_base64(image): | |
| buffered = BytesIO() | |
| image.save(buffered, format="PNG") | |
| return base64.b64encode(buffered.getvalue()).decode() | |
| def extract_color_pixels(img_array, colors, tolerance=30): | |
| return np.any([np.all(np.abs(img_array - color) <= tolerance, axis=-1) for color in colors], axis=0) | |
| def filter_image_by_color(image, colors_to_extract_rgb): | |
| img_array = np.array(image.convert("RGBA")) | |
| color_mask = extract_color_pixels(img_array[..., :3], colors_to_extract_rgb) | |
| img_array[~color_mask] = [255, 255, 255, 0] | |
| return Image.fromarray(img_array) | |
| def smooth_image(image): | |
| return image.filter(ImageFilter.GaussianBlur(radius=1)) | |
| def enhance_contrast(image, factor=1.5): | |
| enhancer = ImageEnhance.Contrast(image) | |
| enhanced_image = enhancer.enhance(factor) | |
| return enhanced_image | |
| def create_map_with_radar_tile(image): | |
| filtered_image = filter_image_by_color(image, COLORS_TO_EXTRACT_RGB) | |
| smoothed_image = smooth_image(filtered_image) | |
| enhanced_image = enhance_contrast(smoothed_image, factor=1.5) | |
| m = folium.Map(location=[22.364, 114.148], zoom_start=10, min_zoom=10, max_zoom=19, | |
| tiles='https://mapapi.geodata.gov.hk/gs/api/v1.0.0/xyz/imagery/wgs84/{z}/{x}/{y}.png', | |
| attr="Map information from Lands Department", control_scale=True, name="Basemap") | |
| folium.TileLayer( | |
| tiles='https://mapapi.geodata.gov.hk/gs/api/v1.0.0/xyz/label/hk/en/wgs84/{z}/{x}/{y}.png', | |
| attr="Map information from Lands Department", | |
| overlay=True, | |
| name="Labels" | |
| ).add_to(m) | |
| img_url = f"data:image/png;base64,{image_to_base64(enhanced_image)}" | |
| folium.raster_layers.ImageOverlay( | |
| image=img_url, | |
| name="HKO Radar Image", | |
| bounds=[[22.893, 113.538], [21.716, 115.362]], | |
| opacity=0.95, | |
| interactive=False, | |
| cross_origin=False, | |
| zindex=1, | |
| ).add_to(m) | |
| folium.LayerControl().add_to(m) | |
| return m._repr_html_() | |
| def fetch_and_process_rainfall_data(): | |
| response = requests.get(API_URL) | |
| data = response.json() | |
| df = pd.DataFrame(data['rainfall']['data']) | |
| df['max'] = pd.to_numeric(df['max'], errors='coerce') | |
| return df | |
| # Main app | |
| current_time_hkt = datetime.utcnow().replace(tzinfo=timezone('UTC')).astimezone(HONG_KONG_TZ) | |
| time_intervals = get_backward_6_minute_intervals(current_time_hkt) | |
| default_time = get_nearest_6_minute_interval(current_time_hkt) | |
| col1, col2 = st.columns([2.2, 1]) | |
| with col1: | |
| st.subheader('Georeferenced Radar Image (64 km)') | |
| slider = st.empty() | |
| selected_time = slider.slider( | |
| "Select Time:", | |
| min_value=min(time_intervals), | |
| max_value=max(time_intervals), | |
| value=default_time, | |
| format="YYYY-MM-DD HH:mm", | |
| step=timedelta(minutes=6), | |
| key="initial_time_slider" | |
| ) | |
| map_placeholder = st.empty() | |
| info_placeholder = st.empty() | |
| cola1, cola2 = st.columns([1, 3]) | |
| with cola1: | |
| play = st.button("3-hour Sequence") | |
| with cola2: | |
| st.markdown(f""" | |
| <style> | |
| .color-bar {{ | |
| height: 20px; | |
| width: 100%; | |
| background: linear-gradient(to right, {', '.join(COLORS_TO_EXTRACT)}); | |
| }} | |
| .color-labels {{ | |
| display: flex; | |
| justify-content: space-between; | |
| font-size: 10px; | |
| }} | |
| </style> | |
| <div class="color-labels">Rainfall rate (mm/h)</div> | |
| <div class="color-bar"></div> | |
| <div class="color-labels">{' '.join([f'<span>{label}</span>' for label in ['>300', '200-300', '150-200', '100-150', '75-100', '50-75', '30-50', '15-30', '10-15', '7-10', '5-7', '3-5', '2-3', '1-2', '0.50-1', '0.15-0.50']])}</div> | |
| """, unsafe_allow_html=True) | |
| if play: | |
| for i, interval in enumerate(reversed(time_intervals)): | |
| image, actual_time = fetch_radar_image_with_rollback(interval) | |
| if image: | |
| # Update slider with the actual time of the image | |
| slider.slider( | |
| "Select Time:", | |
| min_value=min(time_intervals), | |
| max_value=max(time_intervals), | |
| value=actual_time, | |
| format="YYYY-MM-DD HH:mm", | |
| step=timedelta(minutes=6), | |
| key=f"time_slider_{i}" | |
| ) | |
| # Create and display the map | |
| map_html = create_map_with_radar_tile(image) | |
| map_placeholder.empty() | |
| map_placeholder = st.components.v1.html(map_html,height=750) | |
| if actual_time != interval: | |
| info_placeholder.warning( | |
| f"Showing the nearest available image from {actual_time.strftime('%Y-%m-%d %H:%M')}.") | |
| else: | |
| info_placeholder.empty() | |
| time.sleep(0.01) | |
| else: | |
| info_placeholder.error(f"Could not fetch any radar image for {interval.strftime('%Y-%m-%d %H:%M')}") | |
| else: | |
| # Fetch the radar image with rollback for the selected time | |
| image, actual_time = fetch_radar_image_with_rollback(selected_time) | |
| if image: | |
| # Create and display the map | |
| map_html = create_map_with_radar_tile(image) | |
| map_placeholder.empty() | |
| map_placeholder = st.components.v1.html(map_html, height=750) | |
| if actual_time != selected_time: | |
| info_placeholder.warning( | |
| f"Showing the nearest available image from {actual_time.strftime('%Y-%m-%d %H:%M')}.") | |
| # Update slider to match the actual image time | |
| slider.slider( | |
| "Select Time:", | |
| min_value=min(time_intervals), | |
| max_value=max(time_intervals), | |
| value=actual_time, | |
| format="YYYY-MM-DD HH:mm", | |
| step=timedelta(minutes=6), | |
| key="adjusted_time_slider" | |
| ) | |
| else: | |
| info_placeholder.empty() | |
| else: | |
| info_placeholder.error( | |
| f"Could not fetch any radar image within the last 3 hours of {selected_time.strftime('%Y-%m-%d %H:%M')}") | |
| with col2: | |
| df = fetch_and_process_rainfall_data() | |
| areas_with_rainfall = df[df['max'] > 0]['place'].tolist() | |
| areas_with_no_rainfall = df[df['max'] == 0]['place'].tolist() | |
| st.caption('The following is the past hour rainfall from HKO Automatic Weather Station, updated hourly.') | |
| col_1, col_2 = st.columns(2) | |
| st.markdown( | |
| """ | |
| <style> | |
| [data-testid="stMetricValue"] { | |
| font-size: 26px; | |
| } | |
| </style> | |
| """, | |
| unsafe_allow_html=True, | |
| ) | |
| with col_1: | |
| st.metric("Average Rainfall", f"{df['max'].mean():.2f} mm") | |
| st.metric("Maximum Rainfall", f"{df['max'].max()} mm") | |
| with col_2: | |
| st.metric("Areas with Rainfall", f"{len(areas_with_rainfall)}") | |
| st.metric("Areas with No Rainfall", f"{len(areas_with_no_rainfall)}") | |
| st.dataframe(df.sort_values(by='max', ascending=False)[['place', 'max']], use_container_width=True, height=480) | |
| # JavaScript for auto-reloading every 5 minutes | |
| st.markdown( | |
| """ | |
| <script> | |
| function reloadPage() { | |
| window.location.reload(); | |
| } | |
| setTimeout(reloadPage, 100000); | |
| </script> | |
| """, | |
| unsafe_allow_html=True | |
| ) | |