Spaces:
Running
Running
| """ | |
| NeuraPrompt Agent — Code Execution Tools (v9.0) | |
| Upgraded: output files saved to /agent/download/ with download URLs | |
| """ | |
| import subprocess | |
| import tempfile | |
| import os | |
| import re | |
| import uuid | |
| from pathlib import Path | |
| import logging | |
| log = logging.getLogger("agent.tools.code") | |
| # Public output folder — served by FastAPI static files | |
| OUTPUT_DIR = Path("/tmp/agent_outputs") | |
| OUTPUT_DIR.mkdir(parents=True, exist_ok=True) | |
| BASE_URL = os.getenv("SPACE_HOST", "https://deepimagix-self-trained2.hf.space") | |
| def _get_output_url(filename: str) -> str: | |
| return f"{BASE_URL}/agent/download/{filename}" | |
| def run_python(code: str) -> str: | |
| """ | |
| Execute Python code safely. | |
| If code creates files, saves them to public output folder and returns download URLs. | |
| """ | |
| try: | |
| # Inject output dir so agent-generated files land in the right place | |
| preamble = f""" | |
| import os, sys | |
| _OUTPUT_DIR = "{OUTPUT_DIR}" | |
| os.makedirs(_OUTPUT_DIR, exist_ok=True) | |
| os.chdir(_OUTPUT_DIR) | |
| """ | |
| full_code = preamble + "\n" + code | |
| with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f: | |
| f.write(full_code) | |
| temp_file = f.name | |
| result = subprocess.run( | |
| ["python3", temp_file], | |
| capture_output=True, | |
| text=True, | |
| timeout=60, | |
| env={**os.environ, "MPLBACKEND": "Agg"} # headless matplotlib | |
| ) | |
| Path(temp_file).unlink(missing_ok=True) | |
| output = result.stdout.strip() | |
| error = result.stderr.strip() | |
| # Auto-install missing packages and retry | |
| if error and "ModuleNotFoundError" in error: | |
| match = re.search(r"No module named '(\w+)'", error) | |
| if match: | |
| package = match.group(1) | |
| subprocess.run( | |
| ["pip", "install", package, "--quiet"], | |
| capture_output=True | |
| ) | |
| return run_python(code) # retry | |
| # Collect any files created in the output dir | |
| created_files = list(OUTPUT_DIR.glob("*")) | |
| file_links = [] | |
| for f in created_files: | |
| if f.is_file(): | |
| file_links.append(f"📥 Download: {_get_output_url(f.name)}") | |
| parts = [] | |
| if output: | |
| parts.append(output) | |
| if error and "warning" not in error.lower(): | |
| parts.append(f"⚠️ {error}") | |
| if file_links: | |
| parts.append("\n".join(file_links)) | |
| return "\n".join(parts) if parts else "(Code ran successfully, no output)" | |
| except subprocess.TimeoutExpired: | |
| return "Error: Code execution timed out (max 60 seconds)" | |
| except Exception as e: | |
| return f"Python execution error: {str(e)}" | |
| def run_shell(command: str) -> str: | |
| """Execute shell commands safely.""" | |
| BLOCKED = ["rm -rf /", "mkfs", "dd if=", "shutdown", "reboot", "poweroff", "format c:"] | |
| if any(b in command.lower() for b in BLOCKED): | |
| return "Error: Command blocked for safety reasons." | |
| try: | |
| result = subprocess.run( | |
| command, | |
| shell=True, | |
| capture_output=True, | |
| text=True, | |
| timeout=30 | |
| ) | |
| return (result.stdout + "\n" + result.stderr).strip() or "(No output)" | |
| except subprocess.TimeoutExpired: | |
| return "Error: Command timed out" | |
| except Exception as e: | |
| return f"Shell error: {str(e)}" | |