#!/usr/bin/env python
# coding=utf-8
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import json
import pickle
import re
import time
from io import BytesIO
from pathlib import Path
from textwrap import dedent
from typing import Any, Dict, List, Tuple

import requests
from PIL import Image

from .local_python_executor import PythonExecutor
from .monitoring import LogLevel
from .tools import Tool, get_tools_definition_code

try:
    from dotenv import load_dotenv

    load_dotenv()
except ModuleNotFoundError:
    pass


class RemotePythonExecutor(PythonExecutor):
    def __init__(self, additional_imports: List[str], logger):
        self.additional_imports = additional_imports
        self.logger = logger
        self.logger.log("Initializing executor, hold on...")
        # Matches a bare `final_answer(...)` call sitting on its own line.
        self.final_answer_pattern = re.compile(r"^final_answer\((.*)\)$", re.M)
        self.installed_packages = []

    def run_code_raise_errors(self, code: str, return_final_answer: bool = False) -> Tuple[Any, str]:
        raise NotImplementedError

    def send_tools(self, tools: Dict[str, Tool]):
        tool_definition_code = get_tools_definition_code(tools)
        # Only install requirements that were not installed by a previous call.
        packages_to_install = set()
        for tool in tools.values():
            for package in tool.to_dict()["requirements"]:
                if package not in self.installed_packages:
                    packages_to_install.add(package)
                    self.installed_packages.append(package)
        execution = self.run_code_raise_errors(
            f"!pip install {' '.join(packages_to_install)}\n" + tool_definition_code
        )
        self.logger.log(execution[1])
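
    # Usage sketch (hypothetical; `MySearchTool` stands in for any `Tool`
    # subclass and is not defined in this module):
    #
    #     executor.send_tools({"web_search": MySearchTool()})
    #
    # This installs each tool's declared requirements in the remote kernel,
    # then defines the tools there so generated code can call them directly.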

    def send_variables(self, variables: dict):
        """
        Send variables to the kernel namespace using pickle.
        """
        pickled_vars = base64.b64encode(pickle.dumps(variables)).decode()
        code = f"""
import pickle, base64
vars_dict = pickle.loads(base64.b64decode('{pickled_vars}'))
locals().update(vars_dict)
"""
        self.run_code_raise_errors(code)
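
    # Usage sketch: any picklable values can be pushed into the remote
    # namespace, e.g. `executor.send_variables({"df": df})` makes `df`
    # available to subsequently executed code in the kernel.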

    def __call__(self, code_action: str) -> Tuple[Any, str, bool]:
        """Check if code is a final answer and run it accordingly"""
        is_final_answer = bool(self.final_answer_pattern.search(code_action))
        output = self.run_code_raise_errors(code_action, return_final_answer=is_final_answer)
        return output[0], output[1], is_final_answer
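
    # Example: the code action below is detected as a final answer because
    # `final_answer(...)` appears at the start of a line:
    #
    #     result, logs, is_final = executor("x = 6 * 7\nfinal_answer(x)")
    #
    # Here `is_final` is True and `result` holds the computed value of `x`.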

    def install_packages(self, additional_imports: List[str]):
        additional_imports = additional_imports + ["smolagents"]
        self.run_code_raise_errors(f"!pip install {' '.join(additional_imports)}")
        return additional_imports
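

# A minimal sketch of what a `RemotePythonExecutor` subclass must provide
# (hypothetical, for illustration only -- not part of this module):
#
#     class EchoExecutor(RemotePythonExecutor):
#         def run_code_raise_errors(self, code, return_final_answer=False):
#             # Run `code` on the remote backend, raise on execution errors,
#             # and return a (result_or_None, captured_stdout) tuple.
#             return None, f"would execute:\n{code}"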


class E2BExecutor(RemotePythonExecutor):
    def __init__(self, additional_imports: List[str], logger):
        super().__init__(additional_imports, logger)
        try:
            from e2b_code_interpreter import Sandbox
        except ModuleNotFoundError:
            raise ModuleNotFoundError(
                """Please install 'e2b' extra to use E2BExecutor: `pip install 'smolagents[e2b]'`"""
            )
        self.sandbox = Sandbox()
        self.installed_packages = self.install_packages(additional_imports)
        self.logger.log("E2B is running", level=LogLevel.INFO)

    def run_code_raise_errors(self, code: str, return_final_answer: bool = False) -> Tuple[Any, str]:
        execution = self.sandbox.run_code(
            code,
        )
        if execution.error:
            execution_logs = "\n".join([str(log) for log in execution.logs.stdout])
            logs = execution_logs
            logs += "\nExecuting code yielded an error:"
            logs += "\n" + execution.error.name
            logs += "\n" + execution.error.value
            logs += "\n" + execution.error.traceback
            raise ValueError(logs)
        self.logger.log(execution.logs)
        execution_logs = "\n".join([str(log) for log in execution.logs.stdout])
        if not execution.results:
            return None, execution_logs
        for result in execution.results:
            if result.is_main_result:
                # Image results come back base64-encoded: decode into a PIL image.
                for attribute_name in ["jpeg", "png"]:
                    if getattr(result, attribute_name) is not None:
                        image_output = getattr(result, attribute_name)
                        decoded_bytes = base64.b64decode(image_output.encode("utf-8"))
                        return Image.open(BytesIO(decoded_bytes)), execution_logs
                for attribute_name in [
                    "chart",
                    "data",
                    "html",
                    "javascript",
                    "json",
                    "latex",
                    "markdown",
                    "pdf",
                    "svg",
                    "text",
                ]:
                    if getattr(result, attribute_name) is not None:
                        return getattr(result, attribute_name), execution_logs
        if return_final_answer:
            raise ValueError("No main result returned by executor!")
        return None, execution_logs
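
    # Usage sketch (assumes an `E2B_API_KEY` in the environment, which the
    # `dotenv` block at the top of this file can load from a `.env` file):
    #
    #     executor = E2BExecutor(additional_imports=["numpy"], logger=logger)
    #     result, logs, is_final = executor("import numpy as np\nfinal_answer(np.pi)")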


class DockerExecutor(RemotePythonExecutor):
    """
    Executes Python code using Jupyter Kernel Gateway in a Docker container.
    """

    def __init__(
        self,
        additional_imports: List[str],
        logger,
        host: str = "127.0.0.1",
        port: int = 8888,
    ):
        """
        Initialize the Docker-based Jupyter Kernel Gateway executor.
        """
        super().__init__(additional_imports, logger)
        try:
            import docker
            from websocket import create_connection
        except ModuleNotFoundError:
            raise ModuleNotFoundError(
                "Please install 'docker' extra to use DockerExecutor: `pip install 'smolagents[docker]'`"
            )
        self.host = host
        self.port = port

        # Initialize Docker
        try:
            self.client = docker.from_env()
        except docker.errors.DockerException as e:
            raise RuntimeError("Could not connect to Docker daemon: make sure Docker is running.") from e

        # Build and start container
        try:
            self.logger.log("Building Docker image...", level=LogLevel.INFO)
            dockerfile_path = Path(__file__).parent / "Dockerfile"
            if not dockerfile_path.exists():
                with open(dockerfile_path, "w") as f:
                    f.write("""FROM python:3.12-slim

RUN pip install jupyter_kernel_gateway requests numpy pandas
RUN pip install jupyter_client notebook

EXPOSE 8888
CMD ["jupyter", "kernelgateway", "--KernelGatewayApp.ip='0.0.0.0'", "--KernelGatewayApp.port=8888", "--KernelGatewayApp.allow_origin='*'"]
""")
            _, build_logs = self.client.images.build(
                path=str(dockerfile_path.parent), dockerfile=str(dockerfile_path), tag="jupyter-kernel"
            )
            self.logger.log(build_logs, level=LogLevel.DEBUG)

            self.logger.log(f"Starting container on {host}:{port}...", level=LogLevel.INFO)
            self.container = self.client.containers.run(
                "jupyter-kernel", ports={"8888/tcp": (host, port)}, detach=True
            )

            # Wait (up to ~5s) for the container to report a "running" status
            retries = 0
            while self.container.status != "running" and retries < 5:
                self.logger.log(f"Container status: {self.container.status}, waiting...", level=LogLevel.INFO)
                time.sleep(1)
                self.container.reload()
                retries += 1

            self.base_url = f"http://{host}:{port}"

            # Create a new kernel via the Kernel Gateway REST API
            r = requests.post(f"{self.base_url}/api/kernels")
            if r.status_code != 201:
                error_details = {
                    "status_code": r.status_code,
                    "headers": dict(r.headers),
                    "url": r.url,
                    "body": r.text,
                    "request_method": r.request.method,
                    "request_headers": dict(r.request.headers),
                    "request_body": r.request.body,
                }
                self.logger.log_error(f"Failed to create kernel. Details: {json.dumps(error_details, indent=2)}")
                raise RuntimeError(f"Failed to create kernel: Status {r.status_code}\nResponse: {r.text}") from None

            self.kernel_id = r.json()["id"]

            # Open a websocket to the kernel's channels for execute requests/replies
            ws_url = f"ws://{host}:{port}/api/kernels/{self.kernel_id}/channels"
            self.ws = create_connection(ws_url)

            self.installed_packages = self.install_packages(additional_imports)
            self.logger.log(
                f"Container {self.container.short_id} is running with kernel {self.kernel_id}", level=LogLevel.INFO
            )
        except Exception as e:
            self.cleanup()
            raise RuntimeError(f"Failed to initialize Jupyter kernel: {e}") from e

    def run_code_raise_errors(self, code_action: str, return_final_answer: bool = False) -> Tuple[Any, str]:
        """
        Execute code and return result based on whether it's a final answer.
        """
        try:
            if return_final_answer:
                match = self.final_answer_pattern.search(code_action)
                if match:
                    pre_final_answer_code = self.final_answer_pattern.sub("", code_action)
                    result_expr = match.group(1)
                    # Pickle the final answer inside the kernel and print it on a
                    # tagged line, so it can be recovered from the stream output below.
                    wrapped_code = pre_final_answer_code + dedent(f"""
                        import pickle, base64
                        _result = {result_expr}
                        print("RESULT_PICKLE:" + base64.b64encode(pickle.dumps(_result)).decode())
                    """)
            else:
                wrapped_code = code_action

            # Send execute request
            msg_id = self._send_execute_request(wrapped_code)

            # Collect streamed output and the (optional) pickled result
            outputs = []
            result = None
            waiting_for_idle = False
            while True:
                msg = json.loads(self.ws.recv())
                msg_type = msg.get("msg_type", "")
                parent_msg_id = msg.get("parent_header", {}).get("msg_id")
                # Only process messages related to our execute request
                if parent_msg_id != msg_id:
                    continue
                if msg_type == "stream":
                    text = msg["content"]["text"]
                    if return_final_answer and text.startswith("RESULT_PICKLE:"):
                        pickle_data = text[len("RESULT_PICKLE:"):].strip()
                        result = pickle.loads(base64.b64decode(pickle_data))
                        waiting_for_idle = True
                    else:
                        outputs.append(text)
                elif msg_type == "error":
                    traceback = msg["content"].get("traceback", [])
                    raise RuntimeError("\n".join(traceback)) from None
                elif msg_type == "status" and msg["content"]["execution_state"] == "idle":
                    if not return_final_answer or waiting_for_idle:
                        break

            return result, "".join(outputs)

        except Exception as e:
            self.logger.log_error(f"Code execution failed: {e}")
            raise

    def _send_execute_request(self, code: str) -> str:
        """Send code execution request to kernel."""
        import uuid

        # Generate a unique message ID
        msg_id = str(uuid.uuid4())

        # Create execute request
        execute_request = {
            "header": {
                "msg_id": msg_id,
                "username": "anonymous",
                "session": str(uuid.uuid4()),
                "msg_type": "execute_request",
                "version": "5.0",
            },
            "parent_header": {},
            "metadata": {},
            "content": {
                "code": code,
                "silent": False,
                "store_history": True,
                "user_expressions": {},
                "allow_stdin": False,
            },
        }

        self.ws.send(json.dumps(execute_request))
        return msg_id
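
    # The dict above follows the Jupyter messaging protocol: the kernel echoes
    # this `msg_id` back in each reply's `parent_header`, which is how
    # `run_code_raise_errors` filters out messages from unrelated requests.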

    def cleanup(self):
        """Clean up resources."""
        try:
            if hasattr(self, "container"):
                self.logger.log(f"Stopping and removing container {self.container.short_id}...", level=LogLevel.INFO)
                self.container.stop()
                self.container.remove()
                self.logger.log("Container cleanup completed", level=LogLevel.INFO)
        except Exception as e:
            self.logger.log_error(f"Error during cleanup: {e}")

    def delete(self):
        """Ensure cleanup on deletion."""
        self.cleanup()


__all__ = ["E2BExecutor", "DockerExecutor"]
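
# Usage sketch (requires a running Docker daemon; assumes `AgentLogger` from
# this library's monitoring module, or any object with `.log`/`.log_error`):
#
#     from smolagents.monitoring import AgentLogger, LogLevel
#
#     logger = AgentLogger(level=LogLevel.INFO)
#     executor = DockerExecutor(additional_imports=["numpy"], logger=logger)
#     try:
#         result, logs, is_final = executor("final_answer(2 + 2)")
#     finally:
#         executor.cleanup()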