Spaces:
Runtime error
Runtime error
| from fastapi import FastAPI, UploadFile, File, HTTPException, Form | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import FileResponse, JSONResponse | |
| import json | |
| import os | |
| from datetime import datetime | |
| from typing import List, Optional | |
| import numpy as np | |
| from PIL import Image | |
| from io import BytesIO | |
| import piexif | |
| import uuid | |
| from sqlalchemy import func | |
| from dotenv import load_dotenv | |
# Load env before imports
# (the project imports below run after this call, presumably so that they can
# read values from .env at import time - TODO confirm)
load_dotenv()

# Import real YOLO ensemble detector
# Uses trained yolo11base.pt model for actual inference
# When the real module cannot be imported, the mock module supplies the same
# three names so the rest of this file still works.
try:
    from ..models.ensemble import EnsembleDetector, SeverityClassifier, TTAProcessor
    print("[OK] Loaded real YOLO ensemble detector")
except ImportError as e:
    print(f"[WARN] Real ensemble not found ({e}), using mocks")
    from ..models.mock_ensemble import EnsembleDetector, SeverityClassifier, TTAProcessor

# GPS Fallback: Near DJI Sample Location (DJI_202511261727_006)
# Original DJI GPS: 16.009719, 108.258302 - Fallback is 50m to the right
# These defaults are returned by extract_gps_from_image whenever an image
# carries no usable GPS EXIF data.
DEFAULT_LATITUDE = 16.009719
DEFAULT_LONGITUDE = 108.258852
DEFAULT_ALTITUDE = 68.5

from ..database.models import Detection, DetectionResult, Base
from ..database.db import SessionLocal, engine

# Create tables if they don't exist
# (runs at import time against the engine imported above)
Base.metadata.create_all(bind=engine)
# The FastAPI application object for this module; the CORS middleware is
# attached to it further down in this file.
app = FastAPI(
    title="Road Damage Detection API",
    description="AI-powered highway infrastructure monitoring",
    version="1.0.0"
)
# CORS configuration for web UI - MUST BE BEFORE static files mount
# ALLOWED_ORIGINS is a comma-separated list. Each entry is stripped and blank
# entries are dropped, so values such as "https://a.example, https://b.example"
# or a trailing comma do not yield origins that can never match a request.
origins = [
    origin.strip()
    for origin in os.getenv(
        "ALLOWED_ORIGINS", "http://localhost:3000,http://localhost:3001"
    ).split(",")
    if origin.strip()
]
# An empty ALLOWED_ORIGINS value falls back to allowing every origin.
if not origins:
    origins = ["*"]
# Always include common localhost ports and the Vercel production URL,
# appended in this order when they are not already present.
for required_origin in (
    "http://localhost:3001",
    "http://localhost:3000",
    "https://road-damage-on5d.vercel.app",
):
    if required_origin not in origins:
        origins.append(required_origin)
# Attach CORS handling to the app using the `origins` list built above.
# Credentialed requests are allowed, and methods, request headers and exposed
# response headers are all wildcarded.
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    expose_headers=["*"],
)
| # Custom image serving endpoint with CORS headers (StaticFiles doesn't properly apply CORS) | |
| from starlette.responses import Response | |
async def serve_upload(filename: str):
    """Serve an uploaded image from ``data/uploads`` with explicit CORS headers.

    Args:
        filename: Name of a file directly inside ``data/uploads``. It is
            treated as untrusted input (presumably bound from the request
            path; the route registration is not visible here).

    Returns:
        A ``Response`` carrying the file bytes, a content type chosen from the
        file extension (``application/octet-stream`` for unknown extensions),
        wildcard CORS headers and a one-hour public cache lifetime.

    Raises:
        HTTPException: 404 when ``filename`` is not a bare file name or when
            no regular file with that name exists in the uploads directory.
    """
    # Only bare file names are served: anything containing a directory
    # component could otherwise escape data/uploads via "../" segments.
    if not filename or os.path.basename(filename) != filename:
        raise HTTPException(status_code=404, detail="File not found")

    filepath = os.path.join("data/uploads", filename)
    # isfile (not exists) so that "." / ".." and sub-directories produce a
    # 404 instead of an unhandled error from open().
    if not os.path.isfile(filepath):
        raise HTTPException(status_code=404, detail="File not found")

    # Determine content type
    ext = os.path.splitext(filename)[1].lower()
    content_types = {
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.webp': 'image/webp',
    }
    content_type = content_types.get(ext, 'application/octet-stream')

    with open(filepath, 'rb') as f:
        content = f.read()

    return Response(
        content=content,
        media_type=content_type,
        headers={
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Methods": "GET, OPTIONS",
            "Access-Control-Allow-Headers": "*",
            "Cache-Control": "public, max-age=3600"
        }
    )
# Initialize models
# The detector, severity classifier and TTA processor are constructed once,
# at import time, and shared by the request handlers below.
print("[INFO] Loading models...")
ensemble = EnsembleDetector()
severity_classifier = SeverityClassifier()
tta_processor = TTAProcessor()

# Store detection results temporarily (for simple retrieval without DB)
# Keyed by detection_id. NOTE(review): nothing in the visible code removes
# entries, so this dict grows for as long as the process runs.
DETECTION_CACHE = {}

# Ensure uploads directory exists
os.makedirs("data/uploads", exist_ok=True)
| # ============================================ | |
| # 1. EXIF EXTRACTION FROM DRONE IMAGES | |
| # ============================================ | |
def _gps_fallback(error=None):
    """Build the default-location result returned when EXIF cannot be read."""
    result = {
        "latitude": DEFAULT_LATITUDE,
        "longitude": DEFAULT_LONGITUDE,
        "altitude": DEFAULT_ALTITUDE,
        "timestamp": datetime.now().isoformat(),
        "drone_model": "Unknown",
        "success": False,
    }
    if error is not None:
        result["error"] = error
    return result


def _dms_to_degrees(value):
    """Convert an EXIF (degrees, minutes, seconds) rational triple to decimal degrees."""
    if not value:
        return 0.0
    degrees, minutes, seconds = (part[0] / part[1] for part in value[:3])
    return degrees + minutes / 60.0 + seconds / 3600.0


def _parse_altitude(gps_info):
    """Return the altitude stored in the GPS IFD, or 0 when absent or unreadable."""
    raw = gps_info.get(piexif.GPSIFD.GPSAltitude)
    try:
        # Tolerate a rational wrapped in an outer tuple: ((num, den),).
        if isinstance(raw, tuple) and raw and isinstance(raw[0], tuple):
            raw = raw[0]
        if isinstance(raw, tuple):
            if len(raw) >= 2:
                # piexif hands back a single RATIONAL as a flat (num, den)
                # pair, so the value is num / den - not the numerator alone.
                altitude = raw[0] / raw[1]
            elif len(raw) == 1:
                altitude = float(raw[0])
            else:
                return 0
        elif isinstance(raw, (int, float)):
            altitude = float(raw)
        else:
            return 0
    except (TypeError, ValueError, ZeroDivisionError):
        return 0
    # EXIF GPSAltitudeRef 1 means "below sea level".
    if gps_info.get(piexif.GPSIFD.GPSAltitudeRef) == 1:
        altitude = -altitude
    return altitude


def _exif_text(ifd, tag):
    """Return the text stored under ``tag`` in ``ifd``, or None when absent or undecodable."""
    raw = ifd.get(tag)
    if isinstance(raw, bytes):
        try:
            return raw.decode() or None
        except UnicodeDecodeError:
            return None
    if isinstance(raw, str):
        return raw or None
    return None


def extract_gps_from_image(image_bytes: bytes) -> dict:
    """Extract GPS coordinates from DJI image EXIF data.

    Args:
        image_bytes: Raw bytes of the image file.

    Returns:
        A dict with ``latitude``, ``longitude``, ``altitude``, ``timestamp``,
        ``drone_model`` and ``success``. When the bytes are not a readable
        image or the EXIF block cannot be loaded, the module default
        coordinates are returned with ``success`` False (plus an ``error``
        message when the failure happened outside the EXIF load itself).
        When EXIF loads but has no latitude/longitude tags, the default
        coordinates are returned with ``success`` still True.
        ``timestamp`` is the EXIF DateTime text when present, otherwise the
        current time in ISO format.

    This function does not raise; every failure is turned into the fallback
    result.
    """
    try:
        # Opening validates that the bytes are an image at all; a failure
        # here is reported through the outer handler with an error message.
        Image.open(BytesIO(image_bytes))

        try:
            exif_dict = piexif.load(image_bytes)
            print(f"[GPS] EXIF loaded, keys: {list(exif_dict.keys())}")
        except Exception as e:
            # Maybe PNG or no exif
            print(f"[WARN] EXIF load failed: {e}")
            return _gps_fallback()

        gps_info = exif_dict.get("GPS", {})
        print(f"[GPS] GPS info keys: {list(gps_info.keys())}")

        if piexif.GPSIFD.GPSLatitude in gps_info and piexif.GPSIFD.GPSLongitude in gps_info:
            lat = _dms_to_degrees(gps_info[piexif.GPSIFD.GPSLatitude])
            lon = _dms_to_degrees(gps_info[piexif.GPSIFD.GPSLongitude])
            print(f"[GPS] GPS found: raw lat={lat}, lon={lon}")

            # Adjust for direction: southern and western hemispheres are negative.
            if gps_info.get(piexif.GPSIFD.GPSLatitudeRef) == b'S':
                lat = -lat
            if gps_info.get(piexif.GPSIFD.GPSLongitudeRef) == b'W':
                lon = -lon
            print(f"[GPS] GPS after direction: lat={lat}, lon={lon}")

            altitude = _parse_altitude(gps_info)
        else:
            # Fallback to Da Nang if GPS not in EXIF
            print("[WARN] GPS not found in EXIF! Using default Da Nang coords")
            lat = DEFAULT_LATITUDE
            lon = DEFAULT_LONGITUDE
            altitude = DEFAULT_ALTITUDE

        zeroth_ifd = exif_dict.get("0th") or {}
        timestamp = _exif_text(zeroth_ifd, piexif.ImageIFD.DateTime) or datetime.now().isoformat()
        drone_model = _exif_text(zeroth_ifd, piexif.ImageIFD.Model) or "Unknown"

        return {
            "latitude": float(lat),
            "longitude": float(lon),
            "altitude": float(altitude),
            "timestamp": timestamp,
            "drone_model": drone_model,
            "success": True
        }
    except Exception as e:
        # Fallback to Da Nang coordinates
        print(f"[WARN] EXIF extraction failed: {str(e)}")
        return _gps_fallback(error=str(e))
| # ============================================ | |
| # 2. DETECTION API ENDPOINT | |
| # ============================================ | |
async def detect_damage(file: UploadFile = File(...)):
    """Run damage detection on one uploaded image and store the result.

    The image is saved under ``data/uploads``, its GPS position is read from
    EXIF, the ensemble and TTA predictors are both run, and every detection is
    given a severity from the severity classifier. The result is kept in
    ``DETECTION_CACHE`` and written to the database; a database failure is
    logged and rolled back but does not fail the request.

    Args:
        file: The uploaded image.

    Returns:
        A ``JSONResponse`` with ``success``, ``detection_id``, ``detections``,
        ``gps_data``, ``total_count`` and ``image_url``.

    Raises:
        HTTPException: 500 with the error text when any step outside the
            database write fails.
    """
    print(f"\n{'='*60}")
    print(f"[DETECT] Received file: {file.filename}")
    print(f"{'='*60}")

    try:
        # Read image
        contents = await file.read()
        print(f"[DETECT] File size: {len(contents)} bytes")

        image = Image.open(BytesIO(contents))
        image_array = np.array(image)
        print(f"[DETECT] Image shape: {image_array.shape}")

        # Save for static serving. Only the base name of the client-supplied
        # file name is used, so it cannot point outside data/uploads, and a
        # random suffix keeps two uploads in the same second from overwriting
        # each other (same naming scheme as upload_survey).
        safe_name = os.path.basename(file.filename) if file.filename else f"upload_{uuid.uuid4().hex[:8]}.jpg"
        filename = f"{datetime.now().strftime('%Y%m%d%H%M%S')}_{uuid.uuid4().hex[:8]}_{safe_name}"
        filepath = os.path.join("data/uploads", filename)
        with open(filepath, "wb") as f:
            f.write(contents)
        print(f"[DETECT] Saved to: {filepath}")

        image_url = f"/uploads/{filename}"

        # Extract GPS from EXIF
        gps_data = extract_gps_from_image(contents)
        base_lat = gps_data["latitude"]
        base_lon = gps_data["longitude"]
        print(f"[DETECT] GPS: lat={base_lat}, lon={base_lon}")

        # Run AI inference
        print("[DETECT] Running YOLO inference...")
        ensemble_results = ensemble.predict(image_array)
        print(f"[DETECT] Ensemble returned {len(ensemble_results)} detections")

        tta_results = tta_processor.predict(image_array)
        print(f"[DETECT] TTA returned {len(tta_results)} detections")

        # Both result lists are kept as-is; no de-duplication happens here.
        all_detections = ensemble_results + tta_results
        print(f"[DETECT] Total raw detections: {len(all_detections)}")

        # Process and classify
        image_area = image.size[0] * image.size[1]
        detections_with_severity = []
        for detection_idx, det in enumerate(all_detections):
            x1, y1, x2, y2 = det["box"]
            area_ratio = (x2 - x1) * (y2 - y1) / image_area

            severity = severity_classifier.classify(area_ratio, det["confidence"])

            # Add slight GPS offset to separate markers if multiple damages in one image
            det_lat = base_lat + (detection_idx * 0.0001)
            det_lon = base_lon + (detection_idx * 0.0001)

            detections_with_severity.append({
                "x1": int(x1), "y1": int(y1), "x2": int(x2), "y2": int(y2),
                "class_name": det["class_name"],
                "class_id": det["class_id"],
                "confidence": float(det["confidence"]),
                "severity": severity,
                "area_ratio": float(area_ratio),
                "votes": det.get("votes", 2),
                "latitude": float(det_lat),
                "longitude": float(det_lon),
                "image_url": image_url
            })

        # Save detection result
        detection_id = str(uuid.uuid4())
        DETECTION_CACHE[detection_id] = {
            "detections": detections_with_severity,
            "gps_data": gps_data,
            "timestamp": datetime.now().isoformat(),
            "image_filename": file.filename,
            "total_count": len(detections_with_severity),
            "image_url": image_url
        }

        # DB Storage
        # NOTE(review): the row stores the client's original file name, while
        # the image is saved under the prefixed name above; get_results builds
        # its image_url from this column, so that URL will not match the saved
        # file - confirm which name the column is meant to hold.
        db = SessionLocal()
        try:
            db_detection = Detection(
                detection_id=detection_id,
                image_filename=file.filename,
                gps_latitude=base_lat,
                gps_longitude=base_lon,
                total_detections=len(detections_with_severity),
                created_at=datetime.now()
            )
            db.add(db_detection)

            for det in detections_with_severity:
                db_result = DetectionResult(
                    detection_id=detection_id,
                    class_name=det["class_name"],
                    confidence=det["confidence"],
                    severity=det["severity"],
                    gps_latitude=det["latitude"],
                    gps_longitude=det["longitude"],
                    bbox_x1=det["x1"], bbox_y1=det["y1"],
                    bbox_x2=det["x2"], bbox_y2=det["y2"]
                )
                db.add(db_result)
            db.commit()
        except Exception as db_error:
            # Best effort: the response is still served from the in-memory result.
            print(f"[WARN] Database error: {str(db_error)}")
            db.rollback()
        finally:
            db.close()

        print(f"[DETECT] SUCCESS: Returning {len(detections_with_severity)} detections")
        for i, det in enumerate(detections_with_severity):
            print(f"[DETECT] {i+1}. {det['class_name']} - conf: {det['confidence']:.3f}")
        print(f"{'='*60}")

        return JSONResponse({
            "success": True,
            "detection_id": detection_id,
            "detections": detections_with_severity,
            "gps_data": gps_data,
            "total_count": len(detections_with_severity),
            "image_url": image_url
        })

    except Exception as e:
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
async def get_results(detection_id: str):
    """Return the stored result for ``detection_id``.

    The in-memory cache is consulted first; on a miss the result is rebuilt
    from the database rows.

    Args:
        detection_id: Identifier returned by ``detect_damage``.

    Returns:
        The cached payload when present. Otherwise a dict with ``detections``
        (one entry per stored result row), ``total_count``, ``timestamp``
        (ISO text, or None when the row has no creation time) and
        ``image_filename``.

    Raises:
        HTTPException: 404 when the id is in neither the cache nor the database.
    """
    cached = DETECTION_CACHE.get(detection_id)
    if cached is not None:
        return cached

    # Try DB
    db = SessionLocal()
    try:
        d = db.query(Detection).filter(Detection.detection_id == detection_id).first()
        if not d:
            raise HTTPException(status_code=404, detail="Detection not found")

        results = db.query(DetectionResult).filter(DetectionResult.detection_id == detection_id).all()
        # Assumption carried over from the original: the stored file name is
        # usable as the /uploads/ path for this detection.
        image_url = f"/uploads/{d.image_filename}"
        detections = [
            {
                "class_name": r.class_name,
                "confidence": r.confidence,
                "severity": r.severity,
                "latitude": r.gps_latitude,
                "longitude": r.gps_longitude,
                "x1": r.bbox_x1, "y1": r.bbox_y1, "x2": r.bbox_x2, "y2": r.bbox_y2,
                "image_url": image_url
            }
            for r in results
        ]
        return {
            "detections": detections,
            # Present in the cached payload too, so both paths agree.
            "total_count": len(detections),
            "timestamp": d.created_at.isoformat() if d.created_at else None,
            "image_filename": d.image_filename
        }
    finally:
        db.close()
| # ============================================ | |
| # PDF REPORT GENERATION | |
| # ============================================ | |
| from reportlab.lib.pagesizes import letter | |
| from reportlab.lib.styles import getSampleStyleSheet | |
| from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer | |
| from reportlab.lib.units import inch | |
| from reportlab.lib import colors | |
async def download_pdf(detection_id: str):
    """Build a PDF report for one detection and return it as a download.

    Args:
        detection_id: Identifier of the detection to report on.

    Returns:
        A ``FileResponse`` for ``report_<detection_id>.pdf`` containing a
        title and one table row (class, severity, confidence, GPS) per
        detection.

    Raises:
        HTTPException: 404, propagated from ``get_results``, when the
            detection is unknown.
    """
    import tempfile

    # Fetch data (from cache or DB). This also guarantees detection_id is a
    # known id before it is used in a file name below.
    data = await get_results(detection_id)
    detections = data["detections"]

    pdf_filename = f"report_{detection_id}.pdf"
    # The platform temp directory replaces the hard-coded /tmp and the
    # Windows special case that wrote reports into the working directory.
    pdf_path = os.path.join(tempfile.gettempdir(), pdf_filename)

    doc = SimpleDocTemplate(pdf_path, pagesize=letter)
    story = []
    styles = getSampleStyleSheet()

    story.append(Paragraph("Road Damage Detection Report", styles['Title']))
    story.append(Spacer(1, 0.2*inch))

    # Header row followed by one row per detection.
    table_data = [["Class", "Severity", "Confidence", "GPS"]]
    table_data.extend(
        [
            det["class_name"],
            det["severity"],
            f"{det['confidence']:.2f}",
            f"{det['latitude']:.4f}, {det['longitude']:.4f}"
        ]
        for det in detections
    )

    t = Table(table_data)
    t.setStyle(TableStyle([
        ('BACKGROUND', (0, 0), (-1, 0), colors.grey),
        ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
        ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
        ('GRID', (0, 0), (-1, -1), 1, colors.black)
    ]))
    story.append(t)

    doc.build(story)
    return FileResponse(pdf_path, filename=pdf_filename)
async def health_check():
    """Report service health, including whether the database answers a query.

    Returns:
        A dict with ``status`` (always ``"healthy"``), ``models_loaded``
        (always True), ``database`` (``"connected"`` or ``"error: <text>"``)
        and ``timestamp`` (current time in ISO format). A database failure is
        reported in the ``database`` field only; it never raises.
    """
    db_status = "connected"
    try:
        # Check DB connection
        db = SessionLocal()
        try:
            db.execute(func.now())
        finally:
            # Close even when the query fails so the session is not leaked.
            db.close()
    except Exception as e:
        db_status = f"error: {str(e)}"

    return {
        "status": "healthy",
        "models_loaded": True,
        "database": db_status,
        "timestamp": datetime.now().isoformat()
    }
| # ============================================ | |
| # 3. BATCH DETECTION ENDPOINT | |
| # ============================================ | |
| # Request model for analysis | |
| from pydantic import BaseModel | |
class AnalysisRequest(BaseModel):
    """Request body for ``analyze_survey``: the files to run detection on."""

    # Names of files that analyze_survey looks up inside data/uploads.
    filenames: List[str]
async def analyze_survey(request: AnalysisRequest):
    """Run detection on images that already exist in ``data/uploads``.

    Each requested file is read, its GPS position is taken from EXIF, and the
    ensemble predictor is run on it. A file with no detections contributes a
    single "No Damage" entry. Files that are missing, have an unacceptable
    name, or fail during processing are logged and skipped.

    Args:
        request: Holds ``filenames``, the names of previously saved uploads.

    Returns:
        A dict with ``success`` (always True), ``total_analyzed`` (the number
        of names requested, including any that were skipped) and
        ``detections`` (all entries across all files).
    """
    total = len(request.filenames)
    print(f"\n{'='*60}")
    print(f"[ANALYZE] Starting analysis of {total} files")
    print(f"{'='*60}")

    results = []

    for idx, filename in enumerate(request.filenames, start=1):
        print(f"\n[ANALYZE] [{idx}/{total}] Processing: {filename}")

        # Names come from the request body: accept only bare file names so a
        # value like "../../etc/passwd" cannot reach outside data/uploads.
        if not filename or os.path.basename(filename) != filename:
            print(f"[ANALYZE] ERROR: Invalid filename: {filename!r}")
            continue

        # Check if file exists in uploads
        filepath = os.path.join("data/uploads", filename)
        if not os.path.exists(filepath):
            print(f"[ANALYZE] ERROR: File not found: {filepath}")
            continue

        try:
            # Read image
            with open(filepath, "rb") as f:
                contents = f.read()
            print(f"[ANALYZE] File size: {len(contents)} bytes")

            image = Image.open(BytesIO(contents))
            image_array = np.array(image)
            print(f"[ANALYZE] Image shape: {image_array.shape}")

            image_url = f"/uploads/{filename}"

            # Extract GPS (re-extract or we could cache it, but re-extract is safer/stateless)
            gps_data = extract_gps_from_image(contents)
            base_lat = gps_data["latitude"]
            base_lon = gps_data["longitude"]
            print(f"[ANALYZE] GPS: lat={base_lat}, lon={base_lon}")

            # Run inference
            print("[ANALYZE] Running YOLO inference...")
            ensemble_results = ensemble.predict(image_array)
            print(f"[ANALYZE] Detected {len(ensemble_results)} objects")

            # If no detections, return clean status
            if not ensemble_results:
                results.append({
                    "x1": 0, "y1": 0, "x2": 0, "y2": 0,
                    "class_name": "No Damage",
                    "class_id": -1,
                    "confidence": 1.0,
                    "severity": "none",
                    "latitude": float(base_lat),
                    "longitude": float(base_lon),
                    "image_url": image_url,
                    "filename": filename
                })

            # Built as a list first so a failure part-way through one file
            # adds none of that file's detections.
            results.extend([
                {
                    "x1": int(det["box"][0]), "y1": int(det["box"][1]),
                    "x2": int(det["box"][2]), "y2": int(det["box"][3]),
                    "class_name": det["class_name"],
                    "class_id": det.get("class_id", 0),
                    "confidence": float(det["confidence"]),
                    "severity": "medium",  # Simplified
                    "latitude": float(base_lat),
                    "longitude": float(base_lon),
                    "image_url": image_url,
                    "filename": filename
                }
                for det in ensemble_results
            ])

        except Exception as e:
            print(f"[ERROR] Error analyzing {filename}: {e}")
            continue

    return {
        "success": True,
        "total_analyzed": total,
        "detections": results
    }
| # ============================================ | |
| # 4. UPLOAD-ONLY ENDPOINT (Map first, analyze later) | |
| # ============================================ | |
async def upload_survey(files: List[UploadFile] = File(...)):
    """
    Store survey images and read their GPS position, without running detection.

    Every accepted image is written to ``data/uploads`` under a unique name
    and produces one 'Pending' point for the map. Files whose content type
    is missing or not ``image/*`` are skipped, as are files that fail while
    being processed.
    Use /analyze-survey to run AI detection later.

    Returns a dict with ``success``, ``total_files`` (files received),
    ``total_uploaded`` (points produced) and ``detections`` (the points).
    """
    import traceback

    pending_points = []
    print(f"[UPLOAD] Uploading {len(files)} survey files...")

    for upload in files:
        # Skip non-image files
        is_image = bool(upload.content_type) and upload.content_type.startswith('image/')
        if not is_image:
            print(f"[SKIP] Skipping non-image: {upload.filename}")
            continue

        try:
            payload = await upload.read()
            # Opening the bytes rejects anything that is not a readable image
            # before it is written to disk.
            Image.open(BytesIO(payload))

            # Keep only the last path component of the client's file name.
            if upload.filename:
                original_name = os.path.basename(upload.filename)
            else:
                original_name = f"upload_{uuid.uuid4().hex[:8]}.jpg"

            # Timestamp plus random suffix keeps stored names unique.
            stamp = datetime.now().strftime('%Y%m%d%H%M%S')
            stored_name = f"{stamp}_{uuid.uuid4().hex[:8]}_{original_name}"
            target_path = os.path.join("data/uploads", stored_name)

            # Ensure uploads directory exists
            os.makedirs("data/uploads", exist_ok=True)
            with open(target_path, "wb") as out:
                out.write(payload)

            # Extract GPS
            gps_data = extract_gps_from_image(payload)
            point_lat = gps_data["latitude"]
            point_lon = gps_data["longitude"]
            print(f"[GPS] GPS from {upload.filename}: lat={point_lat}, lon={point_lon}, success={gps_data.get('success')}")

            # "Pending" point for the map
            pending_point = {
                "x1": 0, "y1": 0, "x2": 0, "y2": 0,
                "class_name": "Pending",
                "class_id": -2,
                "confidence": 0.0,
                "severity": "none",
                "latitude": float(point_lat),
                "longitude": float(point_lon),
                "image_url": f"/uploads/{stored_name}",
                "filename": stored_name
            }
            pending_points.append(pending_point)
            print(f"[OK] Uploaded: {upload.filename} -> {stored_name}")

        except Exception as e:
            print(f"[ERROR] Error uploading {upload.filename}: {e}")
            traceback.print_exc()

    response = {
        "success": True,
        "total_files": len(files),
        "total_uploaded": len(pending_points),
        "detections": pending_points
    }
    return response
async def detect_batch(files: List[UploadFile] = File(...), analyze: Optional[str] = Form("true")):
    """Save a batch of images and optionally run detection on each.

    Args:
        files: The uploaded files. Entries whose content type is missing or
            not ``image/*`` are skipped silently.
        analyze: Form flag; ``"true"``, ``"1"`` or ``"yes"`` (any case) runs
            detection. A missing value is treated as ``"true"``. Any other
            value only saves the images and returns "Pending" points.

    Returns:
        A dict with ``success`` (always True), ``total_files`` (files
        received), ``total_detections`` (entries returned) and
        ``detections``. A file that fails during processing is logged and
        contributes no entries.
    """
    # Convert string to bool (FormData sends strings). The parameter is
    # Optional, so guard against None before calling string methods on it.
    should_analyze = (analyze or "true").strip().lower() in ("true", "1", "yes")

    results = []
    print(f"[BATCH] Processing batch of {len(files)} files (Analyze={should_analyze})...")

    for file in files:
        # Skip non-image files; content_type can be absent, as upload_survey
        # already allows for.
        if not file.content_type or not file.content_type.startswith('image/'):
            continue

        try:
            # Read content
            contents = await file.read()
            image = Image.open(BytesIO(contents))

            # Save for static serving. Only the base name of the client's
            # file name is used so it cannot point outside data/uploads.
            base_filename = os.path.basename(file.filename) if file.filename else f"upload_{uuid.uuid4().hex[:8]}.jpg"
            unique_name = f"{datetime.now().strftime('%Y%m%d%H%M%S')}_{uuid.uuid4().hex[:8]}_{base_filename}"
            filepath = os.path.join("data/uploads", unique_name)
            with open(filepath, "wb") as f:
                f.write(contents)

            image_url = f"/uploads/{unique_name}"

            # Extract GPS
            gps_data = extract_gps_from_image(contents)
            base_lat = gps_data["latitude"]
            base_lon = gps_data["longitude"]

            # IF NOT ANALYZING: Return "Pending" point
            if not should_analyze:
                results.append({
                    "x1": 0, "y1": 0, "x2": 0, "y2": 0,
                    "class_name": "Pending",  # Special class for UI
                    "class_id": -2,
                    "confidence": 0.0,
                    "severity": "none",
                    "latitude": float(base_lat),
                    "longitude": float(base_lon),
                    "image_url": image_url,
                    "filename": unique_name  # Return the saved filename for later analysis
                })
                continue

            # IF ANALYZING (Legacy flow or manual trigger if we used this endpoint?)
            image_array = np.array(image)
            ensemble_results = ensemble.predict(image_array)

            if not ensemble_results:
                results.append({
                    "x1": 0, "y1": 0, "x2": 0, "y2": 0,
                    "class_name": "No Damage",
                    "class_id": -1,
                    "confidence": 1.0,
                    "severity": "none",
                    "latitude": float(base_lat),
                    "longitude": float(base_lon),
                    "image_url": image_url,
                    "filename": unique_name
                })

            # Built as a list first so a failure part-way through one file
            # adds none of that file's detections.
            results.extend([
                {
                    "x1": int(det["box"][0]), "y1": int(det["box"][1]),
                    "x2": int(det["box"][2]), "y2": int(det["box"][3]),
                    "class_name": det["class_name"],
                    "class_id": det.get("class_id", 0),
                    "confidence": float(det["confidence"]),
                    "severity": "medium",
                    "latitude": float(base_lat),
                    "longitude": float(base_lon),
                    "image_url": image_url,
                    "filename": unique_name
                }
                for det in ensemble_results
            ])

        except Exception as e:
            print(f"[ERROR] Error processing {file.filename}: {e}")
            continue

    return {
        "success": True,
        "total_files": len(files),
        "total_detections": len(results),
        "detections": results
    }