"""Normalise user reports and external hazard feeds into a common "update" shape.

Each update is a plain dict with (at least) ``kind``, ``title``, ``emoji``,
``time``, ``lat``, ``lon``, ``sourceUrl`` and ``raw`` keys.  The module also
exposes helpers that re-emit the EONET and FIRMS feeds as Point-only GeoJSON.
"""

import asyncio
from datetime import datetime, timezone
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple

from dateutil import parser as dtparser

from ..data.geo import haversine_km
from .fetchers import (
    fetch_usgs_quakes_geojson,
    fetch_nws_alerts_geojson,
    fetch_eonet_events_geojson,
    fetch_firms_hotspots_geojson,
)

LonLat = Tuple[float, float]
Update = Dict[str, Any]

_KM_PER_MILE = 1.609344

# Ordered (keywords, emoji) pairs: the first entry with a keyword contained in
# the lower-cased EONET category wins.
_EONET_EMOJI: Tuple[Tuple[Tuple[str, ...], str], ...] = (
    (("wildfire",), "🔥"),
    (("volcano",), "🌋"),
    (("earthquake", "seismic"), "💥"),
    (("storm", "cyclone", "hurricane", "typhoon"), "🌀"),
    (("flood",), "🌊"),
    (("landslide",), "🏔️"),
    (("ice", "snow", "blizzard"), "❄️"),
    (("dust", "smoke", "haze"), "🌫️"),
)
_EONET_DEFAULT_EMOJI = "⚠️"


# ---------------------------------------------------------------------------
# Geometry helpers
# ---------------------------------------------------------------------------

def _flatten_lonlats(coords: Any) -> List[LonLat]:
    """Collect (lon, lat) pairs from arbitrarily nested coordinate arrays.

    Anything that is not a list/tuple contributes nothing, so malformed
    leaves are silently ignored rather than raising.
    """
    out: List[LonLat] = []
    if not isinstance(coords, (list, tuple)):
        return out
    if (
        len(coords) >= 2
        and isinstance(coords[0], (int, float))
        and isinstance(coords[1], (int, float))
    ):
        # Single coordinate pair [lon, lat, ...]
        out.append((float(coords[0]), float(coords[1])))
    else:
        for c in coords:
            out.extend(_flatten_lonlats(c))
    return out


def _mean_lonlat(pts: List[LonLat]) -> Optional[LonLat]:
    """Return the arithmetic mean of *pts* as (lon, lat), or None if empty."""
    if not pts:
        return None
    n = len(pts)
    return (sum(p[0] for p in pts) / n, sum(p[1] for p in pts) / n)


def _centroid_from_geom(geom: Dict[str, Any]) -> Optional[LonLat]:
    """Return (lon, lat) for any geometry by taking a simple average of all coords.

    Returns None when the geometry is missing, has no ``type``, or yields no
    usable coordinate pairs.
    """
    if not geom or "type" not in geom:
        return None

    gtype = geom.get("type")
    coords = geom.get("coordinates")

    # Fast path for Point
    if gtype == "Point" and isinstance(coords, (list, tuple)) and len(coords) >= 2:
        try:
            return (float(coords[0]), float(coords[1]))
        except (TypeError, ValueError):
            # Non-numeric coordinates: treat the geometry as unusable instead
            # of letting one bad row abort the caller's whole loop.
            return None

    # Generic centroid for Polygon/MultiPolygon/LineString/etc.
    return _mean_lonlat(_flatten_lonlats(coords))


def _point_lonlat(geom: Dict[str, Any]) -> Optional[LonLat]:
    """Return (lon, lat) only when *geom* is a well-formed Point, else None."""
    if geom.get("type") != "Point":
        return None
    return _centroid_from_geom(geom)


def _mk_point_feature(lon: float, lat: float, props: Dict[str, Any]) -> Dict[str, Any]:
    """Build a GeoJSON Point Feature at (lon, lat) carrying *props*."""
    return {
        "type": "Feature",
        "geometry": {"type": "Point", "coordinates": [float(lon), float(lat)]},
        "properties": props or {},
    }


# ---------------------------------------------------------------------------
# Small value helpers
# ---------------------------------------------------------------------------

def _first_present(props: Dict[str, Any], *keys: str) -> Any:
    """Return the first value among *keys* that is neither None nor ``""``.

    Unlike an ``a or b or c`` chain this keeps legitimate falsy values such
    as ``0`` (e.g. a magnitude or confidence of zero).
    """
    for key in keys:
        value = props.get(key)
        if value is not None and value != "":
            return value
    return None


def _now_iso() -> str:
    """Current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _parse_time(value: Any) -> Optional[datetime]:
    """Parse an ISO-8601 string into an aware datetime, or None if unusable.

    Naive timestamps are assumed to be UTC.
    """
    if not isinstance(value, str) or not value:
        return None
    try:
        parsed = dtparser.isoparse(value)
    except (ValueError, OverflowError):
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _time_sort_key(update: Update) -> float:
    """Sort key: the update's time as a UTC epoch, ``-inf`` when unparseable.

    Sorting on the parsed instant (rather than the raw string) keeps the order
    chronological when sources use different UTC offsets or formats, and
    avoids ``TypeError`` when a ``time`` value is not a string.
    """
    parsed = _parse_time(update.get("time"))
    return parsed.timestamp() if parsed is not None else float("-inf")


def _within(lat: float, lon: float, u: Dict[str, Any], radius_km: float) -> bool:
    """True when update *u* lies within *radius_km* of (lat, lon)."""
    return haversine_km((lat, lon), (u["lat"], u["lon"])) <= radius_km


def _is_recent(iso: Optional[str], max_age_hours: int) -> bool:
    """True when *iso* parses and is at most *max_age_hours* old.

    Missing or unparseable timestamps are never considered recent.
    Timestamps in the future count as recent.
    """
    parsed = _parse_time(iso)
    if parsed is None:
        return False
    age_seconds = (datetime.now(timezone.utc) - parsed).total_seconds()
    return age_seconds <= max_age_hours * 3600


# ---------------------------------------------------------------------------
# Feature -> update converters
# ---------------------------------------------------------------------------

def _report_to_update(f: Dict[str, Any]) -> Dict[str, Any]:
    """Convert a stored user-report Feature into an update dict.

    Raises whatever the lookups raise (e.g. ``KeyError``/``TypeError``) if the
    feature has no ``geometry.coordinates`` pair.
    """
    p = f.get("properties", {}) or {}
    lon = f["geometry"]["coordinates"][0]
    lat = f["geometry"]["coordinates"][1]
    rid = p.get("rid") or p.get("id") or p.get("_id") or p.get("uuid")
    return {
        "kind": "report",
        "title": p.get("title") or p.get("text") or "User report",
        "emoji": p.get("emoji") or "📝",
        "time": p.get("reported_at"),
        "lat": float(lat),
        "lon": float(lon),
        "severity": p.get("severity"),
        "sourceUrl": None,
        "raw": p,
        "rid": rid,
    }


def _quake_time_iso(props: Dict[str, Any]) -> str:
    """Pick the ISO timestamp for a quake.

    A numeric ``time`` is treated as epoch milliseconds; otherwise a string
    ``updated`` is used; otherwise the current time.
    """
    ts = props.get("time")
    if isinstance(ts, (int, float)) and not isinstance(ts, bool):
        try:
            return datetime.fromtimestamp(ts / 1000, tz=timezone.utc).isoformat()
        except (OverflowError, OSError, ValueError):
            pass  # out-of-range epoch: fall back to the other sources below
    updated = props.get("updated")
    return updated if isinstance(updated, str) else _now_iso()


def _quake_to_update(f: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Convert a quake Feature into an update, or None if it is not a usable Point."""
    p = f.get("properties") or {}
    lonlat = _point_lonlat(f.get("geometry") or {})
    if lonlat is None:
        return None
    lon, lat = lonlat
    # `_first_present` (not `or`) so a magnitude of 0 is still reported.
    mag = _first_present(p, "mag", "Magnitude", "m")
    return {
        "kind": "quake",
        "title": p.get("place") or p.get("title") or "Earthquake",
        "emoji": "💥",
        "time": _quake_time_iso(p),
        "lat": float(lat),
        "lon": float(lon),
        "severity": f"M{mag}" if mag is not None else None,
        "sourceUrl": p.get("url") or p.get("detail"),
        "raw": p,
    }


def _eonet_category(props: Dict[str, Any]) -> str:
    """Lower-cased category: ``category`` if set, else the first ``categories`` entry.

    Returns ``""`` when no string category can be found.
    """
    cat = props.get("category")
    if not cat:
        cats = props.get("categories")
        first = cats[0] if isinstance(cats, (list, tuple)) and cats else None
        cat = first.get("title") if isinstance(first, dict) else first
    return cat.lower() if isinstance(cat, str) else ""


def _eonet_to_update(f: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Convert an EONET Feature into an update, or None if it is not a usable Point."""
    p = f.get("properties") or {}
    lonlat = _point_lonlat(f.get("geometry") or {})
    if lonlat is None:
        return None
    lon, lat = lonlat

    cat = _eonet_category(p)
    emoji = next(
        (symbol for words, symbol in _EONET_EMOJI if any(w in cat for w in words)),
        _EONET_DEFAULT_EMOJI,
    )
    return {
        "kind": "eonet",
        "title": p.get("title") or p.get("category") or "Event",
        "emoji": emoji,
        "time": p.get("time") or p.get("updated") or _now_iso(),
        "lat": float(lat),
        "lon": float(lon),
        "sourceUrl": p.get("link") or p.get("url"),
        "raw": p,
    }


def _firms_to_update(f: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Convert a FIRMS Feature into an update, or None if it is not a usable Point."""
    p = f.get("properties") or {}
    lonlat = _point_lonlat(f.get("geometry") or {})
    if lonlat is None:
        return None
    lon, lat = lonlat
    return {
        "kind": "fire",
        "title": "Fire hotspot",
        "emoji": "🔥",
        "time": p.get("acq_datetime") or p.get("acq_date") or _now_iso(),
        "lat": float(lat),
        "lon": float(lon),
        # `_first_present` keeps a confidence of 0 instead of skipping it.
        "severity": _first_present(p, "confidence", "brightness", "frp"),
        "sourceUrl": None,
        "raw": p,
    }


def _nws_lonlat(geom: Dict[str, Any]) -> Optional[LonLat]:
    """Return (lon, lat) for an NWS geometry.

    Points are used as-is; Polygons use the vertex average of their first
    ring.  Other geometry types, and malformed coordinates, yield None.
    """
    gtype = geom.get("type")
    if gtype == "Point":
        return _centroid_from_geom(geom)
    if gtype == "Polygon":
        rings = geom.get("coordinates")
        if isinstance(rings, (list, tuple)) and rings:
            return _mean_lonlat(_flatten_lonlats(rings[0]))
    return None


def _nws_to_updates(fc: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Convert an NWS alerts FeatureCollection into updates.

    Features without a usable Point/Polygon geometry are skipped.
    """
    out: List[Dict[str, Any]] = []
    for f in fc.get("features") or []:
        if not isinstance(f, dict):
            continue
        p = f.get("properties") or {}
        lonlat = _nws_lonlat(f.get("geometry") or {})
        if lonlat is None:
            continue
        lon, lat = lonlat
        issued = p.get("effective") or p.get("onset") or p.get("sent") or _now_iso()
        out.append({
            "kind": "nws",
            "title": p.get("event") or "NWS Alert",
            "emoji": "⚠️",
            "time": issued,
            "lat": float(lat),
            "lon": float(lon),
            "severity": p.get("severity") or "Unknown",
            "sourceUrl": p.get("@id") or p.get("id"),
            "raw": p,
        })
    return out


def _converted(
    fc: Dict[str, Any],
    convert: Callable[[Dict[str, Any]], Optional[Dict[str, Any]]],
) -> List[Dict[str, Any]]:
    """Apply *convert* to every feature in *fc*, dropping rejected features."""
    updates: List[Dict[str, Any]] = []
    for feature in fc.get("features") or []:
        if not isinstance(feature, dict):
            continue
        update = convert(feature)
        if update:
            updates.append(update)
    return updates


# ---------------------------------------------------------------------------
# Feed aggregation
# ---------------------------------------------------------------------------

async def _gather_feeds() -> Dict[str, Any]:
    """Fetch all four external feeds concurrently.

    A feed that fails (or returns nothing) is replaced by an empty
    FeatureCollection so one bad source never breaks the others.
    """
    results = await asyncio.gather(
        fetch_usgs_quakes_geojson(),
        fetch_nws_alerts_geojson(),
        fetch_eonet_events_geojson(),
        fetch_firms_hotspots_geojson(),
        return_exceptions=True,
    )

    def ok(x: Any) -> Any:
        # BaseException (not Exception): with return_exceptions=True a
        # cancelled child surfaces as CancelledError, which is not an
        # Exception subclass and would otherwise leak through as a "feed".
        if isinstance(x, BaseException) or not x:
            return {"features": []}
        return x

    return {
        "usgs": ok(results[0]),
        "nws": ok(results[1]),
        "eonet": ok(results[2]),
        "firms": ok(results[3]),
    }


def _finalise(updates: List[Dict[str, Any]], limit: int) -> Dict[str, Any]:
    """Sort newest-first, truncate to *limit* and wrap in the response shape."""
    updates.sort(key=_time_sort_key, reverse=True)
    # Clamp so a negative limit yields an empty page instead of a negative
    # count and a slice that drops items from the tail.
    top = updates[:max(0, limit)]
    return {"count": len(top), "updates": top}


async def local_updates(
    lat: float,
    lon: float,
    radius_miles: float,
    max_age_hours: int,
    limit: int,
):
    """Return the newest updates within *radius_miles* of (lat, lon).

    Combines nearby user reports with external feed items that are both
    recent (at most *max_age_hours* old) and inside the radius.

    Returns:
        ``{"count": int, "updates": list}`` with at most *limit* updates,
        newest first.
    """
    from ..data.store import find_reports_near

    km = float(radius_miles) * _KM_PER_MILE
    near_reports = find_reports_near(
        lat, lon, radius_km=km, limit=limit, max_age_hours=max_age_hours
    )
    updates: List[Dict[str, Any]] = [_report_to_update(f) for f in near_reports]

    feeds = await _gather_feeds()
    external = (
        _converted(feeds["usgs"], _quake_to_update)
        + _nws_to_updates(feeds["nws"])
        + _converted(feeds["eonet"], _eonet_to_update)
        + _converted(feeds["firms"], _firms_to_update)
    )
    updates.extend(
        u for u in external
        if _is_recent(u["time"], max_age_hours) and _within(lat, lon, u, km)
    )
    return _finalise(updates, limit)


async def global_updates(limit: int, max_age_hours: Optional[int]):
    """Return the newest updates worldwide.

    When *max_age_hours* is not None, every update (reports included) must be
    at most that old; otherwise no age filter is applied.

    Returns:
        ``{"count": int, "updates": list}`` with at most *limit* updates,
        newest first.
    """
    from ..data.store import get_feature_collection

    fc = get_feature_collection()
    updates: List[Dict[str, Any]] = [
        _report_to_update(f) for f in (fc.get("features") or [])
    ]

    feeds = await _gather_feeds()
    updates += _nws_to_updates(feeds["nws"])
    updates += _converted(feeds["usgs"], _quake_to_update)
    updates += _converted(feeds["eonet"], _eonet_to_update)
    updates += _converted(feeds["firms"], _firms_to_update)

    if max_age_hours is not None:
        updates = [u for u in updates if _is_recent(u["time"], max_age_hours)]
    return _finalise(updates, limit)


# ---------------------------------------------------------------------------
# Point-only GeoJSON views
# ---------------------------------------------------------------------------

async def eonet_geojson_points() -> Dict[str, Any]:
    """Always return Point features for EONET (polygon events -> centroid)."""
    fc = await fetch_eonet_events_geojson() or {}
    features = []
    for f in fc.get("features") or []:
        g = f.get("geometry") or {}
        p = f.get("properties") or {}
        cen = _centroid_from_geom(g)
        if not cen:
            continue
        lon, lat = cen
        # Keep a stable, small prop set the map can style
        props = {
            "source": "eonet",
            "title": p.get("title") or p.get("category") or "Event",
            "emoji": "⚠️",  # the map can replace based on category if it wants
            "raw": p,
        }
        features.append(_mk_point_feature(lon, lat, props))
    return {"type": "FeatureCollection", "features": features}


async def firms_geojson_points() -> Dict[str, Any]:
    """Always return Point features for FIRMS (skip invalid rows)."""
    fc = await fetch_firms_hotspots_geojson() or {}
    features = []
    for f in fc.get("features") or []:
        g = f.get("geometry") or {}
        p = f.get("properties") or {}
        cen = _centroid_from_geom(g)
        if not cen:
            # Some rows can be malformed; skip them
            continue
        lon, lat = cen
        props = {
            "source": "firms",
            "title": "Fire hotspot",
            "emoji": "🔥",
            "confidence": p.get("confidence"),
            "brightness": p.get("brightness"),
            "time": p.get("acq_datetime") or p.get("acq_date"),
            "raw": p,
        }
        features.append(_mk_point_feature(lon, lat, props))
    return {"type": "FeatureCollection", "features": features}