# NOTE: scraped hosting-page header ("Spaces: Sleeping") - not part of the program.
| import os | |
| import tempfile | |
| import gradio as gr | |
| import openai | |
| from typing import Optional, List | |
| import hashlib | |
| import base64 | |
| import json | |
| import time | |
| from dotenv import load_dotenv | |
| from gtts import gTTS | |
| import io | |
| import numpy as np | |
# Pull settings from a local .env file (if present) into the process environment.
load_dotenv()

# Fail fast at import time when the key is missing, rather than on the first
# API request.
api_key = os.getenv("OPENAI_API_KEY")
if api_key:
    # Configure the module-level OpenAI client that the rest of this file
    # uses through `openai.<resource>` calls.
    openai.api_key = api_key
else:
    raise ValueError("OPENAI_API_KEY environment variable is not set")
# Stylesheet handed to gr.Blocks(css=...): page background, the white
# "compact-box" cards, gradient buttons, chatbot/audio/textbox skins, and the
# overflow rules that keep a single page-level scrollbar.
custom_css = """
html, body, .gradio-container {
    height: 100vh !important;
    min-height: 100vh !important;
    max-width: 100vw !important;
    margin: 0 !important;
    padding: 0 !important;
    font-family: 'Inter', 'Segoe UI', Arial, sans-serif;
    background: #f4f7fb;
    color: #222;
}
.centered-main {
    display: flex;
    flex-direction: column;
    align-items: center;
    justify-content: flex-start;
    min-height: 100vh;
    width: 100vw;
    padding-top: 32px;
}
.compact-box {
    background: #fff;
    border-radius: 18px;
    box-shadow: 0 4px 24px rgba(0, 60, 180, 0.07), 0 1.5px 4px rgba(0,0,0,0.04);
    padding: 32px 32px 20px 32px;
    margin-bottom: 32px;
    width: 100%;
    max-width: 600px;
    margin-left: auto;
    margin-right: auto;
    border: 1.5px solid #e3e8f0;
}
.section-title {
    font-size: 1.25rem;
    font-weight: 700;
    margin-bottom: 18px;
    color: #1a237e;
    letter-spacing: 0.01em;
}
.upload-btn, .send-btn, .audio-btn, .reset-btn {
    background: linear-gradient(135deg, #1976D2 0%, #00bcd4 100%);
    color: white;
    border: none;
    padding: 12px 28px;
    border-radius: 24px;
    cursor: pointer;
    font-weight: 600;
    font-size: 16px;
    margin-top: 10px;
    margin-bottom: 10px;
    transition: all 0.2s;
    box-shadow: 0 2px 8px rgba(25, 118, 210, 0.08);
}
.upload-btn:hover, .send-btn:hover, .audio-btn:hover, .reset-btn:hover {
    background: linear-gradient(135deg, #00bcd4 0%, #1976D2 100%);
    box-shadow: 0 4px 16px rgba(0, 188, 212, 0.13);
}
.gradio-chatbot {
    border-radius: 14px !important;
    border: 1.5px solid #e3e8f0 !important;
    background: #f8fafc !important;
    padding: 12px !important;
    min-height: 350px !important;
    max-height: 400px !important;
    overflow-y: auto !important;
    margin-bottom: 10px;
}
.gradio-audio {
    margin-top: 12px;
    margin-bottom: 12px;
}
.textbox {
    border-radius: 12px !important;
    border: 1.5px solid #e3e8f0 !important;
    padding: 12px !important;
    font-size: 16px !important;
    margin-bottom: 10px;
    background: #f8fafc !important;
    color: #222 !important;
}
.textbox:focus {
    border-color: #1976D2 !important;
    box-shadow: 0 0 0 2px rgba(25, 118, 210, 0.13) !important;
}
.status-text {
    color: #1976D2;
    font-size: 15px;
    margin-top: 10px;
    font-weight: 500;
    background: #e3f2fd;
    border-radius: 8px;
    padding: 8px 12px;
}
/* File upload area */
input[type="file"]::-webkit-file-upload-button {
    background: #1976D2;
    color: #fff;
    border: none;
    border-radius: 8px;
    padding: 8px 18px;
    font-weight: 600;
    cursor: pointer;
}
input[type="file"]::-webkit-file-upload-button:hover {
    background: #00bcd4;
}
/* Only one main scroll */
body, .gradio-container, #root, #app {
    overflow: auto !important;
    height: 100vh !important;
}
#component-0, #component-1, #component-2, .chatbot, .chat-container {
    overflow: visible !important;
    height: auto !important;
    max-height: none !important;
}
"""
# Markup plus client-side script for the custom microphone recorder widget.
# Kept as a module-level constant so the markup can be inspected without
# building a Gradio component.
AUDIO_RECORDER_HTML = """
<div class="audio-recorder">
    <button id="recordButton" class="record-button">
        <span class="record-icon">\U0001F3A4</span>
        <span class="record-text">Start Recording</span>
    </button>
    <div id="recordingStatus" class="status-text"></div>
    <audio id="audioPlayback" controls style="display: none; margin-top: 10px;"></audio>
</div>
<script>
    let mediaRecorder;
    let audioChunks = [];
    let isRecording = false;
    const recordButton = document.getElementById('recordButton');
    const recordingStatus = document.getElementById('recordingStatus');
    const audioPlayback = document.getElementById('audioPlayback');
    recordButton.addEventListener('click', async () => {
        if (!isRecording) {
            try {
                const stream = await navigator.mediaDevices.getUserMedia({ audio: true });
                mediaRecorder = new MediaRecorder(stream);
                audioChunks = [];
                mediaRecorder.ondataavailable = (event) => {
                    audioChunks.push(event.data);
                };
                mediaRecorder.onstop = () => {
                    // Label the blob with the container the recorder really
                    // produced; MediaRecorder does not emit WAV.
                    const audioBlob = new Blob(audioChunks, {
                        type: mediaRecorder.mimeType || 'audio/webm'
                    });
                    const audioUrl = URL.createObjectURL(audioBlob);
                    audioPlayback.src = audioUrl;
                    audioPlayback.style.display = 'block';
                    const reader = new FileReader();
                    reader.readAsDataURL(audioBlob);
                    reader.onloadend = () => {
                        const base64Audio = reader.result;
                        // Deliver to this page's own message listener only,
                        // instead of broadcasting the audio to any embedder.
                        window.postMessage({
                            type: 'audio_data',
                            data: base64Audio
                        }, window.location.origin);
                    };
                };
                mediaRecorder.start();
                isRecording = true;
                recordButton.classList.add('recording');
                recordButton.querySelector('.record-text').textContent = 'Stop Recording';
                recordingStatus.textContent = 'Recording...';
            } catch (err) {
                console.error('Error accessing microphone:', err);
                recordingStatus.textContent = 'Error accessing microphone';
            }
        } else {
            mediaRecorder.stop();
            // Release the microphone so the browser stops capturing.
            mediaRecorder.stream.getTracks().forEach((track) => track.stop());
            isRecording = false;
            recordButton.classList.remove('recording');
            recordButton.querySelector('.record-text').textContent = 'Start Recording';
            recordingStatus.textContent = 'Recording saved';
        }
    });
</script>
"""


def create_audio_recorder():
    """Return a ``gr.HTML`` component containing the custom recorder widget.

    The widget toggles a browser ``MediaRecorder`` on button clicks, previews
    the recording in an ``<audio>`` element, and posts the recording as a
    base64 data URL in a ``{type: 'audio_data', data: ...}`` window message.

    NOTE(review): confirm that the installed Gradio version executes
    ``<script>`` tags embedded in ``gr.HTML``; if it does not, the button
    will render but stay inert.
    """
    return gr.HTML(AUDIO_RECORDER_HTML)
class AdvancedRAG:
    """Stateful helper around the OpenAI Assistants API for document Q&A.

    One instance tracks a single conversation: the current thread, the file
    uploaded for it, and the assistant (read from the ``ASSISTANT_ID``
    environment variable) that answers questions.
    """

    # Non-completed run states from which this class cannot make progress
    # (it never submits tool outputs, so "requires_action" is a dead end too).
    _DEAD_END_RUN_STATES = ("cancelled", "expired", "incomplete", "requires_action")
    _RUN_TIMEOUT_SECONDS = 60
    _POLL_INTERVAL_SECONDS = 0.2

    def __init__(self):
        self.thread_id: Optional[str] = None
        self.file_ids: List[str] = []
        self.assistant_id: Optional[str] = os.getenv("ASSISTANT_ID")
        # Always define the attribute; the previous hasattr() guard could
        # never be true inside __init__, so it was never created.
        self.vector_store_id: Optional[str] = None

    @staticmethod
    def _guess_suffix(data) -> str:
        """Pick a file extension for uploaded bytes from their magic number.

        Covers the formats the upload widget accepts (pdf, docx, doc) and
        falls back to ``.txt`` for everything else.
        """
        if b"%PDF" in data[:1024]:
            return ".pdf"
        if data[:4] == b"PK\x03\x04":  # ZIP container, i.e. .docx
            return ".docx"
        if data[:4] == b"\xd0\xcf\x11\xe0":  # OLE2 compound file, i.e. legacy .doc
            return ".doc"
        return ".txt"

    @staticmethod
    def _remove_quietly(path) -> None:
        """Delete a temp file; a failed cleanup must not mask the real result."""
        try:
            os.remove(path)
        except OSError:
            pass

    def create_thread(self) -> str:
        """Create a new thread, remember its id, and return that id."""
        thread = openai.beta.threads.create()
        self.thread_id = thread.id
        return self.thread_id

    def upload_document(self, file) -> str:
        """Replace the current document with ``file`` and start a fresh thread.

        Args:
            file: Raw bytes of the document to upload.

        Returns:
            The id of the newly uploaded OpenAI file.

        Raises:
            Exception: If ``file`` is empty/None, or if an OpenAI call fails.
        """
        # Validate first so bad input does not wipe the existing session.
        if not file:
            raise Exception("No file uploaded.")

        # Delete previous file from OpenAI if it exists
        if self.file_ids:
            for file_id in self.file_ids:
                try:
                    openai.files.delete(file_id)
                except Exception as e:
                    print(f"Warning: Could not delete file {file_id}: {e}")
            self.thread_id = None
            self.file_ids = []
            if self.vector_store_id is not None:
                try:
                    openai.beta.vector_stores.delete(self.vector_store_id)
                except Exception as e:
                    print(f"Warning: Could not delete vector store: {e}")
                self.vector_store_id = None
            # Wait a moment to ensure deletion is processed
            time.sleep(2)

        # Stage the bytes on disk under an extension that matches the content,
        # and close the handle before reopening it for the upload.
        with tempfile.NamedTemporaryFile(
            delete=False, suffix=self._guess_suffix(file)
        ) as tmp:
            tmp.write(file)
            tmp_path = tmp.name
        try:
            with open(tmp_path, "rb") as file_obj:
                uploaded = openai.files.create(file=file_obj, purpose="assistants")
        finally:
            self._remove_quietly(tmp_path)
        self.file_ids = [uploaded.id]

        # Create a new thread for the new document
        self.create_thread()
        # Send a message in the new thread with only the new file as an attachment
        openai.beta.threads.messages.create(
            thread_id=self.thread_id,
            role="user",
            content="I have uploaded a document. Please analyze it.",
            attachments=[
                {"file_id": self.file_ids[0], "tools": [{"type": "file_search"}]}
            ],
        )
        return self.file_ids[0]

    def _wait_for_run(self, run_id: str) -> None:
        """Poll a run until it completes; raise on failure, dead end, or timeout."""
        deadline = time.monotonic() + self._RUN_TIMEOUT_SECONDS
        while True:
            status = openai.beta.threads.runs.retrieve(
                thread_id=self.thread_id,
                run_id=run_id,
            ).status
            if status == "completed":
                return
            if status == "failed":
                raise Exception("Run failed")
            if status in self._DEAD_END_RUN_STATES:
                raise Exception(f"Run ended with status '{status}'.")
            if time.monotonic() >= deadline:
                raise Exception(
                    f"Run timed out after {self._RUN_TIMEOUT_SECONDS} seconds."
                )
            time.sleep(self._POLL_INTERVAL_SECONDS)

    @staticmethod
    def _message_text(message) -> str:
        """Join the text parts of a thread message, skipping non-text parts."""
        texts = [
            part.text.value
            for part in message.content
            if getattr(part, "text", None) is not None
        ]
        if not texts:
            return "No response received from the assistant."
        return "\n".join(texts)

    def ask_question(self, question: str) -> str:
        """Post ``question`` to the thread, run the assistant, return its reply.

        Never raises: any failure is returned as an ``"[Error: ...]"`` string.
        """
        try:
            if not self.assistant_id:
                raise Exception("ASSISTANT_ID environment variable is not set")
            if not self.thread_id:
                self.create_thread()
            # Add the question to the thread
            openai.beta.threads.messages.create(
                thread_id=self.thread_id,
                role="user",
                content=question,
            )
            # Create a run and wait for it to finish
            run = openai.beta.threads.runs.create(
                thread_id=self.thread_id,
                assistant_id=self.assistant_id,
            )
            self._wait_for_run(run.id)
            # Get the latest message
            messages = openai.beta.threads.messages.list(
                thread_id=self.thread_id,
                order='desc',
                limit=1,
            )
            if not messages.data:
                return "No response received from the assistant."
            return self._message_text(messages.data[0])
        except Exception as e:
            return f"[Error: {str(e)}]"

    def transcribe_audio(self, audio_file):
        """Transcribe a readable binary audio stream with ``whisper-1``.

        Returns the transcript text, or an ``"[Error transcribing audio: ...]"``
        string on failure. The staging temp file is removed in every case.
        """
        tmp_path = None
        try:
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
                tmp_path = tmp.name
                tmp.write(audio_file.read())
            with open(tmp_path, "rb") as audio:
                transcript = openai.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio,
                    language="en",
                )
            return transcript.text
        except Exception as e:
            return f"[Error transcribing audio: {str(e)}]"
        finally:
            if tmp_path is not None:
                self._remove_quietly(tmp_path)
# Single module-level RAG session; every Gradio callback in this file reads
# and mutates this one instance.
rag = AdvancedRAG()
def process_file(file):
    """Upload ``file`` through the shared ``rag`` session and report the outcome.

    Returns a human-readable status string in every case; upload errors are
    converted to text rather than raised.
    """
    if file is None:
        return "Please upload a file first."
    try:
        rag.upload_document(file)
    except Exception as e:
        return f"Error uploading file: {str(e)}"
    else:
        return "File uploaded successfully! You can now ask questions about the document."
def process_question(question, history):
    """Answer a typed question and extend the chat history.

    Args:
        question: Text from the question box.
        history: Chat transcript as a list of ``{"role", "content"}`` dicts;
            ``None`` is treated as an empty transcript.

    Returns:
        A 4-tuple ``(status_message, history, cleared_question, audio_value)``.
        ``status_message`` is empty except when no document has been uploaded.
    """
    history = history if history is not None else []
    # Prevent sending empty messages
    if not question or not question.strip():
        return "", history, "", None
    if not rag.thread_id:
        notice = "Please upload a document first."
        # Also show the notice in the chat so it is visible no matter which
        # component the first return value is wired to.
        history.append({"role": "assistant", "content": notice})
        return notice, history, "", None
    # Record the user's turn up front so it is kept even when answering fails.
    history.append({"role": "user", "content": question})
    try:
        response = rag.ask_question(question)
    except Exception as e:
        response = f"Error: {str(e)}"
    history.append({"role": "assistant", "content": response})
    return "", history, "", None
def synthesize_text(text):
    """Convert ``text`` to speech with gTTS and return the encoded audio bytes.

    Returns ``None`` when ``text`` is empty/blank or when synthesis fails; a
    failure is logged instead of being swallowed silently.
    """
    # Nothing to speak: skip the synthesis call entirely.
    if not text or not str(text).strip():
        return None
    try:
        buffer = io.BytesIO()
        gTTS(text).write_to_fp(buffer)
        return buffer.getvalue()
    except Exception as e:
        print(f"Warning: Could not synthesize speech: {e}")
        return None
# Microphone emoji shown in front of voice-note turns (written as an escape so
# it survives re-encoding of this file).
_MIC = "\U0001F3A4"
# Prefix of the error string that AdvancedRAG.transcribe_audio returns on failure.
_TRANSCRIBE_ERROR_PREFIX = "[Error transcribing audio:"


def _transcribe_voice_input(audio_file):
    """Transcribe a voice note given as a path, a (rate, ndarray) pair, or a stream."""
    # Filepath: open it as a binary stream.
    if isinstance(audio_file, str):
        with open(audio_file, "rb") as f:
            return rag.transcribe_audio(f)
    # (sample_rate, samples): write a temporary WAV, transcribe, then clean up.
    if (
        isinstance(audio_file, tuple)
        and len(audio_file) == 2
        and isinstance(audio_file[1], np.ndarray)
    ):
        import soundfile as sf

        sample_rate, audio_data = audio_file
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            wav_path = tmp.name
        try:
            sf.write(wav_path, audio_data, sample_rate)
            with open(wav_path, "rb") as f:
                return rag.transcribe_audio(f)
        finally:
            try:
                os.remove(wav_path)
            except OSError:
                pass  # best-effort cleanup
    # Anything else is assumed to already be a readable binary stream.
    return rag.transcribe_audio(audio_file)


def process_voice_note(audio_file, history):
    """Transcribe a voice note, ask it as a question, and voice the reply.

    Args:
        audio_file: Filepath string, ``(sample_rate, np.ndarray)`` tuple, or a
            readable binary stream; ``None`` when nothing was recorded.
        history: Chat transcript as a list of ``{"role", "content"}`` dicts;
            ``None`` is treated as an empty transcript.

    Returns:
        A 5-tuple ``(status_message, history, cleared_question, audio_input_value,
        tts_audio)`` where ``tts_audio`` is encoded speech bytes or ``None``.
    """
    history = history if history is not None else []
    if audio_file is None:
        return "Please record or upload an audio file.", history, "", None, None
    try:
        transcript = _transcribe_voice_input(audio_file)
        spoken = str(transcript).strip() if transcript else ""
        # transcribe_audio reports failures as an error *string*; never forward
        # that string to the assistant as if the user had said it.
        if not spoken or spoken.startswith(_TRANSCRIBE_ERROR_PREFIX):
            history.append({"role": "user", "content": f"{_MIC} [No audio detected or transcription failed]"})
            history.append({"role": "assistant", "content": "Sorry, I couldn't understand the audio. Please try again."})
            return "", history, "", None, None
        if not rag.thread_id:
            return "Please upload a document first.", history, "", None, None
        response = rag.ask_question(transcript)
        history.append({"role": "user", "content": f"{_MIC} {transcript}"})
        history.append({"role": "assistant", "content": response})
        tts_audio = synthesize_text(response)
        return "", history, "", None, tts_audio
    except Exception as e:
        history.append({"role": "user", "content": f"{_MIC} [Error transcribing audio: {str(e)}]"})
        history.append({"role": "assistant", "content": "It seems there was an error while transcribing audio due to a technical issue. If there's anything specific from the document or any other questions you have regarding the content, please let me know, and I can assist you with that information."})
        return "", history, "", None, None
def reset_all():
    """Forget the current thread and document on ``rag`` and blank the UI.

    Only the local references are cleared; no OpenAI-side resource is deleted
    here. The returned 5-tuple supplies empty values for the five output
    components this callback is wired to.
    """
    rag.thread_id = None
    # Reset each optional attribute only if the instance actually has it.
    for attr_name, cleared in (("file_ids", []), ("vector_store_id", None)):
        if hasattr(rag, attr_name):
            setattr(rag, attr_name, cleared)
    return "", [], "", None, None
# Build the Gradio UI: document/voice controls in the left column, the chat
# transcript in the right column. Components are created where they should
# appear; the earlier bare `audio_input` / `tts_output` / `chatbot` expression
# statements were no-ops and left the right-hand box empty.
with gr.Blocks(css=custom_css, title="Document Q&A System") as demo:
    gr.Markdown("""
    # <span style='color:#1976D2;'>Document Q&A System</span>
    <div style='text-align:center; color:#1976D2; margin-bottom:18px;'>Upload a document, record your voice, and chat!</div>
    """)
    with gr.Row():
        # Left: Document Q&A controls
        with gr.Column(scale=1, min_width=350):
            with gr.Group(elem_classes="compact-box"):
                gr.Markdown("<div class='section-title'>Document Q&A Controls</div>")
                file_input = gr.File(
                    label="Upload Document",
                    file_types=[".pdf", ".txt", ".doc", ".docx"],
                    file_count="single",
                    type="binary",
                    elem_classes="upload-btn",
                )
                mic_btn = gr.Button("\U0001F3A4 Record Voice", elem_classes="audio-btn")
                # elem_id lets the page script target this uploader specifically.
                audio_input = gr.Audio(
                    type="filepath",
                    label="Record or Upload Audio",
                    elem_id="voice-note-input",
                    elem_classes="gradio-audio",
                    visible=False,
                )
                send_voice_btn = gr.Button("Send Voice Note", elem_classes="send-btn", visible=False)
                reset_btn = gr.Button("Reset Chat & Upload New Document", elem_classes="reset-btn")
                file_output = gr.Textbox(label="Upload Status", interactive=False, elem_classes="textbox")
                question = gr.Textbox(
                    label="Type your question and press Enter",
                    placeholder="Ask a question about your document...",
                    elem_classes="textbox",
                )
                # NOTE(review): this player is created hidden and nothing in
                # this block makes it visible, so the synthesized reply is
                # never shown - confirm whether that is intended.
                tts_output = gr.Audio(label="Assistant Voice Reply", interactive=False, visible=False)
        # Right: Chatbot screen
        with gr.Column(scale=2, min_width=500):
            with gr.Group(elem_classes="compact-box"):
                chatbot = gr.Chatbot(height=400, elem_classes="gradio-chatbot", label=None, type="messages")

    def show_audio():
        """Reveal the audio recorder and its send button."""
        return {audio_input: gr.update(visible=True), send_voice_btn: gr.update(visible=True)}

    def hide_audio():
        """Hide the audio recorder and its send button again."""
        return {audio_input: gr.update(visible=False), send_voice_btn: gr.update(visible=False)}

    # Event wiring. The module-level reset_all is reused rather than redefined.
    file_input.change(process_file, file_input, file_output)
    reset_btn.click(reset_all, None, [file_output, chatbot, question, audio_input, tts_output])
    mic_btn.click(show_audio, None, [audio_input, send_voice_btn])
    send_voice_btn.click(process_voice_note, [audio_input, chatbot], [file_output, chatbot, question, audio_input, tts_output])
    send_voice_btn.click(hide_audio, None, [audio_input, send_voice_btn])
    # First output is the status box (not `question` twice) so that
    # "Please upload a document first." is actually displayed.
    question.submit(process_question, [question, chatbot], [file_output, chatbot, question, audio_input])

    # Page-load script: accept recordings posted by the custom recorder widget
    # and inject them into the voice-note uploader.
    demo.load(
        fn=None,
        inputs=None,
        outputs=None,
        js="""
        function() {
            window.addEventListener('message', function(event) {
                // Only accept recordings posted by this page itself.
                if (event.origin !== window.location.origin) { return; }
                if (!event.data || event.data.type !== 'audio_data') { return; }
                const audioData = event.data.data;
                const byteString = atob(audioData.split(',')[1]);
                const mimeString = audioData.split(',')[0].split(':')[1].split(';')[0];
                const ab = new ArrayBuffer(byteString.length);
                const ia = new Uint8Array(ab);
                for (let i = 0; i < byteString.length; i++) {
                    ia[i] = byteString.charCodeAt(i);
                }
                const blob = new Blob([ab], {type: mimeString});
                const file = new File([blob], "recording.wav", {type: mimeString});
                // Target the voice-note uploader, not the first file input on
                // the page (which is the document uploader).
                const audioInput = document.querySelector('#voice-note-input input[type="file"]');
                if (!audioInput) { return; }
                const dataTransfer = new DataTransfer();
                dataTransfer.items.add(file);
                audioInput.files = dataTransfer.files;
                audioInput.dispatchEvent(new Event('change', { bubbles: true }));
            });
        }
        """
    )
if __name__ == "__main__":
    # Launch settings when run as a script: bind to all interfaces on port
    # 7860, request a share link, and enable error display.
    launch_options = {
        "share": True,
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "show_error": True,
    }
    demo.launch(**launch_options)