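"""Aggregate hazard updates from live feeds (USGS quakes, NWS alerts, NASA EONET
events, NASA FIRMS fire hotspots) and locally stored user reports, normalizing
everything into a common update dict and into Point-only GeoJSON for the map."""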
import asyncio
from datetime import datetime, timezone
from typing import Any, Dict, Optional, List, Iterable, Tuple

from dateutil import parser as dtparser

from ..data.geo import haversine_km
from .fetchers import (
    fetch_usgs_quakes_geojson, fetch_nws_alerts_geojson,
    fetch_eonet_events_geojson, fetch_firms_hotspots_geojson
)

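# Each feed adapter below (_report_to_update, _quake_to_update, _eonet_to_update,
# _firms_to_update, _nws_to_updates) normalizes one source feature into a flat
# dict with the same keys: kind, title, emoji, time (ISO-8601 where available),
# lat, lon, severity (where the source provides one), sourceUrl, and raw.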
def _flatten_lonlats(coords: Any) -> List[Tuple[float, float]]:
    """Collect (lon, lat) pairs from nested coordinate arrays."""
    out: List[Tuple[float, float]] = []
    if not isinstance(coords, (list, tuple)):
        return out
    if len(coords) >= 2 and isinstance(coords[0], (int, float)) and isinstance(coords[1], (int, float)):
        # Single coordinate pair [lon, lat, ...]
        out.append((float(coords[0]), float(coords[1])))
    else:
        for c in coords:
            out.extend(_flatten_lonlats(c))
    return out

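# Illustrative: a nested ring flattens to its vertex pairs, e.g.
#     _flatten_lonlats([[[10.0, 20.0], [11.0, 21.0]]]) -> [(10.0, 20.0), (11.0, 21.0)]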
def _centroid_from_geom(geom: Dict[str, Any]) -> Optional[Tuple[float, float]]:
    """Return (lon, lat) for any geometry by taking a simple average of all coords."""
    if not geom or "type" not in geom:
        return None
    gtype = geom.get("type")
    coords = geom.get("coordinates")
    # Fast path for Point
    if gtype == "Point" and isinstance(coords, (list, tuple)) and len(coords) >= 2:
        return (float(coords[0]), float(coords[1]))
    # Generic centroid for Polygon/MultiPolygon/LineString/etc.
    pts = _flatten_lonlats(coords)
    if not pts:
        return None
    xs = [p[0] for p in pts]
    ys = [p[1] for p in pts]
    return (sum(xs) / len(xs), sum(ys) / len(ys))

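# Note: the "centroid" above is a plain vertex average, not an area-weighted
# centroid, so it can drift for irregular polygons; it is only used to drop a
# single marker near the event.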
def _mk_point_feature(lon: float, lat: float, props: Dict[str, Any]) -> Dict[str, Any]:
    return {
        "type": "Feature",
        "geometry": {"type": "Point", "coordinates": [float(lon), float(lat)]},
        "properties": props or {},
    }

def _report_to_update(f: Dict[str, Any]) -> Dict[str, Any]:
    p = f.get("properties", {}) or {}
    lat = f["geometry"]["coordinates"][1]
    lon = f["geometry"]["coordinates"][0]
    rid = p.get("rid") or p.get("id") or p.get("_id") or p.get("uuid")
    return {
        "kind": "report",
        "title": p.get("title") or p.get("text") or "User report",
        "emoji": p.get("emoji") or "📝",
        "time": p.get("reported_at"),
        "lat": float(lat), "lon": float(lon),
        "severity": p.get("severity"),
        "sourceUrl": None,
        "raw": p,
        "rid": rid,
    }

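# USGS GeoJSON carries "time" as epoch milliseconds; _quake_to_update converts it
# to ISO-8601 UTC so every update sorts on the same string format.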
def _quake_to_update(f: Dict[str, Any]) -> Dict[str, Any] | None:
    p = f.get("properties", {}) or {}
    g = f.get("geometry", {}) or {}
    if g.get("type") != "Point": return None
    lon, lat = g["coordinates"][:2]
    title = p.get("place") or p.get("title") or "Earthquake"
    mag = p.get("mag") or p.get("Magnitude") or p.get("m")
    ts = p.get("time")
    if isinstance(ts, (int, float)):
        time_iso = datetime.fromtimestamp(ts / 1000, tz=timezone.utc).isoformat()
    else:
        time_iso = p.get("updated") if isinstance(p.get("updated"), str) else datetime.now(timezone.utc).isoformat()
    return {"kind": "quake", "title": title, "emoji": "💥", "time": time_iso,
            "lat": float(lat), "lon": float(lon), "severity": f"M{mag}" if mag is not None else None,
            "sourceUrl": p.get("url") or p.get("detail"), "raw": p}

def _eonet_to_update(f: Dict[str, Any]) -> Dict[str, Any] | None:
    p = f.get("properties", {}) or {}
    g = f.get("geometry", {}) or {}
    if g.get("type") != "Point": return None
    lon, lat = g["coordinates"][:2]
    title = p.get("title") or p.get("category") or "Event"
    cat = (p.get("category") or (p.get("categories") or [{}])[0].get("title") or "").lower()
    if "wildfire" in cat: emoji = "🔥"
    elif "volcano" in cat: emoji = "🌋"
    elif "earthquake" in cat or "seismic" in cat: emoji = "💥"
    elif any(k in cat for k in ["storm", "cyclone", "hurricane", "typhoon"]): emoji = "🌀"
    elif "flood" in cat: emoji = "🌊"
    elif "landslide" in cat: emoji = "🏔️"
    elif any(k in cat for k in ["ice", "snow", "blizzard"]): emoji = "❄️"
    elif any(k in cat for k in ["dust", "smoke", "haze"]): emoji = "🌫️"
    else: emoji = "⚠️"
    time_iso = p.get("time") or p.get("updated") or datetime.now(timezone.utc).isoformat()
    return {"kind": "eonet", "title": title, "emoji": emoji, "time": time_iso,
            "lat": float(lat), "lon": float(lon), "sourceUrl": p.get("link") or p.get("url"), "raw": p}

def _firms_to_update(f: Dict[str, Any]) -> Dict[str, Any] | None:
    p = f.get("properties", {}) or {}
    g = f.get("geometry", {}) or {}
    if g.get("type") != "Point": return None
    lon, lat = g["coordinates"][:2]
    time_iso = p.get("acq_datetime") or p.get("acq_date") or datetime.now(timezone.utc).isoformat()
    sev = p.get("confidence") or p.get("brightness") or p.get("frp")
    return {"kind": "fire", "title": "Fire hotspot", "emoji": "🔥", "time": time_iso,
            "lat": float(lat), "lon": float(lon), "severity": sev, "sourceUrl": None, "raw": p}

def _within(lat: float, lon: float, u: Dict[str, Any], radius_km: float) -> bool:
    return haversine_km((lat, lon), (u["lat"], u["lon"])) <= radius_km

def _is_recent(iso: str | None, max_age_hours: int) -> bool:
    if not iso: return False
    try:
        t = dtparser.isoparse(iso)
        if not t.tzinfo: t = t.replace(tzinfo=timezone.utc)
    except Exception:
        return False
    return (datetime.now(timezone.utc) - t).total_seconds() <= max_age_hours * 3600

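# All four feeds are fetched concurrently; a fetcher that raises (or returns
# nothing) is replaced by an empty FeatureCollection so one failing source
# does not break the whole response.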
async def _gather_feeds():
    results = await asyncio.gather(
        fetch_usgs_quakes_geojson(), fetch_nws_alerts_geojson(),
        fetch_eonet_events_geojson(), fetch_firms_hotspots_geojson(),
        return_exceptions=True
    )
    def ok(x): return {"features": []} if isinstance(x, Exception) or not x else x
    return {"usgs": ok(results[0]), "nws": ok(results[1]), "eonet": ok(results[2]), "firms": ok(results[3])}

async def local_updates(lat: float, lon: float, radius_miles: float, max_age_hours: int, limit: int):
    from ..data.store import find_reports_near
    km = float(radius_miles) * 1.609344
    near_reports = find_reports_near(lat, lon, radius_km=km, limit=limit, max_age_hours=max_age_hours)
    updates: List[Dict[str, Any]] = [_report_to_update(f) for f in near_reports]
    feeds = await _gather_feeds()
    for f in (feeds["usgs"].get("features") or []):
        u = _quake_to_update(f)
        if u and _is_recent(u["time"], max_age_hours) and _within(lat, lon, u, km):
            updates.append(u)
    for u in _nws_to_updates(feeds["nws"]):
        if _is_recent(u["time"], max_age_hours) and _within(lat, lon, u, km):
            updates.append(u)
    for f in (feeds["eonet"].get("features") or []):
        u = _eonet_to_update(f)
        if u and _is_recent(u["time"], max_age_hours) and _within(lat, lon, u, km):
            updates.append(u)
    for f in (feeds["firms"].get("features") or []):
        u = _firms_to_update(f)
        if u and _is_recent(u["time"], max_age_hours) and _within(lat, lon, u, km):
            updates.append(u)
    updates.sort(key=lambda x: x["time"] or "", reverse=True)
    return {"count": min(len(updates), limit), "updates": updates[:limit]}

def _nws_to_updates(fc: Dict[str, Any]) -> list[Dict[str, Any]]:
    out: list[Dict[str, Any]] = []
    for f in (fc.get("features") or []):
        p = f.get("properties", {}) or {}
        g = f.get("geometry", {}) or {}
        coords = None
        if g.get("type") == "Polygon":
            poly = g["coordinates"][0]
            if poly:
                lats = [c[1] for c in poly]; lons = [c[0] for c in poly]
                coords = (sum(lats) / len(lats), sum(lons) / len(lons))
        elif g.get("type") == "Point":
            coords = (g["coordinates"][1], g["coordinates"][0])
        if not coords:
            continue
        sev = p.get("severity") or "Unknown"
        issued = p.get("effective") or p.get("onset") or p.get("sent") or datetime.now(timezone.utc).isoformat()
        out.append({"kind": "nws", "title": p.get("event") or "NWS Alert", "emoji": "⚠️",
                    "time": issued, "lat": float(coords[0]), "lon": float(coords[1]),
                    "severity": sev, "sourceUrl": p.get("@id") or p.get("id"), "raw": p})
    return out

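# global_updates merges stored user reports with every live feed; the filtered
# comprehensions keep only features the converters accept (non-Point geometries
# come back as None and are dropped), then optionally age-filters and sorts newest first.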
async def global_updates(limit: int, max_age_hours: Optional[int]):
    from ..data.store import get_feature_collection
    fc = get_feature_collection()
    reports = fc.get("features") or []
    rep_updates = [_report_to_update(f) for f in reports]
    feeds = await _gather_feeds()
    nws_updates = _nws_to_updates(feeds["nws"])
    quake_updates = [u for f in (feeds["usgs"].get("features") or []) if (u := _quake_to_update(f))]
    eonet_updates = [u for f in (feeds["eonet"].get("features") or []) if (u := _eonet_to_update(f))]
    firms_updates = [u for f in (feeds["firms"].get("features") or []) if (u := _firms_to_update(f))]
    updates = rep_updates + nws_updates + quake_updates + eonet_updates + firms_updates
    if max_age_hours is not None:
        updates = [u for u in updates if _is_recent(u["time"], max_age_hours)]
    updates.sort(key=lambda x: x["time"] or "", reverse=True)
    return {"count": min(len(updates), limit), "updates": updates[:limit]}

async def eonet_geojson_points() -> Dict[str, Any]:
    """Always return Point features for EONET (polygon events -> centroid)."""
    fc = await fetch_eonet_events_geojson() or {}
    features = []
    for f in (fc.get("features") or []):
        g = f.get("geometry") or {}
        p = f.get("properties") or {}
        cen = _centroid_from_geom(g)
        if not cen:
            continue
        lon, lat = cen
        # Keep a stable, small prop set the map can style
        props = {
            "source": "eonet",
            "title": p.get("title") or p.get("category") or "Event",
            "emoji": "⚠️",  # the map can replace based on category if it wants
            "raw": p,
        }
        features.append(_mk_point_feature(lon, lat, props))
    return {"type": "FeatureCollection", "features": features}

async def firms_geojson_points() -> Dict[str, Any]:
    """Always return Point features for FIRMS (skip invalid rows)."""
    fc = await fetch_firms_hotspots_geojson() or {}
    features = []
    for f in (fc.get("features") or []):
        g = f.get("geometry") or {}
        p = f.get("properties") or {}
        cen = _centroid_from_geom(g)
        if not cen:
            # Some rows can be malformed; skip them
            continue
        lon, lat = cen
        props = {
            "source": "firms",
            "title": "Fire hotspot",
            "emoji": "🔥",
            "confidence": p.get("confidence"),
            "brightness": p.get("brightness"),
            "time": p.get("acq_datetime") or p.get("acq_date"),
            "raw": p,
        }
        features.append(_mk_point_feature(lon, lat, props))
    return {"type": "FeatureCollection", "features": features}
