NEXAS committed on
Commit
b3cb0b5
·
verified ·
1 Parent(s): 821dbdf

Upload 12 files

Browse files
Files changed (12) hide show
  1. Dockerfile +6 -0
  2. agent.py +1205 -0
  3. cache.py +87 -0
  4. db.py +358 -0
  5. extras/debug_chat_ctx.py +27 -0
  6. extras/debug_chat_ctx_v2.py +48 -0
  7. extras/health.py +70 -0
  8. extras/test_groq.py +63 -0
  9. logger.py +69 -0
  10. pinger.py +26 -0
  11. requirements.txt +12 -0
  12. validators.py +98 -0
# Minimal runtime image for the LiveKit voice agent.
FROM python:3.11-slim
WORKDIR /app
# Copy the dependency manifest first so the pip layer is cached
# across source-code-only changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# "start" is the livekit-agents CLI subcommand that runs the worker.
CMD ["python", "agent.py", "start"]
agent.py ADDED
@@ -0,0 +1,1205 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ import asyncio
3
+ import json
4
+ import os
5
+ from datetime import datetime
6
+ from zoneinfo import ZoneInfo
7
+ from typing import Annotated, Optional, AsyncIterable, Any, Dict
8
+ import random
9
+ import http.server
10
+ import socketserver
11
+ import threading
12
+
13
+ from dotenv import load_dotenv
14
+ from livekit import rtc
15
+ from livekit.agents import (
16
+ AutoSubscribe,
17
+ JobContext,
18
+ JobProcess,
19
+ WorkerOptions,
20
+ cli,
21
+ llm,
22
+ AgentSession,
23
+ metrics,
24
+ MetricsCollectedEvent,
25
+ Agent,
26
+ )
27
+ from livekit.agents.llm import function_tool
28
+ from livekit.agents.voice import (
29
+ RunContext,
30
+ ModelSettings,
31
+ )
32
+ from livekit.plugins import openai, deepgram, cartesia, silero, groq
33
+
34
+ # Groq SDK for summary generation
35
+ from groq import Groq as GroqClient
36
+
37
+ # Monitoring and validation imports
38
+ import sentry_sdk
39
+ from logger import logger
40
+ from validators import validate_phone_number, validate_appointment_time, validate_purpose, validate_appointment_id
41
+
42
+ # Try to import Beyond Presence plugin if available
43
+ try:
44
+ from livekit.plugins import bey
45
+ BEY_AVAILABLE = True
46
+ except ImportError:
47
+ BEY_AVAILABLE = False
48
+ logging.warning("Beyond Presence plugin not available. Install with: pip install \"livekit-agents[bey]\"")
49
+
50
+ from db import Database
51
+
52
# Load environment variables from .env before any config below is read.
load_dotenv()

# Initialize Sentry for error tracking (only when a DSN is configured).
if os.getenv("SENTRY_DSN"):
    sentry_sdk.init(
        dsn=os.getenv("SENTRY_DSN"),
        traces_sample_rate=0.1,  # sample 10% of transactions for tracing
        environment=os.getenv("ENVIRONMENT", "production")
    )
    print("✅ Sentry error tracking enabled")

# NOTE(review): this rebinding shadows the `logger` imported from the local
# `logger` module above — confirm which logger the rest of the file intends.
logger = logging.getLogger("voice-agent")
logger.setLevel(logging.INFO)

# Suppress noisy logs from libraries
logging.getLogger("hpack").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("livekit").setLevel(logging.INFO)
logging.getLogger("urllib3").setLevel(logging.WARNING)
71
+
72
def get_groq_api_key():
    """Pick a Groq API key for this request.

    If GROQ_API_KEYS holds a comma-separated list, one entry is chosen at
    random (spreads traffic across keys to dodge per-key rate limits);
    otherwise the single GROQ_API_KEY is used.

    Returns:
        The selected key string, or None when nothing is configured.
    """
    pool = [part.strip() for part in os.getenv("GROQ_API_KEYS", "").split(",") if part.strip()]
    if pool:
        chosen = random.choice(pool)
        print(f"DEBUG: Selected Groq Key from list of {len(pool)}. Prefix: {chosen[:5]}...")
        return chosen

    single_key = os.getenv("GROQ_API_KEY")
    if single_key:
        print(f"DEBUG: Using single GROQ_API_KEY. Prefix: {single_key[:5]}...")
        return single_key

    print("DEBUG: No Groq API Key found!")
    return None
89
+
90
# Optional feature-flag client. Flagsmith is a soft dependency: any import or
# initialization failure degrades to `flagsmith = None` rather than crashing.
try:
    from flagsmith import Flagsmith
    flagsmith = Flagsmith(environment_key=os.getenv("FLAGSMITH_ENVIRONMENT_KEY", "default"))
except Exception:
    flagsmith = None
95
+
96
+ # ... (omitting lines for brevity)
97
+
98
+
99
+
100
+
101
+
102
# Base system prompt for the voice assistant. The {name}, {status} and
# {goal_instruction} placeholders are filled via .format() in Assistant.__init__
# (guest state) and update_instructions_with_name (authenticated state).
SYSTEM_PROMPT = """
You are the SkyTask Clinic Assistant, a friendly and capable voice receptionist.

# User: {name} | Status: {status} | Goal: {goal_instruction}
# Rules
- Voice response: Plain text only. Natural and polite.
- Be warm: Use "Good morning", "Thank you", "Please".
- Length: 1-3 sentences, but don't be robotic.
- Speak nums: "five five five". No emojis/markdown.
- Address user by name if known.
# Flow
1. Identify user (ask phone/name).
2. Tools: book_appointment, check_slots, retrieve_appointments, cancel/modify, summarize_call, end_conversation.
- STRICT: Only call these tools. Do NOT invent new tools.
- Do NOT speak tool names. Execute silently.
- summarize_call: When user asks "summarize" or "recap" - gives summary but continues call
- end_conversation: When user says "end call", "goodbye", "bye" - ends the call
3. Verify name mismatches.
# Guardrails
- Privacy protection active.
- Scope: Clinic appointments only.
"""
124
+
125
class Assistant(Agent):
    def __init__(self, db: Database, user_context: dict, room):
        """Voice receptionist agent.

        Args:
            db: Database facade for users/appointments.
            user_context: Mutable dict shared with the entrypoint; tools write
                "contact_number" and "user_name" into it after identification.
            room: The LiveKit room used for data-channel messages to the frontend.
        """
        current_time_ist = datetime.now(ZoneInfo("Asia/Kolkata")).strftime("%Y-%m-%d %I:%M %p")

        # Initialize with Guest state
        instructions = SYSTEM_PROMPT.format(
            name="Guest",
            status="Unidentified",
            goal_instruction="Ask for their phone number (and name) to pull up their file. Say: 'Hi! I'm the clinic assistant. May I have your phone number to get started?'"
        )
        instructions += f"\n\nCurrent time (IST): {current_time_ist}"

        super().__init__(instructions=instructions)
        self.db = db
        self.user_context = user_context
        self.room = room
        self.current_time_str = current_time_ist
        self.should_disconnect = False

        # References needed for summary generation (set later in entrypoint)
        self.usage_collector = None
        self.assistant = None
        self.start_time = datetime.now()
        self.avatar_type = None
        self.tts_provider = None

        # Prevent duplicate summaries
        self.summary_generated = False

        # Listen for data messages from frontend (e.g., End Call button)
        @room.on("data_received")
        def on_data_received(data_packet):
            try:
                payload = data_packet.data.decode('utf-8')
                data = json.loads(payload)

                if data.get("type") == "request_end_call":
                    logger.info("🔴 Frontend requested end call via button - triggering end_conversation")
                    # Trigger the end_conversation tool asynchronously
                    asyncio.create_task(self.end_conversation("User clicked End Call button"))
            except Exception as e:
                # Best-effort: a malformed frontend message must never kill the agent.
                logger.warning(f"Error processing frontend data message: {e}")
167
+
168
+ def update_instructions_with_name(self, name: str):
169
+ """Update the agent's instructions to include the user's name"""
170
+ try:
171
+ # Re-format with REAL name
172
+ new_instructions = SYSTEM_PROMPT.format(
173
+ name=name,
174
+ status="Authenticated",
175
+ goal_instruction=f"Help {name} with appointments. Address them as {name}."
176
+ )
177
+ full_instructions = f"{new_instructions}\n\nCurrent time (IST): {self.current_time_str}"
178
+
179
+ # Update the agent's instructions
180
+ self._instructions = full_instructions
181
+
182
+ print(f"✅ Updated agent instructions with user name: {name}")
183
+ print(f"🔍 DEBUG - NEW PROMPT:\n{new_instructions}")
184
+ return True
185
+ except Exception as e:
186
+ print(f"Failed to update instructions: {e}")
187
+ return False
188
+
189
+ # ... (omitting lines) ...
190
+
191
    @function_tool()
    async def identify_user(
        self,
        contact_number: str
    ):
        """Identify the user by their phone number. Only call this when you have received a numeric phone number.

        Args:
            contact_number: The user's contact phone number (e.g. 555-0101). Do not provide an empty string.
        """
        # NOTE(review): a second identify_user is defined later in this class.
        # Python keeps only the last definition, so this version is DEAD CODE —
        # decide which variant is wanted and delete the other.
        if not contact_number or len(contact_number.strip()) < 3:
            return "Error: A valid contact number is required to identify the user."

        try:
            contact_number = validate_phone_number(contact_number)
        except ValueError as e:
            return f"Error: {str(e)}"

        await self._emit_frontend_event("identify_user", "started", {"contact_number": contact_number})
        logger.info(f"Identifying user with number: {contact_number}")
        user = self.db.get_user(contact_number)
        if not user:
            # Unknown number: auto-register a new user record.
            user = self.db.create_user(contact_number)
            is_new = True
        else:
            is_new = False

        self.user_context["contact_number"] = contact_number
        self.user_context["user_name"] = user.get("name", "User")

        name = user.get('name', 'User')

        # Update the agent's instructions to include the user's name
        self.update_instructions_with_name(name)

        # ALSO inject a system message into the chat context
        # This ensures the LLM knows the name in the conversation history
        if hasattr(self, 'chat_ctx') and self.chat_ctx:
            try:
                self.chat_ctx.items.append(
                    llm.ChatMessage(
                        role="system",
                        content=[f"IMPORTANT: The user's name is {name}. You MUST address them as {name} in all future responses. When they ask 'what's my name' or 'do you know my name', respond with 'Yes, {name}, your name is {name}.'"]
                    )
                )
                print(f"✅ Injected name '{name}' into chat context")
            except Exception as e:
                # chat_ctx may be read-only depending on livekit-agents version.
                print(f"Could not inject into chat context: {e}")

        # Return a message that FORCES the agent to say the name immediately
        result_msg = f"User identified successfully. Their name is {name}. You MUST immediately respond by saying: 'Great to meet you, {name}! How can I help you today?' Use their name {name} in your response right now."
        await self._emit_frontend_event("identify_user", "success", result={"name": name, "is_new": is_new})
        return result_msg
244
+
245
+ @function_tool()
246
+ async def verify_identity(
247
+ self,
248
+ contact_number: str,
249
+ stated_name: str
250
+ ):
251
+ """Verify the user's identity using both their phone number and stated name.
252
+ Use this when the user provides both pieces of information.
253
+
254
+ Args:
255
+ contact_number: The user's phone number (numeric).
256
+ stated_name: The name the user introduced themselves with.
257
+ """
258
+ if not contact_number or len(contact_number.strip()) < 3:
259
+ return "Error: A valid contact number is required."
260
+
261
+ try:
262
+ contact_number = validate_phone_number(contact_number)
263
+ except ValueError as e:
264
+ return f"Error: {str(e)}"
265
+
266
+ await self._emit_frontend_event("verify_identity", "started", {"contact_number": contact_number, "name": stated_name})
267
+ logger.info(f"Verifying identity: {stated_name} with {contact_number}")
268
+
269
+ user = self.db.get_user(contact_number)
270
+
271
+ if not user:
272
+ # New user case with name provided
273
+ user = self.db.create_user(contact_number, name=stated_name)
274
+ is_new = True
275
+ db_name = stated_name
276
+ match = True
277
+ else:
278
+ is_new = False
279
+ db_name = user.get("name", "User")
280
+ # Simple fuzzy match check (case insensitive)
281
+ match = stated_name.lower() in db_name.lower() or db_name.lower() in stated_name.lower()
282
+
283
+ self.user_context["contact_number"] = contact_number
284
+ self.user_context["user_name"] = db_name
285
+
286
+ # Update system with the CORRECT name from DB (or new name)
287
+ self.update_instructions_with_name(db_name)
288
+
289
+ if match:
290
+ # ALSO inject a system message into the chat context
291
+ # NOTE: Disabled - chat_ctx is read-only, agent instructions are sufficient
292
+ # if hasattr(self, 'chat_ctx') and self.chat_ctx:
293
+ # try:
294
+ # self.chat_ctx.items.append(
295
+ # llm.ChatMessage(
296
+ # role="system",
297
+ # content=[f"IMPORTANT: Identity verified. User is {db_name}. Address them as {db_name}."]
298
+ # )
299
+ # )
300
+ # except Exception:
301
+ # pass
302
+
303
+ result_msg = f"Identity verified! The user is indeed {db_name}. Greet them naturally as {db_name}."
304
+ await self._emit_frontend_event("verify_identity", "success", result={"name": db_name, "match": True})
305
+ return result_msg
306
+ else:
307
+ # Name mismatch logic
308
+ result_msg = f"Identity Mismatch Warning: The phone number belongs to '{db_name}', but user said '{stated_name}'. politely ask: 'I have this number registered under {db_name}. Are you {db_name}?'"
309
+ await self._emit_frontend_event("verify_identity", "warning", result={"db_name": db_name, "stated_name": stated_name, "match": False})
310
+ return result_msg
311
+
312
+ async def _emit_frontend_event(self, tool_name: str, status: str, args: dict = None, result: dict = None):
313
+ try:
314
+ payload = json.dumps({
315
+ "type": "tool_call",
316
+ "tool": tool_name,
317
+ "status": status,
318
+ "args": args,
319
+ "result": result
320
+ })
321
+ await self.room.local_participant.publish_data(payload, reliable=True)
322
+ except Exception as e:
323
+ logger.error(f"Failed to emit frontend event: {e}")
324
+
325
+ @function_tool()
326
+ async def hello(self, response: str = ""):
327
+ """This tool is used for greetings.
328
+
329
+ Args:
330
+ response: The greeting response.
331
+ """
332
+ return "Hello! How can I help you today?"
333
+
334
+ @function_tool()
335
+ async def identify_user(
336
+ self,
337
+ contact_number: str
338
+ ):
339
+ """Identify the user by their phone number. Only call this when you have received a numeric phone number.
340
+
341
+ Args:
342
+ contact_number: The user's contact phone number (e.g. 555-0101). Do not provide an empty string.
343
+ """
344
+ if not contact_number or len(contact_number.strip()) < 3:
345
+ return "Error: A valid contact number is required to identify the user."
346
+
347
+ try:
348
+ contact_number = validate_phone_number(contact_number)
349
+ except ValueError as e:
350
+ return f"Error: {str(e)}"
351
+
352
+ await self._emit_frontend_event("identify_user", "started", {"contact_number": contact_number})
353
+ logger.info(f"Identifying user with number: {contact_number}")
354
+ user = self.db.get_user(contact_number)
355
+ if not user:
356
+ user = self.db.create_user(contact_number)
357
+ is_new = True
358
+ else:
359
+ is_new = False
360
+
361
+ self.user_context["contact_number"] = contact_number
362
+ self.user_context["user_name"] = user.get("name", "User")
363
+
364
+ # Helper comment: Name will now be picked up by the LLM from the tool return value
365
+ # and usage enforced by updated system prompts.
366
+
367
+ result_msg = f"User identified. Name: {user.get('name')}. New user: {is_new}."
368
+ await self._emit_frontend_event("identify_user", "success", result={"name": user.get('name'), "is_new": is_new})
369
+ return result_msg
370
+
371
+ @function_tool()
372
+ async def fetch_slots(self, location: str):
373
+ """Fetch available appointment slots.
374
+
375
+ Args:
376
+ location: The clinic location to check (e.g. 'main', 'downtown').
377
+ """
378
+ logger.info(f"Fetching available slots for {location}")
379
+ await self._emit_frontend_event("fetch_slots", "started", {"location": location})
380
+
381
+ # Use DB method to fetch slots (real or mock)
382
+ available_slots = self.db.get_available_slots()
383
+ slots_json = json.dumps(available_slots)
384
+
385
+ await self._emit_frontend_event("fetch_slots", "success", result=available_slots)
386
+ return slots_json
387
+
388
+ @function_tool()
389
+ async def book_appointment(
390
+ self,
391
+ time: str,
392
+ purpose: str
393
+ ):
394
+ """Book an appointment for the identified user.
395
+
396
+ Args:
397
+ time: The ISO 8601 formatted date and time for the appointment.
398
+ purpose: Purpose of the appointment.
399
+ """
400
+ await self._emit_frontend_event("book_appointment", "started", {"time": time, "purpose": purpose})
401
+ contact_number = self.user_context.get("contact_number")
402
+ if not contact_number:
403
+ return "Error: User not identified. Please ask for phone number first."
404
+
405
+ try:
406
+ contact_number = validate_phone_number(contact_number)
407
+ except ValueError as e:
408
+ return f"Error validation phone: {str(e)}"
409
+
410
+ logger.info(f"Booking appointment for {contact_number} at {time}")
411
+
412
+ is_available = self.db.check_slot_availability(datetime.fromisoformat(time))
413
+ if not is_available:
414
+ return "Error: Slot not available."
415
+
416
+ result = self.db.book_appointment(contact_number, time, purpose)
417
+ if result:
418
+ await self._emit_frontend_event("book_appointment", "success", result=result)
419
+ return f"Appointment booked successfully. ID: {result.get('id')}"
420
+ else:
421
+ await self._emit_frontend_event("book_appointment", "failed")
422
+ return "Failed to book appointment."
423
+
424
+ @function_tool()
425
+ async def retrieve_appointments(self, user_confirmation: str):
426
+ """Retrieve past and upcoming appointments for the identified user.
427
+
428
+ Args:
429
+ user_confirmation: The user's confirmation to see their appointments (e.g. 'show them', 'yes').
430
+ """
431
+ await self._emit_frontend_event("retrieve_appointments", "started")
432
+ contact_number = self.user_context.get("contact_number")
433
+ if not contact_number:
434
+ return "Error: User not identified."
435
+
436
+ try:
437
+ contact_number = validate_phone_number(contact_number)
438
+ except ValueError as e:
439
+ return f"Error: {str(e)}"
440
+
441
+ appointments = self.db.get_user_appointments(contact_number)
442
+ if not appointments:
443
+ await self._emit_frontend_event("retrieve_appointments", "success", result=[])
444
+ return "No appointments found."
445
+
446
+ await self._emit_frontend_event("retrieve_appointments", "success", result=appointments)
447
+ return json.dumps(appointments)
448
+
449
+ @function_tool()
450
+ async def cancel_appointment(
451
+ self,
452
+ appointment_id: str
453
+ ):
454
+ """Cancel an appointment.
455
+
456
+ Args:
457
+ appointment_id: The ID of the appointment to cancel.
458
+ """
459
+ await self._emit_frontend_event("cancel_appointment", "started", {"appointment_id": appointment_id})
460
+ success = self.db.cancel_appointment(appointment_id)
461
+ if success:
462
+ await self._emit_frontend_event("cancel_appointment", "success", result={"id": appointment_id})
463
+ return "Appointment cancelled successfully."
464
+ else:
465
+ await self._emit_frontend_event("cancel_appointment", "failed")
466
+ return "Failed to cancel appointment."
467
+
468
+ @function_tool()
469
+ async def modify_appointment(
470
+ self,
471
+ appointment_id: str,
472
+ new_time: str
473
+ ):
474
+ """Modify the date/time of an appointment.
475
+
476
+ Args:
477
+ appointment_id: The ID of the appointment to modify.
478
+ new_time: The new ISO 8601 formatted date and time.
479
+ """
480
+ await self._emit_frontend_event("modify_appointment", "started", {"appointment_id": appointment_id, "new_time": new_time})
481
+ success = self.db.modify_appointment(appointment_id, new_time)
482
+ if success:
483
+ await self._emit_frontend_event("modify_appointment", "success", result={"id": appointment_id, "new_time": new_time})
484
+ return "Appointment modified successfully."
485
+ else:
486
+ await self._emit_frontend_event("modify_appointment", "failed")
487
+ return "Failed to modify appointment."
488
+
489
+ @function_tool()
490
+ async def summarize_call(
491
+ self,
492
+ request: Annotated[str, "User's request for summary"] = "summarize"
493
+ ) -> str:
494
+ """Provide a summary of the current call without ending it.
495
+
496
+ Use this when the user asks for a summary but wants to continue the conversation.
497
+ Example triggers: "Can you summarize?", "What did we discuss?", "Recap please"
498
+
499
+ Args:
500
+ request: The user's request for a summary (e.g., "summarize", "recap")
501
+
502
+ Returns:
503
+ str: A spoken summary of the conversation so far.
504
+ """
505
+ logger.info(f"Generating mid-call summary (not ending): {request}")
506
+
507
+ # Get context and metrics
508
+ contact = self.user_context.get("contact_number")
509
+ if not contact:
510
+ return "So far, we've discussed your appointments. Is there anything else I can help you with?"
511
+
512
+ # Collect usage metrics
513
+ summary = self.usage_collector.get_summary()
514
+ usage_stats = {
515
+ "stt_duration": summary.stt_audio_duration,
516
+ "llm_prompt_tokens": summary.llm_prompt_tokens,
517
+ "llm_completion_tokens": summary.llm_completion_tokens,
518
+ "tts_chars": summary.tts_characters_count
519
+ }
520
+
521
+ duration = (datetime.now() - self.start_time).total_seconds()
522
+ user_name = self.user_context.get("user_name", "the patient")
523
+
524
+ # Generate summary directly
525
+ try:
526
+ summary_data = await generate_and_save_summary(
527
+ self.db,
528
+ self.assistant.chat_ctx,
529
+ contact,
530
+ duration,
531
+ self.avatar_type,
532
+ self.tts_provider,
533
+ user_name,
534
+ usage_stats
535
+ )
536
+ if summary_data and isinstance(summary_data, dict):
537
+ spoken_summary = summary_data.get("spoken_text", "So far, we've discussed your appointments.")
538
+ logger.info(f"Mid-call summary: {spoken_summary}")
539
+ return spoken_summary
540
+ except Exception as e:
541
+ logger.error(f"Failed to generate mid-call summary: {e}")
542
+
543
+ return "So far, we've discussed your appointments. Is there anything else I can help you with?"
544
+
545
    @function_tool()
    async def end_conversation(self, summary_request: str):
        """End the current conversation session and generate a final summary.

        Args:
            summary_request: The user's request to end or wrap up (e.g. 'bye', 'summarize', 'we're done').
        """
        logger.info("Ending conversation - generating summary first")

        # GUARD: Prevent duplicate summaries
        if self.summary_generated:
            logger.warning("Summary already generated - skipping duplicate generation")
            return "Thank you for calling. Goodbye!"

        spoken_text = "Thank you for calling. Have a great day!"
        summary_sent = False

        # Get context and metrics
        contact = self.user_context.get("contact_number")
        if contact:
            # Collect usage metrics
            # NOTE(review): usage_collector starts as None and is set later in
            # the entrypoint — confirm it is always assigned before this runs.
            summary = self.usage_collector.get_summary()
            usage_stats = {
                "stt_duration": summary.stt_audio_duration,
                "llm_prompt_tokens": summary.llm_prompt_tokens,
                "llm_completion_tokens": summary.llm_completion_tokens,
                "tts_chars": summary.tts_characters_count
            }

            duration = (datetime.now() - self.start_time).total_seconds()
            user_name = self.user_context.get("user_name", "the patient")

            # Generate summary directly
            try:
                summary_data = await generate_and_save_summary(
                    self.db,
                    self.assistant.chat_ctx,
                    contact,
                    duration,
                    self.avatar_type,
                    self.tts_provider,
                    user_name,
                    usage_stats
                )
                if summary_data and isinstance(summary_data, dict):
                    # 1. Get spoken summary
                    spoken_text = summary_data.get("spoken_text", spoken_text)

                    # 2. Publish structured data to frontend
                    payload = json.dumps({
                        "type": "summary",
                        "summary": summary_data
                    })
                    await self.room.local_participant.publish_data(payload, reliable=True)
                    logger.info("Summary sent to frontend")
                    summary_sent = True

                    # Mark summary as generated to prevent duplicates
                    self.summary_generated = True

                    # CRITICAL: Send close_session to trigger auto-disconnect for voice UX
                    # Small delay to ensure summary is received first
                    await asyncio.sleep(0.1)
                    close_payload = json.dumps({"type": "close_session"})
                    await self.room.local_participant.publish_data(close_payload, reliable=True)
                    logger.info("✅ close_session sent - UI will auto-disconnect")

            except Exception as e:
                logger.error(f"Failed to process summary: {e}")

        # CRITICAL: If summary wasn't sent, send fallback with at least cost structure
        if not summary_sent:
            logger.warning("Sending fallback summary with cost placeholder")
            fallback = {
                "content": "Call ended. See cost breakdown below.",
                "spoken_text": spoken_text,
                "costs": {"stt": 0.0, "tts": 0.0, "llm": 0.0, "avatar": 0.0, "total": 0.0},
                "status": "fallback"
            }
            try:
                payload = json.dumps({"type": "summary", "summary": fallback})
                await self.room.local_participant.publish_data(payload, reliable=True)
                logger.info("Fallback summary sent to frontend")
            except Exception as e:
                logger.error(f"Failed to send fallback: {e}")

            # NOTE: Don't send close_session here - let frontend's 2-second timer handle disconnect
            # This ensures the summary data channel message is received before disconnect

        # 4. Request disconnect implicitly by setting flag
        # The session listener will handle the actual disconnect after speech ends
        self.should_disconnect = True
        logger.info("Disconnect requested - waiting for speech to finish")

        # Start safeguard immediately
        asyncio.create_task(self.safeguard_disconnect())

        # Return the simplified spoken text for the agent to say immediately
        return spoken_text
644
+
645
    async def safeguard_disconnect(self):
        """Force disconnect if normal flow fails.

        Started as a background task by end_conversation: waits 10 seconds,
        re-sends close_session if the room is still connected, then after a
        further grace period force-disconnects the room itself.
        """
        logger.info("Safeguard: Timer started (10s)...")
        await asyncio.sleep(10.0)

        # NOTE(review): connection_state is compared against the string
        # "connected" — confirm the rtc state enum stringifies this way.
        state = self.room.connection_state
        logger.info(f"Safeguard: Timeout reached. Room state is: {state}")

        if state == "connected":
            logger.warning("Safeguard: Timed out. Sending close_session event.")
            try:
                payload = json.dumps({"type": "close_session"})
                await self.room.local_participant.publish_data(payload, reliable=True)
                logger.info("Safeguard: close_session event sent.")
            except Exception as e:
                logger.warning(f"Safeguard: Failed to send event: {e}")

            await asyncio.sleep(3.0)  # Give frontend more time to process

            if self.room.connection_state == "connected":
                logger.warning("Safeguard: Force disconnecting room now.")
                await self.room.disconnect()
        else:
            logger.info("Safeguard: Room already disconnected, taking no action.")
669
+
670
def calculate_costs(duration_seconds: float, tts_chars: int, avatar_type: str, tts_provider: str, prompt_tokens: int = 0, completion_tokens: int = 0):
    """Estimate the USD cost breakdown for a finished call.

    Args:
        duration_seconds: Total call duration in seconds.
        tts_chars: Number of characters synthesized by TTS.
        avatar_type: 'bey' for Beyond Presence (billed per minute), else free.
        tts_provider: 'cartesia', 'deepgram', or other (treated as free Groq).
        prompt_tokens: Real LLM input-token count; 0 triggers estimation.
        completion_tokens: Real LLM output-token count; 0 triggers estimation.

    Returns:
        Dict with per-component costs rounded to 6 decimals, a total,
        the currency, and display labels for the UI.
    """
    # Rates per unit (USD)
    stt_rate = 0.006  # Deepgram Nova-2 ($0.006/min)

    # LLM Pricing: OpenAI GPT-OSS-120B (used for main conversation)
    # Input: $0.15 / 1M tokens
    # Output: $0.60 / 1M tokens
    llm_rate_input = 0.15 / 1_000_000
    llm_rate_output = 0.60 / 1_000_000

    # TTS Rates
    if tts_provider == "cartesia":
        tts_rate = 0.050 / 1000  # Cartesia (~$0.05/1k chars)
        tts_label = "Cartesia"
    elif tts_provider == "deepgram":
        tts_rate = 0.015 / 1000  # Deepgram Aura ($0.015/1k chars)
        tts_label = "Deepgram"
    else:  # Groq / Other
        tts_rate = 0.000  # Assume Free/Included
        tts_label = "Groq"

    # Avatar Rates
    avatar_rate = 0.05 if avatar_type == 'bey' else 0  # Beyond Presence (~$0.05/min)

    # Calculate Standard Costs
    stt_cost = (duration_seconds / 60) * stt_rate
    tts_cost = tts_chars * tts_rate

    # Use real counts if provided, otherwise estimate (fallback)
    if prompt_tokens == 0 and completion_tokens == 0:
        # Usage estimates (simplified):
        # Assume 150 words/min -> ~200 tokens/min input
        estimated_input_tokens = (duration_seconds / 60) * 200
        estimated_output_tokens = (tts_chars / 4)  # Rough char-to-token ratio
        llm_cost = (estimated_input_tokens * llm_rate_input) + (estimated_output_tokens * llm_rate_output)
    else:
        llm_cost = (prompt_tokens * llm_rate_input) + (completion_tokens * llm_rate_output)

    avatar_cost = (duration_seconds / 60) * avatar_rate

    total = stt_cost + tts_cost + llm_cost + avatar_cost

    # Log for debugging
    logger.info(f"Cost calculation: duration={duration_seconds}s, tts_chars={tts_chars}, provider={tts_provider}")
    logger.info(f"Costs: STT=${stt_cost:.6f}, TTS=${tts_cost:.6f}, LLM=${llm_cost:.6f}, Avatar=${avatar_cost:.6f}")

    return {
        "stt": round(stt_cost, 6),
        "tts": round(tts_cost, 6),
        "llm": round(llm_cost, 6),
        "avatar": round(avatar_cost, 6),
        "total": round(total, 6),
        "currency": "USD",
        "labels": {
            "tts": tts_label,
            "stt": "Deepgram",
            "llm": "Groq/OpenAI",
            "avatar": "Beyond Presence" if avatar_type == 'bey' else "3D Avatar"
        }
    }
731
+
732
async def generate_and_save_summary(db: Database, chat_ctx: llm.ChatContext, contact_number: str, duration: float, avatar_type: str, tts_provider: str, user_name: str = "the patient", usage_stats: Optional[dict] = None) -> Optional[Dict[str, Any]]:
    """Generate a spoken + written call summary via Groq, persist it, and return it with cost data.

    Args:
        db: Database used to persist the written summary.
        chat_ctx: Chat context whose messages form the transcript.
        contact_number: Phone number the summary is saved under; if falsy, nothing is done.
        duration: Call duration in seconds (feeds cost estimation).
        avatar_type: 'bey' or '3d'; selects avatar cost rate/label.
        tts_provider: TTS provider name; selects TTS cost rate/label.
        user_name: Name used in the summarization prompt.
        usage_stats: Optional official metrics dict (keys: tts_chars,
            llm_prompt_tokens, llm_completion_tokens). When absent, costs are
            roughly estimated from the transcript length.

    Returns:
        Dict with keys text/content/spoken_text/costs/status, or None when
        there is no contact number.
    """
    import re  # used by both the JSON-isolation heuristic and the regex fallback below

    if not contact_number:
        logger.warning("No contact number to save summary for.")
        # FIX: explicit None — signature advertises Optional[Dict[str, Any]].
        return None

    logger.info("Generating conversation summary...")

    transcript = ""

    # Try to extract messages from chat context.
    # NOTE(review): livekit versions differ — newer exposes .items, older .messages.
    try:
        if hasattr(chat_ctx, 'items'):
            items = chat_ctx.items
        elif hasattr(chat_ctx, 'messages'):
            items = chat_ctx.messages
        else:
            items = []

        for item in items:
            if isinstance(item, llm.ChatMessage):
                role = item.role
                content = item.content
                if isinstance(content, list):
                    content = " ".join([str(c) for c in content])

                if isinstance(content, str):
                    transcript += f"{role}: {content}\n"
    except Exception as e:
        logger.error(f"Error extracting transcript: {e}")

    # Calculate costs using official metrics if available, otherwise fallback
    logger.info(f"Calculating costs with usage_stats: {usage_stats}")
    if usage_stats:
        tts_chars = usage_stats.get("tts_chars", 0)
        prompt_tokens = usage_stats.get("llm_prompt_tokens", 0)
        completion_tokens = usage_stats.get("llm_completion_tokens", 0)
        costs = calculate_costs(duration, tts_chars, avatar_type, tts_provider, prompt_tokens, completion_tokens)
    else:
        # Fallback estimation: assume roughly half the transcript was spoken by the agent.
        tts_chars = len(transcript) // 2
        costs = calculate_costs(duration, tts_chars, avatar_type, tts_provider)

    logger.info(f"Calculated costs: {costs}")

    prompt = (
        f"Summarize the conversation with {user_name} in JSON format.\n"
        f"Transcript:\n{transcript}\n\n"
        "CRITICAL: Use natural time formats like '9 AM' or '2:30 PM', NOT 'nine zero zero hours'\n"
        "Return a valid JSON object with exactly two keys:\n"
        "1. 'spoken': A 1-2 sentence spoken closing for TTS. Natural, human-like, polite. No special chars. Start with 'To recap,'.\n"
        "2. 'written': A detailed bulleted summary for the user interface. Include topics, appointments booked, and outcome.\n"
        "IMPORTANT: Ensure the JSON is valid. Do NOT use unescaped newlines in the 'written' string or 'spoken' string. Use \\n for line breaks.\n"
    )

    max_retries = 3
    retry_delay = 1

    for attempt in range(max_retries):
        try:
            # Use Groq SDK directly instead of livekit wrapper for reliability
            api_key = os.getenv("GROQ_API_KEY_SUMMARY") or get_groq_api_key()
            client = GroqClient(api_key=api_key)

            # Use llama-3.3-70b-versatile for JSON reliability
            response = client.chat.completions.create(
                model="llama-3.3-70b-versatile",
                messages=[
                    {"role": "system", "content": "You are a helpful assistant. Output valid JSON only. Do not output markdown blocks."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.7,
                max_tokens=500
            )

            full_response = response.choices[0].message.content
            # Summary uses Llama-3.3-70B-Versatile
            # Pricing: Input $0.59/1M, Output $0.79/1M
            summary_input_cost = response.usage.prompt_tokens * (0.59 / 1_000_000)
            summary_output_cost = response.usage.completion_tokens * (0.79 / 1_000_000)
            summary_cost = summary_input_cost + summary_output_cost

            logger.info(f"🔍 RAW LLM RESPONSE: {full_response}")
            logger.info(f"💰 Summary LLM cost: ${summary_cost:.6f} ({response.usage.prompt_tokens} + {response.usage.completion_tokens} tokens)")

            # Attempt to parse JSON
            spoken = "To recap, we discussed your appointments. Have a great day!"
            written = ""
            # FIX: initialize before the try so the except handlers can safely
            # reference clean_json even when the .replace() chain itself raises
            # (e.g. full_response is None) — previously that was a NameError.
            clean_json = full_response or ""

            try:
                # Clean up markdown code blocks if present
                clean_json = clean_json.replace("```json", "").replace("```", "").strip()

                # Regex heuristic to find the JSON object { ... }
                match = re.search(r"\{.*\}", clean_json, re.DOTALL)
                if match:
                    clean_json = match.group(0)

                data = json.loads(clean_json)
                spoken = data.get("spoken", spoken)
                written = data.get("written", "")

            except (json.JSONDecodeError, AttributeError) as e:
                logger.warning(f"Failed to parse JSON summary (standard): {e}. Retrying with Regex Fallback.")
                # Fallback: Regex extraction for common invalid JSON issues (newlines in strings)
                try:
                    # Extract spoken
                    s_match = re.search(r'"spoken"\s*:\s*"(.*?)"', clean_json, re.DOTALL)
                    if s_match:
                        spoken = s_match.group(1)

                    # Extract written (greedy to catch multi-line content)
                    w_match = re.search(r'"written"\s*:\s*"(.*?)(?<!\\)"', clean_json, re.DOTALL)
                    if w_match:
                        written = w_match.group(1).replace("\\n", "\n")  # Unescape manual newlines
                    else:
                        # Fallback for written if regex fails but we have cleaned string
                        written = clean_json
                except Exception as ex:
                    logger.error(f"Regex fallback failed: {ex}")
                    written = clean_json  # Last resort: just show the cleaned text

            # Fallback if written summary is empty
            if not written.strip():
                written = f"Summary: {spoken.strip()}"

            logger.info(f"Spoken Summary: {spoken.strip()}")
            logger.info(f"📝 WRITTEN SUMMARY:\\n{written.strip()}")
            logger.info(f"=" * 80)
            db.save_summary(contact_number, written.strip())

            # CRITICAL: Add summary LLM cost to total costs
            costs['llm'] += summary_cost
            costs['total'] += summary_cost

            # CRITICAL: Always return costs
            summary_result = {
                "text": written.strip(),
                "content": written.strip(),
                "spoken_text": spoken.strip(),
                "costs": costs,
                "status": "completed"
            }
            logger.info(f"📊 Summary with costs: {summary_result}")

            # Print prominently to CLI
            print(f"\\n{'='*80}")
            print(f"📋 CALL SUMMARY GENERATED")
            print(f"{'='*80}")
            print(f"Contact: {contact_number}")
            print(f"Summary: {written.strip()}")
            print(f"Costs: STT=${costs['stt']:.4f} | TTS=${costs['tts']:.4f} | LLM=${costs['llm']:.6f} | Total=${costs['total']:.4f}")
            print(f"{'='*80}\\n")

            return summary_result

        except Exception as e:
            logger.warning(f"Summary generation attempt {attempt+1} failed: {e}")
            if attempt < max_retries - 1:
                await asyncio.sleep(retry_delay * (2 ** attempt))  # Exponential backoff
            else:
                logger.error("All summary generation attempts failed.")
                # costs was computed before the retry loop, so it is always bound here.
                return {
                    "text": "Call summary unavailable.",
                    "content": "Call summary unavailable.",
                    "spoken_text": "Thank you for calling. Have a great day!",
                    "costs": costs,
                    "status": "failed"
                }
902
+
903
+
904
def prewarm(proc: JobProcess):
    """Prewarm worker to reduce cold start latency.

    Loads heavyweight resources (DB connection, Silero VAD model, the slot
    list, and the Redis cache handle) into proc.userdata so entrypoint()
    can reuse them instead of paying the cost on every call. Best-effort:
    any failure is logged and the worker continues without the prewarmed
    resources.
    """
    from logger import logger as struct_logger
    from db import Database

    struct_logger.info("Prewarming worker...")

    try:
        # 1. Initialize database connection
        db = Database()
        proc.userdata["db"] = db
        struct_logger.info("✅ Database connection prewarmed")

        # 2. Load VAD model into memory (the slowest step of a cold start)
        proc.userdata["vad"] = silero.VAD.load()
        struct_logger.info("✅ VAD model prewarmed")

        # 3. Cache available slots
        # NOTE(review): this snapshot can go stale between prewarm and the
        # actual call — confirm consumers re-validate before booking.
        proc.userdata["slots"] = db.get_available_slots()
        struct_logger.info("✅ Appointment slots cached")

        # 4. Initialize cache connection (module-level singleton from cache.py)
        from cache import cache
        proc.userdata["cache"] = cache
        struct_logger.info(f"✅ Redis cache prewarmed (enabled: {cache.enabled})")

        struct_logger.info("🚀 Worker prewarmed successfully - ready for calls!")

    except Exception as e:
        struct_logger.error(f"Prewarming failed: {e}", error=str(e))
934
+
935
async def entrypoint(ctx: JobContext):
    """Per-call agent entrypoint.

    Connects to the room, wires up STT/LLM/TTS (TTS selectable via Flagsmith
    feature flag), optionally starts a Beyond Presence avatar, speaks a
    time-aware greeting, then keeps the agent alive until the room
    disconnects, finally generating a backup call summary.

    BUG FIX in this revision: ctx.room.connection_state is an
    rtc.ConnectionState enum (this file already compares it to
    rtc.ConnectionState.CONN_CONNECTED for the greeting), but two places
    compared it to the string "connected" — always False — so the keep-alive
    loop exited immediately and the forced-disconnect branch never ran.
    """
    # 1. Connect immediately to acknowledge assignment (Fixes AssignmentTimeoutError)
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    # 2. Retrieve prewarmed resources or initialize if missing
    if "db" in ctx.proc.userdata:
        db = ctx.proc.userdata["db"]
        logger.info("Using prewarmed Database connection")
    else:
        db = Database()
        logger.info("Initialized new Database connection")

    user_context = {}
    participant = await ctx.wait_for_participant()

    avatar_type = '3d'
    user_tts_pref = None  # NOTE(review): parsed but currently unused — confirm intent
    if participant.metadata:
        try:
            metadata = json.loads(participant.metadata)
            avatar_type = metadata.get('avatarType', '3d')
            user_tts_pref = metadata.get('ttsProvider')
        except Exception as e:
            logger.warning(f"Failed to parse participant metadata: {e}")

    logger.info(f"Avatar type requested by {participant.identity}: {avatar_type}")

    # TTS Provider Selection (Feature Flag via Flagsmith)
    tts_provider = os.getenv("TTS_PROVIDER", "deepgram")  # Default fallback from ENV
    try:
        flags = flagsmith.get_environment_flags()
        # Flagsmith Python SDK v3+ uses get_feature_value() not get_flag_value()
        tts_provider_flag = flags.get_feature_value("tts_provider")
        if tts_provider_flag:
            tts_provider = tts_provider_flag
            logger.info(f"Flagsmith: tts_provider={tts_provider}")
    except Exception as e:
        logger.warning(f"Failed to fetch feature flags from Flagsmith: {e}. Using default: {tts_provider}")

    if tts_provider == "cartesia":
        logger.info("Using Cartesia TTS")
        agent_tts = cartesia.TTS()
    elif tts_provider == "groq":
        logger.info("Using Groq TTS")
        agent_tts = groq.TTS(model="canopylabs/orpheus-v1-english")
    else:
        logger.info("Using Deepgram TTS (Default)")
        agent_tts = deepgram.TTS()

    # Initialize metrics collector
    usage_collector = metrics.UsageCollector()

    # Initialize the AgentSession with a faster model and optimized VAD
    session = AgentSession(
        stt=deepgram.STT(),
        llm=groq.LLM(
            model="openai/gpt-oss-120b",
            api_key=get_groq_api_key(),
            temperature=0.5,
        ),
        tts=agent_tts,
        vad=silero.VAD.load(
            min_speech_duration=0.1,
            min_silence_duration=0.5,  # Prevents cutting off mid-sentence
            prefix_padding_duration=0.2,  # Fixed deprecated argument
        ),
    )

    @session.on("metrics_collected")
    def _on_metrics_collected(ev: MetricsCollectedEvent):
        usage_collector.collect(ev.metrics)

    assistant = Assistant(db, user_context, ctx.room)
    start_time = datetime.now()
    assistant.usage_collector = usage_collector
    # NOTE(review): self-reference looks redundant — confirm nothing reads .assistant
    assistant.assistant = assistant
    assistant.avatar_type = avatar_type
    assistant.tts_provider = tts_provider

    @session.on("agent_speech_stopped")
    def _on_agent_speech_stopped(ev: Any = None):
        """Disconnect if the agent has finished speaking and a disconnect was requested."""
        if assistant.should_disconnect:
            async def _disconnect_sequence():
                logger.info("Agent finished speaking. Sending close_session event then closing room.")
                try:
                    payload = json.dumps({"type": "close_session"})
                    await ctx.room.local_participant.publish_data(payload, reliable=True)
                    logger.info("close_session event sent to frontend")
                except Exception as e:
                    logger.warning(f"Failed to publish close_session: {e}")

                # Give frontend time to process the event and disconnect gracefully
                await asyncio.sleep(2.0)

                # Only force disconnect if still connected.
                # FIX: compare against the enum, not the string "connected".
                if ctx.room.connection_state == rtc.ConnectionState.CONN_CONNECTED:
                    logger.info("Frontend didn't disconnect, forcing disconnect")
                    await ctx.room.disconnect()
                else:
                    logger.info("Frontend disconnected gracefully")

            asyncio.create_task(_disconnect_sequence())

    @session.on("agent_speech_interrupted")
    def _on_agent_speech_interrupted(ev: Any = None):
        """Handle case where agent summary/goodbye is interrupted by noise/user."""
        if assistant.should_disconnect:
            logger.info("Agent speech interrupted during disconnect phase. Triggering disconnect sequence.")
            # Reuse the same disconnect logic
            _on_agent_speech_stopped(ev)

    @session.on("agent_speech_started")
    def _on_agent_speech_started(ev: Any = None):
        logger.info("Agent speech STARTED.")

    await session.start(room=ctx.room, agent=assistant)

    # NOTE: Session ready signal is sent after avatar + greeting below,
    # so the UI doesn't show 'Ready' before the system is actually ready.

    # If Beyond Presence avatar is requested and available, initialize it
    if avatar_type == 'bey' and BEY_AVAILABLE:
        logger.info("Initializing Beyond Presence avatar...")

        # Send initializing signal repeatedly to ensure frontend gets it
        # (Data channel might not be fully established for 'User' yet)
        async def send_init_signal():
            for _ in range(5):
                try:
                    await ctx.room.local_participant.publish_data(
                        json.dumps({"type": "avatar_initializing"}),
                        reliable=True
                    )
                except Exception:
                    # Best-effort broadcast; data channel may not be up yet.
                    pass
                await asyncio.sleep(0.5)

        asyncio.create_task(send_init_signal())

        try:
            bey_session = bey.AvatarSession(
                api_key=os.environ.get("BEYOND_PRESENCE_API_KEY"),
                avatar_id=os.environ.get("BEYOND_PRESENCE_AVATAR_ID", "b9be11b8-89fb-4227-8f86-4a881393cbdb"),
            )
            await bey_session.start(session, room=ctx.room)
            logger.info("Beyond Presence avatar started successfully (API level)")

            # Wait for the avatar participant to actually join the room and publish
            # tracks so we don't greet while the user still sees a loading screen.
            logger.info("Waiting for avatar participant to join room...")
            avatar_joined = False
            for _ in range(40):  # Wait up to 40 seconds
                # NOTE(review): identity assumed to be 'bey-avatar-agent' — confirm
                # against the Beyond Presence plugin configuration.
                p = ctx.room.remote_participants.get("bey-avatar-agent")
                if p:
                    # Require a published video track, not just presence
                    video_tracks = [t for t in p.track_publications.values() if t.kind == rtc.TrackKind.KIND_VIDEO]
                    if video_tracks:
                        logger.info("✅ Avatar participant joined and video track found!")
                        avatar_joined = True
                        break
                await asyncio.sleep(1)

            if not avatar_joined:
                logger.warning("Timed out waiting for avatar participant to join - proceeding anyway")

        except Exception as e:
            logger.error(f"Failed to start Beyond Presence avatar: {e}")
            logger.info("Falling back to audio-only mode")

    # Time-aware greeting (clinic timezone)
    hour = datetime.now(ZoneInfo("Asia/Kolkata")).hour
    if 5 <= hour < 12:
        greeting = "Good morning"
    elif 12 <= hour < 17:
        greeting = "Good afternoon"
    else:
        greeting = "Good evening"

    # Generate greeting ONLY if the room is still connected (Beyond Presence
    # avatar takes ~12s to init and the user may have already left).
    if ctx.room.connection_state == rtc.ConnectionState.CONN_CONNECTED:
        try:
            logger.info(f"Speaking greeting: {greeting}...")
            # Use .say() directly for instant response
            await session.say(
                f"{greeting}, thank you for calling SkyTask Clinic. May I have your phone number?",
                allow_interruptions=True
            )
        except RuntimeError as e:
            logger.warning(f"Could not speak greeting - error: {e}")
    else:
        logger.warning("Session not running - skipping greeting (user may have disconnected)")

    # Always send session_ready if we reached here
    try:
        payload = json.dumps({"type": "session_ready"})
        await ctx.room.local_participant.publish_data(payload, reliable=True)
        logger.info("✅ Session ready signal sent to frontend")
    except Exception as e:
        logger.warning(f"Failed to send session_ready: {e}")

    # CRITICAL: Keep the agent alive while connected.
    # FIX: was `== "connected"` (string vs enum ⇒ always False ⇒ immediate exit).
    while ctx.room.connection_state == rtc.ConnectionState.CONN_CONNECTED:
        await asyncio.sleep(1)

    contact_number = user_context.get("contact_number")
    if contact_number:
        logger.info("Disconnect summary generation (backup)...")
        duration = (datetime.now() - start_time).total_seconds()
        user_name = user_context.get("user_name", "the patient")
        # NOTE(review): usage_collector metrics are not forwarded here — consider
        # passing usage_stats once the UsageCollector summary fields are confirmed.
        await generate_and_save_summary(db, assistant.chat_ctx, contact_number, duration, avatar_type, tts_provider, user_name)
1161
+
1162
+
1163
def start_health_check_server():
    """Spin up a tiny background HTTP server that answers health probes.

    Responds 200 with ``{"status": "healthy"}`` on ``/`` and ``/health``
    (404 for anything else) on the port from ``$PORT`` (default 8080).
    The server runs in a daemon thread so it never blocks shutdown, and
    any startup failure is reported but never raised to the caller.
    """
    try:
        port = int(os.getenv("PORT", 8080))

        class _Handler(http.server.BaseHTTPRequestHandler):
            def do_GET(self):
                if self.path in ("/health", "/"):
                    self.send_response(200)
                    self.send_header("Content-type", "application/json")
                    self.end_headers()
                    self.wfile.write(b'{"status": "healthy"}')
                else:
                    self.send_response(404)
                    self.end_headers()

            def log_message(self, format, *args):
                # Keep the console free of per-request access logs.
                pass

        # Avoid 'Address already in use' errors on quick restarts.
        socketserver.TCPServer.allow_reuse_address = True

        server = socketserver.TCPServer(("", port), _Handler)
        print(f"✅ Health check server listening on port {port}")

        # Daemon thread: doesn't block program exit.
        threading.Thread(target=server.serve_forever, daemon=True).start()
    except Exception as e:
        print(f"⚠️ Failed to start health check server: {e}")
1193
+
1194
+ if __name__ == "__main__":
1195
+ start_health_check_server()
1196
+ # Original configuration (High Performance):
1197
+ # cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm))
1198
+
1199
+ # Configure worker for low-resource environments (Render Free Tier)
1200
+ options = WorkerOptions(
1201
+ entrypoint_fnc=entrypoint,
1202
+ prewarm_fnc=None, # Disable prewarming to save RAM
1203
+ num_idle_processes=0, # Do not keep any processes waiting
1204
+ )
1205
+ cli.run_app(options)
cache.py ADDED
@@ -0,0 +1,87 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Redis Cache Manager using Upstash
3
+ Handles session caching and user data caching
4
+ """
5
+ import os
6
+ import json
7
+ from typing import Optional, Dict, Any
8
+ from dotenv import load_dotenv
9
+ from upstash_redis import Redis
10
+
11
+ # Load environment variables
12
+ load_dotenv()
13
+
14
class CacheManager:
    """Thin JSON-aware wrapper around Upstash Redis.

    Degrades to a no-op when credentials are missing, so callers never
    have to check availability themselves: reads return None and writes
    report False.
    """

    def __init__(self):
        """Connect to Upstash Redis when credentials are present in the env."""
        url = os.getenv("UPSTASH_REDIS_REST_URL")
        token = os.getenv("UPSTASH_REDIS_REST_TOKEN")

        self.enabled = bool(url and token)
        if self.enabled:
            self.redis = Redis(url=url, token=token)
            print("✅ Redis cache enabled (Upstash)")
        else:
            self.redis = None
            print("⚠️ Redis cache disabled (no credentials)")

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for *key* (JSON-decoded), or None."""
        if not self.enabled:
            return None

        try:
            raw = self.redis.get(key)
            if not raw:
                return None
            return json.loads(raw) if isinstance(raw, str) else raw
        except Exception as e:
            print(f"Cache get error: {e}")
            return None

    def set(self, key: str, value: Any, ttl: int = 3600):
        """Store *value* under *key* with a TTL in seconds; True on success."""
        if not self.enabled:
            return False

        try:
            payload = value if isinstance(value, str) else json.dumps(value)
            self.redis.setex(key, ttl, payload)
            return True
        except Exception as e:
            print(f"Cache set error: {e}")
            return False

    def delete(self, key: str):
        """Remove *key* from the cache; True on success."""
        if not self.enabled:
            return False

        try:
            self.redis.delete(key)
            return True
        except Exception as e:
            print(f"Cache delete error: {e}")
            return False

    def get_or_fetch(self, key: str, fetch_fn, ttl: int = 3600):
        """Read-through helper: return the cached value, or call *fetch_fn*,
        cache its non-None result for *ttl* seconds, and return it."""
        hit = self.get(key)
        if hit is not None:
            print(f"✅ Cache HIT: {key}")
            return hit

        print(f"❌ Cache MISS: {key}")
        fresh = fetch_fn()

        if fresh is not None:
            self.set(key, fresh, ttl)

        return fresh

# Global cache instance shared by the whole process
cache = CacheManager()
db.py ADDED
@@ -0,0 +1,358 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ from datetime import datetime, timedelta
3
+ from typing import List, Optional, Dict, Any
4
+ from dotenv import load_dotenv
5
+ from supabase import create_client, Client
6
+ from cache import cache # Import Redis cache
7
+ import uuid
8
+
9
+ load_dotenv()
10
+
11
+ SUPABASE_URL = os.getenv("SUPABASE_URL")
12
+ SUPABASE_KEY = os.getenv("SUPABASE_KEY")
13
+
14
class Database:
    """Supabase-backed data access layer with an in-memory mock fallback.

    Every method first tries Supabase (when credentials were available at
    construction time) and falls back to in-process mock storage on any
    error, so the voice agent keeps working without a database.

    Contact numbers are normalized to digits-only for lookups/creation so
    "555-0101" and "5550101" resolve to the same user.
    """

    def __init__(self):
        # Real client only when both SUPABASE_URL and SUPABASE_KEY are set.
        self.client = None
        if SUPABASE_URL and SUPABASE_KEY:
            try:
                self.client: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
            except Exception as e:
                print(f"Failed to initialize Supabase client: {e}")

        # In-memory mock storage (used when self.client is None or a query fails).
        self.mock_users = [
            {"contact_number": "5550101", "name": "Alice Test", "created_at": datetime.now().isoformat()},
            {"contact_number": "9730102", "name": "Naresh", "created_at": datetime.now().isoformat()}
        ]
        self.mock_appointments = [
            {
                "id": "mock_apt_1",
                # FIX: was "555-0101", which could never match the digits-only
                # "5550101" stored in mock_users / produced by get_user()'s
                # normalization, making Alice's mock appointment unreachable.
                "contact_number": "5550101",
                "appointment_time": "2026-01-22T10:00:00",
                "status": "confirmed",
                "purpose": "Checkup",
                "created_at": datetime.now().isoformat()
            }
        ]
        self.mock_summaries = []
        self.mock_chat_messages = []
        self.cache = cache

        # Initialize mock slots: 4 slots/day (9, 10, 14, 16h) for the next 10 days.
        self.mock_slots = []
        base_time = datetime.now().replace(minute=0, second=0, microsecond=0)
        for d in range(1, 11):
            day = base_time + timedelta(days=d)
            for h in [9, 10, 14, 16]:
                slot_time = day.replace(hour=h).isoformat()
                self.mock_slots.append({"slot_time": slot_time, "is_booked": False})

    def get_available_slots(self) -> List[str]:
        """Return ISO timestamps of future, unbooked appointment slots."""
        if self.client:
            try:
                response = self.client.table("appointment_slots")\
                    .select("slot_time")\
                    .eq("is_booked", False)\
                    .gt("slot_time", datetime.now().isoformat())\
                    .order("slot_time")\
                    .execute()
                return [row["slot_time"] for row in response.data]
            except Exception as e:
                print(f"Error fetching slots from DB: {e}")

        # Mock fallback: future, unbooked slots only.
        # (ISO-8601 strings compare correctly as plain strings.)
        now_str = datetime.now().isoformat()
        return [s["slot_time"] for s in self.mock_slots if not s["is_booked"] and s["slot_time"] > now_str]

    def get_user(self, contact_number: str) -> Optional[Dict[str, Any]]:
        """Look up a user by contact number (digits-only), read-through cached."""
        # Normalize input: remove non-digit characters
        contact_number = "".join(filter(str.isdigit, str(contact_number)))

        # Try cache first
        cache_key = f"user:{contact_number}"
        cached_user = self.cache.get(cache_key)
        if cached_user:
            return cached_user

        if self.client:
            try:
                response = self.client.table("users").select("*").eq("contact_number", contact_number).execute()
                if response.data:
                    user = response.data[0]
                    # Cache for 1 hour
                    self.cache.set(cache_key, user, ttl=3600)
                    return user
            except Exception as e:
                print(f"Error fetching user from DB (falling back to mock): {e}")

        # Mock implementation fallback
        for user in self.mock_users:
            if user["contact_number"] == contact_number:
                return user
        return None

    def create_user(self, contact_number: str, name: str = "Unknown") -> Optional[Dict[str, Any]]:
        """Create (or upsert) a user and refresh the cache entry."""
        # FIX: normalize to digits-only so stored numbers match get_user()
        # lookups (get_user normalizes before querying, so the DB must hold
        # digits-only numbers for lookups to ever succeed).
        contact_number = "".join(filter(str.isdigit, str(contact_number)))

        if self.client:
            try:
                data = {"contact_number": contact_number, "name": name}
                # Upsert handles race conditions / already-existing users.
                response = self.client.table("users").upsert(data).execute()
                if response.data:
                    user = response.data[0]
                    # FIX: keep the read-through cache coherent with the new record.
                    self.cache.set(f"user:{contact_number}", user, ttl=3600)
                    return user
            except Exception as e:
                print(f"Error creating user in DB (falling back to mock): {e}")

        # Mock implementation fallback
        new_user = {"contact_number": contact_number, "name": name, "created_at": datetime.now().isoformat()}
        self.mock_users.append(new_user)
        return new_user

    def get_user_appointments(self, contact_number: str) -> List[Dict[str, Any]]:
        """Fetch appointments for a user, newest first (mock path drops cancelled)."""
        if self.client:
            try:
                response = self.client.table("appointments")\
                    .select("*")\
                    .eq("contact_number", contact_number)\
                    .order("appointment_time", desc=True)\
                    .execute()
                return response.data
            except Exception as e:
                print(f"Error fetching appointments from DB (falling back to mock): {e}")

        # Mock implementation fallback
        return [
            apt for apt in self.mock_appointments
            if apt["contact_number"] == contact_number and apt["status"] != "cancelled"
        ]

    def check_slot_availability(self, appointment_time: datetime) -> bool:
        """Return True when *appointment_time* is a known, unbooked slot."""
        time_str = appointment_time.isoformat()

        if self.client:
            try:
                # Check appointment_slots table for validity and availability
                response = self.client.table("appointment_slots")\
                    .select("*")\
                    .eq("slot_time", time_str)\
                    .eq("is_booked", False)\
                    .execute()

                return len(response.data) > 0
            except Exception as e:
                print(f"Error checking availability in DB (falling back to mock): {e}")

        # Mock fallback: unknown slot times count as unavailable.
        for slot in self.mock_slots:
            if slot["slot_time"] == time_str:
                return not slot["is_booked"]
        return False

    def book_appointment(self, contact_number: str, appointment_time: str, purpose: str = "General") -> Optional[Dict[str, Any]]:
        """Book an appointment (ISO string time) and mark its slot as booked.

        NOTE(review): the insert + slot update are not transactional —
        a crash in between can leave an appointment without a booked slot.
        """
        if self.client:
            try:
                # 1. Insert into appointments
                data = {
                    "contact_number": contact_number,
                    "appointment_time": appointment_time,
                    "status": "confirmed",
                    "purpose": purpose
                }
                response = self.client.table("appointments").insert(data).execute()

                # 2. Mark slot as booked
                self.client.table("appointment_slots")\
                    .update({"is_booked": True})\
                    .eq("slot_time", appointment_time)\
                    .execute()

                if response.data:
                    return response.data[0]
            except Exception as e:
                print(f"Error booking appointment in DB (falling back to mock): {e}")

        # Mock implementation fallback
        import random
        apt_id = f"APT-{random.randint(1000, 9999)}"
        new_apt = {
            "id": apt_id,
            "contact_number": contact_number,
            "appointment_time": appointment_time,
            "status": "confirmed",
            "purpose": purpose,
            "created_at": datetime.now().isoformat()
        }
        self.mock_appointments.append(new_apt)

        # Mark mock slot as booked
        for slot in self.mock_slots:
            if slot["slot_time"] == appointment_time:
                slot["is_booked"] = True

        return new_apt

    def cancel_appointment(self, appointment_id: str) -> bool:
        """Mark an appointment cancelled; True on success.

        NOTE(review): the DB path returns True even when no row matched —
        the update's row count is not inspected here.
        """
        if self.client:
            try:
                self.client.table("appointments")\
                    .update({"status": "cancelled"})\
                    .eq("id", appointment_id)\
                    .execute()
                return True
            except Exception as e:
                print(f"Error cancelling appointment in DB (falling back to mock): {e}")

        # Mock implementation fallback
        for apt in self.mock_appointments:
            if apt["id"] == appointment_id:
                apt["status"] = "cancelled"
                return True
        return False

    def modify_appointment(self, appointment_id: str, new_time: str) -> bool:
        """Change an appointment's time; True on success.

        NOTE(review): like cancel_appointment, the DB path does not verify
        a row matched, and the slot table is not updated for either time.
        """
        if self.client:
            try:
                self.client.table("appointments")\
                    .update({"appointment_time": new_time})\
                    .eq("id", appointment_id)\
                    .execute()
                return True
            except Exception as e:
                print(f"Error modifying appointment in DB (falling back to mock): {e}")

        # Mock implementation fallback
        for apt in self.mock_appointments:
            if apt["id"] == appointment_id:
                apt["appointment_time"] = new_time
                return True
        return False

    def save_summary(self, contact_number: str, summary: str) -> bool:
        """Persist a conversation summary; always reports True via the mock path."""
        if self.client:
            try:
                data = {
                    "contact_number": contact_number,
                    "summary": summary,
                    "created_at": datetime.now().isoformat()
                }
                # Assuming a 'conversations' table exists
                self.client.table("conversations").insert(data).execute()
                return True
            except Exception as e:
                print(f"Error saving summary in DB (falling back to mock): {e}")

        # Mock implementation fallback
        print(f"Mock saving summary for {contact_number}: {summary}")
        self.mock_summaries.append({
            "contact_number": contact_number,
            "summary": summary,
            "created_at": datetime.now().isoformat()
        })
        return True

    def save_chat_message(self, session_id: str, contact_number: str, role: str, content: str, tool_name: str = None, tool_args: dict = None) -> bool:
        """Save a single chat message (role/content plus optional tool call info)."""
        if self.client:
            try:
                data = {
                    "session_id": session_id,
                    "contact_number": contact_number,
                    "role": role,
                    "content": content,
                    "tool_name": tool_name,
                    "tool_args": tool_args,
                    "created_at": datetime.now().isoformat()
                }
                self.client.table("chat_messages").insert(data).execute()
                return True
            except Exception as e:
                print(f"Error saving chat message to DB (falling back to mock): {e}")

        # Mock fallback
        self.mock_chat_messages.append({
            "session_id": session_id,
            "contact_number": contact_number,
            "role": role,
            "content": content,
            "tool_name": tool_name,
            "tool_args": tool_args,
            "created_at": datetime.now().isoformat()
        })
        return True

    def save_chat_transcript(self, session_id: str, contact_number: str, messages: list) -> bool:
        """Batch-save a whole chat transcript; False when *messages* is empty."""
        if not messages:
            return False

        if self.client:
            try:
                # Prepare batch data
                data = []
                for msg in messages:
                    data.append({
                        "session_id": session_id,
                        "contact_number": contact_number,
                        "role": msg.get("role"),
                        "content": msg.get("content"),
                        "tool_name": msg.get("tool_name"),
                        "tool_args": msg.get("tool_args"),
                        "created_at": datetime.now().isoformat()
                    })

                # Batch insert
                self.client.table("chat_messages").insert(data).execute()
                print(f"✅ Saved {len(data)} chat messages to database")
                return True
            except Exception as e:
                print(f"Error saving chat transcript to DB (falling back to mock): {e}")

        # Mock fallback
        for msg in messages:
            self.mock_chat_messages.append({
                "session_id": session_id,
                "contact_number": contact_number,
                "role": msg.get("role"),
                "content": msg.get("content"),
                "tool_name": msg.get("tool_name"),
                "tool_args": msg.get("tool_args"),
                "created_at": datetime.now().isoformat()
            })
        print(f"Mock saved {len(messages)} chat messages")
        return True

    def get_chat_history(self, contact_number: str, limit: int = 100) -> list:
        """Return up to *limit* chat messages for a user (DB path: newest first)."""
        if self.client:
            try:
                response = self.client.table("chat_messages")\
                    .select("*")\
                    .eq("contact_number", contact_number)\
                    .order("created_at", desc=True)\
                    .limit(limit)\
                    .execute()
                return response.data if response.data else []
            except Exception as e:
                print(f"Error fetching chat history: {e}")

        # Mock fallback (insertion order, i.e. oldest first)
        return [msg for msg in self.mock_chat_messages if msg["contact_number"] == contact_number][-limit:]
350
+
351
# Hardcoded slots for the 'fetch_slots' requirement
# NOTE(review): module-level constant, independent of Database.mock_slots —
# presumably consumed by a separate 'fetch_slots' tool; confirm against callers.
AVAILABLE_SLOTS = [
    "2026-01-22T09:00:00",
    "2026-01-22T10:00:00",
    "2026-01-22T14:00:00",
    "2026-01-23T11:00:00",
    "2026-01-23T15:00:00"
]
extras/debug_chat_ctx.py ADDED
@@ -0,0 +1,27 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+
2
# Probe script: explore the livekit ChatContext API surface.
# Safe to run without livekit installed (falls through to the ImportError arm).
try:
    from livekit.agents import llm
    print("Successfully imported livekit.agents.llm")

    chat_ctx = llm.ChatContext()
    print(f"ChatContext created: {chat_ctx}")
    print(f"Attributes: {dir(chat_ctx)}")

    # Does the old `.messages` attribute still exist on this version?
    try:
        print(f"ctx.messages: {chat_ctx.messages}")
    except AttributeError as err:
        print(f"Error accessing ctx.messages: {err}")

    # Can we add a message, and does `.messages` appear afterwards?
    try:
        chat_ctx.add_message(role="user", content="Hello")
        print("Added message via add_message")
        if hasattr(chat_ctx, 'messages'):
            print(f"ctx.messages after add: {chat_ctx.messages}")
    except Exception as err:
        print(f"Error adding message: {err}")

except ImportError:
    print("Could not import livekit.agents.llm")
except Exception as e:
    print(f"An error occurred: {e}")
extras/debug_chat_ctx_v2.py ADDED
@@ -0,0 +1,48 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+
2
# Probe script: explore how livekit's ChatContext stores messages (`items`)
# and whether they can be appended/inserted directly.
# Fix: removed a dead `try: pass / except: pass` block that used a bare
# `except` — it performed no work and would have swallowed KeyboardInterrupt.
try:
    from livekit.agents.llm import ChatContext, ChatMessage
    print("Successfully imported livekit.agents.llm")

    ctx = ChatContext()
    ctx.add_message(role="user", content="Hello")

    # Verify 'items' property
    if hasattr(ctx, 'items'):
        print(f"ctx.items type: {type(ctx.items)}")
        print(f"ctx.items content: {ctx.items}")

        # Try appending to items
        try:
            ctx.items.append(ChatMessage(role="system", content="Injected"))
            print("Successfully appended to ctx.items")
            print(f"ctx.items content after append: {ctx.items}")
        except Exception as e:
            print(f"Failed to append to ctx.items: {e}")

    # Verify legacy 'messages.insert' path (expected to fail on new API)
    if hasattr(ctx, 'insert'):
        try:
            ctx.messages.insert(0, ChatMessage(role="system", content="Inserted"))
        except AttributeError:
            print("ctx.messages.insert failed as expected")

    # Try to verify how to insert at the beginning: ctx.items.insert(0, msg)
    try:
        ctx.items.insert(0, ChatMessage(role="system", content="Inserted at 0"))
        print("Successfully inserted into ctx.items")
    except Exception as e:
        print(f"Failed to insert into ctx.items: {e}")

except ImportError:
    print("Could not import livekit.agents.llm")
except Exception as e:
    print(f"An error occurred: {e}")
extras/health.py ADDED
@@ -0,0 +1,70 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Health Check Endpoint
3
+ Monitors system health and dependencies
4
+ """
5
+ from datetime import datetime
6
+ from db import Database
7
+ from cache import cache
8
+ import os
9
+
10
def check_database():
    """Return True if a Supabase client exists and a trivial query succeeds."""
    try:
        db = Database()
        if not db.client:
            return False
        # Cheapest possible round-trip: fetch at most one user row.
        db.client.table("users").select("*").limit(1).execute()
        return True
    except Exception as e:
        print(f"Database health check failed: {e}")
        return False
23
+
24
def check_redis():
    """Return True if the Redis cache answers a set/get round-trip."""
    try:
        if not cache.enabled:
            return False
        # Write a short-lived sentinel and read it back.
        cache.set("health_check", "ok", ttl=10)
        return cache.get("health_check") == "ok"
    except Exception as e:
        print(f"Redis health check failed: {e}")
        return False
36
+
37
def check_livekit():
    """Return True when every LiveKit connection env var is set and non-empty."""
    required = ("LIVEKIT_URL", "LIVEKIT_API_KEY", "LIVEKIT_API_SECRET")
    return all(os.getenv(name) for name in required)
44
+
45
def check_llm():
    """Return True when a Groq API key is present in the environment."""
    return "GROQ_API_KEY" in os.environ
48
+
49
def check_tts():
    """Return True when a Deepgram API key is present in the environment."""
    return "DEEPGRAM_API_KEY" in os.environ
52
+
53
def get_health_status():
    """Aggregate all dependency checks into a single health-report dict."""
    checks = {
        "database": check_database(),
        "redis": check_redis(),
        "livekit": check_livekit(),
        "llm": check_llm(),
        "tts": check_tts(),
    }
    # Overall status is binary: every check must pass to report "healthy".
    healthy = all(checks.values())
    return {
        "status": "healthy" if healthy else "degraded",
        "timestamp": datetime.now().isoformat(),
        "checks": checks,
        "version": "1.0.0",
    }
extras/test_groq.py ADDED
@@ -0,0 +1,63 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import os
from dotenv import load_dotenv
from openai import AsyncOpenAI
import asyncio
import json

load_dotenv()

# Tool schema advertised to the model: a single weather-lookup function.
WEATHER_TOOL = {
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get the current weather in a given location",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "The city and state, e.g. San Francisco, CA",
                },
            },
            "required": ["location"],
        },
    },
}

async def main():
    """Smoke-test Groq tool calling via the OpenAI-compatible endpoint."""
    groq = AsyncOpenAI(
        api_key=os.environ.get("GROQ_API_KEY"),
        base_url="https://api.groq.com/openai/v1"
    )

    print("Testing Tool Call with openai/gpt-oss-20b...")

    try:
        response = await groq.chat.completions.create(
            model="openai/gpt-oss-20b",
            messages=[
                {"role": "user", "content": "What's the weather in San Francisco?"}
            ],
            tools=[WEATHER_TOOL],
            tool_choice="auto"
        )

        message = response.choices[0].message
        print(f"Initial Response Role: {message.role}")

        if message.tool_calls:
            print(f"Tool Calls: {len(message.tool_calls)}")
            for tc in message.tool_calls:
                print(f"  - Function: {tc.function.name}")
                print(f"  - Args: {tc.function.arguments}")
        else:
            print("No tool calls triggered.")

    except Exception as e:
        print(f"Failed during tool call test: {e}")

if __name__ == "__main__":
    asyncio.run(main())
logger.py ADDED
@@ -0,0 +1,69 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Structured Logger for Voice Agent
3
+ Provides JSON-formatted logging for better observability
4
+ """
5
+ import logging
6
+ import json
7
+ from datetime import datetime
8
+ import sys
9
+
10
class JsonFormatter(logging.Formatter):
    """Serialize each log record as a single-line JSON object."""

    # Optional context attributes copied through when a caller attaches
    # them to the record via ``extra=``.
    _CONTEXT_FIELDS = ('user_id', 'session_id', 'duration', 'error')

    def format(self, record):
        """Render *record* as JSON with fixed core keys plus any context."""
        payload = {
            "timestamp": datetime.now().isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }
        for field in self._CONTEXT_FIELDS:
            if hasattr(record, field):
                payload[field] = getattr(record, field)
        return json.dumps(payload)
33
+
34
class StructuredLogger:
    """Thin wrapper around ``logging`` that emits JSON lines to stdout."""

    def __init__(self, name="voice-agent"):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        # Replace any previously attached handlers with a single JSON
        # stdout handler so repeated construction never duplicates output.
        self.logger.handlers = []
        json_handler = logging.StreamHandler(sys.stdout)
        json_handler.setFormatter(JsonFormatter())
        self.logger.addHandler(json_handler)

    def info(self, message, **context):
        """Log at INFO level; keyword args become structured fields."""
        self.logger.info(message, extra=dict(context))

    def error(self, message, **context):
        """Log at ERROR level; keyword args become structured fields."""
        self.logger.error(message, extra=dict(context))

    def warning(self, message, **context):
        """Log at WARNING level; keyword args become structured fields."""
        self.logger.warning(message, extra=dict(context))

    def debug(self, message, **context):
        """Log at DEBUG level; keyword args become structured fields.

        NOTE(review): the logger level is fixed at INFO in __init__, so
        these records are filtered out unless the level is lowered elsewhere.
        """
        self.logger.debug(message, extra=dict(context))
67
+
68
# Global logger instance
# Shared process-wide singleton; other modules import this `logger` directly.
logger = StructuredLogger()
pinger.py ADDED
@@ -0,0 +1,26 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import requests
import time
from datetime import datetime

# Keep-alive pinger — presumably stops the free-tier host from idling the
# backend (TODO confirm); polls the /health endpoint forever.
URL = "https://superbryn-task-backend.onrender.com/health"
INTERVAL = 30  # seconds

def ping_server():
    """Poll the health endpoint in an endless loop, logging each attempt."""
    print(f"🚀 Starting pinger for {URL} every {INTERVAL} seconds...")
    while True:
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        try:
            response = requests.get(URL, timeout=10)
        except requests.exceptions.RequestException as e:
            print(f"[{timestamp}] ❌ Error: {e}")
        else:
            if response.status_code == 200:
                print(f"[{timestamp}] ✅ Success: {response.status_code} - {response.text}")
            else:
                print(f"[{timestamp}] ⚠️ Warning: Status {response.status_code} - {response.text}")
        time.sleep(INTERVAL)

if __name__ == "__main__":
    ping_server()
requirements.txt ADDED
@@ -0,0 +1,12 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ livekit-agents[bey]>=1.3.0
2
+ livekit-plugins-openai>=0.7.0
3
+ livekit-plugins-deepgram>=0.6.0
4
+ livekit-plugins-cartesia>=0.1.0
5
+ livekit-plugins-silero>=0.6.0
6
+ livekit-plugins-groq>=0.1.0
7
+ python-dotenv
8
+ supabase
9
+ flagsmith
10
+ upstash-redis
11
+ pydantic>=2.0.0
12
+ sentry-sdk
validators.py ADDED
@@ -0,0 +1,98 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Input Validators using Pydantic
3
+ Ensures data integrity and security
4
+ """
5
+ from pydantic import BaseModel, validator, Field
6
+ from datetime import datetime
7
+ import re
8
+
9
class PhoneNumber(BaseModel):
    """Validated phone number, normalized to digits only."""
    number: str = Field(..., description="Phone number to validate")

    @validator('number')
    def validate_phone(cls, v):
        """Strip formatting characters and check the digit count is plausible."""
        if not v:
            raise ValueError('Phone number cannot be empty')

        digits = re.sub(r'\D', '', v)

        # 7-15 digits covers the standard international range.
        if not 7 <= len(digits) <= 15:
            raise ValueError(f'Invalid phone number length: {len(digits)} digits')

        return digits

    @property
    def formatted(self):
        """Digits-only representation of the validated number."""
        return self.number
31
+
32
class AppointmentTime(BaseModel):
    """Validate appointment times (ISO 8601, must lie in the future)."""
    time: str = Field(..., description="ISO 8601 datetime string")

    @validator('time')
    def validate_time(cls, v):
        """Parse an ISO-8601 string and require it to be in the future.

        Fixes two defects in the original:
        - the future-date ValueError was caught by the same ``except`` and
          re-raised with the misleading message 'Invalid datetime format: …';
        - a 'Z'-suffixed input produced an aware datetime, and comparing it
          with naive ``datetime.now()`` raised TypeError.
        """
        try:
            dt = datetime.fromisoformat(v.replace('Z', '+00:00'))
        except ValueError as e:
            raise ValueError(f'Invalid datetime format: {e}')

        # Compare like with like: use an aware "now" when dt carries a tz.
        now = datetime.now(dt.tzinfo) if dt.tzinfo else datetime.now()
        if dt < now:
            raise ValueError('Appointment time must be in the future')

        return v
49
+
50
class AppointmentPurpose(BaseModel):
    """Validated free-text appointment purpose."""
    purpose: str = Field(..., min_length=3, max_length=200)

    @validator('purpose')
    def validate_purpose(cls, v):
        """Strip markup-prone characters and enforce a minimum useful length."""
        cleaned = re.sub(r'[<>{}]', '', v).strip()

        if len(cleaned) < 3:
            raise ValueError('Purpose must be at least 3 characters')

        return cleaned
63
+
64
class AppointmentId(BaseModel):
    """Validated appointment identifier."""
    id: str = Field(..., description="Appointment ID")

    @validator('id')
    def validate_id(cls, v):
        """Restrict IDs to safe characters and a sane maximum length."""
        # Alphanumerics, hyphens and underscores only.
        if not re.match(r'^[a-zA-Z0-9_-]+$', v):
            raise ValueError('Invalid appointment ID format')
        if len(v) > 100:
            raise ValueError('Appointment ID too long')
        return v
78
+
79
+ # Helper functions for easy validation
80
def validate_phone_number(number: str) -> str:
    """Validate *number* and return its digits-only form."""
    return PhoneNumber(number=number).formatted
84
+
85
def validate_appointment_time(time: str) -> str:
    """Validate *time* (ISO 8601, future) and return it unchanged."""
    return AppointmentTime(time=time).time
89
+
90
def validate_purpose(purpose: str) -> str:
    """Validate *purpose* and return the sanitized text."""
    return AppointmentPurpose(purpose=purpose).purpose
94
+
95
def validate_appointment_id(id: str) -> str:
    """Validate *id* against the safe-character policy and return it."""
    return AppointmentId(id=id).id