| import base64 |
| import mimetypes |
| import os |
| from io import BytesIO |
| from typing import Optional |
|
|
| from PIL import Image |
| from pydantic import Field |
|
|
| from app.daytona.tool_base import Sandbox, SandboxToolsBase, ThreadMessage |
| from app.tool.base import ToolResult |
|
|
|
|
| |
# Size limits, in bytes.
# MAX_IMAGE_SIZE caps the file as stored in the sandbox (10 MiB);
# MAX_COMPRESSED_SIZE caps the payload after compression (5 MiB).
MAX_IMAGE_SIZE = 10 * 1024**2
MAX_COMPRESSED_SIZE = 5 * 1024**2


# Compression defaults.
# Images larger than this bounding box are scaled down to fit inside it.
DEFAULT_MAX_WIDTH = 1920
DEFAULT_MAX_HEIGHT = 1080
# Encoder settings handed to Pillow when saving JPEG and PNG output.
DEFAULT_JPEG_QUALITY = 85
DEFAULT_PNG_COMPRESS_LEVEL = 6
|
|
| _VISION_DESCRIPTION = """ |
| A sandbox-based vision tool that allows the agent to read image files inside the sandbox using the see_image action. |
| * Only the see_image action is supported, with the parameter being the relative path of the image under /workspace. |
| * The image will be compressed and converted to base64 for use in subsequent context. |
| * Supported formats: JPG, PNG, GIF, WEBP. Maximum size: 10MB. |
| """ |
|
|
|
|
class SandboxVisionTool(SandboxToolsBase):
    """Tool that lets an agent look at an image stored in the sandbox.

    The single supported action, ``see_image``, reads a file from the sandbox
    workspace, compresses it, base64-encodes it, and returns it in a
    ``ToolResult``. The most recently loaded image is also kept on
    ``vision_message``.
    """

    name: str = "sandbox_vision"
    description: str = _VISION_DESCRIPTION
    parameters: dict = {
        "type": "object",
        "properties": {
            "action": {
                "type": "string",
                "enum": ["see_image"],
                "description": "要执行的视觉动作,目前仅支持 see_image",
            },
            "file_path": {
                "type": "string",
                "description": "图片在 /workspace 下的相对路径,如 'screenshots/image.png'",
            },
        },
        "required": ["action", "file_path"],
        "dependencies": {"see_image": ["file_path"]},
    }

    # Image context produced by the last successful see_image call.
    # Declared with exclude=True so it is left out of serialized output.
    vision_message: Optional[ThreadMessage] = Field(default=None, exclude=True)

    def __init__(
        self, sandbox: Optional[Sandbox] = None, thread_id: Optional[str] = None, **data
    ):
        """Initialize with optional sandbox and thread_id.

        Args:
            sandbox: Sandbox to use; stored on ``self._sandbox`` when given.
            thread_id: Accepted for call compatibility; not used by this
                constructor.
            **data: Forwarded unchanged to the base-class constructor.
        """
        super().__init__(**data)
        if sandbox is not None:
            self._sandbox = sandbox

    def compress_image(self, image_bytes: bytes, mime_type: str, file_path: str):
        """Downscale and re-encode an image while keeping reasonable quality.

        Transparent images are flattened onto a white background, images
        larger than ``DEFAULT_MAX_WIDTH`` x ``DEFAULT_MAX_HEIGHT`` are scaled
        down to fit (aspect ratio preserved), and the result is saved as GIF
        or PNG when ``mime_type`` asks for it, otherwise as JPEG.

        Args:
            image_bytes: Raw bytes of the source image.
            mime_type: MIME type of the source; selects the output encoder.
            file_path: Path of the source image. Currently unused; kept so the
                signature stays stable for callers.

        Returns:
            A ``(bytes, mime_type)`` tuple holding the re-encoded image and its
            MIME type. If anything fails while decoding or encoding, the
            original ``(image_bytes, mime_type)`` pair is returned untouched.
        """
        try:
            img = Image.open(BytesIO(image_bytes))

            if img.mode in ("RGBA", "LA", "P"):
                # Bring palette ("P") and grey+alpha ("LA") images to RGBA so
                # every mode is composited through a real alpha band. Pasting
                # an "LA" image without a mask drops its alpha channel, so
                # transparent pixels would not end up white.
                if img.mode != "RGBA":
                    img = img.convert("RGBA")
                background = Image.new("RGB", img.size, (255, 255, 255))
                background.paste(img, mask=img.split()[-1])
                img = background

            width, height = img.size
            if width > DEFAULT_MAX_WIDTH or height > DEFAULT_MAX_HEIGHT:
                ratio = min(DEFAULT_MAX_WIDTH / width, DEFAULT_MAX_HEIGHT / height)
                # Clamp to one pixel: with an extreme aspect ratio the shorter
                # side would otherwise truncate to 0 and make resize() raise.
                new_width = max(1, int(width * ratio))
                new_height = max(1, int(height * ratio))
                img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)

            output = BytesIO()
            if mime_type == "image/gif":
                img.save(output, format="GIF", optimize=True)
                output_mime = "image/gif"
            elif mime_type == "image/png":
                img.save(
                    output,
                    format="PNG",
                    optimize=True,
                    compress_level=DEFAULT_PNG_COMPRESS_LEVEL,
                )
                output_mime = "image/png"
            else:
                # Every other input type (including WEBP) is re-encoded as JPEG.
                img.save(
                    output, format="JPEG", quality=DEFAULT_JPEG_QUALITY, optimize=True
                )
                output_mime = "image/jpeg"

            return output.getvalue(), output_mime
        except Exception:
            # Deliberate best-effort: compression is optional, so any failure
            # hands the original bytes back and lets the caller apply its own
            # size limit.
            return image_bytes, mime_type

    async def execute(
        self, action: str, file_path: Optional[str] = None, **kwargs
    ) -> ToolResult:
        """Run a vision action; only ``see_image`` is supported.

        Args:
            action: Must be ``'see_image'``.
            file_path: Path of the image relative to the workspace.
            **kwargs: Ignored.

        Returns:
            On success, a ``ToolResult`` whose ``base64_image`` holds the
            compressed image; ``self.vision_message`` is also updated with the
            image context. On any failure (unknown action, missing path,
            missing or oversized file, unsupported format, unexpected
            exception) the result of ``self.fail_response`` with an
            explanatory message.
        """
        if action != "see_image":
            return self.fail_response(f"未知的视觉动作: {action}")
        if not file_path:
            return self.fail_response("file_path 参数不能为空")

        try:
            await self._ensure_sandbox()
            cleaned_path = self.clean_path(file_path)
            full_path = f"{self.workspace_path}/{cleaned_path}"

            # Any error while looking the file up is reported as "not found".
            try:
                file_info = self.sandbox.fs.get_file_info(full_path)
                if file_info.is_dir:
                    return self.fail_response(f"路径 '{cleaned_path}' 是目录,不是图片文件。")
            except Exception:
                return self.fail_response(f"图片文件未找到: '{cleaned_path}'")

            # Reject oversized files before downloading them.
            if file_info.size > MAX_IMAGE_SIZE:
                return self.fail_response(
                    f"图片文件 '{cleaned_path}' 过大 ({file_info.size / (1024*1024):.2f}MB),最大允许 {MAX_IMAGE_SIZE / (1024*1024)}MB。"
                )

            try:
                image_bytes = self.sandbox.fs.download_file(full_path)
            except Exception:
                return self.fail_response(f"无法读取图片文件: {cleaned_path}")

            mime_type, _ = mimetypes.guess_type(full_path)
            if not mime_type or not mime_type.startswith("image/"):
                # mimetypes gave no image type: fall back to the extension.
                extension_to_mime = {
                    ".jpg": "image/jpeg",
                    ".jpeg": "image/jpeg",
                    ".png": "image/png",
                    ".gif": "image/gif",
                    ".webp": "image/webp",
                }
                ext = os.path.splitext(cleaned_path)[1].lower()
                mime_type = extension_to_mime.get(ext)
                if mime_type is None:
                    return self.fail_response(
                        f"不支持或未知的图片格式: '{cleaned_path}'。支持: JPG, PNG, GIF, WEBP。"
                    )

            compressed_bytes, compressed_mime_type = self.compress_image(
                image_bytes, mime_type, cleaned_path
            )
            if len(compressed_bytes) > MAX_COMPRESSED_SIZE:
                return self.fail_response(
                    f"图片文件 '{cleaned_path}' 压缩后仍过大 ({len(compressed_bytes) / (1024*1024):.2f}MB),最大允许 {MAX_COMPRESSED_SIZE / (1024*1024)}MB。"
                )

            base64_image = base64.b64encode(compressed_bytes).decode("utf-8")
            image_context_data = {
                "mime_type": compressed_mime_type,
                "base64": base64_image,
                "file_path": cleaned_path,
                "original_size": file_info.size,
                "compressed_size": len(compressed_bytes),
            }
            # Keep the image context on the tool instance as well as returning it.
            self.vision_message = ThreadMessage(
                type="image_context", content=image_context_data, is_llm_message=False
            )

            return ToolResult(
                output=f"成功加载并压缩图片 '{cleaned_path}'",
                base64_image=base64_image,
            )
        except Exception as e:
            return self.fail_response(f"see_image 执行异常: {str(e)}")
|
|