# app/services/video_analysis.py — transcript fetching, summarization, and
# question-answering services for the YouTube AI Analyzer.
from langchain_google_genai import GoogleGenerativeAIEmbeddings, ChatGoogleGenerativeAI
from langchain_community.vectorstores import FAISS
from langchain.schema import Document
from langchain.chains import RetrievalQA
from langchain.chains.summarize import load_summarize_chain
from langchain.prompts import PromptTemplate
import requests
import xml.etree.ElementTree as ET
import os
from sqlalchemy.orm import Session
from ..db import crud
# API credentials and endpoints, all supplied via environment variables.
GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY')
COOKIES = os.getenv('COOKIES')  # raw Cookie header forwarded to the transcript API
REF = os.getenv('REFERER')  # Referer header forwarded to the transcript API
transcript_api = os.getenv('TRANSCRIPT_API_URL')  # base URL; assumed to already carry a query string ('&video_id=' is appended) — TODO confirm
# Shared Gemini chat and embedding models used by every service in this module.
llm = ChatGoogleGenerativeAI(model="gemini-flash-lite-latest", google_api_key=GOOGLE_API_KEY)
embeddings = GoogleGenerativeAIEmbeddings(model="gemini-embedding-001", google_api_key=GOOGLE_API_KEY)
# Bind Google Search grounding so the summarizer/QA chains can fact-check claims.
tools = [{"google_search": {}}]
llm_with_grounding = llm.bind_tools(tools)
# In-process cache keyed by video_id, holding "TranscriptChunks" and "Vectorstore" entries.
local_cache = {}
def parse_subtitle_content(subtitle_content):
    """Parse raw YouTube subtitle XML into a list of caption dicts.

    Each direct ``<text>`` child of the root element becomes a dict with
    ``start`` (float seconds), ``duration`` (float seconds, 0 when the
    ``dur`` attribute is absent), and ``text`` (caption text with newlines
    flattened to spaces). Returns an empty list on any parse failure.
    """
    try:
        root = ET.fromstring(subtitle_content)
        return [
            {
                'start': float(node.attrib['start']),
                'duration': float(node.attrib.get('dur', 0)),
                'text': (node.text or '').replace('\n', ' '),
            }
            for node in root.findall('text')
        ]
    except Exception as e:
        print(f"Error parsing subtitle content: {e}")
        return []
# Most reliable method to extract YouTube video transcripts when running locally.
# Transcripts are almost always retrievable if available.
# Note: This method may not work reliably on remote servers, as YouTube often blocks data center IP addresses.
# def fetch_transcript(video_id, preferred_langs=['en-orig', 'en']):
# youtube_url = f"https://www.youtube.com/watch?v={video_id}"
# # yt-dlp configuration to only extract subtitles, not download video
# ydl_opts = {
# 'skip_download': True,
# 'writesubtitles': True,
# 'writeautomaticsub': True,
# 'quiet': True,
# 'no_warnings': True,
# 'log_warnings': False,
# 'format': 'bestaudio/best',
# }
# try:
# # Use yt-dlp to extract video metadata and available subtitles
# with yt_dlp.YoutubeDL(ydl_opts) as ydl:
# info_dict = ydl.extract_info(youtube_url, download=False)
# # Collect all caption tracks from both manual and auto subtitles
# all_caption_tracks = {}
# # Add manually provided subtitles to the caption track list
# if 'subtitles' in info_dict:
# for lang, tracks in info_dict['subtitles'].items():
# if lang not in all_caption_tracks:
# all_caption_tracks[lang] = []
# all_caption_tracks[lang].extend(tracks)
# # Add auto-generated captions to the caption track list
# if 'automatic_captions' in info_dict:
# for lang, tracks in info_dict['automatic_captions'].items():
# if lang not in all_caption_tracks:
# all_caption_tracks[lang] = []
# all_caption_tracks[lang].extend(tracks)
# best_transcript_url = None
# best_transcript_ext = None
# # Helper: Return the first track with a non-JSON file format
# def find_first_non_json_track(tracks):
# for track in tracks:
# ext = track.get('ext')
# if ext not in ['json', 'json3']:
# return track
# return None # No suitable non-json track found
# # Step 1: Try to find a track in preferred languages
# for p_lang in preferred_langs:
# if p_lang in all_caption_tracks:
# best_track = find_first_non_json_track(all_caption_tracks[p_lang])
# if best_track:
# best_transcript_url = best_track['url']
# best_transcript_ext = best_track['ext']
# print(f"Found preferred language '{p_lang}' track with extension '{best_transcript_ext}'.")
# break # Stop searching once we find a match
# if best_transcript_url:
# break # Already found a usable track
# # Step 2: If no match in preferred languages, fallback to any other available language
# if not best_transcript_url:
# for lang, tracks in all_caption_tracks.items():
# if 'live_chat' in lang or lang in preferred_langs:
# continue
# best_track = find_first_non_json_track(tracks)
# if best_track:
# best_transcript_url = best_track['url']
# best_transcript_ext = best_track['ext']
# print(f"Found any language '{lang}' track with extension '{best_transcript_ext}'.")
# break
# # If a valid transcript URL and extension are found, fetch and parse
# if best_transcript_url and best_transcript_ext:
# try:
# print(f"Attempting to download transcript from: {best_transcript_url}")
# response = requests.get(best_transcript_url, stream=True)
# response.raise_for_status()
# subtitle_content = response.text # Raw subtitle XML
# return parse_subtitle_content(subtitle_content) # Convert XML to structured transcript
# except requests.exceptions.RequestException as e:
# print(f"Error fetching subtitle content from URL {best_transcript_url}: {e}")
# return []
# else:
# print(f"No suitable non-json/json3 transcript URL found for {youtube_url} after checking all options.")
# all_langs_found = set(all_caption_tracks.keys())
# if all_langs_found:
# print(f"Available caption languages found in info_dict (including potentially json/live_chat): {', '.join(all_langs_found)}")
# else:
# print("No caption tracks found at all in the info_dict.")
# return []
# except yt_dlp.utils.DownloadError as e:
# print(f"Error with yt-dlp (e.g., video not found, geo-restricted): {e}")
# return []
# except Exception as e:
# print(f"An unexpected error occurred during yt-dlp extraction: {e}")
# return []
# For Deployment purposes:
def fetch_transcript(video_id: str):
    """Fetch a transcript for ``video_id`` from the external transcript API.

    Sends a GET request carrying the configured Referer and Cookie headers,
    then extracts the default track of the first listed language code from
    the JSON payload. Returns a list of caption entries, or ``[]`` on any
    network, decoding, API-status, or payload-shape failure.
    """
    request_headers = {
        "User-Agent": "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Mobile Safari/537.36",
        "Accept": "application/json",
        "Referer": REF,
        "Cookie": COOKIES
    }
    endpoint = f"{transcript_api}&video_id={video_id}"
    try:
        resp = requests.get(endpoint, headers=request_headers, timeout=10)
        resp.raise_for_status()
        payload = resp.json()
    except requests.RequestException as e:
        print(f"Request error while fetching transcript: {e}")
        return []
    except ValueError:
        print("Failed to decode JSON response.")
        return []
    # The API signals success with code 100000 and message "success".
    if payload.get("code") != 100000 or payload.get("message") != "success":
        print(f"No transcript or API error for video {video_id}: {payload.get('message')}")
        return []
    try:
        print('Transcript fetched successfully!')
        body = payload.get("data", {})
        transcripts = body.get("transcripts", {})
        if not transcripts:
            return []
        languages = body.get("language_code", [])
        if not languages:
            return []
        # Use the first advertised language's default track.
        primary_lang = languages[0]['code']
        return transcripts[primary_lang].get("default", [])
    except Exception as e:
        print(f'Unexpected error while parsing transcript: {e}')
        return []
def get_transcript(db: Session, video_id: str) -> str:
    """Fetch transcript from DB cache or from source, then cache it.

    Args:
        db: Active SQLAlchemy session.
        video_id: YouTube video id.

    Returns:
        The transcript as a single space-joined string of
        "(<start>) <text>" segments.

    Raises:
        ValueError: If no transcript is available for the video.
        RuntimeError: On any other unexpected fetch/processing failure.
    """
    cached_video = crud.get_or_create_video_store(db, video_id)
    if cached_video and cached_video.transcript:
        print(f"Using cached transcript for video ID: {video_id}")
        return cached_video.transcript
    print(f"Fetching transcript from source for video ID: {video_id}")
    try:
        captions = fetch_transcript(video_id)
        if not captions:
            raise ValueError(f"No transcript available for video ID: {video_id}")
        # BUG FIX: the original built the timestamp with f'({snippet['start']})',
        # reusing single quotes inside a single-quoted f-string — a SyntaxError
        # on Python < 3.12 (quote reuse was only allowed by PEP 701 in 3.12).
        formatted_lines = [
            f"({snippet['start']}) {snippet['text']}" for snippet in captions
        ]
        full_transcript = " ".join(formatted_lines)
        crud.update_transcript(db, video_id=video_id, transcript=full_transcript)
        return full_transcript
    except ValueError:
        # Re-raise the ValueError indicating no transcript.
        raise
    except Exception as e:
        # Catch any other unexpected errors during transcript fetching/processing.
        print(f"An unexpected error occurred while fetching/processing transcript for {video_id}: {e}")
        raise RuntimeError(f"Failed to retrieve transcript due to an internal issue: {str(e)}")
def chunk_transcript(transcript, chunk_size=1000, overlap=200):
    """Split transcript into overlapping chunks for better context preservation.

    Chunks are ``chunk_size`` words long and consecutive chunks share
    ``overlap`` words; each chunk is wrapped in a ``Document``.
    Returns an empty list for an empty/None transcript.
    """
    if not transcript:
        return []
    words = transcript.split()
    step = chunk_size - overlap
    pieces = []
    for offset in range(0, len(words), step):
        window = ' '.join(words[offset:offset + chunk_size])
        pieces.append(Document(page_content=window))
        # Stop once the final window has consumed the tail of the transcript.
        if offset + chunk_size >= len(words):
            break
    return pieces
# Prompt for the "stuff" summarize chain (see summarize_video): takes the full
# timestamped transcript as {text} plus the video {title} and {channel_name},
# and asks for a summary, timestamped main points, and fact-check notes.
summary_prompt = PromptTemplate(
input_variables=["text", "title", "channel_name"],
template="""
IMPORTANT: Keep your entire response under 1000 tokens. Be concise. Focus on essential insights. Avoid over-explaining or repeating.
You are a helpful and critical-thinking assistant tasked with analyzing and summarizing YouTube video content.
You are summarizing a video titled: "{title}", published by the channel: "{channel_name}".
The input is a transcript of the video formatted as a continuous string. Each sentence is preceded by a timestamp in the format [hh:mm:ss], followed by the spoken text. The entire transcript is space-separated without line breaks.
Example:
(00:00:00) So, I've been coding since 2012, and I (00:00:03) really wish someone told me these 10 (00:00:07) things before I wasted years figuring them out...
Your task is to:
1. **Summarize**: Provide a clear and concise summary of the video content, focusing on the main points, key takeaways, and any critical insights that help someone understand the video's purpose without watching it.
2. **Main Points Covered**: List the main points discussed in the video using bullet points. Include timestamps to indicate when each point is mentioned.
3. **Fact Check**: Evaluate the factual accuracy of claims made by the speaker. For each claim that makes a factual assertion (e.g., dates, statistics, scientific or historical facts), verify if it is true or potentially misleading. Flag inaccuracies or unsupported claims with a note, and provide a short explanation or correction when appropriate.
Return your output in this format:
**Summary**: ...
**Main Points Covered**: ...
**Fact Check Notes**:
- [hh:mm:ss] Claim: "..." → ✅ True / ❌ False
- Explanation: ...
**Transcript**:
{text}
**Output**:
"""
)
def get_video_qa_prompt(summary):
    """Create QA prompt template with video summary context and assertive reasoning.

    Args:
        summary: Natural-language summary of the video, baked into the
            template as static context.

    Returns:
        A ``PromptTemplate`` expecting ``context`` and ``question`` variables.
    """
    # BUG FIX: the summary is interpolated via an f-string, so any literal
    # braces inside it (e.g. code snippets in the LLM-generated summary) would
    # be treated by PromptTemplate as extra template variables and break
    # formatting at QA time. Escape them first.
    safe_summary = summary.replace("{", "{{").replace("}", "}}")
    qa_prompt = PromptTemplate(
        input_variables=["context", "question"],
        template=f"""
You are an expert analyst evaluating the content of a YouTube video.
Here is a summary of the video:
{safe_summary}
Here are the most relevant transcript segments:
{{context}}
You will be asked questions about the video content, including factual accuracy, logic, reasoning, and opinions expressed by the speaker.
Your response should:
- Be **honest, direct, and grounded** in general knowledge, logic, and factual correctness.
- **Do not avoid critical analysis** of opinion-based or controversial takes—provide a clear and well-reasoned perspective based on known facts or expert consensus.
- When possible, reference specific timestamps from the transcript.
- Avoid vague disclaimers like "this is subjective" or "it depends" unless no other conclusion is possible.
- If the speaker's take is incorrect, misleading, or lacks evidence, **state that clearly and explain why**.
- If the speaker makes a reasonable or accurate claim, acknowledge that as well.
Question: {{question}}
Answer:
"""
    )
    return qa_prompt
def ensure_processed_transcript(db: Session, video_id: str):
    """Ensure transcript chunks are processed and cached for a video.

    Returns the cached chunk list when present in ``local_cache``; otherwise
    fetches the transcript, chunks it, caches the result, and returns it.

    Raises:
        ValueError: When no chunks could be produced (or the transcript is
            missing, propagated from ``get_transcript``).
        RuntimeError: On any other unexpected processing failure.
    """
    cache_entry = local_cache.setdefault(video_id, {})
    # Serve previously processed chunks straight from the in-process cache.
    cached_chunks = cache_entry.get("TranscriptChunks")
    if cached_chunks is not None:
        return cached_chunks
    try:
        chunks = chunk_transcript(get_transcript(db, video_id))
        if not chunks:
            raise ValueError("No valid transcript chunks could be created for the video.")
        cache_entry["TranscriptChunks"] = chunks
        return chunks
    except ValueError:
        raise
    except Exception as e:
        print(f"An unexpected error occurred during transcript chunk processing for {video_id}: {e}")
        raise RuntimeError(f"Failed to process transcript chunks due to an internal issue: {str(e)}")
async def summarize_video(db: Session, video_id: str, title: str='', channel_name: str=''):
    """Summarize video transcript, using DB for caching.

    Returns the cached summary when one exists; otherwise runs the grounded
    LLM over the full transcript with ``summary_prompt``, caches the result
    in the DB, and returns it.

    Raises:
        ValueError: If the transcript is missing or the LLM output is empty.
        RuntimeError: On any other unexpected summarization failure.
    """
    cached_video = crud.get_or_create_video_store(db, video_id)
    if cached_video and cached_video.video_summary:
        print(f"Using cached video summary for video ID: {video_id}")
        return cached_video.video_summary
    try:
        transcript = get_transcript(db, video_id)
        if not transcript:
            raise ValueError("Transcript not found, cannot summarize.")
        # "stuff" chain: the whole transcript goes into a single prompt.
        chain = load_summarize_chain(llm=llm_with_grounding, chain_type="stuff", prompt=summary_prompt)
        result = chain.invoke({
            "input_documents": [Document(page_content=transcript)],
            "title": title,
            "channel_name": channel_name,
        })
        summary_text = result['output_text'].strip()
        if not summary_text:
            raise ValueError("LLM returned an empty summary for the video.")
        # Persist the summary so subsequent requests skip the LLM call.
        crud.update_video_summary(db, video_id=video_id, summary=summary_text)
        return summary_text
    except ValueError:
        # Business-logic failures (no transcript / empty summary) pass through.
        raise
    except Exception as e:
        # Anything else (e.g. LLM issues) becomes a RuntimeError for the API layer.
        print(f"Error creating video summary for {video_id}: {e}")
        raise RuntimeError(f"Error creating summary: {str(e)}")
async def answer_video_question(db: Session, video_id: str, question: str):
    """Answer questions about video content using transcript and summary from DB.

    Builds (or reuses) a FAISS vectorstore over the transcript chunks, then
    runs a RetrievalQA chain whose prompt embeds the video summary.

    Args:
        db: Active SQLAlchemy session.
        video_id: YouTube video id.
        question: User's question about the video.

    Returns:
        The LLM's answer string.

    Raises:
        ValueError: When no transcript chunks are available.
        RuntimeError: On vectorstore creation or any other unexpected failure.
    """
    # BUG FIX: the docstring was originally placed inside the try block, so it
    # was a bare statement rather than the function's __doc__.
    try:
        summary = await summarize_video(db, video_id)
        chunks = ensure_processed_transcript(db, video_id)
        if not chunks:
            raise ValueError("No transcript chunks available to answer the question after processing.")
        # Reuse the per-video vectorstore when it is already cached.
        if "Vectorstore" not in local_cache.get(video_id, {}):
            print(f"Creating and caching vectorstore for video ID: {video_id}")
            try:
                vectorstore = FAISS.from_documents(chunks, embeddings)
                local_cache.setdefault(video_id, {})["Vectorstore"] = vectorstore
            except Exception as e:
                print(f"Error creating vectorstore for video ID {video_id}: {e}")
                raise RuntimeError(f"Error creating vectorstore: {str(e)}")
        else:
            print(f"Using cached vectorstore for video ID: {video_id}")
            vectorstore = local_cache[video_id]["Vectorstore"]
        qa_prompt = get_video_qa_prompt(summary)
        retriever = vectorstore.as_retriever()
        qa_chain = RetrievalQA.from_chain_type(
            llm=llm_with_grounding,
            retriever=retriever,
            chain_type="stuff",
            chain_type_kwargs={"prompt": qa_prompt},
        )
        answer = qa_chain.invoke(question)
        return answer['result']
    except (ValueError, RuntimeError):
        # Re-raise specific exceptions from sub-functions unchanged.
        raise
    except Exception as e:
        # Transform general exceptions into a RuntimeError for the API layer.
        print(f"Error answering video question for {video_id} with question '{question}': {e}")
        raise RuntimeError(f"Error processing question: {str(e)}")