plate-detector / app_fastapi.py
barathvasan-dev
Fix: Parse HF Space detection response format correctly - handle single detection object not array
f9c9a7c
"""
FastAPI Backend for ANPR System
Exposes database functions and detection as REST API endpoints
Compatible with React frontend and Hugging Spaces deployment
"""
import os
import asyncio
import json
import traceback
from datetime import datetime
from io import BytesIO
os.environ["OMP_NUM_THREADS"] = "1"
from fastapi import FastAPI, UploadFile, File, Form, HTTPException, WebSocket, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import uvicorn
import tempfile
try:
from gradio_client import Client, handle_file
except ImportError:
Client = None
handle_file = None
from detector import detect_plate
from database import (
save_detection,
run_query,
health_check,
get_vehicles_by_state,
get_hourly_traffic,
get_top_plates,
get_suspicious_vehicles,
get_route_history,
get_vehicles_by_location,
get_multi_location_detections,
get_peak_traffic_hours,
get_vehicle_density_by_location,
HF_TOKEN,
DATABASE_URL,
client,
engine
)
# =========================================================
# FASTAPI APP SETUP
# =========================================================
app = FastAPI(
title="ANPR Command Hub API",
description="Automatic Number Plate Recognition System API",
version="1.0.0"
)
# =========================================================
# HUGGINGFACE SPACES DETECTION
# =========================================================
HF_SPACE_ID = "BARATH0070/plate-detector"
hf_detection_client = None
def init_hf_client():
"""Initialize HuggingFace Space client for remote detection"""
global hf_detection_client
if Client is not None:
try:
hf_detection_client = Client(HF_SPACE_ID)
print("✅ Connected to HuggingFace Space for detection")
return True
except Exception as e:
print(f"⚠️ HuggingFace Space connection failed: {e}")
return False
return False
def detect_via_hf_space(image_pil):
"""Call detection via HuggingFace Space API"""
global hf_detection_client
try:
# Try to connect if not already connected
if hf_detection_client is None:
if Client is None:
print("⚠️ gradio_client not available, using local detection")
return detect_plate(image_pil)
try:
hf_detection_client = Client(HF_SPACE_ID)
print("✅ Connected to HuggingFace Space for detection")
except Exception as e:
print(f"⚠️ HuggingFace Space connection failed: {e}")
print("Falling back to local detection...")
return detect_plate(image_pil)
# Save image to temp file
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
image_pil.save(tmp.name)
tmp_path = tmp.name
try:
image_input = handle_file(tmp_path)
# Try different endpoint names (HF Space uses /detect_and_save)
endpoints_to_try = [
"/detect_and_save", # Primary endpoint
"/predict_0", # Backup
"/predict", # Fallback
None, # Try default
]
result = None
last_error = None
for endpoint in endpoints_to_try:
try:
print(f"Trying HF Space endpoint: {endpoint}")
if endpoint is None:
result = hf_detection_client.predict(image_input)
else:
result = hf_detection_client.predict(image_input, api_name=endpoint)
print(f"✅ HF Space detection successful with endpoint {endpoint}")
break
except Exception as e:
last_error = e
print(f"⚠️ Endpoint {endpoint} failed: {e}")
continue
if result is None:
print(f"❌ All HF Space endpoints failed. Last error: {last_error}")
print("Falling back to local detection...")
return detect_plate(image_pil)
# Parse the result from HF Space
# HF Space returns: [status_message, detection_json]
if isinstance(result, (list, tuple)) and len(result) >= 2:
message, results_json = result[0], result[1]
print(f"HF Space message: {message}")
elif isinstance(result, dict):
results_json = result
message = results_json.get("message", "")
else:
print(f"Unexpected result format: {type(result)}, value: {result}")
return detect_plate(image_pil)
# Parse JSON if string
if isinstance(results_json, str):
try:
results_json = json.loads(results_json)
except Exception as e:
print(f"Failed to parse JSON: {e}, {results_json}")
results_json = {}
# Extract detection info directly from HF Space response
# HF Space returns a single detection object, not an array
if isinstance(results_json, dict):
plate = results_json.get("plate", "")
state = results_json.get("state", "UNKNOWN")
vehicle_type = results_json.get("vehicle_type", "UNKNOWN")
vehicle_conf = float(results_json.get("confidence", 0.0))
success = bool(plate) and plate != ""
else:
plate = ""
state = "UNKNOWN"
vehicle_type = "UNKNOWN"
vehicle_conf = 0.0
success = False
print(f"✅ Detection result: plate={plate}, state={state}, type={vehicle_type}, conf={vehicle_conf}, success={success}")
return plate, state, vehicle_type, vehicle_conf, success
finally:
if os.path.exists(tmp_path):
os.remove(tmp_path)
except Exception as e:
print(f"❌ HF Space detection exception: {e}")
traceback.print_exc()
print("Falling back to local detection...")
try:
return detect_plate(image_pil)
except Exception as local_e:
print(f"❌ Local detection also failed: {local_e}")
# Return empty result
return "", "UNKNOWN", "UNKNOWN", 0.0, False
# Initialize HF Space client on startup
init_hf_client()
# =========================================================
# REQUEST HELPERS
# =========================================================
async def _read_request_payload(request: Request) -> dict:
content_type = request.headers.get("content-type", "")
if "application/json" in content_type:
try:
payload = await request.json()
return payload if isinstance(payload, dict) else {}
except Exception:
return {}
try:
form = await request.form()
return dict(form)
except Exception:
return {}
def _build_rag_answer(response: dict) -> str:
error = response.get("error") if isinstance(response, dict) else None
if error:
return str(error)
result = response.get("result") if isinstance(response, dict) else None
if not result:
return "No results found."
preview = result[:3] if isinstance(result, list) else result
pretty = json.dumps(preview, indent=2, ensure_ascii=True)
count = response.get("count", len(result)) if isinstance(result, list) else 1
return f"Found {count} record(s). Preview:\n{pretty}"
# =========================================================
# CORS MIDDLEWARE
# =========================================================
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allow all origins for development
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# =========================================================
# STARTUP/SHUTDOWN
# =========================================================
@app.on_event("startup")
async def startup_event():
print("✅ ANPR Backend started")
print(f"🔌 HF Token: {'✓' if HF_TOKEN else '✗'}")
print(f"🗄️ Database: {'✓' if DATABASE_URL else '✗'}")
@app.on_event("shutdown")
async def shutdown_event():
print("🛑 ANPR Backend shutdown")
# =========================================================
# HEALTH CHECK
# =========================================================
@app.get("/api/health")
async def health():
"""Health check endpoint"""
db_ok, db_msg = health_check()
return {
"status": "healthy" if db_ok else "degraded",
"message": db_msg,
"version": "1.0.0",
"timestamp": datetime.now().isoformat(),
"features": {
"detection": True,
"nlp": bool(HF_TOKEN),
"database": bool(DATABASE_URL),
}
}
# =========================================================
# DETECTION ENDPOINTS
# =========================================================
@app.post("/api/upload")
async def upload_detect(
file: UploadFile = File(...),
area: str = Form("area_001")
):
"""
Upload image/video and detect vehicles
Returns list of detected plates with metadata
"""
try:
# Read file
contents = await file.read()
if not contents:
raise HTTPException(status_code=400, detail="Empty file")
# Convert to image
from PIL import Image
import io
try:
image = Image.open(io.BytesIO(contents))
except Exception as e:
raise HTTPException(status_code=400, detail=f"Invalid image format: {str(e)}")
# Detect plate using HuggingFace Space
plate, state, vehicle_type, vehicle_conf, success = detect_via_hf_space(image)
# Get current timestamp
now = datetime.now()
date = now.strftime("%Y-%m-%d")
time = now.strftime("%H:%M:%S")
# Save to database
if success and plate:
save_detection(plate, state, vehicle_type, vehicle_conf, date, time)
detection = {
"id": 1,
"date": date,
"time": time,
"plate": plate,
"plate_text": plate,
"state": state,
"vehicle_type": vehicle_type,
"confidence": round(vehicle_conf, 3),
"area": area,
"timestamp": now.isoformat(),
"saved": True,
"alert_fired": False, # Would check against blocklist
"thumbnail": None
}
return {
"success": True,
"detections": [detection],
"count": 1,
"message": f"Detected {plate} from {state}",
"date": date,
"time": time,
"area": area
}
else:
return {
"success": True,
"detections": [],
"count": 0,
"message": "No plates detected",
"date": now.strftime("%Y-%m-%d"),
"time": now.strftime("%H:%M:%S"),
"area": area
}
except HTTPException:
raise
except Exception as e:
print(f"❌ Upload Error: {e}")
traceback.print_exc()
raise HTTPException(status_code=500, detail=str(e))
# =========================================================
# CHECKPOINT ENDPOINTS
# =========================================================
@app.get("/api/checkpoints")
async def get_checkpoints():
"""Get list of checkpoints"""
# Demo checkpoints - can be replaced with database query
checkpoints = [
{
"checkpoint_id": "cp_001",
"name": "Silk Board",
"lat": 12.9698,
"lng": 77.6356,
"city": "Bengaluru"
},
{
"checkpoint_id": "cp_002",
"name": "Whitefield",
"lat": 12.9698,
"lng": 77.7499,
"city": "Bengaluru"
},
{
"checkpoint_id": "cp_003",
"name": "Koramangala",
"lat": 12.9352,
"lng": 77.6245,
"city": "Bengaluru"
},
{
"checkpoint_id": "cp_004",
"name": "Indiranagar",
"lat": 13.0012,
"lng": 77.6410,
"city": "Bengaluru"
}
]
return checkpoints
@app.get("/api/checkpoints/{checkpoint_id}/log")
async def get_checkpoint_log(checkpoint_id: str, limit: int = 50, offset: int = 0):
"""Get detection log for a specific checkpoint"""
try:
# Demo: return mock data
return [
{
"id": i,
"plate_text": f"KA01AB{1234+i}",
"timestamp": datetime.now().isoformat(),
"vehicle_type": "car",
"checkpoint_id": checkpoint_id
}
for i in range(limit)
]
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# =========================================================
# VEHICLE SEARCH ENDPOINTS
# =========================================================
@app.get("/api/vehicles/search")
async def search_vehicles(plate: str = None, state: str = None):
"""Search vehicles by plate or state"""
try:
if plate:
# Get route history for plate
results = get_route_history(plate)
if results:
return [{
"id": 1,
"plate_text": plate,
"state": results[0].get("state", "TN") if results else "TN",
"vehicle_type": "car",
"first_detected": results[-1].get("timestamp") if results else None,
"last_detected": results[0].get("timestamp") if results else None,
"count": len(results)
}]
return []
if state:
results = get_vehicles_by_state()
return [r for r in results if r.get("state") == state]
return get_top_plates()
except Exception as e:
print(f"❌ Search Error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/vehicles/{plate}")
async def get_vehicle_history(plate: str):
"""Get full history for a vehicle"""
try:
results = get_route_history(plate, limit=100)
if results:
return {
"plate": plate,
"detections": results,
"count": len(results)
}
return {"plate": plate, "detections": [], "count": 0}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/vehicles/{plate}/journey")
async def get_vehicle_journey(plate: str):
"""Get vehicle's travel route/journey"""
try:
results = get_route_history(plate, limit=100)
if results:
return [
{
"checkpoint_id": r.get("camera_id", "unknown"),
"checkpoint_name": r.get("location", "Unknown"),
"timestamp": r.get("timestamp"),
"lat": 12.97, # Demo coordinates
"lng": 77.63,
"road_split": "A"
}
for r in results
]
return []
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# =========================================================
# BLOCKLIST ENDPOINTS
# =========================================================
# In-memory blocklist for demo (use database in production)
blocklist = []
@app.get("/api/blocklist")
async def get_blocklist():
"""Get list of blocked vehicles"""
return blocklist
@app.post("/api/blocklist")
async def add_to_blocklist(
request: Request,
plate_text: str = Form(None),
reason: str = Form(None),
added_by: str = Form("admin")
):
"""Add vehicle to blocklist"""
try:
if not plate_text or not reason:
payload = await _read_request_payload(request)
plate_text = plate_text or payload.get("plate_text")
reason = reason or payload.get("reason")
added_by = payload.get("added_by", added_by)
if not plate_text or not reason:
raise HTTPException(status_code=400, detail="Missing plate_text or reason")
entry = {
"id": len(blocklist) + 1,
"plate_text": plate_text,
"reason": reason,
"added_by": added_by,
"added_at": datetime.now().isoformat()
}
blocklist.append(entry)
return entry
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.delete("/api/blocklist/{plate_text}")
async def remove_from_blocklist(plate_text: str):
"""Remove vehicle from blocklist"""
global blocklist
try:
blocklist = [b for b in blocklist if b["plate_text"] != plate_text]
return {"success": True, "message": f"Removed {plate_text} from blocklist"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# =========================================================
# ALERTS ENDPOINTS
# =========================================================
alerts_list = []
@app.get("/api/alerts")
async def get_alerts(checkpoint_id: str = None, limit: int = 20):
"""Get alerts (blocklist matches)"""
try:
if checkpoint_id:
return [a for a in alerts_list if a.get("checkpoint_id") == checkpoint_id][:limit]
return alerts_list[:limit]
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.patch("/api/alerts/{alert_id}/acknowledge")
async def acknowledge_alert(alert_id: int):
"""Mark alert as acknowledged"""
try:
for alert in alerts_list:
if alert.get("alert_id") == alert_id:
alert["acknowledged"] = True
return {"success": True, "alert_id": alert_id}
raise HTTPException(status_code=404, detail="Alert not found")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# =========================================================
# ANALYTICS ENDPOINTS
# =========================================================
@app.get("/api/analytics/summary")
async def get_analytics_summary(start_date: str = None, end_date: str = None):
"""Get analytics summary"""
try:
top_plates = get_top_plates()
by_state = get_vehicles_by_state()
hourly = get_hourly_traffic()
return {
"total_detections": sum(p.get("detections", 0) for p in top_plates),
"total_alerts": len(alerts_list),
"top_plates": top_plates[:10],
"by_state": {s.get("state"): s.get("count", 0) for s in by_state},
"hourly_traffic": hourly or []
}
except Exception as e:
print(f"❌ Analytics Error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/analytics/heatmap")
async def get_heatmap():
"""Get traffic heatmap data"""
try:
density = get_vehicle_density_by_location()
return [
{
"lat": 12.9698 + (i % 5) * 0.01,
"lng": 77.6356 + (i // 5) * 0.01,
"checkpoint_id": f"cp_{i:03d}",
"count": d.get("vehicle_count", 0),
"intensity": min(1.0, d.get("vehicle_count", 0) / 100)
}
for i, d in enumerate(density[:20])
]
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# =========================================================
# CSV EXPORT
# =========================================================
@app.get("/api/export/csv")
async def export_csv(start_date: str = None, end_date: str = None):
"""Export detections as CSV"""
try:
import csv
from fastapi.responses import StreamingResponse
import io
top_plates = get_top_plates()
# Create CSV in memory
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(["Plate", "State", "Detections"])
for plate in top_plates:
writer.writerow([plate.get("plate"), plate.get("state"), plate.get("detections")])
# Return as file download
output.seek(0)
return StreamingResponse(
iter([output.getvalue()]),
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=detections.csv"}
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# =========================================================
# RAG QUERY ENDPOINT
# =========================================================
@app.post("/api/rag/query")
async def rag_query(request: Request, query: str = Form(None)):
"""NLP-based database query using RAG"""
try:
if not query:
payload = await _read_request_payload(request)
query = payload.get("query") if isinstance(payload, dict) else None
if not query:
raise HTTPException(status_code=400, detail="Missing query")
if not DATABASE_URL or not engine:
return {
"sql": "",
"result": [],
"error": "Database not configured"
}
response = run_query(query)
response["answer"] = _build_rag_answer(response)
return response
except Exception as e:
print(f"❌ RAG Query Error: {e}")
traceback.print_exc()
return {
"sql": "",
"result": [],
"error": str(e)
}
# =========================================================
# WEBSOCKET - LIVE FEED
# =========================================================
@app.websocket("/ws/live")
async def websocket_endpoint(websocket: WebSocket):
"""WebSocket endpoint for live feed"""
await websocket.accept()
try:
# Send keepalive pings
import json
import random
while True:
# Send ping
await websocket.send_json({"type": "ping"})
# Simulate detection event every 5 seconds
await asyncio.sleep(5)
# Random detection (demo)
if random.random() > 0.7:
detection = {
"type": "DETECTION",
"id": random.randint(1, 10000),
"plate_text": f"KA01AB{random.randint(1000, 9999)}",
"vehicle_type": random.choice(["car", "truck", "bus", "bike"]),
"checkpoint_id": "cp_001",
"road_split": random.choice(["A", "B", "C", "D"]),
"timestamp": datetime.now().isoformat(),
"confidence": round(random.uniform(0.85, 0.99), 3),
"alert_fired": False
}
await websocket.send_json(detection)
except Exception as e:
print(f"❌ WebSocket Error: {e}")
finally:
try:
await websocket.close()
except:
pass
# =========================================================
# ROOT ENDPOINT
# =========================================================
@app.get("/")
async def root():
"""Root endpoint"""
return {
"message": "ANPR Command Hub API",
"version": "1.0.0",
"docs": "/docs",
"health": "/api/health"
}
# =========================================================
# RUN SERVER
# =========================================================
if __name__ == "__main__":
uvicorn.run(
"app_fastapi:app",
host="0.0.0.0",
port=8000,
reload=True,
log_level="info"
)