import os
import subprocess
import tempfile
import zipfile
from pathlib import Path

from .config import config


def _parse_csv(value: str | None) -> list[str]:
    """Split a comma-separated string into stripped, non-empty items."""
    if not value:
        return []
    return [item.strip() for item in value.split(",") if item.strip()]


def _remove_quietly(path: str) -> None:
    """Delete `path` on a best-effort basis; cleanup must never mask the real result."""
    try:
        os.remove(path)
    except OSError:
        pass


def _failure_output(result: subprocess.CompletedProcess[str]) -> str:
    """Build a non-empty error message for a process that exited with a non-zero code."""
    if result.stderr:
        return result.stderr
    # A process can fail without writing to stderr; report the exit code (and
    # whatever it printed) rather than returning an empty string.
    message = f"Process exited with code {result.returncode}."
    return f"{message}\n{result.stdout}" if result.stdout else message


def _run_uv_script(
    source: str,
    packages: list[str],
    timeout: float | None,
) -> subprocess.CompletedProcess[str]:
    """Write `source` to a temporary script, run it via `uv run`, and remove the script."""
    script = tempfile.NamedTemporaryFile(
        mode="w",
        encoding="utf-8",
        suffix=".py",
        delete=False,
    )
    try:
        # Writing inside the try guarantees the file is removed even when the
        # source cannot be encoded or the disk write fails.
        with script:
            script.write(source)
        args = ["uv", "run"]
        if packages:
            args += ["--with", ",".join(packages)]
        args.append(script.name)
        return subprocess.run(
            args,
            cwd=config.working_dir,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    finally:
        _remove_quietly(script.name)


def execute_python_code(code: str, required_packages: str | None = None) -> str:
    """
    Executes the given Python code in the Docker sandbox; note that created files stay in the sandbox and must be shared explicitly using `export_files`.

    Args:
        code (str): The Python code to execute.
        required_packages (str | None, optional): A comma-separated string of Python package names that are required for the execution. Empty entries (e.g. from a trailing comma) are ignored. Defaults to None.

    Returns:
        str: Standard output or "Executed successfully" if the execution succeeds; otherwise, a detailed error message.
    """
    packages = _parse_csv(required_packages)

    if packages:
        # Run an empty script with the same `--with` set first, under its own
        # timeout (`config.pkg_timeout`), before the real run which is bounded
        # by `config.code_timeout`.
        try:
            warmup = _run_uv_script("", packages, config.pkg_timeout)
        except subprocess.TimeoutExpired:
            return "Package installation timeout."
        except Exception as e:
            return f"Package installation error: {str(e) or 'Unknown failure'}"
        if warmup.returncode != 0:
            return _failure_output(warmup)

    try:
        result = _run_uv_script(code, packages, config.code_timeout)
    except subprocess.TimeoutExpired:
        return "Code execution timeout."
    except Exception as e:
        return f"Code execution error: {str(e) or 'Unknown failure'}"

    if result.returncode == 0:
        return result.stdout if result.stdout else "Executed successfully."
    return _failure_output(result)


def execute_shell_command(command: str) -> str:
    """
    Executes the given shell command in the Docker sandbox; note that this is mostly intended to install system-level dependencies.

    Args:
        command (str): The shell command to execute.

    Returns:
        str: Standard output or "Executed successfully" if the execution succeeds; otherwise, a detailed error message.
    """
    try:
        # NOTE: `shell=True` is deliberate here; the command string is handed
        # to the shell as-is.
        result = subprocess.run(
            command,
            shell=True,
            cwd=config.working_dir,
            capture_output=True,
            text=True,
            timeout=config.command_timeout,
        )
    except subprocess.TimeoutExpired:
        return "Command execution timeout."
    except Exception as e:
        return f"Command execution error: {str(e) or 'Unknown failure'}"

    if result.returncode == 0:
        return result.stdout if result.stdout else "Executed successfully."
    return _failure_output(result)


def export_files(file_paths: str, as_zip: bool = False) -> tuple[list[str], str | None]:
    """
    Exports Docker sandbox files as download URLs to share with the user. Optionally bundles them into a ZIP archive.

    Args:
        file_paths (str): A comma-separated string of file paths to export. Relative paths are interpreted against the working directory.
        as_zip (bool, optional): If True, bundles files as a ZIP archive before exporting. Defaults to False.

    Returns:
        tuple[list[str], str | None]: A list of download URLs and an error message, if any.
    """
    # Resolve the working directory once so the containment check compares
    # like with like: candidate paths are symlink-resolved below, so the base
    # must be as well.
    working_dir = Path(config.working_dir).resolve()

    requested = [Path(item) for item in _parse_csv(file_paths)]
    if not requested:
        return [], "No file paths provided."

    # First pass: reject anything that escapes the working directory.
    paths: list[Path] = []
    for path in requested:
        resolved = (
            path.resolve() if path.is_absolute() else (working_dir / path).resolve()
        )
        try:
            resolved.relative_to(working_dir)
        except ValueError:
            return [], f"Path '{resolved}' is outside the working directory."
        paths.append(resolved)

    # Second pass: every path must be an existing regular file.
    for path in paths:
        if not path.exists():
            return [], f"File '{path}' not found."
        if not path.is_file():
            return [], f"Expected a file, but '{path}' is a directory."

    if not as_zip:
        return [str(p) for p in paths], None

    # Store each file once, and keep flat member names unless two different
    # files share a basename; in that case use paths relative to the working
    # directory so no member shadows another on extraction.
    members = list(dict.fromkeys(paths))
    basenames = {p.name for p in members}
    flat = len(basenames) == len(members)

    archive_tmp = tempfile.NamedTemporaryFile(suffix=".zip", delete=False)
    archive_tmp.close()  # only the reserved name is needed; zipfile opens it itself
    try:
        with zipfile.ZipFile(archive_tmp.name, "w") as archive:
            for path in members:
                arcname = path.name if flat else path.relative_to(working_dir).as_posix()
                archive.write(str(path), arcname=arcname)
    except Exception as e:
        # Do not leave a partial archive behind, and keep the (urls, error)
        # return contract instead of raising.
        _remove_quietly(archive_tmp.name)
        return [], f"ZIP export error: {str(e) or 'Unknown failure'}"
    return [archive_tmp.name], None