| """
|
| Professional REST API Server for Climate-Resilient Agriculture Platform
|
| =========================================================================
|
| Handles farmer registration, retrieving alerts, and comprehensive farm data
|
| """
|
|
|
| import json
|
| import os
|
| from datetime import datetime, timedelta
|
| from typing import List, Optional
|
| from fastapi import FastAPI, HTTPException, Query, WebSocket, WebSocketDisconnect
|
| from fastapi.middleware.cors import CORSMiddleware
|
| from fastapi.responses import JSONResponse
|
| from pydantic import BaseModel
|
| import logging
|
| import asyncio
|
| from farm_controller import run_farm_dashboard
|
| from database import db
|
| from models import FarmerProfile, FarmAlert, AlertSeverity
|
|
|
|
|
| logger = logging.getLogger("FarmAPI")
|
|
|
| app = FastAPI(
|
| title="πΎ Climate-Resilient Agriculture Platform",
|
| description="Professional farm intelligence platform with real-time alerts",
|
| version="1.0.0"
|
| )
|
|
|
|
|
| app.add_middleware(
|
| CORSMiddleware,
|
| allow_origins=["*"],
|
| allow_credentials=True,
|
| allow_methods=["*"],
|
| allow_headers=["*"],
|
| )
|
|
|
|
|
|
|
|
|
|
|
| class FarmerRegistration(BaseModel):
|
| """Farmer registration schema"""
|
| farmer_id: str
|
| name: str
|
| phone: str
|
| village: str
|
| latitude: float
|
| longitude: float
|
| crop_type: str
|
| soil_type: str
|
| motor_capacity: float = 10.0
|
| farm_size_hectares: float = 1.0
|
| telegram_chat_id: Optional[str] = None
|
|
|
| class AlertQueryParams(BaseModel):
|
| """Alert query parameters"""
|
| farmer_id: str
|
| limit: int = 50
|
| severity: Optional[str] = None
|
| category: Optional[str] = None
|
|
|
|
|
|
|
|
|
|
|
| @app.get("/health", tags=["System"])
|
| async def health_check():
|
| """System health check endpoint with live connectivity testing"""
|
| import httpx
|
|
|
|
|
| SERVICES = {
|
| "weather_api": os.getenv("WEATHER_API_URL", "http://127.0.0.1:8001/weather"),
|
| "pest_api": os.getenv("PEST_API_URL", "http://127.0.0.1:8000/api/predict"),
|
| "water_api": os.getenv("WATER_API_URL", "http://127.0.0.1:8002/predict")
|
| }
|
|
|
| health_results = {}
|
|
|
| async with httpx.AsyncClient() as client:
|
|
|
| health_results["api"] = "π’ ONLINE"
|
|
|
|
|
| try:
|
| db.get_all_farmers()
|
| health_results["database"] = "π’ ONLINE"
|
| except:
|
| health_results["database"] = "π΄ ERROR"
|
|
|
|
|
| for name, url in SERVICES.items():
|
| try:
|
|
|
| import socket
|
| from urllib.parse import urlparse
|
| parsed_url = urlparse(url)
|
| host, port = parsed_url.hostname or "127.0.0.1", parsed_url.port or 80
|
|
|
| with socket.create_connection((host, port), timeout=1.0):
|
| health_results[name] = "π’ ONLINE"
|
| except:
|
| health_results[name] = "π΄ OFFLINE"
|
|
|
|
|
| total_services = len(health_results)
|
| online_services = sum(1 for status in health_results.values() if "ONLINE" in str(status))
|
|
|
| status = "π’ OPERATIONAL" if online_services == total_services else "π‘ PARTIAL" if online_services > 1 else "π΄ CRITICAL"
|
|
|
| return {
|
| "status": status,
|
| "timestamp": datetime.now().isoformat(),
|
| "services": health_results,
|
| "online_count": f"{online_services}/{total_services}",
|
| "message": "All systems operational" if status == "π’ OPERATIONAL" else "Some background services are offline"
|
| }
|
|
|
| @app.get("/status", tags=["System"])
|
| async def platform_status():
|
| """Get platform status"""
|
| farmers = db.get_all_farmers()
|
| subscriptions = db.get_active_subscriptions()
|
|
|
| return {
|
| "platform": "Climate-Resilient Agriculture Platform",
|
| "version": "1.0.0",
|
| "status": "ACTIVE",
|
| "registered_farmers": len(farmers),
|
| "active_subscriptions": len(subscriptions),
|
| "api_endpoints": 8,
|
| "database_status": "Healthy",
|
| "last_update": datetime.now().isoformat()
|
| }
|
|
|
|
|
|
|
|
|
|
|
| @app.post("/api/farmers/register", tags=["Farmer Management"])
|
| async def register_farmer(farmer: FarmerRegistration):
|
| """Register a new farmer or update existing profile"""
|
| try:
|
| farmer_dict = farmer.dict()
|
| farmer_dict["registered_at"] = datetime.now().isoformat()
|
| farmer_dict["threshold_settings"] = {
|
| "temp_high": 40,
|
| "temp_low": 5,
|
| "wind_speed": 30,
|
| "wind_alert": 15,
|
| "rain_threshold": 10
|
| }
|
| farmer_dict["alert_preferences"] = {
|
| "weather": True,
|
| "pest": True,
|
| "irrigation": True,
|
| "sustainability": True,
|
| "climate_warning": True
|
| }
|
|
|
| if db.save_farmer(farmer_dict):
|
| logger.info(f"β
Farmer registered: {farmer.name}")
|
| return {
|
| "success": True,
|
| "message": f"Farmer {farmer.name} registered successfully",
|
| "farmer_id": farmer.farmer_id,
|
| "timestamp": datetime.now().isoformat()
|
| }
|
| else:
|
| raise HTTPException(500, "Failed to save farmer profile")
|
|
|
| except Exception as e:
|
| logger.error(f"Registration failed: {e}")
|
| raise HTTPException(500, f"Registration error: {str(e)}")
|
|
|
| @app.get("/api/farmers/{farmer_id}", tags=["Farmer Management"])
|
| async def get_farmer(farmer_id: str):
|
| """Get farmer profile"""
|
| farmer = db.get_farmer(farmer_id)
|
| if not farmer:
|
| raise HTTPException(404, "Farmer not found")
|
|
|
| return {
|
| "success": True,
|
| "farmer": farmer,
|
| "timestamp": datetime.now().isoformat()
|
| }
|
|
|
| @app.get("/api/farmers", tags=["Farmer Management"])
|
| async def list_all_farmers():
|
| """Get all registered farmers (admin)"""
|
| farmers = db.get_all_farmers()
|
| return {
|
| "success": True,
|
| "total_farmers": len(farmers),
|
| "farmers": farmers,
|
| "timestamp": datetime.now().isoformat()
|
| }
|
|
|
|
|
|
|
|
|
|
|
| @app.get("/api/dashboard/{farmer_id}", tags=["Farm Intelligence"])
|
| async def get_farm_dashboard(farmer_id: str):
|
| """Get complete farm dashboard with all intelligence"""
|
| try:
|
|
|
| dashboard_file = "dashboard_output.json"
|
| if os.path.exists(dashboard_file):
|
| with open(dashboard_file, "r") as f:
|
| dashboard = json.load(f)
|
| else:
|
| dashboard = await run_farm_dashboard()
|
|
|
| if not dashboard:
|
| raise HTTPException(500, "Failed to generate dashboard")
|
|
|
| return {
|
| "success": True,
|
| "dashboard": dashboard,
|
| "timestamp": datetime.now().isoformat()
|
| }
|
|
|
| except Exception as e:
|
| logger.error(f"Dashboard error: {e}")
|
| raise HTTPException(500, f"Dashboard error: {str(e)}")
|
|
|
| @app.get("/api/alerts/{farmer_id}", tags=["Alerts"])
|
| async def get_alerts(
|
| farmer_id: str,
|
| limit: int = Query(50, ge=1, le=500),
|
| severity: Optional[str] = None,
|
| category: Optional[str] = None
|
| ):
|
| """Get alerts for a farmer with optional filtering"""
|
| try:
|
| alerts = db.get_alerts_for_farmer(farmer_id, limit=limit)
|
|
|
|
|
| if severity:
|
| alerts = [a for a in alerts if a.get("severity") == severity]
|
|
|
|
|
| if category:
|
| alerts = [a for a in alerts if a.get("type") == category]
|
|
|
|
|
| critical_count = len([a for a in alerts if a.get("severity") == "CRITICAL"])
|
| high_count = len([a for a in alerts if a.get("severity") == "HIGH"])
|
|
|
| return {
|
| "success": True,
|
| "farmer_id": farmer_id,
|
| "total_alerts": len(alerts),
|
| "critical_count": critical_count,
|
| "high_count": high_count,
|
| "alerts": alerts[:limit],
|
| "timestamp": datetime.now().isoformat()
|
| }
|
|
|
| except Exception as e:
|
| logger.error(f"Alert query failed: {e}")
|
| raise HTTPException(500, f"Alert query failed: {str(e)}")
|
|
|
| @app.get("/api/critical-alerts/{farmer_id}", tags=["Alerts"])
|
| async def get_critical_alerts(farmer_id: str):
|
| """Get only critical and high priority alerts"""
|
| try:
|
| alerts = db.get_alerts_for_farmer(farmer_id, limit=100)
|
| critical_alerts = [a for a in alerts if a.get("severity") in ["CRITICAL", "HIGH"]]
|
|
|
| return {
|
| "success": True,
|
| "farmer_id": farmer_id,
|
| "critical_alerts_count": len(critical_alerts),
|
| "alerts": critical_alerts,
|
| "action_required": len(critical_alerts) > 0,
|
| "timestamp": datetime.now().isoformat()
|
| }
|
|
|
| except Exception as e:
|
| raise HTTPException(500, f"Error: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
| @app.get("/api/weather/{farmer_id}", tags=["Intelligence"])
|
| async def get_weather_intel(farmer_id: str):
|
| """Get weather intelligence for farmer's location"""
|
| try:
|
| with open("dashboard_output.json", "r") as f:
|
| dashboard = json.load(f)
|
|
|
| weather = dashboard.get("weather_intelligence", {})
|
| return {
|
| "success": True,
|
| "farmer_id": farmer_id,
|
| "weather": weather,
|
| "timestamp": datetime.now().isoformat()
|
| }
|
| except:
|
| raise HTTPException(500, "Weather data unavailable")
|
|
|
| @app.get("/api/pest/{farmer_id}", tags=["Intelligence"])
|
| async def get_pest_intel(farmer_id: str):
|
| """Get pest prediction and risk assessment"""
|
| try:
|
| with open("dashboard_output.json", "r") as f:
|
| dashboard = json.load(f)
|
|
|
| pest = dashboard.get("pest_intelligence", {})
|
| pest_table = pest.get("pest_prediction_table", [])
|
|
|
| critical_pests = [p for p in pest_table if p.get("severity", "").upper() in ["CRITICAL", "HIGH"]]
|
|
|
| return {
|
| "success": True,
|
| "farmer_id": farmer_id,
|
| "total_pests_detected": len(pest_table),
|
| "critical_pests": len(critical_pests),
|
| "pest_intelligence": pest,
|
| "timestamp": datetime.now().isoformat()
|
| }
|
| except:
|
| raise HTTPException(500, "Pest data unavailable")
|
|
|
| @app.get("/api/water/{farmer_id}", tags=["Intelligence"])
|
| async def get_water_intel(farmer_id: str):
|
| """Get irrigation requirements and water management insights"""
|
| try:
|
| with open("dashboard_output.json", "r") as f:
|
| dashboard = json.load(f)
|
|
|
| water = dashboard.get("water_intelligence", {})
|
|
|
| return {
|
| "success": True,
|
| "farmer_id": farmer_id,
|
| "water_requirement": water,
|
| "irrigation_urgency": "CRITICAL" if water.get("irrigation_minutes", 0) > 40 else "HIGH" if water.get("irrigation_minutes", 0) > 20 else "NORMAL",
|
| "timestamp": datetime.now().isoformat()
|
| }
|
| except:
|
| raise HTTPException(500, "Water data unavailable")
|
|
|
|
|
|
|
|
|
|
|
| @app.post("/api/subscribe/{farmer_id}/{telegram_chat_id}", tags=["Subscriptions"])
|
| async def subscribe_telegram(farmer_id: str, telegram_chat_id: str):
|
| """Subscribe farmer to Telegram alerts"""
|
| try:
|
| if db.save_subscription(farmer_id, telegram_chat_id):
|
| logger.info(f"β
Farmer {farmer_id} subscribed to Telegram alerts")
|
| return {
|
| "success": True,
|
| "message": "Subscribed to Telegram alerts",
|
| "farmer_id": farmer_id,
|
| "telegram_chat_id": telegram_chat_id,
|
| "timestamp": datetime.now().isoformat()
|
| }
|
| else:
|
| raise HTTPException(500, "Subscription failed")
|
|
|
| except Exception as e:
|
| raise HTTPException(500, f"Error: {str(e)}")
|
|
|
| @app.get("/api/subscriptions", tags=["Subscriptions"])
|
| async def get_active_subscriptions():
|
| """Get all active subscriptions (admin)"""
|
| subscriptions = db.get_active_subscriptions()
|
| return {
|
| "success": True,
|
| "active_subscriptions": len(subscriptions),
|
| "subscriptions": subscriptions,
|
| "timestamp": datetime.now().isoformat()
|
| }
|
|
|
|
|
|
|
|
|
|
|
| class ConnectionManager:
|
| """WebSocket connection manager for real-time alerts"""
|
| def __init__(self):
|
| self.active_connections: dict = {}
|
|
|
| async def connect(self, farmer_id: str, websocket: WebSocket):
|
| await websocket.accept()
|
| if farmer_id not in self.active_connections:
|
| self.active_connections[farmer_id] = []
|
| self.active_connections[farmer_id].append(websocket)
|
| logger.info(f"β
WebSocket connected for farmer {farmer_id}")
|
|
|
| async def disconnect(self, farmer_id: str, websocket: WebSocket):
|
| if farmer_id in self.active_connections:
|
| self.active_connections[farmer_id].remove(websocket)
|
| if not self.active_connections[farmer_id]:
|
| del self.active_connections[farmer_id]
|
| logger.info(f"β WebSocket disconnected for farmer {farmer_id}")
|
|
|
| async def broadcast(self, farmer_id: str, message: dict):
|
| if farmer_id in self.active_connections:
|
| for connection in self.active_connections[farmer_id]:
|
| try:
|
| await connection.send_json(message)
|
| except:
|
| pass
|
|
|
| manager = ConnectionManager()
|
|
|
| @app.websocket("/ws/alerts/{farmer_id}")
|
| async def websocket_alerts(websocket: WebSocket, farmer_id: str):
|
| """WebSocket endpoint for real-time farm alerts"""
|
| await manager.connect(farmer_id, websocket)
|
|
|
| try:
|
| while True:
|
|
|
| alerts = db.get_alerts_for_farmer(farmer_id, limit=10)
|
| critical = [a for a in alerts if a.get("severity") == "CRITICAL"]
|
|
|
| if critical:
|
| await websocket.send_json({
|
| "type": "alert",
|
| "farmer_id": farmer_id,
|
| "critical_alerts": critical,
|
| "timestamp": datetime.now().isoformat()
|
| })
|
|
|
|
|
| await asyncio.sleep(30)
|
|
|
| except WebSocketDisconnect:
|
| await manager.disconnect(farmer_id, websocket)
|
|
|
|
|
|
|
|
|
|
|
| @app.get("/api/stats/alerts/{farmer_id}", tags=["Analytics"])
|
| async def get_alert_stats(farmer_id: str, days: int = Query(7, ge=1, le=365)):
|
| """Get alert statistics for a farmer"""
|
| try:
|
| alerts = db.get_alerts_for_farmer(farmer_id, limit=1000)
|
|
|
|
|
| cutoff = datetime.now() - timedelta(days=days)
|
| recent_alerts = [a for a in alerts if datetime.fromisoformat(a.get("timestamp", "")) > cutoff]
|
|
|
| stats = {
|
| "period_days": days,
|
| "total_alerts": len(recent_alerts),
|
| "critical": len([a for a in recent_alerts if a.get("severity") == "CRITICAL"]),
|
| "high": len([a for a in recent_alerts if a.get("severity") == "HIGH"]),
|
| "medium": len([a for a in recent_alerts if a.get("severity") == "MEDIUM"]),
|
| "low": len([a for a in recent_alerts if a.get("severity") == "LOW"]),
|
| "by_category": {}
|
| }
|
|
|
|
|
| for alert in recent_alerts:
|
| category = alert.get("type", "other")
|
| stats["by_category"][category] = stats["by_category"].get(category, 0) + 1
|
|
|
| return {
|
| "success": True,
|
| "farmer_id": farmer_id,
|
| "statistics": stats,
|
| "timestamp": datetime.now().isoformat()
|
| }
|
|
|
| except Exception as e:
|
| raise HTTPException(500, f"Stats error: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
| @app.get("/", tags=["Documentation"])
|
| async def root():
|
| """Welcome endpoint with API documentation"""
|
| return {
|
| "platform": "πΎ Climate-Resilient Agriculture Platform",
|
| "version": "1.0.0",
|
| "description": "Real-time farm intelligence with AI-powered alerts",
|
| "documentation": "/docs",
|
| "openapi": "/openapi.json",
|
| "key_features": [
|
| "Real-time weather monitoring",
|
| "AI-powered pest prediction",
|
| "Smart irrigation recommendations",
|
| "Climate resilience scoring",
|
| "Telegram bot integration",
|
| "WebSocket for live alerts",
|
| "Sustainability insights"
|
| ],
|
| "quick_start": {
|
| "1_register_farmer": "POST /api/farmers/register",
|
| "2_get_dashboard": "GET /api/dashboard/{farmer_id}",
|
| "3_view_alerts": "GET /api/alerts/{farmer_id}",
|
| "4_subscribe_telegram": "POST /api/subscribe/{farmer_id}/{chat_id}"
|
| },
|
| "api_endpoints": 18,
|
| "status": "π’ OPERATIONAL"
|
| }
|
|
|
|
|
|
|
|
|
|
|
| if __name__ == "__main__":
|
| import uvicorn
|
|
|
| logger.info("πΎ Climate-Resilient Agriculture Platform Starting...")
|
| logger.info("π API Documentation: http://localhost:8003/docs")
|
|
|
| uvicorn.run(
|
| app,
|
| host="0.0.0.0",
|
| port=8003,
|
| log_level="info"
|
| )
|
|
|