| | |
| | """ |
| | Calculator Tool for GAIA Agent System |
| | Handles mathematical calculations, unit conversions, and statistical operations |
| | """ |
| |
|
| | import re |
| | import math |
| | import statistics |
| | import logging |
| | from typing import Dict, List, Optional, Any, Union |
| | from dataclasses import dataclass |
| |
|
| | from tools import BaseTool |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| | @dataclass |
| | class CalculationResult: |
| | """Container for calculation results""" |
| | expression: str |
| | result: Union[float, int, str] |
| | result_type: str |
| | steps: List[str] |
| | units: Optional[str] = None |
| | |
| | def to_dict(self) -> Dict[str, Any]: |
| | return { |
| | "expression": self.expression, |
| | "result": self.result, |
| | "result_type": self.result_type, |
| | "steps": self.steps, |
| | "units": self.units |
| | } |
| |
|
| | class CalculatorTool(BaseTool): |
| | """ |
| | Calculator tool for mathematical operations |
| | Supports basic math, advanced functions, statistics, and unit conversions |
| | """ |
| | |
| | def __init__(self): |
| | super().__init__("calculator") |
| | |
| | |
| | self.safe_functions = { |
| | |
| | 'abs': abs, 'round': round, 'min': min, 'max': max, |
| | 'sum': sum, 'len': len, |
| | |
| | |
| | 'sin': math.sin, 'cos': math.cos, 'tan': math.tan, |
| | 'asin': math.asin, 'acos': math.acos, 'atan': math.atan, |
| | 'sinh': math.sinh, 'cosh': math.cosh, 'tanh': math.tanh, |
| | 'exp': math.exp, 'log': math.log, 'log10': math.log10, |
| | 'sqrt': math.sqrt, 'pow': pow, 'ceil': math.ceil, 'floor': math.floor, |
| | 'factorial': math.factorial, 'gcd': math.gcd, |
| | |
| | |
| | 'pi': math.pi, 'e': math.e, |
| | |
| | |
| | 'mean': statistics.mean, 'median': statistics.median, |
| | 'mode': statistics.mode, 'stdev': statistics.stdev, |
| | 'variance': statistics.variance |
| | } |
| | |
| | |
| | self.unit_conversions = { |
| | |
| | 'length': { |
| | 'mm': 0.001, 'cm': 0.01, 'dm': 0.1, 'm': 1, |
| | 'km': 1000, 'in': 0.0254, 'ft': 0.3048, |
| | 'yd': 0.9144, 'mi': 1609.344 |
| | }, |
| | |
| | 'weight': { |
| | 'mg': 0.001, 'g': 1, 'kg': 1000, |
| | 'oz': 28.3495, 'lb': 453.592, 'ton': 1000000 |
| | }, |
| | |
| | 'temperature': { |
| | 'celsius': 'celsius', 'fahrenheit': 'fahrenheit', |
| | 'kelvin': 'kelvin', 'c': 'celsius', 'f': 'fahrenheit', 'k': 'kelvin' |
| | }, |
| | |
| | 'time': { |
| | 's': 1, 'min': 60, 'h': 3600, 'hr': 3600, |
| | 'day': 86400, 'week': 604800, 'month': 2629746, 'year': 31556952 |
| | }, |
| | |
| | 'area': { |
| | 'mm2': 0.000001, 'cm2': 0.0001, 'm2': 1, |
| | 'km2': 1000000, 'in2': 0.00064516, 'ft2': 0.092903 |
| | }, |
| | |
| | 'volume': { |
| | 'ml': 0.001, 'l': 1, 'gal': 3.78541, 'qt': 0.946353, |
| | 'pt': 0.473176, 'cup': 0.236588, 'fl_oz': 0.0295735 |
| | } |
| | } |
| | |
| | def _execute_impl(self, input_data: Any, **kwargs) -> Dict[str, Any]: |
| | """ |
| | Execute calculator operations based on input type |
| | |
| | Args: |
| | input_data: Can be: |
| | - str: Mathematical expression |
| | - dict: {"expression": str, "operation": str, "data": list, "units": dict} |
| | """ |
| | |
| | if isinstance(input_data, str): |
| | return self._evaluate_expression(input_data) |
| | |
| | elif isinstance(input_data, dict): |
| | operation = input_data.get("operation", "evaluate") |
| | |
| | if operation == "evaluate": |
| | expression = input_data.get("expression", "") |
| | return self._evaluate_expression(expression) |
| | elif operation == "statistics": |
| | data = input_data.get("data", []) |
| | return self._calculate_statistics(data) |
| | elif operation == "convert": |
| | value = input_data.get("value", 0) |
| | from_unit = input_data.get("from_unit", "") |
| | to_unit = input_data.get("to_unit", "") |
| | return self._convert_units(value, from_unit, to_unit) |
| | else: |
| | raise ValueError(f"Unknown operation: {operation}") |
| | else: |
| | raise ValueError(f"Unsupported input type: {type(input_data)}") |
| | |
| | def _evaluate_expression(self, expression: str) -> Dict[str, Any]: |
| | """ |
| | Safely evaluate a mathematical expression |
| | """ |
| | try: |
| | |
| | original_expression = expression |
| | expression = self._clean_expression(expression) |
| | |
| | steps = [f"Original: {original_expression}", f"Cleaned: {expression}"] |
| | |
| | |
| | unit_match = re.search(r'(\d+\.?\d*)\s*(\w+)\s+to\s+(\w+)', expression) |
| | if unit_match: |
| | value, from_unit, to_unit = unit_match.groups() |
| | return self._convert_units(float(value), from_unit, to_unit) |
| | |
| | |
| | expression = self._replace_math_expressions(expression) |
| | steps.append(f"With functions: {expression}") |
| | |
| | |
| | if not self._is_safe_expression(expression): |
| | raise ValueError("Expression contains unsafe operations") |
| | |
| | |
| | safe_dict = { |
| | "__builtins__": {}, |
| | **self.safe_functions |
| | } |
| | |
| | |
| | result = eval(expression, safe_dict) |
| | |
| | |
| | if isinstance(result, (int, float)): |
| | if result == int(result): |
| | result = int(result) |
| | result_type = "integer" |
| | else: |
| | result = round(result, 10) |
| | result_type = "float" |
| | else: |
| | result_type = type(result).__name__ |
| | |
| | calc_result = CalculationResult( |
| | expression=original_expression, |
| | result=result, |
| | result_type=result_type, |
| | steps=steps |
| | ) |
| | |
| | return { |
| | "success": True, |
| | "calculation": calc_result.to_dict(), |
| | "message": f"Successfully evaluated: {result}" |
| | } |
| | |
| | except Exception as e: |
| | return { |
| | "success": False, |
| | "expression": expression, |
| | "message": f"Calculation failed: {str(e)}", |
| | "error_type": type(e).__name__ |
| | } |
| | |
| | def _clean_expression(self, expression: str) -> str: |
| | """Clean and normalize mathematical expression""" |
| | |
| | expression = re.sub(r'\s+', ' ', expression.strip()) |
| | |
| | |
| | replacements = { |
| | ' plus ': '+', ' minus ': '-', ' times ': '*', ' multiply ': '*', |
| | ' divided by ': '/', ' divide ': '/', ' power ': '**', ' to the power of ': '**' |
| | } |
| | |
| | for text, symbol in replacements.items(): |
| | expression = expression.replace(text, symbol) |
| | |
| | |
| | expression = re.sub(r'(\d+\.?\d*)%', r'(\1/100)', expression) |
| | |
| | return expression |
| | |
| | def _replace_math_expressions(self, expression: str) -> str: |
| | """Replace mathematical function names with proper calls""" |
| | |
| | expression = re.sub(r'sqrt\s*\(([^)]+)\)', r'sqrt(\1)', expression) |
| | expression = re.sub(r'square root of (\d+\.?\d*)', r'sqrt(\1)', expression) |
| | |
| | |
| | expression = re.sub(r'log\s*\(([^)]+)\)', r'log(\1)', expression) |
| | expression = re.sub(r'ln\s*\(([^)]+)\)', r'log(\1)', expression) |
| | |
| | |
| | trig_functions = ['sin', 'cos', 'tan', 'asin', 'acos', 'atan'] |
| | for func in trig_functions: |
| | expression = re.sub(f'{func}\\s*\\(([^)]+)\\)', f'{func}(\\1)', expression) |
| | |
| | return expression |
| | |
| | def _is_safe_expression(self, expression: str) -> bool: |
| | """Check if expression is safe to evaluate""" |
| | |
| | forbidden_patterns = [ |
| | r'__.*__', |
| | r'import\s', |
| | r'exec\s*\(', |
| | r'eval\s*\(', |
| | r'open\s*\(', |
| | r'file\s*\(', |
| | r'input\s*\(', |
| | r'raw_input\s*\(', |
| | ] |
| | |
| | for pattern in forbidden_patterns: |
| | if re.search(pattern, expression, re.IGNORECASE): |
| | return False |
| | |
| | return True |
| | |
| | def _calculate_statistics(self, data: List[float]) -> Dict[str, Any]: |
| | """Calculate statistical measures for a dataset""" |
| | try: |
| | if not data: |
| | raise ValueError("Empty dataset provided") |
| | |
| | data = [float(x) for x in data] |
| | |
| | stats = { |
| | "count": len(data), |
| | "sum": sum(data), |
| | "mean": statistics.mean(data), |
| | "median": statistics.median(data), |
| | "min": min(data), |
| | "max": max(data), |
| | "range": max(data) - min(data) |
| | } |
| | |
| | |
| | if len(data) > 1: |
| | stats["stdev"] = statistics.stdev(data) |
| | stats["variance"] = statistics.variance(data) |
| | |
| | |
| | try: |
| | stats["mode"] = statistics.mode(data) |
| | except statistics.StatisticsError: |
| | stats["mode"] = "No unique mode" |
| | |
| | return { |
| | "success": True, |
| | "statistics": stats, |
| | "data": data, |
| | "message": f"Calculated statistics for {len(data)} data points" |
| | } |
| | |
| | except Exception as e: |
| | return { |
| | "success": False, |
| | "message": f"Statistics calculation failed: {str(e)}", |
| | "error_type": type(e).__name__ |
| | } |
| | |
| | def _convert_units(self, value: float, from_unit: str, to_unit: str) -> Dict[str, Any]: |
| | """Convert between different units""" |
| | try: |
| | from_unit = from_unit.lower() |
| | to_unit = to_unit.lower() |
| | |
| | |
| | unit_type = None |
| | for utype, units in self.unit_conversions.items(): |
| | if from_unit in units and to_unit in units: |
| | unit_type = utype |
| | break |
| | |
| | if not unit_type: |
| | raise ValueError(f"Cannot convert between {from_unit} and {to_unit}") |
| | |
| | |
| | if unit_type == 'temperature': |
| | result = self._convert_temperature(value, from_unit, to_unit) |
| | else: |
| | |
| | from_factor = self.unit_conversions[unit_type][from_unit] |
| | to_factor = self.unit_conversions[unit_type][to_unit] |
| | result = value * from_factor / to_factor |
| | |
| | |
| | if result == int(result): |
| | result = int(result) |
| | else: |
| | result = round(result, 6) |
| | |
| | conversion_result = CalculationResult( |
| | expression=f"{value} {from_unit} to {to_unit}", |
| | result=result, |
| | result_type="conversion", |
| | steps=[ |
| | f"Convert {value} {from_unit} to {to_unit}", |
| | f"Result: {result} {to_unit}" |
| | ], |
| | units=to_unit |
| | ) |
| | |
| | return { |
| | "success": True, |
| | "conversion": conversion_result.to_dict(), |
| | "message": f"Converted {value} {from_unit} = {result} {to_unit}" |
| | } |
| | |
| | except Exception as e: |
| | return { |
| | "success": False, |
| | "message": f"Unit conversion failed: {str(e)}", |
| | "error_type": type(e).__name__ |
| | } |
| | |
| | def _convert_temperature(self, value: float, from_unit: str, to_unit: str) -> float: |
| | """Convert temperature between Celsius, Fahrenheit, and Kelvin""" |
| | |
| | unit_map = {'c': 'celsius', 'f': 'fahrenheit', 'k': 'kelvin'} |
| | from_unit = unit_map.get(from_unit, from_unit) |
| | to_unit = unit_map.get(to_unit, to_unit) |
| | |
| | |
| | if from_unit == 'fahrenheit': |
| | celsius = (value - 32) * 5/9 |
| | elif from_unit == 'kelvin': |
| | celsius = value - 273.15 |
| | else: |
| | celsius = value |
| | |
| | |
| | if to_unit == 'fahrenheit': |
| | return celsius * 9/5 + 32 |
| | elif to_unit == 'kelvin': |
| | return celsius + 273.15 |
| | else: |
| | return celsius |
| |
|
| | def test_calculator_tool(): |
| | """Test the calculator tool with various operations""" |
| | tool = CalculatorTool() |
| | |
| | |
| | test_cases = [ |
| | "2 + 3 * 4", |
| | "sqrt(16) + 2^3", |
| | "sin(pi/2) + cos(0)", |
| | {"operation": "statistics", "data": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]}, |
| | {"operation": "convert", "value": 100, "from_unit": "cm", "to_unit": "m"}, |
| | {"operation": "convert", "value": 32, "from_unit": "f", "to_unit": "c"}, |
| | "10 factorial", |
| | "mean([1, 2, 3, 4, 5])", |
| | "15% of 200" |
| | ] |
| | |
| | print("🧪 Testing Calculator Tool...") |
| | |
| | for i, test_case in enumerate(test_cases, 1): |
| | print(f"\n--- Test {i}: {test_case} ---") |
| | try: |
| | result = tool.execute(test_case) |
| | |
| | if result.success: |
| | if 'calculation' in result.result: |
| | calc = result.result['calculation'] |
| | print(f"✅ Result: {calc['result']} ({calc['result_type']})") |
| | elif 'statistics' in result.result: |
| | stats = result.result['statistics'] |
| | print(f"✅ Mean: {stats['mean']}, Median: {stats['median']}, StDev: {stats.get('stdev', 'N/A')}") |
| | elif 'conversion' in result.result: |
| | conv = result.result['conversion'] |
| | print(f"✅ Conversion: {conv['result']} {conv['units']}") |
| | print(f" Message: {result.result.get('message', 'No message')}") |
| | else: |
| | print(f"❌ Error: {result.result.get('message', 'Unknown error')}") |
| | |
| | print(f" Execution time: {result.execution_time:.3f}s") |
| | |
| | except Exception as e: |
| | print(f"❌ Exception: {str(e)}") |
| |
|
| | if __name__ == "__main__": |
| | |
| | test_calculator_tool() |