| """ |
| Real Tool Executor |
| ================== |
| Actually installs packages and executes tool code in a real Python subprocess. |
| No simulation β real results only. |
| """ |
| import os |
| import sys |
| import json |
| import subprocess |
| import hashlib |
| import tempfile |
| import threading |
| from pathlib import Path |
|
|
| |
| PKG_DIR = Path(os.environ.get("HOME", "/home/user")) / ".praison_pkgs" |
| PKG_DIR.mkdir(parents=True, exist_ok=True) |
|
|
| _install_lock = threading.Lock() |
| _installed_cache: set = set() |
|
|
|
|
| |
| PREINSTALLED = { |
| "requests", "json", "os", "sys", "re", "math", "datetime", "time", |
| "urllib", "urllib3", "base64", "hashlib", "pathlib", "collections", |
| "itertools", "functools", "string", "random", "uuid", "tempfile", |
| "subprocess", "threading", "asyncio", "io", "csv", "html", "http", |
| "duckduckgo_search", "duckduckgo-search", |
| "bs4", "beautifulsoup4", "httpx", "gtts", |
| } |
|
|
| |
| PKG_ALIASES = { |
| "bs4": "beautifulsoup4", |
| "duckduckgo_search": "duckduckgo-search", |
| "PIL": "Pillow", |
| "cv2": "opencv-python-headless", |
| "sklearn": "scikit-learn", |
| "yaml": "pyyaml", |
| "dotenv": "python-dotenv", |
| "telegram": "python-telegram-bot", |
| "wikipedia": "wikipedia-api", |
| "googlesearch": "googlesearch-python", |
| "forex_python": "forex-python", |
| "yfinance": "yfinance", |
| "pandas": "pandas", |
| "numpy": "numpy", |
| "matplotlib": "matplotlib", |
| } |
|
|
|
|
| def pip_install(packages: list[str]) -> tuple[bool, str]: |
| """Install one or more packages. Returns (success, message).""" |
| to_install = [] |
| for pkg in packages: |
| norm = pkg.strip().lower().replace("-", "_") |
| pip_name = PKG_ALIASES.get(pkg, PKG_ALIASES.get(norm, pkg)) |
| if pip_name.lower().replace("-","_") not in _installed_cache: |
| to_install.append(pip_name) |
|
|
| if not to_install: |
| return True, "All packages already installed" |
|
|
| with _install_lock: |
| cmd = [sys.executable, "-m", "pip", "install", "--quiet", |
| "--target", str(PKG_DIR)] + to_install |
| result = subprocess.run(cmd, capture_output=True, text=True, timeout=120) |
| if result.returncode == 0: |
| for p in to_install: |
| _installed_cache.add(p.lower().replace("-","_")) |
| return True, f"Installed: {', '.join(to_install)}" |
| else: |
| return False, result.stderr[-500:] |
|
|
|
|
| def build_exec_wrapper(tool_code: str, function_name: str, call_args: dict) -> str: |
| """Wrap tool code in a self-contained script that prints JSON result.""" |
| pkg_dir_str = str(PKG_DIR) |
| args_repr = json.dumps(call_args) |
| return f''' |
| import sys, json |
| sys.path.insert(0, {repr(pkg_dir_str)}) |
| |
| # ββ Tool implementation ββ |
| {tool_code} |
| |
| # ββ Execute and report ββ |
| try: |
| import inspect |
| fn = {function_name} |
| sig = inspect.signature(fn) |
| call_kwargs = json.loads({repr(args_repr)}) |
| # If single positional string arg, handle gracefully |
| params = list(sig.parameters.keys()) |
| if params and not call_kwargs: |
| call_kwargs = {{params[0]: ""}} |
| result = fn(**call_kwargs) |
| print(json.dumps({{"ok": True, "result": str(result)}})) |
| except Exception as e: |
| import traceback |
| print(json.dumps({{"ok": False, "error": str(e), "trace": traceback.format_exc()[-300:]}})) |
| ''' |
|
|
|
|
| def execute_tool( |
| tool_code: str, |
| function_name: str, |
| call_args: dict, |
| required_packages: list[str] | None = None, |
| timeout: int = 30, |
| ) -> dict: |
| """ |
| Actually execute a tool's Python code. |
| Returns {"ok": bool, "result": str, "install_msg": str, "error": str} |
| """ |
| install_msg = "" |
|
|
| |
| if required_packages: |
| ok, msg = pip_install(required_packages) |
| install_msg = msg |
| if not ok: |
| return {"ok": False, "result": "", "install_msg": install_msg, |
| "error": f"Package install failed: {msg}"} |
|
|
| |
| script = build_exec_wrapper(tool_code, function_name, call_args) |
| with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False, encoding="utf-8") as f: |
| f.write(script) |
| tmp_path = f.name |
|
|
| try: |
| env = os.environ.copy() |
| env["PYTHONPATH"] = str(PKG_DIR) + os.pathsep + env.get("PYTHONPATH", "") |
| proc = subprocess.run( |
| [sys.executable, tmp_path], |
| capture_output=True, text=True, timeout=timeout, env=env |
| ) |
| raw_out = proc.stdout.strip() |
| raw_err = proc.stderr.strip() |
|
|
| if raw_out: |
| try: |
| data = json.loads(raw_out.split("\n")[-1]) |
| data["install_msg"] = install_msg |
| data["stderr"] = raw_err[-300:] if raw_err else "" |
| return data |
| except json.JSONDecodeError: |
| return {"ok": True, "result": raw_out[:2000], "install_msg": install_msg, "stderr": raw_err} |
| else: |
| return {"ok": False, "result": "", "install_msg": install_msg, |
| "error": raw_err[:500] or "No output produced"} |
|
|
| except subprocess.TimeoutExpired: |
| return {"ok": False, "result": "", "install_msg": install_msg, |
| "error": f"Tool timed out after {timeout}s"} |
| except Exception as e: |
| return {"ok": False, "result": "", "install_msg": install_msg, "error": str(e)} |
| finally: |
| try: |
| os.unlink(tmp_path) |
| except Exception: |
| pass |
|
|
|
|
| |
|
|
| BUILTIN_TOOL_IMPLEMENTATIONS = { |
| "get_current_datetime": { |
| "packages": [], |
| "code": """ |
| import datetime |
| def get_current_datetime() -> str: |
| now = datetime.datetime.now() |
| utc = datetime.datetime.utcnow() |
| return (f"Local datetime: {now.strftime('%A, %B %d, %Y at %I:%M:%S %p')}\\n" |
| f"UTC datetime: {utc.strftime('%Y-%m-%d %H:%M:%S')} UTC\\n" |
| f"Unix timestamp: {int(now.timestamp())}\\n" |
| f"Timezone: {datetime.datetime.now().astimezone().tzname()}") |
| """, |
| "args": {}, |
| }, |
| "search_web": { |
| "packages": ["duckduckgo-search"], |
| "code": """ |
| import sys |
| sys.path.insert(0, '__PKG_DIR__') |
| def search_web(query: str, max_results: int = 6) -> str: |
| from duckduckgo_search import DDGS |
| results = [] |
| with DDGS() as ddgs: |
| for r in ddgs.text(query, max_results=max_results): |
| results.append(f"Title: {r['title']}\\nURL: {r['href']}\\nSummary: {r['body']}\\n") |
| return '\\n---\\n'.join(results) if results else 'No results found' |
| """.replace("__PKG_DIR__", str(PKG_DIR)), |
| "args": {"query": ""}, |
| }, |
| "fetch_webpage": { |
| "packages": ["requests", "beautifulsoup4"], |
| "code": """ |
| import sys |
| sys.path.insert(0, '__PKG_DIR__') |
| def fetch_webpage(url: str) -> str: |
| import requests |
| from bs4 import BeautifulSoup |
| try: |
| r = requests.get(url, timeout=10, headers={'User-Agent': 'Mozilla/5.0'}) |
| soup = BeautifulSoup(r.text, 'html.parser') |
| for tag in soup(['script','style','nav','footer','header']): |
| tag.decompose() |
| text = soup.get_text(separator='\\n', strip=True) |
| lines = [l for l in text.splitlines() if len(l.strip()) > 20] |
| return '\\n'.join(lines[:150]) |
| except Exception as e: |
| return f'Error fetching {url}: {e}' |
| """.replace("__PKG_DIR__", str(PKG_DIR)), |
| "args": {"url": ""}, |
| }, |
| "run_python_code": { |
| "packages": [], |
| "code": """ |
| import subprocess, sys, tempfile, os |
| def run_python_code(code: str) -> str: |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f: |
| f.write(code) |
| tmp = f.name |
| try: |
| result = subprocess.run([sys.executable, tmp], capture_output=True, text=True, timeout=20) |
| out = (result.stdout + result.stderr).strip() |
| return out[:3000] if out else '(no output)' |
| except subprocess.TimeoutExpired: |
| return 'Error: timed out after 20s' |
| finally: |
| try: os.unlink(tmp) |
| except: pass |
| """, |
| "args": {"code": ""}, |
| }, |
| "create_voice": { |
| "packages": ["gtts"], |
| "code": """ |
| import sys, base64, io |
| sys.path.insert(0, '__PKG_DIR__') |
| def create_voice(text: str, lang: str = 'en') -> str: |
| from gtts import gTTS |
| tts = gTTS(text=text[:2000], lang=lang, slow=False) |
| buf = io.BytesIO() |
| tts.write_to_fp(buf) |
| buf.seek(0) |
| b64 = base64.b64encode(buf.read()).decode('utf-8') |
| return 'AUDIO_B64:' + b64 |
| """.replace("__PKG_DIR__", str(PKG_DIR)), |
| "args": {"text": ""}, |
| }, |
| "calculate": { |
| "packages": [], |
| "code": """ |
| import math |
| def calculate(expression: str) -> str: |
| safe_ns = {k: getattr(math, k) for k in dir(math) if not k.startswith('_')} |
| safe_ns['__builtins__'] = {} |
| try: |
| result = eval(expression.replace('^','**'), safe_ns) |
| return f'{expression} = {result}' |
| except Exception as e: |
| return f'Error: {e}' |
| """, |
| "args": {"expression": ""}, |
| }, |
| } |
|
|
|
|
| def run_builtin_tool(name: str, user_message: str = "") -> dict: |
| """Execute a real built-in tool.""" |
| spec = BUILTIN_TOOL_IMPLEMENTATIONS.get(name) |
| if not spec: |
| return {"ok": False, "result": "", "error": f"Unknown built-in: {name}"} |
| args = dict(spec["args"]) |
| |
| for k in args: |
| if args[k] == "" and user_message: |
| args[k] = user_message |
| break |
| return execute_tool(spec["code"], name, args, spec["packages"], timeout=45) |
|
|