radar_wizard / precise_color_analyzer.py
nakas's picture
Implement precise color analysis using uploaded dBZ legend
7dff5c6
import numpy as np
import cv2
from PIL import Image
from typing import Dict, List, Tuple, Optional
import json
class PreciseRadarColorAnalyzer:
"""
Extracts precise color mappings from the Canadian radar legend
and performs accurate dBZ analysis on radar images.
"""
def __init__(self, legend_path: str = "canadaradarlegend_point1_to_200dbz.png"):
self.legend_path = legend_path
self.color_map = self.extract_precise_colors()
def extract_precise_colors(self) -> List[Tuple[float, Tuple[int, int, int]]]:
"""
Extract precise color-to-dBZ mappings from the legend image.
Returns list of (dBZ_value, RGB_color) tuples.
"""
# Load legend image
legend = cv2.imread(self.legend_path)
if legend is None:
raise ValueError(f"Could not load legend: {self.legend_path}")
legend_rgb = cv2.cvtColor(legend, cv2.COLOR_BGR2RGB)
height, width = legend_rgb.shape[:2]
# Sample colors from the legend (assuming vertical gradient)
color_samples = []
# dBZ values from 0.1 to 200 (logarithmic scale typical for radar)
dbz_values = [
0.1, 0.2, 0.5, 1.0, 2.0, 4.0, 8.0, 12.0, 16.0, 24.0,
32.0, 50.0, 64.0, 100.0, 125.0, 150.0, 175.0, 200.0
]
# Sample colors from center column of legend
center_x = width // 2
for i, dbz in enumerate(dbz_values):
# Map dBZ value to position in legend (top to bottom)
# Assuming legend goes from high dBZ (top) to low dBZ (bottom)
progress = i / (len(dbz_values) - 1)
y_pos = int(progress * (height - 1))
# Sample color from center of that row
rgb_color = tuple(legend_rgb[y_pos, center_x])
color_samples.append((dbz, rgb_color))
# Also sample every 5th pixel for more granular color mapping
detailed_samples = []
for y in range(0, height, 5):
# Map pixel position back to approximate dBZ value
progress = y / (height - 1)
# Reverse mapping: top = high dBZ, bottom = low dBZ
dbz_approx = 200.0 - (progress * 199.9) # 200 to 0.1
rgb_color = tuple(legend_rgb[y, center_x])
detailed_samples.append((dbz_approx, rgb_color))
# Combine and sort by dBZ value
all_samples = color_samples + detailed_samples
all_samples.sort(key=lambda x: x[0])
return all_samples
def find_closest_dbz(self, pixel_rgb: Tuple[int, int, int]) -> Optional[float]:
"""
Find the closest dBZ value for a given RGB pixel.
"""
if not self.color_map:
return None
min_distance = float('inf')
closest_dbz = None
for dbz, color in self.color_map:
# Calculate Euclidean distance in RGB space
distance = np.sqrt(sum((p - c) ** 2 for p, c in zip(pixel_rgb, color)))
if distance < min_distance:
min_distance = distance
closest_dbz = dbz
# Only return match if reasonably close (within color tolerance)
return closest_dbz if min_distance < 25 else None
def categorize_dbz(self, dbz_value: float) -> str:
"""Categorize dBZ value into intensity levels."""
if dbz_value < 1.0:
return "Very Light (0.1-1.0 dBZ)"
elif dbz_value < 4.0:
return "Light (1.0-4.0 dBZ)"
elif dbz_value < 12.0:
return "Light-Moderate (4.0-12.0 dBZ)"
elif dbz_value < 24.0:
return "Moderate (12.0-24.0 dBZ)"
elif dbz_value < 32.0:
return "Moderate-Heavy (24.0-32.0 dBZ)"
elif dbz_value < 50.0:
return "Heavy (32.0-50.0 dBZ)"
elif dbz_value < 64.0:
return "Very Heavy (50.0-64.0 dBZ)"
elif dbz_value < 100.0:
return "Extreme (64.0-100.0 dBZ)"
else:
return "Severe (100.0+ dBZ)"
def analyze_radar_image(self, radar_path: str) -> Dict:
"""
Perform precise dBZ analysis on radar image.
"""
# Load radar image
radar = cv2.imread(radar_path)
if radar is None:
raise ValueError(f"Could not load radar: {radar_path}")
radar_rgb = cv2.cvtColor(radar, cv2.COLOR_BGR2RGB)
height, width = radar_rgb.shape[:2]
# Initialize analysis data
dbz_map = np.zeros((height, width), dtype=float)
pixel_stats = {}
total_precipitation_pixels = 0
print(f"Analyzing {width}x{height} radar image...")
# Analyze each pixel
for y in range(height):
if y % 50 == 0: # Progress indicator
print(f"Processing row {y}/{height}")
for x in range(width):
pixel_rgb = tuple(int(c) for c in radar_rgb[y, x])
# Skip very dark pixels (background)
if sum(pixel_rgb) < 30:
continue
# Find closest dBZ value
dbz_value = self.find_closest_dbz(pixel_rgb)
if dbz_value is not None:
dbz_map[y, x] = dbz_value
total_precipitation_pixels += 1
# Categorize for statistics
category = self.categorize_dbz(dbz_value)
pixel_stats[category] = pixel_stats.get(category, 0) + 1
total_pixels = height * width
coverage_percent = (total_precipitation_pixels / total_pixels) * 100
print(f"Analysis complete! Found precipitation in {total_precipitation_pixels:,} pixels")
return {
'dbz_map': dbz_map,
'pixel_statistics': pixel_stats,
'total_pixels': total_pixels,
'precipitation_pixels': total_precipitation_pixels,
'precipitation_percentage': coverage_percent,
'intensity_levels': pixel_stats,
'color_mapping_samples': len(self.color_map)
}
def find_precipitation_regions(self, radar_rgb: np.ndarray, dbz_map: np.ndarray, min_region_size: int = 50) -> List[Dict]:
"""
Find connected regions of similar precipitation intensity.
"""
height, width = radar_rgb.shape[:2]
visited = np.zeros((height, width), dtype=bool)
regions = []
def flood_fill(start_y: int, start_x: int, target_dbz: float, tolerance: float = 5.0) -> List[Tuple[int, int]]:
"""Flood fill to find connected pixels with similar dBZ values."""
stack = [(start_y, start_x)]
region_pixels = []
while stack:
y, x = stack.pop()
if (y < 0 or y >= height or x < 0 or x >= width or
visited[y, x] or dbz_map[y, x] == 0):
continue
current_dbz = dbz_map[y, x]
if abs(current_dbz - target_dbz) > tolerance:
continue
visited[y, x] = True
region_pixels.append((y, x))
# Add 4-connected neighbors
for dy, dx in [(-1,0), (1,0), (0,-1), (0,1)]:
stack.append((y+dy, x+dx))
return region_pixels
print("Finding precipitation regions...")
# Find regions
for y in range(height):
for x in range(width):
if not visited[y, x] and dbz_map[y, x] > 0:
target_dbz = dbz_map[y, x]
region_pixels = flood_fill(y, x, target_dbz)
if len(region_pixels) >= min_region_size:
# Calculate region statistics
dbz_values = [dbz_map[py, px] for py, px in region_pixels]
avg_dbz = np.mean(dbz_values)
# Calculate bounding box
ys = [py for py, px in region_pixels]
xs = [px for py, px in region_pixels]
bbox = (min(xs), min(ys), max(xs), max(ys))
regions.append({
'pixels': len(region_pixels),
'avg_dbz': avg_dbz,
'category': self.categorize_dbz(avg_dbz),
'bbox': bbox,
'center': (np.mean(xs), np.mean(ys))
})
print(f"Found {len(regions)} precipitation regions")
return regions
def create_annotated_image(self, radar_path: str, analysis: Dict, regions: List[Dict]) -> str:
"""
Create annotated radar image with dBZ values labeled.
"""
# Load original image
radar = cv2.imread(radar_path)
radar_rgb = cv2.cvtColor(radar, cv2.COLOR_BGR2RGB)
# Convert to PIL for text drawing
img_pil = Image.fromarray(radar_rgb)
from PIL import ImageDraw, ImageFont
draw = ImageDraw.Draw(img_pil)
# Try to load a font
try:
font = ImageFont.truetype("/System/Library/Fonts/Arial.ttf", 12)
except:
font = ImageFont.load_default()
# Annotate regions
for i, region in enumerate(regions):
if region['pixels'] > 100: # Only annotate significant regions
x, y = region['center']
text = f"{region['avg_dbz']:.1f} dBZ"
# Draw text with background
text_bbox = draw.textbbox((int(x), int(y)), text, font=font)
draw.rectangle(text_bbox, fill=(0, 0, 0, 128))
draw.text((int(x), int(y)), text, fill=(255, 255, 255), font=font)
# Draw bounding box
bbox = region['bbox']
draw.rectangle(bbox, outline=(255, 255, 0), width=2)
# Save annotated image
output_path = radar_path.replace('.png', '_precise_analysis.png')
annotated_array = np.array(img_pil)
annotated_bgr = cv2.cvtColor(annotated_array, cv2.COLOR_RGB2BGR)
cv2.imwrite(output_path, annotated_bgr)
return output_path
def test_precise_analyzer():
"""Test the precise color analyzer."""
analyzer = PreciseRadarColorAnalyzer()
print(f"Extracted {len(analyzer.color_map)} color samples from legend")
# Test on current radar image
radar_files = ["test_radar_proper.png", "current_radar_fetch.png"]
for radar_file in radar_files:
try:
print(f"\nAnalyzing {radar_file}...")
analysis = analyzer.analyze_radar_image(radar_file)
print(f"Results:")
print(f"- Total pixels: {analysis['total_pixels']:,}")
print(f"- Precipitation pixels: {analysis['precipitation_pixels']:,}")
print(f"- Coverage: {analysis['precipitation_percentage']:.2f}%")
print(f"- Categories found:")
for category, count in analysis['pixel_statistics'].items():
print(f" * {category}: {count:,} pixels")
# Find regions
radar = cv2.imread(radar_file)
radar_rgb = cv2.cvtColor(radar, cv2.COLOR_BGR2RGB)
regions = analyzer.find_precipitation_regions(radar_rgb, analysis['dbz_map'])
# Create annotated image
output_file = analyzer.create_annotated_image(radar_file, analysis, regions)
print(f"- Annotated image saved: {output_file}")
break # Success, use this file
except Exception as e:
print(f"Error with {radar_file}: {e}")
continue
if __name__ == "__main__":
test_precise_analyzer()