|
|
from google import genai |
|
|
from langchain_community.document_loaders import ArxivLoader |
|
|
from langchain_community.document_loaders import WikipediaLoader |
|
|
from langchain_community.vectorstores import SupabaseVectorStore |
|
|
from langchain_core.messages import SystemMessage, HumanMessage |
|
|
from langchain_core.messages import ToolMessage |
|
|
from langchain_core.tools import tool |
|
|
from langchain_tavily import TavilySearch |
|
|
from langchain.tools.retriever import create_retriever_tool |
|
|
from markitdown import MarkItDown |
|
|
from pathlib import Path |
|
|
from typing import Dict |
|
|
from urllib.parse import urlparse |
|
|
import os |
|
|
|
|
|
|
|
|
@tool(parse_docstring=True)
def multiply(a: int, b: int) -> int:
    """Multiply two numbers.

    Args:
        a: first int
        b: second int

    Returns:
        The product of ``a`` and ``b``.
    """
    product = a * b
    return product
|
|
|
|
|
|
|
|
@tool(parse_docstring=True)
def add(a: int, b: int) -> int:
    """Add two numbers.

    Args:
        a: first int
        b: second int

    Returns:
        The sum of ``a`` and ``b``.
    """
    total = a + b
    return total
|
|
|
|
|
|
|
|
@tool(parse_docstring=True)
def subtract(a: int, b: int) -> int:
    """Subtract two numbers.

    Args:
        a: first int
        b: second int

    Returns:
        The difference ``a - b``.
    """
    difference = a - b
    return difference
|
|
|
|
|
|
|
|
@tool(parse_docstring=True)
def divide(a: int, b: int) -> float:
    """Divide two numbers.

    Args:
        a: first int
        b: second int

    Returns:
        The true-division quotient ``a / b``.

    Raises:
        ValueError: If ``b`` equals zero.
    """
    if b != 0:
        return a / b
    # Reached only for a zero divisor.
    raise ValueError("Cannot divide by zero.")
|
|
|
|
|
|
|
|
@tool(parse_docstring=True)
def modulus(a: int, b: int) -> int:
    """Get the modulus of two numbers.

    Args:
        a: first int
        b: second int

    Returns:
        The result of ``a % b``.

    Raises:
        ZeroDivisionError: Raised by the ``%`` operator when ``b`` is zero.
    """
    remainder = a % b
    return remainder
|
|
|
|
|
|
|
|
@tool(parse_docstring=True)
def wiki_search(query: str) -> Dict[str, str]:
    """Search Wikipedia for a query and return maximum 3 results.

    Args:
        query: The search query.

    Returns:
        A dict with a single ``"wiki_results"`` key. Its value is one string
        holding every loaded document wrapped in a ``<Document>`` tag, with
        documents separated by ``---`` lines.
    """
    search_docs = WikipediaLoader(query=query, load_max_docs=3).load()
    formatted_search_docs = "\n\n---\n\n".join(
        # The opening tag is intentionally not self-closed so that it pairs
        # with the trailing ``</Document>``; missing metadata keys degrade to
        # an empty attribute instead of raising KeyError.
        f'<Document source="{doc.metadata.get("source", "")}" '
        f'page="{doc.metadata.get("page", "")}">\n'
        f"{doc.page_content}\n</Document>"
        for doc in search_docs
    )
    return {"wiki_results": formatted_search_docs}
|
|
|
|
|
|
|
|
@tool(parse_docstring=True)
def web_search(query: str) -> ToolMessage:
    """Search in the web with Tavily for a query and return maximum 5 results.

    Args:
        query: The search query.

    Returns:
        Whatever ``TavilySearch.invoke`` returns for the query.
    """
    # NOTE(review): the ``ToolMessage`` annotation is kept for compatibility,
    # but invoking the tool with a plain ``{"query": ...}`` dict presumably
    # yields the raw Tavily payload rather than a ToolMessage — confirm.
    searcher = TavilySearch(max_results=5, include_images=False)
    return searcher.invoke({"query": query})
|
|
|
|
|
|
|
|
@tool(parse_docstring=True)
def arvix_search(query: str) -> Dict[str, str]:
    """Search Arxiv for a query and return maximum 3 results.

    Args:
        query: The search query.

    Returns:
        A dict with a single ``"arvix_results"`` key. Its value is one string
        holding every loaded document (content truncated to its first 1000
        characters) wrapped in a ``<Document>`` tag, with documents separated
        by ``---`` lines.
    """
    search_docs = ArxivLoader(query=query, load_max_docs=3).load()

    formatted_docs = []
    for doc in search_docs:
        metadata = doc.metadata
        # NOTE(review): ArxivLoader metadata does not appear to carry a
        # "source" key by default (it exposes e.g. "Title"), so an unguarded
        # ``metadata["source"]`` lookup would raise KeyError. Fall back to
        # other identifying fields — confirm against the installed
        # langchain_community version.
        source = (
            metadata.get("source")
            or metadata.get("entry_id")
            or metadata.get("Title", "")
        )
        # The opening tag is not self-closed so it pairs with ``</Document>``.
        formatted_docs.append(
            f'<Document source="{source}" page="{metadata.get("page", "")}">\n'
            f"{doc.page_content[:1000]}\n</Document>"
        )

    return {"arvix_results": "\n\n---\n\n".join(formatted_docs)}
|
|
|
|
|
|
|
|
@tool(parse_docstring=True)
def read_video(url: str) -> str:
    """Search a youtube video given its `url` and return its metadata and transcription.

    Args:
        url: URL of the YouTube video.

    Returns:
        The ``text_content`` of the MarkItDown conversion of the URL.

    Raises:
        ValueError: If ``url`` is not an http(s) URL or its host is not a
            YouTube host.
        Exception: Any conversion failure, re-raised as the same exception
            type with a "Could not obtain video information" prefix.
    """
    parsed_url = urlparse(url)
    # Only http(s) is accepted, as the error message promises; other schemes
    # (e.g. ``file://``) must not reach the converter.
    if parsed_url.scheme not in ("http", "https") or not parsed_url.netloc:
        raise ValueError(
            "Please provide a valid video URL with http:// or https:// prefix."
        )

    # Match on the host name rather than on the whole URL, so a YouTube domain
    # appearing only in the path or query string does not pass the check.
    host = parsed_url.hostname or ""
    youtube_hosts = ("youtube.com", "youtu.be")
    if not any(host == site or host.endswith("." + site) for site in youtube_hosts):
        raise ValueError("Only YouTube videos are supported at this time.")

    try:
        result = MarkItDown(enable_plugins=True).convert(url)
        return result.text_content
    except Exception as err:
        raise type(err)(f"Could not obtain video information > {err}") from err
|
|
|
|
|
|
|
|
@tool(parse_docstring=True)
def read_csv_file(file_path: str) -> str:
    """
    Reads and parses a CSV file to markdown.

    Args:
        file_path: Path to the CSV file

    Returns:
        The ``text_content`` of the MarkItDown conversion result.
    """
    try:
        # Validation runs inside the ``try`` so that its failures receive the
        # same "Could not parse csv file" prefix as conversion errors.
        validate_file_path(file_path)
        validate_file_ext(file_path, ".csv")
        converter = MarkItDown(enable_plugins=True)
        return converter.convert(file_path).text_content
    except Exception as err:
        raise type(err)(f"Could not parse csv file > {err}")
|
|
|
|
|
|
|
|
@tool(parse_docstring=True)
def read_excel_file(file_path: str) -> str:
    """
    Reads and parses an Excel file to markdown.

    Args:
        file_path: Path to the Excel file

    Returns:
        The ``text_content`` of the MarkItDown conversion result.

    Raises:
        Exception: Any validation or conversion failure, re-raised as the same
            exception type with a "Could not parse excel file" prefix.
    """
    try:
        validate_file_path(file_path)
        # ".xls" is the legacy Excel extension (the previous ".xlx" was a
        # typo that rejected every legacy workbook).
        validate_file_ext(file_path, ".xls", ".xlsx")
        converter = MarkItDown(enable_plugins=True)
        result = converter.convert(file_path)
        return result.text_content
    except Exception as err:
        raise type(err)(f"Could not parse excel file > {err}") from err
|
|
|
|
|
|
|
|
@tool(parse_docstring=True)
def read_python_file(file_path: str) -> str:
    """
    Reads and parses a Python file to markdown.

    Args:
        file_path: Path to the Python file

    Returns:
        The ``text_content`` of the MarkItDown conversion result.

    Raises:
        Exception: Any validation or conversion failure, re-raised as the same
            exception type with a "Could not parse python file" prefix.
    """
    try:
        validate_file_path(file_path)
        validate_file_ext(file_path, ".py")
        converter = MarkItDown(enable_plugins=True)
        result = converter.convert(file_path)
        return result.text_content
    except Exception as err:
        # Chain explicitly so the original traceback is kept as the cause.
        raise type(err)(f"Could not parse python file > {err}") from err
|
|
|
|
|
|
|
|
# Model name the describe_* tools fall back to when the
# DESCRIPTION_GOOGLE_MODEL environment variable is not set.
DEFAULT_DESCRIPTION_GOOGLE_MODEL: str = "gemini-2.0-flash"
|
|
|
|
|
|
|
|
@tool(parse_docstring=True)
def describe_image_file(file_path: str, query: str = "") -> str:
    """
    Reads an image file and describes it accordingly to an optional query.

    Args:
        file_path: Path to the image file
        query: Optional query to generate an expected image description

    Returns:
        The ``text`` of the model response.

    Raises:
        EnvironmentError: If ``GOOGLE_API_KEY`` is not set in the environment.
        Exception: Any validation, upload or generation failure, re-raised as
            the same exception type with a "Could not generate an image
            description" prefix.
    """
    api_key = os.environ.get("GOOGLE_API_KEY", "")
    model_name = os.environ.get(
        "DESCRIPTION_GOOGLE_MODEL", DEFAULT_DESCRIPTION_GOOGLE_MODEL
    )

    if not api_key:
        raise EnvironmentError(
            "GOOGLE API KEY not present in environment, please do provide one."
        )
    if not query:
        # Default prompt used when the caller supplies no query.
        query = "Caption this image, do not ommit important detail as number of subjects, or time of day."

    try:
        validate_file_path(file_path)
        # ".jpeg" replaces the previous ".jpge" typo, which rejected every
        # file carrying the long JPEG extension.
        validate_file_ext(file_path, ".png", ".jpg", ".jpeg")
        client = genai.Client(api_key=api_key)

        uploaded = client.files.upload(file=file_path)

        response = client.models.generate_content(
            model=model_name, contents=[uploaded, query]
        )
        return response.text
    except Exception as err:
        raise type(err)(f"Could not generate an image description > {err}") from err
|
|
|
|
|
|
|
|
@tool(parse_docstring=True)
def describe_audio_file(file_path: str, query: str = "") -> str:
    """
    Reads an audio file and describes it accordingly to an optional query.

    Args:
        file_path: Path to the audio file
        query: Optional query to generate an expected audio description

    Returns:
        The ``text`` of the model response.

    Raises:
        EnvironmentError: If ``GOOGLE_API_KEY`` is not set in the environment.
        Exception: Any validation, upload or generation failure, re-raised as
            the same exception type with a "Could not generate an audio
            description" prefix.
    """
    api_key = os.environ.get("GOOGLE_API_KEY", "")
    model_name = os.environ.get(
        "DESCRIPTION_GOOGLE_MODEL", DEFAULT_DESCRIPTION_GOOGLE_MODEL
    )

    if not api_key:
        raise EnvironmentError(
            "GOOGLE API KEY not present in environment, please do provide one."
        )
    if not query:
        # Default prompt used when the caller supplies no query.
        query = "Transcribe speech present in audio, if more than one speaker is detected use a notation of [speaker_n] where n would be different per each speaker."

    try:
        validate_file_path(file_path)
        validate_file_ext(file_path, ".mp3")
        client = genai.Client(api_key=api_key)

        uploaded = client.files.upload(file=file_path)

        response = client.models.generate_content(
            model=model_name, contents=[uploaded, query]
        )
        return response.text
    except Exception as err:
        raise type(err)(f"Could not generate an audio description > {err}") from err
|
|
|
|
|
|
|
|
@tool(parse_docstring=True)
def describe_video_file(file_path: str, query: str = "") -> str:
    """
    Reads a video from a file and describes it accordingly to an optional query.

    Args:
        file_path: Path to the video file
        query: Optional query to generate an expected video description

    Returns:
        The ``text`` of the model response.

    Raises:
        EnvironmentError: If ``GOOGLE_API_KEY`` is not set in the environment.
        Exception: Any validation, upload or generation failure, re-raised as
            the same exception type with a "Could not generate a video
            description" prefix.
    """
    api_key = os.environ.get("GOOGLE_API_KEY", "")
    model_name = os.environ.get(
        "DESCRIPTION_GOOGLE_MODEL", DEFAULT_DESCRIPTION_GOOGLE_MODEL
    )

    if not api_key:
        raise EnvironmentError(
            "GOOGLE API KEY not present in environment, please do provide one."
        )
    if not query:
        # Default prompt used when the caller supplies no query.
        query = "Transcribe the audio from this video, giving timestamps for salient events in the video. Also provide visual descriptions."

    try:
        validate_file_path(file_path)
        validate_file_ext(file_path, ".mp4", ".mpeg", ".avi")
        # A single client serves both the upload and the generation call.
        client = genai.Client(api_key=api_key)

        # NOTE(review): an uploaded video may still be processing server-side
        # when generate_content is called — confirm whether the file state
        # has to be polled before use.
        uploaded = client.files.upload(file=file_path)

        response = client.models.generate_content(
            model=model_name, contents=[uploaded, query]
        )
        return response.text
    except Exception as err:
        raise type(err)(f"Could not generate a video description > {err}") from err
|
|
|
|
|
|
|
|
@tool(parse_docstring=True)
def describe_youtube_video(video_url: str, query: str = "") -> str:
    """
    Reads a video from YouTube and describes it accordingly to an optional query.

    Args:
        video_url: URL to youtube video
        query: Optional query to generate an expected video description

    Returns:
        The ``text`` of the model response.

    Raises:
        EnvironmentError: If ``GOOGLE_API_KEY`` is not set in the environment.
        Exception: Any URL validation or generation failure, re-raised as the
            same exception type with a "Could not generate a video
            description" prefix.
    """
    api_key = os.environ.get("GOOGLE_API_KEY", "")
    model_name = os.environ.get(
        "DESCRIPTION_GOOGLE_MODEL", DEFAULT_DESCRIPTION_GOOGLE_MODEL
    )

    if not api_key:
        raise EnvironmentError(
            "GOOGLE API KEY not present in environment, please do provide one."
        )
    if not query:
        # Default prompt used when the caller supplies no query.
        query = "Transcribe the audio from this video, giving timestamps for salient events in the video. Also provide visual descriptions."

    try:
        validate_url(video_url, "youtube.com", "youtu.be")
        client = genai.Client(api_key=api_key)

        # The video is passed by URI rather than uploaded; the prompt travels
        # as a second part of the same content.
        video_part = genai.types.Part(
            file_data=genai.types.FileData(file_uri=video_url)
        )
        prompt_part = genai.types.Part(text=query)

        response = client.models.generate_content(
            model=model_name,
            contents=genai.types.Content(parts=[video_part, prompt_part]),
        )
        return response.text
    except Exception as err:
        raise type(err)(f"Could not generate a video description > {err}") from err
|
|
|
|
|
|
|
|
def validate_url(url: str, *site: str) -> None:
    """Check that ``url`` is an http(s) URL hosted on one of the allowed sites.

    Args:
        url: The URL to validate.
        *site: Allowed host names (for example ``"youtube.com"``). A URL
            matches when its host equals one of them or is a sub-domain of it.

    Raises:
        ValueError: If ``url`` has no http/https scheme or no host, or if its
            host is not covered by any entry in ``site``.
    """
    parsed_url = urlparse(url)
    # Only http(s) is accepted, as the error message promises.
    if parsed_url.scheme not in ("http", "https") or not parsed_url.netloc:
        raise ValueError(
            "Please provide a valid video URL with http:// or https:// prefix."
        )

    # Compare against the parsed host instead of searching the whole URL, so
    # an allowed domain that only appears in the path or query string does
    # not pass. ``hostname`` is already lower-cased by urlparse.
    host = parsed_url.hostname or ""
    allowed_hosts = [site_.lower() for site_ in site]
    if not any(
        host == allowed or host.endswith("." + allowed) for allowed in allowed_hosts
    ):
        raise ValueError(
            f"URL ({url}) is not one of supported sites ({', '.join(site)})."
        )
|
|
|
|
|
|
|
|
def validate_file_path(file_path: str) -> None:
    """Check that ``file_path`` points to an existing filesystem entry.

    Args:
        file_path: Path to check.

    Raises:
        AssertionError: If nothing exists at ``file_path``.
    """
    path = Path(file_path)
    # Raised explicitly instead of via ``assert`` so the check still runs
    # under ``python -O``; the exception type is kept for existing callers.
    if not path.exists():
        raise AssertionError(f"File {file_path} does not exist")
|
|
|
|
|
|
|
|
def validate_file_ext(file_path: str, *extension: str) -> None:
    """Check that ``file_path`` ends with one of the allowed extensions.

    The comparison ignores case, so ``photo.JPG`` matches ``".jpg"``.

    Args:
        file_path: Path whose suffix is checked.
        *extension: Allowed suffixes, each including the leading dot
            (for example ``".csv"``).

    Raises:
        AssertionError: If the suffix of ``file_path`` is not allowed.
    """
    suffix = Path(file_path).suffix
    # Sorted so the error message lists the extensions in a stable order.
    allowed = sorted({ext.lower() for ext in extension})
    # Raised explicitly instead of via ``assert`` so the check still runs
    # under ``python -O``; the exception type is kept for existing callers.
    if suffix.lower() not in allowed:
        raise AssertionError(
            f"File extension {suffix} is not valid ({', '.join(allowed)})"
        )
|
|
|
|
|
|
|
|
# Tools exported for the agent, in their original order.
# NOTE(review): ``read_video`` is defined in this file but is not listed
# here — confirm that the omission is intentional.
basic_tools = [
    # Arithmetic
    multiply, add, subtract, divide, modulus,
    # Search
    wiki_search, web_search, arvix_search,
    # File readers
    read_csv_file, read_excel_file, read_python_file,
    # Media description
    describe_image_file, describe_audio_file, describe_video_file,
    describe_youtube_video,
]
|
|
|