Spaces:
Sleeping
Sleeping
| # ============================================================ | |
| # YouTube RAG QA System — app.py | |
| # Transcript: Supadata API (works on HuggingFace, no SSL block) | |
| # LLM: Groq LLaMA 3.3-70B | |
| # Vector DB: FAISS + sentence-transformers | |
| # UI: Gradio 5 | |
| # ============================================================ | |
| import os | |
| import re | |
| import requests | |
| import numpy as np | |
| import faiss | |
| import gradio as gr | |
| from sentence_transformers import SentenceTransformer | |
| from groq import Groq | |
| # ─── GLOBAL STATE ──────────────────────────────────────────── | |
# Module-level state shared by the helpers and Gradio handlers in this file.
# The three handles start as None and are filled in lazily; _chunks holds the
# transcript chunks that the FAISS index rows refer to by position.
_embed_model = _faiss_index = _groq_client = None
_chunks = []
| # ─── LAZY CLIENTS ──────────────────────────────────────────── | |
def get_groq_client():
    """Return the shared Groq client, creating it on the first call.

    The API key is read from the ``GROQ_API_KEY`` environment variable
    (surrounding whitespace is ignored). The created client is cached in the
    module-level ``_groq_client`` so later calls reuse it.

    Raises:
        ValueError: if ``GROQ_API_KEY`` is unset or blank.
    """
    global _groq_client
    if _groq_client is None:
        key = os.environ.get("GROQ_API_KEY", "").strip()
        if key == "":
            raise ValueError(
                "GROQ_API_KEY not set!\n"
                "Space β Settings β Variables and secrets β New secret\n"
                "Name: GROQ_API_KEY Value: gsk_xxxxxxxxxx"
            )
        _groq_client = Groq(api_key=key)
    return _groq_client
def get_embed_model():
    """Return the shared ``all-MiniLM-L6-v2`` embedding model.

    The model is constructed on the first call and cached in the module-level
    ``_embed_model`` for every later call.
    """
    global _embed_model
    if _embed_model is not None:
        return _embed_model
    _embed_model = SentenceTransformer("all-MiniLM-L6-v2")
    return _embed_model
| # ─── MODULE 1: Transcript Fetcher (via Supadata REST API) ──── | |
def extract_video_id(url: str) -> str:
    """Extract the 11-character YouTube video ID from a URL or a bare ID.

    Recognised shapes:
      * ``...watch?v=ID`` (``v`` may appear anywhere in the query string)
      * ``youtu.be/ID``
      * ``/shorts/ID``, ``/embed/ID``, ``/live/ID``, ``/v/ID``
      * a bare 11-character ID

    Matching is anchored to these shapes on purpose: accepting "any 11 ID
    characters after a slash" returns bogus IDs for channel or attribution
    URLs (e.g. ``/c/SomeChannelName``).

    Args:
        url: The URL (or bare video ID) supplied by the user.

    Returns:
        The 11-character video ID.

    Raises:
        ValueError: if no video ID can be found in ``url``.
    """
    candidate = url.strip()
    id_char = r"[0-9A-Za-z_-]"
    # Exactly 11 ID characters, not followed by a 12th one, so an over-long
    # token is rejected instead of being silently truncated.
    video_id = rf"({id_char}{{11}})(?!{id_char})"
    patterns = (
        rf"[?&]v={video_id}",
        rf"youtu\.be/{video_id}",
        rf"/(?:shorts|embed|live|v)/{video_id}",
        rf"^{video_id}$",
    )
    for pattern in patterns:
        match = re.search(pattern, candidate)
        if match:
            return match.group(1)
    raise ValueError(f"Cannot extract video ID from: {url}")
def fetch_transcript(url: str) -> str:
    """Fetch a YouTube video's transcript text through the Supadata REST API.

    The API key is read from the ``SUPADATA_API_KEY`` environment variable.
    The request asks for plain text (``text=true``); a list of segments is
    also accepted and joined with single spaces.

    Args:
        url: YouTube URL; the video ID is extracted with ``extract_video_id``.

    Returns:
        The transcript as a single string.

    Raises:
        ValueError: if the API key is missing, the video ID cannot be
            extracted, the API answers with a non-200 status, the body is not
            the expected JSON shape, or the transcript is empty.
    """
    supadata_key = os.environ.get("SUPADATA_API_KEY", "").strip()
    if not supadata_key:
        raise ValueError(
            "SUPADATA_API_KEY not set!\n"
            "1. Go to https://supadata.ai β Sign up (free, no credit card)\n"
            "2. Copy your API key\n"
            "3. Space β Settings β Variables and secrets β New secret\n"
            " Name: SUPADATA_API_KEY Value: your_key_here"
        )

    video_id = extract_video_id(url)
    response = requests.get(
        "https://api.supadata.ai/v1/youtube/transcript",
        params={"videoId": video_id, "text": "true"},
        headers={"x-api-key": supadata_key},
        timeout=30,
    )

    status = response.status_code
    if status == 401:
        raise ValueError("Invalid SUPADATA_API_KEY β check your key at supadata.ai")
    if status == 404:
        raise ValueError("No transcript found for this video (may be private or have no captions)")
    if status != 200:
        raise ValueError(f"Supadata API error {status}: {response.text}")

    try:
        data = response.json()
    except ValueError as exc:
        # A 200 with a non-JSON body would otherwise surface as a bare
        # decoder error with no hint about where it came from.
        raise ValueError(f"Supadata API returned a non-JSON response: {response.text}") from exc

    # Guard against a JSON array/scalar body, which has no .get().
    content = data.get("content") if isinstance(data, dict) else None
    if isinstance(content, str):
        transcript = content
    elif isinstance(content, list):
        # Segment-list fallback; skip anything that is not a segment object.
        transcript = " ".join(
            seg.get("text", "") for seg in content if isinstance(seg, dict)
        )
    else:
        raise ValueError(f"Unexpected Supadata response: {data}")

    if not transcript.strip():
        # An empty transcript cannot be chunked or indexed; fail here with a
        # readable message instead of a cryptic error further down.
        raise ValueError("Transcript is empty for this video (no captions text returned)")
    return transcript
| # ─── MODULE 2: Text Chunker ─────────────────────────────────── | |
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list:
    """Split text into overlapping chunks of whitespace-separated words.

    Consecutive chunks share ``overlap`` words. The last chunk ends exactly at
    the final word and may be shorter than ``chunk_size``.

    Args:
        text: The text to split; words are whatever ``str.split()`` yields.
        chunk_size: Maximum number of words per chunk; must be positive.
        overlap: Words repeated between neighbouring chunks; must satisfy
            ``0 <= overlap < chunk_size``.

    Returns:
        A list of chunk strings (empty when ``text`` contains no words).

    Raises:
        ValueError: if ``chunk_size`` or ``overlap`` is out of range. Without
            this check a non-positive stride would loop forever.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    if not 0 <= overlap < chunk_size:
        raise ValueError(
            f"overlap must satisfy 0 <= overlap < chunk_size, got overlap={overlap}, "
            f"chunk_size={chunk_size}"
        )

    words = text.split()
    stride = chunk_size - overlap
    chunks = []
    for start in range(0, len(words), stride):
        end = start + chunk_size
        chunks.append(" ".join(words[start:end]))
        if end >= len(words):
            # This chunk already reaches the last word; a further chunk would
            # only repeat its tail.
            break
    return chunks
| # ─── MODULE 3: Vector Store (FAISS) ────────────────────────── | |
def build_faiss_index(chunks: list):
    """Embed the chunks and load them into a flat L2 FAISS index.

    Rows are added in the order of ``chunks``, so the row numbers returned by
    a search index directly into the same list.

    Args:
        chunks: Non-empty list of text chunks to embed.

    Returns:
        A ``faiss.IndexFlatL2`` holding one float32 vector per chunk.

    Raises:
        ValueError: if ``chunks`` is empty. Encoding an empty list gives an
            array with no second dimension, which would otherwise fail with an
            opaque ``IndexError``.
    """
    if not chunks:
        raise ValueError("Cannot build a FAISS index from zero chunks (is the transcript empty?)")
    vectors = get_embed_model().encode(chunks, show_progress_bar=False)
    vectors = vectors.astype("float32")  # FAISS indexes store float32 vectors
    dimension = vectors.shape[1]
    index = faiss.IndexFlatL2(dimension)
    index.add(vectors)
    return index
def retrieve_chunks(query: str, index, chunks: list, top_k: int = 4) -> list:
    """Return the chunks whose embeddings are closest to the query.

    Args:
        query: The user's question.
        index: FAISS index built over ``chunks`` (same order).
        chunks: The text chunks the index rows refer to.
        top_k: Maximum number of chunks to return.

    Returns:
        Up to ``top_k`` chunks in the order returned by the index search;
        empty when there is nothing to search or ``top_k`` is not positive.
    """
    # Never ask for more neighbours than there are chunks: FAISS pads missing
    # results with -1, and chunks[-1] would silently repeat the last chunk.
    k = min(top_k, len(chunks))
    if k <= 0:
        return []
    q_vec = get_embed_model().encode([query]).astype("float32")
    _, idxs = index.search(q_vec, k)
    # Keep only real row numbers; -1 padding must not wrap around the list.
    return [chunks[i] for i in idxs[0] if 0 <= i < len(chunks)]
| # ─── MODULE 4: LLM via Groq ─────────────────────────────────── | |
def ask_llm(question: str, context_chunks: list) -> str:
    """Answer a question from retrieved transcript chunks via the Groq chat API.

    The chunks are numbered from 1 and placed in a single user message that
    instructs the model to answer only from that context.

    Args:
        question: The user's question.
        context_chunks: Retrieved transcript chunks to ground the answer.

    Returns:
        The model's reply with surrounding whitespace removed.
    """
    context = "\n\n".join(
        f"[Chunk {number}]:\n{chunk}"
        for number, chunk in enumerate(context_chunks, start=1)
    )
    instructions = (
        "You are a helpful assistant. Answer ONLY from the transcript context below.\n"
        "If the answer is not in the context, say: 'I could not find that in the video.'\n\n"
    )
    prompt = f"{instructions}CONTEXT:\n{context}\n\nQUESTION: {question}\n\nANSWER:"

    client = get_groq_client()
    completion = client.chat.completions.create(
        model="llama-3.3-70b-versatile",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=1024,
        temperature=0.3,
    )
    answer = completion.choices[0].message.content
    return answer.strip()
| # ─── HANDLER: Process Video ─────────────────────────────────── | |
def process_video(url: str):
    """Fetch, chunk and index a video, yielding status text after each step.

    This is a generator: every yielded string replaces the previous status
    shown to the user. On success the module-level ``_chunks`` and
    ``_faiss_index`` are replaced together; on failure both are cleared.

    Args:
        url: YouTube URL entered by the user.

    Yields:
        Human-readable progress or error messages.
    """
    global _faiss_index, _chunks

    if not url or not url.strip():
        yield "β οΈ Please enter a YouTube URL first."
        return

    # Check the key up front so the user gets setup steps, not a stack trace.
    if not os.environ.get("SUPADATA_API_KEY", "").strip():
        yield (
            "β SUPADATA_API_KEY is missing!\n\n"
            "Steps to fix:\n"
            "1. Go to https://supadata.ai β Sign up FREE (no credit card)\n"
            "2. Get your API key from dashboard\n"
            "3. HuggingFace Space β Settings β Variables and secrets\n"
            "4. Click 'New secret'\n"
            " Name: SUPADATA_API_KEY\n"
            " Value: your_supadata_key_here\n"
            "5. Save β Space will restart β Try again!"
        )
        return

    try:
        yield "β³ [1/4] Fetching transcript via Supadata API..."
        transcript = fetch_transcript(url.strip())
        yield f"β [1/4] Transcript fetched! ({len(transcript.split()):,} words)\nβ³ [2/4] Splitting into chunks..."

        # Work on locals: the shared state must never hold new chunks next to
        # an index that was built for a different video.
        chunks = chunk_text(transcript)
        if not chunks:
            raise ValueError("Transcript is empty - nothing to index.")
        yield f"β [2/4] {len(chunks)} chunks created\nβ³ [3/4] Generating embeddings (30-60 sec on CPU)..."

        index = build_faiss_index(chunks)
        # Publish both halves together once the index exists.
        _chunks, _faiss_index = chunks, index
        yield (
            f"β [3/4] Embeddings generated\n"
            f"β [4/4] FAISS index ready!\n\n"
            f"π Done! {len(chunks)} chunks indexed.\n"
            f"π Switch to 'π¬ Chat with Video' tab and ask your questions!"
        )
    except Exception as e:
        # UI boundary: report the failure and drop any indexed video.
        _faiss_index = None
        _chunks = []
        yield f"β Error: {e}"
| # ─── HANDLER: Chat ──────────────────────────────────────────── | |
def chat_fn(message: str, history: list):
    """Answer one chat message using the indexed video transcript.

    Blank messages leave the history untouched. Otherwise the user message and
    exactly one assistant reply are appended to ``history``: a setup hint when
    no video is indexed or the Groq key is missing, the model's answer on
    success, or an error line if retrieval or the LLM call raises.

    Args:
        message: Text typed by the user.
        history: Chat history as a list of ``{"role", "content"}`` dicts;
            it is extended in place.

    Returns:
        ``(history, "")`` - the updated history and an empty string that
        clears the input box.
    """
    if not message.strip():
        return history, ""

    if _faiss_index is None or not _chunks:
        reply = (
            "β οΈ No video processed yet!\n\n"
            "1. Go to 'πΉ Process Video' tab\n"
            "2. Paste a YouTube URL\n"
            "3. Click π Process Video\n"
            "4. Wait for β success\n"
            "5. Come back here to chat!"
        )
    else:
        try:
            if os.environ.get("GROQ_API_KEY", "").strip():
                relevant = retrieve_chunks(message, _faiss_index, _chunks)
                reply = ask_llm(message, relevant)
            else:
                reply = (
                    "β GROQ_API_KEY is missing!\n\n"
                    "Space β Settings β Variables and secrets β New secret\n"
                    "Name: GROQ_API_KEY Value: gsk_xxxxxxxxxx"
                )
        except Exception as e:
            reply = f"β Error: {e}"

    history.append({"role": "user", "content": message})
    history.append({"role": "assistant", "content": reply})
    return history, ""
| # ─── GRADIO UI ──────────────────────────────────────────────── | |
# Two-tab Gradio app: one tab to index a video, one to chat about it.
# NOTE(review): the nesting below was reconstructed from a whitespace-stripped
# copy of this file - confirm which widgets belong inside each gr.Row().
with gr.Blocks(title="YouTube RAG QA", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # π¬ YouTube RAG QA System
    ### Kisi bhi YouTube video se sawaal poochho!
    **Powered by:** Supadata Β· FAISS Β· sentence-transformers Β· Groq LLaMA 3.3-70B Β· Gradio 5
    **Step 1 β** URL daalo + Process karo **Step 2 β** Chat tab mein sawaal karo
    """)

    with gr.Tabs():
        # Tab 1: paste a URL and build the index.
        with gr.Tab("πΉ Process Video"):
            gr.Markdown("YouTube URL paste karo. Transcript fetch β chunk β embed β FAISS index.")
            with gr.Row():
                url_box = gr.Textbox(
                    label="π YouTube URL",
                    placeholder="https://www.youtube.com/watch?v=... ya https://youtu.be/...",
                    scale=4,
                )
                process_btn = gr.Button("π Process Video", variant="primary", scale=1)
            status_box = gr.Textbox(
                label="π Live Processing Status",
                interactive=False,
                lines=9,
            )
            # process_video is a generator; each yielded string updates status_box.
            process_btn.click(process_video, inputs=[url_box], outputs=[status_box])

        # Tab 2: ask questions about the indexed video.
        with gr.Tab("π¬ Chat with Video"):
            gr.Markdown("Video process hone ke baad yahan sawaal poochho.")
            chatbot = gr.Chatbot(type="messages", height=430, label="Chat")
            with gr.Row():
                msg_box = gr.Textbox(
                    placeholder="Sawaal likho aur Enter dabao...",
                    label="Your Question",
                    scale=5,
                )
                send_btn = gr.Button("Send β€", variant="primary", scale=1)
            clear_btn = gr.Button("ποΈ Clear Chat", variant="secondary")

            # The Send button and pressing Enter both run the same handler.
            chat_inputs = [msg_box, chatbot]
            chat_outputs = [chatbot, msg_box]
            for register in (send_btn.click, msg_box.submit):
                register(chat_fn, chat_inputs, chat_outputs)
            # Reset both the conversation and the input box.
            clear_btn.click(lambda: ([], ""), outputs=chat_outputs)
if __name__ == "__main__":
    # Listen on every network interface, port 7860, when run as a script.
    launch_options = {"server_name": "0.0.0.0", "server_port": 7860}
    demo.launch(**launch_options)