import asyncio
import os
import re
from datetime import datetime
from types import SimpleNamespace
from typing import Optional

import pytz
from groq import AsyncGroq
import google.generativeai as genai

# All "today"/"current time" hints given to the model are expressed in IST.
IST = pytz.timezone("Asia/Kolkata")

MAYA_BEHAVIOR_PROMPT = """
You are Maya, a warm, extremely polite, highly intelligent human-like female voice assistant for CallSaathi.
You are having a natural phone conversation with a customer for an Indian business.

CRITICAL VOICE & CONVERSATION RULES:
1. SOUND HUMAN: Speak like a helpful, sweet, professional Indian female receptionist (Maya). Avoid generic AI greetings or chatbot-like responses.
2. HANDLING GREETINGS & SIMPLE 'HELLO':
   - If the user simply says 'Hello', 'Hi', 'Namaste', 'Haan', or just repeats your greeting back, DO NOT jump to appointment booking or ask clinical questions (like 'do you have pain?').
   - Instead, reply with a warm, natural open invitation: 'Haan ji, batayein, main aapki kya madad kar sakti hoon?' (Yes please tell me, how can I help you?) or 'Ji batayein, kya sahayata karu aapki?'.
   - If you already asked how to help and they say 'hello' again, vary your response: 'Haan ji, main sun rahi hoon, batayein.' (Yes, I am listening, please tell me).
3. NATURAL PACE & FLOW:
   - Keep responses extremely short: 1-2 sentences maximum (max 15-20 words). Phone calls require fast, punchy turn-taking.
   - Do NOT rush the user to book. Wait until they express interest in an appointment, checkup, or a dental problem before launching the booking process.
   - Ask only ONE question at a time to prevent overwhelming the caller.
4. TONE & EMPATHY:
   - Be extremely polite and sweet.
   - If they describe pain or emergency, respond with instant empathy: 'Oh, bahut dukh hua sunkar. Main abhi aapka appointment check karti hoon.' (Oh, so sorry to hear that. Let me check the appointments right away.)
5. ROBUSTNESS:
   - Never say 'As an AI', 'language model', or 'I don't have access'.
   - Never use markdown, asterisks (*), lists, or bullet points in your text.
   - If the transcript is garbled or makes no sense, say: 'Maaf kijiye, mujhe aapki awaaz saaf nahi aayi. Kya aap phir se bolenge?'
"""

# Sentence boundary: whitespace that follows ., !, ? or the Devanagari danda.
# The look-behind keeps the punctuation attached to the sentence it ends.
_SENTENCE_END_RE = re.compile(r'(?<=[.!?।])\s+')

# Last-resort replies used when both Groq and Gemini fail; keyed by the
# two-letter language code.
_STATIC_FALLBACKS = {
    'hi': "Maaf kijiye, mujhe thodi dikkat ho rahi hai. Kya aap phir se bol sakte hain?",
    'en': "I'm sorry, I'm having a bit of trouble connecting. Could you please repeat that?",
    'gu': "Maaf karsho, mane thodi taklif thai rahi che. Shu tame fari bolsho?",
}


def _language_code(current_language) -> str:
    """Return the lower-cased first two characters of *current_language*.

    ``None`` or an empty value yields ``""`` instead of raising, so callers can
    fall through to their own defaults.
    """
    return (current_language or "")[:2].lower()


class LLMManager:
    """Chat front-end that tries Groq first and falls back to Gemini.

    Attributes set by ``__init__``:
        gemini: a ``genai.GenerativeModel`` when ``GOOGLE_AI_API_KEY`` is set,
            otherwise ``None``. Within this class it is only used as an
            "is Gemini configured" flag by ``respond_streaming``.
        groq: an ``AsyncGroq`` client, or ``None`` when the key is missing or
            the client could not be constructed.
        tools: function-calling schema passed to Groq in ``respond_streaming``.
    """

    def __init__(self):
        # 1. Gemini. Despite being configured first it is only ever used after
        #    a Groq failure (see respond / respond_streaming / generate_summary).
        google_key = os.getenv("GOOGLE_AI_API_KEY")
        if google_key:
            genai.configure(api_key=google_key)
            self.gemini = genai.GenerativeModel('gemini-1.5-flash')
            print("LLM Manager: Gemini 1.5 Flash initialized (Fallback)")
        else:
            print("WARNING: GOOGLE_AI_API_KEY is not set. LLM Gemini will fail.")
            self.gemini = None

        # 2. Groq. Without a key the client is left as None so every call goes
        #    straight to the fallback instead of first making a request that is
        #    guaranteed to be rejected. Construction is still wrapped so a
        #    library error cannot crash start-up.
        self.groq = None
        groq_key = os.getenv("GROQ_API_KEY")
        if groq_key:
            try:
                self.groq = AsyncGroq(api_key=groq_key)
                print("LLM Manager: Groq initialized (Primary)")
            except Exception as e:
                print(f"WARNING: Groq failed to initialize ({e}). Falling back to Gemini only.")
        else:
            print("WARNING: GROQ_API_KEY is not set. Groq disabled; using Gemini only.")

        # Phase E3: Tool Definitions
        self.tools = [
            {
                "type": "function",
                "function": {
                    "name": "check_availability",
                    "description": "Check if a specific date and time is available for an appointment.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "date": {"type": "string", "description": "YYYY-MM-DD"},
                            "time": {"type": "string", "description": "HH:MM (24hr)"},
                        },
                        "required": ["date", "time"],
                    },
                },
            },
            {
                "type": "function",
                "function": {
                    "name": "book_appointment",
                    "description": "Create a new appointment on the calendar.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "name": {"type": "string", "description": "Caller's name"},
                            "phone": {"type": "string", "description": "Caller's phone number"},
                            "date": {"type": "string", "description": "YYYY-MM-DD"},
                            "time": {"type": "string", "description": "HH:MM (24hr)"},
                            "service": {"type": "string", "description": "Type of service (e.g. Checkup, Scaling)"},
                        },
                        "required": ["name", "phone", "date", "time"],
                    },
                },
            },
        ]
        print("LLM Manager initialized with Groq tools + Gemini fallback.")

    async def respond(self, messages: list, tenant_config: dict, current_language: str, extra_instructions: str = "") -> str:
        """Non-streaming response — used for fallback only.

        Tries Groq with an 8 second timeout; on any failure (or when Groq is
        disabled) the same prompt is sent to Gemini. Exceptions from the Gemini
        path propagate to the caller.
        """
        system_prompt = self._build_system_prompt(tenant_config, current_language, extra_instructions)
        if self.groq:
            try:
                return await asyncio.wait_for(
                    self._groq_respond(system_prompt, messages),
                    timeout=8.0,
                )
            except Exception as e:
                print(f"Groq failed ({e}), falling back to Gemini")
        else:
            print("Groq disabled, using Gemini")
        return await self._gemini_respond(system_prompt, messages)

    @staticmethod
    def _clean_history(messages: list) -> list:
        """Drop history entries that carry no usable content.

        A message is removed when its content is ``None`` and it is neither a
        tool-call message nor a ``tool`` role message, or when its content is a
        blank string. Everything else is kept in order.
        """
        kept = []
        for m in messages:
            content = m.get("content")
            if content is None and "tool_calls" not in m and m.get("role") != "tool":
                continue
            if isinstance(content, str) and not content.strip():
                continue
            kept.append(m)
        return kept

    @staticmethod
    def _merge_tool_call_delta(accumulated: dict, tc_delta) -> None:
        """Fold one streamed tool-call fragment into ``accumulated[index]``."""
        entry = accumulated.setdefault(
            tc_delta.index,
            {
                "id": tc_delta.id or "",
                "type": "function",
                "function": {"name": "", "arguments": ""},
            },
        )
        if tc_delta.id:
            entry["id"] = tc_delta.id
        fn = tc_delta.function
        if fn:
            if fn.name:
                entry["function"]["name"] += fn.name
            if fn.arguments:
                entry["function"]["arguments"] += fn.arguments

    @staticmethod
    def _finalize_tool_calls(accumulated: dict) -> list:
        """Turn accumulated tool-call dicts into attribute-style objects.

        Each object exposes ``id``, ``type``, ``function.name`` and
        ``function.arguments``.
        """
        return [
            SimpleNamespace(
                id=call["id"],
                type=call["type"],
                function=SimpleNamespace(
                    name=call["function"]["name"],
                    arguments=call["function"]["arguments"],
                ),
            )
            for call in accumulated.values()
        ]

    async def respond_streaming(self, messages: list, tenant_config: dict, current_language: str, extra_instructions: str = ""):
        """Async generator yielding complete sentences as Groq streams tokens.

        Yields:
            ``str`` — one sentence at a time, so TTS can start on the first
            sentence while the rest is still being generated; or
            ``dict`` — ``{"tool_calls": [...]}`` once, after the stream ends,
            when the model requested tool calls (no trailing text is flushed
            in that case).

        Failure handling:
            * If nothing has been yielded yet, the whole reply is fetched from
              Gemini; if that also fails a static localized sentence is yielded.
            * If sentences were already yielded, the buffered remainder is
              flushed and the turn ends. Falling back at that point would make
              the caller hear a second, unrelated full answer.
        """
        system_prompt = self._build_system_prompt(tenant_config, current_language, extra_instructions)

        # Filter out empty/null-content messages to prevent Groq API errors
        # with malformed message sequences.
        clean_messages = self._clean_history(messages)

        token_buffer = ""
        spoken_any = False  # True once at least one sentence reached the caller

        try:
            # Phase E3: Include current date/time in system prompt for relative time parsing
            now_ist = datetime.now(IST)
            today = now_ist.strftime("%A, %Y-%m-%d")
            current_time = now_ist.strftime("%H:%M")
            time_context = f"\nToday is {today}, and the current time is {current_time}. Use this for relative dates like 'tomorrow', 'next Tuesday', or 'later today'."

            if not self.groq:
                # Groq is unavailable (no key or client construction failed):
                # jump to the fallback path below.
                raise ValueError("Groq disabled")

            # The timeout covers opening the stream only, not its consumption.
            stream = await asyncio.wait_for(
                self.groq.chat.completions.create(
                    model="llama-3.1-8b-instant",
                    messages=[{"role": "system", "content": system_prompt + time_context}] + clean_messages,
                    max_tokens=120,  # 1-2 short sentences is enough for a phone call
                    temperature=0.3,
                    stop=["\n\n", "User:", "Human:", "System:"],  # Prevent runaway generation
                    tools=self.tools,
                    tool_choice="auto",
                    stream=True,
                ),
                timeout=15.0,
            )

            # Tool calls arrive split across chunks; assemble them fully
            # before yielding. index -> {id, type, function{name, arguments}}
            accumulated_tool_calls: dict = {}

            async for chunk in stream:
                if not chunk.choices:
                    continue
                delta = chunk.choices[0].delta

                # --- TOOL CALL ACCUMULATION ---
                if delta.tool_calls:
                    for tc_delta in delta.tool_calls:
                        self._merge_tool_call_delta(accumulated_tool_calls, tc_delta)
                    continue  # Don't process text tokens when accumulating tool calls

                # --- TEXT STREAMING ---
                token = delta.content or ""
                if not token:
                    continue
                token_buffer += token

                # Everything before the last split point is a finished
                # sentence; the tail stays buffered until more tokens arrive.
                *finished, token_buffer = _SENTENCE_END_RE.split(token_buffer)
                for sentence in finished:
                    sentence = sentence.strip()
                    if sentence:
                        spoken_any = True
                        yield sentence

            # After stream ends: yield completed tool calls if any were accumulated
            if accumulated_tool_calls:
                yield {"tool_calls": self._finalize_tool_calls(accumulated_tool_calls)}
                return

            # Flush any remaining text after stream ends
            remainder = token_buffer.strip()
            if remainder:
                yield remainder

        except Exception as e:
            if spoken_any:
                # Part of the answer was already delivered; finish with what
                # is buffered instead of starting a second answer.
                print(f"LLM: Streaming error after partial reply: {e}. Ending turn without fallback.")
                leftover = token_buffer.strip()
                if leftover:
                    yield leftover
                return

            print(f"LLM: Streaming error: {e}. Falling back to Gemini.")
            try:
                if self.gemini:
                    full = await self._gemini_respond(system_prompt, clean_messages)
                    yield full
                else:
                    raise ValueError("Gemini not configured")
            except Exception as e2:
                print(f"LLM: Fallback failed: {e2}. Using static fallback.")
                # LAST RESORT: Localized static fallback (no API needed)
                yield _STATIC_FALLBACKS.get(_language_code(current_language), _STATIC_FALLBACKS['en'])

    async def _groq_respond(self, system_prompt: str, messages: list) -> str:
        """Send one non-streaming chat completion to Groq and return its text."""
        response = await self.groq.chat.completions.create(
            model="llama-3.1-8b-instant",
            messages=[{"role": "system", "content": system_prompt}] + messages,
            max_tokens=120,
            temperature=0.3,
            stop=["\n\n", "User:", "Human:"],
        )
        return response.choices[0].message.content

    async def _safe_gemini_generate(self, prompt: str, generation_config: Optional[dict] = None) -> str:
        """Try several Gemini model names in order and return the first reply.

        Each candidate is attempted once; any exception moves on to the next
        name. If every candidate fails, the last exception is re-raised.
        """
        models_to_try = [
            "gemini-2.5-flash",
            "gemini-1.5-flash",
            "gemini-1.5-flash-latest",
            "gemini-2.5-flash-latest",
            "gemini-pro",
        ]
        # Only forward generation_config when the caller supplied one.
        extra_kwargs = {"generation_config": generation_config} if generation_config else {}

        last_err = None
        for model_name in models_to_try:
            try:
                model = genai.GenerativeModel(model_name)
                res = await model.generate_content_async(prompt, **extra_kwargs)
                return res.text.strip()
            except Exception as e:
                print(f"[Gemini-Safe] Model {model_name} failed: {e}")
                last_err = e
        raise last_err or RuntimeError("All Gemini models failed to generate content")

    async def _gemini_respond(self, system_prompt: str, messages: list) -> str:
        """Flatten the chat history into one text prompt and ask Gemini.

        Messages without text content (for example assistant tool-call
        entries whose content is ``None``) are skipped rather than rendered
        as the literal string "None". Any role other than ``user`` is
        labelled "Assistant".

        Raises:
            Whatever ``_safe_gemini_generate`` raises, after logging it.
        """
        history = [f"System: {system_prompt}"]
        for m in messages:
            content = m.get("content")
            if content is None or (isinstance(content, str) and not content.strip()):
                continue
            speaker = "User" if m.get("role") == "user" else "Assistant"
            history.append(f"{speaker}: {content}")
        prompt = "\n".join(history) + "\nAssistant:"

        try:
            return await self._safe_gemini_generate(
                prompt,
                generation_config={"temperature": 0.3, "max_output_tokens": 250},
            )
        except Exception as e:
            print(f"[Gemini-Safe] Direct generate content failed: {e}")
            raise

    def _build_system_prompt(self, config: dict, current_language: str, extra_instructions: str = "") -> str:
        """Assemble the full system prompt for one turn.

        The result is, in order: the shared Maya behaviour prompt, the tenant's
        ``system_prompt_<lang>`` from ``config['agent_configs']`` (or a generic
        default), the reply-language rule, ``extra_instructions`` verbatim, the
        optional caller-name and caller-memory rules, the optional
        Gujarati/Hindi addon, and the final overrides.

        The language is normalised to its first two lower-cased characters for
        every decision here, including the addon choice, so values such as
        "Hindi" or "hi-IN" are treated like "hindi".
        """
        agent_configs = config.get('agent_configs', {})
        lang_map = {'en': 'English', 'hi': 'Hindi', 'gu': 'Gujarati'}
        lang_code = _language_code(current_language)

        base_prompt = agent_configs.get(
            f'system_prompt_{lang_code}',
            'You are a helpful AI assistant. Keep responses very short and conversational.'
        )
        # Unknown codes fall back to the caller's own label; a missing
        # language falls back to English.
        full_lang = lang_map.get(lang_code, current_language or 'English')

        # Phase E4: Name Greeting Instruction
        name_instruction = ""
        if "CALLER NAME:" in extra_instructions:
            name_instruction = "\nIMPORTANT: You know the caller's name. GREET THEM BY NAME if it fits naturally in this turn."

        # CRITICAL Safeguard: Avoid jumping straight into booking from previous call memory
        memory_safeguard = ""
        if "CALLER MEMORY" in extra_instructions:
            memory_safeguard = (
                "\nCRITICAL RULE FOR CALL START / MEMORY PERSISTENCE:\n"
                "- This is a BRAND NEW call. Even if the [CALLER MEMORY] shows 'LAST CALL SUMMARY' or an incomplete booking, DO NOT jump straight into booking or continue that topic in your first sentence.\n"
                "- You MUST start with a generic polite greeting, greet them by name, and ask how you can help them TODAY (e.g. 'How can I assist you today?').\n"
                "- Only bring up or continue the previous appointment booking if the caller explicitly mentions it first."
            )

        # Imported at call time, as in the original code.
        from src.gujarati_processor import build_gujarati_system_prompt_addon, build_hindi_system_prompt_addon

        addon = ""
        if lang_code == "gu":
            addon = "\n\n" + build_gujarati_system_prompt_addon()
        elif lang_code == "hi":
            addon = "\n\n" + build_hindi_system_prompt_addon()

        critical_overrides = (
            "\n\nFINAL CRITICAL OVERRIDES (MUST OBEY OVER ANY OTHER RULES):\n"
            "- Even if the business-specific instructions focus on booking appointments, DO NOT bring up booking slots or ask appointment-booking questions in response to simple greetings (like 'Hello', 'Hi', 'Haan', 'Ji', 'Namaste').\n"
            "- You must wait until the user explicitly mentions booking an appointment, checking slots, doctor consultation, or experiencing a dental issue before you initiate any appointment booking script."
        )

        return (
            f"{MAYA_BEHAVIOR_PROMPT}\n"
            f"Business-specific instructions:\n{base_prompt}\n"
            f"IMPORTANT RULE: The user is speaking {full_lang}. "
            f"You MUST reply entirely in {full_lang}. Do not mix languages. "
            f"Keep responses concise — 1-2 sentences max for a phone call."
            f"{extra_instructions}"
            f"{name_instruction}"
            f"{memory_safeguard}"
            f"{addon}"
            f"{critical_overrides}"
        )

    async def generate_summary(self, conversation_history: list, language: str) -> str:
        """Produce a structured Markdown call summary in *language*.

        Tries Groq first, then Gemini via ``_safe_gemini_generate``. If both
        fail, returns a minimal ``"Call ended. Turns: N"`` string instead of
        raising.
        """
        prompt = (
            f"As Maya, the expert AI Medical Receptionist, provide a PROFESSIONAL, HIGHLY ORGANIZED summary of this conversation in {language}.\n"
            "The summary must be structured for a DOCTOR or CLINIC OWNER to read quickly.\n\n"
            "Format exactly like this (use Markdown):\n"
            "### 📋 Call Summary\n"
            "- **Patient Name**: [Name / Unknown]\n"
            "- **Contact Number**: [Phone / N/A]\n"
            "- **Primary Purpose**: [Brief reason: e.g. Tooth Pain, Routine Scaling, Inquiry]\n"
            "- **Booking Status**: [✅ Booked: YYYY-MM-DD at HH:MM / ❌ Not Booked]\n"
            "- **Clinical Notes**: [Important details: e.g. Pain in left molar, sensitive to cold, first-time patient]\n"
            "- **Maya's Action**: [e.g. Confirmed appointment, Sent WhatsApp alert, Advised patient to wait for call]\n"
            "- **Patient Mood**: [e.g. Calm / Anxious / Urgent]\n\n"
            "Conversation Transcript:\n"
        )
        # Entries without text (tool-call messages) are shown as "(Tool Call)".
        prompt += "\n".join(
            f"{m['role']}: {m.get('content') or '(Tool Call)'}" for m in conversation_history
        )

        try:
            if not self.groq:
                raise ValueError("Groq disabled")
            # Use 70b for professional summarization quality.
            # NOTE(review): confirm this model id is still served by Groq; if
            # it has been retired every summary silently takes the Gemini path.
            response = await self.groq.chat.completions.create(
                model="llama-3.1-70b-versatile",
                messages=[{"role": "user", "content": prompt}],
                max_tokens=400,
                temperature=0.3,
            )
            return response.choices[0].message.content.strip()
        except Exception as groq_err:
            print(f"[Groq] Summary failed ({groq_err}), falling back to Gemini")
            # Fallback to Gemini with safe helper to handle any API/model mismatch errors
            try:
                return await self._safe_gemini_generate(prompt)
            except Exception as gemini_err:
                print(f"[Gemini-Safe] Summary fallback also failed: {gemini_err}")
                return f"Call ended. Turns: {len(conversation_history)}"