Spaces:
Sleeping
Sleeping
| from langchain_community.tools.tavily_search import TavilySearchResults | |
| from langchain_community.document_loaders import WikipediaLoader | |
| from langchain_community.document_loaders import ArxivLoader | |
| from langchain_community.vectorstores import SupabaseVectorStore | |
| from langchain_core.messages import SystemMessage, HumanMessage | |
| from langchain_core.tools import tool | |
| from langchain.tools.retriever import create_retriever_tool | |
| from langchain_community.document_loaders import YoutubeLoader | |
| from langchain_community.document_loaders.youtube import TranscriptFormat | |
| from langchain_tavily import TavilySearch | |
| import base64 | |
| from groq import Groq | |
| import os | |
| import pandas as pd | |
| import requests | |
| from io import BytesIO | |
| import sys | |
| import io | |
| import traceback | |
| import requests | |
| import subprocess | |
| import tempfile | |
| import traceback | |
# Base URL for downloading task attachments (image/audio/excel/python files);
# tools append the task id to fetch the file for that question.
DEFAULT_File_URL = "https://agents-course-unit4-scoring.hf.space/files/"
def add(a: int, b: int) -> int:
    """Return the sum of two integers.

    Args:
        a: first integer
        b: second integer
    """
    print("add")
    total = a + b
    return total
def substract(a: int, b: int) -> int:
    """Return the difference of two integers (a minus b).

    Args:
        a: first integer
        b: second integer
    """
    print("substract")
    difference = a - b
    return difference
def multiply(a: int, b: int) -> int:
    """Return the product of two integers.

    Args:
        a: first integer
        b: second integer
    """
    print("multiply")
    product = a * b
    return product
def divide(a: int, b: int) -> float:
    """Divide two integers using true division.

    Args:
        a: dividend
        b: divisor

    Returns:
        float: the quotient a / b (Python's `/` always yields a float,
        so the return annotation is float, not int).

    Raises:
        ZeroDivisionError: if b is 0.
    """
    print("divide")
    return a / b
def modulus(a: int, b: int) -> int:
    """
    Return the modulus (remainder) of a divided by b.

    Args:
        a (int): the first number
        b (int): the second number
    """
    remainder = a % b
    return remainder
def wikiSearch1(searchQuery: str) -> dict | str:
    """
    Search Wikipedia and return up to three matching documents.

    Args:
        searchQuery (str): the search query.

    Returns:
        dict: {"wiki_results": <XML-style formatted documents>} on success
            (the original annotation said `str`, but the success path has
            always returned a dict — the annotation now reflects that).
        str: an error description on failure.
    """
    try:
        print("wiki search")
        search_results = WikipediaLoader(query=searchQuery, load_max_docs=3).load()
        formatted_results = "\n\n--\n\n".join(
            f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page","")}"/>\n{doc.page_content}\n</Document>'
            for doc in search_results
        )
        return {"wiki_results": formatted_results}
    except Exception as e:
        print(f"Error getting wikisearch: {str(e)}")
        return f"Error getting wikisearch: {str(e)}"
def wikiSearch(query: str) -> str:
    """
    Search Wikipedia and return up to 3 summaries.

    Args:
        query (str): The Wikipedia search query.

    Returns:
        str: Formatted summaries of matching pages, or an error/empty message.
    """
    try:
        pages = WikipediaLoader(query=query, load_max_docs=3).load()
        if not pages:
            return "No Wikipedia results found."
        # One "Title / Source / Summary" section per page
        sections = [
            f"Title: {page.metadata.get('title', 'Unknown Title')}\n"
            f"Source: {page.metadata.get('source', '')}\n\n"
            f"Summary:\n{page.page_content.strip()}"
            for page in pages
        ]
        return "\n\n---\n\n".join(sections)
    except Exception as e:
        return f"Error searching Wikipedia: {str(e)}"
def arxivSearch1(searchQuery: str) -> dict | str:
    """
    Search arXiv and return up to three matching documents.

    Args:
        searchQuery (str): the search query.

    Returns:
        dict: {"arxiv_result": <XML-style formatted documents>} on success
            (the original annotation said `str`, but the success path has
            always returned a dict — the annotation now reflects that).
        str: an error description on failure.
    """
    try:
        print("arxivsearch")
        search_results = ArxivLoader(query=searchQuery, load_max_docs=3).load()
        formatted_results = "\n\n--\n\n".join(
            f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page","")}"/>\n{doc.page_content}\n</Document>'
            for doc in search_results
        )
        return {"arxiv_result": formatted_results}
    except Exception as e:
        print(f"Error getting arxivsearch: {str(e)}")
        return f"Error getting arxivsearch: {str(e)}"
def arxivSearch(query: str) -> str:
    """
    Search arXiv and return up to 3 matching paper summaries.

    Args:
        query (str): The search query for arXiv.

    Returns:
        str: Formatted search results with source links and abstracts.
    """
    try:
        papers = ArxivLoader(query=query, load_max_docs=3).load()
        if not papers:
            return "No results found."
        # One "Title / Source / Abstract" section per paper
        sections = []
        for paper in papers:
            heading = paper.metadata.get("Title", "") or "Unknown"
            link = paper.metadata.get("source", "")
            abstract = paper.page_content.strip()
            sections.append(f"Title: {heading}\nSource: {link}\n\nAbstract:\n{abstract}")
        return "\n\n---\n\n".join(sections)
    except Exception as e:
        return f"Error searching arXiv: {str(e)}"
def webSearch(searchQuery: str) -> dict | str:
    """
    Search the web with Tavily and return up to 5 formatted results.

    Args:
        searchQuery (str): the search query.

    Returns:
        dict: {"web_search": <XML-style formatted documents>} — the
            annotation now matches the actual return type.
    """
    # Bug fix: the original print lacked the f-prefix, so the literal
    # text "{searchQuery}" was logged instead of the query.
    print(f"web_search: {searchQuery}")
    tavily = TavilySearchResults(max_results=5, search_depth="advanced", include_raw_content=True)
    search_results = tavily.run(searchQuery)
    formatted_results = "\n\n--\n\n".join(
        f'<Document source="{doc["url"]}" title="{doc["title"]}"/>\n{doc["content"]}\n</Document>'
        for doc in search_results
    )
    return {"web_search": formatted_results}
def youtubeVideoTranscript(youtubeURL: str) -> dict | str:
    """
    Fetch a YouTube video's transcript, chunked into 30-second segments.

    Args:
        youtubeURL (str): URL of the YouTube video.

    Returns:
        dict: {"Youtube transcript": <repr of transcript chunks>} on success
            (the original annotation said `str`, but the success path has
            always returned a dict — the annotation now reflects that).
        str: an error description on failure.
    """
    try:
        print("youtube_transcript")
        loader = YoutubeLoader.from_youtube_url(
            youtubeURL,
            add_video_info=True,
            transcript_format=TranscriptFormat.CHUNKS,
            chunk_size_seconds=30,
        )
        formatted_results = "\n\n".join(map(repr, loader.load()))
        return {"Youtube transcript": formatted_results}
    except Exception as e:
        print(f"Error getting youtube transcript: {str(e)}")
        return f"Error getting youtube transcript: {str(e)}"
def power(a: float, b: float) -> float:
    """
    Raise a to the power of b.

    Args:
        a (float): the base
        b (float): the exponent
    """
    result = a ** b
    return result
def count_substring(substring: str, text: str) -> int:
    """
    Count non-overlapping occurrences of a substring within some text.
    Useful for 'How many (substring) are in (text)?' questions.

    Args:
        substring (str): the substring to check for.
        text (str): the text to search through.
    """
    occurrences = text.count(substring)
    return occurrences
def read_image_file(taskID: str) -> dict | str:
    """
    Download an image attachment and return it as a base64 data-URL payload
    suitable for a Groq vision-enabled LLM message.

    Args:
        taskID (str): the task id of the question; appended to
            DEFAULT_File_URL to locate the attachment.

    Returns:
        dict: {"type": "image_url", "image_url": {"url": "data:<mime>;base64,<data>"}}
        str: an error description if the download or encoding fails.
    """
    try:
        image_url = DEFAULT_File_URL + taskID
        print(f"image url {image_url}")
        # timeout so a stalled download cannot hang the agent indefinitely
        response = requests.get(image_url, timeout=30)
        response.raise_for_status()  # ensure no HTTP error
        # Content-Type drives the data-URL prefix; empty if the header is missing
        content_type = response.headers.get("Content-Type", "")
        image_base64 = base64.b64encode(response.content).decode("utf-8")
        return {
            "type": "image_url",
            "image_url": {
                "url": f"data:{content_type};base64,{image_base64}"
            }
        }
    except Exception as e:
        # Bug fix: the message previously said "transcribing image",
        # a copy/paste from the audio tool.
        return f"Error reading image: {str(e)}"
def transcribe_audio(taskID: str) -> str:
    """
    Transcribe an audio attachment (mp3, wav, m4a, etc.) using Groq's
    whisper-large-v3 model.

    Args:
        taskID (str): the task id of the question; appended to
            DEFAULT_File_URL to locate the attachment.

    Returns:
        str: the transcribed text, or an error description on failure.
    """
    tmp_path = None
    try:
        audio_url = DEFAULT_File_URL + taskID
        print(f"audio file {audio_url}")
        # timeout so a stalled download cannot hang the agent indefinitely
        response = requests.get(audio_url, timeout=30)
        response.raise_for_status()  # Throw error if download failed
        # Temp file instead of a fixed "temp_audio.mp3" in the CWD:
        # avoids collisions between concurrent calls and leftover files.
        with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f:
            f.write(response.content)
            tmp_path = f.name
        client = Groq(api_key=os.getenv("GROQ_API_KEY"))
        with open(tmp_path, "rb") as f:
            transcription = client.audio.transcriptions.create(
                model="whisper-large-v3",
                file=f,
            )
        return transcription.text
    except Exception as e:
        return f"Error transcribing audio: {str(e)}"
    finally:
        # Always remove the temp file, on success or failure.
        if tmp_path and os.path.exists(tmp_path):
            os.remove(tmp_path)
def read_excel_from_url(taskID: str) -> str:
    """
    Download an Excel attachment and return its contents as JSON records.

    Args:
        taskID (str): the task id of the question; appended to
            DEFAULT_File_URL to locate the attachment.

    Returns:
        str: the Excel contents as a JSON array of row records, or an
            error description on failure.
    """
    try:
        excel_url = DEFAULT_File_URL + taskID
        print(f"excel url {excel_url}")
        # timeout so a stalled download cannot hang the agent indefinitely
        response = requests.get(excel_url, timeout=30)
        response.raise_for_status()
        data = BytesIO(response.content)
        df = pd.read_excel(data)
        # orient="records": one JSON object per row, easiest for an LLM to read
        return df.to_json(orient="records")
    except Exception as e:
        return f"Error reading Excel file from URL: {str(e)}"
# NOTE(review): dead code below — a previous exec()-based implementation kept
# alive as a module-level triple-quoted string (it has no runtime effect).
# It was superseded by the subprocess-based run_python_code_from_url that
# follows; consider deleting this block entirely.
'''@tool
def run_python_code_from_url(taskID: str) -> str:
"""
Downloads Python code from a URL, executes it, and returns the output or errors.
Args:
taskID (str): the task id of the question . This is use to retrieve the attachments or python code
Returns:
str: Captured output or error traceback.
"""
try:
formattedURL = DEFAULT_File_URL + taskID
response = requests.get(formattedURL)
print(f"pythonurl : {formattedURL}")
response.raise_for_status()
code = response.text
old_stdout = sys.stdout
sys.stdout = mystdout = io.StringIO()
exec_globals = {}
exec(code, exec_globals)
sys.stdout = old_stdout
output = mystdout.getvalue()
if not output.strip():
output = "Code executed successfully with no output."
return output
except Exception:
sys.stdout = old_stdout
return "Error executing code:\n" + traceback.format_exc()
'''
def run_python_code_from_url(taskID: str) -> str:
    """
    Download Python code for a task, run it in a separate Python process,
    and return its stdout/stderr (or an error description).

    SECURITY NOTE: this executes downloaded code; only use it against
    trusted attachment sources.

    Args:
        taskID (str): the task id of the question; appended to
            DEFAULT_File_URL to locate the attachment.

    Returns:
        str: captured output, captured errors, or a failure message.
    """
    tmp_path = None
    try:
        formattedURL = DEFAULT_File_URL + taskID
        print(f"pythonurl : {formattedURL}")
        # Download the Python code
        response = requests.get(formattedURL, timeout=10)
        response.raise_for_status()
        code = response.text
        # Write code to a temporary file
        with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as tmp:
            tmp.write(code)
            tmp_path = tmp.name
        # Run in a separate process with a timeout. sys.executable (bug fix:
        # was the bare string "python") guarantees the same interpreter and
        # works on systems where only "python3" is on PATH.
        result = subprocess.run(
            [sys.executable, tmp_path],
            capture_output=True,
            text=True,
            timeout=5,  # seconds
        )
        output = result.stdout.strip()
        error = result.stderr.strip()
        # Combine stdout and stderr into one report
        if output and error:
            return f"Output:\n{output}\n\nErrors:\n{error}"
        if output:
            return output
        if error:
            return f"Error:\n{error}"
        return "Code executed successfully with no output."
    except subprocess.TimeoutExpired:
        return "Error: Code execution timed out."
    except Exception:
        return "Error executing code:\n" + traceback.format_exc()
    finally:
        # Bug fix: the temp file was previously never deleted (leaked one
        # file per call).
        if tmp_path and os.path.exists(tmp_path):
            os.remove(tmp_path)