Spaces:
Sleeping
Sleeping
File size: 8,526 Bytes
69553a1 be2cf45 69553a1 be2cf45 69553a1 e2ba992 01388ff 69553a1 be2cf45 69553a1 be2cf45 69553a1 be2cf45 69553a1 be2cf45 69553a1 be2cf45 69553a1 be2cf45 69553a1 be2cf45 69553a1 be2cf45 69553a1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 |
from dotenv import load_dotenv
load_dotenv()
import os
if not os.environ.get("GOOGLE_API_KEY"):
raise RuntimeError("Please set the GOOGLE_API_KEY environment variable with your Google API key.")
import gradio as gr
from supadata import Supadata
from langchain_core.prompts import PromptTemplate
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores.faiss import FAISS
from langchain_huggingface import HuggingFaceEmbeddings, HuggingFaceEndpoint, ChatHuggingFace
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage
from langchain_google_genai import GoogleGenerativeAIEmbeddings
# Initialize Supadata
supadata = Supadata(api_key=os.environ.get("SUPADATA_API_KEY"))
# Initialize the text splitter
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=300,
chunk_overlap=30,
separators=["\n\n", "\n", " ", ".", ","],
length_function=len,
is_separator_regex=False
)
# Initialize the embeddings model
embeddings = HuggingFaceEmbeddings(
model_name="sentence-transformers/all-mpnet-base-v2"
)
# Initialize the LLM
# llm = HuggingFaceEndpoint(
# repo_id="microsoft/Phi-3-mini-4k-instruct",
# task="text-generation",
# max_new_tokens=512,
# do_sample=False,
# repetition_penalty=1.03,
# huggingfacehub_api_token=os.getenv("HUGGINGFACE_TOKEN")
# )
chat = ChatGoogleGenerativeAI(
model="gemini-2.0-flash",
temperature=0.3,
max_tokens=5000,
timeout=None,
max_retries=2
)
# Define the prompt template
prompt = PromptTemplate(
template="""
You are an intelligent AI assistant specialized in analyzing YouTube video transcripts. Your task is to provide accurate, detailed, and helpful answers based solely on the provided transcript content.
IMPORTANT GUIDELINES:
- Answer ONLY from the provided transcript context
- If the context is insufficient to answer the question, clearly state "I don't have enough information from the transcript to answer this question"
- Provide specific details and examples from the transcript when possible
- Be concise but comprehensive in your responses
- If asked for a summary, organize the information logically
- If asked about specific topics, focus on what was actually discussed in the video
- Maintain a helpful and informative tone
TRANSCRIPT CONTEXT:
{context}
QUESTION: {question}
Please provide a clear and detailed answer based on the transcript above:
""",
input_variables=['context', 'question']
)
# Global variable to store the current retriever
current_retriever = None
current_video_id = None
def extract_video_id(url):
"""Extract video ID from YouTube URL."""
if "youtube.com/watch?v=" in url:
return url.split("watch?v=")[1].split("&")[0]
elif "youtu.be/" in url:
return url.split("youtu.be/")[1].split("?")[0]
return url # Assume it's already a video ID
def process_video_url(video_url_or_id):
"""Process video URL and create retriever object."""
global current_retriever, current_video_id
try:
# Extract video ID if URL is provided
video_id = extract_video_id(video_url_or_id)
# Check if we already have a retriever for this video
if current_video_id == video_id and current_retriever is not None:
return f"β
Video already processed: {video_id}"
# Get transcript using Supadata
transcript_response = supadata.youtube.transcript(
video_id=video_id,
text=True # Get plain text transcript
)
# Extract the transcript text
full_transcript_text = transcript_response.content
# print(f"β
Full transcript text: {full_transcript_text}")
# Create chunks
chunks = text_splitter.create_documents([full_transcript_text])
# Create vector store
vector_store = FAISS.from_documents(chunks, embeddings)
current_retriever = vector_store.as_retriever(search_type="similarity", search_kwargs={"k": 8})
# print(f"β
Current Retreiver : {current_retriever}")
current_video_id = video_id
return f"β
Video processed successfully: {video_id}"
except Exception as e:
return f"β Error processing video: {str(e)}"
def answer_question(question):
global current_retriever
if current_retriever is None:
return "β Process a video first."
try:
# Retrieve docs with scores
docs_and_scores = current_retriever.vectorstore.similarity_search_with_score(
question, k=8
)
for doc, score in docs_and_scores:
print(f"[DEBUG] score={score:.3f}, text={doc.page_content}")
# Extract docs
retrieved_docs = [doc for doc, _ in docs_and_scores]
# Build context and reply as before
context_text = "\n\n".join(doc.page_content for doc in retrieved_docs)
# print("\nContext text:\n", context_text)
final_prompt = prompt.invoke({"context": context_text, "question": question})
answer = chat.invoke(final_prompt)
return answer.content
except Exception as e:
print(f"[ERROR] in answer_question: {e}")
return f"β Error: {str(e)}"
def process_video(video_url_or_id, question):
"""Legacy function for backward compatibility."""
# Process video first
process_result = process_video_url(video_url_or_id)
if "β" in process_result:
return process_result
# Then answer question
return answer_question(question)
def main():
with gr.Blocks(title="YouTube Transcript Q&A", theme=gr.themes.Soft()) as iface:
gr.Markdown("""
# YouTube Transcript Q&A
Ask questions about any YouTube video's content!
## How to use:
1. Paste a YouTube video URL (e.g., https://www.youtube.com/watch?v=JaRGJVrJBQ8) or just the video ID
2. Click "Process Video" to download and process the transcript (this happens once per video)
3. Type your question about the video content
4. Click "Ask Question" to get your answer
""")
with gr.Row():
with gr.Column():
video_input = gr.Textbox(
label="YouTube Video URL or ID",
placeholder="Enter YouTube URL or video ID (e.g., JaRGJVrJBQ8)",
lines=1
)
process_btn = gr.Button("Process Video", variant="primary")
process_status = gr.Textbox(
label="Processing Status",
lines=1,
interactive=False
)
gr.Markdown("---")
question_input = gr.Textbox(
label="Your Question",
placeholder="What would you like to know about this video?",
lines=2
)
ask_btn = gr.Button("Ask Question", variant="secondary")
with gr.Column():
output = gr.Textbox(
label="Answer",
lines=8,
show_copy_button=True
)
# Example inputs
gr.Examples(
examples=[
["https://www.youtube.com/watch?v=-moW9jvvMr4&t=1s", "What is this video about?"],
["https://www.youtube.com/watch?v=-moW9jvvMr4&t=1s", "What are the main topics discussed in this video?"],
["https://www.youtube.com/watch?v=-moW9jvvMr4&t=1s", "Can you summarize the key points from this video?"],
["-moW9jvvMr4", "What are the most important takeaways from this content?"]
],
inputs=[video_input, question_input]
)
# Connect the buttons
process_btn.click(
fn=process_video_url,
inputs=[video_input],
outputs=[process_status]
)
ask_btn.click(
fn=answer_question,
inputs=[question_input],
outputs=[output]
)
iface.launch()
if __name__ == "__main__":
import sys
if len(sys.argv) > 1 and sys.argv[1] == "--test":
result = process_video("https://www.youtube.com/watch?v=gN-QWM5iY9M", "What is this video about?")
print(result)
else:
main() |