PraisonAI / app /tool_executor.py
Sanyam400's picture
Upload 2 files
0b7e02a verified
"""
Real Tool Executor
==================
Actually installs packages and executes tool code in a real Python subprocess.
No simulation β€” real results only.
"""
import os
import sys
import json
import subprocess
import hashlib
import tempfile
import threading
from pathlib import Path
# Persistent package cache dir (survives restarts within same container session)
PKG_DIR = Path(os.environ.get("HOME", "/home/user")) / ".praison_pkgs"
PKG_DIR.mkdir(parents=True, exist_ok=True)
_install_lock = threading.Lock()
_installed_cache: set = set() # packages confirmed installed this session
# ── Pre-installed packages that are always available ──────────────────────────
PREINSTALLED = {
"requests", "json", "os", "sys", "re", "math", "datetime", "time",
"urllib", "urllib3", "base64", "hashlib", "pathlib", "collections",
"itertools", "functools", "string", "random", "uuid", "tempfile",
"subprocess", "threading", "asyncio", "io", "csv", "html", "http",
"duckduckgo_search", "duckduckgo-search",
"bs4", "beautifulsoup4", "httpx", "gtts",
}
# ── Package name normalisation (import name -> pip name) ─────────────────────
PKG_ALIASES = {
"bs4": "beautifulsoup4",
"duckduckgo_search": "duckduckgo-search",
"PIL": "Pillow",
"cv2": "opencv-python-headless",
"sklearn": "scikit-learn",
"yaml": "pyyaml",
"dotenv": "python-dotenv",
"telegram": "python-telegram-bot",
"wikipedia": "wikipedia-api",
"googlesearch": "googlesearch-python",
"forex_python": "forex-python",
"yfinance": "yfinance",
"pandas": "pandas",
"numpy": "numpy",
"matplotlib": "matplotlib",
}
def pip_install(packages: list[str]) -> tuple[bool, str]:
"""Install one or more packages. Returns (success, message)."""
to_install = []
for pkg in packages:
norm = pkg.strip().lower().replace("-", "_")
pip_name = PKG_ALIASES.get(pkg, PKG_ALIASES.get(norm, pkg))
if pip_name.lower().replace("-","_") not in _installed_cache:
to_install.append(pip_name)
if not to_install:
return True, "All packages already installed"
with _install_lock:
cmd = [sys.executable, "-m", "pip", "install", "--quiet",
"--target", str(PKG_DIR)] + to_install
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
if result.returncode == 0:
for p in to_install:
_installed_cache.add(p.lower().replace("-","_"))
return True, f"Installed: {', '.join(to_install)}"
else:
return False, result.stderr[-500:]
def build_exec_wrapper(tool_code: str, function_name: str, call_args: dict) -> str:
"""Wrap tool code in a self-contained script that prints JSON result."""
pkg_dir_str = str(PKG_DIR)
args_repr = json.dumps(call_args)
return f'''
import sys, json
sys.path.insert(0, {repr(pkg_dir_str)})
# ── Tool implementation ──
{tool_code}
# ── Execute and report ──
try:
import inspect
fn = {function_name}
sig = inspect.signature(fn)
call_kwargs = json.loads({repr(args_repr)})
# If single positional string arg, handle gracefully
params = list(sig.parameters.keys())
if params and not call_kwargs:
call_kwargs = {{params[0]: ""}}
result = fn(**call_kwargs)
print(json.dumps({{"ok": True, "result": str(result)}}))
except Exception as e:
import traceback
print(json.dumps({{"ok": False, "error": str(e), "trace": traceback.format_exc()[-300:]}}))
'''
def execute_tool(
tool_code: str,
function_name: str,
call_args: dict,
required_packages: list[str] | None = None,
timeout: int = 30,
) -> dict:
"""
Actually execute a tool's Python code.
Returns {"ok": bool, "result": str, "install_msg": str, "error": str}
"""
install_msg = ""
# Install required packages first
if required_packages:
ok, msg = pip_install(required_packages)
install_msg = msg
if not ok:
return {"ok": False, "result": "", "install_msg": install_msg,
"error": f"Package install failed: {msg}"}
# Write to temp file and execute
script = build_exec_wrapper(tool_code, function_name, call_args)
with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False, encoding="utf-8") as f:
f.write(script)
tmp_path = f.name
try:
env = os.environ.copy()
env["PYTHONPATH"] = str(PKG_DIR) + os.pathsep + env.get("PYTHONPATH", "")
proc = subprocess.run(
[sys.executable, tmp_path],
capture_output=True, text=True, timeout=timeout, env=env
)
raw_out = proc.stdout.strip()
raw_err = proc.stderr.strip()
if raw_out:
try:
data = json.loads(raw_out.split("\n")[-1])
data["install_msg"] = install_msg
data["stderr"] = raw_err[-300:] if raw_err else ""
return data
except json.JSONDecodeError:
return {"ok": True, "result": raw_out[:2000], "install_msg": install_msg, "stderr": raw_err}
else:
return {"ok": False, "result": "", "install_msg": install_msg,
"error": raw_err[:500] or "No output produced"}
except subprocess.TimeoutExpired:
return {"ok": False, "result": "", "install_msg": install_msg,
"error": f"Tool timed out after {timeout}s"}
except Exception as e:
return {"ok": False, "result": "", "install_msg": install_msg, "error": str(e)}
finally:
try:
os.unlink(tmp_path)
except Exception:
pass
# ── Built-in real tools (always work, no install needed) ─────────────────────
BUILTIN_TOOL_IMPLEMENTATIONS = {
"get_current_datetime": {
"packages": [],
"code": """
import datetime
def get_current_datetime() -> str:
now = datetime.datetime.now()
utc = datetime.datetime.utcnow()
return (f"Local datetime: {now.strftime('%A, %B %d, %Y at %I:%M:%S %p')}\\n"
f"UTC datetime: {utc.strftime('%Y-%m-%d %H:%M:%S')} UTC\\n"
f"Unix timestamp: {int(now.timestamp())}\\n"
f"Timezone: {datetime.datetime.now().astimezone().tzname()}")
""",
"args": {},
},
"search_web": {
"packages": ["duckduckgo-search"],
"code": """
import sys
sys.path.insert(0, '__PKG_DIR__')
def search_web(query: str, max_results: int = 6) -> str:
from duckduckgo_search import DDGS
results = []
with DDGS() as ddgs:
for r in ddgs.text(query, max_results=max_results):
results.append(f"Title: {r['title']}\\nURL: {r['href']}\\nSummary: {r['body']}\\n")
return '\\n---\\n'.join(results) if results else 'No results found'
""".replace("__PKG_DIR__", str(PKG_DIR)),
"args": {"query": ""},
},
"fetch_webpage": {
"packages": ["requests", "beautifulsoup4"],
"code": """
import sys
sys.path.insert(0, '__PKG_DIR__')
def fetch_webpage(url: str) -> str:
import requests
from bs4 import BeautifulSoup
try:
r = requests.get(url, timeout=10, headers={'User-Agent': 'Mozilla/5.0'})
soup = BeautifulSoup(r.text, 'html.parser')
for tag in soup(['script','style','nav','footer','header']):
tag.decompose()
text = soup.get_text(separator='\\n', strip=True)
lines = [l for l in text.splitlines() if len(l.strip()) > 20]
return '\\n'.join(lines[:150])
except Exception as e:
return f'Error fetching {url}: {e}'
""".replace("__PKG_DIR__", str(PKG_DIR)),
"args": {"url": ""},
},
"run_python_code": {
"packages": [],
"code": """
import subprocess, sys, tempfile, os
def run_python_code(code: str) -> str:
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write(code)
tmp = f.name
try:
result = subprocess.run([sys.executable, tmp], capture_output=True, text=True, timeout=20)
out = (result.stdout + result.stderr).strip()
return out[:3000] if out else '(no output)'
except subprocess.TimeoutExpired:
return 'Error: timed out after 20s'
finally:
try: os.unlink(tmp)
except: pass
""",
"args": {"code": ""},
},
"create_voice": {
"packages": ["gtts"],
"code": """
import sys, base64, io
sys.path.insert(0, '__PKG_DIR__')
def create_voice(text: str, lang: str = 'en') -> str:
from gtts import gTTS
tts = gTTS(text=text[:2000], lang=lang, slow=False)
buf = io.BytesIO()
tts.write_to_fp(buf)
buf.seek(0)
b64 = base64.b64encode(buf.read()).decode('utf-8')
return 'AUDIO_B64:' + b64
""".replace("__PKG_DIR__", str(PKG_DIR)),
"args": {"text": ""},
},
"calculate": {
"packages": [],
"code": """
import math
def calculate(expression: str) -> str:
safe_ns = {k: getattr(math, k) for k in dir(math) if not k.startswith('_')}
safe_ns['__builtins__'] = {}
try:
result = eval(expression.replace('^','**'), safe_ns)
return f'{expression} = {result}'
except Exception as e:
return f'Error: {e}'
""",
"args": {"expression": ""},
},
}
def run_builtin_tool(name: str, user_message: str = "") -> dict:
"""Execute a real built-in tool."""
spec = BUILTIN_TOOL_IMPLEMENTATIONS.get(name)
if not spec:
return {"ok": False, "result": "", "error": f"Unknown built-in: {name}"}
args = dict(spec["args"])
# Fill the first string arg with user_message if empty
for k in args:
if args[k] == "" and user_message:
args[k] = user_message
break
return execute_tool(spec["code"], name, args, spec["packages"], timeout=45)