Spaces:
Build error
Build error
Update app.py
Browse files
app.py
CHANGED
|
@@ -10,38 +10,35 @@ from gtts import gTTS
|
|
| 10 |
import tempfile
|
| 11 |
import base64
|
| 12 |
import time
|
|
|
|
|
|
|
|
|
|
|
|
|
| 13 |
|
| 14 |
-
# --- Configuration & Initialization ---
|
| 15 |
|
| 16 |
-
#
|
| 17 |
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
|
| 18 |
|
| 19 |
if not GEMINI_API_KEY:
|
| 20 |
-
# If running locally and not on Spaces, you might use a local env variable
|
| 21 |
-
# Or raise an error if mandatory for deployment
|
| 22 |
print("Warning: GEMINI_API_KEY secret not found. Set it in Hugging Face Space settings.")
|
| 23 |
# raise ValueError("GEMINI_API_KEY secret not found. Please set it in your Space settings.")
|
| 24 |
-
#
|
| 25 |
-
# GEMINI_API_KEY = "YOUR_LOCAL_KEY_FOR_TESTING_ONLY"
|
| 26 |
|
| 27 |
-
# Configure Gemini only if the key is available
|
| 28 |
if GEMINI_API_KEY:
|
| 29 |
try:
|
| 30 |
genai.configure(api_key=GEMINI_API_KEY)
|
| 31 |
-
|
| 32 |
-
generation_model = genai.GenerativeModel('gemini-1.5-flash') # Use 1.5 flash as 2.0 is not public
|
| 33 |
except Exception as e:
|
| 34 |
print(f"Error configuring Gemini or initializing model: {e}")
|
| 35 |
generation_model = None
|
| 36 |
else:
|
| 37 |
generation_model = None
|
| 38 |
|
| 39 |
-
# --- File Paths (Relative
|
| 40 |
PDF_PATH = "about_me.pdf"
|
| 41 |
PROFILE_PIC_PATH = "sk.jpeg"
|
| 42 |
|
| 43 |
-
# --- Utility:
|
| 44 |
-
# No caching decorator needed, load once at startup
|
| 45 |
def get_base64_of_file(file_path):
|
| 46 |
try:
|
| 47 |
with open(file_path, "rb") as f:
|
|
@@ -54,10 +51,40 @@ def get_base64_of_file(file_path):
|
|
| 54 |
print(f"Error reading file {file_path}: {e}")
|
| 55 |
return None
|
| 56 |
|
| 57 |
-
# ---
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 58 |
|
| 59 |
def extract_text_from_pdf(pdf_path):
|
| 60 |
-
"""Extract text from local PDF file."""
|
| 61 |
try:
|
| 62 |
if not os.path.exists(pdf_path):
|
| 63 |
print(f"Error: PDF file not found at {pdf_path}")
|
|
@@ -76,16 +103,11 @@ def extract_text_from_pdf(pdf_path):
|
|
| 76 |
return ""
|
| 77 |
|
| 78 |
def create_document_embeddings(text, model):
|
| 79 |
-
|
| 80 |
-
if not text or model is None:
|
| 81 |
-
return [], None
|
| 82 |
try:
|
| 83 |
-
# Simple split by newline, consider more robust chunking if needed
|
| 84 |
chunks = text.split('\n')
|
| 85 |
chunks = [chunk.strip() for chunk in chunks if chunk.strip()]
|
| 86 |
-
if not chunks:
|
| 87 |
-
print("No text chunks found after splitting.")
|
| 88 |
-
return [], None
|
| 89 |
embeddings = model.encode(chunks)
|
| 90 |
print(f"Created {len(embeddings)} embeddings for {len(chunks)} chunks.")
|
| 91 |
return chunks, embeddings
|
|
@@ -94,19 +116,12 @@ def create_document_embeddings(text, model):
|
|
| 94 |
return [], None
|
| 95 |
|
| 96 |
def retrieve_relevant_context(query, chunks, embeddings, model, top_k=3):
|
| 97 |
-
|
| 98 |
-
if not query or not chunks or embeddings is None or model is None:
|
| 99 |
-
return "No context available."
|
| 100 |
try:
|
| 101 |
query_embedding = model.encode([query])[0]
|
| 102 |
similarities = cosine_similarity([query_embedding], embeddings)[0]
|
| 103 |
-
|
| 104 |
-
|
| 105 |
-
# Handle cases where top_k > num_chunks
|
| 106 |
-
k = min(top_k, num_chunks)
|
| 107 |
-
if k == 0:
|
| 108 |
-
return "No relevant context found."
|
| 109 |
-
# Argsort gives indices of smallest values, use [-k:] and reverse
|
| 110 |
top_indices = np.argsort(similarities)[-k:][::-1]
|
| 111 |
relevant_contexts = [chunks[i] for i in top_indices]
|
| 112 |
return " ".join(relevant_contexts)
|
|
@@ -115,53 +130,37 @@ def retrieve_relevant_context(query, chunks, embeddings, model, top_k=3):
|
|
| 115 |
return "Error finding context."
|
| 116 |
|
| 117 |
def generate_gemini_response(query, context):
|
| 118 |
-
|
| 119 |
-
if not
|
| 120 |
-
return "Model not initialized. Check API Key."
|
| 121 |
-
if not query:
|
| 122 |
-
return "No query provided."
|
| 123 |
-
|
| 124 |
full_prompt = f"""
|
| 125 |
Context: {context}
|
| 126 |
-
|
| 127 |
Question: {query}
|
| 128 |
-
|
| 129 |
Based *only* on the provided context about Satyam, answer the question concisely and in a natural, spoken style, from the first-person perspective (as Satyam).
|
| 130 |
If the context does not contain the information needed to answer the question, respond exactly with:
|
| 131 |
-
"Hmm, that specific detail isn't in my knowledge base right now.
|
| 132 |
Do not invent information not present in the context.
|
| 133 |
"""
|
| 134 |
try:
|
| 135 |
response = generation_model.generate_content(full_prompt)
|
| 136 |
-
# Check for safety ratings or blocks if necessary (depending on Gemini version/settings)
|
| 137 |
if response.candidates:
|
| 138 |
-
# Handle potential lack of 'text' attribute gracefully
|
| 139 |
if hasattr(response.candidates[0].content.parts[0], 'text'):
|
| 140 |
return response.candidates[0].content.parts[0].text.strip()
|
| 141 |
else:
|
| 142 |
print("Warning: Response part does not contain text.")
|
| 143 |
-
# You might want to inspect response.candidates[0].content.parts[0] here
|
| 144 |
return "Sorry, I received an unexpected response format."
|
| 145 |
else:
|
| 146 |
-
# Handle cases where no candidates are returned (e.g., blocked content)
|
| 147 |
print(f"Warning: No candidates returned. Response: {response}")
|
| 148 |
-
# Check prompt feedback for block reason
|
| 149 |
block_reason = response.prompt_feedback.block_reason if hasattr(response, 'prompt_feedback') else 'Unknown'
|
| 150 |
return f"Sorry, I couldn't generate a response. Reason: {block_reason}"
|
| 151 |
-
|
| 152 |
except Exception as e:
|
| 153 |
print(f"Error generating response from Gemini: {e}")
|
| 154 |
return f"Sorry, I encountered an error trying to respond: {e}"
|
| 155 |
|
| 156 |
-
|
| 157 |
def text_to_speech(text):
|
| 158 |
-
|
| 159 |
-
if not text:
|
| 160 |
-
return None
|
| 161 |
try:
|
| 162 |
-
# Create a temporary file
|
| 163 |
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_audio:
|
| 164 |
-
tts = gTTS(text=text, lang='en', tld='co.za')
|
| 165 |
tts.save(temp_audio.name)
|
| 166 |
print(f"Generated TTS audio at {temp_audio.name}")
|
| 167 |
return temp_audio.name
|
|
@@ -169,7 +168,7 @@ def text_to_speech(text):
|
|
| 169 |
print(f"Error generating text-to-speech: {e}")
|
| 170 |
return None
|
| 171 |
|
| 172 |
-
# --- Load resources
|
| 173 |
print("Loading resources...")
|
| 174 |
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
|
| 175 |
document_text = extract_text_from_pdf(PDF_PATH)
|
|
@@ -178,61 +177,68 @@ profile_pic_base64 = get_base64_of_file(PROFILE_PIC_PATH)
|
|
| 178 |
print("Resources loaded.")
|
| 179 |
|
| 180 |
# --- Gradio Interface Logic ---
|
| 181 |
-
|
| 182 |
-
# Initialize speech recognizer
|
| 183 |
recognizer = sr.Recognizer()
|
| 184 |
|
| 185 |
-
def transcribe_audio(audio_filepath
|
| 186 |
-
"""Transcribes audio file to text."""
|
| 187 |
if not audio_filepath:
|
| 188 |
return "", "No audio input detected."
|
|
|
|
|
|
|
| 189 |
try:
|
| 190 |
with sr.AudioFile(audio_filepath) as source:
|
|
|
|
|
|
|
| 191 |
status_update = "Processing audio..."
|
| 192 |
-
|
| 193 |
-
audio = recognizer.record(source)
|
| 194 |
status_update = "Transcribing..."
|
| 195 |
-
|
| 196 |
-
# Recognize speech using Google Web Speech API
|
| 197 |
query = recognizer.recognize_google(audio)
|
| 198 |
print(f"Transcribed query: {query}")
|
| 199 |
status_update = f"You asked: {query}"
|
| 200 |
-
return query, status_update
|
| 201 |
except sr.UnknownValueError:
|
| 202 |
print("Google Speech Recognition could not understand audio")
|
| 203 |
status_update = "Sorry, I couldn't understand what you said."
|
| 204 |
-
return "", status_update
|
| 205 |
except sr.RequestError as e:
|
| 206 |
-
print(f"Could not request results
|
| 207 |
status_update = "Sorry, my speech recognition service is unavailable."
|
| 208 |
-
return "", status_update
|
| 209 |
except Exception as e:
|
| 210 |
print(f"Error during transcription: {e}")
|
| 211 |
status_update = f"Error during transcription: {e}"
|
| 212 |
-
return "", status_update
|
| 213 |
finally:
|
| 214 |
-
# Clean up the temporary audio file uploaded by Gradio
|
| 215 |
if audio_filepath and os.path.exists(audio_filepath):
|
| 216 |
-
try:
|
| 217 |
-
|
| 218 |
-
|
| 219 |
-
except Exception as e:
|
| 220 |
-
print(f"Error deleting temp audio file {audio_filepath}: {e}")
|
| 221 |
|
| 222 |
|
| 223 |
def voice_chat_pipeline(audio_filepath, chat_history_state):
|
| 224 |
-
"""Main function to
|
|
|
|
|
|
|
|
|
|
| 225 |
# 1. Transcribe Audio
|
| 226 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
| 227 |
|
| 228 |
-
|
| 229 |
-
|
|
|
|
|
|
|
| 230 |
error_audio = text_to_speech(status_update)
|
| 231 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
| 232 |
|
| 233 |
-
|
| 234 |
status_update = f"Thinking about: '{query}'..."
|
| 235 |
-
yield chat_history_state, status_update, gr.
|
| 236 |
|
| 237 |
# 2. Retrieve Context
|
| 238 |
context = retrieve_relevant_context(query, document_chunks, document_embeddings, embedding_model)
|
|
@@ -244,21 +250,30 @@ def voice_chat_pipeline(audio_filepath, chat_history_state):
|
|
| 244 |
response_audio_path = text_to_speech(response_text)
|
| 245 |
|
| 246 |
# 5. Update History
|
| 247 |
-
# Ensure chat_history_state is treated as a list
|
| 248 |
current_history = chat_history_state if chat_history_state is not None else []
|
| 249 |
updated_history = current_history + [[query, response_text]]
|
| 250 |
|
| 251 |
-
# 6. Yield final results
|
|
|
|
| 252 |
status_update = "Here's my response:"
|
| 253 |
-
# Return updated history, final status, and the path to the response audio
|
| 254 |
-
# Use gr.Audio(value=response_audio_path, autoplay=True) if you want auto-play
|
| 255 |
-
yield updated_history, status_update, gr.Audio(value=response_audio_path, autoplay=False)
|
| 256 |
|
| 257 |
-
#
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 258 |
if response_audio_path and os.path.exists(response_audio_path):
|
| 259 |
-
# Add a small delay to ensure Gradio has served the file
|
| 260 |
-
time.sleep(2)
|
| 261 |
try:
|
|
|
|
|
|
|
| 262 |
os.unlink(response_audio_path)
|
| 263 |
print(f"Cleaned up TTS audio file: {response_audio_path}")
|
| 264 |
except Exception as e:
|
|
@@ -266,20 +281,18 @@ def voice_chat_pipeline(audio_filepath, chat_history_state):
|
|
| 266 |
|
| 267 |
|
| 268 |
# --- Build Gradio App ---
|
| 269 |
-
|
| 270 |
-
# Custom CSS (simplified)
|
| 271 |
css = """
|
| 272 |
.bio-card { background-color: #f4f4f4; padding: 20px; border-radius: 10px; margin: 10px 0; }
|
| 273 |
.circular-img { width: 150px; height: 150px; object-fit: cover; border-radius: 50%; border: 3px solid #4CAF50; display: block; margin-left: auto; margin-right: auto; }
|
| 274 |
.gradio-container { max-width: 800px !important; margin: auto; }
|
| 275 |
-
#chat_history .message.user { background-color: #e0f7fa !important; }
|
| 276 |
-
#chat_history .message.bot { background-color: #f1f8e9 !important; }
|
|
|
|
| 277 |
"""
|
| 278 |
|
| 279 |
with gr.Blocks(css=css, theme=gr.themes.Soft()) as app:
|
| 280 |
gr.Markdown("# Voice QA Bot - Talk to Satyam's AI Assistant")
|
| 281 |
|
| 282 |
-
# Use gr.State to hold conversation history
|
| 283 |
chat_history = gr.State([])
|
| 284 |
|
| 285 |
with gr.Row():
|
|
@@ -289,46 +302,32 @@ with gr.Blocks(css=css, theme=gr.themes.Soft()) as app:
|
|
| 289 |
gr.HTML(f'<img src="data:image/jpeg;base64,{profile_pic_base64}" class="circular-img" alt="My Picture">')
|
| 290 |
else:
|
| 291 |
gr.Markdown("_(Profile picture not loaded)_")
|
| 292 |
-
|
| 293 |
-
gr.
|
| 294 |
-
|
| 295 |
-
|
| 296 |
-
<h3>Hi, I'm Satyam's AI Assistant!</h3>
|
| 297 |
-
<p>
|
| 298 |
-
Ask me questions based on Satyam's profile. I have information from his 'about_me.pdf'.
|
| 299 |
-
I can tell you about his background in AI and Data Science, his interests, and professional goals.
|
| 300 |
-
Just use the microphone!
|
| 301 |
-
</p>
|
| 302 |
-
</div>
|
| 303 |
-
"""
|
| 304 |
-
)
|
| 305 |
-
status_textbox = gr.Textbox(label="Status", value="Ready. Use the microphone to ask a question.", interactive=False)
|
| 306 |
|
| 307 |
with gr.Column(scale=2):
|
| 308 |
gr.Markdown("## Conversation")
|
| 309 |
chatbot_ui = gr.Chatbot(label="Chat History", elem_id="chat_history", height=400)
|
| 310 |
audio_input = gr.Audio(sources=["microphone"], type="filepath", label="🎤 Ask your question:")
|
| 311 |
-
audio_output = gr.Audio(label="🔊 My Response", autoplay=False)
|
| 312 |
|
| 313 |
-
# Connect
|
| 314 |
audio_input.change(
|
| 315 |
fn=voice_chat_pipeline,
|
| 316 |
inputs=[audio_input, chat_history],
|
| 317 |
-
|
| 318 |
-
|
|
|
|
| 319 |
)
|
| 320 |
|
| 321 |
gr.Markdown("---")
|
| 322 |
-
gr.Markdown("Powered by Gradio, Google Gemini,
|
| 323 |
|
| 324 |
-
# Launch
|
| 325 |
if __name__ == "__main__":
|
| 326 |
-
if not GEMINI_API_KEY:
|
| 327 |
-
|
| 328 |
-
print("If running locally, set the environment variable or modify the code.")
|
| 329 |
-
print("If running on Hugging Face Spaces, ensure the 'GEMINI_API_KEY' secret is added in the Space settings.\n")
|
| 330 |
-
if generation_model is None:
|
| 331 |
-
print("\nERROR: Gemini model could not be initialized. Check API Key and configuration.\n")
|
| 332 |
-
|
| 333 |
print("Starting Gradio app...")
|
| 334 |
-
app.launch(debug=True)
|
|
|
|
| 10 |
import tempfile
|
| 11 |
import base64
|
| 12 |
import time
|
| 13 |
+
# NOTE: Importing threading and trying to use the old animation logic
|
| 14 |
+
# is highly discouraged and unlikely to work correctly in Gradio/Spaces.
|
| 15 |
+
# This is included ONLY because you requested the original code structure.
|
| 16 |
+
import threading
|
| 17 |
|
|
|
|
| 18 |
|
| 19 |
+
# --- Configuration & Initialization ---
|
| 20 |
# The API key comes from the environment (on Hugging Face Spaces, a secret).
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")

# Remains None unless a key is present and the Gemini client initializes
# without raising; callers check for None before generating.
generation_model = None
if GEMINI_API_KEY:
    try:
        genai.configure(api_key=GEMINI_API_KEY)
        generation_model = genai.GenerativeModel('gemini-1.5-flash')
    except Exception as e:
        print(f"Error configuring Gemini or initializing model: {e}")
else:
    print("Warning: GEMINI_API_KEY secret not found. Set it in Hugging Face Space settings.")
|
| 36 |
|
| 37 |
+
# --- File Paths (Relative) ---
|
| 38 |
PDF_PATH = "about_me.pdf"
|
| 39 |
PROFILE_PIC_PATH = "sk.jpeg"
|
| 40 |
|
| 41 |
+
# --- Utility: Base64 Image ---
|
|
|
|
| 42 |
def get_base64_of_file(file_path):
|
| 43 |
try:
|
| 44 |
with open(file_path, "rb") as f:
|
|
|
|
| 51 |
print(f"Error reading file {file_path}: {e}")
|
| 52 |
return None
|
| 53 |
|
| 54 |
+
# --- Original Sound Wave Animation Functions (Adapted Attempt - HIGHLY UNLIKELY TO WORK) ---
|
| 55 |
+
# This state needs to be managed differently in Gradio. Using a simple global
|
| 56 |
+
# or class member might work for single-user local testing but not reliably on Spaces.
|
| 57 |
+
# Let's try managing via gr.State passed around, though the threading part remains problematic.
|
| 58 |
+
|
| 59 |
+
def create_sound_wave(num_bars=20, max_height=50, color="#4CAF50", now=None):
    """Return the HTML for one frame of the sound-wave graphic.

    Each bar's height follows a sine wave whose phase advances with ``now``,
    so calls made at different times produce different frames.

    Args:
        num_bars: Number of vertical bars to draw.
        max_height: Height in pixels of a bar at the crest of the wave.
        color: CSS colour applied as every bar's background.
        now: Timestamp in seconds that fixes the wave's phase. Defaults to
            ``time.time()``; pass a constant to get a reproducible frame.

    Returns:
        An HTML string: a flex container holding ``num_bars`` ``<div>`` bars.
    """
    if now is None:
        now = time.time()
    # Read the clock once so every bar in the frame shares the same instant.
    phase = now * 5
    heights = [
        int(max_height * (0.5 + 0.5 * np.sin(offset + phase)))
        for offset in np.linspace(0, 2 * np.pi, num_bars)
    ]
    bars_html = "".join(
        f'<div style="display: inline-block; width: 5px; height: {height}px; margin: 0 2px; background-color: {color}; transition: height 0.1s ease;"></div>'
        for height in heights
    )
    return f'<div style="display: flex; justify-content: center; align-items: center; height: 60px;">{bars_html}</div>'
|
| 68 |
+
|
| 69 |
+
# --- !! Problem Area !! ---
|
| 70 |
+
# The core issue: This function relies on background threading and continuous updates
|
| 71 |
+
# which doesn't map well to Gradio's event model or web server environments.
|
| 72 |
+
# Trying to run this via Gradio events will likely block or fail.
|
| 73 |
+
# `add_script_run_ctx` is Streamlit specific.
|
| 74 |
+
# Direct updates to `gr.HTML` from a background thread are not the standard Gradio way.
|
| 75 |
+
|
| 76 |
+
# We can define the function but calling it effectively from Gradio events is the challenge.
|
| 77 |
+
# Let's *not* actually try to run the thread here, but keep the generator.
|
| 78 |
+
# We will return the *static* HTML from create_sound_wave when needed instead.
|
| 79 |
+
# This means NO ANIMATION, just a static wave picture.
|
| 80 |
+
|
| 81 |
+
# If you absolutely need animation, you'd typically use JavaScript within gr.HTML
|
| 82 |
+
# or find/build a custom Gradio component.
|
| 83 |
+
|
| 84 |
+
|
| 85 |
+
# --- Core Logic (Mostly unchanged from previous Gradio version) ---
|
| 86 |
|
| 87 |
def extract_text_from_pdf(pdf_path):
|
|
|
|
| 88 |
try:
|
| 89 |
if not os.path.exists(pdf_path):
|
| 90 |
print(f"Error: PDF file not found at {pdf_path}")
|
|
|
|
| 103 |
return ""
|
| 104 |
|
| 105 |
def create_document_embeddings(text, model):
|
| 106 |
+
if not text or model is None: return [], None
|
|
|
|
|
|
|
| 107 |
try:
|
|
|
|
| 108 |
chunks = text.split('\n')
|
| 109 |
chunks = [chunk.strip() for chunk in chunks if chunk.strip()]
|
| 110 |
+
if not chunks: return [], None
|
|
|
|
|
|
|
| 111 |
embeddings = model.encode(chunks)
|
| 112 |
print(f"Created {len(embeddings)} embeddings for {len(chunks)} chunks.")
|
| 113 |
return chunks, embeddings
|
|
|
|
| 116 |
return [], None
|
| 117 |
|
| 118 |
def retrieve_relevant_context(query, chunks, embeddings, model, top_k=3):
|
| 119 |
+
if not query or not chunks or embeddings is None or model is None: return "No context available."
|
|
|
|
|
|
|
| 120 |
try:
|
| 121 |
query_embedding = model.encode([query])[0]
|
| 122 |
similarities = cosine_similarity([query_embedding], embeddings)[0]
|
| 123 |
+
k = min(top_k, len(chunks))
|
| 124 |
+
if k == 0: return "No relevant context found."
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 125 |
top_indices = np.argsort(similarities)[-k:][::-1]
|
| 126 |
relevant_contexts = [chunks[i] for i in top_indices]
|
| 127 |
return " ".join(relevant_contexts)
|
|
|
|
| 130 |
return "Error finding context."
|
| 131 |
|
| 132 |
def generate_gemini_response(query, context):
    """Ask the Gemini model to answer ``query`` using only ``context``.

    Args:
        query: The user's question as text.
        context: Text passed to the model as the only allowed knowledge.

    Returns:
        The model's answer with surrounding whitespace stripped, or a
        fallback sentence when the model is not initialized, the query is
        empty, the response carries no usable text, or the call raises.
        This function does not raise; failures are reported in the returned
        string and printed to the console.
    """
    if not generation_model:
        return "Model not initialized. Check API Key."
    if not query:
        return "No query provided."

    full_prompt = f"""
    Context: {context}
    Question: {query}
    Based *only* on the provided context about Satyam, answer the question concisely and in a natural, spoken style, from the first-person perspective (as Satyam).
    If the context does not contain the information needed to answer the question, respond exactly with:
    "Hmm, that specific detail isn't in my knowledge base right now. You might need to ask me directly sometime!"
    Do not invent information not present in the context.
    """
    try:
        response = generation_model.generate_content(full_prompt)

        if not response.candidates:
            print(f"Warning: No candidates returned. Response: {response}")
            # Either attribute may be missing or None; fall back to 'Unknown'
            # instead of letting an AttributeError reach the user.
            feedback = getattr(response, 'prompt_feedback', None)
            block_reason = getattr(feedback, 'block_reason', 'Unknown')
            return f"Sorry, I couldn't generate a response. Reason: {block_reason}"

        parts = response.candidates[0].content.parts
        # Guard against an empty parts list before indexing, so it gives the
        # "unexpected format" reply rather than an IndexError message.
        if parts and hasattr(parts[0], 'text'):
            return parts[0].text.strip()

        print("Warning: Response part does not contain text.")
        return "Sorry, I received an unexpected response format."
    except Exception as e:
        # Top-level boundary for the model call: report and degrade gracefully.
        print(f"Error generating response from Gemini: {e}")
        return f"Sorry, I encountered an error trying to respond: {e}"
|
| 158 |
|
|
|
|
| 159 |
def text_to_speech(text):
|
| 160 |
+
if not text: return None
|
|
|
|
|
|
|
| 161 |
try:
|
|
|
|
| 162 |
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_audio:
|
| 163 |
+
tts = gTTS(text=text, lang='en', tld='co.za')
|
| 164 |
tts.save(temp_audio.name)
|
| 165 |
print(f"Generated TTS audio at {temp_audio.name}")
|
| 166 |
return temp_audio.name
|
|
|
|
| 168 |
print(f"Error generating text-to-speech: {e}")
|
| 169 |
return None
|
| 170 |
|
| 171 |
+
# --- Load resources ---
|
| 172 |
print("Loading resources...")
|
| 173 |
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
|
| 174 |
document_text = extract_text_from_pdf(PDF_PATH)
|
|
|
|
| 177 |
print("Resources loaded.")
|
| 178 |
|
| 179 |
# --- Gradio Interface Logic ---
|
|
|
|
|
|
|
| 180 |
recognizer = sr.Recognizer()
|
| 181 |
|
| 182 |
+
def transcribe_audio(audio_filepath):
    """Transcribe a recorded audio file to text.

    Args:
        audio_filepath: Path to the audio file. If it exists, the file is
            deleted before returning, whether or not transcription worked.

    Returns:
        A ``(query, status_message)`` tuple. ``query`` is the recognized
        text, or ``""`` when there was no input or transcription failed;
        ``status_message`` describes the outcome for display to the user.
    """
    if not audio_filepath:
        return "", "No audio input detected."

    query = ""
    try:
        with sr.AudioFile(audio_filepath) as source:
            audio = recognizer.record(source)
        # The file is closed here; recognition only needs the recorded audio.
        query = recognizer.recognize_google(audio)
        print(f"Transcribed query: {query}")
        status_update = f"You asked: {query}"
    except sr.UnknownValueError:
        print("Google Speech Recognition could not understand audio")
        status_update = "Sorry, I couldn't understand what you said."
    except sr.RequestError as e:
        print(f"Could not request results; {e}")
        status_update = "Sorry, my speech recognition service is unavailable."
    except Exception as e:
        print(f"Error during transcription: {e}")
        status_update = f"Error during transcription: {e}"
    finally:
        # Best-effort removal of the input file; a failed delete is reported
        # but must not mask the transcription result.
        if audio_filepath and os.path.exists(audio_filepath):
            try:
                os.unlink(audio_filepath)
                print(f"Cleaned up temp audio: {audio_filepath}")
            except OSError as e:
                print(f"Error deleting temp audio {audio_filepath}: {e}")
    return query, status_update
|
|
|
|
|
|
|
| 214 |
|
| 215 |
|
| 216 |
def voice_chat_pipeline(audio_filepath, chat_history_state):
|
| 217 |
+
"""Main function modified to yield updates for animation attempt."""
|
| 218 |
+
# Initial state: Clear animation, set status
|
| 219 |
+
yield chat_history_state, "Processing...", gr.HTML(value=""), gr.Audio(value=None)
|
| 220 |
+
|
| 221 |
# 1. Transcribe Audio
|
| 222 |
+
# Show "listening" animation (static snapshot)
|
| 223 |
+
listening_wave_html = create_sound_wave(color="#4CAF50")
|
| 224 |
+
yield chat_history_state, "Listening (processing)...", gr.HTML(value=listening_wave_html), gr.Audio(value=None)
|
| 225 |
+
|
| 226 |
+
query, status_update = transcribe_audio(audio_filepath)
|
| 227 |
|
| 228 |
+
# Clear animation after transcription attempt
|
| 229 |
+
yield chat_history_state, status_update, gr.HTML(value=""), gr.Audio(value=None)
|
| 230 |
+
|
| 231 |
+
if not query:
|
| 232 |
error_audio = text_to_speech(status_update)
|
| 233 |
+
# Show static "error" wave? Or just keep it clear. Let's keep clear.
|
| 234 |
+
yield chat_history_state, status_update, gr.HTML(value=""), error_audio or gr.Audio(value=None)
|
| 235 |
+
# Clean up potential error audio
|
| 236 |
+
if error_audio and os.path.exists(error_audio): time.sleep(1); os.unlink(error_audio)
|
| 237 |
+
return # Stop processing if transcription failed
|
| 238 |
|
| 239 |
+
# Update status before generation
|
| 240 |
status_update = f"Thinking about: '{query}'..."
|
| 241 |
+
yield chat_history_state, status_update, gr.HTML(value=""), gr.Audio(value=None)
|
| 242 |
|
| 243 |
# 2. Retrieve Context
|
| 244 |
context = retrieve_relevant_context(query, document_chunks, document_embeddings, embedding_model)
|
|
|
|
| 250 |
response_audio_path = text_to_speech(response_text)
|
| 251 |
|
| 252 |
# 5. Update History
|
|
|
|
| 253 |
current_history = chat_history_state if chat_history_state is not None else []
|
| 254 |
updated_history = current_history + [[query, response_text]]
|
| 255 |
|
| 256 |
+
# 6. Yield final results with "speaking" animation (static snapshot)
|
| 257 |
+
speaking_wave_html = create_sound_wave(color="#FF5733") # Different color for speaking
|
| 258 |
status_update = "Here's my response:"
|
|
|
|
|
|
|
|
|
|
| 259 |
|
| 260 |
+
# Yield history, status, speaking wave, and audio output
|
| 261 |
+
yield updated_history, status_update, gr.HTML(value=speaking_wave_html), gr.Audio(value=response_audio_path, autoplay=False)
|
| 262 |
+
|
| 263 |
+
# Keep the "speaking" wave visible briefly while audio potentially plays, then clear it.
|
| 264 |
+
# This is tricky without knowing exactly when playback finishes in the browser.
|
| 265 |
+
# A simple time delay is a crude approximation.
|
| 266 |
+
time.sleep(3) # Keep wave visible for 3 seconds (adjust as needed)
|
| 267 |
+
|
| 268 |
+
# Final yield to clear the animation after potential playback
|
| 269 |
+
yield updated_history, status_update, gr.HTML(value=""), gr.Audio(value=response_audio_path, autoplay=False)
|
| 270 |
+
|
| 271 |
+
|
| 272 |
+
# Clean up TTS audio file
|
| 273 |
if response_audio_path and os.path.exists(response_audio_path):
|
|
|
|
|
|
|
| 274 |
try:
|
| 275 |
+
# Delay slightly longer before deleting to ensure Gradio served it
|
| 276 |
+
time.sleep(2)
|
| 277 |
os.unlink(response_audio_path)
|
| 278 |
print(f"Cleaned up TTS audio file: {response_audio_path}")
|
| 279 |
except Exception as e:
|
|
|
|
| 281 |
|
| 282 |
|
| 283 |
# --- Build Gradio App ---
|
|
|
|
|
|
|
| 284 |
css = """
|
| 285 |
.bio-card { background-color: #f4f4f4; padding: 20px; border-radius: 10px; margin: 10px 0; }
|
| 286 |
.circular-img { width: 150px; height: 150px; object-fit: cover; border-radius: 50%; border: 3px solid #4CAF50; display: block; margin-left: auto; margin-right: auto; }
|
| 287 |
.gradio-container { max-width: 800px !important; margin: auto; }
|
| 288 |
+
#chat_history .message.user { background-color: #e0f7fa !important; }
|
| 289 |
+
#chat_history .message.bot { background-color: #f1f8e9 !important; }
|
| 290 |
+
#animation_html_output div { min-height: 60px; } /* Ensure space for wave */
|
| 291 |
"""
|
| 292 |
|
| 293 |
with gr.Blocks(css=css, theme=gr.themes.Soft()) as app:
|
| 294 |
gr.Markdown("# Voice QA Bot - Talk to Satyam's AI Assistant")
|
| 295 |
|
|
|
|
| 296 |
chat_history = gr.State([])
|
| 297 |
|
| 298 |
with gr.Row():
|
|
|
|
| 302 |
gr.HTML(f'<img src="data:image/jpeg;base64,{profile_pic_base64}" class="circular-img" alt="My Picture">')
|
| 303 |
else:
|
| 304 |
gr.Markdown("_(Profile picture not loaded)_")
|
| 305 |
+
gr.HTML("""<div class="bio-card"><h3>Hi, I'm Satyam's AI Assistant!</h3><p>Ask me questions based on Satyam's profile using the microphone.</p></div>""")
|
| 306 |
+
status_textbox = gr.Textbox(label="Status", value="Ready.", interactive=False)
|
| 307 |
+
# Placeholder for the "animation" (will show static wave snapshots)
|
| 308 |
+
animation_output = gr.HTML(elem_id="animation_html_output", value="")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 309 |
|
| 310 |
with gr.Column(scale=2):
|
| 311 |
gr.Markdown("## Conversation")
|
| 312 |
chatbot_ui = gr.Chatbot(label="Chat History", elem_id="chat_history", height=400)
|
| 313 |
audio_input = gr.Audio(sources=["microphone"], type="filepath", label="🎤 Ask your question:")
|
| 314 |
+
audio_output = gr.Audio(label="🔊 My Response", autoplay=False)
|
| 315 |
|
| 316 |
+
# Connect audio input to the pipeline
|
| 317 |
audio_input.change(
|
| 318 |
fn=voice_chat_pipeline,
|
| 319 |
inputs=[audio_input, chat_history],
|
| 320 |
+
# Output includes the HTML component for the wave snapshot
|
| 321 |
+
outputs=[chatbot_ui, status_textbox, animation_output, audio_output],
|
| 322 |
+
show_progress="minimal" # Use minimal progress as we have status textbox
|
| 323 |
)
|
| 324 |
|
| 325 |
gr.Markdown("---")
|
| 326 |
+
gr.Markdown("Powered by Gradio, Google Gemini, etc.")
|
| 327 |
|
| 328 |
+
# Launch for local testing (Gradio on Spaces handles this)
|
| 329 |
if __name__ == "__main__":
    # Report configuration problems on the console, then start the server
    # regardless so the UI still comes up.
    startup_checks = (
        (not GEMINI_API_KEY, "\nERROR: GEMINI_API_KEY not set.\n"),
        (generation_model is None, "\nERROR: Gemini model not initialized.\n"),
    )
    for failed, message in startup_checks:
        if failed:
            print(message)
    print("Starting Gradio app...")
    app.launch(debug=True)
|