|
|
import logging |
|
|
import asyncio |
|
|
import json |
|
|
import uuid |
|
|
import os |
|
|
from datetime import datetime |
|
|
from zoneinfo import ZoneInfo |
|
|
from typing import Annotated, Optional, AsyncIterable, Any, Dict |
|
|
import random |
|
|
import http.server |
|
|
import socketserver |
|
|
import threading |
|
|
|
|
|
from dotenv import load_dotenv |
|
|
from livekit import rtc |
|
|
from livekit.agents import ( |
|
|
AutoSubscribe, |
|
|
JobContext, |
|
|
JobProcess, |
|
|
WorkerOptions, |
|
|
cli, |
|
|
llm, |
|
|
AgentSession, |
|
|
metrics, |
|
|
MetricsCollectedEvent, |
|
|
Agent, |
|
|
) |
|
|
from livekit.agents.llm import function_tool |
|
|
from livekit.agents.voice import ( |
|
|
RunContext, |
|
|
ModelSettings, |
|
|
) |
|
|
from livekit.plugins import openai, deepgram, cartesia, silero, groq |
|
|
|
|
|
|
|
|
from groq import Groq as GroqClient |
|
|
|
|
|
|
|
|
import sentry_sdk |
|
|
from logger import logger |
|
|
from validators import validate_phone_number, validate_appointment_time, validate_purpose, validate_appointment_id |
|
|
|
|
|
|
|
|
try: |
|
|
from livekit.plugins import bey |
|
|
BEY_AVAILABLE = True |
|
|
except ImportError: |
|
|
BEY_AVAILABLE = False |
|
|
logging.warning("Beyond Presence plugin not available. Install with: pip install \"livekit-agents[bey]\"") |
|
|
|
|
|
from db import Database |
|
|
|
|
|
# Pull configuration from a local .env file before any os.getenv() lookups.
load_dotenv()

# Sentry is opt-in: it is only initialised when a DSN is configured.
_sentry_dsn = os.getenv("SENTRY_DSN")
if _sentry_dsn:
    sentry_sdk.init(
        dsn=_sentry_dsn,
        traces_sample_rate=0.1,
        environment=os.getenv("ENVIRONMENT", "production"),
    )
    print("✅ Sentry error tracking enabled")

# NOTE: this rebinds the module-level name `logger`, replacing the object
# imported via `from logger import logger` at the top of the file.
logger = logging.getLogger("voice-agent")
logger.setLevel(logging.INFO)

# Per-library log levels: chatty HTTP/2 and HTTP client loggers are limited
# to warnings, while livekit stays at INFO.
for _logger_name, _logger_level in (
    ("hpack", logging.WARNING),
    ("httpx", logging.WARNING),
    ("livekit", logging.INFO),
    ("urllib3", logging.WARNING),
):
    logging.getLogger(_logger_name).setLevel(_logger_level)
|
|
|
|
|
def get_groq_api_key():
    """Return a Groq API key read from the environment, or None.

    Lookup order:
      1. ``GROQ_API_KEYS`` - a comma-separated list; one non-blank entry is
         picked at random so requests are spread across keys.
      2. ``GROQ_API_KEY`` - a single key.

    A short DEBUG line is printed for whichever branch is taken.
    """
    # "".split(",") yields [""], which the blank filter removes, so an unset
    # or empty GROQ_API_KEYS simply produces an empty pool.
    pool = [
        candidate
        for candidate in (part.strip() for part in os.getenv("GROQ_API_KEYS", "").split(","))
        if candidate
    ]
    if pool:
        picked = random.choice(pool)
        print(f"DEBUG: Selected Groq Key from list of {len(pool)}. Prefix: {picked[:5]}...")
        return picked

    fallback_key = os.getenv("GROQ_API_KEY")
    if fallback_key:
        print(f"DEBUG: Using single GROQ_API_KEY. Prefix: {fallback_key[:5]}...")
        return fallback_key

    print("DEBUG: No Groq API Key found!")
    return None
|
|
|
|
|
# Optional feature-flag client. Any failure (package not installed, or the
# client constructor raising) leaves `flagsmith` as None, so callers must
# cope with a missing client.
try:
    from flagsmith import Flagsmith

    _flagsmith_env_key = os.getenv("FLAGSMITH_ENVIRONMENT_KEY", "default")
    flagsmith = Flagsmith(environment_key=_flagsmith_env_key)
except Exception:
    flagsmith = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Base instructions for the voice agent. Rendered with str.format(), so the
# only braces allowed in the text are the three placeholders {name}, {status}
# and {goal_instruction}.
#
# The tool list under "# Flow" must name the tools the Assistant class really
# registers: the prompt tells the model to call nothing else, so a stale name
# here (previously "check_slots", which does not exist) steers the model
# towards a tool call that cannot succeed.
SYSTEM_PROMPT = """
You are the SkyTask Clinic Assistant, a friendly and capable voice receptionist.

# User: {name} | Status: {status} | Goal: {goal_instruction}
# Rules
- Voice response: Plain text only. Natural and polite.
- Be warm: Use "Good morning", "Thank you", "Please".
- Length: 1-3 sentences, but don't be robotic.
- Speak nums: "five five five". No emojis/markdown.
- Address user by name if known.
# Flow
1. Identify user (ask phone/name).
2. Tools: identify_user, verify_identity, fetch_slots, book_appointment, retrieve_appointments, cancel_appointment, modify_appointment, summarize_call, end_conversation.
   - STRICT: Only call these tools. Do NOT invent new tools.
   - Do NOT speak tool names. Execute silently.
   - summarize_call: When user asks "summarize" or "recap" - gives summary but continues call
   - end_conversation: When user says "end call", "goodbye", "bye" - ends the call
3. Verify name mismatches.
# Guardrails
- Privacy protection active.
- Scope: Clinic appointments only.
"""
|
|
|
|
|
class Assistant(Agent):
    """Clinic receptionist agent exposing appointment tools to the LLM.

    Methods decorated with ``@function_tool()`` are the tools the model may
    call. NOTE(review): their docstrings are kept verbatim because
    livekit-agents appears to forward them to the model as the tool
    description - confirm before rewording any of them. Maintainer notes are
    therefore written as ``#`` comments inside the tool bodies.
    """

    def __init__(self, db: Database, user_context: dict, room):
        """Build the agent with "unidentified guest" instructions.

        Args:
            db: Database facade used for users, slots and appointments.
            user_context: Mutable dict shared with the caller; the tools store
                ``contact_number`` and ``user_name`` in it once known.
            room: Room object used to publish data packets to the frontend
                and to listen for ``data_received`` events.
        """
        current_time_ist = datetime.now(ZoneInfo("Asia/Kolkata")).strftime("%Y-%m-%d %I:%M %p")

        instructions = SYSTEM_PROMPT.format(
            name="Guest",
            status="Unidentified",
            goal_instruction="Ask for their phone number (and name) to pull up their file. Say: 'Hi! I'm the clinic assistant. May I have your phone number to get started?'"
        )
        instructions += f"\n\nCurrent time (IST): {current_time_ist}"

        super().__init__(instructions=instructions)
        self.db = db
        self.user_context = user_context
        self.room = room
        self.current_time_str = current_time_ist
        self.should_disconnect = False

        # Set to None here and read later by the summary tools; presumably
        # populated by the code that constructs the agent - verify against caller.
        self.usage_collector = None
        self.assistant = None
        self.start_time = datetime.now()
        self.avatar_type = None
        self.tts_provider = None

        # Guards against producing the end-of-call summary twice.
        self.summary_generated = False

        # Strong references to fire-and-forget tasks: the event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._background_tasks = set()

        @room.on("data_received")
        def on_data_received(data_packet):
            # The frontend's "End Call" button arrives as a JSON data packet.
            try:
                payload = data_packet.data.decode('utf-8')
                data = json.loads(payload)

                if data.get("type") == "request_end_call":
                    logger.info("🔴 Frontend requested end call via button - triggering end_conversation")
                    self._spawn_background(self.end_conversation("User clicked End Call button"))
            except Exception as e:
                logger.warning(f"Error processing frontend data message: {e}")

    def _spawn_background(self, coro):
        """Schedule *coro* as a task and hold a reference until it completes."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    def _collect_usage_stats(self) -> Optional[dict]:
        """Return usage counters for cost calculation, or None without a collector.

        ``usage_collector`` starts out as None; returning None lets
        ``generate_and_save_summary`` fall back to its own estimates instead
        of failing with an AttributeError.
        """
        if self.usage_collector is None:
            return None
        summary = self.usage_collector.get_summary()
        return {
            "stt_duration": summary.stt_audio_duration,
            "llm_prompt_tokens": summary.llm_prompt_tokens,
            "llm_completion_tokens": summary.llm_completion_tokens,
            "tts_chars": summary.tts_characters_count
        }

    def _room_is_connected(self) -> bool:
        """Report whether the room still has a live connection.

        Prefers the room's own ``isconnected()`` helper. The previous direct
        comparison ``connection_state == "connected"`` is kept only as a
        fallback for room objects without that method.
        NOTE(review): ``connection_state`` looks like an enum in the LiveKit
        SDK, which would make the string comparison always False - confirm.
        """
        is_connected = getattr(self.room, "isconnected", None)
        if callable(is_connected):
            return bool(is_connected())
        return self.room.connection_state == "connected"

    def update_instructions_with_name(self, name: str):
        """Re-render the system prompt for an identified user.

        Args:
            name: Name the agent should use when addressing the caller.

        Returns:
            True when the instructions were replaced, False on any error.
        """
        try:
            new_instructions = SYSTEM_PROMPT.format(
                name=name,
                status="Authenticated",
                goal_instruction=f"Help {name} with appointments. Address them as {name}."
            )
            full_instructions = f"{new_instructions}\n\nCurrent time (IST): {self.current_time_str}"

            # NOTE(review): this writes the base class's private attribute
            # directly; check whether the installed livekit-agents version
            # offers a public way to update instructions.
            self._instructions = full_instructions

            logger.info(f"✅ Updated agent instructions with user name: {name}")
            logger.debug(f"🔍 DEBUG - NEW PROMPT:\n{new_instructions}")
            return True
        except Exception as e:
            logger.warning(f"Failed to update instructions: {e}")
            return False

    @function_tool()
    async def identify_user(
        self,
        contact_number: str
    ):
        """Identify the user by their phone number. Only call this when you have received a numeric phone number.

        Args:
            contact_number: The user's contact phone number (e.g. 555-0101). Do not provide an empty string.
        """
        # This must stay the ONLY definition of identify_user in the class:
        # a second `def identify_user` further down would silently replace
        # this one and drop the name-aware instruction update below.
        if not contact_number or len(contact_number.strip()) < 3:
            return "Error: A valid contact number is required to identify the user."

        try:
            contact_number = validate_phone_number(contact_number)
        except ValueError as e:
            return f"Error: {str(e)}"

        await self._emit_frontend_event("identify_user", "started", {"contact_number": contact_number})
        logger.info(f"Identifying user with number: {contact_number}")

        user = self.db.get_user(contact_number)
        is_new = not user
        if is_new:
            user = self.db.create_user(contact_number)

        # A record may carry an explicit None name; fall back to "User".
        name = user.get("name") or "User"
        self.user_context["contact_number"] = contact_number
        self.user_context["user_name"] = name

        self.update_instructions_with_name(name)

        # Best effort: also drop a system message into the live chat context
        # so the model sees the name straight away. Failures are only logged.
        if hasattr(self, 'chat_ctx') and self.chat_ctx:
            try:
                self.chat_ctx.items.append(
                    llm.ChatMessage(
                        role="system",
                        content=[f"IMPORTANT: The user's name is {name}. You MUST address them as {name} in all future responses. When they ask 'what's my name' or 'do you know my name', respond with 'Yes, {name}, your name is {name}.'"]
                    )
                )
                logger.info(f"✅ Injected name '{name}' into chat context")
            except Exception as e:
                logger.warning(f"Could not inject into chat context: {e}")

        result_msg = f"User identified successfully. Their name is {name}. You MUST immediately respond by saying: 'Great to meet you, {name}! How can I help you today?' Use their name {name} in your response right now."
        await self._emit_frontend_event("identify_user", "success", result={"name": name, "is_new": is_new})
        return result_msg

    @function_tool()
    async def verify_identity(
        self,
        contact_number: str,
        stated_name: str
    ):
        """Verify the user's identity using both their phone number and stated name.
        Use this when the user provides both pieces of information.

        Args:
            contact_number: The user's phone number (numeric).
            stated_name: The name the user introduced themselves with.
        """
        if not contact_number or len(contact_number.strip()) < 3:
            return "Error: A valid contact number is required."

        # A blank name is a substring of every stored name, so it would
        # "verify" against any record; reject it up front.
        stated_name = (stated_name or "").strip()
        if not stated_name:
            return "Error: A name is required to verify identity."

        try:
            contact_number = validate_phone_number(contact_number)
        except ValueError as e:
            return f"Error: {str(e)}"

        await self._emit_frontend_event("verify_identity", "started", {"contact_number": contact_number, "name": stated_name})
        logger.info(f"Verifying identity: {stated_name} with {contact_number}")

        user = self.db.get_user(contact_number)

        if not user:
            # Unknown number: register it under the name the caller gave.
            user = self.db.create_user(contact_number, name=stated_name)
            db_name = stated_name
            match = True
        else:
            # The stored name may be None; avoid calling .lower() on it.
            db_name = user.get("name") or "User"
            stated_lower = stated_name.lower()
            stored_lower = db_name.lower()
            # Lenient match: either name containing the other counts.
            match = stated_lower in stored_lower or stored_lower in stated_lower

        # NOTE(review): the context is switched to the stored identity even
        # when the names do not match, i.e. before the caller confirms.
        self.user_context["contact_number"] = contact_number
        self.user_context["user_name"] = db_name

        self.update_instructions_with_name(db_name)

        if match:
            result_msg = f"Identity verified! The user is indeed {db_name}. Greet them naturally as {db_name}."
            await self._emit_frontend_event("verify_identity", "success", result={"name": db_name, "match": True})
            return result_msg

        result_msg = f"Identity Mismatch Warning: The phone number belongs to '{db_name}', but user said '{stated_name}'. politely ask: 'I have this number registered under {db_name}. Are you {db_name}?'"
        await self._emit_frontend_event("verify_identity", "warning", result={"db_name": db_name, "stated_name": stated_name, "match": False})
        return result_msg

    async def _emit_frontend_event(self, tool_name: str, status: str, args: Optional[dict] = None, result: Any = None):
        """Publish a ``tool_call`` JSON packet to the frontend.

        Serialisation or publish failures are logged and swallowed so a UI
        notification problem never breaks the tool that reported it.

        Args:
            tool_name: Name of the tool reporting progress.
            status: Free-form status label, e.g. "started" or "success".
            args: Arguments the tool was invoked with, if any.
            result: JSON-serialisable outcome of the tool, if any.
        """
        try:
            payload = json.dumps({
                "type": "tool_call",
                "tool": tool_name,
                "status": status,
                "args": args,
                "result": result
            })
            await self.room.local_participant.publish_data(payload, reliable=True)
        except Exception as e:
            logger.error(f"Failed to emit frontend event: {e}")

    @function_tool()
    async def hello(self, response: str = ""):
        """This tool is used for greetings.

        Args:
            response: The greeting response.
        """
        return "Hello! How can I help you today?"

    @function_tool()
    async def fetch_slots(self, location: str):
        """Fetch available appointment slots.

        Args:
            location: The clinic location to check (e.g. 'main', 'downtown').
        """
        logger.info(f"Fetching available slots for {location}")
        await self._emit_frontend_event("fetch_slots", "started", {"location": location})

        # `location` is only logged and echoed; the database call takes no
        # location filter.
        available_slots = self.db.get_available_slots()
        slots_json = json.dumps(available_slots)

        await self._emit_frontend_event("fetch_slots", "success", result=available_slots)
        return slots_json

    @function_tool()
    async def book_appointment(
        self,
        time: str,
        purpose: str
    ):
        """Book an appointment for the identified user.

        Args:
            time: The ISO 8601 formatted date and time for the appointment.
            purpose: Purpose of the appointment.
        """
        await self._emit_frontend_event("book_appointment", "started", {"time": time, "purpose": purpose})
        contact_number = self.user_context.get("contact_number")
        if not contact_number:
            return "Error: User not identified. Please ask for phone number first."

        try:
            contact_number = validate_phone_number(contact_number)
        except ValueError as e:
            return f"Error validation phone: {str(e)}"

        # The model supplies `time`; a malformed value must come back as a
        # tool error it can recover from instead of raising out of the tool.
        try:
            requested_time = datetime.fromisoformat(time)
        except (TypeError, ValueError):
            await self._emit_frontend_event("book_appointment", "failed")
            return "Error: Invalid appointment time. Please provide an ISO 8601 date and time."

        logger.info(f"Booking appointment for {contact_number} at {time}")

        is_available = self.db.check_slot_availability(requested_time)
        if not is_available:
            return "Error: Slot not available."

        result = self.db.book_appointment(contact_number, time, purpose)
        if result:
            await self._emit_frontend_event("book_appointment", "success", result=result)
            return f"Appointment booked successfully. ID: {result.get('id')}"

        await self._emit_frontend_event("book_appointment", "failed")
        return "Failed to book appointment."

    @function_tool()
    async def retrieve_appointments(self, user_confirmation: str):
        """Retrieve past and upcoming appointments for the identified user.

        Args:
            user_confirmation: The user's confirmation to see their appointments (e.g. 'show them', 'yes').
        """
        await self._emit_frontend_event("retrieve_appointments", "started")
        contact_number = self.user_context.get("contact_number")
        if not contact_number:
            return "Error: User not identified."

        try:
            contact_number = validate_phone_number(contact_number)
        except ValueError as e:
            return f"Error: {str(e)}"

        appointments = self.db.get_user_appointments(contact_number)
        if not appointments:
            await self._emit_frontend_event("retrieve_appointments", "success", result=[])
            return "No appointments found."

        await self._emit_frontend_event("retrieve_appointments", "success", result=appointments)
        return json.dumps(appointments)

    @function_tool()
    async def cancel_appointment(
        self,
        appointment_id: str
    ):
        """Cancel an appointment.

        Args:
            appointment_id: The ID of the appointment to cancel.
        """
        # NOTE(review): the ID is passed to the database without consulting
        # user_context, so nothing here ties it to the identified caller.
        await self._emit_frontend_event("cancel_appointment", "started", {"appointment_id": appointment_id})
        success = self.db.cancel_appointment(appointment_id)
        if success:
            await self._emit_frontend_event("cancel_appointment", "success", result={"id": appointment_id})
            return "Appointment cancelled successfully."

        await self._emit_frontend_event("cancel_appointment", "failed")
        return "Failed to cancel appointment."

    @function_tool()
    async def modify_appointment(
        self,
        appointment_id: str,
        new_time: str
    ):
        """Modify the date/time of an appointment.

        Args:
            appointment_id: The ID of the appointment to modify.
            new_time: The new ISO 8601 formatted date and time.
        """
        # NOTE(review): as with cancel_appointment, ownership of the
        # appointment is not checked here.
        await self._emit_frontend_event("modify_appointment", "started", {"appointment_id": appointment_id, "new_time": new_time})
        success = self.db.modify_appointment(appointment_id, new_time)
        if success:
            await self._emit_frontend_event("modify_appointment", "success", result={"id": appointment_id, "new_time": new_time})
            return "Appointment modified successfully."

        await self._emit_frontend_event("modify_appointment", "failed")
        return "Failed to modify appointment."

    @function_tool()
    async def summarize_call(
        self,
        request: Annotated[str, "User's request for summary"] = "summarize"
    ) -> str:
        """Provide a summary of the current call without ending it.

        Use this when the user asks for a summary but wants to continue the conversation.
        Example triggers: "Can you summarize?", "What did we discuss?", "Recap please"

        Args:
            request: The user's request for a summary (e.g., "summarize", "recap")

        Returns:
            str: A spoken summary of the conversation so far.
        """
        logger.info(f"Generating mid-call summary (not ending): {request}")
        generic_summary = "So far, we've discussed your appointments. Is there anything else I can help you with?"

        contact = self.user_context.get("contact_number")
        if not contact:
            return generic_summary

        duration = (datetime.now() - self.start_time).total_seconds()
        user_name = self.user_context.get("user_name", "the patient")

        try:
            # Collected inside the try so a collector problem degrades to the
            # generic sentence instead of raising out of the tool.
            usage_stats = self._collect_usage_stats()
            summary_data = await generate_and_save_summary(
                self.db,
                self.assistant.chat_ctx,
                contact,
                duration,
                self.avatar_type,
                self.tts_provider,
                user_name,
                usage_stats
            )
            if summary_data and isinstance(summary_data, dict):
                spoken_summary = summary_data.get("spoken_text", "So far, we've discussed your appointments.")
                logger.info(f"Mid-call summary: {spoken_summary}")
                return spoken_summary
        except Exception as e:
            logger.error(f"Failed to generate mid-call summary: {e}")

        return generic_summary

    @function_tool()
    async def end_conversation(self, summary_request: str):
        """End the current conversation session and generate a final summary.

        Args:
            summary_request: The user's request to end or wrap up (e.g. 'bye', 'summarize', 'we're done').
        """
        logger.info("Ending conversation - generating summary first")

        # Both the model and the frontend button can trigger this tool.
        if self.summary_generated:
            logger.warning("Summary already generated - skipping duplicate generation")
            return "Thank you for calling. Goodbye!"

        spoken_text = "Thank you for calling. Have a great day!"
        summary_sent = False

        contact = self.user_context.get("contact_number")
        if contact:
            duration = (datetime.now() - self.start_time).total_seconds()
            user_name = self.user_context.get("user_name", "the patient")

            try:
                # Inside the try: any failure here must still reach the
                # fallback summary and the disconnect logic below.
                usage_stats = self._collect_usage_stats()
                summary_data = await generate_and_save_summary(
                    self.db,
                    self.assistant.chat_ctx,
                    contact,
                    duration,
                    self.avatar_type,
                    self.tts_provider,
                    user_name,
                    usage_stats
                )
                if summary_data and isinstance(summary_data, dict):
                    spoken_text = summary_data.get("spoken_text", spoken_text)

                    payload = json.dumps({
                        "type": "summary",
                        "summary": summary_data
                    })
                    await self.room.local_participant.publish_data(payload, reliable=True)
                    logger.info("Summary sent to frontend")
                    summary_sent = True

                    self.summary_generated = True

                    # Brief pause between the two packets before asking the
                    # UI to close the session.
                    await asyncio.sleep(0.1)
                    close_payload = json.dumps({"type": "close_session"})
                    await self.room.local_participant.publish_data(close_payload, reliable=True)
                    logger.info("✅ close_session sent - UI will auto-disconnect")
            except Exception as e:
                logger.error(f"Failed to process summary: {e}")

        if not summary_sent:
            # No summary could be produced: send a zero-cost placeholder.
            logger.warning("Sending fallback summary with cost placeholder")
            fallback = {
                "content": "Call ended. See cost breakdown below.",
                "spoken_text": spoken_text,
                "costs": {"stt": 0.0, "tts": 0.0, "llm": 0.0, "avatar": 0.0, "total": 0.0},
                "status": "fallback"
            }
            try:
                payload = json.dumps({"type": "summary", "summary": fallback})
                await self.room.local_participant.publish_data(payload, reliable=True)
                logger.info("Fallback summary sent to frontend")
            except Exception as e:
                logger.error(f"Failed to send fallback: {e}")

        self.should_disconnect = True
        logger.info("Disconnect requested - waiting for speech to finish")

        # Backstop in case the normal disconnect path never completes.
        self._spawn_background(self.safeguard_disconnect())

        return spoken_text

    async def safeguard_disconnect(self):
        """Force disconnect if normal flow fails.

        Waits 10 seconds; if the room is still connected it re-sends
        ``close_session``, waits 3 more seconds and then disconnects the room
        itself.
        """
        logger.info("Safeguard: Timer started (10s)...")
        await asyncio.sleep(10.0)

        state = self.room.connection_state
        logger.info(f"Safeguard: Timeout reached. Room state is: {state}")

        if not self._room_is_connected():
            logger.info("Safeguard: Room already disconnected, taking no action.")
            return

        logger.warning("Safeguard: Timed out. Sending close_session event.")
        try:
            payload = json.dumps({"type": "close_session"})
            await self.room.local_participant.publish_data(payload, reliable=True)
            logger.info("Safeguard: close_session event sent.")
        except Exception as e:
            logger.warning(f"Safeguard: Failed to send event: {e}")

        # Give the frontend a moment to close on its own.
        await asyncio.sleep(3.0)

        if self._room_is_connected():
            logger.warning("Safeguard: Force disconnecting room now.")
            await self.room.disconnect()
|
|
|
|
|
def calculate_costs(duration_seconds: float, tts_chars: int, avatar_type: str, tts_provider: str, prompt_tokens: int = 0, completion_tokens: int = 0):
    """Estimate the cost of a call, broken down by service.

    Args:
        duration_seconds: Call length in seconds; billed per minute for STT
            and for the avatar.
        tts_chars: Number of characters synthesised by TTS.
        avatar_type: ``'bey'`` enables the per-minute avatar charge; any
            other value is free.
        tts_provider: ``"cartesia"`` or ``"deepgram"`` (case-insensitive);
            anything else is treated as the zero-rate "Groq" option.
        prompt_tokens: Measured LLM input tokens.
        completion_tokens: Measured LLM output tokens. When both token counts
            are 0 the LLM cost is estimated from duration and ``tts_chars``.

    Returns:
        dict with ``stt``/``tts``/``llm``/``avatar``/``total`` rounded to six
        decimals, ``currency`` ("USD") and a ``labels`` mapping.

    None or negative quantities are treated as 0 so a missing metric cannot
    crash the calculation or produce a negative cost.
    """
    stt_rate = 0.006                       # per minute of audio
    llm_rate_input = 0.15 / 1_000_000      # per prompt token
    llm_rate_output = 0.60 / 1_000_000     # per completion token
    avatar_rate = 0.05 if avatar_type == 'bey' else 0  # per minute

    # Per-character TTS rate plus display label, keyed by provider name.
    provider_key = (tts_provider or "").strip().lower()
    tts_rate, tts_label = {
        "cartesia": (0.050 / 1000, "Cartesia"),
        "deepgram": (0.015 / 1000, "Deepgram"),
    }.get(provider_key, (0.000, "Groq"))

    minutes = max(duration_seconds or 0, 0) / 60
    billed_chars = max(tts_chars or 0, 0)
    billed_prompt = max(prompt_tokens or 0, 0)
    billed_completion = max(completion_tokens or 0, 0)

    stt_cost = minutes * stt_rate
    tts_cost = billed_chars * tts_rate
    avatar_cost = minutes * avatar_rate

    if billed_prompt == 0 and billed_completion == 0:
        # No measured usage: assume 200 input tokens per minute and roughly
        # 4 characters per output token.
        billed_prompt = minutes * 200
        billed_completion = billed_chars / 4
    llm_cost = (billed_prompt * llm_rate_input) + (billed_completion * llm_rate_output)

    total = stt_cost + tts_cost + llm_cost + avatar_cost

    logger.info(f"Cost calculation: duration={duration_seconds}s, tts_chars={tts_chars}, provider={tts_provider}")
    logger.info(f"Costs: STT=${stt_cost:.6f}, TTS=${tts_cost:.6f}, LLM=${llm_cost:.6f}, Avatar=${avatar_cost:.6f}")

    return {
        "stt": round(stt_cost, 6),
        "tts": round(tts_cost, 6),
        "llm": round(llm_cost, 6),
        "avatar": round(avatar_cost, 6),
        "total": round(total, 6),
        "currency": "USD",
        "labels": {
            "tts": tts_label,
            "stt": "Deepgram",
            "llm": "Groq/OpenAI",
            "avatar": "Beyond Presence" if avatar_type == 'bey' else "3D Avatar"
        }
    }
|
|
|
|
|
def _summary_text(value: Any, default: str = "") -> str:
    """Coerce a summary field returned by the LLM into plain text."""
    if value is None:
        return default
    if isinstance(value, str):
        return value
    if isinstance(value, (list, tuple)):
        # Models often return the bulleted summary as a JSON array.
        return "\n".join(str(entry) for entry in value)
    return str(value)


def _collect_chat_messages(chat_ctx, transcript_lines: list, messages: list) -> None:
    """Append transcript lines and DB-ready dicts for each chat message in *chat_ctx*.

    Results are appended to the caller's lists so that whatever was gathered
    before an unexpected error is still available to the caller.
    """
    # Supports contexts exposing either `.items` or `.messages`.
    if hasattr(chat_ctx, 'items'):
        items = chat_ctx.items
    elif hasattr(chat_ctx, 'messages'):
        items = chat_ctx.messages
    else:
        items = []

    for item in items:
        if not isinstance(item, llm.ChatMessage):
            continue

        role = item.role
        content = item.content

        # Flatten list content into one space-separated string.
        content_str = content
        if isinstance(content, list):
            content_str = " ".join(str(part) for part in content)

        if isinstance(content_str, str):
            transcript_lines.append(f"{role}: {content_str}\n")

        msg_data = {
            "role": role,
            "content": content_str,
            "tool_name": None,
            "tool_args": None
        }

        tool_calls = getattr(item, 'tool_calls', None)
        if tool_calls:
            try:
                # Only the first tool call of a message is recorded.
                tc = tool_calls[0]
                if isinstance(tc, dict):
                    msg_data["tool_name"] = tc.get('function', {}).get('name')
                    msg_data["tool_args"] = tc.get('function', {}).get('arguments')
                else:
                    fn = getattr(tc, 'function', None)
                    if fn:
                        msg_data["tool_name"] = getattr(fn, 'name', None)
                        msg_data["tool_args"] = getattr(fn, 'arguments', None)
            except Exception as e:
                # Tool metadata is optional; keep the message without it.
                logger.debug(f"Could not read tool call metadata: {e}")

        if role == "tool":
            msg_data["tool_name"] = getattr(item, 'name', getattr(item, 'tool_call_id', None))

        messages.append(msg_data)


def _parse_summary_response(full_response: str):
    """Extract ``(spoken, written)`` from the summary model's raw reply.

    Tries strict JSON first, then per-field regexes for almost-JSON output.

    Raises:
        ValueError: if the reply is empty, so the caller can retry the request.
    """
    import re

    default_spoken = "To recap, we discussed your appointments. Have a great day!"

    if not full_response or not full_response.strip():
        raise ValueError("Summary LLM returned an empty response")

    # Strip markdown code fences, then keep only the outermost {...} span.
    clean_json = full_response.replace("```json", "").replace("```", "").strip()
    brace_match = re.search(r"\{.*\}", clean_json, re.DOTALL)
    if brace_match:
        clean_json = brace_match.group(0)

    try:
        data = json.loads(clean_json)
        if not isinstance(data, dict):
            raise ValueError("summary JSON is not an object")
        return (
            _summary_text(data.get("spoken"), default_spoken),
            _summary_text(data.get("written"), ""),
        )
    except ValueError as e:  # json.JSONDecodeError is a ValueError subclass
        logger.warning(f"Failed to parse JSON summary (standard): {e}. Retrying with Regex Fallback.")

    spoken = default_spoken
    written = clean_json  # last resort: show the raw reply

    # The (?<!\\) lookbehind stops the match at the first unescaped quote.
    s_match = re.search(r'"spoken"\s*:\s*"(.*?)(?<!\\)"', clean_json, re.DOTALL)
    if s_match:
        spoken = s_match.group(1)

    w_match = re.search(r'"written"\s*:\s*"(.*?)(?<!\\)"', clean_json, re.DOTALL)
    if w_match:
        written = w_match.group(1).replace("\\n", "\n")

    return spoken, written


async def generate_and_save_summary(db: Database, chat_ctx: llm.ChatContext, contact_number: str, duration: float, avatar_type: str, tts_provider: str, user_name: str = "the patient", usage_stats: dict = None) -> Optional[Dict[str, Any]]:
    """Summarise the conversation with an LLM, persist it and attach costs.

    Args:
        db: Database facade; receives the transcript and the written summary.
        chat_ctx: Chat context to read messages from.
        contact_number: Caller identifier the records are stored under.
        duration: Call duration in seconds, used for cost calculation.
        avatar_type: Passed through to ``calculate_costs``.
        tts_provider: Passed through to ``calculate_costs``.
        user_name: Name used in the summarisation prompt.
        usage_stats: Optional measured usage (``tts_chars``,
            ``llm_prompt_tokens``, ``llm_completion_tokens``); when missing,
            TTS characters are estimated as half the transcript length.

    Returns:
        None when ``contact_number`` is empty. Otherwise a dict with ``text``,
        ``content``, ``spoken_text``, ``costs`` and ``status`` ("completed",
        or "failed" after all retries are exhausted).
    """
    if not contact_number:
        logger.warning("No contact number to save summary for.")
        return None

    logger.info("Generating conversation summary...")

    transcript_lines = []
    messages_to_save = []

    try:
        _collect_chat_messages(chat_ctx, transcript_lines, messages_to_save)

        if messages_to_save:
            try:
                session_id = str(uuid.uuid4())
                db.save_chat_transcript(session_id, contact_number, messages_to_save)
            except Exception as e:
                logger.error(f"Failed to save chat transcript to DB: {e}")
    except Exception as e:
        logger.error(f"Error extracting transcript: {e}")

    transcript = "".join(transcript_lines)

    logger.info(f"Calculating costs with usage_stats: {usage_stats}")
    if usage_stats:
        tts_chars = usage_stats.get("tts_chars", 0)
        prompt_tokens = usage_stats.get("llm_prompt_tokens", 0)
        completion_tokens = usage_stats.get("llm_completion_tokens", 0)
        costs = calculate_costs(duration, tts_chars, avatar_type, tts_provider, prompt_tokens, completion_tokens)
    else:
        # No measured usage: estimate TTS volume as half of the transcript.
        tts_chars = len(transcript) // 2
        costs = calculate_costs(duration, tts_chars, avatar_type, tts_provider)

    logger.info(f"Calculated costs: {costs}")

    prompt = (
        f"Summarize the conversation with {user_name} in JSON format.\n"
        f"Transcript:\n{transcript}\n\n"
        "CRITICAL: Use natural time formats like '9 AM' or '2:30 PM', NOT 'nine zero zero hours'\n"
        "Return a valid JSON object with exactly two keys:\n"
        "1. 'spoken': A 1-2 sentence spoken closing for TTS. Natural, human-like, polite. No special chars. Start with 'To recap,'.\n"
        "2. 'written': A detailed bulleted summary for the user interface. Include topics, appointments booked, and outcome.\n"
        "IMPORTANT: Ensure the JSON is valid. Do NOT use unescaped newlines in the 'written' string or 'spoken' string. Use \\n for line breaks.\n"
    )

    max_retries = 3
    retry_delay = 1

    for attempt in range(max_retries):
        try:
            api_key = os.getenv("GROQ_API_KEY_SUMMARY") or get_groq_api_key()
            client = GroqClient(api_key=api_key)

            # The client call is synchronous; run it in a worker thread so
            # the event loop is not blocked while the request is in flight.
            response = await asyncio.to_thread(
                client.chat.completions.create,
                model="llama-3.3-70b-versatile",
                messages=[
                    {"role": "system", "content": "You are a helpful assistant. Output valid JSON only. Do not output markdown blocks."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.7,
                max_tokens=500
            )

            full_response = response.choices[0].message.content

            # Cost of this summarisation request itself.
            summary_input_cost = response.usage.prompt_tokens * (0.59 / 1_000_000)
            summary_output_cost = response.usage.completion_tokens * (0.79 / 1_000_000)
            summary_cost = summary_input_cost + summary_output_cost

            logger.info(f"🔍 RAW LLM RESPONSE: {full_response}")
            logger.info(f"💰 Summary LLM cost: ${summary_cost:.6f} ({response.usage.prompt_tokens} + {response.usage.completion_tokens} tokens)")

            spoken, written = _parse_summary_response(full_response)
            spoken = spoken.strip()
            written = written.strip()
            if not written:
                written = f"Summary: {spoken}"

            logger.info(f"Spoken Summary: {spoken}")
            logger.info(f"📝 WRITTEN SUMMARY:\n{written}")
            logger.info("=" * 80)
            db.save_summary(contact_number, written)

            # Fold the summarisation request into the reported LLM cost.
            costs['llm'] += summary_cost
            costs['total'] += summary_cost

            summary_result = {
                "text": written,
                "content": written,
                "spoken_text": spoken,
                "costs": costs,
                "status": "completed"
            }
            logger.info(f"📊 Summary with costs: {summary_result}")

            print(f"\n{'='*80}")
            print("📋 CALL SUMMARY GENERATED")
            print(f"{'='*80}")
            print(f"Contact: {contact_number}")
            print(f"Summary: {written}")
            print(f"Costs: STT=${costs['stt']:.4f} | TTS=${costs['tts']:.4f} | LLM=${costs['llm']:.6f} | Total=${costs['total']:.4f}")
            print(f"{'='*80}\n")

            return summary_result

        except Exception as e:
            logger.warning(f"Summary generation attempt {attempt+1} failed: {e}")
            if attempt < max_retries - 1:
                # Exponential backoff: 1s, 2s, ...
                await asyncio.sleep(retry_delay * (2 ** attempt))
            else:
                logger.error("All summary generation attempts failed.")
                return {
                    "text": "Call summary unavailable.",
                    "content": "Call summary unavailable.",
                    "spoken_text": "Thank you for calling. Have a great day!",
                    "costs": costs,
                    "status": "failed"
                }
|
|
|
|
|
|
|
|
def prewarm(proc: JobProcess):
    """Warm up per-process resources before the worker accepts calls.

    On success ``proc.userdata`` holds four entries:

    * ``"db"``    - a freshly constructed ``Database``
    * ``"vad"``   - the result of ``silero.VAD.load()``
    * ``"slots"`` - whatever ``Database.get_available_slots()`` returns
    * ``"cache"`` - the ``cache`` object exported by the ``cache`` module

    The steps run in that order inside one ``try``; the first exception is
    logged and the remaining steps are skipped (nothing is re-raised), so
    ``userdata`` may be only partially populated afterwards.
    """
    # The structured logger is imported under its own name because the
    # module-level ``logger`` is a plain ``logging`` logger; the ``error=``
    # keyword used in the failure path below is passed to this one instead.
    from logger import logger as struct_logger
    from db import Database

    struct_logger.info("Prewarming worker...")

    try:
        # 1. Database handle (also needed for the slot lookup in step 3).
        database = Database()
        userdata = proc.userdata
        userdata["db"] = database
        struct_logger.info("✅ Database connection prewarmed")

        # 2. Voice-activity-detection model.
        vad_model = silero.VAD.load()
        userdata["vad"] = vad_model
        struct_logger.info("✅ VAD model prewarmed")

        # 3. Snapshot of the appointment slots.
        available_slots = database.get_available_slots()
        userdata["slots"] = available_slots
        struct_logger.info("✅ Appointment slots cached")

        # 4. Shared cache object; imported late so an import failure only
        #    affects this last step.
        from cache import cache as shared_cache
        userdata["cache"] = shared_cache
        struct_logger.info(f"✅ Redis cache prewarmed (enabled: {shared_cache.enabled})")

        struct_logger.info("🚀 Worker prewarmed successfully - ready for calls!")

    except Exception as exc:
        # Best effort: a failed prewarm must not stop the worker from starting.
        struct_logger.error(f"Prewarming failed: {exc}", error=str(exc))
|
|
|
|
|
async def entrypoint(ctx: JobContext):
    """Handle one voice-agent job from room connect to disconnect.

    Steps, in order:

    1. Connect to the room (audio only) and obtain a ``Database`` - the
       prewarmed one from ``ctx.proc.userdata["db"]`` when present.
    2. Wait for a participant and read ``avatarType`` from its JSON metadata
       (default ``'3d'``).
    3. Pick the TTS provider: ``TTS_PROVIDER`` env var (default
       ``"deepgram"``), overridden by the Flagsmith ``tts_provider`` feature
       value when one is returned.
    4. Build the ``AgentSession`` and ``Assistant``, register the session
       event handlers and start the session.
    5. Optionally start the Beyond Presence avatar and wait up to 40 seconds
       for its video track.
    6. Speak a time-of-day greeting (Asia/Kolkata) and publish
       ``session_ready`` to the frontend.
    7. Stay alive while the room is connected, then generate a backup call
       summary if a contact number was captured and no summary exists yet.

    Args:
        ctx: Job context supplying the room and the process ``userdata``.
    """
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    def _room_is_connected() -> bool:
        # ``connection_state`` is an ``rtc.ConnectionState`` enum value, so it
        # must be compared with the enum member; comparing it with the string
        # "connected" is never true.
        return ctx.room.connection_state == rtc.ConnectionState.CONN_CONNECTED

    # The event loop keeps only weak references to tasks. Holding them here
    # prevents a fire-and-forget task from being garbage-collected mid-run.
    background_tasks: set = set()

    def _spawn(coro):
        task = asyncio.create_task(coro)
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)
        return task

    # --- Database -----------------------------------------------------------
    if "db" in ctx.proc.userdata:
        db = ctx.proc.userdata["db"]
        logger.info("Using prewarmed Database connection")
    else:
        db = Database()
        logger.info("Initialized new Database connection")

    user_context = {}
    participant = await ctx.wait_for_participant()

    # --- Participant metadata -----------------------------------------------
    # NOTE: the metadata may also carry a ``ttsProvider`` key, but this
    # function does not consult it; the provider comes from env/Flagsmith.
    avatar_type = '3d'
    if participant.metadata:
        try:
            metadata = json.loads(participant.metadata)
            avatar_type = metadata.get('avatarType', '3d')
        except Exception as e:
            logger.warning(f"Failed to parse participant metadata: {e}")

    logger.info(f"Avatar type requested by {participant.identity}: {avatar_type}")

    # --- TTS provider selection ---------------------------------------------
    tts_provider = os.getenv("TTS_PROVIDER", "deepgram")
    try:
        flags = flagsmith.get_environment_flags()
        tts_provider_flag = flags.get_feature_value("tts_provider")
        if tts_provider_flag:
            tts_provider = tts_provider_flag
            logger.info(f"Flagsmith: tts_provider={tts_provider}")
    except Exception as e:
        logger.warning(f"Failed to fetch feature flags from Flagsmith: {e}. Using default: {tts_provider}")

    if tts_provider == "cartesia":
        logger.info("Using Cartesia TTS")
        agent_tts = cartesia.TTS()
    elif tts_provider == "groq":
        logger.info("Using Groq TTS")
        agent_tts = groq.TTS(model="canopylabs/orpheus-v1-english")
    else:
        # Any unrecognised provider value falls back to Deepgram.
        logger.info("Using Deepgram TTS (Default)")
        agent_tts = deepgram.TTS()

    # --- Session ------------------------------------------------------------
    usage_collector = metrics.UsageCollector()

    session = AgentSession(
        stt=deepgram.STT(),
        llm=groq.LLM(
            model="openai/gpt-oss-120b",
            api_key=get_groq_api_key(),
            temperature=0.5,
        ),
        tts=agent_tts,
        vad=silero.VAD.load(
            min_speech_duration=0.1,
            min_silence_duration=0.5,
            prefix_padding_duration=0.2,
        ),
    )

    @session.on("metrics_collected")
    def _on_metrics_collected(ev: MetricsCollectedEvent):
        usage_collector.collect(ev.metrics)

    assistant = Assistant(db, user_context, ctx.room)
    start_time = datetime.now()
    assistant.usage_collector = usage_collector
    assistant.assistant = assistant
    assistant.avatar_type = avatar_type
    assistant.tts_provider = tts_provider

    @session.on("agent_speech_stopped")
    def _on_agent_speech_stopped(ev: Any = None):
        """Disconnect if the agent has finished speaking and a disconnect was requested."""
        if not assistant.should_disconnect:
            return

        async def _disconnect_sequence():
            logger.info("Agent finished speaking. Sending close_session event then closing room.")
            try:
                payload = json.dumps({"type": "close_session"})
                await ctx.room.local_participant.publish_data(payload, reliable=True)
                logger.info("close_session event sent to frontend")
            except Exception as e:
                logger.warning(f"Failed to publish close_session: {e}")

            # Give the frontend a moment to leave on its own.
            await asyncio.sleep(2.0)

            if _room_is_connected():
                logger.info("Frontend didn't disconnect, forcing disconnect")
                await ctx.room.disconnect()
            else:
                logger.info("Frontend disconnected gracefully")

        # Event callbacks are synchronous, so the async work runs as a task.
        _spawn(_disconnect_sequence())

    @session.on("agent_speech_interrupted")
    def _on_agent_speech_interrupted(ev: Any = None):
        """Handle case where agent summary/goodbye is interrupted by noise/user."""
        if assistant.should_disconnect:
            logger.info("Agent speech interrupted during disconnect phase. Triggering disconnect sequence.")
            _on_agent_speech_stopped(ev)

    @session.on("agent_speech_started")
    def _on_agent_speech_started(ev: Any = None):
        logger.info("Agent speech STARTED.")

    await session.start(room=ctx.room, agent=assistant)

    # --- Optional Beyond Presence avatar ------------------------------------
    if avatar_type == 'bey' and BEY_AVAILABLE:
        logger.info("Initializing Beyond Presence avatar...")

        async def send_init_signal():
            # Publish the "initializing" hint five times, 0.5 s apart.
            for _ in range(5):
                try:
                    await ctx.room.local_participant.publish_data(
                        json.dumps({"type": "avatar_initializing"}),
                        reliable=True
                    )
                except Exception as exc:
                    # Best effort. ``Exception`` (not a bare ``except``) so
                    # task cancellation still propagates.
                    logger.debug(f"Failed to publish avatar_initializing: {exc}")
                await asyncio.sleep(0.5)

        _spawn(send_init_signal())

        try:
            bey_session = bey.AvatarSession(
                api_key=os.environ.get("BEYOND_PRESENCE_API_KEY"),
                avatar_id=os.environ.get("BEYOND_PRESENCE_AVATAR_ID", "b9be11b8-89fb-4227-8f86-4a881393cbdb"),
            )
            await bey_session.start(session, room=ctx.room)
            logger.info("Beyond Presence avatar started successfully (API level)")

            # Poll once per second, up to 40 times, for the avatar participant
            # and a published video track.
            logger.info("Waiting for avatar participant to join room...")
            avatar_joined = False
            for _ in range(40):
                p = ctx.room.remote_participants.get("bey-avatar-agent")
                if p and any(
                    t.kind == rtc.TrackKind.KIND_VIDEO
                    for t in p.track_publications.values()
                ):
                    logger.info("✅ Avatar participant joined and video track found!")
                    avatar_joined = True
                    break
                await asyncio.sleep(1)

            if not avatar_joined:
                logger.warning("Timed out waiting for avatar participant to join - proceeding anyway")

        except Exception as e:
            logger.error(f"Failed to start Beyond Presence avatar: {e}")
            logger.info("Falling back to audio-only mode")

    # --- Greeting -----------------------------------------------------------
    hour = datetime.now(ZoneInfo("Asia/Kolkata")).hour
    if 5 <= hour < 12:
        greeting = "Good morning"
    elif 12 <= hour < 17:
        greeting = "Good afternoon"
    else:
        greeting = "Good evening"

    if _room_is_connected():
        try:
            logger.info(f"Speaking greeting: {greeting}...")
            await session.say(
                f"{greeting}, thank you for calling SkyTask Clinic. May I have your phone number?",
                allow_interruptions=True
            )
        except RuntimeError as e:
            logger.warning(f"Could not speak greeting - error: {e}")
    else:
        logger.warning("Session not running - skipping greeting (user may have disconnected)")

    # --- Tell the frontend the session is ready -----------------------------
    try:
        payload = json.dumps({"type": "session_ready"})
        await ctx.room.local_participant.publish_data(payload, reliable=True)
        logger.info("✅ Session ready signal sent to frontend")
    except Exception as e:
        logger.warning(f"Failed to send session_ready: {e}")

    # --- Stay alive until the room disconnects ------------------------------
    while _room_is_connected():
        await asyncio.sleep(1)

    # --- Backup summary -----------------------------------------------------
    # Only runs when a contact number was captured and the assistant has not
    # already produced a summary.
    contact_number = user_context.get("contact_number")
    if contact_number and not assistant.summary_generated:
        logger.info("Disconnect summary generation (backup)...")
        duration = (datetime.now() - start_time).total_seconds()
        user_name = user_context.get("user_name", "the patient")
        await generate_and_save_summary(db, assistant.chat_ctx, contact_number, duration, avatar_type, tts_provider, user_name)
|
|
|
|
|
|
|
|
def start_health_check_server() -> Optional[socketserver.TCPServer]:
    """Start a small HTTP health-check server on a daemon thread.

    The server listens on all interfaces on the port given by the ``PORT``
    environment variable (default 8080). ``GET /`` and ``GET /health`` answer
    ``200`` with the JSON body ``{"status": "healthy"}``; any other path
    answers ``404``. Per-request logging is suppressed.

    Returns:
        The running server instance, or ``None`` when startup failed (the
        failure is printed, never raised, so the worker can still start).
    """

    class HealthCheckHandler(http.server.BaseHTTPRequestHandler):
        def do_GET(self):
            if self.path == "/health" or self.path == "/":
                self.send_response(200)
                self.send_header("Content-type", "application/json")
                self.end_headers()
                self.wfile.write(b'{"status": "healthy"}')
            else:
                self.send_response(404)
                self.end_headers()

        def log_message(self, format, *args):
            # Silence the default per-request stderr logging.
            pass

    class HealthCheckServer(socketserver.ThreadingTCPServer):
        # Set on a private subclass so the stdlib ``TCPServer`` class (and any
        # other server in this process) keeps its defaults.
        allow_reuse_address = True
        # One thread per connection, so a stalled client cannot block later
        # probes; daemon threads never keep the process alive.
        daemon_threads = True

    try:
        port = int(os.getenv("PORT", 8080))

        httpd = HealthCheckServer(("", port), HealthCheckHandler)
        print(f"✅ Health check server listening on port {port}")

        # Daemon thread: the server dies with the main process.
        thread = threading.Thread(target=httpd.serve_forever, daemon=True)
        thread.start()
        return httpd
    except Exception as e:
        print(f"⚠️ Failed to start health check server: {e}")
        return None
|
|
|
|
|
if __name__ == "__main__":
    # Bring up the HTTP health endpoint first; it runs on a daemon thread and
    # does not block the worker startup below.
    start_health_check_server()

    # Hand control to the LiveKit CLI with this module's job entrypoint and
    # per-process prewarm hook.
    worker_options = WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm)
    cli.run_app(worker_options)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|