radar_wizard / radar_analyzer.py
nakas's picture
Implement color conversion system: Canadian radar with American color toggle
00ee5cd
import numpy as np
import cv2
from PIL import Image, ImageDraw, ImageFont
import requests
import colorsys
from typing import Dict, List, Tuple, Optional
import json
from dataclasses import dataclass
@dataclass
class ColorRange:
"""Represents a precipitation intensity range with its color."""
min_value: float
max_value: float
rgb: Tuple[int, int, int]
name: str
class RadarAnalyzer:
"""
Dual radar analyzer for Canadian and US weather radar data.
Analyzes dBZ reflectivity values using color mapping from radar legends.
"""
def __init__(self, radar_type: str = "canadian", legend_path: str = None):
self.radar_type = radar_type.lower()
if self.radar_type == "canadian":
# ECCC Radar WMS endpoints
self.wms_base_url = "https://geo.weather.gc.ca/geomet"
self.radar_layer = "RADAR_1KM_RRAI" # 1km Rain Radar
self.default_legend = "radar_legendwellcropped.png"
self.bounds = (20.0, 85.0, -170.0, -40.0) # lat_min, lat_max, lon_min, lon_max
elif self.radar_type == "american" or self.radar_type == "us":
# NOAA/NWS Radar endpoints (will be added)
self.wms_base_url = "https://nowcoast.noaa.gov/arcgis/services/nowcoast/radar_meteo_imagery_nexrad_time/MapServer/WMSServer"
self.radar_layer = "1" # NEXRAD reflectivity
self.default_legend = "us_radar_legend_cropped.png" # Will be provided by user
self.bounds = (20.0, 50.0, -130.0, -65.0) # Continental US bounds
else:
raise ValueError(f"Unsupported radar type: {radar_type}. Use 'canadian' or 'american'")
# Load legend data
self.legend_path = legend_path or self.default_legend
self.precipitation_scale = self._load_precomputed_legend_data()
# Color tolerance for matching (RGB distance)
self.color_tolerance = 40
# Initialize reference colors from legend
self.reference_colors = {}
if not self.precipitation_scale:
# Fallback to manual colors if legend extraction fails
self._extract_legend_colors()
def _load_precomputed_legend_data(self) -> List[ColorRange]:
"""
Load pre-computed legend data from JSON file.
Much faster than processing legend image at runtime.
"""
try:
# Determine legend data file based on radar type
if self.radar_type == "american" or self.radar_type == "us":
legend_data_file = "us_radar_legend_data.json"
else:
legend_data_file = "radar_legend_data.json"
with open(legend_data_file, 'r') as f:
legend_data = json.load(f)
color_ranges = []
for item in legend_data['colors']:
color_ranges.append(ColorRange(
item['min_value'],
item['max_value'],
tuple(item['rgb']),
item['name']
))
print(f"Loaded {len(color_ranges)} pre-computed legend colors")
return color_ranges
except Exception as e:
print(f"Error loading pre-computed legend data: {e}")
if self.radar_type == "american" and "us_radar_legend_data.json" in str(e):
print("US radar legend data not found. Please provide a cropped US legend and run create_us_legend_data.py")
return []
else:
print("Falling back to runtime legend extraction...")
return self._load_or_extract_legend_colors()
def _load_or_extract_legend_colors(self) -> List[ColorRange]:
"""
Load legend colors from cache or extract if not cached.
"""
import os
import hashlib
cache_file = "legend_cache.json"
# Check if legend file exists
if not os.path.exists(self.legend_path):
print(f"Legend file not found: {self.legend_path}")
return []
# Calculate legend file hash for cache validation
with open(self.legend_path, 'rb') as f:
legend_hash = hashlib.md5(f.read()).hexdigest()
# Try to load from cache
if os.path.exists(cache_file):
try:
with open(cache_file, 'r') as f:
cache_data = json.load(f)
if cache_data.get('legend_hash') == legend_hash:
print(f"Loading cached legend colors ({len(cache_data['colors'])} entries)")
color_ranges = []
for item in cache_data['colors']:
color_ranges.append(ColorRange(
item['min_value'], item['max_value'],
tuple(item['rgb']), item['name']
))
return color_ranges
else:
print("Legend file changed, re-extracting colors...")
except Exception as e:
print(f"Error loading cache: {e}")
# Extract and cache colors
print("Extracting legend colors (first time or cache miss)...")
color_ranges = self._extract_precise_legend_colors()
# Save to cache
try:
cache_data = {
'legend_hash': legend_hash,
'colors': []
}
for cr in color_ranges:
cache_data['colors'].append({
'min_value': cr.min_value,
'max_value': cr.max_value,
'rgb': list(cr.rgb),
'name': cr.name
})
with open(cache_file, 'w') as f:
json.dump(cache_data, f, indent=2)
print(f"Cached {len(color_ranges)} legend colors")
except Exception as e:
print(f"Error saving cache: {e}")
return color_ranges
def _extract_precise_legend_colors(self) -> List[ColorRange]:
"""
Extract precise color mappings from the Canadian radar legend image.
"""
try:
import os
if not os.path.exists(self.legend_path):
print(f"Legend file not found: {self.legend_path}, using fallback colors")
return []
# Load legend image
legend = cv2.imread(self.legend_path)
if legend is None:
print(f"Could not load legend: {self.legend_path}")
return []
legend_rgb = cv2.cvtColor(legend, cv2.COLOR_BGR2RGB)
height, width = legend_rgb.shape[:2]
# Sample colors from center column of legend
center_x = width // 2
color_ranges = []
# Sample every pixel from the legend for maximum precision
print(f"Sampling colors from legend (height: {height})...")
# Sample every pixel for maximum precision - no artificial limits
for y in range(0, height, 1): # Every pixel for maximum sampling
# Map pixel position to dBZ value (bottom = low, top = high)
progress = (height - 1 - y) / (height - 1) # Invert: bottom = 0, top = 1
dbz_value = 0.01 + (499.99 * progress) # 0.01 to 500 dBZ - extended range
# Sample color from center of legend
rgb_color = tuple(int(c) for c in legend_rgb[y, center_x])
# Skip very dark or very light colors (background)
color_sum = sum(rgb_color)
if color_sum < 30 or color_sum > 750:
continue
# Categorize dBZ range for naming
if dbz_value < 1.0:
name = "Very Light"
elif dbz_value < 2.0:
name = "Light"
elif dbz_value < 4.0:
name = "Light-Moderate"
elif dbz_value < 8.0:
name = "Moderate"
elif dbz_value < 12.0:
name = "Moderate-Heavy"
elif dbz_value < 24.0:
name = "Heavy"
elif dbz_value < 32.0:
name = "Very Heavy"
elif dbz_value < 50.0:
name = "Extreme"
elif dbz_value < 64.0:
name = "Severe"
elif dbz_value < 100.0:
name = "Intense"
elif dbz_value < 150.0:
name = "Violent"
else:
name = "Exceptional"
# Create tight range for precise matching - no upper limit
range_size = 1.0 # Even smaller range for maximum precision
min_dbz = max(0.01, dbz_value - range_size/2)
max_dbz = dbz_value + range_size/2 # No artificial cap
color_ranges.append(ColorRange(min_dbz, max_dbz, rgb_color, name))
print(f"Extracted {len(color_ranges)} precise color ranges from legend:")
for i, cr in enumerate(color_ranges):
print(f" {i+1}. {cr.name}: {cr.min_value}-{cr.max_value} dBZ -> RGB{cr.rgb}")
return color_ranges
except Exception as e:
print(f"Error extracting legend colors: {e}")
return []
def _extract_legend_colors(self):
"""Extract precise colors from the downloaded radar legend."""
try:
legend_img = cv2.imread('radar_legend.png')
if legend_img is None:
print("Warning: Could not load radar_legend.png, using default colors")
return
# Convert BGR to RGB
legend_rgb = cv2.cvtColor(legend_img, cv2.COLOR_BGR2RGB)
# Sample colors from the legend bar (left side of image)
height, width = legend_rgb.shape[:2]
color_bar_width = min(50, width // 4) # Sample from left quarter
# Extract colors at regular intervals
num_samples = len(self.precipitation_scale)
for i, scale_item in enumerate(self.precipitation_scale):
y_pos = int((i / (num_samples - 1)) * (height - 20)) + 10
# Sample from multiple pixels and average
color_samples = []
for x in range(5, color_bar_width, 5):
if y_pos < height and x < width:
color_samples.append(legend_rgb[y_pos, x])
if color_samples:
avg_color = np.mean(color_samples, axis=0).astype(int)
self.reference_colors[scale_item.name] = tuple(avg_color)
# Update the RGB in our scale
scale_item.rgb = tuple(avg_color)
except Exception as e:
print(f"Error extracting legend colors: {e}")
def fetch_current_radar(self, bbox: Tuple[float, float, float, float] = (-150, 40, -50, 85),
width: int = 1200, height: int = 800) -> Optional[np.ndarray]:
"""
Fetch the most recent Canadian radar image.
Args:
bbox: Bounding box (west, south, east, north) in EPSG:4326
width: Image width in pixels
height: Image height in pixels
Returns:
RGB numpy array of the radar image
"""
params = {
'SERVICE': 'WMS',
'VERSION': '1.3.0',
'REQUEST': 'GetMap',
'BBOX': f'{bbox[0]},{bbox[1]},{bbox[2]},{bbox[3]}',
'CRS': 'EPSG:4326',
'WIDTH': width,
'HEIGHT': height,
'LAYERS': self.radar_layer,
'FORMAT': 'image/png',
'TRANSPARENT': 'true'
}
try:
response = requests.get(self.wms_base_url, params=params, timeout=30)
response.raise_for_status()
# Convert to numpy array
img_pil = Image.open(response.content)
img_array = np.array(img_pil)
# Handle RGBA by removing alpha channel
if img_array.shape[2] == 4:
img_array = img_array[:, :, :3]
return img_array
except Exception as e:
print(f"Error fetching radar data: {e}")
return None
def color_distance(self, color1: Tuple[int, int, int], color2: Tuple[int, int, int]) -> float:
"""Calculate Euclidean distance between two RGB colors."""
# Convert to int to prevent overflow
c1_int = [int(c) for c in color1]
c2_int = [int(c) for c in color2]
return np.sqrt(sum((c1 - c2) ** 2 for c1, c2 in zip(c1_int, c2_int)))
def find_precipitation_value(self, rgb_color: Tuple[int, int, int]) -> Optional[ColorRange]:
"""
Find the precipitation intensity for a given RGB color.
Args:
rgb_color: RGB tuple (r, g, b)
Returns:
ColorRange object with precipitation data, or None if no match
"""
best_match = None
min_distance = float('inf')
for scale_item in self.precipitation_scale:
distance = self.color_distance(rgb_color, scale_item.rgb)
if distance < min_distance:
min_distance = distance
best_match = scale_item
# Debug logging for first few unmatched pixels
if hasattr(self, '_debug_count'):
self._debug_count += 1
else:
self._debug_count = 1
if self._debug_count <= 5 and min_distance > self.color_tolerance:
print(f"Debug: Pixel {rgb_color} closest match: {best_match.name if best_match else 'None'} "
f"(distance: {min_distance:.1f}, tolerance: {self.color_tolerance})")
return best_match if min_distance <= self.color_tolerance else None
def analyze_radar_pixels(self, radar_image: np.ndarray) -> Dict:
"""
Analyze every pixel in the radar image for precipitation intensity.
Args:
radar_image: RGB numpy array of radar data
Returns:
Dictionary with analysis results
"""
height, width = radar_image.shape[:2]
precipitation_map = np.zeros((height, width), dtype=float)
color_map = np.zeros((height, width, 3), dtype=int)
# Analyze each pixel
pixel_stats = {}
for scale_item in self.precipitation_scale:
pixel_stats[scale_item.name] = 0
print(f"Starting FAST pixel analysis of {width}x{height} image...")
# OPTIMIZATION: Group identical colors first, analyze once per unique color
print("Pass 1: Collecting unique colors (skipping white pixels)...")
color_groups = {} # RGB -> list of (y,x) positions
total_non_bg = 0
for y in range(height):
for x in range(width):
pixel_rgb = tuple(int(c) for c in radar_image[y, x])
# Skip background pixels (black, white, or very light gray)
pixel_sum = sum(pixel_rgb)
if (pixel_sum < 30 or # Very dark pixels (black background)
pixel_sum > 750 or # Very bright pixels (white background)
(pixel_rgb[0] > 240 and pixel_rgb[1] > 240 and pixel_rgb[2] > 240)): # Near-white
continue
total_non_bg += 1
if pixel_rgb not in color_groups:
color_groups[pixel_rgb] = []
color_groups[pixel_rgb].append((y, x))
print(f"Found {len(color_groups)} unique colors in {total_non_bg:,} non-background pixels")
# OPTIMIZATION: Analyze each unique color only once
print("Pass 2: Analyzing unique colors...")
analyzed_colors = {} # RGB -> (match, dbz_value)
sample_pixels = list(color_groups.keys())[:20] # Sample for debugging
for pixel_rgb in color_groups.keys():
match = self.find_precipitation_value(pixel_rgb)
if match:
dbz_value = (match.min_value + match.max_value) / 2
analyzed_colors[pixel_rgb] = (match, dbz_value)
print(f"Matched {len(analyzed_colors)}/{len(color_groups)} unique colors to precipitation")
# OPTIMIZATION: Apply results to all positions at once
print("Pass 3: Applying results to all pixel positions...")
for pixel_rgb, (match, dbz_value) in analyzed_colors.items():
positions = color_groups[pixel_rgb]
pixel_stats[match.name] += len(positions)
# Apply to precipitation and color maps efficiently
for y, x in positions:
precipitation_map[y, x] = dbz_value
color_map[y, x] = match.rgb
print(f"Pixel analysis complete!")
print(f"Sample non-background pixels found: {sample_pixels[:10]}")
print(f"Total non-background pixels examined: {len(sample_pixels)}")
print(f"Precipitation pixels detected: {sum(pixel_stats.values()) if pixel_stats else 0}")
# Show unique colors found
unique_colors = list(set(sample_pixels))[:10]
print(f"Unique non-background colors: {unique_colors}")
total_pixels = height * width
precipitation_pixels = sum(pixel_stats.values())
precipitation_percentage = (precipitation_pixels / total_pixels) * 100 if total_pixels > 0 else 0
return {
'precipitation_map': precipitation_map,
'color_map': color_map,
'pixel_statistics': pixel_stats,
'total_pixels': total_pixels,
'precipitation_pixels': precipitation_pixels,
'precipitation_percentage': precipitation_percentage,
'intensity_levels': pixel_stats
}
def find_color_regions(self, radar_image: np.ndarray, min_region_size: int = 1, fast_mode: bool = False) -> List[Dict]:
"""
Find connected regions of the same precipitation intensity.
Args:
radar_image: RGB numpy array of radar data
min_region_size: Minimum number of pixels for a region
Returns:
List of region dictionaries with boundaries and values
"""
height, width = radar_image.shape[:2]
visited = np.zeros((height, width), dtype=bool)
regions = []
if fast_mode:
print("Using HIGH-RESOLUTION region mode - separate regions per unique color")
# Create individual regions for each unique color (highest resolution)
from collections import defaultdict
color_groups = defaultdict(list)
for y in range(height):
for x in range(width):
pixel_rgb = tuple(int(c) for c in radar_image[y, x])
pixel_sum = sum(pixel_rgb)
if not (pixel_sum < 30 or pixel_sum > 750 or
(pixel_rgb[0] > 240 and pixel_rgb[1] > 240 and pixel_rgb[2] > 240)):
match = self.find_precipitation_value(pixel_rgb)
if match:
# Group by exact RGB color for maximum resolution
color_groups[pixel_rgb].append((y, x, match))
print(f"Found {len(color_groups)} unique precipitation colors")
for color_rgb, pixel_data in color_groups.items():
if len(pixel_data) >= min_region_size:
pixels = [(p[0], p[1]) for p in pixel_data]
match = pixel_data[0][2] # Get precipitation match
ys = [p[0] for p in pixels]
xs = [p[1] for p in pixels]
# Calculate precise dBZ value for this exact color
dbz_value = (match.min_value + match.max_value) / 2
regions.append({
'pixels': pixels,
'precipitation': match,
'exact_color': color_rgb,
'dbz_value': dbz_value,
'bbox': {
'min_y': min(ys),
'max_y': max(ys),
'min_x': min(xs),
'max_x': max(xs)
},
'pixel_count': len(pixels),
'center': (int(np.mean(ys)), int(np.mean(xs)))
})
print(f"High-resolution mode: Created {len(regions)} color-specific regions")
return regions
print(f"Starting DETAILED region analysis on {width}x{height} image...")
# OPTIMIZATION: Pre-identify precipitation pixels to avoid scanning empty areas
precip_pixels = []
for y in range(height):
for x in range(width):
pixel_rgb = tuple(int(c) for c in radar_image[y, x])
pixel_sum = sum(pixel_rgb)
if not (pixel_sum < 30 or pixel_sum > 750 or
(pixel_rgb[0] > 240 and pixel_rgb[1] > 240 and pixel_rgb[2] > 240)):
match = self.find_precipitation_value(pixel_rgb)
if match:
precip_pixels.append((y, x))
print(f"Found {len(precip_pixels)} precipitation pixels to analyze for regions")
def flood_fill(start_y: int, start_x: int, target_color: Tuple[int, int, int]) -> List[Tuple[int, int]]:
"""Optimized flood fill algorithm."""
stack = [(start_y, start_x)]
region_pixels = []
while stack:
if len(stack) % 1000 == 0 and len(stack) > 1000:
print(f" Flood fill processing {len(stack)} pixels in stack...")
y, x = stack.pop()
if (y < 0 or y >= height or x < 0 or x >= width or visited[y, x]):
continue
pixel_color = tuple(int(c) for c in radar_image[y, x])
if self.color_distance(pixel_color, target_color) > self.color_tolerance:
continue
visited[y, x] = True
region_pixels.append((y, x))
# Add 4-connected neighbors
for dy, dx in [(0, 1), (1, 0), (0, -1), (-1, 0)]:
new_y, new_x = y + dy, x + dx
if (0 <= new_y < height and 0 <= new_x < width and not visited[new_y, new_x]):
stack.append((new_y, new_x))
return region_pixels
# OPTIMIZATION: Only process known precipitation pixels
print(f"Processing {len(precip_pixels)} precipitation pixels for regions...")
regions_found = 0
for i, (y, x) in enumerate(precip_pixels):
if i % 500 == 0:
progress = (i / len(precip_pixels)) * 100
print(f" Region progress: {progress:.1f}% ({i}/{len(precip_pixels)} pixels) - {regions_found} regions found")
if not visited[y, x]:
pixel_rgb = tuple(int(c) for c in radar_image[y, x])
match = self.find_precipitation_value(pixel_rgb)
if match:
print(f" Starting flood fill from ({y},{x}) for {match.name}...")
region_pixels = flood_fill(y, x, pixel_rgb)
if len(region_pixels) >= min_region_size:
# Calculate bounding box
ys = [p[0] for p in region_pixels]
xs = [p[1] for p in region_pixels]
regions.append({
'pixels': region_pixels,
'precipitation': match,
'bbox': {
'min_y': min(ys),
'max_y': max(ys),
'min_x': min(xs),
'max_x': max(xs)
},
'pixel_count': len(region_pixels),
'center': (int(np.mean(ys)), int(np.mean(xs)))
})
regions_found += 1
print(f" Found region {regions_found} with {len(region_pixels)} pixels")
else:
print(f" Skipped small region with {len(region_pixels)} pixels (min: {min_region_size})")
print(f"Region analysis complete! Found {len(regions)} regions")
return regions
def create_annotated_image(self, radar_image: np.ndarray, regions: List[Dict]) -> np.ndarray:
"""
Create an annotated image with precipitation values labeled on regions.
Args:
radar_image: Original radar image
regions: List of detected regions
Returns:
Annotated image as numpy array
"""
# Convert to PIL for text drawing
img_pil = Image.fromarray(radar_image)
draw = ImageDraw.Draw(img_pil)
# Try to load a font, fall back to default if not available
try:
font = ImageFont.truetype("arial.ttf", 12)
except:
font = ImageFont.load_default()
# Draw labels on regions
for region in regions:
center_y, center_x = region['center']
precip = region['precipitation']
# Create label text
avg_value = (precip.min_value + precip.max_value) / 2
label = f"{avg_value:.1f} mm/h"
# Calculate text size
bbox = draw.textbbox((0, 0), label, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
# Draw semi-transparent background for text
bg_coords = [
center_x - text_width//2 - 2,
center_y - text_height//2 - 2,
center_x + text_width//2 + 2,
center_y + text_height//2 + 2
]
# Draw white background with transparency
draw.rectangle(bg_coords, fill=(255, 255, 255, 180))
# Draw text
text_coords = (center_x - text_width//2, center_y - text_height//2)
draw.text(text_coords, label, fill=(0, 0, 0), font=font)
# Draw bounding box for larger regions
if region['pixel_count'] > 500:
bbox_coords = [
region['bbox']['min_x'],
region['bbox']['min_y'],
region['bbox']['max_x'],
region['bbox']['max_y']
]
draw.rectangle(bbox_coords, outline=(255, 255, 255), width=1)
return np.array(img_pil)
def save_analysis_results(self, results: Dict, filename: str = "radar_analysis.json"):
"""Save analysis results to JSON file."""
# Convert numpy arrays to lists for JSON serialization
serializable_results = {}
for key, value in results.items():
if isinstance(value, np.ndarray):
serializable_results[key] = value.tolist()
else:
serializable_results[key] = value
with open(filename, 'w') as f:
json.dump(serializable_results, f, indent=2)
def analyze_radar(self, radar_image_path: str, legend_path: str) -> Dict:
"""
Complete radar analysis pipeline.
Args:
radar_image_path: Path to radar image file
legend_path: Path to legend image file (for reference)
Returns:
Dictionary containing analysis results and output file path
"""
try:
# Load radar image
radar_image = cv2.imread(radar_image_path)
if radar_image is None:
raise ValueError(f"Could not load radar image: {radar_image_path}")
# Convert BGR to RGB
radar_image_rgb = cv2.cvtColor(radar_image, cv2.COLOR_BGR2RGB)
# Perform pixel analysis
analysis = self.analyze_radar_pixels(radar_image_rgb)
# Find regions
regions = self.find_color_regions(radar_image_rgb)
# Create annotated image
annotated_image = self.create_annotated_image(radar_image_rgb, regions)
# Save annotated image
output_filename = radar_image_path.replace('.png', '_analyzed.png')
annotated_bgr = cv2.cvtColor(annotated_image, cv2.COLOR_RGB2BGR)
cv2.imwrite(output_filename, annotated_bgr)
# Create hover data for map integration with EXPANDED WMS bounds
# Extended bounds to cover ALL North American radar data including Alaska and southeastern US
wms_bounds = (20.0, 85.0, -170.0, -40.0) # lat_min, lat_max, lon_min, lon_max
hover_data = self.create_hover_data(analysis['precipitation_map'], wms_bounds)
# Prepare results
results = {
'analysis': analysis,
'regions': regions,
'output_file': output_filename,
'input_file': radar_image_path,
'legend_file': legend_path,
'hover_data': hover_data
}
return results
except Exception as e:
raise Exception(f"Radar analysis failed: {str(e)}")
def convert_colors_to_american_scale(self, radar_image: np.ndarray) -> np.ndarray:
"""
Convert Canadian radar colors to American radar color scale.
Uses the same dBZ values but maps them to American colors.
"""
print("🔄 Converting Canadian radar colors to American color scale...")
# Load American color scale
try:
with open("us_radar_legend_data.json", 'r') as f:
us_legend = json.load(f)
us_colors = []
for item in us_legend['colors']:
us_colors.append({
'min_dbz': item['min_value'],
'max_dbz': item['max_value'],
'rgb': tuple(item['rgb']),
'name': item['name']
})
except Exception as e:
print(f"❌ Could not load US color scale: {e}")
return radar_image # Return original if conversion fails
height, width = radar_image.shape[:2]
converted_image = radar_image.copy()
print(f"🎨 Converting {width}x{height} image to American colors...")
# For each pixel, find its dBZ value and map to American color
conversion_count = 0
for y in range(height):
for x in range(width):
pixel_rgb = tuple(int(c) for c in radar_image[y, x])
# Find the Canadian dBZ value for this pixel
canadian_match = self.find_precipitation_value(pixel_rgb)
if canadian_match:
# Get the dBZ value
dbz_value = (canadian_match.min_value + canadian_match.max_value) / 2
# Find corresponding American color for this dBZ value
american_color = None
for us_color in us_colors:
if us_color['min_dbz'] <= dbz_value <= us_color['max_dbz']:
american_color = us_color['rgb']
break
# If no exact match, find closest
if not american_color:
best_match = None
min_distance = float('inf')
for us_color in us_colors:
center_dbz = (us_color['min_dbz'] + us_color['max_dbz']) / 2
distance = abs(dbz_value - center_dbz)
if distance < min_distance:
min_distance = distance
best_match = us_color
if best_match:
american_color = best_match['rgb']
# Apply the American color
if american_color:
converted_image[y, x] = american_color
conversion_count += 1
print(f"✅ Converted {conversion_count:,} pixels to American color scale")
return converted_image
def create_hover_data(self, dbz_map: np.ndarray, image_bounds: tuple = None) -> Dict:
"""
Create MAXIMUM RESOLUTION hover data for map integration.
Include every precipitation pixel across the ENTIRE radar image coverage.
No artificial caps or limits - full resolution coverage.
"""
height, width = dbz_map.shape
hover_points = []
# Use provided image bounds or default Canada radar bounds
if image_bounds:
lat_min, lat_max, lon_min, lon_max = image_bounds
print(f"Using provided image bounds: {lat_min}°N to {lat_max}°N, {lon_min}°W to {lon_max}°W")
else:
# Default Canada radar bounds - EXACT same as working version for proper alignment
lat_min, lat_max = 42.0, 84.0
lon_min, lon_max = -142.0, -52.0
print(f"Using default bounds: {lat_min}°N to {lat_max}°N, {lon_min}°W to {lon_max}°W")
print(f"Creating MAXIMUM RESOLUTION hover data from {width}x{height} dBZ map...")
# First, find the actual data bounds within the image
precip_rows, precip_cols = np.where(dbz_map > 0)
if len(precip_rows) > 0:
data_min_row, data_max_row = precip_rows.min(), precip_rows.max()
data_min_col, data_max_col = precip_cols.min(), precip_cols.max()
print(f"📊 Actual data bounds: rows {data_min_row}-{data_max_row}, cols {data_min_col}-{data_max_col}")
print(f"📊 Data coverage: {len(precip_rows):,} precipitation pixels found")
else:
print("❌ No precipitation data found in image")
return {'points': [], 'total_points': 0, 'resolution': 'none'}
# SCAN ENTIRE IMAGE with maximum resolution - no bounding box limitations
# Process EVERY SINGLE PIXEL to capture all unique color variations
pixel_count = 0
unique_colors = set()
print(f"🔍 Scanning ENTIRE {width}x{height} image for ALL precipitation pixels...")
for y in range(height):
if y % 50 == 0: # Progress logging
progress = (y / height) * 100
print(f" Full image scan: {progress:.1f}% - {pixel_count} points, {len(unique_colors)} unique colors")
for x in range(width):
dbz_value = dbz_map[y, x]
# Include ALL precipitation pixels across ENTIRE image
if dbz_value > 0: # Capture any precipitation value
# Convert pixel coordinates to lat/lon - cover ENTIRE image bounds
lat = lat_max - (y / height) * (lat_max - lat_min)
lon = lon_min + (x / width) * (lon_max - lon_min)
# Track unique dBZ values for maximum detail
unique_colors.add(round(dbz_value, 3))
hover_points.append({
'lat': round(lat, 8), # Ultra-high precision coordinates
'lon': round(lon, 8), # Ultra-high precision coordinates
'dbz': round(dbz_value, 3), # Ultra-high precision dBZ
'intensity': self.categorize_dbz_simple(dbz_value),
'pixel_x': x,
'pixel_y': y
})
pixel_count += 1
print(f"📊 Full image scan complete:")
print(f" - Total pixels scanned: {width * height:,}")
print(f" - Precipitation pixels found: {pixel_count:,}")
print(f" - Unique dBZ values: {len(unique_colors)}")
print(f" - Coverage area: Full {width}x{height} image")
print(f"✅ Created {len(hover_points)} MAXIMUM RESOLUTION hover points")
print(f"✅ Coverage: ENTIRE radar image ({width}x{height} pixels)")
print(f"✅ Resolution: EVERY precipitation pixel included")
print(f"✅ No caps or limits applied")
return {
'points': hover_points,
'total_points': len(hover_points),
'resolution': 'maximum', # Every single pixel included
'coverage': 'full_image', # Complete radar image coverage
'precision': 'ultra_high' # 8-decimal coordinate precision
}
def categorize_dbz_simple(self, dbz_value: float) -> str:
"""Extended dBZ categorization for hover display - no caps, full range."""
if dbz_value < 1.0:
return "Very Light"
elif dbz_value < 4.0:
return "Light"
elif dbz_value < 8.0:
return "Light-Moderate"
elif dbz_value < 16.0:
return "Moderate"
elif dbz_value < 32.0:
return "Heavy"
elif dbz_value < 48.0:
return "Very Heavy"
elif dbz_value < 64.0:
return "Intense"
elif dbz_value < 80.0:
return "Severe"
elif dbz_value < 100.0:
return "Extreme"
elif dbz_value < 150.0:
return "Violent"
else:
return f"Exceptional ({dbz_value:.1f} dBZ)"
# Keep backward compatibility
class CanadianRadarAnalyzer(RadarAnalyzer):
"""Backward compatibility class for Canadian radar."""
def __init__(self, legend_path: str = "radar_legendwellcropped.png"):
super().__init__(radar_type="canadian", legend_path=legend_path)
if __name__ == "__main__":
# Test the analyzer
analyzer = CanadianRadarAnalyzer()
# Load existing radar image for testing
test_image = cv2.imread('test_radar_proper.png')
if test_image is not None:
test_image_rgb = cv2.cvtColor(test_image, cv2.COLOR_BGR2RGB)
print("Analyzing radar image...")
analysis = analyzer.analyze_radar_pixels(test_image_rgb)
print(f"Found precipitation in {analysis['precipitation_pixels']} pixels")
print("Finding regions...")
regions = analyzer.find_color_regions(test_image_rgb)
print(f"Found {len(regions)} precipitation regions")
print("Creating annotated image...")
annotated = analyzer.create_annotated_image(test_image_rgb, regions)
# Save results
cv2.imwrite('annotated_radar.png', cv2.cvtColor(annotated, cv2.COLOR_RGB2BGR))
analyzer.save_analysis_results(analysis)
print("Analysis complete! Check annotated_radar.png and radar_analysis.json")