import os
import requests
import pandas as pd
from tools._session import _session

DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
DATA_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "data")
os.makedirs(DATA_DIR, exist_ok=True)


def _parse_file(file_path: str, content_bytes: bytes, ext: str) -> str:
    """Parse file content based on extension and return as string."""
    try:
        if ext == ".csv":
            df = pd.read_csv(file_path)
            return f"CSV file ({len(df)} rows, {len(df.columns)} columns):\n{df.to_string(index=False)}"
        elif ext in (".xlsx", ".xls"):
            # Read all sheets
            xl = pd.ExcelFile(file_path)
            parts = []
            for sheet in xl.sheet_names:
                df = xl.parse(sheet)
                parts.append(f"Sheet '{sheet}' ({len(df)} rows, {len(df.columns)} columns):\n{df.to_string(index=False)}")
            return "\n\n".join(parts)
        elif ext in (".py", ".txt", ".md", ".json", ".xml", ".html", ""):
            return f"File contents:\n{content_bytes.decode('utf-8', errors='replace')[:5000]}"
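        else:
            # Unknown extension: decode strictly so that non-UTF-8 (binary) data
            # reaches the fallback instead of being rendered as replacement characters.
            try:
                return f"File contents:\n{content_bytes.decode('utf-8')[:5000]}"
            except UnicodeDecodeError:
                return f"Binary file, cannot display as text. Size: {len(content_bytes)} bytes."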
    except Exception as e:
        return f"Failed to parse file: {e}"


def prefetch_file(task_id: str) -> str | None:
    """
    Download the file attached to task_id, or reuse a cached copy from data/.
    Returns the parsed file content as a string, or None if no attachment
    exists or the download fails.
    Newly downloaded files are cached in the data/ directory.
    """
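    # Check cache first. Match the full stem rather than a prefix so that a
    # task_id which is a prefix of another task_id cannot pick up the wrong file.
    cached = sorted(f for f in os.listdir(DATA_DIR) if os.path.splitext(f)[0] == task_id)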
    if cached:
        file_path = os.path.join(DATA_DIR, cached[0])
        ext = os.path.splitext(cached[0])[-1].lower()
        with open(file_path, "rb") as f:
            content_bytes = f.read()
        return _parse_file(file_path, content_bytes, ext)

    file_url = f"{DEFAULT_API_URL}/files/{task_id}"
    try:
        response = _session.get(file_url, timeout=30)
        if response.status_code == 404:
            return None
        response.raise_for_status()
    except Exception:
        return None

    # Determine extension
    ext = ""
    cd = response.headers.get("content-disposition", "")
    if "filename=" in cd:
        # Drop any parameters that follow the filename (e.g. "; size=123") before unquoting.
        fname = cd.split("filename=")[-1].split(";")[0].strip().strip('"')
        ext = os.path.splitext(fname)[-1].lower()
    content_type = response.headers.get("content-type", "")
    if not ext:
        if "csv" in content_type:
            ext = ".csv"
        elif "excel" in content_type or "spreadsheet" in content_type or "openxmlformats" in content_type:
            ext = ".xlsx"
        elif "text" in content_type:
            ext = ".txt"

    # Save to data/
    file_path = os.path.join(DATA_DIR, f"{task_id}{ext}")
    with open(file_path, "wb") as f:
        f.write(response.content)

    return _parse_file(file_path, response.content, ext)


def download_and_read_file(task_id: str) -> str:
    """Download and read a file attachment for a given task_id.
    Supports CSV, Excel (.xlsx/.xls), and plain text files.
    """
    result = prefetch_file(task_id)
    if result is None:
        return "No file attachment found for this task."
    return result
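

# Illustrative sketch only (not wired into prefetch_file): one possible way to
# derive the extension from response headers using the standard library's
# email.message.Message, which parses quoted and parameterised
# Content-Disposition values. The helper name _ext_from_headers_sketch is
# hypothetical; the content-type fallbacks mirror those in prefetch_file above.
from email.message import Message


def _ext_from_headers_sketch(content_disposition: str, content_type: str) -> str:
    """Return a lowercase extension such as '.csv', or '' if none can be inferred."""
    if content_disposition:
        msg = Message()
        msg["content-disposition"] = content_disposition
        fname = msg.get_filename()  # None when the header has no filename parameter
        if fname:
            ext = os.path.splitext(fname)[-1].lower()
            if ext:
                return ext
    # Fall back to the same content-type heuristics as prefetch_file.
    if "csv" in content_type:
        return ".csv"
    if "excel" in content_type or "spreadsheet" in content_type or "openxmlformats" in content_type:
        return ".xlsx"
    if "text" in content_type:
        return ".txt"
    return ""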