f1-telemetry / main.py
AdeshJain's picture
Upload 6 files
36c05b7 verified
from fastapi import FastAPI, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from typing import Dict, Any
import numpy as np
from data_service import fetch_thingspeak_data
from ml_service import ml_engine
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup: Pre-train models on mock/live data so dashboard works immediately
print("Pre-training models on startup...")
df = fetch_thingspeak_data(results=200)
success = ml_engine.train_mock(df)
if success:
print("Models successfully pre-trained.")
else:
print("Model pre-training failed (insufficient data).")
yield
# Shutdown: nothing to clean up
app = FastAPI(title="Vehicle Telemetry API", lifespan=lifespan)
# Allow CORS for Next.js frontend
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Simplified for now to rule out CORS as the cause of "Failed to fetch"
allow_credentials=False, # Credentials must be False if using wildcard origin
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/data")
def get_data() -> Dict[str, Any]:
"""Returns the latest telemetry data points."""
df = fetch_thingspeak_data(results=20)
# Calculate acceleration magnitude for the frontend
if not df.empty:
df['acc_mag'] = np.sqrt(df['ax']**2 + df['ay']**2 + df['az']**2)
else:
df['acc_mag'] = []
return {
"status": "success",
"data": df.to_dict(orient="records")
}
@app.get("/metrics")
def get_metrics() -> Dict[str, Any]:
"""Returns aggregated ride performance metrics."""
df = fetch_thingspeak_data(results=50) # Fetch more for aggregates
if df.empty:
return {
"avg_rpm": 0.0,
"max_rpm": 0.0,
"avg_speed_kmh": 0.0,
"efficiency_score": 0
}
avg_rpm = df['rpm'].mean()
max_rpm = df['rpm'].max()
avg_speed_kmh = df['speed_kmh'].mean()
# Efficiency score based on jerk/smoothness
acc_mag = np.sqrt(df['ax']**2 + df['ay']**2 + df['az']**2)
jerk = acc_mag.diff().abs().mean()
efficiency = max(0, min(100, 100 - (jerk * 5)))
return {
"avg_rpm": round(float(avg_rpm), 1),
"max_rpm": round(float(max_rpm), 1),
"avg_speed_kmh": round(float(avg_speed_kmh), 1),
"efficiency_score": round(float(efficiency if not np.isnan(efficiency) else 100))
}
@app.get("/ai")
def get_ai_insights() -> Dict[str, Any]:
"""Returns AI model predictions on latest data."""
df = fetch_thingspeak_data(results=10)
insights = ml_engine.get_insights(df)
return {
"driver_behavior": insights["behavior"],
"anomaly_status": insights["anomaly_status"],
}
@app.get("/predict")
def get_prediction() -> Dict[str, Any]:
"""Returns predicted next speed."""
df = fetch_thingspeak_data(results=10)
insights = ml_engine.get_insights(df)
return {
"predicted_speed": insights["predicted_speed"]
}
@app.post("/train")
def trigger_training(background_tasks: BackgroundTasks):
"""Triggers model training using collected dataset."""
def train_task():
df = fetch_thingspeak_data(results=500)
ml_engine.train_mock(df)
print("Background training completed.")
background_tasks.add_task(train_task)
return {"status": "Training started in background"}
if __name__ == "__main__":
import uvicorn
import os
port = int(os.environ.get("PORT", 7860))
uvicorn.run(app, host="0.0.0.0", port=port)