Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, BackgroundTasks | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from contextlib import asynccontextmanager | |
| from typing import Dict, Any | |
| import numpy as np | |
| from data_service import fetch_thingspeak_data | |
| from ml_service import ml_engine | |
| async def lifespan(app: FastAPI): | |
| # Startup: Pre-train models on mock/live data so dashboard works immediately | |
| print("Pre-training models on startup...") | |
| df = fetch_thingspeak_data(results=200) | |
| success = ml_engine.train_mock(df) | |
| if success: | |
| print("Models successfully pre-trained.") | |
| else: | |
| print("Model pre-training failed (insufficient data).") | |
| yield | |
| # Shutdown: nothing to clean up | |
| app = FastAPI(title="Vehicle Telemetry API", lifespan=lifespan) | |
| # Allow CORS for Next.js frontend | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], # Simplified for now to rule out CORS as the cause of "Failed to fetch" | |
| allow_credentials=False, # Credentials must be False if using wildcard origin | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| def get_data() -> Dict[str, Any]: | |
| """Returns the latest telemetry data points.""" | |
| df = fetch_thingspeak_data(results=20) | |
| # Calculate acceleration magnitude for the frontend | |
| if not df.empty: | |
| df['acc_mag'] = np.sqrt(df['ax']**2 + df['ay']**2 + df['az']**2) | |
| else: | |
| df['acc_mag'] = [] | |
| return { | |
| "status": "success", | |
| "data": df.to_dict(orient="records") | |
| } | |
| def get_metrics() -> Dict[str, Any]: | |
| """Returns aggregated ride performance metrics.""" | |
| df = fetch_thingspeak_data(results=50) # Fetch more for aggregates | |
| if df.empty: | |
| return { | |
| "avg_rpm": 0.0, | |
| "max_rpm": 0.0, | |
| "avg_speed_kmh": 0.0, | |
| "efficiency_score": 0 | |
| } | |
| avg_rpm = df['rpm'].mean() | |
| max_rpm = df['rpm'].max() | |
| avg_speed_kmh = df['speed_kmh'].mean() | |
| # Efficiency score based on jerk/smoothness | |
| acc_mag = np.sqrt(df['ax']**2 + df['ay']**2 + df['az']**2) | |
| jerk = acc_mag.diff().abs().mean() | |
| efficiency = max(0, min(100, 100 - (jerk * 5))) | |
| return { | |
| "avg_rpm": round(float(avg_rpm), 1), | |
| "max_rpm": round(float(max_rpm), 1), | |
| "avg_speed_kmh": round(float(avg_speed_kmh), 1), | |
| "efficiency_score": round(float(efficiency if not np.isnan(efficiency) else 100)) | |
| } | |
| def get_ai_insights() -> Dict[str, Any]: | |
| """Returns AI model predictions on latest data.""" | |
| df = fetch_thingspeak_data(results=10) | |
| insights = ml_engine.get_insights(df) | |
| return { | |
| "driver_behavior": insights["behavior"], | |
| "anomaly_status": insights["anomaly_status"], | |
| } | |
| def get_prediction() -> Dict[str, Any]: | |
| """Returns predicted next speed.""" | |
| df = fetch_thingspeak_data(results=10) | |
| insights = ml_engine.get_insights(df) | |
| return { | |
| "predicted_speed": insights["predicted_speed"] | |
| } | |
| def trigger_training(background_tasks: BackgroundTasks): | |
| """Triggers model training using collected dataset.""" | |
| def train_task(): | |
| df = fetch_thingspeak_data(results=500) | |
| ml_engine.train_mock(df) | |
| print("Background training completed.") | |
| background_tasks.add_task(train_task) | |
| return {"status": "Training started in background"} | |
| if __name__ == "__main__": | |
| import uvicorn | |
| import os | |
| port = int(os.environ.get("PORT", 7860)) | |
| uvicorn.run(app, host="0.0.0.0", port=port) | |