# vBot-2.3 / app.py — Hugging Face Space (author: iajitpanday, commit 6258eca, verified)
# app.py
import os
import uuid
import json
import time
import gradio as gr
import numpy as np
import torch
import whisper
import mysql.connector
from mysql.connector import pooling
from transformers import AutoTokenizer, AutoModelForCausalLM
from pydub import AudioSegment
import tempfile
import hashlib
import datetime
import secrets
import traceback
# Initialize models (lightweight versions for Spaces)
ASR_MODEL = "base"  # smaller Whisper checkpoint; loaded below by whisper.load_model
NLU_MODEL = "facebook/blenderbot-400M-distill"  # smaller conversation model

# Database configuration.
# SECURITY FIX: credentials were hard-coded; read them from the environment.
# The literal fallbacks keep the existing deployment working unchanged, but
# they should be removed once the env vars are configured in the Space.
DB_CONFIG = {
    "host": os.environ.get("DB_HOST", "hopper.proxy.rlwy.net"),
    "port": int(os.environ.get("DB_PORT", "16751")),
    "user": os.environ.get("DB_USER", "root"),
    "password": os.environ.get("DB_PASSWORD", "svLvVDyJwyvWsAxTAEkrMPqkzLBkLMrD"),
    "database": os.environ.get("DB_NAME", "railway"),
    "pool_name": "voicebot_pool",
    "pool_size": 5,
}
# Create connection pool.  On any failure, fall back to a process-local
# in-memory store so the demo keeps working without a database.
cnx_pool = None  # FIX: explicit sentinel so the name always exists
try:
    print(f"Attempting to connect to MySQL at {DB_CONFIG['host']}:{DB_CONFIG['port']}...")
    # `pooling` is already imported at the top of the file; use it directly.
    cnx_pool = pooling.MySQLConnectionPool(**DB_CONFIG)
    print("Database connection pool created successfully")
    # Smoke-test the pool by borrowing one connection.
    test_conn = cnx_pool.get_connection()
    if test_conn.is_connected():
        print(f"Successfully connected to {DB_CONFIG['database']} database")
    # FIX: always return the test connection to the pool (the original
    # leaked it when is_connected() was false).
    test_conn.close()
except Exception as e:
    print(f"Error creating database pool: {e}")
    cnx_pool = None  # DB helpers will fail fast and use the fallback below
    # Use in-memory dictionary as fallback
    print("Using in-memory storage as fallback")
    in_memory_db = {"clients": {}, "conversations": {}}
# Initialize models
# NOTE: both loads happen at import time, so the Space only starts serving
# once the models are in memory; cold starts can take a while.
print("Loading ASR model...")
asr_model = whisper.load_model(ASR_MODEL)  # Whisper "base" checkpoint
print("ASR model loaded")
print("Loading NLU model...")
tokenizer = AutoTokenizer.from_pretrained(NLU_MODEL)
nlu_model = AutoModelForCausalLM.from_pretrained(NLU_MODEL)
print("NLU model loaded")
# Database schema initialization
def initialize_database():
    """Create the `clients` and `conversations` tables if they are missing.

    Any failure is logged and swallowed so the app can still start and use
    the in-memory fallback store.
    """
    conn = None
    cursor = None
    try:
        conn = cnx_pool.get_connection()
        cursor = conn.cursor()
        # Schema is created lazily at startup; IF NOT EXISTS keeps reruns safe.
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS clients (
        id INT AUTO_INCREMENT PRIMARY KEY,
        name VARCHAR(255) NOT NULL,
        email VARCHAR(255) NOT NULL UNIQUE,
        phone VARCHAR(50),
        api_key VARCHAR(64) NOT NULL UNIQUE,
        pbx_type ENUM('Asterisk', 'FreeSwitch', '3CX', 'Nextiva', 'Other'),
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """)
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS conversations (
        id INT AUTO_INCREMENT PRIMARY KEY,
        client_id INT,
        caller_id VARCHAR(50),
        start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        end_time TIMESTAMP NULL,
        transcript TEXT,
        FOREIGN KEY (client_id) REFERENCES clients(id)
        )
        """)
        conn.commit()
        print("Database initialized successfully")
    except Exception as e:
        print(f"Error initializing database: {e}")
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None and conn.is_connected():
            conn.close()


# Build the schema once at import time.
initialize_database()
# API Key Management
def generate_api_key():
    """Return a fresh 64-character hex API key derived from 32 random bytes."""
    random_material = secrets.token_bytes(32)
    return hashlib.sha256(random_material).hexdigest()
def create_client(name, email, phone, pbx_type):
    """Insert a new client row and return its freshly generated API key.

    Falls back to the in-memory store when the database is unavailable.
    Returns {"success": True, "api_key": ...} or {"success": False, "error": ...}.
    """
    api_key = generate_api_key()
    try:
        conn = cnx_pool.get_connection()
        cursor = conn.cursor()
        insert_sql = """
        INSERT INTO clients (name, email, phone, api_key, pbx_type)
        VALUES (%s, %s, %s, %s, %s)
        """
        cursor.execute(insert_sql, (name, email, phone, api_key, pbx_type))
        conn.commit()
        return {"success": True, "api_key": api_key}
    except Exception as e:
        print(f"Error creating client: {e}")
        # Fallback to in-memory storage (only exists when the pool failed).
        if 'in_memory_db' in globals():
            in_memory_db["clients"][str(uuid.uuid4())] = {
                "name": name,
                "email": email,
                "phone": phone,
                "api_key": api_key,
                "pbx_type": pbx_type,
                "created_at": datetime.datetime.now().isoformat(),
            }
            return {"success": True, "api_key": api_key}
        return {"success": False, "error": str(e)}
    finally:
        if 'cursor' in locals():
            cursor.close()
        if 'conn' in locals() and conn.is_connected():
            conn.close()
def validate_api_key(api_key):
    """Look up a client by API key; return its row dict, or None if invalid."""
    if not api_key:
        return None
    try:
        conn = cnx_pool.get_connection()
        cursor = conn.cursor(dictionary=True)
        cursor.execute("SELECT * FROM clients WHERE api_key = %s", (api_key,))
        return cursor.fetchone()
    except Exception as e:
        print(f"Error validating API key: {e}")
        # Fallback: linear scan of the in-memory client table.
        if 'in_memory_db' in globals():
            for record in in_memory_db["clients"].values():
                if record["api_key"] == api_key:
                    return record
        return None
    finally:
        if 'cursor' in locals():
            cursor.close()
        if 'conn' in locals() and conn.is_connected():
            conn.close()
# Update the transcribe_audio function to fix the numpy array boolean ambiguity error
def transcribe_audio(audio, sample_rate=None):
    """Transcribe audio using Whisper.

    Accepts either a (samples, sample_rate) tuple (Gradio's "numpy" audio
    format) or a bare array/list of samples.  Returns the transcribed text,
    or an "Error: ..." string describing what went wrong.
    """
    try:
        if audio is None:
            print("Error: Audio input is None")
            return "Error: No audio data received"
        print(f"Audio input type: {type(audio)}")
        # Gradio delivers (data, rate) tuples; unpack them.
        if isinstance(audio, tuple) and len(audio) == 2:
            print("Audio is a tuple, extracting array and sample rate")
            audio_array, sample_rate = audio
        else:
            print("Audio is not a tuple")
            audio_array = audio
        if sample_rate is None:
            sample_rate = 16000  # common default when the caller gives none
            print(f"Using default sample rate: {sample_rate}")
        # Guard against invalid input.
        if audio_array is None:
            print("Empty audio data received")
            return "Error: No audio data received"
        if isinstance(audio_array, (list, np.ndarray)) and len(audio_array) == 0:
            print("Empty audio array received")
            return "Error: No audio data received"
        if not isinstance(audio_array, np.ndarray):
            print("Converting to numpy array")
            audio_array = np.array(audio_array, dtype=np.float32)
        print(f"Audio array shape: {audio_array.shape}")
        print(f"Audio array dtype: {audio_array.dtype}")
        # Downmix stereo (n, channels) to mono since we export channels=1.
        if audio_array.ndim == 2:
            audio_array = audio_array.mean(axis=1)
        # FIX: pydub's sample_width means *integer* PCM width.  The old code
        # normalized into float64 (itemsize 8) and handed that buffer to
        # AudioSegment, which is invalid; float32 input was misread as int32.
        # Convert everything to int16 PCM explicitly.
        if audio_array.dtype != np.int16:
            float_audio = audio_array.astype(np.float32)
            peak = float(np.max(np.abs(float_audio)))
            if peak > 1.0:
                print(f"Normalizing audio values from max {peak} to [-1.0, 1.0] range")
                float_audio = float_audio / peak
            audio_array = (float_audio * 32767.0).astype(np.int16)
        # Create a temp WAV path (file handle is closed immediately).
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_file:
            filename = temp_file.name
        print(f"Created temp file: {filename}")  # FIX: was a literal placeholder
        try:
            # Convert and save audio.
            try:
                print(f"Creating AudioSegment with sample rate {sample_rate}")
                audio_segment = AudioSegment(
                    audio_array.tobytes(),
                    frame_rate=int(sample_rate),
                    sample_width=audio_array.dtype.itemsize,  # always 2 (int16) now
                    channels=1
                )
                print("AudioSegment created, exporting to WAV")
                audio_segment.export(filename, format="wav")
                print("WAV file created successfully")
            except Exception as audio_e:
                print(f"Error in audio conversion: {audio_e}")
                return f"Error in audio conversion: {str(audio_e)}"
            # Transcribe with Whisper.
            try:
                print("Starting transcription with Whisper")
                result = asr_model.transcribe(filename)
                print("Transcription completed")
                transcribed_text = result["text"].strip()
                print(f"Transcribed text: {transcribed_text}")
                if not transcribed_text:
                    return "I couldn't hear anything. Please try speaking again."
                return transcribed_text
            except Exception as whisper_e:
                print(f"Error in Whisper transcription: {whisper_e}")
                return f"Error in transcription: {str(whisper_e)}"
        finally:
            # FIX: cleanup now also runs when the audio conversion fails,
            # so the temp file is never leaked.
            try:
                os.unlink(filename)
                print(f"Deleted temp file: {filename}")
            except Exception as e:
                print(f"Warning: Could not delete temp file {filename}: {e}")
    except Exception as e:
        print(f"Error transcribing audio: {e}")
        traceback.print_exc()
        return f"Error processing audio: {str(e)}"
def generate_response(text):
    """Generate a chat reply for `text` with the Blenderbot NLU model.

    Upstream transcription failures (empty text or "Error:"-prefixed
    strings) short-circuit to a canned apology instead of hitting the model.
    """
    try:
        if not text or text.startswith("Error:"):
            return "I'm sorry, I couldn't understand what you said. Could you please try again?"
        encoded = tokenizer(text, return_tensors="pt")
        # Sampling parameters tuned for short, varied conversational replies.
        with torch.no_grad():
            generated = nlu_model.generate(
                encoded["input_ids"],
                max_length=100,
                num_return_sequences=1,
                temperature=0.7,
                top_k=50,
                top_p=0.95,
                pad_token_id=tokenizer.eos_token_id
            )
        return tokenizer.decode(generated[0], skip_special_tokens=True)
    except Exception as e:
        print(f"Error generating response: {e}")
        traceback.print_exc()
        return "I'm sorry, I encountered an error processing your request."
def log_conversation(client_id, caller_id, transcript):
    """Persist one conversation turn; return True on DB success, else False.

    On database failure the turn is still kept in the in-memory fallback
    store (when that store exists), but False is returned either way.
    """
    conn = None
    cursor = None
    try:
        conn = cnx_pool.get_connection()
        cursor = conn.cursor()
        insert_sql = """
        INSERT INTO conversations (client_id, caller_id, transcript)
        VALUES (%s, %s, %s)
        """
        # Transcript dicts are stored as JSON text in the TEXT column.
        cursor.execute(insert_sql, (client_id, caller_id, json.dumps(transcript)))
        conn.commit()
        return True
    except Exception as e:
        print(f"Error logging conversation: {e}")
        # Fallback to in-memory storage
        if 'in_memory_db' in globals():
            in_memory_db["conversations"][str(uuid.uuid4())] = {
                "client_id": client_id,
                "caller_id": caller_id,
                "start_time": datetime.datetime.now().isoformat(),
                "transcript": transcript,
            }
        return False
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None and conn.is_connected():
            conn.close()
def process_voice_interaction(audio, api_key, caller_id="unknown"):
    """Run one full voice turn: auth -> ASR -> NLU -> logging.

    Returns {"success": True, "transcription": ..., "response": ...} on
    success, otherwise {"error": ...}.
    """
    # Reject the request before doing any model work.
    client = validate_api_key(api_key)
    if not client:
        return {"error": "Invalid API key"}
    if audio is None:
        return {"error": "No audio data received"}
    try:
        print(f"Received audio data type: {type(audio)}")
        transcription = transcribe_audio(audio)
        print(f"Transcription: {transcription}")
        response_text = generate_response(transcription)
        # DB clients have a numeric id; in-memory ones fall back to the
        # API key as their identifier.
        turn = {
            "timestamp": time.time(),
            "caller_id": caller_id,
            "user_input": transcription,
            "bot_response": response_text,
        }
        log_conversation(client.get("id", api_key), caller_id, turn)
        return {
            "success": True,
            "transcription": transcription,
            "response": response_text,
        }
    except Exception as e:
        print(f"Error processing voice interaction: {e}")
        traceback.print_exc()
        return {"error": str(e)}
# Admin functions
def admin_create_client(name, email, phone, pbx_type):
    """Validate the admin form input, then create a client record."""
    if not name or not email:
        return {"error": "Name and email are required"}
    result = create_client(name, email, phone, pbx_type)
    if not result["success"]:
        return {"error": result.get("error", "Unknown error")}
    return {"success": True, "message": f"Client created with API key: {result['api_key']}"}
def admin_get_clients():
    """Return all client rows (without API keys) for the admin dashboard."""
    conn = None
    cursor = None
    try:
        conn = cnx_pool.get_connection()
        cursor = conn.cursor(dictionary=True)
        cursor.execute("SELECT id, name, email, phone, pbx_type, created_at FROM clients")
        clients = cursor.fetchall()
        # datetimes are not JSON-serializable; stringify them for the UI.
        for row in clients:
            if isinstance(row["created_at"], datetime.datetime):
                row["created_at"] = row["created_at"].isoformat()
        return {"success": True, "clients": clients}
    except Exception as e:
        print(f"Error getting clients: {e}")
        # Fallback to in-memory
        if 'in_memory_db' in globals():
            return {"success": True, "clients": list(in_memory_db["clients"].values())}
        return {"error": str(e)}
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None and conn.is_connected():
            conn.close()
def admin_get_conversations():
    """Return the 100 most recent conversations, newest first."""
    conn = None
    cursor = None
    try:
        conn = cnx_pool.get_connection()
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
        SELECT c.id, cl.name as client_name, c.caller_id, c.start_time, c.end_time, c.transcript
        FROM conversations c
        JOIN clients cl ON c.client_id = cl.id
        ORDER BY c.start_time DESC
        LIMIT 100
        """)
        conversations = cursor.fetchall()
        # Stringify datetimes and decode stored JSON transcripts for the UI.
        for row in conversations:
            if isinstance(row["start_time"], datetime.datetime):
                row["start_time"] = row["start_time"].isoformat()
            if isinstance(row["end_time"], datetime.datetime):
                row["end_time"] = row["end_time"].isoformat()
            if row["transcript"]:
                try:
                    row["transcript"] = json.loads(row["transcript"])
                except json.JSONDecodeError:
                    pass  # keep the raw string when it is not valid JSON
        return {"success": True, "conversations": conversations}
    except Exception as e:
        print(f"Error getting conversations: {e}")
        # Fallback to in-memory
        if 'in_memory_db' in globals():
            return {"success": True, "conversations": list(in_memory_db["conversations"].values())}
        return {"error": str(e)}
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None and conn.is_connected():
            conn.close()
# Debug function
def debug_audio(audio):
    """Report the type/shape/range of a Gradio audio payload for debugging."""
    try:
        if audio is None:
            return {"error": "No audio provided"}
        info = {
            "type": type(audio).__name__,
            "is_tuple": isinstance(audio, tuple),
            "length": len(audio) if hasattr(audio, "__len__") else "N/A",
        }
        # 2-tuples are treated as (samples, sample_rate) pairs.
        if isinstance(audio, tuple) and len(audio) == 2:
            samples, rate = audio
            info["data_type"] = type(samples).__name__
            info["sample_rate"] = rate
            if hasattr(samples, "shape"):
                info["shape"] = samples.shape
                info["dtype"] = str(samples.dtype)
                info["min_val"] = float(samples.min())
                info["max_val"] = float(samples.max())
        return {"debug_info": info}
    except Exception as e:
        traceback.print_exc()
        return {"error": str(e)}
def build_gradio_interface():
    """Assemble the three-tab Gradio UI (admin, test, debug).

    Component creation order inside each `gr.Blocks` context determines the
    rendered layout, so the structure below is order-sensitive.
    """
    # Admin section
    with gr.Blocks() as admin_interface:
        gr.Markdown("# Voice Bot Admin Dashboard")
        with gr.Tab("Create Client"):
            with gr.Row():
                client_name = gr.Textbox(label="Client Name")
                client_email = gr.Textbox(label="Email")
            with gr.Row():
                client_phone = gr.Textbox(label="Phone Number")
                client_pbx = gr.Dropdown(label="PBX Type", choices=["Asterisk", "FreeSwitch", "3CX", "Nextiva", "Other"])
            create_btn = gr.Button("Create Client")
            create_output = gr.JSON(label="Result")
            create_btn.click(
                admin_create_client,
                inputs=[client_name, client_email, client_phone, client_pbx],
                outputs=create_output
            )
        with gr.Tab("View Clients"):
            refresh_clients_btn = gr.Button("Refresh Client List")
            clients_output = gr.JSON(label="Clients")
            refresh_clients_btn.click(
                admin_get_clients,
                inputs=[],
                outputs=clients_output
            )
        with gr.Tab("View Conversations"):
            refresh_convs_btn = gr.Button("Refresh Conversations")
            convs_output = gr.JSON(label="Recent Conversations")
            refresh_convs_btn.click(
                admin_get_conversations,
                inputs=[],
                outputs=convs_output
            )
    # Test interface for voice bot API
    with gr.Blocks() as test_interface:
        gr.Markdown("# Voice Bot Test Interface")
        with gr.Row():
            api_key_input = gr.Textbox(label="API Key")
            caller_id_input = gr.Textbox(label="Caller ID (optional)", value="test_caller")
        # Conversation history display
        conversation_display = gr.Markdown("*Conversation will appear here*")
        # Real-time audio input - compatible with older Gradio versions
        audio_input = gr.Audio(
            label="Speak",
            type="numpy"
        )
        # State to store conversation history (list of {"user", "bot"} turns)
        conversation_state = gr.State([])

        # Function to process audio and update conversation.
        # Returns (markdown_for_display, updated_history).
        def process_and_update(audio, api_key, caller_id, conversation_history):
            if not api_key:
                return "**Error:** API key is required.", conversation_history
            if audio is None:
                return "*Conversation will appear here*", conversation_history
            # Process the audio
            result = process_voice_interaction(audio, api_key, caller_id)
            # Update conversation history
            if "transcription" in result and "response" in result:
                # Add new conversation turn
                conversation_history.append({
                    "user": result["transcription"],
                    "bot": result["response"]
                })
                # Format the conversation as markdown
                markdown = "## Conversation\n\n"
                for turn in conversation_history:
                    markdown += f"**You:** {turn['user']}\n\n"
                    markdown += f"**Bot:** {turn['bot']}\n\n"
                return markdown, conversation_history
            else:
                # If there was an error
                error_msg = result.get("error", "Unknown error")
                return f"**Error:** {error_msg}", conversation_history

        # Submit button for audio processing
        submit_btn = gr.Button("Process Audio")
        # Event handler for submit button
        submit_btn.click(
            process_and_update,
            inputs=[audio_input, api_key_input, caller_id_input, conversation_state],
            outputs=[conversation_display, conversation_state]
        )
        # Clear conversation button
        clear_btn = gr.Button("Clear Conversation")

        # Resets both the display and the stored history.
        def clear_conversation():
            return "*Conversation will appear here*", []

        clear_btn.click(
            clear_conversation,
            inputs=[],
            outputs=[conversation_display, conversation_state]
        )
    # Debug interface
    with gr.Blocks() as debug_interface:
        gr.Markdown("# Debug Interface")
        audio_input_debug = gr.Audio(label="Test Audio Input")
        debug_btn = gr.Button("Debug Audio Format")
        output_json = gr.JSON(label="Debug Info")
        debug_btn.click(
            debug_audio,
            inputs=audio_input_debug,
            outputs=output_json
        )
    # Create a tabbed interface with all three tabs
    demo = gr.TabbedInterface(
        [admin_interface, test_interface, debug_interface],
        ["Admin Dashboard", "Test Interface", "Debug"]
    )
    return demo
# Create and launch the interface
interface = build_gradio_interface()
# Launch for Hugging Face Spaces
# NOTE: launch() blocks; this must remain the last statement in the module.
interface.launch()