Spaces:
Sleeping
Sleeping
barathvasan-dev
Fix: Parse HF Space detection response format correctly - handle single detection object not array
f9c9a7c | """ | |
| FastAPI Backend for ANPR System | |
| Exposes database functions and detection as REST API endpoints | |
| Compatible with React frontend and Hugging Face Spaces deployment | |
| """ | |
| import os | |
| import asyncio | |
| import json | |
| import traceback | |
| from datetime import datetime | |
| from io import BytesIO | |
| os.environ["OMP_NUM_THREADS"] = "1" | |
| from fastapi import FastAPI, UploadFile, File, Form, HTTPException, WebSocket, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse | |
| import uvicorn | |
| import tempfile | |
| try: | |
| from gradio_client import Client, handle_file | |
| except ImportError: | |
| Client = None | |
| handle_file = None | |
| from detector import detect_plate | |
| from database import ( | |
| save_detection, | |
| run_query, | |
| health_check, | |
| get_vehicles_by_state, | |
| get_hourly_traffic, | |
| get_top_plates, | |
| get_suspicious_vehicles, | |
| get_route_history, | |
| get_vehicles_by_location, | |
| get_multi_location_detections, | |
| get_peak_traffic_hours, | |
| get_vehicle_density_by_location, | |
| HF_TOKEN, | |
| DATABASE_URL, | |
| client, | |
| engine | |
| ) | |
| # ========================================================= | |
| # FASTAPI APP SETUP | |
| # ========================================================= | |
# Single application instance; every handler and middleware in this module attaches to it.
app = FastAPI(title="ANPR Command Hub API", description="Automatic Number Plate Recognition System API", version="1.0.0")
| # ========================================================= | |
| # HUGGINGFACE SPACES DETECTION | |
| # ========================================================= | |
# Identifier of the HuggingFace Space that performs remote plate detection.
HF_SPACE_ID = "BARATH0070/plate-detector"

# Shared gradio client for that Space; stays None until a connection succeeds.
hf_detection_client = None
def init_hf_client():
    """Try to create the shared client for the remote detection Space.

    On success the client is stored in the module-level
    ``hf_detection_client``.

    Returns:
        bool: True when a client for ``HF_SPACE_ID`` was created, False when
        ``gradio_client`` could not be imported (``Client`` is None) or the
        connection attempt raised.
    """
    global hf_detection_client

    # gradio_client is optional: the import guard leaves Client as None.
    if Client is None:
        return False

    try:
        hf_detection_client = Client(HF_SPACE_ID)
        print("✅ Connected to HuggingFace Space for detection")
    except Exception as err:
        print(f"⚠️ HuggingFace Space connection failed: {err}")
        return False
    return True
def _parse_hf_detection(result):
    """Convert a raw HF Space response into the detection tuple.

    Accepts either ``[status_message, detection_json]`` or a bare dict; the
    detection payload may be a dict or a JSON string. Returns None when the
    outer response shape is not recognised so the caller can fall back.
    """
    if isinstance(result, (list, tuple)) and len(result) >= 2:
        message, payload = result[0], result[1]
        print(f"HF Space message: {message}")
    elif isinstance(result, dict):
        payload = result
    else:
        print(f"Unexpected result format: {type(result)}, value: {result}")
        return None

    if isinstance(payload, str):
        try:
            payload = json.loads(payload)
        except Exception as e:
            print(f"Failed to parse JSON: {e}, {payload}")
            payload = {}

    # The Space returns a single detection object, not an array.
    if not isinstance(payload, dict):
        return "", "UNKNOWN", "UNKNOWN", 0.0, False

    # `or` guards against explicit nulls in the JSON, which .get() defaults
    # do not cover.
    plate = payload.get("plate") or ""
    state = payload.get("state", "UNKNOWN")
    vehicle_type = payload.get("vehicle_type", "UNKNOWN")
    vehicle_conf = float(payload.get("confidence") or 0.0)
    return plate, state, vehicle_type, vehicle_conf, bool(plate)


def detect_via_hf_space(image_pil):
    """Detect a plate through the HuggingFace Space, falling back to local detection.

    The image is written to a temporary JPEG, sent to the Space (trying
    several endpoint names in turn) and the response is parsed. Whenever the
    client is unavailable, every endpoint fails, the response shape is
    unrecognised or any other exception occurs, ``detect_plate(image_pil)``
    is used instead.

    Args:
        image_pil: Image object exposing ``mode``, ``convert`` and ``save``
            (a PIL image in the visible caller).

    Returns:
        tuple: ``(plate, state, vehicle_type, vehicle_conf, success)``. When
        both remote and local detection fail the result is
        ``("", "UNKNOWN", "UNKNOWN", 0.0, False)``. The local fallback's
        return value is passed through unchanged — presumably the same
        5-tuple; verify against ``detector.detect_plate``.
    """
    global hf_detection_client
    try:
        # Try to connect if not already connected
        if hf_detection_client is None:
            if Client is None:
                print("⚠️ gradio_client not available, using local detection")
                return detect_plate(image_pil)
            try:
                hf_detection_client = Client(HF_SPACE_ID)
                print("✅ Connected to HuggingFace Space for detection")
            except Exception as e:
                print(f"⚠️ HuggingFace Space connection failed: {e}")
                print("Falling back to local detection...")
                return detect_plate(image_pil)

        # Only reserve a path here and close the handle immediately: the
        # image is written afterwards, inside the try/finally, so a failed
        # save can no longer leak the temp file.
        with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
            tmp_path = tmp.name
        try:
            # JPEG cannot store alpha/palette modes (e.g. RGBA or P from PNG
            # uploads); convert so those images reach the Space instead of
            # raising on save.
            if image_pil.mode in ("RGB", "L"):
                upload_image = image_pil
            else:
                upload_image = image_pil.convert("RGB")
            upload_image.save(tmp_path)
            image_input = handle_file(tmp_path)

            # Try different endpoint names (HF Space uses /detect_and_save)
            endpoints_to_try = (
                "/detect_and_save",  # Primary endpoint
                "/predict_0",        # Backup
                "/predict",          # Fallback
                None,                # Try default
            )
            result = None
            last_error = None
            for endpoint in endpoints_to_try:
                try:
                    print(f"Trying HF Space endpoint: {endpoint}")
                    if endpoint is None:
                        result = hf_detection_client.predict(image_input)
                    else:
                        result = hf_detection_client.predict(image_input, api_name=endpoint)
                    print(f"✅ HF Space detection successful with endpoint {endpoint}")
                    break
                except Exception as e:
                    last_error = e
                    print(f"⚠️ Endpoint {endpoint} failed: {e}")

            if result is None:
                print(f"❌ All HF Space endpoints failed. Last error: {last_error}")
                print("Falling back to local detection...")
                return detect_plate(image_pil)

            parsed = _parse_hf_detection(result)
            if parsed is None:
                return detect_plate(image_pil)

            plate, state, vehicle_type, vehicle_conf, success = parsed
            print(f"✅ Detection result: plate={plate}, state={state}, type={vehicle_type}, conf={vehicle_conf}, success={success}")
            return plate, state, vehicle_type, vehicle_conf, success
        finally:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
    except Exception as e:
        print(f"❌ HF Space detection exception: {e}")
        traceback.print_exc()
        print("Falling back to local detection...")
        try:
            return detect_plate(image_pil)
        except Exception as local_e:
            print(f"❌ Local detection also failed: {local_e}")
            # Return empty result
            return "", "UNKNOWN", "UNKNOWN", 0.0, False
# Attempt the remote connection once at import time; the boolean result is
# ignored because detect_via_hf_space() retries lazily on first use.
init_hf_client()
| # ========================================================= | |
| # REQUEST HELPERS | |
| # ========================================================= | |
async def _read_request_payload(request: Request) -> dict:
    """Best-effort read of a request body as a plain dict.

    A request whose content-type contains ``application/json`` is parsed as
    JSON (non-dict JSON yields ``{}``); anything else is read as form data.
    Any parsing failure yields ``{}`` rather than an exception.
    """
    is_json = "application/json" in request.headers.get("content-type", "")
    try:
        if is_json:
            body = await request.json()
            return body if isinstance(body, dict) else {}
        return dict(await request.form())
    except Exception:
        return {}
| def _build_rag_answer(response: dict) -> str: | |
| error = response.get("error") if isinstance(response, dict) else None | |
| if error: | |
| return str(error) | |
| result = response.get("result") if isinstance(response, dict) else None | |
| if not result: | |
| return "No results found." | |
| preview = result[:3] if isinstance(result, list) else result | |
| pretty = json.dumps(preview, indent=2, ensure_ascii=True) | |
| count = response.get("count", len(result)) if isinstance(result, list) else 1 | |
| return f"Found {count} record(s). Preview:\n{pretty}" | |
| # ========================================================= | |
| # CORS MIDDLEWARE | |
| # ========================================================= | |
# Allow all origins for development.
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive — confirm the intended frontend origins before production use.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
| # ========================================================= | |
| # STARTUP/SHUTDOWN | |
| # ========================================================= | |
async def startup_event():
    """Print a startup banner showing whether the HF token and database URL are set."""
    token_mark = '✓' if HF_TOKEN else '✗'
    database_mark = '✓' if DATABASE_URL else '✗'
    print("✅ ANPR Backend started")
    print(f"🔌 HF Token: {token_mark}")
    print(f"🗄️ Database: {database_mark}")
async def shutdown_event():
    """Print a shutdown notice; no resources are released here."""
    notice = "🛑 ANPR Backend shutdown"
    print(notice)
| # ========================================================= | |
| # HEALTH CHECK | |
| # ========================================================= | |
async def health():
    """Report service health.

    Runs the database ``health_check()`` and returns a status of
    ``"healthy"`` or ``"degraded"`` with its message, the API version, the
    current local timestamp and feature flags derived from configuration.
    """
    db_ok, db_msg = health_check()
    status = "healthy" if db_ok else "degraded"
    features = {
        "detection": True,
        "nlp": bool(HF_TOKEN),
        "database": bool(DATABASE_URL),
    }
    return {
        "status": status,
        "message": db_msg,
        "version": "1.0.0",
        "timestamp": datetime.now().isoformat(),
        "features": features,
    }
| # ========================================================= | |
| # DETECTION ENDPOINTS | |
| # ========================================================= | |
async def upload_detect(
    file: UploadFile = File(...),
    area: str = Form("area_001")
):
    """Detect a number plate in an uploaded image and persist the detection.

    Only still images are handled: the upload is decoded with PIL and passed
    to ``detect_via_hf_space``. A successful detection is stored through
    ``save_detection``.

    Args:
        file: Uploaded image file.
        area: Free-form area identifier echoed back in the response.

    Returns:
        dict with ``success``, ``detections`` (zero or one entry), ``count``,
        ``message``, ``date``, ``time`` and ``area``.

    Raises:
        HTTPException: 400 for an empty or undecodable upload, 500 for any
            other failure.
    """
    try:
        contents = await file.read()
        if not contents:
            raise HTTPException(status_code=400, detail="Empty file")

        # Imported lazily, as in the rest of this module's handlers.
        from PIL import Image

        try:
            image = Image.open(BytesIO(contents))
            # Image.open is lazy; force decoding now so corrupt or truncated
            # data is reported as a 400 instead of failing later as a 500.
            image.load()
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"Invalid image format: {str(e)}") from e

        # Detection (network call or local model) and the DB write are
        # blocking; run them in the default executor so this coroutine does
        # not stall the event loop for other requests.
        loop = asyncio.get_running_loop()
        plate, state, vehicle_type, vehicle_conf, success = await loop.run_in_executor(
            None, detect_via_hf_space, image
        )

        now = datetime.now()
        date = now.strftime("%Y-%m-%d")
        time = now.strftime("%H:%M:%S")

        if not (success and plate):
            return {
                "success": True,
                "detections": [],
                "count": 0,
                "message": "No plates detected",
                "date": date,
                "time": time,
                "area": area
            }

        await loop.run_in_executor(
            None, save_detection, plate, state, vehicle_type, vehicle_conf, date, time
        )
        detection = {
            "id": 1,
            "date": date,
            "time": time,
            "plate": plate,
            "plate_text": plate,
            "state": state,
            "vehicle_type": vehicle_type,
            "confidence": round(vehicle_conf, 3),
            "area": area,
            "timestamp": now.isoformat(),
            "saved": True,
            "alert_fired": False,  # Would check against blocklist
            "thumbnail": None
        }
        return {
            "success": True,
            "detections": [detection],
            "count": 1,
            "message": f"Detected {plate} from {state}",
            "date": date,
            "time": time,
            "area": area
        }
    except HTTPException:
        # Preserve deliberate 4xx responses raised above.
        raise
    except Exception as e:
        print(f"❌ Upload Error: {e}")
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
| # ========================================================= | |
| # CHECKPOINT ENDPOINTS | |
| # ========================================================= | |
async def get_checkpoints():
    """Return the hard-coded demo checkpoints as a list of dicts.

    Each entry has ``checkpoint_id``, ``name``, ``lat``, ``lng`` and ``city``.
    The data is static demo content and can be replaced with a database query.
    """
    fields = ("checkpoint_id", "name", "lat", "lng", "city")
    rows = (
        ("cp_001", "Silk Board", 12.9698, 77.6356, "Bengaluru"),
        ("cp_002", "Whitefield", 12.9698, 77.7499, "Bengaluru"),
        ("cp_003", "Koramangala", 12.9352, 77.6245, "Bengaluru"),
        ("cp_004", "Indiranagar", 13.0012, 77.6410, "Bengaluru"),
    )
    return [dict(zip(fields, row)) for row in rows]
async def get_checkpoint_log(checkpoint_id: str, limit: int = 50, offset: int = 0):
    """Return a page of mock detection log entries for a checkpoint.

    The data is generated, not read from storage. ``offset`` is honoured so
    successive pages yield distinct ids and plates instead of repeating the
    first page.

    Args:
        checkpoint_id: Identifier echoed into every entry.
        limit: Maximum number of entries; negative values are treated as 0.
        offset: Index of the first entry; negative values are treated as 0.

    Returns:
        list of dicts with ``id``, ``plate_text``, ``timestamp``,
        ``vehicle_type`` and ``checkpoint_id``.

    Raises:
        HTTPException: 500 if building the page fails.
    """
    try:
        first = max(offset, 0)
        page_size = max(limit, 0)
        # Demo: return mock data
        return [
            {
                "id": i,
                "plate_text": f"KA01AB{1234+i}",
                "timestamp": datetime.now().isoformat(),
                "vehicle_type": "car",
                "checkpoint_id": checkpoint_id
            }
            for i in range(first, first + page_size)
        ]
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
| # ========================================================= | |
| # VEHICLE SEARCH ENDPOINTS | |
| # ========================================================= | |
async def search_vehicles(plate: str = None, state: str = None):
    """Search vehicles by plate, by state, or list the top plates.

    ``plate`` takes precedence: its route history is summarised into a
    single-item list (empty when there is no history). Otherwise ``state``
    filters the per-state rows by exact match. With neither argument the
    top plates are returned.

    Raises:
        HTTPException: 500 if any lookup fails.
    """
    try:
        if plate:
            history = get_route_history(plate)
            if not history:
                return []
            # first_detected comes from the last row and last_detected from
            # the first — presumably history is newest-first; verify against
            # get_route_history.
            head, tail = history[0], history[-1]
            summary = {
                "id": 1,
                "plate_text": plate,
                "state": head.get("state", "TN"),
                "vehicle_type": "car",
                "first_detected": tail.get("timestamp"),
                "last_detected": head.get("timestamp"),
                "count": len(history)
            }
            return [summary]

        if state:
            return [row for row in get_vehicles_by_state() if row.get("state") == state]

        return get_top_plates()
    except Exception as e:
        print(f"❌ Search Error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
async def get_vehicle_history(plate: str):
    """Return up to 100 route-history rows for a plate.

    Returns:
        dict with ``plate``, ``detections`` (empty list when nothing is
        found) and ``count``.

    Raises:
        HTTPException: 500 if the lookup fails.
    """
    try:
        history = get_route_history(plate, limit=100)
        detections = history if history else []
        return {
            "plate": plate,
            "detections": detections,
            "count": len(detections)
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
async def get_vehicle_journey(plate: str):
    """Return a plate's journey as a list of checkpoint stops.

    Each route-history row (up to 100) becomes one stop. Coordinates and
    ``road_split`` are fixed demo values, not taken from the row.

    Raises:
        HTTPException: 500 if the lookup fails.
    """
    try:
        history = get_route_history(plate, limit=100)
        if not history:
            return []

        journey = []
        for row in history:
            journey.append({
                "checkpoint_id": row.get("camera_id", "unknown"),
                "checkpoint_name": row.get("location", "Unknown"),
                "timestamp": row.get("timestamp"),
                "lat": 12.97,  # Demo coordinates
                "lng": 77.63,
                "road_split": "A"
            })
        return journey
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
| # ========================================================= | |
| # BLOCKLIST ENDPOINTS | |
| # ========================================================= | |
# Demo-only store kept in process memory (use a database in production).
blocklist = []


async def get_blocklist():
    """Return the current in-memory blocklist entries."""
    return blocklist
async def add_to_blocklist(
    request: Request,
    plate_text: str = Form(None),
    reason: str = Form(None),
    added_by: str = Form("admin")
):
    """Add a vehicle to the in-memory blocklist.

    Values are taken from form fields first; when ``plate_text`` or
    ``reason`` is missing, the request body is read through
    ``_read_request_payload`` to fill the gaps.

    Returns:
        The created entry (``id``, ``plate_text``, ``reason``, ``added_by``,
        ``added_at``).

    Raises:
        HTTPException: 400 when ``plate_text`` or ``reason`` is still
            missing, 500 for any other failure.
    """
    try:
        if not plate_text or not reason:
            payload = await _read_request_payload(request)
            plate_text = plate_text or payload.get("plate_text")
            reason = reason or payload.get("reason")
            added_by = payload.get("added_by", added_by)
        if not plate_text or not reason:
            raise HTTPException(status_code=400, detail="Missing plate_text or reason")

        # len(blocklist) + 1 would reuse an id after a removal; derive the
        # next id from the highest one still present instead.
        next_id = max((b.get("id", 0) for b in blocklist), default=0) + 1
        entry = {
            "id": next_id,
            "plate_text": plate_text,
            "reason": reason,
            "added_by": added_by,
            "added_at": datetime.now().isoformat()
        }
        blocklist.append(entry)
        return entry
    except HTTPException:
        # Without this the 400 above is caught below and turned into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
async def remove_from_blocklist(plate_text: str):
    """Drop every blocklist entry whose ``plate_text`` matches.

    The module-level ``blocklist`` is rebound to a new list. The response
    reports success whether or not anything was removed.

    Raises:
        HTTPException: 500 if filtering fails.
    """
    global blocklist
    try:
        kept = []
        for entry in blocklist:
            if entry["plate_text"] != plate_text:
                kept.append(entry)
        blocklist = kept
        return {"success": True, "message": f"Removed {plate_text} from blocklist"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
| # ========================================================= | |
| # ALERTS ENDPOINTS | |
| # ========================================================= | |
# In-memory store of alert dicts read by the alert handlers in this module.
alerts_list = []


async def get_alerts(checkpoint_id: str = None, limit: int = 20):
    """Return up to ``limit`` alerts, optionally restricted to one checkpoint.

    Raises:
        HTTPException: 500 if filtering fails.
    """
    try:
        selected = alerts_list
        if checkpoint_id:
            selected = [
                alert for alert in alerts_list
                if alert.get("checkpoint_id") == checkpoint_id
            ]
        return selected[:limit]
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
async def acknowledge_alert(alert_id: int):
    """Mark the alert with the given ``alert_id`` as acknowledged.

    Returns:
        ``{"success": True, "alert_id": alert_id}`` when the alert exists.

    Raises:
        HTTPException: 404 when no alert has that id, 500 for any other
            failure.
    """
    try:
        for alert in alerts_list:
            if alert.get("alert_id") == alert_id:
                alert["acknowledged"] = True
                return {"success": True, "alert_id": alert_id}
        raise HTTPException(status_code=404, detail="Alert not found")
    except HTTPException:
        # Without this the 404 above is caught below and reported as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
| # ========================================================= | |
| # ANALYTICS ENDPOINTS | |
| # ========================================================= | |
async def get_analytics_summary(start_date: str = None, end_date: str = None):
    """Assemble the dashboard analytics summary.

    Combines top plates, per-state counts and hourly traffic with the number
    of in-memory alerts. ``start_date`` and ``end_date`` are accepted but not
    used by this implementation.

    Raises:
        HTTPException: 500 if any underlying query fails.
    """
    try:
        plates = get_top_plates()
        states = get_vehicles_by_state()
        hourly = get_hourly_traffic()

        detection_total = 0
        for plate_row in plates:
            detection_total += plate_row.get("detections", 0)

        state_counts = {}
        for state_row in states:
            state_counts[state_row.get("state")] = state_row.get("count", 0)

        return {
            "total_detections": detection_total,
            "total_alerts": len(alerts_list),
            "top_plates": plates[:10],
            "by_state": state_counts,
            "hourly_traffic": hourly or []
        }
    except Exception as e:
        print(f"❌ Analytics Error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
async def get_heatmap():
    """Return heatmap cells for the first 20 location-density rows.

    Coordinates are synthetic: cells are laid out on a grid five wide,
    stepping 0.01 degrees from a fixed origin. ``intensity`` is the vehicle
    count divided by 100, capped at 1.0.

    Raises:
        HTTPException: 500 if the density query or mapping fails.
    """
    try:
        cells = []
        for index, row in enumerate(get_vehicle_density_by_location()[:20]):
            grid_row, grid_col = divmod(index, 5)
            cells.append({
                "lat": 12.9698 + grid_col * 0.01,
                "lng": 77.6356 + grid_row * 0.01,
                "checkpoint_id": f"cp_{index:03d}",
                "count": row.get("vehicle_count", 0),
                "intensity": min(1.0, row.get("vehicle_count", 0) / 100)
            })
        return cells
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
| # ========================================================= | |
| # CSV EXPORT | |
| # ========================================================= | |
async def export_csv(start_date: str = None, end_date: str = None):
    """Export the top plates as a downloadable CSV file.

    The CSV has the columns Plate, State and Detections. ``start_date`` and
    ``end_date`` are accepted but not used by this implementation.

    Raises:
        HTTPException: 500 if the query or CSV generation fails.
    """
    try:
        import csv
        import io
        from fastapi.responses import StreamingResponse

        rows = get_top_plates()

        # Build the whole CSV in memory, then hand it over as one chunk.
        buffer = io.StringIO()
        writer = csv.writer(buffer)
        writer.writerow(["Plate", "State", "Detections"])
        for row in rows:
            record = [row.get("plate"), row.get("state"), row.get("detections")]
            writer.writerow(record)

        download_headers = {"Content-Disposition": "attachment; filename=detections.csv"}
        return StreamingResponse(
            iter([buffer.getvalue()]),
            media_type="text/csv",
            headers=download_headers
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
| # ========================================================= | |
| # RAG QUERY ENDPOINT | |
| # ========================================================= | |
async def rag_query(request: Request, query: str = Form(None)):
    """Answer a natural-language question against the database.

    The query is taken from the form field, or from the request body (via
    ``_read_request_payload``) when the form field is absent. The response
    from ``run_query`` is returned with an added ``answer`` summary.

    Returns:
        The ``run_query`` response plus ``answer``; or a dict with empty
        ``sql``/``result`` and an ``error`` message when the database is not
        configured or the query fails.

    Raises:
        HTTPException: 400 when no query was supplied.
    """
    try:
        if not query:
            payload = await _read_request_payload(request)
            query = payload.get("query") if isinstance(payload, dict) else None
        if not query:
            raise HTTPException(status_code=400, detail="Missing query")

        if not DATABASE_URL or not engine:
            return {
                "sql": "",
                "result": [],
                "error": "Database not configured"
            }

        response = run_query(query)
        response["answer"] = _build_rag_answer(response)
        return response
    except HTTPException:
        # Let the 400 reach the client; the generic handler below would
        # otherwise turn it into a 200 response carrying an error string.
        raise
    except Exception as e:
        print(f"❌ RAG Query Error: {e}")
        traceback.print_exc()
        return {
            "sql": "",
            "result": [],
            "error": str(e)
        }
| # ========================================================= | |
| # WEBSOCKET - LIVE FEED | |
| # ========================================================= | |
async def websocket_endpoint(websocket: WebSocket):
    """Serve the demo live feed over a WebSocket.

    After accepting the connection the handler loops forever: it sends a
    ``ping`` message, waits five seconds and, about 30% of the time, sends a
    randomly generated ``DETECTION`` event. The loop ends when sending
    raises (for example after the client disconnects); the socket is then
    closed on a best-effort basis.
    """
    await websocket.accept()
    try:
        import random

        while True:
            # Keepalive ping
            await websocket.send_json({"type": "ping"})
            # Pace the simulated feed: one cycle every 5 seconds
            await asyncio.sleep(5)
            # Random detection (demo)
            if random.random() > 0.7:
                detection = {
                    "type": "DETECTION",
                    "id": random.randint(1, 10000),
                    "plate_text": f"KA01AB{random.randint(1000, 9999)}",
                    "vehicle_type": random.choice(["car", "truck", "bus", "bike"]),
                    "checkpoint_id": "cp_001",
                    "road_split": random.choice(["A", "B", "C", "D"]),
                    "timestamp": datetime.now().isoformat(),
                    "confidence": round(random.uniform(0.85, 0.99), 3),
                    "alert_fired": False
                }
                await websocket.send_json(detection)
    except Exception as e:
        print(f"❌ WebSocket Error: {e}")
    finally:
        try:
            await websocket.close()
        except Exception:
            # Closing an already-closed socket is expected here. Catching
            # Exception rather than using a bare except lets task
            # cancellation and interpreter shutdown propagate.
            pass
| # ========================================================= | |
| # ROOT ENDPOINT | |
| # ========================================================= | |
async def root():
    """Return basic API metadata with pointers to the docs and health routes."""
    info = {"message": "ANPR Command Hub API", "version": "1.0.0"}
    info["docs"] = "/docs"
    info["health"] = "/api/health"
    return info
| # ========================================================= | |
| # RUN SERVER | |
| # ========================================================= | |
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 8000, with
    # auto-reload enabled.
    server_options = {
        "host": "0.0.0.0",
        "port": 8000,
        "reload": True,
        "log_level": "info",
    }
    uvicorn.run("app_fastapi:app", **server_options)