# (Hugging Face Spaces page-chrome residue removed from export)
# Voice Bot Service — TTS handled by browser speechSynthesis
| import requests | |
| import re | |
| from datetime import datetime, timedelta | |
| from django.apps import apps | |
| from django.http import JsonResponse, HttpResponse | |
| from django.views.decorators.csrf import csrf_exempt | |
| from django.views.decorators.http import require_http_methods | |
| from django.utils import timezone | |
| import json | |
| from voice_config import * | |
| from gemini_helper import gemini_ai | |
def _get_voice_models():
    """Resolve the VoiceSession and VoiceMessage models lazily.

    Looking the models up via ``apps.get_model()`` at call time (instead of
    importing them at module load) avoids AppRegistryNotReady errors during
    Django start-up.

    Returns:
        tuple: (VoiceSession, VoiceMessage) model classes.
    """
    return tuple(
        apps.get_model('store', model_name)
        for model_name in ('VoiceSession', 'VoiceMessage')
    )
class GroqTTS:
    """Text-to-Speech using the Groq Orpheus TTS API.

    Accepts input of any length: text longer than the 200-character
    per-request limit is split at sentence boundaries, each piece is
    synthesised separately, and the resulting WAV files are stitched back
    together as a single WAV by concatenating their raw PCM payloads.
    """

    MAX_INPUT_LEN = 200  # Groq Orpheus hard limit per request

    def __init__(self):
        self.api_key = GROQ_API_KEY
        self.model = GROQ_TTS_MODEL
        self.voice = GROQ_TTS_VOICE
        self.base_url = "https://api.groq.com/openai/v1"

    # -- internal: single chunk (<=200 chars) --------------------------------
    def _tts_chunk(self, text):
        """Send a single <=200-char chunk to Groq and return WAV bytes.

        Raises:
            requests.exceptions.RequestException: on network or HTTP errors
                (raise_for_status turns 4xx/5xx into exceptions).
        """
        url = f"{self.base_url}/audio/speech"
        headers = {
            "Authorization": f"Bearer {self.api_key.strip()}",
            "Content-Type": "application/json",
        }
        data = {
            "model": self.model,
            "input": text,
            "voice": self.voice,
            "response_format": "wav",
        }
        # Explicit timeout so a hung upstream request cannot block the
        # worker indefinitely (requests has no default timeout).
        response = requests.post(url, json=data, headers=headers, timeout=30)
        response.raise_for_status()
        return response.content

    # -- split long text into <=200-char pieces at sentence boundaries -------
    @staticmethod
    def _split_text(text, limit=200):
        """Break *text* into chunks of at most *limit* characters,
        splitting on sentence-ending punctuation when possible.

        BUG FIX: this was previously declared without ``self`` while being
        called as ``self._split_text(...)``, which passed the instance as
        *text* and crashed at the first ``len(text)``.  It is now a proper
        @staticmethod, so both ``self._split_text(...)`` and
        ``GroqTTS._split_text(...)`` work.
        """
        chunks = []
        while text:
            if len(text) <= limit:
                chunks.append(text)
                break
            # Try to split at the last sentence-end within the limit
            segment = text[:limit]
            split_pos = -1
            for sep in ['. ', '! ', '? ', '.\'', '!\"', '?\"']:
                idx = segment.rfind(sep)
                if idx > split_pos:
                    split_pos = idx + len(sep)
            if split_pos <= 0:
                # Fall back to last space
                split_pos = segment.rfind(' ')
            if split_pos <= 0:
                split_pos = limit  # hard cut
            chunks.append(text[:split_pos].strip())
            text = text[split_pos:].strip()
        return [c for c in chunks if c]

    # -- public API (handles any length) -------------------------------------
    def text_to_speech(self, text):
        """
        Convert text to speech using Groq Orpheus TTS.

        Automatically chunks text >200 chars and concatenates WAV output.
        Returns audio data in WAV format, or None on error / empty input.
        """
        import struct, io
        chunks = self._split_text(text, self.MAX_INPUT_LEN)
        if not chunks:
            return None
        try:
            if len(chunks) == 1:
                return self._tts_chunk(chunks[0])
            # Multiple chunks: concatenate raw PCM from each WAV.
            # NOTE(review): assumes every chunk shares the first chunk's
            # channels / sample rate / bit depth -- confirm the API always
            # returns a consistent format, and that the fmt chunk directly
            # follows the RIFF/WAVE preamble (canonical 44-byte header).
            pcm_parts = []
            wav_params = None  # (channels, sample_width, sample_rate)
            for chunk in chunks:
                wav_bytes = self._tts_chunk(chunk)
                buf = io.BytesIO(wav_bytes)
                # Parse minimal WAV header (44 bytes canonical)
                buf.read(4)  # RIFF
                buf.read(4)  # file size
                buf.read(4)  # WAVE
                buf.read(4)  # fmt
                fmt_size = struct.unpack('<I', buf.read(4))[0]
                fmt_data = buf.read(fmt_size)
                channels = struct.unpack('<H', fmt_data[2:4])[0]
                sample_rate = struct.unpack('<I', fmt_data[4:8])[0]
                bits = struct.unpack('<H', fmt_data[14:16])[0]
                if wav_params is None:
                    wav_params = (channels, bits // 8, sample_rate)
                # Skip any intermediate chunks until the 'data' chunk
                while True:
                    chunk_id = buf.read(4)
                    if not chunk_id:
                        break
                    chunk_size = struct.unpack('<I', buf.read(4))[0]
                    if chunk_id == b'data':
                        pcm_parts.append(buf.read(chunk_size))
                        break
                    else:
                        buf.read(chunk_size)
            # Rebuild a single canonical PCM WAV around the joined payload
            pcm = b''.join(pcm_parts)
            ch, sw, sr = wav_params
            data_size = len(pcm)
            header = struct.pack(
                '<4sI4s4sIHHIIHH4sI',
                b'RIFF', 36 + data_size, b'WAVE',
                b'fmt ', 16, 1, ch, sr, sr * ch * sw, ch * sw, sw * 8,
                b'data', data_size,
            )
            return header + pcm
        except requests.exceptions.RequestException as e:
            print(f"Groq TTS API Error: {e}")
            return None
        except Exception as e:
            print(f"Groq TTS processing error: {e}")
            return None
# Backward-compatible alias: the service previously used an ElevenLabs TTS
# class; existing references to ElevenLabsTTS now resolve to the Groq
# implementation without any caller changes.
ElevenLabsTTS = GroqTTS
class ConversationFlow:
    """State machine driving the returns/exchanges voice conversation.

    Steps: greeting -> ask_exchange_or_return -> ask_order_id -> ask_reason
    [-> ask_exchange_preference, exchanges only] -> confirm_details ->
    completed, with an identify_correction detour when the user rejects the
    confirmation.  Collected fields live per session in ``self.state``.

    NOTE(review): ``state`` is an in-process dict -- it is never evicted and
    is not shared between worker processes; confirm a single-worker
    deployment or move state to the database/cache.
    """

    def __init__(self):
        self.tts = ElevenLabsTTS()
        self.state = {}

    def process_user_input(self, session_id, user_text, current_step):
        """Process *user_text* for *current_step* and build the bot reply.

        Returns a dict with 'text' (bot reply), 'next_step',
        'session_data' (fields collected so far) and 'valid_answers'
        (example answers for the next step).
        """
        # Initialize session state on first contact
        if session_id not in self.state:
            self.state[session_id] = {
                'step': 'greeting',
                'data': {}
            }
        session = self.state[session_id]
        # Process based on current step
        if current_step == 'greeting':
            # Only short-circuit when the user clearly says exchange/return.
            # Without a keyword match we ask explicitly -- this prevents
            # inputs like "I would be 23" from being misclassified.
            user_lower = user_text.strip().lower()
            intent_keywords = [
                'return', 'refund', 'money back', 'send back',
                'exchange', 'swap', 'replace', 'replacement', 'different product',
            ]
            has_intent = any(kw in user_lower for kw in intent_keywords)
            if has_intent:
                question_asked = CONVERSATION_TEMPLATES['ask_exchange_or_return']
                request_type = self._extract_request_type(user_text, question_asked)
                print(f"[flow] greeting -> keyword found -> classified: {request_type}")
                session['data']['request_type'] = request_type
                response_text = CONVERSATION_TEMPLATES['ask_order_id']
                next_step = 'ask_order_id'
            else:
                print(f"[flow] greeting -> no intent keyword in '{user_text}', asking explicitly")
                response_text = CONVERSATION_TEMPLATES['ask_exchange_or_return']
                next_step = 'ask_exchange_or_return'
        elif current_step == 'ask_exchange_or_return':
            question_asked = CONVERSATION_TEMPLATES['ask_exchange_or_return']
            print(f"\n{'='*70}")
            print(f"π€ PROCESSING STEP: ask_exchange_or_return")
            print(f"π User input: '{user_text}'")
            print(f"β Question asked: '{question_asked}'")
            print(f"{'='*70}")
            request_type = self._extract_request_type(user_text, question_asked)
            print(f"\n{'='*70}")
            print(f"π EXTRACTION RESULT: '{request_type}'")
            print(f"π Result type: {type(request_type)}")
            print(f"π Result is truthy: {bool(request_type)}")
            print(f"{'='*70}\n")
            # request_type should NEVER be None now, but just in case
            if request_type and request_type in ['exchange', 'return']:
                session['data']['request_type'] = request_type
                print(f"β SUCCESS: Request type set to: {request_type}")
                response_text = CONVERSATION_TEMPLATES['ask_order_id']
                next_step = 'ask_order_id'
            else:
                # Defensive fallback so the flow can always continue
                print(f"β UNEXPECTED: Got invalid request_type: {request_type}")
                print(f"β οΈ This should not happen - defaulting to exchange")
                session['data']['request_type'] = 'exchange'
                response_text = CONVERSATION_TEMPLATES['ask_order_id']
                next_step = 'ask_order_id'
        elif current_step == 'ask_order_id':
            question_asked = CONVERSATION_TEMPLATES['ask_order_id']
            print(f"\nπ€ PROCESSING STEP: ask_order_id")
            print(f"π User input: '{user_text}'")
            order_id = self._extract_order_id(user_text, question_asked)
            print(f"π Extraction result: {order_id}")
            if order_id:
                session['data']['order_id'] = order_id
                print(f"β Order ID set to: {order_id}")
                request_type = session['data']['request_type']
                response_text = CONVERSATION_TEMPLATES['ask_reason'].format(request_type=request_type)
                next_step = 'ask_reason'
            else:
                # Re-ask the same question until an order id is recognised
                print(f"β οΈ Could not extract order ID from: '{user_text}'")
                response_text = "I couldn't find an order number. Please say something like 'order 123' or 'my order number is 456'."
                next_step = 'ask_order_id'
        elif current_step == 'ask_reason':
            # Clean up and standardize the reason using Gemini
            request_type = session['data']['request_type']
            question_asked = CONVERSATION_TEMPLATES['ask_reason'].format(request_type=request_type)
            cleaned_reason = gemini_ai.extract_reason(user_text, request_type, question_asked)
            session['data']['reason'] = cleaned_reason
            print(f"π€ Gemini cleaned reason: '{user_text}' -> '{cleaned_reason}'")
            if request_type == 'exchange':
                response_text = CONVERSATION_TEMPLATES['ask_exchange_preference']
                next_step = 'ask_exchange_preference'
            else:
                # Returns skip the preference question and go to confirmation
                response_text = self._generate_confirmation(session['data'])
                next_step = 'confirm_details'
        elif current_step == 'ask_exchange_preference':
            question_asked = CONVERSATION_TEMPLATES['ask_exchange_preference']
            session['data']['exchange_preference'] = gemini_ai.extract_exchange_preference(user_text, question_asked)
            print(f"π€ Gemini cleaned preference: '{user_text}' -> '{session['data']['exchange_preference']}'")
            response_text = self._generate_confirmation(session['data'])
            next_step = 'confirm_details'
        elif current_step == 'confirm_details':
            confirmation_question = self._generate_confirmation(session['data'])
            if self._is_confirmation(user_text, confirmation_question):
                result = self._process_request(session['data'])
                # BUG FIX: expose the tracking number in session data so
                # callers (e.g. the process_voice view, which reads
                # session_data['tracking_number'] on completion) can persist
                # it; previously it only existed in the local result dict.
                session['data']['tracking_number'] = result.get('tracking_number', '')
                response_text = result['message']
                next_step = 'completed'
                # Save to database
                if DATABASE_INTEGRATION:
                    self._save_to_database(session['data'], result)
            else:
                # User said no - ask what was wrong
                response_text = CONVERSATION_TEMPLATES['ask_what_wrong']
                next_step = 'identify_correction'
        elif current_step == 'identify_correction':
            # User specified what they want to correct
            field_to_correct = gemini_ai.identify_correction_field(user_text, session['data'])
            print(f"π§ User wants to correct: {field_to_correct}")
            # Redirect to the appropriate question to get NEW answer
            if field_to_correct == 'request_type':
                response_text = CONVERSATION_TEMPLATES['ask_exchange_or_return']
                next_step = 'ask_exchange_or_return'
            elif field_to_correct == 'order_id':
                response_text = CONVERSATION_TEMPLATES['ask_order_id']
                next_step = 'ask_order_id'
            elif field_to_correct == 'reason':
                request_type = session['data']['request_type']
                response_text = CONVERSATION_TEMPLATES['ask_reason'].format(request_type=request_type)
                next_step = 'ask_reason'
            elif field_to_correct == 'exchange_preference':
                response_text = CONVERSATION_TEMPLATES['ask_exchange_preference']
                next_step = 'ask_exchange_preference'
            else:  # everything or unrecognized
                # Start over
                session['data'] = {}
                response_text = CONVERSATION_TEMPLATES['ask_exchange_or_return']
                next_step = 'ask_exchange_or_return'
        else:
            # Unknown/completed step: restart with the greeting
            response_text = CONVERSATION_TEMPLATES['greeting']
            next_step = 'greeting'
        # Update session
        session['step'] = next_step
        return {
            'text': response_text,
            'next_step': next_step,
            'session_data': session['data'],
            'valid_answers': self._get_valid_answers(next_step),
        }

    def _extract_request_type(self, text, question_asked):
        """Extract whether user wants exchange or return using Gemini AI with question context."""
        result = gemini_ai.extract_request_type(text, question_asked)
        if result:
            print(f"π€ Gemini understood '{text}' as: {result}")
        return result

    def _extract_order_id(self, text, question_asked):
        """Extract order ID from user input using Gemini AI with question context."""
        result = gemini_ai.extract_order_id(text, question_asked)
        if result:
            print(f"π€ Gemini extracted order ID from '{text}': {result}")
        return result

    def _is_confirmation(self, text, question_asked):
        """Check if user is confirming using Gemini AI with question context."""
        result = gemini_ai.is_confirmation(text, question_asked)
        print(f"π€ Gemini understood '{text}' as confirmation: {result}")
        return result

    def _get_valid_answers(self, step):
        """Return the set of valid / example answers for a given step.

        For ask_order_id we pull real order IDs from the database; on any
        DB error we fall back to static example answers.
        """
        if step == 'greeting' or step == 'ask_exchange_or_return':
            return ['exchange', 'return', 'I want to exchange', 'I want to return']
        elif step == 'ask_order_id':
            try:
                Order = apps.get_model('store', 'Order')
                order_ids = list(
                    Order.objects.order_by('-id').values_list('id', flat=True)[:20]
                )
                return [str(oid) for oid in order_ids] if order_ids else ['No orders found']
            except Exception as e:
                print(f'[valid_answers] DB error: {e}')
                return ['order 123', '12345']
        elif step == 'ask_reason':
            return ['wrong size', 'defective product', "I don't like it", 'color mismatch', 'poor quality']
        elif step == 'ask_exchange_preference':
            return ['size large', 'different color - blue', 'size medium', 'black color']
        elif step == 'confirm_details':
            return ['yes', "yes that's correct", 'no', 'change the order number', 'wrong reason']
        return []

    def _generate_confirmation(self, data):
        """Generate the confirmation message summarising all collected fields."""
        request_type = data['request_type']
        order_id = data['order_id']
        reason = data['reason']
        extra_info = ""
        if request_type == 'exchange':
            extra_info = f"You want {data.get('exchange_preference', 'a different product')} instead. "
        return CONVERSATION_TEMPLATES['confirm_details'].format(
            request_type=request_type,
            order_id=order_id,
            reason=reason,
            extra_info=extra_info
        )

    def _process_request(self, data):
        """Dispatch the confirmed request to the exchange or return handler."""
        request_type = data['request_type']
        if request_type == 'exchange':
            return self._process_exchange(data)
        else:
            return self._process_return(data)

    def _process_exchange(self, data):
        """Process an exchange: generate tracking number, pickup and delivery dates."""
        tracking_number = f"EXG-{data['order_id']}-{datetime.now().strftime('%Y%m%d%H%M%S')}"
        pickup_date = (datetime.now() + timedelta(days=2)).strftime('%B %d, %Y')
        delivery_date = (datetime.now() + timedelta(days=EXCHANGE_DELIVERY_DAYS)).strftime('%B %d, %Y')
        message = CONVERSATION_TEMPLATES['processing'].format(request_type='exchange')
        message += " " + CONVERSATION_TEMPLATES['exchange_pickup_scheduled'].format(pickup_date=pickup_date)
        message += " " + CONVERSATION_TEMPLATES['exchange_delivery_scheduled'].format(delivery_date=delivery_date)
        message += " " + CONVERSATION_TEMPLATES['success'].format(
            request_type='exchange',
            tracking_number=tracking_number
        )
        return {
            'message': message,
            'tracking_number': tracking_number,
            'pickup_date': pickup_date,
            'delivery_date': delivery_date
        }

    def _process_return(self, data):
        """Process a return: generate tracking number and pickup date."""
        tracking_number = f"RET-{data['order_id']}-{datetime.now().strftime('%Y%m%d%H%M%S')}"
        pickup_date = (datetime.now() + timedelta(days=2)).strftime('%B %d, %Y')
        message = CONVERSATION_TEMPLATES['processing'].format(request_type='return')
        message += " " + CONVERSATION_TEMPLATES['return_scheduled'].format(pickup_date=pickup_date)
        message += " " + CONVERSATION_TEMPLATES['success'].format(
            request_type='return',
            tracking_number=tracking_number
        )
        return {
            'message': message,
            'tracking_number': tracking_number,
            'pickup_date': pickup_date
        }

    def _save_to_database(self, data, result):
        """Save exchange/return data to database (best-effort; errors are logged).

        NOTE(review): this uses app label 'shop' while the rest of the file
        uses 'store' for model lookups -- confirm which app actually owns
        Order/OrderDetail before relying on this path.
        """
        try:
            # Import your models dynamically
            Order = apps.get_model('shop', 'Order')
            OrderDetail = apps.get_model('shop', 'OrderDetail')
            order_id = data.get('order_id')
            # Find the order
            try:
                order = Order.objects.get(id=order_id)
                order_details = OrderDetail.objects.filter(order=order)
                # Update all order details
                for detail in order_details:
                    detail.return_reason = data.get('reason', '')
                    detail.return_status = 'Returned'
                    detail.return_date = datetime.now()
                    detail.days_to_return = DEFAULT_RETURN_DAYS
                    if data['request_type'] == 'exchange':
                        detail.is_exchanged = True
                        detail.exchange_order = data.get('exchange_preference', '')
                    detail.save()
                print(f"β Database updated for Order {order_id}")
                print(f"   Reason: {data.get('reason')}")
                print(f"   Type: {data['request_type']}")
                print(f"   Tracking: {result.get('tracking_number')}")
            except Order.DoesNotExist:
                print(f"β Order {order_id} not found in database")
        except Exception as e:
            print(f"β Database error: {e}")
# Global conversation flow instance shared by all requests in this process.
# NOTE(review): per-session state lives only in this process's memory --
# confirm the app runs in a single worker, or sessions will not be shared.
conversation_flow = ConversationFlow()
| # Django Views | |
def start_conversation(request):
    """Start a new voice-bot conversation session.

    Generates a random session id, creates a VoiceSession row (best-effort:
    DB failures are logged, never raised), and returns the greeting text as
    JSON.  No audio is produced server-side -- the browser performs TTS via
    speechSynthesis.
    """
    import uuid
    session_id = str(uuid.uuid4())
    # Removed dead code: a TTS client (ElevenLabsTTS) was instantiated here
    # but never used, since speech synthesis happens in the browser.
    greeting_text = CONVERSATION_TEMPLATES['greeting']
    # Create DB session record (best-effort)
    try:
        VoiceSession, _ = _get_voice_models()
        VoiceSession.objects.create(session_id=session_id)
    except Exception as e:
        print(f'[voice] DB session create error: {e}')
    return JsonResponse({
        'session_id': session_id,
        'message': greeting_text,
        'next_step': 'greeting',
        'gemini_enabled': gemini_ai.enabled,
        'valid_answers': ['exchange', 'return', 'I want to exchange', 'I want to return'],
    })
def process_voice(request):
    """Process one turn of the voice conversation.

    Expects a JSON body with ``session_id``, ``text`` (the browser's speech
    transcript) and ``current_step``.  Runs the conversation state machine,
    logs the turn to the database on a best-effort basis, and returns the
    bot's reply as JSON -- TTS itself happens client-side via
    speechSynthesis, so no audio is returned.
    """
    try:
        data = json.loads(request.body)
        session_id = data.get('session_id')
        user_text = data.get('text')
        current_step = data.get('current_step')
        # Log what goes into the LLM (and the current state)
        print(f"[voice] LLM_INPUT step={current_step} user='{user_text}'")
        result = conversation_flow.process_user_input(session_id, user_text, current_step)
        print(
            f"[voice] session={session_id} step={current_step} -> next={result['next_step']} "
            f"user='{user_text}' response='{result['text'][:200]}'"
        )
        # Log LLM outputs / parsed session data for debugging intent extraction
        try:
            print(f"[voice] LLM_OUTPUT data={json.dumps(result.get('session_data', {}))}")
        except Exception:
            # session_data may hold non-JSON-serializable values; fall back to repr
            print(f"[voice] LLM_OUTPUT data={result.get('session_data')}")
        # -- Persist turn to database (best-effort: failures logged, not raised) --
        try:
            VoiceSession, VoiceMessage = _get_voice_models()
            vs = VoiceSession.objects.filter(session_id=session_id).first()
            if vs:
                turn_num = vs.messages.count() + 1
                VoiceMessage.objects.create(
                    session=vs,
                    turn_number=turn_num,
                    step=current_step,
                    next_step=result['next_step'],
                    user_text=user_text,
                    bot_text=result['text'],
                    # Map the step being answered to the session-data key it
                    # fills, then record that extracted value ('' if absent).
                    extracted_value=str(result.get('session_data', {}).get(
                        {'greeting': 'request_type',
                         'ask_exchange_or_return': 'request_type',
                         'ask_order_id': 'order_id',
                         'ask_reason': 'reason',
                         'ask_exchange_preference': 'exchange_preference',
                         'confirm_details': 'confirmed',
                         }.get(current_step, ''), '')),
                    session_data_snapshot=result.get('session_data', {}),
                )
                # Update session-level fields from collected data
                sd = result.get('session_data', {})
                vs.request_type = sd.get('request_type', vs.request_type)
                vs.order_id = sd.get('order_id', vs.order_id)
                vs.reason = sd.get('reason', vs.reason)
                vs.exchange_preference = sd.get('exchange_preference', vs.exchange_preference)
                vs.session_data_json = sd
                if result['next_step'] == 'completed':
                    vs.status = 'completed'
                    vs.ended_at = timezone.now()
                    # Tracking number is taken from session data when present
                    # ('' otherwise).
                    vs.tracking_number = sd.get('tracking_number', '')
                vs.save()
        except Exception as e:
            print(f'[voice] DB log error: {e}')
        # ------------------------------------------------------------------
        # Return JSON -- TTS is handled in the browser via speechSynthesis
        return JsonResponse({
            'text': result['text'],
            'next_step': result['next_step'],
            'session_data': result['session_data'],
            'valid_answers': result.get('valid_answers', []),
        })
    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)
def get_speech(request):
    """Echo the requested text back as JSON.

    Speech synthesis happens client-side via speechSynthesis, so this
    endpoint only parses the payload and returns the text to speak.
    """
    try:
        payload = json.loads(request.body)
        text_out = payload.get('text', '')
        response = JsonResponse({'text': text_out})
    except Exception as exc:
        response = JsonResponse({'error': str(exc)}, status=500)
    return response