Spaces:
Sleeping
Sleeping
| import soundfile as sf | |
| import numpy as np | |
| import io | |
| import numpy as np | |
| import os | |
| from dotenv import load_dotenv | |
| from openai import OpenAI, RateLimitError, APIError, APIConnectionError | |
| import time | |
| from pydub import AudioSegment | |
# Load environment variables from a local .env file (expects NVIDIA_API to be set).
load_dotenv()

# OpenAI-compatible client pointed at NVIDIA's hosted inference endpoint.
# NOTE(review): os.environ["NVIDIA_API"] raises KeyError at import time if the
# variable is missing — confirm that failing fast here is intended.
client = OpenAI(
    base_url = "https://integrate.api.nvidia.com/v1",
    api_key = os.environ["NVIDIA_API"]
)
def chat_llm(conversation: list[dict]) -> str:
    """
    Stream a chat completion for *conversation* and print it as it arrives.

    Parameters:
        conversation: list of {"role": ..., "content": ...} message dicts in
            the OpenAI chat format (e.g. AppState.llm_conversation).

    Returns:
        str: The full concatenated assistant reply. The previous version
        returned None, which crashed the ``e, msg = chat_llm(...)`` caller.
    """
    completion = client.chat.completions.create(
        model="meta/llama-3.1-405b-instruct",
        messages=conversation,
        temperature=0.2,
        top_p=0.7,
        max_tokens=4000,
        stream=True
    )
    # Accumulate streamed chunks so callers get the full reply back while the
    # text is still echoed to stdout incrementally.
    parts = []
    for chunk in completion:
        delta = chunk.choices[0].delta.content
        if delta is not None:
            print(delta, end="")
            parts.append(delta)
    return "".join(parts)
| if __name__ == "__main__": | |
| from _data_model import AppState | |
| state = AppState(llm_conversation=[ | |
| { | |
| "role": "system", | |
| "content": "You are a voice assistant. You are there on my behalf. My name is Deepak and your name is Julia. You are there tell the user how good engineer I am" | |
| }, | |
| { | |
| "role": "user", | |
| "content": "Hey, what can you tell ?" | |
| } | |
| ]) | |
| e, msg = chat_llm(conversation=state.llm_conversation) | |
| print(e, msg) | |
def audio_to_bytes(audio_input) -> bytes:
    """
    Convert a Gradio audio input (numpy array or filepath) to WAV bytes.

    Parameters:
        audio_input: tuple | str
            - If tuple/list: (sample_rate, numpy_array) — Gradio's convention
            - If str: path to an audio file readable by soundfile
    Returns:
        bytes: The WAV file bytes (mono; multi-channel input is averaged down).
    Raises:
        ValueError: if audio_input is neither a 2-element tuple/list nor a
            path string.
    """
    if isinstance(audio_input, str):
        # audio_input is a file path.
        # BUGFIX: soundfile.read returns (data, samplerate) in that order;
        # the previous code unpacked them swapped, so `data` ended up being
        # the int sample rate and `samplerate` the sample array.
        data, samplerate = sf.read(audio_input)
    elif isinstance(audio_input, (tuple, list)) and len(audio_input) == 2:
        # audio_input is (sample_rate, numpy array)
        samplerate, data = audio_input
    else:
        raise ValueError("Invalid audio input. Expected (numpy_array, sample_rate) or file path string.")
    # Ensure mono (channel count = 1)
    if data.ndim > 1:
        data = np.mean(data, axis=1)  # average channels to mono
    # Write to an in-memory buffer instead of a temp file
    wav_buffer = io.BytesIO()
    sf.write(wav_buffer, data, samplerate, format='WAV')
    wav_bytes = wav_buffer.getvalue()
    wav_buffer.close()
    return wav_bytes
def audio_bytes_to_gr_tuple(audio_bytes: bytes) -> tuple[int, np.ndarray]:
    """
    Decode an audio byte blob into Gradio's (sample_rate, samples) form.

    Any container/codec that pydub's ffmpeg backend understands is accepted.
    Multi-channel audio comes back as a (frames, channels) array; mono stays
    one-dimensional.
    """
    segment = AudioSegment.from_file(io.BytesIO(audio_bytes))
    sample_array = np.array(segment.get_array_of_samples())
    n_channels = segment.channels
    if n_channels > 1:
        # Interleaved samples -> one row per frame, one column per channel.
        sample_array = sample_array.reshape((-1, n_channels))
    return segment.frame_rate, sample_array
'''deprecated'''
| # def detect_pause(audio_array, sample_rate, silence_threshold=0.01, min_pause_ms=300) -> bool: | |
| # """ | |
| # Detect if there is a pause in the audio. | |
| # Parameters: | |
| # audio_array (np.ndarray): Audio samples (mono or stereo) | |
| # sample_rate (int): Sampling rate | |
| # silence_threshold (float): Max amplitude considered silence | |
| # min_pause_ms (int): Minimum duration (ms) to count as a pause | |
| # Returns: | |
| # bool: True if a pause is detected, False otherwise | |
| # """ | |
| # # Convert stereo to mono if needed | |
| # if audio_array.ndim > 1: | |
| # audio_array = np.mean(audio_array, axis=1) | |
| # # Absolute amplitude | |
| # amplitude = np.abs(audio_array) | |
| # # Boolean array: True where below threshold | |
| # silent = amplitude < silence_threshold | |
| # # Convert pause duration from ms to number of samples | |
| # min_silent_samples = int(sample_rate * (min_pause_ms / 1000.0)) | |
| # # Find if there is a contiguous silent region of that length | |
| # count = 0 | |
| # for s in silent: | |
| # if s: | |
| # count += 1 | |
| # if count >= min_silent_samples: | |
| # return True # pause detected | |
| # else: | |
| # count = 0 | |
| # return False # no long enough silence | |
| # def steaming(audio: tuple, state: AppState): | |
| # if state.stream is None: | |
| # state.stream = audio[1] | |
| # state.sampling_rate = audio[0] | |
| # else: | |
| # state.stream = np.concatenate((state.stream, audio[1])) | |
| # pause_detected = detect_pause(state.stream, state.sampling_rate) | |
| # state.pause_detected = pause_detected | |
| # if state.pause_detected and state.started_talking: | |
| # return gr.Audio(recording=False), state | |
| # return None, state | |