duyngtr
init code
5db4f1d
import streamlit as st
import json
import os
import asyncio
import time
from datetime import datetime, timedelta
import nats
from nats.errors import TimeoutError
from dotenv import load_dotenv
load_dotenv()
servers = os.environ.get("NATS_HOST", "nats://localhost:4222").split(",")
user = os.environ.get("NATS_USER", "")
password = os.environ.get("NATS_PWD", "")
async def capture_log(device_id, stop_event):
"""Capture logs from NATS and yield messages."""
nc = await nats.connect(
servers=servers,
user=user,
password=password,
)
sub = await nc.subscribe("maika.ai.nlu")
try:
while not stop_event.is_set():
try:
message = await sub.next_msg(timeout=0.1)
message = json.loads(message.data)
if message.get("request", {}).get("deviceId", {}) == device_id:
data = {}
if message.get("method", "") == "AskMeAnyThing":
data["end_time"] = message.get("end_time")
data["query"] = message.get("info", {}).get("transcript", "") or "[NO AUDIO]"
data["response"] = message.get("responses", "")
if data:
TIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"
timestamp = datetime.strftime(
datetime.strptime(data["end_time"], TIME_FORMAT) + timedelta(hours=7),
TIME_FORMAT
)
data["end_time"] = timestamp
yield data
except TimeoutError:
await asyncio.sleep(0.01)
finally:
await nc.close()
# Initialize session state
if "messages" not in st.session_state:
st.session_state.messages = []
if "capturing" not in st.session_state:
st.session_state.capturing = False
if "stop_event" not in st.session_state:
st.session_state.stop_event = None
st.title("BuddyOS interaction log")
# Device ID input
device_id = st.sidebar.text_input("Device ID", value="AIMWL25430001685")
# Control buttons
if st.sidebar.button("Capture",
disabled=st.session_state.capturing,
use_container_width=True,
type="secondary"
):
if device_id:
st.session_state.capturing = True
st.session_state.messages = []
st.session_state.stop_event = asyncio.Event()
st.rerun()
if st.sidebar.button(
"Stop Capturing + Clear history",
disabled=not st.session_state.capturing,
use_container_width=True,
type="primary",
):
if st.session_state.stop_event:
st.session_state.messages = []
st.session_state.stop_event.set()
st.session_state.capturing = False
st.rerun()
# Display chat messages
chat_container = st.container()
with chat_container:
for msg in st.session_state.messages:
with st.chat_message("user"):
st.write(":gray-badge[%s] %s"%(msg["end_time"], msg["query"]))
with st.chat_message("assistant"):
st.write(msg["response"])
# Capture logs if active
if st.session_state.capturing and device_id:
status_placeholder = st.sidebar.empty()
status_placeholder.info(f"🔴 Capturing logs for device: {device_id}")
async def run_capture():
async for data in capture_log(device_id, st.session_state.stop_event):
st.session_state.messages.append(data)
# Display new message immediately
with chat_container:
with st.chat_message("user"):
st.write(":gray-badge[%s] %s"%(data["end_time"], data["query"]))
with st.chat_message("assistant"):
st.write(data["response"])
try:
asyncio.run(run_capture())
except Exception as e:
st.error(f"Error: {str(e)}")
st.session_state.capturing = False
st.rerun()