Spaces:
Running
Running
| """Robot hardware control tools.""" | |
| from pipecat.adapters.schemas.function_schema import FunctionSchema | |
| from pipecat.services.llm_service import FunctionCallParams | |
| from loguru import logger | |
| # Displacement movements that require explicit user request | |
| DISPLACEMENT_MOVEMENTS = { | |
| "step_forward", "walk_forward", "step_backward", "walk_backward", | |
| "turn_left", "turn_right", "turn_left_slow", "turn_right_slow" | |
| } | |
| def classify_movements(movements: list[str]) -> tuple[list[str], list[str]]: | |
| """Classify movements into displacement and safe categories.""" | |
| displacement = [m for m in movements if m in DISPLACEMENT_MOVEMENTS] | |
| safe = [m for m in movements if m not in DISPLACEMENT_MOVEMENTS] | |
| return displacement, safe | |
| async def execute_movement(params: FunctionCallParams): | |
| """Execute physical movement on TARS hardware.""" | |
| movements = params.arguments.get("movements", []) | |
| if not movements: | |
| await params.result_callback("No movements specified.") | |
| return | |
| # Classify and guard | |
| displacement, safe = classify_movements(movements) | |
| if displacement: | |
| logger.warning(f"Blocked displacement: {displacement}") | |
| await params.result_callback( | |
| f"Cannot execute displacement ({', '.join(displacement)}) " | |
| "unless user explicitly requests. Use do_gesture() instead." | |
| ) | |
| return | |
| # Execute safe movements | |
| if not safe: | |
| await params.result_callback("No valid movements.") | |
| return | |
| try: | |
| from services import tars_robot | |
| result = await tars_robot.execute_movement(safe) | |
| await params.result_callback(result) | |
| except Exception as e: | |
| logger.error(f"Movement execution error: {e}", exc_info=True) | |
| await params.result_callback(f"Error executing movement: {str(e)}") | |
| async def capture_camera_view(params: FunctionCallParams): | |
| """Capture image from RPi camera and analyze with vision model.""" | |
| question = params.arguments.get("question", "What do you see?") | |
| try: | |
| from services import tars_robot | |
| import base64 | |
| from pipecat.frames.frames import VisionImageRawFrame | |
| from pipecat.processors.frame_processor import FrameDirection | |
| logger.info(f"Capturing camera view for question: {question}") | |
| result = await tars_robot.capture_camera_view() | |
| if result.get("status") == "error": | |
| error = result.get("error", "unknown error") | |
| logger.warning(f"Camera capture failed: {error}") | |
| await params.result_callback(f"Unable to capture camera image: {error}") | |
| return | |
| # Get base64 image | |
| img_base64 = result.get("image") | |
| if not img_base64: | |
| await params.result_callback("Camera returned no image data.") | |
| return | |
| # Decode base64 to bytes | |
| img_bytes = base64.b64decode(img_base64) | |
| # Send vision frame for analysis | |
| vision_frame = VisionImageRawFrame( | |
| image=img_bytes, | |
| size=(result.get("width", 640), result.get("height", 480)), | |
| format=result.get("format", "jpeg"), | |
| text=question | |
| ) | |
| await params.llm.push_frame(vision_frame, FrameDirection.UPSTREAM) | |
| logger.info(f"Camera image sent for vision analysis: {result.get('width')}x{result.get('height')}") | |
| await params.result_callback("Processing camera image...") | |
| except Exception as e: | |
| logger.error(f"Camera capture error: {e}", exc_info=True) | |
| await params.result_callback(f"Error capturing camera view: {str(e)}") | |
| def create_movement_schema() -> FunctionSchema: | |
| """Create the execute_movement function schema.""" | |
| return FunctionSchema( | |
| name="execute_movement", | |
| description=( | |
| "Execute DISPLACEMENT movements on TARS hardware. " | |
| "IMPORTANT: Use ONLY when user explicitly requests to move TARS' position - " | |
| "walking, turning, stepping forward/backward. " | |
| "For gestures (wave, bow, tilt), use do_gesture() instead. " | |
| "Available displacement movements: " | |
| "step_forward, walk_forward, step_backward, walk_backward, " | |
| "turn_left, turn_right, turn_left_slow, turn_right_slow. " | |
| "Examples: User says 'walk forward' → ['walk_forward'], " | |
| "User says 'turn around' → ['turn_left', 'turn_left']. " | |
| "Do NOT use for gestures or expressions." | |
| ), | |
| properties={ | |
| "movements": { | |
| "type": "array", | |
| "items": {"type": "string"}, | |
| "description": "List of displacement movements to execute in sequence", | |
| "minItems": 1 | |
| } | |
| }, | |
| required=["movements"], | |
| ) | |
| def create_camera_capture_schema() -> FunctionSchema: | |
| """Create the capture_camera_view function schema.""" | |
| return FunctionSchema( | |
| name="capture_camera_view", | |
| description=( | |
| "Capture an image from TARS' camera on the Raspberry Pi and analyze what's visible. " | |
| "Use this when the user asks what TARS can see from its own perspective/camera, " | |
| "such as 'What can you see from your camera?', 'Look around', 'What's in front of you?'. " | |
| "This is DIFFERENT from fetch_user_image which captures from the user's camera during a video call. " | |
| "ONLY call this for questions about TARS' physical camera view, not the user's camera feed." | |
| ), | |
| properties={ | |
| "question": { | |
| "type": "string", | |
| "description": "The specific question about what TARS should look for in its camera view", | |
| "default": "What do you see?" | |
| } | |
| }, | |
| required=[], | |
| ) | |