import os
from email.message import Message

import requests
import pandas as pd

from tools._session import _session

DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
DATA_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "data")
os.makedirs(DATA_DIR, exist_ok=True)

# Maximum number of characters of a text file included in the returned preview.
_MAX_TEXT_CHARS = 5000

# Extensions (including "no extension") that are always rendered as text.
_TEXT_EXTENSIONS = (".py", ".txt", ".md", ".json", ".xml", ".html", "")


def _text_preview(text: str) -> str:
    """Format decoded text as a preview truncated to _MAX_TEXT_CHARS characters."""
    return f"File contents:\n{text[:_MAX_TEXT_CHARS]}"


def _parse_file(file_path: str, content_bytes: bytes, ext: str) -> str:
    """Render a file as a human-readable string, chosen by its extension.

    Args:
        file_path: Path of the file on disk; read by pandas for CSV/Excel.
        content_bytes: Raw bytes of the same file; used for text rendering.
        ext: Lower-cased extension including the leading dot, or "" if none.

    Returns:
        A description of the content. Any parsing error is reported in the
        returned string ("Failed to parse file: ...") rather than raised.
    """
    try:
        if ext == ".csv":
            df = pd.read_csv(file_path)
            return f"CSV file ({len(df)} rows, {len(df.columns)} columns):\n{df.to_string(index=False)}"

        if ext in (".xlsx", ".xls"):
            # Context manager closes the workbook handle once all sheets are read.
            with pd.ExcelFile(file_path) as workbook:
                parts = []
                for sheet in workbook.sheet_names:
                    df = workbook.parse(sheet)
                    parts.append(
                        f"Sheet '{sheet}' ({len(df)} rows, {len(df.columns)} columns):\n{df.to_string(index=False)}"
                    )
            return "\n\n".join(parts)

        if ext in _TEXT_EXTENSIONS:
            # Known text formats: tolerate stray invalid bytes.
            return _text_preview(content_bytes.decode("utf-8", errors="replace"))

        # Unknown extension: decode strictly so that genuinely binary payloads
        # reach the "Binary file" message. With errors="replace" the decode can
        # never fail, which made that fallback unreachable.
        try:
            text = content_bytes.decode("utf-8")
        except UnicodeDecodeError:
            return f"Binary file, cannot display as text. Size: {len(content_bytes)} bytes."
        return _text_preview(text)
    except Exception as e:
        # Deliberate catch-all: callers expect a string, never an exception.
        return f"Failed to parse file: {e}"


def _is_valid_task_id(task_id: str) -> bool:
    """Return True if task_id is non-empty and safe to use as a file name stem."""
    if not task_id or task_id in (".", ".."):
        return False
    # Path separators or NUL would let the cache path leave DATA_DIR or make
    # open() fail.
    return not any(ch in task_id for ch in "/\\\0")


def _find_cached(task_id: str) -> str | None:
    """Return the name of the cached file for task_id in DATA_DIR, or None.

    A file matches only when its name is exactly task_id or task_id plus a
    single extension, so an id that is merely a prefix of another id's file
    name does not match. Sorting makes the choice deterministic if several
    files qualify.
    """
    matches = sorted(
        name
        for name in os.listdir(DATA_DIR)
        if name == task_id or os.path.splitext(name)[0] == task_id
    )
    return matches[0] if matches else None


def _guess_extension(content_disposition: str, content_type: str) -> str:
    """Derive a lower-cased file extension from HTTP response headers.

    The Content-Disposition filename takes precedence; otherwise the extension
    is inferred from keywords in Content-Type. Returns "" when neither helps.
    """
    ext = ""
    if content_disposition:
        try:
            # The stdlib parser handles quoting, trailing parameters and the
            # RFC 2231 "filename*=" form.
            header = Message()
            header["content-disposition"] = content_disposition
            fname = header.get_filename()
        except Exception:
            # A malformed header must not abort the download; fall back to
            # the content type below.
            fname = None
        if fname:
            ext = os.path.splitext(fname)[-1].lower()
    if ext:
        return ext

    ctype = content_type.lower()
    if "csv" in ctype:
        return ".csv"
    if "excel" in ctype or "spreadsheet" in ctype or "openxmlformats" in ctype:
        return ".xlsx"
    if "text" in ctype:
        return ".txt"
    return ""


def prefetch_file(task_id: str) -> str | None:
    """Fetch, cache and parse the file attached to a task.

    The cache in DATA_DIR is consulted first. On a miss the file is requested
    from "{DEFAULT_API_URL}/files/{task_id}", written to DATA_DIR as
    "{task_id}{ext}" and then parsed.

    Args:
        task_id: Identifier of the task whose attachment is wanted.

    Returns:
        The parsed file content as a string, or None when task_id is empty or
        not usable as a file name, the server answers 404, or the request
        fails for any other reason.
    """
    if not _is_valid_task_id(task_id):
        return None

    # Check cache first.
    cached_name = _find_cached(task_id)
    if cached_name is not None:
        file_path = os.path.join(DATA_DIR, cached_name)
        # Files are stored as task_id + ext, so the suffix after the id is the
        # extension that was used when the file was first parsed.
        ext = cached_name[len(task_id):].lower()
        with open(file_path, "rb") as f:
            content_bytes = f.read()
        return _parse_file(file_path, content_bytes, ext)

    file_url = f"{DEFAULT_API_URL}/files/{task_id}"
    try:
        response = _session.get(file_url, timeout=30)
        if response.status_code == 404:
            return None
        response.raise_for_status()
    except Exception:
        # Best-effort: any transport or HTTP error is treated as "no attachment".
        return None

    ext = _guess_extension(
        response.headers.get("content-disposition", ""),
        response.headers.get("content-type", ""),
    )

    # Save to data/ so later calls are served from the cache.
    file_path = os.path.join(DATA_DIR, f"{task_id}{ext}")
    with open(file_path, "wb") as f:
        f.write(response.content)

    return _parse_file(file_path, response.content, ext)


def download_and_read_file(task_id: str) -> str:
    """Download and read a file attachment for a given task_id.
    Supports CSV, Excel (.xlsx/.xls), and plain text files.
    """
    # Always returns a string: a fixed message replaces the None that
    # prefetch_file returns when there is no attachment.
    result = prefetch_file(task_id)
    if result is None:
        return "No file attachment found for this task."
    return result