"""
FastAPI Backend for ANPR System

Exposes database functions and detection as REST API endpoints.
Compatible with React frontend and Hugging Face Spaces deployment.
"""

import os
import asyncio
import json
import traceback
from datetime import datetime
from io import BytesIO

# NOTE(review): presumably set before the detector import below so that any
# OpenMP-backed library it loads starts single-threaded -- keep this ordering.
os.environ["OMP_NUM_THREADS"] = "1"

from fastapi import FastAPI, UploadFile, File, Form, HTTPException, WebSocket, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import uvicorn
import tempfile

try:
    from gradio_client import Client, handle_file
except ImportError:
    # gradio_client is optional: without it only local detection is used.
    Client = None
    handle_file = None

from detector import detect_plate
from database import (
    save_detection,
    run_query,
    health_check,
    get_vehicles_by_state,
    get_hourly_traffic,
    get_top_plates,
    get_suspicious_vehicles,
    get_route_history,
    get_vehicles_by_location,
    get_multi_location_detections,
    get_peak_traffic_hours,
    get_vehicle_density_by_location,
    HF_TOKEN,
    DATABASE_URL,
    client,
    engine
)

# =========================================================
# FASTAPI APP SETUP
# =========================================================

app = FastAPI(
    title="ANPR Command Hub API",
    description="Automatic Number Plate Recognition System API",
    version="1.0.0"
)

# =========================================================
# HUGGINGFACE SPACES DETECTION
# =========================================================

HF_SPACE_ID = "BARATH0070/plate-detector"
hf_detection_client = None

# Endpoint names tried in order against the Space; None means "let the
# client pick its default endpoint".
_HF_ENDPOINTS = (
    "/detect_and_save",  # Primary endpoint
    "/predict_0",        # Backup
    "/predict",          # Fallback
    None,                # Try default
)

# Image modes passed to the JPEG encoder unchanged; every other mode
# (RGBA, P, LA, ...) is converted to RGB before saving.
_JPEG_SAFE_MODES = ("RGB", "L")

# Returned when neither the remote Space nor the local detector produced a
# usable answer: (plate, state, vehicle_type, confidence, success).
_EMPTY_DETECTION = ("", "UNKNOWN", "UNKNOWN", 0.0, False)


def init_hf_client():
    """Initialize the HuggingFace Space client used for remote detection.

    Stores the client in the module-level ``hf_detection_client``.

    Returns:
        True when a client was created, False when ``gradio_client`` is not
        installed or the connection attempt raised.
    """
    global hf_detection_client
    if Client is None:
        return False
    try:
        hf_detection_client = Client(HF_SPACE_ID)
        print("✅ Connected to HuggingFace Space for detection")
        return True
    except Exception as e:
        print(f"⚠️ HuggingFace Space connection failed: {e}")
        return False


def _save_as_jpeg(image_pil, path):
    """Write ``image_pil`` to ``path`` as JPEG, converting the mode if needed."""
    # Images with alpha or a palette (typical PNG uploads) cannot be written
    # as JPEG directly; without the conversion save() raises and the remote
    # path would never be used for them.
    if image_pil.mode not in _JPEG_SAFE_MODES:
        image_pil = image_pil.convert("RGB")
    image_pil.save(path, format="JPEG")


def _call_hf_space(image_input):
    """Try each known Space endpoint in turn.

    Returns:
        ``(result, last_error)``; ``result`` is None when every endpoint
        failed, in which case ``last_error`` holds the final exception.
    """
    last_error = None
    for endpoint in _HF_ENDPOINTS:
        try:
            print(f"Trying HF Space endpoint: {endpoint}")
            if endpoint is None:
                result = hf_detection_client.predict(image_input)
            else:
                result = hf_detection_client.predict(image_input, api_name=endpoint)
        except Exception as e:
            last_error = e
            print(f"⚠️ Endpoint {endpoint} failed: {e}")
            continue
        if result is not None:
            print(f"✅ HF Space detection successful with endpoint {endpoint}")
            return result, last_error
    return None, last_error


def _parse_hf_result(result):
    """Convert a raw Space response into the detection 5-tuple.

    Accepts either ``[status_message, detection_json]`` or a single dict;
    ``detection_json`` may be a dict or a JSON string.

    Returns:
        ``(plate, state, vehicle_type, confidence, success)``, or None when
        the response has an unexpected outer shape.
    """
    if isinstance(result, (list, tuple)) and len(result) >= 2:
        message, results_json = result[0], result[1]
        print(f"HF Space message: {message}")
    elif isinstance(result, dict):
        results_json = result
    else:
        print(f"Unexpected result format: {type(result)}, value: {result}")
        return None

    # Parse JSON if string
    if isinstance(results_json, str):
        try:
            results_json = json.loads(results_json)
        except Exception as e:
            print(f"Failed to parse JSON: {e}, {results_json}")
            results_json = {}

    # The Space returns a single detection object, not an array.
    if not isinstance(results_json, dict):
        return _EMPTY_DETECTION

    plate = results_json.get("plate") or ""
    state = results_json.get("state", "UNKNOWN")
    vehicle_type = results_json.get("vehicle_type", "UNKNOWN")
    try:
        # A null or non-numeric confidence must not discard the detection.
        vehicle_conf = float(results_json.get("confidence") or 0.0)
    except (TypeError, ValueError):
        vehicle_conf = 0.0
    return plate, state, vehicle_type, vehicle_conf, bool(plate)


def detect_via_hf_space(image_pil):
    """Detect a plate via the HuggingFace Space, falling back to local detection.

    Args:
        image_pil: image object exposing the PIL ``mode``/``convert``/``save``
            API (the upload endpoint passes a ``PIL.Image``).

    Returns:
        ``(plate, state, vehicle_type, confidence, success)``. When the remote
        call cannot be made or fails, the return value of
        ``detect_plate(image_pil)`` is returned instead (assumed to have the
        same 5-tuple shape -- callers unpack five values). If local detection
        also raises, an empty, unsuccessful detection is returned.
    """
    try:
        # Try to connect if not already connected
        if hf_detection_client is None:
            if Client is None:
                print("⚠️ gradio_client not available, using local detection")
                return detect_plate(image_pil)
            if not init_hf_client():
                print("Falling back to local detection...")
                return detect_plate(image_pil)

        # Reserve a temp path, then close the handle before writing so the
        # file is not opened twice; cleanup below also covers a failed save.
        with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
            tmp_path = tmp.name

        try:
            _save_as_jpeg(image_pil, tmp_path)
            result, last_error = _call_hf_space(handle_file(tmp_path))

            if result is None:
                print(f"❌ All HF Space endpoints failed. Last error: {last_error}")
                print("Falling back to local detection...")
                return detect_plate(image_pil)

            detection = _parse_hf_result(result)
            if detection is None:
                return detect_plate(image_pil)

            plate, state, vehicle_type, vehicle_conf, success = detection
            print(f"✅ Detection result: plate={plate}, state={state}, type={vehicle_type}, conf={vehicle_conf}, success={success}")
            return detection
        finally:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

    except Exception as e:
        print(f"❌ HF Space detection exception: {e}")
        traceback.print_exc()
        print("Falling back to local detection...")
        try:
            return detect_plate(image_pil)
        except Exception as local_e:
            print(f"❌ Local detection also failed: {local_e}")
            # Return empty result
            return _EMPTY_DETECTION


# Initialize HF Space client on startup
init_hf_client()

# =========================================================
# REQUEST HELPERS
# =========================================================

async def _read_request_payload(request: Request) -> dict:
    """Return the request body as a dict, from JSON or form data.

    JSON bodies are used when the content type says so; anything else is
    read as a form. Unparseable bodies and non-object JSON yield ``{}``.
    """
    content_type = request.headers.get("content-type", "")
    if "application/json" in content_type:
        try:
            payload = await request.json()
        except Exception:
            return {}
        return payload if isinstance(payload, dict) else {}

    try:
        form = await request.form()
        return dict(form)
    except Exception:
        return {}


def _build_rag_answer(response: dict) -> str:
    """Build a short human-readable summary of a ``run_query`` response.

    Returns the error text when ``response["error"]`` is set, a fixed
    "No results found." message for an empty result, and otherwise a record
    count followed by a JSON preview of at most three rows.
    """
    if not isinstance(response, dict):
        return "No results found."

    error = response.get("error")
    if error:
        return str(error)

    result = response.get("result")
    if not result:
        return "No results found."

    if isinstance(result, list):
        preview = result[:3]
        count = response.get("count", len(result))
    else:
        preview = result
        count = 1

    # default=str keeps the preview from failing on values json cannot encode
    # natively (e.g. datetime or Decimal cells); this text is for display only.
    pretty = json.dumps(preview, indent=2, ensure_ascii=True, default=str)
    return f"Found {count} record(s). Preview:\n{pretty}"


# =========================================================
# CORS MIDDLEWARE
# =========================================================

# NOTE(review): wildcard origins combined with allow_credentials=True lets any
# site make credentialed requests. Acceptable for development only; restrict
# allow_origins before exposing this publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allow all origins for development
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# =========================================================
# STARTUP/SHUTDOWN
# =========================================================

@app.on_event("startup")
async def startup_event():
    """Log which optional integrations are configured."""
    print("✅ ANPR Backend started")
    print(f"🔌 HF Token: {'✓' if HF_TOKEN else '✗'}")
    print(f"🗄️ Database: {'✓' if DATABASE_URL else '✗'}")


@app.on_event("shutdown")
async def shutdown_event():
    """Log backend shutdown."""
    print("🛑 ANPR Backend shutdown")


# =========================================================
# HEALTH CHECK
# =========================================================

@app.get("/api/health")
async def health():
    """Health check endpoint.

    Reports "healthy" when ``health_check()`` says the database is OK and
    "degraded" otherwise, along with feature flags derived from configuration.
    """
    db_ok, db_msg = health_check()
    return {
        "status": "healthy" if db_ok else "degraded",
        "message": db_msg,
        "version": "1.0.0",
        "timestamp": datetime.now().isoformat(),
        "features": {
            "detection": True,
            "nlp": bool(HF_TOKEN),
            "database": bool(DATABASE_URL),
        }
    }
# =========================================================
# DETECTION ENDPOINTS
# =========================================================

@app.post("/api/upload")
async def upload_detect(
    file: UploadFile = File(...),
    area: str = Form("area_001")
):
    """
    Upload an image and detect the vehicle plate in it.

    Returns a dict with a ``detections`` list (one entry when a plate was
    found and saved, empty otherwise) plus date/time/area metadata.

    Raises:
        HTTPException 400: empty upload or undecodable image.
        HTTPException 500: any other failure during detection or saving.
    """
    try:
        # Read file
        contents = await file.read()
        if not contents:
            raise HTTPException(status_code=400, detail="Empty file")

        # Convert to image
        from PIL import Image
        import io

        try:
            image = Image.open(io.BytesIO(contents))
            # Image.open is lazy; force a full decode so truncated or corrupt
            # uploads are rejected here as a client error.
            image.load()
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"Invalid image format: {str(e)}") from e

        # Detect plate using HuggingFace Space (falls back to local detection)
        plate, state, vehicle_type, vehicle_conf, success = detect_via_hf_space(image)

        # Get current timestamp
        now = datetime.now()
        date = now.strftime("%Y-%m-%d")
        time = now.strftime("%H:%M:%S")

        if not (success and plate):
            return {
                "success": True,
                "detections": [],
                "count": 0,
                "message": "No plates detected",
                "date": date,
                "time": time,
                "area": area
            }

        # Save to database
        save_detection(plate, state, vehicle_type, vehicle_conf, date, time)

        detection = {
            "id": 1,
            "date": date,
            "time": time,
            "plate": plate,
            "plate_text": plate,
            "state": state,
            "vehicle_type": vehicle_type,
            "confidence": round(vehicle_conf, 3),
            "area": area,
            "timestamp": now.isoformat(),
            "saved": True,
            "alert_fired": False,  # Would check against blocklist
            "thumbnail": None
        }

        return {
            "success": True,
            "detections": [detection],
            "count": 1,
            "message": f"Detected {plate} from {state}",
            "date": date,
            "time": time,
            "area": area
        }

    except HTTPException:
        raise
    except Exception as e:
        print(f"❌ Upload Error: {e}")
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


# =========================================================
# CHECKPOINT ENDPOINTS
# =========================================================

# Upper bound on rows generated per checkpoint-log request.
_MAX_LOG_PAGE_SIZE = 1000


@app.get("/api/checkpoints")
async def get_checkpoints():
    """Get list of checkpoints (hard-coded demo data)."""
    # Demo checkpoints - can be replaced with database query
    checkpoints = [
        {
            "checkpoint_id": "cp_001",
            "name": "Silk Board",
            "lat": 12.9698,
            "lng": 77.6356,
            "city": "Bengaluru"
        },
        {
            "checkpoint_id": "cp_002",
            "name": "Whitefield",
            "lat": 12.9698,
            "lng": 77.7499,
            "city": "Bengaluru"
        },
        {
            "checkpoint_id": "cp_003",
            "name": "Koramangala",
            "lat": 12.9352,
            "lng": 77.6245,
            "city": "Bengaluru"
        },
        {
            "checkpoint_id": "cp_004",
            "name": "Indiranagar",
            "lat": 13.0012,
            "lng": 77.6410,
            "city": "Bengaluru"
        }
    ]
    return checkpoints


@app.get("/api/checkpoints/{checkpoint_id}/log")
async def get_checkpoint_log(checkpoint_id: str, limit: int = 50, offset: int = 0):
    """Get detection log for a specific checkpoint (mock data).

    ``offset`` selects the first row id and ``limit`` the number of rows;
    negative values are treated as 0 and ``limit`` is capped at
    ``_MAX_LOG_PAGE_SIZE``.
    """
    page_size = max(0, min(limit, _MAX_LOG_PAGE_SIZE))
    first = max(0, offset)
    # Demo: return mock data
    return [
        {
            "id": i,
            "plate_text": f"KA01AB{1234+i}",
            "timestamp": datetime.now().isoformat(),
            "vehicle_type": "car",
            "checkpoint_id": checkpoint_id
        }
        for i in range(first, first + page_size)
    ]


# =========================================================
# VEHICLE SEARCH ENDPOINTS
# =========================================================

@app.get("/api/vehicles/search")
async def search_vehicles(plate: str = None, state: str = None):
    """Search vehicles by plate or state.

    With ``plate``: a one-element summary built from that plate's route
    history (empty list when there is none). With ``state``: the rows of
    ``get_vehicles_by_state()`` matching that state. With neither: the
    result of ``get_top_plates()``.
    """
    try:
        if plate:
            # Get route history for plate
            results = get_route_history(plate)
            if not results:
                return []
            # NOTE(review): first/last below assume the history is ordered
            # newest-first -- confirm against get_route_history.
            return [{
                "id": 1,
                "plate_text": plate,
                "state": results[0].get("state", "TN"),
                "vehicle_type": "car",
                "first_detected": results[-1].get("timestamp"),
                "last_detected": results[0].get("timestamp"),
                "count": len(results)
            }]

        if state:
            results = get_vehicles_by_state() or []
            return [r for r in results if r.get("state") == state]

        return get_top_plates()
    except Exception as e:
        print(f"❌ Search Error: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/vehicles/{plate}")
async def get_vehicle_history(plate: str):
    """Get full history for a vehicle (up to 100 detections)."""
    try:
        results = get_route_history(plate, limit=100) or []
        return {
            "plate": plate,
            "detections": results,
            "count": len(results)
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/vehicles/{plate}/journey")
async def get_vehicle_journey(plate: str):
    """Get vehicle's travel route/journey.

    Each history row becomes one waypoint; coordinates and road split are
    fixed demo values.
    """
    try:
        results = get_route_history(plate, limit=100) or []
        return [
            {
                "checkpoint_id": r.get("camera_id", "unknown"),
                "checkpoint_name": r.get("location", "Unknown"),
                "timestamp": r.get("timestamp"),
                "lat": 12.97,  # Demo coordinates
                "lng": 77.63,
                "road_split": "A"
            }
            for r in results
        ]
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# =========================================================
# BLOCKLIST ENDPOINTS
# =========================================================

# In-memory blocklist for demo (use database in production)
blocklist = []


@app.get("/api/blocklist")
async def get_blocklist():
    """Get list of blocked vehicles"""
    return blocklist


@app.post("/api/blocklist")
async def add_to_blocklist(
    request: Request,
    plate_text: str = Form(None),
    reason: str = Form(None),
    added_by: str = Form("admin")
):
    """Add vehicle to blocklist.

    Fields may arrive as form fields or as a JSON body.

    Raises:
        HTTPException 400: ``plate_text`` or ``reason`` is missing.
        HTTPException 500: unexpected failure.
    """
    try:
        if not plate_text or not reason:
            payload = await _read_request_payload(request)
            plate_text = plate_text or payload.get("plate_text")
            reason = reason or payload.get("reason")
            added_by = payload.get("added_by", added_by)

        if not plate_text or not reason:
            raise HTTPException(status_code=400, detail="Missing plate_text or reason")

        entry = {
            # len(blocklist) + 1 would reuse an id after a removal.
            "id": max((b["id"] for b in blocklist), default=0) + 1,
            "plate_text": plate_text,
            "reason": reason,
            "added_by": added_by,
            "added_at": datetime.now().isoformat()
        }
        blocklist.append(entry)
        return entry
    except HTTPException:
        # Let the 400 above through instead of re-wrapping it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.delete("/api/blocklist/{plate_text}")
async def remove_from_blocklist(plate_text: str):
    """Remove vehicle from blocklist.

    Reports success whether or not the plate was present.
    """
    global blocklist
    try:
        blocklist = [b for b in blocklist if b["plate_text"] != plate_text]
        return {"success": True, "message": f"Removed {plate_text} from blocklist"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# =========================================================
# ALERTS ENDPOINTS
# =========================================================

alerts_list = []


@app.get("/api/alerts")
async def get_alerts(checkpoint_id: str = None, limit: int = 20):
    """Get alerts (blocklist matches), optionally filtered by checkpoint.

    A negative ``limit`` is treated as 0.
    """
    try:
        limit = max(0, limit)
        if checkpoint_id:
            return [a for a in alerts_list if a.get("checkpoint_id") == checkpoint_id][:limit]
        return alerts_list[:limit]
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.patch("/api/alerts/{alert_id}/acknowledge")
async def acknowledge_alert(alert_id: int):
    """Mark alert as acknowledged.

    Raises:
        HTTPException 404: no alert has this ``alert_id``.
    """
    for alert in alerts_list:
        if alert.get("alert_id") == alert_id:
            alert["acknowledged"] = True
            return {"success": True, "alert_id": alert_id}
    # Raised outside any broad except so the 404 is not turned into a 500.
    raise HTTPException(status_code=404, detail="Alert not found")


# =========================================================
# ANALYTICS ENDPOINTS
# =========================================================

@app.get("/api/analytics/summary")
async def get_analytics_summary(start_date: str = None, end_date: str = None):
    """Get analytics summary.

    NOTE(review): ``start_date`` and ``end_date`` are accepted but not
    applied; the DB helpers called here take no date range.
    """
    try:
        # Treat a None return from any helper as "no data".
        top_plates = get_top_plates() or []
        by_state = get_vehicles_by_state() or []
        hourly = get_hourly_traffic() or []

        return {
            "total_detections": sum(p.get("detections", 0) for p in top_plates),
            "total_alerts": len(alerts_list),
            "top_plates": top_plates[:10],
            "by_state": {s.get("state"): s.get("count", 0) for s in by_state},
            "hourly_traffic": hourly
        }
    except Exception as e:
        print(f"❌ Analytics Error: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/analytics/heatmap")
async def get_heatmap():
    """Get traffic heatmap data.

    Uses the first 20 density rows; points are laid out on a synthetic 5-wide
    grid and intensity is the vehicle count scaled to at most 1.0.
    """
    try:
        density = get_vehicle_density_by_location() or []
        points = []
        for i, d in enumerate(density[:20]):
            vehicle_count = d.get("vehicle_count") or 0
            points.append({
                "lat": 12.9698 + (i % 5) * 0.01,
                "lng": 77.6356 + (i // 5) * 0.01,
                "checkpoint_id": f"cp_{i:03d}",
                "count": vehicle_count,
                "intensity": min(1.0, vehicle_count / 100)
            })
        return points
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# =========================================================
# CSV EXPORT
# =========================================================

@app.get("/api/export/csv")
async def export_csv(start_date: str = None, end_date: str = None):
    """Export the top-plates table as a CSV download.

    NOTE(review): ``start_date`` and ``end_date`` are accepted but not applied.
    """
    try:
        import csv
        from fastapi.responses import StreamingResponse
        import io

        top_plates = get_top_plates() or []

        # Create CSV in memory
        output = io.StringIO()
        writer = csv.writer(output)
        writer.writerow(["Plate", "State", "Detections"])
        writer.writerows(
            [row.get("plate"), row.get("state"), row.get("detections")]
            for row in top_plates
        )

        # Return as file download
        return StreamingResponse(
            iter([output.getvalue()]),
            media_type="text/csv",
            headers={"Content-Disposition": "attachment; filename=detections.csv"}
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# =========================================================
# RAG QUERY ENDPOINT
# =========================================================

@app.post("/api/rag/query")
async def rag_query(request: Request, query: str = Form(None)):
    """NLP-based database query using RAG.

    ``query`` may arrive as a form field or in a JSON body. Query failures
    are reported in the response's ``error`` field rather than as an HTTP
    error.

    Raises:
        HTTPException 400: no query was supplied.
    """
    try:
        if not query:
            payload = await _read_request_payload(request)
            query = payload.get("query") if isinstance(payload, dict) else None

        if not query:
            raise HTTPException(status_code=400, detail="Missing query")

        if not DATABASE_URL or not engine:
            return {
                "sql": "",
                "result": [],
                "error": "Database not configured"
            }

        response = run_query(query)
        response["answer"] = _build_rag_answer(response)
        return response
    except HTTPException:
        # Keep the 400 above from being reported as a 200 error payload.
        raise
    except Exception as e:
        print(f"❌ RAG Query Error: {e}")
        traceback.print_exc()
        return {
            "sql": "",
            "result": [],
            "error": str(e)
        }


# =========================================================
# WEBSOCKET - LIVE FEED
# =========================================================

@app.websocket("/ws/live")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket endpoint for live feed.

    Sends a ping every 5 seconds and, about 30% of the time, a randomly
    generated demo detection event.
    """
    # Imported here so this block does not depend on the file's import header.
    from fastapi import WebSocketDisconnect
    import random

    await websocket.accept()
    try:
        while True:
            # Send keepalive ping
            await websocket.send_json({"type": "ping"})

            # Simulate detection event every 5 seconds
            await asyncio.sleep(5)

            # Random detection (demo)
            if random.random() > 0.7:
                detection = {
                    "type": "DETECTION",
                    "id": random.randint(1, 10000),
                    "plate_text": f"KA01AB{random.randint(1000, 9999)}",
                    "vehicle_type": random.choice(["car", "truck", "bus", "bike"]),
                    "checkpoint_id": "cp_001",
                    "road_split": random.choice(["A", "B", "C", "D"]),
                    "timestamp": datetime.now().isoformat(),
                    "confidence": round(random.uniform(0.85, 0.99), 3),
                    "alert_fired": False
                }
                await websocket.send_json(detection)
    except WebSocketDisconnect:
        # Client went away; not an error.
        pass
    except Exception as e:
        print(f"❌ WebSocket Error: {e}")
    finally:
        try:
            await websocket.close()
        except Exception:
            # Best effort: the socket is usually already closed here.
            pass


# =========================================================
# ROOT ENDPOINT
# =========================================================

@app.get("/")
async def root():
    """Root endpoint"""
    return {
        "message": "ANPR Command Hub API",
        "version": "1.0.0",
        "docs": "/docs",
        "health": "/api/health"
    }


# =========================================================
# RUN SERVER
# =========================================================

if __name__ == "__main__":
    uvicorn.run(
        "app_fastapi:app",
        host="0.0.0.0",
        # Hosting platforms commonly inject PORT; default stays 8000.
        port=int(os.environ.get("PORT", "8000")),
        reload=True,
        log_level="info"
    )