"""Agent tools for transcribing YouTube videos and sampling still frames from them."""

import os

import cv2
import numpy as np
from langchain.document_loaders.blob_loaders.youtube_audio import YoutubeAudioLoader
from langchain.document_loaders.generic import GenericLoader
from langchain.document_loaders.parsers import OpenAIWhisperParser
from langchain.schema import Document
from PIL import Image
from smolagents import CodeAgent, tool
from smolagents.agents import ActionStep
from yt_dlp import YoutubeDL


def get_video_frames(
    video_path: str,
    task_id: str,
    save_dir: str = "attachments",
    num_frames: int = 7,
) -> list[dict]:
    """Sample evenly spaced frames from a video and save them as PNG files.

    Args:
        video_path: Path of the video file to read.
        task_id: Identifier used as the prefix of every saved frame file.
        save_dir: Directory the PNG files are written to (created if missing).
        num_frames: Maximum number of frames to sample across the video.

    Returns:
        One dict per saved frame with the keys ``"image"`` (path of the PNG
        file) and ``"timestamp"`` (position in seconds, rounded to 2 decimals).
        The list is empty when the video cannot be opened or reports no
        frames / no frame rate.
    """
    observations = []
    vidcap = cv2.VideoCapture(video_path)
    try:
        if not vidcap.isOpened():
            print(f"Could not open video file: {video_path}")
            return observations

        total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = vidcap.get(cv2.CAP_PROP_FPS)
        # Without a frame count or frame rate neither the sample positions
        # nor the timestamps can be computed (and fps == 0 would divide by 0).
        if total_frames <= 0 or fps <= 0 or num_frames <= 0:
            return observations

        if save_dir:
            os.makedirs(save_dir, exist_ok=True)

        # np.unique drops the repeated indices linspace yields for clips
        # shorter than num_frames, so no frame is written or reported twice.
        frame_indices = np.unique(
            np.linspace(0, total_frames - 1, num_frames, dtype=int)
        )
        for index in frame_indices:
            vidcap.set(cv2.CAP_PROP_POS_FRAMES, int(index))
            success, image = vidcap.read()
            if not success:
                continue
            rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # OpenCV reads BGR
            timestamp = round(float(index) / fps, 2)
            # Save to exactly the path that is reported back to the caller.
            frame_path = os.path.join(save_dir, f"{task_id}_frame_{timestamp}.png")
            Image.fromarray(rgb_image).save(frame_path, format="PNG")
            observations.append({"image": frame_path, "timestamp": timestamp})
    finally:
        # Release the capture handle even if decoding or saving fails.
        vidcap.release()
    return observations


@tool
def parse_youtube_video(url: str, task_id: str, save_dir: str = "attachments") -> Document:
    """Parse a YouTube video and return its transcript as a Document.

    Args:
        url (str): The URL of the YouTube video.
        task_id (str): The task ID to save the transcript.
        save_dir (str): The directory to save the transcript file. Defaults to "attachments".

    Returns:
        Document: The parsed transcript of the YouTube video.
    """
    # NOTE(review): GenericLoader.load() returns a list of Documents; the
    # declared return type is kept as-is so the tool's interface is unchanged.
    if save_dir is None:
        # Fall back to the default directory instead of leaving the result unbound.
        save_dir = "attachments"

    transcript_path = os.path.join(save_dir, task_id + ".json")
    if os.path.isfile(transcript_path):
        # Reuse a previously saved transcript rather than transcribing again.
        print("trying to load transcript from file...")
        return GenericLoader.from_filesystem(transcript_path).load()

    # The audio loader is only needed when no saved transcript exists.
    loader = YoutubeAudioLoader([url], save_dir)
    parser = OpenAIWhisperParser()
    return GenericLoader(loader, parser).load()


@tool
def download_youtube_video(url: str, task_id: str, save_dir: str = "attachments") -> list[dict]:
    """Download a YouTube video and return a list of frames sampled from it.

    Args:
        url (str): The URL of the YouTube video.
        task_id (str): The task ID used to name the downloaded video and its frames.
        save_dir (str): The directory to save the downloaded video. Defaults to "attachments".

    Returns:
        list: One entry per sampled frame with the keys "image" (path of the saved PNG) and "timestamp" (seconds). Empty if the download failed.
    """
    print(f"Downloading video from YouTube: {url}")
    ydl_opts = {
        "format": "bestvideo[ext=mp4][height<=480]/bestvideo[ext=mp4]/bestvideo",
        "outtmpl": os.path.join(save_dir, task_id + ".%(ext)s"),
        "progress_hooks": [lambda d: print(d["status"])],
        "ignoreerrors": True,
        "no_warnings": False,
        # NOTE(review): "log_verbosity" does not look like a yt-dlp option
        # ("quiet" is the documented one) - kept unchanged, please confirm.
        "log_verbosity": "quiet",
    }

    video_path = None
    try:
        with YoutubeDL(ydl_opts) as ydl:
            info_dict = ydl.extract_info(url, download=True)
            # With "ignoreerrors" a failed download yields None instead of raising.
            if info_dict is not None:
                # No conversion post-processor runs, so the container is whatever
                # the format selector picked - ask yt-dlp for the real file name
                # instead of assuming ".mp4".
                video_path = ydl.prepare_filename(info_dict)
                video_title = info_dict.get("title", "video")
                print(f"Successfully downloaded '{video_title}' (video-only) to '{video_path}'")
    except Exception as e:
        # Best-effort tool: report the failure and fall through to an empty result.
        print(f"An error occurred: {e}")

    if video_path is None or not os.path.isfile(video_path):
        # Fall back to the conventional name in case the file exists from an earlier run.
        video_path = os.path.join(save_dir, task_id + ".mp4")
    if not os.path.isfile(video_path):
        print(f"No downloaded video found for task '{task_id}' in '{save_dir}'")
        return []

    return get_video_frames(video_path, task_id, save_dir)


def load_images(step_log: ActionStep, agent: CodeAgent) -> None:
    """Step callback that prints the model output of the given step.

    Args:
        step_log: The step whose ``model_output`` is printed.
        agent: The agent the step belongs to (currently unused).
    """
    # TODO: attaching images to ``step_log.observations_images`` and pruning the
    # images of older steps is not implemented; this callback only logs.
    print(step_log.model_output)


if __name__ == "__main__":
    url = "https://www.youtube.com/watch?v=1htKBjuUWec"  # alternative: https://www.youtube.com/watch?v=L1vXCYZAYYM
    task_id = "test_task"
    save_dir = "attachments"
    # Manual smoke test: download the video and extract its frames.
    download_youtube_video(url, task_id, save_dir)