""" Professional REST API Server for Climate-Resilient Agriculture Platform ========================================================================= Handles farmer registration, retrieving alerts, and comprehensive farm data """ import json import os from datetime import datetime, timedelta from typing import List, Optional from fastapi import FastAPI, HTTPException, Query, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse from pydantic import BaseModel import logging import asyncio from farm_controller import run_farm_dashboard from database import db from models import FarmerProfile, FarmAlert, AlertSeverity #SETUP logger = logging.getLogger("FarmAPI") app = FastAPI( title="🌾 Climate-Resilient Agriculture Platform", description="Professional farm intelligence platform with real-time alerts", version="1.0.0" ) # CORS Configuration app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ════════════════════════════════════════════════════════════════════════════ # DATA MODELS # ════════════════════════════════════════════════════════════════════════════ class FarmerRegistration(BaseModel): """Farmer registration schema""" farmer_id: str name: str phone: str village: str latitude: float longitude: float crop_type: str soil_type: str motor_capacity: float = 10.0 farm_size_hectares: float = 1.0 telegram_chat_id: Optional[str] = None class AlertQueryParams(BaseModel): """Alert query parameters""" farmer_id: str limit: int = 50 severity: Optional[str] = None category: Optional[str] = None # ════════════════════════════════════════════════════════════════════════════ # HEALTH & STATUS # ════════════════════════════════════════════════════════════════════════════ @app.get("/health", tags=["System"]) async def health_check(): """System health check endpoint with live connectivity testing""" import httpx # Internal endpoints SERVICES = { "weather_api": os.getenv("WEATHER_API_URL", "http://127.0.0.1:8001/weather"), "pest_api": os.getenv("PEST_API_URL", "http://127.0.0.1:8000/api/predict"), "water_api": os.getenv("WATER_API_URL", "http://127.0.0.1:8002/predict") } health_results = {} async with httpx.AsyncClient() as client: # Check API health_results["api"] = "🟢 ONLINE" # Check Database try: db.get_all_farmers() health_results["database"] = "🟢 ONLINE" except: health_results["database"] = "🔴 ERROR" # Check sub-services (simple TCP/HTTP check) for name, url in SERVICES.items(): try: # Just check if Port is at least open/listening import socket from urllib.parse import urlparse parsed_url = urlparse(url) host, port = parsed_url.hostname or "127.0.0.1", parsed_url.port or 80 with socket.create_connection((host, port), timeout=1.0): health_results[name] = "🟢 ONLINE" except: health_results[name] = "🔴 OFFLINE" # Overall Status total_services = len(health_results) online_services = sum(1 for status in health_results.values() if "ONLINE" in str(status)) status = "🟢 OPERATIONAL" if online_services == total_services else "🟡 PARTIAL" if online_services > 1 else "🔴 CRITICAL" return { "status": status, "timestamp": datetime.now().isoformat(), "services": health_results, "online_count": f"{online_services}/{total_services}", "message": "All systems operational" if status == "🟢 OPERATIONAL" else "Some background services are offline" } @app.get("/status", tags=["System"]) async def platform_status(): """Get platform status""" farmers = db.get_all_farmers() subscriptions = db.get_active_subscriptions() return { "platform": "Climate-Resilient Agriculture Platform", "version": "1.0.0", "status": "ACTIVE", "registered_farmers": len(farmers), "active_subscriptions": len(subscriptions), "api_endpoints": 8, "database_status": "Healthy", "last_update": datetime.now().isoformat() } # ════════════════════════════════════════════════════════════════════════════ # FARMER MANAGEMENT # ════════════════════════════════════════════════════════════════════════════ @app.post("/api/farmers/register", tags=["Farmer Management"]) async def register_farmer(farmer: FarmerRegistration): """Register a new farmer or update existing profile""" try: farmer_dict = farmer.dict() farmer_dict["registered_at"] = datetime.now().isoformat() farmer_dict["threshold_settings"] = { "temp_high": 40, "temp_low": 5, "wind_speed": 30, "wind_alert": 15, "rain_threshold": 10 } farmer_dict["alert_preferences"] = { "weather": True, "pest": True, "irrigation": True, "sustainability": True, "climate_warning": True } if db.save_farmer(farmer_dict): logger.info(f"✅ Farmer registered: {farmer.name}") return { "success": True, "message": f"Farmer {farmer.name} registered successfully", "farmer_id": farmer.farmer_id, "timestamp": datetime.now().isoformat() } else: raise HTTPException(500, "Failed to save farmer profile") except Exception as e: logger.error(f"Registration failed: {e}") raise HTTPException(500, f"Registration error: {str(e)}") @app.get("/api/farmers/{farmer_id}", tags=["Farmer Management"]) async def get_farmer(farmer_id: str): """Get farmer profile""" farmer = db.get_farmer(farmer_id) if not farmer: raise HTTPException(404, "Farmer not found") return { "success": True, "farmer": farmer, "timestamp": datetime.now().isoformat() } @app.get("/api/farmers", tags=["Farmer Management"]) async def list_all_farmers(): """Get all registered farmers (admin)""" farmers = db.get_all_farmers() return { "success": True, "total_farmers": len(farmers), "farmers": farmers, "timestamp": datetime.now().isoformat() } # ════════════════════════════════════════════════════════════════════════════ # FARM INTELLIGENCE & ALERTS # ════════════════════════════════════════════════════════════════════════════ @app.get("/api/dashboard/{farmer_id}", tags=["Farm Intelligence"]) async def get_farm_dashboard(farmer_id: str): """Get complete farm dashboard with all intelligence""" try: # Load or generate dashboard dashboard_file = "dashboard_output.json" if os.path.exists(dashboard_file): with open(dashboard_file, "r") as f: dashboard = json.load(f) else: dashboard = await run_farm_dashboard() if not dashboard: raise HTTPException(500, "Failed to generate dashboard") return { "success": True, "dashboard": dashboard, "timestamp": datetime.now().isoformat() } except Exception as e: logger.error(f"Dashboard error: {e}") raise HTTPException(500, f"Dashboard error: {str(e)}") @app.get("/api/alerts/{farmer_id}", tags=["Alerts"]) async def get_alerts( farmer_id: str, limit: int = Query(50, ge=1, le=500), severity: Optional[str] = None, category: Optional[str] = None ): """Get alerts for a farmer with optional filtering""" try: alerts = db.get_alerts_for_farmer(farmer_id, limit=limit) # Filter by severity if severity: alerts = [a for a in alerts if a.get("severity") == severity] # Filter by category if category: alerts = [a for a in alerts if a.get("type") == category] # Count by severity critical_count = len([a for a in alerts if a.get("severity") == "CRITICAL"]) high_count = len([a for a in alerts if a.get("severity") == "HIGH"]) return { "success": True, "farmer_id": farmer_id, "total_alerts": len(alerts), "critical_count": critical_count, "high_count": high_count, "alerts": alerts[:limit], "timestamp": datetime.now().isoformat() } except Exception as e: logger.error(f"Alert query failed: {e}") raise HTTPException(500, f"Alert query failed: {str(e)}") @app.get("/api/critical-alerts/{farmer_id}", tags=["Alerts"]) async def get_critical_alerts(farmer_id: str): """Get only critical and high priority alerts""" try: alerts = db.get_alerts_for_farmer(farmer_id, limit=100) critical_alerts = [a for a in alerts if a.get("severity") in ["CRITICAL", "HIGH"]] return { "success": True, "farmer_id": farmer_id, "critical_alerts_count": len(critical_alerts), "alerts": critical_alerts, "action_required": len(critical_alerts) > 0, "timestamp": datetime.now().isoformat() } except Exception as e: raise HTTPException(500, f"Error: {str(e)}") # ════════════════════════════════════════════════════════════════════════════ # INTELLIGENCE ENDPOINTS # ════════════════════════════════════════════════════════════════════════════ @app.get("/api/weather/{farmer_id}", tags=["Intelligence"]) async def get_weather_intel(farmer_id: str): """Get weather intelligence for farmer's location""" try: with open("dashboard_output.json", "r") as f: dashboard = json.load(f) weather = dashboard.get("weather_intelligence", {}) return { "success": True, "farmer_id": farmer_id, "weather": weather, "timestamp": datetime.now().isoformat() } except: raise HTTPException(500, "Weather data unavailable") @app.get("/api/pest/{farmer_id}", tags=["Intelligence"]) async def get_pest_intel(farmer_id: str): """Get pest prediction and risk assessment""" try: with open("dashboard_output.json", "r") as f: dashboard = json.load(f) pest = dashboard.get("pest_intelligence", {}) pest_table = pest.get("pest_prediction_table", []) critical_pests = [p for p in pest_table if p.get("severity", "").upper() in ["CRITICAL", "HIGH"]] return { "success": True, "farmer_id": farmer_id, "total_pests_detected": len(pest_table), "critical_pests": len(critical_pests), "pest_intelligence": pest, "timestamp": datetime.now().isoformat() } except: raise HTTPException(500, "Pest data unavailable") @app.get("/api/water/{farmer_id}", tags=["Intelligence"]) async def get_water_intel(farmer_id: str): """Get irrigation requirements and water management insights""" try: with open("dashboard_output.json", "r") as f: dashboard = json.load(f) water = dashboard.get("water_intelligence", {}) return { "success": True, "farmer_id": farmer_id, "water_requirement": water, "irrigation_urgency": "CRITICAL" if water.get("irrigation_minutes", 0) > 40 else "HIGH" if water.get("irrigation_minutes", 0) > 20 else "NORMAL", "timestamp": datetime.now().isoformat() } except: raise HTTPException(500, "Water data unavailable") # ════════════════════════════════════════════════════════════════════════════ # ALERT SUBSCRIPTIONS # ════════════════════════════════════════════════════════════════════════════ @app.post("/api/subscribe/{farmer_id}/{telegram_chat_id}", tags=["Subscriptions"]) async def subscribe_telegram(farmer_id: str, telegram_chat_id: str): """Subscribe farmer to Telegram alerts""" try: if db.save_subscription(farmer_id, telegram_chat_id): logger.info(f"✅ Farmer {farmer_id} subscribed to Telegram alerts") return { "success": True, "message": "Subscribed to Telegram alerts", "farmer_id": farmer_id, "telegram_chat_id": telegram_chat_id, "timestamp": datetime.now().isoformat() } else: raise HTTPException(500, "Subscription failed") except Exception as e: raise HTTPException(500, f"Error: {str(e)}") @app.get("/api/subscriptions", tags=["Subscriptions"]) async def get_active_subscriptions(): """Get all active subscriptions (admin)""" subscriptions = db.get_active_subscriptions() return { "success": True, "active_subscriptions": len(subscriptions), "subscriptions": subscriptions, "timestamp": datetime.now().isoformat() } # ════════════════════════════════════════════════════════════════════════════ # WEBSOCKET FOR REAL-TIME ALERTS # ════════════════════════════════════════════════════════════════════════════ class ConnectionManager: """WebSocket connection manager for real-time alerts""" def __init__(self): self.active_connections: dict = {} async def connect(self, farmer_id: str, websocket: WebSocket): await websocket.accept() if farmer_id not in self.active_connections: self.active_connections[farmer_id] = [] self.active_connections[farmer_id].append(websocket) logger.info(f"✅ WebSocket connected for farmer {farmer_id}") async def disconnect(self, farmer_id: str, websocket: WebSocket): if farmer_id in self.active_connections: self.active_connections[farmer_id].remove(websocket) if not self.active_connections[farmer_id]: del self.active_connections[farmer_id] logger.info(f"❌ WebSocket disconnected for farmer {farmer_id}") async def broadcast(self, farmer_id: str, message: dict): if farmer_id in self.active_connections: for connection in self.active_connections[farmer_id]: try: await connection.send_json(message) except: pass manager = ConnectionManager() @app.websocket("/ws/alerts/{farmer_id}") async def websocket_alerts(websocket: WebSocket, farmer_id: str): """WebSocket endpoint for real-time farm alerts""" await manager.connect(farmer_id, websocket) try: while True: # Check for alerts periodically alerts = db.get_alerts_for_farmer(farmer_id, limit=10) critical = [a for a in alerts if a.get("severity") == "CRITICAL"] if critical: await websocket.send_json({ "type": "alert", "farmer_id": farmer_id, "critical_alerts": critical, "timestamp": datetime.now().isoformat() }) # Wait before next check await asyncio.sleep(30) except WebSocketDisconnect: await manager.disconnect(farmer_id, websocket) # ════════════════════════════════════════════════════════════════════════════ # STATISTICS & ANALYTICS # ════════════════════════════════════════════════════════════════════════════ @app.get("/api/stats/alerts/{farmer_id}", tags=["Analytics"]) async def get_alert_stats(farmer_id: str, days: int = Query(7, ge=1, le=365)): """Get alert statistics for a farmer""" try: alerts = db.get_alerts_for_farmer(farmer_id, limit=1000) # Filter by date cutoff = datetime.now() - timedelta(days=days) recent_alerts = [a for a in alerts if datetime.fromisoformat(a.get("timestamp", "")) > cutoff] stats = { "period_days": days, "total_alerts": len(recent_alerts), "critical": len([a for a in recent_alerts if a.get("severity") == "CRITICAL"]), "high": len([a for a in recent_alerts if a.get("severity") == "HIGH"]), "medium": len([a for a in recent_alerts if a.get("severity") == "MEDIUM"]), "low": len([a for a in recent_alerts if a.get("severity") == "LOW"]), "by_category": {} } # Count by category for alert in recent_alerts: category = alert.get("type", "other") stats["by_category"][category] = stats["by_category"].get(category, 0) + 1 return { "success": True, "farmer_id": farmer_id, "statistics": stats, "timestamp": datetime.now().isoformat() } except Exception as e: raise HTTPException(500, f"Stats error: {str(e)}") # ════════════════════════════════════════════════════════════════════════════ # DOCUMENTATION # ════════════════════════════════════════════════════════════════════════════ @app.get("/", tags=["Documentation"]) async def root(): """Welcome endpoint with API documentation""" return { "platform": "🌾 Climate-Resilient Agriculture Platform", "version": "1.0.0", "description": "Real-time farm intelligence with AI-powered alerts", "documentation": "/docs", "openapi": "/openapi.json", "key_features": [ "Real-time weather monitoring", "AI-powered pest prediction", "Smart irrigation recommendations", "Climate resilience scoring", "Telegram bot integration", "WebSocket for live alerts", "Sustainability insights" ], "quick_start": { "1_register_farmer": "POST /api/farmers/register", "2_get_dashboard": "GET /api/dashboard/{farmer_id}", "3_view_alerts": "GET /api/alerts/{farmer_id}", "4_subscribe_telegram": "POST /api/subscribe/{farmer_id}/{chat_id}" }, "api_endpoints": 18, "status": "🟢 OPERATIONAL" } # ════════════════════════════════════════════════════════════════════════════ # MAIN ENTRY # ════════════════════════════════════════════════════════════════════════════ if __name__ == "__main__": import uvicorn logger.info("🌾 Climate-Resilient Agriculture Platform Starting...") logger.info("📚 API Documentation: http://localhost:8003/docs") uvicorn.run( app, host="0.0.0.0", port=8003, log_level="info" )