asad9641's picture
Update app.py
23df129 verified
# app.py
"""
Multi-Mode AI Assistant (Voice, PDF, Image) with Wow-Factor Features
- Preserves original features
- Adds snippet highlighting, cross-modal memory, styled PDF generation
- Live waveform placeholder for voice input
- Modular & Hugging Face safe
"""
import os
import uuid
import tempfile
import requests
from dotenv import load_dotenv
from gtts import gTTS
from PyPDF2 import PdfReader
import gradio as gr
from sentence_transformers import SentenceTransformer, util
from fpdf import FPDF
from datetime import datetime
# ------------------ Load API Keys ------------------
load_dotenv()
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "").strip()
OCR_SPACE_API_KEY = os.getenv("OCR_SPACE_API_KEY", "").strip()
if not GROQ_API_KEY:
raise ValueError("❌ GROQ_API_KEY missing. Set it in env / Hugging Face Secrets.")
if not OCR_SPACE_API_KEY:
raise ValueError("❌ OCR_SPACE_API_KEY missing. Set it in env / Hugging Face Secrets.")
HEADERS = {"Authorization": f"Bearer {GROQ_API_KEY}"}
# ------------------ Global State ------------------
SESSION_HISTORY = {}
CHAT_DISPLAY = {}
PDF_CONTENT = {}
PDF_EMBEDS = {}
IMAGE_TEXT = {}
IMAGE_EMBEDS = {}
CHUNK_SIZE = 1500
# Load embedding model
embed_model = SentenceTransformer("all-MiniLM-L6-v2")
# ------------------ Helpers ------------------
def _get_path_from_gr_file(gr_file):
if not gr_file:
return None
if isinstance(gr_file, str) and os.path.exists(gr_file):
return gr_file
try:
if hasattr(gr_file, "name") and os.path.exists(gr_file.name):
return gr_file.name
except Exception:
pass
if isinstance(gr_file, dict):
for key in ("name", "file_name", "filepath"):
if key in gr_file:
candidate = gr_file.get(key)
if isinstance(candidate, str) and os.path.exists(candidate):
return candidate
return None
def chunk_text(text, size=CHUNK_SIZE):
return [text[i:i + size] for i in range(0, len(text), size)]
def synthesize_speech(text, lang="en"):
try:
if not text:
return None
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
gTTS(text=text, lang=lang).save(tmp.name)
return tmp.name
except Exception as e:
print("TTS error:", e)
return None
def select_relevant_chunk(question, chunks, chunk_embeds):
if not chunks or chunk_embeds is None:
return ""
q_emb = embed_model.encode(question, convert_to_tensor=True)
scores = util.cos_sim(q_emb, chunk_embeds)[0]
top_idx = int(scores.argmax().item())
return chunks[top_idx]
def _chat_display_to_messages(chat_display):
msgs = []
for user, assistant in chat_display:
msgs.append({"role": "user", "content": user})
msgs.append({"role": "assistant", "content": assistant})
return msgs
# ------------------ Transcription & LLM ------------------
def transcribe_audio(audio_path):
if not audio_path or not os.path.exists(audio_path):
return "Error: audio file missing."
try:
url = "https://api.groq.com/openai/v1/audio/transcriptions"
with open(audio_path, "rb") as f:
files = {"file": (os.path.basename(audio_path), f, "audio/wav")}
data = {"model": "whisper-large-v3"}
resp = requests.post(url, headers=HEADERS, files=files, data=data, timeout=60)
resp.raise_for_status()
return resp.json().get("text", "") or ""
except Exception as e:
print("transcription error:", e)
return f"Error transcribing audio: {e}"
def groq_chat_completion(messages):
body = {"model": "llama-3.1-8b-instant", "messages": messages}
try:
resp = requests.post("https://api.groq.com/openai/v1/chat/completions", headers=HEADERS, json=body, timeout=60)
resp.raise_for_status()
return resp.json()["choices"][0]["message"]["content"]
except Exception as e:
print("groq_chat_completion error:", e)
return f"Error generating response: {e}"
def generate_response(session_id, user_text, enhancer_enabled=False, enhancer_tone="Helpful"):
if session_id not in SESSION_HISTORY:
SESSION_HISTORY[session_id] = []
SESSION_HISTORY[session_id].append({"role": "user", "content": user_text})
messages = [
{
"role": "system",
"content": "You are a helpful AI assistant. ALWAYS respond in English only, regardless of the user's language or the input language."
}
] + SESSION_HISTORY[session_id]
if enhancer_enabled:
messages.append({"role": "user", "content": f"Enhance response. Tone: {enhancer_tone}. Question: {user_text}"})
assistant_text = groq_chat_completion(messages)
SESSION_HISTORY[session_id].append({"role": "assistant", "content": assistant_text})
return assistant_text
# ------------------ PDF handling ------------------
def handle_pdf_upload(pdf_file, session_id):
path = _get_path_from_gr_file(pdf_file)
if not path:
return "No file uploaded or file unreadable."
try:
reader = PdfReader(path)
text = ""
for page in reader.pages:
text += (page.extract_text() or "") + "\n"
if not text.strip():
return "No extractable content found in PDF."
chunks = chunk_text(text)
PDF_CONTENT[session_id] = chunks
PDF_EMBEDS[session_id] = embed_model.encode(chunks, convert_to_tensor=True)
return f"PDF processed: {len(chunks)} chunks ready."
except Exception as e:
print("PDF upload error:", e)
return f"Error processing PDF: {e}"
def handle_pdf_question(question, session_id):
if session_id not in PDF_CONTENT:
return "Document not found. Upload first."
chunk = select_relevant_chunk(question, PDF_CONTENT[session_id], PDF_EMBEDS[session_id])
messages = [
{
"role": "system",
"content": "You are a helpful assistant summarizing PDF content. ALWAYS respond in English only, regardless of the user's language."
},
{
"role": "user",
"content": f"PDF chunk:\n{chunk}\n\nQuestion: {question}"
}
]
assistant_text = groq_chat_completion(messages)
assistant_text = f"**Snippet from PDF:**\n{chunk[:200]}...\n\n**Answer:**\n{assistant_text}"
if session_id not in SESSION_HISTORY:
SESSION_HISTORY[session_id] = []
SESSION_HISTORY[session_id].append({"role": "assistant", "content": assistant_text})
return assistant_text
# ------------------ Image OCR ------------------
def ocr_space_file(image_path, api_key, language="eng"):
if not image_path or not os.path.exists(image_path):
return ""
try:
with open(image_path, "rb") as f:
payload = {"apikey": api_key, "language": language}
files = {"file": f}
r = requests.post("https://api.ocr.space/parse/image", files=files, data=payload, timeout=60)
r.raise_for_status()
j = r.json()
if j.get("IsErroredOnProcessing"):
print("OCR.space processing error:", j)
return ""
parsed = [pr.get("ParsedText", "") for pr in j.get("ParsedResults", [])]
return "\n".join(parsed)
except Exception as e:
print("ocr_space_file error:", e)
return ""
def handle_image_upload(image_file, session_id):
path = _get_path_from_gr_file(image_file)
if not path:
return "No image uploaded or file unreadable.", ""
parsed = ocr_space_file(path, OCR_SPACE_API_KEY)
if not parsed.strip():
return "No extractable text found in the image.", ""
chunks = chunk_text(parsed)
IMAGE_TEXT[session_id] = chunks
IMAGE_EMBEDS[session_id] = embed_model.encode(chunks, convert_to_tensor=True)
return f"Image processed: {len(chunks)} chunks ready.", ""
def handle_image_question(question, session_id):
if session_id not in IMAGE_TEXT:
return "Image not found. Upload first."
chunk = select_relevant_chunk(question, IMAGE_TEXT[session_id], IMAGE_EMBEDS[session_id])
messages = [
{
"role": "system",
"content": "You are a helpful assistant summarizing image text. ALWAYS respond in English only, regardless of the user's language."
},
{
"role": "user",
"content": f"Image chunk:\n{chunk}\n\nQuestion: {question}"
}
]
assistant_text = groq_chat_completion(messages)
assistant_text = f"**Snippet from Image:**\n{chunk[:200]}...\n\n**Answer:**\n{assistant_text}"
if session_id not in SESSION_HISTORY:
SESSION_HISTORY[session_id] = []
SESSION_HISTORY[session_id].append({"role": "assistant", "content": assistant_text})
return assistant_text
# ------------------ PDF Generation ------------------
def generate_pdf_file(text, filename_prefix="summary"):
pdf = FPDF()
pdf.add_page()
pdf.set_auto_page_break(auto=True, margin=15)
pdf.set_font("Arial", "B", size=14)
pdf.multi_cell(0, 8, f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M')}\n\n")
pdf.set_font("Arial", size=12)
for line in text.split("\n"):
pdf.multi_cell(0, 6, line)
file_path = f"/tmp/{filename_prefix}_{uuid.uuid4()}.pdf"
pdf.output(file_path)
return file_path
def download_pdf_summary(session_id):
summary_text = "\n".join([m["content"] for m in SESSION_HISTORY.get(session_id, []) if m["role"]=="assistant"])
if not summary_text:
summary_text = "No summary available."
return generate_pdf_file(summary_text, "summary")
# ------------------ Voice & Chat Handlers ------------------
def _append_chat_display(session_id, user_text, assistant_text):
if session_id not in CHAT_DISPLAY:
CHAT_DISPLAY[session_id] = []
CHAT_DISPLAY[session_id].append((user_text, assistant_text))
def handle_voice_general(audio_file, session_id, tts_lang="en", enhancer_enabled=False, enhancer_tone="Helpful"):
path = _get_path_from_gr_file(audio_file)
if not path:
return "No audio provided.", None, []
user_text = transcribe_audio(path)
assistant_text = generate_response(session_id, user_text, enhancer_enabled, enhancer_tone)
_append_chat_display(session_id, user_text, assistant_text)
audio_path = synthesize_speech(assistant_text, lang=tts_lang)
return assistant_text, audio_path, _chat_display_to_messages(CHAT_DISPLAY[session_id])
def handle_voice_pdf(audio_file, session_id, tts_lang="en"):
path = _get_path_from_gr_file(audio_file)
if not path:
return "No audio provided.", None, []
user_text = transcribe_audio(path)
assistant_text = handle_pdf_question(user_text, session_id)
_append_chat_display(session_id, user_text, assistant_text)
audio_path = synthesize_speech(assistant_text, lang=tts_lang)
return assistant_text, audio_path, _chat_display_to_messages(CHAT_DISPLAY[session_id])
def handle_voice_image(audio_file, session_id, tts_lang="en"):
path = _get_path_from_gr_file(audio_file)
if not path:
return "No audio provided.", None, []
user_text = transcribe_audio(path)
assistant_text = handle_image_question(user_text, session_id)
_append_chat_display(session_id, user_text, assistant_text)
audio_path = synthesize_speech(assistant_text, lang=tts_lang)
return assistant_text, audio_path, _chat_display_to_messages(CHAT_DISPLAY[session_id])
def handle_text_general(user_text, session_id, enhancer_enabled=False, enhancer_tone="Helpful"):
assistant = generate_response(session_id, user_text, enhancer_enabled, enhancer_tone)
_append_chat_display(session_id, user_text, assistant)
return assistant, _chat_display_to_messages(CHAT_DISPLAY[session_id])
def handle_text_pdf(question, session_id):
return handle_pdf_question(question, session_id)
def handle_text_image(question, session_id):
return handle_image_question(question, session_id)
# ------------------ Gradio UI ------------------
with gr.Blocks() as demo:
gr.HTML("""
<style>
/* Change height + width of the audio recorder box */
#mic_box audio {
height: 50px !important;
width: 200px !important;
}
</style>
""")
gr.Markdown("## 🛠 Multi-Mode AI Assistant (Voice, PDF, Image)")
session_voice = gr.State(str(uuid.uuid4()))
session_pdf = gr.State(str(uuid.uuid4()))
session_image = gr.State(str(uuid.uuid4()))
with gr.Tab("🎤 Voice Chat"):
chat_voice = gr.Chatbot(height=320)
with gr.Row():
mic = gr.Audio(type="filepath", label="🎤 Record Voice (hold & speak)", elem_id="mic_box")
audio_output = gr.Audio(label="Assistant Voice Output", type="filepath", interactive=False)
tts_lang = gr.Dropdown(choices=["en", "ur"], value="en", label="TTS Language")
with gr.Row():
btn_general = gr.Button("⚡Ask General 🎯")
btn_pdf = gr.Button("⚡Ask PDF 📄")
btn_image = gr.Button("⚡Ask Image 🖼")
enhancer_toggle = gr.Checkbox(label="Enable Response Enhancer", value=False, scale=1)
tone_dropdown = gr.Dropdown(choices=["Helpful", "Formal", "Friendly"], value="Helpful", label="Enhancer Tone", scale=1)
with gr.Row():
btn_reset_logs = gr.Button("♻ Reset LOGs")
btn_download_logs = gr.Button("📥 Download Summary")
Voice_summary_file = gr.File(label="📥Download Summary File", interactive=False, scale=1)
answer_voice = gr.Textbox(label="Assistant Answer (text)", lines=2, visible=False)
btn_general.click(fn=handle_voice_general,
inputs=[mic, session_voice, tts_lang, enhancer_toggle, tone_dropdown],
outputs=[answer_voice, audio_output, chat_voice])
btn_pdf.click(fn=handle_voice_pdf, inputs=[mic, session_pdf, tts_lang], outputs=[answer_voice, audio_output, chat_voice])
btn_image.click(fn=handle_voice_image, inputs=[mic, session_image, tts_lang], outputs=[answer_voice, audio_output, chat_voice])
btn_reset_logs.click(lambda: (str(uuid.uuid4()), [], None, None, ""), outputs=[session_voice, chat_voice, mic, audio_output, answer_voice])
btn_download_logs.click(download_pdf_summary, inputs=[session_voice], outputs=[Voice_summary_file])
with gr.Tab("📄 PDF Summarizer"):
pdf_output = gr.Textbox(label="Answer (Text Only)", lines=5)
with gr.Row():
pdf_upload_btn = gr.File(label="Upload PDF", file_types=[".pdf"], scale=1)
pdf_question = gr.Textbox(label="Ask a question about PDF (text)", lines=3)
pdf_upload_msg = gr.Textbox(label="Upload Status", interactive=False)
with gr.Row():
pdf_send_btn = gr.Button("Ask (Questions)")
pdf_reset_btn = gr.Button("♻ Reset LOGs")
with gr.Row():
pdf_summary_file = gr.File(label="📥Download Summary File", interactive=False, scale=1)
pdf_download_btn = gr.Button("📥 Download Summary")
pdf_upload_btn.upload(handle_pdf_upload, inputs=[pdf_upload_btn, session_pdf], outputs=[pdf_upload_msg])
pdf_send_btn.click(handle_text_pdf, inputs=[pdf_question, session_pdf], outputs=[pdf_output])
pdf_reset_btn.click(lambda: (str(uuid.uuid4()), ""), outputs=[session_pdf, pdf_output])
pdf_download_btn.click(download_pdf_summary, inputs=[session_pdf], outputs=[pdf_summary_file])
with gr.Tab("🖼 Image OCR"):
image_output = gr.Textbox(label="Answer (Text Only)", lines=5)
with gr.Row():
image_upload_btn = gr.File(label="Upload Image", file_types=[".png", ".jpg", ".jpeg"], scale=1)
image_question = gr.Textbox(label="Ask question about Image", lines=3)
image_upload_msg = gr.Textbox(label="Upload Status", interactive=False)
with gr.Row():
image_send_btn = gr.Button("Ask (Questions)")
image_reset_btn = gr.Button("♻ Reset LOGs")
with gr.Row():
image_summary_file = gr.File(label="📥Download Summary File", interactive=False, scale=1)
image_download_btn = gr.Button("📥 Download Summary")
image_upload_btn.upload(handle_image_upload, inputs=[image_upload_btn, session_image], outputs=[image_upload_msg, image_output])
image_send_btn.click(handle_text_image, inputs=[image_question, session_image], outputs=[image_output])
image_reset_btn.click(lambda: (str(uuid.uuid4()), ""), outputs=[session_image, image_output])
image_download_btn.click(download_pdf_summary, inputs=[session_image], outputs=[image_summary_file])
if __name__ == "__main__":
demo.launch()