File size: 4,802 Bytes
857c2e9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#!/usr/bin/env python3
"""
VLAC Integration Test

Simple test to verify VLAC integration with SimpleVLA-RL rollout works correctly.
This test creates a minimal rollout configuration and runs a few steps.
"""

import os
import sys
import time
import subprocess
import requests
from pathlib import Path

# Put this file's directory at the front of sys.path before the `verl` import
# below, so that import can resolve from a local checkout.
# NOTE(review): this is only the project root if the script lives there - confirm.
sys.path.insert(0, str(Path(__file__).parent))

from verl.utils.vlac_client import VLACClient
import numpy as np


class MockConfig:
    """Lightweight stand-in for a rollout configuration in these tests.

    Args:
        use_vlac: Value stored on ``use_vlac``.
        vlac_service_url: Value stored on ``vlac_service_url``.

    The remaining attributes are fixed: ``val_only`` is False (training
    mode), ``task_suite_name`` is ``"libero_10"``, and ``max_steps`` maps
    that suite name to 512.
    """

    def __init__(self, use_vlac=True, vlac_service_url="http://localhost:8111"):
        # Caller-controlled VLAC settings.
        self.use_vlac, self.vlac_service_url = use_vlac, vlac_service_url
        # Fixed settings; val_only=False means training mode.
        self.val_only = False
        self.task_suite_name = "libero_10"
        self.max_steps = {self.task_suite_name: 512}


def test_vlac_client():
    """Exercise the VLACClient calls against a locally running VLAC service.

    Builds a client for ``http://localhost:8111``, generates three random
    256x256x3 ``uint8`` frames, and calls ``check_done``,
    ``compute_trajectory_values`` and ``pairwise_critic`` with them,
    printing each result.

    Returns:
        True if every call completes; False if anything inside the ``try``
        raises (the exception is printed, not propagated).
    """
    print("Testing VLAC client...")

    # One instruction shared by all three client calls.
    task = "Pick up the red bowl and place it in the white box"

    try:
        # Initialize client
        client = VLACClient(service_url="http://localhost:8111", timeout=30)

        # Create test frames (mock robot camera images)
        height, width = 256, 256
        first_frame = np.random.randint(0, 255, (height, width, 3), dtype=np.uint8)
        prev_frame = np.random.randint(0, 255, (height, width, 3), dtype=np.uint8)
        curr_frame = np.random.randint(0, 255, (height, width, 3), dtype=np.uint8)

        # Test done detection
        print("Testing done detection...")
        done, prob = client.check_done(
            task=task,
            first_frame=first_frame,
            prev_frame=prev_frame,
            curr_frame=curr_frame,
        )
        print(f"Done check result: done={done}, prob={prob:.3f}")

        # Test trajectory values
        print("Testing trajectory value computation...")
        frames = [first_frame, prev_frame, curr_frame]
        value_list, critic_list = client.compute_trajectory_values(
            task=task,
            frames=frames,
            skip=1,
        )
        print(f"Trajectory values: {len(value_list)} values, {len(critic_list)} critics")
        # An empty value_list raises IndexError here and is reported as a failure below.
        print(f"Final value: {value_list[-1]:.3f}")

        # Test pairwise critic
        print("Testing pairwise critic...")
        critic_score = client.pairwise_critic(
            task=task,
            image_a=prev_frame,
            image_b=curr_frame,
        )
        print(f"Pairwise critic score: {critic_score:.3f}")

        print("✓ VLAC client tests passed!")
        return True

    except Exception as e:
        # Deliberately broad: any failure (connection, response shape,
        # formatting) is reported as a failed check rather than a crash.
        print(f"✗ VLAC client test failed: {e}")
        return False


def test_config_integration():
    """Check that MockConfig carries the expected training/evaluation flags.

    Returns:
        True when every check holds.

    Raises:
        AssertionError: If a flag does not have the expected boolean value;
            a failed check raises rather than returning False.
    """
    print("Testing configuration integration...")

    # Training: VLAC enabled, not validation-only.
    # Identity checks (`is True` / `is False`) replace `== True`, which also
    # accepts truthy look-alikes such as 1.
    train_config = MockConfig(use_vlac=True)
    assert train_config.use_vlac is True
    assert train_config.val_only is False
    print("✓ Training configuration correct")

    # Evaluation: VLAC disabled; val_only is switched on after construction.
    eval_config = MockConfig(use_vlac=False)
    eval_config.val_only = True
    assert eval_config.use_vlac is False
    assert eval_config.val_only is True
    print("✓ Evaluation configuration correct")

    return True


def check_service_health(service_url="http://localhost:8111", timeout=10):
    """Check if the VLAC service is running and healthy.

    Sends a POST request to ``<service_url>/healthcheck`` and prints the
    outcome.

    Args:
        service_url: Base URL of the VLAC service. Defaults to the local
            service address this script used previously.
        timeout: Timeout in seconds passed to ``requests.post``.

    Returns:
        True if the endpoint answers with HTTP 200 and a JSON body; False on
        any other status code or on any exception (which is printed, not
        propagated).
    """
    try:
        # Tolerate a trailing slash on the base URL.
        endpoint = f"{service_url.rstrip('/')}/healthcheck"
        # NOTE(review): POST is kept from the original; presumably the
        # service's healthcheck route expects it - confirm.
        response = requests.post(endpoint, timeout=timeout)
        if response.status_code != 200:
            print(f"✗ VLAC service unhealthy: {response.status_code}")
            return False

        result = response.json()
        print(f"✓ VLAC service healthy: {result}")
        return True
    except Exception as e:
        # Best-effort probe: connection errors, timeouts and malformed JSON
        # all mean "not usable", so report them instead of crashing.
        print(f"✗ VLAC service not accessible: {e}")
        return False


def main():
    """Run the VLAC integration checks in order and report the outcome.

    The steps are: service health check, configuration check, client check.
    The run stops at the first step that reports failure.

    Returns:
        0 if all steps pass, 1 as soon as one step returns a falsy result.
    """
    print("VLAC Integration Test Suite")
    print("=" * 50)

    # Check if VLAC service is running; nothing else can work without it.
    print("1. Checking VLAC service health...")
    if not check_service_health():
        print("\nPlease start VLAC service first:")
        print("python vlac_service.py --port 8111 --gpu-ids 0")
        return 1

    # Test configuration
    print("\n2. Testing configuration integration...")
    if not test_config_integration():
        return 1

    # Test VLAC client
    print("\n3. Testing VLAC client...")
    if not test_vlac_client():
        return 1

    print("\n" + "=" * 50)
    print("🎉 All VLAC integration tests passed!")
    print("\nYou can now run training with VLAC:")
    print("bash examples/run_openvla_oft_rl_vlac.sh")

    return 0


if __name__ == "__main__":
    # Use sys.exit rather than the builtin exit(): the latter is injected by
    # the `site` module and is missing under `python -S` or in frozen builds.
    sys.exit(main())