# Voice-Scheduling-Agent/utils/audio_utils.py
# ADEG1KOR
# Restructured project
# c3986c1
"""
Audio processing utilities for voice AI agent.
Handles audio format conversion, validation, and preprocessing.
"""
import os
import logging
import wave
import struct
from typing import Optional, Tuple
import numpy as np
# Configure the root logger at INFO level as a side effect of importing this module.
logging.basicConfig(level=logging.INFO)
# Module-level logger (named after this module) used by the helpers in this file.
logger = logging.getLogger(__name__)
def validate_audio_file(file_path: str) -> bool:
    """
    Check that a path points to an existing file with a supported audio extension.

    Only the presence of the file and its extension are inspected; the file
    contents are never opened or decoded.

    Args:
        file_path: Path to audio file

    Returns:
        True if the path is a regular file whose extension (compared
        case-insensitively) is one of .wav, .mp3, .flac, .ogg, .m4a or .webm;
        False otherwise. The reason for a rejection is logged at error level.
    """
    # isfile() rather than exists(): a directory that happens to be called
    # "something.wav" must not be accepted as an audio file.
    if not os.path.isfile(file_path):
        logger.error(f"File not found: {file_path}")
        return False

    valid_extensions = {'.wav', '.mp3', '.flac', '.ogg', '.m4a', '.webm'}
    _, ext = os.path.splitext(file_path)
    if ext.lower() not in valid_extensions:
        logger.error(f"Unsupported audio format: {ext}")
        return False

    return True
def convert_to_wav(
    input_path: str,
    output_path: Optional[str] = None,
    sample_rate: int = 16000,
    channels: int = 1
) -> str:
    """
    Re-encode an audio file by invoking the ``ffmpeg`` command-line tool.

    ffmpeg is run with ``-ar`` (sample rate), ``-ac`` (channel count) and
    ``-y`` (overwrite an existing output file). No explicit output-format
    flag is passed.

    Args:
        input_path: Input audio file path
        output_path: Output file path. When None, ``<input stem>_converted.wav``
            next to the input is used.
        sample_rate: Target sample rate in Hz
        channels: Number of audio channels (1 = mono, 2 = stereo)

    Returns:
        The output path that was passed to ffmpeg.

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status;
            its stderr is logged before re-raising.
        FileNotFoundError: the ffmpeg executable could not be launched.
    """
    import subprocess

    if output_path is None:
        stem = os.path.splitext(input_path)[0]
        output_path = f"{stem}_converted.wav"

    logger.info(f"Converting {input_path} to WAV format")

    ffmpeg_args = ['ffmpeg', '-i', input_path]
    ffmpeg_args += ['-ar', str(sample_rate)]
    ffmpeg_args += ['-ac', str(channels)]
    ffmpeg_args += ['-y', output_path]  # -y: overwrite output file

    try:
        subprocess.run(ffmpeg_args, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as e:
        logger.error(f"Conversion failed: {e.stderr}")
        raise
    except FileNotFoundError:
        logger.error("ffmpeg not found. Please install ffmpeg.")
        raise

    logger.info(f"Conversion successful: {output_path}")
    return output_path
def get_audio_duration(file_path: str) -> float:
    """
    Get duration of audio file in seconds.

    The file is first read with the standard-library ``wave`` module. If that
    fails for any reason (for example the file is not a WAV file), the
    ``ffprobe`` command-line tool is asked for the container duration instead.

    Args:
        file_path: Path to audio file

    Returns:
        Duration in seconds, or 0.0 when neither method could determine it
        (best-effort: failures are logged, not raised).
    """
    import subprocess

    try:
        with wave.open(file_path, 'rb') as wf:
            return wf.getnframes() / float(wf.getframerate())
    except Exception as e:
        # Deliberately broad: any failure here simply means "try ffprobe".
        logger.error(f"Failed to get audio duration: {e}")

    # Fallback: use ffprobe, printing only the bare duration value.
    cmd = [
        'ffprobe',
        '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1',
        file_path
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        return float(result.stdout.strip())
    except (subprocess.SubprocessError, OSError, ValueError) as e:
        # SubprocessError: ffprobe exited non-zero; OSError: ffprobe missing or
        # not executable; ValueError: output was not a number. A bare `except:`
        # would also swallow KeyboardInterrupt/SystemExit, which must propagate.
        logger.error(f"ffprobe could not determine duration of {file_path}: {e}")
        return 0.0
def normalize_audio(audio_data: np.ndarray) -> np.ndarray:
    """
    Peak-normalize audio data to the [-1, 1] range.

    Every sample is divided by the largest absolute sample value.

    Args:
        audio_data: Audio samples as numpy array

    Returns:
        Normalized audio data. Empty input and input whose peak is zero are
        returned unchanged. Integer input yields a float64 result; floating
        input keeps its dtype.
    """
    samples = np.asarray(audio_data)

    # An empty array has no peak; .max() would raise ValueError on it.
    if samples.size == 0:
        return audio_data

    # Promote integers before taking abs(): in a fixed-width signed type the
    # most negative value (e.g. -32768 for int16) has no positive counterpart,
    # so abs() wraps around and the detected peak would be wrong.
    if np.issubdtype(samples.dtype, np.integer):
        samples = samples.astype(np.float64)

    max_val = np.abs(samples).max()
    if max_val > 0:
        return samples / max_val
    return audio_data
def trim_silence(
    file_path: str,
    output_path: Optional[str] = None,
    silence_threshold: float = 0.01,
    min_silence_duration: float = 0.5
) -> str:
    """
    Remove silence from beginning and end of a PCM WAV file.

    Samples are scaled to [-1, 1] and, for multi-channel audio, the absolute
    values are averaged across channels per frame. Every frame before the
    first and after the last frame whose magnitude exceeds
    ``silence_threshold`` is dropped; audio in between is kept untouched.

    Args:
        file_path: Input WAV file path (8-, 16- or 32-bit samples)
        output_path: Output file path. When None, ``<input stem>_trimmed<ext>``
            next to the input is used.
        silence_threshold: Amplitude threshold (on the [-1, 1] scale) at or
            below which a frame counts as silent
        min_silence_duration: Accepted for interface compatibility; the
            current implementation does not use it, so leading/trailing
            silence of any length is trimmed.

    Returns:
        Path to the trimmed audio file. The input path is returned instead
        when the whole file is silent or when trimming fails for any reason
        (best-effort: the failure is logged, not raised).
    """
    if output_path is None:
        base, ext = os.path.splitext(file_path)
        output_path = f"{base}_trimmed{ext}"

    logger.info(f"Trimming silence from {file_path}")

    try:
        with wave.open(file_path, 'rb') as wf:
            sample_rate = wf.getframerate()
            n_channels = wf.getnchannels()
            sample_width = wf.getsampwidth()
            frames = wf.readframes(wf.getnframes())

        # Sample width in bytes -> numpy dtype of one sample.
        pcm_dtypes = {1: np.uint8, 2: np.int16, 4: np.int32}
        if sample_width not in pcm_dtypes:
            raise ValueError(f"Unsupported sample width: {sample_width}")

        samples = np.frombuffer(frames, dtype=pcm_dtypes[sample_width]).astype(np.float32)
        if sample_width == 1:
            # 8-bit WAV PCM is unsigned, with 128 as the zero level.
            samples -= 128.0
        normalized = samples / float(1 << (8 * sample_width - 1))

        # One magnitude per frame: mean of absolute values across channels
        # (for mono this is just the absolute sample value).
        magnitude = np.abs(normalized).reshape(-1, n_channels).mean(axis=1)

        non_silent = magnitude > silence_threshold
        if not non_silent.any():
            logger.warning("Entire audio is silent!")
            return file_path

        # argmax on a boolean array returns the index of the first True.
        start_idx = int(np.argmax(non_silent))
        end_idx = len(non_silent) - int(np.argmax(non_silent[::-1]))

        # Slice the raw bytes frame-wise so the kept audio is written back
        # exactly as it was read.
        frame_size = n_channels * sample_width
        trimmed = frames[start_idx * frame_size:end_idx * frame_size]

        with wave.open(output_path, 'wb') as wf:
            wf.setnchannels(n_channels)
            wf.setsampwidth(sample_width)
            wf.setframerate(sample_rate)
            wf.writeframes(trimmed)

        logger.info(f"Silence trimmed: {output_path}")
        return output_path

    except Exception as e:
        # Best-effort: callers get the untouched input back on any failure.
        logger.error(f"Failed to trim silence: {e}")
        return file_path
def resample_audio(
    file_path: str,
    target_rate: int = 16000,
    output_path: Optional[str] = None
) -> str:
    """
    Resample audio to a target sample rate by delegating to convert_to_wav.

    The channel count is not forwarded, so convert_to_wav's default for
    ``channels`` applies. The auto-generated output name keeps the input
    file's extension.

    Args:
        file_path: Input audio file
        target_rate: Target sample rate in Hz
        output_path: Output file path. When None,
            ``<input stem>_resampled<ext>`` next to the input is used.

    Returns:
        Path to resampled audio, as returned by convert_to_wav. Errors raised
        by convert_to_wav propagate unchanged.
    """
    destination = output_path
    if destination is None:
        stem, suffix = os.path.splitext(file_path)
        destination = f"{stem}_resampled{suffix}"

    return convert_to_wav(
        input_path=file_path,
        output_path=destination,
        sample_rate=target_rate,
    )
def split_audio_chunks(
    file_path: str,
    chunk_duration: float = 30.0,
    overlap: float = 1.0
) -> list:
    """
    Split a WAV file into overlapping chunks for processing long files.

    Consecutive chunks start ``chunk_duration - overlap`` seconds apart.
    Splitting stops with the first chunk that reaches the end of the file, so
    the last chunk may be shorter than ``chunk_duration`` but is never made
    up solely of audio already contained in the previous chunk.

    Args:
        file_path: Input WAV file
        chunk_duration: Duration of each chunk in seconds
        overlap: Overlap between chunks in seconds; must be smaller than
            chunk_duration

    Returns:
        List of (start_time, end_time, chunk_data) tuples, where the times are
        in seconds and chunk_data is the raw frame bytes read from the file.
        An empty file yields an empty list.

    Raises:
        ValueError: chunk_duration is too small to contain a frame, or
            overlap is not smaller than chunk_duration (the window would
            never advance).
        Exception: any error from reading the file is logged and re-raised.
    """
    logger.info(f"Splitting audio into {chunk_duration}s chunks")

    try:
        with wave.open(file_path, 'rb') as wf:
            sample_rate = wf.getframerate()
            total_frames = wf.getnframes()

            chunk_frames = int(chunk_duration * sample_rate)
            overlap_frames = int(overlap * sample_rate)
            step_frames = chunk_frames - overlap_frames

            # A non-positive step would make the loop below spin forever.
            if chunk_frames <= 0 or step_frames <= 0:
                raise ValueError(
                    f"chunk_duration ({chunk_duration}s) must be positive and "
                    f"greater than overlap ({overlap}s)"
                )

            total_seconds = total_frames / sample_rate
            chunks = []
            position = 0
            while position < total_frames:
                wf.setpos(position)
                frames = wf.readframes(min(chunk_frames, total_frames - position))
                start_time = position / sample_rate
                end_time = min((position + chunk_frames) / sample_rate, total_seconds)
                chunks.append((start_time, end_time, frames))

                # This chunk already covers the tail of the file; another
                # iteration would only re-emit part of the overlap region.
                if position + chunk_frames >= total_frames:
                    break
                position += step_frames

        logger.info(f"Split into {len(chunks)} chunks")
        return chunks

    except Exception as e:
        logger.error(f"Failed to split audio: {e}")
        raise
if __name__ == "__main__":
    # Running the module directly only prints a banner listing its helpers.
    print("Audio utilities module loaded successfully!")
    print("Available functions:")
    for function_name in (
        "validate_audio_file",
        "convert_to_wav",
        "get_audio_duration",
        "normalize_audio",
        "trim_silence",
        "resample_audio",
        "split_audio_chunks",
    ):
        print(f" - {function_name}")