""" OpenAI Voice Streaming Integration for NAVADA Startup Viability Agent Provides real-time voice conversation capabilities with specialized startup coaching personas """ import asyncio import json import logging import os from typing import Dict, Any, Optional, List import openai from openai import AsyncOpenAI import websockets import base64 import io import wave logger = logging.getLogger(__name__) class VoiceStreamingManager: """Manages OpenAI real-time voice streaming with startup coaching personas""" def __init__(self): self.client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) self.voice_prompt_id = os.getenv("OPENAI_VOICE_PROMPT_ID", "pmpt_68b4975074d0819087217d0b0717bb1b0c32a4ef223cc971") self.voice_model = os.getenv("VOICE_MODEL", "gpt-4o-realtime-preview-2024-10-01") self.output_format = os.getenv("VOICE_OUTPUT_FORMAT", "pcm16") self.sample_rate = int(os.getenv("VOICE_SAMPLE_RATE", "24000")) self.current_persona = "general_advisor" self.conversation_context = [] async def initialize_voice_session(self, persona: str = "general_advisor") -> Dict[str, Any]: """Initialize a voice streaming session with specified persona""" try: self.current_persona = persona persona_instructions = self._get_persona_instructions(persona) session_config = { "model": self.voice_model, "voice": "alloy", "instructions": persona_instructions, "input_audio_format": "pcm16", "output_audio_format": self.output_format, "input_audio_transcription": { "model": "whisper-1" }, "turn_detection": { "type": "server_vad", "threshold": 0.5, "prefix_padding_ms": 300, "silence_duration_ms": 500 }, "tools": [ { "type": "function", "name": "analyze_startup_idea", "description": "Analyze a startup idea for viability and provide detailed feedback", "parameters": { "type": "object", "properties": { "idea": {"type": "string", "description": "The startup idea to analyze"}, "industry": {"type": "string", "description": "The industry sector"}, "target_market": {"type": "string", "description": "Target market description"} }, "required": ["idea"] } }, { "type": "function", "name": "get_market_data", "description": "Retrieve real-time market data for analysis", "parameters": { "type": "object", "properties": { "query": {"type": "string", "description": "Market data query"} }, "required": ["query"] } } ] } return {"status": "initialized", "config": session_config} except Exception as e: logger.error(f"Failed to initialize voice session: {e}") return {"status": "error", "message": str(e)} def _get_persona_instructions(self, persona: str) -> str: """Get specialized instructions for different startup coach personas""" personas = { "general_advisor": """ You are NAVADA, an expert startup viability advisor with 20 years of experience in venture capital and startup ecosystems. You provide comprehensive, actionable advice on startup ideas, market validation, business models, and growth strategies. Your expertise includes: - Market analysis and competitive intelligence - Business model validation and optimization - Financial modeling and investment readiness - Product-market fit assessment - Go-to-market strategy development Communicate in a warm, encouraging tone while being direct about potential challenges. Always provide specific, actionable next steps. """, "technical_advisor": """ You are NAVADA's Technical Advisor, a seasoned CTO and technology strategist with deep expertise in: - Technology stack selection and architecture - MVP development and product roadmaps - Technical feasibility assessment - Scalability planning and infrastructure - AI/ML integration strategies - Cybersecurity and compliance Focus on technical viability, development timelines, and technology risks. Provide specific technical recommendations and implementation strategies. """, "market_analyst": """ You are NAVADA's Market Intelligence Specialist with expertise in: - Market size analysis and TAM/SAM/SOM calculations - Competitive landscape mapping - Customer segmentation and persona development - Industry trend analysis and forecasting - Regulatory environment assessment - International market expansion strategies Provide data-driven market insights with specific metrics and actionable market entry strategies. """, "financial_advisor": """ You are NAVADA's Financial Strategist with deep expertise in: - Financial modeling and projections - Funding strategy and investor readiness - Valuation methodologies - Revenue model optimization - Unit economics and profitability analysis - Risk assessment and mitigation Focus on financial viability, funding requirements, and investor appeal. Provide specific financial metrics and funding recommendations. """, "pitch_coach": """ You are NAVADA's Pitch Coach, specializing in: - Investor pitch development and refinement - Storytelling and narrative structure - Presentation skills and delivery coaching - Q&A preparation and objection handling - Demo preparation and product showcasing - Investor psychology and decision-making Help entrepreneurs craft compelling pitches and prepare for investor meetings. Provide specific feedback on pitch structure, messaging, and delivery. """ } return personas.get(persona, personas["general_advisor"]) async def process_audio_stream(self, audio_data: bytes) -> Dict[str, Any]: """Process incoming audio stream and return response""" try: # Convert audio to base64 for API audio_b64 = base64.b64encode(audio_data).decode() # Create conversation event event = { "type": "conversation.item.create", "item": { "type": "message", "role": "user", "content": [ { "type": "input_audio", "audio": audio_b64 } ] } } # Process with OpenAI Realtime API response = await self._send_realtime_event(event) return response except Exception as e: logger.error(f"Error processing audio stream: {e}") return {"status": "error", "message": str(e)} async def _send_realtime_event(self, event: Dict[str, Any]) -> Dict[str, Any]: """Send event to OpenAI Realtime API""" try: # This is a placeholder for the actual WebSocket connection to OpenAI Realtime API # In production, you would establish a WebSocket connection to wss://api.openai.com/v1/realtime # For now, return a mock response structure return { "type": "conversation.item.created", "item": { "id": "msg_001", "type": "message", "role": "assistant", "content": [ { "type": "audio", "audio": "", # Base64 encoded audio response "transcript": "I understand your startup idea. Let me analyze the market viability..." } ] } } except Exception as e: logger.error(f"Error sending realtime event: {e}") raise async def switch_persona(self, new_persona: str) -> Dict[str, Any]: """Switch to a different coaching persona during conversation""" try: if new_persona not in ["general_advisor", "technical_advisor", "market_analyst", "financial_advisor", "pitch_coach"]: return {"status": "error", "message": "Invalid persona specified"} self.current_persona = new_persona instructions = self._get_persona_instructions(new_persona) # Send persona switch event event = { "type": "session.update", "session": { "instructions": instructions } } await self._send_realtime_event(event) return { "status": "success", "message": f"Switched to {new_persona.replace('_', ' ').title()}", "persona": new_persona } except Exception as e: logger.error(f"Error switching persona: {e}") return {"status": "error", "message": str(e)} async def get_conversation_summary(self) -> Dict[str, Any]: """Get AI-generated summary of the current conversation""" try: if not self.conversation_context: return {"status": "empty", "summary": "No conversation yet"} # Use GPT-4 to summarize the conversation response = await self.client.chat.completions.create( model="gpt-4", messages=[ { "role": "system", "content": "Summarize this startup coaching conversation, highlighting key insights, recommendations, and next steps." }, { "role": "user", "content": f"Conversation context: {json.dumps(self.conversation_context[-10:])}" # Last 10 exchanges } ], max_tokens=500 ) summary = response.choices[0].message.content return { "status": "success", "summary": summary, "persona": self.current_persona, "total_exchanges": len(self.conversation_context) } except Exception as e: logger.error(f"Error generating conversation summary: {e}") return {"status": "error", "message": str(e)} class VoiceUIManager: """Manages voice interface components for Chainlit integration""" def __init__(self): self.voice_manager = VoiceStreamingManager() self.is_recording = False self.current_session = None async def create_voice_interface(self) -> str: """Create HTML/JS interface for voice interaction""" return """
🔴 Not Recording
Current: General Advisor
""" async def handle_voice_message(self, message_type: str, data: Dict[str, Any]) -> Dict[str, Any]: """Handle different types of voice-related messages""" try: if message_type == "voice_audio": # Process audio data audio_b64 = data.get("audio", "") audio_data = base64.b64decode(audio_b64) response = await self.voice_manager.process_audio_stream(audio_data) return response elif message_type == "switch_persona": # Switch coaching persona persona = data.get("persona", "general_advisor") response = await self.voice_manager.switch_persona(persona) return response elif message_type == "get_summary": # Get conversation summary response = await self.voice_manager.get_conversation_summary() return response else: return {"status": "error", "message": "Unknown message type"} except Exception as e: logger.error(f"Error handling voice message: {e}") return {"status": "error", "message": str(e)}