| | import os |
| | import subprocess |
| |
|
| | import mimetypes |
| | from google.cloud import storage |
| | from typing import Literal |
| | import requests |
| | import re |
| | from markdownify import markdownify |
| | from requests.exceptions import RequestException |
| | from langchain_core.tools import convert_runnable_to_tool |
| | from smolagents.utils import truncate_content |
| | from langchain_core.runnables import RunnableLambda |
| |
|
| | from pytubefix import YouTube |
| | from pytubefix.cli import on_progress |
| |
|
| | from langchain_core.tools import tool |
| | from langchain_core.prompts import ChatPromptTemplate |
| | from langchain_core.output_parsers import StrOutputParser |
| | from langchain_google_vertexai import ChatVertexAI |
| | from langchain.agents import Tool |
| | from langchain_experimental.tools import PythonREPLTool |
| | from langchain_community.tools import WikipediaQueryRun |
| | from langchain_community.utilities import WikipediaAPIWrapper |
| | from langchain_community.utilities import GoogleSerperAPIWrapper |
| |
|
| | from system_prompts import SYSTEM_PROMPT_VIDEO, SYSTEM_PROMPT_AUDIO, SYSTEM_PROMPT_IMAGE |
| |
|
# Chat model instance shared by the query_video / query_audio / query_image
# tools defined in this module.
llm_flash = ChatVertexAI(
    model="gemini-2.5-flash",
)
| |
|
| | |
# Fallback extension -> MIME type table, consulted by upload_file_to_bucket
# when mimetypes.guess_type() returns nothing for a file.
_EXTRA_MIME = {
    ".mp3": "audio/mpeg",
    ".mp4": "video/mp4",
}

# Register the same pairs with the stdlib registry so guess_type() resolves
# them directly.
mimetypes.add_type(_EXTRA_MIME[".mp3"], ".mp3")
mimetypes.add_type(_EXTRA_MIME[".mp4"], ".mp4")
| |
|
def upload_file_to_bucket(
    local_path: str,
    bucket_name: str = os.getenv("GCP_BUCKET_NAME"),
) -> str:
    """Upload a local file to Cloud Storage and return its ``gs://`` URI.

    The object name is always ``data<ext>``, where ``<ext>`` is the
    lower-cased extension of ``local_path``; the caller cannot choose a
    different object path. The content type is guessed from the file name
    with ``mimetypes`` and falls back to the ``_EXTRA_MIME`` table.

    No ``if_generation_match`` precondition is passed to the upload.
    NOTE(review): an existing ``data<ext>`` object is therefore presumably
    replaced by each new upload -- confirm against the GCS client docs.

    Args:
        local_path: Path of an existing regular file to upload.
        bucket_name: Target bucket. Defaults to the ``GCP_BUCKET_NAME``
            environment variable.

    Returns:
        The ``gs://<bucket>/<object>`` URI of the uploaded object.

    Raises:
        FileNotFoundError: If ``local_path`` is not an existing file.
        ValueError: If no MIME type can be inferred for the extension, or
            if no bucket name was given and ``GCP_BUCKET_NAME`` is unset.
    """
    if not os.path.isfile(local_path):
        raise FileNotFoundError(f"No existe: {local_path}")

    ext = os.path.splitext(local_path)[1].lower()
    object_name = f"data{ext}"

    file_type, _ = mimetypes.guess_type(local_path)
    if not file_type and ext in _EXTRA_MIME:
        file_type = _EXTRA_MIME[ext]
    if not file_type:
        raise ValueError(f"No se pudo inferir MIME para «{ext}»")

    # The signature default is evaluated once, when the module is imported.
    # Re-read the environment here so a variable exported later is honoured,
    # and fail with a clear message instead of handing an empty bucket name
    # to the storage client.
    bucket_name = bucket_name or os.getenv("GCP_BUCKET_NAME")
    if not bucket_name:
        raise ValueError(
            "No bucket name given: pass `bucket_name` or set the "
            "GCP_BUCKET_NAME environment variable."
        )

    client = storage.Client()
    blob = client.bucket(bucket_name).blob(object_name)
    blob.upload_from_filename(local_path, content_type=file_type)

    gs_uri = f"gs://{bucket_name}/{object_name}"
    print(f"✅ Subido → {gs_uri} ({file_type})")
    return gs_uri
| |
|
| |
|
def download_youtube_video(url: str, mode: Literal["video", "audio"]) -> tuple[str, str]:
    """Download a YouTube video or its audio track and upload it to GCS.

    Files are written under the relative ``data/`` folder. In ``"audio"``
    mode the downloaded audio stream is converted to ``.mp3`` with the
    ``ffmpeg`` executable (which must be on ``PATH``) and the intermediate
    download is deleted once the conversion has succeeded.

    Args:
        url: URL of the YouTube video.
        mode: ``"video"`` to download the highest-resolution stream returned
            by ``get_highest_resolution()``, or ``"audio"`` to download the
            first audio-only stream and convert it to ``.mp3``.

    Returns:
        A two-element tuple ``(local_path, gcp_path)``:

        * ``local_path`` -- absolute path of the file saved on disk.
        * ``gcp_path`` -- the URI returned by ``upload_file_to_bucket`` for
          that file.

    Raises:
        ValueError: If ``mode`` is not ``"audio"`` or ``"video"``.
        RuntimeError: If the video exposes no stream for the requested mode.
        subprocess.CalledProcessError: If ``ffmpeg`` exits with an error.
        Exception: Any error raised by the download or the upload.
    """
    if mode not in ("audio", "video"):
        raise ValueError("'Mode' argument is not valid! It should be audio or video.")

    data_folder = "data/"
    yt = YouTube(url, on_progress_callback=on_progress)

    if mode == "video":
        stream = yt.streams.get_highest_resolution()
        if stream is None:
            raise RuntimeError(f"No downloadable video stream found for {url}")
        # Use the path the download actually produced rather than assuming a
        # ".mp4" name, and make it absolute like the audio branch does.
        path_filename = os.path.abspath(stream.download(output_path=data_folder))
    else:
        stream = yt.streams.filter(only_audio=True).first()
        if stream is None:
            raise RuntimeError(f"No audio-only stream found for {url}")
        tmp_path = stream.download(output_path=data_folder)
        base, _ = os.path.splitext(tmp_path)
        mp3_path = f"{base}.mp3"

        # If the download is already named *.mp3, converting in place and
        # then removing the source would delete the result, so skip both.
        if os.path.abspath(tmp_path) != os.path.abspath(mp3_path):
            subprocess.run(
                [
                    "ffmpeg", "-y",          # overwrite an existing output
                    "-i", tmp_path,
                    "-vn",                   # drop any video track
                    "-ar", "44100",          # audio sample rate
                    "-ab", "192k",           # audio bitrate
                    "-loglevel", "error",
                    mp3_path,
                ],
                check=True,
            )
            os.remove(tmp_path)

        path_filename = os.path.abspath(mp3_path)

    uri_path = upload_file_to_bucket(path_filename)
    return path_filename, uri_path
| |
|
@tool
def query_video(gcp_uri: str, query: str) -> str:
    """Analyzes a video file from a Google Cloud Storage (GCS) URI to answer a specific question about its visual content.

    This tool is the correct choice for any task that requires understanding or describing
    events, objects, or actions within a video. The video must be accessible via a GCS URI.

    Args:
        gcp_uri (str): The full Google Cloud Storage URI for the video file.
            It MUST be a .mp4 file and the URI MUST start with 'gs://'.
        query (str): A clear, specific question about the video's content.
            For example: 'What is the maximum number of birds on screen at the same time?'
            or 'What color is the car that appears at the 15-second mark?'.

    Returns:
        str: A string containing the answer to the query based on the video analysis.
    """
    # NOTE(review): the @tool decorator presumably exposes the docstring above
    # as the tool description, so its wording is kept unchanged -- confirm.

    # Only the extension is validated here; the 'gs://' prefix is not checked.
    extension = os.path.splitext(gcp_uri)[1].lower()
    if extension != '.mp4':
        return "Error: The video cannot be processed because it is not a .mp4 file. The gcp_uri must point to a .mp4 file."

    # Media part of the human message; the URI placeholder is filled at invoke time.
    video_part = {
        "type": "media",
        "file_uri": "{video_uri}",
        "mime_type": "video/mp4",
    }
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", SYSTEM_PROMPT_VIDEO),
            ("human", ["{query}", video_part]),
        ]
    )

    # prompt -> model -> plain string
    pipeline = prompt | llm_flash | StrOutputParser()
    return pipeline.invoke({"query": query, "video_uri": gcp_uri})
| |
|
@tool
def query_audio(gcp_uri: str, query: str) -> str:
    """Analyzes an audio file from a Google Cloud Storage (GCS) URI to answer a specific question about its content.

    This tool is ideal for tasks like transcription, speaker identification, sound analysis,
    or answering questions about speech or music within an audio file.

    Args:
        gcp_uri (str): The full Google Cloud Storage URI for the audio file.
            It MUST be a .mp3 file and the URI MUST start with 'gs://'.
        query (str): A clear, specific question about the audio's content.
            For example: 'Transcribe the speech in this audio,' 'Is the speaker male or female?'
            or 'What song is playing in the background?'.

    Returns:
        str: A string containing the answer to the query based on the audio analysis.
    """
    # NOTE(review): the @tool decorator presumably exposes the docstring above
    # as the tool description, so its wording is kept unchanged -- confirm.

    # Only the extension is validated here; the 'gs://' prefix is not checked.
    extension = os.path.splitext(gcp_uri)[1].lower()
    if extension != '.mp3':
        return "Error: The audio cannot be processed because it is not a .mp3 file. The gcp_uri must point to a .mp3 file."

    # Media part of the human message; the URI placeholder is filled at invoke time.
    audio_part = {
        "type": "media",
        "file_uri": "{audio_uri}",
        "mime_type": "audio/mpeg",
    }
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", SYSTEM_PROMPT_AUDIO),
            ("human", ["{query}", audio_part]),
        ]
    )

    # prompt -> model -> plain string
    pipeline = prompt | llm_flash | StrOutputParser()
    return pipeline.invoke({"query": query, "audio_uri": gcp_uri})
| |
|
@tool
def query_image(gcp_uri: str, query: str) -> str:
    """Analyzes an image file from a Google Cloud Storage (GCS) URI to answer a question about its visual content.

    This tool is ideal for tasks like reading text from an image (OCR), identifying objects,
    describing a scene, or answering any question based on the visual information in a static image.

    Args:
        gcp_uri (str): The full Google Cloud Storage URI for the image file.
            It MUST be a .png file and the URI MUST start with 'gs://'.
        query (str): A clear, specific question about the image's content.
            For example: 'What text is written on the street sign?',
            'How many people are in this picture?', or 'Describe the main activity in this image.'

    Returns:
        str: A string containing the answer to the query based on the image's content.
    """
    # NOTE(review): the @tool decorator presumably exposes the docstring above
    # as the tool description, so its wording is kept unchanged -- confirm.

    # Only the extension is validated here; the 'gs://' prefix is not checked.
    extension = os.path.splitext(gcp_uri)[1].lower()
    if extension != '.png':
        return "Error: The image cannot be processed because it is not a .png file. The gcp_uri must point to a .png file."

    # Image part of the human message; the URI placeholder is filled at invoke time.
    image_part = {
        "type": "image_url",
        "image_url": {"url": "{gcp_uri}"},
    }
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", SYSTEM_PROMPT_IMAGE),
            ("human", ["{query}", image_part]),
        ]
    )

    # prompt -> model -> plain string
    pipeline = prompt | llm_flash | StrOutputParser()
    return pipeline.invoke({"query": query, "gcp_uri": gcp_uri})
| |
|
def visit_webpage(url: str) -> str:
    """Fetch ``url`` and return the page content as a Markdown string.

    The response body is converted with ``markdownify`` and stripped, runs of
    three or more newlines are collapsed to a single blank line, and the
    result is passed through ``truncate_content`` with a limit of 10000.

    Every failure is reported through the return value rather than raised:
    a timeout, any other ``requests`` error, and any other exception each
    produce a descriptive message string.

    Args:
        url: Address of the page to fetch (20-second request timeout).

    Returns:
        The Markdown text, or an error message string.
    """
    # NOTE(review): because all exceptions are turned into strings here, the
    # module's `.with_retry(...)` wrappers around this function presumably
    # never see a failure to retry -- confirm whether that is intended.
    try:
        page = requests.get(url, timeout=20)
        page.raise_for_status()

        as_markdown = markdownify(page.text).strip()
        compacted = re.sub(r"\n{3,}", "\n\n", as_markdown)
        return truncate_content(compacted, 10000)
    except requests.exceptions.Timeout:
        return "The request timed out. Please try again later or check the URL."
    except RequestException as err:
        return f"Error fetching the webpage: {err!s}"
    except Exception as err:
        return f"An unexpected error occurred: {err!s}"
| |
|
# Module-level tool instances, built once when the module is imported.

# Web page reader: visit_webpage wrapped with up to 3 attempts and
# exponential-jitter waits, then exposed as a tool named "visit_webpage".
visit_webpage_with_retry = RunnableLambda(visit_webpage).with_retry(
    stop_after_attempt=3,
    wait_exponential_jitter=True,
)
visit_webpage_tool = convert_runnable_to_tool(
    visit_webpage_with_retry,
    name="visit_webpage",
    description="Visits a webpage at the given url and reads its content as a markdown string. Use this to browse webpages.",
    arg_types={"url": "str"},
)

# Python REPL tool.
python_tool = PythonREPLTool()

# Web search through the Google Serper wrapper.
search = GoogleSerperAPIWrapper()
search_tool = Tool(
    name="web_search",
    func=search.run,
    description="useful for when you need to ask with search on the internet",
)

# Wikipedia lookup.
wikipedia = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
wikipedia_tool = Tool(
    name="wikipedia_search",
    func=wikipedia.run,
    description="useful for when you need to ask with search on Wikipedia",
)
| |
|
def get_tools():
    """Build and return the list of tools available to the agent.

    Each call constructs new instances of the web page reader, Python REPL,
    web search and Wikipedia tools, then appends the module's
    ``query_video``, ``query_image`` and ``query_audio`` tools.

    Returns:
        list: ``[python REPL, web search, Wikipedia, visit_webpage,
        query_video, query_image, query_audio]`` in that order.
    """
    # Web page reader with up to 3 attempts and exponential-jitter waits.
    retrying_fetch = RunnableLambda(visit_webpage).with_retry(
        stop_after_attempt=3,
        wait_exponential_jitter=True,
    )
    webpage_tool = convert_runnable_to_tool(
        retrying_fetch,
        name="visit_webpage",
        description="Visits a webpage at the given url and reads its content as a markdown string. Use this to browse webpages.",
        arg_types={"url": "str"},
    )

    repl_tool = PythonREPLTool()

    serper = GoogleSerperAPIWrapper()
    web_search_tool = Tool(
        name="web_search",
        func=serper.run,
        description="useful for when you need to ask with search on the internet",
    )

    wiki_runner = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
    wiki_tool = Tool(
        name="wikipedia_search",
        func=wiki_runner.run,
        description="useful for when you need to ask with search on Wikipedia",
    )

    return [
        repl_tool,
        web_search_tool,
        wiki_tool,
        webpage_tool,
        query_video,
        query_image,
        query_audio,
    ]