| | from typing import Dict, List, Any, Optional, Callable
|
| | import pandas as pd
|
| | import numpy as np
|
| | from llama_index.tools import FunctionTool
|
| | from pathlib import Path
|
| |
|
| | class PandasDataTools:
|
| | """Tools for data analysis operations on CSV files."""
|
| |
|
| | def __init__(self, csv_directory: str):
|
| | """Initialize with directory containing CSV files."""
|
| | self.csv_directory = csv_directory
|
| | self.dataframes = {}
|
| | self.tools = self._create_tools()
|
| |
|
| | def _load_dataframe(self, filename: str) -> pd.DataFrame:
|
| | """Load a CSV file as DataFrame, with caching."""
|
| | if filename not in self.dataframes:
|
| | file_path = Path(self.csv_directory) / filename
|
| | if not file_path.exists() and not filename.endswith('.csv'):
|
| | file_path = Path(self.csv_directory) / f"{filename}.csv"
|
| |
|
| | if file_path.exists():
|
| | self.dataframes[filename] = pd.read_csv(file_path)
|
| | else:
|
| | raise ValueError(f"CSV file not found: {filename}")
|
| |
|
| | return self.dataframes[filename]
|
| |
|
| | def _create_tools(self) -> List[FunctionTool]:
|
| | """Create LlamaIndex function tools for data operations."""
|
| | tools = [
|
| | FunctionTool.from_defaults(
|
| | name="describe_csv",
|
| | description="Get statistical description of a CSV file",
|
| | fn=self.describe_csv
|
| | ),
|
| | FunctionTool.from_defaults(
|
| | name="filter_data",
|
| | description="Filter CSV data based on conditions",
|
| | fn=self.filter_data
|
| | ),
|
| | FunctionTool.from_defaults(
|
| | name="group_and_aggregate",
|
| | description="Group data and calculate aggregate statistics",
|
| | fn=self.group_and_aggregate
|
| | ),
|
| | FunctionTool.from_defaults(
|
| | name="sort_data",
|
| | description="Sort data by specified columns",
|
| | fn=self.sort_data
|
| | ),
|
| | FunctionTool.from_defaults(
|
| | name="calculate_correlation",
|
| | description="Calculate correlation between columns",
|
| | fn=self.calculate_correlation
|
| | )
|
| | ]
|
| | return tools
|
| |
|
| | def get_tools(self) -> List[FunctionTool]:
|
| | """Get all available data tools."""
|
| | return self.tools
|
| |
|
| |
|
| | def describe_csv(self, filename: str) -> Dict[str, Any]:
|
| | """Get statistical description of CSV data."""
|
| | df = self._load_dataframe(filename)
|
| | description = df.describe().to_dict()
|
| |
|
| |
|
| | result = {
|
| | "statistics": description,
|
| | "shape": df.shape,
|
| | "columns": df.columns.tolist(),
|
| | "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()}
|
| | }
|
| |
|
| | return result
|
| |
|
| | def filter_data(self, filename: str, column: str, condition: str, value: Any) -> Dict[str, Any]:
|
| | """Filter data based on condition (==, >, <, >=, <=, !=, contains)."""
|
| | df = self._load_dataframe(filename)
|
| |
|
| | if condition == "==":
|
| | filtered = df[df[column] == value]
|
| | elif condition == ">":
|
| | filtered = df[df[column] > float(value)]
|
| | elif condition == "<":
|
| | filtered = df[df[column] < float(value)]
|
| | elif condition == ">=":
|
| | filtered = df[df[column] >= float(value)]
|
| | elif condition == "<=":
|
| | filtered = df[df[column] <= float(value)]
|
| | elif condition == "!=":
|
| | filtered = df[df[column] != value]
|
| | elif condition.lower() == "contains":
|
| | filtered = df[df[column].astype(str).str.contains(str(value))]
|
| | else:
|
| | return {"error": f"Unsupported condition: {condition}"}
|
| |
|
| | return {
|
| | "result_count": len(filtered),
|
| | "results": filtered.head(10).to_dict(orient="records"),
|
| | "total_count": len(df)
|
| | }
|
| |
|
| | def group_and_aggregate(self, filename: str, group_by: str, agg_column: str,
|
| | agg_function: str = "mean") -> Dict[str, Any]:
|
| | """Group by column and calculate aggregate statistic."""
|
| | df = self._load_dataframe(filename)
|
| |
|
| | agg_functions = {
|
| | "mean": np.mean,
|
| | "sum": np.sum,
|
| | "min": np.min,
|
| | "max": np.max,
|
| | "count": len,
|
| | "median": np.median
|
| | }
|
| |
|
| | if agg_function not in agg_functions:
|
| | return {"error": f"Unsupported aggregation function: {agg_function}"}
|
| |
|
| | grouped = df.groupby(group_by)[agg_column].agg(agg_functions[agg_function])
|
| |
|
| | return {
|
| | "group_by": group_by,
|
| | "aggregated_column": agg_column,
|
| | "aggregation": agg_function,
|
| | "results": grouped.to_dict()
|
| | }
|
| |
|
| | def sort_data(self, filename: str, sort_by: str, ascending: bool = True) -> Dict[str, Any]:
|
| | """Sort data by column."""
|
| | df = self._load_dataframe(filename)
|
| |
|
| | sorted_df = df.sort_values(by=sort_by, ascending=ascending)
|
| |
|
| | return {
|
| | "sorted_by": sort_by,
|
| | "ascending": ascending,
|
| | "results": sorted_df.head(10).to_dict(orient="records")
|
| | }
|
| |
|
| | def calculate_correlation(self, filename: str, column1: str, column2: str) -> Dict[str, Any]:
|
| | """Calculate correlation between two columns."""
|
| | df = self._load_dataframe(filename)
|
| |
|
| | try:
|
| | correlation = df[column1].corr(df[column2])
|
| | return {
|
| | "correlation": correlation,
|
| | "column1": column1,
|
| | "column2": column2
|
| | }
|
| | except Exception as e:
|
| | return {"error": f"Could not calculate correlation: {str(e)}"}
|
| |
|