| |
| """ |
| Test script for OpenAI Realtime API connection and audio handling. |
| |
| This script tests: |
| 1. OpenAI API connection |
| 2. Event receiving |
| 3. Audio sending/receiving (if Reachy Mini is available) |
| 4. Audio conversion utilities |
| |
| Usage: |
| python test_openai_connection.py |
| """ |
|
|
| import os |
| import asyncio |
| import json |
| import base64 |
| import logging |
| from pathlib import Path |
| from dotenv import load_dotenv |
| import websockets |
|
|
| |
# Locate a .env file either next to this script or in the current working
# directory; if neither exists, fall back to python-dotenv's default search.
env_paths = (
    Path(__file__).parent / ".env",
    Path.cwd() / ".env",
)
for env_path in env_paths:
    if env_path.exists():
        load_dotenv(env_path)
        print(f"✅ Loaded .env from {env_path}")
        break
else:
    # No explicit candidate found — let dotenv walk up from the CWD itself.
    load_dotenv()
|
|
| |
# Root logging configuration for the whole test script: INFO level with
# timestamp, logger name and severity in every record.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger (currently unused by the tests, which print directly).
logger = logging.getLogger(__name__)


# Realtime model and synthesized voice used by every test below.
OPENAI_MODEL = "gpt-realtime-2025-08-28"
OPENAI_VOICE = "alloy"
|
|
|
|
async def test_openai_connection():
    """Test basic OpenAI Realtime API connection.

    Connects to the Realtime websocket endpoint, waits for the
    ``session.created`` handshake, configures the session (PCM16 audio,
    Whisper input transcription, semantic VAD), triggers a short spoken
    response, then listens for events for up to 10 seconds and prints a
    summary.

    Returns:
        bool: True when the handshake and event loop completed; False when
        the API key is missing or any error occurred.
    """
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        print("❌ OPENAI_API_KEY not set in environment!")
        return False

    print(f"π API Key found: {api_key[:10]}...")

    url = f"wss://api.openai.com/v1/realtime?model={OPENAI_MODEL}"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "OpenAI-Beta": "realtime=v1"
    }

    print("π Connecting to OpenAI Realtime API...")
    print(f"   URL: {url}")

    ws = None
    try:
        ws = await websockets.connect(
            url,
            additional_headers=headers,
            ping_interval=20,
            ping_timeout=10
        )
        print("✅ Connected to OpenAI!")

        # The server announces the session before accepting client events.
        print("⏳ Waiting for session.created event...")
        response = await asyncio.wait_for(ws.recv(), timeout=10.0)
        event = json.loads(response)

        if event.get("type") == "session.created":
            print(f"✅ Session created: {event.get('session', {}).get('id', 'unknown')}")
        else:
            print(f"⚠️ Unexpected event: {event.get('type')}")
            print(f"   Event: {json.dumps(event, indent=2)}")

        # Configure audio formats, input transcription and server-side VAD.
        print("⚙️ Configuring session...")
        config = {
            "type": "session.update",
            "session": {
                "modalities": ["audio", "text"],
                "instructions": "You are a helpful assistant. Respond briefly.",
                "voice": OPENAI_VOICE,
                "input_audio_format": "pcm16",
                "output_audio_format": "pcm16",
                "input_audio_transcription": {
                    "model": "whisper-1"
                },
                "turn_detection": {
                    "type": "semantic_vad",
                    "eagerness": "low",
                    "create_response": True,
                    "interrupt_response": True
                },
                "temperature": 0.8,
                "max_response_output_tokens": 500
            }
        }

        await ws.send(json.dumps(config))
        print("✅ Session configured")

        # Ask the model to speak so audio deltas start flowing.
        print("π¬ Triggering test response...")
        await ws.send(json.dumps({
            "type": "response.create",
            "response": {
                "instructions": "Say 'Hello! This is a test. Can you hear me?'"
            }
        }))

        print("π Listening for events (10 seconds)...")
        events_received = 0
        audio_chunks_received = 0
        transcription_received = False

        async def listen_for_events():
            # Consume server events until the response completes or a
            # 20-event budget is exhausted; the outer wait_for bounds time.
            nonlocal events_received, audio_chunks_received, transcription_received
            async for message in ws:
                event = json.loads(message)
                event_type = event.get("type", "unknown")
                events_received += 1

                print(f"π¨ Event #{events_received}: {event_type}")

                if event_type == "response.audio.delta":
                    audio_b64 = event.get("delta", "")
                    if audio_b64:
                        audio_chunks_received += 1
                        # Log only every 10th chunk to keep output readable.
                        if audio_chunks_received % 10 == 0:
                            print(f"   π Received {audio_chunks_received} audio chunks")

                elif event_type == "conversation.item.input_audio_transcription.completed":
                    transcript = event.get("transcript", "")
                    print(f"   π Transcription: {transcript}")
                    transcription_received = True

                elif event_type == "response.done":
                    print("   ✅ Response completed")
                    return True

                elif event_type == "error":
                    error = event.get("error", {})
                    print(f"   ❌ Error: {error}")

                if events_received >= 20:
                    return True

        try:
            await asyncio.wait_for(listen_for_events(), timeout=10.0)
        except asyncio.TimeoutError:
            print("⏱️ Timeout waiting for events")

        print("\nπ Test Summary:")
        print(f"   Events received: {events_received}")
        print(f"   Audio chunks: {audio_chunks_received}")
        print(f"   Transcription: {'✅' if transcription_received else '❌'}")

        return True

    except Exception as e:
        print(f"❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False

    finally:
        # BUGFIX: the original closed the websocket only on the success path,
        # leaking the connection whenever an exception fired after connect
        # (handshake timeout, send failure, ...). Always release it here.
        if ws is not None:
            await ws.close()
            print("✅ Connection closed")
|
|
|
|
async def test_audio_transcription():
    """Test audio transcription by sending synthesized audio to OpenAI.

    Generates half a second of an amplitude-modulated 440 Hz tone, converts
    it to the OpenAI wire format, streams it in base64 chunks, commits the
    input buffer, and waits up to 5 seconds for a transcription event.

    Returns:
        bool: True when a transcription was received; False on a missing API
        key, unexpected handshake, transcription failure, or any error.
    """
    print("\nπ§ͺ Testing audio transcription...")

    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        print("   ❌ OPENAI_API_KEY not set!")
        return False

    ws = None
    try:
        from twenty_questions_game.audio_utils import prepare_audio_for_openai, OPENAI_SAMPLE_RATE
        import numpy as np

        url = f"wss://api.openai.com/v1/realtime?model={OPENAI_MODEL}"
        headers = {
            "Authorization": f"Bearer {api_key}",
            "OpenAI-Beta": "realtime=v1"
        }

        print("   π Connecting to OpenAI...")
        ws = await websockets.connect(
            url,
            additional_headers=headers,
            ping_interval=20,
            ping_timeout=10
        )

        # The server must announce the session before we send anything.
        response = await ws.recv()
        event = json.loads(response)
        if event.get("type") != "session.created":
            print(f"   ❌ Unexpected event: {event.get('type')}")
            return False

        # Transcription-only session: VAD must not auto-create responses.
        config = {
            "type": "session.update",
            "session": {
                "modalities": ["audio", "text"],
                "instructions": "You are a helpful assistant. Transcribe what you hear.",
                "voice": OPENAI_VOICE,
                "input_audio_format": "pcm16",
                "output_audio_format": "pcm16",
                "input_audio_transcription": {
                    "model": "whisper-1"
                },
                "turn_detection": {
                    "type": "semantic_vad",
                    "eagerness": "low",
                    "create_response": False,
                    "interrupt_response": False
                },
                "temperature": 0.8
            }
        }
        await ws.send(json.dumps(config))

        # Synthesize 0.5 s of an amplitude-modulated 440 Hz tone as
        # int16 PCM at 16 kHz (modulation makes it less like a pure beep).
        print("   π΅ Generating test audio...")
        sample_rate = 16000
        duration = 0.5
        frequency = 440
        samples = int(sample_rate * duration)
        t = np.linspace(0, duration, samples, False)

        test_audio = np.sin(2 * np.pi * frequency * t) * (1 + 0.5 * np.sin(2 * np.pi * 5 * t))
        test_audio = (test_audio * 0.3 * 32767).astype(np.int16)

        # Resample/encode to the PCM16 format OpenAI expects.
        audio_bytes = prepare_audio_for_openai(test_audio, sample_rate)

        # Sanity print: expected size after resampling to the OpenAI rate
        # (use the library constant instead of a hard-coded 24000).
        expected_samples_24k = int(len(test_audio) * OPENAI_SAMPLE_RATE / sample_rate)
        expected_bytes = expected_samples_24k * 2  # 2 bytes per PCM16 sample
        print(f"   π Audio: {len(test_audio)} samples @ {sample_rate}Hz -> {len(audio_bytes)} bytes @ 24kHz")
        print(f"   π Expected: {expected_samples_24k} samples = {expected_bytes} bytes")

        # Stream the audio in ~10 base64 chunks, pacing them slightly.
        chunk_size_bytes = len(audio_bytes) // 10
        if chunk_size_bytes == 0:
            chunk_size_bytes = len(audio_bytes)

        chunks = []
        for i in range(0, len(audio_bytes), chunk_size_bytes):
            chunk_bytes = audio_bytes[i:i+chunk_size_bytes]
            chunks.append(base64.b64encode(chunk_bytes).decode('ascii'))

        print(f"   π€ Sending {len(chunks)} audio chunks ({len(audio_bytes)} total bytes) to OpenAI...")
        for i, chunk in enumerate(chunks):
            await ws.send(json.dumps({
                "type": "input_audio_buffer.append",
                "audio": chunk
            }))
            if i < len(chunks) - 1:
                await asyncio.sleep(0.01)

        # Give the server a moment to ingest the buffered audio.
        await asyncio.sleep(0.1)

        print("   ✅ Committing audio buffer...")
        await ws.send(json.dumps({
            "type": "input_audio_buffer.commit"
        }))

        print("   π Waiting for transcription (5 seconds)...")
        transcription_received = False
        transcript_text = ""
        events_received = 0

        async def listen_for_transcription():
            # Wait for the transcription outcome; stop early on errors or
            # after 50 unrelated events as a safety valve.
            nonlocal transcription_received, transcript_text, events_received
            async for message in ws:
                event = json.loads(message)
                events_received += 1
                event_type = event.get("type", "unknown")

                if event_type == "conversation.item.input_audio_transcription.completed":
                    transcript = event.get("transcript", "")
                    transcript_text = transcript
                    transcription_received = True
                    print(f"   π Transcription received: '{transcript}'")
                    return True
                elif event_type == "conversation.item.input_audio_transcription.failed":
                    error = event.get("error", {})
                    print(f"   ❌ Transcription failed: {error}")
                    return False
                elif event_type == "error":
                    error = event.get("error", {})
                    print(f"   ❌ Error: {error}")
                    return False

                if events_received >= 50:
                    return False

        try:
            # The listener's return value was never used by the original
            # summary logic; only the nonlocal flags matter.
            await asyncio.wait_for(listen_for_transcription(), timeout=5.0)
        except asyncio.TimeoutError:
            print("   ⏱️ Timeout waiting for transcription")

        if transcription_received:
            print(f"   ✅ Transcription test passed: '{transcript_text}'")
            return True
        else:
            print(f"   ❌ No transcription received (got {events_received} events)")
            return False

    except Exception as e:
        print(f"   ❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False

    finally:
        # BUGFIX: the original leaked the websocket whenever an exception
        # fired between connect and its explicit close calls; close here
        # unconditionally instead.
        if ws is not None:
            await ws.close()
|
|
|
|
async def test_audio_conversion():
    """Exercise the Reachy<->OpenAI audio conversion helpers.

    Synthesizes a short 440 Hz tone, converts it to the OpenAI wire format,
    round-trips it through base64 decoding, and resamples for Reachy.

    Returns:
        bool: True when every conversion step succeeded, False otherwise
        (including when the project audio utilities cannot be imported).
    """
    print("\nπ§ͺ Testing audio conversion utilities...")

    try:
        from twenty_questions_game.audio_utils import (
            prepare_audio_for_openai,
            decode_audio_from_openai,
            prepare_audio_for_reachy,
            OPENAI_SAMPLE_RATE
        )
        import numpy as np

        # Synthesize 0.1 s of a 440 Hz sine as int16 PCM at 16 kHz.
        rate_hz = 16000
        tone_seconds = 0.1
        tone_hz = 440
        n_samples = int(rate_hz * tone_seconds)
        timeline = np.linspace(0, tone_seconds, n_samples, False)
        tone = (np.sin(2 * np.pi * tone_hz * timeline) * 32767).astype(np.int16)

        print(f"   Created test audio: {len(tone)} samples at {rate_hz}Hz")

        # Direction 1: Reachy capture format -> OpenAI wire bytes.
        wire_bytes = prepare_audio_for_openai(tone, rate_hz)
        print(f"   ✅ Reachy->OpenAI: {len(wire_bytes)} bytes")

        # Direction 2: base64 payload from OpenAI -> Reachy playback samples.
        payload_b64 = base64.b64encode(wire_bytes).decode('ascii')
        decoded = decode_audio_from_openai(payload_b64)
        playback = prepare_audio_for_reachy(decoded, 48000)
        print(f"   ✅ OpenAI->Reachy: {len(playback)} samples at 48kHz")

        return True

    except Exception as e:
        print(f"   ❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False
|
|
|
|
async def test_with_reachy():
    """Smoke-test audio capture and playback on a physical Reachy Mini.

    Returns:
        True on success, False on error, or None when the ``reachy_mini``
        package is not installed (the test is then skipped).
    """
    print("\nπ€ Testing with Reachy Mini...")

    try:
        from reachy_mini import ReachyMini

        print("   Connecting to Reachy Mini...")
        robot = ReachyMini()
        print("   ✅ Connected to Reachy Mini")

        # Capture check: poll the microphone 50 times, counting non-empty reads.
        print("   Testing audio capture...")
        robot.media.start_recording()
        samples_received = sum(
            1
            for _ in range(50)
            if (sample := robot.media.get_audio_sample()) is not None and len(sample) > 0
        )
        robot.media.stop_recording()

        print(f"   ✅ Audio capture: {samples_received}/50 samples received")

        # Playback check: push 4800 samples of silence and let them drain.
        print("   Testing audio playback...")
        import numpy as np

        silence = np.zeros(4800, dtype=np.float32)
        robot.media.start_playing()
        robot.media.push_audio_sample(silence)
        await asyncio.sleep(0.2)
        robot.media.stop_playing()
        print("   ✅ Audio playback test completed")

        return True

    except ImportError:
        print("   ⚠️ Reachy Mini not available (this is OK for testing)")
        return None
    except Exception as e:
        print(f"   ❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False
|
|
|
|
async def main():
    """Run every test in sequence and print a final pass/fail summary."""
    banner = "=" * 60
    print(banner)
    print("π§ͺ OpenAI Realtime API Test Script")
    print(banner)

    results = {}

    # (header title, results key, coroutine function) for each test stage.
    stages = (
        ("TEST 1: OpenAI Connection", 'openai', test_openai_connection),
        ("TEST 2: Audio Transcription", 'transcription', test_audio_transcription),
        ("TEST 3: Audio Conversion Utilities", 'audio_conversion', test_audio_conversion),
        ("TEST 4: Reachy Mini Integration (Optional)", 'reachy', test_with_reachy),
    )
    for title, key, run_stage in stages:
        print("\n" + banner)
        print(title)
        print(banner)
        results[key] = await run_stage()

    # Summarize: None means the stage was skipped, truthy means it passed.
    print("\n" + banner)
    print("π FINAL SUMMARY")
    print(banner)
    for test_name, result in results.items():
        if result is None:
            status = "⚠️ SKIPPED"
        elif result:
            status = "✅ PASSED"
        else:
            status = "❌ FAILED"
        print(f"   {test_name:20s}: {status}")

    print("\n" + banner)
|
|
|
|
# Script entry point: run the full async test suite on the default event loop.
if __name__ == "__main__":
    asyncio.run(main())
|
|
|
|