Spaces:
Runtime error
Runtime error
| import speech_recognition as sr | |
| from gtts import gTTS | |
| import gradio as gr | |
| from io import BytesIO | |
| import numpy as np | |
| from dataclasses import dataclass, field | |
| import time | |
| from pydub import AudioSegment | |
| import librosa | |
| from utils.vad import get_speech_timestamps, collect_chunks, VadOptions | |
| from PIL import Image | |
| from ClassPrompt import PromptClass | |
| import render | |
# Shared PromptClass instance; the handlers below call its chat(), generate()
# and img2text() methods.
creator_prompt = PromptClass()
# Shared speech_recognition Recognizer used by transcribe_audio().
r = sr.Recognizer()
| class AppState: | |
| stream: np.ndarray | None = None | |
| sampling_rate: int = 0 | |
| pause_detected: bool = False | |
| started_talking: bool = False | |
| stopped: bool = False | |
| history: list = field(default_factory=list) | |
| typing: bool = False | |
| painting:bool = False | |
| image_out:Image.Image = None | |
| image_in:Image = None | |
| conversation:list = field(default_factory=list) | |
| recording: bool = False # Thêm thuộc tính recording | |
| pause_threshold: float = 1 # Thêm thuộc tính pause_threshold | |
def run_vad(ori_audio, sr):
    """Run voice-activity detection on *ori_audio* and keep only the speech.

    Args:
        ori_audio: Audio samples as a NumPy array. The code scales by 32768,
            so it presumably expects 16-bit PCM values (confirm with caller).
        sr: Sampling rate of *ori_audio* in Hz. (Shadows the module-level
            ``sr`` alias inside this function only.)

    Returns:
        A tuple ``(duration_after_vad, vad_audio_bytes, elapsed)``: seconds of
        detected speech, the speech-only samples as int16 bytes at 16 kHz, and
        the processing time in seconds. On any failure the error is printed and
        ``(-1, ori_audio, elapsed)`` is returned instead.
    """
    _st = time.time()
    try:
        audio = ori_audio.astype(np.float32) / 32768.0
        sampling_rate = 16000
        audio = librosa.resample(audio, orig_sr=sr, target_sr=sampling_rate)
        vad_parameters = VadOptions()
        speech_chunks = get_speech_timestamps(audio, vad_parameters)
        audio = collect_chunks(audio, speech_chunks)
        duration_after_vad = audio.shape[0] / sampling_rate
        # Clip before the int16 cast: resampling can overshoot full scale and an
        # out-of-range value would otherwise wrap around.
        vad_audio = np.clip(np.round(audio * 32768.0), -32768, 32767).astype(np.int16)
        vad_audio_bytes = vad_audio.tobytes()
        return duration_after_vad, vad_audio_bytes, round(time.time() - _st, 4)
    except Exception as e:
        # Best effort: report the problem instead of hiding it, then signal
        # failure to the caller with a duration of -1.
        print(f"VAD failed: {e}")
        return -1, ori_audio, round(time.time() - _st, 4)
def determine_pause(audio:np.ndarray,sampling_rate:int,state:AppState) -> bool:
    """Detect a pause in the buffered audio.

    Runs VAD over *audio*. The first time more than 0.5 s of speech is found,
    ``state.started_talking`` is set and False is returned. Otherwise the
    result is True when the non-speech part of the buffer is longer than
    ``state.pause_threshold`` seconds.
    """
    speech_seconds, _, vad_seconds = run_vad(audio, sampling_rate)
    total_seconds = len(audio) / sampling_rate

    just_started = speech_seconds > 0.5 and not state.started_talking
    if just_started:
        print("started talking")
        state.started_talking = True
        return False

    print(f"duration_after_vad: {speech_seconds:.3f} s, time_vad: {vad_seconds:.3f} s")
    silence_seconds = total_seconds - speech_seconds
    return silence_seconds > state.pause_threshold
def process_audio(audio: tuple, state: "AppState", image: "Image.Image | None"):
    """Buffer one streamed microphone chunk and stop recording on a pause.

    Args:
        audio: ``(sampling_rate, samples)`` as delivered by the audio stream.
        state: Session state; updated in place.
        image: Current input image, remembered in ``state.image_in``.

    Returns:
        ``(state, audio_update)`` where ``audio_update`` is
        ``gr.Audio(recording=False)`` once the speaker has paused after
        talking, and None otherwise.
    """
    if not state.recording:
        return state, None

    if state.stream is not None:
        try:
            state.stream = np.concatenate((state.stream, audio[1]))
        except Exception as e:
            # The message previously claimed a speech-synthesis error; this is
            # a failure to append the new chunk to the audio stream.
            print(f"Lỗi ghép nối luồng âm thanh: {e}")
            return state, None
    else:
        # First chunk of a new recording: start the buffer and take its rate.
        state.stream = audio[1]
        state.sampling_rate = audio[0]

    state.image_in = image
    state.pause_detected = determine_pause(state.stream, state.sampling_rate, state)
    if state.pause_detected and state.started_talking:
        state.started_talking = False
        state.recording = False
        return state, gr.Audio(recording=False)
    return state, None
def transcribe_audio(audio_segment):
    """Transcribe a pydub audio segment to Vietnamese text.

    The segment is exported to an in-memory WAV and sent to Google's speech
    recognition through the shared recognizer ``r``.

    Returns:
        The recognized text, or "" when the audio could not be understood or
        the service request failed (the reason is printed).
    """
    audio_buffer = BytesIO()
    audio_segment.export(audio_buffer, format="wav")
    audio_buffer.seek(0)
    try:
        with sr.AudioFile(audio_buffer) as source:
            # No adjust_for_ambient_noise() here: on a file source it reads
            # (and discards) the beginning of the clip, so the first second of
            # speech would never reach record().
            return r.recognize_google(r.record(source), language='vi')
    except sr.UnknownValueError:
        print("Could not understand audio.")
    except sr.RequestError as e:
        print(f"Could not request results from Google Speech Recognition service; {e}")
    return ""
def chat_with_onlinemodel(user_input, state:AppState):
    """Send the user's message to the online chat model and record the exchange.

    The message is appended to ``state.history``, the whole history is passed
    to ``creator_prompt.chat``, and the reply (with every ``*`` removed) is
    stored in both ``state.history`` and ``state.conversation``.

    Returns:
        ``(reply_text, state)``.
    """
    state.history.append({"role": "user", "content": user_input})
    reply = creator_prompt.chat(
        provider="SambaNova",
        model="Meta-Llama-3.1-405B-Instruct",
        input_text=state.history,
    )
    cleaned = reply.replace("*","")
    state.history.append({"role": "assistant", "content": cleaned})
    # The chatbot widget shows the same exchange with speaker prefixes.
    user_entry = {"role": "user", "content":"Bạn: " + user_input}
    bot_entry = {"role": "assistant", "content":"Bot: " + cleaned}
    state.conversation.append(user_entry)
    state.conversation.append(bot_entry)
    return cleaned, state
def synthesize_speech(text):
    """Convert *text* to Vietnamese speech with gTTS.

    Returns:
        The synthesized audio as bytes, or None if synthesis failed (the error
        is printed).
    """
    try:
        mp3 = gTTS(text, tld='com.vn', lang='vi', slow=False)
        # The context manager closes the buffer even if writing fails.
        with BytesIO() as mp3_fp:
            mp3.write_to_fp(mp3_fp)
            return mp3_fp.getvalue()
    except Exception as e:
        print(f"Lỗi tổng hợp giọng nói: {e}")
        return None
def response_audio(state:AppState, progress=gr.Progress(track_tqdm=True)):
    """Transcribe the buffered recording and build the assistant's spoken reply.

    Depending on the transcript this switches to keyboard mode, toggles
    between drawing and conversation mode, generates an image, or asks the
    chat model for an answer.

    Returns:
        ``(state, audio)`` where ``audio`` is the synthesized reply, or None
        when there is nothing to answer.
    """
    # Nothing to answer if no speech/pause was registered. Also bail out when
    # no audio is buffered: pause_detected can still be True from the previous
    # turn while stream has been reset to None, and .tobytes() would fail.
    if state.stream is None or (not state.pause_detected and not state.started_talking):
        return state, None

    audio_segment = AudioSegment(
        state.stream.tobytes(),
        frame_rate=state.sampling_rate,
        sample_width=state.stream.dtype.itemsize,
        channels=1 if state.stream.ndim == 1 else state.stream.shape[1]
    )
    textin = transcribe_audio(audio_segment)
    state.stream = None  # the buffer has been consumed

    if state.typing is False:
        txt, state = chuyen_trangthai(textin, state)
        if txt == True:
            return state, synthesize_speech("chuyển sang trạng thái dùng bàn phím")

    if textin == "":
        # Nothing was recognized: tell the user the speech was unclear.
        return state, synthesize_speech("Tôi nghe không rõ")

    paint = state.painting
    state.painting = text_check(textin, state.painting)
    if paint != state.painting:
        # The utterance was a mode-switch command; just announce the new mode.
        return state, synthesize_speech("Đã chuyển sang chế độ " + ("vẽ" if state.painting else "nói chuyện"))

    if state.painting is True:
        promptx = prompt_hugingface(textin,"Hugging Face","Qwen/Qwen2.5-72B-Instruct","Medium")
        imgtxt = ""
        if state.image_in:
            img = resize(state.image_in)
            imgtxt = creator_prompt.img2text(img)
        else:
            img = None
        state.image_out = render.generate_images(imgtxt+promptx,img,progress)
        audio_bytes = synthesize_speech("Bạn thấy tôi vẽ "+textin+" có đẹp không")
        return state, audio_bytes

    print("Đang nghĩ...")
    text_out, state = chat_with_onlinemodel(textin,state)
    audio_bytes = synthesize_speech(text_out)
    return state, audio_bytes
def response_text(state:AppState,textin,image:Image, prompt,progress=gr.Progress(track_tqdm=True)):
    """Handle a typed request and build the assistant's spoken reply.

    Depending on *textin* this switches back to voice mode, toggles between
    drawing and conversation mode, generates an image from *textin* (and
    *image*, if given), or asks the chat model for an answer.

    Returns:
        ``(state, audio)`` with the synthesized reply.
    """
    if state.typing is True:
        still_typing, state = chuyen_trangthai(textin, state)
        if still_typing == False:
            return state, synthesize_speech("chuyển sang trạng thái nói")

    if textin == "":
        # Empty submission: ask the user to type something.
        return state, synthesize_speech("Hãy gõ nội dung")

    previous_mode = state.painting
    state.painting = text_check(textin, previous_mode)
    if state.painting != previous_mode:
        mode_name = "vẽ" if state.painting else "nói chuyện"
        return state, synthesize_speech("Đã chuyển sang chế độ " + mode_name)

    if state.painting is not True:
        print("Đang nghĩ...")
        answer, state = chat_with_onlinemodel(textin,state=state)
        return state, synthesize_speech(answer)

    # Drawing mode: log the request, describe the optional input image and
    # render the result.
    state.conversation.append({"role": "user", "content":"Bạn: " + textin})
    caption = ""
    source_img = None
    if image:
        source_img = resize(image)
        caption = creator_prompt.img2text(source_img)
    state.image_out = render.generate_images(caption+textin,source_img,progress)
    return state, synthesize_speech("Bạn thấy tôi vẽ "+prompt+" có đẹp không")
def _mentions(text, phrase):
    """Return True if *phrase* occurs in *text*, ignoring case and Unicode form."""
    import unicodedata

    def fold(value):
        # NFC first so composed and decomposed Vietnamese diacritics compare
        # equal, then casefold so "Sang Chế Độ Vẽ" matches as well.
        return unicodedata.normalize("NFC", value).casefold()

    return fold(phrase) in fold(text)


def text_check(textin, painting):
    """Return the drawing-mode flag that should apply after *textin*.

    Outside drawing mode the result is True only when the text asks to switch
    to drawing; inside drawing mode it stays True unless the text asks to
    switch to conversation.
    """
    if not painting:
        return _mentions(textin, "sang chế độ vẽ")
    return not _mentions(textin, "sang chế độ nói")


def chuyen_trangthai(textin, state: "AppState"):
    """Switch between voice and keyboard input according to *textin*.

    Returns:
        ``(typing, state)``: False after switching to voice, True after
        switching to the keyboard, and the unchanged ``state.typing`` when the
        text contains neither command.
    """
    if _mentions(textin, "muốn nói chuyện"):
        state.started_talking = False
        state.recording = True
        state.stopped = False
        state.typing = False
        return False, state
    if _mentions(textin, "dùng bàn phím"):
        state.started_talking = False
        state.recording = False
        state.stopped = True
        state.typing = True
        return True, state
    return state.typing, state
def start_recording_user(state:AppState):
    """Mark the session as recording when the user starts the microphone.

    Clears ``stopped`` so that recording may be restarted afterwards, and
    resets speech detection.

    Returns:
        ``(gr.Audio(recording=True), state)``.
    """
    state.recording = True
    state.started_talking = False
    state.stopped = False
    audio_update = gr.Audio(recording=True)
    return audio_update, state
def restart_recording(state:AppState):
    """Resume microphone recording unless the session has been stopped.

    Speech detection is reset either way; recording is switched on only when
    ``state.stopped`` is false.

    Returns:
        ``(gr.Audio(recording=<bool>), state)``.
    """
    resume = not state.stopped
    state.started_talking = False
    state.recording = resume
    return gr.Audio(recording=resume), state
def prompt_hugingface(prompt,llm_provider,model,type):
    """Expand *prompt* into an image prompt via ``creator_prompt.generate``.

    Args:
        prompt: Text entered by the user.
        llm_provider: Provider name forwarded as ``provider``.
        model: Model name forwarded as ``model``.
        type: Prompt style forwarded as ``prompt_type``.

    Returns:
        Whatever ``creator_prompt.generate`` returns.
    """
    generation_options = dict(
        input_text=prompt,
        long_talk=True,
        compress=True,
        compression_level="hard",
        poster=False,
        prompt_type=type,
        custom_base_prompt="",
        provider=llm_provider,
        model=model,
    )
    return creator_prompt.generate(**generation_options)
def resize(img: "Image.Image"):
    """Return *img* resized so that both sides are multiples of 8.

    Each side is rounded down to the nearest multiple of 8, but never below 8:
    a side shorter than 8 px would otherwise round down to 0, which is not a
    valid target size.
    """
    height = max(8, (img.height // 8) * 8)
    width = max(8, (img.width // 8) * 8)
    return img.resize((width, height))
# NOTE(review): neither name is referenced anywhere in the visible code —
# confirm they are unused before removing them.
loaded = ""
steps = 50
def update_model_choices(provider):
    """Build the model dropdown for the selected *provider*.

    Returns a ``gr.Dropdown`` listing that provider's models with the first
    one selected; an unknown provider yields no choices and an empty value.
    """
    hugging_face_models = [
        "Qwen/Qwen2.5-72B-Instruct",
        "meta-llama/Meta-Llama-3.1-70B-Instruct",
        "mistralai/Mixtral-8x7B-Instruct-v0.1",
        "mistralai/Mistral-7B-Instruct-v0.3"
    ]
    sambanova_models = [
        "Meta-Llama-3.1-70B-Instruct",
        "Meta-Llama-3.1-405B-Instruct",
        "Meta-Llama-3.1-8B-Instruct"
    ]
    by_provider = {"Hugging Face": hugging_face_models, "SambaNova": sambanova_models}
    models = by_provider.get(provider, [])
    if models:
        selected = models[0]
    else:
        selected = ""
    return gr.Dropdown(choices=models, value=selected)
# Prompt styles offered in the "Phong cách" dropdown.
prompt_types = ["Long", "Short", "Medium", "OnlyObjects", "NoFigure", "Landscape", "Fantasy"]
# Page title and the usage instructions shown under it.
title = "Chat tiếng việt by tuphamkts"
description = "Muốn vẽ nói: Chuyển sang chế độ vẽ. Muốn chat nói: Chuyển sang chế độ nói. Chế độ gõ: Tôi muốn dùng bàn phím, chế độ nói: Tôi muốn nói chuyện. Ghi chú: Chỉ dừng chương trình khi tôi đang nói (lịch sử chat sẽ bị xóa khi dừng chương trình)."
# Example commands. Each one must contain the phrase the handlers look for;
# the third entry used to read "nuốn" and so never matched "muốn nói chuyện".
examples = ["Chuyển sang chế độ vẽ","Chuyển sang chế độ nói","Tôi muốn nói chuyện","Tôi muốn dùng bàn phím"]
# Gradio UI: layout first, then the event wiring between the components and
# the handlers defined above.
with gr.Blocks(title=title) as demo:
    gr.HTML(f"<div style='text-align: center;'><h1>{title}</h1><p>{description}</p></div>")
    with gr.Row():
        with gr.Column():
            # Prompt builder, shown only in drawing mode while typing.
            with gr.Column(visible=False) as prompt_visible:
                with gr.Row():
                    llm_provider = gr.Dropdown(choices=["Hugging Face", "SambaNova"], label="Nguồn model", value="Hugging Face")
                    model = gr.Dropdown(label="Chọn Model", choices=["Qwen/Qwen2.5-72B-Instruct","meta-llama/Meta-Llama-3.1-70B-Instruct","mistralai/Mixtral-8x7B-Instruct-v0.1","mistralai/Mistral-7B-Instruct-v0.3"], value="Qwen/Qwen2.5-72B-Instruct")
                    prompt_type = gr.Dropdown(choices=prompt_types, label="Phong cách", value="Medium", interactive=True)
                # No trailing comma here: it used to turn this component into
                # a 1-tuple that every user then had to index with [0].
                input_prompt = gr.Textbox(label="Nhập nội dung muốn vẽ",value="Một cô gái", type="text")
                generate_prompt = gr.Button("Tạo Prompt", variant="stop")
            # Text input, shown only in keyboard mode.
            with gr.Column(visible=False) as typing_visible:
                input_text = gr.Textbox(label="Nhập nội dung trao đổi", type="text")
                submit = gr.Button("Áp dụng", variant="stop")
            input_audio = gr.Audio(label="Nói cho tôi nghe nào", sources="microphone", type="numpy")
            output_audio = gr.Audio(label="Trợ lý", autoplay=True, sources=None,type="numpy")
            input_image = gr.Image(label="Hình ảnh của bạn", sources=["upload","clipboard","webcam"], type="pil",visible=True)
        with gr.Column(visible=False) as image_visible:
            output_image = gr.Image(label="Hình ảnh sau xử lý", sources=None, type="pil",visible=True)
        with gr.Column(visible=True) as chatbot_visible:
            chatbot = gr.Chatbot(label="Nội dung trò chuyện",type="messages")
    state = gr.State(value=AppState())

    # Microphone: start -> stream chunks -> respond when recording stops.
    startrecord = input_audio.start_recording(
        start_recording_user,
        [state],
        [input_audio, state],
    )
    stream = input_audio.stream(
        process_audio,
        [input_audio,state,input_image],
        [state,input_audio],
        stream_every=1,
        time_limit=30,
    )
    respond = input_audio.stop_recording(
        response_audio,
        [state],
        [state, output_audio],
    )
    respond.then(lambda s: s.conversation, [state], [chatbot])
    respond.then(lambda s: s.image_out, [state], [output_image])

    # When the spoken reply finishes, resume listening and refresh which
    # panels are visible for the current mode.
    restart = output_audio.stop(
        restart_recording,
        [state],
        [input_audio, state],
    )
    restart.then(lambda s: gr.update(visible= not s.typing, recording = not s.typing), [state], [input_audio])
    restart.then(lambda s: gr.update(visible=s.typing), [state], [typing_visible])
    restart.then(lambda s: gr.update(visible=s.painting), [state], [image_visible])
    restart.then(lambda s: gr.update(visible=bool(s.painting and s.typing)), [state], [prompt_visible])
    restart.then(lambda s: gr.update(visible= not s.painting), [state], [chatbot_visible])

    # Stop button: replaces the state with a fresh, stopped AppState and
    # cancels the running audio events.
    cancel = gr.Button("Dừng chương trình", variant="stop", interactive=False)
    stream.then(lambda s: gr.update(interactive= not s.stopped), [state], [cancel])
    cancel.click(
        lambda: (AppState(stopped=True, recording=False, started_talking = False), gr.Audio(recording=False), gr.update(interactive=False)),
        None,[state, input_audio, cancel],
        cancels=[respond, stream, startrecord, restart]
    )

    # Keyboard mode: submit typed text.
    sub = submit.click(
        response_text,
        [state, input_text, input_image, input_prompt],
        [state, output_audio],
    )
    sub.then(lambda s: s.conversation, [state], [chatbot])
    sub.then(lambda s: s.image_out, [state], [output_image])

    # Prompt builder: write the generated prompt into the text box.
    generator = generate_prompt.click(
        prompt_hugingface,
        [input_prompt,llm_provider,model,prompt_type],
        [input_text]
    )
    llm_provider.change(
        update_model_choices,
        [llm_provider],
        [model]
    )
    gr.Examples(
        examples=examples,
        inputs=input_text,
    )

if __name__ == "__main__":
    demo.launch()