"""
OpenAI Voice Streaming Integration for NAVADA Startup Viability Agent
Provides real-time voice conversation capabilities with specialized startup coaching personas
"""
import asyncio
import json
import logging
import os
from typing import Any, Dict, List
from openai import AsyncOpenAI
import websockets
import base64
import io
import wave
logger = logging.getLogger(__name__)
class VoiceStreamingManager:
"""Manages OpenAI real-time voice streaming with startup coaching personas"""
def __init__(self):
self.client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
self.voice_prompt_id = os.getenv("OPENAI_VOICE_PROMPT_ID", "pmpt_68b4975074d0819087217d0b0717bb1b0c32a4ef223cc971")
self.voice_model = os.getenv("VOICE_MODEL", "gpt-4o-realtime-preview-2024-10-01")
self.output_format = os.getenv("VOICE_OUTPUT_FORMAT", "pcm16")
self.sample_rate = int(os.getenv("VOICE_SAMPLE_RATE", "24000"))
self.current_persona = "general_advisor"
self.conversation_context = []
async def initialize_voice_session(self, persona: str = "general_advisor") -> Dict[str, Any]:
"""Initialize a voice streaming session with specified persona"""
try:
self.current_persona = persona
persona_instructions = self._get_persona_instructions(persona)
session_config = {
"model": self.voice_model,
"voice": "alloy",
"instructions": persona_instructions,
"input_audio_format": "pcm16",
"output_audio_format": self.output_format,
"input_audio_transcription": {
"model": "whisper-1"
},
"turn_detection": {
"type": "server_vad",
"threshold": 0.5,
"prefix_padding_ms": 300,
"silence_duration_ms": 500
},
"tools": [
{
"type": "function",
"name": "analyze_startup_idea",
"description": "Analyze a startup idea for viability and provide detailed feedback",
"parameters": {
"type": "object",
"properties": {
"idea": {"type": "string", "description": "The startup idea to analyze"},
"industry": {"type": "string", "description": "The industry sector"},
"target_market": {"type": "string", "description": "Target market description"}
},
"required": ["idea"]
}
},
{
"type": "function",
"name": "get_market_data",
"description": "Retrieve real-time market data for analysis",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Market data query"}
},
"required": ["query"]
}
}
]
}
return {"status": "initialized", "config": session_config}
except Exception as e:
logger.error(f"Failed to initialize voice session: {e}")
return {"status": "error", "message": str(e)}
def _get_persona_instructions(self, persona: str) -> str:
"""Get specialized instructions for different startup coach personas"""
personas = {
"general_advisor": """
You are NAVADA, an expert startup viability advisor with 20 years of experience in venture capital and startup ecosystems.
You provide comprehensive, actionable advice on startup ideas, market validation, business models, and growth strategies.
Your expertise includes:
- Market analysis and competitive intelligence
- Business model validation and optimization
- Financial modeling and investment readiness
- Product-market fit assessment
- Go-to-market strategy development
Communicate in a warm, encouraging tone while being direct about potential challenges.
Always provide specific, actionable next steps.
""",
"technical_advisor": """
You are NAVADA's Technical Advisor, a seasoned CTO and technology strategist with deep expertise in:
- Technology stack selection and architecture
- MVP development and product roadmaps
- Technical feasibility assessment
- Scalability planning and infrastructure
- AI/ML integration strategies
- Cybersecurity and compliance
Focus on technical viability, development timelines, and technology risks.
Provide specific technical recommendations and implementation strategies.
""",
"market_analyst": """
You are NAVADA's Market Intelligence Specialist with expertise in:
- Market size analysis and TAM/SAM/SOM calculations
- Competitive landscape mapping
- Customer segmentation and persona development
- Industry trend analysis and forecasting
- Regulatory environment assessment
- International market expansion strategies
Provide data-driven market insights with specific metrics and actionable market entry strategies.
""",
"financial_advisor": """
You are NAVADA's Financial Strategist with deep expertise in:
- Financial modeling and projections
- Funding strategy and investor readiness
- Valuation methodologies
- Revenue model optimization
- Unit economics and profitability analysis
- Risk assessment and mitigation
Focus on financial viability, funding requirements, and investor appeal.
Provide specific financial metrics and funding recommendations.
""",
"pitch_coach": """
You are NAVADA's Pitch Coach, specializing in:
- Investor pitch development and refinement
- Storytelling and narrative structure
- Presentation skills and delivery coaching
- Q&A preparation and objection handling
- Demo preparation and product showcasing
- Investor psychology and decision-making
Help entrepreneurs craft compelling pitches and prepare for investor meetings.
Provide specific feedback on pitch structure, messaging, and delivery.
"""
}
return personas.get(persona, personas["general_advisor"])
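    # Hedged convenience helper (an addition, not part of the original call
    # sites): exposes the persona keys in one place so switch_persona() can
    # validate against the same list that _get_persona_instructions() serves.
    def available_personas(self) -> List[str]:
        """Return the persona keys accepted by initialize_voice_session() and switch_persona()."""
        return [
            "general_advisor",
            "technical_advisor",
            "market_analyst",
            "financial_advisor",
            "pitch_coach",
        ]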
async def process_audio_stream(self, audio_data: bytes) -> Dict[str, Any]:
"""Process incoming audio stream and return response"""
try:
# Convert audio to base64 for API
audio_b64 = base64.b64encode(audio_data).decode()
# Create conversation event
event = {
"type": "conversation.item.create",
"item": {
"type": "message",
"role": "user",
"content": [
{
"type": "input_audio",
"audio": audio_b64
}
]
}
}
            # Process with OpenAI Realtime API
            response = await self._send_realtime_event(event)
            # Record the exchange so get_conversation_summary() has context to draw on
            self.conversation_context.append(response)
            return response
except Exception as e:
logger.error(f"Error processing audio stream: {e}")
return {"status": "error", "message": str(e)}
async def _send_realtime_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
"""Send event to OpenAI Realtime API"""
try:
            # Placeholder: this build does not hold a live WebSocket connection.
            # A hedged sketch of the real round-trip to
            # wss://api.openai.com/v1/realtime is in _send_realtime_event_ws()
            # below; here we return a mock response so the rest of the
            # pipeline can be exercised offline.
return {
"type": "conversation.item.created",
"item": {
"id": "msg_001",
"type": "message",
"role": "assistant",
"content": [
{
"type": "audio",
"audio": "", # Base64 encoded audio response
"transcript": "I understand your startup idea. Let me analyze the market viability..."
}
]
}
}
except Exception as e:
logger.error(f"Error sending realtime event: {e}")
raise
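    # Hedged sketch of the live round-trip (an illustrative addition, not wired
    # into process_audio_stream): the endpoint, headers, and event names follow
    # OpenAI's published Realtime API docs but are unverified here, and the
    # `extra_headers` keyword applies to websockets <= 12 (newer releases
    # renamed it to `additional_headers`).
    async def _send_realtime_event_ws(self, event: Dict[str, Any]) -> Dict[str, Any]:
        """Send one event over a real Realtime API WebSocket and await the response."""
        url = f"wss://api.openai.com/v1/realtime?model={self.voice_model}"
        headers = {
            "Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}",
            "OpenAI-Beta": "realtime=v1",
        }
        async with websockets.connect(url, extra_headers=headers) as ws:
            await ws.send(json.dumps(event))
            await ws.send(json.dumps({"type": "response.create"}))
            async for raw in ws:
                message = json.loads(raw)
                if message.get("type") == "response.done":
                    return message
        return {"status": "error", "message": "connection closed before response.done"}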
async def switch_persona(self, new_persona: str) -> Dict[str, Any]:
"""Switch to a different coaching persona during conversation"""
try:
            if new_persona not in self.available_personas():
return {"status": "error", "message": "Invalid persona specified"}
self.current_persona = new_persona
instructions = self._get_persona_instructions(new_persona)
# Send persona switch event
event = {
"type": "session.update",
"session": {
"instructions": instructions
}
}
await self._send_realtime_event(event)
return {
"status": "success",
"message": f"Switched to {new_persona.replace('_', ' ').title()}",
"persona": new_persona
}
except Exception as e:
logger.error(f"Error switching persona: {e}")
return {"status": "error", "message": str(e)}
async def get_conversation_summary(self) -> Dict[str, Any]:
"""Get AI-generated summary of the current conversation"""
try:
if not self.conversation_context:
return {"status": "empty", "summary": "No conversation yet"}
# Use GPT-4 to summarize the conversation
response = await self.client.chat.completions.create(
model="gpt-4",
messages=[
{
"role": "system",
"content": "Summarize this startup coaching conversation, highlighting key insights, recommendations, and next steps."
},
{
"role": "user",
"content": f"Conversation context: {json.dumps(self.conversation_context[-10:])}" # Last 10 exchanges
}
],
max_tokens=500
)
summary = response.choices[0].message.content
return {
"status": "success",
"summary": summary,
"persona": self.current_persona,
"total_exchanges": len(self.conversation_context)
}
except Exception as e:
logger.error(f"Error generating conversation summary: {e}")
return {"status": "error", "message": str(e)}
class VoiceUIManager:
"""Manages voice interface components for Chainlit integration"""
def __init__(self):
self.voice_manager = VoiceStreamingManager()
self.is_recording = False
self.current_session = None
async def create_voice_interface(self) -> str:
"""Create HTML/JS interface for voice interaction"""
return """
<div id="voice-interface" class="voice-container">
<div class="voice-controls">
<button id="start-voice" class="voice-btn start">🎀 Start Voice Chat</button>
<button id="stop-voice" class="voice-btn stop" disabled>⏹️ Stop</button>
<select id="persona-select" class="persona-selector">
<option value="general_advisor">General Advisor</option>
<option value="technical_advisor">Technical Advisor</option>
<option value="market_analyst">Market Analyst</option>
<option value="financial_advisor">Financial Advisor</option>
<option value="pitch_coach">Pitch Coach</option>
</select>
</div>
<div class="voice-status">
<div id="recording-indicator" class="recording-off">πŸ”΄ Not Recording</div>
<div id="current-persona">Current: General Advisor</div>
</div>
<div class="audio-visualization">
<canvas id="audio-canvas" width="400" height="100"></canvas>
</div>
<div class="conversation-summary">
<button id="get-summary" class="summary-btn">πŸ“‹ Get Conversation Summary</button>
<div id="summary-display"></div>
</div>
</div>
<style>
.voice-container {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
border-radius: 15px;
padding: 20px;
margin: 20px 0;
color: white;
font-family: 'Inter', sans-serif;
}
.voice-controls {
display: flex;
gap: 15px;
align-items: center;
margin-bottom: 15px;
}
.voice-btn {
padding: 12px 24px;
border: none;
border-radius: 25px;
font-weight: 600;
cursor: pointer;
transition: all 0.3s ease;
font-size: 14px;
}
.voice-btn.start {
background: #4CAF50;
color: white;
}
.voice-btn.stop {
background: #f44336;
color: white;
}
.voice-btn:disabled {
opacity: 0.5;
cursor: not-allowed;
}
.persona-selector {
padding: 8px 15px;
border-radius: 20px;
border: none;
background: rgba(255, 255, 255, 0.2);
color: white;
font-weight: 500;
}
.voice-status {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 15px;
font-size: 14px;
}
.recording-off {
color: #ffcdd2;
}
.recording-on {
color: #c8e6c9;
animation: pulse 1s infinite;
}
@keyframes pulse {
0% { opacity: 1; }
50% { opacity: 0.5; }
100% { opacity: 1; }
}
.audio-visualization {
margin: 15px 0;
text-align: center;
}
#audio-canvas {
border-radius: 10px;
background: rgba(255, 255, 255, 0.1);
}
.conversation-summary {
margin-top: 20px;
}
.summary-btn {
background: rgba(255, 255, 255, 0.2);
color: white;
border: none;
padding: 10px 20px;
border-radius: 20px;
cursor: pointer;
margin-bottom: 10px;
}
#summary-display {
background: rgba(255, 255, 255, 0.1);
padding: 15px;
border-radius: 10px;
margin-top: 10px;
line-height: 1.6;
}
</style>
<script>
let mediaRecorder;
let audioChunks = [];
let audioContext;
let analyser;
let dataArray;
let canvas;
let canvasCtx;
document.addEventListener('DOMContentLoaded', function() {
canvas = document.getElementById('audio-canvas');
canvasCtx = canvas.getContext('2d');
document.getElementById('start-voice').addEventListener('click', startVoiceChat);
document.getElementById('stop-voice').addEventListener('click', stopVoiceChat);
document.getElementById('persona-select').addEventListener('change', switchPersona);
document.getElementById('get-summary').addEventListener('click', getSummary);
});
async function startVoiceChat() {
try {
const stream = await navigator.mediaDevices.getUserMedia({ audio: true });
// Setup audio context for visualization
audioContext = new AudioContext();
analyser = audioContext.createAnalyser();
const source = audioContext.createMediaStreamSource(stream);
source.connect(analyser);
analyser.fftSize = 256;
const bufferLength = analyser.frequencyBinCount;
dataArray = new Uint8Array(bufferLength);
// Start visualization
drawAudioVisualization();
// Setup media recorder
mediaRecorder = new MediaRecorder(stream);
mediaRecorder.ondataavailable = (event) => {
audioChunks.push(event.data);
};
mediaRecorder.onstop = async () => {
                // MediaRecorder emits webm/opus in most browsers, not WAV
                const audioBlob = new Blob(audioChunks, { type: 'audio/webm' });
audioChunks = [];
await sendAudioToServer(audioBlob);
};
mediaRecorder.start();
// Update UI
document.getElementById('start-voice').disabled = true;
document.getElementById('stop-voice').disabled = false;
document.getElementById('recording-indicator').textContent = '🟒 Recording...';
document.getElementById('recording-indicator').className = 'recording-on';
} catch (error) {
console.error('Error starting voice chat:', error);
alert('Error accessing microphone. Please check permissions.');
}
}
function stopVoiceChat() {
if (mediaRecorder && mediaRecorder.state === 'recording') {
mediaRecorder.stop();
}
        if (audioContext) {
            audioContext.close();
            analyser = null;  // stops the visualization loop
        }
// Update UI
document.getElementById('start-voice').disabled = false;
document.getElementById('stop-voice').disabled = true;
document.getElementById('recording-indicator').textContent = 'πŸ”΄ Not Recording';
document.getElementById('recording-indicator').className = 'recording-off';
}
function drawAudioVisualization() {
if (!analyser) return;
requestAnimationFrame(drawAudioVisualization);
analyser.getByteFrequencyData(dataArray);
canvasCtx.fillStyle = 'rgba(255, 255, 255, 0.1)';
canvasCtx.fillRect(0, 0, canvas.width, canvas.height);
const barWidth = (canvas.width / dataArray.length) * 2.5;
let barHeight;
let x = 0;
for (let i = 0; i < dataArray.length; i++) {
barHeight = dataArray[i] / 2;
const r = barHeight + 25 * (i / dataArray.length);
const g = 250 * (i / dataArray.length);
const b = 50;
canvasCtx.fillStyle = `rgb(${r}, ${g}, ${b})`;
canvasCtx.fillRect(x, canvas.height - barHeight, barWidth, barHeight);
x += barWidth + 1;
}
}
async function sendAudioToServer(audioBlob) {
// Convert to base64 and send to Python backend
const reader = new FileReader();
reader.onload = function() {
const base64Audio = reader.result.split(',')[1];
// Send via Chainlit
window.chainlitAPI?.sendMessage({
type: 'voice_audio',
audio: base64Audio,
persona: document.getElementById('persona-select').value
});
};
reader.readAsDataURL(audioBlob);
}
async function switchPersona() {
const newPersona = document.getElementById('persona-select').value;
const personaDisplay = document.getElementById('current-persona');
// Send persona switch to backend
window.chainlitAPI?.sendMessage({
type: 'switch_persona',
persona: newPersona
});
// Update display
        personaDisplay.textContent = `Current: ${newPersona.replace(/_/g, ' ').replace(/\b\w/g, l => l.toUpperCase())}`;
}
async function getSummary() {
// Request conversation summary
window.chainlitAPI?.sendMessage({
type: 'get_summary'
});
}
</script>
"""
async def handle_voice_message(self, message_type: str, data: Dict[str, Any]) -> Dict[str, Any]:
"""Handle different types of voice-related messages"""
try:
if message_type == "voice_audio":
# Process audio data
audio_b64 = data.get("audio", "")
audio_data = base64.b64decode(audio_b64)
response = await self.voice_manager.process_audio_stream(audio_data)
return response
elif message_type == "switch_persona":
# Switch coaching persona
persona = data.get("persona", "general_advisor")
response = await self.voice_manager.switch_persona(persona)
return response
elif message_type == "get_summary":
# Get conversation summary
response = await self.voice_manager.get_conversation_summary()
return response
else:
return {"status": "error", "message": "Unknown message type"}
except Exception as e:
logger.error(f"Error handling voice message: {e}")
return {"status": "error", "message": str(e)}