Spaces:
Sleeping
Sleeping
| import os | |
| import glob | |
| import requests | |
| import re | |
| import pandas as pd | |
| from smolagents import tool | |
# --- Constants ---
# Base address of the scoring service; every endpoint URL below is derived from it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
api_url = DEFAULT_API_URL
# Endpoint URLs built from the base address. `files_url` keeps its trailing
# slash because download_file() appends the task_id directly to it.
questions_url = api_url + "/questions"
files_url = api_url + "/files/"
def download_and_read_excel_file(task_id: str) -> pd.DataFrame:
    """Download the attachment of a task and load it as an Excel sheet.

    The file is fetched with ``download_file`` into the ``attachments``
    directory and then parsed with pandas using the ``openpyxl`` engine.

    Args:
        task_id: The task_id for which the file should be downloaded.

    Returns:
        A pandas DataFrame with the content of the Excel file, or None when
        either the download or the parsing step failed (the failure is
        printed, not raised).
    """
    attachments_dir = 'attachments'

    downloaded_name = download_file(task_id=task_id, output_dir=attachments_dir)
    if downloaded_name is None:
        print(f"Failed to download file for task_id: {task_id}")
        return None

    excel_path = os.path.join(attachments_dir, downloaded_name)
    try:
        frame = pd.read_excel(excel_path, engine='openpyxl')
        print(f"Successfully read Excel file: {excel_path}")
        return frame
    except Exception as err:
        # Any read/parse problem is reported and mapped to None for the caller.
        print(f"Error reading Excel file {excel_path}: {err}")
        return None
def text_file_tool(filename: str) -> str:
    """Fetch the content of a text file stored in the ``attachments`` folder.

    The file is opened at ``attachments/<filename>`` exactly as given; no
    search by task_id or extension is performed.

    Args:
        filename: The filename of the text file to fetch, relative to the
            ``attachments`` directory.

    Returns:
        The file content decoded as UTF-8, or a human-readable error message
        when the file does not exist or cannot be read as UTF-8 text.
    """
    file_path = os.path.join('attachments', f"{filename}")
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()
    except FileNotFoundError:
        message = f"File {file_path} not found."
        print(message)
        return message
    except (OSError, UnicodeDecodeError) as e:
        # Covers a directory path, missing permissions, or a file that is not
        # valid UTF-8: report the problem to the caller instead of crashing.
        message = f"Error reading file {file_path}: {e}"
        print(message)
        return message

    print(f"Successfully read file: {file_path}")
    return content
def _filename_from_headers(headers, task_id: str) -> str:
    """Pick a safe local filename for a downloaded attachment.

    Args:
        headers: Mapping of response headers, looked up with lowercase keys
            (requests documents its header mapping as case-insensitive).
        task_id: Used to build a fallback name when the headers carry none.

    Returns:
        A bare filename (no directory components).
    """
    disposition = headers.get('content-disposition') or ''
    # Accept both `filename="my file.txt"` and the unquoted `filename=my_file.txt`.
    match = re.search(r'filename=(?:"([^"]+)"|([^;"\s]+))', disposition, re.IGNORECASE)
    if match:
        raw_name = match.group(1) or match.group(2)
        # The name comes from the server: drop any directory part (either
        # separator style) so it cannot escape the output directory.
        safe_name = os.path.basename(raw_name.replace('\\', '/'))
        if safe_name and safe_name not in ('.', '..'):
            return safe_name

    # No usable name in the headers: derive an extension from the media type.
    content_type = headers.get('content-type') or 'application/octet-stream'
    # Strip parameters such as "; charset=utf-8" so they never end up in the name.
    media_type = content_type.split(';', 1)[0].strip().lower()
    if 'json' in media_type:
        return f"{task_id}.json"
    if 'text' in media_type:
        return f"{task_id}.txt"
    if 'image' in media_type:  # e.g., image/jpeg, image/png
        ext = media_type.split('/')[-1]
        return f"{task_id}.{ext}"
    return f"{task_id}.bin"  # Generic binary extension


def download_file(output_dir: str, task_id: str) -> str:
    """Downloads a file for a given task_id and saves it to the specified output directory.

    The filename is taken from the Content-Disposition header when present
    (reduced to its base name), otherwise it is built from the task_id and
    the response Content-Type.

    Args:
        output_dir: The directory where the file should be saved. It is
            created if it does not exist.
        task_id: The task_id for which the file should be downloaded.

    Returns:
        The filename of the downloaded file (relative to output_dir) or None
        if the download failed.
    """
    try:
        response = requests.get(files_url + task_id, timeout=15)
        response.raise_for_status()
        # Get the raw binary content of the file
        file_content = response.content
    except requests.exceptions.HTTPError as e:
        print(f"HTTP Error: {e}")
        print(f"Response content: {e.response.text}")  # Print response text for debugging 4xx/5xx errors
        return None
    except requests.exceptions.ConnectionError as e:
        print(f"Connection Error: {e}")
        return None
    except requests.exceptions.Timeout as e:
        print(f"Timeout Error: {e}")
        return None
    except requests.exceptions.RequestException as e:
        print(f"An unexpected error occurred: {e}")
        return None

    filename = _filename_from_headers(response.headers, task_id)

    # Ensure the output directory exists
    os.makedirs(output_dir, exist_ok=True)
    file_path = os.path.join(output_dir, filename)

    # Save the content to a local file
    with open(file_path, 'wb') as f:
        f.write(file_content)

    print(f"Successfully downloaded file for task '{task_id}' to: {file_path}")
    return filename