"""Dual-radar (Canadian ECCC / US NOAA) reflectivity analyzer.

Maps radar-image pixel colors back to dBZ intensity ranges using a legend.
"""
| import numpy as np | |
| import cv2 | |
| from PIL import Image, ImageDraw, ImageFont | |
| import requests | |
| import colorsys | |
| from typing import Dict, List, Tuple, Optional | |
| import json | |
| from dataclasses import dataclass | |
@dataclass
class ColorRange:
    """Represents a precipitation intensity range with its color.

    One legend entry: the intensity interval [min_value, max_value] (dBZ or
    rain rate, per the legend) rendered with the given RGB color.
    """
    min_value: float  # lower bound of the intensity interval
    max_value: float  # upper bound of the intensity interval
    rgb: Tuple[int, int, int]  # legend color for this interval
    name: str  # human-readable intensity label
class RadarAnalyzer:
    """
    Dual radar analyzer for Canadian and US weather radar data.
    Analyzes dBZ reflectivity values using color mapping from radar legends.
    """

    def __init__(self, radar_type: str = "canadian", legend_path: str = None):
        """Configure WMS endpoint and legend data for the chosen radar network.

        Args:
            radar_type: "canadian" (ECCC) or "american"/"us" (NOAA NEXRAD).
            legend_path: Optional legend image path; defaults per network.

        Raises:
            ValueError: If radar_type is not a supported network.
        """
        self.radar_type = radar_type.lower()
        if self.radar_type == "canadian":
            # ECCC GeoMet WMS endpoint, 1 km rain-rate layer
            self.wms_base_url = "https://geo.weather.gc.ca/geomet"
            self.radar_layer = "RADAR_1KM_RRAI"
            self.default_legend = "radar_legendwellcropped.png"
            # (lat_min, lat_max, lon_min, lon_max)
            self.bounds = (20.0, 85.0, -170.0, -40.0)
        elif self.radar_type in ("american", "us"):
            # NOAA nowCOAST NEXRAD reflectivity service
            self.wms_base_url = (
                "https://nowcoast.noaa.gov/arcgis/services/nowcoast/"
                "radar_meteo_imagery_nexrad_time/MapServer/WMSServer"
            )
            self.radar_layer = "1"
            self.default_legend = "us_radar_legend_cropped.png"  # supplied by user
            # Continental US bounds: (lat_min, lat_max, lon_min, lon_max)
            self.bounds = (20.0, 50.0, -130.0, -65.0)
        else:
            raise ValueError(f"Unsupported radar type: {radar_type}. Use 'canadian' or 'american'")

        self.legend_path = legend_path or self.default_legend
        self.precipitation_scale = self._load_precomputed_legend_data()
        # Maximum RGB Euclidean distance for a color to count as a match.
        self.color_tolerance = 40
        self.reference_colors = {}
        if not self.precipitation_scale:
            # Fall back to sampling colors straight off the legend image.
            self._extract_legend_colors()
| def _load_precomputed_legend_data(self) -> List[ColorRange]: | |
| """ | |
| Load pre-computed legend data from JSON file. | |
| Much faster than processing legend image at runtime. | |
| """ | |
| try: | |
| # Determine legend data file based on radar type | |
| if self.radar_type == "american" or self.radar_type == "us": | |
| legend_data_file = "us_radar_legend_data.json" | |
| else: | |
| legend_data_file = "radar_legend_data.json" | |
| with open(legend_data_file, 'r') as f: | |
| legend_data = json.load(f) | |
| color_ranges = [] | |
| for item in legend_data['colors']: | |
| color_ranges.append(ColorRange( | |
| item['min_value'], | |
| item['max_value'], | |
| tuple(item['rgb']), | |
| item['name'] | |
| )) | |
| print(f"Loaded {len(color_ranges)} pre-computed legend colors") | |
| return color_ranges | |
| except Exception as e: | |
| print(f"Error loading pre-computed legend data: {e}") | |
| if self.radar_type == "american" and "us_radar_legend_data.json" in str(e): | |
| print("US radar legend data not found. Please provide a cropped US legend and run create_us_legend_data.py") | |
| return [] | |
| else: | |
| print("Falling back to runtime legend extraction...") | |
| return self._load_or_extract_legend_colors() | |
| def _load_or_extract_legend_colors(self) -> List[ColorRange]: | |
| """ | |
| Load legend colors from cache or extract if not cached. | |
| """ | |
| import os | |
| import hashlib | |
| cache_file = "legend_cache.json" | |
| # Check if legend file exists | |
| if not os.path.exists(self.legend_path): | |
| print(f"Legend file not found: {self.legend_path}") | |
| return [] | |
| # Calculate legend file hash for cache validation | |
| with open(self.legend_path, 'rb') as f: | |
| legend_hash = hashlib.md5(f.read()).hexdigest() | |
| # Try to load from cache | |
| if os.path.exists(cache_file): | |
| try: | |
| with open(cache_file, 'r') as f: | |
| cache_data = json.load(f) | |
| if cache_data.get('legend_hash') == legend_hash: | |
| print(f"Loading cached legend colors ({len(cache_data['colors'])} entries)") | |
| color_ranges = [] | |
| for item in cache_data['colors']: | |
| color_ranges.append(ColorRange( | |
| item['min_value'], item['max_value'], | |
| tuple(item['rgb']), item['name'] | |
| )) | |
| return color_ranges | |
| else: | |
| print("Legend file changed, re-extracting colors...") | |
| except Exception as e: | |
| print(f"Error loading cache: {e}") | |
| # Extract and cache colors | |
| print("Extracting legend colors (first time or cache miss)...") | |
| color_ranges = self._extract_precise_legend_colors() | |
| # Save to cache | |
| try: | |
| cache_data = { | |
| 'legend_hash': legend_hash, | |
| 'colors': [] | |
| } | |
| for cr in color_ranges: | |
| cache_data['colors'].append({ | |
| 'min_value': cr.min_value, | |
| 'max_value': cr.max_value, | |
| 'rgb': list(cr.rgb), | |
| 'name': cr.name | |
| }) | |
| with open(cache_file, 'w') as f: | |
| json.dump(cache_data, f, indent=2) | |
| print(f"Cached {len(color_ranges)} legend colors") | |
| except Exception as e: | |
| print(f"Error saving cache: {e}") | |
| return color_ranges | |
| def _extract_precise_legend_colors(self) -> List[ColorRange]: | |
| """ | |
| Extract precise color mappings from the Canadian radar legend image. | |
| """ | |
| try: | |
| import os | |
| if not os.path.exists(self.legend_path): | |
| print(f"Legend file not found: {self.legend_path}, using fallback colors") | |
| return [] | |
| # Load legend image | |
| legend = cv2.imread(self.legend_path) | |
| if legend is None: | |
| print(f"Could not load legend: {self.legend_path}") | |
| return [] | |
| legend_rgb = cv2.cvtColor(legend, cv2.COLOR_BGR2RGB) | |
| height, width = legend_rgb.shape[:2] | |
| # Sample colors from center column of legend | |
| center_x = width // 2 | |
| color_ranges = [] | |
| # Sample every pixel from the legend for maximum precision | |
| print(f"Sampling colors from legend (height: {height})...") | |
| # Sample every pixel for maximum precision - no artificial limits | |
| for y in range(0, height, 1): # Every pixel for maximum sampling | |
| # Map pixel position to dBZ value (bottom = low, top = high) | |
| progress = (height - 1 - y) / (height - 1) # Invert: bottom = 0, top = 1 | |
| dbz_value = 0.01 + (499.99 * progress) # 0.01 to 500 dBZ - extended range | |
| # Sample color from center of legend | |
| rgb_color = tuple(int(c) for c in legend_rgb[y, center_x]) | |
| # Skip very dark or very light colors (background) | |
| color_sum = sum(rgb_color) | |
| if color_sum < 30 or color_sum > 750: | |
| continue | |
| # Categorize dBZ range for naming | |
| if dbz_value < 1.0: | |
| name = "Very Light" | |
| elif dbz_value < 2.0: | |
| name = "Light" | |
| elif dbz_value < 4.0: | |
| name = "Light-Moderate" | |
| elif dbz_value < 8.0: | |
| name = "Moderate" | |
| elif dbz_value < 12.0: | |
| name = "Moderate-Heavy" | |
| elif dbz_value < 24.0: | |
| name = "Heavy" | |
| elif dbz_value < 32.0: | |
| name = "Very Heavy" | |
| elif dbz_value < 50.0: | |
| name = "Extreme" | |
| elif dbz_value < 64.0: | |
| name = "Severe" | |
| elif dbz_value < 100.0: | |
| name = "Intense" | |
| elif dbz_value < 150.0: | |
| name = "Violent" | |
| else: | |
| name = "Exceptional" | |
| # Create tight range for precise matching - no upper limit | |
| range_size = 1.0 # Even smaller range for maximum precision | |
| min_dbz = max(0.01, dbz_value - range_size/2) | |
| max_dbz = dbz_value + range_size/2 # No artificial cap | |
| color_ranges.append(ColorRange(min_dbz, max_dbz, rgb_color, name)) | |
| print(f"Extracted {len(color_ranges)} precise color ranges from legend:") | |
| for i, cr in enumerate(color_ranges): | |
| print(f" {i+1}. {cr.name}: {cr.min_value}-{cr.max_value} dBZ -> RGB{cr.rgb}") | |
| return color_ranges | |
| except Exception as e: | |
| print(f"Error extracting legend colors: {e}") | |
| return [] | |
| def _extract_legend_colors(self): | |
| """Extract precise colors from the downloaded radar legend.""" | |
| try: | |
| legend_img = cv2.imread('radar_legend.png') | |
| if legend_img is None: | |
| print("Warning: Could not load radar_legend.png, using default colors") | |
| return | |
| # Convert BGR to RGB | |
| legend_rgb = cv2.cvtColor(legend_img, cv2.COLOR_BGR2RGB) | |
| # Sample colors from the legend bar (left side of image) | |
| height, width = legend_rgb.shape[:2] | |
| color_bar_width = min(50, width // 4) # Sample from left quarter | |
| # Extract colors at regular intervals | |
| num_samples = len(self.precipitation_scale) | |
| for i, scale_item in enumerate(self.precipitation_scale): | |
| y_pos = int((i / (num_samples - 1)) * (height - 20)) + 10 | |
| # Sample from multiple pixels and average | |
| color_samples = [] | |
| for x in range(5, color_bar_width, 5): | |
| if y_pos < height and x < width: | |
| color_samples.append(legend_rgb[y_pos, x]) | |
| if color_samples: | |
| avg_color = np.mean(color_samples, axis=0).astype(int) | |
| self.reference_colors[scale_item.name] = tuple(avg_color) | |
| # Update the RGB in our scale | |
| scale_item.rgb = tuple(avg_color) | |
| except Exception as e: | |
| print(f"Error extracting legend colors: {e}") | |
| def fetch_current_radar(self, bbox: Tuple[float, float, float, float] = (-150, 40, -50, 85), | |
| width: int = 1200, height: int = 800) -> Optional[np.ndarray]: | |
| """ | |
| Fetch the most recent Canadian radar image. | |
| Args: | |
| bbox: Bounding box (west, south, east, north) in EPSG:4326 | |
| width: Image width in pixels | |
| height: Image height in pixels | |
| Returns: | |
| RGB numpy array of the radar image | |
| """ | |
| params = { | |
| 'SERVICE': 'WMS', | |
| 'VERSION': '1.3.0', | |
| 'REQUEST': 'GetMap', | |
| 'BBOX': f'{bbox[0]},{bbox[1]},{bbox[2]},{bbox[3]}', | |
| 'CRS': 'EPSG:4326', | |
| 'WIDTH': width, | |
| 'HEIGHT': height, | |
| 'LAYERS': self.radar_layer, | |
| 'FORMAT': 'image/png', | |
| 'TRANSPARENT': 'true' | |
| } | |
| try: | |
| response = requests.get(self.wms_base_url, params=params, timeout=30) | |
| response.raise_for_status() | |
| # Convert to numpy array | |
| img_pil = Image.open(response.content) | |
| img_array = np.array(img_pil) | |
| # Handle RGBA by removing alpha channel | |
| if img_array.shape[2] == 4: | |
| img_array = img_array[:, :, :3] | |
| return img_array | |
| except Exception as e: | |
| print(f"Error fetching radar data: {e}") | |
| return None | |
| def color_distance(self, color1: Tuple[int, int, int], color2: Tuple[int, int, int]) -> float: | |
| """Calculate Euclidean distance between two RGB colors.""" | |
| # Convert to int to prevent overflow | |
| c1_int = [int(c) for c in color1] | |
| c2_int = [int(c) for c in color2] | |
| return np.sqrt(sum((c1 - c2) ** 2 for c1, c2 in zip(c1_int, c2_int))) | |
| def find_precipitation_value(self, rgb_color: Tuple[int, int, int]) -> Optional[ColorRange]: | |
| """ | |
| Find the precipitation intensity for a given RGB color. | |
| Args: | |
| rgb_color: RGB tuple (r, g, b) | |
| Returns: | |
| ColorRange object with precipitation data, or None if no match | |
| """ | |
| best_match = None | |
| min_distance = float('inf') | |
| for scale_item in self.precipitation_scale: | |
| distance = self.color_distance(rgb_color, scale_item.rgb) | |
| if distance < min_distance: | |
| min_distance = distance | |
| best_match = scale_item | |
| # Debug logging for first few unmatched pixels | |
| if hasattr(self, '_debug_count'): | |
| self._debug_count += 1 | |
| else: | |
| self._debug_count = 1 | |
| if self._debug_count <= 5 and min_distance > self.color_tolerance: | |
| print(f"Debug: Pixel {rgb_color} closest match: {best_match.name if best_match else 'None'} " | |
| f"(distance: {min_distance:.1f}, tolerance: {self.color_tolerance})") | |
| return best_match if min_distance <= self.color_tolerance else None | |
| def analyze_radar_pixels(self, radar_image: np.ndarray) -> Dict: | |
| """ | |
| Analyze every pixel in the radar image for precipitation intensity. | |
| Args: | |
| radar_image: RGB numpy array of radar data | |
| Returns: | |
| Dictionary with analysis results | |
| """ | |
| height, width = radar_image.shape[:2] | |
| precipitation_map = np.zeros((height, width), dtype=float) | |
| color_map = np.zeros((height, width, 3), dtype=int) | |
| # Analyze each pixel | |
| pixel_stats = {} | |
| for scale_item in self.precipitation_scale: | |
| pixel_stats[scale_item.name] = 0 | |
| print(f"Starting FAST pixel analysis of {width}x{height} image...") | |
| # OPTIMIZATION: Group identical colors first, analyze once per unique color | |
| print("Pass 1: Collecting unique colors (skipping white pixels)...") | |
| color_groups = {} # RGB -> list of (y,x) positions | |
| total_non_bg = 0 | |
| for y in range(height): | |
| for x in range(width): | |
| pixel_rgb = tuple(int(c) for c in radar_image[y, x]) | |
| # Skip background pixels (black, white, or very light gray) | |
| pixel_sum = sum(pixel_rgb) | |
| if (pixel_sum < 30 or # Very dark pixels (black background) | |
| pixel_sum > 750 or # Very bright pixels (white background) | |
| (pixel_rgb[0] > 240 and pixel_rgb[1] > 240 and pixel_rgb[2] > 240)): # Near-white | |
| continue | |
| total_non_bg += 1 | |
| if pixel_rgb not in color_groups: | |
| color_groups[pixel_rgb] = [] | |
| color_groups[pixel_rgb].append((y, x)) | |
| print(f"Found {len(color_groups)} unique colors in {total_non_bg:,} non-background pixels") | |
| # OPTIMIZATION: Analyze each unique color only once | |
| print("Pass 2: Analyzing unique colors...") | |
| analyzed_colors = {} # RGB -> (match, dbz_value) | |
| sample_pixels = list(color_groups.keys())[:20] # Sample for debugging | |
| for pixel_rgb in color_groups.keys(): | |
| match = self.find_precipitation_value(pixel_rgb) | |
| if match: | |
| dbz_value = (match.min_value + match.max_value) / 2 | |
| analyzed_colors[pixel_rgb] = (match, dbz_value) | |
| print(f"Matched {len(analyzed_colors)}/{len(color_groups)} unique colors to precipitation") | |
| # OPTIMIZATION: Apply results to all positions at once | |
| print("Pass 3: Applying results to all pixel positions...") | |
| for pixel_rgb, (match, dbz_value) in analyzed_colors.items(): | |
| positions = color_groups[pixel_rgb] | |
| pixel_stats[match.name] += len(positions) | |
| # Apply to precipitation and color maps efficiently | |
| for y, x in positions: | |
| precipitation_map[y, x] = dbz_value | |
| color_map[y, x] = match.rgb | |
| print(f"Pixel analysis complete!") | |
| print(f"Sample non-background pixels found: {sample_pixels[:10]}") | |
| print(f"Total non-background pixels examined: {len(sample_pixels)}") | |
| print(f"Precipitation pixels detected: {sum(pixel_stats.values()) if pixel_stats else 0}") | |
| # Show unique colors found | |
| unique_colors = list(set(sample_pixels))[:10] | |
| print(f"Unique non-background colors: {unique_colors}") | |
| total_pixels = height * width | |
| precipitation_pixels = sum(pixel_stats.values()) | |
| precipitation_percentage = (precipitation_pixels / total_pixels) * 100 if total_pixels > 0 else 0 | |
| return { | |
| 'precipitation_map': precipitation_map, | |
| 'color_map': color_map, | |
| 'pixel_statistics': pixel_stats, | |
| 'total_pixels': total_pixels, | |
| 'precipitation_pixels': precipitation_pixels, | |
| 'precipitation_percentage': precipitation_percentage, | |
| 'intensity_levels': pixel_stats | |
| } | |
| def find_color_regions(self, radar_image: np.ndarray, min_region_size: int = 1, fast_mode: bool = False) -> List[Dict]: | |
| """ | |
| Find connected regions of the same precipitation intensity. | |
| Args: | |
| radar_image: RGB numpy array of radar data | |
| min_region_size: Minimum number of pixels for a region | |
| Returns: | |
| List of region dictionaries with boundaries and values | |
| """ | |
| height, width = radar_image.shape[:2] | |
| visited = np.zeros((height, width), dtype=bool) | |
| regions = [] | |
| if fast_mode: | |
| print("Using HIGH-RESOLUTION region mode - separate regions per unique color") | |
| # Create individual regions for each unique color (highest resolution) | |
| from collections import defaultdict | |
| color_groups = defaultdict(list) | |
| for y in range(height): | |
| for x in range(width): | |
| pixel_rgb = tuple(int(c) for c in radar_image[y, x]) | |
| pixel_sum = sum(pixel_rgb) | |
| if not (pixel_sum < 30 or pixel_sum > 750 or | |
| (pixel_rgb[0] > 240 and pixel_rgb[1] > 240 and pixel_rgb[2] > 240)): | |
| match = self.find_precipitation_value(pixel_rgb) | |
| if match: | |
| # Group by exact RGB color for maximum resolution | |
| color_groups[pixel_rgb].append((y, x, match)) | |
| print(f"Found {len(color_groups)} unique precipitation colors") | |
| for color_rgb, pixel_data in color_groups.items(): | |
| if len(pixel_data) >= min_region_size: | |
| pixels = [(p[0], p[1]) for p in pixel_data] | |
| match = pixel_data[0][2] # Get precipitation match | |
| ys = [p[0] for p in pixels] | |
| xs = [p[1] for p in pixels] | |
| # Calculate precise dBZ value for this exact color | |
| dbz_value = (match.min_value + match.max_value) / 2 | |
| regions.append({ | |
| 'pixels': pixels, | |
| 'precipitation': match, | |
| 'exact_color': color_rgb, | |
| 'dbz_value': dbz_value, | |
| 'bbox': { | |
| 'min_y': min(ys), | |
| 'max_y': max(ys), | |
| 'min_x': min(xs), | |
| 'max_x': max(xs) | |
| }, | |
| 'pixel_count': len(pixels), | |
| 'center': (int(np.mean(ys)), int(np.mean(xs))) | |
| }) | |
| print(f"High-resolution mode: Created {len(regions)} color-specific regions") | |
| return regions | |
| print(f"Starting DETAILED region analysis on {width}x{height} image...") | |
| # OPTIMIZATION: Pre-identify precipitation pixels to avoid scanning empty areas | |
| precip_pixels = [] | |
| for y in range(height): | |
| for x in range(width): | |
| pixel_rgb = tuple(int(c) for c in radar_image[y, x]) | |
| pixel_sum = sum(pixel_rgb) | |
| if not (pixel_sum < 30 or pixel_sum > 750 or | |
| (pixel_rgb[0] > 240 and pixel_rgb[1] > 240 and pixel_rgb[2] > 240)): | |
| match = self.find_precipitation_value(pixel_rgb) | |
| if match: | |
| precip_pixels.append((y, x)) | |
| print(f"Found {len(precip_pixels)} precipitation pixels to analyze for regions") | |
| def flood_fill(start_y: int, start_x: int, target_color: Tuple[int, int, int]) -> List[Tuple[int, int]]: | |
| """Optimized flood fill algorithm.""" | |
| stack = [(start_y, start_x)] | |
| region_pixels = [] | |
| while stack: | |
| if len(stack) % 1000 == 0 and len(stack) > 1000: | |
| print(f" Flood fill processing {len(stack)} pixels in stack...") | |
| y, x = stack.pop() | |
| if (y < 0 or y >= height or x < 0 or x >= width or visited[y, x]): | |
| continue | |
| pixel_color = tuple(int(c) for c in radar_image[y, x]) | |
| if self.color_distance(pixel_color, target_color) > self.color_tolerance: | |
| continue | |
| visited[y, x] = True | |
| region_pixels.append((y, x)) | |
| # Add 4-connected neighbors | |
| for dy, dx in [(0, 1), (1, 0), (0, -1), (-1, 0)]: | |
| new_y, new_x = y + dy, x + dx | |
| if (0 <= new_y < height and 0 <= new_x < width and not visited[new_y, new_x]): | |
| stack.append((new_y, new_x)) | |
| return region_pixels | |
| # OPTIMIZATION: Only process known precipitation pixels | |
| print(f"Processing {len(precip_pixels)} precipitation pixels for regions...") | |
| regions_found = 0 | |
| for i, (y, x) in enumerate(precip_pixels): | |
| if i % 500 == 0: | |
| progress = (i / len(precip_pixels)) * 100 | |
| print(f" Region progress: {progress:.1f}% ({i}/{len(precip_pixels)} pixels) - {regions_found} regions found") | |
| if not visited[y, x]: | |
| pixel_rgb = tuple(int(c) for c in radar_image[y, x]) | |
| match = self.find_precipitation_value(pixel_rgb) | |
| if match: | |
| print(f" Starting flood fill from ({y},{x}) for {match.name}...") | |
| region_pixels = flood_fill(y, x, pixel_rgb) | |
| if len(region_pixels) >= min_region_size: | |
| # Calculate bounding box | |
| ys = [p[0] for p in region_pixels] | |
| xs = [p[1] for p in region_pixels] | |
| regions.append({ | |
| 'pixels': region_pixels, | |
| 'precipitation': match, | |
| 'bbox': { | |
| 'min_y': min(ys), | |
| 'max_y': max(ys), | |
| 'min_x': min(xs), | |
| 'max_x': max(xs) | |
| }, | |
| 'pixel_count': len(region_pixels), | |
| 'center': (int(np.mean(ys)), int(np.mean(xs))) | |
| }) | |
| regions_found += 1 | |
| print(f" Found region {regions_found} with {len(region_pixels)} pixels") | |
| else: | |
| print(f" Skipped small region with {len(region_pixels)} pixels (min: {min_region_size})") | |
| print(f"Region analysis complete! Found {len(regions)} regions") | |
| return regions | |
| def create_annotated_image(self, radar_image: np.ndarray, regions: List[Dict]) -> np.ndarray: | |
| """ | |
| Create an annotated image with precipitation values labeled on regions. | |
| Args: | |
| radar_image: Original radar image | |
| regions: List of detected regions | |
| Returns: | |
| Annotated image as numpy array | |
| """ | |
| # Convert to PIL for text drawing | |
| img_pil = Image.fromarray(radar_image) | |
| draw = ImageDraw.Draw(img_pil) | |
| # Try to load a font, fall back to default if not available | |
| try: | |
| font = ImageFont.truetype("arial.ttf", 12) | |
| except: | |
| font = ImageFont.load_default() | |
| # Draw labels on regions | |
| for region in regions: | |
| center_y, center_x = region['center'] | |
| precip = region['precipitation'] | |
| # Create label text | |
| avg_value = (precip.min_value + precip.max_value) / 2 | |
| label = f"{avg_value:.1f} mm/h" | |
| # Calculate text size | |
| bbox = draw.textbbox((0, 0), label, font=font) | |
| text_width = bbox[2] - bbox[0] | |
| text_height = bbox[3] - bbox[1] | |
| # Draw semi-transparent background for text | |
| bg_coords = [ | |
| center_x - text_width//2 - 2, | |
| center_y - text_height//2 - 2, | |
| center_x + text_width//2 + 2, | |
| center_y + text_height//2 + 2 | |
| ] | |
| # Draw white background with transparency | |
| draw.rectangle(bg_coords, fill=(255, 255, 255, 180)) | |
| # Draw text | |
| text_coords = (center_x - text_width//2, center_y - text_height//2) | |
| draw.text(text_coords, label, fill=(0, 0, 0), font=font) | |
| # Draw bounding box for larger regions | |
| if region['pixel_count'] > 500: | |
| bbox_coords = [ | |
| region['bbox']['min_x'], | |
| region['bbox']['min_y'], | |
| region['bbox']['max_x'], | |
| region['bbox']['max_y'] | |
| ] | |
| draw.rectangle(bbox_coords, outline=(255, 255, 255), width=1) | |
| return np.array(img_pil) | |
| def save_analysis_results(self, results: Dict, filename: str = "radar_analysis.json"): | |
| """Save analysis results to JSON file.""" | |
| # Convert numpy arrays to lists for JSON serialization | |
| serializable_results = {} | |
| for key, value in results.items(): | |
| if isinstance(value, np.ndarray): | |
| serializable_results[key] = value.tolist() | |
| else: | |
| serializable_results[key] = value | |
| with open(filename, 'w') as f: | |
| json.dump(serializable_results, f, indent=2) | |
| def analyze_radar(self, radar_image_path: str, legend_path: str) -> Dict: | |
| """ | |
| Complete radar analysis pipeline. | |
| Args: | |
| radar_image_path: Path to radar image file | |
| legend_path: Path to legend image file (for reference) | |
| Returns: | |
| Dictionary containing analysis results and output file path | |
| """ | |
| try: | |
| # Load radar image | |
| radar_image = cv2.imread(radar_image_path) | |
| if radar_image is None: | |
| raise ValueError(f"Could not load radar image: {radar_image_path}") | |
| # Convert BGR to RGB | |
| radar_image_rgb = cv2.cvtColor(radar_image, cv2.COLOR_BGR2RGB) | |
| # Perform pixel analysis | |
| analysis = self.analyze_radar_pixels(radar_image_rgb) | |
| # Find regions | |
| regions = self.find_color_regions(radar_image_rgb) | |
| # Create annotated image | |
| annotated_image = self.create_annotated_image(radar_image_rgb, regions) | |
| # Save annotated image | |
| output_filename = radar_image_path.replace('.png', '_analyzed.png') | |
| annotated_bgr = cv2.cvtColor(annotated_image, cv2.COLOR_RGB2BGR) | |
| cv2.imwrite(output_filename, annotated_bgr) | |
| # Create hover data for map integration with EXPANDED WMS bounds | |
| # Extended bounds to cover ALL North American radar data including Alaska and southeastern US | |
| wms_bounds = (20.0, 85.0, -170.0, -40.0) # lat_min, lat_max, lon_min, lon_max | |
| hover_data = self.create_hover_data(analysis['precipitation_map'], wms_bounds) | |
| # Prepare results | |
| results = { | |
| 'analysis': analysis, | |
| 'regions': regions, | |
| 'output_file': output_filename, | |
| 'input_file': radar_image_path, | |
| 'legend_file': legend_path, | |
| 'hover_data': hover_data | |
| } | |
| return results | |
| except Exception as e: | |
| raise Exception(f"Radar analysis failed: {str(e)}") | |
| def convert_colors_to_american_scale(self, radar_image: np.ndarray) -> np.ndarray: | |
| """ | |
| Convert Canadian radar colors to American radar color scale. | |
| Uses the same dBZ values but maps them to American colors. | |
| """ | |
| print("🔄 Converting Canadian radar colors to American color scale...") | |
| # Load American color scale | |
| try: | |
| with open("us_radar_legend_data.json", 'r') as f: | |
| us_legend = json.load(f) | |
| us_colors = [] | |
| for item in us_legend['colors']: | |
| us_colors.append({ | |
| 'min_dbz': item['min_value'], | |
| 'max_dbz': item['max_value'], | |
| 'rgb': tuple(item['rgb']), | |
| 'name': item['name'] | |
| }) | |
| except Exception as e: | |
| print(f"❌ Could not load US color scale: {e}") | |
| return radar_image # Return original if conversion fails | |
| height, width = radar_image.shape[:2] | |
| converted_image = radar_image.copy() | |
| print(f"🎨 Converting {width}x{height} image to American colors...") | |
| # For each pixel, find its dBZ value and map to American color | |
| conversion_count = 0 | |
| for y in range(height): | |
| for x in range(width): | |
| pixel_rgb = tuple(int(c) for c in radar_image[y, x]) | |
| # Find the Canadian dBZ value for this pixel | |
| canadian_match = self.find_precipitation_value(pixel_rgb) | |
| if canadian_match: | |
| # Get the dBZ value | |
| dbz_value = (canadian_match.min_value + canadian_match.max_value) / 2 | |
| # Find corresponding American color for this dBZ value | |
| american_color = None | |
| for us_color in us_colors: | |
| if us_color['min_dbz'] <= dbz_value <= us_color['max_dbz']: | |
| american_color = us_color['rgb'] | |
| break | |
| # If no exact match, find closest | |
| if not american_color: | |
| best_match = None | |
| min_distance = float('inf') | |
| for us_color in us_colors: | |
| center_dbz = (us_color['min_dbz'] + us_color['max_dbz']) / 2 | |
| distance = abs(dbz_value - center_dbz) | |
| if distance < min_distance: | |
| min_distance = distance | |
| best_match = us_color | |
| if best_match: | |
| american_color = best_match['rgb'] | |
| # Apply the American color | |
| if american_color: | |
| converted_image[y, x] = american_color | |
| conversion_count += 1 | |
| print(f"✅ Converted {conversion_count:,} pixels to American color scale") | |
| return converted_image | |
| def create_hover_data(self, dbz_map: np.ndarray, image_bounds: tuple = None) -> Dict: | |
| """ | |
| Create MAXIMUM RESOLUTION hover data for map integration. | |
| Include every precipitation pixel across the ENTIRE radar image coverage. | |
| No artificial caps or limits - full resolution coverage. | |
| """ | |
| height, width = dbz_map.shape | |
| hover_points = [] | |
| # Use provided image bounds or default Canada radar bounds | |
| if image_bounds: | |
| lat_min, lat_max, lon_min, lon_max = image_bounds | |
| print(f"Using provided image bounds: {lat_min}°N to {lat_max}°N, {lon_min}°W to {lon_max}°W") | |
| else: | |
| # Default Canada radar bounds - EXACT same as working version for proper alignment | |
| lat_min, lat_max = 42.0, 84.0 | |
| lon_min, lon_max = -142.0, -52.0 | |
| print(f"Using default bounds: {lat_min}°N to {lat_max}°N, {lon_min}°W to {lon_max}°W") | |
| print(f"Creating MAXIMUM RESOLUTION hover data from {width}x{height} dBZ map...") | |
| # First, find the actual data bounds within the image | |
| precip_rows, precip_cols = np.where(dbz_map > 0) | |
| if len(precip_rows) > 0: | |
| data_min_row, data_max_row = precip_rows.min(), precip_rows.max() | |
| data_min_col, data_max_col = precip_cols.min(), precip_cols.max() | |
| print(f"📊 Actual data bounds: rows {data_min_row}-{data_max_row}, cols {data_min_col}-{data_max_col}") | |
| print(f"📊 Data coverage: {len(precip_rows):,} precipitation pixels found") | |
| else: | |
| print("❌ No precipitation data found in image") | |
| return {'points': [], 'total_points': 0, 'resolution': 'none'} | |
| # SCAN ENTIRE IMAGE with maximum resolution - no bounding box limitations | |
| # Process EVERY SINGLE PIXEL to capture all unique color variations | |
| pixel_count = 0 | |
| unique_colors = set() | |
| print(f"🔍 Scanning ENTIRE {width}x{height} image for ALL precipitation pixels...") | |
| for y in range(height): | |
| if y % 50 == 0: # Progress logging | |
| progress = (y / height) * 100 | |
| print(f" Full image scan: {progress:.1f}% - {pixel_count} points, {len(unique_colors)} unique colors") | |
| for x in range(width): | |
| dbz_value = dbz_map[y, x] | |
| # Include ALL precipitation pixels across ENTIRE image | |
| if dbz_value > 0: # Capture any precipitation value | |
| # Convert pixel coordinates to lat/lon - cover ENTIRE image bounds | |
| lat = lat_max - (y / height) * (lat_max - lat_min) | |
| lon = lon_min + (x / width) * (lon_max - lon_min) | |
| # Track unique dBZ values for maximum detail | |
| unique_colors.add(round(dbz_value, 3)) | |
| hover_points.append({ | |
| 'lat': round(lat, 8), # Ultra-high precision coordinates | |
| 'lon': round(lon, 8), # Ultra-high precision coordinates | |
| 'dbz': round(dbz_value, 3), # Ultra-high precision dBZ | |
| 'intensity': self.categorize_dbz_simple(dbz_value), | |
| 'pixel_x': x, | |
| 'pixel_y': y | |
| }) | |
| pixel_count += 1 | |
| print(f"📊 Full image scan complete:") | |
| print(f" - Total pixels scanned: {width * height:,}") | |
| print(f" - Precipitation pixels found: {pixel_count:,}") | |
| print(f" - Unique dBZ values: {len(unique_colors)}") | |
| print(f" - Coverage area: Full {width}x{height} image") | |
| print(f"✅ Created {len(hover_points)} MAXIMUM RESOLUTION hover points") | |
| print(f"✅ Coverage: ENTIRE radar image ({width}x{height} pixels)") | |
| print(f"✅ Resolution: EVERY precipitation pixel included") | |
| print(f"✅ No caps or limits applied") | |
| return { | |
| 'points': hover_points, | |
| 'total_points': len(hover_points), | |
| 'resolution': 'maximum', # Every single pixel included | |
| 'coverage': 'full_image', # Complete radar image coverage | |
| 'precision': 'ultra_high' # 8-decimal coordinate precision | |
| } | |
| def categorize_dbz_simple(self, dbz_value: float) -> str: | |
| """Extended dBZ categorization for hover display - no caps, full range.""" | |
| if dbz_value < 1.0: | |
| return "Very Light" | |
| elif dbz_value < 4.0: | |
| return "Light" | |
| elif dbz_value < 8.0: | |
| return "Light-Moderate" | |
| elif dbz_value < 16.0: | |
| return "Moderate" | |
| elif dbz_value < 32.0: | |
| return "Heavy" | |
| elif dbz_value < 48.0: | |
| return "Very Heavy" | |
| elif dbz_value < 64.0: | |
| return "Intense" | |
| elif dbz_value < 80.0: | |
| return "Severe" | |
| elif dbz_value < 100.0: | |
| return "Extreme" | |
| elif dbz_value < 150.0: | |
| return "Violent" | |
| else: | |
| return f"Exceptional ({dbz_value:.1f} dBZ)" | |
# Keep backward compatibility
class CanadianRadarAnalyzer(RadarAnalyzer):
    """Backward compatibility class for Canadian radar."""

    def __init__(self, legend_path: str = "radar_legendwellcropped.png"):
        # Delegate to the generic analyzer, pinned to the Canadian network.
        super().__init__(radar_type="canadian", legend_path=legend_path)
| if __name__ == "__main__": | |
| # Test the analyzer | |
| analyzer = CanadianRadarAnalyzer() | |
| # Load existing radar image for testing | |
| test_image = cv2.imread('test_radar_proper.png') | |
| if test_image is not None: | |
| test_image_rgb = cv2.cvtColor(test_image, cv2.COLOR_BGR2RGB) | |
| print("Analyzing radar image...") | |
| analysis = analyzer.analyze_radar_pixels(test_image_rgb) | |
| print(f"Found precipitation in {analysis['precipitation_pixels']} pixels") | |
| print("Finding regions...") | |
| regions = analyzer.find_color_regions(test_image_rgb) | |
| print(f"Found {len(regions)} precipitation regions") | |
| print("Creating annotated image...") | |
| annotated = analyzer.create_annotated_image(test_image_rgb, regions) | |
| # Save results | |
| cv2.imwrite('annotated_radar.png', cv2.cvtColor(annotated, cv2.COLOR_RGB2BGR)) | |
| analyzer.save_analysis_results(analysis) | |
| print("Analysis complete! Check annotated_radar.png and radar_analysis.json") |