import io
import os
import re
import sys
from typing import List, Callable, Any

import openai
import pandas as pd
import requests
from dotenv import load_dotenv
from google import genai
from google.genai import types
from langchain_community.document_loaders import (
    WebBaseLoader,
    ImageCaptionLoader,
    WikipediaLoader,
    ArxivLoader,
)
from langchain_community.tools import DuckDuckGoSearchResults
from langchain_core.tools import tool
from langchain_text_splitters import CharacterTextSplitter

DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

load_dotenv()

def multiply(a: int, b: int) -> int:
    """Multiply two integers."""
    return a * b

def add(a: int, b: int) -> int:
    """Add two integers."""
    return a + b

def subtract(a: int, b: int) -> int:
    """Subtract b from a."""
    return a - b

def divide(a: int, b: int) -> float:
    """Divide a by b, raising ValueError when b is zero."""
    if b == 0:
        raise ValueError("Cannot divide by zero.")
    return a / b

def modulus(a: int, b: int) -> int:
    """Return the remainder of a divided by b."""
    return a % b

def wiki_search(query: str) -> str:
    """Search Wikipedia for the query and return up to two results as tagged documents."""
    print("wiki_search called with:", query)
    search_docs = WikipediaLoader(query=query, load_max_docs=2).load()
    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
            for doc in search_docs
        ]
    )
    return formatted_search_docs
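
# Illustrative usage of wiki_search (assumes Wikipedia is reachable; the query
# string below is an arbitrary example, not part of this project):
#   print(wiki_search("Alan Turing"))
#   # -> one or two <Document source="..."> blocks separated by "---"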

def visit_webpage(url: str) -> str:
    """Fetch a web page and return the first 5000 characters of its raw text."""
    try:
        response = requests.get(url, timeout=5)
        return response.text[:5000]
    except Exception as e:
        return f"[ERROR fetching {url}]: {str(e)}"

def duckduck_websearch(query: str) -> str:
    """Run a DuckDuckGo search, load the top result pages, and return their cleaned text."""
    search_engine = DuckDuckGoSearchResults(output_format="list", num_results=2)
    results = search_engine.invoke({"query": query})
    page_urls = [result["link"] for result in results]
    loader = WebBaseLoader(web_paths=page_urls)
    docs = loader.load()
    combined_text = "\n\n".join(doc.page_content[:15000] for doc in docs)
    # Collapse runs of blank lines and long runs of spaces/tabs, then strip
    cleaned_text = re.sub(r'\n{3,}', '\n\n', combined_text)
    cleaned_text = re.sub(r'[ \t]{6,}', ' ', cleaned_text)
    return cleaned_text.strip()

def text_splitter(text: str) -> List[str]:
    """Split text into chunks of roughly 450 characters with a 10-character overlap."""
    splitter = CharacterTextSplitter(chunk_size=450, chunk_overlap=10)
    return splitter.split_text(text)
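
# Illustrative usage of text_splitter. CharacterTextSplitter splits on "\n\n"
# by default, so individual chunks can exceed chunk_size when a paragraph is
# longer than 450 characters; long_document_text below is a placeholder:
#   chunks = text_splitter(long_document_text)
#   print(len(chunks), len(chunks[0]))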

def read_file(task_id: str) -> str:
    """Download the file attached to a task and return its contents as text."""
    file_url = f'{DEFAULT_API_URL}/files/{task_id}'
    r = requests.get(file_url, timeout=15, allow_redirects=True)
    with open('temp', "wb") as fp:
        fp.write(r.content)
    with open('temp') as f:
        return f.read()

def excel_read(task_id: str) -> str:
    """Download the Excel file attached to a task and summarise its shape, columns, and statistics."""
    try:
        file_url = f'{DEFAULT_API_URL}/files/{task_id}'
        r = requests.get(file_url, timeout=15, allow_redirects=True)
        with open('temp.xlsx', "wb") as fp:
            fp.write(r.content)
        # Read the Excel file
        df = pd.read_excel('temp.xlsx')
        # Describe the loaded data
        result = (
            f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
        )
        result += f"Columns: {', '.join(df.columns)}\n\n"
        # Add summary statistics
        result += "Summary statistics:\n"
        result += str(df.describe())
        return result
    except Exception as e:
        return f"Error analyzing Excel file: {str(e)}"

def csv_read(task_id: str) -> str:
    """Download the CSV file attached to a task and summarise its shape, columns, and statistics."""
    try:
        file_url = f'{DEFAULT_API_URL}/files/{task_id}'
        r = requests.get(file_url, timeout=15, allow_redirects=True)
        with open('temp.csv', "wb") as fp:
            fp.write(r.content)
        # Read the CSV file
        df = pd.read_csv('temp.csv')
        # Describe the loaded data
        result = (
            f"CSV file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
        )
        result += f"Columns: {', '.join(df.columns)}\n\n"
        # Add summary statistics
        result += "Summary statistics:\n"
        result += str(df.describe())
        return result
    except Exception as e:
        return f"Error analyzing CSV file: {str(e)}"

def image_caption(task_id: str) -> str:
    """Generate a caption for the image attached to a task."""
    file_url = f'{DEFAULT_API_URL}/files/{task_id}'
    loader = ImageCaptionLoader(images=[file_url])
    metadata = loader.load()
    return metadata[0].page_content

def youtube_search(youtube_url: str, question: str) -> str:
    """Ask Gemini a question about the content of a YouTube video."""
    client = genai.Client(api_key=os.getenv("GOOGLE_API_KEY"))
    response = client.models.generate_content(
        model='models/gemini-2.5-flash',
        contents=types.Content(
            parts=[
                types.Part(
                    file_data=types.FileData(file_uri=youtube_url)
                ),
                types.Part(text=question)
            ]
        )
    )
    return response.text
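
# Illustrative usage of youtube_search (requires GOOGLE_API_KEY in the
# environment; the URL below is a placeholder, not from this project):
#   answer = youtube_search("https://www.youtube.com/watch?v=<video_id>",
#                           "What topic is discussed in this video?")
#   print(answer)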

def arvix_search(query: str) -> str:
    """Search arXiv for the query and return up to three results as tagged documents."""
    search_docs = ArxivLoader(query=query, load_max_docs=3).load()
    formatted_search_docs = "\n\n---\n\n".join(
        [
            # ArxivLoader metadata may not include a "source" key; fall back to the paper title
            f'<Document source="{doc.metadata.get("source", doc.metadata.get("Title", ""))}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content[:1000]}\n</Document>'
            for doc in search_docs
        ]
    )
    return formatted_search_docs

def whisper_transcribe_api(task_id: str) -> str:
    """Download the audio file attached to a task and transcribe it with the OpenAI Whisper API."""
    openai.api_key = os.getenv("OPENAI_API_KEY")
    file_url = f'{DEFAULT_API_URL}/files/{task_id}'
    try:
        r = requests.get(file_url, timeout=15, allow_redirects=True)
        temp_path = 'temp.mp3'
        with open(temp_path, "wb") as fp:
            fp.write(r.content)
        with open(temp_path, "rb") as audio_file:
            transcript = openai.audio.transcriptions.create(
                file=audio_file,
                model="whisper-1"
            )
        return transcript.text
    except Exception as e:
        return f"Error transcribing audio: {e}"

def run_python_file(task_id: str, file_name: str) -> str:
    """Download the Python file attached to a task, execute it, and return its result or stdout."""
    file_path = file_name
    buffer = io.StringIO()
    old_stdout = sys.stdout
    ns = {"__builtins__": __builtins__, "__name__": "__main__"}
    try:
        file_url = f"{DEFAULT_API_URL}/files/{task_id}"
        r = requests.get(file_url, timeout=15, allow_redirects=True)
        if r.status_code != 200:
            return f"❌ Failed to download file: {r.status_code}"
        with open(file_path, "wb") as f:
            f.write(r.content)
        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
            code = f.read()
        # Capture stdout while executing the downloaded code
        sys.stdout = buffer
        try:
            compiled = compile(code, file_path, "exec")
            exec(compiled, ns, ns)
        finally:
            sys.stdout = old_stdout
        if "result" in ns:
            return str(ns["result"])
        else:
            output = buffer.getvalue().strip()
            return output or "No output produced."
    except Exception as e:
        # Prefer returning a computed result or any partial stdout if available
        try:
            sys.stdout = old_stdout
        except Exception:
            pass
        if "result" in ns:
            return str(ns["result"])
        output = buffer.getvalue().strip()
        if output:
            return output
        return f"❌ Error executing Python file: {e}"
    finally:
        # Ensure the downloaded code file is removed after execution
        try:
            if os.path.exists(file_path):
                os.remove(file_path)
        except Exception:
            pass
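
# Illustrative behaviour of run_python_file: if the downloaded script assigns a
# top-level variable named `result`, that value is returned; otherwise the
# captured stdout is returned. The task_id and file name below are placeholders:
#   print(run_python_file("<task_id>", "downloaded_script.py"))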

TOOLS: List[Callable[..., Any]] = [
    multiply,
    add,
    subtract,
    divide,
    modulus,
    duckduck_websearch,
    arvix_search,
    wiki_search,
    visit_webpage,
    youtube_search,
    text_splitter,
    read_file,
    excel_read,
    csv_read,
    image_caption,
    whisper_transcribe_api,
    run_python_file,
]