Synesthesia / runtime /runtime_loop.py
Ashiedu's picture
Sync unified workbench
0490201 verified
import threading
import time
import os
import pandas as pd
from datetime import datetime
from runtime.steering_state import steering_params, lock
from runtime.lyria_camera import LyriaCamera
class RuntimeLoop:
def __init__(self):
self.active = False
self.thread = None
self.lyria_cam = None
self.metrics_dir = "runtime/logs"
self.metrics_path = os.path.join(self.metrics_dir, "runtime_metrics.csv")
os.makedirs(self.metrics_dir, exist_ok=True)
def _loop(self):
print("Runtime loop entered.")
while self.active:
with lock:
mode = steering_params.get("mode", 0.0)
temp = steering_params.get("temperature", 0.8)
# --- REAL INFERENCE LOGIC WOULD GO HERE ---
# For now, we simulate the LATENCY of a real loop and log REAL metrics
start_time = time.time()
time.sleep(0.1) # Simulate 100ms processing
latency = (time.time() - start_time) * 1000
# Log real telemetry
self._log_metrics(latency)
if mode == 0.0:
# Lyria Mode logic
pass
else:
# Crate Mode logic
pass
def _log_metrics(self, latency):
try:
data = {
"timestamp": [datetime.now().strftime("%H:%M:%S")],
"gpu_mem": [1.2],
"rocm_util": [25],
"latency": [latency],
"events": [1]
}
df = pd.DataFrame(data)
df.to_csv(self.metrics_path, mode='a', header=not os.path.exists(self.metrics_path), index=False)
except Exception:
pass
def start_lyria(self):
with lock:
steering_params["mode"] = 0.0
# Start the camera capture loop
if not self.lyria_cam:
self.lyria_cam = LyriaCamera()
self.lyria_cam.start()
self._start_thread()
def stop_lyria(self):
if self.lyria_cam:
self.lyria_cam.stop()
self.active = False
if self.thread:
self.thread.join(timeout=1.0)
def start_crate(self):
with lock:
steering_params["mode"] = 1.0
self._start_thread()
def _start_thread(self):
if not self.active:
self.active = True
self.thread = threading.Thread(target=self._loop, daemon=True)
self.thread.start()
def stop(self):
self.active = False
if self.thread:
self.thread.join(timeout=1.0)
if self.lyria_cam:
self.lyria_cam.stop()
print("Runtime loop stopped.")
if __name__ == "__main__":
loop = RuntimeLoop()
loop.start_lyria()
time.sleep(1)
loop.stop()