Spaces:
Build error
Build error
File size: 2,462 Bytes
b82aa95 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 | import base64
import asyncio
import logging
from typing import Any, Dict
import cv2
from reachy_mini_conversation_app.tools.core_tools import Tool, ToolDependencies
logger = logging.getLogger(__name__)
class Camera(Tool):
"""Take a picture with the camera and ask a question about it."""
name = "camera"
description = "Take a picture with the camera and ask a question about it."
parameters_schema = {
"type": "object",
"properties": {
"question": {
"type": "string",
"description": "The question to ask about the picture",
},
},
"required": ["question"],
}
async def __call__(self, deps: ToolDependencies, **kwargs: Any) -> Dict[str, Any]:
"""Take a picture with the camera and ask a question about it."""
image_query = (kwargs.get("question") or "").strip()
if not image_query:
logger.warning("camera: empty question")
return {"error": "question must be a non-empty string"}
logger.info("Tool call: camera question=%s", image_query[:120])
# Get frame from camera worker buffer (like main_works.py)
if deps.camera_worker is not None:
frame = deps.camera_worker.get_latest_frame()
if frame is None:
logger.error("No frame available from camera worker")
return {"error": "No frame available"}
else:
logger.error("Camera worker not available")
return {"error": "Camera worker not available"}
# Use vision manager for processing if available
if deps.vision_manager is not None:
vision_result = await asyncio.to_thread(
deps.vision_manager.processor.process_image, frame, image_query,
)
if isinstance(vision_result, dict) and "error" in vision_result:
return vision_result
return (
{"image_description": vision_result}
if isinstance(vision_result, str)
else {"error": "vision returned non-string"}
)
# Encode image directly to JPEG bytes without writing to file
success, buffer = cv2.imencode('.jpg', frame)
if not success:
raise RuntimeError("Failed to encode frame as JPEG")
b64_encoded = base64.b64encode(buffer.tobytes()).decode("utf-8")
return {"b64_im": b64_encoded}
|