diff --git "a/agent.py" "b/agent.py" --- "a/agent.py" +++ "b/agent.py" @@ -1,1205 +1,1205 @@ -import logging -import asyncio -import json -import os -from datetime import datetime -from zoneinfo import ZoneInfo -from typing import Annotated, Optional, AsyncIterable, Any, Dict -import random -import http.server -import socketserver -import threading - -from dotenv import load_dotenv -from livekit import rtc -from livekit.agents import ( - AutoSubscribe, - JobContext, - JobProcess, - WorkerOptions, - cli, - llm, - AgentSession, - metrics, - MetricsCollectedEvent, - Agent, -) -from livekit.agents.llm import function_tool -from livekit.agents.voice import ( - RunContext, - ModelSettings, -) -from livekit.plugins import openai, deepgram, cartesia, silero, groq - -# Groq SDK for summary generation -from groq import Groq as GroqClient - -# Monitoring and validation imports -import sentry_sdk -from logger import logger -from validators import validate_phone_number, validate_appointment_time, validate_purpose, validate_appointment_id - -# Try to import Beyond Presence plugin if available -try: - from livekit.plugins import bey - BEY_AVAILABLE = True -except ImportError: - BEY_AVAILABLE = False - logging.warning("Beyond Presence plugin not available. 
Install with: pip install \"livekit-agents[bey]\"") - -from db import Database - -load_dotenv() - -# Initialize Sentry for error tracking -if os.getenv("SENTRY_DSN"): - sentry_sdk.init( - dsn=os.getenv("SENTRY_DSN"), - traces_sample_rate=0.1, - environment=os.getenv("ENVIRONMENT", "production") - ) - print("✅ Sentry error tracking enabled") - -logger = logging.getLogger("voice-agent") -logger.setLevel(logging.INFO) - -# Suppress noisy logs from libraries -logging.getLogger("hpack").setLevel(logging.WARNING) -logging.getLogger("httpx").setLevel(logging.WARNING) -logging.getLogger("livekit").setLevel(logging.INFO) -logging.getLogger("urllib3").setLevel(logging.WARNING) - -def get_groq_api_key(): - """Rotate between multiple Groq API keys if available to avoid rate limits.""" - keys_str = os.getenv("GROQ_API_KEYS", "") - if keys_str: - keys = [k.strip() for k in keys_str.split(",") if k.strip()] - if keys: - chosen = random.choice(keys) - print(f"DEBUG: Selected Groq Key from list of {len(keys)}. Prefix: {chosen[:5]}...") - return chosen - - single_key = os.getenv("GROQ_API_KEY") - if single_key: - print(f"DEBUG: Using single GROQ_API_KEY. Prefix: {single_key[:5]}...") - return single_key - - print("DEBUG: No Groq API Key found!") - return None - -try: - from flagsmith import Flagsmith - flagsmith = Flagsmith(environment_key=os.getenv("FLAGSMITH_ENVIRONMENT_KEY", "default")) -except Exception: - flagsmith = None - -# ... (omitting lines for brevity) - - - - - -SYSTEM_PROMPT = """ -You are the SkyTask Clinic Assistant, a friendly and capable voice receptionist. - -# User: {name} | Status: {status} | Goal: {goal_instruction} -# Rules -- Voice response: Plain text only. Natural and polite. -- Be warm: Use "Good morning", "Thank you", "Please". -- Length: 1-3 sentences, but don't be robotic. -- Speak nums: "five five five". No emojis/markdown. -- Address user by name if known. -# Flow -1. Identify user (ask phone/name). -2. 
Tools: book_appointment, check_slots, retrieve_appointments, cancel/modify, summarize_call, end_conversation. - - STRICT: Only call these tools. Do NOT invent new tools. - - Do NOT speak tool names. Execute silently. - - summarize_call: When user asks "summarize" or "recap" - gives summary but continues call - - end_conversation: When user says "end call", "goodbye", "bye" - ends the call -3. Verify name mismatches. -# Guardrails -- Privacy protection active. -- Scope: Clinic appointments only. -""" - -class Assistant(Agent): - def __init__(self, db: Database, user_context: dict, room): - current_time_ist = datetime.now(ZoneInfo("Asia/Kolkata")).strftime("%Y-%m-%d %I:%M %p") - - # Initialize with Guest state - instructions = SYSTEM_PROMPT.format( - name="Guest", - status="Unidentified", - goal_instruction="Ask for their phone number (and name) to pull up their file. Say: 'Hi! I'm the clinic assistant. May I have your phone number to get started?'" - ) - instructions += f"\n\nCurrent time (IST): {current_time_ist}" - - super().__init__(instructions=instructions) - self.db = db - self.user_context = user_context - self.room = room - self.current_time_str = current_time_ist - self.should_disconnect = False - - # References needed for summary generation (set later in entrypoint) - self.usage_collector = None - self.assistant = None - self.start_time = datetime.now() - self.avatar_type = None - self.tts_provider = None - - # Prevent duplicate summaries - self.summary_generated = False - - # Listen for data messages from frontend (e.g., End Call button) - @room.on("data_received") - def on_data_received(data_packet): - try: - payload = data_packet.data.decode('utf-8') - data = json.loads(payload) - - if data.get("type") == "request_end_call": - logger.info("🔴 Frontend requested end call via button - triggering end_conversation") - # Trigger the end_conversation tool asynchronously - asyncio.create_task(self.end_conversation("User clicked End Call button")) - except 
Exception as e: - logger.warning(f"Error processing frontend data message: {e}") - - def update_instructions_with_name(self, name: str): - """Update the agent's instructions to include the user's name""" - try: - # Re-format with REAL name - new_instructions = SYSTEM_PROMPT.format( - name=name, - status="Authenticated", - goal_instruction=f"Help {name} with appointments. Address them as {name}." - ) - full_instructions = f"{new_instructions}\n\nCurrent time (IST): {self.current_time_str}" - - # Update the agent's instructions - self._instructions = full_instructions - - print(f"✅ Updated agent instructions with user name: {name}") - print(f"🔍 DEBUG - NEW PROMPT:\n{new_instructions}") - return True - except Exception as e: - print(f"Failed to update instructions: {e}") - return False - - # ... (omitting lines) ... - - @function_tool() - async def identify_user( - self, - contact_number: str - ): - """Identify the user by their phone number. Only call this when you have received a numeric phone number. - - Args: - contact_number: The user's contact phone number (e.g. 555-0101). Do not provide an empty string. - """ - if not contact_number or len(contact_number.strip()) < 3: - return "Error: A valid contact number is required to identify the user." 
- - try: - contact_number = validate_phone_number(contact_number) - except ValueError as e: - return f"Error: {str(e)}" - - await self._emit_frontend_event("identify_user", "started", {"contact_number": contact_number}) - logger.info(f"Identifying user with number: {contact_number}") - user = self.db.get_user(contact_number) - if not user: - user = self.db.create_user(contact_number) - is_new = True - else: - is_new = False - - self.user_context["contact_number"] = contact_number - self.user_context["user_name"] = user.get("name", "User") - - name = user.get('name', 'User') - - # Update the agent's instructions to include the user's name - self.update_instructions_with_name(name) - - # ALSO inject a system message into the chat context - # This ensures the LLM knows the name in the conversation history - if hasattr(self, 'chat_ctx') and self.chat_ctx: - try: - self.chat_ctx.items.append( - llm.ChatMessage( - role="system", - content=[f"IMPORTANT: The user's name is {name}. You MUST address them as {name} in all future responses. When they ask 'what's my name' or 'do you know my name', respond with 'Yes, {name}, your name is {name}.'"] - ) - ) - print(f"✅ Injected name '{name}' into chat context") - except Exception as e: - print(f"Could not inject into chat context: {e}") - - # Return a message that FORCES the agent to say the name immediately - result_msg = f"User identified successfully. Their name is {name}. You MUST immediately respond by saying: 'Great to meet you, {name}! How can I help you today?' Use their name {name} in your response right now." - await self._emit_frontend_event("identify_user", "success", result={"name": name, "is_new": is_new}) - return result_msg - - @function_tool() - async def verify_identity( - self, - contact_number: str, - stated_name: str - ): - """Verify the user's identity using both their phone number and stated name. - Use this when the user provides both pieces of information. 
- - Args: - contact_number: The user's phone number (numeric). - stated_name: The name the user introduced themselves with. - """ - if not contact_number or len(contact_number.strip()) < 3: - return "Error: A valid contact number is required." - - try: - contact_number = validate_phone_number(contact_number) - except ValueError as e: - return f"Error: {str(e)}" - - await self._emit_frontend_event("verify_identity", "started", {"contact_number": contact_number, "name": stated_name}) - logger.info(f"Verifying identity: {stated_name} with {contact_number}") - - user = self.db.get_user(contact_number) - - if not user: - # New user case with name provided - user = self.db.create_user(contact_number, name=stated_name) - is_new = True - db_name = stated_name - match = True - else: - is_new = False - db_name = user.get("name", "User") - # Simple fuzzy match check (case insensitive) - match = stated_name.lower() in db_name.lower() or db_name.lower() in stated_name.lower() - - self.user_context["contact_number"] = contact_number - self.user_context["user_name"] = db_name - - # Update system with the CORRECT name from DB (or new name) - self.update_instructions_with_name(db_name) - - if match: - # ALSO inject a system message into the chat context - # NOTE: Disabled - chat_ctx is read-only, agent instructions are sufficient - # if hasattr(self, 'chat_ctx') and self.chat_ctx: - # try: - # self.chat_ctx.items.append( - # llm.ChatMessage( - # role="system", - # content=[f"IMPORTANT: Identity verified. User is {db_name}. Address them as {db_name}."] - # ) - # ) - # except Exception: - # pass - - result_msg = f"Identity verified! The user is indeed {db_name}. Greet them naturally as {db_name}." - await self._emit_frontend_event("verify_identity", "success", result={"name": db_name, "match": True}) - return result_msg - else: - # Name mismatch logic - result_msg = f"Identity Mismatch Warning: The phone number belongs to '{db_name}', but user said '{stated_name}'. 
politely ask: 'I have this number registered under {db_name}. Are you {db_name}?'" - await self._emit_frontend_event("verify_identity", "warning", result={"db_name": db_name, "stated_name": stated_name, "match": False}) - return result_msg - - async def _emit_frontend_event(self, tool_name: str, status: str, args: dict = None, result: dict = None): - try: - payload = json.dumps({ - "type": "tool_call", - "tool": tool_name, - "status": status, - "args": args, - "result": result - }) - await self.room.local_participant.publish_data(payload, reliable=True) - except Exception as e: - logger.error(f"Failed to emit frontend event: {e}") - - @function_tool() - async def hello(self, response: str = ""): - """This tool is used for greetings. - - Args: - response: The greeting response. - """ - return "Hello! How can I help you today?" - - @function_tool() - async def identify_user( - self, - contact_number: str - ): - """Identify the user by their phone number. Only call this when you have received a numeric phone number. - - Args: - contact_number: The user's contact phone number (e.g. 555-0101). Do not provide an empty string. - """ - if not contact_number or len(contact_number.strip()) < 3: - return "Error: A valid contact number is required to identify the user." - - try: - contact_number = validate_phone_number(contact_number) - except ValueError as e: - return f"Error: {str(e)}" - - await self._emit_frontend_event("identify_user", "started", {"contact_number": contact_number}) - logger.info(f"Identifying user with number: {contact_number}") - user = self.db.get_user(contact_number) - if not user: - user = self.db.create_user(contact_number) - is_new = True - else: - is_new = False - - self.user_context["contact_number"] = contact_number - self.user_context["user_name"] = user.get("name", "User") - - # Helper comment: Name will now be picked up by the LLM from the tool return value - # and usage enforced by updated system prompts. - - result_msg = f"User identified. 
Name: {user.get('name')}. New user: {is_new}." - await self._emit_frontend_event("identify_user", "success", result={"name": user.get('name'), "is_new": is_new}) - return result_msg - - @function_tool() - async def fetch_slots(self, location: str): - """Fetch available appointment slots. - - Args: - location: The clinic location to check (e.g. 'main', 'downtown'). - """ - logger.info(f"Fetching available slots for {location}") - await self._emit_frontend_event("fetch_slots", "started", {"location": location}) - - # Use DB method to fetch slots (real or mock) - available_slots = self.db.get_available_slots() - slots_json = json.dumps(available_slots) - - await self._emit_frontend_event("fetch_slots", "success", result=available_slots) - return slots_json - - @function_tool() - async def book_appointment( - self, - time: str, - purpose: str - ): - """Book an appointment for the identified user. - - Args: - time: The ISO 8601 formatted date and time for the appointment. - purpose: Purpose of the appointment. - """ - await self._emit_frontend_event("book_appointment", "started", {"time": time, "purpose": purpose}) - contact_number = self.user_context.get("contact_number") - if not contact_number: - return "Error: User not identified. Please ask for phone number first." - - try: - contact_number = validate_phone_number(contact_number) - except ValueError as e: - return f"Error validation phone: {str(e)}" - - logger.info(f"Booking appointment for {contact_number} at {time}") - - is_available = self.db.check_slot_availability(datetime.fromisoformat(time)) - if not is_available: - return "Error: Slot not available." - - result = self.db.book_appointment(contact_number, time, purpose) - if result: - await self._emit_frontend_event("book_appointment", "success", result=result) - return f"Appointment booked successfully. ID: {result.get('id')}" - else: - await self._emit_frontend_event("book_appointment", "failed") - return "Failed to book appointment." 
- - @function_tool() - async def retrieve_appointments(self, user_confirmation: str): - """Retrieve past and upcoming appointments for the identified user. - - Args: - user_confirmation: The user's confirmation to see their appointments (e.g. 'show them', 'yes'). - """ - await self._emit_frontend_event("retrieve_appointments", "started") - contact_number = self.user_context.get("contact_number") - if not contact_number: - return "Error: User not identified." - - try: - contact_number = validate_phone_number(contact_number) - except ValueError as e: - return f"Error: {str(e)}" - - appointments = self.db.get_user_appointments(contact_number) - if not appointments: - await self._emit_frontend_event("retrieve_appointments", "success", result=[]) - return "No appointments found." - - await self._emit_frontend_event("retrieve_appointments", "success", result=appointments) - return json.dumps(appointments) - - @function_tool() - async def cancel_appointment( - self, - appointment_id: str - ): - """Cancel an appointment. - - Args: - appointment_id: The ID of the appointment to cancel. - """ - await self._emit_frontend_event("cancel_appointment", "started", {"appointment_id": appointment_id}) - success = self.db.cancel_appointment(appointment_id) - if success: - await self._emit_frontend_event("cancel_appointment", "success", result={"id": appointment_id}) - return "Appointment cancelled successfully." - else: - await self._emit_frontend_event("cancel_appointment", "failed") - return "Failed to cancel appointment." - - @function_tool() - async def modify_appointment( - self, - appointment_id: str, - new_time: str - ): - """Modify the date/time of an appointment. - - Args: - appointment_id: The ID of the appointment to modify. - new_time: The new ISO 8601 formatted date and time. 
- """ - await self._emit_frontend_event("modify_appointment", "started", {"appointment_id": appointment_id, "new_time": new_time}) - success = self.db.modify_appointment(appointment_id, new_time) - if success: - await self._emit_frontend_event("modify_appointment", "success", result={"id": appointment_id, "new_time": new_time}) - return "Appointment modified successfully." - else: - await self._emit_frontend_event("modify_appointment", "failed") - return "Failed to modify appointment." - - @function_tool() - async def summarize_call( - self, - request: Annotated[str, "User's request for summary"] = "summarize" - ) -> str: - """Provide a summary of the current call without ending it. - - Use this when the user asks for a summary but wants to continue the conversation. - Example triggers: "Can you summarize?", "What did we discuss?", "Recap please" - - Args: - request: The user's request for a summary (e.g., "summarize", "recap") - - Returns: - str: A spoken summary of the conversation so far. - """ - logger.info(f"Generating mid-call summary (not ending): {request}") - - # Get context and metrics - contact = self.user_context.get("contact_number") - if not contact: - return "So far, we've discussed your appointments. Is there anything else I can help you with?" 
- - # Collect usage metrics - summary = self.usage_collector.get_summary() - usage_stats = { - "stt_duration": summary.stt_audio_duration, - "llm_prompt_tokens": summary.llm_prompt_tokens, - "llm_completion_tokens": summary.llm_completion_tokens, - "tts_chars": summary.tts_characters_count - } - - duration = (datetime.now() - self.start_time).total_seconds() - user_name = self.user_context.get("user_name", "the patient") - - # Generate summary directly - try: - summary_data = await generate_and_save_summary( - self.db, - self.assistant.chat_ctx, - contact, - duration, - self.avatar_type, - self.tts_provider, - user_name, - usage_stats - ) - if summary_data and isinstance(summary_data, dict): - spoken_summary = summary_data.get("spoken_text", "So far, we've discussed your appointments.") - logger.info(f"Mid-call summary: {spoken_summary}") - return spoken_summary - except Exception as e: - logger.error(f"Failed to generate mid-call summary: {e}") - - return "So far, we've discussed your appointments. Is there anything else I can help you with?" - - @function_tool() - async def end_conversation(self, summary_request: str): - """End the current conversation session and generate a final summary. - - Args: - summary_request: The user's request to end or wrap up (e.g. 'bye', 'summarize', 'we're done'). - """ - logger.info("Ending conversation - generating summary first") - - # GUARD: Prevent duplicate summaries - if self.summary_generated: - logger.warning("Summary already generated - skipping duplicate generation") - return "Thank you for calling. Goodbye!" - - spoken_text = "Thank you for calling. Have a great day!" 
- summary_sent = False - - # Get context and metrics - contact = self.user_context.get("contact_number") - if contact: - # Collect usage metrics - summary = self.usage_collector.get_summary() - usage_stats = { - "stt_duration": summary.stt_audio_duration, - "llm_prompt_tokens": summary.llm_prompt_tokens, - "llm_completion_tokens": summary.llm_completion_tokens, - "tts_chars": summary.tts_characters_count - } - - duration = (datetime.now() - self.start_time).total_seconds() - user_name = self.user_context.get("user_name", "the patient") - - # Generate summary directly - try: - summary_data = await generate_and_save_summary( - self.db, - self.assistant.chat_ctx, - contact, - duration, - self.avatar_type, - self.tts_provider, - user_name, - usage_stats - ) - if summary_data and isinstance(summary_data, dict): - # 1. Get spoken summary - spoken_text = summary_data.get("spoken_text", spoken_text) - - # 2. Publish structured data to frontend - payload = json.dumps({ - "type": "summary", - "summary": summary_data - }) - await self.room.local_participant.publish_data(payload, reliable=True) - logger.info("Summary sent to frontend") - summary_sent = True - - # Mark summary as generated to prevent duplicates - self.summary_generated = True - - # CRITICAL: Send close_session to trigger auto-disconnect for voice UX - # Small delay to ensure summary is received first - await asyncio.sleep(0.1) - close_payload = json.dumps({"type": "close_session"}) - await self.room.local_participant.publish_data(close_payload, reliable=True) - logger.info("✅ close_session sent - UI will auto-disconnect") - - except Exception as e: - logger.error(f"Failed to process summary: {e}") - - # CRITICAL: If summary wasn't sent, send fallback with at least cost structure - if not summary_sent: - logger.warning("Sending fallback summary with cost placeholder") - fallback = { - "content": "Call ended. 
See cost breakdown below.", - "spoken_text": spoken_text, - "costs": {"stt": 0.0, "tts": 0.0, "llm": 0.0, "avatar": 0.0, "total": 0.0}, - "status": "fallback" - } - try: - payload = json.dumps({"type": "summary", "summary": fallback}) - await self.room.local_participant.publish_data(payload, reliable=True) - logger.info("Fallback summary sent to frontend") - except Exception as e: - logger.error(f"Failed to send fallback: {e}") - - # NOTE: Don't send close_session here - let frontend's 2-second timer handle disconnect - # This ensures the summary data channel message is received before disconnect - - # 4. Request disconnect implicitly by setting flag - # The session listener will handle the actual disconnect after speech ends - self.should_disconnect = True - logger.info("Disconnect requested - waiting for speech to finish") - - # Start safeguard immediately - asyncio.create_task(self.safeguard_disconnect()) - - # Return the simplified spoken text for the agent to say immediately - return spoken_text - - async def safeguard_disconnect(self): - """Force disconnect if normal flow fails.""" - logger.info("Safeguard: Timer started (10s)...") - await asyncio.sleep(10.0) - - state = self.room.connection_state - logger.info(f"Safeguard: Timeout reached. Room state is: {state}") - - if state == "connected": - logger.warning("Safeguard: Timed out. 
Sending close_session event.") - try: - payload = json.dumps({"type": "close_session"}) - await self.room.local_participant.publish_data(payload, reliable=True) - logger.info("Safeguard: close_session event sent.") - except Exception as e: - logger.warning(f"Safeguard: Failed to send event: {e}") - - await asyncio.sleep(3.0) # Give frontend more time to process - - if self.room.connection_state == "connected": - logger.warning("Safeguard: Force disconnecting room now.") - await self.room.disconnect() - else: - logger.info("Safeguard: Room already disconnected, taking no action.") - -def calculate_costs(duration_seconds: float, tts_chars: int, avatar_type: str, tts_provider: str, prompt_tokens: int = 0, completion_tokens: int = 0): - # Rates per unit - stt_rate = 0.006 # Deepgram Nova-2 ($0.006/min) - # Rates per unit (USD) - stt_rate = 0.006 # Deepgram Nova-2 ($0.006/min) - - # LLM Pricing: OpenAI GPT-OSS-120B (used for main conversation) - # Input: $0.15 / 1M tokens - # Output: $0.60 / 1M tokens - llm_rate_input = 0.15 / 1_000_000 - llm_rate_output = 0.60 / 1_000_000 - - # TTS Rates - if tts_provider == "cartesia": - tts_rate = 0.050 / 1000 # Cartesia (~$0.05/1k chars) - tts_label = "Cartesia" - elif tts_provider == "deepgram": - tts_rate = 0.015 / 1000 # Deepgram Aura ($0.015/1k chars) - tts_label = "Deepgram" - else: # Groq / Other - tts_rate = 0.000 # Assume Free/Included - tts_label = "Groq" - - # Avatar Rates - avatar_rate = 0.05 if avatar_type == 'bey' else 0 # Beyond Presence (~$0.05/min) - - # Calculate Standard Costs - stt_cost = (duration_seconds / 60) * stt_rate - tts_cost = tts_chars * tts_rate - - # Use real counts if provided, otherwise estimate (fallback) - if prompt_tokens == 0 and completion_tokens == 0: - # Usage estimates (simplified) - # Assume 150 words/min -> ~200 tokens/min input - estimated_input_tokens = (duration_seconds / 60) * 200 - estimated_output_tokens = (tts_chars / 4) # Rough char-to-token ratio - llm_cost = 
(estimated_input_tokens * llm_rate_input) + (estimated_output_tokens * llm_rate_output) - else: - llm_cost = (prompt_tokens * llm_rate_input) + (completion_tokens * llm_rate_output) - avatar_cost = (duration_seconds / 60) * avatar_rate - - total = stt_cost + tts_cost + llm_cost + avatar_cost - - # Log for debugging - logger.info(f"Cost calculation: duration={duration_seconds}s, tts_chars={tts_chars}, provider={tts_provider}") - logger.info(f"Costs: STT=${stt_cost:.6f}, TTS=${tts_cost:.6f}, LLM=${llm_cost:.6f}, Avatar=${avatar_cost:.6f}") - - return { - "stt": round(stt_cost, 6), - "tts": round(tts_cost, 6), - "llm": round(llm_cost, 6), - "avatar": round(avatar_cost, 6), - "total": round(total, 6), - "currency": "USD", - "labels": { - "tts": tts_label, - "stt": "Deepgram", - "llm": "Groq/OpenAI", - "avatar": "Beyond Presence" if avatar_type == 'bey' else "3D Avatar" - } - } - -async def generate_and_save_summary(db: Database, chat_ctx: llm.ChatContext, contact_number: str, duration: float, avatar_type: str, tts_provider: str, user_name: str = "the patient", usage_stats: dict = None) -> Optional[Dict[str, Any]]: - if not contact_number: - logger.warning("No contact number to save summary for.") - return - - logger.info("Generating conversation summary...") - - transcript = "" - - # Try to extract messages from chat context - try: - if hasattr(chat_ctx, 'items'): - items = chat_ctx.items - elif hasattr(chat_ctx, 'messages'): - items = chat_ctx.messages - else: - items = [] - - for item in items: - if isinstance(item, llm.ChatMessage): - role = item.role - content = item.content - if isinstance(content, list): - content = " ".join([str(c) for c in content]) - - if isinstance(content, str): - transcript += f"{role}: {content}\n" - except Exception as e: - logger.error(f"Error extracting transcript: {e}") - - # Calculate costs using official metrics if available, otherwise fallback - logger.info(f"Calculating costs with usage_stats: {usage_stats}") - if usage_stats: - 
tts_chars = usage_stats.get("tts_chars", 0) - prompt_tokens = usage_stats.get("llm_prompt_tokens", 0) - completion_tokens = usage_stats.get("llm_completion_tokens", 0) - costs = calculate_costs(duration, tts_chars, avatar_type, tts_provider, prompt_tokens, completion_tokens) - else: - # Fallback estimation - tts_chars = len(transcript) // 2 - costs = calculate_costs(duration, tts_chars, avatar_type, tts_provider) - - logger.info(f"Calculated costs: {costs}") - - prompt = ( - f"Summarize the conversation with {user_name} in JSON format.\n" - f"Transcript:\n{transcript}\n\n" - "CRITICAL: Use natural time formats like '9 AM' or '2:30 PM', NOT 'nine zero zero hours'\n" - "Return a valid JSON object with exactly two keys:\n" - "1. 'spoken': A 1-2 sentence spoken closing for TTS. Natural, human-like, polite. No special chars. Start with 'To recap,'.\n" - "2. 'written': A detailed bulleted summary for the user interface. Include topics, appointments booked, and outcome.\n" - "IMPORTANT: Ensure the JSON is valid. Do NOT use unescaped newlines in the 'written' string or 'spoken' string. Use \\n for line breaks.\n" - ) - - max_retries = 3 - retry_delay = 1 - - for attempt in range(max_retries): - try: - # Use Groq SDK directly instead of livekit wrapper for reliability - api_key = os.getenv("GROQ_API_KEY_SUMMARY") or get_groq_api_key() - client = GroqClient(api_key=api_key) - - # Use llama-3.3-70b-versatile for JSON reliability - response = client.chat.completions.create( - model="llama-3.3-70b-versatile", - messages=[ - {"role": "system", "content": "You are a helpful assistant. Output valid JSON only. 
Do not output markdown blocks."}, - {"role": "user", "content": prompt} - ], - temperature=0.7, - max_tokens=500 - ) - - full_response = response.choices[0].message.content - # Summary uses Llama-3.3-70B-Versatile - # Pricing: Input $0.59/1M, Output $0.79/1M - summary_input_cost = response.usage.prompt_tokens * (0.59 / 1_000_000) - summary_output_cost = response.usage.completion_tokens * (0.79 / 1_000_000) - summary_cost = summary_input_cost + summary_output_cost - - logger.info(f"🔍 RAW LLM RESPONSE: {full_response}") - logger.info(f"💰 Summary LLM cost: ${summary_cost:.6f} ({response.usage.prompt_tokens} + {response.usage.completion_tokens} tokens)") - - # Attempt to parse JSON - spoken = "To recap, we discussed your appointments. Have a great day!" - written = "" - - try: - # Clean up markdown code blocks if present - clean_json = full_response.replace("```json", "").replace("```", "").strip() - - # Regex heuristic to find the JSON object { ... } - import re - match = re.search(r"\{.*\}", clean_json, re.DOTALL) - if match: - clean_json = match.group(0) - - data = json.loads(clean_json) - spoken = data.get("spoken", spoken) - written = data.get("written", "") - - except (json.JSONDecodeError, AttributeError) as e: - logger.warning(f"Failed to parse JSON summary (standard): {e}. Retrying with Regex Fallback.") - # Fallback: Regex extraction for common invalid JSON issues (newlines in strings) - try: - import re - # Extract spoken - s_match = re.search(r'"spoken"\s*:\s*"(.*?)"', clean_json, re.DOTALL) - if s_match: - spoken = s_match.group(1) - - # Extract written (greedy to catch multi-line content) - w_match = re.search(r'"written"\s*:\s*"(.*?)(? str: + """Provide a summary of the current call without ending it. + + Use this when the user asks for a summary but wants to continue the conversation. 
+ Example triggers: "Can you summarize?", "What did we discuss?", "Recap please" + + Args: + request: The user's request for a summary (e.g., "summarize", "recap") + + Returns: + str: A spoken summary of the conversation so far. + """ + logger.info(f"Generating mid-call summary (not ending): {request}") + + # Get context and metrics + contact = self.user_context.get("contact_number") + if not contact: + return "So far, we've discussed your appointments. Is there anything else I can help you with?" + + # Collect usage metrics + summary = self.usage_collector.get_summary() + usage_stats = { + "stt_duration": summary.stt_audio_duration, + "llm_prompt_tokens": summary.llm_prompt_tokens, + "llm_completion_tokens": summary.llm_completion_tokens, + "tts_chars": summary.tts_characters_count + } + + duration = (datetime.now() - self.start_time).total_seconds() + user_name = self.user_context.get("user_name", "the patient") + + # Generate summary directly + try: + summary_data = await generate_and_save_summary( + self.db, + self.assistant.chat_ctx, + contact, + duration, + self.avatar_type, + self.tts_provider, + user_name, + usage_stats + ) + if summary_data and isinstance(summary_data, dict): + spoken_summary = summary_data.get("spoken_text", "So far, we've discussed your appointments.") + logger.info(f"Mid-call summary: {spoken_summary}") + return spoken_summary + except Exception as e: + logger.error(f"Failed to generate mid-call summary: {e}") + + return "So far, we've discussed your appointments. Is there anything else I can help you with?" + + @function_tool() + async def end_conversation(self, summary_request: str): + """End the current conversation session and generate a final summary. + + Args: + summary_request: The user's request to end or wrap up (e.g. 'bye', 'summarize', 'we're done'). 
+ """ + logger.info("Ending conversation - generating summary first") + + # GUARD: Prevent duplicate summaries + if self.summary_generated: + logger.warning("Summary already generated - skipping duplicate generation") + return "Thank you for calling. Goodbye!" + + spoken_text = "Thank you for calling. Have a great day!" + summary_sent = False + + # Get context and metrics + contact = self.user_context.get("contact_number") + if contact: + # Collect usage metrics + summary = self.usage_collector.get_summary() + usage_stats = { + "stt_duration": summary.stt_audio_duration, + "llm_prompt_tokens": summary.llm_prompt_tokens, + "llm_completion_tokens": summary.llm_completion_tokens, + "tts_chars": summary.tts_characters_count + } + + duration = (datetime.now() - self.start_time).total_seconds() + user_name = self.user_context.get("user_name", "the patient") + + # Generate summary directly + try: + summary_data = await generate_and_save_summary( + self.db, + self.assistant.chat_ctx, + contact, + duration, + self.avatar_type, + self.tts_provider, + user_name, + usage_stats + ) + if summary_data and isinstance(summary_data, dict): + # 1. Get spoken summary + spoken_text = summary_data.get("spoken_text", spoken_text) + + # 2. 
Publish structured data to frontend + payload = json.dumps({ + "type": "summary", + "summary": summary_data + }) + await self.room.local_participant.publish_data(payload, reliable=True) + logger.info("Summary sent to frontend") + summary_sent = True + + # Mark summary as generated to prevent duplicates + self.summary_generated = True + + # CRITICAL: Send close_session to trigger auto-disconnect for voice UX + # Small delay to ensure summary is received first + await asyncio.sleep(0.1) + close_payload = json.dumps({"type": "close_session"}) + await self.room.local_participant.publish_data(close_payload, reliable=True) + logger.info("✅ close_session sent - UI will auto-disconnect") + + except Exception as e: + logger.error(f"Failed to process summary: {e}") + + # CRITICAL: If summary wasn't sent, send fallback with at least cost structure + if not summary_sent: + logger.warning("Sending fallback summary with cost placeholder") + fallback = { + "content": "Call ended. See cost breakdown below.", + "spoken_text": spoken_text, + "costs": {"stt": 0.0, "tts": 0.0, "llm": 0.0, "avatar": 0.0, "total": 0.0}, + "status": "fallback" + } + try: + payload = json.dumps({"type": "summary", "summary": fallback}) + await self.room.local_participant.publish_data(payload, reliable=True) + logger.info("Fallback summary sent to frontend") + except Exception as e: + logger.error(f"Failed to send fallback: {e}") + + # NOTE: Don't send close_session here - let frontend's 2-second timer handle disconnect + # This ensures the summary data channel message is received before disconnect + + # 4. 
Request disconnect implicitly by setting flag + # The session listener will handle the actual disconnect after speech ends + self.should_disconnect = True + logger.info("Disconnect requested - waiting for speech to finish") + + # Start safeguard immediately + asyncio.create_task(self.safeguard_disconnect()) + + # Return the simplified spoken text for the agent to say immediately + return spoken_text + + async def safeguard_disconnect(self): + """Force disconnect if normal flow fails.""" + logger.info("Safeguard: Timer started (10s)...") + await asyncio.sleep(10.0) + + state = self.room.connection_state + logger.info(f"Safeguard: Timeout reached. Room state is: {state}") + + if state == "connected": + logger.warning("Safeguard: Timed out. Sending close_session event.") + try: + payload = json.dumps({"type": "close_session"}) + await self.room.local_participant.publish_data(payload, reliable=True) + logger.info("Safeguard: close_session event sent.") + except Exception as e: + logger.warning(f"Safeguard: Failed to send event: {e}") + + await asyncio.sleep(3.0) # Give frontend more time to process + + if self.room.connection_state == "connected": + logger.warning("Safeguard: Force disconnecting room now.") + await self.room.disconnect() + else: + logger.info("Safeguard: Room already disconnected, taking no action.") + +def calculate_costs(duration_seconds: float, tts_chars: int, avatar_type: str, tts_provider: str, prompt_tokens: int = 0, completion_tokens: int = 0): + # Rates per unit + stt_rate = 0.006 # Deepgram Nova-2 ($0.006/min) + # Rates per unit (USD) + stt_rate = 0.006 # Deepgram Nova-2 ($0.006/min) + + # LLM Pricing: OpenAI GPT-OSS-120B (used for main conversation) + # Input: $0.15 / 1M tokens + # Output: $0.60 / 1M tokens + llm_rate_input = 0.15 / 1_000_000 + llm_rate_output = 0.60 / 1_000_000 + + # TTS Rates + if tts_provider == "cartesia": + tts_rate = 0.050 / 1000 # Cartesia (~$0.05/1k chars) + tts_label = "Cartesia" + elif tts_provider == "deepgram": + 
def calculate_costs(duration_seconds: float, tts_chars: int, avatar_type: str, tts_provider: str, prompt_tokens: int = 0, completion_tokens: int = 0):
    """Estimate the USD cost of a call across STT, TTS, LLM and avatar usage.

    Args:
        duration_seconds: Total call duration in seconds.
        tts_chars: Number of characters synthesized by TTS.
        avatar_type: 'bey' for Beyond Presence (billed per minute), anything
            else is treated as the free 3D avatar.
        tts_provider: 'cartesia', 'deepgram', or other (treated as Groq/free).
        prompt_tokens: Real LLM prompt token count; 0 triggers estimation.
        completion_tokens: Real LLM completion token count; 0 triggers estimation.

    Returns:
        dict: Per-component costs rounded to 6 decimals ('stt', 'tts', 'llm',
        'avatar', 'total'), plus 'currency' and display 'labels'.
    """
    # Rates per unit (USD)
    # FIX: stt_rate was assigned twice with duplicated comments (copy-paste
    # residue); a single assignment is kept.
    stt_rate = 0.006  # Deepgram Nova-2 ($0.006/min)

    # LLM Pricing: OpenAI GPT-OSS-120B (used for main conversation)
    # Input: $0.15 / 1M tokens
    # Output: $0.60 / 1M tokens
    llm_rate_input = 0.15 / 1_000_000
    llm_rate_output = 0.60 / 1_000_000

    # TTS Rates
    if tts_provider == "cartesia":
        tts_rate = 0.050 / 1000  # Cartesia (~$0.05/1k chars)
        tts_label = "Cartesia"
    elif tts_provider == "deepgram":
        tts_rate = 0.015 / 1000  # Deepgram Aura ($0.015/1k chars)
        tts_label = "Deepgram"
    else:  # Groq / Other
        tts_rate = 0.000  # Assume Free/Included
        tts_label = "Groq"

    # Avatar Rates
    avatar_rate = 0.05 if avatar_type == 'bey' else 0  # Beyond Presence (~$0.05/min)

    # Calculate Standard Costs
    stt_cost = (duration_seconds / 60) * stt_rate
    tts_cost = tts_chars * tts_rate

    # Use real counts if provided, otherwise estimate (fallback)
    if prompt_tokens == 0 and completion_tokens == 0:
        # Assume 150 words/min -> ~200 tokens/min input
        estimated_input_tokens = (duration_seconds / 60) * 200
        estimated_output_tokens = (tts_chars / 4)  # Rough char-to-token ratio
        llm_cost = (estimated_input_tokens * llm_rate_input) + (estimated_output_tokens * llm_rate_output)
    else:
        llm_cost = (prompt_tokens * llm_rate_input) + (completion_tokens * llm_rate_output)
    avatar_cost = (duration_seconds / 60) * avatar_rate

    total = stt_cost + tts_cost + llm_cost + avatar_cost

    # Log for debugging
    logger.info(f"Cost calculation: duration={duration_seconds}s, tts_chars={tts_chars}, provider={tts_provider}")
    logger.info(f"Costs: STT=${stt_cost:.6f}, TTS=${tts_cost:.6f}, LLM=${llm_cost:.6f}, Avatar=${avatar_cost:.6f}")

    return {
        "stt": round(stt_cost, 6),
        "tts": round(tts_cost, 6),
        "llm": round(llm_cost, 6),
        "avatar": round(avatar_cost, 6),
        "total": round(total, 6),
        "currency": "USD",
        "labels": {
            "tts": tts_label,
            "stt": "Deepgram",
            "llm": "Groq/OpenAI",
            "avatar": "Beyond Presence" if avatar_type == 'bey' else "3D Avatar"
        }
    }
summary...") + + transcript = "" + + # Try to extract messages from chat context + try: + if hasattr(chat_ctx, 'items'): + items = chat_ctx.items + elif hasattr(chat_ctx, 'messages'): + items = chat_ctx.messages + else: + items = [] + + for item in items: + if isinstance(item, llm.ChatMessage): + role = item.role + content = item.content + if isinstance(content, list): + content = " ".join([str(c) for c in content]) + + if isinstance(content, str): + transcript += f"{role}: {content}\n" + except Exception as e: + logger.error(f"Error extracting transcript: {e}") + + # Calculate costs using official metrics if available, otherwise fallback + logger.info(f"Calculating costs with usage_stats: {usage_stats}") + if usage_stats: + tts_chars = usage_stats.get("tts_chars", 0) + prompt_tokens = usage_stats.get("llm_prompt_tokens", 0) + completion_tokens = usage_stats.get("llm_completion_tokens", 0) + costs = calculate_costs(duration, tts_chars, avatar_type, tts_provider, prompt_tokens, completion_tokens) + else: + # Fallback estimation + tts_chars = len(transcript) // 2 + costs = calculate_costs(duration, tts_chars, avatar_type, tts_provider) + + logger.info(f"Calculated costs: {costs}") + + prompt = ( + f"Summarize the conversation with {user_name} in JSON format.\n" + f"Transcript:\n{transcript}\n\n" + "CRITICAL: Use natural time formats like '9 AM' or '2:30 PM', NOT 'nine zero zero hours'\n" + "Return a valid JSON object with exactly two keys:\n" + "1. 'spoken': A 1-2 sentence spoken closing for TTS. Natural, human-like, polite. No special chars. Start with 'To recap,'.\n" + "2. 'written': A detailed bulleted summary for the user interface. Include topics, appointments booked, and outcome.\n" + "IMPORTANT: Ensure the JSON is valid. Do NOT use unescaped newlines in the 'written' string or 'spoken' string. 
Use \\n for line breaks.\n" + ) + + max_retries = 3 + retry_delay = 1 + + for attempt in range(max_retries): + try: + # Use Groq SDK directly instead of livekit wrapper for reliability + api_key = os.getenv("GROQ_API_KEY_SUMMARY") or get_groq_api_key() + client = GroqClient(api_key=api_key) + + # Use llama-3.3-70b-versatile for JSON reliability + response = client.chat.completions.create( + model="llama-3.3-70b-versatile", + messages=[ + {"role": "system", "content": "You are a helpful assistant. Output valid JSON only. Do not output markdown blocks."}, + {"role": "user", "content": prompt} + ], + temperature=0.7, + max_tokens=500 + ) + + full_response = response.choices[0].message.content + # Summary uses Llama-3.3-70B-Versatile + # Pricing: Input $0.59/1M, Output $0.79/1M + summary_input_cost = response.usage.prompt_tokens * (0.59 / 1_000_000) + summary_output_cost = response.usage.completion_tokens * (0.79 / 1_000_000) + summary_cost = summary_input_cost + summary_output_cost + + logger.info(f"🔍 RAW LLM RESPONSE: {full_response}") + logger.info(f"💰 Summary LLM cost: ${summary_cost:.6f} ({response.usage.prompt_tokens} + {response.usage.completion_tokens} tokens)") + + # Attempt to parse JSON + spoken = "To recap, we discussed your appointments. Have a great day!" + written = "" + + try: + # Clean up markdown code blocks if present + clean_json = full_response.replace("```json", "").replace("```", "").strip() + + # Regex heuristic to find the JSON object { ... } + import re + match = re.search(r"\{.*\}", clean_json, re.DOTALL) + if match: + clean_json = match.group(0) + + data = json.loads(clean_json) + spoken = data.get("spoken", spoken) + written = data.get("written", "") + + except (json.JSONDecodeError, AttributeError) as e: + logger.warning(f"Failed to parse JSON summary (standard): {e}. 
Retrying with Regex Fallback.") + # Fallback: Regex extraction for common invalid JSON issues (newlines in strings) + try: + import re + # Extract spoken + s_match = re.search(r'"spoken"\s*:\s*"(.*?)"', clean_json, re.DOTALL) + if s_match: + spoken = s_match.group(1) + + # Extract written (greedy to catch multi-line content) + w_match = re.search(r'"written"\s*:\s*"(.*?)(?