tele_bot / api_server.py
PRC142004's picture
Upload 20 files
6a6337e verified
"""
Professional REST API Server for Climate-Resilient Agriculture Platform
=========================================================================
Handles farmer registration, retrieving alerts, and comprehensive farm data
"""
import json
import os
from datetime import datetime, timedelta
from typing import List, Optional
from fastapi import FastAPI, HTTPException, Query, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel
import logging
import asyncio
from farm_controller import run_farm_dashboard
from database import db
from models import FarmerProfile, FarmAlert, AlertSeverity
# SETUP
# Module-wide logger used by the handlers in this file.
logger = logging.getLogger("FarmAPI")

# The FastAPI application object that the route decorators attach to.
app = FastAPI(
    title="🌾 Climate-Resilient Agriculture Platform",
    description="Professional farm intelligence platform with real-time alerts",
    version="1.0.0",
)

# CORS configuration: any origin, method and header is accepted.
# NOTE(review): wildcard origins together with allow_credentials=True is a
# very permissive combination - confirm it is intended before this service is
# exposed beyond a trusted network.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ════════════════════════════════════════════════════════════════════════════
# DATA MODELS
# ════════════════════════════════════════════════════════════════════════════
class FarmerRegistration(BaseModel):
    """Request body accepted when a farmer profile is registered."""

    # Identity and contact details
    farmer_id: str
    name: str
    phone: str
    village: str

    # Geographic position of the farm
    latitude: float
    longitude: float

    # Description of the farm itself
    crop_type: str
    soil_type: str
    motor_capacity: float = 10.0  # NOTE(review): unit is not stated in this file
    farm_size_hectares: float = 1.0

    # Optional Telegram chat linked to the farmer
    telegram_chat_id: Optional[str] = None
class AlertQueryParams(BaseModel):
    """Parameters describing an alert query."""

    # NOTE(review): no endpoint in the visible part of this file references
    # this model - confirm whether it is still needed.
    farmer_id: str
    limit: int = 50
    severity: Optional[str] = None
    category: Optional[str] = None
# ════════════════════════════════════════════════════════════════════════════
# HEALTH & STATUS
# ════════════════════════════════════════════════════════════════════════════
@app.get("/health", tags=["System"])
async def health_check():
    """System health check endpoint with live connectivity testing.

    The database is probed by calling ``db.get_all_farmers()``. Each backing
    service is probed by opening a TCP connection to the host and port of its
    configured URL; the probes run concurrently on the event loop so a slow
    or dead service does not stall other requests.

    Returns:
        dict: overall ``status``, a ``timestamp``, the per-component
        ``services`` map, an ``online_count`` string and a ``message``.
    """
    from urllib.parse import urlparse

    # Internal endpoints (each can be overridden through the environment)
    services = {
        "weather_api": os.getenv("WEATHER_API_URL", "http://127.0.0.1:8001/weather"),
        "pest_api": os.getenv("PEST_API_URL", "http://127.0.0.1:8000/api/predict"),
        "water_api": os.getenv("WATER_API_URL", "http://127.0.0.1:8002/predict"),
    }

    async def _port_is_open(url: str) -> bool:
        """Return True when a TCP connection to the URL's host:port succeeds."""
        try:
            parsed = urlparse(url)
            host = parsed.hostname or "127.0.0.1"
            # Without an explicit port, fall back to the scheme's default.
            port = parsed.port or (443 if parsed.scheme == "https" else 80)
            _, writer = await asyncio.wait_for(
                asyncio.open_connection(host, port), timeout=1.0
            )
        except (OSError, asyncio.TimeoutError, ValueError):
            # Refused / unreachable / timed out / malformed port in the URL.
            return False
        writer.close()
        return True

    health_results = {}

    # This handler is running, so the API itself is reachable.
    health_results["api"] = "🟒 ONLINE"

    # Check Database
    try:
        db.get_all_farmers()
        health_results["database"] = "🟒 ONLINE"
    except Exception:
        logger.exception("Database health probe failed")
        health_results["database"] = "πŸ”΄ ERROR"

    # Check sub-services: only verifies that the port is open/listening.
    probe_outcomes = await asyncio.gather(
        *(_port_is_open(url) for url in services.values())
    )
    for name, is_up in zip(services, probe_outcomes):
        health_results[name] = "🟒 ONLINE" if is_up else "πŸ”΄ OFFLINE"

    # Overall Status
    total_services = len(health_results)
    online_services = sum(
        1 for state in health_results.values() if "ONLINE" in str(state)
    )
    if online_services == total_services:
        status = "🟒 OPERATIONAL"
    elif online_services > 1:
        # More than just the always-online "api" entry is up.
        status = "🟑 PARTIAL"
    else:
        status = "πŸ”΄ CRITICAL"

    return {
        "status": status,
        "timestamp": datetime.now().isoformat(),
        "services": health_results,
        "online_count": f"{online_services}/{total_services}",
        "message": (
            "All systems operational"
            if status == "🟒 OPERATIONAL"
            else "Some background services are offline"
        ),
    }
@app.get("/status", tags=["System"])
async def platform_status():
    """Summarise the platform, including farmer and subscription counts."""
    farmer_total = len(db.get_all_farmers())
    subscription_total = len(db.get_active_subscriptions())

    # NOTE(review): "api_endpoints" and "database_status" are fixed literals,
    # not measured values.
    summary = {
        "platform": "Climate-Resilient Agriculture Platform",
        "version": "1.0.0",
        "status": "ACTIVE",
        "registered_farmers": farmer_total,
        "active_subscriptions": subscription_total,
        "api_endpoints": 8,
        "database_status": "Healthy",
        "last_update": datetime.now().isoformat(),
    }
    return summary
# ════════════════════════════════════════════════════════════════════════════
# FARMER MANAGEMENT
# ════════════════════════════════════════════════════════════════════════════
@app.post("/api/farmers/register", tags=["Farmer Management"])
async def register_farmer(farmer: FarmerRegistration):
    """Register a new farmer or update existing profile.

    The submitted profile is extended with a registration timestamp, default
    alert thresholds and default alert preferences, then passed to
    ``db.save_farmer``.

    Args:
        farmer: Validated registration payload.

    Returns:
        dict: ``success``, a confirmation ``message``, the ``farmer_id`` and
        a ``timestamp``.

    Raises:
        HTTPException: 500 with "Failed to save farmer profile" when the
            database reports failure, or 500 carrying the error text when an
            unexpected exception occurs.
    """
    try:
        farmer_dict = farmer.dict()
        farmer_dict["registered_at"] = datetime.now().isoformat()
        farmer_dict["threshold_settings"] = {
            "temp_high": 40,
            "temp_low": 5,
            "wind_speed": 30,
            "wind_alert": 15,
            "rain_threshold": 10,
        }
        farmer_dict["alert_preferences"] = {
            "weather": True,
            "pest": True,
            "irrigation": True,
            "sustainability": True,
            "climate_warning": True,
        }
        saved = db.save_farmer(farmer_dict)
    except Exception as e:
        logger.error(f"Registration failed: {e}")
        raise HTTPException(500, f"Registration error: {str(e)}") from e

    # Raised outside the try block so the generic handler above cannot catch
    # it and re-wrap the client-facing detail message.
    if not saved:
        logger.error("Registration failed: database did not save the profile")
        raise HTTPException(500, "Failed to save farmer profile")

    logger.info(f"βœ… Farmer registered: {farmer.name}")
    return {
        "success": True,
        "message": f"Farmer {farmer.name} registered successfully",
        "farmer_id": farmer.farmer_id,
        "timestamp": datetime.now().isoformat(),
    }
@app.get("/api/farmers/{farmer_id}", tags=["Farmer Management"])
async def get_farmer(farmer_id: str):
    """Return the stored profile for one farmer, or 404 when there is none."""
    profile = db.get_farmer(farmer_id)
    if profile:
        return {
            "success": True,
            "farmer": profile,
            "timestamp": datetime.now().isoformat(),
        }
    # A falsy lookup result is reported as "not found".
    raise HTTPException(404, "Farmer not found")
@app.get("/api/farmers", tags=["Farmer Management"])
async def list_all_farmers():
    """Return every registered farmer together with the total count (admin)."""
    all_farmers = db.get_all_farmers()
    response = {
        "success": True,
        "total_farmers": len(all_farmers),
        "farmers": all_farmers,
        "timestamp": datetime.now().isoformat(),
    }
    return response
# ════════════════════════════════════════════════════════════════════════════
# FARM INTELLIGENCE & ALERTS
# ════════════════════════════════════════════════════════════════════════════
@app.get("/api/dashboard/{farmer_id}", tags=["Farm Intelligence"])
async def get_farm_dashboard(farmer_id: str):
    """Get complete farm dashboard with all intelligence.

    The dashboard is read from ``dashboard_output.json`` when that file
    exists; otherwise it is generated by ``run_farm_dashboard()``.

    Returns:
        dict: ``success``, the ``dashboard`` payload and a ``timestamp``.

    Raises:
        HTTPException: 500 when loading or generating fails, or when the
            resulting dashboard is empty.
    """
    # NOTE(review): farmer_id is not used to select the dashboard - the same
    # file/generator output is returned for every farmer.
    dashboard_file = "dashboard_output.json"
    try:
        # Load or generate dashboard
        if os.path.exists(dashboard_file):
            with open(dashboard_file, "r") as f:
                dashboard = json.load(f)
        else:
            dashboard = await run_farm_dashboard()
    except Exception as e:
        logger.error(f"Dashboard error: {e}")
        raise HTTPException(500, f"Dashboard error: {str(e)}") from e

    # Raised outside the try block so the generic handler above cannot catch
    # it and re-wrap the detail message.
    if not dashboard:
        logger.error("Dashboard error: no dashboard data was produced")
        raise HTTPException(500, "Failed to generate dashboard")

    return {
        "success": True,
        "dashboard": dashboard,
        "timestamp": datetime.now().isoformat(),
    }
@app.get("/api/alerts/{farmer_id}", tags=["Alerts"])
async def get_alerts(
    farmer_id: str,
    limit: int = Query(50, ge=1, le=500),
    severity: Optional[str] = None,
    category: Optional[str] = None
):
    """Return a farmer's alerts, optionally narrowed by severity and category.

    Any failure while querying or filtering is reported as a 500 error.
    """
    try:
        matching = db.get_alerts_for_farmer(farmer_id, limit=limit)

        # NOTE(review): the filters run on the rows the database already
        # limited, so fewer than `limit` matches may come back.
        if severity:
            matching = [
                alert for alert in matching if alert.get("severity") == severity
            ]
        if category:
            matching = [alert for alert in matching if alert.get("type") == category]

        # Tally the two most urgent severities among the remaining alerts.
        severity_labels = [alert.get("severity") for alert in matching]

        return {
            "success": True,
            "farmer_id": farmer_id,
            "total_alerts": len(matching),
            "critical_count": severity_labels.count("CRITICAL"),
            "high_count": severity_labels.count("HIGH"),
            "alerts": matching[:limit],
            "timestamp": datetime.now().isoformat(),
        }
    except Exception as e:
        logger.error(f"Alert query failed: {e}")
        raise HTTPException(500, f"Alert query failed: {str(e)}")
@app.get("/api/critical-alerts/{farmer_id}", tags=["Alerts"])
async def get_critical_alerts(farmer_id: str):
    """Return the farmer's alerts whose severity is CRITICAL or HIGH.

    At most 100 alerts are requested from the database before filtering.
    """
    urgent_levels = ("CRITICAL", "HIGH")
    try:
        fetched = db.get_alerts_for_farmer(farmer_id, limit=100)
        urgent = [
            alert for alert in fetched if alert.get("severity") in urgent_levels
        ]
        urgent_total = len(urgent)
        return {
            "success": True,
            "farmer_id": farmer_id,
            "critical_alerts_count": urgent_total,
            "alerts": urgent,
            "action_required": urgent_total > 0,
            "timestamp": datetime.now().isoformat(),
        }
    except Exception as e:
        raise HTTPException(500, f"Error: {str(e)}")
# ════════════════════════════════════════════════════════════════════════════
# INTELLIGENCE ENDPOINTS
# ════════════════════════════════════════════════════════════════════════════
@app.get("/api/weather/{farmer_id}", tags=["Intelligence"])
async def get_weather_intel(farmer_id: str):
    """Get weather intelligence for farmer's location.

    Reads the ``weather_intelligence`` section of ``dashboard_output.json``.

    Returns:
        dict: ``success``, ``farmer_id``, the ``weather`` section (empty when
        the section is absent) and a ``timestamp``.

    Raises:
        HTTPException: 500 when the dashboard file cannot be read or parsed.
    """
    # NOTE(review): farmer_id is only echoed back; the data comes from a
    # single shared dashboard file.
    try:
        with open("dashboard_output.json", "r") as f:
            dashboard = json.load(f)
        weather = dashboard.get("weather_intelligence", {})
    except Exception as e:
        # Log the real cause; the client only receives a generic message.
        logger.error(f"Weather data unavailable: {e}")
        raise HTTPException(500, "Weather data unavailable") from e

    return {
        "success": True,
        "farmer_id": farmer_id,
        "weather": weather,
        "timestamp": datetime.now().isoformat(),
    }
@app.get("/api/pest/{farmer_id}", tags=["Intelligence"])
async def get_pest_intel(farmer_id: str):
    """Get pest prediction and risk assessment.

    Reads the ``pest_intelligence`` section of ``dashboard_output.json`` and
    counts the rows of its ``pest_prediction_table`` whose severity is
    CRITICAL or HIGH (case-insensitive).

    Returns:
        dict: ``success``, ``farmer_id``, ``total_pests_detected``,
        ``critical_pests``, the full ``pest_intelligence`` section and a
        ``timestamp``.

    Raises:
        HTTPException: 500 when the dashboard file cannot be read or parsed,
            or its content does not have the expected structure.
    """
    # NOTE(review): farmer_id is only echoed back; the data comes from a
    # single shared dashboard file.
    try:
        with open("dashboard_output.json", "r") as f:
            dashboard = json.load(f)
        pest = dashboard.get("pest_intelligence", {})
        pest_table = pest.get("pest_prediction_table", [])
        # `or ""` tolerates rows whose severity is present but null.
        critical_pests = [
            p
            for p in pest_table
            if str(p.get("severity") or "").upper() in ["CRITICAL", "HIGH"]
        ]
    except Exception as e:
        # Log the real cause; the client only receives a generic message.
        logger.error(f"Pest data unavailable: {e}")
        raise HTTPException(500, "Pest data unavailable") from e

    return {
        "success": True,
        "farmer_id": farmer_id,
        "total_pests_detected": len(pest_table),
        "critical_pests": len(critical_pests),
        "pest_intelligence": pest,
        "timestamp": datetime.now().isoformat(),
    }
@app.get("/api/water/{farmer_id}", tags=["Intelligence"])
async def get_water_intel(farmer_id: str):
    """Get irrigation requirements and water management insights.

    Reads the ``water_intelligence`` section of ``dashboard_output.json`` and
    derives an urgency label from its ``irrigation_minutes`` value: above 40
    is CRITICAL, above 20 is HIGH, anything else is NORMAL.

    Returns:
        dict: ``success``, ``farmer_id``, ``water_requirement``,
        ``irrigation_urgency`` and a ``timestamp``.

    Raises:
        HTTPException: 500 when the dashboard file cannot be read or parsed,
            or its content does not have the expected structure.
    """
    # NOTE(review): farmer_id is only echoed back; the data comes from a
    # single shared dashboard file.
    try:
        with open("dashboard_output.json", "r") as f:
            dashboard = json.load(f)
        water = dashboard.get("water_intelligence", {})
        # `or 0` treats a missing or null value as "no irrigation needed".
        minutes = water.get("irrigation_minutes") or 0
        if minutes > 40:
            urgency = "CRITICAL"
        elif minutes > 20:
            urgency = "HIGH"
        else:
            urgency = "NORMAL"
    except Exception as e:
        # Log the real cause; the client only receives a generic message.
        logger.error(f"Water data unavailable: {e}")
        raise HTTPException(500, "Water data unavailable") from e

    return {
        "success": True,
        "farmer_id": farmer_id,
        "water_requirement": water,
        "irrigation_urgency": urgency,
        "timestamp": datetime.now().isoformat(),
    }
# ════════════════════════════════════════════════════════════════════════════
# ALERT SUBSCRIPTIONS
# ════════════════════════════════════════════════════════════════════════════
@app.post("/api/subscribe/{farmer_id}/{telegram_chat_id}", tags=["Subscriptions"])
async def subscribe_telegram(farmer_id: str, telegram_chat_id: str):
    """Subscribe farmer to Telegram alerts.

    Stores the (farmer, chat) pair through ``db.save_subscription``.

    Returns:
        dict: ``success``, a confirmation ``message``, the ``farmer_id``,
        the ``telegram_chat_id`` and a ``timestamp``.

    Raises:
        HTTPException: 500 with "Subscription failed" when the database
            reports failure, or 500 carrying the error text when an
            unexpected exception occurs.
    """
    try:
        saved = db.save_subscription(farmer_id, telegram_chat_id)
    except Exception as e:
        logger.error(f"Subscription failed: {e}")
        raise HTTPException(500, f"Error: {str(e)}") from e

    # Raised outside the try block so the generic handler above cannot catch
    # it and re-wrap the detail message.
    if not saved:
        raise HTTPException(500, "Subscription failed")

    logger.info(f"βœ… Farmer {farmer_id} subscribed to Telegram alerts")
    return {
        "success": True,
        "message": "Subscribed to Telegram alerts",
        "farmer_id": farmer_id,
        "telegram_chat_id": telegram_chat_id,
        "timestamp": datetime.now().isoformat(),
    }
@app.get("/api/subscriptions", tags=["Subscriptions"])
async def get_active_subscriptions():
    """Return every active subscription together with the count (admin)."""
    active = db.get_active_subscriptions()
    response = {
        "success": True,
        "active_subscriptions": len(active),
        "subscriptions": active,
        "timestamp": datetime.now().isoformat(),
    }
    return response
# ════════════════════════════════════════════════════════════════════════════
# WEBSOCKET FOR REAL-TIME ALERTS
# ════════════════════════════════════════════════════════════════════════════
class ConnectionManager:
    """WebSocket connection manager for real-time alerts.

    Keeps, for each farmer id, the list of sockets currently registered for
    that farmer.
    """

    def __init__(self):
        # farmer_id -> list of sockets registered through connect()
        self.active_connections: dict = {}

    async def connect(self, farmer_id: str, websocket: WebSocket):
        """Accept ``websocket`` and register it under ``farmer_id``."""
        await websocket.accept()
        self.active_connections.setdefault(farmer_id, []).append(websocket)
        logger.info(f"βœ… WebSocket connected for farmer {farmer_id}")

    async def disconnect(self, farmer_id: str, websocket: WebSocket):
        """Unregister ``websocket`` for ``farmer_id``.

        Calling this for a socket that is not (or no longer) registered is a
        no-op rather than an error, so cleanup code may call it
        unconditionally.
        """
        sockets = self.active_connections.get(farmer_id)
        if sockets is not None:
            if websocket in sockets:
                sockets.remove(websocket)
            # Drop the farmer entry once its last socket is gone.
            if not sockets:
                del self.active_connections[farmer_id]
        logger.info(f"❌ WebSocket disconnected for farmer {farmer_id}")

    async def broadcast(self, farmer_id: str, message: dict):
        """Send ``message`` as JSON to every socket registered for ``farmer_id``.

        Delivery is best-effort: a socket that fails to send is logged and
        skipped so the remaining sockets still receive the message.
        """
        # Iterate over a snapshot: each await yields control, and connect() /
        # disconnect() may mutate the live list in the meantime.
        for connection in list(self.active_connections.get(farmer_id, ())):
            try:
                await connection.send_json(message)
            except Exception as exc:
                logger.warning(
                    "WebSocket send failed for farmer %s: %s", farmer_id, exc
                )


# Shared manager instance for the WebSocket endpoint.
manager = ConnectionManager()
@app.websocket("/ws/alerts/{farmer_id}")
async def websocket_alerts(websocket: WebSocket, farmer_id: str):
    """WebSocket endpoint for real-time farm alerts.

    After the connection is registered, the farmer's ten most recently
    returned alerts are polled every 30 seconds and any with severity
    CRITICAL are pushed to the client as one JSON message.

    The socket is always unregistered when the loop ends, whether the client
    disconnected or another error interrupted the loop.
    """
    await manager.connect(farmer_id, websocket)
    try:
        while True:
            # Check for alerts periodically
            alerts = db.get_alerts_for_farmer(farmer_id, limit=10)
            critical = [a for a in alerts if a.get("severity") == "CRITICAL"]
            # NOTE(review): the same critical alerts are re-sent on every
            # cycle for as long as the database keeps returning them.
            if critical:
                await websocket.send_json({
                    "type": "alert",
                    "farmer_id": farmer_id,
                    "critical_alerts": critical,
                    "timestamp": datetime.now().isoformat(),
                })
            # Wait before next check
            await asyncio.sleep(30)
    except WebSocketDisconnect:
        # Normal client departure; cleanup happens in `finally`.
        logger.debug("WebSocket client for farmer %s disconnected", farmer_id)
    finally:
        # Runs for every exit path so a failed send or a cancelled task does
        # not leave a dead socket registered in the manager.
        await manager.disconnect(farmer_id, websocket)
# ════════════════════════════════════════════════════════════════════════════
# STATISTICS & ANALYTICS
# ════════════════════════════════════════════════════════════════════════════
@app.get("/api/stats/alerts/{farmer_id}", tags=["Analytics"])
async def get_alert_stats(farmer_id: str, days: int = Query(7, ge=1, le=365)):
    """Get alert statistics for a farmer.

    Up to 1000 alerts are fetched and those newer than ``days`` days are
    counted by severity and by category (the alert's ``type``, or "other").
    Alerts whose ``timestamp`` is missing or cannot be parsed are left out of
    the statistics instead of failing the whole request.

    Returns:
        dict: ``success``, ``farmer_id``, the ``statistics`` block and a
        ``timestamp``.

    Raises:
        HTTPException: 500 when the query or aggregation fails.
    """
    try:
        alerts = db.get_alerts_for_farmer(farmer_id, limit=1000)

        # Filter by date
        window = timedelta(days=days)
        naive_cutoff = datetime.now() - window

        def _is_recent(alert) -> bool:
            """Return True when the alert's timestamp falls inside the window."""
            raw = alert.get("timestamp")
            if isinstance(raw, datetime):
                stamp = raw
            else:
                try:
                    stamp = datetime.fromisoformat(raw)
                except (TypeError, ValueError):
                    # Missing or malformed timestamp: cannot be placed in time.
                    return False
            if stamp.tzinfo is not None:
                # Aware and naive datetimes cannot be compared directly, so
                # build the cutoff in the timestamp's own timezone.
                return stamp > datetime.now(stamp.tzinfo) - window
            return stamp > naive_cutoff

        recent_alerts = [a for a in alerts if _is_recent(a)]
        severities = [a.get("severity") for a in recent_alerts]

        # Count by category
        by_category = {}
        for alert in recent_alerts:
            category = alert.get("type", "other")
            by_category[category] = by_category.get(category, 0) + 1

        stats = {
            "period_days": days,
            "total_alerts": len(recent_alerts),
            "critical": severities.count("CRITICAL"),
            "high": severities.count("HIGH"),
            "medium": severities.count("MEDIUM"),
            "low": severities.count("LOW"),
            "by_category": by_category,
        }
        return {
            "success": True,
            "farmer_id": farmer_id,
            "statistics": stats,
            "timestamp": datetime.now().isoformat(),
        }
    except Exception as e:
        raise HTTPException(500, f"Stats error: {str(e)}") from e
# ════════════════════════════════════════════════════════════════════════════
# DOCUMENTATION
# ════════════════════════════════════════════════════════════════════════════
@app.get("/", tags=["Documentation"])
async def root():
    """Return a static overview of the platform and where to find its docs."""
    feature_list = [
        "Real-time weather monitoring",
        "AI-powered pest prediction",
        "Smart irrigation recommendations",
        "Climate resilience scoring",
        "Telegram bot integration",
        "WebSocket for live alerts",
        "Sustainability insights",
    ]
    # Ordered walkthrough of the calls a new client is expected to make first.
    first_steps = {
        "1_register_farmer": "POST /api/farmers/register",
        "2_get_dashboard": "GET /api/dashboard/{farmer_id}",
        "3_view_alerts": "GET /api/alerts/{farmer_id}",
        "4_subscribe_telegram": "POST /api/subscribe/{farmer_id}/{chat_id}",
    }
    # NOTE(review): "api_endpoints" and "status" are fixed literals, not
    # measured values.
    overview = {
        "platform": "🌾 Climate-Resilient Agriculture Platform",
        "version": "1.0.0",
        "description": "Real-time farm intelligence with AI-powered alerts",
        "documentation": "/docs",
        "openapi": "/openapi.json",
        "key_features": feature_list,
        "quick_start": first_steps,
        "api_endpoints": 18,
        "status": "🟒 OPERATIONAL",
    }
    return overview
# ════════════════════════════════════════════════════════════════════════════
# MAIN ENTRY
# ════════════════════════════════════════════════════════════════════════════
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces,
    # port 8003.
    import uvicorn

    # Without a configured handler the INFO-level startup messages below are
    # dropped. basicConfig does nothing when the root logger already has
    # handlers, so logging configured elsewhere is left alone.
    logging.basicConfig(level=logging.INFO)

    logger.info("🌾 Climate-Resilient Agriculture Platform Starting...")
    logger.info("πŸ“š API Documentation: http://localhost:8003/docs")
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8003,
        log_level="info",
    )