Gabandino committed on
Commit
9de0414
·
verified ·
1 Parent(s): 3a62410

Upload all tools for final submission and __init__.py file

Browse files

Upload all tools:
Chess tools
Classifier tool
Content retriever tool
Get attachments tool
Google search tools
YouTube video tool

Also, upload the __init__.py file to ensure all tools can be referenced by other files within the space.

tools/__init__.py ADDED
@@ -0,0 +1,19 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
from .get_attachments_tool import GetAttachmentTool
from .google_search_tools import GoogleSearchTool, GoogleSiteSearchTool
from .content_retriever_tool import ContentRetrieverTool
from .speech_recognition_tool import SpeechRecognitionTool
from .youtube_video_tool import YouTubeVideoTool
from .classifier_tool import ClassifierTool
from .chess_tools import ImageToChessBoardFENTool, chess_engine_locator

# Public API of the tools package.
# BUG FIX: __all__ previously listed "YoutubeVideoTool" (lowercase "t"), which
# does not match the imported name YouTubeVideoTool and broke
# `from tools import *` with an AttributeError.
__all__ = [
    "GetAttachmentTool",
    "GoogleSearchTool",
    "GoogleSiteSearchTool",
    "ContentRetrieverTool",
    "SpeechRecognitionTool",
    "YouTubeVideoTool",
    "ClassifierTool",
    "ImageToChessBoardFENTool",
    "chess_engine_locator",
]
tools/chess_tools.py ADDED
@@ -0,0 +1,123 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from smolagents import Tool, tool
2
+ from openai import OpenAI
3
+ import shutil
4
+
5
+
6
@tool
def chess_engine_locator() -> str | None:
    """
    Locate the Stockfish chess engine binary on PATH.

    The returned path can be passed to chess.engine.SimpleEngine.popen_uci
    from the `chess` Python module.

    Returns:
        str | None: Path to the chess engine, or None when Stockfish is not installed.
    """
    # shutil.which already returns None when the binary is absent,
    # so no extra fallback is needed.
    return shutil.which('stockfish')
16
class ImageToChessBoardFENTool(Tool):
    """Convert a chessboard image into the piece-placement (board) field of a FEN string."""

    name = 'image_to_chess_board_fen'
    description = '''Convert a chessboard image to board part of the FEN.'''
    inputs = {
        'image_url': {
            'type': 'string',
            'description': 'Public URL of the image (preferred) or base64 encoded image in data URL format',
        }
    }
    output_type = 'string'

    def __init__(self, client: OpenAI | None = None, **kwargs):
        # Reuse an injected client (useful for testing); otherwise create one
        # from the default environment configuration.
        self.client = client if client is not None else OpenAI()
        super().__init__(**kwargs)

    def attachment_for(self, task_id: str | None):
        """Remember the task id of the current attachment (API symmetry with other tools)."""
        self.task_id = task_id

    def forward(self, image_url: str) -> str:
        """
        Convert a chessboard image to board part of the FEN.
        Args:
            image_url (str): Public URL of the image (preferred) or base64 encoded image in data URL format.
        Returns:
            str: Board part of the FEN.
        """
        client = self.client

        # First pass: have the vision model describe the board in free text.
        # BUG FIX: was `client.response.create` (AttributeError) — the OpenAI
        # Responses API lives at `client.responses.create`.
        response = client.responses.create(
            model='gpt-4.1',
            input=[
                {
                    'role': 'user',
                    'content': [
                        {
                            'type': 'input_text',
                            'text': 'Describe the position of the pieces on the chessboard from the image. Please, nothing else but description.',
                        },
                        {'type': 'input_image', 'image_url': image_url},
                    ],
                },
            ],
        )

        # Second pass: feed the description back and ask for one
        # "<piece><square>" entry per line.
        response = client.responses.create(
            model='gpt-4.1',
            input=[
                {
                    'role': 'user',
                    'content': [
                        {
                            'type': 'input_text',
                            'text': 'Describe the position of the pieces on the chessboard from the image. Please, nothing else but description.',
                        },
                    ],
                },
            ]
            + response.output
            + [
                {
                    'role': 'user',
                    'content': [
                        {
                            'type': 'input_text',
                            'text': """\
Write down all positions with known pieces.
Use a standard one-letter code to name pieces.
It is important to use the correct case for piece code. Use upper case for white and lower case for black.
It is important to include information about all the mentioned positions.
Describe each position in a new line.
Follow format: <piece><position> (piece first, then position, no spaces)
Return nothing but lines with positions.
""",
                        },
                    ],
                }
            ],
        )
        board_pos = response.output_text

        # Parse "<piece><square>" lines, e.g. "Ke1" or "pe5".
        # BUG FIX: the original loop iterated `post_str` but stripped/indexed
        # `pos_str` (and vice versa), raising NameError on the first line.
        pos_dict = {}
        for line in board_pos.splitlines():
            pos_str = line.strip()
            if len(pos_str) != 3:
                # Skip anything that is not exactly piece + file + rank.
                continue
            piece = pos_str[0]
            pos = pos_str[1:3]
            pos_dict[pos] = piece

        # Assemble the board field: ranks 8 -> 1 separated by '/', files a -> h,
        # with run-length encoded empty squares, as FEN requires.
        board_fen = ''
        for rank in range(8, 0, -1):
            empty = 0
            for file_c in range(ord('a'), ord('h') + 1):
                square = chr(file_c) + str(rank)
                if square in pos_dict:
                    if empty > 0:
                        board_fen += str(empty)
                        empty = 0
                    board_fen += pos_dict[square]
                else:
                    empty += 1
            if empty > 0:
                board_fen += str(empty)
            if rank != 1:
                board_fen += '/'

        return board_fen
tools/classifier_tool.py ADDED
@@ -0,0 +1,83 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from smolagents import Tool
2
+ from openai import OpenAI
3
+
4
class ClassifierTool(Tool):
    """Classify a comma-separated list of items into categories using an OpenAI model."""

    # smolagents tool metadata; these strings form the agent-facing schema.
    name = 'open_classifier'
    description = """Classifies given items into given categories from perspective of specific knowledge area."""
    inputs = {
        'knowledge_area': {
            'type': 'string',
            'description': 'The knowledge area that should be used for classification',
        },
        'environment': { # context makes models too verbose
            'type': 'string',
            'description': 'Couple words that describe the environment or location in which items should be classified in case of plural meaning or if only part of item relevant for classification.'
        },
        'categories': {
            'type': 'string',
            'description': 'Comma separated list of categories to distribute objects.',
        },
        'items': {
            'type': 'string',
            'description': 'Comma separated list of items to be classified. Please include adjectives if available.',
        },
    }
    output_type = 'string'

    def __init__(
        self,
        client: OpenAI | None = None,
        model_id: str = 'gpt-4.1-mini',
        **kwargs,
    ):
        """Create the tool with an optional injected OpenAI client and model id."""
        # Reuse a caller-provided client (useful for testing); otherwise build
        # one from the default environment configuration.
        self.client = client or OpenAI()
        self.model_id = model_id

        super().__init__(**kwargs)

    def forward(
        self, knowledge_area: str, environment: str, categories: str, items: str
    ) -> str:
        """Run one classification request and return the model's raw text answer."""
        response = self.client.responses.create(
            model=self.model_id,
            input=[
                {
                    'role': 'user',
                    'content': [
                        {
                            'type': 'input_text',
                            # Note: the 'environment' input is passed to the
                            # prompt under the name 'context'.
                            'text': self._prompt(
                                knowledge_area=knowledge_area,
                                context=environment,
                                categories=categories,
                                items=items,
                            ),
                        },
                    ],
                },
            ],
        )
        answer = response.output_text
        return answer

    def _prompt(
        self, knowledge_area: str, context: str, categories: str, items: str
    ) -> str:
        """Build the classification prompt; the text is part of observable behavior."""
        return f"""\
You are {knowledge_area} classifier located in {context} context.
I will provide you a list of items and a list of categories and context in which items should be considered.
Your task is to classify the items into the categories.
Use context to determine the meaning of the items and decide if you need to classify entire item or only part of it.
Do not miss any item and do not add any item to the list of categories.
Use highest probability category for each item.
You can add category "Other" if you are not sure about the classification.
Use only considerations from the {knowledge_area} perspective.
Explain your reasoning from {knowledge_area} perspective in {context} context and then provide final answer.
Important: Do not allow {context} influence your judgment for classification.
ITEMS: {items}
CATEGORIES: {categories}
Now provide your reasoning and finalize it with the classification in the following format:
Category 1: items list
Category 2: items list
Other (if needed): items list
"""
tools/content_retriever_tool.py ADDED
@@ -0,0 +1,85 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from smolagents import Tool
2
+ from docling.document_converter import DocumentConverter
3
+ from docling.chunking import HierarchicalChunker
4
+ from sentence_transformers import SentenceTransformer, util
5
+ import torch
6
+
7
class ContentRetrieverTool(Tool):
    """Retrieve a document/webpage, chunk it, and return the chunks most relevant to a query."""

    name = 'retrieve_content'
    description = """Retrieve content of a webpage or document in markdown format. Supports PDF, DOCX, XLSX, HTML, images, and more."""
    inputs = {
        "url": {
            "type": "string",
            "description": "The URL or local path of the webpage or document to retrieve.",
        },
        "query": {
            "type": "string",
            "description": 'The subject on the page you are looking for. The shorter, the more relevant content is returned.',
        },
    }
    output_type = "string"

    def __init__(
        self,
        model_name: str | None = None,
        threshold: float = 0.2,
        **kwargs,
    ):
        """
        Args:
            model_name: SentenceTransformer model id; defaults to 'all-MiniLM-L6-v2'.
            threshold: Cumulative softmax-probability mass to accumulate per
                query/embedding pair before stopping chunk selection.
        """
        self.threshold = threshold
        self._document_converter = DocumentConverter()
        self._model = SentenceTransformer(
            model_name if model_name is not None else 'all-MiniLM-L6-v2'
        )
        self._chunker = HierarchicalChunker()

        super().__init__(**kwargs)

    def forward(self, url: str, query: str) -> str:
        """Convert the document at `url`, rank chunks against `query`, and join the selected chunks."""
        document = self._document_converter.convert(url).document

        chunks = list(self._chunker.chunk(dl_doc=document))
        if len(chunks) == 0:
            return 'No content found.'

        # Plain chunk text vs. chunk text with headings/context; the "context"
        # variant minus the chunk text leaves just the surrounding headings.
        chunks_text = [chunk.text for chunk in chunks]
        chunks_with_context = [self._chunker.contextualize(chunk) for chunk in chunks]
        chunks_context = [
            chunks_with_context[i].replace(chunks_text[i], "").strip()
            for i in range(len(chunks))
        ]
        chunk_embeddings = self._model.encode(chunks_text, convert_to_tensor=True)
        context_embeddings = self._model.encode(chunks_context, convert_to_tensor=True)
        # The query is treated as a comma-separated list of search terms,
        # each embedded separately.
        query_embedding = self._model.encode(
            [term.strip() for term in query.split(",") if term.strip()],
            convert_to_tensor=True,
        )

        selected_indices = [] # aggregate indexes across chunks and context matches and for all queries
        for embeddings in [
            context_embeddings,
            chunk_embeddings,
        ]:
            # Compute cosine similarities (returns 1D tensor)
            for cos_scores in util.pytorch_cos_sim(query_embedding, embeddings):
                # Convert to softmax probabilities
                probabilities = torch.nn.functional.softmax(cos_scores, dim=0)
                # Sort by probability descending
                sorted_indices = torch.argsort(probabilities, descending=True)
                # Accumulate until total probability reaches threshold

                cumulative = 0.0
                for i in sorted_indices:
                    cumulative += probabilities[i].item()
                    selected_indices.append(i.item())
                    if cumulative >= self.threshold:
                        break

        selected_indices = list(
            dict.fromkeys(selected_indices)
        )  # remove duplicates and preserve order
        selected_indices = selected_indices[::-1]  # make most relevant items last for better focus

        if len(selected_indices) == 0:
            return "No content found."
        return "\n\n".join([chunks_with_context[idx] for idx in selected_indices])
85
+
tools/get_attachments_tool.py ADDED
@@ -0,0 +1,76 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from smolagents import Tool
2
+ import requests
3
+ from urllib.parse import urljoin
4
+ import base64
5
+ import tempfile
6
+
7
class GetAttachmentTool(Tool):
    """Fetch the file attached to the current evaluation task in one of several formats."""

    name = "get_attachment"
    # BUG FIX: the tool description was a copy-paste of the 'fmt' parameter
    # description; it now describes the tool itself.
    description = """
    Retrieve the attachment file for the current task. The attachment can be returned as a URL, a base64 encoded data URL, a local file path to a downloaded copy, or the raw text content, depending on the requested format.
    """
    inputs = {
        "fmt": {
            "type": "string",
            "description": """Format to retrieve attachment. Options are: URL, DATA_URL, LOCAL_FILE_PATH (preferred for current testing environment), TEXT. URL returns the URL of the file, DATA_URL returns a base64 encoded data URL, LOCAL_FILE_PATH returns a local file path to the downloaded file, and TEXT returns the content of the file as text.""",
            "nullable": True,
            "default": "URL",
        }
    }
    output_type = "string"

    def __init__(
        self,
        agent_evaluation_api: str | None = None,
        task_id: str | None = None,
        **kwargs,
    ):
        # Default to Hugging Face GAIA testing space
        self.agent_evaluation_api = (
            agent_evaluation_api
            if agent_evaluation_api is not None
            else "https://agents-course-unit4-scoring.hf.space/"
        )
        self.task_id = task_id
        super().__init__(**kwargs)

    def attachment_for(self, task_id: str | None):
        """Point the tool at a (new) task; pass None to clear the association."""
        self.task_id = task_id

    def forward(self, fmt: str = "URL") -> str:
        """
        Retrieve the current task's attachment.

        Args:
            fmt: One of URL, DATA_URL, LOCAL_FILE_PATH, TEXT (case-insensitive).
        Returns:
            str: The attachment in the requested representation.
        Raises:
            ValueError: On an unsupported format, a 4xx response, or a TEXT
                request for a non-text MIME type.
        """
        # Normalize the format for comparison.
        fmt = fmt.upper()
        # BUG FIX: input validation used `assert`, which is stripped under
        # `python -O`; raise explicitly instead (and spell LOCAL_FILE_PATH
        # correctly in the message).
        if fmt not in ("URL", "DATA_URL", "LOCAL_FILE_PATH", "TEXT"):
            raise ValueError(
                f"Unsupported format: {fmt}. Supported formats are URL, DATA_URL, LOCAL_FILE_PATH, and TEXT."
            )

        if not self.task_id:
            return "No task_id provided to retrieve attachment."

        file_url = urljoin(self.agent_evaluation_api, f"files/{self.task_id}")
        if fmt == "URL":
            # No download needed; hand back the direct link.
            return file_url

        response = requests.get(
            file_url,
            headers={
                "Content-Type": "application/json",
                "Accept": "application/json",
            },
        )
        # 4xx means the task has no file (or a bad id) — surface a clear error.
        if 400 <= response.status_code < 500:
            raise ValueError(f"Error fetching file: {response.status_code} {response.reason}")

        response.raise_for_status()
        mime = response.headers.get("content-type", "text/plain")
        if fmt == "TEXT":
            if mime.startswith("text/"):
                return response.text
            raise ValueError(f"Content of file type {mime} cannot be retrieved as TEXT")
        if fmt == "DATA_URL":
            # BUG FIX: the original nested double quotes inside a double-quoted
            # f-string, which is a SyntaxError on Python < 3.12.
            encoded = base64.b64encode(response.content).decode('utf-8')
            return f"data:{mime};base64,{encoded}"
        # fmt == "LOCAL_FILE_PATH": persist to a temp file the caller can read.
        with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
            tmp_file.write(response.content)
        return tmp_file.name
tools/google_search_tools.py ADDED
@@ -0,0 +1,79 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from smolagents import Tool
2
+ from googleapiclient.discovery import build
3
+ import os
4
class GoogleSearchTool(Tool):
    """Google Programmable Search wrapper that returns the top results as text."""

    name = "web_search"
    description = """Performs a google web search for a query then returns top search results in markdown format."""

    inputs = {
        "query": {
            "type": "string",
            "description": "The query to perform a web search for"
        }
    }
    output_type = "string"

    # Subclasses extend forward()'s signature (extra args are routed to
    # _collect_params), so smolagents' signature validation must be skipped.
    skip_forward_signature_validation = True

    def __init__(
        self,
        api_key: str | None = None,
        search_engine_id: str | None = None,
        num_results: int = 10,
        **kwargs,
    ):
        """
        Args:
            api_key: Google API key; falls back to the GOOGLE_API_KEY env var.
            search_engine_id: Custom Search engine id; falls back to
                GOOGLE_SEARCH_ENGINE_ID.
            num_results: Maximum number of results to return.
        Raises:
            ValueError: If no API key or search engine id can be resolved.
        """
        from dotenv import load_dotenv
        load_dotenv()
        # BUG FIX: explicitly passed constructor arguments were ignored and
        # unconditionally overwritten from the environment; the environment is
        # now only a fallback.
        api_key = api_key or os.getenv("GOOGLE_API_KEY")
        search_engine_id = search_engine_id or os.getenv("GOOGLE_SEARCH_ENGINE_ID")
        if not api_key:
            raise ValueError("GOOGLE_API_KEY is not set")
        if not search_engine_id:
            raise ValueError("GOOGLE_SEARCH_ENGINE_ID is not set")

        self.cse = build(
            "customsearch",
            "v1",
            developerKey=api_key
        ).cse()
        self.cx = search_engine_id
        self.num = num_results
        super().__init__(**kwargs)

    def _collect_params(self) -> dict:
        """Extra Custom Search parameters; subclasses override to add filters."""
        return {}

    def forward(self, query: str, *args, **kwargs) -> str:
        """Execute the search and format title/link/snippet per result."""
        params = {
            "q": query,
            "cx": self.cx,
            # Restrict the response payload to the fields we render.
            "fields": "items(link, title, snippet)",
            "num": self.num,
        }

        # Merge in subclass-specific parameters (e.g. siteSearch).
        params = params | self._collect_params(*args, **kwargs)
        res = self.cse.list(**params).execute()
        if "items" not in res:
            return "No results found"

        return "\n\n".join(f"{item['title']}\n{item['link']}\n{item['snippet']}" for item in res["items"])
61
+ class GoogleSiteSearchTool(GoogleSearchTool):
62
+ name = "site_search"
63
+ description = """Searches a specific website for a given query and returns the site contents in markdown format. Use when information is likely to be found on a particular domain, such as reddit.com, wikipedia.org, ieee.org, or arxiv.org."""
64
+ inputs = {
65
+ "query": {
66
+ "type": "string",
67
+ "description": "The query to perform search."
68
+ },
69
+ "site": {
70
+ "type": "string",
71
+ "description": "The domain of the site on which to search",
72
+ },
73
+ }
74
+
75
+ def _collect_params(self, site: str) -> dict:
76
+ return {
77
+ "siteSearch": site,
78
+ "siteSearchFilter": "i",
79
+ }
tools/speech_recognition_tool.py ADDED
@@ -0,0 +1,115 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from smolagents import Tool
2
+ import torch
3
+ from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline, logging
4
+ import warnings
5
+
6
class SpeechRecognitionTool(Tool):
    """Transcribe speech from an audio file using Whisper (large-v3-turbo)."""

    name = 'speech_to_text'
    description = '''Transcribes speech from audio'''

    inputs = {
        'audio': {
            'type': 'string',
            'description': 'Path to the audio file to transcribe.',
        },
        'with_time_markers': {
            'type': 'boolean',
            'description': 'Whether to include timestamps in the transcription output. Each timestamp appears on its own line in the format [float, float], indicating the number of seconds elapsed from the start of the audio.',
            'nullable': True,
            'default': False,
        },
    }
    output_type = 'string'

    # Whisper processes audio in windows of this many seconds; also used by
    # _normalize_chunks to unwrap per-window timestamps into absolute ones.
    chunk_length_s = 30

    def __new__(cls, *args, **kwargs):
        # Model setup happens in __new__ so the pipeline exists before
        # __init__ runs. NOTE: the pipeline is stored on the class, so each
        # instantiation reloads and replaces the shared pipeline.
        device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
        torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
        model_id = 'openai/whisper-large-v3-turbo'
        model = AutoModelForSpeechSeq2Seq.from_pretrained(
            model_id,
            torch_dtype=torch_dtype,
            low_cpu_mem_usage=True,
            use_safetensors=True,
        )
        model.to(device)
        processor = AutoProcessor.from_pretrained(model_id)

        # Silence transformers' deprecation chatter for the pipeline call.
        logging.set_verbosity_error()
        warnings.filterwarnings(
            'ignore',
            category=FutureWarning,
            message=r'.*The input name "inputs" is deprecated.*',
        )
        cls.pipe = pipeline(
            'automatic-speech-recognition',
            model=model,
            tokenizer=processor.tokenizer,
            feature_extractor=processor.feature_extractor,
            torch_dtype=torch_dtype,
            device=device,
            chunk_length_s=cls.chunk_length_s,
            return_timestamps=True,
        )

        # BUG FIX: the original forwarded *args/**kwargs to object.__new__,
        # which raises TypeError for any constructor argument when __new__ is
        # overridden. Python passes the original arguments to __init__ anyway.
        return super().__new__(cls)

    def forward(self, audio: str, with_time_markers: bool = False) -> str:
        '''
        Transcribes speech from audio.

        Args:
            audio (str): Path to the audio file to transcribe.
            with_time_markers (bool): Whether to include timestamps in the transcription output. Each timestamp appears on its own line in the format [float], indicating the number of seconds elapsed from the start of the audio
        Returns:
            str: The transcribed text.
        '''
        result = self.pipe(audio)
        if not with_time_markers:
            return result['text'].strip()

        # Interleave [start] / [end] second markers around each chunk's text.
        txt = ""
        for chunk in self._normalize_chunks(result["chunks"]):
            txt += f"[{chunk['start']:0.2f}]\n{chunk['text']}\n[{chunk['end']:0.2f}]\n"
        return txt.strip()

    def _normalize_chunks(self, chunks):
        """Convert per-window Whisper timestamps into absolute seconds.

        Whisper timestamps reset every `chunk_length_s` window; a timestamp
        smaller than its predecessor signals a wrap, so a window-sized offset
        is added back.
        """
        chunk_length_s = self.chunk_length_s
        absolute_offset = 0.0
        chunk_offset = 0.0
        normalized = []

        for chunk in chunks:
            timestamp_start = chunk['timestamp'][0]
            timestamp_end = chunk['timestamp'][1]
            # Start went backwards -> we crossed into the next window.
            if timestamp_start < chunk_offset:
                absolute_offset += chunk_length_s
            chunk_offset = timestamp_start
            absolute_start = absolute_offset + timestamp_start

            # End can also wrap within a single chunk.
            if timestamp_end < timestamp_start:
                absolute_offset += chunk_length_s
            absolute_end = absolute_offset + timestamp_end
            chunk_offset = timestamp_end

            chunk_text = chunk['text'].strip()
            if chunk_text:
                normalized.append(
                    {
                        'start': absolute_start,
                        'end': absolute_end,
                        'text': chunk_text,
                    }
                )
        return normalized
106
+
107
+ # TEST THE SCRIPT (UNCOMMENT LINES BELOW)
108
+ # speech_to_text = SpeechRecognitionTool()
109
+
110
+ # transcription = speech_to_text(
111
+ # audio="https://agents-course-unit4-scoring.hf.space/files/99c9cc74-fdc8-46c6-8f8d-3ce2d3bfeea3",
112
+ # with_time_markers=True,
113
+ # )
114
+
115
+ # print(transcription)
tools/youtube_video_tool.py ADDED
@@ -0,0 +1,393 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from smolagents import Tool
2
+ from openai import OpenAI
3
+ from tools.speech_recognition_tool import SpeechRecognitionTool
4
+ from io import BytesIO
5
+ import yt_dlp
6
+ import av
7
+ import torchaudio
8
+ import subprocess
9
+ import requests
10
+ import base64
11
+ import tempfile
12
+ import re
13
+ import os
14
+
15
class YouTubeVideoTool(Tool):
    """Answer a question about a YouTube video by splitting it into chunks of
    captions/frames and iteratively refining an answer with an OpenAI model."""

    name = 'youtube_video'
    description = '''Process the video and return the requested information from it.'''
    inputs = {
        'url': {
            'type': 'string',
            'description': 'The URL of the YouTube video.',
        },
        'query': {
            'type': 'string',
            'description': 'The question to answer',
        },
    }
    output_type = 'string'

    def __init__(
        self,
        video_quality: int = 360,
        frames_interval: int | float | None = 2,
        chunk_duration: int | float | None = 2,
        speech_recognition_tool: SpeechRecognitionTool | None = None,
        client: OpenAI | None = None,
        model_id: str = 'gpt-4.1-mini',
        debug: bool = False,
        **kwargs,
    ):
        """
        Args:
            video_quality: Target stream height in pixels used to pick a format.
            frames_interval: Seconds between captured frames (None = every frame).
            chunk_duration: Seconds of video per processed chunk (None = whole video).
            speech_recognition_tool: Optional transcriber used as fallback when
                the video has no captions.
            client: OpenAI client; one is created from the environment if omitted.
            model_id: Model used to answer the query.
            debug: When True, print each chunk's prompt and answer.
        """
        self.video_quality = video_quality
        self.speech_recognition_tool = speech_recognition_tool
        self.frames_interval = frames_interval
        self.chunk_duration = chunk_duration

        self.client = client or OpenAI()
        self.model_id = model_id

        self.debug = debug

        super().__init__(**kwargs)

    def forward(self, url: str, query: str):
        '''
        Process the video and return the requested information.
        Args:
            url(str): The URL of the YouTube video.
            query(str): The question to answer.
        Returns:
            str: Answer to the query
        '''
        answer = ''
        for chunk in self._split_video_into_chunks(url):
            prompt = self._prompt(
                chunk,
                query,
                answer,
            )
            # BUG FIX: the model id was hard-coded to 'gpt-4.1-mini' here,
            # silently ignoring the configured self.model_id.
            response = self.client.responses.create(
                model=self.model_id,
                input=[
                    {
                        'role': 'user',
                        'content': [
                            {
                                'type': 'input_text',
                                'text': prompt,
                            },
                        ],
                    },
                ],
            )
            answer = response.output_text
            if self.debug:
                print(
                    f"CHUNK {chunk['start']} - {chunk['end']}:\n\n{prompt}\n\nANSWER:\n{answer}"
                )
            # The sentinel means "no evidence in this chunk yet" — reset so the
            # next chunk starts from a clean slate.
            if answer.strip() == 'I need to keep watching.':
                answer = ''
        return answer

    def _prompt(self, chunk, query, aggregated_answer):
        """Build the per-chunk prompt, threading through the answer so far."""
        prompt = [
            f"""\
These are some frames of a YouTube video.
I will ask a question about the entire video, but you'll only see part of it at a time.
Aggregate answer about the entire video, use information about previous parts but do not reference the previous parts in the answer directly.

Ground your answer based on video title, description, captions, video frames or answer from previous parts.
If no evidences presented just say "I need to keep watching".

VIDEO TITLE:
{chunk["title"]}

VIDEO DESCRIPTION:
{chunk["description"]}

FRAMES SUBTITLES:
{chunk["captions"]}"""
        ]

        if aggregated_answer:
            prompt.append(f"""\
Here is the answer to the same question based on the previous video parts:

BASED ON PREVIOUS PARTS:
{aggregated_answer}""")
        prompt.append(f"""\
QUESTION:
{query}""")

        return "\n\n".join(prompt)

    def _split_video_into_chunks(
        self, url: str, with_captions: bool = True, with_frames: bool = True
    ):
        """Yield consecutive chunk dicts covering the whole video duration."""
        video = self._process_video(
            url, with_captions=with_captions, with_frames=with_frames
        )
        video_duration = video['duration']
        # chunk_duration=None means "one chunk for the entire video".
        chunk_duration = self.chunk_duration or video_duration

        chunk_start = 0.0
        while chunk_start < video_duration:
            chunk_end = min(chunk_start + chunk_duration, video_duration)
            yield self._get_video_chunk(video, chunk_start, chunk_end)
            chunk_start += chunk_duration

    def _get_video_chunk(self, video, start, end):
        """Slice captions and frames that overlap the [start, end] window."""
        chunk_captions = [
            c for c in video['captions'] if c['start'] <= end and c['end'] >= start
        ]
        chunk_frames = [
            f
            for f in video['frames']
            if f['timestamp'] >= start and f['timestamp'] <= end
        ]

        return {
            'title': video['title'],
            'description': video['description'],
            'start': start,
            'end': end,
            'captions': '\n'.join([c['text'] for c in chunk_captions]),
            'frames': chunk_frames,
        }

    def _process_video(
        self, url: str, with_captions: bool = True, with_frames: bool = True
    ):
        """Download video metadata, captions (with ASR fallback) and frames."""
        lang = 'en'
        info = self._get_video_info(url, lang)

        if with_captions:
            captions = self._extract_captions(
                lang, info.get('subtitles', {}), info.get('automatic_captions', {})
            )
            if not captions and self.speech_recognition_tool:
                # No published captions: transcribe the audio track ourselves.
                audio_url = self._select_audio_format(info['formats'])
                audio = self._capture_audio(audio_url)
                with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp:
                    tmp.write(audio.read())
                    tmp.flush()
                    tmp_path = tmp.name  # Save the path before closing

                # Now the file is closed, safe to use
                try:
                    transcription = self.speech_recognition_tool(audio=tmp_path, with_time_markers=True)
                    captions = []
                    # Parse the "[start]\ntext\n[end]" markers produced by
                    # SpeechRecognitionTool back into structured captions.
                    pattern = re.compile(r'\[(\d+\.\d+)\]\n(.+?)\n\[(\d+\.\d+)\]', re.DOTALL)
                    for match in pattern.finditer(transcription):
                        start, text, end = match.groups()
                        captions.append({
                            'start': float(start),
                            'end': float(end),
                            'text': text.strip(),
                        })
                finally:
                    os.remove(tmp_path)  # Clean up the temp file
        else:
            captions = []

        if with_frames:
            # BUG FIX: the stream height was hard-coded to 360, ignoring the
            # configured self.video_quality.
            video_url = self._select_video_format(info['formats'], self.video_quality)['url']
            frames = self._capture_video_frames(video_url, self.frames_interval)
        else:
            frames = []

        return {
            'id': info['id'],
            'title': info['title'],
            'description': info['description'],
            'duration': info['duration'],
            'captions': captions,
            'frames': frames,
        }

    def _get_video_info(self, url: str, lang: str):
        """Fetch yt-dlp metadata (formats, subtitles) without downloading media."""
        ydl_opts = {
            'quiet': True,
            'skip_download': True,
            'format': 'bestvideo[ext=mp4][height<=360]+bestaudio[ext=m4a]/best[height<=360]',
            "forceurl": True,
            "noplaylist": True,
            "writesubtitles": True,
            "writeautomaticsub": True,
            "subtitlesformat": "vtt",
            "subtitleslangs": [lang],
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)

        return info

    def _extract_captions(self, lang, subtitles, auto_captions):
        """Download and parse SRT or VTT captions into {start, end, text} dicts."""
        # Prefer author-provided subtitles over automatic captions.
        caption_tracks = subtitles.get(lang) or auto_captions.get(lang) or []

        structured_captions = []

        srt_track = next(
            (track for track in caption_tracks if track['ext'] == 'srt'), None
        )
        vtt_track = next(
            (track for track in caption_tracks if track['ext'] == 'vtt'), None
        )

        if srt_track:
            import pysrt
            response = requests.get(srt_track['url'])
            response.raise_for_status()
            srt_data = response.content.decode('utf-8')

            def to_sec(t):
                return (
                    t.hours * 3600 + t.minutes * 60 + t.seconds + t.milliseconds / 1000
                )

            structured_captions = [
                {
                    "start": to_sec(sub.start),
                    "end": to_sec(sub.end),
                    "text": sub.text.strip(),
                }
                for sub in pysrt.from_string(srt_data)
            ]
        if vtt_track:
            import webvtt
            from io import StringIO

            response = requests.get(vtt_track['url'])
            response.raise_for_status()
            vtt_data = response.text

            vtt_file = StringIO(vtt_data)

            def to_sec(t):
                """Convert 'HH:MM:SS.mmm' to float seconds"""
                h, m, s = t.split(":")
                s, ms = s.split(".")
                return int(h) * 3600 + int(m) * 60 + int(s) + int(ms) / 1000

            for caption in webvtt.read_buffer(vtt_file):
                structured_captions.append(
                    {
                        'start': to_sec(caption.start),
                        "end": to_sec(caption.end),
                        "text": caption.text.strip(),
                    }
                )
        return structured_captions

    def _select_video_format(self, formats, video_quality):
        """Pick the first video stream whose height matches `video_quality`.

        Raises StopIteration if no such format exists.
        """
        video_format = next(
            f
            for f in formats
            if f.get('vcodec') != 'none' and f.get('height') == video_quality
        )
        return video_format

    def _capture_video_frames(self, video_url, capture_interval_sec=None):
        """Remux the stream locally with ffmpeg and decode frames with PyAV.

        Returns a list of {'timestamp': float, 'image': PIL.Image} dicts,
        sampled every `capture_interval_sec` seconds (every frame when None).
        """
        with tempfile.NamedTemporaryFile(suffix='.mkv', delete=False) as tmp:
            tmp_path = tmp.name

        try:
            ffmpeg_cmd = [
                "ffmpeg",
                "-y",  # Overwrite output file if needed
                "-i", video_url,
                "-f", "matroska",
                tmp_path,
            ]
            result = subprocess.run(ffmpeg_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            if result.returncode != 0 or not os.path.exists(tmp_path) or os.path.getsize(tmp_path) == 0:
                raise RuntimeError(f"ffmpeg failed to create a valid video file for {video_url}")

            container = av.open(tmp_path)
            stream = container.streams.video[0]
            time_base = stream.time_base

            frames = []
            next_capture_time = 0
            for frame in container.decode(stream):
                if frame.pts is None:
                    continue
                timestamp = float(frame.pts * time_base)
                if capture_interval_sec is None or timestamp >= next_capture_time:
                    frames.append(
                        {
                            'timestamp': timestamp,
                            'image': frame.to_image(),  # PIL image
                        }
                    )
                    if capture_interval_sec is not None:
                        next_capture_time += capture_interval_sec
            container.close()
            return frames
        finally:
            os.remove(tmp_path)

    def _base64_frames(self, frames):
        """Encode captured frame images as base64 JPEG strings."""
        base64_frames = []
        for f in frames:
            buffered = BytesIO()
            f['image'].save(buffered, format='JPEG')
            encoded = base64.b64encode(buffered.getvalue()).decode('utf-8')
            base64_frames.append(encoded)
        return base64_frames

    def _select_audio_format(self, formats):
        """Return the URL of the best audio-only stream (m4a > webm, highest abr)."""
        audio_formats = [
            f
            for f in formats
            if f.get('vcodec') == 'none'
            and f.get('acodec')
            and f.get('acodec') != 'none'
        ]

        if not audio_formats:
            raise ValueError('No valid audio-only formats found')

        # Prefer m4a > webm, highest abr first
        preferred_exts = ['m4a', 'webm']

        def sort_key(f):
            ext_score = (
                preferred_exts.index(f['ext']) if f['ext'] in preferred_exts else 99
            )
            abr = f.get('abr') or 0
            return (ext_score, -abr)

        audio_formats.sort(key=sort_key)
        return audio_formats[0]['url']

    def _capture_audio(self, audio_url) -> BytesIO:
        """Download/transcode the audio stream to 16kHz mono PCM WAV in memory."""
        ffmpeg_audio_cmd = [
            "ffmpeg",
            "-i",
            audio_url,
            "-f",
            "wav",
            "-acodec",
            "pcm_s16le",  # Whisper prefers PCM
            "-ac",
            "1",  # Mono
            "-ar",
            "16000",  # 16kHz for Whisper
            "-",
        ]

        result = subprocess.run(
            ffmpeg_audio_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        if result.returncode != 0:
            raise RuntimeError('ffmpeg failed:\n' + result.stderr.decode())

        audio_buffer = BytesIO(result.stdout)
        audio_buffer.seek(0)
        return audio_buffer
+ return audio_buffer