Spaces:
Sleeping
Sleeping
| import numpy as np | |
| import cv2 | |
| from PIL import Image | |
| from typing import Dict, List, Tuple, Optional | |
| import json | |
| class PreciseRadarColorAnalyzer: | |
| """ | |
| Extracts precise color mappings from the Canadian radar legend | |
| and performs accurate dBZ analysis on radar images. | |
| """ | |
| def __init__(self, legend_path: str = "canadaradarlegend_point1_to_200dbz.png"): | |
| self.legend_path = legend_path | |
| self.color_map = self.extract_precise_colors() | |
| def extract_precise_colors(self) -> List[Tuple[float, Tuple[int, int, int]]]: | |
| """ | |
| Extract precise color-to-dBZ mappings from the legend image. | |
| Returns list of (dBZ_value, RGB_color) tuples. | |
| """ | |
| # Load legend image | |
| legend = cv2.imread(self.legend_path) | |
| if legend is None: | |
| raise ValueError(f"Could not load legend: {self.legend_path}") | |
| legend_rgb = cv2.cvtColor(legend, cv2.COLOR_BGR2RGB) | |
| height, width = legend_rgb.shape[:2] | |
| # Sample colors from the legend (assuming vertical gradient) | |
| color_samples = [] | |
| # dBZ values from 0.1 to 200 (logarithmic scale typical for radar) | |
| dbz_values = [ | |
| 0.1, 0.2, 0.5, 1.0, 2.0, 4.0, 8.0, 12.0, 16.0, 24.0, | |
| 32.0, 50.0, 64.0, 100.0, 125.0, 150.0, 175.0, 200.0 | |
| ] | |
| # Sample colors from center column of legend | |
| center_x = width // 2 | |
| for i, dbz in enumerate(dbz_values): | |
| # Map dBZ value to position in legend (top to bottom) | |
| # Assuming legend goes from high dBZ (top) to low dBZ (bottom) | |
| progress = i / (len(dbz_values) - 1) | |
| y_pos = int(progress * (height - 1)) | |
| # Sample color from center of that row | |
| rgb_color = tuple(legend_rgb[y_pos, center_x]) | |
| color_samples.append((dbz, rgb_color)) | |
| # Also sample every 5th pixel for more granular color mapping | |
| detailed_samples = [] | |
| for y in range(0, height, 5): | |
| # Map pixel position back to approximate dBZ value | |
| progress = y / (height - 1) | |
| # Reverse mapping: top = high dBZ, bottom = low dBZ | |
| dbz_approx = 200.0 - (progress * 199.9) # 200 to 0.1 | |
| rgb_color = tuple(legend_rgb[y, center_x]) | |
| detailed_samples.append((dbz_approx, rgb_color)) | |
| # Combine and sort by dBZ value | |
| all_samples = color_samples + detailed_samples | |
| all_samples.sort(key=lambda x: x[0]) | |
| return all_samples | |
| def find_closest_dbz(self, pixel_rgb: Tuple[int, int, int]) -> Optional[float]: | |
| """ | |
| Find the closest dBZ value for a given RGB pixel. | |
| """ | |
| if not self.color_map: | |
| return None | |
| min_distance = float('inf') | |
| closest_dbz = None | |
| for dbz, color in self.color_map: | |
| # Calculate Euclidean distance in RGB space | |
| distance = np.sqrt(sum((p - c) ** 2 for p, c in zip(pixel_rgb, color))) | |
| if distance < min_distance: | |
| min_distance = distance | |
| closest_dbz = dbz | |
| # Only return match if reasonably close (within color tolerance) | |
| return closest_dbz if min_distance < 25 else None | |
| def categorize_dbz(self, dbz_value: float) -> str: | |
| """Categorize dBZ value into intensity levels.""" | |
| if dbz_value < 1.0: | |
| return "Very Light (0.1-1.0 dBZ)" | |
| elif dbz_value < 4.0: | |
| return "Light (1.0-4.0 dBZ)" | |
| elif dbz_value < 12.0: | |
| return "Light-Moderate (4.0-12.0 dBZ)" | |
| elif dbz_value < 24.0: | |
| return "Moderate (12.0-24.0 dBZ)" | |
| elif dbz_value < 32.0: | |
| return "Moderate-Heavy (24.0-32.0 dBZ)" | |
| elif dbz_value < 50.0: | |
| return "Heavy (32.0-50.0 dBZ)" | |
| elif dbz_value < 64.0: | |
| return "Very Heavy (50.0-64.0 dBZ)" | |
| elif dbz_value < 100.0: | |
| return "Extreme (64.0-100.0 dBZ)" | |
| else: | |
| return "Severe (100.0+ dBZ)" | |
| def analyze_radar_image(self, radar_path: str) -> Dict: | |
| """ | |
| Perform precise dBZ analysis on radar image. | |
| """ | |
| # Load radar image | |
| radar = cv2.imread(radar_path) | |
| if radar is None: | |
| raise ValueError(f"Could not load radar: {radar_path}") | |
| radar_rgb = cv2.cvtColor(radar, cv2.COLOR_BGR2RGB) | |
| height, width = radar_rgb.shape[:2] | |
| # Initialize analysis data | |
| dbz_map = np.zeros((height, width), dtype=float) | |
| pixel_stats = {} | |
| total_precipitation_pixels = 0 | |
| print(f"Analyzing {width}x{height} radar image...") | |
| # Analyze each pixel | |
| for y in range(height): | |
| if y % 50 == 0: # Progress indicator | |
| print(f"Processing row {y}/{height}") | |
| for x in range(width): | |
| pixel_rgb = tuple(int(c) for c in radar_rgb[y, x]) | |
| # Skip very dark pixels (background) | |
| if sum(pixel_rgb) < 30: | |
| continue | |
| # Find closest dBZ value | |
| dbz_value = self.find_closest_dbz(pixel_rgb) | |
| if dbz_value is not None: | |
| dbz_map[y, x] = dbz_value | |
| total_precipitation_pixels += 1 | |
| # Categorize for statistics | |
| category = self.categorize_dbz(dbz_value) | |
| pixel_stats[category] = pixel_stats.get(category, 0) + 1 | |
| total_pixels = height * width | |
| coverage_percent = (total_precipitation_pixels / total_pixels) * 100 | |
| print(f"Analysis complete! Found precipitation in {total_precipitation_pixels:,} pixels") | |
| return { | |
| 'dbz_map': dbz_map, | |
| 'pixel_statistics': pixel_stats, | |
| 'total_pixels': total_pixels, | |
| 'precipitation_pixels': total_precipitation_pixels, | |
| 'precipitation_percentage': coverage_percent, | |
| 'intensity_levels': pixel_stats, | |
| 'color_mapping_samples': len(self.color_map) | |
| } | |
| def find_precipitation_regions(self, radar_rgb: np.ndarray, dbz_map: np.ndarray, min_region_size: int = 50) -> List[Dict]: | |
| """ | |
| Find connected regions of similar precipitation intensity. | |
| """ | |
| height, width = radar_rgb.shape[:2] | |
| visited = np.zeros((height, width), dtype=bool) | |
| regions = [] | |
| def flood_fill(start_y: int, start_x: int, target_dbz: float, tolerance: float = 5.0) -> List[Tuple[int, int]]: | |
| """Flood fill to find connected pixels with similar dBZ values.""" | |
| stack = [(start_y, start_x)] | |
| region_pixels = [] | |
| while stack: | |
| y, x = stack.pop() | |
| if (y < 0 or y >= height or x < 0 or x >= width or | |
| visited[y, x] or dbz_map[y, x] == 0): | |
| continue | |
| current_dbz = dbz_map[y, x] | |
| if abs(current_dbz - target_dbz) > tolerance: | |
| continue | |
| visited[y, x] = True | |
| region_pixels.append((y, x)) | |
| # Add 4-connected neighbors | |
| for dy, dx in [(-1,0), (1,0), (0,-1), (0,1)]: | |
| stack.append((y+dy, x+dx)) | |
| return region_pixels | |
| print("Finding precipitation regions...") | |
| # Find regions | |
| for y in range(height): | |
| for x in range(width): | |
| if not visited[y, x] and dbz_map[y, x] > 0: | |
| target_dbz = dbz_map[y, x] | |
| region_pixels = flood_fill(y, x, target_dbz) | |
| if len(region_pixels) >= min_region_size: | |
| # Calculate region statistics | |
| dbz_values = [dbz_map[py, px] for py, px in region_pixels] | |
| avg_dbz = np.mean(dbz_values) | |
| # Calculate bounding box | |
| ys = [py for py, px in region_pixels] | |
| xs = [px for py, px in region_pixels] | |
| bbox = (min(xs), min(ys), max(xs), max(ys)) | |
| regions.append({ | |
| 'pixels': len(region_pixels), | |
| 'avg_dbz': avg_dbz, | |
| 'category': self.categorize_dbz(avg_dbz), | |
| 'bbox': bbox, | |
| 'center': (np.mean(xs), np.mean(ys)) | |
| }) | |
| print(f"Found {len(regions)} precipitation regions") | |
| return regions | |
| def create_annotated_image(self, radar_path: str, analysis: Dict, regions: List[Dict]) -> str: | |
| """ | |
| Create annotated radar image with dBZ values labeled. | |
| """ | |
| # Load original image | |
| radar = cv2.imread(radar_path) | |
| radar_rgb = cv2.cvtColor(radar, cv2.COLOR_BGR2RGB) | |
| # Convert to PIL for text drawing | |
| img_pil = Image.fromarray(radar_rgb) | |
| from PIL import ImageDraw, ImageFont | |
| draw = ImageDraw.Draw(img_pil) | |
| # Try to load a font | |
| try: | |
| font = ImageFont.truetype("/System/Library/Fonts/Arial.ttf", 12) | |
| except: | |
| font = ImageFont.load_default() | |
| # Annotate regions | |
| for i, region in enumerate(regions): | |
| if region['pixels'] > 100: # Only annotate significant regions | |
| x, y = region['center'] | |
| text = f"{region['avg_dbz']:.1f} dBZ" | |
| # Draw text with background | |
| text_bbox = draw.textbbox((int(x), int(y)), text, font=font) | |
| draw.rectangle(text_bbox, fill=(0, 0, 0, 128)) | |
| draw.text((int(x), int(y)), text, fill=(255, 255, 255), font=font) | |
| # Draw bounding box | |
| bbox = region['bbox'] | |
| draw.rectangle(bbox, outline=(255, 255, 0), width=2) | |
| # Save annotated image | |
| output_path = radar_path.replace('.png', '_precise_analysis.png') | |
| annotated_array = np.array(img_pil) | |
| annotated_bgr = cv2.cvtColor(annotated_array, cv2.COLOR_RGB2BGR) | |
| cv2.imwrite(output_path, annotated_bgr) | |
| return output_path | |
| def test_precise_analyzer(): | |
| """Test the precise color analyzer.""" | |
| analyzer = PreciseRadarColorAnalyzer() | |
| print(f"Extracted {len(analyzer.color_map)} color samples from legend") | |
| # Test on current radar image | |
| radar_files = ["test_radar_proper.png", "current_radar_fetch.png"] | |
| for radar_file in radar_files: | |
| try: | |
| print(f"\nAnalyzing {radar_file}...") | |
| analysis = analyzer.analyze_radar_image(radar_file) | |
| print(f"Results:") | |
| print(f"- Total pixels: {analysis['total_pixels']:,}") | |
| print(f"- Precipitation pixels: {analysis['precipitation_pixels']:,}") | |
| print(f"- Coverage: {analysis['precipitation_percentage']:.2f}%") | |
| print(f"- Categories found:") | |
| for category, count in analysis['pixel_statistics'].items(): | |
| print(f" * {category}: {count:,} pixels") | |
| # Find regions | |
| radar = cv2.imread(radar_file) | |
| radar_rgb = cv2.cvtColor(radar, cv2.COLOR_BGR2RGB) | |
| regions = analyzer.find_precipitation_regions(radar_rgb, analysis['dbz_map']) | |
| # Create annotated image | |
| output_file = analyzer.create_annotated_image(radar_file, analysis, regions) | |
| print(f"- Annotated image saved: {output_file}") | |
| break # Success, use this file | |
| except Exception as e: | |
| print(f"Error with {radar_file}: {e}") | |
| continue | |
| if __name__ == "__main__": | |
| test_precise_analyzer() |