Spaces:
Sleeping
Sleeping
| import os | |
| from langchain.document_loaders.generic import GenericLoader | |
| from langchain.document_loaders.parsers import OpenAIWhisperParser | |
| from langchain.document_loaders.blob_loaders.youtube_audio import YoutubeAudioLoader | |
| from langchain.schema import Document | |
| from smolagents import tool | |
| from yt_dlp import YoutubeDL | |
| from PIL import Image | |
| import cv2 | |
| import numpy as np | |
| from smolagents.agents import ActionStep | |
| from smolagents import CodeAgent | |
def get_video_frames(
    video_path: str,
    task_id: str,
    num_frames: int = 7,
    save_dir: str = "attachments",
) -> list[dict]:
    """Sample evenly spaced frames from a video and save them as PNG files.

    Args:
        video_path: Path of the video file, opened with ``cv2.VideoCapture``.
        task_id: Identifier used as the prefix of every saved frame file.
        num_frames: Maximum number of frames to sample, spread evenly between
            the first and the last frame of the video. Defaults to 7.
        save_dir: Directory the PNG files are written to; it is created when
            missing. Defaults to "attachments".

    Returns:
        One dict per frame that was read successfully, in playback order:
        ``{"image": <path of the saved PNG>, "timestamp": <value>}``.
        ``timestamp`` is ``frame_index / fps`` rounded to two decimals; when
        the container reports no usable FPS the frame index itself is used so
        that file names stay unique. The list is empty when the video cannot
        be opened or no frame can be read.
    """
    observations = []
    vidcap = cv2.VideoCapture(video_path)
    try:
        if not vidcap.isOpened() or num_frames <= 0:
            return observations

        total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = vidcap.get(cv2.CAP_PROP_FPS)

        # Clamp so a container that reports no frame count still tries frame 0,
        # and de-duplicate the indices linspace repeats for very short clips.
        last_index = max(total_frames - 1, 0)
        frame_indices = np.unique(
            np.linspace(0, last_index, num_frames, dtype=int)
        )

        os.makedirs(save_dir, exist_ok=True)
        for raw_index in frame_indices:
            index = int(raw_index)
            vidcap.set(cv2.CAP_PROP_POS_FRAMES, index)
            success, image = vidcap.read()
            if not success:
                continue

            # OpenCV decodes to BGR; PIL expects RGB.
            rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            timestamp = round(index / fps, 2) if fps > 0 else float(index)

            # The file is written to exactly the path that is reported back,
            # so callers can open every returned "image" entry.
            image_path = f"{save_dir}/{task_id}_frame_{timestamp}.png"
            Image.fromarray(rgb_image).save(image_path, format='PNG')
            observations.append({
                "image": image_path,
                "timestamp": timestamp,
            })
    finally:
        # Release the capture even when decoding or saving raises.
        vidcap.release()
    return observations
def parse_youtube_video(url: str, task_id: str, save_dir: str = "attachments") -> list[Document]:
    """Return the transcript of a YouTube video as loaded documents.

    When ``<save_dir>/<task_id>.json`` exists it is loaded through
    ``GenericLoader.from_filesystem``; otherwise the audio is fetched with
    ``YoutubeAudioLoader`` and parsed with ``OpenAIWhisperParser``.

    Args:
        url (str): The URL of the YouTube video.
        task_id (str): The task ID; names the cached transcript file.
        save_dir (str): Directory that holds the cached transcript and
            receives the downloaded audio. Defaults to "attachments".
            ``None`` disables the cache lookup and downloads the audio into a
            temporary directory that is removed afterwards.

    Returns:
        list[Document]: The value returned by the loader's ``load()`` call.
    """
    import tempfile  # only needed for the save_dir=None path

    if save_dir is not None:
        transcript_path = os.path.join(save_dir, task_id + ".json")
        if os.path.isfile(transcript_path):
            print("trying to load transcript from file...")
            # NOTE(review): relies on GenericLoader's default parser handling
            # this file type - confirm against the langchain version in use.
            return GenericLoader.from_filesystem(transcript_path).load()

        # No cached transcript: build the audio loader only now that it is needed.
        audio_loader = YoutubeAudioLoader([url], save_dir)
        return GenericLoader(audio_loader, OpenAIWhisperParser()).load()

    # save_dir is None: previously this fell through and raised
    # UnboundLocalError; transcribe from a throwaway download directory instead.
    with tempfile.TemporaryDirectory() as scratch_dir:
        audio_loader = YoutubeAudioLoader([url], scratch_dir)
        return GenericLoader(audio_loader, OpenAIWhisperParser()).load()
def download_youtube_video(url: str, task_id: str, save_dir: str = "attachments") -> list[dict]:
    """Download a YouTube video (video stream only) and sample frames from it.

    Args:
        url (str): The URL of the YouTube video.
        task_id (str): The task ID; used as the base name of the downloaded
            file and passed on to ``get_video_frames``.
        save_dir (str): The directory to save the downloaded video.
            Defaults to "attachments".

    Returns:
        list[dict]: The observations produced by ``get_video_frames`` for the
        downloaded file, or an empty list when no video file is available.
    """
    print(f"Downloading video from YouTube: {url}")
    ydl_opts = {
        # Prefer a <=480p mp4 stream, then any mp4, then any video stream.
        # No conversion step is configured, so the final extension depends on
        # which of these alternatives was selected.
        'format': 'bestvideo[ext=mp4][height<=480]/bestvideo[ext=mp4]/bestvideo',
        'outtmpl': os.path.join(save_dir, task_id+'.%(ext)s'),
        'progress_hooks': [lambda d: print(d['status'])],
        'ignoreerrors': True,
        'no_warnings': False,
        # NOTE(review): 'log_verbosity' does not look like a documented yt-dlp
        # option ('quiet': True is) - confirm before relying on it.
        'log_verbosity': 'quiet',
    }

    # Fallback location: the mp4 name used by the preferred formats, which may
    # also exist from an earlier run.
    video_path = os.path.join(save_dir, task_id+'.mp4')
    try:
        with YoutubeDL(ydl_opts) as ydl:
            info_dict = ydl.extract_info(url, download=True)
            if info_dict is None:
                # With 'ignoreerrors' enabled a failed extraction is reported
                # as None rather than raised.
                print(f"An error occurred: no video information returned for {url}")
            else:
                # Ask yt-dlp for the real output name instead of assuming
                # '.mp4' (the last format alternative can be another container).
                downloaded_path = ydl.prepare_filename(info_dict)
                if os.path.isfile(downloaded_path):
                    video_path = downloaded_path
                    video_title = info_dict.get('title', 'video')
                    print(f"Successfully downloaded '{video_title}' as low-quality MP4 (video-only) to '{save_dir}'")
    except Exception as e:
        # Best-effort boundary: report and fall through to the file check.
        print(f"An error occurred: {e}")

    if not os.path.isfile(video_path):
        print(f"An error occurred: no video file found at '{video_path}'")
        return []

    # get the video frames:
    return get_video_frames(video_path, task_id)
def load_images(step_log: ActionStep, agent: CodeAgent) -> None:
    """Print the model output recorded on an agent step.

    Placeholder callback: apart from reading the step number and echoing
    ``step_log.model_output`` to stdout it does nothing. An earlier draft
    (previously kept here as commented-out code) sketched dropping
    ``observations_images`` from steps older than two steps back and
    attaching a fresh screenshot plus the current URL to the step; none of
    that is implemented.

    Args:
        step_log: The step whose ``step_number`` and ``model_output`` are read.
        agent: Accepted but not used.

    Returns:
        None
    """
    # Read but unused for now; the unimplemented pruning logic would need it.
    _step_number = step_log.step_number
    print(step_log.model_output)
    return None
if __name__ == "__main__":
    # Manual smoke test: download one sample video and extract its frames.
    # Alternative sample video: https://www.youtube.com/watch?v=L1vXCYZAYYM
    save_dir = "attachments"
    task_id = "test_task"
    url = "https://www.youtube.com/watch?v=1htKBjuUWec"
    download_youtube_video(url=url, task_id=task_id, save_dir=save_dir)