f1-telemetry / data_service.py
AdeshJain's picture
Upload 6 files
36c05b7 verified
from datetime import datetime
import os
import pandas as pd
import requests
# Can be overridden via environment variables (set as HF Space secrets)
THINGSPEAK_CHANNEL_ID = os.environ.get("THINGSPEAK_CHANNEL_ID", "3352043")
THINGSPEAK_READ_KEY = os.environ.get("THINGSPEAK_READ_KEY", "2LNNU8X6OAW22BHE")
# In-memory mock data array for when real API fails or isn't configured
mock_data_store = []
mock_time_counter = datetime.now().timestamp()
def generate_mock_data(limit=10):
"""Generates mock telemetry data to ensure the UI works out-of-the-box."""
global mock_time_counter
import random
import math
data = []
for _ in range(limit):
# Sine wave base for speed + random noise
base_speed = 60 + 20 * math.sin(mock_time_counter / 100)
speed = max(0, min(200, base_speed + random.uniform(-5, 5)))
# Determine acceleration based on speed change
# Mock simple ax, ay, az
ax = random.uniform(-2, 2)
ay = random.uniform(-1, 1)
az = random.uniform(9.0, 10.5) # gravity is 9.8
temperature = random.uniform(25.0, 45.0)
data.append({
"created_at": datetime.fromtimestamp(mock_time_counter).isoformat() + "Z",
"field1": str(temperature),
"field2": "50.0", # humidity
"field3": str(ay),
"field4": str(ax),
"field5": str(az),
"field6": str(speed), # using speed mapped to RPM slot
"field7": "1", # lapCount
"field8": str(80.0 + random.uniform(-2, 2)) # tireTemp
})
mock_time_counter += 15 # 15 seconds per interval
return data
def fetch_thingspeak_data(results=100) -> pd.DataFrame:
"""Fetches data from ThingSpeak. Falls back to mock data if not configured or failed."""
API_URL = f"https://api.thingspeak.com/channels/{THINGSPEAK_CHANNEL_ID}/feeds.json"
params = {"results": results}
if THINGSPEAK_READ_KEY:
params["api_key"] = THINGSPEAK_READ_KEY
use_mock = (THINGSPEAK_CHANNEL_ID == "YOUR_CHANNEL_ID_HERE" or THINGSPEAK_CHANNEL_ID == "")
if not use_mock:
try:
response = requests.get(API_URL, params=params, timeout=5)
response.raise_for_status()
data = response.json().get("feeds", [])
if not data:
use_mock = True
except Exception as e:
print("ThingSpeak fetch failed, falling back to mock. Error:", e)
use_mock = True
if use_mock:
data = generate_mock_data(results)
df = pd.DataFrame(data)
if df.empty:
return pd.DataFrame(columns=["timestamp", "speed", "ax", "ay", "az", "temperature", "humidity", "lap_count", "tire_temp"])
# Map fields according to ESP32 code:
# 1: temperature, 2: humidity, 3: latG (ay), 4: lonG (ax), 5: vertG (az), 6: RPM, 7: speedKmh
rename_map = {
"created_at": "timestamp",
"field1": "temperature",
"field2": "humidity",
"field3": "ay",
"field4": "ax",
"field5": "az",
"field6": "rpm",
"field7": "speed_kmh"
}
df = df.rename(columns=rename_map)
# Filter to only the columns we expect (some mock/initial points might lack them)
expected_cols = ["timestamp", "temperature", "humidity", "ay", "ax", "az", "rpm", "speed_kmh"]
for col in expected_cols:
if col not in df.columns:
df[col] = 0.0
df = df[expected_cols]
# Convert types
df["timestamp"] = pd.to_datetime(df["timestamp"])
for col in expected_cols[1:]:
df[col] = pd.to_numeric(df[col], errors="coerce")
# Handle missing values by forward filling then backward filling
df = df.ffill().bfill()
df = df.fillna(0) # any remaining NaNs become 0
return df