Spaces:
Build error
Build error
| import base64 | |
| import asyncio | |
| import logging | |
| from typing import Any, Dict | |
| import cv2 | |
| from reachy_mini_conversation_app.tools.core_tools import Tool, ToolDependencies | |
| logger = logging.getLogger(__name__) | |
class Camera(Tool):
    """Tool that takes the latest camera frame and answers a question about it."""

    name = "camera"
    description = "Take a picture with the camera and ask a question about it."
    parameters_schema = {
        "type": "object",
        "properties": {
            "question": {
                "type": "string",
                "description": "The question to ask about the picture",
            },
        },
        "required": ["question"],
    }

    async def __call__(self, deps: ToolDependencies, **kwargs: Any) -> Dict[str, Any]:
        """Take a picture with the camera and ask a question about it.

        Args:
            deps: Tool dependencies. ``deps.camera_worker`` supplies the frame and
                ``deps.vision_manager`` (optional) answers the question.
            **kwargs: Tool arguments; ``question`` must be a non-empty string.

        Returns:
            One of:
            - ``{"error": ...}`` when the question is missing/blank/not a string,
              the camera worker is absent, or no frame is available;
            - the vision processor's own dict when it is a dict containing ``"error"``;
            - ``{"image_description": <str>}`` when the vision processor returns a string;
            - ``{"error": "vision returned non-string"}`` for any other vision result;
            - ``{"b64_im": <base64 JPEG>}`` when no vision manager is configured.

        Raises:
            RuntimeError: If the frame cannot be encoded as JPEG.
        """
        raw_question = kwargs.get("question")
        # Only strings can be stripped; any other type (including None) is
        # reported through the same validation error instead of crashing
        # with AttributeError on .strip().
        image_query = raw_question.strip() if isinstance(raw_question, str) else ""
        if not image_query:
            logger.warning("camera: empty question")
            return {"error": "question must be a non-empty string"}

        # Truncate the logged question so long prompts do not flood the log.
        logger.info("Tool call: camera question=%s", image_query[:120])

        # Get the frame from the camera worker.
        if deps.camera_worker is None:
            logger.error("Camera worker not available")
            return {"error": "Camera worker not available"}

        frame = deps.camera_worker.get_latest_frame()
        if frame is None:
            logger.error("No frame available from camera worker")
            return {"error": "No frame available"}

        # Use the vision manager for processing if available.
        if deps.vision_manager is not None:
            # process_image is called synchronously, so run it in a worker
            # thread to keep the event loop responsive.
            vision_result = await asyncio.to_thread(
                deps.vision_manager.processor.process_image,
                frame,
                image_query,
            )
            # Pass processor-reported errors through unchanged.
            if isinstance(vision_result, dict) and "error" in vision_result:
                return vision_result
            if isinstance(vision_result, str):
                return {"image_description": vision_result}
            return {"error": "vision returned non-string"}

        # No vision manager: encode the frame to JPEG in memory (no file is
        # written) and return it base64-encoded.
        success, buffer = cv2.imencode(".jpg", frame)
        if not success:
            raise RuntimeError("Failed to encode frame as JPEG")

        b64_encoded = base64.b64encode(buffer.tobytes()).decode("utf-8")
        return {"b64_im": b64_encoded}