# ClimAI / executor.py
# iPurushottam's picture
# Upload folder using huggingface_hub
# 43ea1a5 verified
import re
from datetime import datetime, timedelta, timezone

import requests
# Open-Meteo endpoint URLs (forecast API and archive API).
# NOTE(review): nothing in the visible part of this module reads these two
# constants — confirm they are still needed before removing them.
OPEN_METEO_FORECAST = "https://api.open-meteo.com/v1/forecast"
OPEN_METEO_ARCHIVE = "https://archive-api.open-meteo.com/v1/archive"
# Default testing coordinates for Chennai (latitude, then longitude).
DEFAULT_LAT = 13.0827
DEFAULT_LON = 80.2707
def _ensure_datetime(d):
"""Ensure d is a full datetime object, not just a date."""
if d is None:
return None
if isinstance(d, datetime):
return d
return datetime(d.year, d.month, d.day)
def _infer_past_date_from_query(query: str):
current_year = datetime.now().year
m = re.search(r'\b(19\d{2}|20\d{2})\b', query.lower())
if m:
year = int(m.group(1))
if year < current_year:
now = datetime.utcnow()
try:
return datetime(year, now.month, now.day)
except ValueError:
return datetime(year, now.month, 28)
return None
def _extract_all_past_years(query: str):
current_year = datetime.now().year
now = datetime.now()
q = query.lower()
seen = set()
years_to_process = []
range_matches = re.finditer(r'\b(19\d{2}|20\d{2})\s*(?:to|-|and)\s*(19\d{2}|20\d{2})\b', q)
for m in range_matches:
start_y = int(m.group(1))
end_y = int(m.group(2))
if start_y > end_y:
start_y, end_y = end_y, start_y
for y in range(start_y, end_y + 1):
if y < current_year and y not in seen:
seen.add(y)
years_to_process.append(y)
single_matches = re.finditer(r'\b(19\d{2}|20\d{2}|2[0-5])\b', q)
for m in single_matches:
val = m.group(1)
y = 2000 + int(val) if len(val) == 2 else int(val)
if y < current_year and y not in seen:
seen.add(y)
years_to_process.append(y)
years_to_process = sorted(years_to_process, reverse=True)[:6]
years_to_process.sort()
past_dates = []
for year in years_to_process:
try:
past_dates.append(datetime(year, now.month, now.day))
except ValueError:
past_dates.append(datetime(year, now.month, 28))
return past_dates
def execute_plan(plan):
    """
    Executes the deterministic plan generated by the Planner.
    Routes to the correct external APIs or internal ML modules.

    Args:
        plan: Mapping read with ``.get``. Keys consulted here are ``intent``
            (default ``"weather"``), ``all_intents`` (default ``[intent]``),
            ``date``, ``context`` and ``query``. An explicit ``None`` for
            ``all_intents``, ``context`` or ``query`` is treated like a
            missing key. When no date is supplied and a past year can be
            inferred from the query, ``plan["intent"]`` is overwritten with
            ``"weather_history"``.

    Returns:
        A dict with the keys ``weather``, ``forecast``, ``historical_weather``,
        ``historical_comparison``, ``cyclone``, ``earthquake``, ``tsunami``
        and ``models``. Each value stays ``None`` unless a route filled it in.
        If ``weather_service`` cannot be imported the error is printed and
        the dict is returned as collected so far.
    """
    intent = plan.get("intent", "weather")
    all_intents = plan.get("all_intents")
    if all_intents is None:
        all_intents = [intent]
    target_date = _ensure_datetime(plan.get("date"))
    ctx = plan.get("context") or {}
    query = plan.get("query") or ""

    # Fallback: infer past date from query year
    if target_date is None and intent in ("weather_history", "weather"):
        inferred = _infer_past_date_from_query(query)
        if inferred:
            target_date = inferred
            intent = "weather_history"
            plan["intent"] = intent

    execution_result = {
        "weather": None,
        "forecast": None,
        "historical_weather": None,
        "historical_comparison": None,
        "cyclone": None,
        "earthquake": None,
        "tsunami": None,
        "models": None,
    }

    try:
        from weather_service import (get_cyclones, get_earthquakes, get_tsunamis,
                                     get_weather, get_forecast, fetch_historical_weather)

        # One UTC "today" for every date decision made during this call.
        today = datetime.now(timezone.utc).date()
        # Dates newer than this are not requested from the archive.
        archive_limit = today - timedelta(days=5)
        # _ensure_datetime returns either None or a datetime, so .date() is safe.
        target_day = target_date.date() if target_date is not None else None

        # ── DISASTER ROUTE ────────────────────────────────────────────────────
        # Full report: always fetch weather + forecast + cyclones + earthquakes
        if "disaster" in all_intents:
            execution_result["weather"] = get_weather()
            execution_result["forecast"] = get_forecast()
            execution_result["cyclone"] = get_cyclones()
            execution_result["earthquake"] = get_earthquakes()
            execution_result["tsunami"] = get_tsunamis()
            # Don't return early — other intents below may add more data

        # ── COMPARISON ROUTE ──────────────────────────────────────────────────
        if intent == "weather_comparison" or ctx.get("wants_comparison"):
            execution_result["weather"] = get_weather()

            past_dates = _extract_all_past_years(query)
            # No year in the query: fall back to the plan's own date if it is past.
            if not past_dates and target_day is not None and target_day < today:
                past_dates = [target_date]

            comparison_results = []
            for past_dt in past_dates:
                if past_dt.date() > archive_limit:
                    continue
                hist = fetch_historical_weather(past_dt, days_range=1)
                if hist and "error" not in hist:
                    # Tag each record so later stages know which year it answers.
                    hist["queried_year"] = past_dt.year
                    hist["queried_date"] = past_dt.strftime("%Y-%m-%d")
                    comparison_results.append(hist)

            if comparison_results:
                execution_result["historical_comparison"] = comparison_results
                execution_result["historical_weather"] = comparison_results[0]

            # The forecast accompanies every comparison, whether or not any
            # archive data was found.
            execution_result["forecast"] = get_forecast()

        # ── STANDARD WEATHER / HISTORY / PREDICTION ROUTE ────────────────────
        elif intent in ("weather_history", "weather", "prediction"):
            if target_day is None:
                execution_result["weather"] = get_weather()
                if intent in ("weather", "prediction"):
                    execution_result["forecast"] = get_forecast()
            elif target_day < today:
                execution_result["historical_weather"] = fetch_historical_weather(
                    target_date, days_range=1
                )
            elif target_day == today:
                execution_result["weather"] = get_weather()
            elif (target_day - today).days <= 7:
                # Future dates more than a week out fetch nothing.
                execution_result["forecast"] = get_forecast()

        # ── CYCLONE ROUTE ─────────────────────────────────────────────────────
        if "cyclone" in all_intents and execution_result["cyclone"] is None:
            cy_name = ctx.get("cyclone_name")
            c_data = get_cyclones(name=cy_name, year=ctx.get("year"))
            # Trim to the three most recent only when the payload is a dict;
            # anything else (e.g. None from a failed fetch) is stored as-is.
            if ctx.get("wants_recent") and not cy_name and isinstance(c_data, dict):
                # presumably each entry is a dict with a numeric "year" — verify
                # against weather_service; entries without one sort last.
                recent = sorted(
                    c_data.get("cyclones") or [],
                    key=lambda c: c.get("year") or 0,
                    reverse=True,
                )
                c_data["cyclones"] = recent[:3]
            execution_result["cyclone"] = c_data

        # ── EARTHQUAKE ROUTE ──────────────────────────────────────────────────
        if "earthquake" in all_intents and execution_result["earthquake"] is None:
            execution_result["earthquake"] = get_earthquakes()

        # ── TSUNAMI ROUTE ─────────────────────────────────────────────────────
        if "tsunami" in all_intents and execution_result["tsunami"] is None:
            execution_result["tsunami"] = get_tsunamis()

    except ImportError as e:
        print(f"Executor Import Error: {e}")

    return execution_result