# MSDSF25M005 / app.py
# Ansnaeem's picture
# Upload folder using huggingface_hub
# 2819eb9 verified
"""
Version 5 — Multimodal AI Assistant (Image Generation)
Extends Version 4 with:
- On-demand image generation: after LLM response, decide if visual illustration helps
- Text-to-image via HF Inference API (FLUX.1-schnell) or DALL-E 3
- General Chat tab (free-form Q&A with image gen)
- Model: llama-3.3-70b-versatile | STT: whisper-large-v3 | TTS: gTTS
"""
# Patch for Hugging Face Spaces: HfFolder removed in huggingface_hub 0.26+
# NOTE(review): presumably a library imported further down still references
# huggingface_hub.HfFolder, which is why this runs before the other imports — confirm.
import huggingface_hub

# Install the stub only when the attribute is missing, so hub versions that
# still ship HfFolder keep their real implementation.
if not hasattr(huggingface_hub, "HfFolder"):
    class _HfFolderStub:
        """Minimal stand-in for HfFolder: saving is a no-op and no token is reported."""

        @staticmethod
        def save_token(token): pass

        @staticmethod
        def get_token(): return None

    huggingface_hub.HfFolder = _HfFolderStub
import json
import os
import re
import tempfile
import urllib3
from urllib.parse import urlparse
import gradio as gr
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from groq import Groq
from youtube_transcript_api import YouTubeTranscriptApi
from storage import load_storage, save_storage, history_dicts_to_tuples
# Populate the environment via python-dotenv before the variables below are read.
load_dotenv()

# Credentials and zone names, all taken from environment variables.
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
BRIGHTDATA_API_KEY = os.getenv("BRIGHTDATA_API_KEY")
BRIGHTDATA_UNLOCKER_ZONE = os.getenv("BRIGHTDATA_UNLOCKER_ZONE", "goodreads_unlocker")
# An unset or blank YOUTUBE_UNLOCKER_ZONE falls back to the general unlocker zone.
YOUTUBE_UNLOCKER_ZONE = os.getenv("YOUTUBE_UNLOCKER_ZONE", "").strip() or BRIGHTDATA_UNLOCKER_ZONE
# HF_TOKEN wins when both names are set.
HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("HUGGING_FACE_HUB_TOKEN")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# Groq is the only credential checked here: the module refuses to import without it.
if not GROQ_API_KEY:
    raise ValueError("GROQ_API_KEY is not set.")
client = Groq(api_key=GROQ_API_KEY)

# The Bright Data requests in this file pass verify=False; this silences the
# InsecureRequestWarning that urllib3 would otherwise emit for them.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# In-memory source text for the context-bound chat tabs, keyed by mode.
# Writers in this file store at most 15000 characters per entry.
contexts = {"scraper": "", "youtube": ""}
# State returned by storage.load_storage(); this file reads "user_preferences"
# and the "*_history" keys from it and writes it back through save_storage().
storage = load_storage()

# Model config
LLM_MODEL = "llama-3.3-70b-versatile"  # fallback to llama-3.1-8b-instant if unavailable
# Tried in list order by _generate_image until one of them returns an image.
IMAGE_MODELS_HF = ["black-forest-labs/FLUX.1-schnell", "stabilityai/stable-diffusion-xl-base-1.0"]
# Whisper is heavy, so the pipeline is built on first use and then reused.
_transcriber = None


def _get_transcriber():
    """Return the shared speech-recognition pipeline, creating it on first call.

    The `transformers` import is deferred to the first call so that importing
    this module does not pay for it.
    """
    global _transcriber
    if _transcriber is not None:
        return _transcriber

    from transformers import pipeline

    pipeline_options = {
        "model": "openai/whisper-large-v3",
        "device": -1,
    }
    _transcriber = pipeline("automatic-speech-recognition", **pipeline_options)
    return _transcriber
# ---------------------------------------------------------------------------
# Image Generation
# ---------------------------------------------------------------------------
_IMAGE_KEYWORDS = (
"visually", "visual", "show me", "diagram", "image", "picture", "illustrate",
"illustration", "draw", "sketch", "look like", "looks like", "what does",
"architecture", "structure", "solar system", "transformer", "neural network",
"show", "display", "see", "demonstrate",
)
def _should_generate_image(user_query: str, bot_response: str) -> bool:
"""Decide if a visual illustration would help. Uses keyword check + optional LLM fallback."""
q = (user_query or "").lower()
for kw in _IMAGE_KEYWORDS:
if kw in q or re.search(kw.replace(".*", ".*"), q):
return True
# Optional: short LLM call for edge cases
try:
resp = client.chat.completions.create(
model="llama-3.1-8b-instant",
messages=[
{"role": "system", "content": "Answer YES or NO only. Would a visual illustration help explain the user's question?"},
{"role": "user", "content": f"User asked: {user_query[:200]}"},
],
max_tokens=10,
temperature=0,
)
ans = (resp.choices[0].message.content or "").strip().upper()
return "YES" in ans
except Exception:
return False
def _generate_image_prompt(user_query: str) -> str:
"""Create a condensed, descriptive prompt for the image model from the user query."""
try:
resp = client.chat.completions.create(
model="llama-3.1-8b-instant",
messages=[
{"role": "system", "content": "Generate a short, descriptive image prompt (max 100 chars) for a text-to-image model. Describe the main visual subject. No quotes."},
{"role": "user", "content": user_query[:300]},
],
max_tokens=80,
temperature=0.5,
)
prompt = (resp.choices[0].message.content or user_query[:100]).strip()
return prompt[:200] or user_query[:100]
except Exception:
return user_query[:150]
def _generate_image(prompt: str) -> str | None:
"""Generate image via HF Inference API (FLUX) or DALL-E 3. Returns path or None."""
if not prompt:
return None
# Try DALL-E 3 first if OpenAI key is set
if OPENAI_API_KEY:
try:
from openai import OpenAI
oai = OpenAI(api_key=OPENAI_API_KEY)
resp = oai.images.generate(model="dall-e-3", prompt=prompt, size="1024x1024", n=1)
url = resp.data[0].url
r = requests.get(url, timeout=30)
r.raise_for_status()
fd, path = tempfile.mkstemp(suffix=".png")
os.close(fd)
with open(path, "wb") as f:
f.write(r.content)
return path
except Exception as e:
import logging
logging.warning(f"DALL-E 3 image gen failed: {e}")
# HF Inference Providers (router.huggingface.co - old api-inference is deprecated)
if HF_TOKEN:
try:
from huggingface_hub import InferenceClient
hf_client = InferenceClient(provider="auto", api_key=HF_TOKEN)
image = None
for model_id in IMAGE_MODELS_HF:
try:
image = hf_client.text_to_image(prompt, model=model_id)
if image is not None:
break
except Exception:
continue
if image is None:
raise ValueError("All HF image models failed")
fd, path = tempfile.mkstemp(suffix=".png")
os.close(fd)
if hasattr(image, "save"):
image.save(path)
else:
with open(path, "wb") as f:
f.write(image if isinstance(image, bytes) else image)
return path
except Exception as e:
import logging
logging.warning(f"HF image gen failed: {e}")
return None
# ---------------------------------------------------------------------------
# Tab 1: Bot-Protected Website Scraper
# ---------------------------------------------------------------------------
def _format_goodreads_list(soup) -> str:
    """Render the Goodreads list rows found in ``soup`` as a ranked text list.

    Returns an empty string when the page contains no book rows, so the
    caller can fall back to the plain page text.
    """
    rows = soup.find_all("tr", itemtype="http://schema.org/Book")
    if not rows:
        return ""

    def _cell_text(row, tag, css_class, default):
        elem = row.find(tag, class_=css_class)
        return elem.text.strip() if elem else default

    lines = ["Here is the scraped data from Goodreads Best Books Ever list:\n"]
    for rank, row in enumerate(rows, start=1):
        title = _cell_text(row, "a", "bookTitle", "Unknown Title")
        author = _cell_text(row, "a", "authorName", "Unknown Author")
        rating = _cell_text(row, "span", "minirating", "Unknown Rating")
        lines.append(f"{rank}. {title} by {author} - {rating}")
    return "\n".join(lines)


def scrape_website(url: str):
    """Fetch ``url`` through the Bright Data unlocker and store its text for chat.

    On success the extracted text (capped at 15000 characters) is stored in
    ``contexts["scraper"]``; on any failure that entry is cleared. The
    Goodreads home page is redirected to its "Best Books Ever" list, whose
    rows are rendered as a ranked list.

    Args:
        url: Page to scrape. A value without a scheme is treated as https.

    Returns:
        A ``(status_message, preview)`` tuple; ``preview`` is the first 500
        characters of the extracted text, or ``""`` on failure.
    """
    url = (url or "").strip()
    if not url:
        return "Please enter a URL.", ""
    if not BRIGHTDATA_API_KEY:
        # Without a key the request would go out as "Bearer None" and fail
        # with an opaque 401; report the real cause instead.
        contexts["scraper"] = ""
        return "BRIGHTDATA_API_KEY is not set. Add it to the environment to enable scraping.", ""
    if "://" not in url:
        url = "https://" + url

    parsed = urlparse(url)
    target_url = url
    if "goodreads.com" in (parsed.netloc or "") and (parsed.path in ("", "/")):
        target_url = "https://www.goodreads.com/list/show/1.Best_Books_Ever"

    api_url = "https://api.brightdata.com/request"
    headers = {
        "Authorization": f"Bearer {BRIGHTDATA_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "zone": BRIGHTDATA_UNLOCKER_ZONE,
        "url": target_url,
        "format": "raw",
        "method": "GET",
    }
    try:
        # NOTE(review): verify=False disables TLS certificate verification for
        # a request that carries the API key. Kept as-is to avoid changing
        # deployment behaviour; confirm whether it is still needed.
        resp = requests.post(api_url, json=payload, headers=headers, timeout=120, verify=False)
        if resp.status_code in (400, 401):
            contexts["scraper"] = ""
            try:
                err_body = resp.json()
            except Exception:
                err_body = resp.text[:500] if resp.text else ""
            return (
                f"Bright Data error ({resp.status_code}): {resp.reason}. Details: {err_body}. "
                "Check BRIGHTDATA_API_KEY and BRIGHTDATA_UNLOCKER_ZONE.",
                "",
            )
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")

        text_content = ""
        if "goodreads.com/list/show/1.Best_Books_Ever" in target_url:
            text_content = _format_goodreads_list(soup)
        if not text_content:
            text_content = soup.get_text(separator=" ", strip=True)

        contexts["scraper"] = text_content[:15000]
        preview = text_content[:500] + "..." if len(text_content) > 500 else text_content
        return "Website scraped successfully. You can now chat about it.", preview
    except Exception as e:
        contexts["scraper"] = ""
        return f"Error scraping website: {e}", ""
# ---------------------------------------------------------------------------
# Tab 2: YouTube Transcript Q&A
# ---------------------------------------------------------------------------
_YOUTUBE_ID_REGEX = re.compile(
r"(https?://)?(www\.)?"
r"(youtube|youtu|youtube-nocookie)\.(com|be)/"
r"(watch\?v=|embed/|v/|.+\?v=)?([^&=%\?]{11})"
)
def _extract_video_id(url_or_id: str) -> str:
match = _YOUTUBE_ID_REGEX.search(url_or_id)
return match.group(6) if match else url_or_id.strip()
def _parse_player_response(html: str) -> dict:
    """Extract the ``ytInitialPlayerResponse`` JSON object embedded in ``html``.

    The object is decoded with ``json.JSONDecoder.raw_decode`` starting at its
    opening brace. Counting braces by hand would miscount whenever a JSON
    string value itself contains ``{`` or ``}``.

    Raises:
        ValueError: if the assignment is missing or the object cannot be parsed.
    """
    match = re.search(r"ytInitialPlayerResponse\s*=\s*(\{)", html)
    if not match:
        raise ValueError("Could not find caption data on the video page.")
    try:
        player, _end = json.JSONDecoder().raw_decode(html[match.start(1):])
    except json.JSONDecodeError as err:
        raise ValueError("Could not parse caption data from the video page.") from err
    if not isinstance(player, dict):
        raise ValueError("Could not parse caption data from the video page.")
    return player


def _fetch_transcript_via_brightdata(video_id: str) -> str:
    """Fetch a video's transcript by loading YouTube pages through Bright Data.

    Two proxied requests are made: the watch page, to find the first caption
    track URL, and then that track in ``json3`` format.

    Args:
        video_id: The YouTube video id.

    Returns:
        The caption segments joined with single spaces.

    Raises:
        ValueError: if the API key is missing, the zone rejects the URL, or
            no caption data or track can be found.
        requests.HTTPError: for other non-success HTTP responses.
    """
    if not BRIGHTDATA_API_KEY:
        raise ValueError("BRIGHTDATA_API_KEY is required for YouTube transcript on this environment.")
    api_url = "https://api.brightdata.com/request"
    headers = {
        "Authorization": f"Bearer {BRIGHTDATA_API_KEY}",
        "Content-Type": "application/json",
    }
    zone = YOUTUBE_UNLOCKER_ZONE
    watch_url = f"https://www.youtube.com/watch?v={video_id}"
    payload = {"zone": zone, "url": watch_url, "format": "raw", "method": "GET"}
    resp = requests.post(api_url, json=payload, headers=headers, timeout=120, verify=False)
    if resp.status_code == 400:
        raise ValueError(f"Bright Data zone '{zone}' rejected the YouTube URL.")
    resp.raise_for_status()

    player = _parse_player_response(resp.text)
    # Each level may be absent or null in the page data; treat both as empty.
    captions = player.get("captions") or {}
    renderer = captions.get("playerCaptionsTracklistRenderer") or {}
    tracks = renderer.get("captionTracks") or []
    if not tracks:
        raise ValueError("No transcript available for this video.")
    base_url = tracks[0].get("baseUrl", "")
    if not base_url:
        raise ValueError("No caption track URL found.")

    caption_url = base_url + ("&" if "?" in base_url else "?") + "fmt=json3"
    payload2 = {"zone": zone, "url": caption_url, "format": "raw", "method": "GET"}
    resp2 = requests.post(api_url, json=payload2, headers=headers, timeout=60, verify=False)
    resp2.raise_for_status()
    caption_data = resp2.json()

    pieces = []
    for event in caption_data.get("events") or []:
        for seg in event.get("segs") or []:
            text = (seg.get("utf8") or "").strip()
            if text:
                pieces.append(text)
    return " ".join(pieces)
def fetch_youtube_transcript(video_input: str):
    """Fetch a video's transcript and store it as the YouTube chat context.

    The transcript is requested directly first. If that fails with what looks
    like a network-resolution error and a Bright Data key is configured, the
    Bright Data route is tried. ``contexts["youtube"]`` receives the first
    15000 characters on success and is cleared on failure.

    Returns:
        A ``(status_message, preview)`` tuple; the preview is the first 500
        characters of the transcript, or ``""`` on failure.
    """
    if not video_input:
        return "Please enter a YouTube Video URL or ID.", ""

    def _preview_of(text):
        return text[:500] + "..." if len(text) > 500 else text

    video_id = _extract_video_id(video_input)
    try:
        available = YouTubeTranscriptApi().list(video_id)
        chosen = None
        try:
            chosen = available.find_transcript(["en", "ur"])
        except Exception:
            try:
                chosen = available.find_generated_transcript(["en", "ur"])
            except Exception:
                # Last resort: whichever transcript is listed first.
                chosen = next(iter(available), None)
        if chosen is None:
            raise Exception("No transcript available for this video.")

        full_text = " ".join(
            entry.get("text", "") if isinstance(entry, dict) else getattr(entry, "text", "")
            for entry in chosen.fetch()
        )
        contexts["youtube"] = full_text[:15000]
        return "Transcript fetched successfully. You can now chat about the video.", _preview_of(full_text)
    except Exception as e:
        lowered = str(e).lower()
        network_markers = ("resolve", "hostname", "no address", "max retries")
        looks_like_network = any(marker in lowered for marker in network_markers)

        if looks_like_network and BRIGHTDATA_API_KEY:
            try:
                proxied_text = _fetch_transcript_via_brightdata(video_id)
                contexts["youtube"] = proxied_text[:15000]
                return "Transcript fetched via Bright Data. You can now chat about the video.", _preview_of(proxied_text)
            except Exception as fallback_err:
                contexts["youtube"] = ""
                return f"Direct fetch failed. Bright Data fallback failed: {fallback_err}", ""

        contexts["youtube"] = ""
        if looks_like_network:
            return "YouTube transcript fetching failed (network restricted). Add BRIGHTDATA_API_KEY and YOUTUBE_UNLOCKER_ZONE.", ""
        return f"Error: No transcript for video ID ({video_id}). Details: {e}", ""
# ---------------------------------------------------------------------------
# Multi-turn chat with image generation
# ---------------------------------------------------------------------------
def _build_system_prompt(mode: str) -> str:
    """Compose the system prompt for a context-bound chat tab.

    The prompt restricts the model to the text stored in ``contexts[mode]``.
    When nothing is stored, a placeholder tells the model to refuse. Saved
    user preferences, if any, are appended as a final section.
    """
    stored_context = contexts.get(mode, "")
    user_prefs = storage.get("user_preferences", "").strip()

    if stored_context:
        context_text = stored_context.strip()
    else:
        context_text = "(None — the user has NOT scraped or fetched transcript yet. You must refuse to answer and tell them to scrape/fetch first.)"

    instructions = (
        "You are a helpful assistant. You must use ONLY the provided context to answer. "
        "NEVER use external knowledge. If the context says 'None' or the user has not scraped yet, refuse to answer and tell them to scrape or fetch transcript first. "
        "If the answer is not in the context, say so. You have conversation history for follow-up questions."
    )
    sections = [instructions, f"Context:\n{context_text}"]
    if user_prefs:
        sections.append(f"User preferences (follow these):\n{user_prefs}")
    return "\n\n".join(sections)
def _chat_with_image(mode: str, message: str, history_key: str, system_prompt_fn):
    """Run one chat turn, optionally generating an illustration.

    The user's message is appended to the history stored under
    ``history_key``, the whole history is sent to the main model (falling
    back to the small model on failure), and the updated history is saved.

    Args:
        mode: ``"general"`` for unconstrained chat; any other value is a key
            into ``contexts`` that must hold non-blank text before the model
            is consulted.
        message: The user's message. Blank input is a no-op.
        history_key: Key in ``storage`` under which this tab's history lives.
        system_prompt_fn: Zero-argument callable returning the system prompt.

    Returns:
        ``("", history_tuples, image_path)``. The empty string clears the
        message box; ``image_path`` is ``None`` when no image was generated.
    """
    user_text = (message or "").strip()
    if not user_text:
        return "", history_dicts_to_tuples(storage.get(history_key, [])), None

    history_dicts = list(storage.get(history_key, []))
    history_dicts.append({"role": "user", "content": user_text})

    needs_context = mode != "general"
    if needs_context and not (contexts.get(mode, "") or "").strip():
        history_dicts.append({"role": "assistant", "content": "Please scrape a website or fetch a transcript first, then ask questions."})
        storage[history_key] = history_dicts
        save_storage(storage)
        return "", history_dicts_to_tuples(history_dicts), None

    messages = [{"role": "system", "content": system_prompt_fn()}]
    messages.extend(
        # "or ''" guards against a None content left in older stored history.
        {"role": m["role"], "content": m.get("content") or ""}
        for m in history_dicts
        if m.get("role") in ("user", "assistant")
    )

    reply = None
    try:
        resp = client.chat.completions.create(
            model=LLM_MODEL,
            messages=messages,
            max_tokens=1024,
            temperature=0.3,
        )
        reply = resp.choices[0].message.content
    except Exception as e:
        try:
            resp = client.chat.completions.create(
                model="llama-3.1-8b-instant",
                messages=messages,
                max_tokens=1024,
                temperature=0.3,
            )
            reply = resp.choices[0].message.content
        except Exception:
            # Report the primary model's failure; that is the one to investigate.
            reply = f"Error communicating with Groq: {e}"
    # The API may return a null content; normalize it so the "+=" below and
    # the stored history never see None.
    reply = reply or ""

    image_path = None
    if _should_generate_image(user_text, reply):
        img_prompt = _generate_image_prompt(user_text)
        image_path = _generate_image(img_prompt)
        if image_path:
            reply += "\n\n*Generated illustration below.*"
        elif HF_TOKEN:
            reply += "\n\n*(Image generation failed — check HF Inference Providers at hf.co/settings/inference-providers)*"

    history_dicts.append({"role": "assistant", "content": reply})
    storage[history_key] = history_dicts
    save_storage(storage)
    return "", history_dicts_to_tuples(history_dicts), image_path
def chat_turn_scraper(message: str, _history_ignored):
    """Handle a message on the scraper tab.

    The history passed in by the UI is ignored; the stored history is used.
    Returns ``(cleared_message, history_tuples, image_path)``.
    """
    def _scraper_prompt():
        return _build_system_prompt("scraper")

    cleared, history, image = _chat_with_image("scraper", message, "scraper_history", _scraper_prompt)
    return cleared, history, image
def chat_turn_youtube(message: str, _history_ignored):
    """Handle a message on the YouTube tab.

    The history passed in by the UI is ignored; the stored history is used.
    Returns ``(cleared_message, history_tuples, image_path)``.
    """
    def _youtube_prompt():
        return _build_system_prompt("youtube")

    cleared, history, image = _chat_with_image("youtube", message, "youtube_history", _youtube_prompt)
    return cleared, history, image
def chat_turn_general(message: str, _history_ignored):
    """Handle a message on the General Chat tab.

    Free-form Q&A with image generation and no context constraint. The
    history passed in by the UI is ignored; the stored history is used.
    Returns ``(cleared_message, history_tuples, image_path)``.
    """
    is_blank = not message or not message.strip()
    if is_blank:
        return "", history_dicts_to_tuples(storage.get("general_history", [])), None

    user_prefs = storage.get("user_preferences", "").strip()
    prompt_sections = ["You are a helpful assistant. Answer concisely. You may use general knowledge."]
    if user_prefs:
        prompt_sections.append(f"User preferences (follow these):\n{user_prefs}")
    # Built once up front; the callable just hands back the finished text.
    general_prompt = "\n\n".join(prompt_sections)

    cleared, history, image = _chat_with_image("general", message, "general_history", lambda: general_prompt)
    return cleared, history, image
def save_preferences(prefs: str):
    """Store the user's preference text and persist it.

    A falsy value is stored as an empty string. Returns a status message
    for the UI.
    """
    # The shared dict is mutated in place, so no rebinding is involved.
    storage["user_preferences"] = prefs if prefs else ""
    save_storage(storage)
    return "Preferences saved."
# ---------------------------------------------------------------------------
# Tab: Voice Assistant (Speech-to-Text → LLM → Text-to-Speech + optional Image)
# ---------------------------------------------------------------------------
def voice_chatbot(audio_input):
    """Answer a spoken query: speech-to-text, LLM, text-to-speech, optional image.

    Args:
        audio_input: A file path, a tuple whose first element is the path, or
            a dict carrying the path under ``"name"`` or ``"path"``.

    Returns:
        ``(bot_text, audio_path, user_text, image_path)``. ``audio_path`` and
        ``image_path`` are ``None`` when the corresponding step produced
        nothing. On early failure ``bot_text`` carries the error message and
        the other values are empty.
    """
    if audio_input is None:
        return "No audio received.", None, "", None

    if isinstance(audio_input, tuple):
        audio_path = audio_input[0] if audio_input else None
    elif isinstance(audio_input, dict):
        audio_path = audio_input.get("name") or audio_input.get("path", "")
    else:
        audio_path = audio_input
    # Only real path values reach os.path.isfile: an int would be interpreted
    # as a file descriptor rather than a path.
    if (
        not audio_path
        or not isinstance(audio_path, (str, os.PathLike))
        or not os.path.isfile(audio_path)
    ):
        return "Invalid audio file.", None, "", None

    try:
        transcriber = _get_transcriber()
        transcription = transcriber(audio_path)
        user_text = (transcription.get("text") or "").strip() or "(no speech detected)"
    except Exception as e:
        return f"Transcription error: {e}", None, "", None

    prefs = storage.get("user_preferences", "").strip()
    system_prompt = "You are a helpful assistant. Keep answers concise."
    if prefs:
        system_prompt += f"\n\nUser preferences (follow these):\n{prefs}"
    llm_messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_text},
    ]

    # Main model first, then the small fallback model.
    last_error = None
    for model_name in (LLM_MODEL, "llama-3.1-8b-instant"):
        try:
            resp = client.chat.completions.create(
                model=model_name,
                messages=llm_messages,
                max_tokens=512,
                temperature=0.7,
            )
            # content may be null; normalize so later "+=" cannot fail.
            bot_text = resp.choices[0].message.content or ""
            break
        except Exception as err:
            last_error = err
    else:
        bot_text = f"Error communicating with Groq: {last_error}"

    # Speak only the model's answer, not the markdown status notes added below.
    spoken_text = bot_text

    image_path = None
    if _should_generate_image(user_text, bot_text):
        img_prompt = _generate_image_prompt(user_text)
        image_path = _generate_image(img_prompt)
        if image_path:
            bot_text += "\n\n*Generated illustration below.*"
        elif HF_TOKEN:
            bot_text += "\n\n*(Image generation failed — check HF Inference Providers)*"

    output_path = None
    try:
        from gtts import gTTS

        fd, tmp_path = tempfile.mkstemp(suffix=".mp3")
        os.close(fd)
        try:
            gTTS(text=spoken_text, lang="en").save(tmp_path)
        except Exception:
            # Do not leave an empty temp file behind when synthesis fails.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise
        output_path = tmp_path
    except Exception as e:
        bot_text += f"\n\n(TTS error: {e})"
    return bot_text, output_path, user_text, image_path
# ---------------------------------------------------------------------------
# Gradio UI
# ---------------------------------------------------------------------------
# Builds the whole UI at import time and binds it to `demo`.
# NOTE(review): the nesting below was reconstructed from a copy of this file
# that had lost its indentation — confirm the layout against the deployed app.
with gr.Blocks(title="Scraper Bot v5 — Multimodal AI (Image Gen)") as demo:
    gr.Markdown("# Version 5 — Multimodal AI Assistant (Image Generation)")
    gr.Markdown(
        "**Model:** llama-3.3-70b-versatile | **STT:** whisper-large-v3 | **TTS:** gTTS (🔓 Open-source) \n"
        "Extends v4 with **on-demand image generation**: ask for visual explanations and get illustrations via FLUX.1-schnell (HF) or DALL-E 3."
    )
    with gr.Row():
        # Left column: user preferences, pre-filled from storage.
        with gr.Column(scale=1):
            gr.Markdown("### User preferences")
            prefs_input = gr.Textbox(
                label="Preferences",
                value=storage.get("user_preferences", ""),
                placeholder="e.g., Always respond formally.",
                lines=3,
            )
            save_prefs_btn = gr.Button("Save preferences")
            prefs_status = gr.Textbox(label="Status", interactive=False)
            save_prefs_btn.click(save_preferences, inputs=[prefs_input], outputs=[prefs_status])
        # Right column: one tab per assistant mode.
        with gr.Column(scale=2):
            with gr.Tabs():
                with gr.TabItem("General Chat (Image Gen)"):
                    gr.Markdown("Free-form Q&A with optional image generation. Try: *Explain the solar system visually* or *Show me what a transformer architecture looks like*.")
                    # Each chatbot starts from the history found in storage.
                    gen_chatbot = gr.Chatbot(
                        value=history_dicts_to_tuples(storage.get("general_history", [])),
                        height=300,
                        label="Chat",
                    )
                    gen_img_out = gr.Image(label="Generated illustration", type="filepath")
                    gen_msg = gr.Textbox(label="Message", placeholder="Ask anything. Say 'visually' or 'show me' for images.")
                    # The handler's three return values map to: message box, chat history, image.
                    gen_msg.submit(
                        lambda m, h: chat_turn_general(m, h),
                        inputs=[gen_msg, gen_chatbot],
                        outputs=[gen_msg, gen_chatbot, gen_img_out],
                    )
                with gr.TabItem("Bot-Protected Website Scraper"):
                    gr.Markdown("Scrape a URL, then chat. Image gen when you ask for visual explanations.")
                    with gr.Row():
                        url_input = gr.Textbox(label="URL", placeholder="https://www.goodreads.com/", scale=3)
                        scrape_btn = gr.Button("Scrape URL", scale=1)
                    scrape_status = gr.Textbox(label="Status", interactive=False)
                    scrape_preview = gr.Textbox(label="Content preview", interactive=False, lines=4)
                    scraper_chatbot = gr.Chatbot(
                        value=history_dicts_to_tuples(storage.get("scraper_history", [])),
                        height=280,
                        label="Chat",
                    )
                    scraper_img_out = gr.Image(label="Generated illustration", type="filepath")
                    scraper_msg = gr.Textbox(label="Message", placeholder="e.g., What are the top 5 books?")
                    scrape_btn.click(scrape_website, inputs=[url_input], outputs=[scrape_status, scrape_preview])
                    scraper_msg.submit(
                        lambda m, h: chat_turn_scraper(m, h),
                        inputs=[scraper_msg, scraper_chatbot],
                        outputs=[scraper_msg, scraper_chatbot, scraper_img_out],
                    )
                with gr.TabItem("YouTube Transcript Q&A"):
                    gr.Markdown("Fetch transcript, then chat. Image gen when you ask for visual explanations.")
                    with gr.Row():
                        yt_input = gr.Textbox(label="YouTube URL or ID", placeholder="dQw4w9WgXcQ", scale=3)
                        yt_btn = gr.Button("Get Transcript", scale=1)
                    yt_status = gr.Textbox(label="Status", interactive=False)
                    yt_preview = gr.Textbox(label="Transcript preview", interactive=False, lines=4)
                    yt_chatbot = gr.Chatbot(
                        value=history_dicts_to_tuples(storage.get("youtube_history", [])),
                        height=280,
                        label="Chat",
                    )
                    yt_img_out = gr.Image(label="Generated illustration", type="filepath")
                    yt_msg = gr.Textbox(label="Message", placeholder="e.g., Summarize the video.")
                    yt_btn.click(fetch_youtube_transcript, inputs=[yt_input], outputs=[yt_status, yt_preview])
                    yt_msg.submit(
                        lambda m, h: chat_turn_youtube(m, h),
                        inputs=[yt_msg, yt_chatbot],
                        outputs=[yt_msg, yt_chatbot, yt_img_out],
                    )
                with gr.TabItem("Voice Assistant"):
                    gr.Markdown("Speak your query. Get text + audio + optional image.")
                    mic_input = gr.Audio(
                        label="Speak",
                        sources=["microphone"],
                        type="filepath",
                    )
                    user_text_box = gr.Textbox(label="Transcribed text", interactive=False)
                    bot_text_output = gr.Textbox(label="Bot text response", interactive=False)
                    bot_audio_output = gr.Audio(label="Bot audio response", type="filepath", interactive=False)
                    voice_img_out = gr.Image(label="Generated illustration", type="filepath")
                    # Output order matches voice_chatbot's return tuple:
                    # (bot_text, audio_path, user_text, image_path).
                    mic_input.change(
                        fn=voice_chatbot,
                        inputs=mic_input,
                        outputs=[bot_text_output, bot_audio_output, user_text_box, voice_img_out],
                    )
if __name__ == "__main__":
    # With SPACE_ID set in the environment, launch with defaults; otherwise
    # bind to localhost on a fixed port.
    if os.environ.get("SPACE_ID"):
        launch_options = {}
    else:
        launch_options = {"server_name": "127.0.0.1", "server_port": 7866}
    demo.launch(**launch_options)