| | import os |
| | import re |
| | import tempfile |
| |
|
| | import gradio as gr |
| | import numpy as np |
| | import soundfile as sf |
| | import torch |
| | from ddgs import DDGS |
| | from huggingface_hub import hf_hub_download |
| | from llama_cpp import Llama |
| | from transformers import pipeline |
| | from TTS.api import TTS |
| |
|
| | |
| | device = "cpu" |
| |
|
| | |
| | print("Loading Whisper...") |
| | STT_MODEL_NAME = "openai/whisper-tiny.en" |
| | stt_pipe = pipeline("automatic-speech-recognition", model=STT_MODEL_NAME, device=device) |
| |
|
| | |
| | print("Setting up Llama.cpp...") |
| | HF_API_TOKEN = os.getenv("HF_TOKEN") |
| |
|
| | print("Downloading gzsol/model_1b GGUF...") |
| | model_path = hf_hub_download( |
| | repo_id="gzsol/model_1b", |
| | filename="model.gguf", |
| | token=HF_API_TOKEN, |
| | ) |
| |
|
| | print(f"Model path: {model_path}") |
| | print(f"File exists: {os.path.exists(model_path)}") |
| | if os.path.exists(model_path): |
| | print(f"File size: {os.path.getsize(model_path)} bytes") |
| | print(f"File size: {os.path.getsize(model_path) / (1024**3):.2f} GiB") |
| |
|
| | print(f"Loading model from {model_path}...") |
| | llm = Llama(model_path=model_path, n_gpu_layers=0, n_ctx=2048) |
| |
|
| | |
| | print("Loading TTS...") |
| | TTS_MODEL_NAME = "tts_models/en/ljspeech/tacotron2-DDC" |
| | tts_model = TTS(model_name=TTS_MODEL_NAME, progress_bar=False) |
| |
|
| |
|
| | |
def get_web_context(message):
    """Fetch brief web-search context for time-sensitive questions.

    Args:
        message: The user's message.

    Returns:
        A short text summary built from up to 3 DuckDuckGo results when the
        message looks like it needs current information, otherwise None
        (also None on empty results or any search failure).
    """
    search_keywords = [
        "current",
        "latest",
        "recent",
        "today",
        "now",
        "news",
        "weather",
        "price",
        "2024",
        "2025",
        "what is happening",
        "score",
        "match",
    ]

    # Match whole words/phrases only. A plain substring test fires on
    # unrelated words — "now" matched "know"/"snow", "match" matched
    # "dispatch" — triggering needless (and slow) web searches.
    lowered = message.lower()
    if not any(
        re.search(rf"\b{re.escape(keyword)}\b", lowered)
        for keyword in search_keywords
    ):
        return None

    try:
        with DDGS() as ddgs:
            results = list(ddgs.text(message, max_results=3))

        if not results:
            print("No search results found")
            return None

        print(f"Found {len(results)} results:")
        context = "Current information from web search:\n"
        for i, result in enumerate(results):
            print(f"Result {i+1}: {result['title']}")
            print(f"  Body: {result['body'][:100]}...")
            # Keep snippets short so the context fits the 2048-token window.
            context += f"- {result['title']}: {result['body'][:200]}...\n"

        return context

    except Exception as e:
        # Best-effort: a failed search degrades to a no-context answer
        # rather than crashing the chat turn.
        print(f"Search error: {e}")
        return None
| |
|
| |
|
def chat_with_bot(message, history):
    """Generate one assistant turn with the local GGUF Llama model.

    Args:
        message: The user's new message (plain text).
        history: Chat history as a list of {"role", "content"} dicts;
            mutated in place with the new user/assistant turns.

    Returns:
        (history, response_str): the updated history and the cleaned
        assistant reply, or an error message string on failure.
    """
    if history is None:
        history = []

    # Ignore empty/whitespace-only input; return history unchanged.
    if not message or not message.strip():
        return history, ""

    try:
        # Optional fresh web context for time-sensitive questions.
        web_context = get_web_context(message=message)

        # Flatten prior turns into a plain "User:/Assistant:" transcript.
        conversation = ""
        for h in history:
            role = "User" if h.get("role") == "user" else "Assistant"
            conversation += f"{role}: {h.get('content', '')}\n"

        # With web context, answer from the search snippets only.
        # NOTE(review): this branch drops the prior conversation entirely —
        # presumably intentional, to keep the model grounded; confirm.
        if web_context:
            prompt = f"""Answer ONLY using this information:

{web_context}

Question: {message}
Answer:"""
            print("The web context has been added to the prompt")
        else:
            prompt = f"""You are a helpful assistant. Answer naturally and conversationally.
{conversation}User: {message}
Assistant:"""

        print(f"Generating response with Llama...")

        # Sample a completion; the stop strings keep the model from
        # writing the next "User:" turn itself.
        response = llm(
            prompt,
            max_tokens=200,
            temperature=0.7,
            top_p=0.95,
            stop=["User:", "\nUser:"],
        )

        response_str = response["choices"][0]["text"].strip()

        # Heuristic cleanup of observed model artifacts: stray wrapping
        # quotes, trailing punctuation, and long runs of "1.2.3...." noise.
        response_str = response_str.strip("'\"")
        response_str = response_str.rstrip(",:;")
        response_str = response_str.strip("'\"")
        response_str = re.sub(r"(\d+\.){10,}", "", response_str)

        # Drop anything after a hallucinated "User:" turn that slipped
        # past the stop strings.
        if "User:" in response_str:
            response_str = response_str.split("User:")[0].strip()

        # Strip leaked chat-API JSON fragments like
        # [{'text': ..., 'type': 'text'}] that the model sometimes emits.
        response_str = response_str.replace("[{", "").replace("}]", "")
        response_str = response_str.replace("'text':", "").replace('"text":', "")
        response_str = response_str.replace("'type': 'text'", "").replace(
            '"type": "text"', ""
        )

        if ", 'type'" in response_str or ', "type"' in response_str:
            response_str = (
                response_str.split(", 'type'")[0].split(', "type"')[0].strip()
            )

        # Final pass over any quotes/punctuation exposed by the cleanup above.
        response_str = response_str.strip("'\",:;")

        if not response_str:
            response_str = "I received an empty response. Please try again."
            print("Warning: Empty response from LLM")

        history.append({"role": "user", "content": message})
        history.append({"role": "assistant", "content": response_str})

        return history, response_str

    except Exception as e:
        import traceback

        error_trace = traceback.format_exc()
        print(f"LLM Error: {e}")
        print(f"Full traceback:\n{error_trace}")

        error_msg = f"Error generating response: {str(e) if str(e) else 'Unknown error occurred'}"

        # Record the failed turn so the UI still shows the exchange.
        history.append({"role": "user", "content": message})
        history.append({"role": "assistant", "content": error_msg})
        return history, error_msg
| |
|
| |
|
def text_to_speech_from_chat(chat_response):
    """Synthesize the assistant's reply into a temporary WAV file.

    Returns:
        (wav_path, status_message): wav_path is None when there is nothing
        to speak or synthesis fails.
    """
    # Nothing to say for empty replies or error strings from the chat step.
    if not chat_response or chat_response.startswith("Error"):
        return None, "No valid response to synthesize."

    wav_path = None
    try:
        # delete=False: the file must outlive this function so Gradio can
        # serve it; we only need the name, so close the handle right away.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as handle:
            wav_path = handle.name

        tts_model.tts_to_file(
            text=chat_response,
            file_path=wav_path,
        )
        return wav_path, "Speech synthesis complete."

    except Exception as exc:
        # Remove the partial file so failed runs don't leak temp files.
        if wav_path and os.path.exists(wav_path):
            os.remove(wav_path)
        return None, f"Error during TTS: {exc}"
| |
|
| |
|
def speech_to_text_and_chat(audio_file_path, history):
    """Full voice pipeline: transcribe audio, chat, then speak the reply.

    Returns:
        (transcription, updated_history, reply_text, wav_path, status).
    """
    if audio_file_path is None:
        return "Please upload an audio file.", history, "", None, "Awaiting input."

    # 1) Speech-to-text via the Whisper pipeline.
    try:
        transcribed_text = stt_pipe(audio_file_path)["text"]
    except Exception as exc:
        stt_error = f"Error during STT: {exc}"
        return stt_error, history, "", None, stt_error

    # 2) Generate the assistant's reply with the local LLM.
    updated_history, reply_text = chat_with_bot(transcribed_text, history)

    # 3) Synthesize the reply to audio (best-effort; may return None).
    wav_path, status = text_to_speech_from_chat(reply_text)

    return (
        transcribed_text,
        updated_history,
        reply_text,
        wav_path,
        status,
    )
| |
|
| |
|
| | |
| | custom_css = """ |
| | #status { font-weight: bold; color: #2563eb; } |
| | .chatbot { height: 400px; } |
| | """ |
| |
|
| | with gr.Blocks() as demo: |
| | gr.Markdown("# 🗣️ GGUF Voice Assistant (Running your model_1b)") |
| | gr.Markdown("**Note:** This app uses `gzsol/model_1b` (GGUF) on CPU.") |
| |
|
| | |
| | |
| |
|
| | with gr.Tabs(): |
| |
|
| | |
| | with gr.TabItem("🗣️ Voice Assistant"): |
| | |
| | voice_chat_history = gr.Chatbot( |
| | label="Conversation Log", |
| | elem_classes=["chatbot"], |
| | value=[], |
| | ) |
| |
|
| | with gr.Row(): |
| | audio_in = gr.Audio( |
| | sources=["microphone", "upload"], |
| | type="filepath", |
| | label="Input Audio", |
| | ) |
| | voice_audio_out = gr.Audio(label="AI Voice Response", autoplay=True) |
| |
|
| | voice_transcription = gr.Textbox(label="User Transcription") |
| | voice_response_text = gr.Textbox(label="AI Response (Text)") |
| | voice_status = gr.Textbox(elem_id="status", label="Status") |
| |
|
| | run_btn = gr.Button("Transcribe, Chat & Speak", variant="primary") |
| | clear_voice_btn = gr.Button("Clear") |
| |
|
| | run_btn.click( |
| | fn=speech_to_text_and_chat, |
| | inputs=[audio_in, voice_chat_history], |
| | outputs=[ |
| | voice_transcription, |
| | voice_chat_history, |
| | voice_response_text, |
| | voice_audio_out, |
| | voice_status, |
| | ], |
| | ) |
| |
|
| | clear_voice_btn.click( |
| | lambda: (None, [], "", None, ""), |
| | None, |
| | [ |
| | audio_in, |
| | voice_chat_history, |
| | voice_response_text, |
| | voice_audio_out, |
| | voice_status, |
| | ], |
| | ) |
| |
|
| | |
| | with gr.TabItem("💬 Text Chat"): |
| | chatbot = gr.Chatbot( |
| | label="Conversation", |
| | elem_classes=["chatbot"], |
| | value=[], |
| | ) |
| | msg = gr.Textbox(label="Message") |
| | submit_btn = gr.Button("Send") |
| | clear_btn = gr.Button("Clear") |
| |
|
| | def chat_text_wrapper(message, history): |
| | h, _ = chat_with_bot(message, history) |
| | return h |
| |
|
| | msg.submit(chat_text_wrapper, [msg, chatbot], [chatbot]).then( |
| | lambda: "", None, msg |
| | ) |
| | submit_btn.click(chat_text_wrapper, [msg, chatbot], [chatbot]).then( |
| | lambda: "", None, msg |
| | ) |
| | clear_btn.click(lambda: [], None, chatbot) |
| |
|
| | demo.launch() |
| |
|