"""
Version 5 — Multimodal AI Assistant (Image Generation)

Extends Version 4 with:
- On-demand image generation: after LLM response, decide if visual illustration helps
- Text-to-image via HF Inference API (FLUX.1-schnell) or DALL-E 3
- General Chat tab (free-form Q&A with image gen)
- Model: llama-3.3-70b-versatile | STT: whisper-large-v3 | TTS: gTTS
"""

# Patch for Hugging Face Spaces: HfFolder removed in huggingface_hub 0.26+
# NOTE(review): kept ahead of the remaining imports, presumably because one of
# them still looks up huggingface_hub.HfFolder at import time — confirm.
import huggingface_hub

if not hasattr(huggingface_hub, "HfFolder"):

    class _HfFolderStub:
        """No-op replacement exposing the two static methods of the removed HfFolder."""

        @staticmethod
        def save_token(token):
            pass

        @staticmethod
        def get_token():
            return None

    huggingface_hub.HfFolder = _HfFolderStub

import json
import logging
import os
import re
import tempfile
import urllib3
from urllib.parse import urlparse

import gradio as gr
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from groq import Groq
from youtube_transcript_api import YouTubeTranscriptApi

from storage import load_storage, save_storage, history_dicts_to_tuples

load_dotenv()

logger = logging.getLogger(__name__)

GROQ_API_KEY = os.getenv("GROQ_API_KEY")
BRIGHTDATA_API_KEY = os.getenv("BRIGHTDATA_API_KEY")
BRIGHTDATA_UNLOCKER_ZONE = os.getenv("BRIGHTDATA_UNLOCKER_ZONE", "goodreads_unlocker")
# An empty/whitespace YOUTUBE_UNLOCKER_ZONE falls back to the scraper zone.
YOUTUBE_UNLOCKER_ZONE = os.getenv("YOUTUBE_UNLOCKER_ZONE", "").strip() or BRIGHTDATA_UNLOCKER_ZONE
HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("HUGGING_FACE_HUB_TOKEN")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

if not GROQ_API_KEY:
    raise ValueError("GROQ_API_KEY is not set.")

client = Groq(api_key=GROQ_API_KEY)
# Bright Data requests below are sent with verify=False; silence the resulting warnings.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Per-tab context text the chat is grounded on (module-level, shared by all callers).
contexts = {"scraper": "", "youtube": ""}
storage = load_storage()

# Model config
LLM_MODEL = "llama-3.3-70b-versatile"  # fallback to llama-3.1-8b-instant if unavailable
# Used both as the fallback chat model and for the short helper prompts.
FALLBACK_LLM_MODEL = "llama-3.1-8b-instant"
IMAGE_MODELS_HF = ["black-forest-labs/FLUX.1-schnell", "stabilityai/stable-diffusion-xl-base-1.0"]

_BRIGHTDATA_API_URL = "https://api.brightdata.com/request"
_MAX_CONTEXT_CHARS = 15000  # how much scraped/transcript text is kept as chat context
_PREVIEW_CHARS = 500  # how much of it is echoed back in the preview box

# Lazy-load Whisper (heavy model)
_transcriber = None


def _get_transcriber():
    """Return the shared Whisper speech-recognition pipeline, creating it on first use."""
    global _transcriber
    if _transcriber is None:
        # Imported here so transformers is only loaded when the voice tab is used.
        from transformers import pipeline

        _transcriber = pipeline(
            "automatic-speech-recognition",
            model="openai/whisper-large-v3",
            device=-1,
        )
    return _transcriber


# ---------------------------------------------------------------------------
# Small shared helpers
# ---------------------------------------------------------------------------
def _new_temp_path(suffix: str) -> str:
    """Create an empty temp file with *suffix* and return its path (descriptor closed)."""
    fd, path = tempfile.mkstemp(suffix=suffix)
    os.close(fd)
    return path


def _discard_file(path) -> None:
    """Best-effort removal of a temp file; a missing or unremovable file is ignored."""
    if not path:
        return
    try:
        os.remove(path)
    except OSError:
        pass


def _write_temp_file(data: bytes, suffix: str) -> str:
    """Write *data* to a fresh temp file and return its path; remove the file if writing fails."""
    path = _new_temp_path(suffix)
    try:
        with open(path, "wb") as f:
            f.write(data)
    except Exception:
        _discard_file(path)
        raise
    return path


def _store_context(mode: str, text: str) -> str:
    """Save *text* (truncated) as the chat context for *mode* and return a short preview."""
    contexts[mode] = text[:_MAX_CONTEXT_CHARS]
    if len(text) > _PREVIEW_CHARS:
        return text[:_PREVIEW_CHARS] + "..."
    return text


def _brightdata_get(zone: str, url: str, *, timeout: int):
    """POST a GET-unlock request for *url* through Bright Data *zone*; return the raw response."""
    headers = {
        "Authorization": f"Bearer {BRIGHTDATA_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {"zone": zone, "url": url, "format": "raw", "method": "GET"}
    # NOTE(review): verify=False disables TLS certificate checks on a request that
    # carries the API key. Kept as in the original; confirm it is still required.
    return requests.post(
        _BRIGHTDATA_API_URL, json=payload, headers=headers, timeout=timeout, verify=False
    )


def _complete_with_fallback(messages, *, max_tokens: int, temperature: float) -> tuple[str, bool]:
    """Request a chat completion from LLM_MODEL, retrying once on FALLBACK_LLM_MODEL.

    Returns ``(text, ok)``. When both attempts fail, ``text`` is a user-facing
    error message built from the first failure and ``ok`` is False.
    """
    first_error = None
    for model in (LLM_MODEL, FALLBACK_LLM_MODEL):
        try:
            resp = client.chat.completions.create(
                model=model,
                messages=messages,
                max_tokens=max_tokens,
                temperature=temperature,
            )
            # content may be None; normalise so callers can always append to it.
            return (resp.choices[0].message.content or ""), True
        except Exception as err:  # boundary: any SDK/network failure triggers the fallback
            logger.warning("Groq completion with %s failed: %s", model, err)
            if first_error is None:
                first_error = err
    return f"Error communicating with Groq: {first_error}", False


# ---------------------------------------------------------------------------
# Image Generation
# ---------------------------------------------------------------------------
_IMAGE_KEYWORDS = (
    "visually", "visual", "show me", "diagram", "image", "picture", "illustrate",
    "illustration", "draw", "sketch", "look like", "looks like", "what does",
    "architecture", "structure", "solar system", "transformer", "neural network",
    "show", "display", "see", "demonstrate",
)

# Keywords must begin at a word boundary so that e.g. "structure" no longer fires
# inside "infrastructure"; suffixes ("diagrams", "drawing") still match.
_IMAGE_KEYWORD_RE = re.compile(
    r"\b(?:" + "|".join(re.escape(kw) for kw in _IMAGE_KEYWORDS) + r")"
)


def _should_generate_image(user_query: str, bot_response: str) -> bool:
    """Decide if a visual illustration would help. Uses keyword check + optional LLM fallback.

    ``bot_response`` is accepted for interface compatibility but not consulted.
    Returns False for an empty query or when the fallback LLM call fails.
    """
    query = user_query or ""
    if not query.strip():
        return False
    if _IMAGE_KEYWORD_RE.search(query.lower()):
        return True
    # Optional: short LLM call for edge cases
    try:
        resp = client.chat.completions.create(
            model=FALLBACK_LLM_MODEL,
            messages=[
                {"role": "system", "content": "Answer YES or NO only. Would a visual illustration help explain the user's question?"},
                {"role": "user", "content": f"User asked: {query[:200]}"},
            ],
            max_tokens=10,
            temperature=0,
        )
        answer = (resp.choices[0].message.content or "").strip().upper()
        return "YES" in answer
    except Exception as err:  # best-effort classifier: a failure just means "no image"
        logger.warning("Image-need check failed: %s", err)
        return False


def _generate_image_prompt(user_query: str) -> str:
    """Create a condensed, descriptive prompt for the image model from the user query.

    Falls back to a truncated copy of the query if the LLM call fails or returns nothing.
    """
    try:
        resp = client.chat.completions.create(
            model=FALLBACK_LLM_MODEL,
            messages=[
                {"role": "system", "content": "Generate a short, descriptive image prompt (max 100 chars) for a text-to-image model. Describe the main visual subject. No quotes."},
                {"role": "user", "content": user_query[:300]},
            ],
            max_tokens=80,
            temperature=0.5,
        )
        prompt = (resp.choices[0].message.content or user_query[:100]).strip()
        return prompt[:200] or user_query[:100]
    except Exception as err:  # best-effort: use the raw query as the prompt
        logger.warning("Image prompt generation failed: %s", err)
        return user_query[:150]


def _generate_image_dalle(prompt: str) -> str:
    """Generate an image with DALL-E 3, download it, and return the local PNG path."""
    from openai import OpenAI

    oai = OpenAI(api_key=OPENAI_API_KEY)
    resp = oai.images.generate(model="dall-e-3", prompt=prompt, size="1024x1024", n=1)
    download = requests.get(resp.data[0].url, timeout=30)
    download.raise_for_status()
    return _write_temp_file(download.content, ".png")


def _generate_image_hf(prompt: str) -> str:
    """Generate an image via HF Inference Providers, trying IMAGE_MODELS_HF in order.

    Raises ValueError when every model fails or returns nothing.
    """
    from huggingface_hub import InferenceClient

    hf_client = InferenceClient(provider="auto", api_key=HF_TOKEN)
    image = None
    for model_id in IMAGE_MODELS_HF:
        try:
            image = hf_client.text_to_image(prompt, model=model_id)
        except Exception as err:
            logger.warning("HF model %s failed: %s", model_id, err)
            continue
        if image is not None:
            break
    if image is None:
        raise ValueError("All HF image models failed")

    if not hasattr(image, "save"):
        # Result without a .save() method: written out as raw bytes.
        return _write_temp_file(image, ".png")
    path = _new_temp_path(".png")
    try:
        image.save(path)
    except Exception:
        _discard_file(path)
        raise
    return path


def _generate_image(prompt: str) -> str | None:
    """Generate image via HF Inference API (FLUX) or DALL-E 3. Returns path or None.

    DALL-E 3 is tried first when OPENAI_API_KEY is set; HF is tried when HF_TOKEN
    is set and DALL-E was unavailable or failed. Failures are logged, not raised.
    """
    if not prompt:
        return None
    # Try DALL-E 3 first if OpenAI key is set
    if OPENAI_API_KEY:
        try:
            return _generate_image_dalle(prompt)
        except Exception as e:
            logger.warning("DALL-E 3 image gen failed: %s", e)
    # HF Inference Providers (router.huggingface.co - old api-inference is deprecated)
    if HF_TOKEN:
        try:
            return _generate_image_hf(prompt)
        except Exception as e:
            logger.warning("HF image gen failed: %s", e)
    return None


def _maybe_illustrate(query: str, reply: str, failure_note: str) -> tuple[str, str | None]:
    """Generate an illustration for *query* when warranted.

    Returns ``(reply, image_path)``. A success note is appended to the reply when
    an image was produced; *failure_note* is appended when generation was
    attempted, failed, and HF_TOKEN is configured.
    """
    if not _should_generate_image(query, reply):
        return reply, None
    image_path = _generate_image(_generate_image_prompt(query))
    if image_path:
        reply += "\n\n*Generated illustration below.*"
    elif HF_TOKEN:
        reply += failure_note
    return reply, image_path


# ---------------------------------------------------------------------------
# Tab 1: Bot-Protected Website Scraper
# ---------------------------------------------------------------------------
_GOODREADS_BEST_BOOKS_URL = "https://www.goodreads.com/list/show/1.Best_Books_Ever"


def _element_text(elem, default: str) -> str:
    """Return the stripped text of a parsed element, or *default* when it is missing."""
    return elem.text.strip() if elem is not None else default


def _parse_goodreads_list(soup) -> str:
    """Render the Goodreads list rows as numbered lines; return "" when no rows are found."""
    rows = soup.find_all("tr", itemtype="http://schema.org/Book")
    if not rows:
        return ""
    lines = ["Here is the scraped data from Goodreads Best Books Ever list:\n"]
    for rank, row in enumerate(rows, start=1):
        title = _element_text(row.find("a", class_="bookTitle"), "Unknown Title")
        author = _element_text(row.find("a", class_="authorName"), "Unknown Author")
        rating = _element_text(row.find("span", class_="minirating"), "Unknown Rating")
        lines.append(f"{rank}. {title} by {author} - {rating}")
    return "\n".join(lines)


def scrape_website(url: str):
    """Fetch *url* through Bright Data and store its text as the scraper chat context.

    Returns ``(status_message, preview)``. On any failure the scraper context is
    cleared and the preview is empty. The Goodreads home page is redirected to
    the "Best Books Ever" list, which gets structured parsing.
    """
    url = (url or "").strip()
    if not url:
        return "Please enter a URL.", ""
    if not BRIGHTDATA_API_KEY:
        # Without a key the request would go out as "Bearer None" and be rejected.
        contexts["scraper"] = ""
        return "BRIGHTDATA_API_KEY is not set. Add it to the environment to use the scraper.", ""

    parsed = urlparse(url)
    target_url = url
    if "goodreads.com" in (parsed.netloc or "") and parsed.path in ("", "/"):
        target_url = _GOODREADS_BEST_BOOKS_URL

    try:
        resp = _brightdata_get(BRIGHTDATA_UNLOCKER_ZONE, target_url, timeout=120)
        if resp.status_code in (400, 401):
            contexts["scraper"] = ""
            try:
                err_body = resp.json()
            except Exception:
                err_body = resp.text[:500] if resp.text else ""
            return (
                f"Bright Data error ({resp.status_code}): {resp.reason}. Details: {err_body}. "
                "Check BRIGHTDATA_API_KEY and BRIGHTDATA_UNLOCKER_ZONE.",
                "",
            )
        resp.raise_for_status()

        soup = BeautifulSoup(resp.text, "html.parser")
        text_content = ""
        if "goodreads.com/list/show/1.Best_Books_Ever" in target_url:
            text_content = _parse_goodreads_list(soup)
        if not text_content:
            # Generic page, or a Goodreads page without recognisable rows.
            text_content = soup.get_text(separator=" ", strip=True)

        preview = _store_context("scraper", text_content)
        return "Website scraped successfully. You can now chat about it.", preview
    except Exception as e:  # UI boundary: report instead of crashing the handler
        contexts["scraper"] = ""
        return f"Error scraping website: {e}", ""


# ---------------------------------------------------------------------------
# Tab 2: YouTube Transcript Q&A
# ---------------------------------------------------------------------------
# Group 6 is the 11-character video id. "shorts/" and "live/" are listed so the
# id is not taken from the path prefix itself.
_YOUTUBE_ID_REGEX = re.compile(
    r"(https?://)?(www\.)?"
    r"(youtube|youtu|youtube-nocookie)\.(com|be)/"
    r"(watch\?v=|embed/|v/|shorts/|live/|.+\?v=)?([^&=%\?]{11})"
)


def _extract_video_id(url_or_id: str) -> str:
    """Return the video id found in a YouTube URL, or the stripped input if none matches."""
    candidate = url_or_id.strip()
    match = _YOUTUBE_ID_REGEX.search(candidate)
    return match.group(6) if match else candidate


def _fetch_transcript_via_brightdata(video_id: str) -> str:
    """Fetch a transcript by scraping the watch page and caption track via Bright Data.

    Raises ValueError when the API key is missing, the zone rejects the URL, or
    no caption data/track can be located or parsed; HTTP errors propagate from
    ``raise_for_status``.
    """
    if not BRIGHTDATA_API_KEY:
        raise ValueError("BRIGHTDATA_API_KEY is required for YouTube transcript on this environment.")

    zone = YOUTUBE_UNLOCKER_ZONE
    watch_url = f"https://www.youtube.com/watch?v={video_id}"
    resp = _brightdata_get(zone, watch_url, timeout=120)
    if resp.status_code == 400:
        raise ValueError(f"Bright Data zone '{zone}' rejected the YouTube URL.")
    resp.raise_for_status()
    html = resp.text

    match = re.search(r"ytInitialPlayerResponse\s*=\s*(\{)", html)
    if not match:
        raise ValueError("Could not find caption data on the video page.")
    try:
        # raw_decode parses exactly one JSON value starting at the brace and ignores
        # trailing page content; unlike brace counting it handles braces in strings.
        player, _ = json.JSONDecoder().raw_decode(html, match.start(1))
    except json.JSONDecodeError as err:
        raise ValueError("Could not parse caption data from the video page.") from err

    captions = player.get("captions") or {}
    renderer = captions.get("playerCaptionsTracklistRenderer") or {}
    tracks = renderer.get("captionTracks") or []
    if not tracks:
        raise ValueError("No transcript available for this video.")
    base_url = tracks[0].get("baseUrl", "")
    if not base_url:
        raise ValueError("No caption track URL found.")

    caption_url = base_url + ("&" if "?" in base_url else "?") + "fmt=json3"
    resp2 = _brightdata_get(zone, caption_url, timeout=60)
    resp2.raise_for_status()
    caption_data = resp2.json()

    pieces = []
    for event in caption_data.get("events", []):
        for seg in event.get("segs", []):
            text = seg.get("utf8", "").strip()
            if text:
                pieces.append(text)
    return " ".join(pieces)


def _pick_transcript(transcript_list):
    """Choose a transcript: en/ur, then generated en/ur, then the first one listed (or None)."""
    languages = ["en", "ur"]
    for finder in (transcript_list.find_transcript, transcript_list.find_generated_transcript):
        try:
            return finder(languages)
        except Exception:
            continue
    return next(iter(transcript_list), None)


def _fetch_transcript_direct(video_id: str) -> str:
    """Fetch the transcript text with youtube_transcript_api; raise if none is available."""
    transcript = _pick_transcript(YouTubeTranscriptApi().list(video_id))
    if transcript is None:
        raise ValueError("No transcript available for this video.")
    pieces = []
    for snippet in transcript.fetch():
        # Snippets are handled both as dicts and as objects with a .text attribute.
        if isinstance(snippet, dict):
            pieces.append(snippet.get("text", ""))
        else:
            pieces.append(getattr(snippet, "text", ""))
    return " ".join(pieces)


def fetch_youtube_transcript(video_input: str):
    """Fetch a video's transcript and store it as the YouTube chat context.

    Tries youtube_transcript_api first; if that fails with what looks like a
    network/DNS error and BRIGHTDATA_API_KEY is set, retries through Bright Data.
    Returns ``(status_message, preview)``; on failure the context is cleared.
    """
    if not video_input:
        return "Please enter a YouTube Video URL or ID.", ""
    video_id = _extract_video_id(video_input)

    try:
        transcript_text = _fetch_transcript_direct(video_id)
        preview = _store_context("youtube", transcript_text)
        return "Transcript fetched successfully. You can now chat about the video.", preview
    except Exception as e:
        err_str = str(e).lower()
        is_network_error = any(
            marker in err_str for marker in ("resolve", "hostname", "no address", "max retries")
        )
        if is_network_error and BRIGHTDATA_API_KEY:
            try:
                transcript_text = _fetch_transcript_via_brightdata(video_id)
                preview = _store_context("youtube", transcript_text)
                return "Transcript fetched via Bright Data. You can now chat about the video.", preview
            except Exception as fallback_err:
                contexts["youtube"] = ""
                return f"Direct fetch failed. Bright Data fallback failed: {fallback_err}", ""

        contexts["youtube"] = ""
        if is_network_error:
            msg = "YouTube transcript fetching failed (network restricted). Add BRIGHTDATA_API_KEY and YOUTUBE_UNLOCKER_ZONE."
        else:
            msg = f"Error: No transcript for video ID ({video_id}). Details: {e}"
        return msg, ""


# ---------------------------------------------------------------------------
# Multi-turn chat with image generation
# ---------------------------------------------------------------------------
def _build_system_prompt(mode: str) -> str:
    """Build the context-restricted system prompt for *mode*, plus any saved user preferences."""
    context = contexts.get(mode, "")
    prefs = storage.get("user_preferences", "").strip()
    ctx_placeholder = "(None — the user has NOT scraped or fetched transcript yet. You must refuse to answer and tell them to scrape/fetch first.)"
    base = (
        "You are a helpful assistant. You must use ONLY the provided context to answer. "
        "NEVER use external knowledge. If the context says 'None' or the user has not scraped yet, refuse to answer and tell them to scrape or fetch transcript first. "
        "If the answer is not in the context, say so. You have conversation history for follow-up questions.\n\n"
        f"Context:\n{context.strip() if context else ctx_placeholder}"
    )
    if prefs:
        base += f"\n\nUser preferences (follow these):\n{prefs}"
    return base


def _chat_with_image(mode: str, message: str, history_key: str, system_prompt_fn):
    """Shared chat logic with optional image generation. Returns (clear_msg, history_tuples, image_path).

    ``clear_msg`` is always "" (it empties the message box). The turn is appended
    to ``storage[history_key]`` and persisted. For modes other than "general" a
    canned reply is stored when no context has been loaded yet. No image is
    attempted when the LLM call itself failed.
    """
    if not message or not message.strip():
        return "", history_dicts_to_tuples(storage.get(history_key, [])), None

    user_text = message.strip()
    history_dicts = list(storage.get(history_key, []))
    history_dicts.append({"role": "user", "content": user_text})

    if mode != "general" and not (contexts.get(mode) or "").strip():
        history_dicts.append(
            {"role": "assistant", "content": "Please scrape a website or fetch a transcript first, then ask questions."}
        )
        storage[history_key] = history_dicts
        save_storage(storage)
        return "", history_dicts_to_tuples(history_dicts), None

    messages = [{"role": "system", "content": system_prompt_fn()}]
    messages.extend(
        {"role": m["role"], "content": m.get("content", "")}
        for m in history_dicts
        if m.get("role") in ("user", "assistant")
    )

    reply, ok = _complete_with_fallback(messages, max_tokens=1024, temperature=0.3)

    image_path = None
    if ok:
        reply, image_path = _maybe_illustrate(
            user_text,
            reply,
            "\n\n*(Image generation failed — check HF Inference Providers at hf.co/settings/inference-providers)*",
        )

    history_dicts.append({"role": "assistant", "content": reply})
    storage[history_key] = history_dicts
    save_storage(storage)
    return "", history_dicts_to_tuples(history_dicts), image_path


def chat_turn_scraper(message: str, _history_ignored):
    """Chat turn for the scraper tab; history comes from storage, not from the UI argument."""
    return _chat_with_image("scraper", message, "scraper_history", lambda: _build_system_prompt("scraper"))


def chat_turn_youtube(message: str, _history_ignored):
    """Chat turn for the YouTube tab; history comes from storage, not from the UI argument."""
    return _chat_with_image("youtube", message, "youtube_history", lambda: _build_system_prompt("youtube"))


def chat_turn_general(message: str, _history_ignored):
    """General chat: free-form Q&A with image generation. No context constraint."""
    if not message or not message.strip():
        return "", history_dicts_to_tuples(storage.get("general_history", [])), None
    prefs = storage.get("user_preferences", "").strip()
    system_prompt = "You are a helpful assistant. Answer concisely. You may use general knowledge."
    if prefs:
        system_prompt += f"\n\nUser preferences (follow these):\n{prefs}"
    return _chat_with_image("general", message, "general_history", lambda: system_prompt)


def save_preferences(prefs: str):
    """Persist the user's preference text and return a status message."""
    storage["user_preferences"] = prefs or ""
    save_storage(storage)
    return "Preferences saved."


# ---------------------------------------------------------------------------
# Tab: Voice Assistant (Speech-to-Text → LLM → Text-to-Speech + optional Image)
# ---------------------------------------------------------------------------
def _resolve_audio_path(audio_input):
    """Extract a file path from the audio component value (tuple, dict, or plain value)."""
    if isinstance(audio_input, tuple):
        return audio_input[0]
    if isinstance(audio_input, dict):
        return audio_input.get("name") or audio_input.get("path", "")
    return audio_input


def voice_chatbot(audio_input):
    """Transcribe recorded audio, answer it with the LLM, and synthesise the answer.

    Returns ``(bot_text, audio_path, user_text, image_path)``. ``audio_path`` is
    None when speech synthesis fails; ``image_path`` is None when no illustration
    was generated. Early failures return the error text in ``bot_text``.
    """
    if audio_input is None:
        return "No audio received.", None, "", None
    audio_path = _resolve_audio_path(audio_input)
    if not audio_path or not os.path.isfile(audio_path):
        return "Invalid audio file.", None, "", None

    try:
        transcription = _get_transcriber()(audio_path)
        user_text = transcription.get("text", "").strip() or "(no speech detected)"
    except Exception as e:
        return f"Transcription error: {e}", None, "", None

    prefs = storage.get("user_preferences", "").strip()
    system_prompt = "You are a helpful assistant. Keep answers concise."
    if prefs:
        system_prompt += f"\n\nUser preferences (follow these):\n{prefs}"
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_text},
    ]
    bot_text, ok = _complete_with_fallback(messages, max_tokens=512, temperature=0.7)

    image_path = None
    if ok:
        bot_text, image_path = _maybe_illustrate(
            user_text,
            bot_text,
            "\n\n*(Image generation failed — check HF Inference Providers)*",
        )

    output_path = None
    try:
        from gtts import gTTS

        output_path = _new_temp_path(".mp3")
        gTTS(text=bot_text, lang="en").save(output_path)
    except Exception as e:
        # Do not leave an empty/partial mp3 behind when synthesis fails.
        _discard_file(output_path)
        output_path = None
        bot_text += f"\n\n(TTS error: {e})"

    return bot_text, output_path, user_text, image_path


# ---------------------------------------------------------------------------
# Gradio UI
# ---------------------------------------------------------------------------
with gr.Blocks(title="Scraper Bot v5 — Multimodal AI (Image Gen)") as demo:
    gr.Markdown("# Version 5 — Multimodal AI Assistant (Image Generation)")
    gr.Markdown(
        "**Model:** llama-3.3-70b-versatile | **STT:** whisper-large-v3 | **TTS:** gTTS (🔓 Open-source) \n"
        "Extends v4 with **on-demand image generation**: ask for visual explanations and get illustrations via FLUX.1-schnell (HF) or DALL-E 3."
    )

    with gr.Row():
        # Left column: persisted user preferences.
        with gr.Column(scale=1):
            gr.Markdown("### User preferences")
            prefs_input = gr.Textbox(
                label="Preferences",
                value=storage.get("user_preferences", ""),
                placeholder="e.g., Always respond formally.",
                lines=3,
            )
            save_prefs_btn = gr.Button("Save preferences")
            prefs_status = gr.Textbox(label="Status", interactive=False)
            save_prefs_btn.click(save_preferences, inputs=[prefs_input], outputs=[prefs_status])

        # Right column: one tab per assistant mode.
        with gr.Column(scale=2):
            with gr.Tabs():
                with gr.TabItem("General Chat (Image Gen)"):
                    gr.Markdown("Free-form Q&A with optional image generation. Try: *Explain the solar system visually* or *Show me what a transformer architecture looks like*.")
                    gen_chatbot = gr.Chatbot(
                        value=history_dicts_to_tuples(storage.get("general_history", [])),
                        height=300,
                        label="Chat",
                    )
                    gen_img_out = gr.Image(label="Generated illustration", type="filepath")
                    gen_msg = gr.Textbox(label="Message", placeholder="Ask anything. Say 'visually' or 'show me' for images.")
                    gen_msg.submit(
                        lambda m, h: chat_turn_general(m, h),
                        inputs=[gen_msg, gen_chatbot],
                        outputs=[gen_msg, gen_chatbot, gen_img_out],
                    )

                with gr.TabItem("Bot-Protected Website Scraper"):
                    gr.Markdown("Scrape a URL, then chat. Image gen when you ask for visual explanations.")
                    with gr.Row():
                        url_input = gr.Textbox(label="URL", placeholder="https://www.goodreads.com/", scale=3)
                        scrape_btn = gr.Button("Scrape URL", scale=1)
                    scrape_status = gr.Textbox(label="Status", interactive=False)
                    scrape_preview = gr.Textbox(label="Content preview", interactive=False, lines=4)
                    scraper_chatbot = gr.Chatbot(
                        value=history_dicts_to_tuples(storage.get("scraper_history", [])),
                        height=280,
                        label="Chat",
                    )
                    scraper_img_out = gr.Image(label="Generated illustration", type="filepath")
                    scraper_msg = gr.Textbox(label="Message", placeholder="e.g., What are the top 5 books?")
                    scrape_btn.click(scrape_website, inputs=[url_input], outputs=[scrape_status, scrape_preview])
                    scraper_msg.submit(
                        lambda m, h: chat_turn_scraper(m, h),
                        inputs=[scraper_msg, scraper_chatbot],
                        outputs=[scraper_msg, scraper_chatbot, scraper_img_out],
                    )

                with gr.TabItem("YouTube Transcript Q&A"):
                    gr.Markdown("Fetch transcript, then chat. Image gen when you ask for visual explanations.")
                    with gr.Row():
                        yt_input = gr.Textbox(label="YouTube URL or ID", placeholder="dQw4w9WgXcQ", scale=3)
                        yt_btn = gr.Button("Get Transcript", scale=1)
                    yt_status = gr.Textbox(label="Status", interactive=False)
                    yt_preview = gr.Textbox(label="Transcript preview", interactive=False, lines=4)
                    yt_chatbot = gr.Chatbot(
                        value=history_dicts_to_tuples(storage.get("youtube_history", [])),
                        height=280,
                        label="Chat",
                    )
                    yt_img_out = gr.Image(label="Generated illustration", type="filepath")
                    yt_msg = gr.Textbox(label="Message", placeholder="e.g., Summarize the video.")
                    yt_btn.click(fetch_youtube_transcript, inputs=[yt_input], outputs=[yt_status, yt_preview])
                    yt_msg.submit(
                        lambda m, h: chat_turn_youtube(m, h),
                        inputs=[yt_msg, yt_chatbot],
                        outputs=[yt_msg, yt_chatbot, yt_img_out],
                    )

                with gr.TabItem("Voice Assistant"):
                    gr.Markdown("Speak your query. Get text + audio + optional image.")
                    mic_input = gr.Audio(
                        label="Speak",
                        sources=["microphone"],
                        type="filepath",
                    )
                    user_text_box = gr.Textbox(label="Transcribed text", interactive=False)
                    bot_text_output = gr.Textbox(label="Bot text response", interactive=False)
                    bot_audio_output = gr.Audio(label="Bot audio response", type="filepath", interactive=False)
                    voice_img_out = gr.Image(label="Generated illustration", type="filepath")
                    mic_input.change(
                        fn=voice_chatbot,
                        inputs=mic_input,
                        outputs=[bot_text_output, bot_audio_output, user_text_box, voice_img_out],
                    )


if __name__ == "__main__":
    # SPACE_ID is present: launch with defaults; otherwise bind to localhost:7866.
    if os.environ.get("SPACE_ID"):
        demo.launch()
    else:
        demo.launch(server_name="127.0.0.1", server_port=7866)