# app.py
"""
Multi-Mode AI Assistant (Voice, PDF, Image) with Wow-Factor Features
- Preserves original features
- Adds snippet highlighting, cross-modal memory, styled PDF generation
- Live waveform placeholder for voice input
- Modular & Hugging Face safe
"""
import os
import uuid
import tempfile
import requests
from dotenv import load_dotenv
from gtts import gTTS
from PyPDF2 import PdfReader
import gradio as gr
from sentence_transformers import SentenceTransformer, util
from fpdf import FPDF
from datetime import datetime

# ------------------ Load API Keys ------------------
load_dotenv()
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "").strip()
OCR_SPACE_API_KEY = os.getenv("OCR_SPACE_API_KEY", "").strip()

if not GROQ_API_KEY:
    raise ValueError("❌ GROQ_API_KEY missing. Set it in env / Hugging Face Secrets.")
if not OCR_SPACE_API_KEY:
    raise ValueError("❌ OCR_SPACE_API_KEY missing. Set it in env / Hugging Face Secrets.")

HEADERS = {"Authorization": f"Bearer {GROQ_API_KEY}"}

# ------------------ Global State ------------------
SESSION_HISTORY = {}
CHAT_DISPLAY = {}
PDF_CONTENT = {}
PDF_EMBEDS = {}
IMAGE_TEXT = {}
IMAGE_EMBEDS = {}
CHUNK_SIZE = 1500
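# All of the dicts above are keyed by a per-tab session id (a uuid4 held in
# gr.State below), so concurrent tabs and users never share chat history or
# document embeddings.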

# Load embedding model
embed_model = SentenceTransformer("all-MiniLM-L6-v2")

# ------------------ Helpers ------------------
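# Gradio has returned uploaded files as a plain path string, a tempfile-like
# object with a .name attribute, or a dict, depending on version; normalize
# all three shapes to a verified filesystem path (or None).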
def _get_path_from_gr_file(gr_file):
    if not gr_file:
        return None
    if isinstance(gr_file, str) and os.path.exists(gr_file):
        return gr_file
    try:
        if hasattr(gr_file, "name") and os.path.exists(gr_file.name):
            return gr_file.name
    except Exception:
        pass
    if isinstance(gr_file, dict):
        for key in ("name", "file_name", "filepath"):
            if key in gr_file:
                candidate = gr_file.get(key)
                if isinstance(candidate, str) and os.path.exists(candidate):
                    return candidate
    return None

def chunk_text(text, size=CHUNK_SIZE):
    return [text[i:i + size] for i in range(0, len(text), size)]
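
# A hedged alternative sketch (not wired in): overlapping windows make it less
# likely that an answer-bearing sentence is cut at a chunk boundary. The
# `overlap` parameter is an assumption of this sketch, not part of the app.
def chunk_text_overlap(text, size=CHUNK_SIZE, overlap=200):
    step = max(1, size - overlap)
    return [text[i:i + size] for i in range(0, len(text), step)]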

def synthesize_speech(text, lang="en"):
    try:
        if not text:
            return None
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
        tmp.close()  # release the handle so gTTS can write to tmp.name (required on Windows)
        gTTS(text=text, lang=lang).save(tmp.name)
        return tmp.name
    except Exception as e:
        print("TTS error:", e)
        return None

def select_relevant_chunk(question, chunks, chunk_embeds):
    if not chunks or chunk_embeds is None:
        return ""
    q_emb = embed_model.encode(question, convert_to_tensor=True)
    scores = util.cos_sim(q_emb, chunk_embeds)[0]
    top_idx = int(scores.argmax().item())
    return chunks[top_idx]
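
# Usage sketch (a minimal example, assuming the MiniLM model loaded above):
#   chunks = chunk_text(document_text)
#   embeds = embed_model.encode(chunks, convert_to_tensor=True)
#   best = select_relevant_chunk("What is the total due?", chunks, embeds)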

def _chat_display_to_messages(chat_display):
    msgs = []
    for user, assistant in chat_display:
        msgs.append({"role": "user", "content": user})
        msgs.append({"role": "assistant", "content": assistant})
    return msgs

# ------------------ Transcription & LLM ------------------
def transcribe_audio(audio_path):
    if not audio_path or not os.path.exists(audio_path):
        return "Error: audio file missing."
    try:
        url = "https://api.groq.com/openai/v1/audio/transcriptions"
        with open(audio_path, "rb") as f:
            files = {"file": (os.path.basename(audio_path), f, "audio/wav")}
            data = {"model": "whisper-large-v3"}
            resp = requests.post(url, headers=HEADERS, files=files, data=data, timeout=60)
        resp.raise_for_status()
        return resp.json().get("text", "") or ""
    except Exception as e:
        print("transcription error:", e)
        return f"Error transcribing audio: {e}"

def groq_chat_completion(messages):
    body = {"model": "llama-3.1-8b-instant", "messages": messages}
    try:
        resp = requests.post("https://api.groq.com/openai/v1/chat/completions", headers=HEADERS, json=body, timeout=60)
        resp.raise_for_status()
        return resp.json()["choices"][0]["message"]["content"]
    except Exception as e:
        print("groq_chat_completion error:", e)
        return f"Error generating response: {e}"

def generate_response(session_id, user_text, enhancer_enabled=False, enhancer_tone="Helpful"):
    if session_id not in SESSION_HISTORY:
        SESSION_HISTORY[session_id] = []

    SESSION_HISTORY[session_id].append({"role": "user", "content": user_text})

    messages = [
        {
            "role": "system",
            "content": "You are a helpful AI assistant. ALWAYS respond in English only, regardless of the user's language or the input language."
        }
    ] + SESSION_HISTORY[session_id]

    if enhancer_enabled:
        # Steer tone with a system instruction instead of repeating the
        # question as a second user turn.
        messages.append({"role": "system", "content": f"Answer in a {enhancer_tone.lower()} tone."})

    assistant_text = groq_chat_completion(messages)
    SESSION_HISTORY[session_id].append({"role": "assistant", "content": assistant_text})
    return assistant_text
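
# Usage sketch (session ids are plain strings; the UI below creates one uuid4
# per tab via gr.State):
#   sid = str(uuid.uuid4())
#   answer = generate_response(sid, "Give me one productivity tip.")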

# ------------------ PDF handling ------------------
def handle_pdf_upload(pdf_file, session_id):
    path = _get_path_from_gr_file(pdf_file)
    if not path:
        return "No file uploaded or file unreadable."
    try:
        reader = PdfReader(path)
        text = ""
        for page in reader.pages:
            text += (page.extract_text() or "") + "\n"
        if not text.strip():
            return "No extractable content found in PDF."
        chunks = chunk_text(text)
        PDF_CONTENT[session_id] = chunks
        PDF_EMBEDS[session_id] = embed_model.encode(chunks, convert_to_tensor=True)
        return f"PDF processed: {len(chunks)} chunks ready."
    except Exception as e:
        print("PDF upload error:", e)
        return f"Error processing PDF: {e}"

def handle_pdf_question(question, session_id):
    if session_id not in PDF_CONTENT:
        return "Document not found. Upload first."
    chunk = select_relevant_chunk(question, PDF_CONTENT[session_id], PDF_EMBEDS[session_id])

    messages = [
        {
            "role": "system",
            "content": "You are a helpful assistant summarizing PDF content. ALWAYS respond in English only, regardless of the user's language."
        },
        {
            "role": "user",
            "content": f"PDF chunk:\n{chunk}\n\nQuestion: {question}"
        }
    ]

    assistant_text = groq_chat_completion(messages)
    assistant_text = f"**Snippet from PDF:**\n{chunk[:200]}...\n\n**Answer:**\n{assistant_text}"
    if session_id not in SESSION_HISTORY:
        SESSION_HISTORY[session_id] = []
    # Store both turns so the cross-modal memory stays a balanced transcript.
    SESSION_HISTORY[session_id].append({"role": "user", "content": question})
    SESSION_HISTORY[session_id].append({"role": "assistant", "content": assistant_text})
    return assistant_text

# ------------------ Image OCR ------------------
def ocr_space_file(image_path, api_key, language="eng"):
    if not image_path or not os.path.exists(image_path):
        return ""
    try:
        with open(image_path, "rb") as f:
            payload = {"apikey": api_key, "language": language}
            files = {"file": f}
            r = requests.post("https://api.ocr.space/parse/image", files=files, data=payload, timeout=60)
        r.raise_for_status()
        j = r.json()
        if j.get("IsErroredOnProcessing"):
            print("OCR.space processing error:", j)
            return ""
        parsed = [pr.get("ParsedText", "") for pr in j.get("ParsedResults", [])]
        return "\n".join(parsed)
    except Exception as e:
        print("ocr_space_file error:", e)
        return ""

def handle_image_upload(image_file, session_id):
    path = _get_path_from_gr_file(image_file)
    if not path:
        return "No image uploaded or file unreadable.", ""
    parsed = ocr_space_file(path, OCR_SPACE_API_KEY)
    if not parsed.strip():
        return "No extractable text found in the image.", ""
    chunks = chunk_text(parsed)
    IMAGE_TEXT[session_id] = chunks
    IMAGE_EMBEDS[session_id] = embed_model.encode(chunks, convert_to_tensor=True)
    return f"Image processed: {len(chunks)} chunks ready.", ""

def handle_image_question(question, session_id):
    if session_id not in IMAGE_TEXT:
        return "Image not found. Upload first."
    chunk = select_relevant_chunk(question, IMAGE_TEXT[session_id], IMAGE_EMBEDS[session_id])

    messages = [
        {
            "role": "system",
            "content": "You are a helpful assistant summarizing image text. ALWAYS respond in English only, regardless of the user's language."
        },
        {
            "role": "user",
            "content": f"Image chunk:\n{chunk}\n\nQuestion: {question}"
        }
    ]

    assistant_text = groq_chat_completion(messages)
    assistant_text = f"**Snippet from Image:**\n{chunk[:200]}...\n\n**Answer:**\n{assistant_text}"
    if session_id not in SESSION_HISTORY:
        SESSION_HISTORY[session_id] = []
    # Store both turns so the cross-modal memory stays a balanced transcript.
    SESSION_HISTORY[session_id].append({"role": "user", "content": question})
    SESSION_HISTORY[session_id].append({"role": "assistant", "content": assistant_text})
    return assistant_text

# ------------------ PDF Generation ------------------
def generate_pdf_file(text, filename_prefix="summary"):
    pdf = FPDF()
    pdf.add_page()
    pdf.set_auto_page_break(auto=True, margin=15)
    pdf.set_font("Arial", "B", size=14)
    pdf.multi_cell(0, 8, f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M')}\n\n")
    pdf.set_font("Arial", size=12)
    for line in text.split("\n"):
        # FPDF's core fonts only cover latin-1; replace anything else (e.g.
        # emojis in LLM output) rather than crashing in pdf.output().
        pdf.multi_cell(0, 6, line.encode("latin-1", "replace").decode("latin-1"))
    file_path = os.path.join(tempfile.gettempdir(), f"{filename_prefix}_{uuid.uuid4()}.pdf")
    pdf.output(file_path)
    return file_path
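
# Design note: to keep full Unicode in the PDF instead of latin-1 replacement,
# fpdf2 can register a bundled TrueType font (the .ttf path here is a
# hypothetical asset, not shipped with this app):
#   pdf.add_font("DejaVu", "", "fonts/DejaVuSans.ttf")
#   pdf.set_font("DejaVu", size=12)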

def download_pdf_summary(session_id):
    summary_text = "\n".join([m["content"] for m in SESSION_HISTORY.get(session_id, []) if m["role"]=="assistant"])
    if not summary_text:
        summary_text = "No summary available."
    return generate_pdf_file(summary_text, "summary")

# ------------------ Voice & Chat Handlers ------------------
def _append_chat_display(session_id, user_text, assistant_text):
    if session_id not in CHAT_DISPLAY:
        CHAT_DISPLAY[session_id] = []
    CHAT_DISPLAY[session_id].append((user_text, assistant_text))

def handle_voice_general(audio_file, session_id, tts_lang="en", enhancer_enabled=False, enhancer_tone="Helpful"):
    path = _get_path_from_gr_file(audio_file)
    if not path:
        return "No audio provided.", None, []
    user_text = transcribe_audio(path)
    assistant_text = generate_response(session_id, user_text, enhancer_enabled, enhancer_tone)
    _append_chat_display(session_id, user_text, assistant_text)
    audio_path = synthesize_speech(assistant_text, lang=tts_lang)
    return assistant_text, audio_path, _chat_display_to_messages(CHAT_DISPLAY[session_id])

def handle_voice_pdf(audio_file, session_id, tts_lang="en"):
    path = _get_path_from_gr_file(audio_file)
    if not path:
        return "No audio provided.", None, []
    user_text = transcribe_audio(path)
    assistant_text = handle_pdf_question(user_text, session_id)
    _append_chat_display(session_id, user_text, assistant_text)
    audio_path = synthesize_speech(assistant_text, lang=tts_lang)
    return assistant_text, audio_path, _chat_display_to_messages(CHAT_DISPLAY[session_id])

def handle_voice_image(audio_file, session_id, tts_lang="en"):
    path = _get_path_from_gr_file(audio_file)
    if not path:
        return "No audio provided.", None, []
    user_text = transcribe_audio(path)
    assistant_text = handle_image_question(user_text, session_id)
    _append_chat_display(session_id, user_text, assistant_text)
    audio_path = synthesize_speech(assistant_text, lang=tts_lang)
    return assistant_text, audio_path, _chat_display_to_messages(CHAT_DISPLAY[session_id])

def handle_text_general(user_text, session_id, enhancer_enabled=False, enhancer_tone="Helpful"):
    assistant = generate_response(session_id, user_text, enhancer_enabled, enhancer_tone)
    _append_chat_display(session_id, user_text, assistant)
    return assistant, _chat_display_to_messages(CHAT_DISPLAY[session_id])

def handle_text_pdf(question, session_id):
    return handle_pdf_question(question, session_id)

def handle_text_image(question, session_id):
    return handle_image_question(question, session_id)

# ------------------ Gradio UI ------------------
with gr.Blocks() as demo:
    gr.HTML("""
    <style>
        /* Change height + width of the audio recorder box */
        #mic_box audio {
            height: 50px !important;
            width: 200px !important;
        }
    </style>
    """)
    gr.Markdown("## 🛠 Multi-Mode AI Assistant (Voice, PDF, Image)")

    session_voice = gr.State(str(uuid.uuid4()))
    session_pdf = gr.State(str(uuid.uuid4()))
    session_image = gr.State(str(uuid.uuid4()))

    with gr.Tab("🎤 Voice Chat"):
        chat_voice = gr.Chatbot(height=320, type="messages")  # handlers emit role/content dicts
        with gr.Row():
            mic = gr.Audio(sources=["microphone"], type="filepath", label="🎤 Record Voice (hold & speak)", elem_id="mic_box")
            audio_output = gr.Audio(label="Assistant Voice Output", type="filepath", interactive=False)
            tts_lang = gr.Dropdown(choices=["en", "ur"], value="en", label="TTS Language")
        with gr.Row():
            btn_general = gr.Button("⚡Ask General 🎯")
            btn_pdf = gr.Button("⚡Ask PDF 📄")
            btn_image = gr.Button("⚡Ask Image 🖼")
            enhancer_toggle = gr.Checkbox(label="Enable Response Enhancer", value=False, scale=1)
            tone_dropdown = gr.Dropdown(choices=["Helpful", "Formal", "Friendly"], value="Helpful", label="Enhancer Tone", scale=1)
        with gr.Row():
            btn_reset_logs = gr.Button("♻ Reset Logs")
            btn_download_logs = gr.Button("📥 Download Summary")
            voice_summary_file = gr.File(label="📥 Download Summary File", interactive=False, scale=1)
        answer_voice = gr.Textbox(label="Assistant Answer (text)", lines=2, visible=False)

        btn_general.click(fn=handle_voice_general,
                          inputs=[mic, session_voice, tts_lang, enhancer_toggle, tone_dropdown],
                          outputs=[answer_voice, audio_output, chat_voice])
        btn_pdf.click(fn=handle_voice_pdf, inputs=[mic, session_pdf, tts_lang], outputs=[answer_voice, audio_output, chat_voice])
        btn_image.click(fn=handle_voice_image, inputs=[mic, session_image, tts_lang], outputs=[answer_voice, audio_output, chat_voice])
        btn_reset_logs.click(lambda: (str(uuid.uuid4()), [], None, None, ""), outputs=[session_voice, chat_voice, mic, audio_output, answer_voice])
        btn_download_logs.click(download_pdf_summary, inputs=[session_voice], outputs=[voice_summary_file])

    with gr.Tab("📄 PDF Summarizer"):
        pdf_output = gr.Textbox(label="Answer (Text Only)", lines=5)
        with gr.Row():
            pdf_upload_btn = gr.File(label="Upload PDF", file_types=[".pdf"], scale=1)
            pdf_question = gr.Textbox(label="Ask a question about PDF (text)", lines=3)
            pdf_upload_msg = gr.Textbox(label="Upload Status", interactive=False)
        with gr.Row():
            pdf_send_btn = gr.Button("Ask (Questions)")
            pdf_reset_btn = gr.Button("♻ Reset Logs")
        with gr.Row():
            pdf_summary_file = gr.File(label="📥 Download Summary File", interactive=False, scale=1)
            pdf_download_btn = gr.Button("📥 Download Summary")
        pdf_upload_btn.upload(handle_pdf_upload, inputs=[pdf_upload_btn, session_pdf], outputs=[pdf_upload_msg])
        pdf_send_btn.click(handle_text_pdf, inputs=[pdf_question, session_pdf], outputs=[pdf_output])
        pdf_reset_btn.click(lambda: (str(uuid.uuid4()), ""), outputs=[session_pdf, pdf_output])
        pdf_download_btn.click(download_pdf_summary, inputs=[session_pdf], outputs=[pdf_summary_file])

    with gr.Tab("🖼 Image OCR"):
        image_output = gr.Textbox(label="Answer (Text Only)", lines=5)
        with gr.Row():
            image_upload_btn = gr.File(label="Upload Image", file_types=[".png", ".jpg", ".jpeg"], scale=1)
            image_question = gr.Textbox(label="Ask question about Image", lines=3)
            image_upload_msg = gr.Textbox(label="Upload Status", interactive=False)
        with gr.Row():
            image_send_btn = gr.Button("Ask (Questions)")
            image_reset_btn = gr.Button("♻ Reset Logs")
        with gr.Row():
            image_summary_file = gr.File(label="📥 Download Summary File", interactive=False, scale=1)
            image_download_btn = gr.Button("📥 Download Summary")
        image_upload_btn.upload(handle_image_upload, inputs=[image_upload_btn, session_image], outputs=[image_upload_msg, image_output])
        image_send_btn.click(handle_text_image, inputs=[image_question, session_image], outputs=[image_output])
        image_reset_btn.click(lambda: (str(uuid.uuid4()), ""), outputs=[session_image, image_output])
        image_download_btn.click(download_pdf_summary, inputs=[session_image], outputs=[image_summary_file])

if __name__ == "__main__":
    demo.launch()