# Final_Assignment_Template / tools / youtube_video_tool.py
# Author: FD900 — "Update tools/youtube_video_tool.py" (commit cdfe973, verified)
# tools/youtube_video_tool.py
import base64
import os
import re
import requests
import subprocess
import tempfile
from io import BytesIO
from tools.base_tool import BaseTool
import av
import yt_dlp
from tools.speech_recognition_tool import SpeechRecognitionTool
class YouTubeVideoTool(BaseTool):
    """Answer a question about a YouTube video from its captions/transcript.

    Captions are preferred (uploaded or auto-generated English subtitles via
    yt-dlp metadata); when unavailable, the audio track is transcoded with
    ffmpeg and transcribed through an optional speech-recognition tool. The
    transcript is then chunked and fed, chunk by chunk, to an LLM prompt that
    iteratively refines the answer.
    """

    name = 'youtube_video'
    description = 'Process a YouTube video and answer questions based on content.'

    def __init__(
        self,
        speech_tool: SpeechRecognitionTool = None,
        quality: int = 360,
        frame_interval: float = 2.0,
        chunk_duration: float = 2.0,
        debug: bool = False,
    ):
        """Configure the tool.

        Args:
            speech_tool: optional transcription fallback used when no
                English captions are published for the video.
            quality: maximum video height passed to yt-dlp format selection.
            frame_interval: seconds between sampled frames (currently unused
                by the visible code — kept for interface compatibility).
            chunk_duration: seconds per processing chunk (currently unused
                by the visible code — kept for interface compatibility).
            debug: enable debug behavior (currently unused by the visible code).
        """
        self.speech_tool = speech_tool
        self.quality = quality
        self.frame_interval = frame_interval
        self.chunk_duration = chunk_duration
        self.debug = debug

    def forward(self, url: str, query: str) -> str:
        """Return an answer to ``query`` about the video at ``url``.

        The answer is refined incrementally: each transcript chunk is
        prompted together with the previous partial answer.
        """
        video = self._download_video_info(url)
        captions = self._get_captions(video)
        title, description = video['title'], video['description']
        chunks = self._split_captions(captions)
        answer = ""
        for chunk in chunks:
            prompt = self._build_prompt(title, description, chunk, query, answer)
            response = self._mock_llm(prompt)  # replace with real call to your LLM
            answer = response.strip()
        return answer

    def _download_video_info(self, url: str):
        """Fetch video metadata through yt-dlp without downloading media."""
        opts = {
            'quiet': True,
            'skip_download': True,
            'format': f'bestvideo[height<={self.quality}]+bestaudio/best',
        }
        with yt_dlp.YoutubeDL(opts) as ydl:
            return ydl.extract_info(url, download=False)

    def _get_captions(self, info: dict):
        """Return caption segments (list of ``{"text": ...}`` dicts).

        Prefers published English subtitles, then auto-captions, then a
        Whisper-style transcription via ``self.speech_tool``. Returns an
        empty list when no source is available.
        """
        lang = 'en'
        subs = info.get('subtitles', {}).get(lang) or info.get('automatic_captions', {}).get(lang)
        if subs:
            # FIX: subtitle entries are not guaranteed to carry 'ext';
            # s['ext'] raised KeyError on such entries.
            sub = next((s for s in subs if s.get('ext') == 'vtt'), None)
            if sub:
                # FIX: add a timeout and surface HTTP errors instead of
                # silently parsing an error page as VTT.
                resp = requests.get(sub['url'], timeout=30)
                resp.raise_for_status()
                return self._parse_vtt(resp.text)
        # fallback to Whisper-based transcription
        if self.speech_tool:
            audio_url = self._select_audio_format(info.get('formats', []))
            audio = self._download_audio(audio_url)
            # FIX: delete=False leaked the temp file on every call; the file
            # is now removed once transcription completes (or fails).
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
                f.write(audio.read())
                f.flush()
                wav_path = f.name
            try:
                transcription = self.speech_tool.forward(audio=wav_path, with_time_markers=True)
            finally:
                os.remove(wav_path)
            return self._parse_whisper_transcription(transcription)
        return []

    def _select_audio_format(self, formats):
        """Return the URL of the highest-bitrate audio-only format.

        Raises:
            ValueError: when the video exposes no audio-only format
                (the original code raised a bare IndexError here).
        """
        audio_only = [f for f in formats if f.get('vcodec') == 'none']
        if not audio_only:
            raise ValueError("No audio-only format available for this video")
        # FIX: 'abr' can be present but None in yt-dlp format dicts, which
        # made the None-vs-int comparison raise TypeError during sort.
        audio_only.sort(key=lambda f: f.get('abr') or 0, reverse=True)
        return audio_only[0]['url']

    def _download_audio(self, audio_url: str) -> BytesIO:
        """Transcode the remote audio stream to mono 16 kHz WAV in memory."""
        cmd = ["ffmpeg", "-i", audio_url, "-f", "wav", "-ac", "1", "-ar", "16000", "-"]
        # FIX: check=True so an ffmpeg failure raises CalledProcessError
        # instead of silently returning an empty buffer.
        proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, check=True)
        return BytesIO(proc.stdout)

    def _parse_vtt(self, vtt_data: str):
        """Extract plain-text cue segments from a WebVTT document."""
        segments = []
        entries = re.findall(r'(\d+:\d+:\d+\.\d+ --> \d+:\d+:\d+\.\d+)(.*?)\n(?=\n|\d)', vtt_data, re.DOTALL)
        for _time_range, text in entries:
            # Strip inline markup (<c>, <i>, ...) and collapse line breaks.
            clean_text = re.sub(r'<.*?>', '', text).strip().replace("\n", " ")
            segments.append({"text": clean_text})
        return segments

    def _parse_whisper_transcription(self, text: str):
        """Parse ``[start] / text / [end]`` blocks from the speech tool output."""
        pattern = re.compile(r'\[(\d+\.\d+)]\n(.+?)\n\[(\d+\.\d+)]')
        return [{"text": match[1]} for match in pattern.findall(text)]

    def _split_captions(self, captions, group_size: int = 3):
        """Merge consecutive caption segments into fixed-size chunks.

        Generalized: the original hard-coded groups of 3; ``group_size``
        defaults to 3 so existing callers behave identically.
        """
        return [
            {"text": " ".join(c["text"] for c in captions[i:i + group_size])}
            for i in range(0, len(captions), group_size)
        ]

    def _build_prompt(self, title, desc, chunk, query, prev):
        """Assemble the LLM prompt from metadata, one transcript chunk,
        the question, and any previous partial answer."""
        base = f"""
Video Title: {title}
Video Description: {desc}
Transcript:
{chunk['text']}
"""
        if prev:
            base += f"\nPrevious answer: {prev}\n"
        base += f"Question: {query}"
        return base.strip()

    def _mock_llm(self, prompt: str):
        """Placeholder LLM call; replace with the real model invocation."""
        return "I need to keep watching."