File size: 3,835 Bytes
354908d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from smolagents import Tool
import pandas as pd
from typing import Optional
import os

class TableExtractorTool(Tool):
    """
    Extracts tables from Excel (.xlsx, .xls) or CSV files and answers queries.
    Auto-detects file type based on extension.
    """
    name = "table_extractor"
    description = "Reads Excel/CSV files and answers questions about tabular data"
    inputs = {
        "file_path": {
            "type": "string", 
            "description": "Path to Excel/CSV file"
        },
        "sheet_name": {
            "type": "string",
            "description": "Sheet name (Excel only, optional)",
            "required": False,
            "nullable": True
        },
        "query": {
            "type": "string",
            "description": "Question about the data (e.g., 'total sales')",
            "required": False,
            "nullable": True
        }
    }
    output_type = "string"

    def forward(self, 
               file_path: str,
               sheet_name: Optional[str] = None,
               query: Optional[str] = None) -> str:
        
        try:
            # Validate file exists
            if not os.path.exists(file_path):
                return f"Error: File not found at {file_path}"
            
            # Read file based on extension
            ext = os.path.splitext(file_path)[1].lower()
            
            if ext in ('.xlsx', '.xls'):
                df = self._read_excel(file_path, sheet_name)
            elif ext == '.csv':
                df = pd.read_csv(file_path)
            else:
                return f"Error: Unsupported file type {ext}"
            
            if df.empty:
                return "Error: No data found in file."
            
            return self._answer_query(df, query) if query else df.to_string()
            
        except Exception as e:
            return f"Error processing file: {str(e)}"

    def _read_excel(self, path: str, sheet_name: Optional[str]) -> pd.DataFrame:
        """Read Excel file with sheet selection logic"""
        if sheet_name:
            return pd.read_excel(path, sheet_name=sheet_name)
        
        # Auto-detect first non-empty sheet
        sheets = pd.ExcelFile(path).sheet_names
        for sheet in sheets:
            df = pd.read_excel(path, sheet_name=sheet)
            if not df.empty:
                return df
        return pd.DataFrame()  # Return empty if all sheets are blank

    def _answer_query(self, df: pd.DataFrame, query: str) -> str:
        """Handles queries with pandas operations"""
        query = query.lower()
        
        try:
            # SUM QUERIES (e.g., "total revenue")
            if "total" in query or "sum" in query:
                for col in df.select_dtypes(include='number').columns:
                    if col.lower() in query:
                        return f"Total {col}: {df[col].sum():.2f}"
            
            # AVERAGE QUERIES (e.g., "average price")
            elif "average" in query or "mean" in query:
                for col in df.select_dtypes(include='number').columns:
                    if col.lower() in query:
                        return f"Average {col}: {df[col].mean():.2f}"
            
            # FILTER QUERIES (e.g., "show sales > 1000")
            elif ">" in query or "<" in query:
                col = next((c for c in df.columns if c.lower() in query), None)
                if col:
                    filtered = df.query(query.replace(col, f"`{col}`"))
                    return filtered.to_string()
            
            # DEFAULT: Return full table with column names
            return f"Data:\nColumns: {', '.join(df.columns)}\n\n{df.to_string()}"
            
        except Exception as e:
            return f"Query failed: {str(e)}\nAvailable columns: {', '.join(df.columns)}"