agentsay's picture
Update app.py
9b7c64f verified
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import JSONResponse
from pydantic import BaseModel
import os
import requests
import time
import cloudinary
import cloudinary.utils
import engine
import config
import futureWeather
import warnings
import re
from geopy.geocoders import Nominatim
from geopy.exc import GeocoderTimedOut
import pandas as pd
# Load environment variables from .env file
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
print("Warning: python-dotenv not installed. Using system environment variables only.")
warnings.filterwarnings("ignore")
app = FastAPI()
# Configure Cloudinary using environment variables
cloudinary_config = {
'cloud_name': config.CLOUDINARY_CLOUD_NAME,
'api_key': config.CLOUDINARY_API_KEY,
'api_secret': config.CLOUDINARY_API_SECRET
}
# Validate that all required Cloudinary credentials are present
if not all(cloudinary_config.values()):
print("Warning: Some Cloudinary environment variables are missing!")
missing = [k for k, v in cloudinary_config.items() if not v]
print(f"Missing: {missing}")
cloudinary.config(**cloudinary_config)
# Ensure upload directory exists
UPLOAD_FOLDER = 'Uploads'
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER)
# Pydantic models for request validation
class ImageRequest(BaseModel):
publicId: str
fileType: str
originalName: str | None = None
class CropYieldRequest(BaseModel):
cropName: str
locationLat: float
locationLong: float
class WeatherPredictionRequest(BaseModel):
locationLat: float
locationLong: float
language: str
# Generate signed URL for Cloudinary
def get_signed_url(public_id: str, resource_type: str = 'image', expires_in: int = 300) -> str:
expires_at = int(time.time()) + expires_in
url, options = cloudinary.utils.cloudinary_url(
public_id,
resource_type=resource_type,
type="authenticated",
sign_url=True,
expires_at=expires_at
)
return url
# Download from Cloudinary and save to local file
def download_file(public_id: str, save_path: str, file_type: str = 'image/jpeg') -> bool:
resource_type = 'raw' if file_type == 'raw' else 'image'
url = get_signed_url(public_id, resource_type=resource_type)
response = requests.get(url, headers={'Content-Type': file_type})
if response.status_code == 200:
with open(save_path, 'wb') as f:
f.write(response.content)
return True
return False
# --- FastAPI Routes ---
@app.get("/")
async def root():
return {
"message": "Agrosure API is running!",
"status": "healthy",
"endpoints": {
"exif_metadata": "/api/exif_metadata",
"damage_detection": "/api/damage_detection",
"crop_type": "/api/crop_type",
"crop_yield_prediction": "/predictForCrop",
"weather_prediction": "/futureWeatherPrediction"
},
"docs": "/docs",
"redoc": "/redoc"
}
@app.post("/api/exif_metadata")
async def exif_metadata(image_request: ImageRequest):
filename = image_request.originalName or f"{image_request.publicId.split('/')[-1]}.jpg"
filepath = os.path.join(UPLOAD_FOLDER, filename)
if not download_file(image_request.publicId, filepath, image_request.fileType):
raise HTTPException(status_code=500, detail=f"Failed to download image from Cloudinary: {image_request.publicId}")
result = engine.get_exif_data(filepath)
os.remove(filepath)
return result
@app.post("/api/damage_detection")
async def damage_detection(image_request: ImageRequest):
print(f"Received damage detection request: {image_request}")
filename = image_request.originalName or f"{image_request.publicId.split('/')[-1]}.jpg"
filepath = os.path.join(UPLOAD_FOLDER, filename)
if not download_file(image_request.publicId, filepath, image_request.fileType):
raise HTTPException(status_code=500, detail=f"Failed to download image from Cloudinary: {image_request.publicId}")
result = engine.predict_damage(filepath)
os.remove(filepath)
return result
@app.post("/api/crop_type")
async def crop_type(image_request: ImageRequest):
filename = image_request.originalName or f"{image_request.publicId.split('/')[-1]}.jpg"
filepath = os.path.join(UPLOAD_FOLDER, filename)
if not download_file(image_request.publicId, filepath, image_request.fileType):
raise HTTPException(status_code=500, detail=f"Failed to download image from Cloudinary: {image_request.publicId}")
result = engine.predict_crop(filepath)
os.remove(filepath)
return result
@app.post("/predictForCrop")
async def predict_crop_yield(data: CropYieldRequest):
if not (-90 <= data.locationLat <= 90) or not (-180 <= data.locationLong <= 180):
raise HTTPException(status_code=400, detail="Invalid latitude or longitude values")
try:
result = engine.predict_crop_yield_from_location(
crop_input=data.cropName.upper(),
lat=data.locationLat,
lon=data.locationLong
)
return result
except ValueError as e:
raise HTTPException(status_code=400, detail=f"Invalid numeric input: {str(e)}")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/futureWeatherPrediction")
async def future_weather_prediction(data: WeatherPredictionRequest):
if not (-90 <= data.locationLat <= 90) or not (-180 <= data.locationLong <= 180):
raise HTTPException(status_code=400, detail="Invalid latitude or longitude values")
try:
tom = futureWeather.fetch_tomorrow(data.locationLat, data.locationLong)
if not tom or len(tom.get("timelines", {}).get("daily", [])) < 7:
weather_data, source = futureWeather.fetch_open_meteo(data.locationLat, data.locationLong), "open-meteo"
else:
weather_data, source = tom, "tomorrow"
summary, score, should_claim, flags = futureWeather.extract_and_calc(weather_data, source)
ai_text = futureWeather.invoke_gemini(summary, score, should_claim, flags, data.language)
return {
"claim_recommendation": {
"should_claim": should_claim,
"weather_trend_risk_score": round(score, 2),
"forecast_summary": summary,
"language": data.language,
"gemini_response": ai_text
}
}
except ValueError as e:
raise HTTPException(status_code=400, detail=f"Invalid numeric input: {str(e)}")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
## MADE BY UDDALAK MUKHERJEE
# Load and clean crop data once on startup
CROP_DATA_PATH = "data/ICRISAT-District_Level_Data_30_Years.csv"
df_crop = pd.read_csv(CROP_DATA_PATH)
df_crop_clean = df_crop.drop(columns=['State Code', 'Year', 'State Name'], errors='ignore')
mean_crop_by_district = df_crop_clean.groupby('Dist Name').mean(numeric_only=True)
def get_district_from_coordinates(lat, lon):
geolocator = Nominatim(user_agent="agrisure-ai")
try:
location = geolocator.reverse((lat, lon), exactly_one=True)
except GeocoderTimedOut:
raise Exception("Reverse geocoding service timed out.")
except Exception as e:
raise Exception(f"Geocoding error: {str(e)}")
if not location:
raise ValueError("Could not get district from coordinates.")
# Handle potential async/coroutine response with type ignoring
try:
# Use type: ignore to suppress type checker warnings for geopy attributes
address = location.raw.get('address', {}) # type: ignore
except (AttributeError, TypeError):
try:
# Fallback: try to get address from location attributes
addr_str = str(location.address) # type: ignore
# Basic parsing fallback
address = {'display_name': addr_str}
except (AttributeError, TypeError):
raise ValueError("Could not parse location data.")
if not address:
raise ValueError("Could not get district from coordinates.")
district = (
address.get('district') or
address.get('state_district') or
address.get('county')
)
if district and 'district' in district.lower():
district = district.replace("District", "").strip()
return district
def clean_district_name(district):
if not isinstance(district, str):
return district
district = re.sub(r"\s*[-\u2013]\s*(I{1,3}|IV|V|VI|VII|VIII|IX|X|\d+)$", "", district, flags=re.IGNORECASE)
district = district.replace("District", "").strip()
aliases = {
"Purba Bardhaman": "Burdwan",
"Paschim Bardhaman": "Burdwan",
"Bardhaman": "Burdwan",
"Kalna": "Burdwan",
"Kalyani": "Nadia",
"Raiganj": "Uttar Dinajpur",
"Kolkata": "North 24 Parganas"
}
return aliases.get(district, district)
@app.get("/top-crops")
async def get_top_5_crops(
lat: float = Query(..., description="Latitude of the location"),
lon: float = Query(..., description="Longitude of the location")
):
try:
district_name = get_district_from_coordinates(lat, lon)
if not district_name:
return JSONResponse(status_code=404, content={"error": "Could not resolve district from coordinates."})
district_name = clean_district_name(district_name)
matched_district = None
for dist in mean_crop_by_district.index:
if dist.strip().lower() == district_name.lower():
matched_district = dist
break
if not matched_district:
return JSONResponse(status_code=404, content={"error": f"District '{district_name}' not found in dataset."})
top_crops = mean_crop_by_district.loc[matched_district].sort_values(ascending=False).head(5)
print(top_crops)
return {
"district": matched_district,
"top_5_crops": [
crop.replace(" (Kg per ha)", "").replace("YIELD", "").strip()
for crop in top_crops.index
]
}
except Exception as e:
return JSONResponse(status_code=500, content={"error": str(e)})
if __name__ == "__main__":
import uvicorn
print("Starting FastAPI server...")
print("Server will be available at:")
print(" - http://localhost:7860")
print("\nPress CTRL+C to stop the server")
uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True)