| """ |
| fetch_firms.py — NASA FIRMS API fetcher for active fire detection data. |
| |
| Downloads VIIRS_SNPP_NRT fire detection points for a given bounding box, |
| filters by confidence threshold, and converts fire points to binary |
| 128×128 label masks. Falls back to realistic synthetic data when the API |
| is unavailable or no API key is configured. |
| """ |
|
|
| import csv |
| import io |
| import logging |
| from pathlib import Path |
| from typing import List, Dict, Tuple, Optional |
|
|
| import numpy as np |
| import requests |
|
|
| from src.training.config import ( |
| NASA_FIRMS_API_KEY, FIRMS_BASE_URL, FIRMS_RAW_DIR, |
| PATCH_SIZE, FIRE_CONFIDENCE_THRESHOLD, NUM_SYNTHETIC_SAMPLES, |
| IMAGE_PATCHES_DIR, |
| ) |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
def fetch_firms_csv(
    bbox: str = "-122.0,36.0,-118.0,39.0",
    days: int = 7,
    api_key: Optional[str] = None,
) -> Optional[List[Dict]]:
    """
    Fetch active fire detections from NASA FIRMS API.

    Requests the VIIRS_SNPP_NRT product as CSV and parses it into one dict
    per detection row. The API key is part of the request URL, so it is
    redacted from every log message emitted here.

    Args:
        bbox: Bounding box as 'west,south,east,north'.
        days: Number of recent days to query (max 10). The value is passed
            through unchanged; it is not validated here.
        api_key: NASA FIRMS API key. Falls back to config if not provided.

    Returns:
        List of fire detection dicts (possibly empty), or None if no key is
        configured, the request fails, or the response is not a detection CSV.
    """
    key = api_key or NASA_FIRMS_API_KEY
    if not key:
        logger.warning("No NASA FIRMS API key configured — using synthetic data.")
        return None

    url = f"{FIRMS_BASE_URL}/{key}/VIIRS_SNPP_NRT/{bbox}/{days}"
    # Log a redacted form of the URL: the key is a credential and must not
    # end up in log files.
    logger.info(
        "Fetching FIRMS data: %s",
        f"{FIRMS_BASE_URL}/<redacted>/VIIRS_SNPP_NRT/{bbox}/{days}",
    )

    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
    except requests.RequestException as e:
        # requests embeds the full URL (including the key) in its messages.
        logger.error(
            "FIRMS API request failed: %s", str(e).replace(str(key), "<redacted>")
        )
        return None

    reader = csv.DictReader(io.StringIO(response.text))
    records = list(reader)

    # A successful HTTP status can still carry a plain-text error body
    # (e.g. an invalid-key notice). Without this check such a body would be
    # parsed as "zero detections" and hide the real problem.
    header = reader.fieldnames or []
    if "latitude" not in header or "longitude" not in header:
        snippet = response.text[:200].replace(str(key), "<redacted>")
        logger.error("FIRMS API returned an unexpected payload: %r", snippet)
        return None

    logger.info("Received %d fire detections from FIRMS.", len(records))
    return records
|
|
|
|
# VIIRS products report confidence as a class letter (l / n / h) instead of
# the 0-100 percentage used by MODIS. The numbers below are representative
# percentages for each class so both encodings can be compared against the
# same numeric threshold.
# NOTE(review): the exact percentages are an assumption — confirm they match
# the intent of FIRE_CONFIDENCE_THRESHOLD.
_VIIRS_CONFIDENCE_PERCENT = {
    "l": 15.0,
    "low": 15.0,
    "n": 55.0,
    "nominal": 55.0,
    "h": 90.0,
    "high": 90.0,
}


def _confidence_percent(raw) -> Optional[float]:
    """Return a detection's confidence as a percentage, or None if unparseable."""
    if isinstance(raw, str):
        token = raw.strip().lower()
        if token in _VIIRS_CONFIDENCE_PERCENT:
            return _VIIRS_CONFIDENCE_PERCENT[token]
        raw = token
    try:
        # float() rather than int() so values such as "85.0" are accepted.
        return float(raw)
    except (TypeError, ValueError):
        return None


def filter_high_confidence(records: List[Dict], threshold: int = FIRE_CONFIDENCE_THRESHOLD) -> List[Dict]:
    """
    Keep only detections with confidence >= threshold.

    Accepts both numeric confidences (0-100) and the categorical VIIRS
    classes ('l', 'n', 'h'); records whose confidence cannot be interpreted
    are dropped.

    Args:
        records: Detection dicts, each optionally carrying a 'confidence' key.
            A missing key is treated as confidence 0.
        threshold: Minimum confidence percentage to keep.

    Returns:
        The records that meet the threshold, in their original order.
    """
    filtered = []
    for r in records:
        conf = _confidence_percent(r.get("confidence", 0))
        if conf is not None and conf >= threshold:
            filtered.append(r)
    logger.info(
        "Filtered to %d high-confidence detections (≥%s%%).", len(filtered), threshold
    )
    return filtered
|
|
|
|
def fire_points_to_mask(
    records: List[Dict],
    bbox: str,
    patch_size: int = PATCH_SIZE,
) -> np.ndarray:
    """
    Convert fire point detections to a binary spatial mask.

    Each fire detection's lat/lon is mapped into a (patch_size × patch_size)
    grid based on the bounding box (row 0 is the northern edge, column 0 the
    western edge). Every detection is then dilated into a square of random
    half-width 1-3 pixels, so the result depends on NumPy's global random
    state. Points outside the bounding box are clamped onto the nearest
    border pixel. Records with missing or non-finite coordinates are skipped.

    Args:
        records: List of dicts with 'latitude' and 'longitude' keys.
        bbox: 'west,south,east,north' string.
        patch_size: Output spatial resolution.

    Returns:
        Binary float32 mask of shape (patch_size, patch_size).

    Raises:
        ValueError: If bbox does not contain four numbers or has zero width
            or height.
    """
    west, south, east, north = (float(x) for x in bbox.split(","))
    if east == west or north == south:
        # Would otherwise surface as a ZeroDivisionError on the first record.
        raise ValueError(f"Bounding box has zero width or height: {bbox!r}")

    mask = np.zeros((patch_size, patch_size), dtype=np.float32)
    last = patch_size - 1

    for r in records:
        try:
            lat = float(r["latitude"])
            lon = float(r["longitude"])
        except (KeyError, TypeError, ValueError):
            continue
        # int() raises on NaN/inf, so reject them before converting.
        if not (np.isfinite(lat) and np.isfinite(lon)):
            continue

        # Geographic position -> pixel indices (latitude decreases downward).
        col = int((lon - west) / (east - west) * last)
        row = int((north - lat) / (north - south) * last)

        # Clamp detections lying on or beyond the bbox edge into the grid.
        row = max(0, min(row, last))
        col = max(0, min(col, last))

        # Dilate the point into a (2 * radius + 1) square, clipped at the
        # grid borders.
        radius = np.random.randint(1, 4)
        mask[
            max(row - radius, 0): min(row + radius, last) + 1,
            max(col - radius, 0): min(col + radius, last) + 1,
        ] = 1.0

    return mask
|
|
|
|
def generate_synthetic_fire_data(
    num_samples: int = NUM_SYNTHETIC_SAMPLES,
    patch_size: int = PATCH_SIZE,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Generate synthetic satellite image patches and fire risk masks.

    Creates data with:
    - 4-channel image patches derived from one smooth random terrain field
      (upsampled with scipy when available, plain noise otherwise)
    - Fire masks built from 1-4 circular clusters (present in roughly 70% of
      samples) and then Gaussian-smoothed and rescaled to a peak of 1, so
      mask values are soft, in [0, 1], rather than strictly binary

    Inside fire regions channels 0 and 1 are brightened and channel 3 is
    darkened; images are finally clipped to [0, 1]. Output depends on
    NumPy's global random state. OpenCV (cv2) is required as soon as a
    sample contains fire.

    Args:
        num_samples: Number of samples to generate.
        patch_size: Spatial resolution of each patch.

    Returns:
        Tuple of (images, masks) float32 arrays with shapes
        (N, 4, H, W) and (N, H, W).
    """
    logger.info("Generating %s synthetic fire samples...", num_samples)
    images = np.zeros((num_samples, 4, patch_size, patch_size), dtype=np.float32)
    masks = np.zeros((num_samples, patch_size, patch_size), dtype=np.float32)

    # The import itself must be inside the try block, otherwise the
    # ImportError fallback can never run.
    try:
        from scipy.ndimage import zoom
    except ImportError:
        zoom = None

    # Round the coarse grid up so that, after 8x upsampling and cropping, the
    # terrain always covers the full patch even when patch_size % 8 != 0.
    coarse = -(-patch_size // 8)
    # Keep cluster centres 10 px away from the border when the patch is large
    # enough; tiny patches would otherwise give randint an empty range.
    margin = 10 if patch_size > 20 else 0
    # Pixel coordinate grids are identical for every sample and cluster.
    yy, xx = np.mgrid[:patch_size, :patch_size]

    for i in range(num_samples):
        # Low-resolution noise upsampled with cubic interpolation gives a
        # smooth, terrain-like field.
        base = np.random.rand(coarse, coarse).astype(np.float32)
        if zoom is not None:
            terrain = zoom(base, 8, order=3)[:patch_size, :patch_size]
        else:
            terrain = np.random.rand(patch_size, patch_size).astype(np.float32)

        # Four channels as different affine scalings of the same terrain.
        images[i, 0] = terrain * 0.4 + 0.1
        images[i, 1] = terrain * 0.5 + 0.2
        images[i, 2] = terrain * 0.2 + 0.05
        images[i, 3] = terrain * 0.7 + 0.1

        has_fire = np.random.rand() > 0.3
        if has_fire:
            # Imported lazily so fire-free runs do not need OpenCV.
            import cv2

            num_clusters = np.random.randint(1, 5)
            for _ in range(num_clusters):
                cy = np.random.randint(margin, patch_size - margin)
                cx = np.random.randint(margin, patch_size - margin)
                radius = np.random.randint(3, 12)

                # Filled disc around the cluster centre (squared distances
                # avoid an unnecessary sqrt).
                fire_region = (yy - cy) ** 2 + (xx - cx) ** 2 < radius ** 2
                masks[i][fire_region] = 1.0

                # Spectral signature of burning pixels; overlapping clusters
                # accumulate.
                images[i, 0][fire_region] += 0.5
                images[i, 1][fire_region] += 0.2
                images[i, 3][fire_region] -= 0.3

            # Soften the mask edges, then rescale so the peak is 1.
            mask_float = masks[i].astype(np.float32)
            mask_smoothed = cv2.GaussianBlur(mask_float, (15, 15), 5)
            masks[i] = mask_smoothed / (mask_smoothed.max() + 1e-8)

        # Cubic interpolation and the fire offsets can leave [0, 1].
        images[i] = np.clip(images[i], 0.0, 1.0)

    logger.info(
        "Generated %s synthetic samples. Fire prevalence: %.1f px/sample",
        num_samples,
        masks.sum(axis=(1, 2)).mean(),
    )
    return images, masks
|
|
|
|
def fetch_and_prepare_firms(
    bbox: str = "-122.0,36.0,-118.0,39.0",
    days: int = 7,
    save: bool = True,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    End-to-end: fetch FIRMS data or generate synthetic fallback.

    When the API returns detections, they are filtered by confidence and
    rasterised into a single label mask, which is paired with one synthetic
    image patch. When the fetch fails or returns nothing, a fully synthetic
    dataset is generated instead.

    Args:
        bbox: Bounding box as 'west,south,east,north'.
        days: Number of recent days to query.
        save: If True, write both arrays as .npy files into
            IMAGE_PATCHES_DIR (created if missing).

    Returns:
        Tuple of (images, masks).
    """
    records = fetch_firms_csv(bbox=bbox, days=days)

    if records:
        filtered = filter_high_confidence(records)
        if not filtered:
            logger.warning(
                "No FIRMS detections passed the confidence filter; "
                "the label mask will be empty."
            )
        mask = fire_points_to_mask(filtered, bbox)

        # No real imagery is available here, so the real fire label is paired
        # with a single synthetic patch; the synthetic mask is discarded.
        images, _ = generate_synthetic_fire_data(num_samples=1)
        masks = mask[np.newaxis, ...]

        logger.info("Using real FIRMS fire locations mapped to synthetic patches.")
    else:
        images, masks = generate_synthetic_fire_data()
        logger.info("Using fully synthetic fire data as FIRMS fallback.")

    if save:
        # np.save does not create missing directories.
        out_dir = Path(IMAGE_PATCHES_DIR)
        out_dir.mkdir(parents=True, exist_ok=True)
        np.save(out_dir / "satellite_patches.npy", images)
        np.save(out_dir / "fire_masks.npy", masks)
        logger.info("Saved patches to %s", IMAGE_PATCHES_DIR)

    return images, masks
|
|
|
|
if __name__ == "__main__":
    # Script entry point: enable INFO logging, run the full pipeline with
    # its default arguments, and report the resulting array shapes.
    logging.basicConfig(level=logging.INFO)
    images, masks = fetch_and_prepare_firms()
    print(f"Images: {images.shape}, Masks: {masks.shape}")
|
|