"""Crash-detection server.

Sensor samples are POSTed to ``/sensor`` and pushed onto an asyncio queue.
A background task drains the queue into a sliding window of ``WINDOW_SIZE``
samples, scores each full window with the loaded model, and publishes the
most recent verdict in ``latest_result``.
"""

import asyncio
import datetime
import time
from collections import deque

import joblib
import numpy as np
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware

from features import extract_features

app = FastAPI()

# -----------------------------
# CORS
# -----------------------------
# NOTE(review): wildcard origins together with allow_credentials=True lets any
# site make credentialed requests. Restrict allow_origins before exposing this
# service beyond a trusted network.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# -----------------------------
# CONFIG
# -----------------------------
WINDOW_SIZE = 120        # number of samples held in the sliding window
CRASH_THRESHOLD = 0.5    # smoothed probability above which a window counts as a hit
COOLDOWN = 10            # minimum seconds between two reported crashes

# Keys read from each incoming sample, in the column order of a window row.
_SENSOR_KEYS = ("ax", "ay", "az", "gx", "gy", "gz", "speed")

# NOTE(review): joblib.load unpickles the file; only load model files from a
# trusted source.
model = joblib.load("rf_model.pkl")
buffer = deque(maxlen=WINDOW_SIZE)
last_crash_time = 0
latest_location = {"lat": 0.0, "lon": 0.0}

# Shared state: the most recent verdict, returned by every /sensor request.
latest_result = {"status": "collecting", "confidence": 0.0}

# Queue that decouples HTTP ingestion from window scoring.
data_queue = asyncio.Queue()

# Strong reference to the background task; the event loop itself only keeps a
# weak reference, so an unreferenced task can be garbage-collected mid-run.
_processor_task = None


# -----------------------------
# LOCATION
# -----------------------------
@app.post("/location")
async def update_location(data: dict):
    """Store the latest device location.

    Reads ``lat`` and ``lon`` from the JSON body (each defaults to 0 when
    missing) and replaces ``latest_location``.

    Raises:
        HTTPException: 422 when ``lat`` or ``lon`` cannot be converted to
            ``float`` (previously this surfaced as an unhandled 500).
    """
    global latest_location
    try:
        lat = float(data.get("lat", 0))
        lon = float(data.get("lon", 0))
    except (TypeError, ValueError) as exc:
        raise HTTPException(
            status_code=422, detail="lat and lon must be numeric"
        ) from exc

    latest_location = {"lat": lat, "lon": lon}
    return {"status": "location updated"}


@app.get("/location")
async def get_location():
    """Return the most recently stored location."""
    return latest_location


# -----------------------------
# FAST INGEST
# -----------------------------
@app.post("/sensor")
async def sensor_ingest(request: Request):
    """Queue the posted sample(s) and return the latest verdict immediately.

    Accepted body shapes: a JSON list of samples, an object with a ``data``
    key holding the samples, or a single sample object. Scoring happens in
    the background task, so the response reflects the state *before* these
    samples are processed.

    Returns:
        ``latest_result`` on success, or ``{"status": "error"}`` if the body
        could not be read or queued.
    """
    try:
        body = await request.json()

        # Normalise every accepted format to a list of samples.
        if isinstance(body, list):
            data_list = body
        elif isinstance(body, dict) and "data" in body:
            data_list = body["data"]
        else:
            data_list = [body]

        # The queue is unbounded, so put() does not wait here.
        for d in data_list:
            await data_queue.put(d)

        return latest_result

    # `except Exception` rather than a bare `except:` so task cancellation
    # (asyncio.CancelledError) and interpreter shutdown still propagate.
    except Exception:
        return {"status": "error"}


# -----------------------------
# BACKGROUND PROCESSOR
# -----------------------------
def _sample_to_row(sample):
    """Convert one sample mapping into a row of floats; missing keys become 0."""
    return [float(sample.get(key, 0)) for key in _SENSOR_KEYS]


def _crash_probability(window):
    """Score one full window: model probability plus a boost, capped at 1.0."""
    features = extract_features(window)
    prob = model.predict_proba([features])[0][1]

    # Boost: magnitude of the first three columns (ax, ay, az) per sample.
    # NOTE(review): the threshold of 40 is presumably in m/s^2 - confirm units.
    acc = np.sqrt(window[:, 0] ** 2 + window[:, 1] ** 2 + window[:, 2] ** 2)
    if np.max(acc) > 40:
        prob += 0.2

    return float(min(prob, 1.0))


async def process_data():
    """Drain ``data_queue`` forever, updating ``latest_result`` per sample.

    Each sample is appended to ``buffer``. Until the buffer holds
    ``WINDOW_SIZE`` samples the result stays "collecting". After that every
    new sample re-scores the window, the last five scores are averaged, and
    a crash is reported once the average exceeds ``CRASH_THRESHOLD`` on two
    consecutive samples and ``COOLDOWN`` seconds have passed since the
    previous report. Any error while handling a sample is printed and the
    loop continues with the next one.
    """
    global last_crash_time, latest_result

    history = deque(maxlen=5)  # most recent scores, used for smoothing
    trigger_count = 0          # consecutive samples above the threshold

    while True:
        try:
            d = await data_queue.get()
            buffer.append(_sample_to_row(d))

            if len(buffer) < WINDOW_SIZE:
                # Same keys as the initial state so clients see a stable shape.
                latest_result = {"status": "collecting", "confidence": 0.0}
                continue

            window = np.array(buffer)

            # NOTE(review): feature extraction and prediction run on the event
            # loop and block request handling while they execute; consider
            # moving them to an executor if latency matters.
            prob = _crash_probability(window)

            # Smoothing over the most recent scores.
            history.append(prob)
            avg_prob = float(sum(history) / len(history))

            # Trigger logic.
            if avg_prob > CRASH_THRESHOLD:
                trigger_count += 1
            else:
                trigger_count = 0

            current_time = time.time()

            if trigger_count >= 2 and (current_time - last_crash_time > COOLDOWN):
                last_crash_time = current_time

                # NOTE(review): this result is overwritten by "monitoring" as
                # soon as the next sample is processed, so a client can miss
                # it - confirm that is intended.
                latest_result = {
                    "event": "CRASH_DETECTED",
                    "confidence": avg_prob,
                    "timestamp": current_time,
                    "location": latest_location,
                }

                # NOTE(review): the literal below looks mis-encoded; it is
                # kept byte-for-byte because it is runtime output.
                print("\nšŸ’„ CRASH DETECTED:", avg_prob)

            else:
                latest_result = {
                    "status": "monitoring",
                    "confidence": avg_prob,
                }

        except Exception as e:
            # Boundary of the worker loop: report and keep processing.
            print("PROCESS ERROR:", e)


# -----------------------------
# START BACKGROUND TASK
# -----------------------------
@app.on_event("startup")
async def startup_event():
    """Start the background processor and keep a reference to its task."""
    global _processor_task
    _processor_task = asyncio.create_task(process_data())


# -----------------------------
# ROOT
# -----------------------------
@app.get("/")
async def home():
    """Return a static status message."""
    # NOTE(review): the literal looks mis-encoded; kept byte-for-byte because
    # it is part of the API response.
    return {"status": "Optimized Crash Detection Server šŸš€"}