""" Professional Farm Controller & Decision Engine ============================================== Orchestrates all farm intelligence APIs and generates comprehensive dashboards """ import json import asyncio import httpx import logging import os from datetime import datetime import alerts from database import db logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s" ) logger = logging.getLogger("FarmController") # API Endpoints WEATHER_URL = os.getenv("WEATHER_API_URL", "http://127.0.0.1:8001/weather") PEST_URL = os.getenv("PEST_API_URL", "http://127.0.0.1:8000/api/predict") WATER_URL = os.getenv("WATER_API_URL", "http://127.0.0.1:8002/predict") async def fetch(client: httpx.AsyncClient, url: str, params=None, json_data=None, method="GET"): """Generic async fetch with error handling""" try: if method == "GET": r = await client.get(url, params=params, timeout=60.0) else: r = await client.post(url, json=json_data, timeout=60.0) r.raise_for_status() return r.json() except Exception as e: logger.error(f"Error fetching {url}: {e}") return {"error": str(e), "status": "offline"} async def run_farm_dashboard(user_file: str = "user.json", output_file: str = "dashboard_output.json"): """ Main orchestration function - Runs all APIs in parallel and generates dashboard """ try: with open(user_file, "r") as f: farmer = json.load(f) except Exception as e: logger.error(f"Error loading farmer profile: {e}") return None logger.info(f"🌾 Running dashboard for {farmer.get('name', 'Farmer')}...") # Extract farmer data lat = farmer.get("latitude", 18.5204) lon = farmer.get("longitude", 73.8567) crop_type = farmer.get("crop_type", "Crop") village = farmer.get("village", "Pune") soil_type = farmer.get("soil_type", "Dry") season = farmer.get("season", "Annual") motor_capacity = farmer.get("motor_capacity", 10.0) farm_size_hectares = farmer.get("farm_size_hectares", 1.0) # Prepare API payloads pest_payload = { "latitude": lat, "longitude": lon, "crop_type": crop_type, "season": season, "soil_type": soil_type, "language": "English" } water_payload = { "city": village, "crop_type": crop_type, "soil_type": soil_type, "capacity": motor_capacity, "farm_size_hectares": farm_size_hectares } # Execute all API calls in parallel async with httpx.AsyncClient() as client: logger.info("📡 Calling all intelligence APIs in parallel...") weather_task = fetch(client, WEATHER_URL, params={"lat": lat, "lon": lon}) pest_task = fetch(client, PEST_URL, json_data=pest_payload, method="POST") water_task = fetch(client, WATER_URL, json_data=water_payload, method="POST") weather_res, pest_res, water_res = await asyncio.gather( weather_task, pest_task, water_task, return_exceptions=True ) # Handle exceptions if isinstance(weather_res, Exception): logger.error(f"Weather API failed: {weather_res}") weather_res = {"error": str(weather_res), "current": {}, "forecast": []} if isinstance(pest_res, Exception): logger.error(f"Pest API failed: {pest_res}") pest_res = {"error": str(pest_res), "pest_prediction_table": []} if isinstance(water_res, Exception): logger.error(f"Water API failed: {water_res}") water_res = {"error": str(water_res), "water_m3_sqm": 0, "irrigation_minutes": 0} logger.info("✅ All APIs completed") # ════════════════════════════════════════════════════════════════════════════ # DECISION ENGINE: Generate intelligent alerts # ════════════════════════════════════════════════════════════════════════════ logger.info("🧠 Running decision engine...") notifications = alerts.get_alerts(weather_res, pest_res, water_res, farmer) # Save alert history for alert in notifications: alert_copy = alert.copy() alert_copy["farmer_id"] = farmer.get("farmer_id", "DEFAULT") db.save_alert(alert_copy) # ════════════════════════════════════════════════════════════════════════════ # BUILD COMPREHENSIVE DASHBOARD # ════════════════════════════════════════════════════════════════════════════ dashboard = { "generated_at": datetime.now().isoformat(), "farmer_id": farmer.get("farmer_id", "DEFAULT"), "farmer_profile": farmer, "weather_intelligence": { "status": "online" if "error" not in weather_res else "offline", **weather_res }, "pest_intelligence": { "status": "online" if "error" not in pest_res else "offline", **pest_res }, "water_intelligence": { "status": "online" if "error" not in water_res else "offline", **water_res }, "alerts": notifications, "summary": { "total_alerts": len(notifications), "critical_alerts": len([a for a in notifications if a.get("severity") == "CRITICAL"]), "high_alerts": len([a for a in notifications if a.get("severity") == "HIGH"]), "medium_alerts": len([a for a in notifications if a.get("severity") == "MEDIUM"]), "action_required": any(a.get("action_required", False) for a in notifications) }, "health_check": { "weather_api": "🟢 OK" if weather_res.get("status") == "online" else "🔴 DOWN", "pest_api": "🟢 OK" if pest_res.get("status") == "online" else "🔴 DOWN", "water_api": "🟢 OK" if water_res.get("status") == "online" else "🔴 DOWN", "database": "🟢 OK" } } # Save dashboard with open(output_file, "w") as f: json.dump(dashboard, f, indent=4, default=str) logger.info(f"✅ Dashboard generated: {output_file}") logger.info(f" Total Alerts: {dashboard['summary']['total_alerts']}") logger.info(f" Critical: {dashboard['summary']['critical_alerts']} | High: {dashboard['summary']['high_alerts']}") return dashboard # Async entry point async def main(): """For standalone execution""" dashboard = await run_farm_dashboard() if dashboard: print(json.dumps(dashboard, indent=2, default=str)) if __name__ == "__main__": asyncio.run(main())