Spaces:
Sleeping
Sleeping
| # app.py - Updated for Hugging Face Spaces with Gradio compatibility | |
| import os | |
| import io | |
| import json | |
| import uuid | |
| import time | |
| import datetime | |
| import requests | |
| import gradio as gr | |
| from typing import Dict, Any | |
| from fastapi import FastAPI, Request, HTTPException | |
| from fastapi.responses import JSONResponse, StreamingResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import uvicorn | |
# Configuration
HF_API_URL = "https://router.huggingface.co/v1/chat/completions"  # HF inference router (OpenAI-compatible chat endpoint)
HF_TOKEN = os.environ.get("HF_TOKEN")  # must be set in environment
MODEL = "deepseek-ai/DeepSeek-V3.2:novita"  # model id with provider suffix, resolved by the HF router
if not HF_TOKEN:
    # Fail fast at import time rather than on the first request.
    raise RuntimeError("HF_TOKEN environment variable is required")

# In-memory job store, keyed by job_id (uuid4 string).
# NOTE(review): written to from daemon worker threads and read from request
# handlers; CPython dict item assignment is atomic, but all state is lost on
# process restart — confirm that is acceptable for this deployment.
jobs_store: Dict[str, Dict] = {}

# FastAPI app
app = FastAPI(title="EcoScan AI API", version="1.0.0")

# CORS middleware: wide-open policy (any origin/method/header) — presumably
# intentional for a public demo; tighten for production use.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# System prompt sent with every chat-completion call. It pins the model to a
# strict JSON-only output contract whose structure is then repaired/enforced
# downstream by sanitize_and_parse_json and validate_assessment.
SYSTEM_PROMPT = """
You are EcoScan AI, a smart AI tool for environmental risk assessment in Nigeria and other locations.
You analyze proposed project locations using satellite imagery and scientific data to assess risks in:
- Soil Erosion (score 0-10, higher = higher risk)
- Slope Stability (score 0-10, higher = higher risk)
- Flooding (score 0-10, higher = higher risk)
- Biodiversity (score 0-10, higher = higher impact/risk to biodiversity)
For a given location (place name or coordinates like "9.0820° N, 8.6753° E"), generate a realistic, detailed assessment.
**IMPORTANT**: Output ONLY a single valid JSON object and nothing else. The JSON object must follow exactly this structure:
{
"location": {
"name": "extracted location name",
"coordinates": "latitude, longitude if available",
"region": "geographic region"
},
"erosion": {
"score": float,
"description": "detailed explanation of erosion risk",
"factors": ["factor1", "factor2", "factor3"],
"mitigation": "recommended mitigation measures"
},
"slope_stability": {
"score": float,
"description": "detailed explanation of slope stability",
"factors": ["factor1", "factor2", "factor3"],
"mitigation": "recommended mitigation measures"
},
"flooding": {
"score": float,
"description": "detailed explanation of flooding risk",
"factors": ["factor1", "factor2", "factor3"],
"mitigation": "recommended mitigation measures"
},
"biodiversity": {
"score": float,
"description": "detailed explanation of biodiversity impact",
"factors": ["factor1", "factor2", "factor3"],
"mitigation": "recommended mitigation measures"
},
"overall_report": "A comprehensive 3-5 sentence paragraph summarizing all risks and specific mitigation recommendations.",
"assessment_date": "current date in YYYY-MM-DD format"
}
- Scores must be numeric (float) between 0.0 and 10.0 (one decimal place preferred).
- Descriptions should be 2-3 sentences with specific details.
- Provide 2-3 factors for each category.
- Mitigation should be practical, actionable recommendations.
- Do not include any additional text outside the JSON.
"""
def extract_coordinates(location_text: str) -> Dict[str, Any]:
    """Extract decimal latitude/longitude from free-form location text.

    Tries three patterns in order: a hemisphere-annotated pair
    ("9.0820° N, 8.6753° E"), a "lat: x, lon: y" pair, and a bare
    "x, y" decimal pair. Only decimal numbers (with a '.') are matched.

    Returns a dict with keys "latitude", "longitude" (floats) and
    "coordinates" (a "lat, lon" string with 4 decimal places); all three
    are None when nothing matched.
    """
    import re
    patterns = [
        # 4 groups: (lat, N/S?, lon, E/W?)
        r'([-+]?\d+\.\d+)[°\s]*([NS]?)[,\s]*([-+]?\d+\.\d+)[°\s]*([EW]?)',
        # 2 groups: (lat, lon)
        r'lat[:\s]*([-+]?\d+\.\d+)[,\s]*lon[:\s]*([-+]?\d+\.\d+)',
        # 2 groups: bare decimal pair (lat, lon)
        r'([-+]?\d+\.\d+)[,\s]+([-+]?\d+\.\d+)'
    ]
    for pattern in patterns:
        match = re.search(pattern, location_text, re.IGNORECASE)
        if not match:
            continue
        groups = match.groups()
        if len(groups) == 4:
            # Hemisphere pattern: longitude is group 3, NOT group 2.
            # (Bug fix: the previous code did float(groups[1]), i.e. tried to
            # parse the 'N'/'S' letter — or an empty string — as the longitude,
            # raising ValueError for exactly the inputs this pattern targets.)
            lat = float(groups[0])
            lon = float(groups[2])
            # Patterns are searched case-insensitively, so accept 's'/'w' too.
            if groups[1].upper() == 'S':
                lat = -lat
            if groups[3].upper() == 'W':
                lon = -lon
        else:
            lat = float(groups[0])
            lon = float(groups[1])
        return {
            "latitude": lat,
            "longitude": lon,
            "coordinates": f"{lat:.4f}, {lon:.4f}"
        }
    return {"coordinates": None, "latitude": None, "longitude": None}
def call_hf_chat(location_text: str) -> str:
    """Request an assessment for *location_text* from the HF router.

    Builds the user prompt (appending parsed coordinates when present),
    posts a chat-completion request, and returns the raw model reply text.
    Raises RuntimeError wrapping any network/HTTP/parsing failure.
    """
    prompt_lines = [f"Assess the following location: {location_text}"]
    parsed = extract_coordinates(location_text)
    if parsed["coordinates"]:
        prompt_lines.append(f"Coordinates: {parsed['coordinates']}")
    prompt_lines.append("Return output exactly in the required JSON format.")

    request_body = {
        "model": MODEL,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": "\n".join(prompt_lines)},
        ],
        "max_tokens": 1024,
        # Low temperature keeps the JSON output deterministic-ish.
        "temperature": 0.2,
    }
    try:
        response = requests.post(
            HF_API_URL,
            headers={"Authorization": f"Bearer {HF_TOKEN}"},
            json=request_body,
            timeout=45,
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]
    except Exception as e:
        raise RuntimeError(f"HuggingFace API error: {e}")
def sanitize_and_parse_json(text: str) -> Dict:
    """Parse the model's raw output into a dict.

    Extracts the outermost ``{...}`` span (the prompt demands JSON-only
    output, but models sometimes wrap it in prose), then parses it. On
    failure, retries after stripping trailing commas before ``}``/``]`` —
    a common model glitch.

    Raises ValueError (which JSONDecodeError subclasses, so existing
    callers' handlers still match) when no valid JSON can be recovered.
    """
    import re
    text = text.strip()
    json_match = re.search(r'\{.*\}', text, re.DOTALL)
    if json_match:
        text = json_match.group(0)
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Bug fix: previously a bare `except:` — now only JSON errors
        # trigger the repair path, and the final failure carries context.
        text = re.sub(r',\s*}', '}', text)
        text = re.sub(r',\s*]', ']', text)
        try:
            return json.loads(text)
        except json.JSONDecodeError as e:
            raise ValueError(f"Model output is not valid JSON: {e}") from e
def validate_assessment(assessment: Dict, location_text: str) -> Dict:
    """Validate and enrich a parsed assessment in place, then return it.

    Backfills any missing sections, clamps each risk score into [0.0, 10.0]
    (rounded to 1 decimal), fills default description/factors/mitigation,
    and stamps today's date. Safe against arbitrary model output shapes.
    """
    category_keys = ["erosion", "slope_stability", "flooding", "biodiversity"]

    # Ensure all required top-level keys exist so downstream formatting
    # (Gradio markdown, PDF table) can index them unconditionally.
    for key in category_keys + ["overall_report"]:
        if key not in assessment:
            if key == "overall_report":
                assessment[key] = "Assessment data not available."
            else:
                assessment[key] = {"score": 0.0, "description": "Data not available", "factors": [], "mitigation": ""}

    # Add location information when the model omitted it.
    if "location" not in assessment:
        coords = extract_coordinates(location_text)
        assessment["location"] = {
            "name": location_text,
            "coordinates": coords.get("coordinates"),
            "region": "Unknown"
        }

    # Normalize each category entry.
    for metric in category_keys:
        entry = assessment[metric]
        if not isinstance(entry, dict):
            continue
        try:
            # Missing score defaults to the neutral midpoint.
            score = float(entry.get("score", 5.0))
            entry["score"] = max(0.0, min(10.0, round(score, 1)))
        except (TypeError, ValueError):
            # Bug fix: previously a bare `except:` — now only conversion
            # failures (non-numeric score) fall back to the midpoint.
            entry["score"] = 5.0
        entry.setdefault("description", "Risk assessment data.")
        entry.setdefault("factors", ["Soil type", "Vegetation cover", "Rainfall pattern"])
        entry.setdefault("mitigation", "Implement standard mitigation measures.")

    # NOTE(review): naive local time; confirm whether UTC is expected.
    assessment["assessment_date"] = datetime.datetime.now().strftime("%Y-%m-%d")
    return assessment
# API Endpoints

# NOTE(review): in the reviewed version none of the async handlers carried a
# route decorator, so FastAPI never exposed them. Paths are inferred from the
# handler names — confirm against the frontend/client before deploying.
@app.get("/")
async def root():
    """Service banner / simple liveness probe."""
    return {"message": "EcoScan AI API", "status": "running", "version": "1.0.0"}
# NOTE(review): route path inferred — this handler had no decorator and was
# never registered on `app`; confirm the path against the client.
@app.get("/health")
async def health():
    """Health-check endpoint."""
    return {"status": "ok", "service": "EcoScan AI"}
# NOTE(review): route path inferred — this handler had no decorator and was
# never registered on `app`; confirm the path against the client.
@app.post("/assess")
async def assess_location(request: Request):
    """Synchronous assessment endpoint.

    Expects a JSON body ``{"location": "..."}``. Runs the model call inline
    and returns the validated assessment JSON. 400 when location is missing,
    500 on model/parse failure.
    """
    try:
        data = await request.json()
        location = data.get("location", "")
        if not location:
            raise HTTPException(status_code=400, detail="Location is required")
        # Call AI model
        model_output = call_hf_chat(location)
        # Parse and validate
        assessment = sanitize_and_parse_json(model_output)
        assessment = validate_assessment(assessment, location)
        return JSONResponse(content=assessment)
    except HTTPException:
        # Bug fix: previously the deliberate 400 above was caught by the
        # generic handler below and re-raised as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# NOTE(review): route path inferred — this handler had no decorator and was
# never registered on `app`; confirm the path against the client.
@app.post("/assess/jobs")
async def create_assessment_job(request: Request):
    """Create an async assessment job and return its job_id for polling.

    Expects ``{"location": "..."}``. Work runs on a daemon thread that
    publishes progress/result/error into ``jobs_store``.
    """
    try:
        data = await request.json()
        location = data.get("location", "")
        if not location:
            raise HTTPException(status_code=400, detail="Location is required")
        job_id = str(uuid.uuid4())
        # Register the job before starting the worker so status polls
        # issued immediately after this response never 404.
        jobs_store[job_id] = {
            "job_id": job_id,
            "type": "assessment",
            "status": "processing",
            "progress": 0.0,
            "message": "Starting analysis...",
            "location": location,
            "created_at": time.time(),
            "result": None,
            "error": None
        }

        # Process in background (simplified for demo — no queue/executor).
        import threading

        def process_job():
            # Runs on a daemon thread; all failures are captured into the
            # job record rather than propagated.
            try:
                jobs_store[job_id]["progress"] = 0.3
                jobs_store[job_id]["message"] = "Analyzing environmental data..."
                model_output = call_hf_chat(location)
                jobs_store[job_id]["progress"] = 0.7
                jobs_store[job_id]["message"] = "Processing results..."
                assessment = sanitize_and_parse_json(model_output)
                assessment = validate_assessment(assessment, location)
                jobs_store[job_id]["status"] = "done"
                jobs_store[job_id]["progress"] = 1.0
                jobs_store[job_id]["message"] = "Assessment complete"
                jobs_store[job_id]["result"] = assessment
            except Exception as e:
                jobs_store[job_id]["status"] = "error"
                jobs_store[job_id]["message"] = str(e)
                jobs_store[job_id]["error"] = str(e)

        thread = threading.Thread(target=process_job, daemon=True)
        thread.start()
        return {"job_id": job_id, "status": "processing"}
    except HTTPException:
        # Bug fix: previously the deliberate 400 above was converted into
        # a 500 by the generic handler below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# NOTE(review): route path inferred — this handler had no decorator and was
# never registered on `app`; confirm the path against the client.
@app.get("/assess/jobs/{job_id}/status")
async def get_assessment_status(job_id: str):
    """Return progress/status for an assessment job; 404 for unknown ids."""
    job = jobs_store.get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    return {
        "job_id": job_id,
        "status": job["status"],
        "progress": job["progress"],
        "message": job["message"],
        "error": job.get("error")
    }
# NOTE(review): route path inferred — this handler had no decorator and was
# never registered on `app`; confirm the path against the client.
@app.get("/assess/jobs/{job_id}/result")
async def get_assessment_result(job_id: str):
    """Return the finished assessment JSON; 404 unknown id, 400 if not done."""
    job = jobs_store.get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    if job["status"] != "done":
        raise HTTPException(status_code=400, detail="Job not completed yet")
    return job["result"]
# NOTE(review): route path inferred — this handler had no decorator and was
# never registered on `app`; confirm the path against the client.
@app.post("/pdf/jobs")
async def create_pdf_job(request: Request):
    """Create a PDF-generation job and return its job_id for polling.

    Expects ``{"location": "...", "assessment": {...}?}``. When no assessment
    is supplied, one is generated synchronously first (this blocks the
    request on a model call). The PDF bytes are stored in the job's
    ``result`` field for download via the result endpoint.
    """
    try:
        data = await request.json()
        location = data.get("location", "")
        assessment = data.get("assessment")
        if not location:
            raise HTTPException(status_code=400, detail="Location is required")
        if not assessment:
            # Generate assessment first (inline, before the job exists).
            model_output = call_hf_chat(location)
            assessment = sanitize_and_parse_json(model_output)
            assessment = validate_assessment(assessment, location)
        job_id = str(uuid.uuid4())
        # Register the job before starting the worker so immediate polls
        # never 404.
        jobs_store[job_id] = {
            "job_id": job_id,
            "type": "pdf",
            "status": "processing",
            "progress": 0.0,
            "message": "Generating PDF...",
            "location": location,
            "created_at": time.time(),
            "result": None,
            "error": None
        }

        # Process PDF in background. reportlab is imported lazily so the
        # module loads even if PDF support is absent until first use.
        import threading
        from reportlab.lib.pagesizes import A4
        from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle
        from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
        from reportlab.lib.enums import TA_LEFT
        from reportlab.lib.colors import HexColor

        def generate_pdf():
            # Runs on a daemon thread; failures are captured into the job.
            try:
                buffer = io.BytesIO()
                doc = SimpleDocTemplate(buffer, pagesize=A4)
                story = []
                styles = getSampleStyleSheet()
                # Title
                title_style = ParagraphStyle(
                    'Title',
                    parent=styles['Heading1'],
                    fontSize=20,
                    textColor=HexColor('#16a34a')
                )
                story.append(Paragraph("EcoScan AI - Environmental Risk Assessment", title_style))
                story.append(Spacer(1, 12))
                # Location
                story.append(Paragraph(f"Location: {location}", styles["Normal"]))
                story.append(Spacer(1, 24))
                # Risk scores table
                table_data = [["Risk Factor", "Score", "Description"]]
                for name, key in [
                    ("Erosion", "erosion"),
                    ("Slope Stability", "slope_stability"),
                    ("Flooding", "flooding"),
                    ("Biodiversity", "biodiversity")
                ]:
                    # Renamed from `data` — the original rebinding shadowed
                    # the request body captured by this closure.
                    section = assessment.get(key, {})
                    score = section.get("score", 0)
                    desc = section.get("description", "")
                    if len(desc) > 80:
                        desc = desc[:80] + "..."
                    table_data.append([name, f"{score:.1f}", desc])
                table = Table(table_data)
                table.setStyle(TableStyle([
                    ('BACKGROUND', (0, 0), (-1, 0), HexColor('#f0f9ff')),
                    ('GRID', (0, 0), (-1, -1), 0.5, HexColor('#e5e7eb')),
                ]))
                story.append(table)
                story.append(Spacer(1, 24))
                # Overall report
                story.append(Paragraph("Overall Assessment", styles["Heading2"]))
                story.append(Paragraph(assessment.get("overall_report", ""), styles["Normal"]))
                doc.build(story)
                buffer.seek(0)
                pdf_bytes = buffer.read()
                jobs_store[job_id]["status"] = "done"
                jobs_store[job_id]["progress"] = 1.0
                jobs_store[job_id]["message"] = "PDF generated"
                jobs_store[job_id]["result"] = pdf_bytes
            except Exception as e:
                jobs_store[job_id]["status"] = "error"
                jobs_store[job_id]["message"] = str(e)
                jobs_store[job_id]["error"] = str(e)

        thread = threading.Thread(target=generate_pdf, daemon=True)
        thread.start()
        return {"job_id": job_id, "status": "processing"}
    except HTTPException:
        # Bug fix: previously the deliberate 400 above was converted into
        # a 500 by the generic handler below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# NOTE(review): route path inferred — this handler had no decorator and was
# never registered on `app`; confirm the path against the client.
@app.get("/pdf/jobs/{job_id}/status")
async def get_pdf_status(job_id: str):
    """Return progress/status for a PDF job; 404 for unknown ids."""
    job = jobs_store.get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    return {
        "job_id": job_id,
        "status": job["status"],
        "progress": job["progress"],
        "message": job["message"],
        "error": job.get("error")
    }
# NOTE(review): route path inferred — this handler had no decorator and was
# never registered on `app`; confirm the path against the client.
@app.get("/pdf/jobs/{job_id}/result")
async def get_pdf_result(job_id: str):
    """Stream the finished PDF as a download; 404 unknown id, 400 if not done."""
    job = jobs_store.get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    if job["status"] != "done":
        raise HTTPException(status_code=400, detail="Job not completed yet")
    pdf_bytes = job["result"]
    return StreamingResponse(
        io.BytesIO(pdf_bytes),
        media_type="application/pdf",
        headers={"Content-Disposition": f'attachment; filename="EcoScan_Report_{datetime.datetime.now().strftime("%Y%m%d")}.pdf"'}
    )
# Gradio Interface (for Hugging Face Spaces demo)
def gradio_assess(location: str):
    """Run a full assessment for *location*.

    Returns a (markdown_report, assessment_dict) pair for the Gradio
    Markdown and JSON output components, or ("Error: ...", None) on
    any failure — errors are rendered, not raised, so the UI stays up.
    """
    try:
        model_output = call_hf_chat(location)
        assessment = sanitize_and_parse_json(model_output)
        # validate_assessment guarantees every key indexed below exists.
        assessment = validate_assessment(assessment, location)
        # Format output for Gradio
        output = f"""
## 📍 Location: {assessment['location']['name']}
**Coordinates:** {assessment['location'].get('coordinates', 'Not specified')}
**Region:** {assessment['location'].get('region', 'Not specified')}
**Assessment Date:** {assessment['assessment_date']}
### 📊 Risk Scores:
- **Erosion:** {assessment['erosion']['score']}/10
- **Slope Stability:** {assessment['slope_stability']['score']}/10
- **Flooding:** {assessment['flooding']['score']}/10
- **Biodiversity:** {assessment['biodiversity']['score']}/10
### 📝 Overall Assessment:
{assessment['overall_report']}
"""
        return output, assessment
    except Exception as e:
        return f"Error: {str(e)}", None
# Create Gradio interface (demo UI for Hugging Face Spaces).
# NOTE(review): `demo` is not visibly launched or mounted anywhere in this
# file — confirm how the deployment is expected to serve it (the Spaces
# Gradio SDK auto-serves a module-level `demo`; a pure-FastAPI Space would
# need an explicit mount).
demo = gr.Interface(
    fn=gradio_assess,
    inputs=gr.Textbox(
        label="Enter Location",
        placeholder="e.g., 'Lagos, Nigeria' or '9.0820° N, 8.6753° E'"
    ),
    outputs=[
        gr.Markdown(label="Assessment Report"),
        gr.JSON(label="Raw Data")
    ],
    title="🌍 EcoScan AI - Environmental Risk Assessment",
    description="Analyze environmental risks for any location in Nigeria or beyond. Enter a location name or coordinates.",
    examples=[
        ["Lagos, Nigeria"],
        ["9.0820° N, 8.6753° E"],
        ["Niger Delta region"],
        ["6.5244, 3.3792"]
    ]
)
# Hugging Face Spaces expects the app to be served via uvicorn
if __name__ == "__main__":
    # For local testing.
    # NOTE(review): previously `demo` was built but never served when running
    # locally; mounting it exposes the Gradio UI alongside the API. Confirm
    # the mount path against deployment expectations.
    app = gr.mount_gradio_app(app, demo, path="/gradio")
    uvicorn.run(app, host="0.0.0.0", port=7860)