Spaces:
Paused
Paused
rudyByte
fix: inject final critical overrides at absolute bottom of system prompt to enforce behavior rules over database configs
8a5e277 | import os | |
| import re | |
| import asyncio | |
| import pytz | |
| from datetime import datetime | |
| from groq import AsyncGroq | |
| import google.generativeai as genai | |
# Timezone used when respond_streaming stamps the current date and time into the prompt.
IST = pytz.timezone("Asia/Kolkata")
# Persona and conversation rules for the assistant. _build_system_prompt places
# this text first, ahead of the tenant's business-specific instructions.
MAYA_BEHAVIOR_PROMPT = """
You are Maya, a warm, extremely polite, highly intelligent human-like female voice assistant for CallSaathi.
You are having a natural phone conversation with a customer for an Indian business.
CRITICAL VOICE & CONVERSATION RULES:
1. SOUND HUMAN: Speak like a helpful, sweet, professional Indian female receptionist (Maya). Avoid generic AI greetings or chatbot-like responses.
2. HANDLING GREETINGS & SIMPLE 'HELLO':
- If the user simply says 'Hello', 'Hi', 'Namaste', 'Haan', or just repeats your greeting back, DO NOT jump to appointment booking or ask clinical questions (like 'do you have pain?').
- Instead, reply with a warm, natural open invitation: 'Haan ji, batayein, main aapki kya madad kar sakti hoon?' (Yes please tell me, how can I help you?) or 'Ji batayein, kya sahayata karu aapki?'.
- If you already asked how to help and they say 'hello' again, vary your response: 'Haan ji, main sun rahi hoon, batayein.' (Yes, I am listening, please tell me).
3. NATURAL PACE & FLOW:
- Keep responses extremely short: 1-2 sentences maximum (max 15-20 words). Phone calls require fast, punchy turn-taking.
- Do NOT rush the user to book. Wait until they express interest in an appointment, checkup, or a dental problem before launching the booking process.
- Ask only ONE question at a time to prevent overwhelming the caller.
4. TONE & EMPATHY:
- Be extremely polite and sweet.
- If they describe pain or emergency, respond with instant empathy: 'Oh, bahut dukh hua sunkar. Main abhi aapka appointment check karti hoon.' (Oh, so sorry to hear that. Let me check the appointments right away.)
5. ROBUSTNESS:
- Never say 'As an AI', 'language model', or 'I don't have access'.
- Never use markdown, asterisks (*), lists, or bullet points in your text.
- If the transcript is garbled or makes no sense, say: 'Maaf kijiye, mujhe aapki awaaz saaf nahi aayi. Kya aap phir se bolenge?'
"""
class LLMManager:
    """Generate assistant replies with Groq first and Gemini as the fallback.

    Every request path in this class (``respond``, ``respond_streaming``,
    ``generate_summary``) tries Groq and only calls Gemini when Groq is
    disabled or raises. ``self.groq`` / ``self.gemini`` are ``None`` when the
    corresponding provider could not be configured.
    """

    # Compiled once: split on whitespace that follows ., !, ? or the danda (।),
    # so the punctuation stays attached to its sentence.
    _SENTENCE_END_RE = re.compile(r'(?<=[.!?।])\s+')

    _GROQ_CHAT_MODEL = "llama-3.1-8b-instant"
    # llama-3.1-70b-versatile was retired by Groq; 3.3 is the listed replacement.
    _GROQ_SUMMARY_MODEL = "llama-3.3-70b-versatile"

    _DEFAULT_BASE_PROMPT = 'You are a helpful AI assistant. Keep responses very short and conversational.'

    # LAST RESORT replies, keyed by 2-letter language code (no API needed).
    _STATIC_FALLBACKS = {
        'hi': "Maaf kijiye, mujhe thodi dikkat ho rahi hai. Kya aap phir se bol sakte hain?",
        'en': "I'm sorry, I'm having a bit of trouble connecting. Could you please repeat that?",
        'gu': "Maaf karsho, mane thodi taklif thai rahi che. Shu tame fari bolsho?"
    }

    def __init__(self):
        """Configure both providers from the environment and declare the tools.

        Reads ``GOOGLE_AI_API_KEY`` and ``GROQ_API_KEY``. A missing key (or a
        failing Groq constructor) leaves the matching attribute set to ``None``
        instead of raising, so start-up never crashes on configuration.
        """
        # 1. Gemini: called only when the Groq path is unavailable or fails.
        google_key = os.getenv("GOOGLE_AI_API_KEY")
        if google_key:
            genai.configure(api_key=google_key)
            self.gemini = genai.GenerativeModel('gemini-1.5-flash')
            print("LLM Manager: Gemini 1.5 Flash initialized (Fallback)")
        else:
            print("WARNING: GOOGLE_AI_API_KEY is not set. LLM Gemini will fail.")
            self.gemini = None

        # 2. Groq: tried first on every request. Without a key the client is
        # left as None so callers skip straight to Gemini instead of paying
        # for a network round trip that can only end in an auth error.
        self.groq = None
        groq_key = os.getenv("GROQ_API_KEY")
        if not groq_key:
            print("WARNING: GROQ_API_KEY is not set. Groq is disabled, using Gemini only.")
        else:
            try:
                self.groq = AsyncGroq(api_key=groq_key)
                print("LLM Manager: Groq initialized (Primary)")
            except Exception as e:
                print(f"WARNING: Groq failed to initialize ({e}). Falling back to Gemini only.")

        # Phase E3: Tool Definitions
        self.tools = [
            {
                "type": "function",
                "function": {
                    "name": "check_availability",
                    "description": "Check if a specific date and time is available for an appointment.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "date": {"type": "string", "description": "YYYY-MM-DD"},
                            "time": {"type": "string", "description": "HH:MM (24hr)"},
                        },
                        "required": ["date", "time"],
                    },
                },
            },
            {
                "type": "function",
                "function": {
                    "name": "book_appointment",
                    "description": "Create a new appointment on the calendar.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "name": {"type": "string", "description": "Caller's name"},
                            "phone": {"type": "string", "description": "Caller's phone number"},
                            "date": {"type": "string", "description": "YYYY-MM-DD"},
                            "time": {"type": "string", "description": "HH:MM (24hr)"},
                            "service": {"type": "string", "description": "Type of service (e.g. Checkup, Scaling)"},
                        },
                        "required": ["name", "phone", "date", "time"],
                    },
                },
            }
        ]
        print("LLM Manager initialized with Groq tools + Gemini fallback.")

    # ------------------------------------------------------------------
    # Pure helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _lang_code(current_language) -> str:
        """Return the lower-cased 2-letter prefix of ``current_language`` ('en' if empty/None)."""
        return (current_language or "en")[:2].lower()

    @staticmethod
    def _clean_messages(messages: list) -> list:
        """Drop history entries that carry neither text nor tool data.

        Messages that have a ``tool_calls`` key or ``role == "tool"`` are always
        kept, even when their content is ``None`` or blank: removing one half
        of a tool-call / tool-result pair leaves a malformed sequence.
        """
        cleaned = []
        for m in messages or []:
            if "tool_calls" in m or m.get("role") == "tool":
                cleaned.append(m)
                continue
            content = m.get("content")
            if content is None:
                continue
            if isinstance(content, str) and not content.strip():
                continue
            cleaned.append(m)
        return cleaned

    @classmethod
    def _pop_complete_sentences(cls, buffer: str) -> tuple:
        """Split ``buffer`` into (finished sentences, trailing unfinished text)."""
        parts = cls._SENTENCE_END_RE.split(buffer)
        if len(parts) < 2:
            return [], buffer
        finished = [piece.strip() for piece in parts[:-1] if piece.strip()]
        return finished, parts[-1]

    @staticmethod
    def _as_tool_call_objects(accumulated: dict) -> list:
        """Turn accumulated tool-call dicts into objects exposing ``.id``, ``.type``, ``.function.name`` and ``.function.arguments``."""
        from types import SimpleNamespace

        return [
            SimpleNamespace(
                id=entry["id"],
                type=entry["type"],
                function=SimpleNamespace(
                    name=entry["function"]["name"],
                    arguments=entry["function"]["arguments"],
                ),
            )
            for entry in accumulated.values()
        ]

    @staticmethod
    def _format_gemini_prompt(system_prompt: str, messages: list) -> str:
        """Flatten the system prompt and chat history into one text prompt.

        Messages without text content (``None`` or blank) are skipped so the
        literal word "None" never ends up in the transcript. Every role other
        than "user" is labelled "Assistant".
        """
        lines = [f"System: {system_prompt}"]
        for m in messages or []:
            content = m.get("content")
            if content is None or (isinstance(content, str) and not content.strip()):
                continue
            role = "User" if m.get("role") == "user" else "Assistant"
            lines.append(f"{role}: {content}")
        return "\n".join(lines) + "\nAssistant:"

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def respond(self, messages: list, tenant_config: dict, current_language: str, extra_instructions: str = "") -> str:
        """Non-streaming response — used for fallback only.

        Tries Groq with an 8 second timeout; on any failure (or when Groq is
        disabled) the same prompt is sent to Gemini. Exceptions from the Gemini
        call propagate to the caller.
        """
        system_prompt = self._build_system_prompt(tenant_config, current_language, extra_instructions)
        try:
            if not self.groq:
                raise ValueError("Groq disabled")
            return await asyncio.wait_for(
                self._groq_respond(system_prompt, messages),
                timeout=8.0
            )
        except Exception as e:
            print(f"Groq failed ({e}), falling back to Gemini")
            return await self._gemini_respond(system_prompt, messages)

    async def respond_streaming(self, messages: list, tenant_config: dict, current_language: str, extra_instructions: str = ""):
        """
        Async generator that yields complete SENTENCES one at a time as Groq streams tokens.
        This allows TTS to start synthesizing the first sentence while the LLM generates the rest.

        Yields:
            ``str`` sentences, or - when the model requested tools - a single
            ``{"tool_calls": [...]}`` dict after the stream has ended.

        Failure handling:
            * Error before any sentence was yielded: one full Gemini reply is
              yielded; if that fails too, a static localized apology is yielded.
            * Error after at least one sentence was yielded: the generator just
              stops. Regenerating a whole reply at that point would make the
              caller hear the answer twice.
        """
        # Phase E3: Include current date/time in system prompt for relative time parsing.
        # Appended here (not only in the Groq call) so the Gemini fallback sees it as well.
        now_ist = datetime.now(IST)
        today = now_ist.strftime("%A, %Y-%m-%d")
        current_time = now_ist.strftime("%H:%M")
        time_context = f"\nToday is {today}, and the current time is {current_time}. Use this for relative dates like 'tomorrow', 'next Tuesday', or 'later today'."
        system_prompt = self._build_system_prompt(tenant_config, current_language, extra_instructions) + time_context

        # Filter the history to prevent Groq API errors with malformed message sequences.
        clean_messages = self._clean_messages(messages)

        token_buffer = ""
        emitted_text = False  # True once any sentence has been handed to the consumer
        try:
            if not self.groq:
                # Groq is not configured or failed to initialize: skip to the Gemini fallback.
                raise ValueError("Groq disabled")
            stream = await asyncio.wait_for(
                self.groq.chat.completions.create(
                    model=self._GROQ_CHAT_MODEL,
                    messages=[{"role": "system", "content": system_prompt}] + clean_messages,
                    max_tokens=120,  # 1-2 short sentences is enough for a phone call
                    temperature=0.3,
                    stop=["\n\n", "User:", "Human:", "System:"],  # Prevent runaway generation
                    tools=self.tools,
                    tool_choice="auto",
                    stream=True,
                ),
                timeout=15.0
            )

            # Tool calls arrive split across several chunks; they must be
            # assembled into complete objects before being yielded.
            accumulated_tool_calls: dict = {}  # index -> {id, type, function{name, arguments}}
            async for chunk in stream:
                if not chunk.choices:
                    continue
                delta = chunk.choices[0].delta

                # --- TOOL CALL ACCUMULATION ---
                if delta.tool_calls:
                    for tc_delta in delta.tool_calls:
                        entry = accumulated_tool_calls.setdefault(
                            tc_delta.index,
                            {"id": "", "type": "function", "function": {"name": "", "arguments": ""}},
                        )
                        if tc_delta.id:
                            entry["id"] = tc_delta.id
                        fn_delta = tc_delta.function
                        if fn_delta:
                            if fn_delta.name:
                                entry["function"]["name"] += fn_delta.name
                            if fn_delta.arguments:
                                entry["function"]["arguments"] += fn_delta.arguments
                    continue  # Don't process text tokens when accumulating tool calls

                # --- TEXT STREAMING ---
                token = delta.content or ""
                if not token:
                    continue
                token_buffer += token
                sentences, token_buffer = self._pop_complete_sentences(token_buffer)
                for sentence in sentences:
                    emitted_text = True
                    yield sentence

            # After stream ends: yield completed tool calls if any were accumulated
            if accumulated_tool_calls:
                yield {"tool_calls": self._as_tool_call_objects(accumulated_tool_calls)}
                return

            # Flush any remaining text after stream ends
            remainder = token_buffer.strip()
            if remainder:
                yield remainder
        except Exception as e:
            if emitted_text:
                # Part of the reply is already on its way to TTS; a second,
                # complete answer from the fallback would be spoken on top of it.
                print(f"LLM: Streaming error after partial output: {e}. Ending turn without fallback.")
                return
            print(f"LLM: Streaming error: {e}. Falling back to Gemini.")
            try:
                if not self.gemini:
                    raise ValueError("Gemini not configured")
                yield await self._gemini_respond(system_prompt, clean_messages)
            except Exception as e2:
                print(f"LLM: Fallback failed: {e2}. Using static fallback.")
                # LAST RESORT: Localized static fallback (Zero-latency, no API needed)
                yield self._STATIC_FALLBACKS.get(self._lang_code(current_language), self._STATIC_FALLBACKS['en'])

    async def _groq_respond(self, system_prompt: str, messages: list) -> str:
        """Return one non-streaming Groq chat completion for ``messages`` (no tools)."""
        response = await self.groq.chat.completions.create(
            model=self._GROQ_CHAT_MODEL,
            messages=[{"role": "system", "content": system_prompt}] + messages,
            max_tokens=120,
            temperature=0.3,
            stop=["\n\n", "User:", "Human:"],
        )
        return response.choices[0].message.content

    async def _safe_gemini_generate(self, prompt: str, generation_config: dict = None) -> str:
        """Robust helper to try multiple Gemini model names to handle library version and 404/v1beta mismatches.

        Returns the stripped text of the first model that answers. Raises the
        last exception seen when every model name fails.
        """
        models_to_try = [
            "gemini-2.5-flash",
            "gemini-1.5-flash",
            "gemini-1.5-flash-latest",
            "gemini-2.5-flash-latest",
            "gemini-pro"
        ]
        last_err = None
        for model_name in models_to_try:
            try:
                model = genai.GenerativeModel(model_name)
                # An empty/None config is passed as None, i.e. library defaults.
                res = await model.generate_content_async(prompt, generation_config=generation_config or None)
                return res.text.strip()
            except Exception as e:
                print(f"[Gemini-Safe] Model {model_name} failed: {e}")
                last_err = e
        raise last_err or RuntimeError("All Gemini models failed to generate content")

    async def _gemini_respond(self, system_prompt: str, messages: list) -> str:
        """Answer the conversation with Gemini; logs and re-raises on failure."""
        prompt = self._format_gemini_prompt(system_prompt, messages)
        try:
            return await self._safe_gemini_generate(
                prompt,
                generation_config={"temperature": 0.3, "max_output_tokens": 250}
            )
        except Exception as e:
            print(f"[Gemini-Safe] Direct generate content failed: {e}")
            raise

    def _build_system_prompt(self, config: dict, current_language: str, extra_instructions: str = "") -> str:
        """Assemble the full system prompt for one turn.

        Order: MAYA_BEHAVIOR_PROMPT, the tenant prompt for the language
        (``agent_configs['system_prompt_<code>']``), the reply-language rule,
        ``extra_instructions``, the optional name / memory rules, the optional
        Gujarati or Hindi addon, and finally the critical overrides, which
        must stay last so they win over tenant-provided text.
        """
        # `or {}` / `or default` guard against explicit None values in the config.
        agent_configs = (config or {}).get('agent_configs') or {}
        lang_map = {'en': 'English', 'hi': 'Hindi', 'gu': 'Gujarati'}
        lang_code = self._lang_code(current_language)
        base_prompt = agent_configs.get(f'system_prompt_{lang_code}') or self._DEFAULT_BASE_PROMPT
        full_lang = lang_map.get(lang_code, current_language)

        extra_instructions = extra_instructions or ""

        # Phase E4: Name Greeting Instruction
        name_instruction = ""
        if "CALLER NAME:" in extra_instructions:
            name_instruction = "\nIMPORTANT: You know the caller's name. GREET THEM BY NAME if it fits naturally in this turn."

        # CRITICAL Safeguard: Avoid jumping straight into booking from previous call memory
        memory_safeguard = ""
        if "CALLER MEMORY" in extra_instructions:
            memory_safeguard = (
                "\nCRITICAL RULE FOR CALL START / MEMORY PERSISTENCE:\n"
                "- This is a BRAND NEW call. Even if the [CALLER MEMORY] shows 'LAST CALL SUMMARY' or an incomplete booking, DO NOT jump straight into booking or continue that topic in your first sentence.\n"
                "- You MUST start with a generic polite greeting, greet them by name, and ask how you can help them TODAY (e.g. 'How can I assist you today?').\n"
                "- Only bring up or continue the previous appointment booking if the caller explicitly mentions it first."
            )

        # Keep the caller-supplied block from fusing with the preceding sentence.
        if extra_instructions and not extra_instructions[0].isspace():
            extra_instructions = "\n" + extra_instructions

        # Addon selection uses the same 2-letter code as the prompt key above,
        # so "gujarati", "gu" and "gu-IN" all behave alike. The import is
        # deferred so other languages do not depend on the processor module.
        addon = ""
        if lang_code == "gu":
            from src.gujarati_processor import build_gujarati_system_prompt_addon
            addon = "\n\n" + build_gujarati_system_prompt_addon()
        elif lang_code == "hi":
            from src.gujarati_processor import build_hindi_system_prompt_addon
            addon = "\n\n" + build_hindi_system_prompt_addon()

        critical_overrides = (
            "\n\nFINAL CRITICAL OVERRIDES (MUST OBEY OVER ANY OTHER RULES):\n"
            "- Even if the business-specific instructions focus on booking appointments, DO NOT bring up booking slots or ask appointment-booking questions in response to simple greetings (like 'Hello', 'Hi', 'Haan', 'Ji', 'Namaste').\n"
            "- You must wait until the user explicitly mentions booking an appointment, checking slots, doctor consultation, or experiencing a dental issue before you initiate any appointment booking script."
        )
        return (
            f"{MAYA_BEHAVIOR_PROMPT}\n"
            f"Business-specific instructions:\n{base_prompt}\n"
            f"IMPORTANT RULE: The user is speaking {full_lang}. "
            f"You MUST reply entirely in {full_lang}. Do not mix languages. "
            f"Keep responses concise — 1-2 sentences max for a phone call."
            f"{extra_instructions}"
            f"{name_instruction}"
            f"{memory_safeguard}"
            f"{addon}"
            f"{critical_overrides}"
        )

    async def generate_summary(self, conversation_history: list, language: str) -> str:
        """Return a Markdown call summary written in ``language``.

        Tries the Groq summary model, then Gemini; if both fail a plain
        "Call ended. Turns: N" string is returned, so this never raises on
        provider errors.
        """
        prompt = (
            f"As Maya, the expert AI Medical Receptionist, provide a PROFESSIONAL, HIGHLY ORGANIZED summary of this conversation in {language}.\n"
            "The summary must be structured for a DOCTOR or CLINIC OWNER to read quickly.\n\n"
            "Format exactly like this (use Markdown):\n"
            "### 📋 Call Summary\n"
            "- **Patient Name**: [Name / Unknown]\n"
            "- **Contact Number**: [Phone / N/A]\n"
            "- **Primary Purpose**: [Brief reason: e.g. Tooth Pain, Routine Scaling, Inquiry]\n"
            "- **Booking Status**: [✅ Booked: YYYY-MM-DD at HH:MM / ❌ Not Booked]\n"
            "- **Clinical Notes**: [Important details: e.g. Pain in left molar, sensitive to cold, first-time patient]\n"
            "- **Maya's Action**: [e.g. Confirmed appointment, Sent WhatsApp alert, Advised patient to wait for call]\n"
            "- **Patient Mood**: [e.g. Calm / Anxious / Urgent]\n\n"
            "Conversation Transcript:\n"
        )
        prompt += "\n".join(
            f"{m.get('role', 'unknown')}: {m.get('content') or '(Tool Call)'}"
            for m in conversation_history
        )
        try:
            if not self.groq:
                raise ValueError("Groq disabled")
            # Use the 70b model for professional summarization quality
            response = await self.groq.chat.completions.create(
                model=self._GROQ_SUMMARY_MODEL,
                messages=[{"role": "user", "content": prompt}],
                max_tokens=400,
                temperature=0.3
            )
            return response.choices[0].message.content.strip()
        except Exception as groq_err:
            print(f"LLM: Groq summary failed ({groq_err}). Falling back to Gemini.")
            # Fallback to Gemini with safe helper to handle any API/model mismatch errors
            try:
                return await self._safe_gemini_generate(prompt)
            except Exception as gemini_err:
                print(f"[Gemini-Safe] Summary fallback also failed: {gemini_err}")
                return f"Call ended. Turns: {len(conversation_history)}"