import sounddevice as sd import numpy as np import wave import os from datetime import datetime from typing import Optional, Tuple import threading import queue class AudioRecorder: def __init__(self, sample_rate: int = 44100, channels: int = 1, device_index: int = None): """ Initialize the audio recorder with fallback for environments without audio devices. Args: sample_rate (int): Sample rate in Hz (default: 44100) channels (int): Number of audio channels (1 for mono, 2 for stereo) device_index (int, optional): Index of the audio input device to use """ self.sample_rate = sample_rate self.channels = channels self.device_index = device_index self.recording = False self.audio_queue = queue.Queue() self.recording_thread = None self.recording_data = [] self.stream = None self.final_audio_data = None self.has_audio_device = True # Assume we have audio device by default # Test if we can actually use the audio device try: # Try to query devices to see if we have audio support devices = sd.query_devices() if not devices: print("Warning: No audio devices found") self.has_audio_device = False elif self.device_index is not None and self.device_index >= len(devices): print(f"Warning: Device index {self.device_index} is out of range") self.has_audio_device = False except Exception as e: print(f"Warning: Could not query audio devices: {e}") self.has_audio_device = False def start_recording(self): """Start recording audio from the selected input device.""" if self.recording: return False print("Starting recording...") if not self.has_audio_device: print("Warning: No audio device available, recording will not capture any audio") self.recording = True return True self.recording = True self.recording_data = [] self.audio_queue = queue.Queue() def record(): try: self.stream = sd.InputStream( samplerate=self.sample_rate, channels=self.channels, callback=self._callback, dtype='float32', device=self.device_index, blocksize=1024 ) with self.stream: while self.recording: try: data = self.audio_queue.get(timeout=0.5) if data is not None and len(data) > 0: self.recording_data.append(data) print(f"Collected {len(data)} frames, total: {sum(len(d) for d in self.recording_data)}") except queue.Empty: if not self.recording: break continue except Exception as e: print(f"Error in recording thread: {e}") self.recording = False break except Exception as e: print(f"Error in audio stream: {e}") self.recording = False self.recording_thread = threading.Thread(target=record, daemon=True) self.recording_thread.start() return True def stop_recording(self) -> Optional[np.ndarray]: """Stop recording and return the recorded audio data.""" if not hasattr(self, 'recording') or not self.recording: print("Not currently recording") return None print("Stopping recording...") self.recording = False if not self.has_audio_device: print("No audio device was available during recording") # Return a small silent audio buffer self.final_audio_data = np.zeros((44100, self.channels), dtype=np.float32) return self.final_audio_data # First, get any remaining data from the queue queue_data = [] max_attempts = 10 # Prevent infinite loops attempts = 0 # Keep trying to get data from the queue for a short time while attempts < max_attempts and (not self.audio_queue.empty() or not queue_data): try: while not self.audio_queue.empty(): try: data = self.audio_queue.get_nowait() if data is not None and len(data) > 0: queue_data.append(data) print(f"Got {len(data)} frames from queue") except queue.Empty: break except Exception as e: print(f"Error getting data from queue (attempt {attempts + 1}): {e}") if not queue_data: time.sleep(0.1) # Small delay to allow more data to arrive attempts += 1 # Add queued data to recording data if queue_data: print(f"Adding {len(queue_data)} chunks from queue") self.recording_data.extend(queue_data) # Stop the stream if it's still active if hasattr(self, 'stream') and self.stream is not None: try: print("Stopping audio stream...") self.stream.stop() self.stream.close() except Exception as e: print(f"Error stopping stream: {e}") finally: self.stream = None # Wait for recording thread to finish with a timeout if hasattr(self, 'recording_thread') and self.recording_thread and self.recording_thread.is_alive(): print("Waiting for recording thread to finish...") self.recording_thread.join(timeout=2.0) if self.recording_thread.is_alive(): print("Warning: Recording thread did not finish cleanly") # Process the recorded data if not self.recording_data: print("No audio data was recorded") self.final_audio_data = None return None try: print(f"Processing {len(self.recording_data)} audio chunks...") # Filter out None or invalid chunks valid_chunks = [chunk for chunk in self.recording_data if chunk is not None and hasattr(chunk, 'shape') and len(chunk.shape) > 0] if not valid_chunks: print("No valid audio chunks found in recording data") self.final_audio_data = None return None # Ensure all chunks have the same shape min_len = min(chunk.shape[0] for chunk in valid_chunks) print(f"Trimming all chunks to {min_len} samples") trimmed_data = [chunk[:min_len] for chunk in valid_chunks] # Concatenate the trimmed data print("Concatenating audio data...") concatenated = np.concatenate(trimmed_data, axis=0) print(f"Final audio data shape: {concatenated.shape}") # Store the final data for saving self.final_audio_data = concatenated return concatenated except Exception as e: print(f"Error processing audio data: {e}") if hasattr(self, 'recording_data') and self.recording_data: print(f"Number of chunks: {len(self.recording_data)}") for i, chunk in enumerate(self.recording_data): chunk_type = type(chunk) chunk_shape = chunk.shape if hasattr(chunk, 'shape') else 'No shape' print(f"Chunk {i}: type={chunk_type}, shape={chunk_shape}") self.final_audio_data = None return None finally: # Always clean up self.recording_data = [] self.audio_queue = queue.Queue() self.final_audio_data = None def save_recording(self, filename: str = None, format: str = 'wav') -> str: """ Save the recorded audio to a file. Args: filename (str, optional): Output filename. If None, generates a timestamped filename. format (str): Output format ('wav' or 'npy') Returns: str: Path to the saved file Raises: ValueError: If no recording data is available or if there's an error processing the data """ # Use the final_audio_data if available, otherwise try to process recording_data if hasattr(self, 'final_audio_data') and self.final_audio_data is not None: audio_data = self.final_audio_data print(f"Using pre-processed audio data with shape: {audio_data.shape}") elif not hasattr(self, 'recording_data') or not self.recording_data: error_msg = "No recording data available to save" print(error_msg) print(f"Recording state: {'Active' if hasattr(self, 'recording') and self.recording else 'Inactive'}") print(f"Number of audio chunks: {len(self.recording_data) if hasattr(self, 'recording_data') else 0}") if hasattr(self, 'stream') and self.stream is not None: print("Warning: Stream is still active") raise ValueError(error_msg) else: try: print(f"Processing {len(self.recording_data)} audio chunks...") # Filter out None or invalid chunks valid_chunks = [chunk for chunk in self.recording_data if chunk is not None and hasattr(chunk, 'shape') and len(chunk.shape) > 0] if not valid_chunks: raise ValueError("No valid audio chunks found in recording data") # Ensure all chunks have the same shape min_len = min(chunk.shape[0] for chunk in valid_chunks) trimmed_data = [chunk[:min_len] for chunk in valid_chunks] audio_data = np.concatenate(trimmed_data, axis=0) print(f"Audio data shape: {audio_data.shape}") # Cache the processed data self.final_audio_data = audio_data except Exception as e: error_msg = f"Error processing audio data: {e}" print(error_msg) if hasattr(self, 'recording_data') and self.recording_data: print(f"Number of chunks: {len(self.recording_data)}") for i, chunk in enumerate(self.recording_data): chunk_type = type(chunk) chunk_shape = chunk.shape if hasattr(chunk, 'shape') else 'No shape' print(f"Chunk {i}: type={chunk_type}, shape={chunk_shape}") raise ValueError(error_msg) from e if filename is None: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"recording_{timestamp}.{format}" os.makedirs("recordings", exist_ok=True) filepath = os.path.join("recordings", filename) if format.lower() == 'wav': # Convert float32 to int16 for WAV format audio_data = (audio_data * 32767).astype(np.int16) with wave.open(filepath, 'wb') as wf: wf.setnchannels(self.channels) wf.setsampwidth(2) # 16-bit wf.setframerate(self.sample_rate) wf.writeframes(audio_data.tobytes()) elif format.lower() == 'npy': np.save(filepath, audio_data) else: raise ValueError(f"Unsupported format: {format}") return filepath def _callback(self, indata, frames, time, status): """Callback function for audio input stream.""" if status: print(f"Audio input status: {status}") try: if self.recording: # Only add data if we're still recording self.audio_queue.put(indata.copy()) except Exception as e: print(f"Error in audio callback: {e}") def list_audio_devices() -> list: """ List all available audio input devices with fallback for environments without audio devices. Returns: list: A list of dictionaries containing device information """ try: # First, check if we can even query devices try: devices = sd.query_devices() input_devices = [] for i, device in enumerate(devices): try: if device.get('max_input_channels', 0) > 0: input_devices.append({ 'id': i, 'name': str(device.get('name', f'Device {i}')), # Ensure name is string 'input_channels': int(device.get('max_input_channels', 1)), 'default_samplerate': int(device.get('default_samplerate', 44100)) }) except (KeyError, TypeError, ValueError) as e: print(f"Warning: Error processing device {i}: {e}") continue if input_devices: return input_devices except Exception as e: print(f"Warning: Could not query audio devices: {e}") # If we get here, either no input devices found or there was an error print("No valid input devices found, using default configuration") return [{ 'id': sd.default.device[0] if hasattr(sd, 'default') and hasattr(sd.default, 'device') else 0, 'name': 'Default Device', 'input_channels': 1, 'default_samplerate': 44100 }] except Exception as e: print(f"Error in list_audio_devices: {e}") # Final fallback in case anything else goes wrong return [{ 'id': 0, 'name': 'Fallback Device', 'input_channels': 1, 'default_samplerate': 44100 }]