| """ |
| Professional Farm Controller & Decision Engine |
| ============================================== |
| Orchestrates all farm intelligence APIs and generates comprehensive dashboards |
| """ |
|
|
| import json |
| import asyncio |
| import httpx |
| import logging |
| import os |
| from datetime import datetime |
| import alerts |
| from database import db |
|
|
# Configure process-wide logging at import time so every message emitted by
# this module (and by any library using the root logger) shares one format.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("FarmController")

# Endpoints of the three intelligence services. Each one can be overridden
# through its environment variable; the defaults point at local instances.
WEATHER_URL = os.getenv("WEATHER_API_URL", "http://127.0.0.1:8001/weather")
PEST_URL = os.getenv("PEST_API_URL", "http://127.0.0.1:8000/api/predict")
WATER_URL = os.getenv("WATER_API_URL", "http://127.0.0.1:8002/predict")
|
|
async def fetch(
    client: "httpx.AsyncClient",
    url: str,
    params=None,
    json_data=None,
    method="GET",
    timeout: float = 60.0,
):
    """Call one API endpoint and return its decoded JSON body.

    Args:
        client: Async HTTP client used to issue the request.
        url: Endpoint to call.
        params: Query parameters, sent only with GET requests.
        json_data: JSON body, sent only with non-GET requests.
        method: HTTP verb, compared case-insensitively. ``"GET"`` issues a
            GET; any other value issues a POST.
        timeout: Per-request timeout in seconds.

    Returns:
        The decoded JSON response on success. On any failure (connection
        error, timeout, non-2xx status, invalid JSON) a dict of the form
        ``{"error": <message>, "status": "offline"}`` is returned instead of
        raising, so one failing service cannot abort the caller.
    """
    try:
        # Normalise the verb so that "get" / "Get" do not silently turn
        # into a POST.
        if str(method).upper() == "GET":
            response = await client.get(url, params=params, timeout=timeout)
        else:
            response = await client.post(url, json=json_data, timeout=timeout)
        response.raise_for_status()
        return response.json()
    except Exception as exc:
        # Some exceptions (timeouts in particular) carry an empty message;
        # fall back to the class name so the error is never blank.
        reason = str(exc) or type(exc).__name__
        logger.error(f"Error fetching {url}: {reason}")
        return {"error": reason, "status": "offline"}
|
|
def _normalize_api_result(label, result, fallback):
    """Turn one ``asyncio.gather`` slot into a dict that is safe to merge."""
    if isinstance(result, BaseException):
        # return_exceptions=True hands back the exception object itself,
        # which may be a BaseException such as CancelledError.
        logger.error(f"{label} API failed: {result}")
        return {"error": str(result) or type(result).__name__, **fallback}
    if not isinstance(result, dict):
        # A JSON array/scalar cannot be spread into the dashboard section.
        kind = type(result).__name__
        logger.error(f"{label} API returned an unexpected payload type: {kind}")
        return {"error": f"unexpected payload type: {kind}", **fallback}
    return result


def _api_status(result):
    """Return "offline" when the result carries an "error" key, else "online"."""
    return "offline" if "error" in result else "online"


def _count_severity(notifications, severity):
    """Count the alerts whose "severity" field equals *severity*."""
    return sum(1 for alert in notifications if alert.get("severity") == severity)


async def run_farm_dashboard(user_file: str = "user.json", output_file: str = "dashboard_output.json"):
    """Query all intelligence APIs concurrently and build the farm dashboard.

    The farmer profile is read from *user_file*, the weather, pest and water
    services are called concurrently, ``alerts.get_alerts`` is run on their
    results, every alert is stored through ``db.save_alert``, and the
    assembled dashboard is written to *output_file* as JSON.

    Args:
        user_file: Path of the JSON file holding the farmer profile.
        output_file: Path the dashboard JSON is written to.

    Returns:
        The dashboard dict, or ``None`` when the farmer profile cannot be
        loaded or is not a JSON object.
    """
    try:
        with open(user_file, "r", encoding="utf-8") as f:
            farmer = json.load(f)
    except Exception as e:
        logger.error(f"Error loading farmer profile: {e}")
        return None

    if not isinstance(farmer, dict):
        # Every later step reads the profile with .get(); reject other shapes.
        logger.error(
            f"Error loading farmer profile: expected a JSON object, got {type(farmer).__name__}"
        )
        return None

    logger.info(f"🌾 Running dashboard for {farmer.get('name', 'Farmer')}...")

    # Profile fields, with the defaults used when a key is missing.
    farmer_id = farmer.get("farmer_id", "DEFAULT")
    lat = farmer.get("latitude", 18.5204)
    lon = farmer.get("longitude", 73.8567)
    crop_type = farmer.get("crop_type", "Crop")
    village = farmer.get("village", "Pune")
    soil_type = farmer.get("soil_type", "Dry")
    season = farmer.get("season", "Annual")
    motor_capacity = farmer.get("motor_capacity", 10.0)
    farm_size_hectares = farmer.get("farm_size_hectares", 1.0)

    pest_payload = {
        "latitude": lat,
        "longitude": lon,
        "crop_type": crop_type,
        "season": season,
        "soil_type": soil_type,
        "language": "English",
    }

    water_payload = {
        "city": village,
        "crop_type": crop_type,
        "soil_type": soil_type,
        "capacity": motor_capacity,
        "farm_size_hectares": farm_size_hectares,
    }

    async with httpx.AsyncClient() as client:
        logger.info("📡 Calling all intelligence APIs in parallel...")

        weather_res, pest_res, water_res = await asyncio.gather(
            fetch(client, WEATHER_URL, params={"lat": lat, "lon": lon}),
            fetch(client, PEST_URL, json_data=pest_payload, method="POST"),
            fetch(client, WATER_URL, json_data=water_payload, method="POST"),
            return_exceptions=True,
        )

    # Replace anything that is not a usable dict with an error result that
    # still carries the empty fields of the corresponding service.
    weather_res = _normalize_api_result(
        "Weather", weather_res, {"current": {}, "forecast": []}
    )
    pest_res = _normalize_api_result(
        "Pest", pest_res, {"pest_prediction_table": []}
    )
    water_res = _normalize_api_result(
        "Water", water_res, {"water_m3_sqm": 0, "irrigation_minutes": 0}
    )

    logger.info("✅ All APIs completed")

    logger.info("🧠 Running decision engine...")
    notifications = alerts.get_alerts(weather_res, pest_res, water_res, farmer)

    # Store a tagged copy so the alert objects returned to the caller are
    # not mutated.
    for alert in notifications:
        alert_copy = alert.copy()
        alert_copy["farmer_id"] = farmer_id
        db.save_alert(alert_copy)

    # One status per service, shared by the section headers and the health
    # check so the two can never disagree.
    weather_status = _api_status(weather_res)
    pest_status = _api_status(pest_res)
    water_status = _api_status(water_res)

    def health(status):
        return "🟢 OK" if status == "online" else "🔴 DOWN"

    dashboard = {
        "generated_at": datetime.now().isoformat(),
        "farmer_id": farmer_id,
        "farmer_profile": farmer,

        # The payload is spread last, so a "status" key supplied by the
        # service itself takes precedence over the computed one.
        "weather_intelligence": {"status": weather_status, **weather_res},
        "pest_intelligence": {"status": pest_status, **pest_res},
        "water_intelligence": {"status": water_status, **water_res},

        "alerts": notifications,

        "summary": {
            "total_alerts": len(notifications),
            "critical_alerts": _count_severity(notifications, "CRITICAL"),
            "high_alerts": _count_severity(notifications, "HIGH"),
            "medium_alerts": _count_severity(notifications, "MEDIUM"),
            "action_required": any(
                a.get("action_required", False) for a in notifications
            ),
        },

        "health_check": {
            "weather_api": health(weather_status),
            "pest_api": health(pest_status),
            "water_api": health(water_status),
            "database": "🟢 OK",
        },
    }

    # default=str stringifies any value json cannot encode natively.
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(dashboard, f, indent=4, default=str)

    summary = dashboard["summary"]
    logger.info(f"✅ Dashboard generated: {output_file}")
    logger.info(f" Total Alerts: {summary['total_alerts']}")
    logger.info(f" Critical: {summary['critical_alerts']} | High: {summary['high_alerts']}")

    return dashboard
|
|
|
|
| |
async def main():
    """Run the dashboard with default paths and print it as JSON.

    Returns:
        ``0`` when a dashboard was generated and printed, ``1`` when
        ``run_farm_dashboard`` returned nothing, so the process exit status
        reflects the outcome.
    """
    dashboard = await run_farm_dashboard()
    if not dashboard:
        return 1
    print(json.dumps(dashboard, indent=2, default=str))
    return 0


if __name__ == "__main__":
    # Propagate the result as the exit status so shell scripts and schedulers
    # can detect a failed run.
    raise SystemExit(asyncio.run(main()))
|
|