File size: 6,337 Bytes
0a735c8
 
 
 
 
 
 
16d64f1
 
 
 
 
 
 
0a735c8
16d64f1
 
0a735c8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
from websocket_storage import WebSocketGPUStorage
from gpu_chip import GPUChip
from typing import Dict, Any, List, Optional
import time
import numpy as np

class MultiGPUSystem:
    """Coordinate several ``GPUChip`` instances that share one storage backend.

    The system keeps a ``system_state`` dict (GPU count, NVLink topology and
    transfer log, global memory accounting, power placeholders) and persists
    it through ``storage.store_state`` whenever it changes.
    """

    # VRAM assumed per GPU when the caller does not say otherwise.
    DEFAULT_VRAM_PER_GPU_GB = 24
    # Bandwidth value recorded on every NVLink entry, in Gbps.
    NVLINK_BANDWIDTH_GBPS = 300
    # Sizes are handled in bytes and reported in GiB-style "GB" (1024 ** 3).
    _BYTES_PER_GB = 1024 ** 3

    def __init__(self, num_gpus: int = 8, storage=None,
                 vram_per_gpu_gb: int = DEFAULT_VRAM_PER_GPU_GB):
        """Create the GPUs, build the initial system state and persist it.

        Args:
            num_gpus: Number of ``GPUChip`` objects to create.
            storage: Shared storage backend. When ``None`` a
                ``WebSocketGPUStorage`` is created and must connect.
            vram_per_gpu_gb: VRAM assumed per GPU for the global total.

        Raises:
            ValueError: If ``num_gpus`` is negative.
            RuntimeError: If the default storage cannot connect.
        """
        if num_gpus < 0:
            raise ValueError("num_gpus must be non-negative")

        if storage is None:
            storage = WebSocketGPUStorage()
            if not storage.wait_for_connection():
                raise RuntimeError("Could not connect to GPU storage server")
        self.storage = storage

        # Initialize GPUs with shared storage
        self.gpus = [GPUChip(i, storage=self.storage) for i in range(num_gpus)]

        # Initialize system state
        self.system_state = {
            "num_gpus": num_gpus,
            "nvlink_state": {
                "connections": self._init_nvlink_topology(num_gpus),
                # Log of transfers keyed by transfer id; finished entries
                # carry an "end_time" and a "completed" flag.
                "active_transfers": {},
            },
            "global_memory_state": {
                "total_vram_gb": num_gpus * vram_per_gpu_gb,
                "allocated_vram_gb": 0,
            },
            "power_state": {
                "total_watts": 0,
                "gpu_watts": [0] * num_gpus,
            },
        }
        self.store_system_state()

    def _init_nvlink_topology(self, num_gpus: int) -> Dict[str, Any]:
        """Build a fully connected NVLink topology.

        Returns:
            A dict keyed ``"nvlink_<a>_<b>"`` (with ``a < b``) holding one
            entry per unordered GPU pair.
        """
        return {
            f"nvlink_{a}_{b}": {
                "gpu_a": a,
                "gpu_b": b,
                "bandwidth_gbps": self.NVLINK_BANDWIDTH_GBPS,
                "active": True,
            }
            for a in range(num_gpus)
            for b in range(a + 1, num_gpus)
        }

    def store_system_state(self):
        """Store system state in WebSocket storage"""
        self.storage.store_state("multi_gpu_system", "state", self.system_state)

    def _record_allocation(self, num_bytes: int) -> None:
        """Add ``num_bytes`` to the global allocation counter (kept in GB)."""
        memory_state = self.system_state["global_memory_state"]
        memory_state["allocated_vram_gb"] += num_bytes / self._BYTES_PER_GB

    def allocate_distributed(self, size: int) -> List[str]:
        """Allocate ``size`` bytes split as evenly as possible across all GPUs.

        The remainder of ``size / num_gpus`` is spread one byte at a time over
        the first GPUs so the per-GPU allocations always sum to ``size``.

        Returns:
            One block id per GPU, in GPU order.

        Raises:
            ValueError: If ``size`` is negative or there are no GPUs.
        """
        if size < 0:
            raise ValueError("size must be non-negative")
        if not self.gpus:
            raise ValueError("No GPUs available for distributed allocation")

        base, extra = divmod(size, len(self.gpus))
        block_ids = [
            gpu.allocate_memory(base + 1 if idx < extra else base)
            for idx, gpu in enumerate(self.gpus)
        ]

        self._record_allocation(size)
        self.store_system_state()
        return block_ids

    def transfer_between_gpus(self, src_gpu: int, dst_gpu: int,
                              data_id: str) -> Optional[str]:
        """Copy a stored tensor into a new block on another GPU.

        The transfer is logged under ``nvlink_state["active_transfers"]``;
        the entry is always closed with ``end_time`` and ``completed``,
        whether or not the source data was found.

        Returns:
            The id of the new block on ``dst_gpu``, or ``None`` when
            ``storage.load_tensor(data_id)`` returns ``None``.

        Raises:
            ValueError: If an index is out of range, or there is no active
                link between the two GPUs (this includes ``src == dst``).
        """
        gpu_count = len(self.gpus)
        if not (0 <= src_gpu < gpu_count and 0 <= dst_gpu < gpu_count):
            raise ValueError("Invalid GPU indices")

        nvlink = self.system_state["nvlink_state"]
        link_id = f"nvlink_{min(src_gpu, dst_gpu)}_{max(src_gpu, dst_gpu)}"
        link = nvlink["connections"].get(link_id)
        if link is None or not link.get("active", True):
            raise ValueError("No NVLink connection between specified GPUs")

        # Start transfer. The id is timestamp based; disambiguate if the
        # clock is too coarse to separate two back-to-back transfers.
        transfers = nvlink["active_transfers"]
        start_ns = time.time_ns()
        transfer_id = f"transfer_{start_ns}"
        suffix = 0
        while transfer_id in transfers:
            suffix += 1
            transfer_id = f"transfer_{start_ns}_{suffix}"

        record = {
            "source_gpu": src_gpu,
            "dest_gpu": dst_gpu,
            "data_id": data_id,
            "start_time": start_ns,
        }
        transfers[transfer_id] = record
        self.store_system_state()

        # Get data from source GPU
        data = self.storage.load_tensor(data_id)
        if data is None:
            # Close the record so a missing source does not leave a transfer
            # looking in-flight forever.
            record["completed"] = False
            record["end_time"] = time.time_ns()
            self.store_system_state()
            return None

        # Allocation sizes are in bytes; array-like objects expose that as
        # ``nbytes`` while ``len`` would only give the first dimension.
        num_bytes = getattr(data, "nbytes", None)
        if num_bytes is None:
            num_bytes = len(data)

        # Store in destination GPU
        new_block_id = self.gpus[dst_gpu].allocate_memory(num_bytes)
        self.storage.store_tensor(new_block_id, data)

        # Update transfer state and global accounting
        record["completed"] = True
        record["end_time"] = time.time_ns()
        self._record_allocation(num_bytes)
        self.store_system_state()

        return new_block_id

    def schedule_distributed_compute(
            self, compute_graph: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Schedule ``compute_graph["operations"]`` round-robin across GPUs.

        Operation ``i`` goes to GPU ``i % num_gpus``; within that GPU the SM
        is chosen from the per-GPU round number so every SM is used in turn.

        Returns:
            One dict per operation with keys ``op``, ``gpu`` and ``warp_id``.

        Raises:
            ValueError: If there are operations but no GPUs.
        """
        operations = compute_graph["operations"]
        gpu_count = len(self.gpus)
        if operations and gpu_count == 0:
            raise ValueError("No GPUs available to schedule operations")

        # Simple round-robin scheduling for now
        scheduled_ops = []
        for i, op in enumerate(operations):
            round_no, gpu_index = divmod(i, gpu_count)
            gpu = self.gpus[gpu_index]
            # Index by round, not by ``i``: ``i % num_sms`` skips SMs whenever
            # the GPU count and SM count share a common factor.
            warp_id = gpu.schedule_compute(
                sm_index=round_no % gpu.chip_state["num_sms"],
                warp_state=op,
            )
            scheduled_ops.append({
                "op": op,
                "gpu": gpu_index,
                "warp_id": warp_id,
            })

        # Store scheduling decision
        self.storage.store_state(
            "compute_schedule",
            f"schedule_{time.time_ns()}",
            {"operations": scheduled_ops},
        )

        return scheduled_ops

    def synchronize(self):
        """Stamp every GPU and the system state with a shared sync point id."""
        sync_point = f"sync_{time.time_ns()}"
        for gpu in self.gpus:
            gpu.chip_state["sync_point"] = sync_point
            gpu.store_chip_state()

        self.system_state["last_sync"] = sync_point
        self.store_system_state()

    def get_system_stats(self) -> Dict[str, Any]:
        """Get comprehensive system statistics.

        ``nvlink.active_transfers`` counts only logged transfers that have
        not been closed yet (no ``end_time``). Power figures are read from
        each GPU's ``chip_state["power_state"]["total_watts"]``.
        """
        memory_state = self.system_state["global_memory_state"]
        nvlink = self.system_state["nvlink_state"]
        per_gpu_watts = [
            gpu.chip_state["power_state"]["total_watts"] for gpu in self.gpus
        ]

        return {
            "num_gpus": len(self.gpus),
            "total_vram_gb": memory_state["total_vram_gb"],
            "allocated_vram_gb": memory_state["allocated_vram_gb"],
            "gpus": [gpu.get_stats() for gpu in self.gpus],
            "nvlink": {
                "active_connections": sum(
                    1 for conn in nvlink["connections"].values()
                    if conn["active"]
                ),
                "active_transfers": sum(
                    1 for transfer in nvlink["active_transfers"].values()
                    if "end_time" not in transfer
                ),
            },
            "power": {
                "total_watts": sum(per_gpu_watts),
                "per_gpu_watts": per_gpu_watts,
            },
        }