import glob
import os
import re

import pandas as pd
import requests
from smolagents import tool

# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
api_url = DEFAULT_API_URL
questions_url = f"{api_url}/questions"
files_url = f"{api_url}/files/"

# Matches the filename parameter of a Content-Disposition header in both the
# quoted form (filename="a b.txt") and the unquoted token form (filename=a.txt).
# The extended form (filename*=...) is intentionally not matched.
_CD_FILENAME_RE = re.compile(
    r'filename\s*=\s*(?:"([^"]+)"|([^;\s]+))', re.IGNORECASE
)


def _filename_from_response(headers, task_id):
    """Pick a safe local filename for a downloaded attachment.

    Prefers the name announced in the Content-Disposition header and falls
    back to ``<task_id>.<ext>`` with the extension guessed from Content-Type.
    """
    disposition = headers.get('content-disposition')
    if disposition:
        match = _CD_FILENAME_RE.search(disposition)
        if match:
            announced = match.group(1) or match.group(2)
            # The header is server-controlled: keep only the final path
            # component so the file cannot be written outside output_dir.
            candidate = os.path.basename(announced.replace('\\', '/')).strip()
            if candidate and candidate not in ('.', '..'):
                return candidate

    content_type = headers.get('content-type', 'application/octet-stream')
    if 'json' in content_type:
        return f"{task_id}.json"
    if 'text' in content_type:
        return f"{task_id}.txt"
    if 'image' in content_type:
        # Drop media-type parameters ("image/png; charset=binary" -> "png").
        ext = content_type.split(';')[0].split('/')[-1].strip()
        return f"{task_id}.{ext}"
    return f"{task_id}.bin"  # Generic binary extension


@tool
def download_and_read_excel_file(task_id: str) -> pd.DataFrame:
    """Downloads an Excel file for a given task_id and reads it into a pandas DataFrame.

    Args:
        task_id: The task_id for which the file should be downloaded.

    Returns:
        A pandas DataFrame containing the content of the Excel file or None if the download failed.
    """
    filename = download_file(task_id=task_id, output_dir='attachments')
    if filename is None:
        print(f"Failed to download file for task_id: {task_id}")
        return None

    file_path = os.path.join('attachments', filename)
    try:
        # Read the Excel file into a pandas DataFrame
        df = pd.read_excel(file_path, engine='openpyxl')
    except Exception as e:
        # Tool boundary: any parsing failure is reported and mapped to None.
        print(f"Error reading Excel file {file_path}: {e}")
        return None

    print(f"Successfully read Excel file: {file_path}")
    return df


@tool
def text_file_tool(filename: str) -> str:
    """Fetch the content of a text file based on its filename.

    Args:
        filename: The filename of the text file to fetch.

    Returns:
        The decoded UTF-8 content of the file, or a message describing why it could not be read.
    """
    # The file is looked up directly inside the attachments folder.
    file_path = os.path.join('attachments', f"{filename}")
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()
    except FileNotFoundError:
        print(f"File {file_path} not found.")
        return f"File {file_path} not found."
    except (OSError, UnicodeDecodeError) as e:
        # Binary attachments, directories or permission problems are reported
        # to the caller as a message, like the not-found case above.
        print(f"Error reading file {file_path}: {e}")
        return f"Error reading file {file_path}: {e}"

    print(f"Successfully read file: {file_path}")
    return content


@tool
def download_file(output_dir: str, task_id: str) -> str:
    """Downloads a file for a given task_id and saves it to the specified output directory.

    Args:
        output_dir: The directory where the file should be saved.
        task_id: The task_id for which the file should be downloaded.

    Returns:
        The filename of the downloaded file or None if the download failed.
    """
    try:
        response = requests.get(files_url + task_id, timeout=15)
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        print(f"HTTP Error: {e}")
        if e.response is not None:
            # Print response text for debugging 4xx/5xx errors
            print(f"Response content: {e.response.text}")
        return None
    except requests.exceptions.ConnectionError as e:
        print(f"Connection Error: {e}")
        return None
    except requests.exceptions.Timeout as e:
        print(f"Timeout Error: {e}")
        return None
    except requests.exceptions.RequestException as e:
        print(f"An unexpected error occurred: {e}")
        return None

    filename = _filename_from_response(response.headers, task_id)
    file_path = os.path.join(output_dir, filename)
    try:
        # Ensure the output directory exists, then save the raw bytes.
        os.makedirs(output_dir, exist_ok=True)
        with open(file_path, 'wb') as f:
            f.write(response.content)
    except OSError as e:
        # A failed save counts as a failed download for callers.
        print(f"Error saving file {file_path}: {e}")
        return None

    print(f"Successfully downloaded file for task '{task_id}' to: {file_path}")
    return filename