crashdetectionritz / server.py
agentsay's picture
modified: server.py
871d109
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
import numpy as np
from collections import deque
import joblib
import time
import datetime
import asyncio
from features import extract_features
app = FastAPI()
# -----------------------------
# CORS
# -----------------------------
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# -----------------------------
# CONFIG
# -----------------------------
WINDOW_SIZE = 120
CRASH_THRESHOLD = 0.5
COOLDOWN = 10
model = joblib.load("rf_model.pkl")
buffer = deque(maxlen=WINDOW_SIZE)
last_crash_time = 0
latest_location = {"lat": 0.0, "lon": 0.0}
# ⚑ shared state
latest_result = {"status": "collecting", "confidence": 0.0}
# ⚑ queue for async processing
data_queue = asyncio.Queue()
# -----------------------------
# LOCATION
# -----------------------------
@app.post("/location")
async def update_location(data: dict):
global latest_location
latest_location = {
"lat": float(data.get("lat", 0)),
"lon": float(data.get("lon", 0))
}
return {"status": "location updated"}
@app.get("/location")
async def get_location():
return latest_location
# -----------------------------
# πŸš€ ULTRA FAST INGEST
# -----------------------------
@app.post("/sensor")
async def sensor_ingest(request: Request):
try:
body = await request.json()
# handle all formats
if isinstance(body, list):
data_list = body
elif isinstance(body, dict) and "data" in body:
data_list = body["data"]
else:
data_list = [body]
# push to queue (NON-BLOCKING)
for d in data_list:
await data_queue.put(d)
# ⚑ instant response (no waiting)
return latest_result
except:
return {"status": "error"}
# -----------------------------
# ⚑ BACKGROUND PROCESSOR
# -----------------------------
async def process_data():
global last_crash_time, latest_result
history = []
trigger_count = 0
while True:
try:
d = await data_queue.get()
buffer.append([
float(d.get("ax", 0)),
float(d.get("ay", 0)),
float(d.get("az", 0)),
float(d.get("gx", 0)),
float(d.get("gy", 0)),
float(d.get("gz", 0)),
float(d.get("speed", 0))
])
if len(buffer) < WINDOW_SIZE:
latest_result = {"status": "collecting"}
continue
window = np.array(buffer)
# ML
features = extract_features(window)
prob = model.predict_proba([features])[0][1]
# physics boost
acc = np.sqrt(window[:,0]**2 + window[:,1]**2 + window[:,2]**2)
if np.max(acc) > 40:
prob += 0.2
prob = float(min(prob, 1.0))
# smoothing
history.append(prob)
history = history[-5:]
avg_prob = float(sum(history) / len(history))
# trigger logic
if avg_prob > CRASH_THRESHOLD:
trigger_count += 1
else:
trigger_count = 0
current_time = time.time()
if trigger_count >= 2 and (current_time - last_crash_time > COOLDOWN):
last_crash_time = current_time
latest_result = {
"event": "CRASH_DETECTED",
"confidence": avg_prob,
"timestamp": current_time,
"location": latest_location
}
print("\nπŸ’₯ CRASH DETECTED:", avg_prob)
else:
latest_result = {
"status": "monitoring",
"confidence": avg_prob
}
except Exception as e:
print("PROCESS ERROR:", e)
# -----------------------------
# START BACKGROUND TASK
# -----------------------------
@app.on_event("startup")
async def startup_event():
asyncio.create_task(process_data())
# -----------------------------
# ROOT
# -----------------------------
@app.get("/")
async def home():
return {"status": "Optimized Crash Detection Server πŸš€"}