File size: 17,300 Bytes
f11f033
 
1441250
 
 
 
 
f11f033
2152a7e
 
 
 
 
 
02bd198
2152a7e
 
f806da7
1441250
2152a7e
 
 
 
 
1441250
2152a7e
1441250
2152a7e
1441250
2152a7e
 
1b0ed47
bb63563
1f4daab
 
 
 
 
 
d35c338
b7f4a0f
1441250
1ebfe6a
f11f033
1f4daab
1f64b26
b7f4a0f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d9b53fd
1f64b26
b7f4a0f
e83ca54
2152a7e
b7f4a0f
 
 
 
 
 
 
 
 
2152a7e
 
b7f4a0f
 
 
 
 
 
2152a7e
f11f033
b7f4a0f
 
5c70b41
 
b7f4a0f
f11f033
1441250
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1f4daab
5c70b41
1f4daab
5c70b41
1f4daab
 
 
 
 
4e23912
1f4daab
b7f4a0f
 
1441250
1f4daab
1441250
 
1f4daab
 
1441250
1f4daab
5c70b41
1f4daab
4e23912
1441250
2152a7e
b7f4a0f
 
 
 
 
 
 
1f4daab
b7f4a0f
 
 
 
 
 
 
 
 
2152a7e
bb63563
b7f4a0f
 
1441250
b7f4a0f
 
1f4daab
b7f4a0f
1f4daab
5c70b41
1441250
 
 
 
1f4daab
4e23912
1f4daab
1f64b26
b7f4a0f
 
 
 
 
 
 
 
 
 
1441250
b7f4a0f
 
1f4daab
b7f4a0f
 
 
1f64b26
 
b7f4a0f
 
 
 
 
 
 
 
 
 
1f64b26
bb63563
b7f4a0f
 
1441250
b7f4a0f
 
1f4daab
b7f4a0f
1f4daab
1441250
 
 
 
1f4daab
4e23912
1f4daab
52a8c9a
b7f4a0f
 
 
1441250
 
b7f4a0f
1f4daab
b7f4a0f
 
 
 
 
5c70b41
 
 
 
 
3971e9d
c24a00c
bb63563
b7f4a0f
 
5c70b41
e83ca54
1f4daab
b7f4a0f
 
 
 
1f4daab
b7f4a0f
 
5c70b41
1f64b26
1441250
 
 
 
 
 
 
 
5c70b41
1441250
 
 
 
 
 
 
 
 
5c70b41
1441250
1f4daab
 
b7f4a0f
5c70b41
f11f033
1441250
 
 
 
 
 
1f4daab
1ebfe6a
96717c0
 
5c70b41
96717c0
5c70b41
 
96717c0
 
 
1f4daab
 
2152a7e
 
 
5c70b41
 
2152a7e
1f4daab
 
 
5c70b41
1f4daab
1441250
645b4c8
1584ff6
a356872
 
 
5c70b41
 
3c34387
645b4c8
 
5c70b41
 
 
 
 
 
 
 
1441250
 
 
 
 
 
 
5c70b41
1441250
5c70b41
 
b67a3bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5c70b41
b67a3bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5c70b41
9a14736
f4c0bf3
5c70b41
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
# app.py
"""
Multi-Mode AI Assistant (Voice, PDF, Image) with Wow-Factor Features
- Preserves original features
- Adds snippet highlighting, cross-modal memory, styled PDF generation
- Live waveform placeholder for voice input
- Modular & Hugging Face safe
"""
import os
import uuid
import tempfile
import requests
from dotenv import load_dotenv
from gtts import gTTS
from PyPDF2 import PdfReader
import gradio as gr
from sentence_transformers import SentenceTransformer, util
from fpdf import FPDF
from datetime import datetime

# ------------------ Load API Keys ------------------
# Secrets come from a local .env file or Hugging Face Space secrets.
load_dotenv()
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "").strip()
OCR_SPACE_API_KEY = os.getenv("OCR_SPACE_API_KEY", "").strip()

# Fail fast at import time if either key is absent — every tab depends on them.
if not GROQ_API_KEY:
    raise ValueError("❌ GROQ_API_KEY missing. Set it in env / Hugging Face Secrets.")
if not OCR_SPACE_API_KEY:
    raise ValueError("❌ OCR_SPACE_API_KEY missing. Set it in env / Hugging Face Secrets.")

# Bearer header reused by all Groq HTTP calls (transcription + chat).
HEADERS = {"Authorization": f"Bearer {GROQ_API_KEY}"}

# ------------------ Global State ------------------
# All dicts below are keyed by a per-tab session id (uuid4 string from the UI).
SESSION_HISTORY = {}   # session_id -> list of {"role", "content"} LLM messages
CHAT_DISPLAY = {}      # session_id -> list of (user_text, assistant_text) pairs
PDF_CONTENT = {}       # session_id -> list of text chunks from the uploaded PDF
PDF_EMBEDS = {}        # session_id -> embedding tensor for those PDF chunks
IMAGE_TEXT = {}        # session_id -> list of text chunks OCR'd from the image
IMAGE_EMBEDS = {}      # session_id -> embedding tensor for those image chunks
CHUNK_SIZE = 1500      # characters per chunk (see chunk_text)

# Load embedding model
# Sentence-transformers model used for cosine-similarity chunk retrieval.
embed_model = SentenceTransformer("all-MiniLM-L6-v2")

# ------------------ Helpers ------------------
def _get_path_from_gr_file(gr_file):
    if not gr_file:
        return None
    if isinstance(gr_file, str) and os.path.exists(gr_file):
        return gr_file
    try:
        if hasattr(gr_file, "name") and os.path.exists(gr_file.name):
            return gr_file.name
    except Exception:
        pass
    if isinstance(gr_file, dict):
        for key in ("name", "file_name", "filepath"):
            if key in gr_file:
                candidate = gr_file.get(key)
                if isinstance(candidate, str) and os.path.exists(candidate):
                    return candidate
    return None

def chunk_text(text, size=CHUNK_SIZE):
    """Split *text* into consecutive pieces of at most *size* characters."""
    pieces = []
    start = 0
    while start < len(text):
        pieces.append(text[start:start + size])
        start += size
    return pieces

def synthesize_speech(text, lang="en"):
    """Render *text* to speech with gTTS and return the path of an .mp3 file.

    Returns None for empty text or on any TTS/network failure (best-effort:
    errors are printed, never raised, so callers can degrade gracefully).
    """
    if not text:
        return None
    try:
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
        # Close our handle before gTTS writes: gTTS reopens the path itself,
        # and holding the descriptor open both leaks it and fails on Windows,
        # where a file open in one handle cannot be reopened for writing.
        tmp.close()
        gTTS(text=text, lang=lang).save(tmp.name)
        return tmp.name
    except Exception as e:
        print("TTS error:", e)
        return None

def select_relevant_chunk(question, chunks, chunk_embeds):
    """Return the chunk whose embedding is most cosine-similar to *question*.

    Returns "" when there is nothing to search (no chunks or no embeddings).
    """
    if not chunks or chunk_embeds is None:
        return ""
    question_vec = embed_model.encode(question, convert_to_tensor=True)
    similarities = util.cos_sim(question_vec, chunk_embeds)[0]
    best_index = int(similarities.argmax().item())
    return chunks[best_index]

def _chat_display_to_messages(chat_display):
    msgs = []
    for user, assistant in chat_display:
        msgs.append({"role": "user", "content": user})
        msgs.append({"role": "assistant", "content": assistant})
    return msgs

# ------------------ Transcription & LLM ------------------
def transcribe_audio(audio_path):
    """Send an audio file to Groq's Whisper endpoint and return the transcript.

    Never raises: missing files and request failures both yield an
    "Error ..." string so the UI can display it directly.
    """
    if not audio_path or not os.path.exists(audio_path):
        return "Error: audio file missing."
    endpoint = "https://api.groq.com/openai/v1/audio/transcriptions"
    try:
        with open(audio_path, "rb") as audio_fh:
            resp = requests.post(
                endpoint,
                headers=HEADERS,
                files={"file": (os.path.basename(audio_path), audio_fh, "audio/wav")},
                data={"model": "whisper-large-v3"},
                timeout=60,
            )
        resp.raise_for_status()
        return resp.json().get("text", "") or ""
    except Exception as e:
        print("transcription error:", e)
        return f"Error transcribing audio: {e}"

def groq_chat_completion(messages):
    """POST *messages* to Groq's chat-completions API and return the reply text.

    On any failure the error is printed and an "Error ..." string is returned
    instead of raising, so callers can surface it in the UI.
    """
    payload = {"model": "llama-3.1-8b-instant", "messages": messages}
    try:
        resp = requests.post(
            "https://api.groq.com/openai/v1/chat/completions",
            headers=HEADERS,
            json=payload,
            timeout=60,
        )
        resp.raise_for_status()
        data = resp.json()
        return data["choices"][0]["message"]["content"]
    except Exception as e:
        print("groq_chat_completion error:", e)
        return f"Error generating response: {e}"

def generate_response(session_id, user_text, enhancer_enabled=False, enhancer_tone="Helpful"):
    """Append *user_text* to the session history, query the LLM, record the reply.

    When the enhancer is enabled, an extra tone-steering user message is sent
    with the request but not stored in the persistent history.
    """
    history = SESSION_HISTORY.setdefault(session_id, [])
    history.append({"role": "user", "content": user_text})

    messages = [{"role": "system", "content": "You are a helpful AI assistant."}]
    messages.extend(history)
    if enhancer_enabled:
        messages.append({"role": "user", "content": f"Enhance response. Tone: {enhancer_tone}. Question: {user_text}"})

    assistant_text = groq_chat_completion(messages)
    history.append({"role": "assistant", "content": assistant_text})
    return assistant_text

# ------------------ PDF handling ------------------
def handle_pdf_upload(pdf_file, session_id):
    """Extract text from an uploaded PDF, then chunk and embed it for Q&A.

    Populates PDF_CONTENT / PDF_EMBEDS for *session_id* and returns a status
    string suitable for direct display in the upload-status textbox.
    """
    path = _get_path_from_gr_file(pdf_file)
    if not path:
        return "No file uploaded or file unreadable."
    try:
        reader = PdfReader(path)
        # extract_text() may return None on image-only pages, hence the `or ""`.
        # A single join is O(n), unlike repeated `text +=` which is quadratic
        # in the number of pages.
        text = "\n".join((page.extract_text() or "") for page in reader.pages)
        if not text.strip():
            return "No extractable content found in PDF."
        chunks = chunk_text(text)
        PDF_CONTENT[session_id] = chunks
        PDF_EMBEDS[session_id] = embed_model.encode(chunks, convert_to_tensor=True)
        return f"PDF processed: {len(chunks)} chunks ready."
    except Exception as e:
        print("PDF upload error:", e)
        return f"Error processing PDF: {e}"

def handle_pdf_question(question, session_id):
    """Answer *question* against the session's uploaded PDF, with snippet preview."""
    if session_id not in PDF_CONTENT:
        return "Document not found. Upload first."
    chunk = select_relevant_chunk(question, PDF_CONTENT[session_id], PDF_EMBEDS[session_id])
    prompt = [
        {"role": "system", "content": "You are a helpful assistant summarizing PDF content."},
        {"role": "user", "content": f"PDF chunk:\n{chunk}\n\nQuestion: {question}"},
    ]
    answer = groq_chat_completion(prompt)
    # Prepend the matched snippet so users can see where the answer came from.
    answer = f"**Snippet from PDF:**\n{chunk[:200]}...\n\n**Answer:**\n{answer}"
    SESSION_HISTORY.setdefault(session_id, []).append({"role": "assistant", "content": answer})
    return answer

# ------------------ Image OCR ------------------
def ocr_space_file(image_path, api_key, language="eng"):
    """Run OCR.space on *image_path* and return the extracted text.

    Returns "" on a missing file, an API-side processing error, or any
    request failure (errors are printed, never raised).
    """
    if not image_path or not os.path.exists(image_path):
        return ""
    try:
        with open(image_path, "rb") as image_fh:
            response = requests.post(
                "https://api.ocr.space/parse/image",
                files={"file": image_fh},
                data={"apikey": api_key, "language": language},
                timeout=60,
            )
        response.raise_for_status()
        result = response.json()
        if result.get("IsErroredOnProcessing"):
            print("OCR.space processing error:", result)
            return ""
        return "\n".join(item.get("ParsedText", "") for item in result.get("ParsedResults", []))
    except Exception as e:
        print("ocr_space_file error:", e)
        return ""

def handle_image_upload(image_file, session_id):
    """OCR an uploaded image and index its text for question answering.

    Returns (status_message, "") — the empty second slot clears the answer box.
    """
    local_path = _get_path_from_gr_file(image_file)
    if not local_path:
        return "No image uploaded or file unreadable.", ""
    extracted = ocr_space_file(local_path, OCR_SPACE_API_KEY)
    if not extracted.strip():
        return "No extractable text found in the image.", ""
    pieces = chunk_text(extracted)
    IMAGE_TEXT[session_id] = pieces
    IMAGE_EMBEDS[session_id] = embed_model.encode(pieces, convert_to_tensor=True)
    return f"Image processed: {len(pieces)} chunks ready.", ""

def handle_image_question(question, session_id):
    """Answer *question* against the session's OCR'd image text, with snippet preview."""
    if session_id not in IMAGE_TEXT:
        return "Image not found. Upload first."
    chunk = select_relevant_chunk(question, IMAGE_TEXT[session_id], IMAGE_EMBEDS[session_id])
    prompt = [
        {"role": "system", "content": "You are a helpful assistant summarizing image text."},
        {"role": "user", "content": f"Image chunk:\n{chunk}\n\nQuestion: {question}"},
    ]
    answer = groq_chat_completion(prompt)
    # Show the matched snippet so users can see the answer's source text.
    answer = f"**Snippet from Image:**\n{chunk[:200]}...\n\n**Answer:**\n{answer}"
    SESSION_HISTORY.setdefault(session_id, []).append({"role": "assistant", "content": answer})
    return answer

# ------------------ PDF Generation ------------------
def generate_pdf_file(text, filename_prefix="summary"):
    """Render *text* into a timestamped PDF and return its file path.

    Two robustness fixes over the naive version:
    - FPDF's built-in Arial font only supports Latin-1; characters outside it
      (emoji, markdown symbols appear throughout this app's replies) would
      raise inside pdf.output(). Un-encodable characters are replaced with
      '?' instead of crashing the download.
    - Output goes to the platform temp directory instead of a hard-coded
      "/tmp/", which does not exist on Windows.
    """
    pdf = FPDF()
    pdf.add_page()
    pdf.set_auto_page_break(auto=True, margin=15)
    pdf.set_font("Arial", "B", size=14)
    pdf.multi_cell(0, 8, f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M')}\n\n")
    pdf.set_font("Arial", size=12)
    for line in text.split("\n"):
        # Core fonts are Latin-1 only; replace anything else rather than crash.
        safe_line = line.encode("latin-1", "replace").decode("latin-1")
        pdf.multi_cell(0, 6, safe_line)
    file_path = os.path.join(tempfile.gettempdir(), f"{filename_prefix}_{uuid.uuid4()}.pdf")
    pdf.output(file_path)
    return file_path

def download_pdf_summary(session_id):
    """Build a PDF of every assistant reply in the session; return its path."""
    assistant_turns = [
        msg["content"]
        for msg in SESSION_HISTORY.get(session_id, [])
        if msg["role"] == "assistant"
    ]
    summary_text = "\n".join(assistant_turns) or "No summary available."
    return generate_pdf_file(summary_text, "summary")

# ------------------ Voice & Chat Handlers ------------------
def _append_chat_display(session_id, user_text, assistant_text):
    """Record one (user, assistant) exchange in the session's display log."""
    CHAT_DISPLAY.setdefault(session_id, []).append((user_text, assistant_text))

def handle_voice_general(audio_file, session_id, tts_lang="en", enhancer_enabled=False, enhancer_tone="Helpful"):
    """Voice pipeline for general chat: transcribe -> LLM -> TTS -> chat log.

    Returns (answer_text, tts_audio_path, chatbot_messages).
    """
    audio_path = _get_path_from_gr_file(audio_file)
    if not audio_path:
        return "No audio provided.", None, []
    question = transcribe_audio(audio_path)
    answer = generate_response(session_id, question, enhancer_enabled, enhancer_tone)
    _append_chat_display(session_id, question, answer)
    spoken = synthesize_speech(answer, lang=tts_lang)
    return answer, spoken, _chat_display_to_messages(CHAT_DISPLAY[session_id])

def handle_voice_pdf(audio_file, session_id, tts_lang="en"):
    """Voice pipeline for PDF Q&A: transcribe -> PDF answer -> TTS -> chat log.

    Returns (answer_text, tts_audio_path, chatbot_messages).
    """
    audio_path = _get_path_from_gr_file(audio_file)
    if not audio_path:
        return "No audio provided.", None, []
    question = transcribe_audio(audio_path)
    answer = handle_pdf_question(question, session_id)
    _append_chat_display(session_id, question, answer)
    spoken = synthesize_speech(answer, lang=tts_lang)
    return answer, spoken, _chat_display_to_messages(CHAT_DISPLAY[session_id])

def handle_voice_image(audio_file, session_id, tts_lang="en"):
    """Voice pipeline for image Q&A: transcribe -> image answer -> TTS -> chat log.

    Returns (answer_text, tts_audio_path, chatbot_messages).
    """
    audio_path = _get_path_from_gr_file(audio_file)
    if not audio_path:
        return "No audio provided.", None, []
    question = transcribe_audio(audio_path)
    answer = handle_image_question(question, session_id)
    _append_chat_display(session_id, question, answer)
    spoken = synthesize_speech(answer, lang=tts_lang)
    return answer, spoken, _chat_display_to_messages(CHAT_DISPLAY[session_id])

def handle_text_general(user_text, session_id, enhancer_enabled=False, enhancer_tone="Helpful"):
    """Typed-input counterpart of handle_voice_general (no transcription or TTS)."""
    reply = generate_response(session_id, user_text, enhancer_enabled, enhancer_tone)
    _append_chat_display(session_id, user_text, reply)
    return reply, _chat_display_to_messages(CHAT_DISPLAY[session_id])

def handle_text_pdf(question, session_id):
    """Typed-question entry point for PDF Q&A; delegates to handle_pdf_question."""
    return handle_pdf_question(question, session_id)

def handle_text_image(question, session_id):
    """Typed-question entry point for image Q&A; delegates to handle_image_question."""
    return handle_image_question(question, session_id)

# ------------------ Gradio UI ------------------
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    # Inline CSS: resize the mic recorder widget (matched via elem_id="mic_box").
    gr.HTML("""
    <style>
        /* Change height + width of the audio recorder box */
        #mic_box audio {
            height: 50px !important;   /* adjust height */
            width: 200px !important;    /* adjust width (optional) */
        }
    </style>
    """)
    gr.Markdown("## 🛠 Multi-Mode AI Assistant (Voice, PDF, Image)")

    # One independent session id per tab; resetting a tab mints a fresh uuid.
    session_voice = gr.State(str(uuid.uuid4()))
    session_pdf = gr.State(str(uuid.uuid4()))
    session_image = gr.State(str(uuid.uuid4()))
# FIX: define pdf_summary_file BEFORE it is used
    #pdf_summary_file = gr.File(label="Download Summary", visible=False)

    # ---- Voice tab: record a question, route it to general/PDF/image Q&A ----
    with gr.Tab("🎤 Voice Chat"):
        chat_voice = gr.Chatbot(type="messages", height=300)
        with gr.Row():
            mic = gr.Audio(type="filepath",label="🎤 Record Voice (hold & speak)", show_download_button=True, elem_id="mic_box")
            audio_output = gr.Audio(label="Assistant Voice Output", type="filepath", interactive=False)
            tts_lang = gr.Dropdown(choices=["en", "ur"], value="en", label="TTS Language")

        with gr.Row():
            btn_general = gr.Button("⚡Ask General 🎯")
            btn_pdf = gr.Button("⚡Ask PDF 📄")
            btn_image = gr.Button("⚡Ask Image 🖼")
            enhancer_toggle = gr.Checkbox(label="Enable Response Enhancer", value=False, scale =1)
            tone_dropdown = gr.Dropdown(choices=["Helpful", "Formal", "Friendly"], value="Helpful", label="Enhancer Tone", scale =1)
        with gr.Row():
            btn_reset_logs = gr.Button("♻ Reset LOGs")
            btn_download_logs = gr.Button("📥 Download Summary")
            Voice_summary_file = gr.File(label="📥Download Summary File", interactive=False,scale =1)
            #btn_general = gr.Button("⚡Ask General 🎯")
            #btn_pdf = gr.Button("⚡Ask PDF 📄")
            #btn_image = gr.Button("⚡Ask Image 🖼")
        #with gr.Row():
            #text_input = gr.Textbox(label="Or type a question (General)",visible=False)
            #btn_send_text = gr.Button("Send (Text General)",visible=False)
            #btn_reset_logs = gr.Button("♻ Reset LOGs")
        # Hidden textbox: handlers still return the answer text here even though
        # only the chatbot + audio players are visible.
        answer_voice = gr.Textbox(label="Assistant Answer (text)", lines=2, visible=False)

        # NOTE(review): the PDF/Image voice buttons pass session_pdf/session_image,
        # so their chat logs live under those ids while "Reset LOGs" only resets
        # session_voice — confirm this asymmetry is intended.
        btn_general.click(fn=handle_voice_general, 
                          inputs=[mic, session_voice, tts_lang, enhancer_toggle, tone_dropdown],
                          outputs=[answer_voice, audio_output, chat_voice])
        btn_pdf.click(fn=handle_voice_pdf, inputs=[mic, session_pdf, tts_lang], outputs=[answer_voice, audio_output, chat_voice])
        btn_image.click(fn=handle_voice_image, inputs=[mic, session_image, tts_lang], outputs=[answer_voice, audio_output, chat_voice])
       # btn_send_text.click(fn=handle_text_general, inputs=[text_input, session_voice, enhancer_toggle, tone_dropdown], outputs=[answer_voice, chat_voice])
        btn_reset_logs.click(lambda: (str(uuid.uuid4()), [], None, None, ""), outputs=[session_voice, chat_voice, mic, audio_output, answer_voice])
        btn_download_logs.click(download_pdf_summary, inputs=[session_voice], outputs=[Voice_summary_file])

    # ---- PDF tab: upload once, then ask typed questions against it ----
    with gr.Tab("📄 PDF Summarizer"):
        pdf_output = gr.Textbox(label="Answer (Text Only)", lines=5)
        with gr.Row():
            pdf_upload_btn = gr.File(label="Upload PDF", file_types=[".pdf"], scale=1 )
            pdf_question = gr.Textbox(label="Ask a question about PDF (text)", lines=3)
            pdf_upload_msg = gr.Textbox(label="Upload Status", interactive=False)

        with gr.Row():
            pdf_send_btn = gr.Button("Ask (Questions)")
            pdf_reset_btn = gr.Button("♻ Reset LOGs")
        with gr.Row():
            pdf_summary_file = gr.File(label="📥Download Summary File", interactive=False,scale =1)
            pdf_download_btn = gr.Button("📥 Download Summary")

        pdf_upload_btn.upload(handle_pdf_upload, inputs=[pdf_upload_btn, session_pdf], outputs=[pdf_upload_msg])
        pdf_send_btn.click(handle_text_pdf, inputs=[pdf_question, session_pdf], outputs=[pdf_output])
        pdf_reset_btn.click(lambda: (str(uuid.uuid4()), ""), outputs=[session_pdf, pdf_output])
        pdf_download_btn.click(download_pdf_summary, inputs=[session_pdf], outputs=[pdf_summary_file])

    # ---- Image tab: OCR an image, then ask typed questions about its text ----
    with gr.Tab("🖼 Image OCR"):
        image_output = gr.Textbox(label="Answer (Text Only)", lines=5)
        with gr.Row():
            image_upload_btn = gr.File(label="Upload Image", file_types=[".png", ".jpg", ".jpeg"], scale =1 )
            image_question = gr.Textbox(label="Ask question about Image", lines=3)
            image_upload_msg = gr.Textbox(label="Upload Status", interactive=False)

        with gr.Row():
            image_send_btn = gr.Button("Ask (Questions)")
            image_reset_btn = gr.Button("♻ Reset LOGs")
        with gr.Row():
            image_summary_file = gr.File(label="📥Download Summary File", interactive=False,scale =1)
            image_download_btn = gr.Button("📥 Download Summary")

        image_upload_btn.upload(handle_image_upload, inputs=[image_upload_btn, session_image], outputs=[image_upload_msg, image_output])
        image_send_btn.click(handle_text_image, inputs=[image_question, session_image], outputs=[image_output])
        image_reset_btn.click(lambda: (str(uuid.uuid4()), ""), outputs=[session_image, image_output])
        image_download_btn.click(download_pdf_summary, inputs=[session_image], outputs=[image_summary_file])

if __name__ == "__main__":
    demo.launch()