# HTT / voice_service.py
# Deep
# some files added
# 880d7e9
# Voice Bot Service – TTS handled by browser speechSynthesis
import requests
import re
from datetime import datetime, timedelta
from django.apps import apps
from django.http import JsonResponse, HttpResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from django.utils import timezone
import json
from voice_config import *
from gemini_helper import gemini_ai
def _get_voice_models():
    """Return ``(VoiceSession, VoiceMessage)`` resolved via the app registry.

    The models are looked up at call time instead of being imported at module
    load, which avoids ``AppRegistryNotReady``.
    """
    session_model, message_model = (
        apps.get_model('store', model_name)
        for model_name in ('VoiceSession', 'VoiceMessage')
    )
    return session_model, message_model
class GroqTTS:
    """Text-to-Speech client for the Groq Orpheus TTS API.

    Text longer than ``MAX_INPUT_LEN`` characters is split into several
    requests; the WAV payloads that come back are merged into a single WAV.
    """

    MAX_INPUT_LEN = 200  # Groq Orpheus hard limit per request
    # (connect, read) timeout in seconds, so a stalled API call cannot block
    # the calling worker indefinitely.
    REQUEST_TIMEOUT = (5, 30)
    # Sentence terminators: punctuation followed by a space or a closing quote.
    _SENTENCE_ENDS = ('. ', '! ', '? ', ".'", '!"', '?"')

    def __init__(self):
        self.api_key = GROQ_API_KEY
        self.model = GROQ_TTS_MODEL
        self.voice = GROQ_TTS_VOICE
        self.base_url = "https://api.groq.com/openai/v1"

    # ── internal: single chunk (≤200 chars) ────────────────────────
    def _tts_chunk(self, text):
        """Send a single ≤200-char chunk to Groq and return the WAV bytes.

        Raises:
            requests.exceptions.RequestException: on network failure, timeout
                or a non-2xx response (via ``raise_for_status``).
        """
        url = f"{self.base_url}/audio/speech"
        headers = {
            "Authorization": f"Bearer {self.api_key.strip()}",
            "Content-Type": "application/json",
        }
        payload = {
            "model": self.model,
            "input": text,
            "voice": self.voice,
            "response_format": "wav",
        }
        response = requests.post(
            url, json=payload, headers=headers, timeout=self.REQUEST_TIMEOUT
        )
        response.raise_for_status()
        return response.content

    # ── split long text into ≤200-char pieces at sentence boundaries ──
    @staticmethod
    def _split_text(text, limit=200):
        """Break *text* into chunks of at most *limit* characters.

        Each cut is made, in order of preference, after the last sentence
        terminator inside the window, at the last space, or as a hard cut at
        *limit*. Chunks are stripped and empty ones are dropped, so empty or
        whitespace-only input yields ``[]``.

        Raises:
            ValueError: if *limit* is smaller than 1 (the loop could never
                make progress).
        """
        if limit < 1:
            raise ValueError("limit must be a positive integer")

        remaining = (text or '').strip()
        pieces = []
        while len(remaining) > limit:
            window = remaining[:limit]
            # End offset of the right-most sentence terminator in the window.
            cut = max(
                (
                    window.rfind(end) + len(end)
                    for end in GroqTTS._SENTENCE_ENDS
                    if end in window
                ),
                default=-1,
            )
            if cut <= 0:
                cut = window.rfind(' ')  # fall back to the last space
            if cut <= 0:
                cut = limit  # hard cut
            pieces.append(remaining[:cut].strip())
            remaining = remaining[cut:].strip()
        if remaining:
            pieces.append(remaining)
        return [piece for piece in pieces if piece]

    # ── internal: WAV helpers ──────────────────────────────────────
    @staticmethod
    def _extract_pcm(wav_bytes):
        """Return ``((channels, sample_width, sample_rate), pcm)`` from a WAV.

        Walks the RIFF chunk list instead of assuming a canonical 44-byte
        header: unknown chunks are skipped, and the pad byte that follows an
        odd-sized chunk is honoured. A ``data`` size larger than the payload
        simply yields whatever bytes are present.

        Raises:
            ValueError: if the payload is not RIFF/WAVE or lacks a usable
                ``fmt `` or ``data`` chunk.
        """
        import struct

        if (
            len(wav_bytes) < 12
            or wav_bytes[:4] != b'RIFF'
            or wav_bytes[8:12] != b'WAVE'
        ):
            raise ValueError("response is not a RIFF/WAVE payload")

        params = None
        pcm = None
        pos = 12
        total = len(wav_bytes)
        while pos + 8 <= total:
            chunk_id, chunk_size = struct.unpack_from('<4sI', wav_bytes, pos)
            body_start = pos + 8
            body = wav_bytes[body_start:body_start + chunk_size]
            if chunk_id == b'fmt ':
                if len(body) < 16:
                    raise ValueError("truncated WAV fmt chunk")
                _tag, channels, sample_rate, _rate, _align, bits = (
                    struct.unpack_from('<HHIIHH', body)
                )
                params = (channels, bits // 8, sample_rate)
            elif chunk_id == b'data':
                pcm = body
                break
            # RIFF chunks are word-aligned: odd sizes carry one pad byte.
            pos = body_start + chunk_size + (chunk_size & 1)

        if params is None or pcm is None:
            raise ValueError("WAV payload is missing its fmt or data chunk")
        return params, pcm

    @staticmethod
    def _build_wav(params, pcm):
        """Wrap raw *pcm* bytes in a canonical 44-byte PCM WAV header.

        *params* is ``(channels, sample_width_bytes, sample_rate)``. The
        format tag is always written as 1 (integer PCM).
        """
        import struct

        channels, sample_width, sample_rate = params
        block_align = channels * sample_width
        data_size = len(pcm)
        header = struct.pack(
            '<4sI4s4sIHHIIHH4sI',
            b'RIFF', 36 + data_size, b'WAVE',
            b'fmt ', 16, 1, channels, sample_rate,
            sample_rate * block_align, block_align, sample_width * 8,
            b'data', data_size,
        )
        return header + pcm

    # ── public API (handles any length) ────────────────────────────
    def text_to_speech(self, text):
        """Convert *text* to speech using Groq Orpheus TTS.

        Text longer than ``MAX_INPUT_LEN`` is chunked and the resulting WAV
        payloads are concatenated into one WAV.

        Returns:
            WAV bytes, or ``None`` when *text* is empty or when the request
            or the audio processing fails (the error is printed).
        """
        chunks = self._split_text(text, self.MAX_INPUT_LEN)
        if not chunks:
            return None
        try:
            if len(chunks) == 1:
                # Single request: hand the API's WAV back untouched.
                return self._tts_chunk(chunks[0])

            # Multiple chunks: concatenate the raw PCM from each WAV.
            pcm_parts = []
            wav_params = None  # (channels, sample_width, sample_rate)
            for piece in chunks:
                params, pcm = self._extract_pcm(self._tts_chunk(piece))
                if wav_params is None:
                    wav_params = params
                elif params != wav_params:
                    # Splicing PCM of different formats would produce noise.
                    raise ValueError(
                        f"inconsistent WAV formats: {wav_params} vs {params}"
                    )
                pcm_parts.append(pcm)
            return self._build_wav(wav_params, b''.join(pcm_parts))
        except requests.exceptions.RequestException as e:
            print(f"Groq TTS API Error: {e}")
            return None
        except Exception as e:
            print(f"Groq TTS processing error: {e}")
            return None
# Backward-compatible alias: references to the old ElevenLabsTTS name keep
# working and resolve to the GroqTTS class.
ElevenLabsTTS = GroqTTS
class ConversationFlow:
    """Manages the conversation flow for returns and exchanges.

    Per-session state is kept in the in-memory ``self.state`` dict, keyed by
    session id. Each entry holds the current ``step`` and the ``data``
    collected so far (``request_type``, ``order_id``, ``reason``,
    ``exchange_preference`` and, once completed, ``tracking_number``).
    """

    # Phrases that signal an explicit return/exchange intent in the greeting.
    _INTENT_KEYWORDS = (
        'return', 'refund', 'money back', 'send back',
        'exchange', 'swap', 'replace', 'replacement', 'different product',
    )

    # Data that must already be collected before a step can be handled,
    # listed in the order the questions are asked.
    _REQUIRED_FIELDS = {
        'ask_order_id': ('request_type',),
        'ask_reason': ('request_type', 'order_id'),
        'ask_exchange_preference': ('request_type', 'order_id', 'reason'),
        'confirm_details': ('request_type', 'order_id', 'reason'),
        'identify_correction': ('request_type', 'order_id', 'reason'),
    }

    # Example answers for the steps that do not need a database lookup.
    _STATIC_VALID_ANSWERS = {
        'greeting': ['exchange', 'return', 'I want to exchange', 'I want to return'],
        'ask_exchange_or_return': ['exchange', 'return', 'I want to exchange', 'I want to return'],
        'ask_reason': ['wrong size', 'defective product', "I don't like it", 'color mismatch', 'poor quality'],
        'ask_exchange_preference': ['size large', 'different color - blue', 'size medium', 'black color'],
        'confirm_details': ['yes', "yes that's correct", 'no', 'change the order number', 'wrong reason'],
    }

    def __init__(self):
        self.tts = ElevenLabsTTS()
        self.state = {}

    # ── entry point ────────────────────────────────────────────────
    def process_user_input(self, session_id, user_text, current_step):
        """Process user input and generate the appropriate response.

        Args:
            session_id: key identifying the conversation in ``self.state``.
            user_text: what the user said; ``None`` is treated as ``''``.
            current_step: the step the client believes the dialog is at.

        Returns:
            dict with ``text`` (bot reply), ``next_step``, ``session_data``
            (the data collected so far) and ``valid_answers``.
        """
        session = self.state.setdefault(
            session_id, {'step': 'greeting', 'data': {}}
        )
        user_text = '' if user_text is None else str(user_text)

        missing = self._first_missing_field(current_step, session['data'])
        if missing is not None:
            # The client is ahead of the server-side state (e.g. the process
            # restarted). Re-ask for the missing piece instead of failing
            # with a KeyError further down.
            print(f"[flow] step '{current_step}' needs '{missing}' which was never collected; re-asking")
            response_text, next_step = self._question_for(missing, session['data'])
        else:
            handler = self._step_handlers().get(current_step)
            if handler is None:
                response_text = CONVERSATION_TEMPLATES['greeting']
                next_step = 'greeting'
            else:
                response_text, next_step = handler(session, user_text)

        # Update session
        session['step'] = next_step
        return {
            'text': response_text,
            'next_step': next_step,
            'session_data': session['data'],
            'valid_answers': self._get_valid_answers(next_step),
        }

    # ── step routing helpers ───────────────────────────────────────
    def _step_handlers(self):
        """Map each known step name to the method that handles it."""
        return {
            'greeting': self._handle_greeting,
            'ask_exchange_or_return': self._handle_exchange_or_return,
            'ask_order_id': self._handle_order_id,
            'ask_reason': self._handle_reason,
            'ask_exchange_preference': self._handle_exchange_preference,
            'confirm_details': self._handle_confirm_details,
            'identify_correction': self._handle_identify_correction,
        }

    @classmethod
    def _first_missing_field(cls, step, data):
        """Return the first field *step* requires that is absent from *data*.

        Returns ``None`` when nothing is missing or the step has no
        prerequisites.
        """
        for field in cls._REQUIRED_FIELDS.get(step, ()):
            if field not in data:
                return field
        return None

    def _question_for(self, field, data):
        """Return ``(question_text, step)`` that (re)collects *field*.

        Anything other than ``order_id``, ``reason`` or
        ``exchange_preference`` maps to the exchange-or-return question.
        """
        if field == 'order_id':
            return CONVERSATION_TEMPLATES['ask_order_id'], 'ask_order_id'
        if field == 'reason':
            question = CONVERSATION_TEMPLATES['ask_reason'].format(
                request_type=data['request_type']
            )
            return question, 'ask_reason'
        if field == 'exchange_preference':
            return (
                CONVERSATION_TEMPLATES['ask_exchange_preference'],
                'ask_exchange_preference',
            )
        return (
            CONVERSATION_TEMPLATES['ask_exchange_or_return'],
            'ask_exchange_or_return',
        )

    # ── per-step handlers: each returns (response_text, next_step) ──
    def _handle_greeting(self, session, user_text):
        """Classify the opening utterance, or ask explicitly if unclear."""
        # Only short-circuit when the user clearly says exchange/return.
        # Without a keyword match we ask explicitly — this prevents
        # inputs like "I would be 23" from being misclassified.
        user_lower = user_text.strip().lower()
        if not any(kw in user_lower for kw in self._INTENT_KEYWORDS):
            print(f"[flow] greeting -> no intent keyword in '{user_text}', asking explicitly")
            return self._question_for('request_type', session['data'])

        question_asked = CONVERSATION_TEMPLATES['ask_exchange_or_return']
        request_type = self._extract_request_type(user_text, question_asked)
        print(f"[flow] greeting -> keyword found -> classified: {request_type}")
        if request_type not in ('exchange', 'return'):
            # Never store an unusable classification; ask the question instead.
            print(f"[flow] greeting -> unusable classification '{request_type}', asking explicitly")
            return self._question_for('request_type', session['data'])
        session['data']['request_type'] = request_type
        return CONVERSATION_TEMPLATES['ask_order_id'], 'ask_order_id'

    def _handle_exchange_or_return(self, session, user_text):
        """Record the request type; falls back to 'exchange' if unrecognised."""
        question_asked = CONVERSATION_TEMPLATES['ask_exchange_or_return']
        print(f"\n{'='*70}")
        print(f"🎤 PROCESSING STEP: ask_exchange_or_return")
        print(f"📝 User input: '{user_text}'")
        print(f"❓ Question asked: '{question_asked}'")
        print(f"{'='*70}")
        request_type = self._extract_request_type(user_text, question_asked)
        print(f"\n{'='*70}")
        print(f"🔍 EXTRACTION RESULT: '{request_type}'")
        print(f"🔍 Result type: {type(request_type)}")
        print(f"🔍 Result is truthy: {bool(request_type)}")
        print(f"{'='*70}\n")
        # request_type should NEVER be None now, but just in case
        if request_type in ('exchange', 'return'):
            session['data']['request_type'] = request_type
            print(f"✅ SUCCESS: Request type set to: {request_type}")
        else:
            print(f"❌ UNEXPECTED: Got invalid request_type: {request_type}")
            print(f"⚠️ This should not happen - defaulting to exchange")
            session['data']['request_type'] = 'exchange'
        return CONVERSATION_TEMPLATES['ask_order_id'], 'ask_order_id'

    def _handle_order_id(self, session, user_text):
        """Extract the order id, re-asking when none can be found."""
        question_asked = CONVERSATION_TEMPLATES['ask_order_id']
        print(f"\n🎤 PROCESSING STEP: ask_order_id")
        print(f"📝 User input: '{user_text}'")
        order_id = self._extract_order_id(user_text, question_asked)
        print(f"🔍 Extraction result: {order_id}")
        if not order_id:
            print(f"⚠️ Could not extract order ID from: '{user_text}'")
            response_text = "I couldn't find an order number. Please say something like 'order 123' or 'my order number is 456'."
            return response_text, 'ask_order_id'
        session['data']['order_id'] = order_id
        print(f"✅ Order ID set to: {order_id}")
        return self._question_for('reason', session['data'])

    def _handle_reason(self, session, user_text):
        """Store the reason as cleaned up by Gemini and pick the next question."""
        request_type = session['data']['request_type']
        question_asked = CONVERSATION_TEMPLATES['ask_reason'].format(request_type=request_type)
        cleaned_reason = gemini_ai.extract_reason(user_text, request_type, question_asked)
        session['data']['reason'] = cleaned_reason
        print(f"🤖 Gemini cleaned reason: '{user_text}' -> '{cleaned_reason}'")
        if request_type == 'exchange':
            return self._question_for('exchange_preference', session['data'])
        return self._generate_confirmation(session['data']), 'confirm_details'

    def _handle_exchange_preference(self, session, user_text):
        """Store what the user wants instead and move on to confirmation."""
        question_asked = CONVERSATION_TEMPLATES['ask_exchange_preference']
        session['data']['exchange_preference'] = gemini_ai.extract_exchange_preference(user_text, question_asked)
        print(f"🤖 Gemini cleaned preference: '{user_text}' -> '{session['data']['exchange_preference']}'")
        return self._generate_confirmation(session['data']), 'confirm_details'

    def _handle_confirm_details(self, session, user_text):
        """Process the request on 'yes'; otherwise ask what needs correcting."""
        confirmation_question = self._generate_confirmation(session['data'])
        if not self._is_confirmation(user_text, confirmation_question):
            # User said no - ask what was wrong
            return CONVERSATION_TEMPLATES['ask_what_wrong'], 'identify_correction'

        result = self._process_request(session['data'])
        # Expose the tracking number through session data so callers that
        # read it from there (the voice session log) actually receive it.
        session['data']['tracking_number'] = result['tracking_number']
        # Save to database
        if DATABASE_INTEGRATION:
            self._save_to_database(session['data'], result)
        return result['message'], 'completed'

    def _handle_identify_correction(self, session, user_text):
        """Route back to the question for the field the user wants changed."""
        field_to_correct = gemini_ai.identify_correction_field(user_text, session['data'])
        print(f"🔧 User wants to correct: {field_to_correct}")
        known_fields = ('request_type', 'order_id', 'reason', 'exchange_preference')
        if field_to_correct not in known_fields:
            # "everything" or unrecognized: start over
            session['data'] = {}
        return self._question_for(field_to_correct, session['data'])

    # ── Gemini wrappers ────────────────────────────────────────────
    def _extract_request_type(self, text, question_asked):
        """Extract whether user wants exchange or return using Gemini AI with question context"""
        result = gemini_ai.extract_request_type(text, question_asked)
        if result:
            print(f"🤖 Gemini understood '{text}' as: {result}")
        return result

    def _extract_order_id(self, text, question_asked):
        """Extract order ID from user input using Gemini AI with question context"""
        result = gemini_ai.extract_order_id(text, question_asked)
        if result:
            print(f"🤖 Gemini extracted order ID from '{text}': {result}")
        return result

    def _is_confirmation(self, text, question_asked):
        """Check if user is confirming using Gemini AI with question context"""
        result = gemini_ai.is_confirmation(text, question_asked)
        print(f"🤖 Gemini understood '{text}' as confirmation: {result}")
        return result

    def _get_valid_answers(self, step):
        """Return the valid / example answers for a given step.

        For ``ask_order_id`` the 20 most recent order ids are read from the
        database; on any lookup error two placeholder examples are returned.
        Unknown steps yield ``[]``.
        """
        if step == 'ask_order_id':
            try:
                Order = apps.get_model('store', 'Order')
                order_ids = list(
                    Order.objects.order_by('-id').values_list('id', flat=True)[:20]
                )
                return [str(oid) for oid in order_ids] if order_ids else ['No orders found']
            except Exception as e:
                print(f'[valid_answers] DB error: {e}')
                return ['order 123', '12345']
        # Copy so callers cannot mutate the class-level constant.
        return list(self._STATIC_VALID_ANSWERS.get(step, []))

    def _generate_confirmation(self, data):
        """Build the confirmation question from the collected *data*.

        Requires ``request_type``, ``order_id`` and ``reason`` in *data*.
        """
        request_type = data['request_type']
        extra_info = ""
        if request_type == 'exchange':
            extra_info = f"You want {data.get('exchange_preference', 'a different product')} instead. "
        return CONVERSATION_TEMPLATES['confirm_details'].format(
            request_type=request_type,
            order_id=data['order_id'],
            reason=data['reason'],
            extra_info=extra_info,
        )

    def _process_request(self, data):
        """Dispatch to the exchange or return processor based on request type."""
        if data['request_type'] == 'exchange':
            return self._process_exchange(data)
        return self._process_return(data)

    def _process_exchange(self, data):
        """Schedule an exchange: pickup in 2 days, delivery per configuration.

        Returns a dict with ``message``, ``tracking_number``, ``pickup_date``
        and ``delivery_date``.
        """
        now = datetime.now()  # one timestamp so all derived values agree
        tracking_number = f"EXG-{data['order_id']}-{now.strftime('%Y%m%d%H%M%S')}"
        pickup_date = (now + timedelta(days=2)).strftime('%B %d, %Y')
        delivery_date = (now + timedelta(days=EXCHANGE_DELIVERY_DAYS)).strftime('%B %d, %Y')
        message = " ".join([
            CONVERSATION_TEMPLATES['processing'].format(request_type='exchange'),
            CONVERSATION_TEMPLATES['exchange_pickup_scheduled'].format(pickup_date=pickup_date),
            CONVERSATION_TEMPLATES['exchange_delivery_scheduled'].format(delivery_date=delivery_date),
            CONVERSATION_TEMPLATES['success'].format(
                request_type='exchange',
                tracking_number=tracking_number,
            ),
        ])
        return {
            'message': message,
            'tracking_number': tracking_number,
            'pickup_date': pickup_date,
            'delivery_date': delivery_date,
        }

    def _process_return(self, data):
        """Schedule a return with pickup in 2 days.

        Returns a dict with ``message``, ``tracking_number`` and
        ``pickup_date``.
        """
        now = datetime.now()
        tracking_number = f"RET-{data['order_id']}-{now.strftime('%Y%m%d%H%M%S')}"
        pickup_date = (now + timedelta(days=2)).strftime('%B %d, %Y')
        message = " ".join([
            CONVERSATION_TEMPLATES['processing'].format(request_type='return'),
            CONVERSATION_TEMPLATES['return_scheduled'].format(pickup_date=pickup_date),
            CONVERSATION_TEMPLATES['success'].format(
                request_type='return',
                tracking_number=tracking_number,
            ),
        ])
        return {
            'message': message,
            'tracking_number': tracking_number,
            'pickup_date': pickup_date,
        }

    @staticmethod
    def _get_order_models():
        """Resolve ``(Order, OrderDetail)`` from the app registry.

        NOTE(review): this method historically used the 'shop' app label
        while the other lookups in this module use 'store'. 'shop' is tried
        first to preserve existing behaviour, with 'store' as a fallback.
        Confirm which label is correct and drop the other.

        Raises:
            LookupError: if neither app provides both models.
        """
        last_error = None
        for app_label in ('shop', 'store'):
            try:
                return (
                    apps.get_model(app_label, 'Order'),
                    apps.get_model(app_label, 'OrderDetail'),
                )
            except LookupError as exc:
                last_error = exc
        raise last_error

    def _save_to_database(self, data, result):
        """Mark every detail row of the order as returned/exchanged.

        Best effort: failures are printed and never propagated, so a database
        problem does not break the conversation.
        """
        try:
            Order, OrderDetail = self._get_order_models()
            order_id = data.get('order_id')
            try:
                order = Order.objects.get(id=order_id)
            except Order.DoesNotExist:
                print(f"❌ Order {order_id} not found in database")
                return

            is_exchange = data['request_type'] == 'exchange'
            # Update all order details
            for detail in OrderDetail.objects.filter(order=order):
                detail.return_reason = data.get('reason', '')
                detail.return_status = 'Returned'
                # Timezone-aware timestamp, matching the rest of the module.
                detail.return_date = timezone.now()
                detail.days_to_return = DEFAULT_RETURN_DAYS
                if is_exchange:
                    detail.is_exchanged = True
                    detail.exchange_order = data.get('exchange_preference', '')
                detail.save()
            print(f"✅ Database updated for Order {order_id}")
            print(f" Reason: {data.get('reason')}")
            print(f" Type: {data['request_type']}")
            print(f" Tracking: {result.get('tracking_number')}")
        except Exception as e:
            print(f"❌ Database error: {e}")
# Global conversation flow instance shared by the views in this module.
# Its per-session state lives in an in-memory dict on this instance.
conversation_flow = ConversationFlow()
# Django Views
@csrf_exempt
@require_http_methods(["POST"])
def start_conversation(request):
    """Start a new conversation session.

    Generates a fresh session id, records a ``VoiceSession`` row on a
    best-effort basis (a database failure is printed, not raised) and returns
    the greeting as JSON. No audio is produced here.
    """
    import uuid

    session_id = str(uuid.uuid4())
    greeting_text = CONVERSATION_TEMPLATES['greeting']

    # Create DB session record
    try:
        VoiceSession, _ = _get_voice_models()
        VoiceSession.objects.create(session_id=session_id)
    except Exception as e:
        print(f'[voice] DB session create error: {e}')

    return JsonResponse({
        'session_id': session_id,
        'message': greeting_text,
        'next_step': 'greeting',
        'gemini_enabled': gemini_ai.enabled,
        'valid_answers': ['exchange', 'return', 'I want to exchange', 'I want to return'],
    })
@csrf_exempt
@require_http_methods(["POST"])
def process_voice(request):
    """Process one user turn and return the bot's reply as JSON.

    Expects a JSON object with ``session_id``, ``text`` and ``current_step``.
    The turn is run through ``conversation_flow`` and logged to the
    ``VoiceSession`` / ``VoiceMessage`` tables on a best-effort basis.

    Responses:
        200 with ``text``, ``next_step``, ``session_data``, ``valid_answers``;
        400 when the body is not a JSON object;
        500 with ``error`` on any other failure.
    """
    # A malformed body is the client's fault: answer 400, not 500.
    try:
        data = json.loads(request.body)
    except ValueError as e:
        return JsonResponse({'error': f'Invalid JSON body: {e}'}, status=400)
    if not isinstance(data, dict):
        return JsonResponse({'error': 'Request body must be a JSON object'}, status=400)

    # The session_data key each step is expected to fill in.
    extracted_key_by_step = {
        'greeting': 'request_type',
        'ask_exchange_or_return': 'request_type',
        'ask_order_id': 'order_id',
        'ask_reason': 'reason',
        'ask_exchange_preference': 'exchange_preference',
        'confirm_details': 'confirmed',
    }

    try:
        session_id = data.get('session_id')
        # A missing or null "text" is treated as an empty utterance.
        user_text = data.get('text') or ''
        current_step = data.get('current_step')

        # Log what goes into the LLM (and the current state)
        print(f"[voice] LLM_INPUT step={current_step} user='{user_text}'")
        result = conversation_flow.process_user_input(session_id, user_text, current_step)
        print(
            f"[voice] session={session_id} step={current_step} -> next={result['next_step']} "
            f"user='{user_text}' response='{result['text'][:200]}'"
        )

        # Log LLM outputs / parsed session data for debugging intent extraction
        try:
            print(f"[voice] LLM_OUTPUT data={json.dumps(result.get('session_data', {}))}")
        except (TypeError, ValueError):
            # Not JSON-serialisable: fall back to the plain repr.
            print(f"[voice] LLM_OUTPUT data={result.get('session_data')}")

        # ── Persist turn to database ──────────────────────────────────
        try:
            VoiceSession, VoiceMessage = _get_voice_models()
            vs = VoiceSession.objects.filter(session_id=session_id).first()
            if vs:
                sd = result.get('session_data', {})
                turn_num = vs.messages.count() + 1
                VoiceMessage.objects.create(
                    session=vs,
                    turn_number=turn_num,
                    step=current_step,
                    next_step=result['next_step'],
                    user_text=user_text,
                    bot_text=result['text'],
                    extracted_value=str(
                        sd.get(extracted_key_by_step.get(current_step, ''), '')
                    ),
                    session_data_snapshot=sd,
                )
                # Update session-level fields from collected data
                vs.request_type = sd.get('request_type', vs.request_type)
                vs.order_id = sd.get('order_id', vs.order_id)
                vs.reason = sd.get('reason', vs.reason)
                vs.exchange_preference = sd.get('exchange_preference', vs.exchange_preference)
                vs.session_data_json = sd
                if result['next_step'] == 'completed':
                    vs.status = 'completed'
                    vs.ended_at = timezone.now()
                    vs.tracking_number = sd.get('tracking_number', '')
                vs.save()
        except Exception as e:
            # Logging is best effort; the reply is still returned.
            print(f'[voice] DB log error: {e}')
        # ──────────────────────────────────────────────────────────────

        # Return JSON – TTS is handled in the browser via speechSynthesis
        return JsonResponse({
            'text': result['text'],
            'next_step': result['next_step'],
            'session_data': result['session_data'],
            'valid_answers': result.get('valid_answers', []),
        })
    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)
@csrf_exempt
@require_http_methods(["POST"])
def get_speech(request):
    """Echo the posted text as JSON – the browser handles TTS via speechSynthesis.

    Responses:
        200 with ``text`` (``''`` when the key is absent);
        400 when the body is not a JSON object;
        500 with ``error`` on any other failure.
    """
    try:
        try:
            data = json.loads(request.body)
        except ValueError as e:
            # Malformed JSON is a client error, not a server fault.
            return JsonResponse({'error': f'Invalid JSON body: {e}'}, status=400)
        if not isinstance(data, dict):
            return JsonResponse({'error': 'Request body must be a JSON object'}, status=400)
        return JsonResponse({'text': data.get('text', '')})
    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)