# voiceBot/_utils.py — audio and LLM helper utilities for the voice bot.
# Author: Deepak Sahu (commit 5d6c840, "working update")
# Standard library
import io
import os
import time

# Third-party
import numpy as np
import soundfile as sf
from dotenv import load_dotenv
from openai import OpenAI, RateLimitError, APIError, APIConnectionError
from pydub import AudioSegment

# Pull NVIDIA_API (and anything else) from a local .env into the environment.
load_dotenv()

# OpenAI-compatible client pointed at NVIDIA's hosted inference endpoint.
# NOTE: raises KeyError at import time if NVIDIA_API is not set.
client = OpenAI(
    base_url="https://integrate.api.nvidia.com/v1",
    api_key=os.environ["NVIDIA_API"],
)
def chat_llm(conversation: list | None = None):
    """Stream a chat completion from the LLM, echoing it to stdout.

    Parameters:
        conversation: list of {"role": ..., "content": ...} message dicts,
            as accumulated in AppState.llm_conversation.

    Returns:
        tuple[Exception | None, str | None]: (error, reply_text).
            On success: (None, full assistant reply).
            On API failure: (the exception, None).
            This matches the `e, msg = chat_llm(...)` unpack used by callers.
    """
    try:
        completion = client.chat.completions.create(
            model="meta/llama-3.1-405b-instruct",
            messages=conversation,
            temperature=0.2,
            top_p=0.7,
            max_tokens=4000,
            stream=True,
        )
        reply_parts = []
        for chunk in completion:
            delta = chunk.choices[0].delta.content
            if delta is not None:
                # Echo tokens as they arrive for interactive feedback.
                print(delta, end="")
                reply_parts.append(delta)
        return None, "".join(reply_parts)
    except (RateLimitError, APIConnectionError, APIError) as e:
        # These classes are imported specifically for this handler; callers
        # inspect the returned error instead of catching exceptions.
        return e, None
if __name__ == "__main__":
    # Manual smoke test: build a demo conversation and stream a reply.
    # Import is local so the module can be imported without _data_model.
    from _data_model import AppState
    state = AppState(llm_conversation=[
        {
            "role": "system",
            "content": "You are a voice assistant. You are there on my behalf. My name is Deepak and your name is Julia. You are there tell the user how good engineer I am"
        },
        {
            "role": "user",
            "content": "Hey, what can you tell ?"
        }
    ])
    # NOTE(review): this unpack assumes chat_llm returns an (error, message)
    # 2-tuple; as written above chat_llm only prints and returns None, so
    # this line raises TypeError — confirm the intended contract.
    e, msg = chat_llm(conversation=state.llm_conversation)
    print(e, msg)
def audio_to_bytes(audio_input) -> bytes:
    """
    Convert a Gradio audio input (numpy array or filepath) to mono WAV bytes.

    Parameters:
        audio_input: tuple | str
            - If tuple: (sample_rate, numpy_array) — Gradio's native format
            - If str: path to an audio file readable by soundfile

    Returns:
        bytes: The encoded WAV file bytes (downmixed to mono).

    Raises:
        ValueError: if audio_input is neither a 2-tuple/list nor a path string.
    """
    if isinstance(audio_input, str):
        # BUG FIX: sf.read returns (data, samplerate) in that order; the
        # previous unpack had them swapped.
        data, samplerate = sf.read(audio_input)
    elif isinstance(audio_input, (tuple, list)) and len(audio_input) == 2:
        # Gradio hands audio over as (sample_rate, samples).
        samplerate, data = audio_input
    else:
        raise ValueError("Invalid audio input. Expected (sample_rate, numpy_array) or file path string.")

    # Downmix to mono by averaging the channel axis.
    if data.ndim > 1:
        data = np.mean(data, axis=1)

    # Encode to WAV in memory; the context manager releases the buffer.
    with io.BytesIO() as wav_buffer:
        sf.write(wav_buffer, data, samplerate, format='WAV')
        return wav_buffer.getvalue()
def audio_bytes_to_gr_tuple(audio_bytes: bytes) -> tuple[int, np.ndarray]:
    """
    Decode raw audio bytes into the (sample_rate, samples) pair consumed by
    Gradio's Audio component.

    Accepts any container/codec that pydub's ffmpeg backend can read.
    """
    segment = AudioSegment.from_file(io.BytesIO(audio_bytes))
    raw = np.array(segment.get_array_of_samples())
    n_channels = segment.channels
    # pydub interleaves channels; fold them into an (n_frames, n_channels)
    # array so Gradio sees one row per frame.
    if n_channels > 1:
        raw = raw.reshape((-1, n_channels))
    return segment.frame_rate, raw
# --- deprecated: pause-detection / streaming helpers kept below for reference ---
# def detect_pause(audio_array, sample_rate, silence_threshold=0.01, min_pause_ms=300) -> bool:
# """
# Detect if there is a pause in the audio.
# Parameters:
# audio_array (np.ndarray): Audio samples (mono or stereo)
# sample_rate (int): Sampling rate
# silence_threshold (float): Max amplitude considered silence
# min_pause_ms (int): Minimum duration (ms) to count as a pause
# Returns:
# bool: True if a pause is detected, False otherwise
# """
# # Convert stereo to mono if needed
# if audio_array.ndim > 1:
# audio_array = np.mean(audio_array, axis=1)
# # Absolute amplitude
# amplitude = np.abs(audio_array)
# # Boolean array: True where below threshold
# silent = amplitude < silence_threshold
# # Convert pause duration from ms to number of samples
# min_silent_samples = int(sample_rate * (min_pause_ms / 1000.0))
# # Find if there is a contiguous silent region of that length
# count = 0
# for s in silent:
# if s:
# count += 1
# if count >= min_silent_samples:
# return True # pause detected
# else:
# count = 0
# return False # no long enough silence
# def steaming(audio: tuple, state: AppState):
# if state.stream is None:
# state.stream = audio[1]
# state.sampling_rate = audio[0]
# else:
# state.stream = np.concatenate((state.stream, audio[1]))
# pause_detected = detect_pause(state.stream, state.sampling_rate)
# state.pause_detected = pause_detected
# if state.pause_detected and state.started_talking:
# return gr.Audio(recording=False), state
# return None, state