Spaces:
Sleeping
Sleeping
| # app.py | |
| """ | |
| Multi-Mode AI Assistant (Voice, PDF, Image) with Wow-Factor Features | |
| - Preserves original features | |
| - Adds snippet highlighting, cross-modal memory, styled PDF generation | |
| - Live waveform placeholder for voice input | |
| - Modular & Hugging Face safe | |
| """ | |
| import os | |
| import uuid | |
| import tempfile | |
| import requests | |
| from dotenv import load_dotenv | |
| from gtts import gTTS | |
| from PyPDF2 import PdfReader | |
| import gradio as gr | |
| from sentence_transformers import SentenceTransformer, util | |
| from fpdf import FPDF | |
| from datetime import datetime | |
| # ------------------ Load API Keys ------------------ | |
| load_dotenv() | |
| GROQ_API_KEY = os.getenv("GROQ_API_KEY", "").strip() | |
| OCR_SPACE_API_KEY = os.getenv("OCR_SPACE_API_KEY", "").strip() | |
| if not GROQ_API_KEY: | |
| raise ValueError("❌ GROQ_API_KEY missing. Set it in env / Hugging Face Secrets.") | |
| if not OCR_SPACE_API_KEY: | |
| raise ValueError("❌ OCR_SPACE_API_KEY missing. Set it in env / Hugging Face Secrets.") | |
| HEADERS = {"Authorization": f"Bearer {GROQ_API_KEY}"} | |
| # ------------------ Global State ------------------ | |
| SESSION_HISTORY = {} | |
| CHAT_DISPLAY = {} | |
| PDF_CONTENT = {} | |
| PDF_EMBEDS = {} | |
| IMAGE_TEXT = {} | |
| IMAGE_EMBEDS = {} | |
| CHUNK_SIZE = 1500 | |
| # Load embedding model | |
| embed_model = SentenceTransformer("all-MiniLM-L6-v2") | |
| # ------------------ Helpers ------------------ | |
| def _get_path_from_gr_file(gr_file): | |
| if not gr_file: | |
| return None | |
| if isinstance(gr_file, str) and os.path.exists(gr_file): | |
| return gr_file | |
| try: | |
| if hasattr(gr_file, "name") and os.path.exists(gr_file.name): | |
| return gr_file.name | |
| except Exception: | |
| pass | |
| if isinstance(gr_file, dict): | |
| for key in ("name", "file_name", "filepath"): | |
| if key in gr_file: | |
| candidate = gr_file.get(key) | |
| if isinstance(candidate, str) and os.path.exists(candidate): | |
| return candidate | |
| return None | |
| def chunk_text(text, size=CHUNK_SIZE): | |
| return [text[i:i + size] for i in range(0, len(text), size)] | |
| def synthesize_speech(text, lang="en"): | |
| try: | |
| if not text: | |
| return None | |
| tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") | |
| gTTS(text=text, lang=lang).save(tmp.name) | |
| return tmp.name | |
| except Exception as e: | |
| print("TTS error:", e) | |
| return None | |
| def select_relevant_chunk(question, chunks, chunk_embeds): | |
| if not chunks or chunk_embeds is None: | |
| return "" | |
| q_emb = embed_model.encode(question, convert_to_tensor=True) | |
| scores = util.cos_sim(q_emb, chunk_embeds)[0] | |
| top_idx = int(scores.argmax().item()) | |
| return chunks[top_idx] | |
| def _chat_display_to_messages(chat_display): | |
| msgs = [] | |
| for user, assistant in chat_display: | |
| msgs.append({"role": "user", "content": user}) | |
| msgs.append({"role": "assistant", "content": assistant}) | |
| return msgs | |
| # ------------------ Transcription & LLM ------------------ | |
| def transcribe_audio(audio_path): | |
| if not audio_path or not os.path.exists(audio_path): | |
| return "Error: audio file missing." | |
| try: | |
| url = "https://api.groq.com/openai/v1/audio/transcriptions" | |
| with open(audio_path, "rb") as f: | |
| files = {"file": (os.path.basename(audio_path), f, "audio/wav")} | |
| data = {"model": "whisper-large-v3"} | |
| resp = requests.post(url, headers=HEADERS, files=files, data=data, timeout=60) | |
| resp.raise_for_status() | |
| return resp.json().get("text", "") or "" | |
| except Exception as e: | |
| print("transcription error:", e) | |
| return f"Error transcribing audio: {e}" | |
| def groq_chat_completion(messages): | |
| body = {"model": "llama-3.1-8b-instant", "messages": messages} | |
| try: | |
| resp = requests.post("https://api.groq.com/openai/v1/chat/completions", headers=HEADERS, json=body, timeout=60) | |
| resp.raise_for_status() | |
| return resp.json()["choices"][0]["message"]["content"] | |
| except Exception as e: | |
| print("groq_chat_completion error:", e) | |
| return f"Error generating response: {e}" | |
| def generate_response(session_id, user_text, enhancer_enabled=False, enhancer_tone="Helpful"): | |
| if session_id not in SESSION_HISTORY: | |
| SESSION_HISTORY[session_id] = [] | |
| SESSION_HISTORY[session_id].append({"role": "user", "content": user_text}) | |
| messages = [ | |
| { | |
| "role": "system", | |
| "content": "You are a helpful AI assistant. ALWAYS respond in English only, regardless of the user's language or the input language." | |
| } | |
| ] + SESSION_HISTORY[session_id] | |
| if enhancer_enabled: | |
| messages.append({"role": "user", "content": f"Enhance response. Tone: {enhancer_tone}. Question: {user_text}"}) | |
| assistant_text = groq_chat_completion(messages) | |
| SESSION_HISTORY[session_id].append({"role": "assistant", "content": assistant_text}) | |
| return assistant_text | |
| # ------------------ PDF handling ------------------ | |
| def handle_pdf_upload(pdf_file, session_id): | |
| path = _get_path_from_gr_file(pdf_file) | |
| if not path: | |
| return "No file uploaded or file unreadable." | |
| try: | |
| reader = PdfReader(path) | |
| text = "" | |
| for page in reader.pages: | |
| text += (page.extract_text() or "") + "\n" | |
| if not text.strip(): | |
| return "No extractable content found in PDF." | |
| chunks = chunk_text(text) | |
| PDF_CONTENT[session_id] = chunks | |
| PDF_EMBEDS[session_id] = embed_model.encode(chunks, convert_to_tensor=True) | |
| return f"PDF processed: {len(chunks)} chunks ready." | |
| except Exception as e: | |
| print("PDF upload error:", e) | |
| return f"Error processing PDF: {e}" | |
| def handle_pdf_question(question, session_id): | |
| if session_id not in PDF_CONTENT: | |
| return "Document not found. Upload first." | |
| chunk = select_relevant_chunk(question, PDF_CONTENT[session_id], PDF_EMBEDS[session_id]) | |
| messages = [ | |
| { | |
| "role": "system", | |
| "content": "You are a helpful assistant summarizing PDF content. ALWAYS respond in English only, regardless of the user's language." | |
| }, | |
| { | |
| "role": "user", | |
| "content": f"PDF chunk:\n{chunk}\n\nQuestion: {question}" | |
| } | |
| ] | |
| assistant_text = groq_chat_completion(messages) | |
| assistant_text = f"**Snippet from PDF:**\n{chunk[:200]}...\n\n**Answer:**\n{assistant_text}" | |
| if session_id not in SESSION_HISTORY: | |
| SESSION_HISTORY[session_id] = [] | |
| SESSION_HISTORY[session_id].append({"role": "assistant", "content": assistant_text}) | |
| return assistant_text | |
| # ------------------ Image OCR ------------------ | |
| def ocr_space_file(image_path, api_key, language="eng"): | |
| if not image_path or not os.path.exists(image_path): | |
| return "" | |
| try: | |
| with open(image_path, "rb") as f: | |
| payload = {"apikey": api_key, "language": language} | |
| files = {"file": f} | |
| r = requests.post("https://api.ocr.space/parse/image", files=files, data=payload, timeout=60) | |
| r.raise_for_status() | |
| j = r.json() | |
| if j.get("IsErroredOnProcessing"): | |
| print("OCR.space processing error:", j) | |
| return "" | |
| parsed = [pr.get("ParsedText", "") for pr in j.get("ParsedResults", [])] | |
| return "\n".join(parsed) | |
| except Exception as e: | |
| print("ocr_space_file error:", e) | |
| return "" | |
| def handle_image_upload(image_file, session_id): | |
| path = _get_path_from_gr_file(image_file) | |
| if not path: | |
| return "No image uploaded or file unreadable.", "" | |
| parsed = ocr_space_file(path, OCR_SPACE_API_KEY) | |
| if not parsed.strip(): | |
| return "No extractable text found in the image.", "" | |
| chunks = chunk_text(parsed) | |
| IMAGE_TEXT[session_id] = chunks | |
| IMAGE_EMBEDS[session_id] = embed_model.encode(chunks, convert_to_tensor=True) | |
| return f"Image processed: {len(chunks)} chunks ready.", "" | |
| def handle_image_question(question, session_id): | |
| if session_id not in IMAGE_TEXT: | |
| return "Image not found. Upload first." | |
| chunk = select_relevant_chunk(question, IMAGE_TEXT[session_id], IMAGE_EMBEDS[session_id]) | |
| messages = [ | |
| { | |
| "role": "system", | |
| "content": "You are a helpful assistant summarizing image text. ALWAYS respond in English only, regardless of the user's language." | |
| }, | |
| { | |
| "role": "user", | |
| "content": f"Image chunk:\n{chunk}\n\nQuestion: {question}" | |
| } | |
| ] | |
| assistant_text = groq_chat_completion(messages) | |
| assistant_text = f"**Snippet from Image:**\n{chunk[:200]}...\n\n**Answer:**\n{assistant_text}" | |
| if session_id not in SESSION_HISTORY: | |
| SESSION_HISTORY[session_id] = [] | |
| SESSION_HISTORY[session_id].append({"role": "assistant", "content": assistant_text}) | |
| return assistant_text | |
| # ------------------ PDF Generation ------------------ | |
| def generate_pdf_file(text, filename_prefix="summary"): | |
| pdf = FPDF() | |
| pdf.add_page() | |
| pdf.set_auto_page_break(auto=True, margin=15) | |
| pdf.set_font("Arial", "B", size=14) | |
| pdf.multi_cell(0, 8, f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M')}\n\n") | |
| pdf.set_font("Arial", size=12) | |
| for line in text.split("\n"): | |
| pdf.multi_cell(0, 6, line) | |
| file_path = f"/tmp/{filename_prefix}_{uuid.uuid4()}.pdf" | |
| pdf.output(file_path) | |
| return file_path | |
| def download_pdf_summary(session_id): | |
| summary_text = "\n".join([m["content"] for m in SESSION_HISTORY.get(session_id, []) if m["role"]=="assistant"]) | |
| if not summary_text: | |
| summary_text = "No summary available." | |
| return generate_pdf_file(summary_text, "summary") | |
| # ------------------ Voice & Chat Handlers ------------------ | |
| def _append_chat_display(session_id, user_text, assistant_text): | |
| if session_id not in CHAT_DISPLAY: | |
| CHAT_DISPLAY[session_id] = [] | |
| CHAT_DISPLAY[session_id].append((user_text, assistant_text)) | |
| def handle_voice_general(audio_file, session_id, tts_lang="en", enhancer_enabled=False, enhancer_tone="Helpful"): | |
| path = _get_path_from_gr_file(audio_file) | |
| if not path: | |
| return "No audio provided.", None, [] | |
| user_text = transcribe_audio(path) | |
| assistant_text = generate_response(session_id, user_text, enhancer_enabled, enhancer_tone) | |
| _append_chat_display(session_id, user_text, assistant_text) | |
| audio_path = synthesize_speech(assistant_text, lang=tts_lang) | |
| return assistant_text, audio_path, _chat_display_to_messages(CHAT_DISPLAY[session_id]) | |
| def handle_voice_pdf(audio_file, session_id, tts_lang="en"): | |
| path = _get_path_from_gr_file(audio_file) | |
| if not path: | |
| return "No audio provided.", None, [] | |
| user_text = transcribe_audio(path) | |
| assistant_text = handle_pdf_question(user_text, session_id) | |
| _append_chat_display(session_id, user_text, assistant_text) | |
| audio_path = synthesize_speech(assistant_text, lang=tts_lang) | |
| return assistant_text, audio_path, _chat_display_to_messages(CHAT_DISPLAY[session_id]) | |
| def handle_voice_image(audio_file, session_id, tts_lang="en"): | |
| path = _get_path_from_gr_file(audio_file) | |
| if not path: | |
| return "No audio provided.", None, [] | |
| user_text = transcribe_audio(path) | |
| assistant_text = handle_image_question(user_text, session_id) | |
| _append_chat_display(session_id, user_text, assistant_text) | |
| audio_path = synthesize_speech(assistant_text, lang=tts_lang) | |
| return assistant_text, audio_path, _chat_display_to_messages(CHAT_DISPLAY[session_id]) | |
| def handle_text_general(user_text, session_id, enhancer_enabled=False, enhancer_tone="Helpful"): | |
| assistant = generate_response(session_id, user_text, enhancer_enabled, enhancer_tone) | |
| _append_chat_display(session_id, user_text, assistant) | |
| return assistant, _chat_display_to_messages(CHAT_DISPLAY[session_id]) | |
| def handle_text_pdf(question, session_id): | |
| return handle_pdf_question(question, session_id) | |
| def handle_text_image(question, session_id): | |
| return handle_image_question(question, session_id) | |
| # ------------------ Gradio UI ------------------ | |
| with gr.Blocks() as demo: | |
| gr.HTML(""" | |
| <style> | |
| /* Change height + width of the audio recorder box */ | |
| #mic_box audio { | |
| height: 50px !important; | |
| width: 200px !important; | |
| } | |
| </style> | |
| """) | |
| gr.Markdown("## 🛠 Multi-Mode AI Assistant (Voice, PDF, Image)") | |
| session_voice = gr.State(str(uuid.uuid4())) | |
| session_pdf = gr.State(str(uuid.uuid4())) | |
| session_image = gr.State(str(uuid.uuid4())) | |
| with gr.Tab("🎤 Voice Chat"): | |
| chat_voice = gr.Chatbot(height=320) | |
| with gr.Row(): | |
| mic = gr.Audio(type="filepath", label="🎤 Record Voice (hold & speak)", elem_id="mic_box") | |
| audio_output = gr.Audio(label="Assistant Voice Output", type="filepath", interactive=False) | |
| tts_lang = gr.Dropdown(choices=["en", "ur"], value="en", label="TTS Language") | |
| with gr.Row(): | |
| btn_general = gr.Button("⚡Ask General 🎯") | |
| btn_pdf = gr.Button("⚡Ask PDF 📄") | |
| btn_image = gr.Button("⚡Ask Image 🖼") | |
| enhancer_toggle = gr.Checkbox(label="Enable Response Enhancer", value=False, scale=1) | |
| tone_dropdown = gr.Dropdown(choices=["Helpful", "Formal", "Friendly"], value="Helpful", label="Enhancer Tone", scale=1) | |
| with gr.Row(): | |
| btn_reset_logs = gr.Button("♻ Reset LOGs") | |
| btn_download_logs = gr.Button("📥 Download Summary") | |
| Voice_summary_file = gr.File(label="📥Download Summary File", interactive=False, scale=1) | |
| answer_voice = gr.Textbox(label="Assistant Answer (text)", lines=2, visible=False) | |
| btn_general.click(fn=handle_voice_general, | |
| inputs=[mic, session_voice, tts_lang, enhancer_toggle, tone_dropdown], | |
| outputs=[answer_voice, audio_output, chat_voice]) | |
| btn_pdf.click(fn=handle_voice_pdf, inputs=[mic, session_pdf, tts_lang], outputs=[answer_voice, audio_output, chat_voice]) | |
| btn_image.click(fn=handle_voice_image, inputs=[mic, session_image, tts_lang], outputs=[answer_voice, audio_output, chat_voice]) | |
| btn_reset_logs.click(lambda: (str(uuid.uuid4()), [], None, None, ""), outputs=[session_voice, chat_voice, mic, audio_output, answer_voice]) | |
| btn_download_logs.click(download_pdf_summary, inputs=[session_voice], outputs=[Voice_summary_file]) | |
| with gr.Tab("📄 PDF Summarizer"): | |
| pdf_output = gr.Textbox(label="Answer (Text Only)", lines=5) | |
| with gr.Row(): | |
| pdf_upload_btn = gr.File(label="Upload PDF", file_types=[".pdf"], scale=1) | |
| pdf_question = gr.Textbox(label="Ask a question about PDF (text)", lines=3) | |
| pdf_upload_msg = gr.Textbox(label="Upload Status", interactive=False) | |
| with gr.Row(): | |
| pdf_send_btn = gr.Button("Ask (Questions)") | |
| pdf_reset_btn = gr.Button("♻ Reset LOGs") | |
| with gr.Row(): | |
| pdf_summary_file = gr.File(label="📥Download Summary File", interactive=False, scale=1) | |
| pdf_download_btn = gr.Button("📥 Download Summary") | |
| pdf_upload_btn.upload(handle_pdf_upload, inputs=[pdf_upload_btn, session_pdf], outputs=[pdf_upload_msg]) | |
| pdf_send_btn.click(handle_text_pdf, inputs=[pdf_question, session_pdf], outputs=[pdf_output]) | |
| pdf_reset_btn.click(lambda: (str(uuid.uuid4()), ""), outputs=[session_pdf, pdf_output]) | |
| pdf_download_btn.click(download_pdf_summary, inputs=[session_pdf], outputs=[pdf_summary_file]) | |
| with gr.Tab("🖼 Image OCR"): | |
| image_output = gr.Textbox(label="Answer (Text Only)", lines=5) | |
| with gr.Row(): | |
| image_upload_btn = gr.File(label="Upload Image", file_types=[".png", ".jpg", ".jpeg"], scale=1) | |
| image_question = gr.Textbox(label="Ask question about Image", lines=3) | |
| image_upload_msg = gr.Textbox(label="Upload Status", interactive=False) | |
| with gr.Row(): | |
| image_send_btn = gr.Button("Ask (Questions)") | |
| image_reset_btn = gr.Button("♻ Reset LOGs") | |
| with gr.Row(): | |
| image_summary_file = gr.File(label="📥Download Summary File", interactive=False, scale=1) | |
| image_download_btn = gr.Button("📥 Download Summary") | |
| image_upload_btn.upload(handle_image_upload, inputs=[image_upload_btn, session_image], outputs=[image_upload_msg, image_output]) | |
| image_send_btn.click(handle_text_image, inputs=[image_question, session_image], outputs=[image_output]) | |
| image_reset_btn.click(lambda: (str(uuid.uuid4()), ""), outputs=[session_image, image_output]) | |
| image_download_btn.click(download_pdf_summary, inputs=[session_image], outputs=[image_summary_file]) | |
| if __name__ == "__main__": | |
| demo.launch() | |