import io
import os

import numpy as np
import soundfile as sf
from dotenv import load_dotenv
from openai import OpenAI, RateLimitError, APIError, APIConnectionError
from pydub import AudioSegment

load_dotenv()

client = OpenAI(
    base_url="https://integrate.api.nvidia.com/v1",
    api_key=os.environ["NVIDIA_API"]
)
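
# Assumes the NVIDIA API key is supplied via a .env file (or the environment),
# e.g. a line like NVIDIA_API=nvapi-... (key name taken from the lookup above).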

def chat_llm(conversation: list) -> tuple[Exception | None, str]:
    """Stream a chat completion, printing tokens as they arrive.

    Returns (error, response_text); error is None on success.
    """
    response_text = ""
    try:
        completion = client.chat.completions.create(
            model="meta/llama-3.1-405b-instruct",
            messages=conversation,
            temperature=0.2,
            top_p=0.7,
            max_tokens=4000,
            stream=True
        )
        # Print each token as it streams in and accumulate the full reply
        for chunk in completion:
            if chunk.choices[0].delta.content is not None:
                print(chunk.choices[0].delta.content, end="")
                response_text += chunk.choices[0].delta.content
        return None, response_text
    except (RateLimitError, APIConnectionError, APIError) as e:
        return e, response_text

if __name__ == "__main__":
    from _data_model import AppState

    state = AppState(llm_conversation=[
        {
            "role": "system",
            "content": "You are a voice assistant acting on my behalf. My name is Deepak and your name is Julia. You are there to tell the user what a good engineer I am."
        },
        {
            "role": "user",
            "content": "Hey, what can you tell me?"
        }
    ])
    e, msg = chat_llm(conversation=state.llm_conversation)
    print(e, msg)

def audio_to_bytes(audio_input) -> bytes:
    """
    Convert a Gradio audio input (numpy array or file path) to WAV bytes.

    Parameters:
        audio_input: tuple | str
            - If tuple: (sample_rate, numpy_array), as Gradio provides it
            - If str: path to an audio file

    Returns:
        bytes: The WAV file bytes.
    """
    if isinstance(audio_input, str):
        # audio_input is a file path; note sf.read returns (data, samplerate)
        data, samplerate = sf.read(audio_input)
    elif isinstance(audio_input, (tuple, list)) and len(audio_input) == 2:
        # audio_input is Gradio's (sample_rate, numpy_array) tuple
        samplerate, data = audio_input
    else:
        raise ValueError("Invalid audio input. Expected (sample_rate, numpy_array) or a file path string.")

    # Ensure mono (channel count = 1); keep the original dtype so integer
    # samples from Gradio are not silently promoted to float64
    if data.ndim > 1:
        data = np.mean(data, axis=1).astype(data.dtype)  # average channels to mono

    # Write to an in-memory buffer
    wav_buffer = io.BytesIO()
    sf.write(wav_buffer, data, samplerate, format='WAV')
    wav_bytes = wav_buffer.getvalue()
    wav_buffer.close()
    return wav_bytes
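
# Usage sketch (illustrative only, kept commented so nothing runs on import):
# round-trip a synthetic one-second 440 Hz tone through audio_to_bytes. The
# sample rate and tone parameters are arbitrary example values.
#
# sr = 16000
# t = np.arange(sr) / sr
# tone = (0.5 * np.sin(2 * np.pi * 440 * t)).astype(np.float32)
# wav_bytes = audio_to_bytes((sr, tone))
# print(len(wav_bytes))  # non-empty WAV payload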

def audio_bytes_to_gr_tuple(audio_bytes: bytes) -> tuple[int, np.ndarray]:
    """
    Convert a bytes object containing audio data into the (sample_rate, numpy_array)
    tuple expected by Gradio's Audio component.

    Supports any format recognized by pydub/ffmpeg.
    """
    # Load the bytes into an AudioSegment
    audio_segment = AudioSegment.from_file(io.BytesIO(audio_bytes))

    # Get the raw audio samples as a numpy array
    samples = np.array(audio_segment.get_array_of_samples())

    # If stereo, reshape to (n_frames, n_channels)
    if audio_segment.channels > 1:
        samples = samples.reshape((-1, audio_segment.channels))

    # Return in the (sample_rate, np.ndarray) format expected by Gradio
    return audio_segment.frame_rate, samples
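
# Usage sketch (illustrative only): decode the WAV bytes from the sketch above
# back into the (sample_rate, samples) tuple Gradio expects. Note that pydub
# needs ffmpeg on PATH for formats other than plain WAV.
#
# sr, samples = audio_bytes_to_gr_tuple(wav_bytes)
# print(sr, samples.shape, samples.dtype)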

'''deprecated'''
# def detect_pause(audio_array, sample_rate, silence_threshold=0.01, min_pause_ms=300) -> bool:
#     """
#     Detect if there is a pause in the audio.
#
#     Parameters:
#         audio_array (np.ndarray): Audio samples (mono or stereo)
#         sample_rate (int): Sampling rate
#         silence_threshold (float): Max amplitude considered silence
#         min_pause_ms (int): Minimum duration (ms) to count as a pause
#
#     Returns:
#         bool: True if a pause is detected, False otherwise
#     """
#     # Convert stereo to mono if needed
#     if audio_array.ndim > 1:
#         audio_array = np.mean(audio_array, axis=1)
#
#     # Absolute amplitude
#     amplitude = np.abs(audio_array)
#
#     # Boolean array: True where below threshold
#     silent = amplitude < silence_threshold
#
#     # Convert pause duration from ms to number of samples
#     min_silent_samples = int(sample_rate * (min_pause_ms / 1000.0))
#
#     # Find if there is a contiguous silent region of that length
#     count = 0
#     for s in silent:
#         if s:
#             count += 1
#             if count >= min_silent_samples:
#                 return True  # pause detected
#         else:
#             count = 0
#     return False  # no long enough silence

# def streaming(audio: tuple, state: AppState):
#     if state.stream is None:
#         state.stream = audio[1]
#         state.sampling_rate = audio[0]
#     else:
#         state.stream = np.concatenate((state.stream, audio[1]))
#     pause_detected = detect_pause(state.stream, state.sampling_rate)
#     state.pause_detected = pause_detected
#     if state.pause_detected and state.started_talking:
#         return gr.Audio(recording=False), state
#     return None, state