|
|
import os
|
|
|
import subprocess
|
|
|
import base64
|
|
|
import mimetypes
|
|
|
from typing import List, Union
|
|
|
from pathlib import Path
|
|
|
import math
|
|
|
import requests
|
|
|
import pandas as pd
|
|
|
import cv2
|
|
|
from pytubefix import YouTube
|
|
|
from youtube_transcript_api import YouTubeTranscriptApi
|
|
|
from openai import OpenAI, APIError
|
|
|
|
|
|
from langchain_core.tools import tool
|
|
|
from langchain_community.document_loaders import CSVLoader
|
|
|
|
|
|
from web_search_agent import web_search_graph
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@tool("sort_tool")
|
|
|
def sort_tool(items: List[Union[float, str]], order: str = "ascending") -> Union[List[Union[float, str]], str]:
|
|
|
"""
|
|
|
Sort a list of numbers (in numeric order) or strings (in alphabetical order).
|
|
|
Use this tool whenever you need to sort a list of items.
|
|
|
|
|
|
Args:
|
|
|
items (List[Union[float, str]]): The list of items to sort. The list must contain either only numbers or only strings.
|
|
|
order (str, optional): The sorting order. Valid values: 'ascending' or 'descending'. Default is 'ascending'.
|
|
|
|
|
|
Returns:
|
|
|
Union[List[Union[float, str]], str]: The sorted list if successful, or an error message string in case of failure.
|
|
|
"""
|
|
|
print(f"--- ESECUZIONE DEL TOOL 'sort_list' CON INPUT: items={items}, order='{order}' ---")
|
|
|
|
|
|
|
|
|
if not items:
|
|
|
return []
|
|
|
|
|
|
|
|
|
normalized_order = order.lower().strip()
|
|
|
if normalized_order == "ascending":
|
|
|
reverse_flag = False
|
|
|
elif normalized_order == "descending":
|
|
|
reverse_flag = True
|
|
|
else:
|
|
|
return "Error: The 'order' value is invalid. Allowed values are 'ascending' or 'descending'."
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
|
sorted_items = sorted(items, reverse=reverse_flag)
|
|
|
return sorted_items
|
|
|
except TypeError:
|
|
|
|
|
|
|
|
|
return "Error: The list contains incompatible data types that cannot be sorted together (e.g., numbers and strings)."
|
|
|
except Exception as e:
|
|
|
return f"An unexpected error occurred during sorting: {e}"
|
|
|
|
|
|
|
|
|
@tool("download_tool")
|
|
|
def download_tool(task_id: str) -> Union[str, str]:
|
|
|
"""
|
|
|
Download a file associated with a task_id from a predefined URL.
|
|
|
The file is saved in the 'uploads' directory with the name 'task_id.ext',
|
|
|
where the extension (.ext) is determined dynamically from the server response.
|
|
|
|
|
|
Args:
|
|
|
task_id (str): The unique identifier for the task and the file to download.
|
|
|
|
|
|
Returns:
|
|
|
Union[str, str]: The filename of the downloaded file if successful or an error message string in case of failure.
|
|
|
"""
|
|
|
print(f"--- ESECUZIONE DEL TOOL 'download_file' CON INPUT: task_id={task_id} ---")
|
|
|
|
|
|
|
|
|
BASE_URL = "https://agents-course-unit4-scoring.hf.space/files/"
|
|
|
UPLOADS_DIR = "./uploads/"
|
|
|
|
|
|
|
|
|
try:
|
|
|
os.makedirs(UPLOADS_DIR, exist_ok=True)
|
|
|
except OSError as e:
|
|
|
error_message = f"Error: Unable to create the destination directory '{UPLOADS_DIR}'. Details: {e}"
|
|
|
print(error_message)
|
|
|
return error_message
|
|
|
|
|
|
|
|
|
url = f"{BASE_URL}{task_id}"
|
|
|
try:
|
|
|
|
|
|
with requests.get(url, stream=True, timeout=30) as response:
|
|
|
|
|
|
response.raise_for_status()
|
|
|
|
|
|
|
|
|
content_disposition = response.headers.get('content-disposition')
|
|
|
if not content_disposition:
|
|
|
error_message = "Error: The server response does not contain the 'content-disposition' header to get the file name."
|
|
|
print(error_message)
|
|
|
return error_message
|
|
|
|
|
|
|
|
|
parts = content_disposition.split(';')
|
|
|
filename_part = next((part for part in parts if 'filename=' in part), None)
|
|
|
|
|
|
if not filename_part:
|
|
|
error_message = "Error: Unable to find 'filename' in the 'content-disposition' header."
|
|
|
print(error_message)
|
|
|
return error_message
|
|
|
|
|
|
original_filename = filename_part.split('=')[1].strip().strip('"')
|
|
|
_, extension = os.path.splitext(original_filename)
|
|
|
|
|
|
if not extension:
|
|
|
error_message = f"Error: Unable to get the file extension from '{original_filename}'."
|
|
|
print(error_message)
|
|
|
return error_message
|
|
|
|
|
|
|
|
|
local_filename = f"{task_id}{extension}"
|
|
|
local_filepath = os.path.join(UPLOADS_DIR, local_filename)
|
|
|
|
|
|
with open(local_filepath, 'wb') as f:
|
|
|
|
|
|
for chunk in response.iter_content(chunk_size=8192):
|
|
|
f.write(chunk)
|
|
|
|
|
|
success_message = f"File scaricato con successo e salvato in: {local_filepath}"
|
|
|
print(success_message)
|
|
|
return local_filename
|
|
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
|
|
|
|
error_message = f"Network error occurred while downloading the file: {e}"
|
|
|
print(error_message)
|
|
|
return error_message
|
|
|
except Exception as e:
|
|
|
|
|
|
error_message = f"An unexpected error occurred: {e}"
|
|
|
print(error_message)
|
|
|
return error_message
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@tool("add_tool")
|
|
|
def add_tool(numbers: List[float]) -> float:
|
|
|
"""
|
|
|
Calculate the sum of a list of numbers.
|
|
|
Use this tool when you need to perform a sum operation on multiple numbers.
|
|
|
|
|
|
Args:
|
|
|
numbers (List[float]): The list of numbers to be summed.
|
|
|
"""
|
|
|
print(f"--- ESECUZIONE DEL TOOL 'sum_numbers' CON INPUT: {numbers} ---")
|
|
|
return sum(numbers)
|
|
|
|
|
|
|
|
|
@tool("multiply_tool")
|
|
|
def multiply_tool(numbers: List[float]) -> float:
|
|
|
"""
|
|
|
Calculate the product of a list of numbers.
|
|
|
Use this tool when you need to multiply two or more numbers together.
|
|
|
|
|
|
Args:
|
|
|
numbers (List[float]): The list of numbers to be multiplied.
|
|
|
"""
|
|
|
print(f"--- ESECUZIONE DEL TOOL 'multiply_numbers' CON INPUT: {numbers} ---")
|
|
|
if not numbers:
|
|
|
return 0
|
|
|
return math.prod(numbers)
|
|
|
|
|
|
|
|
|
@tool("subtract_tool")
|
|
|
def subtract_tool(minuend: float, subtrahend: float) -> float:
|
|
|
"""
|
|
|
Calculate the subtraction between two numbers (minuend - subtrahend).
|
|
|
Use this tool to subtract one number from another.
|
|
|
|
|
|
Args:
|
|
|
minuend (float): The number from which to subtract (the first number).
|
|
|
subtrahend (float): The number to subtract (the second number).
|
|
|
"""
|
|
|
print(f"--- ESECUZIONE DEL TOOL 'subtract_numbers' CON INPUT: minuend={minuend}, subtrahend={subtrahend} ---")
|
|
|
return minuend - subtrahend
|
|
|
|
|
|
|
|
|
@tool("divide_tool")
|
|
|
def divide_tool(dividend: float, divisor: float) -> Union[float, str]:
|
|
|
"""
|
|
|
Calculate the division between two numbers (dividend / divisor).
|
|
|
Also handles the case of division by zero.
|
|
|
|
|
|
Args:
|
|
|
dividend (float): The number to be divided (the numerator).
|
|
|
divisor (float): The number to divide by (the denominator).
|
|
|
"""
|
|
|
print(f"--- ESECUZIONE DEL TOOL 'divide_numbers' CON INPUT: dividend={dividend}, divisor={divisor} ---")
|
|
|
if divisor == 0:
|
|
|
return "Error: Division by zero is not allowed."
|
|
|
return dividend / divisor
|
|
|
|
|
|
|
|
|
@tool("modulus_tool")
|
|
|
def modulus_tool(dividend: float, divisor: float) -> Union[float, str]:
|
|
|
"""
|
|
|
Calculate the remainder of the division between two numbers (dividend % divisor).
|
|
|
Use this tool when asked for the 'remainder' or the 'modulus' of a division.
|
|
|
|
|
|
Args:
|
|
|
dividend (float): The number being divided (the numerator).
|
|
|
divisor (float): The number by which to divide (the denominator).
|
|
|
"""
|
|
|
print(f"--- ESECUZIONE DEL TOOL 'calculate_remainder' CON INPUT: dividend={dividend}, divisor={divisor} ---")
|
|
|
if divisor == 0:
|
|
|
return "Error: The divisor cannot be zero for the modulus operation."
|
|
|
return dividend % divisor
|
|
|
|
|
|
|
|
|
@tool("power_tool")
|
|
|
def power_tool(base: float, exponent: float) -> Union[float, str]:
|
|
|
"""
|
|
|
Calculate a number raised to a power (base^exponent).
|
|
|
Use this tool for exponentiation operations.
|
|
|
|
|
|
Args:
|
|
|
base (float): The base of the operation.
|
|
|
exponent (float): The exponent to which the base is raised.
|
|
|
"""
|
|
|
print(f"--- ESECUZIONE DEL TOOL 'calculate_power' CON INPUT: base={base}, exponent={exponent} ---")
|
|
|
try:
|
|
|
|
|
|
result = math.pow(base, exponent)
|
|
|
return result
|
|
|
except ValueError:
|
|
|
|
|
|
return "Error: Invalid operation. Ensure that the base and exponent do not result in a complex number (e.g., even root of a negative number)."
|
|
|
|
|
|
|
|
|
@tool("square_root_tool")
|
|
|
def square_root_tool(number: float) -> Union[float, str]:
|
|
|
"""
|
|
|
Calculate the square root of a non-negative number.
|
|
|
Use this tool specifically to compute the square root.
|
|
|
|
|
|
Args:
|
|
|
number (float): The number for which to calculate the square root. Must be >= 0.
|
|
|
"""
|
|
|
print(f"--- ESECUZIONE DEL TOOL 'square_root' CON INPUT: number={number} ---")
|
|
|
if number < 0:
|
|
|
return "Error: Cannot calculate the square root of a negative number."
|
|
|
return math.sqrt(number)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@tool("tabular_tool")
|
|
|
def tabular_tool(filename: str) -> Union[str, str]:
|
|
|
"""
|
|
|
Analyze a local tabular data file (CSV, XLSX, XLS) and return its content
|
|
|
as a formatted string. For Excel files, each worksheet is processed individually.
|
|
|
|
|
|
Args:
|
|
|
filename (str): The filename of the CSV, XLSX, or XLS file to analyze.
|
|
|
|
|
|
Returns:
|
|
|
Union[str, str]: A formatted string containing the file's data, or an error message in case of issues.
|
|
|
"""
|
|
|
print(f"--- ESECUZIONE DEL TOOL 'analyze_tabular_data' CON INPUT: filename='{filename}' ---")
|
|
|
UPLOADS_DIR = "./uploads/"
|
|
|
file_path = os.path.join(UPLOADS_DIR, filename)
|
|
|
|
|
|
|
|
|
if not os.path.exists(file_path):
|
|
|
return f"Error: The file '{file_path}' was not found. Make sure it has been downloaded first."
|
|
|
|
|
|
try:
|
|
|
|
|
|
file_extension = Path(file_path).suffix.lower()
|
|
|
csv_files_to_process = []
|
|
|
|
|
|
|
|
|
if file_extension in ['.xlsx', '.xls']:
|
|
|
print(f"Rilevato file Excel. Inizio la conversione dei fogli in CSV temporanei...")
|
|
|
|
|
|
|
|
|
excel_sheets = pd.read_excel(file_path, sheet_name=None)
|
|
|
|
|
|
if not excel_sheets:
|
|
|
return f"Error: The Excel file '{file_path}' is empty or contains no worksheets."
|
|
|
|
|
|
|
|
|
base_name = Path(file_path).stem
|
|
|
uploads_dir = Path(file_path).parent
|
|
|
|
|
|
for sheet_name, df in excel_sheets.items():
|
|
|
|
|
|
safe_sheet_name = "".join(c for c in sheet_name if c.isalnum() or c in (' ', '_')).rstrip()
|
|
|
temp_csv_path = uploads_dir / f"{base_name}_sheet_{safe_sheet_name}.csv"
|
|
|
|
|
|
|
|
|
df.to_csv(temp_csv_path, index=False)
|
|
|
print(f" - Foglio '{sheet_name}' convertito e salvato in: {temp_csv_path}")
|
|
|
csv_files_to_process.append(str(temp_csv_path))
|
|
|
|
|
|
|
|
|
elif file_extension == '.csv':
|
|
|
print(f"Rilevato file CSV. Verrà processato direttamente.")
|
|
|
csv_files_to_process.append(file_path)
|
|
|
|
|
|
|
|
|
else:
|
|
|
return f"Error: Unsupported file format '{file_extension}'. This tool supports only CSV, XLSX, and XLS."
|
|
|
|
|
|
|
|
|
if not csv_files_to_process:
|
|
|
return "Error: No file to process was found."
|
|
|
|
|
|
all_docs = []
|
|
|
for csv_path in csv_files_to_process:
|
|
|
loader = CSVLoader(file_path=csv_path)
|
|
|
docs = loader.load()
|
|
|
all_docs.extend(docs)
|
|
|
|
|
|
|
|
|
|
|
|
formatted_output = "\n\n---\n\n".join(
|
|
|
[
|
|
|
f'<Document source="{Path(doc.metadata["source"]).name}" page="{doc.metadata.get("page", 0)}">\n{doc.page_content[:2500]}\n</Document>'
|
|
|
for doc in all_docs
|
|
|
]
|
|
|
)
|
|
|
|
|
|
print("Analisi completata con successo.")
|
|
|
return formatted_output
|
|
|
|
|
|
except Exception as e:
|
|
|
error_message = f"An unexpected error occurred while analyzing the file '{file_path}': {e}"
|
|
|
print(error_message)
|
|
|
return error_message
|
|
|
|
|
|
|
|
|
@tool("audio_tool")
|
|
|
def audio_tool(filename: str) -> Union[str, str]:
|
|
|
"""
|
|
|
Transcribes a local audio file into text using OpenAI's Whisper model.
|
|
|
Use this tool when you need to extract the textual content from an audio file.
|
|
|
Supports common formats such as MP3, MP4, MPEG, MPGA, M4A, WAV, and WEBM.
|
|
|
|
|
|
Args:
|
|
|
filename (str): The filename of the audio file to transcribe.
|
|
|
|
|
|
Returns:
|
|
|
Union[str, str]: The transcribed text if successful, or an error message string in case of failure.
|
|
|
"""
|
|
|
print(f"--- ESECUZIONE DEL TOOL 'transcribe_audio' CON INPUT: file_path='{filename}' ---")
|
|
|
UPLOADS_DIR = "./uploads/"
|
|
|
file_path = os.path.join(UPLOADS_DIR, filename)
|
|
|
client = OpenAI()
|
|
|
|
|
|
|
|
|
if client is None:
|
|
|
return "Error: The OpenAI client is not configured. Please check your API key."
|
|
|
|
|
|
|
|
|
if not os.path.exists(file_path):
|
|
|
return f"Error: The file '{file_path}' was not found. Make sure it has been downloaded first."
|
|
|
|
|
|
try:
|
|
|
|
|
|
with open(file_path, "rb") as audio_file:
|
|
|
transcription = client.audio.transcriptions.create(
|
|
|
model="whisper-1",
|
|
|
file=audio_file
|
|
|
)
|
|
|
|
|
|
print("Trascrizione completata con successo.")
|
|
|
|
|
|
return transcription.text
|
|
|
|
|
|
except APIError as e:
|
|
|
|
|
|
error_message = f"Error from the OpenAI API during transcription: {e}"
|
|
|
print(error_message)
|
|
|
return error_message
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
error_message = f"An unexpected error occurred while transcribing the file '{file_path}': {e}"
|
|
|
print(error_message)
|
|
|
return error_message
|
|
|
|
|
|
|
|
|
@tool("image_tool")
|
|
|
def image_tool(filename: str, user_question: str) -> Union[str, str]:
|
|
|
"""
|
|
|
Reads a local image file and encodes it in base64 format, ready to be analyzed by a multimodal model (such as GPT-4o).
|
|
|
Use this tool to prepare any image (JPG, PNG, WEBP, etc.) before asking questions about its content.
|
|
|
|
|
|
Args:
|
|
|
filename (str): The filename of the image file to prepare.
|
|
|
user_question (str): The user's original question to guide the analysis.
|
|
|
|
|
|
Returns:
|
|
|
Union[str, str]: A textual analysis based on the base64 encoded image data, or an error message string.
|
|
|
"""
|
|
|
print(f"--- ESECUZIONE DEL TOOL 'prepare_image_for_analysis' CON INPUT: file_path='{filename}' ---")
|
|
|
UPLOADS_DIR = "./uploads/"
|
|
|
file_path = os.path.join(UPLOADS_DIR, filename)
|
|
|
client = OpenAI()
|
|
|
|
|
|
|
|
|
if not os.path.exists(file_path):
|
|
|
return f"Error: The image file '{file_path}' was not found."
|
|
|
|
|
|
try:
|
|
|
|
|
|
mime_type, _ = mimetypes.guess_type(file_path)
|
|
|
if not mime_type or not mime_type.startswith('image/'):
|
|
|
return f"Error: The file '{file_path}' is not a supported image format."
|
|
|
|
|
|
|
|
|
with open(file_path, "rb") as image_file:
|
|
|
encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
|
|
|
|
|
|
|
|
|
|
|
|
image_data = f"data:{mime_type};base64,{encoded_string}"
|
|
|
|
|
|
analysis_prompt = [
|
|
|
{
|
|
|
"role": "user",
|
|
|
"content": [
|
|
|
{
|
|
|
"type": "text",
|
|
|
"text": f"""
|
|
|
You are an expert visual analyst. Your task is to describe the provided image in extreme detail to help answer the user's question.
|
|
|
Focus on the elements relevant to the question. Be objective and precise.
|
|
|
|
|
|
**User's Question:** '{user_question}'
|
|
|
|
|
|
Analyze the image and provide a detailed description.
|
|
|
"""
|
|
|
},
|
|
|
{
|
|
|
"type": "image_url",
|
|
|
"image_url": {
|
|
|
"url": image_data,
|
|
|
"detail": "high"
|
|
|
}
|
|
|
}
|
|
|
]
|
|
|
}
|
|
|
]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
response = client.chat.completions.create(
|
|
|
model="gpt-4o-mini",
|
|
|
messages=analysis_prompt,
|
|
|
max_tokens=1000,
|
|
|
temperature=0
|
|
|
)
|
|
|
|
|
|
description = response.choices[0].message.content
|
|
|
print("--- Image analysis complete. ---")
|
|
|
return description
|
|
|
except Exception as e:
|
|
|
return f"An error occurred during the visual analysis: {e}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@tool("code_writer_tool")
|
|
|
def code_writer_tool(code: str, task_id: str) -> str:
|
|
|
"""
|
|
|
Writes a string of Python code to a local file. This is the first step
|
|
|
for any task that requires writing and then executing code. The task_id is the name for the file.
|
|
|
|
|
|
Args:
|
|
|
code (str): A string containing the complete, valid Python code to be written to the file.
|
|
|
task_id (str): The name for the file.
|
|
|
|
|
|
Returns:
|
|
|
local_filename (str): The local filename of python file to execute.
|
|
|
"""
|
|
|
print(f"--- TOOL: Writing code to file: {task_id}.py ---")
|
|
|
UPLOADS_DIR = "./uploads/"
|
|
|
local_filename = f"{task_id}.py"
|
|
|
file_path = os.path.join(UPLOADS_DIR, local_filename)
|
|
|
|
|
|
try:
|
|
|
|
|
|
with open(file_path, "w", encoding="utf-8") as f:
|
|
|
f.write(code)
|
|
|
|
|
|
success_message = f"Successfully wrote code to {file_path}."
|
|
|
print(success_message)
|
|
|
|
|
|
|
|
|
return local_filename
|
|
|
except Exception as e:
|
|
|
error_message = f"An error occurred while writing the file: {e}"
|
|
|
print(error_message)
|
|
|
return error_message
|
|
|
|
|
|
|
|
|
@tool("code_tool")
|
|
|
def code_tool(filename: str, timeout_seconds: int = 100) -> Union[str, dict]:
|
|
|
"""
|
|
|
Executes a programming code file in an isolated and secure environment, capturing its standard output and errors.
|
|
|
Use this tool to run programming code when you need to analyze its behavior or output.
|
|
|
|
|
|
Args:
|
|
|
filename (str): The filename of the code file to execute.
|
|
|
timeout_seconds (int, optional): The maximum number of seconds the execution is allowed to run before forcibly terminating the process. Default is 10.
|
|
|
|
|
|
Returns:
|
|
|
Union[str, dict]: A dictionary containing 'stdout', 'stderr', and 'return_code' if successful, or an error message string if the tool itself fails.
|
|
|
"""
|
|
|
print(f"--- ESECUZIONE DEL TOOL 'execute_python_file' SU: {filename} ---")
|
|
|
UPLOADS_DIR = "./uploads/"
|
|
|
file_path = os.path.join(UPLOADS_DIR, filename)
|
|
|
|
|
|
|
|
|
if not os.path.exists(file_path):
|
|
|
return f"Error: The code file '{file_path}' was not found."
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
process = subprocess.run(
|
|
|
['python', file_path],
|
|
|
capture_output=True,
|
|
|
text=True,
|
|
|
timeout=timeout_seconds
|
|
|
)
|
|
|
|
|
|
execution_result = {
|
|
|
"return_code": process.returncode,
|
|
|
"stdout": process.stdout.strip(),
|
|
|
"stderr": process.stderr.strip()
|
|
|
}
|
|
|
|
|
|
|
|
|
return execution_result
|
|
|
|
|
|
except FileNotFoundError:
|
|
|
|
|
|
return "Error: The 'python' interpreter was not found on the system. Unable to execute the code."
|
|
|
except subprocess.TimeoutExpired as e:
|
|
|
|
|
|
return {
|
|
|
"return_code": -1,
|
|
|
"stdout": e.stdout.strip() if e.stdout else "",
|
|
|
"stderr": f"Error: Execution terminated after {timeout_seconds} seconds (Timeout)."
|
|
|
}
|
|
|
except Exception as e:
|
|
|
|
|
|
return f"An unexpected error occurred while executing the tool: {e}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@tool("youtube_info_tool")
|
|
|
def youtube_info_tool(youtube_url: str) -> Union[str, dict]:
|
|
|
"""
|
|
|
Collects information and resources from a YouTube video. Downloads both audio and video, and retrieves the official transcript if available.
|
|
|
This is ALWAYS the first tool to call when working with a YouTube video.
|
|
|
|
|
|
Args:
|
|
|
youtube_url (str): The full URL of the YouTube video.
|
|
|
|
|
|
Returns:
|
|
|
Union[str, dict]: A dictionary with the collected resources (transcript, audio_filename, video_filename) or an error message.
|
|
|
"""
|
|
|
print(f"--- ESECUZIONE DEL TOOL 'get_youtube_video_info' CON URL: {youtube_url} ---")
|
|
|
UPLOADS_DIR = "./uploads/"
|
|
|
|
|
|
try:
|
|
|
yt = YouTube(youtube_url)
|
|
|
video_id = yt.video_id
|
|
|
|
|
|
|
|
|
transcript_text = None
|
|
|
try:
|
|
|
transcript_list = YouTubeTranscriptApi.get_transcript(video_id)
|
|
|
transcript_text = " ".join([d['text'] for d in transcript_list])
|
|
|
print("Trascrizione ufficiale trovata.")
|
|
|
except Exception:
|
|
|
print("Nessuna trascrizione ufficiale disponibile.")
|
|
|
|
|
|
|
|
|
audio_stream = yt.streams.get_audio_only()
|
|
|
audio_path = audio_stream.download(output_path=UPLOADS_DIR, filename=f"{video_id}.m4a")
|
|
|
if transcript_text is None:
|
|
|
transcript_text = audio_tool.invoke({"filename":f"{video_id}.m4a"})
|
|
|
print(f"Audio scaricato in: {audio_path}")
|
|
|
|
|
|
|
|
|
video_stream = yt.streams.get_highest_resolution()
|
|
|
video_path = video_stream.download(output_path=UPLOADS_DIR, filename=f"{video_id}.mp4")
|
|
|
print(f"Video scaricato in: {video_path}")
|
|
|
|
|
|
video_info = {
|
|
|
"title": yt.title,
|
|
|
"description": yt.description,
|
|
|
"transcript": transcript_text,
|
|
|
"audio_filename": f"{video_id}.m4a",
|
|
|
"video_filename": f"{video_id}.mp4"
|
|
|
}
|
|
|
|
|
|
return video_info
|
|
|
except Exception as e:
|
|
|
return f"Error while retrieving information from the YouTube video: {e}"
|
|
|
|
|
|
|
|
|
@tool("youtube_frame_tool")
|
|
|
def youtube_frame_tool(filename: str, title: str, description: str, transcript: str, user_question: str, sample_rate_seconds: int = 5) -> Union[str, str]:
|
|
|
"""
|
|
|
Analyzes video content by combining visual information from frames with the provided transcript to answer a specific user question.
|
|
|
To be used as a last resort, when the transcript and audio are not sufficient, or for purely visual questions.
|
|
|
|
|
|
Args:
|
|
|
filename (str): The filename of the video file.
|
|
|
title (str): The title of the video.
|
|
|
description (str): A brief description of the video.
|
|
|
transcript (str): The full text transcript of the video (either official or from audio).
|
|
|
user_question (str): The user's original question to guide the analysis.
|
|
|
sample_rate_seconds (int): Interval in seconds between frames to analyze. Default is 3.
|
|
|
|
|
|
Returns:
|
|
|
Union[str, str]: A textual analysis based on the video frames or an error message.
|
|
|
"""
|
|
|
print(f"--- ESECUZIONE DEL TOOL 'analyze_video_frames' SU: {filename} ---")
|
|
|
UPLOADS_DIR = "./uploads/"
|
|
|
video_path = os.path.join(UPLOADS_DIR, filename)
|
|
|
client = OpenAI()
|
|
|
|
|
|
if not os.path.exists(video_path):
|
|
|
return f"Error: Video file not found at '{video_path}'."
|
|
|
if client is None:
|
|
|
return "Error: The OpenAI client is not configured."
|
|
|
|
|
|
video = cv2.VideoCapture(video_path)
|
|
|
fps = video.get(cv2.CAP_PROP_FPS)
|
|
|
frame_interval = int(fps * sample_rate_seconds)
|
|
|
|
|
|
base64_frames = []
|
|
|
frame_count = 0
|
|
|
|
|
|
while video.isOpened():
|
|
|
success, frame = video.read()
|
|
|
if not success:
|
|
|
break
|
|
|
|
|
|
if frame_count % frame_interval == 0:
|
|
|
_, buffer = cv2.imencode(".jpg", frame)
|
|
|
base64_frames.append(base64.b64encode(buffer).decode("utf-8"))
|
|
|
|
|
|
frame_count += 1
|
|
|
|
|
|
video.release()
|
|
|
print(f"Campionati {len(base64_frames)} frame dal video.")
|
|
|
|
|
|
if not base64_frames:
|
|
|
return "Error: Unable to extract frames from the video."
|
|
|
|
|
|
prompt_messages = [
|
|
|
{
|
|
|
"role": "user",
|
|
|
"content": [
|
|
|
{
|
|
|
"type": "text",
|
|
|
"text": f"""
|
|
|
You are a video content analyst.
|
|
|
Your task is to answer the user's question by combining information from different sources:
|
|
|
- the title
|
|
|
- the description
|
|
|
- the transcript
|
|
|
- a series of sampled frames
|
|
|
**IMPORTANT**: Analyze the all sources of the video in great detail, because there may be important information to solve the task.
|
|
|
|
|
|
**User Question**: {user_question}
|
|
|
**Video Title**: {title}
|
|
|
**Video Description**: {description}
|
|
|
**Video Transcript**: {transcript if transcript else "No transcript available."}
|
|
|
|
|
|
Begin your rigorous analysis now. Here are the frames:
|
|
|
"""
|
|
|
},
|
|
|
|
|
|
*map(lambda x: {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{x}", "detail": "low"}}, base64_frames),
|
|
|
],
|
|
|
}
|
|
|
]
|
|
|
|
|
|
try:
|
|
|
response = client.chat.completions.create(
|
|
|
model="gpt-4o-mini",
|
|
|
temperature=0,
|
|
|
messages=prompt_messages,
|
|
|
max_tokens=1000,
|
|
|
)
|
|
|
analysis_summary = response.choices[0].message.content
|
|
|
return analysis_summary
|
|
|
except Exception as e:
|
|
|
return f"Error while analyzing frames with the OpenAI API: {e}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@tool("web_search_tool")
|
|
|
def web_search_tool(task: str) -> str:
|
|
|
"""
|
|
|
Delegates complex research tasks to a specialized, cyclic research agent.
|
|
|
Use this for any question that requires external, up-to-date, or detailed knowledge.
|
|
|
"""
|
|
|
print(f"--- MAIN AGENT: DELEGATING RESEARCH FOR: '{task}' ---")
|
|
|
|
|
|
|
|
|
|
|
|
initial_state = {"task": task, "context_summary": ""}
|
|
|
|
|
|
|
|
|
final_state = web_search_graph.invoke(initial_state)
|
|
|
|
|
|
|
|
|
final_answer = final_state["messages"][-1].content
|
|
|
return final_answer
|
|
|
|
|
|
|
|
|
assistant_tools_list = [
|
|
|
sort_tool, download_tool, add_tool, multiply_tool, subtract_tool, divide_tool, modulus_tool, tabular_tool, audio_tool, image_tool, code_writer_tool, code_tool, youtube_info_tool, youtube_frame_tool, web_search_tool
|
|
|
]
|
|
|
|