|
|
import json
|
|
|
import re
|
|
|
import math
|
|
|
import cmath
|
|
|
from typing import Dict, List, Optional, Any
|
|
|
import requests
|
|
|
from dataclasses import dataclass
|
|
|
|
|
|
# SymPy, NumPy and SciPy are optional. Record whether they imported so that
# MathEngine.query can return a clear "libraries not available" error instead
# of failing on first use of ``sp`` / ``np``.
try:
    import sympy as sp
    import numpy as np
    from scipy import optimize, integrate, stats
    MATH_LIBS_AVAILABLE = True
except ImportError:
    MATH_LIBS_AVAILABLE = False
    print("Warning: Install sympy, numpy, scipy for enhanced math capabilities: pip install sympy numpy scipy")
|
|
|
|
|
|
|
|
|
@dataclass
class ToolCall:
    """Record of a single tool invocation and its outcome.

    ``result`` and ``error`` both start as ``None``; whichever code executes
    the call is expected to fill in one of them.
    """

    tool: str  # identifier of the tool to run (e.g. "math_engine", "serper")
    query: str  # raw query text handed to the tool
    result: Optional[str] = None  # formatted output when the call succeeded
    error: Optional[str] = None  # error message when the call failed
|
|
|
|
|
|
|
|
|
class MathEngine:
    """Free mathematical computation engine using SymPy, NumPy, SciPy.

    The individual operations return human-readable strings and report
    failures in-band (``"Error ..."``) rather than raising. ``query`` is the
    natural-language front end that picks an operation from keywords.
    """

    # Spellings accepted by ``basic_math`` that differ from the ``math`` name.
    _MATH_ALIASES = {'ln': 'log'}

    # A bare identifier: not glued to a preceding digit/word character (so the
    # exponent in ``1e5`` is left alone) and not an attribute after a dot
    # (so ``math.sin`` typed by the user is not rewritten twice).
    _IDENTIFIER_RE = re.compile(r'(?<![\w.])[a-z_]\w*')

    # A signed decimal number that is not the tail of a longer word.
    _NUMBER_RE = re.compile(r'(?<![\w.])[-+]?(?:\d+\.?\d*|\.\d+)(?:e[-+]?\d+)?')

    _UNIT_NAMES = r'celsius|fahrenheit|kelvin|meters|feet'
    _UNIT_RE = re.compile(r'\b(' + _UNIT_NAMES + r')\b')
    _TARGET_UNIT_RE = re.compile(r'\b(?:to|into|in)\s+(' + _UNIT_NAMES + r')\b')

    # (from_unit, to_unit) -> (conversion, source suffix, target suffix)
    _CONVERSIONS = {
        ('celsius', 'fahrenheit'): (lambda v: (v * 9 / 5) + 32, '°C', '°F'),
        ('fahrenheit', 'celsius'): (lambda v: (v - 32) * 5 / 9, '°F', '°C'),
        ('celsius', 'kelvin'): (lambda v: v + 273.15, '°C', 'K'),
        ('kelvin', 'celsius'): (lambda v: v - 273.15, 'K', '°C'),
        ('meters', 'feet'): (lambda v: v * 3.28084, 'm', 'ft'),
        ('feet', 'meters'): (lambda v: v / 3.28084, 'ft', 'm'),
    }

    # operation keyword -> (label used in the output, NumPy function name).
    # Function names are looked up lazily so the class can still be defined
    # when NumPy failed to import.
    _STAT_OPS = {
        'mean': ('Mean', 'mean'),
        'average': ('Mean', 'mean'),
        'median': ('Median', 'median'),
        'std': ('Standard deviation', 'std'),
        'standard deviation': ('Standard deviation', 'std'),
        'variance': ('Variance', 'var'),
    }

    # Keywords that select the statistics branch, in match priority order.
    _STAT_KEYWORDS = ('mean', 'average', 'median', 'standard deviation', 'std', 'variance')

    def __init__(self):
        self.available = MATH_LIBS_AVAILABLE

    def solve_equation(self, equation_str: str) -> str:
        """Solve an equation, or simplify an expression if there is no ``=``.

        Args:
            equation_str: Text such as ``"x**2 = 4"`` or ``"sin(x)**2 + cos(x)**2"``.
                ``==`` is accepted as a synonym for ``=``.

        Returns:
            ``"Solutions: [...]"`` for equations, ``"Result: ..."`` for plain
            expressions, or ``"Error solving equation: ..."`` on failure.
        """
        try:
            # SECURITY NOTE: sympify() evaluates its input with eval(); do not
            # expose this method to untrusted text without sanitising it first.
            normalized = equation_str.replace('==', '=')
            if '=' not in normalized:
                simplified = sp.simplify(sp.sympify(normalized))
                return f"Result: {simplified}"

            left, _, right = normalized.partition('=')
            if '=' in right:
                raise ValueError("expected exactly one '=' in the equation")
            eq = sp.Eq(sp.sympify(left.strip()), sp.sympify(right.strip()))

            # Solve for x by default; if x does not occur and the equation has
            # exactly one unknown, solve for that unknown instead of
            # returning an empty solution list.
            unknown = sp.Symbol('x')
            free = eq.free_symbols
            if unknown not in free and len(free) == 1:
                unknown = next(iter(free))

            solutions = sp.solve(eq, unknown)
            return f"Solutions: {solutions}"
        except Exception as e:
            return f"Error solving equation: {str(e)}"

    def calculus_operations(self, expression: str, operation: str, variable: str = 'x') -> str:
        """Perform calculus operations (derivative, integral, limit).

        Args:
            expression: Expression text parsed with ``sympy.sympify``.
            operation: One of ``derivative``/``diff``/``differentiate``,
                ``integral``/``integrate`` or ``limit`` (case-insensitive).
            variable: Name of the variable to operate on.

        Returns:
            A sentence describing the result, ``"Unknown calculus operation: ..."``
            for an unsupported operation, or ``"Error in calculus operation: ..."``.
            The limit is always taken as the variable approaches 0.
        """
        try:
            expr = sp.sympify(expression)
            var = sp.Symbol(variable)
            op = operation.lower()

            if op in ('derivative', 'diff', 'differentiate'):
                result = sp.diff(expr, var)
                return f"Derivative of {expression} with respect to {variable}: {result}"
            if op in ('integral', 'integrate'):
                result = sp.integrate(expr, var)
                return f"Integral of {expression} with respect to {variable}: {result}"
            if op == 'limit':
                result = sp.limit(expr, var, 0)
                return f"Limit of {expression} as {variable} approaches 0: {result}"
            return f"Unknown calculus operation: {operation}"
        except Exception as e:
            return f"Error in calculus operation: {str(e)}"

    @staticmethod
    def _qualify_math_name(match) -> str:
        """Prefix an identifier with ``math.`` when the math module defines it."""
        token = match.group(0)
        name = MathEngine._MATH_ALIASES.get(token, token)
        if not name.startswith('_') and hasattr(math, name):
            return 'math.' + name
        return token

    def basic_math(self, expression: str) -> str:
        """Handle basic mathematical calculations.

        The expression is lower-cased, ``^`` is treated as exponentiation and
        any bare name defined by the ``math`` module (``sin``, ``sqrt``,
        ``pi``, ``e``, ``asin``, ...; ``ln`` maps to ``log``) is resolved
        against it. Names are matched as whole identifiers, so literals such
        as ``1e3`` and functions such as ``ceil`` are not mangled.

        Returns:
            ``"Result: <value>"`` or ``"Error in calculation: <reason>"``.
        """
        try:
            safe_expr = expression.lower().replace('^', '**')

            # SECURITY NOTE: eval() with emptied builtins is NOT a sandbox.
            # Rejecting dunder access blocks the usual attribute-walking
            # escape, but untrusted input should go through a real parser.
            if '__' in safe_expr:
                raise ValueError("double underscores are not allowed in expressions")

            safe_expr = self._IDENTIFIER_RE.sub(self._qualify_math_name, safe_expr)
            result = eval(safe_expr, {"__builtins__": {}, "math": math, "cmath": cmath})
            return f"Result: {result}"
        except Exception as e:
            return f"Error in calculation: {str(e)}"

    def statistics_operations(self, data_str: str, operation: str) -> str:
        """Perform statistical calculations.

        Args:
            data_str: Comma-separated numbers, optionally wrapped in ``[]``.
                Empty items (e.g. from a trailing comma) are ignored.
            operation: ``mean``/``average``, ``median``,
                ``std``/``standard deviation`` or ``variance``.

        Returns:
            ``"<Label> of <data>: <value>"``, an "Unknown statistical
            operation" message, or ``"Error in statistical calculation: ..."``.
            Standard deviation and variance use NumPy's defaults
            (``np.std`` / ``np.var`` without ``ddof``).
        """
        try:
            tokens = data_str.replace('[', '').replace(']', '').split(',')
            data = [float(tok) for tok in (t.strip() for t in tokens) if tok]
            if not data:
                return "Error in statistical calculation: no numeric data provided"

            spec = self._STAT_OPS.get(operation.lower())
            if spec is None:
                return f"Unknown statistical operation: {operation}"

            label, func_name = spec
            result = float(getattr(np, func_name)(data))
            return f"{label} of {data}: {result}"
        except Exception as e:
            return f"Error in statistical calculation: {str(e)}"

    def unit_conversion(self, value: float, from_unit: str, to_unit: str) -> str:
        """Convert between common units.

        Supported pairs (case-insensitive): celsius<->fahrenheit,
        celsius<->kelvin, meters<->feet.

        Returns:
            ``"<value><unit> = <result><unit>"``, a "not implemented" message
            for other pairs, or ``"Error in unit conversion: ..."``.
        """
        try:
            entry = self._CONVERSIONS.get((from_unit.lower(), to_unit.lower()))
            if entry is None:
                return f"Unit conversion not implemented: {from_unit} to {to_unit}"
            convert, src_suffix, dst_suffix = entry
            result = convert(value)
            return f"{value}{src_suffix} = {result}{dst_suffix}"
        except Exception as e:
            return f"Error in unit conversion: {str(e)}"

    @staticmethod
    def _strip_sentence_punctuation(text: str) -> str:
        """Trim whitespace and trailing ``?``, ``!`` or ``.`` from a fragment."""
        return text.strip().rstrip('?!.').strip()

    def _run_calculus(self, question_lower: str, operation: str, keywords) -> str:
        """Extract expression and optional variable from a question and run it."""
        pieces = re.split(r'\bof\b', question_lower)
        if len(pieces) > 1:
            target = pieces[-1]
        else:
            # No "of": drop the trigger word itself ("differentiate x**2").
            target = question_lower
            for keyword in keywords:
                target = target.replace(keyword, ' ')

        expr_text, marker, var_text = target.partition('with respect to')
        expr_text = self._strip_sentence_punctuation(expr_text)
        if marker:
            variable = self._strip_sentence_punctuation(var_text) or 'x'
            return self.calculus_operations(expr_text, operation, variable)
        return self.calculus_operations(expr_text, operation)

    def _equation_text(self, question: str) -> str:
        """Return the equation part of a "solve ..." question, original case kept."""
        # Split case-insensitively: symbol names are case-sensitive, so the
        # equation itself must not be lower-cased.
        text = re.split('solve', question, flags=re.IGNORECASE)[-1]
        text = re.sub(r'^[\s:]*(?:(?:the\s+)?equation\b[\s:]*)?', '', text, flags=re.IGNORECASE)
        return self._strip_sentence_punctuation(text)

    def _conversion_units(self, question_lower: str):
        """Return ``(source, target)`` unit names, or ``(None, None)``."""
        mentioned = self._UNIT_RE.findall(question_lower)
        explicit_target = self._TARGET_UNIT_RE.search(question_lower)
        if explicit_target:
            target = explicit_target.group(1)
        else:
            target = mentioned[-1] if mentioned else None
        source = next((unit for unit in mentioned if unit != target), None)
        if source is None or target is None:
            return None, None
        return source, target

    def query(self, question: str) -> Dict[str, Any]:
        """Main query interface for mathematical questions.

        Picks an operation from keywords in ``question`` (derivative,
        integral, solve/equation, statistics, unit conversion) and falls back
        to plain arithmetic.

        Returns:
            ``{'success': True, 'results': [{'title': ..., 'text': ...}]}`` or
            ``{'success': False, 'error': ...}``. An operation that fails
            internally still yields ``success: True`` with the error message
            as the result text.
        """
        if not self.available:
            return {
                'success': False,
                'error': 'Mathematical libraries not available. Install with: pip install sympy numpy scipy'
            }

        try:
            question_lower = question.lower().strip()
            results = []

            derivative_words = ('derivative', 'differentiate', 'diff')
            integral_words = ('antiderivative', 'integral', 'integrate')

            # "antiderivative" contains "derivative", so it must be excluded
            # here or it would be differentiated instead of integrated.
            if 'antiderivative' not in question_lower and any(
                    word in question_lower for word in derivative_words):
                result = self._run_calculus(question_lower, 'derivative', derivative_words)
                results.append({'title': 'Derivative', 'text': result})

            elif any(word in question_lower for word in integral_words):
                result = self._run_calculus(question_lower, 'integral', integral_words)
                results.append({'title': 'Integral', 'text': result})

            elif any(word in question_lower for word in ('solve', 'equation')):
                result = self.solve_equation(self._equation_text(question))
                results.append({'title': 'Equation Solution', 'text': result})

            elif any(word in question_lower for word in self._STAT_KEYWORDS):
                for op in self._STAT_KEYWORDS:
                    if op in question_lower:
                        # Keep only the numbers so surrounding words
                        # ("what is the ... of") do not break parsing.
                        remainder = question_lower.replace(op, ' ')
                        numbers = self._NUMBER_RE.findall(remainder)
                        result = self.statistics_operations(', '.join(numbers), op)
                        results.append({'title': f'Statistics - {op.title()}', 'text': result})
                        break

            elif any(word in question_lower for word in
                     ('convert', 'to fahrenheit', 'to celsius', 'to kelvin', 'to meters', 'to feet')):
                number = self._NUMBER_RE.search(question_lower)
                if number is None:
                    result = 'Could not parse conversion request'
                else:
                    source, target = self._conversion_units(question_lower)
                    if source and target:
                        result = self.unit_conversion(float(number.group(0)), source, target)
                    else:
                        result = "Unit conversion not recognized"
                results.append({'title': 'Unit Conversion', 'text': result})

            else:
                # Plain arithmetic: strip the request phrasing and punctuation.
                math_expr = question.lower()
                for word in ['calculate', 'compute', 'evaluate', 'what is', 'find', 'test:', 'test']:
                    math_expr = math_expr.replace(word, '').strip()
                math_expr = math_expr.translate(str.maketrans('', '', '?!'))

                result = self.basic_math(math_expr)
                results.append({'title': 'Calculation', 'text': result})

            if results:
                return {
                    'success': True,
                    'results': results
                }
            return {
                'success': False,
                'error': 'Could not process mathematical query'
            }

        except Exception as e:
            return {
                'success': False,
                'error': f'Math engine error: {str(e)}'
            }
|
|
|
|
|
|
|
|
|
class SerperAPI:
    """Small client for the Serper web-search HTTP endpoint."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://google.serper.dev/search"

    def search(self, query: str, num_results: int = 5) -> Dict[str, Any]:
        """Search the web using Serper API.

        Args:
            query: Search text, sent as ``q``.
            num_results: Requested number of results, sent as ``num``.

        Returns:
            On success: ``{'success': True, 'results': [...], 'knowledge_graph':
            dict-or-None, 'search_information': {...}}`` where each result has
            ``title``, ``link``, ``snippet`` and ``date`` keys (empty string
            when absent). On any failure (network, HTTP status, bad JSON):
            ``{'success': False, 'error': 'Serper API error: ...'}``.
        """
        try:
            headers = {
                'X-API-KEY': self.api_key,
                'Content-Type': 'application/json'
            }
            payload = {
                'q': query,
                'num': num_results
            }

            response = requests.post(self.base_url, headers=headers, json=payload, timeout=10)
            response.raise_for_status()
            data = response.json()

            # ``or []`` also covers an explicit JSON null for "organic".
            results = [
                {
                    'title': item.get('title', ''),
                    'link': item.get('link', ''),
                    'snippet': item.get('snippet', ''),
                    'date': item.get('date', '')
                }
                for item in data.get('organic') or []
            ]

            # A null/non-object knowledge graph is treated as absent rather
            # than failing the whole search.
            knowledge_graph = None
            kg = data.get('knowledgeGraph')
            if isinstance(kg, dict):
                knowledge_graph = {
                    'title': kg.get('title', ''),
                    'type': kg.get('type', ''),
                    'description': kg.get('description', ''),
                    'attributes': kg.get('attributes', {})
                }

            return {
                'success': True,
                'results': results,
                'knowledge_graph': knowledge_graph,
                'search_information': data.get('searchInformation', {})
            }

        except Exception as e:
            # Best-effort by design: callers get an error dict, never a raise.
            return {
                'success': False,
                'error': f'Serper API error: {str(e)}'
            }
|
|
|
|
|
|
|
|
|
class ToolOrchestrator:
    """Decides which tool (math engine or web search) fits a query and runs it."""

    # Matched against the lower-cased query.
    # NOTE(review): the bare operator class below fires on any hyphen, slash
    # or parenthesis (e.g. "covid-19"), which looks broader than intended;
    # left unchanged because routing behaviour depends on it.
    _MATH_PATTERNS = tuple(re.compile(pattern) for pattern in (
        r'\b(?:calculate|solve|compute|evaluate|find)\b',
        r'[+\-*/=()]',
        r'\b(?:integral|derivative|limit|sum|product)\b',
        r'\b(?:equation|formula|expression)\b',
        r'\b(?:physics|chemistry|biology|mathematics|calculus|algebra|geometry|trigonometry)\b',
        r'\b(?:mass|energy|force|velocity|acceleration|temperature|pressure)\b',
        r'\b(?:molecular|atomic|quantum|thermodynamic)\b',
        r'\b(?:kg|m/s|joule|newton|pascal|kelvin|celsius|fahrenheit)\b',
        r'\b(?:pi|euler|planck|avogadro|boltzmann)\b',
        r'\d+\s*[\+\-\*/\^]\s*\d+',
        r'\b(?:square root|log|ln|sin|cos|tan|exp)\b',
    ))

    # Keyword patterns, matched case-insensitively against the original query.
    _WEB_PATTERNS = tuple(re.compile(pattern, re.IGNORECASE) for pattern in (
        r'\b(?:current|latest|recent|today|yesterday|this year|2024|2025)\b',
        r'\b(?:news|breaking|update|announcement)\b',
        r'\b(?:when did|what is|who is|where is|how many|what happened)\b',
        r'\b(?:price|cost|stock|market|weather|temperature)\b',
        r'\b(?:company|corporation|startup|CEO|president|politician)\b',
        r'\b(?:movie|film|song|album|book|game|app)\b',
        r'\b(?:restaurant|hotel|store|hospital|university|airport)\b',
        r'\b(?:near me|located in)\b',
    ))

    # "in <Capitalised word>" only means something with the original casing,
    # so it is matched case-sensitively against the unmodified query.
    _PLACE_PATTERN = re.compile(r'\bin [A-Z][a-z]+\b')

    def __init__(self, serper_api_key: Optional[str] = None):
        self.math_engine = MathEngine()
        self.serper = SerperAPI(serper_api_key) if serper_api_key else None

    def should_use_math_engine(self, query: str) -> bool:
        """Determine if query should be routed to the math engine.

        Returns True when the lower-cased query matches any math indicator
        (operators, calculation verbs, scientific terms, units, constants).
        """
        query_lower = query.lower()
        return any(pattern.search(query_lower) for pattern in self._MATH_PATTERNS)

    def should_use_serper(self, query: str) -> bool:
        """Determine if query should be routed to Serper for web search.

        Returns True when the query contains a recency, news, factual-question,
        price/weather, entity or location indicator. Keywords are matched
        case-insensitively; the "in <Place>" indicator requires a capitalised
        word in the original query.
        """
        if any(pattern.search(query) for pattern in self._WEB_PATTERNS):
            return True
        return self._PLACE_PATTERN.search(query) is not None

    def execute_tool_call(self, tool_call: "ToolCall") -> "ToolCall":
        """Execute a tool call and return the result.

        The passed ``tool_call`` is updated in place (``result`` on success,
        ``error`` otherwise) and returned. Exceptions are captured into
        ``error`` rather than propagated.
        """
        try:
            if tool_call.tool == "math_engine" and self.math_engine:
                outcome = self.math_engine.query(tool_call.query)
                if outcome['success']:
                    tool_call.result = "\n".join(
                        f"{r['title']}: {r['text']}" for r in outcome['results']
                    )
                else:
                    tool_call.error = outcome['error']

            elif tool_call.tool == "serper" and self.serper:
                outcome = self.serper.search(tool_call.query)
                if outcome['success']:
                    lines = []

                    # Knowledge-graph summary first, when the search returned one.
                    kg = outcome['knowledge_graph']
                    if kg:
                        lines.append(f"**{kg['title']}**")
                        if kg['description']:
                            lines.append(kg['description'])
                        lines.append("")

                    # Only the top three organic results are shown.
                    for position, r in enumerate(outcome['results'][:3], start=1):
                        lines.append(f"{position}. **{r['title']}**")
                        lines.append(f" {r['snippet']}")
                        lines.append("")

                    tool_call.result = "\n".join(lines)
                else:
                    tool_call.error = outcome['error']

            else:
                tool_call.error = f"Tool '{tool_call.tool}' not available or configured"

        except Exception as e:
            tool_call.error = f"Tool execution error: {str(e)}"

        return tool_call

    def route_query(self, query: str) -> "Optional[ToolCall]":
        """Determine which tool to use for a query, if any.

        The math engine takes precedence over web search when both match.
        Returns ``None`` when neither set of indicators matches.
        """
        if self.should_use_math_engine(query):
            return ToolCall(tool="math_engine", query=query)
        if self.should_use_serper(query):
            return ToolCall(tool="serper", query=query)
        return None
|
|
|
|