# Voice Bot Service – TTS handled by browser speechSynthesis import requests import re from datetime import datetime, timedelta from django.apps import apps from django.http import JsonResponse, HttpResponse from django.views.decorators.csrf import csrf_exempt from django.views.decorators.http import require_http_methods from django.utils import timezone import json from voice_config import * from gemini_helper import gemini_ai def _get_voice_models(): """Lazy import of VoiceSession / VoiceMessage to avoid AppRegistryNotReady.""" VoiceSession = apps.get_model('store', 'VoiceSession') VoiceMessage = apps.get_model('store', 'VoiceMessage') return VoiceSession, VoiceMessage class GroqTTS: """Text-to-Speech using Groq Orpheus TTS API""" MAX_INPUT_LEN = 200 # Groq Orpheus hard limit per request def __init__(self): self.api_key = GROQ_API_KEY self.model = GROQ_TTS_MODEL self.voice = GROQ_TTS_VOICE self.base_url = "https://api.groq.com/openai/v1" # ── internal: single chunk (≤200 chars) ──────────────────────── def _tts_chunk(self, text): """Send a single ≤200-char chunk to Groq and return WAV bytes.""" url = f"{self.base_url}/audio/speech" headers = { "Authorization": f"Bearer {self.api_key.strip()}", "Content-Type": "application/json", } data = { "model": self.model, "input": text, "voice": self.voice, "response_format": "wav", } response = requests.post(url, json=data, headers=headers) response.raise_for_status() return response.content # ── split long text into ≤200-char pieces at sentence boundaries ── @staticmethod def _split_text(text, limit=200): """Break *text* into chunks of at most *limit* characters, splitting on sentence-ending punctuation when possible.""" chunks = [] while text: if len(text) <= limit: chunks.append(text) break # Try to split at the last sentence-end within the limit segment = text[:limit] split_pos = -1 for sep in ['. ', '! ', '? ', '.\'', '!\"', '?\"']: idx = segment.rfind(sep) if idx > split_pos: split_pos = idx + len(sep) if split_pos <= 0: # Fall back to last space split_pos = segment.rfind(' ') if split_pos <= 0: split_pos = limit # hard cut chunks.append(text[:split_pos].strip()) text = text[split_pos:].strip() return [c for c in chunks if c] # ── public API (handles any length) ──────────────────────────── def text_to_speech(self, text): """ Convert text to speech using Groq Orpheus TTS. Automatically chunks text >200 chars and concatenates WAV output. Returns audio data in WAV format. """ import struct, io chunks = self._split_text(text, self.MAX_INPUT_LEN) if not chunks: return None try: if len(chunks) == 1: return self._tts_chunk(chunks[0]) # Multiple chunks: concatenate raw PCM from each WAV pcm_parts = [] wav_params = None # (channels, sample_width, sample_rate) for chunk in chunks: wav_bytes = self._tts_chunk(chunk) buf = io.BytesIO(wav_bytes) # Parse minimal WAV header (44 bytes canonical) buf.read(4) # RIFF buf.read(4) # file size buf.read(4) # WAVE buf.read(4) # fmt fmt_size = struct.unpack(' keyword found -> classified: {request_type}") session['data']['request_type'] = request_type response_text = CONVERSATION_TEMPLATES['ask_order_id'] next_step = 'ask_order_id' else: print(f"[flow] greeting -> no intent keyword in '{user_text}', asking explicitly") response_text = CONVERSATION_TEMPLATES['ask_exchange_or_return'] next_step = 'ask_exchange_or_return' elif current_step == 'ask_exchange_or_return': question_asked = CONVERSATION_TEMPLATES['ask_exchange_or_return'] print(f"\n{'='*70}") print(f"🎤 PROCESSING STEP: ask_exchange_or_return") print(f"📝 User input: '{user_text}'") print(f"❓ Question asked: '{question_asked}'") print(f"{'='*70}") request_type = self._extract_request_type(user_text, question_asked) print(f"\n{'='*70}") print(f"🔍 EXTRACTION RESULT: '{request_type}'") print(f"🔍 Result type: {type(request_type)}") print(f"🔍 Result is truthy: {bool(request_type)}") print(f"{'='*70}\n") # request_type should NEVER be None now, but just in case if request_type and request_type in ['exchange', 'return']: session['data']['request_type'] = request_type print(f"✅ SUCCESS: Request type set to: {request_type}") response_text = CONVERSATION_TEMPLATES['ask_order_id'] next_step = 'ask_order_id' else: print(f"❌ UNEXPECTED: Got invalid request_type: {request_type}") print(f"⚠️ This should not happen - defaulting to exchange") session['data']['request_type'] = 'exchange' response_text = CONVERSATION_TEMPLATES['ask_order_id'] next_step = 'ask_order_id' elif current_step == 'ask_order_id': question_asked = CONVERSATION_TEMPLATES['ask_order_id'] print(f"\n🎤 PROCESSING STEP: ask_order_id") print(f"📝 User input: '{user_text}'") order_id = self._extract_order_id(user_text, question_asked) print(f"🔍 Extraction result: {order_id}") if order_id: session['data']['order_id'] = order_id print(f"✅ Order ID set to: {order_id}") request_type = session['data']['request_type'] response_text = CONVERSATION_TEMPLATES['ask_reason'].format(request_type=request_type) next_step = 'ask_reason' else: print(f"⚠️ Could not extract order ID from: '{user_text}'") response_text = "I couldn't find an order number. Please say something like 'order 123' or 'my order number is 456'." next_step = 'ask_order_id' elif current_step == 'ask_reason': # Clean up and standardize the reason using Gemini request_type = session['data']['request_type'] question_asked = CONVERSATION_TEMPLATES['ask_reason'].format(request_type=request_type) cleaned_reason = gemini_ai.extract_reason(user_text, request_type, question_asked) session['data']['reason'] = cleaned_reason print(f"🤖 Gemini cleaned reason: '{user_text}' -> '{cleaned_reason}'") if request_type == 'exchange': response_text = CONVERSATION_TEMPLATES['ask_exchange_preference'] next_step = 'ask_exchange_preference' else: response_text = self._generate_confirmation(session['data']) next_step = 'confirm_details' elif current_step == 'ask_exchange_preference': question_asked = CONVERSATION_TEMPLATES['ask_exchange_preference'] session['data']['exchange_preference'] = gemini_ai.extract_exchange_preference(user_text, question_asked) print(f"🤖 Gemini cleaned preference: '{user_text}' -> '{session['data']['exchange_preference']}'") response_text = self._generate_confirmation(session['data']) next_step = 'confirm_details' elif current_step == 'confirm_details': confirmation_question = self._generate_confirmation(session['data']) if self._is_confirmation(user_text, confirmation_question): result = self._process_request(session['data']) response_text = result['message'] next_step = 'completed' # Save to database if DATABASE_INTEGRATION: self._save_to_database(session['data'], result) else: # BUG FIX 2: User said no - ask what was wrong response_text = CONVERSATION_TEMPLATES['ask_what_wrong'] next_step = 'identify_correction' elif current_step == 'identify_correction': # User specified what they want to correct field_to_correct = gemini_ai.identify_correction_field(user_text, session['data']) print(f"🔧 User wants to correct: {field_to_correct}") # Redirect to the appropriate question to get NEW answer if field_to_correct == 'request_type': response_text = CONVERSATION_TEMPLATES['ask_exchange_or_return'] next_step = 'ask_exchange_or_return' elif field_to_correct == 'order_id': response_text = CONVERSATION_TEMPLATES['ask_order_id'] next_step = 'ask_order_id' elif field_to_correct == 'reason': request_type = session['data']['request_type'] response_text = CONVERSATION_TEMPLATES['ask_reason'].format(request_type=request_type) next_step = 'ask_reason' elif field_to_correct == 'exchange_preference': response_text = CONVERSATION_TEMPLATES['ask_exchange_preference'] next_step = 'ask_exchange_preference' else: # everything or unrecognized # Start over session['data'] = {} response_text = CONVERSATION_TEMPLATES['ask_exchange_or_return'] next_step = 'ask_exchange_or_return' else: response_text = CONVERSATION_TEMPLATES['greeting'] next_step = 'greeting' # Update session session['step'] = next_step return { 'text': response_text, 'next_step': next_step, 'session_data': session['data'], 'valid_answers': self._get_valid_answers(next_step), } def _extract_request_type(self, text, question_asked): """Extract whether user wants exchange or return using Gemini AI with question context""" result = gemini_ai.extract_request_type(text, question_asked) if result: print(f"🤖 Gemini understood '{text}' as: {result}") return result def _extract_order_id(self, text, question_asked): """Extract order ID from user input using Gemini AI with question context""" result = gemini_ai.extract_order_id(text, question_asked) if result: print(f"🤖 Gemini extracted order ID from '{text}': {result}") return result def _is_confirmation(self, text, question_asked): """Check if user is confirming using Gemini AI with question context""" result = gemini_ai.is_confirmation(text, question_asked) print(f"🤖 Gemini understood '{text}' as confirmation: {result}") return result def _get_valid_answers(self, step): """Return the set of valid / example answers for a given step. For ask_order_id we pull real order IDs from the database.""" if step == 'greeting' or step == 'ask_exchange_or_return': return ['exchange', 'return', 'I want to exchange', 'I want to return'] elif step == 'ask_order_id': try: Order = apps.get_model('store', 'Order') order_ids = list( Order.objects.order_by('-id').values_list('id', flat=True)[:20] ) return [str(oid) for oid in order_ids] if order_ids else ['No orders found'] except Exception as e: print(f'[valid_answers] DB error: {e}') return ['order 123', '12345'] elif step == 'ask_reason': return ['wrong size', 'defective product', "I don't like it", 'color mismatch', 'poor quality'] elif step == 'ask_exchange_preference': return ['size large', 'different color - blue', 'size medium', 'black color'] elif step == 'confirm_details': return ['yes', "yes that's correct", 'no', 'change the order number', 'wrong reason'] return [] def _generate_confirmation(self, data): """Generate confirmation message""" request_type = data['request_type'] order_id = data['order_id'] reason = data['reason'] extra_info = "" if request_type == 'exchange': extra_info = f"You want {data.get('exchange_preference', 'a different product')} instead. " return CONVERSATION_TEMPLATES['confirm_details'].format( request_type=request_type, order_id=order_id, reason=reason, extra_info=extra_info ) def _process_request(self, data): """Process the return/exchange request""" request_type = data['request_type'] if request_type == 'exchange': return self._process_exchange(data) else: return self._process_return(data) def _process_exchange(self, data): """Process exchange request""" tracking_number = f"EXG-{data['order_id']}-{datetime.now().strftime('%Y%m%d%H%M%S')}" pickup_date = (datetime.now() + timedelta(days=2)).strftime('%B %d, %Y') delivery_date = (datetime.now() + timedelta(days=EXCHANGE_DELIVERY_DAYS)).strftime('%B %d, %Y') message = CONVERSATION_TEMPLATES['processing'].format(request_type='exchange') message += " " + CONVERSATION_TEMPLATES['exchange_pickup_scheduled'].format(pickup_date=pickup_date) message += " " + CONVERSATION_TEMPLATES['exchange_delivery_scheduled'].format(delivery_date=delivery_date) message += " " + CONVERSATION_TEMPLATES['success'].format( request_type='exchange', tracking_number=tracking_number ) return { 'message': message, 'tracking_number': tracking_number, 'pickup_date': pickup_date, 'delivery_date': delivery_date } def _process_return(self, data): """Process return request""" tracking_number = f"RET-{data['order_id']}-{datetime.now().strftime('%Y%m%d%H%M%S')}" pickup_date = (datetime.now() + timedelta(days=2)).strftime('%B %d, %Y') message = CONVERSATION_TEMPLATES['processing'].format(request_type='return') message += " " + CONVERSATION_TEMPLATES['return_scheduled'].format(pickup_date=pickup_date) message += " " + CONVERSATION_TEMPLATES['success'].format( request_type='return', tracking_number=tracking_number ) return { 'message': message, 'tracking_number': tracking_number, 'pickup_date': pickup_date } def _save_to_database(self, data, result): """Save exchange/return data to database""" try: # Import your models dynamically Order = apps.get_model('shop', 'Order') OrderDetail = apps.get_model('shop', 'OrderDetail') order_id = data.get('order_id') # Find the order try: order = Order.objects.get(id=order_id) order_details = OrderDetail.objects.filter(order=order) # Update all order details for detail in order_details: detail.return_reason = data.get('reason', '') detail.return_status = 'Returned' detail.return_date = datetime.now() detail.days_to_return = DEFAULT_RETURN_DAYS if data['request_type'] == 'exchange': detail.is_exchanged = True detail.exchange_order = data.get('exchange_preference', '') detail.save() print(f"✅ Database updated for Order {order_id}") print(f" Reason: {data.get('reason')}") print(f" Type: {data['request_type']}") print(f" Tracking: {result.get('tracking_number')}") except Order.DoesNotExist: print(f"❌ Order {order_id} not found in database") except Exception as e: print(f"❌ Database error: {e}") # Global conversation flow instance conversation_flow = ConversationFlow() # Django Views @csrf_exempt @require_http_methods(["POST"]) def start_conversation(request): """Start a new conversation session""" import uuid session_id = str(uuid.uuid4()) # Generate greeting audio tts = ElevenLabsTTS() greeting_text = CONVERSATION_TEMPLATES['greeting'] # Create DB session record try: VoiceSession, _ = _get_voice_models() VoiceSession.objects.create(session_id=session_id) except Exception as e: print(f'[voice] DB session create error: {e}') return JsonResponse({ 'session_id': session_id, 'message': greeting_text, 'next_step': 'greeting', 'gemini_enabled': gemini_ai.enabled, 'valid_answers': ['exchange', 'return', 'I want to exchange', 'I want to return'], }) @csrf_exempt @require_http_methods(["POST"]) def process_voice(request): """Process user voice input and return audio response""" try: data = json.loads(request.body) session_id = data.get('session_id') user_text = data.get('text') current_step = data.get('current_step') # Process input # Log what goes into the LLM (and the current state) print(f"[voice] LLM_INPUT step={current_step} user='{user_text}'") result = conversation_flow.process_user_input(session_id, user_text, current_step) print( f"[voice] session={session_id} step={current_step} -> next={result['next_step']} " f"user='{user_text}' response='{result['text'][:200]}'" ) # Log LLM outputs / parsed session data for debugging intent extraction try: print(f"[voice] LLM_OUTPUT data={json.dumps(result.get('session_data', {}))}") except Exception: print(f"[voice] LLM_OUTPUT data={result.get('session_data')}") # ── Persist turn to database ────────────────────────────────── try: VoiceSession, VoiceMessage = _get_voice_models() vs = VoiceSession.objects.filter(session_id=session_id).first() if vs: turn_num = vs.messages.count() + 1 VoiceMessage.objects.create( session=vs, turn_number=turn_num, step=current_step, next_step=result['next_step'], user_text=user_text, bot_text=result['text'], extracted_value=str(result.get('session_data', {}).get( {'greeting': 'request_type', 'ask_exchange_or_return': 'request_type', 'ask_order_id': 'order_id', 'ask_reason': 'reason', 'ask_exchange_preference': 'exchange_preference', 'confirm_details': 'confirmed', }.get(current_step, ''), '')), session_data_snapshot=result.get('session_data', {}), ) # Update session-level fields from collected data sd = result.get('session_data', {}) vs.request_type = sd.get('request_type', vs.request_type) vs.order_id = sd.get('order_id', vs.order_id) vs.reason = sd.get('reason', vs.reason) vs.exchange_preference = sd.get('exchange_preference', vs.exchange_preference) vs.session_data_json = sd if result['next_step'] == 'completed': vs.status = 'completed' vs.ended_at = timezone.now() vs.tracking_number = sd.get('tracking_number', '') vs.save() except Exception as e: print(f'[voice] DB log error: {e}') # ────────────────────────────────────────────────────────────── # Return JSON – TTS is handled in the browser via speechSynthesis return JsonResponse({ 'text': result['text'], 'next_step': result['next_step'], 'session_data': result['session_data'], 'valid_answers': result.get('valid_answers', []), }) except Exception as e: return JsonResponse({'error': str(e)}, status=500) @csrf_exempt @require_http_methods(["POST"]) def get_speech(request): """Return the text as JSON – browser handles TTS via speechSynthesis""" try: data = json.loads(request.body) text = data.get('text', '') return JsonResponse({'text': text}) except Exception as e: return JsonResponse({'error': str(e)}, status=500)