# NOTE: the lines originally here (Space status, file size, commit hashes and
# a 1-121 line-number gutter) were web file-viewer residue, not module code.
import sounddevice as sd
import soundfile as sf
import numpy as np
import base64
import io
import time
import threading
from config import INPUT_SAMPLE_RATE, OUTPUT_SAMPLE_RATE, BIT_DEPTH, CHANNELS, CHUNK_SIZE
class AudioStreamer:
    """Capture microphone audio and play back 16-bit PCM audio.

    Sample rates, channel count and chunk size are read from the ``config``
    module constants when an instance is created.
    """

    def __init__(self):
        self.input_rate = INPUT_SAMPLE_RATE
        self.output_rate = OUTPUT_SAMPLE_RATE
        self.channels = CHANNELS
        self.chunk_size = CHUNK_SIZE  # Samples per chunk
        self.last_audio = None  # Most recent payload given to play_audio_stream
        self.recording = False  # Loop flag for record_audio_stream
        self.stream = None  # Open input stream while recording, else None

    @staticmethod
    def _decode_base64_text(text):
        """Decode base64 audio carried in a string.

        Two shapes are accepted:

        * the ``str()`` of a bytes object, e.g. ``"b'AQACAA=='"`` -- the text
          between the quotes is decoded;
        * a plain base64 string, e.g. ``"AQACAA=="`` -- decoded strictly so
          that arbitrary text is rejected instead of yielding garbage.

        Raises:
            ValueError: if the payload is not valid base64
                (``binascii.Error`` is a ``ValueError`` subclass).
        """
        stripped = text.strip()
        if stripped.startswith("b'"):
            payload = stripped.split("b'", 1)[1].rsplit("'", 1)[0]
            return base64.b64decode(payload)
        # Drop embedded whitespace (line-wrapped base64) before the strict
        # decode, which would otherwise reject it.
        return base64.b64decode("".join(stripped.split()), validate=True)

    def record_audio_stream(self):
        """Yield microphone audio as raw int16 byte chunks.

        Each yielded chunk holds ``chunk_size`` frames. Recording continues
        until ``self.recording`` is set to False (see ``cleanup``) or a
        ``KeyboardInterrupt`` arrives.
        """
        self.recording = True
        frames_per_chunk = int(self.chunk_size)
        print("Recording... Speak now. (Press Ctrl+C to stop)")
        try:
            # One stream for the whole session: reopening the device for
            # every chunk (sd.rec + sd.wait in a loop) loses the audio that
            # arrives between chunks.
            with sd.InputStream(
                samplerate=self.input_rate,
                blocksize=frames_per_chunk,
                channels=self.channels,
                dtype='int16',
            ) as stream:
                self.stream = stream
                while self.recording:
                    # Blocking read; the second item is the overflow flag.
                    frames, _overflowed = stream.read(frames_per_chunk)
                    # Convert to bytes for streaming to AWS
                    yield frames.tobytes()
        except KeyboardInterrupt:
            print("Recording stopped.")
        finally:
            self.recording = False
            self.stream = None

    def play_audio_stream(self, audio_data):
        """Play 16-bit PCM audio at the configured output rate.

        Args:
            audio_data: raw PCM bytes, or a string carrying base64 audio
                (plain base64 or the ``"b'...'"`` form).

        Errors are reported with ``print`` rather than raised. The payload is
        remembered for ``replay_last_audio`` before any decoding happens.
        """
        if not audio_data:
            print("No audio data to play")
            return

        # Store last audio for replay feature
        self.last_audio = audio_data

        if isinstance(audio_data, str):
            try:
                audio_data = self._decode_base64_text(audio_data)
            except ValueError as e:
                print(f"Error decoding base64 audio: {e}")
                return

        try:
            # Interpret the bytes as 16-bit PCM samples
            audio_array = np.frombuffer(audio_data, dtype=np.int16)
            sd.play(audio_array, self.output_rate)
            sd.wait()  # Wait until audio is finished playing
        except Exception as e:
            # Deliberately broad: playback is best-effort and device errors
            # must not propagate to the caller.
            print(f"Error playing audio: {e}")

    def replay_last_audio(self):
        """Replay the payload most recently passed to ``play_audio_stream``."""
        if self.last_audio:
            self.play_audio_stream(self.last_audio)
        else:
            print("No previous audio to replay")

    def encode_audio_for_nova(self, audio_data):
        """Return *audio_data* as raw bytes where a conversion is known.

        * ``"b'...'"`` strings are base64-decoded to bytes;
        * numpy arrays are converted with ``tobytes()``;
        * anything else (including bytes and other strings) is returned
          unchanged.
        """
        if isinstance(audio_data, str) and audio_data.strip().startswith("b'"):
            return self._decode_base64_text(audio_data)
        if isinstance(audio_data, np.ndarray):
            return audio_data.tobytes()
        return audio_data

    def decode_nova_audio(self, audio_data):
        """Convert Nova's audio response to playable bytes.

        Per the original note, Nova returns base64-encoded LPCM at 24kHz.
        Strings are base64-decoded (plain or ``"b'...'"`` form); a plain
        string that is not valid base64 is returned unchanged. Non-string
        input is returned as is.

        Raises:
            ValueError: if a ``"b'...'"`` string holds invalid base64.
        """
        if not isinstance(audio_data, str):
            return audio_data
        if audio_data.strip().startswith("b'"):
            return self._decode_base64_text(audio_data)
        try:
            return self._decode_base64_text(audio_data)
        except ValueError:
            # Not base64 after all: keep the pass-through behavior.
            return audio_data

    def cleanup(self):
        """Ask any active recording loop to stop after the current chunk."""
        self.recording = False