import streamlit as st
import pandas as pd
import altair as alt
import os
import sys
import time
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.append(BASE_DIR)
from inference import TelemetryInferenceEngineLite
DATA_PATH = os.path.join(BASE_DIR, "assets/test.csv")
st.set_page_config(page_title="Race Telemetry", layout="wide")
@st.cache_resource
def load_engine():
return TelemetryInferenceEngineLite()
engine = load_engine()
@st.cache_data
def load_data():
return pd.read_csv(DATA_PATH)
FULL_DATA = load_data()
if "cursor" not in st.session_state:
st.session_state.cursor = 0
if "telemetry" not in st.session_state:
st.session_state.telemetry = pd.DataFrame()
st.markdown("""
""", unsafe_allow_html=True)
st.sidebar.title("Pit Wall Controls")
auto_refresh = st.sidebar.toggle("Auto Refresh", value=True)
refresh_interval = st.sidebar.slider("Refresh Interval (seconds)", 1, 5, 1)
batch_size = st.sidebar.selectbox("Rows per fetch", [1, 5, 10], index=0)
def fetch_rows(batch_size):
start = st.session_state.cursor
end = start + batch_size
batch = FULL_DATA.iloc[start:end].copy()
st.session_state.cursor = end
return batch
def run_inference(df_batch):
outputs = []
for _, row in df_batch.iterrows():
prediction = engine.process_row(row)
row_dict = row.to_dict()
row_dict["predicted_lap_time"] = prediction.get("predicted_lap_time")
row_dict["predicted_gear"] = prediction.get("predicted_gear")
row_dict["driving_behavior"] = prediction.get("driving_behavior")
outputs.append(row_dict)
return pd.DataFrame(outputs)
new_data = fetch_rows(batch_size)
if not new_data.empty:
processed = run_inference(new_data)
st.session_state.telemetry = pd.concat(
[st.session_state.telemetry, processed],
ignore_index=True
)
df = st.session_state.telemetry
if df.empty:
st.stop()
df["t"] = range(len(df))
latest = df.iloc[-1]
st.markdown(
"""
🏁 Race Telemetry
Pit Wall Dashboard
""",
unsafe_allow_html=True
)
left, right = st.columns([3, 1])
with left:
with st.container(border=True):
c1, c2, c3 = st.columns(3)
with c1:
st.markdown('Predicted Lap
', unsafe_allow_html=True)
st.markdown(
f'{latest["predicted_lap_time"]:.2f} s
',
unsafe_allow_html=True
)
with c2:
st.markdown('Recommended Gear
', unsafe_allow_html=True)
st.markdown(
f'{int(latest["predicted_gear"])}
',
unsafe_allow_html=True
)
with c3:
st.markdown('Driving Style
', unsafe_allow_html=True)
st.markdown(
f'{latest["driving_behavior"]}
',
unsafe_allow_html=True
)
st.markdown("
", unsafe_allow_html=True)
r2c1, r2c2 = st.columns(2)
r2c1.metric("Speed (km/h)", f"{latest['speed']:.1f}")
r2c1.altair_chart(
alt.Chart(df).mark_line().encode(x="t:Q", y="speed:Q"),
use_container_width=True
)
r2c2.metric("Engine RPM", int(latest["current_engine_rpm"]))
r2c2.altair_chart(
alt.Chart(df).mark_area(opacity=0.7).encode(x="t:Q", y="current_engine_rpm:Q"),
use_container_width=True
)
with right:
st.markdown("### Track")
st.image(os.path.join(BASE_DIR, "assets/track.png"), use_container_width=True)
p1, p2, p3, p4 = st.columns(4)
df["power_kw"] = df["power"] / 1000
p1.metric("Power (kW)", f"{df['power_kw'].iloc[-1]:.1f}")
p1.altair_chart(alt.Chart(df).mark_area().encode(x="t", y="power_kw"), use_container_width=True)
p2.metric("Torque (Nm)", f"{latest['torque']:.1f}")
p2.altair_chart(alt.Chart(df).mark_line().encode(x="t", y="torque"), use_container_width=True)
p3.metric("Boost (psi)", f"{latest['boost']:.2f}")
p3.altair_chart(alt.Chart(df).mark_line().encode(x="t", y="boost"), use_container_width=True)
p4.metric("Avg Tire Temp (°C)", f"{latest['avg_tire_temp']:.1f}")
p4.altair_chart(alt.Chart(df).mark_line().encode(x="t", y="avg_tire_temp"), use_container_width=True)
attitude_chart = alt.Chart(df).transform_fold(
["yaw", "pitch", "roll"],
as_=["Axis", "Value"]
).mark_line().encode(
x="t:Q",
y="Value:Q",
color="Axis:N"
)
st.metric(
"Yaw / Pitch / Roll (rad)",
f"{latest['yaw']:.2f}, {latest['pitch']:.2f}, {latest['roll']:.2f}"
)
st.altair_chart(attitude_chart, use_container_width=True)
if auto_refresh:
time.sleep(refresh_interval)
st.rerun()