# cyclone-pred-api / src/live_data_fetcher.py
# Committer (from page header): clarindasusan
# Commit 57cd1f1 (verified): Update src/live_data_fetcher.py
import requests
import numpy as np
from datetime import datetime
import re
# ============================================================================
# IMD CYCLONE DATA FETCHER
# ============================================================================
class IMDCycloneDataFetcher:
    """Fetch cyclone-related warnings and bulletin text from IMD endpoints.

    Every network method is best-effort: failures are reported inside the
    returned dict (``status`` / ``message`` / ``errors``) rather than raised.
    """

    # API 6 — District wise Warning (primary)
    WARNING_API_URL = "https://mausam.imd.gov.in/api/warnings_district_api.php"
    # API 9 — RSS Feeds (secondary)
    RSS_URL = "https://mausam.imd.gov.in/imd_latest/contents/dist_nowcast_rss.php"
    # API 4 — District Wise Nowcast (tertiary)
    NOWCAST_URL = "https://mausam.imd.gov.in/api/nowcast_district_api.php"
    # Keep BULLETIN_URLS for any code that references it
    BULLETIN_URLS = [
        "https://mausam.imd.gov.in/api/warnings_district_api.php",
        "https://mausam.imd.gov.in/imd_latest/contents/dist_nowcast_rss.php",
    ]

    # Substrings (lower-case) that mark a record as cyclone-related.
    # "deep depression" is already covered by "depression".
    _CYCLONE_KEYWORDS = ("cyclone", "depression", "storm")
    _ALERT_KEYWORDS = _CYCLONE_KEYWORDS + ("warning",)

    # The negative lookahead after the hemisphere letter stops ordinary words
    # from being read as coordinates ("24 warnings", "12 Sep", "3 events"),
    # while the optional suffix still accepts "north"/"south"/"east"/"west".
    _LAT_RE = re.compile(
        r"(\d+\.?\d*)[°\s]*([NS])(?:orth|outh)?(?![A-Za-z])", re.IGNORECASE
    )
    _LON_RE = re.compile(
        r"(\d+\.?\d*)[°\s]*([EW])(?:ast|est)?(?![A-Za-z])", re.IGNORECASE
    )
    # Optional fractional part so "62.5 kmph" is read as 62.5, not 5.
    _WIND_RE = re.compile(
        r"wind[s]?.*?(\d+(?:\.\d+)?)\s*(kmph|km/h|knots|kt)", re.IGNORECASE
    )
    _PRESSURE_RE = re.compile(
        r"pressure.*?(\d{3,4}(?:\.\d+)?)\s*(hpa|mb)", re.IGNORECASE
    )

    def __init__(self):
        self.rsmc_url = "https://rsmcnewdelhi.imd.gov.in/"

    @staticmethod
    def _extract_records(payload):
        """Return the record list from a decoded JSON payload.

        A list payload is returned as-is; otherwise the ``"data"`` entry is
        used (empty list when absent). A payload without ``.get`` raises
        ``AttributeError``, which every caller catches in its own handler.
        """
        return payload if isinstance(payload, list) else payload.get("data", [])

    @staticmethod
    def _mentions(text, keywords):
        """Return True when any keyword occurs in ``text`` (case-insensitive)."""
        lowered = text.lower()
        return any(kw in lowered for kw in keywords)

    def fetch_active_cyclones(self):
        """Fetch cyclone-related alerts from IMD district warning API.

        Returns:
            On success, a dict with ``status="success"``, ``active_cyclones``
            (one ``{"alert", "timestamp"}`` entry per matching record),
            ``count``, ``source`` and ``last_updated``. On any failure, a dict
            with ``status="error"``, ``message`` and an empty
            ``active_cyclones`` list.
        """
        try:
            response = requests.get(self.WARNING_API_URL, timeout=10)
            response.raise_for_status()
            cyclone_alerts = []
            for item in self._extract_records(response.json()):
                if self._mentions(str(item), self._CYCLONE_KEYWORDS):
                    cyclone_alerts.append({
                        "alert": item,
                        "timestamp": datetime.now().isoformat(),
                    })
            return {
                "status": "success",
                "active_cyclones": cyclone_alerts,
                "count": len(cyclone_alerts),
                "source": "IMD District Warnings API",
                "last_updated": datetime.now().isoformat(),
            }
        except Exception as e:
            # Boundary handler: callers expect a dict, never an exception.
            return {"status": "error", "message": str(e), "active_cyclones": []}

    def fetch_hourly_bulletin(self):
        """
        Fetch bulletin content for cyclone parameter parsing.
        Tries API 6 (warnings JSON) first, then API 9 (RSS) as fallback.
        Returns dict with 'content' string for parse_cyclone_parameters().

        When both sources fail the dict has ``status="error"`` and a
        ``tried`` list with one entry per failed source.
        """
        errors = []

        # API 6: District Warnings
        try:
            response = requests.get(self.WARNING_API_URL, timeout=10)
            if response.status_code == 200:
                parts = []
                for record in self._extract_records(response.json()):
                    if isinstance(record, dict):
                        # Only the values carry text; keys are dropped.
                        parts.extend(str(v) for v in record.values())
                    else:
                        parts.append(str(record))
                content = "\n".join(parts)
                if content.strip():
                    return {
                        "status": "success",
                        "content": content,
                        "url_used": self.WARNING_API_URL,
                        "source": "IMD District Warnings (API 6)",
                        "timestamp": datetime.now().isoformat(),
                    }
            # Reached for non-200 responses and for 200 with empty content.
            errors.append(f"API 6 -> HTTP {response.status_code}")
        except Exception as e:
            errors.append(f"API 6 -> {str(e)}")

        # API 9: RSS Feed fallback
        try:
            response = requests.get(self.RSS_URL, timeout=10)
            if response.status_code == 200 and len(response.text.strip()) > 50:
                # Strip XML tags, keep plain text for regex parsing
                text = re.sub(r"<[^>]+>", " ", response.text)
                text = re.sub(r"\s+", " ", text).strip()
                return {
                    "status": "success",
                    "content": text,
                    "url_used": self.RSS_URL,
                    "source": "IMD RSS Feed (API 9)",
                    "timestamp": datetime.now().isoformat(),
                }
            errors.append(f"API 9 -> HTTP {response.status_code}")
        except Exception as e:
            errors.append(f"API 9 -> {str(e)}")

        return {
            "status": "error",
            "message": "All IMD bulletin sources unavailable",
            "tried": errors,
            "timestamp": datetime.now().isoformat(),
        }

    def fetch_rsmc_page_alerts(self):
        """Collect de-duplicated alert snippets from the district warnings API.

        Only API 6 is queried (API 4 Nowcast requires auth - 401). Each
        matching record is stringified and truncated to 500 characters.

        Returns:
            Dict with ``status`` (``"success"`` when at least one alert was
            found, else ``"no_alerts"``), ``source``, ``alerts``, ``count``,
            ``errors`` and ``timestamp``.
        """
        alerts = []
        seen = set()  # O(1) duplicate check; `alerts` keeps first-seen order
        errors = []
        try:
            response = requests.get(self.WARNING_API_URL, timeout=10)
            if response.status_code == 200:
                for record in self._extract_records(response.json()):
                    text = str(record)
                    if not self._mentions(text, self._ALERT_KEYWORDS):
                        continue
                    snippet = text[:500]
                    if snippet not in seen:
                        seen.add(snippet)
                        alerts.append(snippet)
            else:
                errors.append(f"Warnings API -> HTTP {response.status_code}")
        except Exception as e:
            errors.append(f"Warnings API -> {str(e)}")
        return {
            "status": "success" if alerts else "no_alerts",
            "source": "IMD District Warnings API (API 6)",
            "alerts": alerts,
            "count": len(alerts),
            "errors": errors,
            "timestamp": datetime.now().isoformat(),
        }

    def parse_cyclone_parameters(self, bulletin_text) -> dict:
        """
        Extract cyclone parameters from bulletin content.
        Accepts string or list (JSON records).

        Returns a dict holding only the parameters that were found:
        ``LAT`` / ``LON`` in degrees (negative for S / W), ``MAX_WIND`` in
        knots (km/h values are divided by 1.852) and ``MIN_PRESSURE`` as
        written in the text (hPa or mb). Never raises; parse errors are
        printed and the parameters gathered so far are returned.
        """
        params = {}
        # Normalise input to string for regex
        if isinstance(bulletin_text, list):
            text = " ".join(str(item) for item in bulletin_text)
        else:
            text = str(bulletin_text)
        try:
            lat_match = self._LAT_RE.search(text)
            if lat_match:
                latitude = float(lat_match.group(1))
                if lat_match.group(2).upper() == "S":
                    latitude = -latitude
                params["LAT"] = latitude

            lon_match = self._LON_RE.search(text)
            if lon_match:
                longitude = float(lon_match.group(1))
                if lon_match.group(2).upper() == "W":
                    longitude = -longitude
                params["LON"] = longitude

            wind_match = self._WIND_RE.search(text)
            if wind_match:
                speed = float(wind_match.group(1))
                unit = wind_match.group(2).lower()
                if "km" in unit:
                    speed = speed / 1.852  # km/h -> knots
                params["MAX_WIND"] = speed

            pressure_match = self._PRESSURE_RE.search(text)
            if pressure_match:
                params["MIN_PRESSURE"] = float(pressure_match.group(1))
        except Exception as e:
            print(f"[CycloneFetcher] Error parsing bulletin: {e}")
        return params
# ============================================================================
# JTWC FETCHER
# ============================================================================
class JTWCDataFetcher:
    """Reachability probe for the JTWC warnings page."""

    def __init__(self):
        self.jtwc_url = "https://www.metoc.navy.mil/jtwc/jtwc.html"

    def fetch_current_warnings(self):
        """Request the JTWC page and report whether it answered with HTTP 200.

        The page body is not parsed. Returns ``{"status": "success",
        "timestamp": ...}`` on HTTP 200, otherwise ``{"status": "error",
        "message": ...}`` carrying the HTTP status or the exception text.
        """
        try:
            reply = requests.get(self.jtwc_url, timeout=5)
            if reply.status_code != 200:
                return {"status": "error", "message": f"HTTP {reply.status_code}"}
            return {"status": "success", "timestamp": datetime.now().isoformat()}
        except Exception as exc:
            return {"status": "error", "message": str(exc)}
# ============================================================================
# CYCLONE FEATURE MAP — module-level
# ============================================================================
def _peak_wind_kmh(feats):
    """Return MAX_WIND (0 when absent) scaled by 1.852, i.e. knots -> km/h."""
    return feats.get("MAX_WIND", 0) * 1.852


def _with_default(key, fallback):
    """Build an extractor that reads ``key`` from a feature dict, else ``fallback``."""
    return lambda feats: feats.get(key, fallback)


# Maps each model input name to a callable that derives it from the
# engineered-feature dict. Insertion order is the order the model features
# are emitted in, so keep it stable.
CYCLONE_MODEL_FEATURE_MAP = {
    # Wind speed in km/h, capped at 350.
    "wind_speed_kmh": lambda feats: min(_peak_wind_kmh(feats), 350),
    "central_pressure_hpa": _with_default("MIN_PRESSURE", 1000),
    "sea_surface_temp_c": _with_default("SEA_SURFACE_TEMP_C", 28.5),
    "track_curvature": _with_default("TRACK_CURVATURE", 0.3),
    "distance_to_coast_km": _with_default("DIST_TO_COAST_KM", 200.0),
    # Wind speed as a fraction of the 350 km/h cap, limited to 1.0.
    "storm_surge_potential": lambda feats: min(1.0, _peak_wind_kmh(feats) / 350),
    "atmospheric_moisture": _with_default("ATMOSPHERIC_MOISTURE", 0.65),
    "shear_index": _with_default("SHEAR_INDEX", 0.3),
}
# ============================================================================
# CYCLONE FEATURE ENGINEER
# ============================================================================
class CycloneFeatureEngineer:
    """Derive engineered and model-ready features from raw cyclone parameters."""

    def to_model_features(self, engineered: dict) -> dict:
        """Translate engineer_features() output -> CYCLONE_FEATURES schema."""
        return {k: fn(engineered) for k, fn in CYCLONE_MODEL_FEATURE_MAP.items()}

    def calculate_distance(self, lat, lon, target_lat, target_lon):
        """Return the haversine great-circle distance between two points.

        Args:
            lat, lon: First point, in degrees.
            target_lat, target_lon: Second point, in degrees.

        Returns:
            Distance scaled by ``R = 6371`` (presumably the mean Earth radius
            in km, so the result is in kilometres).
        """
        from math import radians, sin, cos, sqrt, atan2

        R = 6371
        lat1, lon1 = radians(lat), radians(lon)
        lat2, lon2 = radians(target_lat), radians(target_lon)
        dlat, dlon = lat2 - lat1, lon2 - lon1
        a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
        # Rounding can push `a` a hair above 1.0 for (near-)antipodal points,
        # which would make sqrt(1 - a) raise ValueError. Clamp to [0, 1].
        a = min(1.0, max(0.0, a))
        return R * 2 * atan2(sqrt(a), sqrt(1 - a))

    def create_cyclical_features(self, month, hour, day_of_year):
        """Encode day-of-year and hour as sine/cosine pairs.

        ``month`` is accepted for interface compatibility but is not used.
        Day-of-year uses a 365-day period and hour a 24-hour period.

        Returns:
            Tuple ``(sin_doy, cos_doy, sin_hour, cos_hour)``.
        """
        doy_angle = 2 * np.pi * day_of_year / 365
        hour_angle = 2 * np.pi * hour / 24
        return (
            np.sin(doy_angle),
            np.cos(doy_angle),
            np.sin(hour_angle),
            np.cos(hour_angle),
        )

    def engineer_features(self, raw_data: dict, historical_data=None, *, now=None) -> dict:
        """Build the full engineered-feature dict for one observation.

        Args:
            raw_data: Parsed parameters (``LAT``, ``LON``, ``MAX_WIND``,
                ``MIN_PRESSURE``, optional ``RAD*`` radii, ``STORM_TYPE`` ...).
                Missing keys fall back to the defaults used below.
            historical_data: Optional sequence of earlier observation dicts,
                oldest first. The last entry is used as the t-6 lag, the one
                before it as t-12, and so on — assumes 6-hourly spacing;
                confirm against the caller.
            now: Optional ``datetime`` supplying the temporal features.
                Defaults to ``datetime.now()``; pass a fixed value for
                reproducible output.

        Returns:
            Dict of engineered features (position, intensity, wind radii,
            temporal and cyclical terms, lags, deltas, one-hot storm type and
            averaged radii).
        """
        features = {
            "LAT": raw_data.get("LAT", 0),
            "LON": raw_data.get("LON", 0),
            "MAX_WIND": raw_data.get("MAX_WIND", 0),
            "MIN_PRESSURE": raw_data.get("MIN_PRESSURE", 1000),
        }

        quadrants = ["NE", "SE", "SW", "NW"]
        for quad in quadrants:
            features[f"RAD_{quad}"] = raw_data.get(f"RAD_{quad}", 0)
            features[f"RAD50_{quad}"] = raw_data.get(f"RAD50_{quad}", 0)
            features[f"RAD64_{quad}"] = raw_data.get(f"RAD64_{quad}", 0)

        moment = now if now is not None else datetime.now()
        features["MONTH"] = moment.month
        features["HOUR"] = moment.hour
        features["DAY_OF_YEAR"] = moment.timetuple().tm_yday
        sin_doy, cos_doy, sin_hour, cos_hour = self.create_cyclical_features(
            features["MONTH"], features["HOUR"], features["DAY_OF_YEAR"]
        )
        features.update({
            "SIN_DOY": sin_doy,
            "COS_DOY": cos_doy,
            "SIN_HOUR": sin_hour,
            "COS_HOUR": cos_hour,
        })

        features["STORM_AGE_HOURS"] = raw_data.get("STORM_AGE_HOURS", 24)
        features["STORM_DURATION_HOURS"] = raw_data.get("STORM_DURATION_HOURS", 48)
        features["MOVEMENT_SPEED_KPH"] = raw_data.get("MOVEMENT_SPEED_KPH", 15)

        # Lagged intensity: `steps` counts back from the end of the history.
        # With too little history the current value is reused, so the
        # corresponding change features come out as zero.
        for lag, steps in [(6, 1), (12, 2), (18, 3), (24, 4)]:
            if historical_data and len(historical_data) >= steps:
                past = historical_data[-steps]
                features[f"WIND_t-{lag}"] = past.get("MAX_WIND", features["MAX_WIND"])
                features[f"PRESSURE_t-{lag}"] = past.get(
                    "MIN_PRESSURE", features["MIN_PRESSURE"]
                )
            else:
                features[f"WIND_t-{lag}"] = features["MAX_WIND"]
                features[f"PRESSURE_t-{lag}"] = features["MIN_PRESSURE"]

        features["WIND_CHANGE_6H"] = features["MAX_WIND"] - features["WIND_t-6"]
        features["PRESSURE_CHANGE_6H"] = features["MIN_PRESSURE"] - features["PRESSURE_t-6"]
        features["WIND_CHANGE_12H"] = features["MAX_WIND"] - features["WIND_t-12"]
        features["PRESSURE_CHANGE_12H"] = features["MIN_PRESSURE"] - features["PRESSURE_t-12"]
        features["INTENSIFICATION_RATE"] = features["WIND_CHANGE_6H"] / 6

        # One-hot storm type; an unrecognised type leaves every flag at 0.
        current_type = raw_data.get("STORM_TYPE", "TS")
        for stype in ["DB", "EX", "MD", "TC", "TD", "TS", "TY", "WV"]:
            features[f"STORM_{stype}"] = 1 if stype == current_type else 0

        for prefix, key in [("RAD", "AVG_RAD34"), ("RAD50", "AVG_RAD50"), ("RAD64", "AVG_RAD64")]:
            features[key] = float(np.mean([features[f"{prefix}_{q}"] for q in quadrants]))

        features["STORM_SIZE"] = raw_data.get("STORM_SIZE", features["AVG_RAD34"])
        return features
# ============================================================================
# IMD FLOOD DATA FETCHER
# ============================================================================
class IMDFloodDataFetcher:
    """
    Fetches live rainfall from IMD APIs and combines with static
    flood features to generate flood risk heatmap inputs.

    Network access is best-effort: a failed source is reported on stdout and
    the next source is tried; no method here raises on a fetch failure.
    """

    CURRENT_WEATHER_URL = "https://mausam.imd.gov.in/api/current_wx_api.php"
    DISTRICT_RAINFALL_URL = "https://mausam.imd.gov.in/api/districtwise_rainfall_api.php"
    AWS_DATA_URL = "https://city.imd.gov.in/api/aws_data_api.php"

    # City -> station id sent as the `id` query parameter of API 3 and
    # matched against `station_id` / `id` in API 10 records.
    CITY_STATIONS = {
        "Mumbai": "42941",
        "Chennai": "43279",
        "Kolkata": "42809",
        "Delhi": "42182",
        "Hyderabad": "43128",
        "Bangalore": "43295",
        "Bhubaneswar": "42971",
        "Patna": "42492",
        "Guwahati": "42410",
        "Kochi": "43371",
    }

    # City -> (lat, lon) in decimal degrees.
    STATION_COORDS = {
        "Mumbai": (19.0760, 72.8777),
        "Chennai": (13.0827, 80.2707),
        "Kolkata": (22.5726, 88.3639),
        "Delhi": (28.6139, 77.2090),
        "Hyderabad": (17.3850, 78.4867),
        "Bangalore": (12.9716, 77.5946),
        "Bhubaneswar": (20.2961, 85.8245),
        "Patna": (25.5941, 85.1376),
        "Guwahati": (26.1445, 91.7362),
        "Kochi": (9.9312, 76.2673),
    }

    # City -> static flood-susceptibility inputs, passed through unchanged.
    CITY_STATIC_FEATURES = {
        "Mumbai": {"elevation_m": 11, "drainage_capacity_index": 0.35,
                   "flow_accumulation": 0.70, "twi": 12.0, "dist_river": 0.8},
        "Chennai": {"elevation_m": 6, "drainage_capacity_index": 0.40,
                    "flow_accumulation": 0.65, "twi": 11.5, "dist_river": 1.2},
        "Kolkata": {"elevation_m": 9, "drainage_capacity_index": 0.30,
                    "flow_accumulation": 0.75, "twi": 13.0, "dist_river": 0.5},
        "Delhi": {"elevation_m": 216, "drainage_capacity_index": 0.50,
                  "flow_accumulation": 0.45, "twi": 8.5, "dist_river": 2.0},
        "Hyderabad": {"elevation_m": 542, "drainage_capacity_index": 0.55,
                      "flow_accumulation": 0.35, "twi": 7.0, "dist_river": 3.5},
        "Bangalore": {"elevation_m": 920, "drainage_capacity_index": 0.60,
                      "flow_accumulation": 0.30, "twi": 6.5, "dist_river": 4.0},
        "Bhubaneswar": {"elevation_m": 45, "drainage_capacity_index": 0.38,
                        "flow_accumulation": 0.60, "twi": 10.5, "dist_river": 1.5},
        "Patna": {"elevation_m": 53, "drainage_capacity_index": 0.28,
                  "flow_accumulation": 0.80, "twi": 14.0, "dist_river": 0.4},
        "Guwahati": {"elevation_m": 55, "drainage_capacity_index": 0.32,
                     "flow_accumulation": 0.72, "twi": 13.5, "dist_river": 0.6},
        "Kochi": {"elevation_m": 3, "drainage_capacity_index": 0.33,
                  "flow_accumulation": 0.78, "twi": 14.5, "dist_river": 0.3},
    }

    # District-name substrings (matched case-insensitively) that identify
    # each monitored city in the API 5 bulk response.
    _CITY_DISTRICT_ALIASES = {
        "Mumbai": ["Mumbai", "Mumbai City", "Mumbai Suburban"],
        "Chennai": ["Chennai"],
        "Kolkata": ["Kolkata", "Calcutta"],
        "Delhi": ["Delhi", "New Delhi", "Central Delhi"],
        "Hyderabad": ["Hyderabad", "Ranga Reddy"],
        "Bangalore": ["Bengaluru", "Bangalore", "Bangalore Urban"],
        "Bhubaneswar": ["Khordha", "Bhubaneswar"],
        "Patna": ["Patna"],
        "Guwahati": ["Kamrup Metropolitan", "Guwahati"],
        "Kochi": ["Ernakulam", "Kochi"],
    }

    # Candidate rainfall keys per API, tried in order.
    _DISTRICT_RAIN_FIELDS = ("rainfall", "rain", "rf", "rf_24hr",
                             "Rainfall", "RAINFALL", "actual")
    _CURRENT_WX_RAIN_FIELDS = ("rainfall", "rain", "rf", "rainfall_mm",
                               "Rainfall", "RAINFALL", "rf_24hr")
    _AWS_RAIN_FIELDS = ("rainfall", "rain", "rf", "rf_1hr", "rf_24hr")

    _RAINFALL_TEXT_RE = re.compile(r"rainfall[:\s]+(\d+\.?\d*)\s*mm", re.IGNORECASE)

    @staticmethod
    def _coerce_rainfall(value):
        """Convert one raw field value to millimetres, or None if unusable.

        Accepts numbers and numeric strings with an optional "mm" suffix.
        Missing-value markers (None, "", "NA", "--", "NR") fail the float
        conversion and yield None, as do non-finite values ("nan", "inf").
        """
        try:
            amount = float(str(value).replace("mm", "").strip())
        except (ValueError, TypeError):
            return None
        return amount if np.isfinite(amount) else None

    @classmethod
    def _first_rainfall(cls, record, fields):
        """Return the first usable rainfall value among ``fields`` of a dict.

        Raises ``AttributeError`` when ``record`` has no ``.get``; callers
        either guard with ``isinstance`` or rely on that to trigger a fallback.
        """
        for field in fields:
            amount = cls._coerce_rainfall(record.get(field))
            if amount is not None:
                return amount
        return None

    def fetch_all_cities(self) -> list:
        """
        Fetch rainfall for all monitored cities.
        Tries API 5 (district bulk) first, then per-station API 3 for gaps.

        Returns one dict per city with ``status`` ("success" / "no_data"),
        ``city``, ``rainfall_mm`` (None when unavailable), ``lat``, ``lon``,
        ``static_features`` and ``api_used``.
        """
        results = []
        district_data = self._fetch_district_rainfall_bulk()
        for city, station_id in self.CITY_STATIONS.items():
            lat, lon = self.STATION_COORDS.get(city, (0, 0))
            static = self.CITY_STATIC_FEATURES.get(city, {})
            rainfall_mm = district_data.get(city)
            api_used = "API 5 - District Rainfall"
            if rainfall_mm is None:
                # Bulk feed had nothing for this city: query it individually.
                obs = self.fetch_city_rainfall(city, station_id)
                rainfall_mm = obs.get("rainfall_mm")
                api_used = obs.get("api_used", "API 3 - Current Weather")
            results.append({
                "status": "success" if rainfall_mm is not None else "no_data",
                "city": city,
                "rainfall_mm": rainfall_mm,
                "lat": lat,
                "lon": lon,
                "static_features": static,
                "api_used": api_used,
            })
        return results

    def fetch_city_rainfall(self, city: str, station_id: str) -> dict:
        """Fetch current rainfall for a single city. Tries API 3 then API 10.

        Returns the first parser result that carries a rainfall value, tagged
        with ``api_used``. If neither source yields one, returns an error dict
        with ``rainfall_mm=None`` and ``api_used="none"``.
        """
        try:
            url = f"{self.CURRENT_WEATHER_URL}?id={station_id}"
            response = requests.get(url, timeout=8)
            if response.status_code == 200:
                result = self._parse_current_weather_api(response, city)
                if result.get("rainfall_mm") is not None:
                    result["api_used"] = "API 3 - Current Weather"
                    return result
        except Exception as e:
            # Best-effort: report and fall through to the next source.
            print(f"[FloodFetcher] API 3 failed for {city}: {e}")

        try:
            response = requests.get(self.AWS_DATA_URL, timeout=8)
            if response.status_code == 200:
                result = self._parse_aws_data(response, city, station_id)
                if result.get("rainfall_mm") is not None:
                    result["api_used"] = "API 10 - AWS/ARG"
                    return result
        except Exception as e:
            print(f"[FloodFetcher] API 10 failed for {city}: {e}")

        return {"status": "error", "city": city, "rainfall_mm": None, "api_used": "none"}

    def _fetch_district_rainfall_bulk(self) -> dict:
        """API 5 bulk fetch. Returns {city_name: rainfall_mm}.

        Cities with no matching district or no usable value are left out.
        On a non-200 response or any failure, whatever was collected so far
        (possibly nothing) is returned.
        """
        result = {}
        try:
            response = requests.get(self.DISTRICT_RAINFALL_URL, timeout=10)
            if response.status_code != 200:
                return result
            data = response.json()
            records = data if isinstance(data, list) else data.get("data", [])
            for record in records:
                if not isinstance(record, dict):
                    # One malformed entry must not abort the whole scan.
                    continue
                district_name = str(
                    record.get("district",
                               record.get("District",
                                          record.get("district_name", "")))
                ).strip().lower()
                for city, aliases in self._CITY_DISTRICT_ALIASES.items():
                    if city in result:
                        continue
                    if any(alias.lower() in district_name for alias in aliases):
                        amount = self._first_rainfall(record, self._DISTRICT_RAIN_FIELDS)
                        if amount is not None:
                            result[city] = amount
                        # A record belongs to at most one city.
                        break
        except Exception as e:
            print(f"[FloodFetcher] District rainfall bulk fetch failed: {e}")
        return result

    def _parse_current_weather_api(self, response, city: str) -> dict:
        """Parse API 3 JSON or HTML response.

        Reads the rainfall value from the first JSON record. If the body is
        not JSON, or the first record is not a dict, falls back to a
        "rainfall: <n> mm" pattern search over ``response.text``.
        """
        rainfall_mm = None
        try:
            data = response.json()
            records = data if isinstance(data, list) else [data]
            if records:
                rainfall_mm = self._first_rainfall(
                    records[0], self._CURRENT_WX_RAIN_FIELDS
                )
        except Exception:
            m = self._RAINFALL_TEXT_RE.search(response.text)
            if m:
                rainfall_mm = float(m.group(1))
        return {
            "status": "success" if rainfall_mm is not None else "no_rainfall_data",
            "city": city,
            "rainfall_mm": rainfall_mm,
        }

    def _parse_aws_data(self, response, city: str, station_id: str) -> dict:
        """Parse API 10 AWS/ARG data.

        Scans the records for the one whose ``station_id`` (or ``id``)
        equals ``station_id`` as a string, and reads its rainfall value.
        Only the first matching record is considered.
        """
        rainfall_mm = None
        try:
            data = response.json()
            records = data if isinstance(data, list) else data.get("data", [])
            for record in records:
                if not isinstance(record, dict):
                    continue
                sid = str(record.get("station_id", record.get("id", "")))
                if sid == station_id:
                    rainfall_mm = self._first_rainfall(record, self._AWS_RAIN_FIELDS)
                    break
        except Exception as e:
            print(f"[FloodFetcher] Could not parse AWS data for {city}: {e}")
        return {
            "status": "success" if rainfall_mm is not None else "no_rainfall_data",
            "city": city,
            "rainfall_mm": rainfall_mm,
        }

    def estimate_soil_saturation(self, rainfall_mm: float) -> float:
        """0mm -> 20% baseline, 200mm+ -> 95%.

        Linear in between; the result is a percentage in [20, 95].
        """
        fraction = np.clip(0.20 + (rainfall_mm / 200.0) * 0.75, 0.20, 0.95)
        return float(fraction) * 100