# rentbot/app.py
"""RentBot: a FastAPI WebSocket server that bridges Twilio media streams to an
STT -> LLM -> (tools) -> TTS voice pipeline."""

import asyncio
import base64
import json
import os
from types import SimpleNamespace

import numpy as np
from dotenv import load_dotenv
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

from audio_utils import ulaw_to_pcm16
from llm_handler import get_llm_response
from stt_handler import transcribe_audio_chunk
from tool_handler import execute_tool_call
from tts_handler import text_to_speech_stream

# Load environment variables from .env file
load_dotenv()

# Initialize FastAPI application
app = FastAPI()


# --- Add a root endpoint for health checks and basic info ---
@app.get("/")
async def root():
    """
    A simple GET endpoint to confirm the server is running and provide info.
    This is what you see when you visit the Hugging Face Space URL in a browser.
    """
    return {"status": "running", "message": "RentBot is active. Connect via WebSocket at the /rentbot endpoint."}


# --- Global Configuration ---
SILENCE_THRESHOLD_SECONDS = 0.7
AUDIO_RATE = 8000  # Hz for Twilio media streams
AUDIO_BUFFER_SIZE = int(SILENCE_THRESHOLD_SECONDS * AUDIO_RATE)  # samples per processing window

# In-memory session storage (for demonstration). In production, use Redis or a database.
sessions = {}


async def _single_text(text: str):
    """Wrap one string as an async iterator, so it can feed the TTS stream."""
    yield text


# --- Main WebSocket Endpoint for Twilio ---
@app.websocket("/rentbot")
async def websocket_endpoint(ws: WebSocket):
    """Handle one Twilio media-stream connection: buffer caller audio and
    dispatch it to the STT -> LLM -> TTS pipeline in ~0.7 s windows."""
    await ws.accept()
    stream_sid = None

    # Accumulate PCM chunks in a list and concatenate only when flushing.
    # (Calling np.append per 20 ms Twilio frame copies the whole buffer each
    # time -- O(n^2) over a call.)
    pcm_chunks = []
    buffered_samples = 0

    def _flush_buffer() -> np.ndarray:
        """Concatenate and reset the buffered PCM audio."""
        nonlocal pcm_chunks, buffered_samples
        audio = np.concatenate(pcm_chunks) if pcm_chunks else np.array([], dtype=np.int16)
        pcm_chunks = []
        buffered_samples = 0
        return audio

    def _busy() -> bool:
        """True while a previous turn is still being processed for this stream."""
        task = sessions[stream_sid].get("processing_task")
        return task is not None and not task.done()

    try:
        async for message in ws.iter_text():
            data = json.loads(message)
            event = data.get('event')

            if event == 'start':
                stream_sid = data['start']['streamSid']
                sessions[stream_sid] = {
                    "messages": [{"role": "system", "content": os.getenv("SYSTEM_PROMPT")}],
                    "processing_task": None,
                }
                print(f"New stream started: {stream_sid}")
                # Send an initial greeting without blocking the receive loop.
                initial_greeting = "Hi! I'm RentBot, your leasing assistant. How can I help you today?"
                sessions[stream_sid]["messages"].append({"role": "assistant", "content": initial_greeting})
                asyncio.create_task(
                    stream_and_send_audio(ws, stream_sid, _single_text(initial_greeting))
                )

            elif event == 'media':
                if not stream_sid:
                    continue
                chunk_pcm = ulaw_to_pcm16(base64.b64decode(data['media']['payload']))
                pcm_chunks.append(chunk_pcm)
                buffered_samples += len(chunk_pcm)
                if buffered_samples >= AUDIO_BUFFER_SIZE:
                    if _busy():
                        # Previous turn still in flight; keep buffering.
                        continue
                    task = asyncio.create_task(process_user_audio(ws, stream_sid, _flush_buffer()))
                    sessions[stream_sid]["processing_task"] = task

            elif event == 'mark':
                if not stream_sid:
                    continue
                # Heuristic to process leftover audio on pause (sub-window remainder).
                if buffered_samples > 1000 and not _busy():
                    task = asyncio.create_task(process_user_audio(ws, stream_sid, _flush_buffer()))
                    sessions[stream_sid]["processing_task"] = task

            elif event == 'stop':
                print(f"Stream stopped: {stream_sid}")
                break

    except WebSocketDisconnect:
        print(f"WebSocket disconnected for stream {stream_sid}")
    except Exception as e:
        # Top-level boundary for this connection: log and fall through to cleanup.
        print(f"An error occurred in websocket_endpoint: {e}")
    finally:
        if stream_sid and stream_sid in sessions:
            pending = sessions[stream_sid].get("processing_task")
            if pending:
                pending.cancel()
            del sessions[stream_sid]
            print(f"Session cleaned up for stream {stream_sid}")


# --- Core Logic Functions ---
async def _run_llm_turn(ws: WebSocket, stream_sid: str, messages: list):
    """Run one LLM completion, streaming its text chunks into TTS as they
    arrive for low latency.

    Returns:
        (assistant_message, tool_calls) as produced by get_llm_response.
    """
    tts_queue = asyncio.Queue()

    async def on_llm_chunk(chunk):
        await tts_queue.put(chunk)

    async def queued_text():
        # Drain the queue until the None sentinel signals end-of-response.
        while True:
            chunk = await tts_queue.get()
            if chunk is None:
                break
            yield chunk

    llm_task = asyncio.create_task(get_llm_response(messages, on_llm_chunk))
    tts_task = asyncio.create_task(stream_and_send_audio(ws, stream_sid, queued_text()))

    assistant_message, tool_calls = await llm_task
    await tts_queue.put(None)  # Signal TTS to end
    await tts_task  # Wait for TTS to finish sending audio
    return assistant_message, tool_calls


async def process_user_audio(ws: WebSocket, stream_sid: str, audio_chunk: np.ndarray):
    """The main logic loop: STT -> LLM -> (Tool/TTS)"""
    print(f"[{stream_sid}] Processing audio chunk of size {len(audio_chunk)}...")

    # 1. Speech-to-Text
    user_text = await transcribe_audio_chunk(audio_chunk)
    if not user_text:
        print(f"[{stream_sid}] No text transcribed.")
        return
    print(f"[{stream_sid}] User said: {user_text}")

    messages = sessions[stream_sid]["messages"]
    messages.append({"role": "user", "content": user_text})

    # 2. Run LLM and TTS concurrently for low latency.
    assistant_message, tool_calls = await _run_llm_turn(ws, stream_sid, messages)

    # Record the assistant turn exactly once. (Previously it was appended
    # twice when it carried both content and tool calls, corrupting the
    # conversation history; a None message was also appended when only
    # tool calls were returned.)
    if assistant_message and (assistant_message.get("content") or tool_calls):
        messages.append(assistant_message)

    # 3. Handle Tool Calls if any
    if tool_calls:
        for tool_call_data in tool_calls:
            # execute_tool_call expects an object with .id and .function.name /
            # .function.arguments attributes -- presumably the OpenAI tool-call
            # shape; SimpleNamespace adapts the raw dict safely.
            tool_call = SimpleNamespace(
                id=tool_call_data.get('id'),
                function=SimpleNamespace(**(tool_call_data.get('function') or {})),
            )
            print(f"[{stream_sid}] Executing tool: {tool_call.function.name}")
            tool_result_message = execute_tool_call(tool_call)
            messages.append(tool_result_message)

        # 4. Get a final response from the LLM after executing the tool(s),
        # spoken through the same streaming pipeline.
        final_assistant_message, _ = await _run_llm_turn(ws, stream_sid, messages)
        if final_assistant_message:
            messages.append(final_assistant_message)


async def stream_and_send_audio(ws: WebSocket, stream_sid: str, text_iterator):
    """Stream text to TTS and send the resulting audio back over the WebSocket."""
    async for audio_chunk in text_to_speech_stream(text_iterator):
        if audio_chunk:
            payload = base64.b64encode(audio_chunk).decode('utf-8')
            await ws.send_json({
                "event": "media",
                "streamSid": stream_sid,
                "media": {"payload": payload},
            })
    # Mark the end of the bot's turn so Twilio echoes a 'mark' event back,
    # which the receive loop uses as a pause heuristic.
    await ws.send_json({"event": "mark", "streamSid": stream_sid, "mark": {"name": "bot_turn_end"}})
    print(f"[{stream_sid}] Finished sending bot's audio turn.")


# --- Application Entry Point ---
if __name__ == "__main__":
    import uvicorn

    # Hugging Face Spaces expects the app to run on port 7860
    port = int(os.environ.get("PORT", 7860))
    print(f"Starting RentBot server on host 0.0.0.0 and port {port}...")
    uvicorn.run(app, host="0.0.0.0", port=port)