Eco-AI / app.py
Adedoyinjames's picture
Update app.py
d5538fd verified
# app.py - Updated for Hugging Face Spaces with Gradio compatibility
import os
import io
import json
import uuid
import time
import datetime
import requests
import gradio as gr
from typing import Dict, Any
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
# Configuration
HF_API_URL = "https://router.huggingface.co/v1/chat/completions"
HF_TOKEN = os.environ.get("HF_TOKEN") # must be set in environment
MODEL = "deepseek-ai/DeepSeek-V3.2:novita"
if not HF_TOKEN:
raise RuntimeError("HF_TOKEN environment variable is required")
# In-memory job store
jobs_store: Dict[str, Dict] = {}
# FastAPI app
app = FastAPI(title="EcoScan AI API", version="1.0.0")
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Extended SYSTEM_PROMPT
SYSTEM_PROMPT = """
You are EcoScan AI, a smart AI tool for environmental risk assessment in Nigeria and other locations.
You analyze proposed project locations using satellite imagery and scientific data to assess risks in:
- Soil Erosion (score 0-10, higher = higher risk)
- Slope Stability (score 0-10, higher = higher risk)
- Flooding (score 0-10, higher = higher risk)
- Biodiversity (score 0-10, higher = higher impact/risk to biodiversity)
For a given location (place name or coordinates like "9.0820° N, 8.6753° E"), generate a realistic, detailed assessment.
**IMPORTANT**: Output ONLY a single valid JSON object and nothing else. The JSON object must follow exactly this structure:
{
"location": {
"name": "extracted location name",
"coordinates": "latitude, longitude if available",
"region": "geographic region"
},
"erosion": {
"score": float,
"description": "detailed explanation of erosion risk",
"factors": ["factor1", "factor2", "factor3"],
"mitigation": "recommended mitigation measures"
},
"slope_stability": {
"score": float,
"description": "detailed explanation of slope stability",
"factors": ["factor1", "factor2", "factor3"],
"mitigation": "recommended mitigation measures"
},
"flooding": {
"score": float,
"description": "detailed explanation of flooding risk",
"factors": ["factor1", "factor2", "factor3"],
"mitigation": "recommended mitigation measures"
},
"biodiversity": {
"score": float,
"description": "detailed explanation of biodiversity impact",
"factors": ["factor1", "factor2", "factor3"],
"mitigation": "recommended mitigation measures"
},
"overall_report": "A comprehensive 3-5 sentence paragraph summarizing all risks and specific mitigation recommendations.",
"assessment_date": "current date in YYYY-MM-DD format"
}
- Scores must be numeric (float) between 0.0 and 10.0 (one decimal place preferred).
- Descriptions should be 2-3 sentences with specific details.
- Provide 2-3 factors for each category.
- Mitigation should be practical, actionable recommendations.
- Do not include any additional text outside the JSON.
"""
def extract_coordinates(location_text: str) -> Dict[str, Any]:
"""Extract coordinates from location text if present."""
import re
patterns = [
r'([-+]?\d+\.\d+)[°\s]*([NS]?)[,\s]*([-+]?\d+\.\d+)[°\s]*([EW]?)',
r'lat[:\s]*([-+]?\d+\.\d+)[,\s]*lon[:\s]*([-+]?\d+\.\d+)',
r'([-+]?\d+\.\d+)[,\s]+([-+]?\d+\.\d+)'
]
for pattern in patterns:
match = re.search(pattern, location_text, re.IGNORECASE)
if match:
groups = match.groups()
if len(groups) >= 2:
lat = float(groups[0])
lon = float(groups[1])
if len(groups) > 2 and groups[2] == 'S':
lat = -lat
if len(groups) > 3 and groups[3] == 'W':
lon = -lon
return {
"latitude": lat,
"longitude": lon,
"coordinates": f"{lat:.4f}, {lon:.4f}"
}
return {"coordinates": None, "latitude": None, "longitude": None}
def call_hf_chat(location_text: str) -> str:
"""Call the Hugging Face chat completions endpoint."""
headers = {"Authorization": f"Bearer {HF_TOKEN}"}
user_prompt = f"Assess the following location: {location_text}\n"
coords = extract_coordinates(location_text)
if coords["coordinates"]:
user_prompt += f"Coordinates: {coords['coordinates']}\n"
user_prompt += "Return output exactly in the required JSON format."
payload = {
"model": MODEL,
"messages": [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": user_prompt}
],
"max_tokens": 1024,
"temperature": 0.2,
}
try:
r = requests.post(HF_API_URL, headers=headers, json=payload, timeout=45)
r.raise_for_status()
data = r.json()
return data["choices"][0]["message"]["content"]
except Exception as e:
raise RuntimeError(f"HuggingFace API error: {e}")
def sanitize_and_parse_json(text: str) -> Dict:
"""Parse model output as JSON."""
import re
text = text.strip()
json_match = re.search(r'\{.*\}', text, re.DOTALL)
if json_match:
text = json_match.group(0)
try:
return json.loads(text)
except:
text = re.sub(r',\s*}', '}', text)
text = re.sub(r',\s*]', ']', text)
return json.loads(text)
def validate_assessment(assessment: Dict, location_text: str) -> Dict:
"""Validate and enrich assessment."""
# Ensure all required keys exist
for key in ["erosion", "slope_stability", "flooding", "biodiversity", "overall_report"]:
if key not in assessment:
if key != "overall_report":
assessment[key] = {"score": 0.0, "description": "Data not available", "factors": [], "mitigation": ""}
else:
assessment[key] = "Assessment data not available."
# Add location information
if "location" not in assessment:
coords = extract_coordinates(location_text)
assessment["location"] = {
"name": location_text,
"coordinates": coords.get("coordinates"),
"region": "Unknown"
}
# Ensure scores are valid
for metric in ["erosion", "slope_stability", "flooding", "biodiversity"]:
if metric in assessment:
entry = assessment[metric]
if isinstance(entry, dict):
if "score" in entry:
try:
score = float(entry["score"])
entry["score"] = max(0.0, min(10.0, round(score, 1)))
except:
entry["score"] = 5.0
else:
entry["score"] = 5.0
if "description" not in entry:
entry["description"] = "Risk assessment data."
if "factors" not in entry:
entry["factors"] = ["Soil type", "Vegetation cover", "Rainfall pattern"]
if "mitigation" not in entry:
entry["mitigation"] = "Implement standard mitigation measures."
assessment["assessment_date"] = datetime.datetime.now().strftime("%Y-%m-%d")
return assessment
# API Endpoints
@app.get("/")
async def root():
return {"message": "EcoScan AI API", "status": "running", "version": "1.0.0"}
@app.get("/health")
async def health():
return {"status": "ok", "service": "EcoScan AI"}
@app.post("/assess")
async def assess_location(request: Request):
"""Simple synchronous assessment endpoint."""
try:
data = await request.json()
location = data.get("location", "")
if not location:
raise HTTPException(status_code=400, detail="Location is required")
# Call AI model
model_output = call_hf_chat(location)
# Parse and validate
assessment = sanitize_and_parse_json(model_output)
assessment = validate_assessment(assessment, location)
return JSONResponse(content=assessment)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/assess_job")
async def create_assessment_job(request: Request):
"""Create async assessment job."""
try:
data = await request.json()
location = data.get("location", "")
if not location:
raise HTTPException(status_code=400, detail="Location is required")
job_id = str(uuid.uuid4())
# Store job
jobs_store[job_id] = {
"job_id": job_id,
"type": "assessment",
"status": "processing",
"progress": 0.0,
"message": "Starting analysis...",
"location": location,
"created_at": time.time(),
"result": None,
"error": None
}
# Process in background (simplified for demo)
import threading
def process_job():
try:
jobs_store[job_id]["progress"] = 0.3
jobs_store[job_id]["message"] = "Analyzing environmental data..."
model_output = call_hf_chat(location)
jobs_store[job_id]["progress"] = 0.7
jobs_store[job_id]["message"] = "Processing results..."
assessment = sanitize_and_parse_json(model_output)
assessment = validate_assessment(assessment, location)
jobs_store[job_id]["status"] = "done"
jobs_store[job_id]["progress"] = 1.0
jobs_store[job_id]["message"] = "Assessment complete"
jobs_store[job_id]["result"] = assessment
except Exception as e:
jobs_store[job_id]["status"] = "error"
jobs_store[job_id]["message"] = str(e)
jobs_store[job_id]["error"] = str(e)
thread = threading.Thread(target=process_job)
thread.daemon = True
thread.start()
return {"job_id": job_id, "status": "processing"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/assess_job/status/{job_id}")
async def get_assessment_status(job_id: str):
"""Get job status."""
job = jobs_store.get(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found")
return {
"job_id": job_id,
"status": job["status"],
"progress": job["progress"],
"message": job["message"],
"error": job.get("error")
}
@app.get("/assess_job/result/{job_id}")
async def get_assessment_result(job_id: str):
"""Get job result."""
job = jobs_store.get(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found")
if job["status"] != "done":
raise HTTPException(status_code=400, detail="Job not completed yet")
return job["result"]
@app.post("/pdf_job")
async def create_pdf_job(request: Request):
"""Create PDF generation job."""
try:
data = await request.json()
location = data.get("location", "")
assessment = data.get("assessment")
if not location:
raise HTTPException(status_code=400, detail="Location is required")
if not assessment:
# Generate assessment first
model_output = call_hf_chat(location)
assessment = sanitize_and_parse_json(model_output)
assessment = validate_assessment(assessment, location)
job_id = str(uuid.uuid4())
# Store job
jobs_store[job_id] = {
"job_id": job_id,
"type": "pdf",
"status": "processing",
"progress": 0.0,
"message": "Generating PDF...",
"location": location,
"created_at": time.time(),
"result": None,
"error": None
}
# Process PDF in background
import threading
from reportlab.lib.pagesizes import A4
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.enums import TA_LEFT
from reportlab.lib.colors import HexColor
def generate_pdf():
try:
buffer = io.BytesIO()
doc = SimpleDocTemplate(buffer, pagesize=A4)
story = []
styles = getSampleStyleSheet()
# Title
title_style = ParagraphStyle(
'Title',
parent=styles['Heading1'],
fontSize=20,
textColor=HexColor('#16a34a')
)
story.append(Paragraph("EcoScan AI - Environmental Risk Assessment", title_style))
story.append(Spacer(1, 12))
# Location
story.append(Paragraph(f"Location: {location}", styles["Normal"]))
story.append(Spacer(1, 24))
# Risk scores table
table_data = [["Risk Factor", "Score", "Description"]]
for name, key in [
("Erosion", "erosion"),
("Slope Stability", "slope_stability"),
("Flooding", "flooding"),
("Biodiversity", "biodiversity")
]:
data = assessment.get(key, {})
score = data.get("score", 0)
desc = data.get("description", "")[:80] + "..." if len(data.get("description", "")) > 80 else data.get("description", "")
table_data.append([name, f"{score:.1f}", desc])
table = Table(table_data)
table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), HexColor('#f0f9ff')),
('GRID', (0, 0), (-1, -1), 0.5, HexColor('#e5e7eb')),
]))
story.append(table)
story.append(Spacer(1, 24))
# Overall report
story.append(Paragraph("Overall Assessment", styles["Heading2"]))
story.append(Paragraph(assessment.get("overall_report", ""), styles["Normal"]))
doc.build(story)
buffer.seek(0)
pdf_bytes = buffer.read()
jobs_store[job_id]["status"] = "done"
jobs_store[job_id]["progress"] = 1.0
jobs_store[job_id]["message"] = "PDF generated"
jobs_store[job_id]["result"] = pdf_bytes
except Exception as e:
jobs_store[job_id]["status"] = "error"
jobs_store[job_id]["message"] = str(e)
jobs_store[job_id]["error"] = str(e)
thread = threading.Thread(target=generate_pdf)
thread.daemon = True
thread.start()
return {"job_id": job_id, "status": "processing"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/pdf_job/status/{job_id}")
async def get_pdf_status(job_id: str):
"""Get PDF job status."""
job = jobs_store.get(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found")
return {
"job_id": job_id,
"status": job["status"],
"progress": job["progress"],
"message": job["message"],
"error": job.get("error")
}
@app.get("/pdf_job/result/{job_id}")
async def get_pdf_result(job_id: str):
"""Download PDF."""
job = jobs_store.get(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found")
if job["status"] != "done":
raise HTTPException(status_code=400, detail="Job not completed yet")
pdf_bytes = job["result"]
return StreamingResponse(
io.BytesIO(pdf_bytes),
media_type="application/pdf",
headers={"Content-Disposition": f'attachment; filename="EcoScan_Report_{datetime.datetime.now().strftime("%Y%m%d")}.pdf"'}
)
# Gradio Interface (for Hugging Face Spaces demo)
def gradio_assess(location: str):
"""Gradio interface function."""
try:
model_output = call_hf_chat(location)
assessment = sanitize_and_parse_json(model_output)
assessment = validate_assessment(assessment, location)
# Format output for Gradio
output = f"""
## 📍 Location: {assessment['location']['name']}
**Coordinates:** {assessment['location'].get('coordinates', 'Not specified')}
**Region:** {assessment['location'].get('region', 'Not specified')}
**Assessment Date:** {assessment['assessment_date']}
### 📊 Risk Scores:
- **Erosion:** {assessment['erosion']['score']}/10
- **Slope Stability:** {assessment['slope_stability']['score']}/10
- **Flooding:** {assessment['flooding']['score']}/10
- **Biodiversity:** {assessment['biodiversity']['score']}/10
### 📝 Overall Assessment:
{assessment['overall_report']}
"""
return output, assessment
except Exception as e:
return f"Error: {str(e)}", None
# Create Gradio interface
demo = gr.Interface(
fn=gradio_assess,
inputs=gr.Textbox(
label="Enter Location",
placeholder="e.g., 'Lagos, Nigeria' or '9.0820° N, 8.6753° E'"
),
outputs=[
gr.Markdown(label="Assessment Report"),
gr.JSON(label="Raw Data")
],
title="🌍 EcoScan AI - Environmental Risk Assessment",
description="Analyze environmental risks for any location in Nigeria or beyond. Enter a location name or coordinates.",
examples=[
["Lagos, Nigeria"],
["9.0820° N, 8.6753° E"],
["Niger Delta region"],
["6.5244, 3.3792"]
]
)
# Hugging Face Spaces expects the app to be served via uvicorn
if __name__ == "__main__":
# For local testing
uvicorn.run(app, host="0.0.0.0", port=7860)