from langchain_google_genai import GoogleGenerativeAIEmbeddings, ChatGoogleGenerativeAI
from langchain_community.vectorstores import FAISS
from langchain.schema import Document
from langchain.chains import RetrievalQA
from langchain.chains.summarize import load_summarize_chain
from langchain.prompts import PromptTemplate
import requests
import xml.etree.ElementTree as ET
import os
from sqlalchemy.orm import Session
from ..db import crud

# Environment-driven configuration.
GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY')
COOKIES = os.getenv('COOKIES')
REF = os.getenv('REFERER')
transcript_api = os.getenv('TRANSCRIPT_API_URL')

# Shared model clients, created once at import time.
llm = ChatGoogleGenerativeAI(model="gemini-flash-lite-latest", google_api_key=GOOGLE_API_KEY)
embeddings = GoogleGenerativeAIEmbeddings(model="gemini-embedding-001", google_api_key=GOOGLE_API_KEY)

# Bind Google Search grounding to the chat model.
tools = [{"google_search": {}}]
llm_with_grounding = llm.bind_tools(tools)

# In-process per-video cache, e.g. {video_id: {"TranscriptChunks": [...], "Vectorstore": ...}}
local_cache = {}


def parse_subtitle_content(subtitle_content):
    """Parse a YouTube XML subtitle document into a list of caption dicts.

    Each entry carries 'start' (float seconds), 'duration' (float seconds,
    0 when the 'dur' attribute is absent) and 'text' with newlines collapsed
    to spaces. Returns [] on any parse error instead of raising.
    """
    try:
        xml_root = ET.fromstring(subtitle_content)
        return [
            {
                'start': float(node.attrib['start']),
                'duration': float(node.attrib.get('dur', 0)),
                'text': (node.text or '').replace('\n', ' '),
            }
            for node in xml_root.findall('text')
        ]
    except Exception as e:
        print(f"Error parsing subtitle content: {e}")
        return []

# Most reliable method to extract YouTube video transcripts when running locally.
# Transcripts are almost always retrievable if available.
# Note: This method may not work reliably on remote servers, as YouTube often blocks data center IP addresses.
# def fetch_transcript(video_id, preferred_langs=['en-orig', 'en']): # youtube_url = f"https://www.youtube.com/watch?v={video_id}" # # yt-dlp configuration to only extract subtitles, not download video # ydl_opts = { # 'skip_download': True, # 'writesubtitles': True, # 'writeautomaticsub': True, # 'quiet': True, # 'no_warnings': True, # 'log_warnings': False, # 'format': 'bestaudio/best', # } # try: # # Use yt-dlp to extract video metadata and available subtitles # with yt_dlp.YoutubeDL(ydl_opts) as ydl: # info_dict = ydl.extract_info(youtube_url, download=False) # # Collect all caption tracks from both manual and auto subtitles # all_caption_tracks = {} # # Add manually provided subtitles to the caption track list # if 'subtitles' in info_dict: # for lang, tracks in info_dict['subtitles'].items(): # if lang not in all_caption_tracks: # all_caption_tracks[lang] = [] # all_caption_tracks[lang].extend(tracks) # # Add auto-generated captions to the caption track list # if 'automatic_captions' in info_dict: # for lang, tracks in info_dict['automatic_captions'].items(): # if lang not in all_caption_tracks: # all_caption_tracks[lang] = [] # all_caption_tracks[lang].extend(tracks) # best_transcript_url = None # best_transcript_ext = None # # Helper: Return the first track with a non-JSON file format # def find_first_non_json_track(tracks): # for track in tracks: # ext = track.get('ext') # if ext not in ['json', 'json3']: # return track # return None # No suitable non-json track found # # Step 1: Try to find a track in preferred languages # for p_lang in preferred_langs: # if p_lang in all_caption_tracks: # best_track = find_first_non_json_track(all_caption_tracks[p_lang]) # if best_track: # best_transcript_url = best_track['url'] # best_transcript_ext = best_track['ext'] # print(f"Found preferred language '{p_lang}' track with extension '{best_transcript_ext}'.") # break # Stop searching once we find a match # if best_transcript_url: # break # Already found a usable track 
# # Step 2: If no match in preferred languages, fallback to any other available language
#         if not best_transcript_url:
#             for lang, tracks in all_caption_tracks.items():
#                 if 'live_chat' in lang or lang in preferred_langs:
#                     continue
#                 best_track = find_first_non_json_track(tracks)
#                 if best_track:
#                     best_transcript_url = best_track['url']
#                     best_transcript_ext = best_track['ext']
#                     print(f"Found any language '{lang}' track with extension '{best_transcript_ext}'.")
#                     break
#         # If a valid transcript URL and extension are found, fetch and parse
#         if best_transcript_url and best_transcript_ext:
#             try:
#                 print(f"Attempting to download transcript from: {best_transcript_url}")
#                 response = requests.get(best_transcript_url, stream=True)
#                 response.raise_for_status()
#                 subtitle_content = response.text  # Raw subtitle XML
#                 return parse_subtitle_content(subtitle_content)  # Convert XML to structured transcript
#             except requests.exceptions.RequestException as e:
#                 print(f"Error fetching subtitle content from URL {best_transcript_url}: {e}")
#                 return []
#         else:
#             print(f"No suitable non-json/json3 transcript URL found for {youtube_url} after checking all options.")
#             all_langs_found = set(all_caption_tracks.keys())
#             if all_langs_found:
#                 print(f"Available caption languages found in info_dict (including potentially json/live_chat): {', '.join(all_langs_found)}")
#             else:
#                 print("No caption tracks found at all in the info_dict.")
#             return []
#     except yt_dlp.utils.DownloadError as e:
#         print(f"Error with yt-dlp (e.g., video not found, geo-restricted): {e}")
#         return []
#     except Exception as e:
#         print(f"An unexpected error occurred during yt-dlp extraction: {e}")
#         return []

# For Deployment purposes:
def fetch_transcript(video_id: str):
    """Fetch a transcript for ``video_id`` from the external transcript API.

    Returns a list of caption snippets (dicts with at least 'start' and
    'text') for the first advertised language's default track, or [] on
    any failure. Never raises.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Mobile Safari/537.36",
        "Accept": "application/json",
        "Referer": REF,
        "Cookie": COOKIES
    }
    # NOTE(review): the leading '&' assumes TRANSCRIPT_API_URL already ends
    # with a query string -- confirm against the deployment configuration.
    url = f"{transcript_api}&video_id={video_id}"
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        data = response.json()
    except requests.RequestException as e:
        print(f"Request error while fetching transcript: {e}")
        return []
    except ValueError:
        # requests raises ValueError (JSONDecodeError) on non-JSON bodies.
        print("Failed to decode JSON response.")
        return []
    # The API signals success with code 100000 + message "success".
    if data.get("code") != 100000 or data.get("message") != "success":
        print(f"No transcript or API error for video {video_id}: {data.get('message')}")
        return []
    try:
        print('Transcript fetched successfully!')
        transcripts = data.get("data", {}).get("transcripts", {})
        if not transcripts:
            return []
        language_code = data.get("data", {}).get("language_code", [])
        if not language_code:
            return []
        # Pick the first advertised language and use its default track.
        lang_code = language_code[0]['code']
        return transcripts[lang_code].get("default", [])
    except Exception as e:
        print(f'Unexpected error while parsing transcript: {e}')
        return []


def get_transcript(db: Session, video_id: str) -> str:
    """Fetch transcript from DB cache or from source, then cache it.

    Returns the full transcript as one space-joined string of
    "(start) text" segments.

    Raises:
        ValueError: when no transcript is available for the video.
        RuntimeError: on any other unexpected fetch/processing failure.
    """
    cached_video = crud.get_or_create_video_store(db, video_id)
    if cached_video and cached_video.transcript:
        print(f"Using cached transcript for video ID: {video_id}")
        return cached_video.transcript

    print(f"Fetching transcript from source for video ID: {video_id}")
    try:
        captions = fetch_transcript(video_id)
        if not captions:
            raise ValueError(f"No transcript available for video ID: {video_id}")
        # NOTE(review): 'start' is emitted as raw seconds while the summary
        # prompt's example shows (hh:mm:ss) timestamps -- consider converting.
        formatted_lines = []
        for snippet in captions:
            # Double quotes outside, single quotes inside: reusing the same
            # quote character inside an f-string is a SyntaxError on
            # Python < 3.12 (original used f'({snippet['start']})').
            timestamp = f"({snippet['start']})"
            formatted_lines.append(f"{timestamp} {snippet['text']}")
        full_transcript = " ".join(formatted_lines)
        crud.update_transcript(db, video_id=video_id, transcript=full_transcript)
        return full_transcript
    except ValueError:
        # Re-raise the ValueError indicating no transcript (bare raise
        # preserves the original traceback).
        raise
    except Exception as e:
        # Catch any other unexpected errors during transcript fetching/processing
        print(f"An unexpected error occurred while fetching/processing transcript for {video_id}: {e}")
        raise RuntimeError(f"Failed to retrieve transcript due to an internal issue: {str(e)}")


def chunk_transcript(transcript, chunk_size=1000, overlap=200):
    """Split transcript into overlapping chunks for better context preservation.

    Args:
        transcript: full transcript string; falsy input yields [].
        chunk_size: number of words per chunk.
        overlap: words shared between consecutive chunks; must be < chunk_size.

    Returns:
        list[Document]: one Document per chunk.

    Raises:
        ValueError: if overlap >= chunk_size (the window would never advance).
    """
    if not transcript:
        return []
    if overlap >= chunk_size:
        # Guard: a non-positive step would make range() raise or loop wrongly.
        raise ValueError("overlap must be smaller than chunk_size")
    words = transcript.split()
    chunks = []
    step = chunk_size - overlap
    for i in range(0, len(words), step):
        chunk_words = words[i:i + chunk_size]
        chunks.append(Document(page_content=' '.join(chunk_words)))
        # Stop once the window has covered the final word.
        if i + chunk_size >= len(words):
            break
    return chunks


# Prompt used by the "stuff" summarize chain; expects the full transcript in
# {text} plus video metadata.
summary_prompt = PromptTemplate(
    input_variables=["text", "title", "channel_name"],
    template="""
IMPORTANT: Keep your entire response under 1000 tokens. Be concise. Focus on essential insights. Avoid over-explaining or repeating.

You are a helpful and critical-thinking assistant tasked with analyzing and summarizing YouTube video content.

You are summarizing a video titled: "{title}", published by the channel: "{channel_name}".

The input is a transcript of the video formatted as a continuous string. Each sentence is preceded by a timestamp in the format [hh:mm:ss], followed by the spoken text. The entire transcript is space-separated without line breaks.

Example:
(00:00:00) So, I've been coding since 2012, and I (00:00:03) really wish someone told me these 10 (00:00:07) things before I wasted years figuring them out...

Your task is to:

1. **Summarize**: Provide a clear and concise summary of the video content, focusing on the main points, key takeaways, and any critical insights that help someone understand the video's purpose without watching it.

2. **Main Points Covered**: List the main points discussed in the video using bullet points. Include timestamps to indicate when each point is mentioned.

3. **Fact Check**: Evaluate the factual accuracy of claims made by the speaker.
For each claim that makes a factual assertion (e.g., dates, statistics, scientific or historical facts), verify if it is true or potentially misleading.
Flag inaccuracies or unsupported claims with a note, and provide a short explanation or correction when appropriate.

Return your output in this format:

**Summary**:
...

**Main Points Covered**:
...

**Fact Check Notes**:
- [hh:mm:ss] Claim: "..." → ✅ True / ❌ False - Explanation: ...

**Transcript**:
{text}

**Output**:
"""
)


def get_video_qa_prompt(summary):
    """Create QA prompt template with video summary context and assertive reasoning.

    The summary is baked into the template via f-string; {{context}} and
    {{question}} are left as PromptTemplate variables.
    """
    qa_prompt = PromptTemplate(
        input_variables=["context", "question"],
        template=f"""
You are an expert analyst evaluating the content of a YouTube video.

Here is a summary of the video:
{summary}

Here are the most relevant transcript segments:
{{context}}

You will be asked questions about the video content, including factual accuracy, logic, reasoning, and opinions expressed by the speaker.

Your response should:
- Be **honest, direct, and grounded** in general knowledge, logic, and factual correctness.
- **Do not avoid critical analysis** of opinion-based or controversial takes—provide a clear and well-reasoned perspective based on known facts or expert consensus.
- When possible, reference specific timestamps from the transcript.
- Avoid vague disclaimers like "this is subjective" or "it depends" unless no other conclusion is possible.
- If the speaker's take is incorrect, misleading, or lacks evidence, **state that clearly and explain why**.
- If the speaker makes a reasonable or accurate claim, acknowledge that as well.

Question: {{question}}

Answer:
"""
    )
    return qa_prompt


def ensure_processed_transcript(db: Session, video_id: str):
    """Ensure transcript chunks are processed and cached for a video.

    Returns the cached chunk list when present, otherwise builds and caches it.

    Raises:
        ValueError: when no valid chunks can be created.
        RuntimeError: on any other unexpected processing failure.
    """
    if video_id not in local_cache:
        local_cache[video_id] = {}

    # Check if processed chunks are already cached
    if "TranscriptChunks" in local_cache[video_id]:
        return local_cache[video_id]["TranscriptChunks"]

    try:
        transcript = get_transcript(db, video_id)
        chunks = chunk_transcript(transcript)
        if not chunks:
            raise ValueError("No valid transcript chunks could be created for the video.")
        local_cache[video_id]["TranscriptChunks"] = chunks
        return chunks
    except ValueError:
        raise
    except Exception as e:
        print(f"An unexpected error occurred during transcript chunk processing for {video_id}: {e}")
        raise RuntimeError(f"Failed to process transcript chunks due to an internal issue: {str(e)}")


async def summarize_video(db: Session, video_id: str, title: str = '', channel_name: str = ''):
    """Summarize video transcript, using DB for caching.

    Raises:
        ValueError: when no transcript exists or the LLM returns an empty summary.
        RuntimeError: on any other unexpected summarization failure.
    """
    cached_video = crud.get_or_create_video_store(db, video_id)
    if cached_video and cached_video.video_summary:
        print(f"Using cached video summary for video ID: {video_id}")
        return cached_video.video_summary

    try:
        transcript = get_transcript(db, video_id)
        if not transcript:
            raise ValueError("Transcript not found, cannot summarize.")
        transcript_docs = Document(page_content=transcript)
        summary_chain = load_summarize_chain(llm=llm_with_grounding, chain_type="stuff", prompt=summary_prompt)
        response = summary_chain.invoke({
            "input_documents": [transcript_docs],
            "title": title,
            "channel_name": channel_name
        })
        summary = response['output_text'].strip()
        if not summary:
            raise ValueError("LLM returned an empty summary for the video.")
        # Cache the summary in the database
        crud.update_video_summary(db, video_id=video_id, summary=summary)
        return summary
    except ValueError:
        # Re-raise ValueErrors that indicate business logic failures
        # (like no transcript or empty summary).
        raise
    except Exception as e:
        # Catch any other unexpected errors during the summarization process (e.g., LLM issues)
        print(f"Error creating video summary for {video_id}: {e}")
        raise RuntimeError(f"Error creating summary: {str(e)}")


async def answer_video_question(db: Session, video_id: str, question: str):
    """Answer questions about video content using transcript and summary from DB.

    Builds (or reuses) a FAISS vectorstore over transcript chunks, then runs a
    RetrievalQA chain with the grounded LLM.

    Raises:
        ValueError / RuntimeError: propagated from sub-steps or wrapping
        unexpected failures for the API layer.
    """
    # (Fix: the docstring previously sat *inside* the try block, where it was
    # just a discarded string expression rather than the function docstring.)
    try:
        summary = await summarize_video(db, video_id)
        chunks = ensure_processed_transcript(db, video_id)
        if not chunks:
            raise ValueError("No transcript chunks available to answer the question after processing.")

        # Check if vectorstore is already cached
        if "Vectorstore" not in local_cache.get(video_id, {}):
            print(f"Creating and caching vectorstore for video ID: {video_id}")
            try:
                vectorstore = FAISS.from_documents(chunks, embeddings)
                local_cache.setdefault(video_id, {})["Vectorstore"] = vectorstore
            except Exception as e:
                print(f"Error creating vectorstore for video ID {video_id}: {e}")
                raise RuntimeError(f"Error creating vectorstore: {str(e)}")
        else:
            print(f"Using cached vectorstore for video ID: {video_id}")
            vectorstore = local_cache[video_id]["Vectorstore"]

        qa_prompt = get_video_qa_prompt(summary)
        retriever = vectorstore.as_retriever()
        qa_chain = RetrievalQA.from_chain_type(
            llm=llm_with_grounding,
            retriever=retriever,
            chain_type="stuff",
            chain_type_kwargs={"prompt": qa_prompt},
        )
        answer = qa_chain.invoke(question)
        return answer['result']
    except (ValueError, RuntimeError):
        # Re-raise specific exceptions from sub-functions unchanged.
        raise
    except Exception as e:
        # Transform general exceptions into a RuntimeError for the API layer.
        print(f"Error answering video question for {video_id} with question '{question}': {e}")
        raise RuntimeError(f"Error processing question: {str(e)}")