File size: 16,590 Bytes
f7b85fd
 
 
 
 
 
 
 
 
 
 
 
ffaf1db
 
 
 
f7b85fd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ffaf1db
f7b85fd
 
ffaf1db
 
 
 
f7b85fd
 
 
ffaf1db
 
 
 
f7b85fd
ffaf1db
f7b85fd
ffaf1db
 
f7b85fd
ffaf1db
 
 
 
 
 
 
f7b85fd
 
ffaf1db
f7b85fd
ffaf1db
f7b85fd
 
ffaf1db
 
 
 
 
 
 
 
 
f7b85fd
ffaf1db
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f7b85fd
ffaf1db
 
 
 
 
 
 
f7b85fd
 
 
 
ffaf1db
f7b85fd
ffaf1db
f7b85fd
 
 
 
ffaf1db
f7b85fd
ffaf1db
 
 
 
 
 
 
 
 
 
 
f7b85fd
ffaf1db
 
 
 
f7b85fd
 
ffaf1db
 
 
 
 
 
 
 
 
 
 
f7b85fd
 
ffaf1db
 
f7b85fd
ffaf1db
 
 
 
 
 
 
 
 
 
 
f7b85fd
ffaf1db
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f7b85fd
ffaf1db
 
f7b85fd
ffaf1db
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f7b85fd
 
ffaf1db
f7b85fd
 
 
 
 
ffaf1db
f7b85fd
 
 
ffaf1db
f7b85fd
 
 
ffaf1db
f7b85fd
 
 
ffaf1db
 
 
 
 
 
 
 
 
 
 
 
 
f7b85fd
ffaf1db
f7b85fd
 
 
ffaf1db
 
 
 
 
f7b85fd
 
 
 
ffaf1db
 
f7b85fd
ffaf1db
 
 
 
 
f7b85fd
 
 
 
 
 
288e434
 
 
ffaf1db
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f7b85fd
ffaf1db
 
 
 
 
 
 
 
f7b85fd
ffaf1db
 
 
 
 
 
 
 
f7b85fd
ffaf1db
 
288e434
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ffaf1db
 
 
 
 
 
 
 
 
 
 
288e434
 
 
 
ffaf1db
f7b85fd
 
 
 
 
 
ffaf1db
f7b85fd
ffaf1db
 
 
 
 
 
 
 
 
 
 
 
 
 
f7b85fd
 
 
 
 
ffaf1db
 
 
f7b85fd
 
ffaf1db
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
"""
Audio utilities for Hugging Face Spaces integration.
This module provides audio streaming for Hugging Face Spaces environments.
"""

import os
import asyncio
import numpy as np
import random
import time
import threading
import base64
import json
import tempfile
import soundfile as sf
from concurrent.futures import ThreadPoolExecutor

# Optional dependency: transformers provides an ffmpeg-backed live microphone
# helper. When it cannot be imported, HFAudioStreamer falls back to simulated
# audio input instead of a real microphone.
HF_AUDIO_AVAILABLE = False
try:
    from transformers.pipelines.audio_utils import ffmpeg_microphone_live
except ImportError:
    print("Warning: transformers.pipelines.audio_utils not available, will use fallback audio simulation")
else:
    HF_AUDIO_AVAILABLE = True

class HFAudioStreamer:
    """Audio streamer for Hugging Face Spaces that works with or without real audio devices.

    Input audio comes from transformers' ``ffmpeg_microphone_live`` when that
    helper is importable and yields data; otherwise (and always when running
    inside Hugging Face Spaces) simulated noise and canned text prompts are
    sent instead. Nova's audio replies are saved as WAV files in
    ``output_dir`` rather than played.

    The ``stream_manager`` is used through ``add_audio_chunk``,
    ``send_audio_content_start_event``, ``send_raw_event``, ``close``,
    ``output_queue``, ``audio_output_queue``, ``prompt_name`` and the
    ``TEXT_CONTENT_START_EVENT`` / ``TEXT_INPUT_EVENT`` / ``CONTENT_END_EVENT``
    templates.
    """

    # Sample rate of the audio sent to Nova Sonic (16 kHz as required by Nova Sonic).
    INPUT_SAMPLE_RATE = 16000
    # Sample rate used when saving Nova's replies (Nova outputs at 24 kHz).
    OUTPUT_SAMPLE_RATE = 24000

    def __init__(self, stream_manager):
        """Initialize the HF Audio Streamer.

        Args:
            stream_manager: Nova Sonic stream manager that receives input audio
                and text events and provides the output queues.
        """
        self.stream_manager = stream_manager
        self.is_streaming = False
        self.use_ffmpeg = HF_AUDIO_AVAILABLE
        self.mic_stream = None
        self.executor = ThreadPoolExecutor(max_workers=2)
        # Kept for backward compatibility only; the coroutines below call
        # asyncio.get_running_loop() so they always use the loop they run on.
        # get_event_loop() without a running loop is deprecated, so prefer the
        # running loop when constructed inside one.
        try:
            self.loop = asyncio.get_running_loop()
        except RuntimeError:
            self.loop = asyncio.get_event_loop()

        # Tasks created by start_streaming (and by the ffmpeg fallback)
        self.input_task = None
        self.output_task = None

        # Check if we're in HF Spaces
        self.is_hf_spaces = "SPACE_ID" in os.environ or os.environ.get("SYSTEM") == "spaces"

        # Create output directory for audio files
        self.output_dir = os.path.join(tempfile.gettempdir(), "nova_output")
        os.makedirs(self.output_dir, exist_ok=True)

        print(f"HF Audio Streamer initialized. Using ffmpeg: {self.use_ffmpeg}, In HF Spaces: {self.is_hf_spaces}")
        print(f"Audio output will be saved to: {self.output_dir}")

    # ------------------------------------------------------------------
    # Sample conversion helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _to_pcm16_bytes(samples):
        """Convert float samples nominally in [-1.0, 1.0] to int16 PCM bytes (native byte order)."""
        # Clip first: casting out-of-range floats to int16 overflows and
        # produces garbage samples instead of saturating.
        clipped = np.clip(samples, -1.0, 1.0)
        return (clipped * 32767).astype(np.int16).tobytes()

    @staticmethod
    def _extract_samples(chunk):
        """Return the new float samples carried by one microphone item, or None.

        Accepts a bare numpy array (returned unchanged) or a dict item as
        yielded by transformers' ``ffmpeg_microphone_live`` (keys ``raw``,
        ``stride`` and, in streaming mode, ``partial``). Partial items are
        skipped because their audio is delivered again in the next full item,
        and ``stride`` samples are trimmed from both ends because consecutive
        full items overlap by exactly those amounts. Anything else yields None.
        """
        if isinstance(chunk, np.ndarray):
            return chunk
        if not isinstance(chunk, dict) or "raw" not in chunk:
            return None
        if chunk.get("partial"):
            return None
        raw = np.asarray(chunk["raw"])
        left, right = chunk.get("stride", (0, 0))
        return raw[left:len(raw) - right]

    @classmethod
    def _make_noise_chunks(cls, count=3, duration_s=1.0):
        """Pre-render ``count`` chunks of Gaussian noise as PCM16 bytes.

        Chunk ``i`` has standard deviation ``0.01 * (i + 1)`` so the simulated
        input varies in loudness; each chunk lasts ``duration_s`` seconds at
        INPUT_SAMPLE_RATE.
        """
        n_samples = int(cls.INPUT_SAMPLE_RATE * duration_s)
        return [
            cls._to_pcm16_bytes(np.random.normal(0, 0.01 * (i + 1), n_samples))
            for i in range(count)
        ]

    # ------------------------------------------------------------------
    # Microphone input
    # ------------------------------------------------------------------

    async def initialize_ffmpeg_mic(self):
        """Create the ffmpeg live-microphone stream if transformers provides it.

        Returns:
            True if the stream object was created; False if ffmpeg is disabled
            or creation failed (``use_ffmpeg`` is then switched off).
        """
        if not self.use_ffmpeg:
            return False

        # If we're in HF Spaces, expect ALSA errors and handle them gracefully
        if self.is_hf_spaces:
            print("HF Spaces detected - ALSA errors are expected and will be handled")
            # Set environment variable to suppress ALSA errors
            os.environ['AUDIODEV'] = 'null'

        # NOTE(review): ffmpeg_microphone_live appears to be a lazy generator,
        # so device errors may only surface on the first read in
        # ffmpeg_audio_processor (which then falls back to simulated input).
        try:
            sampling_rate = self.INPUT_SAMPLE_RATE
            chunk_length_s = 0.5   # Process 0.5 seconds at a time
            stream_chunk_s = 0.25  # Stream in 0.25 second chunks

            loop = asyncio.get_running_loop()
            # Create in a worker thread so a slow start cannot block the event loop
            self.mic_stream = await loop.run_in_executor(
                self.executor,
                lambda: ffmpeg_microphone_live(
                    sampling_rate=sampling_rate,
                    chunk_length_s=chunk_length_s,
                    stream_chunk_s=stream_chunk_s,
                ),
            )
            print("Successfully initialized FFMPEG microphone")
            return True
        except Exception as e:
            # Check for ALSA errors which are expected in Hugging Face Spaces
            error_str = str(e)
            if "ALSA" in error_str and "PCM" in error_str:
                print("ALSA audio device errors detected - this is expected in cloud environments")
                print("Switching to simulated audio input (no real microphone will be used)")
            else:
                print(f"Error initializing FFMPEG microphone: {e}")

            # Always fall back to simulated audio
            self.use_ffmpeg = False
            return False

    def _start_simulated_fallback(self):
        """Switch input to simulated audio, tracking the task so stop_streaming can cancel it."""
        self.use_ffmpeg = False
        print("Falling back to simulated audio input")
        self.input_task = asyncio.create_task(self.generate_simulated_input())

    def _close_mic_stream(self):
        """Best-effort close of the microphone stream; safe to call more than once."""
        stream, self.mic_stream = self.mic_stream, None
        close = getattr(stream, "close", None)
        if close is None:
            return
        try:
            close()
        except Exception as e:
            # e.g. a generator still executing in an executor thread cannot be closed
            print(f"Error closing FFMPEG microphone: {e}")

    async def ffmpeg_audio_processor(self):
        """Forward live microphone audio to the stream manager as PCM16 chunks.

        Each item from ``mic_stream`` is pulled on the executor (reading the
        microphone blocks), reduced to new samples with ``_extract_samples``,
        converted with ``_to_pcm16_bytes`` and passed to
        ``stream_manager.add_audio_chunk``. If the stream raises, or ends while
        streaming is still active, input switches to simulated audio. The
        microphone stream is always closed on exit.
        """
        if not self.mic_stream:
            print("FFMPEG microphone not initialized")
            self.use_ffmpeg = False
            return

        print("Starting FFMPEG audio processing")
        stream = self.mic_stream
        loop = asyncio.get_running_loop()
        # next(stream, end_marker) avoids StopIteration, which cannot be
        # propagated through an asyncio Future.
        end_marker = object()
        chunks_processed = 0
        last_log_time = time.time()
        try:
            while self.is_streaming:
                # Reading the microphone blocks, so do it off the event loop.
                item = await loop.run_in_executor(self.executor, next, stream, end_marker)
                if not self.is_streaming:
                    break
                if item is end_marker:
                    print("FFMPEG microphone stream ended")
                    self._start_simulated_fallback()
                    break

                samples = self._extract_samples(item)
                if samples is None or samples.size == 0:
                    continue

                # Send to Bedrock
                self.stream_manager.add_audio_chunk(self._to_pcm16_bytes(samples))

                # Log periodically to show activity
                chunks_processed += 1
                current_time = time.time()
                if current_time - last_log_time > 2.0:
                    print(f"FFMPEG audio: processed {chunks_processed} chunks")
                    chunks_processed = 0
                    last_log_time = current_time
        except Exception as e:
            print(f"Error in FFMPEG audio processor: {e}")
            # If the ffmpeg processor fails, fall back to simulated audio
            self.use_ffmpeg = False
            if self.is_streaming:
                self._start_simulated_fallback()
        finally:
            self._close_mic_stream()
            print("FFMPEG audio processor stopped")

    async def generate_simulated_input(self):
        """Feed simulated audio to Nova when no real microphone is available.

        Sends a random pre-rendered noise chunk every 0.2 s. After every 10
        chunks a random canned prompt is sent through ``send_text_message``
        (which also posts it to the UI queue), followed by a 3 s pause for
        Nova to respond. Runs until ``is_streaming`` is cleared or the task is
        cancelled.
        """
        print("Starting simulated audio input")

        # The noise never changes, so render it once in memory instead of
        # round-tripping it through temporary WAV files on every send.
        noise_chunks = self._make_noise_chunks()
        prompts = [
            "Hello there",
            "How are you today?",
            "Tell me something interesting",
            "What's the weather like?",
            "I'm learning to speak more fluently"
        ]

        try:
            chunks_sent = 0
            while self.is_streaming:
                # random.choice, not np.random.choice: numpy would coerce the
                # bytes into a fixed-width array and strip trailing NUL bytes.
                try:
                    self.stream_manager.add_audio_chunk(random.choice(noise_chunks))
                except Exception as e:
                    print(f"Error sending simulated audio chunk: {e}")

                # Wait between chunks
                await asyncio.sleep(0.2)
                chunks_sent += 1

                # After a sequence of noise, send text to get a response
                if chunks_sent >= 10:
                    chunks_sent = 0
                    # send_text_message already posts the USER transcript to
                    # output_queue; posting it here too duplicated it in the UI.
                    await self.send_text_message(random.choice(prompts))
                    # Wait for Nova to respond
                    await asyncio.sleep(3.0)
        except Exception as e:
            print(f"Error in simulated audio generator: {e}")
            import traceback
            traceback.print_exc()

    # ------------------------------------------------------------------
    # Output
    # ------------------------------------------------------------------

    def _save_output_audio(self, audio_data, index):
        """Write one raw int16 PCM reply chunk to a uniquely named WAV file in output_dir."""
        # The index stops chunks arriving within the same second from
        # overwriting each other (the timestamp has 1 s resolution).
        timestamp = int(time.time())
        output_path = os.path.join(self.output_dir, f"nova_response_{timestamp}_{index:05d}.wav")
        try:
            # Convert from raw PCM to numpy for soundfile
            audio_np = np.frombuffer(audio_data, dtype=np.int16)
            sf.write(output_path, audio_np, self.OUTPUT_SAMPLE_RATE)
            print(f"Saved Nova audio response to {output_path}")
        except Exception as e:
            print(f"Error saving audio response: {e}")

    async def play_output_audio(self):
        """Drain Nova's audio replies, announce them and save each as a WAV file.

        Polls ``stream_manager.audio_output_queue`` with a 0.5 s timeout so the
        loop notices when ``is_streaming`` is cleared. Audio cannot be played
        directly in HF Spaces, so each chunk is written to ``output_dir``.
        """
        saved_count = 0
        while self.is_streaming:
            try:
                audio_data = await asyncio.wait_for(
                    self.stream_manager.audio_output_queue.get(),
                    timeout=0.5,
                )
                if not audio_data or not self.is_streaming:
                    continue

                # Store info in output queue for other parts of the app
                self.stream_manager.output_queue.put_nowait({
                    "event": {
                        "audioOutput": {
                            "content": "Audio received from Nova"
                        }
                    }
                })
                self._save_output_audio(audio_data, saved_count)
                saved_count += 1
            except asyncio.TimeoutError:
                # No data available within timeout
                continue
            except Exception as e:
                if self.is_streaming:
                    print(f"Error handling output audio: {e}")
                    import traceback
                    traceback.print_exc()
                await asyncio.sleep(0.1)

    # ------------------------------------------------------------------
    # Session control
    # ------------------------------------------------------------------

    async def start_streaming(self):
        """Start streaming: open audio content, send a greeting, launch input/output tasks.

        In Hugging Face Spaces simulated input is always used; elsewhere the
        ffmpeg microphone is tried first, with simulated input as fallback.
        The tasks keep running until ``stop_streaming`` is called. Does
        nothing if already streaming.
        """
        if self.is_streaming:
            return

        print("Starting audio streaming in HF mode...")

        if self.is_hf_spaces:
            # Set environment variables to help with audio issues
            os.environ['AUDIODEV'] = 'null'
            os.environ['SDL_AUDIODRIVER'] = 'dummy'

        # Send audio content start event
        await self.stream_manager.send_audio_content_start_event()

        self.is_streaming = True

        # Start with a welcome message.
        # NOTE(review): send_text_message sends this with role "USER", so Nova
        # receives (and the UI shows) it as user input - confirm this is intended.
        await self.send_text_message("Hi there! I'm Nova, your conversation partner. How are you doing today?")

        # In HF Spaces, just go straight to simulated audio to avoid ALSA errors
        if self.is_hf_spaces:
            print("Running in Hugging Face Spaces - using simulated audio")
            self.use_ffmpeg = False
            self.input_task = asyncio.create_task(self.generate_simulated_input())
            self.output_task = asyncio.create_task(self.play_output_audio())

            # Let the user know what's happening
            print("Speech-to-speech functionality is active:")
            print("- Simulated audio is being sent to Nova Sonic")
            print("- Nova's responses will be saved as WAV files")
            print("- Conversation will be shown as text transcriptions")
            return

        # For non-HF environments, try the ffmpeg microphone first
        if self.use_ffmpeg and await self.initialize_ffmpeg_mic():
            self.input_task = asyncio.create_task(self.ffmpeg_audio_processor())
        else:
            self.use_ffmpeg = False
            self.input_task = asyncio.create_task(self.generate_simulated_input())

        # Start output processing. The tasks run in the background until
        # stop_streaming is called; user interaction is handled by the UI.
        self.output_task = asyncio.create_task(self.play_output_audio())

    async def send_text_message(self, text):
        """Send ``text`` to Nova as a USER text turn and post it to the UI queue.

        Emits content-start, text-input and content-end events through
        ``stream_manager.send_raw_event``, then puts a ``textOutput`` event with
        role ``"USER"`` on ``stream_manager.output_queue``.

        Returns:
            True on success, False if any step raised (the error is printed).
        """
        import uuid

        try:
            # A random id cannot collide the way two str(time.time()) values
            # taken within the same clock tick can.
            content_name = str(uuid.uuid4())
            # The templates are filled with %-formatting, so escape the text to
            # keep quotes, backslashes and control characters from corrupting
            # the event payload.
            # NOTE(review): assumes TEXT_INPUT_EVENT wraps the content in double quotes.
            escaped_text = json.dumps(str(text), ensure_ascii=False)[1:-1]

            # Create text content start event
            text_content_start = self.stream_manager.TEXT_CONTENT_START_EVENT % (
                self.stream_manager.prompt_name,
                content_name,
                "USER"
            )
            await self.stream_manager.send_raw_event(text_content_start)

            # Create text input event
            text_input = self.stream_manager.TEXT_INPUT_EVENT % (
                self.stream_manager.prompt_name,
                content_name,
                escaped_text
            )
            await self.stream_manager.send_raw_event(text_input)

            # Create content end event
            content_end = self.stream_manager.CONTENT_END_EVENT % (
                self.stream_manager.prompt_name,
                content_name
            )
            await self.stream_manager.send_raw_event(content_end)

            print(f"Sent text message to Nova: {text}")

            # Also add message to output queue for UI
            await self.stream_manager.output_queue.put({
                "event": {
                    "textOutput": {
                        "content": text,
                        "role": "USER"
                    }
                }
            })

            return True
        except Exception as e:
            print(f"Error sending text message: {e}")
            return False

    async def stop_streaming(self):
        """Stop streaming: cancel tasks and close the microphone, executor and stream manager.

        Does nothing if not currently streaming.
        """
        if not self.is_streaming:
            return

        self.is_streaming = False
        print("Stopping HF audio streaming...")

        # Cancel the input/output tasks and wait for them to finish unwinding
        pending = [
            task for task in (self.input_task, self.output_task)
            if task is not None and not task.done()
        ]
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)

        # Close ffmpeg mic if open
        self._close_mic_stream()

        # Don't wait: a worker may still be blocked reading the microphone
        self.executor.shutdown(wait=False)

        # Always close the stream manager
        await self.stream_manager.close()

        print("HF audio streaming stopped")