|
|
|
|
|
""" |
|
|
Calculator Tool for GAIA Agent System |
|
|
Handles mathematical calculations, unit conversions, and statistical operations |
|
|
""" |
|
|
|
|
|
import re |
|
|
import math |
|
|
import statistics |
|
|
import logging |
|
|
from typing import Dict, List, Optional, Any, Union |
|
|
from dataclasses import dataclass |
|
|
|
|
|
from tools import BaseTool |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
@dataclass |
|
|
class CalculationResult: |
|
|
"""Container for calculation results""" |
|
|
expression: str |
|
|
result: Union[float, int, str] |
|
|
result_type: str |
|
|
steps: List[str] |
|
|
units: Optional[str] = None |
|
|
|
|
|
def to_dict(self) -> Dict[str, Any]: |
|
|
return { |
|
|
"expression": self.expression, |
|
|
"result": self.result, |
|
|
"result_type": self.result_type, |
|
|
"steps": self.steps, |
|
|
"units": self.units |
|
|
} |
|
|
|
|
|
class CalculatorTool(BaseTool): |
|
|
""" |
|
|
Calculator tool for mathematical operations |
|
|
Supports basic math, advanced functions, statistics, and unit conversions |
|
|
""" |
|
|
|
|
|
def __init__(self): |
|
|
super().__init__("calculator") |
|
|
|
|
|
|
|
|
self.safe_functions = { |
|
|
|
|
|
'abs': abs, 'round': round, 'min': min, 'max': max, |
|
|
'sum': sum, 'len': len, |
|
|
|
|
|
|
|
|
'sin': math.sin, 'cos': math.cos, 'tan': math.tan, |
|
|
'asin': math.asin, 'acos': math.acos, 'atan': math.atan, |
|
|
'sinh': math.sinh, 'cosh': math.cosh, 'tanh': math.tanh, |
|
|
'exp': math.exp, 'log': math.log, 'log10': math.log10, |
|
|
'sqrt': math.sqrt, 'pow': pow, 'ceil': math.ceil, 'floor': math.floor, |
|
|
'factorial': math.factorial, 'gcd': math.gcd, |
|
|
|
|
|
|
|
|
'pi': math.pi, 'e': math.e, |
|
|
|
|
|
|
|
|
'mean': statistics.mean, 'median': statistics.median, |
|
|
'mode': statistics.mode, 'stdev': statistics.stdev, |
|
|
'variance': statistics.variance |
|
|
} |
|
|
|
|
|
|
|
|
self.unit_conversions = { |
|
|
|
|
|
'length': { |
|
|
'mm': 0.001, 'cm': 0.01, 'dm': 0.1, 'm': 1, |
|
|
'km': 1000, 'in': 0.0254, 'ft': 0.3048, |
|
|
'yd': 0.9144, 'mi': 1609.344 |
|
|
}, |
|
|
|
|
|
'weight': { |
|
|
'mg': 0.001, 'g': 1, 'kg': 1000, |
|
|
'oz': 28.3495, 'lb': 453.592, 'ton': 1000000 |
|
|
}, |
|
|
|
|
|
'temperature': { |
|
|
'celsius': 'celsius', 'fahrenheit': 'fahrenheit', |
|
|
'kelvin': 'kelvin', 'c': 'celsius', 'f': 'fahrenheit', 'k': 'kelvin' |
|
|
}, |
|
|
|
|
|
'time': { |
|
|
's': 1, 'min': 60, 'h': 3600, 'hr': 3600, |
|
|
'day': 86400, 'week': 604800, 'month': 2629746, 'year': 31556952 |
|
|
}, |
|
|
|
|
|
'area': { |
|
|
'mm2': 0.000001, 'cm2': 0.0001, 'm2': 1, |
|
|
'km2': 1000000, 'in2': 0.00064516, 'ft2': 0.092903 |
|
|
}, |
|
|
|
|
|
'volume': { |
|
|
'ml': 0.001, 'l': 1, 'gal': 3.78541, 'qt': 0.946353, |
|
|
'pt': 0.473176, 'cup': 0.236588, 'fl_oz': 0.0295735 |
|
|
} |
|
|
} |
|
|
|
|
|
def _execute_impl(self, input_data: Any, **kwargs) -> Dict[str, Any]: |
|
|
""" |
|
|
Execute calculator operations based on input type |
|
|
|
|
|
Args: |
|
|
input_data: Can be: |
|
|
- str: Mathematical expression |
|
|
- dict: {"expression": str, "operation": str, "data": list, "units": dict} |
|
|
""" |
|
|
|
|
|
if isinstance(input_data, str): |
|
|
return self._evaluate_expression(input_data) |
|
|
|
|
|
elif isinstance(input_data, dict): |
|
|
operation = input_data.get("operation", "evaluate") |
|
|
|
|
|
if operation == "evaluate": |
|
|
expression = input_data.get("expression", "") |
|
|
return self._evaluate_expression(expression) |
|
|
elif operation == "statistics": |
|
|
data = input_data.get("data", []) |
|
|
return self._calculate_statistics(data) |
|
|
elif operation == "convert": |
|
|
value = input_data.get("value", 0) |
|
|
from_unit = input_data.get("from_unit", "") |
|
|
to_unit = input_data.get("to_unit", "") |
|
|
return self._convert_units(value, from_unit, to_unit) |
|
|
else: |
|
|
raise ValueError(f"Unknown operation: {operation}") |
|
|
else: |
|
|
raise ValueError(f"Unsupported input type: {type(input_data)}") |
|
|
|
|
|
def _evaluate_expression(self, expression: str) -> Dict[str, Any]: |
|
|
""" |
|
|
Safely evaluate a mathematical expression |
|
|
""" |
|
|
try: |
|
|
|
|
|
original_expression = expression |
|
|
expression = self._clean_expression(expression) |
|
|
|
|
|
steps = [f"Original: {original_expression}", f"Cleaned: {expression}"] |
|
|
|
|
|
|
|
|
unit_match = re.search(r'(\d+\.?\d*)\s*(\w+)\s+to\s+(\w+)', expression) |
|
|
if unit_match: |
|
|
value, from_unit, to_unit = unit_match.groups() |
|
|
return self._convert_units(float(value), from_unit, to_unit) |
|
|
|
|
|
|
|
|
expression = self._replace_math_expressions(expression) |
|
|
steps.append(f"With functions: {expression}") |
|
|
|
|
|
|
|
|
if not self._is_safe_expression(expression): |
|
|
raise ValueError("Expression contains unsafe operations") |
|
|
|
|
|
|
|
|
safe_dict = { |
|
|
"__builtins__": {}, |
|
|
**self.safe_functions |
|
|
} |
|
|
|
|
|
|
|
|
result = eval(expression, safe_dict) |
|
|
|
|
|
|
|
|
if isinstance(result, (int, float)): |
|
|
if result == int(result): |
|
|
result = int(result) |
|
|
result_type = "integer" |
|
|
else: |
|
|
result = round(result, 10) |
|
|
result_type = "float" |
|
|
else: |
|
|
result_type = type(result).__name__ |
|
|
|
|
|
calc_result = CalculationResult( |
|
|
expression=original_expression, |
|
|
result=result, |
|
|
result_type=result_type, |
|
|
steps=steps |
|
|
) |
|
|
|
|
|
return { |
|
|
"success": True, |
|
|
"calculation": calc_result.to_dict(), |
|
|
"message": f"Successfully evaluated: {result}" |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
return { |
|
|
"success": False, |
|
|
"expression": expression, |
|
|
"message": f"Calculation failed: {str(e)}", |
|
|
"error_type": type(e).__name__ |
|
|
} |
|
|
|
|
|
def _clean_expression(self, expression: str) -> str: |
|
|
"""Clean and normalize mathematical expression""" |
|
|
|
|
|
expression = re.sub(r'\s+', ' ', expression.strip()) |
|
|
|
|
|
|
|
|
replacements = { |
|
|
' plus ': '+', ' minus ': '-', ' times ': '*', ' multiply ': '*', |
|
|
' divided by ': '/', ' divide ': '/', ' power ': '**', ' to the power of ': '**' |
|
|
} |
|
|
|
|
|
for text, symbol in replacements.items(): |
|
|
expression = expression.replace(text, symbol) |
|
|
|
|
|
|
|
|
expression = re.sub(r'(\d+\.?\d*)%', r'(\1/100)', expression) |
|
|
|
|
|
return expression |
|
|
|
|
|
def _replace_math_expressions(self, expression: str) -> str: |
|
|
"""Replace mathematical function names with proper calls""" |
|
|
|
|
|
expression = re.sub(r'sqrt\s*\(([^)]+)\)', r'sqrt(\1)', expression) |
|
|
expression = re.sub(r'square root of (\d+\.?\d*)', r'sqrt(\1)', expression) |
|
|
|
|
|
|
|
|
expression = re.sub(r'log\s*\(([^)]+)\)', r'log(\1)', expression) |
|
|
expression = re.sub(r'ln\s*\(([^)]+)\)', r'log(\1)', expression) |
|
|
|
|
|
|
|
|
trig_functions = ['sin', 'cos', 'tan', 'asin', 'acos', 'atan'] |
|
|
for func in trig_functions: |
|
|
expression = re.sub(f'{func}\\s*\\(([^)]+)\\)', f'{func}(\\1)', expression) |
|
|
|
|
|
return expression |
|
|
|
|
|
def _is_safe_expression(self, expression: str) -> bool: |
|
|
"""Check if expression is safe to evaluate""" |
|
|
|
|
|
forbidden_patterns = [ |
|
|
r'__.*__', |
|
|
r'import\s', |
|
|
r'exec\s*\(', |
|
|
r'eval\s*\(', |
|
|
r'open\s*\(', |
|
|
r'file\s*\(', |
|
|
r'input\s*\(', |
|
|
r'raw_input\s*\(', |
|
|
] |
|
|
|
|
|
for pattern in forbidden_patterns: |
|
|
if re.search(pattern, expression, re.IGNORECASE): |
|
|
return False |
|
|
|
|
|
return True |
|
|
|
|
|
def _calculate_statistics(self, data: List[float]) -> Dict[str, Any]: |
|
|
"""Calculate statistical measures for a dataset""" |
|
|
try: |
|
|
if not data: |
|
|
raise ValueError("Empty dataset provided") |
|
|
|
|
|
data = [float(x) for x in data] |
|
|
|
|
|
stats = { |
|
|
"count": len(data), |
|
|
"sum": sum(data), |
|
|
"mean": statistics.mean(data), |
|
|
"median": statistics.median(data), |
|
|
"min": min(data), |
|
|
"max": max(data), |
|
|
"range": max(data) - min(data) |
|
|
} |
|
|
|
|
|
|
|
|
if len(data) > 1: |
|
|
stats["stdev"] = statistics.stdev(data) |
|
|
stats["variance"] = statistics.variance(data) |
|
|
|
|
|
|
|
|
try: |
|
|
stats["mode"] = statistics.mode(data) |
|
|
except statistics.StatisticsError: |
|
|
stats["mode"] = "No unique mode" |
|
|
|
|
|
return { |
|
|
"success": True, |
|
|
"statistics": stats, |
|
|
"data": data, |
|
|
"message": f"Calculated statistics for {len(data)} data points" |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
return { |
|
|
"success": False, |
|
|
"message": f"Statistics calculation failed: {str(e)}", |
|
|
"error_type": type(e).__name__ |
|
|
} |
|
|
|
|
|
def _convert_units(self, value: float, from_unit: str, to_unit: str) -> Dict[str, Any]: |
|
|
"""Convert between different units""" |
|
|
try: |
|
|
from_unit = from_unit.lower() |
|
|
to_unit = to_unit.lower() |
|
|
|
|
|
|
|
|
unit_type = None |
|
|
for utype, units in self.unit_conversions.items(): |
|
|
if from_unit in units and to_unit in units: |
|
|
unit_type = utype |
|
|
break |
|
|
|
|
|
if not unit_type: |
|
|
raise ValueError(f"Cannot convert between {from_unit} and {to_unit}") |
|
|
|
|
|
|
|
|
if unit_type == 'temperature': |
|
|
result = self._convert_temperature(value, from_unit, to_unit) |
|
|
else: |
|
|
|
|
|
from_factor = self.unit_conversions[unit_type][from_unit] |
|
|
to_factor = self.unit_conversions[unit_type][to_unit] |
|
|
result = value * from_factor / to_factor |
|
|
|
|
|
|
|
|
if result == int(result): |
|
|
result = int(result) |
|
|
else: |
|
|
result = round(result, 6) |
|
|
|
|
|
conversion_result = CalculationResult( |
|
|
expression=f"{value} {from_unit} to {to_unit}", |
|
|
result=result, |
|
|
result_type="conversion", |
|
|
steps=[ |
|
|
f"Convert {value} {from_unit} to {to_unit}", |
|
|
f"Result: {result} {to_unit}" |
|
|
], |
|
|
units=to_unit |
|
|
) |
|
|
|
|
|
return { |
|
|
"success": True, |
|
|
"conversion": conversion_result.to_dict(), |
|
|
"message": f"Converted {value} {from_unit} = {result} {to_unit}" |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
return { |
|
|
"success": False, |
|
|
"message": f"Unit conversion failed: {str(e)}", |
|
|
"error_type": type(e).__name__ |
|
|
} |
|
|
|
|
|
def _convert_temperature(self, value: float, from_unit: str, to_unit: str) -> float: |
|
|
"""Convert temperature between Celsius, Fahrenheit, and Kelvin""" |
|
|
|
|
|
unit_map = {'c': 'celsius', 'f': 'fahrenheit', 'k': 'kelvin'} |
|
|
from_unit = unit_map.get(from_unit, from_unit) |
|
|
to_unit = unit_map.get(to_unit, to_unit) |
|
|
|
|
|
|
|
|
if from_unit == 'fahrenheit': |
|
|
celsius = (value - 32) * 5/9 |
|
|
elif from_unit == 'kelvin': |
|
|
celsius = value - 273.15 |
|
|
else: |
|
|
celsius = value |
|
|
|
|
|
|
|
|
if to_unit == 'fahrenheit': |
|
|
return celsius * 9/5 + 32 |
|
|
elif to_unit == 'kelvin': |
|
|
return celsius + 273.15 |
|
|
else: |
|
|
return celsius |
|
|
|
|
|
def test_calculator_tool(): |
|
|
"""Test the calculator tool with various operations""" |
|
|
tool = CalculatorTool() |
|
|
|
|
|
|
|
|
test_cases = [ |
|
|
"2 + 3 * 4", |
|
|
"sqrt(16) + 2^3", |
|
|
"sin(pi/2) + cos(0)", |
|
|
{"operation": "statistics", "data": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]}, |
|
|
{"operation": "convert", "value": 100, "from_unit": "cm", "to_unit": "m"}, |
|
|
{"operation": "convert", "value": 32, "from_unit": "f", "to_unit": "c"}, |
|
|
"10 factorial", |
|
|
"mean([1, 2, 3, 4, 5])", |
|
|
"15% of 200" |
|
|
] |
|
|
|
|
|
print("🧪 Testing Calculator Tool...") |
|
|
|
|
|
for i, test_case in enumerate(test_cases, 1): |
|
|
print(f"\n--- Test {i}: {test_case} ---") |
|
|
try: |
|
|
result = tool.execute(test_case) |
|
|
|
|
|
if result.success: |
|
|
if 'calculation' in result.result: |
|
|
calc = result.result['calculation'] |
|
|
print(f"✅ Result: {calc['result']} ({calc['result_type']})") |
|
|
elif 'statistics' in result.result: |
|
|
stats = result.result['statistics'] |
|
|
print(f"✅ Mean: {stats['mean']}, Median: {stats['median']}, StDev: {stats.get('stdev', 'N/A')}") |
|
|
elif 'conversion' in result.result: |
|
|
conv = result.result['conversion'] |
|
|
print(f"✅ Conversion: {conv['result']} {conv['units']}") |
|
|
print(f" Message: {result.result.get('message', 'No message')}") |
|
|
else: |
|
|
print(f"❌ Error: {result.result.get('message', 'Unknown error')}") |
|
|
|
|
|
print(f" Execution time: {result.execution_time:.3f}s") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"❌ Exception: {str(e)}") |
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
test_calculator_tool() |