# NOTE: "Spaces: Sleeping" banner below was hosting-page UI residue from the
# scraped source, not part of this module.
import re
from datetime import datetime

import numpy as np
import requests
| # ============================================================================ | |
| # IMD CYCLONE DATA FETCHER | |
| # ============================================================================ | |
| class IMDCycloneDataFetcher: | |
| # API 6 — District wise Warning (primary) | |
| WARNING_API_URL = "https://mausam.imd.gov.in/api/warnings_district_api.php" | |
| # API 9 — RSS Feeds (secondary) | |
| RSS_URL = "https://mausam.imd.gov.in/imd_latest/contents/dist_nowcast_rss.php" | |
| # API 4 — District Wise Nowcast (tertiary) | |
| NOWCAST_URL = "https://mausam.imd.gov.in/api/nowcast_district_api.php" | |
| # Keep BULLETIN_URLS for any code that references it | |
| BULLETIN_URLS = [ | |
| "https://mausam.imd.gov.in/api/warnings_district_api.php", | |
| "https://mausam.imd.gov.in/imd_latest/contents/dist_nowcast_rss.php", | |
| ] | |
| def __init__(self): | |
| self.rsmc_url = "https://rsmcnewdelhi.imd.gov.in/" | |
| def fetch_active_cyclones(self): | |
| """Fetch cyclone-related alerts from IMD district warning API.""" | |
| try: | |
| response = requests.get(self.WARNING_API_URL, timeout=10) | |
| response.raise_for_status() | |
| data = response.json() | |
| cyclone_alerts = [] | |
| records = data if isinstance(data, list) else data.get("data", []) | |
| for item in records: | |
| text = str(item).lower() | |
| if any(kw in text for kw in | |
| ["cyclone", "depression", "storm", "deep depression"]): | |
| cyclone_alerts.append({ | |
| "alert": item, | |
| "timestamp": datetime.now().isoformat() | |
| }) | |
| return { | |
| "status": "success", | |
| "active_cyclones": cyclone_alerts, | |
| "count": len(cyclone_alerts), | |
| "source": "IMD District Warnings API", | |
| "last_updated": datetime.now().isoformat(), | |
| } | |
| except Exception as e: | |
| return {"status": "error", "message": str(e), "active_cyclones": []} | |
| def fetch_hourly_bulletin(self): | |
| """ | |
| Fetch bulletin content for cyclone parameter parsing. | |
| Tries API 6 (warnings JSON) first, then API 9 (RSS) as fallback. | |
| Returns dict with 'content' string for parse_cyclone_parameters(). | |
| """ | |
| errors = [] | |
| # API 6: District Warnings | |
| try: | |
| response = requests.get(self.WARNING_API_URL, timeout=10) | |
| if response.status_code == 200: | |
| data = response.json() | |
| records = data if isinstance(data, list) else data.get("data", []) | |
| parts = [] | |
| for record in records: | |
| if isinstance(record, dict): | |
| parts.extend(str(v) for v in record.values()) | |
| else: | |
| parts.append(str(record)) | |
| content = "\n".join(parts) | |
| if content.strip(): | |
| return { | |
| "status": "success", | |
| "content": content, | |
| "url_used": self.WARNING_API_URL, | |
| "source": "IMD District Warnings (API 6)", | |
| "timestamp": datetime.now().isoformat(), | |
| } | |
| errors.append(f"API 6 -> HTTP {response.status_code}") | |
| except Exception as e: | |
| errors.append(f"API 6 -> {str(e)}") | |
| # API 9: RSS Feed fallback | |
| try: | |
| response = requests.get(self.RSS_URL, timeout=10) | |
| if response.status_code == 200 and len(response.text.strip()) > 50: | |
| # Strip XML tags, keep plain text for regex parsing | |
| text = re.sub(r"<[^>]+>", " ", response.text) | |
| text = re.sub(r"\s+", " ", text).strip() | |
| return { | |
| "status": "success", | |
| "content": text, | |
| "url_used": self.RSS_URL, | |
| "source": "IMD RSS Feed (API 9)", | |
| "timestamp": datetime.now().isoformat(), | |
| } | |
| errors.append(f"API 9 -> HTTP {response.status_code}") | |
| except Exception as e: | |
| errors.append(f"API 9 -> {str(e)}") | |
| return { | |
| "status": "error", | |
| "message": "All IMD bulletin sources unavailable", | |
| "tried": errors, | |
| "timestamp": datetime.now().isoformat(), | |
| } | |
| # In fetch_rsmc_page_alerts(), remove the API 4 block entirely | |
| # Keep only the API 6 warnings check | |
| def fetch_rsmc_page_alerts(self): | |
| alerts = [] | |
| errors = [] | |
| # API 6: Warnings only (API 4 Nowcast requires auth - 401) | |
| try: | |
| response = requests.get(self.WARNING_API_URL, timeout=10) | |
| if response.status_code == 200: | |
| data = response.json() | |
| records = data if isinstance(data, list) else data.get("data", []) | |
| for record in records: | |
| text = str(record) | |
| if any(kw in text.lower() for kw in | |
| ["cyclone", "depression", "storm", "warning"]): | |
| if text[:500] not in alerts: | |
| alerts.append(text[:500]) | |
| else: | |
| errors.append(f"Warnings API -> HTTP {response.status_code}") | |
| except Exception as e: | |
| errors.append(f"Warnings API -> {str(e)}") | |
| return { | |
| "status": "success" if alerts else "no_alerts", | |
| "source": "IMD District Warnings API (API 6)", | |
| "alerts": list(dict.fromkeys(alerts)), | |
| "count": len(alerts), | |
| "errors": errors, | |
| "timestamp": datetime.now().isoformat(), | |
| } | |
| def parse_cyclone_parameters(self, bulletin_text) -> dict: | |
| """ | |
| Extract cyclone parameters from bulletin content. | |
| Accepts string or list (JSON records). | |
| """ | |
| params = {} | |
| # Normalise input to string for regex | |
| if isinstance(bulletin_text, list): | |
| text = " ".join(str(item) for item in bulletin_text) | |
| else: | |
| text = str(bulletin_text) | |
| try: | |
| lat_match = re.search(r"(\d+\.?\d*)\s*[°\s]*[NS]", text, re.IGNORECASE) | |
| if lat_match: | |
| params["LAT"] = float(lat_match.group(1)) | |
| lon_match = re.search(r"(\d+\.?\d*)\s*[°\s]*[EW]", text, re.IGNORECASE) | |
| if lon_match: | |
| params["LON"] = float(lon_match.group(1)) | |
| wind_match = re.search( | |
| r"wind[s]?.*?(\d+)\s*(kmph|km/h|knots|kt)", text, re.IGNORECASE | |
| ) | |
| if wind_match: | |
| speed = float(wind_match.group(1)) | |
| unit = wind_match.group(2).lower() | |
| if "km" in unit: | |
| speed = speed / 1.852 | |
| params["MAX_WIND"] = speed | |
| pressure_match = re.search( | |
| r"pressure.*?(\d{3,4})\s*(hpa|mb)", text, re.IGNORECASE | |
| ) | |
| if pressure_match: | |
| params["MIN_PRESSURE"] = float(pressure_match.group(1)) | |
| except Exception as e: | |
| print(f"[CycloneFetcher] Error parsing bulletin: {e}") | |
| return params | |
| # ============================================================================ | |
| # JTWC FETCHER | |
| # ============================================================================ | |
class JTWCDataFetcher:
    """Lightweight reachability probe for the JTWC warnings page."""

    def __init__(self):
        # Joint Typhoon Warning Center landing page.
        self.jtwc_url = "https://www.metoc.navy.mil/jtwc/jtwc.html"

    def fetch_current_warnings(self):
        """Ping the JTWC page; report success or the failure reason.

        Returns a status dict; never raises.
        """
        try:
            resp = requests.get(self.jtwc_url, timeout=5)
        except Exception as exc:
            return {"status": "error", "message": str(exc)}
        if resp.status_code != 200:
            return {"status": "error", "message": f"HTTP {resp.status_code}"}
        return {"status": "success", "timestamp": datetime.now().isoformat()}
| # ============================================================================ | |
| # CYCLONE FEATURE MAP — module-level | |
| # ============================================================================ | |
def _default_of(key, default):
    """Build a getter that reads *key* from a feature dict with a default."""
    def getter(f):
        return f.get(key, default)
    return getter


# Maps model feature name -> extractor over an engineer_features() dict.
# MAX_WIND is stored in knots, hence the 1.852 knot->km/h conversions.
CYCLONE_MODEL_FEATURE_MAP = {
    "wind_speed_kmh": lambda f: min(f.get("MAX_WIND", 0) * 1.852, 350),
    "central_pressure_hpa": _default_of("MIN_PRESSURE", 1000),
    "sea_surface_temp_c": _default_of("SEA_SURFACE_TEMP_C", 28.5),
    "track_curvature": _default_of("TRACK_CURVATURE", 0.3),
    "distance_to_coast_km": _default_of("DIST_TO_COAST_KM", 200.0),
    # Wind normalised against the 350 km/h cap, clipped to [0, 1].
    "storm_surge_potential": lambda f: min(1.0, f.get("MAX_WIND", 0) * 1.852 / 350),
    "atmospheric_moisture": _default_of("ATMOSPHERIC_MOISTURE", 0.65),
    "shear_index": _default_of("SHEAR_INDEX", 0.3),
}
| # ============================================================================ | |
| # CYCLONE FEATURE ENGINEER | |
| # ============================================================================ | |
class CycloneFeatureEngineer:
    """Builds model-ready feature dicts from raw cyclone observations."""

    def to_model_features(self, engineered: dict) -> dict:
        """Translate engineer_features() output -> CYCLONE_FEATURES schema."""
        out = {}
        for name, extractor in CYCLONE_MODEL_FEATURE_MAP.items():
            out[name] = extractor(engineered)
        return out

    def calculate_distance(self, lat, lon, target_lat, target_lon):
        """Great-circle (haversine) distance between two points, in km."""
        from math import radians, sin, cos, sqrt, atan2
        earth_radius_km = 6371
        phi1 = radians(lat)
        phi2 = radians(target_lat)
        dphi = radians(target_lat) - radians(lat)
        dlam = radians(target_lon) - radians(lon)
        h = sin(dphi / 2) ** 2 + cos(phi1) * cos(phi2) * sin(dlam / 2) ** 2
        return earth_radius_km * 2 * atan2(sqrt(h), sqrt(1 - h))

    def create_cyclical_features(self, month, hour, day_of_year):
        """Sin/cos encodings of day-of-year (365-day cycle) and hour (24h).

        Returns (sin_doy, cos_doy, sin_hour, cos_hour).
        NOTE(review): `month` is accepted for interface compatibility but is
        not used in the encoding — confirm whether a month encoding was
        intended.
        """
        doy_angle = 2 * np.pi * day_of_year / 365
        hour_angle = 2 * np.pi * hour / 24
        return (np.sin(doy_angle), np.cos(doy_angle),
                np.sin(hour_angle), np.cos(hour_angle))

    def engineer_features(self, raw_data: dict, historical_data=None) -> dict:
        """Expand a raw observation dict into the full feature dict.

        historical_data, when given, is a list of past observation dicts
        (oldest first) used to fill lagged wind/pressure features; assumed
        to be 6-hourly records — TODO confirm cadence with the caller.
        """
        features = {
            "LAT": raw_data.get("LAT", 0),
            "LON": raw_data.get("LON", 0),
            "MAX_WIND": raw_data.get("MAX_WIND", 0),
            "MIN_PRESSURE": raw_data.get("MIN_PRESSURE", 1000),
        }
        # 34/50/64-kt wind radii per quadrant, defaulting to 0 when absent.
        for prefix in ("RAD", "RAD50", "RAD64"):
            for quad in ("NE", "SE", "SW", "NW"):
                key = f"{prefix}_{quad}"
                features[key] = raw_data.get(key, 0)
        # Clock-derived features use the current wall time.
        now = datetime.now()
        features["MONTH"] = now.month
        features["HOUR"] = now.hour
        features["DAY_OF_YEAR"] = now.timetuple().tm_yday
        (features["SIN_DOY"], features["COS_DOY"],
         features["SIN_HOUR"], features["COS_HOUR"]) = self.create_cyclical_features(
            features["MONTH"], features["HOUR"], features["DAY_OF_YEAR"]
        )
        # Storm lifecycle defaults (hours / kph) when not supplied.
        for key, default in (("STORM_AGE_HOURS", 24),
                             ("STORM_DURATION_HOURS", 48),
                             ("MOVEMENT_SPEED_KPH", 15)):
            features[key] = raw_data.get(key, default)
        # Lagged features: lag label is hours, index assumes 6-hourly history
        # (t-6 -> last record, t-24 -> 4 back). Falls back to the current
        # value when history is missing or too short.
        for lag_hours, steps_back in ((6, 1), (12, 2), (18, 3), (24, 4)):
            if historical_data and len(historical_data) >= steps_back:
                past = historical_data[-steps_back]
                features[f"WIND_t-{lag_hours}"] = past.get("MAX_WIND", features["MAX_WIND"])
                features[f"PRESSURE_t-{lag_hours}"] = past.get("MIN_PRESSURE", features["MIN_PRESSURE"])
            else:
                features[f"WIND_t-{lag_hours}"] = features["MAX_WIND"]
                features[f"PRESSURE_t-{lag_hours}"] = features["MIN_PRESSURE"]
        for span in (6, 12):
            features[f"WIND_CHANGE_{span}H"] = features["MAX_WIND"] - features[f"WIND_t-{span}"]
            features[f"PRESSURE_CHANGE_{span}H"] = features["MIN_PRESSURE"] - features[f"PRESSURE_t-{span}"]
        features["INTENSIFICATION_RATE"] = features["WIND_CHANGE_6H"] / 6
        # One-hot storm type; an unknown type yields all zeros.
        current_type = raw_data.get("STORM_TYPE", "TS")
        for stype in ("DB", "EX", "MD", "TC", "TD", "TS", "TY", "WV"):
            features[f"STORM_{stype}"] = 1 if stype == current_type else 0
        # Quadrant-averaged radii.
        for prefix, out_key in (("RAD", "AVG_RAD34"),
                                ("RAD50", "AVG_RAD50"),
                                ("RAD64", "AVG_RAD64")):
            quad_vals = [features[f"{prefix}_{q}"] for q in ("NE", "SE", "SW", "NW")]
            features[out_key] = float(np.mean(quad_vals))
        features["STORM_SIZE"] = raw_data.get("STORM_SIZE", features["AVG_RAD34"])
        return features
| # ============================================================================ | |
| # IMD FLOOD DATA FETCHER | |
| # ============================================================================ | |
class IMDFloodDataFetcher:
    """
    Fetches live rainfall from IMD APIs and combines with static
    flood features to generate flood risk heatmap inputs.

    Live rainfall sources, in the order tried per city:
      1. API 5 — district-wise rainfall (one bulk request covers all cities)
      2. API 3 — per-station current weather (?id=<station_id>)
      3. API 10 — AWS/ARG automatic weather stations
    Static per-city susceptibility features come from the hard-coded
    tables below.
    """

    # API 3 — per-station current weather endpoint (expects ?id=<station_id>).
    CURRENT_WEATHER_URL = "https://mausam.imd.gov.in/api/current_wx_api.php"
    # API 5 — district-wise rainfall bulk endpoint.
    DISTRICT_RAINFALL_URL = "https://mausam.imd.gov.in/api/districtwise_rainfall_api.php"
    # API 10 — AWS/ARG station feed (note the different host, city.imd.gov.in).
    AWS_DATA_URL = "https://city.imd.gov.in/api/aws_data_api.php"

    # Monitored city -> IMD station id (presumably WMO station indexes —
    # TODO confirm against the IMD station master list).
    CITY_STATIONS = {
        "Mumbai": "42941",
        "Chennai": "43279",
        "Kolkata": "42809",
        "Delhi": "42182",
        "Hyderabad": "43128",
        "Bangalore": "43295",
        "Bhubaneswar": "42971",
        "Patna": "42492",
        "Guwahati": "42410",
        "Kochi": "43371",
    }

    # City -> (latitude, longitude) in decimal degrees, for map placement.
    STATION_COORDS = {
        "Mumbai": (19.0760, 72.8777),
        "Chennai": (13.0827, 80.2707),
        "Kolkata": (22.5726, 88.3639),
        "Delhi": (28.6139, 77.2090),
        "Hyderabad": (17.3850, 78.4867),
        "Bangalore": (12.9716, 77.5946),
        "Bhubaneswar": (20.2961, 85.8245),
        "Patna": (25.5941, 85.1376),
        "Guwahati": (26.1445, 91.7362),
        "Kochi": (9.9312, 76.2673),
    }

    # Static flood-susceptibility features per city:
    #   elevation_m             — elevation above sea level (metres)
    #   drainage_capacity_index — 0..1, higher = better drainage
    #   flow_accumulation       — 0..1 relative upstream flow accumulation
    #   twi                     — topographic wetness index
    #   dist_river              — distance to the nearest major river (km)
    # NOTE(review): values appear to be curated estimates — verify provenance.
    CITY_STATIC_FEATURES = {
        "Mumbai": {"elevation_m": 11, "drainage_capacity_index": 0.35,
                   "flow_accumulation": 0.70, "twi": 12.0, "dist_river": 0.8},
        "Chennai": {"elevation_m": 6, "drainage_capacity_index": 0.40,
                    "flow_accumulation": 0.65, "twi": 11.5, "dist_river": 1.2},
        "Kolkata": {"elevation_m": 9, "drainage_capacity_index": 0.30,
                    "flow_accumulation": 0.75, "twi": 13.0, "dist_river": 0.5},
        "Delhi": {"elevation_m": 216, "drainage_capacity_index": 0.50,
                  "flow_accumulation": 0.45, "twi": 8.5, "dist_river": 2.0},
        "Hyderabad": {"elevation_m": 542, "drainage_capacity_index": 0.55,
                      "flow_accumulation": 0.35, "twi": 7.0, "dist_river": 3.5},
        "Bangalore": {"elevation_m": 920, "drainage_capacity_index": 0.60,
                      "flow_accumulation": 0.30, "twi": 6.5, "dist_river": 4.0},
        "Bhubaneswar": {"elevation_m": 45, "drainage_capacity_index": 0.38,
                        "flow_accumulation": 0.60, "twi": 10.5, "dist_river": 1.5},
        "Patna": {"elevation_m": 53, "drainage_capacity_index": 0.28,
                  "flow_accumulation": 0.80, "twi": 14.0, "dist_river": 0.4},
        "Guwahati": {"elevation_m": 55, "drainage_capacity_index": 0.32,
                     "flow_accumulation": 0.72, "twi": 13.5, "dist_river": 0.6},
        "Kochi": {"elevation_m": 3, "drainage_capacity_index": 0.33,
                  "flow_accumulation": 0.78, "twi": 14.5, "dist_river": 0.3},
    }

    def fetch_all_cities(self) -> list:
        """
        Fetch rainfall for all monitored cities.
        Tries API 5 (district bulk) first, then per-station API 3 for gaps.

        Returns a list of dicts, one per city, each with: status, city,
        rainfall_mm (float or None), lat, lon, static_features, api_used.
        Never raises — cities with no data come back with status "no_data".
        """
        results = []
        # One bulk call up front; per-city fallbacks only fill the gaps.
        district_data = self._fetch_district_rainfall_bulk()
        for city, station_id in self.CITY_STATIONS.items():
            lat, lon = self.STATION_COORDS.get(city, (0, 0))
            static = self.CITY_STATIC_FEATURES.get(city, {})
            rainfall_mm = district_data.get(city)
            api_used = "API 5 - District Rainfall"
            if rainfall_mm is None:
                # Bulk feed had nothing for this city — try per-station APIs.
                obs = self.fetch_city_rainfall(city, station_id)
                rainfall_mm = obs.get("rainfall_mm")
                api_used = obs.get("api_used", "API 3 - Current Weather")
            results.append({
                "status": "success" if rainfall_mm is not None else "no_data",
                "city": city,
                "rainfall_mm": rainfall_mm,
                "lat": lat,
                "lon": lon,
                "static_features": static,
                "api_used": api_used,
            })
        return results

    def fetch_city_rainfall(self, city: str, station_id: str) -> dict:
        """Fetch current rainfall for a single city. Tries API 3 then API 10.

        Returns a dict with at least status/city/rainfall_mm; rainfall_mm is
        None and api_used is "none" when both sources fail.
        """
        # API 3: per-station current weather.
        try:
            url = f"{self.CURRENT_WEATHER_URL}?id={station_id}"
            response = requests.get(url, timeout=8)
            if response.status_code == 200:
                result = self._parse_current_weather_api(response, city)
                if result.get("rainfall_mm") is not None:
                    result["api_used"] = "API 3 - Current Weather"
                    return result
        except Exception:
            # Best-effort: swallow and fall through to the next source.
            pass
        # API 10: AWS/ARG station feed.
        try:
            response = requests.get(self.AWS_DATA_URL, timeout=8)
            if response.status_code == 200:
                result = self._parse_aws_data(response, city, station_id)
                if result.get("rainfall_mm") is not None:
                    result["api_used"] = "API 10 - AWS/ARG"
                    return result
        except Exception:
            pass
        return {"status": "error", "city": city, "rainfall_mm": None, "api_used": "none"}

    def _fetch_district_rainfall_bulk(self) -> dict:
        """API 5 bulk fetch. Returns {city_name: rainfall_mm}.

        Matches feed district names against per-city alias lists (city and
        district names differ, e.g. Kochi lies in Ernakulam district).
        Best-effort: any failure returns whatever was collected (possibly {}).
        """
        # City -> district-name aliases expected in the feed.
        city_to_district = {
            "Mumbai": ["Mumbai", "Mumbai City", "Mumbai Suburban"],
            "Chennai": ["Chennai"],
            "Kolkata": ["Kolkata", "Calcutta"],
            "Delhi": ["Delhi", "New Delhi", "Central Delhi"],
            "Hyderabad": ["Hyderabad", "Ranga Reddy"],
            "Bangalore": ["Bengaluru", "Bangalore", "Bangalore Urban"],
            "Bhubaneswar": ["Khordha", "Bhubaneswar"],
            "Patna": ["Patna"],
            "Guwahati": ["Kamrup Metropolitan", "Guwahati"],
            "Kochi": ["Ernakulam", "Kochi"],
        }
        result = {}
        try:
            response = requests.get(self.DISTRICT_RAINFALL_URL, timeout=10)
            if response.status_code != 200:
                return result
            data = response.json()
            # Feed may be a bare list or wrapped in {"data": [...]}.
            records = data if isinstance(data, list) else data.get("data", [])
            for record in records:
                # District key name varies across feed versions.
                district_name = str(record.get("district",
                                    record.get("District",
                                    record.get("district_name", "")))).strip()
                for city, aliases in city_to_district.items():
                    if city in result:
                        # First matching record wins for each city.
                        continue
                    if any(alias.lower() in district_name.lower() for alias in aliases):
                        # Rainfall field name also varies; try known spellings.
                        for field in ["rainfall", "rain", "rf", "rf_24hr",
                                      "Rainfall", "RAINFALL", "actual"]:
                            val = record.get(field)
                            # Skip the feed's no-data markers.
                            if val not in (None, "", "NA", "--", "NR"):
                                try:
                                    result[city] = float(
                                        str(val).replace("mm", "").strip()
                                    )
                                    break
                                except (ValueError, TypeError):
                                    continue
                        break
        except Exception:
            # Best-effort bulk pass; the per-station APIs cover any gaps.
            pass
        return result

    def _parse_current_weather_api(self, response, city: str) -> dict:
        """Parse API 3 JSON or HTML response.

        Prefers JSON; when .json() fails (HTML page), falls back to a regex
        scan of the raw text. rainfall_mm stays None if nothing parses.
        """
        rainfall_mm = None
        try:
            data = response.json()
            records = data if isinstance(data, list) else [data]
            if records:
                # Only the first record is inspected — assumes the station
                # query returns a single observation. TODO confirm.
                record = records[0]
                for field in ["rainfall", "rain", "rf", "rainfall_mm",
                              "Rainfall", "RAINFALL", "rf_24hr"]:
                    val = record.get(field)
                    if val not in (None, "", "NA", "--"):
                        try:
                            rainfall_mm = float(str(val).replace("mm", "").strip())
                            break
                        except (ValueError, TypeError):
                            continue
        except Exception:
            # Non-JSON response: scrape "rainfall: <n> mm" out of the HTML.
            m = re.search(r"rainfall[:\s]+(\d+\.?\d*)\s*mm",
                          response.text, re.IGNORECASE)
            if m:
                rainfall_mm = float(m.group(1))
        return {
            "status": "success" if rainfall_mm is not None else "no_rainfall_data",
            "city": city,
            "rainfall_mm": rainfall_mm,
        }

    def _parse_aws_data(self, response, city: str, station_id: str) -> dict:
        """Parse API 10 AWS/ARG data.

        Scans the record list for the matching station_id, then tries the
        known rainfall field spellings. Best-effort: any parse error leaves
        rainfall_mm as None.
        """
        rainfall_mm = None
        try:
            data = response.json()
            records = data if isinstance(data, list) else data.get("data", [])
            for record in records:
                # Station id key varies ("station_id" or "id").
                sid = str(record.get("station_id", record.get("id", "")))
                if sid == station_id:
                    for field in ["rainfall", "rain", "rf", "rf_1hr", "rf_24hr"]:
                        val = record.get(field)
                        if val not in (None, "", "NA", "--"):
                            try:
                                rainfall_mm = float(str(val).strip())
                                break
                            except (ValueError, TypeError):
                                continue
                    break
        except Exception:
            pass
        return {
            "status": "success" if rainfall_mm is not None else "no_rainfall_data",
            "city": city,
            "rainfall_mm": rainfall_mm,
        }

    def estimate_soil_saturation(self, rainfall_mm: float) -> float:
        """0mm -> 20% baseline, 200mm+ -> 95%.

        Linear ramp between the clamps; returned on a 0-100 percent scale,
        not as a fraction.
        """
        return float(np.clip(0.20 + (rainfall_mm / 200.0) * 0.75, 0.20, 0.95)) * 100