Spaces:
Build error
Build error
| import streamlit as st | |
| import json | |
| import os | |
| import asyncio | |
| import time | |
| from datetime import datetime, timedelta | |
| import nats | |
| from nats.errors import TimeoutError | |
| from dotenv import load_dotenv | |
# Populate os.environ via python-dotenv before the NATS settings are read.
load_dotenv()

# NATS_HOST may list several server URLs separated by commas. Whitespace around
# each entry and empty entries (e.g. from a trailing comma or an empty value)
# are dropped so they are never handed to the client as server addresses; if
# nothing usable remains, the local default is used.
servers = [
    url.strip()
    for url in os.environ.get("NATS_HOST", "nats://localhost:4222").split(",")
    if url.strip()
] or ["nats://localhost:4222"]
# Credentials default to empty strings when the variables are not set.
user = os.environ.get("NATS_USER", "")
password = os.environ.get("NATS_PWD", "")
# Format of the "end_time" field, used both for parsing and for display.
_TIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"
# Offset added to every end_time before it is shown.
# NOTE(review): presumably converts UTC to a UTC+7 local time -- confirm.
_DISPLAY_OFFSET = timedelta(hours=7)


def _shift_timestamp(raw):
    """Return *raw* shifted by ``_DISPLAY_OFFSET`` and re-formatted.

    If *raw* is missing or does not match ``_TIME_FORMAT`` the value is
    returned unshifted as a string ("" for ``None``) so that one odd
    timestamp does not abort the whole capture.
    """
    try:
        parsed = datetime.strptime(raw, _TIME_FORMAT)
    except (TypeError, ValueError):
        return "" if raw is None else str(raw)
    return (parsed + _DISPLAY_OFFSET).strftime(_TIME_FORMAT)


def _extract_interaction(payload, device_id):
    """Turn one raw message payload into a display record, or ``None``.

    A record is produced only for a JSON object whose ``request.deviceId``
    equals *device_id* and whose ``method`` is ``"AskMeAnyThing"``.
    """
    try:
        message = json.loads(payload)
    except (TypeError, ValueError):
        # The subject is shared; a payload that is not valid JSON cannot be
        # attributed to this device, so it is skipped like any other
        # non-matching message.
        return None
    if not isinstance(message, dict):
        return None

    request = message.get("request")
    if not isinstance(request, dict) or request.get("deviceId") != device_id:
        return None
    if message.get("method", "") != "AskMeAnyThing":
        return None

    info = message.get("info")
    transcript = info.get("transcript", "") if isinstance(info, dict) else ""
    return {
        "end_time": _shift_timestamp(message.get("end_time")),
        "query": transcript or "[NO AUDIO]",
        "response": message.get("responses", ""),
    }


async def capture_log(device_id, stop_event):
    """Capture logs from NATS and yield messages.

    Connects to the configured NATS servers, subscribes to ``maika.ai.nlu``
    and yields one dict (keys ``end_time``, ``query``, ``response``) for each
    ``AskMeAnyThing`` message whose ``request.deviceId`` equals *device_id*.
    The loop ends once ``stop_event.is_set()`` returns true; the connection is
    closed when the generator finishes or is closed.

    Args:
        device_id: Device identifier to filter messages on.
        stop_event: Object with an ``is_set()`` method, polled between reads.
    """
    nc = await nats.connect(
        servers=servers,
        user=user,
        password=password,
    )
    try:
        # Subscribing inside the try guarantees the connection is closed even
        # if the subscription itself fails.
        sub = await nc.subscribe("maika.ai.nlu")
        while not stop_event.is_set():
            try:
                # Short timeout so stop_event is re-checked frequently.
                raw = await sub.next_msg(timeout=0.1)
            except TimeoutError:
                await asyncio.sleep(0.01)
                continue
            interaction = _extract_interaction(raw.data, device_id)
            if interaction is not None:
                yield interaction
    finally:
        await nc.close()
# Seed the per-session defaults; keys that already exist keep their stored
# values across Streamlit reruns.
for _state_key, _initial_value in (
    ("messages", []),
    ("capturing", False),
    ("stop_event", None),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
st.title("BuddyOS interaction log")

# Sidebar input: the device whose interactions should be followed.
device_id = st.sidebar.text_input("Device ID", value="AIMWL25430001685")

# Sidebar buttons: exactly one of the two is enabled, depending on whether a
# capture is currently marked as running.
is_capturing = st.session_state.capturing

start_clicked = st.sidebar.button(
    "Capture",
    disabled=is_capturing,
    use_container_width=True,
    type="secondary",
)
if start_clicked and device_id:
    # Start from an empty history with a fresh stop flag, then rerun so the
    # rest of the script sees the new state.
    st.session_state.capturing = True
    st.session_state.messages = []
    st.session_state.stop_event = asyncio.Event()
    st.rerun()

stop_clicked = st.sidebar.button(
    "Stop Capturing + Clear history",
    disabled=not is_capturing,
    use_container_width=True,
    type="primary",
)
if stop_clicked:
    if st.session_state.stop_event:
        # Drop the history and signal the capture loop to finish.
        st.session_state.messages = []
        st.session_state.stop_event.set()
    st.session_state.capturing = False
    st.rerun()
# Replay the stored exchanges: each one is drawn as a "user" bubble (timestamp
# badge followed by the query) and an "assistant" bubble (the response).
chat_container = st.container()
with chat_container:
    for entry in st.session_state.messages:
        user_line = ":gray-badge[%s] %s" % (entry["end_time"], entry["query"])
        st.chat_message("user").write(user_line)
        st.chat_message("assistant").write(entry["response"])
# Capture logs if active
if st.session_state.capturing and device_id:
    # A new capture attempt supersedes any failure reported by an earlier one.
    if "capture_error" in st.session_state:
        del st.session_state["capture_error"]

    status_placeholder = st.sidebar.empty()
    status_placeholder.info(f"🔴 Capturing logs for device: {device_id}")

    async def run_capture():
        """Append each captured exchange to the history and draw it at once."""
        async for data in capture_log(device_id, st.session_state.stop_event):
            st.session_state.messages.append(data)
            # Display new message immediately
            with chat_container:
                with st.chat_message("user"):
                    st.write(f":gray-badge[{data['end_time']}] {data['query']}")
                with st.chat_message("assistant"):
                    st.write(data["response"])

    try:
        asyncio.run(run_capture())
    except Exception as e:
        # Calling st.error() here would be wiped by the rerun below before the
        # user could read it, so the message is kept in session state and
        # rendered on the following run instead.
        st.session_state["capture_error"] = f"Error: {e}"
        st.session_state.capturing = False
        # Rerun so the sidebar buttons reflect that capturing has stopped.
        st.rerun()
elif "capture_error" in st.session_state:
    # Show the failure of the most recent capture until a new one is started.
    st.error(st.session_state["capture_error"])