File size: 27,612 Bytes
cc303f4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
---
sidebar_position: 4
---

# Chapter 3: Integrating Vision, Language, and Action

## Learning Objectives

- Understand how to integrate vision, language, and action systems in a unified architecture
- Implement multimodal perception for humanoid robots
- Create systems that can interpret visual information using language models
- Implement action execution based on multimodal understanding
- Build a complete pipeline from perception to action

## Introduction to Vision-Language-Action (VLA) Systems

Vision-Language-Action (VLA) systems represent the integration of perception (vision), cognition (language), and execution (action) in a unified architecture. This integration is fundamental to creating humanoid robots that can understand natural language commands and execute them in real-world environments.

### The VLA Pipeline

The Vision-Language-Action pipeline typically follows this flow:

```
[Visual Perception] → [Language Interpretation] → [Action Planning] → [Action Execution] → [Feedback Loop]
```

Each component builds upon the previous one, creating a seamless system from sensing to action.

## Multimodal Perception

Multimodal perception combines multiple sensory inputs to create a comprehensive understanding of the environment.

### Visual-Textual Integration

```python
import torch
import clip  # CLIP model for vision-language understanding
from transformers import BlipProcessor, BlipForConditionalGeneration
from PIL import Image

class MultimodalPerceptor:
    def __init__(self):
        # Load pre-trained models
        self.clip_model, self.clip_preprocess = clip.load("ViT-B/32")
        self.blip_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
        self.blip_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")
        
    def generate_caption(self, image):
        """Generate a textual description of what's in the image"""
        inputs = self.blip_processor(image, return_tensors="pt")
        out = self.blip_model.generate(**inputs)
        caption = self.blip_processor.decode(out[0], skip_special_tokens=True)
        return caption
    
    def classify_objects(self, image, object_list):
        """Classify objects in an image using text descriptions"""
        # Process image for CLIP
        image_input = self.clip_preprocess(image).unsqueeze(0)
        
        # Tokenize text descriptions
        text_inputs = clip.tokenize(object_list)
        
        # Get similarity scores
        with torch.no_grad():
            logits_per_image, logits_per_text = self.clip_model(image_input, text_inputs)
            probs = logits_per_image.softmax(dim=-1).cpu().numpy()
        
        # Return object with highest probability
        best_match_idx = probs[0].argmax()
        return object_list[best_match_idx], float(probs[0][best_match_idx])
    
    def find_object_by_description(self, image, description):
        """Find objects that match a textual description"""
        # Process image and description
        image_input = self.clip_preprocess(image).unsqueeze(0)
        text_input = clip.tokenize([description])
        
        with torch.no_grad():
            logits_per_image, logits_per_text = self.clip_model(image_input, text_input)
            prob = logits_per_image.softmax(dim=-1).cpu().numpy()[0][0]
        
        return float(prob)
```

### Integration with ROS 2

```python
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from std_msgs.msg import String
from cv_bridge import CvBridge
from PIL import Image as PILImage
import io

class VLAIntegrationNode(Node):
    def __init__(self):
        super().__init__('vla_integration_node')
        
        # Setup publishers and subscribers
        self.image_subscriber = self.create_subscription(
            Image,
            '/camera/image_raw',
            self.image_callback,
            10
        )
        
        self.command_subscriber = self.create_subscription(
            String,
            '/high_level_command',
            self.command_callback,
            10
        )
        
        self.action_publisher = self.create_publisher(
            String,  # In practice, this might be a custom action message
            '/robot_actions',
            10
        )
        
        self.feedback_publisher = self.create_publisher(
            String,
            '/vla_feedback',
            10
        )
        
        # Initialize perception components
        self.perceptor = MultimodalPerceptor()
        self.bridge = CvBridge()
        
        # Current state
        self.current_image = None
        self.pending_command = None
        
        self.get_logger().info('VLA Integration Node initialized')
    
    def image_callback(self, msg):
        """Process incoming camera images"""
        try:
            # Convert ROS Image to PIL Image
            cv_image = self.bridge.imgmsg_to_cv2(msg, "bgr8")
            pil_image = PILImage.fromarray(cv_image)
            
            # Store for later processing
            self.current_image = pil_image
            
            # If we have a pending command, process both together
            if self.pending_command:
                self.process_command_with_image(self.pending_command, pil_image)
                self.pending_command = None
                
        except Exception as e:
            self.get_logger().error(f'Error processing image: {str(e)}')
    
    def command_callback(self, msg):
        """Process incoming high-level commands"""
        command = msg.data
        self.get_logger().info(f'Received command: {command}')
        
        # If we have an image, process immediately; otherwise, store the command
        if self.current_image:
            self.process_command_with_image(command, self.current_image)
        else:
            self.pending_command = command
            self.get_logger().info('Command stored, waiting for image')
    
    def process_command_with_image(self, command, image):
        """Process a command with the current image"""
        self.get_logger().info(f'Processing command "{command}" with image')
        
        # Use multimodal perception to understand the scene
        caption = self.perceptor.generate_caption(image)
        self.get_logger().info(f'Image caption: {caption}')
        
        # Plan actions based on command and scene understanding
        actions = self.plan_vla_actions(command, caption, image)
        
        # Execute or publish planned actions
        for action in actions:
            self.publish_action(action)
        
        # Provide feedback
        feedback_msg = String()
        feedback_msg.data = f'Planned {len(actions)} actions for command: {command}'
        self.feedback_publisher.publish(feedback_msg)
    
    def plan_vla_actions(self, command, caption, image):
        """Plan actions based on command, caption, and image"""
        # This is a simplified example - in practice, this would use more sophisticated reasoning
        
        actions = []
        
        # Example: If command involves finding an object, locate it in the image
        if "find" in command.lower() or "locate" in command.lower():
            # Extract object from command (simplified)
            import re
            words = command.lower().split()
            potential_objects = [w for w in words if w in ["bottle", "cup", "box", "chair", "table"]]
            
            if potential_objects:
                target_object = potential_objects[0]
                
                # Check if object is in the image
                object_prob = self.perceptor.find_object_by_description(image, f"an image of a {target_object}")
                
                if object_prob > 0.5:  # Threshold for "object detected"
                    # Plan navigation to object
                    actions.append(f"navigate_to_object({target_object})")
                    actions.append(f"approach_object({target_object})")
                else:
                    # Object not in view, may need to move
                    actions.append(f"search_for_object({target_object})")
        
        # Example: If command involves manipulation
        if "pick up" in command.lower() or "grasp" in command.lower():
            actions.append("plan_grasp_approach()")
            actions.append("execute_grasp()")
        
        # Default action if no specific command pattern matches
        if not actions:
            actions.append(f"speak_text(Received command: {command})")
        
        return actions
    
    def publish_action(self, action):
        """Publish an action for execution"""
        action_msg = String()
        action_msg.data = action
        self.action_publisher.publish(action_msg)
        self.get_logger().info(f'Published action: {action}')

def main(args=None):
    rclpy.init(args=args)
    node = VLAIntegrationNode()
    
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()
```

## Object Detection and Scene Understanding

For humanoid robots, understanding the 3D environment is crucial:

### 3D Object Detection

```python
import numpy as np
import open3d as o3d

class SceneUnderstanding:
    def __init__(self):
        # Initialize 3D perception models (simplified)
        pass
    
    def detect_objects_3d(self, point_cloud):
        """Detect and segment objects in 3D point cloud"""
        # Convert point cloud to Open3D format
        pcd = o3d.geometry.PointCloud()
        pcd.points = o3d.utility.Vector3dVector(point_cloud[:, :3])
        
        # Segment the ground plane
        plane_model, inliers = pcd.segment_plane(
            distance_threshold=0.01,
            ransac_n=3,
            num_iterations=1000
        )
        
        # Extract the rest of the objects
        objects_cloud = pcd.select_by_index(inliers, invert=True)
        
        # Cluster the remaining points into objects
        labels = np.array(objects_cloud.cluster_dbscan(eps=0.02, min_points=10))
        
        segmented_objects = []
        for i in range(labels.max() + 1):
            object_points = np.asarray(objects_cloud.select_by_index(np.where(labels == i)[0]))
            if len(object_points) > 10:  # At least 10 points to be considered an object
                segmented_objects.append(object_points)
        
        return segmented_objects
    
    def estimate_object_properties(self, object_points):
        """Estimate properties of a segmented object"""
        pcd = o3d.geometry.PointCloud()
        pcd.points = o3d.utility.Vector3dVector(object_points)
        
        # Compute bounding box
        aabb = pcd.get_axis_aligned_bounding_box()
        obb = pcd.get_oriented_bounding_box()

        # Estimate center and size
        center = np.array(obb.get_center())
        size = np.array(obb.get_extent())

        return {
            "center": center,
            "size": size,
            "bbox": obb,
            "point_count": len(object_points)
        }
```

## Language-Guided Action Planning

The integration of language understanding with action planning allows robots to execute complex, natural language commands:

```python
class LanguageGuidedPlanner:
    def __init__(self, robot_capabilities):
        self.capabilities = robot_capabilities
        self.location_map = {
            "kitchen": [1.0, 2.0, 0.0],
            "living_room": [0.0, 0.0, 0.0],
            "bedroom": [-2.0, 1.0, 0.0],
            "dining_room": [-1.0, -1.0, 0.0]
        }
        self.object_map = {
            "drink": ["water_bottle", "soda_can", "juice_box"],
            "snack": ["cookies", "apple", "chips"],
            "tool": ["screwdriver", "wrench", "hammer"]
        }
    
    def parse_and_plan(self, command, current_context):
        """Parse natural language command and generate executable plan"""
        import openai
        import json
        import re
        
        # Create a prompt for the LLM
        prompt = f"""
        You are a language-guided action planner for a humanoid robot. Convert the human command into a sequence of executable actions.
        
        Current context: {current_context}
        Robot capabilities: {self.capabilities}
        
        Human command: "{command}"
        
        Provide the plan as a JSON array with these action types:
        - navigate_to_location: Move to a named location
        - find_object: Search for a specific object
        - pick_up_object: Grasp an object
        - place_object: Place an object at a location
        - speak_text: Say something to the human
        - wave_gesture: Perform a waving gesture
        - wait: Wait for a specific event
        
        Each action should include any required parameters.
        
        Example:
        [
            {{"action": "navigate_to_location", "parameters": {{"location": "kitchen"}}}},
            {{"action": "find_object", "parameters": {{"object": "water_bottle"}}}},
            {{"action": "pick_up_object", "parameters": {{"object": "water_bottle"}}}},
            {{"action": "navigate_to_location", "parameters": {{"location": "living_room"}}}},
            {{"action": "place_object", "parameters": {{"object": "water_bottle", "location": "table"}}}},
            {{"action": "speak_text", "parameters": {{"text": "I have placed the bottle on the table"}}}}
        ]
        
        Plan:
        """
        
        try:
            # In a real implementation, this would call the OpenAI API
            # For this example, we'll simulate the response
            return self.simulate_llm_response(command, current_context)
        except Exception as e:
            self.get_logger().error(f'Error in LLM planning: {str(e)}')
            return [{"action": "speak_text", "parameters": {"text": f"I couldn't understand the command: {command}"}}]
    
    def simulate_llm_response(self, command, context):
        """Simulate the LLM response for demonstration purposes"""
        # This is a simplified simulation - a real implementation would call an LLM API
        
        command_lower = command.lower()
        
        if "bring" in command_lower or "get" in command_lower:
            # Find object type in command
            obj_type = None
            for obj_class, obj_list in self.object_map.items():
                for obj in obj_list:
                    if obj in command_lower:
                        obj_type = obj
                        break
                if obj_type:
                    break
            
            if obj_type:
                # Extract destination if mentioned
                destination = "living_room"  # default
                for loc_name in self.location_map.keys():
                    if loc_name in command_lower:
                        destination = loc_name
                        break
                
                return [
                    {"action": "navigate_to_location", "parameters": {"location": "kitchen"}},
                    {"action": "find_object", "parameters": {"object": obj_type}},
                    {"action": "pick_up_object", "parameters": {"object": obj_type}},
                    {"action": "navigate_to_location", "parameters": {"location": destination}},
                    {"action": "place_object", "parameters": {"object": obj_type, "location": "table"}},
                    {"action": "speak_text", "parameters": {"text": f"I have brought the {obj_type} to the {destination}"}}
                ]
        
        elif "go to" in command_lower or "navigate to" in command_lower:
            # Extract destination
            destination = "living_room"  # default
            for loc_name in self.location_map.keys():
                if loc_name in command_lower:
                    destination = loc_name
                    break
            
            return [
                {"action": "navigate_to_location", "parameters": {"location": destination}},
                {"action": "speak_text", "parameters": {"text": f"I have reached the {destination}"}}
            ]
        
        else:
            return [
                {"action": "speak_text", "parameters": {"text": f"I'm not sure how to execute: {command}"}}
            ]
```

## Action Execution and Control

Once plans are generated, they need to be executed by the robot's control systems:

```python
class ActionExecutor:
    def __init__(self):
        self.current_task = None
        self.is_executing = False
        
    def execute_plan(self, plan):
        """Execute a sequence of actions"""
        for i, action in enumerate(plan):
            self.get_logger().info(f'Executing action {i+1}/{len(plan)}: {action["action"]}')
            success = self.execute_single_action(action)
            
            if not success:
                self.get_logger().error(f'Action failed: {action}')
                return False
        
        return True
    
    def execute_single_action(self, action):
        """Execute a single action"""
        action_type = action["action"]
        params = action.get("parameters", {})
        
        try:
            if action_type == "navigate_to_location":
                return self.execute_navigation(params["location"])
            elif action_type == "find_object":
                return self.execute_object_search(params["object"])
            elif action_type == "pick_up_object":
                return self.execute_grasping(params["object"])
            elif action_type == "place_object":
                return self.execute_placement(params["object"], params["location"])
            elif action_type == "speak_text":
                return self.execute_speech(params["text"])
            elif action_type == "wave_gesture":
                return self.execute_wave()
            else:
                self.get_logger().error(f'Unknown action type: {action_type}')
                return False
        
        except Exception as e:
            self.get_logger().error(f'Error executing action {action_type}: {str(e)}')
            return False
    
    def execute_navigation(self, location):
        """Execute navigation to a specific location"""
        # In a real implementation, this would interface with Nav2
        self.get_logger().info(f'Navigating to {location}')
        # Simulate navigation
        import time
        time.sleep(1)  # Simulate time for navigation
        return True
    
    def execute_object_search(self, obj_name):
        """Execute search for a specific object"""
        self.get_logger().info(f'Searching for {obj_name}')
        # In a real implementation, this would activate perception systems
        # For simulation, assume object is found after a short delay
        import time
        time.sleep(0.5)
        return True
    
    def execute_grasping(self, obj_name):
        """Execute grasping of an object"""
        self.get_logger().info(f'Attempting to grasp {obj_name}')
        # In a real implementation, this would interface with manipulator control
        import time
        time.sleep(0.5)
        return True
    
    def execute_placement(self, obj_name, location):
        """Execute placement of an object at a location"""
        self.get_logger().info(f'Placing {obj_name} at {location}')
        import time
        time.sleep(0.5)
        return True
    
    def execute_speech(self, text):
        """Execute text-to-speech"""
        self.get_logger().info(f'Speaking: {text}')
        # In a real implementation, this would use a TTS system
        return True
    
    def execute_wave(self):
        """Execute waving gesture"""
        self.get_logger().info('Executing waving gesture')
        # In a real implementation, this would move the robot's arm
        import time
        time.sleep(0.5)
        return True
```

## Complete VLA System Integration

Now, let's put all components together into a complete system:

```python
class CompleteVLASystem(Node):
    def __init__(self):
        super().__init__('vla_system_node')
        
        # Setup publishers and subscribers
        self.image_subscriber = self.create_subscription(
            Image,
            '/camera/image_raw',
            self.image_callback,
            10
        )
        
        self.command_subscriber = self.create_subscription(
            String,
            '/high_level_command',
            self.command_callback,
            10
        )
        
        self.feedback_publisher = self.create_publisher(
            String,
            '/vla_system_feedback',
            10
        )
        
        # Initialize components
        self.perceptor = MultimodalPerceptor()
        self.scene_understanding = SceneUnderstanding()
        self.language_planner = LanguageGuidedPlanner([
            "navigate_to_location", "find_object", "pick_up_object", 
            "place_object", "speak_text", "wave_gesture"
        ])
        self.action_executor = ActionExecutor()
        self.bridge = CvBridge()
        
        # State management
        self.current_image = None
        self.pending_command = None
        self.current_context = {
            "robot_location": "unknown",
            "carried_object": None,
            "last_action": "none",
            "timestamp": self.get_clock().now().to_msg()
        }
        
        self.get_logger().info('Complete VLA System initialized')
    
    def image_callback(self, msg):
        """Handle incoming images"""
        try:
            # Convert ROS Image to PIL Image
            cv_image = self.bridge.imgmsg_to_cv2(msg, "bgr8")
            pil_image = PILImage.fromarray(cv_image)
            
            # Update current image
            self.current_image = pil_image
            
            # Process any pending command with the new image
            if self.pending_command:
                self.process_command_with_context(self.pending_command)
                self.pending_command = None
                
        except Exception as e:
            self.get_logger().error(f'Error in image processing: {str(e)}')
    
    def command_callback(self, msg):
        """Handle incoming high-level commands"""
        command = msg.data
        self.get_logger().info(f'Received high-level command: {command}')
        
        if self.current_image:
            # Process immediately if we have an image
            self.process_command_with_context(command)
        else:
            # Store command for when we get an image
            self.pending_command = command
            self.get_logger().info('Command stored, waiting for image')
    
    def process_command_with_context(self, command):
        """Process command with current context"""
        self.get_logger().info(f'Processing command with context: {command}')
        
        try:
            # Plan actions using language guidance
            plan = self.language_planner.parse_and_plan(command, self.current_context)
            self.get_logger().info(f'Generated plan with {len(plan)} actions')
            
            # Execute the plan
            success = self.action_executor.execute_plan(plan)
            
            # Update context based on execution
            if success:
                self.update_context_after_execution(plan)
                feedback = f'Successfully executed command: {command}'
            else:
                feedback = f'Failed to execute command: {command}'
            
            # Publish feedback
            feedback_msg = String()
            feedback_msg.data = feedback
            self.feedback_publisher.publish(feedback_msg)
            
        except Exception as e:
            error_msg = f'Error processing command: {str(e)}'
            self.get_logger().error(error_msg)
            feedback_msg = String()
            feedback_msg.data = error_msg
            self.feedback_publisher.publish(feedback_msg)
    
    def update_context_after_execution(self, plan):
        """Update the system context after plan execution"""
        if plan:
            # Update based on the last action
            last_action = plan[-1]
            self.current_context["last_action"] = last_action["action"]
            self.current_context["timestamp"] = self.get_clock().now().to_msg()
            
            # Update carried object if relevant
            if last_action["action"] == "pick_up_object":
                obj = last_action["parameters"]["object"]
                self.current_context["carried_object"] = obj
            elif last_action["action"] == "place_object":
                self.current_context["carried_object"] = None
    
    def get_logger(self):
        """Wrapper for node logger"""
        return self.get_logger()

def main(args=None):
    rclpy.init(args=args)
    vla_system = CompleteVLASystem()
    
    try:
        rclpy.spin(vla_system)
    except KeyboardInterrupt:
        pass
    finally:
        vla_system.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()
```

## Evaluation and Performance Metrics

### Quantitative Metrics

1. **Task Completion Rate**: Percentage of tasks successfully completed
2. **Planning Accuracy**: How well the plan matches the intended task
3. **Execution Time**: Time from command to task completion
4. **Perception Accuracy**: How accurately objects and scenes are understood

### Qualitative Metrics

1. **Natural Interaction**: How natural the human-robot interaction feels
2. **Robustness**: How well the system handles unexpected situations
3. **Adaptability**: How well the system adapts to new environments or tasks

## Challenges in VLA Integration

### Perception Challenges

1. **Visual Ambiguity**: Similar-looking objects can be confused
2. **Lighting Conditions**: Performance varies with lighting
3. **Occlusions**: Objects may be partially hidden

### Language Challenges

1. **Ambiguity**: Natural language commands can be ambiguous
2. **Context Dependence**: Commands depend heavily on context
3. **Error Propagation**: Misunderstanding a command affects the entire plan

### Action Challenges

1. **Precision**: Fine manipulation requires precise control
2. **Safety**: Actions must be safe for humans and environment
3. **Recovery**: Handling action failures gracefully

## Summary

The integration of vision, language, and action systems creates powerful humanoid robots that can understand and execute natural language commands in real-world environments. The key to success is proper coordination between perception, cognition, and execution modules, with robust error handling and context management. These systems represent the state-of-the-art in embodied AI and human-robot interaction.

## Exercises

1. Implement a simplified VLA system that can process basic commands
2. Add error handling to manage perception failures
3. Create a simulation demonstrating the complete VLA pipeline

## Next Steps

This chapter concludes the core modules of the Physical AI & Humanoid Robotics Course. The next chapter will bring together everything learned into the capstone project, where you'll implement a complete autonomous humanoid system that demonstrates all the concepts covered in the course. This will be the culmination of your learning journey, integrating ROS 2, simulation, Isaac, and Vision-Language-Action systems into a unified autonomous robot.