File size: 4,859 Bytes
51c9eb3
 
 
 
5d6c840
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
51c9eb3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5d6c840
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
51c9eb3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import soundfile as sf 
import numpy as np
import io 
import numpy as np
import os
from dotenv import load_dotenv 
from openai import OpenAI, RateLimitError, APIError, APIConnectionError
import time 
from pydub import AudioSegment
load_dotenv() 

# OpenAI-compatible client pointed at NVIDIA's NIM inference endpoint.
# The API key is read from the NVIDIA_API environment variable (populated
# by load_dotenv() above); a missing key raises KeyError at import time.
client = OpenAI(
  base_url = "https://integrate.api.nvidia.com/v1",
  api_key = os.environ["NVIDIA_API"]
)

def chat_llm(conversation: list | None = None) -> tuple[Exception | None, str | None]:
    """Stream a chat completion for *conversation* and return (error, text).

    Streamed chunks are printed to stdout as they arrive (preserving the
    original behavior) and also accumulated so the full reply is returned.

    Parameters:
        conversation: list of {"role": ..., "content": ...} message dicts
            in the OpenAI chat-completions format.

    Returns:
        (error, text): error is None on success, or the caught API
        exception on failure; text is the full assistant reply, or None
        on failure. This matches the `e, msg = chat_llm(...)` call site
        in the __main__ block, which previously crashed because the
        function returned None.
    """
    try:
        completion = client.chat.completions.create(
            model="meta/llama-3.1-405b-instruct",
            messages=conversation,
            temperature=0.2,
            top_p=0.7,
            max_tokens=4000,
            stream=True,
        )
        parts = []
        for chunk in completion:
            delta = chunk.choices[0].delta.content
            if delta is not None:
                print(delta, end="")
                parts.append(delta)
        return None, "".join(parts)
    except (RateLimitError, APIConnectionError, APIError) as exc:
        # These exception classes are imported at the top of the file but
        # were never used; surface them to the caller instead of crashing.
        return exc, None

        
if __name__ == "__main__":
    # Manual smoke test: stream a short scripted conversation through the LLM.
    from _data_model import AppState
    state = AppState(llm_conversation=[
        {
            "role": "system",
            "content": "You are a voice assistant. You are there on my behalf. My name is Deepak and your name is Julia. You are there tell the user how good engineer I am"
        },
        {
            "role": "user", 
            "content": "Hey, what can  you tell ?"
        }
    ])
    # The original `e, msg = chat_llm(...)` unpacking raised TypeError because
    # chat_llm returns None; the reply is streamed to stdout inside chat_llm,
    # so simply invoking it is sufficient here.
    chat_llm(conversation=state.llm_conversation)




def audio_to_bytes(audio_input) -> bytes:
    """
    Convert a Gradio audio input (numpy array or filepath) to WAV bytes.

    Parameters:
        audio_input: tuple | str
            - If tuple: (sample_rate, numpy_array) — Gradio's Audio format
            - If str: path to an audio file readable by soundfile

    Returns:
        bytes: The WAV file bytes (mono).

    Raises:
        ValueError: if audio_input is neither a 2-tuple/list nor a string.
    """
    if isinstance(audio_input, str):
        # audio_input is a file path.
        # NOTE: sf.read returns (data, samplerate) in that order — the
        # original code unpacked them swapped, producing corrupt output.
        data, samplerate = sf.read(audio_input)
    elif isinstance(audio_input, (tuple, list)) and len(audio_input) == 2:
        # audio_input is Gradio's (sample_rate, numpy array) pair
        samplerate, data = audio_input
    else:
        raise ValueError("Invalid audio input. Expected (numpy_array, sample_rate) or file path string.")

    # Ensure mono (channel count = 1) by averaging channels
    if data.ndim > 1:
        data = np.mean(data, axis=1)

    # Write to an in-memory buffer and hand back the raw WAV bytes
    wav_buffer = io.BytesIO()
    sf.write(wav_buffer, data, samplerate, format='WAV')
    wav_bytes = wav_buffer.getvalue()
    wav_buffer.close()

    return wav_bytes


def audio_bytes_to_gr_tuple(audio_bytes: bytes) -> tuple[int, np.ndarray]:
    """
    Decode raw audio bytes into the (sample_rate, numpy_array) pair that
    Gradio's Audio component expects.

    Any container/codec recognized by pydub's ffmpeg backend is accepted.
    """
    segment = AudioSegment.from_file(io.BytesIO(audio_bytes))

    # pydub hands back interleaved samples as a flat array
    samples = np.array(segment.get_array_of_samples())

    channel_count = segment.channels
    if channel_count > 1:
        # De-interleave: one row per frame, one column per channel
        samples = samples.reshape((-1, channel_count))

    return segment.frame_rate, samples

'''deprecated'''


# def detect_pause(audio_array, sample_rate, silence_threshold=0.01, min_pause_ms=300) -> bool:
#     """
#     Detect if there is a pause in the audio.
    
#     Parameters:
#         audio_array (np.ndarray): Audio samples (mono or stereo)
#         sample_rate (int): Sampling rate
#         silence_threshold (float): Max amplitude considered silence
#         min_pause_ms (int): Minimum duration (ms) to count as a pause
    
#     Returns:
#         bool: True if a pause is detected, False otherwise
#     """
#     # Convert stereo to mono if needed
#     if audio_array.ndim > 1:
#         audio_array = np.mean(audio_array, axis=1)
    
#     # Absolute amplitude
#     amplitude = np.abs(audio_array)
    
#     # Boolean array: True where below threshold
#     silent = amplitude < silence_threshold
    
#     # Convert pause duration from ms to number of samples
#     min_silent_samples = int(sample_rate * (min_pause_ms / 1000.0))
    
#     # Find if there is a contiguous silent region of that length
#     count = 0
#     for s in silent:
#         if s:
#             count += 1
#             if count >= min_silent_samples:
#                 return True  # pause detected
#         else:
#             count = 0
#     return False  # no long enough silence




# def streaming(audio: tuple, state: AppState):
#     if state.stream is None:
#         state.stream = audio[1]
#         state.sampling_rate = audio[0]
#     else:
#         state.stream =  np.concatenate((state.stream, audio[1]))

#     pause_detected = detect_pause(state.stream, state.sampling_rate)
#     state.pause_detected = pause_detected

#     if state.pause_detected and state.started_talking:
#         return gr.Audio(recording=False), state
#     return None, state