import asyncio
from typing import Any, Optional, Dict

from openspace.grounding.core.transport.connectors import AioHttpConnector
from openspace.grounding.core.security import SecurityPolicyManager
from openspace.utils.logging import Logger

logger = Logger.get_logger(__name__)


class ShellConnector(AioHttpConnector):
    """
    Shell backend HTTP connector

    Basic routes (payload fields as sent by this connector):
        POST /run_python       {"code": str, "timeout": int, "working_dir": str | None,
                                "env": dict | None, "conda_env": str | None}
        POST /run_bash_script  {"script": str, "timeout": int, "working_dir": str | None,
                                "env": dict | None, "conda_env": str | None}
    """

    def __init__(
        self,
        vm_ip: str,
        port: int = 5000,
        *,
        retry_times: int = 3,
        retry_interval: float = 5,
        security_manager: "SecurityPolicyManager | None" = None,
    ) -> None:
        """
        Args:
            vm_ip: Host/IP of the shell server
            port: Port of the shell server (default 5000)
            retry_times: Maximum number of attempts per request (at least one is always made)
            retry_interval: Seconds to wait between attempts
            security_manager: Optional policy manager consulted before every execution
        """
        base_url = f"http://{vm_ip}:{port}"
        super().__init__(base_url)
        self.retry_times = retry_times
        self.retry_interval = retry_interval
        self._security_manager = security_manager

    async def _retry_invoke(
        self,
        name: str,
        payload: Dict[str, Any],
        script_timeout: int,
        *,
        break_on_timeout: bool = False,
    ):
        """
        Execute HTTP request and retry

        Args:
            name: RPC method name
            payload: Request payload; "timeout" is added from script_timeout
            script_timeout: Script execution timeout (also forwarded to the server)
            break_on_timeout: Whether to exit immediately on timeout (default False)

        Returns:
            Server response result

        Raises:
            RuntimeError: Timed out and break_on_timeout is True
            Exception: Last exception thrown after all retries fail
        """
        last_exc: Optional[Exception] = None
        # HTTP request timeout should be longer than script execution timeout,
        # leaving buffer time for the server to report its own timeout.
        http_timeout = script_timeout + 60
        # Always make at least one attempt, even if retry_times is configured <= 0.
        attempts = max(1, self.retry_times)

        for attempt in range(1, attempts + 1):
            try:
                # Pass timeout parameter to server and enforce the client-side deadline.
                result = await asyncio.wait_for(
                    self.invoke(name, payload | {"timeout": script_timeout}),
                    timeout=http_timeout,
                )
            except asyncio.TimeoutError as exc:
                # Timeout exception usually does not need to be retried (script execution time too long)
                if break_on_timeout:
                    logger.error("%s timed out after %d seconds, aborting retry", name, script_timeout)
                    raise RuntimeError(
                        f"Script execution timed out after {script_timeout} seconds"
                    ) from exc
                last_exc = exc
                if attempt == attempts:
                    break
                logger.warning(
                    "%s timed out (attempt %d/%d), retrying in %.1f seconds...",
                    name, attempt, attempts, self.retry_interval,
                )
                await asyncio.sleep(self.retry_interval)
            except Exception as exc:
                # NOTE(review): generic failures are retried, which may re-run a script
                # whose first attempt actually reached the server. Confirm this is
                # acceptable for non-idempotent scripts.
                last_exc = exc
                if attempt == attempts:
                    break
                logger.warning(
                    "%s failed (attempt %d/%d): %s, retrying in %.1f seconds...",
                    name, attempt, attempts, exc, self.retry_interval,
                )
                await asyncio.sleep(self.retry_interval)
            else:
                logger.info("%s executed successfully (attempt %d/%d)", name, attempt, attempts)
                return result

        error_msg = f"{name} failed after {attempts} retries"
        logger.error(error_msg)
        raise last_exc or RuntimeError(error_msg)

    async def _ensure_allowed(self, content: str, label: str) -> None:
        """Raise PermissionError if the configured security policy rejects *content*."""
        if not self._security_manager:
            return
        from openspace.grounding.core.types import BackendType

        allowed = await self._security_manager.check_command_allowed(BackendType.SHELL, content)
        if not allowed:
            logger.error("SecurityPolicy blocked %s execution", label)
            raise PermissionError(f"SecurityPolicy: {label} execution blocked")

    @staticmethod
    def _log_request(
        kind: str,
        timeout: int,
        working_dir: Optional[str],
        env: Optional[Dict[str, str]],
        conda_env: Optional[str],
    ) -> None:
        """Log the execution parameters; only non-empty optional settings are shown."""
        logger.info(
            "Executing %s script with timeout=%d seconds%s%s%s",
            kind,
            timeout,
            f", working_dir={working_dir}" if working_dir else "",
            f", env={list(env.keys())}" if env else "",
            f", conda_env={conda_env}" if conda_env else "",
        )

    async def run_python_script(
        self,
        code: str,
        *,
        timeout: int = 90,
        working_dir: Optional[str] = None,
        env: Optional[Dict[str, str]] = None,
        conda_env: Optional[str] = None,
    ) -> Any:
        """
        Execute Python script on remote server

        Args:
            code: Python code string
            timeout: Execution timeout in seconds (default 90 seconds)
            working_dir: Working directory for script execution (optional)
            env: Environment variables for script execution (optional)
            conda_env: Conda environment name to activate (optional)

        Returns:
            Server response result

        Raises:
            PermissionError: Security policy blocked execution
            RuntimeError: Execution failed or timed out
        """
        await self._ensure_allowed(code, "python code")

        payload = {"code": code, "working_dir": working_dir, "env": env, "conda_env": conda_env}
        self._log_request("python", timeout, working_dir, env, conda_env)

        # Python script timed out, exit immediately without retry (timeout usually means script logic problem)
        return await self._retry_invoke(
            "POST /run_python", payload, timeout, break_on_timeout=True
        )

    async def run_bash_script(
        self,
        script: str,
        *,
        timeout: int = 90,
        working_dir: Optional[str] = None,
        env: Optional[Dict[str, str]] = None,
        conda_env: Optional[str] = None,
    ) -> Any:
        """
        Execute Bash script on remote server

        Args:
            script: Bash script content (can be multi-line)
            timeout: Execution timeout in seconds (default 90 seconds)
            working_dir: Working directory for script execution (optional)
            env: Environment variables for script execution (optional)
            conda_env: Conda environment name to activate (optional)

        Returns:
            Server response result; when it is a dict it may contain a "returncode" key

        Raises:
            PermissionError: Security policy blocked execution
            RuntimeError: Execution failed or timed out
        """
        await self._ensure_allowed(script, "bash script")

        payload = {"script": script, "working_dir": working_dir, "env": env, "conda_env": conda_env}
        self._log_request("bash", timeout, working_dir, env, conda_env)

        # Bash script timed out, exit immediately without retry (timeout usually means script logic problem)
        result = await self._retry_invoke(
            "POST /run_bash_script", payload, timeout, break_on_timeout=True
        )

        # Record execution result; %s because the server may send a non-int (e.g. None).
        if isinstance(result, dict) and "returncode" in result:
            logger.info("Bash script executed with return code: %s", result.get("returncode", -1))

        return result